Skip to content

streams.dispatcher

dispatcher

codex_platform.streams.dispatcher

Generic Stream event dispatcher — routes messages to registered handlers.

Framework-agnostic: no DI container, no bot references. To build a framework-specific dispatcher (e.g. one that uses a DI container), extend this class.

Usage::

from codex_platform.streams.dispatcher import StreamDispatcher
from codex_platform.streams.router import StreamRouter

dispatcher = StreamDispatcher()

# Register handlers directly:
@dispatcher.on("booking.confirmed")
async def handle_booking(payload: dict) -> None:
    ...

# Or include a router from a feature module:
dispatcher.include_router(notifications_router)

# Connect to StreamProcessor:
processor.set_callback(dispatcher.process)
await processor.start()

Extending for framework-specific DI::

class BotDispatcher(StreamDispatcher):
    def __init__(self, container):
        super().__init__()
        self.container = container

    async def process(self, payload: dict) -> None:
        # inject container into handlers, etc.
        ...

Classes

RetrySchedulerProtocol

Bases: Protocol

Protocol for a retry scheduler (ARQ, Celery, etc.).

Pass to StreamDispatcher for automatic rescheduling of failed messages.

Source code in src/codex_platform/streams/dispatcher.py
@runtime_checkable
class RetrySchedulerProtocol(Protocol):
    """Structural interface for a retry back-end (ARQ, Celery, etc.).

    Hand an implementation to ``StreamDispatcher`` to have failed messages
    rescheduled automatically.
    """

    async def schedule_retry(
        self, stream_name: str, payload: dict[str, Any], delay: int = 60
    ) -> None:
        """Queue a message for reprocessing once ``delay`` has elapsed.

        Args:
            stream_name: Redis Stream name.
            payload:     Original message data.
            delay:       Retry delay in seconds.
        """
        ...
Functions
schedule_retry(stream_name, payload, delay=60) async

Schedules message reprocessing after a delay.

Parameters:

Name Type Description Default
stream_name str

Redis Stream name.

required
payload dict[str, Any]

Original message data.

required
delay int

Retry delay in seconds.

60
Source code in src/codex_platform/streams/dispatcher.py
async def schedule_retry(
    self, stream_name: str, payload: dict[str, Any], delay: int = 60
) -> None:
    """Queue a message for reprocessing once ``delay`` has elapsed.

    Protocol stub: implementations supply the actual scheduling.

    Args:
        stream_name: Redis Stream name.
        payload:     Original message data.
        delay:       Retry delay in seconds.
    """
    ...

StreamDispatcher

Routes Redis Stream messages to registered handlers by event type.

Handlers are registered via @dispatcher.on(event_type) decorator or by including StreamRouter instances.

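On handler failure: if a retry_scheduler is provided, the message is scheduled for retry and dispatching stops. If no scheduler is provided, or scheduling the retry itself fails, the exception is re-raised (the message stays unacknowledged in the PEL, the Redis Streams Pending Entries List).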

Parameters:

Name Type Description Default
retry_scheduler RetrySchedulerProtocol | None

Optional retry scheduler implementing RetrySchedulerProtocol.

None

Example::

dispatcher = StreamDispatcher()

@dispatcher.on("user.registered")
async def welcome(payload: dict) -> None:
    await send_welcome_email(payload["email"])

processor.set_callback(dispatcher.process)
Source code in src/codex_platform/streams/dispatcher.py
class StreamDispatcher:
    """Routes Redis Stream messages to registered handlers by event type.

    Handlers are registered via ``@dispatcher.on(event_type)`` decorator
    or by including ``StreamRouter`` instances.

    On handler failure: if a ``retry_scheduler`` is provided, the message
    is scheduled for retry and dispatching of that message stops. If no
    scheduler is provided, or scheduling the retry itself fails, the
    handler's exception is re-raised (message stays in PEL, unacknowledged).

    Args:
        retry_scheduler: Optional retry scheduler implementing ``RetrySchedulerProtocol``.

    Example::

        dispatcher = StreamDispatcher()

        @dispatcher.on("user.registered")
        async def welcome(payload: dict) -> None:
            await send_welcome_email(payload["email"])

        processor.set_callback(dispatcher.process)
    """

    def __init__(self, retry_scheduler: RetrySchedulerProtocol | None = None) -> None:
        self._retry_scheduler = retry_scheduler
        # event type -> (handler, optional filter) pairs, in registration order
        self._handlers: dict[str, list[tuple[HandlerFunc, FilterFunc | None]]] = {}
        log.info("StreamDispatcher | initialized")

    def include_router(self, router: StreamRouter) -> None:
        """Merges handlers from a ``StreamRouter`` into this dispatcher.

        Handlers are appended after any already registered for the same
        event type.

        Args:
            router: Router from a feature module.
        """
        for event_type, handlers in router.handlers.items():
            self._handlers.setdefault(event_type, []).extend(handlers)
        log.info("StreamDispatcher | included router types=%s", list(router.handlers.keys()))

    def on(
        self,
        event_type: str,
        filter_func: FilterFunc | None = None,
    ) -> Callable[[HandlerFunc], HandlerFunc]:
        """Decorator for registering a handler directly on the dispatcher.

        Args:
            event_type:  Stream message type (e.g. ``"booking.confirmed"``).
            filter_func: Optional ``payload -> bool`` filter.

        Returns:
            A decorator that registers the handler and returns it unchanged.
        """

        def decorator(handler: HandlerFunc) -> HandlerFunc:
            self._handlers.setdefault(event_type, []).append((handler, filter_func))
            return handler

        return decorator

    async def process(self, payload: dict[str, Any], stream_name: str = "") -> None:
        """Dispatches an incoming message to matching handlers.

        Called by ``StreamProcessor`` on each incoming message. Handlers run
        sequentially in registration order; a handler whose filter returns a
        falsy value is skipped.

        Args:
            payload:     Message data dict. Must contain ``"type"`` field.
            stream_name: Stream name (used for retry scheduling only).

        Raises:
            Exception: The handler's (or filter's) original exception, when no
                retry scheduler is configured or scheduling the retry fails.
        """
        event_type = payload.get("type")
        if not event_type:
            log.warning("StreamDispatcher | message without 'type' field: %s", payload)
            return

        handlers = self._handlers.get(event_type, [])
        if not handlers:
            log.debug("StreamDispatcher | no handlers for type='%s'", event_type)
            return

        for handler, filter_func in handlers:
            # functools.partial objects and callable instances have no
            # ``__name__``; fall back to repr() so that logging can never
            # raise and mask the handler's own error.
            handler_name = getattr(handler, "__name__", None) or repr(handler)
            try:
                if filter_func is None or filter_func(payload):
                    log.debug("StreamDispatcher | calling %s for type='%s'", handler_name, event_type)
                    await handler(payload)
            except Exception as exc:
                # log.exception keeps the traceback alongside the message.
                log.exception("StreamDispatcher | handler %s failed: %s", handler_name, exc)

                # Explicit None check: a scheduler object must not be skipped
                # just because it happens to evaluate as falsy.
                if self._retry_scheduler is not None:
                    try:
                        await self._retry_scheduler.schedule_retry(
                            stream_name=stream_name,
                            payload=payload,
                            delay=60,
                        )
                    except Exception as retry_err:
                        log.exception("StreamDispatcher | retry scheduling failed: %s", retry_err)
                    else:
                        log.info("StreamDispatcher | retry scheduled for type='%s'", event_type)
                        # The message was handed to the scheduler, so the
                        # remaining handlers are skipped for this delivery.
                        return

                # No scheduler, or scheduling failed: surface the handler's
                # original exception to the caller.
                raise
Functions
include_router(router)

Merges handlers from a StreamRouter into this dispatcher.

Parameters:

Name Type Description Default
router StreamRouter

Router from a feature module.

required
Source code in src/codex_platform/streams/dispatcher.py
def include_router(self, router: StreamRouter) -> None:
    """Copy every handler registered on ``router`` into this dispatcher.

    Handlers are appended after any already registered for the same
    event type.

    Args:
        router: Router from a feature module.
    """
    registry = self._handlers
    for event_type, handlers in router.handlers.items():
        registry.setdefault(event_type, []).extend(handlers)
    included_types = list(router.handlers.keys())
    log.info("StreamDispatcher | included router types=%s", included_types)
on(event_type, filter_func=None)

Decorator for registering a handler directly on the dispatcher.

Parameters:

Name Type Description Default
event_type str

Stream message type (e.g. "booking.confirmed").

required
filter_func FilterFunc | None

Optional payload -> bool filter.

None
Source code in src/codex_platform/streams/dispatcher.py
def on(
    self,
    event_type: str,
    filter_func: FilterFunc | None = None,
) -> Callable[[HandlerFunc], HandlerFunc]:
    """Build a decorator that registers a handler on this dispatcher.

    Args:
        event_type:  Stream message type (e.g. ``"booking.confirmed"``).
        filter_func: Optional ``payload -> bool`` filter.

    Returns:
        A decorator that stores ``(handler, filter_func)`` under
        ``event_type`` and returns the handler unchanged.
    """

    def register(handler: HandlerFunc) -> HandlerFunc:
        # Create the per-event list on first use, then append to it.
        bucket = self._handlers.setdefault(event_type, [])
        bucket.append((handler, filter_func))
        return handler

    return register
process(payload, stream_name='') async

Dispatches an incoming message to matching handlers.

Called by StreamProcessor on each incoming message.

Parameters:

Name Type Description Default
payload dict[str, Any]

Message data dict. Must contain "type" field.

required
stream_name str

Stream name (used for retry scheduling only).

''
Source code in src/codex_platform/streams/dispatcher.py
async def process(self, payload: dict[str, Any], stream_name: str = "") -> None:
    """Dispatches an incoming message to matching handlers.

    Called by ``StreamProcessor`` on each incoming message. Handlers run
    sequentially in registration order; a handler whose filter returns a
    falsy value is skipped.

    Args:
        payload:     Message data dict. Must contain ``"type"`` field.
        stream_name: Stream name (used for retry scheduling only).

    Raises:
        Exception: The handler's (or filter's) original exception, when no
            retry scheduler is configured or scheduling the retry fails.
    """
    event_type = payload.get("type")
    if not event_type:
        log.warning("StreamDispatcher | message without 'type' field: %s", payload)
        return

    handlers = self._handlers.get(event_type, [])
    if not handlers:
        log.debug("StreamDispatcher | no handlers for type='%s'", event_type)
        return

    for handler, filter_func in handlers:
        # functools.partial objects and callable instances have no
        # ``__name__``; fall back to repr() so that logging can never raise
        # and mask the handler's own error.
        handler_name = getattr(handler, "__name__", None) or repr(handler)
        try:
            if filter_func is None or filter_func(payload):
                log.debug("StreamDispatcher | calling %s for type='%s'", handler_name, event_type)
                await handler(payload)
        except Exception as exc:
            # log.exception keeps the traceback alongside the message.
            log.exception("StreamDispatcher | handler %s failed: %s", handler_name, exc)

            # Explicit None check: a scheduler object must not be skipped
            # just because it happens to evaluate as falsy.
            if self._retry_scheduler is not None:
                try:
                    await self._retry_scheduler.schedule_retry(
                        stream_name=stream_name,
                        payload=payload,
                        delay=60,
                    )
                except Exception as retry_err:
                    log.exception("StreamDispatcher | retry scheduling failed: %s", retry_err)
                else:
                    log.info("StreamDispatcher | retry scheduled for type='%s'", event_type)
                    # The message was handed to the scheduler, so the
                    # remaining handlers are skipped for this delivery.
                    return

            # No scheduler, or scheduling failed: surface the handler's
            # original exception to the caller.
            raise