diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java
index 76e37acc..15f54d23 100644
--- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java
+++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java
@@ -1,5 +1,6 @@
 package nu.marginalia.crawling.io;
 
+import com.github.luben.zstd.RecyclingBufferPool;
 import com.github.luben.zstd.ZstdInputStream;
 import com.google.gson.Gson;
 import nu.marginalia.crawling.model.CrawledDocument;
@@ -37,7 +38,7 @@ public class CrawledDomainReader {
     public CrawledDomain read(Path path) throws IOException {
         DomainDataAssembler domainData = new DomainDataAssembler();
 
-        try (var br = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(path.toFile()))))) {
+        try (var br = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(path.toFile()), RecyclingBufferPool.INSTANCE)))) {
             String line;
             while ((line = br.readLine()) != null) {
                 if (line.startsWith("//")) {
@@ -105,7 +106,7 @@ public class CrawledDomainReader {
         public FileReadingSerializableCrawlDataStream(Gson gson, File file) throws IOException {
             this.gson = gson;
 
-            bufferedReader = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(file))));
+            bufferedReader = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(file), RecyclingBufferPool.INSTANCE)));
         }
 
         @Override
@@ -124,9 +125,15 @@ public class CrawledDomainReader {
                 return true;
 
             String identifier = bufferedReader.readLine();
-            if (identifier == null) return false;
+            if (identifier == null) {
+                bufferedReader.close();
+                return false;
+            }
             String data = bufferedReader.readLine();
-            if (data == null) return false;
+            if (data == null) {
+                bufferedReader.close();
+                return false;
+            }
 
             if (identifier.equals(CrawledDomain.SERIAL_IDENTIFIER)) {
                 next = gson.fromJson(data, CrawledDomain.class);
diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java
index f431538c..bc83c10b 100644
--- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java
+++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java
@@ -1,5 +1,6 @@
 package nu.marginalia.crawling.io;
 
+import com.github.luben.zstd.RecyclingBufferPool;
 import com.github.luben.zstd.ZstdOutputStream;
 import com.google.gson.Gson;
 import lombok.SneakyThrows;
@@ -38,7 +39,8 @@ public class CrawledDomainWriter implements AutoCloseable {
         tmpFile = getOutputFile(spec.id, spec.domain + "_tmp");
         actualFile = getOutputFile(spec.id, spec.domain);
         writer = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(Files.newOutputStream(tmpFile,
-                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)))));
+                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)),
+                RecyclingBufferPool.INSTANCE));
     }
 
     public Path getOutputFile() {
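Aside: the change threaded through both files above (and the readers further down) is the same one-liner. zstd-jni's stream constructors default to a buffer pool that allocates a fresh internal buffer per stream; passing RecyclingBufferPool.INSTANCE lets streams reuse recycled buffers instead. A minimal sketch of the pattern, where the helper names (ZstdStreams, reader, writer) are hypothetical and not part of this patch:

    import com.github.luben.zstd.RecyclingBufferPool;
    import com.github.luben.zstd.ZstdInputStream;
    import com.github.luben.zstd.ZstdOutputStream;

    import java.io.*;
    import java.nio.file.Files;
    import java.nio.file.Path;

    class ZstdStreams {
        // Read side: file input -> zstd decompression (pooled buffers) -> text reader
        static BufferedReader reader(Path path) throws IOException {
            return new BufferedReader(new InputStreamReader(
                    new ZstdInputStream(Files.newInputStream(path), RecyclingBufferPool.INSTANCE)));
        }

        // Write side: buffered file output -> zstd compression (pooled buffers) -> text writer
        static Writer writer(Path path) throws IOException {
            return new OutputStreamWriter(new ZstdOutputStream(
                    new BufferedOutputStream(Files.newOutputStream(path)), RecyclingBufferPool.INSTANCE));
        }
    }
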
diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/SerializableCrawlDataStream.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/SerializableCrawlDataStream.java
index e68526b1..7d4fb28c 100644
--- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/SerializableCrawlDataStream.java
+++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/SerializableCrawlDataStream.java
@@ -8,7 +8,7 @@ import java.util.Iterator;
 /** Closable iterator over serialized crawl data
  * The data may appear in any order, and the iterator must be closed.
  * */
-public interface SerializableCrawlDataStream {
+public interface SerializableCrawlDataStream extends AutoCloseable {
     static SerializableCrawlDataStream empty() {
         return new SerializableCrawlDataStream() {
             @Override
@@ -20,6 +20,8 @@ public interface SerializableCrawlDataStream {
             public boolean hasNext() throws IOException {
                 return false;
             }
+
+            public void close() {}
         };
     }
 
@@ -35,6 +37,8 @@ public interface SerializableCrawlDataStream {
             public boolean hasNext() throws IOException {
                 return iterator.hasNext();
             }
+
+            public void close() {}
         };
     }
 
diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/spec/CrawlerSpecificationLoader.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/spec/CrawlerSpecificationLoader.java
index 2ea956d5..d5d4e482 100644
--- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/spec/CrawlerSpecificationLoader.java
+++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/model/spec/CrawlerSpecificationLoader.java
@@ -1,5 +1,6 @@
 package nu.marginalia.crawling.model.spec;
 
+import com.github.luben.zstd.RecyclingBufferPool;
 import com.github.luben.zstd.ZstdInputStream;
 import com.google.gson.Gson;
 import com.google.gson.JsonStreamParser;
@@ -17,7 +18,8 @@ public class CrawlerSpecificationLoader {
 
     @SneakyThrows
     public static Iterable<CrawlingSpecification> asIterable(Path inputSpec) {
-        var inputStream = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(inputSpec.toFile()))));
+        var inputStream = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(inputSpec.toFile()),
+                RecyclingBufferPool.INSTANCE)));
         var parser = new JsonStreamParser(inputStream);
 
         return () -> new Iterator<>() {
diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConversionLog.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConversionLog.java
index 58aa8b04..2c2ffb95 100644
--- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConversionLog.java
+++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConversionLog.java
@@ -1,5 +1,6 @@
 package nu.marginalia.converting;
 
+import com.github.luben.zstd.RecyclingBufferPool;
 import com.github.luben.zstd.ZstdOutputStream;
 import nu.marginalia.model.crawl.DomainIndexingState;
 import nu.marginalia.model.idx.DocumentMetadata;
@@ -27,8 +28,7 @@ public class ConversionLog implements AutoCloseable, Interpreter {
         String fileName = String.format("conversion-log-%s.zstd", LocalDateTime.now().toEpochSecond(ZoneOffset.UTC));
         Path logFile = rootDir.resolve(fileName);
 
-        writer = new PrintWriter(new ZstdOutputStream(
-                new BufferedOutputStream(Files.newOutputStream(logFile, StandardOpenOption.WRITE, StandardOpenOption.CREATE))));
+        writer = new PrintWriter(new ZstdOutputStream(new BufferedOutputStream(Files.newOutputStream(logFile, StandardOpenOption.WRITE, StandardOpenOption.CREATE)), RecyclingBufferPool.INSTANCE));
     }
 
     @Override
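Aside: making SerializableCrawlDataStream extend AutoCloseable is what lets the crawler (next file) hold the old crawl data in a try-with-resources block. A hedged sketch of the consuming side; openStream() is a hypothetical stand-in for however a concrete stream such as FileReadingSerializableCrawlDataStream is obtained:

    static void consume(Path path) throws Exception {
        try (SerializableCrawlDataStream stream = openStream(path)) {
            while (stream.hasNext()) {
                SerializableCrawlData data = stream.next();
                // ... dispatch on CrawledDomain / CrawledDocument etc. ...
            }
        } // close() now runs even if hasNext() throws mid-iteration,
          // releasing the underlying zstd stream and file descriptor
    }
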
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
index 438bee66..835f3b0e 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
@@ -55,7 +55,7 @@ public class CrawlerMain implements AutoCloseable {
     private final Gson gson;
     private final DumbThreadPool pool;
 
-    private final Set<String> processedIds = new HashSet<>();
+    private final Set<String> processingIds = new HashSet<>();
 
     final AbortMonitor abortMonitor = AbortMonitor.getInstance();
 
@@ -92,6 +92,9 @@ public class CrawlerMain implements AutoCloseable {
         System.setProperty("sun.net.client.defaultConnectTimeout", "30000");
         System.setProperty("sun.net.client.defaultReadTimeout", "30000");
 
+        // We don't want to use too much memory caching sessions for https
+        System.setProperty("javax.net.ssl.sessionCacheSize", "2048");
+
         Injector injector = Guice.createInjector(
                 new CrawlerModule(),
                 new DatabaseModule()
@@ -154,7 +157,7 @@ public class CrawlerMain implements AutoCloseable {
 
     private void startCrawlTask(CrawlPlan plan, CrawlingSpecification crawlingSpecification) {
 
-        if (!processedIds.add(crawlingSpecification.id)) {
+        if (workLog.isJobFinished(crawlingSpecification.id) || !processingIds.add(crawlingSpecification.id)) {
 
             // This is a duplicate id, so we ignore it. Otherwise we'd end up crawling the same site twice,
             // and if we're really unlucky, we might end up writing to the same output file from multiple
@@ -193,11 +196,10 @@ public class CrawlerMain implements AutoCloseable {
         HttpFetcher fetcher = new HttpFetcherImpl(userAgent.uaString(), dispatcher, connectionPool);
 
-        try (CrawledDomainWriter writer = new CrawledDomainWriter(crawlDataDir, specification)) {
+        try (CrawledDomainWriter writer = new CrawledDomainWriter(crawlDataDir, specification);
+             CrawlDataReference reference = getReference(specification))
+        {
             var retreiver = new CrawlerRetreiver(fetcher, specification, writer::accept);
-
-            CrawlDataReference reference = getReference(specification);
-
             int size = retreiver.fetch(reference);
 
             workLog.setJobToFinished(specification.id, writer.getOutputFile().toString(), size);
 
@@ -206,6 +208,10 @@ public class CrawlerMain implements AutoCloseable {
         catch (Exception e) {
             logger.error("Error fetching domain " + specification.domain, e);
         }
+        finally {
+            // We don't need to double-count these; it's also kept in the workLog
+            processingIds.remove(specification.id);
+        }
     }
 
     private CrawlDataReference getReference(CrawlingSpecification specification) {
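Aside: the session-cache changes come in two flavors, and it may help to see them side by side. The javax.net.ssl.sessionCacheSize property caps the TLS session cache for contexts the JDK initializes, while setSessionCacheSize(int) caps a context built by hand, as NoSecuritySSL does below. A sketch under the assumptions of this patch (the 2048 figure, and that the class and method names here are illustrative only):

    import javax.net.ssl.SSLContext;
    import java.security.SecureRandom;

    class SessionCacheTuning {
        static SSLContext smallCacheContext() throws Exception {
            // 1) JVM-wide default, consulted when session caches are created:
            System.setProperty("javax.net.ssl.sessionCacheSize", "2048");

            // 2) Per-context cap for a hand-built context; a crawler that
            // contacts very many distinct hosts rarely resumes TLS sessions,
            // so a large cache mostly just pins memory.
            SSLContext ctx = SSLContext.getInstance("TLS");
            ctx.init(null, null, new SecureRandom());
            ctx.getClientSessionContext().setSessionCacheSize(2048);
            return ctx;
        }
    }
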
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlDataReference.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlDataReference.java
index 13d17dfc..985bfc39 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlDataReference.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlDataReference.java
@@ -10,7 +10,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 
 /** A reference to a domain that has been crawled before.
  */
-public class CrawlDataReference {
+public class CrawlDataReference implements AutoCloseable {
 
     private final SerializableCrawlDataStream data;
@@ -75,4 +75,8 @@ public class CrawlDataReference {
         return hashFunction.hashInt(v).asInt();
     }
 
+    @Override
+    public void close() throws Exception {
+        data.close();
+    }
 }
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/NoSecuritySSL.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/NoSecuritySSL.java
index a52251bc..06f106fc 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/NoSecuritySSL.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/NoSecuritySSL.java
@@ -33,7 +33,17 @@ public class NoSecuritySSL {
         // Install the all-trusting trust manager
         final SSLContext sslContext = SSLContext.getInstance("SSL");
         sslContext.init(null, trustAllCerts, new java.security.SecureRandom());
-        // Create an ssl socket factory with our all-trusting manager
+
+        var clientSessionContext = sslContext.getClientSessionContext();
+
+        System.out.println("Default session cache size: " + clientSessionContext.getSessionCacheSize());
+        System.out.println("Session timeout: " + clientSessionContext.getSessionTimeout());
+
+        // The default value for this is very high and will use a crapload of memory
+        // since the crawler will be making a lot of requests to various hosts
+        clientSessionContext.setSessionCacheSize(2048);
+
+        // Create an ssl socket factory with our all-trusting manager
         return sslContext.getSocketFactory();
     }
diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/ConvertedDomainReader.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/ConvertedDomainReader.java
index 1c06510e..91875169 100644
--- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/ConvertedDomainReader.java
+++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/ConvertedDomainReader.java
@@ -1,5 +1,6 @@
 package nu.marginalia.loading;
 
+import com.github.luben.zstd.RecyclingBufferPool;
 import com.github.luben.zstd.ZstdInputStream;
 import com.google.gson.Gson;
 import lombok.SneakyThrows;
@@ -26,7 +27,7 @@ public class ConvertedDomainReader {
     public List<Instruction> read(Path path, int cntHint) throws IOException {
         List<Instruction> ret = new ArrayList<>(cntHint);
 
-        try (var or = new ObjectInputStream(new ZstdInputStream(new FileInputStream(path.toFile())))) {
+        try (var or = new ObjectInputStream(new ZstdInputStream(new FileInputStream(path.toFile()), RecyclingBufferPool.INSTANCE))) {
             var object = or.readObject();
             if (object instanceof Instruction is) {
                 ret.add(is);
@@ -39,7 +40,7 @@ public class ConvertedDomainReader {
     }
 
     public Iterator<Instruction> createIterator(Path path) throws IOException {
-        var or = new ObjectInputStream(new ZstdInputStream(new BufferedInputStream(new FileInputStream(path.toFile()))));
+        var or = new ObjectInputStream(new ZstdInputStream(new BufferedInputStream(new FileInputStream(path.toFile())), RecyclingBufferPool.INSTANCE));
 
         return new Iterator<>() {
             Instruction next;