Skip to content

redis — Redis Stream processing and event routing

Infrastructure for building Event-Driven architecture based on Redis Streams.

Dispatcher

Central hub for event distribution.

BotRedisDispatcher

Redis Stream message dispatcher for a Telegram bot.

Allows registering handlers via decorators or routers. Orchestrates the dispatching of incoming messages by type, supporting filtering and automated retry scheduling via external protocols.

Parameters:

Name Type Description Default
retry_scheduler RetrySchedulerProtocol | None

Optional retry scheduler (ARQ, etc.).

None
Example
dispatcher = BotRedisDispatcher()

@dispatcher.on_message("booking.confirmed")
async def on_booking(payload: dict, container) -> None:
    await container.notification_service.send(payload["user_id"])

# Connecting a router from a feature:
dispatcher.include_router(notifications_redis_router)

# Setting dependencies before start:
dispatcher.setup(container=container)
await dispatcher.process_message({"type": "booking.confirmed", ...})
Source code in src/codex_bot/redis/dispatcher.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class BotRedisDispatcher:
    """Redis Stream message dispatcher for a Telegram bot.

    Allows registering handlers via decorators or routers. Orchestrates the
    dispatching of incoming messages by type, supporting filtering and
    automated retry scheduling via external protocols.

    Args:
        retry_scheduler: Optional retry scheduler (ARQ, etc.).

    Example:
        ```python
        dispatcher = BotRedisDispatcher()

        @dispatcher.on_message("booking.confirmed")
        async def on_booking(payload: dict, container) -> None:
            await container.notification_service.send(payload["user_id"])

        # Connecting a router from a feature:
        dispatcher.include_router(notifications_redis_router)

        # Setting dependencies before start:
        dispatcher.setup(container=container)
        await dispatcher.process_message({"type": "booking.confirmed", ...})
        ```
    """

    def __init__(self, retry_scheduler: RetrySchedulerProtocol | None = None) -> None:
        """Initializes the dispatcher with an empty router list.

        Args:
            retry_scheduler: Optional service for message retries.
        """
        self._container: Any | None = None
        self._retry_scheduler = retry_scheduler
        self._handlers: dict[str, list[tuple[HandlerFunc, FilterFunc | None]]] = {}
        log.info("BotRedisDispatcher | initialized")

    def setup(self, container: Any) -> None:
        """Sets the DI container before processing begins.

        If a handler needs a Bot to send messages - it retrieves it
        directly from ``container.bot``.

        Args:
            container: Project's DI container.
        """
        self._container = container

    def include_router(self, router: RedisRouter) -> None:
        """Connects a RedisRouter with its handlers.

        Args:
            router: Router from a feature.
        """
        for message_type, handlers in router.handlers.items():
            if message_type not in self._handlers:
                self._handlers[message_type] = []
            self._handlers[message_type].extend(handlers)
        log.info(f"BotRedisDispatcher | included router types={list(router.handlers.keys())}")

    def on_message(
        self,
        message_type: str,
        filter_func: FilterFunc | None = None,
    ) -> Callable[[HandlerFunc], HandlerFunc]:
        """Decorator for registering a handler directly in the dispatcher.

        Args:
            message_type: Redis Stream message type.
            filter_func: Optional payload -> bool filter.

        Returns:
            Decorator function.
        """

        def decorator(handler: HandlerFunc) -> HandlerFunc:
            if message_type not in self._handlers:
                self._handlers[message_type] = []
            self._handlers[message_type].append((handler, filter_func))
            return handler

        return decorator

    async def process_message(self, message_data: dict[str, Any], stream_name: str = "bot_events") -> None:
        """Dispatches an incoming Redis Stream message.

        Args:
            message_data: Message payload. Required field: ``"type"``.
            stream_name: Redis Stream name for retry scheduling. Defaults to ``"bot_events"``.
        """
        if not self._container:
            log.error("BotRedisDispatcher | container not set — call setup() first")
            return

        msg_type = message_data.get("type")
        if not msg_type:
            log.warning(f"BotRedisDispatcher | message without 'type' field: {message_data}")
            return

        handlers = self._handlers.get(msg_type, [])
        if not handlers:
            log.debug(f"BotRedisDispatcher | no handlers for type='{msg_type}'")
            return

        for handler, filter_func in handlers:
            try:
                # Filter check
                if filter_func is None or filter_func(message_data):
                    log.debug(f"BotRedisDispatcher | calling {handler.__name__} for type='{msg_type}'")
                    await handler(message_data, self._container)
            except Exception as e:
                log.error(f"BotRedisDispatcher | handler {handler.__name__} failed: {e}")

                # Attempt to schedule retry if scheduler is available
                if self._retry_scheduler:
                    try:
                        await self._retry_scheduler.schedule_retry(
                            stream_name=stream_name,
                            payload=message_data,
                            delay=60,
                        )
                        log.info(f"BotRedisDispatcher | retry scheduled for type='{msg_type}'")
                        # Responsibility passed to scheduler — ACK will occur in processor
                        return
                    except Exception as retry_err:
                        log.error(f"BotRedisDispatcher | retry scheduling failed: {retry_err}")

                # No scheduler or it failed — re-raise, no ACK (PEL)
                raise

