Skip to content

Client

The murmur-client package ships separately and depends only on httpx + pydantic. It deliberately does not import pydantic_ai / faststream or any runtime-side machinery — the client knows the server URL, agent / group names, and JSON schemas; nothing else.

from murmur_client import LocalClient, MurmurClient, Run

Both client classes satisfy a shared _RunBackend Protocol — same call surface, different transport.

MurmurClient

HTTP client. Use when calling a remote AgentServer over the network.

MurmurClient

MurmurClient(
    server_url: str,
    *,
    timeout: float = 30.0,
    transport: AsyncBaseTransport | None = None,
    sync_transport: BaseTransport | None = None,
    auth_token: str | None = None,
)

Async HTTP client for an :class:AgentServer.

Use as an async context manager so the underlying httpx.AsyncClient is closed cleanly. transport is an escape hatch for tests (httpx.ASGITransport(app=server.app)).

Source code in packages/murmur-client/src/murmur_client/client.py
def __init__(
    self,
    server_url: str,
    *,
    timeout: float = 30.0,
    transport: httpx.AsyncBaseTransport | None = None,
    sync_transport: httpx.BaseTransport | None = None,
    auth_token: str | None = None,
) -> None:
    self._server_url = server_url.rstrip("/")
    self._timeout = timeout
    self._sync_transport = sync_transport
    self._auth_token: str | None = auth_token
    # Default headers ride on every request from this client. When an
    # ``auth_token`` is set we attach a static ``Authorization: Bearer``
    # — matches the static-token guard on :class:`AgentServer`.
    default_headers: dict[str, str] = {}
    if auth_token is not None:
        default_headers["Authorization"] = f"Bearer {auth_token}"
    self._http: httpx.AsyncClient = httpx.AsyncClient(
        base_url=self._server_url,
        timeout=timeout,
        transport=transport,
        headers=default_headers,
    )

run_sync

run_sync(
    agent_name: str, task: TaskSpec, *, request_id: str | None = None
) -> AgentResult[BaseModel]

Blocking variant of :meth:run for notebook / REPL / script use.

Opens a one-shot :class:httpx.Client for the call — does not share the persistent httpx.AsyncClient used by the async methods. Cannot be called from inside a running event loop (raises :class:RuntimeError).

Source code in packages/murmur-client/src/murmur_client/client.py
def run_sync(
    self,
    agent_name: str,
    task: TaskSpec,
    *,
    request_id: str | None = None,
) -> AgentResult[BaseModel]:
    """Blocking variant of :meth:`run` for notebook / REPL / script use.

    Opens a one-shot :class:`httpx.Client` for the call — does not
    share the persistent ``httpx.AsyncClient`` used by the async
    methods. **Cannot be called from inside a running event loop**
    (raises :class:`RuntimeError`).
    """
    reject_if_in_event_loop("MurmurClient.run_sync")
    rid = request_id or str(uuid.uuid4())
    with httpx.Client(
        base_url=self._server_url,
        timeout=self._timeout,
        transport=self._sync_transport,
    ) as http:
        r = http.post(
            f"/agents/{agent_name}/run",
            json={"task": task.model_dump(), "request_id": rid},
            headers={_REQUEST_ID_HEADER: rid},
        )
        self._raise_for_status(r)
        return _wire_to_agent_result(r.json())

usage async

usage(*, group_by: str | None = None) -> dict[str, Any]

GET /usage — token usage rollup.

Pass group_by="model" for per-model breakdown (server-side rollup); omit for the runtime-wide total. Returns the raw JSON payload — shape depends on group_by and is documented under docs/concepts/cost.md.

Source code in packages/murmur-client/src/murmur_client/client.py
async def usage(
    self,
    *,
    group_by: str | None = None,
) -> dict[str, Any]:
    """``GET /usage`` — token usage rollup.

    Pass ``group_by="model"`` for per-model breakdown (server-side
    rollup); omit for the runtime-wide total. Returns the raw JSON
    payload — shape depends on ``group_by`` and is documented under
    ``docs/concepts/cost.md``.
    """
    params = {"group_by": group_by} if group_by is not None else None
    r = await self._http.get("/usage", params=params)
    self._raise_for_status(r)
    return dict(r.json())

runtime_stats async

runtime_stats() -> dict[str, Any]

GET /runtime/stats — fleet-wide runtime + worker stats.

