Skip to content

notifications

Deprecated compatibility path. New code should import from codex_platform.messaging.

notifications

Deprecated compatibility imports for `codex_platform.messaging`.

Classes

AudienceBuilder

Bases: Protocol

Materializes campaign recipients from a host-defined filter.

Source code in src/codex_platform/messaging/audience.py
class AudienceBuilder(Protocol):
    """Materializes campaign recipients from a host-defined filter.

    Structural interface: any object providing ``count`` and ``materialize``
    with these signatures satisfies it.
    """

    def count(self, audience_filter: dict[str, Any]) -> int:
        """Return the number of recipients matching the filter.

        Args:
            audience_filter: Host-defined filter mapping; this protocol does
                not constrain its keys.
        """
        ...

    def materialize(self, audience_filter: dict[str, Any]) -> Iterable[CampaignRecipientDraft]:
        """Stream recipient drafts matching the filter.

        Args:
            audience_filter: Host-defined filter mapping; this protocol does
                not constrain its keys.

        Returns:
            An iterable of ``CampaignRecipientDraft`` items.
        """
        ...
Functions
count(audience_filter)

Return the number of recipients matching the filter.

Source code in src/codex_platform/messaging/audience.py
def count(self, audience_filter: dict[str, Any]) -> int:
    """Return the number of recipients matching the filter.

    Args:
        audience_filter: Host-defined filter mapping; this protocol does not
            constrain its keys.
    """
    ...
materialize(audience_filter)

Stream recipient drafts matching the filter.

Source code in src/codex_platform/messaging/audience.py
def materialize(self, audience_filter: dict[str, Any]) -> Iterable[CampaignRecipientDraft]:
    """Stream recipient drafts matching the filter.

    Args:
        audience_filter: Host-defined filter mapping; this protocol does not
            constrain its keys.

    Returns:
        An iterable of ``CampaignRecipientDraft`` items.
    """
    ...

CampaignBatchDTO

Bases: BaseDTO

One campaign batch payload for the worker callback contract.

Source code in src/codex_platform/messaging/campaigns/dto.py
class CampaignBatchDTO(BaseDTO):
    """Payload describing a single campaign batch for the worker callback contract.

    Exactly one of ``template_name`` / ``html_content`` must be set; the
    ``validate_rendering_mode`` validator enforces that after construction.
    """

    schema_version: int = PAYLOAD_SCHEMA_VERSION
    campaign_id: str
    # Rendering mode: set one of the next two fields, never both and never neither.
    template_name: str | None = None
    html_content: str | None = None
    subject: str
    recipients: list[CampaignRecipientDraft]
    base_context: dict[str, Any] = {}
    callback_url: str
    callback_token: str

    @model_validator(mode="after")
    def validate_rendering_mode(self) -> CampaignBatchDTO:
        """Reject payloads that set both rendering fields or neither of them.

        Empty strings count as "not set" because truthiness is compared.

        Raises:
            ValueError: If zero or two rendering fields are truthy.
        """

        uses_template = bool(self.template_name)
        uses_inline_html = bool(self.html_content)
        if uses_template != uses_inline_html:
            return self
        raise ValueError("exactly one of template_name or html_content must be set")
Functions
validate_rendering_mode()

Require exactly one campaign rendering mode.

Source code in src/codex_platform/messaging/campaigns/dto.py
@model_validator(mode="after")
def validate_rendering_mode(self) -> CampaignBatchDTO:
    """Reject payloads that set both rendering fields or neither of them.

    Empty strings count as "not set" because truthiness is compared.

    Raises:
        ValueError: If zero or two rendering fields are truthy.
    """

    uses_template = bool(self.template_name)
    uses_inline_html = bool(self.html_content)
    if uses_template != uses_inline_html:
        return self
    raise ValueError("exactly one of template_name or html_content must be set")

CampaignDispatcher

Bases: Protocol

Enqueues campaign batches for worker-side delivery.

Source code in src/codex_platform/messaging/campaigns/protocols.py
class CampaignDispatcher(Protocol):
    """Enqueues campaign batches for worker-side delivery."""

    def enqueue_batch(self, batch: CampaignBatchDTO) -> str:
        """Enqueue a batch and return the backend job identifier.

        Args:
            batch: The campaign batch payload to hand to the queue backend.

        Returns:
            The job identifier assigned by the backend, as a string.
        """
        ...
Functions
enqueue_batch(batch)

Enqueue a batch and return the backend job identifier.

Source code in src/codex_platform/messaging/campaigns/protocols.py
def enqueue_batch(self, batch: CampaignBatchDTO) -> str:
    """Enqueue a batch and return the backend job identifier.

    Args:
        batch: The campaign batch payload to hand to the queue backend.

    Returns:
        The job identifier assigned by the backend, as a string.
    """
    ...

CampaignRecipientDraft

Bases: BaseDTO

Recipient data needed by a campaign batch worker.

