diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java
index c7584a6c..99445d81 100644
--- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java
+++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java
@@ -108,19 +108,13 @@ public class ConverterMain {
 
     public void convert(CrawlPlan plan) throws Exception {
 
-        final int maxPoolSize = 16;
+        final int maxPoolSize = Runtime.getRuntime().availableProcessors();
 
         try (WorkLog processLog = plan.createProcessWorkLog();
              ConversionLog log = new ConversionLog(plan.process.getDir())) {
             var instructionWriter = new InstructionWriterFactory(log, plan.process.getDir(), gson);
 
-            Semaphore semaphore = new Semaphore(maxPoolSize);
-            var pool = new ThreadPoolExecutor(
-                    maxPoolSize/4,
-                    maxPoolSize,
-                    5, TimeUnit.MINUTES,
-                    new LinkedBlockingQueue<>(8)
-            );
+            var pool = new DumbThreadPool(maxPoolSize, 2);
 
             int totalDomains = plan.countCrawledDomains();
             AtomicInteger processedDomains = new AtomicInteger(0);
@@ -131,8 +125,7 @@ public class ConverterMain {
 
             for (var domain : plan.crawlDataIterable(id -> !processLog.isJobFinished(id)))
             {
-                semaphore.acquire();
-                pool.execute(() -> {
+                pool.submit(() -> {
                     try {
                         ProcessedDomain processed = processor.process(domain);
 
@@ -151,13 +144,10 @@ public class ConverterMain {
                     catch (IOException ex) {
                         logger.warn("IO exception in converter", ex);
                     }
-                    finally {
-                        semaphore.release();
-                    }
                 });
             }
 
-            pool.shutdown();
+            pool.shutDown();
             do {
                 System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");
             } while (!pool.awaitTermination(60, TimeUnit.SECONDS));
diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/DumbThreadPool.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/DumbThreadPool.java
new file mode 100644
index 00000000..3175fec7
--- /dev/null
+++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/DumbThreadPool.java
@@ -0,0 +1,143 @@
+package nu.marginalia.converting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** A simple fixed-size thread pool whose {@link #submit} blocks the caller while
+ * the task queue is full.  Unlike {@link java.util.concurrent.ThreadPoolExecutor},
+ * a saturated pool neither rejects the task nor (as with CallerRunsPolicy) runs
+ * it in the calling thread.  This is useful for coarse grained tasks where the
+ * calling thread might otherwise be tied up for hours.
+ * <p>
+ * Workers are daemon threads.  After {@link #shutDown()} they finish what is
+ * still queued and then exit; {@link #shutDownNow()} discards the queue and
+ * interrupts them.
+ */
+public class DumbThreadPool {
+    private static final Logger logger = LoggerFactory.getLogger(DumbThreadPool.class);
+
+    private final List<Thread> workers = new ArrayList<>();
+    private final LinkedBlockingQueue<Runnable> tasks;
+    private volatile boolean shutDown = false;
+
+    /** Number of tasks currently being run by a worker; queued tasks are not counted. */
+    private final AtomicInteger taskCount = new AtomicInteger(0);
+
+    /**
+     * @param poolSize  number of worker threads; all are started immediately
+     * @param queueSize number of tasks that may wait in the queue before {@link #submit} blocks
+     */
+    public DumbThreadPool(int poolSize, int queueSize) {
+        tasks = new LinkedBlockingQueue<>(queueSize);
+
+        for (int i = 0; i < poolSize; i++) {
+            Thread worker = new Thread(this::worker, "Converter Thread " + i);
+            worker.setDaemon(true);
+            worker.start();
+            workers.add(worker);
+        }
+    }
+
+    /** Queue a task for execution, blocking while the queue is full.
+     *
+     * @throws InterruptedException if interrupted while waiting for queue space
+     */
+    public void submit(Runnable runnable) throws InterruptedException {
+        tasks.put(runnable);
+    }
+
+    /** Request an orderly shutdown: workers drain the queue and then exit.
+     * Does not wait for them; follow up with {@link #awaitTermination}.
+     */
+    public void shutDown() {
+        this.shutDown = true;
+    }
+
+    /** Request an immediate shutdown: queued tasks are discarded and every
+     * worker is interrupted.
+     */
+    public void shutDownNow() {
+        this.shutDown = true;
+        tasks.clear();
+        for (Thread worker : workers) {
+            worker.interrupt();
+        }
+    }
+
+    /** Worker loop: take tasks off the queue and run them until shut down. */
+    private void worker() {
+        // Keep going after shutDown() until the queue is empty, otherwise tasks
+        // that were accepted by submit() but not yet picked up would never run.
+        while (!shutDown || !tasks.isEmpty()) {
+            try {
+                Runnable task = tasks.poll(1, TimeUnit.SECONDS);
+                if (task == null) {
+                    continue;
+                }
+
+                try {
+                    taskCount.incrementAndGet();
+                    task.run();
+                }
+                catch (Exception ex) {
+                    logger.warn("Error executing task", ex);
+                }
+                finally {
+                    taskCount.decrementAndGet();
+                }
+            }
+            catch (InterruptedException ex) {
+                logger.warn("Thread pool worker interrupted", ex);
+                Thread.currentThread().interrupt(); // keep the interrupt visible to anything unwinding after us
+                return;
+            }
+        }
+    }
+
+    /** Wait up to the specified timeout for the worker threads to exit.
+     *
+     * @return true if every worker has exited and no task is still counted as running, false otherwise
+     * @throws InterruptedException if interrupted while waiting on a worker
+     */
+    public boolean awaitTermination(int i, TimeUnit timeUnit) throws InterruptedException {
+        final long start = System.currentTimeMillis();
+        final long deadline = start + timeUnit.toMillis(i);
+
+        for (var thread : workers) {
+            if (!thread.isAlive())
+                continue;
+
+            long timeRemaining = deadline - System.currentTimeMillis();
+            if (timeRemaining <= 0)
+                return false;
+
+            thread.join(timeRemaining);
+            if (thread.isAlive())
+                return false;
+        }
+
+        // Doublecheck the bookkeeping so we didn't mess up. This may mean you have to Ctrl+C the process
+        // if you see this warning forever, but for the converter this is preferable to terminating early
+        // and missing tasks. (maybe some cosmic ray or OOM condition or X-Files baddie of the week killed a
+        // thread so hard and it didn't invoke finally and didn't decrement the task count)
+
+        int activeCount = getActiveCount();
+        if (activeCount != 0) {
+            logger.warn("Thread pool terminated with {} active threads(?!) -- check what's going on with jstack and kill manually", activeCount);
+            return false;
+        }
+
+        return true;
+    }
+
+    /** @return the number of tasks currently being run by workers */
+    public int getActiveCount() {
+        return taskCount.get();
+    }
+}
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/DumbThreadPool.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/DumbThreadPool.java
index 7a56be74..676e3286 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/DumbThreadPool.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/DumbThreadPool.java
@@ -77,7 +77,10 @@ public class DumbThreadPool {
     }
 
 
-    public boolean awaitTermination(int i, TimeUnit timeUnit) {
+    /** Wait for all tasks to complete up to the specified timeout,
+     * then return true if all tasks completed, false otherwise.
+     */
+    public boolean awaitTermination(int i, TimeUnit timeUnit) throws InterruptedException {
         final long start = System.currentTimeMillis();
         final long deadline = start + timeUnit.toMillis(i);
 
@@ -86,17 +89,23 @@ public class DumbThreadPool {
                 continue;
 
             long timeRemaining = deadline - System.currentTimeMillis();
-
             if (timeRemaining <= 0)
                 return false;
 
-            try {
-                thread.join(timeRemaining);
-            }
-            catch (InterruptedException ex) {
-                logger.warn("Interrupted while waiting for thread pool to terminate", ex);
+            thread.join(timeRemaining);
+            if (thread.isAlive())
                 return false;
-            }
+        }
+
+        // Doublecheck the bookkeeping so we didn't mess up. This may mean you have to Ctrl+C the process
+        // if you see this warning forever, but for the crawler this is preferable to terminating early
+        // and missing tasks. (maybe some cosmic ray or OOM condition or X-Files baddie of the week killed a
+        // thread so hard and it didn't invoke finally and didn't decrement the task count)
+
+        int activeCount = getActiveCount();
+        if (activeCount != 0) {
+            logger.warn("Thread pool terminated with {} active threads(?!) -- check what's going on with jstack and kill manually", activeCount);
+            return false;
         }
 
         return true;