Skip to content

notifications

notifications

codex_platform.notifications

Building blocks for notification delivery.

# Contracts / DTOs
from codex_platform.notifications import NotificationPayloadDTO, NotificationRecipient
from codex_platform.notifications import NotificationChannel

# Channel pipeline
from codex_platform.notifications import AsyncEmailClient
from codex_platform.notifications import BaseDeliveryOrchestrator, ChannelRegistry

# Delivery adapters (how payload gets to the pipeline)
from codex_platform.notifications.delivery import ArqNotificationAdapter
from codex_platform.notifications.delivery import DirectNotificationAdapter

# Template rendering (requires jinja2 — install separately)
from codex_platform.notifications.renderer import TemplateRenderer

Classes

NotificationChannel

Bases: StrEnum

Supported notification delivery channel identifiers.

Source code in src/codex_platform/notifications/channels.py
class NotificationChannel(StrEnum):
    """Supported notification delivery channel identifiers.

    Members are strings (``StrEnum``); each value is the lowercase channel name.
    """

    EMAIL = "email"
    TELEGRAM = "telegram"
    SMS = "sms"
    WHATSAPP = "whatsapp"

AsyncEmailClient

Async SMTP email sender. Implements DeliveryChannel.

Usage

client = AsyncEmailClient(
    smtp_host="mail.example.com", smtp_port=465,
    smtp_user="user", smtp_password="pass",  # pragma: allowlist secret
    smtp_from_email="noreply@example.com",
)
success = await client.send(
    to="user@example.com",
    subject="Hello",
    html_content="<b>Hi</b>",
    text_content="Hi",
)

Source code in src/codex_platform/notifications/clients/smtp.py
class AsyncEmailClient:
    """
    Async SMTP email sender. Implements ``DeliveryChannel``.

    Usage:
        client = AsyncEmailClient(
            smtp_host="mail.example.com", smtp_port=465,
            smtp_user="user", smtp_password="pass",  # pragma: allowlist secret
            smtp_from_email="noreply@example.com",
        )
        success = await client.send(
            to="user@example.com",
            subject="Hello",
            html_content="<b>Hi</b>",
            text_content="Hi",
        )
    """

    def __init__(
        self,
        smtp_host: str,
        smtp_port: int,
        smtp_user: str | None = None,
        smtp_password: str | None = None,  # pragma: allowlist secret
        smtp_from_email: str | None = None,
        smtp_use_tls: bool = False,
    ) -> None:
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.smtp_user = smtp_user
        self.smtp_password = smtp_password
        self.smtp_from_email = smtp_from_email
        self.smtp_use_tls = smtp_use_tls

    def is_available(self) -> bool:
        """Returns True if the client is configured (non-localhost host)."""
        return bool(self.smtp_host and self.smtp_host != "localhost")

    async def send(
        self,
        to: str,
        subject: str,
        html_content: str | None,
        text_content: str | None,
        timeout: int = 15,
    ) -> bool:
        """
        Send email via SMTP.

        Args:
            to: Recipient email address.
            subject: Email subject line.
            html_content: HTML body (preferred). Used as HTML part.
            text_content: Plain-text body (fallback for HTML-disabled clients).
                          If None and html_content is provided, a generic fallback
                          is used.
            timeout: SMTP connection timeout in seconds.

        Returns:
            True on success.

        Raises:
            RuntimeError: If aiosmtplib is not installed.
            aiosmtplib.SMTPException: On SMTP delivery failure.
        """
        if not _AIOSMTP_AVAILABLE:
            raise RuntimeError("aiosmtplib not installed. Run: pip install codex-tools[notifications]")
        await self._send_smtp(to, subject, html_content, text_content, timeout)
        log.info("AsyncEmailClient | sent to='%s'", to)
        return True

    async def _send_smtp(
        self,
        to: str,
        subject: str,
        html_content: str | None,
        text_content: str | None,
        timeout: int,
    ) -> None:
        message = EmailMessage()
        message["From"] = self.smtp_from_email
        message["To"] = to
        message["Subject"] = subject

        plain = text_content or "Please enable HTML to view this email."
        message.set_content(plain)

        if html_content:
            message.add_alternative(html_content, subtype="html")

        use_ssl = self.smtp_port == 465
        start_tls = self.smtp_port == 587 or (self.smtp_use_tls and self.smtp_port != 465)

        send_kwargs: dict[str, Any] = {
            "hostname": self.smtp_host,
            "port": self.smtp_port,
            "use_tls": use_ssl,
            "start_tls": start_tls,
            "timeout": timeout,
        }

        if self.smtp_user and self.smtp_password:
            send_kwargs["username"] = self.smtp_user
            send_kwargs["password"] = self.smtp_password

        await aiosmtplib.send(message, **send_kwargs)
