mirror of
https://github.com/webtorrent/bittorrent-tracker.git
synced 2024-12-14 03:16:24 +00:00
48d0ea42ad
Handle peer 'error' events that are fired *before* the peer is emitted in a 'peer' event. Once the peer is emitted in a 'peer' event, then it's the consumer's responsibility to listen for errors. This fixes the most common error in WebTorrent Desktop according to our telemetry.
447 lines
12 KiB
JavaScript
447 lines
12 KiB
JavaScript
module.exports = WebSocketTracker
|
|
|
|
var debug = require('debug')('bittorrent-tracker:websocket-tracker')
|
|
var extend = require('xtend')
|
|
var inherits = require('inherits')
|
|
var Peer = require('simple-peer')
|
|
var randombytes = require('randombytes')
|
|
var Socket = require('simple-websocket')
|
|
|
|
var common = require('../common')
|
|
var Tracker = require('./tracker')
|
|
|
|
// Shared pool of WebSocket connections, keyed by announce URL. Tracker clients
// that talk to the same server reuse a single socket: WebSockets are slow to
// establish in practice, so sharing gives a nice performance boost and saves
// browser resources.
var socketPool = {}

// Reconnect backoff, in milliseconds.
var RECONNECT_MINIMUM = 15 * 1000 // base delay, doubled for every retry
var RECONNECT_MAXIMUM = 30 * 60 * 1000 // cap applied to the doubled delay
var RECONNECT_VARIANCE = 30 * 1000 // upper bound of the random jitter added on top

// How long a generated offer waits for an answer before its peer is destroyed (ms).
var OFFER_TIMEOUT = 50 * 1000
|
|
|
|
inherits(WebSocketTracker, Tracker)

/**
 * Tracker client that announces and scrapes over a (pooled) WebSocket.
 *
 * Initializes the base Tracker, resets the per-instance state and immediately
 * opens (or joins) the socket for `announceUrl`.
 *
 * @param {Object} client owning tracker client
 * @param {string} announceUrl WebSocket tracker URL
 * @param {Object} [opts] accepted for signature compatibility; not read here
 */
function WebSocketTracker (client, announceUrl, opts) {
  var tracker = this
  Tracker.call(tracker, client, announceUrl)
  debug('new websocket tracker %s', announceUrl)

  // Peers created for outgoing offers that are still awaiting an answer
  // (offer id -> peer)
  tracker.peers = {}
  tracker.socket = null

  // Reconnect bookkeeping
  tracker.reconnecting = false
  tracker.retries = 0
  tracker.reconnectTimer = null

  // Simple boolean flag: true from the moment socket.send() is called until
  // data is next received from the websocket server.
  tracker.expectingResponse = false

  tracker._openSocket()
}
|
|
|
|
// Default announce interval, in milliseconds, exposed on the prototype.
// NOTE(review): not read by the code visible here — presumably consumed by the
// base Tracker's interval handling; confirm against ./tracker.
WebSocketTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 1000 // 30 seconds
|
|
|
|
/**
 * Send an announce message to the tracker.
 *
 * Does nothing while the tracker is destroyed or waiting to reconnect. If the
 * socket has not connected yet, the announce is retried once it has.
 *
 * For every event other than 'stopped' and 'completed', WebRTC offers are
 * generated first and attached to the message.
 *
 * @param {Object} opts announce options. `opts.event` and `opts.numwant` are
 *   read here; all properties are copied into the outgoing message.
 */
WebSocketTracker.prototype.announce = function (opts) {
  var self = this
  if (self.destroyed || self.reconnecting) return
  if (!self.socket.connected) {
    self.socket.once('connect', function () {
      self.announce(opts)
    })
    return
  }

  var params = extend(opts, {
    action: 'announce',
    info_hash: self.client._infoHashBinary,
    peer_id: self.client._peerIdBinary
  })
  if (self._trackerId) params.trackerid = self._trackerId

  if (opts.event === 'stopped' || opts.event === 'completed') {
    // Don't include offers with 'stopped' or 'completed' event
    self._send(params)
    return
  }

  // Limit the number of offers that are generated, since it can be slow.
  //
  // The value is normalized to a whole number in [0, 10]. _generateOffers only
  // invokes its callback once exactly `numwant` offers exist, so a missing,
  // negative or fractional numwant would otherwise mean the announce is
  // silently never sent.
  var requested = Math.floor(Number(opts.numwant))
  var numwant = requested > 0 ? Math.min(requested, 10) : 0

  self._generateOffers(numwant, function (offers) {
    params.numwant = numwant
    params.offers = offers
    self._send(params)
  })
}
|
|
|
|
/**
 * Send a scrape request to the tracker.
 *
 * Does nothing while the tracker is destroyed or waiting to reconnect. If the
 * socket has not connected yet, the scrape is retried once it has.
 *
 * @param {Object} opts scrape options. `opts.infoHash` may be a single value
 *   or a non-empty array of values (each converted with `toString('binary')`);
 *   when absent, the client's own info hash is used.
 */
