Skip to content

redis — Redis Stream integration

RedisRouter

RedisRouter

Router for grouping Redis Stream handlers by message type.

Used in feature_setting.py of Redis features. After creation, it is connected to BotRedisDispatcher via include_router().

Example
redis_router = RedisRouter()

@redis_router.message("notification.created")
async def handle_notification(payload: dict, container) -> None:
    user_id = payload["user_id"]
    ...

# With a filter:
@redis_router.message("notification.created", filter_func=lambda p: p.get("urgent"))
async def handle_urgent(payload: dict, container) -> None:
    ...
Source code in src/codex_bot/redis/router.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class RedisRouter:
    """
    Router for grouping Redis Stream handlers by message type.

    Used in feature_setting.py of Redis features. After creation,
    it is connected to BotRedisDispatcher via include_router().

    Example:
        ```python
        redis_router = RedisRouter()

        @redis_router.message("notification.created")
        async def handle_notification(payload: dict, container) -> None:
            user_id = payload["user_id"]
            ...

        # With a filter:
        @redis_router.message("notification.created", filter_func=lambda p: p.get("urgent"))
        async def handle_urgent(payload: dict, container) -> None:
            ...
        ```
    """

    def __init__(self) -> None:
        # {message_type: [(handler, filter_func), ...]}
        self._handlers: dict[str, list[tuple[HandlerFunc, FilterFunc | None]]] = {}

    def message(
        self,
        message_type: str,
        filter_func: FilterFunc | None = None,
    ) -> Callable[[HandlerFunc], HandlerFunc]:
        """
        Decorator for registering a handler for a Redis Stream message type.

        Args:
            message_type: String message type (e.g., "booking.confirmed").
            filter_func: Optional filter — callable(payload) -> bool.
                         The handler is called only if the filter returns True.

        Returns:
            Decorator returning the original handler unchanged.
        """

        def decorator(handler: HandlerFunc) -> HandlerFunc:
            # setdefault creates the per-type bucket on first registration.
            self._handlers.setdefault(message_type, []).append((handler, filter_func))
            return handler

        return decorator

    @property
    def handlers(self) -> dict[str, list[tuple[HandlerFunc, FilterFunc | None]]]:
        """
        Registered handlers (read-only snapshot).

        Returns a copy (outer dict and per-type lists are freshly built),
        so mutating the result cannot corrupt the router's registry —
        previously the internal dict itself was exposed despite the
        documented "read-only" contract.
        """
        return {msg_type: list(entries) for msg_type, entries in self._handlers.items()}

Attributes

handlers property

Registered handlers (read-only view).

Functions

message(message_type, filter_func=None)

Decorator for registering a handler for a Redis Stream message type.

Parameters:

Name Type Description Default
message_type str

String message type (e.g., "booking.confirmed").

required
filter_func FilterFunc | None

Optional filter — callable(payload) -> bool. The handler is called only if the filter returns True.

None

Returns:

Type Description
Callable[[HandlerFunc], HandlerFunc]

Decorator returning the original handler unchanged.

Source code in src/codex_bot/redis/router.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def message(
    self,
    message_type: str,
    filter_func: FilterFunc | None = None,
) -> Callable[[HandlerFunc], HandlerFunc]:
    """
    Register a handler for one Redis Stream message type via decorator.

    Args:
        message_type: String message type (e.g., "booking.confirmed").
        filter_func: Optional filter — callable(payload) -> bool.
                     The handler runs only when the filter returns True.

    Returns:
        Decorator that records the handler and returns it unchanged.
    """

    def decorator(handler: HandlerFunc) -> HandlerFunc:
        bucket = self._handlers.setdefault(message_type, [])
        bucket.append((handler, filter_func))
        return handler

    return decorator

BotRedisDispatcher

BotRedisDispatcher

Redis Stream message dispatcher for a Telegram bot.

