Skip to content

streams

streams

codex_platform.streams

Redis Streams broker — event sourcing / pub-sub layer.

Separate module from redis_service. Streams are a message broker, not a data structure.

Components:

- StreamProducer — XADD (publish events)
- StreamConsumer — XREADGROUP + XACK (low-level read)
- StreamProcessor — background polling engine (wraps StreamConsumer)
- StreamRouter — groups handlers by event type (per-feature)
- StreamDispatcher — routes messages to handlers (generic, no DI)
- StreamRuntime — wires producer/consumer/dispatcher/processor with logical groups

Typical setup::

from codex_platform.streams import (
    StreamConsumer, StreamProducer,
    StreamProcessor, StreamRouter, StreamDispatcher,
    StreamRuntime, StreamRuntimeConfig,
)

runtime = StreamRuntime(
    redis_client,
    StreamRuntimeConfig("events:orders", "monolith", "worker_1"),
)

router = StreamRouter()

@router.on("order.paid", group="orders")
async def handle_order(payload: dict) -> None:
    ...

runtime.include_router(router)
await runtime.start()

Classes

StreamConsumer

Reads events from a Redis Stream via a consumer group (XREADGROUP).

Acknowledges processed messages via XACK.

Source code in src/codex_platform/streams/consumer.py
class StreamConsumer:
    """Reads events from a Redis Stream via a consumer group (XREADGROUP).

    Acknowledges processed messages via XACK.

    ``create_group`` / ``read_events`` / ``ack_event`` mirror the
    ``StreamStorageProtocol`` method names and accept per-call stream, group
    and consumer names. Overrides are applied to a shallow copy of the
    consumer, so this instance's ``stream_name`` / ``group`` / ``consumer``
    are never rebound while a call is awaiting Redis.
    """

    def __init__(self, client: Redis, stream_name: str, group: str, consumer: str) -> None:
        self.client = client
        self.stream_name = stream_name
        self.group = group
        self.consumer = consumer

    def _scoped(
        self,
        *,
        stream_name: str | None = None,
        group: str | None = None,
        consumer: str | None = None,
    ) -> "StreamConsumer":
        """Return a consumer bound to the given names, leaving ``self`` untouched.

        ``None`` means "keep this instance's value". When nothing is
        overridden ``self`` is returned as-is; otherwise a shallow copy
        (sharing the same ``client``) carries the overrides.
        """
        import copy

        if stream_name is None and group is None and consumer is None:
            return self
        scoped = copy.copy(self)
        if stream_name is not None:
            scoped.stream_name = stream_name
        if group is not None:
            scoped.group = group
        if consumer is not None:
            scoped.consumer = consumer
        return scoped

    async def ensure_group(self) -> None:
        """Create the consumer group if it does not already exist (XGROUP CREATE … MKSTREAM).

        Raises:
            RedisServiceError: Any Redis error other than ``BUSYGROUP``.
        """
        try:
            await self.client.xgroup_create(self.stream_name, self.group, id="0", mkstream=True)
            log.info("StreamConsumer | group created stream='%s' group='%s'", self.stream_name, self.group)
        except RedisError as e:
            # BUSYGROUP in the error text means the group already exists: not a failure.
            if "BUSYGROUP" in str(e):
                log.debug("StreamConsumer | group already exists stream='%s' group='%s'", self.stream_name, self.group)
            else:
                raise RedisServiceError(f"Failed to create group: {e}") from e

    async def create_group(self, stream_name: str | None = None, group_name: str | None = None) -> None:
        """Create a group using the ``StreamStorageProtocol`` method name.

        Args:
            stream_name: Stream to create the group on; ``None`` uses ``self.stream_name``.
            group_name: Group to create; ``None`` uses ``self.group``.

        Raises:
            RedisServiceError: Propagated from :meth:`ensure_group`.
        """
        await self._scoped(stream_name=stream_name, group=group_name).ensure_group()

    async def read(self, count: int = 10, block_ms: int | None = 1000) -> list[StreamEvent]:
        """Read new events from the stream for the consumer group (XREADGROUP).

        Args:
            count: Maximum number of messages to fetch per call. Defaults to ``10``.
            block_ms: Redis XREADGROUP block timeout in milliseconds. ``None``
                keeps the legacy non-blocking read behavior.

        Returns:
            List of :class:`StreamEvent` instances. Empty list if no new messages.

        Raises:
            RedisConnectionError: Redis connection failure.
            RedisServiceError: Redis operation failure.
        """
        try:
            options: dict[str, Any] = {"count": count}
            if block_ms is not None:
                options["block"] = block_ms
            raw = await self.client.xreadgroup(
                groupname=self.group,
                consumername=self.consumer,
                streams={self.stream_name: ">"},
                **options,
            )
            if not raw:
                return []
            # Only one stream is requested, so its entries are at raw[0][1].
            events = [self._parse(msg_id, fields) for msg_id, fields in raw[0][1]]
            log.debug("StreamConsumer | read count=%d stream='%s'", len(events), self.stream_name)
            return events
        except (ConnectionError, TimeoutError) as e:
            raise RedisConnectionError(f"Stream consumer connection failed: {e}") from e
        except RedisError as e:
            raise RedisServiceError(f"Stream consumer error: {e}") from e

    async def read_events(
        self,
        stream_name: str | None = None,
        group_name: str | None = None,
        consumer_name: str | None = None,
        count: int = 10,
        block_ms: int | None = 1000,
    ) -> list[tuple[str, dict[str, Any]]]:
        """Read dispatcher-ready events using the ``StreamStorageProtocol`` method name.

        Args:
            stream_name: Stream to read; ``None`` uses ``self.stream_name``.
            group_name: Consumer group; ``None`` uses ``self.group``.
            consumer_name: Consumer name; ``None`` uses ``self.consumer``.
            count: Maximum number of messages to fetch.
            block_ms: XREADGROUP block timeout in milliseconds, or ``None``.

        Returns:
            ``(event_id, payload)`` tuples, one per event read.

        Raises:
            RedisConnectionError: Propagated from :meth:`read`.
            RedisServiceError: Propagated from :meth:`read`.
        """
        scoped = self._scoped(stream_name=stream_name, group=group_name, consumer=consumer_name)
        events = await scoped.read(count=count, block_ms=block_ms)
        return [(event.id, event.payload) for event in events]

    async def ack(self, event_id: str) -> None:
        """Acknowledge that an event has been processed (XACK).

        Args:
            event_id: Stream entry ID returned by :meth:`read`.

        Raises:
            RedisConnectionError: Redis connection failure.
            RedisServiceError: Redis operation failure.
        """
        try:
            await self.client.xack(self.stream_name, self.group, event_id)
            log.debug("StreamConsumer | ack id='%s' stream='%s'", event_id, self.stream_name)
        except (ConnectionError, TimeoutError) as e:
            raise RedisConnectionError(f"Stream ack connection failed: {e}") from e
        except RedisError as e:
            raise RedisServiceError(f"Stream ack error: {e}") from e

    async def ack_event(self, stream_name: str, group_name: str, message_id: str) -> None:
        """Acknowledge an event using the ``StreamStorageProtocol`` method name.

        Args:
            stream_name: Stream the entry belongs to.
            group_name: Consumer group acknowledging the entry.
            message_id: Stream entry ID to acknowledge.

        Raises:
            RedisConnectionError: Propagated from :meth:`ack`.
            RedisServiceError: Propagated from :meth:`ack`.
        """
        await self._scoped(stream_name=stream_name, group=group_name).ack(message_id)

    @staticmethod
    def _parse(msg_id: Any, fields: dict[bytes | str, bytes | str]) -> StreamEvent:
        """Build a :class:`StreamEvent` from one raw stream entry.

        The ``type`` field is lifted out of the decoded payload (``"unknown"``
        when absent); the remaining fields become ``data``.
        """
        data = decode_stream_payload(fields)
        event_type = data.pop("type", "unknown")
        # str(b"1-0") would yield "b'1-0'", which is not a valid entry ID for XACK.
        if isinstance(msg_id, (bytes, bytearray)):
            msg_id = msg_id.decode()
        return StreamEvent(id=str(msg_id), event_type=event_type, data=data)
