Data Flow: Workers (ARQ)
Жизненный цикл задачи
-
Постановка в очередь — код приложения (или другой воркер) вызывает
BaseArqService.enqueue_job(function_name, *args). Задача записывается в Redis. -
Извлечение — ARQ-воркер опрашивает Redis и извлекает задачу для зарегистрированной функции.
-
Выполнение — функция-задача запускается с ARQ
ctx-словарём, инжектированным первым аргументом.ctxсодержит Redis pool и все объекты, зарегистрированные вon_startup. -
Успех — ARQ помечает результат задачи как выполненный и опционально хранит его
keep_resultсекунд. -
Ошибка — ARQ повторяет задачу до
max_retriesраз с паузойretry_delayсекунд между попытками. После исчерпанияmax_retriesзадача помечается как упавшая.
Диаграмма последовательности
sequenceDiagram
participant App as Приложение
participant Service as BaseArqService
participant Redis
participant Worker as ARQ Worker
participant Task as Задача
App->>Service: enqueue_job("send_email", payload)
Service->>Redis: RPUSH arq:queue job_data
Redis-->>Service: job_id
Worker->>Redis: BLPOP arq:queue
Redis-->>Worker: job_data
Worker->>Task: send_email(ctx, payload)
Task-->>Worker: result
alt успех
Worker->>Redis: SET arq:result:{job_id} result
else ошибка (остались попытки)
Worker->>Redis: RPUSH arq:queue job_data (retry_count++)
else попытки исчерпаны
Worker->>Redis: SET arq:result:{job_id} JobStatus.failed
end
Startup / Shutdown
sequenceDiagram
participant Docker
participant Worker as ARQ Worker
participant FS as /tmp filesystem
Docker->>Worker: запуск процесса
Worker->>Worker: on_startup(ctx)
Worker->>FS: touch /tmp/arq_worker_healthy
Note over Docker,FS: HEALTHCHECK: test -f /tmp/arq_worker_healthy
Docker->>Worker: SIGTERM
Worker->>Worker: on_shutdown(ctx)
Worker->>FS: unlink /tmp/arq_worker_healthy
Интеграция с повторами Streams
Когда обработчик Streams падает, StreamProcessor ставит в очередь requeue_to_stream как ARQ-задачу. Эта функция увеличивает _retries в payload и повторно вставляет событие в исходный стрим (или в DLQ после 5 попыток).
Полная последовательность повторов — в Streams Data Flow.