Bases: NotificationAdapter
Compatibility wrapper preserving legacy monkeypatch points.
Source code in src/codex_platform/notifications/delivery/direct.py
class DirectNotificationAdapter(NotificationAdapter):
    """Compatibility wrapper preserving legacy monkeypatch points.

    Delivers a notification synchronously, in the calling thread, instead of
    handing it to a task queue.
    """

    def __init__(self, config: Any) -> None:
        """Store the configuration object used later to build channels."""
        self.config = config

    def enqueue(self, _task_name: str, payload: dict[str, Any]) -> str | None:
        """Deliver ``payload`` immediately and return its notification id.

        Args:
            _task_name: Accepted for interface compatibility; not used here.
            payload: Mapping expanded as keyword arguments into
                ``NotificationPayloadDTO``.

        Returns:
            The value stored under ``"notification_id"`` in ``payload``, or
            ``None`` when that key is absent.

        Note:
            Delivery is driven with ``asyncio.run``, which raises
            ``RuntimeError`` when invoked from a thread that already has a
            running event loop.
        """
        # Imports are resolved at call time rather than at module import.
        # NOTE(review): presumably this keeps the legacy monkeypatch points
        # effective -- confirm before hoisting these to the top of the file.
        from codex_platform.notifications.dto import NotificationPayloadDTO
        from codex_platform.notifications.orchestrator import BaseDeliveryOrchestrator
        from codex_platform.notifications.registry import ChannelRegistry

        log.debug("DirectNotificationAdapter | starting direct delivery")

        # Fresh registry per call, populated with the default channels.
        channel_registry = ChannelRegistry()
        _register_default_channels(channel_registry, self.config)

        delivery = BaseDeliveryOrchestrator(
            channels=channel_registry.build_channels(self.config),
        )
        message = NotificationPayloadDTO(**payload)

        # Block until the asynchronous delivery coroutine has finished.
        asyncio.run(delivery.deliver(message))

        delivered_id = payload.get("notification_id")
        log.info("DirectNotificationAdapter | delivered notification_id=%s", delivered_id)
        return delivered_id
|