fix: improve multiple uploads Promise handling

Bobby 2022-09-28 11:03:29 +07:00
parent 96ad1b7152
commit 7e28c26d41

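In short, the change stops awaiting each file's write inline inside the multipart handler, and drops the file.promised flag that existed only to detect Request.multipart() resolving before a file finished. Instead, the pending write is stored on file.promise and every file is awaited together once parsing is done. The sketch below illustrates that pattern; it is a simplified example and not the project's actual code, and the helper names (writeToDisk, awaitAllWrites, cleanUp, destPath) are made up for illustration.

const fs = require('fs')

// One file's disk write wrapped in a Promise kept on the file object,
// mirroring the new file.promise approach (simplified sketch).
function writeToDisk (file, readStream, destPath) {
  file.promise = new Promise((resolve, reject) => {
    const writeStream = fs.createWriteStream(destPath)
    readStream.once('error', reject)
    writeStream.once('error', reject)
    writeStream.once('finish', () => {
      file.size = writeStream.bytesWritten
      if (file.size === 0) {
        return reject(new Error('Empty files are not allowed.'))
      }
      return resolve()
    })
    readStream.pipe(writeStream)
  })
  return file.promise
}

// After the multipart parser has handed over every file, settle all writes:
// a single rejection triggers cleanup and propagates to the outer handler.
async function awaitAllWrites (files, cleanUp) {
  await Promise.all(files.map(file => file.promise)).catch(error => {
    cleanUp() // e.g. remove temp files and release held identifiers (do not wait)
    throw error
  })
}

With the promises collected up front it no longer matters whether Request.multipart() returns before a slow write finishes; the later Promise.all still waits for, or fails on, every file.
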
@@ -407,18 +407,10 @@ self.actuallyUpload = async (req, res, data = {}) => {
       let _reject
 
       // Write the file into disk, and supply required props into file object
-      await new Promise((resolve, reject) => {
-        // Keep reference to Promise's reject function to allow unlistening events
+      file.promise = new Promise((resolve, reject) => {
+        // Keep reference to Promise's reject function to allow unlistening events from Promise.finally() block
         _reject = reject
 
-        // Ensure this Promise's status can be asserted later
-        const _resolve = () => {
-          file.promised = true
-          return resolve()
-        }
-
-        readStream.once('error', _reject)
-
         if (file.chunksData) {
           writeStream = file.chunksData.writeStream
           hashStream = file.chunksData.hashStream
@@ -427,6 +419,8 @@ self.actuallyUpload = async (req, res, data = {}) => {
         hashStream = enableHashing && blake3.createHash()
       }
 
+      readStream.once('error', _reject)
+
       // Re-init stream errors listeners for this Request
       writeStream.once('error', _reject)
@@ -445,25 +439,26 @@ self.actuallyUpload = async (req, res, data = {}) => {
       if (file.chunksData) {
         // We listen for readStream's end event
-        readStream.once('end', () => _resolve())
+        readStream.once('end', () => resolve())
       } else {
         // We immediately listen for writeStream's finish event
         writeStream.once('finish', () => {
           file.size = writeStream.bytesWritten
           if (hashStream && hashStream.hash.hash) {
             const hash = hashStream.digest('hex')
-            file.hash = file.size === 0
-              ? ''
-              : hash
+            file.hash = file.size === 0 ? '' : hash
           }
 
-          return _resolve()
+          if (config.filterEmptyFile && file.size === 0) {
+            return _reject(new ClientError('Empty files are not allowed.'))
+          } else {
+            return resolve()
+          }
         })
       }
 
       // Pipe readStream to writeStream
       // Do not end writeStream when readStream finishes if it's a chunk upload
-      readStream
-        .pipe(writeStream, { end: !file.chunksData })
+      readStream.pipe(writeStream, { end: !file.chunksData })
     }).catch(error => {
       // Dispose of unfinished write & hasher streams
       if (writeStream && !writeStream.destroyed) {
@@ -480,11 +475,6 @@ self.actuallyUpload = async (req, res, data = {}) => {
       // Unlisten streams' error event for this Request if it's a chunk upload
       utils.unlistenEmitters([writeStream, hashStream], 'error', _reject)
     })
-
-    // file.size is not populated if a chunk upload, so ignore
-    if (config.filterEmptyFile && !file.chunksData && file.size === 0) {
-      throw new ClientError('Empty files are not allowed.')
-    }
   }
 }).catch(error => {
   // Clean up temp files and held identifiers (do not wait)
@@ -493,7 +483,7 @@ self.actuallyUpload = async (req, res, data = {}) => {
   // Response.multipart() itself may throw string errors
   if (typeof error === 'string') {
-    throw new ClientError(error)
+    throw new Error(error)
   } else {
     throw error
   }
@@ -503,15 +493,15 @@ self.actuallyUpload = async (req, res, data = {}) => {
     throw new ClientError('No files.')
   }
 
-  // If for some reason Request.multipart() resolves before a file's Promise
-  // Typically caused by something hanging up longer than
-  // uWebSockets.js' internal security timeout (10 seconds)
-  if (req.files.some(file => !file.promised)) {
+  // Await all file's Promise
+  await Promise.all(req.files.map(file => file.promise)).catch(error => {
     // Clean up temp files and held identifiers (do not wait)
     cleanUpFiles()
     unfreezeChunksData()
-    throw new Error('Request.multipart() resolved before a file\'s Promise')
-  }
+
+    // Re-throw error
+    throw error
+  })
 
   // If chunked uploads is enabled and the uploaded file is a chunk, then just say that it was a success
   // NOTE: We loop through Request.files for clarity,
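
Worth noting why _reject is kept in scope at all (the updated comment mentions unlistening "from Promise.finally() block"): chunked uploads reuse the write and hash streams held in file.chunksData across requests, so the 'error' listeners registered for this request have to be detached again once the Promise settles, and removeListener needs the exact function that was registered. The diff only shows the call site, utils.unlistenEmitters([writeStream, hashStream], 'error', _reject); the sketch below is an assumption about what such a helper does, not the project's actual implementation.

// Hypothetical unlistenEmitters-style helper: detach the same listener
// function from several emitters, skipping any that were never created
// (e.g. hashStream when hashing is disabled).
function unlistenEmitters (emitters, event, listener) {
  for (const emitter of emitters) {
    if (emitter) emitter.removeListener(event, listener)
  }
}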