Migrating from FastStream¶
If you already have a FastStream-driven service consuming task messages and producing results, Murmur slots in at the dispatch boundary — keeping your broker semantics untouched while adding the Murmur policy gate, lifecycle hooks, and registry.
Why migrate¶
FastStream gives you broker abstraction across Kafka / NATS / RabbitMQ / Redis. What it doesn't give you for an agent fleet:
- An Agent contract. FastStream subscribers receive raw messages; you build the validation, retry, and tool-execution scaffolding yourself. Murmur ships that.
- Trust enforcement. Same as PydanticAI: FastStream has no notion of per-call policy. Murmur layers ToolExecutor over every dispatch.
- A registry. Murmur's YamlRegistry + InMemoryRegistry resolve agents by name. With FastStream alone you wire each subscriber by hand.
- Embedded mode. AgentRouter mounts on any FastAPI app, so you share auth, middleware, and lifespan with your existing service.
- Observability events. Typed RuntimeEvent emitters fan out to log / SSE / broker bridge sinks without touching the agent code.
You keep FastStream where it earns its keep — the actual broker transport — and add Murmur on top.
Cookbook¶
Replace a FastStream subscriber with a Murmur Worker¶
The most common migration path. Before:
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")

@broker.subscriber("research-tasks")
async def handle_task(message: TaskPayload) -> ResultPayload:
    ...
After:
from murmur import Agent, AgentRuntime
from murmur.worker import Worker

researcher = Agent(
    name="research-minion",
    model="anthropic:claude-sonnet-4-6",
    instructions="...",
    output_type=ResearchFinding,
)

runtime = AgentRuntime()  # in-process internally

worker = Worker(
    runtime=runtime,
    broker="kafka://localhost:9092",
    agents=("research-minion",),
    concurrency=20,
)

await worker.start()
Murmur generates the per-agent task topic and the {agent}.results reply
topic automatically. The wire envelope (TaskMessage / ResultMessage) is
defined in murmur.messages. It carries only primitive fields
(success: bool, output_payload: dict, error_message: str), so the
envelope never has to serialise a generic BaseModel.
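To illustrate why primitive fields keep serialisation simple, here is a minimal standard-library sketch of a result envelope round-tripping through JSON. Only success, output_payload, and error_message come from the text above; the class name ResultEnvelope, the task_id field, and the encode/decode helpers are hypothetical stand-ins, not the definitions in murmur.messages.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class ResultEnvelope:
    """Illustrative stand-in for a result message (not Murmur's real class)."""

    task_id: str  # assumed correlation field, not named in the docs
    success: bool
    output_payload: dict[str, Any] = field(default_factory=dict)
    error_message: str = ""


def encode(envelope: ResultEnvelope) -> bytes:
    # Every field is a JSON-native type, so no custom encoder is needed.
    return json.dumps(asdict(envelope)).encode("utf-8")


def decode(raw: bytes) -> ResultEnvelope:
    return ResultEnvelope(**json.loads(raw.decode("utf-8")))


original = ResultEnvelope(
    task_id="task-1",
    success=True,
    output_payload={"summary": "example finding"},
)
round_tripped = decode(encode(original))
```

Because the payload is a plain dict, the consumer can validate it against the agent's output type after decoding, rather than encoding that type into the envelope itself.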
Expose a Murmur agent as a FastStream handler¶
If you have an existing FastStream broker and want to plug a Murmur agent
into it without running a Worker:
from murmur.interop import as_faststream_handler
handler = as_faststream_handler(agent, runtime=runtime)
broker.subscriber("research-tasks")(handler)
This is the inverse of from_pydantic_ai — murmur.interop is the only
place allowed to import faststream directly.
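The adapter idea can be sketched without either library: wrap an agent-like coroutine in a plain async function that a subscriber decorator could accept. This is an assumption-heavy illustration, not the implementation of as_faststream_handler. The names as_handler and fake_agent are hypothetical, and the choice to turn exceptions into a failed reply is a guess at one reasonable behaviour.

```python
import asyncio
from typing import Any, Awaitable, Callable

Payload = dict[str, Any]
AsyncFn = Callable[[Payload], Awaitable[Payload]]


def as_handler(run_agent: AsyncFn) -> AsyncFn:
    """Wrap an agent-like coroutine so it can be registered as a subscriber."""

    async def handler(message: Payload) -> Payload:
        try:
            output = await run_agent(message)
        except Exception as exc:  # report failures in the reply instead of raising
            return {"success": False, "output_payload": {}, "error_message": str(exc)}
        return {"success": True, "output_payload": output, "error_message": ""}

    return handler


async def fake_agent(message: Payload) -> Payload:
    # Placeholder for a real agent run (hypothetical).
    return {"echo": message["query"]}


ok_result = asyncio.run(as_handler(fake_agent)({"query": "hello"}))
failed_result = asyncio.run(as_handler(fake_agent)({}))  # missing key raises KeyError
```

The returned handler is an ordinary coroutine function, which is why it can be passed to a decorator call such as broker.subscriber("research-tasks")(handler).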
Migrate broker imports¶
# before — direct FastStream
from faststream.kafka import KafkaBroker
broker = KafkaBroker("localhost:9092")
# after — Murmur runtime parses the URL internally
from murmur import AgentRuntime
runtime = AgentRuntime(broker="kafka://localhost:9092")
The runtime constructs the right FastStream broker (KafkaBroker,
NatsBroker, RabbitBroker, RedisBroker) from the URL scheme. You
never import the concrete broker class.
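As a rough picture of scheme-based dispatch, the sketch below maps a broker URL to a broker class name using only the standard library. The kafka scheme appears in the examples above; the nats, amqp, and redis spellings, the table BROKER_CLASS_BY_SCHEME, and the function resolve_broker are assumptions made for illustration and may not match what the runtime accepts.

```python
from urllib.parse import urlsplit

# Scheme-to-class-name table. "kafka" appears in the docs; the other scheme
# spellings are assumptions made for this sketch.
BROKER_CLASS_BY_SCHEME = {
    "kafka": "KafkaBroker",
    "nats": "NatsBroker",
    "amqp": "RabbitBroker",
    "redis": "RedisBroker",
}


def resolve_broker(url: str) -> tuple[str, str]:
    """Return (broker class name, host:port) for a broker URL."""
    parts = urlsplit(url)
    try:
        class_name = BROKER_CLASS_BY_SCHEME[parts.scheme]
    except KeyError:
        raise ValueError(f"unsupported broker scheme: {parts.scheme!r}") from None
    return class_name, parts.netloc


resolved = resolve_broker("kafka://localhost:9092")
```

Failing fast on an unknown scheme keeps a typo in the URL from surfacing later as a connection error.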
Lifecycle hooks¶
FastStream exposes subscriber lifecycle through decorators; Murmur's Worker
offers the same shape:
@worker.on_task_start
async def on_start(task_id: str, agent_name: str) -> None:
    metrics.task_started.inc(agent=agent_name)

@worker.on_task_complete
async def on_complete(task_id: str, agent_name: str, duration_ms: int) -> None:
    metrics.task_completed.observe(agent=agent_name, duration=duration_ms)

@worker.on_task_error
async def on_error(task_id: str, agent_name: str, error: Exception) -> None:
    metrics.task_failed.inc(agent=agent_name, error_type=type(error).__name__)
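For readers unfamiliar with decorator-registered hooks, the following standard-library sketch shows the general mechanism: a decorator stores the coroutine, and the owner awaits every stored hook when the event fires. HookRegistry, its on and emit methods, and the event name task_start are hypothetical; this is not how Worker is implemented, only the pattern the decorators above suggest.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

Hook = Callable[..., Awaitable[None]]


class HookRegistry:
    """Hypothetical stand-in showing decorator-style hook registration."""

    def __init__(self) -> None:
        self._hooks: dict[str, list[Hook]] = defaultdict(list)

    def on(self, event: str) -> Callable[[Hook], Hook]:
        def register(func: Hook) -> Hook:
            self._hooks[event].append(func)
            return func  # return the function unchanged so it stays callable

        return register

    async def emit(self, event: str, **kwargs: Any) -> None:
        # Await hooks one at a time, in registration order.
        for hook in self._hooks[event]:
            await hook(**kwargs)


hooks = HookRegistry()
seen: list[tuple[str, str]] = []


@hooks.on("task_start")
async def record_start(task_id: str, agent_name: str) -> None:
    seen.append((task_id, agent_name))


asyncio.run(hooks.emit("task_start", task_id="t-1", agent_name="research-minion"))
```

Keeping hooks as plain coroutines means metrics or logging code can be added or removed without touching the agent definitions.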
Auto-discover agents from a registry¶
Every YAML spec under ./specs becomes a subscriber. With raw FastStream
you'd wire each one manually.
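To make the discovery step concrete, here is a small sketch that lists candidate agent names from a directory of YAML files. It assumes one spec per file and uses the file stem as the agent name; the real registry presumably reads the name from inside each spec, and discover_agent_names is a hypothetical helper, not part of Murmur.

```python
import tempfile
from pathlib import Path


def discover_agent_names(spec_dir: Path) -> list[str]:
    """List one agent name per YAML file, using the file stem as the name."""
    spec_files = [*spec_dir.glob("*.yaml"), *spec_dir.glob("*.yml")]
    return sorted(path.stem for path in spec_files)


# Demonstrate against a throwaway directory instead of a real ./specs folder.
with tempfile.TemporaryDirectory() as tmp:
    specs = Path(tmp)
    (specs / "research-minion.yaml").write_text("name: research-minion\n")
    (specs / "notes.txt").write_text("not a spec\n")
    discovered = discover_agent_names(specs)
```

Each discovered name would then correspond to one subscriber, which is the wiring you would otherwise write by hand.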
What does not change¶
- Broker semantics — at-least-once, ordering guarantees, partition keys — are FastStream's, not Murmur's. Murmur doesn't change them.
- Authentication to the broker — TLS, SASL, etc. — passes through via the URL or the broker arguments.
- Topic naming can be customised (Murmur defaults to murmur.{agent_name}.tasks and murmur.{agent_name}.results).
- Existing FastStream middleware continues to work for transport concerns; Murmur's pipeline middleware sits at a different layer. The two are layered (broker → pipeline → backend dispatch → result), not interchangeable.
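The default topic names in the list above follow a simple pattern, sketched here as a helper. The function default_topics and its prefix parameter are hypothetical; only the murmur.{agent_name}.tasks and murmur.{agent_name}.results shapes come from the text.

```python
def default_topics(agent_name: str, prefix: str = "murmur") -> tuple[str, str]:
    """Return the (tasks, results) topic names for an agent."""
    return f"{prefix}.{agent_name}.tasks", f"{prefix}.{agent_name}.results"


tasks_topic, results_topic = default_topics("research-minion")
```

Knowing the pattern helps when writing broker ACLs or dashboards before the first worker is deployed.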
Incremental adoption path¶
- Wrap one agent in a Worker and run it against your existing broker. Confirm the wire envelope behaves correctly.
- Add the policy gate. Set trust_level= on the agent and audit which tools fire under each level.
- Add observability. Wire LogEventEmitter (default, already on) plus SSEEventEmitter for a live event stream, or BrokerEventBridge for centralised dashboards.
- Move from one-off subscribers to a registry-backed fleet. Use --all-from to auto-discover agents in YAML.