diff --git a/index.js b/index.js index bbe1215..264ce7e 100644 --- a/index.js +++ b/index.js @@ -3,6 +3,7 @@ exports.Server = Server var BN = require('bn.js') var bncode = require('bncode') +var bufferEqual = require('buffer-equal') var compact2string = require('compact2string') var concat = require('concat-stream') var dgram = require('dgram') @@ -11,6 +12,7 @@ var extend = require('extend.js') var hat = require('hat') var http = require('http') var inherits = require('inherits') +var ipLib = require('ip') var parallel = require('run-parallel') var querystring = require('querystring') var string2compact = require('string2compact') @@ -18,7 +20,7 @@ var url = require('url') var CONNECTION_ID = Buffer.concat([ toUInt32(0x417), toUInt32(0x27101980) ]) var ACTIONS = { CONNECT: 0, ANNOUNCE: 1, SCRAPE: 2, ERROR: 3 } -var EVENTS = { completed: 1, started: 2, stopped: 3 } +var EVENTS = { update: 0, completed: 1, started: 2, stopped: 3 } var MAX_UINT = 4294967295 inherits(Tracker, EventEmitter) @@ -195,7 +197,14 @@ Tracker.prototype._requestUdp = function (requestUrl, opts) { if (msg.length < 16) { return error('invalid udp handshake') } - announce(msg.slice(8, 16), opts) + + var scrapeStr = 'scrape' + if (requestUrl.substr(requestUrl.lastIndexOf('/') + 1, scrapeStr.length) === scrapeStr) { + scrape(msg.slice(8, 16), opts) + } else { + announce(msg.slice(8, 16), opts) + } + return case 1: // announce @@ -253,12 +262,6 @@ Tracker.prototype._requestUdp = function (requestUrl, opts) { } }) - function error (message) { - self.client.emit('error', new Error(message + ' (connecting to tracker ' + requestUrl + ')')) - try { socket.close() } catch (e) { } - clearTimeout(timeout) - } - function send (message) { if (!parsedUrl.port) { parsedUrl.port = 80; @@ -266,6 +269,12 @@ Tracker.prototype._requestUdp = function (requestUrl, opts) { socket.send(message, 0, message.length, parsedUrl.port, parsedUrl.hostname) } + function error (message) { + self.client.emit('error', new Error(message + ' (connecting to tracker ' + requestUrl + ')')) + try { socket.close() } catch (e) { } + clearTimeout(timeout) + } + function genTransactionId () { transactionId = new Buffer(hat(32), 'hex') } @@ -523,6 +532,7 @@ Server.prototype.listen = function (port) { Server.prototype.close = function (cb) { var self = this + cb = cb || function () {} if (self._udpServer) { self._udpServer.close() } @@ -533,16 +543,22 @@ Server.prototype.close = function (cb) { } } -Server.prototype._onHttpRequest = function (req, res) { +Server.prototype._getSwarm = function (infoHash) { var self = this - - function error (message) { - res.end(bncode.encode({ - 'failure reason': message - })) - self.emit('error', new Error(message)) + var swarm = self.torrents[infoHash] + if (!swarm) { + swarm = self.torrents[infoHash] = { + complete: 0, + incomplete: 0, + peers: {} + } } + return swarm +} + +Server.prototype._onHttpRequest = function (req, res) { + var self = this var warning var s = req.url.split('?') var params = querystring.parse(s[1]) @@ -570,22 +586,23 @@ Server.prototype._onHttpRequest = function (req, res) { case 'started': if (peer) { warning = 'unexpected `started` event from peer that is already in swarm' - } - - var left = Number(params.left) - - if (left === 0) { - swarm.complete += 1 } else { - swarm.incomplete += 1 + var left = Number(params.left) + + if (left === 0) { + swarm.complete += 1 + } else { + swarm.incomplete += 1 + } + + peer = swarm.peers[addr] = { + ip: ip, + port: port, + peerId: peerId + } + self.emit('start', addr) } - peer = swarm.peers[addr] = { - ip: ip, - port: port, - peerId: peerId - } - self.emit('start', addr, params) break case 'stopped': @@ -599,9 +616,9 @@ Server.prototype._onHttpRequest = function (req, res) { swarm.incomplete -= 1 } - delete swarm.peers[addr] + swarm.peers[addr] = null - self.emit('stop', addr, params) + self.emit('stop', addr) break case 'completed': @@ -616,7 +633,7 @@ Server.prototype._onHttpRequest = function (req, res) { swarm.complete += 1 swarm.incomplete -= 1 - self.emit('complete', addr, params) + self.emit('complete', addr) break case '': // update @@ -625,7 +642,7 @@ Server.prototype._onHttpRequest = function (req, res) { return error('unexpected `update` event from peer that is not in swarm') } - self.client.emit('update', addr, params) + self.emit('update', addr) break default: @@ -664,24 +681,186 @@ Server.prototype._onHttpRequest = function (req, res) { res.end(bncode.encode(response)) } + + function error (message) { + res.end(bncode.encode({ + 'failure reason': message + })) + + // even though it's an error for the client, it's just a warning for the server. + // don't crash the server because a client sent bad data :) + self.emit('warning', new Error(message)) + } } -Server.prototype._getSwarm = function (infoHash) { +Server.prototype._onUdpRequest = function (msg, rinfo) { var self = this - var swarm = self.torrents[infoHash] - if (!swarm) { - swarm = self.torrents[infoHash] = { - complete: 0, - incomplete: 0, - peers: {} - } + + if (msg.length < 16) { + return error('received packet is too short') } - return swarm -} + if (rinfo.family !== 'IPv4') { + return error('udp tracker does not support IPv6') + } -Server.prototype._onUdpRequest = function (req, res) { - // TODO: implement UDP server + var connectionId = msg.slice(0, 8) // 64-bit + var action = msg.readUInt32BE(8) + var transactionId = msg.readUInt32BE(12) + + if (!bufferEqual(connectionId, CONNECTION_ID)) { + return error('received packet with invalid connection id') + } + + var socket = dgram.createSocket('udp4') + + if (action === ACTIONS.CONNECT) { + send(Buffer.concat([ + toUInt32(ACTIONS.CONNECT), + toUInt32(transactionId), + connectionId + ])) + } else if (action === ACTIONS.ANNOUNCE) { + var infoHash = msg.slice(16, 36).toString('hex') // 20 bytes + var peerId = msg.slice(36, 56).toString('utf8') // 20 bytes + var downloaded = fromUInt64(msg.slice(56, 64)) + var left = fromUInt64(msg.slice(64, 72)) + var uploaded = fromUInt64(msg.slice(72, 80)) + var event = msg.readUInt32BE(80) + var ip = msg.readUInt32BE(84) // optional + var key = msg.readUInt32BE(88) + var numWant = msg.readUInt32BE(92) // optional + var port = msg.readUInt16BE(96) // optional + + if (ip) { + ip = ipLib.toString(ip) + } else { + ip = rinfo.address + } + + if (!port) { + port = rinfo.port + } + + var addr = ip + ':' + port + + var swarm = self._getSwarm(infoHash) + var peer = swarm.peers[addr] + + switch (event) { + case EVENTS.started: + if (peer) { + return error('unexpected `started` event from peer that is already in swarm') + } + + if (left === 0) { + swarm.complete += 1 + } else { + swarm.incomplete += 1 + } + + peer = swarm.peers[addr] = { + ip: ip, + port: port, + peerId: peerId + } + self.emit('start', addr) + + break + + case EVENTS.stopped: + if (!peer) { + return error('unexpected `stopped` event from peer that is not in swarm') + } + + if (peer.complete) { + swarm.complete -= 1 + } else { + swarm.incomplete -= 1 + } + + swarm.peers[addr] = null + + self.emit('stop', addr) + break + + case EVENTS.completed: + if (!peer) { + return error('unexpected `completed` event from peer that is not in swarm') + } + if (peer.complete) { + warning = 'unexpected `completed` event from peer that is already marked as completed' + } + peer.complete = true + + swarm.complete += 1 + swarm.incomplete -= 1 + + self.emit('complete', addr) + break + + case EVENTS.update: // update + if (!peer) { + return error('unexpected `update` event from peer that is not in swarm') + } + + self.emit('update', addr) + break + + default: + return error('unexpected event: ' + event) // early return + } + + // send peers + var peers = self._getPeersCompact(swarm) + + // never send more than 70 peers or else the UDP packet will get too big + if (peers.length >= 70 * 6) { + peers = peers.slice(0, 70 * 6) + } + + send(Buffer.concat([ + toUInt32(ACTIONS.ANNOUNCE), + toUInt32(transactionId), + toUInt32(self._interval), + toUInt32(swarm.incomplete), + toUInt32(swarm.complete), + peers + ])) + + } else if (action === ACTIONS.SCRAPE) { // scrape message + var infoHash = msg.slice(16, 36).toString('hex') // 20 bytes + + // TODO: support multiple info_hash scrape + if (msg.length > 36) { + error('multiple info_hash scrape not supported') + } + + var swarm = self._getSwarm(infoHash) + + send(Buffer.concat([ + toUInt32(ACTIONS.SCRAPE), + toUInt32(transactionId), + toUInt32(swarm.complete), + toUInt32(swarm.complete), // TODO: this only provides a lower-bound + toUInt32(swarm.incomplete) + ])) + } + + function send (buf) { + socket.send(buf, 0, buf.length, rinfo.port, rinfo.address, function () { + socket.close() + }) + } + + function error (message) { + send(Buffer.concat([ + toUInt32(ACTIONS.ERROR), + toUInt32(transactionId || 0), + new Buffer(message, 'utf8') + ])) + self.emit('warning', new Error(message)) + } } Server.prototype._getPeers = function (swarm) { @@ -700,9 +879,13 @@ Server.prototype._getPeers = function (swarm) { Server.prototype._getPeersCompact = function (swarm) { var self = this - var addrs = Object.keys(swarm.peers).map(function (peerId) { + var addrs = [] + + Object.keys(swarm.peers).forEach(function (peerId) { var peer = swarm.peers[peerId] - return peer.ip + ':' + peer.port + if (peer) { + addrs.push(peer.ip + ':' + peer.port) + } }) return string2compact(addrs) @@ -735,6 +918,20 @@ function toUInt64 (n) { return Buffer.concat([toUInt32(0), toUInt32(n)]) } +var TWO_PWR_32 = (1 << 16) * 2 + +/** + * Return the closest floating-point representation to the buffer value. Precision will be + * lost for big numbers. + */ +function fromUInt64 (buf) { + var high = buf.readUInt32BE(0) | 0 // force + var low = buf.readUInt32BE(4) | 0 + var lowUnsigned = (low >= 0) ? low : TWO_PWR_32 + low + + return high * TWO_PWR_32 + lowUnsigned +} + function bytewiseEncodeURIComponent (buf) { return encodeURIComponent(buf.toString('binary')) } diff --git a/package.json b/package.json index 616aa24..10bc00e 100644 --- a/package.json +++ b/package.json @@ -13,11 +13,13 @@ "dependencies": { "bn.js": "^0.7.1", "bncode": "^0.5.3", + "buffer-equal": "0.0.0", "compact2string": "^1.2.0", "concat-stream": "^1.4.5", "extend.js": "0.0.1", "hat": "0.0.3", "inherits": "^2.0.1", + "ip": "^0.3.0", "querystring": "^0.2.0", "run-parallel": "^1.0.0", "string2compact": "^1.1.1"