Runtime¶
AgentRuntime¶
AgentRuntime
¶
AgentRuntime(
*,
broker: str | None = None,
broker_instance: Broker | None = None,
runtime_id: str | None = None,
registry: Registry | None = None,
backend: Backend | None = None,
tool_registry: ToolRegistry | None = None,
tool_executor: ToolExecutor | None = None,
options: RuntimeOptions | None = None,
event_emitter: EventEmitter | None = None,
publish_events: bool = False,
)
The orchestration runtime.
runtime = AgentRuntime()                      # AsyncBackend
runtime = AgentRuntime(broker="memory://")    # JobBackend, in-proc
runtime = AgentRuntime(broker="kafka://...")  # JobBackend, real broker
The backend / broker_instance / registry / tool_registry
/ tool_executor keyword arguments are escape hatches for tests and
advanced users; production code should rely on the broker-URL parsing
above.
Source code in src/murmur/runtime.py
event_emitter
property
¶
event_emitter: EventEmitter
The runtime's event sink. Pass event_emitter= at init to
substitute a custom one (e.g. MultiEventEmitter for SSE +
log fan-out).
spawn_count
property
¶
Total dispatches accepted by this runtime (top-level + cascaded).
Read-only — useful for tests and observability. Compare against
:attr:RuntimeOptions.max_total_spawns to gauge headroom.
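For example, a quick headroom check against a capped runtime (sketch):
runtime = AgentRuntime(options=RuntimeOptions(max_total_spawns=50))
# ... after some runs and gathers ...
headroom = 50 - runtime.spawn_count  # dispatches left before SpawnCapError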
run
async
¶
run(agent: Agent | str, task: TaskSpec) -> AgentResult[BaseModel]
Run a single agent against a single task. Returns a typed result.
Wires the configured middleware (timeout, depth-limit, optional
retry) around backend dispatch via :class:Pipeline. gather is
unaffected — its per-slot path bypasses middleware to keep batch
semantics simple. Tune per-run behavior via :class:RuntimeOptions
passed to :meth:__init__.
Cascading-spawn semantics: when this call originates from inside
another agent's run (the spawn_agents tool, for instance), the
runtime reads the parent's frame from the _current_spawn
contextvar, derives the child :class:AgentContext (depth + 1,
ancestors + parent_name, parent_trace_id = parent's trace_id), and
rejects cycles before any backend work. The runtime's per-instance
_spawn_count is incremented on every accepted dispatch and
rejects further runs with :class:SpawnCapError once
:attr:RuntimeOptions.max_total_spawns is reached.
Cycle detection is name-based, not run-id-based. Agent.name
is the registry key and the canonical identity for an agent in
Murmur's model — two runtime.run calls on the same agent with
different inputs are still the same agent re-entering itself, and
that's the runaway case we want to catch. If a workflow needs the
same logic at a deeper level, give the deeper instance a distinct
name (worker-rev2 etc.) — that disambiguates intent and keeps
the cycle guard meaningful. For workflows that genuinely require
bounded reuse of a single registered name on the chain, opt into
:attr:RuntimeOptions.cycle_policy "permissive" and own
termination yourself (depth + cap remain enforced).
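A minimal usage sketch; task is assumed to be an already-built TaskSpec and
"researcher" a registered agent name (both are placeholders, not canonical
examples from the library):
runtime = AgentRuntime()
result = await runtime.run("researcher", task)
if result.error is not None:
    ...  # handle a failure reported on the result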
Source code in src/murmur/runtime.py
gather
async
¶
gather(
agent: Agent | str,
tasks: Sequence[TaskSpec],
*,
max_concurrency: int = 100,
fail_fast: bool = False,
) -> list[AgentResult[BaseModel]]
Fan a single agent across many tasks. Bounded by max_concurrency.
Delegates to backend.gather when the backend implements one
(AsyncBackend uses an asyncio.Queue + worker pool;
JobBackend publishes via the ResultCollector). Falls
back to a semaphore-bounded fan-out otherwise. Default
(fail_fast=False): per-task failures always land in their
slot's :attr:AgentResult.error — never raises on partial failure.
fail_fast=True: re-raises the first task's error from the
gathered slots after the batch settles (we still wait for in-flight
tasks to finish so partial results aren't dropped).
:attr:RuntimeOptions.timeout_seconds applies to the whole batch
(matching :meth:run's pipeline-level wrapping). When the wall
clock fires before the backend gather settles, the
:class:asyncio.TimeoutError is translated into
:class:SpawnError — slots already claimed against
max_total_spawns stay claimed.
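A bounded fan-out sketch; tasks is assumed to be a list of pre-built TaskSpec
objects and "researcher" a registered agent name:
results = await runtime.gather(
    "researcher",
    tasks,
    max_concurrency=10,
    fail_fast=False,   # per-task failures stay in each slot's .error
)
failed = [r for r in results if r.error is not None]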
Source code in src/murmur/runtime.py
run_sync
¶
run_sync(agent: Agent | str, task: TaskSpec) -> AgentResult[BaseModel]
Blocking variant of :meth:run for notebook / REPL / script use.
Internally delegates to :func:asyncio.run. Cannot be called from inside a
running event loop — raises :class:RuntimeError instead, with
a pointer to the async variant. Mirrors PydanticAI's
Agent.run_sync and the rest of the project's sync API surface.
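For example, from a plain script (agent name and task are placeholders):
runtime = AgentRuntime()
result = runtime.run_sync("researcher", task)   # blocks; raises RuntimeError inside a running loop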
Source code in src/murmur/runtime.py
gather_sync
¶
gather_sync(
agent: Agent | str,
tasks: Sequence[TaskSpec],
*,
max_concurrency: int = 100,
fail_fast: bool = False,
) -> list[AgentResult[BaseModel]]
Blocking variant of :meth:gather. Same caller restrictions as
:meth:run_sync.
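For example (same placeholder agent and tasks as above):
results = runtime.gather_sync("researcher", tasks, max_concurrency=10)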
Source code in src/murmur/runtime.py
run_group
async
¶
run_group(
group: AgentGroup | AgentTeam, task: TaskSpec
) -> AgentResult[BaseModel] | GroupResult
Walk an AgentGroup topology against task.
Returns one of two shapes depending on how many terminal nodes actually fire:
- Exactly one terminal — typical single-leaf or branch-routed
topology — returns a plain :class:AgentResult. Identical to the
pre-multi-terminal contract.
- More than one terminal — moderator-and-specialists shape where
each leaf is its own terminal — returns a :class:GroupResult keyed
by Agent.name with aggregate metadata (summed tokens, max duration,
backend="group").
Failed slots in fan-out tiers are filtered before downstream
mappers run; if every slot in a tier fails, raises
:class:murmur.core.errors.AllAgentsFailedError.
Emits :data:EventType.GROUP_STARTED before traversal and
:data:EventType.GROUP_COMPLETED after the terminal result settles.
Per-agent events (AGENT_SPAWNED, AGENT_COMPLETED etc.) come
from each step's underlying :meth:run call.
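A sketch of handling both return shapes; construction of the group itself is
elided:
outcome = await runtime.run_group(group, task)
if isinstance(outcome, GroupResult):
    ...  # several terminals fired: per-agent results keyed by Agent.name
else:
    ...  # single terminal: a plain AgentResult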
Source code in src/murmur/runtime.py
shutdown
async
¶
Release runtime-owned resources.
Three cleanup paths run in sequence:
- Eager-start supervisors (MCP) — when
:attr:RuntimeOptions.mcp_eager_start is True, one supervisor task
per provider holds the MCP context open. Setting each shutdown
event lets the supervisors exit, so provider.stop() runs on the
same task that called provider.start(), which is what anyio's
cancel scopes require.
- Manually pre-warmed providers — providers a user pre-warmed by
calling await provider.start() themselves get a stop() here as a
safety net. Providers in eager-start mode are already stopped by
their supervisor; the second stop() is a no-op.
- Broker-mode runtimes additionally need await backend.stop() —
:class:AgentServer / :class:AgentRouter lifespan already drives
that.
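A sketch of pairing a broker-mode runtime with explicit cleanup when no
AgentServer / AgentRouter lifespan owns it:
runtime = AgentRuntime(broker="memory://")
try:
    result = await runtime.run("researcher", task)   # placeholder agent + task
finally:
    await runtime.shutdown()   # stops supervisors, pre-warmed providers, and the backend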
Source code in src/murmur/runtime.py
RuntimeOptions¶
RuntimeOptions
¶
Frozen tuning knobs for :class:AgentRuntime.
Wraps the per-run middleware pipeline (timeout, retry, depth-limit) so callers can dial individual knobs without subclassing or constructing middleware directly. Defaults are safe — timeout is generous, retry is off, depth limit only kicks in for cascading sub-agent spawns.
runtime = AgentRuntime(options=RuntimeOptions(timeout_seconds=60))
timeout_seconds
class-attribute
instance-attribute
¶
Cancel the run after this long. TimeoutMiddleware translates the
underlying :class:asyncio.TimeoutError into a :class:SpawnError.
max_spawn_depth
class-attribute
instance-attribute
¶
Cap on cascading agent spawns — rejects runs whose
AgentContext.depth is already at or above this value. Top-level
runs have depth 0, so the limit only matters once the sub-agent
spawning path is in use.
max_total_spawns
class-attribute
instance-attribute
¶
Optional per-runtime kill switch for total dispatches over the
runtime's lifetime. None (default) = unbounded — what long-lived
workers and servers want.
When set, every run() and every backend-native gather() slot
decrements the budget; once exhausted, further dispatches fail with
:class:SpawnCapError and the counter never resets. Independent of
token budget — a runaway cascade hits this before the cost meter
catches up. Use it as an explicit opt-in safety rail (e.g. tests that
exercise the cap, or short-lived process boundaries) — leave it
None for any runtime that handles ongoing traffic.
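A sketch of an explicitly capped runtime; the error's import location is an
assumption:
runtime = AgentRuntime(
    options=RuntimeOptions(max_spawn_depth=3, max_total_spawns=100)
)
try:
    result = await runtime.run("researcher", task)   # placeholder agent + task
except SpawnCapError:
    ...  # lifetime dispatch budget exhausted; the counter never resets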
cycle_policy
class-attribute
instance-attribute
¶
Cycle-detection policy for cascading sub-spawns.
"strict" (default) rejects any runtime.run / runtime.gather
whose target Agent.name already appears on the parent chain
(ancestors + immediate parent), raising :class:SpawnCycleError
before any backend work. This is the safe default — bounded reuse
patterns like reviewer → fact_checker → reviewer are
structurally indistinguishable from runaway recursion at the
runtime level, and most callers want the guard.
"permissive" skips the cycle check entirely. Termination
becomes the caller's responsibility — typically by tracking
iteration counts in tool arguments / agent inputs, or by relying
on :attr:max_spawn_depth and :attr:max_total_spawns, both of
which remain enforced regardless of this setting. Use this when a
legitimate workflow needs the same registered agent name to recur
on the chain (e.g. a critic loop with an explicit external
counter); avoid it on any runtime that runs untrusted prompts or
where bugs in tool plumbing could let an LLM ask for the same
agent forever.
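For example, opting a critic loop into bounded reuse while keeping the other
guards (a sketch):
options = RuntimeOptions(cycle_policy="permissive", max_spawn_depth=4, max_total_spawns=50)
runtime = AgentRuntime(options=options)
# reviewer -> fact_checker -> reviewer is now allowed; depth and spawn caps
# still apply, and the caller owns the iteration counter in the agent inputs.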
retry_max_attempts
class-attribute
instance-attribute
¶
1 (default) means no retry. Set to 2+ to enable
:class:RetryMiddleware on transient :class:SpawnError.
retry_backoff_factor
class-attribute
instance-attribute
¶
Multiplicative backoff between retries
(backoff_factor ** attempt seconds).
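For example, three attempts with backoff_factor ** attempt seconds between
tries:
options = RuntimeOptions(retry_max_attempts=3, retry_backoff_factor=2.0)
runtime = AgentRuntime(options=options)   # retries transient SpawnError twice before giving up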
token_budget
class-attribute
instance-attribute
¶
token_budget: TokenBudget | None = None
Optional token-cost ceiling for the runtime. None (default)
disables cost tracking. Construct via
:class:murmur.middleware.TokenBudget(limit=...). Once the budget is
exhausted, subsequent runtime.run / runtime.gather calls fail
with :class:BudgetExceededError before dispatch and emit a
:data:EventType.BUDGET_EXCEEDED event.
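A sketch using the documented constructor; the error's import location is an
assumption:
from murmur.middleware import TokenBudget

options = RuntimeOptions(token_budget=TokenBudget(limit=200_000))
runtime = AgentRuntime(options=options)
try:
    result = await runtime.run("researcher", task)   # placeholder agent + task
except BudgetExceededError:
    ...  # ceiling reached; rejected before dispatch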
broker_signing_key
class-attribute
instance-attribute
¶
Optional symmetric HMAC-SHA256 key for authenticating broker
envelopes — opt-in. Default None preserves the documented "broker
is trusted" baseline: no signature is computed or verified.
When set on a publisher runtime, :class:murmur.backends.JobBackend
signs every outbound :class:murmur.messages.TaskMessage over its
safety-relevant fields (agent_name, request_id,
parent_spawn) before publishing. The matching worker — built
with :class:murmur.worker.Worker(..., signing_key=...) — verifies
on receive and rejects mismatched / missing signatures with a
structured failure :class:murmur.messages.ResultMessage so the
publisher's :meth:AgentRuntime.run resolves cleanly with
result.error set rather than the worker crashing.
Recommended length is at least 32 random bytes
(secrets.token_bytes(32)). Pass them as raw bytes — there is
no key-derivation layer. For key rotation, the worker accepts a
sequence of keys (signing_key=(new, old)) and verifies against
any; the publisher always signs with one — roll new workers first,
swap the publisher, then drop old.
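A sketch of the publisher / worker pairing during key rotation, following the
documented parameters (worker construction is elided):
import secrets

new_key = secrets.token_bytes(32)
old_key = ...  # previous key, still accepted while old traffic drains

publisher = AgentRuntime(
    broker="kafka://...",
    options=RuntimeOptions(broker_signing_key=new_key),   # signs outbound TaskMessages
)
worker = Worker(..., signing_key=(new_key, old_key))      # verifies against either key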
mcp_eager_start
class-attribute
instance-attribute
¶
Hold MCP toolset providers open across runs via supervisor tasks.
Default False — every dispatch re-enters the MCP server's context
(PydanticAI does this internally on each list_tools /
direct_call_tool), respawning the stdio subprocess each time.
Cheap for low-frequency calls; wasteful at high throughput.
When True, :class:AgentRuntime spawns one supervisor task per
provider on first dispatch. The supervisor enters the provider's
context once (spawning the subprocess), holds the entry open until
:meth:AgentRuntime.shutdown signals shutdown, then releases it.
Other dispatch calls re-enter the same context — PydanticAI's
:class:MCPServer ref-counts entries, so the inner enter / exit
pairs are no-ops while the supervisor holds the outer entry.
Because anyio cancel scopes are task-bound, __aenter__ and
__aexit__ must run on the same asyncio task; the supervisor
pattern guarantees this. Always pair with a :meth:shutdown call
(or rely on :class:AgentRouter / :class:AgentServer lifespan,
which call it automatically) — otherwise the held subprocess leaks
until process exit.
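A sketch of the high-throughput configuration, always paired with shutdown;
agent name and tasks are placeholders:
runtime = AgentRuntime(options=RuntimeOptions(mcp_eager_start=True))
try:
    results = await runtime.gather("researcher", tasks, max_concurrency=20)
finally:
    await runtime.shutdown()   # lets each supervisor release its held MCP subprocess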