module.exports = Server

var bencode = require('bencode')
var debug = require('debug')('bittorrent-tracker')
var dgram = require('dgram')
var EventEmitter = require('events').EventEmitter
var http = require('http')
var inherits = require('inherits')
var series = require('run-series')
var string2compact = require('string2compact')
var WebSocketServer = require('ws').Server

var common = require('./lib/common')
var Swarm = require('./lib/swarm')
var parseHttpRequest = require('./lib/parse_http')
var parseUdpRequest = require('./lib/parse_udp')
var parseWebSocketRequest = require('./lib/parse_websocket')

inherits(Server, EventEmitter)

/**
 * BitTorrent tracker server.
 *
 * Answers requests from torrent clients over HTTP, UDP and (when enabled) WebSocket.
 * Requests include metrics from clients that help the tracker keep overall statistics
 * about the torrent. Responses include a peer list that helps the client participate
 * in the torrent.
 *
 * Events emitted by this object:
 *   - 'listening': every started http/udp socket has reported 'listening'
 *   - 'error':     an underlying server/socket emitted 'error'
 *   - 'warning':   a client request could not be parsed or served
 *   - the event named by `common.EVENT_NAMES[params.event]` after each announce
 *
 * @param {Object}   opts            options object
 * @param {Number}   opts.interval   tell clients to announce on this interval (ms)
 *                                   (default: 10 minutes)
 * @param {boolean}  opts.trustProxy trust 'x-forwarded-for' header from reverse proxy
 * @param {boolean}  opts.http       start an http server? (default: true)
 * @param {boolean}  opts.udp        start a udp server? (default: true)
 * @param {boolean}  opts.ws         start a websocket server? Opt-in: only started when
 *                                   this is exactly `true` (default: false)
 * @param {function} opts.filter     black/whitelist fn for disallowing/allowing torrents;
 *                                   called as filter(infoHash, params), falsy = disallow
 */
function Server (opts) {
  var self = this
  if (!(self instanceof Server)) return new Server(opts)
  EventEmitter.call(self)
  if (!opts) opts = {}

  debug('new server %s', JSON.stringify(opts))

  self._intervalMs = opts.interval
    ? opts.interval
    : 10 * 60 * 1000 // 10 min

  self._trustProxy = !!opts.trustProxy
  if (typeof opts.filter === 'function') self._filter = opts.filter

  self.listening = false
  self.torrents = {}

  self.http = null
  self.udp4 = null
  self.udp6 = null
  self.ws = null

  // start an http tracker unless the user explicitly says no
  if (opts.http !== false) {
    self.http = http.createServer()
    self.http.on('request', self.onHttpRequest.bind(self))
    self.http.on('error', self._onError.bind(self))
    self.http.on('listening', onListening)
  }

  // start a udp tracker unless the user explicitly says no
  if (opts.udp !== false) {
    // `self.udp` is kept as an alias of the IPv4 socket
    self.udp4 = self.udp = dgram.createSocket('udp4')
    self.udp4.on('message', self.onUdpRequest.bind(self))
    self.udp4.on('error', self._onError.bind(self))
    self.udp4.on('listening', onListening)

    self.udp6 = dgram.createSocket('udp6')
    self.udp6.on('message', self.onUdpRequest.bind(self))
    self.udp6.on('error', self._onError.bind(self))
    self.udp6.on('listening', onListening)
  }

  // start a websocket tracker (for WebTorrent) only when the user explicitly asks for it
  if (opts.ws === true) {
    // the websocket server rides on an http server; create a bare one (without the
    // tracker 'request' handler) when the http tracker itself is disabled
    if (!self.http) {
      self.http = http.createServer()
      self.http.on('error', self._onError.bind(self))
      self.http.on('listening', onListening)
    }
    self.ws = new WebSocketServer({ server: self.http })
    self.ws.on('error', self._onError.bind(self))
    self.ws.on('connection', self.onWebSocketConnection.bind(self))
  }

  // number of sockets that still have to report 'listening'
  var num = !!self.http + !!self.udp4 + !!self.udp6

  function onListening () {
    num -= 1
    if (num === 0) {
      self.listening = true
      debug('listening')
      self.emit('listening')
    }
  }
}

/**
 * Re-emit an error from an underlying server/socket as this server's 'error' event.
 *
 * @param {Error} err
 */
Server.prototype._onError = function (err) {
  var self = this
  self.emit('error', err)
}

