bittorrent-tracker/lib/client/websocket-tracker.js

421 lines
11 KiB
JavaScript
Raw Normal View History

module.exports = WebSocketTracker
2015-03-29 07:39:18 +00:00
var debug = require('debug')('bittorrent-tracker:websocket-tracker')
2016-02-29 20:34:06 +00:00
var extend = require('xtend')
var inherits = require('inherits')
var Peer = require('simple-peer')
2016-08-21 02:37:36 +00:00
var randombytes = require('randombytes')
var Socket = require('simple-websocket')
2015-07-29 07:26:44 +00:00
var common = require('../common')
2015-07-29 08:47:09 +00:00
var Tracker = require('./tracker')
// Reconnect back-off bounds, in milliseconds: the delay starts at the minimum,
// doubles with every retry and is capped at the maximum.
var RECONNECT_MINIMUM = 15 * 1000
var RECONNECT_MAXIMUM = 30 * 60 * 1000
// Exclusive upper bound of the random jitter added to every reconnect delay.
var RECONNECT_VARIANCE = 30 * 1000
// How long the peer behind a locally generated offer waits for an answer
// before it is destroyed.
var OFFER_TIMEOUT = 50 * 1000

inherits(WebSocketTracker, Tracker)
/**
 * Tracker client that talks to a tracker over a WebSocket.
 *
 * Initialises the per-tracker state and immediately opens (or joins) the
 * socket for `announceUrl`.
 *
 * @param {Object} client       owning client, forwarded to the base Tracker
 * @param {string} announceUrl  tracker URL, forwarded to the base Tracker
 * @param {Object} [opts]       accepted for signature compatibility; unused here
 */
function WebSocketTracker (client, announceUrl, opts) {
  var self = this
  Tracker.call(self, client, announceUrl)
  debug('new websocket tracker %s', announceUrl)

  self.peers = {} // peers (offer id -> peer)
  self.socket = null

  // Reconnect state: whether a reconnect is pending, how many attempts have
  // been made, and the timer handle for the next attempt.
  self.reconnecting = false
  self.retries = 0
  self.reconnectTimer = null

  // Set to true by every _send() and back to false when the next message
  // arrives; destroy() uses it to decide whether to wait for a reply.
  self.expectingResponse = false

  self._openSocket()
}
// Default announce interval in milliseconds. Not read by the code visible in
// this file; presumably consumed by the base Tracker — TODO confirm.
WebSocketTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 1000 // 30 seconds
/**
 * Send an announce message to the tracker.
 *
 * Does nothing while the tracker is destroyed or waiting to reconnect. If the
 * socket is not connected yet, the announce is retried once it connects.
 * Announces with event 'stopped' or 'completed' are sent as-is; every other
 * announce carries freshly generated WebRTC offers.
 *
 * @param {Object} [opts]          announce options, merged into the message
 * @param {string} [opts.event]    announce event name
 * @param {number} [opts.numwant]  number of offers wanted; clamped to an
 *                                 integer in [0, 10], defaulting to 10 when
 *                                 missing or not a number
 */
WebSocketTracker.prototype.announce = function (opts) {
  var self = this
  if (self.destroyed || self.reconnecting) return
  if (!opts) opts = {}

  if (!self.socket.connected) {
    self.socket.once('connect', function () {
      self.announce(opts)
    })
    return
  }

  var params = extend(opts, {
    action: 'announce',
    info_hash: self.client._infoHashBinary,
    peer_id: self.client._peerIdBinary
  })
  if (self._trackerId) params.trackerid = self._trackerId

  if (opts.event === 'stopped' || opts.event === 'completed') {
    // Don't include offers with 'stopped' or 'completed' event
    self._send(params)
    return
  }

  // Limit the number of offers that are generated, since it can be slow.
  // The count must be a non-negative integer: _generateOffers only calls back
  // once exactly that many offers exist, so NaN, a negative or a fraction
  // would leave the announce unsent.
  var maxOffers = 10
  var requested = Number(opts.numwant)
  var numwant = isNaN(requested)
    ? maxOffers
    : Math.max(0, Math.min(Math.floor(requested), maxOffers))

  self._generateOffers(numwant, function (offers) {
    params.numwant = numwant
    params.offers = offers
    self._send(params)
  })
}
/**
 * Send a scrape request to the tracker.
 *
 * Ignored while the tracker is destroyed or reconnecting; deferred until the
 * socket's next 'connect' event when it is not connected yet.
 *
 * @param {Object} opts             scrape options
 * @param {*} [opts.infoHash]       a single info hash, or a non-empty array of
 *                                  them; each is serialised with
 *                                  `toString('binary')`. Falls back to the
 *                                  client's own info hash.
 */
