From 02032d0536579b3d150588d4d0ee053393a186a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20W=C3=A4rting?= Date: Tue, 2 Oct 2018 18:26:35 +0200 Subject: [PATCH 1/5] lebab sever.js --- server.js | 1319 ++++++++++++++++++++++++++--------------------------- 1 file changed, 650 insertions(+), 669 deletions(-) diff --git a/server.js b/server.js index 4f90569..b9d7d6b 100644 --- a/server.js +++ b/server.js @@ -1,24 +1,19 @@ -module.exports = Server +const Buffer = require('safe-buffer').Buffer +const bencode = require('bencode') +const debug = require('debug')('bittorrent-tracker:server') +const dgram = require('dgram') +const EventEmitter = require('events').EventEmitter +const http = require('http') +const peerid = require('bittorrent-peerid') +const series = require('run-series') +const string2compact = require('string2compact') +const WebSocketServer = require('ws').Server -var Buffer = require('safe-buffer').Buffer -var bencode = require('bencode') -var debug = require('debug')('bittorrent-tracker:server') -var dgram = require('dgram') -var EventEmitter = require('events').EventEmitter -var http = require('http') -var inherits = require('inherits') -var peerid = require('bittorrent-peerid') -var series = require('run-series') -var string2compact = require('string2compact') -var WebSocketServer = require('ws').Server - -var common = require('./lib/common') -var Swarm = require('./lib/server/swarm') -var parseHttpRequest = require('./lib/server/parse-http') -var parseUdpRequest = require('./lib/server/parse-udp') -var parseWebSocketRequest = require('./lib/server/parse-websocket') - -inherits(Server, EventEmitter) +const common = require('./lib/common') +const Swarm = require('./lib/server/swarm') +const parseHttpRequest = require('./lib/server/parse-http') +const parseUdpRequest = require('./lib/server/parse-udp') +const parseWebSocketRequest = require('./lib/server/parse-websocket') /** * BitTorrent tracker server. @@ -36,745 +31,729 @@ inherits(Server, EventEmitter) * @param {boolean} opts.stats enable web-based statistics? (default: true) * @param {function} opts.filter black/whitelist fn for disallowing/allowing torrents */ -function Server (opts) { - var self = this - if (!(self instanceof Server)) return new Server(opts) - EventEmitter.call(self) - if (!opts) opts = {} +class Server extends EventEmitter { + constructor (opts = {}) { + super() + const self = this - debug('new server %s', JSON.stringify(opts)) + debug('new server %s', JSON.stringify(opts)) - self.intervalMs = opts.interval - ? opts.interval - : 10 * 60 * 1000 // 10 min + self.intervalMs = opts.interval + ? opts.interval + : 10 * 60 * 1000 // 10 min - self._trustProxy = !!opts.trustProxy - if (typeof opts.filter === 'function') self._filter = opts.filter + self._trustProxy = !!opts.trustProxy + if (typeof opts.filter === 'function') self._filter = opts.filter - self.peersCacheLength = opts.peersCacheLength - self.peersCacheTtl = opts.peersCacheTtl + self.peersCacheLength = opts.peersCacheLength + self.peersCacheTtl = opts.peersCacheTtl - self._listenCalled = false - self.listening = false - self.destroyed = false - self.torrents = {} + self._listenCalled = false + self.listening = false + self.destroyed = false + self.torrents = {} - self.http = null - self.udp4 = null - self.udp6 = null - self.ws = null + self.http = null + self.udp4 = null + self.udp6 = null + self.ws = null - // start an http tracker unless the user explictly says no - if (opts.http !== false) { - self.http = http.createServer() - self.http.on('error', function (err) { self._onError(err) }) - self.http.on('listening', onListening) - - // Add default http request handler on next tick to give user the chance to add - // their own handler first. Handle requests untouched by user's handler. - process.nextTick(function () { - self.http.on('request', function (req, res) { - if (res.headersSent) return - self.onHttpRequest(req, res) - }) - }) - } - - // start a udp tracker unless the user explicitly says no - if (opts.udp !== false) { - var isNode10 = /^v0.10./.test(process.version) - - self.udp4 = self.udp = dgram.createSocket( - isNode10 ? 'udp4' : { type: 'udp4', reuseAddr: true } - ) - self.udp4.on('message', function (msg, rinfo) { self.onUdpRequest(msg, rinfo) }) - self.udp4.on('error', function (err) { self._onError(err) }) - self.udp4.on('listening', onListening) - - self.udp6 = dgram.createSocket( - isNode10 ? 'udp6' : { type: 'udp6', reuseAddr: true } - ) - self.udp6.on('message', function (msg, rinfo) { self.onUdpRequest(msg, rinfo) }) - self.udp6.on('error', function (err) { self._onError(err) }) - self.udp6.on('listening', onListening) - } - - // start a websocket tracker (for WebTorrent) unless the user explicitly says no - if (opts.ws !== false) { - if (!self.http) { + // start an http tracker unless the user explictly says no + if (opts.http !== false) { self.http = http.createServer() - self.http.on('error', function (err) { self._onError(err) }) + self.http.on('error', err => { self._onError(err) }) self.http.on('listening', onListening) // Add default http request handler on next tick to give user the chance to add // their own handler first. Handle requests untouched by user's handler. - process.nextTick(function () { - self.http.on('request', function (req, res) { + process.nextTick(() => { + self.http.on('request', (req, res) => { if (res.headersSent) return - // For websocket trackers, we only need to handle the UPGRADE http method. - // Return 404 for all other request types. - res.statusCode = 404 - res.end('404 Not Found') + self.onHttpRequest(req, res) }) }) } - self.ws = new WebSocketServer({ - server: self.http, - perMessageDeflate: false, - clientTracking: false - }) - self.ws.address = function () { - return self.http.address() - } - self.ws.on('error', function (err) { self._onError(err) }) - self.ws.on('connection', function (socket, req) { - // Note: socket.upgradeReq was removed in ws@3.0.0, so re-add it. - // https://github.com/websockets/ws/pull/1099 - socket.upgradeReq = req - self.onWebSocketConnection(socket) - }) - } - if (opts.stats !== false) { - if (!self.http) { - self.http = http.createServer() - self.http.on('error', function (err) { self._onError(err) }) - self.http.on('listening', onListening) + // start a udp tracker unless the user explicitly says no + if (opts.udp !== false) { + const isNode10 = /^v0.10./.test(process.version) + + self.udp4 = self.udp = dgram.createSocket( + isNode10 ? 'udp4' : { type: 'udp4', reuseAddr: true } + ) + self.udp4.on('message', (msg, rinfo) => { self.onUdpRequest(msg, rinfo) }) + self.udp4.on('error', err => { self._onError(err) }) + self.udp4.on('listening', onListening) + + self.udp6 = dgram.createSocket( + isNode10 ? 'udp6' : { type: 'udp6', reuseAddr: true } + ) + self.udp6.on('message', (msg, rinfo) => { self.onUdpRequest(msg, rinfo) }) + self.udp6.on('error', err => { self._onError(err) }) + self.udp6.on('listening', onListening) } - // Http handler for '/stats' route - self.http.on('request', function (req, res) { - if (res.headersSent) return + // start a websocket tracker (for WebTorrent) unless the user explicitly says no + if (opts.ws !== false) { + if (!self.http) { + self.http = http.createServer() + self.http.on('error', err => { self._onError(err) }) + self.http.on('listening', onListening) - var infoHashes = Object.keys(self.torrents) - var activeTorrents = 0 - var allPeers = {} - - function countPeers (filterFunction) { - var count = 0 - var key - - for (key in allPeers) { - if (allPeers.hasOwnProperty(key) && filterFunction(allPeers[key])) { - count++ - } - } - - return count - } - - function groupByClient () { - var clients = {} - for (var key in allPeers) { - if (allPeers.hasOwnProperty(key)) { - var peer = allPeers[key] - - if (!clients[peer.client.client]) { - clients[peer.client.client] = {} - } - var client = clients[peer.client.client] - // If the client is not known show 8 chars from peerId as version - var version = peer.client.version || Buffer.from(peer.peerId, 'hex').toString().substring(0, 8) - if (!client[version]) { - client[version] = 0 - } - client[version]++ - } - } - return clients - } - - function printClients (clients) { - var html = '' - return html - } - - if (req.method === 'GET' && (req.url === '/stats' || req.url === '/stats.json')) { - infoHashes.forEach(function (infoHash) { - var peers = self.torrents[infoHash].peers - var keys = peers.keys - if (keys.length > 0) activeTorrents++ - - keys.forEach(function (peerId) { - // Don't mark the peer as most recently used for stats - var peer = peers.peek(peerId) - if (peer == null) return // peers.peek() can evict the peer - - if (!allPeers.hasOwnProperty(peerId)) { - allPeers[peerId] = { - ipv4: false, - ipv6: false, - seeder: false, - leecher: false - } - } - - if (peer.ip.indexOf(':') >= 0) { - allPeers[peerId].ipv6 = true - } else { - allPeers[peerId].ipv4 = true - } - - if (peer.complete) { - allPeers[peerId].seeder = true - } else { - allPeers[peerId].leecher = true - } - - allPeers[peerId].peerId = peer.peerId - allPeers[peerId].client = peerid(peer.peerId) + // Add default http request handler on next tick to give user the chance to add + // their own handler first. Handle requests untouched by user's handler. + process.nextTick(() => { + self.http.on('request', (req, res) => { + if (res.headersSent) return + // For websocket trackers, we only need to handle the UPGRADE http method. + // Return 404 for all other request types. + res.statusCode = 404 + res.end('404 Not Found') }) }) + } + self.ws = new WebSocketServer({ + server: self.http, + perMessageDeflate: false, + clientTracking: false + }) + self.ws.address = () => { + return self.http.address() + } + self.ws.on('error', err => { self._onError(err) }) + self.ws.on('connection', (socket, req) => { + // Note: socket.upgradeReq was removed in ws@3.0.0, so re-add it. + // https://github.com/websockets/ws/pull/1099 + socket.upgradeReq = req + self.onWebSocketConnection(socket) + }) + } - var isSeederOnly = function (peer) { return peer.seeder && peer.leecher === false } - var isLeecherOnly = function (peer) { return peer.leecher && peer.seeder === false } - var isSeederAndLeecher = function (peer) { return peer.seeder && peer.leecher } - var isIPv4 = function (peer) { return peer.ipv4 } - var isIPv6 = function (peer) { return peer.ipv6 } + if (opts.stats !== false) { + if (!self.http) { + self.http = http.createServer() + self.http.on('error', err => { self._onError(err) }) + self.http.on('listening', onListening) + } - var stats = { - torrents: infoHashes.length, - activeTorrents: activeTorrents, - peersAll: Object.keys(allPeers).length, - peersSeederOnly: countPeers(isSeederOnly), - peersLeecherOnly: countPeers(isLeecherOnly), - peersSeederAndLeecher: countPeers(isSeederAndLeecher), - peersIPv4: countPeers(isIPv4), - peersIPv6: countPeers(isIPv6), - clients: groupByClient() + // Http handler for '/stats' route + self.http.on('request', (req, res) => { + if (res.headersSent) return + + const infoHashes = Object.keys(self.torrents) + let activeTorrents = 0 + const allPeers = {} + + function countPeers (filterFunction) { + let count = 0 + let key + + for (key in allPeers) { + if (allPeers.hasOwnProperty(key) && filterFunction(allPeers[key])) { + count++ + } + } + + return count } - if (req.url === '/stats.json' || req.headers['accept'] === 'application/json') { - res.write(JSON.stringify(stats)) - res.end() - } else if (req.url === '/stats') { - res.end('

' + stats.torrents + ' torrents (' + stats.activeTorrents + ' active)

\n' + - '

Connected Peers: ' + stats.peersAll + '

\n' + - '

Peers Seeding Only: ' + stats.peersSeederOnly + '

\n' + - '

Peers Leeching Only: ' + stats.peersLeecherOnly + '

\n' + - '

Peers Seeding & Leeching: ' + stats.peersSeederAndLeecher + '

\n' + - '

IPv4 Peers: ' + stats.peersIPv4 + '

\n' + - '

IPv6 Peers: ' + stats.peersIPv6 + '

\n' + - '

Clients:

\n' + - printClients(stats.clients) - ) + function groupByClient () { + const clients = {} + for (const key in allPeers) { + if (allPeers.hasOwnProperty(key)) { + const peer = allPeers[key] + + if (!clients[peer.client.client]) { + clients[peer.client.client] = {} + } + const client = clients[peer.client.client] + // If the client is not known show 8 chars from peerId as version + const version = peer.client.version || Buffer.from(peer.peerId, 'hex').toString().substring(0, 8) + if (!client[version]) { + client[version] = 0 + } + client[version]++ + } + } + return clients } + + function printClients (clients) { + let html = '' + return html + } + + if (req.method === 'GET' && (req.url === '/stats' || req.url === '/stats.json')) { + infoHashes.forEach(infoHash => { + const peers = self.torrents[infoHash].peers + const keys = peers.keys + if (keys.length > 0) activeTorrents++ + + keys.forEach(peerId => { + // Don't mark the peer as most recently used for stats + const peer = peers.peek(peerId) + if (peer == null) return // peers.peek() can evict the peer + + if (!allPeers.hasOwnProperty(peerId)) { + allPeers[peerId] = { + ipv4: false, + ipv6: false, + seeder: false, + leecher: false + } + } + + if (peer.ip.includes(':')) { + allPeers[peerId].ipv6 = true + } else { + allPeers[peerId].ipv4 = true + } + + if (peer.complete) { + allPeers[peerId].seeder = true + } else { + allPeers[peerId].leecher = true + } + + allPeers[peerId].peerId = peer.peerId + allPeers[peerId].client = peerid(peer.peerId) + }) + }) + + const isSeederOnly = peer => { return peer.seeder && peer.leecher === false } + const isLeecherOnly = peer => { return peer.leecher && peer.seeder === false } + const isSeederAndLeecher = peer => { return peer.seeder && peer.leecher } + const isIPv4 = peer => { return peer.ipv4 } + const isIPv6 = peer => { return peer.ipv6 } + + const stats = { + torrents: infoHashes.length, + activeTorrents, + peersAll: Object.keys(allPeers).length, + peersSeederOnly: countPeers(isSeederOnly), + peersLeecherOnly: countPeers(isLeecherOnly), + peersSeederAndLeecher: countPeers(isSeederAndLeecher), + peersIPv4: countPeers(isIPv4), + peersIPv6: countPeers(isIPv6), + clients: groupByClient() + } + + if (req.url === '/stats.json' || req.headers['accept'] === 'application/json') { + res.write(JSON.stringify(stats)) + res.end() + } else if (req.url === '/stats') { + res.end(`

${stats.torrents} torrents (${stats.activeTorrents} active)

\n

Connected Peers: ${stats.peersAll}

\n

Peers Seeding Only: ${stats.peersSeederOnly}

\n

Peers Leeching Only: ${stats.peersLeecherOnly}

\n

Peers Seeding & Leeching: ${stats.peersSeederAndLeecher}

\n

IPv4 Peers: ${stats.peersIPv4}

\n

IPv6 Peers: ${stats.peersIPv6}

\n

Clients:

\n${printClients(stats.clients)}` + ) + } + } + }) + } + + let num = !!self.http + !!self.udp4 + !!self.udp6 + function onListening () { + num -= 1 + if (num === 0) { + self.listening = true + debug('listening') + self.emit('listening') + } + } + } + + _onError (err) { + const self = this + self.emit('error', err) + } + + listen (...args) /* port, hostname, onlistening */{ + const self = this + + if (self._listenCalled || self.listening) throw new Error('server already listening') + self._listenCalled = true + + const lastArg = args[args.length - 1] + if (typeof lastArg === 'function') self.once('listening', lastArg) + + const port = toNumber(args[0]) || args[0] || 0 + const hostname = typeof args[1] !== 'function' ? args[1] : undefined + + debug('listen (port: %o hostname: %o)', port, hostname) + + function isObject (obj) { + return typeof obj === 'object' && obj !== null + } + + const httpPort = isObject(port) ? (port.http || 0) : port + const udpPort = isObject(port) ? (port.udp || 0) : port + + // binding to :: only receives IPv4 connections if the bindv6only sysctl is set 0, + // which is the default on many operating systems + const httpHostname = isObject(hostname) ? hostname.http : hostname + const udp4Hostname = isObject(hostname) ? hostname.udp : hostname + const udp6Hostname = isObject(hostname) ? hostname.udp6 : hostname + + if (self.http) self.http.listen(httpPort, httpHostname) + if (self.udp4) self.udp4.bind(udpPort, udp4Hostname) + if (self.udp6) self.udp6.bind(udpPort, udp6Hostname) + } + + close (cb = noop) { + debug('close') + + this.listening = false + this.destroyed = true + + if (this.udp4) { + try { + this.udp4.close() + } catch (err) {} + } + + if (this.udp6) { + try { + this.udp6.close() + } catch (err) {} + } + + if (this.ws) { + try { + this.ws.close() + } catch (err) {} + } + + if (this.http) this.http.close(cb) + else cb(null) + } + + createSwarm (infoHash, cb) { + if (Buffer.isBuffer(infoHash)) infoHash = infoHash.toString('hex') + + process.nextTick(() => { + const swarm = this.torrents[infoHash] = new Server.Swarm(infoHash, this) + cb(null, swarm) + }) + } + + getSwarm (infoHash, cb) { + if (Buffer.isBuffer(infoHash)) infoHash = infoHash.toString('hex') + + process.nextTick(() => { + cb(null, this.torrents[infoHash]) + }) + } + + onHttpRequest (req, res, opts = {}) { + opts.trustProxy = opts.trustProxy || this._trustProxy + + let params + try { + params = parseHttpRequest(req, opts) + params.httpReq = req + params.httpRes = res + } catch (err) { + res.end(bencode.encode({ + 'failure reason': err.message + })) + + // even though it's an error for the client, it's just a warning for the server. + // don't crash the server because a client sent bad data :) + this.emit('warning', err) + return + } + + this._onRequest(params, (err, response) => { + if (err) { + this.emit('warning', err) + response = { + 'failure reason': err.message + } + } + if (this.destroyed) return res.end() + + delete response.action // only needed for UDP encoding + res.end(bencode.encode(response)) + + if (params.action === common.ACTIONS.ANNOUNCE) { + this.emit(common.EVENT_NAMES[params.event], params.addr, params) } }) } - var num = !!self.http + !!self.udp4 + !!self.udp6 - function onListening () { - num -= 1 - if (num === 0) { - self.listening = true - debug('listening') - self.emit('listening') - } - } -} - -Server.Swarm = Swarm - -Server.prototype._onError = function (err) { - var self = this - self.emit('error', err) -} - -Server.prototype.listen = function (/* port, hostname, onlistening */) { - var self = this - - if (self._listenCalled || self.listening) throw new Error('server already listening') - self._listenCalled = true - - var lastArg = arguments[arguments.length - 1] - if (typeof lastArg === 'function') self.once('listening', lastArg) - - var port = toNumber(arguments[0]) || arguments[0] || 0 - var hostname = typeof arguments[1] !== 'function' ? arguments[1] : undefined - - debug('listen (port: %o hostname: %o)', port, hostname) - - function isObject (obj) { - return typeof obj === 'object' && obj !== null - } - - var httpPort = isObject(port) ? (port.http || 0) : port - var udpPort = isObject(port) ? (port.udp || 0) : port - - // binding to :: only receives IPv4 connections if the bindv6only sysctl is set 0, - // which is the default on many operating systems - var httpHostname = isObject(hostname) ? hostname.http : hostname - var udp4Hostname = isObject(hostname) ? hostname.udp : hostname - var udp6Hostname = isObject(hostname) ? hostname.udp6 : hostname - - if (self.http) self.http.listen(httpPort, httpHostname) - if (self.udp4) self.udp4.bind(udpPort, udp4Hostname) - if (self.udp6) self.udp6.bind(udpPort, udp6Hostname) -} - -Server.prototype.close = function (cb) { - var self = this - if (!cb) cb = noop - debug('close') - - self.listening = false - self.destroyed = true - - if (self.udp4) { + onUdpRequest (msg, rinfo) { + let params try { - self.udp4.close() - } catch (err) {} - } - - if (self.udp6) { - try { - self.udp6.close() - } catch (err) {} - } - - if (self.ws) { - try { - self.ws.close() - } catch (err) {} - } - - if (self.http) self.http.close(cb) - else cb(null) -} - -Server.prototype.createSwarm = function (infoHash, cb) { - var self = this - if (Buffer.isBuffer(infoHash)) infoHash = infoHash.toString('hex') - - process.nextTick(function () { - var swarm = self.torrents[infoHash] = new Server.Swarm(infoHash, self) - cb(null, swarm) - }) -} - -Server.prototype.getSwarm = function (infoHash, cb) { - var self = this - if (Buffer.isBuffer(infoHash)) infoHash = infoHash.toString('hex') - - process.nextTick(function () { - cb(null, self.torrents[infoHash]) - }) -} - -Server.prototype.onHttpRequest = function (req, res, opts) { - var self = this - if (!opts) opts = {} - opts.trustProxy = opts.trustProxy || self._trustProxy - - var params - try { - params = parseHttpRequest(req, opts) - params.httpReq = req - params.httpRes = res - } catch (err) { - res.end(bencode.encode({ - 'failure reason': err.message - })) - - // even though it's an error for the client, it's just a warning for the server. - // don't crash the server because a client sent bad data :) - self.emit('warning', err) - return - } - - self._onRequest(params, function (err, response) { - if (err) { - self.emit('warning', err) - response = { - 'failure reason': err.message - } - } - if (self.destroyed) return res.end() - - delete response.action // only needed for UDP encoding - res.end(bencode.encode(response)) - - if (params.action === common.ACTIONS.ANNOUNCE) { - self.emit(common.EVENT_NAMES[params.event], params.addr, params) - } - }) -} - -Server.prototype.onUdpRequest = function (msg, rinfo) { - var self = this - - var params - try { - params = parseUdpRequest(msg, rinfo) - } catch (err) { - self.emit('warning', err) - // Do not reply for parsing errors - return - } - - self._onRequest(params, function (err, response) { - if (err) { - self.emit('warning', err) - response = { - action: common.ACTIONS.ERROR, - 'failure reason': err.message - } - } - if (self.destroyed) return - - response.transactionId = params.transactionId - response.connectionId = params.connectionId - - var buf = makeUdpPacket(response) - - try { - var udp = (rinfo.family === 'IPv4') ? self.udp4 : self.udp6 - udp.send(buf, 0, buf.length, rinfo.port, rinfo.address) + params = parseUdpRequest(msg, rinfo) } catch (err) { - self.emit('warning', err) + this.emit('warning', err) + // Do not reply for parsing errors + return } - if (params.action === common.ACTIONS.ANNOUNCE) { - self.emit(common.EVENT_NAMES[params.event], params.addr, params) + this._onRequest(params, (err, response) => { + if (err) { + this.emit('warning', err) + response = { + action: common.ACTIONS.ERROR, + 'failure reason': err.message + } + } + if (this.destroyed) return + + response.transactionId = params.transactionId + response.connectionId = params.connectionId + + const buf = makeUdpPacket(response) + + try { + const udp = (rinfo.family === 'IPv4') ? this.udp4 : this.udp6 + udp.send(buf, 0, buf.length, rinfo.port, rinfo.address) + } catch (err) { + this.emit('warning', err) + } + + if (params.action === common.ACTIONS.ANNOUNCE) { + this.emit(common.EVENT_NAMES[params.event], params.addr, params) + } + }) + } + + onWebSocketConnection (socket, opts) { + const self = this + if (!opts) opts = {} + opts.trustProxy = opts.trustProxy || self._trustProxy + + socket.peerId = null // as hex + socket.infoHashes = [] // swarms that this socket is participating in + socket.onSend = err => { + self._onWebSocketSend(socket, err) } - }) -} -Server.prototype.onWebSocketConnection = function (socket, opts) { - var self = this - if (!opts) opts = {} - opts.trustProxy = opts.trustProxy || self._trustProxy + socket.onMessageBound = params => { + self._onWebSocketRequest(socket, opts, params) + } + socket.on('message', socket.onMessageBound) - socket.peerId = null // as hex - socket.infoHashes = [] // swarms that this socket is participating in - socket.onSend = function (err) { - self._onWebSocketSend(socket, err) + socket.onErrorBound = err => { + self._onWebSocketError(socket, err) + } + socket.on('error', socket.onErrorBound) + + socket.onCloseBound = () => { + self._onWebSocketClose(socket) + } + socket.on('close', socket.onCloseBound) } - socket.onMessageBound = function (params) { - self._onWebSocketRequest(socket, opts, params) - } - socket.on('message', socket.onMessageBound) + _onWebSocketRequest (socket, opts, params) { + const self = this - socket.onErrorBound = function (err) { - self._onWebSocketError(socket, err) - } - socket.on('error', socket.onErrorBound) - - socket.onCloseBound = function () { - self._onWebSocketClose(socket) - } - socket.on('close', socket.onCloseBound) -} - -Server.prototype._onWebSocketRequest = function (socket, opts, params) { - var self = this - - try { - params = parseWebSocketRequest(socket, opts, params) - } catch (err) { - socket.send(JSON.stringify({ - 'failure reason': err.message - }), socket.onSend) - - // even though it's an error for the client, it's just a warning for the server. - // don't crash the server because a client sent bad data :) - self.emit('warning', err) - return - } - - if (!socket.peerId) socket.peerId = params.peer_id // as hex - - self._onRequest(params, function (err, response) { - if (self.destroyed || socket.destroyed) return - if (err) { + try { + params = parseWebSocketRequest(socket, opts, params) + } catch (err) { socket.send(JSON.stringify({ - action: params.action === common.ACTIONS.ANNOUNCE ? 'announce' : 'scrape', - 'failure reason': err.message, - info_hash: common.hexToBinary(params.info_hash) + 'failure reason': err.message }), socket.onSend) + // even though it's an error for the client, it's just a warning for the server. + // don't crash the server because a client sent bad data :) self.emit('warning', err) return } - response.action = params.action === common.ACTIONS.ANNOUNCE ? 'announce' : 'scrape' + if (!socket.peerId) socket.peerId = params.peer_id // as hex - var peers - if (response.action === 'announce') { - peers = response.peers - delete response.peers + self._onRequest(params, (err, response) => { + if (self.destroyed || socket.destroyed) return + if (err) { + socket.send(JSON.stringify({ + action: params.action === common.ACTIONS.ANNOUNCE ? 'announce' : 'scrape', + 'failure reason': err.message, + info_hash: common.hexToBinary(params.info_hash) + }), socket.onSend) - if (socket.infoHashes.indexOf(params.info_hash) === -1) { - socket.infoHashes.push(params.info_hash) + self.emit('warning', err) + return } - response.info_hash = common.hexToBinary(params.info_hash) + response.action = params.action === common.ACTIONS.ANNOUNCE ? 'announce' : 'scrape' - // WebSocket tracker should have a shorter interval – default: 2 minutes - response.interval = Math.ceil(self.intervalMs / 1000 / 5) - } + let peers + if (response.action === 'announce') { + peers = response.peers + delete response.peers - // Skip sending update back for 'answer' announce messages – not needed - if (!params.answer) { - socket.send(JSON.stringify(response), socket.onSend) - debug('sent response %s to %s', JSON.stringify(response), params.peer_id) - } - - if (Array.isArray(params.offers)) { - debug('got %s offers from %s', params.offers.length, params.peer_id) - debug('got %s peers from swarm %s', peers.length, params.info_hash) - peers.forEach(function (peer, i) { - peer.socket.send(JSON.stringify({ - action: 'announce', - offer: params.offers[i].offer, - offer_id: params.offers[i].offer_id, - peer_id: common.hexToBinary(params.peer_id), - info_hash: common.hexToBinary(params.info_hash) - }), peer.socket.onSend) - debug('sent offer to %s from %s', peer.peerId, params.peer_id) - }) - } - - if (params.answer) { - debug('got answer %s from %s', JSON.stringify(params.answer), params.peer_id) - - self.getSwarm(params.info_hash, function (err, swarm) { - if (self.destroyed) return - if (err) return self.emit('warning', err) - if (!swarm) { - return self.emit('warning', new Error('no swarm with that `info_hash`')) - } - // Mark the destination peer as recently used in cache - var toPeer = swarm.peers.get(params.to_peer_id) - if (!toPeer) { - return self.emit('warning', new Error('no peer with that `to_peer_id`')) + if (!socket.infoHashes.includes(params.info_hash)) { + socket.infoHashes.push(params.info_hash) } - toPeer.socket.send(JSON.stringify({ - action: 'announce', - answer: params.answer, - offer_id: params.offer_id, - peer_id: common.hexToBinary(params.peer_id), - info_hash: common.hexToBinary(params.info_hash) - }), toPeer.socket.onSend) - debug('sent answer to %s from %s', toPeer.peerId, params.peer_id) + response.info_hash = common.hexToBinary(params.info_hash) + // WebSocket tracker should have a shorter interval – default: 2 minutes + response.interval = Math.ceil(self.intervalMs / 1000 / 5) + } + + // Skip sending update back for 'answer' announce messages – not needed + if (!params.answer) { + socket.send(JSON.stringify(response), socket.onSend) + debug('sent response %s to %s', JSON.stringify(response), params.peer_id) + } + + if (Array.isArray(params.offers)) { + debug('got %s offers from %s', params.offers.length, params.peer_id) + debug('got %s peers from swarm %s', peers.length, params.info_hash) + peers.forEach((peer, i) => { + peer.socket.send(JSON.stringify({ + action: 'announce', + offer: params.offers[i].offer, + offer_id: params.offers[i].offer_id, + peer_id: common.hexToBinary(params.peer_id), + info_hash: common.hexToBinary(params.info_hash) + }), peer.socket.onSend) + debug('sent offer to %s from %s', peer.peerId, params.peer_id) + }) + } + + if (params.answer) { + debug('got answer %s from %s', JSON.stringify(params.answer), params.peer_id) + + self.getSwarm(params.info_hash, (err, swarm) => { + if (self.destroyed) return + if (err) return self.emit('warning', err) + if (!swarm) { + return self.emit('warning', new Error('no swarm with that `info_hash`')) + } + // Mark the destination peer as recently used in cache + const toPeer = swarm.peers.get(params.to_peer_id) + if (!toPeer) { + return self.emit('warning', new Error('no peer with that `to_peer_id`')) + } + + toPeer.socket.send(JSON.stringify({ + action: 'announce', + answer: params.answer, + offer_id: params.offer_id, + peer_id: common.hexToBinary(params.peer_id), + info_hash: common.hexToBinary(params.info_hash) + }), toPeer.socket.onSend) + debug('sent answer to %s from %s', toPeer.peerId, params.peer_id) + + done() + }) + } else { done() - }) - } else { - done() - } - - function done () { - // emit event once the announce is fully "processed" - if (params.action === common.ACTIONS.ANNOUNCE) { - self.emit(common.EVENT_NAMES[params.event], params.peer_id, params) } - } - }) -} -Server.prototype._onWebSocketSend = function (socket, err) { - var self = this - if (err) self._onWebSocketError(socket, err) -} - -Server.prototype._onWebSocketClose = function (socket) { - var self = this - debug('websocket close %s', socket.peerId) - socket.destroyed = true - - if (socket.peerId) { - socket.infoHashes.slice(0).forEach(function (infoHash) { - var swarm = self.torrents[infoHash] - if (swarm) { - swarm.announce({ - type: 'ws', - event: 'stopped', - numwant: 0, - peer_id: socket.peerId - }, noop) + function done () { + // emit event once the announce is fully "processed" + if (params.action === common.ACTIONS.ANNOUNCE) { + self.emit(common.EVENT_NAMES[params.event], params.peer_id, params) + } } }) } - // ignore all future errors - socket.onSend = noop - socket.on('error', noop) - - socket.peerId = null - socket.infoHashes = null - - if (typeof socket.onMessageBound === 'function') { - socket.removeListener('message', socket.onMessageBound) + _onWebSocketSend (socket, err) { + const self = this + if (err) self._onWebSocketError(socket, err) } - socket.onMessageBound = null - if (typeof socket.onErrorBound === 'function') { - socket.removeListener('error', socket.onErrorBound) + _onWebSocketClose (socket) { + const self = this + debug('websocket close %s', socket.peerId) + socket.destroyed = true + + if (socket.peerId) { + socket.infoHashes.slice(0).forEach(infoHash => { + const swarm = self.torrents[infoHash] + if (swarm) { + swarm.announce({ + type: 'ws', + event: 'stopped', + numwant: 0, + peer_id: socket.peerId + }, noop) + } + }) + } + + // ignore all future errors + socket.onSend = noop + socket.on('error', noop) + + socket.peerId = null + socket.infoHashes = null + + if (typeof socket.onMessageBound === 'function') { + socket.removeListener('message', socket.onMessageBound) + } + socket.onMessageBound = null + + if (typeof socket.onErrorBound === 'function') { + socket.removeListener('error', socket.onErrorBound) + } + socket.onErrorBound = null + + if (typeof socket.onCloseBound === 'function') { + socket.removeListener('close', socket.onCloseBound) + } + socket.onCloseBound = null } - socket.onErrorBound = null - if (typeof socket.onCloseBound === 'function') { - socket.removeListener('close', socket.onCloseBound) + _onWebSocketError (socket, err) { + const self = this + debug('websocket error %s', err.message || err) + self.emit('warning', err) + self._onWebSocketClose(socket) } - socket.onCloseBound = null -} -Server.prototype._onWebSocketError = function (socket, err) { - var self = this - debug('websocket error %s', err.message || err) - self.emit('warning', err) - self._onWebSocketClose(socket) -} - -Server.prototype._onRequest = function (params, cb) { - var self = this - if (params && params.action === common.ACTIONS.CONNECT) { - cb(null, { action: common.ACTIONS.CONNECT }) - } else if (params && params.action === common.ACTIONS.ANNOUNCE) { - self._onAnnounce(params, cb) - } else if (params && params.action === common.ACTIONS.SCRAPE) { - self._onScrape(params, cb) - } else { - cb(new Error('Invalid action')) + _onRequest (params, cb) { + const self = this + if (params && params.action === common.ACTIONS.CONNECT) { + cb(null, { action: common.ACTIONS.CONNECT }) + } else if (params && params.action === common.ACTIONS.ANNOUNCE) { + self._onAnnounce(params, cb) + } else if (params && params.action === common.ACTIONS.SCRAPE) { + self._onScrape(params, cb) + } else { + cb(new Error('Invalid action')) + } } -} -Server.prototype._onAnnounce = function (params, cb) { - var self = this + _onAnnounce (params, cb) { + const self = this - if (self._filter) { - self._filter(params.info_hash, params, function (err) { - // Presence of `err` means that this announce request is disallowed - if (err) return cb(err) + if (self._filter) { + self._filter(params.info_hash, params, err => { + // Presence of `err` means that this announce request is disallowed + if (err) return cb(err) - getOrCreateSwarm(function (err, swarm) { + getOrCreateSwarm((err, swarm) => { + if (err) return cb(err) + announce(swarm) + }) + }) + } else { + getOrCreateSwarm((err, swarm) => { if (err) return cb(err) announce(swarm) }) - }) - } else { - getOrCreateSwarm(function (err, swarm) { - if (err) return cb(err) - announce(swarm) - }) - } + } - // Get existing swarm, or create one if one does not exist - function getOrCreateSwarm (cb) { - self.getSwarm(params.info_hash, function (err, swarm) { - if (err) return cb(err) - if (swarm) return cb(null, swarm) - self.createSwarm(params.info_hash, function (err, swarm) { + // Get existing swarm, or create one if one does not exist + function getOrCreateSwarm (cb) { + self.getSwarm(params.info_hash, (err, swarm) => { if (err) return cb(err) - cb(null, swarm) + if (swarm) return cb(null, swarm) + self.createSwarm(params.info_hash, (err, swarm) => { + if (err) return cb(err) + cb(null, swarm) + }) }) - }) + } + + function announce (swarm) { + if (!params.event || params.event === 'empty') params.event = 'update' + swarm.announce(params, (err, response) => { + if (err) return cb(err) + + if (!response.action) response.action = common.ACTIONS.ANNOUNCE + if (!response.interval) response.interval = Math.ceil(self.intervalMs / 1000) + + if (params.compact === 1) { + const peers = response.peers + + // Find IPv4 peers + response.peers = string2compact(peers.filter(peer => { + return common.IPV4_RE.test(peer.ip) + }).map(peer => { + return `${peer.ip}:${peer.port}` + })) + // Find IPv6 peers + response.peers6 = string2compact(peers.filter(peer => { + return common.IPV6_RE.test(peer.ip) + }).map(peer => { + return `[${peer.ip}]:${peer.port}` + })) + } else if (params.compact === 0) { + // IPv6 peers are not separate for non-compact responses + response.peers = response.peers.map(peer => { + return { + 'peer id': common.hexToBinary(peer.peerId), + ip: peer.ip, + port: peer.port + } + }) + } // else, return full peer objects (used for websocket responses) + + cb(null, response) + }) + } } - function announce (swarm) { - if (!params.event || params.event === 'empty') params.event = 'update' - swarm.announce(params, function (err, response) { - if (err) return cb(err) + _onScrape (params, cb) { + const self = this - if (!response.action) response.action = common.ACTIONS.ANNOUNCE - if (!response.interval) response.interval = Math.ceil(self.intervalMs / 1000) + if (params.info_hash == null) { + // if info_hash param is omitted, stats for all torrents are returned + // TODO: make this configurable! + params.info_hash = Object.keys(self.torrents) + } - if (params.compact === 1) { - var peers = response.peers - - // Find IPv4 peers - response.peers = string2compact(peers.filter(function (peer) { - return common.IPV4_RE.test(peer.ip) - }).map(function (peer) { - return peer.ip + ':' + peer.port - })) - // Find IPv6 peers - response.peers6 = string2compact(peers.filter(function (peer) { - return common.IPV6_RE.test(peer.ip) - }).map(function (peer) { - return '[' + peer.ip + ']:' + peer.port - })) - } else if (params.compact === 0) { - // IPv6 peers are not separate for non-compact responses - response.peers = response.peers.map(function (peer) { - return { - 'peer id': common.hexToBinary(peer.peerId), - ip: peer.ip, - port: peer.port + series(params.info_hash.map(infoHash => { + return cb => { + self.getSwarm(infoHash, (err, swarm) => { + if (err) return cb(err) + if (swarm) { + swarm.scrape(params, (err, scrapeInfo) => { + if (err) return cb(err) + cb(null, { + infoHash, + complete: (scrapeInfo && scrapeInfo.complete) || 0, + incomplete: (scrapeInfo && scrapeInfo.incomplete) || 0 + }) + }) + } else { + cb(null, { infoHash, complete: 0, incomplete: 0 }) } }) - } // else, return full peer objects (used for websocket responses) + } + }), (err, results) => { + if (err) return cb(err) + + const response = { + action: common.ACTIONS.SCRAPE, + files: {}, + flags: { min_request_interval: Math.ceil(self.intervalMs / 1000) } + } + + results.forEach(result => { + response.files[common.hexToBinary(result.infoHash)] = { + complete: result.complete || 0, + incomplete: result.incomplete || 0, + downloaded: result.complete || 0 // TODO: this only provides a lower-bound + } + }) cb(null, response) }) } } -Server.prototype._onScrape = function (params, cb) { - var self = this - - if (params.info_hash == null) { - // if info_hash param is omitted, stats for all torrents are returned - // TODO: make this configurable! - params.info_hash = Object.keys(self.torrents) - } - - series(params.info_hash.map(function (infoHash) { - return function (cb) { - self.getSwarm(infoHash, function (err, swarm) { - if (err) return cb(err) - if (swarm) { - swarm.scrape(params, function (err, scrapeInfo) { - if (err) return cb(err) - cb(null, { - infoHash: infoHash, - complete: (scrapeInfo && scrapeInfo.complete) || 0, - incomplete: (scrapeInfo && scrapeInfo.incomplete) || 0 - }) - }) - } else { - cb(null, { infoHash: infoHash, complete: 0, incomplete: 0 }) - } - }) - } - }), function (err, results) { - if (err) return cb(err) - - var response = { - action: common.ACTIONS.SCRAPE, - files: {}, - flags: { min_request_interval: Math.ceil(self.intervalMs / 1000) } - } - - results.forEach(function (result) { - response.files[common.hexToBinary(result.infoHash)] = { - complete: result.complete || 0, - incomplete: result.incomplete || 0, - downloaded: result.complete || 0 // TODO: this only provides a lower-bound - } - }) - - cb(null, response) - }) -} +Server.Swarm = Swarm function makeUdpPacket (params) { - var packet + let packet switch (params.action) { case common.ACTIONS.CONNECT: packet = Buffer.concat([ @@ -794,12 +773,12 @@ function makeUdpPacket (params) { ]) break case common.ACTIONS.SCRAPE: - var scrapeResponse = [ + const scrapeResponse = [ common.toUInt32(common.ACTIONS.SCRAPE), common.toUInt32(params.transactionId) ] - for (var infoHash in params.files) { - var file = params.files[infoHash] + for (const infoHash in params.files) { + const file = params.files[infoHash] scrapeResponse.push( common.toUInt32(file.complete), common.toUInt32(file.downloaded), // TODO: this only provides a lower-bound @@ -816,7 +795,7 @@ function makeUdpPacket (params) { ]) break default: - throw new Error('Action not implemented: ' + params.action) + throw new Error(`Action not implemented: ${params.action}`) } return packet } @@ -827,3 +806,5 @@ function toNumber (x) { } function noop () {} + +module.exports = Server From ecf91b6f1b90f8f904ca61c76d9b5a7f95889d8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20W=C3=A4rting?= Date: Tue, 2 Oct 2018 18:33:00 +0200 Subject: [PATCH 2/5] removed self --- server.js | 41 +++++++++++++++++------------------------ 1 file changed, 17 insertions(+), 24 deletions(-) diff --git a/server.js b/server.js index b9d7d6b..61e9124 100644 --- a/server.js +++ b/server.js @@ -277,13 +277,11 @@ class Server extends EventEmitter { } listen (...args) /* port, hostname, onlistening */{ - const self = this - - if (self._listenCalled || self.listening) throw new Error('server already listening') - self._listenCalled = true + if (this._listenCalled || this.listening) throw new Error('server already listening') + this._listenCalled = true const lastArg = args[args.length - 1] - if (typeof lastArg === 'function') self.once('listening', lastArg) + if (typeof lastArg === 'function') this.once('listening', lastArg) const port = toNumber(args[0]) || args[0] || 0 const hostname = typeof args[1] !== 'function' ? args[1] : undefined @@ -303,9 +301,9 @@ class Server extends EventEmitter { const udp4Hostname = isObject(hostname) ? hostname.udp : hostname const udp6Hostname = isObject(hostname) ? hostname.udp6 : hostname - if (self.http) self.http.listen(httpPort, httpHostname) - if (self.udp4) self.udp4.bind(udpPort, udp4Hostname) - if (self.udp6) self.udp6.bind(udpPort, udp6Hostname) + if (this.http) this.http.listen(httpPort, httpHostname) + if (this.udp4) this.udp4.bind(udpPort, udp4Hostname) + if (this.udp6) this.udp6.bind(udpPort, udp6Hostname) } close (cb = noop) { @@ -429,28 +427,27 @@ class Server extends EventEmitter { } onWebSocketConnection (socket, opts) { - const self = this if (!opts) opts = {} - opts.trustProxy = opts.trustProxy || self._trustProxy + opts.trustProxy = opts.trustProxy || this._trustProxy socket.peerId = null // as hex socket.infoHashes = [] // swarms that this socket is participating in socket.onSend = err => { - self._onWebSocketSend(socket, err) + this._onWebSocketSend(socket, err) } socket.onMessageBound = params => { - self._onWebSocketRequest(socket, opts, params) + this._onWebSocketRequest(socket, opts, params) } socket.on('message', socket.onMessageBound) socket.onErrorBound = err => { - self._onWebSocketError(socket, err) + this._onWebSocketError(socket, err) } socket.on('error', socket.onErrorBound) socket.onCloseBound = () => { - self._onWebSocketClose(socket) + this._onWebSocketClose(socket) } socket.on('close', socket.onCloseBound) } @@ -564,18 +561,16 @@ class Server extends EventEmitter { } _onWebSocketSend (socket, err) { - const self = this - if (err) self._onWebSocketError(socket, err) + if (err) this._onWebSocketError(socket, err) } _onWebSocketClose (socket) { - const self = this debug('websocket close %s', socket.peerId) socket.destroyed = true if (socket.peerId) { socket.infoHashes.slice(0).forEach(infoHash => { - const swarm = self.torrents[infoHash] + const swarm = this.torrents[infoHash] if (swarm) { swarm.announce({ type: 'ws', @@ -611,20 +606,18 @@ class Server extends EventEmitter { } _onWebSocketError (socket, err) { - const self = this debug('websocket error %s', err.message || err) - self.emit('warning', err) - self._onWebSocketClose(socket) + this.emit('warning', err) + this._onWebSocketClose(socket) } _onRequest (params, cb) { - const self = this if (params && params.action === common.ACTIONS.CONNECT) { cb(null, { action: common.ACTIONS.CONNECT }) } else if (params && params.action === common.ACTIONS.ANNOUNCE) { - self._onAnnounce(params, cb) + this._onAnnounce(params, cb) } else if (params && params.action === common.ACTIONS.SCRAPE) { - self._onScrape(params, cb) + this._onScrape(params, cb) } else { cb(new Error('Invalid action')) } From 502b11fb9ed59333f3b56957c8d4dbcf97882358 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20W=C3=A4rting?= Date: Tue, 2 Oct 2018 18:33:30 +0200 Subject: [PATCH 3/5] EventEmitter is circular --- server.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server.js b/server.js index 61e9124..2703e11 100644 --- a/server.js +++ b/server.js @@ -2,7 +2,7 @@ const Buffer = require('safe-buffer').Buffer const bencode = require('bencode') const debug = require('debug')('bittorrent-tracker:server') const dgram = require('dgram') -const EventEmitter = require('events').EventEmitter +const EventEmitter = require('events') const http = require('http') const peerid = require('bittorrent-peerid') const series = require('run-series') From 7df1bdfa3fde6fa90add79e9aba0d537a538ff13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20W=C3=A4rting?= Date: Tue, 2 Oct 2018 18:37:46 +0200 Subject: [PATCH 4/5] few es6 sugar --- server.js | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/server.js b/server.js index 2703e11..123583c 100644 --- a/server.js +++ b/server.js @@ -1,4 +1,4 @@ -const Buffer = require('safe-buffer').Buffer +const { Buffer } = require('safe-buffer') const bencode = require('bencode') const debug = require('debug')('bittorrent-tracker:server') const dgram = require('dgram') @@ -272,8 +272,7 @@ class Server extends EventEmitter { } _onError (err) { - const self = this - self.emit('error', err) + this.emit('error', err) } listen (...args) /* port, hostname, onlistening */{ @@ -426,8 +425,7 @@ class Server extends EventEmitter { }) } - onWebSocketConnection (socket, opts) { - if (!opts) opts = {} + onWebSocketConnection (socket, opts = {}) { opts.trustProxy = opts.trustProxy || this._trustProxy socket.peerId = null // as hex From 1e8487038ff9ac9fb08f95820d0b3a44fb936500 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20W=C3=A4rting?= Date: Wed, 3 Oct 2018 11:49:45 +0200 Subject: [PATCH 5/5] removed some more self variables --- server.js | 156 ++++++++++++++++++++++++++++-------------------------- 1 file changed, 80 insertions(+), 76 deletions(-) diff --git a/server.js b/server.js index 123583c..9d29bef 100644 --- a/server.js +++ b/server.js @@ -34,42 +34,40 @@ const parseWebSocketRequest = require('./lib/server/parse-websocket') class Server extends EventEmitter { constructor (opts = {}) { super() - const self = this - debug('new server %s', JSON.stringify(opts)) - self.intervalMs = opts.interval + this.intervalMs = opts.interval ? opts.interval : 10 * 60 * 1000 // 10 min - self._trustProxy = !!opts.trustProxy - if (typeof opts.filter === 'function') self._filter = opts.filter + this._trustProxy = !!opts.trustProxy + if (typeof opts.filter === 'function') this._filter = opts.filter - self.peersCacheLength = opts.peersCacheLength - self.peersCacheTtl = opts.peersCacheTtl + this.peersCacheLength = opts.peersCacheLength + this.peersCacheTtl = opts.peersCacheTtl - self._listenCalled = false - self.listening = false - self.destroyed = false - self.torrents = {} + this._listenCalled = false + this.listening = false + this.destroyed = false + this.torrents = {} - self.http = null - self.udp4 = null - self.udp6 = null - self.ws = null + this.http = null + this.udp4 = null + this.udp6 = null + this.ws = null // start an http tracker unless the user explictly says no if (opts.http !== false) { - self.http = http.createServer() - self.http.on('error', err => { self._onError(err) }) - self.http.on('listening', onListening) + this.http = http.createServer() + this.http.on('error', err => { this._onError(err) }) + this.http.on('listening', onListening) // Add default http request handler on next tick to give user the chance to add // their own handler first. Handle requests untouched by user's handler. process.nextTick(() => { - self.http.on('request', (req, res) => { + this.http.on('request', (req, res) => { if (res.headersSent) return - self.onHttpRequest(req, res) + this.onHttpRequest(req, res) }) }) } @@ -78,32 +76,32 @@ class Server extends EventEmitter { if (opts.udp !== false) { const isNode10 = /^v0.10./.test(process.version) - self.udp4 = self.udp = dgram.createSocket( + this.udp4 = this.udp = dgram.createSocket( isNode10 ? 'udp4' : { type: 'udp4', reuseAddr: true } ) - self.udp4.on('message', (msg, rinfo) => { self.onUdpRequest(msg, rinfo) }) - self.udp4.on('error', err => { self._onError(err) }) - self.udp4.on('listening', onListening) + this.udp4.on('message', (msg, rinfo) => { this.onUdpRequest(msg, rinfo) }) + this.udp4.on('error', err => { this._onError(err) }) + this.udp4.on('listening', onListening) - self.udp6 = dgram.createSocket( + this.udp6 = dgram.createSocket( isNode10 ? 'udp6' : { type: 'udp6', reuseAddr: true } ) - self.udp6.on('message', (msg, rinfo) => { self.onUdpRequest(msg, rinfo) }) - self.udp6.on('error', err => { self._onError(err) }) - self.udp6.on('listening', onListening) + this.udp6.on('message', (msg, rinfo) => { this.onUdpRequest(msg, rinfo) }) + this.udp6.on('error', err => { this._onError(err) }) + this.udp6.on('listening', onListening) } // start a websocket tracker (for WebTorrent) unless the user explicitly says no if (opts.ws !== false) { - if (!self.http) { - self.http = http.createServer() - self.http.on('error', err => { self._onError(err) }) - self.http.on('listening', onListening) + if (!this.http) { + this.http = http.createServer() + this.http.on('error', err => { this._onError(err) }) + this.http.on('listening', onListening) // Add default http request handler on next tick to give user the chance to add // their own handler first. Handle requests untouched by user's handler. process.nextTick(() => { - self.http.on('request', (req, res) => { + this.http.on('request', (req, res) => { if (res.headersSent) return // For websocket trackers, we only need to handle the UPGRADE http method. // Return 404 for all other request types. @@ -112,35 +110,35 @@ class Server extends EventEmitter { }) }) } - self.ws = new WebSocketServer({ - server: self.http, + this.ws = new WebSocketServer({ + server: this.http, perMessageDeflate: false, clientTracking: false }) - self.ws.address = () => { - return self.http.address() + this.ws.address = () => { + return this.http.address() } - self.ws.on('error', err => { self._onError(err) }) - self.ws.on('connection', (socket, req) => { + this.ws.on('error', err => { this._onError(err) }) + this.ws.on('connection', (socket, req) => { // Note: socket.upgradeReq was removed in ws@3.0.0, so re-add it. // https://github.com/websockets/ws/pull/1099 socket.upgradeReq = req - self.onWebSocketConnection(socket) + this.onWebSocketConnection(socket) }) } if (opts.stats !== false) { - if (!self.http) { - self.http = http.createServer() - self.http.on('error', err => { self._onError(err) }) - self.http.on('listening', onListening) + if (!this.http) { + this.http = http.createServer() + this.http.on('error', err => { this._onError(err) }) + this.http.on('listening', onListening) } // Http handler for '/stats' route - self.http.on('request', (req, res) => { + this.http.on('request', (req, res) => { if (res.headersSent) return - const infoHashes = Object.keys(self.torrents) + const infoHashes = Object.keys(this.torrents) let activeTorrents = 0 const allPeers = {} @@ -196,7 +194,7 @@ class Server extends EventEmitter { if (req.method === 'GET' && (req.url === '/stats' || req.url === '/stats.json')) { infoHashes.forEach(infoHash => { - const peers = self.torrents[infoHash].peers + const peers = this.torrents[infoHash].peers const keys = peers.keys if (keys.length > 0) activeTorrents++ @@ -253,14 +251,24 @@ class Server extends EventEmitter { res.write(JSON.stringify(stats)) res.end() } else if (req.url === '/stats') { - res.end(`

${stats.torrents} torrents (${stats.activeTorrents} active)

\n

Connected Peers: ${stats.peersAll}

\n

Peers Seeding Only: ${stats.peersSeederOnly}

\n

Peers Leeching Only: ${stats.peersLeecherOnly}

\n

Peers Seeding & Leeching: ${stats.peersSeederAndLeecher}

\n

IPv4 Peers: ${stats.peersIPv4}

\n

IPv6 Peers: ${stats.peersIPv6}

\n

Clients:

\n${printClients(stats.clients)}` - ) + res.end(` +

${stats.torrents} torrents (${stats.activeTorrents} active)

+

Connected Peers: ${stats.peersAll}

+

Peers Seeding Only: ${stats.peersSeederOnly}

+

Peers Leeching Only: ${stats.peersLeecherOnly}

+

Peers Seeding & Leeching: ${stats.peersSeederAndLeecher}

+

IPv4 Peers: ${stats.peersIPv4}

+

IPv6 Peers: ${stats.peersIPv6}

+

Clients:

+ ${printClients(stats.clients)} + `.replace(/^\s+/gm, '')) // trim left } } }) } - let num = !!self.http + !!self.udp4 + !!self.udp6 + let num = !!this.http + !!this.udp4 + !!this.udp6 + const self = this function onListening () { num -= 1 if (num === 0) { @@ -451,8 +459,6 @@ class Server extends EventEmitter { } _onWebSocketRequest (socket, opts, params) { - const self = this - try { params = parseWebSocketRequest(socket, opts, params) } catch (err) { @@ -462,14 +468,14 @@ class Server extends EventEmitter { // even though it's an error for the client, it's just a warning for the server. // don't crash the server because a client sent bad data :) - self.emit('warning', err) + this.emit('warning', err) return } if (!socket.peerId) socket.peerId = params.peer_id // as hex - self._onRequest(params, (err, response) => { - if (self.destroyed || socket.destroyed) return + this._onRequest(params, (err, response) => { + if (this.destroyed || socket.destroyed) return if (err) { socket.send(JSON.stringify({ action: params.action === common.ACTIONS.ANNOUNCE ? 'announce' : 'scrape', @@ -477,7 +483,7 @@ class Server extends EventEmitter { info_hash: common.hexToBinary(params.info_hash) }), socket.onSend) - self.emit('warning', err) + this.emit('warning', err) return } @@ -495,7 +501,7 @@ class Server extends EventEmitter { response.info_hash = common.hexToBinary(params.info_hash) // WebSocket tracker should have a shorter interval – default: 2 minutes - response.interval = Math.ceil(self.intervalMs / 1000 / 5) + response.interval = Math.ceil(this.intervalMs / 1000 / 5) } // Skip sending update back for 'answer' announce messages – not needed @@ -519,19 +525,26 @@ class Server extends EventEmitter { }) } + const done = () => { + // emit event once the announce is fully "processed" + if (params.action === common.ACTIONS.ANNOUNCE) { + this.emit(common.EVENT_NAMES[params.event], params.peer_id, params) + } + } + if (params.answer) { debug('got answer %s from %s', JSON.stringify(params.answer), params.peer_id) - self.getSwarm(params.info_hash, (err, swarm) => { - if (self.destroyed) return - if (err) return self.emit('warning', err) + this.getSwarm(params.info_hash, (err, swarm) => { + if (this.destroyed) return + if (err) return this.emit('warning', err) if (!swarm) { - return self.emit('warning', new Error('no swarm with that `info_hash`')) + return this.emit('warning', new Error('no swarm with that `info_hash`')) } // Mark the destination peer as recently used in cache const toPeer = swarm.peers.get(params.to_peer_id) if (!toPeer) { - return self.emit('warning', new Error('no peer with that `to_peer_id`')) + return this.emit('warning', new Error('no peer with that `to_peer_id`')) } toPeer.socket.send(JSON.stringify({ @@ -548,13 +561,6 @@ class Server extends EventEmitter { } else { done() } - - function done () { - // emit event once the announce is fully "processed" - if (params.action === common.ACTIONS.ANNOUNCE) { - self.emit(common.EVENT_NAMES[params.event], params.peer_id, params) - } - } }) } @@ -624,8 +630,8 @@ class Server extends EventEmitter { _onAnnounce (params, cb) { const self = this - if (self._filter) { - self._filter(params.info_hash, params, err => { + if (this._filter) { + this._filter(params.info_hash, params, err => { // Presence of `err` means that this announce request is disallowed if (err) return cb(err) @@ -693,17 +699,15 @@ class Server extends EventEmitter { } _onScrape (params, cb) { - const self = this - if (params.info_hash == null) { // if info_hash param is omitted, stats for all torrents are returned // TODO: make this configurable! - params.info_hash = Object.keys(self.torrents) + params.info_hash = Object.keys(this.torrents) } series(params.info_hash.map(infoHash => { return cb => { - self.getSwarm(infoHash, (err, swarm) => { + this.getSwarm(infoHash, (err, swarm) => { if (err) return cb(err) if (swarm) { swarm.scrape(params, (err, scrapeInfo) => { @@ -725,7 +729,7 @@ class Server extends EventEmitter { const response = { action: common.ACTIONS.SCRAPE, files: {}, - flags: { min_request_interval: Math.ceil(self.intervalMs / 1000) } + flags: { min_request_interval: Math.ceil(this.intervalMs / 1000) } } results.forEach(result => {