Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,12 @@ fn create_atomic_server_agent_role() -> Role {
llm_context_window: Some(16000),
llm_router_enabled: false,
llm_router_config: None,
haystacks: vec![
Haystack::new(
"http://localhost:9883".to_string(), // Atomic server URL
ServiceType::Atomic,
true, // read-only
)
.with_atomic_secret(Some("your-base64-secret-here".to_string())),
],
haystacks: vec![Haystack::new(
"http://localhost:9883".to_string(), // Atomic server URL
ServiceType::Atomic,
true, // read-only
)
.with_atomic_secret(Some("your-base64-secret-here".to_string()))],
extra: {
let mut extra = AHashMap::new();
// Multi-agent specific configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

use terraphim_config::Role;
use terraphim_multi_agent::{
CommandInput, CommandType, MultiAgentResult, TerraphimAgent, test_utils::create_test_role,
test_utils::create_test_role, CommandInput, CommandType, MultiAgentResult, TerraphimAgent,
};
use terraphim_persistence::DeviceStorage;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use ahash::AHashMap;
use std::sync::Arc;
use terraphim_config::Role;
use terraphim_multi_agent::{
CommandInput, CommandType, MultiAgentResult, TerraphimAgent, test_utils::create_test_role,
test_utils::create_test_role, CommandInput, CommandType, MultiAgentResult, TerraphimAgent,
};
use terraphim_persistence::DeviceStorage;
use terraphim_types::RelevanceFunction;
Expand Down
2 changes: 1 addition & 1 deletion crates/terraphim_multi_agent/examples/simple_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! without complex storage operations to avoid memory issues.

use terraphim_multi_agent::{
CommandInput, CommandType, TerraphimAgent, test_utils::create_test_role,
test_utils::create_test_role, CommandInput, CommandType, TerraphimAgent,
};
use terraphim_persistence::DeviceStorage;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! that leverage the generic LLM interface instead of OpenRouter-specific code.

use terraphim_multi_agent::{
ChatAgent, ChatConfig, SummarizationAgent, SummarizationConfig, SummaryStyle, test_utils,
test_utils, ChatAgent, ChatConfig, SummarizationAgent, SummarizationConfig, SummaryStyle,
};

#[tokio::main]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use ahash::AHashMap;
use terraphim_config::Role;
use terraphim_multi_agent::{
CommandInput, CommandType, MultiAgentResult, TerraphimAgent, test_utils::create_test_role,
test_utils::create_test_role, CommandInput, CommandType, MultiAgentResult, TerraphimAgent,
};
use terraphim_persistence::DeviceStorage;
use terraphim_types::RelevanceFunction;
Expand Down
147 changes: 134 additions & 13 deletions crates/terraphim_orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,14 @@ pub struct AgentOrchestrator {
/// Per-agent last cron fire timestamp to prevent re-triggering within same schedule window.
/// Key: agent name. Value: timestamp of last fire.
last_cron_fire: HashMap<String, chrono::DateTime<chrono::Utc>>,
/// Last compound-review fire time, used to gate the compound schedule
/// independently of `last_tick_time`. Mirrors the `last_cron_fire`
/// pattern for per-agent crons. Without this cursor, if the
/// `reconcile_tick` future is cancelled mid-await by its 90 s
/// `tokio::time::timeout` safety wrapper, `last_tick_time` is never
/// advanced and the same compound-review occurrence re-fires on the
/// very next tick, producing a worktree storm (#1562).
last_compound_review_fired_at: Option<chrono::DateTime<chrono::Utc>>,
/// Lazy-initialised Gitea tracker for gitea-issue pre-check.
pre_check_tracker: Option<terraphim_tracker::GiteaTracker>,
/// Active flow executions keyed by flow name.
Expand Down Expand Up @@ -824,6 +832,7 @@ impl AgentOrchestrator {
circuit_breakers: Arc::new(Mutex::new(HashMap::new())),
last_run_commits: HashMap::new(),
last_cron_fire: HashMap::new(),
last_compound_review_fired_at: None,
pre_check_tracker: None,
active_flows: HashMap::new(),
mention_cursors: HashMap::new(),
Expand Down Expand Up @@ -7137,26 +7146,46 @@ Remove the pause flag once the underlying failure is resolved:\n\n\
if let Some(compound_sched) = self.scheduler.compound_review_schedule() {
debug!(
last_tick = %self.last_tick_time,
last_fired = ?self.last_compound_review_fired_at,
now = %now,
"checking compound review schedule"
);

// Get next fire times for debugging
let upcoming: Vec<_> = compound_sched.after(&self.last_tick_time).take(3).collect();
debug!(upcoming = ?upcoming, "compound schedule upcoming times");

let should_fire = compound_sched
// Compute the earliest occurrence strictly after
// `last_tick_time` that is also <= now. This is the same
// occurrence the buggy code would have refired forever when
// the reconcile-tick future was cancelled mid-await by the
// 90 s `tokio::time::timeout` safety wrapper (#1562).
let next_fire = compound_sched
.after(&self.last_tick_time)
.take_while(|t| *t <= now)
.next()
.is_some();

debug!(should_fire = should_fire, "compound review fire check");
.next();
debug!(next_fire = ?next_fire, "compound schedule next fire");

if let Some(fire_time) = next_fire {
// Gate against re-firing the same occurrence. The
// cursor `last_compound_review_fired_at` is the per-
// occurrence dedup key, mirroring `last_cron_fire` for
// per-agent crons. It is updated *before* the `.await`
// below so a cancelled future cannot lose the update.
let already_fired = self
.last_compound_review_fired_at
.map(|prev| fire_time <= prev)
.unwrap_or(false);

if should_fire {
info!("compound review schedule fired, starting review");
self.handle_schedule_event(ScheduleEvent::CompoundReview)
.await;
if !already_fired {
// Record fire time BEFORE awaiting
// `handle_schedule_event` so that future
// cancellation cannot lose the update and
// re-trigger the same occurrence on the next tick.
self.last_compound_review_fired_at = Some(fire_time);
info!(
fire_time = %fire_time,
"compound review schedule fired, starting review"
);
self.handle_schedule_event(ScheduleEvent::CompoundReview)
.await;
}
}
}
}
Expand Down Expand Up @@ -7713,6 +7742,19 @@ Remove the pause flag once the underlying failure is resolved:\n\n\
self.last_tick_time = time;
}

/// Test helper: read the compound-review fire cursor.
#[doc(hidden)]
pub fn last_compound_review_fired_at(&self) -> Option<chrono::DateTime<chrono::Utc>> {
self.last_compound_review_fired_at
}

/// Test helper: clear the compound-review fire cursor for synthetic
/// testing of the cancellation property.
#[doc(hidden)]
pub fn clear_last_compound_review_fired_at(&mut self) {
self.last_compound_review_fired_at = None;
}

/// Test helper: access the telemetry store for assertions.
#[doc(hidden)]
pub fn telemetry_store(&self) -> &control_plane::TelemetryStore {
Expand Down Expand Up @@ -8094,6 +8136,85 @@ mod tests {
assert_eq!(result.agents_failed, 0);
}

/// Regression test for #1562.
///
/// Property: when `check_cron_schedules` fires the compound review,
/// `last_compound_review_fired_at` advances **before** the `await`
/// on `handle_schedule_event`. Calling `check_cron_schedules` a
/// second time without advancing wall-clock time must NOT re-fire
/// the same occurrence; the cursor stays put.
///
/// This is the property that breaks if the cursor is dropped: the
/// 90 s `tokio::time::timeout` wrapping `reconcile_tick` cancels
/// the future mid-await, `last_tick_time` is never updated, and
/// the next tick re-evaluates the same cron occurrence as "should
/// fire", spawning a new worktree every tick (the bigbox storm).
#[tokio::test]
async fn test_compound_review_cursor_advances_on_cancellation() {
// Build a test config and override compound_review so that the
// workflow has no review groups -- it still creates a worktree
// on the workspace git repo, but no agent subprocesses are
// launched. This mirrors `test_orchestrator_compound_review_manual`.
let mut config = test_config();
let tmp_worktree = TempDir::new().expect("tempdir");
config.compound_review.worktree_root = tmp_worktree.path().to_path_buf();
// Schedule fires hourly so we can use a recent `last_tick_time`.
// 5-field cron: minute 0 of every hour.
config.compound_review.schedule = "0 * * * *".to_string();

let mut orch = AgentOrchestrator::new(config).expect("orchestrator");

// Replace the compound workflow with one that uses an empty
// group list so the cron-fire path is a no-op apart from the
// worktree creation/removal. The orchestrator's
// `repo_path`/`base_branch` are inherited from the test config.
let repo_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../..");
let swarm_config = crate::compound::SwarmConfig {
groups: vec![],
timeout: Duration::from_secs(60),
worktree_root: tmp_worktree.path().to_path_buf(),
repo_path,
base_branch: "main".to_string(),
max_concurrent_agents: 1,
create_prs: false,
};
orch.compound_workflow = crate::compound::CompoundReviewWorkflow::new(swarm_config);

// Plant `last_tick_time` 2 hours ago so at least one occurrence
// of `0 * * * *` lies in [last_tick_time, now]. Clear the new
// cursor so the first call has nothing to compare against.
let two_hours_ago = chrono::Utc::now() - chrono::Duration::hours(2);
orch.set_last_tick_time(two_hours_ago);
orch.clear_last_compound_review_fired_at();
assert!(
orch.last_compound_review_fired_at().is_none(),
"cursor should start empty",
);

// First call: should advance the cursor to a past fire time.
orch.check_cron_schedules().await;
let cursor_after_first = orch
.last_compound_review_fired_at()
.expect("cursor should be Some after first fire");
assert!(
cursor_after_first <= chrono::Utc::now(),
"cursor should be in the past, got {}",
cursor_after_first
);

// Second call without advancing wall-clock or `last_tick_time`:
// the cursor must NOT advance (the same occurrence is gated).
orch.check_cron_schedules().await;
let cursor_after_second = orch
.last_compound_review_fired_at()
.expect("cursor should still be Some");
assert_eq!(
cursor_after_first, cursor_after_second,
"cursor must not re-advance on a re-check without new occurrences \
(#1562 storm regression)",
);
}

#[test]
fn test_orchestrator_from_toml() {
let toml_str = r#"
Expand Down
Loading