Skip to content

streams.router

router

codex_platform.streams.router

Stream event router — groups handlers by message type.

Analogous to Aiogram/FastAPI routers: define handlers in feature modules, then include the router in a dispatcher. The router itself is framework-agnostic.

Usage::

from codex_platform.streams.router import StreamRouter

# Create a router that will collect the handlers declared below.
router = StreamRouter()

# Register a handler for the "booking.confirmed" message type.
@router.on("booking.confirmed")
async def handle_booking(payload: dict) -> None:
    booking_id = payload["booking_id"]
    ...

# Register a handler together with a payload filter. The router only stores
# the filter next to the handler; per the `on()` documentation the handler is
# meant to run only when the filter passes (here: payloads with "urgent" set).
@router.on("notification.created", filter_func=lambda p: p.get("urgent"))
async def handle_urgent(payload: dict) -> None:
    ...

Classes

StreamRouter

Groups Redis Stream event handlers by message type.

After populating the router, include it in a StreamDispatcher via include_router().

Example::

# Create a router and attach a handler for the "order.paid" message type.
router = StreamRouter()

@router.on("order.paid")
async def on_order_paid(payload: dict) -> None:
    ...

# Hand the populated router to a dispatcher (`dispatcher` is assumed to be an
# existing StreamDispatcher instance; it is not created in this snippet).
dispatcher.include_router(router)
Source code in src/codex_platform/streams/router.py
class StreamRouter:
    """Groups Redis Stream event handlers by message type.

    After populating, include into a ``StreamDispatcher`` via ``include_router()``.

    Example::

        router = StreamRouter()

        @router.on("order.paid")
        async def on_order_paid(payload: dict) -> None:
            ...

        dispatcher.include_router(router)
    """

    def __init__(self) -> None:
        self._handlers: dict[str, list[tuple[HandlerFunc, FilterFunc | None]]] = {}

    def on(
        self,
        event_type: str,
        filter_func: FilterFunc | None = None,
    ) -> Callable[[HandlerFunc], HandlerFunc]:
        """Decorator for registering a handler for an event type.

        Args:
            event_type:  Stream message type (e.g. ``"booking.confirmed"``).
            filter_func: Optional ``payload -> bool`` filter. Handler is called
                         only when filter returns ``True``.

        Returns:
            Decorator (returns the original handler unchanged).
        """

        def decorator(handler: HandlerFunc) -> HandlerFunc:
            if event_type not in self._handlers:
                self._handlers[event_type] = []
            self._handlers[event_type].append((handler, filter_func))
            return handler

        return decorator

    @property
    def handlers(self) -> dict[str, list[tuple[HandlerFunc, FilterFunc | None]]]:
        """Registered handlers (read-only)."""
        return self._handlers
Attributes
handlers property

Registered handlers (read-only).

Functions
on(event_type, filter_func=None)

Decorator for registering a handler for an event type.

Parameters:

Name Type Description Default
event_type str

Stream message type (e.g. "booking.confirmed").

required
filter_func FilterFunc | None

Optional payload -> bool filter. The handler is called only when the filter returns True.

None

Returns:

Type Description
Callable[[HandlerFunc], HandlerFunc]

Decorator (returns the original handler unchanged).

Source code in src/codex_platform/streams/router.py
def on(
    self,
    event_type: str,
    filter_func: FilterFunc | None = None,
) -> Callable[[HandlerFunc], HandlerFunc]:
    """Decorator for registering a handler for an event type.

    Args:
        event_type:  Stream message type (e.g. ``"booking.confirmed"``).
        filter_func: Optional ``payload -> bool`` filter. Handler is called
                     only when filter returns ``True``.

    Returns:
        Decorator (returns the original handler unchanged).
    """

    def decorator(handler: HandlerFunc) -> HandlerFunc:
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append((handler, filter_func))
        return handler

    return decorator