# workers.arq
ARQ workers infrastructure — base classes, config, task utilities.
## Classes

### BaseArqService
Async ARQ client for use inside worker tasks. For the Django producer-side client, see `codex_platform.adapters.arq.client.BaseArqClient`.

Example:

```python
service = BaseArqService(redis_settings)
await service.init()
await service.enqueue_job("my_task", arg1, arg2)
await service.close()
```
Source code in src/codex_platform/workers/arq/base.py
#### Functions

##### `__init__(redis_settings)`
Initialize the service with Redis connection settings.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `redis_settings` | `RedisSettings` | ARQ Redis connection settings. | *required* |
Source code in src/codex_platform/workers/arq/base.py
##### `init()` *(async)*

Open the ARQ connection pool. No-op if already initialized.

Raises:

| Type | Description |
|---|---|
| `Exception` | If the Redis connection cannot be established. |
Source code in src/codex_platform/workers/arq/base.py
##### `close()` *(async)*

Close the ARQ connection pool gracefully.
Source code in src/codex_platform/workers/arq/base.py
##### `enqueue_job(function, *args, **kwargs)` *(async)*

Enqueue a background job. Initializes the pool on first call.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `function` | `str` | ARQ worker function name. | *required* |
| `*args` | `Any` | Positional arguments forwarded to the function. | `()` |
| `**kwargs` | `Any` | Keyword arguments forwarded to the function. | `{}` |

Returns:

| Type | Description |
|---|---|
| `Any \| None` | ARQ job handle, or `None` if the job was not enqueued. |
Source code in src/codex_platform/workers/arq/base.py
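The lazy-initialization behavior described above can be sketched as follows. This is a minimal stand-in, not the actual implementation: `FakePool` replaces the real ARQ connection pool, and `LazyArqService` only illustrates the "initialize on first enqueue" pattern.

```python
import asyncio


class FakePool:
    """Stand-in for the ARQ Redis pool (illustration only)."""

    def __init__(self):
        self.jobs = []

    async def enqueue_job(self, function, *args, **kwargs):
        self.jobs.append((function, args, kwargs))
        return f"job:{len(self.jobs)}"


class LazyArqService:
    """Sketch of lazy pool initialization on first enqueue."""

    def __init__(self, redis_settings):
        self._redis_settings = redis_settings
        self._pool = None

    async def init(self):
        # No-op if the pool was already created.
        if self._pool is None:
            # Real code would be: self._pool = await create_pool(self._redis_settings)
            self._pool = FakePool()

    async def enqueue_job(self, function, *args, **kwargs):
        await self.init()  # initializes the pool on first call
        return await self._pool.enqueue_job(function, *args, **kwargs)


service = LazyArqService(redis_settings=None)  # settings unused by the fake
job_id = asyncio.run(service.enqueue_job("my_task", 1, key="value"))
```

The point of the pattern is that callers never need to remember `init()`: the first `enqueue_job` call opens the pool, and `close()` remains available for graceful shutdown.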
### BaseArqWorkerSettings

Base ARQ `WorkerSettings`. Subclass and override fields in your workers module.

Example:

```python
class MyWorkerSettings(BaseArqWorkerSettings):
    redis_settings = RedisSettings(host="localhost", port=6379)
    functions = [my_task1, my_task2] + CORE_FUNCTIONS
    on_startup = my_startup
    on_shutdown = my_shutdown
```
Source code in src/codex_platform/workers/arq/base.py
### BaseWorkerConfig

Bases: `BaseCommonSettings`

Base configuration for ARQ workers — provides SMTP and ARQ settings. Maps standard Django `.env` variable names (`EMAIL_HOST`, etc.) to internal `SMTP_`-prefixed fields for clarity.

Extend this class in your project to add vendor-specific fields (Twilio, Seven.io, SendGrid, etc.).

Example:

```python
class MyWorkerConfig(BaseWorkerConfig):
    SEVEN_IO_API_KEY: str | None = None
    TEMPLATES_DIR: str = "src/workers/templates"

    class Config:
        env_file = ".env"
```
Source code in src/codex_platform/workers/arq/config.py
#### Attributes

##### `arq_redis_settings` *(property)*

Build and return an ARQ `RedisSettings` object from the base Redis config.
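Such a property might look like the sketch below. The `RedisSettings` dataclass stands in for `arq.connections.RedisSettings`, and the `REDIS_HOST`/`REDIS_PORT`/`REDIS_DB` field names are assumptions about the base config, not the actual fields of `BaseCommonSettings`.

```python
from dataclasses import dataclass


@dataclass
class RedisSettings:
    """Stand-in for arq.connections.RedisSettings (illustration only)."""

    host: str = "localhost"
    port: int = 6379
    database: int = 0


class WorkerConfigSketch:
    # Assumed base-config field names; the real settings class may differ.
    REDIS_HOST: str = "localhost"
    REDIS_PORT: int = 6379
    REDIS_DB: int = 0

    @property
    def arq_redis_settings(self) -> RedisSettings:
        # Translate the flat env-style config into ARQ's connection object.
        return RedisSettings(
            host=self.REDIS_HOST,
            port=self.REDIS_PORT,
            database=self.REDIS_DB,
        )


settings = WorkerConfigSketch().arq_redis_settings
```

Keeping this as a computed property means workers always derive their ARQ connection from the same Redis config as the rest of the application.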
## Functions

### `base_shutdown(ctx)` *(async)*

Base shutdown hook. Removes health check file. Extend in your workers.

### `base_startup(ctx)` *(async)*

Base startup hook. Creates health check file. Extend in your workers.
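The hook pair can be sketched as below. The health-file path is an assumption (the real hooks may write elsewhere), and `my_startup` shows the intended extension pattern: call the base hook first, then add project-specific setup.

```python
import asyncio
import os
import tempfile

# Assumed location of the health check file (illustration only).
HEALTH_FILE = os.path.join(tempfile.gettempdir(), "arq_worker_healthy")


async def base_startup_sketch(ctx):
    # Create the health check file so liveness probes see a running worker.
    with open(HEALTH_FILE, "w") as f:
        f.write("ok")


async def base_shutdown_sketch(ctx):
    # Remove the health check file on graceful shutdown.
    if os.path.exists(HEALTH_FILE):
        os.remove(HEALTH_FILE)


async def my_startup(ctx):
    await base_startup_sketch(ctx)  # keep the base behavior
    # ...project-specific setup (DB pools, HTTP clients) would go here...


asyncio.run(my_startup({}))
asyncio.run(base_shutdown_sketch({}))
```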
### `requeue_to_stream(ctx, stream_name, payload)` *(async)*

Re-enqueue a failed message back into a Redis Stream for retry processing. After 5 retries the message is moved to a Dead Letter Queue (DLQ) instead of being dropped. DLQ key: `{stream_name}:dlq`.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `ctx` | `dict[str, Any]` | ARQ worker context dict. Must contain a `redis` connection. | *required* |
| `stream_name` | `str` | Target Redis Stream name. | *required* |
| `payload` | `dict[str, Any]` | Original message payload. | *required* |
Source code in src/codex_platform/workers/arq/base.py
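The retry/DLQ routing might be implemented along these lines. This is a sketch under stated assumptions: `FakeRedis` stands in for the client in `ctx["redis"]`, and the `retry_count` field name is hypothetical, not taken from the actual payload schema.

```python
import asyncio
from collections import defaultdict
from typing import Any

MAX_STREAM_RETRIES = 5  # after this many retries, messages go to the DLQ


class FakeRedis:
    """In-memory stand-in for the redis client in ctx['redis'] (illustration only)."""

    def __init__(self):
        self.streams: dict[str, list[dict]] = defaultdict(list)

    async def xadd(self, stream: str, fields: dict) -> None:
        self.streams[stream].append(fields)


async def requeue_to_stream_sketch(
    ctx: dict[str, Any], stream_name: str, payload: dict[str, Any]
) -> None:
    # Track how many times this message has already been retried.
    retries = int(payload.get("retry_count", 0)) + 1
    payload = {**payload, "retry_count": retries}

    redis = ctx["redis"]
    if retries > MAX_STREAM_RETRIES:
        # Exhausted retries: park the message in the Dead Letter Queue
        # instead of dropping it, so it can be inspected or replayed later.
        await redis.xadd(f"{stream_name}:dlq", payload)
    else:
        # Put it back on the original stream for another attempt.
        await redis.xadd(stream_name, payload)
```

The DLQ keeps poison messages out of the hot path while preserving them for manual inspection or replay.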
### `arq_task(*, retry_backoff=30, max_retries=5, log_args=False)`

Decorator that adds standard error handling, logging, and retry to ARQ tasks.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `retry_backoff` | `int` | Base delay in seconds. Actual delay = `job_try * retry_backoff`. | `30` |
| `max_retries` | `int` | Maximum number of retries before giving up. | `5` |
| `log_args` | `bool` | If `True`, log task arguments (be careful with PII). | `False` |

The wrapped function receives `ctx` as its first argument (standard ARQ convention). On exception:

- If `job_try < max_retries` → raises `Retry(defer=job_try * retry_backoff)`.
- If `job_try >= max_retries` → logs the error and returns `None` (task dropped).
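The retry behavior above can be sketched as a self-contained decorator. The `Retry` class here is a stand-in for `arq.worker.Retry`, and reading the attempt number from `ctx["job_try"]` follows ARQ's convention; the real decorator may differ in details.

```python
import asyncio
import functools
import logging

logger = logging.getLogger(__name__)


class Retry(Exception):
    """Stand-in for arq.worker.Retry (illustration only)."""

    def __init__(self, defer: int):
        self.defer = defer


def arq_task_sketch(*, retry_backoff: int = 30, max_retries: int = 5, log_args: bool = False):
    """Sketch of the error-handling/retry decorator described above."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(ctx, *args, **kwargs):
            job_try = ctx.get("job_try", 1)  # ARQ puts the attempt number in ctx
            if log_args:
                logger.info("%s args=%r kwargs=%r", func.__name__, args, kwargs)
            try:
                return await func(ctx, *args, **kwargs)
            except Retry:
                raise  # let explicit retries pass through untouched
            except Exception:
                if job_try < max_retries:
                    # Linear backoff: the delay grows with each attempt.
                    raise Retry(defer=job_try * retry_backoff)
                # Out of retries: log and drop the task instead of crashing the worker.
                logger.exception("%s failed after %d tries, dropping", func.__name__, job_try)
                return None

        return wrapper

    return decorator


@arq_task_sketch(retry_backoff=10, max_retries=3)
async def flaky(ctx):
    raise ValueError("boom")
```

Raising `Retry` hands scheduling back to the worker (which re-runs the job after `defer` seconds), while returning `None` at the retry limit keeps a permanently failing task from looping forever.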