diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInbox.java b/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInbox.java index 20184f32..6f48f481 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInbox.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mq/inbox/MqInbox.java @@ -28,6 +28,7 @@ public class MqInbox { private volatile boolean run = true; private final int pollIntervalMs = Integer.getInteger("mq.inbox.poll-interval-ms", 100); + private final int maxPollCount = Integer.getInteger("mq.inbox.max-poll-count", 10); private final List eventSubscribers = new ArrayList<>(); private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(32); @@ -194,7 +195,7 @@ public class MqInbox { private Collection pollInbox(long tick) { try { - return persistence.pollInbox(inboxName, instanceUUID, tick); + return persistence.pollInbox(inboxName, instanceUUID, tick, maxPollCount); } catch (SQLException ex) { logger.error("Failed to poll inbox", ex); diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java b/code/common/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java index a3cc319b..5bdeabd3 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java @@ -23,6 +23,7 @@ public class MqOutbox { private final ConcurrentHashMap pendingResponses = new ConcurrentHashMap<>(); private final int pollIntervalMs = Integer.getInteger("mq.outbox.poll-interval-ms", 100); + private final int maxPollCount = Integer.getInteger("mq.outbox.max-poll-count", 10); private final Thread pollThread; private volatile boolean run = true; @@ -71,7 +72,7 @@ public class MqOutbox { return; try { - var updates = persistence.pollReplyInbox(replyInboxName, instanceUUID, tick); + var updates = persistence.pollReplyInbox(replyInboxName, instanceUUID, tick, maxPollCount); for (var message : updates) { pendingResponses.put(message.relatedId(), message); diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java b/code/common/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java index a62a0227..4e1f3843 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java @@ -167,18 +167,20 @@ public class MqPersistence { * then returns the number of messages marked. This is an atomic operation that * ensures that messages aren't double processed. */ - private int markInboxMessages(String inboxName, String instanceUUID, long tick) throws SQLException { + private int markInboxMessages(String inboxName, String instanceUUID, long tick, int n) throws SQLException { try (var conn = dataSource.getConnection(); var updateStmt = conn.prepareStatement(""" UPDATE MESSAGE_QUEUE SET OWNER_INSTANCE=?, OWNER_TICK=?, UPDATED_TIME=CURRENT_TIMESTAMP(6), STATE='ACK' WHERE RECIPIENT_INBOX=? AND OWNER_INSTANCE IS NULL AND STATE='NEW' + LIMIT ? """); ) { updateStmt.setString(1, instanceUUID); updateStmt.setLong(2, tick); updateStmt.setString(3, inboxName); + updateStmt.setInt(4, n); return updateStmt.executeUpdate(); } } @@ -186,10 +188,10 @@ public class MqPersistence { /** Marks unclaimed messages addressed to this inbox with instanceUUID and tick, * then returns these messages. */ - public Collection pollInbox(String inboxName, String instanceUUID, long tick) throws SQLException { + public Collection pollInbox(String inboxName, String instanceUUID, long tick, int n) throws SQLException { // Mark new messages as claimed - int expected = markInboxMessages(inboxName, instanceUUID, tick); + int expected = markInboxMessages(inboxName, instanceUUID, tick, n); if (expected == 0) { return Collections.emptyList(); } @@ -231,10 +233,10 @@ public class MqPersistence { /** Marks unclaimed messages addressed to this inbox with instanceUUID and tick, * then returns these messages. */ - public Collection pollReplyInbox(String inboxName, String instanceUUID, long tick) throws SQLException { + public Collection pollReplyInbox(String inboxName, String instanceUUID, long tick, int n) throws SQLException { // Mark new messages as claimed - int expected = markInboxMessages(inboxName, instanceUUID, tick); + int expected = markInboxMessages(inboxName, instanceUUID, tick, n); if (expected == 0) { return Collections.emptyList(); } diff --git a/code/tools/crawl-job-extractor/src/test/java/nu/marginalia/crawl/CrawlJobSpecWriterTest.java b/code/tools/crawl-job-extractor/src/test/java/nu/marginalia/crawl/CrawlJobSpecWriterTest.java index ad9700da..38cfc4fb 100644 --- a/code/tools/crawl-job-extractor/src/test/java/nu/marginalia/crawl/CrawlJobSpecWriterTest.java +++ b/code/tools/crawl-job-extractor/src/test/java/nu/marginalia/crawl/CrawlJobSpecWriterTest.java @@ -37,7 +37,9 @@ public class CrawlJobSpecWriterTest { } List outputs = new ArrayList<>(); - CrawlerSpecificationLoader.readInputSpec(tempFile, outputs::add); + for (var item : CrawlerSpecificationLoader.asIterable(tempFile)) { + outputs.add(item); + } assertEquals(outputs.size(), 3); }