Skip to content

Worker

The heddle.worker package implements the two types of Heddle workers:

  • LLM Workers (runner.py) — call language models with system prompts, tool-use loops, and JSON parsing. Used for summarization, classification, extraction, and analysis tasks.
  • Processor Workers (processor.py) — run arbitrary Python code (no LLM). Used for data transformation, ingestion, and integration tasks.

Both types are stateless: they process one task, return a result, and reset.

See Building Workflows for the user-facing guide.

Base

Abstract base class for all workers (TaskWorker).

base

TaskWorker base class.

Extracts the reusable worker lifecycle from the LLM-specific worker: message parsing, I/O contract validation, result publishing, timing, and error handling. Subclasses implement process() to do the actual work.

TaskWorker

TaskWorker(actor_id: str, config_path: str, nats_url: str = 'nats://nats:4222')

Bases: BaseActor

Generic stateless worker base.

Lifecycle per message: (1) receive a TaskMessage; (2) validate the input against the worker contract; (3) delegate to process() (implemented by the subclass); (4) validate the output against the worker contract; (5) publish a TaskResult; (6) reset (no state retained).

Source code in src/heddle/worker/base.py
def __init__(
    self,
    actor_id: str,
    config_path: str,
    nats_url: str = "nats://nats:4222",
) -> None:
    """Set up the actor and load the worker configuration from disk.

    Args:
        actor_id: Identifier forwarded to ``BaseActor``.
        config_path: Path of the worker config file; remembered so that
            ``on_reload()`` can re-read the same file later.
        nats_url: NATS server URL forwarded to ``BaseActor``.
    """
    super().__init__(actor_id, nats_url)
    # Store the path first; the config is then loaded from the stored value.
    self._config_path = config_path
    self.config = self._load_config(self._config_path)

on_reload async

on_reload() -> None

Re-read the worker config from disk on reload signal.

Source code in src/heddle/worker/base.py
async def on_reload(self) -> None:
    """Reload ``self.config`` from the path supplied at construction time.

    If ``_load_config`` raises, the exception propagates and
    ``self.config`` keeps its previous value (the assignment never runs).
    """
    source_path = self._config_path
    self.config = self._load_config(source_path)
    logger.info("worker.config_reloaded", config_path=source_path)

handle_message async

handle_message(data: dict[str, Any]) -> None

Handle an incoming task message through the full worker lifecycle.

Source code in src/heddle/worker/base.py
async def handle_message(self, data: dict[str, Any]) -> None:
    """Handle an incoming task message through the full worker lifecycle.

    Parses ``data`` into a ``TaskMessage``, validates the payload against
    the configured ``input_schema``, delegates to ``process()``, validates
    the returned ``output`` against ``output_schema``, and publishes a
    ``TaskResult`` via ``_publish_result``. ``reset()`` runs afterwards on
    every path that enters the ``try`` block.

    Args:
        data: Raw message dict; must be accepted by ``TaskMessage(**data)``.

    Raises:
        Exceptions from ``TaskMessage(**data)`` propagate directly. Parsing
        happens before the ``try`` block, so in that case no result is
        published and ``reset()`` is not called. Exceptions raised by the
        ``_publish_result`` call inside the ``except`` branch also propagate
        (after ``reset()``). Non-``Exception`` errors (e.g.
        ``asyncio.CancelledError``) are not caught and propagate after
        ``reset()``.
    """
    task = TaskMessage(**data)
    start = time.monotonic()

    # Record the task arrival metric immediately after parse. Pairs with
    # ``record_task_completed`` in ``_publish_result``; subtracting one
    # from the other yields in-flight task depth per (worker_type, tier).
    record_task_received(
        worker_type=task.worker_type,
        model_tier=task.model_tier.value,
    )

    log = logger.bind(
        task_id=task.task_id,
        worker_type=task.worker_type,
        model_tier=task.model_tier.value,
    )

    # ``_elapsed_ms()`` is computed at every ``_publish_result`` call
    # site so the duration histogram captures worst-case latency on
    # failure paths too, not just the happy path.
    def _elapsed_ms() -> int:
        return int((time.monotonic() - start) * 1000)

    try:
        # 1. Validate input
        errors = validate_input(task.payload, self.config.get("input_schema", {}))
        if errors:
            await self._publish_result(
                task,
                TaskStatus.FAILED,
                error=f"Input validation: {errors}",
                elapsed=_elapsed_ms(),
            )
            return

        # 2. Delegate to subclass — inject model_tier into metadata
        #    so process() can resolve the correct LLM backend.
        #    The task's own tier overrides any "model_tier" already
        #    present in task.metadata (it is the last key in the merge).
        enriched_metadata = {**task.metadata, "model_tier": task.model_tier.value}
        result = await self.process(task.payload, enriched_metadata)

        # 3. Validate output. A result without an "output" key raises
        #    KeyError here, which the ``except`` below turns into FAILED.
        output = result["output"]
        output_errors = validate_output(output, self.config.get("output_schema", {}))
        if output_errors:
            # NOTE(review): result.get("metadata") (e.g. degraded_modes)
            # is not forwarded on this failure path — confirm intended.
            await self._publish_result(
                task,
                TaskStatus.FAILED,
                error=f"Output validation: {output_errors}",
                model_used=result.get("model_used"),
                tokens=result.get("token_usage"),
                elapsed=_elapsed_ms(),
            )
            return

        # 4. Publish success
        elapsed = _elapsed_ms()
        await self._publish_result(
            task,
            TaskStatus.COMPLETED,
            output=output,
            model_used=result.get("model_used"),
            tokens=result.get("token_usage"),
            metadata=result.get("metadata"),
            elapsed=elapsed,
        )
        log.info("worker.completed", ms=elapsed)

    except Exception as e:
        # NOTE(review): if one of the ``_publish_result`` calls in the
        # try-block itself raised, this makes a second publish attempt
        # (now as FAILED) for the same task.
        log.error("worker.exception", error=str(e))
        await self._publish_result(task, TaskStatus.FAILED, error=str(e), elapsed=_elapsed_ms())
    finally:
        # Reset — worker holds NO state from this task.
        # This is a design invariant, not a suggestion. Any instance
        # variables set during process() must not affect subsequent
        # invocations.  ``finally`` ensures reset runs even when the
        # try-block exits early via ``return`` on input/output
        # validation failure, or when ``_publish_result`` raises.
        await self.reset()

process abstractmethod async

process(payload: dict[str, Any], metadata: dict[str, Any]) -> dict[str, Any]

Process a task payload. Subclasses implement this.

Parameters:

Name Type Description Default
payload dict[str, Any]

Validated input dict (matches input_schema).

required
metadata dict[str, Any]

Task metadata dict (routing hints, pipeline context, etc.).

required

Returns:

Type Description
dict[str, Any]

A dict with the following structure::

{
    "output": dict,              # Must match output_schema
    "model_used": str | None,    # Identifier for what processed this
    "token_usage": dict | None,  # {"prompt_tokens": int, ...} or empty
    "metadata": dict | None,     # Optional worker-side observability
                                 #   (e.g. {"degraded_modes": [...]}),
                                 #   surfaced on TaskResult.metadata
}

Source code in src/heddle/worker/base.py
@abstractmethod
async def process(self, payload: dict[str, Any], metadata: dict[str, Any]) -> dict[str, Any]:
    """Perform the worker-specific work for a single task.

    Concrete workers must override this coroutine. The base
    implementation does nothing and returns ``None``.

    Args:
        payload: Task input that has already passed ``input_schema``
            validation.
        metadata: Task metadata such as routing hints and pipeline context.

    Returns:
        A mapping shaped like::

            {
                "output": dict,              # validated against output_schema
                "model_used": str | None,    # what produced the output
                "token_usage": dict | None,  # e.g. {"prompt_tokens": int, ...}
                "metadata": dict | None,     # optional observability data,
                                             #   e.g. {"degraded_modes": [...]},
                                             #   copied to TaskResult.metadata
            }
    """

reset async

reset() -> None

Post-task cleanup hook for subclasses.

Called after every task (success or failure) to release temporary resources (file handles, caches, scratch buffers). The default implementation is a no-op. Override in subclasses that allocate per-task resources in process().

This is the enforcement point for the statelessness invariant: any state set during process() must be cleared here.

Source code in src/heddle/worker/base.py
async def reset(self) -> None:
    """Clean up after a task; the base implementation does nothing.

    This hook is awaited once per task, whether the task succeeded or
    failed. Subclasses that acquire per-task resources inside
    ``process()`` (open files, caches, scratch buffers) override it to
    release them. It is where the "workers keep no state between tasks"
    rule is enforced: anything ``process()`` stored on the instance
    should be cleared here.
    """

Runner

LLMWorker — the main LLM worker actor. Includes execute_with_tools(), the standalone tool-use loop shared with the Workshop test bench.

runner

LLM worker actor.

Processes a single task via an LLM backend and resets. No state carries between tasks — this is enforced, not optional.

Supports tool-use: when knowledge_silos include tool-type entries, the worker offers those tools to the LLM and executes a multi-turn loop until the LLM produces a final text answer.

LLMWorker

LLMWorker(actor_id: str, config_path: str, backends: dict[str, LLMBackend], nats_url: str = 'nats://nats:4222')

Bases: TaskWorker

LLM-backed stateless worker.

Extends TaskWorker with LLM-specific logic:
- Builds the prompt from system_prompt + JSON payload
- Loads knowledge silos (folder content → system prompt, tools → function-calling)
- Calls the appropriate LLM backend by model tier
- Executes a multi-turn tool-use loop when tools are available
- Parses JSON output from the LLM response (with fence-stripping fallback)
- Applies silo_updates for writable folder silos

Source code in src/heddle/worker/runner.py
def __init__(
    self,
    actor_id: str,
    config_path: str,
    backends: dict[str, LLMBackend],
    nats_url: str = "nats://nats:4222",
) -> None:
    """Create an LLM worker bound to a set of tier-keyed backends.

    Args:
        actor_id: Identifier forwarded to ``TaskWorker``.
        config_path: Worker config file path forwarded to ``TaskWorker``.
        backends: Mapping of model tier name to the backend used for that
            tier; ``process()`` looks the task's tier up here and
            ``disconnect()`` closes every entry.
        nats_url: NATS server URL forwarded to ``TaskWorker``.
    """
    super().__init__(actor_id, config_path, nats_url)
    # Owned by this worker: released in disconnect().
    self.backends = backends

disconnect async

disconnect() -> None

Disconnect from the bus and close all owned LLM backends.

Backends are closed best-effort: a failure on one does not prevent the others from being closed, and bus disconnection runs first so an exception here cannot leak the NATS connection.

Source code in src/heddle/worker/runner.py
async def disconnect(self) -> None:
    """Leave the bus, then close every LLM backend this worker owns.

    The bus disconnect happens first. Backend cleanup sits in a
    ``finally`` so it runs even if that disconnect raises. Each backend
    is closed independently: a failing ``aclose()`` is logged as a
    warning and the loop moves on to the next backend.
    """
    try:
        await super().disconnect()
    finally:
        for tier_key, llm_backend in self.backends.items():
            try:
                await llm_backend.aclose()
            except Exception as exc:
                # Best-effort: one bad backend must not leak the others.
                logger.warning(
                    "worker.backend_close_failed",
                    tier=tier_key,
                    backend=type(llm_backend).__name__,
                    error=str(exc),
                )

process async

process(payload: dict[str, Any], metadata: dict[str, Any]) -> dict[str, Any]

Build prompt, call LLM with tool-use loop, and parse structured output.

