Skip to content

messaging.delivery.direct

direct

Direct in-process delivery adapter.

Classes

DirectNotificationAdapter

Bases: NotificationAdapter

Adapter for synchronous in-process notification delivery.

Source code in src/codex_platform/messaging/delivery/direct.py
class DirectNotificationAdapter(NotificationAdapter):
    """Adapter for synchronous in-process notification delivery."""

    def __init__(self, config: Any) -> None:
        self.config = config

    def enqueue(self, _task_name: str, payload: dict[str, Any]) -> str | None:
        """Deliver a notification synchronously via the orchestrator pipeline."""

        from codex_platform.messaging.dto import (
            NotificationPayloadDTO,
            RenderedNotificationDTO,
            TemplateNotificationDTO,
        )
        from codex_platform.messaging.orchestrator import BaseDeliveryOrchestrator
        from codex_platform.messaging.registry import ChannelRegistry

        log.debug("DirectNotificationAdapter | starting direct delivery")

        registry = ChannelRegistry()
        _register_default_channels(registry, self.config)
        channels = registry.build_channels(self.config)
        orchestrator = BaseDeliveryOrchestrator(channels=channels)

        payload_dto: NotificationPayloadDTO
        if "html_content" in payload:
            payload_dto = RenderedNotificationDTO(**payload)
        elif "template_name" in payload and "context_key" in payload:
            payload_dto = TemplateNotificationDTO(**payload)
        else:
            payload_dto = NotificationPayloadDTO(**payload)
        asyncio.run(orchestrator.deliver(payload_dto))

        notification_id = payload.get("notification_id")
        log.info("DirectNotificationAdapter | delivered notification_id=%s", notification_id)
        return notification_id
Functions
enqueue(_task_name, payload)

Deliver a notification synchronously via the orchestrator pipeline.

Source code in src/codex_platform/messaging/delivery/direct.py
def enqueue(self, _task_name: str, payload: dict[str, Any]) -> str | None:
    """Deliver a notification synchronously via the orchestrator pipeline."""

    from codex_platform.messaging.dto import (
        NotificationPayloadDTO,
        RenderedNotificationDTO,
        TemplateNotificationDTO,
    )
    from codex_platform.messaging.orchestrator import BaseDeliveryOrchestrator
    from codex_platform.messaging.registry import ChannelRegistry

    log.debug("DirectNotificationAdapter | starting direct delivery")

    registry = ChannelRegistry()
    _register_default_channels(registry, self.config)
    channels = registry.build_channels(self.config)
    orchestrator = BaseDeliveryOrchestrator(channels=channels)

    payload_dto: NotificationPayloadDTO
    if "html_content" in payload:
        payload_dto = RenderedNotificationDTO(**payload)
    elif "template_name" in payload and "context_key" in payload:
        payload_dto = TemplateNotificationDTO(**payload)
    else:
        payload_dto = NotificationPayloadDTO(**payload)
    asyncio.run(orchestrator.deliver(payload_dto))

    notification_id = payload.get("notification_id")
    log.info("DirectNotificationAdapter | delivered notification_id=%s", notification_id)
    return notification_id