Functions
ensure_group() async

Create the consumer group if it does not already exist (XGROUP CREATE … MKSTREAM).

Source code in src/codex_platform/streams/consumer.py
async def ensure_group(self) -> None:
    """Ensure the consumer group exists, creating stream and group when missing.

    Issues ``xgroup_create`` with ``id="0"`` and ``mkstream=True``. A Redis
    error whose text contains ``BUSYGROUP`` is treated as "already exists";
    any other Redis error is re-raised as ``RedisServiceError``.
    """
    try:
        await self.client.xgroup_create(self.stream_name, self.group, id="0", mkstream=True)
    except RedisError as exc:
        if "BUSYGROUP" not in str(exc):
            raise RedisServiceError(f"Failed to create group: {exc}") from exc
        log.debug("StreamConsumer | group already exists stream='%s' group='%s'", self.stream_name, self.group)
    else:
        log.info("StreamConsumer | group created stream='%s' group='%s'", self.stream_name, self.group)
create_group(stream_name=None, group_name=None) async

Create a group using the StreamStorageProtocol method name.

Source code in src/codex_platform/streams/consumer.py
async def create_group(self, stream_name: str | None = None, group_name: str | None = None) -> None:
    """Create a group using the ``StreamStorageProtocol`` method name."""
    original_stream = self.stream_name
    original_group = self.group
    if stream_name is not None:
        self.stream_name = stream_name
    if group_name is not None:
        self.group = group_name
    try:
        await self.ensure_group()
    finally:
        self.stream_name = original_stream
        self.group = original_group
read(count=10, block_ms=1000) async

Read new events from the stream for the consumer group (XREADGROUP).

Parameters:

Name Type Description Default
count int

Maximum number of messages to fetch per call. Defaults to 10.

10
block_ms int | None

Redis XREADGROUP block timeout in milliseconds. None keeps the legacy non-blocking read behavior.

1000

Returns:

Type Description
list[StreamEvent]

List of StreamEvent instances. Empty list if no new messages.

Raises:

Type Description
RedisConnectionError

Redis connection failure.

RedisServiceError

Redis operation failure.

Source code in src/codex_platform/streams/consumer.py
async def read(self, count: int = 10, block_ms: int | None = 1000) -> list[StreamEvent]:
    """Fetch undelivered events for this consumer group via XREADGROUP.

    Args:
        count: Upper bound on the number of messages fetched in one call.
        block_ms: XREADGROUP block timeout in milliseconds. When ``None`` the
            ``block`` option is omitted (legacy non-blocking read).

    Returns:
        Parsed :class:`StreamEvent` objects; an empty list when nothing was read.

    Raises:
        RedisConnectionError: On ``ConnectionError`` / ``TimeoutError``.
        RedisServiceError: On any other ``RedisError``.
    """
    # Only pass ``block`` when a timeout was requested.
    blocking: dict[str, Any] = {} if block_ms is None else {"block": block_ms}
    try:
        response = await self.client.xreadgroup(
            groupname=self.group,
            consumername=self.consumer,
            streams={self.stream_name: ">"},
            count=count,
            **blocking,
        )
        if not response:
            return []
        # One stream is requested; its entries are the second item of the first element.
        entries = response[0][1]
        parsed = [self._parse(entry_id, entry_fields) for entry_id, entry_fields in entries]
        log.debug("StreamConsumer | read count=%d stream='%s'", len(parsed), self.stream_name)
        return parsed
    except (ConnectionError, TimeoutError) as exc:
        raise RedisConnectionError(f"Stream consumer connection failed: {exc}") from exc
    except RedisError as exc:
        raise RedisServiceError(f"Stream consumer error: {exc}") from exc
read_events(stream_name=None, group_name=None, consumer_name=None, count=10, block_ms=1000) async

Read dispatcher-ready events using the StreamStorageProtocol method name.

Source code in src/codex_platform/streams/consumer.py
async def read_events(
    self,
    stream_name: str | None = None,
    group_name: str | None = None,
    consumer_name: str | None = None,
    count: int = 10,
    block_ms: int | None = 1000,
) -> list[tuple[str, dict[str, Any]]]:
    """Read dispatcher-ready events using the ``StreamStorageProtocol`` method name."""
    original_stream = self.stream_name
    original_group = self.group
    original_consumer = self.consumer
    if stream_name is not None:
        self.stream_name = stream_name
    if group_name is not None:
        self.group = group_name
    if consumer_name is not None:
        self.consumer = consumer_name
    try:
        events = await self.read(count=count, block_ms=block_ms)
        return [(event.id, event.payload) for event in events]
    finally:
        self.stream_name = original_stream
        self.group = original_group
        self.consumer = original_consumer
ack(event_id) async

Acknowledge that an event has been processed (XACK).

Parameters:

Name Type Description Default
event_id str

Stream entry ID returned by read().

required

Raises:

Type Description
RedisConnectionError

Redis connection failure.

RedisServiceError

Redis operation failure.

Source code in src/codex_platform/streams/consumer.py
async def ack(self, event_id: str) -> None:
    """Mark a stream entry as processed for this consumer group (XACK).

    Args:
        event_id: ID of the stream entry to acknowledge, as returned by :meth:`read`.

    Raises:
        RedisConnectionError: On ``ConnectionError`` / ``TimeoutError``.
        RedisServiceError: On any other ``RedisError``.
    """
    try:
        await self.client.xack(self.stream_name, self.group, event_id)
    except (ConnectionError, TimeoutError) as exc:
        raise RedisConnectionError(f"Stream ack connection failed: {exc}") from exc
    except RedisError as exc:
        raise RedisServiceError(f"Stream ack error: {exc}") from exc
    else:
        log.debug("StreamConsumer | ack id='%s' stream='%s'", event_id, self.stream_name)
ack_event(stream_name, group_name, message_id) async

Acknowledge an event using the StreamStorageProtocol method name.

Source code in src/codex_platform/streams/consumer.py
async def ack_event(self, stream_name: str, group_name: str, message_id: str) -> None:
    """Acknowledge an event using the ``StreamStorageProtocol`` method name.

    The stream and group are set on a shallow copy of the consumer rather
    than by temporarily rebinding ``self.stream_name`` / ``self.group``.
    Rebinding across the ``await`` let overlapping calls on one instance
    acknowledge against the wrong stream or group.

    Args:
        stream_name: Stream the entry belongs to.
        group_name: Consumer group acknowledging the entry.
        message_id: Stream entry ID to acknowledge.

    Raises:
        Whatever ``ack`` raises.
    """
    import copy

    # The copy shares the Redis client but carries its own stream/group names.
    target = copy.copy(self)
    target.stream_name = stream_name
    target.group = group_name
    await target.ack(message_id)

StreamEvent dataclass

Parsed Redis Stream event.

Source code in src/codex_platform/streams/consumer.py
@dataclass
class StreamEvent:
    """A single Redis Stream entry after parsing."""

    # Stream entry ID.
    id: str
    # Value carried in the entry's ``type`` field.
    event_type: str
    # The remaining entry fields.
    data: dict[str, Any]

    @property
    def payload(self) -> dict[str, Any]:
        """Build a new dict with ``type`` first, followed by the entries of ``data``."""
        merged: dict[str, Any] = {"type": self.event_type}
        merged.update(self.data)
        return merged
