Skip to content

Contrib Modules

The heddle.contrib package contains optional integrations that extend Heddle's capabilities. Each module requires its own optional dependency extra.

Module Extra Purpose
contrib.council council Multi-round agent deliberation framework
contrib.chatbridge chatbridge External chat/LLM session adapters
contrib.docproc docproc Document extraction backends (PDF / DOCX / Markdown)
contrib.duckdb duckdb Embedded analytics and vector search
contrib.lancedb lancedb ANN vector search via LanceDB
contrib.redis redis Production checkpoint persistence
contrib.rag rag RAG pipeline — Telegram exports, CSV, plain text
contrib.subprocess (stdlib only) Wrap any CLI tool as a Heddle processor worker

See Council How-To for the council and chatbridge guide. See RAG How-To for the RAG pipeline guide.

Council — Multi-Round Deliberation

Run structured team discussions where multiple LLM agents debate iteratively with pluggable protocols and convergence detection.

runner

CouncilRunner — NATS-free council execution.

Runs a multi-round deliberation directly against LLM backends without requiring NATS, actors, or running infrastructure. This is the council equivalent of :class:heddle.workshop.test_runner.WorkerTestRunner.

Usage::

from heddle.worker.backends import build_backends_from_env
from heddle.contrib.council.config import load_council_config
from heddle.contrib.council.runner import CouncilRunner

config = load_council_config("configs/councils/example.yaml")
runner = CouncilRunner(build_backends_from_env())
result = await runner.run("Should we adopt microservices?", config=config)

CouncilRunner

CouncilRunner(backends: dict[str, LLMBackend] | None = None, config: CouncilConfig | None = None)

Execute a council discussion directly against LLM backends.

This replicates the multi-round deliberation loop without NATS. Each agent turn calls backend.complete() directly, builds a transcript entry, and feeds it into the next round.

Parameters:

Name Type Description Default
backends dict[str, LLMBackend] | None

Dict mapping tier name ("local", "standard", "frontier") to :class:LLMBackend instances.

None
config CouncilConfig | None

Optional default :class:CouncilConfig. Can be overridden per-call in :meth:run.

None
Source code in src/heddle/contrib/council/runner.py
def __init__(
    self,
    backends: dict[str, LLMBackend] | None = None,
    config: CouncilConfig | None = None,
) -> None:
    self.backends = backends or {}
    self._default_config = config
    self._active_transcript: TranscriptStore | None = None
    # Cache of ChatBridge instances keyed by agent name.  Each
    # agent with ``bridge`` set gets a dedicated bridge so per-
    # session conversation history is preserved across rounds.
    self._bridges: dict[str, ChatBridge] = {}

inject

inject(agent_name: str, content: str, role: str = 'audience') -> None

Inject a spectator interjection into the active discussion.

Safe to call from another thread or coroutine while :meth:run is executing. The interjection will appear in the next agent's context as an audience reaction.

Raises :class:RuntimeError if no discussion is active.

Source code in src/heddle/contrib/council/runner.py
def inject(
    self,
    agent_name: str,
    content: str,
    role: str = "audience",
) -> None:
    """Inject a spectator interjection into the active discussion.

    Safe to call from another thread or coroutine while :meth:`run`
    is executing.  The interjection will appear in the next agent's
    context as an audience reaction.

    Raises :class:`RuntimeError` if no discussion is active.
    """
    if self._active_transcript is None:
        msg = "No active council discussion — call run() first"
        raise RuntimeError(msg)
    self._active_transcript.inject_interjection(agent_name, content, role)

run async

run(topic: str, config: CouncilConfig | None = None, on_turn: Callable | None = None) -> CouncilResult

Run a full council deliberation.

Parameters:

Name Type Description Default
topic str

The discussion topic / question.

required
config CouncilConfig | None

Council config (overrides the constructor default).

None
on_turn Callable | None

Optional callback invoked after each agent's turn with the :class:TranscriptEntry. May be sync or async.

None

Returns:

Type Description
CouncilResult

class:CouncilResult with the full transcript, synthesis,

CouncilResult

convergence info, and token usage.

Source code in src/heddle/contrib/council/runner.py
async def run(  # noqa: PLR0915
    self,
    topic: str,
    config: CouncilConfig | None = None,
    on_turn: Callable | None = None,
) -> CouncilResult:
    """Run a full council deliberation.

    Args:
        topic: The discussion topic / question.
        config: Council config (overrides the constructor default).
        on_turn: Optional callback invoked after each agent's turn
            with the :class:`TranscriptEntry`.  May be sync or async.

    Returns:
        :class:`CouncilResult` with the full transcript, synthesis,
        convergence info, and token usage.
    """
    cfg = config or self._default_config
    if cfg is None:
        msg = "No council config provided"
        raise ValueError(msg)

    start = time.monotonic()
    log = logger.bind(council=cfg.name, topic=topic[:80])
    log.info("council.start", agents=len(cfg.agents), max_rounds=cfg.max_rounds)

    protocol = get_protocol(cfg.protocol)
    protocol.set_agents(cfg.agents)
    transcript = TranscriptStore()
    self._active_transcript = transcript
    convergence_backend = self.backends.get(cfg.convergence.backend_tier.value)
    detector = ConvergenceDetector(cfg.convergence, backend=convergence_backend)

    total_tokens: dict[str, int] = {"prompt_tokens": 0, "completion_tokens": 0}
    converged = False
    convergence_score: float | None = None
    rounds_completed = 0
    # Per-turn budget — same formula the orchestrator uses, validated
    # at config-load time against the 5s floor.  A wedged backend
    # would otherwise starve the runner indefinitely (the bug B1
    # closed: the orchestrator path enforced a budget via
    # ``dispatch_and_wait_for_result``, the runner path did not).
    per_turn_timeout = cfg.per_turn_timeout()

    for round_num in range(1, cfg.max_rounds + 1):
        round_log = log.bind(round=round_num)
        round_log.info("council.round.start")

        transcript.start_round(round_num)
        turns = protocol.get_turn_order(round_num, cfg.agents, transcript)

        for turn in turns:
            agent = turn.agent
            context = protocol.build_agent_context(agent, transcript, round_num, topic)

            entry = await self._execute_agent_turn(
                agent=agent,
                context=context,
                round_num=round_num,
                topic=topic,
                config=cfg,
                timeout_seconds=per_turn_timeout,
            )

            transcript.add_entry(entry)

            # Accumulate tokens — keep prompt vs completion split
            # so the final ``total_token_usage`` honours the
            # backend's accounting.  Earlier code put the whole
            # ``token_count`` (prompt + completion) into
            # ``prompt_tokens`` and never populated
            # ``completion_tokens``, so any billing /
            # rate-limiting layer downstream got a wrong picture.
            total_tokens["prompt_tokens"] += entry.prompt_tokens
            total_tokens["completion_tokens"] += entry.completion_tokens

            if on_turn is not None:
                result = on_turn(entry)
                if hasattr(result, "__await__"):
                    await result

            round_log.info(
                "council.agent_turn.done",
                agent=agent.name,
                model=entry.model_used,
                tokens=entry.token_count,
            )

        # Check convergence.
        conv_result = await detector.check(transcript, round_num, topic)
        transcript.set_convergence_score(round_num, conv_result.score)
        convergence_score = conv_result.score
        rounds_completed = round_num

        round_log.info(
            "council.round.done",
            convergence_score=conv_result.score,
            converged=conv_result.converged,
        )

        if conv_result.converged:
            converged = True
            log.info("council.converged", round=round_num, score=conv_result.score)
            break

    # Facilitator synthesis — budgeted so a wedged backend cannot
    # leave the runner without ever publishing a final result.
    # Mirrors the orchestrator's synthesis timeout contract.
    try:
        synthesis = await call_with_budget(
            self._synthesize(cfg, transcript, topic, total_tokens),
            timeout_seconds=cfg.synthesis_timeout_seconds,
            label="synthesis",
        )
    except CouncilTimeoutError as exc:
        log.warning(
            "council.synthesis.timeout",
            timeout_seconds=exc.timeout_seconds,
        )
        synthesis = f"[Synthesis timed out after {exc.timeout_seconds:.0f}s]"

    # Build agent summaries (latest position per agent).
    agent_summaries = transcript.get_latest_positions()

    elapsed_ms = int((time.monotonic() - start) * 1000)
    log.info(
        "council.done",
        rounds=rounds_completed,
        converged=converged,
        elapsed_ms=elapsed_ms,
    )

    self._active_transcript = None

    return CouncilResult(
        topic=topic,
        rounds_completed=rounds_completed,
        converged=converged,
        convergence_score=convergence_score,
        synthesis=synthesis,
        transcript=transcript.rounds,
        agent_summaries=agent_summaries,
        total_token_usage=total_tokens,
        elapsed_ms=elapsed_ms,
    )

aclose async

aclose() -> None

Close any cached :class:ChatBridge sessions.

ChatBridges may hold open httpx.AsyncClient connections. Call this when you are done with the runner to release them cleanly. Safe to call multiple times.

Source code in src/heddle/contrib/council/runner.py
async def aclose(self) -> None:
    """Close any cached :class:`ChatBridge` sessions.

    ChatBridges may hold open ``httpx.AsyncClient`` connections.
    Call this when you are done with the runner to release them
    cleanly.  Safe to call multiple times.
    """
    for bridge in self._bridges.values():
        close = getattr(bridge, "aclose", None) or getattr(bridge, "close", None)
        if close is None:
            continue
        try:
            result = close()
            if hasattr(result, "__await__"):
                await result
        except Exception as e:  # pragma: no cover — best-effort cleanup
            logger.warning(
                "council.bridge.close_failed",
                bridge=type(bridge).__name__,
                error=str(e),
            )
    self._bridges.clear()

config

Council configuration loading and validation.

Loads council YAML configs into typed :class:CouncilConfig models. Follows the same pattern as :func:heddle.core.config.load_config.

CouncilConfig

Bases: BaseModel

Top-level council configuration, loaded from YAML.

Two timeouts compose the overall budget:

  • timeout_seconds — total wall time for the council run.
  • synthesis_timeout_seconds — the budget reserved for the facilitator's synthesis call at the end. Subtracted from the total to compute the per-turn budget.

Why a separate synthesis budget: the original shape divided timeout_seconds evenly across max_rounds * len(agents) turns, leaving synthesis unbounded — which masked spurious timeouts (the synthesis could wedge indefinitely on a frontier model) and made per-turn budgeting fragile. Carving synthesis out makes both halves explicit and bounded.

per_turn_timeout

per_turn_timeout() -> float

Compute the per-turn timeout in seconds.

Centralised so the orchestrator and any future caller agree on the formula. Subtracts synthesis_timeout_seconds from the total budget and divides across rounds * agents.

Source code in src/heddle/contrib/council/config.py
def per_turn_timeout(self) -> float:
    """Compute the per-turn timeout in seconds.

    Centralised so the orchestrator and any future caller agree on
    the formula.  Subtracts ``synthesis_timeout_seconds`` from the
    total budget and divides across rounds * agents.
    """
    return (self.timeout_seconds - self.synthesis_timeout_seconds) / max(
        self.max_rounds * len(self.agents), 1
    )

