Skip to content

Notifications Internal Modules

These pages expose the implementation pieces behind notification delivery: selectors, queue adapters, rendering modes, and payload construction.

Cache adapter

codex_django.notifications.adapters.cache_adapter

DjangoCacheAdapter

ContentCacheAdapter implementation using BaseDjangoRedisManager.

A thin infrastructure shim — the caching strategy (key naming, TTL, invalidation) lives in BaseEmailContentSelector, not here.

Usage::

cache_adapter = DjangoCacheAdapter()
value = cache_adapter.get("my:cache:key")
cache_adapter.set("my:cache:key", "value", timeout=3600)

Classes

DjangoCacheAdapter

Notification content cache adapter backed by Redis.

Source code in src/codex_django/notifications/adapters/cache_adapter.py
21
22
23
24
25
26
27
28
29
30
31
32
class DjangoCacheAdapter:
    """Notification content cache adapter backed by Redis."""

    def get(self, key: str) -> str | None:
        """Return a cached content value by key."""
        manager = get_notifications_cache_manager()
        return manager.get(key)

    def set(self, key: str, value: str, timeout: int) -> None:
        """Store a cached content value with a TTL."""
        manager = get_notifications_cache_manager()
        manager.set(key, value, timeout=timeout)
Functions
get(key)

Return a cached content value by key.

Source code in src/codex_django/notifications/adapters/cache_adapter.py
24
25
26
27
def get(self, key: str) -> str | None:
    """Return a cached content value by key."""
    manager = get_notifications_cache_manager()
    return manager.get(key)
set(key, value, timeout)

Store a cached content value with a TTL.

Source code in src/codex_django/notifications/adapters/cache_adapter.py
29
30
31
32
def set(self, key: str, value: str, timeout: int) -> None:
    """Store a cached content value with a TTL."""
    manager = get_notifications_cache_manager()
    manager.set(key, value, timeout=timeout)

Functions

Direct adapter

codex_django.notifications.adapters.direct_adapter

DjangoDirectAdapter

Inline (no-worker) notification delivery via Django's send_mail().

Useful for: - Development environments without Redis/ARQ - Simple use cases where async delivery is not required - Fallback when the queue is unavailable

Supports both payload modes: - Mode 1 (template): requires a TemplateRenderer from codex_platform. Renders Jinja2 identically to the production worker — dev == prod. - Mode 2 (rendered): sends html_content directly without rendering.

Usage::

# Mode 2 only (no renderer needed):
adapter = DjangoDirectAdapter()

# Mode 1 + Mode 2:
from codex_platform.notifications.renderer import TemplateRenderer
renderer = TemplateRenderer(templates_dir="path/to/templates")
adapter = DjangoDirectAdapter(renderer=renderer)

# Works the same as DjangoQueueAdapter:
adapter.enqueue("send_notification_task", payload=data)

Classes

DjangoDirectAdapter

Deliver notifications inline via Django send_mail().

Notes

The adapter implements the same enqueue interface as the queue-based adapter, which makes it useful for development, testing, and fallback delivery paths.

Source code in src/codex_django/notifications/adapters/direct_adapter.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class DjangoDirectAdapter:
    """Deliver notifications inline via Django ``send_mail()``.

    Notes:
        The adapter implements the same enqueue interface as the queue-based
        adapter, which makes it useful for development, testing, and fallback
        delivery paths.
    """

    def __init__(
        self,
        renderer: Any = None,
        use_on_commit: bool = True,
    ) -> None:
        self._renderer = renderer
        self._use_on_commit = use_on_commit

    def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
        """Synchronously deliver or schedule delivery of a notification payload."""
        if self._use_on_commit:
            from django.db import transaction

            transaction.on_commit(lambda: self._send(payload))
            return None
        return self._send(payload)

    async def aenqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
        """Asynchronously deliver a notification payload in a worker thread."""
        import asyncio

        await asyncio.to_thread(self._send, payload)
        return None

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------

    def _send(self, payload: dict[str, Any]) -> str | None:
        """Send a rendered notification payload through Django mail."""
        mode = payload.get("mode", "rendered")

        if mode == "template":
            html_content, text_content = self._render(payload)
        else:
            html_content = payload.get("html_content", "")
            text_content = payload.get("text_content", "")

        recipient = payload.get("recipient_email", "")
        subject = payload.get("subject", "")

        if not recipient:
            log.warning("DjangoDirectAdapter: recipient_email is empty, skipping send")
            return None

        from django.conf import settings
        from django.core.mail import send_mail

        from_email = getattr(settings, "DEFAULT_FROM_EMAIL", "noreply@example.com")

        send_mail(
            subject=subject,
            message=text_content,
            from_email=from_email,
            recipient_list=[recipient],
            html_message=html_content or None,
            fail_silently=False,
        )
        log.info("DjangoDirectAdapter: sent email to %s (subject=%r)", recipient, subject)
        return None

    def _render(self, payload: dict[str, Any]) -> tuple[str, str]:
        """Render a template-mode payload using the configured renderer."""
        if self._renderer is None:
            raise ValueError(
                "DjangoDirectAdapter: 'renderer' is required for Mode 1 payloads "
                "(mode='template'). Pass a TemplateRenderer instance or use Mode 2 "
                "(build_rendered) to avoid rendering."
            )
        template_name: str = payload["template_name"]
        context_data: dict[str, Any] = payload.get("context_data", {})
        html_content: str = self._renderer.render(template_name, context_data)
        text_content: str = payload.get("text_content", "")
        return html_content, text_content