Attributes
payload property

Return dispatcher-ready payload with the type field restored.

RetrySchedulerProtocol

Bases: Protocol

Protocol for a retry scheduler (ARQ, Celery, etc.).

Pass to StreamDispatcher for automatic rescheduling of failed messages.

Source code in src/codex_platform/streams/dispatcher.py
@runtime_checkable
class RetrySchedulerProtocol(Protocol):
    """Structural interface for a retry scheduler (ARQ, Celery, etc.).

    An object providing ``schedule_retry`` can be handed to
    ``StreamDispatcher`` so that failed messages are rescheduled automatically.
    """

    async def schedule_retry(self, stream_name: str, payload: dict[str, Any], delay: int = 60) -> None:
        """Arrange for a message to be processed again after a delay.

        Args:
            stream_name: Name of the Redis Stream.
            payload: The original message data.
            delay: Seconds to wait before retrying.
        """
        ...
Functions
schedule_retry(stream_name, payload, delay=60) async

Schedules message reprocessing after a delay.

Parameters:

Name Type Description Default
stream_name str

Redis Stream name.

required
payload dict[str, Any]

Original message data.

required
delay int

Retry delay in seconds.

60
Source code in src/codex_platform/streams/dispatcher.py
async def schedule_retry(self, stream_name: str, payload: dict[str, Any], delay: int = 60) -> None:
    """Arrange for a message to be processed again after a delay.

    Args:
        stream_name: Name of the Redis Stream.
        payload: The original message data.
        delay: Seconds to wait before retrying.
    """
    ...

StreamDispatcher

Routes Redis Stream messages to registered handlers by event type.

Handlers are registered via @dispatcher.on(event_type) decorator or by including StreamRouter instances.

On handler failure: if a retry_scheduler is provided, the message is scheduled for retry. Otherwise the exception is re-raised (message stays in PEL, unacknowledged).

Parameters:

Name Type Description Default
retry_scheduler RetrySchedulerProtocol | None

Optional retry scheduler implementing RetrySchedulerProtocol.

None

Example::

dispatcher = StreamDispatcher()

@dispatcher.on("user.registered")
async def welcome(payload: dict) -> None:
    await send_welcome_email(payload["email"])

processor.set_callback(dispatcher.process)
Source code in src/codex_platform/streams/dispatcher.py
class StreamDispatcher:
    """Routes Redis Stream messages to registered handlers by event type.

    Handlers are registered via ``@dispatcher.on(event_type)`` decorator
    or by including ``StreamRouter`` instances.

    On handler failure: if a ``retry_scheduler`` is provided, the message
    is scheduled for retry and the remaining handlers for that message are
    not invoked. Otherwise (or if scheduling the retry itself fails) the
    exception is re-raised (message stays in PEL, unacknowledged).

    Args:
        retry_scheduler: Optional retry scheduler implementing ``RetrySchedulerProtocol``.

    Example::

        dispatcher = StreamDispatcher()

        @dispatcher.on("user.registered")
        async def welcome(payload: dict) -> None:
            await send_welcome_email(payload["email"])

        processor.set_callback(dispatcher.process)
    """

    def __init__(self, retry_scheduler: RetrySchedulerProtocol | None = None) -> None:
        self._retry_scheduler = retry_scheduler
        # event type -> handler specs, in registration order
        self._handlers: dict[str, list[StreamHandlerSpec]] = {}
        log.info("StreamDispatcher | initialized")

    @staticmethod
    def _handler_name(handler: Any) -> str:
        """Return a loggable name for *handler*.

        ``functools.partial`` objects and callable instances have no
        ``__name__``; fall back to ``repr`` so logging never raises.
        """
        return getattr(handler, "__name__", None) or repr(handler)

    def include_router(self, router: StreamRouter, *, enabled_groups: set[str] | None = None) -> None:
        """Merges handlers from a ``StreamRouter`` into this dispatcher.

        A handler is included when ``enabled_groups`` is ``None``, when the
        handler has no group, or when its group is in ``enabled_groups``.

        Args:
            router: Router from a feature module.
            enabled_groups: Optional logical handler groups to include.
        """
        for event_type, handlers in router.handlers.items():
            selected = [
                spec
                for spec in handlers
                if enabled_groups is None or spec.group is None or spec.group in enabled_groups
            ]
            if selected:
                self._handlers.setdefault(event_type, []).extend(selected)
        log.info("StreamDispatcher | included router types=%s", list(router.handlers.keys()))

    def on(
        self,
        event_type: str,
        filter_func: FilterFunc | None = None,
        *,
        group: str | None = None,
        reply: bool = False,
    ) -> Callable[[HandlerFunc], HandlerFunc]:
        """Decorator for registering a handler directly on the dispatcher.

        Args:
            event_type:  Stream message type (e.g. ``"booking.confirmed"``).
            filter_func: Optional ``payload -> bool`` filter.
            group: Optional logical processing group used by ``StreamRuntime``.
            reply: Whether the handler participates in request/reply flows.

        Returns:
            A decorator that registers the handler and returns it unchanged.
        """
        from .router import StreamHandlerSpec

        def decorator(handler: HandlerFunc) -> HandlerFunc:
            bucket = self._handlers.setdefault(event_type, [])
            bucket.append(
                StreamHandlerSpec(
                    event_type=event_type,
                    handler=handler,
                    filter_func=filter_func,
                    group=group,
                    reply=reply,
                )
            )
            return handler

        return decorator

    async def process(self, payload: dict[str, Any], stream_name: str = "") -> None:
        """Dispatches an incoming message to matching handlers.

        Called by ``StreamProcessor`` on each incoming message. Messages
        without a ``"type"`` field, or with no registered handlers, are
        logged and ignored.

        Args:
            payload:     Message data dict. Must contain ``"type"`` field.
            stream_name: Stream name (used for retry scheduling only).

        Raises:
            Exception: The handler's (or filter's) exception when no retry
                scheduler is configured or scheduling the retry fails.
        """
        event_type = payload.get("type")
        if not event_type:
            log.warning("StreamDispatcher | message without 'type' field: %s", payload)
            return

        handlers = self._handlers.get(event_type, [])
        if not handlers:
            log.debug("StreamDispatcher | no handlers for type='%s'", event_type)
            return

        for spec in handlers:
            # Resolved outside the try block so a handler without ``__name__``
            # cannot raise AttributeError before it is ever invoked.
            handler_name = self._handler_name(spec.handler)
            try:
                if spec.filter_func is None or spec.filter_func(payload):
                    log.debug("StreamDispatcher | calling %s for type='%s'", handler_name, event_type)
                    await spec.handler(payload)
            except Exception as e:
                log.error("StreamDispatcher | handler %s failed: %s", handler_name, e)

                if self._retry_scheduler:
                    try:
                        await self._retry_scheduler.schedule_retry(
                            stream_name=stream_name,
                            payload=payload,
                            delay=60,
                        )
                        log.info("StreamDispatcher | retry scheduled for type='%s'", event_type)
                        # The whole message will be retried; skip the remaining handlers.
                        return
                    except Exception as retry_err:
                        log.error("StreamDispatcher | retry scheduling failed: %s", retry_err)

                # No scheduler, or scheduling failed: surface the handler's error.
                raise
Functions
include_router(router, *, enabled_groups=None)

Merges handlers from a StreamRouter into this dispatcher.

Parameters:

Name Type Description Default
router StreamRouter

Router from a feature module.

required
enabled_groups set[str] | None

Optional logical handler groups to include.

