Skip to content

Notifications Internal Modules

These pages expose the implementation pieces behind notification delivery: selectors, queue adapters, rendering modes, and payload construction.

Cache adapter

codex_django.notifications.adapters.cache_adapter

DjangoCacheAdapter

ContentCacheAdapter implementation using BaseDjangoRedisManager.

A thin infrastructure shim — the caching strategy (key naming, TTL, invalidation) lives in BaseEmailContentSelector, not here.

Usage::

cache_adapter = DjangoCacheAdapter()
value = cache_adapter.get("my:cache:key")
cache_adapter.set("my:cache:key", "value", timeout=3600)

Classes

DjangoCacheAdapter

Notification content cache adapter backed by Redis.

Source code in src/codex_django/notifications/adapters/cache_adapter.py
21
22
23
24
25
26
27
28
29
30
31
32
class DjangoCacheAdapter:
    """Redis-backed cache adapter for notification content.

    Each method obtains the manager from ``get_notifications_cache_manager()``
    at call time and delegates to it; no state is kept on the adapter.
    """

    def get(self, key: str) -> str | None:
        """Look up ``key`` and return whatever the cache manager yields for it."""
        return get_notifications_cache_manager().get(key)

    def set(self, key: str, value: str, timeout: int) -> None:
        """Write ``value`` under ``key``, forwarding ``timeout`` as the TTL."""
        get_notifications_cache_manager().set(key, value, timeout=timeout)
Functions
get(key)

Return a cached content value by key.

Source code in src/codex_django/notifications/adapters/cache_adapter.py
24
25
26
27
def get(self, key: str) -> str | None:
    """Fetch the cached content stored under ``key`` through the cache manager."""
    return get_notifications_cache_manager().get(key)
set(key, value, timeout)

Store a cached content value with a TTL.

Source code in src/codex_django/notifications/adapters/cache_adapter.py
29
30
31
32
def set(self, key: str, value: str, timeout: int) -> None:
    """Persist ``value`` under ``key``, forwarding ``timeout`` as the TTL."""
    get_notifications_cache_manager().set(key, value, timeout=timeout)

Functions

Direct adapter

codex_django.notifications.adapters.direct_adapter

DjangoDirectAdapter

Inline (no-worker) notification delivery via Django's send_mail().

Useful for:

- Development environments without Redis/ARQ
- Simple use cases where async delivery is not required
- Fallback when the queue is unavailable

Supports both payload modes:

- Mode 1 (template): requires a TemplateRenderer from codex_platform. Renders Jinja2 identically to the production worker — dev == prod.
- Mode 2 (rendered): sends html_content directly without rendering.

Usage::

# Mode 2 only (no renderer needed):
adapter = DjangoDirectAdapter()

# Mode 1 + Mode 2:
from codex_platform.notifications.renderer import TemplateRenderer
renderer = TemplateRenderer(templates_dir="path/to/templates")
adapter = DjangoDirectAdapter(renderer=renderer)

# Works the same as DjangoQueueAdapter:
adapter.enqueue("send_notification_task", payload=data)

Classes

DjangoDirectAdapter

Deliver notifications inline via Django send_mail().

Notes

The adapter implements the same enqueue interface as the queue-based adapter, which makes it useful for development, testing, and fallback delivery paths.