Functions

__init__(retry_scheduler=None)

Initializes the dispatcher with an empty router list.

Parameters:

Name Type Description Default
retry_scheduler RetrySchedulerProtocol | None

Optional service for message retries.

None
Source code in src/codex_bot/redis/dispatcher.py
67
68
69
70
71
72
73
74
75
76
def __init__(self, retry_scheduler: RetrySchedulerProtocol | None = None) -> None:
    """Initializes the dispatcher with an empty router list.

    Args:
        retry_scheduler: Optional service for message retries.
    """
    self._container: Any | None = None
    self._retry_scheduler = retry_scheduler
    self._handlers: dict[str, list[tuple[HandlerFunc, FilterFunc | None]]] = {}
    log.info("BotRedisDispatcher | initialized")

include_router(router)

Connects a RedisRouter with its handlers.

Parameters:

Name Type Description Default
router RedisRouter

Router from a feature.

required
Source code in src/codex_bot/redis/dispatcher.py
89
90
91
92
93
94
95
96
97
98
99
def include_router(self, router: RedisRouter) -> None:
    """Connects a RedisRouter with its handlers.

    Args:
        router: Router from a feature.
    """
    for message_type, handlers in router.handlers.items():
        if message_type not in self._handlers:
            self._handlers[message_type] = []
        self._handlers[message_type].extend(handlers)
    log.info(f"BotRedisDispatcher | included router types={list(router.handlers.keys())}")

on_message(message_type, filter_func=None)

Decorator for registering a handler directly in the dispatcher.

Parameters:

Name Type Description Default
message_type str

Redis Stream message type.

required
filter_func FilterFunc | None

Optional payload -> bool filter.

None

Returns:

Type Description
Callable[[HandlerFunc], HandlerFunc]

Decorator function.

Source code in src/codex_bot/redis/dispatcher.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def on_message(
    self,
    message_type: str,
    filter_func: FilterFunc | None = None,
) -> Callable[[HandlerFunc], HandlerFunc]:
    """Decorator for registering a handler directly in the dispatcher.

    Args:
        message_type: Redis Stream message type.
        filter_func: Optional payload -> bool filter.

    Returns:
        Decorator function.
    """

    def decorator(handler: HandlerFunc) -> HandlerFunc:
        if message_type not in self._handlers:
            self._handlers[message_type] = []
        self._handlers[message_type].append((handler, filter_func))
        return handler

    return decorator

process_message(message_data, stream_name='bot_events') async

Dispatches an incoming Redis Stream message.

Parameters:

Name Type Description Default
message_data dict[str, Any]

Message payload. Required field: "type".

required
stream_name str

Redis Stream name for retry scheduling. Defaults to "bot_events".