Allows registering handlers via decorators (@dispatcher.on_message) or via routers (include_router). When processing a message:

  1. Determines the type by the "type" field in the payload.
  2. Finds suitable handlers (considering filters).
  3. On handler error — if a retry_scheduler exists, passes the task to it and returns control (Stream ACK will occur in the processor). If there is no scheduler or it fails — re-raises the exception (no ACK).

Parameters:

Name Type Description Default
retry_scheduler RetrySchedulerProtocol | None

Optional retry scheduler (ARQ, etc.).

None
Example
dispatcher = BotRedisDispatcher()

@dispatcher.on_message("booking.confirmed")
async def on_booking(payload: dict, container) -> None:
    await container.notification_service.send(payload["user_id"])

# Connecting a router from a feature:
dispatcher.include_router(notifications_redis_router)

# Setting dependencies before start:
dispatcher.setup(container=container)
await dispatcher.process_message({"type": "booking.confirmed", ...})
Source code in src/codex_bot/redis/dispatcher.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class BotRedisDispatcher:
    """
    Redis Stream message dispatcher for a Telegram bot.

    Allows registering handlers via decorators (@dispatcher.on_message)
    or via routers (include_router). When processing a message:

    1. Determines the type by the ``"type"`` field in the payload.
    2. Finds suitable handlers (considering filters).
    3. On handler error — if a ``retry_scheduler`` exists, passes the
       task to it and returns control (Stream ACK will occur in the processor).
       If there is no scheduler or it fails — re-raises the exception (no ACK).

    Args:
        retry_scheduler: Optional retry scheduler (ARQ, etc.).

    Example:
        ```python
        dispatcher = BotRedisDispatcher()

        @dispatcher.on_message("booking.confirmed")
        async def on_booking(payload: dict, container) -> None:
            await container.notification_service.send(payload["user_id"])

        # Connecting a router from a feature:
        dispatcher.include_router(notifications_redis_router)

        # Setting dependencies before start:
        dispatcher.setup(container=container)
        await dispatcher.process_message({"type": "booking.confirmed", ...})
        ```
    """

    def __init__(self, retry_scheduler: RetrySchedulerProtocol | None = None) -> None:
        self._container: Any = None
        self._retry_scheduler = retry_scheduler
        # {message_type: [(handler, filter_func), ...]}
        self._handlers: dict[str, list[tuple[HandlerFunc, FilterFunc | None]]] = {}
        log.info("BotRedisDispatcher | initialized")

    def setup(self, container: Any) -> None:
        """
        Sets the DI container before processing begins.

        If a handler needs a Bot to send messages — it retrieves it
        directly from ``container.bot``.

        Args:
            container: Project's DI container.
        """
        self._container = container

    def include_router(self, router: RedisRouter) -> None:
        """
        Connects a RedisRouter with its handlers.

        Args:
            router: Router from a feature.
        """
        for message_type, handlers in router.handlers.items():
            self._handlers.setdefault(message_type, []).extend(handlers)
        log.info(f"BotRedisDispatcher | included router types={list(router.handlers.keys())}")

    def on_message(
        self,
        message_type: str,
        filter_func: "FilterFunc | None" = None,
    ) -> Callable[[HandlerFunc], HandlerFunc]:
        """
        Decorator for registering a handler directly in the dispatcher.

        Args:
            message_type: Redis Stream message type.
            filter_func: Optional payload -> bool filter.

        Returns:
            Decorator.
        """

        def decorator(handler: HandlerFunc) -> HandlerFunc:
            self._handlers.setdefault(message_type, []).append((handler, filter_func))
            return handler

        return decorator

    async def process_message(self, message_data: dict[str, Any]) -> None:
        """
        Dispatches an incoming Redis Stream message.

        Args:
            message_data: Message payload. Required field: ``"type"``.
        """
        # Compare with `is None`, not truthiness: a valid container that
        # happens to be falsy must not be mistaken for "setup() never called".
        if self._container is None:
            log.error("BotRedisDispatcher | container not set — call setup() first")
            return

        msg_type = message_data.get("type")
        if not msg_type:
            log.warning(f"BotRedisDispatcher | message without 'type' field: {message_data}")
            return

        handlers = self._handlers.get(msg_type, [])
        if not handlers:
            log.debug(f"BotRedisDispatcher | no handlers for type='{msg_type}'")
            return

        for handler, filter_func in handlers:
            try:
                if filter_func is None or filter_func(message_data):
                    log.debug(f"BotRedisDispatcher | calling {handler.__name__} for type='{msg_type}'")
                    await handler(message_data, self._container)
            except Exception as e:
                log.error(f"BotRedisDispatcher | handler {handler.__name__} failed: {e}")

                if self._retry_scheduler:
                    try:
                        await self._retry_scheduler.schedule_retry(
                            stream_name="bot_events",
                            payload=message_data,
                            delay=60,
                        )
                        log.info(f"BotRedisDispatcher | retry scheduled for type='{msg_type}'")
                        # Responsibility passed to scheduler — ACK will occur in processor.
                        # NOTE: the whole payload is rescheduled, so handlers that already
                        # succeeded in this pass will run again on retry; handlers should
                        # be idempotent.
                        return
                    except Exception as retry_err:
                        log.error(f"BotRedisDispatcher | retry scheduling failed: {retry_err}")

                # No scheduler or it failed — re-raise, no ACK (PEL)
                raise

