mirror of
https://github.com/webtorrent/bittorrent-tracker.git
synced 2025-01-09 15:51:35 +00:00
dd02cddcb8
This is just a defensive code change to ensure that if signal() were somehow able to cause the peer to synchronously emit the 'connect' event that we won't cause the 'error' listener to be unregistered before 'peer' is emitted and the user has a chance to register their own 'error' listener
435 lines
12 KiB
JavaScript
435 lines
12 KiB
JavaScript
const debug = require('debug')('bittorrent-tracker:websocket-tracker')
|
|
const Peer = require('simple-peer')
|
|
const randombytes = require('randombytes')
|
|
const Socket = require('simple-websocket')
|
|
|
|
const common = require('../common')
|
|
const Tracker = require('./tracker')
|
|
|
|
// Use a socket pool, so tracker clients share WebSocket objects for the same server.
|
|
// In practice, WebSockets are pretty slow to establish, so this gives a nice performance
|
|
// boost, and saves browser resources.
|
|
const socketPool = {}
|
|
|
|
const RECONNECT_MINIMUM = 10 * 1000
|
|
const RECONNECT_MAXIMUM = 60 * 60 * 1000
|
|
const RECONNECT_VARIANCE = 5 * 60 * 1000
|
|
const OFFER_TIMEOUT = 50 * 1000
|
|
|
|
class WebSocketTracker extends Tracker {
|
|
constructor (client, announceUrl, opts) {
|
|
super(client, announceUrl)
|
|
debug('new websocket tracker %s', announceUrl)
|
|
|
|
this.peers = {} // peers (offer id -> peer)
|
|
this.socket = null
|
|
|
|
this.reconnecting = false
|
|
this.retries = 0
|
|
this.reconnectTimer = null
|
|
|
|
// Simple boolean flag to track whether the socket has received data from
|
|
// the websocket server since the last time socket.send() was called.
|
|
this.expectingResponse = false
|
|
|
|
this._openSocket()
|
|
}
|
|
|
|
announce (opts) {
|
|
if (this.destroyed || this.reconnecting) return
|
|
if (!this.socket.connected) {
|
|
this.socket.once('connect', () => {
|
|
this.announce(opts)
|
|
})
|
|
return
|
|
}
|
|
|
|
const params = Object.assign({}, opts, {
|
|
action: 'announce',
|
|
info_hash: this.client._infoHashBinary,
|
|
peer_id: this.client._peerIdBinary
|
|
})
|
|
if (this._trackerId) params.trackerid = this._trackerId
|
|
|
|
if (opts.event === 'stopped' || opts.event === 'completed') {
|
|
// Don't include offers with 'stopped' or 'completed' event
|
|
this._send(params)
|
|
} else {
|
|
// Limit the number of offers that are generated, since it can be slow
|
|
const numwant = Math.min(opts.numwant, 5)
|
|
|
|
this._generateOffers(numwant, offers => {
|
|
params.numwant = numwant
|
|
params.offers = offers
|
|
this._send(params)
|
|
})
|
|
}
|
|
}
|
|
|
|
scrape (opts) {
|
|
if (this.destroyed || this.reconnecting) return
|
|
if (!this.socket.connected) {
|
|
this.socket.once('connect', () => {
|
|
this.scrape(opts)
|
|
})
|
|
return
|
|
}
|
|
|
|
const infoHashes = (Array.isArray(opts.infoHash) && opts.infoHash.length > 0)
|
|
? opts.infoHash.map(infoHash => {
|
|
return infoHash.toString('binary')
|
|
})
|
|
: (opts.infoHash && opts.infoHash.toString('binary')) || this.client._infoHashBinary
|
|
const params = {
|
|
action: 'scrape',
|
|
info_hash: infoHashes
|
|
}
|
|
|
|
this._send(params)
|
|
}
|
|
|
|
destroy (cb = noop) {
|
|
if (this.destroyed) return cb(null)
|
|
|
|
this.destroyed = true
|
|
|
|
clearInterval(this.interval)
|
|
clearTimeout(this.reconnectTimer)
|
|
|
|
// Destroy peers
|
|
for (const peerId in this.peers) {
|
|
const peer = this.peers[peerId]
|
|
clearTimeout(peer.trackerTimeout)
|
|
peer.destroy()
|
|
}
|
|
this.peers = null
|
|
|
|
if (this.socket) {
|
|
this.socket.removeListener('connect', this._onSocketConnectBound)
|
|
this.socket.removeListener('data', this._onSocketDataBound)
|
|
this.socket.removeListener('close', this._onSocketCloseBound)
|
|
this.socket.removeListener('error', this._onSocketErrorBound)
|
|
this.socket = null
|
|
}
|
|
|
|
this._onSocketConnectBound = null
|
|
this._onSocketErrorBound = null
|
|
this._onSocketDataBound = null
|
|
this._onSocketCloseBound = null
|
|
|
|
if (socketPool[this.announceUrl]) {
|
|
socketPool[this.announceUrl].consumers -= 1
|
|
}
|
|
|
|
// Other instances are using the socket, so there's nothing left to do here
|
|
if (socketPool[this.announceUrl].consumers > 0) return cb()
|
|
|
|
let socket = socketPool[this.announceUrl]
|
|
delete socketPool[this.announceUrl]
|
|
socket.on('error', noop) // ignore all future errors
|
|
socket.once('close', cb)
|
|
|
|
let timeout
|
|
|
|
// If there is no data response expected, destroy immediately.
|
|
if (!this.expectingResponse) return destroyCleanup()
|
|
|
|
// Otherwise, wait a short time for potential responses to come in from the
|
|
// server, then force close the socket.
|
|
timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT)
|
|
|
|
// But, if a response comes from the server before the timeout fires, do cleanup
|
|
// right away.
|
|
socket.once('data', destroyCleanup)
|
|
|
|
function destroyCleanup () {
|
|
if (timeout) {
|
|
clearTimeout(timeout)
|
|
timeout = null
|
|
}
|
|
socket.removeListener('data', destroyCleanup)
|
|
socket.destroy()
|
|
socket = null
|
|
}
|
|
}
|
|
|
|
_openSocket () {
|
|
this.destroyed = false
|
|
|
|
if (!this.peers) this.peers = {}
|
|
|
|
this._onSocketConnectBound = () => {
|
|
this._onSocketConnect()
|
|
}
|
|
this._onSocketErrorBound = err => {
|
|
this._onSocketError(err)
|
|
}
|
|
this._onSocketDataBound = data => {
|
|
this._onSocketData(data)
|
|
}
|
|
this._onSocketCloseBound = () => {
|
|
this._onSocketClose()
|
|
}
|
|
|
|
this.socket = socketPool[this.announceUrl]
|
|
if (this.socket) {
|
|
socketPool[this.announceUrl].consumers += 1
|
|
if (this.socket.connected) {
|
|
this._onSocketConnectBound()
|
|
}
|
|
} else {
|
|
this.socket = socketPool[this.announceUrl] = new Socket(this.announceUrl)
|
|
this.socket.consumers = 1
|
|
this.socket.once('connect', this._onSocketConnectBound)
|
|
}
|
|
|
|
this.socket.on('data', this._onSocketDataBound)
|
|
this.socket.once('close', this._onSocketCloseBound)
|
|
this.socket.once('error', this._onSocketErrorBound)
|
|
}
|
|
|
|
_onSocketConnect () {
|
|
if (this.destroyed) return
|
|
|
|
if (this.reconnecting) {
|
|
this.reconnecting = false
|
|
this.retries = 0
|
|
this.announce(this.client._defaultAnnounceOpts())
|
|
}
|
|
}
|
|
|
|
_onSocketData (data) {
|
|
if (this.destroyed) return
|
|
|
|
this.expectingResponse = false
|
|
|
|
try {
|
|
data = JSON.parse(data)
|
|
} catch (err) {
|
|
this.client.emit('warning', new Error('Invalid tracker response'))
|
|
return
|
|
}
|
|
|
|
if (data.action === 'announce') {
|
|
this._onAnnounceResponse(data)
|
|
} else if (data.action === 'scrape') {
|
|
this._onScrapeResponse(data)
|
|
} else {
|
|
this._onSocketError(new Error(`invalid action in WS response: ${data.action}`))
|
|
}
|
|
}
|
|
|
|
_onAnnounceResponse (data) {
|
|
if (data.info_hash !== this.client._infoHashBinary) {
|
|
debug(
|
|
'ignoring websocket data from %s for %s (looking for %s: reused socket)',
|
|
this.announceUrl, common.binaryToHex(data.info_hash), this.client.infoHash
|
|
)
|
|
return
|
|
}
|
|
|
|
if (data.peer_id && data.peer_id === this.client._peerIdBinary) {
|
|
// ignore offers/answers from this client
|
|
return
|
|
}
|
|
|
|
debug(
|
|
'received %s from %s for %s',
|
|
JSON.stringify(data), this.announceUrl, this.client.infoHash
|
|
)
|
|
|
|
const failure = data['failure reason']
|
|
if (failure) return this.client.emit('warning', new Error(failure))
|
|
|
|
const warning = data['warning message']
|
|
if (warning) this.client.emit('warning', new Error(warning))
|
|
|
|
const interval = data.interval || data['min interval']
|
|
if (interval) this.setInterval(interval * 1000)
|
|
|
|
const trackerId = data['tracker id']
|
|
if (trackerId) {
|
|
// If absent, do not discard previous trackerId value
|
|
this._trackerId = trackerId
|
|
}
|
|
|
|
if (data.complete != null) {
|
|
const response = Object.assign({}, data, {
|
|
announce: this.announceUrl,
|
|
infoHash: common.binaryToHex(data.info_hash)
|
|
})
|
|
this.client.emit('update', response)
|
|
}
|
|
|
|
let peer
|
|
if (data.offer && data.peer_id) {
|
|
debug('creating peer (from remote offer)')
|
|
peer = this._createPeer()
|
|
peer.id = common.binaryToHex(data.peer_id)
|
|
peer.once('signal', answer => {
|
|
const params = {
|
|
action: 'announce',
|
|
info_hash: this.client._infoHashBinary,
|
|
peer_id: this.client._peerIdBinary,
|
|
to_peer_id: data.peer_id,
|
|
answer,
|
|
offer_id: data.offer_id
|
|
}
|
|
if (this._trackerId) params.trackerid = this._trackerId
|
|
this._send(params)
|
|
})
|
|
this.client.emit('peer', peer)
|
|
peer.signal(data.offer)
|
|
}
|
|
|
|
if (data.answer && data.peer_id) {
|
|
const offerId = common.binaryToHex(data.offer_id)
|
|
peer = this.peers[offerId]
|
|
if (peer) {
|
|
peer.id = common.binaryToHex(data.peer_id)
|
|
this.client.emit('peer', peer)
|
|
peer.signal(data.answer)
|
|
|
|
clearTimeout(peer.trackerTimeout)
|
|
peer.trackerTimeout = null
|
|
delete this.peers[offerId]
|
|
} else {
|
|
debug(`got unexpected answer: ${JSON.stringify(data.answer)}`)
|
|
}
|
|
}
|
|
}
|
|
|
|
_onScrapeResponse (data) {
|
|
data = data.files || {}
|
|
|
|
const keys = Object.keys(data)
|
|
if (keys.length === 0) {
|
|
this.client.emit('warning', new Error('invalid scrape response'))
|
|
return
|
|
}
|
|
|
|
keys.forEach(infoHash => {
|
|
// TODO: optionally handle data.flags.min_request_interval
|
|
// (separate from announce interval)
|
|
const response = Object.assign(data[infoHash], {
|
|
announce: this.announceUrl,
|
|
infoHash: common.binaryToHex(infoHash)
|
|
})
|
|
this.client.emit('scrape', response)
|
|
})
|
|
}
|
|
|
|
_onSocketClose () {
|
|
if (this.destroyed) return
|
|
this.destroy()
|
|
this._startReconnectTimer()
|
|
}
|
|
|
|
_onSocketError (err) {
|
|
if (this.destroyed) return
|
|
this.destroy()
|
|
// errors will often happen if a tracker is offline, so don't treat it as fatal
|
|
this.client.emit('warning', err)
|
|
this._startReconnectTimer()
|
|
}
|
|
|
|
_startReconnectTimer () {
|
|
const ms = Math.floor(Math.random() * RECONNECT_VARIANCE) + Math.min(Math.pow(2, this.retries) * RECONNECT_MINIMUM, RECONNECT_MAXIMUM)
|
|
|
|
this.reconnecting = true
|
|
clearTimeout(this.reconnectTimer)
|
|
this.reconnectTimer = setTimeout(() => {
|
|
this.retries++
|
|
this._openSocket()
|
|
}, ms)
|
|
if (this.reconnectTimer.unref) this.reconnectTimer.unref()
|
|
|
|
debug('reconnecting socket in %s ms', ms)
|
|
}
|
|
|
|
_send (params) {
|
|
if (this.destroyed) return
|
|
this.expectingResponse = true
|
|
const message = JSON.stringify(params)
|
|
debug('send %s', message)
|
|
this.socket.send(message)
|
|
}
|
|
|
|
_generateOffers (numwant, cb) {
|
|
const self = this
|
|
const offers = []
|
|
debug('generating %s offers', numwant)
|
|
|
|
for (let i = 0; i < numwant; ++i) {
|
|
generateOffer()
|
|
}
|
|
checkDone()
|
|
|
|
function generateOffer () {
|
|
const offerId = randombytes(20).toString('hex')
|
|
debug('creating peer (from _generateOffers)')
|
|
const peer = self.peers[offerId] = self._createPeer({ initiator: true })
|
|
peer.once('signal', offer => {
|
|
offers.push({
|
|
offer,
|
|
offer_id: common.hexToBinary(offerId)
|
|
})
|
|
checkDone()
|
|
})
|
|
peer.trackerTimeout = setTimeout(() => {
|
|
debug('tracker timeout: destroying peer')
|
|
peer.trackerTimeout = null
|
|
delete self.peers[offerId]
|
|
peer.destroy()
|
|
}, OFFER_TIMEOUT)
|
|
if (peer.trackerTimeout.unref) peer.trackerTimeout.unref()
|
|
}
|
|
|
|
function checkDone () {
|
|
if (offers.length === numwant) {
|
|
debug('generated %s offers', numwant)
|
|
cb(offers)
|
|
}
|
|
}
|
|
}
|
|
|
|
_createPeer (opts) {
|
|
const self = this
|
|
|
|
opts = Object.assign({
|
|
trickle: false,
|
|
config: self.client._rtcConfig,
|
|
wrtc: self.client._wrtc
|
|
}, opts)
|
|
|
|
const peer = new Peer(opts)
|
|
|
|
peer.once('error', onError)
|
|
peer.once('connect', onConnect)
|
|
|
|
return peer
|
|
|
|
// Handle peer 'error' events that are fired *before* the peer is emitted in
|
|
// a 'peer' event.
|
|
function onError (err) {
|
|
self.client.emit('warning', new Error(`Connection error: ${err.message}`))
|
|
peer.destroy()
|
|
}
|
|
|
|
// Once the peer is emitted in a 'peer' event, then it's the consumer's
|
|
// responsibility to listen for errors, so the listeners are removed here.
|
|
function onConnect () {
|
|
peer.removeListener('error', onError)
|
|
peer.removeListener('connect', onConnect)
|
|
}
|
|
}
|
|
}
|
|
|
|
WebSocketTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 1000 // 30 seconds
|
|
// Normally this shouldn't be accessed but is occasionally useful
|
|
WebSocketTracker._socketPool = socketPool
|
|
|
|
function noop () {}
|
|
|
|
module.exports = WebSocketTracker
|