Bases: NotificationAdapter
Adapter for synchronous in-process notification delivery.
Source code in src/codex_platform/messaging/delivery/direct.py
| class DirectNotificationAdapter(NotificationAdapter):
"""Adapter for synchronous in-process notification delivery."""
def __init__(self, config: Any) -> None:
self.config = config
def enqueue(self, _task_name: str, payload: dict[str, Any]) -> str | None:
"""Deliver a notification synchronously via the orchestrator pipeline."""
from codex_platform.messaging.dto import (
NotificationPayloadDTO,
RenderedNotificationDTO,
TemplateNotificationDTO,
)
from codex_platform.messaging.orchestrator import BaseDeliveryOrchestrator
from codex_platform.messaging.registry import ChannelRegistry
log.debug("DirectNotificationAdapter | starting direct delivery")
registry = ChannelRegistry()
_register_default_channels(registry, self.config)
channels = registry.build_channels(self.config)
orchestrator = BaseDeliveryOrchestrator(channels=channels)
payload_dto: NotificationPayloadDTO
if "html_content" in payload:
payload_dto = RenderedNotificationDTO(**payload)
elif "template_name" in payload and "context_key" in payload:
payload_dto = TemplateNotificationDTO(**payload)
else:
payload_dto = NotificationPayloadDTO(**payload)
asyncio.run(orchestrator.deliver(payload_dto))
notification_id = payload.get("notification_id")
log.info("DirectNotificationAdapter | delivered notification_id=%s", notification_id)
return notification_id
|
Deliver a notification synchronously via the orchestrator pipeline.
Source code in src/codex_platform/messaging/delivery/direct.py
| def enqueue(self, _task_name: str, payload: dict[str, Any]) -> str | None:
"""Deliver a notification synchronously via the orchestrator pipeline."""
from codex_platform.messaging.dto import (
NotificationPayloadDTO,
RenderedNotificationDTO,
TemplateNotificationDTO,
)
from codex_platform.messaging.orchestrator import BaseDeliveryOrchestrator
from codex_platform.messaging.registry import ChannelRegistry
log.debug("DirectNotificationAdapter | starting direct delivery")
registry = ChannelRegistry()
_register_default_channels(registry, self.config)
channels = registry.build_channels(self.config)
orchestrator = BaseDeliveryOrchestrator(channels=channels)
payload_dto: NotificationPayloadDTO
if "html_content" in payload:
payload_dto = RenderedNotificationDTO(**payload)
elif "template_name" in payload and "context_key" in payload:
payload_dto = TemplateNotificationDTO(**payload)
else:
payload_dto = NotificationPayloadDTO(**payload)
asyncio.run(orchestrator.deliver(payload_dto))
notification_id = payload.get("notification_id")
log.info("DirectNotificationAdapter | delivered notification_id=%s", notification_id)
return notification_id
|