Functions
enqueue(task_name, payload)

Synchronously deliver or schedule delivery of a notification payload.

Source code in src/codex_django/notifications/adapters/direct_adapter.py
55
56
57
58
59
60
61
62
def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
    """Synchronously deliver or schedule delivery of a notification payload."""
    if self._use_on_commit:
        from django.db import transaction

        transaction.on_commit(lambda: self._send(payload))
        return None
    return self._send(payload)
aenqueue(task_name, payload) async

Asynchronously deliver a notification payload in a worker thread.

Source code in src/codex_django/notifications/adapters/direct_adapter.py
64
65
66
67
68
69
async def aenqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
    """Asynchronously deliver a notification payload in a worker thread."""
    import asyncio

    await asyncio.to_thread(self._send, payload)
    return None

i18n adapter

codex_django.notifications.adapters.i18n_adapter

DjangoI18nAdapter

Provides language override context manager for notification content lookup.

Usage::

adapter = DjangoI18nAdapter()
with adapter.translation_override("de"):
    subject = EmailContent.objects.get(key="booking_subject").text

Classes

DjangoI18nAdapter

Wraps Django's translation.override() for notification language switching.

Source code in src/codex_django/notifications/adapters/i18n_adapter.py
18
19
20
21
22
23
24
class DjangoI18nAdapter:
    """Wraps Django's translation.override() for notification language switching."""

    def translation_override(self, language: str) -> AbstractContextManager[None]:
        from django.utils.translation import override

        return override(language)

Queue adapter

codex_django.notifications.adapters.queue_adapter

DjangoQueueAdapter

NotificationAdapter implementation that enqueues tasks via DjangoArqClient.

Wraps enqueue() in transaction.on_commit() by default so the ARQ job is only dispatched after the DB transaction commits successfully.

Usage::

adapter = DjangoQueueAdapter(arq_client=DjangoArqClient())

# From a sync view (WSGI):
adapter.enqueue("send_notification_task", payload=data)

# From an async view (ASGI):
await adapter.aenqueue("send_notification_task", payload=data)

Classes

DjangoQueueAdapter

Queue-backed notification adapter for ARQ-based delivery.

Source code in src/codex_django/notifications/adapters/queue_adapter.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class DjangoQueueAdapter:
    """Queue-backed notification adapter for ARQ-based delivery."""

    def __init__(self, arq_client: DjangoArqClient, use_on_commit: bool = True) -> None:
        self._arq = arq_client
        self._use_on_commit = use_on_commit

    def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
        """Synchronously enqueue a notification payload.

        Returns ``None`` when ``use_on_commit=True`` because the job is not
        enqueued until the transaction commits. Otherwise returns the ARQ job id.
        """
        if self._use_on_commit:
            from django.db import transaction

            transaction.on_commit(lambda: self._arq.enqueue(task_name, payload=payload))
            return None
        return self._arq.enqueue(task_name, payload=payload)

    async def aenqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
        """Asynchronously enqueue a notification payload without ``on_commit``."""
        return await self._arq.aenqueue(task_name, payload=payload)
Functions
enqueue(task_name, payload)

Synchronously enqueue a notification payload.

Returns None when use_on_commit=True because the job is not enqueued until the transaction commits. Otherwise returns the ARQ job id.

Source code in src/codex_django/notifications/adapters/queue_adapter.py
35
36
37
38
39
40
41
42
43
44
45
46
def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
    """Synchronously enqueue a notification payload.

    Returns ``None`` when ``use_on_commit=True`` because the job is not
    enqueued until the transaction commits. Otherwise returns the ARQ job id.
    """
    if self._use_on_commit:
        from django.db import transaction

        transaction.on_commit(lambda: self._arq.enqueue(task_name, payload=payload))
        return None
    return self._arq.enqueue(task_name, payload=payload)
aenqueue(task_name, payload) async

Asynchronously enqueue a notification payload without on_commit.