load_council_config

load_council_config(path: str | Path) -> CouncilConfig

Load a council YAML config and return a validated model.

Raises:

Type Description
FileNotFoundError

If path does not exist.

ValidationError

If the YAML content is invalid.

Source code in src/heddle/contrib/council/config.py
def load_council_config(path: str | Path) -> CouncilConfig:
    """Load a council YAML config and return a validated model.

    Raises:
        FileNotFoundError: If *path* does not exist.
        pydantic.ValidationError: If the YAML content is invalid.
    """
    path = Path(path)
    with path.open() as f:
        raw = yaml.safe_load(f)
    return CouncilConfig(**raw)

schemas

Pydantic models for the council deliberation framework.

These models form the typed contract for multi-round agent discussions. They are used by :class:CouncilRunner (NATS-free execution), :class:CouncilOrchestrator (NATS-connected), and the MCP council bridge.

AgentConfig

Bases: BaseModel

Configuration for a single council agent.

Each agent is backed by either an existing Heddle worker (via worker_type) or an external chat bridge (via bridge). Exactly one must be set.

CouncilResult

Bases: BaseModel

Final output of a council deliberation.

ConvergenceResult

Bases: BaseModel

Result of a convergence check after a round.

TranscriptEntry

Bases: BaseModel

A single contribution within a discussion round.

entry_type distinguishes panelist turns from audience interjections. Agents may choose to engage with interjections or ignore them — the protocol presents them separately.

ChatBridge — External Chat Adapters

Session-aware adapters for Claude, OpenAI, Ollama, and human-in-the-loop participation. Each adapter maintains per-session conversation history.

base

ChatBridge ABC and shared data models.

All chat bridges implement :class:ChatBridge, which provides a session-aware interface for multi-turn conversations with external LLM providers or human participants.

ChatBridge

ChatBridge(system_prompt: str = '')

Bases: ABC

Abstract base for external chat session adapters.

Each bridge maintains per-session conversation history. The worker itself remains stateless (per Heddle invariants) — the state lives in the bridge's internal session dict or in the external provider's session.

Subclasses must implement :meth:send_turn, :meth:get_session_info, and :meth:close_session.

Source code in src/heddle/contrib/chatbridge/base.py
def __init__(self, system_prompt: str = "") -> None:
    self._sessions: dict[str, _Session] = {}
    self._system_prompt = system_prompt

send_turn abstractmethod async

send_turn(message: str, context: dict[str, Any], session_id: str) -> ChatResponse

Send a message and get a response.

Parameters:

Name Type Description Default
message str

The user message for this turn.

required
context dict[str, Any]

Additional context (round metadata, topic, etc.).

required
session_id str

Identifies the persistent conversation session.

required

Returns:

Type Description
ChatResponse

The assistant's response as a :class:ChatResponse.

Source code in src/heddle/contrib/chatbridge/base.py
@abstractmethod
async def send_turn(
    self,
    message: str,
    context: dict[str, Any],
    session_id: str,
) -> ChatResponse:
    """Send a message and get a response.

    Args:
        message: The user message for this turn.
        context: Additional context (round metadata, topic, etc.).
        session_id: Identifies the persistent conversation session.

    Returns:
        The assistant's response as a :class:`ChatResponse`.
    """

get_session_info abstractmethod async

get_session_info(session_id: str) -> SessionInfo

Return metadata about a session.

Source code in src/heddle/contrib/chatbridge/base.py
@abstractmethod
async def get_session_info(self, session_id: str) -> SessionInfo:
    """Return metadata about a session."""

close_session async

close_session(session_id: str) -> None

Clean up session state.

Source code in src/heddle/contrib/chatbridge/base.py
async def close_session(self, session_id: str) -> None:
    """Clean up session state."""
    self._sessions.pop(session_id, None)

aclose async

aclose() -> None

Release any I/O resources held by this bridge.

Subclasses that hold open connections (e.g. an httpx.AsyncClient) override this to close them. Idempotent — safe to call more than once. The default clears any in-memory session state; subclasses that override should call super() last so sessions are cleared after their resources are released.

Source code in src/heddle/contrib/chatbridge/base.py
async def aclose(self) -> None:
    """Release any I/O resources held by this bridge.

    Subclasses that hold open connections (e.g. an
    ``httpx.AsyncClient``) override this to close them.  Idempotent —
    safe to call more than once.  The default clears any in-memory
    session state; subclasses that override should call ``super()``
    last so sessions are cleared after their resources are released.
    """
    self._sessions.clear()

ChatResponse

Bases: BaseModel

Response from a chat bridge turn.

For "thinking" / chain-of-thought models served by some OpenAI-compatible providers (LM Studio, vLLM with reasoning models, DeepSeek, …), the visible answer may be empty while the model produces a long internal monologue. When that happens, bridges fall back to using the reasoning text as content so downstream consumers see something; the raw monologue is still available on reasoning_content for callers that want to surface it separately (or strip it from logs).

SessionInfo

Bases: BaseModel

Metadata about an active chat session.

anthropic

Anthropic (Claude) chat bridge — session-aware Claude API adapter.

Unlike :class:heddle.worker.backends.AnthropicBackend which is stateless per-call, this bridge accumulates messages per session, enabling multi-turn conversations with Claude.

AnthropicChatBridge

AnthropicChatBridge(api_key: str | None = None, model: str = 'claude-sonnet-4-20250514', system_prompt: str = '', max_tokens: int = 2000)

Bases: ChatBridge

Claude API with per-session conversation history.

Parameters:

Name Type Description Default
api_key str | None

Anthropic API key. Falls back to ANTHROPIC_API_KEY env.

None
model str

Model identifier (default: claude-sonnet-4-20250514).

'claude-sonnet-4-20250514'
system_prompt str

System instructions applied to all sessions.

''
max_tokens int

Default max tokens per turn.

2000
Source code in src/heddle/contrib/chatbridge/anthropic.py
def __init__(
    self,
    api_key: str | None = None,
    model: str = "claude-sonnet-4-20250514",
    system_prompt: str = "",
    max_tokens: int = 2000,
) -> None:
    super().__init__(system_prompt=system_prompt)
    self._api_key = api_key or os.environ.get("ANTHROPIC_API_KEY", "")
    # Reject empty keys at construction so a misconfiguration
    # surfaces here rather than as a generic 401 on the first
    # send_turn call.  Earlier the bridge silently sent
    # ``x-api-key: `` (empty) to Anthropic.
    if not self._api_key:
        from heddle.contrib.chatbridge.exceptions import ChatBridgeMisconfiguredError

        raise ChatBridgeMisconfiguredError(
            "AnthropicChatBridge: api_key is empty. Pass api_key=... "
            "or set the ANTHROPIC_API_KEY env var."
        )
    self._model = model
    self._max_tokens = max_tokens
    self._client = httpx.AsyncClient(
        base_url="https://api.anthropic.com",
        headers={
            "x-api-key": self._api_key,
            "anthropic-version": self.ANTHROPIC_API_VERSION,
            "content-type": "application/json",
        },
        timeout=120.0,
    )

send_turn async

send_turn(message: str, context: dict[str, Any], session_id: str) -> ChatResponse

Send a turn to Claude, accumulating session messages.

Session history is only updated after the API call returns. Earlier the user message was appended eagerly, so an HTTP failure left it in session.messages; the next turn then sent two consecutive user messages, which Claude rejects.

Source code in src/heddle/contrib/chatbridge/anthropic.py
async def send_turn(
    self,
    message: str,
    context: dict[str, Any],
    session_id: str,
) -> ChatResponse:
    """Send a turn to Claude, accumulating session messages.

    Session history is only updated after the API call returns.
    Earlier the user message was appended eagerly, so an HTTP
    failure left it in ``session.messages``; the next turn then
    sent two consecutive ``user`` messages, which Claude rejects.
    """
    session = self._get_or_create_session(session_id)
    # Build the request with the new user message included but
    # do not mutate ``session.messages`` until the call returns.
    user_msg = {"role": "user", "content": message}
    api_messages = [*session.messages, user_msg]

    body: dict[str, Any] = {
        "model": self._model,
        "max_tokens": self._max_tokens,
        "system": session.system_prompt or self._system_prompt,
        "messages": api_messages,
    }

    resp = await self._client.post("/v1/messages", json=body)
    resp.raise_for_status()
    data = resp.json()

    # Extract text content and ``thinking`` content from response
    # blocks separately.  Extended-thinking blocks land on
    # ``reasoning_content`` so consumers that surface
    # OpenAI-compat reasoning (``message.reasoning_content``) and
    # Anthropic extended thinking can read both via the same
    # ChatResponse field.
    text_parts: list[str] = []
    thinking_parts: list[str] = []
    for block in data.get("content", []):
        btype = block.get("type")
        if btype == "text":
            text_parts.append(block.get("text", ""))
        elif btype == "thinking":
            thinking_parts.append(block.get("thinking", ""))
    content = "".join(text_parts)
    reasoning = "".join(thinking_parts) if thinking_parts else None

    # Both messages persist together — only after the API call
    # succeeded — so a mid-call failure leaves session.messages
    # untouched (no dangling user message on retry).
    session.messages.append(user_msg)
    session.messages.append({"role": "assistant", "content": content})

    usage = data.get("usage", {})
    return ChatResponse(
        content=content,
        model=data.get("model", self._model),
        token_usage={
            "prompt_tokens": usage.get("input_tokens", 0),
            "completion_tokens": usage.get("output_tokens", 0),
        },
        stop_reason=data.get("stop_reason"),
        session_id=session_id,
        reasoning_content=reasoning,
    )

get_session_info async

get_session_info(session_id: str) -> SessionInfo

Return session metadata.

Source code in src/heddle/contrib/chatbridge/anthropic.py
async def get_session_info(self, session_id: str) -> SessionInfo:
    """Return session metadata."""
    session = self._sessions.get(session_id)
    info = SessionInfo(
        session_id=session_id,
        bridge_type=self.bridge_type,
        model=self._model,
        message_count=len(session.messages) if session else 0,
    )
    if session:
        info.created_at = session.created_at
    return info

aclose async

aclose() -> None

Close the underlying httpx.AsyncClient and clear sessions.

Source code in src/heddle/contrib/chatbridge/anthropic.py
async def aclose(self) -> None:
    """Close the underlying ``httpx.AsyncClient`` and clear sessions."""
    await self._client.aclose()
    await super().aclose()

openai

OpenAI chat bridge — session-aware OpenAI/ChatGPT adapter.

Supports any OpenAI-compatible API (OpenAI, Azure OpenAI, etc.).

OpenAIChatBridge

OpenAIChatBridge(api_key: str | None = None, model: str = 'gpt-4o', base_url: str = 'https://api.openai.com', system_prompt: str = '', max_tokens: int = 2000)