'bot_events'
Source code in src/codex_bot/redis/dispatcher.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
async def process_message(self, message_data: dict[str, Any], stream_name: str = "bot_events") -> None:
    """Dispatches an incoming Redis Stream message.

    Args:
        message_data: Message payload. Required field: ``"type"``.
        stream_name: Redis Stream name for retry scheduling. Defaults to ``"bot_events"``.
    """
    if not self._container:
        log.error("BotRedisDispatcher | container not set — call setup() first")
        return

    msg_type = message_data.get("type")
    if not msg_type:
        log.warning(f"BotRedisDispatcher | message without 'type' field: {message_data}")
        return

    handlers = self._handlers.get(msg_type, [])
    if not handlers:
        log.debug(f"BotRedisDispatcher | no handlers for type='{msg_type}'")
        return

    for handler, filter_func in handlers:
        try:
            # Filter check
            if filter_func is None or filter_func(message_data):
                log.debug(f"BotRedisDispatcher | calling {handler.__name__} for type='{msg_type}'")
                await handler(message_data, self._container)
        except Exception as e:
            log.error(f"BotRedisDispatcher | handler {handler.__name__} failed: {e}")

            # Attempt to schedule retry if scheduler is available
            if self._retry_scheduler:
                try:
                    await self._retry_scheduler.schedule_retry(
                        stream_name=stream_name,
                        payload=message_data,
                        delay=60,
                    )
                    log.info(f"BotRedisDispatcher | retry scheduled for type='{msg_type}'")
                    # Responsibility passed to scheduler — ACK will occur in processor
                    return
                except Exception as retry_err:
                    log.error(f"BotRedisDispatcher | retry scheduling failed: {retry_err}")

            # No scheduler or it failed — re-raise, no ACK (PEL)
            raise

setup(container)

Sets the DI container before processing begins.

If a handler needs a Bot to send messages - it retrieves it directly from container.bot.

Parameters:

Name Type Description Default
container Any

Project's DI container.

required
Source code in src/codex_bot/redis/dispatcher.py
78
79
80
81
82
83
84
85
86
87
def setup(self, container: Any) -> None:
    """Sets the DI container before processing begins.

    If a handler needs a Bot to send messages - it retrieves it
    directly from ``container.bot``.

    Args:
        container: Project's DI container.
    """
    self._container = container

Stream Processor

Polling loop for event streams (Consumer Group).

RedisStreamProcessor

Asynchronous engine for consuming and dispatching Redis Stream events.

This processor facilitates horizontal scaling by allowing multiple instances to share the same Redis Consumer Group. It manages the low-level lifecycle of stream consumption, including grouping, batching, and acknowledgments.

Key Features
  • Reliability: Guarantees at-least-once processing via XACK.
  • Distributed Scaling: Native support for Redis Consumer Groups.
  • Self-Healing: Automatically re-establishes group identity upon infrastructure failure (e.g., Redis flushes).

Parameters:

Name Type Description Default
storage StreamStorageProtocol

An implementation of StreamStorageProtocol for Redis interaction.

required
stream_name str

The identifier of the source Redis Stream.

required
consumer_group_name str

The shared group name for load balancing.

required
consumer_name str

Unique identifier for this specific processor instance.

required
batch_count int

Maximum messages to process per cycle.

10
poll_interval float

Idle wait duration when no messages are pending.

