Skip to content

workers.arq.base

base

codex_platform.workers.arq.base

Base infrastructure for ARQ background workers.

Usage

from codex_platform.workers.arq import BaseArqWorkerSettings, base_startup, base_shutdown

class MyWorkerSettings(BaseArqWorkerSettings):
    redis_settings = RedisSettings(...)
    functions = MY_FUNCTIONS
    on_startup = my_startup
    on_shutdown = my_shutdown

Classes

BaseArqService

Async ARQ client for use inside worker tasks.

For the Django producer-side client see codex_platform.adapters.arq.client.BaseArqClient.

Example::

service = BaseArqService(redis_settings)
await service.init()
await service.enqueue_job("my_task", arg1, arg2)
await service.close()
Source code in src/codex_platform/workers/arq/base.py
class BaseArqService:
    """Async ARQ client for use inside worker tasks.

    For the Django producer-side client see
    ``codex_platform.adapters.arq.client.BaseArqClient``.

    Example::

        service = BaseArqService(redis_settings)
        await service.init()
        await service.enqueue_job("my_task", arg1, arg2)
        await service.close()
    """

    def __init__(self, redis_settings: RedisSettings) -> None:
        """Initialize the service with Redis connection settings.

        Args:
            redis_settings: ARQ ``RedisSettings`` instance.
        """
        # The pool is opened lazily by init()/enqueue_job(), not here.
        self.pool: ArqRedis | None = None
        self.redis_settings = redis_settings

    async def init(self) -> None:
        """Open the ARQ connection pool. No-op if already initialized.

        Raises:
            Exception: If the Redis connection cannot be established.
        """
        if not self.pool:
            try:
                self.pool = await create_pool(self.redis_settings)
                log.debug("BaseArqService | init | status=success")
            except Exception:
                log.exception("BaseArqService | init | status=failed")
                raise

    async def close(self) -> None:
        """Close the ARQ connection pool gracefully.

        The pool reference is always cleared, even when closing fails, so a
        later ``init()``/``enqueue_job()`` reconnects instead of reusing a
        closed pool.
        """
        if self.pool:
            try:
                await self.pool.close()
                log.debug("BaseArqService | close | status=success")
            except Exception:
                log.exception("BaseArqService | close | status=failed")
            finally:
                # Bug fix: previously the closed pool object was retained, so
                # enqueue_job() after close() used a dead connection and
                # init() saw a truthy pool and never reconnected.
                self.pool = None

    async def enqueue_job(self, function: str, *args: Any, **kwargs: Any) -> Any | None:
        """Enqueue a background job. Initializes the pool on first call.

        Args:
            function: ARQ worker function name.
            *args: Positional arguments forwarded to the function.
            **kwargs: Keyword arguments forwarded to the function.

        Returns:
            ARQ ``Job`` handle, or ``None`` if enqueueing failed.
        """
        if not self.pool:
            await self.init()
        if self.pool:
            try:
                job = await self.pool.enqueue_job(function, *args, **kwargs)
                log.debug(
                    "BaseArqService | enqueue_job | function=%s | job_id=%s",
                    function,
                    job.job_id if job else "None",
                )
                return job
            except Exception:
                log.exception("BaseArqService | enqueue_job | function=%s | status=failed", function)
                return None
        return None
Functions
__init__(redis_settings)

Initialize the service with Redis connection settings.

Parameters:

Name Type Description Default
redis_settings RedisSettings

ARQ RedisSettings instance.

required
Source code in src/codex_platform/workers/arq/base.py
def __init__(self, redis_settings: RedisSettings) -> None:
    """Store the connection settings; the pool itself is opened lazily.

    Args:
        redis_settings: ARQ ``RedisSettings`` used when the pool is created.
    """
    self.redis_settings = redis_settings
    # No connection is made here; init()/enqueue_job() opens the pool on demand.
    self.pool: ArqRedis | None = None
init() async

Open the ARQ connection pool. No-op if already initialized.

Raises:

Type Description
Exception

If the Redis connection cannot be established.

Source code in src/codex_platform/workers/arq/base.py
async def init(self) -> None:
    """Create the ARQ connection pool unless one already exists.

    Raises:
        Exception: Propagated when the Redis connection cannot be opened.
    """
    if self.pool:
        # Already connected — nothing to do.
        return
    try:
        self.pool = await create_pool(self.redis_settings)
    except Exception:
        log.exception("BaseArqService | init | status=failed")
        raise
    else:
        log.debug("BaseArqService | init | status=success")
close() async

Close the ARQ connection pool gracefully.

Source code in src/codex_platform/workers/arq/base.py
async def close(self) -> None:
    """Close the ARQ connection pool gracefully and drop the reference.

    Clearing ``self.pool`` (even when closing raises) lets a later
    ``init()`` reconnect instead of reusing a closed pool.
    """
    if self.pool:
        try:
            await self.pool.close()
            log.debug("BaseArqService | close | status=success")
        except Exception:
            log.exception("BaseArqService | close | status=failed")
        finally:
            # Bug fix: the stale pool object was previously retained, so
            # enqueue_job() after close() silently used a dead connection.
            self.pool = None
enqueue_job(function, *args, **kwargs) async

Enqueue a background job. Initializes the pool on first call.

Parameters:

Name Type Description Default
function str

ARQ worker function name.

required
*args Any

Positional arguments forwarded to the function.

()
**kwargs Any

Keyword arguments forwarded to the function.

{}

Returns:

Type Description
Any | None

ARQ Job handle, or None if enqueueing failed.