Source code in src/codex_django/notifications/adapters/direct_adapter.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
class DjangoDirectAdapter:
    """Deliver notifications inline via Django ``send_mail()``.

    Notes:
        The adapter implements the same enqueue interface as the queue-based
        adapter, which makes it useful for development, testing, and fallback
        delivery paths.
    """

    def __init__(
        self,
        renderer: Any = None,
        use_on_commit: bool = True,
    ) -> None:
        self._renderer = renderer
        self._use_on_commit = use_on_commit

    def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
        """Synchronously deliver or schedule delivery of a notification payload."""
        if self._use_on_commit:
            from django.db import transaction

            transaction.on_commit(lambda: self._send(payload))
            return None
        return self._send(payload)

    async def aenqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
        """Asynchronously deliver a notification payload in a worker thread."""
        import asyncio

        await asyncio.to_thread(self._send, payload)
        return None

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------

    def _send(self, payload: dict[str, Any]) -> str | None:
        """Send a rendered notification payload through Django mail."""
        mode = payload.get("mode", "rendered")

        if mode == "template":
            html_content, text_content = self._render(payload)
        else:
            html_content = payload.get("html_content", "")
            text_content = payload.get("text_content", "")

        recipient = payload.get("recipient_email", "")
        subject = payload.get("subject", "")

        if not recipient:
            log.warning("DjangoDirectAdapter: recipient_email is empty, skipping send")
            return None

        from django.core.mail import send_mail

        from_email = self._resolve_from_email()

        send_mail(
            subject=subject,
            message=text_content,
            from_email=from_email,
            recipient_list=[recipient],
            html_message=html_content or None,
            fail_silently=False,
        )
        log.info("DjangoDirectAdapter: sent email to %s (subject=%r)", recipient, subject)
        return None

    def _resolve_from_email(self) -> str:
        """Resolve From address from SiteSettings (identity mixin) with fallback.

        Priority:
            1. ``SiteSettings.email_from`` (+ ``email_sender_name`` for display)
               resolved via ``CODEX_SITE_SETTINGS_MODEL`` and the Redis-synced
               site-settings manager.
            2. ``settings.DEFAULT_FROM_EMAIL``.
            3. ``"noreply@example.com"``.
        """
        from django.conf import settings

        fallback = getattr(settings, "DEFAULT_FROM_EMAIL", "noreply@example.com")
        model_path = getattr(settings, "CODEX_SITE_SETTINGS_MODEL", None)
        if not model_path:
            return fallback
        try:
            from django.apps import apps

            from codex_django.core.redis.managers.settings import (
                get_site_settings_manager,
            )

            model_cls = apps.get_model(model_path)
            data = get_site_settings_manager().load_cached(model_cls) or {}
        except Exception:  # noqa: BLE001 — site settings are optional
            return fallback

        email = (data.get("email_from") or "").strip()
        if not email:
            return fallback
        name = (data.get("email_sender_name") or "").strip()
        return f"{name} <{email}>" if name else email

    def _render(self, payload: dict[str, Any]) -> tuple[str, str]:
        """Render a template-mode payload using the configured renderer."""
        if self._renderer is None:
            raise ValueError(
                "DjangoDirectAdapter: 'renderer' is required for Mode 1 payloads "
                "(mode='template'). Pass a TemplateRenderer instance or use Mode 2 "
                "(build_rendered) to avoid rendering."
            )
        template_name: str = payload["template_name"]
        context_data: dict[str, Any] = payload.get("context_data", {})
        html_content: str = self._renderer.render(template_name, context_data)
        text_content: str = payload.get("text_content", "")
        return html_content, text_content
Functions
enqueue(task_name, payload)

Synchronously deliver or schedule delivery of a notification payload.

Source code in src/codex_django/notifications/adapters/direct_adapter.py
55
56
57
58
59
60
61
62
def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
    """Synchronously deliver or schedule delivery of a notification payload."""
    if self._use_on_commit:
        from django.db import transaction

        transaction.on_commit(lambda: self._send(payload))
        return None
    return self._send(payload)
aenqueue(task_name, payload) async

Asynchronously deliver a notification payload in a worker thread.

Source code in src/codex_django/notifications/adapters/direct_adapter.py
64
65
66
67
68
69
async def aenqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
    """Asynchronously deliver a notification payload in a worker thread."""
    import asyncio

    await asyncio.to_thread(self._send, payload)
    return None

i18n adapter

codex_django.notifications.adapters.i18n_adapter

DjangoI18nAdapter

Provides language override context manager for notification content lookup.

Usage::

adapter = DjangoI18nAdapter()
with adapter.translation_override("de"):
    subject = EmailContent.objects.get(key="booking_subject").text

Classes

DjangoI18nAdapter

Wraps Django's translation.override() for notification language switching.

Source code in src/codex_django/notifications/adapters/i18n_adapter.py
18
19
20
21
22
23
24
class DjangoI18nAdapter:
    """Expose Django's ``translation.override()`` for notification language switching."""

    def translation_override(self, language: str) -> AbstractContextManager[None]:
        """Return Django's ``override(language)`` context manager."""
        # Imported lazily, at call time rather than module import time.
        from django.utils import translation

        return translation.override(language)

Queue adapter

codex_django.notifications.adapters.queue_adapter

DjangoQueueAdapter

NotificationAdapter implementation that enqueues tasks via DjangoArqClient.

Wraps enqueue() in transaction.on_commit() by default so the ARQ job is only dispatched after the DB transaction commits successfully.

Usage::

adapter = DjangoQueueAdapter(arq_client=DjangoArqClient())

# From a sync view (WSGI):
adapter.enqueue("send_notification_task", payload=data)

# From an async view (ASGI):
await adapter.aenqueue("send_notification_task", payload=data)

Classes

DjangoQueueAdapter