1.0
Source code in src/codex_bot/redis/stream_processor.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
class RedisStreamProcessor:
    """Asynchronous engine for consuming and dispatching Redis Stream events.

    This processor facilitates horizontal scaling by allowing multiple
    instances to share the same Redis Consumer Group. It manages the
    low-level lifecycle of stream consumption, including grouping,
    batching, and acknowledgments.

    Key Features:
        - **Reliability**: Guarantees at-least-once processing via XACK.
        - **Distributed Scaling**: Native support for Redis Consumer Groups.
        - **Self-Healing**: Automatically re-establishes group identity
          upon infrastructure failure (e.g., Redis flushes).

    Args:
        storage: An implementation of `StreamStorageProtocol` for Redis interaction.
        stream_name: The identifier of the source Redis Stream.
        consumer_group_name: The shared group name for load balancing.
        consumer_name: Unique identifier for this specific processor instance.
        batch_count: Maximum messages to process per cycle.
        poll_interval: Idle wait duration when no messages are pending.
    """

    def __init__(
        self,
        storage: StreamStorageProtocol,
        stream_name: str,
        consumer_group_name: str,
        consumer_name: str,
        batch_count: int = 10,
        poll_interval: float = 1.0,
    ) -> None:
        self.storage = storage
        self.stream_name = stream_name
        self.group_name = consumer_group_name
        self.consumer_name = consumer_name
        self.batch_count = batch_count
        self.poll_interval = poll_interval

        self.is_running = False
        self._callback: MessageCallback | None = None
        self._task: asyncio.Task[None] | None = None
        self._last_recovery_time: float = 0.0

    def set_message_callback(self, callback: MessageCallback) -> None:
        """Sets the callback for processing each message.

        The callback should be an async function that takes a dictionary (payload).

        Args:
            callback: Async callable(payload: dict) -> None.
        """
        self._callback = callback

    async def start_listening(self) -> None:
        """Starts the background Stream reading loop.

        First, ensures that the consumer group exists (up to 5 attempts).
        Then, spawns the ``_consume_loop`` as a background task.
        """
        if self.is_running:
            log.warning("RedisStreamProcessor | already running")
            return

        # Attempt to create group with retry logic
        for attempt in range(1, 6):
            try:
                await self.storage.create_group(self.stream_name, self.group_name)
                break
            except Exception as e:
                log.warning(f"RedisStreamProcessor | create_group attempt {attempt}/5: {e}")
                if attempt < 5:
                    await asyncio.sleep(3)
                else:
                    log.error("RedisStreamProcessor | failed to create group, giving up")
                    return

        self.is_running = True
        self._task = asyncio.create_task(self._consume_loop())
        log.info(
            f"RedisStreamProcessor | listening stream='{self.stream_name}' "
            f"group='{self.group_name}' consumer='{self.consumer_name}'"
        )

    async def stop_listening(self) -> None:
        """Stops the reading loop and correctly cancels the asyncio Task.

        Ensures graceful shutdown by waiting for the current poll cycle to finish
        or be cancelled correctly.
        """
        self.is_running = False
        if self._task and not self._task.done():
            self._task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._task
        log.info("RedisStreamProcessor | stopped")

    async def _consume_loop(self) -> None:
        """Main infinite loop for consuming messages from the stream.

        Includes automatic PEL (Pending Entries List) recovery mechanism via XAUTOCLAIM.
        """
        try:
            while self.is_running:
                try:
                    # 1. Periodically recover stale messages from the PEL (every 60 seconds)
                    now = asyncio.get_event_loop().time()
                    if now - self._last_recovery_time > 60:
                        self._last_recovery_time = now
                        if hasattr(self.storage, "claim_stale_events"):
                            try:
                                stale_messages = await self.storage.claim_stale_events(
                                    stream_name=self.stream_name,
                                    group_name=self.group_name,
                                    consumer_name=self.consumer_name,
                                    min_idle_time=60000,  # 1 minute
                                    count=self.batch_count,
                                )
                                for message_id, data in stale_messages:
                                    await self._process_single(message_id, data)
                            except Exception as e:
                                log.warning(f"RedisStreamProcessor | PEL recovery failed: {e}")

                    # 2. Consume new messages
                    messages = await self.storage.read_events(
                        stream_name=self.stream_name,
                        group_name=self.group_name,
                        consumer_name=self.consumer_name,
                        count=self.batch_count,
                    )

                    if not messages:
                        await asyncio.sleep(self.poll_interval)
                        continue

                    for message_id, data in messages:
                        await self._process_single(message_id, data)

                except asyncio.CancelledError:
                    # Asyncio standard: re-raise CancelledError immediately
                    raise
                except Exception as e:
                    log.error(f"RedisStreamProcessor | consume loop error: {e}")
                    if "NOGROUP" in str(e):
                        log.warning("RedisStreamProcessor | consumer group missing, recreating...")
                        try:
                            await self.storage.create_group(self.stream_name, self.group_name)
                        except Exception as create_err:
                            log.error(f"RedisStreamProcessor | recreate failed: {create_err}")
                    await asyncio.sleep(5)
        except asyncio.CancelledError:
            log.info("RedisStreamProcessor | consume loop cancelled")
            raise

    async def _process_single(self, message_id: str, data: dict[str, Any]) -> None:
        """Processes a single message and acknowledges it on success.

        If the callback succeeds, calls ``XACK``. If it fails, the message
        remains in the PEL for future recovery or retry scheduling.

        Args:
            message_id: Unique Redis message ID.
            data: Payload of the message.
        """
        try:
            if self._callback:
                await self._callback(data)

            # Message processed successfully -> confirm to Redis
            await self.storage.ack_event(self.stream_name, self.group_name, message_id)
        except Exception as e:
            log.error(f"RedisStreamProcessor | failed to process message {message_id}: {e}")