None
Source code in src/codex_platform/streams/dispatcher.py
def include_router(self, router: StreamRouter, *, enabled_groups: set[str] | None = None) -> None:
    """Copy a router's handler specs into this dispatcher's registry.

    A spec is taken when no group filter is given, when the spec has no
    group, or when its group is one of ``enabled_groups``.

    Args:
        router: Router whose ``handlers`` mapping is merged in.
        enabled_groups: Logical handler groups to accept; ``None`` accepts all.
    """

    def is_enabled(spec) -> bool:
        # No filter, or an ungrouped handler, always passes.
        if enabled_groups is None or spec.group is None:
            return True
        return spec.group in enabled_groups

    for event_type, specs in router.handlers.items():
        chosen = list(filter(is_enabled, specs))
        if not chosen:
            continue
        self._handlers.setdefault(event_type, []).extend(chosen)

    log.info("StreamDispatcher | included router types=%s", list(router.handlers.keys()))
on(event_type, filter_func=None, *, group=None, reply=False)

Decorator for registering a handler directly on the dispatcher.

Parameters:

Name Type Description Default
event_type str

Stream message type (e.g. "booking.confirmed").

required
filter_func FilterFunc | None

Optional payload -> bool filter.

None
group str | None

Optional logical processing group used by StreamRuntime.

None
reply bool

Whether the handler participates in request/reply flows.

False
Source code in src/codex_platform/streams/dispatcher.py
def on(
    self,
    event_type: str,
    filter_func: FilterFunc | None = None,
    *,
    group: str | None = None,
    reply: bool = False,
) -> Callable[[HandlerFunc], HandlerFunc]:
    """Build a decorator that registers a handler on this dispatcher.

    Args:
        event_type:  Stream message type (e.g. ``"booking.confirmed"``).
        filter_func: Optional ``payload -> bool`` filter stored with the handler.
        group: Optional logical processing group stored with the handler.
        reply: Request/reply flag stored with the handler.

    Returns:
        A decorator that records the handler and returns it unchanged.
    """
    from .router import StreamHandlerSpec

    def decorator(handler: HandlerFunc) -> HandlerFunc:
        # Make sure the bucket exists before the spec is built.
        bucket = self._handlers.setdefault(event_type, [])
        spec = StreamHandlerSpec(
            event_type=event_type,
            handler=handler,
            filter_func=filter_func,
            group=group,
            reply=reply,
        )
        bucket.append(spec)
        return handler

    return decorator
process(payload, stream_name='') async

Dispatches an incoming message to matching handlers.

Called by StreamProcessor on each incoming message.

Parameters:

Name Type Description Default
payload dict[str, Any]

Message data dict. Must contain "type" field.

required
stream_name str

Stream name (used for retry scheduling only).

''
Source code in src/codex_platform/streams/dispatcher.py
async def process(self, payload: dict[str, Any], stream_name: str = "") -> None:
    """Dispatches an incoming message to matching handlers.

    Called by ``StreamProcessor`` on each incoming message. Messages without
    a ``"type"`` field, or with no registered handlers, are logged and
    ignored. If a handler (or its filter) raises and a retry scheduler is
    configured, a retry is scheduled and the remaining handlers are skipped.

    Args:
        payload:     Message data dict. Must contain ``"type"`` field.
        stream_name: Stream name (used for retry scheduling only).

    Raises:
        Exception: The handler's (or filter's) exception when no retry
            scheduler is configured or scheduling the retry fails.
    """
    event_type = payload.get("type")
    if not event_type:
        log.warning("StreamDispatcher | message without 'type' field: %s", payload)
        return

    handlers = self._handlers.get(event_type, [])
    if not handlers:
        log.debug("StreamDispatcher | no handlers for type='%s'", event_type)
        return

    for spec in handlers:
        # functools.partial objects and callable instances have no ``__name__``.
        # Reading it directly raised AttributeError before the handler ran,
        # and again inside the except block.
        handler_name = getattr(spec.handler, "__name__", None) or repr(spec.handler)
        try:
            if spec.filter_func is None or spec.filter_func(payload):
                log.debug("StreamDispatcher | calling %s for type='%s'", handler_name, event_type)
                await spec.handler(payload)
        except Exception as e:
            log.error("StreamDispatcher | handler %s failed: %s", handler_name, e)

            if self._retry_scheduler:
                try:
                    await self._retry_scheduler.schedule_retry(
                        stream_name=stream_name,
                        payload=payload,
                        delay=60,
                    )
                    log.info("StreamDispatcher | retry scheduled for type='%s'", event_type)
                    # The whole message will be retried; skip the remaining handlers.
                    return
                except Exception as retry_err:
                    log.error("StreamDispatcher | retry scheduling failed: %s", retry_err)

            # No scheduler, or scheduling failed: surface the handler's error.
            raise

StreamProcessor

Async engine for consuming and dispatching Redis Stream events.

Runs a continuous background polling loop. On each iteration reads a batch of messages from the consumer group, dispatches each to the registered callback, and ACKs on success.

If the callback fails — message stays in PEL (unacknowledged) for future recovery. The processor recovers automatically if the consumer group is lost (NOGROUP error).

Parameters:

Name Type Description Default
storage StreamStorageProtocol

Any object implementing StreamStorageProtocol. Use StreamConsumer from streams.consumer.

required
stream_name str

Redis Stream identifier.

required
consumer_group_name str

Shared group name (same across all instances = load balancing).

required
consumer_name str

Unique name for this processor instance.

required
batch_count int

Max messages per poll cycle.

10
poll_interval float

Fallback sleep duration (seconds) after an empty read. After a consume-loop error the processor sleeps 5 seconds instead.

1.0
block_ms int | None

Redis XREADGROUP block timeout in milliseconds.

1000
TODO

PEL recovery on startup — XPENDING/XCLAIM for messages stuck in PEL after a processor crash.

Example::