/**
 * Start listening on every enabled transport.
 *
 * Called as listen([port], [hostname], [onlistening]).
 *   - port:     a number, or an object `{ http, udp }` with one port per transport
 *               (default: 0)
 *   - hostname: a string, or an object `{ http, udp, udp6 }` with one hostname per socket
 *   - onlistening: if the last argument is a function it is registered as a one-time
 *               'listening' handler
 *
 * @throws {Error} if the server is already listening
 */
Server.prototype.listen = function (/* port, hostname, onlistening */) {
  var self = this

  // check first, so a rejected call does not leave a stray 'listening' handler behind
  if (self.listening) throw new Error('server already listening')

  var lastArg = arguments[arguments.length - 1]
  if (typeof lastArg === 'function') self.once('listening', lastArg)

  // the callback may sit in any position (e.g. `listen(cb)`); never use it as a
  // port or hostname
  var portArg = typeof arguments[0] !== 'function' ? arguments[0] : undefined
  var port = toNumber(portArg) || portArg || 0
  var hostname = typeof arguments[1] !== 'function' ? arguments[1] : undefined

  debug('listen %o %o', port, hostname)

  function isObject (obj) {
    return typeof obj === 'object' && obj !== null
  }

  var httpPort = isObject(port) ? (port.http || 0) : port
  var udpPort = isObject(port) ? (port.udp || 0) : port

  // binding to :: only receives IPv4 connections if the bindv6only sysctl is set 0,
  // which is the default on many operating systems
  var httpHostname = isObject(hostname) ? hostname.http : (hostname || '::')
  var udp4Hostname = isObject(hostname) ? hostname.udp : hostname
  var udp6Hostname = isObject(hostname) ? hostname.udp6 : hostname

  if (self.http) self.http.listen(httpPort, httpHostname)
  if (self.udp4) self.udp4.bind(udpPort, udp4Hostname)
  if (self.udp6) self.udp6.bind(udpPort, udp6Hostname)
}

/**
 * Stop the server. Marks it as not listening (so in-flight requests are dropped
 * instead of answered) and closes every socket.
 *
 * @param {function=} cb passed to `http.close` when an http server exists, otherwise
 *                       called with `null`
 */
Server.prototype.close = function (cb) {
  var self = this
  if (!cb) cb = function () {}
  debug('close')

  self.listening = false

  // closing is best-effort: a close() that throws must not prevent the remaining
  // sockets from being closed, so failures are only logged
  if (self.udp4) {
    try {
      self.udp4.close()
    } catch (err) {
      debug('udp4 close failed: %s', err.message || err)
    }
  }

  if (self.udp6) {
    try {
      self.udp6.close()
    } catch (err) {
      debug('udp6 close failed: %s', err.message || err)
    }
  }

  if (self.ws) {
    try {
      self.ws.close()
    } catch (err) {
      debug('ws close failed: %s', err.message || err)
    }
  }

  if (self.http) self.http.close(cb)
  else cb(null)
}

/**
 * Look up the swarm for a torrent, creating it on first use.
 *
 * @param  {string|Buffer} infoHash hex string, or a Buffer that is converted to hex
 * @param  {Object=}       params   request params, handed to the filter function
 * @return {Swarm|null}    null when the filter function disallows the torrent
 */
Server.prototype.getSwarm = function (infoHash, params) {
  var self = this
  if (!params) params = {}
  if (Buffer.isBuffer(infoHash)) infoHash = infoHash.toString('hex')

  if (self._filter && !self._filter(infoHash, params)) return null

  var swarm = self.torrents[infoHash]
  if (!swarm) swarm = self.torrents[infoHash] = new Swarm(infoHash, self)

  return swarm
}

/**
 * Handle one HTTP tracker request and write a bencoded response.
 *
 * @param {http.IncomingMessage} req
 * @param {http.ServerResponse}  res
 * @param {Object=}              opts options for the request parser; `trustProxy`
 *                                    falls back to the server-wide setting
 */
Server.prototype.onHttpRequest = function (req, res, opts) {
  var self = this
  if (!opts) opts = {}
  opts.trustProxy = opts.trustProxy || self._trustProxy

  var params
  try {
    params = parseHttpRequest(req, opts)
    params.httpReq = req
    params.httpRes = res
  } catch (err) {
    res.end(bencode.encode({
      'failure reason': err.message
    }))

    // even though it's an error for the client, it's just a warning for the server.
    // don't crash the server because a client sent bad data :)
    self.emit('warning', err)
    return
  }

  self._onRequest(params, function (err, response) {
    if (err) {
      self.emit('warning', err)
      response = {
        'failure reason': err.message
      }
    }
    // the server was closed while the request was being processed
    if (!self.listening) return

    delete response.action // only needed for UDP encoding
    res.end(bencode.encode(response))

    if (params.action === common.ACTIONS.ANNOUNCE) {
      self.emit(common.EVENT_NAMES[params.event], params.addr)
    }
  })
}