Functions

include_router(router)

Connects a RedisRouter with its handlers.

Parameters:

Name Type Description Default
router RedisRouter

Router from a feature.

required
Source code in src/codex_bot/redis/dispatcher.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def include_router(self, router: RedisRouter) -> None:
    """
    Attach every handler registered on a feature's RedisRouter.

    Args:
        router: Router from a feature.
    """
    for message_type, new_handlers in router.handlers.items():
        self._handlers.setdefault(message_type, []).extend(new_handlers)
    log.info(f"BotRedisDispatcher | included router types={list(router.handlers.keys())}")

on_message(message_type, filter_func=None)

Decorator for registering a handler directly in the dispatcher.

Parameters:

Name Type Description Default
message_type str

Redis Stream message type.

required
filter_func FilterFunc | None

Optional payload -> bool filter.

None

Returns:

Type Description
Callable[[HandlerFunc], HandlerFunc]

Decorator.

Source code in src/codex_bot/redis/dispatcher.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def on_message(
    self,
    message_type: str,
    filter_func: "FilterFunc | None" = None,
) -> Callable[[HandlerFunc], HandlerFunc]:
    """
    Register a handler directly on the dispatcher via decorator.

    Args:
        message_type: Redis Stream message type.
        filter_func: Optional payload -> bool filter.

    Returns:
        Decorator.
    """

    def decorator(handler: HandlerFunc) -> HandlerFunc:
        bucket = self._handlers.setdefault(message_type, [])
        bucket.append((handler, filter_func))
        return handler

    return decorator

process_message(message_data) async

Dispatches an incoming Redis Stream message.

Parameters:

Name Type Description Default
message_data dict[str, Any]

Message payload. Required field: "type".

required
Source code in src/codex_bot/redis/dispatcher.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
async def process_message(self, message_data: dict[str, Any]) -> None:
    """
    Dispatches an incoming Redis Stream message.

    Args:
        message_data: Message payload. Required field: ``"type"``.
    """
    # Compare with `is None`, not truthiness: a valid container that
    # happens to be falsy must not be mistaken for "setup() never called".
    if self._container is None:
        log.error("BotRedisDispatcher | container not set — call setup() first")
        return

    msg_type = message_data.get("type")
    if not msg_type:
        log.warning(f"BotRedisDispatcher | message without 'type' field: {message_data}")
        return

    handlers = self._handlers.get(msg_type, [])
    if not handlers:
        log.debug(f"BotRedisDispatcher | no handlers for type='{msg_type}'")
        return

    for handler, filter_func in handlers:
        try:
            if filter_func is None or filter_func(message_data):
                log.debug(f"BotRedisDispatcher | calling {handler.__name__} for type='{msg_type}'")
                await handler(message_data, self._container)
        except Exception as e:
            log.error(f"BotRedisDispatcher | handler {handler.__name__} failed: {e}")

            if self._retry_scheduler:
                try:
                    await self._retry_scheduler.schedule_retry(
                        stream_name="bot_events",
                        payload=message_data,
                        delay=60,
                    )
                    log.info(f"BotRedisDispatcher | retry scheduled for type='{msg_type}'")
                    # Responsibility passed to scheduler — ACK will occur in processor.
                    # NOTE: the whole payload is rescheduled, so handlers that already
                    # succeeded in this pass will run again on retry; handlers should
                    # be idempotent.
                    return
                except Exception as retry_err:
                    log.error(f"BotRedisDispatcher | retry scheduling failed: {retry_err}")

            # No scheduler or it failed — re-raise, no ACK (PEL)
            raise

