added client and server support for scrape messages in addition to announce messages (tcp and udp)

This commit is contained in:
fisch0920 2014-05-09 02:38:41 -04:00
parent 931f0a5a48
commit 5064a05dde
4 changed files with 211 additions and 68 deletions

218
index.js
View File

@ -18,7 +18,7 @@ var string2compact = require('string2compact')
var url = require('url')
var CONNECTION_ID = Buffer.concat([ toUInt32(0x417), toUInt32(0x27101980) ])
var ACTIONS = { CONNECT: 0, ANNOUNCE: 1 }
var ACTIONS = { CONNECT: 0, ANNOUNCE: 1, SCRAPE: 2 }
var EVENTS = { completed: 1, started: 2, stopped: 3 }
var MAX_UINT = 4294967295
@ -83,6 +83,30 @@ Tracker.prototype.update = function (opts) {
self._request(opts)
}
Tracker.prototype.scrape = function (opts) {
var self = this
if (!self._scrapeUrl) {
var announce = 'announce'
var i = self._announceUrl.lastIndexOf('\/') + 1
if (i >= 1 && self._announceUrl.slice(i, i + announce.length) === announce) {
self._scrapeUrl = self._announceUrl.slice(0, i) + 'scrape' + self._announceUrl.slice(i + announce.length)
}
}
if (!self._scrapeUrl) {
self.client.emit('error', new Error('scrape not supported for announceUrl ' + self._announceUrl))
return
}
opts = extend({
info_hash: bytewiseEncodeURIComponent(self.client._infoHash)
}, opts)
self._requestImpl(self._scrapeUrl, opts)
}
Tracker.prototype.setInterval = function (intervalMs) {
var self = this
if (self._interval) {
@ -96,7 +120,7 @@ Tracker.prototype.setInterval = function (intervalMs) {
}
/**
* Send a request to the tracker
* Send an announce request to the tracker
*/
Tracker.prototype._request = function (opts) {
var self = this
@ -115,12 +139,12 @@ Tracker.prototype._request = function (opts) {
opts.trackerid = self._trackerId
}
self._requestImpl(opts)
self._requestImpl(self._announceUrl, opts)
}
Tracker.prototype._requestHttp = function (opts) {
Tracker.prototype._requestHttp = function (requestUrl, opts) {
var self = this
var fullUrl = self._announceUrl + '?' + querystring.stringify(opts)
var fullUrl = requestUrl + '?' + querystring.stringify(opts)
var req = http.get(fullUrl, function (res) {
var data = ''
@ -133,7 +157,7 @@ Tracker.prototype._requestHttp = function (opts) {
data += chunk
})
res.on('end', function () {
self._handleResponse(data)
self._handleResponse(requestUrl, data)
})
})
@ -142,9 +166,9 @@ Tracker.prototype._requestHttp = function (opts) {
})
}
Tracker.prototype._requestUdp = function (opts) {
Tracker.prototype._requestUdp = function (requestUrl, opts) {
var self = this
var parsedUrl = url.parse(self._announceUrl)
var parsedUrl = url.parse(requestUrl)
var socket = dgram.createSocket('udp4')
var transactionId = new Buffer(hat(32), 'hex')
@ -169,21 +193,21 @@ Tracker.prototype._requestUdp = function (opts) {
socket.on('message', function (message, rinfo) {
if (message.length < 8 || message.readUInt32BE(4) !== transactionId.readUInt32BE(0)) {
return error(new Error('tracker sent back invalid transaction id'))
return error('tracker sent back invalid transaction id')
}
var action = message.readUInt32BE(0)
switch (action) {
case 0:
case 0: // handshake
if (message.length < 16) {
return error(new Error('invalid udp handshake'))
return error('invalid udp handshake')
}
announce(message.slice(8, 16), opts)
return
case 1:
case 1: // announce
if (message.length < 20) {
return error(new Error('invalid announce message'))
return error('invalid announce message')
}
var interval = message.readUInt32BE(8)
@ -205,6 +229,23 @@ Tracker.prototype._requestUdp = function (opts) {
clearTimeout(timeout)
socket.close()
return
case 2: // scrape
if (message.length < 20) {
return error('invalid scrape message')
}
self.client.emit('scrape', {
announce: self._announceUrl,
complete: message.readUInt32BE(8),
downloaded: message.readUInt32BE(12),
incomplete: message.readUInt32BE(16)
})
clearTimeout(timeout)
socket.close()
return
}
})
@ -215,9 +256,13 @@ Tracker.prototype._requestUdp = function (opts) {
socket.send(message, 0, message.length, parsedUrl.port, parsedUrl.hostname)
}
function genTransactionId () {
transactionId = new Buffer(hat(32), 'hex')
}
function announce (connectionId, opts) {
opts = opts || {}
transactionId = new Buffer(hat(32), 'hex')
genTransactionId()
send(Buffer.concat([
connectionId,
@ -235,6 +280,17 @@ Tracker.prototype._requestUdp = function (opts) {
toUInt16(self.client._port || 0)
]))
}
function scrape (connectionId, opts) {
genTransactionId()
send(Buffer.concat([
connectionId,
toUInt32(ACTIONS.SCRAPE),
transactionId,
self.client._infoHash
]))
}
send(Buffer.concat([
CONNECTION_ID,
@ -243,7 +299,7 @@ Tracker.prototype._requestUdp = function (opts) {
]))
}
Tracker.prototype._handleResponse = function (data) {
Tracker.prototype._handleResponse = function (requestUrl, data) {
var self = this
try {
@ -260,37 +316,55 @@ Tracker.prototype._handleResponse = function (data) {
if (warning) {
self.client.emit('warning', warning);
}
if (requestUrl === self._announceUrl) {
var interval = data.interval || data['min interval']
if (interval && !self._opts.interval && self._intervalMs !== 0) {
// use the interval the tracker recommends, UNLESS the user manually specifies an
// interval they want to use
self.setInterval(interval * 1000)
}
var interval = data.interval || data['min interval']
if (interval && !self._opts.interval && self._intervalMs !== 0) {
// use the interval the tracker recommends, UNLESS the user manually specifies an
// interval they want to use
self.setInterval(interval * 1000)
}
var trackerId = data['tracker id']
if (trackerId) {
// If absent, do not discard previous trackerId value
self._trackerId = trackerId
}
var trackerId = data['tracker id']
if (trackerId) {
// If absent, do not discard previous trackerId value
self._trackerId = trackerId
}
self.client.emit('update', {
announce: self._announceUrl,
complete: data.complete,
incomplete: data.incomplete
})
if (Buffer.isBuffer(data.peers)) {
// tracker returned compact response
compact2string.multi(data.peers).forEach(function (addr) {
self.client.emit('peer', addr)
})
} else if (Array.isArray(data.peers)) {
// tracker returned normal response
data.peers.forEach(function (peer) {
var ip = peer.ip
self.client.emit('peer', ip[0] + '.' + ip[1] + '.' + ip[2] + '.' + ip[3] + ':' + peer.port)
self.client.emit('update', {
announce: self._announceUrl,
complete: data.complete,
incomplete: data.incomplete
})
if (Buffer.isBuffer(data.peers)) {
// tracker returned compact response
compact2string.multi(data.peers).forEach(function (addr) {
self.client.emit('peer', addr)
})
} else if (Array.isArray(data.peers)) {
// tracker returned normal response
data.peers.forEach(function (peer) {
var ip = peer.ip
self.client.emit('peer', ip[0] + '.' + ip[1] + '.' + ip[2] + '.' + ip[3] + ':' + peer.port)
})
}
} else if (requestUrl === self._scrapeUrl) {
// note: the unofficial spec says to use the 'files' key but i've seen 'host' in practice
data = data.files || data.host || {}
data = data[bytewiseEncodeURIComponent(self.client._infoHash)]
if (!data) {
self.client.emit('error', new Error('invalid scrape response'))
} else {
// TODO: optionally handle data.flags.min_request_interval (separate from announce interval)
self.client.emit('scrape', {
announce: self._announceUrl,
complete: data.complete,
incomplete: data.incomplete,
downloaded: data.downloaded
})
}
}
}
@ -360,6 +434,13 @@ Client.prototype.update = function (opts) {
})
}
Client.prototype.scrape = function (opts) {
var self = this
self._trackers.forEach(function (tracker) {
tracker.scrape(opts)
})
}
Client.prototype.setInterval = function (intervalMs) {
var self = this
self._intervalMs = intervalMs
@ -452,18 +533,17 @@ Server.prototype._onHttpRequest = function (req, res) {
var port = Number(params.port)
var addr = ip + ':' + port
// TODO: support multiple info_hash parameters as a concatenation of individual requests
var infoHash = bytewiseDecodeURIComponent(params.info_hash).toString('hex')
var peerId = bytewiseDecodeURIComponent(params.peer_id).toString('utf8')
var swarm = self.torrents[infoHash]
if (!swarm) {
swarm = self.torrents[infoHash] = {
complete: 0,
incomplete: 0,
peers: {}
}
if (!infoHash) {
return error('bittorrent-tracker server only supports announcing one torrent at a time')
}
var swarm = self._getSwarm(infoHash)
var peer = swarm.peers[addr]
switch (params.event) {
case 'started':
if (peer) {
@ -547,10 +627,44 @@ Server.prototype._onHttpRequest = function (req, res) {
}
res.end(bncode.encode(response))
} else { // TODO: handle unofficial scrape messages
} else if (s[0] === '/scrape') { // unofficial scrape message
var params = querystring.parse(s[1])
var infoHash = bytewiseDecodeURIComponent(params.info_hash).toString('hex')
if (!infoHash) {
return error('bittorrent-tracker server only supports scraping one torrent at a time')
}
var swarm = self._getSwarm(infoHash)
var response = { files : { } }
response.files[params.info_hash] = {
complete: swarm.complete,
incomplete: swarm.incomplete,
downloaded: swarm.complete, // TODO: this only provides a lower-bound
flags: {
min_request_interval: self._interval
}
}
res.end(bncode.encode(response))
}
}
Server.prototype._getSwarm = function (infoHash) {
var self = this
var swarm = self.torrents[infoHash]
if (!swarm) {
swarm = self.torrents[infoHash] = {
complete: 0,
incomplete: 0,
peers: {}
}
}
return swarm
}
Server.prototype._onUdpRequest = function (req, res) {
// TODO: implement UDP server
}

View File

@ -6,6 +6,7 @@ var test = require('tape')
var torrent = fs.readFileSync(__dirname + '/torrents/bitlove-intro.torrent')
var parsedTorrent = parseTorrent(torrent)
var peerId = new Buffer('01234567890123456789')
var announceUrl = 'http://t.bitlove.org/announce' // TODO: shouldn't rely on an external server!
var port = 6881
test('client.start()', function (t) {
@ -18,13 +19,13 @@ test('client.start()', function (t) {
})
client.once('update', function (data) {
t.equal(data.announce, 'http://t.bitlove.org/announce')
t.equal(data.announce, announceUrl)
t.equal(typeof data.complete, 'number')
t.equal(typeof data.incomplete, 'number')
})
client.once('peer', function (addr) {
t.pass('there is at least one peer') // TODO: this shouldn't rely on an external server!
t.pass('there is at least one peer')
client.stop()
})
@ -47,7 +48,7 @@ test('client.stop()', function (t) {
client.once('update', function (data) {
// receive one final update after calling stop
t.equal(data.announce, 'http://t.bitlove.org/announce')
t.equal(data.announce, announceUrl)
t.equal(typeof data.complete, 'number')
t.equal(typeof data.incomplete, 'number')
})
@ -73,7 +74,7 @@ test('client.update()', function (t) {
client.once('update', function (data) {
// received an update!
t.equal(data.announce, 'http://t.bitlove.org/announce')
t.equal(data.announce, announceUrl)
t.equal(typeof data.complete, 'number')
t.equal(typeof data.incomplete, 'number')
client.stop()
@ -82,4 +83,23 @@ test('client.update()', function (t) {
})
})
test('client.scrape()', function (t) {
t.plan(4)
var client = new Client(peerId, port, parsedTorrent)
client.on('error', function (err) {
t.error(err)
})
client.once('scrape', function (data) {
t.equal(data.announce, announceUrl)
t.equal(typeof data.complete, 'number')
t.equal(typeof data.incomplete, 'number')
t.equal(typeof data.downloaded, 'number')
})
client.scrape()
})
// TODO: add test where tracker doesn't support compact

View File

@ -11,7 +11,7 @@ var peerId = '12345678901234567890'
var torrentLength = 50000
test('server', function (t) {
t.plan(17)
t.plan(21)
var server = new Server() // { interval: 50000, compactOnly: false }
@ -66,14 +66,22 @@ test('server', function (t) {
t.equal(data.complete, 1)
t.equal(data.incomplete, 0)
client.stop()
client.once('update', function (data) {
client.scrape()
client.once('scrape', function (data) {
t.equal(data.announce, announceUrl)
t.equal(data.complete, 0)
t.equal(data.incomplete, 0)
t.equal(typeof data.complete, 'number')
t.equal(typeof data.incomplete, 'number')
t.equal(typeof data.downloaded, 'number')
server.close()
client.once('update', function (data) {
t.equal(data.announce, announceUrl)
t.equal(data.complete, 0)
t.equal(data.incomplete, 0)
server.close()
})
client.stop()
})
})
})

View File

@ -7,7 +7,8 @@ var torrent = fs.readFileSync(__dirname + '/torrents/leaves.torrent')
var parsedTorrent = parseTorrent(torrent)
// remove all tracker servers except a single UDP one, for now
parsedTorrent.announce = [ 'udp://tracker.publicbt.com:80' ]
var announceUrl = 'udp://tracker.publicbt.com:80'
parsedTorrent.announce = [ announceUrl ]
var peerId = new Buffer('01234567890123456789')
var port = 6881
@ -22,7 +23,7 @@ test('udp: client.start/update/stop()', function (t) {
})
client.once('update', function (data) {
t.equal(data.announce, 'udp://tracker.publicbt.com:80')
t.equal(data.announce, announceUrl)
t.equal(typeof data.complete, 'number')
t.equal(typeof data.incomplete, 'number')
})
@ -32,17 +33,17 @@ test('udp: client.start/update/stop()', function (t) {
client.once('update', function (data) {
// receive one final update after calling stop
t.equal(data.announce, 'udp://tracker.publicbt.com:80')
t.equal(data.announce, announceUrl)
t.equal(typeof data.complete, 'number')
t.equal(typeof data.incomplete, 'number')
client.once('update', function (data) {
// received an update!
t.equal(data.announce, 'udp://tracker.publicbt.com:80')
t.equal(data.announce, announceUrl)
t.equal(typeof data.complete, 'number')
t.equal(typeof data.incomplete, 'number')
})
client.stop()
})