Tracing & Observability

otel

OpenTelemetry integration for Heddle distributed tracing.

All public functions in this module are safe to call without opentelemetry installed — they degrade to no-ops. This lets production code instrument unconditionally while making OTel an optional dependency.

Trace context propagation uses W3C traceparent format, injected into NATS message dicts under the _trace_context key.
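For orientation, the sketch below shows what a W3C traceparent value looks like and how it might sit inside a message dict. The helper names (`format_traceparent`, `parse_traceparent`) and the `task` field are illustrative assumptions and not part of Heddle; only the `_trace_context` key and the `version-traceid-parentid-flags` layout come from the text above and the W3C Trace Context format.

```python
# Illustrative only: these helpers are hypothetical and not part of Heddle.

def format_traceparent(trace_id: int, span_id: int, sampled: bool = True) -> str:
    """Render a W3C traceparent value: version-traceid-parentid-flags."""
    flags = 0x01 if sampled else 0x00
    return f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"


def parse_traceparent(value: str) -> dict[str, object]:
    """Split a traceparent value into its four fields (basic validation only)."""
    version, trace_id, span_id, flags = value.split("-")
    if len(trace_id) != 32 or len(span_id) != 16:
        raise ValueError(f"malformed traceparent: {value!r}")
    return {
        "version": version,
        "trace_id": int(trace_id, 16),
        "span_id": int(span_id, 16),
        "sampled": bool(int(flags, 16) & 0x01),
    }


# A message dict as it might look after context injection. The "task" field is
# made up for the example; "_trace_context" is the key named in the text above.
message = {
    "task": "summarize",
    "_trace_context": {
        "traceparent": format_traceparent(
            0x0AF7651916CD43DD8448EB211C80319C, 0xB7AD6B7169203331
        ),
    },
}
```

Because the context travels inside the message body rather than in transport headers, it survives any serialization step that preserves the dict.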

GenAI semantic conventions

LLM call spans (llm.call) in worker/runner.py follow the emerging OTel GenAI semantic conventions for attribute naming:

  • gen_ai.system — provider identifier (anthropic, ollama, openai)
  • gen_ai.request.model / gen_ai.response.model — model names
  • gen_ai.usage.input_tokens / gen_ai.usage.output_tokens — token counts
  • gen_ai.request.temperature / gen_ai.request.max_tokens — request params

When HEDDLE_TRACE_CONTENT=1, prompt and completion text are recorded as span events (gen_ai.content.prompt, gen_ai.content.completion).

See: https://opentelemetry.io/docs/specs/semconv/gen-ai/

Legacy llm.* attributes are preserved for backward compatibility.
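As a rough illustration of the attribute naming above, the following sketch builds the `gen_ai.*` attributes for one `llm.call` span and gates the content events on `HEDDLE_TRACE_CONTENT`. The functions `llm_call_attributes` and `content_events` are hypothetical helpers, not code from worker/runner.py, and the `"text"` key used for the event payload is an assumption, since the text above names only the events themselves.

```python
import os
from typing import Any, Mapping

# Illustrative only: hypothetical helpers, not functions from worker/runner.py.

def llm_call_attributes(
    system: str,
    request_model: str,
    response_model: str,
    input_tokens: int,
    output_tokens: int,
    temperature: float,
    max_tokens: int,
) -> dict[str, Any]:
    """Build the gen_ai.* attributes listed above for one llm.call span."""
    return {
        "gen_ai.system": system,
        "gen_ai.request.model": request_model,
        "gen_ai.response.model": response_model,
        "gen_ai.usage.input_tokens": input_tokens,
        "gen_ai.usage.output_tokens": output_tokens,
        "gen_ai.request.temperature": temperature,
        "gen_ai.request.max_tokens": max_tokens,
    }


def content_events(
    prompt: str,
    completion: str,
    env: Mapping[str, str] = os.environ,
) -> list[tuple[str, dict[str, str]]]:
    """Return (event name, attributes) pairs; empty unless content tracing is on."""
    if env.get("HEDDLE_TRACE_CONTENT") != "1":
        return []
    # The "text" attribute key is an assumption made for this sketch.
    return [
        ("gen_ai.content.prompt", {"text": prompt}),
        ("gen_ai.content.completion", {"text": completion}),
    ]
```

Keeping prompt and completion text behind an explicit opt-in means spans carry only metadata by default, which matters when prompts may contain sensitive data.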

Setup:

    from heddle.tracing import init_tracing
    init_tracing("heddle-pipeline", endpoint="http://localhost:4317")

init_tracing

init_tracing(service_name: str = 'heddle', *, endpoint: str | None = None) -> bool

Initialize OTel tracing with OTLP exporter.

Idempotent: a second call is a no-op that returns True. Without this guard, calling init_tracing twice triggered the OTel SDK's "Overriding of current TracerProvider is not allowed" warning, which surfaced in tests and in CLI commands that re-imported the tracing module.

Parameters:

  • service_name (str, default: 'heddle'): Service name reported to the collector.
  • endpoint (str | None, default: None): OTLP gRPC endpoint (e.g. http://localhost:4317). Defaults to the OTEL_EXPORTER_OTLP_ENDPOINT env var.

Returns:

  • bool: True if OTel was initialized (or was already initialized), False if OpenTelemetry or its OTLP exporter packages are not installed.
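The return contract can be exercised with a small stand-in. The sketch below mimics only the documented behavior (False when OTel is missing, True on the first and on every repeated call); `init_tracing_stub`, its `otel_installed` flag, and `describe_tracing` are hypothetical names, and nothing here configures a real exporter.

```python
# Stand-in that mimics the documented contract of init_tracing. It is not the
# real implementation and configures nothing.
_initialized = False


def init_tracing_stub(service_name: str = "heddle", *, otel_installed: bool = True) -> bool:
    """Return False if OTel is missing, True on first and repeated calls."""
    global _initialized
    if not otel_installed:
        return False
    if _initialized:
        return True  # repeated call: nothing left to do
    _initialized = True  # the real function would set up the exporter here
    return True


def describe_tracing(enabled: bool) -> str:
    """Callers can branch on the return value instead of catching ImportError."""
    return "spans exported" if enabled else "tracing disabled, spans are no-ops"
```

Since repeated calls are harmless, entry points such as CLI commands and test fixtures can each call the initializer without coordinating with one another.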

Source code in src/heddle/tracing/otel.py
def init_tracing(
    service_name: str = "heddle",
    *,
    endpoint: str | None = None,
) -> bool:
    """Initialize OTel tracing with OTLP exporter.

    Idempotent: a second call is a no-op that returns ``True``.  Without
    this guard, calling ``init_tracing`` twice triggered the OTel SDK's
    "Overriding of current TracerProvider is not allowed" warning, which
    surfaced in tests and in CLI commands that re-imported the tracing
    module.

    Args:
        service_name: Service name reported to the collector.
        endpoint: OTLP gRPC endpoint (e.g. ``http://localhost:4317``).
            Defaults to the ``OTEL_EXPORTER_OTLP_ENDPOINT`` env var.

    Returns:
        ``True`` if OTel was initialized (or was already initialized),
        ``False`` if not installed.
    """
    global _TRACING_INITIALIZED  # noqa: PLW0603 — module-level singleton flag is the simplest idempotency guard

    if not _HAS_OTEL:
        logger.info("tracing.otel_not_available", hint="install with: uv sync --extra otel")
        return False

    if _TRACING_INITIALIZED:
        logger.debug("tracing.already_initialized", service_name=service_name)
        return True

    try:
        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (  # type: ignore[import-untyped]
            OTLPSpanExporter,
        )
        from opentelemetry.sdk.resources import Resource  # type: ignore[import-untyped]
        from opentelemetry.sdk.trace import TracerProvider  # type: ignore[import-untyped]
        from opentelemetry.sdk.trace.export import (
            BatchSpanProcessor,  # type: ignore[import-untyped]
        )
    except ImportError:
        logger.warning("tracing.sdk_import_failed", hint="install opentelemetry-sdk and exporter")
        return False

    resource = Resource.create({"service.name": service_name})
    provider = TracerProvider(resource=resource)

    exporter_kwargs: dict[str, Any] = {}
    if endpoint:
        exporter_kwargs["endpoint"] = endpoint

    exporter = OTLPSpanExporter(**exporter_kwargs)
    provider.add_span_processor(BatchSpanProcessor(exporter))
    _trace_mod.set_tracer_provider(provider)
    _TRACING_INITIALIZED = True

    # Resolve the *effective* endpoint: prefer explicit kwarg, else
    # fall back to the env var the exporter itself reads. Tracking the
    # resolved value (not the literal kwarg) makes ``status()`` honest
    # about what's actually configured.
    effective_endpoint = endpoint or os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT")
    _STATUS.update(
        {
            "enabled": True,
            "service_name": service_name,
            "endpoint": effective_endpoint,
            "exporter_class": f"{type(exporter).__module__}.{type(exporter).__name__}",
        }
    )

    logger.info("tracing.initialized", service_name=service_name, endpoint=endpoint)

    # Initialize metrics alongside spans so consumers get one entry
    # point. Both surfaces share the same service_name and endpoint;
    # failures here are non-fatal — tracing succeeded even if metrics
    # init returns False (e.g. metrics SDK missing while trace SDK is
    # installed).
    from heddle.tracing.metrics import init_metrics

    init_metrics(service_name=service_name, endpoint=endpoint)

    return True

status

status() -> dict[str, Any]

Return a snapshot of the current tracing configuration.

Addresses the inspectability-of-defaults guardrail: callers and operators can ask "is OTel active? what's the endpoint? what exporter is it using?" without guessing or reading process state.

Returns:

dict[str, Any]: A dict with these keys (always present, types as documented):

  • enabled (bool): True if the OTel SDK is installed and init_tracing completed successfully. False otherwise, including when OTel is installed but init_tracing has not been called yet, and when an init_tracing attempt failed (e.g. exporter SDK import error).
  • service_name (str | None): the service_name the last successful init_tracing was called with; None if not yet initialized.
  • endpoint (str | None): the effective OTLP endpoint, resolved from (in priority order) the explicit endpoint kwarg to init_tracing or the OTEL_EXPORTER_OTLP_ENDPOINT environment variable. None if neither is set or if not yet initialized.
  • exporter_class (str | None): the fully-qualified class name of the configured span exporter (e.g. "opentelemetry.exporter.otlp.proto.grpc.trace_exporter.OTLPSpanExporter"). None if not yet initialized.

The returned dict is a shallow copy of internal state — mutating it does not affect future status() calls.

TODO(cli): when a heddle status CLI subcommand is added, surface this dict in its output (a one-line "OTel: enabled, endpoint=…" summary plus a verbose mode that prints the full dict). See workspace AUDIT_TODO.md OTel W1.
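The snapshot semantics can be illustrated with a stand-in. In the sketch below the `_STATUS` values are made up and `summarize` is a hypothetical formatter in the spirit of the one-line summary mentioned in the TODO; only the key names and the shallow-copy behavior are taken from the description above.

```python
from typing import Any

# Stand-in for the module's internal state. The key names match the list above;
# the values are made up for the example.
_STATUS: dict[str, Any] = {
    "enabled": True,
    "service_name": "heddle-pipeline",
    "endpoint": "http://localhost:4317",
    "exporter_class": "opentelemetry.exporter.otlp.proto.grpc.trace_exporter.OTLPSpanExporter",
}


def status() -> dict[str, Any]:
    """Return a shallow copy so callers cannot mutate the internal dict."""
    return dict(_STATUS)


def summarize(snapshot: dict[str, Any]) -> str:
    """Hypothetical one-line summary of a status snapshot."""
    state = "enabled" if snapshot["enabled"] else "disabled"
    return f"OTel: {state}, endpoint={snapshot['endpoint']}"


snapshot = status()
snapshot["enabled"] = False  # changes only the copy, not _STATUS
```

A shallow copy is enough here because every value in the dict is an immutable scalar (bool, str, or None).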

Source code in src/heddle/tracing/otel.py
def status() -> dict[str, Any]:
    """Return a snapshot of the current tracing configuration.

    Addresses the inspectability-of-defaults guardrail: callers and
    operators can ask "is OTel active? what's the endpoint? what
    exporter is it using?" without guessing or reading process state.

    Returns:
        A dict with these keys (always present, types as documented):

        - ``enabled`` (bool): ``True`` if the OTel SDK is installed
            *and* :func:`init_tracing` completed successfully.
            ``False`` otherwise — including when OTel is installed but
            ``init_tracing`` has not been called yet, and when an
            ``init_tracing`` attempt failed (e.g. exporter SDK import
            error).
        - ``service_name`` (str | None): the ``service_name`` the
            last successful ``init_tracing`` was called with; ``None``
            if not yet initialized.
        - ``endpoint`` (str | None): the effective OTLP endpoint,
            resolved from (in priority order) the explicit ``endpoint``
            kwarg to ``init_tracing`` or the
            ``OTEL_EXPORTER_OTLP_ENDPOINT`` environment variable.
            ``None`` if neither is set or if not yet initialized.
        - ``exporter_class`` (str | None): the fully-qualified class
            name of the configured span exporter (e.g.
            ``"opentelemetry.exporter.otlp.proto.grpc.trace_exporter.OTLPSpanExporter"``).
            ``None`` if not yet initialized.

    The returned dict is a shallow copy of internal state — mutating
    it does not affect future ``status()`` calls.

    TODO(cli): when a ``heddle status`` CLI subcommand is added,
    surface this dict in its output (a one-line "OTel: enabled,
    endpoint=…" summary plus a verbose mode that prints the full
    dict). See workspace ``AUDIT_TODO.md`` OTel W1.
    """
    return dict(_STATUS)

get_tracer

get_tracer(name: str = 'heddle') -> Any

Get a tracer instance (real or no-op depending on OTel availability).

Parameters:

  • name (str, default: 'heddle'): Instrumentation scope name (e.g. heddle.pipeline).

Returns:

  • Any: An OTel Tracer if SDK is available, otherwise a _NoOpTracer.
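The implementation of `_NoOpTracer` is not shown on this page, so the following is only a guess at the general shape such a fallback might take: an object that accepts the same calls as a real tracer and discards them. The class names `NoOpSpan` and `NoOpTracer` and the `handle` function are hypothetical.

```python
from contextlib import contextmanager
from typing import Any, Iterator, Optional

# Hypothetical sketch: the real _NoOpTracer is not shown on this page.

class NoOpSpan:
    """Accepts common span calls and discards them."""

    def set_attribute(self, key: str, value: Any) -> None:
        pass

    def add_event(self, name: str, attributes: Optional[dict[str, Any]] = None) -> None:
        pass


class NoOpTracer:
    """Mirrors the one tracer method that instrumented code relies on here."""

    @contextmanager
    def start_as_current_span(self, name: str, **kwargs: Any) -> Iterator[NoOpSpan]:
        yield NoOpSpan()


tracer = NoOpTracer()


def handle(message: dict[str, Any]) -> int:
    """Instrumented code looks the same whether the tracer is real or a no-op."""
    with tracer.start_as_current_span("handle", context=None) as span:
        span.set_attribute("message.size", len(message))
        return len(message)
```

The point of the pattern is that call sites never branch on whether OTel is installed; only the tracer factory does.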

Source code in src/heddle/tracing/otel.py
def get_tracer(name: str = "heddle") -> Any:
    """Get a tracer instance (real or no-op depending on OTel availability).

    Args:
        name: Instrumentation scope name (e.g. ``heddle.pipeline``).

    Returns:
        An OTel ``Tracer`` if SDK is available, otherwise a ``_NoOpTracer``.
    """
    if _HAS_OTEL:
        return _trace_mod.get_tracer(name)
    return _NOOP_TRACER

inject_trace_context

inject_trace_context(carrier: dict[str, Any]) -> None

Inject current trace context into a message dict.

Adds a _trace_context key containing W3C propagation headers. Safe to call without OTel installed (no-op).

Parameters:

  • carrier (dict[str, Any], required): Message dict (modified in-place).

Source code in src/heddle/tracing/otel.py
def inject_trace_context(carrier: dict[str, Any]) -> None:
    """Inject current trace context into a message dict.

    Adds a ``_trace_context`` key containing W3C propagation headers.
    Safe to call without OTel installed (no-op).

    Args:
        carrier: Message dict (modified in-place).
    """
    if not _HAS_OTEL:
        return
    headers: dict[str, str] = {}
    _propagate_mod.inject(headers)
    if headers:
        carrier["_trace_context"] = headers

extract_trace_context

extract_trace_context(carrier: dict[str, Any]) -> Any

Extract trace context from a message dict.

Reads the _trace_context key and returns an OTel Context that can be passed to tracer.start_as_current_span(context=...).

Parameters:

  • carrier (dict[str, Any], required): Message dict with optional _trace_context key.

Returns:

  • Any: An OTel Context if available, otherwise None.
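The cases that yield None are easy to overlook, so the sketch below isolates just the validation step and runs it on a message that has crossed the wire as JSON. `read_trace_headers` is a hypothetical stand-in that stops before the OTel propagator is called, and the `payload` field is made up.

```python
import json
from typing import Any, Optional

# Stand-in for the validation step only. The real function goes on to call the
# OTel propagator, which this sketch deliberately leaves out.

def read_trace_headers(carrier: dict[str, Any]) -> Optional[dict[str, str]]:
    """Return the propagation headers, or None if absent, empty, or malformed."""
    headers = carrier.get("_trace_context")
    if not headers or not isinstance(headers, dict):
        return None
    return headers


# Simulate a message crossing the wire as JSON (the payload field is made up).
wire = json.dumps({
    "payload": "hello",
    "_trace_context": {
        "traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
    },
})
received = json.loads(wire)
headers = read_trace_headers(received)
```

With the real function, the returned Context would be handed to `tracer.start_as_current_span(..., context=ctx)` on the consumer side so that the new span is parented on the producer's span.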

Source code in src/heddle/tracing/otel.py
def extract_trace_context(carrier: dict[str, Any]) -> Any:
    """Extract trace context from a message dict.

    Reads the ``_trace_context`` key and returns an OTel ``Context``
    that can be passed to ``tracer.start_as_current_span(context=...)``.

    Args:
        carrier: Message dict with optional ``_trace_context`` key.

    Returns:
        An OTel ``Context`` if available, otherwise ``None``.
    """
    if not _HAS_OTEL:
        return None
    headers = carrier.get("_trace_context")
    if not headers or not isinstance(headers, dict):
        return None
    return _propagate_mod.extract(headers)

trace_correlation_processor

trace_correlation_processor(logger: Any, method_name: str, event_dict: dict[str, Any]) -> dict[str, Any]

Structlog processor that tags log records with the active trace context.

When called inside a span, adds trace_id (32-char hex) and span_id (16-char hex) to the event_dict so downstream renderers and shippers can correlate logs with their span in any OTel backend. No-op when OTel is unavailable or when no span is active.

Wire into a structlog.configure(...) call before the renderer, e.g.:

    structlog.configure(processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        trace_correlation_processor,
        structlog.dev.ConsoleRenderer(),
    ])

The hex encoding matches the W3C traceparent convention used by most OTel backends and the heddle _trace_context wire field.
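The fixed-width hex encoding is worth seeing on small values, where the zero padding is obvious. In this sketch `add_trace_ids` is a hypothetical stand-in that takes the IDs as arguments, whereas the real processor reads them from the active span; the format specifiers are the ones described above.

```python
from typing import Any

# Stand-in for the formatting step. The IDs are passed in explicitly here,
# whereas the real processor reads them from the active span.

def add_trace_ids(event_dict: dict[str, Any], trace_id: int, span_id: int) -> dict[str, Any]:
    """Add zero-padded lowercase hex IDs to a structlog-style event dict."""
    event_dict["trace_id"] = format(trace_id, "032x")  # 128-bit ID, 32 hex chars
    event_dict["span_id"] = format(span_id, "016x")    # 64-bit ID, 16 hex chars
    return event_dict


event = add_trace_ids({"event": "task.started"}, trace_id=0xABC, span_id=0x1F)
```

Padding to a fixed width keeps the logged IDs string-identical to the ones in the traceparent header, so a plain text search can join logs to spans.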

Source code in src/heddle/tracing/otel.py
def trace_correlation_processor(
    logger: Any,  # noqa: ARG001 — structlog processor signature requires (logger, method, event)
    method_name: str,  # noqa: ARG001 — structlog processor signature requires (logger, method, event)
    event_dict: dict[str, Any],
) -> dict[str, Any]:
    """Structlog processor that tags log records with the active trace context.

    When called inside a span, adds ``trace_id`` (32-char hex) and
    ``span_id`` (16-char hex) to the event_dict so downstream renderers
    and shippers can correlate logs with their span in any OTel backend.
    No-op when OTel is unavailable or when no span is active.

    Wire into a ``structlog.configure(...)`` call before the renderer,
    e.g.::

        structlog.configure(processors=[
            structlog.processors.TimeStamper(fmt="iso"),
            trace_correlation_processor,
            structlog.dev.ConsoleRenderer(),
        ])

    The hex encoding matches the W3C traceparent convention used by
    most OTel backends and the heddle ``_trace_context`` wire field.
    """
    if not _HAS_OTEL:
        return event_dict
    span = _trace_mod.get_current_span()
    span_context = span.get_span_context()
    if not span_context.is_valid:
        return event_dict
    event_dict["trace_id"] = format(span_context.trace_id, "032x")
    event_dict["span_id"] = format(span_context.span_id, "016x")
    return event_dict