Protocols¶
The abstract ports Murmur is wired against. Every pluggable component
is a typing.Protocol here first; concretes in sibling packages match
structurally — no inheritance, no registration. Tests are keyed on
the Protocol so every concrete passes the same shared contract suite.
```python
from murmur.core.protocols import (
    Backend,
    BackendStatus,
    Broker,
    ContextPasser,
    EventEmitter,
    MessageHandler,
    Middleware,
    NextStage,
    OnComplete,
    OnError,
    OnStart,
    Pipeline,
    Registry,
    RouteDecision,
    Router,
    Stage,
    ToolDescriptor,
    ToolExecutor,
    ToolProvider,
    ToolsetProvider,
    Worker,
)
```
A handful of Protocols carry @runtime_checkable so they can be used as
Pydantic field types or in isinstance() checks — ContextPasser,
EventEmitter, and ToolsetProvider today.
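For instance, a minimal sketch (import path as in the block above): any object with a matching async emit method satisfies EventEmitter structurally, no inheritance required.

```python
from murmur.core.protocols import EventEmitter

class NullEmitter:
    """No base class, no registration: the method shape alone matches."""

    async def emit(self, event) -> None:
        pass

# @runtime_checkable permits the structural check at runtime;
# it verifies method presence only, not signatures.
assert isinstance(NullEmitter(), EventEmitter)
```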
Execution¶
Backend¶
The unit of execution. Concretes: AsyncBackend and JobBackend.
Both pass the shared BackendContract test suite.
Backend¶
Pluggable execution backend.
Implementations must be safe to call concurrently from many tasks against
a single instance. State per spawn is keyed by AgentHandle.
spawn async¶
spawn(agent: Agent, task: TaskSpec, context: AgentContext) -> AgentHandle
status async¶
status(handle: AgentHandle) -> BackendStatus
kill async¶
kill(handle: AgentHandle) -> None
result async¶
result(handle: AgentHandle) -> AgentResult[BaseModel]
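For orientation, a minimal in-process sketch of the four-method surface. The asyncio.Task bookkeeping and the BackendStatus member names are assumptions for illustration, not how AsyncBackend is written:

```python
import asyncio
from murmur.core.protocols import BackendStatus

class LocalBackend:
    """Sketch of a Backend: one asyncio.Task per spawn, keyed by its handle."""

    def __init__(self) -> None:
        # Per-spawn state lives in this map, so concurrent calls against a
        # single instance never contend over shared mutable state.
        self._tasks: dict[object, asyncio.Task] = {}

    async def spawn(self, agent, task, context):
        handle = object()  # stand-in for a real AgentHandle
        self._tasks[handle] = asyncio.create_task(self._run(agent, task, context))
        return handle

    async def status(self, handle):
        # RUNNING / DONE are assumed member names, not BackendStatus's real values.
        return BackendStatus.DONE if self._tasks[handle].done() else BackendStatus.RUNNING

    async def kill(self, handle) -> None:
        self._tasks[handle].cancel()

    async def result(self, handle):
        return await self._tasks[handle]  # an AgentResult in the real contract

    async def _run(self, agent, task, context):
        ...  # the actual agent loop is elided
```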
BackendStatus¶
BackendStatus¶
Coarse-grained execution state for a handle.
Context passing¶
ContextPasser¶
Decides what flows into a spawn. Concretes today: FullContextPasser and
NullContextPasser. SummaryContextPasser and SelectiveContextPasser are queued.
ContextPasser¶
Strategy for preparing context before an agent spawn.
Marked @runtime_checkable so Pydantic can validate Agent.context_passer
fields via isinstance.
prepare async¶
prepare(context: AgentContext, task: TaskSpec) -> AgentContext
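The identity strategy is a one-liner. A sketch in the spirit of FullContextPasser (not its actual source):

```python
class PassthroughContextPasser:
    """Hand the child agent the parent's context unchanged."""

    async def prepare(self, context, task):
        return context  # NullContextPasser presumably builds an empty context instead
```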
Tools¶
ToolProvider¶
Resolves an agent's allowed tools at dispatch time. Concrete today:
StaticToolProvider. RoleBasedToolProvider (role → tool-set map) and
DenylistToolProvider (base set minus denied) are queued.
ToolExecutor¶
The runtime-side executor that gates and proxies every tool call.
Concrete: murmur.tools.ToolExecutor — same name, different module
(Protocol in core.protocols, concrete in tools).
ToolExecutor¶
Executes a tool call on behalf of an agent under policy.
execute async¶
execute(
    *,
    agent_name: str,
    task_id: str,
    trust_level: TrustLevel,
    allowed: frozenset[str],
    name: str,
    args: dict[str, object],
) -> object
Validate the call against policy, execute, log, and return the result.
Source code in src/murmur/core/protocols/tools.py
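The gate is the interesting part. A sketch of the policy check, with PermissionError standing in for the library's own error type and a name-to-provider lookup assumed:

```python
class AllowListExecutor:
    """Sketch: refuse anything outside the allow-list before dispatching."""

    def __init__(self, lookup) -> None:
        self._lookup = lookup  # name -> ToolsetProvider; an assumed shape

    async def execute(self, *, agent_name, task_id, trust_level, allowed, name, args):
        if name not in allowed:
            # The shipped executor logs the denial and raises its own error type.
            raise PermissionError(f"{agent_name} may not call {name!r}")
        provider = self._lookup(name)
        return await provider.call_tool(name, args)  # call_tool signature assumed
```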
ToolsetProvider¶
Dynamic, runtime-discovered tool sources — primarily MCP. Concrete:
MCPToolsetProvider, constructed via the mcp_stdio / mcp_http / mcp_sse
factories. Marked @runtime_checkable so Pydantic accepts it as a
field type on Agent.mcp_servers.
ToolsetProvider¶
Pluggable source of tools managed by the runtime lifecycle.
Implementations must be safe to call concurrently after start has
returned; call_tool may be invoked from many tasks at once.
start and stop are idempotent — calling either twice is a no-op the
runtime relies on for lazy startup and graceful shutdown.
start async¶
stop async¶
list_tools async¶
list_tools() -> Sequence[ToolDescriptor]
Return descriptors for every tool the provider exposes.
Must be callable after start. Implementations may cache the result;
the runtime calls this once per agent build.
Source code in src/murmur/core/protocols/toolsets.py
call_tool async¶
Invoke name with args and return the result.
Raises murmur.core.errors.ToolExecutionError if name is not exposed by
this provider, or if the underlying tool call fails. The runtime's
ToolExecutor wraps every call — agents never reach this method directly.
Source code in src/murmur/core/protocols/toolsets.py
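To make the lifecycle contract concrete, a static in-memory sketch. The call_tool signature and the dict-of-callables shape are assumptions, not MCPToolsetProvider's source:

```python
from collections.abc import Sequence

class StaticToolset:
    """Sketch of a ToolsetProvider backed by a plain dict of async callables."""

    def __init__(self, tools, descriptors) -> None:
        self._tools = tools              # name -> async callable (assumed shape)
        self._descriptors = descriptors  # pre-built ToolDescriptor instances
        self._started = False

    async def start(self) -> None:
        self._started = True             # calling twice is a harmless no-op

    async def stop(self) -> None:
        self._started = False            # idempotent by construction

    async def list_tools(self) -> Sequence:
        return self._descriptors         # cacheable; called once per agent build

    async def call_tool(self, name, args):
        if name not in self._tools:
            raise KeyError(name)         # the real contract raises ToolExecutionError
        return await self._tools[name](**args)
```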
ToolDescriptor¶
ToolDescriptor¶
Frozen description of a single tool exposed by a ToolsetProvider.
Mirrors the JSON Schema-shaped metadata MCP servers publish. Read-only
is a hint, not a security boundary — trust gating uses an explicit
allow-list, never this flag (see 9mt.5).
name instance-attribute¶
Tool name as it will appear to the agent. Must be unique within the provider.
input_schema class-attribute instance-attribute¶
JSON Schema describing the tool's argument shape.
description class-attribute instance-attribute¶
Human-readable description surfaced to the LLM.
read_only class-attribute instance-attribute¶
Provider-declared hint that the tool does not mutate external state.
Informational only. TrustLevel.LOW does not auto-permit read-only
tools — an explicit allow-list is required.
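Construction is plain data. A sketch of an MCP-shaped descriptor; the keyword names mirror the attributes above, though keyword construction itself is an assumption:

```python
from murmur.core.protocols import ToolDescriptor

descriptor = ToolDescriptor(
    name="read_file",  # must be unique within the provider
    description="Return the contents of a file under the workspace root.",
    input_schema={
        "type": "object",
        "properties": {"path": {"type": "string"}},
        "required": ["path"],
    },
    read_only=True,  # a hint only, never a substitute for the allow-list
)
```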
Routing¶
Router¶
Classifies a task into a single-agent vs multi-agent path. Concrete today:
AlwaysSingleRouter in murmur.routing. An LLM-based router is queued.
Router¶
Pluggable routing strategy.
classify async¶
classify(task: TaskSpec) -> RouteDecision
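A trivial strategy in the spirit of AlwaysSingleRouter. Because RouteDecision's fields are not spelled out on this page, the sketch takes a pre-built decision rather than constructing one:

```python
class FixedRouter:
    """Sketch: answer every classify call with the same RouteDecision."""

    def __init__(self, decision) -> None:
        self._decision = decision  # a RouteDecision built elsewhere

    async def classify(self, task):
        return self._decision  # the task is ignored; real routers inspect it
```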
RouteDecision¶
RouteDecision¶
Events¶
EventEmitter¶
The instrumentation sink. Concretes: LogEventEmitter, SSEEventEmitter,
MultiEventEmitter, BrokerEventBridge. All pass the shared
EventEmitterContract suite. Marked @runtime_checkable.
A WebSocketEventEmitter (push live events to connected dashboards) and a
FastStreamEventEmitter (publish events onto a configurable broker topic)
are queued.
EventEmitter¶
Sink for typed RuntimeEvent envelopes.
Implementations must be safe to call concurrently. emit is
fire-and-forget from the runtime's perspective — emitters are expected
to swallow their own delivery errors (a logging sink that raises would
take the agent run down with it).
emit async¶
emit(event: RuntimeEvent) -> None
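The swallow-your-own-errors rule made concrete: a sketch of a logging sink, not the shipped LogEventEmitter:

```python
import logging

logger = logging.getLogger("murmur.events")

class SafeLogEmitter:
    """Sketch: deliver to the log; never let a delivery error escape."""

    async def emit(self, event) -> None:
        try:
            logger.info("event: %r", event)
        except Exception:
            # A raising sink would take the agent run down with it,
            # so delivery failures stop here.
            pass
```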
Persistence¶
Registry¶
Resolves agent / group names to spec instances. Concretes:
InMemoryRegistry and YamlRegistry in murmur.registry.
Run store¶
RunStore is documented under Runs since
it lives in murmur.runs rather than murmur.core.protocols — the
single exception to the Protocols-first layout, because the value
types it operates on (RunStatus, RunProgress, RunEvent) live in
the same package.
Pipeline¶
Pipeline¶
The composer that wires Stage and Middleware instances around the
backend dispatch. Concrete: the unnamed pipeline returned by
murmur.core.pipeline.build_pipeline.
Pipeline¶
A composed chain of stages and middleware.
Stage¶
A single hop in the pipeline. Receives PipelineContext and a reference
to the next stage; can mutate context, transform the result, or
short-circuit.
Stage¶
A pipeline stage. Produces or transforms a result.
Middleware¶
Identical shape to Stage. The distinct name clarifies intent — middleware
is for cross-cutting concerns (Retry, Timeout, DepthLimit, CostTracking),
whereas stages are domain operations (route, resolve context, dispatch).
Middleware¶
Cross-cutting wrapper around a stage. Same shape as Stage; different intent.
NextStage¶
NextStage module-attribute¶
Type of the next_stage callable a stage / middleware receives.
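A cross-cutting wrapper in that shape: a timing sketch, where the __call__ spelling and result pass-through are assumptions since the exact callable signature isn't reproduced on this page:

```python
import time

class Timing:
    """Sketch: measure the wrapped stage; pass its result through untouched."""

    async def __call__(self, ctx, next_stage):
        start = time.monotonic()
        try:
            return await next_stage(ctx)  # short-circuit by not calling this
        finally:
            elapsed_ms = (time.monotonic() - start) * 1000
            print(f"stage took {elapsed_ms:.1f} ms")  # real code would emit an event
```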
Distributed¶
Broker¶
The message-bus abstraction backing JobBackend. Concretes:
- RedisBroker — Redis Streams; first-class consumer_id, prefetch, and group support.
- KafkaBroker — Kafka with consumer group_id.
- NatsBroker — NATS queue groups.
- RabbitBroker — RabbitMQ named queues (competing-consumer by default).
- InMemoryBroker — in-process round-robin, memory:// URLs, used in tests.
Production code routes through the URL-keyed factory at
AgentRuntime(broker="redis://…") and stays scheme-agnostic; the per-
scheme classes are importable directly when an integration test needs
the explicit type.
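In practice that looks like the following sketch (the top-level import path for AgentRuntime is an assumption):

```python
from murmur import AgentRuntime  # import path assumed

# Swapping schemes swaps brokers; application code never names RedisBroker.
runtime = AgentRuntime(broker="redis://localhost:6379")
# runtime = AgentRuntime(broker="memory://")  # in-process, for tests
```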
Broker¶
Minimum viable pub/sub surface.
Implementations must be safe to call concurrently. publish does not
wait for handlers to finish — fire-and-forget — but should raise if it
fails to schedule the dispatch (e.g. the broker is closed).
start async¶
stop async¶
publish async¶
subscribe async¶
subscribe(
    topic: str,
    handler: MessageHandler,
    *,
    group: str | None = None,
    prefetch: int | None = None,
    consumer_id: str | None = None,
    reclaim_min_idle_ms: int | None = None,
) -> None
Register handler for messages on topic.
Two delivery semantics, picked via group:
- group=None (default) — broadcast / pub-sub. Every subscriber on the topic
  receives every message. Used for runtime-id-scoped reply topics (only ever
  one subscriber anyway) and event-stream observers that all want the same
  payload.
- group=<str> — competing-consumer. All subscribers that share the same
  (topic, group) pair are pooled, and each published message is delivered to
  exactly one of them. Used by murmur.worker.Worker so multiple workers
  serving the same agent split the workload instead of duplicating it.
  Per-broker mapping: Redis Streams consumer group, Kafka consumer group_id,
  NATS queue group, a named RabbitMQ queue.
Multiple handlers per topic are allowed regardless of mode.
Subscriptions persist until stop is called — there is no per-subscription
unsubscribe in this Protocol; restart the broker if you need a full reset.
prefetch (when not None) bounds how many messages this subscriber holds at
once. Effective semantics differ by broker:
- Redis: forwarded to StreamSub(max_records=...) — a true per-poll batch cap.
  prefetch=1 gives the most uniform fan-out across a Worker fleet.
- NATS: forwarded to pending_msgs_limit — an in-flight backpressure cap, not
  a per-poll batch; bounds buffer size before the server stops pushing.
- Kafka, RabbitMQ: currently a no-op. FastStream's Kafka DefaultSubscriber
  ignores max_records, and AMQP channel QoS lives on a different API than the
  wrapper exposes. A future change will switch Kafka to batch mode and call
  channel.set_qos for Rabbit.
None (default) lets the underlying broker pick.
consumer_id (when not None) names this subscriber inside
its competing-consumer pool. Currently effective on Redis only —
the value becomes the StreamSub.consumer name. A stable
consumer_id across Worker restarts lets the consumer reclaim
its own pending entry list (PEL) on the next poll cycle, and
keeps XINFO GROUPS consumer count bounded by the size of the
deployed fleet rather than the cumulative restart count. None
falls back to a per-subscription uuid4 — safe for short-lived
scripts, leaky for production. Other brokers ignore the field
today (Kafka identifies via group_id + partition assignment,
NATS by queue group membership, Rabbit by channel).
reclaim_min_idle_ms (when not None and group is set)
enables abandoned-PEL recovery: entries that have been pending
in another consumer's PEL for at least this long get reclaimed by
this subscriber and dispatched through handler. Without it, a
Worker that dies before XACK and is replaced by a worker with
a different consumer_id strands its pending entries forever.
Effective on Redis only — implemented as a sidecar subscriber that
runs XAUTOCLAIM alongside the normal XREADGROUP poll, so
the same handler processes both new and reclaimed entries; both
subscribers share the configured consumer_id so reclaimed
ownership is durable across the live worker's restarts. Other
brokers ignore the field. Sensible default for production:
30_000 (30 seconds).
Source code in src/murmur/core/protocols/broker.py
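Putting the knobs together: a sketch of both delivery modes, with placeholder topic names and an assumed async handler:

```python
async def wire_subscriptions(broker, handle) -> None:
    # Broadcast: every subscriber on the topic sees every message.
    await broker.subscribe("events.run-42", handle)

    # Competing-consumer: workers sharing (topic, group) split the stream.
    await broker.subscribe(
        "tasks.researcher",
        handle,
        group="researcher-workers",
        prefetch=1,                  # Redis: most uniform fan-out across a fleet
        consumer_id="worker-0",      # stable across restarts, so the PEL is reclaimable
        reclaim_min_idle_ms=30_000,  # recover entries stranded by dead consumers
    )
```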
MessageHandler¶
MessageHandler module-attribute¶
Callback fired for each message that arrives on a subscribed topic.
Worker¶
The distributed consumer. Concrete: murmur.worker.Worker.
Worker¶
Distributed consumer Protocol.
start async¶
stop async¶
on_task_start¶
on_task_complete¶
on_task_complete(fn: OnComplete) -> OnComplete
Worker hooks¶
Type aliases for the lifecycle callbacks attached via @worker.on_task_start etc.
OnStart module-attribute¶
async def fn(task_id: str, agent_name: str) -> None
OnComplete module-attribute¶
async def fn(task_id: str, agent_name: str, duration_ms: int) -> None
OnError module-attribute¶
async def fn(task_id: str, agent_name: str, error: BaseException) -> None
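Attached the way the decorator names suggest: a sketch assuming a constructed Worker instance (its constructor arguments are not shown on this page):

```python
from murmur.worker import Worker  # the concrete named above

worker = Worker(...)  # construction elided

@worker.on_task_start
async def log_start(task_id: str, agent_name: str) -> None:
    print(f"{agent_name} started {task_id}")

@worker.on_task_complete
async def log_done(task_id: str, agent_name: str, duration_ms: int) -> None:
    print(f"{agent_name} finished {task_id} in {duration_ms} ms")
```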