diff --git a/lib/server/swarm.js b/lib/server/swarm.js index 1dad058..d0b3277 100644 --- a/lib/server/swarm.js +++ b/lib/server/swarm.js @@ -1,12 +1,16 @@ module.exports = Swarm var debug = require('debug')('bittorrent-tracker') +var LRU = require('lru') var randomIterate = require('random-iterate') // Regard this as the default implementation of an interface that you // need to support when overriding Server.createSwarm() and Server.getSwarm() function Swarm (infoHash, server) { - this.peers = {} + this.peers = new LRU({ + max: server.peersCacheLength || 1000, + maxAge: server.peersCacheTtl || 900000 // 900 000ms = 15 minutes + }) this.complete = 0 this.incomplete = 0 } @@ -14,16 +18,17 @@ function Swarm (infoHash, server) { Swarm.prototype.announce = function (params, cb) { var self = this var id = params.type === 'ws' ? params.peer_id : params.addr - var peer = self.peers[id] + // Mark the source peer as recently used in cache + var peer = self.peers.get(id) if (params.event === 'started') { - self._onAnnounceStarted(params, peer) + self._onAnnounceStarted(params, peer, id) } else if (params.event === 'stopped') { - self._onAnnounceStopped(params, peer) + self._onAnnounceStopped(params, peer, id) } else if (params.event === 'completed') { - self._onAnnounceCompleted(params, peer) + self._onAnnounceCompleted(params, peer, id) } else if (params.event === 'update') { - self._onAnnounceUpdate(params, peer) + self._onAnnounceUpdate(params, peer, id) } else { cb(new Error('invalid event')) return @@ -42,7 +47,7 @@ Swarm.prototype.scrape = function (params, cb) { }) } -Swarm.prototype._onAnnounceStarted = function (params, peer) { +Swarm.prototype._onAnnounceStarted = function (params, peer, id) { if (peer) { debug('unexpected `started` event from peer that is already in swarm') return this._onAnnounceUpdate(params, peer) // treat as an update @@ -50,18 +55,17 @@ Swarm.prototype._onAnnounceStarted = function (params, peer) { if (params.left === 0) this.complete += 1 else this.incomplete += 1 - var id = params.type === 'ws' ? params.peer_id : params.addr - peer = this.peers[id] = { + peer = this.peers.set(id, { type: params.type, complete: params.left === 0, peerId: params.peer_id, // as hex ip: params.ip, port: params.port, socket: params.socket // only websocket - } + }) } -Swarm.prototype._onAnnounceStopped = function (params, peer) { +Swarm.prototype._onAnnounceStopped = function (params, peer, id) { if (!peer) { debug('unexpected `stopped` event from peer that is not in swarm') return // do nothing @@ -69,11 +73,10 @@ Swarm.prototype._onAnnounceStopped = function (params, peer) { if (peer.complete) this.complete -= 1 else this.incomplete -= 1 - var id = params.type === 'ws' ? params.peer_id : params.addr - delete this.peers[id] + this.peers.remove(id) } -Swarm.prototype._onAnnounceCompleted = function (params, peer) { +Swarm.prototype._onAnnounceCompleted = function (params, peer, id) { if (!peer) { debug('unexpected `completed` event from peer that is not in swarm') return this._onAnnounceStarted(params, peer) // treat as a start @@ -86,9 +89,10 @@ Swarm.prototype._onAnnounceCompleted = function (params, peer) { this.complete += 1 this.incomplete -= 1 peer.complete = true + this.peers.set(id, peer) } -Swarm.prototype._onAnnounceUpdate = function (params, peer) { +Swarm.prototype._onAnnounceUpdate = function (params, peer, id) { if (!peer) { debug('unexpected `update` event from peer that is not in swarm') return this._onAnnounceStarted(params, peer) // treat as a start @@ -98,15 +102,18 @@ Swarm.prototype._onAnnounceUpdate = function (params, peer) { this.complete += 1 this.incomplete -= 1 peer.complete = true + this.peers.set(id, peer) } } Swarm.prototype._getPeers = function (numwant, ownPeerId, isWebRTC) { var peers = [] - var ite = randomIterate(Object.keys(this.peers)) + var ite = randomIterate(this.peers.keys) var peerId while ((peerId = ite()) && peers.length < numwant) { - var peer = this.peers[peerId] + // Don't mark the peer as most recently used on announce + var peer = this.peers.peek(peerId) + if (!peer) continue if (isWebRTC && peer.peerId === ownPeerId) continue // don't send peer to itself if ((isWebRTC && peer.type !== 'ws') || (!isWebRTC && peer.type === 'ws')) continue // send proper peer type peers.push(peer) diff --git a/package.json b/package.json index ffcc060..662c3a1 100644 --- a/package.json +++ b/package.json @@ -28,6 +28,7 @@ "hat": "0.0.3", "inherits": "^2.0.1", "ip": "^1.0.1", + "lru": "^3.0.0", "minimist": "^1.1.1", "once": "^1.3.0", "random-iterate": "^1.0.1", diff --git a/server.js b/server.js index df52c39..28f33e6 100644 --- a/server.js +++ b/server.js @@ -51,6 +51,9 @@ function Server (opts) { self._trustProxy = !!opts.trustProxy if (typeof opts.filter === 'function') self._filter = opts.filter + self.peersCacheLength = opts.peersCacheLength + self.peersCacheTtl = opts.peersCacheTtl + self._listenCalled = false self.listening = false self.destroyed = false @@ -191,7 +194,7 @@ function Server (opts) { if (req.method === 'GET' && (req.url === '/stats' || req.url === '/stats.json')) { infoHashes.forEach(function (infoHash) { var peers = self.torrents[infoHash].peers - var keys = Object.keys(peers) + var keys = peers.keys if (keys.length > 0) activeTorrents++ keys.forEach(function (peerId) { @@ -203,7 +206,8 @@ function Server (opts) { leecher: false } } - var peer = peers[peerId] + // Don't mark the peer as most recently used for stats + var peer = peers.peek(peerId) if (peer.ip.indexOf(':') >= 0) { allPeers[peerId].ipv6 = true } else { @@ -533,7 +537,8 @@ Server.prototype._onWebSocketRequest = function (socket, opts, params) { if (!swarm) { return self.emit('warning', new Error('no swarm with that `info_hash`')) } - var toPeer = swarm.peers[params.to_peer_id] + // Mark the destination peer as recently used in cache + var toPeer = swarm.peers.get(params.to_peer_id) if (!toPeer) { return self.emit('warning', new Error('no peer with that `to_peer_id`')) } diff --git a/test/server.js b/test/server.js index 526cab3..b3a7b16 100644 --- a/test/server.js +++ b/test/server.js @@ -12,9 +12,10 @@ var test = require('tape') var infoHash = '4cb67059ed6bd08362da625b3ae77f6f4a075705' var peerId = Buffer.from('01234567890123456789') var peerId2 = Buffer.from('12345678901234567890') +var peerId3 = Buffer.from('23456789012345678901') function serverTest (t, serverType, serverFamily) { - t.plan(30) + t.plan(32) var hostname = serverFamily === 'inet6' ? '[::1]' @@ -23,7 +24,12 @@ function serverTest (t, serverType, serverFamily) { ? '::1' : '127.0.0.1' - common.createServer(t, serverType, function (server) { + var opts = { + serverType: serverType, + peersCacheLength: 2 + } + + common.createServer(t, opts, function (server) { var port = server[serverType].address().port var announceUrl = serverType + '://' + hostname + ':' + port + '/announce' @@ -52,22 +58,23 @@ function serverTest (t, serverType, serverFamily) { t.equal(Object.keys(server.torrents).length, 1) t.equal(swarm.complete, 0) t.equal(swarm.incomplete, 1) - t.equal(Object.keys(swarm.peers).length, 1) + t.equal(swarm.peers.length, 1) var id = serverType === 'ws' ? peerId.toString('hex') : hostname + ':6881' - t.equal(swarm.peers[id].type, serverType) - t.equal(swarm.peers[id].ip, clientIp) - t.equal(swarm.peers[id].peerId, peerId.toString('hex')) - t.equal(swarm.peers[id].complete, false) + var peer = swarm.peers.peek(id) + t.equal(peer.type, serverType) + t.equal(peer.ip, clientIp) + t.equal(peer.peerId, peerId.toString('hex')) + t.equal(peer.complete, false) if (serverType === 'ws') { - t.equal(typeof swarm.peers[id].port, 'number') - t.ok(swarm.peers[id].socket) + t.equal(typeof peer.port, 'number') + t.ok(peer.socket) } else { - t.equal(swarm.peers[id].port, 6881) - t.notOk(swarm.peers[id].socket) + t.equal(peer.port, 6881) + t.notOk(peer.socket) } client1.complete() @@ -102,22 +109,42 @@ function serverTest (t, serverType, serverFamily) { client2.once('peer', function (addr) { t.ok(addr === hostname + ':6881' || addr === hostname + ':6882' || addr.id === peerId.toString('hex')) - client2.stop() - client2.once('update', function (data) { - t.equal(data.announce, announceUrl) - t.equal(data.complete, 1) - t.equal(data.incomplete, 0) - client2.destroy() + swarm.peers.once('evict', function (evicted) { + t.equals(evicted.value.peerId, peerId.toString('hex')) + }) + var client3 = new Client({ + infoHash: infoHash, + announce: [ announceUrl ], + peerId: peerId3, + port: 6880 + // wrtc: wrtc + }) + client3.start() - client1.stop() - client1.once('update', function (data) { + server.once('start', function () { + t.pass('got start message from client3') + }) + + client3.once('update', function () { + client2.stop() + client2.once('update', function (data) { t.equal(data.announce, announceUrl) - t.equal(data.complete, 0) - t.equal(data.incomplete, 0) + t.equal(data.complete, 1) + t.equal(data.incomplete, 1) + client2.destroy() - client1.destroy(function () { - server.close() - // if (serverType === 'ws') wrtc.close() + client3.stop() + client3.once('update', function (data) { + t.equal(data.announce, announceUrl) + t.equal(data.complete, 1) + t.equal(data.incomplete, 0) + + client3.destroy(function () { + client1.destroy(function () { + server.close() + }) + // if (serverType === 'ws') wrtc.close() + }) }) }) })