Contrib Modules¶
The heddle.contrib package contains optional integrations that extend Heddle's
capabilities. Each module requires its own optional dependency extra.
| Module | Extra | Purpose |
|---|---|---|
| contrib.council | council | Multi-round agent deliberation framework |
| contrib.chatbridge | chatbridge | External chat/LLM session adapters |
| contrib.docproc | docproc | Document extraction backends (PDF / DOCX / Markdown) |
| contrib.duckdb | duckdb | Embedded analytics and vector search |
| contrib.lancedb | lancedb | ANN vector search via LanceDB |
| contrib.redis | redis | Production checkpoint persistence |
| contrib.rag | rag | RAG pipeline: Telegram exports, CSV, plain text |
| contrib.subprocess | (stdlib only) | Wrap any CLI tool as a Heddle processor worker |
See Council How-To for the council and chatbridge guide. See RAG How-To for the RAG pipeline guide.
Council — Multi-Round Deliberation¶
Run structured team discussions where multiple LLM agents debate iteratively with pluggable protocols and convergence detection.
runner ¶
CouncilRunner — NATS-free council execution.
Runs a multi-round deliberation directly against LLM backends without
requiring NATS, actors, or running infrastructure. This is the council
equivalent of :class:heddle.workshop.test_runner.WorkerTestRunner.
Usage::

    import asyncio

    from heddle.worker.backends import build_backends_from_env
    from heddle.contrib.council.config import load_council_config
    from heddle.contrib.council.runner import CouncilRunner

    async def main():
        config = load_council_config("configs/councils/example.yaml")
        runner = CouncilRunner(build_backends_from_env())
        # run() is a coroutine, so it has to be awaited inside an event loop
        result = await runner.run("Should we adopt microservices?", config=config)

    asyncio.run(main())
CouncilRunner ¶
Execute a council discussion directly against LLM backends.
This replicates the multi-round deliberation loop without NATS.
Each agent turn calls backend.complete() directly, builds a
transcript entry, and feeds it into the next round.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| backends | dict[str, LLMBackend] \| None | Dict mapping tier name ( | None |
| config | CouncilConfig \| None | Optional default :class: | None |
Source code in src/heddle/contrib/council/runner.py
inject ¶
Inject a spectator interjection into the active discussion.
Safe to call from another thread or coroutine while :meth:run
is executing. The interjection will appear in the next agent's
context as an audience reaction.
Raises :class:RuntimeError if no discussion is active.
Source code in src/heddle/contrib/council/runner.py
run (async) ¶
run(topic: str, config: CouncilConfig | None = None, on_turn: Callable | None = None) -> CouncilResult
Run a full council deliberation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The discussion topic / question. | required |
| config | CouncilConfig \| None | Council config (overrides the constructor default). | None |
| on_turn | Callable \| None | Optional callback invoked after each agent's turn with the :class: | None |

Returns:

| Type | Description |
|---|---|
| CouncilResult | The :class:CouncilResult for the run, including convergence info and token usage. |
Source code in src/heddle/contrib/council/runner.py
aclose (async) ¶
Close any cached :class:ChatBridge sessions.
ChatBridges may hold open httpx.AsyncClient connections.
Call this when you are done with the runner to release them
cleanly. Safe to call multiple times.
Source code in src/heddle/contrib/council/runner.py
config ¶
Council configuration loading and validation.
Loads council YAML configs into typed :class:CouncilConfig models.
Follows the same pattern as :func:heddle.core.config.load_config.
CouncilConfig ¶
Bases: BaseModel
Top-level council configuration, loaded from YAML.
Two timeouts compose the overall budget:
- timeout_seconds: total wall time for the council run.
- synthesis_timeout_seconds: the budget reserved for the facilitator's synthesis call at the end. Subtracted from the total to compute the per-turn budget.
Why a separate synthesis budget: the original shape divided
timeout_seconds evenly across max_rounds * len(agents)
turns, leaving synthesis unbounded — which masked spurious
timeouts (the synthesis could wedge indefinitely on a frontier
model) and made per-turn budgeting fragile. Carving synthesis
out makes both halves explicit and bounded.
per_turn_timeout ¶
Compute the per-turn timeout in seconds.
Centralised so the orchestrator and any future caller agree on
the formula. Subtracts synthesis_timeout_seconds from the
total budget and divides across rounds * agents.
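
The formula described above can be sketched as a standalone function. This is a minimal illustration, not the library's implementation: the function signature and the zero-turn check are assumptions, and the source does not say how the real method handles a synthesis budget larger than the total.

```python
def per_turn_timeout(
    timeout_seconds: float,
    synthesis_timeout_seconds: float,
    max_rounds: int,
    num_agents: int,
) -> float:
    """Illustrative stand-in for CouncilConfig.per_turn_timeout (hypothetical signature)."""
    turns = max_rounds * num_agents
    if turns <= 0:
        # Assumption: the sketch rejects an empty council rather than dividing by zero.
        raise ValueError("need at least one round and one agent")
    # Reserve the synthesis budget first, then split the remainder evenly per turn.
    remaining = timeout_seconds - synthesis_timeout_seconds
    return remaining / turns


# 600 s total, 120 s reserved for synthesis, 3 rounds x 4 agents = 12 turns.
print(per_turn_timeout(600.0, 120.0, max_rounds=3, num_agents=4))  # 40.0
```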
Source code in src/heddle/contrib/council/config.py
load_council_config ¶
Load a council YAML config and return a validated model.
Raises:
| Type | Description |
|---|---|
| FileNotFoundError | If path does not exist. |
| ValidationError | If the YAML content is invalid. |
Source code in src/heddle/contrib/council/config.py
schemas ¶
Pydantic models for the council deliberation framework.
These models form the typed contract for multi-round agent discussions.
They are used by :class:CouncilRunner (NATS-free execution),
:class:CouncilOrchestrator (NATS-connected), and the MCP council bridge.
AgentConfig ¶
Bases: BaseModel
Configuration for a single council agent.
Each agent is backed by either an existing Heddle worker (via worker_type)
or an external chat bridge (via bridge). Exactly one must be set.
CouncilResult ¶
Bases: BaseModel
Final output of a council deliberation.
ConvergenceResult ¶
Bases: BaseModel
Result of a convergence check after a round.
TranscriptEntry ¶
Bases: BaseModel
A single contribution within a discussion round.
entry_type distinguishes panelist turns from audience
interjections. Agents may choose to engage with interjections
or ignore them — the protocol presents them separately.
ChatBridge — External Chat Adapters¶
Session-aware adapters for Claude, OpenAI, Ollama, and human-in-the-loop participation. Each adapter maintains per-session conversation history.
base ¶
ChatBridge ABC and shared data models.
All chat bridges implement :class:ChatBridge, which provides a
session-aware interface for multi-turn conversations with external
LLM providers or human participants.
ChatBridge ¶
Bases: ABC
Abstract base for external chat session adapters.
Each bridge maintains per-session conversation history. The worker itself remains stateless (per Heddle invariants) — the state lives in the bridge's internal session dict or in the external provider's session.
Subclasses must implement :meth:send_turn,
:meth:get_session_info, and :meth:close_session.
Source code in src/heddle/contrib/chatbridge/base.py
send_turn (abstractmethod, async) ¶
Send a message and get a response.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | str | The user message for this turn. | required |
| context | dict[str, Any] | Additional context (round metadata, topic, etc.). | required |
| session_id | str | Identifies the persistent conversation session. | required |

Returns:

| Type | Description |
|---|---|
| ChatResponse | The assistant's response as a :class: |
Source code in src/heddle/contrib/chatbridge/base.py
get_session_info (abstractmethod, async) ¶
close_session (async) ¶
aclose (async) ¶
Release any I/O resources held by this bridge.
Subclasses that hold open connections (e.g. an
httpx.AsyncClient) override this to close them. Idempotent —
safe to call more than once. The default clears any in-memory
session state; subclasses that override should call super()
last so sessions are cleared after their resources are released.
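
The override order described above (release resources first, call super() last) might look like the following sketch. BaseBridge, FakeClient, and HttpBridge are hypothetical stand-ins so the example runs without Heddle or httpx installed; only the ordering and the idempotency are taken from the text.

```python
import asyncio


class BaseBridge:
    """Hypothetical stand-in for ChatBridge: only tracks in-memory sessions."""

    def __init__(self):
        self._sessions = {}

    async def aclose(self):
        # Default behaviour per the docs: clear in-memory session state.
        self._sessions.clear()


class FakeClient:
    """Stands in for httpx.AsyncClient so the sketch needs no network."""

    def __init__(self):
        self.closed = False

    async def aclose(self):
        self.closed = True


class HttpBridge(BaseBridge):
    def __init__(self):
        super().__init__()
        self._client = FakeClient()

    async def aclose(self):
        # Release I/O first; the None check makes repeat calls harmless.
        if self._client is not None:
            await self._client.aclose()
            self._client = None
        # Clear sessions last, after the resources are released.
        await super().aclose()


async def demo():
    bridge = HttpBridge()
    client = bridge._client
    bridge._sessions["s1"] = ["hello"]
    await bridge.aclose()
    await bridge.aclose()  # second call is a no-op
    return client.closed, bridge._sessions, bridge._client


print(asyncio.run(demo()))  # (True, {}, None)
```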
Source code in src/heddle/contrib/chatbridge/base.py
ChatResponse ¶
Bases: BaseModel
Response from a chat bridge turn.
For "thinking" / chain-of-thought models served by some
OpenAI-compatible providers (LM Studio, vLLM with reasoning
models, DeepSeek, …), the visible answer may be empty while the
model produces a long internal monologue. When that happens,
bridges fall back to using the reasoning text as content so
downstream consumers see something; the raw monologue is still
available on reasoning_content for callers that want to
surface it separately (or strip it from logs).
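
As a rough illustration of the fallback described above, the sketch below uses a hypothetical Response dataclass in place of ChatResponse. Only the two field names, content and reasoning_content, come from the text; the helper name and the whitespace check are assumptions.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Response:
    """Hypothetical stand-in for ChatResponse with just the two relevant fields."""

    content: str
    reasoning_content: Optional[str] = None


def build_response(visible: str, reasoning: Optional[str]) -> Response:
    # Use the visible answer when there is one; otherwise fall back to the
    # reasoning text so downstream consumers still see something.
    content = visible if visible.strip() else (reasoning or "")
    # The raw monologue stays available separately in both cases.
    return Response(content=content, reasoning_content=reasoning)


print(build_response("", "step 1, step 2").content)      # step 1, step 2
print(build_response("42", "step 1, step 2").content)    # 42
```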
SessionInfo ¶
Bases: BaseModel
Metadata about an active chat session.
anthropic ¶
Anthropic (Claude) chat bridge — session-aware Claude API adapter.
Unlike :class:heddle.worker.backends.AnthropicBackend which is stateless
per-call, this bridge accumulates messages per session, enabling
multi-turn conversations with Claude.
AnthropicChatBridge ¶
AnthropicChatBridge(api_key: str | None = None, model: str = 'claude-sonnet-4-20250514', system_prompt: str = '', max_tokens: int = 2000)
Bases: ChatBridge
Claude API with per-session conversation history.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| api_key | str \| None | Anthropic API key. Falls back to | None |
| model | str | Model identifier (default: claude-sonnet-4-20250514). | 'claude-sonnet-4-20250514' |
| system_prompt | str | System instructions applied to all sessions. | '' |
| max_tokens | int | Default max tokens per turn. | 2000 |
Source code in src/heddle/contrib/chatbridge/anthropic.py
send_turn (async) ¶
Send a turn to Claude, accumulating session messages.
Session history is only updated after the API call returns.
Earlier the user message was appended eagerly, so an HTTP
failure left it in session.messages; the next turn then
sent two consecutive user messages, which Claude rejects.
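
The commit-after-success behaviour described above can be sketched independently of any provider. Session, send_turn, and the two fake transports below are hypothetical names used only for illustration; the real bridge's internals are not shown in this reference.

```python
import asyncio


class Session:
    """Hypothetical session holding the accumulated message history."""

    def __init__(self):
        self.messages = []


async def send_turn(session, message, call_api):
    # Build the request from a copy so a failure leaves the history untouched.
    pending = session.messages + [{"role": "user", "content": message}]
    reply = await call_api(pending)  # may raise
    # Commit the user and assistant messages only after the call succeeded.
    session.messages = pending + [{"role": "assistant", "content": reply}]
    return reply


async def flaky(messages):
    raise ConnectionError("simulated HTTP failure")


async def ok(messages):
    return "hello"


async def demo():
    session = Session()
    try:
        await send_turn(session, "hi", flaky)
    except ConnectionError:
        pass
    after_failure = len(session.messages)
    await send_turn(session, "hi", ok)  # the retry sends a single user message
    return after_failure, [m["role"] for m in session.messages]


print(asyncio.run(demo()))  # (0, ['user', 'assistant'])
```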
Source code in src/heddle/contrib/chatbridge/anthropic.py
get_session_info (async) ¶
Return session metadata.
Source code in src/heddle/contrib/chatbridge/anthropic.py
aclose (async) ¶
openai ¶
OpenAI chat bridge — session-aware OpenAI/ChatGPT adapter.
Supports any OpenAI-compatible API (OpenAI, Azure OpenAI, etc.).
OpenAIChatBridge ¶
OpenAIChatBridge(api_key: str | None = None, model: str = 'gpt-4o', base_url: str = 'https://api.openai.com', system_prompt: str = '', max_tokens: int = 2000)
Bases: ChatBridge
OpenAI Chat Completions API with per-session conversation history.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| api_key | str \| None | OpenAI API key. Falls back to | None |
| model | str | Model identifier (default: gpt-4o). | 'gpt-4o' |
| base_url | str | API base URL (default: OpenAI). | 'https://api.openai.com' |
| system_prompt | str | System instructions applied to all sessions. | '' |
| max_tokens | int | Default max tokens per turn. | 2000 |
Source code in src/heddle/contrib/chatbridge/openai.py
send_turn (async) ¶
Send a turn via OpenAI Chat Completions, accumulating history.
Session history is only updated after the API call returns
successfully. Earlier the user message was appended eagerly,
so an HTTP failure left it in the session and the next turn
sent two consecutive user messages — OpenAI accepts that
shape but produces confused output.
Source code in src/heddle/contrib/chatbridge/openai.py
get_session_info (async) ¶
Return session metadata.
Source code in src/heddle/contrib/chatbridge/openai.py
aclose (async) ¶
ollama ¶
Ollama chat bridge — session-aware local model adapter.
Wraps the Ollama /api/chat endpoint with per-session conversation
history for multi-turn local model interactions.
OllamaChatBridge ¶
OllamaChatBridge(model: str = 'llama3.2:3b', base_url: str = 'http://localhost:11434', system_prompt: str = '', max_tokens: int = 2000)
Bases: ChatBridge
Ollama chat API with per-session conversation history.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| model | str | Ollama model name (default: llama3.2:3b). | 'llama3.2:3b' |
| base_url | str | Ollama server URL (default: http://localhost:11434). | 'http://localhost:11434' |
| system_prompt | str | System instructions applied to all sessions. | '' |
| max_tokens | int | Default max tokens per turn (num_predict). | 2000 |
Source code in src/heddle/contrib/chatbridge/ollama.py
send_turn (async) ¶
Send a turn via Ollama /api/chat, accumulating history.
Session history is only updated after the API call returns. Previously the user message was appended eagerly, so an HTTP failure left it dangling in the session; on retry, the next turn sent two consecutive user messages, which different Ollama backends handled inconsistently.
Source code in src/heddle/contrib/chatbridge/ollama.py
get_session_info (async) ¶
Return session metadata.
Source code in src/heddle/contrib/chatbridge/ollama.py
aclose (async) ¶
manual ¶
Manual chat bridge — human-in-the-loop adapter.
Allows a human participant to join a council discussion or other multi-agent flow. Two modes:
1. **Callback mode** — provide an ``on_prompt`` async callable that
receives the context and returns a response string.
2. **Queue mode** — prompts are put onto an ``asyncio.Queue``, and
responses are awaited from a separate response queue.
Both modes enforce a timeout to prevent indefinite blocking.
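
Queue mode with a timeout can be approximated with two asyncio.Queue objects and asyncio.wait_for. The sketch below is an assumption about the general shape, not the bridge's actual code; ask_human and the simulated human task are hypothetical.

```python
import asyncio


async def ask_human(prompt, prompt_queue, response_queue, timeout_seconds):
    # Publish the prompt for an external consumer (UI, CLI, chat client).
    await prompt_queue.put(prompt)
    # wait_for raises asyncio.TimeoutError if no reply arrives in time.
    return await asyncio.wait_for(response_queue.get(), timeout=timeout_seconds)


async def demo():
    prompts, responses = asyncio.Queue(), asyncio.Queue()

    async def human():
        # Simulated participant: reads one prompt and answers it.
        question = await prompts.get()
        await responses.put(f"reply to: {question}")

    task = asyncio.create_task(human())
    answer = await ask_human("Your view?", prompts, responses, timeout_seconds=1.0)
    await task

    # Nobody is listening this time, so the call times out instead of blocking.
    try:
        await ask_human("Anyone there?", prompts, responses, timeout_seconds=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return answer, timed_out


print(asyncio.run(demo()))  # ('reply to: Your view?', True)
```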
ManualChatBridge ¶
ManualChatBridge(on_prompt: Callable[[str, dict, str], Awaitable[str]] | None = None, prompt_queue: Queue | None = None, response_queue: Queue | None = None, timeout_seconds: float = 300.0, system_prompt: str = '')
Bases: ChatBridge
Human-in-the-loop chat bridge.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| on_prompt | Callable[[str, dict, str], Awaitable[str]] \| None | Async callback | None |
| prompt_queue | Queue \| None | Queue where prompts are put for external consumption. | None |
| response_queue | Queue \| None | Queue where responses are expected. | None |
| timeout_seconds | float | Max time to wait for a human response. | 300.0 |
| system_prompt | str | System instructions (informational for the human). | '' |
Source code in src/heddle/contrib/chatbridge/manual.py
send_turn (async) ¶
Request a human response for this turn.
Session history is only updated after the human responds. Earlier the prompt was appended eagerly, so a timeout or queue failure left it in history; on retry the human would see two consecutive prompts.
Source code in src/heddle/contrib/chatbridge/manual.py
get_session_info (async) ¶
Return session metadata.
Source code in src/heddle/contrib/chatbridge/manual.py
worker ¶
ChatBridgeBackend — wraps a ChatBridge as a Heddle ProcessingBackend.
This enables any ChatBridge adapter to be used as a standard Heddle worker via YAML config alone, without writing Python code::

    name: "external_gpt4"
    processing_backend: "heddle.contrib.chatbridge.worker.ChatBridgeBackend"
    processing_config:
      bridge_class: "heddle.contrib.chatbridge.openai.OpenAIChatBridge"
      model: "gpt-4o"
      api_key_env: "OPENAI_API_KEY"
ChatBridgeBackend ¶
Bases: SyncProcessingBackend
ProcessingBackend that delegates to a ChatBridge adapter.
The bridge class is dynamically imported from config, enabling YAML-only configuration of external chat agents.
Config keys

- bridge_class: Dotted import path to a ChatBridge subclass.
- api_key_env: Optional env var name for API key.
- system_prompt: Optional system prompt for the bridge.
- \*\*kwargs: Passed to the bridge constructor.
Source code in src/heddle/contrib/chatbridge/worker.py
process_sync ¶
Not used — we override process() for async bridge calls.
Source code in src/heddle/contrib/chatbridge/worker.py
process (async) ¶
Delegate to the bridge's send_turn method.
Expects payload to contain:

- message, or the entire payload as the message
- _session_id (optional, defaults to "default")
- _context (optional, additional context dict)
Source code in src/heddle/contrib/chatbridge/worker.py
aclose (async) ¶
Close the wrapped :class:ChatBridge (releases httpx clients).
Valkey/Redis Store¶
Production checkpoint store using Redis/Valkey. Replaces the default in-memory store for persistent orchestrator checkpoints.
store ¶
Valkey-backed key-value store.
Production implementation of :class:heddle.core.kvstore.KeyValueStore using
redis.asyncio (redis-py). The redis-py client library works unchanged
with Valkey.
Install with: pip install heddle-ai[redis].
Connection defaults
redis://redis:6379 — matches the Docker Compose / k8s service name.
For local dev: redis://localhost:6379.
RedisKeyValueStore ¶
Bases: KeyValueStore
Valkey-backed key-value store (via redis-py client).
Thin wrapper around redis.asyncio that implements the
KeyValueStore interface. Handles connection lifecycle and TTL-based
expiry natively. The redis-py client works unchanged with Valkey.
Source code in src/heddle/contrib/redis/store.py
DuckDB Query Backend¶
Action-dispatch query backend for DuckDB. Supports full-text search, filtering, statistics, single-row get, and vector similarity search.
query_backend ¶
Generic DuckDB query and analytics backend for Heddle workflows.
Provides a configurable action-dispatch query backend against any DuckDB table. Supports full-text search (via DuckDB FTS), attribute filtering, aggregate statistics, single-record retrieval, and vector similarity search.
Subclasses configure domain-specific behavior by passing constructor
parameters (table name, columns, filter definitions, etc.) rather than
overriding methods. For advanced customisation, override _get_handlers
to add or replace action handlers.
Example worker config::

    processing_backend: "myapp.backends.MyQueryBackend"
    backend_config:
      db_path: "/tmp/workspace/data.duckdb"
See Also

- heddle.worker.processor.SyncProcessingBackend -- base class for sync backends
- heddle.contrib.duckdb.DuckDBViewTool -- LLM-callable view tool
- heddle.contrib.duckdb.DuckDBVectorTool -- LLM-callable vector search tool
DuckDBQueryError ¶
Bases: BackendError
Raised when a DuckDB query operation fails.
Wraps underlying DuckDB exceptions with a descriptive message
and the original cause attached via __cause__.
DuckDBQueryBackend ¶
DuckDBQueryBackend(db_path: str = '/tmp/workspace/data.duckdb', *, table_name: str = 'documents', result_columns: list[str] | None = None, json_columns: set[str] | None = None, id_column: str = 'id', full_text_column: str | None = 'full_text', fts_fields: str = 'full_text,summary', filter_fields: dict[str, str] | None = None, stats_groups: set[str] | None = None, stats_aggregates: list[str] | None = None, default_order_by: str = 'rowid', embedding_column: str = 'embedding')
Bases: SyncProcessingBackend
Generic action-dispatch query backend for DuckDB tables.
Opens a read-only connection to the DuckDB database and dispatches
to the appropriate query handler based on the action field in
the payload.
All queries use parameterized statements to prevent SQL injection.
Results from search/filter actions exclude large content columns
(configurable via full_text_column) to keep messages small.
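
The action-dispatch pattern, and the _get_handlers extension point mentioned earlier, can be illustrated without a database. Everything below is a hypothetical sketch: the class names, the dispatch method, the handler names, and the returned dict shapes are assumptions; only the "action" payload field, the _get_handlers name, and the ValueError for unknown actions come from this reference.

```python
from typing import Any, Callable, Dict

Handler = Callable[[Dict[str, Any]], Dict[str, Any]]


class DispatchBackend:
    """Hypothetical sketch of action dispatch; not DuckDBQueryBackend itself."""

    def _get_handlers(self) -> Dict[str, Handler]:
        # Map each supported action name to the method that serves it.
        return {"get": self._get, "stats": self._stats}

    def dispatch(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        action = payload.get("action")
        handler = self._get_handlers().get(action)
        if handler is None:
            raise ValueError(f"Unknown action: {action!r}")
        return handler(payload)

    def _get(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        return {"action": "get", "id": payload.get("id")}

    def _stats(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        return {"action": "stats", "total": 0}


class WithCount(DispatchBackend):
    """Subclass that adds one action by extending the handler map."""

    def _get_handlers(self) -> Dict[str, Handler]:
        handlers = super()._get_handlers()
        handlers["count"] = lambda payload: {"action": "count", "count": 0}
        return handlers


print(WithCount().dispatch({"action": "count"}))  # {'action': 'count', 'count': 0}
```

Extending the handler map keeps the dispatch logic in one place, so a subclass never has to reimplement the unknown-action check.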
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| db_path | str | Path to the DuckDB database file. | '/tmp/workspace/data.duckdb' |
| table_name | str | Table to query. | 'documents' |
| result_columns | list[str] \| None | Columns returned in search/filter results. | None |
| json_columns | set[str] \| None | Set of column names containing JSON strings that should be parsed back into Python objects on read. | None |
| id_column | str | Primary key column name for the | 'id' |
| full_text_column | str \| None | Large content column included only in | 'full_text' |
| fts_fields | str | Comma-separated field names for DuckDB FTS | 'full_text,summary' |
| filter_fields | dict[str, str] \| None | Mapping of payload field names to SQL condition templates. Example: | None |
| stats_groups | set[str] \| None | Set of column names allowed as | None |
| stats_aggregates | list[str] \| None | SQL aggregate expressions for the stats query. Defaults to | None |
| default_order_by | str | ORDER BY clause for filter results. | 'rowid' |
| embedding_column | str | Column name for vector embeddings used in the | 'embedding' |
Source code in src/heddle/contrib/duckdb/query_backend.py
process_sync ¶
Dispatch a query action against the DuckDB database.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| payload | dict[str, Any] | Must contain | required |
| config | dict[str, Any] | Worker config dict. May include | required |

Returns:

| Type | Description |
|---|---|
| dict[str, Any] | A dict with |

Raises:

| Type | Description |
|---|---|
| ValueError | If the action is unknown. |
| DuckDBQueryError | If the database query fails. |
Source code in src/heddle/contrib/duckdb/query_backend.py
DuckDB View Tool¶
Read-only DuckDB view exposed as an LLM-callable tool. Workers can query structured data during processing.
view_tool ¶
DuckDB view tool — exposes a DuckDB view as an LLM-callable tool.
When configured in a worker's knowledge_silos, this tool lets the LLM query a read-only DuckDB view during reasoning. The LLM can search (full-text) or list records from the view.
Example knowledge_silos config::

    knowledge_silos:
      - name: "catalog"
        type: "tool"
        provider: "heddle.contrib.duckdb.DuckDBViewTool"
        config:
          db_path: "/tmp/workspace/data.duckdb"
          view_name: "summaries"
          description: "Search and browse record summaries"
          max_results: 20
The tool auto-introspects the view's columns via DESCRIBE to build its JSON Schema definition. Queries use parameterized SQL to prevent injection.
DuckDBViewTool ¶
DuckDBViewTool(db_path: str, view_name: str, description: str = 'Query a database view', max_results: int = 20)
Bases: SyncToolProvider
Expose a DuckDB view as an LLM-callable search/list tool.
The tool dynamically introspects the view's column schema at instantiation time and builds a JSON Schema tool definition that the LLM can call.
Supports two operations:

- search: Full-text ILIKE search across all text columns
- list: List recent records with optional column filters
All queries are parameterized and results are capped at max_results.
Source code in src/heddle/contrib/duckdb/view_tool.py
get_definition ¶
Build JSON Schema tool definition from view columns.
Source code in src/heddle/contrib/duckdb/view_tool.py
execute_sync ¶
Execute a query against the DuckDB view.
Source code in src/heddle/contrib/duckdb/view_tool.py
DuckDB Vector Tool¶
Semantic similarity search via DuckDB embeddings, exposed as an LLM tool.
vector_tool ¶
DuckDB vector similarity search tool for LLM function-calling.
Uses embedding vectors stored in DuckDB to find semantically similar
records. Query text is embedded via Ollama at search time, then
compared against stored vectors using DuckDB's list_cosine_similarity.
Example knowledge_silos config::

    knowledge_silos:
      - name: "similar_items"
        type: "tool"
        provider: "heddle.contrib.duckdb.DuckDBVectorTool"
        config:
          db_path: "/tmp/workspace/data.duckdb"
          table_name: "documents"
          result_columns: ["id", "title", "summary", "created_at"]
          embedding_column: "embedding"
          tool_name: "find_similar"
          description: "Find records semantically similar to a query"
          embedding_model: "nomic-embed-text"
See Also

- heddle.worker.embeddings -- OllamaEmbeddingProvider
- heddle.worker.tools -- SyncToolProvider base class
DuckDBVectorTool ¶
DuckDBVectorTool(db_path: str, table_name: str = 'documents', result_columns: list[str] | None = None, embedding_column: str = 'embedding', tool_name: str = 'find_similar', description: str = 'Find semantically similar records', embedding_model: str = 'nomic-embed-text', ollama_url: str | None = None, max_results: int = 10)
Bases: SyncToolProvider
Semantic similarity search over DuckDB vector embeddings.
Generates a query embedding via Ollama, then uses DuckDB's
list_cosine_similarity function to find the most similar
records by their stored embedding vectors.
Only records with non-null embeddings are searched.
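
For readers unfamiliar with the metric, the sketch below computes cosine similarity in plain Python and ranks rows by it, skipping rows without an embedding. It is a stand-in for what the SQL function does conceptually, not the tool's query; the function names and the toy two-dimensional vectors are illustrative assumptions.

```python
import math


def cosine_similarity(a, b):
    # dot(a, b) / (|a| * |b|): 1.0 for the same direction, 0.0 for orthogonal vectors.
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm


def top_k(query_vec, rows, k):
    # Score only rows that have an embedding, mirroring the non-null filter.
    scored = [
        (cosine_similarity(query_vec, vec), row_id)
        for row_id, vec in rows
        if vec is not None
    ]
    scored.sort(reverse=True)  # highest similarity first
    return [row_id for _, row_id in scored[:k]]


rows = [("a", [1.0, 0.0]), ("b", [0.0, 1.0]), ("c", None), ("d", [1.0, 1.0])]
print(top_k([1.0, 0.0], rows, k=2))  # ['a', 'd']
```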
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| db_path | str | Path to the DuckDB database file. | required |
| table_name | str | Table containing the records and embeddings. | 'documents' |
| result_columns | list[str] \| None | Columns to include in results. If None, introspects the table schema at first use, excluding the embedding column and any column named | None |
| embedding_column | str | Name of the column storing embedding vectors. | 'embedding' |
| tool_name | str | Name exposed in the LLM tool definition. | 'find_similar' |
| description | str | Description exposed in the LLM tool definition. | 'Find semantically similar records' |
| embedding_model | str | Ollama model name for embedding generation. | 'nomic-embed-text' |
| ollama_url | str \| None | Optional custom Ollama server URL. | None |
| max_results | int | Hard cap on returned results. | 10 |
Source code in src/heddle/contrib/duckdb/vector_tool.py
result_columns (property) ¶
Return result columns, introspecting on first access if needed.
get_definition ¶
Return tool definition for LLM function-calling.
Source code in src/heddle/contrib/duckdb/vector_tool.py
execute_sync ¶
Embed the query and search for similar records.
Source code in src/heddle/contrib/duckdb/vector_tool.py
LanceDB Vector Store¶
ANN vector storage and search via LanceDB. Faster than DuckDB for large
datasets. Implements the VectorStore ABC.
store ¶
LanceDB-backed vector store for embedded text chunks.
Stores EmbeddedChunk records in a LanceDB table with native vector columns. Supports:

- Batch insertion of TextChunk objects (with embedding generation)
- Pre-embedded chunk insertion
- Approximate Nearest Neighbor (ANN) similarity search
- Metadata filtering (e.g. by channel_id)
- Basic CRUD (get, delete by chunk_id)
Uses Heddle's :class:OllamaEmbeddingProvider (default) or
:class:OpenAICompatibleEmbeddingProvider (e.g. for LM Studio) for
query embedding generation.
LanceDB provides ANN indexing for faster search over large datasets compared to exact cosine similarity in DuckDB.
LanceDBVectorStore ¶
LanceDBVectorStore(db_path: str = '/tmp/rag-vectors.lance', embedding_model: str = 'nomic-embed-text', ollama_url: str = 'http://localhost:11434', embedding_backend: str = 'ollama', embedding_url: str | None = None)
Bases: VectorStore
Embedded vector store backed by LanceDB.
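Usage::

    from heddle.contrib.lancedb.store import LanceDBVectorStore

    store = LanceDBVectorStore("/tmp/rag-vectors.lance")
    store.initialize()

    # Embed and store chunks (chunks: TextChunk objects prepared elsewhere)
    store.add_chunks(chunks)

    # Search
    results = store.search("earthquake damage", limit=5)

    store.close()
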
Initialize the store.
See :class:heddle.contrib.rag.vectorstore.duckdb_store.DuckDBVectorStore
for parameter semantics. embedding_backend selects between
:class:OllamaEmbeddingProvider and
:class:OpenAICompatibleEmbeddingProvider (LM Studio etc.).
Source code in src/heddle/contrib/lancedb/store.py
initialize ¶
Open or create the LanceDB database and table.
Source code in src/heddle/contrib/lancedb/store.py
close ¶
add_chunks ¶
Embed and insert TextChunk objects. Returns count of inserted rows.
Source code in src/heddle/contrib/lancedb/store.py
add_embedded_chunks ¶
Insert pre-embedded chunks (no embedding generation needed).
Source code in src/heddle/contrib/lancedb/store.py
search ¶
search(query: str, limit: int = 10, min_score: float = 0.0, channel_ids: list[int] | None = None) -> list[SimilarityResult]
Semantic similarity search using LanceDB ANN.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | Natural language query (embedded via Ollama) | required |
| limit | int | Maximum results to return | 10 |
| min_score | float | Minimum cosine similarity threshold | 0.0 |
| channel_ids | list[int] \| None | Optional filter by source channel | None |

Returns:

| Type | Description |
|---|---|
| list[SimilarityResult] | List of SimilarityResult sorted by descending similarity |
Source code in src/heddle/contrib/lancedb/store.py
count ¶
get ¶
Retrieve a single embedded chunk by ID.
Source code in src/heddle/contrib/lancedb/store.py
delete ¶
Delete a chunk by ID. Returns True if a row was deleted.
Source code in src/heddle/contrib/lancedb/store.py
delete_by_source ¶
Delete all chunks for a given source post. Returns count.
Source code in src/heddle/contrib/lancedb/store.py
stats ¶
Return summary statistics about the store.
Previously, calling to_pandas() on the full table ran out of
memory on large stores (a 10M-chunk LanceDB table with 768-dim
float32 vectors is ~30 GB once materialised). The method now
projects to scalar columns only via
search().select(...).to_list(), so the vector column is never
copied out of LanceDB. It falls back to a row count if even the
projected scan fails (e.g. the backend doesn't support that API
combination).
Source code in src/heddle/contrib/lancedb/store.py
LanceDB Vector Tool¶
Semantic similarity search via LanceDB, exposed as an LLM tool.
tool ¶
LanceDB vector similarity search tool for LLM function-calling.
Uses embedding vectors stored in LanceDB to find semantically similar records. Query text is embedded via Ollama at search time, then compared against stored vectors using LanceDB's ANN search.
Example knowledge_silos config::

    knowledge_silos:
      - name: "similar_items"
        type: "tool"
        provider: "heddle.contrib.lancedb.LanceDBVectorTool"
        config:
          db_path: "/tmp/workspace/rag-vectors.lance"
          table_name: "rag_chunks"
          tool_name: "find_similar"
          description: "Find records semantically similar to a query"
          embedding_model: "nomic-embed-text"
See Also

- heddle.worker.embeddings -- OllamaEmbeddingProvider
- heddle.worker.tools -- SyncToolProvider base class
LanceDBVectorTool ¶
LanceDBVectorTool(db_path: str, table_name: str = 'rag_chunks', vector_column: str = 'vector', result_columns: list[str] | None = None, tool_name: str = 'find_similar', description: str = 'Find semantically similar records', embedding_model: str = 'nomic-embed-text', ollama_url: str | None = None, max_results: int = 10)
Bases: SyncToolProvider
Semantic similarity search over LanceDB vector embeddings.
Generates a query embedding via Ollama, then uses LanceDB's ANN search to find the most similar records by their stored vectors.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| db_path | str | Path to the LanceDB database directory. | required |
| table_name | str | Table containing the records and embeddings. | 'rag_chunks' |
| vector_column | str | Name of the column storing embedding vectors. | 'vector' |
| result_columns | list[str] \| None | Columns to include in results. If None, returns chunk_id, text, source_channel_id, source_global_id. | None |
| tool_name | str | Name exposed in the LLM tool definition. | 'find_similar' |
| description | str | Description exposed in the LLM tool definition. | 'Find semantically similar records' |
| embedding_model | str | Ollama model name for embedding generation. | 'nomic-embed-text' |
| ollama_url | str \| None | Optional custom Ollama server URL. | None |
| max_results | int | Hard cap on returned results. | 10 |
Source code in src/heddle/contrib/lancedb/tool.py
get_definition ¶
Return tool definition for LLM function-calling.
Source code in src/heddle/contrib/lancedb/tool.py
execute_sync ¶
Embed the query and search for similar records.
Source code in src/heddle/contrib/lancedb/tool.py
Document Processing — Extractors¶
Pluggable extractors for PDF / DOCX / Markdown / HTML inputs. All backends
produce the same ExtractorOutput so downstream pipeline stages don't
need to know which engine ran.
contracts ¶
Pydantic I/O contracts for document processing.
These models define the typed input/output schema shared by all extraction
backends (Docling, MarkItDown, SmartExtractor). Worker YAML configs can
reference them via input_schema_ref / output_schema_ref.
markitdown_backend ¶
MarkItDown-based document extraction backend.
Wraps Microsoft MarkItDown to convert PDF, DOCX, PPTX, XLSX, HTML, and other formats to Markdown. Much faster than Docling (no ML models, no torch) but cannot OCR scanned PDFs or extract complex table structures.
This backend produces the same output schema as DoclingBackend so it can be used as a drop-in replacement in the pipeline.
Input: {"file_ref": "filename.pdf"} (relative to workspace_dir)
Output: {"file_ref": "filename_extracted.json", "page_count": N, "has_tables": bool, "sections": [...], "text_preview": "..."}
MarkItDown does not provide structural metadata (page count, tables, sections), so these are derived from the Markdown output:
page_count — Estimated from form-feed characters or defaults to 1.
has_tables — Detected via Markdown table syntax (| --- |).
sections — Parsed from Markdown heading lines (# / ## / ###).
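The three derivations listed above can be sketched with regular expressions. This is a minimal illustration, not the backend's code: the function name `derive_metadata`, the "form feeds plus one" page estimate, and the `{"level", "title"}` shape of each section entry are assumptions made for the example.

```python
import re
from typing import Any

# A Markdown table separator row such as "| --- | --- |" or "|:---|---:|".
TABLE_SEPARATOR = re.compile(r"^\s*\|(\s*:?-{3,}:?\s*\|)+\s*$", re.MULTILINE)
# Heading lines of level 1 to 3: "# Title", "## Title", "### Title".
HEADING = re.compile(r"^(#{1,3})[ \t]+(.+?)[ \t]*$", re.MULTILINE)


def derive_metadata(markdown: str) -> dict[str, Any]:
    """Derive page_count, has_tables, and sections from Markdown text."""
    # Assumption: each form feed separates two pages; no form feeds means 1 page.
    page_count = markdown.count("\f") + 1
    has_tables = TABLE_SEPARATOR.search(markdown) is not None
    sections = [
        {"level": len(match.group(1)), "title": match.group(2)}
        for match in HEADING.finditer(markdown)
    ]
    return {"page_count": page_count, "has_tables": has_tables, "sections": sections}
```

A plain horizontal rule (`---`) is not treated as a table because the pattern requires pipe delimiters. Heading-like lines inside fenced code blocks would still be counted by this sketch.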
See Also
configs/workers/doc_extractor_smart.yaml -- smart extractor config
docman.backends.smart_extractor -- composite backend with fallback
heddle.worker.processor.SyncProcessingBackend -- base class
MarkItDownConversionError ¶
Bases: BackendError
Raised when MarkItDown fails to convert a document.
Wraps underlying MarkItDown exceptions with a descriptive message
and the original cause attached via __cause__.
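Attaching the original cause via `__cause__` is what Python's `raise ... from ...` does. The sketch below shows that pattern in isolation; `BackendError` here is a local stand-in for Heddle's class, and `ConversionError` and `convert` are hypothetical names used only for illustration.

```python
from typing import Any, Callable


class BackendError(Exception):
    """Local stand-in for Heddle's BackendError base class."""


class ConversionError(BackendError):
    """Hypothetical error raised when a converter fails on a document."""


def convert(path: str, converter: Callable[[str], Any]) -> Any:
    """Run a converter and re-raise any failure as ConversionError."""
    try:
        return converter(path)
    except Exception as exc:
        # "from exc" stores the original exception on __cause__, so the
        # full traceback chain stays available to callers and logs.
        raise ConversionError(f"Failed to convert {path!r}: {exc}") from exc
```

Callers can then catch the backend-level error type while still inspecting `err.__cause__` for the library-specific failure.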
MarkItDownBackend ¶
Bases: SyncProcessingBackend
SyncProcessingBackend that uses Microsoft MarkItDown for extraction.
Fast, lightweight document-to-Markdown conversion without ML models. Suitable for well-structured digital PDFs, DOCX, PPTX, XLSX, HTML, and other text-based formats. Not suitable for scanned/image-based PDFs that require OCR.
Produces the same output contract as DoclingBackend so downstream pipeline stages (classifier, summarizer, ingest) work unchanged.
Source code in src/heddle/contrib/docproc/markitdown_backend.py
process_sync ¶
Extract text from a document using MarkItDown.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| payload | dict[str, Any] | Must contain | required |
| config | dict[str, Any] | Worker config dict. May include | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | |
Raises:
| Type | Description |
|---|---|
| ValueError | If file_ref attempts path traversal. |
| FileNotFoundError | If the source file does not exist. |
| MarkItDownConversionError | If MarkItDown fails to convert. |
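The first two error conditions above can be sketched with `pathlib`. This is one common way to guard a workspace directory, not necessarily how Heddle's WorkspaceManager does it; the helper name `resolve_file_ref` is hypothetical, and `Path.is_relative_to` requires Python 3.9 or later.

```python
from pathlib import Path


def resolve_file_ref(workspace_dir: str, file_ref: str) -> Path:
    """Resolve file_ref inside workspace_dir, rejecting escapes and missing files."""
    workspace = Path(workspace_dir).resolve()
    # resolve() collapses ".." segments and follows symlinks, so the check
    # below compares real locations rather than the raw input string.
    candidate = (workspace / file_ref).resolve()
    if not candidate.is_relative_to(workspace):
        raise ValueError(f"file_ref escapes the workspace: {file_ref!r}")
    if not candidate.is_file():
        raise FileNotFoundError(f"No such file in workspace: {file_ref!r}")
    return candidate
```

Checking traversal before existence means a probe such as `../secret.txt` is rejected with ValueError whether or not the target exists.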
docling_backend ¶
Docling-based document extraction backend.
Wraps IBM Docling to extract text, tables, and structure from PDF/DOCX files. Extends SyncProcessingBackend so the synchronous Docling work is automatically offloaded to a thread pool, keeping the async event loop responsive.
This is the first stage in DocMan's document processing pipeline: doc_extractor (this) -> doc_classifier -> doc_summarizer
Input: {"file_ref": "filename.pdf"} (relative to workspace_dir)
Output: {"file_ref": "filename_extracted.json", "page_count": N, "has_tables": bool, "sections": [...], "text_preview": "..."}
The extracted JSON is written to workspace_dir and contains the full document text. Subsequent stages reference it via file_ref to avoid passing large text through NATS messages.
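The file_ref hand-off described above can be sketched as follows: the full text goes to disk and only a small summary travels in the message. The function name `write_extraction`, the layout of the JSON file, and the 500-character preview length are assumptions for illustration; only the summary keys come from the Output example above.

```python
import json
from pathlib import Path
from typing import Any


def write_extraction(
    workspace_dir: str,
    source_name: str,
    full_text: str,
    page_count: int,
    has_tables: bool,
    sections: list[Any],
    preview_chars: int = 500,  # assumed preview length
) -> dict[str, Any]:
    """Write the full extraction to the workspace and return a small summary."""
    # "filename.pdf" becomes "filename_extracted.json", as in the Output example.
    out_name = f"{Path(source_name).stem}_extracted.json"
    document = {
        "text": full_text,  # assumed key for the full document text
        "page_count": page_count,
        "has_tables": has_tables,
        "sections": sections,
    }
    (Path(workspace_dir) / out_name).write_text(
        json.dumps(document, ensure_ascii=False), encoding="utf-8"
    )
    # Only this lightweight dict is passed on to downstream stages.
    return {
        "file_ref": out_name,
        "page_count": page_count,
        "has_tables": has_tables,
        "sections": sections,
        "text_preview": full_text[:preview_chars],
    }
```

A downstream stage would load the full text by resolving `file_ref` against the same workspace directory, so message size stays roughly constant regardless of document length.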
See Also
configs/workers/doc_extractor.yaml -- worker config with I/O schemas
docs/docling-setup.md -- full Docling configuration and tuning guide
heddle.worker.processor.SyncProcessingBackend -- base class for sync backends
heddle.core.workspace.WorkspaceManager -- file-ref resolution with path safety
DoclingConversionError ¶
Bases: BackendError
Raised when Docling fails to convert a document.
Wraps underlying Docling exceptions (corrupt PDFs, unsupported formats,
out-of-memory conditions) with a descriptive message and the original
cause attached via __cause__.
DoclingBackend ¶
Bases: SyncProcessingBackend
SyncProcessingBackend that uses IBM Docling for document extraction.
Reads a source document (PDF or DOCX) from the workspace directory, runs Docling's DocumentConverter to extract text, tables, and structural metadata, then writes the full extracted content as JSON back to the workspace. Returns a lightweight summary (file_ref, page_count, has_tables, sections, text_preview) suitable for passing through NATS messages to downstream pipeline stages.
Because Docling is synchronous and CPU-bound, this backend extends SyncProcessingBackend which automatically offloads process_sync() to a thread pool via run_in_executor.
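The offloading pattern named above can be sketched with the standard library alone. The classes below are hypothetical stand-ins, not Heddle's SyncProcessingBackend: they only show how an async `process()` wrapper might hand a blocking `process_sync()` to the default thread pool through `run_in_executor`.

```python
import asyncio
import time
from typing import Any


class SyncBackendSketch:
    """Hypothetical base class: async wrapper around a blocking process_sync()."""

    def process_sync(self, payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
        raise NotImplementedError

    async def process(self, payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
        loop = asyncio.get_running_loop()
        # None selects the loop's default ThreadPoolExecutor; the event loop
        # keeps serving other tasks while the worker thread is busy.
        return await loop.run_in_executor(None, self.process_sync, payload, config)


class SlowExtractor(SyncBackendSketch):
    """Simulates a slow, blocking extraction step."""

    def process_sync(self, payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
        time.sleep(0.05)  # stands in for a long synchronous conversion
        return {"file_ref": payload["file_ref"]}
```

Subclasses in this sketch implement only `process_sync()`, which keeps blocking library calls out of coroutine code.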
Attributes:
| Name | Type | Description |
|---|---|---|
| workspace_dir | | Default workspace path. Can be overridden per-call via the |
Source code in src/heddle/contrib/docproc/docling_backend.py
process_sync ¶
Extract text and structure from a document using Docling.
Validates the input file_ref for path traversal and existence, then runs Docling's DocumentConverter synchronously (the parent class handles thread pool offloading).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| payload | dict[str, Any] | Must contain | required |
| config | dict[str, Any] | Worker config dict (from the YAML's | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | A dict with … unpacks this and publishes the TaskResult to NATS. |
Raises:
| Type | Description |
|---|---|
| ValueError | If file_ref attempts to escape the workspace directory (path traversal attack). |
| FileNotFoundError | If the source file does not exist in the workspace. |
| DoclingConversionError | If Docling fails to convert the document (corrupt file, unsupported format, OOM, etc.). |