(minor) Javadoc comments for MqPersistance and MqMessageState

This commit is contained in:
Viktor Lofgren 2023-07-10 21:52:25 +02:00
parent 98b5f22104
commit ec7826659a
2 changed files with 29 additions and 1 deletions

View File

@ -1,9 +1,14 @@
package nu.marginalia.mq; package nu.marginalia.mq;
public enum MqMessageState { public enum MqMessageState {
/** The message is new and has not yet been acknowledged by the recipient */
NEW, NEW,
/** The message has been acknowledged by the recipient */
ACK, ACK,
/** The message has been processed successfully by the recipient */
OK, OK,
/** The message processing has failed */
ERR, ERR,
/** The message did not reach a terminal state within the TTL */
DEAD DEAD
} }

View File

@ -35,6 +35,7 @@ public class MqPersistence {
} }
} }
/** Removes messages that have been set to a terminal state a while after their last update timestamp */
public int cleanOldMessages() throws SQLException { public int cleanOldMessages() throws SQLException {
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var setToDead = conn.prepareStatement(""" var setToDead = conn.prepareStatement("""
@ -47,6 +48,16 @@ public class MqPersistence {
} }
} }
/**
* Adds a new message to the message queue.
*
* @param recipientInboxName The recipient's inbox name
* @param senderInboxName (nullable) The sender's inbox name. Only needed if a reply is expected. If null, the message is not expected to be replied to.
* @param function The function to call
* @param payload The payload to send, typically JSON.
* @param ttl (nullable) The time to live of the message, in seconds. If null, the message will never set to DEAD.
* @return The id of the message
*/
public long sendNewMessage(String recipientInboxName, public long sendNewMessage(String recipientInboxName,
@Nullable @Nullable
String senderInboxName, String senderInboxName,
@ -82,7 +93,7 @@ public class MqPersistence {
} }
} }
/** Modifies the state of a message by id */
public void updateMessageState(long id, MqMessageState mqMessageState) throws SQLException { public void updateMessageState(long id, MqMessageState mqMessageState) throws SQLException {
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement(""" var stmt = conn.prepareStatement("""
@ -99,6 +110,9 @@ public class MqPersistence {
} }
} }
/** Creates a new message in the queue referencing as a reply to an existing message
* This message will have it's RELATED_ID set to the original message's ID.
*/
public long sendResponse(long id, MqMessageState mqMessageState, String message) throws SQLException { public long sendResponse(long id, MqMessageState mqMessageState, String message) throws SQLException {
try (var conn = dataSource.getConnection()) { try (var conn = dataSource.getConnection()) {
conn.setAutoCommit(false); conn.setAutoCommit(false);
@ -149,6 +163,10 @@ public class MqPersistence {
} }
/** Marks unclaimed messages addressed to this inbox with instanceUUID and tick,
* then returns the number of messages marked. This is an atomic operation that
* ensures that messages aren't double processed.
*/
private int markInboxMessages(String inboxName, String instanceUUID, long tick) throws SQLException { private int markInboxMessages(String inboxName, String instanceUUID, long tick) throws SQLException {
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var updateStmt = conn.prepareStatement(""" var updateStmt = conn.prepareStatement("""
@ -170,11 +188,13 @@ public class MqPersistence {
*/ */
public Collection<MqMessage> pollInbox(String inboxName, String instanceUUID, long tick) throws SQLException { public Collection<MqMessage> pollInbox(String inboxName, String instanceUUID, long tick) throws SQLException {
// Mark new messages as claimed
int expected = markInboxMessages(inboxName, instanceUUID, tick); int expected = markInboxMessages(inboxName, instanceUUID, tick);
if (expected == 0) { if (expected == 0) {
return Collections.emptyList(); return Collections.emptyList();
} }
// Then fetch the messages that were marked
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var queryStmt = conn.prepareStatement(""" var queryStmt = conn.prepareStatement("""
SELECT ID, RELATED_ID, FUNCTION, PAYLOAD, STATE, SENDER_INBOX FROM PROC_MESSAGE SELECT ID, RELATED_ID, FUNCTION, PAYLOAD, STATE, SENDER_INBOX FROM PROC_MESSAGE
@ -213,11 +233,13 @@ public class MqPersistence {
*/ */
public Collection<MqMessage> pollReplyInbox(String inboxName, String instanceUUID, long tick) throws SQLException { public Collection<MqMessage> pollReplyInbox(String inboxName, String instanceUUID, long tick) throws SQLException {
// Mark new messages as claimed
int expected = markInboxMessages(inboxName, instanceUUID, tick); int expected = markInboxMessages(inboxName, instanceUUID, tick);
if (expected == 0) { if (expected == 0) {
return Collections.emptyList(); return Collections.emptyList();
} }
// Then fetch the messages that were marked
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var queryStmt = conn.prepareStatement(""" var queryStmt = conn.prepareStatement("""
SELECT SELF.ID, SELF.RELATED_ID, SELF.FUNCTION, SELF.PAYLOAD, PARENT.STATE FROM PROC_MESSAGE SELF SELECT SELF.ID, SELF.RELATED_ID, SELF.FUNCTION, SELF.PAYLOAD, PARENT.STATE FROM PROC_MESSAGE SELF
@ -249,6 +271,7 @@ public class MqPersistence {
} }
} }
/** Returns the last N messages sent to this inbox */
public List<MqMessage> lastNMessages(String inboxName, int lastN) throws SQLException { public List<MqMessage> lastNMessages(String inboxName, int lastN) throws SQLException {
try (var conn = dataSource.getConnection(); try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement(""" var stmt = conn.prepareStatement("""