Merge pull request #194 from feross/final-wait

Wait up to 1s for pending requests before destroy()
This commit is contained in:
Feross Aboukhadijeh 2017-01-23 14:59:19 -08:00 committed by GitHub
commit 3b463e4dbf
9 changed files with 273 additions and 106 deletions

View File

@ -1,7 +1,7 @@
module.exports = Client
var Buffer = require('safe-buffer').Buffer
var debug = require('debug')('bittorrent-tracker')
var debug = require('debug')('bittorrent-tracker:client')
var EventEmitter = require('events').EventEmitter
var extend = require('xtend')
var inherits = require('inherits')

View File

@ -28,11 +28,16 @@ function HTTPTracker (client, announceUrl, opts) {
// Determine scrape url (if http tracker supports it)
self.scrapeUrl = null
var m
if ((m = self.announceUrl.match(HTTP_SCRAPE_SUPPORT))) {
self.scrapeUrl = self.announceUrl.slice(0, m.index) + '/scrape' +
self.announceUrl.slice(m.index + 9)
var match = self.announceUrl.match(HTTP_SCRAPE_SUPPORT)
if (match) {
var pre = self.announceUrl.slice(0, match.index)
var post = self.announceUrl.slice(match.index + 9)
self.scrapeUrl = pre + '/scrape' + post
}
self.cleanupFns = []
self.maybeDestroyCleanup = null
}
HTTPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes
@ -82,22 +87,61 @@ HTTPTracker.prototype.destroy = function (cb) {
self.destroyed = true
clearInterval(self.interval)
cb(null)
// If there are no pending requests, destroy immediately.
if (self.cleanupFns.length === 0) return destroyCleanup()
// Otherwise, wait a short time for pending requests to complete, then force
// destroy them.
var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT)
// But, if all pending requests complete before the timeout fires, do cleanup
// right away.
self.maybeDestroyCleanup = function () {
if (self.cleanupFns.length === 0) destroyCleanup()
}
function destroyCleanup () {
if (timeout) {
clearTimeout(timeout)
timeout = null
}
self.maybeDestroyCleanup = null
self.cleanupFns.slice(0).forEach(function (cleanup) {
cleanup()
})
self.cleanupFns = []
cb(null)
}
}
HTTPTracker.prototype._request = function (requestUrl, params, cb) {
var self = this
var u = requestUrl + (requestUrl.indexOf('?') === -1 ? '?' : '&') +
common.querystringStringify(params)
var opts = {
self.cleanupFns.push(cleanup)
var request = get.concat({
url: u,
timeout: common.REQUEST_TIMEOUT,
headers: {
'user-agent': self.client._userAgent || ''
}
}, onResponse)
function cleanup () {
if (request) {
self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1)
request.abort()
request = null
}
if (self.maybeDestroyCleanup) self.maybeDestroyCleanup()
}
get.concat(opts, function (err, res, data) {
function onResponse (err, res, data) {
cleanup()
if (self.destroyed) return
if (err) return self.client.emit('warning', err)
if (res.statusCode !== 200) {
return self.client.emit('warning', new Error('Non-200 response code ' +
@ -128,7 +172,7 @@ HTTPTracker.prototype._request = function (requestUrl, params, cb) {
debug('response from ' + requestUrl)
cb(data)
})
}
}
HTTPTracker.prototype._onAnnounceResponse = function (data) {

View File

@ -12,8 +12,6 @@ var url = require('url')
var common = require('../common')
var Tracker = require('./tracker')
var TIMEOUT = 15000
inherits(UDPTracker, Tracker)
/**
@ -29,6 +27,7 @@ function UDPTracker (client, announceUrl, opts) {
debug('new udp tracker %s', announceUrl)
self.cleanupFns = []
self.maybeDestroyCleanup = null
}
UDPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes
@ -52,11 +51,31 @@ UDPTracker.prototype.destroy = function (cb) {
self.destroyed = true
clearInterval(self.interval)
self.cleanupFns.slice(0).forEach(function (cleanup) {
cleanup()
})
self.cleanupFns = []
cb(null)
// If there are no pending requests, destroy immediately.
if (self.cleanupFns.length === 0) return destroyCleanup()
// Otherwise, wait a short time for pending requests to complete, then force
// destroy them.
var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT)
// But, if all pending requests complete before the timeout fires, do cleanup
// right away.
self.maybeDestroyCleanup = function () {
if (self.cleanupFns.length === 0) destroyCleanup()
}
function destroyCleanup () {
if (timeout) {
clearTimeout(timeout)
timeout = null
}
self.maybeDestroyCleanup = null
self.cleanupFns.slice(0).forEach(function (cleanup) {
cleanup()
})
self.cleanupFns = []
cb(null)
}
}
UDPTracker.prototype._request = function (opts) {
@ -66,41 +85,51 @@ UDPTracker.prototype._request = function (opts) {
var transactionId = genTransactionId()
var socket = dgram.createSocket('udp4')
var cleanup = function () {
if (!socket) return
self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1)
if (timeout) {
clearTimeout(timeout)
timeout = null
}
socket.removeListener('error', onError)
socket.removeListener('message', onSocketMessage)
socket.on('error', noop) // ignore all future errors
try { socket.close() } catch (err) {}
socket = null
}
self.cleanupFns.push(cleanup)
// does not matter if `stopped` event arrives, so supress errors & cleanup after timeout
var ms = opts.event === 'stopped' ? TIMEOUT / 10 : TIMEOUT
var timeout = setTimeout(function () {
timeout = null
// does not matter if `stopped` event arrives, so supress errors
if (opts.event === 'stopped') cleanup()
else onError(new Error('tracker request timed out (' + opts.event + ')'))
}, ms)
timeout = null
}, common.REQUEST_TIMEOUT)
if (timeout.unref) timeout.unref()
self.cleanupFns.push(cleanup)
send(Buffer.concat([
common.CONNECTION_ID,
common.toUInt32(common.ACTIONS.CONNECT),
transactionId
]))
socket.on('error', onError)
socket.once('error', onError)
socket.on('message', onSocketMessage)
function onSocketMessage (msg) {
function cleanup () {
if (timeout) {
clearTimeout(timeout)
timeout = null
}
if (socket) {
self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1)
socket.removeListener('error', onError)
socket.removeListener('message', onSocketMessage)
socket.on('error', noop) // ignore all future errors
try { socket.close() } catch (err) {}
socket = null
}
if (self.maybeDestroyCleanup) self.maybeDestroyCleanup()
}
function onError (err) {
cleanup()
if (self.destroyed) return
if (err.message) err.message += ' (' + self.announceUrl + ')'
// errors will often happen if a tracker is offline, so don't treat it as fatal
self.client.emit('warning', err)
}
function onSocketMessage (msg) {
if (msg.length < 8 || msg.readUInt32BE(4) !== transactionId.readUInt32BE(0)) {
return onError(new Error('tracker sent invalid transaction id'))
}
@ -109,15 +138,20 @@ UDPTracker.prototype._request = function (opts) {
debug('UDP response %s, action %s', self.announceUrl, action)
switch (action) {
case 0: // handshake
// Note: no check for `self.destroyed` so that pending messages to the
// tracker can still be sent/received even after destroy() is called
if (msg.length < 16) return onError(new Error('invalid udp handshake'))
if (opts._scrape) scrape(msg.slice(8, 16))
else announce(msg.slice(8, 16), opts)
return
break
case 1: // announce
cleanup()
if (self.destroyed) return
if (msg.length < 20) return onError(new Error('invalid announce message'))
var interval = msg.readUInt32BE(8)
@ -138,10 +172,13 @@ UDPTracker.prototype._request = function (opts) {
addrs.forEach(function (addr) {
self.client.emit('peer', addr)
})
break
case 2: // scrape
cleanup()
if (self.destroyed) return
if (msg.length < 20 || (msg.length - 8) % 12 !== 0) {
return onError(new Error('invalid scrape message'))
}
@ -158,12 +195,16 @@ UDPTracker.prototype._request = function (opts) {
incomplete: msg.readUInt32BE(16 + (i * 12))
})
}
break
case 3: // error
cleanup()
if (self.destroyed) return
if (msg.length < 8) return onError(new Error('invalid error message'))
self.client.emit('warning', new Error(msg.slice(8).toString()))
break
default:
@ -172,14 +213,6 @@ UDPTracker.prototype._request = function (opts) {
}
}
function onError (err) {
if (self.destroyed) return
cleanup()
if (err.message) err.message += ' (' + self.announceUrl + ')'
// errors will often happen if a tracker is offline, so don't treat it as fatal
self.client.emit('warning', err)
}
function send (message) {
if (!parsedUrl.port) {
parsedUrl.port = 80

View File

@ -34,6 +34,10 @@ function WebSocketTracker (client, announceUrl, opts) {
self.retries = 0
self.reconnectTimer = null
// Simple boolean flag to track whether the socket has received data from
// the websocket server since the last time socket.send() was called.
self.expectingResponse = false
self._openSocket()
}
@ -104,18 +108,6 @@ WebSocketTracker.prototype.destroy = function (cb) {
clearInterval(self.interval)
clearTimeout(self.reconnectTimer)
if (self.socket) {
self.socket.removeListener('connect', self._onSocketConnectBound)
self.socket.removeListener('data', self._onSocketDataBound)
self.socket.removeListener('close', self._onSocketCloseBound)
self.socket.removeListener('error', self._onSocketErrorBound)
}
self._onSocketConnectBound = null
self._onSocketErrorBound = null
self._onSocketDataBound = null
self._onSocketCloseBound = null
// Destroy peers
for (var peerId in self.peers) {
var peer = self.peers[peerId]
@ -124,24 +116,51 @@ WebSocketTracker.prototype.destroy = function (cb) {
}
self.peers = null
if (self.socket) {
self.socket.removeListener('connect', self._onSocketConnectBound)
self.socket.removeListener('data', self._onSocketDataBound)
self.socket.removeListener('close', self._onSocketCloseBound)
self.socket.removeListener('error', self._onSocketErrorBound)
self.socket = null
}
self._onSocketConnectBound = null
self._onSocketErrorBound = null
self._onSocketDataBound = null
self._onSocketCloseBound = null
if (socketPool[self.announceUrl]) {
socketPool[self.announceUrl].consumers -= 1
}
if (socketPool[self.announceUrl].consumers === 0) {
delete socketPool[self.announceUrl]
// Other instances are using the socket, so there's nothing left to do here
if (socketPool[self.announceUrl].consumers > 0) return cb()
try {
self.socket.on('error', noop) // ignore all future errors
self.socket.destroy(cb)
} catch (err) {
cb(null)
var socket = socketPool[self.announceUrl]
delete socketPool[self.announceUrl]
socket.on('error', noop) // ignore all future errors
socket.once('close', cb)
// If there is no data response expected, destroy immediately.
if (!self.expectingResponse) return destroyCleanup()
// Otherwise, wait a short time for potential responses to come in from the
// server, then force close the socket.
var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT)
// But, if a response comes from the server before the timeout fires, do cleanup
// right away.
socket.once('data', destroyCleanup)
function destroyCleanup () {
if (timeout) {
clearTimeout(timeout)
timeout = null
}
} else {
cb(null)
socket.removeListener('data', destroyCleanup)
socket.destroy()
socket = null
}
self.socket = null
}
WebSocketTracker.prototype._openSocket = function () {
@ -192,6 +211,8 @@ WebSocketTracker.prototype._onSocketData = function (data) {
var self = this
if (self.destroyed) return
self.expectingResponse = false
try {
data = JSON.parse(data)
} catch (err) {
@ -352,7 +373,7 @@ WebSocketTracker.prototype._startReconnectTimer = function () {
WebSocketTracker.prototype._send = function (params) {
var self = this
if (self.destroyed) return
self.expectingResponse = true
var message = JSON.stringify(params)
debug('send %s', message)
self.socket.send(message)

View File

@ -26,6 +26,18 @@ exports.EVENT_NAMES = {
stopped: 'stop'
}
/**
* Client request timeout. How long to wait before considering a request to a
* tracker server to have timed out.
*/
exports.REQUEST_TIMEOUT = 15000
/**
* Client destroy timeout. How long to wait before forcibly cleaning up all
* pending requests, open sockets, etc.
*/
exports.DESTROY_TIMEOUT = 1000
function toUInt32 (n) {
var buf = Buffer.allocUnsafe(4)
buf.writeUInt32BE(n, 0)

View File

@ -1,6 +1,6 @@
module.exports = Swarm
var debug = require('debug')('bittorrent-tracker')
var debug = require('debug')('bittorrent-tracker:swarm')
var LRU = require('lru')
var randomIterate = require('random-iterate')

View File

@ -2,7 +2,7 @@ module.exports = Server
var Buffer = require('safe-buffer').Buffer
var bencode = require('bencode')
var debug = require('debug')('bittorrent-tracker')
var debug = require('debug')('bittorrent-tracker:server')
var dgram = require('dgram')
var EventEmitter = require('events').EventEmitter
var http = require('http')

View File

@ -56,7 +56,7 @@ test('ws: client.start()', function (t) {
})
function testClientStop (t, serverType) {
t.plan(3)
t.plan(4)
common.createServer(t, serverType, function (server, announceUrl) {
var client = new Client({
@ -73,7 +73,9 @@ function testClientStop (t, serverType) {
client.start()
setTimeout(function () {
client.once('update', function () {
t.pass('client received response to "start" message')
client.stop()
client.once('update', function (data) {
@ -85,7 +87,7 @@ function testClientStop (t, serverType) {
server.close()
client.destroy()
})
}, 1000)
})
})
}
@ -101,6 +103,60 @@ test('ws: client.stop()', function (t) {
testClientStop(t, 'ws')
})
function testClientStopDestroy (t, serverType) {
t.plan(2)
common.createServer(t, serverType, function (server, announceUrl) {
var client = new Client({
infoHash: fixtures.leaves.parsedTorrent.infoHash,
announce: announceUrl,
peerId: peerId1,
port: port,
wrtc: {}
})
if (serverType === 'ws') common.mockWebsocketTracker(client)
client.on('error', function (err) { t.error(err) })
client.on('warning', function (err) { t.error(err) })
client.start()
client.once('update', function () {
t.pass('client received response to "start" message')
client.stop()
client.on('update', function () { t.fail('client should not receive update after destroy is called') })
// Call destroy() in the same tick as stop(), but the message should still
// be received by the server, though obviously the client won't receive the
// response.
client.destroy()
server.once('stop', function (peer, params) {
t.pass('server received "stop" message')
setTimeout(function () {
// give the websocket server time to finish in progress (stream) messages
// to peers
server.close()
}, 100)
})
})
})
}
test('http: client.stop(); client.destroy()', function (t) {
testClientStopDestroy(t, 'http')
})
test('udp: client.stop(); client.destroy()', function (t) {
testClientStopDestroy(t, 'udp')
})
test('ws: client.stop(); client.destroy()', function (t) {
testClientStopDestroy(t, 'ws')
})
function testClientUpdate (t, serverType) {
t.plan(4)
@ -117,14 +173,14 @@ function testClientUpdate (t, serverType) {
client.on('error', function (err) { t.error(err) })
client.on('warning', function (err) { t.error(err) })
client.setInterval(2000)
client.setInterval(500)
client.start()
client.once('update', function () {
client.setInterval(2000)
client.setInterval(500)
// after interval (2s), we should get another update
// after interval, we should get another update
client.once('update', function (data) {
// received an update!
t.equal(data.announce, announceUrl)

View File

@ -17,7 +17,7 @@ function testFilterOption (t, serverType) {
}
common.createServer(t, opts, function (server, announceUrl) {
var client = new Client({
var client1 = new Client({
infoHash: fixtures.alice.parsedTorrent.infoHash,
announce: announceUrl,
peerId: peerId,
@ -25,29 +25,30 @@ function testFilterOption (t, serverType) {
wrtc: {}
})
client.on('error', function (err) { t.error(err) })
if (serverType === 'ws') common.mockWebsocketTracker(client)
client1.on('error', function (err) { t.error(err) })
if (serverType === 'ws') common.mockWebsocketTracker(client1)
client.once('warning', function (err) {
client1.once('warning', function (err) {
t.ok(/disallowed info_hash/.test(err.message), 'got client warning')
client.destroy(function () {
t.pass('client destroyed')
client = new Client({
client1.destroy(function () {
t.pass('client1 destroyed')
var client2 = new Client({
infoHash: fixtures.leaves.parsedTorrent.infoHash,
announce: announceUrl,
peerId: peerId,
port: 6881,
wrtc: {}
})
if (serverType === 'ws') common.mockWebsocketTracker(client)
if (serverType === 'ws') common.mockWebsocketTracker(client2)
client.on('error', function (err) { t.error(err) })
client.on('warning', function (err) { t.error(err) })
client2.on('error', function (err) { t.error(err) })
client2.on('warning', function (err) { t.error(err) })
client.on('update', function () {
client2.on('update', function () {
t.pass('got announce')
client.destroy(function () { t.pass('client destroyed') })
client2.destroy(function () { t.pass('client2 destroyed') })
server.close(function () { t.pass('server closed') })
})
@ -55,7 +56,7 @@ function testFilterOption (t, serverType) {
t.equal(Object.keys(server.torrents).length, 1)
})
client.start()
client2.start()
})
})
@ -65,7 +66,7 @@ function testFilterOption (t, serverType) {
t.equal(Object.keys(server.torrents).length, 0)
})
client.start()
client1.start()
})
}
@ -93,7 +94,7 @@ function testFilterCustomError (t, serverType) {
}
common.createServer(t, opts, function (server, announceUrl) {
var client = new Client({
var client1 = new Client({
infoHash: fixtures.alice.parsedTorrent.infoHash,
announce: announceUrl,
peerId: peerId,
@ -101,29 +102,29 @@ function testFilterCustomError (t, serverType) {
wrtc: {}
})
client.on('error', function (err) { t.error(err) })
if (serverType === 'ws') common.mockWebsocketTracker(client)
client1.on('error', function (err) { t.error(err) })
if (serverType === 'ws') common.mockWebsocketTracker(client1)
client.once('warning', function (err) {
client1.once('warning', function (err) {
t.ok(/alice blocked/.test(err.message), 'got client warning')
client.destroy(function () {
t.pass('client destroyed')
client = new Client({
client1.destroy(function () {
t.pass('client1 destroyed')
var client2 = new Client({
infoHash: fixtures.leaves.parsedTorrent.infoHash,
announce: announceUrl,
peerId: peerId,
port: 6881,
wrtc: {}
})
if (serverType === 'ws') common.mockWebsocketTracker(client)
if (serverType === 'ws') common.mockWebsocketTracker(client2)
client.on('error', function (err) { t.error(err) })
client.on('warning', function (err) { t.error(err) })
client2.on('error', function (err) { t.error(err) })
client2.on('warning', function (err) { t.error(err) })
client.on('update', function () {
client2.on('update', function () {
t.pass('got announce')
client.destroy(function () { t.pass('client destroyed') })
client2.destroy(function () { t.pass('client2 destroyed') })
server.close(function () { t.pass('server closed') })
})
@ -131,7 +132,7 @@ function testFilterCustomError (t, serverType) {
t.equal(Object.keys(server.torrents).length, 1)
})
client.start()
client2.start()
})
})
@ -141,7 +142,7 @@ function testFilterCustomError (t, serverType) {
t.equal(Object.keys(server.torrents).length, 0)
})
client.start()
client1.start()
})
}