Functions

set_message_callback(callback)

Sets the callback for processing each message.

The callback should be an async function that takes a dictionary (payload).

Parameters:

Name Type Description Default
callback MessageCallback

Async callable(payload: dict) -> None.

required
Source code in src/codex_bot/redis/stream_processor.py
164
165
166
167
168
169
170
171
172
def set_message_callback(self, callback: MessageCallback) -> None:
    """Sets the callback for processing each message.

    The callback should be an async function that takes a dictionary (payload).

    Args:
        callback: Async callable(payload: dict) -> None.
    """
    self._callback = callback

start_listening() async

Starts the background Stream reading loop.

First, ensures that the consumer group exists (up to 5 attempts). Then, spawns the _consume_loop as a background task.

Source code in src/codex_bot/redis/stream_processor.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
async def start_listening(self) -> None:
    """Starts the background Stream reading loop.

    First, ensures that the consumer group exists (up to 5 attempts).
    Then, spawns the ``_consume_loop`` as a background task.
    """
    if self.is_running:
        log.warning("RedisStreamProcessor | already running")
        return

    # Attempt to create group with retry logic
    for attempt in range(1, 6):
        try:
            await self.storage.create_group(self.stream_name, self.group_name)
            break
        except Exception as e:
            log.warning(f"RedisStreamProcessor | create_group attempt {attempt}/5: {e}")
            if attempt < 5:
                await asyncio.sleep(3)
            else:
                log.error("RedisStreamProcessor | failed to create group, giving up")
                return

    self.is_running = True
    self._task = asyncio.create_task(self._consume_loop())
    log.info(
        f"RedisStreamProcessor | listening stream='{self.stream_name}' "
        f"group='{self.group_name}' consumer='{self.consumer_name}'"
    )

stop_listening() async

Stops the reading loop and correctly cancels the asyncio Task.

Ensures graceful shutdown by waiting for the current poll cycle to finish or be cancelled correctly.

Source code in src/codex_bot/redis/stream_processor.py
204
205
206
207
208
209
210
211
212
213
214
215
async def stop_listening(self) -> None:
    """Stops the reading loop and correctly cancels the asyncio Task.

    Ensures graceful shutdown by waiting for the current poll cycle to finish
    or be cancelled correctly.
    """
    self.is_running = False
    if self._task and not self._task.done():
        self._task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._task
    log.info("RedisStreamProcessor | stopped")

Router

Tool for registering event handlers.

RedisRouter

Router for grouping Redis Stream handlers by message type.

Used in feature_setting.py of Redis features. After creation, it is connected to BotRedisDispatcher via include_router().

Example
redis_router = RedisRouter()

@redis_router.message("notification.created")
async def handle_notification(payload: dict, container) -> None:
    user_id = payload["user_id"]
    ...

# With a filter:
@redis_router.message("notification.created", filter_func=lambda p: p.get("urgent"))
async def handle_urgent(payload: dict, container) -> None:
    ...
