From 037a53accba0b2a7d2f3af2f40be7042859b1e0b Mon Sep 17 00:00:00 2001 From: Astro Date: Mon, 8 Dec 2014 23:42:05 +0100 Subject: [PATCH 01/16] server: split out parseHttpRequest() --- server.js | 155 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 101 insertions(+), 54 deletions(-) diff --git a/server.js b/server.js index 3370b56..26b12e0 100644 --- a/server.js +++ b/server.js @@ -1,4 +1,5 @@ module.exports = Server +module.exports.parseHttpRequest = parseHttpRequest var bencode = require('bencode') var bufferEqual = require('buffer-equal') @@ -138,48 +139,43 @@ Server.prototype._getSwarm = function (binaryInfoHash) { Server.prototype._onHttpRequest = function (req, res) { var self = this - var s = req.url.split('?') - var params = common.querystringParse(s[1]) + + var params + try { + params = parseHttpRequest(req, { + trustProxy: self._trustProxy + }) + } catch (err) { + debug('sent error %s', err.message) + res.end(bencode.encode({ + 'failure reason': err.message + })) + + // even though it's an error for the client, it's just a warning for the server. + // don't crash the server because a client sent bad data :) + self.emit('warning', err) + + return + } + var response - if (s[0] === '/announce') { - var infoHash = typeof params.info_hash === 'string' && params.info_hash - var peerId = typeof params.peer_id === 'string' && common.binaryToUtf8(params.peer_id) - var port = Number(params.port) - - if (!infoHash) return error('invalid info_hash') - if (infoHash.length !== 20) return error('invalid info_hash') - if (!peerId) return error('invalid peer_id') - if (peerId.length !== 20) return error('invalid peer_id') - if (!port) return error('invalid port') - - var left = Number(params.left) - var compact = Number(params.compact) - - var ip = self._trustProxy - ? req.headers['x-forwarded-for'] || req.connection.remoteAddress - : req.connection.remoteAddress.replace(REMOVE_IPV6_RE, '') // force ipv4 - var addr = ip + ':' + port - var swarm = self._getSwarm(infoHash) - var peer = swarm.peers[addr] - - var numWant = Math.min( - Number(params.numwant) || NUM_ANNOUNCE_PEERS, - MAX_ANNOUNCE_PEERS - ) + if (params && params.request === 'announce') { + var swarm = self._getSwarm(params.info_hash) + var peer = swarm.peers[params.addr] var start = function () { if (peer) { debug('unexpected `started` event from peer that is already in swarm') return update() // treat as an update } - if (left === 0) swarm.complete += 1 + if (params.left === 0) swarm.complete += 1 else swarm.incomplete += 1 - peer = swarm.peers[addr] = { - ip: ip, - port: port, - peerId: peerId + peer = swarm.peers[params.addr] = { + ip: params.ip, + port: params.port, + peerId: params.peer_id } - self.emit('start', addr) + self.emit('start', params.addr) } var stop = function () { @@ -189,8 +185,8 @@ Server.prototype._onHttpRequest = function (req, res) { } if (peer.complete) swarm.complete -= 1 else swarm.incomplete -= 1 - swarm.peers[addr] = null - self.emit('stop', addr) + swarm.peers[params.addr] = null + self.emit('stop', params.addr) } var complete = function () { @@ -205,7 +201,7 @@ Server.prototype._onHttpRequest = function (req, res) { swarm.complete += 1 swarm.incomplete -= 1 peer.complete = true - self.emit('complete', addr) + self.emit('complete', params.addr) } var update = function () { @@ -213,7 +209,7 @@ Server.prototype._onHttpRequest = function (req, res) { debug('unexpected `update` event from peer that is not in swarm') return start() // treat as a start } - self.emit('update', addr) + self.emit('update', params.addr) } switch (params.event) { @@ -233,12 +229,12 @@ Server.prototype._onHttpRequest = function (req, res) { return error('invalid event') // early return } - if (left === 0 && peer) peer.complete = true + if (params.left === 0 && peer) peer.complete = true // send peers - var peers = compact === 1 - ? self._getPeersCompact(swarm, numWant) - : self._getPeers(swarm, numWant) + var peers = params.compact === 1 + ? self._getPeersCompact(swarm, params.numwant) + : self._getPeers(swarm, params.numwant) response = { complete: swarm.complete, @@ -250,11 +246,12 @@ Server.prototype._onHttpRequest = function (req, res) { res.end(bencode.encode(response)) debug('sent response %s', response) - } else if (s[0] === '/scrape') { // unofficial scrape message + } else if (params.request === 'scrape') { // unofficial scrape message if (typeof params.info_hash === 'string') { params.info_hash = [ params.info_hash ] } else if (params.info_hash == null) { // if info_hash param is omitted, stats for all torrents are returned + // TODO: make this configurable! params.info_hash = Object.keys(self.torrents) } @@ -268,11 +265,6 @@ Server.prototype._onHttpRequest = function (req, res) { } params.info_hash.some(function (infoHash) { - if (infoHash.length !== 20) { - error('invalid info_hash') - return true // early return - } - var swarm = self._getSwarm(infoHash) response.files[infoHash] = { @@ -338,7 +330,7 @@ Server.prototype._onUdpRequest = function (msg, rinfo) { var event = msg.readUInt32BE(80) var ip = msg.readUInt32BE(84) // optional var key = msg.readUInt32BE(88) // TODO: what is this for? - var numWant = msg.readUInt32BE(92) // optional + var numwant = msg.readUInt32BE(92) // optional var port = msg.readUInt16BE(96) // optional if (ip) { @@ -358,7 +350,7 @@ Server.prototype._onUdpRequest = function (msg, rinfo) { // never send more than MAX_ANNOUNCE_PEERS or else the UDP packet will get bigger than // 512 bytes which is not safe - numWant = Math.min(numWant || NUM_ANNOUNCE_PEERS, MAX_ANNOUNCE_PEERS) + numwant = Math.min(numwant || NUM_ANNOUNCE_PEERS, MAX_ANNOUNCE_PEERS) var start = function () { if (peer) { @@ -429,7 +421,7 @@ Server.prototype._onUdpRequest = function (msg, rinfo) { if (left === 0 && peer) peer.complete = true // send peers - var peers = self._getPeersCompact(swarm, numWant) + var peers = self._getPeersCompact(swarm, numwant) send(Buffer.concat([ common.toUInt32(common.ACTIONS.ANNOUNCE), @@ -479,10 +471,10 @@ Server.prototype._onUdpRequest = function (msg, rinfo) { } } -Server.prototype._getPeers = function (swarm, numWant) { +Server.prototype._getPeers = function (swarm, numwant) { var peers = [] for (var peerId in swarm.peers) { - if (peers.length >= numWant) break + if (peers.length >= numwant) break var peer = swarm.peers[peerId] if (!peer) continue // ignore null values peers.push({ @@ -494,11 +486,11 @@ Server.prototype._getPeers = function (swarm, numWant) { return peers } -Server.prototype._getPeersCompact = function (swarm, numWant) { +Server.prototype._getPeersCompact = function (swarm, numwant) { var peers = [] for (var peerId in swarm.peers) { - if (peers.length >= numWant) break + if (peers.length >= numwant) break var peer = swarm.peers[peerId] if (!peer) continue // ignore null values peers.push(peer.ip + ':' + peer.port) @@ -507,6 +499,61 @@ Server.prototype._getPeersCompact = function (swarm, numWant) { return string2compact(peers) } + +function parseHttpRequest (req, options) { + var s = req.url.split('?') + var params = common.querystringParse(s[1]) + + if (s[0] === '/announce') { + params.request = 'announce' + + params.peer_id = typeof params.peer_id === 'string' && common.binaryToUtf8(params.peer_id) + params.port = Number(params.port) + + if (typeof params.info_hash !== 'string') throw new Error('invalid info_hash') + if (params.info_hash.length !== 20) throw new Error('invalid info_hash length') + if (typeof params.peer_id !== 'string') throw new Error('invalid peer_id') + if (params.peer_id.length !== 20) throw new Error('invalid peer_id length') + if (!params.port) throw new Error('invalid port') + + params.left = Number(params.left) + params.compact = Number(params.compact) + + params.ip = options.trustProxy + ? req.headers['x-forwarded-for'] || req.connection.remoteAddress + : req.connection.remoteAddress.replace(REMOVE_IPV6_RE, '') // force ipv4 + params.addr = params.ip + ':' + params.port // TODO: ipv6 brackets? + + params.numwant = Math.min( + Number(params.numwant) || NUM_ANNOUNCE_PEERS, + MAX_ANNOUNCE_PEERS + ) + + return params + } else if (s[0] === '/scrape') { // unofficial scrape message + params.request = 'scrape' + + if (typeof params.info_hash === 'string') { + params.info_hash = [ params.info_hash ] + } + + if (params.info_hash) { + if (!Array.isArray(params.info_hash)) throw new Error('invalid info_hash') + + params.info_hash.some(function (infoHash) { + if (infoHash.length !== 20) { + throw new Error('invalid info_hash') + } + }) + } + + return params + } else { + return null + } +} + + // HELPER FUNCTIONS var TWO_PWR_32 = (1 << 16) * 2 From 0bc88bcf22a81436faa7ba9277ea319ffcb3e349 Mon Sep 17 00:00:00 2001 From: Astro Date: Tue, 9 Dec 2014 00:22:36 +0100 Subject: [PATCH 02/16] server: split out parseUdpRequest() --- lib/common.js | 6 ++ server.js | 173 +++++++++++++++++++++++++++----------------------- 2 files changed, 101 insertions(+), 78 deletions(-) diff --git a/lib/common.js b/lib/common.js index ccf1bd4..cb83d5b 100644 --- a/lib/common.js +++ b/lib/common.js @@ -7,6 +7,12 @@ var querystring = require('querystring') exports.CONNECTION_ID = Buffer.concat([ toUInt32(0x417), toUInt32(0x27101980) ]) exports.ACTIONS = { CONNECT: 0, ANNOUNCE: 1, SCRAPE: 2, ERROR: 3 } exports.EVENTS = { update: 0, completed: 1, started: 2, stopped: 3 } +exports.EVENT_IDS = { + 0: 'update', + 1: 'completed', + 2: 'started', + 3: 'stopped' +}; function toUInt32 (n) { var buf = new Buffer(4) diff --git a/server.js b/server.js index 26b12e0..ec8790c 100644 --- a/server.js +++ b/server.js @@ -296,75 +296,40 @@ Server.prototype._onHttpRequest = function (req, res) { Server.prototype._onUdpRequest = function (msg, rinfo) { var self = this - if (msg.length < 16) { - return error('received packet is too short') + var params + try { + params = parseUdpRequest(msg, rinfo) + } catch (err) { + console.error(err.stack) + return error(err.message) } - - if (rinfo.family !== 'IPv4') { - return error('udp tracker does not support IPv6') - } - - var connectionId = msg.slice(0, 8) // 64-bit - var action = msg.readUInt32BE(8) - var transactionId = msg.readUInt32BE(12) - - if (!bufferEqual(connectionId, common.CONNECTION_ID)) { - return error('received packet with invalid connection id') - } - + var socket = dgram.createSocket('udp4') - var infoHash, swarm - if (action === common.ACTIONS.CONNECT) { + var swarm + if (params && params.request === 'connect') { send(Buffer.concat([ common.toUInt32(common.ACTIONS.CONNECT), - common.toUInt32(transactionId), - connectionId + common.toUInt32(params.transactionId), + params.connectionId ])) - } else if (action === common.ACTIONS.ANNOUNCE) { - infoHash = msg.slice(16, 36).toString('binary') // 20 bytes - var peerId = msg.slice(36, 56).toString('utf8') // 20 bytes - var downloaded = fromUInt64(msg.slice(56, 64)) // TODO: track this? - var left = fromUInt64(msg.slice(64, 72)) - var uploaded = fromUInt64(msg.slice(72, 80)) // TODO: track this? - var event = msg.readUInt32BE(80) - var ip = msg.readUInt32BE(84) // optional - var key = msg.readUInt32BE(88) // TODO: what is this for? - var numwant = msg.readUInt32BE(92) // optional - var port = msg.readUInt16BE(96) // optional - - if (ip) { - ip = ipLib.toString(ip) - } else { - ip = rinfo.address - } - - if (!port) { - port = rinfo.port - } - - var addr = ip + ':' + port - - swarm = self._getSwarm(infoHash) - var peer = swarm.peers[addr] - - // never send more than MAX_ANNOUNCE_PEERS or else the UDP packet will get bigger than - // 512 bytes which is not safe - numwant = Math.min(numwant || NUM_ANNOUNCE_PEERS, MAX_ANNOUNCE_PEERS) + } else if (params && params.request === 'announce') { + swarm = self._getSwarm(params.info_hash) + var peer = swarm.peers[params.addr] var start = function () { if (peer) { debug('unexpected `started` event from peer that is already in swarm') return update() // treat as an update } - if (left === 0) swarm.complete += 1 + if (params.left === 0) swarm.complete += 1 else swarm.incomplete += 1 - peer = swarm.peers[addr] = { - ip: ip, - port: port, - peerId: peerId + peer = swarm.peers[params.addr] = { + ip: params.ip, + port: params.port, + peerId: params.peer_id } - self.emit('start', addr) + self.emit('start', params.addr) } var stop = function () { @@ -374,8 +339,8 @@ Server.prototype._onUdpRequest = function (msg, rinfo) { } if (peer.complete) swarm.complete -= 1 else swarm.incomplete -= 1 - swarm.peers[addr] = null - self.emit('stop', addr) + swarm.peers[params.addr] = null + self.emit('stop', params.addr) } var complete = function () { @@ -390,7 +355,7 @@ Server.prototype._onUdpRequest = function (msg, rinfo) { swarm.complete += 1 swarm.incomplete -= 1 peer.complete = true - self.emit('complete', addr) + self.emit('complete', params.addr) } var update = function () { @@ -398,53 +363,46 @@ Server.prototype._onUdpRequest = function (msg, rinfo) { debug('unexpected `update` event from peer that is not in swarm') return start() // treat as a start } - self.emit('update', addr) + self.emit('update', params.addr) } - switch (event) { - case common.EVENTS.started: + switch (params.event) { + case 'started': start() break - case common.EVENTS.stopped: + case 'stopped': stop() break - case common.EVENTS.completed: + case 'completed': complete() break - case common.EVENTS.update: // update + case 'update': update() break default: return error('invalid event') // early return } - if (left === 0 && peer) peer.complete = true + if (params.left === 0 && peer) peer.complete = true // send peers - var peers = self._getPeersCompact(swarm, numwant) + var peers = self._getPeersCompact(swarm, params.numwant) send(Buffer.concat([ common.toUInt32(common.ACTIONS.ANNOUNCE), - common.toUInt32(transactionId), + common.toUInt32(params.transactionId), common.toUInt32(self._intervalMs), common.toUInt32(swarm.incomplete), common.toUInt32(swarm.complete), peers ])) - } else if (action === common.ACTIONS.SCRAPE) { // scrape message - infoHash = msg.slice(16, 36).toString('binary') // 20 bytes - - // TODO: support multiple info_hash scrape - if (msg.length > 36) { - error('multiple info_hash scrape not supported') - } - - swarm = self._getSwarm(infoHash) + } else if (params && params.request === 'scrape') { // scrape message + swarm = self._getSwarm(params.info_hash) send(Buffer.concat([ common.toUInt32(common.ACTIONS.SCRAPE), - common.toUInt32(transactionId), + common.toUInt32(params.transactionId), common.toUInt32(swarm.complete), common.toUInt32(swarm.complete), // TODO: this only provides a lower-bound common.toUInt32(swarm.incomplete) @@ -464,7 +422,7 @@ Server.prototype._onUdpRequest = function (msg, rinfo) { debug('sent error %s', message) send(Buffer.concat([ common.toUInt32(common.ACTIONS.ERROR), - common.toUInt32(transactionId || 0), + common.toUInt32(params.transactionId || 0), new Buffer(message, 'utf8') ])) self.emit('warning', new Error(message)) @@ -553,6 +511,65 @@ function parseHttpRequest (req, options) { } } +function parseUdpRequest (msg, rinfo) { + if (msg.length < 16) { + throw new Error('received packet is too short') + } + + if (rinfo.family !== 'IPv4') { + throw new Error('udp tracker does not support IPv6') + } + + var params = { + connectionId: msg.slice(0, 8), // 64-bit + action: msg.readUInt32BE(8), + transactionId: msg.readUInt32BE(12) + } + + // TODO: randomize: + if (!bufferEqual(params.connectionId, common.CONNECTION_ID)) { + throw new Error('received packet with invalid connection id') + } + + if (params.action === common.ACTIONS.CONNECT) { + params.request = 'connect' + } else if (params.action === common.ACTIONS.ANNOUNCE) { + params.request = 'announce' + params.info_hash = msg.slice(16, 36).toString('binary') // 20 bytes + params.peer_id = msg.slice(36, 56).toString('utf8') // 20 bytes + params.downloaded = fromUInt64(msg.slice(56, 64)) // TODO: track this? + params.left = fromUInt64(msg.slice(64, 72)) + params.uploaded = fromUInt64(msg.slice(72, 80)) // TODO: track this? + params.event = msg.readUInt32BE(80) + params.event = common.EVENT_IDS[params.event] + if (!params.event) throw new Error('invalid event') // early return + params.ip = msg.readUInt32BE(84) // optional + params.ip = params.ip ? + ipLib.toString(params.ip) : + params.ip = rinfo.address + params.key = msg.readUInt32BE(88) // TODO: what is this for? + params.numwant = msg.readUInt32BE(92) // optional + // never send more than MAX_ANNOUNCE_PEERS or else the UDP packet will get bigger than + // 512 bytes which is not safe + params.numwant = Math.min(params.numwant || NUM_ANNOUNCE_PEERS, MAX_ANNOUNCE_PEERS) + params.port = msg.readUInt16BE(96) || rinfo.port // optional + params.addr = params.ip + ':' + params.port // TODO: ipv6 brackets + + } else if (params.action === common.ACTIONS.SCRAPE) { // scrape message + params.request = 'scrape' + params.info_hash = msg.slice(16, 36).toString('binary') // 20 bytes + + // TODO: support multiple info_hash scrape + if (msg.length > 36) { + throw new Error('multiple info_hash scrape not supported') + } + } else { + return null + } + + return params +} + // HELPER FUNCTIONS From 85be917d5177f871190c9a77cae6c41864d89008 Mon Sep 17 00:00:00 2001 From: Astro Date: Tue, 9 Dec 2014 00:25:29 +0100 Subject: [PATCH 03/16] server: replace request string by action number --- server.js | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/server.js b/server.js index ec8790c..dc041ba 100644 --- a/server.js +++ b/server.js @@ -159,7 +159,7 @@ Server.prototype._onHttpRequest = function (req, res) { } var response - if (params && params.request === 'announce') { + if (params && params.action === common.ACTIONS.ANNOUNCE) { var swarm = self._getSwarm(params.info_hash) var peer = swarm.peers[params.addr] @@ -246,7 +246,7 @@ Server.prototype._onHttpRequest = function (req, res) { res.end(bencode.encode(response)) debug('sent response %s', response) - } else if (params.request === 'scrape') { // unofficial scrape message + } else if (params.action === common.ACTIONS.SCRAPE) { // unofficial scrape message if (typeof params.info_hash === 'string') { params.info_hash = [ params.info_hash ] } else if (params.info_hash == null) { @@ -307,13 +307,13 @@ Server.prototype._onUdpRequest = function (msg, rinfo) { var socket = dgram.createSocket('udp4') var swarm - if (params && params.request === 'connect') { + if (params && params.action === common.ACTIONS.CONNECT) { send(Buffer.concat([ common.toUInt32(common.ACTIONS.CONNECT), common.toUInt32(params.transactionId), params.connectionId ])) - } else if (params && params.request === 'announce') { + } else if (params && params.action === common.ACTIONS.ANNOUNCE) { swarm = self._getSwarm(params.info_hash) var peer = swarm.peers[params.addr] @@ -397,7 +397,7 @@ Server.prototype._onUdpRequest = function (msg, rinfo) { peers ])) - } else if (params && params.request === 'scrape') { // scrape message + } else if (params && params.action === common.ACTIONS.SCRAPE) { // scrape message swarm = self._getSwarm(params.info_hash) send(Buffer.concat([ @@ -463,7 +463,7 @@ function parseHttpRequest (req, options) { var params = common.querystringParse(s[1]) if (s[0] === '/announce') { - params.request = 'announce' + params.action = common.ACTIONS.ANNOUNCE params.peer_id = typeof params.peer_id === 'string' && common.binaryToUtf8(params.peer_id) params.port = Number(params.port) @@ -489,7 +489,7 @@ function parseHttpRequest (req, options) { return params } else if (s[0] === '/scrape') { // unofficial scrape message - params.request = 'scrape' + params.action = common.ACTIONS.SCRAPE if (typeof params.info_hash === 'string') { params.info_hash = [ params.info_hash ] @@ -532,9 +532,8 @@ function parseUdpRequest (msg, rinfo) { } if (params.action === common.ACTIONS.CONNECT) { - params.request = 'connect' + // No further params } else if (params.action === common.ACTIONS.ANNOUNCE) { - params.request = 'announce' params.info_hash = msg.slice(16, 36).toString('binary') // 20 bytes params.peer_id = msg.slice(36, 56).toString('utf8') // 20 bytes params.downloaded = fromUInt64(msg.slice(56, 64)) // TODO: track this? @@ -556,7 +555,6 @@ function parseUdpRequest (msg, rinfo) { params.addr = params.ip + ':' + params.port // TODO: ipv6 brackets } else if (params.action === common.ACTIONS.SCRAPE) { // scrape message - params.request = 'scrape' params.info_hash = msg.slice(16, 36).toString('binary') // 20 bytes // TODO: support multiple info_hash scrape From 366b49bf068b17c4adbb72efa696a991650d4d39 Mon Sep 17 00:00:00 2001 From: Astro Date: Tue, 9 Dec 2014 02:35:05 +0100 Subject: [PATCH 04/16] server: split out _onRequest() et al --- server.js | 452 +++++++++++++++++++++++++----------------------------- 1 file changed, 206 insertions(+), 246 deletions(-) diff --git a/server.js b/server.js index dc041ba..10f8bcb 100644 --- a/server.js +++ b/server.js @@ -157,140 +157,18 @@ Server.prototype._onHttpRequest = function (req, res) { return } - - var response - if (params && params.action === common.ACTIONS.ANNOUNCE) { - var swarm = self._getSwarm(params.info_hash) - var peer = swarm.peers[params.addr] - var start = function () { - if (peer) { - debug('unexpected `started` event from peer that is already in swarm') - return update() // treat as an update + this._onRequest(params, function (err, response) { + if (err) { + self.emit('warning', new Error(err.message)) + response = { + action: common.ACTIONS.ERRROR, + 'failure reason': err.message } - if (params.left === 0) swarm.complete += 1 - else swarm.incomplete += 1 - peer = swarm.peers[params.addr] = { - ip: params.ip, - port: params.port, - peerId: params.peer_id - } - self.emit('start', params.addr) - } - - var stop = function () { - if (!peer) { - debug('unexpected `stopped` event from peer that is not in swarm') - return // do nothing - } - if (peer.complete) swarm.complete -= 1 - else swarm.incomplete -= 1 - swarm.peers[params.addr] = null - self.emit('stop', params.addr) - } - - var complete = function () { - if (!peer) { - debug('unexpected `completed` event from peer that is not in swarm') - return start() // treat as a start - } - if (peer.complete) { - debug('unexpected `completed` event from peer that is already marked as completed') - return // do nothing - } - swarm.complete += 1 - swarm.incomplete -= 1 - peer.complete = true - self.emit('complete', params.addr) - } - - var update = function () { - if (!peer) { - debug('unexpected `update` event from peer that is not in swarm') - return start() // treat as a start - } - self.emit('update', params.addr) - } - - switch (params.event) { - case 'started': - start() - break - case 'stopped': - stop() - break - case 'completed': - complete() - break - case '': case undefined: // update - update() - break - default: - return error('invalid event') // early return - } - - if (params.left === 0 && peer) peer.complete = true - - // send peers - var peers = params.compact === 1 - ? self._getPeersCompact(swarm, params.numwant) - : self._getPeers(swarm, params.numwant) - - response = { - complete: swarm.complete, - incomplete: swarm.incomplete, - peers: peers, - interval: self._intervalMs } res.end(bencode.encode(response)) - debug('sent response %s', response) - - } else if (params.action === common.ACTIONS.SCRAPE) { // unofficial scrape message - if (typeof params.info_hash === 'string') { - params.info_hash = [ params.info_hash ] - } else if (params.info_hash == null) { - // if info_hash param is omitted, stats for all torrents are returned - // TODO: make this configurable! - params.info_hash = Object.keys(self.torrents) - } - - if (!Array.isArray(params.info_hash)) return error('invalid info_hash') - - response = { - files: {}, - flags: { - min_request_interval: self._intervalMs - } - } - - params.info_hash.some(function (infoHash) { - var swarm = self._getSwarm(infoHash) - - response.files[infoHash] = { - complete: swarm.complete, - incomplete: swarm.incomplete, - downloaded: swarm.complete // TODO: this only provides a lower-bound - } - }) - - res.end(bencode.encode(response)) - debug('sent response %s', response) - - } else { - error('only /announce and /scrape are valid endpoints') - } - - function error (message) { - debug('sent error %s', message) - res.end(bencode.encode({ - 'failure reason': message - })) - - // even though it's an error for the client, it's just a warning for the server. - // don't crash the server because a client sent bad data :) - self.emit('warning', new Error(message)) - } + }) } Server.prototype._onUdpRequest = function (msg, rinfo) { @@ -300,133 +178,173 @@ Server.prototype._onUdpRequest = function (msg, rinfo) { try { params = parseUdpRequest(msg, rinfo) } catch (err) { - console.error(err.stack) - return error(err.message) - } - - var socket = dgram.createSocket('udp4') - - var swarm - if (params && params.action === common.ACTIONS.CONNECT) { - send(Buffer.concat([ - common.toUInt32(common.ACTIONS.CONNECT), - common.toUInt32(params.transactionId), - params.connectionId - ])) - } else if (params && params.action === common.ACTIONS.ANNOUNCE) { - swarm = self._getSwarm(params.info_hash) - var peer = swarm.peers[params.addr] - - var start = function () { - if (peer) { - debug('unexpected `started` event from peer that is already in swarm') - return update() // treat as an update - } - if (params.left === 0) swarm.complete += 1 - else swarm.incomplete += 1 - peer = swarm.peers[params.addr] = { - ip: params.ip, - port: params.port, - peerId: params.peer_id - } - self.emit('start', params.addr) - } - - var stop = function () { - if (!peer) { - debug('unexpected `stopped` event from peer that is not in swarm') - return // do nothing - } - if (peer.complete) swarm.complete -= 1 - else swarm.incomplete -= 1 - swarm.peers[params.addr] = null - self.emit('stop', params.addr) - } - - var complete = function () { - if (!peer) { - debug('unexpected `completed` event from peer that is not in swarm') - return start() // treat as a start - } - if (peer.complete) { - debug('unexpected `completed` event from peer that is already marked as completed') - return // do nothing - } - swarm.complete += 1 - swarm.incomplete -= 1 - peer.complete = true - self.emit('complete', params.addr) - } - - var update = function () { - if (!peer) { - debug('unexpected `update` event from peer that is not in swarm') - return start() // treat as a start - } - self.emit('update', params.addr) - } - - switch (params.event) { - case 'started': - start() - break - case 'stopped': - stop() - break - case 'completed': - complete() - break - case 'update': - update() - break - default: - return error('invalid event') // early return - } - - if (params.left === 0 && peer) peer.complete = true - - // send peers - var peers = self._getPeersCompact(swarm, params.numwant) - - send(Buffer.concat([ - common.toUInt32(common.ACTIONS.ANNOUNCE), - common.toUInt32(params.transactionId), - common.toUInt32(self._intervalMs), - common.toUInt32(swarm.incomplete), - common.toUInt32(swarm.complete), - peers - ])) - - } else if (params && params.action === common.ACTIONS.SCRAPE) { // scrape message - swarm = self._getSwarm(params.info_hash) - - send(Buffer.concat([ - common.toUInt32(common.ACTIONS.SCRAPE), - common.toUInt32(params.transactionId), - common.toUInt32(swarm.complete), - common.toUInt32(swarm.complete), // TODO: this only provides a lower-bound - common.toUInt32(swarm.incomplete) - ])) + self.emit('warning', err) + // Do not reply for parsing errors + return } - function send (buf) { - debug('sent response %s', buf.toString('hex')) + // Do nothing with invalid request + if (!params) return + + // Handle + this._onRequest(params, function (err, response) { + if (err) { + self.emit('warning', new Error(err.message)) + response = { + action: common.ACTIONS.ERRROR, + 'failure reason': err.message + } + } + + var socket = dgram.createSocket('udp4') + response.transactionId = params.transactionId + response.connectionId = params.connectionId + var buf = makeUdpPacket(response) socket.send(buf, 0, buf.length, rinfo.port, rinfo.address, function () { try { socket.close() } catch (err) {} }) + }) +} + +Server.prototype._onRequest = function (params, cb) { + var response + if (params && params.action === common.ACTIONS.CONNECT) { + cb(null, { action: common.ACTIONS.CONNECT }) + } else if (params && params.action === common.ACTIONS.ANNOUNCE) { + this._onAnnounce(params, cb) + } else if (params && params.action === common.ACTIONS.SCRAPE) { + this._onScrape(params, cb) + } else { + cb(new Error('Invalid action')) + } +} + +Server.prototype._onAnnounce = function (params, cb) { + var self = this + + var swarm = self._getSwarm(params.info_hash) + var peer = swarm.peers[params.addr] + + var start = function () { + if (peer) { + debug('unexpected `started` event from peer that is already in swarm') + return update() // treat as an update + } + if (params.left === 0) swarm.complete += 1 + else swarm.incomplete += 1 + peer = swarm.peers[params.addr] = { + ip: params.ip, + port: params.port, + peerId: params.peer_id + } + self.emit('start', params.addr) } - function error (message) { - debug('sent error %s', message) - send(Buffer.concat([ - common.toUInt32(common.ACTIONS.ERROR), - common.toUInt32(params.transactionId || 0), - new Buffer(message, 'utf8') - ])) - self.emit('warning', new Error(message)) + var stop = function () { + if (!peer) { + debug('unexpected `stopped` event from peer that is not in swarm') + return // do nothing + } + if (peer.complete) swarm.complete -= 1 + else swarm.incomplete -= 1 + swarm.peers[params.addr] = null + self.emit('stop', params.addr) } + + var complete = function () { + if (!peer) { + debug('unexpected `completed` event from peer that is not in swarm') + return start() // treat as a start + } + if (peer.complete) { + debug('unexpected `completed` event from peer that is already marked as completed') + return // do nothing + } + swarm.complete += 1 + swarm.incomplete -= 1 + peer.complete = true + self.emit('complete', params.addr) + } + + var update = function () { + if (!peer) { + debug('unexpected `update` event from peer that is not in swarm') + return start() // treat as a start + } + self.emit('update', params.addr) + } + + switch (params.event) { + case 'started': + start() + break + case 'stopped': + stop() + break + case 'completed': + complete() + break + case '': case undefined: case 'empty': case 'update': // update + update() + break + default: + return cb(new Error('invalid event')) // early return + } + + if (params.left === 0 && peer) peer.complete = true + + // send peers + var peers = params.compact === 1 + ? self._getPeersCompact(swarm, params.numwant) + : self._getPeers(swarm, params.numwant) + + cb(null, { + action: common.ACTIONS.ANNOUNCE, + complete: swarm.complete, + incomplete: swarm.incomplete, + peers: peers, + intervalMs: self._intervalMs + }) +} + +Server.prototype._onScrape = function (params, cb) { + var self = this + + if (typeof params.info_hash === 'string') { + params.info_hash = [ params.info_hash ] + } else if (params.info_hash == null) { + // if info_hash param is omitted, stats for all torrents are returned + // TODO: make this configurable! + params.info_hash = Object.keys(self.torrents) + } + + if (!Array.isArray(params.info_hash)) { + var err = new Error('invalid info_hash') + self.emit('warning', err) + return cb(err) + } + + var response = { + action: common.ACTIONS.SCRAPE, + files: {}, + flags: { + min_request_interval: self._intervalMs + } + } + + params.info_hash.some(function (infoHash) { + var swarm = self._getSwarm(infoHash) + + response.files[infoHash] = { + complete: swarm.complete, + incomplete: swarm.incomplete, + downloaded: swarm.complete // TODO: this only provides a lower-bound + } + }) + + cb(null, response) } Server.prototype._getPeers = function (swarm, numwant) { @@ -553,6 +471,7 @@ function parseUdpRequest (msg, rinfo) { params.numwant = Math.min(params.numwant || NUM_ANNOUNCE_PEERS, MAX_ANNOUNCE_PEERS) params.port = msg.readUInt16BE(96) || rinfo.port // optional params.addr = params.ip + ':' + params.port // TODO: ipv6 brackets + params.compact = 1 // udp is always compact } else if (params.action === common.ACTIONS.SCRAPE) { // scrape message params.info_hash = msg.slice(16, 36).toString('binary') // 20 bytes @@ -568,6 +487,47 @@ function parseUdpRequest (msg, rinfo) { return params } +function makeUdpPacket (params) { + switch (params.action) { + case common.ACTIONS.CONNECT: + return Buffer.concat([ + common.toUInt32(common.ACTIONS.CONNECT), + common.toUInt32(params.transactionId), + params.connectionId + ]) + case common.ACTIONS.ANNOUNCE: + return Buffer.concat([ + common.toUInt32(common.ACTIONS.ANNOUNCE), + common.toUInt32(params.transactionId), + common.toUInt32(params.intervalMs), + common.toUInt32(params.incomplete), + common.toUInt32(params.complete), + params.peers + ]) + case common.ACTIONS.SCRAPE: + var firstInfoHash = Object.keys(params.files)[0] + var scrapeInfo = firstInfoHash ? { + complete: params.files[firstInfoHash].complete, + incomplete: params.files[firstInfoHash].incomplete, + completed: params.files[firstInfoHash].complete // TODO: this only provides a lower-bound + } : {} + return Buffer.concat([ + common.toUInt32(common.ACTIONS.SCRAPE), + common.toUInt32(params.transactionId), + common.toUInt32(scrapeInfo.complete), + common.toUInt32(scrapeInfo.completed), + common.toUInt32(scrapeInfo.incomplete) + ]) + case common.ACTIONS.ERROR: + return Buffer.concat([ + common.toUInt32(common.ACTIONS.ERROR), + common.toUInt32(params.transactionId || 0), + new Buffer(params.message, 'utf8') + ]) + default: + throw new Error('Action not implemented: ' + params.action) + } +} // HELPER FUNCTIONS From fee289b0740aa8070a8b3fd7024ac35c85189666 Mon Sep 17 00:00:00 2001 From: Astro Date: Tue, 9 Dec 2014 23:18:20 +0100 Subject: [PATCH 05/16] server: rm superfluous action fields --- server.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server.js b/server.js index 10f8bcb..7e18dfc 100644 --- a/server.js +++ b/server.js @@ -162,11 +162,11 @@ Server.prototype._onHttpRequest = function (req, res) { if (err) { self.emit('warning', new Error(err.message)) response = { - action: common.ACTIONS.ERRROR, 'failure reason': err.message } } + delete response.action // only needed for UDP encoding res.end(bencode.encode(response)) }) } From aea3c44c0841b6df64e74bd1fcd5695a32d5076f Mon Sep 17 00:00:00 2001 From: Astro Date: Tue, 9 Dec 2014 23:18:47 +0100 Subject: [PATCH 06/16] server: expose getSwarm() drops capability to pass a *hex* infoHash --- server.js | 21 ++++++++------------- test/server.js | 2 +- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/server.js b/server.js index 7e18dfc..93df5af 100644 --- a/server.js +++ b/server.js @@ -116,16 +116,9 @@ Server.prototype.close = function (cb) { } } -Server.prototype.getSwarm = function (infoHash) { - var self = this - var binaryInfoHash = Buffer.isBuffer(infoHash) - ? infoHash.toString('binary') - : new Buffer(infoHash, 'hex').toString('binary') - return self._getSwarm(binaryInfoHash) -} - -Server.prototype._getSwarm = function (binaryInfoHash) { +Server.prototype.getSwarm = function (binaryInfoHash) { var self = this + if (Buffer.isBuffer(binaryInfoHash)) binaryInfoHash = binaryInfoHash.toString('binary') var swarm = self.torrents[binaryInfoHash] if (!swarm) { swarm = self.torrents[binaryInfoHash] = { @@ -224,7 +217,7 @@ Server.prototype._onRequest = function (params, cb) { Server.prototype._onAnnounce = function (params, cb) { var self = this - var swarm = self._getSwarm(params.info_hash) + var swarm = self.getSwarm(params.info_hash) var peer = swarm.peers[params.addr] var start = function () { @@ -335,7 +328,7 @@ Server.prototype._onScrape = function (params, cb) { } params.info_hash.some(function (infoHash) { - var swarm = self._getSwarm(infoHash) + var swarm = self.getSwarm(infoHash) response.files[infoHash] = { complete: swarm.complete, @@ -414,12 +407,14 @@ function parseHttpRequest (req, options) { } if (params.info_hash) { - if (!Array.isArray(params.info_hash)) throw new Error('invalid info_hash') + if (!Array.isArray(params.info_hash)) throw new Error('invalid info_hash array') - params.info_hash.some(function (infoHash) { + params.info_hash = params.info_hash.map(function (infoHash) { if (infoHash.length !== 20) { throw new Error('invalid info_hash') } + + return infoHash }) } diff --git a/test/server.js b/test/server.js index c27b4f5..f6ddd8d 100644 --- a/test/server.js +++ b/test/server.js @@ -2,7 +2,7 @@ var Client = require('../') var Server = require('../').Server var test = require('tape') -var infoHash = '4cb67059ed6bd08362da625b3ae77f6f4a075705' +var infoHash = new Buffer('4cb67059ed6bd08362da625b3ae77f6f4a075705', 'hex') var peerId = '01234567890123456789' var peerId2 = '12345678901234567890' var torrentLength = 50000 From 1c754a2f49cad23512336c7112372743a6b20c01 Mon Sep 17 00:00:00 2001 From: Astro Date: Wed, 10 Dec 2014 00:44:45 +0100 Subject: [PATCH 07/16] move parseHttpRequest(), parseUdpRequest() to lib/parse_{http,udp}.js --- lib/common.js | 3 + lib/parse_http.js | 60 ++++++++++++++++++++ lib/parse_udp.js | 79 ++++++++++++++++++++++++++ server.js | 137 +--------------------------------------------- 4 files changed, 145 insertions(+), 134 deletions(-) create mode 100644 lib/parse_http.js create mode 100644 lib/parse_udp.js diff --git a/lib/common.js b/lib/common.js index cb83d5b..727e239 100644 --- a/lib/common.js +++ b/lib/common.js @@ -4,6 +4,9 @@ var querystring = require('querystring') +exports.NUM_ANNOUNCE_PEERS = 50 +exports.MAX_ANNOUNCE_PEERS = 82 + exports.CONNECTION_ID = Buffer.concat([ toUInt32(0x417), toUInt32(0x27101980) ]) exports.ACTIONS = { CONNECT: 0, ANNOUNCE: 1, SCRAPE: 2, ERROR: 3 } exports.EVENTS = { update: 0, completed: 1, started: 2, stopped: 3 } diff --git a/lib/parse_http.js b/lib/parse_http.js new file mode 100644 index 0000000..0f7a775 --- /dev/null +++ b/lib/parse_http.js @@ -0,0 +1,60 @@ +var common = require('./common') + +var REMOVE_IPV6_RE = /^::ffff:/ + +module.exports = parseHttpRequest + +function parseHttpRequest (req, options) { + var s = req.url.split('?') + var params = common.querystringParse(s[1]) + + if (s[0] === '/announce') { + params.action = common.ACTIONS.ANNOUNCE + + params.peer_id = typeof params.peer_id === 'string' && common.binaryToUtf8(params.peer_id) + params.port = Number(params.port) + + if (typeof params.info_hash !== 'string') throw new Error('invalid info_hash') + if (params.info_hash.length !== 20) throw new Error('invalid info_hash length') + if (typeof params.peer_id !== 'string') throw new Error('invalid peer_id') + if (params.peer_id.length !== 20) throw new Error('invalid peer_id length') + if (!params.port) throw new Error('invalid port') + + params.left = Number(params.left) + params.compact = Number(params.compact) + + params.ip = options.trustProxy + ? req.headers['x-forwarded-for'] || req.connection.remoteAddress + : req.connection.remoteAddress.replace(REMOVE_IPV6_RE, '') // force ipv4 + params.addr = params.ip + ':' + params.port // TODO: ipv6 brackets? + + params.numwant = Math.min( + Number(params.numwant) || common.NUM_ANNOUNCE_PEERS, + common.MAX_ANNOUNCE_PEERS + ) + + return params + } else if (s[0] === '/scrape') { // unofficial scrape message + params.action = common.ACTIONS.SCRAPE + + if (typeof params.info_hash === 'string') { + params.info_hash = [ params.info_hash ] + } + + if (params.info_hash) { + if (!Array.isArray(params.info_hash)) throw new Error('invalid info_hash array') + + params.info_hash = params.info_hash.map(function (infoHash) { + if (infoHash.length !== 20) { + throw new Error('invalid info_hash') + } + + return infoHash + }) + } + + return params + } else { + return null + } +} diff --git a/lib/parse_udp.js b/lib/parse_udp.js new file mode 100644 index 0000000..411d5b2 --- /dev/null +++ b/lib/parse_udp.js @@ -0,0 +1,79 @@ +var bufferEqual = require('buffer-equal') +var common = require('./common') + + +module.exports = parseUdpRequest + +function parseUdpRequest (msg, rinfo) { + if (msg.length < 16) { + throw new Error('received packet is too short') + } + + if (rinfo.family !== 'IPv4') { + throw new Error('udp tracker does not support IPv6') + } + + var params = { + connectionId: msg.slice(0, 8), // 64-bit + action: msg.readUInt32BE(8), + transactionId: msg.readUInt32BE(12) + } + + // TODO: randomize: + if (!bufferEqual(params.connectionId, common.CONNECTION_ID)) { + throw new Error('received packet with invalid connection id') + } + + if (params.action === common.ACTIONS.CONNECT) { + // No further params + } else if (params.action === common.ACTIONS.ANNOUNCE) { + params.info_hash = msg.slice(16, 36).toString('binary') // 20 bytes + params.peer_id = msg.slice(36, 56).toString('utf8') // 20 bytes + params.downloaded = fromUInt64(msg.slice(56, 64)) // TODO: track this? + params.left = fromUInt64(msg.slice(64, 72)) + params.uploaded = fromUInt64(msg.slice(72, 80)) // TODO: track this? + params.event = msg.readUInt32BE(80) + params.event = common.EVENT_IDS[params.event] + if (!params.event) throw new Error('invalid event') // early return + params.ip = msg.readUInt32BE(84) // optional + params.ip = params.ip ? + ipLib.toString(params.ip) : + params.ip = rinfo.address + params.key = msg.readUInt32BE(88) // TODO: what is this for? + params.numwant = msg.readUInt32BE(92) // optional + // never send more than MAX_ANNOUNCE_PEERS or else the UDP packet will get bigger than + // 512 bytes which is not safe + params.numwant = Math.min(params.numwant || common.NUM_ANNOUNCE_PEERS, common.MAX_ANNOUNCE_PEERS) + params.port = msg.readUInt16BE(96) || rinfo.port // optional + params.addr = params.ip + ':' + params.port // TODO: ipv6 brackets + params.compact = 1 // udp is always compact + + } else if (params.action === common.ACTIONS.SCRAPE) { // scrape message + params.info_hash = msg.slice(16, 36).toString('binary') // 20 bytes + + // TODO: support multiple info_hash scrape + if (msg.length > 36) { + throw new Error('multiple info_hash scrape not supported') + } + } else { + return null + } + + return params +} + +// HELPER FUNCTIONS + +var TWO_PWR_32 = (1 << 16) * 2 + +/** + * Return the closest floating-point representation to the buffer value. Precision will be + * lost for big numbers. + */ +function fromUInt64 (buf) { + var high = buf.readUInt32BE(0) | 0 // force + var low = buf.readUInt32BE(4) | 0 + var lowUnsigned = (low >= 0) ? low : TWO_PWR_32 + low + + return high * TWO_PWR_32 + lowUnsigned +} diff --git a/server.js b/server.js index 93df5af..d84de01 100644 --- a/server.js +++ b/server.js @@ -1,8 +1,6 @@ module.exports = Server -module.exports.parseHttpRequest = parseHttpRequest var bencode = require('bencode') -var bufferEqual = require('buffer-equal') var common = require('./lib/common') var debug = require('debug')('bittorrent-tracker') var dgram = require('dgram') @@ -13,12 +11,12 @@ var ipLib = require('ip') var portfinder = require('portfinder') var string2compact = require('string2compact') +var parseHttpRequest = require('./lib/parse_http') +var parseUdpRequest = require('./lib/parse_udp') + // Use random port above 1024 portfinder.basePort = Math.floor(Math.random() * 60000) + 1025 -var NUM_ANNOUNCE_PEERS = 50 -var MAX_ANNOUNCE_PEERS = 82 -var REMOVE_IPV6_RE = /^::ffff:/ inherits(Server, EventEmitter) @@ -369,119 +367,6 @@ Server.prototype._getPeersCompact = function (swarm, numwant) { } -function parseHttpRequest (req, options) { - var s = req.url.split('?') - var params = common.querystringParse(s[1]) - - if (s[0] === '/announce') { - params.action = common.ACTIONS.ANNOUNCE - - params.peer_id = typeof params.peer_id === 'string' && common.binaryToUtf8(params.peer_id) - params.port = Number(params.port) - - if (typeof params.info_hash !== 'string') throw new Error('invalid info_hash') - if (params.info_hash.length !== 20) throw new Error('invalid info_hash length') - if (typeof params.peer_id !== 'string') throw new Error('invalid peer_id') - if (params.peer_id.length !== 20) throw new Error('invalid peer_id length') - if (!params.port) throw new Error('invalid port') - - params.left = Number(params.left) - params.compact = Number(params.compact) - - params.ip = options.trustProxy - ? req.headers['x-forwarded-for'] || req.connection.remoteAddress - : req.connection.remoteAddress.replace(REMOVE_IPV6_RE, '') // force ipv4 - params.addr = params.ip + ':' + params.port // TODO: ipv6 brackets? - - params.numwant = Math.min( - Number(params.numwant) || NUM_ANNOUNCE_PEERS, - MAX_ANNOUNCE_PEERS - ) - - return params - } else if (s[0] === '/scrape') { // unofficial scrape message - params.action = common.ACTIONS.SCRAPE - - if (typeof params.info_hash === 'string') { - params.info_hash = [ params.info_hash ] - } - - if (params.info_hash) { - if (!Array.isArray(params.info_hash)) throw new Error('invalid info_hash array') - - params.info_hash = params.info_hash.map(function (infoHash) { - if (infoHash.length !== 20) { - throw new Error('invalid info_hash') - } - - return infoHash - }) - } - - return params - } else { - return null - } -} - -function parseUdpRequest (msg, rinfo) { - if (msg.length < 16) { - throw new Error('received packet is too short') - } - - if (rinfo.family !== 'IPv4') { - throw new Error('udp tracker does not support IPv6') - } - - var params = { - connectionId: msg.slice(0, 8), // 64-bit - action: msg.readUInt32BE(8), - transactionId: msg.readUInt32BE(12) - } - - // TODO: randomize: - if (!bufferEqual(params.connectionId, common.CONNECTION_ID)) { - throw new Error('received packet with invalid connection id') - } - - if (params.action === common.ACTIONS.CONNECT) { - // No further params - } else if (params.action === common.ACTIONS.ANNOUNCE) { - params.info_hash = msg.slice(16, 36).toString('binary') // 20 bytes - params.peer_id = msg.slice(36, 56).toString('utf8') // 20 bytes - params.downloaded = fromUInt64(msg.slice(56, 64)) // TODO: track this? - params.left = fromUInt64(msg.slice(64, 72)) - params.uploaded = fromUInt64(msg.slice(72, 80)) // TODO: track this? - params.event = msg.readUInt32BE(80) - params.event = common.EVENT_IDS[params.event] - if (!params.event) throw new Error('invalid event') // early return - params.ip = msg.readUInt32BE(84) // optional - params.ip = params.ip ? - ipLib.toString(params.ip) : - params.ip = rinfo.address - params.key = msg.readUInt32BE(88) // TODO: what is this for? - params.numwant = msg.readUInt32BE(92) // optional - // never send more than MAX_ANNOUNCE_PEERS or else the UDP packet will get bigger than - // 512 bytes which is not safe - params.numwant = Math.min(params.numwant || NUM_ANNOUNCE_PEERS, MAX_ANNOUNCE_PEERS) - params.port = msg.readUInt16BE(96) || rinfo.port // optional - params.addr = params.ip + ':' + params.port // TODO: ipv6 brackets - params.compact = 1 // udp is always compact - - } else if (params.action === common.ACTIONS.SCRAPE) { // scrape message - params.info_hash = msg.slice(16, 36).toString('binary') // 20 bytes - - // TODO: support multiple info_hash scrape - if (msg.length > 36) { - throw new Error('multiple info_hash scrape not supported') - } - } else { - return null - } - - return params -} - function makeUdpPacket (params) { switch (params.action) { case common.ACTIONS.CONNECT: @@ -523,19 +408,3 @@ function makeUdpPacket (params) { throw new Error('Action not implemented: ' + params.action) } } - -// HELPER FUNCTIONS - -var TWO_PWR_32 = (1 << 16) * 2 - -/** - * Return the closest floating-point representation to the buffer value. Precision will be - * lost for big numbers. - */ -function fromUInt64 (buf) { - var high = buf.readUInt32BE(0) | 0 // force - var low = buf.readUInt32BE(4) | 0 - var lowUnsigned = (low >= 0) ? low : TWO_PWR_32 + low - - return high * TWO_PWR_32 + lowUnsigned -} From b72960dee3dc52869a298509da01d163c189dc1e Mon Sep 17 00:00:00 2001 From: Astro Date: Wed, 10 Dec 2014 16:47:41 +0100 Subject: [PATCH 08/16] server: split out Swarm --- lib/swarm.js | 133 ++++++++++++++++++++++++++++++++++++++++++++ package.json | 1 + server.js | 152 ++++++++++----------------------------------------- 3 files changed, 163 insertions(+), 123 deletions(-) create mode 100644 lib/swarm.js diff --git a/lib/swarm.js b/lib/swarm.js new file mode 100644 index 0000000..2413139 --- /dev/null +++ b/lib/swarm.js @@ -0,0 +1,133 @@ +var debug = require('debug')('bittorrent-tracker') +var string2compact = require('string2compact') + +module.exports = Swarm + +// Regard this as the default implementation of an interface that you +// need to support when overriding Server.getSwarm() +function Swarm (infoHash, server) { + this.peers = {} + this.complete = 0 + this.incomplete = 0 + this.emit = server.emit.bind(server) +} + +Swarm.prototype.announce = function (params, cb) { + var self = this + var peer = self.peers[params.addr] + + var start = function () { + if (peer) { + debug('unexpected `started` event from peer that is already in swarm') + return update() // treat as an update + } + if (params.left === 0) self.complete += 1 + else self.incomplete += 1 + peer = self.peers[params.addr] = { + ip: params.ip, + port: params.port, + peerId: params.peer_id + } + self.emit('start', params.addr) + } + + var stop = function () { + if (!peer) { + debug('unexpected `stopped` event from peer that is not in swarm') + return // do nothing + } + if (peer.complete) self.complete -= 1 + else self.incomplete -= 1 + self.peers[params.addr] = null + self.emit('stop', params.addr) + } + + var complete = function () { + if (!peer) { + debug('unexpected `completed` event from peer that is not in swarm') + return start() // treat as a start + } + if (peer.complete) { + debug('unexpected `completed` event from peer that is already marked as completed') + return // do nothing + } + self.complete += 1 + self.incomplete -= 1 + peer.complete = true + self.emit('complete', params.addr) + } + + var update = function () { + if (!peer) { + debug('unexpected `update` event from peer that is not in swarm') + return start() // treat as a start + } + self.emit('update', params.addr) + } + + switch (params.event) { + case 'started': + start() + break + case 'stopped': + stop() + break + case 'completed': + complete() + break + case '': case undefined: case 'empty': case 'update': // update + update() + break + default: + return cb(new Error('invalid event')) // early return + } + + if (params.left === 0 && peer) peer.complete = true + + // send peers + var peers = params.compact === 1 + ? self._getPeersCompact(params.numwant) + : self._getPeers(params.numwant) + + cb(null, { + complete: this.complete, + incomplete: this.incomplete, + peers: peers + }) +} + +Swarm.prototype._getPeers = function (numwant) { + var peers = [] + for (var peerId in this.peers) { + if (peers.length >= numwant) break + var peer = this.peers[peerId] + if (!peer) continue // ignore null values + peers.push({ + 'peer id': peer.peerId, + ip: peer.ip, + port: peer.port + }) + } + return peers +} + +Swarm.prototype._getPeersCompact = function (numwant) { + var peers = [] + + for (var peerId in this.peers) { + if (peers.length >= numwant) break + var peer = this.peers[peerId] + if (!peer) continue // ignore null values + peers.push(peer.ip + ':' + peer.port) + } + + return string2compact(peers) +} + + +Swarm.prototype.scrape = function (infoHash, params, cb) { + cb(null, { + complete: this.complete, + incomplete: this.incomplete + }) +} diff --git a/package.json b/package.json index a83fc5c..e002d8a 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "ip": "^0.3.0", "once": "^1.3.0", "portfinder": "^0.2.1", + "run-series": "^1.0.2", "string2compact": "^1.1.1" }, "devDependencies": { diff --git a/server.js b/server.js index d84de01..241e12c 100644 --- a/server.js +++ b/server.js @@ -1,7 +1,6 @@ module.exports = Server var bencode = require('bencode') -var common = require('./lib/common') var debug = require('debug')('bittorrent-tracker') var dgram = require('dgram') var EventEmitter = require('events').EventEmitter @@ -9,8 +8,10 @@ var http = require('http') var inherits = require('inherits') var ipLib = require('ip') var portfinder = require('portfinder') -var string2compact = require('string2compact') +var series = require('run-series') +var common = require('./lib/common') +var Swarm = require('./lib/swarm') var parseHttpRequest = require('./lib/parse_http') var parseUdpRequest = require('./lib/parse_udp') @@ -119,11 +120,7 @@ Server.prototype.getSwarm = function (binaryInfoHash) { if (Buffer.isBuffer(binaryInfoHash)) binaryInfoHash = binaryInfoHash.toString('binary') var swarm = self.torrents[binaryInfoHash] if (!swarm) { - swarm = self.torrents[binaryInfoHash] = { - complete: 0, - incomplete: 0, - peers: {} - } + swarm = self.torrents[binaryInfoHash] = new Swarm(binaryInfoHash, this) } return swarm } @@ -214,89 +211,13 @@ Server.prototype._onRequest = function (params, cb) { Server.prototype._onAnnounce = function (params, cb) { var self = this - var swarm = self.getSwarm(params.info_hash) - var peer = swarm.peers[params.addr] - - var start = function () { - if (peer) { - debug('unexpected `started` event from peer that is already in swarm') - return update() // treat as an update + swarm.announce(params, function (err, response) { + if (response) { + if (!response.action) response.action = common.ACTIONS.ANNOUNCE + if (!response.intervalMs) response.intervalMs = self._intervalMs } - if (params.left === 0) swarm.complete += 1 - else swarm.incomplete += 1 - peer = swarm.peers[params.addr] = { - ip: params.ip, - port: params.port, - peerId: params.peer_id - } - self.emit('start', params.addr) - } - - var stop = function () { - if (!peer) { - debug('unexpected `stopped` event from peer that is not in swarm') - return // do nothing - } - if (peer.complete) swarm.complete -= 1 - else swarm.incomplete -= 1 - swarm.peers[params.addr] = null - self.emit('stop', params.addr) - } - - var complete = function () { - if (!peer) { - debug('unexpected `completed` event from peer that is not in swarm') - return start() // treat as a start - } - if (peer.complete) { - debug('unexpected `completed` event from peer that is already marked as completed') - return // do nothing - } - swarm.complete += 1 - swarm.incomplete -= 1 - peer.complete = true - self.emit('complete', params.addr) - } - - var update = function () { - if (!peer) { - debug('unexpected `update` event from peer that is not in swarm') - return start() // treat as a start - } - self.emit('update', params.addr) - } - - switch (params.event) { - case 'started': - start() - break - case 'stopped': - stop() - break - case 'completed': - complete() - break - case '': case undefined: case 'empty': case 'update': // update - update() - break - default: - return cb(new Error('invalid event')) // early return - } - - if (params.left === 0 && peer) peer.complete = true - - // send peers - var peers = params.compact === 1 - ? self._getPeersCompact(swarm, params.numwant) - : self._getPeers(swarm, params.numwant) - - cb(null, { - action: common.ACTIONS.ANNOUNCE, - complete: swarm.complete, - incomplete: swarm.incomplete, - peers: peers, - intervalMs: self._intervalMs + cb(err, response) }) } @@ -324,46 +245,31 @@ Server.prototype._onScrape = function (params, cb) { min_request_interval: self._intervalMs } } - - params.info_hash.some(function (infoHash) { + + series(params.info_hash.map(function (infoHash) { var swarm = self.getSwarm(infoHash) - - response.files[infoHash] = { - complete: swarm.complete, - incomplete: swarm.incomplete, - downloaded: swarm.complete // TODO: this only provides a lower-bound + return function (cb) { + swarm.scrape(infoHash, params, function (err, scrapeInfo) { + cb(err, scrapeInfo && { + infoHash: infoHash, + complete: scrapeInfo.complete || 0, + incomplete: scrapeInfo.incomplete || 0 + }) + }) } - }) + }), function (err, results) { + if (err) return cb(err) - cb(null, response) -} - -Server.prototype._getPeers = function (swarm, numwant) { - var peers = [] - for (var peerId in swarm.peers) { - if (peers.length >= numwant) break - var peer = swarm.peers[peerId] - if (!peer) continue // ignore null values - peers.push({ - 'peer id': peer.peerId, - ip: peer.ip, - port: peer.port + results.forEach(function (result) { + response.files[result.infoHash] = { + complete: result.complete, + incomplete: result.incomplete, + downloaded: result.complete // TODO: this only provides a lower-bound + } }) - } - return peers -} -Server.prototype._getPeersCompact = function (swarm, numwant) { - var peers = [] - - for (var peerId in swarm.peers) { - if (peers.length >= numwant) break - var peer = swarm.peers[peerId] - if (!peer) continue // ignore null values - peers.push(peer.ip + ':' + peer.port) - } - - return string2compact(peers) + cb(null, response) + }) } From 8439300df99943fc4993ef07f2aad60b13c461a4 Mon Sep 17 00:00:00 2001 From: Astro Date: Wed, 10 Dec 2014 16:51:35 +0100 Subject: [PATCH 09/16] server: unify compact serialization --- lib/swarm.js | 19 +------------------ server.js | 6 ++++++ 2 files changed, 7 insertions(+), 18 deletions(-) diff --git a/lib/swarm.js b/lib/swarm.js index 2413139..fbdc9df 100644 --- a/lib/swarm.js +++ b/lib/swarm.js @@ -1,5 +1,4 @@ var debug = require('debug')('bittorrent-tracker') -var string2compact = require('string2compact') module.exports = Swarm @@ -85,9 +84,7 @@ Swarm.prototype.announce = function (params, cb) { if (params.left === 0 && peer) peer.complete = true // send peers - var peers = params.compact === 1 - ? self._getPeersCompact(params.numwant) - : self._getPeers(params.numwant) + var peers = self._getPeers(params.numwant) cb(null, { complete: this.complete, @@ -111,20 +108,6 @@ Swarm.prototype._getPeers = function (numwant) { return peers } -Swarm.prototype._getPeersCompact = function (numwant) { - var peers = [] - - for (var peerId in this.peers) { - if (peers.length >= numwant) break - var peer = this.peers[peerId] - if (!peer) continue // ignore null values - peers.push(peer.ip + ':' + peer.port) - } - - return string2compact(peers) -} - - Swarm.prototype.scrape = function (infoHash, params, cb) { cb(null, { complete: this.complete, diff --git a/server.js b/server.js index 241e12c..e567835 100644 --- a/server.js +++ b/server.js @@ -9,6 +9,7 @@ var inherits = require('inherits') var ipLib = require('ip') var portfinder = require('portfinder') var series = require('run-series') +var string2compact = require('string2compact') var common = require('./lib/common') var Swarm = require('./lib/swarm') @@ -216,6 +217,11 @@ Server.prototype._onAnnounce = function (params, cb) { if (response) { if (!response.action) response.action = common.ACTIONS.ANNOUNCE if (!response.intervalMs) response.intervalMs = self._intervalMs + if (params.compact === 1) { + response.peers = string2compact(response.peers.map(function (peer) { + return peer.ip + ':' + peer.port // TODO: ipv6 brackets + })) + } } cb(err, response) }) From e234f14d56aa5dd0c008457422c7af8a14593b5c Mon Sep 17 00:00:00 2001 From: Astro Date: Wed, 10 Dec 2014 17:01:34 +0100 Subject: [PATCH 10/16] server swarm: split out announce events --- lib/swarm.js | 132 ++++++++++++++++++++++++++------------------------- 1 file changed, 67 insertions(+), 65 deletions(-) diff --git a/lib/swarm.js b/lib/swarm.js index fbdc9df..40e4577 100644 --- a/lib/swarm.js +++ b/lib/swarm.js @@ -15,82 +15,84 @@ Swarm.prototype.announce = function (params, cb) { var self = this var peer = self.peers[params.addr] - var start = function () { - if (peer) { - debug('unexpected `started` event from peer that is already in swarm') - return update() // treat as an update - } - if (params.left === 0) self.complete += 1 - else self.incomplete += 1 - peer = self.peers[params.addr] = { - ip: params.ip, - port: params.port, - peerId: params.peer_id - } - self.emit('start', params.addr) + // Dispatch announce event + if (!params.event || params.event === 'empty') params.event = 'update' + var fn = '_onAnnounce_' + params.event + if (self[fn]) { + self[fn](params, peer, function (err) { + // event processed, prepare response: + + if (params.left === 0 && peer) peer.complete = true + + // send peers + var peers = self._getPeers(params.numwant) + + cb(null, { + complete: self.complete, + incomplete: self.incomplete, + peers: peers + }) + }) + } else { + cb(new Error('invalid event')) } +} - var stop = function () { - if (!peer) { - debug('unexpected `stopped` event from peer that is not in swarm') - return // do nothing - } - if (peer.complete) self.complete -= 1 - else self.incomplete -= 1 - self.peers[params.addr] = null - self.emit('stop', params.addr) +Swarm.prototype._onAnnounce_started = function (params, peer, cb) { + if (peer) { + debug('unexpected `started` event from peer that is already in swarm') + return this._onAnnounce_update() // treat as an update } - - var complete = function () { - if (!peer) { - debug('unexpected `completed` event from peer that is not in swarm') - return start() // treat as a start - } - if (peer.complete) { - debug('unexpected `completed` event from peer that is already marked as completed') - return // do nothing - } - self.complete += 1 - self.incomplete -= 1 - peer.complete = true - self.emit('complete', params.addr) + if (params.left === 0) this.complete += 1 + else this.incomplete += 1 + peer = this.peers[params.addr] = { + ip: params.ip, + port: params.port, + peerId: params.peer_id } + this.emit('start', params.addr) - var update = function () { - if (!peer) { - debug('unexpected `update` event from peer that is not in swarm') - return start() // treat as a start - } - self.emit('update', params.addr) + cb() +} + +Swarm.prototype._onAnnounce_stopped = function (params, peer, cb) { + if (!peer) { + debug('unexpected `stopped` event from peer that is not in swarm') + return // do nothing } + if (peer.complete) this.complete -= 1 + else this.incomplete -= 1 + this.peers[params.addr] = null + this.emit('stop', params.addr) - switch (params.event) { - case 'started': - start() - break - case 'stopped': - stop() - break - case 'completed': - complete() - break - case '': case undefined: case 'empty': case 'update': // update - update() - break - default: - return cb(new Error('invalid event')) // early return + cb() +} + +Swarm.prototype._onAnnounce_completed = function (params, peer, cb) { + if (!peer) { + debug('unexpected `completed` event from peer that is not in swarm') + return start() // treat as a start } + if (peer.complete) { + debug('unexpected `completed` event from peer that is already marked as completed') + return // do nothing + } + this.complete += 1 + this.incomplete -= 1 + peer.complete = true + this.emit('complete', params.addr) - if (params.left === 0 && peer) peer.complete = true + cb() +} - // send peers - var peers = self._getPeers(params.numwant) +Swarm.prototype._onAnnounce_update = function (params, peer, cb) { + if (!peer) { + debug('unexpected `update` event from peer that is not in swarm') + return start() // treat as a start + } + this.emit('update', params.addr) - cb(null, { - complete: this.complete, - incomplete: this.incomplete, - peers: peers - }) + cb() } Swarm.prototype._getPeers = function (numwant) { From 7a417a591d35a6d001fa82000dc97fc60b216d67 Mon Sep 17 00:00:00 2001 From: Astro Date: Thu, 11 Dec 2014 16:11:12 +0100 Subject: [PATCH 11/16] parseUdpRequest(): fix params.ip --- lib/parse_udp.js | 3 ++- server.js | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/parse_udp.js b/lib/parse_udp.js index 411d5b2..2c1ee8e 100644 --- a/lib/parse_udp.js +++ b/lib/parse_udp.js @@ -1,4 +1,5 @@ var bufferEqual = require('buffer-equal') +var ipLib = require('ip') var common = require('./common') @@ -38,7 +39,7 @@ function parseUdpRequest (msg, rinfo) { params.ip = msg.readUInt32BE(84) // optional params.ip = params.ip ? ipLib.toString(params.ip) : - params.ip = rinfo.address + rinfo.address params.key = msg.readUInt32BE(88) // TODO: what is this for? params.numwant = msg.readUInt32BE(92) // optional // never send more than MAX_ANNOUNCE_PEERS or else the UDP packet will get bigger than diff --git a/server.js b/server.js index e567835..8fb4aaa 100644 --- a/server.js +++ b/server.js @@ -6,7 +6,6 @@ var dgram = require('dgram') var EventEmitter = require('events').EventEmitter var http = require('http') var inherits = require('inherits') -var ipLib = require('ip') var portfinder = require('portfinder') var series = require('run-series') var string2compact = require('string2compact') From e64fecc063b896ae3286420bcdced571ec9e572b Mon Sep 17 00:00:00 2001 From: Astro Date: Thu, 11 Dec 2014 16:19:08 +0100 Subject: [PATCH 12/16] parseUdpRequest(): throw on invalid packet --- lib/parse_udp.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/parse_udp.js b/lib/parse_udp.js index 2c1ee8e..7e5ccdf 100644 --- a/lib/parse_udp.js +++ b/lib/parse_udp.js @@ -57,7 +57,7 @@ function parseUdpRequest (msg, rinfo) { throw new Error('multiple info_hash scrape not supported') } } else { - return null + throw new Error('Invalid action in UDP packet: ' + params.action) } return params From 74d91390494becb06af12e1b353f21a81eb1bbfb Mon Sep 17 00:00:00 2001 From: Astro Date: Thu, 11 Dec 2014 16:22:17 +0100 Subject: [PATCH 13/16] server _onHttpRequest(): ensure non-empty params --- server.js | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/server.js b/server.js index 8fb4aaa..b5a3cdb 100644 --- a/server.js +++ b/server.js @@ -127,21 +127,26 @@ Server.prototype.getSwarm = function (binaryInfoHash) { Server.prototype._onHttpRequest = function (req, res) { var self = this - + var error var params try { params = parseHttpRequest(req, { trustProxy: self._trustProxy }) } catch (err) { - debug('sent error %s', err.message) + error = err + } + + if (!error && !params) error = new Error('Empty HTTP parameters') + if (error) { + debug('sent error %s', error.message) res.end(bencode.encode({ - 'failure reason': err.message + 'failure reason': error.message })) // even though it's an error for the client, it's just a warning for the server. // don't crash the server because a client sent bad data :) - self.emit('warning', err) + self.emit('warning', error) return } From 7a8f9a779fb026b70e90319f828c90a7ea68dcd8 Mon Sep 17 00:00:00 2001 From: Astro Date: Thu, 11 Dec 2014 16:24:33 +0100 Subject: [PATCH 14/16] server _onHttpRequest(): don't wrap errors in errors --- server.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server.js b/server.js index b5a3cdb..72c0ce8 100644 --- a/server.js +++ b/server.js @@ -153,7 +153,7 @@ Server.prototype._onHttpRequest = function (req, res) { this._onRequest(params, function (err, response) { if (err) { - self.emit('warning', new Error(err.message)) + self.emit('warning', err) response = { 'failure reason': err.message } @@ -182,7 +182,7 @@ Server.prototype._onUdpRequest = function (msg, rinfo) { // Handle this._onRequest(params, function (err, response) { if (err) { - self.emit('warning', new Error(err.message)) + self.emit('warning', err) response = { action: common.ACTIONS.ERRROR, 'failure reason': err.message From 217bcf7de5238892748ce05183809079fcc64270 Mon Sep 17 00:00:00 2001 From: Astro Date: Thu, 11 Dec 2014 16:39:10 +0100 Subject: [PATCH 15/16] server _onUdpRequest(): don't check empty params, errors will be thrown now --- server.js | 3 --- 1 file changed, 3 deletions(-) diff --git a/server.js b/server.js index 72c0ce8..ed1a229 100644 --- a/server.js +++ b/server.js @@ -176,9 +176,6 @@ Server.prototype._onUdpRequest = function (msg, rinfo) { return } - // Do nothing with invalid request - if (!params) return - // Handle this._onRequest(params, function (err, response) { if (err) { From c97e4236f4f6a97bded56482072b056a61d374cb Mon Sep 17 00:00:00 2001 From: Astro Date: Thu, 11 Dec 2014 17:07:37 +0100 Subject: [PATCH 16/16] server parseHttpRequest(): simplify info_hash checking --- lib/parse_http.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/parse_http.js b/lib/parse_http.js index 0f7a775..85e3519 100644 --- a/lib/parse_http.js +++ b/lib/parse_http.js @@ -44,12 +44,10 @@ function parseHttpRequest (req, options) { if (params.info_hash) { if (!Array.isArray(params.info_hash)) throw new Error('invalid info_hash array') - params.info_hash = params.info_hash.map(function (infoHash) { + params.info_hash.forEach(function (infoHash) { if (infoHash.length !== 20) { throw new Error('invalid info_hash') } - - return infoHash }) }