Source code in src/codex_platform/messaging/campaigns/dto.py
class CampaignRecipientDraft(BaseDTO):
    """Recipient data needed by a campaign batch worker.

    Only ``recipient_id`` and ``email`` are required; every other field has a
    default.
    """

    recipient_id: str
    email: str
    first_name: str = ""
    last_name: str = ""
    locale: str = "de"  # locale used when the caller does not supply one
    unsubscribe_token: str | None = None  # optional; None when not provided

NotificationChannel

Bases: StrEnum

Supported notification delivery channel identifiers.

Source code in src/codex_platform/messaging/channels.py
class NotificationChannel(StrEnum):
    """Supported notification delivery channel identifiers.

    As a ``StrEnum``, each member is also a ``str`` equal to its value.
    """

    EMAIL = "email"
    TELEGRAM = "telegram"
    SMS = "sms"
    WHATSAPP = "whatsapp"

AsyncEmailClient

Async SMTP email sender.

Source code in src/codex_platform/messaging/clients/smtp.py
class AsyncEmailClient:
    """Sends email over SMTP from async code via ``aiosmtplib``."""

    def __init__(
        self,
        smtp_host: str,
        smtp_port: int,
        smtp_user: str | None = None,
        smtp_password: str | None = None,  # pragma: allowlist secret
        smtp_from_email: str | None = None,
        smtp_use_tls: bool = False,
        smtp_from_name: str = "",
    ) -> None:
        """Store the SMTP connection and sender settings; no I/O happens here."""
        # Server endpoint.
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        # Optional credentials; only used when both are truthy.
        self.smtp_user = smtp_user
        self.smtp_password = smtp_password
        # Sender identity and transport security preference.
        self.smtp_from_email = smtp_from_email
        self.smtp_use_tls = smtp_use_tls
        self.smtp_from_name = smtp_from_name

    def is_available(self) -> bool:
        """Report whether a host is configured and it is not ``"localhost"``."""

        host = self.smtp_host
        if not host:
            return False
        return bool(host != "localhost")

    async def send(
        self,
        to: str,
        subject: str,
        html_content: str | None,
        text_content: str | None,
        headers: ThreadHeadersDTO | None = None,
        timeout: int = 15,
    ) -> bool:
        """Send one email via SMTP.

        Returns:
            ``True`` once the message has been handed to the SMTP server;
            failures surface as exceptions rather than ``False``.

        Raises:
            RuntimeError: If ``aiosmtplib`` is not installed.
        """

        if _AIOSMTP_AVAILABLE:
            await self._send_smtp(to, subject, html_content, text_content, headers, timeout)
            log.info("AsyncEmailClient | sent to='%s'", to)
            return True
        raise RuntimeError("aiosmtplib not installed. Run: pip install codex-platform[notifications]")

    async def _send_smtp(
        self,
        to: str,
        subject: str,
        html_content: str | None,
        text_content: str | None,
        headers: ThreadHeadersDTO | None,
        timeout: int,
    ) -> None:
        """Build the MIME message and hand it to ``aiosmtplib.send``."""
        # Use a "Name <address>" sender only when both parts are configured.
        if self.smtp_from_name and self.smtp_from_email:
            sender = formataddr((self.smtp_from_name, self.smtp_from_email))
        else:
            sender = self.smtp_from_email

        email = EmailMessage()
        email["From"] = sender
        email["To"] = to
        email["Subject"] = subject

        if headers is not None:
            for header_name, header_value in render_email_headers(headers).items():
                email[header_name] = header_value

        # A plain-text part is always present; HTML is attached as an alternative.
        email.set_content(text_content or "Please enable HTML to view this email.")
        if html_content:
            email.add_alternative(html_content, subtype="html")

        # Port 465 means implicit TLS; port 587 (or an explicit opt-in on any
        # other port) means STARTTLS.
        implicit_tls = self.smtp_port == 465
        upgrade_with_starttls = self.smtp_port == 587 or (self.smtp_use_tls and not implicit_tls)

        connection: dict[str, Any] = dict(
            hostname=self.smtp_host,
            port=self.smtp_port,
            use_tls=implicit_tls,
            start_tls=upgrade_with_starttls,
            timeout=timeout,
        )
        if self.smtp_user and self.smtp_password:
            connection.update(username=self.smtp_user, password=self.smtp_password)

        await aiosmtplib.send(email, **connection)
Functions
is_available()

Returns True if the client is configured with a non-localhost host.

Source code in src/codex_platform/messaging/clients/smtp.py
def is_available(self) -> bool:
    """Report whether a host is configured and it is not ``"localhost"``."""

    host = self.smtp_host
    if not host:
        return False
    return bool(host != "localhost")
send(to, subject, html_content, text_content, headers=None, timeout=15) async

Send email via SMTP.

