diff --git a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertAction.java b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertAction.java index 833ad3f0..17102c06 100644 --- a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertAction.java +++ b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertAction.java @@ -4,5 +4,6 @@ public enum ConvertAction { ConvertCrawlData, SideloadEncyclopedia, SideloadDirtree, + SideloadWarc, SideloadStackexchange } diff --git a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java index fffed79b..cf445e5a 100644 --- a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java +++ b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java @@ -38,6 +38,13 @@ public class ConvertRequest { destId, null); } + public static ConvertRequest forWarc(Path sourcePath, FileStorageId destId) { + return new ConvertRequest(ConvertAction.SideloadWarc, + sourcePath.toString(), + null, + destId, + null); + } public static ConvertRequest forStackexchange(Path sourcePath, FileStorageId destId) { return new ConvertRequest(ConvertAction.SideloadStackexchange, diff --git a/code/processes/converting-process/build.gradle b/code/processes/converting-process/build.gradle index faa952fb..58b0ecdd 100644 --- a/code/processes/converting-process/build.gradle +++ b/code/processes/converting-process/build.gradle @@ -65,6 +65,7 @@ dependencies { implementation libs.bundles.slf4j implementation libs.notnull + implementation libs.jwarc implementation libs.jsoup diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java index fb919018..50f29fb1 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java @@ -262,6 +262,14 @@ public class ConverterMain { processData.asPath(), msg, inbox); } + case SideloadWarc -> { + var processData = fileStorageService.getStorage(request.processedDataStorage); + + yield new SideloadAction( + sideloadSourceFactory.sideloadWarc(Path.of(request.inputSource)), + processData.asPath(), + msg, inbox); + } case SideloadStackexchange -> { var processData = fileStorageService.getStorage(request.processedDataStorage); diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSourceFactory.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSourceFactory.java index debc460f..48ab45c9 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSourceFactory.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSourceFactory.java @@ -6,6 +6,7 @@ import nu.marginalia.atags.source.AnchorTagsSourceFactory; import nu.marginalia.converting.sideload.dirtree.DirtreeSideloaderFactory; import nu.marginalia.converting.sideload.encyclopedia.EncyclopediaMarginaliaNuSideloader; import nu.marginalia.converting.sideload.stackexchange.StackexchangeSideloader; +import nu.marginalia.converting.sideload.warc.WarcSideloadFactory; import nu.marginalia.keyword.DocumentKeywordExtractor; import nu.marginalia.language.sentence.ThreadLocalSentenceExtractorProvider; @@ -22,6 +23,7 @@ public class SideloadSourceFactory { private final DocumentKeywordExtractor documentKeywordExtractor; private final AnchorTagsSourceFactory anchorTagsSourceFactory; private final DirtreeSideloaderFactory dirtreeSideloaderFactory; + private final WarcSideloadFactory warcSideloadFactory; @Inject public SideloadSourceFactory(Gson gson, @@ -29,13 +31,15 @@ public class SideloadSourceFactory { ThreadLocalSentenceExtractorProvider sentenceExtractorProvider, DocumentKeywordExtractor documentKeywordExtractor, AnchorTagsSourceFactory anchorTagsSourceFactory, - DirtreeSideloaderFactory dirtreeSideloaderFactory) { + DirtreeSideloaderFactory dirtreeSideloaderFactory, + WarcSideloadFactory warcSideloadFactory) { this.gson = gson; this.sideloaderProcessing = sideloaderProcessing; this.sentenceExtractorProvider = sentenceExtractorProvider; this.documentKeywordExtractor = documentKeywordExtractor; this.anchorTagsSourceFactory = anchorTagsSourceFactory; this.dirtreeSideloaderFactory = dirtreeSideloaderFactory; + this.warcSideloadFactory = warcSideloadFactory; } public SideloadSource sideloadEncyclopediaMarginaliaNu(Path pathToDbFile, String baseUrl) throws SQLException { @@ -46,6 +50,10 @@ public class SideloadSourceFactory { return dirtreeSideloaderFactory.createSideloaders(pathToYamlFile); } + public Collection sideloadWarc(Path pathToWarcFiles) throws IOException { + return warcSideloadFactory.createSideloaders(pathToWarcFiles); + } + /** Do not use, this code isn't finished */ public Collection sideloadStackexchange(Path pathToDbFileRoot) throws IOException { try (var dirs = Files.walk(pathToDbFileRoot)) { diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloadFactory.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloadFactory.java new file mode 100644 index 00000000..35fb6d3a --- /dev/null +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloadFactory.java @@ -0,0 +1,32 @@ +package nu.marginalia.converting.sideload.warc; + +import nu.marginalia.converting.sideload.SideloadSource; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public class WarcSideloadFactory { + + public Collection createSideloaders(Path pathToWarcFiles) throws IOException { + final List files = new ArrayList<>(); + + try (var stream = Files.list(pathToWarcFiles)) { + stream + .filter(Files::isRegularFile) + .filter(this::isWarcFile) + .forEach(files::add); + + } + // stub + return null; + } + + private boolean isWarcFile(Path path) { + return path.toString().endsWith(".warc") + || path.toString().endsWith(".warc.gz"); + } +} \ No newline at end of file diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloader.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloader.java new file mode 100644 index 00000000..73d29a30 --- /dev/null +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloader.java @@ -0,0 +1,141 @@ +package nu.marginalia.converting.sideload.warc; + +import lombok.SneakyThrows; +import nu.marginalia.atags.model.DomainLinks; +import nu.marginalia.converting.model.GeneratorType; +import nu.marginalia.converting.model.ProcessedDocument; +import nu.marginalia.converting.model.ProcessedDomain; +import nu.marginalia.converting.sideload.SideloadSource; +import nu.marginalia.converting.sideload.SideloaderProcessing; +import nu.marginalia.model.EdgeDomain; +import nu.marginalia.model.EdgeUrl; +import nu.marginalia.model.crawl.DomainIndexingState; +import org.netpreserve.jwarc.*; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.StreamSupport; + +public class WarcSideloader implements SideloadSource, AutoCloseable { + + private final Path warcFile; + private final SideloaderProcessing sideloaderProcessing; + + private final WarcReader reader; + + private final EdgeDomain domain; + + public WarcSideloader(Path warcFile, + SideloaderProcessing sideloaderProcessing) + throws IOException + { + this.warcFile = warcFile; + this.sideloaderProcessing = sideloaderProcessing; + this.reader = new WarcReader(warcFile); + this.domain = sniffDomainFromWarc() + .orElseThrow(() -> new IOException("Could not identify domain from warc file")); + } + + @SneakyThrows + @Override + public ProcessedDomain getDomain() { + var ret = new ProcessedDomain(); + + ret.domain = domain; + ret.ip = "0.0.0.0"; + ret.state = DomainIndexingState.ACTIVE; + + return ret; + } + + private Optional sniffDomainFromWarc() throws IOException { + try { + for (var record : reader) { + if (!(record instanceof WarcRequest request)) { + continue; + } + + String target = request.target(); + if (target.startsWith("http://") || target.startsWith("https://")) { + return Optional.of(new EdgeUrl(target).getDomain()); + } + } + } catch (URISyntaxException e) { + return Optional.empty(); + } finally { + reader.position(0); + } + return Optional.empty(); + } + + @SneakyThrows + @Override + public Iterator getDocumentsStream() { + return reader.records() + .filter(record -> record instanceof WarcResponse) + .map(WarcResponse.class::cast) + .filter(this::isRelevantResponse) + .map(this::process) + .iterator(); + } + + private boolean isRelevantResponse(WarcResponse warcResponse) { + try { + HttpResponse httpResponse = warcResponse.http(); + if (httpResponse == null) + return false; + if (httpResponse.status() != 200) + return false; + if (!Objects.equals(httpResponse.contentType(), MediaType.HTML)) + return false; + + var url = new EdgeUrl(warcResponse.target()); + if (!Objects.equals(url.getDomain(), domain)) { + return false; + } + + return true; + } catch (Exception e) { + e.printStackTrace(); + } + + return false; + } + + @SneakyThrows + private ProcessedDocument process(WarcResponse response) { + String body = getBody(response); + String url = response.target(); + + // We trim "/index.html"-suffixes from the index if they are present, + // since this is typically an artifact from document retrieval + if (url.endsWith("/index.html")) { + url = url.substring(0, url.length() - "index.html".length()); + } + + return sideloaderProcessing + .processDocument(url, body, List.of(), new DomainLinks(), + GeneratorType.DOCS, + 10_000); + } + + @SneakyThrows + private String getBody(WarcResponse response) { + var http = response.http(); + + // TODO: We should support additional encodings here + return new String(http.body().stream().readAllBytes(), StandardCharsets.UTF_8); + } + + @Override + public void close() throws Exception { + reader.close(); + } + +} diff --git a/code/processes/converting-process/src/test/java/nu/marginalia/converting/sideload/warc/WarcSideloaderTest.java b/code/processes/converting-process/src/test/java/nu/marginalia/converting/sideload/warc/WarcSideloaderTest.java new file mode 100644 index 00000000..dfa3c972 --- /dev/null +++ b/code/processes/converting-process/src/test/java/nu/marginalia/converting/sideload/warc/WarcSideloaderTest.java @@ -0,0 +1,37 @@ +package nu.marginalia.converting.sideload.warc; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import nu.marginalia.converting.ConverterModule; +import nu.marginalia.converting.processor.ConverterDomainTypes; +import nu.marginalia.converting.sideload.SideloaderProcessing; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.nio.file.Path; + +import static org.mockito.Mockito.when; + +class WarcSideloaderTest { + @Test + public void test() throws IOException { + var domainTypesMock = Mockito.mock(ConverterDomainTypes.class); + when(domainTypesMock.isBlog(Mockito.any())).thenReturn(false); + + var processing = Guice.createInjector(new ConverterModule(), + new AbstractModule() { + public void configure() { + bind(ConverterDomainTypes.class).toInstance(domainTypesMock); + } + } + ) + .getInstance(SideloaderProcessing.class); + + var sideloader = new WarcSideloader(Path.of("/home/vlofgren/marginalia.warc.gz"), processing); + + var domain = sideloader.getDomain(); + System.out.println(domain); + sideloader.getDocumentsStream().forEachRemaining(System.out::println); + } +} \ No newline at end of file diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertActor.java index 275f4092..4af4852e 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertActor.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertActor.java @@ -32,6 +32,7 @@ public class ConvertActor extends RecordActorPrototype { public record Convert(FileStorageId fid) implements ActorStep {}; public record ConvertEncyclopedia(String source, String baseUrl) implements ActorStep {}; public record ConvertDirtree(String source) implements ActorStep {}; + public record ConvertWarc(String source) implements ActorStep {}; public record ConvertStackexchange(String source) implements ActorStep {}; @Resume(behavior = ActorResumeBehavior.RETRY) public record ConvertWait(FileStorageId destFid, @@ -74,6 +75,25 @@ public class ConvertActor extends RecordActorPrototype { mqConverterOutbox.sendAsync(ConvertRequest.forDirtree(sourcePath, processedArea.id())) ); } + case ConvertWarc(String source) -> { + Path sourcePath = Path.of(source); + if (!Files.exists(sourcePath)) + yield new Error("Source path does not exist: " + sourcePath); + + String fileName = sourcePath.toFile().getName(); + + var base = storageService.getStorageBase(FileStorageBaseType.STORAGE); + var processedArea = storageService.allocateTemporaryStorage(base, + FileStorageType.PROCESSED_DATA, "processed-data", + "Processed Warc Data; " + fileName); + + storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW); + + yield new ConvertWait( + processedArea.id(), + mqConverterOutbox.sendAsync(ConvertRequest.forWarc(sourcePath, processedArea.id())) + ); + } case ConvertEncyclopedia(String source, String baseUrl) -> { Path sourcePath = Path.of(source); diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexQueryService.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexQueryService.java index b8bf0a5a..f00bace2 100644 --- a/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexQueryService.java +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexQueryService.java @@ -170,6 +170,7 @@ public class IndexQueryService extends IndexApiImplBase { } } + // GRPC endpoint @SneakyThrows public void query(nu.marginalia.index.api.RpcIndexQuery request, diff --git a/settings.gradle b/settings.gradle index 952acd9c..4814f7e7 100644 --- a/settings.gradle +++ b/settings.gradle @@ -153,6 +153,8 @@ dependencyResolutionManagement { library('duckdb', 'org.duckdb', 'duckdb_jdbc').version('0.9.1') library('okhttp3','com.squareup.okhttp3','okhttp').version('4.11.0') + library('jwarc', 'org.netpreserve', 'jwarc').version('0.28.4') + library('httpcomponents.core','org.apache.httpcomponents','httpcore').version('4.4.15') library('httpcomponents.client','org.apache.httpcomponents','httpclient').version('4.5.13') library('commons.net', 'commons-net','commons-net').version('3.9.0')