Events

Typed runtime instrumentation. Every emitter satisfies the EventEmitter Protocol (defined in murmur.core.protocols.events) and passes the shared EventEmitterContract test suite.

from murmur.events import (
    BrokerEventBridge,
    EventType,
    LogEventEmitter,
    MultiEventEmitter,
    OTelMetricsEmitter,
    RuntimeEvent,
    SSEEventEmitter,
)

Value types

RuntimeEvent

Frozen envelope for one runtime instrumentation point.

Crosses thread / process / broker boundaries safely (Pydantic serialisation) so emitters can be wired in any backend without leaking unserialisable state.

event_type instance-attribute
event_type: EventType

Discriminator. Determines what payload shape callers should expect — see :class:EventType value docstrings for the per-type payload contract.

timestamp class-attribute instance-attribute
timestamp: datetime = Field(default_factory=lambda: datetime.now(tz=UTC))

UTC timestamp of the event. Set at the emission point inside the runtime; not the timestamp the emitter delivered to its sink.

agent_name instance-attribute
agent_name: str

Name of the :class:Agent the event is about — or the dispatching agent's name for batch / group / tool-call events.

task_id class-attribute instance-attribute
task_id: str | None = None

None for non-task events (BATCH_STARTED, GROUP_STARTED, etc.).

trace_id instance-attribute
trace_id: str

Same value as :attr:murmur.types.TaskSpec.request_id. Phase 2 does not introduce a new ID — every log line, every event, every broker message carries this one id.

parent_trace_id class-attribute instance-attribute
parent_trace_id: str | None = None

Reserved for cascading-spawn support (Phase 4). Stays None for top-level runs; child spawns will carry the parent's trace_id here.

payload class-attribute instance-attribute
payload: Mapping[str, object] = Field(default_factory=dict)

Per-event-type details — see :class:EventType docstrings.

EventType

Discriminator for :class:RuntimeEvent.event_type.

AGENT_DISPATCHED class-attribute instance-attribute
AGENT_DISPATCHED = 'agent_dispatched'

Emitted publisher-side by :class:JobBackend immediately after a task is published to the broker — before the worker picks it up and fires :data:AGENT_SPAWNED. Gives the publisher local visibility into "task accepted by broker" even when the distributed event bridge is off. AsyncBackend never emits this; AGENT_SPAWNED is the equivalent head-of-run signal.

Payload: {"backend": str, "broker": str | None, "trust_level": str}.

AGENT_SPAWNED class-attribute instance-attribute
AGENT_SPAWNED = 'agent_spawned'

Emitted by the runtime when an agent run starts.

Payload: {"backend": str, "trust_level": str}.

AGENT_COMPLETED class-attribute instance-attribute
AGENT_COMPLETED = 'agent_completed'

Emitted when an agent run produces an output.

Payload: {"duration_ms": int, "tokens_used": int, "backend": str, "model": str}. model is the resolved model identifier — either the user-supplied "provider:name" string or "{system}:{model_name}" when an instance was passed.

AGENT_FAILED class-attribute instance-attribute
AGENT_FAILED = 'agent_failed'

Emitted when an agent run errors.

Payload: {"duration_ms": int, "error": str, "backend": str}.

TOOL_CALL_STARTED class-attribute instance-attribute
TOOL_CALL_STARTED = 'tool_call_started'

Emitted by :class:ToolExecutor before each tool invocation.

Payload: {"tool_name": str, "trust_level": str}.

TOOL_CALL_COMPLETED class-attribute instance-attribute
TOOL_CALL_COMPLETED = 'tool_call_completed'

Emitted on a successful tool return.

Payload: {"tool_name": str, "duration_ms": int, "tokens_used": int}. tokens_used is best-effort LLM cost attribution to this tool call; until the agent loop reports a per-call delta it is 0.

TOOL_CALL_FAILED class-attribute instance-attribute
TOOL_CALL_FAILED = 'tool_call_failed'

Emitted on a tool exception.

Payload: {"tool_name": str, "error": str, "duration_ms": int}.

BATCH_STARTED class-attribute instance-attribute
BATCH_STARTED = 'batch_started'

Emitted at the head of :meth:AgentRuntime.gather.

Payload: {"task_count": int, "max_concurrency": int}.

BATCH_COMPLETED class-attribute instance-attribute
BATCH_COMPLETED = 'batch_completed'

Emitted after every slot of :meth:AgentRuntime.gather settles.

Payload: {"task_count": int, "success_count": int, "failure_count": int}.

GROUP_STARTED class-attribute instance-attribute
GROUP_STARTED = 'group_started'

Emitted at the head of :meth:AgentRuntime.run_group.

Payload: {"group_name": str, "node_count": int}.

GROUP_COMPLETED class-attribute instance-attribute
GROUP_COMPLETED = 'group_completed'

Emitted when :meth:AgentRuntime.run_group reaches its terminal result.

Payload: {"group_name": str, "duration_ms": int}.

BUDGET_EXCEEDED class-attribute instance-attribute
BUDGET_EXCEEDED = 'budget_exceeded'

Emitted by cost-tracking middleware just before raising BudgetExceededError.

Payload: {"limit": int, "used": int, "scope": "task" | "runtime"}.

DEPTH_LIMIT_EXCEEDED class-attribute instance-attribute
DEPTH_LIMIT_EXCEEDED = 'depth_limit_exceeded'

Emitted by depth-limit middleware just before raising DepthLimitError.

Payload: {"limit": int, "depth": int}.

WORKER_STARTED class-attribute instance-attribute
WORKER_STARTED = 'worker_started'

Emitted by :class:Worker.start after subscriptions are live and the worker is consuming. Pairs with :data:WORKER_STOPPED for restart-history reconstruction.

Payload: {"runtime_id": str, "agents": list[str], "broker_scheme": str | None, "concurrency": int, "prefetch": int, "consumer_id": str | None, "heartbeat_seconds": float}.

WORKER_STOPPED class-attribute instance-attribute
WORKER_STOPPED = 'worker_stopped'

Emitted by :class:Worker.stop before draining in-flight tasks. The event still flows because the runtime's emitter chain is independent of the broker subscription that the stop is about to drop.

Payload: {"runtime_id": str, "agents": list[str], "broker_scheme": str | None}.

WORKER_HEARTBEAT class-attribute instance-attribute
WORKER_HEARTBEAT = 'worker_heartbeat'

Emitted by :class:Worker on a configurable timer so the dashboard's fleet view can answer "is this worker alive?" without inferring liveness from activity. Fires from start() until stop(); heartbeat_seconds=0 disables it.

Payload: {"agent_subscriptions": list[str], "in_flight": int, "concurrency_cap": int, "broker_scheme": str | None, "runtime_id": str}. agent_name on the envelope carries the worker's runtime id (the worker isn't tied to a single agent — its runtime id is the closest stable handle).
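Since EventType values are plain strings with documented payload contracts, a consumer can route on the discriminator directly. A hypothetical tallying function (the envelope dicts below are stand-ins for real RuntimeEvent instances):

```python
# Hypothetical consumer that routes on event_type and reads the
# per-type payload keys documented above. Plain dicts stand in for
# RuntimeEvent instances here.
def summarise(events: list[dict]) -> dict[str, int]:
    """Tally terminal agent events from a stream of envelope dicts."""
    counts = {"completed": 0, "failed": 0, "tokens": 0}
    for ev in events:
        if ev["event_type"] == "agent_completed":
            counts["completed"] += 1
            counts["tokens"] += ev["payload"]["tokens_used"]
        elif ev["event_type"] == "agent_failed":
            counts["failed"] += 1
    return counts


stream = [
    {"event_type": "agent_spawned",
     "payload": {"backend": "async", "trust_level": "trusted"}},
    {"event_type": "agent_completed",
     "payload": {"duration_ms": 812, "tokens_used": 431,
                 "backend": "async", "model": "openai:gpt-4o"}},
    {"event_type": "agent_failed",
     "payload": {"duration_ms": 93, "error": "boom", "backend": "async"}},
]
print(summarise(stream))  # → {'completed': 1, 'failed': 1, 'tokens': 431}
```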

Emitters

LogEventEmitter

Default :class:EventEmitter — writes via structlog.

Stateless and trivially safe to share across runtimes / threads / runs. Construction takes no arguments; the bound logger is resolved per-call so structlog.testing.capture_logs can intercept emit output even after the CLI has set cache_logger_on_first_use=True.
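Any object with a matching async emit method can stand in for the default. Assuming the EventEmitter Protocol requires only `async def emit(event) -> None` (the signature every emitter on this page exposes), a buffering emitter for unit tests is a few lines:

```python
# Sketch of a custom emitter. Assumption: the EventEmitter Protocol's
# only required method is `async def emit(event) -> None`, matching the
# emit signatures shown elsewhere on this page.
import asyncio


class CollectingEmitter:
    """Buffers every emitted event in memory — handy in unit tests."""

    def __init__(self) -> None:
        self.events: list[object] = []

    async def emit(self, event: object) -> None:
        self.events.append(event)


async def main() -> None:
    emitter = CollectingEmitter()
    await emitter.emit({"event_type": "agent_spawned"})
    await emitter.emit({"event_type": "agent_completed"})
    kinds = [e["event_type"] for e in emitter.events]
    assert kinds == ["agent_spawned", "agent_completed"]


asyncio.run(main())
```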

SSEEventEmitter

SSEEventEmitter(*, heartbeat_interval: float = 15.0, queue_max: int = 1024)

:class:EventEmitter that fans events out to SSE subscribers.

Source code in src/murmur/events/sse.py
def __init__(
    self,
    *,
    heartbeat_interval: float = 15.0,
    queue_max: int = 1024,
) -> None:
    if heartbeat_interval <= 0:
        raise ValueError("heartbeat_interval must be > 0")
    if queue_max < 1:
        raise ValueError("queue_max must be >= 1")
    self._heartbeat_interval = heartbeat_interval
    self._queue_max = queue_max
    self._subscribers: list[asyncio.Queue[RuntimeEvent | _Heartbeat]] = []
    self._lock = asyncio.Lock()
emit async
emit(event: RuntimeEvent) -> None

Enqueue event onto every subscriber's queue. Non-blocking.

Slow consumers whose queue is full drop the new event (we'd rather lose telemetry than backpressure the runtime). The drop is logged once per occurrence so it's visible.

Source code in src/murmur/events/sse.py
async def emit(self, event: RuntimeEvent) -> None:
    """Enqueue ``event`` onto every subscriber's queue. Non-blocking.

    Slow consumers whose queue is full drop the *new* event (we'd
    rather lose telemetry than backpressure the runtime). The drop
    is logged once per occurrence so it's visible.
    """
    async with self._lock:
        subs = list(self._subscribers)
    for q in subs:
        try:
            q.put_nowait(event)
        except asyncio.QueueFull:
            await _log.awarning(
                "sse_subscriber_overflow",
                event_type=event.event_type.value,
                queue_max=self._queue_max,
            )
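The overflow policy hinges on asyncio.Queue semantics: put_nowait on a full queue raises QueueFull rather than blocking, so the runtime is never backpressured by a slow subscriber. In isolation:

```python
# The drop-on-full policy in isolation: put_nowait raises QueueFull on
# a full queue, and the caller drops the new event instead of blocking.
import asyncio


def deliver(q: asyncio.Queue, event: str) -> bool:
    """Return True if delivered, False if dropped (queue full)."""
    try:
        q.put_nowait(event)
        return True
    except asyncio.QueueFull:
        return False


q: asyncio.Queue[str] = asyncio.Queue(maxsize=1)
assert deliver(q, "first") is True
assert deliver(q, "second") is False  # dropped: subscriber too slow
```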
subscribe async
subscribe() -> AsyncGenerator[dict[str, str], None]

Yield SSE-formatted dicts for an :class:EventSourceResponse.

Each dict has event (the :class:EventType value) and data (JSON-serialised :class:RuntimeEvent). Heartbeats use event="ping" with empty data.

Cleanly removes the subscriber's queue and cancels its heartbeat task on cancellation — closing the SSE connection client-side triggers exactly that.

Source code in src/murmur/events/sse.py
async def subscribe(self) -> AsyncGenerator[dict[str, str], None]:
    """Yield SSE-formatted dicts for an :class:`EventSourceResponse`.

    Each dict has ``event`` (the :class:`EventType` value) and
    ``data`` (JSON-serialised :class:`RuntimeEvent`). Heartbeats use
    ``event="ping"`` with empty data.

    Cleanly removes the subscriber's queue and cancels its heartbeat
    task on cancellation — closing the SSE connection client-side
    triggers exactly that.
    """
    q: asyncio.Queue[RuntimeEvent | _Heartbeat] = asyncio.Queue(
        maxsize=self._queue_max
    )
    async with self._lock:
        self._subscribers.append(q)
    heartbeat_task = asyncio.create_task(
        self._heartbeat_loop(q),
        name="murmur-sse-heartbeat",
    )
    try:
        while True:
            item = await q.get()
            if isinstance(item, _Heartbeat):
                yield {"event": "ping", "data": ""}
            else:
                yield {
                    "event": item.event_type.value,
                    "data": item.model_dump_json(),
                }
    finally:
        heartbeat_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await heartbeat_task
        async with self._lock:
            if q in self._subscribers:
                self._subscribers.remove(q)

MultiEventEmitter

MultiEventEmitter(emitters: Sequence[EventEmitter])

Broadcast each emitted event to every wrapped emitter.

Construction takes a sequence (typically a list or tuple) of emitters. Wrap as the runtime's event_emitter to attach more than one sink:

from murmur.events import LogEventEmitter, MultiEventEmitter

emitter = MultiEventEmitter([LogEventEmitter(), my_metrics_emitter])
runtime = AgentRuntime(event_emitter=emitter)

Source code in src/murmur/events/multi.py
def __init__(self, emitters: Sequence[EventEmitter]) -> None:
    self._emitters: tuple[EventEmitter, ...] = tuple(emitters)
emitters property
emitters: tuple[EventEmitter, ...]

The wrapped emitters in declaration order.
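The emit side is not shown above; a plausible sketch of the fan-out, assuming each wrapped emitter is awaited sequentially in declaration order (an assumption — the real implementation may differ):

```python
# Sketch of a broadcast emitter's fan-out — an assumption about the
# emit side, which this page's source listing doesn't show.
import asyncio


class MultiSketch:
    def __init__(self, emitters) -> None:
        self._emitters = tuple(emitters)

    async def emit(self, event) -> None:
        # Sequential delivery keeps per-sink ordering deterministic.
        for emitter in self._emitters:
            await emitter.emit(event)


class Recorder:
    def __init__(self) -> None:
        self.seen: list[object] = []

    async def emit(self, event) -> None:
        self.seen.append(event)


a, b = Recorder(), Recorder()
asyncio.run(MultiSketch([a, b]).emit("hello"))
assert a.seen == b.seen == ["hello"]
```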

BrokerEventBridge

BrokerEventBridge(broker: Broker)

:class:EventEmitter that relays each event to a broker topic.

The topic is read from the per-task contextvar — see :func:bind_event_topic. When no topic is bound, emit is a no-op, so adding the bridge to a runtime's emitter chain has zero cost when distributed observability isn't requested.

Source code in src/murmur/events/broker.py
def __init__(self, broker: Broker) -> None:
    self._broker = broker

OTelMetricsEmitter

OTelMetricsEmitter(*, meter_provider: Any | None = None)

Emit OTel GenAI metrics from each :class:RuntimeEvent.

Recorded instruments:

  • gen_ai.client.token.usage — histogram of tokens per agent run, attributes {operation, provider, request_model, token_type}. token_type is always "total" because Murmur sums input + output tokens at the runtime level; we don't have the per-side breakdown without delving into PydanticAI's RunUsage.
  • gen_ai.client.operation.duration — seconds per agent run, attributes {operation, provider, request_model, error_type}. error_type is unset on success and the typed error-class name on failure (BudgetExceededError, DepthLimitError, etc.).
  • murmur.tool.calls — counter of tool invocations; attributes {tool, agent, status} where status is "ok" | "error".
  • murmur.tool.duration_ms — histogram of tool-call latency in ms.
  • murmur.rejections — counter for BUDGET_EXCEEDED / DEPTH_LIMIT_EXCEEDED / AGENT_FAILED events with a typed reason payload.

Cardinality discipline: agent, tool, operation, provider, request_model are all expected to be bounded sets. Do not label by trace_id / task_id / prompt content — that's the cardinality bomb the planning doc calls out.

Build the OTel instruments.

meter_provider defaults to the globally-configured one (opentelemetry.metrics.get_meter_provider()). Pass an explicit provider to bind this emitter to a non-global one — useful in tests where multiple emitters share an :class:InMemoryMetricReader.

Raises :class:ImportError with an install hint when the murmur-runtime[otel] extra isn't present.

Source code in src/murmur/events/otel.py
def __init__(
    self,
    *,
    meter_provider: Any | None = None,
) -> None:
    """Build the OTel instruments.

    ``meter_provider`` defaults to the globally-configured one
    (``opentelemetry.metrics.get_meter_provider()``). Pass an explicit
    provider to bind this emitter to a non-global one — useful in tests
    where multiple emitters share an :class:`InMemoryMetricReader`.

    Raises :class:`ImportError` with an install hint when the
    ``murmur-runtime[otel]`` extra isn't present.
    """
    try:
        from opentelemetry import metrics
    except ImportError as exc:  # pragma: no cover - exercised by extras test
        raise ImportError(_INSTALL_HINT) from exc

    provider = meter_provider or metrics.get_meter_provider()
    meter = provider.get_meter("murmur", "0")
    self._token_usage = meter.create_histogram(
        name="gen_ai.client.token.usage",
        description="Number of tokens used per GenAI client operation.",
        unit="{token}",
    )
    self._operation_duration = meter.create_histogram(
        name="gen_ai.client.operation.duration",
        description="Wall-clock duration of a GenAI client operation.",
        unit="s",
    )
    self._tool_calls = meter.create_counter(
        name="murmur.tool.calls",
        description="Tool invocations dispatched through Murmur's executor.",
        unit="{call}",
    )
    self._tool_duration = meter.create_histogram(
        name="murmur.tool.duration_ms",
        description="Wall-clock duration of a single tool invocation.",
        unit="ms",
    )
    self._rejections = meter.create_counter(
        name="murmur.rejections",
        description=(
            "Run-level rejections by typed reason "
            "(budget, depth, cycle, cap, timeout, trust, validation)."
        ),
        unit="{rejection}",
    )