← Streams | Home
Data Flow: Streams
Producer Flow
- App constructs event data as a Python dict.
StreamProducer.add_event(event_type, data) sanitizes the payload (all values → str, None filtered out, type field injected).
XADD {stream_name} * type {event_type} ...fields is sent to Redis.
- Redis returns the event ID (
{timestamp}-{seq}).
Consumer Flow
- At startup,
StreamConsumer.ensure_group() calls XGROUP CREATE (idempotent — skips if group exists).
StreamConsumer.read(count) calls XREADGROUP GROUP {group} {consumer} COUNT {count} STREAMS {stream} >.
- Each raw message is parsed into
StreamEvent(id, event_type, data).
StreamDispatcher passes each event to StreamRouter.dispatch(event).
StreamRouter looks up the handler by event.event_type and calls it.
- On success →
StreamConsumer.ack(event.id) → XACK.
Retry / DLQ Flow
sequenceDiagram
participant Processor
participant Handler
participant ARQ as ARQ Worker
participant Stream as Redis Stream
participant DLQ
Processor->>Handler: dispatch(event)
Handler-->>Processor: raises Exception
Processor->>ARQ: enqueue requeue_to_stream(stream, payload)
ARQ->>ARQ: increment payload._retries
alt retries < 5
ARQ->>Stream: XADD (retry)
else retries >= 5
ARQ->>DLQ: XADD {stream}:dlq (with _failed_at, _original_stream)
end
Full Sequence
sequenceDiagram
participant App
participant Producer as StreamProducer
participant Redis
participant Consumer as StreamConsumer
participant Router as StreamRouter
participant Handler
App->>Producer: add_event("new_order", {order_id: 1})
Producer->>Redis: XADD orders * type new_order order_id 1
Redis-->>Producer: "1700000000-0"
Consumer->>Redis: XREADGROUP GROUP workers consumer1 STREAMS orders >
Redis-->>Consumer: [StreamEvent(id, "new_order", {order_id: "1"})]
Consumer->>Router: dispatch(event)
Router->>Handler: handle_new_order(event)
Handler-->>Router: OK
Consumer->>Redis: XACK orders workers "1700000000-0"