Skip to content

notifications.delivery.direct

direct

Deprecated compatibility module for the direct notification adapter.

Classes

DirectNotificationAdapter

Bases: NotificationAdapter

Compatibility wrapper preserving legacy monkeypatch points.

Source code in src/codex_platform/notifications/delivery/direct.py
class DirectNotificationAdapter(NotificationAdapter):
    """Compatibility shim that keeps the legacy monkeypatch points available.

    Instead of handing work to a queue, ``enqueue`` builds the delivery
    channels and runs the delivery orchestrator inline, blocking until the
    delivery coroutine has finished.
    """

    def __init__(self, config: Any) -> None:
        # Kept as-is; it is passed to the channel registry on every enqueue.
        self.config = config

    def enqueue(self, _task_name: str, payload: dict[str, Any]) -> str | None:
        """Deliver ``payload`` synchronously and return its notification id.

        Args:
            _task_name: Accepted for interface compatibility; not used here.
            payload: Keyword arguments used to build a
                ``NotificationPayloadDTO``.

        Returns:
            The value stored under ``"notification_id"`` in ``payload``, or
            ``None`` when that key is absent.

        Note:
            Delivery is driven with ``asyncio.run``, which cannot be called
            while another event loop is running in the same thread.
        """
        # Resolved at call time rather than at module import time.
        from codex_platform.notifications.dto import NotificationPayloadDTO
        from codex_platform.notifications.orchestrator import BaseDeliveryOrchestrator
        from codex_platform.notifications.registry import ChannelRegistry

        log.debug("DirectNotificationAdapter | starting direct delivery")

        # A fresh registry (and therefore fresh channels) is built per call.
        channel_registry = ChannelRegistry()
        _register_default_channels(channel_registry, self.config)
        delivery = BaseDeliveryOrchestrator(
            channels=channel_registry.build_channels(self.config),
        )

        # Block until the orchestrator's delivery coroutine completes.
        asyncio.run(delivery.deliver(NotificationPayloadDTO(**payload)))

        delivered_id = payload.get("notification_id")
        log.info("DirectNotificationAdapter | delivered notification_id=%s", delivered_id)
        return delivered_id