Skip to content

Groups

Declarative multi-agent topologies. AgentGroup holds a frozen DAG of agents connected by Edge objects; the runner is invoked through AgentRuntime.run_group().

from murmur import AgentGroup, Edge, FanOut
from murmur.groups import EdgeMapper, get_fan_out_field

AgentGroup

AgentGroup dataclass

AgentGroup(name: str, topology: Mapping[Agent, EdgeOrEdges] = dict())

A named DAG of agents.

crew = AgentGroup( ... name="research", ... topology={ ... head: Edge(to=(minion,), mapper=head_to_minions), ... minion: Edge(to=(synthesizer,), mapper=minions_to_synth), ... synthesizer: Edge.terminal(), ... }, ... )

Multiple outgoing edges with mutually-exclusive conditions enable branch routing:

crew = AgentGroup( ... name="ticket_router", ... topology={ ... triage: ( ... Edge(to=(quick_replier,), condition=lambda o: o.severity == "low"), ... Edge(to=(escalator,), condition=lambda o: o.severity == "high"), ... ), ... quick_replier: Edge.terminal(), ... escalator: Edge.terminal(), ... }, ... )

name instance-attribute

name: str

Stable identifier — used as the registry key, the broker topic suffix for group-level events, and the agent_name field on :data:EventType.GROUP_* events.

topology class-attribute instance-attribute

topology: Mapping[Agent, EdgeOrEdges] = field(default_factory=dict)

The DAG. Keys are :class:Agent instances; each value is one :class:Edge (most common) or a tuple of edges for branch routing with :attr:Edge.condition predicates. Validated at construction — cycles, dangling refs, and missing entry / terminal nodes raise :class:TopologyError.

agents property

agents: tuple[Agent, ...]

Tuple of agents in topology declaration order.

outgoing_edges

outgoing_edges(src: Agent) -> tuple[Edge, ...]

Outgoing edges from src — always a tuple, even for single edges.

Source code in src/murmur/groups/spec.py
def outgoing_edges(self, src: Agent) -> tuple[Edge, ...]:
    """Outgoing edges from ``src`` — always a tuple, even for single edges."""
    return _normalize(self.topology[src])

entry_nodes

entry_nodes() -> tuple[Agent, ...]

Nodes with no incoming edges.

Source code in src/murmur/groups/spec.py
def entry_nodes(self) -> tuple[Agent, ...]:
    """Nodes with no incoming edges."""
    targets: set[Agent] = set()
    for src in self.topology:
        for edge in self.outgoing_edges(src):
            targets.update(edge.to)
    return tuple(a for a in self.topology if a not in targets)

terminal_nodes

terminal_nodes() -> tuple[Agent, ...]

Nodes with no outgoing edges (every outgoing edge has empty to).

Source code in src/murmur/groups/spec.py
def terminal_nodes(self) -> tuple[Agent, ...]:
    """Nodes with no outgoing edges (every outgoing edge has empty ``to``)."""
    result: list[Agent] = []
    for src in self.topology:
        edges = self.outgoing_edges(src)
        if all(not edge.to for edge in edges):
            result.append(src)
    return tuple(result)

topological_order

topological_order() -> tuple[Agent, ...]

Kahn's algorithm — order in which to walk the DAG.

Source code in src/murmur/groups/spec.py
def topological_order(self) -> tuple[Agent, ...]:
    """Kahn's algorithm — order in which to walk the DAG."""
    indeg: dict[Agent, int] = dict.fromkeys(self.topology, 0)
    for src in self.topology:
        for edge in self.outgoing_edges(src):
            for tgt in edge.to:
                indeg[tgt] = indeg.get(tgt, 0) + 1
    queue: list[Agent] = [a for a, d in indeg.items() if d == 0]
    order: list[Agent] = []
    while queue:
        node = queue.pop(0)
        order.append(node)
        for edge in self.outgoing_edges(node):
            for tgt in edge.to:
                indeg[tgt] -= 1
                if indeg[tgt] == 0:
                    queue.append(tgt)
    if len(order) != len(self.topology):  # pragma: no cover — caught by _has_cycle
        raise TopologyError(f"AgentGroup {self.name!r} topology has a cycle")
    return tuple(order)

topological_tiers

topological_tiers() -> tuple[tuple[Agent, ...], ...]

Topological order grouped into dependency tiers.

Each tier contains nodes whose dependencies are all in earlier tiers. Within one tier the nodes are pairwise independent and therefore safe to dispatch in parallel. Tier order across the result preserves the DAG's dependency direction — tier i+1 is only reachable once every node in tier i has produced a result. Topology declaration order is preserved within each tier so the contract is "results stored, terminal returned" rather than "node X ran before node Y".

