Skip to content

streams

streams

codex_platform.streams

Redis Streams broker — event sourcing / pub-sub layer.

Separate module from redis_service. Streams are a message broker, not a data structure.

Components:

- StreamProducer — XADD (publish events)
- StreamConsumer — XREADGROUP + XACK (low-level read)
- StreamProcessor — background polling engine (wraps StreamConsumer)
- StreamRouter — groups handlers by event type (per-feature)
- StreamDispatcher — routes messages to handlers (generic, no DI)

Typical setup::

from codex_platform.streams import (
    StreamConsumer, StreamProducer,
    StreamProcessor, StreamRouter, StreamDispatcher,
)

# Producer
producer = StreamProducer(redis_client, stream_name="events:orders")

# Consumer + Processor
consumer = StreamConsumer(redis_client, "events:orders", "workers", "worker_1")
dispatcher = StreamDispatcher()

@dispatcher.on("order.paid")
async def handle_order(payload: dict) -> None:
    ...

processor = StreamProcessor(
    storage=consumer,
    stream_name="events:orders",
    consumer_group_name="workers",
    consumer_name="worker_1",
)
processor.set_callback(dispatcher.process)
await processor.start()

Classes

StreamConsumer

Reads events from a Redis Stream via a consumer group (XREADGROUP).

Acknowledges processed messages via XACK.

Source code in src/codex_platform/streams/consumer.py
class StreamConsumer:
    """Reads events from a Redis Stream via a consumer group (XREADGROUP).

    Acknowledges processed messages via XACK.
    """

    def __init__(self, client: Redis, stream_name: str, group: str, consumer: str) -> None:
        self.client = client
        self.stream_name = stream_name
        self.group = group
        self.consumer = consumer

    async def ensure_group(self) -> None:
        """Create the consumer group if it does not already exist (XGROUP CREATE … MKSTREAM)."""
        try:
            await self.client.xgroup_create(self.stream_name, self.group, id="0", mkstream=True)
            log.info("StreamConsumer | group created stream='%s' group='%s'", self.stream_name, self.group)
        except RedisError as exc:
            # BUSYGROUP is Redis-speak for "group already exists" — treat it as success.
            if "BUSYGROUP" not in str(exc):
                raise RedisServiceError(f"Failed to create group: {exc}") from exc
            log.debug("StreamConsumer | group already exists stream='%s' group='%s'", self.stream_name, self.group)

    async def read(self, count: int = 10) -> list[StreamEvent]:
        """Read new events from the stream for the consumer group (XREADGROUP).

        Args:
            count: Maximum number of messages to fetch per call. Defaults to ``10``.

        Returns:
            List of :class:`StreamEvent` instances. Empty list if no new messages.

        Raises:
            RedisConnectionError: Redis connection failure.
            RedisServiceError: Redis operation failure.
        """
        try:
            response = await self.client.xreadgroup(
                groupname=self.group,
                consumername=self.consumer,
                streams={self.stream_name: ">"},
                count=count,
            )
            if not response:
                return []
            # Single-stream read: response is [(stream_name, [(id, fields), ...])].
            _stream, entries = response[0]
            parsed = [self._parse(entry_id, entry_fields) for entry_id, entry_fields in entries]
            log.debug("StreamConsumer | read count=%d stream='%s'", len(parsed), self.stream_name)
            return parsed
        except (ConnectionError, TimeoutError) as exc:
            raise RedisConnectionError(f"Stream consumer connection failed: {exc}") from exc
        except RedisError as exc:
            raise RedisServiceError(f"Stream consumer error: {exc}") from exc

    async def ack(self, event_id: str) -> None:
        """Acknowledge that an event has been processed (XACK).

        Args:
            event_id: Stream entry ID returned by :meth:`read`.

        Raises:
            RedisConnectionError: Redis connection failure.
            RedisServiceError: Redis operation failure.
        """
        try:
            await self.client.xack(self.stream_name, self.group, event_id)
            log.debug("StreamConsumer | ack id='%s' stream='%s'", event_id, self.stream_name)
        except (ConnectionError, TimeoutError) as exc:
            raise RedisConnectionError(f"Stream ack connection failed: {exc}") from exc
        except RedisError as exc:
            raise RedisServiceError(f"Stream ack error: {exc}") from exc

    @staticmethod
    def _parse(msg_id: Any, fields: dict[bytes | str, bytes | str]) -> StreamEvent:
        """Decode raw stream fields and split off the ``type`` label."""

        def _text(value: bytes | str) -> str:
            return value.decode() if isinstance(value, bytes) else value

        decoded = {_text(key): _text(value) for key, value in fields.items()}
        return StreamEvent(id=str(msg_id), event_type=decoded.pop("type", "unknown"), data=decoded)
