Skip to content

Server

HTTP-facing surface for hosted agents. AgentServer is the standalone form (murmur serve); AgentRouter is the embedded form mounted on a user-supplied FastAPI app.

from murmur.server import AgentRouter, AgentServer, ErrorResponse

The [server] extra is required for these imports — see Installation.

AgentServer

AgentServer

AgentServer(
    *,
    runtime: AgentRuntime | None = None,
    run_store: RunStore | None = None,
    drain_timeout: float = 30.0,
    sse_emitter: SSEEventEmitter | None = None,
    dashboard_dir: Path | None = None,
    event_store: EventStore | None = None,
    auth_token: str | None = None,
)

Registers agents / groups and serves them via FastAPI.

server = AgentServer() server.register(my_agent) server.register_group(my_crew) await server.serve(port=8421)

For tests, build the app via :meth:app and drive it with httpx.AsyncClient(transport=httpx.ASGITransport(app=server.app)).

Source code in src/murmur/server/app.py
def __init__(
    self,
    *,
    runtime: AgentRuntime | None = None,
    run_store: RunStore | None = None,
    drain_timeout: float = 30.0,
    sse_emitter: SSEEventEmitter | None = None,
    dashboard_dir: Path | None = None,
    event_store: EventStore | None = None,
    auth_token: str | None = None,
) -> None:
    from murmur.runtime import AgentRuntime as _AgentRuntime

    # Static bearer-token guard. ``None`` (default) means anonymous access
    # — preserves the existing behaviour and aligns with the "exposure
    # surfaces opt-in" rule. When set, every non-health route on the HTTP
    # app and the MCP HTTP transport requires an
    # ``Authorization: Bearer <token>`` header that matches.
    self._auth_token: str | None = auth_token

    self._runtime: AgentRuntime = runtime or _AgentRuntime()
    self._run_store: RunStore = run_store or InMemoryRunStore()
    self._agents: dict[str, Agent] = {}
    self._groups: dict[str, AgentGroup] = {}
    self._drain_timeout = drain_timeout
    # When set, the server adds a ``GET /events/stream`` route streaming
    # every :class:`RuntimeEvent` enqueued onto the emitter to connected
    # SSE subscribers. The caller is responsible for wiring this same
    # emitter into ``runtime.event_emitter`` (typically via
    # :class:`MultiEventEmitter`) so events actually land here. Left
    # ``None`` to opt out — embedded mounts that don't want a public
    # event firehose just leave this off.
    self._sse_emitter: SSEEventEmitter | None = sse_emitter
    # Opt-in static-bundle mount for the read-only dashboard. Off by
    # default — operators must pass an explicit directory containing
    # ``index.html`` to expose it.
    self._dashboard_dir: Path | None = dashboard_dir
    # Optional persistent EventStore. When set, the server adds
    # ``GET /runs``, ``GET /runs/{trace_id}``, and ``GET /events``
    # routes that read from the store.
    self._event_store: EventStore | None = event_store
    # MCP exposure is opt-in at two levels: ``register_mcp`` enrolls a
    # specific agent; ``serve_mcp`` activates the surface. ``register``
    # alone (HTTP-only) does NOT touch this dict, so an agent registered
    # for HTTP is invisible to MCP clients unless the operator explicitly
    # opts in.
    self._mcp_enrollments: dict[str, MCPEnrollment] = {}
    self._active_runs: set[str] = set()
    self._shutting_down: bool = False
    self._app: FastAPI = self._build_app()

register

register(agent: Agent) -> None

Register an agent under its agent.name. Replaces by name.

Source code in src/murmur/server/app.py
def register(self, agent: Agent) -> None:
    """Register an agent under its ``agent.name``. Replaces by name."""
    self._agents[agent.name] = agent

register_group

register_group(group: AgentGroup) -> None

Register a group under its group.name.

Source code in src/murmur/server/app.py
def register_group(self, group: AgentGroup) -> None:
    """Register a group under its ``group.name``."""
    # Auto-register the group's agents so /agents/{name} also works.
    for a in group.agents:
        self._agents.setdefault(a.name, a)
    self._groups[group.name] = group

register_mcp

