Protocols

The abstract ports Murmur is wired against. Every pluggable component is a typing.Protocol here first; concretes in sibling packages match structurally — no inheritance, no registration. Tests are keyed on the Protocol so every concrete passes the same shared contract suite.

from murmur.core.protocols import (
    Backend,
    BackendStatus,
    Broker,
    ContextPasser,
    EventEmitter,
    MessageHandler,
    Middleware,
    NextStage,
    OnComplete,
    OnError,
    OnStart,
    Pipeline,
    Registry,
    RouteDecision,
    Router,
    Stage,
    ToolDescriptor,
    ToolExecutor,
    ToolProvider,
    ToolsetProvider,
    Worker,
)

A handful of Protocols carry @runtime_checkable so they can be used as Pydantic field types or in isinstance() checks — ContextPasser, EventEmitter, and ToolsetProvider today.
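
For example, a wiring-time guard (a minimal sketch; the helper itself is illustrative):

from murmur.core.protocols import EventEmitter

def as_emitter(candidate: object) -> EventEmitter:
    # A @runtime_checkable isinstance() checks method *presence* only,
    # not signatures — good enough for a wiring-time sanity check.
    if not isinstance(candidate, EventEmitter):
        raise TypeError(f"{type(candidate).__name__} does not satisfy EventEmitter")
    return candidate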

Execution

Backend

The unit of execution. Concretes: AsyncBackend and JobBackend. Both pass the shared BackendContract test suite.

Backend

Pluggable execution backend.

Implementations must be safe to call concurrently from many tasks against a single instance. State per spawn is keyed by AgentHandle.

spawn async
spawn(agent: Agent, task: TaskSpec, context: AgentContext) -> AgentHandle

Begin executing agent against task and return a handle.

Source code in src/murmur/core/protocols/backend.py
async def spawn(
    self,
    agent: Agent,
    task: TaskSpec,
    context: AgentContext,
) -> AgentHandle:
    """Begin executing ``agent`` against ``task`` and return a handle."""
    ...
status async
status(handle: AgentHandle) -> BackendStatus

Return the current execution state for handle.

Source code in src/murmur/core/protocols/backend.py
async def status(self, handle: AgentHandle) -> BackendStatus:
    """Return the current execution state for ``handle``."""
    ...
kill async
kill(handle: AgentHandle) -> None

Terminate handle early. Idempotent.

Source code in src/murmur/core/protocols/backend.py
async def kill(self, handle: AgentHandle) -> None:
    """Terminate ``handle`` early. Idempotent."""
    ...
result async
result(handle: AgentHandle) -> AgentResult[BaseModel]

Block until handle reaches a terminal state and return its result.

Source code in src/murmur/core/protocols/backend.py
async def result(self, handle: AgentHandle) -> AgentResult[BaseModel]:
    """Block until ``handle`` reaches a terminal state and return its result."""
    ...
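
Taken together, a typical drive of a Backend looks like the sketch below; the agent, task, and context values are assumed to come from elsewhere, and the timeout handling is illustrative rather than prescribed.

import asyncio

from murmur.core.protocols import Backend

async def run_to_completion(backend: Backend, agent, task, context):
    handle = await backend.spawn(agent, task, context)   # returns immediately
    print(await backend.status(handle))                  # cheap, non-blocking poll
    try:
        # result() parks until the handle reaches a terminal state.
        return await asyncio.wait_for(backend.result(handle), timeout=300)
    except asyncio.TimeoutError:
        await backend.kill(handle)   # idempotent, so racing a natural finish is safe
        raise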

BackendStatus

BackendStatus

Coarse-grained execution state for a handle.

Context passing

ContextPasser

Decides what flows into a spawn. Concretes today: FullContextPasser, NullContextPasser. SummaryContextPasser and SelectiveContextPasser are queued.

ContextPasser

Strategy for preparing context before an agent spawn.

Marked @runtime_checkable so Pydantic can validate Agent.context_passer fields via isinstance.

prepare async
prepare(context: AgentContext, task: TaskSpec) -> AgentContext

Return the context the next agent should see, given task.

Source code in src/murmur/core/protocols/context.py
async def prepare(
    self,
    context: AgentContext,
    task: TaskSpec,
) -> AgentContext:
    """Return the context the next agent should see, given ``task``."""
    ...
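
New strategies need no registration — matching is structural. A hypothetical tail-window passer (not one of the queued concretes; it assumes AgentContext is a Pydantic model with a messages list, which is not confirmed on this page):

from __future__ import annotations

class TailContextPasser:
    """Pass only the newest ``max_messages`` messages to the next agent."""

    def __init__(self, max_messages: int = 10) -> None:
        self._max = max_messages

    async def prepare(self, context: AgentContext, task: TaskSpec) -> AgentContext:
        # Assumption: AgentContext is a Pydantic model exposing ``messages``.
        recent = list(context.messages)[-self._max :]
        return context.model_copy(update={"messages": recent})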

Tools

ToolProvider

Resolves an agent's allowed tools at dispatch time. Concrete today: StaticToolProvider. RoleBasedToolProvider (role → tool-set map) and DenylistToolProvider (base set minus denied) are queued.

ToolProvider

Resolves which tools an agent may call for a particular task.

resolve
resolve(agent_name: str, requested: frozenset[str]) -> frozenset[str]

Return the subset of requested the agent is allowed to use.

Source code in src/murmur/core/protocols/tools.py
def resolve(
    self,
    agent_name: str,
    requested: frozenset[str],
) -> frozenset[str]:
    """Return the subset of ``requested`` the agent is allowed to use."""
    ...
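
Pending the queued concretes, a deny-set provider is a handful of lines. A sketch, not the planned DenylistToolProvider:

class DenySetToolProvider:
    """Allow everything requested except an explicit deny set."""

    def __init__(self, denied: frozenset[str]) -> None:
        self._denied = denied

    def resolve(self, agent_name: str, requested: frozenset[str]) -> frozenset[str]:
        # The contract: only ever narrow — return a subset of ``requested``.
        return requested - self._denied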

ToolExecutor

The runtime-side executor that gates and proxies every tool call. Concrete: murmur.tools.ToolExecutor — same name, different module (Protocol in core.protocols, concrete in tools).

ToolExecutor

Executes a tool call on behalf of an agent under policy.

execute async
execute(
    *,
    agent_name: str,
    task_id: str,
    trust_level: TrustLevel,
    allowed: frozenset[str],
    name: str,
    args: dict[str, object],
) -> object

Validate the call against policy, execute, log, and return the result.

Source code in src/murmur/core/protocols/tools.py
async def execute(
    self,
    *,
    agent_name: str,
    task_id: str,
    trust_level: TrustLevel,
    allowed: frozenset[str],
    name: str,
    args: dict[str, object],
) -> object:
    """Validate the call against policy, execute, log, and return the result."""
    ...
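
The shape of a gated call from the runtime's side (a sketch; the agent name, task id, and tool values are illustrative, and the TrustLevel import path is not shown on this page):

result = await executor.execute(
    agent_name="researcher",              # illustrative
    task_id="task-123",                   # illustrative
    trust_level=TrustLevel.LOW,
    allowed=frozenset({"web_search"}),    # the explicit allow-list
    name="web_search",
    args={"query": "structural typing"},
)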

ToolsetProvider

Dynamic, runtime-discovered tool sources — primarily MCP. Concrete: MCPToolsetProvider, constructed via the mcp_stdio / mcp_http / mcp_sse factories. Marked @runtime_checkable so Pydantic accepts it as a field type on Agent.mcp_servers.

ToolsetProvider

Pluggable source of tools managed by the runtime lifecycle.

Implementations must be safe to call concurrently after start has returned; call_tool may be invoked from many tasks at once. start and stop are idempotent — calling either twice is a no-op the runtime relies on for lazy startup and graceful shutdown.

start async
start() -> None

Open the connection / spawn the subprocess. Idempotent.

Source code in src/murmur/core/protocols/toolsets.py
async def start(self) -> None:
    """Open the connection / spawn the subprocess. Idempotent."""
    ...
stop async
stop() -> None

Close the connection / terminate the subprocess. Idempotent.

Source code in src/murmur/core/protocols/toolsets.py
async def stop(self) -> None:
    """Close the connection / terminate the subprocess. Idempotent."""
    ...
list_tools async
list_tools() -> Sequence[ToolDescriptor]

Return descriptors for every tool the provider exposes.

Must be callable after start. Implementations may cache the result; the runtime calls this once per agent build.

Source code in src/murmur/core/protocols/toolsets.py
async def list_tools(self) -> Sequence[ToolDescriptor]:
    """Return descriptors for every tool the provider exposes.

    Must be callable after :meth:`start`. Implementations may cache
    the result; the runtime calls this once per agent build.
    """
    ...
call_tool async
call_tool(name: str, args: Mapping[str, object]) -> object

Invoke name with args and return the result.

Raises murmur.core.errors.ToolExecutionError if name is not exposed by this provider, or if the underlying tool call fails. The runtime's ToolExecutor wraps every call — agents never reach this method directly.

Source code in src/murmur/core/protocols/toolsets.py
async def call_tool(self, name: str, args: Mapping[str, object]) -> object:
    """Invoke ``name`` with ``args`` and return the result.

    Raises :class:`murmur.core.errors.ToolExecutionError` if ``name``
    is not exposed by this provider, or if the underlying tool call
    fails. The runtime's :class:`ToolExecutor` wraps every call —
    agents never reach this method directly.
    """
    ...
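
Driven manually, the lifecycle reads as below; normally the runtime owns start/stop. The mcp_stdio arguments are assumptions about the factory's signature, not confirmed here.

provider = mcp_stdio(command="uvx", args=["mcp-server-fetch"])  # signature assumed

await provider.start()                    # idempotent
try:
    for tool in await provider.list_tools():
        print(tool.name, tool.read_only)
    result = await provider.call_tool("fetch", {"url": "https://example.com"})
finally:
    await provider.stop()                 # idempotent, safe after a partial start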

ToolDescriptor

ToolDescriptor

Frozen description of a single tool exposed by a ToolsetProvider.

Mirrors the JSON Schema-shaped metadata MCP servers publish. read_only is a hint, not a security boundary — trust gating uses an explicit allow-list, never this flag (see 9mt.5).

name instance-attribute
name: str

Tool name as it will appear to the agent. Must be unique within the provider.

input_schema class-attribute instance-attribute
input_schema: Mapping[str, object] = Field(default_factory=dict)

JSON Schema describing the tool's argument shape.

description class-attribute instance-attribute
description: str = ''

Human-readable description surfaced to the LLM.

read_only class-attribute instance-attribute
read_only: bool = False

Provider-declared hint that the tool does not mutate external state.

Informational only. TrustLevel.LOW does not auto-permit read-only tools — an explicit allow-list is required.
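
Built by hand (field values illustrative):

descriptor = ToolDescriptor(
    name="read_file",
    description="Read a UTF-8 file from the workspace.",
    input_schema={
        "type": "object",
        "properties": {"path": {"type": "string"}},
        "required": ["path"],
    },
    read_only=True,   # informational only; never consulted for trust gating
)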

Routing

Router

Classifies a task into a single-agent vs multi-agent path. Concrete today: AlwaysSingleRouter in murmur.routing. An LLM-based router is queued.

Router

Pluggable routing strategy.

classify async
classify(task: TaskSpec) -> RouteDecision

Decide whether task runs as single-agent or multi-agent.

Source code in src/murmur/core/protocols/router.py
async def classify(self, task: TaskSpec) -> RouteDecision:
    """Decide whether ``task`` runs as single-agent or multi-agent."""
    ...

RouteDecision

RouteDecision

Outcome of router classification.

SINGLE class-attribute instance-attribute
SINGLE = 'single'

Run a single agent against the task.

MULTI class-attribute instance-attribute
MULTI = 'multi'

Hand off to the orchestrator for fan-out + aggregation.
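
A heuristic router fits in a few lines. A sketch (it assumes TaskSpec carries a prompt string, which is not confirmed on this page):

from __future__ import annotations

from murmur.core.protocols import RouteDecision

class KeywordRouter:
    """Escalate to multi-agent when the task looks decomposable."""

    async def classify(self, task: TaskSpec) -> RouteDecision:
        # Assumption: TaskSpec exposes a ``prompt`` string.
        multi = any(k in task.prompt.lower() for k in ("compare", "survey"))
        return RouteDecision.MULTI if multi else RouteDecision.SINGLE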

Events

EventEmitter

The instrumentation sink. Concretes: LogEventEmitter, SSEEventEmitter, MultiEventEmitter, BrokerEventBridge. All pass the shared EventEmitterContract suite. Marked @runtime_checkable.

A WebSocketEventEmitter (push live events to connected dashboards) and FastStreamEventEmitter (publish events onto a configurable broker topic) are queued.

EventEmitter

Sink for typed RuntimeEvent envelopes.

Implementations must be safe to call concurrently. From the runtime's perspective emit is fire-and-forget — emitters are expected to swallow their own delivery errors (a logging sink that raises would take the agent run down with it).

emit async
emit(event: RuntimeEvent) -> None

Forward event to the underlying sink. Non-blocking.

Source code in src/murmur/core/protocols/events.py
async def emit(self, event: RuntimeEvent) -> None:
    """Forward ``event`` to the underlying sink. Non-blocking."""
    ...
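
The swallow-your-own-errors rule in practice, sketched as a webhook-style sink (the HTTP client and endpoint are illustrative, and model_dump assumes RuntimeEvent is a Pydantic model):

from __future__ import annotations

import logging

import httpx

logger = logging.getLogger(__name__)

class WebhookEventEmitter:
    """POST each event to an HTTP endpoint, swallowing delivery errors."""

    def __init__(self, url: str) -> None:
        self._url = url
        self._client = httpx.AsyncClient()

    async def emit(self, event: RuntimeEvent) -> None:
        try:
            # Assumption: RuntimeEvent is a Pydantic model with model_dump().
            await self._client.post(self._url, json=event.model_dump(mode="json"))
        except Exception:
            # Never propagate: a raising sink would take the agent run down.
            logger.exception("event delivery failed")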

Persistence

Registry

Resolves agent / group names to spec instances. Concretes: InMemoryRegistry, YamlRegistry in murmur.registry.

Registry

Pluggable spec registry.

get
get(name: str) -> Agent

Return the registered agent name or raise RegistryError.

Source code in src/murmur/core/protocols/registry.py
def get(self, name: str) -> Agent:
    """Return the registered agent ``name`` or raise ``RegistryError``."""
    ...
list
list() -> frozenset[str]

Return the set of registered agent names.

Source code in src/murmur/core/protocols/registry.py
def list(self) -> frozenset[str]:
    """Return the set of registered agent names."""
    ...
validate
validate() -> ValidationErrors

Return a list of human-readable validation errors (empty == OK).

Source code in src/murmur/core/protocols/registry.py
def validate(self) -> ValidationErrors:
    """Return a list of human-readable validation errors (empty == OK)."""
    ...
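
Startup-time usage (the agent name is illustrative):

errors = registry.validate()
if errors:
    raise SystemExit("\n".join(errors))   # fail fast before serving traffic

print(sorted(registry.list()))            # every registered agent name
agent = registry.get("researcher")        # raises RegistryError if unknown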

Run store

RunStore is documented under Runs since it lives in murmur.runs rather than murmur.core.protocols — the single exception to the Protocols-first layout, because the value types it operates on (RunStatus, RunProgress, RunEvent) live in the same package.

Pipeline

Pipeline

The composer that wires Stage and Middleware instances around the backend dispatch. Concrete: the unnamed pipeline returned by murmur.core.pipeline.build_pipeline.

Pipeline

A composed chain of stages and middleware.

run async
run(context: PipelineContext) -> T

Execute the chain end-to-end and return the final result.

Source code in src/murmur/core/protocols/pipeline.py
async def run(self, context: PipelineContext) -> T:
    """Execute the chain end-to-end and return the final result."""
    ...

Stage

A single hop in the pipeline. Receives PipelineContext and a reference to the next stage; can mutate context, transform the result, or short-circuit.

Stage

A pipeline stage. Produces or transforms a result.

Middleware

Identical shape to Stage. Distinct name to clarify intent — middleware is for cross-cutting concerns (Retry, Timeout, DepthLimit, CostTracking), whereas stages are domain operations (route, resolve context, dispatch).

Middleware

Cross-cutting wrapper around a stage. Same shape as Stage; different intent.

NextStage

NextStage module-attribute

NextStage = Callable[['PipelineContext'], Awaitable[T]]

Type of the next_stage callable a stage / middleware receives.
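
A timeout middleware is then a single method around next_stage. One caveat in the sketch below: the method name (run) and its exact parameters are assumptions; the Protocol's spelling is not shown on this page.

from __future__ import annotations

import asyncio

from murmur.core.protocols import NextStage

class TimeoutMiddleware:
    """Fail the remainder of the chain if it exceeds ``seconds``."""

    def __init__(self, seconds: float) -> None:
        self._seconds = seconds

    # Assumed method shape: (context, next_stage) -> result.
    async def run(self, context: PipelineContext, next_stage: NextStage) -> object:
        return await asyncio.wait_for(next_stage(context), timeout=self._seconds)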

Distributed

Broker

The message-bus abstraction backing JobBackend. Concretes:

  • RedisBroker — Redis Streams; first-class consumer_id, prefetch, and group support.
  • KafkaBroker — Kafka with consumer group_id.
  • NatsBroker — NATS queue groups.
  • RabbitBroker — RabbitMQ named queues (competing-consumer by default).
  • InMemoryBroker — in-process round-robin, memory:// URLs, used in tests.

Production code routes through the URL-keyed factory at AgentRuntime(broker="redis://…") and stays scheme-agnostic; the per-scheme classes are importable directly when an integration test needs the explicit type.

Broker

Minimum viable pub/sub surface.

Implementations must be safe to call concurrently. publish is fire-and-forget — it does not wait for handlers to finish — but should raise if it fails to schedule the dispatch (e.g. the broker is closed).

start async
start() -> None

Connect / open the broker. Idempotent.

Source code in src/murmur/core/protocols/broker.py
async def start(self) -> None:
    """Connect / open the broker. Idempotent."""
    ...
stop async
stop() -> None

Disconnect cleanly and drop all subscriptions. Idempotent.

Source code in src/murmur/core/protocols/broker.py
async def stop(self) -> None:
    """Disconnect cleanly and drop all subscriptions. Idempotent."""
    ...
publish async
publish(topic: str, payload: bytes) -> None

Publish payload to topic. Does not wait for delivery.

Source code in src/murmur/core/protocols/broker.py
async def publish(self, topic: str, payload: bytes) -> None:
    """Publish ``payload`` to ``topic``. Does not wait for delivery."""
    ...
subscribe async
subscribe(
    topic: str,
    handler: MessageHandler,
    *,
    group: str | None = None,
    prefetch: int | None = None,
    consumer_id: str | None = None,
    reclaim_min_idle_ms: int | None = None,
) -> None

Register handler for messages on topic.

Two delivery semantics, picked via group:

  • group=None (default) — broadcast / pub-sub. Every subscriber on the topic receives every message. Used for runtime-id-scoped reply topics (only ever one subscriber anyway) and event-stream observers that all want the same payload.
  • group=<str> — competing-consumer. All subscribers that share the same (topic, group) pair are pooled, and each published message is delivered to exactly one of them. Used by murmur.worker.Worker so multiple workers serving the same agent split the workload instead of duplicating it. Per-broker mapping: Redis Streams consumer group, Kafka consumer group_id, NATS queue group, a named RabbitMQ queue.

Multiple handlers per topic are allowed regardless of mode. Subscriptions persist until stop is called — there is no per-subscription unsubscribe in this Protocol; restart the broker if you need to fully reset.

prefetch (when not None) bounds how many messages this subscriber holds at once. Effective semantics differ by broker:

  • Redis: forwarded to StreamSub(max_records=...) — true per-poll batch cap. prefetch=1 gives the most uniform fan-out across a Worker fleet.
  • NATS: forwarded to pending_msgs_limit — in-flight backpressure cap, not per-poll batch. Bounds buffer size before the server stops pushing.
  • Kafka, RabbitMQ: currently a no-op. FastStream's Kafka DefaultSubscriber ignores max_records, and AMQP channel QoS lives on a different API than the wrapper exposes. Future change will switch Kafka to batch mode and call channel.set_qos for Rabbit.

None (default) lets the underlying broker pick.

consumer_id (when not None) names this subscriber inside its competing-consumer pool. Currently effective on Redis only — the value becomes the StreamSub.consumer name. A stable consumer_id across Worker restarts lets the consumer reclaim its own pending entry list (PEL) on the next poll cycle, and keeps XINFO GROUPS consumer count bounded by the size of the deployed fleet rather than the cumulative restart count. None falls back to a per-subscription uuid4 — safe for short-lived scripts, leaky for production. Other brokers ignore the field today (Kafka identifies via group_id + partition assignment, NATS by queue group membership, Rabbit by channel).

reclaim_min_idle_ms (when not None and group is set) enables abandoned-PEL recovery: entries that have been pending in another consumer's PEL for at least this long get reclaimed by this subscriber and dispatched through handler. Without it, a Worker that dies before XACK and is replaced by a worker with a different consumer_id strands its pending entries forever. Effective on Redis only — implemented as a sidecar subscriber that runs XAUTOCLAIM alongside the normal XREADGROUP poll, so the same handler processes both new and reclaimed entries; both subscribers share the configured consumer_id so reclaimed ownership is durable across the live worker's restarts. Other brokers ignore the field. Sensible default for production: 30_000 (30 seconds).

Source code in src/murmur/core/protocols/broker.py
async def subscribe(
    self,
    topic: str,
    handler: MessageHandler,
    *,
    group: str | None = None,
    prefetch: int | None = None,
    consumer_id: str | None = None,
    reclaim_min_idle_ms: int | None = None,
) -> None:
    """Register ``handler`` for messages on ``topic``.

    Two delivery semantics, picked via ``group``:

    - ``group=None`` (default) — **broadcast / pub-sub**. Every subscriber
      on the topic receives every message. Used for runtime-id-scoped
      reply topics (only ever one subscriber anyway) and event-stream
      observers that all want the same payload.
    - ``group=<str>`` — **competing-consumer**. All subscribers that
      share the same ``(topic, group)`` pair are pooled, and each
      published message is delivered to exactly one of them. Used by
      :class:`murmur.worker.Worker` so multiple workers serving the same
      agent split the workload instead of duplicating it. Per-broker
      mapping: Redis Streams consumer group, Kafka consumer ``group_id``,
      NATS queue group, a named RabbitMQ queue.

    Multiple handlers per topic are allowed regardless of mode.
    Subscriptions persist until :meth:`stop` is called — there is no
    per-subscription unsubscribe in this Protocol; restart the broker
    if you need to fully reset.

    ``prefetch`` (when not ``None``) bounds how many messages this
    subscriber holds at once. **Effective semantics differ by broker:**

    - Redis: forwarded to ``StreamSub(max_records=...)`` — true
      per-poll batch cap. ``prefetch=1`` gives the most uniform
      fan-out across a Worker fleet.
    - NATS: forwarded to ``pending_msgs_limit`` — in-flight backpressure
      cap, not per-poll batch. Bounds buffer size before the server
      stops pushing.
    - Kafka, RabbitMQ: currently a no-op. FastStream's Kafka
      ``DefaultSubscriber`` ignores ``max_records``, and AMQP channel
      QoS lives on a different API than the wrapper exposes. Future
      change will switch Kafka to batch mode and call
      ``channel.set_qos`` for Rabbit.

    ``None`` (default) lets the underlying broker pick.

    ``consumer_id`` (when not ``None``) names this subscriber inside
    its competing-consumer pool. Currently effective on Redis only —
    the value becomes the ``StreamSub.consumer`` name. A **stable**
    ``consumer_id`` across Worker restarts lets the consumer reclaim
    its own pending entry list (PEL) on the next poll cycle, and
    keeps ``XINFO GROUPS`` consumer count bounded by the size of the
    deployed fleet rather than the cumulative restart count. ``None``
    falls back to a per-subscription ``uuid4`` — safe for short-lived
    scripts, leaky for production. Other brokers ignore the field
    today (Kafka identifies via ``group_id`` + partition assignment,
    NATS by queue group membership, Rabbit by channel).

    ``reclaim_min_idle_ms`` (when not ``None`` and ``group`` is set)
    enables **abandoned-PEL recovery**: entries that have been pending
    in another consumer's PEL for at least this long get reclaimed by
    this subscriber and dispatched through ``handler``. Without it, a
    Worker that dies before ``XACK`` and is replaced by a worker with
    a different ``consumer_id`` strands its pending entries forever.
    Effective on Redis only — implemented as a sidecar subscriber that
    runs ``XAUTOCLAIM`` alongside the normal ``XREADGROUP`` poll, so
    the same handler processes both new and reclaimed entries; both
    subscribers share the configured ``consumer_id`` so reclaimed
    ownership is durable across the live worker's restarts. Other
    brokers ignore the field. Sensible default for production:
    ``30_000`` (30 seconds).
    """
    ...
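
Combining the knobs described above into a production-shaped Redis subscription (topic, group, and consumer names are illustrative):

async def handle(payload: bytes) -> None:
    ...  # decode and dispatch the task

await broker.start()
await broker.subscribe(
    "tasks.researcher",              # illustrative topic
    handle,
    group="researcher-workers",      # competing-consumer pool
    prefetch=1,                      # most uniform fan-out on Redis
    consumer_id="researcher-0",      # stable across restarts keeps the PEL durable
    reclaim_min_idle_ms=30_000,      # recover entries abandoned before XACK
)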

MessageHandler

MessageHandler module-attribute

MessageHandler: TypeAlias = Callable[[bytes], Awaitable[None]]

Callback fired for each message that arrives on a subscribed topic.

Worker

The distributed consumer. Concrete: murmur.worker.Worker.

Worker

Distributed consumer Protocol.

start async
start() -> None

Begin consuming tasks. Returns when stop is called.

Source code in src/murmur/core/protocols/worker.py
async def start(self) -> None:
    """Begin consuming tasks. Returns when ``stop`` is called."""
    ...
stop async
stop() -> None

Drain in-flight tasks and stop consuming.

Source code in src/murmur/core/protocols/worker.py
async def stop(self) -> None:
    """Drain in-flight tasks and stop consuming."""
    ...
on_task_start
on_task_start(fn: OnStart) -> OnStart

Register a coroutine fired when a task starts. Returns fn unchanged.

Source code in src/murmur/core/protocols/worker.py
def on_task_start(self, fn: OnStart) -> OnStart:
    """Register a coroutine fired when a task starts. Returns ``fn`` unchanged."""
    ...
on_task_complete
on_task_complete(fn: OnComplete) -> OnComplete

Register a coroutine fired when a task completes.

Source code in src/murmur/core/protocols/worker.py
def on_task_complete(self, fn: OnComplete) -> OnComplete:
    """Register a coroutine fired when a task completes."""
    ...
on_task_error
on_task_error(fn: OnError) -> OnError

Register a coroutine fired when a task fails.

Source code in src/murmur/core/protocols/worker.py
def on_task_error(self, fn: OnError) -> OnError:
    """Register a coroutine fired when a task fails."""
    ...

Worker hooks

Type aliases for the lifecycle callbacks attached via @worker.on_task_start etc.

OnStart module-attribute

OnStart = Callable[[str, str], Awaitable[None]]

async def fn(task_id: str, agent_name: str) -> None

OnComplete module-attribute

OnComplete = Callable[[str, str, int], Awaitable[None]]

async def fn(task_id: str, agent_name: str, duration_ms: int) -> None

OnError module-attribute

OnError = Callable[[str, str, BaseException], Awaitable[None]]

async def fn(task_id: str, agent_name: str, error: BaseException) -> None
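
Registered via the decorator form named above (worker construction omitted):

@worker.on_task_start
async def announce(task_id: str, agent_name: str) -> None:
    print(f"{agent_name} started {task_id}")

@worker.on_task_complete
async def record(task_id: str, agent_name: str, duration_ms: int) -> None:
    print(f"{agent_name} finished {task_id} in {duration_ms}ms")

@worker.on_task_error
async def alert(task_id: str, agent_name: str, error: BaseException) -> None:
    print(f"{agent_name} failed {task_id}: {error!r}")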