diff --git a/code/libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/ProcessingIterator.java b/code/libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/ProcessingIterator.java
new file mode 100644
index 00000000..15dbc087
--- /dev/null
+++ b/code/libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/ProcessingIterator.java
@@ -0,0 +1,139 @@
+package nu.marginalia.util;
+
+import lombok.SneakyThrows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+/**
+ * Abstraction for exposing a (typically) read-from-disk -> parallel processing -> sequential output
+ * workflow as an iterator, where the number of tasks is much larger than the number of cores
+ */
+public class ProcessingIterator<T> implements Iterator<T> {
+    private static final Logger logger = LoggerFactory.getLogger(ProcessingIterator.class);
+
+    private final LinkedBlockingQueue<T> queue;
+    private final AtomicBoolean isFinished = new AtomicBoolean(false);
+    private final ExecutorService executorService;
+    private final Semaphore sem;
+
+    private T next = null;
+
+    private final int parallelism;
+
+    public ProcessingIterator(int queueSize, int parallelism, ProcessingJob<T> task) {
+        this.parallelism = parallelism;
+
+        queue = new LinkedBlockingQueue<>(queueSize);
+        executorService = Executors.newFixedThreadPool(parallelism);
+        sem = new Semaphore(parallelism);
+
+        executorService.submit(() -> executeJob(task));
+    }
+
+    private void executeJob(ProcessingJob<T> job) {
+        try {
+            job.run(this::executeTask);
+        } catch (Exception e) {
+            logger.warn("Exception while processing", e);
+        } finally {
+            isFinished.set(true);
+        }
+    }
+
+    private void executeTask(Task<T> task) {
+        try {
+            sem.acquire();
+        } catch (InterruptedException e) {
+            return;
+        }
+
+        try {
+            queue.put(task.get());
+        } catch (Exception e) {
+            logger.warn("Exception while processing", e);
+        } finally {
+            sem.release();
+        }
+    }
+
+    /** Returns true if there are more documents to be processed.
+     * This method may block until we are certain this is true.
+     * <p>
+     * This method must be invoked from the same thread that invokes next(),
+     * (or synchronize between the two)
+     */
+    @Override
+    @SneakyThrows
+    public boolean hasNext() {
+        if (next != null)
+            return true;
+
+        do {
+            next = queue.poll(1, TimeUnit.SECONDS);
+            if (next != null) {
+                return true;
+            }
+        } while (expectMore());
+
+        if (!executorService.isShutdown()) {
+            executorService.shutdown();
+        }
+
+        return false;
+    }
+
+    /** Heuristic for if we should expect more documents to be processed,
+     * _trust but verify_ since we don't run this in an exclusive section
+     * and may get a false positive. We never expect a false negative though.
+     */
+    private boolean expectMore() {
+        return !isFinished.get()  // we are still reading from the database
+            || !queue.isEmpty()   // ... or we have documents in the queue
+            || sem.availablePermits() < parallelism; // ... or we are still processing documents
+    }
+
+    /** Returns the next document to be processed.
+     * This method may block until we are certain there is a document to be processed.
+     * <p>
+     * This method must be invoked from the same thread that invokes hasNext(),
+     * (or synchronize between the two)
+     * <p>
+     * If this is run after hasNext() returns false, a NoSuchElementException is thrown.
+     */
+    @SneakyThrows
+    @Override
+    public T next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
+        try {
+            return next;
+        }
+        finally {
+            next = null;
+        }
+    }
+
+    /**
+     * A job that produces a sequence of processing tasks that are to be
+     * performed in parallel
+     */
+    public interface ProcessingJob<T> {
+        void run(Consumer<Task<T>> output) throws Exception;
+    }
+
+    /**
+     * A single task that produces a result to be iterable via the Iterator interface
+     * (along with other tasks' outputs)
+     */
+    public interface Task<T> {
+        T get() throws Exception;
+    }
+}
diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/encyclopedia/EncyclopediaMarginaliaNuSideloader.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/encyclopedia/EncyclopediaMarginaliaNuSideloader.java
index fc1f5015..204aa6a8 100644
--- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/encyclopedia/EncyclopediaMarginaliaNuSideloader.java
+++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/encyclopedia/EncyclopediaMarginaliaNuSideloader.java
@@ -15,6 +15,7 @@ import nu.marginalia.converting.sideload.SideloaderProcessing;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.EdgeUrl;
 import nu.marginalia.model.crawl.DomainIndexingState;
+import nu.marginalia.util.ProcessingIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,11 +29,6 @@ import java.nio.file.Path;
 import java.sql.*;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /** This is an experimental sideloader for encyclopedia.marginalia.nu's database;
  * (which serves as a way of loading wikipedia's zim files without binding to GPL2'd code)
@@ -80,62 +76,24 @@ public class EncyclopediaMarginaliaNuSideloader implements SideloadSource, AutoC
     @SneakyThrows
     @Override
     public Iterator<ProcessedDocument> getDocumentsStream() {
-        LinkedBlockingQueue<ProcessedDocument> docs = new LinkedBlockingQueue<>(32);
-        AtomicBoolean isFinished = new AtomicBoolean(false);
+        return new ProcessingIterator<>(24, 16, (taskConsumer) -> {
+            DomainLinks domainLinks = getDomainLinks();
 
-        ExecutorService executorService = Executors.newFixedThreadPool(16);
-        Semaphore sem = new Semaphore(16);
+            var stmt = connection.prepareStatement("""
+                    SELECT url,title,html FROM articles
+                    """);
+            stmt.setFetchSize(100);
 
-        DomainLinks domainLinks = getDomainLinks();
+            var rs = stmt.executeQuery();
 
-        executorService.submit(() -> {
-            try {
-                var stmt = connection.prepareStatement("""
-                        SELECT url,title,html FROM articles
-                        """);
-                stmt.setFetchSize(100);
+            while (rs.next()) {
+                var articleParts = fromCompressedJson(rs.getBytes("html"), ArticleParts.class);
+                String title = rs.getString("title");
+                String url = URLEncoder.encode(rs.getString("url"), StandardCharsets.UTF_8);
 
-                var rs = stmt.executeQuery();
-                while (rs.next()) {
-                    var articleParts = fromCompressedJson(rs.getBytes("html"), ArticleParts.class);
-                    String title = rs.getString("title");
-                    String url = URLEncoder.encode(rs.getString("url"), StandardCharsets.UTF_8);
-
-                    sem.acquire();
-
-                    executorService.submit(() -> {
-                        try {
-                            docs.add(convertDocument(articleParts.parts, title, url, domainLinks));
-                        } catch (URISyntaxException | DisqualifiedException e) {
-                            logger.warn("Problem converting encyclopedia article " + url, e);
-                        } finally {
-                            sem.release();
-                        }
-                    });
-                }
-
-                stmt.close();
-            }
-            catch (Exception e) {
-                logger.warn("Problem converting encyclopedia article", e);
-            }
-            finally {
-                isFinished.set(true);
+                taskConsumer.accept(() -> convertDocument(articleParts.parts, title, url, domainLinks));
             }
         });
-
-        return new Iterator<>() {
-            @Override
-            public boolean hasNext() {
-                return !isFinished.get() || !docs.isEmpty() || sem.availablePermits() < 16;
-            }
-
-            @SneakyThrows
-            @Override
-            public ProcessedDocument next() {
-                return docs.take();
-            }
-        };
     }
 
     private DomainLinks getDomainLinks() {
diff --git a/code/processes/converting-process/src/test/java/nu/marginalia/util/ProcessingIteratorTest.java b/code/processes/converting-process/src/test/java/nu/marginalia/util/ProcessingIteratorTest.java
new file mode 100644
index 00000000..d20b7ddf
--- /dev/null
+++ b/code/processes/converting-process/src/test/java/nu/marginalia/util/ProcessingIteratorTest.java
@@ -0,0 +1,38 @@
+package nu.marginalia.util;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ProcessingIteratorTest {
+
+    @Test
+    public void test() {
+        Set<Integer> output = new HashSet<>();
+        var iter = new ProcessingIterator<Integer>(2, 2, q -> {
+            for (int i = 0; i < 10_000; i++) {
+                int j = i;
+                q.accept(() -> task(j));
+            }
+        });
+        while (iter.hasNext()) {
+            output.add(iter.next());
+        }
+
+        assertEquals(10_000, output.size());
+
+        for (int i = 0; i < 10_000; i++) {
+            assertTrue(output.contains(i));
+        }
+    }
+
+    int task(int n) throws InterruptedException {
+        TimeUnit.NANOSECONDS.sleep(10);
+        return n;
+    }
+}
\ No newline at end of file
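
For quick orientation, here is a minimal usage sketch of the new ProcessingIterator, mirroring the pattern the encyclopedia sideloader now uses. It is not part of the patch: the class name, the input list (a stand-in for a read-from-disk step), and the queue size/parallelism values (24 and 16, borrowed from the sideloader above) are illustrative only.

import nu.marginalia.util.ProcessingIterator;

import java.util.Iterator;
import java.util.List;

class ProcessingIteratorUsageSketch {
    public static void main(String[] args) {
        // The job enumerates the inputs and hands one Task per input to the consumer;
        // the iterator then yields each task's result as it becomes available.
        Iterator<String> it = new ProcessingIterator<String>(24, 16, taskConsumer -> {
            for (String line : List.of("alpha", "beta", "gamma")) {
                taskConsumer.accept(() -> line.toUpperCase());
            }
        });

        // hasNext() and next() must be driven from the same thread, per the class javadoc.
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}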