Tools

Runtime-proxied callables exposed to agents under policy.

from murmur.tools import (
    StaticToolProvider,
    ToolExecutor,
    ToolFunc,
    ToolRegistry,
    mcp_http,
    mcp_sse,
    mcp_stdio,
)
from murmur.tools import (
    AbstractBuiltinTool,
    CodeExecutionTool,
    FileSearchTool,
    ImageGenerationTool,
    MCPServerTool,
    MemoryTool,
    WebFetchTool,
    WebSearchTool,
    XSearchTool,
)

Registry

ToolRegistry

ToolRegistry()

In-memory map of tool name → callable. Not a pluggable; data store only.

Source code in src/murmur/tools/registry.py
def __init__(self) -> None:
    # ``Any`` here is genuinely correct, not a workaround: the registry is
    # heterogeneous by design (one slot holds ``ToolFunc[str]``, the next
    # ``ToolFunc[MyModel]``). There's no single ``T`` that fits the dict.
    self._tools: dict[str, ToolFunc[Any]] = {}
register
register(name: str, func: ToolFunc[T]) -> None

Register a tool under name, preserving its return type at the call site.

The method is generic over T so a caller passing ToolFunc[str] keeps the typed view in their own code. The registry erases T to Any on storage (see __init__); retrieval via get returns ToolFunc[Any] because no caller can know the original T at lookup time.

Source code in src/murmur/tools/registry.py
def register(self, name: str, func: ToolFunc[T]) -> None:
    """Register a tool under ``name``, preserving its return type at the call site.

    The method is generic over ``T`` so a caller passing ``ToolFunc[str]``
    keeps the typed view in their own code. The registry erases ``T`` to
    ``Any`` on storage (see ``__init__``); retrieval via :meth:`get`
    returns ``ToolFunc[Any]`` because no caller can know the original
    ``T`` at lookup time.
    """
    if name in self._tools:
        raise RegistryError(f"tool '{name}' is already registered")
    self._tools[name] = func
unregister
unregister(name: str) -> None

Remove name from the registry. Idempotent — silent on miss.

Used for short-lived tool registrations (e.g. AgentTeam's per-run delegate tool) where the registration scope is one runtime.run_group(team, ...) call rather than the runtime's lifetime.

Source code in src/murmur/tools/registry.py
def unregister(self, name: str) -> None:
    """Remove ``name`` from the registry. Idempotent — silent on miss.

    Used for short-lived tool registrations (e.g. ``AgentTeam``'s
    per-run ``delegate`` tool) where the registration scope is one
    ``runtime.run_group(team, ...)`` call rather than the runtime's
    lifetime.
    """
    self._tools.pop(name, None)
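
The register and unregister semantics above can be sketched in isolation. The block below is a minimal stand-in, not the murmur implementation: SketchRegistry, the local RegistryError, and the echo tool are hypothetical names introduced so the example runs without murmur installed.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

# Stand-in alias so the sketch runs without murmur installed.
ToolFunc = Callable[..., Awaitable[Any]]


class RegistryError(Exception):
    """Stand-in for murmur's RegistryError."""


class SketchRegistry:
    """Mirrors the register/unregister behaviour shown in the source above."""

    def __init__(self) -> None:
        self._tools: dict[str, ToolFunc] = {}

    def register(self, name: str, func: ToolFunc) -> None:
        if name in self._tools:
            raise RegistryError(f"tool '{name}' is already registered")
        self._tools[name] = func

    def unregister(self, name: str) -> None:
        self._tools.pop(name, None)  # silent on miss


async def echo(text: str) -> str:
    return text


registry = SketchRegistry()
registry.register("echo", echo)

try:
    registry.register("echo", echo)  # duplicate names are rejected
except RegistryError as exc:
    duplicate_error = str(exc)

registry.unregister("echo")
registry.unregister("echo")  # second call is a no-op
registry.register("echo", echo)  # the name is free again
```

Because unregister is idempotent, a short-lived registration can be cleaned up in a finally block without first checking whether the name is still present.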

StaticToolProvider

StaticToolProvider(allowed: frozenset[str])

Fixed allow-list ToolProvider.

Source code in src/murmur/tools/registry.py
def __init__(self, allowed: frozenset[str]) -> None:
    self._allowed = allowed

ToolFunc

ToolFunc module-attribute

ToolFunc = Callable[..., Awaitable[T]]

Tool callable parameterised by its result type.

Use bare ToolFunc (or explicitly ToolFunc[Any]) when you don't care about the result type — the registry stores tools heterogeneously and returns ToolFunc[Any]. Use ToolFunc[MyType] in user code to keep the return-type information visible to type checkers when the caller knows the shape of the tool's output.

async def web_search(query: str) -> str: ...

typed: ToolFunc[str] = web_search  # ty narrows to Awaitable[str]
registry.register("web_search", typed)  # registry erases T to Any
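
To make the typed versus erased views concrete, here is a small runnable sketch. It redefines the alias locally with the same shape as the one documented above, so it does not import murmur; the web_search body is a placeholder.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any, TypeVar

T = TypeVar("T")

# Same shape as the documented alias, redefined locally for the sketch.
ToolFunc = Callable[..., Awaitable[T]]


async def web_search(query: str) -> str:
    return f"results for {query}"  # placeholder body


# Typed view: a checker knows that awaiting this yields ``str``.
typed: ToolFunc[str] = web_search

# Erased view, as a heterogeneous store would hold it.
erased: ToolFunc[Any] = typed


async def main() -> tuple[str, Any]:
    precise = await typed(query="murmur")  # ``str`` to a type checker
    loose = await erased(query="murmur")  # ``Any`` to a type checker
    return precise, loose


precise, loose = asyncio.run(main())
```

The two calls behave identically at runtime; the difference is only in what a static type checker can infer about the awaited value.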

Executor

ToolExecutor

ToolExecutor(registry: ToolRegistry, *, event_emitter: EventEmitter | None = None)

Policy-aware tool dispatcher.

Source code in src/murmur/tools/executor.py
def __init__(
    self,
    registry: ToolRegistry,
    *,
    event_emitter: EventEmitter | None = None,
) -> None:
    self._registry = registry
    # Default emitter forwards every event to structlog with the same
    # event names previously used by direct ``log.ainfo`` calls — so
    # callers asserting on ``capture_logs()`` keep working without
    # opting into the event API.
    self._emitter: EventEmitter = event_emitter or LogEventEmitter()
registry property
registry: ToolRegistry

The ToolRegistry this executor consults at execute time.

Exposed as a public read-only attribute so callers that pass a custom executor can pin runtime/backend registry identity against this value. This keeps the registry seen on the agent-build path aligned with the one used for execution-time fall-through.

execute async
execute(
    *,
    agent_name: str,
    task_id: str,
    trust_level: TrustLevel,
    allowed: frozenset[str],
    name: str,
    args: dict[str, object],
    trace_id: str | None = None,
    external_call: Callable[..., Awaitable[object]] | None = None,
    low_trust_overrides: frozenset[str] = frozenset(),
) -> object

Apply policy, emit lifecycle events, dispatch.

external_call is the escape hatch for tools that don't live in the local ToolRegistry. Currently that means MCP-discovered tools, where the callable closes over the originating provider plus the tool name. The same trust, allow-list, and lifecycle-event gate runs regardless of which dispatch path the call takes.

low_trust_overrides is the per-call extension to _READ_ONLY_TOOLS. MCP providers pass their explicit allow list here so a user who opts a tool into LOW trust at the provider level isn't blocked by the global read-only set (which only knows about native tools).

trace_id is forwarded into the emitted RuntimeEvent. When None (the default for unwired callers), task_id substitutes; a tool call without a task lineage is rare but possible in tests.

Source code in src/murmur/tools/executor.py
async def execute(
    self,
    *,
    agent_name: str,
    task_id: str,
    trust_level: TrustLevel,
    allowed: frozenset[str],
    name: str,
    args: dict[str, object],
    trace_id: str | None = None,
    external_call: Callable[..., Awaitable[object]] | None = None,
    low_trust_overrides: frozenset[str] = frozenset(),
) -> object:
    """Apply policy, emit lifecycle events, dispatch.

    ``external_call`` is the escape hatch for tools that don't live in the
    local :class:`ToolRegistry` — currently MCP-discovered tools, where the
    callable closes over the originating provider plus the tool name. The
    same trust + allow-list + lifecycle-event gate runs regardless of which
    dispatch path the call takes.

    ``low_trust_overrides`` is the per-call extension to
    :data:`_READ_ONLY_TOOLS`. MCP providers pass their explicit ``allow``
    list here so a user who opts a tool into ``LOW`` trust at the provider
    level isn't blocked by the global read-only set (which only knows
    about native tools).

    ``trace_id`` is forwarded into the emitted :class:`RuntimeEvent`. When
    ``None`` (the default for unwired callers), ``task_id`` substitutes —
    a tool call without a task lineage is rare but possible in tests.
    """
    if trust_level is TrustLevel.SANDBOX:
        raise TrustViolationError(
            f"agent '{agent_name}' has SANDBOX trust — no tools permitted"
        )
    if trust_level is TrustLevel.LOW and name not in (
        _READ_ONLY_TOOLS | low_trust_overrides
    ):
        raise TrustViolationError(
            f"agent '{agent_name}' has LOW trust — '{name}' is not read-only"
        )
    if name not in allowed:
        raise TrustViolationError(
            f"tool '{name}' is not in the allow-list for agent '{agent_name}'"
        )

    func: Callable[..., Awaitable[object]] = (
        external_call if external_call is not None else self._registry.get(name)
    )

    effective_trace_id = trace_id if trace_id is not None else task_id

    await self._emitter.emit(
        RuntimeEvent(
            event_type=EventType.TOOL_CALL_STARTED,
            agent_name=agent_name,
            task_id=task_id,
            trace_id=effective_trace_id,
            payload={"tool_name": name, "trust_level": trust_level.value},
        )
    )
    started = time.perf_counter()
    try:
        result = await func(**args)
    except Exception as exc:
        duration_ms = int((time.perf_counter() - started) * 1000)
        await self._emitter.emit(
            RuntimeEvent(
                event_type=EventType.TOOL_CALL_FAILED,
                agent_name=agent_name,
                task_id=task_id,
                trace_id=effective_trace_id,
                payload={
                    "tool_name": name,
                    "error": str(exc),
                    "duration_ms": duration_ms,
                },
            )
        )
        raise ToolExecutionError(f"tool '{name}' failed: {exc}") from exc

    duration_ms = int((time.perf_counter() - started) * 1000)
    # tokens_used is per-tool LLM cost attribution. The tool function
    # itself doesn't consume tokens — its cost belongs to the LLM
    # round-trip that triggered the call. Until the agent loop reports
    # a delta, this is best-effort 0 so the dashboard's per-tool
    # latency panel still has the field present.
    await self._emitter.emit(
        RuntimeEvent(
            event_type=EventType.TOOL_CALL_COMPLETED,
            agent_name=agent_name,
            task_id=task_id,
            trace_id=effective_trace_id,
            payload={
                "tool_name": name,
                "duration_ms": duration_ms,
                "tokens_used": 0,
            },
        )
    )
    return result
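
The order of the three policy checks in execute can be exercised on its own. The following is a simplified sketch rather than the executor itself: the STANDARD trust level and the contents of READ_ONLY_TOOLS are hypothetical, since the source above shows only SANDBOX and LOW and does not list the real _READ_ONLY_TOOLS set.

```python
from enum import Enum


class TrustLevel(Enum):
    # SANDBOX and LOW appear in the source; STANDARD is a hypothetical
    # stand-in for any higher level.
    SANDBOX = "sandbox"
    LOW = "low"
    STANDARD = "standard"


class TrustViolationError(Exception):
    """Stand-in for murmur's TrustViolationError."""


# Hypothetical contents; the real _READ_ONLY_TOOLS set is not shown above.
READ_ONLY_TOOLS = frozenset({"web_search"})


def check_policy(
    *,
    trust_level: TrustLevel,
    allowed: frozenset[str],
    name: str,
    low_trust_overrides: frozenset[str] = frozenset(),
) -> None:
    """Raise TrustViolationError unless the call passes all three gates."""
    if trust_level is TrustLevel.SANDBOX:
        raise TrustViolationError("SANDBOX trust: no tools permitted")
    if trust_level is TrustLevel.LOW and name not in (
        READ_ONLY_TOOLS | low_trust_overrides
    ):
        raise TrustViolationError(f"LOW trust: '{name}' is not read-only")
    if name not in allowed:
        raise TrustViolationError(f"'{name}' is not in the allow-list")


def is_permitted(**kwargs) -> bool:
    """Convenience wrapper that turns the raise into a boolean."""
    try:
        check_policy(**kwargs)
    except TrustViolationError:
        return False
    return True
```

Note that low_trust_overrides only widens the read-only gate; the tool must still appear in allowed to pass the final check.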

MCP factories

Construct ToolsetProvider instances backed by the three MCP transports. See the MCP concept page for trust matrix, prefixing, and lifecycle modes.

mcp_stdio

mcp_stdio(
    command: str,
    args: Sequence[str] = (),
    *,
    env: Mapping[str, str] | None = None,
    cwd: str | Path | None = None,
    allow: Sequence[str] | None = None,
    prefix: str | None = None,
) -> MCPToolsetProvider

Build an MCPToolsetProvider over a stdio MCP server subprocess.

The subprocess is spawned lazily on the first MCPToolsetProvider.start call (the runtime owns lifecycle).

Parameters:

command (str): Executable to run (e.g. "npx", "uv").

args (Sequence[str], default ()): Positional arguments passed to command.

env (Mapping[str, str] | None, default None): Environment variables for the child process. None (default) means the child gets no environment, matching PydanticAI's default; pass dict(os.environ) to inherit the parent's.

cwd (str | Path | None, default None): Working directory for the child process.

allow (Sequence[str] | None, default None): Explicit tool allow-list. See MCPToolsetProvider for trust-level semantics. Allow-list entries match the prefixed tool name when prefix is set.

prefix (str | None, default None): Optional namespace prefix prepended to every tool name the server reports, e.g. prefix="git_" turns read_file into git_read_file. Lets two MCP servers exposing same-named tools coexist on one agent. Forwarded to PydanticAI's MCPServerStdio(tool_prefix=...).

Source code in src/murmur/tools/mcp.py
def mcp_stdio(
    command: str,
    args: Sequence[str] = (),
    *,
    env: Mapping[str, str] | None = None,
    cwd: str | Path | None = None,
    allow: Sequence[str] | None = None,
    prefix: str | None = None,
) -> MCPToolsetProvider:
    """Build an :class:`MCPToolsetProvider` over a stdio MCP server subprocess.

    The subprocess is spawned lazily on the first
    :meth:`MCPToolsetProvider.start` call (the runtime owns lifecycle).

    Args:
        command: Executable to run (e.g. ``"npx"``, ``"uv"``).
        args: Positional arguments passed to ``command``.
        env: Environment variables for the child process. ``None`` (default)
            means the child gets *no* environment, matching PydanticAI's
            default — pass ``dict(os.environ)`` to inherit the parent's.
        cwd: Working directory for the child process.
        allow: Explicit tool allow-list. See
            :class:`MCPToolsetProvider` for trust-level semantics. Allow-list
            entries match the *prefixed* tool name when ``prefix`` is set.
        prefix: Optional namespace prefix prepended to every tool name the
            server reports — e.g. ``prefix="git_"`` turns ``read_file`` into
            ``git_read_file``. Lets two MCP servers exposing same-named
            tools coexist on one agent. Forwarded to PydanticAI's
            ``MCPServerStdio(tool_prefix=...)``.
    """
    return MCPToolsetProvider(
        MCPServerStdio(
            command=command,
            args=list(args),
            env=dict(env) if env is not None else None,
            cwd=cwd,
            tool_prefix=prefix,
        ),
        allow=allow,
    )

mcp_http

mcp_http(
    url: str,
    *,
    headers: Mapping[str, str] | None = None,
    allow: Sequence[str] | None = None,
    prefix: str | None = None,
) -> MCPToolsetProvider

Build an MCPToolsetProvider over a Streamable-HTTP MCP server.

Used for newer MCP servers that speak the streamable-HTTP transport. Use mcp_sse for legacy SSE-only servers. allow is the optional tool allow-list; see MCPToolsetProvider. prefix namespaces every reported tool name (forwarded to PydanticAI's MCPServerStreamableHTTP(tool_prefix=...)) so two HTTP MCP servers with overlapping tool names can coexist on one agent.

Source code in src/murmur/tools/mcp.py
def mcp_http(
    url: str,
    *,
    headers: Mapping[str, str] | None = None,
    allow: Sequence[str] | None = None,
    prefix: str | None = None,
) -> MCPToolsetProvider:
    """Build an :class:`MCPToolsetProvider` over a Streamable-HTTP MCP server.

    Used for newer MCP servers that speak the streamable-HTTP transport.
    Use :func:`mcp_sse` for legacy SSE-only servers. ``allow`` is the
    optional tool allow-list — see :class:`MCPToolsetProvider`. ``prefix``
    namespaces every reported tool name (forwarded to PydanticAI's
    ``MCPServerStreamableHTTP(tool_prefix=...)``) so two HTTP MCP servers
    with overlapping tool names can coexist on one agent.
    """
    return MCPToolsetProvider(
        MCPServerStreamableHTTP(
            url=url,
            headers=dict(headers) if headers is not None else None,
            tool_prefix=prefix,
        ),
        allow=allow,
    )

mcp_sse

mcp_sse(
    url: str,
    *,
    headers: Mapping[str, str] | None = None,
    allow: Sequence[str] | None = None,
    prefix: str | None = None,
) -> MCPToolsetProvider

Build an MCPToolsetProvider over a Server-Sent-Events MCP server.

allow is the optional tool allow-list; see MCPToolsetProvider. prefix namespaces every reported tool name (forwarded to MCPServerSSE(tool_prefix=...)).

Source code in src/murmur/tools/mcp.py
def mcp_sse(
    url: str,
    *,
    headers: Mapping[str, str] | None = None,
    allow: Sequence[str] | None = None,
    prefix: str | None = None,
) -> MCPToolsetProvider:
    """Build an :class:`MCPToolsetProvider` over a Server-Sent-Events MCP server.

    ``allow`` is the optional tool allow-list — see :class:`MCPToolsetProvider`.
    ``prefix`` namespaces every reported tool name (forwarded to
    ``MCPServerSSE(tool_prefix=...)``).
    """
    return MCPToolsetProvider(
        MCPServerSSE(
            url=url,
            headers=dict(headers) if headers is not None else None,
            tool_prefix=prefix,
        ),
        allow=allow,
    )

Dynamic fan-out

Factory + value types for the LLM-callable spawn_agents tool — see the Agents concept page for the end-to-end usage pattern.

make_spawn_agents_tool

make_spawn_agents_tool(
    *,
    runtime: AgentRuntime,
    template: AgentTemplate,
    output_type: type[BaseModel],
    max_concurrency: int = 10,
) -> Callable[[list[SpawnSpec]], Awaitable[list[SpawnResult]]]

Build an LLM-callable tool that spawns child agents under template.

Parameters:

runtime (AgentRuntime): The runtime that will dispatch the children. Children are run via runtime.run with the same backend / event emitter / cost budget as any direct call.

template (AgentTemplate): Bounds what the LLM can spawn. Trust level, model, and tool surface come from the template; the LLM cannot escalate.

output_type (type[BaseModel]): Output type shared by every child this tool spawns. (Per-child output types, mode B, are a future extension; this factory enforces a single shape today.)

max_concurrency (int, default 10): Cap on simultaneous in-flight children.

Returns:

Callable[[list[SpawnSpec]], Awaitable[list[SpawnResult]]]: An async callable suitable for registration on a ToolRegistry (the runtime's runtime.tools.register(...)) and forwarding to the parent agent's tools= set.

The returned callable's signature is (specs: list[SpawnSpec]) -> list[SpawnResult] — PydanticAI's schema introspection turns that into the tool's JSON schema for the LLM.

Source code in src/murmur/tools/spawn.py
def make_spawn_agents_tool(
    *,
    runtime: AgentRuntime,
    template: AgentTemplate,
    output_type: type[BaseModel],
    max_concurrency: int = 10,
) -> Callable[[list[SpawnSpec]], Awaitable[list[SpawnResult]]]:
    """Build an LLM-callable tool that spawns child agents under ``template``.

    Args:
        runtime: The runtime that will dispatch the children. Children are
            run via ``runtime.run`` with the same backend / event emitter /
            cost budget as any direct call.
        template: Bounds what the LLM can spawn. Trust level, model, and
            tool surface come from the template; the LLM cannot escalate.
        output_type: Output type shared by every child this tool spawns.
            (Per-child output types — mode B — are a future extension; this
            factory enforces a single shape today.)
        max_concurrency: Cap on simultaneous in-flight children. Defaults to 10.

    Returns:
        An async callable suitable for registration on a
        :class:`ToolRegistry` (the runtime's ``runtime.tools.register(...)``)
        and forwarding to the parent agent's ``tools=`` set.

    The returned callable's signature is ``(specs: list[SpawnSpec]) ->
    list[SpawnResult]`` — PydanticAI's schema introspection turns that
    into the tool's JSON schema for the LLM.
    """
    if max_concurrency < 1:
        raise ValueError("max_concurrency must be >= 1")

    sem = asyncio.Semaphore(max_concurrency)

    async def _run_one(spec: SpawnSpec) -> SpawnResult:
        async with sem:
            try:
                child = template.agent(
                    name=spec.name,
                    instructions=spec.instructions,
                    output_type=output_type,
                )
            except Exception as exc:
                # Materialisation failure (validation error, missing model,
                # etc.) — treat as a child failure rather than propagating.
                return SpawnResult(name=spec.name, success=False, error=str(exc))
            try:
                result = await runtime.run(child, TaskSpec(input=spec.input))
            except Exception as exc:
                return SpawnResult(name=spec.name, success=False, error=str(exc))
            if result.is_ok() and result.output is not None:
                return SpawnResult(
                    name=spec.name,
                    success=True,
                    output=result.output.model_dump(),
                )
            error_msg = (
                str(result.error) if result.error is not None else "unknown failure"
            )
            return SpawnResult(name=spec.name, success=False, error=error_msg)

    async def spawn_agents(specs: list[SpawnSpec]) -> list[SpawnResult]:
        """Spawn child agents in parallel under the bound template.

        Each spec materialises a child via the template (inheriting trust
        level, model, tool surface, etc.) and dispatches it through the
        runtime. Returns one :class:`SpawnResult` per spec, in order, with
        per-child failures captured rather than raised.
        """
        if not specs:
            return []
        return await asyncio.gather(*(_run_one(s) for s in specs))

    return spawn_agents

SpawnSpec

One child to spawn — the shape the LLM picks per fan-out slot.

SpawnResult

One child's outcome — what the parent agent consumes.

name instance-attribute
name: str

The child's SpawnSpec.name, echoed back so the parent can correlate without tracking order.

success instance-attribute
success: bool

True when the child returned a validated output; False on spawn / dispatch / validation failure.

output class-attribute instance-attribute
output: dict[str, Any] | None = None

The child's output, dumped via Pydantic's model_dump(). None when success is False. Any here covers the heterogeneous payload shapes a child's output_type may produce — typing is statically restored by the parent's downstream handling.

error class-attribute instance-attribute
error: str | None = None

Stringified failure cause when success is False.

Built-in / provider-side tools

These are PydanticAI's AbstractBuiltinTool subclasses, re-exported from murmur.tools so users never import pydantic_ai. They execute on the LLM provider's infrastructure — Anthropic web search, OpenAI code execution, Gemini file search, etc. — and bypass the Murmur ToolExecutor by design. See Tools — built-in / provider-side for the executor-bypass caveat.

| Class | Provider | Notes |
| --- | --- | --- |
| WebSearchTool | Anthropic, OpenAI, Gemini, Groq | Native web search; takes max_uses and allowed_domains. |
| WebFetchTool | Anthropic | Fetch a URL and add to the conversation; takes max_uses. |
| CodeExecutionTool | Anthropic, OpenAI, Gemini, Groq | Provider-side sandboxed Python execution. |
| FileSearchTool | OpenAI, Gemini | Search uploaded files / vector store. |
| ImageGenerationTool | OpenAI, Gemini, Groq | Generate images inline. |
| MemoryTool | Anthropic | Persistent memory across conversation turns. |
| MCPServerTool | OpenAI | Provider-managed MCP servers (distinct from Murmur's MCP consume side). |
| XSearchTool | Grok / xAI | X (Twitter) search. |
| AbstractBuiltinTool | | Base class. Use the concrete subclasses; this is the common type for Agent.builtin_tools. |

from murmur import Agent
from murmur.tools import WebSearchTool, CodeExecutionTool

agent = Agent(
    name="researcher",
    model="anthropic:claude-sonnet-4-6",
    instructions="...",
    output_type=Out,  # Out: your output model, defined elsewhere
    builtin_tools=(
        WebSearchTool(max_uses=10),
        CodeExecutionTool(),
    ),
)

For the per-class kwargs, see PydanticAI's built-in tools docs.