Functions
is_available()

Returns True if the client is configured (non-localhost host).

Source code in src/codex_platform/notifications/clients/smtp.py
def is_available(self) -> bool:
    """Report whether a real SMTP host is configured (set and not ``"localhost"``)."""
    host = self.smtp_host
    if not host:
        return False
    return bool(host != "localhost")
send(to, subject, html_content, text_content, timeout=15) async

Send email via SMTP.

Parameters:

Name Type Description Default
to str

Recipient email address.

required
subject str

Email subject line.

required
html_content str | None

HTML body (preferred). Used as HTML part.

required
text_content str | None

Plain-text body (fallback for HTML-disabled clients). If None and html_content is provided, a generic fallback is used.

required
timeout int

SMTP connection timeout in seconds.

15

Returns:

Type Description
bool

True on success.

Raises:

Type Description
RuntimeError

If aiosmtplib is not installed.

SMTPException

On SMTP delivery failure.

Source code in src/codex_platform/notifications/clients/smtp.py
async def send(
    self,
    to: str,
    subject: str,
    html_content: str | None,
    text_content: str | None,
    timeout: int = 15,
) -> bool:
    """
    Deliver one email over SMTP.

    Args:
        to: Recipient email address.
        subject: Subject line.
        html_content: HTML body (preferred); passed on as the HTML part.
        text_content: Plain-text body (fallback for HTML-disabled clients).
        timeout: SMTP connection timeout in seconds.

    Returns:
        True once the message has been sent.

    Raises:
        RuntimeError: If aiosmtplib is not installed.
        aiosmtplib.SMTPException: On SMTP delivery failure.
    """
    if _AIOSMTP_AVAILABLE:
        await self._send_smtp(to, subject, html_content, text_content, timeout)
        log.info("AsyncEmailClient | sent to='%s'", to)
        return True
    raise RuntimeError("aiosmtplib not installed. Run: pip install codex-tools[notifications]")

ArqNotificationAdapter

Bases: NotificationAdapter

Notification adapter that enqueues tasks into an ARQ/Redis queue.

Parameters:

Name Type Description Default
pool ArqRedis

ArqRedis connection pool from arq.connections.create_pool.

required

Raises on infrastructure failures (Redis unavailable, serialization errors).

Source code in src/codex_platform/notifications/delivery/arq.py
class ArqNotificationAdapter(NotificationAdapter):
    """
    Notification adapter that enqueues tasks into an ARQ/Redis queue.

    Args:
        pool: ``ArqRedis`` connection pool from ``arq.connections.create_pool``.

    Raises on infrastructure failures (Redis unavailable, serialization errors).
    """

    def __init__(self, pool: ArqRedis) -> None:
        self.pool = pool

    def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
        """
        Blocking enqueue: drives ``enqueue_async`` to completion on an event loop.

        Use ``enqueue_async`` when already inside async code.

        Args:
            task_name: ARQ worker function name.
            payload:   Serialized ``NotificationPayloadDTO``.

        Returns:
            Whatever ``enqueue_async`` returns (job ID string or None).

        Raises:
            RuntimeError: If called from a thread whose event loop is running.
            Exception: Anything raised by ``enqueue_async``.
        """
        import asyncio

        # Fail fast, before creating a coroutine that would never be awaited.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # no loop is running in this thread, so blocking is safe
        else:
            raise RuntimeError(
                "ArqNotificationAdapter.enqueue() cannot be called from a running "
                "event loop; use enqueue_async() instead"
            )

        # get_event_loop() raises when the thread has no current loop (for
        # example after an earlier asyncio.run(), or always on newer Python
        # versions). Create and install a loop in that case.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        # The loop is deliberately left open so later calls reuse it.
        return loop.run_until_complete(self.enqueue_async(task_name, payload))

    async def enqueue_async(self, task_name: str, payload: dict[str, Any]) -> str | None:
        """
        Enqueue a notification task into the ARQ queue (async).

        Args:
            task_name: ARQ worker function name (e.g. ``'send_notification_task'``).
            payload:   Serialized ``NotificationPayloadDTO``; forwarded to the
                       job as the ``payload_dict`` keyword argument.

        Returns:
            Job ID string, or None if ARQ returned no handle.

        Raises:
            ConnectionError: Redis is unreachable.
            Exception: Any ARQ/Redis infrastructure failure.
        """
        log.debug("ArqNotificationAdapter | enqueueing task=%s", task_name)
        job = await self.pool.enqueue_job(task_name, payload_dict=payload)
        # enqueue_job may return no handle; then there is no ID to report.
        job_id = getattr(job, "job_id", None)
        log.info("ArqNotificationAdapter | task=%s job_id=%s", task_name, job_id)
        return job_id