Queue-backed notification adapter for ARQ-based delivery.

Source code in src/codex_django/notifications/adapters/queue_adapter.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class DjangoQueueAdapter:
    """Notification adapter that hands payloads to an ARQ client for delivery."""

    def __init__(self, arq_client: DjangoArqClient, use_on_commit: bool = True) -> None:
        """Keep the ARQ client and the flag controlling on-commit deferral."""
        self._use_on_commit = use_on_commit
        self._arq = arq_client

    def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
        """Enqueue ``payload`` for ``task_name`` from synchronous code.

        With ``use_on_commit=True`` the enqueue call is registered through
        ``transaction.on_commit`` and ``None`` is returned, since no job
        exists yet. Otherwise the client's return value (the ARQ job id) is
        passed back directly.
        """
        if not self._use_on_commit:
            return self._arq.enqueue(task_name, payload=payload)

        from django.db import transaction

        def _dispatch() -> str | None:
            return self._arq.enqueue(task_name, payload=payload)

        transaction.on_commit(_dispatch)
        return None

    async def aenqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
        """Enqueue ``payload`` from async code; ``on_commit`` is never used here."""
        job_id = await self._arq.aenqueue(task_name, payload=payload)
        return job_id
Functions
enqueue(task_name, payload)

Synchronously enqueue a notification payload.

Returns None when use_on_commit=True because the job is not enqueued until the transaction commits. Otherwise returns the ARQ job id.

Source code in src/codex_django/notifications/adapters/queue_adapter.py
35
36
37
38
39
40
41
42
43
44
45
46
def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
    """Synchronously enqueue a notification payload.

    Returns ``None`` when ``use_on_commit=True`` because the job is not
    enqueued until the transaction commits. Otherwise returns the ARQ job id.
    """
    if self._use_on_commit:
        from django.db import transaction

        transaction.on_commit(lambda: self._arq.enqueue(task_name, payload=payload))
        return None
    return self._arq.enqueue(task_name, payload=payload)
aenqueue(task_name, payload) async

Asynchronously enqueue a notification payload without on_commit.

Source code in src/codex_django/notifications/adapters/queue_adapter.py
48
49
50
async def aenqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
    """Asynchronously enqueue a notification payload without ``on_commit``."""
    return await self._arq.aenqueue(task_name, payload=payload)

ARQ client adapter

codex_django.notifications.adapters.arq_client

Django-facing ARQ adapter built on top of codex-platform delivery primitives.

Classes

DjangoArqClient

Thin Django-friendly wrapper around codex-platform's ARQ adapter.

Source code in src/codex_django/notifications/adapters/arq_client.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class DjangoArqClient:
    """Thin Django-friendly wrapper around codex-platform's ARQ adapter."""

    def __init__(
        self,
        *,
        adapter: _ArqAdapterProtocol | None = None,
        pool: Any | None = None,
        redis_settings: Any | None = None,
    ) -> None:
        """Store optional pre-built collaborators.

        Args:
            adapter: Ready-made platform adapter; when given, no pool is created.
            pool: Existing ARQ pool to wrap when ``adapter`` is not given.
            redis_settings: Settings used to create a pool when neither
                ``adapter`` nor ``pool`` is given; falls back to
                ``build_redis_settings_from_django()``.
        """
        self._pool: Any | None = pool
        self._adapter: _ArqAdapterProtocol | None = adapter
        self._redis_settings: Any | None = redis_settings

    @staticmethod
    def _redis_kwargs_from_url(redis_url: str) -> dict[str, Any]:
        """Translate a ``redis://`` / ``rediss://`` URL into RedisSettings kwargs.

        ``urlparse`` leaves the userinfo percent-encoded, so credentials are
        decoded here; otherwise a password such as ``p%40ss`` would be sent to
        Redis literally. Surrounding slashes in the path are ignored so that
        ``/2`` and ``/2/`` both select database 2.

        Raises:
            ValueError: If the port or the database segment is not numeric.
        """
        from urllib.parse import unquote, urlparse

        parsed = urlparse(redis_url)
        username, password = parsed.username, parsed.password
        return {
            "host": parsed.hostname or "localhost",
            "port": parsed.port or 6379,
            "username": unquote(username) if username else username,
            "password": unquote(password) if password else password,
            "database": int(parsed.path.strip("/") or "0"),
            "ssl": parsed.scheme == "rediss",
        }

    @staticmethod
    def build_redis_settings_from_django() -> Any:
        """Build ARQ RedisSettings from Django settings as a convenience fallback.

        ``ARQ_REDIS_URL`` takes precedence over ``REDIS_URL``; when neither is
        set, the discrete ``REDIS_HOST`` / ``REDIS_PORT`` / ``REDIS_PASSWORD`` /
        ``REDIS_DB`` settings are used with local defaults.
        """
        from arq.connections import RedisSettings

        redis_url = getattr(settings, "ARQ_REDIS_URL", None) or getattr(settings, "REDIS_URL", None)
        if redis_url:
            return RedisSettings(**DjangoArqClient._redis_kwargs_from_url(str(redis_url)))

        return RedisSettings(
            host=getattr(settings, "REDIS_HOST", "localhost"),
            port=int(getattr(settings, "REDIS_PORT", 6379)),
            password=getattr(settings, "REDIS_PASSWORD", None),
            database=int(getattr(settings, "REDIS_DB", 0)),
        )

    async def _get_adapter(self) -> _ArqAdapterProtocol:
        """Lazily create the underlying codex-platform ARQ adapter.

        The pool (if not injected) and the adapter are created on first use
        and cached on the instance for later calls.
        """
        if self._adapter is None:
            from arq.connections import create_pool
            from codex_platform.notifications.delivery import ArqNotificationAdapter

            if self._pool is None:
                redis_settings = self._redis_settings or self.build_redis_settings_from_django()
                self._pool = await create_pool(redis_settings)
            self._adapter = cast(_ArqAdapterProtocol, ArqNotificationAdapter(self._pool))
        return self._adapter

    def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
        """Enqueue a task synchronously via the platform ARQ adapter."""
        # The adapter is resolved through async_to_sync because creating the
        # pool is a coroutine; the enqueue call itself is synchronous.
        adapter = async_to_sync(self._get_adapter)()
        return adapter.enqueue(task_name, payload)

    async def aenqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
        """Enqueue a task asynchronously via the platform ARQ adapter."""
        adapter = await self._get_adapter()
        return await adapter.enqueue_async(task_name, payload)
