Message queues
Moon provides durable message queues built on Redis Streams with at-least-once delivery semantics, automatic dead-letter handling, and debounced trigger callbacks.
Quick start
redis-cli -p 6379
# Create a durable queue with max 5 delivery attempts
127.0.0.1:6379> MQ CREATE orders MAXDELIVERY 5
OK
# Enqueue a message (field/value pairs, like XADD)
127.0.0.1:6379> MQ PUSH orders action process item_id 42
"1713394800000-0"
# Claim messages (consumer-group based)
127.0.0.1:6379> MQ POP orders COUNT 3
1) 1) "1713394800000-0"
   2) 1) "action"
      2) "process"
      3) "item_id"
      4) "42"
# Acknowledge processed messages
127.0.0.1:6379> MQ ACK orders 1713394800000-0
(integer) 1
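Because delivery is at-least-once, a consumer may see the same message more than once and should tolerate that. The following is a minimal sketch of a pop, process, acknowledge loop. The `pop` and `ack` callables are hypothetical wrappers around MQ POP and MQ ACK, and the in-memory `seen` set is only an illustration; a real consumer would likely need durable deduplication.

```python
from typing import Callable, Dict, List, Set, Tuple

Message = Tuple[str, Dict[str, str]]  # (stream ID, field/value pairs)


def drain(
    pop: Callable[[int], List[Message]],
    ack: Callable[[List[str]], int],
    handle: Callable[[Dict[str, str]], None],
    seen: Set[str],
    batch: int = 3,
) -> int:
    """Pop, process, and acknowledge until the queue returns nothing.

    `pop` and `ack` are hypothetical wrappers around MQ POP and MQ ACK.
    `seen` records processed IDs so a redelivered message is not handled twice.
    Returns the number of messages actually handled.
    """
    handled = 0
    while True:
        messages = pop(batch)
        if not messages:
            return handled
        done: List[str] = []
        for msg_id, fields in messages:
            if msg_id not in seen:
                # If this raises, nothing in the batch is acknowledged,
                # so the messages remain eligible for redelivery.
                handle(fields)
                seen.add(msg_id)
                handled += 1
            done.append(msg_id)
        ack(done)
```

Acknowledging only after the handler returns is what gives the at-least-once behaviour: a crash between handling and acknowledging leads to a redelivery rather than a lost message.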
Commands
| Command | Description |
|---|---|
| `MQ CREATE <key> [MAXDELIVERY <n>] [DEBOUNCE <ms>]` | Create a durable queue. Defaults: max 3 deliveries, no debounce. |
| `MQ PUSH <key> <f1> <v1> [f2 v2 ...]` | Enqueue a message with field/value pairs. Returns the stream ID. |
| `MQ POP <key> [COUNT <n>]` | Claim up to N messages (default 1). Increments the delivery counter. |
| `MQ ACK <key> <id1> [id2 ...]` | Acknowledge messages by stream ID. |
| `MQ DLQLEN <key>` | Return the dead-letter queue depth. |
| `MQ TRIGGER <key> <callback> [DEBOUNCE <ms>]` | Register a debounced trigger callback (default 1000 ms). |
| `MQ PUBLISH <key> <f1> <v1> [...]` | Transactional enqueue within a TXN block. Applied on TXN COMMIT. |
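As an illustration of the argument order in the command syntax above, the sketch below builds argument lists for four of the commands. The helper names (`mq_create`, `mq_push`, `mq_pop`, `mq_ack`) are hypothetical and not part of Moon; they only assemble strings and do not talk to a server.

```python
from typing import List, Mapping, Optional


def mq_create(key: str, max_delivery: Optional[int] = None,
              debounce_ms: Optional[int] = None) -> List[str]:
    """Arguments for MQ CREATE; omitted options fall back to server defaults."""
    args = ["MQ", "CREATE", key]
    if max_delivery is not None:
        args += ["MAXDELIVERY", str(max_delivery)]
    if debounce_ms is not None:
        args += ["DEBOUNCE", str(debounce_ms)]
    return args


def mq_push(key: str, fields: Mapping[str, object]) -> List[str]:
    """Arguments for MQ PUSH; fields are flattened into f1 v1 f2 v2 ..."""
    if not fields:
        raise ValueError("MQ PUSH needs at least one field/value pair")
    args = ["MQ", "PUSH", key]
    for name, value in fields.items():
        args += [name, str(value)]
    return args


def mq_pop(key: str, count: Optional[int] = None) -> List[str]:
    """Arguments for MQ POP; COUNT is only sent when given."""
    args = ["MQ", "POP", key]
    if count is not None:
        args += ["COUNT", str(count)]
    return args


def mq_ack(key: str, *ids: str) -> List[str]:
    """Arguments for MQ ACK with one or more stream IDs."""
    if not ids:
        raise ValueError("MQ ACK needs at least one stream ID")
    return ["MQ", "ACK", key, *ids]
```

With a RESP client such as redis-py, a list like this could be sent with `client.execute_command(*mq_push("orders", {"action": "process"}))`, assuming Moon accepts connections from that client in the same way it does from redis-cli.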
Dead-letter queue
Messages that exceed their MAXDELIVERY limit are moved automatically to a dead-letter queue stored at {queue_key}::mq:dlq; the double colon is there to avoid key collisions. Use MQ DLQLEN to monitor the depth of the dead-letter queue.
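The delivery counter and dead-lettering can be pictured with a small in-memory model. This is a sketch under stated assumptions, not Moon's implementation: it assumes a message is dead-lettered on the claim attempt that follows its last allowed delivery, and `requeue` is a hypothetical stand-in for whatever makes an unacknowledged message eligible for redelivery. Only the `{queue_key}::mq:dlq` key format comes from the text above.

```python
from collections import deque
from typing import Deque, Dict, List, Optional


def dlq_key(queue_key: str) -> str:
    """Dead-letter key for a queue, using the documented naming scheme."""
    return f"{queue_key}::mq:dlq"


class ModelQueue:
    """Toy model of delivery counting; not Moon's actual data structures."""

    def __init__(self, key: str, max_delivery: int = 3) -> None:
        self.key = key
        self.max_delivery = max_delivery
        self.ready: Deque[str] = deque()
        self.deliveries: Dict[str, int] = {}
        self.dlq: List[str] = []

    def push(self, msg_id: str) -> None:
        self.ready.append(msg_id)
        self.deliveries[msg_id] = 0

    def pop(self) -> Optional[str]:
        while self.ready:
            msg_id = self.ready.popleft()
            if self.deliveries[msg_id] >= self.max_delivery:
                # Assumption: the attempt after the last allowed delivery
                # moves the message to the dead-letter queue instead.
                self.dlq.append(msg_id)
                continue
            self.deliveries[msg_id] += 1
            return msg_id
        return None

    def requeue(self, msg_id: str) -> None:
        """Hypothetical: model an unacknowledged message becoming claimable."""
        self.ready.append(msg_id)

    def ack(self, msg_id: str) -> None:
        self.deliveries.pop(msg_id, None)
```

In this model, a queue created with `max_delivery=2` hands a message out twice; a third claim attempt returns nothing and the message ends up in `dlq`.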
Triggers
Register a callback command that fires when new messages arrive. The trigger is debounced to avoid a thundering herd when messages arrive in bursts.
# Fire a PUBLISH notification when orders arrive (2s debounce)
127.0.0.1:6379> MQ TRIGGER orders "PUBLISH events new-order" DEBOUNCE 2000
OK
Triggers use a consumer group (__mq_consumers). Callbacks are executed by fire_pending_mq_triggers(), which is integrated into the event loop.
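To make the debounce behaviour concrete, here is a minimal sketch of a trailing-edge debouncer driven by an explicit clock. It assumes that each new arrival restarts the quiet period and that the callback fires once after the period elapses; Moon's actual policy is not specified here and may differ. The `Debouncer` class and its method names are hypothetical.

```python
from typing import Callable, Optional


class Debouncer:
    """Trailing-edge debounce: fire once after `delay_ms` without arrivals."""

    def __init__(self, delay_ms: int, callback: Callable[[], None]) -> None:
        self.delay_ms = delay_ms
        self.callback = callback
        self.deadline: Optional[int] = None

    def notify(self, now_ms: int) -> None:
        """Record an arrival and restart the quiet-period timer."""
        self.deadline = now_ms + self.delay_ms

    def tick(self, now_ms: int) -> bool:
        """Called periodically (for example from an event loop).

        Fires the callback if the quiet period has elapsed and reports
        whether it fired.
        """
        if self.deadline is not None and now_ms >= self.deadline:
            self.deadline = None
            self.callback()
            return True
        return False
```

With a 2000 ms delay, arrivals at 0, 500, and 1500 ms produce a single callback at or after 3500 ms, which is the burst-collapsing effect the debounce is meant to provide.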
Transactional enqueue
Within a TXN block, use MQ PUBLISH instead of MQ PUSH to buffer the enqueue. The message is only added to the stream on TXN COMMIT.
127.0.0.1:6379> TXN BEGIN
OK
127.0.0.1:6379> SET order:99 '{"status":"paid"}'
OK
127.0.0.1:6379> MQ PUBLISH fulfillment order_id 99 action ship
OK
# On commit, the SET and the MQ PUBLISH are applied atomically
127.0.0.1:6379> TXN COMMIT
OK
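The buffering described above can be sketched with a toy transaction object. This is an in-memory illustration only: `ModelTxn` and its `discard` method are hypothetical, the dictionaries stand in for the key space and the streams, and the sketch shows deferred application rather than the atomicity guarantee itself.

```python
from typing import Dict, List, Tuple


class ModelTxn:
    """Toy model: writes and enqueues are buffered until commit."""

    def __init__(self, store: Dict[str, str],
                 streams: Dict[str, List[Dict[str, str]]]) -> None:
        self.store = store
        self.streams = streams
        self.ops: List[Tuple[str, str, object]] = []

    def set(self, key: str, value: str) -> None:
        self.ops.append(("set", key, value))

    def mq_publish(self, queue: str, fields: Dict[str, str]) -> None:
        # Buffered only; nothing reaches the stream before commit().
        self.ops.append(("publish", queue, dict(fields)))

    def commit(self) -> None:
        """Apply every buffered operation in order, then clear the buffer."""
        for kind, key, payload in self.ops:
            if kind == "set":
                self.store[key] = payload
            else:
                self.streams.setdefault(key, []).append(payload)
        self.ops.clear()

    def discard(self) -> None:
        """Hypothetical: drop buffered operations without applying them."""
        self.ops.clear()
```

Until `commit()` runs, neither the key write nor the enqueue is visible, which mirrors the reason for using MQ PUBLISH rather than MQ PUSH inside a TXN block.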
WAL recovery
MQ state is persisted as WAL records. During crash recovery, replay_mq_wal() restores queue registries, pending messages, and trigger configurations, and rolls back cursors for consistency.
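The general idea of rebuilding state from a log can be sketched as follows. The record shapes used here (`create`, `push`, `ack`, `trigger` tuples) are invented for illustration and are not Moon's WAL format, and the sketch does not model cursor rollback; it only shows how replaying records in order reconstructs queues, pending messages, and trigger settings.

```python
from typing import Dict, Iterable, Tuple


def replay(records: Iterable[tuple]) -> Tuple[Dict[str, dict], Dict[str, tuple]]:
    """Rebuild queue and trigger state from hypothetical WAL records.

    Returns (queues, triggers), where queues maps a key to its delivery
    limit and pending messages, and triggers maps a key to
    (callback, debounce_ms).
    """
    queues: Dict[str, dict] = {}
    triggers: Dict[str, tuple] = {}
    for rec in records:
        op = rec[0]
        if op == "create":
            _, key, max_delivery = rec
            queues[key] = {"max_delivery": max_delivery, "pending": {}}
        elif op == "push":
            _, key, msg_id, fields = rec
            queues[key]["pending"][msg_id] = fields
        elif op == "ack":
            _, key, msg_id = rec
            # Acknowledged messages are no longer pending after recovery.
            queues[key]["pending"].pop(msg_id, None)
        elif op == "trigger":
            _, key, callback, debounce_ms = rec
            triggers[key] = (callback, debounce_ms)
        else:
            raise ValueError(f"unknown record type: {op!r}")
    return queues, triggers
```

Replaying a log that contains two pushes and one acknowledgement leaves exactly one pending message, which is the state a consumer would expect to see after a restart.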