Bases: ChatBridge

OpenAI Chat Completions API with per-session conversation history.

Parameters:

Name Type Description Default
api_key str | None

OpenAI API key. Falls back to OPENAI_API_KEY env.

None
model str

Model identifier (default: gpt-4o).

'gpt-4o'
base_url str

API base URL (default: OpenAI).

'https://api.openai.com'
system_prompt str

System instructions applied to all sessions.

''
max_tokens int

Default max tokens per turn.

2000
Source code in src/heddle/contrib/chatbridge/openai.py
def __init__(
    self,
    api_key: str | None = None,
    model: str = "gpt-4o",
    base_url: str = "https://api.openai.com",
    system_prompt: str = "",
    max_tokens: int = 2000,
) -> None:
    super().__init__(system_prompt=system_prompt)
    self._api_key = api_key or os.environ.get("OPENAI_API_KEY", "")
    # Reject empty keys at construction so a misconfiguration
    # surfaces here rather than as a generic 401 on the first
    # send_turn call.  Earlier the bridge silently sent
    # ``Authorization: Bearer `` (empty) to OpenAI.
    if not self._api_key:
        from heddle.contrib.chatbridge.exceptions import ChatBridgeMisconfiguredError

        raise ChatBridgeMisconfiguredError(
            "OpenAIChatBridge: api_key is empty. Pass api_key=... "
            "or set the OPENAI_API_KEY env var."
        )
    self._model = model
    self._max_tokens = max_tokens
    self._client = httpx.AsyncClient(
        base_url=base_url,
        headers={
            "Authorization": f"Bearer {self._api_key}",
            "Content-Type": "application/json",
        },
        timeout=120.0,
    )

send_turn async

send_turn(message: str, context: dict[str, Any], session_id: str) -> ChatResponse

Send a turn via OpenAI Chat Completions, accumulating history.

Session history is only updated after the API call returns successfully. Earlier the user message was appended eagerly, so an HTTP failure left it in the session and the next turn sent two consecutive user messages — OpenAI accepts that shape but produces confused output.

Source code in src/heddle/contrib/chatbridge/openai.py
async def send_turn(
    self,
    message: str,
    context: dict[str, Any],
    session_id: str,
) -> ChatResponse:
    """Send a turn via OpenAI Chat Completions, accumulating history.

    Session history is only updated after the API call returns
    successfully.  Earlier the user message was appended eagerly,
    so an HTTP failure left it in the session and the next turn
    sent two consecutive ``user`` messages — OpenAI accepts that
    shape but produces confused output.
    """
    session = self._get_or_create_session(session_id)
    # Build messages array with system prompt prepended; do not
    # mutate ``session.messages`` yet — only on success.
    user_msg = {"role": "user", "content": message}
    api_messages: list[dict[str, str]] = []
    sys_prompt = session.system_prompt or self._system_prompt
    if sys_prompt:
        api_messages.append({"role": "system", "content": sys_prompt})
    api_messages.extend(session.messages)
    api_messages.append(user_msg)

    body: dict[str, Any] = {
        "model": self._model,
        "messages": api_messages,
        "max_tokens": self._max_tokens,
    }

    resp = await self._client.post("/v1/chat/completions", json=body)
    resp.raise_for_status()
    data = resp.json()

    choice = data.get("choices", [{}])[0]
    # ``message`` is the local turn input; rebinding it from the
    # API response would shadow our own argument.  Use a distinct
    # name for the API-returned message dict so the local
    # ``message`` (and ``user_msg`` built from it) stay readable.
    api_message = choice.get("message", {})
    # D3: detect tool_calls and raise rather than returning an
    # empty assistant turn.  Some OpenAI-compatible providers
    # emit tool_calls with empty content; the bridge does not
    # currently implement tool execution, so silently returning
    # "" would make the agent appear to say nothing.  Raise a
    # typed error so the council loop (or other consumer) can
    # attribute the failure to the right cause.  Detect BEFORE
    # mutating ``session.messages`` so the bad turn doesn't
    # poison history on retry.
    if api_message.get("tool_calls"):
        from heddle.contrib.chatbridge.exceptions import UnsupportedToolUseError

        raise UnsupportedToolUseError(
            bridge="openai",
            model=self._model,
            tool_calls=api_message["tool_calls"],
        )
    content = api_message.get("content") or ""
    # Thinking-model quirk (mirrors OpenAICompatibleBackend in
    # heddle.worker.backends): some OpenAI-compatible providers
    # (LM Studio for qwen3.*/deepseek-r1, vLLM with a reasoning
    # parser, DeepSeek's first-party API) split the model's
    # chain-of-thought onto ``message.reasoning_content`` while
    # leaving ``message.content`` empty.  We rescue it so
    # callers don't get a silent empty string, and surface the
    # raw value on ``ChatResponse.reasoning_content`` so
    # operators can log or strip it.  See
    # docs/TROUBLESHOOTING.md "Thinking model returns empty
    # content" for provider knobs that disable the trace at
    # request time.
    #
    # TODO(thinking-config): expose a ``disable_thinking=True``
    # constructor flag (paired with the matching backend
    # parameter) that maps to provider-specific request params
    # — qwen ``extra_body={"enable_thinking": False}`` via
    # LM Studio / vLLM, OpenAI ``reasoning_effort="low"``, etc.
    # See the equivalent TODO in OpenAICompatibleBackend.
    reasoning_content = api_message.get("reasoning_content") or None
    if not content and reasoning_content:
        content = reasoning_content
        logger.info(
            "chatbridge.reasoning_content.rescue",
            bridge_type=self.bridge_type,
            model=self._model,
            response_model=data.get("model"),
            completion_tokens=data.get("usage", {}).get("completion_tokens", 0),
            max_tokens=self._max_tokens,
            reasoning_chars=len(reasoning_content),
        )

    # Both messages persist together — only after the API call
    # succeeded and the tool_calls check passed — so a mid-call
    # failure leaves session.messages untouched.
    session.messages.append(user_msg)
    session.messages.append({"role": "assistant", "content": content})

    usage = data.get("usage", {})
    return ChatResponse(
        content=content,
        model=data.get("model", self._model),
        token_usage={
            "prompt_tokens": usage.get("prompt_tokens", 0),
            "completion_tokens": usage.get("completion_tokens", 0),
        },
        stop_reason=choice.get("finish_reason"),
        session_id=session_id,
        reasoning_content=reasoning_content,
    )

get_session_info async

get_session_info(session_id: str) -> SessionInfo

Return session metadata.

Source code in src/heddle/contrib/chatbridge/openai.py
async def get_session_info(self, session_id: str) -> SessionInfo:
    """Return session metadata."""
    session = self._sessions.get(session_id)
    info = SessionInfo(
        session_id=session_id,
        bridge_type=self.bridge_type,
        model=self._model,
        message_count=len(session.messages) if session else 0,
    )
    if session:
        info.created_at = session.created_at
    return info

aclose async

aclose() -> None

Close the underlying httpx.AsyncClient and clear sessions.

Source code in src/heddle/contrib/chatbridge/openai.py
async def aclose(self) -> None:
    """Close the underlying ``httpx.AsyncClient`` and clear sessions."""
    await self._client.aclose()
    await super().aclose()

ollama

Ollama chat bridge — session-aware local model adapter.

Wraps the Ollama /api/chat endpoint with per-session conversation history for multi-turn local model interactions.

OllamaChatBridge

OllamaChatBridge(model: str = 'llama3.2:3b', base_url: str = 'http://localhost:11434', system_prompt: str = '', max_tokens: int = 2000)

Bases: ChatBridge

Ollama chat API with per-session conversation history.

Parameters:

Name Type Description Default
model str

Ollama model name (default: llama3.2:3b).

'llama3.2:3b'
base_url str

