Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
432e2ea
chore: ignore agent planning artifacts under docs/superpowers
andreahlert May 22, 2026
fafb734
feat: add durable execution primitives (suspend signal, records)
andreahlert May 22, 2026
e25aed8
feat: accept 'suspended' as a persisted run status
andreahlert May 22, 2026
d5c124e
feat: export durable execution primitives from burr.core
andreahlert May 22, 2026
8e6b356
fix: align persistence status docstrings with the suspended literal
andreahlert May 22, 2026
373bd1c
feat: add optional durable-storage persister methods with in-state fa…
andreahlert May 22, 2026
3716cdb
fix: clarify durable persister contracts and strengthen tests
andreahlert May 22, 2026
23f3906
fix: test durable codec and correct load_suspension docstring
andreahlert May 22, 2026
bcc25cd
test: cover suspension resolution, journal ordering, and precise load…
andreahlert May 22, 2026
a8c8e66
feat: add ApplicationContext.suspend() and durable runtime fields
andreahlert May 22, 2026
624c749
fix: clarify suspend docstring and unify context factory forwarding
andreahlert May 22, 2026
07b0899
feat: catch suspend signal in the sync run loop and persist suspension
andreahlert May 22, 2026
5e0a2ab
feat: catch suspend signal in the async run loop
andreahlert May 22, 2026
59bd3fd
fix: route handler failures during suspension to the error path
andreahlert May 22, 2026
2b76e0d
feat: add resume() helper for durable execution
andreahlert May 22, 2026
e4984d0
feat: add aresume() helper for async durable execution
andreahlert May 22, 2026
517c763
fix: wrap resolved-suspension state in State and test in-state fallback
andreahlert May 22, 2026
96f09d4
fix: persist suspension once via PersisterHook in in-state fallback
andreahlert May 22, 2026
376aa71
fix: scope resume idempotency to durable persisters, warn on missing …
andreahlert May 22, 2026
3a6b60e
fix: route async non-durable persisters through in-state fallback in …
andreahlert May 22, 2026
168337a
fix: aresume raises clear error for async non-durable persisters
andreahlert May 22, 2026
814688f
fix: aresume rejects all async persisters and avoids double-wrapping …
andreahlert May 22, 2026
201cacd
refactor: drop redundant else-pass branches in resume helpers
andreahlert May 22, 2026
dc4bd4c
feat: add ApplicationContext.durable() first-run execution and journa…
andreahlert May 22, 2026
f91a2d6
test: cover durable() journal replay
andreahlert May 22, 2026
fdff7c0
test: cover DeterminismError on durable() key mismatch
andreahlert May 22, 2026
bbfb9a5
refactor: consolidate durable() imports and cover kwargs forwarding
andreahlert May 22, 2026
4674524
feat: flush in-state journal on action completion for fallback persis…
andreahlert May 22, 2026
5fac592
fix: reset journal sink before direct step calls in streaming result …
andreahlert May 22, 2026
ed28aee
test: regression guard for journal double-count via stream_result
andreahlert May 22, 2026
ed3e03e
feat: add ApplicationContext.adurable() for coroutine sub-steps
andreahlert May 22, 2026
b8a526f
test: verify durable side effects run exactly once across suspend/resume
andreahlert May 22, 2026
099835f
test: non-deterministic branch around durable() raises DeterminismError
andreahlert May 22, 2026
ae5065b
test: cover adurable journaling into a durable persister and isolate …
andreahlert May 22, 2026
7a096dd
feat: SQLite persister durable storage (suspensions + journal tables)
andreahlert May 22, 2026
e9b473a
test: migrate stale durable tests off SQLitePersister to NonDurablePe…
andreahlert May 22, 2026
2bc9be3
fix: uniform bool return contract for mark_suspension_resolved
andreahlert May 22, 2026
a1fafc7
fix: align AsyncBaseStatePersister.mark_suspension_resolved with bool…
andreahlert May 22, 2026
0629507
test: SQLite suspend/resume survives a process boundary
andreahlert May 22, 2026
9b555c7
feat: PostgreSQL persister durable storage
andreahlert May 22, 2026
43897e7
test: env-configurable pg fixture and shim inheritance test
andreahlert May 22, 2026
da4b4a7
refactor: tighten type hints and imports on psycopg2 durable methods
andreahlert May 22, 2026
f6756b9
feat: asyncpg persister durable storage
andreahlert May 22, 2026
eae5d9a
feat: aiosqlite persister durable storage
andreahlert May 22, 2026
f1cd0ae
feat: redis persister durable storage (sync and async)
andreahlert May 22, 2026
8552798
feat: pymongo persister durable storage
andreahlert May 22, 2026
87d9844
test: drop mongo test db on fixture setup to absorb stale state
andreahlert May 22, 2026
0b55e6c
feat: async _ahandle_suspension wired into _astep and durable AsyncIn…
andreahlert May 22, 2026
3081896
feat: aresume() now supports async persisters end-to-end
andreahlert May 22, 2026
8a6177a
docs: correct AsyncInMemoryPersister docstring (Sync -> Async)
andreahlert May 22, 2026
7c00c1a
feat: add post_action_suspend and pre_action_resume lifecycle hooks
andreahlert May 22, 2026
5374eae
feat: fire suspend/resume lifecycle hooks from the run loop
andreahlert May 22, 2026
a595d3d
refactor: type and document hooks parameter on resume/aresume
andreahlert May 22, 2026
8a98f21
feat: tracking client records suspended runs for the UI
andreahlert May 22, 2026
094934b
docs: add human-in-the-loop durable execution example
andreahlert May 22, 2026
5e2f0f0
docs: document durable execution (suspend, durable, resume)
andreahlert May 22, 2026
ececa4b
chore: remove stale M5 milestone comment
andreahlert May 22, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,6 @@ burr/tracking/server/build
examples/*/statemachine
examples/*/*/statemachine
.vscode

