Interop
Migration adapters between Murmur and the underlying libraries. Under the
public API rule, only this package may import pydantic_ai or faststream
directly.
See the migration guides for worked examples.
from_pydantic_ai
from_pydantic_ai(
    pydantic_ai_agent: Agent,
    *,
    name: str,
    output_type: type[BaseModel],
    trust_level: TrustLevel = MEDIUM,
    model: str | None = None,
    instructions: str | None = None,
) -> Agent
Wrap an existing PydanticAI Agent in a Murmur Agent.
Extracts model (as "{system}:{model_name}") and instructions
from the PydanticAI agent's internals. You can override either via the
matching keyword argument if extraction picks up something unhelpful
(notably for TestModel, which has system="test"). output_type is
required: PydanticAI wraps its internal output schema, so Murmur does not
try to recover the user's original Pydantic class. Tools are not extracted;
re-register them directly on Murmur's murmur.tools.ToolRegistry
(they execute through Murmur's policy gate, not the agent's).
    from pydantic_ai import Agent as PAAgent
    from murmur.interop import from_pydantic_ai

    mu_agent = from_pydantic_ai(my_pa_agent, name="researcher", output_type=Finding)
Source code in src/murmur/interop/pydantic_ai.py
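The model-resolution rule described above (build "{system}:{model_name}" from the wrapped agent, but let an explicit model argument win) can be sketched in isolation. This is only an illustration under stated assumptions, not Murmur's actual implementation: StubModel and resolve_model are hypothetical names, and the model strings are placeholder values.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StubModel:
    """Hypothetical stand-in for a model object exposing system and model_name."""

    system: str
    model_name: str


def resolve_model(extracted: StubModel, override: Optional[str] = None) -> str:
    """Return the override if one is given, else "{system}:{model_name}"."""
    if override is not None:
        return override
    return f"{extracted.system}:{extracted.model_name}"


# A regular model resolves to a usable identifier.
print(resolve_model(StubModel(system="openai", model_name="gpt-4o")))

# A test double yields an unhelpful "test:..." string, so an override is passed.
print(resolve_model(StubModel(system="test", model_name="test"), override="openai:gpt-4o"))
```

In terms of the documented signature, the override corresponds to passing model="..." to from_pydantic_ai alongside name and output_type.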
as_faststream_handler
as_faststream_handler(
    agent: Agent, *, runtime: AgentRuntime | None = None
) -> Callable[[TaskSpec], Awaitable[AgentResult[BaseModel]]]
Adapt agent to a FastStream message handler signature.
Returns an async callable (task: TaskSpec) -> AgentResult[BaseModel]
that you can register as a @broker.subscriber("topic") handler
in an existing FastStream application. The agent runs through
runtime; if none is supplied, a fresh AgentRuntime is created (thread
mode, no broker, no tools registered).
For a fuller integration where Murmur owns the broker lifecycle and
workers, mount murmur.server.AgentRouter instead.
    from faststream.kafka import KafkaBroker
    from murmur.interop import as_faststream_handler

    broker = KafkaBroker("localhost:9092")
    handler = as_faststream_handler(my_agent)
    broker.subscriber("research.tasks")(handler)
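The adapter shape described above (take an agent, fall back to a fresh runtime when none is supplied, return an async callable that accepts a task) can be sketched without either library installed. Everything here is a hypothetical stand-in for illustration: StubTask, StubResult, StubAgent, StubRuntime and as_handler are not Murmur or FastStream names, and the real AgentRuntime interface may look different.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class StubTask:
    """Hypothetical stand-in for TaskSpec."""

    prompt: str


@dataclass
class StubResult:
    """Hypothetical stand-in for AgentResult."""

    output: str


@dataclass
class StubAgent:
    """Hypothetical stand-in for a Murmur Agent."""

    name: str


class StubRuntime:
    """Hypothetical runtime; it only echoes what it was asked to run."""

    async def run(self, agent: StubAgent, task: StubTask) -> StubResult:
        return StubResult(output=f"{agent.name} handled: {task.prompt}")


def as_handler(
    agent: StubAgent, *, runtime: Optional[StubRuntime] = None
) -> Callable[[StubTask], Awaitable[StubResult]]:
    """Close over the agent and runtime and return an async handler."""
    # Fall back to a fresh runtime when the caller does not supply one.
    active_runtime = runtime if runtime is not None else StubRuntime()

    async def handler(task: StubTask) -> StubResult:
        return await active_runtime.run(agent, task)

    return handler


handler = as_handler(StubAgent(name="researcher"))
result = asyncio.run(handler(StubTask(prompt="summarize the report")))
print(result.output)
```

Because the returned handler is a plain async callable taking one message argument, it can be handed to a subscriber decorator the same way as in the FastStream snippet above.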