Merge branch 'master' into serp-redesign

This commit is contained in:
Viktor Lofgren 2025-01-05 18:52:51 +01:00
commit 8c69dc31b8
8 changed files with 51 additions and 10 deletions

1
.github/FUNDING.yml vendored
View File

@ -1,5 +1,6 @@
# These are supported funding model platforms # These are supported funding model platforms
polar: marginalia-search
github: MarginaliaSearch github: MarginaliaSearch
patreon: marginalia_nu patreon: marginalia_nu
open_collective: # Replace with a single Open Collective username open_collective: # Replace with a single Open Collective username

View File

@ -7,8 +7,6 @@ import nu.marginalia.service.discovery.property.PartitionTraits;
import nu.marginalia.service.discovery.property.ServiceEndpoint; import nu.marginalia.service.discovery.property.ServiceEndpoint;
import nu.marginalia.service.discovery.property.ServiceKey; import nu.marginalia.service.discovery.property.ServiceKey;
import nu.marginalia.service.discovery.property.ServicePartition; import nu.marginalia.service.discovery.property.ServicePartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -24,7 +22,7 @@ import java.util.function.Function;
public class GrpcMultiNodeChannelPool<STUB> { public class GrpcMultiNodeChannelPool<STUB> {
private final ConcurrentHashMap<Integer, GrpcSingleNodeChannelPool<STUB>> pools = private final ConcurrentHashMap<Integer, GrpcSingleNodeChannelPool<STUB>> pools =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
private static final Logger logger = LoggerFactory.getLogger(GrpcMultiNodeChannelPool.class);
private final ServiceRegistryIf serviceRegistryIf; private final ServiceRegistryIf serviceRegistryIf;
private final ServiceKey<? extends PartitionTraits.Multicast> serviceKey; private final ServiceKey<? extends PartitionTraits.Multicast> serviceKey;
private final Function<ServiceEndpoint.InstanceAddress, ManagedChannel> channelConstructor; private final Function<ServiceEndpoint.InstanceAddress, ManagedChannel> channelConstructor;

View File

@ -10,6 +10,8 @@ import nu.marginalia.service.discovery.property.ServiceKey;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import java.time.Duration; import java.time.Duration;
import java.util.*; import java.util.*;
@ -26,6 +28,7 @@ import java.util.function.Function;
public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor { public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
private final Map<InstanceAddress, ConnectionHolder> channels = new ConcurrentHashMap<>(); private final Map<InstanceAddress, ConnectionHolder> channels = new ConcurrentHashMap<>();
private final Marker grpcMarker = MarkerFactory.getMarker("GRPC");
private static final Logger logger = LoggerFactory.getLogger(GrpcSingleNodeChannelPool.class); private static final Logger logger = LoggerFactory.getLogger(GrpcSingleNodeChannelPool.class);
private final ServiceRegistryIf serviceRegistryIf; private final ServiceRegistryIf serviceRegistryIf;
@ -59,10 +62,10 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
for (var route : Sets.symmetricDifference(oldRoutes, newRoutes)) { for (var route : Sets.symmetricDifference(oldRoutes, newRoutes)) {
ConnectionHolder oldChannel; ConnectionHolder oldChannel;
if (newRoutes.contains(route)) { if (newRoutes.contains(route)) {
logger.info("Adding route {}", route); logger.info(grpcMarker, "Adding route {} => {}", serviceKey, route);
oldChannel = channels.put(route, new ConnectionHolder(route)); oldChannel = channels.put(route, new ConnectionHolder(route));
} else { } else {
logger.info("Expelling route {}", route); logger.info(grpcMarker, "Expelling route {} => {}", serviceKey, route);
oldChannel = channels.remove(route); oldChannel = channels.remove(route);
} }
if (oldChannel != null) { if (oldChannel != null) {
@ -100,7 +103,7 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
} }
try { try {
logger.info("Creating channel for {}:{}", serviceKey, address); logger.info(grpcMarker, "Creating channel for {} => {}", serviceKey, address);
value = channelConstructor.apply(address); value = channelConstructor.apply(address);
if (channel.compareAndSet(null, value)) { if (channel.compareAndSet(null, value)) {
return value; return value;
@ -111,7 +114,7 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
} }
} }
catch (Exception e) { catch (Exception e) {
logger.error("Failed to get channel for " + address, e); logger.error(grpcMarker, "Failed to get channel for " + address, e);
return null; return null;
} }
} }
@ -203,7 +206,7 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
} }
for (var e : exceptions) { for (var e : exceptions) {
logger.error("Failed to call service {}", serviceKey, e); logger.error(grpcMarker, "Failed to call service {}", serviceKey, e);
} }
throw new ServiceNotAvailableException(serviceKey); throw new ServiceNotAvailableException(serviceKey);

View File

@ -4,6 +4,11 @@ import nu.marginalia.service.discovery.property.ServiceKey;
public class ServiceNotAvailableException extends RuntimeException { public class ServiceNotAvailableException extends RuntimeException {
public ServiceNotAvailableException(ServiceKey<?> key) { public ServiceNotAvailableException(ServiceKey<?> key) {
super("Service " + key + " not available"); super(key.toString());
}
@Override
public StackTraceElement[] getStackTrace() { // Suppress stack trace
return new StackTraceElement[0];
} }
} }

View File

@ -48,5 +48,10 @@ public record ServiceEndpoint(String host, int port) {
public int port() { public int port() {
return endpoint.port(); return endpoint.port();
} }
@Override
public String toString() {
return endpoint().host() + ":" + endpoint.port() + " [" + instance + "]";
}
} }
} }

