redis — Redis Stream processing and event routing
Infrastructure for building Event-Driven architecture based on Redis Streams.
Dispatcher
Central hub for event distribution.
BotRedisDispatcher
Redis Stream message dispatcher for a Telegram bot.
Allows registering handlers via decorators or routers. Orchestrates the dispatching of incoming messages by type, supporting filtering and automated retry scheduling via external protocols.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| retry_scheduler | RetrySchedulerProtocol \| None | Optional retry scheduler (ARQ, etc.). | None |
Example

    dispatcher = BotRedisDispatcher()

    @dispatcher.on_message("booking.confirmed")
    async def on_booking(payload: dict, container) -> None:
        await container.notification_service.send(payload["user_id"])

    # Connecting a router from a feature:
    dispatcher.include_router(notifications_redis_router)

    # Setting dependencies before start:
    dispatcher.setup(container=container)

    await dispatcher.process_message({"type": "booking.confirmed", ...})

Source code in src/codex_bot/redis/dispatcher.py
Functions
__init__(retry_scheduler=None)
Initializes the dispatcher with an empty router list.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| retry_scheduler | RetrySchedulerProtocol \| None | Optional service for message retries. | None |
Source code in src/codex_bot/redis/dispatcher.py
include_router(router)
Connects a RedisRouter with its handlers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| router | RedisRouter | Router from a feature. | required |
Source code in src/codex_bot/redis/dispatcher.py
on_message(message_type, filter_func=None)
Decorator for registering a handler directly in the dispatcher.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message_type | str | Redis Stream message type. | required |
| filter_func | FilterFunc \| None | Optional payload -> bool filter. | None |
Returns:
| Type | Description |
|---|---|
| Callable[[HandlerFunc], HandlerFunc] | Decorator function. |
Source code in src/codex_bot/redis/dispatcher.py
process_message(message_data, stream_name='bot_events')
async
Dispatches an incoming Redis Stream message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message_data | dict[str, Any] | Message payload. Required field: | required |
| stream_name | str | Redis Stream name for retry scheduling. Defaults to 'bot_events'. | 'bot_events' |
Source code in src/codex_bot/redis/dispatcher.py
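The dispatch-by-type and retry behaviour described for process_message can be pictured with a small stand-alone model. This is a sketch under assumptions, not the library's implementation: `MiniDispatcher`, `RecordingScheduler`, and the rule "reschedule when a handler raises" are hypothetical. Only the `"type"` key (taken from the example above), the `'bot_events'` default, and the `schedule_retry(stream_name, payload, delay=60)` signature come from this page.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Handler = Callable[[dict[str, Any]], Awaitable[None]]


class RecordingScheduler:
    """Hypothetical stand-in that records retry requests instead of queueing them."""

    def __init__(self) -> None:
        self.scheduled: list[tuple[str, dict[str, Any], int]] = []

    async def schedule_retry(self, stream_name: str, payload: dict[str, Any], delay: int = 60) -> None:
        self.scheduled.append((stream_name, payload, delay))


class MiniDispatcher:
    """Hypothetical model: route by payload["type"], reschedule on handler failure."""

    def __init__(self, retry_scheduler: Optional[RecordingScheduler] = None) -> None:
        self._handlers: dict[str, list[Handler]] = {}
        self._retry_scheduler = retry_scheduler

    def on_message(self, message_type: str) -> Callable[[Handler], Handler]:
        def decorator(func: Handler) -> Handler:
            self._handlers.setdefault(message_type, []).append(func)
            return func
        return decorator

    async def process_message(self, message_data: dict[str, Any], stream_name: str = "bot_events") -> None:
        for handler in self._handlers.get(message_data.get("type", ""), []):
            try:
                await handler(message_data)
            except Exception:
                # Assumption: a failing handler triggers a retry when a scheduler is set.
                if self._retry_scheduler is not None:
                    await self._retry_scheduler.schedule_retry(stream_name, message_data)


scheduler = RecordingScheduler()
dispatcher = MiniDispatcher(retry_scheduler=scheduler)
seen: list[int] = []


@dispatcher.on_message("booking.confirmed")
async def on_booking(payload: dict[str, Any]) -> None:
    seen.append(payload["user_id"])


@dispatcher.on_message("booking.failed")
async def on_failure(payload: dict[str, Any]) -> None:
    raise RuntimeError("downstream unavailable")


asyncio.run(dispatcher.process_message({"type": "booking.confirmed", "user_id": 7}))
asyncio.run(dispatcher.process_message({"type": "booking.failed", "user_id": 8}))
```

In this model the first message reaches its handler, while the second ends up in `scheduler.scheduled` with the default stream name and delay.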
setup(container)
Sets the DI container before processing begins.
If a handler needs a Bot to send messages, it retrieves it directly from container.bot.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| container | Any | Project's DI container. | required |
Source code in src/codex_bot/redis/dispatcher.py
Stream Processor
Polling loop for event streams (Consumer Group).
RedisStreamProcessor
Asynchronous engine for consuming and dispatching Redis Stream events.
This processor facilitates horizontal scaling by allowing multiple instances to share the same Redis Consumer Group. It manages the low-level lifecycle of stream consumption, including grouping, batching, and acknowledgments.
Key Features
- Reliability: Guarantees at-least-once processing via XACK.
- Distributed Scaling: Native support for Redis Consumer Groups.
- Self-Healing: Automatically re-creates the consumer group after an infrastructure failure (e.g., a Redis flush).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| storage | StreamStorageProtocol | An implementation of StreamStorageProtocol. | required |
| stream_name | str | The identifier of the source Redis Stream. | required |
| consumer_group_name | str | The shared group name for load balancing. | required |
| consumer_name | str | Unique identifier for this specific processor instance. | required |
| batch_count | int | Maximum messages to process per cycle. | 10 |
| poll_interval | float | Idle wait duration when no messages are pending. | 1.0 |
Source code in src/codex_bot/redis/stream_processor.py
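The at-least-once guarantee listed under Key Features can be illustrated with a single read, dispatch, acknowledge cycle against an in-memory stand-in. This is only a sketch: `InMemoryStorage` and `consume_once` are hypothetical names, and the rule "acknowledge only after the callback returns without raising" is an assumption about how such a loop is usually written. The `read_events` and `ack_event` signatures follow StreamStorageProtocol as documented on this page.

```python
import asyncio
from typing import Any


class InMemoryStorage:
    """Hypothetical in-memory stand-in for a StreamStorageProtocol adapter."""

    def __init__(self, messages: list[tuple[str, dict[str, Any]]]) -> None:
        self._unread = list(messages)
        self.pending: dict[str, dict[str, Any]] = {}  # delivered but not yet acknowledged

    async def read_events(self, stream_name: str, group_name: str, consumer_name: str, count: int):
        batch, self._unread = self._unread[:count], self._unread[count:]
        self.pending.update(batch)
        return batch

    async def ack_event(self, stream_name: str, group_name: str, message_id: str) -> None:
        self.pending.pop(message_id, None)


async def consume_once(storage, callback, batch_count: int = 10) -> int:
    """Run one cycle and return the number of acknowledged messages."""
    acked = 0
    for message_id, data in await storage.read_events("bot_events", "bots", "bot-1", batch_count):
        try:
            await callback(data)
        except Exception:
            continue  # not acknowledged, so the message stays pending
        await storage.ack_event("bot_events", "bots", message_id)
        acked += 1
    return acked


async def callback(payload: dict[str, Any]) -> None:
    if payload.get("type") == "bad":
        raise ValueError("cannot process")


storage = InMemoryStorage([("1-0", {"type": "ok"}), ("2-0", {"type": "bad"})])
acked = asyncio.run(consume_once(storage, callback))
```

Because the failed message is never acknowledged, it remains in the pending set and can be picked up again later, which is what makes duplicate delivery possible and handlers worth keeping idempotent.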
Functions
set_message_callback(callback)
Sets the callback for processing each message.
The callback should be an async function that takes a dictionary (payload).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | MessageCallback | Async callable(payload: dict) -> None. | required |
Source code in src/codex_bot/redis/stream_processor.py
start_listening()
async
Starts the background Stream reading loop.
First, ensures that the consumer group exists (up to 5 attempts).
Then, spawns the _consume_loop as a background task.
Source code in src/codex_bot/redis/stream_processor.py
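The "up to 5 attempts" startup check might resemble the bounded retry below. This is a sketch, not the processor's code: `ensure_group`, `FlakyStorage`, the pause between attempts, and raising once the attempts are exhausted are all assumptions. The page itself states only the attempt limit and the `create_group(stream_name, group_name)` call.

```python
import asyncio


async def ensure_group(storage, stream_name: str, group_name: str,
                       attempts: int = 5, delay: float = 0.0) -> int:
    """Call create_group up to `attempts` times; return the attempt that succeeded."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            await storage.create_group(stream_name, group_name)
            return attempt
        except Exception as exc:  # e.g. Redis temporarily unreachable
            last_error = exc
            await asyncio.sleep(delay)
    raise RuntimeError(f"consumer group not created after {attempts} attempts") from last_error


class FlakyStorage:
    """Hypothetical storage that fails a fixed number of times before succeeding."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    async def create_group(self, stream_name: str, group_name: str) -> None:
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("redis unavailable")
```

With two simulated failures, `asyncio.run(ensure_group(FlakyStorage(2), "bot_events", "bots"))` succeeds on the third attempt. A real deployment would likely use a non-zero `delay`.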
stop_listening()
async
Stops the reading loop and correctly cancels the asyncio Task.
Ensures graceful shutdown by waiting for the current poll cycle to finish or be cancelled correctly.
Source code in src/codex_bot/redis/stream_processor.py
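Cancelling a background asyncio Task and waiting for it to finish is a common shutdown pattern. The sketch below shows one way to do it. `LoopRunner` is a hypothetical class, and its loop body only counts cycles; it is not the processor's actual `_consume_loop`.

```python
import asyncio
import contextlib


class LoopRunner:
    """Hypothetical holder for a background polling task."""

    def __init__(self) -> None:
        self._task = None
        self.cycles = 0

    async def _consume_loop(self) -> None:
        while True:
            self.cycles += 1
            await asyncio.sleep(0.01)  # stands in for one poll cycle

    async def start_listening(self) -> None:
        self._task = asyncio.create_task(self._consume_loop())

    async def stop_listening(self) -> None:
        if self._task is None:
            return
        self._task.cancel()
        # Await the task so cancellation has completed before this method returns.
        with contextlib.suppress(asyncio.CancelledError):
            await self._task
        self._task = None


async def demo() -> LoopRunner:
    runner = LoopRunner()
    await runner.start_listening()
    await asyncio.sleep(0.05)
    await runner.stop_listening()
    return runner
```

Awaiting the cancelled task matters: returning right after `cancel()` would leave the task still unwinding while the caller closes connections it may be using.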
Router
Tool for registering event handlers.
RedisRouter
Router for grouping Redis Stream handlers by message type.
Used in feature_setting.py of Redis features. After creation, it is connected to BotRedisDispatcher via include_router().
Example

    redis_router = RedisRouter()

    @redis_router.message("notification.created")
    async def handle_notification(payload: dict, container) -> None:
        user_id = payload["user_id"]
        ...

    # With a filter:
    @redis_router.message("notification.created", filter_func=lambda p: p.get("urgent"))
    async def handle_urgent(payload: dict, container) -> None:
        ...

Source code in src/codex_bot/redis/router.py
Attributes
handlers
property
Registered handlers (read-only view).
Functions
message(message_type, filter_func=None)
Decorator for registering a handler for a Redis Stream message type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message_type | str | String message type (e.g., "booking.confirmed"). | required |
| filter_func | FilterFunc \| None | Optional filter: callable(payload) -> bool. The handler is called only if the filter returns True. | None |
Returns:
| Type | Description |
|---|---|
| Callable[[HandlerFunc], HandlerFunc] | Decorator returning the original handler unchanged. |
Source code in src/codex_bot/redis/router.py
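To make the two documented properties concrete (the decorator returns the handler unchanged, and a filter gates whether the handler is selected), here is a minimal stand-alone model. `MiniRouter` and its `matching` method are hypothetical and exist only for illustration; the real RedisRouter may store and select handlers differently.

```python
from typing import Any, Awaitable, Callable, Optional

HandlerFunc = Callable[[dict[str, Any], Any], Awaitable[None]]
FilterFunc = Callable[[dict[str, Any]], Any]


class MiniRouter:
    """Hypothetical registry keyed by message type."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[tuple[HandlerFunc, Optional[FilterFunc]]]] = {}

    def message(self, message_type: str, filter_func: Optional[FilterFunc] = None):
        def decorator(func: HandlerFunc) -> HandlerFunc:
            self._handlers.setdefault(message_type, []).append((func, filter_func))
            return func  # the original handler is returned unchanged
        return decorator

    def matching(self, payload: dict[str, Any]) -> list[HandlerFunc]:
        """Return handlers registered for payload["type"] whose filter passes."""
        return [
            func
            for func, flt in self._handlers.get(payload.get("type", ""), [])
            if flt is None or bool(flt(payload))
        ]


router = MiniRouter()


@router.message("notification.created")
async def handle_notification(payload: dict, container) -> None:
    ...


@router.message("notification.created", filter_func=lambda p: p.get("urgent"))
async def handle_urgent(payload: dict, container) -> None:
    ...
```

In this model a payload without `"urgent"` selects only `handle_notification`, while a payload with `"urgent": True` selects both handlers in registration order.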
Protocols
StreamStorageProtocol
Bases: Protocol
Redis Stream adapter protocol for RedisStreamProcessor.
Implement this protocol in the project on top of redis-py or any other client. It abstracts the underlying Redis Stream commands (XGROUP, XREADGROUP, XACK).
Example
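
    import contextlib

    from redis.asyncio import Redis
    from redis.exceptions import ResponseError


    class RedisStreamAdapter:
        def __init__(self, redis: Redis):
            self.redis = redis

        async def create_group(self, stream: str, group: str) -> None:
            # An existing group raises ResponseError (BUSYGROUP); suppressing it
            # makes the call idempotent, but it also hides other ResponseErrors.
            with contextlib.suppress(ResponseError):
                await self.redis.xgroup_create(stream, group, id="0", mkstream=True)

        async def read_events(self, stream_name, group_name, consumer_name, count):
            # ">" requests only messages never delivered to this group;
            # block=0 waits indefinitely for new messages.
            result = await self.redis.xreadgroup(
                group_name, consumer_name, {stream_name: ">"}, count=count, block=0
            )
            if not result:
                return []
            return [(msg_id, data) for _, messages in result for msg_id, data in messages]

        async def ack_event(self, stream_name, group_name, message_id) -> None:
            await self.redis.xack(stream_name, group_name, message_id)
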
Source code in src/codex_bot/redis/stream_processor.py
Functions
ack_event(stream_name, group_name, message_id)
async
Acknowledges successful message processing (XACK), removing the message from the group's PEL (Pending Entries List).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stream_name | str | Name of the stream. | required |
| group_name | str | Consumer group name. | required |
| message_id | str | ID of the message to acknowledge. | required |
Source code in src/codex_bot/redis/stream_processor.py
claim_stale_events(stream_name, group_name, consumer_name, min_idle_time, count)
async
Claims pending messages from the group that have been idle for a given time.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stream_name | str | Name of the stream. | required |
| group_name | str | Consumer group name. | required |
| consumer_name | str | Unique name of this processor instance (the new owner). | required |
| min_idle_time | int | Minimum idle time in milliseconds before a message is claimed. | required |
| count | int | Maximum number of messages to claim. | required |
Returns:
| Type | Description |
|---|---|
| list[tuple[str, dict[str, Any]]] | List of pairs (message_id, data_dict). |
Source code in src/codex_bot/redis/stream_processor.py
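The claiming rule (take over pending messages whose idle time has reached `min_idle_time`, up to `count`) can be shown with an in-memory model of a Pending Entries List. This sketch is synchronous and does not talk to Redis: `PendingEntry` and `claim_stale` are hypothetical names, and a real adapter would implement the async `claim_stale_events` method on top of its Redis client instead.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class PendingEntry:
    data: dict[str, Any]
    consumer: str   # current owner of the pending message
    idle_ms: int    # milliseconds since the last delivery


def claim_stale(pel: dict[str, PendingEntry], consumer_name: str,
                min_idle_time: int, count: int) -> list[tuple[str, dict[str, Any]]]:
    """Transfer up to `count` entries idle for at least `min_idle_time` ms."""
    claimed: list[tuple[str, dict[str, Any]]] = []
    for message_id, entry in pel.items():
        if len(claimed) >= count:
            break
        if entry.idle_ms >= min_idle_time:
            entry.consumer = consumer_name  # ownership moves to the claimer
            entry.idle_ms = 0               # claiming resets the idle timer
            claimed.append((message_id, entry.data))
    return claimed


pel = {
    "1-0": PendingEntry({"type": "a"}, "bot-1", idle_ms=90_000),
    "2-0": PendingEntry({"type": "b"}, "bot-1", idle_ms=5_000),
    "3-0": PendingEntry({"type": "c"}, "bot-1", idle_ms=120_000),
}
claimed = claim_stale(pel, "bot-2", min_idle_time=60_000, count=10)
```

Here `bot-2` takes over the two entries that have been idle for a minute or more, and the recently delivered `"2-0"` stays with `bot-1`.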
create_group(stream_name, group_name)
async
Creates a Consumer Group (idempotently).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stream_name | str | Name of the Redis Stream. | required |
| group_name | str | Name of the consumer group to create. | required |
Source code in src/codex_bot/redis/stream_processor.py
read_events(stream_name, group_name, consumer_name, count)
async
Reads a batch of unread messages from the group.
Uses the special ID '>' to fetch only messages that haven't been delivered to any other consumer in the group.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stream_name | str | Name of the stream. | required |
| group_name | str | Consumer group name. | required |
| consumer_name | str | Unique name of this processor instance. | required |
| count | int | Maximum number of messages to fetch in one batch. | required |
Returns:
| Type | Description |
|---|---|
| list[tuple[str, dict[str, Any]]] | List of pairs (message_id, data_dict). Empty list if no messages. |
Source code in src/codex_bot/redis/stream_processor.py
RetrySchedulerProtocol
Bases: Protocol
Protocol for a retry scheduler (ARQ or similar).
Implement and pass to BotRedisDispatcher for automatic rescheduling of failed messages to a retry queue.
Source code in src/codex_bot/redis/dispatcher.py
Functions
schedule_retry(stream_name, payload, delay=60)
async
Schedules message reprocessing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stream_name | str | Redis Stream name. | required |
| payload | dict[str, Any] | Message data for reprocessing. | required |
| delay | int | Delay in seconds before retry. | 60 |
Source code in src/codex_bot/redis/dispatcher.py
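Any object with a matching async `schedule_retry` method should satisfy the protocol. The sketch below is a process-local illustration rather than an ARQ integration: `RetrySchedulerLike` is a local mirror of the documented signature, and `QueueRetryScheduler` is a hypothetical implementation that places the payload back on an `asyncio.Queue` after the delay.

```python
import asyncio
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class RetrySchedulerLike(Protocol):
    """Local mirror of the documented schedule_retry signature (illustrative)."""

    async def schedule_retry(self, stream_name: str, payload: dict[str, Any], delay: int = 60) -> None: ...


class QueueRetryScheduler:
    """Hypothetical scheduler: re-enqueues the payload locally after `delay` seconds."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()

    async def schedule_retry(self, stream_name: str, payload: dict[str, Any], delay: int = 60) -> None:
        loop = asyncio.get_running_loop()
        # call_later fires the callback once, `delay` seconds from now.
        loop.call_later(delay, self.queue.put_nowait, (stream_name, payload))


async def demo():
    scheduler = QueueRetryScheduler()  # created inside the running loop
    await scheduler.schedule_retry("bot_events", {"type": "booking.confirmed"}, delay=0)
    item = await asyncio.wait_for(scheduler.queue.get(), timeout=1)
    return isinstance(scheduler, RetrySchedulerLike), item
```

A scheduler like this loses pending retries when the process restarts, which is one reason a durable job queue such as ARQ is the kind of backend the protocol is meant for.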