From f575cfc3c921f51e5f6e6b461b7a9252e8ed4066 Mon Sep 17 00:00:00 2001 From: Feross Aboukhadijeh Date: Tue, 24 Mar 2015 01:01:49 -0700 Subject: [PATCH] major client refactor - each tracker type into its own file in preparation of adding a new websocket tracker type, for webtorrent --- client.js | 582 ++++++++------------------------------- lib/common.js | 4 +- lib/http-tracker.js | 197 +++++++++++++ lib/parse_http.js | 2 +- lib/parse_udp.js | 2 +- lib/udp-tracker.js | 241 ++++++++++++++++ lib/websocket-tracker.js | 0 7 files changed, 554 insertions(+), 474 deletions(-) create mode 100644 lib/http-tracker.js create mode 100644 lib/udp-tracker.js create mode 100644 lib/websocket-tracker.js diff --git a/client.js b/client.js index 1d14776..7f0d2ea 100644 --- a/client.js +++ b/client.js @@ -1,35 +1,34 @@ module.exports = Client -var bencode = require('bencode') -var BN = require('bn.js') -var common = require('./lib/common') -var compact2string = require('compact2string') var debug = require('debug')('bittorrent-tracker') -var dgram = require('dgram') var EventEmitter = require('events').EventEmitter -var get = require('simple-get') -var hat = require('hat') var inherits = require('inherits') var once = require('once') var url = require('url') +var common = require('./lib/common') +var HTTPTracker = require('./lib/http-tracker') +var UDPTracker = require('./lib/udp-tracker') +var WebSocketTracker = require('./lib/websocket-tracker') + inherits(Client, EventEmitter) /** * A Client manages tracker connections for a torrent. * - * @param {string} peerId this peer's id - * @param {Number} port port number that the client is listening on - * @param {Object} torrent parsed torrent - * @param {Object} opts optional options + * @param {string} peerId peer id + * @param {Number} port torrent client listening port + * @param {Object} torrent parsed torrent + * @param {Object} opts options object * @param {Number} opts.numWant number of peers to request - * @param {Number} opts.interval interval in ms to send announce requests to the tracker + * @param {Number} opts.interval announce interval (in ms) + * @param {Number} opts.rtcConfig RTCPeerConnection configuration object */ function Client (peerId, port, torrent, opts) { var self = this if (!(self instanceof Client)) return new Client(peerId, port, torrent, opts) EventEmitter.call(self) - self._opts = opts || {} + if (!opts) opts = {} // required self._peerId = Buffer.isBuffer(peerId) @@ -42,19 +41,29 @@ function Client (peerId, port, torrent, opts) { self.torrentLength = torrent.length // optional - self._numWant = self._opts.numWant || 50 - self._intervalMs = self._opts.interval || (30 * 60 * 1000) // default: 30 minutes + self._numWant = opts.numWant || common.DEFAULT_ANNOUNCE_PEERS + self._intervalMs = opts.interval || common.DEFAULT_ANNOUNCE_INTERVAL debug('new client %s', self._infoHash.toString('hex')) if (typeof torrent.announce === 'string') torrent.announce = [ torrent.announce ] + self._trackers = (torrent.announce || []) .filter(function (announceUrl) { var protocol = url.parse(announceUrl).protocol - return protocol === 'udp:' || protocol === 'http:' || protocol === 'https:' + return [ 'udp:', 'http:', 'https:', 'ws:', 'wss:' ].indexOf(protocol) !== -1 }) .map(function (announceUrl) { - return new Tracker(self, announceUrl, self._opts) + var trackerOpts = { interval: self._intervalMs } + var protocol = url.parse(announceUrl).protocol + + if (protocol === 'http:' || protocol === 'https:') { + return new HTTPTracker(self, announceUrl, trackerOpts) + } else if (protocol === 'udp:') { + return new UDPTracker(self, announceUrl, trackerOpts) + } else if (protocol === 'ws:' || protocol === 'wss:') { + return new WebSocketTracker(self, announceUrl, trackerOpts) + } }) } @@ -67,15 +76,14 @@ function Client (peerId, port, torrent, opts) { */ Client.scrape = function (announceUrl, infoHash, cb) { cb = once(cb) - var dummy = { - peerId: new Buffer('01234567890123456789'), - port: 6881, - torrent: { - infoHash: infoHash, - announce: [ announceUrl ] - } + + var peerId = new Buffer('01234567890123456789') // dummy value + var port = 6881 // dummy value + var torrent = { + infoHash: infoHash, + announce: [ announceUrl ] } - var client = new Client(dummy.peerId, dummy.port, dummy.torrent) + var client = new Client(peerId, port, torrent) client.once('error', cb) client.once('scrape', function (data) { cb(null, data) @@ -83,36 +91,95 @@ Client.scrape = function (announceUrl, infoHash, cb) { client.scrape() } +/** + * Send a `start` announce to the trackers. + * @param {Object} opts + * @param {number=} opts.uploaded + * @param {number=} opts.downloaded + * @param {number=} opts.left (if not set, calculated automatically) + */ Client.prototype.start = function (opts) { var self = this + debug('send `start`') + opts = self._defaultAnnounceOpts(opts) + opts.event = 'started' + self._announce(opts) + + // start announcing on intervals self._trackers.forEach(function (tracker) { - tracker.start(opts) + tracker.setInterval(self._intervalMs) }) } +/** + * Send a `stop` announce to the trackers. + * @param {Object} opts + * @param {number=} opts.uploaded + * @param {number=} opts.downloaded + * @param {number=} opts.left (if not set, calculated automatically) + */ Client.prototype.stop = function (opts) { var self = this - self._trackers.forEach(function (tracker) { - tracker.stop(opts) - }) + debug('send `stop`') + opts = self._defaultAnnounceOpts(opts) + opts.event = 'stopped' + self._announce(opts) + + self.destroy() } +/** + * Send a `complete` announce to the trackers. + * @param {Object} opts + * @param {number=} opts.uploaded + * @param {number=} opts.downloaded + * @param {number=} opts.left (if not set, calculated automatically) + */ Client.prototype.complete = function (opts) { var self = this - self._trackers.forEach(function (tracker) { - tracker.complete(opts) - }) + debug('send `complete`') + if (!opts) opts = {} + if (opts.downloaded == null && self.torrentLength != null) { + opts.downloaded = self.torrentLength + } + opts = self._defaultAnnounceOpts(opts) + opts.event = 'completed' + self._announce(opts) } +/** + * Send a `update` announce to the trackers. + * @param {Object} opts + * @param {number=} opts.uploaded + * @param {number=} opts.downloaded + * @param {number=} opts.left (if not set, calculated automatically) + */ Client.prototype.update = function (opts) { + var self = this + debug('send `update`') + opts = self._defaultAnnounceOpts(opts) + if (opts.event) delete opts.event + self._announce(opts) +} + +Client.prototype._announce = function (opts) { var self = this self._trackers.forEach(function (tracker) { - tracker.update(opts) + tracker.announce(opts) }) } +/** + * Send a scrape request to the trackers. + * @param {Object} opts + * @param {number=} opts.uploaded + * @param {number=} opts.downloaded + * @param {number=} opts.left (if not set, calculated automatically) + */ Client.prototype.scrape = function (opts) { var self = this + debug('send `scrape`') + if (!opts) opts = {} self._trackers.forEach(function (tracker) { tracker.scrape(opts) }) @@ -120,6 +187,7 @@ Client.prototype.scrape = function (opts) { Client.prototype.setInterval = function (intervalMs) { var self = this + debug('setInterval') self._intervalMs = intervalMs self._trackers.forEach(function (tracker) { @@ -129,453 +197,25 @@ Client.prototype.setInterval = function (intervalMs) { Client.prototype.destroy = function () { var self = this + debug('destroy') + self._trackers.forEach(function (tracker) { - tracker.destroy() + if (tracker.destroy) tracker.destroy() + tracker.setInterval(0) // stop announcing on intervals }) } -inherits(Tracker, EventEmitter) - -/** - * An individual torrent tracker (used by Client) - * - * @param {Client} client parent bittorrent tracker client - * @param {string} announceUrl announce url of tracker - * @param {Object} opts optional options - */ -function Tracker (client, announceUrl, opts) { +Client.prototype._defaultAnnounceOpts = function (opts) { var self = this - EventEmitter.call(self) - self._opts = opts || {} + if (!opts) opts = {} - self.client = client + if (opts.numWant == null) opts.numWant = self._numWant - debug('new tracker %s', announceUrl) - - self._announceUrl = announceUrl - self._intervalMs = self.client._intervalMs // use client interval initially - self._interval = null - - var protocol = url.parse(self._announceUrl).protocol - if (protocol === 'udp:') { - self._requestImpl = self._requestUdp - } else if (protocol === 'http:' || protocol === 'https:') { - self._requestImpl = self._requestHttp - } -} - -Tracker.prototype.start = function (opts) { - var self = this - opts = opts || {} - opts.event = 'started' - - debug('sent `start` %s', self._announceUrl) - self._announce(opts) - self.setInterval(self._intervalMs) // start announcing on intervals -} - -Tracker.prototype.stop = function (opts) { - var self = this - opts = opts || {} - opts.event = 'stopped' - - debug('sent `stop` %s', self._announceUrl) - self._announce(opts) - self.destroy() -} - -Tracker.prototype.complete = function (opts) { - var self = this - opts = opts || {} - opts.event = 'completed' - opts.downloaded = opts.downloaded || self.torrentLength || 0 - - debug('sent `complete` %s', self._announceUrl) - self._announce(opts) -} - -Tracker.prototype.update = function (opts) { - var self = this - opts = opts || {} - - debug('sent `update` %s', self._announceUrl) - self._announce(opts) -} - -Tracker.prototype.destroy = function () { - var self = this - debug('destroy', self._announceUrl) - self.setInterval(0) // stop announcing on intervals -} - -/** - * Send an announce request to the tracker. - * @param {Object} opts - * @param {number=} opts.uploaded - * @param {number=} opts.downloaded - * @param {number=} opts.left (if not set, calculated automatically) - */ -Tracker.prototype._announce = function (opts) { - var self = this - - // defaults, user should provide real values if (opts.uploaded == null) opts.uploaded = 0 if (opts.downloaded == null) opts.downloaded = 0 - if (self.client.torrentLength != null && opts.left == null) { - opts.left = self.client.torrentLength - (opts.downloaded || 0) + if (opts.left == null && self.torrentLength != null) { + opts.left = self.torrentLength - opts.downloaded } - - self._requestImpl(self._announceUrl, opts) -} - -/** - * Send a scrape request to the tracker. - */ -Tracker.prototype.scrape = function () { - var self = this - - self._scrapeUrl = self._scrapeUrl || getScrapeUrl(self._announceUrl) - - if (!self._scrapeUrl) { - debug('scrape not supported %s', self._announceUrl) - self.client.emit('error', new Error('scrape not supported for announceUrl ' + self._announceUrl)) - return - } - - debug('sent `scrape` %s', self._announceUrl) - self._requestImpl(self._scrapeUrl, { _scrape: true }) -} - -Tracker.prototype.setInterval = function (intervalMs) { - var self = this - clearInterval(self._interval) - - self._intervalMs = intervalMs - if (intervalMs) { - self._interval = setInterval(self.update.bind(self), self._intervalMs) - } -} - -Tracker.prototype._requestHttp = function (requestUrl, opts) { - var self = this - - if (opts._scrape) { - if (opts.info_hash == null) opts.info_hash = self.client._infoHash.toString('binary') - } else { - if (opts.info_hash == null) opts.info_hash = self.client._infoHash.toString('binary') - if (opts.peer_id == null) opts.peer_id = self.client._peerId.toString('binary') - if (opts.port == null) opts.port = self.client._port - if (opts.compact == null) opts.compact = 1 - if (opts.numwant == null) opts.numwant = self.client._numWant - - if (self._trackerId) { - opts.trackerid = self._trackerId - } - } - - var u = requestUrl + (requestUrl.indexOf('?') === -1 ? '?' : '&') + - common.querystringStringify(opts) - - get.concat(u, function (err, data, res) { - if (err) return self.client.emit('warning', err) - if (res.statusCode !== 200) return self.client.emit('warning', new Error('Non-200 response code ' + res.statusCode + ' from ' + requestUrl)) - if (data && data.length) self._handleResponse(requestUrl, data) - }) -} - -Tracker.prototype._requestUdp = function (requestUrl, opts) { - var self = this - opts = opts || {} - var parsedUrl = url.parse(requestUrl) - var socket = dgram.createSocket('udp4') - var transactionId = new Buffer(hat(32), 'hex') - - var stopped = opts.event === 'stopped' - // if we're sending a stopped message, we don't really care if it arrives, so set - // a short timer and don't call error - var timeout = setTimeout(function () { - timeout = null - cleanup() - if (!stopped) { - error('tracker request timed out') - } - }, stopped ? 1500 : 15000) - - if (timeout && timeout.unref) { - timeout.unref() - } - - send(Buffer.concat([ - common.CONNECTION_ID, - common.toUInt32(common.ACTIONS.CONNECT), - transactionId - ])) - - socket.on('error', error) - - socket.on('message', function (msg) { - if (msg.length < 8 || msg.readUInt32BE(4) !== transactionId.readUInt32BE(0)) { - return error('tracker sent invalid transaction id') - } - - var action = msg.readUInt32BE(0) - debug(requestUrl + ' UDP response, action ' + action) - switch (action) { - case 0: // handshake - if (msg.length < 16) { - return error('invalid udp handshake') - } - - if (opts._scrape) { - scrape(msg.slice(8, 16)) - } else { - announce(msg.slice(8, 16), opts) - } - - return - - case 1: // announce - cleanup() - if (msg.length < 20) { - return error('invalid announce message') - } - - var interval = msg.readUInt32BE(8) - if (interval && !self._opts.interval && self._intervalMs !== 0) { - // use the interval the tracker recommends, UNLESS the user manually specifies an - // interval they want to use - self.setInterval(interval * 1000) - } - - self.client.emit('update', { - announce: self._announceUrl, - complete: msg.readUInt32BE(16), - incomplete: msg.readUInt32BE(12) - }) - - var addrs - try { - addrs = compact2string.multi(msg.slice(20)) - } catch (err) { - return self.client.emit('warning', err) - } - addrs.forEach(function (addr) { - self.client.emit('peer', addr) - }) - break - - case 2: // scrape - cleanup() - if (msg.length < 20) { - return error('invalid scrape message') - } - self.client.emit('scrape', { - announce: self._announceUrl, - complete: msg.readUInt32BE(8), - downloaded: msg.readUInt32BE(12), - incomplete: msg.readUInt32BE(16) - }) - break - - case 3: // error - cleanup() - if (msg.length < 8) { - return error('invalid error message') - } - self.client.emit('warning', new Error(msg.slice(8).toString())) - break - - default: - error('tracker sent invalid action') - break - } - }) - - function send (message) { - if (!parsedUrl.port) { - parsedUrl.port = 80 - } - socket.send(message, 0, message.length, parsedUrl.port, parsedUrl.hostname) - } - - function error (message) { - // errors will often happen if a tracker is offline, so don't treat it as fatal - self.client.emit('warning', new Error(message + ' (' + requestUrl + ')')) - cleanup() - } - - function cleanup () { - if (timeout) { - clearTimeout(timeout) - timeout = null - } - try { socket.close() } catch (err) {} - } - - function genTransactionId () { - transactionId = new Buffer(hat(32), 'hex') - } - - function announce (connectionId, opts) { - opts = opts || {} - genTransactionId() - - send(Buffer.concat([ - connectionId, - common.toUInt32(common.ACTIONS.ANNOUNCE), - transactionId, - self.client._infoHash, - self.client._peerId, - toUInt64(opts.downloaded || 0), - opts.left ? toUInt64(opts.left) : new Buffer('FFFFFFFFFFFFFFFF', 'hex'), - toUInt64(opts.uploaded || 0), - common.toUInt32(common.EVENTS[opts.event] || 0), - common.toUInt32(0), // ip address (optional) - common.toUInt32(0), // key (optional) - common.toUInt32(self.client._numWant), - toUInt16(self.client._port || 0) - ])) - } - - function scrape (connectionId) { - genTransactionId() - - send(Buffer.concat([ - connectionId, - common.toUInt32(common.ACTIONS.SCRAPE), - transactionId, - self.client._infoHash - ])) - } -} - -Tracker.prototype._handleResponse = function (requestUrl, data) { - var self = this - - try { - data = bencode.decode(data) - } catch (err) { - return self.client.emit('warning', new Error('Error decoding tracker response: ' + err.message)) - } - var failure = data['failure reason'] - if (failure) { - debug('failure from ' + requestUrl + ' (' + failure + ')') - return self.client.emit('warning', new Error(failure)) - } - - var warning = data['warning message'] - if (warning) { - debug('warning from ' + requestUrl + ' (' + warning + ')') - self.client.emit('warning', new Error(warning)) - } - - debug('response from ' + requestUrl) - - if (requestUrl === self._announceUrl) { - var interval = data.interval || data['min interval'] - if (interval && !self._opts.interval && self._intervalMs !== 0) { - // use the interval the tracker recommends, UNLESS the user manually specifies an - // interval they want to use - self.setInterval(interval * 1000) - } - - var trackerId = data['tracker id'] - if (trackerId) { - // If absent, do not discard previous trackerId value - self._trackerId = trackerId - } - - self.client.emit('update', { - announce: self._announceUrl, - complete: data.complete, - incomplete: data.incomplete - }) - - var addrs - if (Buffer.isBuffer(data.peers)) { - // tracker returned compact response - try { - addrs = compact2string.multi(data.peers) - } catch (err) { - return self.client.emit('warning', err) - } - addrs.forEach(function (addr) { - self.client.emit('peer', addr) - }) - } else if (Array.isArray(data.peers)) { - // tracker returned normal response - data.peers.forEach(function (peer) { - self.client.emit('peer', peer.ip + ':' + peer.port) - }) - } - - if (Buffer.isBuffer(data.peers6)) { - // tracker returned compact response - try { - addrs = compact2string.multi6(data.peers6) - } catch (err) { - return self.client.emit('warning', err) - } - addrs.forEach(function (addr) { - self.client.emit('peer', addr) - }) - } else if (Array.isArray(data.peers6)) { - // tracker returned normal response - data.peers6.forEach(function (peer) { - var ip = /^\[/.test(peer.ip) || !/:/.test(peer.ip) - ? peer.ip /* ipv6 w/ brackets or domain name */ - : '[' + peer.ip + ']' /* ipv6 without brackets */ - self.client.emit('peer', ip + ':' + peer.port) - }) - } - } else if (requestUrl === self._scrapeUrl) { - // NOTE: the unofficial spec says to use the 'files' key but i've seen 'host' in practice - data = data.files || data.host || {} - data = data[self.client._infoHash.toString('binary')] - - if (!data) { - self.client.emit('warning', new Error('invalid scrape response')) - } else { - // TODO: optionally handle data.flags.min_request_interval (separate from announce interval) - self.client.emit('scrape', { - announce: self._announceUrl, - complete: data.complete, - incomplete: data.incomplete, - downloaded: data.downloaded - }) - } - } -} - -function toUInt16 (n) { - var buf = new Buffer(2) - buf.writeUInt16BE(n, 0) - return buf -} - -var MAX_UINT = 4294967295 - -function toUInt64 (n) { - if (n > MAX_UINT || typeof n === 'string') { - var bytes = new BN(n).toArray() - while (bytes.length < 8) { - bytes.unshift(0) - } - return new Buffer(bytes) - } - return Buffer.concat([common.toUInt32(0), common.toUInt32(n)]) -} - -var UDP_TRACKER = /^udp:\/\// -var HTTP_SCRAPE_SUPPORT = /\/(announce)[^\/]*$/ - -function getScrapeUrl (announceUrl) { - if (announceUrl.match(UDP_TRACKER)) return announceUrl - var match = announceUrl.match(HTTP_SCRAPE_SUPPORT) - if (match) { - var i = match.index - return announceUrl.slice(0, i) + '/scrape' + announceUrl.slice(i + 9) - } - return null + return opts } diff --git a/lib/common.js b/lib/common.js index 5d3eda5..bde4424 100644 --- a/lib/common.js +++ b/lib/common.js @@ -7,7 +7,9 @@ var querystring = require('querystring') exports.IPV4_RE = /^[\d\.]+$/ exports.IPV6_RE = /^[\da-fA-F:]+$/ -exports.NUM_ANNOUNCE_PEERS = 50 +exports.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes + +exports.DEFAULT_ANNOUNCE_PEERS = 50 exports.MAX_ANNOUNCE_PEERS = 82 exports.CONNECTION_ID = Buffer.concat([ toUInt32(0x417), toUInt32(0x27101980) ]) diff --git a/lib/http-tracker.js b/lib/http-tracker.js new file mode 100644 index 0000000..4210990 --- /dev/null +++ b/lib/http-tracker.js @@ -0,0 +1,197 @@ +module.exports = HTTPTracker + +var bencode = require('bencode') +var compact2string = require('compact2string') +var debug = require('debug')('bittorrent-tracker:http-tracker') +var EventEmitter = require('events').EventEmitter +var get = require('simple-get') +var inherits = require('inherits') + +var common = require('./common') + +var HTTP_SCRAPE_SUPPORT = /\/(announce)[^\/]*$/ + +inherits(HTTPTracker, EventEmitter) + +/** + * HTTP torrent tracker client (for an individual tracker) + * + * @param {Client} client parent bittorrent tracker client + * @param {string} announceUrl announce url of tracker + * @param {Object} opts options object + */ +function HTTPTracker (client, announceUrl, opts) { + var self = this + EventEmitter.call(self) + debug('new http tracker %s', announceUrl) + + self.client = client + + self._opts = opts + self._announceUrl = announceUrl + self._intervalMs = self.client._intervalMs // use client interval initially + self._interval = null + + // Determine scrape url (if http tracker supports it) + self._scrapeUrl = null + var m + if ((m = self._announceUrl.match(HTTP_SCRAPE_SUPPORT))) { + self._scrapeUrl = self._announceUrl.slice(0, m.index) + '/scrape' + + self._announceUrl.slice(m.index + 9) + } +} + +HTTPTracker.prototype.announce = function (opts) { + var self = this + if (self._trackerId) opts.trackerid = self._trackerId + + if (opts.compact == null) opts.compact = 1 + if (opts.numwant == null) opts.numwant = self.client._numWant // spec says 'numwant' + + opts.info_hash = self.client._infoHash.toString('binary') + opts.peer_id = self.client._peerId.toString('binary') + opts.port = self.client._port + + self._request(self._announceUrl, opts, self._onAnnounceResponse.bind(self)) +} + +HTTPTracker.prototype.scrape = function (opts) { + var self = this + + if (!self._scrapeUrl) { + self.client.emit('error', new Error('scrape not supported ' + self._announceUrl)) + return + } + + opts.info_hash = self.client._infoHash.toString('binary') + self._request(self._scrapeUrl, opts, self._onScrapeResponse.bind(self)) +} + +// TODO: Improve this interface +HTTPTracker.prototype.setInterval = function (intervalMs) { + var self = this + clearInterval(self._interval) + + self._intervalMs = intervalMs + if (intervalMs) { + // HACK + var update = self.announce.bind(self, self.client._defaultAnnounceOpts()) + self._interval = setInterval(update, self._intervalMs) + } +} + +HTTPTracker.prototype._request = function (requestUrl, opts, cb) { + var self = this + + var u = requestUrl + (requestUrl.indexOf('?') === -1 ? '?' : '&') + + common.querystringStringify(opts) + + get.concat(u, function (err, data, res) { + if (err) return self.client.emit('warning', err) + if (res.statusCode !== 200) return self.client.emit('warning', new Error('Non-200 response code ' + res.statusCode + ' from ' + self.a)) + if (!data || data.length === 0) return + + try { + data = bencode.decode(data) + } catch (err) { + return self.client.emit('warning', new Error('Error decoding tracker response: ' + err.message)) + } + var failure = data['failure reason'] + if (failure) { + debug('failure from ' + requestUrl + ' (' + failure + ')') + return self.client.emit('warning', new Error(failure)) + } + + var warning = data['warning message'] + if (warning) { + debug('warning from ' + requestUrl + ' (' + warning + ')') + self.client.emit('warning', new Error(warning)) + } + + debug('response from ' + requestUrl) + + cb(data) + }) +} + +HTTPTracker.prototype._onAnnounceResponse = function (data) { + var self = this + + var interval = data.interval || data['min interval'] + if (interval && !self._opts.interval && self._intervalMs !== 0) { + // use the interval the tracker recommends, UNLESS the user manually specifies an + // interval they want to use + self.setInterval(interval * 1000) + } + + var trackerId = data['tracker id'] + if (trackerId) { + // If absent, do not discard previous trackerId value + self._trackerId = trackerId + } + + self.client.emit('update', { + announce: self._announceUrl, + complete: data.complete, + incomplete: data.incomplete + }) + + var addrs + if (Buffer.isBuffer(data.peers)) { + // tracker returned compact response + try { + addrs = compact2string.multi(data.peers) + } catch (err) { + return self.client.emit('warning', err) + } + addrs.forEach(function (addr) { + self.client.emit('peer', addr) + }) + } else if (Array.isArray(data.peers)) { + // tracker returned normal response + data.peers.forEach(function (peer) { + self.client.emit('peer', peer.ip + ':' + peer.port) + }) + } + + if (Buffer.isBuffer(data.peers6)) { + // tracker returned compact response + try { + addrs = compact2string.multi6(data.peers6) + } catch (err) { + return self.client.emit('warning', err) + } + addrs.forEach(function (addr) { + self.client.emit('peer', addr) + }) + } else if (Array.isArray(data.peers6)) { + // tracker returned normal response + data.peers6.forEach(function (peer) { + var ip = /^\[/.test(peer.ip) || !/:/.test(peer.ip) + ? peer.ip /* ipv6 w/ brackets or domain name */ + : '[' + peer.ip + ']' /* ipv6 without brackets */ + self.client.emit('peer', ip + ':' + peer.port) + }) + } +} + +HTTPTracker.prototype._onScrapeResponse = function (data) { + var self = this + // NOTE: the unofficial spec says to use the 'files' key, 'host' has been + // seen in practice + data = data.files || data.host || {} + data = data[self.client._infoHash.toString('binary')] + + if (!data) { + self.client.emit('warning', new Error('invalid scrape response')) + } else { + // TODO: optionally handle data.flags.min_request_interval + // (separate from announce interval) + self.client.emit('scrape', { + announce: self._announceUrl, + complete: data.complete, + incomplete: data.incomplete, + downloaded: data.downloaded + }) + } +} diff --git a/lib/parse_http.js b/lib/parse_http.js index bfc0d94..7d06937 100644 --- a/lib/parse_http.js +++ b/lib/parse_http.js @@ -27,7 +27,7 @@ function parseHttpRequest (req, options) { params.left = Number(params.left) params.compact = Number(params.compact) params.numwant = Math.min( - Number(params.numwant) || common.NUM_ANNOUNCE_PEERS, + Number(params.numwant) || common.DEFAULT_ANNOUNCE_PEERS, common.MAX_ANNOUNCE_PEERS ) diff --git a/lib/parse_udp.js b/lib/parse_udp.js index dd6634f..e1819fd 100644 --- a/lib/parse_udp.js +++ b/lib/parse_udp.js @@ -42,7 +42,7 @@ function parseUdpRequest (msg, rinfo) { // never send more than MAX_ANNOUNCE_PEERS or else the UDP packet will get bigger than // 512 bytes which is not safe params.numwant = Math.min( - msg.readUInt32BE(92) || common.NUM_ANNOUNCE_PEERS, // optional + msg.readUInt32BE(92) || common.DEFAULT_ANNOUNCE_PEERS, // optional common.MAX_ANNOUNCE_PEERS ) diff --git a/lib/udp-tracker.js b/lib/udp-tracker.js new file mode 100644 index 0000000..e9e1807 --- /dev/null +++ b/lib/udp-tracker.js @@ -0,0 +1,241 @@ +module.exports = UDPTracker + +var BN = require('bn.js') +var compact2string = require('compact2string') +var debug = require('debug')('bittorrent-tracker:http-tracker') +var dgram = require('dgram') +var EventEmitter = require('events').EventEmitter +var hat = require('hat') +var inherits = require('inherits') +var url = require('url') + +var common = require('./common') + +inherits(UDPTracker, EventEmitter) + +/** + * UDP torrent tracker client (for an individual tracker) + * + * @param {Client} client parent bittorrent tracker client + * @param {string} announceUrl announce url of tracker + * @param {Object} opts options object + */ +function UDPTracker (client, announceUrl, opts) { + var self = this + EventEmitter.call(self) + debug('new udp tracker %s', announceUrl) + + self.client = client + + self._opts = opts + self._announceUrl = announceUrl + self._intervalMs = self.client._intervalMs // use client interval initially + self._interval = null +} + +UDPTracker.prototype.announce = function (opts) { + var self = this + self._request(self._announceUrl, opts) +} + +UDPTracker.prototype.scrape = function (opts) { + var self = this + opts._scrape = true + self._request(self._announceUrl, opts) // udp scrape uses same announce url +} + +UDPTracker.prototype._request = function (requestUrl, opts) { + var self = this + if (!opts) opts = {} + var parsedUrl = url.parse(requestUrl) + var socket = dgram.createSocket('udp4') + var transactionId = genTransactionId() + + // does not matter if `stopped` event arrives, so supress errors & cleanup after timeout + var timeout = setTimeout(function () { + timeout = null + cleanup() + if (opts.event !== 'stopped') { + error('tracker request timed out') + } + }, opts.event === 'stopped' ? 1500 : 15000) + + if (timeout && timeout.unref) { + timeout.unref() + } + + send(Buffer.concat([ + common.CONNECTION_ID, + common.toUInt32(common.ACTIONS.CONNECT), + transactionId + ])) + + socket.on('error', error) + + socket.on('message', function (msg) { + if (msg.length < 8 || msg.readUInt32BE(4) !== transactionId.readUInt32BE(0)) { + return error('tracker sent invalid transaction id') + } + + var action = msg.readUInt32BE(0) + debug(requestUrl + ' UDP response, action ' + action) + switch (action) { + case 0: // handshake + if (msg.length < 16) { + return error('invalid udp handshake') + } + + if (opts._scrape) { + scrape(msg.slice(8, 16)) + } else { + announce(msg.slice(8, 16), opts) + } + + return + + case 1: // announce + cleanup() + if (msg.length < 20) { + return error('invalid announce message') + } + + var interval = msg.readUInt32BE(8) + if (interval && !self._opts.interval && self._intervalMs !== 0) { + // use the interval the tracker recommends, UNLESS the user manually specifies an + // interval they want to use + self.setInterval(interval * 1000) + } + + self.client.emit('update', { + announce: self._announceUrl, + complete: msg.readUInt32BE(16), + incomplete: msg.readUInt32BE(12) + }) + + var addrs + try { + addrs = compact2string.multi(msg.slice(20)) + } catch (err) { + return self.client.emit('warning', err) + } + addrs.forEach(function (addr) { + self.client.emit('peer', addr) + }) + break + + case 2: // scrape + cleanup() + if (msg.length < 20) { + return error('invalid scrape message') + } + self.client.emit('scrape', { + announce: self._announceUrl, + complete: msg.readUInt32BE(8), + downloaded: msg.readUInt32BE(12), + incomplete: msg.readUInt32BE(16) + }) + break + + case 3: // error + cleanup() + if (msg.length < 8) { + return error('invalid error message') + } + self.client.emit('warning', new Error(msg.slice(8).toString())) + break + + default: + error('tracker sent invalid action') + break + } + }) + + function send (message) { + if (!parsedUrl.port) { + parsedUrl.port = 80 + } + socket.send(message, 0, message.length, parsedUrl.port, parsedUrl.hostname) + } + + function error (message) { + // errors will often happen if a tracker is offline, so don't treat it as fatal + self.client.emit('warning', new Error(message + ' (' + requestUrl + ')')) + cleanup() + } + + function cleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null + } + try { socket.close() } catch (err) {} + } + + function announce (connectionId, opts) { + opts = opts || {} + transactionId = genTransactionId() + + send(Buffer.concat([ + connectionId, + common.toUInt32(common.ACTIONS.ANNOUNCE), + transactionId, + self.client._infoHash, + self.client._peerId, + toUInt64(opts.downloaded || 0), + opts.left != null ? toUInt64(opts.left) : new Buffer('FFFFFFFFFFFFFFFF', 'hex'), + toUInt64(opts.uploaded || 0), + common.toUInt32(common.EVENTS[opts.event] || 0), + common.toUInt32(0), // ip address (optional) + common.toUInt32(0), // key (optional) + common.toUInt32(opts.numWant || common.DEFAULT_ANNOUNCE_PEERS), + toUInt16(self.client._port || 0) + ])) + } + + function scrape (connectionId) { + transactionId = genTransactionId() + + send(Buffer.concat([ + connectionId, + common.toUInt32(common.ACTIONS.SCRAPE), + transactionId, + self.client._infoHash + ])) + } +} + +// TODO: Improve this interface +UDPTracker.prototype.setInterval = function (intervalMs) { + var self = this + clearInterval(self._interval) + + self._intervalMs = intervalMs + if (intervalMs) { + // HACK + var update = self.announce.bind(self, self.client._defaultAnnounceOpts()) + self._interval = setInterval(update, self._intervalMs) + } +} + +function genTransactionId () { + return new Buffer(hat(32), 'hex') +} + +function toUInt16 (n) { + var buf = new Buffer(2) + buf.writeUInt16BE(n, 0) + return buf +} + +var MAX_UINT = 4294967295 + +function toUInt64 (n) { + if (n > MAX_UINT || typeof n === 'string') { + var bytes = new BN(n).toArray() + while (bytes.length < 8) { + bytes.unshift(0) + } + return new Buffer(bytes) + } + return Buffer.concat([common.toUInt32(0), common.toUInt32(n)]) +} diff --git a/lib/websocket-tracker.js b/lib/websocket-tracker.js new file mode 100644 index 0000000..e69de29