Source code in src/heddle/worker/runner.py
async def process(  # noqa: PLR0915
    self, payload: dict[str, Any], metadata: dict[str, Any]
) -> dict[str, Any]:
    """Build prompt, call LLM with tool-use loop, and parse structured output.

    Steps:
      1. Assemble the system prompt from ``config["system_prompt"]``,
         prepending loaded knowledge silos and legacy knowledge sources,
         and resolve configured workspace file references into
         ``<field>_content`` payload keys.
      2. Load tool providers from ``knowledge_silos``.
      3. Select the backend for ``metadata["model_tier"]``, falling back to
         the config's default tier.
      4. Run ``execute_with_tools``.
      5. Parse the final text response as JSON.
      6. Apply any ``silo_updates`` write-backs.

    Args:
        payload: Validated task input; serialized as the user message.
        metadata: Task metadata; its ``model_tier`` selects the backend.

    Returns:
        ``{"output", "model_used", "token_usage", "metadata"}`` where
        ``metadata`` is ``{"degraded_modes": [...]}`` when an optional
        knowledge resource was skipped, otherwise ``{}``.

    Raises:
        ValueError: A configured file reference could not be read or
            parsed, or the LLM produced no text response.
        RuntimeError: No backend is registered for the resolved tier.
    """
    # 1. Build prompt
    system_prompt = self.config["system_prompt"]

    # Degraded-mode accumulator: when an *optional* knowledge resource
    # (silo with ``required: false`` or source with same) fails to
    # load, the loaders append a {kind, name, reason} dict here and
    # we surface the list on TaskResult.metadata so callers can
    # detect "ran without resource X" without having to scrape logs.
    # Required failures raise RequiredKnowledgeMissingError, which
    # bubbles up to TaskWorker.handle_message and becomes a FAILED
    # task — that's the strict-by-default behaviour from F1.
    degraded: list[dict[str, Any]] = []

    # 1a. Knowledge silo injection — load folder silos into system prompt
    silos = self.config.get("knowledge_silos", [])
    if silos:
        from heddle.worker.knowledge import load_knowledge_silos

        silo_text = load_knowledge_silos(silos, skipped=degraded)
        if silo_text:
            system_prompt = silo_text + "\n\n" + system_prompt

    # 1b. Legacy knowledge injection — prepend loaded knowledge to system prompt
    knowledge_sources = self.config.get("knowledge_sources", [])
    if knowledge_sources:
        from heddle.worker.knowledge import load_knowledge_sources

        knowledge_text = load_knowledge_sources(knowledge_sources, skipped=degraded)
        if knowledge_text:
            system_prompt = knowledge_text + "\n\n" + system_prompt

    # 1c. File-ref resolution — read workspace files and inject content.
    #     Failures here MUST fail the task: silently continuing produced
    #     LLM output for inputs the worker never actually saw, which the
    #     orchestrator then trusted as "successful processing of file X".
    #     Re-raising lets ``TaskWorker.handle_message`` publish a FAILED
    #     ``TaskResult`` with the offending file path in ``error``.
    workspace_dir = self.config.get("workspace_dir")
    file_ref_fields = self.config.get("resolve_file_refs", [])
    if workspace_dir and file_ref_fields:
        from heddle.core.workspace import WorkspaceManager

        # Detach from the caller's payload before mutating: shallow-copy
        # and rebind locally so the ``_content`` keys we add never leak
        # back into the ``TaskMessage.payload`` the actor framework holds
        # (worker statelessness — adjacent tasks must not see each other's
        # resolved content).
        payload = dict(payload)
        ws = WorkspaceManager(workspace_dir)
        for field in file_ref_fields:
            if field in payload:
                try:
                    content = ws.read_json(payload[field])
                    payload[f"{field}_content"] = content
                except (OSError, ValueError) as e:
                    # OSError covers FileNotFoundError as well as permission
                    # and is-a-directory errors; ValueError also covers
                    # json.JSONDecodeError (a subclass). All of them must
                    # fail the task with the offending path in the message.
                    logger.error(
                        "worker.file_ref_resolution_failed",
                        field=field,
                        file_ref=payload[field],
                        error=str(e),
                    )
                    raise ValueError(
                        f"Failed to resolve file_ref '{payload[field]}' "
                        f"for field '{field}': {e}"
                    ) from e

    user_message = json.dumps(payload, indent=2)

    # 2. Load tool providers from knowledge_silos
    tool_providers = _load_tool_providers(silos, skipped=degraded)
    tool_defs = [p.get_definition() for p in tool_providers.values()] or None

    # 3. Resolve backend from task metadata or config default.
    #    TaskWorker.handle_message always injects "model_tier"; the config
    #    fallback applies to other callers that omit it.
    tier = metadata.get(
        "model_tier",
        self.config.get("default_model_tier", self.config.get("default_tier", "standard")),
    )
    backend = self.backends.get(tier)
    if not backend:
        raise RuntimeError(f"No backend for tier: {tier}")

    # 4. Call LLM with tool-use loop
    logger.info("worker.calling_llm", tier=tier, tools=len(tool_providers))
    max_tokens = self.config.get("max_output_tokens", 2000)
    tool_timeout = float(self.config.get("tool_timeout_seconds", DEFAULT_TOOL_TIMEOUT_SECONDS))
    # execute_with_tools only applies the bound when the timeout is > 0,
    # so any non-positive value (not just 0) leaves tools unbounded.
    if tool_timeout <= 0 and tool_providers:
        logger.warning(
            "worker.tool_timeout_disabled",
            worker_type=self.config.get("name", "unknown"),
            hint="tool_timeout_seconds<=0 disables per-tool bound",
        )
    result = await execute_with_tools(
        backend=backend,
        system_prompt=system_prompt,
        user_message=user_message,
        tool_providers=tool_providers,
        tool_defs=tool_defs,
        max_tokens=max_tokens,
        tool_timeout_seconds=tool_timeout,
    )
    total_prompt_tokens = result["prompt_tokens"]
    total_completion_tokens = result["completion_tokens"]

    # 4b. Log token usage for cost tracking.
    logger.info(
        "worker.llm_usage",
        worker_type=self.config.get("name", "unknown"),
        model_used=result.get("model", "unknown"),
        input_tokens=total_prompt_tokens,
        output_tokens=total_completion_tokens,
    )

    # 5. Parse JSON output — handles markdown fences and preamble text
    if result.get("content") is None:
        raise ValueError("LLM did not produce a text response after tool-use loop")

    # NOTE(review): ``.pop`` below assumes _extract_json returns a dict —
    # confirm it rejects top-level JSON arrays.
    output = _extract_json(result["content"])

    # 6. Process silo_updates — apply write-back to writable folder silos.
    #    Popped first so the key never reaches output_schema validation.
    silo_updates = output.pop("silo_updates", None)
    if silo_updates:
        from heddle.worker.knowledge import apply_silo_updates

        apply_silo_updates(silo_updates, silos)

    return {
        "output": output,
        "model_used": result["model"],
        "token_usage": {
            "prompt_tokens": total_prompt_tokens,
            "completion_tokens": total_completion_tokens,
        },
        # Empty when no optional resource was skipped — keeps
        # TaskResult.metadata clean for the happy path.
        "metadata": {"degraded_modes": degraded} if degraded else {},
    }

execute_with_tools async

execute_with_tools(backend: LLMBackend, system_prompt: str, user_message: str, tool_providers: dict[str, ToolProvider], tool_defs: list[dict[str, Any]] | None, max_tokens: int = 2000, tool_timeout_seconds: float = DEFAULT_TOOL_TIMEOUT_SECONDS) -> dict[str, Any]

Execute an LLM call with multi-round tool-use loop.

This function encapsulates the core LLM interaction pattern used by both LLMWorker.process() and WorkerTestRunner.run(). It handles:

  • Initial LLM call with optional tool definitions
  • Multi-turn tool execution loop (up to MAX_TOOL_ROUNDS)
  • Per-tool execution timeout (default DEFAULT_TOOL_TIMEOUT_SECONDS)
  • Token count aggregation across rounds
  • Error handling for unknown tools and tool execution failures

Parameters:

Name Type Description Default
backend LLMBackend

LLM backend to call (Anthropic, Ollama, OpenAI-compatible).

required
system_prompt str

Full system prompt (with knowledge silo content prepended).

required
user_message str

User message (typically JSON payload).

required
tool_providers dict[str, ToolProvider]

Map of tool name → ToolProvider for execution.

required
tool_defs list[dict[str, Any]] | None

Tool definitions list for the LLM (or None for no tools).

required
max_tokens int

Maximum output tokens per LLM call.

2000
tool_timeout_seconds float

Per-tool execution timeout. 0 disables the bound (subject to MAX_TOOL_ROUNDS and the outer orchestrator timeout). Defaults to DEFAULT_TOOL_TIMEOUT_SECONDS (30s).

DEFAULT_TOOL_TIMEOUT_SECONDS

Returns:

Type Description
dict[str, Any]

Dict with keys: content, model, prompt_tokens, completion_tokens, tool_calls, stop_reason. Token counts are aggregated across all tool-use rounds.