Source code in src/codex_platform/messaging/clients/smtp.py
async def send(
    self,
    to: str,
    subject: str,
    html_content: str | None,
    text_content: str | None,
    headers: ThreadHeadersDTO | None = None,
    timeout: int = 15,
) -> bool:
    """Send one email via SMTP.

    Returns:
        ``True`` once ``_send_smtp`` has completed; failures surface as
        exceptions rather than ``False``.

    Raises:
        RuntimeError: If ``aiosmtplib`` is not installed.
    """

    if _AIOSMTP_AVAILABLE:
        await self._send_smtp(to, subject, html_content, text_content, headers, timeout)
        log.info("AsyncEmailClient | sent to='%s'", to)
        return True
    raise RuntimeError("aiosmtplib not installed. Run: pip install codex-platform[notifications]")

ArqNotificationAdapter

Bases: NotificationAdapter

Notification adapter that enqueues tasks into an ARQ/Redis queue.

Source code in src/codex_platform/messaging/delivery/arq.py
class ArqNotificationAdapter(NotificationAdapter):
    """Notification adapter that enqueues tasks into an ARQ/Redis queue."""

    def __init__(self, pool: ArqRedis) -> None:
        """Keep a reference to the ARQ Redis pool used for enqueueing."""
        self.pool = pool

    def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
        """Sync enqueue wrapper. Prefer ``enqueue_async`` in async contexts.

        Blocks the calling thread until ``enqueue_async`` finishes, driving it
        on the thread's current event loop (one is created and installed when
        the thread has none or the current one is closed).

        Args:
            task_name: Name of the worker task to enqueue.
            payload: Serialized notification payload.

        Returns:
            The job id reported by the queue, or ``None`` if none was returned.

        Raises:
            RuntimeError: If called from a thread whose event loop is already
                running; use ``enqueue_async`` there instead.
        """

        import asyncio

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # No loop is running in this thread, so blocking is safe.
        else:
            # Fail before creating the coroutine so nothing is left un-awaited.
            raise RuntimeError(
                "ArqNotificationAdapter.enqueue() cannot block inside a running "
                "event loop; await enqueue_async() instead"
            )

        # get_event_loop() raises when the thread has no current loop (worker
        # threads, after asyncio.run(), newer Python versions).
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            # Install it so later calls reuse the same loop.
            asyncio.set_event_loop(loop)

        return loop.run_until_complete(self.enqueue_async(task_name, payload))

    async def enqueue_async(self, task_name: str, payload: dict[str, Any]) -> str | None:
        """Enqueue a notification task into the ARQ queue.

        The payload is passed to the worker as the ``payload_dict`` keyword
        argument.

        Returns:
            The ``job_id`` attribute of the enqueued job, or ``None`` when the
            pool returned an object without one.
        """

        log.debug("ArqNotificationAdapter | enqueueing task=%s", task_name)
        job = await self.pool.enqueue_job(task_name, payload_dict=payload)
        job_id = getattr(job, "job_id", None)
        log.info("ArqNotificationAdapter | task=%s job_id=%s", task_name, job_id)
        return job_id
Functions
enqueue(task_name, payload)

Sync enqueue wrapper. Prefer enqueue_async in async contexts.

Source code in src/codex_platform/messaging/delivery/arq.py
def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
    """Sync enqueue wrapper. Prefer ``enqueue_async`` in async contexts.

    Blocks the calling thread until ``enqueue_async`` finishes, driving it on
    the thread's current event loop (one is created and installed when the
    thread has none or the current one is closed).

    Args:
        task_name: Name of the worker task to enqueue.
        payload: Serialized notification payload.

    Returns:
        The job id reported by the queue, or ``None`` if none was returned.

    Raises:
        RuntimeError: If called from a thread whose event loop is already
            running; use ``enqueue_async`` there instead.
    """

    import asyncio

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        pass  # No loop is running in this thread, so blocking is safe.
    else:
        # Fail before creating the coroutine so nothing is left un-awaited.
        raise RuntimeError(
            "ArqNotificationAdapter.enqueue() cannot block inside a running "
            "event loop; await enqueue_async() instead"
        )

    # get_event_loop() raises when the thread has no current loop (worker
    # threads, after asyncio.run(), newer Python versions).
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        # Install it so later calls reuse the same loop.
        asyncio.set_event_loop(loop)

    return loop.run_until_complete(self.enqueue_async(task_name, payload))
enqueue_async(task_name, payload) async

Enqueue a notification task into the ARQ queue.

Source code in src/codex_platform/messaging/delivery/arq.py
async def enqueue_async(self, task_name: str, payload: dict[str, Any]) -> str | None:
    """Push a notification task onto the ARQ queue.

    The payload is passed to the worker as the ``payload_dict`` keyword
    argument.

    Returns:
        The ``job_id`` attribute of the enqueued job, or ``None`` when the
        pool returned an object without one.
    """

    log.debug("ArqNotificationAdapter | enqueueing task=%s", task_name)
    queued = await self.pool.enqueue_job(task_name, payload_dict=payload)
    identifier = getattr(queued, "job_id", None)
    log.info("ArqNotificationAdapter | task=%s job_id=%s", task_name, identifier)
    return identifier