Functions
enqueue(task_name, payload)

Sync enqueue — wraps async via asyncio. Use enqueue_async in async contexts.

Source code in src/codex_platform/notifications/delivery/arq.py
def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
    """Sync enqueue — wraps async via asyncio. Use enqueue_async in async contexts."""
    import asyncio

    return asyncio.get_event_loop().run_until_complete(self.enqueue_async(task_name, payload))
enqueue_async(task_name, payload) async

Enqueue a notification task into the ARQ queue (async).

Parameters:

Name Type Description Default
task_name str

ARQ worker function name (e.g. 'send_notification_task').

required
payload dict[str, Any]

Serialized NotificationPayloadDTO.

required

Returns:

Type Description
str | None

Job ID string, or None if ARQ returned no handle.

Raises:

Type Description
ConnectionError

Redis is unreachable.

Exception

Any ARQ/Redis infrastructure failure.

Source code in src/codex_platform/notifications/delivery/arq.py
async def enqueue_async(self, task_name: str, payload: dict[str, Any]) -> str | None:
    """
    Enqueue a notification task into the ARQ queue (async).

    Args:
        task_name: ARQ worker function name (e.g. ``'send_notification_task'``).
        payload:   Serialized ``NotificationPayloadDTO``.

    Returns:
        Job ID string, or None if ARQ returned no handle.

    Raises:
        ConnectionError: Redis is unreachable.
        Exception: Any ARQ/Redis infrastructure failure.
    """
    log.debug("ArqNotificationAdapter | enqueueing task=%s", task_name)
    job = await self.pool.enqueue_job(task_name, payload_dict=payload)
    job_id = getattr(job, "job_id", None)
    log.info("ArqNotificationAdapter | task=%s job_id=%s", task_name, job_id)
    return job_id

DirectNotificationAdapter

Bases: NotificationAdapter

Adapter for synchronous/monolithic notification delivery.

Injects a pre-built list of channels (via ChannelRegistry) into BaseDeliveryOrchestrator and runs the async pipeline synchronously.

Uses lazy imports to avoid circular dependencies.