setup(container)

Sets the DI container before processing begins.

If a handler needs a Bot to send messages — it retrieves it directly from container.bot.

Parameters:

Name Type Description Default
container Any

Project's DI container.

required
Source code in src/codex_bot/redis/dispatcher.py
77
78
79
80
81
82
83
84
85
86
87
def setup(self, container: Any) -> None:
    """
    Sets the DI container before processing begins.

    If a handler needs a Bot to send messages — it retrieves it
    directly from ``container.bot``.

    Args:
        container: Project's DI container. Stored as-is; no validation
            is performed here.
    """
    self._container = container

RedisStreamProcessor

RedisStreamProcessor

Redis Stream processor: reads messages and passes them to a callback.

Starts a background asyncio task (_consume_loop) that continuously reads messages from Redis Stream via a Consumer Group. On errors — recreates the group (e.g., after a Redis restart).

Parameters:

Name Type Description Default
storage StreamStorageProtocol

Stream adapter (StreamStorageProtocol).

required
stream_name str

Redis Stream name (e.g., "bot_events").

required
consumer_group_name str

Consumer Group name.

required
consumer_name str

Unique consumer name (e.g., "bot-worker-1").

required
batch_count int

Number of messages per reading cycle.

10
poll_interval float

Wait interval (sec) if there are no messages.

