From 931f0a5a4897a3354ac5d1b42cd1e2fac290b67c Mon Sep 17 00:00:00 2001 From: fisch0920 Date: Fri, 9 May 2014 00:23:14 -0400 Subject: [PATCH] refactored tracker tracker Client into a container for multiple Trackers each with their own, independent update interval and trackerIds --- index.js | 208 +++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 140 insertions(+), 68 deletions(-) diff --git a/index.js b/index.js index 1d6962d..d187802 100644 --- a/index.js +++ b/index.js @@ -1,3 +1,5 @@ +/* vim: set ts=2 sw=2 sts=2 et: */ + exports.Client = Client exports.Server = Server @@ -20,51 +22,54 @@ var ACTIONS = { CONNECT: 0, ANNOUNCE: 1 } var EVENTS = { completed: 1, started: 2, stopped: 3 } var MAX_UINT = 4294967295 -inherits(Client, EventEmitter) +inherits(Tracker, EventEmitter) -function Client (peerId, port, torrent, opts) { +/** + * An individual torrent tracker + * + * @param {Client} client parent bittorrent tracker client + * @param {string} announceUrl announce url of tracker + * @param {Number} interval interval in ms to send announce requests to the tracker + * @param {Object} opts optional options + */ +function Tracker (client, announceUrl, interval, opts) { var self = this - if (!(self instanceof Client)) return new Client(peerId, port, torrent, opts) + if (!(self instanceof Tracker)) return new Tracker(client, announceUrl, interval, opts) EventEmitter.call(self) self._opts = opts || {} - - // required - self._peerId = Buffer.isBuffer(peerId) - ? peerId - : new Buffer(peerId, 'utf8') - self._port = port - self._infoHash = Buffer.isBuffer(torrent.infoHash) - ? torrent.infoHash - : new Buffer(torrent.infoHash, 'hex') - self._torrentLength = torrent.length - self._announce = torrent.announce - - // optional - self._numWant = self._opts.numWant || 80 - self._intervalMs = self._opts.interval || (30 * 60 * 1000) // default: 30 minutes - + + self.client = client + + self._announceUrl = announceUrl + self._intervalMs = interval self._interval = null + + if (self._announceUrl.indexOf('udp:') === 0) { + self._requestImpl = self._requestUdp.bind(self) + } else { + self._requestImpl = self._requestHttp.bind(self) + } } -Client.prototype.start = function (opts) { +Tracker.prototype.start = function (opts) { var self = this opts = opts || {} opts.event = 'started' self._request(opts) - + self.setInterval(self._intervalMs) // start announcing on intervals } -Client.prototype.stop = function (opts) { +Tracker.prototype.stop = function (opts) { var self = this opts = opts || {} opts.event = 'stopped' self._request(opts) - + self.setInterval(0) // stop announcing on intervals } -Client.prototype.complete = function (opts) { +Tracker.prototype.complete = function (opts) { var self = this opts = opts || {} opts.event = 'completed' @@ -72,13 +77,13 @@ Client.prototype.complete = function (opts) { self._request(opts) } -Client.prototype.update = function (opts) { +Tracker.prototype.update = function (opts) { var self = this opts = opts || {} self._request(opts) } -Client.prototype.setInterval = function (intervalMs) { +Tracker.prototype.setInterval = function (intervalMs) { var self = this if (self._interval) { clearInterval(self._interval) @@ -93,59 +98,53 @@ Client.prototype.setInterval = function (intervalMs) { /** * Send a request to the tracker */ -Client.prototype._request = function (opts) { +Tracker.prototype._request = function (opts) { var self = this opts = extend({ - info_hash: bytewiseEncodeURIComponent(self._infoHash), - peer_id: bytewiseEncodeURIComponent(self._peerId), - port: self._port, - left: self._torrentLength - (opts.downloaded || 0), + info_hash: bytewiseEncodeURIComponent(self.client._infoHash), + peer_id: bytewiseEncodeURIComponent(self.client._peerId), + port: self.client._port, + left: self.client._torrentLength - (opts.downloaded || 0), compact: 1, - numwant: self._numWant, + numwant: self.client._numWant, uploaded: 0, // default, user should provide real value downloaded: 0 // default, user should provide real value }, opts) - + if (self._trackerId) { opts.trackerid = self._trackerId } - - self._announce.forEach(function (announceUrl) { - if (announceUrl.indexOf('udp:') === 0) { - self._requestUdp(announceUrl, opts) - } else { - self._requestHttp(announceUrl, opts) - } - }) + + self._requestImpl(opts) } -Client.prototype._requestHttp = function (announceUrl, opts) { +Tracker.prototype._requestHttp = function (opts) { var self = this - var fullUrl = announceUrl + '?' + querystring.stringify(opts) + var fullUrl = self._announceUrl + '?' + querystring.stringify(opts) var req = http.get(fullUrl, function (res) { var data = '' if (res.statusCode !== 200) { res.resume() // consume the whole stream - self.emit('error', new Error('Invalid response code ' + res.statusCode + ' from tracker')) + self.client.emit('error', new Error('Invalid response code ' + res.statusCode + ' from tracker')) return } res.on('data', function (chunk) { data += chunk }) res.on('end', function () { - self._handleResponse(data, announceUrl) + self._handleResponse(data) }) }) req.on('error', function (err) { - self.emit('error', err) + self.client.emit('error', err) }) } -Client.prototype._requestUdp = function (announceUrl, opts) { +Tracker.prototype._requestUdp = function (opts) { var self = this - var parsedUrl = url.parse(announceUrl) + var parsedUrl = url.parse(self._announceUrl) var socket = dgram.createSocket('udp4') var transactionId = new Buffer(hat(32), 'hex') @@ -158,7 +157,7 @@ Client.prototype._requestUdp = function (announceUrl, opts) { } function error (message) { - self.emit('error', new Error(message)) + self.client.emit('error', new Error(message)) socket.close() clearTimeout(timeout) } @@ -187,7 +186,6 @@ Client.prototype._requestUdp = function (announceUrl, opts) { return error(new Error('invalid announce message')) } - // TODO: this should be stored per tracker, not globally for all trackers var interval = message.readUInt32BE(8) if (interval && !self._opts.interval && self._intervalMs !== 0) { // use the interval the tracker recommends, UNLESS the user manually specifies an @@ -195,14 +193,14 @@ Client.prototype._requestUdp = function (announceUrl, opts) { self.setInterval(interval * 1000) } - self.emit('update', { - announce: announceUrl, + self.client.emit('update', { + announce: self._announceUrl, complete: message.readUInt32BE(16), incomplete: message.readUInt32BE(12) }) compact2string.multi(message.slice(20)).forEach(function (addr) { - self.emit('peer', addr) + self.client.emit('peer', addr) }) clearTimeout(timeout) @@ -225,16 +223,16 @@ Client.prototype._requestUdp = function (announceUrl, opts) { connectionId, toUInt32(ACTIONS.ANNOUNCE), transactionId, - self._infoHash, - self._peerId, + self.client._infoHash, + self.client._peerId, toUInt64(opts.downloaded || 0), toUInt64(opts.left || 0), toUInt64(opts.uploaded || 0), toUInt32(EVENTS[opts.event] || 0), toUInt32(0), // ip address (optional) toUInt32(0), // key (optional) - toUInt32(self._numWant), - toUInt16(self._port || 0) + toUInt32(self.client._numWant), + toUInt16(self.client._port || 0) ])) } @@ -245,25 +243,24 @@ Client.prototype._requestUdp = function (announceUrl, opts) { ])) } -Client.prototype._handleResponse = function (data, announceUrl) { +Tracker.prototype._handleResponse = function (data) { var self = this try { data = bncode.decode(data) } catch (err) { - return self.emit('error', new Error('Error decoding tracker response: ' + err.message)) + return self.client.emit('error', new Error('Error decoding tracker response: ' + err.message)) } var failure = data['failure reason'] if (failure) { - return self.emit('error', new Error(failure)) + return self.client.emit('error', new Error(failure)) } var warning = data['warning message'] if (warning) { - self.emit('warning', warning); + self.client.emit('warning', warning); } - // TODO: this should be stored per tracker, not globally for all trackers var interval = data.interval || data['min interval'] if (interval && !self._opts.interval && self._intervalMs !== 0) { // use the interval the tracker recommends, UNLESS the user manually specifies an @@ -271,15 +268,14 @@ Client.prototype._handleResponse = function (data, announceUrl) { self.setInterval(interval * 1000) } - // TODO: this should be stored per tracker, not globally for all trackers var trackerId = data['tracker id'] if (trackerId) { // If absent, do not discard previous trackerId value self._trackerId = trackerId } - self.emit('update', { - announce: announceUrl, + self.client.emit('update', { + announce: self._announceUrl, complete: data.complete, incomplete: data.incomplete }) @@ -287,17 +283,92 @@ Client.prototype._handleResponse = function (data, announceUrl) { if (Buffer.isBuffer(data.peers)) { // tracker returned compact response compact2string.multi(data.peers).forEach(function (addr) { - self.emit('peer', addr) + self.client.emit('peer', addr) }) } else if (Array.isArray(data.peers)) { // tracker returned normal response data.peers.forEach(function (peer) { var ip = peer.ip - self.emit('peer', ip[0] + '.' + ip[1] + '.' + ip[2] + '.' + ip[3] + ':' + peer.port) + self.client.emit('peer', ip[0] + '.' + ip[1] + '.' + ip[2] + '.' + ip[3] + ':' + peer.port) }) } } +inherits(Client, EventEmitter) + +/** + * A Client manages tracker connections for a torrent. + * + * @param {string} peerId this peer's id + * @param {Number} port port number that the client is listening on + * @param {Object} torrent parsed torrent + * @param {Object} opts optional options + * @param {Number} opts.numWant number of peers to request + * @param {Number} opts.interval interval in ms to send announce requests to the tracker + */ +function Client (peerId, port, torrent, opts) { + var self = this + if (!(self instanceof Client)) return new Client(peerId, port, torrent, opts) + EventEmitter.call(self) + self._opts = opts || {} + + // required + self._peerId = Buffer.isBuffer(peerId) + ? peerId + : new Buffer(peerId, 'utf8') + self._port = port + self._infoHash = Buffer.isBuffer(torrent.infoHash) + ? torrent.infoHash + : new Buffer(torrent.infoHash, 'hex') + self._torrentLength = torrent.length + self._announce = torrent.announce + + // optional + self._numWant = self._opts.numWant || 80 + self._intervalMs = self._opts.interval || (30 * 60 * 1000) // default: 30 minutes + + self._trackers = torrent.announce.map(function (announceUrl) { + return Tracker(self, announceUrl, self._intervalMs, self._opts) + }) +} + +Client.prototype.start = function (opts) { + var self = this + self._trackers.forEach(function (tracker) { + tracker.start(opts) + }) +} + +Client.prototype.stop = function (opts) { + var self = this + self._trackers.forEach(function (tracker) { + tracker.stop(opts) + }) +} + +Client.prototype.complete = function (opts) { + var self = this + self._trackers.forEach(function (tracker) { + tracker.complete(opts) + }) +} + +Client.prototype.update = function (opts) { + var self = this + self._trackers.forEach(function (tracker) { + tracker.update(opts) + }) +} + +Client.prototype.setInterval = function (intervalMs) { + var self = this + self._intervalMs = intervalMs + + self._trackers.forEach(function (tracker) { + tracker.setInterval(intervalMs) + }) +} + inherits(Server, EventEmitter) function Server (opts) { @@ -452,7 +523,7 @@ Server.prototype._onHttpRequest = function (req, res) { return error('unexpected `update` event from peer that is not in swarm') } - self.emit('update', addr, params) + self.client.emit('update', addr, params) break default: @@ -476,6 +547,7 @@ Server.prototype._onHttpRequest = function (req, res) { } res.end(bncode.encode(response)) + } else { // TODO: handle unofficial scrape messages } }