DirectNotificationAdapter

Bases: NotificationAdapter

Adapter for synchronous in-process notification delivery.

Source code in src/codex_platform/messaging/delivery/direct.py
class DirectNotificationAdapter(NotificationAdapter):
    """Adapter for synchronous in-process notification delivery."""

    def __init__(self, config: Any) -> None:
        """Keep the configuration object handed to the channel factories."""
        self.config = config

    def enqueue(self, _task_name: str, payload: dict[str, Any]) -> str | None:
        """Deliver a notification synchronously via the orchestrator pipeline.

        Builds a fresh channel registry and orchestrator on every call, wraps
        ``payload`` in the matching DTO, and runs the delivery with
        ``asyncio.run`` (so it must not be called from a running event loop).

        Args:
            _task_name: Unused; present to satisfy the adapter contract.
            payload: Serialized notification payload.

        Returns:
            ``payload["notification_id"]`` (``None`` if absent), whether or
            not any channel accepted the message; the outcome is logged.
        """

        from codex_platform.messaging.dto import (
            NotificationPayloadDTO,
            RenderedNotificationDTO,
            TemplateNotificationDTO,
        )
        from codex_platform.messaging.orchestrator import BaseDeliveryOrchestrator
        from codex_platform.messaging.registry import ChannelRegistry

        log.debug("DirectNotificationAdapter | starting direct delivery")

        registry = ChannelRegistry()
        _register_default_channels(registry, self.config)
        channels = registry.build_channels(self.config)
        orchestrator = BaseDeliveryOrchestrator(channels=channels)

        # Choose the DTO from the keys present: pre-rendered HTML first, then
        # template mode, otherwise the bare routing payload.
        payload_dto: NotificationPayloadDTO
        if "html_content" in payload:
            payload_dto = RenderedNotificationDTO(**payload)
        elif "template_name" in payload and "context_key" in payload:
            payload_dto = TemplateNotificationDTO(**payload)
        else:
            payload_dto = NotificationPayloadDTO(**payload)

        # deliver() reports failure by returning False, so check the result
        # instead of logging success unconditionally.
        delivered = asyncio.run(orchestrator.deliver(payload_dto))

        notification_id = payload.get("notification_id")
        if delivered:
            log.info("DirectNotificationAdapter | delivered notification_id=%s", notification_id)
        else:
            log.warning("DirectNotificationAdapter | delivery failed notification_id=%s", notification_id)
        return notification_id
Functions
enqueue(_task_name, payload)

Deliver a notification synchronously via the orchestrator pipeline.

Source code in src/codex_platform/messaging/delivery/direct.py
def enqueue(self, _task_name: str, payload: dict[str, Any]) -> str | None:
    """Deliver a notification synchronously via the orchestrator pipeline.

    Builds a fresh channel registry and orchestrator on every call, wraps
    ``payload`` in the matching DTO, and runs the delivery with
    ``asyncio.run`` (so it must not be called from a running event loop).

    Args:
        _task_name: Unused; present to satisfy the adapter contract.
        payload: Serialized notification payload.

    Returns:
        ``payload["notification_id"]`` (``None`` if absent), whether or not
        any channel accepted the message; the outcome is logged.
    """

    from codex_platform.messaging.dto import (
        NotificationPayloadDTO,
        RenderedNotificationDTO,
        TemplateNotificationDTO,
    )
    from codex_platform.messaging.orchestrator import BaseDeliveryOrchestrator
    from codex_platform.messaging.registry import ChannelRegistry

    log.debug("DirectNotificationAdapter | starting direct delivery")

    registry = ChannelRegistry()
    _register_default_channels(registry, self.config)
    channels = registry.build_channels(self.config)
    orchestrator = BaseDeliveryOrchestrator(channels=channels)

    # Choose the DTO from the keys present: pre-rendered HTML first, then
    # template mode, otherwise the bare routing payload.
    payload_dto: NotificationPayloadDTO
    if "html_content" in payload:
        payload_dto = RenderedNotificationDTO(**payload)
    elif "template_name" in payload and "context_key" in payload:
        payload_dto = TemplateNotificationDTO(**payload)
    else:
        payload_dto = NotificationPayloadDTO(**payload)

    # deliver() reports failure by returning False, so check the result
    # instead of logging success unconditionally.
    delivered = asyncio.run(orchestrator.deliver(payload_dto))

    notification_id = payload.get("notification_id")
    if delivered:
        log.info("DirectNotificationAdapter | delivered notification_id=%s", notification_id)
    else:
        log.warning("DirectNotificationAdapter | delivery failed notification_id=%s", notification_id)
    return notification_id

NotificationAdapter

Bases: Protocol

Contract for notification delivery transport.

