diff --git a/code/execution/java/nu/marginalia/actor/ExecutorActor.java b/code/execution/java/nu/marginalia/actor/ExecutorActor.java index 211a7144..0a74fc43 100644 --- a/code/execution/java/nu/marginalia/actor/ExecutorActor.java +++ b/code/execution/java/nu/marginalia/actor/ExecutorActor.java @@ -20,6 +20,7 @@ public enum ExecutorActor { EXPORT_FEEDS(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), EXPORT_SAMPLE_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), DOWNLOAD_SAMPLE(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), + MIGRATE_CRAWL_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED), PROC_CONVERTER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.SIDELOAD), PROC_LOADER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.SIDELOAD), diff --git a/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java b/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java index 04daf704..923888c2 100644 --- a/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java +++ b/code/execution/java/nu/marginalia/actor/ExecutorActorControlService.java @@ -66,6 +66,7 @@ public class ExecutorActorControlService { DownloadSampleActor downloadSampleActor, ScrapeFeedsActor scrapeFeedsActor, ExecutorActorStateMachines stateMachines, + MigrateCrawlDataActor migrateCrawlDataActor, ExportAllPrecessionActor exportAllPrecessionActor, UpdateRssActor updateRssActor) throws SQLException { this.messageQueueFactory = messageQueueFactory; @@ -107,6 +108,8 @@ public class ExecutorActorControlService { register(ExecutorActor.SCRAPE_FEEDS, scrapeFeedsActor); register(ExecutorActor.UPDATE_RSS, updateRssActor); + register(ExecutorActor.MIGRATE_CRAWL_DATA, migrateCrawlDataActor); + if (serviceConfiguration.node() == 1) { register(ExecutorActor.PREC_EXPORT_ALL, exportAllPrecessionActor); } diff --git a/code/execution/java/nu/marginalia/actor/task/MigrateCrawlDataActor.java b/code/execution/java/nu/marginalia/actor/task/MigrateCrawlDataActor.java new file mode 100644 index 00000000..15e6ddb2 --- /dev/null +++ b/code/execution/java/nu/marginalia/actor/task/MigrateCrawlDataActor.java @@ -0,0 +1,130 @@ +package nu.marginalia.actor.task; + +import com.google.gson.Gson; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import nu.marginalia.actor.prototype.RecordActorPrototype; +import nu.marginalia.actor.state.ActorStep; +import nu.marginalia.io.CrawlerOutputFile; +import nu.marginalia.process.log.WorkLog; +import nu.marginalia.process.log.WorkLogEntry; +import nu.marginalia.slop.SlopCrawlDataRecord; +import nu.marginalia.storage.FileStorageService; +import nu.marginalia.storage.model.FileStorage; +import nu.marginalia.storage.model.FileStorageId; +import org.apache.logging.log4j.util.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; + +@Singleton +public class MigrateCrawlDataActor extends RecordActorPrototype { + + private final FileStorageService fileStorageService; + + private static final Logger logger = LoggerFactory.getLogger(MigrateCrawlDataActor.class); + + @Inject + public MigrateCrawlDataActor(Gson gson, FileStorageService fileStorageService) { + super(gson); + + this.fileStorageService = fileStorageService; + } + + public record Run(long fileStorageId) implements ActorStep {} + + @Override + public ActorStep transition(ActorStep self) throws Exception { + return switch (self) { + 
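+            // A single-step migration: for each entry in crawler.log, convert parquet
+            // crawl data to the slop format, then swap a rebuilt work log into place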
case Run(long fileStorageId) -> {
+
+                FileStorage storage = fileStorageService.getStorage(FileStorageId.of(fileStorageId));
+                Path root = storage.asPath();
+
+                Path crawlerLog = root.resolve("crawler.log");
+                Path newCrawlerLog = Files.createTempFile(root, "crawler", ".migrate.log");
+
+                try (WorkLog workLog = new WorkLog(newCrawlerLog)) {
+                    for (Map.Entry<WorkLogEntry, Path> item : WorkLog.iterableMap(crawlerLog, new CrawlDataLocator(root))) {
+
+                        var entry = item.getKey();
+                        var path = item.getValue();
+
+                        logger.info("Converting {}", entry.id());
+
+                        if (path.toFile().getName().endsWith(".parquet")) {
+                            String domain = entry.id();
+                            String id = Integer.toHexString(domain.hashCode());
+
+                            Path outputFile = CrawlerOutputFile.createSlopPath(root, id, domain);
+
+                            SlopCrawlDataRecord.convertFromParquet(path, outputFile);
+
+                            workLog.setJobToFinished(entry.id(), outputFile.toString(), entry.cnt());
+                        }
+                        else {
+                            workLog.setJobToFinished(entry.id(), path.toString(), entry.cnt());
+                        }
+                    }
+                }
+
+                Path oldCrawlerLog = Files.createTempFile(root, "crawler-", ".migrate.old.log");
+                Files.move(crawlerLog, oldCrawlerLog);
+                Files.move(newCrawlerLog, crawlerLog);
+
+                yield new End();
+            }
+            default -> new Error();
+        };
+    }
+
+    private static class CrawlDataLocator implements Function<WorkLogEntry, Optional<Map.Entry<WorkLogEntry, Path>>> {
+
+        private final Path crawlRootDir;
+
+        CrawlDataLocator(Path crawlRootDir) {
+            this.crawlRootDir = crawlRootDir;
+        }
+
+        @Override
+        public Optional<Map.Entry<WorkLogEntry, Path>> apply(WorkLogEntry entry) {
+            var path = getCrawledFilePath(crawlRootDir, entry.path());
+
+            if (!Files.exists(path)) {
+                return Optional.empty();
+            }
+
+            try {
+                return Optional.of(Map.entry(entry, path));
+            }
+            catch (Exception ex) {
+                return Optional.empty();
+            }
+        }
+
+        private Path getCrawledFilePath(Path crawlDir, String fileName) {
+            int sp = fileName.lastIndexOf('/');
+
+            // Normalize the filename
+            if (sp >= 0 && sp + 1 < fileName.length())
+                fileName = fileName.substring(sp + 1);
+            if (fileName.length() < 4)
+                fileName = Strings.repeat("0", 4 - fileName.length()) + fileName;
+
+            String sp1 = fileName.substring(0, 2);
+            String sp2 = fileName.substring(2, 4);
+            return crawlDir.resolve(sp1).resolve(sp2).resolve(fileName);
+        }
+    }
+
+    @Override
+    public String describe() {
+        return "Migrates crawl data to the latest format";
+    }
+}
diff --git a/code/libraries/coded-sequence/java/nu/marginalia/sequence/slop/GammaCodedSequenceArrayColumn.java b/code/libraries/coded-sequence/java/nu/marginalia/sequence/slop/GammaCodedSequenceArrayColumn.java
index ba31564e..2cb7bcf9 100644
--- a/code/libraries/coded-sequence/java/nu/marginalia/sequence/slop/GammaCodedSequenceArrayColumn.java
+++ b/code/libraries/coded-sequence/java/nu/marginalia/sequence/slop/GammaCodedSequenceArrayColumn.java
@@ -45,6 +45,11 @@ public class GammaCodedSequenceArrayColumn extends AbstractObjectColumn columnDesc() { return GammaCodedSequenceColumn.this;
diff --git a/code/libraries/coded-sequence/java/nu/marginalia/sequence/slop/VarintCodedSequenceArrayColumn.java b/code/libraries/coded-sequence/java/nu/marginalia/sequence/slop/VarintCodedSequenceArrayColumn.java
index 1d8141d7..dc16d5ec 100644
--- a/code/libraries/coded-sequence/java/nu/marginalia/sequence/slop/VarintCodedSequenceArrayColumn.java
+++ b/code/libraries/coded-sequence/java/nu/marginalia/sequence/slop/VarintCodedSequenceArrayColumn.java
@@ -45,6 +45,11 @@ public class VarintCodedSequenceArrayColumn extends AbstractObjectColumn> {
+    private static class CrawlDataLocator implements Function<WorkLogEntry, Optional<Path>> {
 
         private final Path crawlRootDir;
         private final BatchingWorkLog batchingWorkLog;
@@ -239,7 +237,7 @@ public class ConverterMain extends ProcessMainClass {
         }
 
         @Override
-        public Optional<SerializableCrawlDataStream> apply(WorkLogEntry entry) {
+        public Optional<Path> apply(WorkLogEntry entry) {
             if (batchingWorkLog.isItemProcessed(entry.id())) {
                 return Optional.empty();
             }
@@ -252,7 +250,7 @@ public class ConverterMain extends ProcessMainClass {
             }
 
             try {
-                return Optional.of(CrawledDomainReader.createDataStream(path));
+                return Optional.of(path);
             }
             catch (Exception ex) {
                 return Optional.empty();
diff --git a/code/processes/converting-process/java/nu/marginalia/converting/processor/DocumentProcessor.java b/code/processes/converting-process/java/nu/marginalia/converting/processor/DocumentProcessor.java
index d1e4d495..62da1c63 100644
--- a/code/processes/converting-process/java/nu/marginalia/converting/processor/DocumentProcessor.java
+++ b/code/processes/converting-process/java/nu/marginalia/converting/processor/DocumentProcessor.java
@@ -19,6 +19,7 @@ import nu.marginalia.model.idx.WordFlags;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
@@ -91,7 +92,7 @@ public class DocumentProcessor {
                                  DocumentClass documentClass,
                                  DocumentDecorator documentDecorator,
                                  DomainLinks externalDomainLinks,
-                                 ProcessedDocument ret) throws URISyntaxException, DisqualifiedException
+                                 ProcessedDocument ret) throws URISyntaxException, IOException, DisqualifiedException
     {
         var crawlerStatus = CrawlerDocumentStatus.valueOf(crawledDocument.crawlerStatus);
 
@@ -109,7 +110,7 @@ public class DocumentProcessor {
 
         ret.state = crawlerStatusToUrlState(crawledDocument.crawlerStatus, crawledDocument.httpStatus);
 
-        final var plugin = findPlugin(crawledDocument);
+        AbstractDocumentProcessorPlugin plugin = findPlugin(crawledDocument);
 
         EdgeUrl url = new EdgeUrl(crawledDocument.url);
         LinkTexts linkTexts = anchorTextKeywords.getAnchorTextKeywords(externalDomainLinks, url);
diff --git a/code/processes/converting-process/java/nu/marginalia/converting/processor/DomainProcessor.java b/code/processes/converting-process/java/nu/marginalia/converting/processor/DomainProcessor.java
index d31195f8..0da8db92 100644
--- a/code/processes/converting-process/java/nu/marginalia/converting/processor/DomainProcessor.java
+++ b/code/processes/converting-process/java/nu/marginalia/converting/processor/DomainProcessor.java
@@ -14,6 +14,7 @@ import nu.marginalia.converting.writer.ConverterBatchWritableIf;
 import nu.marginalia.converting.writer.ConverterBatchWriter;
 import nu.marginalia.geoip.GeoIpDictionary;
 import nu.marginalia.geoip.sources.AsnTable;
+import nu.marginalia.io.CrawledDomainReader;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.crawl.DomainIndexingState;
@@ -27,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.file.Path;
 import java.sql.SQLException;
 import java.util.*;
 import java.util.regex.Pattern;
@@ -54,21 +56,24 @@ public class DomainProcessor {
         geoIpDictionary.waitReady();
     }
 
-    public ConverterBatchWritableIf createWritable(SerializableCrawlDataStream domain) {
-        final int sizeHint = domain.sizeHint();
+    public ConverterBatchWritableIf createWritable(Path path) throws IOException {
+
+        var dataStream = CrawledDomainReader.createDataStream(path);
+
+        final int sizeHint = dataStream.sizeHint();
 
         if (sizeHint > SIDELOAD_THRESHOLD) {
             // If the file is too big, we run a processing mode
that doesn't // require loading the entire dataset into RAM - return sideloadProcessing(domain, sizeHint); + return simpleProcessing(dataStream, sizeHint); } - return fullProcessing(domain); + return fullProcessing(dataStream); } - public SideloadProcessing sideloadProcessing(SerializableCrawlDataStream dataStream, int sizeHint, Collection extraKeywords) { + public SimpleProcessing simpleProcessing(SerializableCrawlDataStream dataStream, int sizeHint, Collection extraKeywords) { try { - return new SideloadProcessing(dataStream, sizeHint, extraKeywords); + return new SimpleProcessing(dataStream, sizeHint, extraKeywords); } catch (Exception ex) { logger.warn("Failed to process domain sideload", ex); @@ -76,9 +81,9 @@ public class DomainProcessor { } } - public SideloadProcessing sideloadProcessing(SerializableCrawlDataStream dataStream, int sizeHint) { + public SimpleProcessing simpleProcessing(SerializableCrawlDataStream dataStream, int sizeHint) { try { - return new SideloadProcessing(dataStream, sizeHint); + return new SimpleProcessing(dataStream, sizeHint); } catch (Exception ex) { logger.warn("Failed to process domain sideload", ex); @@ -86,93 +91,6 @@ public class DomainProcessor { } } - public class SideloadProcessing implements ConverterBatchWritableIf, SideloadSource { - private final SerializableCrawlDataStream dataStream; - private final ProcessedDomain domain; - private final DocumentDecorator documentDecorator; - private final Set processedUrls = new HashSet<>(); - private final DomainLinks externalDomainLinks; - private final LshDocumentDeduplicator deduplicator = new LshDocumentDeduplicator(); - private static final ProcessingIterator.Factory iteratorFactory = ProcessingIterator.factory(8, - Integer.getInteger("java.util.concurrent.ForkJoinPool.common.parallelism", Runtime.getRuntime().availableProcessors()) - ); - - SideloadProcessing(SerializableCrawlDataStream dataStream, int sizeHint) throws IOException { - this(dataStream, sizeHint, List.of()); - } - - SideloadProcessing(SerializableCrawlDataStream dataStream, int sizeHint, Collection extraKeywords) throws IOException { - this.dataStream = dataStream; - - if (!dataStream.hasNext() || !(dataStream.next() instanceof CrawledDomain crawledDomain)) - { - throw new IllegalStateException("First record must be a domain, was " + dataStream.next().getClass().getSimpleName()); - } - - domain = new ProcessedDomain(); - domain.sizeloadSizeAdvice = sizeHint == 0 ? 
10_000 : sizeHint; - - documentDecorator = new DocumentDecorator(); - documentDecorator.addTerms(extraKeywords); - - processDomain(crawledDomain, domain, documentDecorator); - - externalDomainLinks = anchorTagsSource.getAnchorTags(domain.domain); - } - - @Override - public ProcessedDomain getDomain() { - return domain; - } - - @Override - public Iterator getDocumentsStream() { - return iteratorFactory.create((taskConsumer) -> { - while (dataStream.hasNext()) - { - if (!(dataStream.next() instanceof CrawledDocument doc)) - continue; - if (doc.url == null || !processedUrls.add(doc.url)) - continue; - - - taskConsumer.accept(() -> { - var processedDoc = documentProcessor.process(doc, domain.domain, externalDomainLinks, documentDecorator); - - synchronized (deduplicator) { - deduplicator.markIfDuplicate(processedDoc); - } - - if (processedDoc.isProcessedFully()) { - // This is a bit sketchy, but we need to set the size and topology to something - processedDoc.details.metadata = processedDoc.details.metadata.withSizeAndTopology( - 10_000, externalDomainLinks.countForUrl(processedDoc.url)); - } - - return processedDoc; - }); - } - }); - } - - @Override - public void write(ConverterBatchWriter writer) throws IOException { - writer.writeSideloadSource(this); - } - - @Override - public String id() { - return domain.domain.toString(); - } - - @Override - public void close() throws Exception { - dataStream.close(); - deduplicator.close(); - } - } - - @Nullable public ProcessedDomain fullProcessing(SerializableCrawlDataStream dataStream) { try { @@ -204,7 +122,7 @@ public class DomainProcessor { continue; if (doc.url == null) continue; - if (doc.documentBody.isBlank()) + if (doc.documentBodyBytes.length == 0) continue; if (!processedUrls.add(doc.url)) continue; @@ -231,6 +149,90 @@ public class DomainProcessor { } } + /** The simple processing track processes documents individually, and does not perform any domain-level analysis. + * This is needed to process extremely large domains, which would otherwise eat up too much RAM. + */ + public class SimpleProcessing implements ConverterBatchWritableIf, SideloadSource { + private final SerializableCrawlDataStream dataStream; + private final ProcessedDomain domain; + private final DocumentDecorator documentDecorator; + private final Set processedUrls = new HashSet<>(); + private final DomainLinks externalDomainLinks; + private final LshDocumentDeduplicator deduplicator = new LshDocumentDeduplicator(); + private static final ProcessingIterator.Factory iteratorFactory = ProcessingIterator.factory(8, + Integer.getInteger("java.util.concurrent.ForkJoinPool.common.parallelism", Runtime.getRuntime().availableProcessors()) + ); + + SimpleProcessing(SerializableCrawlDataStream dataStream, int sizeHint) throws IOException { + this(dataStream, sizeHint, List.of()); + } + + SimpleProcessing(SerializableCrawlDataStream dataStream, int sizeHint, Collection extraKeywords) throws IOException { + this.dataStream = dataStream; + + if (!dataStream.hasNext() || !(dataStream.next() instanceof CrawledDomain crawledDomain)) + { + throw new IllegalStateException("First record must be a domain, was " + dataStream.next().getClass().getSimpleName()); + } + + domain = new ProcessedDomain(); + domain.sizeloadSizeAdvice = sizeHint == 0 ? 
10_000 : sizeHint; + + documentDecorator = new DocumentDecorator(); + documentDecorator.addTerms(extraKeywords); + + processDomain(crawledDomain, domain, documentDecorator); + + externalDomainLinks = anchorTagsSource.getAnchorTags(domain.domain); + } + + @Override + public ProcessedDomain getDomain() { + return domain; + } + + @Override + public Iterator getDocumentsStream() { + return dataStream.map((next) -> { + if (!(next instanceof CrawledDocument doc)) + return Optional.empty(); + + if (doc.url == null || !processedUrls.add(doc.url)) + return Optional.empty(); + + var processedDoc = documentProcessor.process(doc, domain.domain, externalDomainLinks, documentDecorator); + + synchronized (deduplicator) { + deduplicator.markIfDuplicate(processedDoc); + } + + if (processedDoc.isProcessedFully()) { + // This is a bit sketchy, but we need to set the size and topology to something + processedDoc.details.metadata = processedDoc.details.metadata.withSizeAndTopology( + 10_000, externalDomainLinks.countForUrl(processedDoc.url)); + } + + return Optional.of(processedDoc); + }); + } + + @Override + public void write(ConverterBatchWriter writer) throws IOException { + writer.writeSideloadSource(this); + } + + @Override + public String id() { + return domain.domain.toString(); + } + + @Override + public void close() throws Exception { + dataStream.close(); + deduplicator.close(); + } + } + private void processDomain(CrawledDomain crawledDomain, ProcessedDomain domain, DocumentDecorator decorator) diff --git a/code/processes/converting-process/java/nu/marginalia/converting/processor/logic/DocumentValuator.java b/code/processes/converting-process/java/nu/marginalia/converting/processor/logic/DocumentValuator.java index 1c959dee..c96602e8 100644 --- a/code/processes/converting-process/java/nu/marginalia/converting/processor/logic/DocumentValuator.java +++ b/code/processes/converting-process/java/nu/marginalia/converting/processor/logic/DocumentValuator.java @@ -24,7 +24,7 @@ public class DocumentValuator { double scriptPenalty = getScriptPenalty(parsedDocument); double chatGptPenalty = getChatGptContentFarmPenalty(parsedDocument); - int rawLength = crawledDocument.documentBody.length(); + int rawLength = crawledDocument.documentBodyBytes.length; if (textLength == 0) { throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LENGTH); diff --git a/code/processes/converting-process/java/nu/marginalia/converting/processor/logic/FeatureExtractor.java b/code/processes/converting-process/java/nu/marginalia/converting/processor/logic/FeatureExtractor.java index f2bac097..b1a8bbb4 100644 --- a/code/processes/converting-process/java/nu/marginalia/converting/processor/logic/FeatureExtractor.java +++ b/code/processes/converting-process/java/nu/marginalia/converting/processor/logic/FeatureExtractor.java @@ -218,7 +218,10 @@ public class FeatureExtractor { } } - if (features.contains(HtmlFeature.JS) && adblockSimulator.hasAds(doc.clone())) { + if (features.contains(HtmlFeature.JS) + // remove while disabled to get rid of expensive clone() call: + // adblockSimulator.hasAds(doc.clone()) + ) { features.add(HtmlFeature.ADVERTISEMENT); } diff --git a/code/processes/converting-process/java/nu/marginalia/converting/processor/plugin/AbstractDocumentProcessorPlugin.java b/code/processes/converting-process/java/nu/marginalia/converting/processor/plugin/AbstractDocumentProcessorPlugin.java index b03468ca..36a76ab3 100644 --- 
a/code/processes/converting-process/java/nu/marginalia/converting/processor/plugin/AbstractDocumentProcessorPlugin.java +++ b/code/processes/converting-process/java/nu/marginalia/converting/processor/plugin/AbstractDocumentProcessorPlugin.java @@ -14,6 +14,7 @@ import nu.marginalia.model.crawldata.CrawledDocument; import nu.marginalia.model.html.HtmlStandard; import javax.annotation.Nullable; +import java.io.IOException; import java.net.URISyntaxException; import java.util.HashSet; import java.util.List; @@ -25,7 +26,7 @@ public abstract class AbstractDocumentProcessorPlugin { this.languageFilter = languageFilter; } - public abstract DetailsWithWords createDetails(CrawledDocument crawledDocument, LinkTexts linkTexts, DocumentClass documentClass) throws DisqualifiedException, URISyntaxException; + public abstract DetailsWithWords createDetails(CrawledDocument crawledDocument, LinkTexts linkTexts, DocumentClass documentClass) throws DisqualifiedException, URISyntaxException, IOException; public abstract boolean isApplicable(CrawledDocument doc); protected void checkDocumentLanguage(DocumentLanguageData dld) throws DisqualifiedException { @@ -86,6 +87,7 @@ public abstract class AbstractDocumentProcessorPlugin { return this; } + public MetaTagsBuilder addPubDate(PubDate pubDate) { if (pubDate.year() > 1900) { diff --git a/code/processes/converting-process/java/nu/marginalia/converting/processor/plugin/HtmlDocumentProcessorPlugin.java b/code/processes/converting-process/java/nu/marginalia/converting/processor/plugin/HtmlDocumentProcessorPlugin.java index bc51e472..898795ef 100644 --- a/code/processes/converting-process/java/nu/marginalia/converting/processor/plugin/HtmlDocumentProcessorPlugin.java +++ b/code/processes/converting-process/java/nu/marginalia/converting/processor/plugin/HtmlDocumentProcessorPlugin.java @@ -6,6 +6,7 @@ import nu.marginalia.converting.model.DisqualifiedException; import nu.marginalia.converting.model.DocumentHeaders; import nu.marginalia.converting.model.GeneratorType; import nu.marginalia.converting.model.ProcessedDocumentDetails; +import nu.marginalia.converting.processor.AcceptableAds; import nu.marginalia.converting.processor.DocumentClass; import nu.marginalia.converting.processor.MetaRobotsTag; import nu.marginalia.converting.processor.logic.*; @@ -32,11 +33,11 @@ import nu.marginalia.model.crawldata.CrawledDocument; import nu.marginalia.model.html.HtmlStandard; import nu.marginalia.model.idx.DocumentFlags; import nu.marginalia.model.idx.DocumentMetadata; -import org.jsoup.Jsoup; import org.jsoup.nodes.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.net.URISyntaxException; import java.util.EnumSet; import java.util.HashSet; @@ -51,7 +52,6 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin private final double minDocumentQuality; private final FeatureExtractor featureExtractor; - private final TitleExtractor titleExtractor; private final DocumentKeywordExtractor keywordExtractor; private final PubDateSniffer pubDateSniffer; @@ -74,7 +74,6 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin @Named("min-document-quality") Double minDocumentQuality, LanguageFilter languageFilter, FeatureExtractor featureExtractor, - TitleExtractor titleExtractor, DocumentKeywordExtractor keywordExtractor, PubDateSniffer pubDateSniffer, DocumentLengthLogic documentLengthLogic, @@ -89,7 +88,6 @@ public class HtmlDocumentProcessorPlugin extends 
AbstractDocumentProcessorPlugin this.minDocumentQuality = minDocumentQuality; this.featureExtractor = featureExtractor; - this.titleExtractor = titleExtractor; this.keywordExtractor = keywordExtractor; this.pubDateSniffer = pubDateSniffer; this.metaRobotsTag = metaRobotsTag; @@ -108,19 +106,17 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin public DetailsWithWords createDetails(CrawledDocument crawledDocument, LinkTexts linkTexts, DocumentClass documentClass) - throws DisqualifiedException, URISyntaxException { + throws DisqualifiedException, URISyntaxException, IOException { - String documentBody = crawledDocument.documentBody; - - if (languageFilter.isBlockedUnicodeRange(documentBody)) { + if (languageFilter.isBlockedUnicodeRange(crawledDocument.documentBody(512))) { throw new DisqualifiedException(DisqualificationReason.LANGUAGE); } - if (documentBody.length() > MAX_DOCUMENT_LENGTH_BYTES) { // 128kb - documentBody = documentBody.substring(0, MAX_DOCUMENT_LENGTH_BYTES); - } + Document doc = crawledDocument.parseBody(); - Document doc = Jsoup.parse(documentBody); + if (AcceptableAds.hasAcceptableAdsTag(doc)) { + throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.ACCEPTABLE_ADS); + } if (!metaRobotsTag.allowIndexingByMetaTag(doc)) { throw new DisqualifiedException(DisqualificationReason.FORBIDDEN); @@ -138,32 +134,33 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin } var prunedDoc = specialization.prune(doc); - DocumentLanguageData dld = sentenceExtractorProvider.get().extractSentences(prunedDoc); - checkDocumentLanguage(dld); - - var ret = new ProcessedDocumentDetails(); final int length = getLength(doc); final HtmlStandard standard = getHtmlStandard(doc); final double quality = documentValuator.getQuality(crawledDocument, standard, doc, length); + if (isDisqualified(documentClass, url, quality, doc.title())) { + throw new DisqualifiedException(DisqualificationReason.QUALITY); + } + + DocumentLanguageData dld = sentenceExtractorProvider.get().extractSentences(prunedDoc); + + checkDocumentLanguage(dld); + documentLengthLogic.validateLength(dld, specialization.lengthModifier() * documentClass.lengthLimitModifier()); + + var ret = new ProcessedDocumentDetails(); + ret.length = length; ret.standard = standard; ret.title = specialization.getTitle(doc, dld, crawledDocument.url); - documentLengthLogic.validateLength(dld, specialization.lengthModifier() * documentClass.lengthLimitModifier()); - final Set features = featureExtractor.getFeatures(url, doc, documentHeaders, dld); ret.features = features; ret.quality = documentValuator.adjustQuality(quality, features); ret.hashCode = dld.localitySensitiveHashCode(); - if (isDisqualified(documentClass, url, quality, ret.title)) { - throw new DisqualifiedException(DisqualificationReason.QUALITY); - } - PubDate pubDate = pubDateSniffer.getPubDate(documentHeaders, url, doc, standard, true); EnumSet documentFlags = documentFlags(features, generatorParts.type()); diff --git a/code/processes/converting-process/java/nu/marginalia/converting/processor/plugin/PlainTextDocumentProcessorPlugin.java b/code/processes/converting-process/java/nu/marginalia/converting/processor/plugin/PlainTextDocumentProcessorPlugin.java index 23f444a9..2d9364a8 100644 --- a/code/processes/converting-process/java/nu/marginalia/converting/processor/plugin/PlainTextDocumentProcessorPlugin.java +++ 
b/code/processes/converting-process/java/nu/marginalia/converting/processor/plugin/PlainTextDocumentProcessorPlugin.java @@ -71,7 +71,7 @@ public class PlainTextDocumentProcessorPlugin extends AbstractDocumentProcessorP DocumentClass documentClass) throws DisqualifiedException, URISyntaxException { - String documentBody = crawledDocument.documentBody; + String documentBody = crawledDocument.documentBody(); if (languageFilter.isBlockedUnicodeRange(documentBody)) { throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LANGUAGE); diff --git a/code/processes/converting-process/java/nu/marginalia/converting/sideload/SideloaderProcessing.java b/code/processes/converting-process/java/nu/marginalia/converting/sideload/SideloaderProcessing.java index 95729851..3418fcbe 100644 --- a/code/processes/converting-process/java/nu/marginalia/converting/sideload/SideloaderProcessing.java +++ b/code/processes/converting-process/java/nu/marginalia/converting/sideload/SideloaderProcessing.java @@ -19,6 +19,7 @@ import nu.marginalia.model.idx.DocumentMetadata; import nu.marginalia.model.idx.WordFlags; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.util.EnumSet; import java.util.List; @@ -50,7 +51,7 @@ public class SideloaderProcessing { "OK", "NP", "", - body, + body.getBytes(StandardCharsets.UTF_8), false, null, null diff --git a/code/processes/converting-process/test/nu/marginalia/converting/ConvertingIntegrationTest.java b/code/processes/converting-process/test/nu/marginalia/converting/ConvertingIntegrationTest.java index 7cc451b4..0e5bbf39 100644 --- a/code/processes/converting-process/test/nu/marginalia/converting/ConvertingIntegrationTest.java +++ b/code/processes/converting-process/test/nu/marginalia/converting/ConvertingIntegrationTest.java @@ -98,7 +98,7 @@ public class ConvertingIntegrationTest { @Test public void testMemexMarginaliaNuSideloadProcessing() throws IOException { - var ret = domainProcessor.sideloadProcessing(asSerializableCrawlData(readMarginaliaWorkingSet()), 100); + var ret = domainProcessor.simpleProcessing(asSerializableCrawlData(readMarginaliaWorkingSet()), 100); assertNotNull(ret); assertEquals("memex.marginalia.nu", ret.id()); @@ -146,7 +146,7 @@ public class ConvertingIntegrationTest { "OK", "", "", - readClassPathFile(p.toString()), + readClassPathFile(p.toString()).getBytes(), false, null, null diff --git a/code/processes/crawling-process/ft-content-type/java/nu/marginalia/contenttype/ContentType.java b/code/processes/crawling-process/ft-content-type/java/nu/marginalia/contenttype/ContentType.java index 86b6aa68..f3d5048e 100644 --- a/code/processes/crawling-process/ft-content-type/java/nu/marginalia/contenttype/ContentType.java +++ b/code/processes/crawling-process/ft-content-type/java/nu/marginalia/contenttype/ContentType.java @@ -2,11 +2,16 @@ package nu.marginalia.contenttype; import org.apache.commons.lang3.StringUtils; +import java.nio.charset.Charset; +import java.nio.charset.IllegalCharsetNameException; +import java.nio.charset.StandardCharsets; + /** Content type and charset of a document * @param contentType The content type, e.g. "text/html" * @param charset The charset, e.g. 
"UTF-8" */ public record ContentType(String contentType, String charset) { + public static ContentType parse(String contentTypeHeader) { if (contentTypeHeader == null || contentTypeHeader.isBlank()) return new ContentType(null, null); @@ -15,9 +20,31 @@ public record ContentType(String contentType, String charset) { String contentType = parts[0].trim(); String charset = parts.length > 1 ? parts[1].trim() : "UTF-8"; + if (charset.toLowerCase().startsWith("charset=")) { + charset = charset.substring("charset=".length()); + } + return new ContentType(contentType, charset); } + /** Best effort method for turning the provided charset string into a Java charset method, + * with some guesswork-heuristics for when it doesn't work + */ + public Charset asCharset() { + try { + if (Charset.isSupported(charset)) { + return Charset.forName(charset); + } else if (charset.equalsIgnoreCase("macintosh-latin")) { + return StandardCharsets.ISO_8859_1; + } else { + return StandardCharsets.UTF_8; + } + } + catch (IllegalCharsetNameException ex) { // thrown by Charset.isSupported() + return StandardCharsets.UTF_8; + } + } + public boolean is(String contentType) { return this.contentType.equalsIgnoreCase(contentType); } diff --git a/code/processes/crawling-process/ft-content-type/java/nu/marginalia/contenttype/DocumentBodyToString.java b/code/processes/crawling-process/ft-content-type/java/nu/marginalia/contenttype/DocumentBodyToString.java index 8187871e..22949489 100644 --- a/code/processes/crawling-process/ft-content-type/java/nu/marginalia/contenttype/DocumentBodyToString.java +++ b/code/processes/crawling-process/ft-content-type/java/nu/marginalia/contenttype/DocumentBodyToString.java @@ -1,9 +1,12 @@ package nu.marginalia.contenttype; +import org.jsoup.Jsoup; +import org.jsoup.nodes.Document; + +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.nio.charset.Charset; -import java.nio.charset.IllegalCharsetNameException; import java.nio.charset.StandardCharsets; -import java.nio.charset.UnsupportedCharsetException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -23,24 +26,25 @@ public class DocumentBodyToString { return new String(data, charset); } + public static Document getParsedData(ContentType type, byte[] data, String url) throws IOException { + final Charset charset; + + if (type.charset() == null || type.charset().isBlank()) { + charset = StandardCharsets.UTF_8; + } else { + charset = charsetMap.computeIfAbsent(type, DocumentBodyToString::computeCharset); + } + + ByteArrayInputStream bais = new ByteArrayInputStream(data); + + return Jsoup.parse(bais, charset.name(), url); + } + private static Charset computeCharset(ContentType type) { - try { - if (type.charset() == null || type.charset().isBlank()) - return StandardCharsets.UTF_8; - else { - return Charset.forName(type.charset()); - } - } - catch (IllegalCharsetNameException ex) { - // Fall back to UTF-8 if we don't understand what this is. It's *probably* fine? Maybe? 
+ if (type.charset() == null || type.charset().isBlank()) return StandardCharsets.UTF_8; - } - catch (UnsupportedCharsetException ex) { - // This is usually like Macintosh Latin - // (https://en.wikipedia.org/wiki/Macintosh_Latin_encoding) - // - // It's close enough to 8859-1 to serve - return StandardCharsets.ISO_8859_1; + else { + return type.asCharset(); } } } diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java index 3bc87da8..6837a3bd 100644 --- a/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java +++ b/code/processes/crawling-process/java/nu/marginalia/crawl/CrawlerMain.java @@ -23,16 +23,18 @@ import nu.marginalia.io.CrawledDomainReader; import nu.marginalia.io.CrawlerOutputFile; import nu.marginalia.model.EdgeDomain; import nu.marginalia.mq.MessageQueueFactory; -import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecordFileWriter; import nu.marginalia.process.ProcessConfiguration; import nu.marginalia.process.ProcessConfigurationModule; import nu.marginalia.process.ProcessMainClass; import nu.marginalia.process.control.ProcessHeartbeatImpl; import nu.marginalia.process.log.WorkLog; +import nu.marginalia.process.log.WorkLogEntry; import nu.marginalia.service.module.DatabaseModule; +import nu.marginalia.slop.SlopCrawlDataRecord; import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.util.SimpleBlockingThreadPool; +import org.apache.logging.log4j.util.Strings; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,13 +44,11 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.security.Security; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import static nu.marginalia.mqapi.ProcessInboxNames.CRAWLER_INBOX; @@ -182,6 +182,8 @@ public class CrawlerMain extends ProcessMainClass { // Assign any domains with node_affinity=0 to this node, and then fetch all domains assigned to this node // to be crawled. 
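+        // But first, migrate any crawl data still in the legacy parquet format
+        // over to the new slop format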
+        performMigration(outputDir);
+
         try (var conn = dataSource.getConnection()) {
             try (var assignFreeDomains = conn.prepareStatement(
                     """
@@ -291,7 +293,6 @@ public class CrawlerMain extends ProcessMainClass {
         }
     }
 
-
     public void runForSingleDomain(String targetDomainName, FileStorageId fileStorageId) throws Exception {
         runForSingleDomain(targetDomainName, fileStorageService.getStorage(fileStorageId).asPath());
     }
@@ -353,7 +354,7 @@ public class CrawlerMain extends ProcessMainClass {
             Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
             Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
-            Path parquetFile = CrawlerOutputFile.createParquetPath(outputDir, id, domain);
+            Path slopFile = CrawlerOutputFile.createSlopPath(outputDir, id, domain);
 
             // Move the WARC file to a temp file if it exists, so we can resume the crawl using the old data
             // while writing to the same file name as before
@@ -387,15 +388,15 @@
                 reference.delete();
 
                 // Convert the WARC file to Parquet
-                CrawledDocumentParquetRecordFileWriter
-                        .convertWarc(domain, userAgent, newWarcFile, parquetFile);
+                SlopCrawlDataRecord
+                        .convertWarc(domain, userAgent, newWarcFile, slopFile);
 
                 // Optionally archive the WARC file if full retention is enabled,
                 // otherwise delete it:
                 warcArchiver.consumeWarc(newWarcFile, domain);
 
                 // Mark the domain as finished in the work log
-                workLog.setJobToFinished(domain, parquetFile.toString(), size);
+                workLog.setJobToFinished(domain, slopFile.toString(), size);
 
                 // Update the progress bar
                 heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
@@ -480,4 +481,93 @@
             }
         }
     }
+
+    // Data migration logic
+
+    private void performMigration(Path root) throws IOException {
+        Path crawlerLog = root.resolve("crawler.log");
+        Path newCrawlerLog = Files.createTempFile(root, "crawler", ".migrate.log");
+
+        int finishedTasks = 0;
+        int totalTasks;
+        try (var oldLog = new WorkLog(crawlerLog)) {
+            totalTasks = oldLog.countFinishedJobs();
+        }
+
+        try (WorkLog workLog = new WorkLog(newCrawlerLog);
+             var migrationHeartbeat = heartbeat.createAdHocTaskHeartbeat("MIGRATING")) {
+
+            for (Map.Entry<WorkLogEntry, Path> item : WorkLog.iterableMap(crawlerLog, new CrawlDataLocator(root))) {
+
+                var entry = item.getKey();
+                var path = item.getValue();
+
+                if (path.toFile().getName().endsWith(".parquet")) {
+                    logger.info("Converting {}", entry.id());
+
+                    String domain = entry.id();
+                    String id = Integer.toHexString(domain.hashCode());
+
+                    Path outputFile = CrawlerOutputFile.createSlopPath(root, id, domain);
+
+                    SlopCrawlDataRecord.convertFromParquet(path, outputFile);
+
+                    workLog.setJobToFinished(entry.id(), outputFile.toString(), entry.cnt());
+                }
+                else {
+                    workLog.setJobToFinished(entry.id(), path.toString(), entry.cnt());
+                }
+
+                migrationHeartbeat.progress("Parquet To Slop", ++finishedTasks, totalTasks);
+            }
+        }
+
+        Path oldCrawlerLog = Files.createTempFile(root, "crawler-", ".migrate.old.log");
+        Files.move(crawlerLog, oldCrawlerLog, StandardCopyOption.REPLACE_EXISTING);
+        Files.move(newCrawlerLog, crawlerLog);
+    }
+
+    private static class CrawlDataLocator implements Function<WorkLogEntry, Optional<Map.Entry<WorkLogEntry, Path>>> {
+
+        private final Path crawlRootDir;
+
+        CrawlDataLocator(Path crawlRootDir) {
+            this.crawlRootDir = crawlRootDir;
+        }
+
+        @Override
+        public Optional<Map.Entry<WorkLogEntry, Path>> apply(WorkLogEntry entry) {
+            var path = getCrawledFilePath(crawlRootDir, entry.path());
+
+            if (!Files.exists(path)) {
+                return Optional.empty();
+            }
+
+            try {
+                return Optional.of(Map.entry(entry, path));
+            }
+            catch (Exception ex) {
+                return Optional.empty();
+            }
+        }
+
+        private Path getCrawledFilePath(Path crawlDir, String fileName) {
+            int sp = fileName.lastIndexOf('/');
+
+            // Normalize the filename
+            if (sp >= 0 && sp + 1 < fileName.length())
+                fileName = fileName.substring(sp + 1);
+            if (fileName.length() < 4)
+                fileName = Strings.repeat("0", 4 - fileName.length()) + fileName;
+
+            String sp1 = fileName.substring(0, 2);
+            String sp2 = fileName.substring(2, 4);
+            return crawlDir.resolve(sp1).resolve(sp2).resolve(fileName);
+        }
+    }
+}
diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/fetcher/HttpFetcherImpl.java b/code/processes/crawling-process/java/nu/marginalia/crawl/fetcher/HttpFetcherImpl.java
index 295d432b..3c330fb4 100644
--- a/code/processes/crawling-process/java/nu/marginalia/crawl/fetcher/HttpFetcherImpl.java
+++ b/code/processes/crawling-process/java/nu/marginalia/crawl/fetcher/HttpFetcherImpl.java
@@ -339,14 +339,14 @@ public class HttpFetcherImpl implements HttpFetcher {
             case "sitemapindex" -> {
                 List<String> references = new ArrayList<>();
                 for (var locTag : parsedSitemap.getElementsByTag("loc")) {
-                    references.add(URLDecoder.decode(locTag.text().trim(), StandardCharsets.UTF_8));
+                    references.add(locTag.text().trim());
                 }
                 yield new SitemapResult.SitemapReferences(Collections.unmodifiableList(references));
             }
             case "urlset" -> {
                 List<String> urls = new ArrayList<>();
                 for (var locTag : parsedSitemap.select("url > loc")) {
-                    urls.add(URLDecoder.decode(locTag.text().trim(), StandardCharsets.UTF_8));
+                    urls.add(locTag.text().trim());
                 }
                 yield new SitemapResult.SitemapUrls(Collections.unmodifiableList(urls));
             }
diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/fetcher/warc/WarcRecorder.java b/code/processes/crawling-process/java/nu/marginalia/crawl/fetcher/warc/WarcRecorder.java
index db4cf6ba..464ee91b 100644
--- a/code/processes/crawling-process/java/nu/marginalia/crawl/fetcher/warc/WarcRecorder.java
+++ b/code/processes/crawling-process/java/nu/marginalia/crawl/fetcher/warc/WarcRecorder.java
@@ -214,7 +214,7 @@ public class WarcRecorder implements AutoCloseable {
         writer.write(item);
     }
 
-    private void saveOldResponse(EdgeUrl url, String contentType, int statusCode, String documentBody, @Nullable String headers, ContentTags contentTags) {
+    private void saveOldResponse(EdgeUrl url, String contentType, int statusCode, byte[] documentBody, @Nullable String headers, ContentTags contentTags) {
         try {
             WarcDigestBuilder responseDigestBuilder = new WarcDigestBuilder();
             WarcDigestBuilder payloadDigestBuilder = new WarcDigestBuilder();
@@ -224,7 +224,7 @@ public class WarcRecorder implements AutoCloseable {
             if (documentBody == null) {
                 bytes = new byte[0];
             } else {
-                bytes = documentBody.getBytes();
+                bytes = documentBody;
             }
 
             // Create a synthesis of custom headers and the original headers
@@ -295,7 +295,7 @@ public class WarcRecorder implements AutoCloseable {
     * an E-Tag or Last-Modified header, and the server responds with a 304 Not Modified. In this
     * scenario we want to record the data as it was in the previous crawl, but not re-fetch it.
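+    * Note that the reference body is passed as raw bytes here, as crawl data bodies
+    * are no longer eagerly decoded into Strings.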
*/ - public void writeReferenceCopy(EdgeUrl url, String contentType, int statusCode, String documentBody, @Nullable String headers, ContentTags ctags) { + public void writeReferenceCopy(EdgeUrl url, String contentType, int statusCode, byte[] documentBody, @Nullable String headers, ContentTags ctags) { saveOldResponse(url, contentType, statusCode, documentBody, headers, ctags); } diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDataReference.java b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDataReference.java index 98133bcf..8e1838a9 100644 --- a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDataReference.java +++ b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlDataReference.java @@ -58,7 +58,7 @@ public class CrawlDataReference implements AutoCloseable { return null; } - public static boolean isContentBodySame(String one, String other) { + public static boolean isContentBodySame(byte[] one, byte[] other) { final long contentHashOne = contentHash(one); final long contentHashOther = contentHash(other); @@ -66,7 +66,7 @@ public class CrawlDataReference implements AutoCloseable { return EasyLSH.hammingDistance(contentHashOne, contentHashOther) < 4; } - private static long contentHash(String content) { + private static long contentHash(byte[] content) { EasyLSH hash = new EasyLSH(); int next = 0; @@ -74,8 +74,8 @@ public class CrawlDataReference implements AutoCloseable { // In a naive best-effort fashion, extract the text // content of the document and feed it into the LSH - for (int i = 0; i < content.length(); i++) { - char c = content.charAt(i); + for (byte b : content) { + char c = (char) b; if (c == '<') { isInTag = true; } else if (c == '>') { diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java index ede6e617..049e8bbf 100644 --- a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java +++ b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java @@ -378,14 +378,14 @@ public class CrawlerRetreiver implements AutoCloseable { else if (fetchedDoc instanceof HttpFetchResult.Result304Raw && reference.doc() != null) { var doc = reference.doc(); - warcRecorder.writeReferenceCopy(top, doc.contentType, doc.httpStatus, doc.documentBody, doc.headers, contentTags); + warcRecorder.writeReferenceCopy(top, doc.contentType, doc.httpStatus, doc.documentBodyBytes, doc.headers, contentTags); fetchedDoc = new HttpFetchResult.Result304ReplacedWithReference(doc.url, new ContentType(doc.contentType, "UTF-8"), - doc.documentBody); + doc.documentBodyBytes); - if (doc.documentBody != null) { - var parsed = Jsoup.parse(doc.documentBody); + if (doc.documentBodyBytes != null) { + var parsed = doc.parseBody(); crawlFrontier.enqueueLinksFromDocument(top, parsed); crawlFrontier.addVisited(top); diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java index 7f3c6c22..67835eeb 100644 --- a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java +++ b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java @@ -1,6 +1,5 @@ package nu.marginalia.crawl.retreival.revisit; 
-import com.google.common.base.Strings; import crawlercommons.robots.SimpleRobotRules; import nu.marginalia.crawl.fetcher.ContentTags; import nu.marginalia.crawl.fetcher.warc.WarcRecorder; @@ -11,7 +10,8 @@ import nu.marginalia.crawl.retreival.DomainCrawlFrontier; import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.body.HttpFetchResult; import nu.marginalia.model.crawldata.CrawledDocument; -import org.jsoup.Jsoup; + +import java.io.IOException; /** This class encapsulates the logic for re-visiting a domain that has already been crawled. * We may use information from the previous crawl to inform the next crawl, specifically the @@ -70,7 +70,7 @@ public class CrawlerRevisitor { // unlikely to produce anything meaningful for us. if (doc.httpStatus != 200) continue; - if (Strings.isNullOrEmpty(doc.documentBody)) + if (!doc.hasBody()) continue; if (!crawlFrontier.filterLink(url)) @@ -117,14 +117,19 @@ public class CrawlerRevisitor { // fashion to make sure we eventually catch changes over time // and ensure we discover new links - // Hoover up any links from the document - crawlFrontier.enqueueLinksFromDocument(url, Jsoup.parse(doc.documentBody)); + try { + // Hoover up any links from the document + crawlFrontier.enqueueLinksFromDocument(url, doc.parseBody()); + } + catch (IOException ex) { + // + } // Add a WARC record so we don't repeat this warcRecorder.writeReferenceCopy(url, doc.contentType, doc.httpStatus, - doc.documentBody, + doc.documentBodyBytes, doc.headers, new ContentTags(doc.etagMaybe, doc.lastModifiedMaybe) ); diff --git a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java index c24ef754..83a775f0 100644 --- a/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java +++ b/code/processes/crawling-process/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java @@ -2,8 +2,6 @@ package nu.marginalia.crawl.retreival.revisit; import nu.marginalia.crawl.fetcher.ContentTags; import nu.marginalia.crawl.retreival.CrawlDataReference; -import nu.marginalia.model.body.DocumentBodyExtractor; -import nu.marginalia.model.body.DocumentBodyResult; import nu.marginalia.model.body.HttpFetchResult; import nu.marginalia.model.crawldata.CrawledDocument; @@ -35,21 +33,17 @@ public record DocumentWithReference( return false; if (doc == null) return false; - if (doc.documentBody == null) + if (doc.documentBodyBytes.length == 0) return false; - if (!(DocumentBodyExtractor.asString(resultOk) instanceof DocumentBodyResult.Ok bodyOk)) { - return false; - } - - return CrawlDataReference.isContentBodySame(doc.documentBody, bodyOk.body()); + return CrawlDataReference.isContentBodySame(doc.documentBodyBytes, resultOk.bytesRaw()); } public ContentTags getContentTags() { if (null == doc) return ContentTags.empty(); - if (doc.documentBody == null || doc.httpStatus != 200) + if (doc.documentBodyBytes.length == 0 || doc.httpStatus != 200) return ContentTags.empty(); String lastmod = doc.getLastModified(); diff --git a/code/processes/crawling-process/model/build.gradle b/code/processes/crawling-process/model/build.gradle index e2a7afb3..b989cd8b 100644 --- a/code/processes/crawling-process/model/build.gradle +++ b/code/processes/crawling-process/model/build.gradle @@ -32,6 +32,7 @@ dependencies { implementation libs.bundles.parquet implementation libs.trove + implementation libs.slop implementation 
libs.jwarc implementation libs.gson implementation libs.commons.io diff --git a/code/processes/crawling-process/model/java/nu/marginalia/io/CrawledDomainReader.java b/code/processes/crawling-process/model/java/nu/marginalia/io/CrawledDomainReader.java index 95e69925..5ef1a745 100644 --- a/code/processes/crawling-process/model/java/nu/marginalia/io/CrawledDomainReader.java +++ b/code/processes/crawling-process/model/java/nu/marginalia/io/CrawledDomainReader.java @@ -1,6 +1,7 @@ package nu.marginalia.io; import nu.marginalia.io.crawldata.format.ParquetSerializableCrawlDataStream; +import nu.marginalia.io.crawldata.format.SlopSerializableCrawlDataStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,7 +25,16 @@ public class CrawledDomainReader { logger.error("Error reading domain data from " + fullPath, ex); return SerializableCrawlDataStream.empty(); } - } else { + } + else if (fileName.endsWith(".slop.zip")) { + try { + return new SlopSerializableCrawlDataStream(fullPath); + } catch (Exception ex) { + logger.error("Error reading domain data from " + fullPath, ex); + return SerializableCrawlDataStream.empty(); + } + } + else { logger.error("Unknown file type: {}", fullPath); return SerializableCrawlDataStream.empty(); } diff --git a/code/processes/crawling-process/model/java/nu/marginalia/io/CrawlerOutputFile.java b/code/processes/crawling-process/model/java/nu/marginalia/io/CrawlerOutputFile.java index 6895387b..83a20e0f 100644 --- a/code/processes/crawling-process/model/java/nu/marginalia/io/CrawlerOutputFile.java +++ b/code/processes/crawling-process/model/java/nu/marginalia/io/CrawlerOutputFile.java @@ -47,6 +47,20 @@ public class CrawlerOutputFile { } return destDir.resolve(id + "-" + filesystemSafeName(domain) + ".parquet"); } + + public static Path createSlopPath(Path basePath, String id, String domain) throws IOException { + id = padId(id); + + String first = id.substring(0, 2); + String second = id.substring(2, 4); + + Path destDir = basePath.resolve(first).resolve(second); + if (!Files.exists(destDir)) { + Files.createDirectories(destDir); + } + return destDir.resolve(id + "-" + filesystemSafeName(domain) + ".slop.zip"); + } + public static Path getParquetPath(Path basePath, String id, String domain) { id = padId(id); @@ -56,6 +70,7 @@ public class CrawlerOutputFile { Path destDir = basePath.resolve(first).resolve(second); return destDir.resolve(id + "-" + filesystemSafeName(domain) + ".parquet"); } + public static Path getWarcPath(Path basePath, String id, String domain, WarcFileVersion version) { id = padId(id); diff --git a/code/processes/crawling-process/model/java/nu/marginalia/io/SerializableCrawlDataStream.java b/code/processes/crawling-process/model/java/nu/marginalia/io/SerializableCrawlDataStream.java index ab2a6624..0593b33e 100644 --- a/code/processes/crawling-process/model/java/nu/marginalia/io/SerializableCrawlDataStream.java +++ b/code/processes/crawling-process/model/java/nu/marginalia/io/SerializableCrawlDataStream.java @@ -4,12 +4,16 @@ import nu.marginalia.model.crawldata.CrawledDocument; import nu.marginalia.model.crawldata.CrawledDomain; import nu.marginalia.model.crawldata.SerializableCrawlData; import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Optional; +import java.util.function.Function; /** Closable iterator exceptional over serialized 
crawl data
 * The data may appear in any order, and the iterator must be closed.
@@ -17,7 +21,7 @@ import java.util.List;
 * @see CrawledDomainReader
 * */
 public interface SerializableCrawlDataStream extends AutoCloseable {
-
+    Logger logger = LoggerFactory.getLogger(SerializableCrawlDataStream.class);
 
     SerializableCrawlData next() throws IOException;
 
@@ -30,6 +34,41 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
     @Nullable
     default Path path() { return null; }
 
+    default <T> Iterator<T> map(Function<SerializableCrawlData, Optional<T>> mapper) {
+        return new Iterator<>() {
+            T next = null;
+
+            public boolean hasNext() {
+                if (next != null)
+                    return true;
+                try {
+                    while (SerializableCrawlDataStream.this.hasNext()) {
+                        var val = mapper.apply(SerializableCrawlDataStream.this.next());
+                        if (val.isPresent()) {
+                            next = val.get();
+                            return true;
+                        }
+                    }
+                }
+                catch (IOException ex) {
+                    logger.error("Error during stream", ex);
+                }
+
+                return false;
+            }
+
+            public T next() {
+                if (next == null && !hasNext())
+                    throw new IllegalStateException("No more data to read");
+
+                T ret = next;
+                next = null;
+                return ret;
+            }
+        };
+
+    }
+
     /** For tests */
     default List<SerializableCrawlData> asList() throws IOException {
         List<SerializableCrawlData> data = new ArrayList<>();
@@ -81,7 +120,6 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
             public boolean hasNext() { return iterator.hasNext(); }
             public void close() {}
         };
-
     }
 }
diff --git a/code/processes/crawling-process/model/java/nu/marginalia/io/crawldata/format/ParquetSerializableCrawlDataStream.java b/code/processes/crawling-process/model/java/nu/marginalia/io/crawldata/format/ParquetSerializableCrawlDataStream.java
index b8dea5bc..4a970384 100644
--- a/code/processes/crawling-process/model/java/nu/marginalia/io/crawldata/format/ParquetSerializableCrawlDataStream.java
+++ b/code/processes/crawling-process/model/java/nu/marginalia/io/crawldata/format/ParquetSerializableCrawlDataStream.java
@@ -1,7 +1,6 @@
 package nu.marginalia.io.crawldata.format;
 
 import nu.marginalia.contenttype.ContentType;
-import nu.marginalia.contenttype.DocumentBodyToString;
 import nu.marginalia.hash.MurmurHash3_128;
 import nu.marginalia.io.SerializableCrawlDataStream;
 import nu.marginalia.model.EdgeUrl;
@@ -18,6 +17,7 @@ import java.nio.file.Path;
 import java.util.*;
 import java.util.stream.Stream;
 
+@Deprecated
 public class ParquetSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
     private static final Logger logger = LoggerFactory.getLogger(ParquetSerializableCrawlDataStream.class);
 
@@ -124,9 +124,7 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, Serial
         } else if (nextRecord.body != null) {
             try {
-                bodyString = DocumentBodyToString.getStringData(
-                        ContentType.parse(nextRecord.contentType),
-                        nextRecord.body);
+                ContentType.parse(nextRecord.contentType);
             } catch (Exception ex) {
                 logger.error("Failed to convert body to string", ex);
                 status = CrawlerDocumentStatus.BAD_CHARSET;
@@ -147,7 +145,7 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, Serial
                 status.toString(),
                 "",
                 nextRecord.headers,
-                bodyString,
+                nextRecord.body,
                 // this field isn't actually used, maybe we can skip calculating it?
nextRecord.cookies, lastModified, diff --git a/code/processes/crawling-process/model/java/nu/marginalia/io/crawldata/format/SlopSerializableCrawlDataStream.java b/code/processes/crawling-process/model/java/nu/marginalia/io/crawldata/format/SlopSerializableCrawlDataStream.java new file mode 100644 index 00000000..5d0f7ff8 --- /dev/null +++ b/code/processes/crawling-process/model/java/nu/marginalia/io/crawldata/format/SlopSerializableCrawlDataStream.java @@ -0,0 +1,181 @@ +package nu.marginalia.io.crawldata.format; + +import nu.marginalia.contenttype.ContentType; +import nu.marginalia.io.SerializableCrawlDataStream; +import nu.marginalia.model.EdgeUrl; +import nu.marginalia.model.crawldata.*; +import nu.marginalia.slop.SlopCrawlDataRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.NoSuchElementException; + +public class SlopSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream { + private static final Logger logger = LoggerFactory.getLogger(SlopSerializableCrawlDataStream.class); + + private final SlopCrawlDataRecord.FilteringReader reader; + + // Holds the next value. This is not a buffer, but to deal with the fact that + // we sometimes generate multiple SerializableCrawlData records for a single input + private final Deque nextQ = new ArrayDeque<>(); + + private boolean wroteDomainRecord = false; + private final Path path; + + public SlopSerializableCrawlDataStream(Path file) throws IOException { + path = file; + reader = new SlopCrawlDataRecord.FilteringReader(file) { + @Override + public boolean filter(String url, int status, String contentType) { + String ctLc = contentType.toLowerCase(); + + if (ctLc.startsWith("text/")) + return true; + else if (ctLc.startsWith("x-marginalia/")) + return true; + + return false; + } + }; + } + + @Override + public Path path() { + return path; + } + + public int sizeHint() { + // Only calculate size hint for large files + // (the reason we calculate them in the first place is to assess whether it is large + // because it has many documents, or because it is a small number of large documents) + try { + if (Files.size(path) > 10_000_000) { + return SlopCrawlDataRecord.countGoodStatusCodes(path); + } + } catch (IOException e) { + // suppressed + } + + return 0; + } + + @Override + public boolean hasNext() { + try { + while (reader.hasRemaining() && nextQ.isEmpty()) { + try { + var nextRecord = reader.get(); + if (!wroteDomainRecord) { + createDomainRecord(nextRecord); + wroteDomainRecord = true; + } + + createDocumentRecord(nextRecord); + } catch (Exception ex) { + logger.error("Failed to create document record", ex); + } + } + return !nextQ.isEmpty(); + } + catch (IOException ex) { + return false; + } + } + + private void createDomainRecord(SlopCrawlDataRecord parquetRecord) throws URISyntaxException { + + CrawlerDomainStatus status = CrawlerDomainStatus.OK; + String statusReason = ""; + + String redirectDomain = null; + + // The advisory content types are used to signal various states of the crawl + // that are not actual crawled documents. 
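+        // Currently that means state=redirect, state=blocked and state=error,
+        // which are handled in the switch below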
+ + switch (slopRecord.contentType()) { + case "x-marginalia/advisory;state=redirect" -> { + EdgeUrl crawledUrl = new EdgeUrl(slopRecord.url()); + redirectDomain = crawledUrl.getDomain().toString(); + status = CrawlerDomainStatus.REDIRECT; + } + case "x-marginalia/advisory;state=blocked" -> { + status = CrawlerDomainStatus.BLOCKED; + } + case "x-marginalia/advisory;state=error" -> { + status = CrawlerDomainStatus.ERROR; + statusReason = new String(slopRecord.body()); + } + } + + nextQ.add(new CrawledDomain( + slopRecord.domain(), + redirectDomain, + status.toString(), + statusReason, + slopRecord.ip(), + new ArrayList<>(), + new ArrayList<>() + )); + } + + private void createDocumentRecord(SlopCrawlDataRecord nextRecord) { + CrawlerDocumentStatus status = CrawlerDocumentStatus.OK; + + if (nextRecord.contentType().startsWith("x-marginalia/advisory;state=content-type-failed-probe")) { + status = CrawlerDocumentStatus.BAD_CONTENT_TYPE; + } + else if (nextRecord.contentType().startsWith("x-marginalia/advisory;state=robots-txt-skipped")) { + status = CrawlerDocumentStatus.ROBOTS_TXT; + } + else if (nextRecord.contentType().startsWith("x-marginalia/advisory")) { + // we don't care about the other advisory content types here + return; + } + else if (nextRecord.body() != null) { + try { + ContentType.parse(nextRecord.contentType()); + } catch (Exception ex) { + logger.error("Failed to parse content type", ex); + status = CrawlerDocumentStatus.BAD_CHARSET; + } + } + else { + status = CrawlerDocumentStatus.ERROR; + } + + nextQ.add(new CrawledDocument("", + nextRecord.url(), + nextRecord.contentType(), + Instant.ofEpochMilli(nextRecord.timestamp()).toString(), + nextRecord.httpStatus(), + status.toString(), + "", + nextRecord.headers(), + nextRecord.body(), + // this field isn't actually used, maybe we can skip calculating it? + nextRecord.cookies(), + null, + null)); + } + + public void close() throws IOException { + reader.close(); + } + + @Override + public SerializableCrawlData next() throws IOException { + if (!hasNext()) + throw new NoSuchElementException(); + + return nextQ.poll(); + } + +}
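Usage sketch, not part of the diff: a consumer of the new stream sees one synthetic CrawledDomain record, derived from the first row including the advisory redirect/blocked/error states above, followed by CrawledDocument records. The path is a placeholder.

    import nu.marginalia.io.crawldata.format.SlopSerializableCrawlDataStream;
    import nu.marginalia.model.crawldata.CrawledDocument;
    import nu.marginalia.model.crawldata.CrawledDomain;

    import java.nio.file.Path;

    class SlopStreamSketch {
        public static void main(String[] args) throws Exception {
            try (var stream = new SlopSerializableCrawlDataStream(Path.of("crawl-data.slop.zip"))) {
                while (stream.hasNext()) {
                    var data = stream.next();
                    if (data instanceof CrawledDomain domain) {
                        System.out.println("domain record: " + domain); // emitted first, exactly once
                    }
                    else if (data instanceof CrawledDocument doc) {
                        System.out.println("document: " + doc.url + " (" + doc.httpStatus + ")");
                    }
                }
            }
        }
    }
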
diff --git a/code/processes/crawling-process/model/java/nu/marginalia/model/body/DocumentBodyExtractor.java b/code/processes/crawling-process/model/java/nu/marginalia/model/body/DocumentBodyExtractor.java index ebd3d33e..ceb71545 100644 --- a/code/processes/crawling-process/model/java/nu/marginalia/model/body/DocumentBodyExtractor.java +++ b/code/processes/crawling-process/model/java/nu/marginalia/model/body/DocumentBodyExtractor.java @@ -18,7 +18,7 @@ public class DocumentBodyExtractor { return asBytes(fetchOk); } else if (result instanceof HttpFetchResult.Result304ReplacedWithReference retained) { - return new DocumentBodyResult.Ok<>(retained.contentType(), retained.body().getBytes()); + return new DocumentBodyResult.Ok<>(retained.contentType(), retained.body()); } return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.ERROR, "Fetch Result Not Ok"); diff --git a/code/processes/crawling-process/model/java/nu/marginalia/model/body/HttpFetchResult.java b/code/processes/crawling-process/model/java/nu/marginalia/model/body/HttpFetchResult.java index e2b047fc..2f6df9bb 100644 --- a/code/processes/crawling-process/model/java/nu/marginalia/model/body/HttpFetchResult.java +++ b/code/processes/crawling-process/model/java/nu/marginalia/model/body/HttpFetchResult.java @@ -99,20 +99,10 @@ * * @see Result304Raw for the case where the document has not yet been replaced with the reference data. */ - record Result304ReplacedWithReference(String url, ContentType contentType, String body) implements HttpFetchResult { - + record Result304ReplacedWithReference(String url, ContentType contentType, byte[] body) implements HttpFetchResult { public boolean isOk() { return true; } - - public Optional<Document> parseDocument() { - try { - return Optional.of(Jsoup.parse(body)); - } - catch (Exception ex) { - return Optional.empty(); - } - } } /** Fetching resulted in an exception */ diff --git a/code/processes/crawling-process/model/java/nu/marginalia/model/crawldata/CrawledDocument.java b/code/processes/crawling-process/model/java/nu/marginalia/model/crawldata/CrawledDocument.java index 656e4b0f..bcadf0ad 100644 --- a/code/processes/crawling-process/model/java/nu/marginalia/model/crawldata/CrawledDocument.java +++ b/code/processes/crawling-process/model/java/nu/marginalia/model/crawldata/CrawledDocument.java @@ -1,8 +1,16 @@ package nu.marginalia.model.crawldata; +import nu.marginalia.contenttype.ContentType; +import nu.marginalia.contenttype.DocumentBodyToString; import nu.marginalia.model.EdgeUrl; import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.Nullable; +import org.jsoup.nodes.Document; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Objects; public final class CrawledDocument implements SerializableCrawlData { public String crawlId; @@ -19,8 +27,49 @@ public final class CrawledDocument implements SerializableCrawlData { @Nullable public String headers; - public String documentBody; + public String documentBody() { + return DocumentBodyToString.getStringData( + ContentType.parse(contentType), + documentBodyBytes); + } + /** Attempt to parse the first sampleSize bytes of the document body into a string */ +
public String documentBody(int sampleSize) { + if (sampleSize >= documentBodyBytes.length) { + return documentBody(); + } + + // Truncating the string at an unlucky point *may* lead to a parsing error + // ... so we try again with a longer length + for (int i = 0; i <= 3 && sampleSize + i < documentBodyBytes.length; i++) { + try { + byte[] bytes = new byte[sampleSize + i]; + System.arraycopy(documentBodyBytes, 0, bytes, 0, bytes.length); + + return DocumentBodyToString.getStringData( + ContentType.parse(contentType), + bytes); + } + catch (RuntimeException ex) { + // Try again with i + 1 + } + } + + throw new IllegalArgumentException("Failed to parse substring"); + } + + public Document parseBody() throws IOException { + return DocumentBodyToString.getParsedData( + ContentType.parse(contentType), + documentBodyBytes, + url); + } + + public boolean hasBody() { + return documentBodyBytes.length > 0; + } + + public byte[] documentBodyBytes; /** * This is not guaranteed to be set in all versions of the format, * information may come in CrawledDomain instead @@ -30,7 +79,7 @@ public final class CrawledDocument implements SerializableCrawlData { public String lastModifiedMaybe; public String etagMaybe; - public CrawledDocument(String crawlId, String url, String contentType, String timestamp, int httpStatus, String crawlerStatus, String crawlerStatusDesc, @Nullable String headers, String documentBody, Boolean hasCookies, String lastModifiedMaybe, String etagMaybe) { + public CrawledDocument(String crawlId, String url, String contentType, String timestamp, int httpStatus, String crawlerStatus, String crawlerStatusDesc, @Nullable String headers, byte[] documentBodyBytes, Boolean hasCookies, String lastModifiedMaybe, String etagMaybe) { this.crawlId = crawlId; this.url = url; this.contentType = contentType; @@ -39,7 +88,7 @@ public final class CrawledDocument implements SerializableCrawlData { this.crawlerStatus = crawlerStatus; this.crawlerStatusDesc = crawlerStatusDesc; this.headers = headers; - this.documentBody = documentBody; + this.documentBodyBytes = Objects.requireNonNullElse(documentBodyBytes, new byte[] {}); this.hasCookies = hasCookies; this.lastModifiedMaybe = lastModifiedMaybe; this.etagMaybe = etagMaybe; @@ -106,7 +155,7 @@ public final class CrawledDocument implements SerializableCrawlData { } public String toString() { - return "CrawledDocument(crawlId=" + this.crawlId + ", url=" + this.url + ", contentType=" + this.contentType + ", timestamp=" + this.timestamp + ", httpStatus=" + this.httpStatus + ", crawlerStatus=" + this.crawlerStatus + ", crawlerStatusDesc=" + this.crawlerStatusDesc + ", headers=" + this.headers + ", documentBody=" + this.documentBody + ", hasCookies=" + this.hasCookies + ", lastModifiedMaybe=" + this.lastModifiedMaybe + ", etagMaybe=" + this.etagMaybe + ")"; + return "CrawledDocument(crawlId=" + this.crawlId + ", url=" + this.url + ", contentType=" + this.contentType + ", timestamp=" + this.timestamp + ", httpStatus=" + this.httpStatus + ", crawlerStatus=" + this.crawlerStatus + ", crawlerStatusDesc=" + this.crawlerStatusDesc + ", headers=" + this.headers + ", documentBody=" + documentBody() + ", hasCookies=" + this.hasCookies + ", lastModifiedMaybe=" + this.lastModifiedMaybe + ", etagMaybe=" + this.etagMaybe + ")"; } public static class CrawledDocumentBuilder { @@ -118,7 +167,7 @@ public final class CrawledDocument implements SerializableCrawlData { private String crawlerStatus; private String crawlerStatusDesc; private @Nullable String headers; - private 
String documentBody; + private byte[] documentBodyBytes = new byte[0]; private String recrawlState; private Boolean hasCookies; private String lastModifiedMaybe; @@ -168,10 +217,13 @@ } public CrawledDocumentBuilder documentBody(String documentBody) { - this.documentBody = documentBody; + this.documentBodyBytes = documentBody.getBytes(StandardCharsets.UTF_8); + return this; + } + public CrawledDocumentBuilder documentBodyBytes(byte[] documentBodyBytes) { + this.documentBodyBytes = documentBodyBytes; return this; } - @Deprecated public CrawledDocumentBuilder recrawlState(String recrawlState) { this.recrawlState = recrawlState; @@ -194,11 +246,11 @@ } public CrawledDocument build() { - return new CrawledDocument(this.crawlId, this.url, this.contentType, this.timestamp, this.httpStatus, this.crawlerStatus, this.crawlerStatusDesc, this.headers, this.documentBody, this.hasCookies, this.lastModifiedMaybe, this.etagMaybe); + return new CrawledDocument(this.crawlId, this.url, this.contentType, this.timestamp, this.httpStatus, this.crawlerStatus, this.crawlerStatusDesc, this.headers, this.documentBodyBytes, this.hasCookies, this.lastModifiedMaybe, this.etagMaybe); } public String toString() { - return "CrawledDocument.CrawledDocumentBuilder(crawlId=" + this.crawlId + ", url=" + this.url + ", contentType=" + this.contentType + ", timestamp=" + this.timestamp + ", httpStatus=" + this.httpStatus + ", crawlerStatus=" + this.crawlerStatus + ", crawlerStatusDesc=" + this.crawlerStatusDesc + ", headers=" + this.headers + ", documentBody=" + this.documentBody + ", recrawlState=" + this.recrawlState + ", hasCookies=" + this.hasCookies + ", lastModifiedMaybe=" + this.lastModifiedMaybe + ", etagMaybe=" + this.etagMaybe + ")"; + return "CrawledDocument.CrawledDocumentBuilder(crawlId=" + this.crawlId + ", url=" + this.url + ", contentType=" + this.contentType + ", timestamp=" + this.timestamp + ", httpStatus=" + this.httpStatus + ", crawlerStatus=" + this.crawlerStatus + ", crawlerStatusDesc=" + this.crawlerStatusDesc + ", headers=" + this.headers + ", documentBodyBytes=" + Arrays.toString(this.documentBodyBytes) + ", recrawlState=" + this.recrawlState + ", hasCookies=" + this.hasCookies + ", lastModifiedMaybe=" + this.lastModifiedMaybe + ", etagMaybe=" + this.etagMaybe + ")"; } } }
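Usage sketch, not part of the diff: CrawledDocument's body is now stored as byte[] and decoded on demand. documentBody() decodes the whole body using the Content-Type charset, documentBody(n) decodes roughly the first n bytes (retrying a few offsets to dodge truncation artifacts), and parseBody() returns a Jsoup Document. This uses the twelve-argument constructor introduced above; the field values are dummies.

    import nu.marginalia.model.crawldata.CrawledDocument;

    import java.nio.charset.StandardCharsets;
    import java.time.Instant;

    class CrawledDocumentBodySketch {
        public static void main(String[] args) throws Exception {
            CrawledDocument doc = new CrawledDocument(
                    "sketch",                          // crawlId
                    "https://www.example.com/",        // url
                    "text/html; charset=UTF-8",        // contentType
                    Instant.now().toString(),          // timestamp
                    200,                               // httpStatus
                    "OK",                              // crawlerStatus
                    "",                                // crawlerStatusDesc
                    null,                              // headers
                    "<html><body>hello world</body></html>".getBytes(StandardCharsets.UTF_8),
                    false,                             // hasCookies
                    null, null);                       // lastModifiedMaybe, etagMaybe

            if (doc.hasBody()) {
                System.out.println(doc.documentBody());       // full decoded body
                System.out.println(doc.documentBody(16));     // decoded prefix of ~16 bytes
                System.out.println(doc.parseBody().text());   // parsed DOM: "hello world"
            }
        }
    }
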
diff --git a/code/processes/crawling-process/model/java/nu/marginalia/slop/SlopCrawlDataRecord.java b/code/processes/crawling-process/model/java/nu/marginalia/slop/SlopCrawlDataRecord.java new file mode 100644 index 00000000..8842d7e8 --- /dev/null +++ b/code/processes/crawling-process/model/java/nu/marginalia/slop/SlopCrawlDataRecord.java @@ -0,0 +1,520 @@ +package nu.marginalia.slop; + +import nu.marginalia.ContentTypes; +import nu.marginalia.UserAgent; +import nu.marginalia.model.body.DocumentBodyExtractor; +import nu.marginalia.model.body.DocumentBodyResult; +import nu.marginalia.model.body.HttpFetchResult; +import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecord; +import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecordFileReader; +import nu.marginalia.slop.column.array.ByteArrayColumn; +import nu.marginalia.slop.column.primitive.ByteColumn; +import nu.marginalia.slop.column.primitive.LongColumn; +import nu.marginalia.slop.column.primitive.ShortColumn; +import nu.marginalia.slop.column.string.EnumColumn; +import nu.marginalia.slop.column.string.StringColumn; +import nu.marginalia.slop.desc.StorageType; +import nu.marginalia.slop.storage.LargeItem; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.netpreserve.jwarc.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.util.List; +import java.util.Objects; +import java.util.StringJoiner; + +public record SlopCrawlDataRecord(String domain, + String url, + String ip, + boolean cookies, + int httpStatus, + long timestamp, + String contentType, + byte[] body, + String headers) +{ + private static final EnumColumn domainColumn = new EnumColumn("domain", StandardCharsets.UTF_8, StorageType.ZSTD); + private static final StringColumn urlColumn = new StringColumn("url", StandardCharsets.UTF_8, StorageType.ZSTD); + private static final StringColumn ipColumn = new StringColumn("ip", StandardCharsets.ISO_8859_1, StorageType.ZSTD); + private static final ByteColumn cookiesColumn = new ByteColumn("cookies"); + private static final ShortColumn statusColumn = new ShortColumn("httpStatus"); + private static final LongColumn timestampColumn = new LongColumn("timestamp"); + private static final EnumColumn contentTypeColumn = new EnumColumn("contentType", StandardCharsets.UTF_8); + private static final ByteArrayColumn bodyColumn = new ByteArrayColumn("body", StorageType.ZSTD); + private static final StringColumn headerColumn = new StringColumn("header", StandardCharsets.UTF_8, StorageType.ZSTD); + + public SlopCrawlDataRecord(CrawledDocumentParquetRecord parquetRecord) { + this(parquetRecord.domain, + parquetRecord.url, + parquetRecord.ip, + parquetRecord.cookies, + parquetRecord.httpStatus, + parquetRecord.timestamp.toEpochMilli(), + parquetRecord.contentType, + parquetRecord.body, + parquetRecord.headers + ); + } + + + private static SlopCrawlDataRecord forDomainRedirect(String domain, Instant date, String redirectDomain) { + return new SlopCrawlDataRecord(domain, + "https://" + redirectDomain + "/", + "", + false, + 0, + date.toEpochMilli(), + "x-marginalia/advisory;state=redirect", + new byte[0], + "" + ); + } + + private static SlopCrawlDataRecord forDomainError(String domain, Instant date, String ip, String errorStatus) { + return new SlopCrawlDataRecord(domain, + "https://" + domain + "/", + ip, + false, + 0, + date.toEpochMilli(), + "x-marginalia/advisory;state=error", + errorStatus.getBytes(), + "" + ); + } + + private static SlopCrawlDataRecord forDocError(String domain, Instant date, String url, String errorStatus) { + return new SlopCrawlDataRecord(domain, + url, + "", + false, + 0, + date.toEpochMilli(), + errorStatus, + new byte[0], + "" + ); + } + + + public static void convertFromParquet(Path parquetInput, Path slopOutput) throws IOException { + Path tempDir = Files.createTempDirectory(slopOutput.getParent(), "conversion"); + + try (var writer = new Writer(tempDir)) { + CrawledDocumentParquetRecordFileReader.stream(parquetInput).forEach( + parquetRecord -> { + try { + writer.write(new SlopCrawlDataRecord(parquetRecord)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + catch (IOException ex) { + FileUtils.deleteDirectory(tempDir.toFile()); + throw ex; + } + + try { + SlopTablePacker.packToSlopZip(tempDir, slopOutput); + FileUtils.deleteDirectory(tempDir.toFile()); + } + catch (Exception ex) { + logger.error("Failed to pack Slop output", ex); + } + } + + private static final Logger logger = LoggerFactory.getLogger(SlopCrawlDataRecord.class); + + public static void convertWarc(String domain, + UserAgent userAgent, + Path warcInputFile, + Path slopOutputFile) throws IOException { + + Path tempDir = Files.createTempDirectory(slopOutputFile.getParent(), "slop-"+domain); + + try (var warcReader = new WarcReader(warcInputFile); + var slopWriter = new SlopCrawlDataRecord.Writer(tempDir) + ) { + WarcXResponseReference.register(warcReader); + WarcXEntityRefused.register(warcReader); + + String uaString = userAgent.uaString(); + + for (var record : warcReader) { + try { + if (record instanceof WarcResponse response) { + // this also captures WarcXResponseReference, which inherits from WarcResponse + // and is used to store old responses from previous crawls; in this part of the logic + // we treat them the same as a normal response + + if (!filterResponse(uaString, response)) { + continue; + } + + slopWriter.write(domain, response); + } else if (record instanceof WarcXEntityRefused refused) { + slopWriter.write(domain, refused); + } else if (record instanceof Warcinfo warcinfo) { + slopWriter.write(warcinfo); + } + } + catch (Exception ex) { + logger.error("Failed to convert WARC record to Slop", ex); + } + } + } + catch (Exception ex) { + logger.error("Failed to convert WARC file to Slop", ex); + } + + try { + SlopTablePacker.packToSlopZip(tempDir, slopOutputFile); + FileUtils.deleteDirectory(tempDir.toFile()); + } + catch (Exception ex) { + logger.error("Failed to pack Slop output", ex); + } + } + + + + /** Return true if the WarcResponse should be retained in the conversion, false if it should be excluded */ + private static boolean filterResponse(String uaString, WarcResponse response) throws IOException { + + // We don't want to store robots.txt files, as they are not + // interesting for the analysis we want to do. This is important + // since txt-files in general are interesting, and we don't want to + // exclude them as a class. + + if (response.targetURI().getPath().equals("/robots.txt")) { + return false; + } + + var headers = response.http().headers(); + var robotsTags = headers.all("X-Robots-Tag"); + + if (!isXRobotsTagsPermitted(robotsTags, uaString)) { + return false; + } + + // Strip out responses with content types we aren't interested in + // (though ideally we wouldn't download these at all) + String contentType = headers.first("Content-Type").orElse("text/plain").toLowerCase(); + + if (!ContentTypes.isAccepted(contentType)) { + return false; + } + + return true; + } + + /** Check X-Robots-Tag header tags to see if we are allowed to index this page. + *
+ * Reference: https://developers.google.com/search/docs/crawling-indexing/robots-meta-tag + * + * @param xRobotsHeaderTags List of X-Robots-Tag values + * @param userAgent User agent string + * @return true if we are allowed to index this page + */ + // Visible for tests + public static boolean isXRobotsTagsPermitted(List<String> xRobotsHeaderTags, String userAgent) { + boolean isPermittedGeneral = true; + boolean isPermittedMarginalia = false; + boolean isForbiddenMarginalia = false; + + for (String header : xRobotsHeaderTags) { + if (header.indexOf(':') >= 0) { + String[] parts = StringUtils.split(header, ":", 2); + + if (parts.length < 2) + continue; + + // Is this relevant to us? + if (!Objects.equals(parts[0].trim(), userAgent)) + continue; + + if (parts[1].contains("noindex")) + isForbiddenMarginalia = true; + else if (parts[1].contains("none")) + isForbiddenMarginalia = true; + else if (parts[1].contains("all")) + isPermittedMarginalia = true; + } + else { + if (header.contains("noindex")) + isPermittedGeneral = false; + if (header.contains("none")) + isPermittedGeneral = false; + } + } + + if (isPermittedMarginalia) + return true; + if (isForbiddenMarginalia) + return false; + return isPermittedGeneral; + }
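Worked example, not part of the diff: the precedence implemented above is that a directive addressed to our user agent wins over blanket directives, with an agent-specific permit checked before an agent-specific forbid. The user agent string below is a placeholder.

    import nu.marginalia.slop.SlopCrawlDataRecord;

    import java.util.List;

    class XRobotsTagSketch {
        public static void main(String[] args) {
            String ua = "search.marginalia.nu"; // placeholder UA string

            // no headers at all: permitted
            System.out.println(SlopCrawlDataRecord.isXRobotsTagsPermitted(List.of(), ua));           // true
            // blanket noindex: forbidden
            System.out.println(SlopCrawlDataRecord.isXRobotsTagsPermitted(List.of("noindex"), ua));  // false
            // blanket noindex, but our UA is explicitly allowed: permitted
            System.out.println(SlopCrawlDataRecord.isXRobotsTagsPermitted(
                    List.of("noindex", ua + ": all"), ua));                                          // true
            // directive addressed specifically to us: forbidden
            System.out.println(SlopCrawlDataRecord.isXRobotsTagsPermitted(
                    List.of(ua + ": noindex"), ua));                                                 // false
        }
    }
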
+ public static int countGoodStatusCodes(Path path) throws IOException { + int cnt = 0; + + try (var table = new SlopTable(path)) { + ShortColumn.Reader statusReader = statusColumn.open(table); + while (statusReader.hasRemaining()) { + if (statusReader.get() == 200) { + cnt++; + } + } + } + + return cnt; + } + + public static class Writer extends SlopTable { + private final EnumColumn.Writer domainColumnWriter; + private final StringColumn.Writer urlColumnWriter; + private final StringColumn.Writer ipColumnWriter; + private final ByteColumn.Writer cookiesColumnWriter; + private final ShortColumn.Writer statusColumnWriter; + private final LongColumn.Writer timestampColumnWriter; + private final EnumColumn.Writer contentTypeColumnWriter; + private final ByteArrayColumn.Writer bodyColumnWriter; + private final StringColumn.Writer headerColumnWriter; + + public Writer(Path path) throws IOException { + super(path); + + domainColumnWriter = domainColumn.create(this); + urlColumnWriter = urlColumn.create(this); + ipColumnWriter = ipColumn.create(this); + cookiesColumnWriter = cookiesColumn.create(this); + statusColumnWriter = statusColumn.create(this); + timestampColumnWriter = timestampColumn.create(this); + contentTypeColumnWriter = contentTypeColumn.create(this); + bodyColumnWriter = bodyColumn.create(this); + headerColumnWriter = headerColumn.create(this); + } + + public void write(SlopCrawlDataRecord record) throws IOException { + domainColumnWriter.put(record.domain); + urlColumnWriter.put(record.url); + ipColumnWriter.put(record.ip); + cookiesColumnWriter.put(record.cookies ? (byte) 1 : (byte) 0); + statusColumnWriter.put((short) record.httpStatus); + timestampColumnWriter.put(record.timestamp); + contentTypeColumnWriter.put(record.contentType); + bodyColumnWriter.put(record.body); + headerColumnWriter.put(record.headers); + } + + public void write(String domain, WarcResponse response) throws IOException { + + HttpFetchResult result = HttpFetchResult.importWarc(response); + if (!(result instanceof HttpFetchResult.ResultOk fetchOk)) { + return; + } + + byte[] bodyBytes; + String contentType; + + var body = DocumentBodyExtractor.asBytes(result); + + var headers = fetchOk.headers(); + + if (body instanceof DocumentBodyResult.Ok<byte[]> bodyOk) { + bodyBytes = bodyOk.body(); + contentType = bodyOk.contentType().toString(); + } + else { + bodyBytes = new byte[0]; + contentType = ""; + } + + String headersStr; + StringJoiner headersStrBuilder = new StringJoiner("\n"); + for (var header : headers.map().entrySet()) { + for (var value : header.getValue()) { + headersStrBuilder.add(header.getKey() + ": " + value); + } + } + headersStr = headersStrBuilder.toString(); + + + write(new SlopCrawlDataRecord( + domain, + response.target(), + fetchOk.ipAddress(), + "1".equals(headers.firstValue("X-Cookies").orElse("0")), + fetchOk.statusCode(), + response.date().toEpochMilli(), + contentType, + bodyBytes, + headersStr + ) + ); + } + + private void write(String domain, WarcXEntityRefused refused) throws IOException { + URI profile = refused.profile(); + + String meta; + if (profile.equals(WarcXEntityRefused.documentRobotsTxtSkippedURN)) { + meta = "x-marginalia/advisory;state=robots-txt-skipped"; + } + else if (profile.equals(WarcXEntityRefused.documentBadContentTypeURN)) { + meta = "x-marginalia/advisory;state=content-type-failed-probe"; + } + else if (profile.equals(WarcXEntityRefused.documentProbeTimeout)) { + meta = "x-marginalia/advisory;state=timeout-probe"; + } + else if (profile.equals(WarcXEntityRefused.documentUnspecifiedError)) { + meta = "x-marginalia/advisory;state=doc-error"; + } + else { + meta = "x-marginalia/advisory;state=unknown"; + } + + write(forDocError(domain, refused.date(), refused.target(), meta)); + } + + private void write(Warcinfo warcinfo) throws IOException { + String selfDomain = warcinfo.fields().first("domain").orElse(""); + String ip = warcinfo.fields().first("ip").orElse(""); + String probeStatus = warcinfo.fields().first("X-WARC-Probe-Status").orElse(""); + + if (probeStatus.startsWith("REDIRECT")) { + String redirectDomain = probeStatus.substring("REDIRECT;".length()); + write(forDomainRedirect(selfDomain, warcinfo.date(), redirectDomain)); + } + else if (!"OK".equals(probeStatus)) { + write(forDomainError(selfDomain, warcinfo.date(), ip, probeStatus)); + } + } + } + + public static class Reader extends SlopTable { + private final EnumColumn.Reader domainColumnReader; + private final StringColumn.Reader urlColumnReader; + private final StringColumn.Reader ipColumnReader; + private final ByteColumn.Reader cookiesColumnReader; + private final ShortColumn.Reader statusColumnReader; + private final LongColumn.Reader timestampColumnReader; + private final EnumColumn.Reader contentTypeColumnReader; + private final ByteArrayColumn.Reader bodyColumnReader; + private final StringColumn.Reader headerColumnReader; + + public Reader(Path path) throws IOException { + super(path); + + domainColumnReader = domainColumn.open(this); + urlColumnReader = urlColumn.open(this); + ipColumnReader = ipColumn.open(this); + cookiesColumnReader = cookiesColumn.open(this);
statusColumnReader = statusColumn.open(this); + timestampColumnReader = timestampColumn.open(this); + contentTypeColumnReader = contentTypeColumn.open(this); + bodyColumnReader = bodyColumn.open(this); + headerColumnReader = headerColumn.open(this); + } + + public SlopCrawlDataRecord get() throws IOException { + return new SlopCrawlDataRecord( + domainColumnReader.get(), + urlColumnReader.get(), + ipColumnReader.get(), + cookiesColumnReader.get() == 1, + statusColumnReader.get(), + timestampColumnReader.get(), + contentTypeColumnReader.get(), + bodyColumnReader.get(), + headerColumnReader.get() + ); + } + + public boolean hasRemaining() throws IOException { + return domainColumnReader.hasRemaining(); + } + } + + + public abstract static class FilteringReader extends SlopTable { + private final EnumColumn.Reader domainColumnReader; + private final StringColumn.Reader urlColumnReader; + private final StringColumn.Reader ipColumnReader; + private final ByteColumn.Reader cookiesColumnReader; + private final ShortColumn.Reader statusColumnReader; + private final LongColumn.Reader timestampColumnReader; + private final EnumColumn.Reader contentTypeColumnReader; + private final ByteArrayColumn.Reader bodyColumnReader; + private final StringColumn.Reader headerColumnReader; + + private SlopCrawlDataRecord next = null; + + public FilteringReader(Path path) throws IOException { + super(path); + + domainColumnReader = domainColumn.open(this); + urlColumnReader = urlColumn.open(this); + ipColumnReader = ipColumn.open(this); + cookiesColumnReader = cookiesColumn.open(this); + statusColumnReader = statusColumn.open(this); + timestampColumnReader = timestampColumn.open(this); + contentTypeColumnReader = contentTypeColumn.open(this); + bodyColumnReader = bodyColumn.open(this); + headerColumnReader = headerColumn.open(this); + } + + public abstract boolean filter(String url, int status, String contentType); + + public SlopCrawlDataRecord get() throws IOException { + if (next == null) { + if (!hasRemaining()) { + throw new IllegalStateException("No more values remaining"); + } + } + var val = next; + next = null; + return val; + } + + public boolean hasRemaining() throws IOException { + if (next != null) + return true; + + while (domainColumnReader.hasRemaining()) { + String domain = domainColumnReader.get(); + String url = urlColumnReader.get(); + String ip = ipColumnReader.get(); + boolean cookies = cookiesColumnReader.get() == 1; + int status = statusColumnReader.get(); + long timestamp = timestampColumnReader.get(); + String contentType = contentTypeColumnReader.get(); + + LargeItem<byte[]> body = bodyColumnReader.getLarge(); + LargeItem<String> headers = headerColumnReader.getLarge(); + + if (filter(url, status, contentType)) { + next = new SlopCrawlDataRecord( + domain, url, ip, cookies, status, timestamp, contentType, body.get(), headers.get() + ); + return true; + } + else { + body.close(); + headers.close(); + } + } + + return false; + } + } +}
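Usage sketch, not part of the diff: a round trip for the record type above, writing one record with Writer and reading it back through a FilteringReader. Paths and field values are placeholders. Note that the conversion methods above pack the written directory into a .slop.zip via SlopTablePacker before it is consumed elsewhere; that a reader can also open the unpacked directory directly is an assumption here.

    import nu.marginalia.slop.SlopCrawlDataRecord;

    import java.nio.file.Files;
    import java.nio.file.Path;

    class SlopRoundTripSketch {
        public static void main(String[] args) throws Exception {
            Path dir = Files.createTempDirectory("slop-sketch");

            try (var writer = new SlopCrawlDataRecord.Writer(dir)) {
                writer.write(new SlopCrawlDataRecord(
                        "www.example.com", "https://www.example.com/", "127.0.0.1",
                        false, 200, System.currentTimeMillis(),
                        "text/html", "<html></html>".getBytes(), "Content-Type: text/html"));
            }

            // Keep only 200-status text/html rows; filtered-out rows are skipped without
            // materializing their (potentially large) body column
            try (var reader = new SlopCrawlDataRecord.FilteringReader(dir) {
                @Override
                public boolean filter(String url, int status, String contentType) {
                    return status == 200 && contentType.startsWith("text/html");
                }
            }) {
                while (reader.hasRemaining()) {
                    System.out.println(reader.get().url());
                }
            }
        }
    }
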
diff --git a/code/processes/crawling-process/model/test/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecordFileWriterTest.java b/code/processes/crawling-process/model/test/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecordFileWriterTest.java index cef2a0f2..9809078d 100644 --- a/code/processes/crawling-process/model/test/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecordFileWriterTest.java +++ b/code/processes/crawling-process/model/test/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecordFileWriterTest.java @@ -80,7 +80,7 @@ class CrawledDocumentParquetRecordFileWriterTest { var document = (CrawledDocument) secondItem; assertEquals("https://www.marginalia.nu/", document.url); assertEquals("text/html", document.contentType); - assertEquals("hello world", document.documentBody); + assertEquals("hello world", document.documentBody()); assertEquals(200, document.httpStatus); } @@ -103,7 +103,7 @@ class CrawledDocumentParquetRecordFileWriterTest { System.out.println(doc.url); System.out.println(doc.contentType); System.out.println(doc.httpStatus); - System.out.println(doc.documentBody.length()); + System.out.println(doc.documentBody().length()); } } } catch (IOException e) { diff --git a/code/processes/crawling-process/model/test/nu/marginalia/slop/SlopCrawlDataRecordTest.java b/code/processes/crawling-process/model/test/nu/marginalia/slop/SlopCrawlDataRecordTest.java new file mode 100644 index 00000000..d2f84c30 --- /dev/null +++ b/code/processes/crawling-process/model/test/nu/marginalia/slop/SlopCrawlDataRecordTest.java @@ -0,0 +1,85 @@ +package nu.marginalia.slop; + +import nu.marginalia.contenttype.ContentType; +import org.jsoup.Jsoup; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.file.Path; +import java.time.Duration; +import java.time.Instant; + +class SlopCrawlDataRecordTest { + + @Test + public void test() throws IOException { + +// Files.deleteIfExists(Path.of("/tmp/steam.slop.zip")); +// SlopCrawlDataRecord.convertFromParquet( +// Path.of("/home/vlofgren/Downloads/_storage_crawl-data__23-10-21T15_08_43.750_3b_41_3b41e714-store.steampowered.com.parquet"), +// Path.of("/tmp/steam.slop.zip") +// ); +// long st = 0; +// try (var reader = new SlopCrawlDataRecord.FilteringReader(Path.of("/tmp/steam.slop.zip")) { +// public boolean filter(String url, int status, String contentType) { +// return contentType.startsWith("text/html"); +// } +// }) { +// Instant start = Instant.now(); +// while (reader.hasRemaining()) { +// var next = reader.get(); +// byte[] body = next.body(); +// +// st += Jsoup.parse(new String(body)).title().length(); +// } +// System.out.println(st + " " + Duration.between(start, Instant.now())); +// } + + long st = 0; + try (var reader = new SlopCrawlDataRecord.FilteringReader(Path.of("/tmp/steam.slop.zip")) { + public boolean filter(String url, int status, String contentType) { + return contentType.startsWith("text/html"); + } + }) { + Instant start = Instant.now(); + while (reader.hasRemaining()) { + var next = reader.get(); + byte[] body = next.body(); + + st += Jsoup.parse(new ByteArrayInputStream(body), ContentType.parse(next.contentType()).asCharset().name(), next.url()).title().length(); + + } + System.out.println(Duration.between(start, Instant.now())); + } + +// System.out.println("BEGIN Slop"); +// for (int i = 0; i < 6; i++) { +// int sz = 0; +// Instant start = Instant.now(); +// try (var reader = new SlopCrawlDataRecord.Reader(Path.of("/tmp/steam.slop.zip")) { +// public boolean filter(String url, int status, String contentType) { +// return contentType.startsWith("text/html"); +// } +// }) { +// while (reader.hasRemaining()) { +// sz += reader.get().httpStatus(); +// } +// } +// Instant end = Instant.now(); +// System.out.println("END Iter " + sz + " " + Duration.between(start, end)); +// } +// System.out.println("END Slop"); +// +// System.out.println("BEGIN Parquet"); +// for (int i = 0; i < 6; i++) { +// Instant start = Instant.now(); +// int sz =
CrawledDocumentParquetRecordFileReader.stream(Path.of("/home/vlofgren/Downloads/_storage_crawl-data__23-10-21T15_08_43.750_3b_41_3b41e714-store.steampowered.com.parquet")) +// .filter(record -> record.contentType.startsWith("text/html")) +// .mapToInt(record -> record.httpStatus).sum(); +// Instant end = Instant.now(); +// System.out.println("END Iter " + sz + " " + Duration.between(start, end)); +// } +// System.out.println("END Parquet"); + } +} \ No newline at end of file diff --git a/code/processes/crawling-process/test/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java b/code/processes/crawling-process/test/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java index a53feb33..a7e280f5 100644 --- a/code/processes/crawling-process/test/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java +++ b/code/processes/crawling-process/test/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java @@ -82,7 +82,7 @@ class WarcRecorderTest { recorder.writeReferenceCopy(new EdgeUrl("https://www.marginalia.nu/"), "text/html", 200, - "test", + "test".getBytes(), null, ContentTags.empty()); } @@ -118,7 +118,7 @@ class WarcRecorderTest { recorder.writeReferenceCopy(new EdgeUrl("https://www.marginalia.nu/"), "text/html", 200, - "test", + "test".getBytes(), null, ContentTags.empty()); } diff --git a/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java b/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java index 6c9975ed..2c264e4b 100644 --- a/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java +++ b/code/processes/crawling-process/test/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java @@ -141,7 +141,7 @@ public class CrawlerMockFetcherTest { public HttpFetchResult fetchContent(EdgeUrl url, WarcRecorder recorder, ContentTags tags, ProbeType probeType) { logger.info("Fetching {}", url); if (mockData.containsKey(url)) { - byte[] bodyBytes = mockData.get(url).documentBody.getBytes(); + byte[] bodyBytes = mockData.get(url).documentBodyBytes; try { return new HttpFetchResult.ResultOk( diff --git a/code/processes/export-task-process/java/nu/marginalia/extractor/AtagExporter.java b/code/processes/export-task-process/java/nu/marginalia/extractor/AtagExporter.java index 320171bb..f31d39eb 100644 --- a/code/processes/export-task-process/java/nu/marginalia/extractor/AtagExporter.java +++ b/code/processes/export-task-process/java/nu/marginalia/extractor/AtagExporter.java @@ -14,7 +14,6 @@ import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.model.FileStorage; import nu.marginalia.storage.model.FileStorageId; import org.apache.commons.lang3.StringUtils; -import org.jsoup.Jsoup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,11 +51,9 @@ public class AtagExporter implements ExporterIf { try (var bw = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(Files.newOutputStream(tmpFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING))))) { - Path crawlerLogFile = inputDir.resolve("crawler.log"); - var tagWriter = new ATagCsvWriter(bw); - for (var item : WorkLog.iterable(crawlerLogFile)) { + for (var item : WorkLog.iterable(inputDir.resolve("crawler.log"))) { if (Thread.interrupted()) { throw new InterruptedException(); } @@ -89,15 +86,19 @@ public class AtagExporter implements ExporterIf { while (stream.hasNext()) { if (!(stream.next() instanceof CrawledDocument doc)) continue; - if (null 
== doc.documentBody) + if (!doc.hasBody()) continue; if (!doc.contentType.toLowerCase().startsWith("text/html")) continue; var baseUrl = new EdgeUrl(doc.url); - var parsed = Jsoup.parse(doc.documentBody); + var parsed = doc.parseBody(); for (var atag : parsed.getElementsByTag("a")) { + if (!atag.hasAttr("href")) { + continue; + } + String linkText = atag.text(); if (!linkFilter.isLinkTextEligible(linkText)) { diff --git a/code/processes/export-task-process/java/nu/marginalia/extractor/FeedExporter.java b/code/processes/export-task-process/java/nu/marginalia/extractor/FeedExporter.java index abe5c708..2f5b1d90 100644 --- a/code/processes/export-task-process/java/nu/marginalia/extractor/FeedExporter.java +++ b/code/processes/export-task-process/java/nu/marginalia/extractor/FeedExporter.java @@ -12,7 +12,6 @@ import nu.marginalia.process.log.WorkLog; import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.model.FileStorage; import nu.marginalia.storage.model.FileStorageId; -import org.jsoup.Jsoup; import java.io.BufferedWriter; import java.io.IOException; @@ -81,13 +80,13 @@ public class FeedExporter implements ExporterIf { while (stream.hasNext()) { if (!(stream.next() instanceof CrawledDocument doc)) continue; - if (null == doc.documentBody) + if (!doc.hasBody()) continue; if (!doc.contentType.toLowerCase().startsWith("text/html")) continue; var baseUrl = new EdgeUrl(doc.url); - var parsed = Jsoup.parse(doc.documentBody); + var parsed = doc.parseBody(); List<EdgeUrl> feedUrls = new ArrayList<>(); for (var link : parsed.select("link[rel=alternate]")) { diff --git a/code/processes/export-task-process/java/nu/marginalia/extractor/TermFrequencyExporter.java b/code/processes/export-task-process/java/nu/marginalia/extractor/TermFrequencyExporter.java index 3255edf2..08562251 100644 --- a/code/processes/export-task-process/java/nu/marginalia/extractor/TermFrequencyExporter.java +++ b/code/processes/export-task-process/java/nu/marginalia/extractor/TermFrequencyExporter.java @@ -15,7 +15,6 @@ import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.model.FileStorage; import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.util.SimpleBlockingThreadPool; -import org.jsoup.Jsoup; import org.jsoup.nodes.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,13 +109,13 @@ public class TermFrequencyExporter implements ExporterIf { return; if (!(stream.next() instanceof CrawledDocument doc)) continue; - if (doc.documentBody == null) continue; + if (!doc.hasBody()) continue; if (!doc.contentType.toLowerCase().startsWith("text/html")) continue; docCount.incrementAndGet(); - Document parsed = Jsoup.parse(doc.documentBody); + Document parsed = doc.parseBody(); parsed.body().filter(new DomPruningFilter(0.5)); DocumentLanguageData dld = se.extractSentences(parsed);
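Sketch, not part of the diff: the exporter-side pattern these hunks converge on, skipping documents without a body and parsing via CrawledDocument.parseBody() (charset-aware, byte[]-backed) instead of Jsoup.parse(String). Stream acquisition is elided; this mirrors the AtagExporter/FeedExporter loops above.

    import nu.marginalia.io.SerializableCrawlDataStream;
    import nu.marginalia.model.crawldata.CrawledDocument;

    class ExporterLoopSketch {
        static void exportLinks(SerializableCrawlDataStream stream) throws Exception {
            while (stream.hasNext()) {
                if (!(stream.next() instanceof CrawledDocument doc))
                    continue;
                if (!doc.hasBody())
                    continue;
                if (!doc.contentType.toLowerCase().startsWith("text/html"))
                    continue;

                var parsed = doc.parseBody();  // decodes bytes per Content-Type, then parses
                for (var atag : parsed.getElementsByTag("a")) {
                    if (atag.hasAttr("href"))
                        System.out.println(atag.attr("href"));
                }
            }
        }
    }
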
diff --git a/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlDataSet.java b/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlDataSet.java index 4324c254..4e94bb52 100644 --- a/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlDataSet.java +++ b/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlDataSet.java @@ -119,12 +119,16 @@ public class LiveCrawlDataSet implements AutoCloseable { } } - private String decompress(byte[] data) { + private String decompressStr(byte[] data) { + return new String(decompressBytes(data)); + } + + private byte[] decompressBytes(byte[] data) { // gzip decompression - try (var bis = new ByteArrayInputStream(data); - var gzip = new GZIPInputStream(bis)) + try (ByteArrayInputStream bis = new ByteArrayInputStream(data); + GZIPInputStream gzip = new GZIPInputStream(bis)) { - return new String(gzip.readAllBytes()); + return gzip.readAllBytes(); } catch (IOException ex) { throw new RuntimeException(ex); @@ -177,8 +181,8 @@ dataStack = new ArrayList<>(); while (rs.next()) { String url = rs.getString("url"); - String body = decompress(rs.getBytes("body")); - String headers = decompress(rs.getBytes("headers")); + byte[] body = decompressBytes(rs.getBytes("body")); + String headers = decompressStr(rs.getBytes("headers")); dataStack.add(new CrawledDocument( "LIVE", diff --git a/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerMain.java b/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerMain.java index bb193e51..b97d84c8 100644 --- a/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerMain.java +++ b/code/processes/live-crawling-process/java/nu/marginalia/livecrawler/LiveCrawlerMain.java @@ -200,7 +200,7 @@ public class LiveCrawlerMain extends ProcessMainClass { writer.setOrdinalOffset(67_000_000); for (SerializableCrawlDataStream stream : hb.wrap("Processing", dataSet.getDataStreams())) { - writer.write(domainProcessor.sideloadProcessing(stream, 0, Set.of("special:live"))); + writer.write(domainProcessor.simpleProcessing(stream, 0, Set.of("special:live"))); } } diff --git a/code/processes/live-crawling-process/test/nu/marginalia/livecrawler/LiveCrawlDataSetTest.java b/code/processes/live-crawling-process/test/nu/marginalia/livecrawler/LiveCrawlDataSetTest.java index ee1124e9..f45675e1 100644 --- a/code/processes/live-crawling-process/test/nu/marginalia/livecrawler/LiveCrawlDataSetTest.java +++ b/code/processes/live-crawling-process/test/nu/marginalia/livecrawler/LiveCrawlDataSetTest.java @@ -51,7 +51,7 @@ public class LiveCrawlDataSetTest { case CrawledDocument document -> { dataCount++; Assertions.assertEquals("https://www.example.com/", document.url); - Assertions.assertEquals("test", document.documentBody); + Assertions.assertEquals("test", document.documentBody()); } } } diff --git a/code/processes/live-crawling-process/test/nu/marginalia/livecrawler/SimpleLinkScraperTest.java b/code/processes/live-crawling-process/test/nu/marginalia/livecrawler/SimpleLinkScraperTest.java index 74aa51f2..d7af0eb1 100644 --- a/code/processes/live-crawling-process/test/nu/marginalia/livecrawler/SimpleLinkScraperTest.java +++ b/code/processes/live-crawling-process/test/nu/marginalia/livecrawler/SimpleLinkScraperTest.java @@ -49,7 +49,7 @@ class SimpleLinkScraperTest { List<CrawledDocument> documents = firstStream.docsAsList(); Assertions.assertEquals(1, documents.size()); - Assertions.assertTrue(documents.getFirst().documentBody.startsWith("<!DOCTYPE html")); + Assertions.assertTrue(documents.getFirst().documentBody().startsWith("<!DOCTYPE html")); diff --git a/code/tools/experiment-runner/java/nu/marginalia/tools/ExperimentRunnerMain.java b/code/tools/experiment-runner/java/nu/marginalia/tools/ExperimentRunnerMain.java --- a/code/tools/experiment-runner/java/nu/marginalia/tools/ExperimentRunnerMain.java +++ b/code/tools/experiment-runner/java/nu/marginalia/tools/ExperimentRunnerMain.java - Map<String, Class<? extends Experiment>> experiments = Map.of( - "test", TestExperiment.class, - "adblock", AdblockExperiment.class, - "topic", TopicExperiment.class, - "sentence-statistics", SentenceStatisticsExperiment.class, - "site-statistics", SiteStatisticsExperiment.class, - "export-atags", ExportExternalLinksExperiment.class, - "debug-converter", DebugConverterExperiment.class ); public static void main(String...
args) throws IOException { diff --git a/code/tools/experiment-runner/java/nu/marginalia/tools/experiments/AdblockExperiment.java b/code/tools/experiment-runner/java/nu/marginalia/tools/experiments/AdblockExperiment.java deleted file mode 100644 index dc46f3bd..00000000 --- a/code/tools/experiment-runner/java/nu/marginalia/tools/experiments/AdblockExperiment.java +++ /dev/null @@ -1,45 +0,0 @@ -package nu.marginalia.tools.experiments; - -import com.google.inject.Inject; -import nu.marginalia.converting.processor.DocumentProcessor; -import nu.marginalia.converting.processor.classifier.adblock.AdblockSimulator; -import nu.marginalia.model.crawldata.CrawledDocument; -import nu.marginalia.model.crawldata.CrawledDomain; -import nu.marginalia.tools.LegacyExperiment; -import org.jsoup.Jsoup; -import org.jsoup.nodes.Document; - -public class AdblockExperiment extends LegacyExperiment { - - private final AdblockSimulator simulator; - - @Inject - public AdblockExperiment(AdblockSimulator simulator) { - this.simulator = simulator; - } - - @Override - public boolean process(CrawledDomain domain) { - if (domain.doc == null) return true; - - for (var doc : domain.doc) { - if (DocumentProcessor.isAcceptedContentType(doc) && "OK".equals(doc.crawlerStatus)) { - processDocument(doc); - } - } - - return true; - } - - private void processDocument(CrawledDocument doc) { - Document parsedDocument = Jsoup.parse(doc.documentBody); - - if (simulator.hasAds(parsedDocument)) { - System.out.println(doc.url); - } - } - - @Override - public void onFinish() { - } -} diff --git a/code/tools/experiment-runner/java/nu/marginalia/tools/experiments/DebugConverterExperiment.java b/code/tools/experiment-runner/java/nu/marginalia/tools/experiments/DebugConverterExperiment.java deleted file mode 100644 index 4a34a31c..00000000 --- a/code/tools/experiment-runner/java/nu/marginalia/tools/experiments/DebugConverterExperiment.java +++ /dev/null @@ -1,46 +0,0 @@ -package nu.marginalia.tools.experiments; - -import com.google.inject.Inject; -import nu.marginalia.converting.processor.DomainProcessor; -import nu.marginalia.converting.processor.plugin.specialization.BlogSpecialization; -import nu.marginalia.model.crawldata.CrawledDomain; -import nu.marginalia.tools.LegacyExperiment; -import org.jsoup.Jsoup; - -public class DebugConverterExperiment extends LegacyExperiment { - - - private final DomainProcessor domainProcessor; - - @Inject - public DebugConverterExperiment(DomainProcessor domainProcessor) { - this.domainProcessor = domainProcessor; - - } - - @Override - public boolean process(CrawledDomain domain) { - - if (domain.doc == null) return true; - - for (var doc : domain.doc) { - if (doc.documentBody == null) continue; - - var parsed = Jsoup.parse(doc.documentBody); - - var tagExtractor = new BlogSpecialization.BlogTagExtractor(); - parsed.traverse(tagExtractor); - var tags = tagExtractor.getTags(); - if (!tags.isEmpty()) { - System.out.println(tags); - } - - } - - return true; - } - - @Override - public void onFinish() { - } -} diff --git a/code/tools/experiment-runner/java/nu/marginalia/tools/experiments/ExportExternalLinksExperiment.java b/code/tools/experiment-runner/java/nu/marginalia/tools/experiments/ExportExternalLinksExperiment.java deleted file mode 100644 index 8a8b092e..00000000 --- a/code/tools/experiment-runner/java/nu/marginalia/tools/experiments/ExportExternalLinksExperiment.java +++ /dev/null @@ -1,71 +0,0 @@ -package nu.marginalia.tools.experiments; - -import com.google.inject.Inject; -import 
gnu.trove.set.hash.TLongHashSet; -import nu.marginalia.hash.MurmurHash3_128; -import nu.marginalia.io.SerializableCrawlDataStream; -import nu.marginalia.link_parser.LinkParser; -import nu.marginalia.model.EdgeUrl; -import nu.marginalia.model.crawldata.CrawledDocument; -import nu.marginalia.tools.Experiment; -import org.jsoup.Jsoup; - -import java.util.Objects; - -public class ExportExternalLinksExperiment extends Experiment { - - @Inject - public ExportExternalLinksExperiment() { - - } - private static final LinkParser linkParser = new LinkParser(); - MurmurHash3_128 hash = new MurmurHash3_128(); - - @Override - public boolean process(SerializableCrawlDataStream stream) { - TLongHashSet hashes = new TLongHashSet(); - - try { - while (stream.hasNext()) { - if (!(stream.next() instanceof CrawledDocument doc)) - continue; - if (null == doc.documentBody) - continue; - - var baseUrl = new EdgeUrl(doc.url); - var parsed = Jsoup.parse(doc.documentBody); - - for (var atag : parsed.getElementsByTag("a")) { - String linkText = atag.text(); - if (linkText.isBlank()) - continue; - - var linkOpt = linkParser.parseLinkPermissive(baseUrl, atag); - linkOpt - .filter(url -> !Objects.equals(url.domain, baseUrl.domain)) - .filter(url -> hashes.add(hash.hashNearlyASCII(linkText) ^ hash.hashNearlyASCII(url.toString()))) - .ifPresent(url -> - System.out.printf("\"%s\",\"%s\",\"%s\"\n", - csvify(url), - csvify(baseUrl.domain), - csvify(linkText))); - } - } - - } - catch (Exception ex) { - ex.printStackTrace(); - return false; - } - - return true; - } - - private static String csvify(Object field) { - return field.toString().replace("\"", "\"\""); - } - - @Override - public void onFinish() { - } -} diff --git a/code/tools/experiment-runner/java/nu/marginalia/tools/experiments/SentenceStatisticsExperiment.java b/code/tools/experiment-runner/java/nu/marginalia/tools/experiments/SentenceStatisticsExperiment.java deleted file mode 100644 index ba247479..00000000 --- a/code/tools/experiment-runner/java/nu/marginalia/tools/experiments/SentenceStatisticsExperiment.java +++ /dev/null @@ -1,70 +0,0 @@ -package nu.marginalia.tools.experiments; - -import com.google.inject.Inject; -import nu.marginalia.WmsaHome; -import nu.marginalia.converting.processor.logic.dom.DomPruningFilter; -import nu.marginalia.keyword.DocumentKeywordExtractor; -import nu.marginalia.keyword.LinkTexts; -import nu.marginalia.language.sentence.SentenceExtractor; -import nu.marginalia.model.EdgeUrl; -import nu.marginalia.model.crawldata.CrawledDomain; -import nu.marginalia.term_frequency_dict.TermFrequencyDict; -import nu.marginalia.tools.LegacyExperiment; -import org.jsoup.Jsoup; - -import java.io.BufferedOutputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintWriter; -import java.nio.ByteBuffer; -import java.nio.file.Files; -import java.nio.file.Path; - -public class SentenceStatisticsExperiment extends LegacyExperiment { - - SentenceExtractor se = new SentenceExtractor(WmsaHome.getLanguageModels()); - DocumentKeywordExtractor documentKeywordExtractor = new DocumentKeywordExtractor(new TermFrequencyDict(WmsaHome.getLanguageModels())); - Path filename; - PrintWriter writer; - - @Inject - public SentenceStatisticsExperiment() throws IOException { - filename = Files.createTempFile(getClass().getSimpleName(), ".csv"); - System.out.println("Writing to " + filename); - - writer = new PrintWriter(new BufferedOutputStream(new FileOutputStream(filename.toFile()))); - } - - private void logLine(String message) { - 
System.out.printf("\u001b[2K\r%s", message); - } - - @Override - public boolean process(CrawledDomain domain) { - if (domain.doc == null) return true; - - logLine("Processing: " + domain.domain); - - ByteBuffer workArea = ByteBuffer.allocate(8192); - for (var doc : domain.doc) { - if (doc.documentBody == null) continue; - - var parsed = Jsoup.parse(doc.documentBody); - - parsed.body().filter(new DomPruningFilter(0.5)); - - var dld = se.extractSentences(parsed); - var keywords = documentKeywordExtractor.extractKeywords(dld, new LinkTexts(), EdgeUrl.parse(doc.url).orElseThrow()); - - keywords.build(workArea); - } - - return true; - } - - @Override - public void onFinish() { - logLine("Done!\n"); - writer.close(); - } -} diff --git a/code/tools/experiment-runner/java/nu/marginalia/tools/experiments/TestExperiment.java b/code/tools/experiment-runner/java/nu/marginalia/tools/experiments/TestExperiment.java deleted file mode 100644 index 436b227d..00000000 --- a/code/tools/experiment-runner/java/nu/marginalia/tools/experiments/TestExperiment.java +++ /dev/null @@ -1,16 +0,0 @@ -package nu.marginalia.tools.experiments; - -import nu.marginalia.model.crawldata.CrawledDomain; -import nu.marginalia.tools.LegacyExperiment; - -public class TestExperiment extends LegacyExperiment { - @Override - public boolean process(CrawledDomain domain) { - return true; - } - - @Override - public void onFinish() { - System.out.println("Tada!"); - } -} diff --git a/code/tools/experiment-runner/java/nu/marginalia/tools/experiments/TopicExperiment.java b/code/tools/experiment-runner/java/nu/marginalia/tools/experiments/TopicExperiment.java deleted file mode 100644 index b617a3dc..00000000 --- a/code/tools/experiment-runner/java/nu/marginalia/tools/experiments/TopicExperiment.java +++ /dev/null @@ -1,64 +0,0 @@ -package nu.marginalia.tools.experiments; - -import com.google.inject.Inject; -import nu.marginalia.WmsaHome; -import nu.marginalia.converting.processor.classifier.topic.AdHocDetector; -import nu.marginalia.converting.processor.logic.dom.DomPruningFilter; -import nu.marginalia.language.sentence.SentenceExtractor; -import nu.marginalia.model.crawldata.CrawledDomain; -import nu.marginalia.tools.LegacyExperiment; -import org.jsoup.Jsoup; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; - -public class TopicExperiment extends LegacyExperiment { - - AdHocDetector detector; - - SentenceExtractor se = new SentenceExtractor(WmsaHome.getLanguageModels()); - Path filename = null; - - public void args(String... 
args) { - filename = Path.of(args[0]); - try { - detector = new AdHocDetector(Files.readAllLines(filename)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Inject - public TopicExperiment() { - } - - @Override - public boolean process(CrawledDomain domain) { - if (domain.doc == null) return true; - - - for (var doc : domain.doc) { - if (doc.documentBody == null) continue; - - var parsed = Jsoup.parse(doc.documentBody); - - parsed.body().filter(new DomPruningFilter(0.5)); - var dld = se.extractSentences(parsed); - - if (dld.totalNumWords() < 50) - continue; - - if (detector.testP(dld) > 0.5) { - System.out.println("match\t" + doc.url); - } - - } - - return true; - } - - @Override - public void onFinish() { - } -} diff --git a/code/tools/integration-test/test/nu/marginalia/IntegrationTest.java b/code/tools/integration-test/test/nu/marginalia/IntegrationTest.java index 91a14137..138a243a 100644 --- a/code/tools/integration-test/test/nu/marginalia/IntegrationTest.java +++ b/code/tools/integration-test/test/nu/marginalia/IntegrationTest.java @@ -136,7 +136,7 @@ public class IntegrationTest {

- """, + """.getBytes(), "", ContentTags.empty() ); diff --git a/settings.gradle b/settings.gradle index 0c2b22b2..75504cd7 100644 --- a/settings.gradle +++ b/settings.gradle @@ -234,14 +234,13 @@ dependencyResolutionManagement { library('jetty-util','org.eclipse.jetty','jetty-util').version('9.4.54.v20240208') library('jetty-servlet','org.eclipse.jetty','jetty-servlet').version('9.4.54.v20240208') + library('slop', 'nu.marginalia', 'slop').version('0.0.9-org-5-SNAPSHOT') library('jooby-netty','io.jooby','jooby-netty').version(joobyVersion) library('jooby-jte','io.jooby','jooby-jte').version(joobyVersion) library('jooby-apt','io.jooby','jooby-apt').version(joobyVersion) library('jte','gg.jte','jte').version('3.1.15') - library('slop', 'nu.marginalia', 'slop').version('0.0.8-SNAPSHOT') - bundle('jetty', ['jetty-server', 'jetty-util', 'jetty-servlet']) bundle('slf4j', ['slf4j.api', 'log4j.api', 'log4j.core', 'log4j.slf4j']) @@ -261,7 +260,6 @@ dependencyResolutionManagement { bundle('jooby', ['jooby-netty', 'jooby-jte']) bundle('curator', ['curator-framework', 'curator-x-discovery']) - } } }