Functions
ensure_group() async

Create the consumer group if it does not already exist (XGROUP CREATE … MKSTREAM).

Source code in src/codex_platform/streams/consumer.py
async def ensure_group(self) -> None:
    """Create the consumer group if it does not already exist (XGROUP CREATE … MKSTREAM)."""
    try:
        await self.client.xgroup_create(self.stream_name, self.group, id="0", mkstream=True)
        log.info("StreamConsumer | group created stream='%s' group='%s'", self.stream_name, self.group)
    except RedisError as exc:
        # BUSYGROUP is Redis-speak for "group already exists" — treat it as success.
        if "BUSYGROUP" not in str(exc):
            raise RedisServiceError(f"Failed to create group: {exc}") from exc
        log.debug("StreamConsumer | group already exists stream='%s' group='%s'", self.stream_name, self.group)
read(count=10) async

Read new events from the stream for the consumer group (XREADGROUP).

Parameters:

Name Type Description Default
count int

Maximum number of messages to fetch per call. Defaults to 10.

10

Returns:

Type Description
list[StreamEvent]

List of `StreamEvent` instances. Empty list if no new messages.

Raises:

Type Description
RedisConnectionError

Redis connection failure.

RedisServiceError

Redis operation failure.

Source code in src/codex_platform/streams/consumer.py
async def read(self, count: int = 10) -> list[StreamEvent]:
    """Read new events from the stream for the consumer group (XREADGROUP).

    Args:
        count: Maximum number of messages to fetch per call. Defaults to ``10``.

    Returns:
        List of :class:`StreamEvent` instances. Empty list if no new messages.

    Raises:
        RedisConnectionError: Redis connection failure.
        RedisServiceError: Redis operation failure.
    """
    try:
        response = await self.client.xreadgroup(
            groupname=self.group,
            consumername=self.consumer,
            streams={self.stream_name: ">"},
            count=count,
        )
        if not response:
            return []
        # Single-stream read: response is [(stream_name, [(id, fields), ...])].
        _stream, entries = response[0]
        parsed = [self._parse(entry_id, entry_fields) for entry_id, entry_fields in entries]
        log.debug("StreamConsumer | read count=%d stream='%s'", len(parsed), self.stream_name)
        return parsed
    except (ConnectionError, TimeoutError) as exc:
        raise RedisConnectionError(f"Stream consumer connection failed: {exc}") from exc
    except RedisError as exc:
        raise RedisServiceError(f"Stream consumer error: {exc}") from exc
ack(event_id) async

Acknowledge that an event has been processed (XACK).

Parameters:

Name Type Description Default
event_id str

Stream entry ID returned by `read`.

required

Raises:

Type Description
RedisConnectionError

Redis connection failure.

RedisServiceError

Redis operation failure.

Source code in src/codex_platform/streams/consumer.py
async def ack(self, event_id: str) -> None:
    """Acknowledge that an event has been processed (XACK).

    Args:
        event_id: Stream entry ID returned by :meth:`read`.

    Raises:
        RedisConnectionError: Redis connection failure.
        RedisServiceError: Redis operation failure.
    """
    try:
        await self.client.xack(self.stream_name, self.group, event_id)
        log.debug("StreamConsumer | ack id='%s' stream='%s'", event_id, self.stream_name)
    except (ConnectionError, TimeoutError) as exc:
        raise RedisConnectionError(f"Stream ack connection failed: {exc}") from exc
    except RedisError as exc:
        raise RedisServiceError(f"Stream ack error: {exc}") from exc

StreamEvent dataclass

Parsed Redis Stream event.

Source code in src/codex_platform/streams/consumer.py
@dataclass
class StreamEvent:
    """Parsed Redis Stream event."""

    # Stream entry ID as returned by Redis (e.g. "1718000000000-0").
    id: str
    # Event type label taken from the message's "type" field; "unknown" if absent.
    event_type: str
    # Remaining message fields, decoded to str -> str.
    data: dict[str, str]

RetrySchedulerProtocol

Bases: Protocol

Protocol for a retry scheduler (ARQ, Celery, etc.).

Pass to StreamDispatcher for automatic rescheduling of failed messages.