consumer = StreamConsumer(redis, "events:bot", "bot_group", "worker_1")
processor = StreamProcessor(
    storage=consumer,
    stream_name="events:bot",
    consumer_group_name="bot_group",
    consumer_name="worker_1",
)
processor.set_callback(my_async_handler)
await processor.start()
# ... later
await processor.stop()
Source code in src/codex_platform/streams/processor.py
class StreamProcessor:
    """Async engine for consuming and dispatching Redis Stream events.

    Runs a continuous background polling loop. On each iteration reads a batch
    of messages from the consumer group, dispatches each to the registered
    callback, and ACKs on success.

    If the callback fails — message stays in PEL (unacknowledged) for future
    recovery. The processor recovers automatically if the consumer group is lost
    (NOGROUP error).

    If no callback has been registered, messages are still read and ACKed,
    i.e. they are dropped without being processed.

    Args:
        storage:             Any object implementing ``StreamStorageProtocol``.
                             Use ``StreamConsumer`` from streams.consumer.
        stream_name:         Redis Stream identifier.
        consumer_group_name: Shared group name (same across all instances = load balancing).
        consumer_name:       Unique name for this processor instance.
        batch_count:         Max messages per poll cycle.
        poll_interval:       Sleep duration (seconds) after an empty read.
        block_ms:            Redis XREADGROUP block timeout in milliseconds.
        error_backoff:       Sleep duration (seconds) after a consume-loop error.
                             Defaults to ``5.0``, the previously hard-coded value.

    TODO:
        PEL recovery on startup — XPENDING/XCLAIM for messages stuck in PEL
        after a processor crash.

    Example::

        consumer = StreamConsumer(redis, "events:bot", "bot_group", "worker_1")
        processor = StreamProcessor(
            storage=consumer,
            stream_name="events:bot",
            consumer_group_name="bot_group",
            consumer_name="worker_1",
        )
        processor.set_callback(my_async_handler)
        await processor.start()
        # ... later
        await processor.stop()
    """

    def __init__(
        self,
        storage: StreamStorageProtocol,
        stream_name: str,
        consumer_group_name: str,
        consumer_name: str,
        batch_count: int = 10,
        poll_interval: float = 1.0,
        block_ms: int | None = 1000,
        error_backoff: float = 5.0,
    ) -> None:
        self.storage = storage
        self.stream_name = stream_name
        self.group_name = consumer_group_name
        self.consumer_name = consumer_name
        self.batch_count = batch_count
        self.poll_interval = poll_interval
        self.block_ms = block_ms
        self.error_backoff = error_backoff

        self.is_running = False
        self._callback: MessageCallback | None = None
        self._task: asyncio.Task[None] | None = None

    def set_callback(self, callback: MessageCallback) -> None:
        """Registers the message handler.

        Args:
            callback: ``async def handler(payload: dict) -> None``
        """
        self._callback = callback

    async def start(self) -> None:
        """Starts the background polling loop.

        Creates the consumer group (up to 5 attempts with 3s delay),
        then spawns ``_consume_loop`` as an asyncio Task. If every attempt
        fails, an error is logged and the processor is left stopped; no
        exception is raised.
        """
        if self.is_running:
            log.warning("StreamProcessor | already running stream='%s'", self.stream_name)
            return

        if self._callback is None:
            # _process_one ACKs even when there is no callback, so make the data loss visible.
            log.warning(
                "StreamProcessor | no callback registered, messages will be acknowledged unprocessed stream='%s'",
                self.stream_name,
            )

        for attempt in range(1, 6):
            try:
                await self.storage.create_group(self.stream_name, self.group_name)
                break
            except Exception as e:
                log.warning("StreamProcessor | create_group attempt %d/5: %s", attempt, e)
                if attempt < 5:
                    await asyncio.sleep(3)
                else:
                    log.error("StreamProcessor | failed to create group, giving up")
                    return

        self.is_running = True
        self._task = asyncio.create_task(self._consume_loop())
        log.info(
            "StreamProcessor | started stream='%s' group='%s' consumer='%s'",
            self.stream_name,
            self.group_name,
            self.consumer_name,
        )

    async def stop(self) -> None:
        """Stops the polling loop gracefully.

        Clears ``is_running``, cancels the background task if it is still
        running and waits for it to finish.
        """
        self.is_running = False
        if self._task and not self._task.done():
            self._task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._task
        log.info("StreamProcessor | stopped stream='%s'", self.stream_name)

    async def _consume_loop(self) -> None:
        """Poll the stream until stopped or cancelled.

        Any non-cancellation error is logged and followed by an
        ``error_backoff`` sleep. A ``NOGROUP`` error additionally triggers a
        best-effort re-creation of the consumer group.
        """
        try:
            while self.is_running:
                try:
                    messages = await self.storage.read_events(
                        stream_name=self.stream_name,
                        group_name=self.group_name,
                        consumer_name=self.consumer_name,
                        count=self.batch_count,
                        block_ms=self.block_ms,
                    )

                    if not messages:
                        await asyncio.sleep(self.poll_interval)
                        continue

                    for message_id, data in messages:
                        await self._process_one(message_id, data)

                except asyncio.CancelledError:
                    # Never swallow cancellation: stop() relies on it.
                    raise
                except Exception as e:
                    log.error("StreamProcessor | consume loop error: %s", e)
                    if "NOGROUP" in str(e):
                        log.warning("StreamProcessor | consumer group lost, recreating...")
                        # Best effort: a failure here is retried on the next iteration.
                        with contextlib.suppress(Exception):
                            await self.storage.create_group(self.stream_name, self.group_name)
                    await asyncio.sleep(self.error_backoff)

        except asyncio.CancelledError:
            log.info("StreamProcessor | consume loop cancelled")
            raise

    async def _process_one(self, message_id: str, data: dict[str, Any]) -> None:
        """Processes a single message. ACKs on success, leaves in PEL on failure.

        Errors from the callback or from the ACK are logged and not
        re-raised, so one bad message does not stop the batch.
        """
        try:
            if self._callback:
                await self._callback(data)
            await self.storage.ack_event(self.stream_name, self.group_name, message_id)
            log.debug("StreamProcessor | ack id='%s'", message_id)
        except Exception as e:
            log.error("StreamProcessor | failed to process message id='%s': %s", message_id, e)
Functions
set_callback(callback)

Registers the message handler.

Parameters:

Name Type Description Default
callback MessageCallback

async def handler(payload: dict) -> None

required
Source code in src/codex_platform/streams/processor.py
def set_callback(self, callback: MessageCallback) -> None:
    """Registers the message handler.

    The handler is stored on the processor and replaces any handler that was
    registered before. It is awaited with the message payload for each entry
    the processor handles.

    Args:
        callback: ``async def handler(payload: dict) -> None``
    """
    self._callback = callback
start() async

Starts the background polling loop.

Creates the consumer group (up to 5 attempts with 3s delay), then spawns _consume_loop as an asyncio Task.

Source code in src/codex_platform/streams/processor.py
async def start(self) -> None:
    """Start the background polling task.

    Does nothing (apart from a warning) when the processor is already
    running. Otherwise the consumer group is created first, retrying up to
    5 times with a 3 second pause between attempts. If every attempt fails
    the error is logged and the processor is NOT started. On success
    ``is_running`` is set and ``_consume_loop`` is scheduled as an asyncio Task.
    """
    if self.is_running:
        log.warning("StreamProcessor | already running stream='%s'", self.stream_name)
        return

    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        try:
            await self.storage.create_group(self.stream_name, self.group_name)
        except Exception as exc:
            log.warning("StreamProcessor | create_group attempt %d/5: %s", attempt, exc)
            if attempt == max_attempts:
                log.error("StreamProcessor | failed to create group, giving up")
                return
            await asyncio.sleep(3)
        else:
            # Group is in place; no further attempts needed.
            break

    self.is_running = True
    self._task = asyncio.create_task(self._consume_loop())
    log.info(
        "StreamProcessor | started stream='%s' group='%s' consumer='%s'",
        self.stream_name,
        self.group_name,
        self.consumer_name,
    )
stop() async

Stops the polling loop gracefully.

Source code in src/codex_platform/streams/processor.py
async def stop(self) -> None:
    """Stop the polling loop and wait for the background task to finish.

    Clears ``is_running``, cancels the task if it is still pending, and
    awaits it while swallowing the resulting ``CancelledError``.
    """
    self.is_running = False
    task = self._task
    if task and not task.done():
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected outcome of the cancel() above.
            pass
    log.info("StreamProcessor | stopped stream='%s'", self.stream_name)

StreamStorageProtocol

Bases: Protocol

Adapter protocol for Redis Stream I/O.

StreamConsumer from streams.consumer implements this protocol. You can also implement it yourself for testing or alternative backends.

Source code in src/codex_platform/streams/processor.py
@runtime_checkable
class StreamStorageProtocol(Protocol):
    """Adapter protocol for Redis Stream I/O.

    ``StreamConsumer`` from ``streams.consumer`` implements this protocol.
    You can also implement it yourself for testing or alternative backends.
    """

    async def create_group(self, stream_name: str, group_name: str) -> None:
        """Creates a consumer group idempotently (XGROUP CREATE ... MKSTREAM)."""
        ...

    async def read_events(
        self,
        stream_name: str,
        group_name: str,
        consumer_name: str,
        count: int,
        block_ms: int | None = 1000,
    ) -> list[tuple[str, dict[str, Any]]]:
        """Reads undelivered messages for the group (XREADGROUP ... >).

        Returns:
            List of (message_id, data_dict). Empty if no messages.
        """
        ...

    async def ack_event(self, stream_name: str, group_name: str, message_id: str) -> None:
        """Acknowledges message processing (XACK)."""
        ...
