
Worker

Distributed broker consumer. One process subscribes to per-agent task topics, dispatches each TaskMessage through an internal in-process AgentRuntime, and publishes ResultMessage envelopes back on the agent's results topic.

from murmur.worker import Worker

See the Distributed deployments guide for the wire shape and production patterns.
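
A minimal usage sketch, assuming a connected Broker and a defined Agent are already in hand; broker and researcher below are placeholders, since their construction is backend-specific and not covered on this page.

import asyncio

from murmur.worker import Worker

broker = ...      # placeholder: a connected Broker from your backend factory
researcher = ...  # placeholder: a murmur Agent


async def main() -> None:
    worker = Worker(
        broker=broker,
        # Mapping key = agent name; the worker subscribes to that agent's
        # task topic and serves TaskMessages addressed to it.
        agents={"researcher": researcher},
    )
    await worker.start()
    await asyncio.Event().wait()  # keep consuming; see the shutdown sketch under start()


asyncio.run(main())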

Worker

Worker(
    *,
    broker: Broker,
    agents: Mapping[str, Agent],
    runtime: AgentRuntime | None = None,
    concurrency: int = 10,
    prefetch: int = 5,
    consumer_id: str | None = None,
    signing_key: bytes | tuple[bytes, ...] | None = None,
    heartbeat_seconds: float = 30.0,
    reclaim_min_idle_ms: int | None = 30000,
)

Consumer for one or more registered agents.
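
A hedged sketch of the tuning knobs in the signature above; the key bytes, the POD_NAME environment variable, and the broker/agent placeholders are illustrative assumptions rather than part of the API.

import os

from murmur.worker import Worker

broker = ...                                   # placeholder Broker
agents = {"researcher": ..., "critic": ...}    # placeholder Agents

# Hypothetical key material; load real keys from your secret store.
new_key = b"new-signing-secret"
old_key = b"old-signing-secret"

worker = Worker(
    broker=broker,
    agents=agents,
    concurrency=4,                   # at most 4 tasks executing at once
    prefetch=1,                      # claim one message per poll: fairest fan-out
    signing_key=(new_key, old_key),  # rotation: accept either key until the
                                     # queue drains, then drop old_key
    consumer_id=os.environ.get("POD_NAME"),  # stable broker-side consumer name
    heartbeat_seconds=15.0,
    reclaim_min_idle_ms=60_000,      # reclaim pending entries idle for >60 s
)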

Source code in src/murmur/worker/worker.py
def __init__(
    self,
    *,
    broker: Broker,
    agents: Mapping[str, Agent],
    runtime: AgentRuntime | None = None,
    concurrency: int = 10,
    prefetch: int = 5,
    consumer_id: str | None = None,
    signing_key: bytes | tuple[bytes, ...] | None = None,
    heartbeat_seconds: float = 30.0,
    reclaim_min_idle_ms: int | None = 30_000,
) -> None:
    if not agents:
        raise SpecValidationError("Worker requires at least one agent")
    if concurrency < 1:
        raise SpecValidationError("concurrency must be >= 1")
    if prefetch < 1:
        raise SpecValidationError("prefetch must be >= 1")
    if heartbeat_seconds < 0:
        raise SpecValidationError("heartbeat_seconds must be >= 0 (0 disables)")
    if reclaim_min_idle_ms is not None and reclaim_min_idle_ms < 0:
        raise SpecValidationError(
            "reclaim_min_idle_ms must be >= 0 (None or 0 disables)"
        )
    # Normalise signing_key to a tuple; ``None`` means "verification
    # disabled, broker is trusted" (default — unchanged behaviour).
    # A tuple supports key rotation: stamp new workers with
    # ``(new, old)``, swap publishers to ``new``, then drop ``old``
    # once the queue has drained. Validate non-empty bytes upfront
    # so a misconfiguration like ``signing_key=()`` doesn't silently
    # turn into "every signature rejected".
    self._signing_keys: tuple[bytes, ...] | None
    if signing_key is None:
        self._signing_keys = None
    elif isinstance(signing_key, bytes):
        if not signing_key:
            raise SpecValidationError("signing_key must be non-empty bytes")
        self._signing_keys = (signing_key,)
    else:
        keys = tuple(signing_key)
        if not keys:
            raise SpecValidationError(
                "signing_key tuple must contain at least one key"
            )
        for k in keys:
            if not isinstance(k, bytes) or not k:
                raise SpecValidationError(
                    "every key in signing_key must be non-empty bytes"
                )
        self._signing_keys = keys

    from murmur.events.broker import BrokerEventBridge
    from murmur.events.log import LogEventEmitter
    from murmur.events.multi import MultiEventEmitter
    from murmur.runtime import AgentRuntime as _AgentRuntime

    self._broker = broker
    self._agents: dict[str, Agent] = dict(agents)
    # NB: the worker's runtime MUST be AsyncBackend-backed. Passing a
    # broker-backed runtime here re-publishes tasks → infinite loop.
    #
    # When constructing our own runtime, we wire the distributed event
    # bridge into its emitter chain so per-agent / per-tool events
    # fire BOTH locally (to structlog via LogEventEmitter) AND, when
    # a TaskMessage carries an ``events_topic``, onto the broker for
    # the publisher to relay through its own emitter. The bridge is
    # contextvar-driven — no-op when no topic is bound — so installing
    # it has zero cost when distributed observability isn't used.
    #
    # Users supplying their own runtime keep full control of their
    # emitter chain; document the workaround (wrap their emitter in
    # a Multi with BrokerEventBridge themselves) in docstrings.
    if runtime is None:
        bridge = BrokerEventBridge(broker)
        runtime = _AgentRuntime(
            event_emitter=MultiEventEmitter([LogEventEmitter(), bridge])
        )
    self._runtime: AgentRuntime = runtime
    self._concurrency = concurrency
    self._prefetch = prefetch
    # Stable broker-side consumer name. Defaults to the worker
    # runtime's id so that operators who pin a stable ``runtime_id``
    # (the production pattern) automatically get stable Redis Stream
    # consumer names — pending entries are reclaimed on restart and
    # ``XINFO GROUPS`` consumer count stays bounded by fleet size.
    # Operators who want a different binding (e.g. a k8s pod name)
    # can override; ``None`` is forwarded as-is to the broker, which
    # then falls back to a per-subscription uuid.
    self._consumer_id: str | None = (
        consumer_id if consumer_id is not None else self._runtime.runtime_id
    )
    self._semaphore = asyncio.Semaphore(concurrency)
    self._active: dict[str, asyncio.Task[None]] = {}

    self._on_start: OnStart | None = None
    self._on_complete: OnComplete | None = None
    self._on_error: OnError | None = None
    self._started: bool = False
    self._heartbeat_seconds: float = float(heartbeat_seconds)
    self._heartbeat_task: asyncio.Task[None] | None = None
    self._reclaim_min_idle_ms: int | None = reclaim_min_idle_ms
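
For callers supplying their own runtime, the workaround mentioned in the comments above looks roughly like this; it assumes AgentRuntime's default backend is the in-process async one (matching what the constructor builds when runtime is None) and uses placeholder broker and agents.

from murmur.events.broker import BrokerEventBridge
from murmur.events.log import LogEventEmitter
from murmur.events.multi import MultiEventEmitter
from murmur.runtime import AgentRuntime
from murmur.worker import Worker

broker = ...                    # placeholder Broker
agents = {"researcher": ...}    # placeholder Agents

# Keep your own emitter, then add the bridge so per-agent / per-tool events
# are also relayed onto the broker when a TaskMessage carries an events_topic.
my_emitter = LogEventEmitter()  # stand-in for a custom emitter
runtime = AgentRuntime(
    event_emitter=MultiEventEmitter([my_emitter, BrokerEventBridge(broker)])
)

# Never pass a broker-backed runtime here: it would re-publish tasks and loop.
worker = Worker(broker=broker, agents=agents, runtime=runtime)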

start async

start() -> None

Begin consuming tasks. Subscribes the broker to each agent's task topic.

Emits a Murmur-branded startup banner (multi-line, written to stderr) plus a structured worker_started event that includes the per-agent task topics, broker scheme + URL, and concurrency. FastStream's own subscriber chatter is silenced upstream in murmur.backends._brokers.make_broker.
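
A sketch of a long-running entry point around start() and stop(), assuming a Worker built as in the earlier sketches; the signal wiring is plain asyncio (POSIX-only add_signal_handler), not something Worker itself provides.

import asyncio
import signal


async def serve(worker) -> None:
    await worker.start()          # subscribe, print the banner, emit worker_started
    shutdown = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown.set)
    try:
        await shutdown.wait()     # run until the process is told to exit
    finally:
        await worker.stop()       # drain in-flight tasks, then disconnect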

Source code in src/murmur/worker/worker.py
async def start(self) -> None:
    """Begin consuming tasks. Subscribes ``broker`` to each agent's topic.

    Emits a Murmur-branded startup banner (multi-line, written to
    stderr) plus a structured ``worker_started`` event that includes
    the per-agent task topics, broker scheme + URL, and concurrency.
    FastStream's own subscriber chatter is silenced upstream in
    :func:`murmur.backends._brokers.make_broker`.
    """
    if self._started:
        return
    await self._broker.start()
    subscriptions: dict[str, str] = {}
    for agent_name in self._agents:
        topic = task_topic(agent_name)
        # ``group=topic`` puts every Worker serving this agent into one
        # competing-consumer pool — each TaskMessage is delivered to
        # exactly one Worker rather than fanned out to all of them.
        # Without the group, Redis pub-sub / per-process Kafka groups /
        # NATS without queue groups all broadcast, which multiplies LLM
        # cost by the fleet size and produces orphan results on the
        # publisher.
        #
        # ``prefetch`` caps how many messages this subscriber claims
        # per poll. Lower values give tighter fan-out fairness across
        # the fleet at the cost of more broker round-trips; higher
        # values favour throughput. ``prefetch=1`` is the choice when
        # one worker shouldn't hog a burst.
        await self._broker.subscribe(
            topic,
            self._make_handler(agent_name),
            group=topic,
            prefetch=self._prefetch,
            consumer_id=self._consumer_id,
            reclaim_min_idle_ms=self._reclaim_min_idle_ms,
        )
        subscriptions[agent_name] = topic
    self._started = True

    broker_repr = _broker_repr(self._broker)
    runtime_id = self._runtime.runtime_id
    _print_banner(
        broker=broker_repr,
        runtime_id=runtime_id,
        subscriptions=subscriptions,
        concurrency=self._concurrency,
    )
    await log.ainfo(
        "worker_started",
        agents=list(self._agents.keys()),
        subscriptions=subscriptions,
        results_topic=result_topic(runtime_id),
        broker=broker_repr,
        concurrency=self._concurrency,
    )
    from murmur.events.types import EventType, RuntimeEvent

    broker_scheme = getattr(self._broker, "scheme", None)
    agents_list = list(self._agents.keys())
    await self._runtime.event_emitter.emit(
        RuntimeEvent(
            event_type=EventType.WORKER_STARTED,
            agent_name=runtime_id,
            trace_id=runtime_id,
            payload={
                "runtime_id": runtime_id,
                "agents": agents_list,
                "broker_scheme": broker_scheme,
                "concurrency": self._concurrency,
                "prefetch": self._prefetch,
                "consumer_id": self._consumer_id,
                "heartbeat_seconds": self._heartbeat_seconds,
            },
        )
    )
    if self._heartbeat_seconds > 0:
        self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())

stop async

stop() -> None

Drain in-flight tasks then disconnect the broker.

Source code in src/murmur/worker/worker.py
async def stop(self) -> None:
    """Drain in-flight tasks then disconnect the broker."""
    if not self._started:
        return
    from murmur.events.types import EventType, RuntimeEvent

    runtime_id = self._runtime.runtime_id
    await self._runtime.event_emitter.emit(
        RuntimeEvent(
            event_type=EventType.WORKER_STOPPED,
            agent_name=runtime_id,
            trace_id=runtime_id,
            payload={
                "runtime_id": runtime_id,
                "agents": list(self._agents.keys()),
                "broker_scheme": getattr(self._broker, "scheme", None),
            },
        )
    )
    if self._heartbeat_task is not None:
        self._heartbeat_task.cancel()
        with contextlib.suppress(asyncio.CancelledError, Exception):
            await self._heartbeat_task
        self._heartbeat_task = None
    active = list(self._active.values())
    if active:
        await asyncio.gather(*active, return_exceptions=True)
    await self._broker.stop()
    self._started = False
    await log.ainfo("worker_stopped")