View File

@ -48,6 +48,19 @@ public sealed interface ServiceKey<P extends ServicePartition> {
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public String toString() {
final String shortName;
int periodIndex = name.lastIndexOf('.');
if (periodIndex >= 0) shortName = name.substring(periodIndex+1);
else shortName = name;
return "rest:" + shortName;
}
} }
record Grpc<P extends ServicePartition>(String name, P partition) implements ServiceKey<P> { record Grpc<P extends ServicePartition>(String name, P partition) implements ServiceKey<P> {
public String baseName() { public String baseName() {
@ -64,6 +77,18 @@ public sealed interface ServiceKey<P extends ServicePartition> {
{ {
return new Grpc<>(name, partition); return new Grpc<>(name, partition);
} }
@Override
public String toString() {
final String shortName;
int periodIndex = name.lastIndexOf('.');
if (periodIndex >= 0) shortName = name.substring(periodIndex+1);
else shortName = name;
return "grpc:" + shortName + "[" + partition.identifier() + "]";
}
} }
} }

View File

@ -167,7 +167,7 @@ public class SimilarDomainsService {
private void updateFeedInfo() { private void updateFeedInfo() {
Set<String> feedsDomainNames = new HashSet<>(500_000); Set<String> feedsDomainNames = new HashSet<>(500_000);
Path readerDbPath = WmsaHome.getDataPath().resolve("feeds.db").toAbsolutePath(); Path readerDbPath = WmsaHome.getDataPath().resolve("rss-feeds.db").toAbsolutePath();
String dbUrl = "jdbc:sqlite:" + readerDbPath; String dbUrl = "jdbc:sqlite:" + readerDbPath;
logger.info("Opening feed db at " + dbUrl); logger.info("Opening feed db at " + dbUrl);

View File

@ -5,6 +5,7 @@ import com.google.inject.Singleton;
import nu.marginalia.api.livecapture.LiveCaptureApiGrpc.LiveCaptureApiBlockingStub; import nu.marginalia.api.livecapture.LiveCaptureApiGrpc.LiveCaptureApiBlockingStub;
import nu.marginalia.service.client.GrpcChannelPoolFactory; import nu.marginalia.service.client.GrpcChannelPoolFactory;
import nu.marginalia.service.client.GrpcSingleNodeChannelPool; import nu.marginalia.service.client.GrpcSingleNodeChannelPool;
import nu.marginalia.service.client.ServiceNotAvailableException;
import nu.marginalia.service.discovery.property.ServiceKey; import nu.marginalia.service.discovery.property.ServiceKey;
import nu.marginalia.service.discovery.property.ServicePartition; import nu.marginalia.service.discovery.property.ServicePartition;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -29,6 +30,9 @@ public class LiveCaptureClient {
channelPool.call(LiveCaptureApiBlockingStub::requestScreengrab) channelPool.call(LiveCaptureApiBlockingStub::requestScreengrab)
.run(RpcDomainId.newBuilder().setDomainId(domainId).build()); .run(RpcDomainId.newBuilder().setDomainId(domainId).build());
} }
catch (ServiceNotAvailableException e) {
logger.info("requestScreengrab() failed since the service is not available");
}
catch (Exception e) { catch (Exception e) {
logger.error("API Exception", e); logger.error("API Exception", e);
} }