Functions
create_group(stream_name, group_name) async

Creates a consumer group idempotently (XGROUP CREATE ... MKSTREAM).

Source code in src/codex_platform/streams/processor.py
async def create_group(self, stream_name: str, group_name: str) -> None:
    """Create the consumer group idempotently (XGROUP CREATE ... MKSTREAM).

    Protocol stub: implementations supply the actual behavior.

    Args:
        stream_name: Stream the group belongs to.
        group_name: Name of the consumer group.
    """
read_events(stream_name, group_name, consumer_name, count, block_ms=1000) async

Reads undelivered messages for the group (XREADGROUP ... >).

Returns:

Type Description
list[tuple[str, dict[str, Any]]]

List of (message_id, data_dict). Empty if no messages.

Source code in src/codex_platform/streams/processor.py
async def read_events(
    self,
    stream_name: str,
    group_name: str,
    consumer_name: str,
    count: int,
    block_ms: int | None = 1000,
) -> list[tuple[str, dict[str, Any]]]:
    """Reads undelivered messages for the group (XREADGROUP ... >).

    Returns:
        List of (message_id, data_dict). Empty if no messages.
    """
    ...
ack_event(stream_name, group_name, message_id) async

Acknowledges message processing (XACK).

Source code in src/codex_platform/streams/processor.py
async def ack_event(self, stream_name: str, group_name: str, message_id: str) -> None:
    """Acknowledge a processed message (XACK).

    Protocol stub: implementations supply the actual behavior.

    Args:
        stream_name: Stream the message was read from.
        group_name: Consumer group that received the message.
        message_id: ID of the entry to acknowledge.
    """

StreamProducer

Writes events to a Redis Stream (XADD).

Encodes structured payload values as JSON-prefixed Redis Stream fields.

Source code in src/codex_platform/streams/producer.py
class StreamProducer:
    """Writes events to a Redis Stream (XADD).

    Encodes structured payload values as JSON-prefixed Redis Stream fields.
    Also provides a basic request/reply helper pair backed by a Redis list
    stored under ``reply:{correlation_id}``.
    """

    def __init__(self, client: Redis, stream_name: str) -> None:
        """Bind the producer to a Redis client and a target stream.

        Args:
            client: Redis client used for all commands.
            stream_name: Name of the stream events are appended to.
        """
        self.client = client
        self.stream_name = stream_name

    async def publish(
        self,
        event_type: str,
        data: dict[str, Any],
        *,
        correlation_id: str | None = None,
    ) -> str:
        """Append an event to the Redis Stream (XADD).

        The entry is built from ``data`` plus two reserved envelope fields:
        ``"type"`` (always ``event_type``) and, when supplied,
        ``"correlation_id"``. Reserved fields take precedence over keys of
        the same name inside ``data``.

        Args:
            event_type: Event type label, e.g. ``"new_appointment"`` or ``"new_contact"``.
            data: Event payload.
            correlation_id: Optional request/reply correlation id.

        Returns:
            The ID of the newly added stream entry (e.g. ``"1718000000000-0"``).

        Raises:
            RedisConnectionError: Redis connection failure.
            RedisServiceError: Redis operation failure.
        """
        payload_data: dict[str, Any] = {"type": event_type, **data}
        if payload_data["type"] != event_type:
            # ``**data`` is unpacked after "type", so a stray "type" key in the
            # payload would otherwise silently replace the event type.
            log.warning("StreamProducer | payload key 'type' overridden by event_type='%s'", event_type)
            payload_data["type"] = event_type
        if correlation_id is not None:
            payload_data["correlation_id"] = correlation_id
        payload = self._sanitize(payload_data)
        try:
            result = await self.client.xadd(self.stream_name, payload)
            log.info("StreamProducer | added event_type='%s' id='%s' stream='%s'", event_type, result, self.stream_name)
            return str(result)
        except (ConnectionError, TimeoutError) as e:
            raise RedisConnectionError(f"Stream producer connection failed: {e}") from e
        except RedisError as e:
            raise RedisServiceError(f"Stream producer error: {e}") from e

    async def request(
        self,
        event_type: str,
        data: dict[str, Any],
        *,
        timeout: float = 30.0,
        correlation_id: str | None = None,
    ) -> dict[str, Any]:
        """Publish an event and wait for a basic Redis-list reply.

        Replies are read from ``reply:{correlation_id}`` using ``BRPOP``.

        Args:
            event_type: Event type label of the request.
            data: Request payload.
            timeout: Seconds to wait for the reply. Rounded up to a whole
                second, with a minimum of 1.
            correlation_id: Correlation id to use; a UUID4 is generated when omitted.

        Returns:
            The decoded reply.

        Raises:
            StreamReplyTimeoutError: No reply arrived before the timeout.
            RedisConnectionError: Redis connection failure.
            RedisServiceError: Redis operation failure, or the reply did not
                decode to a ``dict``.
        """
        cid = correlation_id or str(uuid.uuid4())
        await self.publish(event_type, data, correlation_id=cid)
        reply_key = f"reply:{cid}"
        try:
            result = await self.client.brpop(reply_key, timeout=max(1, math.ceil(timeout)))
        except (ConnectionError, TimeoutError) as e:
            raise RedisConnectionError(f"Stream reply connection failed: {e}") from e
        except RedisError as e:
            raise RedisServiceError(f"Stream reply error: {e}") from e
        if result is None:
            raise StreamReplyTimeoutError(f"Timed out waiting for stream reply: {reply_key}")
        _key, raw_value = result
        value = decode_stream_value(raw_value)
        if isinstance(value, str):
            # Best effort: a plain string that is not JSON is kept as-is and
            # rejected by the dict check below.
            with contextlib.suppress(json.JSONDecodeError):
                value = json.loads(value)
        if not isinstance(value, dict):
            raise RedisServiceError(f"Stream reply must decode to dict, got {type(value).__name__}")
        return value

    async def publish_reply(
        self,
        correlation_id: str,
        data: dict[str, Any],
        *,
        ttl: int | None = None,
    ) -> int:
        """Publish a basic request/reply response to ``reply:{correlation_id}``.

        Args:
            correlation_id: Correlation id of the request being answered.
            data: Reply payload.
            ttl: Optional expiry passed to ``EXPIRE`` for the reply key after
                the push. No expiry is set when ``None``.

        Returns:
            The value returned by ``LPUSH``, converted to ``int``.

        Raises:
            RedisConnectionError: Redis connection failure.
            RedisServiceError: Redis operation failure.
        """
        reply_key = f"reply:{correlation_id}"
        encoded = encode_stream_payload({"reply": data})["reply"]
        try:
            length = await self.client.lpush(reply_key, encoded)
            if ttl is not None:
                await self.client.expire(reply_key, ttl)
            return int(length)
        except (ConnectionError, TimeoutError) as e:
            raise RedisConnectionError(f"Stream reply publish connection failed: {e}") from e
        except RedisError as e:
            raise RedisServiceError(f"Stream reply publish error: {e}") from e

    async def add_event(self, event_type: str, data: dict[str, Any]) -> str:
        """Compatibility alias for :meth:`publish`."""
        return await self.publish(event_type, data)

    @staticmethod
    def _sanitize(data: dict[str, Any]) -> dict[str, str]:
        """Encode all values for Redis Stream storage."""
        return encode_stream_payload(data)
Functions
publish(event_type, data, *, correlation_id=None) async

Append an event to the Redis Stream (XADD).

Parameters:

Name Type Description Default
event_type str