register_mcp(
    agent: Agent, *, tool_name: str | None = None, description: str | None = None
) -> None

Enroll an agent for MCP exposure — opt-in, distinct from :meth:register.

register() makes an agent reachable over HTTP; this method additionally exposes it as an MCP tool that clients (Claude Desktop, Cursor, MCP Inspector, …) call once :meth:serve_mcp is running. Agents registered only with register() stay invisible to MCP clients.

tool_name defaults to agent.name; override when you want a public-facing name distinct from the internal one (e.g. agent "researcher-v3" → tool "research"). description defaults to a truncated agent.instructions and is what the calling LLM reads to decide when to invoke the tool — make it specific.

The agent is also auto-registered for the runtime so the MCP bridge can dispatch via runtime.run without an extra server.register(agent) call. Re-enrolling an agent under the same tool_name replaces the previous entry.

Source code in src/murmur/server/app.py
def register_mcp(
    self,
    agent: Agent,
    *,
    tool_name: str | None = None,
    description: str | None = None,
) -> None:
    """Enroll an agent for MCP exposure — opt-in, distinct from
    :meth:`register`.

    ``register()`` makes an agent reachable over HTTP; this method
    additionally exposes it as an MCP tool that clients (Claude
    Desktop, Cursor, MCP Inspector, …) call once :meth:`serve_mcp`
    is running. Agents registered only with ``register()`` stay
    invisible to MCP clients.

    ``tool_name`` defaults to ``agent.name``; override when you
    want a public-facing name distinct from the internal one
    (e.g. agent ``"researcher-v3"`` → tool ``"research"``).
    ``description`` defaults to a truncated ``agent.instructions``
    and is what the calling LLM reads to decide when to invoke the
    tool — make it specific.

    The agent is also auto-registered for the runtime so the MCP
    bridge can dispatch via ``runtime.run`` without an extra
    ``server.register(agent)`` call. Re-enrolling an agent under
    the same ``tool_name`` replaces the previous entry.
    """
    from murmur.mcp_server import MCPEnrollment

    # Auto-register on the runtime + HTTP map. The bridge needs the
    # agent reachable by name; making the operator manage two
    # registries (HTTP and MCP) for the same physical agent would be
    # an obvious footgun. Per-agent MCP opt-in is preserved because
    # this method is distinct from ``register``.
    self._agents.setdefault(agent.name, agent)

    resolved_tool_name = tool_name if tool_name is not None else agent.name
    resolved_description = (
        description
        if description is not None
        else _summarise_instructions(agent.instructions)
    )
    self._mcp_enrollments[resolved_tool_name] = MCPEnrollment(
        agent=agent,
        tool_name=resolved_tool_name,
        description=resolved_description,
    )

serve_mcp async

serve_mcp(
    *,
    transport: Literal["stdio", "http"] = "stdio",
    server_name: str = "murmur",
    instructions: str | None = None,
    host: str = "127.0.0.1",
    port: int = 8765,
) -> None

Run the MCP server.

Blocks until the transport exits (Ctrl-C for stdio; standard ASGI shutdown for HTTP). Constructs a fresh :class:FastMCP per call so multiple invocations on the same server work cleanly. Raises :class:ImportError with a setup hint if the murmur-runtime[mcp-server] extra isn't installed.

Only agents added via :meth:register_mcp appear as tools. If no agents are enrolled, raises :class:RegistryError rather than silently starting an empty server.

Source code in src/murmur/server/app.py
async def serve_mcp(
    self,
    *,
    transport: Literal["stdio", "http"] = "stdio",
    server_name: str = "murmur",
    instructions: str | None = None,
    host: str = "127.0.0.1",
    port: int = 8765,
) -> None:
    """Run the MCP server.

    Blocks until the transport exits (Ctrl-C for stdio; standard
    ASGI shutdown for HTTP). Constructs a fresh :class:`FastMCP`
    per call so multiple invocations on the same server work
    cleanly. Raises :class:`ImportError` with a setup hint if the
    ``murmur-runtime[mcp-server]`` extra isn't installed.

    Only agents added via :meth:`register_mcp` appear as tools. If
    no agents are enrolled, raises :class:`RegistryError` rather
    than silently starting an empty server.
    """
    if not self._mcp_enrollments:
        raise RegistryError(
            "no agents enrolled for MCP — call register_mcp(agent) "
            "for at least one agent before serve_mcp()"
        )
    # Lazy import — keeps ``import murmur.server`` free of the mcp
    # SDK when the extra isn't installed.
    from murmur.mcp_server._server import serve as _mcp_serve

    await _mcp_serve(
        runtime=self._runtime,
        enrollments=tuple(self._mcp_enrollments.values()),
        transport=transport,
        server_name=server_name,
        instructions=instructions,
        host=host,
        port=port,
        auth_token=self._auth_token,
    )

