Events
Typed runtime instrumentation. Every emitter satisfies the
EventEmitter Protocol (defined in murmur.core.protocols.events) and
passes the shared EventEmitterContract test suite.
from murmur.events import (
    BrokerEventBridge,
    EventType,
    LogEventEmitter,
    MultiEventEmitter,
    OTelMetricsEmitter,
    RuntimeEvent,
    SSEEventEmitter,
)
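A custom sink only has to satisfy the EventEmitter Protocol. The sketch below assumes the Protocol requires a single async `emit(event) -> None` method, which is the shape `SSEEventEmitter.emit` has on this page. `EventEmitterSketch` and `CountingEmitter` are hypothetical stand-ins, not Murmur classes.

```python
import asyncio
from collections import Counter
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class EventEmitterSketch(Protocol):
    """Assumed shape of the EventEmitter Protocol: one async emit method."""

    async def emit(self, event: Any) -> None: ...


class CountingEmitter:
    """Hypothetical sink that counts events per event_type."""

    def __init__(self) -> None:
        self.counts = Counter()

    async def emit(self, event: Any) -> None:
        # Works with any object exposing an `event_type` attribute.
        self.counts[str(event.event_type)] += 1
```

Because the check is structural, `CountingEmitter` needs no base class; any object with a matching `emit` coroutine would pass.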
Value types
RuntimeEvent
Frozen envelope for one runtime instrumentation point.
Crosses thread / process / broker boundaries safely (Pydantic serialisation) so emitters can be wired in any backend without leaking unserialisable state.
event_type
instance-attribute
event_type: EventType
Discriminator. Determines what payload shape callers should expect —
see :class:EventType value docstrings for the per-type payload contract.
timestamp
class-attribute
instance-attribute
UTC timestamp of the event. Set at the emission point inside the runtime; it is not the time at which an emitter delivered the event to its sink.
agent_name
instance-attribute
Name of the :class:Agent the event is about — or the dispatching
agent's name for batch / group / tool-call events.
task_id
class-attribute
instance-attribute
None for non-task events (BATCH_STARTED, GROUP_STARTED, etc.).
trace_id
instance-attribute
Same value as :attr:murmur.types.TaskSpec.request_id. Phase 2 does
not introduce a new ID — every log line, every event, every broker
message carries this one id.
parent_trace_id
class-attribute
instance-attribute
Reserved for cascading-spawn support (Phase 4). Stays None for
top-level runs; child spawns will carry the parent's trace_id here.
payload
class-attribute
instance-attribute
Per-event-type details — see :class:EventType docstrings.
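The field list above can be pictured as a frozen record. The sketch below is a stdlib stand-in built on a frozen dataclass; the real RuntimeEvent is a Pydantic model. The field types other than event_type, the defaults, and the example values (`"agent_spawned"`, `"researcher"`, `"req-123"`, `"high"`) are assumptions inferred from the descriptions on this page.

```python
import dataclasses
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass(frozen=True)
class RuntimeEventSketch:
    """Hypothetical stand-in mirroring the documented RuntimeEvent fields."""

    event_type: str  # the real field is an EventType member
    agent_name: str
    trace_id: str
    # Assumed default: stamped with the current UTC time at construction.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    task_id: Optional[str] = None  # None for non-task events
    parent_trace_id: Optional[str] = None  # reserved for cascading spawns
    payload: dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        """Serialise to JSON so the event can cross process boundaries."""
        data = dataclasses.asdict(self)
        data["timestamp"] = self.timestamp.isoformat()
        return json.dumps(data)


event = RuntimeEventSketch(
    event_type="agent_spawned",
    agent_name="researcher",
    trace_id="req-123",
    payload={"backend": "async", "trust_level": "high"},
)
```

Assigning to any field of `event` after construction raises `dataclasses.FrozenInstanceError`, which is the stand-in's equivalent of the frozen envelope described above.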
EventType
Discriminator for :class:RuntimeEvent.event_type.
AGENT_DISPATCHED
class-attribute
instance-attribute
Emitted publisher-side by :class:JobBackend immediately after a
task is published to the broker — before the worker picks it up and
fires :data:AGENT_SPAWNED. Gives the publisher local visibility into
"task accepted by broker" even when the distributed event bridge is
off. AsyncBackend never emits this; AGENT_SPAWNED is the equivalent
head-of-run signal.
Payload: {"backend": str, "broker": str | None, "trust_level": str}.
AGENT_SPAWNED
class-attribute
instance-attribute
Emitted by the runtime when an agent run starts.
Payload: {"backend": str, "trust_level": str}.
AGENT_COMPLETED
class-attribute
instance-attribute
Emitted when an agent run produces an output.
Payload: {"duration_ms": int, "tokens_used": int, "backend": str,
"model": str}. model is the resolved model identifier — either
the user-supplied "provider:name" string or "{system}:{model_name}"
when an instance was passed.
AGENT_FAILED
class-attribute
instance-attribute
Emitted when an agent run errors.
Payload: {"duration_ms": int, "error": str, "backend": str}.
TOOL_CALL_STARTED
class-attribute
instance-attribute
Emitted by :class:ToolExecutor before each tool invocation.
Payload: {"tool_name": str, "trust_level": str}.
TOOL_CALL_COMPLETED
class-attribute
instance-attribute
Emitted on a successful tool return.
Payload: {"tool_name": str, "duration_ms": int, "tokens_used": int}.
tokens_used is best-effort LLM cost attribution to this tool call;
until the agent loop reports a per-call delta it is 0.
TOOL_CALL_FAILED
class-attribute
instance-attribute
Emitted on a tool exception.
Payload: {"tool_name": str, "error": str, "duration_ms": int}.
BATCH_STARTED
class-attribute
instance-attribute
Emitted at the head of :meth:AgentRuntime.gather.
Payload: {"task_count": int, "max_concurrency": int}.
BATCH_COMPLETED
class-attribute
instance-attribute
Emitted after every slot of :meth:AgentRuntime.gather settles.
Payload: {"task_count": int, "success_count": int, "failure_count": int}.
GROUP_STARTED
class-attribute
instance-attribute
Emitted at the head of :meth:AgentRuntime.run_group.
Payload: {"group_name": str, "node_count": int}.
GROUP_COMPLETED
class-attribute
instance-attribute
Emitted when :meth:AgentRuntime.run_group reaches its terminal result.
Payload: {"group_name": str, "duration_ms": int}.
BUDGET_EXCEEDED
class-attribute
instance-attribute
Emitted by cost-tracking middleware just before raising BudgetExceededError.
Payload: {"limit": int, "used": int, "scope": "task" | "runtime"}.
DEPTH_LIMIT_EXCEEDED
class-attribute
instance-attribute
Emitted by depth-limit middleware just before raising DepthLimitError.
Payload: {"limit": int, "depth": int}.
WORKER_STARTED
class-attribute
instance-attribute
Emitted by :meth:Worker.start after subscriptions are live and the
worker is consuming. Pairs with :data:WORKER_STOPPED for restart-history
reconstruction.
Payload: {"runtime_id": str, "agents": list[str], "broker_scheme": str | None,
"concurrency": int, "prefetch": int, "consumer_id": str | None,
"heartbeat_seconds": float}.
WORKER_STOPPED
class-attribute
instance-attribute
Emitted by :meth:Worker.stop before draining in-flight tasks. The
event still flows because the runtime's emitter chain is independent of
the broker subscription that the stop is about to drop.
Payload: {"runtime_id": str, "agents": list[str],
"broker_scheme": str | None}.
WORKER_HEARTBEAT
class-attribute
instance-attribute
Emitted by :class:Worker on a configurable timer so the dashboard's
fleet view can answer "is this worker alive?" without inferring liveness
from activity. Fires from start() until stop(); heartbeat_seconds=0
disables it.
Payload: {"agent_subscriptions": list[str], "in_flight": int,
"concurrency_cap": int, "broker_scheme": str | None, "runtime_id": str}.
agent_name on the envelope carries the worker's runtime id (the worker
isn't tied to a single agent — its runtime id is the closest stable handle).
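Because the payload shape depends on event_type, a consumer may want to check a payload before reading from it. A minimal sketch, using the payload keys documented above for three of the event types. The lowercase strings standing in for EventType members and the helper name `missing_payload_keys` are assumptions.

```python
# Required payload keys, copied from the per-type contracts documented above.
# The string keys are stand-ins for EventType members (their values are assumed).
REQUIRED_PAYLOAD_KEYS: dict[str, frozenset[str]] = {
    "agent_spawned": frozenset({"backend", "trust_level"}),
    "agent_failed": frozenset({"duration_ms", "error", "backend"}),
    "batch_completed": frozenset({"task_count", "success_count", "failure_count"}),
}


def missing_payload_keys(event_type: str, payload: dict) -> set[str]:
    """Return documented keys absent from payload (empty set for unknown types)."""
    required = REQUIRED_PAYLOAD_KEYS.get(event_type, frozenset())
    return set(required) - set(payload)
```

Unknown event types return an empty set rather than raising, so a consumer built this way keeps working when new event types are added.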
Emitters
LogEventEmitter
Default :class:EventEmitter — writes via structlog.
Stateless and trivially safe to share across runtimes / threads /
runs. Construction takes no arguments; the bound logger is resolved
per-call so structlog.testing.capture_logs can intercept emit
output even after the CLI has set cache_logger_on_first_use=True.
SSEEventEmitter
:class:EventEmitter that fans events out to SSE subscribers.
Source code in src/murmur/events/sse.py
emit
async
emit(event: RuntimeEvent) -> None
Enqueue event onto every subscriber's queue. Non-blocking.
Slow consumers whose queue is full drop the new event (we'd rather lose telemetry than backpressure the runtime). The drop is logged once per occurrence so it's visible.
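The drop-on-full behaviour can be sketched with `asyncio.Queue.put_nowait`. This illustrates the technique only and is not the code in src/murmur/events/sse.py; the class name, the queue size, and the logger name are assumptions.

```python
import asyncio
import logging

logger = logging.getLogger("sse_sketch")  # hypothetical logger name


class FanOutSketch:
    """Hypothetical fan-out: one bounded queue per subscriber."""

    def __init__(self, max_queue_size: int = 2) -> None:
        self.max_queue_size = max_queue_size
        self.queues: list[asyncio.Queue] = []

    def add_subscriber(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue(maxsize=self.max_queue_size)
        self.queues.append(queue)
        return queue

    async def emit(self, event: object) -> None:
        for queue in self.queues:
            try:
                # put_nowait never awaits, so a slow consumer cannot stall us.
                queue.put_nowait(event)
            except asyncio.QueueFull:
                # Drop the new event and make the loss visible in the logs.
                logger.warning("subscriber queue full; dropping event")
```

The newest event is the one dropped, so a subscriber that falls behind keeps the oldest events it has not yet read.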
subscribe
async
Yield SSE-formatted dicts for an :class:EventSourceResponse.
Each dict has event (the :class:EventType value) and
data (JSON-serialised :class:RuntimeEvent). Heartbeats use
event="ping" with empty data.
Cleanly removes the subscriber's queue and cancels its heartbeat task on cancellation — closing the SSE connection client-side triggers exactly that.
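A subscriber loop that produces the documented dict shape might look like the sketch below. It replaces the dedicated heartbeat task with a simpler `asyncio.wait_for` timeout, takes plain dicts instead of RuntimeEvent objects, and omits the queue cleanup on cancellation. The function name, the ping interval, and the stop-on-None sentinel are assumptions made so the example terminates.

```python
import asyncio
import json
from typing import AsyncIterator, Optional


async def subscribe_sketch(
    queue: "asyncio.Queue[Optional[dict]]", ping_seconds: float = 15.0
) -> AsyncIterator[dict]:
    """Yield SSE-shaped dicts; yield a ping whenever the queue stays idle."""
    while True:
        try:
            event = await asyncio.wait_for(queue.get(), timeout=ping_seconds)
        except asyncio.TimeoutError:
            # Heartbeat: event="ping" with empty data, as described above.
            yield {"event": "ping", "data": ""}
            continue
        if event is None:  # hypothetical sentinel that ends the stream
            return
        yield {"event": event["event_type"], "data": json.dumps(event)}
```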
MultiEventEmitter
MultiEventEmitter(emitters: Sequence[EventEmitter])
Broadcast each emitted event to every wrapped emitter.
Construction takes a sequence (typically a list or tuple) of emitters.
Wrap as the runtime's event_emitter to attach more than one sink:
from murmur.events import LogEventEmitter, MultiEventEmitter

emitter = MultiEventEmitter([LogEventEmitter(), my_metrics_emitter])
runtime = AgentRuntime(event_emitter=emitter)
Source code in src/murmur/events/multi.py
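Broadcasting amounts to handing the same event to each wrapped emitter. A minimal sketch under that reading; this page does not say whether the real class emits concurrently or isolates a failing emitter, so the sequential loop is a guess, and both class names are hypothetical.

```python
import asyncio
from typing import Any, Sequence


class ListEmitter:
    """Hypothetical sink that records events in memory."""

    def __init__(self) -> None:
        self.events: list[Any] = []

    async def emit(self, event: Any) -> None:
        self.events.append(event)


class MultiEmitterSketch:
    """Forward each event to every wrapped emitter, in order."""

    def __init__(self, emitters: Sequence[Any]) -> None:
        # Snapshot the sequence so later mutation by the caller is ignored.
        self._emitters = tuple(emitters)

    async def emit(self, event: Any) -> None:
        for emitter in self._emitters:
            await emitter.emit(event)
```

In this sketch an exception from one emitter stops delivery to the ones after it; a production version would need to decide how to handle that.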
BrokerEventBridge
BrokerEventBridge(broker: Broker)
:class:EventEmitter that relays each event to a broker topic.
The topic is read from the per-task contextvar — see
:func:bind_event_topic. When no topic is bound, emit is a
no-op, so adding the bridge to a runtime's emitter chain has zero
cost when distributed observability isn't requested.
Source code in src/murmur/events/broker.py
OTelMetricsEmitter
Emit OTel GenAI metrics from each :class:RuntimeEvent.
Recorded instruments:
- gen_ai.client.token.usage: histogram of tokens per agent run, attributes {operation, provider, request_model, token_type}. token_type is always "total" because Murmur sums input + output tokens at the runtime level; we don't have the per-side breakdown without delving into PydanticAI's RunUsage.
- gen_ai.client.operation.duration: seconds per agent run, attributes {operation, provider, request_model, error_type}. error_type is unset on success and the typed error-class name on failure (BudgetExceededError, DepthLimitError, etc.).
- murmur.tool.calls: counter of tool invocations; attributes {tool, agent, status} where status ∈ ok|error.
- murmur.tool.duration_ms: histogram of tool-call latency in ms.
- murmur.rejections: counter for BUDGET_EXCEEDED / DEPTH_LIMIT_EXCEEDED / AGENT_FAILED events with a typed reason payload.
Cardinality discipline: agent, tool, operation, provider,
request_model are all expected to be bounded sets. Do not label
by trace_id / task_id / prompt content — that's the cardinality
bomb the planning doc calls out.
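One way to enforce the cardinality rule is an allow-list applied before attributes reach an instrument. A minimal sketch, assuming the five label names in the note above are the bounded ones; the helper name and the example values are hypothetical, and this is not taken from the emitter's source.

```python
# Label keys the cardinality note above treats as bounded sets.
ALLOWED_LABELS = frozenset({"agent", "tool", "operation", "provider", "request_model"})


def bounded_attributes(candidate: dict[str, str]) -> dict[str, str]:
    """Keep only allow-listed labels; drop trace_id, task_id, and the like."""
    return {key: value for key, value in candidate.items() if key in ALLOWED_LABELS}


attrs = bounded_attributes(
    {"agent": "researcher", "provider": "openai", "trace_id": "req-123"}
)
```

Here `attrs` keeps `agent` and `provider` and silently drops `trace_id`, so an unbounded identifier never becomes a metric label.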
Build the OTel instruments.
meter_provider defaults to the globally-configured one
(opentelemetry.metrics.get_meter_provider()). Pass an explicit
provider to bind this emitter to a non-global one — useful in tests
where multiple emitters share an :class:InMemoryMetricReader.
Raises :class:ImportError with an install hint when the
murmur-runtime[otel] extra isn't present.
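The install-hint behaviour is a common optional-dependency pattern. The sketch below shows one way to write it with `importlib`; the helper name `require_extra` and the wording of the message are assumptions, and only the extra name `murmur-runtime[otel]` comes from the text above.

```python
import importlib
from types import ModuleType


def require_extra(module_name: str, extra: str) -> ModuleType:
    """Import module_name, or raise ImportError naming the extra to install."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"{module_name!r} is required; install it with "
            f"`pip install 'murmur-runtime[{extra}]'`"
        ) from exc
```

Chaining with `from exc` keeps the original import failure in the traceback, which helps when the package is installed but broken.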