Source code in src/codex_platform/streams/dispatcher.py
@runtime_checkable
class RetrySchedulerProtocol(Protocol):
    """Protocol for a retry scheduler (ARQ, Celery, etc.).

    Pass to ``StreamDispatcher`` for automatic rescheduling of failed messages.
    """

    # Structural interface: any object exposing a matching ``schedule_retry``
    # coroutine satisfies it; ``@runtime_checkable`` also allows isinstance checks.
    async def schedule_retry(
        self,
        stream_name: str,
        payload: dict[str, Any],
        delay: int = 60,
    ) -> None:
        """Schedules message reprocessing after a delay.

        Args:
            stream_name: Redis Stream name.
            payload:     Original message data.
            delay:       Retry delay in seconds.
        """
        ...
Functions
schedule_retry(stream_name, payload, delay=60) async

Schedules message reprocessing after a delay.

Parameters:

Name Type Description Default
stream_name str

Redis Stream name.

required
payload dict[str, Any]

Original message data.

required
delay int

Retry delay in seconds.

60
Source code in src/codex_platform/streams/dispatcher.py
async def schedule_retry(
    self,
    stream_name: str,
    payload: dict[str, Any],
    delay: int = 60,
) -> None:
    """Schedules message reprocessing after a delay.

    Args:
        stream_name: Redis Stream name.
        payload:     Original message data.
        delay:       Retry delay in seconds.
    """
    ...  # Protocol stub — concrete schedulers provide the actual implementation.

StreamDispatcher

Routes Redis Stream messages to registered handlers by event type.

Handlers are registered via @dispatcher.on(event_type) decorator or by including StreamRouter instances.

On handler failure: if a retry_scheduler is provided, the message is scheduled for retry. Otherwise the exception is re-raised (message stays in PEL, unacknowledged).

Parameters:

Name Type Description Default
retry_scheduler RetrySchedulerProtocol | None

Optional retry scheduler implementing RetrySchedulerProtocol.

None

Example::

dispatcher = StreamDispatcher()

@dispatcher.on("user.registered")
async def welcome(payload: dict) -> None:
    await send_welcome_email(payload["email"])

processor.set_callback(dispatcher.process)
Source code in src/codex_platform/streams/dispatcher.py
class StreamDispatcher:
    """Routes Redis Stream messages to registered handlers by event type.

    Handlers are registered via ``@dispatcher.on(event_type)`` decorator
    or by including ``StreamRouter`` instances.

    On handler failure: if a ``retry_scheduler`` is provided, the message
    is scheduled for retry. Otherwise the exception is re-raised (message
    stays in PEL, unacknowledged).

    Args:
        retry_scheduler: Optional retry scheduler implementing ``RetrySchedulerProtocol``.

    Example::

        dispatcher = StreamDispatcher()

        @dispatcher.on("user.registered")
        async def welcome(payload: dict) -> None:
            await send_welcome_email(payload["email"])

        processor.set_callback(dispatcher.process)
    """

    def __init__(self, retry_scheduler: RetrySchedulerProtocol | None = None) -> None:
        self._retry_scheduler = retry_scheduler
        # event_type -> list of (handler, optional payload filter) pairs.
        self._handlers: dict[str, list[tuple[HandlerFunc, FilterFunc | None]]] = {}
        log.info("StreamDispatcher | initialized")

    def include_router(self, router: StreamRouter) -> None:
        """Merges handlers from a ``StreamRouter`` into this dispatcher.

        Args:
            router: Router from a feature module.
        """
        for event_type, handlers in router.handlers.items():
            if event_type not in self._handlers:
                self._handlers[event_type] = []
            # Appends after any handlers already registered for this type.
            self._handlers[event_type].extend(handlers)
        log.info("StreamDispatcher | included router types=%s", list(router.handlers.keys()))

    def on(
        self,
        event_type: str,
        filter_func: FilterFunc | None = None,
    ) -> Callable[[HandlerFunc], HandlerFunc]:
        """Decorator for registering a handler directly on the dispatcher.

        Args:
            event_type:  Stream message type (e.g. ``"booking.confirmed"``).
            filter_func: Optional ``payload -> bool`` filter.
        """

        def decorator(handler: HandlerFunc) -> HandlerFunc:
            if event_type not in self._handlers:
                self._handlers[event_type] = []
            self._handlers[event_type].append((handler, filter_func))
            # Return the handler unchanged so the decorated function stays usable.
            return handler

        return decorator

    async def process(self, payload: dict[str, Any], stream_name: str = "") -> None:
        """Dispatches an incoming message to matching handlers.

        Called by ``StreamProcessor`` on each incoming message.

        Args:
            payload:     Message data dict. Must contain ``"type"`` field.
            stream_name: Stream name (used for retry scheduling only).
        """
        event_type = payload.get("type")
        if not event_type:
            log.warning("StreamDispatcher | message without 'type' field: %s", payload)
            return

        handlers = self._handlers.get(event_type, [])
        if not handlers:
            log.debug("StreamDispatcher | no handlers for type='%s'", event_type)
            return

        for handler, filter_func in handlers:
            # The filter runs inside the try, so a raising filter is handled
            # exactly like a raising handler.
            try:
                if filter_func is None or filter_func(payload):
                    log.debug("StreamDispatcher | calling %s for type='%s'", handler.__name__, event_type)
                    await handler(payload)
            except Exception as e:
                log.error("StreamDispatcher | handler %s failed: %s", handler.__name__, e)

                if self._retry_scheduler:
                    try:
                        await self._retry_scheduler.schedule_retry(
                            stream_name=stream_name,
                            payload=payload,
                            delay=60,
                        )
                        log.info("StreamDispatcher | retry scheduled for type='%s'", event_type)
                        # Retry takes over this message: remaining handlers are
                        # skipped and the caller will ACK normally.
                        return
                    except Exception as retry_err:
                        log.error("StreamDispatcher | retry scheduling failed: %s", retry_err)

                # No scheduler (or scheduling failed): propagate so the message
                # stays unacknowledged in the PEL.
                raise