Source code in src/codex_platform/notifications/delivery/direct.py
class DirectNotificationAdapter(NotificationAdapter):
    """
    Adapter for synchronous/monolithic notification delivery.

    Builds the channel list through ``ChannelRegistry``, injects it into
    ``BaseDeliveryOrchestrator`` and runs the async pipeline to completion
    with ``asyncio.run``.

    Uses lazy imports to avoid circular dependencies.
    """

    def __init__(self, config: Any) -> None:
        # Settings object handed to the channel factories on every enqueue().
        self.config = config

    def enqueue(self, _task_name: str, payload: dict[str, Any]) -> str | None:
        """
        Deliver a notification synchronously via the orchestrator pipeline.

        Args:
            _task_name: Unused — direct delivery runs in-process.
            payload:    Serialized ``NotificationPayloadDTO``.

        Returns:
            ``notification_id`` from payload, or None if not present. The ID is
            returned whether or not a channel accepted the message; the
            delivery outcome is reported through the log only.

        Raises:
            RuntimeError: If called from within a running event loop.
            Exception: Errors raised while building the pipeline or the DTO
                propagate upward.
        """
        from codex_platform.notifications.dto import NotificationPayloadDTO
        from codex_platform.notifications.orchestrator import BaseDeliveryOrchestrator
        from codex_platform.notifications.registry import ChannelRegistry

        log.debug("DirectNotificationAdapter | starting direct delivery")

        registry = ChannelRegistry()
        _register_default_channels(registry, self.config)
        channels = registry.build_channels(self.config)
        orchestrator = BaseDeliveryOrchestrator(channels=channels)

        # NOTE(review): the payload is rebuilt as the base DTO; whether extra
        # keys such as html_content survive depends on BaseDTO — confirm.
        payload_dto = NotificationPayloadDTO(**payload)
        delivered = asyncio.run(orchestrator.deliver(payload_dto))

        notification_id = payload.get("notification_id")
        # deliver() returns False when no channel succeeded; do not claim success.
        if delivered:
            log.info("DirectNotificationAdapter | delivered notification_id=%s", notification_id)
        else:
            log.warning(
                "DirectNotificationAdapter | not delivered (no channel succeeded) notification_id=%s",
                notification_id,
            )
        return notification_id
Functions
enqueue(_task_name, payload)

Deliver a notification synchronously via the orchestrator pipeline.

Parameters:

Name Type Description Default
_task_name str

Unused — direct delivery runs in-process.

required
payload dict[str, Any]

Serialized NotificationPayloadDTO.

required

Returns:

Type Description
str | None

notification_id from payload on success, None if not present.

Raises:

Type Description
RuntimeError

If called from within a running event loop.

Exception

Channel infrastructure failures propagate upward.

Source code in src/codex_platform/notifications/delivery/direct.py
def enqueue(self, _task_name: str, payload: dict[str, Any]) -> str | None:
    """
    Deliver a notification synchronously via the orchestrator pipeline.

    Args:
        _task_name: Unused — direct delivery runs in-process.
        payload:    Serialized ``NotificationPayloadDTO``.

    Returns:
        ``notification_id`` from payload, or None if not present. The ID is
        returned whether or not a channel accepted the message; the delivery
        outcome is reported through the log only.

    Raises:
        RuntimeError: If called from within a running event loop.
        Exception: Errors raised while building the pipeline or the DTO
            propagate upward.
    """
    from codex_platform.notifications.dto import NotificationPayloadDTO
    from codex_platform.notifications.orchestrator import BaseDeliveryOrchestrator
    from codex_platform.notifications.registry import ChannelRegistry

    log.debug("DirectNotificationAdapter | starting direct delivery")

    registry = ChannelRegistry()
    _register_default_channels(registry, self.config)
    channels = registry.build_channels(self.config)
    orchestrator = BaseDeliveryOrchestrator(channels=channels)

    # NOTE(review): the payload is rebuilt as the base DTO; whether extra keys
    # such as html_content survive depends on BaseDTO — confirm.
    payload_dto = NotificationPayloadDTO(**payload)
    delivered = asyncio.run(orchestrator.deliver(payload_dto))

    notification_id = payload.get("notification_id")
    # deliver() returns False when no channel succeeded; do not claim success.
    if delivered:
        log.info("DirectNotificationAdapter | delivered notification_id=%s", notification_id)
    else:
        log.warning(
            "DirectNotificationAdapter | not delivered (no channel succeeded) notification_id=%s",
            notification_id,
        )
    return notification_id

NotificationAdapter

Bases: Protocol

Contract for notification delivery transport.

Allows the same business logic to work with ARQ, Celery, Direct calls, or Django's built-in mail system.

Implementations MUST
  • Raise exceptions on infrastructure failures (network, broker, DB).
  • Return a job/task ID (str) when the backend provides one.
  • Return None for fire-and-forget transports with no tracking ID.