Source code in src/codex_django/notifications/adapters/queue_adapter.py
48
49
50
async def aenqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
    """Asynchronously enqueue a notification payload without ``on_commit``."""
    return await self._arq.aenqueue(task_name, payload=payload)

ARQ client adapter

codex_django.notifications.adapters.arq_client

Django-facing ARQ adapter built on top of codex-platform delivery primitives.

Classes

DjangoArqClient

Thin Django-friendly wrapper around codex-platform's ARQ adapter.

Source code in src/codex_django/notifications/adapters/arq_client.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class DjangoArqClient:
    """Thin Django-friendly wrapper around codex-platform's ARQ adapter."""

    def __init__(
        self,
        *,
        adapter: _ArqAdapterProtocol | None = None,
        pool: Any | None = None,
        redis_settings: Any | None = None,
    ) -> None:
        self._pool: Any | None = pool
        self._adapter: _ArqAdapterProtocol | None = adapter
        self._redis_settings: Any | None = redis_settings

    @staticmethod
    def build_redis_settings_from_django() -> Any:
        """Build ARQ RedisSettings from Django settings as a convenience fallback."""
        from arq.connections import RedisSettings

        redis_url = getattr(settings, "ARQ_REDIS_URL", None) or getattr(settings, "REDIS_URL", None)
        if redis_url:
            parsed = urlparse(str(redis_url))
            database = int(parsed.path.lstrip("/") or "0")
            return RedisSettings(
                host=parsed.hostname or "localhost",
                port=parsed.port or 6379,
                username=parsed.username,
                password=parsed.password,
                database=database,
                ssl=parsed.scheme == "rediss",
            )

        return RedisSettings(
            host=getattr(settings, "REDIS_HOST", "localhost"),
            port=int(getattr(settings, "REDIS_PORT", 6379)),
            password=getattr(settings, "REDIS_PASSWORD", None),
            database=int(getattr(settings, "REDIS_DB", 0)),
        )

    async def _get_adapter(self) -> _ArqAdapterProtocol:
        """Lazily create the underlying codex-platform ARQ adapter."""
        if self._adapter is None:
            from arq.connections import create_pool
            from codex_platform.notifications.delivery import ArqNotificationAdapter

            if self._pool is None:
                redis_settings = self._redis_settings or self.build_redis_settings_from_django()
                self._pool = await create_pool(redis_settings)
            self._adapter = cast(_ArqAdapterProtocol, ArqNotificationAdapter(self._pool))
        return self._adapter

    def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
        """Enqueue a task synchronously via the platform ARQ adapter."""
        adapter = async_to_sync(self._get_adapter)()
        return adapter.enqueue(task_name, payload)

    async def aenqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
        """Enqueue a task asynchronously via the platform ARQ adapter."""
        adapter = await self._get_adapter()
        return await adapter.enqueue_async(task_name, payload)
Functions
build_redis_settings_from_django() staticmethod

Build ARQ RedisSettings from Django settings as a convenience fallback.

Source code in src/codex_django/notifications/adapters/arq_client.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@staticmethod
def build_redis_settings_from_django() -> Any:
    """Build ARQ RedisSettings from Django settings as a convenience fallback."""
    from arq.connections import RedisSettings

    redis_url = getattr(settings, "ARQ_REDIS_URL", None) or getattr(settings, "REDIS_URL", None)
    if redis_url:
        parsed = urlparse(str(redis_url))
        database = int(parsed.path.lstrip("/") or "0")
        return RedisSettings(
            host=parsed.hostname or "localhost",
            port=parsed.port or 6379,
            username=parsed.username,
            password=parsed.password,
            database=database,
            ssl=parsed.scheme == "rediss",
        )

    return RedisSettings(
        host=getattr(settings, "REDIS_HOST", "localhost"),
        port=int(getattr(settings, "REDIS_PORT", 6379)),
        password=getattr(settings, "REDIS_PASSWORD", None),
        database=int(getattr(settings, "REDIS_DB", 0)),
    )
enqueue(task_name, payload)

Enqueue a task synchronously via the platform ARQ adapter.

Source code in src/codex_django/notifications/adapters/arq_client.py
71
72
73
74
def enqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
    """Enqueue a task synchronously via the platform ARQ adapter."""
    adapter = async_to_sync(self._get_adapter)()
    return adapter.enqueue(task_name, payload)
aenqueue(task_name, payload) async

Enqueue a task asynchronously via the platform ARQ adapter.

Source code in src/codex_django/notifications/adapters/arq_client.py
76
77
78
79
async def aenqueue(self, task_name: str, payload: dict[str, Any]) -> str | None:
    """Enqueue a task asynchronously via the platform ARQ adapter."""
    adapter = await self._get_adapter()
    return await adapter.enqueue_async(task_name, payload)