WebSocketTracker.prototype.scrape = function (opts) {
  var tracker = this
  if (tracker.destroyed || tracker.reconnecting) return

  if (!tracker.socket.connected) {
    tracker.socket.once('connect', function () {
      tracker.scrape(opts)
    })
    return
  }

  var infoHashes
  if (Array.isArray(opts.infoHash) && opts.infoHash.length > 0) {
    infoHashes = opts.infoHash.map(function (hash) {
      return hash.toString('binary')
    })
  } else {
    var single = opts.infoHash && opts.infoHash.toString('binary')
    infoHashes = single || tracker.client._infoHashBinary
  }

  tracker._send({
    action: 'scrape',
    info_hash: infoHashes
  })
}
2016-03-15 02:16:00 +00:00
/**
 * Tear down this tracker: stop its timers, destroy pending offer peers,
 * detach from the socket and release this tracker's share of the pooled
 * socket. The socket itself is only closed once its last consumer is gone.
 *
 * @param {function} [cb]  called when teardown is finished. Invoked directly
 *                         when the tracker is already destroyed, has no pooled
 *                         socket, or the socket is still shared; otherwise
 *                         registered as the socket's 'close' listener.
 */
WebSocketTracker.prototype.destroy = function (cb) {
  var self = this
  if (!cb) cb = noop
  if (self.destroyed) return cb(null)

  self.destroyed = true

  clearInterval(self.interval)
  clearTimeout(self.reconnectTimer)

  // Destroy peers
  for (var peerId in self.peers) {
    var peer = self.peers[peerId]
    clearTimeout(peer.trackerTimeout)
    peer.destroy()
  }
  self.peers = null

  if (self.socket) {
    self.socket.removeListener('connect', self._onSocketConnectBound)
    self.socket.removeListener('data', self._onSocketDataBound)
    self.socket.removeListener('close', self._onSocketCloseBound)
    self.socket.removeListener('error', self._onSocketErrorBound)
    self.socket = null
  }

  self._onSocketConnectBound = null
  self._onSocketErrorBound = null
  self._onSocketDataBound = null
  self._onSocketCloseBound = null

  var socketPool = self.client._socketPool
  var socket = socketPool[self.announceUrl]

  // No pooled socket for this URL (e.g. another tracker already removed it):
  // there is nothing to release, so finish instead of dereferencing undefined.
  if (!socket) return cb(null)

  socket.consumers -= 1

  // Other instances are using the socket, so there's nothing left to do here
  if (socket.consumers > 0) return cb()

  delete socketPool[self.announceUrl]

  socket.on('error', noop) // ignore all future errors
  socket.once('close', cb)

  var timeout = null

  // If there is no data response expected, destroy immediately.
  if (!self.expectingResponse) return destroyCleanup()

  // Otherwise, wait a short time for potential responses to come in from the
  // server, then force close the socket.
  timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT)

  // But, if a response comes from the server before the timeout fires, do cleanup
  // right away.
  socket.once('data', destroyCleanup)

  function destroyCleanup () {
    if (timeout) {
      clearTimeout(timeout)
      timeout = null
    }
    socket.removeListener('data', destroyCleanup)
    socket.destroy()
    socket = null
  }
}
/**
 * Open the WebSocket for this tracker's announce URL, or join the one already
 * pooled on the client for that URL, and attach this tracker's listeners.
 *
 * Resets `destroyed`, so it is also used to bring the tracker back after a
 * reconnect delay. A tracker joining a pooled socket gets the same connect
 * notification as the tracker that created it; without it the tracker would
 * stay in the `reconnecting` state and ignore every announce or scrape.
 */
