BREAKING CHANGES

Breaking changes:
- 'listening' event no longer emits with `port` param
- `server.port` property removed (instead, use
`server.http.address().port`)

Added features:
- expose http server as `server.http`
- expose udp server as `server.udp`
- client.destroy() - ungracefully leave the swarm
- server: added `filter` option to black/whitelist torrents

Bugfixes:
- client considers udp tracker errors to be warnings
- emit 'start', 'stop', 'update', etc. AFTER response sent
- fix udp error response action and message being `undefined`

Internal:
- remove `portfinder` dep
- add complete test for `filter` functionality
This commit is contained in:
Feross Aboukhadijeh 2015-01-29 14:59:08 -08:00
parent 098ec7041c
commit 82e6792a6b
12 changed files with 231 additions and 199 deletions

View File

@ -87,6 +87,9 @@ client.update()
// stop getting peers from the tracker, gracefully leave the swarm
client.stop()
// ungracefully leave the swarm (without sending final 'stop' message)
client.destroy()
// scrape
client.scrape()
@ -108,13 +111,17 @@ var Server = require('bittorrent-tracker').Server
var server = new Server({
udp: true, // enable udp server? [default=true]
http: true, // enable http server? [default=true]
filter: function (hash) {
// specify white/blacklist for disallowing/allowing torrents
return hash !== 'aaa67059ed6bd08362da625b3ae77f6f4a075aaa'
filter: function (infoHash) {
// black/whitelist for disallowing/allowing torrents [default=allow all]
// this example only allows this one torrent
return infoHash === 'aaa67059ed6bd08362da625b3ae77f6f4a075aaa'
})
})
// Internal http and udp servers exposed as public properties.
server.http
server.udp
server.on('error', function (err) {
// fatal server error!
console.log(err.message)
@ -125,11 +132,13 @@ server.on('warning', function (err) {
console.log(err.message)
})
server.on('listening', function (port) {
console.log('tracker server is now listening on ' + port)
server.on('listening', function () {
// fired when all requested servers are listening
console.log('listening on http port:' + server.http.address().port)
console.log('listening on udp port:' + server.udp.address().port)
})
// start tracker server listening!
// start tracker server listening! Use 0 to listen on a random free port.
server.listen(port)
// listen for individual tracker messages from peers:

View File

@ -128,6 +128,13 @@ Client.prototype.setInterval = function (intervalMs) {
})
}
Client.prototype.destroy = function () {
var self = this
self._trackers.forEach(function (tracker) {
tracker.destroy()
})
}
inherits(Tracker, EventEmitter)
/**
@ -175,7 +182,7 @@ Tracker.prototype.stop = function (opts) {
debug('sent `stop` %s', self._announceUrl)
self._announce(opts)
self.setInterval(0) // stop announcing on intervals
self.destroy()
}
Tracker.prototype.complete = function (opts) {
@ -196,6 +203,12 @@ Tracker.prototype.update = function (opts) {
self._announce(opts)
}
Tracker.prototype.destroy = function () {
var self = this
debug('destroy', self._announceUrl)
self.setInterval(0) // stop announcing on intervals
}
/**
* Send an announce request to the tracker.
* @param {Object} opts
@ -372,7 +385,7 @@ Tracker.prototype._requestUdp = function (requestUrl, opts) {
if (msg.length < 8) {
return error('invalid error message')
}
self.client.emit('error', new Error(msg.slice(8).toString()))
self.client.emit('warning', new Error(msg.slice(8).toString()))
break
default:

View File

@ -19,6 +19,12 @@ exports.EVENT_IDS = {
2: 'started',
3: 'stopped'
}
exports.EVENT_NAMES = {
update: 'update',
completed: 'complete',
started: 'start',
stopped: 'stop'
}
function toUInt32 (n) {
var buf = new Buffer(4)

View File

@ -8,7 +8,6 @@ function Swarm (infoHash, server) {
this.peers = {}
this.complete = 0
this.incomplete = 0
this.emit = server.emit.bind(server)
}
Swarm.prototype.announce = function (params, cb) {
@ -16,7 +15,6 @@ Swarm.prototype.announce = function (params, cb) {
var peer = self.peers[params.addr]
// Dispatch announce event
if (!params.event || params.event === 'empty') params.event = 'update'
var fn = '_onAnnounce_' + params.event
if (self[fn]) {
self[fn](params, peer) // process event
@ -46,7 +44,6 @@ Swarm.prototype._onAnnounce_started = function (params, peer) {
port: params.port,
peerId: params.peer_id
}
this.emit('start', params.addr)
}
Swarm.prototype._onAnnounce_stopped = function (params, peer) {
@ -58,7 +55,6 @@ Swarm.prototype._onAnnounce_stopped = function (params, peer) {
if (peer.complete) this.complete -= 1
else this.incomplete -= 1
this.peers[params.addr] = null
this.emit('stop', params.addr)
}
Swarm.prototype._onAnnounce_completed = function (params, peer) {
@ -74,7 +70,6 @@ Swarm.prototype._onAnnounce_completed = function (params, peer) {
this.complete += 1
this.incomplete -= 1
peer.complete = true
this.emit('complete', params.addr)
}
Swarm.prototype._onAnnounce_update = function (params, peer) {
@ -82,7 +77,6 @@ Swarm.prototype._onAnnounce_update = function (params, peer) {
debug('unexpected `update` event from peer that is not in swarm')
return this._onAnnounce_started(params, peer) // treat as a start
}
this.emit('update', params.addr)
}
Swarm.prototype._getPeers = function (numwant) {

View File

@ -24,7 +24,6 @@
"inherits": "^2.0.1",
"ip": "^0.3.0",
"once": "^1.3.0",
"portfinder": "^0.3.0",
"run-series": "^1.0.2",
"simple-get": "^1.3.0",
"string2compact": "^1.1.1"

View File

@ -6,7 +6,6 @@ var dgram = require('dgram')
var EventEmitter = require('events').EventEmitter
var http = require('http')
var inherits = require('inherits')
var portfinder = require('portfinder')
var series = require('run-series')
var string2compact = require('string2compact')
@ -15,9 +14,6 @@ var Swarm = require('./lib/swarm')
var parseHttpRequest = require('./lib/parse_http')
var parseUdpRequest = require('./lib/parse_udp')
// Use random port above 1024
portfinder.basePort = Math.floor(Math.random() * 60000) + 1025
inherits(Server, EventEmitter)
/**
@ -40,40 +36,41 @@ function Server (opts) {
EventEmitter.call(self)
opts = opts || {}
if (opts.http === false && opts.udp === false)
throw new Error('must start at least one type of server (http or udp)')
self._intervalMs = opts.interval
? opts.interval
: 10 * 60 * 1000 // 10 min
self._trustProxy = !!opts.trustProxy
if (typeof opts.filter === 'function') self._filter = opts.filter
self.listening = false
self.port = null
self.torrents = {}
// default to starting an http server unless the user explictly says no
if (opts.http !== false) {
self._httpServer = http.createServer()
self._httpServer.on('request', self.onHttpRequest.bind(self))
self._httpServer.on('error', self._onError.bind(self))
self._httpServer.on('listening', onListening)
self.http = http.createServer()
self.http.on('request', self.onHttpRequest.bind(self))
self.http.on('error', self._onError.bind(self))
self.http.on('listening', onListening)
}
// default to starting a udp server unless the user explicitly says no
if (opts.udp !== false) {
self._udpSocket = dgram.createSocket('udp4')
self._udpSocket.on('message', self.onUdpRequest.bind(self))
self._udpSocket.on('error', self._onError.bind(self))
self._udpSocket.on('listening', onListening)
self.udp = dgram.createSocket('udp4')
self.udp.on('message', self.onUdpRequest.bind(self))
self.udp.on('error', self._onError.bind(self))
self.udp.on('listening', onListening)
}
if (typeof opts.filter === 'function') self._filter = opts.filter
var num = !!self._httpServer + !!self._udpSocket
var num = !!self.http + !!self.udp
function onListening () {
num -= 1
if (num === 0) {
self.listening = true
self.emit('listening', self.port)
self.emit('listening')
}
}
}
@ -92,28 +89,24 @@ Server.prototype.listen = function (port, onlistening) {
if (self.listening) throw new Error('server already listening')
if (onlistening) self.once('listening', onlistening)
function onPort (err, port) {
if (err) return self.emit('error', err)
self.port = port
// ATTENTION:
// binding to :: only receives IPv4 connections if the bindv6only
// sysctl is set 0, which is the default on many operating systems.
self._httpServer && self._httpServer.listen(port.http || port, '::')
self._udpSocket && self._udpSocket.bind(port.udp || port)
}
if (!port) port = 0
if (port) onPort(null, port)
else portfinder.getPort(onPort)
// ATTENTION:
// binding to :: only receives IPv4 connections if the bindv6only
// sysctl is set 0, which is the default on many operating systems.
self.http && self.http.listen(port.http || port, '::')
self.udp && self.udp.bind(port.udp || port)
}
Server.prototype.close = function (cb) {
var self = this
self.listening = false
cb = cb || function () {}
if (self._udpSocket) {
self._udpSocket.close()
if (self.udp) {
self.udp.close()
}
if (self._httpServer) {
self._httpServer.close(cb)
if (self.http) {
self.http.close(cb)
} else {
cb(null)
}
@ -122,7 +115,7 @@ Server.prototype.close = function (cb) {
Server.prototype.getSwarm = function (infoHash) {
var self = this
if (Buffer.isBuffer(infoHash)) infoHash = infoHash.toString('hex')
if (self._filter && self._filter(infoHash)) return null
if (self._filter && !self._filter(infoHash)) return null
var swarm = self.torrents[infoHash]
if (!swarm) swarm = self.torrents[infoHash] = new Swarm(infoHash, this)
return swarm
@ -157,6 +150,10 @@ Server.prototype.onHttpRequest = function (req, res) {
delete response.action // only needed for UDP encoding
res.end(bencode.encode(response))
if (params.action === common.ACTIONS.ANNOUNCE) {
self.emit(common.EVENT_NAMES[params.event], params.addr)
}
})
}
@ -177,15 +174,21 @@ Server.prototype.onUdpRequest = function (msg, rinfo) {
if (err) {
self.emit('warning', err)
response = {
action: common.ACTIONS.ERRROR,
action: common.ACTIONS.ERROR,
'failure reason': err.message
}
}
if (!self.listening) return
response.transactionId = params.transactionId
response.connectionId = params.connectionId
var buf = makeUdpPacket(response)
self._udpSocket.send(buf, 0, buf.length, rinfo.port, rinfo.address)
self.udp.send(buf, 0, buf.length, rinfo.port, rinfo.address)
if (params.action === common.ACTIONS.ANNOUNCE) {
self.emit(common.EVENT_NAMES[params.event], params.addr)
}
})
}
@ -204,7 +207,8 @@ Server.prototype._onRequest = function (params, cb) {
Server.prototype._onAnnounce = function (params, cb) {
var self = this
var swarm = self.getSwarm(params.info_hash)
if (swarm === null) return cb(new Error('invalid hash'))
if (swarm === null) return cb(new Error('disallowed info_hash'))
if (!params.event || params.event === 'empty') params.event = 'update'
swarm.announce(params, function (err, response) {
if (response) {
if (!response.action) response.action = common.ACTIONS.ANNOUNCE
@ -311,7 +315,7 @@ function makeUdpPacket (params) {
packet = Buffer.concat([
common.toUInt32(common.ACTIONS.ERROR),
common.toUInt32(params.transactionId || 0),
new Buffer(params.message, 'utf8')
new Buffer(params['failure reason'], 'utf8')
])
break
default:

View File

@ -1,7 +1,6 @@
var Client = require('../')
var fs = require('fs')
var parseTorrent = require('parse-torrent')
var portfinder = require('portfinder')
var Server = require('../').Server
var test = require('tape')
@ -10,7 +9,7 @@ var parsedTorrent = parseTorrent(torrent)
var peerId = new Buffer('01234567890123456789')
test('large torrent: client.start()', function (t) {
t.plan(6)
t.plan(5)
var server = new Server({ http: false })
@ -22,9 +21,8 @@ test('large torrent: client.start()', function (t) {
t.fail(err.message)
})
portfinder.getPort(function (err, port) {
t.error(err, 'found free port')
server.listen(port)
server.listen(0, function () {
var port = server.udp.address().port
// remove all tracker servers except a single UDP one, for now
parsedTorrent.announce = [ 'udp://127.0.0.1:' + port ]

View File

@ -1,6 +1,5 @@
var Client = require('../')
var magnet = require('magnet-uri')
var portfinder = require('portfinder')
var Server = require('../').Server
var test = require('tape')
@ -9,7 +8,7 @@ var parsedTorrent = magnet(uri)
var peerId = new Buffer('01234567890123456789')
test('magnet + udp: client.start/update/stop()', function (t) {
t.plan(12)
t.plan(11)
var server = new Server({ http: false })
@ -21,9 +20,8 @@ test('magnet + udp: client.start/update/stop()', function (t) {
t.fail(err.message)
})
portfinder.getPort(function (err, port) {
t.error(err, 'found free port')
server.listen(port)
server.listen(0, function () {
var port = server.udp.address().port
var announceUrl = 'udp://127.0.0.1:' + port
// remove all tracker servers except a single UDP one, for now

View File

@ -1,4 +1,3 @@
var portfinder = require('portfinder')
var Server = require('../').Server
exports.createServer = function (t, serverType, cb) {
@ -13,14 +12,12 @@ exports.createServer = function (t, serverType, cb) {
t.error(err)
})
portfinder.getPort(function (err, port) {
if (err) return t.error(err)
server.listen(0, function () {
var port = server[serverType].address().port
var announceUrl = serverType === 'http'
? 'http://127.0.0.1:' + port + '/announce'
: 'udp://127.0.0.1:' + port
server.listen(port)
cb(server, announceUrl)
})
}

85
test/filter.js Normal file
View File

@ -0,0 +1,85 @@
var Client = require('../')
var fs = require('fs')
var parseTorrent = require('parse-torrent')
var Server = require('../').Server
var test = require('tape')
var bitlove = fs.readFileSync(__dirname + '/torrents/bitlove-intro.torrent')
var parsedBitlove = parseTorrent(bitlove)
var leaves = fs.readFileSync(__dirname + '/torrents/leaves.torrent')
var parsedLeaves = parseTorrent(leaves)
var peerId = new Buffer('01234567890123456789')
function testFilterOption (t, serverType) {
t.plan(6)
var opts = serverType === 'http' ? { udp: false } : { http: false }
opts.filter = function (infoHash) {
return infoHash !== parsedBitlove.infoHash
}
var server = new Server(opts)
server.on('error', function (err) {
t.error(err)
})
server.listen(0, function () {
var port = server[serverType].address().port
var announceUrl = serverType === 'http'
? 'http://127.0.0.1:' + port + '/announce'
: 'udp://127.0.0.1:' + port
parsedBitlove.announce = [ announceUrl ]
parsedLeaves.announce = [ announceUrl ]
var client = new Client(peerId, port, parsedBitlove)
client.on('error', function (err) {
t.error(err)
})
client.once('warning', function (err) {
t.ok(/disallowed info_hash/.test(err.message), 'got client warning')
client.destroy()
client = new Client(peerId, port, parsedLeaves)
client.on('error', function (err) {
t.error(err)
})
client.on('warning', function (err) {
t.error(err)
})
client.on('update', function () {
t.pass('got announce')
client.destroy()
server.close(function () {
t.pass('server closed')
})
})
server.on('start', function () {
t.equal(Object.keys(server.torrents).length, 1)
})
client.start()
})
server.once('warning', function (err) {
t.ok(/disallowed info_hash/.test(err.message), 'got server warning')
t.equal(Object.keys(server.torrents).length, 0)
})
client.start()
})
}
test('http: filter option blocks tracker from tracking torrent', function (t) {
testFilterOption(t, 'http')
})
test('udp: filter option blocks tracker from tracking torrent', function (t) {
testFilterOption(t, 'udp')
})

View File

@ -5,7 +5,6 @@ var commonTest = require('./common')
var fs = require('fs')
var get = require('simple-get')
var parseTorrent = require('parse-torrent')
var portfinder = require('portfinder')
var Server = require('../').Server
var test = require('tape')
@ -26,16 +25,14 @@ var peerId = new Buffer('01234567890123456789')
function testSingle (t, serverType) {
commonTest.createServer(t, serverType, function (server, announceUrl) {
server.once('listening', function () {
Client.scrape(announceUrl, infoHash1, function (err, data) {
t.error(err)
t.equal(data.announce, announceUrl)
t.equal(typeof data.complete, 'number')
t.equal(typeof data.incomplete, 'number')
t.equal(typeof data.downloaded, 'number')
server.close(function () {
t.end()
})
Client.scrape(announceUrl, infoHash1, function (err, data) {
t.error(err)
t.equal(data.announce, announceUrl)
t.equal(typeof data.complete, 'number')
t.equal(typeof data.incomplete, 'number')
t.equal(typeof data.downloaded, 'number')
server.close(function () {
t.end()
})
})
})
@ -51,16 +48,14 @@ test('udp: single info_hash scrape', function (t) {
function clientScrapeStatic (t, serverType) {
commonTest.createServer(t, serverType, function (server, announceUrl) {
server.once('listening', function () {
Client.scrape(announceUrl, infoHash1, function (err, data) {
t.error(err)
t.equal(data.announce, announceUrl)
t.equal(typeof data.complete, 'number')
t.equal(typeof data.incomplete, 'number')
t.equal(typeof data.downloaded, 'number')
server.close(function () {
t.end()
})
Client.scrape(announceUrl, infoHash1, function (err, data) {
t.error(err)
t.equal(data.announce, announceUrl)
t.equal(typeof data.complete, 'number')
t.equal(typeof data.incomplete, 'number')
t.equal(typeof data.downloaded, 'number')
server.close(function () {
t.end()
})
})
})
@ -85,36 +80,32 @@ test('server: multiple info_hash scrape', function (t) {
t.error(err)
})
portfinder.getPort(function (err, port) {
t.error(err)
server.listen(port)
server.listen(0, function () {
var port = server.http.address().port
var scrapeUrl = 'http://127.0.0.1:' + port + '/scrape'
var url = scrapeUrl + '?' + commonLib.querystringStringify({
info_hash: [ binaryInfoHash1, binaryInfoHash2 ]
})
get.concat(url, function (err, data, res) {
if (err) throw err
t.equal(res.statusCode, 200)
server.once('listening', function () {
var url = scrapeUrl + '?' + commonLib.querystringStringify({
info_hash: [ binaryInfoHash1, binaryInfoHash2 ]
})
get.concat(url, function (err, data, res) {
if (err) throw err
t.equal(res.statusCode, 200)
data = bencode.decode(data)
t.ok(data.files)
t.equal(Object.keys(data.files).length, 2)
data = bencode.decode(data)
t.ok(data.files)
t.equal(Object.keys(data.files).length, 2)
t.ok(data.files[binaryInfoHash1])
t.equal(typeof data.files[binaryInfoHash1].complete, 'number')
t.equal(typeof data.files[binaryInfoHash1].incomplete, 'number')
t.equal(typeof data.files[binaryInfoHash1].downloaded, 'number')
t.ok(data.files[binaryInfoHash1])
t.equal(typeof data.files[binaryInfoHash1].complete, 'number')
t.equal(typeof data.files[binaryInfoHash1].incomplete, 'number')
t.equal(typeof data.files[binaryInfoHash1].downloaded, 'number')
t.ok(data.files[binaryInfoHash2])
t.equal(typeof data.files[binaryInfoHash2].complete, 'number')
t.equal(typeof data.files[binaryInfoHash2].incomplete, 'number')
t.equal(typeof data.files[binaryInfoHash2].downloaded, 'number')
t.ok(data.files[binaryInfoHash2])
t.equal(typeof data.files[binaryInfoHash2].complete, 'number')
t.equal(typeof data.files[binaryInfoHash2].incomplete, 'number')
t.equal(typeof data.files[binaryInfoHash2].downloaded, 'number')
server.close(function () {
t.end()
})
server.close(function () {
t.end()
})
})
})
@ -129,100 +120,37 @@ test('server: all info_hash scrape', function (t) {
t.error(err)
})
portfinder.getPort(function (err, port) {
t.error(err)
server.listen(port)
server.listen(0, function () {
var port = server.http.address().port
var announceUrl = 'http://127.0.0.1:' + port + '/announce'
var scrapeUrl = 'http://127.0.0.1:' + port + '/scrape'
parsedBitlove.announce = [ announceUrl ]
server.once('listening', function () {
// announce a torrent to the tracker
var client = new Client(peerId, port, parsedBitlove)
client.on('error', function (err) {
t.error(err)
})
client.start()
server.once('start', function () {
// now do a scrape of everything by omitting the info_hash param
get.concat(scrapeUrl, function (err, data, res) {
if (err) throw err
t.equal(res.statusCode, 200)
data = bencode.decode(data)
t.ok(data.files)
t.equal(Object.keys(data.files).length, 1)
t.ok(data.files[binaryBitlove])
t.equal(typeof data.files[binaryBitlove].complete, 'number')
t.equal(typeof data.files[binaryBitlove].incomplete, 'number')
t.equal(typeof data.files[binaryBitlove].downloaded, 'number')
client.stop()
server.close(function () {
t.end()
})
})
})
// announce a torrent to the tracker
var client = new Client(peerId, port, parsedBitlove)
client.on('error', function (err) {
t.error(err)
})
})
})
client.start()
test('http nonwhitelisted torrent does not appear in scrape', function (t) {
var server = new Server({
filter: function (hash) {
return hash !== parsedBitlove
},
udp: false
})
server.on('error', function (err) {
t.error(err)
})
portfinder.getPort(function (err, port) {
t.error(err)
server.listen(port)
var announceUrl = 'http://127.0.0.1:' + port + '/announce'
var scrapeUrl = 'http://127.0.0.1:' + port + '/scrape'
parsedBitlove.announce = [ announceUrl ]
server.once('listening', function () {
var client = new Client(peerId, port, parsedBitlove)
client.start()
client.on('error', function (err) {
t.error(err)
})
server.once('warning', function (err) {
if (err) {
get.concat(scrapeUrl, function (err, data, res) {
if (err) throw err
t.equal(res.statusCode, 200)
data = bencode.decode(data)
t.ok(data.files)
t.notOk(data.files[binaryBitlove])
client.stop()
server.close(function () {
t.end()
})
})
}
})
server.once('start', function (err) {
server.once('start', function () {
// now do a scrape of everything by omitting the info_hash param
get.concat(scrapeUrl, function (err, data, res) {
if (err) throw err
t.equal(res.statusCode, 200)
data = bencode.decode(data)
t.ok(data.files)
t.equal(Object.keys(data.files).length, 1)
t.ok(data.files[binaryBitlove])
t.equal(typeof data.files[binaryBitlove].complete, 'number')
t.equal(typeof data.files[binaryBitlove].incomplete, 'number')
t.equal(typeof data.files[binaryBitlove].downloaded, 'number')
client.stop()
server.close(function () {
t.fail('server should have thrown an error; filter condition was probably successful')
t.end()
})
})

View File

@ -28,7 +28,8 @@ function serverTest (t, serverType, serverFamily) {
t.pass('server listening')
})
server.listen(function (port) {
server.listen(0, function () {
var port = server[serverType].address().port
var announceUrl = serverType + '://' + serverAddr + ':' + port + '/announce'
var client = new Client(peerId, 6881, {