Skip to content

Interop

Migration adapters between Murmur and the underlying libraries. Only this package may import pydantic_ai or faststream directly — the public API rule.

from murmur.interop import as_faststream_handler, from_pydantic_ai

See the migration guides for worked examples:

from_pydantic_ai

from_pydantic_ai

from_pydantic_ai(
    pydantic_ai_agent: Agent,
    *,
    name: str,
    output_type: type[BaseModel],
    trust_level: TrustLevel = MEDIUM,
    model: str | None = None,
    instructions: str | None = None,
) -> Agent

Wrap an existing PydanticAI `Agent` into a Murmur :class:`Agent`.

Extracts `model` (as `"{system}:{model_name}"`) and `instructions` from the PydanticAI agent's internals; you can override either via the matching kwarg if extraction picks up something unhelpful (notably for `TestModel`, which has `system="test"`). `output_type` is required — PydanticAI's internal output schema is wrapped, so we don't try to excavate the user's original Pydantic class. Tools are not extracted — re-register them on Murmur's :class:`murmur.tools.ToolRegistry` directly (they execute through Murmur's policy gate, not the agent's).

>>> from pydantic_ai import Agent as PAAgent
>>> from murmur.interop import from_pydantic_ai
>>> mu_agent = from_pydantic_ai(my_pa_agent, name="researcher", output_type=Finding)

Source code in src/murmur/interop/pydantic_ai.py
def from_pydantic_ai(
    pydantic_ai_agent: PydanticAIAgent,
    *,
    name: str,
    output_type: type[BaseModel],
    trust_level: TrustLevel = TrustLevel.MEDIUM,
    model: str | None = None,
    instructions: str | None = None,
) -> Agent:
    """Build a Murmur :class:`Agent` from an existing PydanticAI ``Agent``.

    ``model`` and ``instructions`` are read from the PydanticAI agent's
    private attributes unless the matching keyword argument is given, in
    which case the keyword value is used verbatim and the corresponding
    private attribute is never touched. Overriding is useful when the
    extracted value is unhelpful (notably for ``TestModel``, which has
    ``system="test"``).

    Model resolution: a string model is used as-is; any other object is
    rendered as ``"{system}:{model_name}"``.

    Instruction resolution: a string is used as-is; for a list, only its
    string items are kept and joined with a single space; anything else
    becomes the empty string.

    ``output_type`` is required — PydanticAI's internal output schema is
    wrapped, so no attempt is made to recover the user's original Pydantic
    class. Tools are not carried over either — re-register them on Murmur's
    :class:`murmur.tools.ToolRegistry` directly (they execute through
    Murmur's policy gate, not the agent's).

    :raises ValueError: if ``model`` is not supplied and the agent's model
        object lacks ``system`` or ``model_name``.

    >>> from pydantic_ai import Agent as PAAgent
    >>> from murmur.interop import from_pydantic_ai
    >>> mu_agent = from_pydantic_ai(my_pa_agent, name="researcher", output_type=Finding)
    """
    # --- model spec -------------------------------------------------------
    resolved_model: str
    if model is None:
        # ``_model`` is typed as ``Model | str | None``; going through
        # ``Any`` lets us branch on the runtime shape without the type
        # checker objecting. This adapter is the one sanctioned place that
        # reaches into PydanticAI internals.
        source_model = cast("Any", pydantic_ai_agent._model)  # noqa: SLF001 — sanctioned
        if not isinstance(source_model, str):
            try:
                resolved_model = f"{source_model.system}:{source_model.model_name}"
            except AttributeError as exc:  # pragma: no cover — pa-version drift
                raise ValueError(
                    "Could not extract model spec from the PydanticAI agent — "
                    "pass `model='provider:name'` explicitly."
                ) from exc
        else:
            resolved_model = source_model
    else:
        resolved_model = model

    # --- instructions -----------------------------------------------------
    resolved_instructions: str
    if instructions is None:
        source_instructions: Any = pydantic_ai_agent._instructions  # noqa: SLF001
        if isinstance(source_instructions, str):
            resolved_instructions = source_instructions
        elif isinstance(source_instructions, list):
            # Non-string entries are dropped; joining an empty selection
            # yields "".
            resolved_instructions = " ".join(
                item for item in source_instructions if isinstance(item, str)
            )
        else:
            resolved_instructions = ""
    else:
        resolved_instructions = instructions

    return Agent(
        name=name,
        model=resolved_model,
        instructions=resolved_instructions,
        output_type=output_type,
        trust_level=trust_level,
    )

as_faststream_handler

as_faststream_handler

as_faststream_handler(
    agent: Agent, *, runtime: AgentRuntime | None = None
) -> Callable[[TaskSpec], Awaitable[AgentResult[BaseModel]]]

Adapt agent to a FastStream message handler signature.

Returns an async callable `(task: TaskSpec) -> AgentResult[BaseModel]` that the user can register as a `@broker.subscriber("topic")` handler in their existing FastStream application. The agent runs through `runtime` (a fresh :class:`AgentRuntime` if not supplied — thread mode, no broker, no tools registered).

For a fuller integration where Murmur owns broker lifecycle and workers, mount :class:`murmur.server.AgentRouter` instead.

>>> from faststream.kafka import KafkaBroker
>>> from murmur.interop import as_faststream_handler
>>> broker = KafkaBroker("localhost:9092")
>>> handler = as_faststream_handler(my_agent)
>>> broker.subscriber("research.tasks")(handler)

Source code in src/murmur/interop/faststream.py
def as_faststream_handler(
    agent: Agent,
    *,
    runtime: AgentRuntime | None = None,
) -> Callable[[TaskSpec], Awaitable[AgentResult[BaseModel]]]:
    """Expose ``agent`` as a coroutine usable as a FastStream handler.

    The returned async callable has the shape
    ``(task: TaskSpec) -> AgentResult[BaseModel]`` and can be registered as
    a ``@broker.subscriber("topic")`` handler in an existing FastStream
    application. Each call awaits ``runtime.run(agent, task)`` and returns
    its result.

    When ``runtime`` is omitted, one :class:`AgentRuntime` is constructed
    with no arguments at adaptation time (thread mode, no broker, no tools
    registered) and shared by every call of the returned handler.

    For a fuller integration where Murmur owns broker lifecycle and
    workers, mount :class:`murmur.server.AgentRouter` instead.

    >>> from faststream.kafka import KafkaBroker
    >>> from murmur.interop import as_faststream_handler
    >>> broker = KafkaBroker("localhost:9092")
    >>> handler = as_faststream_handler(my_agent)
    >>> broker.subscriber("research.tasks")(handler)
    """
    # Imported at call time rather than module import time: the module
    # stays importable before the runtime subpackage has been touched, and
    # a ``murmur.runtime`` → ``murmur.interop`` import cycle (which doesn't
    # exist today, but we want to keep optional) stays harmless.
    from murmur.runtime import AgentRuntime as _AgentRuntime

    active_runtime: AgentRuntime
    if runtime is None:
        active_runtime = _AgentRuntime()
    else:
        active_runtime = runtime

    # Closure over ``agent`` and the chosen runtime; this is what the
    # caller hands to the broker.
    async def handler(task: TaskSpec) -> AgentResult[BaseModel]:
        result = await active_runtime.run(agent, task)
        return result

    return handler