From 386e0a5fbe28700154a0551e39177490dfd441dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jimmy=20W=C3=A4rting?= Date: Wed, 3 Oct 2018 14:44:11 +0200 Subject: [PATCH] lebab lib/client --- lib/client/http-tracker.js | 478 +++++++++--------- lib/client/tracker.js | 44 +- lib/client/udp-tracker.js | 495 +++++++++--------- lib/client/websocket-tracker.js | 855 ++++++++++++++++---------------- 4 files changed, 934 insertions(+), 938 deletions(-) diff --git a/lib/client/http-tracker.js b/lib/client/http-tracker.js index 66440a6..656d96d 100644 --- a/lib/client/http-tracker.js +++ b/lib/client/http-tracker.js @@ -1,18 +1,13 @@ -module.exports = HTTPTracker +const arrayRemove = require('unordered-array-remove') +const bencode = require('bencode') +const compact2string = require('compact2string') +const debug = require('debug')('bittorrent-tracker:http-tracker') +const get = require('simple-get') -var arrayRemove = require('unordered-array-remove') -var bencode = require('bencode') -var compact2string = require('compact2string') -var debug = require('debug')('bittorrent-tracker:http-tracker') -var get = require('simple-get') -var inherits = require('inherits') +const common = require('../common') +const Tracker = require('./tracker') -var common = require('../common') -var Tracker = require('./tracker') - -var HTTP_SCRAPE_SUPPORT = /\/(announce)[^/]*$/ - -inherits(HTTPTracker, Tracker) +const HTTP_SCRAPE_SUPPORT = /\/(announce)[^/]*$/ /** * HTTP torrent tracker client (for an individual tracker) @@ -21,238 +16,241 @@ inherits(HTTPTracker, Tracker) * @param {string} announceUrl announce url of tracker * @param {Object} opts options object */ -function HTTPTracker (client, announceUrl, opts) { - var self = this - Tracker.call(self, client, announceUrl) - debug('new http tracker %s', announceUrl) +class HTTPTracker extends Tracker { + constructor (client, announceUrl, opts) { + super(client, announceUrl) - // Determine scrape url (if http tracker supports it) - self.scrapeUrl = null + const self = this + debug('new http tracker %s', announceUrl) - var match = self.announceUrl.match(HTTP_SCRAPE_SUPPORT) - if (match) { - var pre = self.announceUrl.slice(0, match.index) - var post = self.announceUrl.slice(match.index + 9) - self.scrapeUrl = pre + '/scrape' + post + // Determine scrape url (if http tracker supports it) + self.scrapeUrl = null + + const match = self.announceUrl.match(HTTP_SCRAPE_SUPPORT) + if (match) { + const pre = self.announceUrl.slice(0, match.index) + const post = self.announceUrl.slice(match.index + 9) + self.scrapeUrl = `${pre}/scrape${post}` + } + + self.cleanupFns = [] + self.maybeDestroyCleanup = null } - self.cleanupFns = [] - self.maybeDestroyCleanup = null + announce (opts) { + const self = this + if (self.destroyed) return + + const params = Object.assign({}, opts, { + compact: (opts.compact == null) ? 1 : opts.compact, + info_hash: self.client._infoHashBinary, + peer_id: self.client._peerIdBinary, + port: self.client._port + }) + if (self._trackerId) params.trackerid = self._trackerId + + self._request(self.announceUrl, params, (err, data) => { + if (err) return self.client.emit('warning', err) + self._onAnnounceResponse(data) + }) + } + + scrape (opts) { + const self = this + if (self.destroyed) return + + if (!self.scrapeUrl) { + self.client.emit('error', new Error(`scrape not supported ${self.announceUrl}`)) + return + } + + const infoHashes = (Array.isArray(opts.infoHash) && opts.infoHash.length > 0) + ? opts.infoHash.map(infoHash => { + return infoHash.toString('binary') + }) + : (opts.infoHash && opts.infoHash.toString('binary')) || self.client._infoHashBinary + const params = { + info_hash: infoHashes + } + self._request(self.scrapeUrl, params, (err, data) => { + if (err) return self.client.emit('warning', err) + self._onScrapeResponse(data) + }) + } + + destroy (cb) { + const self = this + if (self.destroyed) return cb(null) + self.destroyed = true + clearInterval(self.interval) + + // If there are no pending requests, destroy immediately. + if (self.cleanupFns.length === 0) return destroyCleanup() + + // Otherwise, wait a short time for pending requests to complete, then force + // destroy them. + var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) + + // But, if all pending requests complete before the timeout fires, do cleanup + // right away. + self.maybeDestroyCleanup = () => { + if (self.cleanupFns.length === 0) destroyCleanup() + } + + function destroyCleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null + } + self.maybeDestroyCleanup = null + self.cleanupFns.slice(0).forEach(cleanup => { + cleanup() + }) + self.cleanupFns = [] + cb(null) + } + } + + _request (requestUrl, params, cb) { + const self = this + const u = requestUrl + (!requestUrl.includes('?') ? '?' : '&') + + common.querystringStringify(params) + + self.cleanupFns.push(cleanup) + + let request = get.concat({ + url: u, + timeout: common.REQUEST_TIMEOUT, + headers: { + 'user-agent': self.client._userAgent || '' + } + }, onResponse) + + function cleanup () { + if (request) { + arrayRemove(self.cleanupFns, self.cleanupFns.indexOf(cleanup)) + request.abort() + request = null + } + if (self.maybeDestroyCleanup) self.maybeDestroyCleanup() + } + + function onResponse (err, res, data) { + cleanup() + if (self.destroyed) return + + if (err) return cb(err) + if (res.statusCode !== 200) { + return cb(new Error(`Non-200 response code ${res.statusCode} from ${self.announceUrl}`)) + } + if (!data || data.length === 0) { + return cb(new Error(`Invalid tracker response from${self.announceUrl}`)) + } + + try { + data = bencode.decode(data) + } catch (err) { + return cb(new Error(`Error decoding tracker response: ${err.message}`)) + } + const failure = data['failure reason'] + if (failure) { + debug(`failure from ${requestUrl} (${failure})`) + return cb(new Error(failure)) + } + + const warning = data['warning message'] + if (warning) { + debug(`warning from ${requestUrl} (${warning})`) + self.client.emit('warning', new Error(warning)) + } + + debug(`response from ${requestUrl}`) + + cb(null, data) + } + } + + _onAnnounceResponse (data) { + const self = this + + const interval = data.interval || data['min interval'] + if (interval) self.setInterval(interval * 1000) + + const trackerId = data['tracker id'] + if (trackerId) { + // If absent, do not discard previous trackerId value + self._trackerId = trackerId + } + + const response = Object.assign({}, data, { + announce: self.announceUrl, + infoHash: common.binaryToHex(data.info_hash) + }) + self.client.emit('update', response) + + let addrs + if (Buffer.isBuffer(data.peers)) { + // tracker returned compact response + try { + addrs = compact2string.multi(data.peers) + } catch (err) { + return self.client.emit('warning', err) + } + addrs.forEach(addr => { + self.client.emit('peer', addr) + }) + } else if (Array.isArray(data.peers)) { + // tracker returned normal response + data.peers.forEach(peer => { + self.client.emit('peer', `${peer.ip}:${peer.port}`) + }) + } + + if (Buffer.isBuffer(data.peers6)) { + // tracker returned compact response + try { + addrs = compact2string.multi6(data.peers6) + } catch (err) { + return self.client.emit('warning', err) + } + addrs.forEach(addr => { + self.client.emit('peer', addr) + }) + } else if (Array.isArray(data.peers6)) { + // tracker returned normal response + data.peers6.forEach(peer => { + const ip = /^\[/.test(peer.ip) || !/:/.test(peer.ip) + ? peer.ip /* ipv6 w/ brackets or domain name */ + : `[${peer.ip}]` /* ipv6 without brackets */ + self.client.emit('peer', `${ip}:${peer.port}`) + }) + } + } + + _onScrapeResponse (data) { + const self = this + // NOTE: the unofficial spec says to use the 'files' key, 'host' has been + // seen in practice + data = data.files || data.host || {} + + const keys = Object.keys(data) + if (keys.length === 0) { + self.client.emit('warning', new Error('invalid scrape response')) + return + } + + keys.forEach(infoHash => { + // TODO: optionally handle data.flags.min_request_interval + // (separate from announce interval) + const response = Object.assign(data[infoHash], { + announce: self.announceUrl, + infoHash: common.binaryToHex(infoHash) + }) + self.client.emit('scrape', response) + }) + } } HTTPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes -HTTPTracker.prototype.announce = function (opts) { - var self = this - if (self.destroyed) return - - var params = Object.assign({}, opts, { - compact: (opts.compact == null) ? 1 : opts.compact, - info_hash: self.client._infoHashBinary, - peer_id: self.client._peerIdBinary, - port: self.client._port - }) - if (self._trackerId) params.trackerid = self._trackerId - - self._request(self.announceUrl, params, function (err, data) { - if (err) return self.client.emit('warning', err) - self._onAnnounceResponse(data) - }) -} - -HTTPTracker.prototype.scrape = function (opts) { - var self = this - if (self.destroyed) return - - if (!self.scrapeUrl) { - self.client.emit('error', new Error('scrape not supported ' + self.announceUrl)) - return - } - - var infoHashes = (Array.isArray(opts.infoHash) && opts.infoHash.length > 0) - ? opts.infoHash.map(function (infoHash) { - return infoHash.toString('binary') - }) - : (opts.infoHash && opts.infoHash.toString('binary')) || self.client._infoHashBinary - var params = { - info_hash: infoHashes - } - self._request(self.scrapeUrl, params, function (err, data) { - if (err) return self.client.emit('warning', err) - self._onScrapeResponse(data) - }) -} - -HTTPTracker.prototype.destroy = function (cb) { - var self = this - if (self.destroyed) return cb(null) - self.destroyed = true - clearInterval(self.interval) - - // If there are no pending requests, destroy immediately. - if (self.cleanupFns.length === 0) return destroyCleanup() - - // Otherwise, wait a short time for pending requests to complete, then force - // destroy them. - var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) - - // But, if all pending requests complete before the timeout fires, do cleanup - // right away. - self.maybeDestroyCleanup = function () { - if (self.cleanupFns.length === 0) destroyCleanup() - } - - function destroyCleanup () { - if (timeout) { - clearTimeout(timeout) - timeout = null - } - self.maybeDestroyCleanup = null - self.cleanupFns.slice(0).forEach(function (cleanup) { - cleanup() - }) - self.cleanupFns = [] - cb(null) - } -} - -HTTPTracker.prototype._request = function (requestUrl, params, cb) { - var self = this - var u = requestUrl + (requestUrl.indexOf('?') === -1 ? '?' : '&') + - common.querystringStringify(params) - - self.cleanupFns.push(cleanup) - - var request = get.concat({ - url: u, - timeout: common.REQUEST_TIMEOUT, - headers: { - 'user-agent': self.client._userAgent || '' - } - }, onResponse) - - function cleanup () { - if (request) { - arrayRemove(self.cleanupFns, self.cleanupFns.indexOf(cleanup)) - request.abort() - request = null - } - if (self.maybeDestroyCleanup) self.maybeDestroyCleanup() - } - - function onResponse (err, res, data) { - cleanup() - if (self.destroyed) return - - if (err) return cb(err) - if (res.statusCode !== 200) { - return cb(new Error('Non-200 response code ' + - res.statusCode + ' from ' + self.announceUrl)) - } - if (!data || data.length === 0) { - return cb(new Error('Invalid tracker response from' + - self.announceUrl)) - } - - try { - data = bencode.decode(data) - } catch (err) { - return cb(new Error('Error decoding tracker response: ' + err.message)) - } - var failure = data['failure reason'] - if (failure) { - debug('failure from ' + requestUrl + ' (' + failure + ')') - return cb(new Error(failure)) - } - - var warning = data['warning message'] - if (warning) { - debug('warning from ' + requestUrl + ' (' + warning + ')') - self.client.emit('warning', new Error(warning)) - } - - debug('response from ' + requestUrl) - - cb(null, data) - } -} - -HTTPTracker.prototype._onAnnounceResponse = function (data) { - var self = this - - var interval = data.interval || data['min interval'] - if (interval) self.setInterval(interval * 1000) - - var trackerId = data['tracker id'] - if (trackerId) { - // If absent, do not discard previous trackerId value - self._trackerId = trackerId - } - - var response = Object.assign({}, data, { - announce: self.announceUrl, - infoHash: common.binaryToHex(data.info_hash) - }) - self.client.emit('update', response) - - var addrs - if (Buffer.isBuffer(data.peers)) { - // tracker returned compact response - try { - addrs = compact2string.multi(data.peers) - } catch (err) { - return self.client.emit('warning', err) - } - addrs.forEach(function (addr) { - self.client.emit('peer', addr) - }) - } else if (Array.isArray(data.peers)) { - // tracker returned normal response - data.peers.forEach(function (peer) { - self.client.emit('peer', peer.ip + ':' + peer.port) - }) - } - - if (Buffer.isBuffer(data.peers6)) { - // tracker returned compact response - try { - addrs = compact2string.multi6(data.peers6) - } catch (err) { - return self.client.emit('warning', err) - } - addrs.forEach(function (addr) { - self.client.emit('peer', addr) - }) - } else if (Array.isArray(data.peers6)) { - // tracker returned normal response - data.peers6.forEach(function (peer) { - var ip = /^\[/.test(peer.ip) || !/:/.test(peer.ip) - ? peer.ip /* ipv6 w/ brackets or domain name */ - : '[' + peer.ip + ']' /* ipv6 without brackets */ - self.client.emit('peer', ip + ':' + peer.port) - }) - } -} - -HTTPTracker.prototype._onScrapeResponse = function (data) { - var self = this - // NOTE: the unofficial spec says to use the 'files' key, 'host' has been - // seen in practice - data = data.files || data.host || {} - - var keys = Object.keys(data) - if (keys.length === 0) { - self.client.emit('warning', new Error('invalid scrape response')) - return - } - - keys.forEach(function (infoHash) { - // TODO: optionally handle data.flags.min_request_interval - // (separate from announce interval) - var response = Object.assign(data[infoHash], { - announce: self.announceUrl, - infoHash: common.binaryToHex(infoHash) - }) - self.client.emit('scrape', response) - }) -} +module.exports = HTTPTracker diff --git a/lib/client/tracker.js b/lib/client/tracker.js index 73de2cd..cd3175e 100644 --- a/lib/client/tracker.js +++ b/lib/client/tracker.js @@ -1,30 +1,30 @@ -module.exports = Tracker +const EventEmitter = require('events') -var EventEmitter = require('events').EventEmitter -var inherits = require('inherits') +class Tracker extends EventEmitter { + constructor (client, announceUrl) { + super() -inherits(Tracker, EventEmitter) + const self = this + self.client = client + self.announceUrl = announceUrl -function Tracker (client, announceUrl) { - var self = this - EventEmitter.call(self) - self.client = client - self.announceUrl = announceUrl + self.interval = null + self.destroyed = false + } - self.interval = null - self.destroyed = false -} + setInterval (intervalMs) { + const self = this + if (intervalMs == null) intervalMs = self.DEFAULT_ANNOUNCE_INTERVAL -Tracker.prototype.setInterval = function (intervalMs) { - var self = this - if (intervalMs == null) intervalMs = self.DEFAULT_ANNOUNCE_INTERVAL + clearInterval(self.interval) - clearInterval(self.interval) - - if (intervalMs) { - self.interval = setInterval(function () { - self.announce(self.client._defaultAnnounceOpts()) - }, intervalMs) - if (self.interval.unref) self.interval.unref() + if (intervalMs) { + self.interval = setInterval(() => { + self.announce(self.client._defaultAnnounceOpts()) + }, intervalMs) + if (self.interval.unref) self.interval.unref() + } } } + +module.exports = Tracker diff --git a/lib/client/udp-tracker.js b/lib/client/udp-tracker.js index 3198edc..7d0f901 100644 --- a/lib/client/udp-tracker.js +++ b/lib/client/udp-tracker.js @@ -1,19 +1,14 @@ -module.exports = UDPTracker +const arrayRemove = require('unordered-array-remove') +const BN = require('bn.js') +const Buffer = require('safe-buffer').Buffer +const compact2string = require('compact2string') +const debug = require('debug')('bittorrent-tracker:udp-tracker') +const dgram = require('dgram') +const randombytes = require('randombytes') +const url = require('url') -var arrayRemove = require('unordered-array-remove') -var BN = require('bn.js') -var Buffer = require('safe-buffer').Buffer -var compact2string = require('compact2string') -var debug = require('debug')('bittorrent-tracker:udp-tracker') -var dgram = require('dgram') -var inherits = require('inherits') -var randombytes = require('randombytes') -var url = require('url') - -var common = require('../common') -var Tracker = require('./tracker') - -inherits(UDPTracker, Tracker) +const common = require('../common') +const Tracker = require('./tracker') /** * UDP torrent tracker client (for an individual tracker) @@ -22,256 +17,258 @@ inherits(UDPTracker, Tracker) * @param {string} announceUrl announce url of tracker * @param {Object} opts options object */ -function UDPTracker (client, announceUrl, opts) { - var self = this - Tracker.call(self, client, announceUrl) - debug('new udp tracker %s', announceUrl) +class UDPTracker extends Tracker { + constructor (client, announceUrl, opts) { + super(client, announceUrl) + const self = this + debug('new udp tracker %s', announceUrl) - self.cleanupFns = [] - self.maybeDestroyCleanup = null + self.cleanupFns = [] + self.maybeDestroyCleanup = null + } + + announce (opts) { + const self = this + if (self.destroyed) return + self._request(opts) + } + + scrape (opts) { + const self = this + if (self.destroyed) return + opts._scrape = true + self._request(opts) // udp scrape uses same announce url + } + + destroy (cb) { + const self = this + if (self.destroyed) return cb(null) + self.destroyed = true + clearInterval(self.interval) + + // If there are no pending requests, destroy immediately. + if (self.cleanupFns.length === 0) return destroyCleanup() + + // Otherwise, wait a short time for pending requests to complete, then force + // destroy them. + var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) + + // But, if all pending requests complete before the timeout fires, do cleanup + // right away. + self.maybeDestroyCleanup = () => { + if (self.cleanupFns.length === 0) destroyCleanup() + } + + function destroyCleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null + } + self.maybeDestroyCleanup = null + self.cleanupFns.slice(0).forEach(cleanup => { + cleanup() + }) + self.cleanupFns = [] + cb(null) + } + } + + _request (opts) { + const self = this + if (!opts) opts = {} + const parsedUrl = url.parse(self.announceUrl) + let transactionId = genTransactionId() + let socket = dgram.createSocket('udp4') + + let timeout = setTimeout(() => { + // does not matter if `stopped` event arrives, so supress errors + if (opts.event === 'stopped') cleanup() + else onError(new Error(`tracker request timed out (${opts.event})`)) + timeout = null + }, common.REQUEST_TIMEOUT) + if (timeout.unref) timeout.unref() + + self.cleanupFns.push(cleanup) + + send(Buffer.concat([ + common.CONNECTION_ID, + common.toUInt32(common.ACTIONS.CONNECT), + transactionId + ])) + + socket.once('error', onError) + socket.on('message', onSocketMessage) + + function cleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null + } + if (socket) { + arrayRemove(self.cleanupFns, self.cleanupFns.indexOf(cleanup)) + socket.removeListener('error', onError) + socket.removeListener('message', onSocketMessage) + socket.on('error', noop) // ignore all future errors + try { socket.close() } catch (err) {} + socket = null + } + if (self.maybeDestroyCleanup) self.maybeDestroyCleanup() + } + + function onError (err) { + cleanup() + if (self.destroyed) return + + if (err.message) err.message += ` (${self.announceUrl})` + // errors will often happen if a tracker is offline, so don't treat it as fatal + self.client.emit('warning', err) + } + + function onSocketMessage (msg) { + if (msg.length < 8 || msg.readUInt32BE(4) !== transactionId.readUInt32BE(0)) { + return onError(new Error('tracker sent invalid transaction id')) + } + + const action = msg.readUInt32BE(0) + debug('UDP response %s, action %s', self.announceUrl, action) + switch (action) { + case 0: // handshake + // Note: no check for `self.destroyed` so that pending messages to the + // tracker can still be sent/received even after destroy() is called + + if (msg.length < 16) return onError(new Error('invalid udp handshake')) + + if (opts._scrape) scrape(msg.slice(8, 16)) + else announce(msg.slice(8, 16), opts) + + break + + case 1: // announce + cleanup() + if (self.destroyed) return + + if (msg.length < 20) return onError(new Error('invalid announce message')) + + const interval = msg.readUInt32BE(8) + if (interval) self.setInterval(interval * 1000) + + self.client.emit('update', { + announce: self.announceUrl, + complete: msg.readUInt32BE(16), + incomplete: msg.readUInt32BE(12) + }) + + let addrs + try { + addrs = compact2string.multi(msg.slice(20)) + } catch (err) { + return self.client.emit('warning', err) + } + addrs.forEach(addr => { + self.client.emit('peer', addr) + }) + + break + + case 2: // scrape + cleanup() + if (self.destroyed) return + + if (msg.length < 20 || (msg.length - 8) % 12 !== 0) { + return onError(new Error('invalid scrape message')) + } + const infoHashes = (Array.isArray(opts.infoHash) && opts.infoHash.length > 0) + ? opts.infoHash.map(infoHash => { return infoHash.toString('hex') }) + : [ (opts.infoHash && opts.infoHash.toString('hex')) || self.client.infoHash ] + + for (let i = 0, len = (msg.length - 8) / 12; i < len; i += 1) { + self.client.emit('scrape', { + announce: self.announceUrl, + infoHash: infoHashes[i], + complete: msg.readUInt32BE(8 + (i * 12)), + downloaded: msg.readUInt32BE(12 + (i * 12)), + incomplete: msg.readUInt32BE(16 + (i * 12)) + }) + } + + break + + case 3: // error + cleanup() + if (self.destroyed) return + + if (msg.length < 8) return onError(new Error('invalid error message')) + self.client.emit('warning', new Error(msg.slice(8).toString())) + + break + + default: + onError(new Error('tracker sent invalid action')) + break + } + } + + function send (message) { + if (!parsedUrl.port) { + parsedUrl.port = 80 + } + socket.send(message, 0, message.length, parsedUrl.port, parsedUrl.hostname) + } + + function announce (connectionId, opts) { + transactionId = genTransactionId() + + send(Buffer.concat([ + connectionId, + common.toUInt32(common.ACTIONS.ANNOUNCE), + transactionId, + self.client._infoHashBuffer, + self.client._peerIdBuffer, + toUInt64(opts.downloaded), + opts.left != null ? toUInt64(opts.left) : Buffer.from('FFFFFFFFFFFFFFFF', 'hex'), + toUInt64(opts.uploaded), + common.toUInt32(common.EVENTS[opts.event] || 0), + common.toUInt32(0), // ip address (optional) + common.toUInt32(0), // key (optional) + common.toUInt32(opts.numwant), + toUInt16(self.client._port) + ])) + } + + function scrape (connectionId) { + transactionId = genTransactionId() + + const infoHash = (Array.isArray(opts.infoHash) && opts.infoHash.length > 0) + ? Buffer.concat(opts.infoHash) + : (opts.infoHash || self.client._infoHashBuffer) + + send(Buffer.concat([ + connectionId, + common.toUInt32(common.ACTIONS.SCRAPE), + transactionId, + infoHash + ])) + } + } } UDPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes -UDPTracker.prototype.announce = function (opts) { - var self = this - if (self.destroyed) return - self._request(opts) -} - -UDPTracker.prototype.scrape = function (opts) { - var self = this - if (self.destroyed) return - opts._scrape = true - self._request(opts) // udp scrape uses same announce url -} - -UDPTracker.prototype.destroy = function (cb) { - var self = this - if (self.destroyed) return cb(null) - self.destroyed = true - clearInterval(self.interval) - - // If there are no pending requests, destroy immediately. - if (self.cleanupFns.length === 0) return destroyCleanup() - - // Otherwise, wait a short time for pending requests to complete, then force - // destroy them. - var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) - - // But, if all pending requests complete before the timeout fires, do cleanup - // right away. - self.maybeDestroyCleanup = function () { - if (self.cleanupFns.length === 0) destroyCleanup() - } - - function destroyCleanup () { - if (timeout) { - clearTimeout(timeout) - timeout = null - } - self.maybeDestroyCleanup = null - self.cleanupFns.slice(0).forEach(function (cleanup) { - cleanup() - }) - self.cleanupFns = [] - cb(null) - } -} - -UDPTracker.prototype._request = function (opts) { - var self = this - if (!opts) opts = {} - var parsedUrl = url.parse(self.announceUrl) - var transactionId = genTransactionId() - var socket = dgram.createSocket('udp4') - - var timeout = setTimeout(function () { - // does not matter if `stopped` event arrives, so supress errors - if (opts.event === 'stopped') cleanup() - else onError(new Error('tracker request timed out (' + opts.event + ')')) - timeout = null - }, common.REQUEST_TIMEOUT) - if (timeout.unref) timeout.unref() - - self.cleanupFns.push(cleanup) - - send(Buffer.concat([ - common.CONNECTION_ID, - common.toUInt32(common.ACTIONS.CONNECT), - transactionId - ])) - - socket.once('error', onError) - socket.on('message', onSocketMessage) - - function cleanup () { - if (timeout) { - clearTimeout(timeout) - timeout = null - } - if (socket) { - arrayRemove(self.cleanupFns, self.cleanupFns.indexOf(cleanup)) - socket.removeListener('error', onError) - socket.removeListener('message', onSocketMessage) - socket.on('error', noop) // ignore all future errors - try { socket.close() } catch (err) {} - socket = null - } - if (self.maybeDestroyCleanup) self.maybeDestroyCleanup() - } - - function onError (err) { - cleanup() - if (self.destroyed) return - - if (err.message) err.message += ' (' + self.announceUrl + ')' - // errors will often happen if a tracker is offline, so don't treat it as fatal - self.client.emit('warning', err) - } - - function onSocketMessage (msg) { - if (msg.length < 8 || msg.readUInt32BE(4) !== transactionId.readUInt32BE(0)) { - return onError(new Error('tracker sent invalid transaction id')) - } - - var action = msg.readUInt32BE(0) - debug('UDP response %s, action %s', self.announceUrl, action) - switch (action) { - case 0: // handshake - // Note: no check for `self.destroyed` so that pending messages to the - // tracker can still be sent/received even after destroy() is called - - if (msg.length < 16) return onError(new Error('invalid udp handshake')) - - if (opts._scrape) scrape(msg.slice(8, 16)) - else announce(msg.slice(8, 16), opts) - - break - - case 1: // announce - cleanup() - if (self.destroyed) return - - if (msg.length < 20) return onError(new Error('invalid announce message')) - - var interval = msg.readUInt32BE(8) - if (interval) self.setInterval(interval * 1000) - - self.client.emit('update', { - announce: self.announceUrl, - complete: msg.readUInt32BE(16), - incomplete: msg.readUInt32BE(12) - }) - - var addrs - try { - addrs = compact2string.multi(msg.slice(20)) - } catch (err) { - return self.client.emit('warning', err) - } - addrs.forEach(function (addr) { - self.client.emit('peer', addr) - }) - - break - - case 2: // scrape - cleanup() - if (self.destroyed) return - - if (msg.length < 20 || (msg.length - 8) % 12 !== 0) { - return onError(new Error('invalid scrape message')) - } - var infoHashes = (Array.isArray(opts.infoHash) && opts.infoHash.length > 0) - ? opts.infoHash.map(function (infoHash) { return infoHash.toString('hex') }) - : [ (opts.infoHash && opts.infoHash.toString('hex')) || self.client.infoHash ] - - for (var i = 0, len = (msg.length - 8) / 12; i < len; i += 1) { - self.client.emit('scrape', { - announce: self.announceUrl, - infoHash: infoHashes[i], - complete: msg.readUInt32BE(8 + (i * 12)), - downloaded: msg.readUInt32BE(12 + (i * 12)), - incomplete: msg.readUInt32BE(16 + (i * 12)) - }) - } - - break - - case 3: // error - cleanup() - if (self.destroyed) return - - if (msg.length < 8) return onError(new Error('invalid error message')) - self.client.emit('warning', new Error(msg.slice(8).toString())) - - break - - default: - onError(new Error('tracker sent invalid action')) - break - } - } - - function send (message) { - if (!parsedUrl.port) { - parsedUrl.port = 80 - } - socket.send(message, 0, message.length, parsedUrl.port, parsedUrl.hostname) - } - - function announce (connectionId, opts) { - transactionId = genTransactionId() - - send(Buffer.concat([ - connectionId, - common.toUInt32(common.ACTIONS.ANNOUNCE), - transactionId, - self.client._infoHashBuffer, - self.client._peerIdBuffer, - toUInt64(opts.downloaded), - opts.left != null ? toUInt64(opts.left) : Buffer.from('FFFFFFFFFFFFFFFF', 'hex'), - toUInt64(opts.uploaded), - common.toUInt32(common.EVENTS[opts.event] || 0), - common.toUInt32(0), // ip address (optional) - common.toUInt32(0), // key (optional) - common.toUInt32(opts.numwant), - toUInt16(self.client._port) - ])) - } - - function scrape (connectionId) { - transactionId = genTransactionId() - - var infoHash = (Array.isArray(opts.infoHash) && opts.infoHash.length > 0) - ? Buffer.concat(opts.infoHash) - : (opts.infoHash || self.client._infoHashBuffer) - - send(Buffer.concat([ - connectionId, - common.toUInt32(common.ACTIONS.SCRAPE), - transactionId, - infoHash - ])) - } -} - function genTransactionId () { return randombytes(4) } function toUInt16 (n) { - var buf = Buffer.allocUnsafe(2) + const buf = Buffer.allocUnsafe(2) buf.writeUInt16BE(n, 0) return buf } -var MAX_UINT = 4294967295 +const MAX_UINT = 4294967295 function toUInt64 (n) { if (n > MAX_UINT || typeof n === 'string') { - var bytes = new BN(n).toArray() + const bytes = new BN(n).toArray() while (bytes.length < 8) { bytes.unshift(0) } @@ -281,3 +278,5 @@ function toUInt64 (n) { } function noop () {} + +module.exports = UDPTracker diff --git a/lib/client/websocket-tracker.js b/lib/client/websocket-tracker.js index 7e30292..f0294a0 100644 --- a/lib/client/websocket-tracker.js +++ b/lib/client/websocket-tracker.js @@ -1,445 +1,444 @@ -module.exports = WebSocketTracker +const debug = require('debug')('bittorrent-tracker:websocket-tracker') +const Peer = require('simple-peer') +const randombytes = require('randombytes') +const Socket = require('simple-websocket') -var debug = require('debug')('bittorrent-tracker:websocket-tracker') -var inherits = require('inherits') -var Peer = require('simple-peer') -var randombytes = require('randombytes') -var Socket = require('simple-websocket') - -var common = require('../common') -var Tracker = require('./tracker') +const common = require('../common') +const Tracker = require('./tracker') // Use a socket pool, so tracker clients share WebSocket objects for the same server. // In practice, WebSockets are pretty slow to establish, so this gives a nice performance // boost, and saves browser resources. -var socketPool = {} -// Normally this shouldn't be accessed but is occasionally useful -WebSocketTracker._socketPool = socketPool +const socketPool = {} -var RECONNECT_MINIMUM = 15 * 1000 -var RECONNECT_MAXIMUM = 30 * 60 * 1000 -var RECONNECT_VARIANCE = 30 * 1000 -var OFFER_TIMEOUT = 50 * 1000 +const RECONNECT_MINIMUM = 15 * 1000 +const RECONNECT_MAXIMUM = 30 * 60 * 1000 +const RECONNECT_VARIANCE = 30 * 1000 +const OFFER_TIMEOUT = 50 * 1000 -inherits(WebSocketTracker, Tracker) +class WebSocketTracker extends Tracker { + constructor (client, announceUrl, opts) { + super(client, announceUrl) + const self = this + debug('new websocket tracker %s', announceUrl) -function WebSocketTracker (client, announceUrl, opts) { - var self = this - Tracker.call(self, client, announceUrl) - debug('new websocket tracker %s', announceUrl) + self.peers = {} // peers (offer id -> peer) + self.socket = null - self.peers = {} // peers (offer id -> peer) - self.socket = null + self.reconnecting = false + self.retries = 0 + self.reconnectTimer = null - self.reconnecting = false - self.retries = 0 - self.reconnectTimer = null + // Simple boolean flag to track whether the socket has received data from + // the websocket server since the last time socket.send() was called. + self.expectingResponse = false - // Simple boolean flag to track whether the socket has received data from - // the websocket server since the last time socket.send() was called. - self.expectingResponse = false + self._openSocket() + } - self._openSocket() + announce (opts) { + const self = this + if (self.destroyed || self.reconnecting) return + if (!self.socket.connected) { + self.socket.once('connect', () => { + self.announce(opts) + }) + return + } + + const params = Object.assign({}, opts, { + action: 'announce', + info_hash: self.client._infoHashBinary, + peer_id: self.client._peerIdBinary + }) + if (self._trackerId) params.trackerid = self._trackerId + + if (opts.event === 'stopped' || opts.event === 'completed') { + // Don't include offers with 'stopped' or 'completed' event + self._send(params) + } else { + // Limit the number of offers that are generated, since it can be slow + const numwant = Math.min(opts.numwant, 10) + + self._generateOffers(numwant, offers => { + params.numwant = numwant + params.offers = offers + self._send(params) + }) + } + } + + scrape (opts) { + const self = this + if (self.destroyed || self.reconnecting) return + if (!self.socket.connected) { + self.socket.once('connect', () => { + self.scrape(opts) + }) + return + } + + const infoHashes = (Array.isArray(opts.infoHash) && opts.infoHash.length > 0) + ? opts.infoHash.map(infoHash => { + return infoHash.toString('binary') + }) + : (opts.infoHash && opts.infoHash.toString('binary')) || self.client._infoHashBinary + const params = { + action: 'scrape', + info_hash: infoHashes + } + + self._send(params) + } + + destroy (cb) { + const self = this + if (!cb) cb = noop + if (self.destroyed) return cb(null) + + self.destroyed = true + + clearInterval(self.interval) + clearTimeout(self.reconnectTimer) + + // Destroy peers + for (const peerId in self.peers) { + const peer = self.peers[peerId] + clearTimeout(peer.trackerTimeout) + peer.destroy() + } + self.peers = null + + if (self.socket) { + self.socket.removeListener('connect', self._onSocketConnectBound) + self.socket.removeListener('data', self._onSocketDataBound) + self.socket.removeListener('close', self._onSocketCloseBound) + self.socket.removeListener('error', self._onSocketErrorBound) + self.socket = null + } + + self._onSocketConnectBound = null + self._onSocketErrorBound = null + self._onSocketDataBound = null + self._onSocketCloseBound = null + + if (socketPool[self.announceUrl]) { + socketPool[self.announceUrl].consumers -= 1 + } + + // Other instances are using the socket, so there's nothing left to do here + if (socketPool[self.announceUrl].consumers > 0) return cb() + + let socket = socketPool[self.announceUrl] + delete socketPool[self.announceUrl] + socket.on('error', noop) // ignore all future errors + socket.once('close', cb) + + // If there is no data response expected, destroy immediately. + if (!self.expectingResponse) return destroyCleanup() + + // Otherwise, wait a short time for potential responses to come in from the + // server, then force close the socket. + var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) + + // But, if a response comes from the server before the timeout fires, do cleanup + // right away. + socket.once('data', destroyCleanup) + + function destroyCleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null + } + socket.removeListener('data', destroyCleanup) + socket.destroy() + socket = null + } + } + + _openSocket () { + const self = this + self.destroyed = false + + if (!self.peers) self.peers = {} + + self._onSocketConnectBound = () => { + self._onSocketConnect() + } + self._onSocketErrorBound = err => { + self._onSocketError(err) + } + self._onSocketDataBound = data => { + self._onSocketData(data) + } + self._onSocketCloseBound = () => { + self._onSocketClose() + } + + self.socket = socketPool[self.announceUrl] + if (self.socket) { + socketPool[self.announceUrl].consumers += 1 + } else { + self.socket = socketPool[self.announceUrl] = new Socket(self.announceUrl) + self.socket.consumers = 1 + self.socket.once('connect', self._onSocketConnectBound) + } + + self.socket.on('data', self._onSocketDataBound) + self.socket.once('close', self._onSocketCloseBound) + self.socket.once('error', self._onSocketErrorBound) + } + + _onSocketConnect () { + const self = this + if (self.destroyed) return + + if (self.reconnecting) { + self.reconnecting = false + self.retries = 0 + self.announce(self.client._defaultAnnounceOpts()) + } + } + + _onSocketData (data) { + const self = this + if (self.destroyed) return + + self.expectingResponse = false + + try { + data = JSON.parse(data) + } catch (err) { + self.client.emit('warning', new Error('Invalid tracker response')) + return + } + + if (data.action === 'announce') { + self._onAnnounceResponse(data) + } else if (data.action === 'scrape') { + self._onScrapeResponse(data) + } else { + self._onSocketError(new Error(`invalid action in WS response: ${data.action}`)) + } + } + + _onAnnounceResponse (data) { + const self = this + + if (data.info_hash !== self.client._infoHashBinary) { + debug( + 'ignoring websocket data from %s for %s (looking for %s: reused socket)', + self.announceUrl, common.binaryToHex(data.info_hash), self.client.infoHash + ) + return + } + + if (data.peer_id && data.peer_id === self.client._peerIdBinary) { + // ignore offers/answers from this client + return + } + + debug( + 'received %s from %s for %s', + JSON.stringify(data), self.announceUrl, self.client.infoHash + ) + + const failure = data['failure reason'] + if (failure) return self.client.emit('warning', new Error(failure)) + + const warning = data['warning message'] + if (warning) self.client.emit('warning', new Error(warning)) + + const interval = data.interval || data['min interval'] + if (interval) self.setInterval(interval * 1000) + + const trackerId = data['tracker id'] + if (trackerId) { + // If absent, do not discard previous trackerId value + self._trackerId = trackerId + } + + if (data.complete != null) { + const response = Object.assign({}, data, { + announce: self.announceUrl, + infoHash: common.binaryToHex(data.info_hash) + }) + self.client.emit('update', response) + } + + let peer + if (data.offer && data.peer_id) { + debug('creating peer (from remote offer)') + peer = self._createPeer() + peer.id = common.binaryToHex(data.peer_id) + peer.once('signal', answer => { + const params = { + action: 'announce', + info_hash: self.client._infoHashBinary, + peer_id: self.client._peerIdBinary, + to_peer_id: data.peer_id, + answer, + offer_id: data.offer_id + } + if (self._trackerId) params.trackerid = self._trackerId + self._send(params) + }) + peer.signal(data.offer) + self.client.emit('peer', peer) + } + + if (data.answer && data.peer_id) { + const offerId = common.binaryToHex(data.offer_id) + peer = self.peers[offerId] + if (peer) { + peer.id = common.binaryToHex(data.peer_id) + peer.signal(data.answer) + self.client.emit('peer', peer) + + clearTimeout(peer.trackerTimeout) + peer.trackerTimeout = null + delete self.peers[offerId] + } else { + debug(`got unexpected answer: ${JSON.stringify(data.answer)}`) + } + } + } + + _onScrapeResponse (data) { + const self = this + data = data.files || {} + + const keys = Object.keys(data) + if (keys.length === 0) { + self.client.emit('warning', new Error('invalid scrape response')) + return + } + + keys.forEach(infoHash => { + // TODO: optionally handle data.flags.min_request_interval + // (separate from announce interval) + const response = Object.assign(data[infoHash], { + announce: self.announceUrl, + infoHash: common.binaryToHex(infoHash) + }) + self.client.emit('scrape', response) + }) + } + + _onSocketClose () { + const self = this + if (self.destroyed) return + self.destroy() + self._startReconnectTimer() + } + + _onSocketError (err) { + const self = this + if (self.destroyed) return + self.destroy() + // errors will often happen if a tracker is offline, so don't treat it as fatal + self.client.emit('warning', err) + self._startReconnectTimer() + } + + _startReconnectTimer () { + const self = this + const ms = Math.floor(Math.random() * RECONNECT_VARIANCE) + Math.min(Math.pow(2, self.retries) * RECONNECT_MINIMUM, RECONNECT_MAXIMUM) + + self.reconnecting = true + clearTimeout(self.reconnectTimer) + self.reconnectTimer = setTimeout(() => { + self.retries++ + self._openSocket() + }, ms) + if (self.reconnectTimer.unref) self.reconnectTimer.unref() + + debug('reconnecting socket in %s ms', ms) + } + + _send (params) { + const self = this + if (self.destroyed) return + self.expectingResponse = true + const message = JSON.stringify(params) + debug('send %s', message) + self.socket.send(message) + } + + _generateOffers (numwant, cb) { + const self = this + const offers = [] + debug('generating %s offers', numwant) + + for (let i = 0; i < numwant; ++i) { + generateOffer() + } + checkDone() + + function generateOffer () { + const offerId = randombytes(20).toString('hex') + debug('creating peer (from _generateOffers)') + const peer = self.peers[offerId] = self._createPeer({ initiator: true }) + peer.once('signal', offer => { + offers.push({ + offer, + offer_id: common.hexToBinary(offerId) + }) + checkDone() + }) + peer.trackerTimeout = setTimeout(() => { + debug('tracker timeout: destroying peer') + peer.trackerTimeout = null + delete self.peers[offerId] + peer.destroy() + }, OFFER_TIMEOUT) + if (peer.trackerTimeout.unref) peer.trackerTimeout.unref() + } + + function checkDone () { + if (offers.length === numwant) { + debug('generated %s offers', numwant) + cb(offers) + } + } + } + + _createPeer (opts) { + const self = this + + opts = Object.assign({ + trickle: false, + config: self.client._rtcConfig, + wrtc: self.client._wrtc + }, opts) + + const peer = new Peer(opts) + + peer.once('error', onError) + peer.once('connect', onConnect) + + return peer + + // Handle peer 'error' events that are fired *before* the peer is emitted in + // a 'peer' event. + function onError (err) { + self.client.emit('warning', new Error(`Connection error: ${err.message}`)) + peer.destroy() + } + + // Once the peer is emitted in a 'peer' event, then it's the consumer's + // responsibility to listen for errors, so the listeners are removed here. + function onConnect () { + peer.removeListener('error', onError) + peer.removeListener('connect', onConnect) + } + } } WebSocketTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 1000 // 30 seconds - -WebSocketTracker.prototype.announce = function (opts) { - var self = this - if (self.destroyed || self.reconnecting) return - if (!self.socket.connected) { - self.socket.once('connect', function () { - self.announce(opts) - }) - return - } - - var params = Object.assign({}, opts, { - action: 'announce', - info_hash: self.client._infoHashBinary, - peer_id: self.client._peerIdBinary - }) - if (self._trackerId) params.trackerid = self._trackerId - - if (opts.event === 'stopped' || opts.event === 'completed') { - // Don't include offers with 'stopped' or 'completed' event - self._send(params) - } else { - // Limit the number of offers that are generated, since it can be slow - var numwant = Math.min(opts.numwant, 10) - - self._generateOffers(numwant, function (offers) { - params.numwant = numwant - params.offers = offers - self._send(params) - }) - } -} - -WebSocketTracker.prototype.scrape = function (opts) { - var self = this - if (self.destroyed || self.reconnecting) return - if (!self.socket.connected) { - self.socket.once('connect', function () { - self.scrape(opts) - }) - return - } - - var infoHashes = (Array.isArray(opts.infoHash) && opts.infoHash.length > 0) - ? opts.infoHash.map(function (infoHash) { - return infoHash.toString('binary') - }) - : (opts.infoHash && opts.infoHash.toString('binary')) || self.client._infoHashBinary - var params = { - action: 'scrape', - info_hash: infoHashes - } - - self._send(params) -} - -WebSocketTracker.prototype.destroy = function (cb) { - var self = this - if (!cb) cb = noop - if (self.destroyed) return cb(null) - - self.destroyed = true - - clearInterval(self.interval) - clearTimeout(self.reconnectTimer) - - // Destroy peers - for (var peerId in self.peers) { - var peer = self.peers[peerId] - clearTimeout(peer.trackerTimeout) - peer.destroy() - } - self.peers = null - - if (self.socket) { - self.socket.removeListener('connect', self._onSocketConnectBound) - self.socket.removeListener('data', self._onSocketDataBound) - self.socket.removeListener('close', self._onSocketCloseBound) - self.socket.removeListener('error', self._onSocketErrorBound) - self.socket = null - } - - self._onSocketConnectBound = null - self._onSocketErrorBound = null - self._onSocketDataBound = null - self._onSocketCloseBound = null - - if (socketPool[self.announceUrl]) { - socketPool[self.announceUrl].consumers -= 1 - } - - // Other instances are using the socket, so there's nothing left to do here - if (socketPool[self.announceUrl].consumers > 0) return cb() - - var socket = socketPool[self.announceUrl] - delete socketPool[self.announceUrl] - socket.on('error', noop) // ignore all future errors - socket.once('close', cb) - - // If there is no data response expected, destroy immediately. - if (!self.expectingResponse) return destroyCleanup() - - // Otherwise, wait a short time for potential responses to come in from the - // server, then force close the socket. - var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) - - // But, if a response comes from the server before the timeout fires, do cleanup - // right away. - socket.once('data', destroyCleanup) - - function destroyCleanup () { - if (timeout) { - clearTimeout(timeout) - timeout = null - } - socket.removeListener('data', destroyCleanup) - socket.destroy() - socket = null - } -} - -WebSocketTracker.prototype._openSocket = function () { - var self = this - self.destroyed = false - - if (!self.peers) self.peers = {} - - self._onSocketConnectBound = function () { - self._onSocketConnect() - } - self._onSocketErrorBound = function (err) { - self._onSocketError(err) - } - self._onSocketDataBound = function (data) { - self._onSocketData(data) - } - self._onSocketCloseBound = function () { - self._onSocketClose() - } - - self.socket = socketPool[self.announceUrl] - if (self.socket) { - socketPool[self.announceUrl].consumers += 1 - } else { - self.socket = socketPool[self.announceUrl] = new Socket(self.announceUrl) - self.socket.consumers = 1 - self.socket.once('connect', self._onSocketConnectBound) - } - - self.socket.on('data', self._onSocketDataBound) - self.socket.once('close', self._onSocketCloseBound) - self.socket.once('error', self._onSocketErrorBound) -} - -WebSocketTracker.prototype._onSocketConnect = function () { - var self = this - if (self.destroyed) return - - if (self.reconnecting) { - self.reconnecting = false - self.retries = 0 - self.announce(self.client._defaultAnnounceOpts()) - } -} - -WebSocketTracker.prototype._onSocketData = function (data) { - var self = this - if (self.destroyed) return - - self.expectingResponse = false - - try { - data = JSON.parse(data) - } catch (err) { - self.client.emit('warning', new Error('Invalid tracker response')) - return - } - - if (data.action === 'announce') { - self._onAnnounceResponse(data) - } else if (data.action === 'scrape') { - self._onScrapeResponse(data) - } else { - self._onSocketError(new Error('invalid action in WS response: ' + data.action)) - } -} - -WebSocketTracker.prototype._onAnnounceResponse = function (data) { - var self = this - - if (data.info_hash !== self.client._infoHashBinary) { - debug( - 'ignoring websocket data from %s for %s (looking for %s: reused socket)', - self.announceUrl, common.binaryToHex(data.info_hash), self.client.infoHash - ) - return - } - - if (data.peer_id && data.peer_id === self.client._peerIdBinary) { - // ignore offers/answers from this client - return - } - - debug( - 'received %s from %s for %s', - JSON.stringify(data), self.announceUrl, self.client.infoHash - ) - - var failure = data['failure reason'] - if (failure) return self.client.emit('warning', new Error(failure)) - - var warning = data['warning message'] - if (warning) self.client.emit('warning', new Error(warning)) - - var interval = data.interval || data['min interval'] - if (interval) self.setInterval(interval * 1000) - - var trackerId = data['tracker id'] - if (trackerId) { - // If absent, do not discard previous trackerId value - self._trackerId = trackerId - } - - if (data.complete != null) { - var response = Object.assign({}, data, { - announce: self.announceUrl, - infoHash: common.binaryToHex(data.info_hash) - }) - self.client.emit('update', response) - } - - var peer - if (data.offer && data.peer_id) { - debug('creating peer (from remote offer)') - peer = self._createPeer() - peer.id = common.binaryToHex(data.peer_id) - peer.once('signal', function (answer) { - var params = { - action: 'announce', - info_hash: self.client._infoHashBinary, - peer_id: self.client._peerIdBinary, - to_peer_id: data.peer_id, - answer: answer, - offer_id: data.offer_id - } - if (self._trackerId) params.trackerid = self._trackerId - self._send(params) - }) - peer.signal(data.offer) - self.client.emit('peer', peer) - } - - if (data.answer && data.peer_id) { - var offerId = common.binaryToHex(data.offer_id) - peer = self.peers[offerId] - if (peer) { - peer.id = common.binaryToHex(data.peer_id) - peer.signal(data.answer) - self.client.emit('peer', peer) - - clearTimeout(peer.trackerTimeout) - peer.trackerTimeout = null - delete self.peers[offerId] - } else { - debug('got unexpected answer: ' + JSON.stringify(data.answer)) - } - } -} - -WebSocketTracker.prototype._onScrapeResponse = function (data) { - var self = this - data = data.files || {} - - var keys = Object.keys(data) - if (keys.length === 0) { - self.client.emit('warning', new Error('invalid scrape response')) - return - } - - keys.forEach(function (infoHash) { - // TODO: optionally handle data.flags.min_request_interval - // (separate from announce interval) - var response = Object.assign(data[infoHash], { - announce: self.announceUrl, - infoHash: common.binaryToHex(infoHash) - }) - self.client.emit('scrape', response) - }) -} - -WebSocketTracker.prototype._onSocketClose = function () { - var self = this - if (self.destroyed) return - self.destroy() - self._startReconnectTimer() -} - -WebSocketTracker.prototype._onSocketError = function (err) { - var self = this - if (self.destroyed) return - self.destroy() - // errors will often happen if a tracker is offline, so don't treat it as fatal - self.client.emit('warning', err) - self._startReconnectTimer() -} - -WebSocketTracker.prototype._startReconnectTimer = function () { - var self = this - var ms = Math.floor(Math.random() * RECONNECT_VARIANCE) + Math.min(Math.pow(2, self.retries) * RECONNECT_MINIMUM, RECONNECT_MAXIMUM) - - self.reconnecting = true - clearTimeout(self.reconnectTimer) - self.reconnectTimer = setTimeout(function () { - self.retries++ - self._openSocket() - }, ms) - if (self.reconnectTimer.unref) self.reconnectTimer.unref() - - debug('reconnecting socket in %s ms', ms) -} - -WebSocketTracker.prototype._send = function (params) { - var self = this - if (self.destroyed) return - self.expectingResponse = true - var message = JSON.stringify(params) - debug('send %s', message) - self.socket.send(message) -} - -WebSocketTracker.prototype._generateOffers = function (numwant, cb) { - var self = this - var offers = [] - debug('generating %s offers', numwant) - - for (var i = 0; i < numwant; ++i) { - generateOffer() - } - checkDone() - - function generateOffer () { - var offerId = randombytes(20).toString('hex') - debug('creating peer (from _generateOffers)') - var peer = self.peers[offerId] = self._createPeer({ initiator: true }) - peer.once('signal', function (offer) { - offers.push({ - offer: offer, - offer_id: common.hexToBinary(offerId) - }) - checkDone() - }) - peer.trackerTimeout = setTimeout(function () { - debug('tracker timeout: destroying peer') - peer.trackerTimeout = null - delete self.peers[offerId] - peer.destroy() - }, OFFER_TIMEOUT) - if (peer.trackerTimeout.unref) peer.trackerTimeout.unref() - } - - function checkDone () { - if (offers.length === numwant) { - debug('generated %s offers', numwant) - cb(offers) - } - } -} - -WebSocketTracker.prototype._createPeer = function (opts) { - var self = this - - opts = Object.assign({ - trickle: false, - config: self.client._rtcConfig, - wrtc: self.client._wrtc - }, opts) - - var peer = new Peer(opts) - - peer.once('error', onError) - peer.once('connect', onConnect) - - return peer - - // Handle peer 'error' events that are fired *before* the peer is emitted in - // a 'peer' event. - function onError (err) { - self.client.emit('warning', new Error('Connection error: ' + err.message)) - peer.destroy() - } - - // Once the peer is emitted in a 'peer' event, then it's the consumer's - // responsibility to listen for errors, so the listeners are removed here. - function onConnect () { - peer.removeListener('error', onError) - peer.removeListener('connect', onConnect) - } -} +// Normally this shouldn't be accessed but is occasionally useful +WebSocketTracker._socketPool = socketPool function noop () {} + +module.exports = WebSocketTracker