Merge pull request #198 from feross/fix-196

Fix stats and leaked websockets
This commit is contained in:
Feross Aboukhadijeh 2017-02-09 23:16:28 +01:00 committed by GitHub
commit a469740603
8 changed files with 256 additions and 96 deletions

View File

@ -69,6 +69,11 @@ function Client (opts) {
// See: https://github.com/feross/webtorrent-hybrid/issues/46 // See: https://github.com/feross/webtorrent-hybrid/issues/46
self._wrtc = typeof opts.wrtc === 'function' ? opts.wrtc() : opts.wrtc self._wrtc = typeof opts.wrtc === 'function' ? opts.wrtc() : opts.wrtc
// Use a socket pool, so WebSocket tracker clients share WebSocket objects for
// the same server. In practice, WebSockets are pretty slow to establish, so this
// gives a nice performance boost, and saves browser resources.
self._socketPool = {}
var announce = typeof opts.announce === 'string' var announce = typeof opts.announce === 'string'
? [ opts.announce ] ? [ opts.announce ]
: opts.announce == null ? [] : opts.announce : opts.announce == null ? [] : opts.announce

View File

@ -10,11 +10,6 @@ var Socket = require('simple-websocket')
var common = require('../common') var common = require('../common')
var Tracker = require('./tracker') var Tracker = require('./tracker')
// Use a socket pool, so tracker clients share WebSocket objects for the same server.
// In practice, WebSockets are pretty slow to establish, so this gives a nice performance
// boost, and saves browser resources.
var socketPool = {}
var RECONNECT_MINIMUM = 15 * 1000 var RECONNECT_MINIMUM = 15 * 1000
var RECONNECT_MAXIMUM = 30 * 60 * 1000 var RECONNECT_MAXIMUM = 30 * 60 * 1000
var RECONNECT_VARIANCE = 30 * 1000 var RECONNECT_VARIANCE = 30 * 1000
@ -129,15 +124,15 @@ WebSocketTracker.prototype.destroy = function (cb) {
self._onSocketDataBound = null self._onSocketDataBound = null
self._onSocketCloseBound = null self._onSocketCloseBound = null
if (socketPool[self.announceUrl]) { if (self.client._socketPool[self.announceUrl]) {
socketPool[self.announceUrl].consumers -= 1 self.client._socketPool[self.announceUrl].consumers -= 1
} }
// Other instances are using the socket, so there's nothing left to do here // Other instances are using the socket, so there's nothing left to do here
if (socketPool[self.announceUrl].consumers > 0) return cb() if (self.client._socketPool[self.announceUrl].consumers > 0) return cb()
var socket = socketPool[self.announceUrl] var socket = self.client._socketPool[self.announceUrl]
delete socketPool[self.announceUrl] delete self.client._socketPool[self.announceUrl]
socket.on('error', noop) // ignore all future errors socket.on('error', noop) // ignore all future errors
socket.once('close', cb) socket.once('close', cb)
@ -182,11 +177,11 @@ WebSocketTracker.prototype._openSocket = function () {
self._onSocketClose() self._onSocketClose()
} }
self.socket = socketPool[self.announceUrl] self.socket = self.client._socketPool[self.announceUrl]
if (self.socket) { if (self.socket) {
socketPool[self.announceUrl].consumers += 1 self.client._socketPool[self.announceUrl].consumers += 1
} else { } else {
self.socket = socketPool[self.announceUrl] = new Socket(self.announceUrl) self.socket = self.client._socketPool[self.announceUrl] = new Socket(self.announceUrl)
self.socket.consumers = 1 self.socket.consumers = 1
self.socket.once('connect', self._onSocketConnectBound) self.socket.once('connect', self._onSocketConnectBound)
} }

View File

@ -25,7 +25,7 @@ function parseHttpRequest (req, opts) {
if (!params.port) throw new Error('invalid port') if (!params.port) throw new Error('invalid port')
params.left = Number(params.left) params.left = Number(params.left)
if (isNaN(params.left)) params.left = Infinity if (Number.isNaN(params.left)) params.left = Infinity
params.compact = Number(params.compact) || 0 params.compact = Number(params.compact) || 0
params.numwant = Math.min( params.numwant = Math.min(

View File

@ -28,7 +28,9 @@ function parseWebSocketRequest (socket, opts, params) {
params.to_peer_id = common.binaryToHex(params.to_peer_id) params.to_peer_id = common.binaryToHex(params.to_peer_id)
} }
params.left = Number(params.left) || Infinity params.left = Number(params.left)
if (Number.isNaN(params.left)) params.left = Infinity
params.numwant = Math.min( params.numwant = Math.min(
Number(params.offers && params.offers.length) || 0, // no default - explicit only Number(params.offers && params.offers.length) || 0, // no default - explicit only
common.MAX_ANNOUNCE_PEERS common.MAX_ANNOUNCE_PEERS

View File

@ -7,12 +7,37 @@ var randomIterate = require('random-iterate')
// Regard this as the default implementation of an interface that you // Regard this as the default implementation of an interface that you
// need to support when overriding Server.createSwarm() and Server.getSwarm() // need to support when overriding Server.createSwarm() and Server.getSwarm()
function Swarm (infoHash, server) { function Swarm (infoHash, server) {
this.peers = new LRU({ var self = this
self.infoHash = infoHash
self.complete = 0
self.incomplete = 0
self.peers = new LRU({
max: server.peersCacheLength || 1000, max: server.peersCacheLength || 1000,
maxAge: server.peersCacheTtl || 900000 // 900 000ms = 15 minutes maxAge: server.peersCacheTtl || 20 * 60 * 1000 // 20 minutes
})
// When a peer is evicted from the LRU store, send a synthetic 'stopped' event
// so the stats get updated correctly.
self.peers.on('evict', function (data) {
var peer = data.value
var params = {
type: peer.type,
event: 'stopped',
numwant: 0,
peer_id: peer.peerId
}
self._onAnnounceStopped(params, peer, peer.peerId)
// When a websocket peer is evicted, and it's not in any other swarms, close
// the websocket to conserve server resources.
if (peer.socket && peer.socket.infoHashes.length === 0) {
try {
peer.socket.close()
peer.socket = null
} catch (err) {}
}
}) })
this.complete = 0
this.incomplete = 0
} }
Swarm.prototype.announce = function (params, cb) { Swarm.prototype.announce = function (params, cb) {
@ -73,6 +98,14 @@ Swarm.prototype._onAnnounceStopped = function (params, peer, id) {
if (peer.complete) this.complete -= 1 if (peer.complete) this.complete -= 1
else this.incomplete -= 1 else this.incomplete -= 1
// If it's a websocket, remove this swarm's infohash from the list of active
// swarms that this peer is participating in.
if (peer.socket) {
var index = peer.socket.infoHashes.indexOf(this.infoHash)
peer.socket.infoHashes.splice(index, 1)
}
this.peers.remove(id) this.peers.remove(id)
} }
@ -82,8 +115,8 @@ Swarm.prototype._onAnnounceCompleted = function (params, peer, id) {
return this._onAnnounceStarted(params, peer, id) // treat as a start return this._onAnnounceStarted(params, peer, id) // treat as a start
} }
if (peer.complete) { if (peer.complete) {
debug('unexpected `completed` event from peer that is already marked as completed') debug('unexpected `completed` event from peer that is already completed')
return // do nothing return this._onAnnounceUpdate(params, peer, id) // treat as an update
} }
this.complete += 1 this.complete += 1
@ -102,8 +135,8 @@ Swarm.prototype._onAnnounceUpdate = function (params, peer, id) {
this.complete += 1 this.complete += 1
this.incomplete -= 1 this.incomplete -= 1
peer.complete = true peer.complete = true
this.peers.set(id, peer)
} }
this.peers.set(id, peer)
} }
Swarm.prototype._getPeers = function (numwant, ownPeerId, isWebRTC) { Swarm.prototype._getPeers = function (numwant, ownPeerId, isWebRTC) {

106
server.js
View File

@ -579,6 +579,59 @@ Server.prototype._onWebSocketRequest = function (socket, opts, params) {
}) })
} }
Server.prototype._onWebSocketSend = function (socket, err) {
var self = this
if (err) self._onWebSocketError(socket, err)
}
Server.prototype._onWebSocketClose = function (socket) {
var self = this
debug('websocket close %s', socket.peerId)
if (socket.peerId) {
socket.infoHashes.slice(0).forEach(function (infoHash) {
var swarm = self.torrents[infoHash]
if (swarm) {
swarm.announce({
type: 'ws',
event: 'stopped',
numwant: 0,
peer_id: socket.peerId
}, noop)
}
})
}
// ignore all future errors
socket.onSend = noop
socket.on('error', noop)
socket.peerId = null
socket.infoHashes = null
if (typeof socket.onMessageBound === 'function') {
socket.removeListener('message', socket.onMessageBound)
}
socket.onMessageBound = null
if (typeof socket.onErrorBound === 'function') {
socket.removeListener('error', socket.onErrorBound)
}
socket.onErrorBound = null
if (typeof socket.onCloseBound === 'function') {
socket.removeListener('close', socket.onCloseBound)
}
socket.onCloseBound = null
}
Server.prototype._onWebSocketError = function (socket, err) {
var self = this
debug('websocket error %s', err.message || err)
self.emit('warning', err)
self._onWebSocketClose(socket)
}
Server.prototype._onRequest = function (params, cb) { Server.prototype._onRequest = function (params, cb) {
var self = this var self = this
if (params && params.action === common.ACTIONS.CONNECT) { if (params && params.action === common.ACTIONS.CONNECT) {
@ -761,59 +814,6 @@ function makeUdpPacket (params) {
return packet return packet
} }
Server.prototype._onWebSocketSend = function (socket, err) {
var self = this
if (err) self._onWebSocketError(socket, err)
}
Server.prototype._onWebSocketClose = function (socket) {
var self = this
debug('websocket close %s', socket.peerId)
if (socket.peerId) {
socket.infoHashes.forEach(function (infoHash) {
var swarm = self.torrents[infoHash]
if (swarm) {
swarm.announce({
type: 'ws',
event: 'stopped',
numwant: 0,
peer_id: socket.peerId
}, noop)
}
})
}
// ignore all future errors
socket.onSend = noop
socket.on('error', noop)
socket.peerId = null
socket.infoHashes = null
if (typeof socket.onMessageBound === 'function') {
socket.removeListener('message', socket.onMessageBound)
}
socket.onMessageBound = null
if (typeof socket.onErrorBound === 'function') {
socket.removeListener('error', socket.onErrorBound)
}
socket.onErrorBound = null
if (typeof socket.onCloseBound === 'function') {
socket.removeListener('close', socket.onCloseBound)
}
socket.onCloseBound = null
}
Server.prototype._onWebSocketError = function (socket, err) {
var self = this
debug('websocket error %s', err.message || err)
self.emit('warning', err)
self._onWebSocketClose(socket)
}
function toNumber (x) { function toNumber (x) {
x = Number(x) x = Number(x)
return x >= 0 ? x : false return x >= 0 ? x : false

126
test/evict.js Normal file
View File

@ -0,0 +1,126 @@
var Buffer = require('safe-buffer').Buffer
var Client = require('../')
var common = require('./common')
var test = require('tape')
var electronWebrtc = require('electron-webrtc')
var wrtc
var infoHash = '4cb67059ed6bd08362da625b3ae77f6f4a075705'
var peerId = Buffer.from('01234567890123456789')
var peerId2 = Buffer.from('12345678901234567890')
var peerId3 = Buffer.from('23456789012345678901')
function serverTest (t, serverType, serverFamily) {
t.plan(10)
var hostname = serverFamily === 'inet6'
? '[::1]'
: '127.0.0.1'
var opts = {
serverType: serverType,
peersCacheLength: 2 // LRU cache can only contain a max of 2 peers
}
common.createServer(t, opts, function (server) {
// Not using announceUrl param from `common.createServer()` since we
// want to control IPv4 vs IPv6.
var port = server[serverType].address().port
var announceUrl = serverType + '://' + hostname + ':' + port + '/announce'
var client1 = new Client({
infoHash: infoHash,
announce: [ announceUrl ],
peerId: peerId,
port: 6881,
wrtc: wrtc
})
if (serverType === 'ws') common.mockWebsocketTracker(client1)
client1.start()
client1.once('update', function (data) {
var client2 = new Client({
infoHash: infoHash,
announce: [ announceUrl ],
peerId: peerId2,
port: 6882,
wrtc: wrtc
})
if (serverType === 'ws') common.mockWebsocketTracker(client2)
client2.start()
client2.once('update', function (data) {
server.getSwarm(infoHash, function (err, swarm) {
t.error(err)
t.equal(swarm.complete + swarm.incomplete, 2)
// Ensure that first peer is evicted when a third one is added
var evicted = false
swarm.peers.once('evict', function (evictedPeer) {
t.equal(evictedPeer.value.peerId, peerId.toString('hex'))
t.equal(swarm.complete + swarm.incomplete, 2)
evicted = true
})
var client3 = new Client({
infoHash: infoHash,
announce: [ announceUrl ],
peerId: peerId3,
port: 6880,
wrtc: wrtc
})
if (serverType === 'ws') common.mockWebsocketTracker(client3)
client3.start()
client3.once('update', function (data) {
t.ok(evicted, 'client1 was evicted from server before client3 gets response')
t.equal(swarm.complete + swarm.incomplete, 2)
client1.destroy(function () {
t.pass('client1 destroyed')
})
client2.destroy(function () {
t.pass('client3 destroyed')
})
client3.destroy(function () {
t.pass('client3 destroyed')
})
server.close(function () {
t.pass('server destroyed')
})
})
})
})
})
})
}
test('evict: ipv4 server', function (t) {
serverTest(t, 'http', 'inet')
})
test('evict: http ipv6 server', function (t) {
serverTest(t, 'http', 'inet6')
})
test('evict: udp server', function (t) {
serverTest(t, 'udp', 'inet')
})
test('evict: ws server', function (t) {
wrtc = electronWebrtc()
wrtc.electronDaemon.once('ready', function () {
serverTest(t, 'ws', 'inet')
})
t.once('end', function () {
wrtc.close()
})
})

View File

@ -2,12 +2,9 @@ var Buffer = require('safe-buffer').Buffer
var Client = require('../') var Client = require('../')
var common = require('./common') var common = require('./common')
var test = require('tape') var test = require('tape')
var wrtc = require('electron-webrtc')() var electronWebrtc = require('electron-webrtc')
var wrtcReady = false var wrtc
wrtc.electronDaemon.once('ready', function () {
wrtcReady = true
})
var infoHash = '4cb67059ed6bd08362da625b3ae77f6f4a075705' var infoHash = '4cb67059ed6bd08362da625b3ae77f6f4a075705'
var peerId = Buffer.from('01234567890123456789') var peerId = Buffer.from('01234567890123456789')
@ -15,7 +12,7 @@ var peerId2 = Buffer.from('12345678901234567890')
var peerId3 = Buffer.from('23456789012345678901') var peerId3 = Buffer.from('23456789012345678901')
function serverTest (t, serverType, serverFamily) { function serverTest (t, serverType, serverFamily) {
t.plan(36) t.plan(40)
var hostname = serverFamily === 'inet6' var hostname = serverFamily === 'inet6'
? '[::1]' ? '[::1]'
@ -25,8 +22,7 @@ function serverTest (t, serverType, serverFamily) {
: '127.0.0.1' : '127.0.0.1'
var opts = { var opts = {
serverType: serverType, serverType: serverType
peersCacheLength: 2
} }
common.createServer(t, opts, function (server) { common.createServer(t, opts, function (server) {
@ -142,16 +138,23 @@ function serverTest (t, serverType, serverFamily) {
t.equal(data.incomplete, 1) t.equal(data.incomplete, 1)
client2.destroy(function () { client2.destroy(function () {
t.pass('client2 destroyed')
client3.stop() client3.stop()
client3.once('update', function (data) { client3.once('update', function (data) {
t.equal(data.announce, announceUrl) t.equal(data.announce, announceUrl)
t.equal(data.complete, 1) t.equal(data.complete, 1)
t.equal(data.incomplete, 0) t.equal(data.incomplete, 0)
client1.destroy(function () {
t.pass('client1 destroyed')
})
client3.destroy(function () { client3.destroy(function () {
client1.destroy(function () { t.pass('client3 destroyed')
server.close() })
})
server.close(function () {
t.pass('server destroyed')
}) })
}) })
}) })
@ -178,15 +181,11 @@ test('udp server', function (t) {
}) })
test('ws server', function (t) { test('ws server', function (t) {
if (wrtcReady) { wrtc = electronWebrtc()
runTest() wrtc.electronDaemon.once('ready', function () {
} else {
wrtc.electronDaemon.once('ready', runTest)
}
function runTest () {
t.once('end', function () {
wrtc.close()
})
serverTest(t, 'ws', 'inet') serverTest(t, 'ws', 'inet')
} })
t.once('end', function () {
wrtc.close()
})
}) })