Source code in src/codex_platform/notifications/delivery/base.py
class NotificationAdapter(Protocol):
    """
    Contract for notification delivery transport.

    Allows the same business logic to work with ARQ, Celery, Direct calls,
    or Django's built-in mail system. This is a structural (``Protocol``)
    interface: any object with a matching ``enqueue`` method conforms.

    Implementations MUST:
        - Raise exceptions on infrastructure failures (network, broker, DB).
        - Return a job/task ID (str) when the backend provides one.
        - Return None for fire-and-forget transports with no tracking ID.
    """

    def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
        """
        Deliver or enqueue the notification.

        Args:
            task_name: Worker function name to execute.
                       May be unused by adapters that deliver synchronously.
            payload: Serialized ``NotificationPayloadDTO`` (via ``.model_dump(mode="json")``).

        Returns:
            The job/task identifier for tracking, or None when the transport
            is fire-and-forget.

        Raises:
            Exception: Infrastructure errors MUST propagate — never swallow them.
        """
        ...
Functions
enqueue(task_name, payload)

Deliver or enqueue the notification.

Parameters:

Name Type Description Default
task_name str

Worker function name to execute. May be unused by adapters that deliver synchronously.

required
payload dict[str, Any]

Serialized NotificationPayloadDTO (via .model_dump(mode="json")).

required

Returns:

Type Description
str | None

Job/task identifier for tracking, or None when the transport is fire-and-forget.

Raises:

Type Description
Exception

Infrastructure errors MUST propagate — never swallow them.

Source code in src/codex_platform/notifications/delivery/base.py
def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
    """
    Deliver or enqueue the notification.

    Args:
        task_name: Worker function name to execute.
                   May be unused by adapters that deliver synchronously.
        payload: Serialized ``NotificationPayloadDTO`` (via ``.model_dump(mode="json")``).

    Returns:
        The job/task identifier for tracking, or None when the transport
        is fire-and-forget.

    Raises:
        Exception: Infrastructure errors MUST propagate — never swallow them.
    """
    ...

NotificationPayloadDTO

Bases: BaseDTO

Base notification payload — identification and routing only.

Do not use directly. Use TemplateNotificationDTO or RenderedNotificationDTO.

Source code in src/codex_platform/notifications/dto.py
class NotificationPayloadDTO(BaseDTO):
    """
    Base notification payload — identification and routing only.

    Carries no message body. Do not use directly; use
    TemplateNotificationDTO or RenderedNotificationDTO.
    """

    # Identifier of this notification (used in delivery log messages).
    notification_id: str
    # Who receives the notification.
    recipient: NotificationRecipient
    # Channels requested for delivery; defaults to email only.
    # NOTE(review): list-literal default — safe only if BaseDTO copies
    # defaults per instance; confirm.
    channels: list[NotificationChannel] = [NotificationChannel.EMAIL]
    # Optional event classification.
    event_type: str | None = None
    # Optional subject line.
    subject: str | None = None

NotificationRecipient

Bases: BaseDTO

Recipient info. PII fields are auto-masked in `__repr__` via BaseDTO.

Source code in src/codex_platform/notifications/dto.py
class NotificationRecipient(BaseDTO):
    """Recipient contact info. PII fields are auto-masked in ``__repr__`` via BaseDTO."""

    # Recipient email address, if known.
    email: str | None = None
    # Recipient phone number, if known.
    phone: str | None = None

RenderedNotificationDTO

Bases: NotificationPayloadDTO

Mode 2 — Pre-rendered HTML passed directly to the workers.

Use when Django (or another layer) renders the template and the worker only needs to deliver.

Source code in src/codex_platform/notifications/dto.py
class RenderedNotificationDTO(NotificationPayloadDTO):
    """
    Mode 2 — Pre-rendered HTML passed directly to the worker.

    Use when Django (or another layer) renders the template
    and the worker only needs to deliver.
    """

    # Fully rendered HTML body (required).
    html_content: str
    # Optional plain-text alternative.
    text_content: str | None = None

TemplateNotificationDTO

Bases: NotificationPayloadDTO

Mode 1 — Worker renders the template itself (requires Jinja2).

context_key: Redis key where context_data is stored (JSON).

Allows updating data after enqueue (e.g. reschedule).

template_name: Relative template path (e.g. 'booking/bk_confirmation.html').

Source code in src/codex_platform/notifications/dto.py
class TemplateNotificationDTO(NotificationPayloadDTO):
    """
    Mode 1 — Worker renders the template itself (requires Jinja2).

    context_key: Redis key where context_data is stored (JSON).
                 Allows updating data after enqueue (e.g. reschedule).
    template_name: Relative template path (e.g. 'booking/bk_confirmation.html').
    """

    # Relative path of the template to render.
    template_name: str
    # Redis key holding the JSON rendering context.
    context_key: str

