
Scheduler

scheduler

Scheduler actor — time-driven dispatch of goals and tasks.

The scheduler is a long-lived actor that reads a YAML config defining cron expressions and fixed-interval timers. When a timer fires, it publishes either an OrchestratorGoal or a TaskMessage to the appropriate NATS subject.

Design
  • Extends BaseActor (long-lived, not TaskWorker)
  • Overrides run() to launch a background timer loop and the control listener (for config hot-reload) alongside the standard message subscription
  • handle_message() is a minimal no-op (satisfies the ABC)
  • Uses croniter for cron parsing, asyncio.sleep for intervals
  • Graceful shutdown clears the _running flag and cancels the timer-loop task
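The design list names asyncio.sleep for fixed intervals but does not say how the next fire time is derived. One drift-free option is to step forward from the previous scheduled time rather than from the wall clock. This is a minimal sketch under that assumption, not the scheduler's own code: `next_interval_fire` is a hypothetical helper, and timestamps are assumed to be Unix seconds (the `ScheduleEntry.next_fire: float` field suggests this). Cron entries would get their next time from croniter instead, which is not shown.

```python
import math


def next_interval_fire(last_fire: float, interval_seconds: float, now: float) -> float:
    """Return the first fire time strictly after ``now`` (hypothetical helper).

    Assumes timestamps are Unix seconds. Steps forward from the previous
    scheduled time instead of from ``now`` so the schedule does not drift,
    and skips any whole periods that were missed.
    """
    if interval_seconds <= 0:
        raise ValueError("interval_seconds must be positive")
    if now < last_fire:
        # The previous fire time is still in the future; keep it.
        return last_fire
    periods = math.floor((now - last_fire) / interval_seconds) + 1
    return last_fire + periods * interval_seconds


# A 30 s entry scheduled for t=100 that is checked late, at t=165,
# next fires at t=190 rather than t=195.
print(next_interval_fire(100.0, 30.0, 165.0))
```

Rescheduling from the current time (`now + interval_seconds`) is simpler, but every late tick then pushes the whole schedule back a little.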

NATS subjects:

  • Subscribes to: heddle.scheduler.{name} (health checks / future control)
  • Publishes to: heddle.goals.incoming (for dispatch_type "goal") or heddle.tasks.incoming (for dispatch_type "task")
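The mapping from dispatch_type to outgoing subject fits in a small lookup. In this sketch plain dicts stand in for the real OrchestratorGoal and TaskMessage objects, and `route` and `scheduler_subject` are hypothetical helper names.

```python
from __future__ import annotations

from typing import Any

# Subjects as listed above.
GOAL_SUBJECT = "heddle.goals.incoming"
TASK_SUBJECT = "heddle.tasks.incoming"


def scheduler_subject(name: str) -> str:
    """Subject the scheduler itself listens on (health checks / control)."""
    return f"heddle.scheduler.{name}"


def route(
    dispatch_type: str,
    goal_config: dict[str, Any] | None,
    task_config: dict[str, Any] | None,
) -> tuple[str, dict[str, Any]]:
    """Pick the outgoing subject and payload for one firing (hypothetical helper).

    Plain dicts stand in for the real OrchestratorGoal / TaskMessage objects.
    """
    if dispatch_type == "goal":
        return GOAL_SUBJECT, dict(goal_config or {})
    if dispatch_type == "task":
        return TASK_SUBJECT, dict(task_config or {})
    raise ValueError(f"unknown dispatch_type: {dispatch_type!r}")
```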

ScheduleEntry dataclass

ScheduleEntry(name: str, cron: str | None, interval_seconds: float | None, dispatch_type: str, goal_config: dict[str, Any] | None = None, task_config: dict[str, Any] | None = None, expand_from: str | None = None, next_fire: float = 0.0)

Parsed schedule entry from YAML config.
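A YAML file loaded with yaml.safe_load yields plain dicts, and per `__init__` below the scheduler reads them from a top-level `schedules` key. The sketch below turns such dicts into entries. The validation rules (exactly one of `cron` or `interval_seconds`; `dispatch_type` limited to `"goal"` or `"task"`) and the keys inside `goal_config` / `task_config` are assumptions for illustration, and `parse_schedules` is a hypothetical stand-in for the actor's `_parse_schedules`.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class ScheduleEntry:
    # Field list mirrors the signature documented above.
    name: str
    cron: str | None
    interval_seconds: float | None
    dispatch_type: str
    goal_config: dict[str, Any] | None = None
    task_config: dict[str, Any] | None = None
    expand_from: str | None = None
    next_fire: float = 0.0


def parse_schedules(raw: list[dict[str, Any]]) -> list[ScheduleEntry]:
    """Hypothetical parser: validate each config item and build an entry."""
    entries = []
    for item in raw:
        cron = item.get("cron")
        interval = item.get("interval_seconds")
        # Assumed rule: an entry is either cron-driven or interval-driven.
        if (cron is None) == (interval is None):
            raise ValueError(f"{item.get('name')!r}: set exactly one of cron / interval_seconds")
        dispatch_type = item["dispatch_type"]
        if dispatch_type not in ("goal", "task"):
            raise ValueError(f"{item['name']!r}: bad dispatch_type {dispatch_type!r}")
        entries.append(
            ScheduleEntry(
                name=item["name"],
                cron=cron,
                interval_seconds=float(interval) if interval is not None else None,
                dispatch_type=dispatch_type,
                goal_config=item.get("goal_config"),
                task_config=item.get("task_config"),
                expand_from=item.get("expand_from"),
            )
        )
    return entries


# What yaml.safe_load might return for a two-entry config.
config = {
    "schedules": [
        {
            "name": "hourly-digest",
            "cron": "0 * * * *",
            "dispatch_type": "goal",
            "goal_config": {"instruction": "summarize the last hour"},  # hypothetical keys
        },
        {
            "name": "heartbeat",
            "interval_seconds": 60,
            "dispatch_type": "task",
            "task_config": {"worker_type": "health_probe"},  # hypothetical keys
        },
    ]
}
entries = parse_schedules(config["schedules"])
```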

If expand_from is set, the scheduler calls the referenced function before each fire. The function must return a list of context dicts; one goal or task is dispatched per dict, with the dict merged into the payload (for tasks) or the context (for goals). This enables per-session dispatch: for example, an expansion function can query for active sessions and return [{"session_id": "s1"}, {"session_id": "s2"}], producing one dispatch per session.
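The expansion step amounts to a fan-out with a per-dispatch merge. This sketch assumes the `expand_from` string has already been resolved to a callable (how the scheduler resolves it is not documented here); `fan_out` and `active_sessions` are hypothetical names, while the `payload` / `context` merge targets follow the description above.

```python
from __future__ import annotations

import copy
from typing import Any, Callable


def fan_out(
    dispatch_type: str,
    base_config: dict[str, Any],
    expand: Callable[[], list[dict[str, Any]]] | None,
) -> list[dict[str, Any]]:
    """Return one dispatch body per expansion context (hypothetical helper).

    Tasks get the context merged into ``payload``; goals get it merged
    into ``context``. An empty expansion list yields no dispatches.
    """
    if expand is None:
        return [copy.deepcopy(base_config)]
    key = "payload" if dispatch_type == "task" else "context"
    bodies = []
    for ctx in expand():
        # Deep-copy so each dispatch owns its own nested dicts.
        body = copy.deepcopy(base_config)
        body[key] = {**(body.get(key) or {}), **ctx}
        bodies.append(body)
    return bodies


def active_sessions() -> list[dict[str, Any]]:
    # Stand-in for a real query against a session store.
    return [{"session_id": "s1"}, {"session_id": "s2"}]


bodies = fan_out("task", {"worker_type": "summarizer", "payload": {"mode": "brief"}}, active_sessions)
```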

SchedulerActor

SchedulerActor(actor_id: str, config_path: str, nats_url: str = 'nats://nats:4222', *, bus: MessageBus | None = None)

Bases: BaseActor

Time-driven actor that dispatches goals and tasks on schedule.

All schedules are defined at startup via YAML config. The actor maintains a background timer loop that checks schedules every second and fires due entries by publishing to the appropriate NATS subject.
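A once-per-second check like this splits naturally into a pure "which entries are due?" function and a thin async loop. The sketch handles interval entries only, reschedules relative to the current tick, and uses hypothetical names (`Entry`, `due_entries`, `timer_loop`); publishing is abstracted into a `fire` callback instead of a real NATS call.

```python
from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class Entry:
    # Trimmed stand-in for ScheduleEntry: interval entries only.
    name: str
    interval_seconds: float
    next_fire: float = 0.0


def due_entries(entries: list[Entry], now: float) -> list[Entry]:
    """Return entries whose next_fire has passed and reschedule them."""
    due = [e for e in entries if e.next_fire <= now]
    for e in due:
        # Simple rescheduling relative to the current tick.
        e.next_fire = now + e.interval_seconds
    return due


async def timer_loop(
    entries: list[Entry],
    fire: Callable[[Entry], Awaitable[None]],
    is_running: Callable[[], bool],
    tick: float = 1.0,
) -> None:
    """Check schedules once per tick until is_running() returns False."""
    while is_running():
        for entry in due_entries(entries, time.time()):
            await fire(entry)
        await asyncio.sleep(tick)
```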

Source code in src/heddle/scheduler/scheduler.py
def __init__(
    self,
    actor_id: str,
    config_path: str,
    nats_url: str = "nats://nats:4222",
    *,
    bus: MessageBus | None = None,
) -> None:
    super().__init__(actor_id, nats_url, bus=bus)
    # Stored so on_reload() can re-read from the same file.
    self._config_path = config_path
    self.config = self._load_config(config_path)
    self._schedules: list[ScheduleEntry] = self._parse_schedules(
        self.config.get("schedules", [])
    )
    self._timer_task: asyncio.Task | None = None

run async

run(subject: str, queue_group: str | None = None) -> None

Start the scheduler with background timer loop and subscription.

Parallels BaseActor.run, but adds a background timer loop alongside the subscription loop and the control listener. Earlier revisions omitted the control listener, which silently disabled config hot-reload because heddle.control.reload was never observed; the listener is now started exactly as in BaseActor.run, so on_reload() is triggered by reload broadcasts.

Source code in src/heddle/scheduler/scheduler.py
async def run(self, subject: str, queue_group: str | None = None) -> None:
    """Start the scheduler with background timer loop and subscription.

    Parallels :meth:`BaseActor.run` but adds a background timer loop
    alongside the subscription loop and the control listener.  Earlier
    revisions omitted the control listener, which silently disabled
    config hot-reload (``heddle.control.reload`` was never observed)
    — the listener is now started exactly like BaseActor's version
    so ``on_reload()`` triggers from broadcasts.
    """
    self._shutdown_event = asyncio.Event()
    self._semaphore = asyncio.Semaphore(self.max_concurrent)

    await self.connect()
    await self.subscribe(subject, queue_group)
    self._running = True
    self._install_signal_handlers()

    self._initialize_fire_times()

    logger.info(
        "scheduler.running",
        actor_id=self.actor_id,
        subject=subject,
        schedule_count=len(self._schedules),
    )

    # Launch background timer loop and control listener.
    self._timer_task = asyncio.create_task(self._timer_loop())
    control_task = asyncio.create_task(self._run_control_listener())

    try:
        # Standard subscription loop (mirrors BaseActor.run)
        async for data in self._sub:
            if not self._running:
                break
            if self.max_concurrent == 1:
                await self._process_one(data)
            else:
                task = asyncio.create_task(self._process_one(data))
                self._background_tasks.add(task)
                task.add_done_callback(self._background_tasks.discard)
    except asyncio.CancelledError:
        pass
    finally:
        self._running = False
        if self._timer_task and not self._timer_task.done():
            self._timer_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._timer_task
        control_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await control_task
        await self.disconnect()
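The structure of run() (a foreground consume loop, two sibling background tasks, and cancel-then-await cleanup in `finally`) can be reproduced with stdlib asyncio alone. In this sketch an `asyncio.Queue` stands in for the NATS subscription and a `None` sentinel stands in for `_running` being cleared; `run_with_sidecars` and `demo` are hypothetical names.

```python
import asyncio
import contextlib


async def run_with_sidecars(messages: asyncio.Queue, log: list) -> None:
    """Consume messages while two background loops run; clean up on exit."""

    async def timer_loop() -> None:
        while True:
            log.append("tick")
            await asyncio.sleep(0.01)

    async def control_listener() -> None:
        await asyncio.Event().wait()  # idle until cancelled

    timer_task = asyncio.create_task(timer_loop())
    control_task = asyncio.create_task(control_listener())
    try:
        while True:
            item = await messages.get()
            if item is None:  # sentinel plays the role of _running = False
                break
            log.append(f"msg:{item}")
    finally:
        # Same cleanup shape as run(): cancel each task, then await it so
        # its CancelledError is consumed here rather than left unretrieved.
        for task in (timer_task, control_task):
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    log: list = []
    runner = asyncio.create_task(run_with_sidecars(queue, log))
    await queue.put("a")
    await asyncio.sleep(0.05)  # let the timer loop tick a few times
    await queue.put(None)
    await runner
    return log
```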

on_reload async

on_reload() -> None

Re-read the YAML config and rebuild schedules.

Fire times are reset because the schedule list itself may have changed. The new list replaces _schedules in a single assignment, and _initialize_fire_times() is called immediately afterwards, so newly added entries get valid next_fire values without waiting for the next run(). A _timer_loop iteration already in flight finishes against the old list.

Source code in src/heddle/scheduler/scheduler.py
async def on_reload(self) -> None:
    """Re-read the YAML config and rebuild schedules.

    Fire times are reset because the schedule list itself may have
    changed.  The new list replaces ``_schedules`` in a single
    assignment and ``_initialize_fire_times()`` runs immediately
    afterwards, so newly added entries get valid ``next_fire`` values.
    A ``_timer_loop`` iteration already in flight finishes against the
    old list.
    """
    new_config = self._load_config(self._config_path)
    new_schedules = self._parse_schedules(new_config.get("schedules", []))
    self.config = new_config
    # Atomic swap: any timer-loop iteration in flight either sees
    # the old list (and finishes) or the new list (and starts using
    # it).  We re-initialize fire times immediately so the timer
    # loop has valid ``next_fire`` values for any newly-added entries.
    self._schedules = new_schedules
    self._initialize_fire_times()
    logger.info(
        "scheduler.config_reloaded",
        config_path=self._config_path,
        schedule_count=len(self._schedules),
    )
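The "atomic swap" in on_reload() relies on asyncio running one coroutine at a time: rebinding `self._schedules` involves no `await`, so no other coroutine can observe a half-built list, and a `for` loop that has already started keeps iterating over the list object it captured. The toy class below demonstrates both properties; its names are hypothetical, and plain schedule names stand in for ScheduleEntry objects.

```python
import asyncio


class ReloadableSchedules:
    """Hypothetical holder showing the swap-then-reinitialize pattern."""

    def __init__(self, names: list) -> None:
        self._schedules = list(names)
        self.next_fire: dict = {}
        self._initialize_fire_times(now=0.0)

    def _initialize_fire_times(self, now: float) -> None:
        # Every entry in the current list gets a fresh fire time.
        self.next_fire = {name: now for name in self._schedules}

    def reload(self, new_names: list, now: float) -> None:
        # Build the new list first, then rebind in one assignment.
        # No await happens in between, so no coroutine sees a partial list.
        new_schedules = list(new_names)
        self._schedules = new_schedules
        self._initialize_fire_times(now)

    async def one_pass(self, seen: list) -> None:
        # The for statement captures the list object once, so a pass that
        # started before reload() finishes over the old list.
        for name in self._schedules:
            seen.append(name)
            await asyncio.sleep(0)


async def demo():
    s = ReloadableSchedules(["a", "b"])
    seen: list = []
    task = asyncio.create_task(s.one_pass(seen))
    await asyncio.sleep(0)  # let the pass start and yield after "a"
    s.reload(["c"], now=10.0)
    await task
    return s, seen
```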

handle_message async

handle_message(data: dict[str, Any]) -> None

No-op message handler. The scheduler is timer-driven.

Satisfies BaseActor's abstract requirement. A future enhancement could respond to health-check or status queries here.

Source code in src/heddle/scheduler/scheduler.py
async def handle_message(self, data: dict[str, Any]) -> None:
    """No-op message handler.  The scheduler is timer-driven.

    Satisfies BaseActor's abstract requirement.  A future enhancement
    could respond to health-check or status queries here.
    """
    logger.debug(
        "scheduler.message_received",
        actor_id=self.actor_id,
        keys=list(data.keys()),
    )
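The docstring above leaves status replies as a future enhancement. If one were added, handle_message() might build a small summary such as the one below. Everything here is hypothetical: the function name, the reply fields, and the input shape (schedule names paired with next_fire timestamps). How the reply would be published is not shown.

```python
from __future__ import annotations

import time
from typing import Any


def build_status_reply(
    actor_id: str,
    schedules: list[tuple[str, float]],
    now: float | None = None,
) -> dict[str, Any]:
    """Hypothetical status payload for a health-check request."""
    now = time.time() if now is None else now
    return {
        "actor_id": actor_id,
        "schedule_count": len(schedules),
        # Seconds until each entry fires; overdue entries report 0.
        "seconds_until_fire": {
            name: round(max(0.0, fire_at - now), 3)
            for name, fire_at in schedules
        },
    }
```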