WebSocketTracker.prototype._openSocket = function () {
  var self = this
  self.destroyed = false

  if (!self.peers) self.peers = {}

  // Per-instance wrappers so destroy() can remove exactly these listeners
  // from a socket that other trackers may share.
  self._onSocketConnectBound = function () {
    self._onSocketConnect()
  }
  self._onSocketErrorBound = function (err) {
    self._onSocketError(err)
  }
  self._onSocketDataBound = function (data) {
    self._onSocketData(data)
  }
  self._onSocketCloseBound = function () {
    self._onSocketClose()
  }

  var socketPool = self.client._socketPool
  var alreadyConnected = false

  self.socket = socketPool[self.announceUrl]
  if (self.socket) {
    self.socket.consumers += 1
    alreadyConnected = !!self.socket.connected
  } else {
    self.socket = socketPool[self.announceUrl] = new Socket(self.announceUrl)
    self.socket.consumers = 1
  }

  if (!alreadyConnected) {
    self.socket.once('connect', self._onSocketConnectBound)
  }
  self.socket.on('data', self._onSocketDataBound)
  self.socket.once('close', self._onSocketCloseBound)
  self.socket.once('error', self._onSocketErrorBound)

  // The pooled socket connected before this tracker joined, so no 'connect'
  // event will follow; run the handler now that all listeners are attached.
  if (alreadyConnected) self._onSocketConnectBound()
}
/**
 * Socket 'connect' handler. When the connection is the result of a reconnect,
 * clears the reconnect state and re-announces with the client's default
 * announce options; otherwise does nothing.
 */
WebSocketTracker.prototype._onSocketConnect = function () {
  var tracker = this
  if (tracker.destroyed || !tracker.reconnecting) return

  tracker.reconnecting = false
  tracker.retries = 0
  tracker.announce(tracker.client._defaultAnnounceOpts())
}
2015-04-10 23:58:21 +00:00
/**
 * Socket 'data' handler: parse a tracker message and dispatch it by `action`.
 *
 * A message that is not valid JSON, or whose JSON value is not an object
 * (e.g. `null` or a number), is reported as a 'warning' on the client and
 * dropped. An object with an unknown action is treated as a socket error.
 *
 * @param {*} data  raw message received from the socket; parsed as JSON text
 */
WebSocketTracker.prototype._onSocketData = function (data) {
  var self = this
  if (self.destroyed) return

  // Any incoming message counts as the response to the last _send().
  self.expectingResponse = false

  try {
    data = JSON.parse(data)
  } catch (err) {
    self.client.emit('warning', new Error('Invalid tracker response'))
    return
  }

  // JSON.parse accepts primitives and null; reading `.action` from null
  // would throw, so reject anything that is not an object.
  if (data === null || typeof data !== 'object') {
    self.client.emit('warning', new Error('Invalid tracker response'))
    return
  }

  if (data.action === 'announce') {
    self._onAnnounceResponse(data)
  } else if (data.action === 'scrape') {
    self._onScrapeResponse(data)
  } else {
    self._onSocketError(new Error('invalid action in WS response: ' + data.action))
  }
}
/**
 * Handle a parsed 'announce' message from the tracker.
 *
 * Messages for another info hash (the socket may be shared) and messages that
 * carry this client's own peer id are ignored. Otherwise the message can
 * - report a failure (emitted as 'warning'; processing stops),
 * - report a warning, an announce interval, a tracker id and swarm counts,
 * - relay a remote offer, which is answered through a new peer, and/or
 * - relay the answer to one of the offers made by _generateOffers().
 *
 * @param {Object} data  parsed tracker message
 */
