feat: improved chunked uploads lifecycle

added checks for when there's an attempt to upload chunks to the same
file in parallel

improved final file size checks
Bobby Wibowo 2022-07-25 07:09:28 +07:00
parent ee8f1914ca
commit 6ba30a23c6
GPG Key ID: 51C3A1E1E22D26CF
2 changed files with 105 additions and 58 deletions
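
Note: the parallel-upload guard added here boils down to a per-UUID "processing" flag on the server's in-memory chunksData map. A minimal sketch of the intended lifecycle, condensed from the diff below (not the verbatim implementation, which also creates the chunk directory and the write/hash streams):

const initChunks = async uuid => {
  if (chunksData[uuid] === undefined) {
    // First chunk for this UUID: create state; the constructor marks it
    // as processing = true right away
    chunksData[uuid] = new ChunksData(uuid)
  } else if (chunksData[uuid].processing) {
    // Another chunk for the same UUID arrived before the previous one
    // finished writing, which is not supported
    throw new ClientError('Previous chunk upload is still being processed. Parallel chunked uploads is not supported.')
  }
  return chunksData[uuid]
}

// After each chunk has been fully written: chunksData[uuid].processing = false
// actuallyFinishChunks() checks the same flag again before concluding the
// temporary file.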

View File

@@ -49,13 +49,16 @@ const urlExtensionsFilter = Array.isArray(config.uploads.urlExtensionsFilter) &&
 /** Chunks helper class & function **/
 
 class ChunksData {
-  constructor (uuid, root) {
+  constructor (uuid) {
     this.uuid = uuid
-    this.root = root
+    this.root = path.join(paths.chunks, this.uuid)
     this.filename = 'tmp'
+    this.path = path.join(this.root, this.filename)
     this.chunks = 0
     this.writeStream = null
     this.hashStream = null
+    // Immediately mark this chunked upload as currently processing
+    this.processing = true
   }
 
   onTimeout () {
@@ -76,16 +79,27 @@ class ChunksData {
 const initChunks = async uuid => {
   if (chunksData[uuid] === undefined) {
-    const root = path.join(paths.chunks, uuid)
-    try {
-      await paths.access(root)
-    } catch (err) {
-      // Re-throw error
-      if (err && err.code !== 'ENOENT') throw err
-      await paths.mkdir(root)
+    chunksData[uuid] = new ChunksData(uuid)
+
+    const exist = await paths.access(chunksData[uuid].root)
+      .catch(err => {
+        // Re-throw error only if not directory is missing error
+        if (err.code !== 'ENOENT') throw err
+        return false
+      })
+    if (!exist) {
+      await paths.mkdir(chunksData[uuid].root)
     }
-    chunksData[uuid] = new ChunksData(uuid, root)
+
+    // Init write & hasher streams
+    chunksData[uuid].writeStream = fs.createWriteStream(chunksData[uuid].path, { flags: 'a' })
+    chunksData[uuid].hashStream = blake3.createHash()
+  } else if (chunksData[uuid].processing) {
+    // Wait for the first spawned init tasks
+    throw new ClientError('Previous chunk upload is still being processed. Parallel chunked uploads is not supported.')
   }
+
+  // Reset timeout
   chunksData[uuid].setTimeout(chunkedUploadsTimeout)
   return chunksData[uuid]
 }
@@ -302,31 +316,25 @@ self.actuallyUpload = async (req, res, user, data = {}) => {
       throw new ClientError(`${file.extname ? `${file.extname.substr(1).toUpperCase()} files` : 'Files with no extension'} are not permitted.`)
     }
 
-    let destination
     if (file.isChunk) {
-      // Calling this will also reset this chunked uploads' timeout
+      // Calling initChunks() will also reset the chunked uploads' timeout
       file.chunksData = await initChunks(req.body.uuid)
       file.filename = file.chunksData.filename
-      destination = file.chunksData.root
+      file.path = file.chunksData.path
     } else {
       const length = self.parseFileIdentifierLength(req.headers.filelength)
       file.filename = await self.getUniqueRandomName(length, file.extname)
-      destination = paths.uploads
+      file.path = path.join(paths.uploads, file.filename)
     }
-    file.path = path.join(destination, file.filename)
 
     // Write the file into disk, and supply required props into file object
     await new Promise((resolve, reject) => {
-      // "weighted" resolve function, to be able to "await" multiple callbacks
-      const REQUIRED_WEIGHT = 2
-      let _weight = 0
-      const _resolve = (props = {}, weight = 2) => {
-        if (file.error) return
-        Object.assign(file, props)
-        _weight += weight
-        if (_weight >= REQUIRED_WEIGHT) {
-          return resolve()
+      // Helper function to remove event listeners from multiple emitters
+      const _unlisten = (emitters = [], event, listener) => {
+        for (const emitter of emitters) {
+          if (emitter !== undefined) {
+            emitter.off(event, listener)
+          }
         }
       }
@@ -336,30 +344,43 @@ self.actuallyUpload = async (req, res, user, data = {}) => {
       let scanStream
 
       const _reject = error => {
+        // If this had already been rejected once
+        if (file.error) return
+        _unlisten([writeStream, hashStream, scanStream], 'error', _reject)
         file.error = true
-        if (writeStream && !writeStream.closed) {
-          writeStream.end()
+
+        if (writeStream && !writeStream.destroyed) {
+          writeStream.destroy()
         }
-        if (hashStream.hash.hash) {
+        if (hashStream && hashStream.hash.hash) {
           hashStream.dispose()
         }
-        return reject(error)
+
+        reject(error)
+      }
+
+      // "weighted" resolve function, to be able to "await" multiple callbacks
+      const REQUIRED_WEIGHT = 2
+      let _weight = 0
+      const _resolve = (props = {}, weight = 2) => {
+        // If this had already been rejected once
+        if (file.error) return
+
+        Object.assign(file, props)
+        _weight += weight
+        if (_weight >= REQUIRED_WEIGHT) {
+          _unlisten([writeStream, hashStream, scanStream], 'error', _reject)
+          resolve()
+        }
       }
 
       if (file.isChunk) {
-        if (!file.chunksData.writeStream) {
-          file.chunksData.writeStream = fs.createWriteStream(file.path, { flags: 'a' })
-          file.chunksData.writeStream.on('error', _reject)
-        }
-        if (!file.chunksData.hashStream) {
-          file.chunksData.hashStream = blake3.createHash()
-        }
         writeStream = file.chunksData.writeStream
         hashStream = file.chunksData.hashStream
       } else {
         writeStream = fs.createWriteStream(file.path)
-        writeStream.on('error', _reject)
         hashStream = blake3.createHash()
 
         if (utils.scan.passthrough &&
@@ -369,31 +390,40 @@ self.actuallyUpload = async (req, res, user, data = {}) => {
         }
       }
 
-      readStream.on('error', _reject)
-      readStream.on('data', d => {
-        // .dispose() will destroy this internal component,
-        // so use it as an indicator of whether the hashStream has been .dispose()'d
-        if (hashStream.hash.hash) {
-          hashStream.update(d)
-        }
-      })
+      // Re-init stream errors listeners for this Request
+      writeStream.once('error', _reject)
+      hashStream.once('error', _reject)
+      readStream.once('error', _reject)
+
+      // Pass data into hashStream if required
+      if (hashStream) {
+        readStream.on('data', data => {
+          // .dispose() will destroy this internal component,
+          // so use it as an indicator of whether the hashStream has been .dispose()'d
+          if (hashStream.hash.hash) {
+            hashStream.update(data)
+          }
+        })
+      }
 
       if (file.isChunk) {
-        readStream.on('end', () => _resolve())
+        // We listen for readStream's end event instead
+        readStream.once('end', () => _resolve())
+        // Do not end writeStream when readStream finishes
        readStream.pipe(writeStream, { end: false })
       } else {
         // Callback's weight is 1 when passthrough scanning is enabled,
         // so that the Promise will be resolved only after
         // both writeStream and scanStream finish
-        writeStream.on('finish', () => _resolve({
+        writeStream.once('finish', () => _resolve({
           size: writeStream.bytesWritten,
           hash: hashStream.hash.hash ? hashStream.digest('hex') : null
         }, scanStream ? 1 : 2))
 
         if (scanStream) {
           logger.debug(`[ClamAV]: ${file.filename}: Passthrough scanning\u2026`)
-          scanStream.on('error', _reject)
-          scanStream.on('scan-complete', scan => _resolve({
+          scanStream.once('error', _reject)
+          scanStream.once('scan-complete', scan => _resolve({
             scan
           }, 1))
           readStream
@@ -440,6 +470,8 @@ self.actuallyUpload = async (req, res, user, data = {}) => {
     req.files.forEach(file => {
       chunksData[uuid].chunks++
     })
+    // Mark as ready to accept more chunk uploads or to finalize
+    chunksData[uuid].processing = false
 
     return res.json({ success: true })
   }
@@ -667,12 +699,17 @@ self.actuallyFinishChunks = async (req, res, user, files) => {
      throw new ClientError('Invalid file UUID, or chunks data had already timed out. Try again?')
    }
 
+    if (chunksData[file.uuid].processing) {
+      throw new ClientError('Previous chunk upload is still being processed. Try again?')
+    }
+
    // Suspend timeout
    // If the chunk errors out there, it will be immediately cleaned up anyway
    chunksData[file.uuid].clearTimeout()
 
    // Conclude write and hasher streams
    chunksData[file.uuid].writeStream.end()
+    const bytesWritten = chunksData[file.uuid].writeStream.bytesWritten
    const hash = chunksData[file.uuid].hashStream.digest('hex')
 
    if (chunksData[file.uuid].chunks < 2 || chunksData[file.uuid].chunks > maxChunksCount) {
@@ -686,18 +723,25 @@ self.actuallyFinishChunks = async (req, res, user, files) => {
    file.age = self.assertRetentionPeriod(user, file.age)
 
-    file.size = chunksData[file.uuid].writeStream.bytesWritten
+    if (file.size === undefined) {
+      file.size = bytesWritten
+    } else if (file.size !== bytesWritten) {
+      // If client reports actual total size, confirm match
+      throw new ClientError(`Written bytes (${bytesWritten}) does not match actual size reported by client (${file.size}).`)
+    }
+
    if (config.filterEmptyFile && file.size === 0) {
      throw new ClientError('Empty files are not allowed.')
    } else if (file.size > maxSizeBytes) {
      throw new ClientError(`File too large. Chunks are bigger than ${maxSize} MB.`)
    }
 
+    // Double-check file size
    const tmpfile = path.join(chunksData[file.uuid].root, chunksData[file.uuid].filename)
 
-    // Double-check file size
    const lstat = await paths.lstat(tmpfile)
    if (lstat.size !== file.size) {
-      throw new ClientError(`File size mismatched (${lstat.size} vs. ${file.size}).`)
+      throw new ClientError(`Resulting physical file size (${lstat.size}) does not match expected size (${file.size}).`)
    }
 
    // Generate name
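
To summarize the improved size checking that actuallyFinishChunks() now performs, here is a condensed sketch of the hunks above (same logic, not new behaviour):

// 1. Conclude the append stream and capture how many bytes were written
chunksData[file.uuid].writeStream.end()
const bytesWritten = chunksData[file.uuid].writeStream.bytesWritten

// 2. If the client reported the file's total size, it must match the bytes written
if (file.size === undefined) {
  file.size = bytesWritten
} else if (file.size !== bytesWritten) {
  throw new ClientError(`Written bytes (${bytesWritten}) does not match actual size reported by client (${file.size}).`)
}

// 3. The physical temporary file on disk must match as well
const lstat = await paths.lstat(tmpfile)
if (lstat.size !== file.size) {
  throw new ClientError(`Resulting physical file size (${lstat.size}) does not match expected size (${file.size}).`)
}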

View File

@@ -383,7 +383,9 @@ page.prepareDropzone = () => {
    headers: { token: page.token },
    chunking: Boolean(page.chunkSize),
    chunkSize: page.chunkSize * 1e6, // this option expects Bytes
-    parallelChunkUploads: false, // for now, enabling this breaks descriptive upload progress
+    // Lolisafe cannot handle parallel chunked uploads
+    // due to technical reasons involving how we optimize I/O performance
+    parallelChunkUploads: false,
    timeout: 0,
    init () {
@@ -553,12 +555,13 @@ page.prepareDropzone = () => {
        file.previewElement.querySelector('.descriptive-progress').innerHTML =
          `Rebuilding ${file.upload.totalChunkCount} chunks\u2026`
 
-        return axios.post('api/upload/finishchunks', {
+        axios.post('api/upload/finishchunks', {
          // This API supports an array of multiple files
          files: [{
            uuid: file.upload.uuid,
            original: file.name,
            type: file.type,
+            size: file.size,
            albumid: page.album,
            filelength: page.fileLength,
            age: page.uploadAge
@@ -584,7 +587,7 @@ page.prepareDropzone = () => {
            page.updateTemplate(file, response.data.files[0])
          }
 
-          return done()
+          done()
        })
      }
    })
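
Since parallelChunkUploads stays disabled, the bundled Dropzone client sends chunks strictly one at a time and should never trip the new server-side guard. A third-party client that does fire chunks concurrently would be rejected with the new "still being processed" error; a hypothetical way to cope (not part of this commit, and postChunk is a placeholder for whatever performs the actual chunk POST) is to retry the chunk after a short delay:

const uploadChunkWithRetry = async (chunk, uuid, attempt = 0) => {
  try {
    return await postChunk(chunk, uuid) // placeholder for the real chunk POST
  } catch (error) {
    const message = String(error && error.message)
    if (message.includes('still being processed') && attempt < 5) {
      // Back off briefly, then retry the same chunk sequentially
      await new Promise(resolve => setTimeout(resolve, 500 * (attempt + 1)))
      return uploadChunkWithRetry(chunk, uuid, attempt + 1)
    }
    throw error
  }
}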