From ee45b7947b02233fbc6f83446377e184301392b5 Mon Sep 17 00:00:00 2001 From: Feross Aboukhadijeh Date: Fri, 20 Jan 2017 18:32:59 -0800 Subject: [PATCH 1/6] update filter tests to clarify multiple clients --- test/filter.js | 57 +++++++++++++++++++++++++------------------------- 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/test/filter.js b/test/filter.js index 4e74294..ffdd567 100644 --- a/test/filter.js +++ b/test/filter.js @@ -17,7 +17,7 @@ function testFilterOption (t, serverType) { } common.createServer(t, opts, function (server, announceUrl) { - var client = new Client({ + var client1 = new Client({ infoHash: fixtures.alice.parsedTorrent.infoHash, announce: announceUrl, peerId: peerId, @@ -25,29 +25,30 @@ function testFilterOption (t, serverType) { wrtc: {} }) - client.on('error', function (err) { t.error(err) }) - if (serverType === 'ws') common.mockWebsocketTracker(client) + client1.on('error', function (err) { t.error(err) }) + if (serverType === 'ws') common.mockWebsocketTracker(client1) - client.once('warning', function (err) { + client1.once('warning', function (err) { t.ok(/disallowed info_hash/.test(err.message), 'got client warning') - client.destroy(function () { - t.pass('client destroyed') - client = new Client({ + client1.destroy(function () { + t.pass('client1 destroyed') + + var client2 = new Client({ infoHash: fixtures.leaves.parsedTorrent.infoHash, announce: announceUrl, peerId: peerId, port: 6881, wrtc: {} }) - if (serverType === 'ws') common.mockWebsocketTracker(client) + if (serverType === 'ws') common.mockWebsocketTracker(client2) - client.on('error', function (err) { t.error(err) }) - client.on('warning', function (err) { t.error(err) }) + client2.on('error', function (err) { t.error(err) }) + client2.on('warning', function (err) { t.error(err) }) - client.on('update', function () { + client2.on('update', function () { t.pass('got announce') - client.destroy(function () { t.pass('client destroyed') }) + client2.destroy(function () { t.pass('client2 destroyed') }) server.close(function () { t.pass('server closed') }) }) @@ -55,7 +56,7 @@ function testFilterOption (t, serverType) { t.equal(Object.keys(server.torrents).length, 1) }) - client.start() + client2.start() }) }) @@ -65,7 +66,7 @@ function testFilterOption (t, serverType) { t.equal(Object.keys(server.torrents).length, 0) }) - client.start() + client1.start() }) } @@ -93,7 +94,7 @@ function testFilterCustomError (t, serverType) { } common.createServer(t, opts, function (server, announceUrl) { - var client = new Client({ + var client1 = new Client({ infoHash: fixtures.alice.parsedTorrent.infoHash, announce: announceUrl, peerId: peerId, @@ -101,29 +102,29 @@ function testFilterCustomError (t, serverType) { wrtc: {} }) - client.on('error', function (err) { t.error(err) }) - if (serverType === 'ws') common.mockWebsocketTracker(client) + client1.on('error', function (err) { t.error(err) }) + if (serverType === 'ws') common.mockWebsocketTracker(client1) - client.once('warning', function (err) { + client1.once('warning', function (err) { t.ok(/alice blocked/.test(err.message), 'got client warning') - client.destroy(function () { - t.pass('client destroyed') - client = new Client({ + client1.destroy(function () { + t.pass('client1 destroyed') + var client2 = new Client({ infoHash: fixtures.leaves.parsedTorrent.infoHash, announce: announceUrl, peerId: peerId, port: 6881, wrtc: {} }) - if (serverType === 'ws') common.mockWebsocketTracker(client) + if (serverType === 'ws') common.mockWebsocketTracker(client2) - client.on('error', function (err) { t.error(err) }) - client.on('warning', function (err) { t.error(err) }) + client2.on('error', function (err) { t.error(err) }) + client2.on('warning', function (err) { t.error(err) }) - client.on('update', function () { + client2.on('update', function () { t.pass('got announce') - client.destroy(function () { t.pass('client destroyed') }) + client2.destroy(function () { t.pass('client2 destroyed') }) server.close(function () { t.pass('server closed') }) }) @@ -131,7 +132,7 @@ function testFilterCustomError (t, serverType) { t.equal(Object.keys(server.torrents).length, 1) }) - client.start() + client2.start() }) }) @@ -141,7 +142,7 @@ function testFilterCustomError (t, serverType) { t.equal(Object.keys(server.torrents).length, 0) }) - client.start() + client1.start() }) } From 1424a00c3adf7a5d439555032a62257cd2467187 Mon Sep 17 00:00:00 2001 From: Feross Aboukhadijeh Date: Fri, 20 Jan 2017 18:33:17 -0800 Subject: [PATCH 2/6] test: don't rely on setTimeout in testClientStop() --- test/client.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/test/client.js b/test/client.js index 29a2897..e8b6431 100644 --- a/test/client.js +++ b/test/client.js @@ -56,7 +56,7 @@ test('ws: client.start()', function (t) { }) function testClientStop (t, serverType) { - t.plan(3) + t.plan(4) common.createServer(t, serverType, function (server, announceUrl) { var client = new Client({ @@ -73,7 +73,9 @@ function testClientStop (t, serverType) { client.start() - setTimeout(function () { + client.once('update', function () { + t.pass('client received response to "start" message') + client.stop() client.once('update', function (data) { @@ -85,7 +87,7 @@ function testClientStop (t, serverType) { server.close() client.destroy() }) - }, 1000) + }) }) } From 63d24611ec2be35475e82f6e5ac6ebab421afa95 Mon Sep 17 00:00:00 2001 From: Feross Aboukhadijeh Date: Fri, 20 Jan 2017 18:33:30 -0800 Subject: [PATCH 3/6] test: speed up testClientUpdate() test --- test/client.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/client.js b/test/client.js index e8b6431..e6631b7 100644 --- a/test/client.js +++ b/test/client.js @@ -119,14 +119,14 @@ function testClientUpdate (t, serverType) { client.on('error', function (err) { t.error(err) }) client.on('warning', function (err) { t.error(err) }) - client.setInterval(2000) + client.setInterval(500) client.start() client.once('update', function () { - client.setInterval(2000) + client.setInterval(500) - // after interval (2s), we should get another update + // after interval, we should get another update client.once('update', function (data) { // received an update! t.equal(data.announce, announceUrl) From c3bf7f87f6c7d2036e623e84ea510be3f43406a1 Mon Sep 17 00:00:00 2001 From: Feross Aboukhadijeh Date: Fri, 20 Jan 2017 18:34:05 -0800 Subject: [PATCH 4/6] test: add failing test for the second part of #190 --- test/client.js | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/test/client.js b/test/client.js index e6631b7..9858bf5 100644 --- a/test/client.js +++ b/test/client.js @@ -103,6 +103,60 @@ test('ws: client.stop()', function (t) { testClientStop(t, 'ws') }) +function testClientStopDestroy (t, serverType) { + t.plan(2) + + common.createServer(t, serverType, function (server, announceUrl) { + var client = new Client({ + infoHash: fixtures.leaves.parsedTorrent.infoHash, + announce: announceUrl, + peerId: peerId1, + port: port, + wrtc: {} + }) + + if (serverType === 'ws') common.mockWebsocketTracker(client) + client.on('error', function (err) { t.error(err) }) + client.on('warning', function (err) { t.error(err) }) + + client.start() + + client.once('update', function () { + t.pass('client received response to "start" message') + + client.stop() + + client.on('update', function () { t.fail('client should not receive update after destroy is called') }) + + // Call destroy() in the same tick as stop(), but the message should still + // be received by the server, though obviously the client won't receive the + // response. + client.destroy() + + server.once('stop', function (peer, params) { + t.pass('server received "stop" message') + setTimeout(function () { + // give the websocket server time to finish in progress (stream) messages + // to peers + server.close() + }, 100) + }) + }) + }) +} + +test('http: client.stop(); client.destroy()', function (t) { + testClientStopDestroy(t, 'http') +}) + +test('udp: client.stop(); client.destroy()', function (t) { + testClientStopDestroy(t, 'udp') +}) + +test('ws: client.stop(); client.destroy()', function (t) { + testClientStopDestroy(t, 'ws') +}) + function testClientUpdate (t, serverType) { t.plan(4) From 9cf2dffa67850aead29f47493a2f529c7f5defdf Mon Sep 17 00:00:00 2001 From: Feross Aboukhadijeh Date: Fri, 20 Jan 2017 18:34:33 -0800 Subject: [PATCH 5/6] debug: better debug names --- client.js | 2 +- lib/server/swarm.js | 2 +- server.js | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/client.js b/client.js index c72f8d5..cea05be 100644 --- a/client.js +++ b/client.js @@ -1,7 +1,7 @@ module.exports = Client var Buffer = require('safe-buffer').Buffer -var debug = require('debug')('bittorrent-tracker') +var debug = require('debug')('bittorrent-tracker:client') var EventEmitter = require('events').EventEmitter var extend = require('xtend') var inherits = require('inherits') diff --git a/lib/server/swarm.js b/lib/server/swarm.js index 3d987bf..f8d8a44 100644 --- a/lib/server/swarm.js +++ b/lib/server/swarm.js @@ -1,6 +1,6 @@ module.exports = Swarm -var debug = require('debug')('bittorrent-tracker') +var debug = require('debug')('bittorrent-tracker:swarm') var LRU = require('lru') var randomIterate = require('random-iterate') diff --git a/server.js b/server.js index 87d08a1..291357f 100644 --- a/server.js +++ b/server.js @@ -2,7 +2,7 @@ module.exports = Server var Buffer = require('safe-buffer').Buffer var bencode = require('bencode') -var debug = require('debug')('bittorrent-tracker') +var debug = require('debug')('bittorrent-tracker:server') var dgram = require('dgram') var EventEmitter = require('events').EventEmitter var http = require('http') From 0aadcc1cbb7253c58ecfd9ada11ac33a22bdf34f Mon Sep 17 00:00:00 2001 From: Feross Aboukhadijeh Date: Fri, 20 Jan 2017 18:41:28 -0800 Subject: [PATCH 6/6] Wait up to 1s for pending requests before destroy() If the user calls: client.stop() client.destroy() We should ensure that the final 'stopped' message reaches the tracker server, even though the client will not get the response (because they destroyed the client and no more events will be emitted). If there are pending requests when destroy() is called, then a 1s timer is set after which point all requests are forcibly cleaned up. If the requests complete before the 1s timer fires, then cleanup happens right away (so we're not stuck waiting for the 1s timer). So, destroy() can happen one of three ways: - immediately, if no pending requests exist - after exactly 1s, if pending requests exist and they don't complete within 1s - less than 1s, if pending requests exist and they all complete before the 1s timer fires --- lib/client/http-tracker.js | 60 +++++++++++++++--- lib/client/udp-tracker.js | 107 +++++++++++++++++++++----------- lib/client/websocket-tracker.js | 69 +++++++++++++------- lib/common-node.js | 12 ++++ 4 files changed, 179 insertions(+), 69 deletions(-) diff --git a/lib/client/http-tracker.js b/lib/client/http-tracker.js index 570a910..0398773 100644 --- a/lib/client/http-tracker.js +++ b/lib/client/http-tracker.js @@ -28,11 +28,16 @@ function HTTPTracker (client, announceUrl, opts) { // Determine scrape url (if http tracker supports it) self.scrapeUrl = null - var m - if ((m = self.announceUrl.match(HTTP_SCRAPE_SUPPORT))) { - self.scrapeUrl = self.announceUrl.slice(0, m.index) + '/scrape' + - self.announceUrl.slice(m.index + 9) + + var match = self.announceUrl.match(HTTP_SCRAPE_SUPPORT) + if (match) { + var pre = self.announceUrl.slice(0, match.index) + var post = self.announceUrl.slice(match.index + 9) + self.scrapeUrl = pre + '/scrape' + post } + + self.cleanupFns = [] + self.maybeDestroyCleanup = null } HTTPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes @@ -82,22 +87,61 @@ HTTPTracker.prototype.destroy = function (cb) { self.destroyed = true clearInterval(self.interval) - cb(null) + // If there are no pending requests, destroy immediately. + if (self.cleanupFns.length === 0) return destroyCleanup() + + // Otherwise, wait a short time for pending requests to complete, then force + // destroy them. + var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) + + // But, if all pending requests complete before the timeout fires, do cleanup + // right away. + self.maybeDestroyCleanup = function () { + if (self.cleanupFns.length === 0) destroyCleanup() + } + + function destroyCleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null + } + self.maybeDestroyCleanup = null + self.cleanupFns.slice(0).forEach(function (cleanup) { + cleanup() + }) + self.cleanupFns = [] + cb(null) + } } HTTPTracker.prototype._request = function (requestUrl, params, cb) { var self = this var u = requestUrl + (requestUrl.indexOf('?') === -1 ? '?' : '&') + common.querystringStringify(params) - var opts = { + + self.cleanupFns.push(cleanup) + + var request = get.concat({ url: u, + timeout: common.REQUEST_TIMEOUT, headers: { 'user-agent': self.client._userAgent || '' } + }, onResponse) + + function cleanup () { + if (request) { + self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1) + request.abort() + request = null + } + if (self.maybeDestroyCleanup) self.maybeDestroyCleanup() } - get.concat(opts, function (err, res, data) { + function onResponse (err, res, data) { + cleanup() if (self.destroyed) return + if (err) return self.client.emit('warning', err) if (res.statusCode !== 200) { return self.client.emit('warning', new Error('Non-200 response code ' + @@ -128,7 +172,7 @@ HTTPTracker.prototype._request = function (requestUrl, params, cb) { debug('response from ' + requestUrl) cb(data) - }) + } } HTTPTracker.prototype._onAnnounceResponse = function (data) { diff --git a/lib/client/udp-tracker.js b/lib/client/udp-tracker.js index b9ccbb2..ae6e06c 100644 --- a/lib/client/udp-tracker.js +++ b/lib/client/udp-tracker.js @@ -12,8 +12,6 @@ var url = require('url') var common = require('../common') var Tracker = require('./tracker') -var TIMEOUT = 15000 - inherits(UDPTracker, Tracker) /** @@ -29,6 +27,7 @@ function UDPTracker (client, announceUrl, opts) { debug('new udp tracker %s', announceUrl) self.cleanupFns = [] + self.maybeDestroyCleanup = null } UDPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes @@ -52,11 +51,31 @@ UDPTracker.prototype.destroy = function (cb) { self.destroyed = true clearInterval(self.interval) - self.cleanupFns.slice(0).forEach(function (cleanup) { - cleanup() - }) - self.cleanupFns = [] - cb(null) + // If there are no pending requests, destroy immediately. + if (self.cleanupFns.length === 0) return destroyCleanup() + + // Otherwise, wait a short time for pending requests to complete, then force + // destroy them. + var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) + + // But, if all pending requests complete before the timeout fires, do cleanup + // right away. + self.maybeDestroyCleanup = function () { + if (self.cleanupFns.length === 0) destroyCleanup() + } + + function destroyCleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null + } + self.maybeDestroyCleanup = null + self.cleanupFns.slice(0).forEach(function (cleanup) { + cleanup() + }) + self.cleanupFns = [] + cb(null) + } } UDPTracker.prototype._request = function (opts) { @@ -66,41 +85,51 @@ UDPTracker.prototype._request = function (opts) { var transactionId = genTransactionId() var socket = dgram.createSocket('udp4') - var cleanup = function () { - if (!socket) return - self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1) - if (timeout) { - clearTimeout(timeout) - timeout = null - } - socket.removeListener('error', onError) - socket.removeListener('message', onSocketMessage) - socket.on('error', noop) // ignore all future errors - try { socket.close() } catch (err) {} - socket = null - } - self.cleanupFns.push(cleanup) - - // does not matter if `stopped` event arrives, so supress errors & cleanup after timeout - var ms = opts.event === 'stopped' ? TIMEOUT / 10 : TIMEOUT var timeout = setTimeout(function () { - timeout = null + // does not matter if `stopped` event arrives, so supress errors if (opts.event === 'stopped') cleanup() else onError(new Error('tracker request timed out (' + opts.event + ')')) - }, ms) + timeout = null + }, common.REQUEST_TIMEOUT) if (timeout.unref) timeout.unref() + self.cleanupFns.push(cleanup) + send(Buffer.concat([ common.CONNECTION_ID, common.toUInt32(common.ACTIONS.CONNECT), transactionId ])) - socket.on('error', onError) + socket.once('error', onError) socket.on('message', onSocketMessage) - function onSocketMessage (msg) { + function cleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null + } + if (socket) { + self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1) + socket.removeListener('error', onError) + socket.removeListener('message', onSocketMessage) + socket.on('error', noop) // ignore all future errors + try { socket.close() } catch (err) {} + socket = null + } + if (self.maybeDestroyCleanup) self.maybeDestroyCleanup() + } + + function onError (err) { + cleanup() if (self.destroyed) return + + if (err.message) err.message += ' (' + self.announceUrl + ')' + // errors will often happen if a tracker is offline, so don't treat it as fatal + self.client.emit('warning', err) + } + + function onSocketMessage (msg) { if (msg.length < 8 || msg.readUInt32BE(4) !== transactionId.readUInt32BE(0)) { return onError(new Error('tracker sent invalid transaction id')) } @@ -109,15 +138,20 @@ UDPTracker.prototype._request = function (opts) { debug('UDP response %s, action %s', self.announceUrl, action) switch (action) { case 0: // handshake + // Note: no check for `self.destroyed` so that pending messages to the + // tracker can still be sent/received even after destroy() is called + if (msg.length < 16) return onError(new Error('invalid udp handshake')) if (opts._scrape) scrape(msg.slice(8, 16)) else announce(msg.slice(8, 16), opts) - return + break case 1: // announce cleanup() + if (self.destroyed) return + if (msg.length < 20) return onError(new Error('invalid announce message')) var interval = msg.readUInt32BE(8) @@ -138,10 +172,13 @@ UDPTracker.prototype._request = function (opts) { addrs.forEach(function (addr) { self.client.emit('peer', addr) }) + break case 2: // scrape cleanup() + if (self.destroyed) return + if (msg.length < 20 || (msg.length - 8) % 12 !== 0) { return onError(new Error('invalid scrape message')) } @@ -158,12 +195,16 @@ UDPTracker.prototype._request = function (opts) { incomplete: msg.readUInt32BE(16 + (i * 12)) }) } + break case 3: // error cleanup() + if (self.destroyed) return + if (msg.length < 8) return onError(new Error('invalid error message')) self.client.emit('warning', new Error(msg.slice(8).toString())) + break default: @@ -172,14 +213,6 @@ UDPTracker.prototype._request = function (opts) { } } - function onError (err) { - if (self.destroyed) return - cleanup() - if (err.message) err.message += ' (' + self.announceUrl + ')' - // errors will often happen if a tracker is offline, so don't treat it as fatal - self.client.emit('warning', err) - } - function send (message) { if (!parsedUrl.port) { parsedUrl.port = 80 diff --git a/lib/client/websocket-tracker.js b/lib/client/websocket-tracker.js index 3cd7d1f..0e7844b 100644 --- a/lib/client/websocket-tracker.js +++ b/lib/client/websocket-tracker.js @@ -34,6 +34,10 @@ function WebSocketTracker (client, announceUrl, opts) { self.retries = 0 self.reconnectTimer = null + // Simple boolean flag to track whether the socket has received data from + // the websocket server since the last time socket.send() was called. + self.expectingResponse = false + self._openSocket() } @@ -104,18 +108,6 @@ WebSocketTracker.prototype.destroy = function (cb) { clearInterval(self.interval) clearTimeout(self.reconnectTimer) - if (self.socket) { - self.socket.removeListener('connect', self._onSocketConnectBound) - self.socket.removeListener('data', self._onSocketDataBound) - self.socket.removeListener('close', self._onSocketCloseBound) - self.socket.removeListener('error', self._onSocketErrorBound) - } - - self._onSocketConnectBound = null - self._onSocketErrorBound = null - self._onSocketDataBound = null - self._onSocketCloseBound = null - // Destroy peers for (var peerId in self.peers) { var peer = self.peers[peerId] @@ -124,24 +116,51 @@ WebSocketTracker.prototype.destroy = function (cb) { } self.peers = null + if (self.socket) { + self.socket.removeListener('connect', self._onSocketConnectBound) + self.socket.removeListener('data', self._onSocketDataBound) + self.socket.removeListener('close', self._onSocketCloseBound) + self.socket.removeListener('error', self._onSocketErrorBound) + self.socket = null + } + + self._onSocketConnectBound = null + self._onSocketErrorBound = null + self._onSocketDataBound = null + self._onSocketCloseBound = null + if (socketPool[self.announceUrl]) { socketPool[self.announceUrl].consumers -= 1 } - if (socketPool[self.announceUrl].consumers === 0) { - delete socketPool[self.announceUrl] + // Other instances are using the socket, so there's nothing left to do here + if (socketPool[self.announceUrl].consumers > 0) return cb() - try { - self.socket.on('error', noop) // ignore all future errors - self.socket.destroy(cb) - } catch (err) { - cb(null) + var socket = socketPool[self.announceUrl] + delete socketPool[self.announceUrl] + socket.on('error', noop) // ignore all future errors + socket.once('close', cb) + + // If there is no data response expected, destroy immediately. + if (!self.expectingResponse) return destroyCleanup() + + // Otherwise, wait a short time for potential responses to come in from the + // server, then force close the socket. + var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) + + // But, if a response comes from the server before the timeout fires, do cleanup + // right away. + socket.once('data', destroyCleanup) + + function destroyCleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null } - } else { - cb(null) + socket.removeListener('data', destroyCleanup) + socket.destroy() + socket = null } - - self.socket = null } WebSocketTracker.prototype._openSocket = function () { @@ -192,6 +211,8 @@ WebSocketTracker.prototype._onSocketData = function (data) { var self = this if (self.destroyed) return + self.expectingResponse = false + try { data = JSON.parse(data) } catch (err) { @@ -352,7 +373,7 @@ WebSocketTracker.prototype._startReconnectTimer = function () { WebSocketTracker.prototype._send = function (params) { var self = this if (self.destroyed) return - + self.expectingResponse = true var message = JSON.stringify(params) debug('send %s', message) self.socket.send(message) diff --git a/lib/common-node.js b/lib/common-node.js index 5cc1c0f..55ef8eb 100644 --- a/lib/common-node.js +++ b/lib/common-node.js @@ -26,6 +26,18 @@ exports.EVENT_NAMES = { stopped: 'stop' } +/** + * Client request timeout. How long to wait before considering a request to a + * tracker server to have timed out. + */ +exports.REQUEST_TIMEOUT = 15000 + +/** + * Client destroy timeout. How long to wait before forcibly cleaning up all + * pending requests, open sockets, etc. + */ +exports.DESTROY_TIMEOUT = 1000 + function toUInt32 (n) { var buf = Buffer.allocUnsafe(4) buf.writeUInt32BE(n, 0)