Source code in src/heddle/worker/runner.py
async def execute_with_tools(  # noqa: PLR0912, PLR0915
    backend: LLMBackend,
    system_prompt: str,
    user_message: str,
    tool_providers: dict[str, ToolProvider],
    tool_defs: list[dict[str, Any]] | None,
    max_tokens: int = 2000,
    tool_timeout_seconds: float = DEFAULT_TOOL_TIMEOUT_SECONDS,
) -> dict[str, Any]:
    """Execute an LLM call with multi-round tool-use loop.

    This function encapsulates the core LLM interaction pattern used by both
    ``LLMWorker.process()`` and ``WorkerTestRunner.run()``.  It handles:

    - Initial LLM call with optional tool definitions
    - Multi-turn tool execution loop (up to ``MAX_TOOL_ROUNDS``)
    - Per-tool execution timeout (default ``DEFAULT_TOOL_TIMEOUT_SECONDS``)
    - Token count aggregation across rounds
    - Error handling for unknown tools and tool execution failures

    Unknown tools, tool timeouts, and tool exceptions do not abort the
    loop. Each is reported back to the LLM as a JSON ``{"error": ...}``
    tool message.

    Args:
        backend: LLM backend to call (Anthropic, Ollama, OpenAI-compatible).
        system_prompt: Full system prompt (with knowledge silo content prepended).
        user_message: User message (typically JSON payload).
        tool_providers: Map of tool name → ToolProvider for execution.
        tool_defs: Tool definitions list for the LLM (or None for no tools).
        max_tokens: Maximum output tokens per LLM call.
        tool_timeout_seconds: Per-tool execution timeout.  Any value
            ``<= 0`` disables the bound (subject to ``MAX_TOOL_ROUNDS`` and
            the outer orchestrator timeout).  Defaults to
            ``DEFAULT_TOOL_TIMEOUT_SECONDS`` (30s).

    Returns:
        The backend's final response dict (keys: content, model,
        prompt_tokens, completion_tokens, tool_calls, stop_reason), with
        the token counts replaced by totals across all tool-use rounds.
        If ``MAX_TOOL_ROUNDS`` is exhausted while the LLM still requests
        tools, that last response is returned as-is. Its ``tool_calls``
        are left unexecuted, and its ``content`` may be ``None``.
    """
    with _tracer.start_as_current_span(
        "llm.call",
        attributes={"llm.max_tokens": max_tokens, "llm.has_tools": tool_defs is not None},
    ) as llm_span:
        result = await backend.complete(
            system_prompt=system_prompt,
            user_message=user_message,
            max_tokens=max_tokens,
            tools=tool_defs,
        )
        # Legacy attributes (backward compat)
        llm_span.set_attribute("llm.model", result.get("model", "unknown"))
        llm_span.set_attribute("llm.prompt_tokens", result.get("prompt_tokens", 0))
        llm_span.set_attribute("llm.completion_tokens", result.get("completion_tokens", 0))

        # OTel GenAI semantic conventions
        # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/
        llm_span.set_attribute("gen_ai.system", result.get("gen_ai_system", "unknown"))
        llm_span.set_attribute("gen_ai.request.model", result.get("gen_ai_request_model", ""))
        llm_span.set_attribute("gen_ai.response.model", result.get("gen_ai_response_model", ""))
        llm_span.set_attribute("gen_ai.usage.input_tokens", result.get("prompt_tokens", 0))
        llm_span.set_attribute("gen_ai.usage.output_tokens", result.get("completion_tokens", 0))
        if result.get("gen_ai_request_temperature") is not None:
            llm_span.set_attribute(
                "gen_ai.request.temperature", result["gen_ai_request_temperature"]
            )
        if result.get("gen_ai_request_max_tokens") is not None:
            llm_span.set_attribute("gen_ai.request.max_tokens", result["gen_ai_request_max_tokens"])

        # Optional content logging (opt-in via env var — may contain PII)
        if os.environ.get("HEDDLE_TRACE_CONTENT", "").lower() in ("1", "true"):
            llm_span.add_event(
                "gen_ai.content.prompt",
                {"gen_ai.prompt": user_message},
            )
            if result.get("content"):
                llm_span.add_event(
                    "gen_ai.content.completion",
                    {"gen_ai.completion": result["content"]},
                )

    total_prompt_tokens = result.get("prompt_tokens", 0)
    total_completion_tokens = result.get("completion_tokens", 0)
    messages: list[dict[str, Any]] | None = None
    rounds = 0

    while result.get("tool_calls") and rounds < MAX_TOOL_ROUNDS:
        rounds += 1
        logger.info("worker.tool_round", round=rounds, calls=len(result["tool_calls"]))

        # Build message history on first tool round
        if messages is None:
            messages = [{"role": "user", "content": user_message}]

        # Append assistant message with tool calls
        assistant_msg: dict[str, Any] = {"role": "assistant", "tool_calls": result["tool_calls"]}
        if result.get("content"):
            assistant_msg["content"] = result["content"]
        messages.append(assistant_msg)

        # Execute each tool call
        for call in result["tool_calls"]:
            tool_name = call["name"]
            provider = tool_providers.get(tool_name)
            if provider is None:
                tool_result = json.dumps({"error": f"Unknown tool: {tool_name}"})
                logger.warning("worker.unknown_tool", tool=tool_name)
            else:
                # Per-tool timeout — a misbehaving ToolProvider
                # (synchronous DuckDB query that hangs, a network
                # call without its own timeout, a deadlocked sync→
                # async bridge) would otherwise wedge the worker
                # until the outer orchestrator timeout fires.  We
                # wrap in ``asyncio.wait_for`` with the config-time
                # ``tool_timeout_seconds``; values <= 0 disable it.
                tool_started = time.monotonic()
                try:
                    if tool_timeout_seconds > 0:
                        tool_result = await asyncio.wait_for(
                            provider.execute(call["arguments"]),
                            timeout=tool_timeout_seconds,
                        )
                    else:
                        tool_result = await provider.execute(call["arguments"])
                except TimeoutError:
                    elapsed_ms = int((time.monotonic() - tool_started) * 1000)
                    tool_result = json.dumps(
                        {
                            "error": (
                                f"Tool '{tool_name}' exceeded {tool_timeout_seconds:.1f}s timeout"
                            ),
                        }
                    )
                    logger.warning(
                        "worker.tool.timeout",
                        tool=tool_name,
                        round=rounds,
                        timeout_seconds=tool_timeout_seconds,
                        elapsed_ms=elapsed_ms,
                    )
                except Exception as e:
                    tool_result = json.dumps({"error": str(e)})
                    logger.error("worker.tool_execution_failed", tool=tool_name, error=str(e))

            messages.append(
                {
                    "role": "tool",
                    "tool_call_id": call["id"],
                    "content": tool_result,
                }
            )

        # Call LLM again with updated message history
        with _tracer.start_as_current_span(
            "llm.tool_continuation",
            attributes={"llm.tool_round": rounds},
        ) as cont_span:
            result = await backend.complete(
                system_prompt=system_prompt,
                user_message=user_message,
                messages=messages,
                max_tokens=max_tokens,
                tools=tool_defs,
            )
            cont_span.set_attribute("llm.prompt_tokens", result.get("prompt_tokens", 0))
            cont_span.set_attribute("llm.completion_tokens", result.get("completion_tokens", 0))
        total_prompt_tokens += result.get("prompt_tokens", 0)
        total_completion_tokens += result.get("completion_tokens", 0)

    # Only warn when the cap actually cut the loop short, i.e. the LLM is
    # still requesting tools after the last permitted round.  A model that
    # finishes on exactly the final round is a normal completion.
    if result.get("tool_calls") and rounds >= MAX_TOOL_ROUNDS:
        logger.warning(
            "worker.max_tool_rounds_reached",
            rounds=rounds,
            pending_tool_calls=len(result["tool_calls"]),
        )

    # Return result with aggregated token counts
    result["prompt_tokens"] = total_prompt_tokens
    result["completion_tokens"] = total_completion_tokens
    return result

Backends

LLM backend implementations: AnthropicBackend, OllamaBackend, OpenAICompatibleBackend. Plus build_backends_from_env() for automatic backend detection from environment variables and ~/.heddle/config.yaml.

backends

LLM backend adapters — uniform interface for local and API models.

Each backend wraps a specific LLM provider's API and normalizes the response into a consistent dict format. Workers never call APIs directly; they always go through a backend.

To add a new backend
  1. Subclass LLMBackend
  2. Implement complete() returning the standard response dict
  3. Register it in cli/main.py's worker command (backend resolution by tier)

All backends use httpx with a 120s timeout. Adjust if your models are slow.

Tool-use support

Backends accept optional tools and messages parameters. When tools is provided, the LLM may return tool_calls instead of content. When messages is provided, it replaces the single user_message for multi-turn conversations (tool execution loop).

LLMBackend

Bases: ABC

Common interface all model backends implement.

complete abstractmethod async

complete(system_prompt: str, user_message: str, max_tokens: int = 2000, temperature: float = 0.0, *, tools: list[dict[str, Any]] | None = None, messages: list[dict[str, Any]] | None = None) -> dict[str, Any]

Complete an LLM request and return a normalized response dict.

Parameters:

Name Type Description Default
system_prompt str

System instructions for the LLM.

required
user_message str

User message (ignored when messages is provided).

required
max_tokens int

Maximum tokens in the response.

2000
temperature float

Sampling temperature.

0.0
tools list[dict[str, Any]] | None

Optional list of tool definitions for function-calling.

None
messages list[dict[str, Any]] | None

Optional full message history for multi-turn. When provided, overrides user_message.

None

Returns:

Type Description
dict[str, Any]

A dict with the following structure::

{
    "content": str | None,      # Text response (None if tool_calls)
    "model": str,               # Model identifier
    "prompt_tokens": int,
    "completion_tokens": int,
    "tool_calls": list | None,  # [{"id": str, "name": str, "arguments": dict}]
    "stop_reason": str | None,  # "end_turn" | "tool_use"
}

