(crawler, converter) Fix so that DumbThreadPool actually waits for termination as intended.

This commit is contained in:
Viktor Lofgren 2023-07-29 19:19:09 +02:00
parent d3f01bd171
commit ee143bbc48
3 changed files with 139 additions and 22 deletions

View File

@ -108,19 +108,13 @@ public class ConverterMain {
public void convert(CrawlPlan plan) throws Exception {
final int maxPoolSize = 16;
final int maxPoolSize = Runtime.getRuntime().availableProcessors();
try (WorkLog processLog = plan.createProcessWorkLog();
ConversionLog log = new ConversionLog(plan.process.getDir())) {
var instructionWriter = new InstructionWriterFactory(log, plan.process.getDir(), gson);
Semaphore semaphore = new Semaphore(maxPoolSize);
var pool = new ThreadPoolExecutor(
maxPoolSize/4,
maxPoolSize,
5, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(8)
);
var pool = new DumbThreadPool(maxPoolSize, 2);
int totalDomains = plan.countCrawledDomains();
AtomicInteger processedDomains = new AtomicInteger(0);
@ -131,8 +125,7 @@ public class ConverterMain {
for (var domain : plan.crawlDataIterable(id -> !processLog.isJobFinished(id)))
{
semaphore.acquire();
pool.execute(() -> {
pool.submit(() -> {
try {
ProcessedDomain processed = processor.process(domain);
@ -151,13 +144,10 @@ public class ConverterMain {
catch (IOException ex) {
logger.warn("IO exception in converter", ex);
}
finally {
semaphore.release();
}
});
}
pool.shutdown();
pool.shutDown();
do {
System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");
} while (!pool.awaitTermination(60, TimeUnit.SECONDS));

View File

@ -0,0 +1,118 @@
package nu.marginalia.converting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/** A simple thread pool implementation that will never invoke
 * a task in the calling thread like {@link java.util.concurrent.ThreadPoolExecutor}
 * does when the queue is full. Instead, it will block until a thread
 * becomes available to run the task. This is useful for coarse grained
 * tasks where the calling thread might otherwise block for hours.
 * <p>
 * Life cycle: {@link #submit(Runnable)} tasks, call {@link #shutDown()} once no more
 * tasks will be submitted, then poll {@link #awaitTermination(int, TimeUnit)} until it
 * returns true.  Tasks that were already queued when {@link #shutDown()} was called
 * are still executed; only {@link #shutDownNow()} abandons them.
 */
public class DumbThreadPool {
    private static final Logger logger = LoggerFactory.getLogger(DumbThreadPool.class);

    private final List<Thread> workers = new ArrayList<>();
    private final LinkedBlockingQueue<Runnable> tasks;

    /** Set by both shutdown variants: no new work is expected, workers exit once the queue is drained. */
    private volatile boolean shutDown = false;
    /** Set only by {@link #shutDownNow()}: workers exit without draining the queue. */
    private volatile boolean abortRequested = false;

    /** Number of tasks currently being executed by worker threads. */
    private final AtomicInteger taskCount = new AtomicInteger(0);

    /** Create the pool and immediately start its worker threads.
     *
     * @param poolSize  number of daemon worker threads to start
     * @param queueSize capacity of the pending-task queue; {@link #submit(Runnable)}
     *                  blocks while the queue holds this many tasks
     */
    public DumbThreadPool(int poolSize, int queueSize) {
        tasks = new LinkedBlockingQueue<>(queueSize);

        for (int i = 0; i < poolSize; i++) {
            Thread worker = new Thread(this::worker, "Crawler Thread " + i);
            worker.setDaemon(true);
            worker.start();
            workers.add(worker);
        }
    }

    /** Enqueue a task, blocking the caller while the queue is full.
     * <p>
     * Tasks submitted after {@link #shutDown()} or {@link #shutDownNow()} are not
     * guaranteed to ever run, as the workers may already have exited.
     *
     * @param runnable the task to run on a worker thread
     * @throws InterruptedException if the calling thread is interrupted while waiting for queue space
     */
    public void submit(Runnable runnable) throws InterruptedException {
        tasks.put(runnable);
    }

    /** Request an orderly shutdown: running and already queued tasks are completed,
     * after which the worker threads exit.  Does not block; use
     * {@link #awaitTermination(int, TimeUnit)} to wait for completion.
     */
    public void shutDown() {
        this.shutDown = true;
    }

    /** Request an immediate shutdown: every worker thread is interrupted and
     * tasks still waiting in the queue are abandoned.
     */
    public void shutDownNow() {
        // Raise the abort flag first so a worker whose task swallowed the interrupt
        // still exits instead of continuing to drain the queue.
        this.abortRequested = true;
        this.shutDown = true;

        for (Thread worker : workers) {
            worker.interrupt();
        }
    }

    /** Main loop of each worker thread. */
    private void worker() {
        // Keep going after shutDown() until the queue is empty; otherwise tasks that were
        // accepted by submit() but not yet picked up would silently be dropped while
        // awaitTermination() reports success.
        while (!abortRequested && (!shutDown || !tasks.isEmpty())) {
            try {
                // Poll with a timeout rather than take() so the loop condition is
                // re-evaluated periodically and an idle worker notices a shutdown.
                Runnable task = tasks.poll(1, TimeUnit.SECONDS);
                if (task == null) {
                    continue;
                }

                try {
                    taskCount.incrementAndGet();
                    task.run();
                }
                catch (Exception ex) {
                    // A failing task must not take the worker thread down with it
                    logger.warn("Error executing task", ex);
                }
                finally {
                    taskCount.decrementAndGet();
                }
            }
            catch (InterruptedException ex) {
                logger.warn("Thread pool worker interrupted", ex);
                return;
            }
        }
    }

    /** Wait for all tasks to complete up to the specified timeout,
     * then return true if all tasks completed, false otherwise.
     * <p>
     * Only meaningful after {@link #shutDown()} or {@link #shutDownNow()} has been
     * called, as the worker threads never exit otherwise.
     *
     * @param i        maximum time to wait, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code i}
     * @return true if every worker thread has exited and no task is still being run
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean awaitTermination(int i, TimeUnit timeUnit) throws InterruptedException {
        final long start = System.currentTimeMillis();
        final long deadline = start + timeUnit.toMillis(i);

        for (var thread : workers) {
            if (!thread.isAlive())
                continue;

            // Guard against a non-positive value: Thread.join(0) would wait forever
            long timeRemaining = deadline - System.currentTimeMillis();
            if (timeRemaining <= 0)
                return false;

            thread.join(timeRemaining);
            if (thread.isAlive())
                return false;
        }

        // Doublecheck the bookkeeping so we didn't mess up. This may mean you have to Ctrl+C the process
        // if you see this warning forever, but for the crawler this is preferable to terminating early
        // and missing tasks. (maybe some cosmic ray or OOM condition or X-Files baddie of the week killed a
        // thread so hard and it didn't invoke finally and didn't decrement the task count)
        int activeCount = getActiveCount();
        if (activeCount != 0) {
            logger.warn("Thread pool terminated with {} active threads(?!) -- check what's going on with jstack and kill manually", activeCount);
            return false;
        }

        return true;
    }

    /** Return the number of tasks currently being executed.
     * Tasks still waiting in the queue are not included.
     */
    public int getActiveCount() {
        return taskCount.get();
    }
}

View File

@ -77,7 +77,10 @@ public class DumbThreadPool {
}
public boolean awaitTermination(int i, TimeUnit timeUnit) {
/** Wait for all tasks to complete up to the specified timeout,
* then return true if all tasks completed, false otherwise.
*/
public boolean awaitTermination(int i, TimeUnit timeUnit) throws InterruptedException {
final long start = System.currentTimeMillis();
final long deadline = start + timeUnit.toMillis(i);
@ -86,17 +89,23 @@ public class DumbThreadPool {
continue;
long timeRemaining = deadline - System.currentTimeMillis();
if (timeRemaining <= 0)
return false;
try {
thread.join(timeRemaining);
}
catch (InterruptedException ex) {
logger.warn("Interrupted while waiting for thread pool to terminate", ex);
thread.join(timeRemaining);
if (thread.isAlive())
return false;
}
}
// Doublecheck the bookkeeping so we didn't mess up. This may mean you have to Ctrl+C the process
// if you see this warning forever, but for the crawler this is preferable to terminating early
// and missing tasks. (maybe some cosmic ray or OOM condition or X-Files baddie of the week killed a
// thread so hard and it didn't invoke finally and didn't decrement the task count)
int activeCount = getActiveCount();
if (activeCount != 0) {
logger.warn("Thread pool terminated with {} active threads(?!) -- check what's going on with jstack and kill manually", activeCount);
return false;
}
return true;