Functions
include_router(router)

Merges handlers from a StreamRouter into this dispatcher.

Parameters:

Name Type Description Default
router StreamRouter

Router from a feature module.

required
Source code in src/codex_platform/streams/dispatcher.py
def include_router(self, router: StreamRouter) -> None:
    """Merges handlers from a ``StreamRouter`` into this dispatcher.

    Args:
        router: Router from a feature module.
    """
    for event_type, router_handlers in router.handlers.items():
        # Append after any handlers already registered for this event type.
        self._handlers.setdefault(event_type, []).extend(router_handlers)
    log.info("StreamDispatcher | included router types=%s", list(router.handlers.keys()))
on(event_type, filter_func=None)

Decorator for registering a handler directly on the dispatcher.

Parameters:

Name Type Description Default
event_type str

Stream message type (e.g. "booking.confirmed").

required
filter_func FilterFunc | None

Optional payload -> bool filter.

None
Source code in src/codex_platform/streams/dispatcher.py
def on(
    self,
    event_type: str,
    filter_func: FilterFunc | None = None,
) -> Callable[[HandlerFunc], HandlerFunc]:
    """Decorator for registering a handler directly on the dispatcher.

    Args:
        event_type:  Stream message type (e.g. ``"booking.confirmed"``).
        filter_func: Optional ``payload -> bool`` filter.
    """

    def register(handler: HandlerFunc) -> HandlerFunc:
        # Handlers accumulate per event type; the decorated function is
        # returned unchanged so it stays directly callable.
        self._handlers.setdefault(event_type, []).append((handler, filter_func))
        return handler

    return register
process(payload, stream_name='') async

Dispatches an incoming message to matching handlers.

Called by StreamProcessor on each incoming message.

Parameters:

Name Type Description Default
payload dict[str, Any]

Message data dict. Must contain "type" field.

required
stream_name str

Stream name (used for retry scheduling only).

''
Source code in src/codex_platform/streams/dispatcher.py
async def process(self, payload: dict[str, Any], stream_name: str = "") -> None:
    """Dispatches an incoming message to matching handlers.

    Called by ``StreamProcessor`` on each incoming message.

    Args:
        payload:     Message data dict. Must contain ``"type"`` field.
        stream_name: Stream name (used for retry scheduling only).
    """
    event_type = payload.get("type")
    if not event_type:
        log.warning("StreamDispatcher | message without 'type' field: %s", payload)
        return

    handlers = self._handlers.get(event_type, [])
    if not handlers:
        log.debug("StreamDispatcher | no handlers for type='%s'", event_type)
        return

    for handler, filter_func in handlers:
        # The filter runs inside the try, so a raising filter is handled
        # exactly like a raising handler.
        try:
            if filter_func is None or filter_func(payload):
                log.debug("StreamDispatcher | calling %s for type='%s'", handler.__name__, event_type)
                await handler(payload)
        except Exception as e:
            log.error("StreamDispatcher | handler %s failed: %s", handler.__name__, e)

            if self._retry_scheduler:
                try:
                    await self._retry_scheduler.schedule_retry(
                        stream_name=stream_name,
                        payload=payload,
                        delay=60,
                    )
                    log.info("StreamDispatcher | retry scheduled for type='%s'", event_type)
                    # Retry takes over this message: remaining handlers are
                    # skipped and the caller will ACK normally.
                    return
                except Exception as retry_err:
                    log.error("StreamDispatcher | retry scheduling failed: %s", retry_err)

            # No scheduler (or scheduling failed): propagate so the message
            # stays unacknowledged in the PEL.
            raise