WebSocketTracker.prototype._onAnnounceResponse = function (data) {
  var self = this

  if (data.info_hash !== self.client._infoHashBinary) {
    debug(
      'ignoring websocket data from %s for %s (looking for %s: reused socket)',
      self.announceUrl, common.binaryToHex(data.info_hash), self.client.infoHash
    )
    return
  }

  if (data.peer_id && data.peer_id === self.client._peerIdBinary) {
    // ignore offers/answers from this client
    return
  }

  debug(
    'received %s from %s for %s',
    JSON.stringify(data), self.announceUrl, self.client.infoHash
  )

  var failure = data['failure reason']
  if (failure) return self.client.emit('warning', new Error(failure))

  var warning = data['warning message']
  if (warning) self.client.emit('warning', new Error(warning))

  // The interval is multiplied by 1000 before being handed to setInterval().
  var interval = data.interval || data['min interval']
  if (interval) self.setInterval(interval * 1000)

  var trackerId = data['tracker id']
  if (trackerId) {
    // If absent, do not discard previous trackerId value
    self._trackerId = trackerId
  }

  if (data.complete != null) {
    self.client.emit('update', {
      announce: self.announceUrl,
      complete: data.complete,
      incomplete: data.incomplete
    })
  }

  var peer
  if (data.offer && data.peer_id) {
    // A remote peer sent an offer: create the answering side and send the
    // answer back through the tracker once it has been produced.
    debug('creating peer (from remote offer)')
    peer = new Peer({
      trickle: false,
      config: self.client._rtcConfig,
      wrtc: self.client._wrtc
    })
    peer.id = common.binaryToHex(data.peer_id)
    peer.once('signal', function (answer) {
      var params = {
        action: 'announce',
        info_hash: self.client._infoHashBinary,
        peer_id: self.client._peerIdBinary,
        to_peer_id: data.peer_id,
        answer: answer,
        offer_id: data.offer_id
      }
      if (self._trackerId) params.trackerid = self._trackerId
      self._send(params)
    })
    peer.signal(data.offer)
    self.client.emit('peer', peer)
  }

  if (data.answer && data.peer_id) {
    // A remote peer answered one of our offers: complete that pending peer
    // and stop tracking it.
    var offerId = common.binaryToHex(data.offer_id)
    peer = self.peers[offerId]
    if (peer) {
      peer.id = common.binaryToHex(data.peer_id)
      peer.signal(data.answer)
      self.client.emit('peer', peer)

      clearTimeout(peer.trackerTimeout)
      peer.trackerTimeout = null
      delete self.peers[offerId]
    } else {
      debug('got unexpected answer: ' + JSON.stringify(data.answer))
    }
  }
}
/**
 * Handle a parsed 'scrape' message from the tracker.
 *
 * Emits one 'scrape' event on the client per entry of `data.files`. A message
 * without any entries is reported as a 'warning'.
 *
 * @param {Object} data  parsed tracker message; `data.files` maps each info
 *                       hash to its { complete, incomplete, downloaded } counts
 */
WebSocketTracker.prototype._onScrapeResponse = function (data) {
  var self = this
  data = data.files || {}

  var keys = Object.keys(data)
  if (keys.length === 0) {
    self.client.emit('warning', new Error('invalid scrape response'))
    return
  }

  keys.forEach(function (infoHash) {
    var response = data[infoHash]
    // TODO: optionally handle data.flags.min_request_interval
    // (separate from announce interval)
    self.client.emit('scrape', {
      announce: self.announceUrl,
      infoHash: common.binaryToHex(infoHash),
      complete: response.complete,
      incomplete: response.incomplete,
      downloaded: response.downloaded
    })
  })
}
/**
 * Socket 'close' handler: unless the tracker was already destroyed, tear it
 * down and schedule a reconnect attempt.
 */