1.0
Example
processor = RedisStreamProcessor(
    storage=redis_stream_adapter,
    stream_name="bot_events",
    consumer_group_name="bot_group",
    consumer_name="bot-1",
)
processor.set_message_callback(dispatcher.process_message)
await processor.start_listening()
Source code in src/codex_bot/redis/stream_processor.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
class RedisStreamProcessor:
    """
    Redis Stream processor: reads messages and passes them to a callback.

    Starts a background asyncio task (_consume_loop) that continuously
    reads messages from Redis Stream via a Consumer Group.
    On errors — recreates the group (e.g., after a Redis restart).

    Args:
        storage: Stream adapter (StreamStorageProtocol).
        stream_name: Redis Stream name (e.g., "bot_events").
        consumer_group_name: Consumer Group name.
        consumer_name: Unique consumer name (e.g., "bot-worker-1").
        batch_count: Number of messages per reading cycle.
        poll_interval: Wait interval (sec) if there are no messages.

    Example:
        ```python
        processor = RedisStreamProcessor(
            storage=redis_stream_adapter,
            stream_name="bot_events",
            consumer_group_name="bot_group",
            consumer_name="bot-1",
        )
        processor.set_message_callback(dispatcher.process_message)
        await processor.start_listening()
        ```
    """

    def __init__(
        self,
        storage: StreamStorageProtocol,
        stream_name: str,
        consumer_group_name: str,
        consumer_name: str,
        batch_count: int = 10,
        poll_interval: float = 1.0,
    ) -> None:
        self.storage = storage
        self.stream_name = stream_name
        self.group_name = consumer_group_name
        self.consumer_name = consumer_name
        self.batch_count = batch_count
        self.poll_interval = poll_interval

        # Runtime state: is_running is the loop's continue flag,
        # _callback is invoked per message, _task is the background
        # asyncio task created by start_listening().
        self.is_running = False
        self._callback: MessageCallback | None = None
        self._task: asyncio.Task[None] | None = None

    def set_message_callback(self, callback: MessageCallback) -> None:
        """
        Sets the callback for processing each message.

        Args:
            callback: Async callable(payload: dict) -> None.
        """
        self._callback = callback

    async def start_listening(self) -> None:
        """
        Starts the background Stream reading loop.

        Creates a Consumer Group (up to 5 attempts), then starts _consume_loop
        as an asyncio Task.
        """
        if self.is_running:
            log.warning("RedisStreamProcessor | already running")
            return

        # Retry group creation up to 5 times with a 3s pause; give up
        # (and do not start the loop) if every attempt fails.
        for attempt in range(1, 6):
            try:
                await self.storage.create_group(self.stream_name, self.group_name)
                break
            except Exception as e:
                log.warning(f"RedisStreamProcessor | create_group attempt {attempt}/5: {e}")
                if attempt < 5:
                    await asyncio.sleep(3)
                else:
                    log.error("RedisStreamProcessor | failed to create group, giving up")
                    return

        self.is_running = True
        self._task = asyncio.create_task(self._consume_loop())
        log.info(
            f"RedisStreamProcessor | listening stream='{self.stream_name}' "
            f"group='{self.group_name}' consumer='{self.consumer_name}'"
        )

    async def stop_listening(self) -> None:
        """Stops the reading loop and correctly cancels the asyncio Task."""
        self.is_running = False
        if self._task and not self._task.done():
            self._task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._task
        log.info("RedisStreamProcessor | stopped")

    async def _consume_loop(self) -> None:
        """Background loop: read message batches and process them one by one.

        Runs until ``is_running`` is cleared or the task is cancelled.
        On a read error, recreates the consumer group when Redis reports
        NOGROUP, then backs off for 5 seconds before retrying.
        """
        try:
            while self.is_running:
                try:
                    messages = await self.storage.read_events(
                        stream_name=self.stream_name,
                        group_name=self.group_name,
                        consumer_name=self.consumer_name,
                        count=self.batch_count,
                    )

                    # No new messages — idle-wait before polling again.
                    if not messages:
                        await asyncio.sleep(self.poll_interval)
                        continue

                    for message_id, data in messages:
                        await self._process_single(message_id, data)

                except asyncio.CancelledError:
                    raise  # propagate immediately, do not catch in general except
                except Exception as e:
                    log.error(f"RedisStreamProcessor | consume loop error: {e}")
                    # NOGROUP in the error text means the group disappeared
                    # (e.g., Redis restarted without persistence) — recreate it.
                    if "NOGROUP" in str(e):
                        log.warning("RedisStreamProcessor | consumer group missing, recreating...")
                        try:
                            await self.storage.create_group(self.stream_name, self.group_name)
                        except Exception as create_err:
                            log.error(f"RedisStreamProcessor | recreate failed: {create_err}")
                    await asyncio.sleep(5)
        except asyncio.CancelledError:
            log.info("RedisStreamProcessor | consume loop cancelled")
            raise  # asyncio requires propagating CancelledError

    async def _process_single(self, message_id: str, data: dict[str, Any]) -> None:
        """Invoke the callback for one message and ACK it on success.

        If the callback raises, the error is logged and the message is NOT
        acknowledged — it remains in the group's PEL; the loop continues
        with the next message.
        """
        try:
            if self._callback:
                await self._callback(data)
            await self.storage.ack_event(self.stream_name, self.group_name, message_id)
        except Exception as e:
            log.error(f"RedisStreamProcessor | failed to process message {message_id}: {e}")

Functions

set_message_callback(callback)

Sets the callback for processing each message.

Parameters:

Name Type Description Default
callback MessageCallback

Async callable(payload: dict) -> None.