Event type label, e.g. "new_appointment" or "new_contact".

required
data dict[str, Any]

Event payload.

required
correlation_id str | None

Optional request/reply correlation id.

None

Returns:

Type Description
str

The ID of the newly added stream entry (e.g. "1718000000000-0").

Raises:

Type Description
RedisConnectionError

Redis connection failure.

RedisServiceError

Redis operation failure.

Source code in src/codex_platform/streams/producer.py
async def publish(
    self,
    event_type: str,
    data: dict[str, Any],
    *,
    correlation_id: str | None = None,
) -> str:
    """Append an event to the Redis Stream (XADD).

    The entry is built from ``data`` plus two reserved envelope fields:
    ``"type"`` (always ``event_type``) and, when supplied,
    ``"correlation_id"``. Reserved fields take precedence over keys of the
    same name inside ``data``.

    Args:
        event_type: Event type label, e.g. ``"new_appointment"`` or ``"new_contact"``.
        data: Event payload.
        correlation_id: Optional request/reply correlation id.

    Returns:
        The ID of the newly added stream entry (e.g. ``"1718000000000-0"``).

    Raises:
        RedisConnectionError: Redis connection failure.
        RedisServiceError: Redis operation failure.
    """
    payload_data: dict[str, Any] = {"type": event_type, **data}
    if payload_data["type"] != event_type:
        # ``**data`` is unpacked after "type", so a stray "type" key in the
        # payload would otherwise silently replace the event type.
        log.warning("StreamProducer | payload key 'type' overridden by event_type='%s'", event_type)
        payload_data["type"] = event_type
    if correlation_id is not None:
        payload_data["correlation_id"] = correlation_id
    payload = self._sanitize(payload_data)
    try:
        result = await self.client.xadd(self.stream_name, payload)
        log.info("StreamProducer | added event_type='%s' id='%s' stream='%s'", event_type, result, self.stream_name)
        return str(result)
    except (ConnectionError, TimeoutError) as e:
        raise RedisConnectionError(f"Stream producer connection failed: {e}") from e
    except RedisError as e:
        raise RedisServiceError(f"Stream producer error: {e}") from e
request(event_type, data, *, timeout=30.0, correlation_id=None) async

Publish an event and wait for a basic Redis-list reply.

Replies are read from reply:{correlation_id} using BRPOP.

Source code in src/codex_platform/streams/producer.py
async def request(
    self,
    event_type: str,
    data: dict[str, Any],
    *,
    timeout: float = 30.0,
    correlation_id: str | None = None,
) -> dict[str, Any]:
    """Publish an event, then block until its reply arrives on a Redis list.

    The reply is popped from ``reply:{correlation_id}`` with ``BRPOP``. The
    wait is ``timeout`` rounded up to whole seconds, with a minimum of 1.
    A UUID4 correlation id is generated when none is given.

    Raises:
        StreamReplyTimeoutError: No reply arrived before the timeout.
        RedisConnectionError: Redis connection failure.
        RedisServiceError: Redis operation failure, or the reply did not
            decode to a ``dict``.
    """
    cid = correlation_id or str(uuid.uuid4())
    await self.publish(event_type, data, correlation_id=cid)
    reply_key = f"reply:{cid}"
    try:
        wait_seconds = max(1, math.ceil(timeout))
        popped = await self.client.brpop(reply_key, timeout=wait_seconds)
    except (ConnectionError, TimeoutError) as e:
        raise RedisConnectionError(f"Stream reply connection failed: {e}") from e
    except RedisError as e:
        raise RedisServiceError(f"Stream reply error: {e}") from e

    if popped is None:
        raise StreamReplyTimeoutError(f"Timed out waiting for stream reply: {reply_key}")

    _, raw_reply = popped
    reply = decode_stream_value(raw_reply)
    if isinstance(reply, str):
        try:
            reply = json.loads(reply)
        except json.JSONDecodeError:
            # Not JSON: keep the string; the dict check below rejects it.
            pass

    if isinstance(reply, dict):
        return reply
    raise RedisServiceError(f"Stream reply must decode to dict, got {type(reply).__name__}")
publish_reply(correlation_id, data, *, ttl=None) async

Publish a basic request/reply response to reply:{correlation_id}.

Source code in src/codex_platform/streams/producer.py
async def publish_reply(
    self,
    correlation_id: str,
    data: dict[str, Any],
    *,
    ttl: int | None = None,
) -> int:
    """Push a request/reply response onto ``reply:{correlation_id}``.

    Args:
        correlation_id: Correlation id of the request being answered.
        data: Reply payload.
        ttl: Optional expiry passed to ``EXPIRE`` for the reply key after the
            push. No expiry is set when ``None``.

    Returns:
        The value returned by ``LPUSH``, converted to ``int``.

    Raises:
        RedisConnectionError: Redis connection failure.
        RedisServiceError: Redis operation failure.
    """
    reply_key = f"reply:{correlation_id}"
    # Encode through the shared payload encoder, then pick out the single field.
    encoded_fields = encode_stream_payload({"reply": data})
    wire_value = encoded_fields["reply"]
    try:
        list_length = await self.client.lpush(reply_key, wire_value)
        if ttl is not None:
            await self.client.expire(reply_key, ttl)
        return int(list_length)
    except (ConnectionError, TimeoutError) as e:
        raise RedisConnectionError(f"Stream reply publish connection failed: {e}") from e
    except RedisError as e:
        raise RedisServiceError(f"Stream reply publish error: {e}") from e
add_event(event_type, data) async

Compatibility alias for publish().

Source code in src/codex_platform/streams/producer.py
async def add_event(self, event_type: str, data: dict[str, Any]) -> str:
    """Compatibility alias for :meth:`publish`.

    Forwards ``event_type`` and ``data`` unchanged and returns whatever
    ``publish`` returns.
    """
    entry_id = await self.publish(event_type, data)
    return entry_id

StreamReplyTimeoutError

Bases: RedisServiceError

Raised when a request/reply stream call does not receive a reply in time.

Source code in src/codex_platform/streams/producer.py
class StreamReplyTimeoutError(RedisServiceError):
    """Raised when a request/reply stream call does not receive a reply in time.

    ``StreamProducer.request`` raises it when the blocking pop on the reply
    key returns nothing before the timeout elapses.
    """

StreamHandlerSpec dataclass

Registered stream handler metadata.

Source code in src/codex_platform/streams/router.py
@dataclass(frozen=True)
class StreamHandlerSpec:
    """Registered stream handler metadata.

    One immutable record is created per ``StreamRouter.on`` registration.
    """

    # Stream message type the handler is registered for.
    event_type: str
    # The handler function exactly as it was decorated.
    handler: HandlerFunc
    # Optional ``payload -> bool`` filter; ``None`` means no filtering.
    filter_func: FilterFunc | None = None
    # Optional logical processing group used by ``StreamRuntime``.
    group: str | None = None
    # Whether the handler participates in request/reply flows.
    reply: bool = False

StreamRouter

Groups Redis Stream event handlers by message type.

After populating, include into a StreamDispatcher via include_router().

Example::

router = StreamRouter()

@router.on("order.paid")
async def on_order_paid(payload: dict) -> None:
    ...