Source code in src/codex_platform/workers/arq/base.py
async def enqueue_job(self, function: str, *args: Any, **kwargs: Any) -> Any | None:
    """Enqueue a background job, lazily opening the pool on first use.

    Args:
        function: Name of the ARQ worker function to run.
        *args: Positional arguments passed through to the function.
        **kwargs: Keyword arguments passed through to the function.

    Returns:
        The ARQ ``Job`` handle, or ``None`` when enqueueing failed.
    """
    if not self.pool:
        await self.init()
    if not self.pool:
        # Defensive guard: init() either sets the pool or raises.
        return None
    try:
        job = await self.pool.enqueue_job(function, *args, **kwargs)
        job_id = job.job_id if job else "None"
        log.debug(
            "BaseArqService | enqueue_job | function=%s | job_id=%s",
            function,
            job_id,
        )
    except Exception:
        log.exception("BaseArqService | enqueue_job | function=%s | status=failed", function)
        return None
    return job

BaseArqWorkerSettings

Base ARQ WorkerSettings. Subclass and override fields in your workers module.

Example::

class MyWorkerSettings(BaseArqWorkerSettings):
    redis_settings = RedisSettings(host="localhost", port=6379)
    functions = [my_task1, my_task2] + CORE_FUNCTIONS
    on_startup = my_startup
    on_shutdown = my_shutdown
Source code in src/codex_platform/workers/arq/base.py
class BaseArqWorkerSettings:
    """Base ARQ ``WorkerSettings``. Subclass and override fields in your workers module.

    Example::

        class MyWorkerSettings(BaseArqWorkerSettings):
            redis_settings = RedisSettings(host="localhost", port=6379)
            functions = [my_task1, my_task2] + CORE_FUNCTIONS
            on_startup = my_startup
            on_shutdown = my_shutdown
    """

    # Maximum number of jobs one worker processes concurrently.
    max_jobs: int = 20
    # Per-job time limit — presumably seconds, matching arq conventions; TODO confirm.
    job_timeout: int = 60
    # How long job results are retained — presumably seconds; TODO confirm against arq docs.
    keep_result: int = 60
    # Retry budget and delay between attempts. NOTE(review): arq's built-in
    # setting is named ``max_tries``; verify these two are actually consumed
    # by the worker or by project task code.
    max_retries: int = 5
    retry_delay: int = 10

    # Default lifecycle hooks (health-file touch/unlink); subclasses may override.
    on_startup = base_startup
    on_shutdown = base_shutdown

Functions

base_startup(ctx) async

Base startup hook. Creates health check file. Extend in your workers.

Source code in src/codex_platform/workers/arq/base.py
async def base_startup(ctx: dict[str, Any]) -> None:
    """Shared startup hook: create the health-check file, then log its path.

    Extend (or wrap) in concrete workers; ``ctx`` is the ARQ worker context.
    """
    # Touch first so the file exists before the startup log line is emitted.
    HEALTH_FILE.touch()
    log.info("ArqWorker | startup | health_file=%s", HEALTH_FILE)

base_shutdown(ctx) async

Base shutdown hook. Removes health check file. Extend in your workers.

Source code in src/codex_platform/workers/arq/base.py
async def base_shutdown(ctx: dict[str, Any]) -> None:
    """Shared shutdown hook: remove the health-check file, then log.

    ``missing_ok=True`` makes the unlink idempotent if the file is already gone.
    """
    HEALTH_FILE.unlink(missing_ok=True)
    log.info("ArqWorker | shutdown")

requeue_to_stream(ctx, stream_name, payload) async

Re-enqueue a failed message back into a Redis Stream for retry processing.

After 5 retries the message is moved to a Dead Letter Queue (DLQ) instead of being dropped. DLQ key: {stream_name}:dlq.

Parameters:

Name Type Description Default
ctx dict[str, Any]

ARQ worker context dict. Must contain a "stream_manager" key.

required
stream_name str

Target Redis Stream name.

required
payload dict[str, Any]

Original message payload. _retries counter is incremented in-place.

required
Source code in src/codex_platform/workers/arq/base.py
async def requeue_to_stream(
    ctx: dict[str, Any],
    stream_name: str,
    payload: dict[str, Any],
    *,
    max_retries: int = 5,
) -> None:
    """Re-enqueue a failed message back into a Redis Stream for retry processing.

    After ``max_retries`` retries the message is moved to a Dead Letter Queue
    (DLQ) instead of being dropped. DLQ key: ``{stream_name}:dlq``.

    Args:
        ctx:         ARQ worker context dict. Must contain a ``"stream_manager"`` key.
        stream_name: Target Redis Stream name.
        payload:     Original message payload. ``_retries`` counter is incremented in-place.
        max_retries: Maximum retry attempts before routing to the DLQ
                     (keyword-only; defaults to 5, the previous hard-coded cap).
    """
    sm = ctx.get("stream_manager")
    if not sm:
        log.error("requeue_to_stream | stream_manager not in context")
        return
    retries = int(payload.get("_retries", 0)) + 1
    if retries > max_retries:
        # Exhausted the retry budget: annotate and park the message in the DLQ.
        dlq_name = f"{stream_name}:dlq"
        payload["_failed_at"] = datetime.now(UTC).isoformat()
        payload["_original_stream"] = stream_name
        await sm.add_event(dlq_name, payload)
        log.error("requeue_to_stream | max retries reached | moved to DLQ '%s'", dlq_name)
        return
    # Stored as str (it is read back through int() above).
    payload["_retries"] = str(retries)
    await sm.add_event(stream_name, payload)
    log.info("requeue_to_stream | requeued to '%s' retry=%d", stream_name, retries)