From 40b687dc18d546304909fa3af8a87e14b9fe5dd5 Mon Sep 17 00:00:00 2001 From: KoalaSat Date: Sat, 30 Dec 2023 16:01:08 +0000 Subject: [PATCH] Robohash gereator queue (#1035) Robohash generator queue --- .../RobotAvatar/RobohashGenerator.ts | 130 +++++++++++------- frontend/src/models/Federation.model.ts | 2 +- 2 files changed, 84 insertions(+), 48 deletions(-) diff --git a/frontend/src/components/RobotAvatar/RobohashGenerator.ts b/frontend/src/components/RobotAvatar/RobohashGenerator.ts index 2d25a89f..8f6c1adc 100644 --- a/frontend/src/components/RobotAvatar/RobohashGenerator.ts +++ b/frontend/src/components/RobotAvatar/RobohashGenerator.ts @@ -1,15 +1,75 @@ +interface Task { + robohash: Robohash; + resolves: ((result: string) => void)[]; + rejects: ((reason?: Error) => void)[]; +} + +interface Robohash { + hash: string; + size: 'small' | 'large'; + cacheKey: string; +} + +interface RoboWorker { + worker: Worker; + busy: boolean; +} + class RoboGenerator { private assetsCache: Record = {}; - private assetsPromises: Record> = {}; - private readonly workers: Worker[] = []; + + private readonly workers: RoboWorker[] = []; + private readonly queue: Task[] = []; constructor() { // limit to 8 workers - const numCores = Math.min(navigator.hardwareConcurrency || 1, 8); + const numCores = 8; for (let i = 0; i < numCores; i++) { const worker = new Worker(new URL('./robohash.worker.ts', import.meta.url)); - this.workers.push(worker); + worker.onmessage = this.assignTasksToWorkers.bind(this); + this.workers.push({ worker, busy: false }); + } + } + + private assignTasksToWorkers() { + const availableWorker = this.workers.find((w) => !w.busy); + + if (availableWorker) { + const task = this.queue.shift(); + if (task) { + availableWorker.busy = true; + availableWorker.worker.postMessage(task.robohash); + + // Clean up the event listener and free the worker after receiving the result + const cleanup = () => { + availableWorker.worker.removeEventListener('message', completionCallback); + availableWorker.busy = false; + }; + + // Resolve the promise when the task is completed + const completionCallback = (event: MessageEvent) => { + if (event.data.cacheKey === task.robohash.cacheKey) { + const { cacheKey, imageUrl } = event.data; + + // Update the cache and resolve the promise + this.assetsCache[cacheKey] = imageUrl; + + cleanup(); + + task.resolves.forEach((f) => f(imageUrl)); + } + }; + + availableWorker.worker.addEventListener('message', completionCallback); + + // Reject the promise if an error occurs + availableWorker.worker.addEventListener('error', (error) => { + cleanup(); + + task.rejects.forEach((f) => f(new Error(error.message))); + }); + } } } @@ -20,53 +80,29 @@ class RoboGenerator { const cacheKey = `${size}px;${hash}`; if (this.assetsCache[cacheKey]) { return this.assetsCache[cacheKey]; - } else if (cacheKey in this.assetsPromises) { - return await this.assetsPromises[cacheKey]; - } + } else { + return new Promise((resolve, reject) => { + let task = this.queue.find((t) => t.robohash.cacheKey === cacheKey); - const workerIndex = Object.keys(this.assetsPromises).length % this.workers.length; - const worker = this.workers[workerIndex]; + if (!task) { + task = { + robohash: { + hash, + size, + cacheKey, + }, + resolves: [], + rejects: [], + }; + this.queue.push(task); + } - this.assetsPromises[cacheKey] = new Promise((resolve, reject) => { - // const avatarB64 = async_generate_robohash(hash, size == 'small' ? 80 : 256).then((avatarB64)=> resolve(`data:image/png;base64,${avatarB64}`)); - // Create a message object with the necessary data - const message = { hash, size, cacheKey, workerIndex }; + task.resolves.push(resolve); + task.rejects.push(reject); - // Listen for messages from the worker - const handleMessage = (event: MessageEvent) => { - const { cacheKey, imageUrl } = event.data; - - // Update the cache and resolve the promise - this.assetsCache[cacheKey] = imageUrl; - delete this.assetsPromises[cacheKey]; - resolve(imageUrl); - }; - - // Add the event listener for messages - worker.addEventListener('message', handleMessage); - - // Send the message to the worker - worker.postMessage(message); - - // Clean up the event listener after receiving the result - const cleanup = () => { - worker.removeEventListener('message', handleMessage); - }; - - // Reject the promise if an error occurs - worker.addEventListener('error', (error) => { - cleanup(); - reject(error); + this.assignTasksToWorkers(); }); - - // Reject the promise if the worker times out - setTimeout(() => { - cleanup(); - reject(new Error('Generation timed out')); - }, 5000); // Adjust the timeout duration as needed - }); - - return await this.assetsPromises[cacheKey]; + } }; } diff --git a/frontend/src/models/Federation.model.ts b/frontend/src/models/Federation.model.ts index 7b47e541..09286162 100644 --- a/frontend/src/models/Federation.model.ts +++ b/frontend/src/models/Federation.model.ts @@ -17,7 +17,7 @@ export class Federation { constructor() { this.coordinators = Object.entries(defaultFederation).reduce( (acc: Record, [key, value]: [string, any]) => { - if (getHost() !== '127.0.0.1:8000' && key == 'local') { + if (getHost() !== '127.0.0.1:8000' && key === 'local') { // Do not add `Local Dev` unless it is running on localhost return acc; } else {