Ollama server URL (default: http://localhost:11434).

'http://localhost:11434'
system_prompt str

System instructions applied to all sessions.

''
max_tokens int

Default max tokens per turn (num_predict).

2000
Source code in src/heddle/contrib/chatbridge/ollama.py
def __init__(
    self,
    model: str = "llama3.2:3b",
    base_url: str = "http://localhost:11434",
    system_prompt: str = "",
    max_tokens: int = 2000,
) -> None:
    super().__init__(system_prompt=system_prompt)
    self._model = model
    self._max_tokens = max_tokens
    self._client = httpx.AsyncClient(
        base_url=base_url,
        timeout=120.0,
    )

send_turn async

send_turn(message: str, context: dict[str, Any], session_id: str) -> ChatResponse

Send a turn via Ollama /api/chat, accumulating history.

Session history is only updated after the API call returns. Earlier eagerness left a dangling user message on HTTP failure; on retry the next turn sent two consecutive user messages, which different Ollama backends handled inconsistently.

Source code in src/heddle/contrib/chatbridge/ollama.py
async def send_turn(
    self,
    message: str,
    context: dict[str, Any],
    session_id: str,
) -> ChatResponse:
    """Send a turn via Ollama /api/chat, accumulating history.

    Session history is only updated after the API call returns.
    Earlier eagerness left a dangling user message on HTTP
    failure; on retry the next turn sent two consecutive user
    messages, which different Ollama backends handled
    inconsistently.
    """
    session = self._get_or_create_session(session_id)
    # Build messages array with system prompt prepended; do not
    # mutate ``session.messages`` yet — only on success.
    user_msg = {"role": "user", "content": message}
    api_messages: list[dict[str, str]] = []
    sys_prompt = session.system_prompt or self._system_prompt
    if sys_prompt:
        api_messages.append({"role": "system", "content": sys_prompt})
    api_messages.extend(session.messages)
    api_messages.append(user_msg)

    body: dict[str, Any] = {
        "model": self._model,
        "messages": api_messages,
        "stream": False,
        "options": {"num_predict": self._max_tokens},
    }

    resp = await self._client.post("/api/chat", json=body)
    resp.raise_for_status()
    data = resp.json()

    content = data.get("message", {}).get("content", "")

    # Both messages persist together — only after the API call
    # succeeded — so a mid-call failure leaves session.messages
    # untouched.
    session.messages.append(user_msg)
    session.messages.append({"role": "assistant", "content": content})

    # Ollama provides token counts differently.
    prompt_tokens = data.get("prompt_eval_count", 0)
    completion_tokens = data.get("eval_count", 0)

    return ChatResponse(
        content=content,
        model=data.get("model", self._model),
        token_usage={
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
        },
        stop_reason="stop" if data.get("done") else None,
        session_id=session_id,
    )

get_session_info async

get_session_info(session_id: str) -> SessionInfo

Return session metadata.

Source code in src/heddle/contrib/chatbridge/ollama.py
async def get_session_info(self, session_id: str) -> SessionInfo:
    """Return session metadata."""
    session = self._sessions.get(session_id)
    info = SessionInfo(
        session_id=session_id,
        bridge_type=self.bridge_type,
        model=self._model,
        message_count=len(session.messages) if session else 0,
    )
    if session:
        info.created_at = session.created_at
    return info

aclose async

aclose() -> None

Close the underlying httpx.AsyncClient and clear sessions.

Source code in src/heddle/contrib/chatbridge/ollama.py
async def aclose(self) -> None:
    """Close the underlying ``httpx.AsyncClient`` and clear sessions."""
    await self._client.aclose()
    await super().aclose()

manual

Manual chat bridge — human-in-the-loop adapter.

Allows a human participant to join a council discussion or other multi-agent flow. Two modes:

1. **Callback mode** — provide an ``on_prompt`` async callable that
   receives the context and returns a response string.
2. **Queue mode** — prompts are put onto an ``asyncio.Queue``, and
   responses are awaited from a separate response queue.

Both modes enforce a timeout to prevent indefinite blocking.

ManualChatBridge

ManualChatBridge(on_prompt: Callable[[str, dict, str], Awaitable[str]] | None = None, prompt_queue: Queue | None = None, response_queue: Queue | None = None, timeout_seconds: float = 300.0, system_prompt: str = '')

Bases: ChatBridge

Human-in-the-loop chat bridge.

Parameters:

Name Type Description Default
on_prompt Callable[[str, dict, str], Awaitable[str]] | None

Async callback (message, context, session_id) -> str. If provided, this is called for each turn.

None
prompt_queue Queue | None

Queue where prompts are put for external consumption.

None
response_queue Queue | None

Queue where responses are expected.

None
timeout_seconds float

Max time to wait for a human response.

300.0
system_prompt str

System instructions (informational for the human).

''
Source code in src/heddle/contrib/chatbridge/manual.py
def __init__(
    self,
    on_prompt: Callable[[str, dict, str], Awaitable[str]] | None = None,
    prompt_queue: asyncio.Queue | None = None,
    response_queue: asyncio.Queue | None = None,
    timeout_seconds: float = 300.0,
    system_prompt: str = "",
) -> None:
    super().__init__(system_prompt=system_prompt)
    self._on_prompt = on_prompt
    self._prompt_queue = prompt_queue
    self._response_queue = response_queue
    self._timeout = timeout_seconds

send_turn async

send_turn(message: str, context: dict[str, Any], session_id: str) -> ChatResponse

Request a human response for this turn.

Session history is only updated after the human responds. Earlier the prompt was appended eagerly, so a timeout or queue failure left it in history; on retry the human would see two consecutive prompts.

Source code in src/heddle/contrib/chatbridge/manual.py
async def send_turn(
    self,
    message: str,
    context: dict[str, Any],
    session_id: str,
) -> ChatResponse:
    """Request a human response for this turn.

    Session history is only updated after the human responds.
    Earlier the prompt was appended eagerly, so a timeout or
    queue failure left it in history; on retry the human would
    see two consecutive prompts.
    """
    session = self._get_or_create_session(session_id)
    prompt_entry = {"role": "system", "content": message}

    if self._on_prompt is not None:
        content = await asyncio.wait_for(
            self._on_prompt(message, context, session_id),
            timeout=self._timeout,
        )
    elif self._prompt_queue is not None and self._response_queue is not None:
        await self._prompt_queue.put(
            {
                "message": message,
                "context": context,
                "session_id": session_id,
            }
        )
        content = await asyncio.wait_for(
            self._response_queue.get(),
            timeout=self._timeout,
        )
    else:
        msg = "ManualChatBridge needs either on_prompt or both prompt_queue and response_queue"
        raise ValueError(msg)

    # Both entries persist together once the human responded —
    # a timeout or queue failure before this point leaves
    # session.messages untouched.
    session.messages.append(prompt_entry)
    session.messages.append({"role": "human", "content": content})

    return ChatResponse(
        content=content,
        model="human",
        token_usage={},
        stop_reason="human_input",
        session_id=session_id,
    )

get_session_info async

get_session_info(session_id: str) -> SessionInfo

Return session metadata.

Source code in src/heddle/contrib/chatbridge/manual.py
async def get_session_info(self, session_id: str) -> SessionInfo:
    """Return session metadata."""
    session = self._sessions.get(session_id)
    return SessionInfo(
        session_id=session_id,
        bridge_type=self.bridge_type,
        model="human",
        message_count=len(session.messages) if session else 0,
        created_at=session.created_at if session else None,
    )

worker

ChatBridgeBackend — wraps a ChatBridge as a Heddle ProcessingBackend.

This enables any ChatBridge adapter to be used as a standard Heddle worker via YAML config alone, without writing Python code::

name: "external_gpt4"
processing_backend: "heddle.contrib.chatbridge.worker.ChatBridgeBackend"
processing_config:
  bridge_class: "heddle.contrib.chatbridge.openai.OpenAIChatBridge"
  model: "gpt-4o"
  api_key_env: "OPENAI_API_KEY"

ChatBridgeBackend

ChatBridgeBackend(**config: Any)

Bases: SyncProcessingBackend

ProcessingBackend that delegates to a ChatBridge adapter.

The bridge class is dynamically imported from config, enabling YAML-only configuration of external chat agents.

Config keys

bridge_class: Dotted import path to a ChatBridge subclass. api_key_env: Optional env var name for API key. system_prompt: Optional system prompt for the bridge. **kwargs: Passed to the bridge constructor.

Source code in src/heddle/contrib/chatbridge/worker.py
def __init__(self, **config: Any) -> None:
    super().__init__(serialize_writes=False)
    self._config = config
    self._bridge = self._create_bridge(config)

process_sync

process_sync(payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]

Not used — we override process() for async bridge calls.

Source code in src/heddle/contrib/chatbridge/worker.py
def process_sync(
    self,
    payload: dict[str, Any],
    config: dict[str, Any],
) -> dict[str, Any]:
    """Not used — we override process() for async bridge calls."""
    msg = "ChatBridgeBackend.process_sync should not be called directly"
    raise NotImplementedError(msg)

process async

process(payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]

Delegate to the bridge's send_turn method.

Expects payload to contain
  • message or the entire payload as the message
  • _session_id (optional, defaults to "default")
  • _context (optional, additional context dict)
Source code in src/heddle/contrib/chatbridge/worker.py
async def process(
    self,
    payload: dict[str, Any],
    config: dict[str, Any],
) -> dict[str, Any]:
    """Delegate to the bridge's send_turn method.

    Expects payload to contain:
        - ``message`` or the entire payload as the message
        - ``_session_id`` (optional, defaults to "default")
        - ``_context`` (optional, additional context dict)
    """
    session_id = payload.get("_session_id", "default")
    context = payload.get("_context", {})

    # Use 'message' field if present, otherwise serialize full payload.
    import json

    message = payload.get("message")
    if message is None:
        # Remove internal fields before serializing.
        clean = {k: v for k, v in payload.items() if not k.startswith("_")}
        message = json.dumps(clean, ensure_ascii=False, indent=2)

    try:
        response = await self._bridge.send_turn(message, context, session_id)
    except Exception as e:
        msg = f"ChatBridge send_turn failed: {e}"
        raise ChatBridgeBackendError(msg) from e

    return {
        "output": {
            "content": response.content,
            "model": response.model,
            "session_id": response.session_id,
        },
        "model_used": response.model,
        "token_usage": response.token_usage,
    }

aclose async

aclose() -> None

Close the wrapped :class:ChatBridge (releases httpx clients).

Source code in src/heddle/contrib/chatbridge/worker.py
async def aclose(self) -> None:
    """Close the wrapped :class:`ChatBridge` (releases httpx clients)."""
    close = getattr(self._bridge, "aclose", None)
    if close is not None:
        await close()

Valkey/Redis Store

Production checkpoint store using Redis/Valkey. Replaces the default in-memory store for persistent orchestrator checkpoints.

store

Valkey-backed key-value store.

Production implementation of :class:heddle.core.kvstore.KeyValueStore using redis.asyncio (redis-py). The redis-py client library works unchanged with Valkey.

Install with: pip install heddle-ai[redis].

Connection defaults

redis://redis:6379 — matches the Docker Compose / k8s service name. For local dev: redis://localhost:6379.

RedisKeyValueStore

RedisKeyValueStore(redis_url: str = 'redis://redis:6379')

Bases: KeyValueStore

Valkey-backed key-value store (via redis-py client).

Thin wrapper around redis.asyncio that implements the KeyValueStore interface. Handles connection lifecycle and TTL-based expiry natively. The redis-py client works unchanged with Valkey.

Source code in src/heddle/contrib/redis/store.py
def __init__(self, redis_url: str = "redis://redis:6379") -> None:
    self._redis = redis.from_url(redis_url)

set async

set(key: str, value: str, ttl_seconds: int | None = None) -> None

Store a value with optional TTL.

Source code in src/heddle/contrib/redis/store.py
async def set(self, key: str, value: str, ttl_seconds: int | None = None) -> None:
    """Store a value with optional TTL."""
    if ttl_seconds:
        await self._redis.set(key, value, ex=ttl_seconds)
    else:
        await self._redis.set(key, value)

get async

get(key: str) -> str | None

Retrieve a value, or None if missing/expired.

Source code in src/heddle/contrib/redis/store.py
async def get(self, key: str) -> str | None:
    """Retrieve a value, or ``None`` if missing/expired."""
    result = await self._redis.get(key)
    if result is None:
        return None
    if isinstance(result, bytes):
        return result.decode()
    return result

set_if_not_exists async

set_if_not_exists(key: str, value: str, ttl_seconds: int) -> bool

Atomic SET NX EX (Valkey/Redis primitive).

Source code in src/heddle/contrib/redis/store.py
async def set_if_not_exists(self, key: str, value: str, ttl_seconds: int) -> bool:
    """Atomic SET NX EX (Valkey/Redis primitive)."""
    result = await self._redis.set(key, value, ex=ttl_seconds, nx=True)
    return bool(result)

aclose async

aclose() -> None

Close the underlying Redis client connection pool.

Source code in src/heddle/contrib/redis/store.py
async def aclose(self) -> None:
    """Close the underlying Redis client connection pool."""
    await self._redis.aclose()

DuckDB Query Backend

Action-dispatch query backend for DuckDB. Supports full-text search, filtering, statistics, single-row get, and vector similarity search.

query_backend

Generic DuckDB query and analytics backend for Heddle workflows.

Provides a configurable action-dispatch query backend against any DuckDB table. Supports full-text search (via DuckDB FTS), attribute filtering, aggregate statistics, single-record retrieval, and vector similarity search.

Subclasses configure domain-specific behavior by passing constructor parameters (table name, columns, filter definitions, etc.) rather than overriding methods. For advanced customisation, override _get_handlers to add or replace action handlers.

Example worker config::

processing_backend: "myapp.backends.MyQueryBackend"
backend_config:
  db_path: "/tmp/workspace/data.duckdb"
See Also

heddle.worker.processor.SyncProcessingBackend -- base class for sync backends heddle.contrib.duckdb.DuckDBViewTool -- LLM-callable view tool heddle.contrib.duckdb.DuckDBVectorTool -- LLM-callable vector search tool

DuckDBQueryError

Bases: BackendError

Raised when a DuckDB query operation fails.

Wraps underlying DuckDB exceptions with a descriptive message and the original cause attached via __cause__.

DuckDBQueryBackend

DuckDBQueryBackend(db_path: str = '/tmp/workspace/data.duckdb', *, table_name: str = 'documents', result_columns: list[str] | None = None, json_columns: set[str] | None = None, id_column: str = 'id', full_text_column: str | None = 'full_text', fts_fields: str = 'full_text,summary', filter_fields: dict[str, str] | None = None, stats_groups: set[str] | None = None, stats_aggregates: list[str] | None = None, default_order_by: str = 'rowid', embedding_column: str = 'embedding')

Bases: SyncProcessingBackend

Generic action-dispatch query backend for DuckDB tables.

Opens a read-only connection to the DuckDB database and dispatches to the appropriate query handler based on the action field in the payload.

All queries use parameterized statements to prevent SQL injection. Results from search/filter actions exclude large content columns (configurable via full_text_column) to keep messages small.

Parameters:

Name Type Description Default
db_path str

Path to the DuckDB database file.

'/tmp/workspace/data.duckdb'
table_name str

Table to query.

'documents'
result_columns list[str] | None

Columns returned in search/filter results.

None
json_columns set[str] | None

Set of column names containing JSON strings that should be parsed back into Python objects on read.

None
id_column str

Primary key column name for the get action.

'id'
full_text_column str | None

Large content column included only in get results. Set to None if no such column exists.

'full_text'
fts_fields str

Comma-separated field names for DuckDB FTS match_bm25 calls (e.g. "content,summary").

'full_text,summary'
filter_fields dict[str, str] | None

Mapping of payload field names to SQL condition templates. Example: {"min_pages": "page_count >= ?"}. Each key is checked in the payload; if present, its SQL template is added to the WHERE clause.

None
stats_groups set[str] | None

Set of column names allowed as group_by values for the stats action.

None
stats_aggregates list[str] | None

SQL aggregate expressions for the stats query. Defaults to ["COUNT(*) AS record_count"].

None
default_order_by str

ORDER BY clause for filter results.

'rowid'
embedding_column str

Column name for vector embeddings used in the vector_search action.

'embedding'
Source code in src/heddle/contrib/duckdb/query_backend.py
def __init__(
    self,
    db_path: str = "/tmp/workspace/data.duckdb",
    *,
    table_name: str = "documents",
    result_columns: list[str] | None = None,
    json_columns: set[str] | None = None,
    id_column: str = "id",
    full_text_column: str | None = "full_text",
    fts_fields: str = "full_text,summary",
    filter_fields: dict[str, str] | None = None,
    stats_groups: set[str] | None = None,
    stats_aggregates: list[str] | None = None,
    default_order_by: str = "rowid",
    embedding_column: str = "embedding",
) -> None:
    from heddle.contrib._sql_security import (
        validate_sql_identifier,
        validate_sql_identifier_list,
    )

    self.db_path = Path(db_path)
    # Validate identifier-shaped fields at construction so a typo
    # in YAML (or worse, a crafted value) fails fast rather than
    # silently corrupting an interpolated query.  ``stats_aggregates``
    # and ``default_order_by`` are intentionally free-form SQL
    # (callers ship things like ``"COUNT(*) AS n"``); we don't
    # validate those — they're a documented power-user surface.
    self.table_name = validate_sql_identifier(table_name, field="table_name")
    self.id_column = validate_sql_identifier(id_column, field="id_column")
    self.embedding_column = validate_sql_identifier(embedding_column, field="embedding_column")
    if full_text_column is not None:
        self.full_text_column = validate_sql_identifier(
            full_text_column, field="full_text_column"
        )
    else:
        self.full_text_column = None
    # ``fts_fields`` is comma-separated, e.g. "full_text,summary".
    # Each name must be a valid identifier.  We re-join after
    # validation so the stored value is the canonical
    # whitespace-trimmed form.
    self.fts_fields = ",".join(validate_sql_identifier_list(fts_fields, field="fts_fields"))
    # ``result_columns`` is interpolated as ``f"d.{c}"`` /
    # ``", ".join(result_columns)`` in every SELECT.  Validate
    # at construction so a misconfigured YAML (or a future
    # caller bypassing the schema) can't smuggle a SQL fragment
    # through an unquoted identifier slot.
    raw_columns = result_columns or ["id"]
    self.result_columns = [
        validate_sql_identifier(c, field="result_columns item") for c in raw_columns
    ]
    self.json_columns = json_columns or set()
    self.filter_fields = filter_fields or {}
    self.stats_groups = stats_groups or set()
    self.stats_aggregates = stats_aggregates or ["COUNT(*) AS record_count"]
    self.default_order_by = default_order_by

process_sync

process_sync(payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]

Dispatch a query action against the DuckDB database.

Parameters:

Name Type Description Default
payload dict[str, Any]

Must contain action (str). Additional fields depend on the action type.

required
config dict[str, Any]

Worker config dict. May include db_path to override the constructor default.

required

Returns:

Type Description
dict[str, Any]

A dict with "output" (query results) and

dict[str, Any]

"model_used" (always "duckdb").

Raises:

Type Description
ValueError

If the action is unknown.

DuckDBQueryError

If the database query fails.

Source code in src/heddle/contrib/duckdb/query_backend.py
def process_sync(self, payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
    """Dispatch a query action against the DuckDB database.

    Args:
        payload: Must contain ``action`` (str). Additional fields
            depend on the action type.
        config: Worker config dict. May include ``db_path`` to
            override the constructor default.

    Returns:
        A dict with ``"output"`` (query results) and
        ``"model_used"`` (always ``"duckdb"``).

    Raises:
        ValueError: If the action is unknown.
        DuckDBQueryError: If the database query fails.
    """
    db_path = config.get("db_path", str(self.db_path))
    action = payload.get("action", "")

    handlers = self._get_handlers()

    handler = handlers.get(action)
    if not handler:
        raise ValueError(f"Unknown action '{action}'. Supported: {', '.join(handlers.keys())}")

    try:
        conn = duckdb.connect(db_path, read_only=True)
        try:
            # Load FTS extension for search queries.
            conn.execute("LOAD fts")
            result = handler(conn, payload)
        finally:
            conn.close()
    except (ValueError, DuckDBQueryError):
        raise
    except Exception as exc:
        raise DuckDBQueryError(f"Query failed (action={action}): {exc}") from exc

    return {"output": result, "model_used": "duckdb"}

DuckDB View Tool

Read-only DuckDB view exposed as an LLM-callable tool. Workers can query structured data during processing.

view_tool

DuckDB view tool — exposes a DuckDB view as an LLM-callable tool.

When configured in a worker's knowledge_silos, this tool lets the LLM query a read-only DuckDB view during reasoning. The LLM can search (full-text) or list records from the view.

Example knowledge_silos config::

knowledge_silos:
  - name: "catalog"
    type: "tool"
    provider: "heddle.contrib.duckdb.DuckDBViewTool"
    config:
      db_path: "/tmp/workspace/data.duckdb"
      view_name: "summaries"
      description: "Search and browse record summaries"
      max_results: 20

The tool auto-introspects the view's columns via DESCRIBE to build its JSON Schema definition. Queries use parameterized SQL to prevent injection.

DuckDBViewTool

DuckDBViewTool(db_path: str, view_name: str, description: str = 'Query a database view', max_results: int = 20)

Bases: SyncToolProvider

Expose a DuckDB view as an LLM-callable search/list tool.

The tool dynamically introspects the view's column schema at instantiation time and builds a JSON Schema tool definition that the LLM can call.

Supports two operations
  • search: Full-text ILIKE search across all text columns
  • list: List recent records with optional column filters

All queries are parameterized and results are capped at max_results.

Source code in src/heddle/contrib/duckdb/view_tool.py
def __init__(
    self,
    db_path: str,
    view_name: str,
    description: str = "Query a database view",
    max_results: int = 20,
) -> None:
    self.db_path = db_path
    # Validate at construction so misconfigured YAML fails loudly
    # at startup rather than silently breaking (or worse, executing
    # an injected DDL fragment) later when ``DESCRIBE {view_name}``
    # is interpolated.
    self.view_name = validate_sql_identifier(view_name, field="view_name")
    self.description = description
    self.max_results = max_results
    self._columns: list[dict[str, str]] = []
    self._introspect()

get_definition

get_definition() -> dict[str, Any]

Build JSON Schema tool definition from view columns.

Source code in src/heddle/contrib/duckdb/view_tool.py
def get_definition(self) -> dict[str, Any]:
    """Build JSON Schema tool definition from view columns."""
    # Build filterable columns (non-text columns that might be useful for filtering)
    filter_props: dict[str, Any] = {}
    for col in self._columns:
        col_type = col["type"].upper()
        if col_type in ("VARCHAR", "TEXT"):
            filter_props[col["name"]] = {"type": "string"}
        elif col_type in ("INTEGER", "BIGINT", "SMALLINT", "TINYINT"):
            filter_props[col["name"]] = {"type": "integer"}
        elif col_type in ("DOUBLE", "FLOAT", "DECIMAL"):
            filter_props[col["name"]] = {"type": "number"}
        elif col_type == "BOOLEAN":
            filter_props[col["name"]] = {"type": "boolean"}

    return {
        "name": f"query_{self.view_name}",
        "description": self.description,
        "parameters": {
            "type": "object",
            "properties": {
                "operation": {
                    "type": "string",
                    "enum": ["search", "list"],
                    "description": "search: full-text search; list: browse records",
                },
                "query": {
                    "type": "string",
                    "description": "Search query (for search operation)",
                },
                "limit": {
                    "type": "integer",
                    "description": (
                        f"Max results to return (default: 10, max: {self.max_results})"
                    ),
                },
                "filters": {
                    "type": "object",
                    "description": "Column filters (for list operation)",
                    "properties": filter_props,
                },
            },
            "required": ["operation"],
        },
    }

execute_sync

execute_sync(arguments: dict[str, Any]) -> str

Execute a query against the DuckDB view.

Source code in src/heddle/contrib/duckdb/view_tool.py
def execute_sync(self, arguments: dict[str, Any]) -> str:
    """Execute a query against the DuckDB view."""
    operation = arguments.get("operation", "list")
    limit = min(arguments.get("limit", 10), self.max_results)

    try:
        conn = duckdb.connect(self.db_path, read_only=True)
        try:
            if operation == "search":
                result = self._search(conn, arguments, limit)
            else:
                result = self._list(conn, arguments, limit)
        finally:
            conn.close()
    except Exception as e:
        return json.dumps({"error": str(e)})

    return json.dumps(result, default=str)

DuckDB Vector Tool

Semantic similarity search via DuckDB embeddings, exposed as an LLM tool.

vector_tool

DuckDB vector similarity search tool for LLM function-calling.

Uses embedding vectors stored in DuckDB to find semantically similar records. Query text is embedded via Ollama at search time, then compared against stored vectors using DuckDB's list_cosine_similarity.

Example knowledge_silos config::

knowledge_silos:
  - name: "similar_items"
    type: "tool"
    provider: "heddle.contrib.duckdb.DuckDBVectorTool"
    config:
      db_path: "/tmp/workspace/data.duckdb"
      table_name: "documents"
      result_columns: ["id", "title", "summary", "created_at"]
      embedding_column: "embedding"
      tool_name: "find_similar"
      description: "Find records semantically similar to a query"
      embedding_model: "nomic-embed-text"
See Also

heddle.worker.embeddings -- OllamaEmbeddingProvider heddle.worker.tools -- SyncToolProvider base class

DuckDBVectorTool

DuckDBVectorTool(db_path: str, table_name: str = 'documents', result_columns: list[str] | None = None, embedding_column: str = 'embedding', tool_name: str = 'find_similar', description: str = 'Find semantically similar records', embedding_model: str = 'nomic-embed-text', ollama_url: str | None = None, max_results: int = 10)

Bases: SyncToolProvider

Semantic similarity search over DuckDB vector embeddings.

Generates a query embedding via Ollama, then uses DuckDB's list_cosine_similarity function to find the most similar records by their stored embedding vectors.

Only records with non-null embeddings are searched.

Parameters:

Name Type Description Default
db_path str

Path to the DuckDB database file.

required
table_name str

Table containing the records and embeddings.

'documents'
result_columns list[str] | None

Columns to include in results. If None, introspects the table schema at first use, excluding the embedding column and any column named full_text.

None
embedding_column str

Name of the column storing embedding vectors.

'embedding'
tool_name str

Name exposed in the LLM tool definition.

'find_similar'
description str

Description exposed in the LLM tool definition.

'Find semantically similar records'
embedding_model str

Ollama model name for embedding generation.

'nomic-embed-text'
ollama_url str | None

Optional custom Ollama server URL.

None
max_results int

Hard cap on returned results.

10
Source code in src/heddle/contrib/duckdb/vector_tool.py
def __init__(
    self,
    db_path: str,
    table_name: str = "documents",
    result_columns: list[str] | None = None,
    embedding_column: str = "embedding",
    tool_name: str = "find_similar",
    description: str = "Find semantically similar records",
    embedding_model: str = "nomic-embed-text",
    ollama_url: str | None = None,
    max_results: int = 10,
) -> None:
    from heddle.contrib._sql_security import validate_sql_identifier

    self.db_path = db_path
    # Identifiers flow into ``DESCRIBE`` and ``SELECT ... FROM
    # {table}`` statements as f-string interpolations.  Validate at
    # construction so misconfigured YAML rejects loudly instead of
    # silently corrupting (or hijacking) queries.
    self.table_name = validate_sql_identifier(table_name, field="table_name")
    self.embedding_column = validate_sql_identifier(embedding_column, field="embedding_column")
    # ``_result_columns`` is interpolated as ``", ".join(self.result_columns)``
    # in every SELECT; validate explicit columns at construction.
    # ``None`` is preserved so introspection (which already pulls
    # validated identifiers from ``DESCRIBE``) drives the
    # populated value lazily.
    if result_columns is None:
        self._result_columns: list[str] | None = None
    else:
        self._result_columns = [
            validate_sql_identifier(c, field="result_columns item") for c in result_columns
        ]
    self.tool_name = tool_name
    self.description = description
    self.embedding_model = embedding_model
    self.ollama_url = ollama_url
    self.max_results = max_results

result_columns property

result_columns: list[str]

Return result columns, introspecting on first access if needed.

get_definition

get_definition() -> dict[str, Any]

Return tool definition for LLM function-calling.

Source code in src/heddle/contrib/duckdb/vector_tool.py
def get_definition(self) -> dict[str, Any]:
    """Return tool definition for LLM function-calling."""
    return {
        "name": self.tool_name,
        "description": self.description,
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Natural language query to find similar records",
                },
                "limit": {
                    "type": "integer",
                    "description": f"Max results (default: 5, max: {self.max_results})",
                },
            },
            "required": ["query"],
        },
    }

