(loader) Minor optimizations and bugfixes.

* Reduce memory churn in LoaderIndexJournalWriter, fix bug with keyword mappings as well
* Remove remains of OldDomains
* Ensure LOADER_PROCESS_OPTS gets fed to the processes
* LinkdbStatusWriter no longer executes the batch after every added item once more than 100 items have been added
This commit is contained in:
Viktor Lofgren 2023-08-29 15:35:17 +02:00
parent fa87c7e1b7
commit dd593c292c
7 changed files with 22 additions and 77 deletions

View File

@ -47,7 +47,8 @@ public class LinkdbStatusWriter {
stmt.setString(4, status.description());
}
stmt.addBatch();
if (++count > 100) {
if (++count > 1000) {
count = 0;
stmt.executeBatch();
}
}

View File

@ -1,15 +1,12 @@
package nu.marginalia.index.journal.reader;
import nu.marginalia.index.journal.model.IndexJournalEntryData;
import nu.marginalia.index.journal.model.IndexJournalFileHeader;
import nu.marginalia.index.journal.model.IndexJournalStatistics;
import nu.marginalia.model.idx.WordFlags;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.function.IntConsumer;
import java.util.function.LongConsumer;
import java.util.function.Predicate;
@ -20,27 +17,11 @@ public interface IndexJournalReader extends Iterable<IndexJournalReadEntry> {
static IndexJournalReader singleFile(Path fileName) throws IOException {
return new IndexJournalReaderSingleCompressedFile(fileName);
}
static IndexJournalReader paging(Path baseDir) throws IOException {
return new IndexJournalReaderPagingImpl(baseDir);
}
static IndexJournalReader withFilters(Path path, Predicate<IndexJournalReadEntry> entryPredicate, Predicate<IndexJournalEntryData.Record> recordPredicate) throws IOException {
return new IndexJournalReaderSingleCompressedFile(path, entryPredicate, recordPredicate);
}
void forEachWordId(LongConsumer consumer);
void forEachDocIdRecord(LongObjectConsumer<IndexJournalEntryData.Record> consumer);
void forEachDocId(LongConsumer consumer);
@NotNull
@Override
Iterator<IndexJournalReadEntry> iterator();
void close() throws IOException;
static IndexJournalReader singleFileWithPriorityFilters(Path path) throws IOException {
long highPriorityFlags =
@ -57,6 +38,18 @@ public interface IndexJournalReader extends Iterable<IndexJournalReadEntry> {
r -> (r.metadata() & highPriorityFlags) != 0);
}
void forEachWordId(LongConsumer consumer);
void forEachDocIdRecord(LongObjectConsumer<IndexJournalEntryData.Record> consumer);
void forEachDocId(LongConsumer consumer);
@NotNull
@Override
Iterator<IndexJournalReadEntry> iterator();
void close() throws IOException;
interface LongObjectConsumer<T> {

View File

@ -77,7 +77,7 @@ public class Loader implements Interpreter, AutoCloseable {
public void loadProcessedDocument(LoadProcessedDocument document) {
processedDocumentList.add(document);
if (processedDocumentList.size() > 100) {
if (processedDocumentList.size() > 1000) {
loadProcessedDocument.load(data, processedDocumentList);
processedDocumentList.clear();
}
@ -86,7 +86,7 @@ public class Loader implements Interpreter, AutoCloseable {
public void loadProcessedDocumentWithError(LoadProcessedDocumentWithError document) {
processedDocumentWithErrorList.add(document);
if (processedDocumentWithErrorList.size() > 100) {
if (processedDocumentWithErrorList.size() > 1000) {
loadProcessedDocument.loadWithError(data, processedDocumentWithErrorList);
processedDocumentWithErrorList.clear();
}

View File

@ -3,7 +3,6 @@ package nu.marginalia.loading.loader;
import com.google.inject.Inject;
public class LoaderFactory {
private final OldDomains oldDomains;
private final SqlLoadDomains sqlLoadDomains;
private final SqlLoadDomainLinks sqlLoadDomainLinks;
private final SqlLoadProcessedDomain sqlLoadProcessedDomain;
@ -12,15 +11,12 @@ public class LoaderFactory {
private final IndexLoadKeywords indexLoadKeywords;
@Inject
public LoaderFactory(OldDomains oldDomains,
SqlLoadDomains sqlLoadDomains,
public LoaderFactory(SqlLoadDomains sqlLoadDomains,
SqlLoadDomainLinks sqlLoadDomainLinks,
SqlLoadProcessedDomain sqlLoadProcessedDomain,
LdbLoadProcessedDocument sqlLoadProcessedDocument,
SqlLoadDomainMetadata sqlLoadDomainMetadata,
IndexLoadKeywords indexLoadKeywords) {
this.oldDomains = oldDomains;
this.sqlLoadDomains = sqlLoadDomains;
this.sqlLoadDomainLinks = sqlLoadDomainLinks;
this.sqlLoadProcessedDomain = sqlLoadProcessedDomain;

View File

@ -43,7 +43,7 @@ public class LoaderIndexJournalWriter {
}
MurmurHash3_128 hasher = new MurmurHash3_128();
long[] buffer = new long[MAX_LENGTH * 2];
@SneakyThrows
public void putWords(long combinedId,
int features,
@ -60,18 +60,14 @@ public class LoaderIndexJournalWriter {
}
String[] words = wordSet.keywords();
long[] wordIds = new long[wordSet.size()];
long[] meta = wordSet.metadata();
Arrays.parallelSetAll(wordIds, i -> hasher.hashNearlyASCII(words[i]));
long[] buffer = new long[MAX_LENGTH * 2];
for (int start = 0; start < words.length; ) {
int end = Math.min(start + MAX_LENGTH, words.length);
for (int i = 0; i < end - start; i++) {
buffer[2*i] = wordIds[i];
buffer[2*i + 1] = meta[i];
buffer[2*i] = hasher.hashNearlyASCII(words[start+i]);
buffer[2*i + 1] = meta[start+i];
}
var entry = new IndexJournalEntryData(end-start, buffer);

View File

@ -1,41 +0,0 @@
package nu.marginalia.loading.loader;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import gnu.trove.map.hash.TObjectIntHashMap;
import nu.marginalia.model.EdgeDomain;
import java.sql.SQLException;
import static java.sql.Statement.SUCCESS_NO_INFO;
/**
 * In-memory lookup from a domain to its numeric id, populated from the
 * EC_DOMAIN table at construction time and extendable afterwards via
 * {@link #add(EdgeDomain, int)}.
 */
public class OldDomains {
    // Third constructor argument (-1) is the map's "no entry" value.
    private final TObjectIntHashMap<EdgeDomain> knownDomains = new TObjectIntHashMap<>(100_000, 0.75f, -1);

    /**
     * Reads every (DOMAIN_NAME, ID) row from EC_DOMAIN into the lookup map.
     *
     * @param dataSource source of the connection used for the one-off query
     * @throws RuntimeException wrapping the {@link SQLException} if the
     *         connection or the query fails
     */
    @Inject
    public OldDomains(HikariDataSource dataSource) {
        try (var connection = dataSource.getConnection();
             var query = connection.prepareStatement("""
                     SELECT DOMAIN_NAME, ID FROM EC_DOMAIN
                     """))
        {
            var rows = query.executeQuery();
            while (rows.next()) {
                var domain = new EdgeDomain(rows.getString(1));
                int id = rows.getInt(2);
                knownDomains.put(domain, id);
            }
        }
        catch (SQLException e) {
            throw new RuntimeException("Failed to set up loader", e);
        }
    }

    /**
     * Looks up the id recorded for the given domain.
     *
     * @param domain the domain to look up
     * @return the stored id, or the map's no-entry value when the domain is not present
     */
    public int getId(EdgeDomain domain) {
        return knownDomains.get(domain);
    }

    /**
     * Records (or overwrites) the id associated with a domain.
     *
     * @param domain the domain to register
     * @param id the id to associate with it
     */
    public void add(EdgeDomain domain, int id) {
        knownDomains.put(domain, id);
    }
}

View File

@ -119,7 +119,7 @@ public class ProcessService {
private final List<String> propagatedEnvironmentVariables = List.of(
"JAVA_HOME",
"CONVERTER_PROCESS_OPTS",
// "LOADER_PROCESS_OPTS",
"LOADER_PROCESS_OPTS",
"CRAWLER_PROCESS_OPTS");
private String[] createEnvironmentVariables() {