Functions
build_redis_settings_from_django() staticmethod

Build ARQ RedisSettings from Django settings as a convenience fallback.

Source code in src/codex_django/notifications/adapters/arq_client.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@staticmethod
def build_redis_settings_from_django() -> Any:
    """Build ARQ RedisSettings from Django settings as a convenience fallback.

    ``ARQ_REDIS_URL`` takes precedence over ``REDIS_URL``; when neither is set,
    the discrete ``REDIS_HOST`` / ``REDIS_PORT`` / ``REDIS_PASSWORD`` /
    ``REDIS_DB`` settings are used with local defaults.

    Raises:
        ValueError: If the URL's port or database segment is not numeric.
    """
    from urllib.parse import unquote

    from arq.connections import RedisSettings

    redis_url = getattr(settings, "ARQ_REDIS_URL", None) or getattr(settings, "REDIS_URL", None)
    if redis_url:
        parsed = urlparse(str(redis_url))
        # urlparse leaves userinfo percent-encoded; decode it so credentials
        # such as "p%40ss" reach Redis as "p@ss".
        username = unquote(parsed.username) if parsed.username else parsed.username
        password = unquote(parsed.password) if parsed.password else parsed.password
        # Accept both "/2" and "/2/" as database 2.
        database = int(parsed.path.strip("/") or "0")
        return RedisSettings(
            host=parsed.hostname or "localhost",
            port=parsed.port or 6379,
            username=username,
            password=password,
            database=database,
            ssl=parsed.scheme == "rediss",
        )

    return RedisSettings(
        host=getattr(settings, "REDIS_HOST", "localhost"),
        port=int(getattr(settings, "REDIS_PORT", 6379)),
        password=getattr(settings, "REDIS_PASSWORD", None),
        database=int(getattr(settings, "REDIS_DB", 0)),
    )
enqueue(task_name, payload)

Enqueue a task synchronously via the platform ARQ adapter.

Source code in src/codex_django/notifications/adapters/arq_client.py
71
72
73
74
def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
    """Resolve the platform adapter from sync code and enqueue through it."""
    # _get_adapter is a coroutine function, so it is bridged with async_to_sync.
    resolve_adapter = async_to_sync(self._get_adapter)
    return resolve_adapter().enqueue(task_name, payload)
aenqueue(task_name, payload) async

Enqueue a task asynchronously via the platform ARQ adapter.

Source code in src/codex_django/notifications/adapters/arq_client.py
76
77
78
79
async def aenqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
    """Enqueue a task asynchronously via the platform ARQ adapter."""
    adapter = await self._get_adapter()
    return await adapter.enqueue_async(task_name, payload)