(qs) Don't blow up if an index node isn't responsive

This commit is contained in:
Viktor Lofgren 2023-10-23 11:53:18 +02:00
parent 2ed2f35a9b
commit efb73ff4e7
2 changed files with 12 additions and 2 deletions

View File

@ -15,6 +15,8 @@ import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.service.descriptor.ServiceDescriptors; import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId; import nu.marginalia.service.id.ServiceId;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import javax.annotation.CheckReturnValue; import javax.annotation.CheckReturnValue;
@ -54,8 +56,12 @@ public class IndexClient extends AbstractDynamicClient {
@CheckReturnValue @CheckReturnValue
public SearchResultSet query(Context ctx, List<Integer> nodes, SearchSpecification specs) { public SearchResultSet query(Context ctx, List<Integer> nodes, SearchSpecification specs) {
return Observable.fromIterable(nodes) return Observable.fromIterable(nodes)
.subscribeOn(Schedulers.io()) .concatMap(node -> this
.concatMap(node -> this.postGet(ctx, node,"/search/", specs, SearchResultSet.class)) .postGet(ctx, node,"/search/", specs, SearchResultSet.class)
.onErrorReturn(t -> new SearchResultSet()),
nodes.size(),
Schedulers.io()
)
.reduce(SearchResultSet::combine) .reduce(SearchResultSet::combine)
.blockingGet(); .blockingGet();
} }

View File

@ -10,6 +10,10 @@ import java.util.List;
@AllArgsConstructor @Getter @ToString @AllArgsConstructor @Getter @ToString
public class SearchResultSet { public class SearchResultSet {
public SearchResultSet() {
results = new ArrayList<>();
}
public List<DecoratedSearchResultItem> results; public List<DecoratedSearchResultItem> results;
public int size() { public int size() {
return results.size(); return results.size();