required
Source code in src/codex_bot/redis/stream_processor.py
123
124
125
126
127
128
129
130
def set_message_callback(self, callback: MessageCallback) -> None:
    """
    Register the coroutine invoked for every message read from the stream.

    Args:
        callback: Async callable(payload: dict) -> None.
    """
    self._callback = callback

start_listening() async

Starts the background Stream reading loop.

Creates a Consumer Group (up to 5 attempts), then starts _consume_loop as an asyncio Task.

Source code in src/codex_bot/redis/stream_processor.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
async def start_listening(self) -> None:
    """
    Start the background Stream reading loop.

    Creates the Consumer Group (up to 5 attempts, 3s apart), then launches
    _consume_loop as an asyncio Task. Does nothing if already running;
    gives up without starting if group creation keeps failing.
    """
    if self.is_running:
        log.warning("RedisStreamProcessor | already running")
        return

    attempt = 0
    while True:
        attempt += 1
        try:
            await self.storage.create_group(self.stream_name, self.group_name)
            break
        except Exception as e:
            log.warning(f"RedisStreamProcessor | create_group attempt {attempt}/5: {e}")
            if attempt >= 5:
                log.error("RedisStreamProcessor | failed to create group, giving up")
                return
            await asyncio.sleep(3)

    self.is_running = True
    self._task = asyncio.create_task(self._consume_loop())
    log.info(
        f"RedisStreamProcessor | listening stream='{self.stream_name}' "
        f"group='{self.group_name}' consumer='{self.consumer_name}'"
    )

stop_listening() async

Stops the reading loop and correctly cancels the asyncio Task.

Source code in src/codex_bot/redis/stream_processor.py
162
163
164
165
166
167
168
169
async def stop_listening(self) -> None:
    """Clear the run flag and await cancellation of the background task."""
    self.is_running = False
    task = self._task
    if task is not None and not task.done():
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task
    log.info("RedisStreamProcessor | stopped")

Protocols

RetrySchedulerProtocol

Protocol for a retry scheduler (ARQ or similar).

Implement and pass to BotRedisDispatcher for automatic rescheduling of failed messages to a retry queue.

Source code in src/codex_bot/redis/dispatcher.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class RetrySchedulerProtocol:
    """
    Protocol for a retry scheduler (ARQ or similar).

    Implement and pass to BotRedisDispatcher for automatic
    rescheduling of failed messages to a retry queue.

    NOTE(review): despite the name, this is a plain base class rather than
    a ``typing.Protocol`` (StreamStorageProtocol in stream_processor.py does
    use ``Protocol``) — consider migrating for consistency; confirm no code
    relies on inheriting from this class.
    """

    async def schedule_retry(self, stream_name: str, payload: dict[str, Any], delay: int = 60) -> None:
        """
        Schedules message reprocessing.

        Args:
            stream_name: Redis Stream name.
            payload: Message data for reprocessing.
            delay: Delay in seconds before retry.
        """
        # Stub body — implementations override this method.
        ...

Functions

schedule_retry(stream_name, payload, delay=60) async

Schedules message reprocessing.

Parameters:

Name Type Description Default
stream_name str

Redis Stream name.

required
payload dict[str, Any]

Message data for reprocessing.

required
delay int

Delay in seconds before retry.

60
Source code in src/codex_bot/redis/dispatcher.py
26
27
28
29
30
31
32
33
34
35
async def schedule_retry(self, stream_name: str, payload: dict[str, Any], delay: int = 60) -> None:
    """
    Schedules message reprocessing.

    Args:
        stream_name: Redis Stream name.
        payload: Message data for reprocessing.
        delay: Delay in seconds before retry.
    """
    # Stub body — implementations override this method.
    ...

StreamStorageProtocol

Bases: Protocol

Redis Stream adapter protocol for RedisStreamProcessor.

Implement in the project on top of redis-py or any other client.