Source code in src/codex_bot/redis/router.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class RedisRouter:
    """
    Router for grouping Redis Stream handlers by message type.

    Used in feature_setting.py of Redis features. After creation,
    it is connected to BotRedisDispatcher via include_router().

    Example:
        ```python
        redis_router = RedisRouter()

        @redis_router.message("notification.created")
        async def handle_notification(payload: dict, container) -> None:
            user_id = payload["user_id"]
            ...

        # With a filter:
        @redis_router.message("notification.created", filter_func=lambda p: p.get("urgent"))
        async def handle_urgent(payload: dict, container) -> None:
            ...
        ```
    """

    def __init__(self) -> None:
        # {message_type: [(handler, filter_func), ...]}
        self._handlers: dict[str, list[tuple[HandlerFunc, FilterFunc | None]]] = {}

    def message(
        self,
        message_type: str,
        filter_func: FilterFunc | None = None,
    ) -> Callable[[HandlerFunc], HandlerFunc]:
        """
        Decorator for registering a handler for a Redis Stream message type.

        Args:
            message_type: String message type (e.g., "booking.confirmed").
            filter_func: Optional filter — callable(payload) -> bool.
                         The handler is called only if the filter returns True.

        Returns:
            Decorator returning the original handler unchanged.
        """

        def decorator(handler: HandlerFunc) -> HandlerFunc:
            if message_type not in self._handlers:
                self._handlers[message_type] = []
            self._handlers[message_type].append((handler, filter_func))
            return handler

        return decorator

    @property
    def handlers(self) -> dict[str, list[tuple[HandlerFunc, FilterFunc | None]]]:
        """Registered handlers (read-only view)."""
        return self._handlers

Attributes

handlers property

Registered handlers (read-only view).

Functions

message(message_type, filter_func=None)

Decorator for registering a handler for a Redis Stream message type.

Parameters:

Name Type Description Default
message_type str

String message type (e.g., "booking.confirmed").

required
filter_func FilterFunc | None

Optional filter — callable(payload) -> bool. The handler is called only if the filter returns True.

None

Returns:

Type Description
Callable[[HandlerFunc], HandlerFunc]

Decorator returning the original handler unchanged.

Source code in src/codex_bot/redis/router.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def message(
    self,
    message_type: str,
    filter_func: FilterFunc | None = None,
) -> Callable[[HandlerFunc], HandlerFunc]:
    """
    Decorator for registering a handler for a Redis Stream message type.

    Args:
        message_type: String message type (e.g., "booking.confirmed").
        filter_func: Optional filter — callable(payload) -> bool.
                     The handler is called only if the filter returns True.

    Returns:
        Decorator returning the original handler unchanged.
    """

    def decorator(handler: HandlerFunc) -> HandlerFunc:
        if message_type not in self._handlers:
            self._handlers[message_type] = []
        self._handlers[message_type].append((handler, filter_func))
        return handler

    return decorator

Protocols

StreamStorageProtocol

StreamStorageProtocol

Bases: Protocol

Redis Stream adapter protocol for RedisStreamProcessor.

Implement this protocol in the project on top of redis-py or any other client. It abstracts the underlying Redis Stream commands (XGROUP, XREADGROUP, XACK).

Example
class RedisStreamAdapter:
    def __init__(self, redis: Redis):
        self.redis = redis

    async def create_group(self, stream: str, group: str) -> None:
        with contextlib.suppress(ResponseError):
            await self.redis.xgroup_create(stream, group, id="0", mkstream=True)

    async def read_events(self, stream_name, group_name, consumer_name, count):
        result = await self.redis.xreadgroup(
            group_name, consumer_name, {stream_name: ">"}, count=count, block=0
        )
        if not result:
            return []
        return [(msg_id, data) for _, messages in result for msg_id, data in messages]

    async def ack_event(self, stream_name, group_name, message_id) -> None:
        await self.redis.xack(stream_name, group_name, message_id)