# Agent-authored planning artifacts (never commit)
docs/superpowers/
7 changes: 7 additions & 0 deletions burr/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
ApplicationContext,
ApplicationGraph,
)
from burr.core.durable import DeterminismError, JournalEntry, SuspensionRecord
from burr.core.graph import Graph, GraphBuilder
from burr.core.resume import aresume, resume
from burr.core.state import State

__all__ = [
Expand All @@ -32,11 +34,16 @@
"ApplicationBuilder",
"ApplicationGraph",
"ApplicationContext",
"aresume",
"Condition",
"default",
"DeterminismError",
"expr",
"JournalEntry",
"Result",
"resume",
"State",
"SuspensionRecord",
"when",
"Graph",
"GraphBuilder",
Expand Down
353 changes: 330 additions & 23 deletions burr/core/application.py

Large diffs are not rendered by default.

154 changes: 154 additions & 0 deletions burr/core/durable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Primitives for durable execution: the suspend control-flow signal, the
determinism error, and the records persisted to support resume."""

import dataclasses
from typing import Any, Dict, Optional


class _Suspended(BaseException):
"""Internal control-flow signal raised by ``ApplicationContext.suspend()``.

Subclasses ``BaseException`` (not ``Exception``) on purpose: a user
``try/except Exception`` wrapping an LLM/IO call inside an action must NOT
swallow it. The run loop catches it explicitly. It is never an error and is
never logged or persisted as a failure.
"""

def __init__(
self,
channel: str,
schema_json: Optional[dict] = None,
metadata: Optional[dict] = None,
):
self.channel = channel
self.schema_json = schema_json
self.metadata = metadata
super().__init__(f"Execution suspended on channel '{channel}'")


class DeterminismError(Exception):
"""Raised on resume when ``ctx.durable()`` calls do not replay in the same
order, or with the same keys, as the recorded journal. This converts a
silent footgun (lost re-execution or stale cache) into a loud failure."""


@dataclasses.dataclass
class SuspensionRecord:
"""Everything needed to resume a suspended run. Persisted when an action
calls ``suspend()``. ``metadata``, ``inputs`` and ``state`` are serialized
through ``burr.core.serde``."""

suspension_id: str
partition_key: Optional[str]
app_id: str
sequence_id: int
position: str # name of the suspended action
channel: str
schema_json: Optional[dict]
metadata: Optional[dict]
inputs: Dict[str, Any]
state: Dict[str, Any] # entry state of the suspended action
created_at: str
resolved: bool = False


@dataclasses.dataclass
class JournalEntry:
"""One memoized ``ctx.durable()`` sub-step. ``result`` is serialized through
``burr.core.serde``.

.. note::
The in-state codec (``write_journal_into_state`` / ``read_journal_from_state``)
serializes entries via ``dataclasses.asdict``, which recurses into nested
dataclasses and converts them to plain dicts. Those dicts are NOT
reconstructed back into their original types on read. Callers must keep
``result`` (and any nested fields) to plain JSON-friendly types, or accept
that nested dataclasses come back as plain dicts after a round-trip through
the in-state codec.
"""

partition_key: Optional[str]
app_id: str
sequence_id: int
step_key: str
call_index: int
result: Any


def supports_durable_storage(persister) -> bool:
"""True if the persister overrides the durable-storage methods. When False,
the Application stores suspensions and journal entries inside the State.

.. note::
All-or-nothing override contract: a persister is considered to support
durable storage only when it overrides ALL five durable-storage methods
(``save_suspension``, ``load_suspension``, ``save_journal_entry``,
``load_journal``, ``mark_suspension_resolved``). Detection is based solely
on ``save_suspension``; partial overrides are not detected and will raise
``NotImplementedError`` at call time.
"""
from burr.core.persistence import (
AsyncBaseStatePersister,
BaseStatePersister,
)

base = AsyncBaseStatePersister if persister.is_async() else BaseStatePersister
return type(persister).save_suspension is not base.save_suspension


# --- In-state fallback codec --------------------------------------------------
# When the persister has no dedicated storage, suspensions and journal entries
# ride inside a reserved State namespace, which the existing PersisterHook saves.

DURABLE_STATE_KEY = "__burr_durable__"


def write_suspension_into_state(state, record: "SuspensionRecord"):
"""Return a new State with the suspension record embedded."""
bucket = dict(state.get(DURABLE_STATE_KEY, {}) or {})
bucket["suspension"] = dataclasses.asdict(record)
return state.update(**{DURABLE_STATE_KEY: bucket})


def read_suspension_from_state(state, channel: str) -> "Optional[SuspensionRecord]":
bucket = state.get(DURABLE_STATE_KEY, {}) or {}
raw = bucket.get("suspension")
if raw is None or raw.get("channel") != channel:
return None
return SuspensionRecord(**raw)


def write_journal_into_state(state, entries: "list"):
"""Return a new State with the journal entries embedded.

.. warning::
Serializes via ``dataclasses.asdict``, which recursively converts nested
dataclasses to plain dicts. They are NOT reconstructed to their original
types when read back via ``read_journal_from_state``. Keep ``JournalEntry.result``
and any nested fields as plain JSON-friendly types to avoid type loss.
"""
bucket = dict(state.get(DURABLE_STATE_KEY, {}) or {})
bucket["journal"] = [dataclasses.asdict(e) for e in entries]
return state.update(**{DURABLE_STATE_KEY: bucket})


def read_journal_from_state(state) -> "list":
bucket = state.get(DURABLE_STATE_KEY, {}) or {}
return [JournalEntry(**raw) for raw in bucket.get("journal", [])]
Loading
Loading