ADR-011: Pipeline parallel levels use FIRST_COMPLETED, not gather
Status: Accepted. Pairs with: Invariant 16.
Context
PipelineOrchestrator runs stages of a pipeline in topologically
ordered "levels": within one level, every stage's inputs are
already available, so all stages in that level can run
concurrently. Across levels, execution is sequential — level N+1
waits for level N.
The implementation question is how to await the concurrent stages
inside one level. Two asyncio primitives are candidates:
- asyncio.gather(*tasks): runs all tasks concurrently; returns the full list of results once every task has completed. With the default return_exceptions=False, the first exception raised propagates to the caller instead.
- asyncio.wait(tasks, return_when=FIRST_COMPLETED): runs all tasks concurrently; returns (done, pending) as soon as any single task completes. The caller calls it again on pending until pending is empty.
Functionally, both produce the same wall-clock latency for the level — both end when the slowest stage in the level finishes. The difference is in observability.
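The difference can be seen in a minimal sketch. The stage helper, the delays, and the function names are illustrative assumptions, not Heddle code. Both variants finish when the slowest stage does, but only the wait loop gets a chance to observe each result as it arrives.

```python
import asyncio


async def stage(name: str, delay: float) -> str:
    """Stand-in for a pipeline stage (hypothetical): sleep, then return its name."""
    await asyncio.sleep(delay)
    return name


async def run_with_gather(specs: dict[str, float]) -> list[str]:
    """No result is visible to the caller until the slowest stage finishes."""
    results = await asyncio.gather(*(stage(n, d) for n, d in specs.items()))
    return list(results)  # ordered by argument position, not by completion


async def run_with_wait(specs: dict[str, float]) -> list[str]:
    """Each result is visible to the caller as soon as its stage finishes."""
    pending = {asyncio.create_task(stage(n, d)) for n, d in specs.items()}
    seen: list[str] = []
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            seen.append(task.result())  # observable here, mid-level
    return seen


# Illustrative delays only; LA is the slowest stage.
specs = {"LA": 0.15, "PA": 0.05, "RT": 0.10}
```

With these delays, run_with_gather returns the names in declaration order after the slowest stage, while run_with_wait collects them in completion order.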
The baft audit pipeline has a parallel level with three auditor
stages (LA / PA / RT). Under gather, all three appeared to
complete simultaneously at the moment the slowest finished — the
Workshop UI and MCP pipeline.run consumers saw three
stage.completed events in a tight burst. The Workshop dashboard
became useless during long-running audits: a user could not tell
whether the level had any progress between minute 0 and minute 30.
The decision was whether to optimise for code simplicity (gather) or for incremental observability (FIRST_COMPLETED).
Decision
PipelineOrchestrator._run_parallel_level (in
src/heddle/orchestrator/pipeline.py:452-518) uses
asyncio.wait(..., return_when=FIRST_COMPLETED) in a loop. Each
stage's result is logged via stage.completed and stored in the
pipeline context as soon as it finishes, rather than waiting for
the entire level. The Workshop and MCP bridge see real-time
progress for every parallel level.
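A rough sketch of the loop shape described above, not the actual _run_parallel_level. The function name, the context dict, and the emit callback are hypothetical stand-ins, and error handling and cancellation cleanup are deliberately left out to keep the shape visible.

```python
import asyncio
from typing import Any, Callable, Coroutine

# Hypothetical type: a zero-argument callable that returns a stage coroutine.
StageFn = Callable[[], Coroutine[Any, Any, Any]]


async def run_parallel_level(
    stages: dict[str, StageFn],
    context: dict[str, Any],
    emit: Callable[[str, str], None],
) -> None:
    """Run one level, publishing each stage result as soon as it is ready."""
    # Map each task back to its stage name so results are stored by name.
    names = {asyncio.create_task(fn()): name for name, fn in stages.items()}
    pending = set(names)
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            name = names[task]
            context[name] = task.result()  # would re-raise a stage failure
            emit("stage.completed", name)  # one event per stage, as it finishes
```

Because results are keyed by stage name, the order in which stages finish does not affect how later code looks them up.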
The general principle: use FIRST_COMPLETED for any path that
streams results as they arrive. Use gather only where all
results are required before the next code can run, and incremental
visibility doesn't matter.
Inside Heddle, "all results required before next code" is rare because the result stream and the Workshop are both incremental observers. The synthesizer is the one consumer that needs every result (it builds a structured prompt from the bucketed succeeded/failed/in_flight partitions per ADR-006), but it is fed from the result-collection layer rather than awaiting a task list directly, so it doesn't appear in this trade-off.
asyncio.gather does appear in the codebase. pipeline.py:515 calls
it on the pending set during cleanup-on-cancel, passing
return_exceptions=True and explicitly ignoring the result of each
pending task. That use is fine: it's a drain-and-discard, not
a result-streaming await.
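A drain-and-discard of this kind might look like the following sketch. The helper name cancel_and_drain is an assumption for illustration; the actual cleanup code at pipeline.py:515 is not reproduced here.

```python
import asyncio


async def cancel_and_drain(pending: set[asyncio.Task]) -> None:
    """Cancel unfinished tasks and wait for them to unwind, ignoring outcomes."""
    for task in pending:
        task.cancel()
    if pending:
        # return_exceptions=True turns each CancelledError (or late failure)
        # into a list element instead of raising; the list is discarded.
        await asyncio.gather(*pending, return_exceptions=True)
```

Here gather is the right tool because nothing is streamed: the caller only needs to know that every pending task has finished unwinding.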
Alternatives considered
asyncio.gather everywhere (rejected)
The simpler shape: results = await asyncio.gather(*stages). One
line, no loop, no done/pending set bookkeeping.
- Rejected because the Workshop and MCP bridge see no progress during the parallel level. For a 30-minute audit level with three auditors, the operator sees no stage.completed events for 30 minutes, and then three at the end. The framework's incremental observability story collapses inside parallel levels.
- The simplicity argument is genuine, but the cost is paid every time someone watches a slow pipeline. Latency is the same; observability is strictly worse.
asyncio.as_completed(tasks) (rejected)
Iterate over asyncio.as_completed, which yields tasks in
completion order. Roughly the same shape as FIRST_COMPLETED but
with a different iteration idiom.
- Rejected because as_completed doesn't expose the pending set directly, so cancellation semantics on early exit (e.g. pipeline timeout firing mid-level) are harder to get right. The FIRST_COMPLETED loop holds the pending set explicitly, which keeps the cancel-and-drain cleanup a visible one-liner.
- Stylistic; functionally close. FIRST_COMPLETED wins on cancellation clarity, not on streaming behaviour.
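To illustrate the cancellation point, here is a sketch of an early exit under as_completed. The stage helper and function name are hypothetical, and the sketch assumes at least one stage. Since the iterator does not return a pending set, the caller has to rebuild it from its own task list.

```python
import asyncio


async def stage(name: str, delay: float) -> str:
    """Stand-in for a pipeline stage (hypothetical)."""
    await asyncio.sleep(delay)
    return name


async def first_result_as_completed(specs: dict[str, float]) -> tuple[str, int]:
    """Stop after the first result, then clean up the stages still running."""
    tasks = [asyncio.create_task(stage(n, d)) for n, d in specs.items()]
    for next_done in asyncio.as_completed(tasks):
        first = await next_done
        break  # early exit, e.g. standing in for a timeout firing mid-level
    # as_completed hands back no pending set, so filter the task list ourselves.
    leftovers = [t for t in tasks if not t.done()]
    for t in leftovers:
        t.cancel()
    if leftovers:
        await asyncio.gather(*leftovers, return_exceptions=True)
    return first, len(leftovers)
```

The streaming behaviour is the same as the FIRST_COMPLETED loop; the extra step is the manual reconstruction of what is still pending.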
Manual asyncio.Queue + create_task (rejected)
Each stage posts its result to a shared asyncio.Queue; the
parent loop drains the queue.
- Rejected because it adds a coordination primitive that asyncio.wait already provides. The queue's only advantage would be unifying with a hypothetical streaming consumer of pipeline results, but the result-stream layer already exists and lives outside _run_parallel_level.
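For comparison, the queue-based shape might look roughly like this. All names are illustrative assumptions, and the sketch covers only the success path: if a stage raised before posting, the parent would block on queue.get() unless extra handling were added, which is part of the coordination overhead.

```python
import asyncio


async def stage(name: str, delay: float, queue: asyncio.Queue) -> None:
    """Hypothetical stage that posts its own result to a shared queue."""
    await asyncio.sleep(delay)
    await queue.put((name, f"{name}-result"))


async def run_level_with_queue(specs: dict[str, float]) -> list[str]:
    """Drain one queue item per stage, recording names in arrival order."""
    queue: asyncio.Queue = asyncio.Queue()
    tasks = [asyncio.create_task(stage(n, d, queue)) for n, d in specs.items()]
    order: list[str] = []
    for _ in tasks:
        name, _result = await queue.get()
        order.append(name)
    await asyncio.gather(*tasks)  # all stages have posted; collect the tasks
    return order
```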
Consequences
Enables:
- Workshop dashboard shows per-stage progress in real time during parallel levels. The audit-pipeline UX justification is directly load-bearing.
- MCP pipeline.run clients see stage.completed events interleaved across the level, which lets them surface per-stage progress to their own consumers.
- Pipeline context is updated incrementally: a stage that completes early in the level can be observed by code inspecting the context (e.g. checkpoint manager) before the level ends.
Costs:
- The loop shape is genuinely more code than gather. The done/pending bookkeeping and the cancellation-on-error branch take ~30 lines that gather would compress into one.
- A future contributor who wants "simpler code" will naturally reach for gather. The invariant text plus this ADR exist so the rejection is durable; the rationale is observability, not correctness.
- Stages that complete out of order may write into the context in a different sequence than the pipeline's static declaration. Downstream code that depends on declaration order would break, but the existing dependency-inference (Invariant 6) already requires downstream stages to declare their inputs by name, so the declaration-order assumption is already disallowed.