notifications.orchestrator
orchestrator
codex_platform.workers.notifications.orchestrator
Fallback delivery orchestrator. Tries channels in order; stops on first success.
Usage
registry = ChannelRegistry() registry.register("smtp", lambda cfg: AsyncEmailClient(...)) channels = registry.build_channels(settings)
orchestrator = BaseDeliveryOrchestrator(channels=channels) success = await orchestrator.deliver(payload_dto)
Classes
DeliveryChannel
Bases: Protocol
Contract for a single delivery method.
Implementors: AsyncEmailClient, DjangoMailChannel, TelegramChannel, etc.
Source code in src/codex_platform/notifications/orchestrator.py
Functions
send(to, subject, html_content, text_content)
async
Attempt delivery. Return True on success, False on logical failure. Raise on infrastructure errors (so the orchestrator can log and try next channel).
Source code in src/codex_platform/notifications/orchestrator.py
BaseDeliveryOrchestrator
Tries channels in order; stops on first success.
Channels are injected at construction time via Dependency Injection.
deliver() is a pure send operation with no side effects beyond delivery.
Source code in src/codex_platform/notifications/orchestrator.py
Functions
__init__(channels)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
channels
|
list[DeliveryChannel]
|
Ordered list of delivery channels to try.
Use |
required |
deliver(payload)
async
Deliver a notification payload through available channels.
Tries each channel in order; stops on first success. If a channel raises, logs the exception and falls through to the next.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
payload
|
NotificationPayloadDTO
|
|
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if at least one channel succeeded, False if all exhausted. |