execute_sync

execute_sync(arguments: dict[str, Any]) -> str

Embed the query and search for similar records.

Source code in src/heddle/contrib/duckdb/vector_tool.py
def execute_sync(self, arguments: dict[str, Any]) -> str:
    """Embed the query and search for similar records."""
    query = arguments.get("query", "")
    limit = min(arguments.get("limit", 5), self.max_results)

    if not query.strip():
        return json.dumps({"results": [], "total": 0})

    # Generate query embedding via Ollama.
    query_embedding = self._embed_query(query)
    if query_embedding is None:
        return json.dumps({"error": "Failed to generate query embedding"})

    try:
        conn = duckdb.connect(self.db_path, read_only=True)
        try:
            result = self._similarity_search(conn, query_embedding, limit)
        finally:
            conn.close()
    except Exception as e:
        return json.dumps({"error": str(e)})

    return json.dumps(result, default=str)

LanceDB Vector Store

ANN vector storage and search via LanceDB. Faster than DuckDB for large datasets. Implements the VectorStore ABC.

store

LanceDB-backed vector store for embedded text chunks.

Stores EmbeddedChunk records in a LanceDB table with native vector columns. Supports: - Batch insertion of TextChunk objects (with embedding generation) - Pre-embedded chunk insertion - Approximate Nearest Neighbor (ANN) similarity search - Metadata filtering (e.g. by channel_id) - Basic CRUD (get, delete by chunk_id)