Source code in packages/murmur-client/src/murmur_client/client.py
async def runtime_stats(self) -> dict[str, Any]:
    """``GET /runtime/stats`` — fleet-wide runtime + worker stats."""
    r = await self._http.get("/runtime/stats")
    self._raise_for_status(r)
    return dict(r.json())

list_tools async

list_tools() -> list[dict[str, Any]]

GET /tools — registered tool descriptors.

Source code in packages/murmur-client/src/murmur_client/client.py
async def list_tools(self) -> list[dict[str, Any]]:
    """``GET /tools`` — registered tool descriptors."""
    r = await self._http.get("/tools")
    self._raise_for_status(r)
    return list(r.json())

stream_events async

stream_events() -> AsyncIterator[RunEvent]

Subscribe to GET /events/stream — every :class:RuntimeEvent the server emits, not scoped to a single run.

Requires the server to have been built with an :class:SSEEventEmitter. Yields decoded :class:RunEvent payloads; terminate the iteration to disconnect.

Source code in packages/murmur-client/src/murmur_client/client.py
async def stream_events(self) -> AsyncIterator[RunEvent]:
    """Subscribe to ``GET /events/stream`` — every :class:`RuntimeEvent`
    the server emits, not scoped to a single run.

    Requires the server to have been built with an
    :class:`SSEEventEmitter`. Yields decoded :class:`RunEvent` payloads;
    terminate the iteration to disconnect.
    """
    async with self._http.stream("GET", "/events/stream") as response:
        self._raise_for_status(response)
        async for line in response.aiter_lines():
            if not line.startswith("data:"):
                continue
            payload = line[len("data:") :].strip()
            if not payload:
                continue
            try:
                yield RunEvent.model_validate_json(payload)
            except Exception:  # pragma: no cover — malformed lines
                continue

LocalClient

In-process client. Use when calling agents mounted in the same Python process (typically via AgentRouter).

LocalClient

LocalClient(
    *,
    server: AgentServer | None = None,
    runtime: AgentRuntime | None = None,
    run_store: RunStore | None = None,
)

Async in-process client. Same surface as :class:murmur_client.MurmurClient.

Construct with either:

  • server=: wrap an existing :class:AgentServer (typical when the server is already shared with an :class:AgentRouter).
  • runtime= / run_store=: build an internal server from these.

The two constructor shapes are mutually exclusive — pass one or the other, not both.

Source code in src/murmur/client/local.py
def __init__(
    self,
    *,
    server: AgentServer | None = None,
    runtime: AgentRuntime | None = None,
    run_store: RunStore | None = None,
) -> None:
    if server is not None and (runtime is not None or run_store is not None):
        raise ValueError(
            "pass either `server=` or (`runtime=`/`run_store=`) — not both"
        )

    self._server: AgentServer = server or AgentServer(
        runtime=runtime, run_store=run_store
    )

server property

server: AgentServer

The backing server — registry, runtime, run-store.

close async

close() -> None

No-op for the local client — kept for API symmetry with the HTTP client. The backing :class:AgentServer and its runtime own their own lifecycles; we don't tear them down here.

Source code in src/murmur/client/local.py
async def close(self) -> None:
    """No-op for the local client — kept for API symmetry with the
    HTTP client. The backing :class:`AgentServer` and its runtime
    own their own lifecycles; we don't tear them down here.
    """
    return None

run_sync

run_sync(
    agent_name: str, task: TaskSpec, *, request_id: str | None = None
) -> AgentResult[BaseModel]

Blocking variant of :meth:run — wraps the async path in :func:asyncio.run. Cannot be called from inside a running event loop.

Source code in src/murmur/client/local.py
def run_sync(
    self,
    agent_name: str,
    task: TaskSpec,
    *,
    request_id: str | None = None,
) -> AgentResult[BaseModel]:
    """Blocking variant of :meth:`run` — wraps the async path in
    :func:`asyncio.run`. **Cannot be called from inside a running
    event loop**.
    """
    reject_if_in_event_loop("LocalClient.run_sync")
    return asyncio.run(self.run(agent_name, task, request_id=request_id))

Run

Long-lived run handle returned by client.submit().

Run

Run(*, client: _RunBackend, run_id: str, target: str)

Handle for an asynchronously-dispatched run.

Backed by either :class:MurmurClient (HTTP) or :class:murmur_client.LocalClient (in-process); the four _status / _result / _cancel / _stream hooks satisfy a shared Protocol.

Source code in packages/murmur-client/src/murmur_client/client.py
def __init__(self, *, client: _RunBackend, run_id: str, target: str) -> None:
    self._client = client
    self._run_id = run_id
    self._target = target