codex_platform.streams.producer
Redis Stream producer — writes events via XADD.
Classes
StreamReplyTimeoutError
StreamProducer
Writes events to a Redis Stream (XADD).
Encodes structured payload values as JSON-prefixed Redis Stream fields.
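This page does not show the exact field encoding. As a minimal sketch of the idea, assume a hypothetical `json:` prefix marks values that were JSON-serialized. The prefix and both helper names below are illustrative assumptions, not taken from `codex_platform`:

```python
import json
from typing import Any

JSON_PREFIX = "json:"  # assumed marker; the library's actual prefix is not documented here


def encode_fields(data: dict[str, Any]) -> dict[str, str]:
    """Flatten a payload into string fields suitable for a stream entry."""
    fields: dict[str, str] = {}
    for key, value in data.items():
        if isinstance(value, str) and not value.startswith(JSON_PREFIX):
            # Plain strings are stored unchanged.
            fields[key] = value
        else:
            # Structured values (and strings that collide with the prefix)
            # are JSON-encoded and tagged so the consumer can tell them apart.
            fields[key] = JSON_PREFIX + json.dumps(value)
    return fields


def decode_fields(fields: dict[str, str]) -> dict[str, Any]:
    """Reverse encode_fields on the consumer side."""
    decoded: dict[str, Any] = {}
    for key, value in fields.items():
        if value.startswith(JSON_PREFIX):
            decoded[key] = json.loads(value[len(JSON_PREFIX):])
        else:
            decoded[key] = value
    return decoded
```

Tagging only the non-string values keeps simple entries readable when inspected with `XRANGE`, while lists, dicts, and numbers still round-trip with their types intact.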
Source code in src/codex_platform/streams/producer.py
Functions
publish(event_type, data, *, correlation_id=None)
async
Append an event to the Redis Stream (XADD).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `event_type` | `str` | Event type label, e.g. … | required |
| `data` | `dict[str, Any]` | Event payload. | required |
| `correlation_id` | `str \| None` | Optional request/reply correlation id. | `None` |
Returns:
| Type | Description |
|---|---|
| `str` | The ID of the newly added stream entry (e.g. …). |
Raises:
| Type | Description |
|---|---|
| `RedisConnectionError` | Redis connection failure. |
| `RedisServiceError` | Redis operation failure. |
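The shape of a `publish` call can be sketched without a running Redis server. The example below is an assumption-laden illustration: `InMemoryStreams` is a hypothetical stand-in that mimics only `XADD`, the free function `publish` is not the library's method, and the field names `event_type`, `data`, and `correlation_id` are guesses at how the entry might be laid out.

```python
import asyncio
import itertools
import json
import time
from typing import Any, Optional


class InMemoryStreams:
    """Hypothetical stand-in for a Redis client; mimics XADD only."""

    def __init__(self) -> None:
        self.streams: dict[str, list[tuple[str, dict[str, str]]]] = {}
        self._seq = itertools.count()

    async def xadd(self, name: str, fields: dict[str, str]) -> str:
        # Redis entry IDs have the form "<milliseconds>-<sequence>".
        entry_id = f"{int(time.time() * 1000)}-{next(self._seq)}"
        self.streams.setdefault(name, []).append((entry_id, fields))
        return entry_id


async def publish(
    client: InMemoryStreams,
    stream: str,
    event_type: str,
    data: dict[str, Any],
    *,
    correlation_id: Optional[str] = None,
) -> str:
    """Append one event and return the ID of the new stream entry."""
    # Assumed field layout; the real producer may name or encode fields differently.
    fields = {"event_type": event_type, "data": json.dumps(data)}
    if correlation_id is not None:
        fields["correlation_id"] = correlation_id
    return await client.xadd(stream, fields)


async def main() -> str:
    client = InMemoryStreams()
    return await publish(client, "events", "user.created", {"id": 42})


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With a real client, the `xadd` call is where a connection or operation failure would surface, which is presumably where `RedisConnectionError` and `RedisServiceError` originate.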
request(event_type, data, *, timeout=30.0, correlation_id=None)
async
Publish an event and wait for a basic Redis-list reply.
Replies are read from reply:{correlation_id} using BRPOP.
publish_reply(correlation_id, data, *, ttl=None)
async
Publish a basic request/reply response to reply:{correlation_id}.
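The request/reply pairing described above can be sketched as follows: the responder pushes onto the list `reply:{correlation_id}` and the requester blocks on that same key until a reply arrives or the timeout expires. This is a sketch under stated assumptions: `InMemoryLists` is a hypothetical stand-in that mimics only `LPUSH` and `BRPOP`, `ReplyTimeout` is a local placeholder exception, and `wait_for_reply` and this `publish_reply` are illustrative functions rather than the library's methods.

```python
import asyncio
import json
from collections import defaultdict
from typing import Any, Optional


class ReplyTimeout(Exception):
    """Hypothetical placeholder raised when no reply arrives in time."""


class InMemoryLists:
    """Hypothetical stand-in for a Redis client; mimics LPUSH and BRPOP only."""

    def __init__(self) -> None:
        self._lists: dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)

    async def lpush(self, key: str, value: str) -> None:
        await self._lists[key].put(value)

    async def brpop(self, key: str, timeout: float) -> Optional[tuple[str, str]]:
        # Like BRPOP: block until an item arrives, or return None on timeout.
        try:
            value = await asyncio.wait_for(self._lists[key].get(), timeout)
        except asyncio.TimeoutError:
            return None
        return key, value


def reply_key(correlation_id: str) -> str:
    return f"reply:{correlation_id}"


async def publish_reply(client: InMemoryLists, correlation_id: str, data: dict[str, Any]) -> None:
    # Responder side: push the JSON-encoded reply onto the per-request list.
    await client.lpush(reply_key(correlation_id), json.dumps(data))


async def wait_for_reply(client: InMemoryLists, correlation_id: str, timeout: float = 30.0) -> dict[str, Any]:
    # Requester side: block on the reply list until a reply or the timeout.
    popped = await client.brpop(reply_key(correlation_id), timeout)
    if popped is None:
        raise ReplyTimeout(f"no reply for {correlation_id} within {timeout}s")
    _key, raw = popped
    return json.loads(raw)


async def demo() -> dict[str, Any]:
    client = InMemoryLists()
    waiter = asyncio.create_task(wait_for_reply(client, "req-1", timeout=1.0))
    await publish_reply(client, "req-1", {"status": "ok"})
    return await waiter
```

Using one list per correlation id means a reply can only be consumed by the requester that knows the id. With a real Redis client, the `ttl` argument of `publish_reply` would presumably set an expiry on the reply key so that replies nobody reads do not accumulate.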