From b4c8b1d90e38bc517234a1aa58e7805b03e962ad Mon Sep 17 00:00:00 2001 From: Bobby Wibowo Date: Mon, 15 Jun 2020 23:14:33 +0700 Subject: [PATCH] =?UTF-8?q?BLAZING=20FAST=20CHUNKED=20UPLOADS=20?= =?UTF-8?q?=F0=9F=9A=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Inspired by our recent switch to using blake3 for file hashing, chunks will now be written to a tmp file directly as they're uploaded. So no more waiting so long for "rebuilding chunks". There will still be some delay on every following attempts of uploading each chunks. I'm not sure the specifics, as we're already reusing the write stream. --- controllers/multerStorageController.js | 67 +++++++++----- controllers/uploadController.js | 115 +++++++++++-------------- 2 files changed, 92 insertions(+), 90 deletions(-) diff --git a/controllers/multerStorageController.js b/controllers/multerStorageController.js index 13066ed..af7961d 100644 --- a/controllers/multerStorageController.js +++ b/controllers/multerStorageController.js @@ -37,32 +37,53 @@ DiskStorage.prototype._handleFile = function _handleFile (req, file, cb) { if (err) return cb(err) const finalPath = path.join(destination, filename) - const outStream = fs.createWriteStream(finalPath) - - let hash = null - if (!file._ischunk) { - hash = blake3.createHash() - const onerror = function (err) { - hash.dispose() - cb(err) - } - outStream.on('error', onerror) - file.stream.on('error', onerror) - file.stream.on('data', d => hash.update(d)) - } else { - outStream.on('error', cb) + const onerror = err => { + hash.dispose() + cb(err) } - file.stream.pipe(outStream) - outStream.on('finish', function () { - cb(null, { - destination, - filename, - path: finalPath, - size: outStream.bytesWritten, - hash: hash && hash.digest('hex') + let outStream + let hash + if (file._isChunk) { + if (!file._chunksData.stream) { + file._chunksData.stream = fs.createWriteStream(finalPath, { flags: 'a' }) + file._chunksData.stream.on('error', onerror) + } + if (!file._chunksData.hasher) + file._chunksData.hasher = blake3.createHash() + + outStream = file._chunksData.stream + hash = file._chunksData.hasher + } else { + outStream = fs.createWriteStream(finalPath) + outStream.on('error', onerror) + hash = blake3.createHash() + } + + file.stream.on('error', onerror) + file.stream.on('data', d => hash.update(d)) + + if (file._isChunk) { + file.stream.on('end', () => { + cb(null, { + destination, + filename, + path: finalPath + }) }) - }) + file.stream.pipe(outStream, { end: false }) + } else { + outStream.on('finish', () => { + cb(null, { + destination, + filename, + path: finalPath, + size: outStream.bytesWritten, + hash: hash.digest('hex') + }) + }) + file.stream.pipe(outStream) + } }) }) } diff --git a/controllers/uploadController.js b/controllers/uploadController.js index 71678b5..7aff0db 100644 --- a/controllers/uploadController.js +++ b/controllers/uploadController.js @@ -51,9 +51,15 @@ const initChunks = async uuid => { throw err await paths.mkdir(root) } - chunksData[uuid] = { root, chunks: [], size: 0 } + chunksData[uuid] = { + root, + filename: 'tmp', + chunks: 0, + stream: null, + hasher: null + } } - return chunksData[uuid].root + return chunksData[uuid] } const executeMulter = multer({ @@ -90,11 +96,14 @@ const executeMulter = multer({ storage: multerStorage({ destination (req, file, cb) { // Is file a chunk!? - file._ischunk = chunkedUploads && req.body.uuid !== undefined && req.body.chunkindex !== undefined + file._isChunk = chunkedUploads && req.body.uuid !== undefined && req.body.chunkindex !== undefined - if (file._ischunk) + if (file._isChunk) initChunks(req.body.uuid) - .then(uuidDir => cb(null, uuidDir)) + .then(chunksData => { + file._chunksData = chunksData + cb(null, chunksData.root) + }) .catch(error => { logger.error(error) return cb('Could not process the chunked upload. Try again?') @@ -104,12 +113,8 @@ const executeMulter = multer({ }, filename (req, file, cb) { - if (file._ischunk) { - // index.extension (i.e. 0, 1, ..., n - will prepend zeros depending on the amount of chunks) - const digits = req.body.totalchunkcount !== undefined ? `${req.body.totalchunkcount - 1}`.length : 1 - const zeros = new Array(digits + 1).join('0') - const name = (zeros + req.body.chunkindex).slice(-digits) - return cb(null, name) + if (file._isChunk) { + return cb(null, chunksData[req.body.uuid].filename) } else { const length = self.parseFileIdentifierLength(req.headers.filelength) return self.getUniqueRandomName(length, file.extname) @@ -258,8 +263,7 @@ self.actuallyUploadFiles = async (req, res, user, albumid, age) => { const uuid = req.body.uuid if (chunkedUploads && chunksData[uuid] !== undefined) { req.files.forEach(file => { - chunksData[uuid].chunks.push(file.filename) - chunksData[uuid].size += file.size + chunksData[uuid].chunks++ }) return res.json({ success: true }) } @@ -440,7 +444,7 @@ self.finishChunks = async (req, res, next) => { self.actuallyFinishChunks = async (req, res, user) => { const check = file => typeof file.uuid !== 'string' || !chunksData[file.uuid] || - chunksData[file.uuid].chunks.length < 2 + chunksData[file.uuid].chunks < 2 const files = req.body.files if (!Array.isArray(files) || !files.length || files.some(check)) @@ -449,7 +453,10 @@ self.actuallyFinishChunks = async (req, res, user) => { const infoMap = [] try { await Promise.all(files.map(async file => { - if (chunksData[file.uuid].chunks.length > maxChunksCount) + // Close stream + chunksData[file.uuid].stream.end() + + if (chunksData[file.uuid].chunks > maxChunksCount) throw 'Too many chunks.' file.extname = typeof file.original === 'string' ? utils.extname(file.original) : '' @@ -462,28 +469,30 @@ self.actuallyFinishChunks = async (req, res, user) => { throw 'Permanent uploads are not permitted.' } - file.size = chunksData[file.uuid].size + file.size = chunksData[file.uuid].stream.bytesWritten if (config.filterEmptyFile && file.size === 0) throw 'Empty files are not allowed.' else if (file.size > maxSizeBytes) throw `File too large. Chunks are bigger than ${maxSize} MB.` + // Double-check file size + const tmpfile = path.join(chunksData[file.uuid].root, chunksData[file.uuid].filename) + const lstat = await paths.lstat(tmpfile) + if (lstat.size !== file.size) + throw `File size mismatched (${lstat.size} vs. ${file.size}).` + // Generate name const length = self.parseFileIdentifierLength(file.filelength) const name = await self.getUniqueRandomName(length, file.extname) - // Combine chunks + // Move tmp file to final destination const destination = path.join(paths.uploads, name) - const hash = await self.combineChunks(destination, file.uuid) + await paths.rename(tmpfile, destination) + const hash = chunksData[file.uuid].hasher.digest('hex') // Continue even when encountering errors await self.cleanUpChunks(file.uuid).catch(logger.error) - // Double-check file size - const lstat = await paths.lstat(destination) - if (lstat.size !== file.size) - throw 'Chunks size mismatched.' - let albumid = parseInt(file.albumid) if (isNaN(albumid)) albumid = null @@ -512,11 +521,17 @@ self.actuallyFinishChunks = async (req, res, user) => { const result = await self.storeFilesToDb(req, res, user, infoMap) await self.sendUploadResponse(req, res, result) } catch (error) { - // Clean up leftover chunks + // Dispose unfinished hasher and clean up leftover chunks // Should continue even when encountering errors await Promise.all(files.map(file => { - if (chunksData[file.uuid] !== undefined) - return self.cleanUpChunks(file.uuid).catch(logger.error) + // eslint-disable-next-line curly + if (chunksData[file.uuid] !== undefined) { + try { + if (chunksData[file.uuid].hasher) + chunksData[file.uuid].hasher.dispose() + } catch (error) {} + self.cleanUpChunks(file.uuid).catch(logger.error) + } })) // Re-throw error @@ -524,50 +539,16 @@ self.actuallyFinishChunks = async (req, res, user) => { } } -self.combineChunks = async (destination, uuid) => { - let errorObj - const outStream = fs.createWriteStream(destination, { flags: 'a' }) - const hash = blake3.createHash() - - outStream.on('error', error => { - hash.dispose() - errorObj = error - }) - - try { - chunksData[uuid].chunks.sort() - for (const chunk of chunksData[uuid].chunks) - await new Promise((resolve, reject) => { - const stream = fs.createReadStream(path.join(chunksData[uuid].root, chunk)) - stream.pipe(outStream, { end: false }) - - stream.on('data', d => hash.update(d)) - stream.on('error', reject) - stream.on('end', () => resolve()) - }) - } catch (error) { - hash.dispose() - errorObj = error - } - - // Close stream - outStream.end() - - // Re-throw error - if (errorObj) throw errorObj - - // Return hash - return hash.digest('hex') -} - self.cleanUpChunks = async (uuid) => { - // Unlink chunks - await Promise.all(chunksData[uuid].chunks.map(chunk => - paths.unlink(path.join(chunksData[uuid].root, chunk)) - )) + // Remove tmp file + await paths.unlink(path.join(chunksData[uuid].root, chunksData[uuid].filename)) + .catch(error => { + if (error.code !== 'ENOENT') + logger.error(error) + }) // Remove UUID dir await paths.rmdir(chunksData[uuid].root) - // Delete cached date + // Delete cached chunks data delete chunksData[uuid] }