Mirror of https://github.com/MarginaliaSearch/MarginaliaSearch.git, synced 2025-02-22 20:48:59 +00:00

Merge branch 'master' into master

This change is contained in commit 0bebdb6e33
@@ -10,7 +10,9 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.LocalDateTime;
-import java.util.*;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
 import java.util.function.Function;
 
 /** WorkLog is a journal of work done by a process,
@@ -61,6 +63,12 @@ public class WorkLog implements AutoCloseable, Closeable {
         return new WorkLoadIterable<>(logFile, mapper);
     }
 
+    public static int countEntries(Path crawlerLog) throws IOException{
+        try (var linesStream = Files.lines(crawlerLog)) {
+            return (int) linesStream.filter(WorkLogEntry::isJobId).count();
+        }
+    }
+
     // Use synchro over concurrent set to avoid competing writes
     // - correct is better than fast here, it's sketchy enough to use
     // a PrintWriter
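For context, the new countEntries helper streams the log once and counts only the lines that WorkLogEntry recognizes as job entries; the migration actor below uses this total to size its progress reporting. A minimal standalone sketch of the same counting idea; the log format and the isJobId predicate here are stand-ins, not the real WorkLogEntry:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class CountEntriesSketch {
    // Stand-in for WorkLogEntry::isJobId: assume comment/header lines start
    // with '#' and every other non-blank line is a finished job entry.
    static boolean isJobId(String line) {
        return !line.isBlank() && !line.startsWith("#");
    }

    public static void main(String[] args) throws IOException {
        Path log = Files.createTempFile("crawler", ".log");
        Files.writeString(log, """
                # Starting WorkLog
                example.com 2025-01-01T00:00:00 crawl/ab/cd/example.com.parquet 13
                example.org 2025-01-01T00:01:00 crawl/12/34/example.org.parquet 7
                """);

        // Same shape as WorkLog.countEntries: stream, filter, count.
        try (var lines = Files.lines(log)) {
            long total = lines.filter(CountEntriesSketch::isJobId).count();
            System.out.println("entries: " + total); // prints 2
        }
    }
}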
@@ -8,6 +8,7 @@ import nu.marginalia.actor.state.ActorStep;
 import nu.marginalia.io.CrawlerOutputFile;
 import nu.marginalia.process.log.WorkLog;
 import nu.marginalia.process.log.WorkLogEntry;
+import nu.marginalia.service.control.ServiceHeartbeat;
 import nu.marginalia.slop.SlopCrawlDataRecord;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.storage.model.FileStorage;
@@ -26,14 +27,15 @@ import java.util.function.Function;
 public class MigrateCrawlDataActor extends RecordActorPrototype {
 
     private final FileStorageService fileStorageService;
-
+    private final ServiceHeartbeat serviceHeartbeat;
     private static final Logger logger = LoggerFactory.getLogger(MigrateCrawlDataActor.class);
 
     @Inject
-    public MigrateCrawlDataActor(Gson gson, FileStorageService fileStorageService) {
+    public MigrateCrawlDataActor(Gson gson, FileStorageService fileStorageService, ServiceHeartbeat serviceHeartbeat) {
         super(gson);
 
         this.fileStorageService = fileStorageService;
+        this.serviceHeartbeat = serviceHeartbeat;
     }
 
     public record Run(long fileStorageId) implements ActorStep {}
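The new ServiceHeartbeat collaborator arrives the same way FileStorageService does, as an extra @Inject constructor parameter, so nothing else changes as long as the dependency injector can supply it. A minimal sketch of that wiring with Guice, using hypothetical stub classes in place of the real Marginalia types:

import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;

class ConstructorInjectionSketch {
    // Hypothetical stand-in for ServiceHeartbeat; its non-private no-arg
    // constructor lets Guice create it as a just-in-time binding.
    static class StubHeartbeat {}

    // Hypothetical stand-in for the actor: the new collaborator is simply
    // one more @Inject constructor parameter.
    static class StubActor {
        final StubHeartbeat heartbeat;

        @Inject
        StubActor(StubHeartbeat heartbeat) {
            this.heartbeat = heartbeat;
        }
    }

    public static void main(String[] args) {
        Injector injector = Guice.createInjector();
        StubActor actor = injector.getInstance(StubActor.class);
        System.out.println(actor.heartbeat != null); // true
    }
}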
@@ -49,28 +51,39 @@ public class MigrateCrawlDataActor extends RecordActorPrototype {
         Path crawlerLog = root.resolve("crawler.log");
         Path newCrawlerLog = Files.createTempFile(root, "crawler", ".migrate.log");
 
-        try (WorkLog workLog = new WorkLog(newCrawlerLog)) {
+        int totalEntries = WorkLog.countEntries(crawlerLog);
+
+        try (WorkLog workLog = new WorkLog(newCrawlerLog);
+             var heartbeat = serviceHeartbeat.createServiceAdHocTaskHeartbeat("Migrating")
+        ) {
+            int entryIdx = 0;
+
             for (Map.Entry<WorkLogEntry, Path> item : WorkLog.iterableMap(crawlerLog, new CrawlDataLocator(root))) {
 
                 var entry = item.getKey();
                 var path = item.getValue();
 
-                logger.info("Converting {}", entry.id());
+                heartbeat.progress("Migrating" + path.toFile().getName(), entryIdx++, totalEntries);
 
-                if (path.toFile().getName().endsWith(".parquet")) {
-                    String domain = entry.id();
-                    String id = Integer.toHexString(domain.hashCode());
-
-                    Path outputFile = CrawlerOutputFile.createSlopPath(root, id, domain);
-
-                    SlopCrawlDataRecord.convertFromParquet(path, outputFile);
-
-                    workLog.setJobToFinished(entry.id(), outputFile.toString(), entry.cnt());
-                }
+                if (path.toFile().getName().endsWith(".parquet") && Files.exists(path)) {
+                    try {
+                        String domain = entry.id();
+                        String id = Integer.toHexString(domain.hashCode());
+
+                        Path outputFile = CrawlerOutputFile.createSlopPath(root, id, domain);
+
+                        SlopCrawlDataRecord.convertFromParquet(path, outputFile);
+
+                        workLog.setJobToFinished(entry.id(), outputFile.toString(), entry.cnt());
+                    }
+                    catch (Exception ex) {
+                        logger.error("Failed to convert " + path, ex);
+                    }
+                }
                 else {
                     workLog.setJobToFinished(entry.id(), path.toString(), entry.cnt());
                 }
 
             }
         }
 
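The migration loop now reports progress through an ad-hoc task heartbeat rather than logging each entry: the total is computed up front via WorkLog.countEntries, and progress(label, current, total) is called once per work log entry. A minimal sketch of that reporting pattern; the Heartbeat interface here is a hypothetical stand-in for whatever createServiceAdHocTaskHeartbeat actually returns:

import java.util.List;

class MigrationProgressSketch {
    // Hypothetical stand-in for the ad-hoc task heartbeat. The real one comes
    // from ServiceHeartbeat.createServiceAdHocTaskHeartbeat(...) and is also
    // used as a try-with-resources resource in the patch.
    interface Heartbeat extends AutoCloseable {
        void progress(String step, int current, int total);
        @Override default void close() {}
    }

    public static void main(String[] args) {
        List<String> entries = List.of("example.com.parquet", "example.org.parquet");
        int totalEntries = entries.size(); // analogous to WorkLog.countEntries(crawlerLog)

        try (Heartbeat heartbeat = (step, current, total) ->
                System.out.printf("%s (%d/%d)%n", step, current, total)) {
            int entryIdx = 0;
            for (String entry : entries) {
                heartbeat.progress("Migrating " + entry, entryIdx++, totalEntries);
                // ... conversion of the entry would happen here ...
            }
        }
    }
}

The per-entry try/catch added in the patch also means a single broken parquet file is logged and skipped rather than aborting the whole migration, while non-parquet entries are carried over to the new work log unchanged.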
@@ -45,6 +45,7 @@ public class HttpFetcherImpl implements HttpFetcher {
     private static final ContentTypeLogic contentTypeLogic = new ContentTypeLogic();
 
     private final Duration requestTimeout = Duration.ofSeconds(10);
+    private final Duration probeTimeout = Duration.ofSeconds(30);
 
     @Override
     public void setAllowAllContentTypes(boolean allowAllContentTypes) {
@@ -107,23 +108,27 @@ public class HttpFetcherImpl implements HttpFetcher {
                 .HEAD()
                 .uri(url.asURI())
                 .header("User-agent", userAgentString)
-                .timeout(requestTimeout)
+                .timeout(probeTimeout)
                 .build();
         } catch (URISyntaxException e) {
             return new DomainProbeResult.Error(CrawlerDomainStatus.ERROR, "Invalid URL");
         }
 
-        try {
-            var rsp = client.send(head, HttpResponse.BodyHandlers.discarding());
-            EdgeUrl rspUri = new EdgeUrl(rsp.uri());
-
-            if (!Objects.equals(rspUri.domain, url.domain)) {
-                return new DomainProbeResult.Redirect(rspUri.domain);
-            }
-            return new DomainProbeResult.Ok(rspUri);
-        }
-        catch (Exception ex) {
-            return new DomainProbeResult.Error(CrawlerDomainStatus.ERROR, ex.getMessage());
-        }
+        for (int tries = 0;; tries++) {
+            try {
+                var rsp = client.send(head, HttpResponse.BodyHandlers.discarding());
+                EdgeUrl rspUri = new EdgeUrl(rsp.uri());
+
+                if (!Objects.equals(rspUri.domain, url.domain)) {
+                    return new DomainProbeResult.Redirect(rspUri.domain);
+                }
+                return new DomainProbeResult.Ok(rspUri);
+            } catch (Exception ex) {
+                if (tries > 3) {
+                    return new DomainProbeResult.Error(CrawlerDomainStatus.ERROR, ex.getMessage());
+                }
+                // else try again ...
+            }
+        }
     }
 
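The domain probe is now wrapped in a bounded retry loop under the longer probeTimeout: a failed HEAD request is retried, and only once tries exceeds 3 (i.e. on the fifth failed attempt) is the error surfaced as a DomainProbeResult.Error. A standalone sketch of the same retry shape, with a hypothetical flaky operation standing in for the real HTTP probe:

import java.util.concurrent.ThreadLocalRandom;

class BoundedRetrySketch {
    // Hypothetical stand-in for the HEAD probe: fails randomly to simulate
    // transient network errors.
    static String probeOnce() throws Exception {
        if (ThreadLocalRandom.current().nextInt(3) != 0) {
            throw new Exception("transient failure");
        }
        return "ok";
    }

    static String probeWithRetries() {
        for (int tries = 0;; tries++) {
            try {
                return probeOnce();
            }
            catch (Exception ex) {
                if (tries > 3) {
                    // Same cutoff as the patch: give up on the fifth failure.
                    return "error: " + ex.getMessage();
                }
                // else try again ...
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(probeWithRetries());
    }
}

Note that the patched loop has no backoff between attempts; retries happen immediately.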
@@ -143,7 +148,7 @@ public class HttpFetcherImpl implements HttpFetcher {
         var headBuilder = HttpRequest.newBuilder()
                 .HEAD()
                 .uri(url.asURI())
-                .header("User-agent", userAgentString)
+                .header("User-Agent", userAgentString)
                 .header("Accept-Encoding", "gzip")
                 .timeout(requestTimeout)
                 ;
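This and the following hunks normalize the request header name from "User-agent" to the conventional "User-Agent" spelling across the HEAD, GET and sitemap requests. HTTP header names are case-insensitive on the wire, so the change is largely cosmetic for compliant servers, but the canonical capitalization is what most tooling expects. A small java.net.http sketch of the corrected header; the URL and agent string are placeholders:

import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

class UserAgentHeaderSketch {
    public static void main(String[] args) {
        String userAgentString = "example-crawler/1.0 (placeholder)";

        HttpRequest head = HttpRequest.newBuilder()
                .HEAD() // HEAD() is available on HttpRequest.Builder since Java 18
                .uri(URI.create("https://example.com/"))
                .header("User-Agent", userAgentString)
                .header("Accept-Encoding", "gzip")
                .timeout(Duration.ofSeconds(10))
                .build();

        // HttpHeaders lookups are case-insensitive, so either spelling resolves.
        System.out.println(head.headers().firstValue("user-agent").orElse("missing"));
    }
}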
@@ -215,7 +220,7 @@ public class HttpFetcherImpl implements HttpFetcher {
         var getBuilder = HttpRequest.newBuilder()
                 .GET()
                 .uri(url.asURI())
-                .header("User-agent", userAgentString)
+                .header("User-Agent", userAgentString)
                 .header("Accept-Encoding", "gzip")
                 .header("Accept-Language", "en,*;q=0.5")
                 .header("Accept", "text/html, application/xhtml+xml, text/*;q=0.8")
@@ -307,7 +312,7 @@ public class HttpFetcherImpl implements HttpFetcher {
                 .uri(sitemapUrl.asURI())
                 .header("Accept-Encoding", "gzip")
                 .header("Accept", "text/*, */*;q=0.9")
-                .header("User-agent", userAgentString)
+                .header("User-Agent", userAgentString)
                 .timeout(requestTimeout)
                 .build();
 
@@ -386,7 +391,7 @@ public class HttpFetcherImpl implements HttpFetcher {
                 .uri(url.asURI())
                 .header("Accept-Encoding", "gzip")
                 .header("Accept", "text/*, */*;q=0.9")
-                .header("User-agent", userAgentString)
+                .header("User-Agent", userAgentString)
                 .timeout(requestTimeout);
 
         HttpFetchResult result = recorder.fetch(client, getRequest.build());