Source code in src/codex_platform/messaging/delivery/base.py
class NotificationAdapter(Protocol):
    """Contract for notification delivery transport."""

    def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
        """Deliver or enqueue a serialized notification payload.

        Args:
            task_name: Name of the task to run for this payload.
            payload: Serialized notification payload.

        Returns:
            An identifier string, or ``None`` when none is available.
        """
        ...
Functions
enqueue(task_name, payload)

Deliver or enqueue a serialized notification payload.

Source code in src/codex_platform/messaging/delivery/base.py
def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
    """Deliver or enqueue a serialized notification payload.

    Args:
        task_name: Name of the task to run for this payload.
        payload: Serialized notification payload.

    Returns:
        An identifier string, or ``None`` when none is available.
    """
    ...

NotificationPayloadDTO

Bases: BaseDTO

Base notification payload: identification and routing only.

Source code in src/codex_platform/messaging/dto.py
class NotificationPayloadDTO(BaseDTO):
    """Base notification payload: identification and routing only.

    Carries no message body of its own; only ``notification_id`` and
    ``recipient`` are required.
    """

    schema_version: int = PAYLOAD_SCHEMA_VERSION
    notification_id: str
    recipient: NotificationRecipient
    # NOTE(review): mutable list default; safe only if BaseDTO copies defaults
    # per instance (as pydantic models do) — confirm.
    channels: list[NotificationChannel] = [NotificationChannel.EMAIL]
    event_type: str | None = None
    subject: str | None = None
    headers: ThreadHeadersDTO | None = None  # optional email threading headers

NotificationRecipient

Bases: BaseDTO

Recipient info. PII fields auto-masked in repr via BaseDTO.

Source code in src/codex_platform/messaging/dto.py
class NotificationRecipient(BaseDTO):
    """Recipient info. PII fields auto-masked in ``repr`` via BaseDTO.

    Both fields are optional and default to ``None``.
    """

    email: str | None = None
    phone: str | None = None

RenderedNotificationDTO

Bases: NotificationPayloadDTO

Mode 2: pre-rendered HTML passed directly to the worker.

Source code in src/codex_platform/messaging/dto.py
class RenderedNotificationDTO(NotificationPayloadDTO):
    """Mode 2: pre-rendered HTML passed directly to the worker.

    Adds a required HTML body and an optional plain-text body to the base
    payload.
    """

    html_content: str
    text_content: str | None = None  # optional plain-text alternative

TemplateNotificationDTO

Bases: NotificationPayloadDTO

Mode 1: worker fetches context from Redis and renders the template.

Source code in src/codex_platform/messaging/dto.py
class TemplateNotificationDTO(NotificationPayloadDTO):
    """Mode 1: worker fetches context from Redis and renders the template.

    Both fields are required.
    """

    template_name: str
    context_key: str  # presumably the Redis key holding the render context — confirm

ThreadHeadersDTO

Bases: BaseDTO

RFC 5322 threading headers attached to outbound email notifications.

Source code in src/codex_platform/messaging/dto.py
class ThreadHeadersDTO(BaseDTO):
    """RFC 5322 threading headers attached to outbound email notifications.

    ``message_id`` and ``thread_key`` are required; the rest are optional.
    """

    message_id: str
    in_reply_to: str | None = None
    # NOTE(review): mutable list default; safe only if BaseDTO copies defaults
    # per instance (as pydantic models do) — confirm.
    references: list[str] = []
    thread_key: str
    reply_match_token: str | None = None

ContentCacheAdapter

Bases: Protocol

Adapter for caching email/notification content.

Source code in src/codex_platform/messaging/interfaces.py
class ContentCacheAdapter(Protocol):
    """Adapter for caching email/notification content."""

    def get_cached_value(self, key: str) -> str | None:
        """Return cached string value or None if not found.

        Args:
            key: Cache key to look up.
        """
        ...

    def set_cached_value(self, key: str, value: str, timeout: int) -> None:
        """Store value in cache with given timeout in seconds.

        Args:
            key: Cache key to write.
            value: String value to store.
            timeout: Lifetime of the entry, in seconds.
        """
        ...
Functions
get_cached_value(key)

Return cached string value or None if not found.

Source code in src/codex_platform/messaging/interfaces.py
def get_cached_value(self, key: str) -> str | None:
    """Return cached string value or None if not found.

    Args:
        key: Cache key to look up.
    """
    ...
set_cached_value(key, value, timeout)

Store value in cache with given timeout in seconds.

Source code in src/codex_platform/messaging/interfaces.py
def set_cached_value(self, key: str, value: str, timeout: int) -> None:
    """Store value in cache with given timeout in seconds.

    Args:
        key: Cache key to write.
        value: String value to store.
        timeout: Lifetime of the entry, in seconds.
    """
    ...

ContentProvider

Bases: Protocol

Provides translated template text by key.

Source code in src/codex_platform/messaging/interfaces.py
class ContentProvider(Protocol):
    """Provides translated template text by key."""

    def get_text(self, key: str) -> str | None:
        """Return translated text or None if not found.

        Args:
            key: Identifier of the text to look up.
        """
        ...
