redis_service.operations.pipeline
pipeline
codex_platform.redis_service.operations.pipeline
Redis Pipeline & Atomic operations.
Two modes:
- execute — pipeline without MULTI/EXEC (batching, not atomic).
- transact — pipeline with MULTI/EXEC (atomic transaction).
- atomic — async context manager that yields a Pipeline in MULTI/EXEC mode.
- eval_script — Lua script execution (always atomic in Redis).
Usage::
# Batching (faster, not atomic)
results = await ops.pipeline.execute(build_fn)
# Atomic transaction (MULTI/EXEC)
results = await ops.pipeline.transact(build_fn)
# Atomic context manager
async with ops.pipeline.atomic() as pipe:
await pipe.set("k1", "v1")
await pipe.incr("counter")
# committed on exit
# Lua script
await ops.pipeline.eval_script(
"return redis.call('SET', KEYS[1], ARGV[1])",
keys=["mykey"], args=["value"],
)
Classes
PipelineOperations
Redis pipeline and atomic transaction operations.
Provides three execution modes:
- Batching (
execute) — commands are sent in one round-trip, not atomic. - Transaction (
transact) — commands are wrapped in MULTI/EXEC, atomic. - Context manager (
atomic) — atomic block with automatic EXEC on exit.
Also exposes Lua script execution via eval_script.
Source code in src/codex_platform/redis_service/operations/pipeline.py
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 | |
Functions
execute(builder_func)
async
Execute a sequence of commands in a pipeline without MULTI/EXEC (batching).
Commands are sent in a single round-trip but are not atomic.
Use transact or atomic when atomicity is required.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
builder_func
|
Callable[[Pipeline], Awaitable[None]]
|
Async function that receives a |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of results for each queued command, in order. |
Raises:
| Type | Description |
|---|---|
RedisConnectionError
|
Redis connection failure. |
RedisServiceError
|
Redis operation failure. |
Example::
async def build(pipe):
await pipe.set("k1", "v1")
await pipe.set("k2", "v2")
results = await ops.execute(build)
Source code in src/codex_platform/redis_service/operations/pipeline.py
transact(builder_func)
async
Execute commands in an atomic transaction (MULTI/EXEC).
All commands inside builder_func are executed atomically.
If the server crashes before EXEC, none of the commands are applied.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
builder_func
|
Callable[[Pipeline], Awaitable[None]]
|
Async function that receives a |
required |
Returns:
| Type | Description |
|---|---|
list[Any]
|
List of results for each queued command, in order. |
Raises:
| Type | Description |
|---|---|
RedisConnectionError
|
Redis connection failure. |
RedisServiceError
|
Redis operation failure. |
Example::
async def transfer(pipe):
await pipe.decrby("wallet:from", 100)
await pipe.incrby("wallet:to", 100)
results = await ops.transact(transfer)
Source code in src/codex_platform/redis_service/operations/pipeline.py
atomic()
async
Async context manager for an atomic block (MULTI/EXEC).
Opens a Pipeline in transaction mode. EXEC is called automatically on exit.
Yields:
| Type | Description |
|---|---|
AsyncIterator[Pipeline]
|
|
Raises:
| Type | Description |
|---|---|
RedisConnectionError
|
Redis connection failure. |
RedisServiceError
|
Redis operation failure. |
Example::
async with service.pipeline.atomic() as pipe:
await pipe.set("session:abc", "token")
await pipe.expire("session:abc", 3600)
# EXEC is called automatically on context exit
Source code in src/codex_platform/redis_service/operations/pipeline.py
eval_script(script, keys=None, args=None)
async
Execute a Lua script (EVAL). Lua scripts are always atomic in Redis.
Use for complex read-modify-write operations that must not be interrupted.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
script
|
str
|
Lua script source code. |
required |
keys
|
list[str] | None
|
List of Redis keys accessible as |
None
|
args
|
list[Any] | None
|
Script arguments accessible as |
None
|
Returns:
| Type | Description |
|---|---|
Any
|
Value returned by the Lua script. |
Raises:
| Type | Description |
|---|---|
RedisConnectionError
|
Redis connection failure. |
RedisServiceError
|
Redis operation failure. |
Example::
result = await ops.eval_script(
"""
local val = redis.call('GET', KEYS[1])
if val == ARGV[1] then
redis.call('SET', KEYS[1], ARGV[2])
return 1
end
return 0
""",
keys=["mykey"],
args=["expected", "new_value"],
)