Example
class RedisStreamAdapter:
    def __init__(self, redis: Redis): self.redis = redis

    async def create_group(self, stream: str, group: str) -> None:
        with contextlib.suppress(ResponseError):
            await self.redis.xgroup_create(stream, group, id="0", mkstream=True)

    async def read_events(self, stream_name, group_name, consumer_name, count):
        result = await self.redis.xreadgroup(
            group_name, consumer_name, {stream_name: ">"}, count=count, block=0
        )
        if not result:
            return []
        return [(msg_id, data) for _, messages in result for msg_id, data in messages]

    async def ack_event(self, stream_name, group_name, message_id) -> None:
        await self.redis.xack(stream_name, group_name, message_id)
Source code in src/codex_bot/redis/stream_processor.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@runtime_checkable
class StreamStorageProtocol(Protocol):
    """
    Redis Stream adapter protocol for RedisStreamProcessor.

    Implement in the project on top of redis-py or any other client.

    Example:
        ```python
        class RedisStreamAdapter:
            def __init__(self, redis: Redis): self.redis = redis

            async def create_group(self, stream: str, group: str) -> None:
                with contextlib.suppress(ResponseError):
                    await self.redis.xgroup_create(stream, group, id="0", mkstream=True)

            async def read_events(self, stream_name, group_name, consumer_name, count):
                result = await self.redis.xreadgroup(
                    group_name, consumer_name, {stream_name: ">"}, count=count, block=0
                )
                if not result:
                    return []
                return [(msg_id, data) for _, messages in result for msg_id, data in messages]

            async def ack_event(self, stream_name, group_name, message_id) -> None:
                await self.redis.xack(stream_name, group_name, message_id)
        ```
    """

    async def create_group(self, stream_name: str, group_name: str) -> None:
        """Creates a Consumer Group (idempotently).

        Args:
            stream_name: Redis Stream key the group is attached to.
            group_name: Consumer Group name.
        """
        ...

    async def read_events(
        self,
        stream_name: str,
        group_name: str,
        consumer_name: str,
        count: int,
    ) -> list[tuple[str, dict[str, Any]]]:
        """
        Reads a batch of unread messages from the group.

        Args:
            stream_name: Redis Stream key.
            group_name: Consumer Group name.
            consumer_name: Consumer name within the group.
            count: Maximum number of messages to read.

        Returns:
            List of pairs (message_id, data_dict).
        """
        ...

    async def ack_event(self, stream_name: str, group_name: str, message_id: str) -> None:
        """Acknowledges message processing (XACK).

        Args:
            stream_name: Redis Stream key.
            group_name: Consumer Group name.
            message_id: ID of the message to acknowledge.
        """
        ...
Functions

ack_event(stream_name, group_name, message_id) async

Acknowledges message processing (XACK).

Source code in src/codex_bot/redis/stream_processor.py
66
67
68
async def ack_event(self, stream_name: str, group_name: str, message_id: str) -> None:
    """Acknowledges message processing (XACK).

    Args:
        stream_name: Redis Stream key.
        group_name: Consumer Group name.
        message_id: ID of the message to acknowledge.
    """
    ...

create_group(stream_name, group_name) async

Creates a Consumer Group (idempotently).

Source code in src/codex_bot/redis/stream_processor.py
47
48
49
async def create_group(self, stream_name: str, group_name: str) -> None:
    """Creates a Consumer Group (idempotently).

    Args:
        stream_name: Redis Stream key the group is attached to.
        group_name: Consumer Group name.
    """
    ...

read_events(stream_name, group_name, consumer_name, count) async

Reads a batch of unread messages from the group.

Returns:

Type Description
list[tuple[str, dict[str, Any]]]

List of pairs (message_id, data_dict).

Source code in src/codex_bot/redis/stream_processor.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
async def read_events(
    self,
    stream_name: str,
    group_name: str,
    consumer_name: str,
    count: int,
) -> list[tuple[str, dict[str, Any]]]:
    """
    Reads a batch of unread messages from the group.

    Args:
        stream_name: Redis Stream key.
        group_name: Consumer Group name.
        consumer_name: Consumer name within the group.
        count: Maximum number of messages to read.

    Returns:
        List of pairs (message_id, data_dict).
    """
    ...