ContentCacheAdapter

Bases: Protocol

Adapter for caching email/notification content (used by BaseEmailContentSelector).

Source code in src/codex_platform/notifications/interfaces.py
class ContentCacheAdapter(Protocol):
    """Adapter for caching email/notification content (used by BaseEmailContentSelector).

    Structural interface: a string key/value cache with per-entry timeout.
    """

    def get_cached_value(self, key: str) -> str | None:
        """Return the cached string for ``key``, or None if not found."""
        ...

    def set_cached_value(self, key: str, value: str, timeout: int) -> None:
        """Store ``value`` under ``key`` with the given timeout (seconds)."""
        ...
Functions
get_cached_value(key)

Return cached string value or None if not found.

Source code in src/codex_platform/notifications/interfaces.py
def get_cached_value(self, key: str) -> str | None:
    """Return the cached string for ``key``, or None if not found."""
    ...
set_cached_value(key, value, timeout)

Store value in cache with given timeout (seconds).

Source code in src/codex_platform/notifications/interfaces.py
def set_cached_value(self, key: str, value: str, timeout: int) -> None:
    """Store ``value`` under ``key`` with the given timeout (seconds)."""
    ...

ContentProvider

Bases: Protocol

Provides translated template text by key.

Source code in src/codex_platform/notifications/interfaces.py
class ContentProvider(Protocol):
    """Provides translated template text by key (structural interface)."""

    def get_text(self, key: str) -> str | None:
        """Return the translated text for ``key``, or None if not found."""
        ...
Functions
get_text(key)

Return translated text or None if not found.

Source code in src/codex_platform/notifications/interfaces.py
def get_text(self, key: str) -> str | None:
    """Return the translated text for ``key``, or None if not found."""
    ...

BaseDeliveryOrchestrator

Tries channels in order; stops on first success.

Channels are injected at construction time via Dependency Injection. deliver() is a pure send operation with no side effects beyond delivery.

Source code in src/codex_platform/notifications/orchestrator.py
class BaseDeliveryOrchestrator:
    """
    Walks an ordered channel list and stops at the first successful send.

    The channels are supplied by the caller at construction time
    (dependency injection); ``deliver()`` only sends and logs.
    """

    def __init__(self, channels: list[DeliveryChannel]) -> None:
        """
        Args:
            channels: Delivery channels in the order they should be tried.
                      ``ChannelRegistry.build_channels()`` produces such a list.
        """
        self.channels = channels

    async def deliver(self, payload: "NotificationPayloadDTO") -> bool:
        """
        Send ``payload`` through the first channel that accepts it.

        Unavailable channels are skipped. A channel that raises is logged
        and the next one is tried; a channel that returns a falsy result
        is passed over as well.

        Args:
            payload: Notification DTO. ``html_content`` / ``text_content`` are
                     read when present and default to None otherwise.

        Returns:
            True as soon as one channel reports success; False when there is
            no recipient address or every channel has been exhausted.
        """
        recipient = payload.recipient
        # Email is preferred; the phone number is the fallback address.
        address = recipient.email or recipient.phone or ""
        subject_line = payload.subject or ""

        if address == "":
            log.error(
                "DeliveryOrchestrator | no recipient address, notification_id=%s",
                payload.notification_id,
            )
            return False

        for candidate in self.channels:
            if candidate.is_available():
                try:
                    accepted = await candidate.send(
                        to=address,
                        subject=subject_line,
                        html_content=getattr(payload, "html_content", None),
                        text_content=getattr(payload, "text_content", None),
                    )
                    if accepted:
                        log.info(
                            "DeliveryOrchestrator | delivered via %s, notification_id=%s",
                            type(candidate).__name__,
                            payload.notification_id,
                        )
                        return True
                except Exception:
                    # Best effort: record the failure and move on.
                    log.exception(
                        "DeliveryOrchestrator | channel=%s failed, trying next",
                        type(candidate).__name__,
                    )

        log.error(
            "DeliveryOrchestrator | all channels exhausted, notification_id=%s",
            payload.notification_id,
        )
        return False