WebSocketTracker.prototype.scrape = function (opts) {
  var tracker = this
  if (tracker.destroyed || tracker.reconnecting) return

  if (!tracker.socket.connected) {
    tracker.socket.once('connect', function retryScrape () {
      tracker.scrape(opts)
    })
    return
  }

  var infoHashes
  if (Array.isArray(opts.infoHash) && opts.infoHash.length > 0) {
    infoHashes = opts.infoHash.map(function (hash) {
      return hash.toString('binary')
    })
  } else {
    infoHashes = (opts.infoHash && opts.infoHash.toString('binary')) ||
      tracker.client._infoHashBinary
  }

  tracker._send({
    action: 'scrape',
    info_hash: infoHashes
  })
}
|
|
|
|
/**
 * Tear down this tracker: stop timers, destroy pending offer peers, detach
 * from the pooled socket and, when this was the socket's last consumer, close
 * the socket itself.
 *
 * @param {function} [cb] called once teardown is complete. It is called
 *   immediately when the tracker was already destroyed, when no pooled socket
 *   exists for this URL, or when other trackers still use the socket.
 *   Otherwise it is called when the socket emits 'close'.
 */
WebSocketTracker.prototype.destroy = function (cb) {
  var self = this
  if (!cb) cb = noop
  if (self.destroyed) return cb(null)

  self.destroyed = true

  clearInterval(self.interval)
  clearTimeout(self.reconnectTimer)

  // Destroy peers
  for (var peerId in self.peers) {
    var peer = self.peers[peerId]
    clearTimeout(peer.trackerTimeout)
    peer.destroy()
  }
  self.peers = null

  if (self.socket) {
    self.socket.removeListener('connect', self._onSocketConnectBound)
    self.socket.removeListener('data', self._onSocketDataBound)
    self.socket.removeListener('close', self._onSocketCloseBound)
    self.socket.removeListener('error', self._onSocketErrorBound)
    self.socket = null
  }

  self._onSocketConnectBound = null
  self._onSocketErrorBound = null
  self._onSocketDataBound = null
  self._onSocketCloseBound = null

  var socket = socketPool[self.announceUrl]

  // No pooled socket for this URL, so there is nothing to release. Without this
  // guard the consumer check below would throw and `cb` would never be called.
  if (!socket) return cb()

  socket.consumers -= 1

  // Other instances are using the socket, so there's nothing left to do here
  if (socket.consumers > 0) return cb()

  delete socketPool[self.announceUrl]
  socket.on('error', noop) // ignore all future errors
  socket.once('close', cb)

  var timeout = null

  // If there is no data response expected, destroy immediately.
  if (!self.expectingResponse) return destroyCleanup()

  // Otherwise, wait a short time for potential responses to come in from the
  // server, then force close the socket.
  timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT)

  // But, if a response comes from the server before the timeout fires, do cleanup
  // right away.
  socket.once('data', destroyCleanup)

  function destroyCleanup () {
    if (timeout) {
      clearTimeout(timeout)
      timeout = null
    }
    socket.removeListener('data', destroyCleanup)
    socket.destroy()
    socket = null
  }
}
|
|
|
|
/**
 * Attach this tracker to the socket for its announce URL, creating the socket
 * and adding it to the shared pool if none exists yet.
 *
 * Marks the tracker as not destroyed, (re)creates the peer table if needed and
 * installs fresh bound event handlers so destroy() can remove exactly the
 * listeners registered here.
 */
