codex_platform.workers.arq.task_utils
Utilities for writing ARQ tasks with less boilerplate.
Usage
    from codex_platform.workers.arq import arq_task

    @arq_task(retry_backoff=60, max_retries=3)
    async def send_booking_notification(ctx, appointment_id: int):
        service = ctx["notification_service"]
        await service.send(...)
Functions
arq_task(*, retry_backoff=30, max_retries=5, log_args=False)
Decorator that adds standard error handling, logging, and retry to ARQ tasks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `retry_backoff` | `int` | Base delay in seconds. Actual delay = `job_try * retry_backoff`. | `30` |
| `max_retries` | `int` | Maximum number of retries before giving up. | `5` |
| `log_args` | `bool` | If `True`, log task arguments (be careful with PII). | `False` |
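The delay formula for `retry_backoff` is linear in the attempt number rather than exponential. A minimal sketch of the arithmetic follows; `retry_delay` is a hypothetical helper written for illustration and is not part of `task_utils`.

```python
def retry_delay(job_try: int, retry_backoff: int = 30) -> int:
    """Seconds to defer the next attempt: job_try * retry_backoff."""
    return job_try * retry_backoff


# With the default retry_backoff=30, the delay grows by 30 s per attempt.
delays = [retry_delay(n) for n in (1, 2, 3)]
print(delays)  # [30, 60, 90]
```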
The wrapped function receives `ctx` as its first argument (standard ARQ convention).

On exception:

- If `job_try < max_retries` → raises `Retry(defer=job_try * retry_backoff)`
- If `job_try >= max_retries` → logs the error and returns `None` (the task is dropped)
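The retry rules can be illustrated with a small stand-alone sketch. This is not the actual `arq_task` source: `arq_task_sketch` and `always_fails` are hypothetical names, the local `Retry` class is a stand-in for ARQ's `Retry` exception so the example runs without ARQ installed, and reading a 1-based `job_try` from `ctx` is an assumption about how the decorator obtains the attempt number.

```python
import asyncio
import functools
import logging

logger = logging.getLogger(__name__)


class Retry(RuntimeError):
    """Stand-in for arq's Retry exception (assumption: swap in arq.Retry in real code)."""

    def __init__(self, defer: int):
        super().__init__(f"retry in {defer}s")
        self.defer = defer


def arq_task_sketch(*, retry_backoff: int = 30, max_retries: int = 5, log_args: bool = False):
    """Hypothetical re-implementation of the documented behaviour."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(ctx, *args, **kwargs):
            # Assumption: the attempt number is exposed as ctx["job_try"], starting at 1.
            job_try = ctx.get("job_try", 1)
            if log_args:
                logger.info("task %s args=%r kwargs=%r", func.__name__, args, kwargs)
            try:
                return await func(ctx, *args, **kwargs)
            except Exception:
                if job_try < max_retries:
                    # Linear backoff: defer grows with the attempt number.
                    raise Retry(defer=job_try * retry_backoff)
                # Out of attempts: log the traceback and drop the task.
                logger.exception("task %s dropped after %d tries", func.__name__, job_try)
                return None

        return wrapper

    return decorator


@arq_task_sketch(retry_backoff=60, max_retries=3)
async def always_fails(ctx):
    raise ValueError("boom")


async def demo():
    """Simulate three attempts and record the deferral (or final result) of each."""
    outcomes = []
    for job_try in (1, 2, 3):
        try:
            outcomes.append(await always_fails({"job_try": job_try}))
        except Retry as exc:
            outcomes.append(exc.defer)
    return outcomes


print(asyncio.run(demo()))  # [60, 120, None]
```

With `max_retries=3`, the first two attempts are deferred by 60 and 120 seconds, and the third failure is logged and swallowed, so the caller sees `None` rather than an exception.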