Functions
__init__(channels)

Parameters:

Name Type Description Default
channels list[DeliveryChannel]

Ordered list of delivery channels to try. Use ChannelRegistry.build_channels() to build this list.

required
Source code in src/codex_platform/notifications/orchestrator.py
def __init__(self, channels: list[DeliveryChannel]) -> None:
    """
    Store the channels this orchestrator will try.

    Args:
        channels: Ordered list of delivery channels to try.
                  Use ``ChannelRegistry.build_channels()`` to build this list.
    """
    self.channels = channels
deliver(payload) async

Deliver a notification payload through available channels.

Tries each channel in order; stops on first success. If a channel raises, logs the exception and falls through to the next.

Parameters:

Name Type Description Default
payload NotificationPayloadDTO

NotificationPayloadDTO with ready-made html/text content.

required

Returns:

Type Description
bool

True if at least one channel succeeded, False if all exhausted.

Source code in src/codex_platform/notifications/orchestrator.py
async def deliver(self, payload: "NotificationPayloadDTO") -> bool:
    """
    Send ``payload`` through the first channel that accepts it.

    Unavailable channels are skipped. A channel that raises is logged and
    the next one is tried; a channel that returns a falsy result is passed
    over as well.

    Args:
        payload: Notification DTO. ``html_content`` / ``text_content`` are
                 read when present and default to None otherwise.

    Returns:
        True as soon as one channel reports success; False when there is no
        recipient address or every channel has been exhausted.
    """
    recipient = payload.recipient
    # Email is preferred; the phone number is the fallback address.
    address = recipient.email or recipient.phone or ""
    subject_line = payload.subject or ""

    if address == "":
        log.error(
            "DeliveryOrchestrator | no recipient address, notification_id=%s",
            payload.notification_id,
        )
        return False

    for candidate in self.channels:
        if candidate.is_available():
            try:
                accepted = await candidate.send(
                    to=address,
                    subject=subject_line,
                    html_content=getattr(payload, "html_content", None),
                    text_content=getattr(payload, "text_content", None),
                )
                if accepted:
                    log.info(
                        "DeliveryOrchestrator | delivered via %s, notification_id=%s",
                        type(candidate).__name__,
                        payload.notification_id,
                    )
                    return True
            except Exception:
                # Best effort: record the failure and move on.
                log.exception(
                    "DeliveryOrchestrator | channel=%s failed, trying next",
                    type(candidate).__name__,
                )

    log.error(
        "DeliveryOrchestrator | all channels exhausted, notification_id=%s",
        payload.notification_id,
    )
    return False

DeliveryChannel

Bases: Protocol

Contract for a single delivery method.

Implementors: AsyncEmailClient, DjangoMailChannel, TelegramChannel, etc.

Source code in src/codex_platform/notifications/orchestrator.py
@runtime_checkable
class DeliveryChannel(Protocol):
    """
    Contract for a single delivery method.

    Implementors: AsyncEmailClient, DjangoMailChannel, TelegramChannel, etc.
    Marked ``runtime_checkable`` so ``isinstance`` checks against it work.
    """

    async def send(
        self,
        to: str,
        subject: str,
        html_content: str | None,
        text_content: str | None,
    ) -> bool:
        """
        Attempt delivery. Return True on success, False on logical failure.
        Raise on infrastructure errors (so the orchestrator can log and try next channel).

        Args:
            to: Recipient address.
            subject: Subject line.
            html_content: HTML body, or None.
            text_content: Plain-text body, or None.
        """
        ...

    def is_available(self) -> bool:
        """Return True if this channel is properly configured and ready."""
        ...
Functions
send(to, subject, html_content, text_content) async

Attempt delivery. Return True on success, False on logical failure. Raise on infrastructure errors (so the orchestrator can log and try next channel).

Source code in src/codex_platform/notifications/orchestrator.py
async def send(
    self,
    to: str,
    subject: str,
    html_content: str | None,
    text_content: str | None,
) -> bool:
    """
    Attempt delivery. Return True on success, False on logical failure.
    Raise on infrastructure errors (so the orchestrator can log and try next channel).

    Args:
        to: Recipient address.
        subject: Subject line.
        html_content: HTML body, or None.
        text_content: Plain-text body, or None.
    """
    ...