Source code in src/murmur/groups/spec.py
def topological_tiers(self) -> tuple[tuple[Agent, ...], ...]:
    """Topological order grouped into dependency tiers.

    Each tier contains nodes whose dependencies are all in earlier
    tiers. Within one tier the nodes are pairwise independent and
    therefore safe to dispatch in parallel. Tier order across the
    result preserves the DAG's dependency direction — tier ``i+1``
    is only reachable once every node in tier ``i`` has produced
    a result. Topology declaration order is preserved within each
    tier so the contract is "results stored, terminal returned"
    rather than "node X ran before node Y".
    """
    # Stable lookup for declaration order — used to sort each tier's
    # ready set independently of the order in which earlier-tier
    # parents happened to release their downstreams (which depends on
    # iteration order over outgoing edges, not topology declaration).
    order_index: dict[Agent, int] = {a: i for i, a in enumerate(self.topology)}
    indeg: dict[Agent, int] = dict.fromkeys(self.topology, 0)
    for src in self.topology:
        for edge in self.outgoing_edges(src):
            for tgt in edge.to:
                indeg[tgt] = indeg.get(tgt, 0) + 1
    tiers: list[tuple[Agent, ...]] = []
    ready: list[Agent] = [a for a in self.topology if indeg[a] == 0]
    seen = 0
    while ready:
        tier = tuple(ready)
        tiers.append(tier)
        seen += len(tier)
        next_ready: list[Agent] = []
        for node in tier:
            for edge in self.outgoing_edges(node):
                for tgt in edge.to:
                    indeg[tgt] -= 1
                    if indeg[tgt] == 0:
                        next_ready.append(tgt)
        next_ready.sort(key=order_index.__getitem__)
        ready = next_ready
    if seen != len(self.topology):  # pragma: no cover — caught by _has_cycle
        raise TopologyError(f"AgentGroup {self.name!r} topology has a cycle")
    return tuple(tiers)

Edge

Edge dataclass

Edge(
    to: tuple[Agent, ...] = (),
    mapper: EdgeMapper | None = None,
    max_concurrency: int = 100,
    condition: EdgeCondition | None = None,
)

A frozen value object describing one outgoing connection.

to class-attribute instance-attribute

to: tuple[Agent, ...] = ()

Downstream agents. Empty tuple == terminal node.

mapper class-attribute instance-attribute

mapper: EdgeMapper | None = None

Optional (upstream_output) -> TaskSpec | list[TaskSpec] transform.

max_concurrency class-attribute instance-attribute

max_concurrency: int = 100

Width cap when this edge fans out.

condition class-attribute instance-attribute

condition: EdgeCondition | None = None

Optional predicate over the upstream's typed output.

None means "always fire". When set, the runner evaluates the callable with the upstream output (same value the mapper would receive) and only traverses to to agents when it returns truthy. Async callables are awaited.

terminal staticmethod

terminal() -> Edge

Convenience for terminal nodes — Edge(to=()).

Source code in src/murmur/groups/edge.py
@staticmethod
def terminal() -> Edge:
    """Convenience for terminal nodes — ``Edge(to=())``."""
    return Edge(to=())

EdgeMapper

EdgeMapper module-attribute

EdgeMapper = Callable[..., Any]

Return TaskSpec for single dispatch, list[TaskSpec] for fan-out.

FanOut

FanOut module-attribute

FanOut = Annotated[F, _FanOutMarker()]

Type annotation marking a Pydantic field as the fan-out target.

Use as FanOut[list[T]] on the field that holds the items the group runner should split over. The runner will spawn one downstream agent per item when no explicit mapper is set on the edge.

class DecompositionResult(BaseModel): ... sub_questions: FanOut[list[SubQuestion]] ... reasoning: str = ""

Constraints (enforced by :func:murmur.groups.get_fan_out_field):

  • The annotated type must be list[T]. Not tuple, not set.
  • Only one field per model may carry the marker.

get_fan_out_field

get_fan_out_field

get_fan_out_field(model: type[Any]) -> tuple[str, tuple[type[Any], ...]] | None

Return (field_name, item_types) for the model's fan-out field.

Returns None if the model has no :data:FanOut-annotated field. item_types is always a tuple — length 1 for FanOut[list[T]] and length N for FanOut[list[T1 | T2 | ... | TN]].

Raises :class:SpecValidationError if more than one field carries the marker, or if the annotated type is not list[T].

Source code in src/murmur/groups/_introspection.py
def get_fan_out_field(
    model: type[Any],
) -> tuple[str, tuple[type[Any], ...]] | None:
    """Return ``(field_name, item_types)`` for the model's fan-out field.

    Returns ``None`` if the model has no :data:`FanOut`-annotated field.
    ``item_types`` is always a tuple — length 1 for ``FanOut[list[T]]``
    and length N for ``FanOut[list[T1 | T2 | ... | TN]]``.

    Raises :class:`SpecValidationError` if more than one field carries
    the marker, or if the annotated type is not ``list[T]``.
    """
    hints = get_type_hints(model, include_extras=True)
    matches: list[tuple[str, tuple[type[Any], ...]]] = []
    for name, hint in hints.items():
        if get_origin(hint) is not Annotated:
            continue
        args = get_args(hint)
        if len(args) < 2:
            continue
        inner = args[0]
        metadata = args[1:]
        if not any(isinstance(m, _FanOutMarker) for m in metadata):
            continue
        if get_origin(inner) is not list:
            raise SpecValidationError(
                f"FanOut field {model.__name__}.{name} must wrap list[T]; got {inner!r}"
            )
        list_args = get_args(inner)
        if not list_args:
            raise SpecValidationError(
                f"FanOut field {model.__name__}.{name} must specify a list "
                f"item type, e.g. FanOut[list[SubQuestion]]"
            )
        matches.append((name, _flatten_union(list_args[0])))

    if not matches:
        return None
    if len(matches) > 1:
        names = ", ".join(n for n, _ in matches)
        raise SpecValidationError(
            f"{model.__name__} has multiple FanOut fields ({names}); "
            f"only one is allowed"
        )
    return matches[0]