From f1e6eceb9151bcf077dc5f7b0fe0715abf3f403c Mon Sep 17 00:00:00 2001 From: Alex Date: Sun, 17 May 2026 13:33:54 +0100 Subject: [PATCH 1/2] style(terraphim_multi_agent): apply cargo fmt to examples Pre-existing rustfmt drift in the example files blocks the workspace-wide `cargo fmt --check` step in the pre-commit hook. Re-formatting in a separate commit so the substantive Layer 0 cursor fix (Refs #1562) stays scoped to the orchestrator. Refs #1562 (Gitea) --- .../examples/enhanced_atomic_server_example.rs | 14 ++++++-------- .../examples/knowledge_graph_integration.rs | 2 +- .../examples/multi_agent_coordination.rs | 2 +- .../examples/simple_validation.rs | 2 +- .../examples/specialized_agents.rs | 2 +- .../examples/workflow_patterns_working.rs | 2 +- 6 files changed, 11 insertions(+), 13 deletions(-) diff --git a/crates/terraphim_multi_agent/examples/enhanced_atomic_server_example.rs b/crates/terraphim_multi_agent/examples/enhanced_atomic_server_example.rs index 4394da1d0..b47122f3e 100644 --- a/crates/terraphim_multi_agent/examples/enhanced_atomic_server_example.rs +++ b/crates/terraphim_multi_agent/examples/enhanced_atomic_server_example.rs @@ -31,14 +31,12 @@ fn create_atomic_server_agent_role() -> Role { llm_context_window: Some(16000), llm_router_enabled: false, llm_router_config: None, - haystacks: vec![ - Haystack::new( - "http://localhost:9883".to_string(), // Atomic server URL - ServiceType::Atomic, - true, // read-only - ) - .with_atomic_secret(Some("your-base64-secret-here".to_string())), - ], + haystacks: vec![Haystack::new( + "http://localhost:9883".to_string(), // Atomic server URL + ServiceType::Atomic, + true, // read-only + ) + .with_atomic_secret(Some("your-base64-secret-here".to_string()))], extra: { let mut extra = AHashMap::new(); // Multi-agent specific configuration diff --git a/crates/terraphim_multi_agent/examples/knowledge_graph_integration.rs b/crates/terraphim_multi_agent/examples/knowledge_graph_integration.rs index 
081293557..e4717e80f 100644 --- a/crates/terraphim_multi_agent/examples/knowledge_graph_integration.rs +++ b/crates/terraphim_multi_agent/examples/knowledge_graph_integration.rs @@ -8,7 +8,7 @@ use terraphim_config::Role; use terraphim_multi_agent::{ - CommandInput, CommandType, MultiAgentResult, TerraphimAgent, test_utils::create_test_role, + test_utils::create_test_role, CommandInput, CommandType, MultiAgentResult, TerraphimAgent, }; use terraphim_persistence::DeviceStorage; diff --git a/crates/terraphim_multi_agent/examples/multi_agent_coordination.rs b/crates/terraphim_multi_agent/examples/multi_agent_coordination.rs index 674ed03a3..0c49b82a7 100644 --- a/crates/terraphim_multi_agent/examples/multi_agent_coordination.rs +++ b/crates/terraphim_multi_agent/examples/multi_agent_coordination.rs @@ -10,7 +10,7 @@ use ahash::AHashMap; use std::sync::Arc; use terraphim_config::Role; use terraphim_multi_agent::{ - CommandInput, CommandType, MultiAgentResult, TerraphimAgent, test_utils::create_test_role, + test_utils::create_test_role, CommandInput, CommandType, MultiAgentResult, TerraphimAgent, }; use terraphim_persistence::DeviceStorage; use terraphim_types::RelevanceFunction; diff --git a/crates/terraphim_multi_agent/examples/simple_validation.rs b/crates/terraphim_multi_agent/examples/simple_validation.rs index f2fd3bd29..03ce551b7 100644 --- a/crates/terraphim_multi_agent/examples/simple_validation.rs +++ b/crates/terraphim_multi_agent/examples/simple_validation.rs @@ -4,7 +4,7 @@ //! without complex storage operations to avoid memory issues. 
use terraphim_multi_agent::{ - CommandInput, CommandType, TerraphimAgent, test_utils::create_test_role, + test_utils::create_test_role, CommandInput, CommandType, TerraphimAgent, }; use terraphim_persistence::DeviceStorage; diff --git a/crates/terraphim_multi_agent/examples/specialized_agents.rs b/crates/terraphim_multi_agent/examples/specialized_agents.rs index 7a0acba7b..daf5b7e88 100644 --- a/crates/terraphim_multi_agent/examples/specialized_agents.rs +++ b/crates/terraphim_multi_agent/examples/specialized_agents.rs @@ -4,7 +4,7 @@ //! that leverage the generic LLM interface instead of OpenRouter-specific code. use terraphim_multi_agent::{ - ChatAgent, ChatConfig, SummarizationAgent, SummarizationConfig, SummaryStyle, test_utils, + test_utils, ChatAgent, ChatConfig, SummarizationAgent, SummarizationConfig, SummaryStyle, }; #[tokio::main] diff --git a/crates/terraphim_multi_agent/examples/workflow_patterns_working.rs b/crates/terraphim_multi_agent/examples/workflow_patterns_working.rs index a68e52b8d..9fd6a5fca 100644 --- a/crates/terraphim_multi_agent/examples/workflow_patterns_working.rs +++ b/crates/terraphim_multi_agent/examples/workflow_patterns_working.rs @@ -7,7 +7,7 @@ use ahash::AHashMap; use terraphim_config::Role; use terraphim_multi_agent::{ - CommandInput, CommandType, MultiAgentResult, TerraphimAgent, test_utils::create_test_role, + test_utils::create_test_role, CommandInput, CommandType, MultiAgentResult, TerraphimAgent, }; use terraphim_persistence::DeviceStorage; use terraphim_types::RelevanceFunction; From 94e07a70dd4fc3f90209b66eec86820d679b541c Mon Sep 17 00:00:00 2001 From: Alex Date: Sun, 17 May 2026 13:37:09 +0100 Subject: [PATCH 2/2] fix(terraphim_orchestrator): gate compound-review fires against per-occurrence cursor Adds `last_compound_review_fired_at: Option<DateTime<Utc>>` to `AgentOrchestrator` and rewrites the compound-review branch of `check_cron_schedules` so the cursor is recorded **before** the `.await` on `handle_schedule_event`.
Mirrors the per-agent `last_cron_fire` pattern at the start of the same function. Why: `reconcile_tick` is wrapped in a 90 s `tokio::time::timeout` safety net. When the future is cancelled mid-await, `last_tick_time` is never updated, so the previous `should_fire` check kept returning true on every subsequent tick, spawning a fresh review worktree every 30 s (the bigbox storm). Recording the cursor synchronously before the await makes cancellation safe: the next iteration sees the cursor and short-circuits via `already_fired = fire_time <= prev`. Verified via new regression test `test_compound_review_cursor_advances_on_cancellation`: plants a `last_tick_time` 2 h in the past, runs `check_cron_schedules` twice without advancing wall-clock, and asserts the cursor is `Some(_)` after the first call and unchanged after the second. Note on design drift: the design doc (docs/design/adf-worktree-lifecycle-design.md section 4.1) used `take_while(|t| *t <= now)` plus an explicit `already_fired` gate. I initially collapsed both into `compound_sched.after(&cursor).next()` for brevity but reverted to the documented shape after the regression test exposed the catch-up vs gating semantic difference -- the design's `last_tick_time`-anchored `next_fire` plus separate cursor gate is the correct read of "same occurrence, do not re-fire". Line numbers in the design (`:241`, `:817`, `:7137-7161`, `:7712`) all matched current source unchanged. Layer 0 of the ADF worktree lifecycle epic (Gitea #1567). Out of scope: the per-agent cron path at lib.rs:7484, the WorktreeGuard refactor (Layer 1, Gitea #1569), startup sweep (Layer 2, #1570), and the adf-cleanup.sh hardening (Layer 3, #1571). 
Refs #1562 (Gitea) --- crates/terraphim_orchestrator/src/lib.rs | 147 +++++++++++++++++++++-- 1 file changed, 134 insertions(+), 13 deletions(-) diff --git a/crates/terraphim_orchestrator/src/lib.rs b/crates/terraphim_orchestrator/src/lib.rs index 216735c4e..68828108a 100644 --- a/crates/terraphim_orchestrator/src/lib.rs +++ b/crates/terraphim_orchestrator/src/lib.rs @@ -239,6 +239,14 @@ pub struct AgentOrchestrator { /// Per-agent last cron fire timestamp to prevent re-triggering within same schedule window. /// Key: agent name. Value: timestamp of last fire. last_cron_fire: HashMap>, + /// Last compound-review fire time, used to gate the compound schedule + /// independently of `last_tick_time`. Mirrors the `last_cron_fire` + /// pattern for per-agent crons. Without this cursor, if the + /// `reconcile_tick` future is cancelled mid-await by its 90 s + /// `tokio::time::timeout` safety wrapper, `last_tick_time` is never + /// advanced and the same compound-review occurrence re-fires on the + /// very next tick, producing a worktree storm (#1562). + last_compound_review_fired_at: Option>, /// Lazy-initialised Gitea tracker for gitea-issue pre-check. pre_check_tracker: Option, /// Active flow executions keyed by flow name. 
@@ -824,6 +832,7 @@ impl AgentOrchestrator { circuit_breakers: Arc::new(Mutex::new(HashMap::new())), last_run_commits: HashMap::new(), last_cron_fire: HashMap::new(), + last_compound_review_fired_at: None, pre_check_tracker: None, active_flows: HashMap::new(), mention_cursors: HashMap::new(), @@ -7137,26 +7146,46 @@ Remove the pause flag once the underlying failure is resolved:\n\n\ if let Some(compound_sched) = self.scheduler.compound_review_schedule() { debug!( last_tick = %self.last_tick_time, + last_fired = ?self.last_compound_review_fired_at, now = %now, "checking compound review schedule" ); - // Get next fire times for debugging - let upcoming: Vec<_> = compound_sched.after(&self.last_tick_time).take(3).collect(); - debug!(upcoming = ?upcoming, "compound schedule upcoming times"); - - let should_fire = compound_sched + // Compute the earliest occurrence strictly after + // `last_tick_time` that is also <= now. This is the same + // occurrence the buggy code would have refired forever when + // the reconcile-tick future was cancelled mid-await by the + // 90 s `tokio::time::timeout` safety wrapper (#1562). + let next_fire = compound_sched .after(&self.last_tick_time) .take_while(|t| *t <= now) - .next() - .is_some(); - - debug!(should_fire = should_fire, "compound review fire check"); + .next(); + debug!(next_fire = ?next_fire, "compound schedule next fire"); + + if let Some(fire_time) = next_fire { + // Gate against re-firing the same occurrence. The + // cursor `last_compound_review_fired_at` is the per- + // occurrence dedup key, mirroring `last_cron_fire` for + // per-agent crons. It is updated *before* the `.await` + // below so a cancelled future cannot lose the update. 
+ let already_fired = self + .last_compound_review_fired_at + .map(|prev| fire_time <= prev) + .unwrap_or(false); - if should_fire { - info!("compound review schedule fired, starting review"); - self.handle_schedule_event(ScheduleEvent::CompoundReview) - .await; + if !already_fired { + // Record fire time BEFORE awaiting + // `handle_schedule_event` so that future + // cancellation cannot lose the update and + // re-trigger the same occurrence on the next tick. + self.last_compound_review_fired_at = Some(fire_time); + info!( + fire_time = %fire_time, + "compound review schedule fired, starting review" + ); + self.handle_schedule_event(ScheduleEvent::CompoundReview) + .await; + } } } } @@ -7713,6 +7742,19 @@ Remove the pause flag once the underlying failure is resolved:\n\n\ self.last_tick_time = time; } + /// Test helper: read the compound-review fire cursor. + #[doc(hidden)] + pub fn last_compound_review_fired_at(&self) -> Option> { + self.last_compound_review_fired_at + } + + /// Test helper: clear the compound-review fire cursor for synthetic + /// testing of the cancellation property. + #[doc(hidden)] + pub fn clear_last_compound_review_fired_at(&mut self) { + self.last_compound_review_fired_at = None; + } + /// Test helper: access the telemetry store for assertions. #[doc(hidden)] pub fn telemetry_store(&self) -> &control_plane::TelemetryStore { @@ -8094,6 +8136,85 @@ mod tests { assert_eq!(result.agents_failed, 0); } + /// Regression test for #1562. + /// + /// Property: when `check_cron_schedules` fires the compound review, + /// `last_compound_review_fired_at` advances **before** the `await` + /// on `handle_schedule_event`. Calling `check_cron_schedules` a + /// second time without advancing wall-clock time must NOT re-fire + /// the same occurrence; the cursor stays put. 
+ /// + /// This is the property that breaks if the cursor is dropped: the + /// 90 s `tokio::time::timeout` wrapping `reconcile_tick` cancels + /// the future mid-await, `last_tick_time` is never updated, and + /// the next tick re-evaluates the same cron occurrence as "should + /// fire", spawning a new worktree every tick (the bigbox storm). + #[tokio::test] + async fn test_compound_review_cursor_advances_on_cancellation() { + // Build a test config and override compound_review so that the + // workflow has no review groups -- it still creates a worktree + // on the workspace git repo, but no agent subprocesses are + // launched. This mirrors `test_orchestrator_compound_review_manual`. + let mut config = test_config(); + let tmp_worktree = TempDir::new().expect("tempdir"); + config.compound_review.worktree_root = tmp_worktree.path().to_path_buf(); + // Schedule fires hourly so we can use a recent `last_tick_time`. + // 5-field cron: minute 0 of every hour. + config.compound_review.schedule = "0 * * * *".to_string(); + + let mut orch = AgentOrchestrator::new(config).expect("orchestrator"); + + // Replace the compound workflow with one that uses an empty + // group list so the cron-fire path is a no-op apart from the + // worktree creation/removal. The orchestrator's + // `repo_path`/`base_branch` are inherited from the test config. + let repo_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../.."); + let swarm_config = crate::compound::SwarmConfig { + groups: vec![], + timeout: Duration::from_secs(60), + worktree_root: tmp_worktree.path().to_path_buf(), + repo_path, + base_branch: "main".to_string(), + max_concurrent_agents: 1, + create_prs: false, + }; + orch.compound_workflow = crate::compound::CompoundReviewWorkflow::new(swarm_config); + + // Plant `last_tick_time` 2 hours ago so at least one occurrence + // of `0 * * * *` lies in [last_tick_time, now]. Clear the new + // cursor so the first call has nothing to compare against. 
+ let two_hours_ago = chrono::Utc::now() - chrono::Duration::hours(2); + orch.set_last_tick_time(two_hours_ago); + orch.clear_last_compound_review_fired_at(); + assert!( + orch.last_compound_review_fired_at().is_none(), + "cursor should start empty", + ); + + // First call: should advance the cursor to a past fire time. + orch.check_cron_schedules().await; + let cursor_after_first = orch + .last_compound_review_fired_at() + .expect("cursor should be Some after first fire"); + assert!( + cursor_after_first <= chrono::Utc::now(), + "cursor should be in the past, got {}", + cursor_after_first + ); + + // Second call without advancing wall-clock or `last_tick_time`: + // the cursor must NOT advance (the same occurrence is gated). + orch.check_cron_schedules().await; + let cursor_after_second = orch + .last_compound_review_fired_at() + .expect("cursor should still be Some"); + assert_eq!( + cursor_after_first, cursor_after_second, + "cursor must not re-advance on a re-check without new occurrences \ + (#1562 storm regression)", + ); + } + #[test] fn test_orchestrator_from_toml() { let toml_str = r#"