fix: failsafes for chunks timeout

Bobby 2022-05-08 12:01:18 +07:00
parent f3b7d5e56d
commit eeb1611b2a


@@ -62,13 +62,7 @@ class ChunksData {
   }
   onTimeout () {
-    if (this.stream && !this.stream.writableEnded) {
-      this.stream.end()
-    }
-    if (this.hasher) {
-      this.hasher.dispose()
-    }
-    self.cleanUpChunks(this.uuid, true)
+    self.cleanUpChunks(this.uuid)
   }
   setTimeout (delay) {
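
The hunk above reduces onTimeout to a single delegation into cleanUpChunks. A minimal sketch of the per-upload inactivity-timer pattern this implies; ChunksDataSketch, the stub cleanUpChunks, and the module shape are illustrative assumptions, not the project's code:

// Sketch: per-upload inactivity timer whose expiry path now shares one
// teardown routine with explicit cleanup. Names are illustrative.
const chunksData = {}

function cleanUpChunks (uuid) {
  // Stand-in for the module's async cleanup shown in later hunks
  delete chunksData[uuid]
}

class ChunksDataSketch {
  constructor (uuid) {
    this.uuid = uuid
    this._timeout = null
  }

  onTimeout () {
    // All teardown is delegated, so stream/hasher disposal cannot
    // diverge between the timeout path and explicit cleanup
    cleanUpChunks(this.uuid)
  }

  setTimeout (delay) {
    this.clearTimeout()
    this._timeout = setTimeout(() => this.onTimeout(), delay)
  }

  clearTimeout () {
    if (this._timeout) {
      clearTimeout(this._timeout)
      this._timeout = null
    }
  }
}
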
@@ -140,6 +134,7 @@ const executeMulter = multer({
     file._isChunk = chunkedUploads && req.body.uuid !== undefined && req.body.chunkindex !== undefined
     if (file._isChunk) {
+      // Calling this will also reset its timeout
       initChunks(req.body.uuid)
         .then(chunksData => {
           file._chunksData = chunksData
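
Given the added comment, initChunks presumably acts as a get-or-create that re-arms the inactivity timer on every incoming chunk. A hypothetical shape consistent with that comment, reusing chunksData and ChunksDataSketch from the sketch above; the timeout constant is an assumption:

const chunksTimeout = 30 * 60 * 1000 // assumed value, not from the diff

async function initChunks (uuid) {
  if (chunksData[uuid] === undefined) {
    chunksData[uuid] = new ChunksDataSketch(uuid)
    // ...open the write stream, create the hasher, make the tmp dir, etc.
  }
  // Runs for every incoming chunk, so an active upload never times out
  chunksData[uuid].setTimeout(chunksTimeout)
  return chunksData[uuid]
}
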
@@ -516,23 +511,28 @@ self.actuallyFinishChunks = async (req, res, user) => {
 }
 self.actuallyFinishChunks = async (req, res, user) => {
-  const check = file => typeof file.uuid !== 'string' ||
-    !chunksData[file.uuid] ||
-    chunksData[file.uuid].chunks < 2
   const files = req.body.files
-  if (!Array.isArray(files) || !files.length || files.some(check)) {
+  if (!Array.isArray(files) || !files.length) {
     throw new ClientError('Bad request.')
   }
   const infoMap = []
   try {
     await Promise.all(files.map(async file => {
-      // Close stream
-      chunksData[file.uuid].stream.end()
+      if (!file.uuid || typeof chunksData[file.uuid] === 'undefined') {
+        throw new ClientError('Invalid file UUID, or chunks data had already timed out. Try again?')
+      }
-      if (chunksData[file.uuid].chunks > maxChunksCount) {
-        throw new ClientError('Too many chunks.')
+      // Suspend timeout
+      // If the chunk errors out there, it will be immediately cleaned up anyway
+      chunksData[file.uuid].clearTimeout()
+      // Conclude write and hasher streams
+      chunksData[file.uuid].stream.end()
+      const hash = chunksData[file.uuid].hasher.digest('hex')
+      if (chunksData[file.uuid].chunks < 2 || chunksData[file.uuid].chunks > maxChunksCount) {
+        throw new ClientError('Invalid chunks count.')
       }
       file.extname = typeof file.original === 'string' ? utils.extname(file.original) : ''
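
The ordering in the hunk above matters: the timeout is suspended before any stream work begins, otherwise it could fire mid-finish and tear down the very stream being finalized. A condensed sketch of that per-file sequence, reusing the chunksData map from the first sketch; finishOne is a hypothetical helper, not project code:

async function finishOne (uuid) {
  const data = chunksData[uuid]
  if (!data) throw new Error('chunks data already timed out')
  // Suspend first: no timeout may fire past this point; if anything
  // below throws, the caller's catch runs cleanUpChunks immediately
  data.clearTimeout()
  data.stream.end()
  return data.hasher.digest('hex')
}
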
@@ -568,7 +568,6 @@ self.actuallyFinishChunks = async (req, res, user) => {
       } else {
         await paths.rename(tmpfile, destination)
       }
-      const hash = chunksData[file.uuid].hasher.digest('hex')
       // Continue even when encountering errors
       await self.cleanUpChunks(file.uuid).catch(logger.error)
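
The digest() call removed here was re-added earlier in the finish sequence, so the hasher is already finalized before any validation can throw, and error paths no longer need their own hasher.dispose(). The cleanup itself stays best-effort; a one-line sketch of the pattern, with console.error standing in for the project's logger:

// Await the cleanup so renaming and chunk removal stay ordered, but
// log-and-continue on its errors rather than failing a finished upload
await cleanUpChunks(uuid).catch(err => console.error(err))
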
@@ -600,16 +599,11 @@ self.actuallyFinishChunks = async (req, res, user) => {
     const result = await self.storeFilesToDb(req, res, user, infoMap)
     await self.sendUploadResponse(req, res, user, result)
   } catch (error) {
-    // Dispose unfinished hasher and clean up leftover chunks
     // Should continue even when encountering errors
     files.forEach(file => {
-      if (chunksData[file.uuid] === undefined) return
-      try {
-        if (chunksData[file.uuid].hasher) {
-          chunksData[file.uuid].hasher.dispose()
-        }
-      } catch (_) {}
-      self.cleanUpChunks(file.uuid).catch(logger.error)
+      if (file.uuid && chunksData[file.uuid]) {
+        self.cleanUpChunks(file.uuid).catch(logger.error)
+      }
     })
     // Re-throw error
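
In this error path, cleanup becomes fire-and-forget: forEach does not await the async cleanUpChunks, which is tolerable because the request is already failing, and the new guard skips entries whose chunks data already timed out. A fragment sketching the shape, where finishAll and files are hypothetical stand-ins for the Promise.all above:

try {
  await finishAll(files) // hypothetical stand-in for the Promise.all above
} catch (error) {
  files.forEach(file => {
    // Fire-and-forget: skip entries already cleaned up by a timeout,
    // clean the rest in the background while the error still propagates
    if (file.uuid && chunksData[file.uuid]) {
      cleanUpChunks(file.uuid).catch(console.error)
    }
  })
  throw error
}
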
@@ -617,7 +611,17 @@ self.actuallyFinishChunks = async (req, res, user) => {
   }
 }
-self.cleanUpChunks = async (uuid, onTimeout) => {
+self.cleanUpChunks = async uuid => {
+  // Dispose unfinished write & hasher streams
+  if (chunksData[uuid].stream && !chunksData[uuid].stream.destroyed) {
+    chunksData[uuid].stream.destroy()
+  }
+  try {
+    if (chunksData[uuid].hasher) {
+      chunksData[uuid].hasher.dispose()
+    }
+  } catch (_) {}
   // Remove tmp file
   await paths.unlink(path.join(chunksData[uuid].root, chunksData[uuid].filename))
     .catch(error => {
@@ -629,7 +633,6 @@ self.cleanUpChunks = async (uuid, onTimeout) => {
   await paths.rmdir(chunksData[uuid].root)
   // Delete cached chunks data
-  if (!onTimeout) chunksData[uuid].clearTimeout()
   delete chunksData[uuid]
 }
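
Assembled from the last two hunks, the post-commit cleanUpChunks reads roughly as below. The requires, the stubs, and the unlink error filter are assumptions added only to keep the sketch self-contained; everything else follows the diff:

const path = require('path')
const paths = require('fs').promises // assumption: project wraps fs like this
const logger = console               // stand-in for the project's logger
const chunksData = {}                // uuid -> ChunksData, as in the hunks above
const self = {}

self.cleanUpChunks = async uuid => {
  // Dispose unfinished write & hasher streams
  if (chunksData[uuid].stream && !chunksData[uuid].stream.destroyed) {
    chunksData[uuid].stream.destroy()
  }
  try {
    if (chunksData[uuid].hasher) {
      chunksData[uuid].hasher.dispose()
    }
  } catch (_) {}
  // Remove tmp file
  await paths.unlink(path.join(chunksData[uuid].root, chunksData[uuid].filename))
    .catch(error => {
      // assumed: tolerate an already-missing tmp file, log anything else
      if (error.code !== 'ENOENT') logger.error(error)
    })
  await paths.rmdir(chunksData[uuid].root)
  // Delete cached chunks data
  delete chunksData[uuid]
}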