Merge pull request #160 from yciabaud/lru-pruning

Limit peers in tracker server with LRU based cache fixes #4
This commit is contained in:
Yoann Ciabaud 2016-07-19 19:50:25 +02:00 committed by GitHub
commit f76b27c12a
4 changed files with 84 additions and 44 deletions

View File

@ -1,12 +1,16 @@
module.exports = Swarm module.exports = Swarm
var debug = require('debug')('bittorrent-tracker') var debug = require('debug')('bittorrent-tracker')
var LRU = require('lru')
var randomIterate = require('random-iterate') var randomIterate = require('random-iterate')
// Regard this as the default implementation of an interface that you // Regard this as the default implementation of an interface that you
// need to support when overriding Server.createSwarm() and Server.getSwarm() // need to support when overriding Server.createSwarm() and Server.getSwarm()
function Swarm (infoHash, server) { function Swarm (infoHash, server) {
this.peers = {} this.peers = new LRU({
max: server.peersCacheLength || 1000,
maxAge: server.peersCacheTtl || 900000 // 900 000ms = 15 minutes
})
this.complete = 0 this.complete = 0
this.incomplete = 0 this.incomplete = 0
} }
@ -14,16 +18,17 @@ function Swarm (infoHash, server) {
Swarm.prototype.announce = function (params, cb) { Swarm.prototype.announce = function (params, cb) {
var self = this var self = this
var id = params.type === 'ws' ? params.peer_id : params.addr var id = params.type === 'ws' ? params.peer_id : params.addr
var peer = self.peers[id] // Mark the source peer as recently used in cache
var peer = self.peers.get(id)
if (params.event === 'started') { if (params.event === 'started') {
self._onAnnounceStarted(params, peer) self._onAnnounceStarted(params, peer, id)
} else if (params.event === 'stopped') { } else if (params.event === 'stopped') {
self._onAnnounceStopped(params, peer) self._onAnnounceStopped(params, peer, id)
} else if (params.event === 'completed') { } else if (params.event === 'completed') {
self._onAnnounceCompleted(params, peer) self._onAnnounceCompleted(params, peer, id)
} else if (params.event === 'update') { } else if (params.event === 'update') {
self._onAnnounceUpdate(params, peer) self._onAnnounceUpdate(params, peer, id)
} else { } else {
cb(new Error('invalid event')) cb(new Error('invalid event'))
return return
@ -42,7 +47,7 @@ Swarm.prototype.scrape = function (params, cb) {
}) })
} }
Swarm.prototype._onAnnounceStarted = function (params, peer) { Swarm.prototype._onAnnounceStarted = function (params, peer, id) {
if (peer) { if (peer) {
debug('unexpected `started` event from peer that is already in swarm') debug('unexpected `started` event from peer that is already in swarm')
return this._onAnnounceUpdate(params, peer) // treat as an update return this._onAnnounceUpdate(params, peer) // treat as an update
@ -50,18 +55,17 @@ Swarm.prototype._onAnnounceStarted = function (params, peer) {
if (params.left === 0) this.complete += 1 if (params.left === 0) this.complete += 1
else this.incomplete += 1 else this.incomplete += 1
var id = params.type === 'ws' ? params.peer_id : params.addr peer = this.peers.set(id, {
peer = this.peers[id] = {
type: params.type, type: params.type,
complete: params.left === 0, complete: params.left === 0,
peerId: params.peer_id, // as hex peerId: params.peer_id, // as hex
ip: params.ip, ip: params.ip,
port: params.port, port: params.port,
socket: params.socket // only websocket socket: params.socket // only websocket
} })
} }
Swarm.prototype._onAnnounceStopped = function (params, peer) { Swarm.prototype._onAnnounceStopped = function (params, peer, id) {
if (!peer) { if (!peer) {
debug('unexpected `stopped` event from peer that is not in swarm') debug('unexpected `stopped` event from peer that is not in swarm')
return // do nothing return // do nothing
@ -69,11 +73,10 @@ Swarm.prototype._onAnnounceStopped = function (params, peer) {
if (peer.complete) this.complete -= 1 if (peer.complete) this.complete -= 1
else this.incomplete -= 1 else this.incomplete -= 1
var id = params.type === 'ws' ? params.peer_id : params.addr this.peers.remove(id)
delete this.peers[id]
} }
Swarm.prototype._onAnnounceCompleted = function (params, peer) { Swarm.prototype._onAnnounceCompleted = function (params, peer, id) {
if (!peer) { if (!peer) {
debug('unexpected `completed` event from peer that is not in swarm') debug('unexpected `completed` event from peer that is not in swarm')
return this._onAnnounceStarted(params, peer) // treat as a start return this._onAnnounceStarted(params, peer) // treat as a start
@ -86,9 +89,10 @@ Swarm.prototype._onAnnounceCompleted = function (params, peer) {
this.complete += 1 this.complete += 1
this.incomplete -= 1 this.incomplete -= 1
peer.complete = true peer.complete = true
this.peers.set(id, peer)
} }
Swarm.prototype._onAnnounceUpdate = function (params, peer) { Swarm.prototype._onAnnounceUpdate = function (params, peer, id) {
if (!peer) { if (!peer) {
debug('unexpected `update` event from peer that is not in swarm') debug('unexpected `update` event from peer that is not in swarm')
return this._onAnnounceStarted(params, peer) // treat as a start return this._onAnnounceStarted(params, peer) // treat as a start
@ -98,15 +102,18 @@ Swarm.prototype._onAnnounceUpdate = function (params, peer) {
this.complete += 1 this.complete += 1
this.incomplete -= 1 this.incomplete -= 1
peer.complete = true peer.complete = true
this.peers.set(id, peer)
} }
} }
Swarm.prototype._getPeers = function (numwant, ownPeerId, isWebRTC) { Swarm.prototype._getPeers = function (numwant, ownPeerId, isWebRTC) {
var peers = [] var peers = []
var ite = randomIterate(Object.keys(this.peers)) var ite = randomIterate(this.peers.keys)
var peerId var peerId
while ((peerId = ite()) && peers.length < numwant) { while ((peerId = ite()) && peers.length < numwant) {
var peer = this.peers[peerId] // Don't mark the peer as most recently used on announce
var peer = this.peers.peek(peerId)
if (!peer) continue
if (isWebRTC && peer.peerId === ownPeerId) continue // don't send peer to itself if (isWebRTC && peer.peerId === ownPeerId) continue // don't send peer to itself
if ((isWebRTC && peer.type !== 'ws') || (!isWebRTC && peer.type === 'ws')) continue // send proper peer type if ((isWebRTC && peer.type !== 'ws') || (!isWebRTC && peer.type === 'ws')) continue // send proper peer type
peers.push(peer) peers.push(peer)

View File

@ -28,6 +28,7 @@
"hat": "0.0.3", "hat": "0.0.3",
"inherits": "^2.0.1", "inherits": "^2.0.1",
"ip": "^1.0.1", "ip": "^1.0.1",
"lru": "^3.0.0",
"minimist": "^1.1.1", "minimist": "^1.1.1",
"once": "^1.3.0", "once": "^1.3.0",
"random-iterate": "^1.0.1", "random-iterate": "^1.0.1",

View File

@ -51,6 +51,9 @@ function Server (opts) {
self._trustProxy = !!opts.trustProxy self._trustProxy = !!opts.trustProxy
if (typeof opts.filter === 'function') self._filter = opts.filter if (typeof opts.filter === 'function') self._filter = opts.filter
self.peersCacheLength = opts.peersCacheLength
self.peersCacheTtl = opts.peersCacheTtl
self._listenCalled = false self._listenCalled = false
self.listening = false self.listening = false
self.destroyed = false self.destroyed = false
@ -191,7 +194,7 @@ function Server (opts) {
if (req.method === 'GET' && (req.url === '/stats' || req.url === '/stats.json')) { if (req.method === 'GET' && (req.url === '/stats' || req.url === '/stats.json')) {
infoHashes.forEach(function (infoHash) { infoHashes.forEach(function (infoHash) {
var peers = self.torrents[infoHash].peers var peers = self.torrents[infoHash].peers
var keys = Object.keys(peers) var keys = peers.keys
if (keys.length > 0) activeTorrents++ if (keys.length > 0) activeTorrents++
keys.forEach(function (peerId) { keys.forEach(function (peerId) {
@ -203,7 +206,8 @@ function Server (opts) {
leecher: false leecher: false
} }
} }
var peer = peers[peerId] // Don't mark the peer as most recently used for stats
var peer = peers.peek(peerId)
if (peer.ip.indexOf(':') >= 0) { if (peer.ip.indexOf(':') >= 0) {
allPeers[peerId].ipv6 = true allPeers[peerId].ipv6 = true
} else { } else {
@ -533,7 +537,8 @@ Server.prototype._onWebSocketRequest = function (socket, opts, params) {
if (!swarm) { if (!swarm) {
return self.emit('warning', new Error('no swarm with that `info_hash`')) return self.emit('warning', new Error('no swarm with that `info_hash`'))
} }
var toPeer = swarm.peers[params.to_peer_id] // Mark the destination peer as recently used in cache
var toPeer = swarm.peers.get(params.to_peer_id)
if (!toPeer) { if (!toPeer) {
return self.emit('warning', new Error('no peer with that `to_peer_id`')) return self.emit('warning', new Error('no peer with that `to_peer_id`'))
} }

View File

@ -12,9 +12,10 @@ var test = require('tape')
var infoHash = '4cb67059ed6bd08362da625b3ae77f6f4a075705' var infoHash = '4cb67059ed6bd08362da625b3ae77f6f4a075705'
var peerId = Buffer.from('01234567890123456789') var peerId = Buffer.from('01234567890123456789')
var peerId2 = Buffer.from('12345678901234567890') var peerId2 = Buffer.from('12345678901234567890')
var peerId3 = Buffer.from('23456789012345678901')
function serverTest (t, serverType, serverFamily) { function serverTest (t, serverType, serverFamily) {
t.plan(30) t.plan(32)
var hostname = serverFamily === 'inet6' var hostname = serverFamily === 'inet6'
? '[::1]' ? '[::1]'
@ -23,7 +24,12 @@ function serverTest (t, serverType, serverFamily) {
? '::1' ? '::1'
: '127.0.0.1' : '127.0.0.1'
common.createServer(t, serverType, function (server) { var opts = {
serverType: serverType,
peersCacheLength: 2
}
common.createServer(t, opts, function (server) {
var port = server[serverType].address().port var port = server[serverType].address().port
var announceUrl = serverType + '://' + hostname + ':' + port + '/announce' var announceUrl = serverType + '://' + hostname + ':' + port + '/announce'
@ -52,22 +58,23 @@ function serverTest (t, serverType, serverFamily) {
t.equal(Object.keys(server.torrents).length, 1) t.equal(Object.keys(server.torrents).length, 1)
t.equal(swarm.complete, 0) t.equal(swarm.complete, 0)
t.equal(swarm.incomplete, 1) t.equal(swarm.incomplete, 1)
t.equal(Object.keys(swarm.peers).length, 1) t.equal(swarm.peers.length, 1)
var id = serverType === 'ws' var id = serverType === 'ws'
? peerId.toString('hex') ? peerId.toString('hex')
: hostname + ':6881' : hostname + ':6881'
t.equal(swarm.peers[id].type, serverType) var peer = swarm.peers.peek(id)
t.equal(swarm.peers[id].ip, clientIp) t.equal(peer.type, serverType)
t.equal(swarm.peers[id].peerId, peerId.toString('hex')) t.equal(peer.ip, clientIp)
t.equal(swarm.peers[id].complete, false) t.equal(peer.peerId, peerId.toString('hex'))
t.equal(peer.complete, false)
if (serverType === 'ws') { if (serverType === 'ws') {
t.equal(typeof swarm.peers[id].port, 'number') t.equal(typeof peer.port, 'number')
t.ok(swarm.peers[id].socket) t.ok(peer.socket)
} else { } else {
t.equal(swarm.peers[id].port, 6881) t.equal(peer.port, 6881)
t.notOk(swarm.peers[id].socket) t.notOk(peer.socket)
} }
client1.complete() client1.complete()
@ -102,22 +109,42 @@ function serverTest (t, serverType, serverFamily) {
client2.once('peer', function (addr) { client2.once('peer', function (addr) {
t.ok(addr === hostname + ':6881' || addr === hostname + ':6882' || addr.id === peerId.toString('hex')) t.ok(addr === hostname + ':6881' || addr === hostname + ':6882' || addr.id === peerId.toString('hex'))
client2.stop() swarm.peers.once('evict', function (evicted) {
client2.once('update', function (data) { t.equals(evicted.value.peerId, peerId.toString('hex'))
t.equal(data.announce, announceUrl) })
t.equal(data.complete, 1) var client3 = new Client({
t.equal(data.incomplete, 0) infoHash: infoHash,
client2.destroy() announce: [ announceUrl ],
peerId: peerId3,
port: 6880
// wrtc: wrtc
})
client3.start()
client1.stop() server.once('start', function () {
client1.once('update', function (data) { t.pass('got start message from client3')
})
client3.once('update', function () {
client2.stop()
client2.once('update', function (data) {
t.equal(data.announce, announceUrl) t.equal(data.announce, announceUrl)
t.equal(data.complete, 0) t.equal(data.complete, 1)
t.equal(data.incomplete, 0) t.equal(data.incomplete, 1)
client2.destroy()
client1.destroy(function () { client3.stop()
server.close() client3.once('update', function (data) {
// if (serverType === 'ws') wrtc.close() t.equal(data.announce, announceUrl)
t.equal(data.complete, 1)
t.equal(data.incomplete, 0)
client3.destroy(function () {
client1.destroy(function () {
server.close()
})
// if (serverType === 'ws') wrtc.close()
})
}) })
}) })
}) })