(slop) Replace SlopPageRef<T> with SlopTable.Ref<T>

This commit is contained in:
Viktor Lofgren 2024-08-21 10:13:49 +02:00
parent e4c97a91d8
commit 266d6e4bea
11 changed files with 60 additions and 66 deletions

View File

@ -148,12 +148,8 @@ public record SlopDocumentRecord(
private final ByteArrayColumn.Reader spanCodesReader; private final ByteArrayColumn.Reader spanCodesReader;
private final GammaCodedSequenceArrayColumn.Reader spansReader; private final GammaCodedSequenceArrayColumn.Reader spansReader;
public KeywordsProjectionReader(SlopPageRef<SlopDocumentRecord> pageRef) throws IOException { public KeywordsProjectionReader(SlopTable.Ref<SlopDocumentRecord> pageRef) throws IOException {
this(pageRef.baseDir(), pageRef.page()); super(pageRef);
}
public KeywordsProjectionReader(Path baseDir, int page) throws IOException {
super(baseDir, page);
domainsReader = domainsColumn.open(this); domainsReader = domainsColumn.open(this);
ordinalsReader = ordinalsColumn.open(this); ordinalsReader = ordinalsColumn.open(this);
htmlFeaturesReader = htmlFeaturesColumn.open(this); htmlFeaturesReader = htmlFeaturesColumn.open(this);
@ -216,12 +212,8 @@ public record SlopDocumentRecord(
private final FloatColumn.Reader qualitiesReader; private final FloatColumn.Reader qualitiesReader;
private final IntColumn.Reader pubYearReader; private final IntColumn.Reader pubYearReader;
public MetadataReader(SlopPageRef<SlopDocumentRecord> pageRef) throws IOException{ public MetadataReader(SlopTable.Ref<SlopDocumentRecord> pageRef) throws IOException{
this(pageRef.baseDir(), pageRef.page()); super(pageRef);
}
public MetadataReader(Path baseDir, int page) throws IOException {
super(baseDir, page);
this.domainsReader = domainsColumn.open(this); this.domainsReader = domainsColumn.open(this);
this.urlsReader = urlsColumn.open(this); this.urlsReader = urlsColumn.open(this);
@ -236,6 +228,10 @@ public record SlopDocumentRecord(
this.pubYearReader = pubYearColumn.open(this); this.pubYearReader = pubYearColumn.open(this);
} }
public MetadataReader(Path baseDir, int page) throws IOException {
this(new Ref<>(baseDir, page));
}
public boolean hasMore() throws IOException { public boolean hasMore() throws IOException {
return domainsReader.hasRemaining(); return domainsReader.hasRemaining();
} }

View File

@ -23,17 +23,17 @@ public record SlopDomainLinkRecord(
private final TxtStringColumn.Reader sourcesReader; private final TxtStringColumn.Reader sourcesReader;
private final TxtStringColumn.Reader destsReader; private final TxtStringColumn.Reader destsReader;
public Reader(SlopPageRef<SlopDomainLinkRecord> page) throws IOException { public Reader(SlopTable.Ref<SlopDomainLinkRecord> ref) throws IOException {
this(page.baseDir(), page.page()); super(ref);
}
public Reader(Path baseDir, int page) throws IOException {
super(baseDir, page);
sourcesReader = sourcesColumn.open(this); sourcesReader = sourcesColumn.open(this);
destsReader = destsColumn.open(this); destsReader = destsColumn.open(this);
} }
public Reader(Path baseDir, int page) throws IOException {
this(new Ref<>(baseDir, page));
}
public boolean hasMore() throws IOException { public boolean hasMore() throws IOException {
return sourcesReader.hasRemaining(); return sourcesReader.hasRemaining();
} }

View File

@ -43,12 +43,12 @@ public record SlopDomainRecord(
public static class DomainNameReader extends SlopTable { public static class DomainNameReader extends SlopTable {
private final TxtStringColumn.Reader domainsReader; private final TxtStringColumn.Reader domainsReader;
public DomainNameReader(SlopPageRef<SlopDomainRecord> page) throws IOException { public DomainNameReader(Path baseDir, int page) throws IOException {
this(page.baseDir(), page.page()); this(new Ref<>(baseDir, page));
} }
public DomainNameReader(Path baseDir, int page) throws IOException { public DomainNameReader(SlopTable.Ref<SlopDomainRecord> ref) throws IOException {
super(baseDir, page); super(ref);
domainsReader = domainsColumn.open(this); domainsReader = domainsColumn.open(this);
} }
@ -66,17 +66,17 @@ public record SlopDomainRecord(
private final TxtStringColumn.Reader domainsReader; private final TxtStringColumn.Reader domainsReader;
private final TxtStringColumn.Reader ipReader; private final TxtStringColumn.Reader ipReader;
public DomainWithIpReader(SlopPageRef<SlopDomainRecord> page) throws IOException { public DomainWithIpReader(SlopTable.Ref<SlopDomainRecord> ref) throws IOException {
this(page.baseDir(), page.page()); super(ref);
}
public DomainWithIpReader(Path baseDir, int page) throws IOException {
super(baseDir, page);
domainsReader = domainsColumn.open(this); domainsReader = domainsColumn.open(this);
ipReader = ipColumn.open(this); ipReader = ipColumn.open(this);
} }
public DomainWithIpReader(Path baseDir, int page) throws IOException {
this(new Ref<>(baseDir, page));
}
public boolean hasMore() throws IOException { public boolean hasMore() throws IOException {
return domainsReader.hasRemaining(); return domainsReader.hasRemaining();
} }
@ -102,12 +102,8 @@ public record SlopDomainRecord(
private final ObjectArrayColumn<String>.Reader rssFeedsReader; private final ObjectArrayColumn<String>.Reader rssFeedsReader;
public Reader(SlopPageRef<SlopDomainRecord> page) throws IOException { public Reader(SlopTable.Ref<SlopDomainRecord> ref) throws IOException {
this(page.baseDir(), page.page()); super(ref);
}
public Reader(Path baseDir, int page) throws IOException {
super(baseDir, page);
domainsReader = domainsColumn.open(this); domainsReader = domainsColumn.open(this);
statesReader = statesColumn.open(this); statesReader = statesColumn.open(this);
@ -121,6 +117,10 @@ public record SlopDomainRecord(
rssFeedsReader = rssFeedsColumn.open(this); rssFeedsReader = rssFeedsColumn.open(this);
} }
public Reader(Path baseDir, int page) throws IOException {
this(new Ref<>(baseDir, page));
}
public boolean hasMore() throws IOException { public boolean hasMore() throws IOException {
return domainsReader.hasRemaining(); return domainsReader.hasRemaining();
} }

View File

@ -1,6 +0,0 @@
package nu.marginalia.model.processed;
import java.nio.file.Path;
public record SlopPageRef<T>(Path baseDir, int page) {
}

View File

@ -1,6 +1,7 @@
package nu.marginalia.model.processed; package nu.marginalia.model.processed;
import nu.marginalia.sequence.GammaCodedSequence; import nu.marginalia.sequence.GammaCodedSequence;
import nu.marginalia.slop.SlopTable;
import nu.marginalia.test.TestUtil; import nu.marginalia.test.TestUtil;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
@ -54,7 +55,7 @@ public class SlopDocumentRecordTest {
writer.write(record); writer.write(record);
} }
try (var keywordReader = new SlopDocumentRecord.KeywordsProjectionReader(testDir, 0)) { try (var keywordReader = new SlopDocumentRecord.KeywordsProjectionReader(new SlopTable.Ref<>(testDir, 0))) {
assertTrue(keywordReader.hasMore()); assertTrue(keywordReader.hasMore());
var readRecord = keywordReader.next(); var readRecord = keywordReader.next();
assertFalse(keywordReader.hasMore()); assertFalse(keywordReader.hasMore());

View File

@ -4,7 +4,7 @@ import nu.marginalia.io.processed.ProcessedDataFileNames;
import nu.marginalia.model.processed.SlopDocumentRecord; import nu.marginalia.model.processed.SlopDocumentRecord;
import nu.marginalia.model.processed.SlopDomainLinkRecord; import nu.marginalia.model.processed.SlopDomainLinkRecord;
import nu.marginalia.model.processed.SlopDomainRecord; import nu.marginalia.model.processed.SlopDomainRecord;
import nu.marginalia.model.processed.SlopPageRef; import nu.marginalia.slop.SlopTable;
import nu.marginalia.worklog.BatchingWorkLogInspector; import nu.marginalia.worklog.BatchingWorkLogInspector;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -43,31 +43,34 @@ public class LoaderInputData {
lastGoodBatch.put(singleSource, lastBatch); lastGoodBatch.put(singleSource, lastBatch);
} }
public Collection<SlopPageRef<SlopDomainRecord>> listDomainPages() { public Collection<SlopTable.Ref<SlopDomainRecord>> listDomainPages() {
List<SlopPageRef<SlopDomainRecord>> pathsAll = new ArrayList<>(); List<SlopTable.Ref<SlopDomainRecord>> pathsAll = new ArrayList<>();
for (var source : sourceDirectories) { for (var source : sourceDirectories) {
for (int i = 0; i < lastGoodBatch.get(source); i++) { for (int i = 0; i < lastGoodBatch.get(source); i++) {
pathsAll.add(new SlopPageRef<>(ProcessedDataFileNames.domainFileName(source), i)); pathsAll.add(new SlopTable.Ref<>(ProcessedDataFileNames.domainFileName(source), i));
} }
} }
return pathsAll; return pathsAll;
} }
public Collection<SlopPageRef<SlopDomainLinkRecord>> listDomainLinkPages() { public Collection<SlopTable.Ref<SlopDomainLinkRecord>> listDomainLinkPages() {
List<SlopPageRef<SlopDomainLinkRecord>> pathsAll = new ArrayList<>(); List<SlopTable.Ref<SlopDomainLinkRecord>> pathsAll = new ArrayList<>();
for (var source : sourceDirectories) { for (var source : sourceDirectories) {
for (int i = 0; i < lastGoodBatch.get(source); i++) { for (int i = 0; i < lastGoodBatch.get(source); i++) {
pathsAll.add(new SlopPageRef<>(ProcessedDataFileNames.domainLinkFileName(source), i)); pathsAll.add(new SlopTable.Ref<>(ProcessedDataFileNames.domainLinkFileName(source), i));
} }
} }
return pathsAll; return pathsAll;
} }
public Collection<SlopPageRef<SlopDocumentRecord>> listDocumentFiles() { public Collection<SlopTable.Ref<SlopDocumentRecord>> listDocumentFiles() {
List<SlopPageRef<SlopDocumentRecord>> pathsAll = new ArrayList<>(); List<SlopTable.Ref<SlopDocumentRecord>> pathsAll = new ArrayList<>();
for (var source : sourceDirectories) { for (var source : sourceDirectories) {
for (int i = 0; i < lastGoodBatch.get(source); i++) { for (int i = 0; i < lastGoodBatch.get(source); i++) {
pathsAll.add(new SlopPageRef<>(ProcessedDataFileNames.documentFileName(source), i)); pathsAll.add(new SlopTable.Ref<>(ProcessedDataFileNames.documentFileName(source), i));
} }
} }
return pathsAll; return pathsAll;

View File

@ -10,8 +10,8 @@ import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.id.UrlIdCodec; import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.model.processed.SlopDocumentRecord; import nu.marginalia.model.processed.SlopDocumentRecord;
import nu.marginalia.model.processed.SlopPageRef;
import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.slop.SlopTable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -38,7 +38,7 @@ public class DocumentLoaderService {
LoaderInputData inputData) LoaderInputData inputData)
throws IOException, SQLException throws IOException, SQLException
{ {
Collection<SlopPageRef<SlopDocumentRecord>> pageRefs = inputData.listDocumentFiles(); Collection<SlopTable.Ref<SlopDocumentRecord>> pageRefs = inputData.listDocumentFiles();
try (var taskHeartbeat = processHeartbeat.createAdHocTaskHeartbeat("DOCUMENTS")) { try (var taskHeartbeat = processHeartbeat.createAdHocTaskHeartbeat("DOCUMENTS")) {

View File

@ -7,8 +7,8 @@ import nu.marginalia.loading.LoaderInputData;
import nu.marginalia.loading.domains.DomainIdRegistry; import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.model.id.UrlIdCodec; import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.model.processed.SlopDocumentRecord; import nu.marginalia.model.processed.SlopDocumentRecord;
import nu.marginalia.model.processed.SlopPageRef;
import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.slop.SlopTable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -30,10 +30,10 @@ public class KeywordLoaderService {
LoaderInputData inputData) throws IOException { LoaderInputData inputData) throws IOException {
try (var task = heartbeat.createAdHocTaskHeartbeat("KEYWORDS")) { try (var task = heartbeat.createAdHocTaskHeartbeat("KEYWORDS")) {
Collection<SlopPageRef<SlopDocumentRecord>> documentFiles = inputData.listDocumentFiles(); Collection<SlopTable.Ref<SlopDocumentRecord>> documentFiles = inputData.listDocumentFiles();
int processed = 0; int processed = 0;
for (SlopPageRef<SlopDocumentRecord> pageRef : documentFiles) { for (SlopTable.Ref<SlopDocumentRecord> pageRef : documentFiles) {
task.progress("LOAD", processed++, documentFiles.size()); task.progress("LOAD", processed++, documentFiles.size());
try (var keywordsReader = new SlopDocumentRecord.KeywordsProjectionReader(pageRef)) { try (var keywordsReader = new SlopDocumentRecord.KeywordsProjectionReader(pageRef)) {

View File

@ -8,9 +8,9 @@ import nu.marginalia.loading.LoaderInputData;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.processed.SlopDomainLinkRecord; import nu.marginalia.model.processed.SlopDomainLinkRecord;
import nu.marginalia.model.processed.SlopDomainRecord; import nu.marginalia.model.processed.SlopDomainRecord;
import nu.marginalia.model.processed.SlopPageRef;
import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.control.ProcessHeartbeatImpl; import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.slop.SlopTable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -59,8 +59,8 @@ public class DomainLoaderService {
{ {
taskHeartbeat.progress(Steps.PREP_DATA); taskHeartbeat.progress(Steps.PREP_DATA);
Collection<SlopPageRef<SlopDomainRecord>> domainPageRefs = inputData.listDomainPages(); Collection<SlopTable.Ref<SlopDomainRecord>> domainPageRefs = inputData.listDomainPages();
Collection<SlopPageRef<SlopDomainLinkRecord>> domainLinkPageRefs = inputData.listDomainLinkPages(); Collection<SlopTable.Ref<SlopDomainLinkRecord>> domainLinkPageRefs = inputData.listDomainLinkPages();
// Ensure that the domains we've just crawled are in the domain database to this node // Ensure that the domains we've just crawled are in the domain database to this node
try (var inserter = new DomainInserter(conn, nodeId); try (var inserter = new DomainInserter(conn, nodeId);
@ -68,7 +68,7 @@ public class DomainLoaderService {
// Add domain names from this data set with the current node affinity // Add domain names from this data set with the current node affinity
int pageIdx = 0; int pageIdx = 0;
for (SlopPageRef<SlopDomainRecord> page : inputData.listDomainPages()) { for (SlopTable.Ref<SlopDomainRecord> page : inputData.listDomainPages()) {
processHeartbeat.progress("INSERT", pageIdx++, domainPageRefs.size()); processHeartbeat.progress("INSERT", pageIdx++, domainPageRefs.size());
try (var reader = new SlopDomainRecord.DomainNameReader(page)) { try (var reader = new SlopDomainRecord.DomainNameReader(page)) {
@ -89,7 +89,7 @@ public class DomainLoaderService {
// Add linked domains, but with -1 affinity meaning they can be grabbed by any index node // Add linked domains, but with -1 affinity meaning they can be grabbed by any index node
int pageIdx = 0; int pageIdx = 0;
for (SlopPageRef<SlopDomainLinkRecord> page : inputData.listDomainLinkPages()) { for (SlopTable.Ref<SlopDomainLinkRecord> page : inputData.listDomainLinkPages()) {
processHeartbeat.progress("INSERT", pageIdx++, domainLinkPageRefs.size()); processHeartbeat.progress("INSERT", pageIdx++, domainLinkPageRefs.size());
try (var reader = new SlopDomainLinkRecord.Reader(page)) { try (var reader = new SlopDomainLinkRecord.Reader(page)) {
@ -111,7 +111,7 @@ public class DomainLoaderService {
// Update the node affinity and IP address for each domain // Update the node affinity and IP address for each domain
int pageIdx = 0; int pageIdx = 0;
for (SlopPageRef<SlopDomainRecord> page : inputData.listDomainPages()) { for (SlopTable.Ref<SlopDomainRecord> page : inputData.listDomainPages()) {
processHeartbeat.progress("UPDATE", pageIdx++, domainPageRefs.size()); processHeartbeat.progress("UPDATE", pageIdx++, domainPageRefs.size());
try (var updater = new DomainAffinityAndIpUpdater(conn, nodeId); try (var updater = new DomainAffinityAndIpUpdater(conn, nodeId);
@ -154,7 +154,7 @@ public class DomainLoaderService {
int processed = 0; int processed = 0;
Collection<SlopPageRef<SlopDomainRecord>> pages = inputData.listDomainPages(); Collection<SlopTable.Ref<SlopDomainRecord>> pages = inputData.listDomainPages();
for (var page : pages) { for (var page : pages) {
taskHeartbeat.progress("UPDATE-META", processed++, pages.size()); taskHeartbeat.progress("UPDATE-META", processed++, pages.size());

View File

@ -7,8 +7,8 @@ import nu.marginalia.linkgraph.io.DomainLinksWriter;
import nu.marginalia.loading.LoaderInputData; import nu.marginalia.loading.LoaderInputData;
import nu.marginalia.loading.domains.DomainIdRegistry; import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.model.processed.SlopDomainLinkRecord; import nu.marginalia.model.processed.SlopDomainLinkRecord;
import nu.marginalia.model.processed.SlopPageRef;
import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.slop.SlopTable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -34,7 +34,7 @@ public class DomainLinksLoaderService {
try (var task = heartbeat.createAdHocTaskHeartbeat("LINKS"); try (var task = heartbeat.createAdHocTaskHeartbeat("LINKS");
var linkLoader = new LinkLoader(domainIdRegistry)) var linkLoader = new LinkLoader(domainIdRegistry))
{ {
Collection<SlopPageRef<SlopDomainLinkRecord>> pageRefs = inputData.listDomainLinkPages(); Collection<SlopTable.Ref<SlopDomainLinkRecord>> pageRefs = inputData.listDomainLinkPages();
int processed = 0; int processed = 0;

View File

@ -226,7 +226,7 @@ dependencyResolutionManagement {
library('jetty-util','org.eclipse.jetty','jetty-util').version('9.4.54.v20240208') library('jetty-util','org.eclipse.jetty','jetty-util').version('9.4.54.v20240208')
library('jetty-servlet','org.eclipse.jetty','jetty-servlet').version('9.4.54.v20240208') library('jetty-servlet','org.eclipse.jetty','jetty-servlet').version('9.4.54.v20240208')
library('slop', 'nu.marginalia', 'slop').version('0.0.5-SNAPSHOT') library('slop', 'nu.marginalia', 'slop').version('0.0.7-SNAPSHOT')
bundle('jetty', ['jetty-server', 'jetty-util', 'jetty-servlet']) bundle('jetty', ['jetty-server', 'jetty-util', 'jetty-servlet'])