(crawl-spec) Parquetify crawl spec

* Crawl specs are now parquet files instead of zstd-compressed JSON
* Deprecate and remove the crawl-job-extractor tool; crawl specs are now generated in-process via CrawlSpecGenerator
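A minimal sketch of the new read/write path, using the classes added in this commit. The file path, domain name, crawl depth and class name are illustrative only; CrawlSpecFileNames.resolve() produces the "crawl-spec.parquet" name inside a storage area.

```java
import nu.marginalia.io.crawlspec.CrawlSpecRecordParquetFileReader;
import nu.marginalia.io.crawlspec.CrawlSpecRecordParquetFileWriter;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;

import java.nio.file.Path;
import java.util.List;

class CrawlSpecParquetSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative path; in practice CrawlSpecFileNames.resolve() picks the location
        Path spec = Path.of("/tmp/crawl-spec.parquet");

        // One row per domain to crawl
        try (var writer = new CrawlSpecRecordParquetFileWriter(spec)) {
            writer.write(CrawlSpecRecord.builder()
                    .domain("www.example.com")
                    .crawlDepth(200)
                    .urls(List.of("https://www.example.com/"))
                    .build());
        }

        // Stream the records back; the stream is closed via try-with-resources,
        // mirroring how CrawlSpecRecordParquetFileReader.count() handles it
        try (var stream = CrawlSpecRecordParquetFileReader.stream(spec)) {
            stream.forEach(rec -> System.out.println(rec.domain + " -> " + rec.crawlDepth));
        }
    }
}
```

The crawler consumes the file the same way (see CrawlerMain below), replacing the old iteration over a zstd-compressed JSON stream.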
Viktor Lofgren 2023-09-16 18:14:47 +02:00
parent 46232c7fd4
commit 5c040f7a46
42 changed files with 575 additions and 701 deletions

View File

@ -34,10 +34,6 @@ tasks.register('dist', Copy) {
from tarTree("$buildDir/dist/website-adjacencies-calculator.tar")
into "$projectDir/run/dist/"
}
copy {
from tarTree("$buildDir/dist/crawl-job-extractor-process.tar")
into "$projectDir/run/dist/"
}
copy {
from tarTree("$buildDir/dist/index-construction-process.tar")
into "$projectDir/run/dist/"

View File

@ -0,0 +1,123 @@
package nu.marginalia.db;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;
/** Class used in exporting data. This is intended to be used for a brief time
* and then discarded, not kept around as a service.
*/
public class DbDomainStatsExportMultitool implements AutoCloseable {
private final Connection connection;
private final PreparedStatement knownUrlsQuery;
private final PreparedStatement visitedUrlsQuery;
private final PreparedStatement goodUrlsQuery;
private final PreparedStatement domainNameToId;
private final PreparedStatement allDomainsQuery;
private final PreparedStatement crawlQueueDomains;
private final PreparedStatement indexedDomainsQuery;
public DbDomainStatsExportMultitool(HikariDataSource dataSource) throws SQLException {
this.connection = dataSource.getConnection();
knownUrlsQuery = connection.prepareStatement("""
SELECT KNOWN_URLS
FROM EC_DOMAIN INNER JOIN DOMAIN_METADATA
ON EC_DOMAIN.ID=DOMAIN_METADATA.ID
WHERE DOMAIN_NAME=?
""");
visitedUrlsQuery = connection.prepareStatement("""
SELECT VISITED_URLS
FROM EC_DOMAIN INNER JOIN DOMAIN_METADATA
ON EC_DOMAIN.ID=DOMAIN_METADATA.ID
WHERE DOMAIN_NAME=?
""");
goodUrlsQuery = connection.prepareStatement("""
SELECT GOOD_URLS
FROM EC_DOMAIN INNER JOIN DOMAIN_METADATA
ON EC_DOMAIN.ID=DOMAIN_METADATA.ID
WHERE DOMAIN_NAME=?
""");
domainNameToId = connection.prepareStatement("""
SELECT ID
FROM EC_DOMAIN
WHERE DOMAIN_NAME=?
""");
allDomainsQuery = connection.prepareStatement("""
SELECT DOMAIN_NAME
FROM EC_DOMAIN
""");
crawlQueueDomains = connection.prepareStatement("""
SELECT DOMAIN_NAME
FROM CRAWL_QUEUE
""");
indexedDomainsQuery = connection.prepareStatement("""
SELECT DOMAIN_NAME
FROM EC_DOMAIN
WHERE INDEXED > 0
""");
}
public OptionalInt getKnownUrls(String domainName) throws SQLException {
return executeNameToIntQuery(domainName, knownUrlsQuery);
}
public OptionalInt getVisitedUrls(String domainName) throws SQLException {
return executeNameToIntQuery(domainName, visitedUrlsQuery);
}
public OptionalInt getGoodUrls(String domainName) throws SQLException {
return executeNameToIntQuery(domainName, goodUrlsQuery);
}
public OptionalInt getDomainId(String domainName) throws SQLException {
return executeNameToIntQuery(domainName, domainNameToId);
}
public List<String> getAllDomains() throws SQLException {
return executeListQuery(allDomainsQuery, 100_000);
}
public List<String> getCrawlQueueDomains() throws SQLException {
return executeListQuery(crawlQueueDomains, 100);
}
public List<String> getAllIndexedDomains() throws SQLException {
return executeListQuery(indexedDomainsQuery, 100_000);
}
private OptionalInt executeNameToIntQuery(String domainName, PreparedStatement statement)
throws SQLException {
statement.setString(1, domainName);
var rs = statement.executeQuery();
if (rs.next()) {
return OptionalInt.of(rs.getInt(1));
}
return OptionalInt.empty();
}
private List<String> executeListQuery(PreparedStatement statement, int sizeHint) throws SQLException {
List<String> ret = new ArrayList<>(sizeHint);
var rs = statement.executeQuery();
while (rs.next()) {
ret.add(rs.getString(1));
}
return ret;
}
@Override
public void close() throws SQLException {
knownUrlsQuery.close();
goodUrlsQuery.close();
visitedUrlsQuery.close();
allDomainsQuery.close();
crawlQueueDomains.close();
indexedDomainsQuery.close();
domainNameToId.close();
connection.close();
}
}
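A brief usage sketch for the class above; the HikariDataSource and the wrapper method are assumed, not part of this commit.

```java
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.db.DbDomainStatsExportMultitool;

class ExportToolUsageSketch {
    // Illustrative only: assumes a configured HikariDataSource is passed in
    static void printQueueStats(HikariDataSource dataSource) throws Exception {
        try (var dbTools = new DbDomainStatsExportMultitool(dataSource)) {
            for (String domain : dbTools.getCrawlQueueDomains()) {
                // The visited-URL count feeds the crawl depth heuristic in CrawlSpecGenerator
                int visited = dbTools.getVisitedUrls(domain).orElse(0);
                System.out.println(domain + ": " + visited + " visited URLs");
            }
        }
    }
}
```

CrawlSpecGenerator below consumes the same calls through its DomainSource.fromCrawlQueue and KnownUrlsCountSource.fromDb adapters.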

View File

@ -6,6 +6,7 @@ import com.google.inject.name.Named;
import gnu.trove.list.TLongList;
import nu.marginalia.linkdb.model.LdbUrlDetail;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.id.UrlIdCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -61,6 +62,35 @@ public class LinkdbReader {
connection = createConnection();
}
public List<String> getUrlsFromDomain(int domainId) throws SQLException {
if (connection == null ||
connection.isClosed())
{
throw new RuntimeException("URL query temporarily unavailable due to database switch");
}
long minId = UrlIdCodec.encodeId(domainId, 0);
long maxId = UrlIdCodec.encodeId(domainId+1, 0);
List<String> ret = new ArrayList<>();
try (var stmt = connection.prepareStatement("""
SELECT URL
FROM DOCUMENT
WHERE ID >= ? AND ID < ?
"""))
{
stmt.setLong(1, minId);
stmt.setLong(2, maxId);
var rs = stmt.executeQuery();
while (rs.next()) {
ret.add(rs.getString(1));
}
}
return ret;
}
public List<LdbUrlDetail> getUrlDetails(TLongList ids) throws SQLException {
List<LdbUrlDetail> ret = new ArrayList<>(ids.size());

View File

@ -1,7 +1,7 @@
plugins {
id 'java'
id "io.freefair.lombok" version "8.2.2"
id 'application'
id 'jvm-test-suite'
}
@ -10,31 +10,19 @@ java {
languageVersion.set(JavaLanguageVersion.of(20))
}
}
application {
mainClass = 'nu.marginalia.crawl.CrawlJobExtractorMain'
applicationName = 'crawl-job-extractor-process'
}
tasks.distZip.enabled = false
dependencies {
implementation project(':code:common:process')
implementation project(':code:common:db')
implementation project(':code:common:model')
implementation project(':code:common:service')
implementation project(':code:common:service-discovery')
implementation project(':code:process-models:crawling-model')
implementation libs.lombok
annotationProcessor libs.lombok
implementation libs.bundles.slf4j
implementation project(':third-party:parquet-floor')
implementation project(':code:common:db')
implementation project(':code:common:linkdb')
implementation libs.notnull
implementation libs.trove
implementation libs.bundles.parquet
implementation libs.bundles.mariadb
implementation libs.guice
implementation libs.bundles.gson
implementation libs.zstd
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit

View File

@ -0,0 +1,20 @@
package nu.marginalia.crawlspec;
import nu.marginalia.db.storage.model.FileStorage;
import nu.marginalia.db.storage.model.FileStorageType;
import java.nio.file.Path;
public class CrawlSpecFileNames {
public static Path resolve(Path base) {
return base.resolve("crawl-spec.parquet");
}
public static Path resolve(FileStorage storage) {
if (storage.type() != FileStorageType.CRAWL_SPEC)
throw new IllegalArgumentException("Provided file storage is of unexpected type " +
storage.type() + ", expected CRAWL_SPEC");
return resolve(storage.asPath());
}
}

View File

@ -0,0 +1,139 @@
package nu.marginalia.crawlspec;
import nu.marginalia.db.DbDomainStatsExportMultitool;
import nu.marginalia.io.crawlspec.CrawlSpecRecordParquetFileWriter;
import nu.marginalia.linkdb.LinkdbReader;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
public class CrawlSpecGenerator {
private static final int MIN_VISIT_COUNT = 200;
private static final int MAX_VISIT_COUNT = 100000;
public static void generateCrawlSpec(Path output,
DomainSource domains,
KnownUrlsCountSource counts,
KnownUrlsListSource listSource)
throws IOException, SQLException
{
try (var writer = new CrawlSpecRecordParquetFileWriter(output)) {
for (String domain : domains.getDomainNames()) {
domain = domain.toLowerCase();
writer.write(CrawlSpecRecord
.builder()
.crawlDepth(calculateCrawlDepthFromVisitedCount(
counts.getKnownUrlCount(domain)
))
.urls(listSource.getKnownUrls(domain))
.domain(domain)
.build());
}
}
}
private static int calculateCrawlDepthFromVisitedCount(int count) {
if (count < MIN_VISIT_COUNT / 2) {
/* If we aren't getting very many good documents
out of this webpage on previous attempts, we
won't dig very deeply this time. This saves time
and resources for both the crawler and the server,
and also prevents deep crawls on foreign websites we aren't
interested in crawling at this point. */
count = MIN_VISIT_COUNT;
}
else {
/* If we got many results previously, we'll
dig deeper with each successive crawl. */
count = count + 1000 + count / 4;
}
if (count > MAX_VISIT_COUNT) {
count = MAX_VISIT_COUNT;
}
return count;
}
public interface DomainSource {
List<String> getDomainNames() throws IOException, SQLException;
static DomainSource combined(DomainSource... sources) {
if (sources.length == 0) {
return List::of;
}
return () -> {
List<String> combined = new ArrayList<>(sources[0].getDomainNames());
for (int i = 1; i < sources.length; i++) {
combined.addAll(sources[i].getDomainNames());
}
return combined;
};
}
static DomainSource fromFile(Path file) {
return () -> {
var lines = Files.readAllLines(file);
lines.replaceAll(s -> s.trim().toLowerCase());
lines.removeIf(line -> line.isBlank() || line.startsWith("#"));
return lines;
};
}
static DomainSource knownUrlsFromDb(DbDomainStatsExportMultitool dbData) {
return dbData::getAllIndexedDomains;
}
static DomainSource fromCrawlQueue(DbDomainStatsExportMultitool dbData) {
return dbData::getCrawlQueueDomains;
}
}
public interface KnownUrlsCountSource {
int getKnownUrlCount(String domainName) throws SQLException;
static KnownUrlsCountSource fixed(int value) {
return domainName -> value;
}
static KnownUrlsCountSource fromDb(DbDomainStatsExportMultitool dbData, int defaultValue) {
return domainName ->
dbData.getVisitedUrls(domainName)
.orElse(defaultValue);
}
}
public interface KnownUrlsListSource {
List<String> getKnownUrls(String domainName) throws SQLException;
static KnownUrlsListSource justIndex() {
return domainName -> List.of(
"http://" + domainName + "/",
"https://" + domainName + "/"
);
}
static KnownUrlsListSource fromLinkdb(DbDomainStatsExportMultitool dbData,
LinkdbReader linkdbReader)
{
return domainName -> {
var maybeId = dbData.getDomainId(domainName);
if (maybeId.isEmpty())
return List.of();
return linkdbReader
.getUrlsFromDomain(maybeId.getAsInt());
};
}
}
}
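A sketch of generating a spec from the database with the class above, mirroring how CrawlJobExtractorActor wires it up further down in this commit. The storage directory, data source and class name are assumed to be supplied by the caller.

```java
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.crawlspec.CrawlSpecFileNames;
import nu.marginalia.db.DbDomainStatsExportMultitool;

import java.nio.file.Path;

import static nu.marginalia.crawlspec.CrawlSpecGenerator.*;

class GenerateSpecSketch {
    // Illustrative only: storageDir and dataSource come from the caller
    static void createSpecFromDb(Path storageDir, HikariDataSource dataSource) throws Exception {
        Path output = CrawlSpecFileNames.resolve(storageDir);

        try (var dbTools = new DbDomainStatsExportMultitool(dataSource)) {
            generateCrawlSpec(
                    output,
                    // Combine already-indexed domains with the crawl queue
                    DomainSource.combined(
                            DomainSource.knownUrlsFromDb(dbTools),
                            DomainSource.fromCrawlQueue(dbTools)),
                    KnownUrlsCountSource.fromDb(dbTools, 200),
                    KnownUrlsListSource.justIndex()
            );
        }
    }
}
```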

View File

@ -0,0 +1,26 @@
package nu.marginalia.io.crawlspec;
import blue.strategic.parquet.HydratorSupplier;
import blue.strategic.parquet.ParquetReader;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.nio.file.Path;
import java.util.stream.Stream;
public class CrawlSpecRecordParquetFileReader {
@NotNull
public static Stream<CrawlSpecRecord> stream(Path path) throws IOException {
return ParquetReader.streamContent(path.toFile(),
HydratorSupplier.constantly(CrawlSpecRecord.newHydrator()));
}
public static int count(Path path) throws IOException {
try (var stream = stream(path)) {
// FIXME This can be done in a more performant way by using another hydrator that only reads a single field
return (int) stream.count();
}
}
}

View File

@ -0,0 +1,24 @@
package nu.marginalia.io.crawlspec;
import blue.strategic.parquet.ParquetWriter;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import java.io.IOException;
import java.nio.file.Path;
public class CrawlSpecRecordParquetFileWriter implements AutoCloseable {
private final ParquetWriter<CrawlSpecRecord> writer;
public CrawlSpecRecordParquetFileWriter(Path file) throws IOException {
writer = ParquetWriter.writeFile(CrawlSpecRecord.schema,
file.toFile(), CrawlSpecRecord.newDehydrator());
}
public void write(CrawlSpecRecord domainData) throws IOException {
writer.write(domainData);
}
public void close() throws IOException {
writer.close();
}
}

View File

@ -0,0 +1,90 @@
package nu.marginalia.model.crawlspec;
import blue.strategic.parquet.Dehydrator;
import blue.strategic.parquet.Hydrator;
import blue.strategic.parquet.ValueWriter;
import lombok.*;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.ArrayList;
import java.util.List;
import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
@Builder
@ToString
public class CrawlSpecRecord {
@NotNull
public String domain;
/** Limit for how many documents will be crawled */
public int crawlDepth;
/** List of known URLs */
@Nullable
public List<String> urls;
public static Hydrator<CrawlSpecRecord, CrawlSpecRecord> newHydrator() {
return new CrawlSpecRecordHydrator();
}
public static Dehydrator<CrawlSpecRecord> newDehydrator() {
return CrawlSpecRecord::dehydrate;
}
public static MessageType schema = new MessageType(
CrawlSpecRecord.class.getSimpleName(),
Types.required(BINARY).as(stringType()).named("domain"),
Types.required(INT32).named("crawlDepth"),
Types.repeated(BINARY).as(stringType()).named("urls")
);
public void dehydrate(ValueWriter valueWriter) {
valueWriter.write("domain", domain);
valueWriter.write("crawlDepth", crawlDepth);
valueWriter.writeList("urls", urls);
}
public CrawlSpecRecord add(String heading, Object value) {
switch (heading) {
case "domain" -> domain = (String) value;
case "crawlDepth" -> crawlDepth = (Integer) value;
case "urls" -> {
if (urls == null)
urls = new ArrayList<>();
urls.add((String) value);
}
}
return this;
}
}
class CrawlSpecRecordHydrator implements Hydrator<CrawlSpecRecord, CrawlSpecRecord> {
@Override
public CrawlSpecRecord start() {
return new CrawlSpecRecord();
}
@Override
public CrawlSpecRecord add(CrawlSpecRecord target, String heading, Object value) {
return target.add(heading, value);
}
@Override
public CrawlSpecRecord finish(CrawlSpecRecord target) {
return target;
}
}

View File

@ -6,7 +6,6 @@ import com.google.gson.Gson;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.SerializableCrawlData;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import nu.marginalia.model.gson.GsonFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,8 +32,8 @@ public class CrawledDomainReader {
}
/** An iterator-like access to domain data. This must be closed otherwise it will leak off-heap memory! */
public SerializableCrawlDataStream createDataStream(Path basePath, CrawlingSpecification spec) throws IOException {
return createDataStream(CrawlerOutputFile.getOutputFile(basePath, spec.id, spec.domain));
public SerializableCrawlDataStream createDataStream(Path basePath, String domain, String id) throws IOException {
return createDataStream(CrawlerOutputFile.getOutputFile(basePath, id, domain));
}
/** Read the entirety of the domain data into memory. This uses a lot of RAM */

View File

@ -5,7 +5,6 @@ import com.github.luben.zstd.ZstdOutputStream;
import com.google.gson.Gson;
import lombok.SneakyThrows;
import nu.marginalia.crawling.model.SerializableCrawlData;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import nu.marginalia.model.gson.GsonFactory;
import java.io.BufferedOutputStream;
@ -24,7 +23,7 @@ public class CrawledDomainWriter implements AutoCloseable {
private final Path tmpFile;
private final Path actualFile;
public CrawledDomainWriter(Path outputDir, CrawlingSpecification spec) throws IOException {
public CrawledDomainWriter(Path outputDir, String domain, String id) throws IOException {
this.outputDir = outputDir;
if (!Files.isDirectory(outputDir)) {
@ -36,8 +35,8 @@ public class CrawledDomainWriter implements AutoCloseable {
// this lets us read the old file and compare its contents while writing the new file. It also guards against
// half-written files if the process is killed.
tmpFile = getOutputFile(spec.id, spec.domain + "_tmp");
actualFile = getOutputFile(spec.id, spec.domain);
tmpFile = getOutputFile(id, domain + "_tmp");
actualFile = getOutputFile(id, domain);
writer = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(Files.newOutputStream(tmpFile,
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)),
RecyclingBufferPool.INSTANCE));

View File

@ -1,18 +1,11 @@
package nu.marginalia.crawling.io;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
public class CrawlerOutputFile {
public static Path getOutputFile(Path base, CrawlingSpecification spec) {
return getOutputFile(base, spec.id, spec.domain);
}
/** Return the Path to a file for the given id and name */
public static Path getOutputFile(Path base, String id, String name) {
String first = id.substring(0, 2);

View File

@ -8,7 +8,6 @@ import java.util.List;
@AllArgsConstructor @Data @Builder
public class CrawledDomain implements SerializableCrawlData {
public String id;
public String domain;
public String redirectDomain;

View File

@ -1,42 +0,0 @@
package nu.marginalia.crawling.model.spec;
import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdInputStream;
import com.google.gson.Gson;
import com.google.gson.JsonStreamParser;
import lombok.SneakyThrows;
import nu.marginalia.model.gson.GsonFactory;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.nio.file.Path;
import java.util.Iterator;
public class CrawlerSpecificationLoader {
private final static Gson gson = GsonFactory.get();
@SneakyThrows
public static Iterable<CrawlingSpecification> asIterable(Path inputSpec) {
var inputStream = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(inputSpec.toFile()),
RecyclingBufferPool.INSTANCE)));
var parser = new JsonStreamParser(inputStream);
return () -> new Iterator<>() {
@Override
@SneakyThrows
public boolean hasNext() {
if (!parser.hasNext()) {
inputStream.close();
return false;
}
return true;
}
@Override
public CrawlingSpecification next() {
return gson.fromJson(parser.next(), CrawlingSpecification.class);
}
};
}
}

View File

@ -1,25 +0,0 @@
package nu.marginalia.crawling.model.spec;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;
import lombok.With;
import nu.marginalia.crawling.model.CrawledDomain;
import java.util.List;
@AllArgsConstructor @NoArgsConstructor @Builder @With
public class CrawlingSpecification {
public String id;
public int crawlDepth;
// Don't make this EdgeUrl, EdgeDomain etc. -- we want this plastic to change!
public String domain;
public List<String> urls;
@Override
public String toString() {
return String.format(getClass().getSimpleName() + "[" + id + "/" + domain + ": " + crawlDepth + "[ " + urls.size() + "]");
}
}

View File

@ -6,8 +6,6 @@ import lombok.ToString;
import nu.marginalia.crawling.io.CrawledDomainReader;
import nu.marginalia.crawling.io.SerializableCrawlDataStream;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.spec.CrawlerSpecificationLoader;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import nu.marginalia.process.log.WorkLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -16,7 +14,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.Optional;
@AllArgsConstructor @NoArgsConstructor @ToString
@ -74,10 +71,6 @@ public class CrawlPlan {
return new WorkLog(process.getLogFile());
}
public Iterable<CrawlingSpecification> crawlingSpecificationIterable() {
return CrawlerSpecificationLoader.asIterable(getJobSpec());
}
public int countCrawledDomains() {
int count = 0;
for (var ignored : WorkLog.iterable(crawl.getLogFile())) {

View File

@ -57,6 +57,7 @@ dependencies {
implementation project(':code:features-crawl:link-parser')
testImplementation project(':code:libraries:term-frequency-dict')
testImplementation project(':code:process-models:crawl-spec')
implementation libs.lombok
annotationProcessor libs.lombok

View File

@ -15,7 +15,6 @@ public class ProcessedDomain {
public DomainIndexingState state;
public EdgeDomain redirect;
public String ip;
public String id;
public int size() {
return Optional.ofNullable(documents).map(List::size).orElse(1);

View File

@ -48,7 +48,6 @@ public class DomainProcessor {
if (data instanceof CrawledDomain crawledDomain) {
ret.domain = new EdgeDomain(crawledDomain.domain);
ret.ip = crawledDomain.ip;
ret.id = crawledDomain.id;
cookies = Objects.requireNonNullElse(crawledDomain.cookies, Collections.emptyList()).size() > 0;
ip = crawledDomain.ip;

View File

@ -57,7 +57,6 @@ public class EncyclopediaMarginaliaNuSideloader implements SideloadSource, AutoC
var ret = new ProcessedDomain();
ret.domain = new EdgeDomain("encyclopedia.marginalia.nu");
ret.id = "encyclopedia.marginalia.nu";
ret.ip = "127.0.0.1";
ret.state = DomainIndexingState.ACTIVE;

View File

@ -48,7 +48,6 @@ public class StackexchangeSideloader implements SideloadSource {
var ret = new ProcessedDomain();
ret.domain = new EdgeDomain(domainName);
ret.id = domainName;
ret.ip = "127.0.0.1";
ret.state = DomainIndexingState.ACTIVE;

View File

@ -62,7 +62,7 @@ public class ConverterWriter implements AutoCloseable {
if (data == null)
continue;
String id = data.id;
String id = data.domain.toString();
if (workLog.isItemCommitted(id) || workLog.isItemInCurrentBatch(id)) {
logger.warn("Skipping already logged item {}", id);

View File

@ -41,7 +41,7 @@ public class ConvertingIntegrationTest {
public void testEmptyDomain() {
var docs = new ArrayList<CrawledDocument>();
var domain = new CrawledDomain("123", "memex.marginalia.nu", null, "OK", "-", "127.0.0.1",
var domain = new CrawledDomain("memex.marginalia.nu", null, "OK", "-", "127.0.0.1",
docs, Collections.emptyList());
var ret = domainProcessor.process(asSerializableCrawlData(domain));
@ -120,7 +120,6 @@ public class ConvertingIntegrationTest {
}
return new CrawledDomain(
"1",
"memex.marginalia.nu",
null,
"OK",

View File

@ -12,7 +12,7 @@ import nu.marginalia.crawling.io.SerializableCrawlDataStream;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.SerializableCrawlData;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
@ -47,8 +47,7 @@ public class CrawlingThenConvertingIntegrationTest {
@Test
public void crawlThenProcess() {
var specs = CrawlingSpecification.builder()
.id("some-string")
var specs = CrawlSpecRecord.builder()
.domain("www.marginalia.nu")
.crawlDepth(10)
.urls(List.of()) // add specific URLs to crawl here
@ -73,7 +72,7 @@ public class CrawlingThenConvertingIntegrationTest {
}
private CrawledDomain crawl(CrawlingSpecification specs) {
private CrawledDomain crawl(CrawlSpecRecord specs) {
List<SerializableCrawlData> data = new ArrayList<>();
new CrawlerRetreiver(httpFetcher, specs, data::add).fetch();

View File

@ -34,6 +34,7 @@ dependencies {
implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:easy-lsh')
implementation project(':code:process-models:crawling-model')
implementation project(':code:process-models:crawl-spec')
implementation project(':code:features-crawl:crawl-blocklist')
implementation project(':code:features-crawl:link-parser')

View File

@ -9,7 +9,10 @@ import nu.marginalia.WmsaHome;
import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
import nu.marginalia.crawling.io.CrawledDomainReader;
import nu.marginalia.crawlspec.CrawlSpecFileNames;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.io.crawlspec.CrawlSpecRecordParquetFileReader;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
@ -17,9 +20,7 @@ import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.DatabaseModule;
import plan.CrawlPlan;
import nu.marginalia.crawling.io.CrawledDomainWriter;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
import okhttp3.ConnectionPool;
@ -103,7 +104,7 @@ public class CrawlerMain {
var instructions = crawler.fetchInstructions();
try {
crawler.run(instructions.getPlan());
crawler.run(instructions.crawlSpec, instructions.outputDir);
instructions.ok();
}
catch (Exception ex) {
@ -117,43 +118,24 @@ public class CrawlerMain {
System.exit(0);
}
public void run(CrawlPlan plan) throws InterruptedException, IOException {
public void run(Path crawlSpec, Path outputDir) throws InterruptedException, IOException {
heartbeat.start();
try (WorkLog workLog = plan.createCrawlWorkLog()) {
try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log"))) {
// First a validation run to ensure the file is all good to parse
logger.info("Validating JSON");
crawlDataDir = plan.crawl.getDir();
int countTotal = 0;
for (var unused : plan.crawlingSpecificationIterable()) {
countTotal++;
}
totalTasks = countTotal;
totalTasks = CrawlSpecRecordParquetFileReader.count(crawlSpec);
logger.info("Let's go");
for (var crawlingSpecification : plan.crawlingSpecificationIterable()) {
if (!abortMonitor.isAlive())
break;
// Check #1: Have we already crawled this site? The check is necessary for resuming a crawl after a crash
if (workLog.isJobFinished(crawlingSpecification.id)) {
continue;
}
// Check #2: Have we already started this crawl (but not finished it)?
// This shouldn't realistically happen, but if it does, we need to ignore it, otherwise
// we'd end up crawling the same site twice and might end up writing to the same output
// file from multiple threads with complete bit salad as a result.
if (processingIds.put(crawlingSpecification.id, "") != null) {
logger.error("Ignoring duplicate id: {}", crawlingSpecification.id);
continue;
}
pool.submit(new CrawlTask(crawlingSpecification, workLog));
try (var specStream = CrawlSpecRecordParquetFileReader.stream(crawlSpec)) {
specStream
.takeWhile((e) -> abortMonitor.isAlive())
.filter(e -> !workLog.isJobFinished(e.domain))
.filter(e -> processingIds.put(e.domain, "") == null)
.map(e -> new CrawlTask(e, workLog))
.forEach(pool::submitQuietly);
}
logger.info("Shutting down the pool, waiting for tasks to complete...");
@ -170,12 +152,19 @@ public class CrawlerMain {
class CrawlTask implements DumbThreadPool.Task {
private final CrawlingSpecification specification;
private final CrawlSpecRecord specification;
private final String domain;
private final String id;
private final WorkLog workLog;
CrawlTask(CrawlingSpecification specification, WorkLog workLog) {
CrawlTask(CrawlSpecRecord specification, WorkLog workLog) {
this.specification = specification;
this.workLog = workLog;
this.domain = specification.domain;
this.id = Integer.toHexString(domain.hashCode());
}
@Override
@ -185,15 +174,15 @@ public class CrawlerMain {
HttpFetcher fetcher = new HttpFetcherImpl(userAgent.uaString(), dispatcher, connectionPool);
try (CrawledDomainWriter writer = new CrawledDomainWriter(crawlDataDir, specification);
CrawlDataReference reference = getReference(specification))
try (CrawledDomainWriter writer = new CrawledDomainWriter(crawlDataDir, domain, id);
CrawlDataReference reference = getReference())
{
Thread.currentThread().setName("crawling:" + specification.domain);
var retreiver = new CrawlerRetreiver(fetcher, specification, writer::accept);
int size = retreiver.fetch(reference);
workLog.setJobToFinished(specification.id, writer.getOutputFile().toString(), size);
workLog.setJobToFinished(specification.domain, writer.getOutputFile().toString(), size);
heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
logger.info("Fetched {}", specification.domain);
@ -203,14 +192,14 @@ public class CrawlerMain {
}
finally {
// We don't need to double-count these; it's also kept in the workLog
processingIds.remove(specification.id);
processingIds.remove(domain);
Thread.currentThread().setName("[idle]");
}
}
private CrawlDataReference getReference(CrawlingSpecification specification) {
private CrawlDataReference getReference() {
try {
var dataStream = reader.createDataStream(crawlDataDir, specification);
var dataStream = reader.createDataStream(crawlDataDir, domain, id);
return new CrawlDataReference(dataStream);
} catch (IOException e) {
logger.debug("Failed to read previous crawl data for {}", specification.domain);
@ -223,19 +212,18 @@ public class CrawlerMain {
private static class CrawlRequest {
private final CrawlPlan plan;
private final Path crawlSpec;
private final Path outputDir;
private final MqMessage message;
private final MqSingleShotInbox inbox;
CrawlRequest(CrawlPlan plan, MqMessage message, MqSingleShotInbox inbox) {
this.plan = plan;
CrawlRequest(Path crawlSpec, Path outputDir, MqMessage message, MqSingleShotInbox inbox) {
this.message = message;
this.inbox = inbox;
this.crawlSpec = crawlSpec;
this.outputDir = outputDir;
}
public CrawlPlan getPlan() {
return plan;
}
public void ok() {
inbox.sendResponse(message, MqInboxResponse.ok());
@ -259,11 +247,11 @@ public class CrawlerMain {
var specData = fileStorageService.getStorage(request.specStorage);
var crawlData = fileStorageService.getStorage(request.crawlStorage);
var plan = new CrawlPlan(specData.asPath().resolve("crawler.spec").toString(),
new CrawlPlan.WorkDir(crawlData.path(), "crawler.log"),
null);
return new CrawlRequest(plan, msg, inbox);
return new CrawlRequest(
CrawlSpecFileNames.resolve(specData),
crawlData.asPath(),
msg,
inbox);
}
private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {

View File

@ -38,7 +38,14 @@ public class DumbThreadPool {
public void submit(Task task) throws InterruptedException {
tasks.put(task);
}
public void submitQuietly(Task task) {
try {
tasks.put(task);
}
catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
public void shutDown() {
this.shutDown = true;
}

View File

@ -7,12 +7,12 @@ import lombok.SneakyThrows;
import nu.marginalia.crawl.retreival.fetcher.ContentTags;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
import nu.marginalia.crawl.retreival.fetcher.SitemapRetriever;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.crawling.model.*;
import nu.marginalia.ip_blocklist.UrlBlocklist;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.slf4j.Logger;
@ -31,7 +31,6 @@ public class CrawlerRetreiver {
private final HttpFetcher fetcher;
private final String id;
private final String domain;
private final Consumer<SerializableCrawlData> crawledDomainWriter;
@ -55,16 +54,15 @@ public class CrawlerRetreiver {
private static final String documentWasSameTag = "SAME-BY-COMPARISON";
public CrawlerRetreiver(HttpFetcher fetcher,
CrawlingSpecification specs,
CrawlSpecRecord specs,
Consumer<SerializableCrawlData> writer) {
this.fetcher = fetcher;
id = specs.id;
domain = specs.domain;
crawledDomainWriter = writer;
this.crawlFrontier = new DomainCrawlFrontier(new EdgeDomain(domain), specs.urls, specs.crawlDepth);
this.crawlFrontier = new DomainCrawlFrontier(new EdgeDomain(domain), Objects.requireNonNullElse(specs.urls, List.of()), specs.crawlDepth);
sitemapRetriever = fetcher.createSitemapRetriever();
// We must always crawl the index page first, this is assumed when fingerprinting the server
@ -102,7 +100,6 @@ public class CrawlerRetreiver {
CrawledDomain.builder()
.crawlerStatus(err.status().name())
.crawlerStatusDesc(err.desc())
.id(id)
.domain(domain)
.ip(ip)
.build()
@ -116,7 +113,6 @@ public class CrawlerRetreiver {
.crawlerStatus(CrawlerDomainStatus.REDIRECT.name())
.crawlerStatusDesc("Redirected to different domain")
.redirectDomain(redirect.domain().toString())
.id(id)
.domain(domain)
.ip(ip)
.build()
@ -147,7 +143,7 @@ public class CrawlerRetreiver {
downloadSitemaps(robotsRules);
CrawledDomain ret = new CrawledDomain(id, domain, null, CrawlerDomainStatus.OK.name(), null, ip, new ArrayList<>(), null);
CrawledDomain ret = new CrawledDomain(domain, null, CrawlerDomainStatus.OK.name(), null, ip, new ArrayList<>(), null);
int fetchedCount = recrawled;

View File

@ -2,15 +2,14 @@ package nu.marginalia.crawling.retreival;
import crawlercommons.robots.SimpleRobotRules;
import lombok.SneakyThrows;
import nu.marginalia.bigstring.BigString;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.fetcher.*;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawlerDocumentStatus;
import nu.marginalia.crawling.model.SerializableCrawlData;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import nu.marginalia.test.CommonTestData;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@ -69,7 +68,7 @@ public class CrawlerMockFetcherTest {
registerUrlClasspathData(new EdgeUrl("https://startrek.website/c/startrek"), "mock-crawl-data/lemmy/c_startrek.html");
registerUrlClasspathData(new EdgeUrl("https://startrek.website/post/108995"), "mock-crawl-data/lemmy/108995.html");
new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 10, "startrek.website", new ArrayList<>()), out::add)
new CrawlerRetreiver(fetcherMock, new CrawlSpecRecord("startrek.website", 10, new ArrayList<>()), out::add)
.fetch();
out.forEach(System.out::println);
@ -81,7 +80,7 @@ public class CrawlerMockFetcherTest {
registerUrlClasspathData(new EdgeUrl("https://en.wikipedia.org/"), "mock-crawl-data/mediawiki/index.html");
new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 10, "en.wikipedia.org", new ArrayList<>()), out::add)
new CrawlerRetreiver(fetcherMock, new CrawlSpecRecord("en.wikipedia.org", 10, new ArrayList<>()), out::add)
.fetch();
out.forEach(System.out::println);
@ -95,7 +94,7 @@ public class CrawlerMockFetcherTest {
registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/telegram-channel-to-idle-on/3501"), "mock-crawl-data/discourse/telegram.html");
registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/combined-mode-but-grid/4489"), "mock-crawl-data/discourse/grid.html");
new CrawlerRetreiver(fetcherMock, new CrawlingSpecification("1", 100, "community.tt-rss.org", new ArrayList<>()), out::add)
new CrawlerRetreiver(fetcherMock, new CrawlSpecRecord("community.tt-rss.org", 100, new ArrayList<>()), out::add)
.fetch();
out.forEach(System.out::println);

View File

@ -10,8 +10,8 @@ import nu.marginalia.crawling.io.CrawledDomainReader;
import nu.marginalia.crawling.io.CrawledDomainWriter;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import nu.marginalia.crawling.model.SerializableCrawlData;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import org.junit.jupiter.api.*;
import java.io.IOException;
@ -43,9 +43,8 @@ class CrawlerRetreiverTest {
@Test
public void testWithKnownDomains() {
var specs = CrawlingSpecification
var specs = CrawlSpecRecord
.builder()
.id("whatever")
.crawlDepth(5)
.domain("www.marginalia.nu")
.urls(List.of("https://www.marginalia.nu/misc/debian-laptop-install-log/"))
@ -73,9 +72,8 @@ class CrawlerRetreiverTest {
@Test
public void testEmptySet() {
var specs = CrawlingSpecification
var specs = CrawlSpecRecord
.builder()
.id("whatever")
.crawlDepth(5)
.domain("www.marginalia.nu")
.urls(List.of())
@ -107,9 +105,8 @@ class CrawlerRetreiverTest {
@Test
public void testRecrawl() throws IOException {
var specs = CrawlingSpecification
var specs = CrawlSpecRecord
.builder()
.id("123456")
.crawlDepth(12)
.domain("www.marginalia.nu")
.urls(List.of("https://www.marginalia.nu/some-dead-link"))
@ -117,7 +114,7 @@ class CrawlerRetreiverTest {
Path out = Files.createTempDirectory("crawling-process");
var writer = new CrawledDomainWriter(out, specs);
var writer = new CrawledDomainWriter(out, specs.domain, "idid");
Map<Class<? extends SerializableCrawlData>, List<SerializableCrawlData>> data = new HashMap<>();
new CrawlerRetreiver(httpFetcher, specs, d -> {
@ -133,7 +130,7 @@ class CrawlerRetreiverTest {
writer.close();
var reader = new CrawledDomainReader();
var stream = reader.createDataStream(out, specs);
var stream = reader.createDataStream(out, specs.domain, "idid");
CrawledDomain domain = (CrawledDomain) data.get(CrawledDomain.class).get(0);
domain.doc = data.get(CrawledDocument.class).stream().map(CrawledDocument.class::cast).collect(Collectors.toList());

View File

@ -35,7 +35,6 @@ Processes are batch jobs that deal with data retrieval, processing and loading.
#### Tools
* * [crawl-job-extractor](tools/crawl-job-extractor)
* * [term-frequency-extractor](tools/term-frequency-extractor)
### Features

View File

@ -37,6 +37,7 @@ dependencies {
implementation project(':code:api:process-mqapi')
implementation project(':code:features-search:screenshots')
implementation project(':code:features-index:index-journal')
implementation project(':code:process-models:crawl-spec')
implementation libs.lombok
annotationProcessor libs.lombok

View File

@ -2,11 +2,13 @@ package nu.marginalia.control.actor.task;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.control.svc.ControlFileStorageService;
import nu.marginalia.control.process.ProcessService;
import nu.marginalia.crawlspec.CrawlSpecFileNames;
import nu.marginalia.crawlspec.CrawlSpecGenerator;
import nu.marginalia.db.DbDomainStatsExportMultitool;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorage;
import nu.marginalia.db.storage.model.FileStorageBaseType;
import nu.marginalia.db.storage.model.FileStorageType;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
@ -19,9 +21,8 @@ import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import static nu.marginalia.crawlspec.CrawlSpecGenerator.*;
@Singleton
public class CrawlJobExtractorActor extends AbstractActorPrototype {
@ -32,21 +33,20 @@ public class CrawlJobExtractorActor extends AbstractActorPrototype {
public static final String CREATE_FROM_DB = "CREATE_FROM_DB";
public static final String CREATE_FROM_LINK = "CREATE_FROM_LINK";
public static final String END = "END";
private final ProcessService processService;
private final FileStorageService fileStorageService;
private final ControlFileStorageService controlFileStorageService;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final HikariDataSource dataSource;
@Inject
public CrawlJobExtractorActor(ActorStateFactory stateFactory,
ProcessService processService,
FileStorageService fileStorageService,
ControlFileStorageService controlFileStorageService
ControlFileStorageService controlFileStorageService,
HikariDataSource dataSource
) {
super(stateFactory);
this.processService = processService;
this.fileStorageService = fileStorageService;
this.controlFileStorageService = controlFileStorageService;
this.dataSource = dataSource;
}
public record CrawlJobExtractorArguments(String description) { }
@ -85,10 +85,14 @@ public class CrawlJobExtractorActor extends AbstractActorPrototype {
error("Error downloading " + arg.url());
}
final Path path = storage.asPath();
final Path path = CrawlSpecFileNames.resolve(storage);
run(storage, path.resolve("crawler.spec").toString(),
"-f", urlsTxt.toString());
generateCrawlSpec(
path,
DomainSource.fromFile(urlsTxt),
KnownUrlsCountSource.fixed(200),
KnownUrlsListSource.justIndex()
);
}
@ -106,30 +110,18 @@ public class CrawlJobExtractorActor extends AbstractActorPrototype {
var base = fileStorageService.getStorageBase(FileStorageBaseType.SLOW);
var storage = fileStorageService.allocateTemporaryStorage(base, FileStorageType.CRAWL_SPEC, "crawl-spec", arg.description());
final Path path = storage.asPath();
final Path path = CrawlSpecFileNames.resolve(storage);
run(storage,
path.resolve("crawler.spec").toString());
}
private void run(FileStorage storage, String... args) throws Exception {
AtomicBoolean hasError = new AtomicBoolean(false);
var future = executor.submit(() -> {
try {
processService.trigger(ProcessService.ProcessId.CRAWL_JOB_EXTRACTOR,
args);
}
catch (Exception ex) {
logger.warn("Error in creating crawl job", ex);
hasError.set(true);
}
});
future.get();
if (hasError.get()) {
controlFileStorageService.flagFileForDeletion(storage.id());
error("Error triggering adjacency calculation");
try (var dbTools = new DbDomainStatsExportMultitool(dataSource)) {
generateCrawlSpec(
path,
DomainSource.combined(
DomainSource.knownUrlsFromDb(dbTools),
DomainSource.fromCrawlQueue(dbTools)
),
KnownUrlsCountSource.fromDb(dbTools, 200),
KnownUrlsListSource.justIndex() // TODO: hook in linkdb maybe?
);
}
}

View File

@ -43,7 +43,6 @@ public record ProcessHeartbeat(
case "crawler" -> ProcessService.ProcessId.CRAWLER;
case "loader" -> ProcessService.ProcessId.LOADER;
case "website-adjacencies-calculator" -> ProcessService.ProcessId.ADJACENCIES_CALCULATOR;
case "crawl-job-extractor" -> ProcessService.ProcessId.CRAWL_JOB_EXTRACTOR;
case "index-constructor" -> ProcessService.ProcessId.INDEX_CONSTRUCTOR;
default -> null;
};

View File

@ -33,8 +33,7 @@ public class ProcessService {
CONVERTER("converter-process/bin/converter-process"),
LOADER("loader-process/bin/loader-process"),
INDEX_CONSTRUCTOR("index-construction-process/bin/index-construction-process"),
ADJACENCIES_CALCULATOR("website-adjacencies-calculator/bin/website-adjacencies-calculator"),
CRAWL_JOB_EXTRACTOR("crawl-job-extractor-process/bin/crawl-job-extractor-process")
ADJACENCIES_CALCULATOR("website-adjacencies-calculator/bin/website-adjacencies-calculator")
;
public final String path;

View File

@ -1,51 +0,0 @@
# Crawl Job Extractor
The crawl job extractor creates a file containing a list of domains
along with known URLs.
This is consumed by [processes/crawling-process](../../processes/crawling-process).
## Usage
The crawl job extractor has three modes of operation:
```
# 1 grab domains from the database
./crawl-job-extractor file.out
# 2 grab domains from a file
./crawl-job-extractor file.out -f domains.txt
# 3 grab domains from the command line
./crawl-job-extractor file.out domain1 domain2 ...
```
* When only a single argument is passed, the file name to write to, it will create a complete list of domains
and URLs known to the system from the list of already indexed domains,
as well as domains from the CRAWL_QUEUE table in the database.
* When the command line is passed like `./crawl-job-extractor output-file -f domains.txt`,
domains will be read from non-blank and non-comment lines in the file.
* In other cases, the 2nd argument onward to the command will be interpreted as domain-names.
In the last two modes, if the crawl-job-extractor is able to connect to the database, it will use
information from the link database to populate the list of URLs for each domain, otherwise it will
create a spec with only the domain name and the index address, so the crawler will have to figure out
the rest.
The crawl-specification is zstd-compressed json.
## Tricks
### Joining two specifications
Two or more specifications can be joined with a shell command on the form
```shell
$ zstdcat file1 file2 | zstd -o new-file
```
### Inspection
The file can also be inspected with `zstdless`,
or combinations like `zstdcat file | jq`

View File

@ -1,237 +0,0 @@
package nu.marginalia.crawl;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import nu.marginalia.db.DomainBlacklist;
import nu.marginalia.model.EdgeDomain;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;
public class CrawlJobDomainExtractor {
private static final int MIN_VISIT_COUNT = 200;
private static final int MAX_VISIT_COUNT = 100000;
private static final String specificDomainSql =
"""
SELECT ID
FROM EC_DOMAIN
WHERE DOMAIN_NAME=?
""";
private static final String domainsSql =
"""
SELECT ID, LOWER(EC_DOMAIN.DOMAIN_NAME)
FROM EC_DOMAIN
WHERE INDEXED>0
AND STATE='ACTIVE' OR STATE='EXHAUSTED'
ORDER BY
INDEX_DATE ASC,
DISCOVER_DATE ASC,
STATE DESC,
INDEXED DESC,
EC_DOMAIN.ID
""";
private static final String queuedDomainsSql =
"""
SELECT IFNULL(ID, -1), LOWER(CRAWL_QUEUE.DOMAIN_NAME)
FROM CRAWL_QUEUE
LEFT JOIN EC_DOMAIN
ON CRAWL_QUEUE.DOMAIN_NAME=EC_DOMAIN.DOMAIN_NAME
""";
private static final String urlsSql =
"""
SELECT URL
FROM EC_URL_VIEW
WHERE DOMAIN_ID=?
ORDER BY
VISITED DESC,
DATA_HASH IS NOT NULL DESC,
ID
LIMIT 25000
""";
private static final String visitedUrlsSql =
"""
SELECT COUNT(*)
FROM EC_URL
WHERE DOMAIN_ID=?
AND VISITED
;
""";
private final DomainBlacklist blacklist;
private final HikariDataSource dataSource;
private static final HashFunction hasher = Hashing.murmur3_128(0);
public CrawlJobDomainExtractor(DomainBlacklist blacklist, HikariDataSource dataSource) {
this.blacklist = blacklist;
this.dataSource = dataSource;
}
public Stream<CrawlingSpecification> extractDomainsFromQueue() {
Set<DomainWithId> ids = new HashSet<>(1_000_000);
try (var conn = dataSource.getConnection();
var stmtDomains = conn.prepareStatement(domainsSql);
var stmtQueue = conn.prepareStatement(queuedDomainsSql);
) {
ResultSet rsp;
stmtDomains.setFetchSize(10_000);
rsp = stmtDomains.executeQuery();
while (rsp.next()) {
ids.add(new DomainWithId(rsp.getString(2), rsp.getInt(1)));
}
stmtQueue.setFetchSize(10_000);
rsp = stmtQueue.executeQuery();
while (rsp.next()) {
ids.add(new DomainWithId(rsp.getString(2), rsp.getInt(1)));
}
}
catch (SQLException ex) {
ex.printStackTrace();
}
return ids.stream()
.filter(id -> !blacklist.isBlacklisted(id.id))
.map(this::createCrawlJobForDomain);
}
public CrawlingSpecification extractNewDomain(EdgeDomain domain) {
CrawlingSpecification spec = new CrawlingSpecification();
spec.domain = domain.toString();
spec.id = createId(domain);
spec.urls = new ArrayList<>(1000);
spec.urls.add("https://"+domain+"/");
spec.crawlDepth = MIN_VISIT_COUNT;
return spec;
}
public CrawlingSpecification extractKnownDomain(EdgeDomain domain) {
CrawlingSpecification spec = new CrawlingSpecification();
spec.domain = domain.toString();
spec.id = createId(domain);
spec.urls = new ArrayList<>(1000);
try (var conn = dataSource.getConnection();
var domainQuery = conn.prepareStatement(specificDomainSql);
var urlQuery = conn.prepareStatement(urlsSql))
{
domainQuery.setString(1, domain.toString());
ResultSet rsp = domainQuery.executeQuery();
int domainId = rsp.next() ? rsp.getInt(1) : -1;
spec.crawlDepth = getCrawlDepth(new DomainWithId(domain.toString(), domainId));
urlQuery.setString(1, domain.toString());
urlQuery.setInt(2, domainId);
urlQuery.setFetchSize(1000);
rsp = urlQuery.executeQuery();
while (rsp.next()) {
spec.urls.add(rsp.getString(1));
}
} catch (SQLException e) {
e.printStackTrace();
}
if (spec.urls.isEmpty()) {
spec.urls.add("https://"+domain+"/");
}
return spec;
}
private record DomainWithId(String domainName, int id) {
}
private CrawlingSpecification createCrawlJobForDomain(DomainWithId domainWithId) {
var spec = new CrawlingSpecification();
spec.id = createId(domainWithId);
spec.domain = domainWithId.domainName;
spec.urls = new ArrayList<>();
spec.crawlDepth = getCrawlDepth(domainWithId);
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement(urlsSql)) {
stmt.setFetchSize(1000);
stmt.setInt(1, domainWithId.id);
var rsp = stmt.executeQuery();
while (rsp.next()) {
spec.urls.add(rsp.getString(1));
}
}
catch (SQLException ex) {
ex.printStackTrace();
}
spec.urls.sort(Comparator.naturalOrder());
return spec;
}
private static String createId(DomainWithId domainWithId) {
return hasher.hashUnencodedChars(domainWithId.domainName).toString();
}
private static String createId(EdgeDomain domain) {
return hasher.hashUnencodedChars(domain.toString()).toString();
}
private int getCrawlDepth(DomainWithId domainWithId) {
try (var conn = dataSource.getConnection();
var domainQuery = conn.prepareStatement(visitedUrlsSql)) {
domainQuery.setInt(1, domainWithId.id);
var rsp = domainQuery.executeQuery();
if (rsp.next()) {
return calculateCrawlDepthFromVisitedCount(rsp.getInt(1));
}
} catch (SQLException e) {
e.printStackTrace();
}
return MIN_VISIT_COUNT;
}
private int calculateCrawlDepthFromVisitedCount(int count) {
if (count < MIN_VISIT_COUNT / 2) {
/* If we aren't getting very many good documents
out of this webpage on previous attempts, we
won't dig very deeply this time. This saves time
and resources for both the crawler and the server,
and also prevents deep crawls on foreign websites we aren't
interested in crawling at this point. */
count = MIN_VISIT_COUNT;
}
else {
/* If we got many results previously, we'll
dig deeper with each successive crawl. */
count = count + 1000 + count / 4;
}
if (count > MAX_VISIT_COUNT) {
count = MAX_VISIT_COUNT;
}
return count;
}
}

View File

@ -1,92 +0,0 @@
package nu.marginalia.crawl;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.db.DomainBlacklistImpl;
import nu.marginalia.service.ServiceHomeNotConfiguredException;
import nu.marginalia.service.module.DatabaseModule;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.stream.Stream;
public class CrawlJobExtractorMain {
public static void main(String... args) throws IOException {
if (args.length == 0) {
System.out.println("Parameters: outputfile.spec [-f domains.txt] | [domain1, domain2, ...]");
System.out.println();
System.out.println("If no domains are provided, a full crawl spec is created from the database");
return;
}
Path outFile = Path.of(args[0]);
if (Files.exists(outFile)) {
System.err.println("Out file " + outFile + " already exists, remove it first!");
return;
}
// TODO (2023-06-26) figure out whether this needs a ProcessHeartbeat
String[] targetDomains = getTargetDomains(Arrays.copyOfRange(args, 1, args.length));
try (CrawlJobSpecWriter out = new CrawlJobSpecWriter(outFile))
{
streamSpecs(targetDomains).forEach(out::accept);
}
System.out.println("All done! Wrote " + outFile);
}
private static String[] getTargetDomains(String[] strings) throws IOException {
if (strings.length == 0)
return strings;
if (strings.length == 2 && "-f".equals(strings[0])) {
Path file = Path.of(strings[1]);
System.out.println("Reading domains from " + file);
try (var lines = Files.lines(file)) {
return lines
.filter(s -> !s.isBlank())
.filter(s -> !s.startsWith("#"))
.map(String::trim)
.map(String::toLowerCase)
.toArray(String[]::new);
}
}
return strings;
}
private static Stream<CrawlingSpecification> streamSpecs(String[] targetDomains) {
if (targetDomains.length > 0) {
try {
var dataSource = new DatabaseModule().provideConnection();
var domainExtractor = new CrawlJobDomainExtractor(new DomainBlacklistImpl(dataSource), dataSource);
return Arrays.stream(targetDomains).map(EdgeDomain::new).map(domainExtractor::extractKnownDomain);
}
catch (ServiceHomeNotConfiguredException ex) {
System.err.println("""
Could not connect to database, running crawl job creation in bootstrap mode.
This means that the crawl job will be created without any knowledge of the domains in the database.
If this is not desirable, ensure that WMSA_HOME is configured and that the database is running.
""");
var domainExtractor = new CrawlJobDomainExtractor(domain -> false, null);
return Arrays.stream(targetDomains).map(EdgeDomain::new).map(domainExtractor::extractNewDomain);
}
} else {
var ds = new DatabaseModule().provideConnection();
var domainExtractor = new CrawlJobDomainExtractor(new DomainBlacklistImpl(ds), ds);
return domainExtractor.extractDomainsFromQueue();
}
}
}

View File

@ -1,27 +0,0 @@
package nu.marginalia.crawl;
import com.github.luben.zstd.ZstdOutputStream;
import com.google.gson.Gson;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import nu.marginalia.model.gson.GsonFactory;
import java.io.*;
import java.nio.file.Path;
public class CrawlJobSpecWriter implements AutoCloseable {
private final PrintWriter writer;
private final Gson gson = GsonFactory.get();
public CrawlJobSpecWriter(Path fileName) throws IOException {
writer = new PrintWriter(new ZstdOutputStream(new BufferedOutputStream(new FileOutputStream(fileName.toFile()))));
}
public void accept(CrawlingSpecification crawlingSpecification) {
gson.toJson(crawlingSpecification, writer);
}
public void close() {
writer.close();
}
}

View File

@ -1,46 +0,0 @@
package nu.marginalia.crawl;
import nu.marginalia.crawling.model.spec.CrawlerSpecificationLoader;
import nu.marginalia.crawling.model.spec.CrawlingSpecification;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class CrawlJobSpecWriterTest {
Path tempFile;
@BeforeEach
public void setUp() throws IOException {
tempFile = Files.createTempFile(getClass().getSimpleName(), "tmp");
}
@AfterEach
public void tearDown() throws IOException {
Files.delete(tempFile);
}
@Test
public void testReadWrite() throws IOException {
try (CrawlJobSpecWriter writer = new CrawlJobSpecWriter(tempFile)) {
writer.accept(new CrawlingSpecification("first",1, "test1", List.of("a", "b", "c")));
writer.accept(new CrawlingSpecification("second",1, "test2", List.of("a", "b", "c", "d")));
writer.accept(new CrawlingSpecification("third",1, "test3", List.of("a", "b")));
}
List<CrawlingSpecification> outputs = new ArrayList<>();
for (var item : CrawlerSpecificationLoader.asIterable(tempFile)) {
outputs.add(item);
}
assertEquals(outputs.size(), 3);
}
}

View File

@ -46,16 +46,18 @@ public class ExperimentRunnerMain {
experiment.args(Arrays.copyOfRange(args, 2, args.length));
Map<String, String> idToDomain = new HashMap<>();
for (var spec : plan.crawlingSpecificationIterable()) {
idToDomain.put(spec.id, spec.domain);
}
// FIXME: This is broken
for (var domain : plan.domainsIterable(id -> experiment.isInterested(idToDomain.get(id)))) {
experiment.process(domain);
}
experiment.onFinish();
// Map<String, String> idToDomain = new HashMap<>();
// for (var spec : plan.crawlingSpecificationIterable()) {
// idToDomain.put(spec.id, spec.domain);
// }
//
// for (var domain : plan.domainsIterable(id -> experiment.isInterested(idToDomain.get(id)))) {
// experiment.process(domain);
// }
//
// experiment.onFinish();
}
}

View File

@ -65,10 +65,10 @@ include 'code:processes:test-data'
include 'code:process-models:crawling-model'
include 'code:process-models:work-log'
include 'code:process-models:crawl-spec'
include 'code:process-models:processed-data'
include 'code:tools:term-frequency-extractor'
include 'code:tools:crawl-job-extractor'
include 'code:tools:experiment-runner'
include 'code:tools:website-adjacencies-calculator'
include 'code:tools:screenshot-capture-tool'