diff --git a/client.js b/client.js index c72f8d5..cea05be 100644 --- a/client.js +++ b/client.js @@ -1,7 +1,7 @@ module.exports = Client var Buffer = require('safe-buffer').Buffer -var debug = require('debug')('bittorrent-tracker') +var debug = require('debug')('bittorrent-tracker:client') var EventEmitter = require('events').EventEmitter var extend = require('xtend') var inherits = require('inherits') diff --git a/lib/client/http-tracker.js b/lib/client/http-tracker.js index 570a910..0398773 100644 --- a/lib/client/http-tracker.js +++ b/lib/client/http-tracker.js @@ -28,11 +28,16 @@ function HTTPTracker (client, announceUrl, opts) { // Determine scrape url (if http tracker supports it) self.scrapeUrl = null - var m - if ((m = self.announceUrl.match(HTTP_SCRAPE_SUPPORT))) { - self.scrapeUrl = self.announceUrl.slice(0, m.index) + '/scrape' + - self.announceUrl.slice(m.index + 9) + + var match = self.announceUrl.match(HTTP_SCRAPE_SUPPORT) + if (match) { + var pre = self.announceUrl.slice(0, match.index) + var post = self.announceUrl.slice(match.index + 9) + self.scrapeUrl = pre + '/scrape' + post } + + self.cleanupFns = [] + self.maybeDestroyCleanup = null } HTTPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes @@ -82,22 +87,61 @@ HTTPTracker.prototype.destroy = function (cb) { self.destroyed = true clearInterval(self.interval) - cb(null) + // If there are no pending requests, destroy immediately. + if (self.cleanupFns.length === 0) return destroyCleanup() + + // Otherwise, wait a short time for pending requests to complete, then force + // destroy them. + var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) + + // But, if all pending requests complete before the timeout fires, do cleanup + // right away. + self.maybeDestroyCleanup = function () { + if (self.cleanupFns.length === 0) destroyCleanup() + } + + function destroyCleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null + } + self.maybeDestroyCleanup = null + self.cleanupFns.slice(0).forEach(function (cleanup) { + cleanup() + }) + self.cleanupFns = [] + cb(null) + } } HTTPTracker.prototype._request = function (requestUrl, params, cb) { var self = this var u = requestUrl + (requestUrl.indexOf('?') === -1 ? '?' : '&') + common.querystringStringify(params) - var opts = { + + self.cleanupFns.push(cleanup) + + var request = get.concat({ url: u, + timeout: common.REQUEST_TIMEOUT, headers: { 'user-agent': self.client._userAgent || '' } + }, onResponse) + + function cleanup () { + if (request) { + self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1) + request.abort() + request = null + } + if (self.maybeDestroyCleanup) self.maybeDestroyCleanup() } - get.concat(opts, function (err, res, data) { + function onResponse (err, res, data) { + cleanup() if (self.destroyed) return + if (err) return self.client.emit('warning', err) if (res.statusCode !== 200) { return self.client.emit('warning', new Error('Non-200 response code ' + @@ -128,7 +172,7 @@ HTTPTracker.prototype._request = function (requestUrl, params, cb) { debug('response from ' + requestUrl) cb(data) - }) + } } HTTPTracker.prototype._onAnnounceResponse = function (data) { diff --git a/lib/client/udp-tracker.js b/lib/client/udp-tracker.js index b9ccbb2..ae6e06c 100644 --- a/lib/client/udp-tracker.js +++ b/lib/client/udp-tracker.js @@ -12,8 +12,6 @@ var url = require('url') var common = require('../common') var Tracker = require('./tracker') -var TIMEOUT = 15000 - inherits(UDPTracker, Tracker) /** @@ -29,6 +27,7 @@ function UDPTracker (client, announceUrl, opts) { debug('new udp tracker %s', announceUrl) self.cleanupFns = [] + self.maybeDestroyCleanup = null } UDPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes @@ -52,11 +51,31 @@ UDPTracker.prototype.destroy = function (cb) { self.destroyed = true clearInterval(self.interval) - self.cleanupFns.slice(0).forEach(function (cleanup) { - cleanup() - }) - self.cleanupFns = [] - cb(null) + // If there are no pending requests, destroy immediately. + if (self.cleanupFns.length === 0) return destroyCleanup() + + // Otherwise, wait a short time for pending requests to complete, then force + // destroy them. + var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) + + // But, if all pending requests complete before the timeout fires, do cleanup + // right away. + self.maybeDestroyCleanup = function () { + if (self.cleanupFns.length === 0) destroyCleanup() + } + + function destroyCleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null + } + self.maybeDestroyCleanup = null + self.cleanupFns.slice(0).forEach(function (cleanup) { + cleanup() + }) + self.cleanupFns = [] + cb(null) + } } UDPTracker.prototype._request = function (opts) { @@ -66,41 +85,51 @@ UDPTracker.prototype._request = function (opts) { var transactionId = genTransactionId() var socket = dgram.createSocket('udp4') - var cleanup = function () { - if (!socket) return - self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1) - if (timeout) { - clearTimeout(timeout) - timeout = null - } - socket.removeListener('error', onError) - socket.removeListener('message', onSocketMessage) - socket.on('error', noop) // ignore all future errors - try { socket.close() } catch (err) {} - socket = null - } - self.cleanupFns.push(cleanup) - - // does not matter if `stopped` event arrives, so supress errors & cleanup after timeout - var ms = opts.event === 'stopped' ? TIMEOUT / 10 : TIMEOUT var timeout = setTimeout(function () { - timeout = null + // does not matter if `stopped` event arrives, so supress errors if (opts.event === 'stopped') cleanup() else onError(new Error('tracker request timed out (' + opts.event + ')')) - }, ms) + timeout = null + }, common.REQUEST_TIMEOUT) if (timeout.unref) timeout.unref() + self.cleanupFns.push(cleanup) + send(Buffer.concat([ common.CONNECTION_ID, common.toUInt32(common.ACTIONS.CONNECT), transactionId ])) - socket.on('error', onError) + socket.once('error', onError) socket.on('message', onSocketMessage) - function onSocketMessage (msg) { + function cleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null + } + if (socket) { + self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1) + socket.removeListener('error', onError) + socket.removeListener('message', onSocketMessage) + socket.on('error', noop) // ignore all future errors + try { socket.close() } catch (err) {} + socket = null + } + if (self.maybeDestroyCleanup) self.maybeDestroyCleanup() + } + + function onError (err) { + cleanup() if (self.destroyed) return + + if (err.message) err.message += ' (' + self.announceUrl + ')' + // errors will often happen if a tracker is offline, so don't treat it as fatal + self.client.emit('warning', err) + } + + function onSocketMessage (msg) { if (msg.length < 8 || msg.readUInt32BE(4) !== transactionId.readUInt32BE(0)) { return onError(new Error('tracker sent invalid transaction id')) } @@ -109,15 +138,20 @@ UDPTracker.prototype._request = function (opts) { debug('UDP response %s, action %s', self.announceUrl, action) switch (action) { case 0: // handshake + // Note: no check for `self.destroyed` so that pending messages to the + // tracker can still be sent/received even after destroy() is called + if (msg.length < 16) return onError(new Error('invalid udp handshake')) if (opts._scrape) scrape(msg.slice(8, 16)) else announce(msg.slice(8, 16), opts) - return + break case 1: // announce cleanup() + if (self.destroyed) return + if (msg.length < 20) return onError(new Error('invalid announce message')) var interval = msg.readUInt32BE(8) @@ -138,10 +172,13 @@ UDPTracker.prototype._request = function (opts) { addrs.forEach(function (addr) { self.client.emit('peer', addr) }) + break case 2: // scrape cleanup() + if (self.destroyed) return + if (msg.length < 20 || (msg.length - 8) % 12 !== 0) { return onError(new Error('invalid scrape message')) } @@ -158,12 +195,16 @@ UDPTracker.prototype._request = function (opts) { incomplete: msg.readUInt32BE(16 + (i * 12)) }) } + break case 3: // error cleanup() + if (self.destroyed) return + if (msg.length < 8) return onError(new Error('invalid error message')) self.client.emit('warning', new Error(msg.slice(8).toString())) + break default: @@ -172,14 +213,6 @@ UDPTracker.prototype._request = function (opts) { } } - function onError (err) { - if (self.destroyed) return - cleanup() - if (err.message) err.message += ' (' + self.announceUrl + ')' - // errors will often happen if a tracker is offline, so don't treat it as fatal - self.client.emit('warning', err) - } - function send (message) { if (!parsedUrl.port) { parsedUrl.port = 80 diff --git a/lib/client/websocket-tracker.js b/lib/client/websocket-tracker.js index 3cd7d1f..0e7844b 100644 --- a/lib/client/websocket-tracker.js +++ b/lib/client/websocket-tracker.js @@ -34,6 +34,10 @@ function WebSocketTracker (client, announceUrl, opts) { self.retries = 0 self.reconnectTimer = null + // Simple boolean flag to track whether the socket has received data from + // the websocket server since the last time socket.send() was called. + self.expectingResponse = false + self._openSocket() } @@ -104,18 +108,6 @@ WebSocketTracker.prototype.destroy = function (cb) { clearInterval(self.interval) clearTimeout(self.reconnectTimer) - if (self.socket) { - self.socket.removeListener('connect', self._onSocketConnectBound) - self.socket.removeListener('data', self._onSocketDataBound) - self.socket.removeListener('close', self._onSocketCloseBound) - self.socket.removeListener('error', self._onSocketErrorBound) - } - - self._onSocketConnectBound = null - self._onSocketErrorBound = null - self._onSocketDataBound = null - self._onSocketCloseBound = null - // Destroy peers for (var peerId in self.peers) { var peer = self.peers[peerId] @@ -124,24 +116,51 @@ WebSocketTracker.prototype.destroy = function (cb) { } self.peers = null + if (self.socket) { + self.socket.removeListener('connect', self._onSocketConnectBound) + self.socket.removeListener('data', self._onSocketDataBound) + self.socket.removeListener('close', self._onSocketCloseBound) + self.socket.removeListener('error', self._onSocketErrorBound) + self.socket = null + } + + self._onSocketConnectBound = null + self._onSocketErrorBound = null + self._onSocketDataBound = null + self._onSocketCloseBound = null + if (socketPool[self.announceUrl]) { socketPool[self.announceUrl].consumers -= 1 } - if (socketPool[self.announceUrl].consumers === 0) { - delete socketPool[self.announceUrl] + // Other instances are using the socket, so there's nothing left to do here + if (socketPool[self.announceUrl].consumers > 0) return cb() - try { - self.socket.on('error', noop) // ignore all future errors - self.socket.destroy(cb) - } catch (err) { - cb(null) + var socket = socketPool[self.announceUrl] + delete socketPool[self.announceUrl] + socket.on('error', noop) // ignore all future errors + socket.once('close', cb) + + // If there is no data response expected, destroy immediately. + if (!self.expectingResponse) return destroyCleanup() + + // Otherwise, wait a short time for potential responses to come in from the + // server, then force close the socket. + var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) + + // But, if a response comes from the server before the timeout fires, do cleanup + // right away. + socket.once('data', destroyCleanup) + + function destroyCleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null } - } else { - cb(null) + socket.removeListener('data', destroyCleanup) + socket.destroy() + socket = null } - - self.socket = null } WebSocketTracker.prototype._openSocket = function () { @@ -192,6 +211,8 @@ WebSocketTracker.prototype._onSocketData = function (data) { var self = this if (self.destroyed) return + self.expectingResponse = false + try { data = JSON.parse(data) } catch (err) { @@ -352,7 +373,7 @@ WebSocketTracker.prototype._startReconnectTimer = function () { WebSocketTracker.prototype._send = function (params) { var self = this if (self.destroyed) return - + self.expectingResponse = true var message = JSON.stringify(params) debug('send %s', message) self.socket.send(message) diff --git a/lib/common-node.js b/lib/common-node.js index 5cc1c0f..55ef8eb 100644 --- a/lib/common-node.js +++ b/lib/common-node.js @@ -26,6 +26,18 @@ exports.EVENT_NAMES = { stopped: 'stop' } +/** + * Client request timeout. How long to wait before considering a request to a + * tracker server to have timed out. + */ +exports.REQUEST_TIMEOUT = 15000 + +/** + * Client destroy timeout. How long to wait before forcibly cleaning up all + * pending requests, open sockets, etc. + */ +exports.DESTROY_TIMEOUT = 1000 + function toUInt32 (n) { var buf = Buffer.allocUnsafe(4) buf.writeUInt32BE(n, 0) diff --git a/lib/server/swarm.js b/lib/server/swarm.js index 3d987bf..f8d8a44 100644 --- a/lib/server/swarm.js +++ b/lib/server/swarm.js @@ -1,6 +1,6 @@ module.exports = Swarm -var debug = require('debug')('bittorrent-tracker') +var debug = require('debug')('bittorrent-tracker:swarm') var LRU = require('lru') var randomIterate = require('random-iterate') diff --git a/server.js b/server.js index 87d08a1..291357f 100644 --- a/server.js +++ b/server.js @@ -2,7 +2,7 @@ module.exports = Server var Buffer = require('safe-buffer').Buffer var bencode = require('bencode') -var debug = require('debug')('bittorrent-tracker') +var debug = require('debug')('bittorrent-tracker:server') var dgram = require('dgram') var EventEmitter = require('events').EventEmitter var http = require('http') diff --git a/test/client.js b/test/client.js index 29a2897..9858bf5 100644 --- a/test/client.js +++ b/test/client.js @@ -56,7 +56,7 @@ test('ws: client.start()', function (t) { }) function testClientStop (t, serverType) { - t.plan(3) + t.plan(4) common.createServer(t, serverType, function (server, announceUrl) { var client = new Client({ @@ -73,7 +73,9 @@ function testClientStop (t, serverType) { client.start() - setTimeout(function () { + client.once('update', function () { + t.pass('client received response to "start" message') + client.stop() client.once('update', function (data) { @@ -85,7 +87,7 @@ function testClientStop (t, serverType) { server.close() client.destroy() }) - }, 1000) + }) }) } @@ -101,6 +103,60 @@ test('ws: client.stop()', function (t) { testClientStop(t, 'ws') }) +function testClientStopDestroy (t, serverType) { + t.plan(2) + + common.createServer(t, serverType, function (server, announceUrl) { + var client = new Client({ + infoHash: fixtures.leaves.parsedTorrent.infoHash, + announce: announceUrl, + peerId: peerId1, + port: port, + wrtc: {} + }) + + if (serverType === 'ws') common.mockWebsocketTracker(client) + client.on('error', function (err) { t.error(err) }) + client.on('warning', function (err) { t.error(err) }) + + client.start() + + client.once('update', function () { + t.pass('client received response to "start" message') + + client.stop() + + client.on('update', function () { t.fail('client should not receive update after destroy is called') }) + + // Call destroy() in the same tick as stop(), but the message should still + // be received by the server, though obviously the client won't receive the + // response. + client.destroy() + + server.once('stop', function (peer, params) { + t.pass('server received "stop" message') + setTimeout(function () { + // give the websocket server time to finish in progress (stream) messages + // to peers + server.close() + }, 100) + }) + }) + }) +} + +test('http: client.stop(); client.destroy()', function (t) { + testClientStopDestroy(t, 'http') +}) + +test('udp: client.stop(); client.destroy()', function (t) { + testClientStopDestroy(t, 'udp') +}) + +test('ws: client.stop(); client.destroy()', function (t) { + testClientStopDestroy(t, 'ws') +}) + function testClientUpdate (t, serverType) { t.plan(4) @@ -117,14 +173,14 @@ function testClientUpdate (t, serverType) { client.on('error', function (err) { t.error(err) }) client.on('warning', function (err) { t.error(err) }) - client.setInterval(2000) + client.setInterval(500) client.start() client.once('update', function () { - client.setInterval(2000) + client.setInterval(500) - // after interval (2s), we should get another update + // after interval, we should get another update client.once('update', function (data) { // received an update! t.equal(data.announce, announceUrl) diff --git a/test/filter.js b/test/filter.js index 4e74294..ffdd567 100644 --- a/test/filter.js +++ b/test/filter.js @@ -17,7 +17,7 @@ function testFilterOption (t, serverType) { } common.createServer(t, opts, function (server, announceUrl) { - var client = new Client({ + var client1 = new Client({ infoHash: fixtures.alice.parsedTorrent.infoHash, announce: announceUrl, peerId: peerId, @@ -25,29 +25,30 @@ function testFilterOption (t, serverType) { wrtc: {} }) - client.on('error', function (err) { t.error(err) }) - if (serverType === 'ws') common.mockWebsocketTracker(client) + client1.on('error', function (err) { t.error(err) }) + if (serverType === 'ws') common.mockWebsocketTracker(client1) - client.once('warning', function (err) { + client1.once('warning', function (err) { t.ok(/disallowed info_hash/.test(err.message), 'got client warning') - client.destroy(function () { - t.pass('client destroyed') - client = new Client({ + client1.destroy(function () { + t.pass('client1 destroyed') + + var client2 = new Client({ infoHash: fixtures.leaves.parsedTorrent.infoHash, announce: announceUrl, peerId: peerId, port: 6881, wrtc: {} }) - if (serverType === 'ws') common.mockWebsocketTracker(client) + if (serverType === 'ws') common.mockWebsocketTracker(client2) - client.on('error', function (err) { t.error(err) }) - client.on('warning', function (err) { t.error(err) }) + client2.on('error', function (err) { t.error(err) }) + client2.on('warning', function (err) { t.error(err) }) - client.on('update', function () { + client2.on('update', function () { t.pass('got announce') - client.destroy(function () { t.pass('client destroyed') }) + client2.destroy(function () { t.pass('client2 destroyed') }) server.close(function () { t.pass('server closed') }) }) @@ -55,7 +56,7 @@ function testFilterOption (t, serverType) { t.equal(Object.keys(server.torrents).length, 1) }) - client.start() + client2.start() }) }) @@ -65,7 +66,7 @@ function testFilterOption (t, serverType) { t.equal(Object.keys(server.torrents).length, 0) }) - client.start() + client1.start() }) } @@ -93,7 +94,7 @@ function testFilterCustomError (t, serverType) { } common.createServer(t, opts, function (server, announceUrl) { - var client = new Client({ + var client1 = new Client({ infoHash: fixtures.alice.parsedTorrent.infoHash, announce: announceUrl, peerId: peerId, @@ -101,29 +102,29 @@ function testFilterCustomError (t, serverType) { wrtc: {} }) - client.on('error', function (err) { t.error(err) }) - if (serverType === 'ws') common.mockWebsocketTracker(client) + client1.on('error', function (err) { t.error(err) }) + if (serverType === 'ws') common.mockWebsocketTracker(client1) - client.once('warning', function (err) { + client1.once('warning', function (err) { t.ok(/alice blocked/.test(err.message), 'got client warning') - client.destroy(function () { - t.pass('client destroyed') - client = new Client({ + client1.destroy(function () { + t.pass('client1 destroyed') + var client2 = new Client({ infoHash: fixtures.leaves.parsedTorrent.infoHash, announce: announceUrl, peerId: peerId, port: 6881, wrtc: {} }) - if (serverType === 'ws') common.mockWebsocketTracker(client) + if (serverType === 'ws') common.mockWebsocketTracker(client2) - client.on('error', function (err) { t.error(err) }) - client.on('warning', function (err) { t.error(err) }) + client2.on('error', function (err) { t.error(err) }) + client2.on('warning', function (err) { t.error(err) }) - client.on('update', function () { + client2.on('update', function () { t.pass('got announce') - client.destroy(function () { t.pass('client destroyed') }) + client2.destroy(function () { t.pass('client2 destroyed') }) server.close(function () { t.pass('server closed') }) }) @@ -131,7 +132,7 @@ function testFilterCustomError (t, serverType) { t.equal(Object.keys(server.torrents).length, 1) }) - client.start() + client2.start() }) }) @@ -141,7 +142,7 @@ function testFilterCustomError (t, serverType) { t.equal(Object.keys(server.torrents).length, 0) }) - client.start() + client1.start() }) }