Source code in src/codex_bot/redis/stream_processor.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
@runtime_checkable
class StreamStorageProtocol(Protocol):
    """Redis Stream adapter protocol for RedisStreamProcessor.

    Implement this protocol in the project on top of redis-py or any other client.
    It abstracts the underlying Redis Stream commands (XGROUP, XREADGROUP, XACK).

    Example:
        ```python
        class RedisStreamAdapter:
            def __init__(self, redis: Redis):
                self.redis = redis

            async def create_group(self, stream: str, group: str) -> None:
                with contextlib.suppress(ResponseError):
                    await self.redis.xgroup_create(stream, group, id="0", mkstream=True)

            async def read_events(self, stream_name, group_name, consumer_name, count):
                result = await self.redis.xreadgroup(
                    group_name, consumer_name, {stream_name: ">"}, count=count, block=0
                )
                if not result:
                    return []
                return [(msg_id, data) for _, messages in result for msg_id, data in messages]

            async def ack_event(self, stream_name, group_name, message_id) -> None:
                await self.redis.xack(stream_name, group_name, message_id)
        ```
    """

    async def create_group(self, stream_name: str, group_name: str) -> None:
        """Creates a Consumer Group (idempotently).

        Args:
            stream_name: Name of the Redis Stream.
            group_name: Name of the consumer group to create.
        """
        ...

    async def read_events(
        self,
        stream_name: str,
        group_name: str,
        consumer_name: str,
        count: int,
    ) -> list[tuple[str, dict[str, Any]]]:
        """Reads a batch of unread messages from the group.

        Uses the special ID '>' to fetch only messages that haven't been
        delivered to any other consumer in the group.

        Args:
            stream_name: Name of the stream.
            group_name: Consumer group name.
            consumer_name: Unique name of this processor instance.
            count: Maximum number of messages to fetch in one batch.

        Returns:
            List of pairs (message_id, data_dict). Empty list if no messages.
        """
        ...

    async def ack_event(self, stream_name: str, group_name: str, message_id: str) -> None:
        """Acknowledges successful message processing (XACK).

        Removing the message from the PEL (Pending Entries List) of the group.

        Args:
            stream_name: Name of the stream.
            group_name: Consumer group name.
            message_id: ID of the message to acknowledge.
        """
        ...

    async def claim_stale_events(
        self,
        stream_name: str,
        group_name: str,
        consumer_name: str,
        min_idle_time: int,
        count: int,
    ) -> list[tuple[str, dict[str, Any]]]:
        """Claims pending messages from the group that have been idle for a given time.

        Args:
            stream_name: Name of the stream.
            group_name: Consumer group name.
            consumer_name: Unique name of this processor instance (the new owner).
            min_idle_time: Minimum idle time in milliseconds before a message is claimed.
            count: Maximum number of messages to claim.

        Returns:
            List of pairs (message_id, data_dict).
        """
        ...

Functions

ack_event(stream_name, group_name, message_id) async

Acknowledges successful message processing (XACK).

Removing the message from the PEL (Pending Entries List) of the group.

Parameters:

Name Type Description Default
stream_name str

Name of the stream.

required
group_name str

Consumer group name.

required
message_id str

ID of the message to acknowledge.

required
Source code in src/codex_bot/redis/stream_processor.py
82
83
84
85
86
87
88
89
90
91
92
async def ack_event(self, stream_name: str, group_name: str, message_id: str) -> None:
    """Acknowledges successful message processing (XACK).

    Removing the message from the PEL (Pending Entries List) of the group.

    Args:
        stream_name: Name of the stream.
        group_name: Consumer group name.
        message_id: ID of the message to acknowledge.
    """
    ...

claim_stale_events(stream_name, group_name, consumer_name, min_idle_time, count) async

Claims pending messages from the group that have been idle for a given time.

Parameters:

Name Type Description Default
stream_name str

Name of the stream.

required
group_name str

Consumer group name.

required
consumer_name str

Unique name of this processor instance (the new owner).

required
min_idle_time int

Minimum idle time in milliseconds before a message is claimed.

required
count int

Maximum number of messages to claim.

required

