codex_platform.streams.consumer
Redis Stream consumer — reads events via XREADGROUP, acknowledges via XACK.
Classes
StreamEvent
dataclass
Parsed Redis Stream event.
Source code in src/codex_platform/streams/consumer.py
Attributes
payload
property
Return dispatcher-ready payload with the type field restored.
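The following is a minimal sketch of how a `payload` property of this kind could restore the type field. It assumes the event type is stored separately from the event body. The class name `StreamEventSketch` and the fields `event_id`, `event_type`, and `data` are hypothetical and are not taken from the actual `StreamEvent` definition.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class StreamEventSketch:
    """Hypothetical stand-in for StreamEvent; all field names are assumptions."""

    event_id: str  # Redis stream entry ID, for example "1700000000000-0"
    event_type: str  # event type, assumed to be stored apart from the body
    data: dict[str, Any] = field(default_factory=dict)

    @property
    def payload(self) -> dict[str, Any]:
        # Build a new dict so the stored data is not mutated,
        # then put the "type" key back for the dispatcher.
        return {**self.data, "type": self.event_type}
```

Returning a fresh dict keeps the stored fields unchanged, so reading `payload` repeatedly has no side effects.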
StreamConsumer
Reads events from a Redis Stream via a consumer group (XREADGROUP).
Acknowledges processed messages via XACK.
Functions
ensure_group()
async
Create the consumer group if it does not already exist (XGROUP CREATE … MKSTREAM).
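Idempotent group creation is commonly done by issuing XGROUP CREATE and treating the BUSYGROUP error as "already exists". The sketch below illustrates that pattern under stated assumptions: `client` is any object exposing an async `xgroup_create` method with the same keyword arguments as redis-py's asyncio client, the helper name `ensure_group_sketch` is hypothetical, and the start ID `"0"` is an illustrative choice rather than what `ensure_group()` necessarily uses.

```python
import asyncio
from typing import Any


async def ensure_group_sketch(client: Any, stream: str, group: str) -> bool:
    """Create the group if missing. Return True if created, False if it existed."""
    try:
        # mkstream=True asks Redis to create the stream when it does not exist yet.
        await client.xgroup_create(name=stream, groupname=group, id="0", mkstream=True)
        return True
    except Exception as exc:
        # Redis answers with a BUSYGROUP error when the group already exists.
        # Matching on the message avoids importing a specific client library here.
        if "BUSYGROUP" in str(exc):
            return False
        raise
```

With a real client, catching the library's response error type instead of `Exception` would be the narrower choice.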
create_group(stream_name=None, group_name=None)
async
Create a group using the StreamStorageProtocol method name.
read(count=10, block_ms=1000)
async
Read new events from the stream for the consumer group (XREADGROUP).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| count | int | Maximum number of messages to fetch per call. Defaults to 10. | 10 |
| block_ms | int \| None | Redis XREADGROUP block timeout in milliseconds. | 1000 |
Returns:
| Type | Description |
|---|---|
| list[StreamEvent] | List of StreamEvent objects. |
Raises:
| Type | Description |
|---|---|
| RedisConnectionError | Redis connection failure. |
| RedisServiceError | Redis operation failure. |
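To illustrate what parsing an XREADGROUP reply involves, the sketch below flattens a reply into `(entry_id, fields)` pairs. It assumes the nested shape redis-py returns under RESP2 with `decode_responses=True`, namely a list of `[stream_name, [(entry_id, {field: value}), ...]]` items. The helper name `flatten_xreadgroup_reply` is hypothetical and is not part of this module.

```python
from __future__ import annotations

from typing import Any


def flatten_xreadgroup_reply(reply: list[Any] | None) -> list[tuple[str, dict[str, str]]]:
    """Flatten a per-stream XREADGROUP reply into (entry_id, fields) pairs."""
    entries: list[tuple[str, dict[str, str]]] = []
    # An empty or missing reply means the block timeout expired with no new messages.
    for _stream_name, messages in reply or []:
        for entry_id, fields in messages:
            entries.append((entry_id, dict(fields)))
    return entries
```

Each resulting pair carries what is needed to build one event object: the entry ID used later for XACK and the raw field mapping.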
read_events(stream_name=None, group_name=None, consumer_name=None, count=10, block_ms=1000)
async
Read dispatcher-ready events using the StreamStorageProtocol method name.
ack(event_id)
async
Acknowledge that an event has been processed (XACK).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event_id | str | Stream entry ID of a previously read event. | required |
Raises:
| Type | Description |
|---|---|
| RedisConnectionError | Redis connection failure. |
| RedisServiceError | Redis operation failure. |
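A typical way to combine reading and acknowledging is to ack an event only after its handler succeeds, which gives at-least-once processing. The sketch below shows one such batch step. It assumes `consumer` exposes the `read(count, block_ms)` and `ack(event_id)` coroutines documented here and that each event has a `payload` property. The `event_id` attribute name and the helper `drain_once` are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def drain_once(
    consumer: Any,
    handler: Callable[[dict], Awaitable[None]],
) -> int:
    """Read one batch, handle each event, and ack only the events that succeed."""
    acked = 0
    events = await consumer.read(count=10, block_ms=1000)
    for event in events:
        try:
            await handler(event.payload)
        except Exception:
            # Skip the ack so the entry remains in the group's pending list.
            continue
        await consumer.ack(event.event_id)  # attribute name is an assumption
        acked += 1
    return acked
```

Entries left unacknowledged are not redelivered automatically by a plain read of new messages. Recovering them would need a separate claim or pending-read step, which this sketch does not cover.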
ack_event(stream_name, group_name, message_id)
async
Acknowledge an event using the StreamStorageProtocol method name.
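The three protocol-named methods (`create_group`, `read_events`, `ack_event`) suggest a structural interface that storage backends can satisfy. The sketch below expresses that idea with `typing.Protocol`, reusing the parameter names and defaults from the signatures listed on this page. The class name `StreamStorageProtocolSketch`, the parameter type annotations, and the return types are assumptions, since the real `StreamStorageProtocol` definition is not shown here.

```python
from __future__ import annotations

from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class StreamStorageProtocolSketch(Protocol):
    """Hypothetical shape of the storage protocol; annotations are assumed."""

    async def create_group(
        self, stream_name: str | None = None, group_name: str | None = None
    ) -> None: ...

    async def read_events(
        self,
        stream_name: str | None = None,
        group_name: str | None = None,
        consumer_name: str | None = None,
        count: int = 10,
        block_ms: int | None = 1000,
    ) -> list[Any]: ...

    async def ack_event(
        self, stream_name: str, group_name: str, message_id: str
    ) -> None: ...
```

Note that `isinstance` checks against a runtime-checkable protocol only verify that the methods exist, not that their signatures match.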