Uses Heddle's :class:OllamaEmbeddingProvider (default) or :class:OpenAICompatibleEmbeddingProvider (e.g. for LM Studio) for query embedding generation.

LanceDB provides ANN indexing for faster search over large datasets compared to exact cosine similarity in DuckDB.

LanceDBVectorStore

LanceDBVectorStore(db_path: str = '/tmp/rag-vectors.lance', embedding_model: str = 'nomic-embed-text', ollama_url: str = 'http://localhost:11434', embedding_backend: str = 'ollama', embedding_url: str | None = None)

Bases: VectorStore

Embedded vector store backed by LanceDB.

Usage::

store = LanceDBVectorStore("/tmp/rag-vectors.lance")
store.initialize()

# Embed and store chunks
store.add_chunks(chunks)

# Search
results = store.search("earthquake damage", limit=5)

store.close()

Initialize the store.

See :class:heddle.contrib.rag.vectorstore.duckdb_store.DuckDBVectorStore for parameter semantics. embedding_backend selects between :class:OllamaEmbeddingProvider and :class:OpenAICompatibleEmbeddingProvider (LM Studio etc.).

Source code in src/heddle/contrib/lancedb/store.py
def __init__(
    self,
    db_path: str = "/tmp/rag-vectors.lance",
    embedding_model: str = "nomic-embed-text",
    ollama_url: str = "http://localhost:11434",
    embedding_backend: str = "ollama",
    embedding_url: str | None = None,
) -> None:
    """Initialize the store.

    See :class:`heddle.contrib.rag.vectorstore.duckdb_store.DuckDBVectorStore`
    for parameter semantics.  ``embedding_backend`` selects between
    :class:`OllamaEmbeddingProvider` and
    :class:`OpenAICompatibleEmbeddingProvider` (LM Studio etc.).
    """
    self.db_path = Path(db_path)
    self.embedding_model = embedding_model
    self.ollama_url = ollama_url
    self.embedding_backend = embedding_backend
    self.embedding_url = embedding_url
    self._db: Any = None
    self._table: Any = None
    self._embedding_dim: int | None = None

initialize

initialize() -> LanceDBVectorStore

Open or create the LanceDB database and table.

Source code in src/heddle/contrib/lancedb/store.py
def initialize(self) -> LanceDBVectorStore:
    """Open or create the LanceDB database and table."""
    import lancedb

    self.db_path.parent.mkdir(parents=True, exist_ok=True)
    self._db = lancedb.connect(str(self.db_path))

    # Check if table already exists
    if self.TABLE_NAME in self._db.list_tables():
        self._table = self._db.open_table(self.TABLE_NAME)
    else:
        self._table = None  # Created on first insert (need schema from data)

    logger.info("Initialized LanceDB vector store at %s", self.db_path)
    return self

close

close() -> None

Close the database connection.

Source code in src/heddle/contrib/lancedb/store.py
def close(self) -> None:
    """Close the database connection."""
    self._table = None
    self._db = None

add_chunks

add_chunks(chunks: list[TextChunk], batch_size: int = 64) -> int

Embed and insert TextChunk objects. Returns count of inserted rows.