serve async

serve(port: int = 8421, host: str = '0.0.0.0') -> None

Start the server. Handles SIGTERM / SIGINT for graceful shutdown.

Source code in src/murmur/server/app.py
async def serve(self, port: int = 8421, host: str = "0.0.0.0") -> None:
    """Start the server. Handles SIGTERM / SIGINT for graceful shutdown."""
    import uvicorn

    config = uvicorn.Config(
        self._app, host=host, port=port, log_level="info", lifespan="on"
    )
    server = uvicorn.Server(config)
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        with _suppress_value_error():
            loop.add_signal_handler(sig, lambda: self._initiate_shutdown(server))
    await server.serve()

AgentRouter

AgentRouter

AgentRouter(
    *,
    runtime: AgentRuntime | None = None,
    run_store: RunStore | None = None,
    drain_timeout: float = 30.0,
    server: AgentServer | None = None,
    start_workers: bool = True,
    worker_runtime: AgentRuntime | None = None,
    worker_concurrency: int = 10,
    prefix: str = "",
    tags: list[str] | None = None,
    sse_emitter: SSEEventEmitter | None = None,
)

A FastAPI APIRouter that exposes registered agents and groups.

Constructable standalone or backed by a pre-built :class:AgentServer. Users typically pass a :class:~murmur.AgentRuntime and let the router construct the server internally; advanced users can build the server themselves and pass it in via server= for shared-state scenarios.

Source code in src/murmur/server/router.py
def __init__(
    self,
    *,
    runtime: AgentRuntime | None = None,
    run_store: RunStore | None = None,
    drain_timeout: float = 30.0,
    server: AgentServer | None = None,
    start_workers: bool = True,
    worker_runtime: AgentRuntime | None = None,
    worker_concurrency: int = 10,
    prefix: str = "",
    tags: list[str] | None = None,
    sse_emitter: SSEEventEmitter | None = None,
) -> None:
    if server is not None and (runtime is not None or run_store is not None):
        raise ValueError(
            "pass either `server=` or "
            "(`runtime=`/`run_store=`/`drain_timeout=`) — not both"
        )
    if server is not None and sse_emitter is not None:
        raise ValueError(
            "`sse_emitter=` is for the router-built server; pass it on the "
            "AgentServer you constructed via `server=` instead"
        )

    self._server: AgentServer = server or AgentServer(
        runtime=runtime,
        run_store=run_store,
        drain_timeout=drain_timeout,
        sse_emitter=sse_emitter,
    )
    self._start_workers = start_workers
    # The worker's runtime MUST be in-process — broker-mode would
    # re-publish each consumed task and infinite-loop. ``Worker``
    # defaults to a fresh ``AgentRuntime()`` (in-process) when
    # ``runtime=None`` is forwarded; advanced users override via
    # ``worker_runtime=`` to inject custom tools / middleware /
    # test-model factories without disturbing the publishing runtime.
    self._worker_runtime: AgentRuntime | None = worker_runtime
    self._worker_concurrency = worker_concurrency
    self._worker: Any = None  # lazily built in lifespan when broker-mode

    # FastAPI's ``tags`` accepts ``list[str | Enum]``; we restrict the
    # public surface to ``list[str]`` for simplicity. ``None`` is the
    # default — only forward when the user actually provided one.
    router_kwargs: dict[str, Any] = {
        "prefix": prefix,
        "lifespan": self._lifespan_context,
    }
    if tags is not None:
        router_kwargs["tags"] = list(tags)
    super().__init__(**router_kwargs)

    for route in self._server._build_routes().routes:  # noqa: SLF001
        self.routes.append(route)

