← Streams | Главная
Data Flow: Streams
Поток производителя
- Приложение формирует данные события в виде Python-словаря.
StreamProducer.publish(event_type, data) JSON-кодирует структурные поля, сохраняет None и добавляет поле type.
- Отправляется
XADD {stream_name} * type {event_type} ...поля в Redis.
- Redis возвращает ID события (
{timestamp}-{seq}).
Поток потребителя
- При старте
StreamConsumer.ensure_group() вызывает XGROUP CREATE (идемпотентно — пропускает, если группа существует).
StreamConsumer.read(count) вызывает XREADGROUP GROUP {group} {consumer} COUNT {count} STREAMS {stream} >.
- Каждое сырое сообщение парсится в
StreamEvent(id, event_type, data).
StreamProcessor восстанавливает payload для dispatcher с полем type.
StreamDispatcher находит подходящие handler specs и вызывает включенные handlers.
- При успехе →
StreamConsumer.ack(event.id) → XACK.
Поток повторов / DLQ
sequenceDiagram
participant Processor
participant Handler as Обработчик
participant ARQ as ARQ Worker
participant Stream as Redis Stream
participant DLQ
Processor->>Handler: dispatch(event)
Handler-->>Processor: бросает исключение
Processor->>ARQ: enqueue requeue_to_stream(stream, payload)
ARQ->>ARQ: увеличивает payload._retries
alt retries < 5
ARQ->>Stream: XADD (повтор)
else retries >= 5
ARQ->>DLQ: XADD {stream}:dlq (с _failed_at, _original_stream)
end
Полная последовательность
sequenceDiagram
participant App as Приложение
participant Producer as StreamProducer
participant Redis
participant Consumer as StreamConsumer
participant Router as StreamRouter
participant Handler as Обработчик
App->>Producer: publish("new_order", {order_id: 1})
Producer->>Redis: XADD orders * type new_order order_id json:1
Redis-->>Producer: "1700000000-0"
Consumer->>Redis: XREADGROUP GROUP workers consumer1 STREAMS orders >
Redis-->>Consumer: [StreamEvent(id, "new_order", {order_id: 1})]
Consumer->>Router: dispatch({"type": "new_order", "order_id": 1})
Router->>Handler: handle_new_order(payload)
Handler-->>Router: OK
Consumer->>Redis: XACK orders workers "1700000000-0"