dispatcher.include_router(router)
Source code in src/codex_platform/streams/router.py
class StreamRouter:
    """Collects Redis Stream event handlers, keyed by message type.

    Once populated, attach the router to a ``StreamDispatcher`` with
    ``include_router()``.

    Example::

        router = StreamRouter()

        @router.on("order.paid")
        async def on_order_paid(payload: dict) -> None:
            ...

        dispatcher.include_router(router)
    """

    def __init__(self) -> None:
        # event type -> handler specs, in registration order
        self._handlers: dict[str, list[StreamHandlerSpec]] = {}

    def on(
        self,
        event_type: str,
        filter_func: FilterFunc | None = None,
        *,
        group: str | None = None,
        reply: bool = False,
    ) -> Callable[[HandlerFunc], HandlerFunc]:
        """Build a decorator that registers a handler for ``event_type``.

        Args:
            event_type:  Stream message type (e.g. ``"booking.confirmed"``).
            filter_func: Optional ``payload -> bool`` filter. Handler is called
                         only when filter returns ``True``.
            group: Optional logical processing group used by ``StreamRuntime``.
            reply: Whether the handler participates in request/reply flows.

        Returns:
            Decorator that records the handler and returns it unchanged.
        """

        def decorator(handler: HandlerFunc) -> HandlerFunc:
            bucket = self._handlers.setdefault(event_type, [])
            bucket.append(
                StreamHandlerSpec(
                    event_type=event_type,
                    handler=handler,
                    filter_func=filter_func,
                    group=group,
                    reply=reply,
                )
            )
            return handler

        return decorator

    @property
    def handlers(self) -> dict[str, list[StreamHandlerSpec]]:
        """Registered handlers by event type (the router's internal mapping)."""
        return self._handlers
Attributes
handlers property

Registered handlers (read-only).

Functions
on(event_type, filter_func=None, *, group=None, reply=False)

Decorator for registering a handler for an event type.

Parameters:

Name Type Description Default
event_type str

Stream message type (e.g. "booking.confirmed").

required
filter_func FilterFunc | None

Optional payload -> bool filter. Handler is called only when filter returns True.

None
group str | None

Optional logical processing group used by StreamRuntime.

None
reply bool

Whether the handler participates in request/reply flows.

False

Returns:

Type Description
Callable[[HandlerFunc], HandlerFunc]

Decorator (returns the original handler unchanged).

Source code in src/codex_platform/streams/router.py
def on(
    self,
    event_type: str,
    filter_func: FilterFunc | None = None,
    *,
    group: str | None = None,
    reply: bool = False,
) -> Callable[[HandlerFunc], HandlerFunc]:
    """Build a decorator that registers a handler for ``event_type``.

    Args:
        event_type:  Stream message type (e.g. ``"booking.confirmed"``).
        filter_func: Optional ``payload -> bool`` filter. Handler is called
                     only when filter returns ``True``.
        group: Optional logical processing group used by ``StreamRuntime``.
        reply: Whether the handler participates in request/reply flows.

    Returns:
        Decorator that records the handler and returns it unchanged.
    """

    def decorator(handler: HandlerFunc) -> HandlerFunc:
        bucket = self._handlers.setdefault(event_type, [])
        bucket.append(
            StreamHandlerSpec(
                event_type=event_type,
                handler=handler,
                filter_func=filter_func,
                group=group,
                reply=reply,
            )
        )
        return handler

    return decorator

StreamRuntime

Convenience runtime for grouped Redis Streams handlers.

A runtime owns a dispatcher, consumer, producer, and processor for one Redis stream. Logical handler groups are selected by StreamRuntimeConfig while Redis delivery remains controlled by consumer_group.

Source code in src/codex_platform/streams/runtime.py
class StreamRuntime:
    """Convenience runtime for grouped Redis Streams handlers.

    One runtime wires together a dispatcher, consumer, producer, and processor
    for a single Redis stream. ``StreamRuntimeConfig`` selects which logical
    handler groups are active; Redis delivery itself is still governed by
    ``consumer_group``.
    """

    def __init__(
        self,
        redis: Redis,
        config: StreamRuntimeConfig,
        *,
        retry_scheduler: RetrySchedulerProtocol | None = None,
    ) -> None:
        """Validate ``config`` and build the runtime's components.

        Args:
            redis: Redis client shared by the producer and the consumer.
            config: Stream, consumer and polling settings.
            retry_scheduler: Optional scheduler handed to the dispatcher.

        Raises:
            ValueError: Propagated from ``config.validate()``.
        """
        config.validate()
        stream = config.stream_name
        group = config.consumer_group
        worker = config.consumer_name

        self.redis = redis
        self.config = config
        self.dispatcher = StreamDispatcher(retry_scheduler=retry_scheduler)
        self.producer = StreamProducer(redis, stream)
        self.consumer = StreamConsumer(redis, stream, group, worker)
        self.processor = StreamProcessor(
            storage=self.consumer,
            stream_name=stream,
            consumer_group_name=group,
            consumer_name=worker,
            batch_count=config.batch_count,
            poll_interval=config.poll_interval,
            block_ms=config.block_ms,
        )
        # Every message the processor reads is routed through the dispatcher.
        self.processor.set_callback(self.dispatcher.process)

    def include_router(self, router: StreamRouter) -> None:
        """Include handlers from a router, respecting configured logical groups."""
        enabled = self.config.enabled_group_set
        self.dispatcher.include_router(router, enabled_groups=enabled)

    async def start(self) -> None:
        """Start the underlying stream processor."""
        await self.processor.start()

    async def stop(self) -> None:
        """Stop the underlying stream processor."""
        await self.processor.stop()
Functions
include_router(router)

Include handlers from a router, respecting configured logical groups.

Source code in src/codex_platform/streams/runtime.py
def include_router(self, router: StreamRouter) -> None:
    """Register a router's handlers with the dispatcher.

    The configured ``enabled_group_set`` is passed along as ``enabled_groups``.
    """
    enabled = self.config.enabled_group_set
    self.dispatcher.include_router(router, enabled_groups=enabled)
start() async

Start the underlying stream processor.

Source code in src/codex_platform/streams/runtime.py
async def start(self) -> None:
    """Start the runtime by starting its stream processor."""
    processor = self.processor
    await processor.start()
stop() async

Stop the underlying stream processor.

Source code in src/codex_platform/streams/runtime.py
async def stop(self) -> None:
    """Stop the runtime by stopping its stream processor."""
    processor = self.processor
    await processor.stop()

StreamRuntimeConfig dataclass

Configuration for one running Redis Streams worker process.

Source code in src/codex_platform/streams/runtime.py
@dataclass(frozen=True)
class StreamRuntimeConfig:
    """Configuration for one running Redis Streams worker process."""

    stream_name: str
    consumer_group: str
    consumer_name: str
    enabled_groups: Iterable[str] | None = None
    batch_count: int = 10
    poll_interval: float = 1.0
    block_ms: int | None = 1000
    default_monolith_group: str = "monolith"

    @property
    def enabled_group_set(self) -> set[str] | None:
        """Normalized logical groups, or ``None`` when all handlers are enabled."""
        return None if self.enabled_groups is None else set(self.enabled_groups)

    def validate(self) -> None:
        """Reject ambiguous partial-group startup on the default monolith group."""
        if self.enabled_group_set is not None and self.consumer_group == self.default_monolith_group:
            raise ValueError("Partial stream runtimes must use an explicit non-monolith consumer_group")
Attributes
enabled_group_set property

Normalized logical groups, or None when all handlers are enabled.

Functions
validate()

Reject ambiguous partial-group startup on the default monolith group.

Source code in src/codex_platform/streams/runtime.py
def validate(self) -> None:
    """Reject a partial-group runtime that uses the default monolith consumer group.

    Raises:
        ValueError: ``enabled_group_set`` is not ``None`` while
            ``consumer_group`` equals ``default_monolith_group``.
    """
    if self.enabled_group_set is None:
        # All handlers enabled: any consumer group is acceptable.
        return
    if self.consumer_group != self.default_monolith_group:
        return
    raise ValueError("Partial stream runtimes must use an explicit non-monolith consumer_group")