WebSocketTracker.prototype._openSocket = function () {
  var self = this
  self.destroyed = false

  if (!self.peers) self.peers = {}

  self._onSocketConnectBound = function () {
    self._onSocketConnect()
  }
  self._onSocketErrorBound = function (err) {
    self._onSocketError(err)
  }
  self._onSocketDataBound = function (data) {
    self._onSocketData(data)
  }
  self._onSocketCloseBound = function () {
    self._onSocketClose()
  }

  var pooled = socketPool[self.announceUrl]
  if (pooled) {
    self.socket = pooled
    pooled.consumers += 1
  } else {
    self.socket = socketPool[self.announceUrl] = new Socket(self.announceUrl)
    self.socket.consumers = 1
  }

  self.socket.on('data', self._onSocketDataBound)
  self.socket.once('close', self._onSocketCloseBound)
  self.socket.once('error', self._onSocketErrorBound)

  // Every consumer needs the connect notification, not only the one that
  // created the socket. A tracker that reconnects onto a socket another
  // tracker already re-opened would otherwise stay in the `reconnecting`
  // state forever and announce()/scrape() would keep returning early.
  if (self.socket.connected) {
    // Already connected: no 'connect' event will fire, so handle it now.
    self._onSocketConnectBound()
  } else {
    self.socket.once('connect', self._onSocketConnectBound)
  }
}
|
|
|
|
/**
 * Socket 'connect' handler. When the connection was re-established after a
 * reconnect, clears the reconnect state and re-announces with the client's
 * default announce options. Ignored once the tracker is destroyed.
 */
WebSocketTracker.prototype._onSocketConnect = function () {
  if (this.destroyed || !this.reconnecting) return

  this.reconnecting = false
  this.retries = 0
  this.announce(this.client._defaultAnnounceOpts())
}
|
|
|
|
/**
 * Socket 'data' handler: parse a JSON message from the tracker and dispatch it
 * by its `action` field. Ignored once the tracker is destroyed.
 *
 * Unparseable messages, and a JSON `null` payload, are reported as a 'warning'
 * on the client. A message with an unknown action is routed to
 * _onSocketError.
 *
 * @param {string|Buffer} data raw message, passed to JSON.parse
 */
WebSocketTracker.prototype._onSocketData = function (data) {
  var self = this
  if (self.destroyed) return

  // Any data at all counts as the response that destroy() may be waiting for.
  self.expectingResponse = false

  try {
    data = JSON.parse(data)
  } catch (err) {
    self.client.emit('warning', new Error('Invalid tracker response'))
    return
  }

  // 'null' is valid JSON, but reading `.action` from it would throw an
  // uncaught TypeError, so treat it like any other malformed response.
  if (data === null) {
    self.client.emit('warning', new Error('Invalid tracker response'))
    return
  }

  if (data.action === 'announce') {
    self._onAnnounceResponse(data)
  } else if (data.action === 'scrape') {
    self._onScrapeResponse(data)
  } else {
    self._onSocketError(new Error('invalid action in WS response: ' + data.action))
  }
}
|
|
|
|
/**
 * Handle a parsed 'announce' message from the tracker.
 *
 * Messages for another info hash (the socket is shared) or originating from
 * this client's own peer id are ignored. A 'failure reason' is reported as a
 * warning and stops processing. Otherwise the warning message, interval and
 * tracker id fields are applied, an 'update' event is emitted when the message
 * carries stats (`complete`), and any WebRTC offer/answer is signalled to the
 * matching peer, which is then emitted in a 'peer' event.
 *
 * @param {Object} data parsed announce message
 */
