codex_platform.streams.consumer
Redis Stream consumer — reads events via XREADGROUP, acknowledges via XACK.
Classes
StreamEvent
dataclass
StreamConsumer
Reads events from a Redis Stream via a consumer group (XREADGROUP).
Acknowledges processed messages via XACK.
Source code in src/codex_platform/streams/consumer.py
Functions
ensure_group()
async
Create the consumer group if it does not already exist (XGROUP CREATE … MKSTREAM).
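The "create if it does not already exist" behaviour can be sketched as follows. This is a minimal illustration, not the library's implementation: `ensure_group_sketch` and `FakeRedis` are hypothetical names, the client is assumed to expose a redis-py style `xgroup_create(name, groupname, id=..., mkstream=...)` coroutine, and starting the group at ID `"0"` is an assumption. Redis reports an existing group with a `BUSYGROUP` error, which the sketch treats as success.

```python
import asyncio


class FakeRedis:
    """Hypothetical in-memory stand-in so the sketch runs without a server."""

    def __init__(self):
        self.groups = set()

    async def xgroup_create(self, name, groupname, id="$", mkstream=False):
        if (name, groupname) in self.groups:
            # Mirrors the error text Redis returns for a duplicate group.
            raise RuntimeError("BUSYGROUP Consumer Group name already exists")
        self.groups.add((name, groupname))


async def ensure_group_sketch(client, stream: str, group: str) -> bool:
    """Create `group` on `stream`; return False if it already existed."""
    try:
        # id="0" (an assumption) lets the group see the stream's full history;
        # mkstream=True creates the stream when it is missing.
        await client.xgroup_create(stream, group, id="0", mkstream=True)
        return True
    except Exception as exc:  # redis-py raises redis.exceptions.ResponseError here
        if "BUSYGROUP" in str(exc):
            return False  # group already exists, nothing to do
        raise
```

Because the duplicate-group error is swallowed, a call like this can safely run on every service start.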
read(count=10)
async
Read new events from the stream for the consumer group (XREADGROUP).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `count` | `int` | Maximum number of messages to fetch per call. | `10` |
Returns:
| Type | Description |
|---|---|
| `list[StreamEvent]` | List of `StreamEvent` objects. |
Raises:
| Type | Description |
|---|---|
| `RedisConnectionError` | Redis connection failure. |
| `RedisServiceError` | Redis operation failure. |
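To illustrate what a read through a consumer group involves, here is a rough sketch of turning an XREADGROUP reply into event objects. It is an assumption-laden example rather than the actual `read()` code: `Event`, `read_new`, and `FakeRedis` are hypothetical, the fields of the real `StreamEvent` are not documented on this page, and the client is assumed to behave like redis-py's `xreadgroup(groupname, consumername, streams, count=...)` with decoded string responses.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Event:
    """Hypothetical stand-in for StreamEvent; the real fields may differ."""
    event_id: str
    fields: dict


class FakeRedis:
    """Hypothetical stand-in returning a reply shaped like redis-py's."""

    def __init__(self, entries):
        self._entries = list(entries)

    async def xreadgroup(self, groupname, consumername, streams, count=None):
        stream = next(iter(streams))
        batch = self._entries[:count]
        self._entries = self._entries[len(batch):]
        # An empty list means nothing new was available.
        return [[stream, batch]] if batch else []


async def read_new(client, stream, group, consumer, count=10):
    # The special ID ">" requests only entries never delivered to this group.
    reply = await client.xreadgroup(group, consumer, {stream: ">"}, count=count)
    events = []
    for _stream_name, entries in reply or []:
        for entry_id, fields in entries:
            events.append(Event(entry_id, fields))
    return events
```

Entries delivered this way remain pending for the group until they are acknowledged.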
ack(event_id)
async
Acknowledge that an event has been processed (XACK).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `event_id` | `str` | Stream entry ID returned by `read()`. | required |
Raises:
| Type | Description |
|---|---|
| `RedisConnectionError` | Redis connection failure. |
| `RedisServiceError` | Redis operation failure. |
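A typical way to combine `read()` and `ack()` is to acknowledge an event only after its handler succeeds, which gives at-least-once processing. The sketch below shows that pattern under stated assumptions: `drain`, `FakeConsumer`, and `FakeEvent` are hypothetical, and the `event_id` attribute on events is assumed because the fields of `StreamEvent` are not listed here. Only the `read(count=...)` and `ack(event_id)` signatures come from this page.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeEvent:
    """Hypothetical event; assumes the real StreamEvent exposes its entry ID."""
    event_id: str
    payload: dict = field(default_factory=dict)


class FakeConsumer:
    """Hypothetical stand-in exposing the documented read()/ack() signatures."""

    def __init__(self, events):
        self._queue = list(events)
        self.acked = []

    async def read(self, count=10):
        batch, self._queue = self._queue[:count], self._queue[count:]
        return batch

    async def ack(self, event_id):
        self.acked.append(event_id)


async def drain(consumer, handler, count=10):
    """Read batches until none remain; ack each event only after handler succeeds."""
    failed = []
    while True:
        events = await consumer.read(count=count)
        if not events:
            return failed
        for event in events:
            try:
                await handler(event)
            except Exception:
                # Not acknowledged, so Redis keeps the entry pending for the group.
                failed.append(event.event_id)
                continue
            await consumer.ack(event.event_id)
```

Events whose handler raises are left unacknowledged, so they stay in the group's pending entries list and can be retried or claimed later.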