Returns:

Type Description
list[tuple[str, dict[str, Any]]]

List of pairs (message_id, data_dict).

Source code in src/codex_bot/redis/stream_processor.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
async def claim_stale_events(
    self,
    stream_name: str,
    group_name: str,
    consumer_name: str,
    min_idle_time: int,
    count: int,
) -> list[tuple[str, dict[str, Any]]]:
    """Claims pending messages from the group that have been idle for a given time.

    Args:
        stream_name: Name of the stream.
        group_name: Consumer group name.
        consumer_name: Unique name of this processor instance (the new owner).
        min_idle_time: Minimum idle time in milliseconds before a message is claimed.
        count: Maximum number of messages to claim.

    Returns:
        List of pairs (message_id, data_dict).
    """
    ...

create_group(stream_name, group_name) async

Creates a Consumer Group (idempotently).

Parameters:

Name Type Description Default
stream_name str

Name of the Redis Stream.

required
group_name str

Name of the consumer group to create.

required
Source code in src/codex_bot/redis/stream_processor.py
50
51
52
53
54
55
56
57
async def create_group(self, stream_name: str, group_name: str) -> None:
    """Creates a Consumer Group (idempotently).

    Args:
        stream_name: Name of the Redis Stream.
        group_name: Name of the consumer group to create.
    """
    ...

read_events(stream_name, group_name, consumer_name, count) async

Reads a batch of unread messages from the group.

Uses the special ID '>' to fetch only messages that haven't been delivered to any other consumer in the group.

Parameters:

Name Type Description Default
stream_name str

Name of the stream.

required
group_name str

Consumer group name.

required
consumer_name str

Unique name of this processor instance.

required
count int

Maximum number of messages to fetch in one batch.

required

Returns:

Type Description
list[tuple[str, dict[str, Any]]]

List of pairs (message_id, data_dict). Empty list if no messages.

Source code in src/codex_bot/redis/stream_processor.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
async def read_events(
    self,
    stream_name: str,
    group_name: str,
    consumer_name: str,
    count: int,
) -> list[tuple[str, dict[str, Any]]]:
    """Reads a batch of unread messages from the group.

    Uses the special ID '>' to fetch only messages that haven't been
    delivered to any other consumer in the group.

    Args:
        stream_name: Name of the stream.
        group_name: Consumer group name.
        consumer_name: Unique name of this processor instance.
        count: Maximum number of messages to fetch in one batch.

    Returns:
        List of pairs (message_id, data_dict). Empty list if no messages.
    """
    ...

RetrySchedulerProtocol

RetrySchedulerProtocol

Bases: Protocol

Protocol for a retry scheduler (ARQ or similar).

Implement and pass to BotRedisDispatcher for automatic rescheduling of failed messages to a retry queue.

Source code in src/codex_bot/redis/dispatcher.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@runtime_checkable
class RetrySchedulerProtocol(Protocol):
    """Protocol for a retry scheduler (ARQ or similar).

    Implement and pass to BotRedisDispatcher for automatic
    rescheduling of failed messages to a retry queue.
    """

    async def schedule_retry(self, stream_name: str, payload: dict[str, Any], delay: int = 60) -> None:
        """Schedules message reprocessing.

        Args:
            stream_name: Redis Stream name.
            payload: Message data for reprocessing.
            delay: Delay in seconds before retry.
        """
        ...

Functions

schedule_retry(stream_name, payload, delay=60) async

Schedules message reprocessing.

Parameters:

Name Type Description Default
stream_name str

Redis Stream name.

required
payload dict[str, Any]

Message data for reprocessing.

required
delay int

Delay in seconds before retry.

60
Source code in src/codex_bot/redis/dispatcher.py
29
30
31
32
33
34
35
36
37
async def schedule_retry(self, stream_name: str, payload: dict[str, Any], delay: int = 60) -> None:
    """Schedules message reprocessing.

    Args:
        stream_name: Redis Stream name.
        payload: Message data for reprocessing.
        delay: Delay in seconds before retry.
    """
    ...