refactor: defer reject uploads after consume

always store incoming uploads to disk first before deciding whether to
reject and delete them depending on configured rejects/limits

doing this is more stable/reliable than attempting to immediately
reject the incoming connections, at least if we want to respond with
actual error messages
This commit is contained in:
Bobby 2022-10-16 11:08:53 +07:00
parent 62af840813
commit 96689278f1
No known key found for this signature in database
GPG Key ID: 941839794CBF5A09
2 changed files with 173 additions and 178 deletions

View File

@ -38,13 +38,31 @@ const fileIdentifierLengthChangeable = !config.uploads.fileIdentifierLength.forc
const maxSize = parseInt(config.uploads.maxSize) const maxSize = parseInt(config.uploads.maxSize)
const maxSizeBytes = maxSize * 1e6 const maxSizeBytes = maxSize * 1e6
// Max files (or URLs for URL uploads) per POST request
const maxFilesPerUpload = 20
// https://github.com/mscdex/busboy/tree/v1.6.0#exports
const busboyOptions = {
// This would otherwise defaults to latin1
defParamCharset: 'utf8',
limits: {
fileSize: maxSizeBytes,
// Maximum number of non-file fields.
// Dropzone.js will add 6 extra fields for chunked uploads.
// We don't use them for anything else.
fields: 6,
// Maximum number of file fields.
// Chunked uploads still need to provide ONLY 1 file field.
// Otherwise, only one of the files will end up being properly stored,
// and that will also be as a chunk.
files: maxFilesPerUpload
}
}
// URL uploads // URL uploads
const urlMaxSize = parseInt(config.uploads.urlMaxSize) const urlMaxSize = parseInt(config.uploads.urlMaxSize)
const urlMaxSizeBytes = urlMaxSize * 1e6 const urlMaxSizeBytes = urlMaxSize * 1e6
// Max files allowed in a single multiform POST request
const maxFilesPerUpload = 20
// URL uploads timeout for fetch() instances // URL uploads timeout for fetch() instances
// Please be aware that uWebSockets.js has a hard-coded timeout of 10s of no activity, // Please be aware that uWebSockets.js has a hard-coded timeout of 10s of no activity,
// so letting fetch() run for more than 10s may cause connection to uploaders to drop early, // so letting fetch() run for more than 10s may cause connection to uploaders to drop early,
@ -296,49 +314,36 @@ self.upload = async (req, res) => {
} }
} }
self.unfreezeChunksData = async (files = [], increase = false) => {
for (const file of files) {
if (!file.chunksData) return
if (increase) file.chunksData.chunks++
file.chunksData.processing = false
}
}
self.cleanUpFiles = async (files = []) => {
// Unlink temp files
await Promise.all(files.map(async file => {
if (file.chunksData) {
return self.cleanUpChunks(file.chunksData.uuid).catch(logger.error)
} else if (file.filename) {
return utils.unlinkFile(file.filename).catch(logger.error)
}
}))
}
self.actuallyUpload = async (req, res, data = {}) => { self.actuallyUpload = async (req, res, data = {}) => {
// Init empty Request.body and Request.files // Init empty Request.body and Request.files
req.body = {} req.body = {}
req.files = [] req.files = []
const unfreezeChunksData = async () => { await req.multipart(busboyOptions, async field => {
req.files.forEach(file => { /*
if (!file.chunksData) return Keep non-files fields in body.
file.chunksData.processing = false Since fields get processed in sequence, depending on the order at which they were defined,
}) chunked uploads data must be set before the "files[]"" field which contain the actual file.
} */
const cleanUpFiles = async () => {
// Unhold identifiers generated via self.getUniqueUploadIdentifier()
self.unholdUploadIdentifiers(res)
// Unlink temp files
return Promise.all(req.files.map(async file => {
if (!file.filename) return
return utils.unlinkFile(file.filename).catch(logger.error)
}))
}
await req.multipart({
// https://github.com/mscdex/busboy/tree/v1.6.0#exports
// This would otherwise defaults to latin1
defParamCharset: 'utf8',
limits: {
fileSize: maxSizeBytes,
// Maximum number of non-file fields.
// Dropzone.js will add 6 extra fields for chunked uploads.
// We don't use them for anything else.
fields: 6,
// Maximum number of file fields.
// Chunked uploads still need to provide ONLY 1 file field.
// Otherwise, only one of the files will end up being properly stored,
// and that will also be as a chunk.
files: maxFilesPerUpload
}
}, async field => {
// Keep non-files fields in Request.body
// Since fields get processed in sequence depending on the order at which they were defined,
// chunked uploads data must be set before the files[] field which contain the actual file
if (field.truncated) { if (field.truncated) {
// Re-map Dropzone chunked uploads keys so people can manually use the API without prepending 'dz' // Re-map Dropzone chunked uploads keys so people can manually use the API without prepending 'dz'
let name = field.name let name = field.name
@ -350,132 +355,115 @@ self.actuallyUpload = async (req, res, data = {}) => {
return return
} }
// Process files immediately and push into Request.files array if (!field.file) return
if (field.file) {
if (field.name !== 'files[]') {
throw new ClientError(`Unexpected file-type field: ${field.name}`)
}
// Push immediately as we will only be adding props into the file object down the line // Push immediately as we will only be adding props into the file object down the line
const file = { const file = {
albumid: data.albumid, field: field.name,
age: data.age, albumid: data.albumid,
originalname: field.file.name || '', age: data.age,
mimetype: field.mime_type || 'application/octet-stream' originalname: field.file.name || '',
} mimetype: field.mime_type || 'application/octet-stream'
req.files.push(file)
const isChunk = typeof req.body.uuid === 'string' && Boolean(req.body.uuid)
if (isChunk) {
if (!chunkedUploads) {
throw new ClientError('Chunked uploads are disabled at the moment.')
} else if (req.files.length > 1) {
throw new ClientError('Chunked uploads may only be uploaded 1 chunk at a time.')
}
}
file.extname = utils.extname(file.originalname)
if (self.isExtensionFiltered(file.extname)) {
throw new ClientError(`${file.extname ? `${file.extname.substr(1).toUpperCase()} files` : 'Files with no extension'} are not permitted.`)
}
if (isChunk) {
// Re-map UUID property to IP-specific UUID
const uuid = `${req.ip}_${req.body.uuid}`
// Calling initChunks() will also reset the chunked uploads' timeout
file.chunksData = await initChunks(uuid)
file.filename = file.chunksData.filename
file.path = file.chunksData.path
} else {
const length = self.parseFileIdentifierLength(req.headers.filelength)
const identifier = await self.getUniqueUploadIdentifier(length, file.extname, res)
file.filename = identifier + file.extname
file.path = path.join(paths.uploads, file.filename)
}
const readStream = field.file.stream
let writeStream
let hashStream
let _reject
// Write the file into disk, and supply required props into file object
file.promise = new Promise((resolve, reject) => {
// Keep reference to Promise's reject function to allow unlistening events from Promise.finally() block
_reject = reject
if (file.chunksData) {
writeStream = file.chunksData.writeStream
hashStream = file.chunksData.hashStream
} else {
writeStream = jetpack.createWriteStream(file.path)
hashStream = enableHashing && blake3.createHash()
}
readStream.once('error', _reject)
// Re-init stream errors listeners for this Request
writeStream.once('error', _reject)
if (hashStream) {
hashStream.once('error', _reject)
// Ensure readStream will only be resumed later down the line by readStream.pipe()
readStream.pause()
readStream.on('data', data => {
// .dispose() will destroy this internal component,
// so use it as an indicator of whether the hashStream has been .dispose()'d
if (hashStream.hash.hash) {
hashStream.update(data)
}
})
}
if (file.chunksData) {
// We listen for readStream's end event
readStream.once('end', () => resolve())
} else {
// We immediately listen for writeStream's finish event
writeStream.once('finish', () => {
file.size = writeStream.bytesWritten
if (hashStream && hashStream.hash.hash) {
const hash = hashStream.digest('hex')
file.hash = file.size === 0 ? '' : hash
}
if (config.filterEmptyFile && file.size === 0) {
return _reject(new ClientError('Empty files are not allowed.'))
} else {
return resolve()
}
})
}
// Pipe readStream to writeStream
// Do not end writeStream when readStream finishes if it's a chunk upload
readStream.pipe(writeStream, { end: !file.chunksData })
}).catch(error => {
// Dispose of unfinished write & hasher streams
if (writeStream && !writeStream.destroyed) {
writeStream.destroy()
}
if (hashStream && hashStream.hash.hash) {
hashStream.dispose()
}
// Re-throw error
throw error
}).finally(() => {
if (!file.chunksData) return
// Unlisten streams' error event for this Request if it's a chunk upload
utils.unlistenEmitters([writeStream, hashStream], 'error', _reject)
})
} }
req.files.push(file)
file.extname = utils.extname(file.originalname)
const isChunk = typeof req.body.uuid === 'string' && Boolean(req.body.uuid)
if (isChunk) {
// Re-map UUID property to IP-specific UUID
const uuid = `${req.ip}_${req.body.uuid}`
// Calling initChunks() will also reset the chunked uploads' timeout
file.chunksData = await initChunks(uuid)
file.filename = file.chunksData.filename
file.path = file.chunksData.path
} else {
const length = self.parseFileIdentifierLength(req.headers.filelength)
const identifier = await self.getUniqueUploadIdentifier(length, file.extname, res)
file.filename = identifier + file.extname
file.path = path.join(paths.uploads, file.filename)
}
const readStream = field.file.stream
let writeStream
let hashStream
let _reject
// Write the file into disk, and supply required props into file object
await new Promise((resolve, reject) => {
// Keep reference to Promise's reject function to allow unlistening events from Promise.finally() block
_reject = reject
if (file.chunksData) {
writeStream = file.chunksData.writeStream
hashStream = file.chunksData.hashStream
} else {
writeStream = jetpack.createWriteStream(file.path)
hashStream = enableHashing && blake3.createHash()
}
readStream.once('error', _reject)
// Re-init stream errors listeners for this Request
writeStream.once('error', _reject)
if (hashStream) {
hashStream.once('error', _reject)
// Ensure readStream will only be resumed later down the line by readStream.pipe()
readStream.pause()
readStream.on('data', data => {
// .dispose() will destroy this internal component,
// so use it as an indicator of whether the hashStream has been .dispose()'d
if (hashStream.hash?.hash) {
hashStream.update(data)
}
})
}
if (file.chunksData) {
// We listen for readStream's end event
readStream.once('end', () => resolve())
} else {
// We immediately listen for writeStream's finish event
writeStream.once('finish', () => {
file.size = writeStream.bytesWritten || 0
if (hashStream?.hash?.hash) {
const hash = hashStream.digest('hex')
file.hash = file.size === 0 ? '' : hash
}
return resolve()
})
}
// Pipe readStream to writeStream
// Do not end writeStream when readStream finishes if it's a chunk upload
readStream.pipe(writeStream, { end: !file.chunksData })
}).catch(error => {
// Dispose of unfinished write & hasher streams
if (writeStream && !writeStream.destroyed) {
writeStream.destroy()
}
if (hashStream?.hash?.hash) {
hashStream.dispose()
}
// Re-throw error
throw error
}).finally(() => {
if (!file.chunksData) return
// Unlisten streams' error event for this Request if it's a chunk upload
utils.unlistenEmitters([writeStream, hashStream], 'error', _reject)
})
}).catch(error => { }).catch(error => {
// Clean up temp files and held identifiers (do not wait) // Clean up temp files and held identifiers (do not wait)
cleanUpFiles() self.cleanUpFiles(req.files)
unfreezeChunksData() self.unfreezeChunksData(req.files)
// Response.multipart() itself may throw string errors // Re-throw error
if (typeof error === 'string') { if (typeof error === 'string') {
throw new Error(error) // Response.multipart() itself may throw string errors
throw new ClientError(error)
} else { } else {
throw error throw error
} }
@ -485,26 +473,33 @@ self.actuallyUpload = async (req, res, data = {}) => {
throw new ClientError('No files.') throw new ClientError('No files.')
} }
// Await all file's Promise // Validate files
await Promise.all(req.files.map(file => file.promise)).catch(error => { try {
for (const file of req.files) {
if (file.field !== 'files[]') {
throw new ClientError(`Unexpected file-type field: ${file.field}`)
}
if (self.isExtensionFiltered(file.extname)) {
throw new ClientError(`${file.extname ? `${file.extname.substr(1).toUpperCase()} files` : 'Files with no extension'} are not permitted.`)
}
if (config.filterEmptyFile && file.size === 0) {
throw new ClientError('Empty files are not allowed.')
}
}
} catch (error) {
// Clean up temp files and held identifiers (do not wait) // Clean up temp files and held identifiers (do not wait)
cleanUpFiles() self.cleanUpFiles(req.files)
unfreezeChunksData() self.unfreezeChunksData(req.files)
// Re-throw error // Re-throw error
throw error throw error
}) }
// If chunked uploads is enabled and the uploaded file is a chunk, then just say that it was a success // If chunked uploads is enabled and the uploaded file is a chunk, then just say that it was a success
// NOTE: We loop through Request.files for clarity,
// but we will actually have already rejected the Request
// if it has more than 1 file while being a chunk upload
if (req.files.some(file => file.chunksData)) { if (req.files.some(file => file.chunksData)) {
req.files.forEach(file => { self.unfreezeChunksData(req.files, true)
file.chunksData.chunks++
// Mark as ready to accept more chunk uploads or to finalize
file.chunksData.processing = false
})
return res.json({ success: true }) return res.json({ success: true })
} }
@ -717,7 +712,7 @@ self.actuallyUploadUrls = async (req, res, data = {}) => {
if (writeStream && !writeStream.destroyed) { if (writeStream && !writeStream.destroyed) {
writeStream.destroy() writeStream.destroy()
} }
if (hashStream && hashStream.hash.hash) { if (hashStream?.hash?.hash) {
hashStream.dispose() hashStream.dispose()
} }
@ -883,7 +878,7 @@ self.cleanUpChunks = async uuid => {
if (chunksData[uuid].writeStream && !chunksData[uuid].writeStream.destroyed) { if (chunksData[uuid].writeStream && !chunksData[uuid].writeStream.destroyed) {
chunksData[uuid].writeStream.destroy() chunksData[uuid].writeStream.destroy()
} }
if (chunksData[uuid].hashStream && chunksData[uuid].hashStream.hash.hash) { if (chunksData[uuid].hashStream?.hash?.hash) {
chunksData[uuid].hashStream.dispose() chunksData[uuid].hashStream.dispose()
} }

View File

@ -556,7 +556,7 @@ self.stripTags = async (name, extname) => {
return jetpack.inspectAsync(fullPath) return jetpack.inspectAsync(fullPath)
} }
self.unlinkFile = async (filename, predb) => { self.unlinkFile = async filename => {
await jetpack.removeAsync(path.join(paths.uploads, filename)) await jetpack.removeAsync(path.join(paths.uploads, filename))
const identifier = filename.split('.')[0] const identifier = filename.split('.')[0]
@ -607,7 +607,7 @@ self.bulkDeleteFromDb = async (field, values, user) => {
await Promise.all(files.map(async file => { await Promise.all(files.map(async file => {
try { try {
await self.unlinkFile(file.name, true) await self.unlinkFile(file.name)
unlinked.push(file) unlinked.push(file)
} catch (error) { } catch (error) {
logger.error(error) logger.error(error)