StreamProcessor

Async engine for consuming and dispatching Redis Stream events.

Runs a continuous background polling loop. On each iteration reads a batch of messages from the consumer group, dispatches each to the registered callback, and ACKs on success.

If the callback fails — message stays in PEL (unacknowledged) for future recovery. The processor recovers automatically if the consumer group is lost (NOGROUP error).

Parameters:

Name Type Description Default
storage StreamStorageProtocol

Any object implementing StreamStorageProtocol. Use StreamConsumer from streams.consumer.

required
stream_name str

Redis Stream identifier.

required
consumer_group_name str

Shared group name (same across all instances = load balancing).

required
consumer_name str

Unique name for this processor instance.

required
batch_count int

Max messages per poll cycle.

10
poll_interval float

Sleep duration (seconds) when no messages available.

1.0
TODO

PEL recovery on startup — XPENDING/XCLAIM for messages stuck in PEL after a processor crash.

Example::

consumer = StreamConsumer(redis, "events:bot", "bot_group", "worker_1")
processor = StreamProcessor(
    storage=consumer,
    stream_name="events:bot",
    consumer_group_name="bot_group",
    consumer_name="worker_1",
)
processor.set_callback(my_async_handler)
await processor.start()
# ... later
await processor.stop()
Source code in src/codex_platform/streams/processor.py
class StreamProcessor:
    """Async engine for consuming and dispatching Redis Stream events.

    Runs a continuous background polling loop. On each iteration reads a batch
    of messages from the consumer group, dispatches each to the registered
    callback, and ACKs on success.

    If the callback fails — message stays in PEL (unacknowledged) for future
    recovery. The processor recovers automatically if the consumer group is lost
    (NOGROUP error).

    Args:
        storage:             Any object implementing ``StreamStorageProtocol``.
                             Use ``StreamConsumer`` from streams.consumer.
        stream_name:         Redis Stream identifier.
        consumer_group_name: Shared group name (same across all instances = load balancing).
        consumer_name:       Unique name for this processor instance.
        batch_count:         Max messages per poll cycle.
        poll_interval:       Sleep duration (seconds) when no messages available.

    TODO:
        PEL recovery on startup — XPENDING/XCLAIM for messages stuck in PEL
        after a processor crash.

    Example::

        consumer = StreamConsumer(redis, "events:bot", "bot_group", "worker_1")
        processor = StreamProcessor(
            storage=consumer,
            stream_name="events:bot",
            consumer_group_name="bot_group",
            consumer_name="worker_1",
        )
        processor.set_callback(my_async_handler)
        await processor.start()
        # ... later
        await processor.stop()
    """

    def __init__(
        self,
        storage: StreamStorageProtocol,
        stream_name: str,
        consumer_group_name: str,
        consumer_name: str,
        batch_count: int = 10,
        poll_interval: float = 1.0,
    ) -> None:
        self.storage = storage
        self.stream_name = stream_name
        self.group_name = consumer_group_name
        self.consumer_name = consumer_name
        self.batch_count = batch_count
        self.poll_interval = poll_interval

        # Loop control flag; set False by stop() to end _consume_loop.
        self.is_running = False
        self._callback: MessageCallback | None = None
        self._task: asyncio.Task[None] | None = None

    def set_callback(self, callback: MessageCallback) -> None:
        """Registers the message handler.

        Args:
            callback: ``async def handler(payload: dict) -> None``
        """
        self._callback = callback

    async def start(self) -> None:
        """Starts the background polling loop.

        Creates the consumer group (up to 5 attempts with 3s delay),
        then spawns ``_consume_loop`` as an asyncio Task.
        """
        if self.is_running:
            log.warning("StreamProcessor | already running stream='%s'", self.stream_name)
            return

        for attempt in range(1, 6):
            try:
                await self.storage.create_group(self.stream_name, self.group_name)
                break
            except Exception as e:
                log.warning("StreamProcessor | create_group attempt %d/5: %s", attempt, e)
                if attempt < 5:
                    await asyncio.sleep(3)
                else:
                    # NOTE(review): gives up silently — the processor never starts
                    # and the caller is not told; consider raising instead.
                    log.error("StreamProcessor | failed to create group, giving up")
                    return

        self.is_running = True
        self._task = asyncio.create_task(self._consume_loop())
        log.info(
            "StreamProcessor | started stream='%s' group='%s' consumer='%s'",
            self.stream_name,
            self.group_name,
            self.consumer_name,
        )

    async def stop(self) -> None:
        """Stops the polling loop gracefully."""
        self.is_running = False
        if self._task and not self._task.done():
            self._task.cancel()
            # Await the cancelled task so shutdown completes deterministically.
            with contextlib.suppress(asyncio.CancelledError):
                await self._task
        log.info("StreamProcessor | stopped stream='%s'", self.stream_name)

    async def _consume_loop(self) -> None:
        """Polls the stream until ``is_running`` is cleared or the task is cancelled."""
        try:
            while self.is_running:
                try:
                    messages = await self.storage.read_events(
                        stream_name=self.stream_name,
                        group_name=self.group_name,
                        consumer_name=self.consumer_name,
                        count=self.batch_count,
                    )

                    if not messages:
                        # Idle: back off briefly instead of busy-polling.
                        await asyncio.sleep(self.poll_interval)
                        continue

                    for message_id, data in messages:
                        await self._process_one(message_id, data)

                except asyncio.CancelledError:
                    # Must propagate so stop() can await the cancellation.
                    raise
                except Exception as e:
                    log.error("StreamProcessor | consume loop error: %s", e)
                    # NOGROUP means the group vanished (e.g. Redis flushed) —
                    # recreate it best-effort and keep polling.
                    if "NOGROUP" in str(e):
                        log.warning("StreamProcessor | consumer group lost, recreating...")
                        with contextlib.suppress(Exception):
                            await self.storage.create_group(self.stream_name, self.group_name)
                    # Fixed 5s backoff after any loop error.
                    await asyncio.sleep(5)

        except asyncio.CancelledError:
            log.info("StreamProcessor | consume loop cancelled")
            raise

    async def _process_one(self, message_id: str, data: dict[str, Any]) -> None:
        """Processes a single message. ACKs on success, leaves in PEL on failure."""
        try:
            # NOTE(review): if no callback is registered, the message is still
            # ACKed without being processed — confirm that is intended.
            if self._callback:
                await self._callback(data)
            await self.storage.ack_event(self.stream_name, self.group_name, message_id)
            log.debug("StreamProcessor | ack id='%s'", message_id)
        except Exception as e:
            # Swallowed deliberately: the message stays in the PEL for recovery.
            log.error("StreamProcessor | failed to process message id='%s': %s", message_id, e)