/**
 * Handle one UDP tracker datagram and reply on the socket matching the sender's
 * address family.
 *
 * @param {Buffer} msg   raw datagram
 * @param {Object} rinfo sender info from dgram (`family`, `address`, `port` are read)
 */
Server.prototype.onUdpRequest = function (msg, rinfo) {
  var self = this

  var params
  try {
    params = parseUdpRequest(msg, rinfo)
  } catch (err) {
    self.emit('warning', err)
    // Do not reply for parsing errors
    return
  }

  self._onRequest(params, function (err, response) {
    if (err) {
      self.emit('warning', err)
      response = {
        action: common.ACTIONS.ERROR,
        'failure reason': err.message
      }
    }
    // the server was closed while the request was being processed
    if (!self.listening) return

    response.transactionId = params.transactionId
    response.connectionId = params.connectionId

    try {
      // building the packet can throw (e.g. unknown action), so it lives inside the
      // try block too: a bad response becomes a warning, not an uncaught exception
      var buf = makeUdpPacket(response)
      var udp = (rinfo.family === 'IPv4') ? self.udp4 : self.udp6
      udp.send(buf, 0, buf.length, rinfo.port, rinfo.address)
    } catch (sendErr) {
      self.emit('warning', sendErr)
    }

    if (params.action === common.ACTIONS.ANNOUNCE) {
      self.emit(common.EVENT_NAMES[params.event], params.addr)
    }
  })
}

/**
 * Set up per-connection state and handlers for a new WebSocket client.
 *
 * @param {WebSocket} socket
 */
Server.prototype.onWebSocketConnection = function (socket) {
  var self = this
  socket.peerId = null // as hex
  socket.infoHashes = [] // info hashes this socket has sent requests for
  socket.onSend = self._onWebSocketSend.bind(self, socket)
  socket.on('message', self._onWebSocketRequest.bind(self, socket))
  socket.on('error', self._onWebSocketError.bind(self, socket))
  socket.on('close', self._onWebSocketClose.bind(self, socket))
}

/**
 * Handle one WebSocket tracker message: answer the sender, then relay its WebRTC
 * offers to the peers returned by the swarm and/or its answer to `to_peer_id`.
 *
 * @param {WebSocket} socket
 * @param {*}         params raw message on entry; replaced by the parsed request
 */
Server.prototype._onWebSocketRequest = function (socket, params) {
  var self = this

  try {
    params = parseWebSocketRequest(socket, params)
  } catch (err) {
    // `params` is still the raw, unparsed message at this point, so there is no
    // parsed info_hash to echo back; report the failure reason only
    socket.send(JSON.stringify({
      'failure reason': err.message
    }), socket.onSend)

    // even though it's an error for the client, it's just a warning for the server.
    // don't crash the server because a client sent bad data :)
    self.emit('warning', err)
    return
  }

  if (!socket.peerId) socket.peerId = params.peer_id // as hex

  self._onRequest(params, function (err, response) {
    if (err) {
      self.emit('warning', err)
      response = {
        'failure reason': err.message
      }
    }
    // the server was closed while the request was being processed
    if (!self.listening) return

    // remember the torrent so the peer can be removed from it when the socket closes
    if (socket.infoHashes.indexOf(params.info_hash) === -1) {
      socket.infoHashes.push(params.info_hash)
    }

    // peer objects are not sent to the client; they are only used for relaying below
    var peers = response.peers
    delete response.peers

    response.interval = self._intervalMs
    response.info_hash = common.hexToBinary(params.info_hash)
    socket.send(JSON.stringify(response), socket.onSend)
    debug('sent response %s to %s', JSON.stringify(response), params.peer_id)

    // a failed request has no peer list (and possibly no swarm), so there is
    // nothing to relay
    if (!err) {
      if (params.numwant && Array.isArray(peers)) {
        debug('got offers %s from %s', JSON.stringify(params.offers), params.peer_id)
        debug('got %s peers from swarm %s', peers.length, params.info_hash)
        peers.forEach(function (peer, i) {
          // never index past the offers the client actually supplied
          var offer = params.offers && params.offers[i]
          if (!offer) return
          peer.socket.send(JSON.stringify({
            offer: offer.offer,
            offer_id: offer.offer_id,
            peer_id: common.hexToBinary(params.peer_id),
            info_hash: common.hexToBinary(params.info_hash)
          }))
          debug('sent offer to %s from %s', peer.peerId, params.peer_id)
        })
      }

      if (params.answer) {
        debug('got answer %s from %s', JSON.stringify(params.answer), params.peer_id)

        // getSwarm returns null for torrents disallowed by the filter
        var swarm = self.getSwarm(params.info_hash, params)
        var toPeer = swarm && swarm.peers[params.to_peer_id]
        if (!toPeer) {
          return self.emit('warning', new Error('no peer with that `to_peer_id`'))
        }

        toPeer.socket.send(JSON.stringify({
          answer: params.answer,
          offer_id: params.offer_id,
          peer_id: common.hexToBinary(params.peer_id),
          info_hash: common.hexToBinary(params.info_hash)
        }))
        debug('sent answer to %s from %s', toPeer.peerId, params.peer_id)
      }
    }

    if (params.action === common.ACTIONS.ANNOUNCE) {
      self.emit(common.EVENT_NAMES[params.event], params.peer_id)
    }
  })
}

