udp/ws clients: add destroy functions

Fixes #75
This commit is contained in:
Feross Aboukhadijeh 2015-05-16 23:25:34 -07:00
parent f285c9dd3c
commit 8788d75737
3 changed files with 87 additions and 65 deletions

View File

@ -227,7 +227,7 @@ Client.prototype.destroy = function () {
debug('destroy') debug('destroy')
self._trackers.forEach(function (tracker) { self._trackers.forEach(function (tracker) {
if (tracker.destroy) tracker.destroy() tracker.destroy()
tracker.setInterval(0) // stop announcing on intervals tracker.setInterval(0) // stop announcing on intervals
}) })
self._trackers = [] self._trackers = []

View File

@ -11,6 +11,8 @@ var url = require('url')
var common = require('./common') var common = require('./common')
var TIMEOUT = 15000
inherits(UDPTracker, EventEmitter) inherits(UDPTracker, EventEmitter)
/** /**
@ -31,6 +33,7 @@ function UDPTracker (client, announceUrl, opts) {
self._announceUrl = announceUrl self._announceUrl = announceUrl
self._intervalMs = self.client._intervalMs // use client interval initially self._intervalMs = self.client._intervalMs // use client interval initially
self._interval = null self._interval = null
self._cleanupFns = []
} }
UDPTracker.prototype.announce = function (opts) { UDPTracker.prototype.announce = function (opts) {
@ -44,25 +47,60 @@ UDPTracker.prototype.scrape = function (opts) {
self._request(opts) // udp scrape uses same announce url self._request(opts) // udp scrape uses same announce url
} }
// TODO: Improve this interface
UDPTracker.prototype.setInterval = function (intervalMs) {
var self = this
clearInterval(self._interval)
self._intervalMs = intervalMs
if (intervalMs) {
// HACK
var update = self.announce.bind(self, self.client._defaultAnnounceOpts())
self._interval = setInterval(update, self._intervalMs)
}
}
UDPTracker.prototype.destroy = function () {
var self = this
if (self.destroyed) return
self.destroyed = true
self._cleanupFns.slice(0).forEach(function (cleanup) {
cleanup()
})
self._cleanupFns = []
}
UDPTracker.prototype._request = function (opts) { UDPTracker.prototype._request = function (opts) {
var self = this var self = this
if (!opts) opts = {} if (!opts) opts = {}
var parsedUrl = url.parse(self._announceUrl) var parsedUrl = url.parse(self._announceUrl)
var socket = dgram.createSocket('udp4')
var transactionId = genTransactionId() var transactionId = genTransactionId()
var socket = dgram.createSocket('udp4')
var cleanup = function () {
self._cleanupFns.splice(self._cleanupFns.indexOf(cleanup), 1)
if (timeout) {
clearTimeout(timeout)
timeout = null
}
socket.removeListener('error', onError)
socket.removeListener('message', onSocketMessage)
try { socket.close() } catch (err) {}
socket = null
}
self._cleanupFns.push(cleanup)
// does not matter if `stopped` event arrives, so supress errors & cleanup after timeout // does not matter if `stopped` event arrives, so supress errors & cleanup after timeout
var ms = opts.event === 'stopped' ? TIMEOUT / 10 : TIMEOUT
var timeout = setTimeout(function () { var timeout = setTimeout(function () {
timeout = null timeout = null
cleanup() cleanup()
if (opts.event !== 'stopped') { if (opts.event !== 'stopped') {
error('tracker request timed out') onError(new Error('tracker request timed out'))
} }
}, opts.event === 'stopped' ? 1500 : 15000) }, ms)
if (timeout.unref) timeout.unref()
if (timeout && timeout.unref) {
timeout.unref()
}
send(Buffer.concat([ send(Buffer.concat([
common.CONNECTION_ID, common.CONNECTION_ID,
@ -70,34 +108,29 @@ UDPTracker.prototype._request = function (opts) {
transactionId transactionId
])) ]))
socket.on('error', error) socket.on('error', onError)
socket.on('message', onSocketMessage)
socket.on('message', function (msg) { function onSocketMessage (msg) {
if (self.destroyed) return
if (msg.length < 8 || msg.readUInt32BE(4) !== transactionId.readUInt32BE(0)) { if (msg.length < 8 || msg.readUInt32BE(4) !== transactionId.readUInt32BE(0)) {
return error('tracker sent invalid transaction id') return onError(new Error('tracker sent invalid transaction id'))
} }
var action = msg.readUInt32BE(0) var action = msg.readUInt32BE(0)
debug(self._announceUrl + ' UDP response, action ' + action) debug('UDP response %s, action %s', self._announceUrl, action)
switch (action) { switch (action) {
case 0: // handshake case 0: // handshake
if (msg.length < 16) { if (msg.length < 16) return onError(new Error('invalid udp handshake'))
return error('invalid udp handshake')
}
if (opts._scrape) { if (opts._scrape) scrape(msg.slice(8, 16))
scrape(msg.slice(8, 16)) else announce(msg.slice(8, 16), opts)
} else {
announce(msg.slice(8, 16), opts)
}
return return
case 1: // announce case 1: // announce
cleanup() cleanup()
if (msg.length < 20) { if (msg.length < 20) return onError(new Error('invalid announce message'))
return error('invalid announce message')
}
var interval = msg.readUInt32BE(8) var interval = msg.readUInt32BE(8)
if (interval && !self._opts.interval && self._intervalMs !== 0) { if (interval && !self._opts.interval && self._intervalMs !== 0) {
@ -126,7 +159,7 @@ UDPTracker.prototype._request = function (opts) {
case 2: // scrape case 2: // scrape
cleanup() cleanup()
if (msg.length < 20 || (msg.length - 8) % 12 !== 0) { if (msg.length < 20 || (msg.length - 8) % 12 !== 0) {
return error('invalid scrape message') return onError(new Error('invalid scrape message'))
} }
var infoHashes = (Array.isArray(opts.infoHash) && opts.infoHash.length > 0) var infoHashes = (Array.isArray(opts.infoHash) && opts.infoHash.length > 0)
? opts.infoHash.map(function (infoHash) { return infoHash.toString('hex') }) ? opts.infoHash.map(function (infoHash) { return infoHash.toString('hex') })
@ -145,17 +178,23 @@ UDPTracker.prototype._request = function (opts) {
case 3: // error case 3: // error
cleanup() cleanup()
if (msg.length < 8) { if (msg.length < 8) return onError(new Error('invalid error message'))
return error('invalid error message')
}
self.client.emit('warning', new Error(msg.slice(8).toString())) self.client.emit('warning', new Error(msg.slice(8).toString()))
break break
default: default:
error('tracker sent invalid action') onError(new Error('tracker sent invalid action'))
break break
} }
}) }
function onError (err) {
if (self.destroyed) return
cleanup()
if (err.message) err.message += ' (' + self._announceUrl + ')'
// errors will often happen if a tracker is offline, so don't treat it as fatal
self.client.emit('warning', err)
}
function send (message) { function send (message) {
if (!parsedUrl.port) { if (!parsedUrl.port) {
@ -164,20 +203,6 @@ UDPTracker.prototype._request = function (opts) {
socket.send(message, 0, message.length, parsedUrl.port, parsedUrl.hostname) socket.send(message, 0, message.length, parsedUrl.port, parsedUrl.hostname)
} }
function error (message) {
// errors will often happen if a tracker is offline, so don't treat it as fatal
self.client.emit('warning', new Error(message + ' (' + self._announceUrl + ')'))
cleanup()
}
function cleanup () {
if (timeout) {
clearTimeout(timeout)
timeout = null
}
try { socket.close() } catch (err) {}
}
function announce (connectionId, opts) { function announce (connectionId, opts) {
opts = opts || {} opts = opts || {}
transactionId = genTransactionId() transactionId = genTransactionId()
@ -215,19 +240,6 @@ UDPTracker.prototype._request = function (opts) {
} }
} }
// TODO: Improve this interface
UDPTracker.prototype.setInterval = function (intervalMs) {
var self = this
clearInterval(self._interval)
self._intervalMs = intervalMs
if (intervalMs) {
// HACK
var update = self.announce.bind(self, self.client._defaultAnnounceOpts())
self._interval = setInterval(update, self._intervalMs)
}
}
function genTransactionId () { function genTransactionId () {
return new Buffer(hat(32), 'hex') return new Buffer(hat(32), 'hex')
} }

View File

@ -1,4 +1,4 @@
// TODO: destroy the websocket // TODO: cleanup unused Peers when tracker doesn't respond with matches
module.exports = WebSocketTracker module.exports = WebSocketTracker
@ -11,10 +11,6 @@ var Socket = require('simple-websocket')
var common = require('./common') var common = require('./common')
// It turns out that you can't open multiple websockets to the same server within one
// browser tab, so let's reuse them.
var socketPool = {}
inherits(WebSocketTracker, EventEmitter) inherits(WebSocketTracker, EventEmitter)
function WebSocketTracker (client, announceUrl, opts) { function WebSocketTracker (client, announceUrl, opts) {
@ -31,11 +27,12 @@ function WebSocketTracker (client, announceUrl, opts) {
self._intervalMs = self.client._intervalMs // use client interval initially self._intervalMs = self.client._intervalMs // use client interval initially
self._interval = null self._interval = null
if (socketPool[announceUrl]) self._socket = socketPool[announceUrl] self._onSocketErrorBound = self._onSocketError.bind(self)
else self._socket = socketPool[announceUrl] = new Socket(announceUrl) self._onSocketDataBound = self._onSocketData.bind(self)
self._socket.on('error', self._onSocketError.bind(self)) self._socket = new Socket(announceUrl + '?' + hat(40))
self._socket.on('data', self._onSocketData.bind(self)) self._socket.on('error', self._onSocketErrorBound)
self._socket.on('data', self._onSocketDataBound)
} }
WebSocketTracker.prototype.announce = function (opts) { WebSocketTracker.prototype.announce = function (opts) {
@ -79,13 +76,24 @@ WebSocketTracker.prototype.setInterval = function (intervalMs) {
} }
} }
WebSocketTracker.prototype.destroy = function () {
var self = this
if (self.destroyed) return
self.destroyed = true
self._socket.removeListener('error', self._onSocketErrorBound)
self._socket.removeListener('data', self._onSocketDataBound)
self._socket.close()
}
WebSocketTracker.prototype._onSocketError = function (err) { WebSocketTracker.prototype._onSocketError = function (err) {
var self = this var self = this
if (self.destroyed) return
self.client.emit('error', err) self.client.emit('error', err)
} }
WebSocketTracker.prototype._onSocketData = function (data) { WebSocketTracker.prototype._onSocketData = function (data) {
var self = this var self = this
if (self.destroyed) return
if (!(typeof data === 'object' && data !== null)) { if (!(typeof data === 'object' && data !== null)) {
return self.client.emit('warning', new Error('Invalid tracker response')) return self.client.emit('warning', new Error('Invalid tracker response'))
@ -161,6 +169,8 @@ WebSocketTracker.prototype._onSocketData = function (data) {
WebSocketTracker.prototype._send = function (params) { WebSocketTracker.prototype._send = function (params) {
var self = this var self = this
if (self.destroyed) return
var message = JSON.stringify(params) var message = JSON.stringify(params)
debug('send %s', message) debug('send %s', message)
self._socket.send(message) self._socket.send(message)