diff --git a/controllers/uploadController.js b/controllers/uploadController.js
index c08f281..06c73d3 100644
--- a/controllers/uploadController.js
+++ b/controllers/uploadController.js
@@ -49,13 +49,16 @@ const urlExtensionsFilter = Array.isArray(config.uploads.urlExtensionsFilter) &&
 /** Chunks helper class & function **/
 
 class ChunksData {
-  constructor (uuid, root) {
+  constructor (uuid) {
     this.uuid = uuid
-    this.root = root
+    this.root = path.join(paths.chunks, this.uuid)
     this.filename = 'tmp'
+    this.path = path.join(this.root, this.filename)
     this.chunks = 0
     this.writeStream = null
     this.hashStream = null
+    // Immediately mark this chunked upload as currently processing
+    this.processing = true
   }
 
   onTimeout () {
@@ -76,16 +79,27 @@ class ChunksData {
 
 const initChunks = async uuid => {
   if (chunksData[uuid] === undefined) {
-    const root = path.join(paths.chunks, uuid)
-    try {
-      await paths.access(root)
-    } catch (err) {
-      // Re-throw error
-      if (err && err.code !== 'ENOENT') throw err
-      await paths.mkdir(root)
+    chunksData[uuid] = new ChunksData(uuid)
+
+    const exist = await paths.access(chunksData[uuid].root)
+      .catch(err => {
+        // Re-throw the error only if it is not a missing directory (ENOENT) error
+        if (err.code !== 'ENOENT') throw err
+        return false
+      })
+    if (!exist) {
+      await paths.mkdir(chunksData[uuid].root)
     }
-    chunksData[uuid] = new ChunksData(uuid, root)
+
+    // Init write & hasher streams
+    chunksData[uuid].writeStream = fs.createWriteStream(chunksData[uuid].path, { flags: 'a' })
+    chunksData[uuid].hashStream = blake3.createHash()
+  } else if (chunksData[uuid].processing) {
+    // Wait for the first spawned init tasks
+    throw new ClientError('Previous chunk upload is still being processed. Parallel chunked uploads are not supported.')
   }
+
+  // Reset timeout
  chunksData[uuid].setTimeout(chunkedUploadsTimeout)
  return chunksData[uuid]
 }
@@ -302,31 +316,25 @@ self.actuallyUpload = async (req, res, user, data = {}) => {
         throw new ClientError(`${file.extname ? `${file.extname.substr(1).toUpperCase()} files` : 'Files with no extension'} are not permitted.`)
       }
 
-      let destination
       if (file.isChunk) {
-        // Calling this will also reset this chunked uploads' timeout
+        // Calling initChunks() will also reset the chunked uploads' timeout
         file.chunksData = await initChunks(req.body.uuid)
         file.filename = file.chunksData.filename
-        destination = file.chunksData.root
+        file.path = file.chunksData.path
       } else {
         const length = self.parseFileIdentifierLength(req.headers.filelength)
         file.filename = await self.getUniqueRandomName(length, file.extname)
-        destination = paths.uploads
+        file.path = path.join(paths.uploads, file.filename)
       }
 
-      file.path = path.join(destination, file.filename)
-
       // Write the file into disk, and supply required props into file object
       await new Promise((resolve, reject) => {
-        // "weighted" resolve function, to be able to "await" multiple callbacks
-        const REQUIRED_WEIGHT = 2
-        let _weight = 0
-        const _resolve = (props = {}, weight = 2) => {
-          if (file.error) return
-          Object.assign(file, props)
-          _weight += weight
-          if (_weight >= REQUIRED_WEIGHT) {
-            return resolve()
+        // Helper function to remove event listeners from multiple emitters
+        const _unlisten = (emitters = [], event, listener) => {
+          for (const emitter of emitters) {
+            if (emitter !== undefined) {
+              emitter.off(event, listener)
+            }
           }
         }
 
@@ -336,64 +344,86 @@ self.actuallyUpload = async (req, res, user, data = {}) => {
         let scanStream
 
         const _reject = error => {
+          // If this had already been rejected once
+          if (file.error) return
+
+          _unlisten([writeStream, hashStream, scanStream], 'error', _reject)
           file.error = true
-          if (writeStream && !writeStream.closed) {
-            writeStream.end()
+
+          if (writeStream && !writeStream.destroyed) {
+            writeStream.destroy()
           }
-          if (hashStream.hash.hash) {
+          if (hashStream && hashStream.hash.hash) {
             hashStream.dispose()
           }
-          return reject(error)
+
+          reject(error)
+        }
+
+        // "weighted" resolve function, to be able to "await" multiple callbacks
+        const REQUIRED_WEIGHT = 2
+        let _weight = 0
+        const _resolve = (props = {}, weight = 2) => {
+          // If this had already been rejected once
+          if (file.error) return
+
+          Object.assign(file, props)
+          _weight += weight
+
+          if (_weight >= REQUIRED_WEIGHT) {
+            _unlisten([writeStream, hashStream, scanStream], 'error', _reject)
+            resolve()
+          }
         }
 
         if (file.isChunk) {
-          if (!file.chunksData.writeStream) {
-            file.chunksData.writeStream = fs.createWriteStream(file.path, { flags: 'a' })
-            file.chunksData.writeStream.on('error', _reject)
-          }
-          if (!file.chunksData.hashStream) {
-            file.chunksData.hashStream = blake3.createHash()
-          }
-
           writeStream = file.chunksData.writeStream
           hashStream = file.chunksData.hashStream
         } else {
           writeStream = fs.createWriteStream(file.path)
-          writeStream.on('error', _reject)
           hashStream = blake3.createHash()
 
           if (utils.scan.passthrough &&
-          !self.scanHelpers.assertUserBypass(req._user, file.filename) &&
-          !self.scanHelpers.assertFileBypass({ filename: file.filename })) {
+            !self.scanHelpers.assertUserBypass(req._user, file.filename) &&
+            !self.scanHelpers.assertFileBypass({ filename: file.filename })) {
            scanStream = utils.scan.instance.passthrough()
           }
         }
 
-        readStream.on('error', _reject)
-        readStream.on('data', d => {
-          // .dispose() will destroy this internal component,
-          // so use it as an indicator of whether the hashStream has been .dispose()'d
-          if (hashStream.hash.hash) {
-            hashStream.update(d)
-          }
-        })
+        // Re-init stream error listeners for this request
+        writeStream.once('error', _reject)
+        hashStream.once('error', _reject)
+        readStream.once('error', _reject)
+
+        // Pass data into hashStream if required
+        if (hashStream) {
+          readStream.on('data', data => {
+            // .dispose() will destroy this internal component,
+            // so use it as an indicator of whether the hashStream has been .dispose()'d
+            if (hashStream.hash.hash) {
+              hashStream.update(data)
+            }
+          })
+        }
 
         if (file.isChunk) {
-          readStream.on('end', () => _resolve())
+          // We listen for readStream's end event instead
+          readStream.once('end', () => _resolve())
+          // Do not end writeStream when readStream finishes
           readStream.pipe(writeStream, { end: false })
         } else {
           // Callback's weight is 1 when passthrough scanning is enabled,
           // so that the Promise will be resolved only after
           // both writeStream and scanStream finish
-          writeStream.on('finish', () => _resolve({
+          writeStream.once('finish', () => _resolve({
             size: writeStream.bytesWritten,
             hash: hashStream.hash.hash ? hashStream.digest('hex') : null
           }, scanStream ? 1 : 2))
 
           if (scanStream) {
             logger.debug(`[ClamAV]: ${file.filename}: Passthrough scanning\u2026`)
-            scanStream.on('error', _reject)
-            scanStream.on('scan-complete', scan => _resolve({
+            scanStream.once('error', _reject)
+            scanStream.once('scan-complete', scan => _resolve({
               scan
             }, 1))
             readStream
@@ -440,6 +470,8 @@ self.actuallyUpload = async (req, res, user, data = {}) => {
     req.files.forEach(file => {
       chunksData[uuid].chunks++
     })
+    // Mark as ready to accept more chunk uploads or to finalize
+    chunksData[uuid].processing = false
 
     return res.json({ success: true })
   }
@@ -667,12 +699,17 @@ self.actuallyFinishChunks = async (req, res, user, files) => {
         throw new ClientError('Invalid file UUID, or chunks data had already timed out. Try again?')
       }
 
+      if (chunksData[file.uuid].processing) {
+        throw new ClientError('Previous chunk upload is still being processed. Try again?')
+      }
+
       // Suspend timeout
       // If the chunk errors out there, it will be immediately cleaned up anyway
       chunksData[file.uuid].clearTimeout()
 
       // Conclude write and hasher streams
       chunksData[file.uuid].writeStream.end()
+      const bytesWritten = chunksData[file.uuid].writeStream.bytesWritten
       const hash = chunksData[file.uuid].hashStream.digest('hex')
 
       if (chunksData[file.uuid].chunks < 2 || chunksData[file.uuid].chunks > maxChunksCount) {
@@ -686,18 +723,25 @@ self.actuallyFinishChunks = async (req, res, user, files) => {
 
       file.age = self.assertRetentionPeriod(user, file.age)
 
-      file.size = chunksData[file.uuid].writeStream.bytesWritten
+      if (file.size === undefined) {
+        file.size = bytesWritten
+      } else if (file.size !== bytesWritten) {
+        // If client reports actual total size, confirm match
+        throw new ClientError(`Written bytes (${bytesWritten}) does not match actual size reported by client (${file.size}).`)
+      }
+
       if (config.filterEmptyFile && file.size === 0) {
         throw new ClientError('Empty files are not allowed.')
       } else if (file.size > maxSizeBytes) {
         throw new ClientError(`File too large. Chunks are bigger than ${maxSize} MB.`)
       }
 
-      // Double-check file size
       const tmpfile = path.join(chunksData[file.uuid].root, chunksData[file.uuid].filename)
+
+      // Double-check file size
       const lstat = await paths.lstat(tmpfile)
       if (lstat.size !== file.size) {
-        throw new ClientError(`File size mismatched (${lstat.size} vs. ${file.size}).`)
+        throw new ClientError(`Resulting physical file size (${lstat.size}) does not match expected size (${file.size}).`)
       }
 
       // Generate name
diff --git a/src/js/home.js b/src/js/home.js
index 23e01b8..9d8b9ae 100644
--- a/src/js/home.js
+++ b/src/js/home.js
@@ -383,7 +383,9 @@ page.prepareDropzone = () => {
     headers: { token: page.token },
     chunking: Boolean(page.chunkSize),
     chunkSize: page.chunkSize * 1e6, // this option expects Bytes
-    parallelChunkUploads: false, // for now, enabling this breaks descriptive upload progress
+    // Lolisafe cannot handle parallel chunked uploads
+    // due to technical reasons involving how we optimize I/O performance
+    parallelChunkUploads: false,
     timeout: 0,
 
     init () {
@@ -553,12 +555,13 @@ page.prepareDropzone = () => {
       file.previewElement.querySelector('.descriptive-progress').innerHTML =
         `Rebuilding ${file.upload.totalChunkCount} chunks\u2026`
 
-      return axios.post('api/upload/finishchunks', {
+      axios.post('api/upload/finishchunks', {
         // This API supports an array of multiple files
         files: [{
           uuid: file.upload.uuid,
           original: file.name,
           type: file.type,
+          size: file.size,
           albumid: page.album,
           filelength: page.fileLength,
           age: page.uploadAge
@@ -584,7 +587,7 @@ page.prepareDropzone = () => {
           page.updateTemplate(file, response.data.files[0])
         }
 
-        return done()
+        done()
       })
     }
   })