From 86ea28d6bc4b20a562e4949ad20c603874bbec67 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Mon, 29 Jul 2024 14:18:52 +0200 Subject: [PATCH] (converter/loader) Simplify document record writing to not require predicated reads --- .../writer/ConverterBatchWriter.java | 112 ++++++------------ .../model/processed/SlopDocumentRecord.java | 58 +-------- .../processed/SlopDocumentRecordTest.java | 48 ++++---- 3 files changed, 65 insertions(+), 153 deletions(-) diff --git a/code/processes/converting-process/java/nu/marginalia/converting/writer/ConverterBatchWriter.java b/code/processes/converting-process/java/nu/marginalia/converting/writer/ConverterBatchWriter.java index ea04cbe3..c53de5d2 100644 --- a/code/processes/converting-process/java/nu/marginalia/converting/writer/ConverterBatchWriter.java +++ b/code/processes/converting-process/java/nu/marginalia/converting/writer/ConverterBatchWriter.java @@ -21,9 +21,6 @@ import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; import java.util.*; -import java.util.concurrent.Callable; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.Future; /** Writer for a single batch of converter parquet files */ public class ConverterBatchWriter implements AutoCloseable, ConverterBatchWriterIf { @@ -60,39 +57,23 @@ public class ConverterBatchWriter implements AutoCloseable, ConverterBatchWriter var domain = sideloadSource.getDomain(); writeDomainData(domain); - writeDocumentData(domain.domain, sideloadSource.getDocumentsStream()); } @Override @SneakyThrows public void writeProcessedDomain(ProcessedDomain domain) { - var results = ForkJoinPool.commonPool().invokeAll( - writeTasks(domain) - ); - - for (var result : results) { - if (result.state() == Future.State.FAILED) { - logger.warn("Parquet writing job failed", result.exceptionNow()); + try { + if (domain.documents != null) { + writeDocumentData(domain.domain, domain.documents.iterator()); } + writeLinkData(domain); + writeDomainData(domain); + } + catch (IOException e) { + logger.error("Data writing job failed", e); } - } - private List> writeTasks(ProcessedDomain domain) { - return List.of( - () -> writeDocumentData(domain), - () -> writeLinkData(domain), - () -> writeDomainData(domain) - ); - } - - private Object writeDocumentData(ProcessedDomain domain) throws IOException { - if (domain.documents == null) - return this; - - writeDocumentData(domain.domain, domain.documents.iterator()); - - return this; } private void writeDocumentData(EdgeDomain domain, @@ -108,54 +89,39 @@ public class ConverterBatchWriter implements AutoCloseable, ConverterBatchWriter while (documentIterator.hasNext()) { var document = documentIterator.next(); - if (document.details == null) { - new SlopDocumentRecord( - domainName, - document.url.toString(), - ordinal, - document.state.toString(), - document.stateReason); + var wb = document.words.build(workArea); + + List spanSequences = new ArrayList<>(wb.spans.size()); + byte[] spanCodes = new byte[wb.spans.size()]; + + for (int i = 0; i < wb.spans.size(); i++) { + var span = wb.spans.get(i); + + spanCodes[i] = span.code(); + spanSequences.add(span.spans()); } - else { - var wb = document.words.build(workArea); - List words = wb.keywords; - byte[] metas = wb.metadata; - List positions = wb.positions; - - List spanSequences = new ArrayList<>(wb.spans.size()); - byte[] spanCodes = new byte[wb.spans.size()]; - - for (int i = 0; i < wb.spans.size(); i++) { - var span = wb.spans.get(i); - - spanCodes[i] = span.code(); - spanSequences.add(span.spans()); - } - - documentWriter.write(new SlopDocumentRecord( - domainName, - document.url.toString(), - ordinal, - document.state.toString(), - document.stateReason, - document.details.title, - document.details.description, - HtmlFeature.encode(document.details.features), - document.details.standard.name(), - document.details.length, - document.details.hashCode, - (float) document.details.quality, - document.details.metadata.encode(), - document.details.pubYear, - words, - metas, - positions, - spanCodes, - spanSequences - )); - - } + documentWriter.write(new SlopDocumentRecord( + domainName, + document.url.toString(), + ordinal, + document.state.toString(), + document.stateReason, + document.details.title, + document.details.description, + HtmlFeature.encode(document.details.features), + document.details.standard.name(), + document.details.length, + document.details.hashCode, + (float) document.details.quality, + document.details.metadata.encode(), + document.details.pubYear, + wb.keywords, + wb.metadata, + wb.positions, + spanCodes, + spanSequences + )); ordinal++; } diff --git a/code/processes/converting-process/model/java/nu/marginalia/model/processed/SlopDocumentRecord.java b/code/processes/converting-process/model/java/nu/marginalia/model/processed/SlopDocumentRecord.java index 2c4671fe..6e3f139e 100644 --- a/code/processes/converting-process/model/java/nu/marginalia/model/processed/SlopDocumentRecord.java +++ b/code/processes/converting-process/model/java/nu/marginalia/model/processed/SlopDocumentRecord.java @@ -49,16 +49,6 @@ public record SlopDocumentRecord( List spans ) { - /** Constructor for partial records */ - public SlopDocumentRecord(String domain, - String url, - int ordinal, - String state, - String stateReason) - { - this(domain, url, ordinal, state, stateReason, "", "", 0, "", 0, 0L, 0.0f, 0L, null, List.of(), new byte[0], List.of(), new byte[0], List.of()); - } - public SlopDocumentRecord { if (spanCodes.length != spans.size()) throw new IllegalArgumentException("Span codes and spans must have the same length"); @@ -154,8 +144,6 @@ public record SlopDocumentRecord( private final LongColumnReader domainMetadataReader; private final IntColumnReader lengthsReader; - private final StringColumnReader statesColumnReader; - private final ObjectArrayColumnReader keywordsReader; private final ByteArrayColumnReader termMetaReader; private final GammaCodedSequenceArrayReader termPositionsReader; @@ -174,8 +162,6 @@ public record SlopDocumentRecord( domainMetadataReader = domainMetadata.forPage(page).open(this, baseDir); lengthsReader = lengthsColumn.forPage(page).open(this, baseDir); - statesColumnReader = statesColumn.forPage(page).open(this, baseDir); - keywordsReader = keywordsColumn.forPage(page).open(this, baseDir); termMetaReader = termMetaColumn.forPage(page).open(this, baseDir); termPositionsReader = termPositionsColumn.forPage(page).open(this, baseDir); @@ -184,29 +170,12 @@ public record SlopDocumentRecord( spansReader = spansColumn.forPage(page).open(this, baseDir); } - KeywordsProjection next = null; - public boolean hasMore() throws IOException { - if (next != null) - return true; - next = getNext(); - return next != null; - } - - public KeywordsProjection next() throws IOException { - if (hasMore()) { - var ret = next; - next = null; - return ret; - } - throw new IllegalStateException("No more records"); + return domainsReader.hasRemaining(); } @Nullable - private KeywordsProjection getNext() throws IOException { - if (!find(statesColumnReader, "OK")) - return null; - + public KeywordsProjection next() throws IOException { String domain = domainsReader.get(); int ordinal = ordinalsReader.get(); int htmlFeatures = htmlFeaturesReader.get(); @@ -242,8 +211,6 @@ public record SlopDocumentRecord( private final StringColumnReader titlesReader; private final StringColumnReader descriptionsReader; - private final StringColumnReader statesColumnReader; - private final IntColumnReader htmlFeaturesReader; private final StringColumnReader htmlStandardsReader; private final IntColumnReader lengthsReader; @@ -256,7 +223,6 @@ public record SlopDocumentRecord( } public MetadataReader(Path baseDir, int page) throws IOException { - this.statesColumnReader = statesColumn.forPage(page).open(this, baseDir); this.domainsReader = domainsColumn.forPage(page).open(this, baseDir); this.urlsReader = urlsColumn.forPage(page).open(this, baseDir); this.ordinalsReader = ordinalsColumn.forPage(page).open(this, baseDir); @@ -270,29 +236,11 @@ public record SlopDocumentRecord( this.pubYearReader = pubYearColumn.forPage(page).open(this, baseDir); } - MetadataProjection next = null; - public boolean hasMore() throws IOException { - if (next != null) - return true; - - return (next = getNext()) != null; + return domainsReader.hasRemaining(); } public MetadataProjection next() throws IOException { - if (hasMore()) { - var ret = next; - next = null; - return ret; - } - throw new IllegalStateException("No more records"); - } - - - private MetadataProjection getNext() throws IOException { - if (!find(statesColumnReader, "OK")) - return null; - int pubYear = pubYearReader.get(); return new MetadataProjection( domainsReader.get(), diff --git a/code/processes/converting-process/model/test/nu/marginalia/model/processed/SlopDocumentRecordTest.java b/code/processes/converting-process/model/test/nu/marginalia/model/processed/SlopDocumentRecordTest.java index 1841a518..9a3aef56 100644 --- a/code/processes/converting-process/model/test/nu/marginalia/model/processed/SlopDocumentRecordTest.java +++ b/code/processes/converting-process/model/test/nu/marginalia/model/processed/SlopDocumentRecordTest.java @@ -33,8 +33,7 @@ public class SlopDocumentRecordTest { @Test public void test() throws IOException { ByteBuffer workArea = ByteBuffer.allocate(1024); - var recordShort = new SlopDocumentRecord("test", "https://test/foo", 0, "ERROR", "Cosmic Ray"); - var recordLong = new SlopDocumentRecord("example.com", "https://example.com/foo", 1, "OK", "", + var record = new SlopDocumentRecord("example.com", "https://example.com/foo", 1, "OK", "", "test", "testtest", 1, @@ -52,8 +51,7 @@ public class SlopDocumentRecordTest { ); try (var writer = new SlopDocumentRecord.Writer(testDir, 0)) { - writer.write(recordShort); - writer.write(recordLong); + writer.write(record); } try (var keywordReader = new SlopDocumentRecord.KeywordsProjectionReader(testDir, 0)) { @@ -62,16 +60,16 @@ public class SlopDocumentRecordTest { assertFalse(keywordReader.hasMore()); var expected = new SlopDocumentRecord.KeywordsProjection( - recordLong.domain(), - recordLong.ordinal(), - recordLong.htmlFeatures(), - recordLong.documentMetadata(), - recordLong.length(), - recordLong.words(), - recordLong.metas(), - recordLong.positions(), - recordLong.spanCodes(), - recordLong.spans() + record.domain(), + record.ordinal(), + record.htmlFeatures(), + record.documentMetadata(), + record.length(), + record.words(), + record.metas(), + record.positions(), + record.spanCodes(), + record.spans() ); Assertions.assertEquals(expected, readRecord); @@ -83,17 +81,17 @@ public class SlopDocumentRecordTest { assertFalse(docDataReader.hasMore()); var expected2 = new SlopDocumentRecord.MetadataProjection( - recordLong.domain(), - recordLong.url(), - recordLong.ordinal(), - recordLong.title(), - recordLong.description(), - recordLong.htmlFeatures(), - recordLong.htmlStandard(), - recordLong.length(), - recordLong.hash(), - recordLong.quality(), - recordLong.pubYear() + record.domain(), + record.url(), + record.ordinal(), + record.title(), + record.description(), + record.htmlFeatures(), + record.htmlStandard(), + record.length(), + record.hash(), + record.quality(), + record.pubYear() ); Assertions.assertEquals(expected2, readRecord);