WebSocketTracker.prototype._onAnnounceResponse = function (data) {
  var tracker = this

  if (data.info_hash !== tracker.client._infoHashBinary) {
    debug(
      'ignoring websocket data from %s for %s (looking for %s: reused socket)',
      tracker.announceUrl, common.binaryToHex(data.info_hash), tracker.client.infoHash
    )
    return
  }

  // ignore offers/answers from this client
  if (data.peer_id && data.peer_id === tracker.client._peerIdBinary) return

  debug(
    'received %s from %s for %s',
    JSON.stringify(data), tracker.announceUrl, tracker.client.infoHash
  )

  var failureReason = data['failure reason']
  if (failureReason) return tracker.client.emit('warning', new Error(failureReason))

  var warningMessage = data['warning message']
  if (warningMessage) tracker.client.emit('warning', new Error(warningMessage))

  // The tracker reports the interval in seconds
  var intervalSecs = data.interval || data['min interval']
  if (intervalSecs) tracker.setInterval(intervalSecs * 1000)

  // If absent, do not discard previous trackerId value
  var newTrackerId = data['tracker id']
  if (newTrackerId) tracker._trackerId = newTrackerId

  if (data.complete != null) {
    var update = Object.assign({}, data, {
      announce: tracker.announceUrl,
      infoHash: common.binaryToHex(data.info_hash)
    })
    tracker.client.emit('update', update)
  }

  // A remote peer sent us an offer: create a peer for it and send back the
  // answer through the tracker.
  if (data.offer && data.peer_id) {
    debug('creating peer (from remote offer)')
    var remotePeer = tracker._createPeer({
      trickle: false,
      config: tracker.client._rtcConfig,
      wrtc: tracker.client._wrtc
    })
    remotePeer.id = common.binaryToHex(data.peer_id)
    remotePeer.once('signal', function sendAnswer (answer) {
      var reply = {
        action: 'announce',
        info_hash: tracker.client._infoHashBinary,
        peer_id: tracker.client._peerIdBinary,
        to_peer_id: data.peer_id,
        answer: answer,
        offer_id: data.offer_id
      }
      if (tracker._trackerId) reply.trackerid = tracker._trackerId
      tracker._send(reply)
    })
    remotePeer.signal(data.offer)
    tracker.client.emit('peer', remotePeer)
  }

  // A remote peer answered one of our offers: complete the pending peer.
  if (!(data.answer && data.peer_id)) return

  var offerId = common.binaryToHex(data.offer_id)
  var pendingPeer = tracker.peers[offerId]
  if (!pendingPeer) {
    debug('got unexpected answer: ' + JSON.stringify(data.answer))
    return
  }

  pendingPeer.id = common.binaryToHex(data.peer_id)
  pendingPeer.signal(data.answer)
  tracker.client.emit('peer', pendingPeer)

  // The offer was answered, so it no longer needs to time out or be tracked
  clearTimeout(pendingPeer.trackerTimeout)
  pendingPeer.trackerTimeout = null
  delete tracker.peers[offerId]
}
|
|
|
|
/**
 * Handle a parsed 'scrape' message from the tracker: emit one 'scrape' event
 * per info hash found in `data.files`, or a warning when there are none.
 *
 * @param {Object} data parsed scrape message
 */
WebSocketTracker.prototype._onScrapeResponse = function (data) {
  var tracker = this
  var files = data.files || {}
  var infoHashes = Object.keys(files)

  if (infoHashes.length === 0) {
    tracker.client.emit('warning', new Error('invalid scrape response'))
    return
  }

  for (var i = 0; i < infoHashes.length; i++) {
    var infoHash = infoHashes[i]
    // TODO: optionally handle data.flags.min_request_interval
    // (separate from announce interval)
    var response = Object.assign(files[infoHash], {
      announce: tracker.announceUrl,
      infoHash: common.binaryToHex(infoHash)
    })
    tracker.client.emit('scrape', response)
  }
}
|
|
|
|
/**
 * Socket 'close' handler: tear the tracker down and schedule a reconnect.
 * Ignored once the tracker is destroyed.
 */
WebSocketTracker.prototype._onSocketClose = function () {
  if (this.destroyed) return

  this.destroy()
  this._startReconnectTimer()
}
|
|
|
|
/**
 * Socket 'error' handler (also used for unknown message actions): tear the
 * tracker down, report the error as a warning and schedule a reconnect.
 * Ignored once the tracker is destroyed.
 *
 * @param {Error} err the error to report
 */
WebSocketTracker.prototype._onSocketError = function (err) {
  if (this.destroyed) return

  this.destroy()
  // errors will often happen if a tracker is offline, so don't treat it as fatal
  this.client.emit('warning', err)
  this._startReconnectTimer()
}
|
|
|
|
/**
 * Schedule a reconnect attempt. The delay is an exponential backoff
 * (RECONNECT_MINIMUM doubled per retry, capped at RECONNECT_MAXIMUM) plus a
 * random jitter below RECONNECT_VARIANCE. Any previously scheduled attempt is
 * replaced.
 */