Functions
set_callback(callback)

Registers the message handler.

Parameters:

Name Type Description Default
callback MessageCallback

async def handler(payload: dict) -> None

required
Source code in src/codex_platform/streams/processor.py
def set_callback(self, callback: MessageCallback) -> None:
    """Registers the message handler.

    Args:
        callback: ``async def handler(payload: dict) -> None``
    """
    self._callback = callback
start() async

Starts the background polling loop.

Creates the consumer group (up to 5 attempts with 3s delay), then spawns _consume_loop as an asyncio Task.

Source code in src/codex_platform/streams/processor.py
async def start(self) -> None:
    """Starts the background polling loop.

    Creates the consumer group (up to 5 attempts with 3s delay),
    then spawns ``_consume_loop`` as an asyncio Task.
    """
    if self.is_running:
        log.warning("StreamProcessor | already running stream='%s'", self.stream_name)
        return

    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        try:
            await self.storage.create_group(self.stream_name, self.group_name)
        except Exception as exc:
            log.warning("StreamProcessor | create_group attempt %d/5: %s", attempt, exc)
            if attempt == max_attempts:
                # Out of attempts: bail out without starting the loop.
                log.error("StreamProcessor | failed to create group, giving up")
                return
            await asyncio.sleep(3)
        else:
            break

    self.is_running = True
    self._task = asyncio.create_task(self._consume_loop())
    log.info(
        "StreamProcessor | started stream='%s' group='%s' consumer='%s'",
        self.stream_name,
        self.group_name,
        self.consumer_name,
    )
stop() async

Stops the polling loop gracefully.

Source code in src/codex_platform/streams/processor.py
async def stop(self) -> None:
    """Stops the polling loop gracefully."""
    self.is_running = False
    task = self._task
    if task is not None and not task.done():
        task.cancel()
        # Await the cancelled task so shutdown completes deterministically.
        with contextlib.suppress(asyncio.CancelledError):
            await task
    log.info("StreamProcessor | stopped stream='%s'", self.stream_name)

StreamStorageProtocol

Bases: Protocol

