(converter/loader) Simplify document record writing to not require predicated reads

This commit is contained in:
Viktor Lofgren 2024-07-29 14:18:52 +02:00
parent 34703da144
commit 86ea28d6bc
3 changed files with 65 additions and 153 deletions

View File

@ -21,9 +21,6 @@ import java.nio.ByteBuffer;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.*; import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
/** Writer for a single batch of converter parquet files */ /** Writer for a single batch of converter parquet files */
public class ConverterBatchWriter implements AutoCloseable, ConverterBatchWriterIf { public class ConverterBatchWriter implements AutoCloseable, ConverterBatchWriterIf {
@ -60,39 +57,23 @@ public class ConverterBatchWriter implements AutoCloseable, ConverterBatchWriter
var domain = sideloadSource.getDomain(); var domain = sideloadSource.getDomain();
writeDomainData(domain); writeDomainData(domain);
writeDocumentData(domain.domain, sideloadSource.getDocumentsStream()); writeDocumentData(domain.domain, sideloadSource.getDocumentsStream());
} }
@Override @Override
@SneakyThrows @SneakyThrows
public void writeProcessedDomain(ProcessedDomain domain) { public void writeProcessedDomain(ProcessedDomain domain) {
var results = ForkJoinPool.commonPool().invokeAll( try {
writeTasks(domain) if (domain.documents != null) {
); writeDocumentData(domain.domain, domain.documents.iterator());
for (var result : results) {
if (result.state() == Future.State.FAILED) {
logger.warn("Parquet writing job failed", result.exceptionNow());
} }
writeLinkData(domain);
writeDomainData(domain);
}
catch (IOException e) {
logger.error("Data writing job failed", e);
} }
}
private List<Callable<Object>> writeTasks(ProcessedDomain domain) {
return List.of(
() -> writeDocumentData(domain),
() -> writeLinkData(domain),
() -> writeDomainData(domain)
);
}
private Object writeDocumentData(ProcessedDomain domain) throws IOException {
if (domain.documents == null)
return this;
writeDocumentData(domain.domain, domain.documents.iterator());
return this;
} }
private void writeDocumentData(EdgeDomain domain, private void writeDocumentData(EdgeDomain domain,
@ -108,54 +89,39 @@ public class ConverterBatchWriter implements AutoCloseable, ConverterBatchWriter
while (documentIterator.hasNext()) { while (documentIterator.hasNext()) {
var document = documentIterator.next(); var document = documentIterator.next();
if (document.details == null) { var wb = document.words.build(workArea);
new SlopDocumentRecord(
domainName, List<GammaCodedSequence> spanSequences = new ArrayList<>(wb.spans.size());
document.url.toString(), byte[] spanCodes = new byte[wb.spans.size()];
ordinal,
document.state.toString(), for (int i = 0; i < wb.spans.size(); i++) {
document.stateReason); var span = wb.spans.get(i);
spanCodes[i] = span.code();
spanSequences.add(span.spans());
} }
else {
var wb = document.words.build(workArea);
List<String> words = wb.keywords;
byte[] metas = wb.metadata;
List<GammaCodedSequence> positions = wb.positions;
documentWriter.write(new SlopDocumentRecord(
List<GammaCodedSequence> spanSequences = new ArrayList<>(wb.spans.size()); domainName,
byte[] spanCodes = new byte[wb.spans.size()]; document.url.toString(),
ordinal,
for (int i = 0; i < wb.spans.size(); i++) { document.state.toString(),
var span = wb.spans.get(i); document.stateReason,
document.details.title,
spanCodes[i] = span.code(); document.details.description,
spanSequences.add(span.spans()); HtmlFeature.encode(document.details.features),
} document.details.standard.name(),
document.details.length,
documentWriter.write(new SlopDocumentRecord( document.details.hashCode,
domainName, (float) document.details.quality,
document.url.toString(), document.details.metadata.encode(),
ordinal, document.details.pubYear,
document.state.toString(), wb.keywords,
document.stateReason, wb.metadata,
document.details.title, wb.positions,
document.details.description, spanCodes,
HtmlFeature.encode(document.details.features), spanSequences
document.details.standard.name(), ));
document.details.length,
document.details.hashCode,
(float) document.details.quality,
document.details.metadata.encode(),
document.details.pubYear,
words,
metas,
positions,
spanCodes,
spanSequences
));
}
ordinal++; ordinal++;
} }

View File

@ -49,16 +49,6 @@ public record SlopDocumentRecord(
List<GammaCodedSequence> spans List<GammaCodedSequence> spans
) { ) {
/** Constructor for partial records */
public SlopDocumentRecord(String domain,
String url,
int ordinal,
String state,
String stateReason)
{
this(domain, url, ordinal, state, stateReason, "", "", 0, "", 0, 0L, 0.0f, 0L, null, List.of(), new byte[0], List.of(), new byte[0], List.of());
}
public SlopDocumentRecord { public SlopDocumentRecord {
if (spanCodes.length != spans.size()) if (spanCodes.length != spans.size())
throw new IllegalArgumentException("Span codes and spans must have the same length"); throw new IllegalArgumentException("Span codes and spans must have the same length");
@ -154,8 +144,6 @@ public record SlopDocumentRecord(
private final LongColumnReader domainMetadataReader; private final LongColumnReader domainMetadataReader;
private final IntColumnReader lengthsReader; private final IntColumnReader lengthsReader;
private final StringColumnReader statesColumnReader;
private final ObjectArrayColumnReader<String> keywordsReader; private final ObjectArrayColumnReader<String> keywordsReader;
private final ByteArrayColumnReader termMetaReader; private final ByteArrayColumnReader termMetaReader;
private final GammaCodedSequenceArrayReader termPositionsReader; private final GammaCodedSequenceArrayReader termPositionsReader;
@ -174,8 +162,6 @@ public record SlopDocumentRecord(
domainMetadataReader = domainMetadata.forPage(page).open(this, baseDir); domainMetadataReader = domainMetadata.forPage(page).open(this, baseDir);
lengthsReader = lengthsColumn.forPage(page).open(this, baseDir); lengthsReader = lengthsColumn.forPage(page).open(this, baseDir);
statesColumnReader = statesColumn.forPage(page).open(this, baseDir);
keywordsReader = keywordsColumn.forPage(page).open(this, baseDir); keywordsReader = keywordsColumn.forPage(page).open(this, baseDir);
termMetaReader = termMetaColumn.forPage(page).open(this, baseDir); termMetaReader = termMetaColumn.forPage(page).open(this, baseDir);
termPositionsReader = termPositionsColumn.forPage(page).open(this, baseDir); termPositionsReader = termPositionsColumn.forPage(page).open(this, baseDir);
@ -184,29 +170,12 @@ public record SlopDocumentRecord(
spansReader = spansColumn.forPage(page).open(this, baseDir); spansReader = spansColumn.forPage(page).open(this, baseDir);
} }
KeywordsProjection next = null;
public boolean hasMore() throws IOException { public boolean hasMore() throws IOException {
if (next != null) return domainsReader.hasRemaining();
return true;
next = getNext();
return next != null;
}
public KeywordsProjection next() throws IOException {
if (hasMore()) {
var ret = next;
next = null;
return ret;
}
throw new IllegalStateException("No more records");
} }
@Nullable @Nullable
private KeywordsProjection getNext() throws IOException { public KeywordsProjection next() throws IOException {
if (!find(statesColumnReader, "OK"))
return null;
String domain = domainsReader.get(); String domain = domainsReader.get();
int ordinal = ordinalsReader.get(); int ordinal = ordinalsReader.get();
int htmlFeatures = htmlFeaturesReader.get(); int htmlFeatures = htmlFeaturesReader.get();
@ -242,8 +211,6 @@ public record SlopDocumentRecord(
private final StringColumnReader titlesReader; private final StringColumnReader titlesReader;
private final StringColumnReader descriptionsReader; private final StringColumnReader descriptionsReader;
private final StringColumnReader statesColumnReader;
private final IntColumnReader htmlFeaturesReader; private final IntColumnReader htmlFeaturesReader;
private final StringColumnReader htmlStandardsReader; private final StringColumnReader htmlStandardsReader;
private final IntColumnReader lengthsReader; private final IntColumnReader lengthsReader;
@ -256,7 +223,6 @@ public record SlopDocumentRecord(
} }
public MetadataReader(Path baseDir, int page) throws IOException { public MetadataReader(Path baseDir, int page) throws IOException {
this.statesColumnReader = statesColumn.forPage(page).open(this, baseDir);
this.domainsReader = domainsColumn.forPage(page).open(this, baseDir); this.domainsReader = domainsColumn.forPage(page).open(this, baseDir);
this.urlsReader = urlsColumn.forPage(page).open(this, baseDir); this.urlsReader = urlsColumn.forPage(page).open(this, baseDir);
this.ordinalsReader = ordinalsColumn.forPage(page).open(this, baseDir); this.ordinalsReader = ordinalsColumn.forPage(page).open(this, baseDir);
@ -270,29 +236,11 @@ public record SlopDocumentRecord(
this.pubYearReader = pubYearColumn.forPage(page).open(this, baseDir); this.pubYearReader = pubYearColumn.forPage(page).open(this, baseDir);
} }
MetadataProjection next = null;
public boolean hasMore() throws IOException { public boolean hasMore() throws IOException {
if (next != null) return domainsReader.hasRemaining();
return true;
return (next = getNext()) != null;
} }
public MetadataProjection next() throws IOException { public MetadataProjection next() throws IOException {
if (hasMore()) {
var ret = next;
next = null;
return ret;
}
throw new IllegalStateException("No more records");
}
private MetadataProjection getNext() throws IOException {
if (!find(statesColumnReader, "OK"))
return null;
int pubYear = pubYearReader.get(); int pubYear = pubYearReader.get();
return new MetadataProjection( return new MetadataProjection(
domainsReader.get(), domainsReader.get(),

View File

@ -33,8 +33,7 @@ public class SlopDocumentRecordTest {
@Test @Test
public void test() throws IOException { public void test() throws IOException {
ByteBuffer workArea = ByteBuffer.allocate(1024); ByteBuffer workArea = ByteBuffer.allocate(1024);
var recordShort = new SlopDocumentRecord("test", "https://test/foo", 0, "ERROR", "Cosmic Ray"); var record = new SlopDocumentRecord("example.com", "https://example.com/foo", 1, "OK", "",
var recordLong = new SlopDocumentRecord("example.com", "https://example.com/foo", 1, "OK", "",
"test", "test",
"testtest", "testtest",
1, 1,
@ -52,8 +51,7 @@ public class SlopDocumentRecordTest {
); );
try (var writer = new SlopDocumentRecord.Writer(testDir, 0)) { try (var writer = new SlopDocumentRecord.Writer(testDir, 0)) {
writer.write(recordShort); writer.write(record);
writer.write(recordLong);
} }
try (var keywordReader = new SlopDocumentRecord.KeywordsProjectionReader(testDir, 0)) { try (var keywordReader = new SlopDocumentRecord.KeywordsProjectionReader(testDir, 0)) {
@ -62,16 +60,16 @@ public class SlopDocumentRecordTest {
assertFalse(keywordReader.hasMore()); assertFalse(keywordReader.hasMore());
var expected = new SlopDocumentRecord.KeywordsProjection( var expected = new SlopDocumentRecord.KeywordsProjection(
recordLong.domain(), record.domain(),
recordLong.ordinal(), record.ordinal(),
recordLong.htmlFeatures(), record.htmlFeatures(),
recordLong.documentMetadata(), record.documentMetadata(),
recordLong.length(), record.length(),
recordLong.words(), record.words(),
recordLong.metas(), record.metas(),
recordLong.positions(), record.positions(),
recordLong.spanCodes(), record.spanCodes(),
recordLong.spans() record.spans()
); );
Assertions.assertEquals(expected, readRecord); Assertions.assertEquals(expected, readRecord);
@ -83,17 +81,17 @@ public class SlopDocumentRecordTest {
assertFalse(docDataReader.hasMore()); assertFalse(docDataReader.hasMore());
var expected2 = new SlopDocumentRecord.MetadataProjection( var expected2 = new SlopDocumentRecord.MetadataProjection(
recordLong.domain(), record.domain(),
recordLong.url(), record.url(),
recordLong.ordinal(), record.ordinal(),
recordLong.title(), record.title(),
recordLong.description(), record.description(),
recordLong.htmlFeatures(), record.htmlFeatures(),
recordLong.htmlStandard(), record.htmlStandard(),
recordLong.length(), record.length(),
recordLong.hash(), record.hash(),
recordLong.quality(), record.quality(),
recordLong.pubYear() record.pubYear()
); );
Assertions.assertEquals(expected2, readRecord); Assertions.assertEquals(expected2, readRecord);