WebSocketTracker.prototype._onSocketClose = function () {
  var tracker = this
  if (!tracker.destroyed) {
    tracker.destroy()
    tracker._startReconnectTimer()
  }
}
/**
 * Socket 'error' handler (also used for unrecognised tracker messages):
 * unless the tracker was already destroyed, tear it down, surface the error
 * as a client 'warning' and schedule a reconnect attempt.
 *
 * @param {Error} err  the error to report
 */
WebSocketTracker.prototype._onSocketError = function (err) {
  var tracker = this
  if (!tracker.destroyed) {
    tracker.destroy()
    // A tracker being offline is a common cause of errors, so this is
    // reported as a warning instead of being treated as fatal.
    tracker.client.emit('warning', err)
    tracker._startReconnectTimer()
  }
}
/**
 * Schedule the next reconnect attempt and mark the tracker as reconnecting.
 *
 * The delay is RECONNECT_MINIMUM doubled once per previous retry, capped at
 * RECONNECT_MAXIMUM, plus a random jitter below RECONNECT_VARIANCE. Any
 * previously scheduled attempt is replaced.
 */
WebSocketTracker.prototype._startReconnectTimer = function () {
  var self = this

  var backoff = Math.min(Math.pow(2, self.retries) * RECONNECT_MINIMUM, RECONNECT_MAXIMUM)
  var jitter = Math.floor(Math.random() * RECONNECT_VARIANCE)
  var ms = jitter + backoff

  self.reconnecting = true
  clearTimeout(self.reconnectTimer)
  self.reconnectTimer = setTimeout(function () {
    self.retries++
    self._openSocket()
  }, ms)
  // Where timers support it, don't let a pending reconnect keep the process
  // alive.
  if (self.reconnectTimer.unref) self.reconnectTimer.unref()

  debug('reconnecting socket in %s ms', ms)
}
2015-03-29 07:41:25 +00:00
/**
 * Serialise `params` as JSON and send it over the socket, marking the tracker
 * as waiting for a response. Does nothing once the tracker is destroyed.
 *
 * @param {Object} params  message to send
 */
WebSocketTracker.prototype._send = function (params) {
  var self = this
  if (self.destroyed) return

  self.expectingResponse = true

  var message = JSON.stringify(params)
  debug('send %s', message)
  self.socket.send(message)
}
/**
 * Create `numwant` initiator peers and collect the offer each one produces.
 *
 * Every peer is stored in `self.peers` under a random hex offer id until an
 * answer arrives or OFFER_TIMEOUT elapses, at which point it is destroyed.
 *
 * @param {number} numwant  number of offers to generate; `cb` only fires once
 *                          exactly this many offers have been collected
 * @param {function(Array)} cb  receives the `{ offer, offer_id }` objects in
 *                              the order the peers signalled
 */
WebSocketTracker.prototype._generateOffers = function (numwant, cb) {
  var self = this
  var offers = []
  debug('generating %s offers', numwant)

  for (var i = 0; i < numwant; ++i) {
    generateOffer()
  }
  // Covers numwant === 0, where no peer will ever signal.
  checkDone()

  function generateOffer () {
    var offerId = randombytes(20).toString('hex')
    debug('creating peer (from _generateOffers)')
    var peer = self.peers[offerId] = new Peer({
      initiator: true,
      trickle: false,
      config: self.client._rtcConfig,
      wrtc: self.client._wrtc
    })
    peer.once('signal', function (offer) {
      offers.push({
        offer: offer,
        offer_id: common.hexToBinary(offerId)
      })
      checkDone()
    })
    // Give up on the offer if nobody answers it in time.
    peer.trackerTimeout = setTimeout(function () {
      debug('tracker timeout: destroying peer')
      peer.trackerTimeout = null
      delete self.peers[offerId]
      peer.destroy()
    }, OFFER_TIMEOUT)
    if (peer.trackerTimeout.unref) peer.trackerTimeout.unref()
  }

  function checkDone () {
    if (offers.length === numwant) {
      debug('generated %s offers', numwant)
      cb(offers)
    }
  }
}
// Shared do-nothing function: the default callback in destroy() and the
// listener it installs to ignore late socket errors.
function noop () {}