From d895f8352017ba8c0c47564ae1cf5223f9914cd1 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Wed, 20 Sep 2023 10:11:49 +0200 Subject: [PATCH] (blocking-thread-pool) Move DumbThreadPool to its own micro-library Also rename it to SimpleBlockingThreadPool. --- .../blocking-thread-pool/build.gradle | 27 ++++ .../util/SimpleBlockingThreadPool.java} | 28 ++--- .../processes/converting-process/build.gradle | 1 + .../marginalia/converting/ConverterMain.java | 6 +- .../marginalia/converting/DumbThreadPool.java | 119 ------------------ .../HtmlProcessorSpecializations.java | 9 +- .../MariadbKbSpecialization.java | 66 ++++++++++ code/processes/crawling-process/build.gradle | 1 + .../java/nu/marginalia/crawl/CrawlerMain.java | 7 +- settings.gradle | 1 + 10 files changed, 123 insertions(+), 142 deletions(-) create mode 100644 code/libraries/blocking-thread-pool/build.gradle rename code/{processes/crawling-process/src/main/java/nu/marginalia/crawl/DumbThreadPool.java => libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/SimpleBlockingThreadPool.java} (78%) delete mode 100644 code/processes/converting-process/src/main/java/nu/marginalia/converting/DumbThreadPool.java create mode 100644 code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/plugin/specialization/MariadbKbSpecialization.java diff --git a/code/libraries/blocking-thread-pool/build.gradle b/code/libraries/blocking-thread-pool/build.gradle new file mode 100644 index 00000000..657954e6 --- /dev/null +++ b/code/libraries/blocking-thread-pool/build.gradle @@ -0,0 +1,27 @@ +plugins { + id 'java' +} + +java { + toolchain { + languageVersion.set(JavaLanguageVersion.of(20)) + } +} + +dependencies { + implementation libs.lombok + annotationProcessor libs.lombok + implementation libs.bundles.slf4j + + implementation libs.notnull + + implementation libs.fastutil + + testImplementation libs.bundles.slf4j.test + testImplementation libs.bundles.junit + testImplementation libs.mockito +} 
+ +test { + useJUnitPlatform() +} diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/DumbThreadPool.java b/code/libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/SimpleBlockingThreadPool.java similarity index 78% rename from code/processes/crawling-process/src/main/java/nu/marginalia/crawl/DumbThreadPool.java rename to code/libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/SimpleBlockingThreadPool.java index 762bfb32..7b6248d0 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/DumbThreadPool.java +++ b/code/libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/SimpleBlockingThreadPool.java @@ -1,40 +1,37 @@ -package nu.marginalia.crawl; +package nu.marginalia.util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -/** A simple thread pool implementation that will never invoke - * a task in the calling thread like {@link java.util.concurrent.ThreadPoolExecutor} - * does when the queue is full. Instead, it will block until a thread - * becomes available to run the task. This is useful for coarse grained - * tasks where the calling thread might otherwise block for hours. +/** A dead simple thread pool implementation that will block the caller + * when it is not able to perform a task. This is desirable in batch + * processing workloads. 
*/ -// TODO: This class exists in converter as well, should probably be broken out into a common library; use this version -public class DumbThreadPool { +public class SimpleBlockingThreadPool { private final List workers = new ArrayList<>(); - private final LinkedBlockingQueue tasks; + private final BlockingQueue tasks; private volatile boolean shutDown = false; private final AtomicInteger taskCount = new AtomicInteger(0); - private final Logger logger = LoggerFactory.getLogger(DumbThreadPool.class); + private final Logger logger = LoggerFactory.getLogger(SimpleBlockingThreadPool.class); - public DumbThreadPool(int poolSize, int queueSize) { - tasks = new LinkedBlockingQueue<>(queueSize); + public SimpleBlockingThreadPool(String name, int poolSize, int queueSize) { + tasks = new ArrayBlockingQueue<>(queueSize); for (int i = 0; i < poolSize; i++) { - Thread worker = new Thread(this::worker, "Crawler Thread " + i); + Thread worker = new Thread(this::worker, name + "[" + i + "]"); worker.setDaemon(true); worker.start(); workers.add(worker); } } - public void submit(Task task) throws InterruptedException { tasks.put(task); } @@ -126,4 +123,5 @@ public class DumbThreadPool { public interface Task { void run() throws Exception; } + } diff --git a/code/processes/converting-process/build.gradle b/code/processes/converting-process/build.gradle index 5714f2cc..ebd8d5ed 100644 --- a/code/processes/converting-process/build.gradle +++ b/code/processes/converting-process/build.gradle @@ -35,6 +35,7 @@ dependencies { implementation project(':code:common:service') implementation project(':code:common:config') implementation project(':code:libraries:message-queue') + implementation project(':code:libraries:blocking-thread-pool') implementation project(':code:common:service-discovery') implementation project(':code:common:service-client') diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java 
b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java index 6cb1413a..a5e3b35e 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java @@ -14,11 +14,10 @@ import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.inbox.MqInboxResponse; import nu.marginalia.mq.inbox.MqSingleShotInbox; -import nu.marginalia.mqapi.converting.ConvertAction; import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.process.control.ProcessHeartbeatImpl; -import nu.marginalia.process.log.WorkLog; import nu.marginalia.service.module.DatabaseModule; +import nu.marginalia.util.SimpleBlockingThreadPool; import nu.marginalia.worklog.BatchingWorkLog; import nu.marginalia.worklog.BatchingWorkLogImpl; import plan.CrawlPlan; @@ -26,7 +25,6 @@ import nu.marginalia.converting.processor.DomainProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.nio.file.Path; import java.sql.SQLException; import java.util.Collection; @@ -107,7 +105,7 @@ public class ConverterMain { try (BatchingWorkLog batchingWorkLog = new BatchingWorkLogImpl(plan.process.getLogFile()); ConverterWriter converterWriter = new ConverterWriter(batchingWorkLog, plan.process.getDir())) { - var pool = new DumbThreadPool(maxPoolSize, 2); + var pool = new SimpleBlockingThreadPool("ConverterThread", maxPoolSize, 2); int totalDomains = plan.countCrawledDomains(); AtomicInteger processedDomains = new AtomicInteger(0); diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/DumbThreadPool.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/DumbThreadPool.java deleted file mode 100644 index 95cbf14a..00000000 --- 
a/code/processes/converting-process/src/main/java/nu/marginalia/converting/DumbThreadPool.java +++ /dev/null @@ -1,119 +0,0 @@ -package nu.marginalia.converting; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** A simple thread pool implementation that will never invoke - * a task in the calling thread like {@link java.util.concurrent.ThreadPoolExecutor} - * does when the queue is full. Instead, it will block until a thread - * becomes available to run the task. This is useful for coarse grained - * tasks where the calling thread might otherwise block for hours. - */ -// TODO: This class exists in crawler as well, should probably be broken out into a common library; use the one from crawler instead -public class DumbThreadPool { - private final List workers = new ArrayList<>(); - private final LinkedBlockingQueue tasks; - private volatile boolean shutDown = false; - private final AtomicInteger taskCount = new AtomicInteger(0); - private final Logger logger = LoggerFactory.getLogger(DumbThreadPool.class); - - public DumbThreadPool(int poolSize, int queueSize) { - tasks = new LinkedBlockingQueue<>(queueSize); - - for (int i = 0; i < poolSize; i++) { - Thread worker = new Thread(this::worker, "Converter Thread " + i); - worker.setDaemon(true); - worker.start(); - workers.add(worker); - } - - } - - public void submit(Runnable runnable) throws InterruptedException { - tasks.put(runnable); - } - - public void shutDown() { - this.shutDown = true; - } - - public void shutDownNow() { - this.shutDown = true; - for (Thread worker : workers) { - worker.interrupt(); - } - } - - private void worker() { - while (!shutDown) { - try { - Runnable task = tasks.poll(1, TimeUnit.SECONDS); - if (task == null) { - continue; - } - - try { - taskCount.incrementAndGet(); - 
task.run(); - } - catch (Exception ex) { - logger.warn("Error executing task", ex); - } - finally { - taskCount.decrementAndGet(); - } - } - - catch (InterruptedException ex) { - logger.warn("Thread pool worker interrupted", ex); - return; - } - } - } - - - /** Wait for all tasks to complete up to the specified timeout, - * then return true if all tasks completed, false otherwise. - */ - public boolean awaitTermination(int i, TimeUnit timeUnit) throws InterruptedException { - final long start = System.currentTimeMillis(); - final long deadline = start + timeUnit.toMillis(i); - - for (var thread : workers) { - if (!thread.isAlive()) - continue; - - long timeRemaining = deadline - System.currentTimeMillis(); - if (timeRemaining <= 0) - return false; - - thread.join(timeRemaining); - if (thread.isAlive()) - return false; - } - - // Doublecheck the bookkeeping so we didn't mess up. This may mean you have to Ctrl+C the process - // if you see this warning forever, but for the crawler this is preferable to terminating early - // and missing tasks. (maybe some cosmic ray or OOM condition or X-Files baddie of the week killed a - // thread so hard and it didn't invoke finally and didn't decrement the task count) - - int activeCount = getActiveCount(); - if (activeCount != 0) { - logger.warn("Thread pool terminated with {} active threads(?!) 
-- check what's going on with jstack and kill manually", activeCount); - return false; - } - - return true; - } - - public int getActiveCount() { - return taskCount.get(); - } - -} diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/plugin/specialization/HtmlProcessorSpecializations.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/plugin/specialization/HtmlProcessorSpecializations.java index b64c1dde..0ac11b05 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/plugin/specialization/HtmlProcessorSpecializations.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/plugin/specialization/HtmlProcessorSpecializations.java @@ -17,6 +17,7 @@ public class HtmlProcessorSpecializations { private final XenForoSpecialization xenforoSpecialization; private final PhpBBSpecialization phpBBSpecialization; private final JavadocSpecialization javadocSpecialization; + private final MariadbKbSpecialization mariadbKbSpecialization; private final BlogSpecialization blogSpecialization; private final DefaultSpecialization defaultSpecialization; @@ -26,13 +27,14 @@ public class HtmlProcessorSpecializations { XenForoSpecialization xenforoSpecialization, PhpBBSpecialization phpBBSpecialization, JavadocSpecialization javadocSpecialization, - BlogSpecialization blogSpecialization, + MariadbKbSpecialization mariadbKbSpecialization, BlogSpecialization blogSpecialization, DefaultSpecialization defaultSpecialization) { this.domainTypes = domainTypes; this.lemmySpecialization = lemmySpecialization; this.xenforoSpecialization = xenforoSpecialization; this.phpBBSpecialization = phpBBSpecialization; this.javadocSpecialization = javadocSpecialization; + this.mariadbKbSpecialization = mariadbKbSpecialization; this.blogSpecialization = blogSpecialization; this.defaultSpecialization = defaultSpecialization; } @@ -47,6 +49,11 @@ public 
class HtmlProcessorSpecializations { return blogSpecialization; } + if (url.domain.getDomain().equals("mariadb.com") + && url.path.startsWith("/kb")) { + return mariadbKbSpecialization; + } + if (generator.keywords().contains("lemmy")) { return lemmySpecialization; } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/plugin/specialization/MariadbKbSpecialization.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/plugin/specialization/MariadbKbSpecialization.java new file mode 100644 index 00000000..b26288e9 --- /dev/null +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/plugin/specialization/MariadbKbSpecialization.java @@ -0,0 +1,66 @@ +package nu.marginalia.converting.processor.plugin.specialization; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.keyword.model.DocumentKeywordsBuilder; +import nu.marginalia.model.idx.WordFlags; +import nu.marginalia.summary.SummaryExtractor; +import org.apache.commons.lang3.StringUtils; +import org.jsoup.nodes.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +@Singleton +public class MariadbKbSpecialization extends DefaultSpecialization { + private static final Logger logger = LoggerFactory.getLogger(MariadbKbSpecialization.class); + + @Inject + public MariadbKbSpecialization(SummaryExtractor summaryExtractor) { + super(summaryExtractor); + } + + @Override + public Document prune(Document doc) { + var newDoc = new Document(doc.baseUri()); + var bodyTag = newDoc.appendElement("body"); + + var comments = doc.getElementById("comments"); + if (comments != null) + comments.remove(); + + var contentTag= doc.getElementById("content"); + if (contentTag != null) + bodyTag.appendChild(newDoc.createElement("section").html(contentTag.html())); + + return newDoc; + } + + @Override + public void amendWords(Document doc, 
DocumentKeywordsBuilder words) { + Set<String> toAdd = new HashSet<>(); + + for (var elem : doc.getElementsByTag("strong")) { + var text = elem.text(); + + if (text.contains(":")) + continue; + if (text.contains("(")) + continue; + + String[] keywords = text.toLowerCase(Locale.ROOT).split("\\s+"); /* Locale.ROOT: keyword casing must not depend on the JVM default locale */ + if (keywords.length > 4) + continue; + + toAdd.addAll(List.of(keywords)); + for (int i = 1; i < keywords.length; i++) { + toAdd.add(keywords[i-1] + "_" + keywords[i]); + } + } + + logger.debug("Generated keywords: {}", toAdd); + words.setFlagOnMetadataForWords(WordFlags.Subjects, toAdd); + } + +} \ No newline at end of file diff --git a/code/processes/crawling-process/build.gradle b/code/processes/crawling-process/build.gradle index 49a6a426..aa70079e 100644 --- a/code/processes/crawling-process/build.gradle +++ b/code/processes/crawling-process/build.gradle @@ -26,6 +26,7 @@ dependencies { implementation project(':code:common:config') implementation project(':code:common:service') implementation project(':code:libraries:big-string') + implementation project(':code:libraries:blocking-thread-pool') implementation project(':code:api:index-api') implementation project(':code:api:process-mqapi') implementation project(':code:common:service-discovery') diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java index 66ace4ff..50d4b1a8 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java @@ -23,6 +23,7 @@ import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.crawling.io.CrawledDomainWriter; import nu.marginalia.crawl.retreival.CrawlerRetreiver; import nu.marginalia.crawl.retreival.fetcher.HttpFetcher; +import nu.marginalia.util.SimpleBlockingThreadPool; import okhttp3.ConnectionPool; import okhttp3.Dispatcher; import 
okhttp3.internal.Util; @@ -53,7 +54,7 @@ public class CrawlerMain { private final MessageQueueFactory messageQueueFactory; private final FileStorageService fileStorageService; private final Gson gson; - private final DumbThreadPool pool; + private final SimpleBlockingThreadPool pool; private final Map processingIds = new ConcurrentHashMap<>(); private final CrawledDomainReader reader = new CrawledDomainReader(); @@ -77,7 +78,7 @@ public class CrawlerMain { this.gson = gson; // maybe need to set -Xss for JVM to deal with this? - pool = new DumbThreadPool(CrawlLimiter.maxPoolSize, 1); + pool = new SimpleBlockingThreadPool("CrawlerPool", CrawlLimiter.maxPoolSize, 1); } public static void main(String... args) throws Exception { @@ -150,7 +151,7 @@ public class CrawlerMain { } } - class CrawlTask implements DumbThreadPool.Task { + class CrawlTask implements SimpleBlockingThreadPool.Task { private final CrawlSpecRecord specification; diff --git a/settings.gradle b/settings.gradle index ea0dce52..80887d3d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -16,6 +16,7 @@ include 'code:libraries:guarded-regex' include 'code:libraries:big-string' include 'code:libraries:random-write-funnel' include 'code:libraries:next-prime' +include 'code:libraries:blocking-thread-pool' include 'code:libraries:braille-block-punch-cards' include 'code:libraries:language-processing' include 'code:libraries:term-frequency-dict'