Source code in src/heddle/contrib/lancedb/store.py
def add_chunks(  # pragma: no cover
    self,
    chunks: list[TextChunk],
    batch_size: int = 64,
) -> int:
    """Embed and insert TextChunk objects. Returns count of inserted rows."""
    if not chunks:
        return 0

    total_inserted = 0
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i : i + batch_size]
        texts = [c.text for c in batch]

        try:
            embeddings = self._embed_texts(texts)
        except Exception as exc:
            logger.error("Embedding batch %d failed: %s", i // batch_size, exc)
            continue

        records = []
        for chunk, emb in zip(batch, embeddings, strict=False):
            records.append(
                {
                    "chunk_id": chunk.chunk_id,
                    "source_global_id": chunk.source_global_id,
                    "source_channel_id": chunk.source_channel_id,
                    "source_channel_name": chunk.source_channel_name,
                    "text": chunk.text,
                    "char_start": chunk.char_start,
                    "char_end": chunk.char_end,
                    "chunk_index": chunk.chunk_index,
                    "total_chunks": chunk.total_chunks,
                    "strategy": chunk.strategy.value
                    if hasattr(chunk.strategy, "value")
                    else str(chunk.strategy),
                    "timestamp_unix": chunk.timestamp_unix,
                    "vector": emb,
                    "embedding_model": self.embedding_model,
                    "embedding_dim": len(emb),
                }
            )

        try:
            created = self._ensure_table(records)
            if not created and self._table is not None and records:
                self._table.add(records)
            total_inserted += len(records)
        except Exception as exc:
            logger.warning("Insert batch %d failed: %s", i // batch_size, exc)

    logger.info("Inserted %d / %d chunks into %s", total_inserted, len(chunks), self.TABLE_NAME)
    return total_inserted

add_embedded_chunks

add_embedded_chunks(chunks: list[EmbeddedChunk]) -> int

Insert pre-embedded chunks (no embedding generation needed).

Source code in src/heddle/contrib/lancedb/store.py
def add_embedded_chunks(self, chunks: list[EmbeddedChunk]) -> int:
    """Insert pre-embedded chunks (no embedding generation needed)."""
    if not chunks:
        return 0

    records = [
        {
            "chunk_id": ec.chunk_id,
            "source_global_id": ec.source_global_id,
            "source_channel_id": ec.source_channel_id,
            "text": ec.text,
            "vector": ec.embedding,
            "embedding_model": ec.model,
            "embedding_dim": ec.dimensions,
            "source_channel_name": "",
            "char_start": 0,
            "char_end": 0,
            "chunk_index": 0,
            "total_chunks": 1,
            "strategy": "sentence",
            "timestamp_unix": 0,
        }
        for ec in chunks
    ]

    try:
        created = self._ensure_table(records)
        if not created and self._table is not None:
            self._table.add(records)
        return len(records)
    except Exception as exc:
        logger.warning("Insert pre-embedded chunks failed: %s", exc)
        return 0

search

search(query: str, limit: int = 10, min_score: float = 0.0, channel_ids: list[int] | None = None) -> list[SimilarityResult]

Semantic similarity search using LanceDB ANN.

Parameters:

Name Type Description Default
query str

Natural language query (embedded via Ollama)

required
limit int

Maximum results to return

10
min_score float

Minimum cosine similarity threshold

0.0
channel_ids list[int] | None

Optional filter by source channel

None

Returns:

Type Description
list[SimilarityResult]

List of SimilarityResult sorted by descending similarity

Source code in src/heddle/contrib/lancedb/store.py
def search(  # pragma: no cover
    self,
    query: str,
    limit: int = 10,
    min_score: float = 0.0,
    channel_ids: list[int] | None = None,
) -> list[SimilarityResult]:
    """
    Semantic similarity search using LanceDB ANN.

    Args:
        query:       Natural language query (embedded via Ollama)
        limit:       Maximum results to return
        min_score:   Minimum cosine similarity threshold
        channel_ids: Optional filter by source channel

    Returns:
        List of SimilarityResult sorted by descending similarity
    """
    if self._table is None:
        return []

    embeddings = self._embed_texts([query])
    if not embeddings:
        return []

    query_emb = embeddings[0]

    search_query = self._table.search(query_emb, vector_column_name="vector").limit(limit)

    if channel_ids:
        filter_expr = _build_channel_id_filter(channel_ids)
        search_query = search_query.where(f"({filter_expr})")

    try:
        raw_results = search_query.to_list()
    except Exception as exc:
        logger.error("LanceDB search failed: %s", exc)
        return []

    results: list[SimilarityResult] = []
    for row in raw_results:
        # LanceDB returns _distance (L2) by default; for cosine metric
        # it returns 1 - cosine_similarity, so score = 1 - _distance
        distance = row.get("_distance", 1.0)
        score = max(0.0, 1.0 - distance)

        if score < min_score:
            continue

        results.append(
            SimilarityResult(
                chunk_id=row["chunk_id"],
                text=row["text"],
                score=score,
                source_channel_id=row["source_channel_id"],
                source_global_id=row["source_global_id"],
                metadata={
                    "source_channel_name": row.get("source_channel_name", ""),
                    "timestamp_unix": row.get("timestamp_unix", 0),
                    "strategy": row.get("strategy", ""),
                },
            )
        )

    return results

count

count() -> int

Return total number of stored chunks.

Source code in src/heddle/contrib/lancedb/store.py
def count(self) -> int:
    """Return total number of stored chunks."""
    if self._table is None:
        return 0
    return self._table.count_rows()

get

get(chunk_id: str) -> EmbeddedChunk | None

Retrieve a single embedded chunk by ID.

Source code in src/heddle/contrib/lancedb/store.py
def get(self, chunk_id: str) -> EmbeddedChunk | None:
    """Retrieve a single embedded chunk by ID."""
    if self._table is None:
        return None

    # ``chunk_id`` is user data (not an identifier) flowing into a
    # LanceDB single-quoted string literal.  Escape ``'`` → ``''``
    # so a quote in the id can't break out of the literal.
    try:
        results = (
            self._table.search()
            .where(f"chunk_id = '{escape_sql_string_literal(chunk_id)}'")
            .limit(1)
            .to_list()
        )
    except Exception:
        return None

    if not results:
        return None

    row = results[0]
    return EmbeddedChunk(
        chunk_id=row["chunk_id"],
        source_global_id=row["source_global_id"],
        source_channel_id=row["source_channel_id"],
        text=row["text"],
        embedding=list(row.get("vector", [])),
        model=row.get("embedding_model", ""),
        dimensions=row.get("embedding_dim", 0),
    )

delete

delete(chunk_id: str) -> bool

Delete a chunk by ID. Returns True if a row was deleted.

Source code in src/heddle/contrib/lancedb/store.py
def delete(self, chunk_id: str) -> bool:
    """Delete a chunk by ID. Returns True if a row was deleted."""
    if self._table is None:
        return False

    before = self._table.count_rows()
    try:
        self._table.delete(f"chunk_id = '{escape_sql_string_literal(chunk_id)}'")
    except Exception as exc:
        logger.warning("Delete failed for chunk %s: %s", chunk_id, exc)
        return False
    return self._table.count_rows() < before

delete_by_source

delete_by_source(source_global_id: str) -> int

Delete all chunks for a given source post. Returns count.

Source code in src/heddle/contrib/lancedb/store.py
def delete_by_source(self, source_global_id: str) -> int:
    """Delete all chunks for a given source post. Returns count."""
    if self._table is None:
        return 0

    before = self._table.count_rows()
    try:
        self._table.delete(
            f"source_global_id = '{escape_sql_string_literal(source_global_id)}'"
        )
    except Exception as exc:
        logger.warning("Delete by source failed for %s: %s", source_global_id, exc)
        return 0
    return before - self._table.count_rows()

stats

stats() -> dict[str, Any]

Return summary statistics about the store.

Earlier to_pandas() on the full table OOM'd on large stores (a 10M-chunk LanceDB table with 768-dim float32 vectors is ~30 GB once materialised). We now project to scalar columns only via search().select(...).to_list() so the vector column is never copied out of LanceDB. Falls back to a row count if even the projected scan fails (e.g. backend doesn't support the API combination).

Source code in src/heddle/contrib/lancedb/store.py
def stats(self) -> dict[str, Any]:
    """Return summary statistics about the store.

    Earlier ``to_pandas()`` on the full table OOM'd on large
    stores (a 10M-chunk LanceDB table with 768-dim float32
    vectors is ~30 GB once materialised).  We now project to
    scalar columns only via ``search().select(...).to_list()``
    so the ``vector`` column is never copied out of LanceDB.
    Falls back to a row count if even the projected scan
    fails (e.g. backend doesn't support the API combination).
    """
    if self._table is None:
        return {"total_chunks": 0}

    try:
        total = self._table.count_rows()
        if total == 0:
            return {"total_chunks": 0}

        # Project only the cheap scalar columns.  The
        # ``search()`` API is the vector-similarity entry point;
        # called without a vector query it scans the table, and
        # ``select`` constrains the columns LanceDB materialises.
        rows = (
            self._table.search()
            .select(["source_global_id", "source_channel_id", "timestamp_unix"])
            .limit(total)
            .to_list()
        )

        unique_posts: set[str] = set()
        unique_channels: set[int] = set()
        earliest = None
        latest = None
        for row in rows:
            unique_posts.add(row["source_global_id"])
            unique_channels.add(row["source_channel_id"])
            ts = row["timestamp_unix"]
            if earliest is None or ts < earliest:
                earliest = ts
            if latest is None or ts > latest:
                latest = ts

        return {
            "total_chunks": total,
            "unique_posts": len(unique_posts),
            "unique_channels": len(unique_channels),
            "earliest_timestamp": int(earliest) if earliest is not None else 0,
            "latest_timestamp": int(latest) if latest is not None else 0,
            "db_path": str(self.db_path),
        }
    except Exception as exc:
        logger.warning("Stats query failed: %s", exc)
        return {"total_chunks": self.count(), "db_path": str(self.db_path)}

LanceDB Vector Tool

Semantic similarity search via LanceDB, exposed as an LLM tool.

tool

LanceDB vector similarity search tool for LLM function-calling.

Uses embedding vectors stored in LanceDB to find semantically similar records. Query text is embedded via Ollama at search time, then compared against stored vectors using LanceDB's ANN search.

Example knowledge_silos config::

knowledge_silos:
  - name: "similar_items"
    type: "tool"
    provider: "heddle.contrib.lancedb.LanceDBVectorTool"
    config:
      db_path: "/tmp/workspace/rag-vectors.lance"
      table_name: "rag_chunks"
      tool_name: "find_similar"
      description: "Find records semantically similar to a query"
      embedding_model: "nomic-embed-text"
See Also

heddle.worker.embeddings -- OllamaEmbeddingProvider heddle.worker.tools -- SyncToolProvider base class

LanceDBVectorTool

LanceDBVectorTool(db_path: str, table_name: str = 'rag_chunks', vector_column: str = 'vector', result_columns: list[str] | None = None, tool_name: str = 'find_similar', description: str = 'Find semantically similar records', embedding_model: str = 'nomic-embed-text', ollama_url: str | None = None, max_results: int = 10)

Bases: SyncToolProvider

Semantic similarity search over LanceDB vector embeddings.

Generates a query embedding via Ollama, then uses LanceDB's ANN search to find the most similar records by their stored vectors.

Parameters:

Name Type Description Default
db_path str

Path to the LanceDB database directory.

required
table_name str

Table containing the records and embeddings.

'rag_chunks'
vector_column str

Name of the column storing embedding vectors.

'vector'
result_columns list[str] | None

Columns to include in results. If None, returns chunk_id, text, source_channel_id, source_global_id.

None
tool_name str

Name exposed in the LLM tool definition.

'find_similar'
description str

Description exposed in the LLM tool definition.

'Find semantically similar records'
embedding_model str

Ollama model name for embedding generation.

'nomic-embed-text'
ollama_url str | None

Optional custom Ollama server URL.

None
max_results int

Hard cap on returned results.

10
Source code in src/heddle/contrib/lancedb/tool.py
def __init__(
    self,
    db_path: str,
    table_name: str = "rag_chunks",
    vector_column: str = "vector",
    result_columns: list[str] | None = None,
    tool_name: str = "find_similar",
    description: str = "Find semantically similar records",
    embedding_model: str = "nomic-embed-text",
    ollama_url: str | None = None,
    max_results: int = 10,
) -> None:
    self.db_path = db_path
    self.table_name = table_name
    self.vector_column = vector_column
    self._result_columns = result_columns or [
        "chunk_id",
        "text",
        "source_channel_id",
        "source_global_id",
    ]
    self.tool_name = tool_name
    self.description = description
    self.embedding_model = embedding_model
    self.ollama_url = ollama_url
    self.max_results = max_results

get_definition

get_definition() -> dict[str, Any]

Return tool definition for LLM function-calling.

Source code in src/heddle/contrib/lancedb/tool.py
def get_definition(self) -> dict[str, Any]:
    """Return tool definition for LLM function-calling."""
    return {
        "name": self.tool_name,
        "description": self.description,
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Natural language query to find similar records",
                },
                "limit": {
                    "type": "integer",
                    "description": f"Max results (default: 5, max: {self.max_results})",
                },
            },
            "required": ["query"],
        },
    }

