(crawl) Add new functionality for re-crawling a single domain

parent 69f88255e9
commit d86926be5f
@@ -44,6 +44,15 @@ public class ExecutorCrawlClient {
                 .build());
     }
 
+    public void triggerRecrawlSingleDomain(int node, FileStorageId fid, String domainName) {
+        channelPool.call(ExecutorCrawlApiBlockingStub::triggerSingleDomainRecrawl)
+                .forNode(node)
+                .run(RpcFileStorageIdWithDomainName.newBuilder()
+                        .setFileStorageId(fid.id())
+                        .setTargetDomainName(domainName)
+                        .build());
+    }
+
     public void triggerConvert(int node, FileStorageId fid) {
         channelPool.call(ExecutorCrawlApiBlockingStub::triggerConvert)
                 .forNode(node)
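
The client method fires an asynchronous gRPC call and returns immediately; the actual recrawl runs on the target executor node. A minimal caller sketch, where the node number, storage id, and domain are illustrative and the client instance is assumed to be Guice-injected:

    void recrawlExample(ExecutorCrawlClient crawlClient) {
        // Re-fetch only www.example.com into the crawl data held in storage 17 on node 1.
        crawlClient.triggerRecrawlSingleDomain(1, FileStorageId.of(17), "www.example.com");
    }
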
@@ -22,6 +22,7 @@ service ExecutorApi {
 service ExecutorCrawlApi {
   rpc triggerCrawl(RpcFileStorageId) returns (Empty) {}
   rpc triggerRecrawl(RpcFileStorageId) returns (Empty) {}
+  rpc triggerSingleDomainRecrawl(RpcFileStorageIdWithDomainName) returns (Empty) {}
   rpc triggerConvert(RpcFileStorageId) returns (Empty) {}
   rpc triggerConvertAndLoad(RpcFileStorageId) returns (Empty) {}
   rpc loadProcessedData(RpcFileStorageIds) returns (Empty) {}
@@ -55,6 +56,10 @@ message RpcProcessId {
 message RpcFileStorageId {
   int64 fileStorageId = 1;
 }
+message RpcFileStorageIdWithDomainName {
+  int64 fileStorageId = 1;
+  string targetDomainName = 2;
+}
 message RpcFileStorageIds {
   repeated int64 fileStorageIds = 1;
 }
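
The new message just pairs the existing storage id with a domain string. A sketch of building it against the generated blocking stub; the stub handle and the field values are assumptions:

    // protoc-generated builder for the new request message
    var request = RpcFileStorageIdWithDomainName.newBuilder()
            .setFileStorageId(17)                    // existing CRAWL_DATA storage
            .setTargetDomainName("www.example.com")  // domain to re-fetch
            .build();
    stub.triggerSingleDomainRecrawl(request);        // blocking call, returns Empty
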
@@ -3,6 +3,7 @@ package nu.marginalia.actor;
 public enum ExecutorActor {
     CRAWL,
     RECRAWL,
+    RECRAWL_SINGLE_DOMAIN,
     CONVERT_AND_LOAD,
     PROC_CONVERTER_SPAWNER,
     PROC_LOADER_SPAWNER,
@@ -26,6 +26,7 @@ public class ExecutorActorControlService {
     private final ExecutorActorStateMachines stateMachines;
     public Map<ExecutorActor, ActorPrototype> actorDefinitions = new HashMap<>();
     private final int node;
+
     @Inject
     public ExecutorActorControlService(MessageQueueFactory messageQueueFactory,
                                        BaseServiceParams baseServiceParams,
@@ -33,6 +34,7 @@ public class ExecutorActorControlService {
                                        ConvertAndLoadActor convertAndLoadActor,
                                        CrawlActor crawlActor,
                                        RecrawlActor recrawlActor,
+                                       RecrawlSingleDomainActor recrawlSingleDomainActor,
                                        RestoreBackupActor restoreBackupActor,
                                        ConverterMonitorActor converterMonitorFSM,
                                        CrawlerMonitorActor crawlerMonitorActor,
@@ -57,6 +59,8 @@ public class ExecutorActorControlService {
 
         register(ExecutorActor.CRAWL, crawlActor);
         register(ExecutorActor.RECRAWL, recrawlActor);
+        register(ExecutorActor.RECRAWL_SINGLE_DOMAIN, recrawlSingleDomainActor);
+
         register(ExecutorActor.CONVERT, convertActor);
         register(ExecutorActor.RESTORE_BACKUP, restoreBackupActor);
         register(ExecutorActor.CONVERT_AND_LOAD, convertAndLoadActor);
@@ -50,7 +50,9 @@ public class CrawlActor extends RecordActorPrototype {
                 storageService.relateFileStorages(storage.id(), dataArea.id());
 
                 // Send convert request
-                long msgId = mqCrawlerOutbox.sendAsync(new CrawlRequest(List.of(fid), dataArea.id()));
+                long msgId = mqCrawlerOutbox.sendAsync(
+                        CrawlRequest.forSpec(fid, dataArea.id())
+                );
 
                 yield new Crawl(msgId);
             }
@@ -59,7 +59,7 @@ public class RecrawlActor extends RecordActorPrototype {
 
                 refreshService.synchronizeDomainList();
 
-                long id = mqCrawlerOutbox.sendAsync(new CrawlRequest(null, fid));
+                long id = mqCrawlerOutbox.sendAsync(CrawlRequest.forRecrawl(fid));
 
                 yield new Crawl(id, fid, cascadeLoad);
             }
@@ -0,0 +1,85 @@
+package nu.marginalia.actor.task;
+
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import nu.marginalia.actor.prototype.RecordActorPrototype;
+import nu.marginalia.actor.state.ActorResumeBehavior;
+import nu.marginalia.actor.state.ActorStep;
+import nu.marginalia.actor.state.Resume;
+import nu.marginalia.mq.MqMessageState;
+import nu.marginalia.mq.outbox.MqOutbox;
+import nu.marginalia.mqapi.crawling.CrawlRequest;
+import nu.marginalia.process.ProcessOutboxes;
+import nu.marginalia.process.ProcessService;
+import nu.marginalia.storage.FileStorageService;
+import nu.marginalia.storage.model.FileStorageId;
+import nu.marginalia.storage.model.FileStorageType;
+
+@Singleton
+public class RecrawlSingleDomainActor extends RecordActorPrototype {
+
+    private final MqOutbox mqCrawlerOutbox;
+    private final FileStorageService storageService;
+    private final ActorProcessWatcher processWatcher;
+
+    /** Initial step
+     * @param storageId - the id of the storage to recrawl
+     * @param targetDomainName - domain to be recrawled
+     */
+    public record Initial(FileStorageId storageId, String targetDomainName) implements ActorStep {}
+
+    /** The action step */
+    @Resume(behavior = ActorResumeBehavior.RETRY)
+    public record Crawl(long messageId) implements ActorStep {}
+
+    @Override
+    public ActorStep transition(ActorStep self) throws Exception {
+        return switch (self) {
+            case Initial (FileStorageId fid, String targetDomainName) -> {
+                var crawlStorage = storageService.getStorage(fid);
+
+                if (crawlStorage == null) yield new Error("Bad storage id");
+                if (crawlStorage.type() != FileStorageType.CRAWL_DATA) yield new Error("Bad storage type " + crawlStorage.type());
+
+                long id = mqCrawlerOutbox.sendAsync(
+                        CrawlRequest.forSingleDomain(targetDomainName, fid)
+                );
+
+                yield new Crawl(id);
+            }
+            case Crawl (long msgId) -> {
+                var rsp = processWatcher.waitResponse(
+                        mqCrawlerOutbox,
+                        ProcessService.ProcessId.CRAWLER,
+                        msgId);
+
+                if (rsp.state() != MqMessageState.OK) {
+                    yield new Error("Crawler failed");
+                }
+
+                yield new End();
+            }
+            default -> new End();
+        };
+    }
+
+    @Override
+    public String describe() {
+        return "Run the crawler only re-fetching a single domain";
+    }
+
+    @Inject
+    public RecrawlSingleDomainActor(ActorProcessWatcher processWatcher,
+                                    ProcessOutboxes processOutboxes,
+                                    FileStorageService storageService,
+                                    Gson gson)
+    {
+        super(gson);
+
+        this.processWatcher = processWatcher;
+        this.mqCrawlerOutbox = processOutboxes.getCrawlerOutbox();
+        this.storageService = storageService;
+    }
+
+}
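
The actor is a two-step state machine: Initial validates the storage and enqueues the crawl request, Crawl waits for the crawler process to answer. A simplified driver loop to illustrate the step sequence; in reality the actor runtime behind ExecutorActorControlService does the scheduling, and the ids are illustrative:

    // Assumed simplification: feed each returned step back into transition()
    // until a terminal step (End or Error) comes out.
    // "actor" is an injected RecrawlSingleDomainActor.
    ActorStep step = new RecrawlSingleDomainActor.Initial(FileStorageId.of(17), "www.example.com");
    while (!(step instanceof End) && !(step instanceof Error)) {
        step = actor.transition(step);  // Initial -> Crawl -> End, or -> Error on failure
    }
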
@@ -47,6 +47,22 @@ public class ExecutorCrawlGrpcService extends ExecutorCrawlApiGrpc.ExecutorCrawl
         }
     }
 
+    @Override
+    public void triggerSingleDomainRecrawl(RpcFileStorageIdWithDomainName request, StreamObserver<Empty> responseObserver) {
+        try {
+            actorControlService.startFrom(ExecutorActor.RECRAWL_SINGLE_DOMAIN,
+                    new RecrawlSingleDomainActor.Initial(
+                            FileStorageId.of(request.getFileStorageId()),
+                            request.getTargetDomainName()));
+
+            responseObserver.onNext(Empty.getDefaultInstance());
+            responseObserver.onCompleted();
+        }
+        catch (Exception e) {
+            responseObserver.onError(e);
+        }
+    }
+
     @Override
     public void triggerConvert(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
         try {
@@ -14,8 +14,24 @@ public class CrawlRequest {
      */
     public List<FileStorageId> specStorage;
 
+    /** (optional) Name of a single domain to be re-crawled */
+    public String targetDomainName;
+
     /** File storage where the crawl data will be written. If it contains existing crawl data,
      * this crawl data will be referenced for e-tags and last-modified checks.
      */
     public FileStorageId crawlStorage;
+
+    public static CrawlRequest forSpec(FileStorageId specStorage, FileStorageId crawlStorage) {
+        return new CrawlRequest(List.of(specStorage), null, crawlStorage);
+    }
+
+    public static CrawlRequest forSingleDomain(String targetDomainName, FileStorageId crawlStorage) {
+        return new CrawlRequest(null, targetDomainName, crawlStorage);
+    }
+
+    public static CrawlRequest forRecrawl(FileStorageId crawlStorage) {
+        return new CrawlRequest(null, null, crawlStorage);
+    }
+
 }
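
The three factories make the mutually exclusive request shapes explicit, and the crawler picks its mode from which fields are set. A side-by-side sketch, assuming specId and crawlId are FileStorageIds for spec storage and crawl-data storage:

    CrawlRequest.forSpec(specId, crawlId);                    // specStorage=[specId], targetDomainName=null
    CrawlRequest.forRecrawl(crawlId);                         // specStorage=null, targetDomainName=null
    CrawlRequest.forSingleDomain("www.example.com", crawlId); // specStorage=null, targetDomainName="www.example.com"
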
@@ -23,6 +23,7 @@ import nu.marginalia.crawling.io.CrawledDomainReader;
 import nu.marginalia.crawling.io.CrawlerOutputFile;
 import nu.marginalia.crawling.parquet.CrawledDocumentParquetRecordFileWriter;
 import nu.marginalia.crawlspec.CrawlSpecFileNames;
+import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.service.ProcessMainClass;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.model.crawlspec.CrawlSpecRecord;
@@ -136,7 +137,12 @@ public class CrawlerMain extends ProcessMainClass {
 
         var instructions = crawler.fetchInstructions();
         try {
-            crawler.run(instructions.specProvider, instructions.outputDir);
+            if (instructions.targetDomainName != null) {
+                crawler.runForSingleDomain(instructions.targetDomainName, instructions.outputDir);
+            }
+            else {
+                crawler.run(instructions.specProvider, instructions.outputDir);
+            }
             instructions.ok();
         } catch (Exception ex) {
             logger.error("Crawler failed", ex);
@@ -200,6 +206,26 @@ public class CrawlerMain extends ProcessMainClass {
         }
     }
 
+    public void runForSingleDomain(String targetDomainName, Path outputDir) throws Exception {
+
+        heartbeat.start();
+
+        try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler-" + targetDomainName.replace('/', '-') + ".log"));
+             WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir);
+             AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(List.of(new EdgeDomain(targetDomainName)))
+        ) {
+            var spec = new CrawlSpecRecord(targetDomainName, 1000, null);
+            var task = new CrawlTask(spec, anchorTagsSource, outputDir, warcArchiver, workLog);
+            task.run();
+        }
+        catch (Exception ex) {
+            logger.warn("Exception in crawler", ex);
+        }
+        finally {
+            heartbeat.shutDown();
+        }
+    }
+
     class CrawlTask implements SimpleBlockingThreadPool.Task {
 
         private final CrawlSpecRecord specification;
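
Instead of reading a spec from storage, the single-domain path synthesizes one record on the fly. A hedged reading of that constructor call, assuming CrawlSpecRecord's components are the domain name, a crawl depth, and an optional list of known URLs:

    var spec = new CrawlSpecRecord(
            targetDomainName, // the one domain to fetch
            1000,             // crawl depth budget for the domain
            null              // no pre-seeded URL list; links are discovered while crawling
    );
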
@@ -216,7 +242,8 @@ public class CrawlerMain extends ProcessMainClass {
                   AnchorTagsSource anchorTagsSource,
                   Path outputDir,
                   WarcArchiverIf warcArchiver,
-                  WorkLog workLog) {
+                  WorkLog workLog)
+        {
             this.specification = specification;
             this.anchorTagsSource = anchorTagsSource;
             this.outputDir = outputDir;
@@ -303,11 +330,19 @@ public class CrawlerMain extends ProcessMainClass {
         private final MqMessage message;
         private final MqSingleShotInbox inbox;
 
-        CrawlRequest(CrawlSpecProvider specProvider, Path outputDir, MqMessage message, MqSingleShotInbox inbox) {
+        private final String targetDomainName;
+
+        CrawlRequest(CrawlSpecProvider specProvider,
+                     String targetDomainName,
+                     Path outputDir,
+                     MqMessage message,
+                     MqSingleShotInbox inbox)
+        {
             this.message = message;
             this.inbox = inbox;
             this.specProvider = specProvider;
             this.outputDir = outputDir;
+            this.targetDomainName = targetDomainName;
         }
 
@@ -325,6 +360,7 @@ public class CrawlerMain extends ProcessMainClass {
         var inbox = messageQueueFactory.createSingleShotInbox(CRAWLER_INBOX, node, UUID.randomUUID());
 
         logger.info("Waiting for instructions");
+
         var msgOpt = getMessage(inbox, nu.marginalia.mqapi.crawling.CrawlRequest.class.getSimpleName());
         var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));
 
@@ -350,6 +386,7 @@ public class CrawlerMain extends ProcessMainClass {
 
         return new CrawlRequest(
                 specProvider,
+                request.targetDomainName,
                 crawlData.asPath(),
                 msg,
                 inbox);
@@ -24,6 +24,7 @@ import java.nio.file.Path;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 
 @Singleton
@@ -88,6 +89,9 @@ public class ControlNodeActionsService {
         Spark.post("/nodes/:id/actions/recrawl", this::triggerAutoRecrawl,
                 redirectControl.renderRedirectAcknowledgement("Recrawling", "..")
         );
+        Spark.post("/nodes/:id/actions/recrawl-single-domain", this::triggerSingleDomainRecrawl,
+                redirectControl.renderRedirectAcknowledgement("Recrawling", "..")
+        );
         Spark.post("/nodes/:id/actions/process", this::triggerProcess,
                 redirectControl.renderRedirectAcknowledgement("Processing", "..")
         );
@@ -216,6 +220,21 @@ public class ControlNodeActionsService {
         return "";
     }
 
+    private Object triggerSingleDomainRecrawl(Request request, Response response) throws SQLException {
+        int nodeId = Integer.parseInt(request.params("id"));
+
+        var toCrawl = parseSourceFileStorageId(request.queryParams("source"));
+        var targetDomainName = Objects.requireNonNull(request.queryParams("targetDomainName"));
+
+        crawlClient.triggerRecrawlSingleDomain(
+                nodeId,
+                toCrawl,
+                targetDomainName
+        );
+
+        return "";
+    }
+
     private Object triggerNewCrawl(Request request, Response response) throws SQLException {
         int nodeId = Integer.parseInt(request.params("id"));
 
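
End to end, the control-panel form boils down to one POST against this route. A self-contained sketch using java.net.http; the host, port, node id, and storage id are assumptions. Spark's queryParams resolves both query-string and form-encoded parameters, so either transport works:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    class TriggerSingleDomainRecrawl {
        public static void main(String[] args) throws Exception {
            var uri = URI.create("http://localhost:8081/nodes/1/actions/recrawl-single-domain"
                    + "?source=17&targetDomainName=www.example.com");
            var rsp = HttpClient.newHttpClient().send(
                    HttpRequest.newBuilder(uri).POST(HttpRequest.BodyPublishers.noBody()).build(),
                    HttpResponse.BodyHandlers.ofString());
            System.out.println(rsp.statusCode()); // acknowledgement page on success
        }
    }
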
@@ -24,12 +24,20 @@
 <h2>Summary</h2>
 <table class="table">
     <tr>
-        <th>Domain</th><th>File</th>
+        <th>Domain</th><th>File</th><th>Crawl</th>
     </tr>
     <td>{{domain}}</td>
     <td>
         <a class="btn btn-primary" href="/nodes/{{node.id}}/storage/{{storage.id}}/transfer?path={{{path}}}">Download Parquet</a>
     </td>
+    <td>
+        <form method="post" action="/nodes/{{node.id}}/actions/recrawl-single-domain" onsubmit="return confirm('Confirm recrawl of {{domain}}')">
+            <input type="hidden" name="source" value="{{storage.id}}">
+            <input type="hidden" name="targetDomainName" value="{{domain}}">
+            <button type="submit" class="btn btn-primary">Trigger Recrawl</button>
+        </form>
+    </td>
+
 </table>
 
 <h2>Contents</h2>