codex_platform.streams.dispatcher
Generic Stream event dispatcher — routes messages to registered handlers.
Framework-agnostic: no DI container, no bot references. For framework-specific dispatchers (e.g. with a DI container), extend this class.
Usage::
    from codex_platform.streams.dispatcher import StreamDispatcher
    from codex_platform.streams.router import StreamRouter

    dispatcher = StreamDispatcher()

    # Register handlers directly:
    @dispatcher.on("booking.confirmed")
    async def handle_booking(payload: dict) -> None:
        ...

    # Or include a router from a feature module:
    dispatcher.include_router(notifications_router)

    # Connect to StreamProcessor:
    processor.set_callback(dispatcher.process)
    await processor.start()
Extending for framework-specific DI::
    class BotDispatcher(StreamDispatcher):
        def __init__(self, container):
            super().__init__()
            self.container = container

        # Keep the base signature so StreamProcessor can still pass stream_name.
        async def process(self, payload: dict, stream_name: str = "") -> None:
            # inject container into handlers, etc.
            ...
Classes
RetrySchedulerProtocol
Bases: Protocol
Protocol for a retry scheduler (ARQ, Celery, etc.).
Pass to StreamDispatcher for automatic rescheduling of failed messages.
Source code in src/codex_platform/streams/dispatcher.py
Functions
schedule_retry(stream_name, payload, delay=60)
async
Schedules message reprocessing after a delay.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| stream_name | str | Redis Stream name. | required |
| payload | dict[str, Any] | Original message data. | required |
| delay | int | Retry delay in seconds. | 60 |
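A scheduler only needs to provide the `schedule_retry` coroutine documented above. The following is a minimal sketch, not the library's code: `RetryScheduler` is a local stand-in that mirrors the documented signature so the example runs without `codex_platform` installed, and `InMemoryRetryScheduler` is a hypothetical implementation that records requests instead of enqueuing jobs.

```python
import asyncio
from typing import Any, Protocol


class RetryScheduler(Protocol):
    """Local stand-in mirroring the documented schedule_retry signature."""

    async def schedule_retry(
        self, stream_name: str, payload: dict[str, Any], delay: int = 60
    ) -> None: ...


class InMemoryRetryScheduler:
    """Hypothetical scheduler: records retry requests instead of enqueuing jobs."""

    def __init__(self) -> None:
        self.scheduled: list[tuple[str, dict[str, Any], int]] = []

    async def schedule_retry(
        self, stream_name: str, payload: dict[str, Any], delay: int = 60
    ) -> None:
        # A real implementation would enqueue a delayed job here (ARQ, Celery, ...).
        self.scheduled.append((stream_name, payload, delay))


async def demo() -> list[tuple[str, dict[str, Any], int]]:
    scheduler = InMemoryRetryScheduler()
    checked: RetryScheduler = scheduler  # structural typing: no inheritance needed
    await checked.schedule_retry("bookings", {"id": 1})
    await checked.schedule_retry("bookings", {"id": 2}, delay=5)
    return scheduler.scheduled


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the protocol is structural, a class backed by ARQ or Celery does not have to inherit from anything; matching the method signature should be enough.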
StreamDispatcher
Routes Redis Stream messages to registered handlers by event type.
Handlers are registered via @dispatcher.on(event_type) decorator
or by including StreamRouter instances.
On handler failure: if a retry_scheduler is provided, the message
is scheduled for retry. Otherwise the exception is re-raised and the
message stays unacknowledged in the stream's pending entries list (PEL).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| retry_scheduler | RetrySchedulerProtocol \| None | Optional retry scheduler implementing | None |
Example::
    dispatcher = StreamDispatcher()

    @dispatcher.on("user.registered")
    async def welcome(payload: dict) -> None:
        await send_welcome_email(payload["email"])

    processor.set_callback(dispatcher.process)
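The failure policy described for this class (schedule a retry when a scheduler is present, otherwise re-raise) can be illustrated with a toy model. This is a sketch under stated assumptions, not the actual `StreamDispatcher` implementation: `ToyDispatcher` and `RecordingScheduler` are hypothetical, and looking up the event type under a `"type"` key in the payload is an assumption made only for this example.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Handler = Callable[[dict[str, Any]], Awaitable[None]]


class RecordingScheduler:
    """Hypothetical retry scheduler that only records what it was asked to retry."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, dict[str, Any], int]] = []

    async def schedule_retry(
        self, stream_name: str, payload: dict[str, Any], delay: int = 60
    ) -> None:
        self.calls.append((stream_name, payload, delay))


class ToyDispatcher:
    """Toy model of the documented failure policy (not the library's code)."""

    def __init__(self, retry_scheduler: Optional[RecordingScheduler] = None) -> None:
        self._handlers: dict[str, list[Handler]] = {}
        self._retry_scheduler = retry_scheduler

    def on(self, event_type: str) -> Callable[[Handler], Handler]:
        def decorator(func: Handler) -> Handler:
            self._handlers.setdefault(event_type, []).append(func)
            return func

        return decorator

    async def process(self, payload: dict[str, Any], stream_name: str = "") -> None:
        # Assumption: the event type is stored under the "type" key.
        for handler in self._handlers.get(payload.get("type", ""), []):
            try:
                await handler(payload)
            except Exception:
                if self._retry_scheduler is None:
                    raise  # the caller never acknowledges, so the message stays pending
                await self._retry_scheduler.schedule_retry(stream_name, payload)


async def demo() -> tuple[list[tuple[str, dict[str, Any], int]], bool]:
    scheduler = RecordingScheduler()
    with_retry = ToyDispatcher(retry_scheduler=scheduler)
    without_retry = ToyDispatcher()

    async def failing(payload: dict[str, Any]) -> None:
        raise RuntimeError("mail server unavailable")

    with_retry.on("user.registered")(failing)
    without_retry.on("user.registered")(failing)

    message = {"type": "user.registered", "email": "a@example.com"}
    await with_retry.process(message, stream_name="users")  # error goes to the scheduler

    raised = False
    try:
        await without_retry.process(message, stream_name="users")
    except RuntimeError:
        raised = True  # error propagates to the caller
    return scheduler.calls, raised


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the toy model, `stream_name` is used only when forwarding to the scheduler, which matches the parameter's documented purpose.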
Functions
include_router(router)
Merges handlers from a StreamRouter into this dispatcher.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| router | StreamRouter | Router from a feature module. | required |
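The idea of merging a router's handlers into a dispatcher can be sketched with two toy classes. Everything here is hypothetical: `ToyRouter` and `ToyDispatcher` are stand-ins, and the assumption that a router exposes an `on` decorator and a per-event handler registry is made only for illustration, since the real `StreamRouter` API is not shown in this section.

```python
from typing import Any, Awaitable, Callable

Handler = Callable[[dict[str, Any]], Awaitable[None]]


class ToyRouter:
    """Hypothetical stand-in for StreamRouter: collects handlers per event type."""

    def __init__(self) -> None:
        self.handlers: dict[str, list[Handler]] = {}

    def on(self, event_type: str) -> Callable[[Handler], Handler]:
        def decorator(func: Handler) -> Handler:
            self.handlers.setdefault(event_type, []).append(func)
            return func

        return decorator


class ToyDispatcher(ToyRouter):
    """Hypothetical dispatcher that can absorb handlers from routers."""

    def include_router(self, router: ToyRouter) -> None:
        # Extend rather than overwrite, so handlers registered earlier survive.
        for event_type, funcs in router.handlers.items():
            self.handlers.setdefault(event_type, []).extend(funcs)


# Feature module: declares its handlers on its own router.
notifications_router = ToyRouter()


@notifications_router.on("booking.confirmed")
async def notify(payload: dict[str, Any]) -> None:
    ...


# Application: one dispatcher with a direct handler plus the included router.
dispatcher = ToyDispatcher()


@dispatcher.on("booking.confirmed")
async def audit(payload: dict[str, Any]) -> None:
    ...


dispatcher.include_router(notifications_router)
```

Keeping handlers on per-feature routers lets each module declare its subscriptions without importing the application's dispatcher.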
on(event_type, filter_func=None)
Decorator for registering a handler directly on the dispatcher.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event_type | str | Stream message type (e.g. | required |
| filter_func | FilterFunc \| None | Optional | None |
process(payload, stream_name='')
async
Dispatches an incoming message to matching handlers.
Called by StreamProcessor on each incoming message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| payload | dict[str, Any] | Message data dict. Must contain | required |
| stream_name | str | Stream name (used for retry scheduling only). | '' |