(minor) Add limit to pol count in MqPersistence, fix test

This commit is contained in:
Viktor Lofgren 2023-07-12 18:16:23 +02:00
parent 89e4343fdb
commit 480abfe966
4 changed files with 14 additions and 8 deletions

View File

@ -28,6 +28,7 @@ public class MqInbox {
private volatile boolean run = true;
private final int pollIntervalMs = Integer.getInteger("mq.inbox.poll-interval-ms", 100);
private final int maxPollCount = Integer.getInteger("mq.inbox.max-poll-count", 10);
private final List<MqSubscription> eventSubscribers = new ArrayList<>();
private final LinkedBlockingQueue<MqMessage> queue = new LinkedBlockingQueue<>(32);
@ -194,7 +195,7 @@ public class MqInbox {
private Collection<MqMessage> pollInbox(long tick) {
try {
return persistence.pollInbox(inboxName, instanceUUID, tick);
return persistence.pollInbox(inboxName, instanceUUID, tick, maxPollCount);
}
catch (SQLException ex) {
logger.error("Failed to poll inbox", ex);

View File

@ -23,6 +23,7 @@ public class MqOutbox {
private final ConcurrentHashMap<Long, MqMessage> pendingResponses = new ConcurrentHashMap<>();
private final int pollIntervalMs = Integer.getInteger("mq.outbox.poll-interval-ms", 100);
private final int maxPollCount = Integer.getInteger("mq.outbox.max-poll-count", 10);
private final Thread pollThread;
private volatile boolean run = true;
@ -71,7 +72,7 @@ public class MqOutbox {
return;
try {
var updates = persistence.pollReplyInbox(replyInboxName, instanceUUID, tick);
var updates = persistence.pollReplyInbox(replyInboxName, instanceUUID, tick, maxPollCount);
for (var message : updates) {
pendingResponses.put(message.relatedId(), message);

View File

@ -167,18 +167,20 @@ public class MqPersistence {
* then returns the number of messages marked. This is an atomic operation that
* ensures that messages aren't double processed.
*/
private int markInboxMessages(String inboxName, String instanceUUID, long tick) throws SQLException {
private int markInboxMessages(String inboxName, String instanceUUID, long tick, int n) throws SQLException {
try (var conn = dataSource.getConnection();
var updateStmt = conn.prepareStatement("""
UPDATE MESSAGE_QUEUE
SET OWNER_INSTANCE=?, OWNER_TICK=?, UPDATED_TIME=CURRENT_TIMESTAMP(6), STATE='ACK'
WHERE RECIPIENT_INBOX=?
AND OWNER_INSTANCE IS NULL AND STATE='NEW'
LIMIT ?
""");
) {
updateStmt.setString(1, instanceUUID);
updateStmt.setLong(2, tick);
updateStmt.setString(3, inboxName);
updateStmt.setInt(4, n);
return updateStmt.executeUpdate();
}
}
@ -186,10 +188,10 @@ public class MqPersistence {
/** Marks unclaimed messages addressed to this inbox with instanceUUID and tick,
* then returns these messages.
*/
public Collection<MqMessage> pollInbox(String inboxName, String instanceUUID, long tick) throws SQLException {
public Collection<MqMessage> pollInbox(String inboxName, String instanceUUID, long tick, int n) throws SQLException {
// Mark new messages as claimed
int expected = markInboxMessages(inboxName, instanceUUID, tick);
int expected = markInboxMessages(inboxName, instanceUUID, tick, n);
if (expected == 0) {
return Collections.emptyList();
}
@ -231,10 +233,10 @@ public class MqPersistence {
/** Marks unclaimed messages addressed to this inbox with instanceUUID and tick,
* then returns these messages.
*/
public Collection<MqMessage> pollReplyInbox(String inboxName, String instanceUUID, long tick) throws SQLException {
public Collection<MqMessage> pollReplyInbox(String inboxName, String instanceUUID, long tick, int n) throws SQLException {
// Mark new messages as claimed
int expected = markInboxMessages(inboxName, instanceUUID, tick);
int expected = markInboxMessages(inboxName, instanceUUID, tick, n);
if (expected == 0) {
return Collections.emptyList();
}

View File

@ -37,7 +37,9 @@ public class CrawlJobSpecWriterTest {
}
List<CrawlingSpecification> outputs = new ArrayList<>();
CrawlerSpecificationLoader.readInputSpec(tempFile, outputs::add);
for (var item : CrawlerSpecificationLoader.asIterable(tempFile)) {
outputs.add(item);
}
assertEquals(outputs.size(), 3);
}