Adapter protocol for Redis Stream I/O.

StreamConsumer from streams.consumer implements this protocol. You can also implement it yourself for testing or alternative backends.

Source code in src/codex_platform/streams/processor.py
@runtime_checkable
class StreamStorageProtocol(Protocol):
    """Adapter protocol for Redis Stream I/O.

    ``StreamConsumer`` from ``streams.consumer`` implements this protocol.
    You can also implement it yourself for testing or alternative backends.
    """

    async def create_group(self, stream_name: str, group_name: str) -> None:
        """Creates a consumer group idempotently (XGROUP CREATE ... MKSTREAM)."""
        ...

    async def read_events(
        self,
        stream_name: str,
        group_name: str,
        consumer_name: str,
        count: int,
    ) -> list[tuple[str, dict[str, Any]]]:
        """Reads undelivered messages for the group (XREADGROUP ... >).

        Returns:
            List of (message_id, data_dict). Empty if no messages.
        """
        ...

    async def ack_event(self, stream_name: str, group_name: str, message_id: str) -> None:
        """Acknowledges message processing (XACK)."""
        ...
Functions
create_group(stream_name, group_name) async

Creates a consumer group idempotently (XGROUP CREATE ... MKSTREAM).

Source code in src/codex_platform/streams/processor.py
async def create_group(self, stream_name: str, group_name: str) -> None:
    """Creates a consumer group idempotently (XGROUP CREATE ... MKSTREAM)."""
    ...  # Protocol stub — implementations provide the actual Redis call.
read_events(stream_name, group_name, consumer_name, count) async

Reads undelivered messages for the group (XREADGROUP ... >).

Returns:

Type Description
list[tuple[str, dict[str, Any]]]

List of (message_id, data_dict). Empty if no messages.

Source code in src/codex_platform/streams/processor.py
async def read_events(
    self,
    stream_name: str,
    group_name: str,
    consumer_name: str,
    count: int,
) -> list[tuple[str, dict[str, Any]]]:
    """Reads undelivered messages for the group (XREADGROUP ... >).

    Returns:
        List of (message_id, data_dict). Empty if no messages.
    """
    ...  # Protocol stub — implementations provide the actual Redis call.
ack_event(stream_name, group_name, message_id) async

Acknowledges message processing (XACK).

Source code in src/codex_platform/streams/processor.py
async def ack_event(self, stream_name: str, group_name: str, message_id: str) -> None:
    """Acknowledges message processing (XACK)."""
    ...  # Protocol stub — implementations provide the actual Redis call.

StreamProducer

Writes events to a Redis Stream (XADD).

Automatically sanitizes the payload before writing: - bool values are converted to "True" / "False" - None values are filtered out - all remaining values are coerced to str

Source code in src/codex_platform/streams/producer.py
class StreamProducer:
    """Writes events to a Redis Stream (XADD).

    Automatically sanitizes the payload before writing:
    - ``bool`` values are converted to ``"True"`` / ``"False"``
    - ``None`` values are filtered out
    - all remaining values are coerced to ``str``
    """

    def __init__(self, client: Redis, stream_name: str) -> None:
        self.client = client
        self.stream_name = stream_name

    async def add_event(self, event_type: str, data: dict[str, Any]) -> str:
        """Append an event to the Redis Stream (XADD).

        Args:
            event_type: Event type label, e.g. ``"new_appointment"`` or ``"new_contact"``.
            data: Event payload. ``None`` values are filtered out automatically.

        Returns:
            The ID of the newly added stream entry (e.g. ``"1718000000000-0"``).

        Raises:
            RedisConnectionError: Redis connection failure.
            RedisServiceError: Redis operation failure.
        """
        # "type" comes first so a "type" key inside `data` deliberately overrides it.
        payload = self._sanitize({"type": event_type, **data})
        try:
            result = await self.client.xadd(self.stream_name, payload)
            log.info("StreamProducer | added event_type='%s' id='%s' stream='%s'", event_type, result, self.stream_name)
            return str(result)
        except (ConnectionError, TimeoutError) as e:
            raise RedisConnectionError(f"Stream producer connection failed: {e}") from e
        except RedisError as e:
            raise RedisServiceError(f"Stream producer error: {e}") from e

    @staticmethod
    def _sanitize(data: dict[str, Any]) -> dict[str, str]:
        """Convert all values to str and filter out None entries.

        ``str(True)`` / ``str(False)`` already produce ``"True"`` / ``"False"``,
        so booleans need no special-casing.
        """
        return {k: str(v) for k, v in data.items() if v is not None}
Functions
add_event(event_type, data) async

