Simplify CrawlerMain, removing the CrawlLimiter and using a global HttpFetcher with a virtual thread pool dispatcher instead of the default.

This commit is contained in:
Viktor Lofgren 2023-12-08 20:24:01 +01:00
parent 968dce50fc
commit e6a1052ba7
2 changed files with 10 additions and 98 deletions

View File

@ -1,83 +0,0 @@
package nu.marginalia.crawl;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Limits crawler task spawning based on available heap memory.
 * A background daemon thread samples free memory once per second and flips a
 * throttle flag with hysteresis: throttling engages when free memory drops
 * below 1/4 of max heap, and releases once it climbs back above 1/2.
 * Callers gate new work on {@link #waitForEnoughRAM()}.
 */
public class CrawlLimiter {
    /** Upper bound on concurrently executing crawl tasks; overridable via -Dcrawler.pool-size. */
    public static final int maxPoolSize = Integer.getInteger("crawler.pool-size", 256);

    // Thresholds for throttling task-spawning. Note there's a bit of hysteresis to this:
    // trigger below 1/4 of max heap free, release above 1/2 free, so the flag
    // doesn't oscillate around a single threshold.
    private final long THROTTLE_TRIGGER_FREE_RAM = Runtime.getRuntime().maxMemory() / 4;
    private final long THROTTLE_RELEASE_FREE_RAM = Runtime.getRuntime().maxMemory() / 2;

    // NOTE(review): not referenced anywhere in this file; presumably acquired/released
    // by callers elsewhere, or dead — confirm before removing.
    private final Semaphore taskSemCount = new Semaphore(maxPoolSize);

    // When set to true, the crawler will wait before starting additional tasks.
    // Doubles as the monitor object for wait/notifyAll coordination below.
    private final AtomicBoolean throttle = new AtomicBoolean(false);
    private static final Logger logger = LoggerFactory.getLogger(CrawlLimiter.class);

    public CrawlLimiter() {
        // Daemon so the monitor never keeps the JVM alive on shutdown
        Thread monitorThread = new Thread(this::monitor, "Memory Monitor");
        monitorThread.setDaemon(true);
        monitorThread.start();
    }

    /**
     * Samples free heap once per second and updates the throttle flag.
     * Exits immediately (throttling disabled) if no -Xmx is set, since
     * maxMemory() then reports Long.MAX_VALUE and the thresholds are meaningless.
     */
    @SneakyThrows
    public void monitor() {
        for (;;) {
            synchronized (throttle) {
                boolean oldThrottle = throttle.get();
                boolean newThrottle = oldThrottle;

                if (Runtime.getRuntime().maxMemory() == Long.MAX_VALUE) {
                    // According to the spec this may happen, although it seems to rarely
                    // be the case in practice
                    logger.warn("Memory based throttling disabled (set Xmx)");
                    return;
                }

                final long freeMemory = Runtime.getRuntime().maxMemory() - Runtime.getRuntime().totalMemory();

                if (oldThrottle && freeMemory > THROTTLE_RELEASE_FREE_RAM) {
                    newThrottle = false;
                    logger.warn("Memory based throttling released");
                }
                else if (!oldThrottle && freeMemory < THROTTLE_TRIGGER_FREE_RAM) {
                    newThrottle = true;
                    logger.warn("Memory based throttling triggered");

                    // Try to GC
                    System.gc();
                }

                throttle.set(newThrottle);

                if (!newThrottle) {
                    // Wake any waiters in waitForEnoughRAM()
                    throttle.notifyAll();
                }
                if (newThrottle != oldThrottle) {
                    logger.warn("Memory based throttling set to {}", newThrottle);
                }
            }

            TimeUnit.SECONDS.sleep(1);
        }
    }

    /**
     * Blocks the caller while memory-based throttling is active; returns
     * immediately when throttling is off.
     */
    @SneakyThrows
    public void waitForEnoughRAM() {
        // Hold the monitor while testing the flag. The original checked
        // throttle.get() OUTSIDE the synchronized block, so a notifyAll()
        // issued between the check and the wait() could be missed, stalling
        // the caller for up to the full 30s timeout. The guarded-wait idiom
        // (lock -> while(condition) wait) closes that race; the timeout is
        // kept as a belt-and-braces fallback.
        synchronized (throttle) {
            while (throttle.get()) {
                throttle.wait(30_000);
            }
        }
    }
}

View File

@ -50,12 +50,6 @@ public class CrawlerMain {
private final static Logger logger = LoggerFactory.getLogger(CrawlerMain.class);
private final ProcessHeartbeatImpl heartbeat;
private final ConnectionPool connectionPool = new ConnectionPool(5, 10, TimeUnit.SECONDS);
private final Dispatcher dispatcher = new Dispatcher(new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", true)));
private final UserAgent userAgent;
private final MessageQueueFactory messageQueueFactory;
private final FileStorageService fileStorageService;
private final DbCrawlSpecProvider dbCrawlSpecProvider;
@ -71,7 +65,7 @@ public class CrawlerMain {
volatile int totalTasks;
final AtomicInteger tasksDone = new AtomicInteger(0);
private final CrawlLimiter limiter = new CrawlLimiter();
private HttpFetcherImpl fetcher;
@Inject
public CrawlerMain(UserAgent userAgent,
@ -83,7 +77,6 @@ public class CrawlerMain {
AnchorTagsSourceFactory anchorTagsSourceFactory,
Gson gson) {
this.heartbeat = heartbeat;
this.userAgent = userAgent;
this.messageQueueFactory = messageQueueFactory;
this.fileStorageService = fileStorageService;
this.dbCrawlSpecProvider = dbCrawlSpecProvider;
@ -91,8 +84,14 @@ public class CrawlerMain {
this.gson = gson;
this.node = processConfiguration.node();
// maybe need to set -Xss for JVM to deal with this?
pool = new SimpleBlockingThreadPool("CrawlerPool", CrawlLimiter.maxPoolSize, 1);
pool = new SimpleBlockingThreadPool("CrawlerPool",
Integer.getInteger("crawler.pool-size", 256),
1);
fetcher = new HttpFetcherImpl(userAgent.uaString(),
new Dispatcher(Executors.newVirtualThreadPerTaskExecutor()),
new ConnectionPool(5, 10, TimeUnit.SECONDS)
);
}
public static void main(String... args) throws Exception {
@ -173,6 +172,7 @@ public class CrawlerMain {
activePoolCount = newActivePoolCount;
}
}
}
catch (Exception ex) {
logger.warn("Exception in crawler", ex);
@ -209,11 +209,6 @@ public class CrawlerMain {
@Override
public void run() throws Exception {
limiter.waitForEnoughRAM();
HttpFetcher fetcher = new HttpFetcherImpl(userAgent.uaString(), dispatcher, connectionPool);
try (CrawledDomainWriter writer = new CrawledDomainWriter(outputDir, domain, id);
var warcRecorder = new WarcRecorder(); // write to a temp file for now
var retreiver = new CrawlerRetreiver(fetcher, specification, warcRecorder, writer::accept);