BLAZING FAST CHUNKED UPLOADS 🚀

Inspired by our recent switch to blake3 for file hashing, chunks are
now written directly to a tmp file as they're uploaded, so no more
long waits on "rebuilding chunks".
There will still be some delay on every subsequent attempt to upload
each chunk. I'm not sure of the specifics, as we're already reusing
the write stream.
Bobby Wibowo 2020-06-15 23:14:33 +07:00
parent 14b97ecbf1
commit b4c8b1d90e
2 changed files with 92 additions and 90 deletions
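
In a nutshell: every incoming chunk is piped into a single append-mode write stream that stays open between requests, while the same bytes feed an incremental blake3 hasher, so the final file and its hash are ready the moment the last chunk lands. A minimal sketch of that flow, with hypothetical names (state, appendChunk, finishUpload, tmpPath) standing in for the actual controller wiring shown in the diff below:

// Minimal sketch of the approach, not the actual controller code.
// Assumes the blake3 npm package (createHash/update/digest/dispose),
// as already used by this codebase.
const fs = require('fs')
const blake3 = require('blake3')

// Per-upload state, reused across every chunk of the same upload
const state = { stream: null, hasher: null }

// Hypothetical helper: append one chunk's stream to the shared tmp file
const appendChunk = (tmpPath, chunkStream) =>
  new Promise((resolve, reject) => {
    if (!state.stream) {
      // Opened once with the append flag, then kept open between chunks
      state.stream = fs.createWriteStream(tmpPath, { flags: 'a' })
      state.hasher = blake3.createHash()
    }
    chunkStream.on('data', d => state.hasher.update(d))
    chunkStream.on('error', reject)
    chunkStream.on('end', resolve)
    // end: false keeps the shared write stream open for the next chunk
    chunkStream.pipe(state.stream, { end: false })
  })

// Hypothetical helper: close the stream and read off the finished hash,
// so there is nothing left to "rebuild" or re-hash at the end
const finishUpload = () => {
  state.stream.end()
  return state.hasher.digest('hex')
}

The { flags: 'a' } open mode plus pipe(..., { end: false }) is what lets one stream span many requests; the real code keeps the stream and hasher per upload UUID in chunksData rather than in a module-level state object.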


@@ -37,32 +37,53 @@ DiskStorage.prototype._handleFile = function _handleFile (req, file, cb) {
     if (err) return cb(err)
 
     const finalPath = path.join(destination, filename)
-    const outStream = fs.createWriteStream(finalPath)
-    let hash = null
-    if (!file._ischunk) {
-      hash = blake3.createHash()
-      const onerror = function (err) {
+    const onerror = err => {
       hash.dispose()
       cb(err)
     }
-      outStream.on('error', onerror)
-      file.stream.on('error', onerror)
-      file.stream.on('data', d => hash.update(d))
+
+    let outStream
+    let hash
+    if (file._isChunk) {
+      if (!file._chunksData.stream) {
+        file._chunksData.stream = fs.createWriteStream(finalPath, { flags: 'a' })
+        file._chunksData.stream.on('error', onerror)
+      }
+      if (!file._chunksData.hasher)
+        file._chunksData.hasher = blake3.createHash()
+
+      outStream = file._chunksData.stream
+      hash = file._chunksData.hasher
     } else {
-      outStream.on('error', cb)
+      outStream = fs.createWriteStream(finalPath)
+      outStream.on('error', onerror)
+      hash = blake3.createHash()
     }
 
-    file.stream.pipe(outStream)
-    outStream.on('finish', function () {
+    file.stream.on('error', onerror)
+    file.stream.on('data', d => hash.update(d))
+
+    if (file._isChunk) {
+      file.stream.on('end', () => {
+        cb(null, {
+          destination,
+          filename,
+          path: finalPath
+        })
+      })
+      file.stream.pipe(outStream, { end: false })
+    } else {
+      outStream.on('finish', () => {
        cb(null, {
          destination,
          filename,
          path: finalPath,
          size: outStream.bytesWritten,
-        hash: hash && hash.digest('hex')
+          hash: hash.digest('hex')
        })
      })
+      file.stream.pipe(outStream)
+    }
   })
 })
 }


@@ -51,9 +51,15 @@ const initChunks = async uuid => {
         throw err
       await paths.mkdir(root)
     }
-    chunksData[uuid] = { root, chunks: [], size: 0 }
+    chunksData[uuid] = {
+      root,
+      filename: 'tmp',
+      chunks: 0,
+      stream: null,
+      hasher: null
+    }
   }
-  return chunksData[uuid].root
+  return chunksData[uuid]
 }
 
 const executeMulter = multer({
@@ -90,11 +96,14 @@ const executeMulter = multer({
   storage: multerStorage({
     destination (req, file, cb) {
       // Is file a chunk!?
-      file._ischunk = chunkedUploads && req.body.uuid !== undefined && req.body.chunkindex !== undefined
+      file._isChunk = chunkedUploads && req.body.uuid !== undefined && req.body.chunkindex !== undefined
 
-      if (file._ischunk)
+      if (file._isChunk)
         initChunks(req.body.uuid)
-          .then(uuidDir => cb(null, uuidDir))
+          .then(chunksData => {
+            file._chunksData = chunksData
+            cb(null, chunksData.root)
+          })
           .catch(error => {
             logger.error(error)
             return cb('Could not process the chunked upload. Try again?')
@@ -104,12 +113,8 @@ const executeMulter = multer({
     },
     filename (req, file, cb) {
-      if (file._ischunk) {
-        // index.extension (i.e. 0, 1, ..., n - will prepend zeros depending on the amount of chunks)
-        const digits = req.body.totalchunkcount !== undefined ? `${req.body.totalchunkcount - 1}`.length : 1
-        const zeros = new Array(digits + 1).join('0')
-        const name = (zeros + req.body.chunkindex).slice(-digits)
-        return cb(null, name)
+      if (file._isChunk) {
+        return cb(null, chunksData[req.body.uuid].filename)
       } else {
         const length = self.parseFileIdentifierLength(req.headers.filelength)
         return self.getUniqueRandomName(length, file.extname)
@@ -258,8 +263,7 @@ self.actuallyUploadFiles = async (req, res, user, albumid, age) => {
     const uuid = req.body.uuid
     if (chunkedUploads && chunksData[uuid] !== undefined) {
       req.files.forEach(file => {
-        chunksData[uuid].chunks.push(file.filename)
-        chunksData[uuid].size += file.size
+        chunksData[uuid].chunks++
       })
       return res.json({ success: true })
     }
@@ -440,7 +444,7 @@ self.finishChunks = async (req, res, next) => {
 self.actuallyFinishChunks = async (req, res, user) => {
   const check = file => typeof file.uuid !== 'string' ||
     !chunksData[file.uuid] ||
-    chunksData[file.uuid].chunks.length < 2
+    chunksData[file.uuid].chunks < 2
 
   const files = req.body.files
   if (!Array.isArray(files) || !files.length || files.some(check))
@@ -449,7 +453,10 @@ self.actuallyFinishChunks = async (req, res, user) => {
   const infoMap = []
   try {
     await Promise.all(files.map(async file => {
-      if (chunksData[file.uuid].chunks.length > maxChunksCount)
+      // Close stream
+      chunksData[file.uuid].stream.end()
+
+      if (chunksData[file.uuid].chunks > maxChunksCount)
         throw 'Too many chunks.'
 
       file.extname = typeof file.original === 'string' ? utils.extname(file.original) : ''
@@ -462,28 +469,30 @@ self.actuallyFinishChunks = async (req, res, user) => {
         throw 'Permanent uploads are not permitted.'
       }
 
-      file.size = chunksData[file.uuid].size
+      file.size = chunksData[file.uuid].stream.bytesWritten
       if (config.filterEmptyFile && file.size === 0)
         throw 'Empty files are not allowed.'
       else if (file.size > maxSizeBytes)
         throw `File too large. Chunks are bigger than ${maxSize} MB.`
 
+      // Double-check file size
+      const tmpfile = path.join(chunksData[file.uuid].root, chunksData[file.uuid].filename)
+      const lstat = await paths.lstat(tmpfile)
+      if (lstat.size !== file.size)
+        throw `File size mismatched (${lstat.size} vs. ${file.size}).`
+
       // Generate name
       const length = self.parseFileIdentifierLength(file.filelength)
       const name = await self.getUniqueRandomName(length, file.extname)
 
-      // Combine chunks
+      // Move tmp file to final destination
       const destination = path.join(paths.uploads, name)
-      const hash = await self.combineChunks(destination, file.uuid)
+      await paths.rename(tmpfile, destination)
+
+      const hash = chunksData[file.uuid].hasher.digest('hex')
 
       // Continue even when encountering errors
       await self.cleanUpChunks(file.uuid).catch(logger.error)
 
-      // Double-check file size
-      const lstat = await paths.lstat(destination)
-      if (lstat.size !== file.size)
-        throw 'Chunks size mismatched.'
-
       let albumid = parseInt(file.albumid)
       if (isNaN(albumid))
         albumid = null
@@ -512,11 +521,17 @@ self.actuallyFinishChunks = async (req, res, user) => {
     const result = await self.storeFilesToDb(req, res, user, infoMap)
     await self.sendUploadResponse(req, res, result)
   } catch (error) {
-    // Clean up leftover chunks
+    // Dispose unfinished hasher and clean up leftover chunks
     // Should continue even when encountering errors
     await Promise.all(files.map(file => {
-      if (chunksData[file.uuid] !== undefined)
-        return self.cleanUpChunks(file.uuid).catch(logger.error)
+      // eslint-disable-next-line curly
+      if (chunksData[file.uuid] !== undefined) {
+        try {
+          if (chunksData[file.uuid].hasher)
+            chunksData[file.uuid].hasher.dispose()
+        } catch (error) {}
+        self.cleanUpChunks(file.uuid).catch(logger.error)
+      }
     }))
 
     // Re-throw error
@@ -524,50 +539,16 @@ self.actuallyFinishChunks = async (req, res, user) => {
   }
 }
 
-self.combineChunks = async (destination, uuid) => {
-  let errorObj
-  const outStream = fs.createWriteStream(destination, { flags: 'a' })
-  const hash = blake3.createHash()
-
-  outStream.on('error', error => {
-    hash.dispose()
-    errorObj = error
-  })
-
-  try {
-    chunksData[uuid].chunks.sort()
-    for (const chunk of chunksData[uuid].chunks)
-      await new Promise((resolve, reject) => {
-        const stream = fs.createReadStream(path.join(chunksData[uuid].root, chunk))
-        stream.pipe(outStream, { end: false })
-        stream.on('data', d => hash.update(d))
-        stream.on('error', reject)
-        stream.on('end', () => resolve())
-      })
-  } catch (error) {
-    hash.dispose()
-    errorObj = error
-  }
-
-  // Close stream
-  outStream.end()
-
-  // Re-throw error
-  if (errorObj) throw errorObj
-
-  // Return hash
-  return hash.digest('hex')
-}
-
 self.cleanUpChunks = async (uuid) => {
-  // Unlink chunks
-  await Promise.all(chunksData[uuid].chunks.map(chunk =>
-    paths.unlink(path.join(chunksData[uuid].root, chunk))
-  ))
+  // Remove tmp file
+  await paths.unlink(path.join(chunksData[uuid].root, chunksData[uuid].filename))
+    .catch(error => {
+      if (error.code !== 'ENOENT')
+        logger.error(error)
+    })
+
   // Remove UUID dir
   await paths.rmdir(chunksData[uuid].root)
-  // Delete cached date
+  // Delete cached chunks data
   delete chunksData[uuid]
 }