diff --git a/crates/terraphim_orchestrator/Cargo.toml b/crates/terraphim_orchestrator/Cargo.toml index e21529155..37bb7f317 100644 --- a/crates/terraphim_orchestrator/Cargo.toml +++ b/crates/terraphim_orchestrator/Cargo.toml @@ -11,6 +11,11 @@ repository = "https://github.com/terraphim/terraphim-ai" default = ["quickwit"] quickwit = ["dep:reqwest", "dep:reqwest-middleware", "dep:reqwest-retry"] evolution = ["dep:terraphim_agent_evolution"] +# Expose `scope::test_support` to integration tests under `tests/`. +# The dependency on `tempfile` is also pulled in by `[dev-dependencies]`, +# so this feature is effectively a no-op for non-test consumers, but +# the cfg gate keeps the helper out of the production tree. +test-helpers = ["dep:tempfile"] [dependencies] # Terraphim internal crates @@ -76,11 +81,26 @@ reqwest-middleware = { workspace = true, optional = true } reqwest-retry = { workspace = true, optional = true } ulid = "1.2.1" +# Optional: enabled by the `test-helpers` feature so that the +# `scope::test_support` module can be shared with integration tests. +tempfile = { workspace = true, optional = true } + [dev-dependencies] tokio-test = "0.4" tempfile = { workspace = true } serial_test = "3" +# Layer 1 cancellation property test (epic #1567 / issue #1569). +# Requires the `test-helpers` feature to access +# `scope::test_support::setup_git_repo`. Declared explicitly so +# `cargo test --all-features` picks it up; the source file is also +# `#![cfg(all(unix, feature = "test-helpers"))]` for defensive +# double-gating. 
+[[test]] +name = "compound_cancellation_test" +path = "tests/compound_cancellation_test.rs" +required-features = ["test-helpers"] + [[bin]] name = "adf" diff --git a/crates/terraphim_orchestrator/src/compound.rs b/crates/terraphim_orchestrator/src/compound.rs index c90e59041..f51c165cd 100644 --- a/crates/terraphim_orchestrator/src/compound.rs +++ b/crates/terraphim_orchestrator/src/compound.rs @@ -1,7 +1,7 @@ use std::path::{Path, PathBuf}; use std::time::{Duration, Instant}; -use tokio::sync::mpsc; +use tokio::task::JoinSet; use tracing::{debug, info, warn}; use uuid::Uuid; @@ -9,7 +9,7 @@ use terraphim_types::{FindingCategory, FindingSeverity, ReviewAgentOutput, Revie use crate::config::CompoundReviewConfig; use crate::error::OrchestratorError; -use crate::scope::WorktreeManager; +use crate::scope::{WorktreeManager, WORKTREE_REVIEW_PREFIX}; // Embed prompt templates at compile time to avoid CWD-dependent file loading. // The ADF binary may run from /opt/ai-dark-factory/ but templates live in the @@ -293,54 +293,77 @@ impl CompoundReviewWorkflow { "filtered review groups" ); - // Create worktree for this review - let worktree_name = format!("review-{}", correlation_id); - let worktree_path = self + // Create worktree for this review. + // + // Drop-ordering invariant (epic #1567, Layer 1, issue #1569): + // + // `guard` MUST be declared BEFORE `tasks`. Locals drop in + // reverse declaration order, so: + // 1. `tasks: JoinSet` drops FIRST, aborting every spawned + // agent task. Because `Command::kill_on_drop(true)` is + // set in `run_single_agent`, each task's Drop kills its + // child subprocess synchronously (SIGKILL via the + // `tokio::process::Child` Drop wired through the kill- + // on-drop bit). + // 2. `guard: WorktreeGuard` drops LAST, running + // `git -C worktree remove --force ` (with + // a filesystem fallback). The git admin entry at + // `/.git/worktrees/` is reconciled along + // with the directory. 
+ // + // Inverting this order recreates the worktree storm race: + // the guard would remove the worktree while subprocesses + // still hold open file handles into it, then JoinSet abort + // would orphan those subprocesses against a torn-down git + // admin registry. The cancellation property test in + // `tests/compound_cancellation_test.rs` encodes this + // invariant; do NOT reorder these two locals. + let worktree_name = format!("{}{}", WORKTREE_REVIEW_PREFIX, correlation_id); + let guard = self .worktree_manager .create_worktree(&worktree_name, git_ref) .await .map_err(|e| { OrchestratorError::CompoundReviewFailed(format!("failed to create worktree: {}", e)) })?; + let worktree_path = guard.path().to_path_buf(); - // Channel for collecting agent outputs - let (tx, mut rx) = mpsc::channel::<AgentResult>(active_groups.len().max(1)); + let mut tasks: JoinSet<AgentResult> = JoinSet::new(); - // Spawn agents in parallel + // Spawn agents in parallel via JoinSet so that dropping the + // parent future aborts every child task before the guard's + // synchronous `git worktree remove` runs. let mut spawned_count = 0; for group in active_groups { - let tx = tx.clone(); let group = group.clone(); - let worktree_path = worktree_path.clone(); + let worktree_path_task = worktree_path.clone(); let changed_files = changed_files.clone(); let timeout = self.config.timeout; let cli_tool = group.cli_tool.clone(); - tokio::spawn(async move { - let result = run_single_agent( + tasks.spawn(async move { + run_single_agent( &group, - &worktree_path, + &worktree_path_task, &changed_files, correlation_id, timeout, &cli_tool, ) - .await; - let _ = tx.send(result).await; + .await }); spawned_count += 1; } - // Collect results with deadline-based timeout - drop(tx); + // Collect results with deadline-based timeout. 
let mut agent_outputs = Vec::new(); let mut failed_count = 0; let collect_deadline = tokio::time::Instant::now() + self.config.timeout + Duration::from_secs(10); loop { - match tokio::time::timeout_at(collect_deadline, rx.recv()).await { - Ok(Some(result)) => match result { + match tokio::time::timeout_at(collect_deadline, tasks.join_next()).await { + Ok(Some(Ok(result))) => match result { AgentResult::Success(output) => { info!(agent = %output.agent, findings = output.findings.len(), "agent completed"); agent_outputs.push(output); @@ -356,7 +379,11 @@ impl CompoundReviewWorkflow { }); } }, - Ok(None) => break, // channel closed, all senders dropped + Ok(Some(Err(join_err))) => { + warn!(error = %join_err, "agent task aborted or panicked"); + failed_count += 1; + } + Ok(None) => break, // JoinSet drained, all tasks finished Err(_) => { warn!("collection deadline exceeded, using partial results"); break; @@ -364,10 +391,12 @@ impl CompoundReviewWorkflow { } } - // Cleanup worktree - if let Err(e) = self.worktree_manager.remove_worktree(&worktree_name).await { - warn!(error = %e, "failed to cleanup worktree"); - } + // No explicit cleanup: `guard` drops at end of scope and + // invokes `git worktree remove --force` synchronously. + // Suppress the unused-variable lint -- the local exists for + // its Drop side effect; it is also borrowed above via + // `guard.path()`. + let _ = &guard; // Collect all findings and deduplicate let all_findings: Vec = agent_outputs @@ -489,6 +518,15 @@ async fn run_single_agent( // Build the command with CLI-specific argument formatting let mut cmd = tokio::process::Command::new(cli_tool); + // Ensure that dropping the `Child` handle kills the underlying + // subprocess (epic #1567, Layer 1, issue #1569). Without this, + // aborting the JoinSet wrapping this task drops `Child` but does + // not signal the OS process, leaving zombie agents running until + // their own `cmd.output()` timeout (up to 30 minutes in + // production). 
Combined with the JoinSet abort, kill_on_drop + // gives cooperative-then-forceful shutdown on cancellation. + cmd.kill_on_drop(true); + // Determine CLI name for argument format selection let cli_name = std::path::Path::new(cli_tool) .file_name() diff --git a/crates/terraphim_orchestrator/src/scope.rs b/crates/terraphim_orchestrator/src/scope.rs index b6e611315..989ebe25c 100644 --- a/crates/terraphim_orchestrator/src/scope.rs +++ b/crates/terraphim_orchestrator/src/scope.rs @@ -4,6 +4,20 @@ use std::time::Instant; use tracing::{debug, error, info, warn}; use uuid::Uuid; +use crate::worktree_guard::WorktreeGuard; + +/// Directory-name prefix for compound-review worktrees. +/// +/// Single source of truth referenced by: +/// - `compound.rs::run` when constructing `review-` names. +/// - Layer 2 (`scope::WorktreeManager::sweep_stale`) when matching +/// stale entries on startup. +/// - Layer 3 (`scripts/adf-setup/adf-cleanup.sh`) for the operator +/// cleanup helper. +/// +/// Changes here must be mirrored in the shell script. +pub const WORKTREE_REVIEW_PREFIX: &str = "review-"; + /// Check if `prefix` is a proper path prefix of `path`. /// Ensures "src/" matches "src/main.rs" but not "src-backup/". pub(crate) fn is_path_prefix(prefix: &str, path: &str) -> bool { @@ -256,12 +270,16 @@ impl WorktreeManager { /// * `name` - Name of the worktree (used as directory name) /// * `git_ref` - Git reference (branch, tag, commit) to check out /// - /// Returns the path to the created worktree. + /// Returns a `WorktreeGuard` that owns cleanup of the worktree. + /// When the guard is dropped without `.keep()` being called, it + /// invokes `git worktree remove --force` against the repository + /// (reconciling the `/.git/worktrees/` admin entry) + /// and falls back to filesystem removal on failure. 
pub async fn create_worktree( &self, name: &str, git_ref: &str, - ) -> Result { + ) -> Result { let worktree_path = self.worktree_base.join(name); // Create parent directory if needed @@ -297,7 +315,7 @@ impl WorktreeManager { } info!(name = %name, path = %worktree_path.display(), "worktree created"); - Ok(worktree_path) + Ok(WorktreeGuard::for_managed(&self.repo_path, worktree_path)) } /// Remove a worktree. @@ -670,17 +688,20 @@ mod tests { let (_temp_dir, repo_path) = setup_git_repo(); let manager = WorktreeManager::new(&repo_path); - let worktree_path = manager.create_worktree("feature-branch", "HEAD").await; + let guard_result = manager.create_worktree("feature-branch", "HEAD").await; assert!( - worktree_path.is_ok(), + guard_result.is_ok(), "create_worktree failed: {:?}", - worktree_path.err() + guard_result.err() ); - let path = worktree_path.unwrap(); + let guard = guard_result.unwrap(); + let path = guard.path().to_path_buf(); assert!(path.exists()); assert!(path.join(".git").exists()); assert!(path.join("README.md").exists()); + // Disarm so guard's Drop doesn't fight the temp_dir teardown. + guard.keep(); } #[tokio::test] @@ -688,10 +709,12 @@ mod tests { let (_temp_dir, repo_path) = setup_git_repo(); let manager = WorktreeManager::new(&repo_path); - // Create worktree - manager.create_worktree("to-remove", "HEAD").await.unwrap(); + // Create worktree; keep the guard so the manual remove path + // below is what removes it (not the guard's Drop). 
+ let guard = manager.create_worktree("to-remove", "HEAD").await.unwrap(); let path = manager.worktree_base().join("to-remove"); assert!(path.exists()); + guard.keep(); // Remove worktree let result = manager.remove_worktree("to-remove").await; @@ -714,10 +737,12 @@ mod tests { let (_temp_dir, repo_path) = setup_git_repo(); let manager = WorktreeManager::new(&repo_path); - // Create multiple worktrees - manager.create_worktree("wt1", "HEAD").await.unwrap(); - manager.create_worktree("wt2", "HEAD").await.unwrap(); - manager.create_worktree("wt3", "HEAD").await.unwrap(); + // Create multiple worktrees; disarm each guard so cleanup_all + // is what removes them (this test asserts the manager's + // own bulk path, not the guard's Drop). + manager.create_worktree("wt1", "HEAD").await.unwrap().keep(); + manager.create_worktree("wt2", "HEAD").await.unwrap().keep(); + manager.create_worktree("wt3", "HEAD").await.unwrap().keep(); let worktrees = manager.list_worktrees().unwrap(); assert_eq!(worktrees.len(), 3); @@ -739,9 +764,13 @@ mod tests { let worktrees = manager.list_worktrees().unwrap(); assert!(worktrees.is_empty()); - // Create worktrees - manager.create_worktree("wt-a", "HEAD").await.unwrap(); - manager.create_worktree("wt-b", "HEAD").await.unwrap(); + // Create worktrees; keep the guards so the worktrees survive + // long enough for list_worktrees to enumerate them. + let _g_a = manager.create_worktree("wt-a", "HEAD").await.unwrap(); + let _g_b = manager.create_worktree("wt-b", "HEAD").await.unwrap(); + // Disarm so guard Drop doesn't race the TempDir teardown. 
+ _g_a.keep(); + _g_b.keep(); let worktrees = manager.list_worktrees().unwrap(); assert_eq!(worktrees.len(), 2); @@ -756,8 +785,9 @@ mod tests { assert!(!manager.worktree_exists("test-wt")); - manager.create_worktree("test-wt", "HEAD").await.unwrap(); + let guard = manager.create_worktree("test-wt", "HEAD").await.unwrap(); assert!(manager.worktree_exists("test-wt")); + guard.keep(); manager.remove_worktree("test-wt").await.unwrap(); assert!(!manager.worktree_exists("test-wt")); @@ -777,10 +807,86 @@ mod tests { let (_temp_dir, repo_path) = setup_git_repo(); let manager = WorktreeManager::new(&repo_path); - manager.create_worktree("duplicate", "HEAD").await.unwrap(); + let guard = manager.create_worktree("duplicate", "HEAD").await.unwrap(); // Creating duplicate should fail let result = manager.create_worktree("duplicate", "HEAD").await; assert!(result.is_err()); + guard.keep(); + } +} + +/// Test helpers exposed at module scope (under the `test-helpers` +/// feature, or when compiling the lib's own test target) so +/// integration tests under `tests/` can re-use the git repo fixture. +/// These helpers have no production callers; they exist to share +/// setup code across `scope.rs::tests` and the cancellation property +/// test. +#[doc(hidden)] +#[cfg(any(test, feature = "test-helpers"))] +pub mod test_support { + use std::path::PathBuf; + use std::process::Command; + use tempfile::TempDir; + + /// Initialise a real git repository in a fresh `TempDir` with one + /// commit. Returns the `TempDir` (caller owns the lifetime) and + /// the repo path. 
+ pub fn setup_git_repo() -> (TempDir, PathBuf) { + std::env::remove_var("GIT_INDEX_FILE"); + + let temp_dir = TempDir::new().expect("failed to create temp dir"); + let repo_path = temp_dir.path().to_path_buf(); + + let output = Command::new("git") + .arg("init") + .arg(&repo_path) + .env_remove("GIT_INDEX_FILE") + .output() + .expect("failed to run git init"); + assert!(output.status.success(), "git init failed"); + + Command::new("git") + .arg("-C") + .arg(&repo_path) + .arg("config") + .arg("user.email") + .arg("test@test.com") + .env_remove("GIT_INDEX_FILE") + .output() + .expect("failed to config git email"); + + Command::new("git") + .arg("-C") + .arg(&repo_path) + .arg("config") + .arg("user.name") + .arg("Test User") + .env_remove("GIT_INDEX_FILE") + .output() + .expect("failed to config git name"); + + std::fs::write(repo_path.join("README.md"), "# Test Repo").expect("failed to write file"); + + Command::new("git") + .arg("-C") + .arg(&repo_path) + .arg("add") + .arg(".") + .env_remove("GIT_INDEX_FILE") + .output() + .expect("failed to git add"); + + Command::new("git") + .arg("-C") + .arg(&repo_path) + .arg("commit") + .arg("-m") + .arg("Initial commit") + .env_remove("GIT_INDEX_FILE") + .output() + .expect("failed to git commit"); + + (temp_dir, repo_path) } } diff --git a/crates/terraphim_orchestrator/src/worktree_guard.rs b/crates/terraphim_orchestrator/src/worktree_guard.rs index 3689e7cbd..79e22f3f3 100644 --- a/crates/terraphim_orchestrator/src/worktree_guard.rs +++ b/crates/terraphim_orchestrator/src/worktree_guard.rs @@ -26,19 +26,51 @@ use tracing::{debug, info, warn}; pub struct WorktreeGuard { path: PathBuf, should_cleanup: bool, + /// When `Some`, `Drop` runs `git -C worktree remove + /// --force ` first and falls back to a filesystem-only + /// removal on non-zero exit or when the git CLI is not + /// invokable. When `None`, only the filesystem path runs (the + /// existing per-agent caller in `lib.rs`, unchanged). 
+ repo_path: Option<PathBuf>, } impl WorktreeGuard { /// Create a new worktree guard for the given path. /// /// The path will be removed when the guard is dropped unless - /// `keep()` is called. + /// `keep()` is called. This constructor performs filesystem-only + /// cleanup; use `for_managed` for git-aware cleanup of worktrees + /// created via `WorktreeManager::create_worktree`. pub fn new<P: AsRef<Path>>(path: P) -> Self { let path = path.as_ref().to_path_buf(); debug!(path = %path.display(), "worktree guard created"); Self { path, should_cleanup: true, + repo_path: None, + } + } + + /// Create a managed guard whose `Drop` invokes `git worktree + /// remove --force` against `repo_path` before falling back to + /// filesystem removal. + /// + /// Use this when the worktree was created via + /// `WorktreeManager::create_worktree` so the git admin registry + /// at `<repo>/.git/worktrees/<name>` is reconciled along with the + /// directory itself. + pub fn for_managed<R: AsRef<Path>, P: AsRef<Path>>(repo_path: R, worktree_path: P) -> Self { + let path = worktree_path.as_ref().to_path_buf(); + let repo = repo_path.as_ref().to_path_buf(); + debug!( + repo_path = %repo.display(), + worktree_path = %path.display(), + "managed worktree guard created" + ); + Self { + path, + should_cleanup: true, + repo_path: Some(repo), } } @@ -66,6 +98,49 @@ impl WorktreeGuard { return; } + // Managed path: try `git worktree remove --force` first so the + // git admin entry at `<repo>/.git/worktrees/<name>` is + // reconciled. The synchronous std Command is intentional -- + // Drop cannot be async, and git worktree remove is sub-second. 
+ if let Some(ref repo) = self.repo_path { + let start = std::time::Instant::now(); + let status = std::process::Command::new("git") + .arg("-C") + .arg(repo) + .arg("worktree") + .arg("remove") + .arg("--force") + .arg(&self.path) + .env_remove("GIT_INDEX_FILE") + .status(); + + match status { + Ok(s) if s.success() => { + info!( + path = %self.path.display(), + duration_ms = start.elapsed().as_millis() as u64, + "worktree cleaned up via git" + ); + return; + } + Ok(s) => { + warn!( + path = %self.path.display(), + exit_code = ?s.code(), + "git worktree remove failed, falling back to fs" + ); + } + Err(e) => { + warn!( + path = %self.path.display(), + error = %e, + "git CLI not invokable, falling back to fs" + ); + } + } + } + + // Fallback / unmanaged path: filesystem-only removal. match std::fs::remove_dir_all(&self.path) { Ok(_) => { info!(path = %self.path.display(), "worktree cleaned up"); @@ -180,4 +255,110 @@ mod tests { assert_eq!(result, 42); assert!(!worktree.exists()); } + + /// Minimal real git repo bootstrap for guard tests. Mirrors the + /// helper in `scope::tests::setup_git_repo` but kept inline here + /// so the unit tests are self-contained. 
+ fn init_git_repo() -> TempDir { + std::env::remove_var("GIT_INDEX_FILE"); + let temp_dir = TempDir::new().expect("temp dir"); + let repo = temp_dir.path(); + let run = |args: &[&str]| { + let status = std::process::Command::new("git") + .arg("-C") + .arg(repo) + .args(args) + .env_remove("GIT_INDEX_FILE") + .status() + .expect("git invocation"); + assert!(status.success(), "git {:?} failed", args); + }; + std::process::Command::new("git") + .arg("init") + .arg(repo) + .env_remove("GIT_INDEX_FILE") + .status() + .expect("git init"); + run(&["config", "user.email", "test@test.com"]); + run(&["config", "user.name", "Test User"]); + std::fs::write(repo.join("README.md"), "# Test").unwrap(); + run(&["add", "."]); + run(&["commit", "-m", "init"]); + temp_dir + } + + #[test] + fn test_managed_guard_invokes_git_remove() { + let repo = init_git_repo(); + let worktree = repo.path().join(".worktrees/managed-remove"); + + // Use real git worktree add so the admin entry exists. + let status = std::process::Command::new("git") + .arg("-C") + .arg(repo.path()) + .arg("worktree") + .arg("add") + .arg(&worktree) + .arg("HEAD") + .env_remove("GIT_INDEX_FILE") + .status() + .expect("git worktree add"); + assert!(status.success(), "git worktree add failed"); + assert!(worktree.exists()); + // git admin registry entry exists + let admin = repo.path().join(".git/worktrees/managed-remove"); + assert!(admin.exists(), "git admin entry should exist"); + + { + let _guard = WorktreeGuard::for_managed(repo.path(), &worktree); + } + + assert!( + !worktree.exists(), + "managed guard should remove worktree dir" + ); + assert!( + !admin.exists(), + "managed guard should reconcile git admin entry" + ); + } + + #[test] + fn test_managed_guard_fallback_on_git_failure() { + // Point repo_path at a non-git directory so `git worktree + // remove` exits non-zero, exercising the fs fallback. 
+ let temp_dir = TempDir::new().unwrap(); + let not_a_repo = temp_dir.path().join("not-a-repo"); + std::fs::create_dir(&not_a_repo).unwrap(); + + let worktree = temp_dir.path().join("orphan-worktree"); + std::fs::create_dir(&worktree).unwrap(); + File::create(worktree.join("payload.txt")).unwrap(); + + { + let _guard = WorktreeGuard::for_managed(&not_a_repo, &worktree); + } + + assert!( + !worktree.exists(), + "fallback fs removal should remove worktree dir" + ); + } + + #[test] + fn test_managed_guard_keep_disarms() { + let temp_dir = TempDir::new().unwrap(); + let fake_repo = temp_dir.path().join("repo"); + std::fs::create_dir(&fake_repo).unwrap(); + let worktree = temp_dir.path().join("kept-worktree"); + std::fs::create_dir(&worktree).unwrap(); + + let guard = WorktreeGuard::for_managed(&fake_repo, &worktree); + guard.keep(); + + assert!( + worktree.exists(), + "managed guard with keep() must not remove" + ); + } } diff --git a/crates/terraphim_orchestrator/tests/compound_cancellation_test.rs b/crates/terraphim_orchestrator/tests/compound_cancellation_test.rs new file mode 100644 index 000000000..d82c25dd9 --- /dev/null +++ b/crates/terraphim_orchestrator/tests/compound_cancellation_test.rs @@ -0,0 +1,358 @@ +//! Cancellation property test for the compound review swarm +//! (epic #1567, Layer 1, Gitea issue #1569). +//! +//! Property under test: +//! +//! > Cancelling `CompoundReviewWorkflow::run` at an arbitrary `.await` +//! > point (here via `JoinHandle::abort`) must leave **zero** worktree +//! > directories on disk and **zero** agent subprocesses alive within +//! > a bounded time (2 s). +//! +//! This is the acceptance criterion that the bigbox storm violated. +//! The test uses real git, real subprocesses, and a real worktree -- +//! no mocks. The agent subprocess is `/bin/sleep` so it does not +//! self-exit during the test window. +//! +//! Compiled only when the `test-helpers` feature is enabled (so the +//! 
`scope::test_support` shared fixture is visible) and on Unix +//! (the test uses `/bin/sleep` and `/proc/` for PID liveness). + +#![cfg(all(unix, feature = "test-helpers"))] + +use std::path::{Path, PathBuf}; +use std::time::{Duration, Instant}; + +use tempfile::TempDir; +use tokio::time::sleep; + +use terraphim_orchestrator::scope::{test_support::setup_git_repo, WORKTREE_REVIEW_PREFIX}; +use terraphim_orchestrator::{CompoundReviewWorkflow, ReviewGroupDef, SwarmConfig}; +use terraphim_types::FindingCategory; + +/// Static prompt content for the test group. The struct requires a +/// `&'static str`, and we do not exercise prompt parsing here. +const TEST_PROMPT: &str = "ignored-prompt"; + +/// Write a tiny shell wrapper that ignores all args and execs +/// `/bin/sleep 999`, returning the absolute path. We need this +/// because the `run_single_agent` dispatcher appends `prompt` and +/// each changed-file path as positional args; `/bin/sleep` itself +/// rejects non-numeric args. +fn write_sleep_wrapper(dir: &Path) -> PathBuf { + let script = dir.join("fake-agent.sh"); + std::fs::write( + &script, + "#!/bin/sh\n# Ignore all args; sleep long enough for the test.\nexec /bin/sleep 999\n", + ) + .expect("write wrapper script"); + use std::os::unix::fs::PermissionsExt; + let mut perms = std::fs::metadata(&script).unwrap().permissions(); + perms.set_mode(0o755); + std::fs::set_permissions(&script, perms).expect("chmod wrapper"); + script +} + +/// Build a SwarmConfig with a single long-sleep review group. 
+fn make_swarm_config(repo_path: PathBuf, worktree_root: PathBuf, cli_tool: &Path) -> SwarmConfig { + let group = ReviewGroupDef { + agent_name: "sleep-agent".to_string(), + category: FindingCategory::Quality, + llm_tier: "Quick".to_string(), + cli_tool: cli_tool.to_string_lossy().into_owned(), + model: None, + prompt_template: "test".to_string(), + prompt_content: TEST_PROMPT, + visual_only: false, + persona: None, + }; + + SwarmConfig { + groups: vec![group], + // Set the per-agent timeout very high so the subprocess does + // not exit on its own during the test window. Cancellation + // is exercised by aborting the outer JoinHandle below. + timeout: Duration::from_secs(600), + worktree_root, + repo_path, + base_branch: "HEAD".to_string(), + max_concurrent_agents: 1, + create_prs: false, + } +} + +/// Poll a closure until it returns true or the deadline elapses. +async fn poll_until<F>(deadline: Duration, mut check: F) -> bool +where + F: FnMut() -> bool, +{ + let start = Instant::now(); + while start.elapsed() < deadline { + if check() { + return true; + } + sleep(Duration::from_millis(50)).await; + } + check() +} + +/// Collect PIDs whose `/proc/<pid>/cwd` symlink resolves into the +/// given worktree subtree. Called BEFORE abort so the cwd is still +/// the live path (not `<path> (deleted)` after teardown). +fn collect_pids_with_cwd_under(prefix: &Path) -> Vec<u32> { + let mut pids = Vec::new(); + let proc = match std::fs::read_dir("/proc") { + Ok(p) => p, + Err(_) => return pids, + }; + for entry in proc.flatten() { + let name = entry.file_name(); + let name_str = match name.to_str() { + Some(s) => s, + None => continue, + }; + let pid: u32 = match name_str.parse() { + Ok(n) => n, + Err(_) => continue, + }; + let cwd_link = entry.path().join("cwd"); + if let Ok(target) = std::fs::read_link(&cwd_link) { + if target.starts_with(prefix) { + pids.push(pid); + } + } + } + pids +} + +/// Return true if `/proc/<pid>` no longer exists (process gone). 
+fn pid_is_gone(pid: u32) -> bool { + !PathBuf::from(format!("/proc/{}", pid)).exists() +} + +/// Return the path of the first `review-*` directory under `base`, +/// or None if none exists. +fn first_review_dir(base: &Path) -> Option<PathBuf> { + let entries = std::fs::read_dir(base).ok()?; + for entry in entries.flatten() { + let name = entry.file_name(); + if let Some(s) = name.to_str() { + if s.starts_with(WORKTREE_REVIEW_PREFIX) { + return Some(entry.path()); + } + } + } + None +} + +/// Return true if any `review-*` directory exists under `base`. +fn any_review_dir_exists(base: &Path) -> bool { + let Ok(entries) = std::fs::read_dir(base) else { + return false; + }; + for entry in entries.flatten() { + let name = entry.file_name(); + if let Some(s) = name.to_str() { + if s.starts_with(WORKTREE_REVIEW_PREFIX) && entry.path().is_dir() { + return true; + } + } + } + false +} + +/// Return true if any `.git/worktrees/review-*` admin entry exists. +fn any_review_admin_entry(repo: &Path) -> bool { + let admin_root = repo.join(".git").join("worktrees"); + let Ok(entries) = std::fs::read_dir(&admin_root) else { + return false; + }; + for entry in entries.flatten() { + let name = entry.file_name(); + if let Some(s) = name.to_str() { + if s.starts_with(WORKTREE_REVIEW_PREFIX) { + return true; + } + } + } + false +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_cancellation_leaves_no_worktree() { + // 1. Real git repo with a single commit. + let (_repo_tempdir, repo_path) = setup_git_repo(); + + // 2. Worktree base under a separate tempdir so we can scan it. + let wt_tempdir = TempDir::new().expect("worktree tempdir"); + let worktree_root = wt_tempdir.path().to_path_buf(); + let script_tempdir = TempDir::new().expect("script tempdir"); + let cli_tool = write_sleep_wrapper(script_tempdir.path()); + + let swarm = make_swarm_config(repo_path.clone(), worktree_root.clone(), &cli_tool); + let workflow = CompoundReviewWorkflow::new(swarm); + + // 3. 
Spawn `workflow.run("HEAD", "HEAD")` so changed_files is + // empty (no diff against itself). The fake cli_tool wrapper + // ignores any args and execs `/bin/sleep 999`, so the + // subprocess is guaranteed to be live when we abort. + let handle = tokio::spawn(async move { workflow.run("HEAD", "HEAD").await }); + + // 4. Wait up to 5 s for the worktree to be created. + let appeared = poll_until(Duration::from_secs(5), || { + first_review_dir(&worktree_root).is_some() + }) + .await; + assert!( + appeared, + "worktree under {} never appeared within 5 s", + worktree_root.display() + ); + + let review_dir = first_review_dir(&worktree_root).unwrap(); + println!("worktree created at {}", review_dir.display()); + + // 5. Poll until at least one subprocess has its `cwd` inside the + // worktree. This validates the test's premise: the fake-agent + // wrapper actually got far enough to spawn `/bin/sleep`. We + // snapshot PIDs BEFORE abort so the readlink resolves the + // live path (after teardown, /proc/<pid>/cwd renders as + // "<path> (deleted)" which would defeat path matching). + let mut subprocess_pids: Vec<u32> = Vec::new(); + let subprocess_live = poll_until(Duration::from_secs(5), || { + subprocess_pids = collect_pids_with_cwd_under(&review_dir); + !subprocess_pids.is_empty() + }) + .await; + assert!( + subprocess_live, + "no agent subprocess ever spawned with cwd under {}", + review_dir.display() + ); + println!("captured agent PIDs before abort: {:?}", subprocess_pids); + + // 6. Abort the outer task. Locals in `run` drop in reverse + // declaration order: tasks (JoinSet) drops first -> agent + // tasks aborted -> Child kill-on-drop fires -> subprocesses + // die. THEN the guard drops -> `git worktree remove --force`. + handle.abort(); + // Await so the runtime processes the abort. Result is ignored; + // it may be Cancelled or Err. + let _ = handle.await; + + // 7. Within 2 s assert every captured PID is gone. 
This is the + // discriminating "no zombie agents" check: if `kill_on_drop` + // were missing, the JoinSet abort would drop the Child handle + // without signalling the OS process, and the sleep would + // still be live here. + let pids_for_assert = subprocess_pids.clone(); + let no_zombie = poll_until(Duration::from_secs(2), || { + pids_for_assert.iter().all(|p| pid_is_gone(*p)) + }) + .await; + let still_alive: Vec<u32> = pids_for_assert + .iter() + .copied() + .filter(|p| !pid_is_gone(*p)) + .collect(); + assert!( + no_zombie, + "agent subprocess(es) survived cancellation: {:?}", + still_alive + ); + + // 8. Within 2 s assert no `review-*` dir remains under the base. + let dir_gone = poll_until(Duration::from_secs(2), || { + !any_review_dir_exists(&worktree_root) + }) + .await; + assert!( + dir_gone, + "worktree directory under {} survived cancellation: dir_exists={}, list={:?}", + worktree_root.display(), + any_review_dir_exists(&worktree_root), + std::fs::read_dir(&worktree_root) + .map(|d| d + .flatten() + .map(|e| e.file_name().to_string_lossy().to_string()) + .collect::<Vec<_>>()) + .unwrap_or_default() + ); + + // 9. Assert no `.git/worktrees/review-*` admin entry remains. + let admin_gone = poll_until(Duration::from_secs(2), || { + !any_review_admin_entry(&repo_path) + }) + .await; + assert!( + admin_gone, + "git admin entry under {}/.git/worktrees survived", + repo_path.display() + ); +} + +/// Storm-property variant: the Layer 0 cursor bug allowed +/// `check_cron_schedules` to fire the compound review repeatedly when +/// the reconcile tick cancellation kept the cursor unadvanced. The +/// property under Layer 1 is: even if the schedule re-fires before +/// the previous run finishes, every cancelled run leaves zero +/// worktrees on disk. We simulate the storm by spawning two +/// `run("HEAD", "HEAD")` calls back-to-back and aborting both. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_storm_cancellation_leaves_no_worktree() { + let (_repo_tempdir, repo_path) = setup_git_repo(); + let wt_tempdir = TempDir::new().expect("worktree tempdir"); + let worktree_root = wt_tempdir.path().to_path_buf(); + let script_tempdir = TempDir::new().expect("script tempdir"); + let cli_tool = write_sleep_wrapper(script_tempdir.path()); + + let swarm_a = make_swarm_config(repo_path.clone(), worktree_root.clone(), &cli_tool); + let workflow_a = CompoundReviewWorkflow::new(swarm_a); + let handle_a = tokio::spawn(async move { workflow_a.run("HEAD", "HEAD").await }); + + // Wait for the first worktree dir, then trigger a second run + // before the first finishes. + let first_appeared = poll_until(Duration::from_secs(5), || { + first_review_dir(&worktree_root).is_some() + }) + .await; + assert!(first_appeared, "first worktree did not appear"); + + let swarm_b = make_swarm_config(repo_path.clone(), worktree_root.clone(), &cli_tool); + let workflow_b = CompoundReviewWorkflow::new(swarm_b); + let handle_b = tokio::spawn(async move { workflow_b.run("HEAD", "HEAD").await }); + + // Brief yield so the second run advances past `create_worktree` + // (which collides on the same name only if the UUIDs happened + // to match; in practice they do not, so this is a second + // distinct worktree). + sleep(Duration::from_millis(200)).await; + + handle_a.abort(); + handle_b.abort(); + let _ = handle_a.await; + let _ = handle_b.await; + + // Within 5 s assert no `review-*` dir remains. Storm-shaped + // cancellation must drain to zero. 
+ let dir_gone = poll_until(Duration::from_secs(5), || { + !any_review_dir_exists(&worktree_root) + }) + .await; + assert!( + dir_gone, + "storm-cancelled runs left worktree(s) on disk under {}: {:?}", + worktree_root.display(), + std::fs::read_dir(&worktree_root) + .map(|d| d + .flatten() + .map(|e| e.file_name().to_string_lossy().to_string()) + .collect::<Vec<_>>()) + .unwrap_or_default() + ); + + let admin_gone = poll_until(Duration::from_secs(5), || { + !any_review_admin_entry(&repo_path) + }) + .await; + assert!(admin_gone, "git admin entries survived storm cancellation"); +}