server property

server: AgentServer

The backing :class:AgentServer — registry, run-store, drain.

broker property

broker: Any

The underlying FastStream broker (KafkaBroker, NatsBroker, RabbitBroker, RedisBroker) when the runtime is broker-mode, else None.

Use this to register your own @broker.subscriber("user.events") handlers next to Murmur's — they share the same connection and the same lifecycle (started / stopped via the host app's lifespan once 24d lands).

Treated as a documented re-export of FastStream's broker; consult the FastStream docs for its full API. Returns None when the runtime is in-process or when the broker is the in-memory testing broker (which has no FastStream surface).

For "bring your own broker" (the user already has a configured FastStream broker in their app), pass it through to the runtime via the existing AgentRuntime(broker_instance=...) kwarg — wrap the inner broker via :func:murmur.backends._brokers.make_broker (the _fs_broker= constructor seam).

register

register(agent: Agent) -> None

Register an agent. Mirrors :meth:AgentServer.register.

Source code in src/murmur/server/router.py
def register(self, agent: Agent) -> None:
    """Register an agent. Mirrors :meth:`AgentServer.register`."""
    self._server.register(agent)

register_group

register_group(group: AgentGroup) -> None

Register a group. Mirrors :meth:AgentServer.register_group.

Source code in src/murmur/server/router.py
def register_group(self, group: AgentGroup) -> None:
    """Register a group. Mirrors :meth:`AgentServer.register_group`."""
    self._server.register_group(group)

install_exception_handlers classmethod

install_exception_handlers(app: FastAPI) -> None

Attach Murmur's error handlers + request-id middleware to app.

Idempotent on the per-handler level (FastAPI replaces handlers for the same exception type). Call once after :meth:include_router.

Source code in src/murmur/server/router.py
@classmethod
def install_exception_handlers(cls, app: FastAPI) -> None:
    """Attach Murmur's error handlers + request-id middleware to ``app``.

    Idempotent on the per-handler level (FastAPI replaces handlers for the
    same exception type). Call once after :meth:`include_router`.
    """

    @app.middleware("http")
    async def _request_id_middleware(
        request: Request,
        call_next: Callable[[Request], Awaitable[Any]],
    ) -> Any:
        request_id = request.headers.get(_REQUEST_ID_HEADER) or str(uuid.uuid4())
        request.state.request_id = request_id
        structlog.contextvars.bind_contextvars(request_id=request_id)
        try:
            response = await call_next(request)
            response.headers[_REQUEST_ID_HEADER] = request_id
            return response
        finally:
            structlog.contextvars.unbind_contextvars("request_id")

    @app.exception_handler(MurmurError)
    async def _murmur_handler(request: Request, exc: MurmurError) -> JSONResponse:
        request_id = getattr(request.state, "request_id", "")
        return JSONResponse(
            status_code=status_for(exc),
            content=error_to_response(exc, request_id=request_id).model_dump(),
        )

    @app.exception_handler(TimeoutError)
    async def _timeout_handler(request: Request, exc: TimeoutError) -> JSONResponse:
        request_id = getattr(request.state, "request_id", "")
        return JSONResponse(
            status_code=504,
            content=error_to_response(exc, request_id=request_id).model_dump(),
        )

    # silence "defined but unused" — the decorators do the wiring
    _ = (_request_id_middleware, _murmur_handler, _timeout_handler)

ErrorResponse

ErrorResponse

Wire shape returned by the server on any non-2xx response.

error instance-attribute

error: str

Class name of the raised error — e.g. "BudgetExceededError".

message instance-attribute

message: str

Human-readable detail.

agent class-attribute instance-attribute

agent: str | None = None

Which agent failed, if applicable.

task_id class-attribute instance-attribute

task_id: str | None = None

Which task failed, if applicable.

request_id instance-attribute

request_id: str

Always present for correlation across logs / traces.

AgentRouter.install_exception_handlers(app) — classmethod, called once on the host FastAPI app to wire Murmur's domain errors to the HTTP status codes in server/errors.py.