mirror of
https://github.com/MarginaliaSearch/MarginaliaSearch.git
synced 2025-02-22 12:48:58 +00:00
Merge pull request #134 from MarginaliaSearch/slop-crawl-data-spike
Store crawl data in slop instead of parquet
This commit is contained in:
commit
270cab874b
@ -20,6 +20,7 @@ public enum ExecutorActor {
|
||||
EXPORT_FEEDS(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
|
||||
EXPORT_SAMPLE_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
|
||||
DOWNLOAD_SAMPLE(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
|
||||
MIGRATE_CRAWL_DATA(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED),
|
||||
|
||||
PROC_CONVERTER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.SIDELOAD),
|
||||
PROC_LOADER_SPAWNER(NodeProfile.BATCH_CRAWL, NodeProfile.MIXED, NodeProfile.SIDELOAD),
|
||||
|
@ -66,6 +66,7 @@ public class ExecutorActorControlService {
|
||||
DownloadSampleActor downloadSampleActor,
|
||||
ScrapeFeedsActor scrapeFeedsActor,
|
||||
ExecutorActorStateMachines stateMachines,
|
||||
MigrateCrawlDataActor migrateCrawlDataActor,
|
||||
ExportAllPrecessionActor exportAllPrecessionActor,
|
||||
UpdateRssActor updateRssActor) throws SQLException {
|
||||
this.messageQueueFactory = messageQueueFactory;
|
||||
@ -107,6 +108,8 @@ public class ExecutorActorControlService {
|
||||
register(ExecutorActor.SCRAPE_FEEDS, scrapeFeedsActor);
|
||||
register(ExecutorActor.UPDATE_RSS, updateRssActor);
|
||||
|
||||
register(ExecutorActor.MIGRATE_CRAWL_DATA, migrateCrawlDataActor);
|
||||
|
||||
if (serviceConfiguration.node() == 1) {
|
||||
register(ExecutorActor.PREC_EXPORT_ALL, exportAllPrecessionActor);
|
||||
}
|
||||
|
@ -0,0 +1,130 @@
|
||||
package nu.marginalia.actor.task;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import nu.marginalia.actor.prototype.RecordActorPrototype;
|
||||
import nu.marginalia.actor.state.ActorStep;
|
||||
import nu.marginalia.io.CrawlerOutputFile;
|
||||
import nu.marginalia.process.log.WorkLog;
|
||||
import nu.marginalia.process.log.WorkLogEntry;
|
||||
import nu.marginalia.slop.SlopCrawlDataRecord;
|
||||
import nu.marginalia.storage.FileStorageService;
|
||||
import nu.marginalia.storage.model.FileStorage;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
import org.apache.logging.log4j.util.Strings;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
|
||||
@Singleton
|
||||
public class MigrateCrawlDataActor extends RecordActorPrototype {
|
||||
|
||||
private final FileStorageService fileStorageService;
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MigrateCrawlDataActor.class);
|
||||
|
||||
@Inject
|
||||
public MigrateCrawlDataActor(Gson gson, FileStorageService fileStorageService) {
|
||||
super(gson);
|
||||
|
||||
this.fileStorageService = fileStorageService;
|
||||
}
|
||||
|
||||
public record Run(long fileStorageId) implements ActorStep {}
|
||||
|
||||
@Override
|
||||
public ActorStep transition(ActorStep self) throws Exception {
|
||||
return switch (self) {
|
||||
case Run(long fileStorageId) -> {
|
||||
|
||||
FileStorage storage = fileStorageService.getStorage(FileStorageId.of(fileStorageId));
|
||||
Path root = storage.asPath();
|
||||
|
||||
Path crawlerLog = root.resolve("crawler.log");
|
||||
Path newCrawlerLog = Files.createTempFile(root, "crawler", ".migrate.log");
|
||||
|
||||
try (WorkLog workLog = new WorkLog(newCrawlerLog)) {
|
||||
for (Map.Entry<WorkLogEntry, Path> item : WorkLog.iterableMap(crawlerLog, new CrawlDataLocator(root))) {
|
||||
|
||||
var entry = item.getKey();
|
||||
var path = item.getValue();
|
||||
|
||||
logger.info("Converting {}", entry.id());
|
||||
|
||||
|
||||
if (path.toFile().getName().endsWith(".parquet")) {
|
||||
String domain = entry.id();
|
||||
String id = Integer.toHexString(domain.hashCode());
|
||||
|
||||
Path outputFile = CrawlerOutputFile.createSlopPath(root, id, domain);
|
||||
|
||||
SlopCrawlDataRecord.convertFromParquet(path, outputFile);
|
||||
|
||||
workLog.setJobToFinished(entry.id(), outputFile.toString(), entry.cnt());
|
||||
}
|
||||
else {
|
||||
workLog.setJobToFinished(entry.id(), path.toString(), entry.cnt());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Path oldCrawlerLog = Files.createTempFile(root, "crawler-", ".migrate.old.log");
|
||||
Files.move(crawlerLog, oldCrawlerLog);
|
||||
Files.move(newCrawlerLog, crawlerLog);
|
||||
|
||||
yield new End();
|
||||
}
|
||||
default -> new Error();
|
||||
};
|
||||
}
|
||||
|
||||
private static class CrawlDataLocator implements Function<WorkLogEntry, Optional<Map.Entry<WorkLogEntry, Path>>> {
|
||||
|
||||
private final Path crawlRootDir;
|
||||
|
||||
CrawlDataLocator(Path crawlRootDir) {
|
||||
this.crawlRootDir = crawlRootDir;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Map.Entry<WorkLogEntry, Path>> apply(WorkLogEntry entry) {
|
||||
var path = getCrawledFilePath(crawlRootDir, entry.path());
|
||||
|
||||
if (!Files.exists(path)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
try {
|
||||
return Optional.of(Map.entry(entry, path));
|
||||
}
|
||||
catch (Exception ex) {
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
private Path getCrawledFilePath(Path crawlDir, String fileName) {
|
||||
int sp = fileName.lastIndexOf('/');
|
||||
|
||||
// Normalize the filename
|
||||
if (sp >= 0 && sp + 1< fileName.length())
|
||||
fileName = fileName.substring(sp + 1);
|
||||
if (fileName.length() < 4)
|
||||
fileName = Strings.repeat("0", 4 - fileName.length()) + fileName;
|
||||
|
||||
String sp1 = fileName.substring(0, 2);
|
||||
String sp2 = fileName.substring(2, 4);
|
||||
return crawlDir.resolve(sp1).resolve(sp2).resolve(fileName);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String describe() {
|
||||
return "Migrates crawl data to the latest format";
|
||||
}
|
||||
}
|
@ -45,6 +45,11 @@ public class GammaCodedSequenceArrayColumn extends AbstractObjectColumn<List<Gam
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int alignmentSize() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
public Reader openUnregistered(URI uri, int page) throws IOException {
|
||||
return new Reader(
|
||||
dataColumn.openUnregistered(uri, page),
|
||||
@ -109,6 +114,11 @@ public class GammaCodedSequenceArrayColumn extends AbstractObjectColumn<List<Gam
|
||||
dataReader.skip(toSkip);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDirect() {
|
||||
return dataReader.isDirect();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasRemaining() throws IOException {
|
||||
return groupsReader.hasRemaining();
|
||||
|
@ -44,6 +44,11 @@ public class GammaCodedSequenceColumn extends AbstractObjectColumn<GammaCodedSeq
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int alignmentSize() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
public Reader openUnregistered(URI uri, int page) throws IOException {
|
||||
return new Reader(
|
||||
Storage.reader(uri, this, page, false),
|
||||
@ -96,6 +101,11 @@ public class GammaCodedSequenceColumn extends AbstractObjectColumn<GammaCodedSeq
|
||||
this.indexReader = indexReader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDirect() {
|
||||
return storage.isDirect();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractColumn<?, ?> columnDesc() {
|
||||
return GammaCodedSequenceColumn.this;
|
||||
|
@ -45,6 +45,11 @@ public class VarintCodedSequenceArrayColumn extends AbstractObjectColumn<List<Va
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int alignmentSize() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
public Reader openUnregistered(URI uri, int page) throws IOException {
|
||||
return new Reader(
|
||||
dataColumn.openUnregistered(uri, page),
|
||||
@ -109,6 +114,11 @@ public class VarintCodedSequenceArrayColumn extends AbstractObjectColumn<List<Va
|
||||
dataReader.skip(toSkip);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDirect() {
|
||||
return dataReader.isDirect();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasRemaining() throws IOException {
|
||||
return groupsReader.hasRemaining();
|
||||
|
@ -44,6 +44,11 @@ public class VarintCodedSequenceColumn extends AbstractObjectColumn<VarintCodedS
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int alignmentSize() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
public Reader openUnregistered(URI uri, int page) throws IOException {
|
||||
return new Reader(
|
||||
Storage.reader(uri, this, page, false),
|
||||
@ -101,6 +106,11 @@ public class VarintCodedSequenceColumn extends AbstractObjectColumn<VarintCodedS
|
||||
return VarintCodedSequenceColumn.this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDirect() {
|
||||
return storage.isDirect();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip(long positions) throws IOException {
|
||||
for (int i = 0; i < positions; i++) {
|
||||
|
@ -12,8 +12,6 @@ import nu.marginalia.converting.sideload.SideloadSourceFactory;
|
||||
import nu.marginalia.converting.writer.ConverterBatchWritableIf;
|
||||
import nu.marginalia.converting.writer.ConverterBatchWriter;
|
||||
import nu.marginalia.converting.writer.ConverterWriter;
|
||||
import nu.marginalia.io.CrawledDomainReader;
|
||||
import nu.marginalia.io.SerializableCrawlDataStream;
|
||||
import nu.marginalia.mq.MessageQueueFactory;
|
||||
import nu.marginalia.mqapi.converting.ConvertRequest;
|
||||
import nu.marginalia.process.ProcessConfiguration;
|
||||
@ -228,7 +226,7 @@ public class ConverterMain extends ProcessMainClass {
|
||||
}
|
||||
}
|
||||
|
||||
private static class CrawlDataLocator implements Function<WorkLogEntry, Optional<SerializableCrawlDataStream>> {
|
||||
private static class CrawlDataLocator implements Function<WorkLogEntry, Optional<Path>> {
|
||||
|
||||
private final Path crawlRootDir;
|
||||
private final BatchingWorkLog batchingWorkLog;
|
||||
@ -239,7 +237,7 @@ public class ConverterMain extends ProcessMainClass {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<SerializableCrawlDataStream> apply(WorkLogEntry entry) {
|
||||
public Optional<Path> apply(WorkLogEntry entry) {
|
||||
if (batchingWorkLog.isItemProcessed(entry.id())) {
|
||||
return Optional.empty();
|
||||
}
|
||||
@ -252,7 +250,7 @@ public class ConverterMain extends ProcessMainClass {
|
||||
}
|
||||
|
||||
try {
|
||||
return Optional.of(CrawledDomainReader.createDataStream(path));
|
||||
return Optional.of(path);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
return Optional.empty();
|
||||
|
@ -19,6 +19,7 @@ import nu.marginalia.model.idx.WordFlags;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@ -91,7 +92,7 @@ public class DocumentProcessor {
|
||||
DocumentClass documentClass,
|
||||
DocumentDecorator documentDecorator,
|
||||
DomainLinks externalDomainLinks,
|
||||
ProcessedDocument ret) throws URISyntaxException, DisqualifiedException
|
||||
ProcessedDocument ret) throws URISyntaxException, IOException, DisqualifiedException
|
||||
{
|
||||
|
||||
var crawlerStatus = CrawlerDocumentStatus.valueOf(crawledDocument.crawlerStatus);
|
||||
@ -109,7 +110,7 @@ public class DocumentProcessor {
|
||||
|
||||
ret.state = crawlerStatusToUrlState(crawledDocument.crawlerStatus, crawledDocument.httpStatus);
|
||||
|
||||
final var plugin = findPlugin(crawledDocument);
|
||||
AbstractDocumentProcessorPlugin plugin = findPlugin(crawledDocument);
|
||||
|
||||
EdgeUrl url = new EdgeUrl(crawledDocument.url);
|
||||
LinkTexts linkTexts = anchorTextKeywords.getAnchorTextKeywords(externalDomainLinks, url);
|
||||
|
@ -14,6 +14,7 @@ import nu.marginalia.converting.writer.ConverterBatchWritableIf;
|
||||
import nu.marginalia.converting.writer.ConverterBatchWriter;
|
||||
import nu.marginalia.geoip.GeoIpDictionary;
|
||||
import nu.marginalia.geoip.sources.AsnTable;
|
||||
import nu.marginalia.io.CrawledDomainReader;
|
||||
import nu.marginalia.io.SerializableCrawlDataStream;
|
||||
import nu.marginalia.model.EdgeDomain;
|
||||
import nu.marginalia.model.crawl.DomainIndexingState;
|
||||
@ -27,6 +28,7 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.sql.SQLException;
|
||||
import java.util.*;
|
||||
import java.util.regex.Pattern;
|
||||
@ -54,21 +56,24 @@ public class DomainProcessor {
|
||||
geoIpDictionary.waitReady();
|
||||
}
|
||||
|
||||
public ConverterBatchWritableIf createWritable(SerializableCrawlDataStream domain) {
|
||||
final int sizeHint = domain.sizeHint();
|
||||
public ConverterBatchWritableIf createWritable(Path path) throws IOException {
|
||||
|
||||
var dataStream = CrawledDomainReader.createDataStream(path);
|
||||
|
||||
final int sizeHint = dataStream.sizeHint();
|
||||
|
||||
if (sizeHint > SIDELOAD_THRESHOLD) {
|
||||
// If the file is too big, we run a processing mode that doesn't
|
||||
// require loading the entire dataset into RAM
|
||||
return sideloadProcessing(domain, sizeHint);
|
||||
return simpleProcessing(dataStream, sizeHint);
|
||||
}
|
||||
|
||||
return fullProcessing(domain);
|
||||
return fullProcessing(dataStream);
|
||||
}
|
||||
|
||||
public SideloadProcessing sideloadProcessing(SerializableCrawlDataStream dataStream, int sizeHint, Collection<String> extraKeywords) {
|
||||
public SimpleProcessing simpleProcessing(SerializableCrawlDataStream dataStream, int sizeHint, Collection<String> extraKeywords) {
|
||||
try {
|
||||
return new SideloadProcessing(dataStream, sizeHint, extraKeywords);
|
||||
return new SimpleProcessing(dataStream, sizeHint, extraKeywords);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
logger.warn("Failed to process domain sideload", ex);
|
||||
@ -76,9 +81,9 @@ public class DomainProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
public SideloadProcessing sideloadProcessing(SerializableCrawlDataStream dataStream, int sizeHint) {
|
||||
public SimpleProcessing simpleProcessing(SerializableCrawlDataStream dataStream, int sizeHint) {
|
||||
try {
|
||||
return new SideloadProcessing(dataStream, sizeHint);
|
||||
return new SimpleProcessing(dataStream, sizeHint);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
logger.warn("Failed to process domain sideload", ex);
|
||||
@ -86,93 +91,6 @@ public class DomainProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
public class SideloadProcessing implements ConverterBatchWritableIf, SideloadSource {
|
||||
private final SerializableCrawlDataStream dataStream;
|
||||
private final ProcessedDomain domain;
|
||||
private final DocumentDecorator documentDecorator;
|
||||
private final Set<String> processedUrls = new HashSet<>();
|
||||
private final DomainLinks externalDomainLinks;
|
||||
private final LshDocumentDeduplicator deduplicator = new LshDocumentDeduplicator();
|
||||
private static final ProcessingIterator.Factory iteratorFactory = ProcessingIterator.factory(8,
|
||||
Integer.getInteger("java.util.concurrent.ForkJoinPool.common.parallelism", Runtime.getRuntime().availableProcessors())
|
||||
);
|
||||
|
||||
SideloadProcessing(SerializableCrawlDataStream dataStream, int sizeHint) throws IOException {
|
||||
this(dataStream, sizeHint, List.of());
|
||||
}
|
||||
|
||||
SideloadProcessing(SerializableCrawlDataStream dataStream, int sizeHint, Collection<String> extraKeywords) throws IOException {
|
||||
this.dataStream = dataStream;
|
||||
|
||||
if (!dataStream.hasNext() || !(dataStream.next() instanceof CrawledDomain crawledDomain))
|
||||
{
|
||||
throw new IllegalStateException("First record must be a domain, was " + dataStream.next().getClass().getSimpleName());
|
||||
}
|
||||
|
||||
domain = new ProcessedDomain();
|
||||
domain.sizeloadSizeAdvice = sizeHint == 0 ? 10_000 : sizeHint;
|
||||
|
||||
documentDecorator = new DocumentDecorator();
|
||||
documentDecorator.addTerms(extraKeywords);
|
||||
|
||||
processDomain(crawledDomain, domain, documentDecorator);
|
||||
|
||||
externalDomainLinks = anchorTagsSource.getAnchorTags(domain.domain);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProcessedDomain getDomain() {
|
||||
return domain;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<ProcessedDocument> getDocumentsStream() {
|
||||
return iteratorFactory.create((taskConsumer) -> {
|
||||
while (dataStream.hasNext())
|
||||
{
|
||||
if (!(dataStream.next() instanceof CrawledDocument doc))
|
||||
continue;
|
||||
if (doc.url == null || !processedUrls.add(doc.url))
|
||||
continue;
|
||||
|
||||
|
||||
taskConsumer.accept(() -> {
|
||||
var processedDoc = documentProcessor.process(doc, domain.domain, externalDomainLinks, documentDecorator);
|
||||
|
||||
synchronized (deduplicator) {
|
||||
deduplicator.markIfDuplicate(processedDoc);
|
||||
}
|
||||
|
||||
if (processedDoc.isProcessedFully()) {
|
||||
// This is a bit sketchy, but we need to set the size and topology to something
|
||||
processedDoc.details.metadata = processedDoc.details.metadata.withSizeAndTopology(
|
||||
10_000, externalDomainLinks.countForUrl(processedDoc.url));
|
||||
}
|
||||
|
||||
return processedDoc;
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ConverterBatchWriter writer) throws IOException {
|
||||
writer.writeSideloadSource(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String id() {
|
||||
return domain.domain.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
dataStream.close();
|
||||
deduplicator.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Nullable
|
||||
public ProcessedDomain fullProcessing(SerializableCrawlDataStream dataStream) {
|
||||
try {
|
||||
@ -204,7 +122,7 @@ public class DomainProcessor {
|
||||
continue;
|
||||
if (doc.url == null)
|
||||
continue;
|
||||
if (doc.documentBody.isBlank())
|
||||
if (doc.documentBodyBytes.length == 0)
|
||||
continue;
|
||||
if (!processedUrls.add(doc.url))
|
||||
continue;
|
||||
@ -231,6 +149,90 @@ public class DomainProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
/** The simple processing track processes documents individually, and does not perform any domain-level analysis.
|
||||
* This is needed to process extremely large domains, which would otherwise eat up too much RAM.
|
||||
*/
|
||||
public class SimpleProcessing implements ConverterBatchWritableIf, SideloadSource {
|
||||
private final SerializableCrawlDataStream dataStream;
|
||||
private final ProcessedDomain domain;
|
||||
private final DocumentDecorator documentDecorator;
|
||||
private final Set<String> processedUrls = new HashSet<>();
|
||||
private final DomainLinks externalDomainLinks;
|
||||
private final LshDocumentDeduplicator deduplicator = new LshDocumentDeduplicator();
|
||||
private static final ProcessingIterator.Factory iteratorFactory = ProcessingIterator.factory(8,
|
||||
Integer.getInteger("java.util.concurrent.ForkJoinPool.common.parallelism", Runtime.getRuntime().availableProcessors())
|
||||
);
|
||||
|
||||
SimpleProcessing(SerializableCrawlDataStream dataStream, int sizeHint) throws IOException {
|
||||
this(dataStream, sizeHint, List.of());
|
||||
}
|
||||
|
||||
SimpleProcessing(SerializableCrawlDataStream dataStream, int sizeHint, Collection<String> extraKeywords) throws IOException {
|
||||
this.dataStream = dataStream;
|
||||
|
||||
if (!dataStream.hasNext() || !(dataStream.next() instanceof CrawledDomain crawledDomain))
|
||||
{
|
||||
throw new IllegalStateException("First record must be a domain, was " + dataStream.next().getClass().getSimpleName());
|
||||
}
|
||||
|
||||
domain = new ProcessedDomain();
|
||||
domain.sizeloadSizeAdvice = sizeHint == 0 ? 10_000 : sizeHint;
|
||||
|
||||
documentDecorator = new DocumentDecorator();
|
||||
documentDecorator.addTerms(extraKeywords);
|
||||
|
||||
processDomain(crawledDomain, domain, documentDecorator);
|
||||
|
||||
externalDomainLinks = anchorTagsSource.getAnchorTags(domain.domain);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProcessedDomain getDomain() {
|
||||
return domain;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<ProcessedDocument> getDocumentsStream() {
|
||||
return dataStream.map((next) -> {
|
||||
if (!(next instanceof CrawledDocument doc))
|
||||
return Optional.empty();
|
||||
|
||||
if (doc.url == null || !processedUrls.add(doc.url))
|
||||
return Optional.empty();
|
||||
|
||||
var processedDoc = documentProcessor.process(doc, domain.domain, externalDomainLinks, documentDecorator);
|
||||
|
||||
synchronized (deduplicator) {
|
||||
deduplicator.markIfDuplicate(processedDoc);
|
||||
}
|
||||
|
||||
if (processedDoc.isProcessedFully()) {
|
||||
// This is a bit sketchy, but we need to set the size and topology to something
|
||||
processedDoc.details.metadata = processedDoc.details.metadata.withSizeAndTopology(
|
||||
10_000, externalDomainLinks.countForUrl(processedDoc.url));
|
||||
}
|
||||
|
||||
return Optional.of(processedDoc);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ConverterBatchWriter writer) throws IOException {
|
||||
writer.writeSideloadSource(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String id() {
|
||||
return domain.domain.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
dataStream.close();
|
||||
deduplicator.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void processDomain(CrawledDomain crawledDomain,
|
||||
ProcessedDomain domain,
|
||||
DocumentDecorator decorator)
|
||||
|
@ -24,7 +24,7 @@ public class DocumentValuator {
|
||||
double scriptPenalty = getScriptPenalty(parsedDocument);
|
||||
double chatGptPenalty = getChatGptContentFarmPenalty(parsedDocument);
|
||||
|
||||
int rawLength = crawledDocument.documentBody.length();
|
||||
int rawLength = crawledDocument.documentBodyBytes.length;
|
||||
|
||||
if (textLength == 0) {
|
||||
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LENGTH);
|
||||
|
@ -218,7 +218,10 @@ public class FeatureExtractor {
|
||||
}
|
||||
}
|
||||
|
||||
if (features.contains(HtmlFeature.JS) && adblockSimulator.hasAds(doc.clone())) {
|
||||
if (features.contains(HtmlFeature.JS)
|
||||
// remove while disabled to get rid of expensive clone() call:
|
||||
// adblockSimulator.hasAds(doc.clone())
|
||||
) {
|
||||
features.add(HtmlFeature.ADVERTISEMENT);
|
||||
}
|
||||
|
||||
|
@ -14,6 +14,7 @@ import nu.marginalia.model.crawldata.CrawledDocument;
|
||||
import nu.marginalia.model.html.HtmlStandard;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@ -25,7 +26,7 @@ public abstract class AbstractDocumentProcessorPlugin {
|
||||
this.languageFilter = languageFilter;
|
||||
}
|
||||
|
||||
public abstract DetailsWithWords createDetails(CrawledDocument crawledDocument, LinkTexts linkTexts, DocumentClass documentClass) throws DisqualifiedException, URISyntaxException;
|
||||
public abstract DetailsWithWords createDetails(CrawledDocument crawledDocument, LinkTexts linkTexts, DocumentClass documentClass) throws DisqualifiedException, URISyntaxException, IOException;
|
||||
public abstract boolean isApplicable(CrawledDocument doc);
|
||||
|
||||
protected void checkDocumentLanguage(DocumentLanguageData dld) throws DisqualifiedException {
|
||||
@ -86,6 +87,7 @@ public abstract class AbstractDocumentProcessorPlugin {
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public MetaTagsBuilder addPubDate(PubDate pubDate) {
|
||||
|
||||
if (pubDate.year() > 1900) {
|
||||
|
@ -6,6 +6,7 @@ import nu.marginalia.converting.model.DisqualifiedException;
|
||||
import nu.marginalia.converting.model.DocumentHeaders;
|
||||
import nu.marginalia.converting.model.GeneratorType;
|
||||
import nu.marginalia.converting.model.ProcessedDocumentDetails;
|
||||
import nu.marginalia.converting.processor.AcceptableAds;
|
||||
import nu.marginalia.converting.processor.DocumentClass;
|
||||
import nu.marginalia.converting.processor.MetaRobotsTag;
|
||||
import nu.marginalia.converting.processor.logic.*;
|
||||
@ -32,11 +33,11 @@ import nu.marginalia.model.crawldata.CrawledDocument;
|
||||
import nu.marginalia.model.html.HtmlStandard;
|
||||
import nu.marginalia.model.idx.DocumentFlags;
|
||||
import nu.marginalia.model.idx.DocumentMetadata;
|
||||
import org.jsoup.Jsoup;
|
||||
import org.jsoup.nodes.Document;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
@ -51,7 +52,6 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
|
||||
private final double minDocumentQuality;
|
||||
|
||||
private final FeatureExtractor featureExtractor;
|
||||
private final TitleExtractor titleExtractor;
|
||||
private final DocumentKeywordExtractor keywordExtractor;
|
||||
private final PubDateSniffer pubDateSniffer;
|
||||
|
||||
@ -74,7 +74,6 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
|
||||
@Named("min-document-quality") Double minDocumentQuality,
|
||||
LanguageFilter languageFilter,
|
||||
FeatureExtractor featureExtractor,
|
||||
TitleExtractor titleExtractor,
|
||||
DocumentKeywordExtractor keywordExtractor,
|
||||
PubDateSniffer pubDateSniffer,
|
||||
DocumentLengthLogic documentLengthLogic,
|
||||
@ -89,7 +88,6 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
|
||||
this.minDocumentQuality = minDocumentQuality;
|
||||
this.featureExtractor = featureExtractor;
|
||||
|
||||
this.titleExtractor = titleExtractor;
|
||||
this.keywordExtractor = keywordExtractor;
|
||||
this.pubDateSniffer = pubDateSniffer;
|
||||
this.metaRobotsTag = metaRobotsTag;
|
||||
@ -108,19 +106,17 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
|
||||
public DetailsWithWords createDetails(CrawledDocument crawledDocument,
|
||||
LinkTexts linkTexts,
|
||||
DocumentClass documentClass)
|
||||
throws DisqualifiedException, URISyntaxException {
|
||||
throws DisqualifiedException, URISyntaxException, IOException {
|
||||
|
||||
String documentBody = crawledDocument.documentBody;
|
||||
|
||||
if (languageFilter.isBlockedUnicodeRange(documentBody)) {
|
||||
if (languageFilter.isBlockedUnicodeRange(crawledDocument.documentBody(512))) {
|
||||
throw new DisqualifiedException(DisqualificationReason.LANGUAGE);
|
||||
}
|
||||
|
||||
if (documentBody.length() > MAX_DOCUMENT_LENGTH_BYTES) { // 128kb
|
||||
documentBody = documentBody.substring(0, MAX_DOCUMENT_LENGTH_BYTES);
|
||||
}
|
||||
Document doc = crawledDocument.parseBody();
|
||||
|
||||
Document doc = Jsoup.parse(documentBody);
|
||||
if (AcceptableAds.hasAcceptableAdsTag(doc)) {
|
||||
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.ACCEPTABLE_ADS);
|
||||
}
|
||||
|
||||
if (!metaRobotsTag.allowIndexingByMetaTag(doc)) {
|
||||
throw new DisqualifiedException(DisqualificationReason.FORBIDDEN);
|
||||
@ -138,32 +134,33 @@ public class HtmlDocumentProcessorPlugin extends AbstractDocumentProcessorPlugin
|
||||
}
|
||||
|
||||
var prunedDoc = specialization.prune(doc);
|
||||
DocumentLanguageData dld = sentenceExtractorProvider.get().extractSentences(prunedDoc);
|
||||
|
||||
checkDocumentLanguage(dld);
|
||||
|
||||
var ret = new ProcessedDocumentDetails();
|
||||
|
||||
final int length = getLength(doc);
|
||||
final HtmlStandard standard = getHtmlStandard(doc);
|
||||
final double quality = documentValuator.getQuality(crawledDocument, standard, doc, length);
|
||||
|
||||
if (isDisqualified(documentClass, url, quality, doc.title())) {
|
||||
throw new DisqualifiedException(DisqualificationReason.QUALITY);
|
||||
}
|
||||
|
||||
DocumentLanguageData dld = sentenceExtractorProvider.get().extractSentences(prunedDoc);
|
||||
|
||||
checkDocumentLanguage(dld);
|
||||
documentLengthLogic.validateLength(dld, specialization.lengthModifier() * documentClass.lengthLimitModifier());
|
||||
|
||||
var ret = new ProcessedDocumentDetails();
|
||||
|
||||
ret.length = length;
|
||||
ret.standard = standard;
|
||||
ret.title = specialization.getTitle(doc, dld, crawledDocument.url);
|
||||
|
||||
documentLengthLogic.validateLength(dld, specialization.lengthModifier() * documentClass.lengthLimitModifier());
|
||||
|
||||
final Set<HtmlFeature> features = featureExtractor.getFeatures(url, doc, documentHeaders, dld);
|
||||
|
||||
ret.features = features;
|
||||
ret.quality = documentValuator.adjustQuality(quality, features);
|
||||
ret.hashCode = dld.localitySensitiveHashCode();
|
||||
|
||||
if (isDisqualified(documentClass, url, quality, ret.title)) {
|
||||
throw new DisqualifiedException(DisqualificationReason.QUALITY);
|
||||
}
|
||||
|
||||
PubDate pubDate = pubDateSniffer.getPubDate(documentHeaders, url, doc, standard, true);
|
||||
|
||||
EnumSet<DocumentFlags> documentFlags = documentFlags(features, generatorParts.type());
|
||||
|
@ -71,7 +71,7 @@ public class PlainTextDocumentProcessorPlugin extends AbstractDocumentProcessorP
|
||||
DocumentClass documentClass)
|
||||
throws DisqualifiedException, URISyntaxException {
|
||||
|
||||
String documentBody = crawledDocument.documentBody;
|
||||
String documentBody = crawledDocument.documentBody();
|
||||
|
||||
if (languageFilter.isBlockedUnicodeRange(documentBody)) {
|
||||
throw new DisqualifiedException(DisqualifiedException.DisqualificationReason.LANGUAGE);
|
||||
|
@ -19,6 +19,7 @@ import nu.marginalia.model.idx.DocumentMetadata;
|
||||
import nu.marginalia.model.idx.WordFlags;
|
||||
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
@ -50,7 +51,7 @@ public class SideloaderProcessing {
|
||||
"OK",
|
||||
"NP",
|
||||
"",
|
||||
body,
|
||||
body.getBytes(StandardCharsets.UTF_8),
|
||||
false,
|
||||
null,
|
||||
null
|
||||
|
@ -98,7 +98,7 @@ public class ConvertingIntegrationTest {
|
||||
|
||||
@Test
|
||||
public void testMemexMarginaliaNuSideloadProcessing() throws IOException {
|
||||
var ret = domainProcessor.sideloadProcessing(asSerializableCrawlData(readMarginaliaWorkingSet()), 100);
|
||||
var ret = domainProcessor.simpleProcessing(asSerializableCrawlData(readMarginaliaWorkingSet()), 100);
|
||||
assertNotNull(ret);
|
||||
assertEquals("memex.marginalia.nu", ret.id());
|
||||
|
||||
@ -146,7 +146,7 @@ public class ConvertingIntegrationTest {
|
||||
"OK",
|
||||
"",
|
||||
"",
|
||||
readClassPathFile(p.toString()),
|
||||
readClassPathFile(p.toString()).getBytes(),
|
||||
false,
|
||||
null,
|
||||
null
|
||||
|
@ -2,11 +2,16 @@ package nu.marginalia.contenttype;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.IllegalCharsetNameException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/** Content type and charset of a document
|
||||
* @param contentType The content type, e.g. "text/html"
|
||||
* @param charset The charset, e.g. "UTF-8"
|
||||
*/
|
||||
public record ContentType(String contentType, String charset) {
|
||||
|
||||
public static ContentType parse(String contentTypeHeader) {
|
||||
if (contentTypeHeader == null || contentTypeHeader.isBlank())
|
||||
return new ContentType(null, null);
|
||||
@ -15,9 +20,31 @@ public record ContentType(String contentType, String charset) {
|
||||
String contentType = parts[0].trim();
|
||||
String charset = parts.length > 1 ? parts[1].trim() : "UTF-8";
|
||||
|
||||
if (charset.toLowerCase().startsWith("charset=")) {
|
||||
charset = charset.substring("charset=".length());
|
||||
}
|
||||
|
||||
return new ContentType(contentType, charset);
|
||||
}
|
||||
|
||||
/** Best effort method for turning the provided charset string into a Java charset method,
|
||||
* with some guesswork-heuristics for when it doesn't work
|
||||
*/
|
||||
public Charset asCharset() {
|
||||
try {
|
||||
if (Charset.isSupported(charset)) {
|
||||
return Charset.forName(charset);
|
||||
} else if (charset.equalsIgnoreCase("macintosh-latin")) {
|
||||
return StandardCharsets.ISO_8859_1;
|
||||
} else {
|
||||
return StandardCharsets.UTF_8;
|
||||
}
|
||||
}
|
||||
catch (IllegalCharsetNameException ex) { // thrown by Charset.isSupported()
|
||||
return StandardCharsets.UTF_8;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean is(String contentType) {
|
||||
return this.contentType.equalsIgnoreCase(contentType);
|
||||
}
|
||||
|
@ -1,9 +1,12 @@
|
||||
package nu.marginalia.contenttype;
|
||||
|
||||
import org.jsoup.Jsoup;
|
||||
import org.jsoup.nodes.Document;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.IllegalCharsetNameException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.charset.UnsupportedCharsetException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@ -23,24 +26,25 @@ public class DocumentBodyToString {
|
||||
return new String(data, charset);
|
||||
}
|
||||
|
||||
public static Document getParsedData(ContentType type, byte[] data, String url) throws IOException {
|
||||
final Charset charset;
|
||||
|
||||
if (type.charset() == null || type.charset().isBlank()) {
|
||||
charset = StandardCharsets.UTF_8;
|
||||
} else {
|
||||
charset = charsetMap.computeIfAbsent(type, DocumentBodyToString::computeCharset);
|
||||
}
|
||||
|
||||
ByteArrayInputStream bais = new ByteArrayInputStream(data);
|
||||
|
||||
return Jsoup.parse(bais, charset.name(), url);
|
||||
}
|
||||
|
||||
private static Charset computeCharset(ContentType type) {
|
||||
try {
|
||||
if (type.charset() == null || type.charset().isBlank())
|
||||
return StandardCharsets.UTF_8;
|
||||
else {
|
||||
return Charset.forName(type.charset());
|
||||
}
|
||||
}
|
||||
catch (IllegalCharsetNameException ex) {
|
||||
// Fall back to UTF-8 if we don't understand what this is. It's *probably* fine? Maybe?
|
||||
if (type.charset() == null || type.charset().isBlank())
|
||||
return StandardCharsets.UTF_8;
|
||||
}
|
||||
catch (UnsupportedCharsetException ex) {
|
||||
// This is usually like Macintosh Latin
|
||||
// (https://en.wikipedia.org/wiki/Macintosh_Latin_encoding)
|
||||
//
|
||||
// It's close enough to 8859-1 to serve
|
||||
return StandardCharsets.ISO_8859_1;
|
||||
else {
|
||||
return type.asCharset();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -23,16 +23,18 @@ import nu.marginalia.io.CrawledDomainReader;
|
||||
import nu.marginalia.io.CrawlerOutputFile;
|
||||
import nu.marginalia.model.EdgeDomain;
|
||||
import nu.marginalia.mq.MessageQueueFactory;
|
||||
import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecordFileWriter;
|
||||
import nu.marginalia.process.ProcessConfiguration;
|
||||
import nu.marginalia.process.ProcessConfigurationModule;
|
||||
import nu.marginalia.process.ProcessMainClass;
|
||||
import nu.marginalia.process.control.ProcessHeartbeatImpl;
|
||||
import nu.marginalia.process.log.WorkLog;
|
||||
import nu.marginalia.process.log.WorkLogEntry;
|
||||
import nu.marginalia.service.module.DatabaseModule;
|
||||
import nu.marginalia.slop.SlopCrawlDataRecord;
|
||||
import nu.marginalia.storage.FileStorageService;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
import nu.marginalia.util.SimpleBlockingThreadPool;
|
||||
import org.apache.logging.log4j.util.Strings;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -42,13 +44,11 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.security.Security;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static nu.marginalia.mqapi.ProcessInboxNames.CRAWLER_INBOX;
|
||||
|
||||
@ -182,6 +182,8 @@ public class CrawlerMain extends ProcessMainClass {
|
||||
// Assign any domains with node_affinity=0 to this node, and then fetch all domains assigned to this node
|
||||
// to be crawled.
|
||||
|
||||
performMigration(outputDir);
|
||||
|
||||
try (var conn = dataSource.getConnection()) {
|
||||
try (var assignFreeDomains = conn.prepareStatement(
|
||||
"""
|
||||
@ -291,7 +293,6 @@ public class CrawlerMain extends ProcessMainClass {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void runForSingleDomain(String targetDomainName, FileStorageId fileStorageId) throws Exception {
|
||||
runForSingleDomain(targetDomainName, fileStorageService.getStorage(fileStorageId).asPath());
|
||||
}
|
||||
@ -353,7 +354,7 @@ public class CrawlerMain extends ProcessMainClass {
|
||||
|
||||
Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
|
||||
Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
|
||||
Path parquetFile = CrawlerOutputFile.createParquetPath(outputDir, id, domain);
|
||||
Path slopFile = CrawlerOutputFile.createSlopPath(outputDir, id, domain);
|
||||
|
||||
// Move the WARC file to a temp file if it exists, so we can resume the crawl using the old data
|
||||
// while writing to the same file name as before
|
||||
@ -387,15 +388,15 @@ public class CrawlerMain extends ProcessMainClass {
|
||||
reference.delete();
|
||||
|
||||
// Convert the WARC file to Parquet
|
||||
CrawledDocumentParquetRecordFileWriter
|
||||
.convertWarc(domain, userAgent, newWarcFile, parquetFile);
|
||||
SlopCrawlDataRecord
|
||||
.convertWarc(domain, userAgent, newWarcFile, slopFile);
|
||||
|
||||
// Optionally archive the WARC file if full retention is enabled,
|
||||
// otherwise delete it:
|
||||
warcArchiver.consumeWarc(newWarcFile, domain);
|
||||
|
||||
// Mark the domain as finished in the work log
|
||||
workLog.setJobToFinished(domain, parquetFile.toString(), size);
|
||||
workLog.setJobToFinished(domain, slopFile.toString(), size);
|
||||
|
||||
// Update the progress bar
|
||||
heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
|
||||
@ -480,4 +481,93 @@ public class CrawlerMain extends ProcessMainClass {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Data migration logic
|
||||
|
||||
private void performMigration(Path root) throws IOException {
|
||||
Path crawlerLog = root.resolve("crawler.log");
|
||||
Path newCrawlerLog = Files.createTempFile(root, "crawler", ".migrate.log");
|
||||
|
||||
|
||||
int finishedTasks = 0;
|
||||
int totalTasks;
|
||||
try (var oldLog = new WorkLog(crawlerLog)) {
|
||||
totalTasks = oldLog.countFinishedJobs();
|
||||
}
|
||||
|
||||
try (WorkLog workLog = new WorkLog(newCrawlerLog);
|
||||
var migrationHeartbeat = heartbeat.createAdHocTaskHeartbeat("MIGRATING")) {
|
||||
|
||||
|
||||
|
||||
for (Map.Entry<WorkLogEntry, Path> item : WorkLog.iterableMap(crawlerLog, new CrawlDataLocator(root))) {
|
||||
|
||||
var entry = item.getKey();
|
||||
var path = item.getValue();
|
||||
|
||||
if (path.toFile().getName().endsWith(".parquet")) {
|
||||
logger.info("Converting {}", entry.id());
|
||||
|
||||
String domain = entry.id();
|
||||
String id = Integer.toHexString(domain.hashCode());
|
||||
|
||||
Path outputFile = CrawlerOutputFile.createSlopPath(root, id, domain);
|
||||
|
||||
SlopCrawlDataRecord.convertFromParquet(path, outputFile);
|
||||
|
||||
workLog.setJobToFinished(entry.id(), outputFile.toString(), entry.cnt());
|
||||
}
|
||||
else {
|
||||
workLog.setJobToFinished(entry.id(), path.toString(), entry.cnt());
|
||||
}
|
||||
|
||||
migrationHeartbeat.progress("Parquet To Slop", ++finishedTasks, totalTasks);
|
||||
}
|
||||
}
|
||||
|
||||
Path oldCrawlerLog = Files.createTempFile(root, "crawler-", ".migrate.old.log");
|
||||
Files.move(crawlerLog, oldCrawlerLog, StandardCopyOption.REPLACE_EXISTING);
|
||||
Files.move(newCrawlerLog, crawlerLog);
|
||||
}
|
||||
|
||||
|
||||
private static class CrawlDataLocator implements Function<WorkLogEntry, Optional<Map.Entry<WorkLogEntry, Path>>> {
|
||||
|
||||
private final Path crawlRootDir;
|
||||
|
||||
CrawlDataLocator(Path crawlRootDir) {
|
||||
this.crawlRootDir = crawlRootDir;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Map.Entry<WorkLogEntry, Path>> apply(WorkLogEntry entry) {
|
||||
var path = getCrawledFilePath(crawlRootDir, entry.path());
|
||||
|
||||
if (!Files.exists(path)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
try {
|
||||
return Optional.of(Map.entry(entry, path));
|
||||
}
|
||||
catch (Exception ex) {
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
private Path getCrawledFilePath(Path crawlDir, String fileName) {
|
||||
int sp = fileName.lastIndexOf('/');
|
||||
|
||||
// Normalize the filename
|
||||
if (sp >= 0 && sp + 1< fileName.length())
|
||||
fileName = fileName.substring(sp + 1);
|
||||
if (fileName.length() < 4)
|
||||
fileName = Strings.repeat("0", 4 - fileName.length()) + fileName;
|
||||
|
||||
String sp1 = fileName.substring(0, 2);
|
||||
String sp2 = fileName.substring(2, 4);
|
||||
return crawlDir.resolve(sp1).resolve(sp2).resolve(fileName);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -339,14 +339,14 @@ public class HttpFetcherImpl implements HttpFetcher {
|
||||
case "sitemapindex" -> {
|
||||
List<String> references = new ArrayList<>();
|
||||
for (var locTag : parsedSitemap.getElementsByTag("loc")) {
|
||||
references.add(URLDecoder.decode(locTag.text().trim(), StandardCharsets.UTF_8));
|
||||
references.add(locTag.text().trim());
|
||||
}
|
||||
yield new SitemapResult.SitemapReferences(Collections.unmodifiableList(references));
|
||||
}
|
||||
case "urlset" -> {
|
||||
List<String> urls = new ArrayList<>();
|
||||
for (var locTag : parsedSitemap.select("url > loc")) {
|
||||
urls.add(URLDecoder.decode(locTag.text().trim(), StandardCharsets.UTF_8));
|
||||
urls.add(locTag.text().trim());
|
||||
}
|
||||
yield new SitemapResult.SitemapUrls(Collections.unmodifiableList(urls));
|
||||
}
|
||||
|
@ -214,7 +214,7 @@ public class WarcRecorder implements AutoCloseable {
|
||||
writer.write(item);
|
||||
}
|
||||
|
||||
private void saveOldResponse(EdgeUrl url, String contentType, int statusCode, String documentBody, @Nullable String headers, ContentTags contentTags) {
|
||||
private void saveOldResponse(EdgeUrl url, String contentType, int statusCode, byte[] documentBody, @Nullable String headers, ContentTags contentTags) {
|
||||
try {
|
||||
WarcDigestBuilder responseDigestBuilder = new WarcDigestBuilder();
|
||||
WarcDigestBuilder payloadDigestBuilder = new WarcDigestBuilder();
|
||||
@ -224,7 +224,7 @@ public class WarcRecorder implements AutoCloseable {
|
||||
if (documentBody == null) {
|
||||
bytes = new byte[0];
|
||||
} else {
|
||||
bytes = documentBody.getBytes();
|
||||
bytes = documentBody;
|
||||
}
|
||||
|
||||
// Create a synthesis of custom headers and the original headers
|
||||
@ -295,7 +295,7 @@ public class WarcRecorder implements AutoCloseable {
|
||||
* an E-Tag or Last-Modified header, and the server responds with a 304 Not Modified. In this
|
||||
* scenario we want to record the data as it was in the previous crawl, but not re-fetch it.
|
||||
*/
|
||||
public void writeReferenceCopy(EdgeUrl url, String contentType, int statusCode, String documentBody, @Nullable String headers, ContentTags ctags) {
|
||||
public void writeReferenceCopy(EdgeUrl url, String contentType, int statusCode, byte[] documentBody, @Nullable String headers, ContentTags ctags) {
|
||||
saveOldResponse(url, contentType, statusCode, documentBody, headers, ctags);
|
||||
}
|
||||
|
||||
|
@ -58,7 +58,7 @@ public class CrawlDataReference implements AutoCloseable {
|
||||
return null;
|
||||
}
|
||||
|
||||
public static boolean isContentBodySame(String one, String other) {
|
||||
public static boolean isContentBodySame(byte[] one, byte[] other) {
|
||||
|
||||
final long contentHashOne = contentHash(one);
|
||||
final long contentHashOther = contentHash(other);
|
||||
@ -66,7 +66,7 @@ public class CrawlDataReference implements AutoCloseable {
|
||||
return EasyLSH.hammingDistance(contentHashOne, contentHashOther) < 4;
|
||||
}
|
||||
|
||||
private static long contentHash(String content) {
|
||||
private static long contentHash(byte[] content) {
|
||||
EasyLSH hash = new EasyLSH();
|
||||
int next = 0;
|
||||
|
||||
@ -74,8 +74,8 @@ public class CrawlDataReference implements AutoCloseable {
|
||||
|
||||
// In a naive best-effort fashion, extract the text
|
||||
// content of the document and feed it into the LSH
|
||||
for (int i = 0; i < content.length(); i++) {
|
||||
char c = content.charAt(i);
|
||||
for (byte b : content) {
|
||||
char c = (char) b;
|
||||
if (c == '<') {
|
||||
isInTag = true;
|
||||
} else if (c == '>') {
|
||||
|
@ -378,14 +378,14 @@ public class CrawlerRetreiver implements AutoCloseable {
|
||||
else if (fetchedDoc instanceof HttpFetchResult.Result304Raw && reference.doc() != null) {
|
||||
var doc = reference.doc();
|
||||
|
||||
warcRecorder.writeReferenceCopy(top, doc.contentType, doc.httpStatus, doc.documentBody, doc.headers, contentTags);
|
||||
warcRecorder.writeReferenceCopy(top, doc.contentType, doc.httpStatus, doc.documentBodyBytes, doc.headers, contentTags);
|
||||
|
||||
fetchedDoc = new HttpFetchResult.Result304ReplacedWithReference(doc.url,
|
||||
new ContentType(doc.contentType, "UTF-8"),
|
||||
doc.documentBody);
|
||||
doc.documentBodyBytes);
|
||||
|
||||
if (doc.documentBody != null) {
|
||||
var parsed = Jsoup.parse(doc.documentBody);
|
||||
if (doc.documentBodyBytes != null) {
|
||||
var parsed = doc.parseBody();
|
||||
|
||||
crawlFrontier.enqueueLinksFromDocument(top, parsed);
|
||||
crawlFrontier.addVisited(top);
|
||||
|
@ -1,6 +1,5 @@
|
||||
package nu.marginalia.crawl.retreival.revisit;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import crawlercommons.robots.SimpleRobotRules;
|
||||
import nu.marginalia.crawl.fetcher.ContentTags;
|
||||
import nu.marginalia.crawl.fetcher.warc.WarcRecorder;
|
||||
@ -11,7 +10,8 @@ import nu.marginalia.crawl.retreival.DomainCrawlFrontier;
|
||||
import nu.marginalia.model.EdgeUrl;
|
||||
import nu.marginalia.model.body.HttpFetchResult;
|
||||
import nu.marginalia.model.crawldata.CrawledDocument;
|
||||
import org.jsoup.Jsoup;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/** This class encapsulates the logic for re-visiting a domain that has already been crawled.
|
||||
* We may use information from the previous crawl to inform the next crawl, specifically the
|
||||
@ -70,7 +70,7 @@ public class CrawlerRevisitor {
|
||||
// unlikely to produce anything meaningful for us.
|
||||
if (doc.httpStatus != 200)
|
||||
continue;
|
||||
if (Strings.isNullOrEmpty(doc.documentBody))
|
||||
if (!doc.hasBody())
|
||||
continue;
|
||||
|
||||
if (!crawlFrontier.filterLink(url))
|
||||
@ -117,14 +117,19 @@ public class CrawlerRevisitor {
|
||||
// fashion to make sure we eventually catch changes over time
|
||||
// and ensure we discover new links
|
||||
|
||||
// Hoover up any links from the document
|
||||
crawlFrontier.enqueueLinksFromDocument(url, Jsoup.parse(doc.documentBody));
|
||||
try {
|
||||
// Hoover up any links from the document
|
||||
crawlFrontier.enqueueLinksFromDocument(url, doc.parseBody());
|
||||
|
||||
}
|
||||
catch (IOException ex) {
|
||||
//
|
||||
}
|
||||
// Add a WARC record so we don't repeat this
|
||||
warcRecorder.writeReferenceCopy(url,
|
||||
doc.contentType,
|
||||
doc.httpStatus,
|
||||
doc.documentBody,
|
||||
doc.documentBodyBytes,
|
||||
doc.headers,
|
||||
new ContentTags(doc.etagMaybe, doc.lastModifiedMaybe)
|
||||
);
|
||||
|
@ -2,8 +2,6 @@ package nu.marginalia.crawl.retreival.revisit;
|
||||
|
||||
import nu.marginalia.crawl.fetcher.ContentTags;
|
||||
import nu.marginalia.crawl.retreival.CrawlDataReference;
|
||||
import nu.marginalia.model.body.DocumentBodyExtractor;
|
||||
import nu.marginalia.model.body.DocumentBodyResult;
|
||||
import nu.marginalia.model.body.HttpFetchResult;
|
||||
import nu.marginalia.model.crawldata.CrawledDocument;
|
||||
|
||||
@ -35,21 +33,17 @@ public record DocumentWithReference(
|
||||
return false;
|
||||
if (doc == null)
|
||||
return false;
|
||||
if (doc.documentBody == null)
|
||||
if (doc.documentBodyBytes.length == 0)
|
||||
return false;
|
||||
|
||||
if (!(DocumentBodyExtractor.asString(resultOk) instanceof DocumentBodyResult.Ok<String> bodyOk)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return CrawlDataReference.isContentBodySame(doc.documentBody, bodyOk.body());
|
||||
return CrawlDataReference.isContentBodySame(doc.documentBodyBytes, resultOk.bytesRaw());
|
||||
}
|
||||
|
||||
public ContentTags getContentTags() {
|
||||
if (null == doc)
|
||||
return ContentTags.empty();
|
||||
|
||||
if (doc.documentBody == null || doc.httpStatus != 200)
|
||||
if (doc.documentBodyBytes.length == 0 || doc.httpStatus != 200)
|
||||
return ContentTags.empty();
|
||||
|
||||
String lastmod = doc.getLastModified();
|
||||
|
@ -32,6 +32,7 @@ dependencies {
|
||||
implementation libs.bundles.parquet
|
||||
|
||||
implementation libs.trove
|
||||
implementation libs.slop
|
||||
implementation libs.jwarc
|
||||
implementation libs.gson
|
||||
implementation libs.commons.io
|
||||
|
@ -1,6 +1,7 @@
|
||||
package nu.marginalia.io;
|
||||
|
||||
import nu.marginalia.io.crawldata.format.ParquetSerializableCrawlDataStream;
|
||||
import nu.marginalia.io.crawldata.format.SlopSerializableCrawlDataStream;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -24,7 +25,16 @@ public class CrawledDomainReader {
|
||||
logger.error("Error reading domain data from " + fullPath, ex);
|
||||
return SerializableCrawlDataStream.empty();
|
||||
}
|
||||
} else {
|
||||
}
|
||||
else if (fileName.endsWith(".slop.zip")) {
|
||||
try {
|
||||
return new SlopSerializableCrawlDataStream(fullPath);
|
||||
} catch (Exception ex) {
|
||||
logger.error("Error reading domain data from " + fullPath, ex);
|
||||
return SerializableCrawlDataStream.empty();
|
||||
}
|
||||
}
|
||||
else {
|
||||
logger.error("Unknown file type: {}", fullPath);
|
||||
return SerializableCrawlDataStream.empty();
|
||||
}
|
||||
|
@ -47,6 +47,20 @@ public class CrawlerOutputFile {
|
||||
}
|
||||
return destDir.resolve(id + "-" + filesystemSafeName(domain) + ".parquet");
|
||||
}
|
||||
|
||||
public static Path createSlopPath(Path basePath, String id, String domain) throws IOException {
|
||||
id = padId(id);
|
||||
|
||||
String first = id.substring(0, 2);
|
||||
String second = id.substring(2, 4);
|
||||
|
||||
Path destDir = basePath.resolve(first).resolve(second);
|
||||
if (!Files.exists(destDir)) {
|
||||
Files.createDirectories(destDir);
|
||||
}
|
||||
return destDir.resolve(id + "-" + filesystemSafeName(domain) + ".slop.zip");
|
||||
}
|
||||
|
||||
public static Path getParquetPath(Path basePath, String id, String domain) {
|
||||
id = padId(id);
|
||||
|
||||
@ -56,6 +70,7 @@ public class CrawlerOutputFile {
|
||||
Path destDir = basePath.resolve(first).resolve(second);
|
||||
return destDir.resolve(id + "-" + filesystemSafeName(domain) + ".parquet");
|
||||
}
|
||||
|
||||
public static Path getWarcPath(Path basePath, String id, String domain, WarcFileVersion version) {
|
||||
id = padId(id);
|
||||
|
||||
|
@ -4,12 +4,16 @@ import nu.marginalia.model.crawldata.CrawledDocument;
|
||||
import nu.marginalia.model.crawldata.CrawledDomain;
|
||||
import nu.marginalia.model.crawldata.SerializableCrawlData;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
|
||||
/** Closable iterator exceptional over serialized crawl data
|
||||
* The data may appear in any order, and the iterator must be closed.
|
||||
@ -17,7 +21,7 @@ import java.util.List;
|
||||
* @see CrawledDomainReader
|
||||
* */
|
||||
public interface SerializableCrawlDataStream extends AutoCloseable {
|
||||
|
||||
Logger logger = LoggerFactory.getLogger(SerializableCrawlDataStream.class);
|
||||
|
||||
SerializableCrawlData next() throws IOException;
|
||||
|
||||
@ -30,6 +34,41 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
|
||||
@Nullable
|
||||
default Path path() { return null; }
|
||||
|
||||
default <T> Iterator<T> map(Function<SerializableCrawlData, Optional<T>> mapper) {
|
||||
return new Iterator<>() {
|
||||
T next = null;
|
||||
|
||||
public boolean hasNext() {
|
||||
if (next != null)
|
||||
return true;
|
||||
try {
|
||||
while (SerializableCrawlDataStream.this.hasNext()) {
|
||||
var val = mapper.apply(SerializableCrawlDataStream.this.next());
|
||||
if (val.isPresent()) {
|
||||
next = val.get();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (IOException ex) {
|
||||
logger.error("Error during stream", ex);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public T next() {
|
||||
if (next == null && !hasNext())
|
||||
throw new IllegalStateException("No more data to read");
|
||||
|
||||
T ret = next;
|
||||
next = null;
|
||||
return ret;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
/** For tests */
|
||||
default List<SerializableCrawlData> asList() throws IOException {
|
||||
List<SerializableCrawlData> data = new ArrayList<>();
|
||||
@ -81,7 +120,6 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
|
||||
public boolean hasNext() { return iterator.hasNext(); }
|
||||
public void close() {}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,7 +1,6 @@
|
||||
package nu.marginalia.io.crawldata.format;
|
||||
|
||||
import nu.marginalia.contenttype.ContentType;
|
||||
import nu.marginalia.contenttype.DocumentBodyToString;
|
||||
import nu.marginalia.hash.MurmurHash3_128;
|
||||
import nu.marginalia.io.SerializableCrawlDataStream;
|
||||
import nu.marginalia.model.EdgeUrl;
|
||||
@ -18,6 +17,7 @@ import java.nio.file.Path;
|
||||
import java.util.*;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@Deprecated
|
||||
public class ParquetSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ParquetSerializableCrawlDataStream.class);
|
||||
|
||||
@ -124,9 +124,7 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, Serial
|
||||
}
|
||||
else if (nextRecord.body != null) {
|
||||
try {
|
||||
bodyString = DocumentBodyToString.getStringData(
|
||||
ContentType.parse(nextRecord.contentType),
|
||||
nextRecord.body);
|
||||
ContentType.parse(nextRecord.contentType);
|
||||
} catch (Exception ex) {
|
||||
logger.error("Failed to convert body to string", ex);
|
||||
status = CrawlerDocumentStatus.BAD_CHARSET;
|
||||
@ -147,7 +145,7 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, Serial
|
||||
status.toString(),
|
||||
"",
|
||||
nextRecord.headers,
|
||||
bodyString,
|
||||
nextRecord.body,
|
||||
// this field isn't actually used, maybe we can skip calculating it?
|
||||
nextRecord.cookies,
|
||||
lastModified,
|
||||
|
@ -0,0 +1,181 @@
|
||||
package nu.marginalia.io.crawldata.format;
|
||||
|
||||
import nu.marginalia.contenttype.ContentType;
|
||||
import nu.marginalia.io.SerializableCrawlDataStream;
|
||||
import nu.marginalia.model.EdgeUrl;
|
||||
import nu.marginalia.model.crawldata.*;
|
||||
import nu.marginalia.slop.SlopCrawlDataRecord;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Deque;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
public class SlopSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
|
||||
private static final Logger logger = LoggerFactory.getLogger(SlopSerializableCrawlDataStream.class);
|
||||
|
||||
private final SlopCrawlDataRecord.FilteringReader reader;
|
||||
|
||||
// Holds the next value. This is not a buffer, but to deal with the fact that
|
||||
// we sometimes generate multiple SerializableCrawlData records for a single input
|
||||
private final Deque<SerializableCrawlData> nextQ = new ArrayDeque<>();
|
||||
|
||||
private boolean wroteDomainRecord = false;
|
||||
private final Path path;
|
||||
|
||||
public SlopSerializableCrawlDataStream(Path file) throws IOException {
|
||||
path = file;
|
||||
reader = new SlopCrawlDataRecord.FilteringReader(file) {
|
||||
@Override
|
||||
public boolean filter(String url, int status, String contentType) {
|
||||
String ctLc = contentType.toLowerCase();
|
||||
|
||||
if (ctLc.startsWith("text/"))
|
||||
return true;
|
||||
else if (ctLc.startsWith("x-marginalia/"))
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path path() {
|
||||
return path;
|
||||
}
|
||||
|
||||
public int sizeHint() {
|
||||
// Only calculate size hint for large files
|
||||
// (the reason we calculate them in the first place is to assess whether it is large
|
||||
// because it has many documents, or because it is a small number of large documents)
|
||||
try {
|
||||
if (Files.size(path) > 10_000_000) {
|
||||
return SlopCrawlDataRecord.countGoodStatusCodes(path);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// suppressed
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
try {
|
||||
while (reader.hasRemaining() && nextQ.isEmpty()) {
|
||||
try {
|
||||
var nextRecord = reader.get();
|
||||
if (!wroteDomainRecord) {
|
||||
createDomainRecord(nextRecord);
|
||||
wroteDomainRecord = true;
|
||||
}
|
||||
|
||||
createDocumentRecord(nextRecord);
|
||||
} catch (Exception ex) {
|
||||
logger.error("Failed to create document record", ex);
|
||||
}
|
||||
}
|
||||
return !nextQ.isEmpty();
|
||||
}
|
||||
catch (IOException ex) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private void createDomainRecord(SlopCrawlDataRecord parquetRecord) throws URISyntaxException {
|
||||
|
||||
CrawlerDomainStatus status = CrawlerDomainStatus.OK;
|
||||
String statusReason = "";
|
||||
|
||||
String redirectDomain = null;
|
||||
|
||||
// The advisory content types are used to signal various states of the crawl
|
||||
// that are not actual crawled documents.
|
||||
|
||||
switch (parquetRecord.contentType()) {
|
||||
case "x-marginalia/advisory;state=redirect" -> {
|
||||
EdgeUrl crawledUrl = new EdgeUrl(parquetRecord.url());
|
||||
redirectDomain = crawledUrl.getDomain().toString();
|
||||
status = CrawlerDomainStatus.REDIRECT;
|
||||
}
|
||||
case "x-marginalia/advisory;state=blocked" -> {
|
||||
status = CrawlerDomainStatus.BLOCKED;
|
||||
}
|
||||
case "x-marginalia/advisory;state=error" -> {
|
||||
status = CrawlerDomainStatus.ERROR;
|
||||
statusReason = new String(parquetRecord.body());
|
||||
}
|
||||
}
|
||||
|
||||
nextQ.add(new CrawledDomain(
|
||||
parquetRecord.domain(),
|
||||
redirectDomain,
|
||||
status.toString(),
|
||||
statusReason,
|
||||
parquetRecord.ip(),
|
||||
new ArrayList<>(),
|
||||
new ArrayList<>()
|
||||
));
|
||||
}
|
||||
|
||||
private void createDocumentRecord(SlopCrawlDataRecord nextRecord) {
|
||||
CrawlerDocumentStatus status = CrawlerDocumentStatus.OK;
|
||||
|
||||
if (nextRecord.contentType().startsWith("x-marginalia/advisory;state=content-type-failed-probe")) {
|
||||
status = CrawlerDocumentStatus.BAD_CONTENT_TYPE;
|
||||
}
|
||||
else if (nextRecord.contentType().startsWith("x-marginalia/advisory;state=robots-txt-skipped")) {
|
||||
status = CrawlerDocumentStatus.ROBOTS_TXT;
|
||||
}
|
||||
else if (nextRecord.contentType().startsWith("x-marginalia/advisory")) {
|
||||
// we don't care about the other advisory content types here
|
||||
return;
|
||||
}
|
||||
else if (nextRecord.body() != null) {
|
||||
try {
|
||||
ContentType.parse(nextRecord.contentType());
|
||||
} catch (Exception ex) {
|
||||
logger.error("Failed to convert body to string", ex);
|
||||
status = CrawlerDocumentStatus.BAD_CHARSET;
|
||||
}
|
||||
}
|
||||
else {
|
||||
status = CrawlerDocumentStatus.ERROR;
|
||||
}
|
||||
|
||||
nextQ.add(new CrawledDocument("",
|
||||
nextRecord.url(),
|
||||
nextRecord.contentType(),
|
||||
Instant.ofEpochMilli(nextRecord.timestamp()).toString(),
|
||||
nextRecord.httpStatus(),
|
||||
status.toString(),
|
||||
"",
|
||||
nextRecord.headers(),
|
||||
nextRecord.body(),
|
||||
// this field isn't actually used, maybe we can skip calculating it?
|
||||
nextRecord.cookies(),
|
||||
null,
|
||||
null));
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
reader.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SerializableCrawlData next() throws IOException {
|
||||
if (!hasNext())
|
||||
throw new NoSuchElementException();
|
||||
|
||||
return nextQ.poll();
|
||||
}
|
||||
|
||||
}
|
@ -18,7 +18,7 @@ public class DocumentBodyExtractor {
|
||||
return asBytes(fetchOk);
|
||||
}
|
||||
else if (result instanceof HttpFetchResult.Result304ReplacedWithReference retained) {
|
||||
return new DocumentBodyResult.Ok<>(retained.contentType(), retained.body().getBytes());
|
||||
return new DocumentBodyResult.Ok<>(retained.contentType(), retained.body());
|
||||
}
|
||||
|
||||
return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.ERROR, "Fetch Result Not Ok");
|
||||
|
@ -99,20 +99,10 @@ public sealed interface HttpFetchResult {
|
||||
*
|
||||
* @see Result304Raw for the case where the document has not yet been replaced with the reference data.
|
||||
*/
|
||||
record Result304ReplacedWithReference(String url, ContentType contentType, String body) implements HttpFetchResult {
|
||||
|
||||
record Result304ReplacedWithReference(String url, ContentType contentType, byte[] body) implements HttpFetchResult {
|
||||
public boolean isOk() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public Optional<Document> parseDocument() {
|
||||
try {
|
||||
return Optional.of(Jsoup.parse(body));
|
||||
}
|
||||
catch (Exception ex) {
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Fetching resulted in an exception */
|
||||
|
@ -1,8 +1,16 @@
|
||||
package nu.marginalia.model.crawldata;
|
||||
|
||||
import nu.marginalia.contenttype.ContentType;
|
||||
import nu.marginalia.contenttype.DocumentBodyToString;
|
||||
import nu.marginalia.model.EdgeUrl;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.jsoup.nodes.Document;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.Objects;
|
||||
|
||||
public final class CrawledDocument implements SerializableCrawlData {
|
||||
public String crawlId;
|
||||
@ -19,8 +27,49 @@ public final class CrawledDocument implements SerializableCrawlData {
|
||||
@Nullable
|
||||
public String headers;
|
||||
|
||||
public String documentBody;
|
||||
public String documentBody() {
|
||||
return DocumentBodyToString.getStringData(
|
||||
ContentType.parse(contentType),
|
||||
documentBodyBytes);
|
||||
}
|
||||
|
||||
/** Attempt to parse the first sampleSize bytes of the document body into a string */
|
||||
public String documentBody(int sampleSize) {
|
||||
if (sampleSize >= documentBodyBytes.length) {
|
||||
return documentBody();
|
||||
}
|
||||
|
||||
// Truncating the string at an unlucky point *may* lead to a parsing error
|
||||
// ... so we try again with a longer length
|
||||
for (int i = 0; i <= 3 && sampleSize + i < documentBodyBytes.length; i++) {
|
||||
try {
|
||||
byte[] bytes = new byte[sampleSize + i];
|
||||
System.arraycopy(documentBodyBytes, 0, bytes, 0, bytes.length);
|
||||
|
||||
return DocumentBodyToString.getStringData(
|
||||
ContentType.parse(contentType),
|
||||
bytes);
|
||||
}
|
||||
catch (RuntimeException ex) {
|
||||
// Try again with i + 1
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException("Failed to parse substring");
|
||||
}
|
||||
|
||||
public Document parseBody() throws IOException {
|
||||
return DocumentBodyToString.getParsedData(
|
||||
ContentType.parse(contentType),
|
||||
documentBodyBytes,
|
||||
url);
|
||||
}
|
||||
|
||||
public boolean hasBody() {
|
||||
return documentBodyBytes.length > 0;
|
||||
}
|
||||
|
||||
public byte[] documentBodyBytes;
|
||||
/**
|
||||
* This is not guaranteed to be set in all versions of the format,
|
||||
* information may come in CrawledDomain instead
|
||||
@ -30,7 +79,7 @@ public final class CrawledDocument implements SerializableCrawlData {
|
||||
public String lastModifiedMaybe;
|
||||
public String etagMaybe;
|
||||
|
||||
public CrawledDocument(String crawlId, String url, String contentType, String timestamp, int httpStatus, String crawlerStatus, String crawlerStatusDesc, @Nullable String headers, String documentBody, Boolean hasCookies, String lastModifiedMaybe, String etagMaybe) {
|
||||
public CrawledDocument(String crawlId, String url, String contentType, String timestamp, int httpStatus, String crawlerStatus, String crawlerStatusDesc, @Nullable String headers, byte[] documentBodyBytes, Boolean hasCookies, String lastModifiedMaybe, String etagMaybe) {
|
||||
this.crawlId = crawlId;
|
||||
this.url = url;
|
||||
this.contentType = contentType;
|
||||
@ -39,7 +88,7 @@ public final class CrawledDocument implements SerializableCrawlData {
|
||||
this.crawlerStatus = crawlerStatus;
|
||||
this.crawlerStatusDesc = crawlerStatusDesc;
|
||||
this.headers = headers;
|
||||
this.documentBody = documentBody;
|
||||
this.documentBodyBytes = Objects.requireNonNullElse(documentBodyBytes, new byte[] {});
|
||||
this.hasCookies = hasCookies;
|
||||
this.lastModifiedMaybe = lastModifiedMaybe;
|
||||
this.etagMaybe = etagMaybe;
|
||||
@ -106,7 +155,7 @@ public final class CrawledDocument implements SerializableCrawlData {
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "CrawledDocument(crawlId=" + this.crawlId + ", url=" + this.url + ", contentType=" + this.contentType + ", timestamp=" + this.timestamp + ", httpStatus=" + this.httpStatus + ", crawlerStatus=" + this.crawlerStatus + ", crawlerStatusDesc=" + this.crawlerStatusDesc + ", headers=" + this.headers + ", documentBody=" + this.documentBody + ", hasCookies=" + this.hasCookies + ", lastModifiedMaybe=" + this.lastModifiedMaybe + ", etagMaybe=" + this.etagMaybe + ")";
|
||||
return "CrawledDocument(crawlId=" + this.crawlId + ", url=" + this.url + ", contentType=" + this.contentType + ", timestamp=" + this.timestamp + ", httpStatus=" + this.httpStatus + ", crawlerStatus=" + this.crawlerStatus + ", crawlerStatusDesc=" + this.crawlerStatusDesc + ", headers=" + this.headers + ", documentBody=" + documentBody() + ", hasCookies=" + this.hasCookies + ", lastModifiedMaybe=" + this.lastModifiedMaybe + ", etagMaybe=" + this.etagMaybe + ")";
|
||||
}
|
||||
|
||||
public static class CrawledDocumentBuilder {
|
||||
@ -118,7 +167,7 @@ public final class CrawledDocument implements SerializableCrawlData {
|
||||
private String crawlerStatus;
|
||||
private String crawlerStatusDesc;
|
||||
private @Nullable String headers;
|
||||
private String documentBody;
|
||||
private byte[] documentBodyBytes = new byte[0];
|
||||
private String recrawlState;
|
||||
private Boolean hasCookies;
|
||||
private String lastModifiedMaybe;
|
||||
@ -168,10 +217,13 @@ public final class CrawledDocument implements SerializableCrawlData {
|
||||
}
|
||||
|
||||
public CrawledDocumentBuilder documentBody(String documentBody) {
|
||||
this.documentBody = documentBody;
|
||||
this.documentBodyBytes = documentBody.getBytes(StandardCharsets.UTF_8);
|
||||
return this;
|
||||
}
|
||||
public CrawledDocumentBuilder documentBodyBytes(byte[] documentBodyBytes) {
|
||||
this.documentBodyBytes = documentBodyBytes;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public CrawledDocumentBuilder recrawlState(String recrawlState) {
|
||||
this.recrawlState = recrawlState;
|
||||
@ -194,11 +246,11 @@ public final class CrawledDocument implements SerializableCrawlData {
|
||||
}
|
||||
|
||||
public CrawledDocument build() {
|
||||
return new CrawledDocument(this.crawlId, this.url, this.contentType, this.timestamp, this.httpStatus, this.crawlerStatus, this.crawlerStatusDesc, this.headers, this.documentBody, this.hasCookies, this.lastModifiedMaybe, this.etagMaybe);
|
||||
return new CrawledDocument(this.crawlId, this.url, this.contentType, this.timestamp, this.httpStatus, this.crawlerStatus, this.crawlerStatusDesc, this.headers, this.documentBodyBytes, this.hasCookies, this.lastModifiedMaybe, this.etagMaybe);
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "CrawledDocument.CrawledDocumentBuilder(crawlId=" + this.crawlId + ", url=" + this.url + ", contentType=" + this.contentType + ", timestamp=" + this.timestamp + ", httpStatus=" + this.httpStatus + ", crawlerStatus=" + this.crawlerStatus + ", crawlerStatusDesc=" + this.crawlerStatusDesc + ", headers=" + this.headers + ", documentBody=" + this.documentBody + ", recrawlState=" + this.recrawlState + ", hasCookies=" + this.hasCookies + ", lastModifiedMaybe=" + this.lastModifiedMaybe + ", etagMaybe=" + this.etagMaybe + ")";
|
||||
return "CrawledDocument.CrawledDocumentBuilder(crawlId=" + this.crawlId + ", url=" + this.url + ", contentType=" + this.contentType + ", timestamp=" + this.timestamp + ", httpStatus=" + this.httpStatus + ", crawlerStatus=" + this.crawlerStatus + ", crawlerStatusDesc=" + this.crawlerStatusDesc + ", headers=" + this.headers + ", documentBodyBytes=" + Arrays.toString(this.documentBodyBytes) + ", recrawlState=" + this.recrawlState + ", hasCookies=" + this.hasCookies + ", lastModifiedMaybe=" + this.lastModifiedMaybe + ", etagMaybe=" + this.etagMaybe + ")";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,520 @@
|
||||
package nu.marginalia.slop;
|
||||
|
||||
import nu.marginalia.ContentTypes;
|
||||
import nu.marginalia.UserAgent;
|
||||
import nu.marginalia.model.body.DocumentBodyExtractor;
|
||||
import nu.marginalia.model.body.DocumentBodyResult;
|
||||
import nu.marginalia.model.body.HttpFetchResult;
|
||||
import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecord;
|
||||
import nu.marginalia.parquet.crawldata.CrawledDocumentParquetRecordFileReader;
|
||||
import nu.marginalia.slop.column.array.ByteArrayColumn;
|
||||
import nu.marginalia.slop.column.primitive.ByteColumn;
|
||||
import nu.marginalia.slop.column.primitive.LongColumn;
|
||||
import nu.marginalia.slop.column.primitive.ShortColumn;
|
||||
import nu.marginalia.slop.column.string.EnumColumn;
|
||||
import nu.marginalia.slop.column.string.StringColumn;
|
||||
import nu.marginalia.slop.desc.StorageType;
|
||||
import nu.marginalia.slop.storage.LargeItem;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.netpreserve.jwarc.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.StringJoiner;
|
||||
|
||||
public record SlopCrawlDataRecord(String domain,
|
||||
String url,
|
||||
String ip,
|
||||
boolean cookies,
|
||||
int httpStatus,
|
||||
long timestamp,
|
||||
String contentType,
|
||||
byte[] body,
|
||||
String headers)
|
||||
{
|
||||
private static final EnumColumn domainColumn = new EnumColumn("domain", StandardCharsets.UTF_8, StorageType.ZSTD);
|
||||
private static final StringColumn urlColumn = new StringColumn("url", StandardCharsets.UTF_8, StorageType.ZSTD);
|
||||
private static final StringColumn ipColumn = new StringColumn("ip", StandardCharsets.ISO_8859_1, StorageType.ZSTD);
|
||||
private static final ByteColumn cookiesColumn = new ByteColumn("cookies");
|
||||
private static final ShortColumn statusColumn = new ShortColumn("httpStatus");
|
||||
private static final LongColumn timestampColumn = new LongColumn("timestamp");
|
||||
private static final EnumColumn contentTypeColumn = new EnumColumn("contentType", StandardCharsets.UTF_8);
|
||||
private static final ByteArrayColumn bodyColumn = new ByteArrayColumn("body", StorageType.ZSTD);
|
||||
private static final StringColumn headerColumn = new StringColumn("header", StandardCharsets.UTF_8, StorageType.ZSTD);
|
||||
|
||||
public SlopCrawlDataRecord(CrawledDocumentParquetRecord parquetRecord) {
|
||||
this(parquetRecord.domain,
|
||||
parquetRecord.url,
|
||||
parquetRecord.ip,
|
||||
parquetRecord.cookies,
|
||||
parquetRecord.httpStatus,
|
||||
parquetRecord.timestamp.toEpochMilli(),
|
||||
parquetRecord.contentType,
|
||||
parquetRecord.body,
|
||||
parquetRecord.headers
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
private static SlopCrawlDataRecord forDomainRedirect(String domain, Instant date, String redirectDomain) {
|
||||
return new SlopCrawlDataRecord(domain,
|
||||
"https://" + redirectDomain + "/",
|
||||
"",
|
||||
false,
|
||||
0,
|
||||
date.toEpochMilli(),
|
||||
"x-marginalia/advisory;state=redirect",
|
||||
new byte[0],
|
||||
""
|
||||
);
|
||||
}
|
||||
|
||||
private static SlopCrawlDataRecord forDomainError(String domain, Instant date, String ip, String errorStatus) {
|
||||
return new SlopCrawlDataRecord(domain,
|
||||
"https://" + domain + "/",
|
||||
ip,
|
||||
false,
|
||||
0,
|
||||
date.toEpochMilli(),
|
||||
"x-marginalia/advisory;state=error",
|
||||
errorStatus.getBytes(),
|
||||
""
|
||||
);
|
||||
}
|
||||
|
||||
private static SlopCrawlDataRecord forDocError(String domain, Instant date, String url, String errorStatus) {
|
||||
return new SlopCrawlDataRecord(domain,
|
||||
url,
|
||||
"",
|
||||
false,
|
||||
0,
|
||||
date.toEpochMilli(),
|
||||
errorStatus,
|
||||
new byte[0],
|
||||
""
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
public static void convertFromParquet(Path parquetInput, Path slopOutput) throws IOException {
|
||||
Path tempDir = Files.createTempDirectory(slopOutput.getParent(), "conversion");
|
||||
|
||||
try (var writer = new Writer(tempDir)) {
|
||||
CrawledDocumentParquetRecordFileReader.stream(parquetInput).forEach(
|
||||
parquetRecord -> {
|
||||
try {
|
||||
writer.write(new SlopCrawlDataRecord(parquetRecord));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (IOException ex) {
|
||||
FileUtils.deleteDirectory(tempDir.toFile());
|
||||
throw ex;
|
||||
}
|
||||
|
||||
try {
|
||||
SlopTablePacker.packToSlopZip(tempDir, slopOutput);
|
||||
FileUtils.deleteDirectory(tempDir.toFile());
|
||||
}
|
||||
catch (Exception ex) {
|
||||
logger.error("Failed to convert WARC file to Parquet", ex);
|
||||
}
|
||||
}
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(SlopCrawlDataRecord.class);
|
||||
|
||||
public static void convertWarc(String domain,
|
||||
UserAgent userAgent,
|
||||
Path warcInputFile,
|
||||
Path slopOutputFile) throws IOException {
|
||||
|
||||
Path tempDir = Files.createTempDirectory(slopOutputFile.getParent(), "slop-"+domain);
|
||||
|
||||
try (var warcReader = new WarcReader(warcInputFile);
|
||||
var slopWriter = new SlopCrawlDataRecord.Writer(tempDir)
|
||||
) {
|
||||
WarcXResponseReference.register(warcReader);
|
||||
WarcXEntityRefused.register(warcReader);
|
||||
|
||||
String uaString = userAgent.uaString();
|
||||
|
||||
for (var record : warcReader) {
|
||||
try {
|
||||
if (record instanceof WarcResponse response) {
|
||||
// this also captures WarcXResponseReference, which inherits from WarcResponse
|
||||
// and is used to store old responses from previous crawls; in this part of the logic
|
||||
// we treat them the same as a normal response
|
||||
|
||||
if (!filterResponse(uaString, response)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
slopWriter.write(domain, response);
|
||||
} else if (record instanceof WarcXEntityRefused refused) {
|
||||
slopWriter.write(domain, refused);
|
||||
} else if (record instanceof Warcinfo warcinfo) {
|
||||
slopWriter.write(warcinfo);
|
||||
}
|
||||
}
|
||||
catch (Exception ex) {
|
||||
logger.error("Failed to convert WARC record to Parquet", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex) {
|
||||
logger.error("Failed to convert WARC file to Parquet", ex);
|
||||
}
|
||||
|
||||
try {
|
||||
SlopTablePacker.packToSlopZip(tempDir, slopOutputFile);
|
||||
FileUtils.deleteDirectory(tempDir.toFile());
|
||||
}
|
||||
catch (Exception ex) {
|
||||
logger.error("Failed to convert WARC file to Parquet", ex);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/** Return true if the WarcResponse should be excluded from conversion */
|
||||
private static boolean filterResponse(String uaString, WarcResponse response) throws IOException {
|
||||
|
||||
// We don't want to store robots.txt files, as they are not
|
||||
// interesting for the analysis we want to do. This is important
|
||||
// since txt-files in general are interesting, and we don't want to
|
||||
// exclude them as a class.
|
||||
|
||||
if (response.targetURI().getPath().equals("/robots.txt")) {
|
||||
return false;
|
||||
}
|
||||
|
||||
var headers = response.http().headers();
|
||||
var robotsTags = headers.all("X-Robots-Tag");
|
||||
|
||||
if (!isXRobotsTagsPermitted(robotsTags, uaString)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Strip out responses with content types we aren't interested in
|
||||
// (though ideally we wouldn't download these at all)
|
||||
String contentType = headers.first("Content-Type").orElse("text/plain").toLowerCase();
|
||||
|
||||
if (!ContentTypes.isAccepted(contentType)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Check X-Robots-Tag header tag to see if we are allowed to index this page.
|
||||
* <p>
|
||||
* Reference: <a href="https://developers.google.com/search/docs/crawling-indexing/robots-meta-tag">https://developers.google.com/search/docs/crawling-indexing/robots-meta-tag</a>
|
||||
*
|
||||
* @param xRobotsHeaderTags List of X-Robots-Tag values
|
||||
* @param userAgent User agent string
|
||||
* @return true if we are allowed to index this page
|
||||
*/
|
||||
// Visible for tests
|
||||
public static boolean isXRobotsTagsPermitted(List<String> xRobotsHeaderTags, String userAgent) {
|
||||
boolean isPermittedGeneral = true;
|
||||
boolean isPermittedMarginalia = false;
|
||||
boolean isForbiddenMarginalia = false;
|
||||
|
||||
for (String header : xRobotsHeaderTags) {
|
||||
if (header.indexOf(':') >= 0) {
|
||||
String[] parts = StringUtils.split(header, ":", 2);
|
||||
|
||||
if (parts.length < 2)
|
||||
continue;
|
||||
|
||||
// Is this relevant to us?
|
||||
if (!Objects.equals(parts[0].trim(), userAgent))
|
||||
continue;
|
||||
|
||||
if (parts[1].contains("noindex"))
|
||||
isForbiddenMarginalia = true;
|
||||
else if (parts[1].contains("none"))
|
||||
isForbiddenMarginalia = true;
|
||||
else if (parts[1].contains("all"))
|
||||
isPermittedMarginalia = true;
|
||||
}
|
||||
else {
|
||||
if (header.contains("noindex"))
|
||||
isPermittedGeneral = false;
|
||||
if (header.contains("none"))
|
||||
isPermittedGeneral = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (isPermittedMarginalia)
|
||||
return true;
|
||||
if (isForbiddenMarginalia)
|
||||
return false;
|
||||
return isPermittedGeneral;
|
||||
}
|
||||
|
||||
public static int countGoodStatusCodes(Path path) throws IOException {
|
||||
int cnt = 0;
|
||||
|
||||
try (var table = new SlopTable(path)) {
|
||||
ShortColumn.Reader statusReader = statusColumn.open(table);
|
||||
while (statusReader.hasRemaining()) {
|
||||
if (statusReader.get() == 200) {
|
||||
cnt++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return cnt;
|
||||
}
|
||||
|
||||
public static class Writer extends SlopTable {
|
||||
private final EnumColumn.Writer domainColumnWriter;
|
||||
private final StringColumn.Writer urlColumnWriter;
|
||||
private final StringColumn.Writer ipColumnWriter;
|
||||
private final ByteColumn.Writer cookiesColumnWriter;
|
||||
private final ShortColumn.Writer statusColumnWriter;
|
||||
private final LongColumn.Writer timestampColumnWriter;
|
||||
private final EnumColumn.Writer contentTypeColumnWriter;
|
||||
private final ByteArrayColumn.Writer bodyColumnWriter;
|
||||
private final StringColumn.Writer headerColumnWriter;
|
||||
|
||||
public Writer(Path path) throws IOException {
|
||||
super(path);
|
||||
|
||||
domainColumnWriter = domainColumn.create(this);
|
||||
urlColumnWriter = urlColumn.create(this);
|
||||
ipColumnWriter = ipColumn.create(this);
|
||||
cookiesColumnWriter = cookiesColumn.create(this);
|
||||
statusColumnWriter = statusColumn.create(this);
|
||||
timestampColumnWriter = timestampColumn.create(this);
|
||||
contentTypeColumnWriter = contentTypeColumn.create(this);
|
||||
bodyColumnWriter = bodyColumn.create(this);
|
||||
headerColumnWriter = headerColumn.create(this);
|
||||
}
|
||||
|
||||
public void write(SlopCrawlDataRecord record) throws IOException {
|
||||
domainColumnWriter.put(record.domain);
|
||||
urlColumnWriter.put(record.url);
|
||||
ipColumnWriter.put(record.ip);
|
||||
cookiesColumnWriter.put(record.cookies ? (byte) 1 : (byte) 0);
|
||||
statusColumnWriter.put((short) record.httpStatus);
|
||||
timestampColumnWriter.put(record.timestamp);
|
||||
contentTypeColumnWriter.put(record.contentType);
|
||||
bodyColumnWriter.put(record.body);
|
||||
headerColumnWriter.put(record.headers);
|
||||
}
|
||||
|
||||
public void write(String domain, WarcResponse response) throws IOException {
|
||||
|
||||
HttpFetchResult result = HttpFetchResult.importWarc(response);
|
||||
if (!(result instanceof HttpFetchResult.ResultOk fetchOk)) {
|
||||
return;
|
||||
}
|
||||
|
||||
byte[] bodyBytes;
|
||||
String contentType;
|
||||
|
||||
var body = DocumentBodyExtractor.asBytes(result);
|
||||
|
||||
var headers = fetchOk.headers();
|
||||
|
||||
if (body instanceof DocumentBodyResult.Ok<byte[]> bodyOk) {
|
||||
bodyBytes = bodyOk.body();
|
||||
contentType = bodyOk.contentType().toString();
|
||||
}
|
||||
else {
|
||||
bodyBytes = new byte[0];
|
||||
contentType = "";
|
||||
}
|
||||
|
||||
String headersStr;
|
||||
StringJoiner headersStrBuilder = new StringJoiner("\n");
|
||||
for (var header : headers.map().entrySet()) {
|
||||
for (var value : header.getValue()) {
|
||||
headersStrBuilder.add(header.getKey() + ": " + value);
|
||||
}
|
||||
}
|
||||
headersStr = headersStrBuilder.toString();
|
||||
|
||||
|
||||
write(new SlopCrawlDataRecord(
|
||||
domain,
|
||||
response.target(),
|
||||
fetchOk.ipAddress(),
|
||||
"1".equals(headers.firstValue("X-Cookies").orElse("0")),
|
||||
fetchOk.statusCode(),
|
||||
response.date().toEpochMilli(),
|
||||
contentType,
|
||||
bodyBytes,
|
||||
headersStr
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
private void write(String domain, WarcXEntityRefused refused) throws IOException {
|
||||
URI profile = refused.profile();
|
||||
|
||||
String meta;
|
||||
if (profile.equals(WarcXEntityRefused.documentRobotsTxtSkippedURN)) {
|
||||
meta = "x-marginalia/advisory;state=robots-txt-skipped";
|
||||
}
|
||||
else if (profile.equals(WarcXEntityRefused.documentBadContentTypeURN)) {
|
||||
meta = "x-marginalia/advisory;state=content-type-failed-probe";
|
||||
}
|
||||
else if (profile.equals(WarcXEntityRefused.documentProbeTimeout)) {
|
||||
meta = "x-marginalia/advisory;state=timeout-probe";
|
||||
}
|
||||
else if (profile.equals(WarcXEntityRefused.documentUnspecifiedError)) {
|
||||
meta = "x-marginalia/advisory;state=doc-error";
|
||||
}
|
||||
else {
|
||||
meta = "x-marginalia/advisory;state=unknown";
|
||||
}
|
||||
|
||||
write(forDocError(domain, refused.date(), refused.target(), meta));
|
||||
}
|
||||
|
||||
private void write(Warcinfo warcinfo) throws IOException {
|
||||
String selfDomain = warcinfo.fields().first("domain").orElse("");
|
||||
String ip = warcinfo.fields().first("ip").orElse("");
|
||||
String probeStatus = warcinfo.fields().first("X-WARC-Probe-Status").orElse("");
|
||||
|
||||
if (probeStatus.startsWith("REDIRECT")) {
|
||||
String redirectDomain = probeStatus.substring("REDIRECT;".length());
|
||||
write(forDomainRedirect(selfDomain, warcinfo.date(), redirectDomain));
|
||||
}
|
||||
else if (!"OK".equals(probeStatus)) {
|
||||
write(forDomainError(selfDomain, warcinfo.date(), ip, probeStatus));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class Reader extends SlopTable {
|
||||
private final EnumColumn.Reader domainColumnReader;
|
||||
private final StringColumn.Reader urlColumnReader;
|
||||
private final StringColumn.Reader ipColumnReader;
|
||||
private final ByteColumn.Reader cookiesColumnReader;
|
||||
private final ShortColumn.Reader statusColumnReader;
|
||||
private final LongColumn.Reader timestampColumnReader;
|
||||
private final EnumColumn.Reader contentTypeColumnReader;
|
||||
private final ByteArrayColumn.Reader bodyColumnReader;
|
||||
private final StringColumn.Reader headerColumnReader;
|
||||
|
||||
public Reader(Path path) throws IOException {
|
||||
super(path);
|
||||
|
||||
domainColumnReader = domainColumn.open(this);
|
||||
urlColumnReader = urlColumn.open(this);
|
||||
ipColumnReader = ipColumn.open(this);
|
||||
cookiesColumnReader = cookiesColumn.open(this);
|
||||
statusColumnReader = statusColumn.open(this);
|
||||
timestampColumnReader = timestampColumn.open(this);
|
||||
contentTypeColumnReader = contentTypeColumn.open(this);
|
||||
bodyColumnReader = bodyColumn.open(this);
|
||||
headerColumnReader = headerColumn.open(this);
|
||||
}
|
||||
|
||||
public SlopCrawlDataRecord get() throws IOException {
|
||||
return new SlopCrawlDataRecord(
|
||||
domainColumnReader.get(),
|
||||
urlColumnReader.get(),
|
||||
ipColumnReader.get(),
|
||||
cookiesColumnReader.get() == 1,
|
||||
statusColumnReader.get(),
|
||||
timestampColumnReader.get(),
|
||||
contentTypeColumnReader.get(),
|
||||
bodyColumnReader.get(),
|
||||
headerColumnReader.get()
|
||||
);
|
||||
}
|
||||
|
||||
public boolean hasRemaining() throws IOException {
|
||||
return domainColumnReader.hasRemaining();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public abstract static class FilteringReader extends SlopTable {
|
||||
private final EnumColumn.Reader domainColumnReader;
|
||||
private final StringColumn.Reader urlColumnReader;
|
||||
private final StringColumn.Reader ipColumnReader;
|
||||
private final ByteColumn.Reader cookiesColumnReader;
|
||||
private final ShortColumn.Reader statusColumnReader;
|
||||
private final LongColumn.Reader timestampColumnReader;
|
||||
private final EnumColumn.Reader contentTypeColumnReader;
|
||||
private final ByteArrayColumn.Reader bodyColumnReader;
|
||||
private final StringColumn.Reader headerColumnReader;
|
||||
|
||||
private SlopCrawlDataRecord next = null;
|
||||
|
||||
public FilteringReader(Path path) throws IOException {
|
||||
super(path);
|
||||
|
||||
domainColumnReader = domainColumn.open(this);
|
||||
urlColumnReader = urlColumn.open(this);
|
||||
ipColumnReader = ipColumn.open(this);
|
||||
cookiesColumnReader = cookiesColumn.open(this);
|
||||
statusColumnReader = statusColumn.open(this);
|
||||
timestampColumnReader = timestampColumn.open(this);
|
||||
contentTypeColumnReader = contentTypeColumn.open(this);
|
||||
bodyColumnReader = bodyColumn.open(this);
|
||||
headerColumnReader = headerColumn.open(this);
|
||||
}
|
||||
|
||||
public abstract boolean filter(String url, int status, String contentType);
|
||||
|
||||
public SlopCrawlDataRecord get() throws IOException {
|
||||
if (next == null) {
|
||||
if (!hasRemaining()) {
|
||||
throw new IllegalStateException("No more values remaining");
|
||||
}
|
||||
}
|
||||
var val = next;
|
||||
next = null;
|
||||
return val;
|
||||
}
|
||||
|
||||
public boolean hasRemaining() throws IOException {
|
||||
if (next != null)
|
||||
return true;
|
||||
|
||||
while (domainColumnReader.hasRemaining()) {
|
||||
String domain = domainColumnReader.get();
|
||||
String url = urlColumnReader.get();
|
||||
String ip = ipColumnReader.get();
|
||||
boolean cookies = cookiesColumnReader.get() == 1;
|
||||
int status = statusColumnReader.get();
|
||||
long timestamp = timestampColumnReader.get();
|
||||
String contentType = contentTypeColumnReader.get();
|
||||
|
||||
LargeItem<byte[]> body = bodyColumnReader.getLarge();
|
||||
LargeItem<String> headers = headerColumnReader.getLarge();
|
||||
|
||||
if (filter(url, status, contentType)) {
|
||||
next = new SlopCrawlDataRecord(
|
||||
domain, url, ip, cookies, status, timestamp, contentType, body.get(), headers.get()
|
||||
);
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
body.close();
|
||||
headers.close();
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
@ -80,7 +80,7 @@ class CrawledDocumentParquetRecordFileWriterTest {
|
||||
var document = (CrawledDocument) secondItem;
|
||||
assertEquals("https://www.marginalia.nu/", document.url);
|
||||
assertEquals("text/html", document.contentType);
|
||||
assertEquals("hello world", document.documentBody);
|
||||
assertEquals("hello world", document.documentBody());
|
||||
assertEquals(200, document.httpStatus);
|
||||
}
|
||||
|
||||
@ -103,7 +103,7 @@ class CrawledDocumentParquetRecordFileWriterTest {
|
||||
System.out.println(doc.url);
|
||||
System.out.println(doc.contentType);
|
||||
System.out.println(doc.httpStatus);
|
||||
System.out.println(doc.documentBody.length());
|
||||
System.out.println(doc.documentBody().length());
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
@ -0,0 +1,85 @@
|
||||
package nu.marginalia.slop;
|
||||
|
||||
import nu.marginalia.contenttype.ContentType;
|
||||
import org.jsoup.Jsoup;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
|
||||
class SlopCrawlDataRecordTest {
|
||||
|
||||
@Test
|
||||
public void test() throws IOException {
|
||||
|
||||
// Files.deleteIfExists(Path.of("/tmp/steam.slop.zip"));
|
||||
// SlopCrawlDataRecord.convertFromParquet(
|
||||
// Path.of("/home/vlofgren/Downloads/_storage_crawl-data__23-10-21T15_08_43.750_3b_41_3b41e714-store.steampowered.com.parquet"),
|
||||
// Path.of("/tmp/steam.slop.zip")
|
||||
// );
|
||||
// long st = 0;
|
||||
// try (var reader = new SlopCrawlDataRecord.FilteringReader(Path.of("/tmp/steam.slop.zip")) {
|
||||
// public boolean filter(String url, int status, String contentType) {
|
||||
// return contentType.startsWith("text/html");
|
||||
// }
|
||||
// }) {
|
||||
// Instant start = Instant.now();
|
||||
// while (reader.hasRemaining()) {
|
||||
// var next = reader.get();
|
||||
// byte[] body = next.body();
|
||||
//
|
||||
// st += Jsoup.parse(new String(body)).title().length();
|
||||
// }
|
||||
// System.out.println(st + " " + Duration.between(start, Instant.now()));
|
||||
// }
|
||||
|
||||
long st = 0;
|
||||
try (var reader = new SlopCrawlDataRecord.FilteringReader(Path.of("/tmp/steam.slop.zip")) {
|
||||
public boolean filter(String url, int status, String contentType) {
|
||||
return contentType.startsWith("text/html");
|
||||
}
|
||||
}) {
|
||||
Instant start = Instant.now();
|
||||
while (reader.hasRemaining()) {
|
||||
var next = reader.get();
|
||||
byte[] body = next.body();
|
||||
|
||||
st += Jsoup.parse(new ByteArrayInputStream(body), ContentType.parse(next.contentType()).asCharset().name(), next.url()).title().length();
|
||||
|
||||
}
|
||||
System.out.println(Duration.between(start, Instant.now()));
|
||||
}
|
||||
|
||||
// System.out.println("BEGIN Slop");
|
||||
// for (int i = 0; i < 6; i++) {
|
||||
// int sz = 0;
|
||||
// Instant start = Instant.now();
|
||||
// try (var reader = new SlopCrawlDataRecord.Reader(Path.of("/tmp/steam.slop.zip")) {
|
||||
// public boolean filter(String url, int status, String contentType) {
|
||||
// return contentType.startsWith("text/html");
|
||||
// }
|
||||
// }) {
|
||||
// while (reader.hasRemaining()) {
|
||||
// sz += reader.get().httpStatus();
|
||||
// }
|
||||
// }
|
||||
// Instant end = Instant.now();
|
||||
// System.out.println("END Iter " + sz + " " + Duration.between(start, end));
|
||||
// }
|
||||
// System.out.println("END Slop");
|
||||
//
|
||||
// System.out.println("BEGIN Parquet");
|
||||
// for (int i = 0; i < 6; i++) {
|
||||
// Instant start = Instant.now();
|
||||
// int sz = CrawledDocumentParquetRecordFileReader.stream(Path.of("/home/vlofgren/Downloads/_storage_crawl-data__23-10-21T15_08_43.750_3b_41_3b41e714-store.steampowered.com.parquet"))
|
||||
// .filter(record -> record.contentType.startsWith("text/html"))
|
||||
// .mapToInt(record -> record.httpStatus).sum();
|
||||
// Instant end = Instant.now();
|
||||
// System.out.println("END Iter " + sz + " " + Duration.between(start, end));
|
||||
// }
|
||||
// System.out.println("END Parquet");
|
||||
}
|
||||
}
|
@ -82,7 +82,7 @@ class WarcRecorderTest {
|
||||
recorder.writeReferenceCopy(new EdgeUrl("https://www.marginalia.nu/"),
|
||||
"text/html",
|
||||
200,
|
||||
"<?doctype html><html><body>test</body></html>",
|
||||
"<?doctype html><html><body>test</body></html>".getBytes(),
|
||||
null,
|
||||
ContentTags.empty());
|
||||
}
|
||||
@ -118,7 +118,7 @@ class WarcRecorderTest {
|
||||
recorder.writeReferenceCopy(new EdgeUrl("https://www.marginalia.nu/"),
|
||||
"text/html",
|
||||
200,
|
||||
"<?doctype html><html><body>test</body></html>",
|
||||
"<?doctype html><html><body>test</body></html>".getBytes(),
|
||||
null, ContentTags.empty());
|
||||
}
|
||||
|
||||
|
@ -141,7 +141,7 @@ public class CrawlerMockFetcherTest {
|
||||
public HttpFetchResult fetchContent(EdgeUrl url, WarcRecorder recorder, ContentTags tags, ProbeType probeType) {
|
||||
logger.info("Fetching {}", url);
|
||||
if (mockData.containsKey(url)) {
|
||||
byte[] bodyBytes = mockData.get(url).documentBody.getBytes();
|
||||
byte[] bodyBytes = mockData.get(url).documentBodyBytes;
|
||||
|
||||
try {
|
||||
return new HttpFetchResult.ResultOk(
|
||||
|
@ -14,7 +14,6 @@ import nu.marginalia.storage.FileStorageService;
|
||||
import nu.marginalia.storage.model.FileStorage;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.jsoup.Jsoup;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -52,11 +51,9 @@ public class AtagExporter implements ExporterIf {
|
||||
|
||||
try (var bw = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(Files.newOutputStream(tmpFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)))))
|
||||
{
|
||||
Path crawlerLogFile = inputDir.resolve("crawler.log");
|
||||
|
||||
var tagWriter = new ATagCsvWriter(bw);
|
||||
|
||||
for (var item : WorkLog.iterable(crawlerLogFile)) {
|
||||
for (var item : WorkLog.iterable(inputDir.resolve("crawler.log"))) {
|
||||
if (Thread.interrupted()) {
|
||||
throw new InterruptedException();
|
||||
}
|
||||
@ -89,15 +86,19 @@ public class AtagExporter implements ExporterIf {
|
||||
while (stream.hasNext()) {
|
||||
if (!(stream.next() instanceof CrawledDocument doc))
|
||||
continue;
|
||||
if (null == doc.documentBody)
|
||||
if (!doc.hasBody())
|
||||
continue;
|
||||
if (!doc.contentType.toLowerCase().startsWith("text/html"))
|
||||
continue;
|
||||
|
||||
var baseUrl = new EdgeUrl(doc.url);
|
||||
var parsed = Jsoup.parse(doc.documentBody);
|
||||
var parsed = doc.parseBody();
|
||||
|
||||
for (var atag : parsed.getElementsByTag("a")) {
|
||||
if (!atag.hasAttr("href")) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String linkText = atag.text();
|
||||
|
||||
if (!linkFilter.isLinkTextEligible(linkText)) {
|
||||
|
@ -12,7 +12,6 @@ import nu.marginalia.process.log.WorkLog;
|
||||
import nu.marginalia.storage.FileStorageService;
|
||||
import nu.marginalia.storage.model.FileStorage;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
import org.jsoup.Jsoup;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.IOException;
|
||||
@ -81,13 +80,13 @@ public class FeedExporter implements ExporterIf {
|
||||
while (stream.hasNext()) {
|
||||
if (!(stream.next() instanceof CrawledDocument doc))
|
||||
continue;
|
||||
if (null == doc.documentBody)
|
||||
if (!doc.hasBody())
|
||||
continue;
|
||||
if (!doc.contentType.toLowerCase().startsWith("text/html"))
|
||||
continue;
|
||||
|
||||
var baseUrl = new EdgeUrl(doc.url);
|
||||
var parsed = Jsoup.parse(doc.documentBody);
|
||||
var parsed = doc.parseBody();
|
||||
|
||||
List<EdgeUrl> feedUrls = new ArrayList<>();
|
||||
for (var link : parsed.select("link[rel=alternate]")) {
|
||||
|
@ -15,7 +15,6 @@ import nu.marginalia.storage.FileStorageService;
|
||||
import nu.marginalia.storage.model.FileStorage;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
import nu.marginalia.util.SimpleBlockingThreadPool;
|
||||
import org.jsoup.Jsoup;
|
||||
import org.jsoup.nodes.Document;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -110,13 +109,13 @@ public class TermFrequencyExporter implements ExporterIf {
|
||||
return;
|
||||
|
||||
if (!(stream.next() instanceof CrawledDocument doc)) continue;
|
||||
if (doc.documentBody == null) continue;
|
||||
if (!doc.hasBody()) continue;
|
||||
if (!doc.contentType.toLowerCase().startsWith("text/html"))
|
||||
continue;
|
||||
|
||||
docCount.incrementAndGet();
|
||||
|
||||
Document parsed = Jsoup.parse(doc.documentBody);
|
||||
Document parsed = doc.parseBody();
|
||||
parsed.body().filter(new DomPruningFilter(0.5));
|
||||
|
||||
DocumentLanguageData dld = se.extractSentences(parsed);
|
||||
|
@ -119,12 +119,16 @@ public class LiveCrawlDataSet implements AutoCloseable {
|
||||
}
|
||||
}
|
||||
|
||||
private String decompress(byte[] data) {
|
||||
private String decompressStr(byte[] data) {
|
||||
return new String(decompressBytes(data));
|
||||
}
|
||||
|
||||
private byte[] decompressBytes(byte[] data) {
|
||||
// gzip decompression
|
||||
try (var bis = new ByteArrayInputStream(data);
|
||||
var gzip = new GZIPInputStream(bis))
|
||||
try (ByteArrayInputStream bis = new ByteArrayInputStream(data);
|
||||
GZIPInputStream gzip = new GZIPInputStream(bis))
|
||||
{
|
||||
return new String(gzip.readAllBytes());
|
||||
return gzip.readAllBytes();
|
||||
}
|
||||
catch (IOException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
@ -177,8 +181,8 @@ public class LiveCrawlDataSet implements AutoCloseable {
|
||||
dataStack = new ArrayList<>();
|
||||
while (rs.next()) {
|
||||
String url = rs.getString("url");
|
||||
String body = decompress(rs.getBytes("body"));
|
||||
String headers = decompress(rs.getBytes("headers"));
|
||||
byte[] body = decompressBytes(rs.getBytes("body"));
|
||||
String headers = decompressStr(rs.getBytes("headers"));
|
||||
|
||||
dataStack.add(new CrawledDocument(
|
||||
"LIVE",
|
||||
|
@ -200,7 +200,7 @@ public class LiveCrawlerMain extends ProcessMainClass {
|
||||
writer.setOrdinalOffset(67_000_000);
|
||||
|
||||
for (SerializableCrawlDataStream stream : hb.wrap("Processing", dataSet.getDataStreams())) {
|
||||
writer.write(domainProcessor.sideloadProcessing(stream, 0, Set.of("special:live")));
|
||||
writer.write(domainProcessor.simpleProcessing(stream, 0, Set.of("special:live")));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -51,7 +51,7 @@ public class LiveCrawlDataSetTest {
|
||||
case CrawledDocument document -> {
|
||||
dataCount++;
|
||||
Assertions.assertEquals("https://www.example.com/", document.url);
|
||||
Assertions.assertEquals("test", document.documentBody);
|
||||
Assertions.assertEquals("test", document.documentBody());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -49,7 +49,7 @@ class SimpleLinkScraperTest {
|
||||
|
||||
List<CrawledDocument> documents = firstStream.docsAsList();
|
||||
Assertions.assertEquals(1, documents.size());
|
||||
Assertions.assertTrue(documents.getFirst().documentBody.startsWith("<!doctype"));
|
||||
Assertions.assertTrue(documents.getFirst().documentBody().startsWith("<!doctype"));
|
||||
}
|
||||
|
||||
|
||||
|
@ -6,7 +6,6 @@ import nu.marginalia.converting.ConverterModule;
|
||||
import nu.marginalia.io.CrawledDomainReader;
|
||||
import nu.marginalia.process.log.WorkLog;
|
||||
import nu.marginalia.service.module.DatabaseModule;
|
||||
import nu.marginalia.tools.experiments.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
@ -16,13 +15,6 @@ import java.util.Map;
|
||||
public class ExperimentRunnerMain {
|
||||
|
||||
private static Map<String, Class<? extends Experiment>> experiments = Map.of(
|
||||
"test", TestExperiment.class,
|
||||
"adblock", AdblockExperiment.class,
|
||||
"topic", TopicExperiment.class,
|
||||
"sentence-statistics", SentenceStatisticsExperiment.class,
|
||||
"site-statistics", SiteStatisticsExperiment.class,
|
||||
"export-atags", ExportExternalLinksExperiment.class,
|
||||
"debug-converter", DebugConverterExperiment.class
|
||||
);
|
||||
|
||||
public static void main(String... args) throws IOException {
|
||||
|
@ -1,45 +0,0 @@
|
||||
package nu.marginalia.tools.experiments;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import nu.marginalia.converting.processor.DocumentProcessor;
|
||||
import nu.marginalia.converting.processor.classifier.adblock.AdblockSimulator;
|
||||
import nu.marginalia.model.crawldata.CrawledDocument;
|
||||
import nu.marginalia.model.crawldata.CrawledDomain;
|
||||
import nu.marginalia.tools.LegacyExperiment;
|
||||
import org.jsoup.Jsoup;
|
||||
import org.jsoup.nodes.Document;
|
||||
|
||||
public class AdblockExperiment extends LegacyExperiment {
|
||||
|
||||
private final AdblockSimulator simulator;
|
||||
|
||||
@Inject
|
||||
public AdblockExperiment(AdblockSimulator simulator) {
|
||||
this.simulator = simulator;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean process(CrawledDomain domain) {
|
||||
if (domain.doc == null) return true;
|
||||
|
||||
for (var doc : domain.doc) {
|
||||
if (DocumentProcessor.isAcceptedContentType(doc) && "OK".equals(doc.crawlerStatus)) {
|
||||
processDocument(doc);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private void processDocument(CrawledDocument doc) {
|
||||
Document parsedDocument = Jsoup.parse(doc.documentBody);
|
||||
|
||||
if (simulator.hasAds(parsedDocument)) {
|
||||
System.out.println(doc.url);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFinish() {
|
||||
}
|
||||
}
|
@ -1,46 +0,0 @@
|
||||
package nu.marginalia.tools.experiments;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import nu.marginalia.converting.processor.DomainProcessor;
|
||||
import nu.marginalia.converting.processor.plugin.specialization.BlogSpecialization;
|
||||
import nu.marginalia.model.crawldata.CrawledDomain;
|
||||
import nu.marginalia.tools.LegacyExperiment;
|
||||
import org.jsoup.Jsoup;
|
||||
|
||||
public class DebugConverterExperiment extends LegacyExperiment {
|
||||
|
||||
|
||||
private final DomainProcessor domainProcessor;
|
||||
|
||||
@Inject
|
||||
public DebugConverterExperiment(DomainProcessor domainProcessor) {
|
||||
this.domainProcessor = domainProcessor;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean process(CrawledDomain domain) {
|
||||
|
||||
if (domain.doc == null) return true;
|
||||
|
||||
for (var doc : domain.doc) {
|
||||
if (doc.documentBody == null) continue;
|
||||
|
||||
var parsed = Jsoup.parse(doc.documentBody);
|
||||
|
||||
var tagExtractor = new BlogSpecialization.BlogTagExtractor();
|
||||
parsed.traverse(tagExtractor);
|
||||
var tags = tagExtractor.getTags();
|
||||
if (!tags.isEmpty()) {
|
||||
System.out.println(tags);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFinish() {
|
||||
}
|
||||
}
|
@ -1,71 +0,0 @@
|
||||
package nu.marginalia.tools.experiments;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import gnu.trove.set.hash.TLongHashSet;
|
||||
import nu.marginalia.hash.MurmurHash3_128;
|
||||
import nu.marginalia.io.SerializableCrawlDataStream;
|
||||
import nu.marginalia.link_parser.LinkParser;
|
||||
import nu.marginalia.model.EdgeUrl;
|
||||
import nu.marginalia.model.crawldata.CrawledDocument;
|
||||
import nu.marginalia.tools.Experiment;
|
||||
import org.jsoup.Jsoup;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class ExportExternalLinksExperiment extends Experiment {
|
||||
|
||||
@Inject
|
||||
public ExportExternalLinksExperiment() {
|
||||
|
||||
}
|
||||
private static final LinkParser linkParser = new LinkParser();
|
||||
MurmurHash3_128 hash = new MurmurHash3_128();
|
||||
|
||||
@Override
|
||||
public boolean process(SerializableCrawlDataStream stream) {
|
||||
TLongHashSet hashes = new TLongHashSet();
|
||||
|
||||
try {
|
||||
while (stream.hasNext()) {
|
||||
if (!(stream.next() instanceof CrawledDocument doc))
|
||||
continue;
|
||||
if (null == doc.documentBody)
|
||||
continue;
|
||||
|
||||
var baseUrl = new EdgeUrl(doc.url);
|
||||
var parsed = Jsoup.parse(doc.documentBody);
|
||||
|
||||
for (var atag : parsed.getElementsByTag("a")) {
|
||||
String linkText = atag.text();
|
||||
if (linkText.isBlank())
|
||||
continue;
|
||||
|
||||
var linkOpt = linkParser.parseLinkPermissive(baseUrl, atag);
|
||||
linkOpt
|
||||
.filter(url -> !Objects.equals(url.domain, baseUrl.domain))
|
||||
.filter(url -> hashes.add(hash.hashNearlyASCII(linkText) ^ hash.hashNearlyASCII(url.toString())))
|
||||
.ifPresent(url ->
|
||||
System.out.printf("\"%s\",\"%s\",\"%s\"\n",
|
||||
csvify(url),
|
||||
csvify(baseUrl.domain),
|
||||
csvify(linkText)));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
catch (Exception ex) {
|
||||
ex.printStackTrace();
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private static String csvify(Object field) {
|
||||
return field.toString().replace("\"", "\"\"");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFinish() {
|
||||
}
|
||||
}
|
@ -1,70 +0,0 @@
|
||||
package nu.marginalia.tools.experiments;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import nu.marginalia.WmsaHome;
|
||||
import nu.marginalia.converting.processor.logic.dom.DomPruningFilter;
|
||||
import nu.marginalia.keyword.DocumentKeywordExtractor;
|
||||
import nu.marginalia.keyword.LinkTexts;
|
||||
import nu.marginalia.language.sentence.SentenceExtractor;
|
||||
import nu.marginalia.model.EdgeUrl;
|
||||
import nu.marginalia.model.crawldata.CrawledDomain;
|
||||
import nu.marginalia.term_frequency_dict.TermFrequencyDict;
|
||||
import nu.marginalia.tools.LegacyExperiment;
|
||||
import org.jsoup.Jsoup;
|
||||
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
public class SentenceStatisticsExperiment extends LegacyExperiment {
|
||||
|
||||
SentenceExtractor se = new SentenceExtractor(WmsaHome.getLanguageModels());
|
||||
DocumentKeywordExtractor documentKeywordExtractor = new DocumentKeywordExtractor(new TermFrequencyDict(WmsaHome.getLanguageModels()));
|
||||
Path filename;
|
||||
PrintWriter writer;
|
||||
|
||||
@Inject
|
||||
public SentenceStatisticsExperiment() throws IOException {
|
||||
filename = Files.createTempFile(getClass().getSimpleName(), ".csv");
|
||||
System.out.println("Writing to " + filename);
|
||||
|
||||
writer = new PrintWriter(new BufferedOutputStream(new FileOutputStream(filename.toFile())));
|
||||
}
|
||||
|
||||
private void logLine(String message) {
|
||||
System.out.printf("\u001b[2K\r%s", message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean process(CrawledDomain domain) {
|
||||
if (domain.doc == null) return true;
|
||||
|
||||
logLine("Processing: " + domain.domain);
|
||||
|
||||
ByteBuffer workArea = ByteBuffer.allocate(8192);
|
||||
for (var doc : domain.doc) {
|
||||
if (doc.documentBody == null) continue;
|
||||
|
||||
var parsed = Jsoup.parse(doc.documentBody);
|
||||
|
||||
parsed.body().filter(new DomPruningFilter(0.5));
|
||||
|
||||
var dld = se.extractSentences(parsed);
|
||||
var keywords = documentKeywordExtractor.extractKeywords(dld, new LinkTexts(), EdgeUrl.parse(doc.url).orElseThrow());
|
||||
|
||||
keywords.build(workArea);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFinish() {
|
||||
logLine("Done!\n");
|
||||
writer.close();
|
||||
}
|
||||
}
|
@ -1,16 +0,0 @@
|
||||
package nu.marginalia.tools.experiments;
|
||||
|
||||
import nu.marginalia.model.crawldata.CrawledDomain;
|
||||
import nu.marginalia.tools.LegacyExperiment;
|
||||
|
||||
public class TestExperiment extends LegacyExperiment {
|
||||
@Override
|
||||
public boolean process(CrawledDomain domain) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFinish() {
|
||||
System.out.println("Tada!");
|
||||
}
|
||||
}
|
@ -1,64 +0,0 @@
|
||||
package nu.marginalia.tools.experiments;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import nu.marginalia.WmsaHome;
|
||||
import nu.marginalia.converting.processor.classifier.topic.AdHocDetector;
|
||||
import nu.marginalia.converting.processor.logic.dom.DomPruningFilter;
|
||||
import nu.marginalia.language.sentence.SentenceExtractor;
|
||||
import nu.marginalia.model.crawldata.CrawledDomain;
|
||||
import nu.marginalia.tools.LegacyExperiment;
|
||||
import org.jsoup.Jsoup;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
public class TopicExperiment extends LegacyExperiment {
|
||||
|
||||
AdHocDetector detector;
|
||||
|
||||
SentenceExtractor se = new SentenceExtractor(WmsaHome.getLanguageModels());
|
||||
Path filename = null;
|
||||
|
||||
public void args(String... args) {
|
||||
filename = Path.of(args[0]);
|
||||
try {
|
||||
detector = new AdHocDetector(Files.readAllLines(filename));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Inject
|
||||
public TopicExperiment() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean process(CrawledDomain domain) {
|
||||
if (domain.doc == null) return true;
|
||||
|
||||
|
||||
for (var doc : domain.doc) {
|
||||
if (doc.documentBody == null) continue;
|
||||
|
||||
var parsed = Jsoup.parse(doc.documentBody);
|
||||
|
||||
parsed.body().filter(new DomPruningFilter(0.5));
|
||||
var dld = se.extractSentences(parsed);
|
||||
|
||||
if (dld.totalNumWords() < 50)
|
||||
continue;
|
||||
|
||||
if (detector.testP(dld) > 0.5) {
|
||||
System.out.println("match\t" + doc.url);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFinish() {
|
||||
}
|
||||
}
|
@ -136,7 +136,7 @@ public class IntegrationTest {
|
||||
</p>
|
||||
</body>
|
||||
</html>
|
||||
""",
|
||||
""".getBytes(),
|
||||
"",
|
||||
ContentTags.empty()
|
||||
);
|
||||
|
@ -234,14 +234,13 @@ dependencyResolutionManagement {
|
||||
library('jetty-util','org.eclipse.jetty','jetty-util').version('9.4.54.v20240208')
|
||||
library('jetty-servlet','org.eclipse.jetty','jetty-servlet').version('9.4.54.v20240208')
|
||||
|
||||
library('slop', 'nu.marginalia', 'slop').version('0.0.9-org-5-SNAPSHOT')
|
||||
library('jooby-netty','io.jooby','jooby-netty').version(joobyVersion)
|
||||
library('jooby-jte','io.jooby','jooby-jte').version(joobyVersion)
|
||||
library('jooby-apt','io.jooby','jooby-apt').version(joobyVersion)
|
||||
|
||||
library('jte','gg.jte','jte').version('3.1.15')
|
||||
|
||||
library('slop', 'nu.marginalia', 'slop').version('0.0.8-SNAPSHOT')
|
||||
|
||||
bundle('jetty', ['jetty-server', 'jetty-util', 'jetty-servlet'])
|
||||
|
||||
bundle('slf4j', ['slf4j.api', 'log4j.api', 'log4j.core', 'log4j.slf4j'])
|
||||
@ -261,7 +260,6 @@ dependencyResolutionManagement {
|
||||
bundle('jooby', ['jooby-netty', 'jooby-jte'])
|
||||
bundle('curator', ['curator-framework', 'curator-x-discovery'])
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user