Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions crates/terraphim_orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ repository = "https://github.com/terraphim/terraphim-ai"
default = ["quickwit"]
quickwit = ["dep:reqwest", "dep:reqwest-middleware", "dep:reqwest-retry"]
evolution = ["dep:terraphim_agent_evolution"]
# Expose `scope::test_support` to integration tests under `tests/`.
# The dependency on `tempfile` is also pulled in by `[dev-dependencies]`,
# so this feature is effectively a no-op for non-test consumers, but
# the cfg gate keeps the helper out of the production tree.
test-helpers = ["dep:tempfile"]

[dependencies]
# Terraphim internal crates
Expand Down Expand Up @@ -76,11 +81,26 @@ reqwest-middleware = { workspace = true, optional = true }
reqwest-retry = { workspace = true, optional = true }
ulid = "1.2.1"

# Optional: enabled by the `test-helpers` feature so that the
# `scope::test_support` module can be shared with integration tests.
tempfile = { workspace = true, optional = true }

[dev-dependencies]
tokio-test = "0.4"
tempfile = { workspace = true }
serial_test = "3"

# Layer 1 cancellation property test (epic #1567 / issue #1569).
# Requires the `test-helpers` feature to access
# `scope::test_support::setup_git_repo`. Declared explicitly so
# `cargo test --all-features` picks it up; the source file is also
# `#![cfg(all(unix, feature = "test-helpers"))]` for defensive
# double-gating.
[[test]]
name = "compound_cancellation_test"
path = "tests/compound_cancellation_test.rs"
required-features = ["test-helpers"]


[[bin]]
name = "adf"
Expand Down
86 changes: 62 additions & 24 deletions crates/terraphim_orchestrator/src/compound.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};

use tokio::sync::mpsc;
use tokio::task::JoinSet;
use tracing::{debug, info, warn};
use uuid::Uuid;

use terraphim_types::{FindingCategory, FindingSeverity, ReviewAgentOutput, ReviewFinding};

use crate::config::CompoundReviewConfig;
use crate::error::OrchestratorError;
use crate::scope::WorktreeManager;
use crate::scope::{WorktreeManager, WORKTREE_REVIEW_PREFIX};

// Embed prompt templates at compile time to avoid CWD-dependent file loading.
// The ADF binary may run from /opt/ai-dark-factory/ but templates live in the
Expand Down Expand Up @@ -293,54 +293,77 @@ impl CompoundReviewWorkflow {
"filtered review groups"
);

// Create worktree for this review
let worktree_name = format!("review-{}", correlation_id);
let worktree_path = self
// Create worktree for this review.
//
// Drop-ordering invariant (epic #1567, Layer 1, issue #1569):
//
// `guard` MUST be declared BEFORE `tasks`. Locals drop in
// reverse declaration order, so:
// 1. `tasks: JoinSet` drops FIRST, aborting every spawned
// agent task. Because `Command::kill_on_drop(true)` is
// set in `run_single_agent`, each task's Drop kills its
// child subprocess synchronously (SIGKILL via the
// `tokio::process::Child` Drop wired through the kill-
// on-drop bit).
// 2. `guard: WorktreeGuard` drops LAST, running
// `git -C <repo> worktree remove --force <path>` (with
// a filesystem fallback). The git admin entry at
// `<repo>/.git/worktrees/<name>` is reconciled along
// with the directory.
//
// Inverting this order recreates the worktree storm race:
// the guard would remove the worktree while subprocesses
// still hold open file handles into it, then JoinSet abort
// would orphan those subprocesses against a torn-down git
// admin registry. The cancellation property test in
// `tests/compound_cancellation_test.rs` encodes this
// invariant; do NOT reorder these two locals.
let worktree_name = format!("{}{}", WORKTREE_REVIEW_PREFIX, correlation_id);
let guard = self
.worktree_manager
.create_worktree(&worktree_name, git_ref)
.await
.map_err(|e| {
OrchestratorError::CompoundReviewFailed(format!("failed to create worktree: {}", e))
})?;
let worktree_path = guard.path().to_path_buf();

// Channel for collecting agent outputs
let (tx, mut rx) = mpsc::channel::<AgentResult>(active_groups.len().max(1));
let mut tasks: JoinSet<AgentResult> = JoinSet::new();

// Spawn agents in parallel
// Spawn agents in parallel via JoinSet so that dropping the
// parent future aborts every child task before the guard's
// synchronous `git worktree remove` runs.
let mut spawned_count = 0;
for group in active_groups {
let tx = tx.clone();
let group = group.clone();
let worktree_path = worktree_path.clone();
let worktree_path_task = worktree_path.clone();
let changed_files = changed_files.clone();
let timeout = self.config.timeout;
let cli_tool = group.cli_tool.clone();

tokio::spawn(async move {
let result = run_single_agent(
tasks.spawn(async move {
run_single_agent(
&group,
&worktree_path,
&worktree_path_task,
&changed_files,
correlation_id,
timeout,
&cli_tool,
)
.await;
let _ = tx.send(result).await;
.await
});
spawned_count += 1;
}

// Collect results with deadline-based timeout
drop(tx);
// Collect results with deadline-based timeout.
let mut agent_outputs = Vec::new();
let mut failed_count = 0;
let collect_deadline =
tokio::time::Instant::now() + self.config.timeout + Duration::from_secs(10);

loop {
match tokio::time::timeout_at(collect_deadline, rx.recv()).await {
Ok(Some(result)) => match result {
match tokio::time::timeout_at(collect_deadline, tasks.join_next()).await {
Ok(Some(Ok(result))) => match result {
AgentResult::Success(output) => {
info!(agent = %output.agent, findings = output.findings.len(), "agent completed");
agent_outputs.push(output);
Expand All @@ -356,18 +379,24 @@ impl CompoundReviewWorkflow {
});
}
},
Ok(None) => break, // channel closed, all senders dropped
Ok(Some(Err(join_err))) => {
warn!(error = %join_err, "agent task aborted or panicked");
failed_count += 1;
}
Ok(None) => break, // JoinSet drained, all tasks finished
Err(_) => {
warn!("collection deadline exceeded, using partial results");
break;
}
}
}

// Cleanup worktree
if let Err(e) = self.worktree_manager.remove_worktree(&worktree_name).await {
warn!(error = %e, "failed to cleanup worktree");
}
// No explicit cleanup: `guard` drops at end of scope and
// invokes `git worktree remove --force` synchronously.
// Suppress the unused-variable lint -- the local exists for
// its Drop side effect; it is also borrowed above via
// `guard.path()`.
let _ = &guard;

// Collect all findings and deduplicate
let all_findings: Vec<ReviewFinding> = agent_outputs
Expand Down Expand Up @@ -489,6 +518,15 @@ async fn run_single_agent(
// Build the command with CLI-specific argument formatting
let mut cmd = tokio::process::Command::new(cli_tool);

// Ensure that dropping the `Child` handle kills the underlying
// subprocess (epic #1567, Layer 1, issue #1569). Without this,
// aborting the JoinSet wrapping this task drops `Child` but does
// not signal the OS process, leaving zombie agents running until
// their own `cmd.output()` timeout (up to 30 minutes in
// production). Combined with the JoinSet abort, kill_on_drop
// gives cooperative-then-forceful shutdown on cancellation.
cmd.kill_on_drop(true);

// Determine CLI name for argument format selection
let cli_name = std::path::Path::new(cli_tool)
.file_name()
Expand Down
Loading
Loading