From 0aadcc1cbb7253c58ecfd9ada11ac33a22bdf34f Mon Sep 17 00:00:00 2001 From: Feross Aboukhadijeh Date: Fri, 20 Jan 2017 18:41:28 -0800 Subject: [PATCH] Wait up to 1s for pending requests before destroy() If the user calls: client.stop() client.destroy() We should ensure that the final 'stopped' message reaches the tracker server, even though the client will not get the response (because they destroyed the client and no more events will be emitted). If there are pending requests when destroy() is called, then a 1s timer is set after which point all requests are forcibly cleaned up. If the requests complete before the 1s timer fires, then cleanup happens right away (so we're not stuck waiting for the 1s timer). So, destroy() can happen one of three ways: - immediately, if no pending requests exist - after exactly 1s, if pending requests exist and they don't complete within 1s - less than 1s, if pending requests exist and they all complete before the 1s timer fires --- lib/client/http-tracker.js | 60 +++++++++++++++--- lib/client/udp-tracker.js | 107 +++++++++++++++++++++----------- lib/client/websocket-tracker.js | 69 +++++++++++++------- lib/common-node.js | 12 ++++ 4 files changed, 179 insertions(+), 69 deletions(-) diff --git a/lib/client/http-tracker.js b/lib/client/http-tracker.js index 570a910..0398773 100644 --- a/lib/client/http-tracker.js +++ b/lib/client/http-tracker.js @@ -28,11 +28,16 @@ function HTTPTracker (client, announceUrl, opts) { // Determine scrape url (if http tracker supports it) self.scrapeUrl = null - var m - if ((m = self.announceUrl.match(HTTP_SCRAPE_SUPPORT))) { - self.scrapeUrl = self.announceUrl.slice(0, m.index) + '/scrape' + - self.announceUrl.slice(m.index + 9) + + var match = self.announceUrl.match(HTTP_SCRAPE_SUPPORT) + if (match) { + var pre = self.announceUrl.slice(0, match.index) + var post = self.announceUrl.slice(match.index + 9) + self.scrapeUrl = pre + '/scrape' + post } + + self.cleanupFns = [] + self.maybeDestroyCleanup = null } HTTPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes @@ -82,22 +87,61 @@ HTTPTracker.prototype.destroy = function (cb) { self.destroyed = true clearInterval(self.interval) - cb(null) + // If there are no pending requests, destroy immediately. + if (self.cleanupFns.length === 0) return destroyCleanup() + + // Otherwise, wait a short time for pending requests to complete, then force + // destroy them. + var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) + + // But, if all pending requests complete before the timeout fires, do cleanup + // right away. + self.maybeDestroyCleanup = function () { + if (self.cleanupFns.length === 0) destroyCleanup() + } + + function destroyCleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null + } + self.maybeDestroyCleanup = null + self.cleanupFns.slice(0).forEach(function (cleanup) { + cleanup() + }) + self.cleanupFns = [] + cb(null) + } } HTTPTracker.prototype._request = function (requestUrl, params, cb) { var self = this var u = requestUrl + (requestUrl.indexOf('?') === -1 ? '?' : '&') + common.querystringStringify(params) - var opts = { + + self.cleanupFns.push(cleanup) + + var request = get.concat({ url: u, + timeout: common.REQUEST_TIMEOUT, headers: { 'user-agent': self.client._userAgent || '' } + }, onResponse) + + function cleanup () { + if (request) { + self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1) + request.abort() + request = null + } + if (self.maybeDestroyCleanup) self.maybeDestroyCleanup() } - get.concat(opts, function (err, res, data) { + function onResponse (err, res, data) { + cleanup() if (self.destroyed) return + if (err) return self.client.emit('warning', err) if (res.statusCode !== 200) { return self.client.emit('warning', new Error('Non-200 response code ' + @@ -128,7 +172,7 @@ HTTPTracker.prototype._request = function (requestUrl, params, cb) { debug('response from ' + requestUrl) cb(data) - }) + } } HTTPTracker.prototype._onAnnounceResponse = function (data) { diff --git a/lib/client/udp-tracker.js b/lib/client/udp-tracker.js index b9ccbb2..ae6e06c 100644 --- a/lib/client/udp-tracker.js +++ b/lib/client/udp-tracker.js @@ -12,8 +12,6 @@ var url = require('url') var common = require('../common') var Tracker = require('./tracker') -var TIMEOUT = 15000 - inherits(UDPTracker, Tracker) /** @@ -29,6 +27,7 @@ function UDPTracker (client, announceUrl, opts) { debug('new udp tracker %s', announceUrl) self.cleanupFns = [] + self.maybeDestroyCleanup = null } UDPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes @@ -52,11 +51,31 @@ UDPTracker.prototype.destroy = function (cb) { self.destroyed = true clearInterval(self.interval) - self.cleanupFns.slice(0).forEach(function (cleanup) { - cleanup() - }) - self.cleanupFns = [] - cb(null) + // If there are no pending requests, destroy immediately. + if (self.cleanupFns.length === 0) return destroyCleanup() + + // Otherwise, wait a short time for pending requests to complete, then force + // destroy them. + var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) + + // But, if all pending requests complete before the timeout fires, do cleanup + // right away. + self.maybeDestroyCleanup = function () { + if (self.cleanupFns.length === 0) destroyCleanup() + } + + function destroyCleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null + } + self.maybeDestroyCleanup = null + self.cleanupFns.slice(0).forEach(function (cleanup) { + cleanup() + }) + self.cleanupFns = [] + cb(null) + } } UDPTracker.prototype._request = function (opts) { @@ -66,41 +85,51 @@ UDPTracker.prototype._request = function (opts) { var transactionId = genTransactionId() var socket = dgram.createSocket('udp4') - var cleanup = function () { - if (!socket) return - self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1) - if (timeout) { - clearTimeout(timeout) - timeout = null - } - socket.removeListener('error', onError) - socket.removeListener('message', onSocketMessage) - socket.on('error', noop) // ignore all future errors - try { socket.close() } catch (err) {} - socket = null - } - self.cleanupFns.push(cleanup) - - // does not matter if `stopped` event arrives, so supress errors & cleanup after timeout - var ms = opts.event === 'stopped' ? TIMEOUT / 10 : TIMEOUT var timeout = setTimeout(function () { - timeout = null + // does not matter if `stopped` event arrives, so supress errors if (opts.event === 'stopped') cleanup() else onError(new Error('tracker request timed out (' + opts.event + ')')) - }, ms) + timeout = null + }, common.REQUEST_TIMEOUT) if (timeout.unref) timeout.unref() + self.cleanupFns.push(cleanup) + send(Buffer.concat([ common.CONNECTION_ID, common.toUInt32(common.ACTIONS.CONNECT), transactionId ])) - socket.on('error', onError) + socket.once('error', onError) socket.on('message', onSocketMessage) - function onSocketMessage (msg) { + function cleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null + } + if (socket) { + self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1) + socket.removeListener('error', onError) + socket.removeListener('message', onSocketMessage) + socket.on('error', noop) // ignore all future errors + try { socket.close() } catch (err) {} + socket = null + } + if (self.maybeDestroyCleanup) self.maybeDestroyCleanup() + } + + function onError (err) { + cleanup() if (self.destroyed) return + + if (err.message) err.message += ' (' + self.announceUrl + ')' + // errors will often happen if a tracker is offline, so don't treat it as fatal + self.client.emit('warning', err) + } + + function onSocketMessage (msg) { if (msg.length < 8 || msg.readUInt32BE(4) !== transactionId.readUInt32BE(0)) { return onError(new Error('tracker sent invalid transaction id')) } @@ -109,15 +138,20 @@ UDPTracker.prototype._request = function (opts) { debug('UDP response %s, action %s', self.announceUrl, action) switch (action) { case 0: // handshake + // Note: no check for `self.destroyed` so that pending messages to the + // tracker can still be sent/received even after destroy() is called + if (msg.length < 16) return onError(new Error('invalid udp handshake')) if (opts._scrape) scrape(msg.slice(8, 16)) else announce(msg.slice(8, 16), opts) - return + break case 1: // announce cleanup() + if (self.destroyed) return + if (msg.length < 20) return onError(new Error('invalid announce message')) var interval = msg.readUInt32BE(8) @@ -138,10 +172,13 @@ UDPTracker.prototype._request = function (opts) { addrs.forEach(function (addr) { self.client.emit('peer', addr) }) + break case 2: // scrape cleanup() + if (self.destroyed) return + if (msg.length < 20 || (msg.length - 8) % 12 !== 0) { return onError(new Error('invalid scrape message')) } @@ -158,12 +195,16 @@ UDPTracker.prototype._request = function (opts) { incomplete: msg.readUInt32BE(16 + (i * 12)) }) } + break case 3: // error cleanup() + if (self.destroyed) return + if (msg.length < 8) return onError(new Error('invalid error message')) self.client.emit('warning', new Error(msg.slice(8).toString())) + break default: @@ -172,14 +213,6 @@ UDPTracker.prototype._request = function (opts) { } } - function onError (err) { - if (self.destroyed) return - cleanup() - if (err.message) err.message += ' (' + self.announceUrl + ')' - // errors will often happen if a tracker is offline, so don't treat it as fatal - self.client.emit('warning', err) - } - function send (message) { if (!parsedUrl.port) { parsedUrl.port = 80 diff --git a/lib/client/websocket-tracker.js b/lib/client/websocket-tracker.js index 3cd7d1f..0e7844b 100644 --- a/lib/client/websocket-tracker.js +++ b/lib/client/websocket-tracker.js @@ -34,6 +34,10 @@ function WebSocketTracker (client, announceUrl, opts) { self.retries = 0 self.reconnectTimer = null + // Simple boolean flag to track whether the socket has received data from + // the websocket server since the last time socket.send() was called. + self.expectingResponse = false + self._openSocket() } @@ -104,18 +108,6 @@ WebSocketTracker.prototype.destroy = function (cb) { clearInterval(self.interval) clearTimeout(self.reconnectTimer) - if (self.socket) { - self.socket.removeListener('connect', self._onSocketConnectBound) - self.socket.removeListener('data', self._onSocketDataBound) - self.socket.removeListener('close', self._onSocketCloseBound) - self.socket.removeListener('error', self._onSocketErrorBound) - } - - self._onSocketConnectBound = null - self._onSocketErrorBound = null - self._onSocketDataBound = null - self._onSocketCloseBound = null - // Destroy peers for (var peerId in self.peers) { var peer = self.peers[peerId] @@ -124,24 +116,51 @@ WebSocketTracker.prototype.destroy = function (cb) { } self.peers = null + if (self.socket) { + self.socket.removeListener('connect', self._onSocketConnectBound) + self.socket.removeListener('data', self._onSocketDataBound) + self.socket.removeListener('close', self._onSocketCloseBound) + self.socket.removeListener('error', self._onSocketErrorBound) + self.socket = null + } + + self._onSocketConnectBound = null + self._onSocketErrorBound = null + self._onSocketDataBound = null + self._onSocketCloseBound = null + if (socketPool[self.announceUrl]) { socketPool[self.announceUrl].consumers -= 1 } - if (socketPool[self.announceUrl].consumers === 0) { - delete socketPool[self.announceUrl] + // Other instances are using the socket, so there's nothing left to do here + if (socketPool[self.announceUrl].consumers > 0) return cb() - try { - self.socket.on('error', noop) // ignore all future errors - self.socket.destroy(cb) - } catch (err) { - cb(null) + var socket = socketPool[self.announceUrl] + delete socketPool[self.announceUrl] + socket.on('error', noop) // ignore all future errors + socket.once('close', cb) + + // If there is no data response expected, destroy immediately. + if (!self.expectingResponse) return destroyCleanup() + + // Otherwise, wait a short time for potential responses to come in from the + // server, then force close the socket. + var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) + + // But, if a response comes from the server before the timeout fires, do cleanup + // right away. + socket.once('data', destroyCleanup) + + function destroyCleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null } - } else { - cb(null) + socket.removeListener('data', destroyCleanup) + socket.destroy() + socket = null } - - self.socket = null } WebSocketTracker.prototype._openSocket = function () { @@ -192,6 +211,8 @@ WebSocketTracker.prototype._onSocketData = function (data) { var self = this if (self.destroyed) return + self.expectingResponse = false + try { data = JSON.parse(data) } catch (err) { @@ -352,7 +373,7 @@ WebSocketTracker.prototype._startReconnectTimer = function () { WebSocketTracker.prototype._send = function (params) { var self = this if (self.destroyed) return - + self.expectingResponse = true var message = JSON.stringify(params) debug('send %s', message) self.socket.send(message) diff --git a/lib/common-node.js b/lib/common-node.js index 5cc1c0f..55ef8eb 100644 --- a/lib/common-node.js +++ b/lib/common-node.js @@ -26,6 +26,18 @@ exports.EVENT_NAMES = { stopped: 'stop' } +/** + * Client request timeout. How long to wait before considering a request to a + * tracker server to have timed out. + */ +exports.REQUEST_TIMEOUT = 15000 + +/** + * Client destroy timeout. How long to wait before forcibly cleaning up all + * pending requests, open sockets, etc. + */ +exports.DESTROY_TIMEOUT = 1000 + function toUInt32 (n) { var buf = Buffer.allocUnsafe(4) buf.writeUInt32BE(n, 0)