/**
 * Dispatch a parsed request (from any transport) by its action.
 *
 * @param {Object}   params parsed request
 * @param {function} cb     called as cb(err, response)
 */
Server.prototype._onRequest = function (params, cb) {
  var self = this
  if (params && params.action === common.ACTIONS.CONNECT) {
    cb(null, { action: common.ACTIONS.CONNECT })
  } else if (params && params.action === common.ACTIONS.ANNOUNCE) {
    self._onAnnounce(params, cb)
  } else if (params && params.action === common.ACTIONS.SCRAPE) {
    self._onScrape(params, cb)
  } else {
    cb(new Error('Invalid action'))
  }
}

/**
 * Handle an announce: register it with the swarm and shape the peer list according
 * to `params.compact`.
 *
 * @param {Object}   params parsed announce request
 * @param {function} cb     called as cb(err, response)
 */
Server.prototype._onAnnounce = function (params, cb) {
  var self = this
  var swarm = self.getSwarm(params.info_hash, params)
  if (swarm === null) return cb(new Error('disallowed info_hash'))
  if (!params.event || params.event === 'empty') params.event = 'update'

  swarm.announce(params, function (err, response) {
    if (err) return cb(err)

    if (!response.action) response.action = common.ACTIONS.ANNOUNCE
    // stored in ms, sent in seconds
    if (!response.interval) response.interval = Math.ceil(self._intervalMs / 1000)

    if (params.compact === 1) {
      var peers = response.peers

      // Find IPv4 peers
      response.peers = string2compact(peers.filter(function (peer) {
        return common.IPV4_RE.test(peer.ip)
      }).map(function (peer) {
        return peer.ip + ':' + peer.port
      }))

      // Find IPv6 peers
      response.peers6 = string2compact(peers.filter(function (peer) {
        return common.IPV6_RE.test(peer.ip)
      }).map(function (peer) {
        return '[' + peer.ip + ']:' + peer.port
      }))
    } else if (params.compact === 0) {
      // IPv6 peers are not separate for non-compact responses
      response.peers = response.peers.map(function (peer) {
        return {
          'peer id': peer.peerId,
          ip: peer.ip,
          port: peer.port
        }
      })
    } // else, return full peer objects (used for websocket responses)

    cb(null, response)
  })
}

/**
 * Handle a scrape: collect complete/incomplete counts for each requested torrent.
 *
 * @param {Object}   params parsed scrape request; `info_hash` lists the torrents
 * @param {function} cb     called as cb(err, response)
 */
