(crawler) Use a dumb thread pool instead of Java's executor service.

Viktor Lofgren 2023-07-28 18:15:16 +02:00
parent f11103d31d
commit e237df4a10
6 changed files with 225 additions and 35 deletions

CrawlLimiter.java

@@ -1,19 +1,87 @@
package nu.marginalia.crawl;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class CrawlLimiter {
-public static final int maxPoolSize = Integer.getInteger("crawler.pool-size", 512);
+public static final int maxPoolSize = Integer.getInteger("crawler.pool-size", 256);
// Thresholds for throttling task-spawning. Note there's a bit of hysteresis to this
private static final long THROTTLE_TRIGGER_FREE_RAM = 2 * 1024 * 1024 * 1024L;
private static final long THROTTLE_RELEASE_FREE_RAM = 4 * 1024 * 1024 * 1024L;
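// In practice: engage the throttle when free RAM drops below 2 GiB, and only
// release it once free RAM climbs back above 4 GiB, so the flag doesn't flap
// when memory hovers around a single threshold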
private final Semaphore taskSemCount = new Semaphore(maxPoolSize);
// When set to true, the crawler will wait before starting additional tasks
private final AtomicBoolean throttle = new AtomicBoolean(false);
private static final Logger logger = LoggerFactory.getLogger(CrawlLimiter.class);
public CrawlLimiter() {
Thread monitorThread = new Thread(this::monitor, "Memory Monitor");
monitorThread.setDaemon(true);
monitorThread.start();
}
@SneakyThrows
public void monitor() {
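// Note that the AtomicBoolean doubles as the monitor object: the synchronized
// block and notifyAll() below pair with the wait() in waitForEnoughRAM()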
for (;;) {
synchronized (throttle) {
boolean oldThrottle = throttle.get();
boolean newThrottle = oldThrottle;
if (Runtime.getRuntime().maxMemory() == Long.MAX_VALUE) {
// According to the spec this can happen, though it rarely seems to in practice
logger.warn("Memory based throttling disabled (set -Xmx)");
return;
}
final long freeMemory = Runtime.getRuntime().maxMemory() - Runtime.getRuntime().totalMemory();
if (oldThrottle && freeMemory > THROTTLE_RELEASE_FREE_RAM) {
newThrottle = false;
logger.warn("Memory based throttling released");
}
else if (!oldThrottle && freeMemory < THROTTLE_TRIGGER_FREE_RAM) {
newThrottle = true;
logger.warn("Memory based throttling triggered");
}
throttle.set(newThrottle);
if (!newThrottle) {
throttle.notifyAll();
}
if (newThrottle != oldThrottle) {
logger.warn("Memory based throttling set to {}", newThrottle);
}
}
TimeUnit.SECONDS.sleep(1);
}
}
private void waitForEnoughRAM() throws InterruptedException {
    // Block while the throttle is engaged; the monitor thread calls notifyAll()
    // when it releases. The 30s timeout bounds the race between the unsynchronized
    // check and the wait
    while (throttle.get()) {
        synchronized (throttle) {
            throttle.wait(30000);
        }
    }
}
public void acquire() throws InterruptedException {
    // Take the task permit before the RAM check. Because throttling only
    // applies once the pool is at least half full, at least half the pool
    // can always keep running, so memory can eventually be freed (avoiding
    // a deadlock where nothing runs and nothing is ever released)
    taskSemCount.acquire(1);
    if (taskSemCount.availablePermits() < maxPoolSize / 2) {
        waitForEnoughRAM();
    }
}
public void release() {

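The diff view truncates release() above; acquire() and release() are meant to bracket each crawl task. A minimal usage sketch, where runTask() is a hypothetical stand-in for the actual crawl work:

    CrawlLimiter limiter = new CrawlLimiter();

    limiter.acquire();   // may block on the permit count and, past half load, on free RAM
    try {
        runTask();       // hypothetical task body
    }
    finally {
        limiter.release();   // must always run, or permits leak
    }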
CrawlerMain.java

@@ -53,9 +53,8 @@ public class CrawlerMain implements AutoCloseable {
private final MessageQueueFactory messageQueueFactory;
private final FileStorageService fileStorageService;
private final Gson gson;
-private final ThreadPoolExecutor pool;
+private final DumbThreadPool pool;
public final CrawlLimiter crawlLimiter = new CrawlLimiter();
private final Set<String> processedIds = new HashSet<>();
final AbortMonitor abortMonitor = AbortMonitor.getInstance();
@@ -76,12 +75,7 @@ public class CrawlerMain implements AutoCloseable {
this.gson = gson;
// maybe need to set -Xss for JVM to deal with this?
-pool = new ThreadPoolExecutor(
-        CrawlLimiter.maxPoolSize /128,
-        CrawlLimiter.maxPoolSize,
-        5, TimeUnit.MINUTES,
-        new LinkedBlockingQueue<>(32)
-);
+pool = new DumbThreadPool(CrawlLimiter.maxPoolSize, 8);
}
public static void main(String... args) throws Exception {
@@ -142,7 +136,7 @@ public class CrawlerMain implements AutoCloseable {
startCrawlTask(plan, spec);
}
-pool.shutdown();
+pool.shutDown();
do {
System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");
} while (!pool.awaitTermination(60, TimeUnit.SECONDS));
@@ -172,20 +166,19 @@
}
try {
-    crawlLimiter.acquire();
-} catch (InterruptedException e) {
-    throw new RuntimeException(e);
-}
-pool.execute(() -> {
-    try {
-        fetchDomain(crawlingSpecification);
-        heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
-    }
-    finally {
-        crawlLimiter.release();
-    }
-});
+    pool.submit(() -> {
+        try {
+            Thread.currentThread().setName("crawling:" + crawlingSpecification.domain);
+            fetchDomain(crawlingSpecification);
+            heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
+        } finally {
+            Thread.currentThread().setName("[idle]");
+        }
+    });
+}
+catch (InterruptedException ex) {
+    throw new RuntimeException(ex);
+}
}
@@ -195,7 +188,6 @@ public class CrawlerMain implements AutoCloseable {
HttpFetcher fetcher = new HttpFetcherImpl(userAgent.uaString(), dispatcher, connectionPool);
try (CrawledDomainWriter writer = new CrawledDomainWriter(crawlDataDir, specification)) {
var retreiver = new CrawlerRetreiver(fetcher, specification, writer::accept);
@@ -282,7 +274,7 @@ public class CrawlerMain implements AutoCloseable {
public void close() throws Exception {
logger.info("Awaiting termination");
-pool.shutdown();
+pool.shutDown();
while (!pool.awaitTermination(1, TimeUnit.SECONDS));
logger.info("All finished");

DumbThreadPool.java

@@ -0,0 +1,109 @@
package nu.marginalia.crawl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/** A simple thread pool implementation that will never invoke
 * a task in the calling thread the way {@link java.util.concurrent.ThreadPoolExecutor}
 * can when its queue is full and a CallerRunsPolicy is installed (with the
 * default policy it rejects the task outright instead). This pool blocks in
 * submit() until there is room in the work queue. This is useful for
 * coarse-grained tasks where the calling thread might otherwise end up
 * running a task for hours.
 */
public class DumbThreadPool {
private final List<Thread> workers = new ArrayList<>();
private final LinkedBlockingQueue<Runnable> tasks;
private volatile boolean shutDown = false;
private final AtomicInteger taskCount = new AtomicInteger(0);
private final Logger logger = LoggerFactory.getLogger(DumbThreadPool.class);
public DumbThreadPool(int poolSize, int queueSize) {
tasks = new LinkedBlockingQueue<>(queueSize);
for (int i = 0; i < poolSize; i++) {
Thread worker = new Thread(this::worker, "Crawler Thread " + i);
worker.setDaemon(true);
worker.start();
workers.add(worker);
}
}
public void submit(Runnable runnable) throws InterruptedException {
tasks.put(runnable);
}
public void shutDown() {
this.shutDown = true;
}
public void shutDownNow() {
this.shutDown = true;
for (Thread worker : workers) {
worker.interrupt();
}
}
private void worker() {
while (!shutDown) {
try {
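// Poll with a timeout instead of take(), so the loop re-checks
// the shutDown flag at least once per second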
Runnable task = tasks.poll(1, TimeUnit.SECONDS);
if (task == null) {
continue;
}
try {
taskCount.incrementAndGet();
task.run();
}
catch (Exception ex) {
logger.warn("Error executing task", ex);
}
finally {
taskCount.decrementAndGet();
}
}
catch (InterruptedException ex) {
logger.warn("Thread pool worker interrupted", ex);
return;
}
}
}
public boolean awaitTermination(int i, TimeUnit timeUnit) {
final long start = System.currentTimeMillis();
final long deadline = start + timeUnit.toMillis(i);
for (var thread : workers) {
if (!thread.isAlive())
continue;
long timeRemaining = deadline - System.currentTimeMillis();
if (timeRemaining <= 0)
return false;
try {
thread.join(timeRemaining);
}
catch (InterruptedException ex) {
logger.warn("Interrupted while waiting for thread pool to terminate", ex);
return false;
}
}
return true;
}
public int getActiveCount() {
return taskCount.get();
}
}
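A usage sketch for the pool above; the sizes and the printed task body are illustrative only, and submit()'s InterruptedException is left unhandled for brevity:

    DumbThreadPool pool = new DumbThreadPool(4, 8);

    for (int i = 0; i < 100; i++) {
        // submit() blocks while the 8-slot queue is full, rather than
        // rejecting the task or running it on the calling thread
        pool.submit(() -> System.out.println(Thread.currentThread().getName()));
    }

    pool.shutDown();  // workers exit after their current task; note that
                      // anything still queued at that point is abandoned
    while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
        System.out.println(pool.getActiveCount() + " tasks still active");
    }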

CrawlerRetreiver.java

@@ -516,6 +516,10 @@ public class CrawlerRetreiver {
return false;
if (doc == null)
return false;
+if (doc.documentBody == null)
+    return false;
+if (newDoc.documentBody == null)
+    return false;
return reference.isContentBodySame(doc, newDoc);
}

HttpFetcherImpl.java

@@ -21,10 +21,13 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLException;
import javax.net.ssl.X509TrustManager;
+import java.io.EOFException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.URISyntaxException;
+import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.StandardCharsets;
@@ -120,7 +123,7 @@ public class HttpFetcherImpl implements HttpFetcher {
return probeDomain(new EdgeUrl("https", url.domain, url.port, url.path, url.param));
}
logger.info("Error during fetching {}[{}]", ex.getClass().getSimpleName(), ex.getMessage());
logger.info("Error during fetching", ex);
return new FetchResult(FetchResultState.ERROR, url.domain);
}
}
@@ -197,11 +200,18 @@ public class HttpFetcherImpl implements HttpFetcher {
catch (SocketTimeoutException ex) {
return createTimeoutErrorRsp(url, ex);
}
-catch (IllegalCharsetNameException ex) {
+catch (IllegalCharsetNameException | SSLException | EOFException ex) {
+    // This is a bit of a grab-bag of errors that crop up
+    // IllegalCharsetName is egg on our face,
+    // but SSLException and EOFException are probably the server's fault
    return createHardErrorRsp(url, ex);
}
+catch (UnknownHostException ex) {
+    return createUnknownHostError(url, ex);
+}
catch (Exception ex) {
-    logger.error("Error during fetching {}[{}]", ex.getClass().getSimpleName(), ex.getMessage());
+    logger.error("Error during fetching", ex);
    return createHardErrorRsp(url, ex);
}
}
@@ -214,6 +224,16 @@ public class HttpFetcherImpl implements HttpFetcher {
.url(url.toString())
.build();
}
+private CrawledDocument createUnknownHostError(EdgeUrl url, Exception why) {
+    return CrawledDocument.builder()
+            .crawlerStatus(CrawlerDocumentStatus.ERROR.toString())
+            .crawlerStatusDesc("Unknown Host")
+            .timestamp(LocalDateTime.now().toString())
+            .url(url.toString())
+            .build();
+}
private CrawledDocument createTimeoutErrorRsp(EdgeUrl url, Exception why) {
return CrawledDocument.builder()
.crawlerStatus("Timeout")

RssCrawlerTest.java

@@ -59,7 +59,4 @@ class RssCrawlerTest {
return urls;
}
}