is_available()

Return True if this channel is properly configured and ready.

Source code in src/codex_platform/notifications/orchestrator.py
def is_available(self) -> bool:
    """Return True if this channel is properly configured and ready to send."""
    ...

ChannelRegistry

Registry for delivery channels. Channels register with a factory function that returns a channel or None. build_channels() creates only the channels whose config is available.

Source code in src/codex_platform/notifications/registry.py
class ChannelRegistry:
    """
    Ordered collection of delivery-channel factories.

    Each registered factory receives the config and returns either a channel
    or None. ``build_channels()`` keeps only the channels that were created
    and report themselves available.
    """

    def __init__(self) -> None:
        # (name, factory) pairs in registration order.
        self._factories: list[tuple[str, Callable[[Any], DeliveryChannel | None]]] = []

    def register(
        self,
        name: str,
        factory: Callable[[Any], DeliveryChannel | None],
    ) -> None:
        """
        Add a channel factory to the end of the registry.

        Args:
            name: Human-readable channel name (used in log messages).
            factory: Callable taking the config and returning a DeliveryChannel,
                     or None when the channel's config is missing/incomplete.
        """
        entry = (name, factory)
        self._factories.append(entry)

    def build_channels(self, config: Any) -> list[DeliveryChannel]:
        """Instantiate every registered channel that is usable with ``config``.

        Factories run in registration order. A channel is kept only when the
        factory returns a non-``None`` instance whose ``is_available()`` is
        truthy. A factory (or availability check) that raises is logged and
        skipped, so one broken channel does not block the others.

        Args:
            config: Application settings object passed verbatim to every factory.

        Returns:
            Ready-to-use channels, in registration order.
        """
        ready: list[DeliveryChannel] = []
        for label, make_channel in self._factories:
            try:
                candidate = make_channel(config)
                if candidate is None or not candidate.is_available():
                    log.debug("ChannelRegistry | %s skipped (not configured)", label)
                    continue
                ready.append(candidate)
                log.info("ChannelRegistry | %s enabled", label)
            except Exception:
                log.exception("ChannelRegistry | %s factory failed", label)
        return ready
Functions
register(name, factory)

Register a channel factory.

Parameters:

Name Type Description Default
name str

Human-readable channel name (for logging).

required
factory Callable[[Any], DeliveryChannel | None]

Callable that takes config and returns DeliveryChannel or None. Return None if the channel's config is missing/incomplete.

required
Source code in src/codex_platform/notifications/registry.py
def register(
    self,
    name: str,
    factory: Callable[[Any], DeliveryChannel | None],
) -> None:
    """
    Add a channel factory to the end of the registry.

    Args:
        name: Human-readable channel name (used in log messages).
        factory: Callable taking the config and returning a DeliveryChannel,
                 or None when the channel's config is missing/incomplete.
    """
    entry = (name, factory)
    self._factories.append(entry)
build_channels(config)

Build the list of available channels from all registered factories.

Calls each factory with config. A channel is included only when the factory returns a non-None instance that also reports is_available() == True.

Parameters:

Name Type Description Default
config Any

Application settings object passed verbatim to every factory.

required

Returns:

Type Description
list[DeliveryChannel]

Ordered list of ready-to-use DeliveryChannel instances.

Source code in src/codex_platform/notifications/registry.py
def build_channels(self, config: Any) -> list[DeliveryChannel]:
    """Instantiate every registered channel that is usable with ``config``.

    Factories run in registration order. A channel is kept only when the
    factory returns a non-``None`` instance whose ``is_available()`` is truthy.
    A factory (or availability check) that raises is logged and skipped, so
    one broken channel does not block the others.

    Args:
        config: Application settings object passed verbatim to every factory.

    Returns:
        Ready-to-use channels, in registration order.
    """
    ready: list[DeliveryChannel] = []
    for label, make_channel in self._factories:
        try:
            candidate = make_channel(config)
            if candidate is None or not candidate.is_available():
                log.debug("ChannelRegistry | %s skipped (not configured)", label)
                continue
            ready.append(candidate)
            log.info("ChannelRegistry | %s enabled", label)
        except Exception:
            log.exception("ChannelRegistry | %s factory failed", label)
    return ready