execute_sync

execute_sync(arguments: dict[str, Any]) -> str

Embed the query and search for similar records.

Source code in src/heddle/contrib/lancedb/tool.py
def execute_sync(self, arguments: dict[str, Any]) -> str:  # pragma: no cover
    """Embed the query and search for similar records."""
    query = arguments.get("query", "")
    limit = min(arguments.get("limit", 5), self.max_results)

    if not query.strip():
        return json.dumps({"results": [], "total": 0})

    query_embedding = self._embed_query(query)
    if query_embedding is None:
        return json.dumps({"error": "Failed to generate query embedding"})

    try:
        import lancedb

        db = lancedb.connect(self.db_path)
        if self.table_name not in db.table_names():
            return json.dumps({"results": [], "total": 0})

        table = db.open_table(self.table_name)
        raw_results = (
            table.search(query_embedding, vector_column_name=self.vector_column)
            .limit(limit)
            .to_list()
        )

        results = []
        for row in raw_results:
            record = {}
            for col in self._result_columns:
                if col in row:
                    record[col] = row[col]
            distance = row.get("_distance", 1.0)
            record["similarity"] = max(0.0, 1.0 - distance)
            results.append(record)

        return json.dumps({"results": results, "total": len(results)}, default=str)

    except Exception as e:
        return json.dumps({"error": str(e)})

Document Processing — Extractors

Pluggable extractors for PDF / DOCX / Markdown / HTML inputs. All backends produce the same ExtractorOutput so downstream pipeline stages don't need to know which engine ran.

contracts

Pydantic I/O contracts for document processing.

These models define the typed input/output schema shared by all extraction backends (Docling, MarkItDown, SmartExtractor). Worker YAML configs can reference them via input_schema_ref / output_schema_ref.

ExtractorInput

Bases: BaseModel

Input for document extraction backends.

ExtractorOutput

Bases: BaseModel

Output from document extraction (shared by all backends).

markitdown_backend

MarkItDown-based document extraction backend.

Wraps Microsoft MarkItDown to convert PDF, DOCX, PPTX, XLSX, HTML, and other formats to Markdown. Much faster than Docling (no ML models, no torch) but cannot OCR scanned PDFs or extract complex table structures.

This backend produces the same output schema as DoclingBackend so it can be used as a drop-in replacement in the pipeline.

Input: {"file_ref": "filename.pdf"} (relative to workspace_dir) Output: {"file_ref": "filename_extracted.json", "page_count": N, "has_tables": bool, "sections": [...], "text_preview": "..."}

MarkItDown does not provide structural metadata (page count, tables, sections), so these are derived from the Markdown output:

page_count  — Estimated from form-feed characters or defaults to 1.
has_tables  — Detected via Markdown table syntax (| --- |).
sections    — Parsed from Markdown heading lines (# / ## / ###).
See Also

configs/workers/doc_extractor_smart.yaml -- smart extractor config docman.backends.smart_extractor -- composite backend with fallback heddle.worker.processor.SyncProcessingBackend -- base class

MarkItDownConversionError

Bases: BackendError

Raised when MarkItDown fails to convert a document.

Wraps underlying MarkItDown exceptions with a descriptive message and the original cause attached via __cause__.

MarkItDownBackend

MarkItDownBackend(workspace_dir: str = '/tmp/docman-workspace')

Bases: SyncProcessingBackend

SyncProcessingBackend that uses Microsoft MarkItDown for extraction.

Fast, lightweight document-to-Markdown conversion without ML models. Suitable for well-structured digital PDFs, DOCX, PPTX, XLSX, HTML, and other text-based formats. Not suitable for scanned/image-based PDFs that require OCR.

Produces the same output contract as DoclingBackend so downstream pipeline stages (classifier, summarizer, ingest) work unchanged.

Source code in src/heddle/contrib/docproc/markitdown_backend.py
def __init__(self, workspace_dir: str = "/tmp/docman-workspace") -> None:
    self.workspace_dir = Path(workspace_dir)

process_sync

process_sync(payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]

Extract text from a document using MarkItDown.

Parameters:

Name Type Description Default
payload dict[str, Any]

Must contain file_ref (str) -- the filename of the source document, relative to the workspace directory.

required
config dict[str, Any]

Worker config dict. May include workspace_dir to override the constructor default.

required

Returns:

Type Description
dict[str, Any]

{"output": {...}, "model_used": "markitdown"}

Raises:

Type Description
ValueError

If file_ref attempts path traversal.

FileNotFoundError

If the source file does not exist.

MarkItDownConversionError

If MarkItDown fails to convert.

Source code in src/heddle/contrib/docproc/markitdown_backend.py
def process_sync(self, payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
    """Extract text from a document using MarkItDown.

    Args:
        payload: Must contain ``file_ref`` (str) -- the filename of the
            source document, relative to the workspace directory.
        config: Worker config dict.  May include ``workspace_dir`` to
            override the constructor default.

    Returns:
        ``{"output": {...}, "model_used": "markitdown"}``

    Raises:
        ValueError: If file_ref attempts path traversal.
        FileNotFoundError: If the source file does not exist.
        MarkItDownConversionError: If MarkItDown fails to convert.
    """
    file_ref = payload["file_ref"]
    ws_dir = config.get("workspace_dir", str(self.workspace_dir))
    ws = WorkspaceManager(ws_dir)

    source_path = ws.resolve(file_ref)

    try:
        result = self._extract(source_path, ws, config)
    except MarkItDownConversionError:
        raise
    except Exception as exc:
        raise MarkItDownConversionError(f"Failed to extract '{file_ref}': {exc}") from exc

    return {"output": result, "model_used": "markitdown"}

docling_backend

Docling-based document extraction backend.

Wraps IBM Docling to extract text, tables, and structure from PDF/DOCX files. Extends SyncProcessingBackend so the synchronous Docling work is automatically offloaded to a thread pool, keeping the async event loop responsive.

This is the first stage in DocMan's document processing pipeline: doc_extractor (this) -> doc_classifier -> doc_summarizer

Input: {"file_ref": "filename.pdf"} (relative to workspace_dir) Output: {"file_ref": "filename_extracted.json", "page_count": N, "has_tables": bool, "sections": [...], "text_preview": "..."}

The extracted JSON is written to workspace_dir and contains the full document text. Subsequent stages reference it via file_ref to avoid passing large text through NATS messages.

Docling tuning options can be passed via backend_config in the worker YAML

device: "mps" | "cpu" | "cuda" | "auto" (default: "auto") num_threads: int (default: 8 on Apple Silicon, 4 elsewhere) ocr_engine: "ocrmac" | "easyocr" | "tesseract" (default: "ocrmac" on macOS) layout_batch_size: int (default: 4) ocr_batch_size: int (default: 4) do_ocr: bool (default: true) do_table_structure: bool (default: true)

See Also

configs/workers/doc_extractor.yaml -- worker config with I/O schemas docs/docling-setup.md -- full Docling configuration and tuning guide heddle.worker.processor.SyncProcessingBackend -- base class for sync backends heddle.core.workspace.WorkspaceManager -- file-ref resolution with path safety

DoclingConversionError

Bases: BackendError

Raised when Docling fails to convert a document.

Wraps underlying Docling exceptions (corrupt PDFs, unsupported formats, out-of-memory conditions) with a descriptive message and the original cause attached via __cause__.

DoclingBackend

DoclingBackend(workspace_dir: str = '/tmp/docman-workspace')

Bases: SyncProcessingBackend

SyncProcessingBackend that uses IBM Docling for document extraction.

Reads a source document (PDF or DOCX) from the workspace directory, runs Docling's DocumentConverter to extract text, tables, and structural metadata, then writes the full extracted content as JSON back to the workspace. Returns a lightweight summary (file_ref, page_count, has_tables, sections, text_preview) suitable for passing through NATS messages to downstream pipeline stages.

Because Docling is synchronous and CPU-bound, this backend extends SyncProcessingBackend which automatically offloads process_sync() to a thread pool via run_in_executor.

Attributes:

Name Type Description
workspace_dir

Default workspace path. Can be overridden per-call via the workspace_dir key in the config dict.

Source code in src/heddle/contrib/docproc/docling_backend.py
def __init__(self, workspace_dir: str = "/tmp/docman-workspace") -> None:
    self.workspace_dir = Path(workspace_dir)

process_sync

process_sync(payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]

Extract text and structure from a document using Docling.

Validates the input file_ref for path traversal and existence, then runs Docling's DocumentConverter synchronously (the parent class handles thread pool offloading).

Parameters:

Name Type Description Default
payload dict[str, Any]

Must contain file_ref (str) -- the filename of the source document, relative to the workspace directory.

required
config dict[str, Any]

Worker config dict (from the YAML's backend_config section). May include workspace_dir to override the constructor default, plus Docling tuning options such as device, ocr_engine, num_threads, etc.

required

Returns:

Type Description
dict[str, Any]

A dict with "output" (the extraction result dict) and

dict[str, Any]

"model_used" (always "docling"). The ProcessorWorker

dict[str, Any]

unpacks this and publishes the TaskResult to NATS.

Raises:

Type Description
ValueError

If file_ref attempts to escape the workspace directory (path traversal attack).

FileNotFoundError

If the source file does not exist in the workspace.

DoclingConversionError

If Docling fails to convert the document (corrupt file, unsupported format, OOM, etc.).

Source code in src/heddle/contrib/docproc/docling_backend.py
def process_sync(self, payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
    """Extract text and structure from a document using Docling.

    Validates the input file_ref for path traversal and existence, then
    runs Docling's DocumentConverter synchronously (the parent class
    handles thread pool offloading).

    Args:
        payload: Must contain ``file_ref`` (str) -- the filename of the
            source document, relative to the workspace directory.
        config: Worker config dict (from the YAML's ``backend_config``
            section). May include ``workspace_dir`` to override the
            constructor default, plus Docling tuning options such as
            ``device``, ``ocr_engine``, ``num_threads``, etc.

    Returns:
        A dict with ``"output"`` (the extraction result dict) and
        ``"model_used"`` (always ``"docling"``). The ProcessorWorker
        unpacks this and publishes the TaskResult to NATS.

    Raises:
        ValueError: If file_ref attempts to escape the workspace
            directory (path traversal attack).
        FileNotFoundError: If the source file does not exist in the
            workspace.
        DoclingConversionError: If Docling fails to convert the
            document (corrupt file, unsupported format, OOM, etc.).
    """
    file_ref = payload["file_ref"]
    ws_dir = config.get("workspace_dir", str(self.workspace_dir))
    ws = WorkspaceManager(ws_dir)

    # Validate path traversal and existence via WorkspaceManager.
    source_path = ws.resolve(file_ref)

    try:
        result = self._extract(source_path, ws, config)
    except DoclingConversionError:
        raise
    except Exception as exc:
        raise DoclingConversionError(f"Failed to extract '{file_ref}': {exc}") from exc

    return {"output": result, "model_used": "docling"}