Append an event to the Redis Stream (XADD).

Parameters:

Name Type Description Default
event_type str

Event type label, e.g. "new_appointment" or "new_contact".

required
data dict[str, Any]

Event payload. None values are filtered out automatically.

required

Returns:

Type Description
str

The ID of the newly added stream entry (e.g. "1718000000000-0").

Raises:

Type Description
RedisConnectionError

Redis connection failure.

RedisServiceError

Redis operation failure.

Source code in src/codex_platform/streams/producer.py
async def add_event(self, event_type: str, data: dict[str, Any]) -> str:
    """Publish one event onto the configured Redis Stream via XADD.

    Args:
        event_type: Label stored in the entry's ``"type"`` field,
                    e.g. ``"new_appointment"`` or ``"new_contact"``.
        data: Remaining event fields; ``None`` values are dropped during
              sanitization before the write.

    Returns:
        The Redis-assigned ID of the new stream entry
        (e.g. ``"1718000000000-0"``).

    Raises:
        RedisConnectionError: Redis connection failure.
        RedisServiceError: Redis operation failure.
    """
    # Merge order lets a "type" key inside `data` take precedence.
    entry = self._sanitize({"type": event_type, **data})
    try:
        new_id = await self.client.xadd(self.stream_name, entry)
    except (ConnectionError, TimeoutError) as e:
        raise RedisConnectionError(f"Stream producer connection failed: {e}") from e
    except RedisError as e:
        raise RedisServiceError(f"Stream producer error: {e}") from e
    else:
        log.info("StreamProducer | added event_type='%s' id='%s' stream='%s'", event_type, new_id, self.stream_name)
        return str(new_id)

StreamRouter

Groups Redis Stream event handlers by message type.

After populating, include into a StreamDispatcher via include_router().

Example::

router = StreamRouter()

@router.on("order.paid")
async def on_order_paid(payload: dict) -> None:
    ...

dispatcher.include_router(router)
Source code in src/codex_platform/streams/router.py
class StreamRouter:
    """Groups Redis Stream event handlers by message type.

    After populating, include into a ``StreamDispatcher`` via ``include_router()``.

    Example::

        router = StreamRouter()

        @router.on("order.paid")
        async def on_order_paid(payload: dict) -> None:
            ...

        dispatcher.include_router(router)
    """

    def __init__(self) -> None:
        # event_type -> list of (handler, optional filter), in registration order.
        self._handlers: dict[str, list[tuple[HandlerFunc, FilterFunc | None]]] = {}

    def on(
        self,
        event_type: str,
        filter_func: FilterFunc | None = None,
    ) -> Callable[[HandlerFunc], HandlerFunc]:
        """Decorator for registering a handler for an event type.

        Args:
            event_type:  Stream message type (e.g. ``"booking.confirmed"``).
            filter_func: Optional ``payload -> bool`` filter. Handler is called
                         only when filter returns ``True``.

        Returns:
            Decorator (returns the original handler unchanged).
        """

        def decorator(handler: HandlerFunc) -> HandlerFunc:
            # setdefault replaces the manual membership check; append keeps order.
            self._handlers.setdefault(event_type, []).append((handler, filter_func))
            return handler

        return decorator

    @property
    def handlers(self) -> dict[str, list[tuple[HandlerFunc, FilterFunc | None]]]:
        """Registered handlers (read-only by convention — returns the live internal dict)."""
        return self._handlers
Attributes
handlers property

Registered handlers (read-only).

Functions
on(event_type, filter_func=None)

Decorator for registering a handler for an event type.

Parameters:

Name Type Description Default
event_type str

Stream message type (e.g. "booking.confirmed").

required
filter_func FilterFunc | None

Optional payload -> bool filter. Handler is called only when filter returns True.

None

Returns:

Type Description
Callable[[HandlerFunc], HandlerFunc]

Decorator (returns the original handler unchanged).

Source code in src/codex_platform/streams/router.py
def on(
    self,
    event_type: str,
    filter_func: FilterFunc | None = None,
) -> Callable[[HandlerFunc], HandlerFunc]:
    """Register a handler for messages of the given event type.

    Args:
        event_type:  Stream message type (e.g. ``"booking.confirmed"``).
        filter_func: Optional ``payload -> bool`` predicate; when supplied,
                     the handler runs only for payloads it accepts.

    Returns:
        Decorator that records the handler and returns it unchanged.
    """

    def register(handler: HandlerFunc) -> HandlerFunc:
        self._handlers.setdefault(event_type, []).append((handler, filter_func))
        return handler

    return register