mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-02-22 20:48:59 +00:00
(convert) Add basic support for Warc file sideloading
This update includes the integration of the jwarc library and implements support for Warc file sideloading, as a first trial integration with this library.
This commit is contained in:
parent
156c067f79
commit
cc813a5624
@ -4,5 +4,6 @@ public enum ConvertAction {
|
||||
ConvertCrawlData,
|
||||
SideloadEncyclopedia,
|
||||
SideloadDirtree,
|
||||
SideloadWarc,
|
||||
SideloadStackexchange
|
||||
}
|
||||
|
@ -38,6 +38,13 @@ public class ConvertRequest {
|
||||
destId,
|
||||
null);
|
||||
}
|
||||
public static ConvertRequest forWarc(Path sourcePath, FileStorageId destId) {
|
||||
return new ConvertRequest(ConvertAction.SideloadWarc,
|
||||
sourcePath.toString(),
|
||||
null,
|
||||
destId,
|
||||
null);
|
||||
}
|
||||
|
||||
public static ConvertRequest forStackexchange(Path sourcePath, FileStorageId destId) {
|
||||
return new ConvertRequest(ConvertAction.SideloadStackexchange,
|
||||
|
@ -65,6 +65,7 @@ dependencies {
|
||||
implementation libs.bundles.slf4j
|
||||
|
||||
implementation libs.notnull
|
||||
implementation libs.jwarc
|
||||
|
||||
implementation libs.jsoup
|
||||
|
||||
|
@ -262,6 +262,14 @@ public class ConverterMain {
|
||||
processData.asPath(),
|
||||
msg, inbox);
|
||||
}
|
||||
case SideloadWarc -> {
|
||||
var processData = fileStorageService.getStorage(request.processedDataStorage);
|
||||
|
||||
yield new SideloadAction(
|
||||
sideloadSourceFactory.sideloadWarc(Path.of(request.inputSource)),
|
||||
processData.asPath(),
|
||||
msg, inbox);
|
||||
}
|
||||
case SideloadStackexchange -> {
|
||||
var processData = fileStorageService.getStorage(request.processedDataStorage);
|
||||
|
||||
|
@ -6,6 +6,7 @@ import nu.marginalia.atags.source.AnchorTagsSourceFactory;
|
||||
import nu.marginalia.converting.sideload.dirtree.DirtreeSideloaderFactory;
|
||||
import nu.marginalia.converting.sideload.encyclopedia.EncyclopediaMarginaliaNuSideloader;
|
||||
import nu.marginalia.converting.sideload.stackexchange.StackexchangeSideloader;
|
||||
import nu.marginalia.converting.sideload.warc.WarcSideloadFactory;
|
||||
import nu.marginalia.keyword.DocumentKeywordExtractor;
|
||||
import nu.marginalia.language.sentence.ThreadLocalSentenceExtractorProvider;
|
||||
|
||||
@ -22,6 +23,7 @@ public class SideloadSourceFactory {
|
||||
private final DocumentKeywordExtractor documentKeywordExtractor;
|
||||
private final AnchorTagsSourceFactory anchorTagsSourceFactory;
|
||||
private final DirtreeSideloaderFactory dirtreeSideloaderFactory;
|
||||
private final WarcSideloadFactory warcSideloadFactory;
|
||||
|
||||
@Inject
|
||||
public SideloadSourceFactory(Gson gson,
|
||||
@ -29,13 +31,15 @@ public class SideloadSourceFactory {
|
||||
ThreadLocalSentenceExtractorProvider sentenceExtractorProvider,
|
||||
DocumentKeywordExtractor documentKeywordExtractor,
|
||||
AnchorTagsSourceFactory anchorTagsSourceFactory,
|
||||
DirtreeSideloaderFactory dirtreeSideloaderFactory) {
|
||||
DirtreeSideloaderFactory dirtreeSideloaderFactory,
|
||||
WarcSideloadFactory warcSideloadFactory) {
|
||||
this.gson = gson;
|
||||
this.sideloaderProcessing = sideloaderProcessing;
|
||||
this.sentenceExtractorProvider = sentenceExtractorProvider;
|
||||
this.documentKeywordExtractor = documentKeywordExtractor;
|
||||
this.anchorTagsSourceFactory = anchorTagsSourceFactory;
|
||||
this.dirtreeSideloaderFactory = dirtreeSideloaderFactory;
|
||||
this.warcSideloadFactory = warcSideloadFactory;
|
||||
}
|
||||
|
||||
public SideloadSource sideloadEncyclopediaMarginaliaNu(Path pathToDbFile, String baseUrl) throws SQLException {
|
||||
@ -46,6 +50,10 @@ public class SideloadSourceFactory {
|
||||
return dirtreeSideloaderFactory.createSideloaders(pathToYamlFile);
|
||||
}
|
||||
|
||||
public Collection<? extends SideloadSource> sideloadWarc(Path pathToWarcFiles) throws IOException {
|
||||
return warcSideloadFactory.createSideloaders(pathToWarcFiles);
|
||||
}
|
||||
|
||||
/** Do not use, this code isn't finished */
|
||||
public Collection<? extends SideloadSource> sideloadStackexchange(Path pathToDbFileRoot) throws IOException {
|
||||
try (var dirs = Files.walk(pathToDbFileRoot)) {
|
||||
|
@ -0,0 +1,32 @@
|
||||
package nu.marginalia.converting.sideload.warc;
|
||||
|
||||
import nu.marginalia.converting.sideload.SideloadSource;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
public class WarcSideloadFactory {
|
||||
|
||||
public Collection<? extends SideloadSource> createSideloaders(Path pathToWarcFiles) throws IOException {
|
||||
final List<Path> files = new ArrayList<>();
|
||||
|
||||
try (var stream = Files.list(pathToWarcFiles)) {
|
||||
stream
|
||||
.filter(Files::isRegularFile)
|
||||
.filter(this::isWarcFile)
|
||||
.forEach(files::add);
|
||||
|
||||
}
|
||||
// stub
|
||||
return null;
|
||||
}
|
||||
|
||||
private boolean isWarcFile(Path path) {
|
||||
return path.toString().endsWith(".warc")
|
||||
|| path.toString().endsWith(".warc.gz");
|
||||
}
|
||||
}
|
@ -0,0 +1,141 @@
|
||||
package nu.marginalia.converting.sideload.warc;
|
||||
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.atags.model.DomainLinks;
|
||||
import nu.marginalia.converting.model.GeneratorType;
|
||||
import nu.marginalia.converting.model.ProcessedDocument;
|
||||
import nu.marginalia.converting.model.ProcessedDomain;
|
||||
import nu.marginalia.converting.sideload.SideloadSource;
|
||||
import nu.marginalia.converting.sideload.SideloaderProcessing;
|
||||
import nu.marginalia.model.EdgeDomain;
|
||||
import nu.marginalia.model.EdgeUrl;
|
||||
import nu.marginalia.model.crawl.DomainIndexingState;
|
||||
import org.netpreserve.jwarc.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
public class WarcSideloader implements SideloadSource, AutoCloseable {
|
||||
|
||||
private final Path warcFile;
|
||||
private final SideloaderProcessing sideloaderProcessing;
|
||||
|
||||
private final WarcReader reader;
|
||||
|
||||
private final EdgeDomain domain;
|
||||
|
||||
public WarcSideloader(Path warcFile,
|
||||
SideloaderProcessing sideloaderProcessing)
|
||||
throws IOException
|
||||
{
|
||||
this.warcFile = warcFile;
|
||||
this.sideloaderProcessing = sideloaderProcessing;
|
||||
this.reader = new WarcReader(warcFile);
|
||||
this.domain = sniffDomainFromWarc()
|
||||
.orElseThrow(() -> new IOException("Could not identify domain from warc file"));
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public ProcessedDomain getDomain() {
|
||||
var ret = new ProcessedDomain();
|
||||
|
||||
ret.domain = domain;
|
||||
ret.ip = "0.0.0.0";
|
||||
ret.state = DomainIndexingState.ACTIVE;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
private Optional<EdgeDomain> sniffDomainFromWarc() throws IOException {
|
||||
try {
|
||||
for (var record : reader) {
|
||||
if (!(record instanceof WarcRequest request)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String target = request.target();
|
||||
if (target.startsWith("http://") || target.startsWith("https://")) {
|
||||
return Optional.of(new EdgeUrl(target).getDomain());
|
||||
}
|
||||
}
|
||||
} catch (URISyntaxException e) {
|
||||
return Optional.empty();
|
||||
} finally {
|
||||
reader.position(0);
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public Iterator<ProcessedDocument> getDocumentsStream() {
|
||||
return reader.records()
|
||||
.filter(record -> record instanceof WarcResponse)
|
||||
.map(WarcResponse.class::cast)
|
||||
.filter(this::isRelevantResponse)
|
||||
.map(this::process)
|
||||
.iterator();
|
||||
}
|
||||
|
||||
private boolean isRelevantResponse(WarcResponse warcResponse) {
|
||||
try {
|
||||
HttpResponse httpResponse = warcResponse.http();
|
||||
if (httpResponse == null)
|
||||
return false;
|
||||
if (httpResponse.status() != 200)
|
||||
return false;
|
||||
if (!Objects.equals(httpResponse.contentType(), MediaType.HTML))
|
||||
return false;
|
||||
|
||||
var url = new EdgeUrl(warcResponse.target());
|
||||
if (!Objects.equals(url.getDomain(), domain)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
private ProcessedDocument process(WarcResponse response) {
|
||||
String body = getBody(response);
|
||||
String url = response.target();
|
||||
|
||||
// We trim "/index.html"-suffixes from the index if they are present,
|
||||
// since this is typically an artifact from document retrieval
|
||||
if (url.endsWith("/index.html")) {
|
||||
url = url.substring(0, url.length() - "index.html".length());
|
||||
}
|
||||
|
||||
return sideloaderProcessing
|
||||
.processDocument(url, body, List.of(), new DomainLinks(),
|
||||
GeneratorType.DOCS,
|
||||
10_000);
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
private String getBody(WarcResponse response) {
|
||||
var http = response.http();
|
||||
|
||||
// TODO: We should support additional encodings here
|
||||
return new String(http.body().stream().readAllBytes(), StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
reader.close();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,37 @@
|
||||
package nu.marginalia.converting.sideload.warc;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Guice;
|
||||
import nu.marginalia.converting.ConverterModule;
|
||||
import nu.marginalia.converting.processor.ConverterDomainTypes;
|
||||
import nu.marginalia.converting.sideload.SideloaderProcessing;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
class WarcSideloaderTest {
|
||||
@Test
|
||||
public void test() throws IOException {
|
||||
var domainTypesMock = Mockito.mock(ConverterDomainTypes.class);
|
||||
when(domainTypesMock.isBlog(Mockito.any())).thenReturn(false);
|
||||
|
||||
var processing = Guice.createInjector(new ConverterModule(),
|
||||
new AbstractModule() {
|
||||
public void configure() {
|
||||
bind(ConverterDomainTypes.class).toInstance(domainTypesMock);
|
||||
}
|
||||
}
|
||||
)
|
||||
.getInstance(SideloaderProcessing.class);
|
||||
|
||||
var sideloader = new WarcSideloader(Path.of("/home/vlofgren/marginalia.warc.gz"), processing);
|
||||
|
||||
var domain = sideloader.getDomain();
|
||||
System.out.println(domain);
|
||||
sideloader.getDocumentsStream().forEachRemaining(System.out::println);
|
||||
}
|
||||
}
|
@ -32,6 +32,7 @@ public class ConvertActor extends RecordActorPrototype {
|
||||
public record Convert(FileStorageId fid) implements ActorStep {};
|
||||
public record ConvertEncyclopedia(String source, String baseUrl) implements ActorStep {};
|
||||
public record ConvertDirtree(String source) implements ActorStep {};
|
||||
public record ConvertWarc(String source) implements ActorStep {};
|
||||
public record ConvertStackexchange(String source) implements ActorStep {};
|
||||
@Resume(behavior = ActorResumeBehavior.RETRY)
|
||||
public record ConvertWait(FileStorageId destFid,
|
||||
@ -74,6 +75,25 @@ public class ConvertActor extends RecordActorPrototype {
|
||||
mqConverterOutbox.sendAsync(ConvertRequest.forDirtree(sourcePath, processedArea.id()))
|
||||
);
|
||||
}
|
||||
case ConvertWarc(String source) -> {
|
||||
Path sourcePath = Path.of(source);
|
||||
if (!Files.exists(sourcePath))
|
||||
yield new Error("Source path does not exist: " + sourcePath);
|
||||
|
||||
String fileName = sourcePath.toFile().getName();
|
||||
|
||||
var base = storageService.getStorageBase(FileStorageBaseType.STORAGE);
|
||||
var processedArea = storageService.allocateTemporaryStorage(base,
|
||||
FileStorageType.PROCESSED_DATA, "processed-data",
|
||||
"Processed Warc Data; " + fileName);
|
||||
|
||||
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
|
||||
|
||||
yield new ConvertWait(
|
||||
processedArea.id(),
|
||||
mqConverterOutbox.sendAsync(ConvertRequest.forWarc(sourcePath, processedArea.id()))
|
||||
);
|
||||
}
|
||||
case ConvertEncyclopedia(String source, String baseUrl) -> {
|
||||
|
||||
Path sourcePath = Path.of(source);
|
||||
|
@ -170,6 +170,7 @@ public class IndexQueryService extends IndexApiImplBase {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// GRPC endpoint
|
||||
@SneakyThrows
|
||||
public void query(nu.marginalia.index.api.RpcIndexQuery request,
|
||||
|
@ -153,6 +153,8 @@ dependencyResolutionManagement {
|
||||
library('duckdb', 'org.duckdb', 'duckdb_jdbc').version('0.9.1')
|
||||
library('okhttp3','com.squareup.okhttp3','okhttp').version('4.11.0')
|
||||
|
||||
library('jwarc', 'org.netpreserve', 'jwarc').version('0.28.4')
|
||||
|
||||
library('httpcomponents.core','org.apache.httpcomponents','httpcore').version('4.4.15')
|
||||
library('httpcomponents.client','org.apache.httpcomponents','httpclient').version('4.5.13')
|
||||
library('commons.net', 'commons-net','commons-net').version('3.9.0')
|
||||
|
Loading…
Reference in New Issue
Block a user