redis — Redis Stream integration
RedisRouter
Router for grouping Redis Stream handlers by message type.
Used in feature_setting.py of Redis features. After creation, it is connected to BotRedisDispatcher via include_router().
Example
from codex_bot.redis.router import RedisRouter

redis_router = RedisRouter()

@redis_router.message("notification.created")
async def handle_notification(payload: dict, container) -> None:
    user_id = payload["user_id"]
    ...

# With a filter:
@redis_router.message("notification.created", filter_func=lambda p: p.get("urgent"))
async def handle_urgent(payload: dict, container) -> None:
    ...
Source code in src/codex_bot/redis/router.py
Attributes
handlers
property
Registered handlers (read-only view).
Functions
message(message_type, filter_func=None)
Decorator for registering a handler for a Redis Stream message type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `message_type` | `str` | String message type (e.g., "booking.confirmed"). | required |
| `filter_func` | `FilterFunc \| None` | Optional filter: callable(payload) -> bool. The handler is called only if the filter returns True. | `None` |
Returns:
| Type | Description |
|---|---|
| `Callable[[HandlerFunc], HandlerFunc]` | Decorator returning the original handler unchanged. |
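The registration behaviour described above (a decorator that stores the handler with its optional filter and returns the function unchanged) can be sketched as follows. This is a minimal illustration, not the code in router.py: `SketchRouter` and the two type aliases are hypothetical stand-ins, and the real internal storage may differ.

```python
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple

# Hypothetical aliases; the real HandlerFunc/FilterFunc are defined by the library.
HandlerFunc = Callable[[dict, Any], Awaitable[None]]
FilterFunc = Callable[[dict], bool]
Registry = Dict[str, List[Tuple[HandlerFunc, Optional[FilterFunc]]]]


class SketchRouter:
    """Illustrative stand-in for RedisRouter (assumed behaviour only)."""

    def __init__(self) -> None:
        self._handlers: Registry = defaultdict(list)

    @property
    def handlers(self) -> Registry:
        # Return a copy so callers cannot mutate the registry.
        return {key: list(value) for key, value in self._handlers.items()}

    def message(self, message_type: str, filter_func: Optional[FilterFunc] = None):
        def decorator(handler: HandlerFunc) -> HandlerFunc:
            self._handlers[message_type].append((handler, filter_func))
            return handler  # the original function is returned unchanged

        return decorator


router = SketchRouter()


@router.message("notification.created", filter_func=lambda p: bool(p.get("urgent")))
async def handle_urgent(payload: dict, container: Any) -> None:
    ...
```

Because the decorator returns the original function, `handle_urgent` can still be called directly, for instance in unit tests.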
Source code in src/codex_bot/redis/router.py
BotRedisDispatcher
Redis Stream message dispatcher for a Telegram bot.
Handlers can be registered via decorators (@dispatcher.on_message) or via routers (include_router). When processing a message, the dispatcher:
- Determines the type from the "type" field in the payload.
- Finds suitable handlers (taking filters into account).
- On a handler error: if a retry_scheduler exists, passes the task to it and returns control (the Stream ACK then happens in the processor). If there is no scheduler, or the scheduler itself fails, re-raises the exception (no ACK).
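The steps above can be sketched as a standalone function. This is an assumption-laden illustration, not the dispatcher's actual code: `dispatch`, `RecordingScheduler`, the `stream_name` argument, and the choice to let a scheduler failure propagate as-is are all hypothetical. How unknown message types are treated is not documented, so the sketch simply skips them.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple

Handler = Callable[[dict, Any], Awaitable[None]]
Filter = Callable[[dict], bool]


class RecordingScheduler:
    """Hypothetical scheduler that only records what it was asked to retry."""

    def __init__(self) -> None:
        self.scheduled: List[Tuple[str, dict, int]] = []

    async def schedule_retry(self, stream_name: str, payload: dict, delay: int = 60) -> None:
        self.scheduled.append((stream_name, payload, delay))


async def dispatch(
    payload: dict,
    handlers: Dict[str, List[Tuple[Handler, Optional[Filter]]]],
    container: Any,
    retry_scheduler: Optional[RecordingScheduler] = None,
    stream_name: str = "bot_events",  # assumed; not taken from the library
) -> None:
    message_type = payload.get("type")
    for handler, filter_func in handlers.get(message_type, []):
        if filter_func is not None and not filter_func(payload):
            continue  # the filter rejected this payload
        try:
            await handler(payload, container)
        except Exception:
            if retry_scheduler is None:
                raise  # no scheduler: propagate so the caller does not ACK
            # A failure inside the scheduler propagates too (no ACK).
            await retry_scheduler.schedule_retry(stream_name, payload)
            return  # handed over to the scheduler; the caller may ACK


async def failing(payload: dict, container: Any) -> None:
    raise RuntimeError("boom")


scheduler = RecordingScheduler()
handlers = {"booking.confirmed": [(failing, None)]}
asyncio.run(dispatch({"type": "booking.confirmed", "user_id": 1}, handlers, None, scheduler))
```

With a scheduler attached the call returns normally and the payload lands in `scheduler.scheduled`; without one, the same call raises `RuntimeError`.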
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `retry_scheduler` | `RetrySchedulerProtocol \| None` | Optional retry scheduler (ARQ, etc.). | `None` |
Example
from codex_bot.redis.dispatcher import BotRedisDispatcher

dispatcher = BotRedisDispatcher()

@dispatcher.on_message("booking.confirmed")
async def on_booking(payload: dict, container) -> None:
    await container.notification_service.send(payload["user_id"])

# Connecting a router from a feature:
dispatcher.include_router(notifications_redis_router)

# Setting dependencies before start:
dispatcher.setup(container=container)

await dispatcher.process_message({"type": "booking.confirmed", ...})
Source code in src/codex_bot/redis/dispatcher.py
Functions
include_router(router)
Connects a RedisRouter with its handlers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `router` | `RedisRouter` | Router from a feature. | required |
Source code in src/codex_bot/redis/dispatcher.py
on_message(message_type, filter_func=None)
Decorator for registering a handler directly in the dispatcher.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `message_type` | `str` | Redis Stream message type. | required |
| `filter_func` | `FilterFunc \| None` | Optional payload -> bool filter. | `None` |
Returns:
| Type | Description |
|---|---|
| `Callable[[HandlerFunc], HandlerFunc]` | Decorator. |
Source code in src/codex_bot/redis/dispatcher.py
process_message(message_data)
async
Dispatches an incoming Redis Stream message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `message_data` | `dict[str, Any]` | Message payload. Required field: "type". | required |
Source code in src/codex_bot/redis/dispatcher.py
setup(container)
Sets the DI container before processing begins.
If a handler needs a Bot to send messages, it retrieves it directly from container.bot.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `container` | `Any` | Project's DI container. | required |
Source code in src/codex_bot/redis/dispatcher.py
RedisStreamProcessor
Redis Stream processor: reads messages and passes them to a callback.
Starts a background asyncio task (_consume_loop) that continuously reads messages from the Redis Stream via a Consumer Group. On errors, it recreates the group (e.g., after a Redis restart).
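One pass of such a read loop might look like the sketch below. It is not the library's `_consume_loop`: `InMemoryStorage` and `consume_once` are hypothetical names, and the ACK policy shown (acknowledge only when the callback returns without raising) is inferred from the dispatcher description on this page rather than read from the source.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

Event = Tuple[str, Dict[str, Any]]


class InMemoryStorage:
    """Hypothetical in-memory stand-in exposing the three storage methods."""

    def __init__(self, events: List[Event]) -> None:
        self.pending = list(events)
        self.acked: List[str] = []

    async def create_group(self, stream_name: str, group_name: str) -> None:
        return None  # nothing to create in memory

    async def read_events(self, stream_name: str, group_name: str, consumer_name: str, count: int) -> List[Event]:
        batch, self.pending = self.pending[:count], self.pending[count:]
        return batch

    async def ack_event(self, stream_name: str, group_name: str, message_id: str) -> None:
        self.acked.append(message_id)


async def consume_once(
    storage: InMemoryStorage,
    callback: Callable[[Dict[str, Any]], Awaitable[None]],
    stream: str = "bot_events",
    group: str = "bot_group",
    consumer: str = "bot-1",
    count: int = 10,
) -> int:
    """Read one batch, run the callback per message, ACK only on success."""
    events = await storage.read_events(stream, group, consumer, count)
    for message_id, data in events:
        try:
            await callback(data)
        except Exception:
            continue  # the message is left un-ACKed
        await storage.ack_event(stream, group, message_id)
    return len(events)


async def callback(payload: Dict[str, Any]) -> None:
    if payload["type"] == "broken":
        raise ValueError("handler failed")


storage = InMemoryStorage([("1-0", {"type": "ok"}), ("2-0", {"type": "broken"}), ("3-0", {"type": "ok"})])
processed = asyncio.run(consume_once(storage, callback))
```

A real loop would repeat this pass, sleeping for `poll_interval` when a batch comes back empty.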
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `storage` | `StreamStorageProtocol` | Stream adapter (StreamStorageProtocol). | required |
| `stream_name` | `str` | Redis Stream name (e.g., "bot_events"). | required |
| `consumer_group_name` | `str` | Consumer Group name. | required |
| `consumer_name` | `str` | Unique consumer name (e.g., "bot-worker-1"). | required |
| `batch_count` | `int` | Number of messages per reading cycle. | `10` |
| `poll_interval` | `float` | Wait interval (sec) if there are no messages. | `1.0` |
Example
from codex_bot.redis.stream_processor import RedisStreamProcessor

processor = RedisStreamProcessor(
    storage=redis_stream_adapter,
    stream_name="bot_events",
    consumer_group_name="bot_group",
    consumer_name="bot-1",
)
processor.set_message_callback(dispatcher.process_message)
await processor.start_listening()
Source code in src/codex_bot/redis/stream_processor.py
Functions
set_message_callback(callback)
Sets the callback for processing each message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `callback` | `MessageCallback` | Async callable(payload: dict) -> None. | required |
Source code in src/codex_bot/redis/stream_processor.py
start_listening()
async
Starts the background Stream reading loop.
Creates a Consumer Group (up to 5 attempts), then starts _consume_loop as an asyncio Task.
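A bounded retry around group creation could be sketched like this. Only the limit of 5 attempts comes from the description above; the helper name `create_group_with_retries`, the `FlakyStorage` test double, the pause between attempts, and re-raising after the last failure are assumptions made for illustration.

```python
import asyncio


class FlakyStorage:
    """Hypothetical storage whose create_group fails a fixed number of times."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    async def create_group(self, stream_name: str, group_name: str) -> None:
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("Redis is not reachable yet")


async def create_group_with_retries(
    storage: FlakyStorage,
    stream_name: str,
    group_name: str,
    attempts: int = 5,
    delay: float = 0.01,  # assumed pause; the real interval is not documented here
) -> int:
    """Try to create the group; return the attempt number that succeeded."""
    for attempt in range(1, attempts + 1):
        try:
            await storage.create_group(stream_name, group_name)
            return attempt
        except ConnectionError:
            if attempt == attempts:
                raise  # give up after the last attempt
            await asyncio.sleep(delay)
    raise ValueError("attempts must be at least 1")


storage = FlakyStorage(failures=2)
attempts_used = asyncio.run(create_group_with_retries(storage, "bot_events", "bot_group"))
```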
Source code in src/codex_bot/redis/stream_processor.py
stop_listening()
async
Stops the reading loop and correctly cancels the asyncio Task.
Source code in src/codex_bot/redis/stream_processor.py
Protocols
RetrySchedulerProtocol
Protocol for a retry scheduler (ARQ or similar).
Implement and pass to BotRedisDispatcher for automatic rescheduling of failed messages to a retry queue.
Source code in src/codex_bot/redis/dispatcher.py
Functions
schedule_retry(stream_name, payload, delay=60)
async
Schedules message reprocessing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `stream_name` | `str` | Redis Stream name. | required |
| `payload` | `dict[str, Any]` | Message data for reprocessing. | required |
| `delay` | `int` | Delay in seconds before retry. | `60` |
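Since the scheduler is described as a protocol, any object with a matching `schedule_retry` coroutine should be acceptable. The sketch below uses a locally defined `RetryScheduler` protocol that mirrors the documented signature, plus a hypothetical `InMemoryRetryScheduler` that records jobs instead of enqueueing them in ARQ. Neither class is part of the library, and `runtime_checkable` is added here only to make the structural check visible.

```python
import asyncio
from typing import Any, Dict, List, Protocol, Tuple, runtime_checkable


@runtime_checkable
class RetryScheduler(Protocol):
    """Local stand-in mirroring the documented method signature."""

    async def schedule_retry(self, stream_name: str, payload: Dict[str, Any], delay: int = 60) -> None:
        ...


class InMemoryRetryScheduler:
    """Hypothetical scheduler: records retries instead of enqueueing a real job."""

    def __init__(self) -> None:
        self.jobs: List[Tuple[str, Dict[str, Any], int]] = []

    async def schedule_retry(self, stream_name: str, payload: Dict[str, Any], delay: int = 60) -> None:
        self.jobs.append((stream_name, payload, delay))


scheduler = InMemoryRetryScheduler()
asyncio.run(scheduler.schedule_retry("bot_events", {"type": "booking.confirmed"}))
asyncio.run(scheduler.schedule_retry("bot_events", {"type": "booking.confirmed"}, delay=5))
```

The class does not inherit from the protocol; matching the method is enough, which keeps a project's scheduler free of any import from the bot package.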
Source code in src/codex_bot/redis/dispatcher.py
StreamStorageProtocol
Bases: Protocol
Redis Stream adapter protocol for RedisStreamProcessor.
Implement in the project on top of redis-py or any other client.
Example
import contextlib

from redis.asyncio import Redis
from redis.exceptions import ResponseError


class RedisStreamAdapter:
    def __init__(self, redis: Redis):
        self.redis = redis

    async def create_group(self, stream: str, group: str) -> None:
        # Suppresses ResponseError, e.g. BUSYGROUP when the group already exists.
        with contextlib.suppress(ResponseError):
            await self.redis.xgroup_create(stream, group, id="0", mkstream=True)

    async def read_events(self, stream_name, group_name, consumer_name, count):
        # ">" requests only messages not yet delivered to this group.
        result = await self.redis.xreadgroup(
            group_name, consumer_name, {stream_name: ">"}, count=count, block=0
        )
        if not result:
            return []
        return [(msg_id, data) for _, messages in result for msg_id, data in messages]

    async def ack_event(self, stream_name, group_name, message_id) -> None:
        await self.redis.xack(stream_name, group_name, message_id)
Source code in src/codex_bot/redis/stream_processor.py
Functions
ack_event(stream_name, group_name, message_id)
async
Acknowledges message processing (XACK).
Source code in src/codex_bot/redis/stream_processor.py
create_group(stream_name, group_name)
async
Creates a Consumer Group (idempotently).
Source code in src/codex_bot/redis/stream_processor.py
read_events(stream_name, group_name, consumer_name, count)
async
Reads a batch of unread messages from the group.
Returns:
| Type | Description |
|---|---|
| `list[tuple[str, dict[str, Any]]]` | List of pairs (message_id, data_dict). |
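To make the return shape concrete, the sketch below flattens a nested reply of the kind redis-py returns from `xreadgroup` (one entry per stream, each holding a list of `(id, fields)` pairs) into the flat list described above. The helper name `flatten_xreadgroup` and the sample IDs are made up for illustration, and string keys assume a client created with `decode_responses=True`.

```python
from typing import Any, Dict, List, Optional, Tuple

Event = Tuple[str, Dict[str, Any]]


def flatten_xreadgroup(result: Optional[list]) -> List[Event]:
    """Turn [[stream, [(id, fields), ...]], ...] into [(id, fields), ...]."""
    if not result:
        return []  # covers both None and an empty reply
    return [(msg_id, data) for _stream, messages in result for msg_id, data in messages]


# Sample reply for a single stream carrying two messages (invented IDs).
sample = [
    [
        "bot_events",
        [
            ("1700000000000-0", {"type": "booking.confirmed"}),
            ("1700000000001-0", {"type": "notification.created"}),
        ],
    ]
]
pairs = flatten_xreadgroup(sample)
```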
Source code in src/codex_bot/redis/stream_processor.py