Functions
get_text(key)

Return translated text or None if not found.

Source code in src/codex_platform/messaging/interfaces.py
def get_text(self, key: str) -> str | None:
    """Return translated text or None if not found.

    Args:
        key: Identifier of the text to look up.
    """
    ...

BaseDeliveryOrchestrator

Tries channels in order and stops on the first successful delivery.

Source code in src/codex_platform/messaging/orchestrator.py
class BaseDeliveryOrchestrator:
    """Walks the configured channels in order until one delivers the message."""

    def __init__(self, channels: list[DeliveryChannel]) -> None:
        """Store the ordered list of channels to try."""
        self.channels = channels

    async def deliver(self, payload: NotificationPayloadDTO) -> bool:
        """Send ``payload`` through the first channel that accepts it.

        The recipient address is the email if set, otherwise the phone number.
        Unavailable channels are skipped; a channel that raises is logged and
        the next one is tried.

        Returns:
            ``True`` as soon as a channel reports success, ``False`` when there
            is no recipient address or every channel failed.
        """

        recipient = payload.recipient
        to = recipient.email or recipient.phone or ""
        subject = payload.subject or ""

        if not to:
            log.error(
                "DeliveryOrchestrator | no recipient address, notification_id=%s",
                payload.notification_id,
            )
            return False

        for channel in self.channels:
            if not channel.is_available():
                continue
            try:
                send_kwargs = {
                    "to": to,
                    "subject": subject,
                    "html_content": getattr(payload, "html_content", None),
                    "text_content": getattr(payload, "text_content", None),
                }
                headers = getattr(payload, "headers", None)

                # Only pass ``headers`` to channels whose ``send`` can take it
                # (explicit parameter or ``**kwargs``).
                accepted = signature(channel.send).parameters
                if "headers" in accepted or any(
                    spec.kind == Parameter.VAR_KEYWORD for spec in accepted.values()
                ):
                    send_kwargs["headers"] = headers

                if await channel.send(**send_kwargs):
                    log.info(
                        "DeliveryOrchestrator | delivered via %s, notification_id=%s",
                        type(channel).__name__,
                        payload.notification_id,
                    )
                    return True
            except Exception:
                log.exception(
                    "DeliveryOrchestrator | channel=%s failed, trying next",
                    type(channel).__name__,
                )

        log.error(
            "DeliveryOrchestrator | all channels exhausted, notification_id=%s",
            payload.notification_id,
        )
        return False
Functions
deliver(payload) async

Deliver a notification payload through available channels.

Source code in src/codex_platform/messaging/orchestrator.py
async def deliver(self, payload: NotificationPayloadDTO) -> bool:
    """Send ``payload`` through the first channel that accepts it.

    The recipient address is the email if set, otherwise the phone number.
    Unavailable channels are skipped; a channel that raises is logged and the
    next one is tried.

    Returns:
        ``True`` as soon as a channel reports success, ``False`` when there is
        no recipient address or every channel failed.
    """

    recipient = payload.recipient
    to = recipient.email or recipient.phone or ""
    subject = payload.subject or ""

    if not to:
        log.error(
            "DeliveryOrchestrator | no recipient address, notification_id=%s",
            payload.notification_id,
        )
        return False

    for channel in self.channels:
        if not channel.is_available():
            continue
        try:
            send_kwargs = {
                "to": to,
                "subject": subject,
                "html_content": getattr(payload, "html_content", None),
                "text_content": getattr(payload, "text_content", None),
            }
            headers = getattr(payload, "headers", None)

            # Only pass ``headers`` to channels whose ``send`` can take it
            # (explicit parameter or ``**kwargs``).
            accepted = signature(channel.send).parameters
            if "headers" in accepted or any(
                spec.kind == Parameter.VAR_KEYWORD for spec in accepted.values()
            ):
                send_kwargs["headers"] = headers

            if await channel.send(**send_kwargs):
                log.info(
                    "DeliveryOrchestrator | delivered via %s, notification_id=%s",
                    type(channel).__name__,
                    payload.notification_id,
                )
                return True
        except Exception:
            log.exception(
                "DeliveryOrchestrator | channel=%s failed, trying next",
                type(channel).__name__,
            )

    log.error(
        "DeliveryOrchestrator | all channels exhausted, notification_id=%s",
        payload.notification_id,
    )
    return False

DeliveryChannel

Bases: Protocol

Contract for a single delivery method.

Source code in src/codex_platform/messaging/orchestrator.py
@runtime_checkable
class DeliveryChannel(Protocol):
    """Contract for a single delivery method.

    Decorated with ``runtime_checkable`` so ``isinstance`` checks against the
    protocol are possible (these verify method presence only, not signatures).
    """

    async def send(
        self,
        to: str,
        subject: str,
        html_content: str | None,
        text_content: str | None,
        headers: ThreadHeadersDTO | None = None,
    ) -> bool:
        """Attempt delivery. Return True on success, False on logical failure.

        Args:
            to: Recipient address.
            subject: Message subject.
            html_content: HTML body, or ``None``.
            text_content: Plain-text body, or ``None``.
            headers: Optional threading headers; defaults to ``None``.
        """
        ...

    def is_available(self) -> bool:
        """Return True if this channel is properly configured and ready."""
        ...
