codex_platform.workers.arq.base
Base infrastructure for ARQ background workers.
Usage
from arq.connections import RedisSettings
from codex_platform.workers.arq import BaseArqWorkerSettings, base_startup, base_shutdown

class MyWorkerSettings(BaseArqWorkerSettings):
    redis_settings = RedisSettings(...)
    functions = MY_FUNCTIONS
    on_startup = my_startup
    on_shutdown = my_shutdown
Classes
BaseArqService
Async ARQ client for use inside worker tasks.
For the Django producer-side client, see codex_platform.adapters.arq.client.BaseArqClient.
Example::
service = BaseArqService(redis_settings)
await service.init()
await service.enqueue_job("my_task", arg1, arg2)
await service.close()
Source code in src/codex_platform/workers/arq/base.py
Functions
__init__(redis_settings)
Initialize the service with Redis connection settings.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| redis_settings | RedisSettings | ARQ | required |
async init()
Open the ARQ connection pool. No-op if already initialized.
Raises:
| Type | Description |
|---|---|
| Exception | If the Redis connection cannot be established. |
async close()
Close the ARQ connection pool gracefully.
async enqueue_job(function, *args, **kwargs)
Enqueue a background job. Initializes the pool on first call.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| function | str | ARQ worker function name. | required |
| *args | Any | Positional arguments forwarded to the function. | () |
| **kwargs | Any | Keyword arguments forwarded to the function. | {} |
Returns:
| Type | Description |
|---|---|
| Any \| None | ARQ |
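The lazy-initialization behavior described for init() and enqueue_job() can be sketched with an in-memory stand-in. This is not the BaseArqService source: FakePool, LazyQueueService, and the pool_factory argument are hypothetical names used only for illustration, and the real class opens an ARQ pool from redis_settings instead.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


class FakePool:
    """In-memory stand-in for a connection pool (hypothetical, not an ARQ class)."""

    def __init__(self) -> None:
        self.jobs: list[tuple[str, tuple, dict]] = []
        self.closed = False

    async def enqueue_job(self, function: str, *args: Any, **kwargs: Any) -> str:
        # Record the call and hand back a fake job identifier.
        self.jobs.append((function, args, kwargs))
        return f"job-{len(self.jobs)}"

    async def close(self) -> None:
        self.closed = True


class LazyQueueService:
    """Sketch of the lazy-init pattern; an assumption about the general shape only."""

    def __init__(self, pool_factory: Callable[[], Awaitable[FakePool]]) -> None:
        self._pool_factory = pool_factory
        self._pool: FakePool | None = None

    async def init(self) -> None:
        # No-op when the pool already exists, so repeated calls are safe.
        if self._pool is None:
            self._pool = await self._pool_factory()

    async def enqueue_job(self, function: str, *args: Any, **kwargs: Any) -> Any | None:
        # Open the pool on first use, then forward the arguments unchanged.
        await self.init()
        assert self._pool is not None
        return await self._pool.enqueue_job(function, *args, **kwargs)

    async def close(self) -> None:
        # Close the pool if one was opened and reset the service state.
        if self._pool is not None:
            await self._pool.close()
            self._pool = None
```

Because enqueue_job() awaits init() itself, a caller in this sketch may skip the explicit init() call; calling it anyway costs nothing once the pool exists.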
BaseArqWorkerSettings
Base ARQ WorkerSettings. Subclass and override fields in your workers module.
Example::
class MyWorkerSettings(BaseArqWorkerSettings):
    redis_settings = RedisSettings(host="localhost", port=6379)
    functions = [my_task1, my_task2] + CORE_FUNCTIONS
    on_startup = my_startup
    on_shutdown = my_shutdown
Functions
async base_startup(ctx)
Base startup hook. Creates the health check file. Extend it in your own workers.
async base_shutdown(ctx)
Base shutdown hook. Removes the health check file. Extend it in your own workers.
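One way to extend the two hooks is to wrap them, as in the minimal sketch below. The base_startup and base_shutdown defined here are local stand-ins so the example runs on its own; in a real worker they would be imported from codex_platform.workers.arq. The HEALTH_FILE path and the ctx["cache"] resource are assumptions, since the actual health check location is not documented on this page.

```python
import asyncio
import tempfile
from pathlib import Path
from typing import Any

# Hypothetical location; the real health check path is not given here.
HEALTH_FILE = Path(tempfile.gettempdir()) / "worker_sketch.healthy"


async def base_startup(ctx: dict[str, Any]) -> None:
    """Stand-in for the platform hook: create the health check file."""
    HEALTH_FILE.touch()


async def base_shutdown(ctx: dict[str, Any]) -> None:
    """Stand-in for the platform hook: remove the health check file."""
    HEALTH_FILE.unlink(missing_ok=True)


async def my_startup(ctx: dict[str, Any]) -> None:
    # Run the base hook first, then attach worker-specific resources.
    await base_startup(ctx)
    ctx["cache"] = {}  # hypothetical per-worker resource


async def my_shutdown(ctx: dict[str, Any]) -> None:
    # Release worker-specific resources, then run the base hook last.
    ctx.pop("cache", None)
    await base_shutdown(ctx)
```

Running the base hook first on startup and last on shutdown keeps the health check file present for the whole time the worker's own resources exist.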
async requeue_to_stream(ctx, stream_name, payload)
Re-enqueue a failed message back into a Redis Stream for retry processing.
After 5 retries the message is moved to a Dead Letter Queue (DLQ) instead of
being dropped. DLQ key: {stream_name}:dlq.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ctx | dict[str, Any] | ARQ worker context dict. Must contain a | required |
| stream_name | str | Target Redis Stream name. | required |
| payload | dict[str, Any] | Original message payload. | required |
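The retry-then-DLQ rule can be illustrated with a small in-memory sketch. It is not the requeue_to_stream implementation: this page does not say how retries are counted, so the "retries" payload field is an assumption, and FakeStreams and requeue_or_dead_letter are hypothetical names. The sketch is also synchronous and takes the client directly rather than reading it from ctx.

```python
from collections import defaultdict
from typing import Any

MAX_RETRIES = 5  # retry limit stated in the description above


class FakeStreams:
    """In-memory stand-in for Redis Streams (hypothetical helper)."""

    def __init__(self) -> None:
        self.streams: dict[str, list[dict[str, Any]]] = defaultdict(list)

    def xadd(self, name: str, fields: dict[str, Any]) -> None:
        # Append a copy so later mutation of the caller's dict has no effect.
        self.streams[name].append(dict(fields))


def requeue_or_dead_letter(
    client: FakeStreams, stream_name: str, payload: dict[str, Any]
) -> str:
    """Write the message back to the stream or to its DLQ; return the target name."""
    retries = int(payload.get("retries", 0))  # assumed counter field
    if retries >= MAX_RETRIES:
        # Retry budget exhausted: park the message in the DLQ instead of dropping it.
        target = f"{stream_name}:dlq"
        client.xadd(target, payload)
    else:
        # Still within budget: requeue with the counter incremented.
        target = stream_name
        client.xadd(target, {**payload, "retries": retries + 1})
    return target
```

In this sketch a message is requeued five times, and the sixth failure sends it to the {stream_name}:dlq key with its payload intact for later inspection.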