From 366b49bf068b17c4adbb72efa696a991650d4d39 Mon Sep 17 00:00:00 2001 From: Astro Date: Tue, 9 Dec 2014 02:35:05 +0100 Subject: [PATCH] server: split out _onRequest() et al --- server.js | 452 +++++++++++++++++++++++++----------------------------- 1 file changed, 206 insertions(+), 246 deletions(-) diff --git a/server.js b/server.js index dc041ba..10f8bcb 100644 --- a/server.js +++ b/server.js @@ -157,140 +157,18 @@ Server.prototype._onHttpRequest = function (req, res) { return } - - var response - if (params && params.action === common.ACTIONS.ANNOUNCE) { - var swarm = self._getSwarm(params.info_hash) - var peer = swarm.peers[params.addr] - var start = function () { - if (peer) { - debug('unexpected `started` event from peer that is already in swarm') - return update() // treat as an update + this._onRequest(params, function (err, response) { + if (err) { + self.emit('warning', new Error(err.message)) + response = { + action: common.ACTIONS.ERRROR, + 'failure reason': err.message } - if (params.left === 0) swarm.complete += 1 - else swarm.incomplete += 1 - peer = swarm.peers[params.addr] = { - ip: params.ip, - port: params.port, - peerId: params.peer_id - } - self.emit('start', params.addr) - } - - var stop = function () { - if (!peer) { - debug('unexpected `stopped` event from peer that is not in swarm') - return // do nothing - } - if (peer.complete) swarm.complete -= 1 - else swarm.incomplete -= 1 - swarm.peers[params.addr] = null - self.emit('stop', params.addr) - } - - var complete = function () { - if (!peer) { - debug('unexpected `completed` event from peer that is not in swarm') - return start() // treat as a start - } - if (peer.complete) { - debug('unexpected `completed` event from peer that is already marked as completed') - return // do nothing - } - swarm.complete += 1 - swarm.incomplete -= 1 - peer.complete = true - self.emit('complete', params.addr) - } - - var update = function () { - if (!peer) { - debug('unexpected `update` event from peer that is not in swarm') - return start() // treat as a start - } - self.emit('update', params.addr) - } - - switch (params.event) { - case 'started': - start() - break - case 'stopped': - stop() - break - case 'completed': - complete() - break - case '': case undefined: // update - update() - break - default: - return error('invalid event') // early return - } - - if (params.left === 0 && peer) peer.complete = true - - // send peers - var peers = params.compact === 1 - ? self._getPeersCompact(swarm, params.numwant) - : self._getPeers(swarm, params.numwant) - - response = { - complete: swarm.complete, - incomplete: swarm.incomplete, - peers: peers, - interval: self._intervalMs } res.end(bencode.encode(response)) - debug('sent response %s', response) - - } else if (params.action === common.ACTIONS.SCRAPE) { // unofficial scrape message - if (typeof params.info_hash === 'string') { - params.info_hash = [ params.info_hash ] - } else if (params.info_hash == null) { - // if info_hash param is omitted, stats for all torrents are returned - // TODO: make this configurable! - params.info_hash = Object.keys(self.torrents) - } - - if (!Array.isArray(params.info_hash)) return error('invalid info_hash') - - response = { - files: {}, - flags: { - min_request_interval: self._intervalMs - } - } - - params.info_hash.some(function (infoHash) { - var swarm = self._getSwarm(infoHash) - - response.files[infoHash] = { - complete: swarm.complete, - incomplete: swarm.incomplete, - downloaded: swarm.complete // TODO: this only provides a lower-bound - } - }) - - res.end(bencode.encode(response)) - debug('sent response %s', response) - - } else { - error('only /announce and /scrape are valid endpoints') - } - - function error (message) { - debug('sent error %s', message) - res.end(bencode.encode({ - 'failure reason': message - })) - - // even though it's an error for the client, it's just a warning for the server. - // don't crash the server because a client sent bad data :) - self.emit('warning', new Error(message)) - } + }) } Server.prototype._onUdpRequest = function (msg, rinfo) { @@ -300,133 +178,173 @@ Server.prototype._onUdpRequest = function (msg, rinfo) { try { params = parseUdpRequest(msg, rinfo) } catch (err) { - console.error(err.stack) - return error(err.message) - } - - var socket = dgram.createSocket('udp4') - - var swarm - if (params && params.action === common.ACTIONS.CONNECT) { - send(Buffer.concat([ - common.toUInt32(common.ACTIONS.CONNECT), - common.toUInt32(params.transactionId), - params.connectionId - ])) - } else if (params && params.action === common.ACTIONS.ANNOUNCE) { - swarm = self._getSwarm(params.info_hash) - var peer = swarm.peers[params.addr] - - var start = function () { - if (peer) { - debug('unexpected `started` event from peer that is already in swarm') - return update() // treat as an update - } - if (params.left === 0) swarm.complete += 1 - else swarm.incomplete += 1 - peer = swarm.peers[params.addr] = { - ip: params.ip, - port: params.port, - peerId: params.peer_id - } - self.emit('start', params.addr) - } - - var stop = function () { - if (!peer) { - debug('unexpected `stopped` event from peer that is not in swarm') - return // do nothing - } - if (peer.complete) swarm.complete -= 1 - else swarm.incomplete -= 1 - swarm.peers[params.addr] = null - self.emit('stop', params.addr) - } - - var complete = function () { - if (!peer) { - debug('unexpected `completed` event from peer that is not in swarm') - return start() // treat as a start - } - if (peer.complete) { - debug('unexpected `completed` event from peer that is already marked as completed') - return // do nothing - } - swarm.complete += 1 - swarm.incomplete -= 1 - peer.complete = true - self.emit('complete', params.addr) - } - - var update = function () { - if (!peer) { - debug('unexpected `update` event from peer that is not in swarm') - return start() // treat as a start - } - self.emit('update', params.addr) - } - - switch (params.event) { - case 'started': - start() - break - case 'stopped': - stop() - break - case 'completed': - complete() - break - case 'update': - update() - break - default: - return error('invalid event') // early return - } - - if (params.left === 0 && peer) peer.complete = true - - // send peers - var peers = self._getPeersCompact(swarm, params.numwant) - - send(Buffer.concat([ - common.toUInt32(common.ACTIONS.ANNOUNCE), - common.toUInt32(params.transactionId), - common.toUInt32(self._intervalMs), - common.toUInt32(swarm.incomplete), - common.toUInt32(swarm.complete), - peers - ])) - - } else if (params && params.action === common.ACTIONS.SCRAPE) { // scrape message - swarm = self._getSwarm(params.info_hash) - - send(Buffer.concat([ - common.toUInt32(common.ACTIONS.SCRAPE), - common.toUInt32(params.transactionId), - common.toUInt32(swarm.complete), - common.toUInt32(swarm.complete), // TODO: this only provides a lower-bound - common.toUInt32(swarm.incomplete) - ])) + self.emit('warning', err) + // Do not reply for parsing errors + return } - function send (buf) { - debug('sent response %s', buf.toString('hex')) + // Do nothing with invalid request + if (!params) return + + // Handle + this._onRequest(params, function (err, response) { + if (err) { + self.emit('warning', new Error(err.message)) + response = { + action: common.ACTIONS.ERRROR, + 'failure reason': err.message + } + } + + var socket = dgram.createSocket('udp4') + response.transactionId = params.transactionId + response.connectionId = params.connectionId + var buf = makeUdpPacket(response) socket.send(buf, 0, buf.length, rinfo.port, rinfo.address, function () { try { socket.close() } catch (err) {} }) + }) +} + +Server.prototype._onRequest = function (params, cb) { + var response + if (params && params.action === common.ACTIONS.CONNECT) { + cb(null, { action: common.ACTIONS.CONNECT }) + } else if (params && params.action === common.ACTIONS.ANNOUNCE) { + this._onAnnounce(params, cb) + } else if (params && params.action === common.ACTIONS.SCRAPE) { + this._onScrape(params, cb) + } else { + cb(new Error('Invalid action')) + } +} + +Server.prototype._onAnnounce = function (params, cb) { + var self = this + + var swarm = self._getSwarm(params.info_hash) + var peer = swarm.peers[params.addr] + + var start = function () { + if (peer) { + debug('unexpected `started` event from peer that is already in swarm') + return update() // treat as an update + } + if (params.left === 0) swarm.complete += 1 + else swarm.incomplete += 1 + peer = swarm.peers[params.addr] = { + ip: params.ip, + port: params.port, + peerId: params.peer_id + } + self.emit('start', params.addr) } - function error (message) { - debug('sent error %s', message) - send(Buffer.concat([ - common.toUInt32(common.ACTIONS.ERROR), - common.toUInt32(params.transactionId || 0), - new Buffer(message, 'utf8') - ])) - self.emit('warning', new Error(message)) + var stop = function () { + if (!peer) { + debug('unexpected `stopped` event from peer that is not in swarm') + return // do nothing + } + if (peer.complete) swarm.complete -= 1 + else swarm.incomplete -= 1 + swarm.peers[params.addr] = null + self.emit('stop', params.addr) } + + var complete = function () { + if (!peer) { + debug('unexpected `completed` event from peer that is not in swarm') + return start() // treat as a start + } + if (peer.complete) { + debug('unexpected `completed` event from peer that is already marked as completed') + return // do nothing + } + swarm.complete += 1 + swarm.incomplete -= 1 + peer.complete = true + self.emit('complete', params.addr) + } + + var update = function () { + if (!peer) { + debug('unexpected `update` event from peer that is not in swarm') + return start() // treat as a start + } + self.emit('update', params.addr) + } + + switch (params.event) { + case 'started': + start() + break + case 'stopped': + stop() + break + case 'completed': + complete() + break + case '': case undefined: case 'empty': case 'update': // update + update() + break + default: + return cb(new Error('invalid event')) // early return + } + + if (params.left === 0 && peer) peer.complete = true + + // send peers + var peers = params.compact === 1 + ? self._getPeersCompact(swarm, params.numwant) + : self._getPeers(swarm, params.numwant) + + cb(null, { + action: common.ACTIONS.ANNOUNCE, + complete: swarm.complete, + incomplete: swarm.incomplete, + peers: peers, + intervalMs: self._intervalMs + }) +} + +Server.prototype._onScrape = function (params, cb) { + var self = this + + if (typeof params.info_hash === 'string') { + params.info_hash = [ params.info_hash ] + } else if (params.info_hash == null) { + // if info_hash param is omitted, stats for all torrents are returned + // TODO: make this configurable! + params.info_hash = Object.keys(self.torrents) + } + + if (!Array.isArray(params.info_hash)) { + var err = new Error('invalid info_hash') + self.emit('warning', err) + return cb(err) + } + + var response = { + action: common.ACTIONS.SCRAPE, + files: {}, + flags: { + min_request_interval: self._intervalMs + } + } + + params.info_hash.some(function (infoHash) { + var swarm = self._getSwarm(infoHash) + + response.files[infoHash] = { + complete: swarm.complete, + incomplete: swarm.incomplete, + downloaded: swarm.complete // TODO: this only provides a lower-bound + } + }) + + cb(null, response) } Server.prototype._getPeers = function (swarm, numwant) { @@ -553,6 +471,7 @@ function parseUdpRequest (msg, rinfo) { params.numwant = Math.min(params.numwant || NUM_ANNOUNCE_PEERS, MAX_ANNOUNCE_PEERS) params.port = msg.readUInt16BE(96) || rinfo.port // optional params.addr = params.ip + ':' + params.port // TODO: ipv6 brackets + params.compact = 1 // udp is always compact } else if (params.action === common.ACTIONS.SCRAPE) { // scrape message params.info_hash = msg.slice(16, 36).toString('binary') // 20 bytes @@ -568,6 +487,47 @@ function parseUdpRequest (msg, rinfo) { return params } +function makeUdpPacket (params) { + switch (params.action) { + case common.ACTIONS.CONNECT: + return Buffer.concat([ + common.toUInt32(common.ACTIONS.CONNECT), + common.toUInt32(params.transactionId), + params.connectionId + ]) + case common.ACTIONS.ANNOUNCE: + return Buffer.concat([ + common.toUInt32(common.ACTIONS.ANNOUNCE), + common.toUInt32(params.transactionId), + common.toUInt32(params.intervalMs), + common.toUInt32(params.incomplete), + common.toUInt32(params.complete), + params.peers + ]) + case common.ACTIONS.SCRAPE: + var firstInfoHash = Object.keys(params.files)[0] + var scrapeInfo = firstInfoHash ? { + complete: params.files[firstInfoHash].complete, + incomplete: params.files[firstInfoHash].incomplete, + completed: params.files[firstInfoHash].complete // TODO: this only provides a lower-bound + } : {} + return Buffer.concat([ + common.toUInt32(common.ACTIONS.SCRAPE), + common.toUInt32(params.transactionId), + common.toUInt32(scrapeInfo.complete), + common.toUInt32(scrapeInfo.completed), + common.toUInt32(scrapeInfo.incomplete) + ]) + case common.ACTIONS.ERROR: + return Buffer.concat([ + common.toUInt32(common.ACTIONS.ERROR), + common.toUInt32(params.transactionId || 0), + new Buffer(params.message, 'utf8') + ]) + default: + throw new Error('Action not implemented: ' + params.action) + } +} // HELPER FUNCTIONS