Server.prototype._onScrape = function (params, cb) {
  var self = this

  if (params.info_hash == null) {
    // if info_hash param is omitted, stats for all torrents are returned
    // TODO: make this configurable!
    params.info_hash = Object.keys(self.torrents)
  }

  // tolerate a single info hash that was not wrapped in an array
  var infoHashes = Array.isArray(params.info_hash)
    ? params.info_hash
    : [ params.info_hash ]

  series(infoHashes.map(function (infoHash) {
    var swarm = self.getSwarm(infoHash)
    return function (cb) {
      // getSwarm returns null for torrents disallowed by the filter; report empty
      // stats for those instead of dereferencing null
      if (!swarm) {
        return cb(null, {
          infoHash: infoHash,
          complete: 0,
          incomplete: 0
        })
      }
      swarm.scrape(params, function (err, scrapeInfo) {
        cb(err, scrapeInfo && {
          infoHash: infoHash,
          complete: scrapeInfo.complete || 0,
          incomplete: scrapeInfo.incomplete || 0
        })
      })
    }
  }), function (err, results) {
    if (err) return cb(err)

    var response = {
      action: common.ACTIONS.SCRAPE,
      files: {},
      flags: { min_request_interval: Math.ceil(self._intervalMs / 1000) }
    }

    results.forEach(function (result) {
      response.files[common.hexToBinary(result.infoHash)] = {
        complete: result.complete,
        incomplete: result.incomplete,
        downloaded: result.complete // TODO: this only provides a lower-bound
      }
    })

    cb(null, response)
  })
}

/**
 * Serialize a response object into a UDP tracker packet.
 *
 * @param  {Object} params response; `action` selects the layout, `transactionId`
 *                         (and `connectionId` for CONNECT) are read from it
 * @return {Buffer}
 * @throws {Error}  if `params.action` is not CONNECT, ANNOUNCE, SCRAPE or ERROR
 */
function makeUdpPacket (params) {
  var packet
  switch (params.action) {
    case common.ACTIONS.CONNECT:
      packet = Buffer.concat([
        common.toUInt32(common.ACTIONS.CONNECT),
        common.toUInt32(params.transactionId),
        params.connectionId
      ])
      break
    case common.ACTIONS.ANNOUNCE:
      packet = Buffer.concat([
        common.toUInt32(common.ACTIONS.ANNOUNCE),
        common.toUInt32(params.transactionId),
        common.toUInt32(params.interval),
        common.toUInt32(params.incomplete),
        common.toUInt32(params.complete),
        params.peers
      ])
      break
    case common.ACTIONS.SCRAPE:
      var scrapeResponse = [
        common.toUInt32(common.ACTIONS.SCRAPE),
        common.toUInt32(params.transactionId)
      ]
      for (var infoHash in params.files) {
        var file = params.files[infoHash]
        scrapeResponse.push(
          common.toUInt32(file.complete),
          common.toUInt32(file.downloaded), // TODO: this only provides a lower-bound
          common.toUInt32(file.incomplete)
        )
      }
      packet = Buffer.concat(scrapeResponse)
      break
    case common.ACTIONS.ERROR:
      packet = Buffer.concat([
        common.toUInt32(common.ACTIONS.ERROR),
        common.toUInt32(params.transactionId || 0),
        new Buffer(params['failure reason'], 'utf8')
      ])
      break
    default:
      throw new Error('Action not implemented: ' + params.action)
  }
  return packet
}

/**
 * Completion callback for `socket.send`; a send failure is handled like a socket error.
 *
 * @param {WebSocket} socket
 * @param {Error=}    err
 */
Server.prototype._onWebSocketSend = function (socket, err) {
  var self = this
  if (err) self._onWebSocketError(socket, err)
}

/**
 * Announce 'stopped' for the socket's peer in every existing swarm the socket has
 * sent requests for.
 *
 * @param {WebSocket} socket
 */
Server.prototype._onWebSocketClose = function (socket) {
  var self = this
  // nothing to clean up if the socket never completed a request
  if (!socket.peerId || !socket.infoHashes) return
  debug('websocket close')

  socket.infoHashes.forEach(function (infoHash) {
    var swarm = self.torrents[infoHash]
    if (swarm) {
      swarm.announce({
        event: 'stopped',
        numwant: 0,
        peer_id: socket.peerId
      }, function () {})
    }
  })
}

/**
 * Report a WebSocket error as a 'warning' and clean up as if the socket had closed.
 *
 * @param {WebSocket} socket
 * @param {Error}     err
 */
Server.prototype._onWebSocketError = function (socket, err) {
  var self = this
  debug('websocket error %s', err.message || err)
  self.emit('warning', err)
  self._onWebSocketClose(socket)
}

/**
 * Convert a value to a non-negative number.
 *
 * @param  {*} x
 * @return {number|boolean} the number, or `false` if `x` is not a number >= 0
 */
function toNumber (x) {
  x = Number(x)
  return x >= 0 ? x : false
}