Source code in src/heddle/worker/backends.py
@abstractmethod
async def complete(
    self,
    system_prompt: str,
    user_message: str,
    max_tokens: int = 2000,
    temperature: float = 0.0,
    *,
    tools: list[dict[str, Any]] | None = None,
    messages: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Complete an LLM request and return a normalized response dict.

    Args:
        system_prompt: System instructions for the LLM.
        user_message: User message (ignored when ``messages`` is provided).
        max_tokens: Maximum tokens in the response.
        temperature: Sampling temperature.
        tools: Optional list of tool definitions for function-calling.
        messages: Optional full message history for multi-turn. When
            provided, overrides ``user_message``.

    Returns:
        A dict with the following structure::

            {
                "content": str | None,      # Text response (None if tool_calls)
                "model": str,               # Model identifier
                "prompt_tokens": int,
                "completion_tokens": int,
                "tool_calls": list | None,  # [{"id": str, "name": str, "arguments": dict}]
                "stop_reason": str | None,  # "end_turn" | "tool_use"
            }
    """
    ...

aclose async

aclose() -> None

Release any I/O resources held by this backend.

Subclasses that hold open connections (e.g. an httpx.AsyncClient) override this to close them. Idempotent — safe to call more than once. The default is a no-op so test mocks and lightweight subclasses do not need to implement it.

Source code in src/heddle/worker/backends.py
async def aclose(self) -> None:  # noqa: B027 — intentional no-op default
    """Free network or other I/O resources owned by the backend.

    The default does nothing, so test doubles and backends without
    connections need not override it. Backends that keep a live client
    (for example an ``httpx.AsyncClient``) override it to close that
    client, and must tolerate being called repeatedly.
    """

AnthropicBackend

AnthropicBackend(api_key: str, model: str = 'claude-sonnet-4-20250514')

Bases: LLMBackend

Claude API via httpx (Messages API).

Uses the Anthropic Messages API directly via httpx rather than the anthropic Python SDK — this keeps dependencies minimal and avoids version coupling.

Source code in src/heddle/worker/backends.py
def __init__(self, api_key: str, model: str = "claude-sonnet-4-20250514") -> None:
    """Prepare an HTTP client for the Anthropic Messages API.

    Args:
        api_key: Sent on every request in the ``x-api-key`` header.
        model: Model name placed in each request body.
    """
    self.api_key = api_key
    self.model = model
    # Headers shared by every request made through this client.
    default_headers = {
        "x-api-key": api_key,
        "anthropic-version": self.ANTHROPIC_API_VERSION,
        "content-type": "application/json",
    }
    self.client = httpx.AsyncClient(
        base_url="https://api.anthropic.com",
        headers=default_headers,
        timeout=120.0,
    )

complete async

complete(system_prompt: str, user_message: str, max_tokens: int = 2000, temperature: float = 0.0, *, tools: list[dict[str, Any]] | None = None, messages: list[dict[str, Any]] | None = None) -> dict[str, Any]

Complete an LLM request via the Anthropic Messages API.

Source code in src/heddle/worker/backends.py
async def complete(
    self,
    system_prompt: str,
    user_message: str,
    max_tokens: int = 2000,
    temperature: float = 0.0,
    *,
    tools: list[dict[str, Any]] | None = None,
    messages: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Complete an LLM request via the Anthropic Messages API."""
    # Build messages array
    if messages is not None:
        api_messages = _anthropic_messages(messages)
    else:
        api_messages = [{"role": "user", "content": user_message}]

    body: dict[str, Any] = {
        "model": self.model,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "system": system_prompt,
        "messages": api_messages,
    }

    # Add tool definitions if provided
    if tools:
        body["tools"] = [
            {
                "name": t["name"],
                "description": t.get("description", ""),
                "input_schema": t.get("parameters", {"type": "object"}),
            }
            for t in tools
        ]

    resp = await self.client.post("/v1/messages", json=body)
    resp.raise_for_status()
    data = resp.json()

    # Parse response — may contain text blocks, tool_use blocks,
    # and (when extended thinking is enabled by the caller)
    # ``thinking`` blocks.  Anthropic responses can interleave
    # text and tool_use blocks (e.g. text → tool_use → text);
    # accumulate text and tool_use into lists so nothing is
    # silently overwritten.
    #
    # ``thinking`` blocks land on the response dict's
    # ``thinking`` key (mirrors the OpenAI-compat
    # ``reasoning_content`` rescue pattern).  Heddle does not
    # enable extended thinking by default — the caller must
    # include ``thinking={"type": "enabled", "budget_tokens": N}``
    # in the request body, which the current API does not yet
    # expose — but surfacing the field forward-prepares the
    # field for the day a caller flips that switch.  See
    # https://docs.anthropic.com/en/docs/build-with-claude/extended-thinking
    text_parts: list[str] = []
    thinking_parts: list[str] = []
    tool_calls = None

    for block in data.get("content", []):
        if block["type"] == "text":
            text_parts.append(block["text"])
        elif block["type"] == "tool_use":
            if tool_calls is None:
                tool_calls = []
            tool_calls.append(
                {
                    "id": block["id"],
                    "name": block["name"],
                    "arguments": block["input"],
                }
            )
        elif block["type"] == "thinking":
            thinking_parts.append(block.get("thinking", ""))

    # ``None`` when no text blocks were present preserves the
    # original contract (callers distinguish "no text" from "empty
    # text" — the OpenAI-compat backend returns None for tool-only
    # responses).
    content = "".join(text_parts) if text_parts else None
    thinking = "".join(thinking_parts) if thinking_parts else None

    return {
        "content": content,
        "thinking": thinking,
        "model": data["model"],
        "prompt_tokens": data["usage"]["input_tokens"],
        "completion_tokens": data["usage"]["output_tokens"],
        "tool_calls": tool_calls,
        "stop_reason": data.get("stop_reason"),
        # OTel GenAI semantic convention metadata
        "gen_ai_system": "anthropic",
        "gen_ai_request_model": self.model,
        "gen_ai_response_model": data["model"],
        "gen_ai_request_temperature": temperature,
        "gen_ai_request_max_tokens": max_tokens,
    }

aclose async

aclose() -> None

Close the underlying httpx.AsyncClient.

Source code in src/heddle/worker/backends.py
async def aclose(self) -> None:
    """Shut down the ``httpx.AsyncClient`` this backend created."""
    client = self.client
    await client.aclose()

OllamaBackend

OllamaBackend(model: str = 'llama3.2:3b', base_url: str | None = None)

Bases: LLMBackend

Local models via Ollama HTTP API.

Default base_url points to K8s service name "ollama". For local dev, override with http://localhost:11434 (set OLLAMA_URL env var).

Note: Ollama's token counts (prompt_eval_count, eval_count) may be absent for some models; we default to 0 in that case.

Thinking-model quirk

Ollama serves several thinking-style models (qwen3, deepseek-r1, …) that emit their reasoning trace inline as <think>...</think> tags inside message.content. We currently pass the content through unmodified — the tags end up in the agent's response and downstream consumers (workers, judges) see them. For most council use cases this is fine; for strict-JSON workers it can corrupt output.

TODO(ollama-think-tags): expose a strip_think_tags=True constructor flag (or surface reasoning_content the way OpenAICompatibleBackend does) that splits <think>...</think> out of content and returns it on a separate response key. Most newer Ollama builds also support options.think: false (or model-specific chat_template_kwargs={"enable_thinking": false}) to disable the trace at request time — wire that through the same flag.

Source code in src/heddle/worker/backends.py
def __init__(
    self,
    model: str = "llama3.2:3b",
    base_url: str | None = None,
) -> None:
    """Create an Ollama backend.

    Args:
        model: Ollama model tag to request.
        base_url: Ollama server URL.  When falsy, ``OLLAMA_URL`` is used,
            then the container-friendly default ``http://ollama:11434``.
    """
    self.model = model
    # Resolution order: explicit ``base_url`` > ``OLLAMA_URL`` env var >
    # built-in default.  Reading the env var here keeps the backend in
    # agreement with ``OllamaEmbeddingProvider``, which resolves the same
    # variable.
    #
    # ``or`` instead of an ``os.environ.get`` default means an *empty*
    # ``OLLAMA_URL=`` (a blank .env entry, for example) is treated as unset.
    # Otherwise "" would become the client's base URL, and every relative
    # request such as ``/api/chat`` would have no host.
    resolved_url = base_url or os.environ.get("OLLAMA_URL") or "http://ollama:11434"
    self.client = httpx.AsyncClient(base_url=resolved_url, timeout=120.0)

complete async

complete(system_prompt: str, user_message: str, max_tokens: int = 2000, temperature: float = 0.0, *, tools: list[dict[str, Any]] | None = None, messages: list[dict[str, Any]] | None = None) -> dict[str, Any]

Complete an LLM request via the Ollama HTTP API.

Source code in src/heddle/worker/backends.py
async def complete(
    self,
    system_prompt: str,
    user_message: str,
    max_tokens: int = 2000,
    temperature: float = 0.0,
    *,
    tools: list[dict[str, Any]] | None = None,
    messages: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Complete an LLM request via the Ollama HTTP API."""
    # Build messages array
    if messages is not None:
        api_messages = [
            {"role": "system", "content": system_prompt},
            *_ollama_messages(messages),
        ]
    else:
        api_messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ]

    body: dict[str, Any] = {
        "model": self.model,
        "messages": api_messages,
        "stream": False,
        "options": {"temperature": temperature, "num_predict": max_tokens},
    }

    # Add tool definitions if provided (OpenAI-compatible format)
    if tools:
        body["tools"] = [
            {
                "type": "function",
                "function": {
                    "name": t["name"],
                    "description": t.get("description", ""),
                    "parameters": t.get("parameters", {"type": "object"}),
                },
            }
            for t in tools
        ]

    resp = await self.client.post("/api/chat", json=body)
    resp.raise_for_status()
    data = resp.json()

    # Parse tool calls from Ollama response
    message = data.get("message", {})
    content = message.get("content") or None
    tool_calls = None

    raw_calls = message.get("tool_calls")
    if raw_calls:
        tool_calls = []
        for i, call in enumerate(raw_calls):
            func = call.get("function", {})
            tool_calls.append(
                {
                    "id": f"call_{i}",
                    "name": func.get("name", ""),
                    "arguments": func.get("arguments", {}),
                }
            )

    stop_reason = "tool_use" if tool_calls else "end_turn"

    return {
        "content": content,
        "model": self.model,
        "prompt_tokens": data.get("prompt_eval_count", 0),
        "completion_tokens": data.get("eval_count", 0),
        "tool_calls": tool_calls,
        "stop_reason": stop_reason,
        # OTel GenAI semantic convention metadata
        "gen_ai_system": "ollama",
        "gen_ai_request_model": self.model,
        "gen_ai_response_model": self.model,
        "gen_ai_request_temperature": temperature,
        "gen_ai_request_max_tokens": max_tokens,
    }

aclose async

aclose() -> None

Close the underlying httpx.AsyncClient.

Source code in src/heddle/worker/backends.py
async def aclose(self) -> None:
    """Shut down the ``httpx.AsyncClient`` this backend created."""
    client = self.client
    await client.aclose()

OpenAICompatibleBackend

OpenAICompatibleBackend(base_url: str, api_key: str = 'not-needed', model: str = 'default')

Bases: LLMBackend

Any OpenAI-compatible API (vLLM, llama.cpp server, LiteLLM, LM Studio, etc.).

The base_url may be the host (e.g. http://localhost:8000) or include a trailing /v1 (e.g. http://localhost:1234/v1) — the latter is normalized so that requests still hit /v1/chat/completions.

Source code in src/heddle/worker/backends.py
def __init__(self, base_url: str, api_key: str = "not-needed", model: str = "default") -> None:
    self.model = model
    # Accept both "http://host:port" and "http://host:port/v1".  Drop
    # trailing slashes first, then a single "/v1" suffix, so POSTing to
    # "/v1/chat/completions" never produces ".../v1/v1/...".
    self.base_url = base_url.rstrip("/").removesuffix("/v1")
    self.client = httpx.AsyncClient(
        base_url=self.base_url,
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=120.0,
    )

complete async

complete(system_prompt: str, user_message: str, max_tokens: int = 2000, temperature: float = 0.0, *, tools: list[dict[str, Any]] | None = None, messages: list[dict[str, Any]] | None = None) -> dict[str, Any]

Complete an LLM request via an OpenAI-compatible API.

Source code in src/heddle/worker/backends.py
async def complete(
    self,
    system_prompt: str,
    user_message: str,
    max_tokens: int = 2000,
    temperature: float = 0.0,
    *,
    tools: list[dict[str, Any]] | None = None,
    messages: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Complete an LLM request via an OpenAI-compatible API."""
    # Build messages array
    if messages is not None:
        api_messages = [
            {"role": "system", "content": system_prompt},
            *_openai_messages(messages),
        ]
    else:
        api_messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ]

    body: dict[str, Any] = {
        "model": self.model,
        "messages": api_messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
    }

    # Add tool definitions if provided
    if tools:
        body["tools"] = [
            {
                "type": "function",
                "function": {
                    "name": t["name"],
                    "description": t.get("description", ""),
                    "parameters": t.get("parameters", {"type": "object"}),
                },
            }
            for t in tools
        ]

    resp = await self.client.post("/v1/chat/completions", json=body)
    resp.raise_for_status()
    data = resp.json()
    usage = data.get("usage", {})

    # Parse response
    choice = data["choices"][0]
    message = choice.get("message", {})
    content = message.get("content")
    # Thinking-model quirk: several OpenAI-compatible providers
    # (LM Studio for qwen3.*/deepseek-r1, vLLM with a reasoning
    # parser, DeepSeek's first-party API) split the model's
    # chain-of-thought onto ``message.reasoning_content`` while
    # leaving ``message.content`` empty.  Naïve OpenAI parsers
    # treat the empty string as the model's reply and drop the
    # actual output on the floor.  We rescue it so workers
    # don't silently lose output, AND surface the raw value on
    # the response dict so callers can log or strip it.
    #
    # The rescued content is the ENTIRE thinking trace, which
    # is verbose internal monologue — not what an operator
    # usually wants displayed to end-users.  See
    # docs/TROUBLESHOOTING.md "Thinking model returns empty
    # content" for the available knobs (qwen ``/no_think`` /
    # ``enable_thinking: false``, deepseek-r1 prompt-side
    # disable, OpenAI ``reasoning_effort``, etc.).
    #
    # TODO(thinking-config): expose a ``disable_thinking=True``
    # constructor flag that maps to provider-appropriate
    # request params (``extra_body={"enable_thinking": False}``
    # for qwen via vLLM/LM Studio, ``reasoning_effort="low"``
    # for OpenAI o-series) so callers can opt out of the
    # reasoning trace at request time instead of paying for
    # tokens we then have to rescue.  The OpenAI Chat
    # Completions schema does not standardize this — it has to
    # be provider-specific.
    reasoning_content = message.get("reasoning_content") or None
    if not content and reasoning_content:
        content = reasoning_content
        # Operator-relevant signal: the model produced no
        # visible answer and we are surfacing its monologue as
        # a substitute.  Useful for spotting silently-broken
        # configs (max_tokens too low, system prompt
        # interfering, etc.) without paging on every successful
        # call.
        logger.info(
            "backend.reasoning_content.rescue",
            gen_ai_system=self.gen_ai_system,
            model=self.model,
            response_model=data.get("model"),
            completion_tokens=data.get("usage", {}).get("completion_tokens", 0),
            max_tokens=max_tokens,
            reasoning_chars=len(reasoning_content),
        )
    tool_calls = None

    raw_calls = message.get("tool_calls")
    if raw_calls:
        tool_calls = []
        for call in raw_calls:
            func = call.get("function", {})
            args = func.get("arguments", "{}")
            if isinstance(args, str):
                try:
                    args = json.loads(args)
                except json.JSONDecodeError:
                    args = {"_raw": args}
            tool_calls.append(
                {
                    "id": call.get("id", ""),
                    "name": func.get("name", ""),
                    "arguments": args,
                }
            )

    finish_reason = choice.get("finish_reason", "stop")
    stop_reason = "tool_use" if finish_reason == "tool_calls" else "end_turn"

    return {
        "content": content,
        "model": data.get("model", self.model),
        "prompt_tokens": usage.get("prompt_tokens", 0),
        "completion_tokens": usage.get("completion_tokens", 0),
        "tool_calls": tool_calls,
        "stop_reason": stop_reason,
        "reasoning_content": reasoning_content,
        # OTel GenAI semantic convention metadata
        "gen_ai_system": self.gen_ai_system,
        "gen_ai_request_model": self.model,
        "gen_ai_response_model": data.get("model", self.model),
        "gen_ai_request_temperature": temperature,
        "gen_ai_request_max_tokens": max_tokens,
    }

aclose async

aclose() -> None

Close the underlying httpx.AsyncClient.

Source code in src/heddle/worker/backends.py
async def aclose(self) -> None:
    """Shut down the ``httpx.AsyncClient`` this backend created."""
    client = self.client
    await client.aclose()

LMStudioBackend

LMStudioBackend(model: str = 'default', base_url: str = 'http://localhost:1234/v1', api_key: str = 'not-needed')

Bases: OpenAICompatibleBackend

Local models via LM Studio's OpenAI-compatible API.

LM Studio runs a local HTTP server (default port 1234) that exposes an OpenAI-compatible /v1/chat/completions endpoint backed by its MLX or llama.cpp runtime. Any model loaded in LM Studio's UI is reachable through this backend.

Parameters:

Name Type Description Default
model str

Model identifier as shown by GET /v1/models (or "default" to use whichever model LM Studio routes to).

'default'
base_url str

LM Studio server URL. Both http://localhost:1234 and http://localhost:1234/v1 are accepted; the trailing /v1 is normalized away.

'http://localhost:1234/v1'
api_key str

Ignored by LM Studio but sent as a Bearer token for wire compatibility.

'not-needed'
Source code in src/heddle/worker/backends.py
def __init__(
    self,
    model: str = "default",
    base_url: str = "http://localhost:1234/v1",
    api_key: str = "not-needed",
) -> None:
    # LM Studio is a plain OpenAI-compatible server; the parent class
    # normalizes the trailing "/v1" and builds the HTTP client.
    super().__init__(model=model, base_url=base_url, api_key=api_key)

build_backends_from_env

build_backends_from_env() -> dict[str, LLMBackend]

Build LLM backends from environment variables and ~/.heddle/config.yaml.

Resolution priority: env vars > config.yaml > built-in defaults.

Resolves available backends based on which env vars are set:

  • LM_STUDIO_URL → LMStudioBackend for the local tier
  • LM_STUDIO_MODEL → Override LM Studio model (default: default)
  • OLLAMA_URL → OllamaBackend for the local tier
  • OLLAMA_MODEL → Override Ollama model (default: llama3.2:3b)
  • HEDDLE_LOCAL_BACKEND → "lmstudio" or "ollama" to pick explicitly when both URLs are set. Defaults to LM Studio.
  • ANTHROPIC_API_KEY → AnthropicBackend for the standard and frontier tiers
  • FRONTIER_MODEL → Override frontier model (default: claude-opus-4-20250514)

Returns:

Type Description
dict[str, LLMBackend]

Dict mapping tier name → LLMBackend instance. May be empty if no environment variables are set.

Source code in src/heddle/worker/backends.py
def build_backends_from_env() -> dict[str, LLMBackend]:
    """Build LLM backends from environment variables and ``~/.heddle/config.yaml``.

    Resolution priority: env vars > config.yaml > built-in defaults.

    Resolves available backends based on which env vars are set:

    - ``LM_STUDIO_URL`` → :class:`LMStudioBackend` for the ``local`` tier
    - ``LM_STUDIO_MODEL`` → Override LM Studio model (default: ``default``)
    - ``OLLAMA_URL`` → :class:`OllamaBackend` for the ``local`` tier
    - ``OLLAMA_MODEL`` → Override Ollama model (default: ``llama3.2:3b``)
    - ``HEDDLE_LOCAL_BACKEND`` → ``"lmstudio"`` or ``"ollama"`` to pick
      explicitly when both URLs are set.  Defaults to LM Studio.
    - ``ANTHROPIC_API_KEY`` → :class:`AnthropicBackend` for the
      ``standard`` and ``frontier`` tiers
    - ``FRONTIER_MODEL`` → Override frontier model (default:
      ``claude-opus-4-20250514``).  An empty value is treated as unset.

    Returns:
        Dict mapping tier name → LLMBackend instance. May be empty if no
        environment variables are set.
    """
    # Load config.yaml defaults (best-effort; env vars still override).
    # Earlier this silently swallowed every exception, so a malformed
    # ~/.heddle/config.yaml produced an empty backends dict with no
    # operator-visible signal — "no backend for tier" errors looked
    # like missing env vars.  Log at WARN so the operator can grep
    # for it; missing-file is still allowed (load_config returns an
    # empty config when the file doesn't exist).
    try:
        from heddle.cli.config import apply_config_to_env, load_config

        config = load_config()
        apply_config_to_env(config)
    except Exception as exc:
        import structlog as _structlog

        _structlog.get_logger().warning(
            "backends.config_load_failed",
            error=str(exc),
            error_type=type(exc).__name__,
            hint=(
                "~/.heddle/config.yaml could not be loaded; falling back "
                "to environment variables only.  Fix the YAML or remove the "
                "file to silence this warning."
            ),
        )

    backends: dict[str, LLMBackend] = {}

    local_backend = _select_local_backend()
    if local_backend is not None:
        backends["local"] = local_backend

    # The walrus test is truthiness-based, so an empty ANTHROPIC_API_KEY
    # does not register the Anthropic tiers.
    if anthropic_key := os.getenv("ANTHROPIC_API_KEY"):
        backends["standard"] = AnthropicBackend(api_key=anthropic_key)
        # ``or`` rather than a getenv default: an empty ``FRONTIER_MODEL=``
        # must fall back to the default instead of requesting model "".
        frontier_model = os.getenv("FRONTIER_MODEL") or "claude-opus-4-20250514"
        backends["frontier"] = AnthropicBackend(
            api_key=anthropic_key,
            model=frontier_model,
        )

    return backends

Processor

ProcessorWorker, the ProcessingBackend ABC, and the SyncProcessingBackend base class for non-LLM workers. Includes the serialize_writes option and the BackendError hierarchy.

processor

Processor worker for non-LLM task processing.

ProcessorWorker delegates to a ProcessingBackend — any Python library, rules engine, or external tool that isn't an LLM. Examples: Docling for document extraction, ffmpeg for media, scikit-learn for classification.

This module also provides:

BackendError
    Base exception for processing backend failures. Backend
    implementations should subclass this (e.g., DoclingConversionError)
    to provide structured errors with the original cause preserved.

SyncProcessingBackend
    Base class for backends wrapping synchronous, CPU-bound libraries.
    Subclasses implement ``process_sync()`` which is automatically
    offloaded to a thread pool via ``asyncio.run_in_executor``.

BackendError

Bases: Exception

Base error for processing backend failures.

Backend implementations should raise subclasses of this to provide structured, domain-specific errors with the original cause preserved via __cause__.

Example::

class DoclingConversionError(BackendError):
    """Raised when Docling fails to convert a document."""

try:
    converter.convert(path)
except Exception as exc:
    raise DoclingConversionError(f"Failed: {exc}") from exc

ProcessingBackend

Bases: ABC

Generic processing backend interface for non-LLM workers.

Implementations wrap a specific tool or library (Docling, ffmpeg, etc.) and translate between Heddle's payload/output dicts and that tool's API.

process abstractmethod async

process(payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]

Process a task payload.

Parameters:

Name Type Description Default
payload dict[str, Any]

Validated input dict from TaskMessage.

required
config dict[str, Any]

Full worker config dict (for backend-specific settings).

required

Returns:

Type Description
dict[str, Any]

A dict with the following structure::

{
    "output": dict,           # Structured output matching output_schema
    "model_used": str | None, # Identifier (e.g., "docling-v2", "ffmpeg-6.1")
}

Source code in src/heddle/worker/processor.py
@abstractmethod
async def process(self, payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
    """Turn one task payload into structured output.

    Args:
        payload: Input dict from the TaskMessage, already validated.
        config: The complete worker config, so backends can read their
            own settings from it.

    Returns:
        A dict of the form::

            {
                "output": dict,           # Structured output matching output_schema
                "model_used": str | None, # Identifier (e.g., "docling-v2", "ffmpeg-6.1")
            }
    """
    ...

aclose async

aclose() -> None

Release any I/O resources held by this backend.

Subclasses that wrap stateful clients (DB connections, HTTP clients, dynamically-loaded ChatBridges) override this to close them. Idempotent — safe to call more than once. The default is a no-op.

Source code in src/heddle/worker/processor.py
async def aclose(self) -> None:  # noqa: B027 — intentional no-op default
    """Free I/O resources owned by the backend (does nothing by default).

    Backends that wrap stateful clients — database connections, HTTP
    clients, dynamically loaded ChatBridges — should override this to
    close them.  Overrides must tolerate being called more than once.
    """

SyncProcessingBackend

SyncProcessingBackend(*, serialize_writes: bool = False)

Bases: ProcessingBackend

Base class for backends wrapping synchronous, CPU-bound libraries.

Subclasses implement process_sync() instead of process(). The synchronous method is automatically offloaded to a thread pool via asyncio.run_in_executor so the async event loop stays responsive.

If serialize_writes=True, an asyncio.Lock ensures only one call to process_sync runs at a time. Use this for backends that write to single-writer stores like DuckDB.

Use this for backends that wrap libraries like Docling, ffmpeg, scikit-learn, or any other tool that performs blocking I/O or CPU-intensive computation.

Example::

class FFmpegBackend(SyncProcessingBackend):
    def process_sync(self, payload, config):
        # CPU-bound work — runs in thread pool automatically
        subprocess.run(["ffmpeg", ...])
        return {"output": {...}, "model_used": "ffmpeg"}
Source code in src/heddle/worker/processor.py
def __init__(self, *, serialize_writes: bool = False) -> None:
    # Only allocate a lock when single-writer semantics were requested;
    # ``None`` means ``process()`` runs calls without any locking.
    self._write_lock: asyncio.Lock | None = None
    if serialize_writes:
        self._write_lock = asyncio.Lock()

process_sync abstractmethod

process_sync(payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]

Process a task payload synchronously.

This method runs in a thread pool — do not use await here. Return format is identical to ProcessingBackend.process().

Parameters:

Name Type Description Default
payload dict[str, Any]

Validated input dict from TaskMessage.

required
config dict[str, Any]

Full worker config dict (for backend-specific settings).

required

Returns:

Type Description
dict[str, Any]

{"output": dict, "model_used": str | None}

Source code in src/heddle/worker/processor.py
@abstractmethod
def process_sync(self, payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
    """Blocking counterpart of ``ProcessingBackend.process()``.

    Invoked on a thread-pool thread, so it must be plain synchronous code
    (no ``await``).  Its return value has the same shape as that of
    ``ProcessingBackend.process()``.

    Args:
        payload: Validated input dict from TaskMessage.
        config: Full worker config dict (for backend-specific settings).

    Returns:
        ``{"output": dict, "model_used": str | None}``
    """
    ...

process async

process(payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]

Offload process_sync() to a thread pool and return the result.

If serialize_writes was set, acquires the write lock first to ensure single-writer semantics (e.g., for DuckDB).

Source code in src/heddle/worker/processor.py
async def process(self, payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
    """Offload process_sync() to a thread pool and return the result.

    If ``serialize_writes`` was set, the write lock is held for as long as
    ``process_sync()`` is running, even after the awaiting task is
    cancelled.  Cancelling a coroutine does not stop a thread that
    ``run_in_executor`` has already started.  Releasing the lock on
    cancellation would therefore let the next call run concurrently with
    the still-running one and break single-writer semantics (e.g., for
    DuckDB).

    Args:
        payload: Validated input dict, forwarded to ``process_sync()``.
        config: Worker config dict, forwarded to ``process_sync()``.

    Returns:
        Whatever ``process_sync()`` returns.

    Raises:
        asyncio.CancelledError: If the awaiting task is cancelled.  With the
            write lock in use, it is raised only after the in-flight
            ``process_sync()`` call has finished.
        Exception: Anything ``process_sync()`` raises is propagated.
    """
    loop = asyncio.get_running_loop()
    if self._write_lock is None:
        return await loop.run_in_executor(None, self.process_sync, payload, config)

    async with self._write_lock:
        future = loop.run_in_executor(None, self.process_sync, payload, config)
        try:
            # shield(): cancelling this task must not abandon ``future``
            # while we are still holding the lock.
            return await asyncio.shield(future)
        except asyncio.CancelledError:
            # Keep the lock until the worker thread is done, even if more
            # cancellations arrive while waiting.
            while not future.done():
                try:
                    await asyncio.wait({future})
                except asyncio.CancelledError:
                    continue
            if not future.cancelled():
                # Mark any exception as retrieved so asyncio does not log
                # "exception was never retrieved"; the caller is cancelled.
                future.exception()
            raise

ProcessorWorker

ProcessorWorker(actor_id: str, config_path: str, backend: ProcessingBackend, nats_url: str = 'nats://nats:4222')

Bases: TaskWorker

Non-LLM stateless worker.

Delegates processing to a ProcessingBackend instead of an LLM. Follows the same lifecycle as LLMWorker: validate input, process, validate output, publish result.

Source code in src/heddle/worker/processor.py
def __init__(
    self,
    actor_id: str,
    config_path: str,
    backend: ProcessingBackend,
    nats_url: str = "nats://nats:4222",
) -> None:
    # TaskWorker stores the actor id / NATS URL and loads the worker config.
    super().__init__(actor_id, config_path, nats_url)
    # The backend performs the actual (non-LLM) work; this worker owns it
    # and closes it on disconnect.
    self.backend = backend

disconnect async

disconnect() -> None

Disconnect from the bus and close the owned processing backend.

Source code in src/heddle/worker/processor.py
async def disconnect(self) -> None:
    """Leave the message bus, then release the processing backend.

    The backend is closed in a ``finally`` clause, so it is released even
    if the bus disconnect raises.  Errors from the backend's ``aclose()``
    are logged as ``processor.backend_close_failed`` warnings rather than
    raised, so they never replace an error from the bus disconnect.
    """
    try:
        await super().disconnect()
    finally:
        try:
            await self.backend.aclose()
        except Exception as close_error:
            logger.warning(
                "processor.backend_close_failed",
                backend=type(self.backend).__name__,
                error=str(close_error),
            )

process async

process(payload: dict[str, Any], metadata: dict[str, Any]) -> dict[str, Any]

Delegate processing to the backend and return the result.

Source code in src/heddle/worker/processor.py
async def process(self, payload: dict[str, Any], metadata: dict[str, Any]) -> dict[str, Any]:
    """Run the payload through the configured processing backend.

    ``metadata`` is accepted for interface compatibility and is not used
    here.  The backend receives the full worker config.

    Returns:
        A dict with ``output`` (taken from the backend's result),
        ``model_used`` (``None`` when the backend omits it) and an empty
        ``token_usage``.
    """
    backend = self.backend
    logger.info("processor.processing", backend=type(backend).__name__)
    outcome = await backend.process(payload, self.config)
    return {
        "output": outcome["output"],
        "model_used": outcome.get("model_used"),
        "token_usage": {},
    }

Tools

ToolProvider ABC and SyncToolProvider for LLM function-calling tools. Workers can expose tools that the LLM calls during processing (max 10 rounds).

tools

Tool provider abstraction for LLM function-calling.

Workers can offer tools to LLMs via their config's knowledge_silos key. Each tool-type silo specifies a provider class path (fully qualified, like processing_backend) and a config dict passed to the constructor.

Tool providers define what the LLM can call (via get_definition()) and execute the call when the LLM invokes it (via execute()).

Example config::

knowledge_silos:
  - name: "document_catalog"
    type: "tool"
    provider: "docman.tools.duckdb_view.DuckDBViewTool"
    config:
      db_path: "/tmp/docman-workspace/docman.duckdb"
      view_name: "document_summaries"

ToolProvider

Bases: ABC

A tool that can be offered to an LLM for function-calling.

Subclasses define the tool's JSON Schema definition and implement the execution logic. The LLMWorker manages the multi-turn loop: it passes tool definitions to the backend, receives tool_calls, dispatches to the appropriate provider, and feeds results back.

get_definition abstractmethod

get_definition() -> dict[str, Any]

Return the tool definition in standard JSON Schema format.

The returned dict must contain:
  • name: Tool name (alphanumeric + underscores)
  • description: What the tool does (shown to LLM)
  • parameters: JSON Schema object for the tool's arguments

Example::

{
    "name": "search_documents",
    "description": "Full-text search over document catalog",
    "parameters": {
        "type": "object",
        "properties": {
            "query": {"type": "string"},
            "limit": {"type": "integer", "default": 10},
        },
        "required": ["query"],
    },
}
Source code in src/heddle/worker/tools.py
@abstractmethod
def get_definition(self) -> dict[str, Any]:
    """Describe this tool to the LLM as a JSON Schema function definition.

    Implementations return a dict with three keys:
        - ``name``: the tool's identifier (letters, digits and underscores)
        - ``description``: summary shown to the LLM so it can decide
          whether to call the tool
        - ``parameters``: a JSON Schema object describing the arguments

    Example::

        {
            "name": "search_documents",
            "description": "Full-text search over document catalog",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string"},
                    "limit": {"type": "integer", "default": 10},
                },
                "required": ["query"],
            },
        }
    """
    ...

execute abstractmethod async

execute(arguments: dict[str, Any]) -> str

Execute the tool with LLM-provided arguments.

Parameters:

Name Type Description Default
arguments dict[str, Any]

Parsed arguments matching the tool's parameters schema.

required

Returns:

Type Description
str

Result as a string (typically JSON). This is sent back to the LLM as the tool result in the next turn.

Source code in src/heddle/worker/tools.py
@abstractmethod
async def execute(self, arguments: dict[str, Any]) -> str:
    """Run the tool for a single LLM tool call.

    Args:
        arguments: The call's arguments, already parsed into a dict that
            should match the tool's ``parameters`` schema.

    Returns:
        The tool's result as text (usually serialized JSON).  It is fed
        back to the LLM as the tool result on the following turn.
    """
    ...

SyncToolProvider

Bases: ToolProvider

Convenience base for synchronous tool implementations.

Subclasses implement execute_sync() which is automatically offloaded to a thread pool. Use this for tools that wrap synchronous libraries (e.g., DuckDB queries, file I/O).

execute_sync abstractmethod

execute_sync(arguments: dict[str, Any]) -> str

Execute the tool synchronously. Runs in a thread pool.

Source code in src/heddle/worker/tools.py
@abstractmethod
def execute_sync(self, arguments: dict[str, Any]) -> str:
    """Do the tool's work synchronously; the async wrapper runs this in a thread pool."""
    pass

execute async

execute(arguments: dict[str, Any]) -> str

Offload execute_sync() to a thread pool and return the result.

Source code in src/heddle/worker/tools.py
async def execute(self, arguments: dict[str, Any]) -> str:
    """Offload execute_sync() to a thread pool and return the result.

    Uses :func:`asyncio.to_thread`, which runs the call in the event
    loop's default executor, just as ``loop.run_in_executor(None, ...)``
    does. Unlike that call, it also copies the calling task's
    :mod:`contextvars` context into the worker thread. Without the copy,
    context-local state bound by the caller (e.g. logging or tracing
    context) would be invisible inside ``execute_sync``.

    Args:
        arguments: Parsed tool arguments, passed through unchanged.

    Returns:
        Whatever ``execute_sync`` returns.
    """
    return await asyncio.to_thread(self.execute_sync, arguments)

load_tool_provider

load_tool_provider(class_path: str, config: dict[str, Any]) -> ToolProvider

Import and instantiate a ToolProvider by fully qualified class path.

Follows the same dynamic-import pattern as _load_processing_backend in cli/main.py.

Parameters:

Name Type Description Default
class_path str

Dotted path like docman.tools.duckdb_view.DuckDBViewTool.

required
config dict[str, Any]

Dict of keyword arguments passed to the constructor.

required

Returns:

Type Description
ToolProvider

An instantiated ToolProvider.

Raises:

Type Description
ImportError

If the module cannot be imported.

AttributeError

If the class is not found in the module.

TypeError

If the class is not a ToolProvider subclass.

Source code in src/heddle/worker/tools.py
def load_tool_provider(class_path: str, config: dict[str, Any]) -> ToolProvider:
    """Import and instantiate a ToolProvider by fully qualified class path.

    Follows the same dynamic-import pattern as ``_load_processing_backend``
    in ``cli/main.py``.

    Args:
        class_path: Dotted path like ``docman.tools.duckdb_view.DuckDBViewTool``.
            Relative paths (leading ``.``) are not supported.
        config: Dict of keyword arguments passed to the constructor.
            ``None`` is treated as an empty dict.

    Returns:
        An instantiated ToolProvider.

    Raises:
        ImportError: If ``class_path`` is not a fully qualified dotted path,
            or if the module cannot be imported.
        AttributeError: If the class is not found in the module.
        TypeError: If the class is not a ToolProvider subclass.
    """
    module_path, _, class_name = class_path.rpartition(".")
    # rpartition leaves module_path empty when there is no dot at all.
    # A leading dot would make import_module attempt a relative import,
    # which raises ValueError/TypeError rather than ImportError.
    # A trailing dot leaves no class name. Report all of these as the
    # documented ImportError.
    if not module_path or not class_name or module_path.startswith("."):
        raise ImportError(
            f"Tool provider '{class_path}' must be a fully qualified class path "
            f"(e.g., 'docman.tools.duckdb_view.DuckDBViewTool')"
        )

    module = importlib.import_module(module_path)

    tool_class = getattr(module, class_name, None)
    if tool_class is None:
        raise AttributeError(f"Tool class '{class_name}' not found in '{module_path}'")

    # isinstance() guards issubclass(), which raises on non-class objects.
    if not (isinstance(tool_class, type) and issubclass(tool_class, ToolProvider)):
        raise TypeError(f"'{class_path}' is not a ToolProvider subclass")

    return tool_class(**(config or {}))

Knowledge

Knowledge silo loading, injection into system prompts, and write-back. Supports read-only, read-write, and tool-based knowledge sources.

knowledge

Scoped knowledge/RAG loader for worker context injection.

Workers can have knowledge sources defined in their config YAML under a knowledge_sources key. This module loads those files and formats them for injection into the system prompt, giving workers domain-specific context.

Knowledge silos extend this with folder-based knowledge:

knowledge_silos:
  - name: "classification_guides"
    type: "folder"
    path: "knowledge/classification/"
    permissions: "read"  # "read" or "read_write"

Folder silos load all text files from a directory into the system prompt. Writable silos accept silo_updates from the LLM output to persist learned patterns (add/modify/delete files within the silo folder).

RequiredKnowledgeMissingError

RequiredKnowledgeMissingError(kind: str, name: str, reason: str)

Bases: Exception

Raised when a knowledge resource with required=True cannot be loaded.

Default for required is True (strict-by-default policy chosen during F-session scoping) — operators opt out per-resource with required: false when log-and-skip is what they actually want. The exception carries the resource kind, name, and reason so the worker layer can surface a clean error in the failed task result.

Source code in src/heddle/worker/knowledge.py
def __init__(self, kind: str, name: str, reason: str) -> None:
    # Keep the structured pieces on the instance so the worker layer can
    # report which resource failed and why, independent of the message.
    self.kind, self.name, self.reason = kind, name, reason
    message = f"Required {kind} {name!r} could not be loaded: {reason}"
    super().__init__(message)

load_knowledge_sources

load_knowledge_sources(sources: list[dict[str, Any]], *, skipped: list[dict[str, Any]] | None = None) -> str

Load knowledge sources and format them for system prompt injection.

Each source has:

  • path: file path to the knowledge file
  • inject_as: "reference" (append to prompt) or "few_shot" (format as examples)
  • max_bytes (optional): per-source read cap override (default: 10 MiB)
  • required (optional, default True): when True, a missing or oversize file raises RequiredKnowledgeMissingError; when False, the source is skipped with a warning and recorded to skipped if the accumulator was passed.

Source code in src/heddle/worker/knowledge.py
def load_knowledge_sources(
    sources: list[dict[str, Any]],
    *,
    skipped: list[dict[str, Any]] | None = None,
) -> str:
    """
    Load knowledge sources and format them for system prompt injection.

    Each source has:
    - path: file path to the knowledge file
    - inject_as: "reference" (append to prompt) or "few_shot" (format as examples)
    - max_bytes (optional): per-source read cap override (default: 10 MiB)
    - required (optional, default True): when True, a missing or
      oversize file raises :class:`RequiredKnowledgeMissingError`;
      when False, the source is skipped with a warning and recorded
      to ``skipped`` if the accumulator was passed.
    """
    sections = []

    for source in sources:
        path = Path(source["path"])
        inject_as = source.get("inject_as", "reference")
        max_bytes = source.get("max_bytes")
        required = _is_required(source)

        if not path.exists():
            if required:
                raise RequiredKnowledgeMissingError("knowledge_source", str(path), "file not found")
            logger.warning(
                "knowledge.source_not_found",
                path=str(path),
                inject_as=inject_as,
            )
            _record_skip(skipped, "knowledge_source", str(path), "not_found")
            continue

        try:
            enforce_file_size(path, max_bytes=max_bytes)
        except FileTooLargeError as exc:
            if required:
                raise RequiredKnowledgeMissingError(
                    "knowledge_source",
                    str(path),
                    f"exceeds size cap ({exc.size}/{exc.max_bytes} bytes)",
                ) from exc
            logger.warning(
                "knowledge.source_too_large",
                path=str(path),
                size=exc.size,
                max_bytes=exc.max_bytes,
            )
            _record_skip(skipped, "knowledge_source", str(path), "too_large")
            continue

        content = path.read_text()

        if inject_as == "reference":
            sections.append(f"\n--- Reference: {path.name} ---\n{content}")
        elif inject_as == "few_shot":
            sections.append(_format_few_shot(content, path.suffix))

    return "\n".join(sections)

load_knowledge_silos

load_knowledge_silos(silos: list[dict[str, Any]], *, skipped: list[dict[str, Any]] | None = None) -> str

Load folder-type silos and return formatted content for system prompt.

Only processes silos with type="folder". Tool-type silos are handled separately by the runner's _load_tool_providers().

Each silo accepts an optional required field (default True). When True, a missing folder raises RequiredKnowledgeMissingError; when False, the silo is skipped with a warning and recorded to skipped if the accumulator was passed.

Returns:

Type Description
str

Concatenated content from all folder silos, with section headers.

str

Empty string if no folder silos or no content found.

Source code in src/heddle/worker/knowledge.py
def load_knowledge_silos(
    silos: list[dict[str, Any]],
    *,
    skipped: list[dict[str, Any]] | None = None,
) -> str:
    """Load folder-type silos and return formatted content for system prompt.

    Only processes silos with ``type="folder"``. Tool-type silos are handled
    separately by the runner's ``_load_tool_providers()``.

    Each silo accepts an optional ``required`` field (default True) — when
    True, a missing folder raises :class:`RequiredKnowledgeMissingError`;
    when False, the silo is skipped with a warning and recorded to
    ``skipped`` if the accumulator was passed.

    Returns:
        Concatenated content from all folder silos, with section headers.
        Empty string if no folder silos or no content found.
    """
    sections: list[str] = []

    for silo in silos:
        if silo.get("type") != "folder":
            continue

        name = silo.get("name", "unnamed")
        path = Path(silo["path"])
        required = _is_required(silo)

        if not path.is_dir():
            if required:
                raise RequiredKnowledgeMissingError(
                    "knowledge_silo", name, f"folder not found: {path}"
                )
            logger.warning("knowledge.silo_folder_not_found", path=str(path), silo=name)
            _record_skip(skipped, "knowledge_silo", name, "folder_not_found")
            continue

        content = _load_folder_contents(path)
        if content:
            sections.append(f"--- Knowledge Silo: {name} ---\n{content}")

    return "\n\n".join(sections)

apply_silo_updates

apply_silo_updates(updates: list[dict[str, Any]], silos: list[dict[str, Any]]) -> None

Apply LLM-requested file modifications to writable folder silos.

Each update dict has:
  • silo: Name of the target silo
  • action: "add" | "modify" | "delete"
  • filename: Target filename within the silo folder
  • content: File content (for add/modify actions)
Validates:
  • Target silo exists and has permissions="read_write"
  • Filename has no path traversal (../)
  • Action is one of the allowed values
Source code in src/heddle/worker/knowledge.py
def apply_silo_updates(
    updates: list[dict[str, Any]],
    silos: list[dict[str, Any]],
) -> None:
    """Apply LLM-requested file modifications to writable folder silos.

    Each update dict has:
        - ``silo``: Name of the target silo
        - ``action``: ``"add"`` | ``"modify"`` | ``"delete"``
        - ``filename``: Target filename within the silo folder
        - ``content``: File content (for add/modify actions)

    Validates:
        - The update itself is a dict
        - Target silo exists and has ``permissions="read_write"``
        - Filename is a non-empty string with no path traversal (``../``)
          that does not resolve to a directory
        - ``content`` is a string for add/modify
        - Action is one of the allowed values

    Updates come from LLM output and are treated as untrusted. Every
    update that fails validation is logged and skipped rather than
    raising. Filesystem errors from the write/unlink calls themselves are
    not caught.
    """
    # Build lookup of writable folder silos. A silo without a name gets
    # "unnamed" (the label load_knowledge_silos shows the LLM) instead of
    # raising KeyError here.
    writable: dict[str, Path] = {}
    for silo in silos:
        if silo.get("type") == "folder" and silo.get("permissions") == "read_write":
            writable[silo.get("name", "unnamed")] = Path(silo["path"])

    for update in updates:
        # Parsed LLM output may contain non-objects; .get() would crash on them.
        if not isinstance(update, dict):
            logger.warning(
                "knowledge.silo_update_denied",
                reason="malformed update",
            )
            continue

        silo_name = update.get("silo", "")
        action = update.get("action", "")
        filename = update.get("filename", "")
        content = update.get("content", "")

        # Validate silo is writable. The isinstance check also stops an
        # unhashable value (e.g. a list) from raising TypeError on lookup.
        if not isinstance(silo_name, str) or silo_name not in writable:
            logger.warning(
                "knowledge.silo_update_denied",
                silo=silo_name,
                reason="not writable or not found",
            )
            continue

        # A non-string filename would crash the string checks below, and
        # an empty one would resolve to the silo folder itself.
        if not isinstance(filename, str) or not filename:
            logger.warning(
                "knowledge.silo_update_denied",
                silo=silo_name,
                filename=filename,
                reason="invalid filename",
            )
            continue

        # Validate filename — cheap early reject for obviously
        # malicious input.  Kept ahead of ``resolve_within`` so the
        # log distinguishes "raw filename looked traversal-y"
        # (``path traversal``) from "filename resolved outside the
        # silo via symlink or odd component combo"
        # (``path escapes silo``).  Two different operator signals.
        if ".." in filename or filename.startswith("/"):
            logger.warning(
                "knowledge.silo_update_denied",
                silo=silo_name,
                filename=filename,
                reason="path traversal",
            )
            continue

        folder = writable[silo_name]

        # Resolve under the silo root via the shared helper.  Catches
        # symlinks and any traversal the cheap check above missed.
        try:
            target = resolve_within(folder, filename)
        except ValueError:
            logger.warning(
                "knowledge.silo_update_denied",
                silo=silo_name,
                filename=filename,
                reason="path escapes silo",
            )
            continue

        # Names such as "." or an existing sub-folder can resolve to a
        # directory; write_text()/unlink() would raise on it.
        if target.is_dir():
            logger.warning(
                "knowledge.silo_update_denied",
                silo=silo_name,
                filename=filename,
                reason="target is a directory",
            )
            continue

        # write_text() requires a str; skip rather than raise TypeError.
        if action in ("add", "modify") and not isinstance(content, str):
            logger.warning(
                "knowledge.silo_update_skipped",
                silo=silo_name,
                filename=filename,
                reason="content is not a string",
            )
            continue

        if action == "add":
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_text(content, encoding="utf-8")
            logger.info("knowledge.silo_file_added", silo=silo_name, file=filename)

        elif action == "modify":
            if not target.exists():
                logger.warning(
                    "knowledge.silo_update_skipped",
                    silo=silo_name,
                    filename=filename,
                    reason="file not found for modify",
                )
                continue
            target.write_text(content, encoding="utf-8")
            logger.info("knowledge.silo_file_modified", silo=silo_name, file=filename)

        elif action == "delete":
            if target.exists():
                target.unlink()
                logger.info("knowledge.silo_file_deleted", silo=silo_name, file=filename)

        else:
            logger.warning(
                "knowledge.silo_update_skipped",
                silo=silo_name,
                action=action,
                reason="unknown action",
            )

Embeddings

EmbeddingProvider ABC and OllamaEmbeddingProvider for text embedding generation via Ollama's /api/embed endpoint.

embeddings

Embedding provider abstraction for vector generation.

Workers and tools generate vector embeddings from text via an EmbeddingProvider. Two implementations ship with Heddle:

  • OllamaEmbeddingProvider — Ollama's /api/embed endpoint.
  • OpenAICompatibleEmbeddingProvider — any /v1/embeddings endpoint (LM Studio, OpenAI, vLLM, TEI, …).

Example usage::

provider = OllamaEmbeddingProvider(model="nomic-embed-text")
vector = await provider.embed("some text to embed")
vectors = await provider.embed_batch(["text 1", "text 2"])

# LM Studio (or any OpenAI-compatible /v1/embeddings server):
provider = OpenAICompatibleEmbeddingProvider(
    model="text-embedding-nomic-embed-text-v1.5",
    base_url="http://localhost:1234/v1",
)

Statelessness convention

Providers are stateless by convention with respect to Heddle's TaskWorker lifecycle. They are injected into workers and reused across tasks, so they sit outside the per-task reset() boundary. The only instance state allowed on a concrete provider is values that are stable for the lifetime of the provider (e.g. cached _dimensions, derived once from the configured model and never changing for that provider instance). Caches that affect correctness — anything that could yield a different result depending on prior tasks — are not allowed here; promote such state to the worker, where the framework's stateless-worker invariant guarantees reset() between tasks.

Concretely: if a provider grows a state field, the safe shape is "set once in __init__ (or first-call, if derived from a remote probe) and never mutated by per-call code paths." A provider whose behaviour changes based on the sequence of calls it has seen is a silent bug waiting to surface in concurrent worker pools.

EmbeddingProvider

Bases: ABC

Common interface for generating vector embeddings from text.

Two parallel call paths are exposed: an async path (embed / embed_batch) used from async worker code, and a sync path (embed_sync / embed_batch_sync) used from sync code (RAG tools, processor workers, ProcessingBackend.process_sync).

Why both: callers running on a worker thread cannot safely fall back to asyncio.run(provider.embed(...)) in every harness. asyncio.run raises "RuntimeError: asyncio.run() cannot be called from a running event loop" whenever the calling thread already has a live loop; TestClient flows, nested sync→async→sync chains, and some pytest-asyncio fixtures all hit this. A real sync path removes the ambiguity: pick the method that matches your call site, with no event-loop juggling required.

dimensions abstractmethod property

dimensions: int

Return the dimensionality of embeddings produced by this provider.

embed abstractmethod async

embed(text: str) -> list[float]

Return embedding vector for the given text.

Source code in src/heddle/worker/embeddings.py
@abstractmethod
async def embed(self, text: str) -> list[float]:
    """Produce the embedding vector for a single piece of text."""
    pass

embed_batch abstractmethod async

embed_batch(texts: list[str]) -> list[list[float]]

Return embedding vectors for a batch of texts.

Source code in src/heddle/worker/embeddings.py
@abstractmethod
async def embed_batch(self, texts: list[str]) -> list[list[float]]:
    """Produce one embedding vector per input string."""
    pass

embed_sync abstractmethod

embed_sync(text: str) -> list[float]

Synchronous counterpart to embed().

Implementations use a sync HTTP client so the call is safe from a worker thread regardless of whether the calling thread has a live event loop. Prefer the async path from async code; reach for the sync path only when the caller is itself sync.

Source code in src/heddle/worker/embeddings.py
@abstractmethod
def embed_sync(self, text: str) -> list[float]:
    """Blocking equivalent of :meth:`embed`.

    Concrete providers back this with a synchronous HTTP client. That
    makes it callable from a worker thread whether or not that thread
    already runs an event loop. Async code should keep using
    :meth:`embed`; this path is for callers that are themselves
    synchronous.
    """
    pass

embed_batch_sync abstractmethod

embed_batch_sync(texts: list[str]) -> list[list[float]]

Synchronous counterpart to embed_batch().

Source code in src/heddle/worker/embeddings.py
@abstractmethod
def embed_batch_sync(self, texts: list[str]) -> list[list[float]]:
    """Blocking equivalent of :meth:`embed_batch`."""
    pass

aclose async

aclose() -> None

Release any I/O resources held by this provider.

Subclasses that hold open connections (e.g. an httpx.AsyncClient and/or an httpx.Client) override this to close them. Idempotent — safe to call more than once. Default is a no-op.

Source code in src/heddle/worker/embeddings.py
async def aclose(self) -> None:  # noqa: B027 — intentional no-op default
    """Free I/O resources owned by the provider; does nothing by default.

    Providers that keep connections open (for instance an
    ``httpx.AsyncClient``, an ``httpx.Client``, or both) override this to
    close them. Overrides must tolerate being called more than once.
    """
    pass

OllamaEmbeddingProvider

OllamaEmbeddingProvider(model: str = 'nomic-embed-text', base_url: str | None = None)

Bases: EmbeddingProvider

Generate embeddings via Ollama's /api/embed endpoint.

Uses the Ollama embedding API which supports both single and batch embedding generation. Dimensions are detected lazily from the first embedding call and cached.

Parameters:

Name Type Description Default
model str

Embedding model name (default: "nomic-embed-text").

'nomic-embed-text'
base_url str | None

Ollama server URL. Falls back to OLLAMA_URL env var, then "http://localhost:11434".

None
Source code in src/heddle/worker/embeddings.py
def __init__(
    self,
    model: str = "nomic-embed-text",
    base_url: str | None = None,
) -> None:
    # Server URL precedence: explicit argument, then the OLLAMA_URL
    # environment variable, then the local default.
    resolved_url = base_url or os.environ.get("OLLAMA_URL") or "http://localhost:11434"

    self.model = model
    self.base_url = resolved_url
    # Filled in from the first embedding response that comes back.
    self._dimensions: int | None = None
    self._client = httpx.AsyncClient(base_url=resolved_url, timeout=120.0)
    # Created lazily on the first sync embedding call, so async-only
    # callers never open a sync client.
    self._sync_client: httpx.Client | None = None

dimensions property

dimensions: int

Return embedding dimensionality (detected from first call).

embed async

embed(text: str) -> list[float]

Generate embedding for a single text string.

Source code in src/heddle/worker/embeddings.py
async def embed(self, text: str) -> list[float]:
    """Ask Ollama's ``/api/embed`` endpoint to embed one string."""
    payload = {"model": self.model, "input": text}
    response = await self._client.post("/api/embed", json=payload)
    response.raise_for_status()
    vector = response.json()["embeddings"][0]

    # Record the vector length the first time one is seen.
    if self._dimensions is None:
        self._dimensions = len(vector)
    return vector

embed_batch async

embed_batch(texts: list[str]) -> list[list[float]]

Generate embeddings for multiple texts in one call.

Ollama's /api/embed supports batch input via the input field accepting a list of strings.

Source code in src/heddle/worker/embeddings.py
async def embed_batch(self, texts: list[str]) -> list[list[float]]:
    """Embed several strings with a single ``/api/embed`` request.

    Ollama accepts a list of strings in the ``input`` field and returns
    one vector per entry under ``embeddings``. An empty ``texts`` returns
    ``[]`` without contacting the server.
    """
    if texts:
        response = await self._client.post(
            "/api/embed", json={"model": self.model, "input": texts}
        )
        response.raise_for_status()
        vectors = response.json()["embeddings"]
        if vectors and self._dimensions is None:
            self._dimensions = len(vectors[0])
        return vectors
    return []

embed_sync

embed_sync(text: str) -> list[float]

Synchronous single-text embedding via a sync httpx.Client.

Source code in src/heddle/worker/embeddings.py
def embed_sync(self, text: str) -> list[float]:
    """Blocking single-string embedding through the lazily created ``httpx.Client``."""
    client = self._get_sync_client()
    response = client.post("/api/embed", json={"model": self.model, "input": text})
    response.raise_for_status()
    vector = response.json()["embeddings"][0]

    if self._dimensions is None:
        self._dimensions = len(vector)
    return vector

embed_batch_sync

embed_batch_sync(texts: list[str]) -> list[list[float]]

Synchronous batch embedding via a sync httpx.Client.

Source code in src/heddle/worker/embeddings.py
def embed_batch_sync(self, texts: list[str]) -> list[list[float]]:
    """Blocking batch embedding through the provider's sync ``httpx.Client``.

    Sends one ``/api/embed`` request carrying the whole list. Returns
    ``[]`` immediately, without a request, when ``texts`` is empty.
    """
    if not texts:
        return []

    client = self._get_sync_client()
    response = client.post("/api/embed", json={"model": self.model, "input": texts})
    response.raise_for_status()
    vectors = response.json()["embeddings"]

    if vectors and self._dimensions is None:
        self._dimensions = len(vectors[0])
    return vectors

aclose async

aclose() -> None

Close the underlying httpx.AsyncClient and sync httpx.Client.

Source code in src/heddle/worker/embeddings.py
async def aclose(self) -> None:
    """Close the underlying ``httpx.AsyncClient`` and sync ``httpx.Client``.

    The sync client, if one was created, is closed in a ``finally``
    block. It is therefore released even when closing the async client
    raises; the original exception still propagates.
    """
    try:
        await self._client.aclose()
    finally:
        if self._sync_client is not None:
            self._sync_client.close()

OpenAICompatibleEmbeddingProvider

OpenAICompatibleEmbeddingProvider(model: str = 'text-embedding-nomic-embed-text-v1.5', base_url: str | None = None, api_key: str | None = None)

Bases: EmbeddingProvider

Generate embeddings via any OpenAI-compatible /v1/embeddings API.

Works with LM Studio, OpenAI, vLLM, Text Embeddings Inference (TEI), LiteLLM, and any other server that speaks the OpenAI embeddings schema. Batch input is sent as a list under input per the OpenAI spec; the server is expected to return data as a list of objects with an embedding field.

Parameters:

Name Type Description Default
model str

Embedding model name as exposed by the server's /v1/models endpoint (e.g. "text-embedding-nomic-embed-text-v1.5" for LM Studio, "text-embedding-3-small" for OpenAI).

'text-embedding-nomic-embed-text-v1.5'
base_url str | None

Server base URL. Both http://host:port and http://host:port/v1 are accepted; the trailing /v1 is normalized away. Falls back to LM_STUDIO_URL env var, then http://localhost:1234/v1.

None
api_key str | None

Sent as a Bearer token. LM Studio ignores it; OpenAI requires a real key. Falls back to OPENAI_API_KEY env.

None
Source code in src/heddle/worker/embeddings.py
def __init__(
    self,
    model: str = "text-embedding-nomic-embed-text-v1.5",
    base_url: str | None = None,
    api_key: str | None = None,
) -> None:
    self.model = model

    # Accept both ".../v1" and bare host URLs. Strip trailing slashes,
    # then one trailing "/v1", because request paths already start
    # with "/v1/embeddings".
    configured = base_url or os.environ.get("LM_STUDIO_URL") or "http://localhost:1234/v1"
    root = configured.rstrip("/").removesuffix("/v1")
    self.base_url = root
    self._dimensions: int | None = None

    # LM Studio ignores the key; a placeholder keeps the header
    # well-formed when no key is configured.
    token = api_key or os.environ.get("OPENAI_API_KEY") or "not-needed"
    self._auth_header = {"Authorization": f"Bearer {token}"}
    self._client = httpx.AsyncClient(
        base_url=root,
        headers=self._auth_header,
        timeout=120.0,
    )
    # Created lazily on the first sync embedding call.
    self._sync_client: httpx.Client | None = None

dimensions property

dimensions: int

Return embedding dimensionality (detected from first call).

embed async

embed(text: str) -> list[float]

Generate embedding for a single text string.

Source code in src/heddle/worker/embeddings.py
async def embed(self, text: str) -> list[float]:
    """Embed one string via the server's OpenAI-style ``/v1/embeddings`` route."""
    response = await self._client.post(
        "/v1/embeddings",
        json={"model": self.model, "input": text},
    )
    response.raise_for_status()
    first_item = response.json()["data"][0]
    vector = first_item["embedding"]

    if self._dimensions is None:
        self._dimensions = len(vector)
    return vector

embed_batch async

embed_batch(texts: list[str]) -> list[list[float]]

Generate embeddings for a batch of texts in one call.

Source code in src/heddle/worker/embeddings.py
async def embed_batch(self, texts: list[str]) -> list[list[float]]:
    """Embed several strings with one ``/v1/embeddings`` request.

    Returns ``[]`` without issuing a request when ``texts`` is empty.
    """
    if not texts:
        return []

    response = await self._client.post(
        "/v1/embeddings",
        json={"model": self.model, "input": texts},
    )
    response.raise_for_status()
    # Each entry in ``data`` carries an ``index``. Servers usually reply
    # in input order, but the OpenAI spec does not guarantee it, so
    # order by index. Entries without one count as 0, and the sort is
    # stable.
    ordered = sorted(response.json()["data"], key=lambda entry: entry.get("index", 0))
    vectors = [entry["embedding"] for entry in ordered]

    if vectors and self._dimensions is None:
        self._dimensions = len(vectors[0])
    return vectors

embed_sync

embed_sync(text: str) -> list[float]

Synchronous single-text embedding via a sync httpx.Client.

Source code in src/heddle/worker/embeddings.py
def embed_sync(self, text: str) -> list[float]:
    """Blocking single-string embedding via the provider's sync ``httpx.Client``."""
    client = self._get_sync_client()
    response = client.post(
        "/v1/embeddings",
        json={"model": self.model, "input": text},
    )
    response.raise_for_status()
    payload = response.json()
    vector = payload["data"][0]["embedding"]

    if self._dimensions is None:
        self._dimensions = len(vector)
    return vector

embed_batch_sync

embed_batch_sync(texts: list[str]) -> list[list[float]]

Synchronous batch embedding via a sync httpx.Client.

Source code in src/heddle/worker/embeddings.py
def embed_batch_sync(self, texts: list[str]) -> list[list[float]]:
    """Blocking batch embedding via the provider's sync ``httpx.Client``.

    Returns ``[]`` without issuing a request when ``texts`` is empty.
    Results are ordered by each entry's ``index``, as in the async batch
    path.
    """
    if not texts:
        return []

    client = self._get_sync_client()
    response = client.post(
        "/v1/embeddings",
        json={"model": self.model, "input": texts},
    )
    response.raise_for_status()
    # Order by index (missing index counts as 0; the sort is stable).
    ordered = sorted(response.json()["data"], key=lambda entry: entry.get("index", 0))
    vectors = [entry["embedding"] for entry in ordered]

    if vectors and self._dimensions is None:
        self._dimensions = len(vectors[0])
    return vectors

aclose async

aclose() -> None

Close the underlying httpx.AsyncClient and sync httpx.Client.

Source code in src/heddle/worker/embeddings.py
async def aclose(self) -> None:
    """Close the underlying ``httpx.AsyncClient`` and sync ``httpx.Client``.

    The sync client, if one was created, is closed in a ``finally``
    block. It is therefore released even when closing the async client
    raises; the original exception still propagates.
    """
    try:
        await self._client.aclose()
    finally:
        if self._sync_client is not None:
            self._sync_client.close()