Runs

Long-lived run handles for the submit / poll / stream pattern used by the HTTP server.

from murmur.runs import (
    InMemoryRunStore,
    RedisRunStore,
    RocksDBRunStore,
    RunEvent,
    RunEventType,
    RunProgress,
    RunState,
    RunStatus,
    RunStore,
    SQLiteRunStore,
)

The persistent concretes (SQLite, RocksDB, Redis) are lazy-loaded: import murmur.runs works without any of the optional extras installed, and each backend is imported only when its class is first accessed.
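
The page does not show how the lazy loading is implemented. One common way to get this behaviour is a module-level `__getattr__` (PEP 562), sketched below under that assumption. The module name `toy_runs`, the helper `make_lazy_module`, and the use of standard-library modules as stand-ins for the optional backends are all hypothetical.

```python
import importlib
import types

# Hypothetical mapping: public name -> module that defines it.
# Standard-library modules stand in for the optional backends here.
_LAZY = {"OrderedDict": "collections", "Fraction": "fractions"}


def make_lazy_module(name: str, lazy: dict[str, str]) -> types.ModuleType:
    """Build a module whose listed attributes are imported on first access."""
    mod = types.ModuleType(name)

    def _lazy_getattr(attr: str):
        # Called only when normal attribute lookup on the module fails (PEP 562).
        try:
            target = lazy[attr]
        except KeyError:
            raise AttributeError(
                f"module {name!r} has no attribute {attr!r}"
            ) from None
        value = getattr(importlib.import_module(target), attr)
        setattr(mod, attr, value)  # cache so later lookups skip this hook
        return value

    mod.__getattr__ = _lazy_getattr
    return mod


toy = make_lazy_module("toy_runs", _LAZY)
```

Accessing `toy.Fraction` triggers the import of `fractions` and caches the class on the module; names outside the mapping raise `AttributeError` as usual.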

RunStore Protocol

RunStore

Pluggable persistence for in-flight runs.

An in-memory implementation ships by default; SQLite, RocksDB, and Redis backends are available as optional extras (see Concretes). Implementations must be safe to call concurrently from many tasks.

create async

create(run_id: str, target: str) -> None

Register a new run. target is the agent or group name.

Source code in src/murmur/runs/__init__.py
async def create(self, run_id: str, target: str) -> None:
    """Register a new run. ``target`` is the agent or group name."""
    ...

stream

stream(run_id: str) -> AsyncIterator[RunEvent]

Subscribe to run_id's event stream (SSE-friendly).

Source code in src/murmur/runs/__init__.py
def stream(self, run_id: str) -> AsyncIterator[RunEvent]:
    """Subscribe to ``run_id``'s event stream (SSE-friendly)."""
    ...
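
To illustrate how the two methods above fit together, here is a minimal, self-contained sketch of a store with the same `create` / `stream` shape. `ToyRunStore`, `ToyEvent`, and the `publish` method are hypothetical stand-ins, not murmur classes; how the real stores receive events is not shown on this page. Note that `stream` is declared without `async` in the protocol because calling it returns the iterator directly, which an async generator function satisfies.

```python
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass

# Event types treated as terminal, using the string values listed for RunEventType.
TERMINAL = {"group_completed", "run_cancelled"}


@dataclass
class ToyEvent:
    """Hypothetical stand-in for RunEvent with only two fields."""
    run_id: str
    type: str


class ToyRunStore:
    """Hypothetical in-memory store; not murmur's InMemoryRunStore."""

    def __init__(self) -> None:
        self._targets: dict[str, str] = {}
        self._queues: dict[str, asyncio.Queue] = {}

    async def create(self, run_id: str, target: str) -> None:
        if run_id in self._targets:
            raise ValueError(f"run {run_id!r} already exists")
        self._targets[run_id] = target
        self._queues[run_id] = asyncio.Queue()

    async def publish(self, event: ToyEvent) -> None:
        # Assumed helper for this sketch only.
        await self._queues[event.run_id].put(event)

    async def stream(self, run_id: str) -> AsyncIterator[ToyEvent]:
        # Async generator: calling stream(...) returns an async iterator.
        queue = self._queues[run_id]
        while True:
            event = await queue.get()
            yield event
            if event.type in TERMINAL:
                return  # stop after the terminal event


async def demo() -> list[str]:
    store = ToyRunStore()
    await store.create("abc", target="researcher")
    for kind in ("agent_started", "agent_completed", "group_completed"):
        await store.publish(ToyEvent("abc", kind))
    return [event.type async for event in store.stream("abc")]
```

Running `asyncio.run(demo())` collects the three event types in order and stops once the terminal event has been yielded.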

Value types

RunState

Coarse-grained lifecycle state for a submitted run.

RunStatus

A run's current state + progress snapshot.

run_id instance-attribute
run_id: str

The id returned by POST /submit. Echoed back here so this status object stands alone.

state instance-attribute
state: RunState

Coarse-grained lifecycle state. See RunState.

progress class-attribute instance-attribute
progress: RunProgress | None = None

Counter snapshot. None until the runner reports the first update.

RunProgress

Per-step counters reported during a run.

total class-attribute instance-attribute
total: int = 0

Total number of agent invocations expected for the run. 0 until the runner knows the count (e.g. after fan-out resolution).

completed class-attribute instance-attribute
completed: int = 0

Successfully completed invocations so far.

failed class-attribute instance-attribute
failed: int = 0

Failed invocations so far. A run can finish in RunState.COMPLETED with non-zero failed if the topology tolerates partial failure.

running class-attribute instance-attribute
running: int = 0

Currently in-flight invocations.

RunEvent

Stream event published over SSE for GET /runs/{run_id}/stream.

type instance-attribute
type: RunEventType

Discriminator. See RunEventType for per-type payload contracts.

run_id instance-attribute
run_id: str

Run id this event belongs to. Always present so events stand alone when serialised.

agent class-attribute instance-attribute
agent: str | None = None

The name of the Agent involved, when relevant. None for run-level events (GROUP_COMPLETED, RUN_CANCELLED).

task_id class-attribute instance-attribute
task_id: str | None = None

The TaskSpec id, when the event is about one task in particular. None for run-level events.

error class-attribute instance-attribute
error: str | None = None

Stringified error message, populated only on AGENT_FAILED events.

timestamp class-attribute instance-attribute
timestamp: datetime = Field(default_factory=lambda: datetime.now(tz=UTC))

UTC time of emission.

RunEventType

Discriminator for RunEvent instances on the SSE stream.

AGENT_STARTED class-attribute instance-attribute
AGENT_STARTED = 'agent_started'

An agent invocation began. Payload: agent, task_id.

AGENT_COMPLETED class-attribute instance-attribute
AGENT_COMPLETED = 'agent_completed'

An agent invocation finished successfully. Payload: agent, task_id.

AGENT_FAILED class-attribute instance-attribute
AGENT_FAILED = 'agent_failed'

An agent invocation raised. Payload: agent, task_id, error.

GROUP_COMPLETED class-attribute instance-attribute
GROUP_COMPLETED = 'group_completed'

The whole AgentGroup run finished. Terminal.

RUN_CANCELLED class-attribute instance-attribute
RUN_CANCELLED = 'run_cancelled'

The run was cancelled by a client request — terminal.
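
The payload contracts listed above can be expressed as a small lookup table. The sketch below is one way a client might check incoming events; `EventType` is a stand-in enum that copies the documented string values (whether the real RunEventType is a `str` enum is an assumption), and `missing_fields` and `is_terminal` are hypothetical helpers operating on plain dicts.

```python
from enum import Enum


class EventType(str, Enum):
    """Stand-in mirroring the documented RunEventType values."""
    AGENT_STARTED = "agent_started"
    AGENT_COMPLETED = "agent_completed"
    AGENT_FAILED = "agent_failed"
    GROUP_COMPLETED = "group_completed"
    RUN_CANCELLED = "run_cancelled"


# The two event types documented as terminal.
TERMINAL = frozenset({EventType.GROUP_COMPLETED, EventType.RUN_CANCELLED})

# Payload fields each event type is documented to carry.
REQUIRED = {
    EventType.AGENT_STARTED: ("agent", "task_id"),
    EventType.AGENT_COMPLETED: ("agent", "task_id"),
    EventType.AGENT_FAILED: ("agent", "task_id", "error"),
    EventType.GROUP_COMPLETED: (),
    EventType.RUN_CANCELLED: (),
}


def missing_fields(event: dict) -> list[str]:
    """Return the documented payload fields that are absent or None."""
    etype = EventType(event["type"])  # raises ValueError for unknown types
    return [name for name in REQUIRED[etype] if event.get(name) is None]


def is_terminal(event: dict) -> bool:
    """True when no further events are expected for the run."""
    return EventType(event["type"]) in TERMINAL
```

A consumer can stop reading the stream once `is_terminal` returns True and can log or reject events for which `missing_fields` is non-empty.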

Concretes

InMemoryRunStore

InMemoryRunStore()

Per-process RunStore: state lives in memory and is not shared across processes. Suitable for single-host deployments.

Source code in src/murmur/runs/__init__.py
def __init__(self) -> None:
    self._records: dict[str, _RunRecord] = {}

SQLiteRunStore

Requires pip install "murmur-runtime[sqlite]".

SQLiteRunStore(path: str | Path)

aiosqlite-backed murmur.runs.RunStore.

store = SQLiteRunStore("runs.db")
await store.create("abc", target="researcher")

Source code in src/murmur/runs/sqlite.py
def __init__(self, path: str | Path) -> None:
    self._path: str = str(path)
    self._db: aiosqlite.Connection | None = None
    self._init_lock: asyncio.Lock = asyncio.Lock()
    self._listeners: dict[str, list[asyncio.Event]] = {}
    self._write_locks: dict[str, asyncio.Lock] = {}

close async

close() -> None

Close the underlying connection. Idempotent.

Source code in src/murmur/runs/sqlite.py
async def close(self) -> None:
    """Close the underlying connection. Idempotent."""
    if self._db is not None:
        await self._db.close()
        self._db = None

RocksDBRunStore

Requires pip install "murmur-runtime[rocksdb]".

RocksDBRunStore(path: str | Path)

rocksdict-backed murmur.runs.RunStore.

store = RocksDBRunStore("./runs.db")
await store.create("abc", target="researcher")

Source code in src/murmur/runs/rocksdb.py
def __init__(self, path: str | Path) -> None:
    self._path: str = str(path)
    self._db: Rdict | None = None
    self._init_lock: asyncio.Lock = asyncio.Lock()
    self._listeners: dict[str, list[asyncio.Event]] = {}
    self._write_locks: dict[str, asyncio.Lock] = {}
    self._seq_counters: dict[str, int] = {}

close async

close() -> None

Flush + close the underlying RocksDB. Idempotent.

Source code in src/murmur/runs/rocksdb.py
async def close(self) -> None:
    """Flush + close the underlying RocksDB. Idempotent."""
    if self._db is None:
        return
    db = self._db
    self._db = None
    await asyncio.to_thread(db.close)

RedisRunStore

Requires pip install "murmur-runtime[redis-runstore]".

RedisRunStore(
    url: str | None = None,
    *,
    client: Redis | None = None,
    key_prefix: str = "murmur:runs",
    ttl_seconds: int = _DEFAULT_TTL_SECONDS,
)

Redis-backed murmur.runs.RunStore.

Pass exactly one of url or client; passing both or neither raises ValueError. With url (e.g. redis://localhost:6379/0), a client is created via redis.asyncio.from_url. With client=, the store uses an existing redis.asyncio.Redis instance, which is useful for pre-configured connection pools and as a test seam (FakeRedis backed by a shared FakeServer exercises cross-instance behaviour without Docker).

store = RedisRunStore(url="redis://localhost:6379/0")
await store.create("abc", target="researcher")

Source code in src/murmur/runs/redis.py
def __init__(
    self,
    url: str | None = None,
    *,
    client: Redis | None = None,
    key_prefix: str = "murmur:runs",
    ttl_seconds: int = _DEFAULT_TTL_SECONDS,
) -> None:
    if (url is None) == (client is None):
        raise ValueError("pass exactly one of `url=` or `client=`")
    self._client: Redis = client if client is not None else from_url(url or "")
    self._owns_client: bool = client is None
    self._key_prefix: str = key_prefix.rstrip(":")
    self._ttl_seconds: int = ttl_seconds
    self._set_state_script = self._client.register_script(_SET_STATE_LUA)

close async

close() -> None

Disconnect. Only a client the store constructed itself (from url) is closed; a client passed via client= is left open for the caller to manage.

Source code in src/murmur/runs/redis.py
async def close(self) -> None:
    """Disconnect — only closes the client we constructed ourselves."""
    if self._owns_client:
        await self._client.aclose()