Functions
send(to, subject, html_content, text_content, headers=None) async

Attempt delivery. Return True on success, False on logical failure.

Source code in src/codex_platform/messaging/orchestrator.py
async def send(
    self,
    to: str,
    subject: str,
    html_content: str | None,
    text_content: str | None,
    headers: ThreadHeadersDTO | None = None,
) -> bool:
    """Attempt delivery. Return True on success, False on logical failure.

    Args:
        to: Recipient address.
        subject: Message subject.
        html_content: HTML body, or ``None``.
        text_content: Plain-text body, or ``None``.
        headers: Optional threading headers; defaults to ``None``.
    """
    ...
is_available()

Return True if this channel is properly configured and ready.

Source code in src/codex_platform/messaging/orchestrator.py
def is_available(self) -> bool:
    """Return True if this channel is properly configured and ready.

    Implementations take no arguments beyond ``self``.
    """
    ...

ChannelRegistry

Registry for config-driven delivery channel factories.

Source code in src/codex_platform/messaging/registry.py
class ChannelRegistry:
    """Holds named channel factories and builds channels from a config object."""

    def __init__(self) -> None:
        """Start with no registered factories."""
        self._factories: list[tuple[str, Callable[[Any], DeliveryChannel | None]]] = []

    def register(
        self,
        name: str,
        factory: Callable[[Any], DeliveryChannel | None],
    ) -> None:
        """Add a channel factory under ``name``.

        A name that is already registered only triggers a warning; the new
        factory is appended regardless.
        """

        for registered_name, _ in self._factories:
            if registered_name == name:
                log.warning("ChannelRegistry | duplicate registration for %s", name)
                break
        self._factories.append((name, factory))

    def build_channels(self, config: Any) -> list[DeliveryChannel]:
        """Run every factory and collect the channels that are usable.

        A factory that returns ``None``, yields an unavailable channel, or
        raises is skipped (the latter is logged with its traceback).

        Returns:
            Available channels, in registration order.
        """

        built: list[DeliveryChannel] = []
        for name, factory in self._factories:
            try:
                candidate = factory(config)
                if candidate is None or not candidate.is_available():
                    log.debug("ChannelRegistry | %s skipped (not configured)", name)
                    continue
                built.append(candidate)
                log.info("ChannelRegistry | %s enabled", name)
            except Exception:
                log.exception("ChannelRegistry | %s factory failed", name)
        return built
Functions
register(name, factory)

Register a channel factory.

Source code in src/codex_platform/messaging/registry.py
def register(
    self,
    name: str,
    factory: Callable[[Any], DeliveryChannel | None],
) -> None:
    """Add a channel factory under ``name``.

    A name that is already registered only triggers a warning; the new
    factory is appended regardless.
    """

    for registered_name, _ in self._factories:
        if registered_name == name:
            log.warning("ChannelRegistry | duplicate registration for %s", name)
            break
    self._factories.append((name, factory))
build_channels(config)

Build available channels from registered factories.

Source code in src/codex_platform/messaging/registry.py
def build_channels(self, config: Any) -> list[DeliveryChannel]:
    """Run every factory and collect the channels that are usable.

    A factory that returns ``None``, yields an unavailable channel, or raises
    is skipped (the latter is logged with its traceback).

    Returns:
        Available channels, in registration order.
    """

    built: list[DeliveryChannel] = []
    for name, factory in self._factories:
        try:
            candidate = factory(config)
            if candidate is None or not candidate.is_available():
                log.debug("ChannelRegistry | %s skipped (not configured)", name)
                continue
            built.append(candidate)
            log.info("ChannelRegistry | %s enabled", name)
        except Exception:
            log.exception("ChannelRegistry | %s factory failed", name)
    return built

TemplateRenderer

Jinja2 HTML template renderer.

Source code in src/codex_platform/messaging/renderer.py
class TemplateRenderer:
    """Renders HTML templates from a directory using Jinja2."""

    def __init__(self, templates_dir: str) -> None:
        """Create a Jinja2 environment rooted at ``templates_dir``.

        Autoescaping is enabled for ``html`` and ``xml`` templates.

        Raises:
            RuntimeError: If Jinja2 is not installed.
            FileNotFoundError: If ``templates_dir`` does not exist.
        """
        if not _JINJA2_AVAILABLE:
            raise RuntimeError(
                "jinja2 is not installed. "
                "Run: pip install codex-platform[notifications]\n"
                "If you are passing pre-rendered html_content to the worker, "
                "you do not need TemplateRenderer at all."
            )

        directory_missing = not os.path.exists(templates_dir)
        if directory_missing:
            raise FileNotFoundError(f"Templates directory not found: {templates_dir}")

        self.templates_dir = templates_dir
        loader = FileSystemLoader(templates_dir)
        self.env = Environment(
            loader=loader,
            autoescape=select_autoescape(["html", "xml"]),
        )
        log.info("TemplateRenderer | initialized with dir='%s'", templates_dir)

    def render(self, template_name: str, context: dict[str, Any]) -> str:
        """Load ``template_name`` and render it with ``context`` as a string."""

        log.debug("TemplateRenderer | rendering template='%s'", template_name)
        rendered = self.env.get_template(template_name).render(context)
        return str(rendered)
