Middleware

Pipeline stages that wrap AgentRuntime.run for each agent call. Each stage implements the Stage Protocol, __call__(context, next_stage), and stages can be composed via RuntimeOptions.

from murmur.middleware import (
    DepthLimitMiddleware,
    RetryMiddleware,
    TimeoutMiddleware,
)
from murmur.middleware.cost_tracking import CostTrackingMiddleware, TokenBudget
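
To make the wrapping order concrete, here is a minimal standalone sketch of the pattern. It does not import murmur: the Stage Protocol below, the dict context, the assumption that stages are async, and the compose helper are all hypothetical stand-ins, not the library's actual definitions.

```python
import asyncio
from typing import Any, Awaitable, Callable, Protocol

# Hypothetical stand-in types; the real Stage Protocol and context live in murmur.
NextStage = Callable[[dict], Awaitable[Any]]


class Stage(Protocol):
    async def __call__(self, context: dict, next_stage: NextStage) -> Any: ...


class TagStage:
    """Illustrative stage that records entry and exit so the nesting is visible."""

    def __init__(self, name: str, log: list) -> None:
        self._name = name
        self._log = log

    async def __call__(self, context: dict, next_stage: NextStage) -> Any:
        self._log.append(f"enter {self._name}")
        try:
            return await next_stage(context)
        finally:
            self._log.append(f"exit {self._name}")


def compose(stages: list, terminal: NextStage) -> NextStage:
    """Fold stages right to left so the first stage is the outermost wrapper."""
    handler = terminal
    for stage in reversed(stages):
        # Bind the current stage and handler as defaults to avoid late binding.
        def bound(context, _stage=stage, _next=handler):
            return _stage(context, _next)

        handler = bound
    return handler


async def run_agent(context: dict) -> str:
    """Stand-in for the innermost call that the stages wrap."""
    return f"ran {context['agent']}"


def demo():
    log = []
    pipeline = compose([TagStage("outer", log), TagStage("inner", log)], run_agent)
    result = asyncio.run(pipeline({"agent": "demo"}))
    return result, log
```

In this sketch the first stage in the list sees the call first and the result last, which is the usual reason ordering matters when combining retry, timeout, and budget stages.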

RetryMiddleware

RetryMiddleware(max_attempts: int = 3, backoff_factor: float = 1.5)

Retry on SpawnError with multiplicative backoff.

Source code in src/murmur/middleware/retry.py
def __init__(self, max_attempts: int = 3, backoff_factor: float = 1.5) -> None:
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    if backoff_factor <= 0:
        raise ValueError("backoff_factor must be > 0")
    self._max_attempts = max_attempts
    self._backoff_factor = backoff_factor
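
The retry loop itself is not shown on this page, so the following is only a sketch of what multiplicative backoff typically looks like. The SpawnError class here is a local stand-in, and the base_delay and sleep parameters are assumptions introduced for illustration; the real middleware may compute its delays differently.

```python
import asyncio


class SpawnError(Exception):
    """Local stand-in for murmur's SpawnError."""


async def retry(call, max_attempts=3, backoff_factor=1.5,
                base_delay=0.01, sleep=asyncio.sleep):
    """Retry `call` on SpawnError, multiplying the delay after each failure.

    base_delay and sleep are hypothetical knobs, not RetryMiddleware parameters.
    """
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return await call()
        except SpawnError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            await sleep(delay)
            delay *= backoff_factor


def demo():
    delays = []
    calls = {"n": 0}

    async def fake_sleep(seconds):
        delays.append(seconds)  # record instead of actually sleeping

    async def flaky():
        calls["n"] += 1
        if calls["n"] < 3:
            raise SpawnError("transient failure")
        return "ok"

    result = asyncio.run(
        retry(flaky, max_attempts=3, backoff_factor=2.0,
              base_delay=1.0, sleep=fake_sleep)
    )
    return result, delays
```

With max_attempts=3 there are at most two waits, so the delay grows only between failed attempts and never after the final one.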

TimeoutMiddleware

TimeoutMiddleware(seconds: float)

Cancel the downstream stage if it takes longer than seconds.

Source code in src/murmur/middleware/timeout.py
def __init__(self, seconds: float) -> None:
    if seconds <= 0:
        raise ValueError("timeout seconds must be > 0")
    self._seconds = seconds
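
One common way to cancel a slow downstream coroutine is asyncio.wait_for, sketched below. Whether TimeoutMiddleware uses wait_for internally, and which exception it surfaces, is not stated here; the with_timeout helper and the dict context are hypothetical.

```python
import asyncio


async def with_timeout(context, next_stage, seconds):
    """Await the downstream stage, cancelling it once `seconds` have elapsed."""
    # asyncio.wait_for cancels the wrapped coroutine and raises TimeoutError
    # when the deadline passes.
    return await asyncio.wait_for(next_stage(context), timeout=seconds)


async def fast(context):
    return "done"


async def slow(context):
    await asyncio.sleep(1.0)
    return "done"


def demo():
    async def main():
        ok = await with_timeout({}, fast, 0.5)
        try:
            await with_timeout({}, slow, 0.01)
            timed_out = False
        except asyncio.TimeoutError:
            timed_out = True
        return ok, timed_out

    return asyncio.run(main())
```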

DepthLimitMiddleware

DepthLimitMiddleware(max_depth: int = 4)

Reject runs whose agent_context.depth exceeds max_depth.

Source code in src/murmur/middleware/depth_limit.py
def __init__(self, max_depth: int = 4) -> None:
    if max_depth < 1:
        raise ValueError("max_depth must be >= 1")
    self._max_depth = max_depth
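
The check described above can be sketched as a simple comparison. The AgentContext dataclass and the DepthLimitExceeded exception are hypothetical stand-ins, since this page does not name the error the middleware raises; the only detail taken from the description is that a run is rejected when depth exceeds max_depth, so a depth equal to max_depth is assumed to pass.

```python
from dataclasses import dataclass


class DepthLimitExceeded(Exception):
    """Hypothetical error type; the real middleware's exception is not shown here."""


@dataclass
class AgentContext:
    """Stand-in carrying only the field the check needs."""
    depth: int


def check_depth(agent_context: AgentContext, max_depth: int = 4) -> None:
    """Raise when depth exceeds max_depth; a depth equal to max_depth passes."""
    if agent_context.depth > max_depth:
        raise DepthLimitExceeded(
            f"depth {agent_context.depth} exceeds max_depth {max_depth}"
        )


def demo():
    outcomes = []
    for depth in (3, 4, 5):
        try:
            check_depth(AgentContext(depth=depth), max_depth=4)
            outcomes.append(True)
        except DepthLimitExceeded:
            outcomes.append(False)
    return outcomes
```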

CostTrackingMiddleware

Token enforcement in two steps: a pre-check before dispatch and a post-charge after the run. See Cost tracking for semantics.

CostTrackingMiddleware(
    budget: TokenBudget, *, event_emitter: EventEmitter | None = None
)

Pipeline Stage that gates and charges a TokenBudget.

Built fresh for each spawn by AgentRuntime.run. The per-spawn instance closes over the runtime's EventEmitter, so a BUDGET_EXCEEDED emission flows through the same sink as every other runtime event.

Source code in src/murmur/middleware/cost_tracking.py
def __init__(
    self,
    budget: TokenBudget,
    *,
    event_emitter: EventEmitter | None = None,
) -> None:
    self._budget = budget
    self._emitter = event_emitter

TokenBudget

TokenBudget(limit: int)

Mutable token-cost ceiling.

Construct with a positive limit. Wire via murmur.RuntimeOptions(token_budget=...). The runtime's CostTrackingMiddleware decrements remaining after each agent run; once it hits 0 or below, subsequent runs raise BudgetExceededError before dispatch.

Concurrency: an asyncio.Lock guards consume against same-loop interleavings. Cross-process workers sharing one publisher-side budget will race, so the budget is a soft cap in distributed mode.

Source code in src/murmur/middleware/cost_tracking.py
def __init__(self, limit: int) -> None:
    if limit < 1:
        raise ValueError("limit must be >= 1")
    self._limit = limit
    self._remaining = limit
    self._lock = asyncio.Lock()

remaining property

remaining: int

Tokens remaining. Negative when the last run over-spent.

used property

used: int

limit - remaining. Reads cleanly even when remaining is negative.

consume async

consume(n: int) -> None

Decrement remaining by n. Any n <= 0 is a cheap no-op.

Source code in src/murmur/middleware/cost_tracking.py
async def consume(self, n: int) -> None:
    """Decrement ``remaining`` by ``n``. Any ``n <= 0`` is a cheap no-op."""
    if n <= 0:
        return
    async with self._lock:
        self._remaining -= n

reset

reset() -> None

Restore remaining to limit. Useful in tests and for callers that run periodic windows (e.g. per-minute budgets).

Source code in src/murmur/middleware/cost_tracking.py
def reset(self) -> None:
    """Restore ``remaining`` to ``limit``. Useful in tests + for callers
    running periodic windows (e.g. per-minute budgets)."""
    self._remaining = self._limit
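
Putting the pieces above together, the sketch below walks through the pre-check and post-charge behaviour: a run may overspend, remaining goes negative, and the next run is rejected before dispatch. It does not import murmur. Budget mirrors the TokenBudget source shown on this page, while BudgetExceededError is a local stand-in and gated_run is a hypothetical simplification of what CostTrackingMiddleware does.

```python
import asyncio


class BudgetExceededError(Exception):
    """Local stand-in for murmur's BudgetExceededError."""


class Budget:
    """Minimal stand-in mirroring the TokenBudget source shown above."""

    def __init__(self, limit: int) -> None:
        if limit < 1:
            raise ValueError("limit must be >= 1")
        self.limit = limit
        self.remaining = limit
        self._lock = asyncio.Lock()

    @property
    def used(self) -> int:
        return self.limit - self.remaining

    async def consume(self, n: int) -> None:
        if n <= 0:
            return
        async with self._lock:
            self.remaining -= n

    def reset(self) -> None:
        self.remaining = self.limit


async def gated_run(budget: Budget, tokens_used_by_run: int) -> None:
    """Hypothetical gate: pre-check, (pretend to) run, then post-charge."""
    if budget.remaining <= 0:
        raise BudgetExceededError("budget exhausted before dispatch")
    # The agent would run here; its real cost is only known afterwards.
    await budget.consume(tokens_used_by_run)


def demo():
    async def main():
        budget = Budget(limit=100)
        await gated_run(budget, 70)  # 30 tokens left
        await gated_run(budget, 50)  # passes the pre-check, then overspends
        snapshot = (budget.remaining, budget.used)
        try:
            await gated_run(budget, 10)
            rejected = False
        except BudgetExceededError:
            rejected = True
        budget.reset()
        return snapshot, rejected, budget.remaining

    return asyncio.run(main())
```

Because the charge happens after the run, the limit bounds when new runs may start rather than the exact total spent, which is why used can exceed limit.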