WebSocketTracker.prototype._startReconnectTimer = function () {
  var tracker = this
  var jitter = Math.floor(Math.random() * RECONNECT_VARIANCE)
  var backoff = Math.min(Math.pow(2, tracker.retries) * RECONNECT_MINIMUM, RECONNECT_MAXIMUM)
  var delay = jitter + backoff

  tracker.reconnecting = true
  clearTimeout(tracker.reconnectTimer)
  tracker.reconnectTimer = setTimeout(function reconnect () {
    tracker.retries++
    tracker._openSocket()
  }, delay)
  // Where timers support unref(), call it on the reconnect timer
  if (tracker.reconnectTimer.unref) tracker.reconnectTimer.unref()

  debug('reconnecting socket in %s ms', delay)
}
|
|
|
|
/**
 * Serialize `params` as JSON and send it over the socket, flagging that a
 * response is now expected. Does nothing once the tracker is destroyed.
 *
 * @param {Object} params message to send
 */
WebSocketTracker.prototype._send = function (params) {
  if (this.destroyed) return

  this.expectingResponse = true
  var payload = JSON.stringify(params)
  debug('send %s', payload)
  this.socket.send(payload)
}
|
|
|
|
/**
 * Create `numwant` initiator peers and collect the WebRTC offer each one
 * signals. Every peer is stored in `self.peers` under a random offer id and is
 * destroyed if it is still pending after OFFER_TIMEOUT.
 *
 * @param {number} numwant number of offers to generate
 * @param {function(Array)} cb called with the list of
 *   `{ offer, offer_id }` objects once exactly `numwant` offers exist
 */
WebSocketTracker.prototype._generateOffers = function (numwant, cb) {
  var tracker = this
  var collected = []
  debug('generating %s offers', numwant)

  function finishIfReady () {
    if (collected.length !== numwant) return
    debug('generated %s offers', numwant)
    cb(collected)
  }

  function createOffer () {
    var offerId = randombytes(20).toString('hex')
    debug('creating peer (from _generateOffers)')
    var peer = tracker.peers[offerId] = tracker._createPeer({
      initiator: true,
      trickle: false,
      config: tracker.client._rtcConfig,
      wrtc: tracker.client._wrtc
    })

    peer.once('signal', function onOffer (offer) {
      collected.push({
        offer: offer,
        offer_id: common.hexToBinary(offerId)
      })
      finishIfReady()
    })

    // Give up on the offer if it is still pending after OFFER_TIMEOUT
    peer.trackerTimeout = setTimeout(function onOfferTimeout () {
      debug('tracker timeout: destroying peer')
      peer.trackerTimeout = null
      delete tracker.peers[offerId]
      peer.destroy()
    }, OFFER_TIMEOUT)
    if (peer.trackerTimeout.unref) peer.trackerTimeout.unref()
  }

  var started = 0
  while (started < numwant) {
    createOffer()
    ++started
  }

  // Covers numwant === 0, where no 'signal' event will ever trigger the check
  finishIfReady()
}
|
|
|
|
/**
 * Create a simple-peer instance with temporary error handling attached.
 *
 * Until the peer emits 'connect', its first 'error' is reported as a warning
 * on the client and the peer is destroyed. This covers errors fired *before*
 * the peer is handed to the consumer in a 'peer' event; after that it is the
 * consumer's responsibility to listen for errors.
 *
 * @param {Object} opts options passed straight to the Peer constructor
 * @returns {Peer} the new peer
 */
WebSocketTracker.prototype._createPeer = function (opts) {
  var tracker = this
  var peer = new Peer(opts)

  function handleEarlyError (err) {
    tracker.client.emit('warning', new Error('Connection error: ' + err.message))
    peer.destroy()
  }

  // Once connected, stop handling errors on the consumer's behalf
  function releasePeer () {
    peer.removeListener('error', handleEarlyError)
    peer.removeListener('connect', releasePeer)
  }

  peer.once('error', handleEarlyError)
  peer.once('connect', releasePeer)

  return peer
}
|
|
|
|
// Shared do-nothing function, used as a default callback and to swallow events.
function noop () {
  // intentionally empty
}
|