Functions
render(template_name, context)

Render a template with the given context.

Source code in src/codex_platform/messaging/renderer.py
def render(self, template_name: str, context: dict[str, Any]) -> str:
    """Load ``template_name`` and render it with ``context`` as a string."""

    log.debug("TemplateRenderer | rendering template='%s'", template_name)
    rendered = self.env.get_template(template_name).render(context)
    return str(rendered)

Functions

resolve_template_path(template_name, prefix_map=None)

Map a template key such as bk_confirmation to a template path.

Source code in src/codex_platform/messaging/renderer.py
def resolve_template_path(template_name: str, prefix_map: dict[str, str] | None = None) -> str:
    """Turn a template key such as ``bk_confirmation`` into a template path.

    Names that already contain a path separator or end in ``.html`` are
    returned unchanged. Otherwise the first prefix in the map that the name
    starts with selects the directory; without a match the result is
    ``<name>.html``.

    Args:
        template_name: Template key or ready-made path.
        prefix_map: Prefix-to-directory mapping; ``None`` selects
            ``DEFAULT_TEMPLATE_PREFIX_MAP`` (an empty dict is honored as-is).
    """

    looks_like_path = "/" in template_name or "\\" in template_name
    if looks_like_path or template_name.endswith(".html"):
        return template_name

    lookup = prefix_map if prefix_map is not None else DEFAULT_TEMPLATE_PREFIX_MAP
    candidates = (
        f"{directory}/{template_name}.html"
        for prefix, directory in lookup.items()
        if template_name.startswith(prefix)
    )
    return next(candidates, f"{template_name}.html")

build_message_id(*, domain)

Build a host-scoped RFC 5322 Message-ID.

Source code in src/codex_platform/messaging/threading.py
def build_message_id(*, domain: str) -> str:
    """Create a Message-ID of the form ``<uuid4@domain>``.

    Surrounding whitespace and leading/trailing ``@`` characters are removed
    from ``domain`` first.

    Raises:
        ValueError: If nothing is left of ``domain`` after normalization.
    """

    host = domain.strip().strip("@")
    if host:
        unique_part = uuid.uuid4()
        return f"<{unique_part}@{host}>"
    raise ValueError("domain must not be empty")

build_thread_key()

Build an opaque thread key safe to use in email headers and URLs.

Source code in src/codex_platform/messaging/threading.py
def build_thread_key() -> str:
    """Return a random ``tk_``-prefixed key made of URL-safe base64 characters.

    The key is a UUID4's 16 bytes, base64url-encoded with padding removed.
    """

    raw = uuid.uuid4().bytes
    encoded = base64.urlsafe_b64encode(raw).rstrip(b"=")
    return "tk_" + encoded.decode("ascii")

parse_references(header_value)

Parse a References header into individual Message-ID values.

Source code in src/codex_platform/messaging/threading.py
def parse_references(header_value: str) -> list[str]:
    """Split a References header into its whitespace-separated Message-IDs."""

    # str.split() with no separator already drops empty fragments.
    return header_value.split()

render_email_headers(dto)

Render thread DTO fields into outbound email headers.

Source code in src/codex_platform/messaging/threading.py
def render_email_headers(dto: ThreadHeadersDTO) -> dict[str, str]:
    """Translate a thread DTO into a header-name to header-value mapping.

    ``Message-ID`` and both thread-key headers are always present;
    ``In-Reply-To`` and ``References`` are added only when the DTO carries
    values for them.
    """

    rendered = {"Message-ID": dto.message_id}
    # The thread key is written under both the current and the legacy header.
    rendered[CODEX_THREAD_HEADER] = dto.thread_key
    rendered[LEGACY_LILY_THREAD_HEADER] = dto.thread_key

    reply_target = dto.in_reply_to
    if reply_target:
        rendered["In-Reply-To"] = reply_target

    earlier_ids = dto.references
    if earlier_ids:
        rendered["References"] = serialize_references(earlier_ids)
    return rendered

serialize_references(message_ids)

Serialize Message-ID values into a References header.

Source code in src/codex_platform/messaging/threading.py
def serialize_references(message_ids: list[str]) -> str:
    """Join Message-ID values with single spaces to form a References header."""

    separator = " "
    return separator.join(message_ids)