Orchestrator

Runner

runner

Orchestrator actor lifecycle -- the "thinking" layer above workers.

The orchestrator is a longer-lived LLM actor that:

- Receives high-level goals (OrchestratorGoal messages)
- Decomposes them into subtasks for workers (via decomposer.py)
- Dispatches subtasks through the router and collects results
- Synthesizes worker outputs into a coherent final answer (via synthesizer.py)
- Performs periodic self-summarization checkpoints (via checkpoint.py)

This differs from PipelineOrchestrator in that it uses an LLM to dynamically decide which workers to invoke, rather than following a fixed stage sequence.

Message flow::

heddle.goals.incoming --> OrchestratorActor.handle_message()
    --> GoalDecomposer breaks goal into subtasks
    --> Subscribes to heddle.results.{goal_id} for worker responses (before dispatch, so fast replies are not lost)
    --> Publishes TaskMessages to heddle.tasks.incoming (one per subtask)
    --> ResultSynthesizer combines results into a coherent answer
    --> Publishes final TaskResult to heddle.results.{goal_id}
Concurrency model

The max_concurrent_goals config setting (default 1) controls how many goals a single OrchestratorActor instance can process simultaneously. With the default of 1, goals are queued and processed one at a time (strict ordering). Higher values enable concurrent goal processing within a single instance. For horizontal scaling, run multiple OrchestratorActor instances with a NATS queue group.
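
The effect of the limit can be illustrated with a small sketch built on asyncio.Semaphore. This is only an illustration under assumptions: how BaseActor actually enforces max_concurrent is not shown on this page, and run_goals and process_goal are hypothetical names.

```python
import asyncio


async def run_goals(goal_ids: list[str], max_concurrent_goals: int = 1) -> tuple[list[str], int]:
    """Process goals with at most ``max_concurrent_goals`` in flight.

    Returns the completion order and the peak number of goals that were
    being processed at the same time.
    """
    gate = asyncio.Semaphore(max_concurrent_goals)
    finished: list[str] = []
    in_flight = 0
    peak = 0

    async def process_goal(goal_id: str) -> None:
        nonlocal in_flight, peak
        async with gate:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for decompose/dispatch/collect
            in_flight -= 1
            finished.append(goal_id)

    await asyncio.gather(*(process_goal(g) for g in goal_ids))
    return finished, peak
```

With a limit of 1 the peak never exceeds one goal at a time; raising the limit lets several goals overlap inside the same process.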

All mutable state (conversation_history, checkpoint_counter) is per-goal inside GoalState, so concurrent goals are fully isolated — no shared mutable data, no locks required.

Within a single goal, subtasks are dispatched concurrently (all published to heddle.tasks.incoming at once) and results are collected as they arrive.

State tracking

The orchestrator is the ONLY stateful component in Heddle. It maintains:

- _active_goals: maps goal_id -> GoalState for in-flight goals

Each GoalState carries its own conversation_history and checkpoint_counter so that concurrent goals never interfere.

Workers and the router are stateless by design.

See Also

heddle.orchestrator.pipeline -- PipelineOrchestrator (fixed stage sequence)
heddle.orchestrator.decomposer -- GoalDecomposer (LLM-based task breakdown)
heddle.orchestrator.synthesizer -- ResultSynthesizer (result combination)
heddle.orchestrator.checkpoint -- CheckpointManager (context compression)
heddle.core.messages -- all message schemas

GoalState dataclass

GoalState(goal: OrchestratorGoal, dispatched_tasks: dict[str, TaskMessage] = dict(), collected_results: dict[str, TaskResult] = dict(), start_time: float = time.monotonic(), conversation_history: list[dict[str, Any]] = list(), checkpoint_counter: int = 0)

Tracks the lifecycle of a single goal through decomposition and collection.

One GoalState exists per in-flight goal. It is created when a goal arrives, populated during decomposition, updated as results trickle in, and discarded after synthesis completes.

Conversation history and checkpoint state are per-goal so that concurrent goals (max_concurrent_goals > 1) maintain fully isolated state — no shared mutable data, no locks required.

Attributes:

goal : OrchestratorGoal
    The original OrchestratorGoal message.

dispatched_tasks : dict[str, TaskMessage]
    Maps task_id -> TaskMessage for every subtask that was published to heddle.tasks.incoming.

collected_results : dict[str, TaskResult]
    Maps task_id -> TaskResult for every result received on heddle.results.{goal_id}.

start_time : float
    Monotonic timestamp when processing began.

conversation_history : list[dict[str, Any]]
    Accumulated context entries for checkpoint decisions. Each entry is a compact summary of a completed goal.

checkpoint_counter : int
    Monotonically increasing checkpoint version number for this goal's checkpoint chain.

all_collected property

all_collected: bool

True when every dispatched task has a corresponding result.

pending_count property

pending_count: int

Number of dispatched tasks still awaiting results.
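
A minimal sketch of a dataclass with this shape, assuming the mutable defaults are created with default_factory and that the two properties compare the keys of the two dicts. GoalStateSketch is a hypothetical stand-in, Any replaces the real message types, and the behavior for a goal with no dispatched tasks (all_collected is True) is an assumption of the sketch.

```python
import time
from dataclasses import dataclass, field
from typing import Any


@dataclass
class GoalStateSketch:
    goal: Any  # stand-in for OrchestratorGoal
    dispatched_tasks: dict[str, Any] = field(default_factory=dict)
    collected_results: dict[str, Any] = field(default_factory=dict)
    start_time: float = field(default_factory=time.monotonic)
    conversation_history: list[dict[str, Any]] = field(default_factory=list)
    checkpoint_counter: int = 0

    @property
    def all_collected(self) -> bool:
        # Every dispatched task_id must also appear among the results.
        return set(self.dispatched_tasks) <= set(self.collected_results)

    @property
    def pending_count(self) -> int:
        # Dispatched task_ids that have no result yet.
        return len(set(self.dispatched_tasks) - set(self.collected_results))


state = GoalStateSketch(goal="demo-goal")
state.dispatched_tasks.update({"t1": "task-1", "t2": "task-2"})
state.collected_results["t1"] = "result-1"
```

Because every instance gets its own dicts and list, two goals processed at the same time never share history or results.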

OrchestratorActor

OrchestratorActor(actor_id: str, config_path: str, backend: LLMBackend, nats_url: str = 'nats://nats:4222', checkpoint_store: CheckpointStore | None = None, *, bus: Any | None = None)

Bases: BaseActor

Dynamic orchestrator actor -- LLM-driven goal decomposition and synthesis.

Unlike PipelineOrchestrator, which follows a fixed stage sequence, this actor uses an LLM to decide dynamically which workers to invoke and how to combine their results.

Lifecycle per goal:

  1. Receive -- parse the incoming dict as an OrchestratorGoal.
  2. Decompose -- call GoalDecomposer to break the goal into a list of TaskMessage subtasks.
  3. Dispatch -- with the subscription to heddle.results.{goal_id} already open, publish each subtask to heddle.tasks.incoming so the router can forward it to the appropriate worker.
  4. Collect -- gather TaskResult messages from heddle.results.{goal_id} until all subtasks have responded or the timeout expires.
  5. Synthesize -- call ResultSynthesizer to combine all collected results into a coherent final answer.
  6. Publish -- send the synthesized TaskResult to heddle.results.{goal_id} for the original caller.
  7. Checkpoint (optional) -- if the accumulated conversation history exceeds the token threshold, compress it via CheckpointManager.
Parameters

actor_id : str
    Unique identifier for this actor instance.
config_path : str
    Path to the orchestrator YAML config file (e.g. configs/orchestrators/default.yaml).
backend : LLMBackend
    LLM backend used for both decomposition and synthesis. Typically the same backend instance, but could be different tiers.
nats_url : str
    NATS server URL.
checkpoint_store : CheckpointStore | None
    Checkpoint persistence backend. Pass None to disable checkpointing.

Example::

from heddle.worker.backends import OllamaBackend
from heddle.contrib.redis.store import RedisCheckpointStore

backend = OllamaBackend(model="command-r7b:latest")
store = RedisCheckpointStore("redis://localhost:6379")
actor = OrchestratorActor(
    actor_id="orchestrator-1",
    config_path="configs/orchestrators/default.yaml",
    backend=backend,
    nats_url="nats://localhost:4222",
    checkpoint_store=store,
)
await actor.run("heddle.goals.incoming")
Source code in src/heddle/orchestrator/runner.py
def __init__(
    self,
    actor_id: str,
    config_path: str,
    backend: LLMBackend,
    nats_url: str = "nats://nats:4222",
    checkpoint_store: CheckpointStore | None = None,
    *,
    bus: Any | None = None,
) -> None:
    # Load config first so we can read max_concurrent_goals before
    # passing it to BaseActor.
    self._config_path = config_path
    self.config = self._load_config(config_path)
    max_goals = self.config.get("max_concurrent_goals", 1)
    super().__init__(actor_id, nats_url, max_concurrent=max_goals, bus=bus)
    self.backend = backend

    # Build the decomposer from config-defined available workers.
    # Each entry needs at least "name" and "description".
    available_workers = self.config.get("available_workers", [])
    if not available_workers:
        # Fallback: infer from the system_prompt if no explicit list.
        # The default.yaml lists workers in the system prompt text; callers
        # should provide an explicit list for production use.
        logger.warning(
            "orchestrator.no_available_workers",
            hint="Add 'available_workers' list to orchestrator config",
        )
    self.decomposer = GoalDecomposer.from_worker_configs(
        backend=backend,
        configs=available_workers,
    )

    # Synthesizer uses the same backend for LLM-based synthesis.
    self.synthesizer = ResultSynthesizer(backend=backend)

    # Checkpoint manager -- only initialized if a checkpoint store is provided.
    checkpoint_config = self.config.get("checkpoint", {})
    self._checkpoint_manager: CheckpointManager | None = None
    if checkpoint_store is not None:
        self._checkpoint_manager = CheckpointManager(
            store=checkpoint_store,
            token_threshold=checkpoint_config.get("token_threshold", 50_000),
            recent_window_size=checkpoint_config.get("recent_window", 5),
        )

    # Configurable timeouts and concurrency limits from YAML.
    self._task_timeout: float = float(self.config.get("timeout_seconds", 300))
    self._max_concurrent_tasks: int = self.config.get("max_concurrent_tasks", 5)

    # ---------- Mutable state ----------
    # Active goals being processed.  Keyed by goal_id.
    # With max_concurrent_goals > 1, multiple goals can be in-flight
    # simultaneously.  Each goal's mutable state (conversation_history,
    # checkpoint_counter) is isolated inside its GoalState — no shared
    # mutable data between goals, no locks required.
    self._active_goals: dict[str, GoalState] = {}

on_reload async

on_reload() -> None

Re-read the orchestrator config from disk on reload signal.

Updates config-derived settings (timeouts, concurrency limits). Does not rebuild the decomposer or synthesizer — those are constructed from the backend, which doesn't change at runtime.
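
Judging only from the source listings on this page, a reload re-derives timeout_seconds and max_concurrent_tasks, while max_concurrent_goals and the checkpoint thresholds are read in __init__. The sketch below separates the two groups using the same keys and defaults as the source; the function names are hypothetical.

```python
from typing import Any


def reloadable_settings(config: dict[str, Any]) -> dict[str, Any]:
    """Settings that on_reload re-derives from the freshly loaded config."""
    return {
        "task_timeout": float(config.get("timeout_seconds", 300)),
        "max_concurrent_tasks": config.get("max_concurrent_tasks", 5),
    }


def init_only_settings(config: dict[str, Any]) -> dict[str, Any]:
    """Settings that, in the source shown here, are only read in __init__."""
    checkpoint = config.get("checkpoint", {})
    return {
        "max_concurrent_goals": config.get("max_concurrent_goals", 1),
        "token_threshold": checkpoint.get("token_threshold", 50_000),
        "recent_window_size": checkpoint.get("recent_window", 5),
    }
```

If that reading is right, changing a value in the second group would need an actor restart rather than a reload signal.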

Source code in src/heddle/orchestrator/runner.py
async def on_reload(self) -> None:
    """Re-read the orchestrator config from disk on reload signal.

    Updates config-derived settings (timeouts, concurrency limits).
    Does not rebuild the decomposer or synthesizer — those are
    constructed from the backend, which doesn't change at runtime.
    """
    self.config = self._load_config(self._config_path)
    self._task_timeout = float(self.config.get("timeout_seconds", 300))
    self._max_concurrent_tasks = self.config.get("max_concurrent_tasks", 5)
    logger.info("orchestrator.config_reloaded", config_path=self._config_path)

handle_message async

handle_message(data: dict[str, Any]) -> None

Handle an incoming OrchestratorGoal.

This is the main entry point called by BaseActor._process_one for every message received on heddle.goals.incoming.

The method orchestrates the full goal lifecycle: parse, decompose, dispatch, collect, synthesize, publish. Errors at any stage result in a FAILED TaskResult published to the goal's result subject.

Parameters

data : dict[str, Any]
    Raw message dict, expected to conform to the OrchestratorGoal schema.
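
The comments in the source listing explain that the result subscription is opened before any subtask is dispatched, because a result published while nobody is subscribed is lost. The toy bus below illustrates that race. InMemoryBus and collect are hypothetical stand-ins written for this sketch; they are not the Heddle bus or ResultStream.

```python
import asyncio
from collections import defaultdict


class InMemoryBus:
    """Toy at-most-once bus: a message published with no subscriber is dropped."""

    def __init__(self) -> None:
        self._queues: dict[str, list[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, subject: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._queues[subject].append(queue)
        return queue

    def publish(self, subject: str, message: str) -> None:
        for queue in self._queues[subject]:
            queue.put_nowait(message)


async def collect(bus: InMemoryBus, subject: str, subscribe_first: bool, timeout: float = 0.05) -> list[str]:
    inbox = bus.subscribe(subject) if subscribe_first else None
    bus.publish(subject, "result-from-fast-worker")  # worker replies immediately
    if inbox is None:
        inbox = bus.subscribe(subject)  # too late: the result is already gone
    try:
        return [await asyncio.wait_for(inbox.get(), timeout)]
    except asyncio.TimeoutError:
        return []  # nothing arrived; a real goal would run into its timeout
```

Subscribing first returns the worker's result; subscribing after the publish returns nothing and only ends when the timeout fires.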

Source code in src/heddle/orchestrator/runner.py
async def handle_message(self, data: dict[str, Any]) -> None:
    """Handle an incoming OrchestratorGoal.

    This is the main entry point called by :meth:`BaseActor._process_one`
    for every message received on ``heddle.goals.incoming``.

    The method orchestrates the full goal lifecycle: parse, decompose,
    dispatch, collect, synthesize, publish.  Errors at any stage result
    in a ``FAILED`` TaskResult published to the goal's result subject.

    Parameters
    ----------
    data : dict[str, Any]
        Raw message dict, expected to conform to
        :class:`OrchestratorGoal` schema.
    """
    # -- 1. Parse --
    try:
        goal = OrchestratorGoal(**data)
    except Exception as e:
        logger.error(
            "orchestrator.parse_error",
            error=str(e),
            data_keys=list(data.keys()),
        )
        return

    log = logger.bind(goal_id=goal.goal_id)
    log.info("orchestrator.goal_received", instruction=goal.instruction[:120])
    record_orchestrator_goal_received(self.actor_id)

    goal_state = GoalState(goal=goal)
    self._active_goals[goal.goal_id] = goal_state

    try:
        # -- 2. Decompose --
        with _tracer.start_as_current_span(
            "orchestrator.decompose",
            attributes={"orchestrator.goal_id": goal.goal_id},
        ) as decompose_span:
            subtasks = await self._decompose_goal(goal, log)
            decompose_span.set_attribute(
                "orchestrator.subtask_count",
                len(subtasks) if subtasks else 0,
            )

        if not subtasks:
            log.warning("orchestrator.no_subtasks")
            await self._publish_final_result(
                goal,
                TaskStatus.FAILED,
                error="Decomposition produced no subtasks for this goal.",
            )
            return

        # Enforce max concurrent tasks limit.
        #
        # Earlier shape silently truncated the list and dispatched
        # the first N subtasks, then published ``COMPLETED`` —
        # producing a partial result the caller had no way to
        # detect.  We now fail the goal with an explicit error so
        # the caller learns the goal was too large and can either
        # raise the limit or split the goal at submission time.
        #
        # Future enhancement: chunk the dispatch into N-at-a-time
        # batches.  Out of scope for the current fix; the failure
        # is the safe default until that is implemented.
        if len(subtasks) > self._max_concurrent_tasks:
            log.error(
                "orchestrator.subtask_limit_exceeded",
                requested=len(subtasks),
                limit=self._max_concurrent_tasks,
            )
            await self._publish_final_result(
                goal,
                TaskStatus.FAILED,
                error=(
                    f"Decomposition produced {len(subtasks)} subtasks but "
                    f"max_concurrent_tasks={self._max_concurrent_tasks}.  "
                    "Raise the limit or split the goal."
                ),
            )
            return

        # -- 3. Subscribe → dispatch → collect.
        # Subscription is opened BEFORE dispatch.  NATS is at-most-once,
        # so a fast worker can publish its result onto
        # ``heddle.results.{goal_id}`` between dispatch and the moment
        # we subscribe — losing that result and forcing the goal to
        # time out.  The ``async with`` block makes the ordering
        # explicit and is enforced by ResultStream (iterating without
        # entering the context raises).
        expected_ids = {t.task_id for t in subtasks}
        stream = ResultStream(
            bus=self._bus,
            subject=f"heddle.results.{goal.goal_id}",
            expected_task_ids=expected_ids,
            timeout=self._task_timeout,
        )
        async with stream:
            with _tracer.start_as_current_span(
                "orchestrator.dispatch",
                attributes={
                    "orchestrator.goal_id": goal.goal_id,
                    "orchestrator.subtask_count": len(subtasks),
                },
            ):
                await self._dispatch_subtasks(goal_state, subtasks, log)

            with _tracer.start_as_current_span(
                "orchestrator.collect",
                attributes={
                    "orchestrator.goal_id": goal.goal_id,
                    "orchestrator.expected_count": len(goal_state.dispatched_tasks),
                    "orchestrator.timeout_seconds": self._task_timeout,
                },
            ) as collect_span:
                results = await self._collect_results(stream, goal_state, log)
                collect_span.set_attribute("orchestrator.collected_count", len(results))
                collect_span.set_attribute(
                    "orchestrator.success_count",
                    sum(1 for r in results if r.status == TaskStatus.COMPLETED),
                )

        # -- 5. Synthesize --
        with _tracer.start_as_current_span(
            "orchestrator.synthesize",
            attributes={
                "orchestrator.goal_id": goal.goal_id,
                "orchestrator.result_count": len(results),
            },
        ) as synth_span:
            synthesis = await self._synthesize_results(goal, results, log)
            synth_span.set_attribute(
                "orchestrator.confidence",
                synthesis.get("confidence", "unknown"),
            )

        # Annotate synthesis with goal-level timeout context.  The
        # synthesizer's per-task ``failed`` list already carries the
        # synthetic FAILED entries for unresponded tasks; this block
        # gives operators the goal-level numbers (expected, collected,
        # the configured timeout, and the IDs that didn't respond)
        # without forcing them to reconstruct it from the failed list.
        expected_count = len(goal_state.dispatched_tasks)
        collected_count = len(goal_state.collected_results)
        if expected_count > collected_count:
            pending_ids = sorted(
                set(goal_state.dispatched_tasks) - set(goal_state.collected_results)
            )
            synthesis.setdefault("metadata", {})["timeout"] = {
                "expected_count": expected_count,
                "collected_count": collected_count,
                "timeout_seconds": self._task_timeout,
                "pending_task_ids": pending_ids,
            }

        # -- 6. Publish final result --
        elapsed = int((time.monotonic() - goal_state.start_time) * 1000)
        await self._publish_final_result(
            goal,
            TaskStatus.COMPLETED,
            output=synthesis,
            elapsed=elapsed,
        )
        log.info("orchestrator.goal_completed", ms=elapsed)

        # -- 7. Record in conversation history and check for checkpoint --
        await self._record_in_history(goal_state, results, synthesis)
        await self._maybe_checkpoint(goal_state, log)

    except Exception as e:
        log.error("orchestrator.goal_failed", error=str(e), exc_info=True)
        elapsed = int((time.monotonic() - goal_state.start_time) * 1000)
        await self._publish_final_result(
            goal,
            TaskStatus.FAILED,
            error=f"Orchestrator error: {e}",
            elapsed=elapsed,
        )
    finally:
        # Clean up goal state regardless of outcome.
        self._active_goals.pop(goal.goal_id, None)

Pipeline

pipeline

Pipeline orchestrator for multi-stage processing with automatic parallelism.

Executes a defined sequence of stages, passing results from each stage as input to later stages. Each stage maps to a worker_type. Stages can be LLM workers, processor workers, or any other actor — the pipeline doesn't care about the implementation, only the message contract.

Stage dependencies are automatically inferred from input_mapping paths: if stage B references "A.output.field", then B depends on A. Stages with no inter-stage dependencies (only goal.* paths) are independent and execute in parallel. Alternatively, explicit depends_on lists in the YAML config override automatic inference.

Execution proceeds in levels: each level contains stages whose dependencies are all satisfied by earlier levels. Stages within a level run concurrently via asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED), so progress can be reported as each stage finishes.
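
A minimal sketch of dependency inference and level construction, using a level-by-level variant of Kahn's algorithm. It assumes each stage is a dict with a name, an optional input_mapping of dot-notation strings, and an optional depends_on list. infer_dependencies and build_levels are hypothetical helpers written for this sketch, not the private methods of PipelineOrchestrator.

```python
from typing import Any


def infer_dependencies(stages: list[dict[str, Any]]) -> dict[str, set[str]]:
    """Map each stage name to the set of stage names it depends on."""
    names = {s["name"] for s in stages}
    deps: dict[str, set[str]] = {}
    for stage in stages:
        if "depends_on" in stage:  # an explicit list overrides inference
            deps[stage["name"]] = set(stage["depends_on"])
            continue
        # The first path segment names the source: "goal" or another stage.
        roots = {path.split(".", 1)[0] for path in stage.get("input_mapping", {}).values()}
        deps[stage["name"]] = (roots & names) - {stage["name"]}
    return deps


def build_levels(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group stages into levels; each level only depends on earlier levels."""
    remaining = {name: set(d) for name, d in deps.items()}
    levels: list[list[str]] = []
    while remaining:
        ready = sorted(name for name, d in remaining.items() if not d)
        if not ready:
            raise ValueError("cyclic or unknown dependencies: " + ", ".join(sorted(remaining)))
        levels.append(ready)
        for name in ready:
            del remaining[name]
        for d in remaining.values():
            d.difference_update(ready)
    return levels
```

Stages that land in the same inner list have no dependencies on each other and are candidates for concurrent execution.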

Pipeline definition comes from YAML config with stages, input mappings, and optional conditions.

Data flow through the pipeline::

OrchestratorGoal arrives at handle_message()
    ↓
context = { "goal": { "instruction": ..., "context": { ... } } }
    ↓
Build execution levels from stage dependencies (Kahn's algorithm)
    ↓
For each level:
    For each stage in level (concurrently if >1):
        1. Evaluate condition (skip if false)
        2. Build payload via input_mapping (dot-notation paths into context)
        3. Publish TaskMessage to heddle.tasks.incoming
        4. Wait for TaskResult on heddle.results.{goal_id}
        5. Store result: context[stage_name] = { "output": ..., ... }
    ↓
Publish final TaskResult with all stage outputs

Input mapping example (from doc_pipeline.yaml)::

input_mapping:
    text_preview: "extract.output.text_preview"
    metadata: "extract.output.metadata"

This resolves to::

payload["text_preview"] = context["extract"]["output"]["text_preview"]
payload["metadata"] = context["extract"]["output"]["metadata"]
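
The resolution step can be sketched as a walk over the dot-separated path. resolve_path and build_payload are hypothetical helpers, and the sketch raises KeyError where the pipeline itself would presumably raise PipelineMappingError.

```python
from typing import Any


def resolve_path(context: dict[str, Any], path: str) -> Any:
    """Follow a dot-notation path such as "extract.output.metadata"."""
    current: Any = context
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            raise KeyError(f"cannot resolve {path!r}: missing {key!r}")
        current = current[key]
    return current


def build_payload(input_mapping: dict[str, str], context: dict[str, Any]) -> dict[str, Any]:
    """Build a stage payload by resolving every mapped path."""
    return {field: resolve_path(context, path) for field, path in input_mapping.items()}
```
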
See Also

heddle.orchestrator.runner -- dynamic LLM-based orchestrator
heddle.core.messages.OrchestratorGoal -- the input message type
configs/orchestrators/ -- pipeline config YAML files

PipelineStageError

PipelineStageError(stage_name: str, message: str)

Bases: Exception

Raised when a pipeline stage fails or times out.

Source code in src/heddle/orchestrator/pipeline.py
def __init__(self, stage_name: str, message: str) -> None:
    self.stage_name = stage_name
    super().__init__(message)

PipelineTimeoutError

PipelineTimeoutError(stage_name: str, message: str)

Bases: PipelineStageError

Raised when a pipeline stage times out waiting for a result.

Source code in src/heddle/orchestrator/pipeline.py
def __init__(self, stage_name: str, message: str) -> None:
    self.stage_name = stage_name
    super().__init__(message)

PipelineValidationError

PipelineValidationError(stage_name: str, message: str)

Bases: PipelineStageError

Raised when input or output schema validation fails for a stage.

Source code in src/heddle/orchestrator/pipeline.py
def __init__(self, stage_name: str, message: str) -> None:
    self.stage_name = stage_name
    super().__init__(message)

PipelineWorkerError

PipelineWorkerError(stage_name: str, message: str)

Bases: PipelineStageError

Raised when a worker returns FAILED status for a stage.

Source code in src/heddle/orchestrator/pipeline.py
def __init__(self, stage_name: str, message: str) -> None:
    self.stage_name = stage_name
    super().__init__(message)

PipelineMappingError

PipelineMappingError(stage_name: str, message: str)

Bases: PipelineStageError

Raised when input_mapping resolution fails for a stage.

Source code in src/heddle/orchestrator/pipeline.py
def __init__(self, stage_name: str, message: str) -> None:
    self.stage_name = stage_name
    super().__init__(message)
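
Because every stage error derives from PipelineStageError and carries stage_name, a single except clause can handle all of them and still report which stage and which kind of failure occurred. The classes below are local stand-ins that mirror the documented signatures so the sketch runs on its own; describe is a hypothetical helper.

```python
class PipelineStageError(Exception):
    def __init__(self, stage_name: str, message: str) -> None:
        self.stage_name = stage_name
        super().__init__(message)


class PipelineTimeoutError(PipelineStageError):
    """Stage timed out waiting for a result."""


class PipelineWorkerError(PipelineStageError):
    """Worker returned FAILED status."""


def describe(exc: PipelineStageError) -> str:
    # Same three facts the pipeline logs: stage, error type, error text.
    return f"{type(exc).__name__} in stage {exc.stage_name!r}: {exc}"


def run_and_report(raise_exc: PipelineStageError) -> str:
    try:
        raise raise_exc
    except PipelineStageError as exc:  # catches every subclass as well
        return describe(exc)
```
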

PipelineOrchestrator

PipelineOrchestrator(actor_id: str, config_path: str, nats_url: str = 'nats://nats:4222', *, bus: Any | None = None)

Bases: BaseActor

Pipeline orchestrator with automatic stage parallelism.

Processes an OrchestratorGoal by running it through a series of stages organized into execution levels based on their dependencies. Stages within the same level run concurrently; levels execute sequentially. Stage outputs are accumulated in a context dict and can be referenced by subsequent stages via input_mapping.
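
Running the stages of one level concurrently while reporting each completion as it happens can be sketched with asyncio.wait and return_when=asyncio.FIRST_COMPLETED. run_level and run_stage are hypothetical names, and the sleep stands in for publishing a task and waiting for its result.

```python
import asyncio


async def run_level(stage_delays: dict[str, float]) -> list[str]:
    """Run all stages of a level concurrently; return names in completion order."""

    async def run_stage(name: str, delay: float) -> str:
        await asyncio.sleep(delay)  # stand-in for publish + wait for TaskResult
        return name

    pending = {asyncio.create_task(run_stage(n, d)) for n, d in stage_delays.items()}
    completion_order: list[str] = []
    while pending:
        # Wake up as soon as at least one stage finishes.
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            completion_order.append(task.result())  # re-raises if the stage failed
    return completion_order
```

Progress can be logged inside the loop after each completed task, instead of only once the slowest stage of the level has finished.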

Source code in src/heddle/orchestrator/pipeline.py
def __init__(
    self,
    actor_id: str,
    config_path: str,
    nats_url: str = "nats://nats:4222",
    *,
    bus: Any | None = None,
) -> None:
    self._config_path = config_path
    self.config = self._load_config(config_path)
    max_goals = self.config.get("max_concurrent_goals", 1)
    super().__init__(actor_id, nats_url, max_concurrent=max_goals, bus=bus)

on_reload async

on_reload() -> None

Re-read the pipeline config from disk on reload signal.

Source code in src/heddle/orchestrator/pipeline.py
async def on_reload(self) -> None:
    """Re-read the pipeline config from disk on reload signal."""
    self.config = self._load_config(self._config_path)
    logger.info("pipeline.config_reloaded", config_path=self._config_path)

handle_message async

handle_message(data: dict[str, Any]) -> None

Execute the pipeline for an incoming orchestrator goal.

Source code in src/heddle/orchestrator/pipeline.py
async def handle_message(self, data: dict[str, Any]) -> None:
    """Execute the pipeline for an incoming orchestrator goal."""
    goal = OrchestratorGoal(**data)
    record_orchestrator_goal_received(self.actor_id)
    stages = self.config["pipeline_stages"]
    timeout = self.config.get("timeout_seconds", 300)

    log = logger.bind(
        goal_id=goal.goal_id,
        pipeline=self.config["name"],
        request_id=goal.request_id or goal.goal_id,
    )

    # Build execution levels from dependency graph.
    deps = self._infer_dependencies(stages)
    levels = self._build_execution_levels(stages, deps)

    # Log execution plan.
    level_summary = [[s["name"] for s in level] for level in levels]
    log.info(
        "pipeline.started",
        stages=len(stages),
        levels=len(levels),
        execution_plan=level_summary,
    )

    # Accumulated context: goal info + results from each completed stage.
    context: dict[str, Any] = {
        "goal": {
            "instruction": goal.instruction,
            "context": goal.context,
        },
    }

    start = time.monotonic()
    completed_stage_count = 0
    total_stage_count = len(stages)

    try:
        for level_idx, level in enumerate(levels):
            level_log = log.bind(level=level_idx)

            if len(level) == 1:
                # Single stage — no concurrency overhead.
                stage = level[0]
                name, result_dict = await self._execute_stage(
                    stage,
                    context,
                    goal,
                    timeout,
                    level_log,
                )
                if not result_dict.get("_skipped"):
                    context[name] = result_dict
                completed_stage_count += 1
                level_log.info(
                    "pipeline.stage_progress",
                    completed=completed_stage_count,
                    total=total_stage_count,
                )
            else:
                completed_stage_count = await self._execute_parallel_level(
                    level,
                    context,
                    goal,
                    timeout,
                    level_log,
                    completed_stage_count,
                    total_stage_count,
                )

    except PipelineStageError as e:
        # Build a brief input summary for diagnostics.
        stage_context = context.get("goal", {}).get("context")
        input_summary = repr(stage_context)[:200] if stage_context else ""
        log.error(
            "pipeline.stage_failed",
            stage=e.stage_name,
            error_type=type(e).__name__,
            error=str(e),
            input_summary=input_summary,
        )
        elapsed = int((time.monotonic() - start) * 1000)
        await self._publish_pipeline_result(
            goal,
            TaskStatus.FAILED,
            error=str(e),
            elapsed=elapsed,
        )
        return
    except Exception as e:
        log.error(
            "pipeline.unexpected_error",
            error=str(e),
            error_type=type(e).__name__,
        )
        elapsed = int((time.monotonic() - start) * 1000)
        await self._publish_pipeline_result(
            goal,
            TaskStatus.FAILED,
            error=f"Pipeline error ({type(e).__name__}): {e}",
            elapsed=elapsed,
        )
        return

    # All stages complete.
    elapsed = int((time.monotonic() - start) * 1000)
    log.info("pipeline.completed", ms=elapsed, stages_run=len(context) - 1)

    # Build final output from all stage results.
    final_output = {
        name: data["output"]
        for name, data in context.items()
        if name != "goal" and isinstance(data, dict) and "output" in data
    }

    # Build execution timeline for observability.
    timeline = [
        {
            "stage": name,
            "started_at": data.get("started_at"),
            "ended_at": data.get("ended_at"),
            "wall_time_ms": data.get("wall_time_ms"),
            "processing_time_ms": data.get("processing_time_ms"),
        }
        for name, data in context.items()
        if name != "goal" and isinstance(data, dict) and "started_at" in data
    ]

    await self._publish_pipeline_result(
        goal,
        TaskStatus.COMPLETED,
        output=final_output,
        elapsed=elapsed,
        timeline=timeline,
    )

Decomposer

decomposer

Task decomposition logic for orchestrators.

Responsible for breaking down complex goals into concrete subtasks that can be routed to individual workers.

This module is used by OrchestratorActor (runner.py), NOT by PipelineOrchestrator (which has its stages pre-defined in YAML).

The GoalDecomposer uses an LLM backend to analyze a high-level goal and produce a list of concrete TaskMessages, each targeting a specific worker_type. The LLM is given the goal instruction, domain context, and a manifest of available workers (names, descriptions, and input schemas) so it can make informed routing decisions and construct valid payloads.

The decomposition prompt asks the LLM to output structured JSON::

[
    {"worker_type": "extractor", "payload": {...}, "model_tier": "local"},
    {"worker_type": "summarizer", "payload": {...}, "model_tier": "local"},
    ...
]

Each entry maps directly to a TaskMessage. The parent_task_id is set to the caller-provided value so that worker results route back to the orchestrator via heddle.results.{goal_id}.

See Also

heddle.core.messages.TaskMessage -- the output message type
heddle.core.messages.OrchestratorGoal -- the input message type
heddle.worker.backends.LLMBackend -- the LLM interface used for decomposition

WorkerDescriptor dataclass

WorkerDescriptor(name: str, description: str, input_schema: dict[str, Any] = dict(), default_tier: str = 'standard')

Metadata about an available worker type.

Used to ground the LLM's decomposition in what the system can actually execute. Typically constructed from a worker's YAML config file via the GoalDecomposer.from_worker_configs factory method.

Attributes:

name : str
    The worker_type identifier (e.g. "summarizer", "extractor"). Must match the name field in the worker's YAML config.

description : str
    One-line human-readable description of what the worker does.

input_schema : dict[str, Any]
    JSON Schema dict for the worker's expected payload. Included in the LLM prompt so it can construct valid payloads.

default_tier : str
    The default ModelTier string for this worker ("local", "standard", or "frontier").

to_prompt_block

to_prompt_block() -> str

Format this worker as a multi-line block for the LLM system prompt.

Includes the name, description, expected payload schema, and default model tier so the LLM knows exactly how to construct valid sub-tasks.

Source code in src/heddle/orchestrator/decomposer.py
def to_prompt_block(self) -> str:
    """Format this worker as a multi-line block for the LLM system prompt.

    Includes the name, description, expected payload schema, and default
    model tier so the LLM knows exactly how to construct valid sub-tasks.
    """
    lines = [
        f"  Worker: {self.name}",
        f"    Description: {self.description}",
        f"    Default tier: {self.default_tier}",
    ]
    if self.input_schema:
        schema_str = json.dumps(self.input_schema, indent=2)
        indented = _indent(schema_str, 6)
        lines.append(f"    Input payload schema:\n{indented}")
    return "\n".join(lines)
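
To make the resulting prompt text concrete, the sketch below reproduces the method on a stand-in dataclass. The module's private _indent helper is not shown on this page, so textwrap.indent is used in its place as an assumption; WorkerDescriptorSketch is a hypothetical name.

```python
import json
import textwrap
from dataclasses import dataclass, field
from typing import Any


@dataclass
class WorkerDescriptorSketch:
    name: str
    description: str
    input_schema: dict[str, Any] = field(default_factory=dict)
    default_tier: str = "standard"

    def to_prompt_block(self) -> str:
        lines = [
            f"  Worker: {self.name}",
            f"    Description: {self.description}",
            f"    Default tier: {self.default_tier}",
        ]
        if self.input_schema:
            schema_str = json.dumps(self.input_schema, indent=2)
            # Assumption: indent every schema line by six spaces.
            indented = textwrap.indent(schema_str, " " * 6)
            lines.append(f"    Input payload schema:\n{indented}")
        return "\n".join(lines)


block = WorkerDescriptorSketch(
    name="summarizer",
    description="Compresses text to structured summary",
    input_schema={"type": "object"},
    default_tier="local",
).to_prompt_block()
```

Printing block gives the worker name, description and tier, followed by the JSON schema indented beneath an "Input payload schema:" line.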

GoalDecomposer

GoalDecomposer(backend: LLMBackend, workers: list[WorkerDescriptor], *, max_tokens: int = 2000, temperature: float = 0.0)

LLM-based goal decomposition.

Turns a high-level goal string into a list of TaskMessage objects ready for dispatch through the router.

The decomposer asks an LLM to plan which workers to invoke and how to parameterize each one. It then parses the structured JSON response into validated TaskMessage objects.

All parsing and validation failures are handled gracefully -- invalid sub-tasks are logged and skipped rather than crashing the orchestrator. If the entire LLM response is unparseable, an empty list is returned.
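
The skip-invalid-entries behavior can be sketched as follows. This is an illustration, not the decomposer's actual parsing code: parse_plan is a hypothetical function, it returns plain dicts instead of TaskMessage objects, and it drops bad entries silently where the real code logs them.

```python
import json
from typing import Any


def parse_plan(raw: str, known_workers: set[str]) -> list[dict[str, Any]]:
    """Keep only well-formed entries that target a known worker_type."""
    try:
        plan = json.loads(raw)
    except json.JSONDecodeError:
        return []  # the whole response is unparseable
    if not isinstance(plan, list):
        return []
    valid: list[dict[str, Any]] = []
    for entry in plan:
        if not isinstance(entry, dict):
            continue
        worker = entry.get("worker_type")
        if not isinstance(worker, str) or worker not in known_workers:
            continue  # unknown or missing worker type
        if not isinstance(entry.get("payload"), dict):
            continue  # payload must be a JSON object
        valid.append(entry)
    return valid
```

One malformed entry therefore costs a single subtask, while a response that is not JSON at all yields an empty plan.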

Parameters:

backend : LLMBackend, required
    An LLM backend instance (OllamaBackend, AnthropicBackend, etc.) used to generate the decomposition plan.

workers : list[WorkerDescriptor], required
    List of WorkerDescriptor objects describing the available worker types. These are injected into the system prompt so the LLM knows what tools it can plan around.

max_tokens : int, default 2000
    Maximum tokens for the LLM response. Should be large enough to accommodate the full JSON plan.

temperature : float, default 0.0
    Sampling temperature. Low values (0.0--0.2) produce more deterministic plans. The default of 0.0 favors reproducibility.

Example::

workers = [
    WorkerDescriptor(
        name="summarizer",
        description="Compresses text to structured summary",
        input_schema={"type": "object", "required": ["text"], ...},
        default_tier="local",
    ),
    WorkerDescriptor(
        name="extractor",
        description="Extracts structured fields from text",
        input_schema={"type": "object", "required": ["text", "fields"], ...},
        default_tier="standard",
    ),
]
decomposer = GoalDecomposer(backend=ollama_backend, workers=workers)
tasks = await decomposer.decompose(
    goal="Summarize this report and extract the key dates",
    context={"text": "...report content..."},
)
# tasks is a list[TaskMessage] ready for dispatch
Source code in src/heddle/orchestrator/decomposer.py
def __init__(
    self,
    backend: LLMBackend,
    workers: list[WorkerDescriptor],
    *,
    max_tokens: int = 2000,
    temperature: float = 0.0,
) -> None:
    self._backend = backend
    self._workers = workers
    self._worker_names = {w.name for w in workers}
    self._system_prompt = _build_system_prompt(workers)
    self._max_tokens = max_tokens
    self._temperature = temperature

decompose async

decompose(goal: str, context: dict[str, Any] | None = None, *, parent_task_id: str | None = None, priority: TaskPriority = TaskPriority.NORMAL) -> list[TaskMessage]

Decompose a high-level goal into a list of TaskMessage objects.

Sends the goal and context to the LLM along with descriptions of all available workers. The LLM returns a JSON plan which is parsed and validated into TaskMessage instances.

This method never raises on LLM or parsing failures -- it logs the error and returns an empty list. The orchestrator can then decide whether to retry with different parameters or report failure upstream.

Parameters:

Name Type Description Default
goal str

Natural-language description of what needs to be accomplished.

required
context dict[str, Any] | None

Optional domain-specific data dict (e.g. file references, category lists, full text content). Included verbatim in the LLM prompt so it can construct appropriate payloads.

None
parent_task_id str | None

If this decomposition is part of a larger goal, all generated TaskMessages will carry this as their parent_task_id for result correlation. Typically set to OrchestratorGoal.goal_id.

None
priority TaskPriority

Default priority for generated tasks. Individual tasks may override this if the LLM specifies a different priority.

NORMAL

Returns:

Type Description
list[TaskMessage]

A list of TaskMessage objects ready for dispatch to the router. Returns an empty list if:

  • The LLM backend call fails (network error, timeout, etc.)
  • The LLM response cannot be parsed as JSON
  • The LLM returns an empty plan
  • All sub-tasks fail validation (unknown worker types, etc.)
Source code in src/heddle/orchestrator/decomposer.py
async def decompose(
    self,
    goal: str,
    context: dict[str, Any] | None = None,
    *,
    parent_task_id: str | None = None,
    priority: TaskPriority = TaskPriority.NORMAL,
) -> list[TaskMessage]:
    """Decompose a high-level goal into a list of TaskMessage objects.

    Sends the goal and context to the LLM along with descriptions of all
    available workers. The LLM returns a JSON plan which is parsed and
    validated into TaskMessage instances.

    This method never raises on LLM or parsing failures -- it logs the
    error and returns an empty list. The orchestrator can then decide
    whether to retry with different parameters or report failure upstream.

    Args:
        goal: Natural-language description of what needs to be accomplished.
        context: Optional domain-specific data dict (e.g. file references,
            category lists, full text content). Included verbatim in the
            LLM prompt so it can construct appropriate payloads.
        parent_task_id: If this decomposition is part of a larger goal,
            all generated TaskMessages will carry this as their
            ``parent_task_id`` for result correlation. Typically set to
            ``OrchestratorGoal.goal_id``.
        priority: Default priority for generated tasks. Individual tasks
            may override this if the LLM specifies a different priority.

    Returns:
        A list of TaskMessage objects ready for dispatch to the router.
        Returns an empty list if:

        - The LLM backend call fails (network error, timeout, etc.)
        - The LLM response cannot be parsed as JSON
        - The LLM returns an empty plan
        - All sub-tasks fail validation (unknown worker types, etc.)
    """
    log = logger.bind(
        goal_preview=goal[:120],
        parent_task_id=parent_task_id,
    )
    log.info("decomposer.starting", num_workers=len(self._workers))

    user_message = _build_user_message(goal, context)

    # -- Call the LLM backend --
    try:
        response = await self._backend.complete(
            system_prompt=self._system_prompt,
            user_message=user_message,
            max_tokens=self._max_tokens,
            temperature=self._temperature,
        )
    except Exception:
        log.exception("decomposer.llm_call_failed")
        return []

    raw_content = response["content"]
    model_used = response.get("model", "unknown")
    log.debug(
        "decomposer.llm_response",
        model=model_used,
        prompt_tokens=response.get("prompt_tokens"),
        completion_tokens=response.get("completion_tokens"),
        response_length=len(raw_content),
    )

    # -- Parse the JSON response into raw subtask dicts --
    try:
        raw_tasks = _extract_json_array(raw_content)
    except ValueError:
        log.error(
            "decomposer.json_parse_failed",
            raw_preview=raw_content[:300],
        )
        return []

    if not raw_tasks:
        log.info("decomposer.empty_plan", goal_preview=goal[:120])
        return []

    # -- Validate and convert each raw dict into a TaskMessage --
    tasks: list[TaskMessage] = []
    for i, raw_task in enumerate(raw_tasks):
        task = self._parse_subtask(
            raw_task,
            index=i,
            parent_task_id=parent_task_id,
            default_priority=priority,
        )
        if task is not None:
            tasks.append(task)

    log.info(
        "decomposer.completed",
        total_planned=len(raw_tasks),
        total_valid=len(tasks),
        worker_types=[t.worker_type for t in tasks],
    )
    return tasks
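
The parse-then-filter steps above rely on two private helpers, `_extract_json_array` and `_parse_subtask`, that are not shown on this page. The following is a minimal sketch of one way such tolerant parsing could work; `extract_json_array`, `keep_known_workers`, and the `worker_type` key in the raw plan are assumptions for illustration, not the module's actual implementation.

```python
import json
from typing import Any


def extract_json_array(raw: str) -> list[Any]:
    """Pull the outermost JSON array out of an LLM reply, or raise ValueError."""
    start, end = raw.find("["), raw.rfind("]")
    if start == -1 or end < start:
        raise ValueError("no JSON array found in LLM reply")
    # json.JSONDecodeError subclasses ValueError, so malformed JSON
    # surfaces to the caller as the same exception type.
    return json.loads(raw[start : end + 1])


def keep_known_workers(raw_tasks: list[Any], known: set[str]) -> list[dict[str, Any]]:
    """Skip malformed or unknown entries instead of failing the whole plan."""
    return [
        task
        for task in raw_tasks
        if isinstance(task, dict) and task.get("worker_type") in known
    ]


# An LLM reply that wraps the plan in prose and names one unknown worker.
reply = 'Plan: [{"worker_type": "summarizer"}, {"worker_type": "translator"}] Done.'
tasks = keep_known_workers(extract_json_array(reply), {"summarizer", "extractor"})
print(tasks)
```

Raising `ValueError` for every parse failure keeps the caller's error handling to a single `except` clause, which matches the shape of the `try` block in `decompose`.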

from_worker_configs classmethod

from_worker_configs(backend: LLMBackend, configs: list[dict[str, Any]], **kwargs: Any) -> GoalDecomposer

Build WorkerDescriptors from raw worker config dicts.

This avoids the caller having to manually construct WorkerDescriptor objects when the data is already available as parsed YAML configs.

Parameters:

Name Type Description Default
backend LLMBackend

The LLM backend to use for decomposition.

required
configs list[dict[str, Any]]

List of worker config dicts. Each must contain a name key; description, input_schema, and default_model_tier are read if present and otherwise fall back to defaults. Typically loaded from the worker YAML files in configs/workers/.

required
**kwargs Any

Additional keyword arguments forwarded to the GoalDecomposer constructor (e.g. max_tokens, temperature).

{}

Returns:

Type Description
GoalDecomposer

A configured GoalDecomposer instance.

Example::

import yaml

with open("configs/workers/summarizer.yaml") as f:
    summarizer_cfg = yaml.safe_load(f)
with open("configs/workers/classifier.yaml") as f:
    classifier_cfg = yaml.safe_load(f)

decomposer = GoalDecomposer.from_worker_configs(
    backend=ollama_backend,
    configs=[summarizer_cfg, classifier_cfg],
)
Source code in src/heddle/orchestrator/decomposer.py
@classmethod
def from_worker_configs(
    cls,
    backend: LLMBackend,
    configs: list[dict[str, Any]],
    **kwargs: Any,
) -> GoalDecomposer:
    """Build WorkerDescriptors from raw worker config dicts.

    This avoids the caller having to manually construct WorkerDescriptor
    objects when the data is already available as parsed YAML configs.

    Args:
        backend: The LLM backend to use for decomposition.
        configs: List of worker config dicts, each containing at minimum
            ``name`` and ``description`` keys. Typically loaded from the
            worker YAML files in ``configs/workers/``.
        **kwargs: Additional keyword arguments forwarded to the
            GoalDecomposer constructor (e.g. ``max_tokens``, ``temperature``).

    Returns:
        A configured GoalDecomposer instance.

    Example::

        import yaml

        with open("configs/workers/summarizer.yaml") as f:
            summarizer_cfg = yaml.safe_load(f)
        with open("configs/workers/classifier.yaml") as f:
            classifier_cfg = yaml.safe_load(f)

        decomposer = GoalDecomposer.from_worker_configs(
            backend=ollama_backend,
            configs=[summarizer_cfg, classifier_cfg],
        )
    """
    workers = [
        WorkerDescriptor(
            name=cfg["name"],
            description=cfg.get("description", "No description provided."),
            input_schema=cfg.get("input_schema", {}),
            default_tier=cfg.get("default_model_tier", "standard"),
        )
        for cfg in configs
    ]
    return cls(backend=backend, workers=workers, **kwargs)
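
To make the defaulting behaviour concrete, here is a small sketch of the same config-to-descriptor mapping on a stand-in dataclass. `WorkerDescriptorSketch` and `descriptors_from_configs` are hypothetical names; the key names and fallback values are copied from the listing above.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class WorkerDescriptorSketch:
    """Hypothetical stand-in for WorkerDescriptor, for illustration only."""

    name: str
    description: str
    input_schema: dict[str, Any] = field(default_factory=dict)
    default_tier: str = "standard"


def descriptors_from_configs(configs: list[dict[str, Any]]) -> list[WorkerDescriptorSketch]:
    return [
        WorkerDescriptorSketch(
            name=cfg["name"],  # the only key accessed without a fallback
            description=cfg.get("description", "No description provided."),
            input_schema=cfg.get("input_schema", {}),
            # Note the rename: the config key is default_model_tier.
            default_tier=cfg.get("default_model_tier", "standard"),
        )
        for cfg in configs
    ]


descs = descriptors_from_configs([{"name": "summarizer", "default_model_tier": "local"}])
print(descs[0])
```

A config without `name` raises `KeyError` in this sketch, so it may be worth validating configs before building the decomposer.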

Synthesizer

synthesizer

Result aggregation for orchestrators.

Responsible for combining results from multiple workers into a coherent final output.

This module is used by OrchestratorActor (runner.py), NOT by PipelineOrchestrator (which simply collects stage outputs into a dict).

Two modes of operation:

1. **Simple merge** (no LLM backend required)
   Partitions results into succeeded/failed, aggregates outputs into a
   structured dict with metadata.  Fast, deterministic, zero cost.

2. **LLM synthesis** (requires an LLM backend + a goal string)
   Sends the collected worker outputs to an LLM with instructions to
   produce a coherent narrative synthesis.  Use this when the orchestrator
   needs to present a unified answer to the user rather than a bag of
   sub-results.
Design decisions
  • Partial failures are first-class: every output dict contains both succeeded and failed sections so callers never lose visibility into what went wrong.
  • The LLM synthesis prompt is kept internal to this module; callers only pass the goal string and the list of TaskResults.
  • Token-budget awareness: if the combined result text is very large, the synthesizer truncates individual outputs before sending them to the LLM to avoid blowing the context window.
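
The token-budget point can be pictured with a short sketch of per-output truncation. `truncate_output` and its truncation marker are assumptions for illustration; the module's real truncation logic and the value of `_MAX_OUTPUT_CHARS` are not shown on this page.

```python
import json
from typing import Any


def truncate_output(output: Any, max_chars: int) -> str:
    """Render one worker output as text and cap it at max_chars characters."""
    # Non-string outputs are serialised first so the budget applies to
    # the text that would actually be placed in the prompt.
    text = output if isinstance(output, str) else json.dumps(output, default=str)
    if len(text) <= max_chars:
        return text
    dropped = len(text) - max_chars
    return f"{text[:max_chars]}... [truncated {dropped} chars]"


print(truncate_output("abcdef", 3))
print(truncate_output({"a": 1}, 100))
```

Truncating each output separately, rather than the concatenated prompt, means one oversized result cannot crowd out the others.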

ResultSynthesizer

ResultSynthesizer(backend: LLMBackend | None = None, max_output_chars: int = _MAX_OUTPUT_CHARS)

Combines multiple worker TaskResult objects into a final output.

The synthesizer operates in one of two modes depending on how it is constructed and invoked:

Simple merge (default, no LLM): Call merge(), or call synthesize() without a goal. Returns a structured dict with succeeded and failed sections plus aggregate metadata.

LLM synthesis (requires backend and a goal): Call synthesize() with a goal string. The LLM receives the original goal, all worker outputs, and instructions to produce a unified answer.

Parameters:

Name Type Description Default
backend LLMBackend | None

An optional LLM backend (e.g. OllamaBackend, AnthropicBackend). When provided and a goal is passed to synthesize(), the synthesizer uses the LLM to produce a coherent narrative. When None, only deterministic merge is available.

None
max_output_chars int

Per-result character budget when building the LLM prompt. Outputs longer than this are truncated to avoid exceeding the model's context window. Defaults to _MAX_OUTPUT_CHARS.

_MAX_OUTPUT_CHARS

Example::

# Simple merge (no LLM)
synth = ResultSynthesizer()
merged = synth.merge(results)

# LLM synthesis
synth = ResultSynthesizer(backend=my_ollama_backend)
combined = await synth.synthesize(results, goal="Summarise the document")
Source code in src/heddle/orchestrator/synthesizer.py
def __init__(
    self,
    backend: LLMBackend | None = None,
    max_output_chars: int = _MAX_OUTPUT_CHARS,
) -> None:
    self._backend = backend
    self._max_output_chars = max_output_chars

merge

merge(results: list[TaskResult]) -> dict[str, Any]

Deterministic merge of task results — no LLM involved.

Partitions results into succeeded, failed, and in-flight groups, extracts their outputs (or errors), and returns a structured dict with aggregate metadata.

Parameters:

Name Type Description Default
results list[TaskResult]

Worker results to merge. May be empty.

required

Returns:

Type Description
dict[str, Any]

A dict with the following top-level keys:

- ``succeeded`` — list of dicts, each containing ``task_id``,
  ``worker_type``, ``output``, ``model_used``, and
  ``processing_time_ms`` for every completed result.
- ``failed`` — list of dicts, each containing ``task_id``,
  ``worker_type``, ``error``, and ``processing_time_ms`` for every
  failed result.
- ``in_flight`` — list of dicts, each containing ``task_id``,
  ``worker_type``, ``status``, and ``processing_time_ms`` for any
  task still in a non-terminal state.  Empty under the current
  dynamic-orchestrator caller (cc49783 converts pending tasks
  to synthetic ``FAILED`` placeholders before synthesis).
- ``metadata`` — aggregate statistics: ``total``, ``succeeded``,
  ``failed``, ``in_flight``, ``total_processing_time_ms``,
  ``models_used``, and ``total_tokens``.
Source code in src/heddle/orchestrator/synthesizer.py
def merge(self, results: list[TaskResult]) -> dict[str, Any]:
    """Deterministic merge of task results — no LLM involved.

    Partitions *results* into succeeded and failed groups, extracts their
    outputs (or errors), and returns a structured dict with aggregate
    metadata.

    Parameters
    ----------
    results : list[TaskResult]
        Worker results to merge.  May be empty.

    Returns:
    -------
    dict[str, Any]
        A dict with the following top-level keys:

        - ``succeeded`` — list of dicts, each containing ``task_id``,
          ``worker_type``, ``output``, ``model_used``, and
          ``processing_time_ms`` for every completed result.
        - ``failed`` — list of dicts, each containing ``task_id``,
          ``worker_type``, ``error``, and ``processing_time_ms`` for every
          failed result.
        - ``in_flight`` — list of dicts, each containing ``task_id``,
          ``worker_type``, ``status``, and ``processing_time_ms`` for any
          task still in a non-terminal state.  Empty under the current
          dynamic-orchestrator caller (cc49783 converts pending tasks
          to synthetic ``FAILED`` placeholders before synthesis).
        - ``metadata`` — aggregate statistics: ``total``, ``succeeded``,
          ``failed``, ``in_flight``, ``total_processing_time_ms``,
          ``models_used``, and ``total_tokens``.
    """
    parts = self._partition(results)
    succeeded = parts["succeeded"]
    failed = parts["failed"]
    in_flight = parts["in_flight"]

    succeeded_entries = [
        {
            "task_id": r.task_id,
            "worker_type": r.worker_type,
            "output": r.output,
            "model_used": r.model_used,
            "processing_time_ms": r.processing_time_ms,
        }
        for r in succeeded
    ]

    failed_entries = [
        {
            "task_id": r.task_id,
            "worker_type": r.worker_type,
            "error": r.error,
            "processing_time_ms": r.processing_time_ms,
        }
        for r in failed
    ]

    in_flight_entries = [
        {
            "task_id": r.task_id,
            "worker_type": r.worker_type,
            "status": r.status.value,
            "processing_time_ms": r.processing_time_ms,
        }
        for r in in_flight
    ]

    # Aggregate token usage across all results (succeeded or not).
    total_tokens: dict[str, int] = {}
    for r in results:
        for key, value in r.token_usage.items():
            total_tokens[key] = total_tokens.get(key, 0) + value

    # Collect distinct model identifiers (filter out None).
    models_used = sorted({r.model_used for r in results if r.model_used is not None})

    metadata = {
        "total": len(results),
        "succeeded": len(succeeded),
        "failed": len(failed),
        "in_flight": len(in_flight),
        "total_processing_time_ms": sum(r.processing_time_ms for r in results),
        "models_used": models_used,
        "total_tokens": total_tokens,
    }

    if in_flight:
        # Distinct event from merge_partial_failure: in-flight tasks
        # are *not* errors and a future caller passing live state
        # should be observable as a separate operator signal.
        logger.warning(
            "synthesizer.merge_with_in_flight",
            total=len(results),
            in_flight=len(in_flight),
            in_flight_workers=[r.worker_type for r in in_flight],
        )
    if failed:
        logger.warning(
            "synthesizer.merge_partial_failure",
            total=len(results),
            failed=len(failed),
            failed_workers=[r.worker_type for r in failed],
        )
    if not failed and not in_flight:
        logger.info(
            "synthesizer.merge_complete",
            total=len(results),
        )

    return {
        "succeeded": succeeded_entries,
        "failed": failed_entries,
        "in_flight": in_flight_entries,
        "metadata": metadata,
    }
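
The metadata aggregation is easy to check by hand on a small input. The sketch below reproduces that part of `merge` on a stand-in result type; `ResultSketch` and the status strings `"completed"` and `"failed"` are assumptions, since the real TaskResult model and its status enum are not shown on this page.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ResultSketch:
    """Hypothetical stand-in for TaskResult, for illustration only."""

    worker_type: str
    status: str  # assumed values: "completed" or "failed"
    processing_time_ms: int = 0
    model_used: Optional[str] = None
    token_usage: dict[str, int] = field(default_factory=dict)


def aggregate_metadata(results: list[ResultSketch]) -> dict[str, Any]:
    # Token usage is summed per key across all results, failed ones included.
    total_tokens: dict[str, int] = {}
    for r in results:
        for key, value in r.token_usage.items():
            total_tokens[key] = total_tokens.get(key, 0) + value
    return {
        "total": len(results),
        "succeeded": sum(r.status == "completed" for r in results),
        "failed": sum(r.status == "failed" for r in results),
        "total_processing_time_ms": sum(r.processing_time_ms for r in results),
        # Distinct model names, sorted, with missing values filtered out.
        "models_used": sorted({r.model_used for r in results if r.model_used is not None}),
        "total_tokens": total_tokens,
    }


results = [
    ResultSketch("summarizer", "completed", 120, "model-a",
                 {"prompt_tokens": 100, "completion_tokens": 20}),
    ResultSketch("extractor", "failed", 80, None, {"prompt_tokens": 50}),
]
meta = aggregate_metadata(results)
print(meta)
```

Because failed results still contribute to `total_tokens` and `total_processing_time_ms`, the metadata reflects the full cost of the goal rather than only the useful part.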

synthesize async

synthesize(results: list[TaskResult], goal: str | None = None) -> dict[str, Any]

Combine worker results into a final coherent output.

If an LLM backend was provided at construction time and a goal string is supplied, the method delegates to _llm_synthesize(), which asks the LLM to produce a unified narrative. Otherwise it falls back to merge().

Parameters:

Name Type Description Default
results list[TaskResult]

Worker results to synthesize. May be empty, in which case the method logs a warning and returns the merge() output with empty sections.

required
goal str | None

The original high-level goal that spawned these tasks. Required for LLM synthesis mode; ignored in merge mode.

None

Returns:

Type Description
dict[str, Any]

In merge mode the return value is identical to merge().

In **LLM mode** the dict contains:

- ``synthesis`` — the LLM's coherent combined answer (str).
- ``confidence`` — ``"high"``, ``"medium"``, or ``"low"`` (str).
- ``conflicts`` — list of contradictions the LLM identified.
- ``gaps`` — list of missing information from failed tasks.
- ``succeeded`` / ``failed`` / ``metadata`` — same as merge mode.
- ``llm_metadata`` — model used and token counts for the synthesis
  call itself.
Source code in src/heddle/orchestrator/synthesizer.py
async def synthesize(
    self,
    results: list[TaskResult],
    goal: str | None = None,
) -> dict[str, Any]:
    """Combine worker results into a final coherent output.

    If an LLM backend was provided at construction time **and** a *goal*
    string is supplied, the method delegates to :meth:`_llm_synthesize`
    which asks the LLM to produce a unified narrative.  Otherwise it falls
    back to :meth:`merge`.

    Parameters
    ----------
    results : list[TaskResult]
        Worker results to synthesize.  May be empty (in which case the
        output will indicate that no results were available).
    goal : str | None
        The original high-level goal that spawned these tasks.  Required
        for LLM synthesis mode; ignored in merge mode.

    Returns:
    -------
    dict[str, Any]
        In **merge mode** the return value is identical to :meth:`merge`.

        In **LLM mode** the dict contains:

        - ``synthesis`` — the LLM's coherent combined answer (str).
        - ``confidence`` — ``"high"``, ``"medium"``, or ``"low"`` (str).
        - ``conflicts`` — list of contradictions the LLM identified.
        - ``gaps`` — list of missing information from failed tasks.
        - ``succeeded`` / ``failed`` / ``metadata`` — same as merge mode.
        - ``llm_metadata`` — model used and token counts for the synthesis
          call itself.
    """
    # Fast path: no results at all.
    if not results:
        logger.warning("synthesizer.no_results")
        return self.merge(results)

    # Decide mode.
    use_llm = self._backend is not None and goal is not None
    if not use_llm:
        return self.merge(results)

    return await self._llm_synthesize(results, goal)  # type: ignore[arg-type]
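
The mode selection reduces to three checks, which the following sketch exercises with stubbed-out work. `SynthesizerSketch` is a hypothetical stand-in whose `merge` and `_llm_synthesize` only report which path was taken.

```python
import asyncio
from typing import Any, Optional


class SynthesizerSketch:
    """Hypothetical stand-in that mirrors only the mode-selection logic."""

    def __init__(self, backend: Optional[object] = None) -> None:
        self._backend = backend

    def merge(self, results: list[Any]) -> dict[str, Any]:
        return {"mode": "merge", "count": len(results)}

    async def _llm_synthesize(self, results: list[Any], goal: str) -> dict[str, Any]:
        return {"mode": "llm", "goal": goal}

    async def synthesize(self, results: list[Any], goal: Optional[str] = None) -> dict[str, Any]:
        # Empty input never reaches the LLM, even with a backend and a goal.
        if not results:
            return self.merge(results)
        # LLM mode needs both a backend and a goal.
        if self._backend is None or goal is None:
            return self.merge(results)
        return await self._llm_synthesize(results, goal)


async def demo() -> list[str]:
    with_llm = SynthesizerSketch(backend=object())
    without_llm = SynthesizerSketch()
    outcomes = [
        await with_llm.synthesize(["r1"], goal="Summarise"),
        await with_llm.synthesize(["r1"]),
        await without_llm.synthesize(["r1"], goal="Summarise"),
        await with_llm.synthesize([], goal="Summarise"),
    ]
    return [o["mode"] for o in outcomes]


modes = asyncio.run(demo())
print(modes)
```

Only the first call takes the LLM path; a missing goal, a missing backend, or an empty result list each fall back to the deterministic merge.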

Checkpoint

checkpoint

Self-summarization checkpoint system for orchestrators.

The orchestrator's context is precious. This module compresses conversation history into structured state snapshots at defined intervals, allowing the orchestrator to "reboot" with a clean, compact understanding of where things stand.

Checkpoint trigger: when estimated token count exceeds threshold.

Storage: Pluggable via KeyValueStore from heddle.core.kvstore (CheckpointStore in orchestrator/store.py is a backward-compatible alias). Keys follow the pattern::

heddle:checkpoint:{goal_id}:{checkpoint_number}  — versioned checkpoint
heddle:checkpoint:{goal_id}:latest                — pointer to most recent

The orchestrator workflow with checkpoints::

1. Process goal, accumulate conversation_history
2. After each worker result: should_checkpoint(conversation_history)
3. If True: create_checkpoint() → compress state → persist to store
4. Orchestrator "reboots" with: system_prompt + format_for_injection(checkpoint)
   + last N interactions (recent_window_size)
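
Step 4 of the workflow above can be sketched as a small helper that assembles the post-checkpoint context. `rebuild_context` and the `role`/`content` message shape are assumptions for illustration; the orchestrator's actual message format is not shown on this page.

```python
from typing import Any


def rebuild_context(
    system_prompt: str,
    checkpoint_text: str,
    history: list[dict[str, Any]],
    recent_window_size: int = 5,
) -> list[dict[str, Any]]:
    """Return system prompt + injected checkpoint + the last N interactions."""
    # history[-0:] would return the whole list, so a zero window is handled explicitly.
    recent = history[-recent_window_size:] if recent_window_size > 0 else []
    return [
        {"role": "system", "content": f"{system_prompt}\n\n{checkpoint_text}"},
        *recent,
    ]


history = [{"role": "user", "content": f"result {i}"} for i in range(8)]
context = rebuild_context("You are the orchestrator.", "=== CHECKPOINT #1 ===", history)
print(len(context), context[1]["content"])
```

The older interactions are dropped from the prompt entirely; only what the checkpoint summary captured about them survives the reboot.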

This is conceptually similar to how Claude Code itself handles context compression — the key insight is the same: keep a structured summary + recent window rather than the full history.

This module is used by OrchestratorActor (runner.py).

PipelineOrchestrator does NOT use checkpoints because its sequential stage execution doesn't accumulate unbounded context.

Note: Token counting uses tiktoken with cl100k_base encoding (OpenAI's tokenizer). For Anthropic models, token counts are approximate (~10-15% estimation error). This is acceptable for checkpoint threshold decisions where exact counts are not critical.

CheckpointManager

CheckpointManager(store: KeyValueStore, token_threshold: int = 50000, recent_window_size: int = 5, encoding_name: str = 'cl100k_base', ttl_seconds: int = 86400)

Manages orchestrator state compression.

Workflow:

  1. After each worker result, should_checkpoint() sums estimate_tokens() over the conversation history
  2. If the total exceeds token_threshold, create_checkpoint() compresses the current state into a CheckpointState and persists it
  3. The orchestrator restarts with: system_prompt + checkpoint + recent_window
Source code in src/heddle/orchestrator/checkpoint.py
def __init__(
    self,
    store: KeyValueStore,
    token_threshold: int = 50_000,  # Trigger checkpoint at this count
    recent_window_size: int = 5,  # Keep last N interactions in detail
    encoding_name: str = "cl100k_base",
    ttl_seconds: int = 86400,  # Key expiry (default: 24h)
) -> None:
    self.store = store
    self.token_threshold = token_threshold
    self.recent_window_size = recent_window_size
    self.encoder = tiktoken.get_encoding(encoding_name)
    self.ttl_seconds = ttl_seconds

estimate_tokens

estimate_tokens(text: str) -> int

Estimate token count for a string.

Source code in src/heddle/orchestrator/checkpoint.py
def estimate_tokens(self, text: str) -> int:
    """Estimate token count for a string."""
    return len(self.encoder.encode(text))

should_checkpoint

should_checkpoint(conversation_history: list[dict[str, Any]]) -> bool

Check if context has grown enough to trigger compression.

Source code in src/heddle/orchestrator/checkpoint.py
def should_checkpoint(self, conversation_history: list[dict[str, Any]]) -> bool:
    """Check if context has grown enough to trigger compression."""
    total = sum(self.estimate_tokens(json.dumps(msg)) for msg in conversation_history)
    return total > self.token_threshold
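
The threshold check can be tried without installing tiktoken by swapping in a crude estimator. In the sketch below, `rough_estimate` (about four characters per token) is an assumption standing in for the tiktoken encoder, so the counts will differ from the real ones.

```python
import json
from typing import Any, Callable


def rough_estimate(text: str) -> int:
    # Stand-in for tiktoken: assume roughly 4 characters per token.
    return max(1, len(text) // 4)


def should_checkpoint(
    history: list[dict[str, Any]],
    token_threshold: int,
    estimate_tokens: Callable[[str], int] = rough_estimate,
) -> bool:
    """True once the JSON-serialised history exceeds the token threshold."""
    total = sum(estimate_tokens(json.dumps(msg)) for msg in history)
    return total > token_threshold


history = [{"role": "user", "content": "x" * 400}]
print(should_checkpoint(history, token_threshold=100))
print(should_checkpoint(history, token_threshold=200))
```

The comparison is strict, so a history whose estimate lands exactly on the threshold does not trigger a checkpoint. Each message is serialised with `json.dumps` before counting, which means keys and punctuation count toward the total as well.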

create_checkpoint async

create_checkpoint(goal_id: str, original_instruction: str, completed_tasks: list[dict[str, Any]], pending_tasks: list[dict[str, Any]], open_issues: list[str], decisions_made: list[str], checkpoint_number: int) -> CheckpointState

Build a checkpoint.

The orchestrator or a dedicated summarizer compresses current state into this structure.

Source code in src/heddle/orchestrator/checkpoint.py
async def create_checkpoint(
    self,
    goal_id: str,
    original_instruction: str,
    completed_tasks: list[dict[str, Any]],
    pending_tasks: list[dict[str, Any]],
    open_issues: list[str],
    decisions_made: list[str],
    checkpoint_number: int,
) -> CheckpointState:
    """Build a checkpoint.

    The orchestrator or a dedicated summarizer compresses current state
    into this structure.
    """
    # Build executive summary from completed task outcomes.
    # Only the last 20 tasks are included to keep the summary concise.
    # Of those, only the last 10 are rendered into the summary text.
    outcomes = []
    for t in completed_tasks[-20:]:
        status = t.get("status", "unknown")
        summary = t.get("summary", t.get("worker_type", "task"))
        outcomes.append(f"- [{status}] {summary}")

    executive_summary = (
        f"Goal: {original_instruction}\n"
        f"Progress: {len(completed_tasks)} completed, {len(pending_tasks)} pending.\n"
        f"Recent outcomes:\n" + "\n".join(outcomes[-10:])
    )

    total_tokens = self.estimate_tokens(executive_summary)

    checkpoint = CheckpointState(
        goal_id=goal_id,
        original_instruction=original_instruction,
        executive_summary=executive_summary,
        completed_tasks=[
            {
                "task_id": t["task_id"],
                "worker_type": t.get("worker_type"),
                "summary": t.get("summary", ""),
            }
            for t in completed_tasks
        ],
        pending_tasks=pending_tasks,
        open_issues=open_issues,
        decisions_made=decisions_made,
        context_token_count=total_tokens,
        checkpoint_number=checkpoint_number,
    )

    # Persist to store with configurable TTL (default 24h).
    # Long-running goals can increase ttl_seconds at construction time.
    prefix = domain_prefix("checkpoint")
    key = f"{prefix}{goal_id}:{checkpoint_number}"
    await self.store.set(key, checkpoint.model_dump_json(), self.ttl_seconds)

    # Maintain a "latest" pointer so load_latest() doesn't need to scan.
    await self.store.set(f"{prefix}{goal_id}:latest", key, self.ttl_seconds)

    logger.info(
        "checkpoint.created",
        goal_id=goal_id,
        checkpoint_number=checkpoint_number,
        token_count=total_tokens,
    )
    return checkpoint

load_latest async

load_latest(goal_id: str) -> CheckpointState | None

Load the most recent checkpoint for a goal.

Source code in src/heddle/orchestrator/checkpoint.py
async def load_latest(self, goal_id: str) -> CheckpointState | None:
    """Load the most recent checkpoint for a goal."""
    prefix = domain_prefix("checkpoint")
    latest_key = await self.store.get(f"{prefix}{goal_id}:latest")
    if not latest_key:
        return None
    data = await self.store.get(latest_key)
    if not data:
        return None
    return CheckpointState.model_validate_json(data)
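
The versioned-key plus "latest" pointer layout can be seen end to end with a plain dict-backed store. In this sketch `DictStore`, `save`, and the JSON payloads are hypothetical, and `PREFIX` assumes that `domain_prefix("checkpoint")` yields the `heddle:checkpoint:` prefix from the key pattern documented above.

```python
import asyncio
import json
from typing import Any, Optional

PREFIX = "heddle:checkpoint:"  # assumed value of domain_prefix("checkpoint")


class DictStore:
    """Hypothetical async store backed by a dict; TTL is ignored here."""

    def __init__(self) -> None:
        self.data: dict[str, str] = {}

    async def set(self, key: str, value: str, ttl_seconds: Optional[int] = None) -> None:
        self.data[key] = value

    async def get(self, key: str) -> Optional[str]:
        return self.data.get(key)


async def save(store: DictStore, goal_id: str, number: int, payload: dict[str, Any]) -> str:
    key = f"{PREFIX}{goal_id}:{number}"
    await store.set(key, json.dumps(payload))
    # The pointer stores the key of the newest version, not the payload.
    await store.set(f"{PREFIX}{goal_id}:latest", key)
    return key


async def load_latest(store: DictStore, goal_id: str) -> Optional[dict[str, Any]]:
    latest_key = await store.get(f"{PREFIX}{goal_id}:latest")
    if not latest_key:
        return None
    data = await store.get(latest_key)
    return json.loads(data) if data else None


async def demo() -> tuple[DictStore, Optional[dict[str, Any]], Optional[dict[str, Any]]]:
    store = DictStore()
    await save(store, "g1", 1, {"n": 1})
    await save(store, "g1", 2, {"n": 2})
    return store, await load_latest(store, "g1"), await load_latest(store, "missing")


store, latest, missing = asyncio.run(demo())
print(latest, missing)
```

Loading takes two reads (pointer, then payload). If the versioned entry expires before the pointer does, the second read returns nothing and the caller gets `None`, which is the same outcome as having no checkpoint at all.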

format_for_injection

format_for_injection(checkpoint: CheckpointState) -> str

Format checkpoint as context to inject into a fresh orchestrator session.

This is what the orchestrator sees when it "wakes up" after a checkpoint.

Source code in src/heddle/orchestrator/checkpoint.py
def format_for_injection(self, checkpoint: CheckpointState) -> str:
    """Format checkpoint as context to inject into a fresh orchestrator session.

    This is what the orchestrator sees when it "wakes up" after a checkpoint.
    """
    sections = [
        f"=== CHECKPOINT #{checkpoint.checkpoint_number} ===",
        f"Original Goal: {checkpoint.original_instruction}",
        "",
        "--- Executive Summary ---",
        checkpoint.executive_summary,
        "",
        f"--- Decisions Made ({len(checkpoint.decisions_made)}) ---",
    ]
    sections.extend(f"  * {d}" for d in checkpoint.decisions_made)

    if checkpoint.open_issues:
        sections.append(f"\n--- Open Issues ({len(checkpoint.open_issues)}) ---")
        sections.extend(f"  ! {issue}" for issue in checkpoint.open_issues)

    sections.append(f"\n--- Pending Tasks ({len(checkpoint.pending_tasks)}) ---")
    sections.extend(f"  -> {t}" for t in checkpoint.pending_tasks)

    sections.append("\n=== END CHECKPOINT ===")
    return "\n".join(sections)

Store

store

Backward-compatibility shim.

This module originally defined CheckpointStore and InMemoryCheckpointStore. The store abstraction is now general-purpose and lives in heddle.core.kvstore. The old names continue to work as aliases for the new ones.

Migration: new code should import from heddle.core.kvstore directly. Existing imports from this module are kept working through the v0.x series and will be removed in v1.0.

InMemoryCheckpointStore

InMemoryCheckpointStore()

Bases: KeyValueStore

In-memory key-value store for tests and local development.

Values are stored in a dict with optional expiry timestamps. Expiry is checked lazily on get() — no background cleanup.

Source code in src/heddle/core/kvstore.py
def __init__(self) -> None:
    # Maps key -> (value, expires_at | None)
    self._data: dict[str, tuple[str, float | None]] = {}

set async

set(key: str, value: str, ttl_seconds: int | None = None) -> None

Store a value with optional TTL.

Source code in src/heddle/core/kvstore.py
async def set(self, key: str, value: str, ttl_seconds: int | None = None) -> None:
    """Store a value with optional TTL."""
    expires_at = time.monotonic() + ttl_seconds if ttl_seconds else None
    self._data[key] = (value, expires_at)

get async

get(key: str) -> str | None

Retrieve a value, or None if missing/expired.

Source code in src/heddle/core/kvstore.py
async def get(self, key: str) -> str | None:
    """Retrieve a value, or ``None`` if missing/expired."""
    entry = self._data.get(key)
    if entry is None:
        return None
    value, expires_at = entry
    if expires_at is not None and time.monotonic() > expires_at:
        del self._data[key]
        return None
    return value

set_if_not_exists async

set_if_not_exists(key: str, value: str, ttl_seconds: int) -> bool

Atomic set-if-absent. Lazily expires the existing entry first.

Single-event-loop atomicity is sufficient for the test/dev use cases this store targets; multi-process coordination requires RedisKeyValueStore.

Source code in src/heddle/core/kvstore.py
async def set_if_not_exists(self, key: str, value: str, ttl_seconds: int) -> bool:
    """Atomic set-if-absent. Lazily expires the existing entry first.

    Single-event-loop atomicity is sufficient for the test/dev use
    cases this store targets; multi-process coordination requires
    :class:`RedisKeyValueStore`.
    """
    entry = self._data.get(key)
    if entry is not None:
        _v, expires_at = entry
        if expires_at is None or time.monotonic() <= expires_at:
            return False
        del self._data[key]
    self._data[key] = (value, time.monotonic() + ttl_seconds)
    return True
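
Lazy expiry and set-if-absent are easiest to follow with a controllable clock. The sketch below is a simplified stand-in, not the library class: `TtlDictStore` and its injectable `clock` argument are assumptions that make the TTL behaviour testable without sleeping.

```python
import asyncio
from typing import Callable, Optional


class TtlDictStore:
    """Hypothetical dict-backed store with lazy expiry and a fake-able clock."""

    def __init__(self, clock: Callable[[], float]) -> None:
        self._clock = clock
        self._data: dict[str, tuple[str, Optional[float]]] = {}

    async def get(self, key: str) -> Optional[str]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        # Expired entries are removed only when someone reads them.
        if expires_at is not None and self._clock() > expires_at:
            del self._data[key]
            return None
        return value

    async def set_if_not_exists(self, key: str, value: str, ttl_seconds: int) -> bool:
        # No await between the check and the write would be needed for
        # atomicity; here get() completes without yielding to other tasks.
        if await self.get(key) is not None:
            return False
        self._data[key] = (value, self._clock() + ttl_seconds)
        return True


async def demo() -> tuple[bool, bool, bool, Optional[str]]:
    now = [0.0]
    store = TtlDictStore(clock=lambda: now[0])
    first = await store.set_if_not_exists("lease", "worker-a", 30)
    second = await store.set_if_not_exists("lease", "worker-b", 30)
    now[0] = 31.0  # advance the fake clock past the 30 second TTL
    third = await store.set_if_not_exists("lease", "worker-b", 30)
    return first, second, third, await store.get("lease")


outcome = asyncio.run(demo())
print(outcome)
```

The second caller loses while the lease is live and wins once it has expired, which is the bounded-recovery property the mandatory TTL is meant to provide. Separately, note that `set()` in the listing above tests `ttl_seconds` for truthiness, so a TTL of 0 appears to be stored without expiry, the same as `None`.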

CheckpointStore

Bases: ABC

Abstract TTL-aware key-value store.

Implementations must handle:

  • String key-value storage.
  • TTL-based expiration (best-effort; lazy expiry is acceptable).
  • Returning None for missing or expired keys.

set abstractmethod async

set(key: str, value: str, ttl_seconds: int | None = None) -> None

Store a value with optional TTL.

Source code in src/heddle/core/kvstore.py
@abstractmethod
async def set(self, key: str, value: str, ttl_seconds: int | None = None) -> None:
    """Store a value with optional TTL."""
    ...

get abstractmethod async

get(key: str) -> str | None

Retrieve a value, or None if missing/expired.

Source code in src/heddle/core/kvstore.py
@abstractmethod
async def get(self, key: str) -> str | None:
    """Retrieve a value, or ``None`` if missing/expired."""
    ...

set_if_not_exists abstractmethod async

set_if_not_exists(key: str, value: str, ttl_seconds: int) -> bool

Atomically set key to value with TTL only if absent.

Returns True if the value was stored (caller "won the race"), False if the key already existed (caller lost). TTL is mandatory — the SETNX-without-TTL footgun (orphaned leases on crash) is explicitly disallowed.

Used by heddle.contrib.events.lease.finalization_lease and any other coordination primitive that needs a single-writer guarantee with bounded recovery.

Source code in src/heddle/core/kvstore.py
@abstractmethod
async def set_if_not_exists(self, key: str, value: str, ttl_seconds: int) -> bool:
    """Atomically set ``key`` to ``value`` with TTL only if absent.

    Returns ``True`` if the value was stored (caller "won the
    race"), ``False`` if the key already existed (caller lost).
    TTL is mandatory — the SETNX-without-TTL footgun (orphaned
    leases on crash) is explicitly disallowed.

    Used by :func:`heddle.contrib.events.lease.finalization_lease`
    and any other coordination primitive that needs a single-writer
    guarantee with bounded recovery.
    """
    ...
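
The single-writer guarantee described above can be illustrated with a minimal sketch. `TinyStore` and `try_acquire` are hypothetical names invented for this example and are not part of Heddle; the store mimics only the documented contract (mandatory TTL, `True` for the winner, `False` for the loser), and the 30-second TTL is an arbitrary choice.

```python
import asyncio
import time


class TinyStore:
    """Hypothetical in-memory stand-in that follows the documented contract."""

    def __init__(self) -> None:
        self._data: dict[str, tuple[str, float]] = {}

    async def set_if_not_exists(self, key: str, value: str, ttl_seconds: int) -> bool:
        entry = self._data.get(key)
        if entry is not None and time.monotonic() <= entry[1]:
            return False  # a live entry exists: caller lost the race
        self._data[key] = (value, time.monotonic() + ttl_seconds)
        return True  # caller won the race


async def try_acquire(store: TinyStore, lease_key: str, owner: str) -> bool:
    """Hypothetical helper: take a lease that expires on its own after 30 s."""
    return await store.set_if_not_exists(lease_key, owner, ttl_seconds=30)


async def demo() -> list[bool]:
    store = TinyStore()
    # Two contenders ask for the same lease; only the first one wins.
    return [
        await try_acquire(store, "lease:goal-1", "worker-a"),
        await try_acquire(store, "lease:goal-1", "worker-b"),
    ]
```

Because the TTL is mandatory, a holder that crashes without releasing the lease blocks other contenders only until the entry expires.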

aclose async

aclose() -> None

Release any I/O resources held by this store.

Subclasses that hold open client connections override this to close them. Idempotent — safe to call more than once. Default is a no-op.

Source code in src/heddle/core/kvstore.py
async def aclose(self) -> None:  # noqa: B027 — intentional no-op default
    """Release any I/O resources held by this store.

    Subclasses that hold open client connections override this to close
    them. Idempotent — safe to call more than once. Default is a no-op.
    """

scope

scope(prefix: str) -> ScopedKeyValueStore

Return a view that transparently prepends prefix to all keys.

The view implements the same KeyValueStore interface, so it nests cleanly — a scoped view of a scoped view concatenates the prefixes. aclose() on a scoped view is a no-op; the underlying store owns the connection.

Source code in src/heddle/core/kvstore.py
def scope(self, prefix: str) -> ScopedKeyValueStore:
    """Return a view that transparently prepends ``prefix`` to all keys.

    The view implements the same ``KeyValueStore`` interface, so it nests
    cleanly — a scoped view of a scoped view concatenates the prefixes.
    ``aclose()`` on a scoped view is a no-op; the underlying store owns
    the connection.
    """
    return ScopedKeyValueStore(self, prefix)

Result Stream

stream

Streaming result collection for orchestrators.

Provides ResultStream, an async iterator that yields TaskResult objects as they arrive from the message bus — rather than blocking until all results are collected.

Lifecycle (publish-before-subscribe race avoidance)::

stream = ResultStream(bus, subject, expected_ids, timeout)
async with stream:                # subscribes
    await dispatch_subtasks(...)  # safe to publish now
    results = await stream.collect_all()

Subscribing BEFORE the caller publishes any task whose result we expect is mandatory — NATS is at-most-once. If a fast worker publishes its result onto heddle.results.{goal_id} before we have an active subscription, that result is lost and the goal will time out. The async with block makes the ordering explicit at every call site.
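
To make the race concrete, here is a small sketch with a toy at-most-once bus. `ToyBus` is a hypothetical in-memory model, not the Heddle `MessageBus` or a NATS client; it only reproduces the property that a message published with no active subscriber is dropped.

```python
import asyncio


class ToyBus:
    """Hypothetical at-most-once bus: messages with no subscriber are dropped."""

    def __init__(self) -> None:
        self._queues: dict[str, list[asyncio.Queue]] = {}

    def subscribe(self, subject: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.setdefault(subject, []).append(queue)
        return queue

    def publish(self, subject: str, payload: str) -> None:
        # No buffering: if nobody is subscribed right now, the payload is lost.
        for queue in self._queues.get(subject, []):
            queue.put_nowait(payload)


async def demo() -> tuple[int, int]:
    subject = "heddle.results.goal-1"

    # Wrong order: the "worker" publishes before the subscription exists.
    bus = ToyBus()
    bus.publish(subject, "result-1")
    late = bus.subscribe(subject)

    # Right order: subscribe first, then publish.
    bus = ToyBus()
    early = bus.subscribe(subject)
    bus.publish(subject, "result-1")

    return late.qsize(), early.qsize()
```

In this model the late subscriber ends up with an empty queue while the early subscriber holds the result, which is the situation the `async with` ordering is meant to prevent.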

Two consumption modes:

1. **Batch** (backward compatible with pre-Strategy-A code)::

       async with ResultStream(bus, subject, expected_ids, timeout) as stream:
           await dispatch(...)
           results = await stream.collect_all()

2. **Incremental** — enables progress callbacks and early exit::

       async with ResultStream(bus, subject, ids, timeout,
                               on_result=my_progress_callback) as stream:
           await dispatch(...)
           async for result in stream:
               # process each result as it arrives
               ...

The on_result callback is invoked for every arriving result with the signature (result, collected_count, expected_count) -> bool | None. Returning True signals early exit — the stream stops collecting and the caller gets whatever has arrived so far.

This module is used by:

  • OrchestratorActor._collect_results() — dynamic orchestrator
  • Potentially by MCPBridge for richer progress reporting (future)

Design decisions:

  • Subscribe-before-publish enforced: callers must use async with (or explicit start()). Iterating without entering the context raises a RuntimeError rather than silently lazy-subscribing; the lazy form was the original race and is now treated as a bug.
  • Single-use: a ResultStream can only be iterated once (it owns the bus subscription lifecycle).
  • Callback errors are non-fatal: if on_result raises, the error is logged and collection continues.
  • Duplicate filtering: results for the same task_id are silently skipped (at-least-once delivery tolerance).
  • Unknown task_ids are ignored: only results matching expected_task_ids are collected.
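
The last two filtering rules can be sketched as a single predicate. The `accept` function is a hypothetical helper written for this example; the actual filtering lives inside the stream and is not shown in this section.

```python
def accept(task_id: str, expected: frozenset[str], collected: dict[str, object]) -> bool:
    """Hypothetical filter: keep a result only if it is expected and new."""
    if task_id not in expected:
        return False  # unknown task_id: ignore
    if task_id in collected:
        return False  # duplicate delivery: skip silently
    return True


def demo() -> list[str]:
    expected = frozenset({"task-1", "task-2"})
    collected: dict[str, object] = {}
    # "task-9" is unknown and the second "task-1" is a duplicate.
    for task_id in ["task-1", "task-9", "task-1", "task-2"]:
        if accept(task_id, expected, collected):
            collected[task_id] = f"result for {task_id}"
    return list(collected)
```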

ResultCallback

Bases: Protocol

Callback invoked when a result arrives during streaming collection.

Parameters

result : TaskResult
    The just-arrived result.
collected : int
    How many results have been collected so far (including this one).
expected : int
    Total number of expected results.

Returns:

bool | None
    Return True to signal early exit (stop collecting). Return None or False to continue.

ResultStream

ResultStream(bus: MessageBus, subject: str, expected_task_ids: set[str], timeout: float, *, on_result: ResultCallback | None = None)

Async iterator that yields TaskResult objects as they arrive.

Wraps a bus subscription for a specific result subject, filtering incoming messages to only those matching expected_task_ids.

The stream terminates when:

  • All expected results have arrived, OR
  • The timeout expires, OR
  • The on_result callback returns True (early exit), OR
  • The subscription is closed.

After iteration, inspect collected, timed_out, and early_exited for post-mortem state.
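
The first two termination conditions can be sketched with a plain `asyncio.Queue`. This is not the `_stream` implementation (which is not shown here); `collect` is a hypothetical loop that assumes the timeout is one overall deadline for the whole collection, as the `timeout` parameter description suggests.

```python
import asyncio


async def collect(
    queue: asyncio.Queue, expected: set[str], timeout: float
) -> tuple[dict[str, str], bool]:
    """Hypothetical loop: stop when all expected IDs arrive or the deadline passes."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    collected: dict[str, str] = {}
    timed_out = False
    while len(collected) < len(expected):
        remaining = deadline - loop.time()
        if remaining <= 0:
            timed_out = True
            break
        try:
            task_id, payload = await asyncio.wait_for(queue.get(), remaining)
        except asyncio.TimeoutError:
            timed_out = True
            break
        if task_id in expected and task_id not in collected:
            collected[task_id] = payload
    return collected, timed_out


async def demo() -> tuple[dict[str, str], bool]:
    queue: asyncio.Queue = asyncio.Queue()
    queue.put_nowait(("task-1", "ok"))  # task-2 never arrives
    return await collect(queue, {"task-1", "task-2"}, timeout=0.05)
```

The returned flag plays the role of `timed_out`: the caller still receives whatever arrived before the deadline and can decide how to treat the missing results.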

Parameters

bus : MessageBus
    The message bus to subscribe on.
subject : str
    NATS subject to subscribe to (e.g. heddle.results.{goal_id}).
expected_task_ids : set[str]
    Set of task_ids we expect results for.
timeout : float
    Maximum seconds to wait for all results.
on_result : ResultCallback | None
    Optional callback invoked as each result arrives.

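Example::

stream = ResultStream(
    bus=nats_bus,
    subject=f"heddle.results.{goal_id}",
    expected_task_ids={"task-1", "task-2", "task-3"},
    timeout=60.0,
    on_result=my_progress_handler,
)

# Batch mode (drop-in replacement for old collect).
# Subscribe first via async with, then dispatch, then collect:
async with stream:
    await dispatch(...)
    results = await stream.collect_all()

# Or streaming mode. Use a fresh ResultStream here: each instance is
# single-use, so one that was already consumed above cannot be reused.
async with stream:
    await dispatch(...)
    async for result in stream:
        print(f"Got {result.worker_type}: {result.status}")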
Source code in src/heddle/orchestrator/stream.py
def __init__(
    self,
    bus: MessageBus,
    subject: str,
    expected_task_ids: set[str],
    timeout: float,
    *,
    on_result: ResultCallback | None = None,
) -> None:
    self._bus = bus
    self._subject = subject
    self._expected_ids = frozenset(expected_task_ids)
    self._timeout = timeout
    self._on_result = on_result

    # Mutable state — populated during iteration.
    self._collected: dict[str, TaskResult] = {}
    self._timed_out: bool = False
    self._early_exited: bool = False
    self._consumed: bool = False
    # Subscription lifecycle.  ``_sub`` is set by ``start()``/``__aenter__``
    # and cleared by ``aclose()``/``__aexit__``.  Iterating without
    # entering the context raises (see ``__aiter__``) — this is the
    # publish-before-subscribe race fix: callers MUST subscribe
    # before they publish any task whose result we expect.
    self._sub: Subscription | None = None

collected property

collected: dict[str, TaskResult]

Map of task_id → TaskResult for all collected results.

expected_count property

expected_count: int

Number of results we expect.

collected_count property

collected_count: int

Number of results collected so far.

all_collected property

all_collected: bool

True when every expected result has arrived.

timed_out property

timed_out: bool

True if collection ended due to timeout.

early_exited property

early_exited: bool

True if collection ended due to on_result callback signaling stop.

pending_ids property

pending_ids: frozenset[str]

Task IDs that were expected but never arrived.
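
The read-only properties above are all derivable from the expected ID set and the collected map. The sketch below shows one way to compute them; `summarize` is a hypothetical helper, not part of `ResultStream`, and it assumes `collected` only ever contains expected IDs.

```python
def summarize(expected: frozenset[str], collected: dict[str, object]) -> dict[str, object]:
    """Hypothetical helper mirroring the documented properties."""
    pending = expected.difference(collected)  # expected IDs with no result yet
    return {
        "expected_count": len(expected),
        "collected_count": len(collected),
        "all_collected": not pending,
        "pending_ids": pending,
    }
```

After a timeout, `pending_ids` is the natural place to look when deciding which subtasks to retry or report as missing.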

start async

start() -> ResultStream

Subscribe to the bus subject.

MUST be called before the caller publishes any task whose result is expected on this subject — NATS is at-most-once. Prefer async with stream: (which calls start on __aenter__) to make the ordering explicit at the call site.

Not idempotent: calling start() again while a subscription is active raises RuntimeError. Returns self to allow stream = await ResultStream(...).start() if the caller prefers that style.

Source code in src/heddle/orchestrator/stream.py
async def start(self) -> ResultStream:
    """Subscribe to the bus subject.

    MUST be called before the caller publishes any task whose result
    is expected on this subject — NATS is at-most-once.  Prefer
    ``async with stream:`` (which calls ``start`` on ``__aenter__``)
    to make the ordering explicit at the call site.

    Not idempotent: calling ``start()`` again while a subscription is
    active raises ``RuntimeError``.  Returns ``self`` to allow
    ``stream = await ResultStream(...).start()`` if the caller
    prefers that style.
    """
    if self._sub is not None:
        raise RuntimeError("ResultStream.start() called twice")
    self._sub = await self._bus.subscribe(self._subject)
    return self

aclose async

aclose() -> None

Release the subscription. Idempotent.

Safe to call from a finally block; the second call is a no-op.

Source code in src/heddle/orchestrator/stream.py
async def aclose(self) -> None:
    """Release the subscription.  Idempotent.

    Safe to call from a ``finally`` block; the second call is a
    no-op.
    """
    if self._sub is not None:
        sub = self._sub
        self._sub = None
        await sub.unsubscribe()
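
The idempotency of `aclose()` comes from clearing the reference before awaiting the unsubscribe. The sketch below isolates that pattern; `FakeSubscription` and `Holder` are hypothetical classes used only to count how often `unsubscribe()` runs.

```python
import asyncio


class FakeSubscription:
    """Hypothetical subscription that counts unsubscribe calls."""

    def __init__(self) -> None:
        self.unsubscribed = 0

    async def unsubscribe(self) -> None:
        self.unsubscribed += 1


class Holder:
    """Hypothetical owner of a subscription, closed the same way as above."""

    def __init__(self, sub: FakeSubscription) -> None:
        self._sub: FakeSubscription | None = sub

    async def aclose(self) -> None:
        if self._sub is not None:
            sub = self._sub
            self._sub = None  # clear first so a repeated call is a no-op
            await sub.unsubscribe()


async def demo() -> int:
    sub = FakeSubscription()
    holder = Holder(sub)
    await holder.aclose()
    await holder.aclose()  # second call does nothing
    return sub.unsubscribed
```

Clearing the attribute before the `await` also means a second caller that runs while the first is suspended in `unsubscribe()` sees `None` and returns immediately.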

__aenter__ async

__aenter__() -> ResultStream

Subscribe so the caller can publish without losing fast results.

Source code in src/heddle/orchestrator/stream.py
async def __aenter__(self) -> ResultStream:
    """Subscribe so the caller can publish without losing fast results."""
    await self.start()
    return self

__aexit__ async

__aexit__(exc_type: type[BaseException] | None, exc: BaseException | None, tb: TracebackType | None) -> None

Release the subscription regardless of how the block exits.

Source code in src/heddle/orchestrator/stream.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Release the subscription regardless of how the block exits."""
    await self.aclose()

collect_all async

collect_all() -> list[TaskResult]

Consume the stream fully, returning all collected results as a list.

Must be called from inside an async with block (or after an explicit start()).

Source code in src/heddle/orchestrator/stream.py
async def collect_all(self) -> list[TaskResult]:
    """Consume the stream fully, returning all collected results as a list.

    Must be called from inside an ``async with`` block (or after an
    explicit ``start()``).
    """
    return [result async for result in self]

__aiter__

__aiter__() -> AsyncIterator[TaskResult]

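Return the async iterator produced by _stream(). Raises RuntimeError if the stream has not been started or has already been consumed.

Source code in src/heddle/orchestrator/stream.py
def __aiter__(self) -> AsyncIterator[TaskResult]:
    """Return the async iterator produced by ``_stream()``.

    Raises ``RuntimeError`` if the stream has not been started or has
    already been consumed.
    """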
    if self._sub is None:
        raise RuntimeError(
            "ResultStream must be started before iteration. "
            "Use 'async with stream:' or 'await stream.start()' "
            "BEFORE publishing tasks whose results you expect — "
            "subscribing afterwards loses any result that the worker "
            "publishes between dispatch and the first iteration."
        )
    if self._consumed:
        raise RuntimeError(
            "ResultStream has already been consumed. "
            "Create a new ResultStream for another iteration."
        )
    self._consumed = True
    return self._stream()
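
The two guards can be exercised in isolation with a small sketch. `OneShot` is a hypothetical class that reproduces only the started/consumed checks; it has no bus, no timeout, and a synchronous `start()` for brevity.

```python
import asyncio
from collections.abc import AsyncIterator


class OneShot:
    """Hypothetical single-use async iterable guarded like the code above."""

    def __init__(self, items: list[str]) -> None:
        self._items = items
        self._started = False
        self._consumed = False

    def start(self) -> "OneShot":
        self._started = True
        return self

    def __aiter__(self) -> AsyncIterator[str]:
        if not self._started:
            raise RuntimeError("must be started before iteration")
        if self._consumed:
            raise RuntimeError("already consumed")
        self._consumed = True
        return self._stream()

    async def _stream(self) -> AsyncIterator[str]:
        for item in self._items:
            yield item


async def demo() -> tuple[list[str], str]:
    shot = OneShot(["a", "b"]).start()
    first = [item async for item in shot]
    try:
        [item async for item in shot]  # second pass hits the consumed guard
    except RuntimeError as exc:
        return first, str(exc)
    return first, "no error"
```

Raising from `__aiter__` rather than from the first `__anext__` surfaces the misuse at the `async for` statement itself, before any waiting happens.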