diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index cd3e93c9cb2c..89ea9611c046 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2708,6 +2708,7 @@ dependencies = [ "tokio", "tokio-tungstenite", "tokio-util", + "toml 0.9.11+spec-1.1.0", "tracing", "uuid", "wiremock", diff --git a/codex-rs/app-server-client/src/lib.rs b/codex-rs/app-server-client/src/lib.rs index 0911cc448dd3..b9e95ef84fd1 100644 --- a/codex-rs/app-server-client/src/lib.rs +++ b/codex-rs/app-server-client/src/lib.rs @@ -50,6 +50,7 @@ pub use codex_core::StateDbHandle; use codex_core::config::Config; pub use codex_exec_server::EnvironmentManager; pub use codex_exec_server::EnvironmentManagerArgs; +pub use codex_exec_server::EnvironmentResolver; pub use codex_exec_server::ExecServerRuntimePaths; use codex_feedback::CodexFeedback; use codex_protocol::protocol::SessionSource; diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 4013bbe76bc9..80f00f60807e 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -52,6 +52,7 @@ use codex_core::check_execpolicy_for_warnings; use codex_core::config::find_codex_home; use codex_core::init_state_db_from_config; use codex_exec_server::EnvironmentManager; +use codex_exec_server::EnvironmentResolver; use codex_exec_server::ExecServerRuntimePaths; use codex_feedback::CodexFeedback; use codex_protocol::protocol::SessionSource; @@ -371,15 +372,19 @@ pub enum PluginStartupTasks { Skip, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone)] pub struct AppServerRuntimeOptions { pub plugin_startup_tasks: PluginStartupTasks, + /// Optional resolver passed through to the environment manager. App-server + /// still uses strict environment-id lookup today. + pub environment_resolver: Option>, } impl Default for AppServerRuntimeOptions { fn default() -> Self { Self { plugin_startup_tasks: PluginStartupTasks::Start, + environment_resolver: None, } } } @@ -417,15 +422,20 @@ pub async fn run_main_with_transport_options( auth: AppServerWebsocketAuthSettings, runtime_options: AppServerRuntimeOptions, ) -> IoResult<()> { - let environment_manager = Arc::new( - EnvironmentManager::new(EnvironmentManagerArgs::new( - ExecServerRuntimePaths::from_optional_paths( - arg0_paths.codex_self_exe.clone(), - arg0_paths.codex_linux_sandbox_exe.clone(), - )?, - )) - .await, - ); + let AppServerRuntimeOptions { + plugin_startup_tasks, + environment_resolver, + } = runtime_options; + let mut environment_manager_args = + EnvironmentManagerArgs::new(ExecServerRuntimePaths::from_optional_paths( + arg0_paths.codex_self_exe.clone(), + arg0_paths.codex_linux_sandbox_exe.clone(), + )?); + if let Some(environment_resolver) = environment_resolver { + environment_manager_args = + environment_manager_args.with_environment_resolver(environment_resolver); + } + let environment_manager = Arc::new(EnvironmentManager::new(environment_manager_args).await); let (transport_event_tx, mut transport_event_rx) = mpsc::channel::(CHANNEL_CAPACITY); let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); @@ -765,7 +775,7 @@ pub async fn run_main_with_transport_options( auth_manager, rpc_transport: analytics_rpc_transport(&transport), remote_control_handle: Some(remote_control_handle.clone()), - plugin_startup_tasks: runtime_options.plugin_startup_tasks, + plugin_startup_tasks, })); let mut thread_created_rx = processor.thread_created_receiver(); let mut running_turn_count_rx = processor.subscribe_running_assistant_turn_count(); diff --git a/codex-rs/core/tests/common/test_codex.rs b/codex-rs/core/tests/common/test_codex.rs index 13dd026654c0..6311e8fe1d93 100644 --- a/codex-rs/core/tests/common/test_codex.rs +++ b/codex-rs/core/tests/common/test_codex.rs @@ -73,6 +73,7 @@ const SUBMIT_TURN_COMPLETE_TIMEOUT: Duration = Duration::from_secs(30); #[derive(Debug)] pub struct TestEnv { environment: codex_exec_server::Environment, + exec_server_url: Option, cwd: AbsolutePathBuf, local_cwd_temp_dir: Option>, remote_container_name: Option, @@ -86,6 +87,7 @@ impl TestEnv { codex_exec_server::Environment::create_for_tests(/*exec_server_url*/ None)?; Ok(Self { environment, + exec_server_url: None, cwd, local_cwd_temp_dir: Some(local_cwd_temp_dir), remote_container_name: None, @@ -100,10 +102,6 @@ impl TestEnv { &self.environment } - pub fn exec_server_url(&self) -> Option<&str> { - self.environment.exec_server_url() - } - fn local_cwd_temp_dir(&self) -> Option> { self.local_cwd_temp_dir.clone() } @@ -123,7 +121,7 @@ pub async fn test_env() -> Result { Some(remote_env) => { let websocket_url = remote_exec_server_url()?; let environment = - codex_exec_server::Environment::create_for_tests(Some(websocket_url))?; + codex_exec_server::Environment::create_for_tests(Some(websocket_url.clone()))?; let cwd = remote_aware_cwd_path(); environment .get_filesystem() @@ -135,6 +133,7 @@ pub async fn test_env() -> Result { .await?; Ok(TestEnv { environment, + exec_server_url: Some(websocket_url), cwd, local_cwd_temp_dir: None, remote_container_name: Some(remote_env.container_name), @@ -385,7 +384,7 @@ impl TestCodexBuilder { let exec_server_url = self .exec_server_url .clone() - .or_else(|| test_env.exec_server_url().map(str::to_owned)); + .or_else(|| test_env.exec_server_url.clone()); let local_runtime_paths = codex_exec_server::ExecServerRuntimePaths::new( std::env::current_exe()?, /*codex_linux_sandbox_exe*/ None, diff --git a/codex-rs/exec-server/BUILD.bazel b/codex-rs/exec-server/BUILD.bazel index 57ebe041f8cb..237d3a7f2b0d 100644 --- a/codex-rs/exec-server/BUILD.bazel +++ b/codex-rs/exec-server/BUILD.bazel @@ -3,6 +3,9 @@ load("//:defs.bzl", "codex_rust_crate") codex_rust_crate( name = "exec-server", crate_name = "codex_exec_server", + deps_extra = [ + "@crates//:toml", + ], # Keep the crate's integration tests single-threaded under Bazel because # they install process-global test-binary dispatch state, and the remote # exec-server cases already rely on serialization around the full CLI path. diff --git a/codex-rs/exec-server/Cargo.toml b/codex-rs/exec-server/Cargo.toml index 1495397c7828..c466a234c1ed 100644 --- a/codex-rs/exec-server/Cargo.toml +++ b/codex-rs/exec-server/Cargo.toml @@ -28,6 +28,7 @@ serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } sha2 = { workspace = true } thiserror = { workspace = true } +toml = { workspace = true } tokio = { workspace = true, features = [ "fs", "io-std", diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index 47359393d368..55cd99b6c9c2 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -17,13 +17,14 @@ use tokio::sync::mpsc; use tokio::sync::watch; use tokio::time::timeout; -use tokio_tungstenite::connect_async; use tracing::debug; use crate::ProcessId; use crate::client_api::ExecServerClientConnectOptions; +use crate::client_api::ExecServerTransportParams; use crate::client_api::HttpClient; use crate::client_api::RemoteExecServerConnectArgs; +use crate::client_api::StdioExecServerConnectArgs; use crate::connection::JsonRpcConnection; use crate::process::ExecProcessEvent; use crate::process::ExecProcessEventLog; @@ -105,6 +106,16 @@ impl From for ExecServerClientConnectOptions { } } +impl From for ExecServerClientConnectOptions { + fn from(value: StdioExecServerConnectArgs) -> Self { + Self { + client_name: value.client_name, + initialize_timeout: value.initialize_timeout, + resume_session_id: value.resume_session_id, + } + } +} + impl RemoteExecServerConnectArgs { pub fn new(websocket_url: String, client_name: String) -> Self { Self { @@ -180,29 +191,25 @@ pub struct ExecServerClient { #[derive(Clone)] pub(crate) struct LazyRemoteExecServerClient { - websocket_url: String, + transport_params: ExecServerTransportParams, client: Arc>, } impl LazyRemoteExecServerClient { - pub(crate) fn new(websocket_url: String) -> Self { + pub(crate) fn new(transport_params: ExecServerTransportParams) -> Self { Self { - websocket_url, + transport_params, client: Arc::new(OnceCell::new()), } } pub(crate) async fn get(&self) -> Result { self.client - .get_or_try_init(|| async { - ExecServerClient::connect_websocket(RemoteExecServerConnectArgs { - websocket_url: self.websocket_url.clone(), - client_name: "codex-environment".to_string(), - connect_timeout: Duration::from_secs(5), - initialize_timeout: Duration::from_secs(5), - resume_session_id: None, - }) - .await + // TODO: Add reconnect/disconnect handling here instead of reusing + // the first successfully initialized connection forever. + .get_or_try_init(|| { + let transport_params = self.transport_params.clone(); + async move { ExecServerClient::connect_for_transport(transport_params).await } }) .await .cloned() @@ -269,32 +276,6 @@ pub enum ExecServerError { } impl ExecServerClient { - pub async fn connect_websocket( - args: RemoteExecServerConnectArgs, - ) -> Result { - let websocket_url = args.websocket_url.clone(); - let connect_timeout = args.connect_timeout; - let (stream, _) = timeout(connect_timeout, connect_async(websocket_url.as_str())) - .await - .map_err(|_| ExecServerError::WebSocketConnectTimeout { - url: websocket_url.clone(), - timeout: connect_timeout, - })? - .map_err(|source| ExecServerError::WebSocketConnect { - url: websocket_url.clone(), - source, - })?; - - Self::connect( - JsonRpcConnection::from_websocket( - stream, - format!("exec-server websocket {websocket_url}"), - ), - args.into(), - ) - .await - } - pub async fn initialize( &self, options: ExecServerClientConnectOptions, @@ -443,7 +424,7 @@ impl ExecServerClient { .clone() } - async fn connect( + pub(crate) async fn connect( connection: JsonRpcConnection, options: ExecServerClientConnectOptions, ) -> Result { @@ -893,18 +874,28 @@ mod tests { use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCResponse; use pretty_assertions::assert_eq; + use std::collections::HashMap; + #[cfg(unix)] + use std::path::Path; + #[cfg(unix)] + use std::process::Command; use tokio::io::AsyncBufReadExt; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use tokio::io::BufReader; use tokio::io::duplex; use tokio::sync::mpsc; + use tokio::sync::oneshot; use tokio::time::Duration; + #[cfg(unix)] + use tokio::time::sleep; use tokio::time::timeout; use super::ExecServerClient; use super::ExecServerClientConnectOptions; use crate::ProcessId; + use crate::client_api::StdioExecServerCommand; + use crate::client_api::StdioExecServerConnectArgs; use crate::connection::JsonRpcConnection; use crate::process::ExecProcessEvent; use crate::protocol::EXEC_CLOSED_METHOD; @@ -942,6 +933,162 @@ mod tests { .expect("json-rpc line should write"); } + #[cfg(not(windows))] + #[tokio::test] + async fn connect_stdio_command_initializes_json_rpc_client() { + let client = ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs { + command: StdioExecServerCommand { + program: "sh".to_string(), + args: vec![ + "-c".to_string(), + "read _line; printf '%s\\n' '{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\"}}'; read _line; sleep 60".to_string(), + ], + env: HashMap::new(), + cwd: None, + }, + client_name: "stdio-test-client".to_string(), + initialize_timeout: Duration::from_secs(1), + resume_session_id: None, + }) + .await + .expect("stdio client should connect"); + + assert_eq!(client.session_id().as_deref(), Some("stdio-test")); + } + + #[cfg(windows)] + #[tokio::test] + async fn connect_stdio_command_initializes_json_rpc_client_on_windows() { + let client = ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs { + command: StdioExecServerCommand { + program: "powershell".to_string(), + args: vec![ + "-NoProfile".to_string(), + "-Command".to_string(), + "$null = [Console]::In.ReadLine(); [Console]::Out.WriteLine('{\"id\":1,\"result\":{\"sessionId\":\"stdio-test\"}}'); $null = [Console]::In.ReadLine(); Start-Sleep -Seconds 60".to_string(), + ], + env: HashMap::new(), + cwd: None, + }, + client_name: "stdio-test-client".to_string(), + initialize_timeout: Duration::from_secs(1), + resume_session_id: None, + }) + .await + .expect("stdio client should connect"); + + assert_eq!(client.session_id().as_deref(), Some("stdio-test")); + } + + #[cfg(unix)] + #[tokio::test] + async fn dropping_stdio_client_terminates_spawned_process() { + let tempdir = tempfile::tempdir().expect("tempdir should be created"); + let pid_file = tempdir.path().join("server.pid"); + let stdio_script = format!( + "read _line; \ + echo \"$$\" > {}; \ + printf '%s\\n' '{{\"id\":1,\"result\":{{\"sessionId\":\"stdio-test\"}}}}'; \ + read _line; \ + sleep 60", + shell_quote(pid_file.as_path()), + ); + + let client = ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs { + command: StdioExecServerCommand { + program: "sh".to_string(), + args: vec!["-c".to_string(), stdio_script], + env: HashMap::new(), + cwd: None, + }, + client_name: "stdio-test-client".to_string(), + initialize_timeout: Duration::from_secs(1), + resume_session_id: None, + }) + .await + .expect("stdio client should connect"); + let server_pid = read_pid_file(pid_file.as_path()).await; + assert!( + process_exists(server_pid), + "spawned stdio process should be running before client drop" + ); + + drop(client); + + wait_for_process_exit(server_pid).await; + } + + #[cfg(unix)] + #[tokio::test] + async fn malformed_stdio_message_terminates_spawned_process() { + let tempdir = tempfile::tempdir().expect("tempdir should be created"); + let pid_file = tempdir.path().join("server.pid"); + let stdio_script = format!( + "read _line; \ + echo \"$$\" > {}; \ + printf '%s\\n' 'not-json'; \ + sleep 60", + shell_quote(pid_file.as_path()), + ); + + let result = ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs { + command: StdioExecServerCommand { + program: "sh".to_string(), + args: vec!["-c".to_string(), stdio_script], + env: HashMap::new(), + cwd: None, + }, + client_name: "stdio-test-client".to_string(), + initialize_timeout: Duration::from_secs(1), + resume_session_id: None, + }) + .await; + assert!(result.is_err(), "malformed stdio server should not connect"); + + let server_pid = read_pid_file(pid_file.as_path()).await; + wait_for_process_exit(server_pid).await; + } + + #[cfg(unix)] + async fn read_pid_file(path: &Path) -> u32 { + for _ in 0..20 { + if let Ok(contents) = std::fs::read_to_string(path) { + return contents + .trim() + .parse() + .expect("pid file should contain a pid"); + } + sleep(Duration::from_millis(50)).await; + } + panic!("pid file {} should be written", path.display()); + } + + #[cfg(unix)] + async fn wait_for_process_exit(pid: u32) { + for _ in 0..20 { + if !process_exists(pid) { + return; + } + sleep(Duration::from_millis(100)).await; + } + panic!("process {pid} should exit"); + } + + #[cfg(unix)] + fn process_exists(pid: u32) -> bool { + Command::new("kill") + .arg("-0") + .arg(pid.to_string()) + .status() + .is_ok_and(|status| status.success()) + } + + #[cfg(unix)] + fn shell_quote(path: &Path) -> String { + let value = path.to_string_lossy(); + format!("'{}'", value.replace('\'', "'\\''")) + } + #[tokio::test] async fn process_events_are_delivered_in_seq_order_when_notifications_are_reordered() { let (client_stdin, server_reader) = duplex(1 << 20); @@ -1085,6 +1232,92 @@ mod tests { server.await.expect("server task should finish"); } + #[tokio::test] + async fn transport_disconnect_fails_sessions_and_rejects_new_sessions() { + let (client_stdin, server_reader) = duplex(1 << 20); + let (mut server_writer, client_stdout) = duplex(1 << 20); + let (disconnect_tx, disconnect_rx) = oneshot::channel(); + let server = tokio::spawn(async move { + let mut lines = BufReader::new(server_reader).lines(); + let initialize = read_jsonrpc_line(&mut lines).await; + let request = match initialize { + JSONRPCMessage::Request(request) if request.method == INITIALIZE_METHOD => request, + other => panic!("expected initialize request, got {other:?}"), + }; + write_jsonrpc_line( + &mut server_writer, + JSONRPCMessage::Response(JSONRPCResponse { + id: request.id, + result: serde_json::to_value(InitializeResponse { + session_id: "session-1".to_string(), + }) + .expect("initialize response should serialize"), + }), + ) + .await; + + let initialized = read_jsonrpc_line(&mut lines).await; + match initialized { + JSONRPCMessage::Notification(notification) + if notification.method == INITIALIZED_METHOD => {} + other => panic!("expected initialized notification, got {other:?}"), + } + + let _ = disconnect_rx.await; + drop(server_writer); + }); + + let client = ExecServerClient::connect( + JsonRpcConnection::from_stdio( + client_stdout, + client_stdin, + "test-exec-server-client".to_string(), + ), + ExecServerClientConnectOptions::default(), + ) + .await + .expect("client should connect"); + + let process_id = ProcessId::from("disconnect"); + let session = client + .register_session(&process_id) + .await + .expect("session should register"); + let mut events = session.subscribe_events(); + + disconnect_tx.send(()).expect("disconnect should signal"); + + let event = timeout(Duration::from_secs(1), events.recv()) + .await + .expect("session failure should not time out") + .expect("session event stream should stay open"); + let ExecProcessEvent::Failed(message) = event else { + panic!("expected session failure after disconnect, got {event:?}"); + }; + assert_eq!(message, "exec-server transport disconnected"); + + let response = session + .read( + /*after_seq*/ None, /*max_bytes*/ None, /*wait_ms*/ None, + ) + .await + .expect("disconnected session read should synthesize a response"); + assert_eq!( + response.failure.as_deref(), + Some("exec-server transport disconnected") + ); + assert!(response.closed); + + let new_session = client.register_session(&ProcessId::from("new")).await; + assert!(matches!( + new_session, + Err(super::ExecServerError::Disconnected(_)) + )); + + drop(client); + server.await.expect("server task should finish"); + } + #[tokio::test] async fn wake_notifications_do_not_block_other_sessions() { let (client_stdin, server_reader) = duplex(1 << 20); diff --git a/codex-rs/exec-server/src/client_api.rs b/codex-rs/exec-server/src/client_api.rs index b1761b69f11b..8adfadd6e705 100644 --- a/codex-rs/exec-server/src/client_api.rs +++ b/codex-rs/exec-server/src/client_api.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; +use std::path::PathBuf; use std::time::Duration; use futures::future::BoxFuture; @@ -25,6 +27,32 @@ pub struct RemoteExecServerConnectArgs { pub resume_session_id: Option, } +/// Stdio connection arguments for a command-backed exec-server. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct StdioExecServerConnectArgs { + pub command: StdioExecServerCommand, + pub client_name: String, + pub initialize_timeout: Duration, + pub resume_session_id: Option, +} + +/// Structured process command used to start an exec-server over stdio. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct StdioExecServerCommand { + pub program: String, + pub args: Vec, + pub env: HashMap, + pub cwd: Option, +} + +/// Parameters used to connect to a remote exec-server environment. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum ExecServerTransportParams { + WebSocketUrl(String), + #[allow(dead_code)] + StdioCommand(StdioExecServerCommand), +} + /// Sends HTTP requests through a runtime-selected transport. /// /// This is the HTTP capability counterpart to [`crate::ExecBackend`]. Callers diff --git a/codex-rs/exec-server/src/client_transport.rs b/codex-rs/exec-server/src/client_transport.rs new file mode 100644 index 000000000000..560630e3db79 --- /dev/null +++ b/codex-rs/exec-server/src/client_transport.rs @@ -0,0 +1,125 @@ +use std::process::Stdio; +use std::time::Duration; + +use tokio::io::AsyncBufReadExt; +use tokio::io::BufReader; +use tokio::process::Command; +use tokio::time::timeout; +use tokio_tungstenite::connect_async; +use tracing::debug; +use tracing::warn; + +use crate::ExecServerClient; +use crate::ExecServerError; +use crate::client_api::RemoteExecServerConnectArgs; +use crate::client_api::StdioExecServerCommand; +use crate::client_api::StdioExecServerConnectArgs; +use crate::connection::JsonRpcConnection; + +const ENVIRONMENT_CLIENT_NAME: &str = "codex-environment"; +const ENVIRONMENT_CONNECT_TIMEOUT: Duration = Duration::from_secs(5); +const ENVIRONMENT_INITIALIZE_TIMEOUT: Duration = Duration::from_secs(5); + +impl ExecServerClient { + pub(crate) async fn connect_for_transport( + transport_params: crate::client_api::ExecServerTransportParams, + ) -> Result { + match transport_params { + crate::client_api::ExecServerTransportParams::WebSocketUrl(websocket_url) => { + Self::connect_websocket(RemoteExecServerConnectArgs { + websocket_url, + client_name: ENVIRONMENT_CLIENT_NAME.to_string(), + connect_timeout: ENVIRONMENT_CONNECT_TIMEOUT, + initialize_timeout: ENVIRONMENT_INITIALIZE_TIMEOUT, + resume_session_id: None, + }) + .await + } + crate::client_api::ExecServerTransportParams::StdioCommand(command) => { + Self::connect_stdio_command(StdioExecServerConnectArgs { + command, + client_name: ENVIRONMENT_CLIENT_NAME.to_string(), + initialize_timeout: ENVIRONMENT_INITIALIZE_TIMEOUT, + resume_session_id: None, + }) + .await + } + } + } + + pub async fn connect_websocket( + args: RemoteExecServerConnectArgs, + ) -> Result { + let websocket_url = args.websocket_url.clone(); + let connect_timeout = args.connect_timeout; + let (stream, _) = timeout(connect_timeout, connect_async(websocket_url.as_str())) + .await + .map_err(|_| ExecServerError::WebSocketConnectTimeout { + url: websocket_url.clone(), + timeout: connect_timeout, + })? + .map_err(|source| ExecServerError::WebSocketConnect { + url: websocket_url.clone(), + source, + })?; + + Self::connect( + JsonRpcConnection::from_websocket( + stream, + format!("exec-server websocket {websocket_url}"), + ), + args.into(), + ) + .await + } + + pub(crate) async fn connect_stdio_command( + args: StdioExecServerConnectArgs, + ) -> Result { + let mut child = stdio_command_process(&args.command) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .map_err(ExecServerError::Spawn)?; + + let stdin = child.stdin.take().ok_or_else(|| { + ExecServerError::Protocol("spawned exec-server command has no stdin".to_string()) + })?; + let stdout = child.stdout.take().ok_or_else(|| { + ExecServerError::Protocol("spawned exec-server command has no stdout".to_string()) + })?; + if let Some(stderr) = child.stderr.take() { + tokio::spawn(async move { + let mut lines = BufReader::new(stderr).lines(); + loop { + match lines.next_line().await { + Ok(Some(line)) => debug!("exec-server stdio stderr: {line}"), + Ok(None) => break, + Err(err) => { + warn!("failed to read exec-server stdio stderr: {err}"); + break; + } + } + } + }); + } + + Self::connect( + JsonRpcConnection::from_stdio(stdout, stdin, "exec-server stdio command".to_string()) + .with_child_process(child), + args.into(), + ) + .await + } +} + +fn stdio_command_process(stdio_command: &StdioExecServerCommand) -> Command { + let mut command = Command::new(&stdio_command.program); + command.args(&stdio_command.args); + command.envs(&stdio_command.env); + if let Some(cwd) = &stdio_command.cwd { + command.current_dir(cwd); + } + command +} diff --git a/codex-rs/exec-server/src/connection.rs b/codex-rs/exec-server/src/connection.rs index 71f4f31059fc..f1e65e321aac 100644 --- a/codex-rs/exec-server/src/connection.rs +++ b/codex-rs/exec-server/src/connection.rs @@ -3,10 +3,12 @@ use futures::SinkExt; use futures::StreamExt; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; +use tokio::process::Child; use tokio::sync::mpsc; use tokio::sync::watch; use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::tungstenite::Message; +use tracing::debug; use tokio::io::AsyncBufReadExt; use tokio::io::AsyncWriteExt; @@ -22,11 +24,55 @@ pub(crate) enum JsonRpcConnectionEvent { Disconnected { reason: Option }, } +pub(crate) enum JsonRpcTransport { + Plain, + Stdio { _transport: Box }, +} + +impl JsonRpcTransport { + fn from_child_process(child_process: Child) -> Self { + Self::Stdio { + _transport: Box::new(StdioTransport { + child_process: Some(child_process), + }), + } + } +} + +pub(crate) struct StdioTransport { + child_process: Option, +} + +impl Drop for StdioTransport { + fn drop(&mut self) { + let Some(mut child_process) = self.child_process.take() else { + return; + }; + + if let Err(err) = child_process.start_kill() { + debug!("failed to terminate exec-server stdio child: {err}"); + } + match tokio::runtime::Handle::try_current() { + Ok(handle) => { + handle.spawn(async move { + if let Err(err) = child_process.wait().await { + debug!("failed to wait for exec-server stdio child: {err}"); + } + }); + } + Err(err) => { + debug!("failed to wait for exec-server stdio child without a Tokio runtime: {err}"); + } + } + } +} + pub(crate) struct JsonRpcConnection { - outgoing_tx: mpsc::Sender, - incoming_rx: mpsc::Receiver, - disconnected_rx: watch::Receiver, - task_handles: Vec>, + pub(crate) outgoing_tx: mpsc::Sender, + pub(crate) incoming_rx: mpsc::Receiver, + pub(crate) disconnected_rx: watch::Receiver, + pub(crate) task_handles: Vec>, + pub(crate) transport: JsonRpcTransport, } impl JsonRpcConnection { @@ -117,6 +163,7 @@ impl JsonRpcConnection { incoming_rx, disconnected_rx, task_handles: vec![reader_task, writer_task], + transport: JsonRpcTransport::Plain, } } @@ -251,23 +298,13 @@ impl JsonRpcConnection { incoming_rx, disconnected_rx, task_handles: vec![reader_task, writer_task], + transport: JsonRpcTransport::Plain, } } - pub(crate) fn into_parts( - self, - ) -> ( - mpsc::Sender, - mpsc::Receiver, - watch::Receiver, - Vec>, - ) { - ( - self.outgoing_tx, - self.incoming_rx, - self.disconnected_rx, - self.task_handles, - ) + pub(crate) fn with_child_process(mut self, child_process: Child) -> Self { + self.transport = JsonRpcTransport::from_child_process(child_process); + self } } diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index 855989dafbc2..53b2c95f17c3 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -7,9 +7,14 @@ use crate::ExecutorFileSystem; use crate::HttpClient; use crate::client::LazyRemoteExecServerClient; use crate::client::http_client::ReqwestHttpClient; +use crate::client_api::ExecServerTransportParams; use crate::environment_provider::DefaultEnvironmentProvider; +use crate::environment_provider::EnvironmentDefault; use crate::environment_provider::EnvironmentProvider; +use crate::environment_provider::EnvironmentProviderSnapshot; use crate::environment_provider::normalize_exec_server_url; +use crate::environment_resolver::EnvironmentResolver; +use crate::environment_toml::environment_provider_from_codex_home; use crate::local_file_system::LocalFileSystem; use crate::local_process::LocalProcess; use crate::process::ExecBackend; @@ -31,13 +36,14 @@ pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL"; /// shell/filesystem tool availability. /// /// Remote environments create remote filesystem and execution backends that -/// lazy-connect to the configured exec-server on first use. The websocket is -/// not opened when the manager or environment is constructed. +/// lazy-connect to the configured exec-server on first use. The remote +/// transport is not opened when the manager or environment is constructed. #[derive(Debug)] pub struct EnvironmentManager { default_environment: Option, environments: HashMap>, local_environment: Arc, + environment_resolver: Option>, } pub const LOCAL_ENVIRONMENT_ID: &str = "local"; @@ -46,14 +52,26 @@ pub const REMOTE_ENVIRONMENT_ID: &str = "remote"; #[derive(Clone, Debug)] pub struct EnvironmentManagerArgs { pub local_runtime_paths: ExecServerRuntimePaths, + /// Optional resolver supplied by embedding runtimes. It is stored only; + /// environment lookup remains strict until policy is wired explicitly. + environment_resolver: Option>, } impl EnvironmentManagerArgs { pub fn new(local_runtime_paths: ExecServerRuntimePaths) -> Self { Self { local_runtime_paths, + environment_resolver: None, } } + + pub fn with_environment_resolver( + mut self, + environment_resolver: Arc, + ) -> Self { + self.environment_resolver = Some(environment_resolver); + self + } } impl EnvironmentManager { @@ -66,14 +84,18 @@ impl EnvironmentManager { Arc::new(Environment::default_for_tests()), )]), local_environment: Arc::new(Environment::default_for_tests()), + environment_resolver: None, } } /// Builds a test-only manager with environment access disabled. pub fn disabled_for_tests(local_runtime_paths: ExecServerRuntimePaths) -> Self { - let mut manager = Self::from_environments(HashMap::new(), local_runtime_paths); - manager.default_environment = None; - manager + Self { + default_environment: None, + environments: HashMap::new(), + local_environment: Arc::new(Environment::local(local_runtime_paths)), + environment_resolver: None, + } } /// Builds a test-only manager from a raw exec-server URL value. @@ -81,7 +103,7 @@ impl EnvironmentManager { exec_server_url: Option, local_runtime_paths: ExecServerRuntimePaths, ) -> Self { - Self::from_default_provider_url(exec_server_url, local_runtime_paths).await + Self::from_default_provider_url(exec_server_url, local_runtime_paths, None).await } /// Builds a manager from `CODEX_EXEC_SERVER_URL` and local runtime paths @@ -89,25 +111,50 @@ impl EnvironmentManager { pub async fn new(args: EnvironmentManagerArgs) -> Self { let EnvironmentManagerArgs { local_runtime_paths, + environment_resolver, } = args; let exec_server_url = std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok(); - Self::from_default_provider_url(exec_server_url, local_runtime_paths).await + Self::from_default_provider_url(exec_server_url, local_runtime_paths, environment_resolver) + .await + } + + /// Builds a manager from `CODEX_HOME` and local runtime paths used when + /// creating local filesystem helpers. + /// + /// If `CODEX_HOME/environments.toml` is present, it defines the configured + /// environments. Otherwise this preserves the legacy + /// `CODEX_EXEC_SERVER_URL` behavior. Callers that ignore user config + /// should use [`Self::from_env`] instead. + pub async fn from_codex_home( + codex_home: impl AsRef, + local_runtime_paths: ExecServerRuntimePaths, + ) -> Result { + let provider = environment_provider_from_codex_home(codex_home.as_ref())?; + Self::from_provider(provider.as_ref(), local_runtime_paths).await + } + + /// Builds a manager from the legacy environment-variable provider without + /// reading user config files from `CODEX_HOME`. + pub async fn from_env( + local_runtime_paths: ExecServerRuntimePaths, + ) -> Result { + let provider = DefaultEnvironmentProvider::from_env(); + Self::from_provider(&provider, local_runtime_paths).await } async fn from_default_provider_url( exec_server_url: Option, local_runtime_paths: ExecServerRuntimePaths, + environment_resolver: Option>, ) -> Self { - let environment_disabled = normalize_exec_server_url(exec_server_url.clone()).1; let provider = DefaultEnvironmentProvider::new(exec_server_url); - let provider_environments = provider.environments(&local_runtime_paths); - let mut manager = Self::from_environments(provider_environments, local_runtime_paths); - if environment_disabled { - // TODO: Remove this legacy `CODEX_EXEC_SERVER_URL=none` crutch once - // environment attachment defaulting moves out of EnvironmentManager. - manager.default_environment = None; + match Self::from_provider(&provider, local_runtime_paths).await { + Ok(mut manager) => { + manager.environment_resolver = environment_resolver; + manager + } + Err(err) => panic!("default provider should create valid environments: {err}"), } - manager } /// Builds a manager from a provider-supplied startup snapshot. @@ -118,16 +165,20 @@ impl EnvironmentManager { where P: EnvironmentProvider + ?Sized, { - Self::from_provider_environments( - provider.get_environments(&local_runtime_paths).await?, + Self::from_provider_snapshot( + provider.snapshot(&local_runtime_paths).await?, local_runtime_paths, ) } - fn from_provider_environments( - environments: HashMap, + fn from_provider_snapshot( + snapshot: EnvironmentProviderSnapshot, local_runtime_paths: ExecServerRuntimePaths, ) -> Result { + let EnvironmentProviderSnapshot { + environments, + default, + } = snapshot; for id in environments.keys() { if id.is_empty() { return Err(ExecServerError::Protocol( @@ -136,21 +187,16 @@ impl EnvironmentManager { } } - Ok(Self::from_environments(environments, local_runtime_paths)) - } - - fn from_environments( - environments: HashMap, - local_runtime_paths: ExecServerRuntimePaths, - ) -> Self { - // TODO: Stop deriving a default environment here once omitted - // environment attachment is owned by thread/session setup. - let default_environment = if environments.contains_key(REMOTE_ENVIRONMENT_ID) { - Some(REMOTE_ENVIRONMENT_ID.to_string()) - } else if environments.contains_key(LOCAL_ENVIRONMENT_ID) { - Some(LOCAL_ENVIRONMENT_ID.to_string()) - } else { - None + let default_environment = match default { + EnvironmentDefault::Disabled => None, + EnvironmentDefault::EnvironmentId(environment_id) => { + if !environments.contains_key(&environment_id) { + return Err(ExecServerError::Protocol(format!( + "default environment `{environment_id}` is not configured" + ))); + } + Some(environment_id) + } }; let local_environment = Arc::new(Environment::local(local_runtime_paths)); let environments = environments @@ -158,11 +204,12 @@ impl EnvironmentManager { .map(|(id, environment)| (id, Arc::new(environment))) .collect(); - Self { + Ok(Self { default_environment, environments, local_environment, - } + environment_resolver: None, + }) } /// Returns the default environment instance. @@ -194,7 +241,7 @@ impl EnvironmentManager { /// paths used by filesystem helpers. #[derive(Clone)] pub struct Environment { - exec_server_url: Option, + remote_transport: Option, exec_backend: Arc, filesystem: Arc, http_client: Arc, @@ -205,7 +252,7 @@ impl Environment { /// Builds a test-only local environment without configured sandbox helper paths. pub fn default_for_tests() -> Self { Self { - exec_server_url: None, + remote_transport: None, exec_backend: Arc::new(LocalProcess::default()), filesystem: Arc::new(LocalFileSystem::unsandboxed()), http_client: Arc::new(ReqwestHttpClient), @@ -217,7 +264,7 @@ impl Environment { impl std::fmt::Debug for Environment { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Environment") - .field("exec_server_url", &self.exec_server_url) + .field("exec_server_url", &self.exec_server_url()) .finish_non_exhaustive() } } @@ -260,7 +307,7 @@ impl Environment { pub(crate) fn local(local_runtime_paths: ExecServerRuntimePaths) -> Self { Self { - exec_server_url: None, + remote_transport: None, exec_backend: Arc::new(LocalProcess::default()), filesystem: Arc::new(LocalFileSystem::with_runtime_paths( local_runtime_paths.clone(), @@ -274,13 +321,23 @@ impl Environment { exec_server_url: String, local_runtime_paths: Option, ) -> Self { - let client = LazyRemoteExecServerClient::new(exec_server_url.clone()); + Self::remote_with_transport( + ExecServerTransportParams::WebSocketUrl(exec_server_url), + local_runtime_paths, + ) + } + + pub(crate) fn remote_with_transport( + transport_params: ExecServerTransportParams, + local_runtime_paths: Option, + ) -> Self { + let client = LazyRemoteExecServerClient::new(transport_params.clone()); let exec_backend: Arc = Arc::new(RemoteProcess::new(client.clone())); let filesystem: Arc = Arc::new(RemoteFileSystem::new(client.clone())); Self { - exec_server_url: Some(exec_server_url), + remote_transport: Some(transport_params), exec_backend, filesystem, http_client: Arc::new(client), @@ -289,12 +346,15 @@ impl Environment { } pub fn is_remote(&self) -> bool { - self.exec_server_url.is_some() + self.remote_transport.is_some() } /// Returns the remote exec-server URL when this environment is remote. - pub fn exec_server_url(&self) -> Option<&str> { - self.exec_server_url.as_deref() + pub(crate) fn exec_server_url(&self) -> Option<&str> { + match self.remote_transport.as_ref() { + Some(ExecServerTransportParams::WebSocketUrl(url)) => Some(url.as_str()), + Some(ExecServerTransportParams::StdioCommand(_)) | None => None, + } } pub fn local_runtime_paths(&self) -> Option<&ExecServerRuntimePaths> { @@ -323,10 +383,28 @@ mod tests { use super::EnvironmentManager; use super::LOCAL_ENVIRONMENT_ID; use super::REMOTE_ENVIRONMENT_ID; + use crate::EnvironmentProvider; + use crate::ExecServerError; use crate::ExecServerRuntimePaths; use crate::ProcessId; + use crate::environment_provider::EnvironmentDefault; + use crate::environment_provider::EnvironmentProviderSnapshot; use pretty_assertions::assert_eq; + struct TestEnvironmentProvider { + snapshot: EnvironmentProviderSnapshot, + } + + #[async_trait::async_trait] + impl EnvironmentProvider for TestEnvironmentProvider { + async fn snapshot( + &self, + _local_runtime_paths: &ExecServerRuntimePaths, + ) -> Result { + Ok(self.snapshot.clone()) + } + } + fn test_runtime_paths() -> ExecServerRuntimePaths { ExecServerRuntimePaths::new( std::env::current_exe().expect("current exe"), @@ -417,15 +495,20 @@ mod tests { } #[tokio::test] - async fn environment_manager_builds_from_provider_environments() { - let manager = EnvironmentManager::from_environments( - HashMap::from([( - REMOTE_ENVIRONMENT_ID.to_string(), - Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string())) - .expect("remote environment"), - )]), - test_runtime_paths(), - ); + async fn environment_manager_builds_from_provider() { + let provider = TestEnvironmentProvider { + snapshot: EnvironmentProviderSnapshot { + environments: HashMap::from([( + REMOTE_ENVIRONMENT_ID.to_string(), + Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string())) + .expect("remote environment"), + )]), + default: EnvironmentDefault::EnvironmentId(REMOTE_ENVIRONMENT_ID.to_string()), + }, + }; + let manager = EnvironmentManager::from_provider(&provider, test_runtime_paths()) + .await + .expect("environment manager"); assert_eq!( manager.default_environment_id(), @@ -443,11 +526,15 @@ mod tests { #[tokio::test] async fn environment_manager_rejects_empty_environment_id() { - let err = EnvironmentManager::from_provider_environments( - HashMap::from([("".to_string(), Environment::default_for_tests())]), - test_runtime_paths(), - ) - .expect_err("empty id should fail"); + let provider = TestEnvironmentProvider { + snapshot: EnvironmentProviderSnapshot { + environments: HashMap::from([("".to_string(), Environment::default_for_tests())]), + default: EnvironmentDefault::Disabled, + }, + }; + let err = EnvironmentManager::from_provider(&provider, test_runtime_paths()) + .await + .expect_err("empty id should fail"); assert_eq!( err.to_string(), @@ -455,6 +542,73 @@ mod tests { ); } + #[tokio::test] + async fn environment_manager_uses_explicit_provider_default() { + let provider = TestEnvironmentProvider { + snapshot: EnvironmentProviderSnapshot { + environments: HashMap::from([ + ( + LOCAL_ENVIRONMENT_ID.to_string(), + Environment::default_for_tests(), + ), + ( + "devbox".to_string(), + Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string())) + .expect("remote environment"), + ), + ]), + default: EnvironmentDefault::EnvironmentId("devbox".to_string()), + }, + }; + let manager = EnvironmentManager::from_provider(&provider, test_runtime_paths()) + .await + .expect("manager"); + + assert_eq!(manager.default_environment_id(), Some("devbox")); + assert!(manager.default_environment().expect("default").is_remote()); + } + + #[tokio::test] + async fn environment_manager_disables_provider_default() { + let provider = TestEnvironmentProvider { + snapshot: EnvironmentProviderSnapshot { + environments: HashMap::from([( + LOCAL_ENVIRONMENT_ID.to_string(), + Environment::default_for_tests(), + )]), + default: EnvironmentDefault::Disabled, + }, + }; + let manager = EnvironmentManager::from_provider(&provider, test_runtime_paths()) + .await + .expect("manager"); + + assert_eq!(manager.default_environment_id(), None); + assert!(manager.default_environment().is_none()); + assert!(manager.get_environment(LOCAL_ENVIRONMENT_ID).is_some()); + } + + #[tokio::test] + async fn environment_manager_rejects_unknown_provider_default() { + let provider = TestEnvironmentProvider { + snapshot: EnvironmentProviderSnapshot { + environments: HashMap::from([( + LOCAL_ENVIRONMENT_ID.to_string(), + Environment::default_for_tests(), + )]), + default: EnvironmentDefault::EnvironmentId("missing".to_string()), + }, + }; + let err = EnvironmentManager::from_provider(&provider, test_runtime_paths()) + .await + .expect_err("unknown default should fail"); + + assert_eq!( + err.to_string(), + "exec-server protocol error: default environment `missing` is not configured" + ); + } + #[tokio::test] async fn environment_manager_uses_provider_supplied_local_environment() { let manager = EnvironmentManager::create_for_tests( diff --git a/codex-rs/exec-server/src/environment_provider.rs b/codex-rs/exec-server/src/environment_provider.rs index 7c8db07e85e5..0e4bcc519162 100644 --- a/codex-rs/exec-server/src/environment_provider.rs +++ b/codex-rs/exec-server/src/environment_provider.rs @@ -11,16 +11,29 @@ use crate::environment::REMOTE_ENVIRONMENT_ID; /// Lists the concrete environments available to Codex. /// -/// Implementations should return the provider-owned startup snapshot that -/// `EnvironmentManager` will cache. Providers that want the local environment to -/// be addressable by id should include it explicitly in the returned map. +/// Implementations own a startup snapshot containing both the available +/// environment list and default environment selection. Providers that want the +/// local environment to be addressable by id should include it explicitly in +/// the returned map. #[async_trait] pub trait EnvironmentProvider: Send + Sync { - /// Returns the environments available for a new manager. - async fn get_environments( + /// Returns the provider-owned environment startup snapshot. + async fn snapshot( &self, local_runtime_paths: &ExecServerRuntimePaths, - ) -> Result, ExecServerError>; + ) -> Result; +} + +#[derive(Clone, Debug)] +pub struct EnvironmentProviderSnapshot { + pub environments: HashMap, + pub default: EnvironmentDefault, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum EnvironmentDefault { + Disabled, + EnvironmentId(String), } /// Default provider backed by `CODEX_EXEC_SERVER_URL`. @@ -40,15 +53,15 @@ impl DefaultEnvironmentProvider { Self::new(std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok()) } - pub(crate) fn environments( + pub(crate) fn snapshot_inner( &self, local_runtime_paths: &ExecServerRuntimePaths, - ) -> HashMap { + ) -> EnvironmentProviderSnapshot { let mut environments = HashMap::from([( LOCAL_ENVIRONMENT_ID.to_string(), Environment::local(local_runtime_paths.clone()), )]); - let exec_server_url = normalize_exec_server_url(self.exec_server_url.clone()).0; + let (exec_server_url, disabled) = normalize_exec_server_url(self.exec_server_url.clone()); if let Some(exec_server_url) = exec_server_url { environments.insert( @@ -57,17 +70,28 @@ impl DefaultEnvironmentProvider { ); } - environments + let default = if disabled { + EnvironmentDefault::Disabled + } else if environments.contains_key(REMOTE_ENVIRONMENT_ID) { + EnvironmentDefault::EnvironmentId(REMOTE_ENVIRONMENT_ID.to_string()) + } else { + EnvironmentDefault::EnvironmentId(LOCAL_ENVIRONMENT_ID.to_string()) + }; + + EnvironmentProviderSnapshot { + environments, + default, + } } } #[async_trait] impl EnvironmentProvider for DefaultEnvironmentProvider { - async fn get_environments( + async fn snapshot( &self, local_runtime_paths: &ExecServerRuntimePaths, - ) -> Result, ExecServerError> { - Ok(self.environments(local_runtime_paths)) + ) -> Result { + Ok(self.snapshot_inner(local_runtime_paths)) } } @@ -98,10 +122,11 @@ mod tests { async fn default_provider_returns_local_environment_when_url_is_missing() { let provider = DefaultEnvironmentProvider::new(/*exec_server_url*/ None); let runtime_paths = test_runtime_paths(); - let environments = provider - .get_environments(&runtime_paths) + let snapshot = provider + .snapshot(&runtime_paths) .await .expect("environments"); + let environments = snapshot.environments; assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote()); assert_eq!( @@ -109,42 +134,54 @@ mod tests { Some(&runtime_paths) ); assert!(!environments.contains_key(REMOTE_ENVIRONMENT_ID)); + assert_eq!( + snapshot.default, + EnvironmentDefault::EnvironmentId(LOCAL_ENVIRONMENT_ID.to_string()) + ); } #[tokio::test] async fn default_provider_returns_local_environment_when_url_is_empty() { let provider = DefaultEnvironmentProvider::new(Some(String::new())); let runtime_paths = test_runtime_paths(); - let environments = provider - .get_environments(&runtime_paths) + let snapshot = provider + .snapshot(&runtime_paths) .await .expect("environments"); + let environments = snapshot.environments; assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote()); assert!(!environments.contains_key(REMOTE_ENVIRONMENT_ID)); + assert_eq!( + snapshot.default, + EnvironmentDefault::EnvironmentId(LOCAL_ENVIRONMENT_ID.to_string()) + ); } #[tokio::test] async fn default_provider_returns_local_environment_for_none_value() { let provider = DefaultEnvironmentProvider::new(Some("none".to_string())); let runtime_paths = test_runtime_paths(); - let environments = provider - .get_environments(&runtime_paths) + let snapshot = provider + .snapshot(&runtime_paths) .await .expect("environments"); + let environments = snapshot.environments; assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote()); assert!(!environments.contains_key(REMOTE_ENVIRONMENT_ID)); + assert_eq!(snapshot.default, EnvironmentDefault::Disabled); } #[tokio::test] async fn default_provider_adds_remote_environment_for_websocket_url() { let provider = DefaultEnvironmentProvider::new(Some("ws://127.0.0.1:8765".to_string())); let runtime_paths = test_runtime_paths(); - let environments = provider - .get_environments(&runtime_paths) + let snapshot = provider + .snapshot(&runtime_paths) .await .expect("environments"); + let environments = snapshot.environments; assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote()); let remote_environment = &environments[REMOTE_ENVIRONMENT_ID]; @@ -153,6 +190,10 @@ mod tests { remote_environment.exec_server_url(), Some("ws://127.0.0.1:8765") ); + assert_eq!( + snapshot.default, + EnvironmentDefault::EnvironmentId(REMOTE_ENVIRONMENT_ID.to_string()) + ); } #[tokio::test] @@ -160,12 +201,12 @@ mod tests { let provider = DefaultEnvironmentProvider::new(Some(" ws://127.0.0.1:8765 ".to_string())); let runtime_paths = test_runtime_paths(); let environments = provider - .get_environments(&runtime_paths) + .snapshot(&runtime_paths) .await .expect("environments"); assert_eq!( - environments[REMOTE_ENVIRONMENT_ID].exec_server_url(), + environments.environments[REMOTE_ENVIRONMENT_ID].exec_server_url(), Some("ws://127.0.0.1:8765") ); } diff --git a/codex-rs/exec-server/src/environment_resolver.rs b/codex-rs/exec-server/src/environment_resolver.rs new file mode 100644 index 000000000000..5efa203c3b75 --- /dev/null +++ b/codex-rs/exec-server/src/environment_resolver.rs @@ -0,0 +1,21 @@ +use std::fmt::Debug; + +use async_trait::async_trait; + +use crate::Environment; +use crate::ExecServerError; +use crate::ExecServerRuntimePaths; + +/// Resolves environment ids that are not already present in the manager snapshot. +/// +/// This is an optional extension point for embedders. `EnvironmentManager` +/// stores the resolver, but `get_environment` remains a strict snapshot lookup +/// until resolution policy is wired explicitly. +#[async_trait] +pub trait EnvironmentResolver: Send + Sync + Debug { + async fn resolve_environment( + &self, + environment_id: &str, + local_runtime_paths: &ExecServerRuntimePaths, + ) -> Result, ExecServerError>; +} diff --git a/codex-rs/exec-server/src/environment_toml.rs b/codex-rs/exec-server/src/environment_toml.rs new file mode 100644 index 000000000000..99808d7896cc --- /dev/null +++ b/codex-rs/exec-server/src/environment_toml.rs @@ -0,0 +1,708 @@ +use std::collections::HashMap; +use std::collections::HashSet; +use std::path::Path; +use std::path::PathBuf; + +use async_trait::async_trait; +use serde::Deserialize; +use tokio_tungstenite::tungstenite::client::IntoClientRequest; + +use crate::DefaultEnvironmentProvider; +use crate::Environment; +use crate::EnvironmentProvider; +use crate::ExecServerError; +use crate::ExecServerRuntimePaths; +use crate::client_api::ExecServerTransportParams; +use crate::client_api::StdioExecServerCommand; +use crate::environment::LOCAL_ENVIRONMENT_ID; +use crate::environment_provider::EnvironmentDefault; +use crate::environment_provider::EnvironmentProviderSnapshot; + +const ENVIRONMENTS_TOML_FILE: &str = "environments.toml"; +const MAX_ENVIRONMENT_ID_LEN: usize = 64; + +#[derive(Deserialize, Debug, Default)] +#[serde(deny_unknown_fields)] +struct EnvironmentsToml { + default: Option, + + #[serde(default)] + environments: Vec, +} + +#[derive(Deserialize, Debug, Default, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +struct EnvironmentToml { + id: String, + url: Option, + program: Option, + args: Option>, + env: Option>, + cwd: Option, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +struct TomlEnvironmentProvider { + default: EnvironmentDefault, + environments: HashMap, +} + +impl TomlEnvironmentProvider { + #[cfg(test)] + fn new(config: EnvironmentsToml) -> Result { + Self::new_with_config_dir(config, /*config_dir*/ None) + } + + fn new_with_config_dir( + config: EnvironmentsToml, + config_dir: Option<&Path>, + ) -> Result { + let mut ids = HashSet::from([LOCAL_ENVIRONMENT_ID.to_string()]); + let mut environments = HashMap::with_capacity(config.environments.len()); + for item in config.environments { + let (id, transport) = parse_environment_toml(item, config_dir)?; + if !ids.insert(id.clone()) { + return Err(ExecServerError::Protocol(format!( + "environment id `{id}` is duplicated" + ))); + } + environments.insert(id, transport); + } + let default = normalize_default_environment_id(config.default.as_deref(), &ids)?; + Ok(Self { + default, + environments, + }) + } +} + +#[async_trait] +impl EnvironmentProvider for TomlEnvironmentProvider { + async fn snapshot( + &self, + local_runtime_paths: &ExecServerRuntimePaths, + ) -> Result { + let mut environments = HashMap::from([( + LOCAL_ENVIRONMENT_ID.to_string(), + Environment::local(local_runtime_paths.clone()), + )]); + + for (id, transport_params) in &self.environments { + environments.insert( + id.clone(), + Environment::remote_with_transport( + transport_params.clone(), + Some(local_runtime_paths.clone()), + ), + ); + } + + Ok(EnvironmentProviderSnapshot { + environments, + default: self.default.clone(), + }) + } +} + +fn parse_environment_toml( + item: EnvironmentToml, + config_dir: Option<&Path>, +) -> Result<(String, ExecServerTransportParams), ExecServerError> { + let EnvironmentToml { + id, + url, + program, + args, + env, + cwd, + } = item; + validate_environment_id(&id)?; + if program.is_none() && (args.is_some() || env.is_some() || cwd.is_some()) { + return Err(ExecServerError::Protocol(format!( + "environment `{id}` args, env, and cwd require program" + ))); + } + + let transport_params = match (url, program) { + (Some(url), None) => { + let url = validate_websocket_url(url)?; + ExecServerTransportParams::WebSocketUrl(url) + } + (None, Some(program)) => { + let program = program.trim().to_string(); + if program.is_empty() { + return Err(ExecServerError::Protocol(format!( + "environment `{id}` program cannot be empty" + ))); + } + let cwd = normalize_stdio_cwd(&id, cwd, config_dir)?; + ExecServerTransportParams::StdioCommand(StdioExecServerCommand { + program, + args: args.unwrap_or_default(), + env: env.unwrap_or_default(), + cwd, + }) + } + (None, None) | (Some(_), Some(_)) => { + return Err(ExecServerError::Protocol(format!( + "environment `{id}` must set exactly one of url or program" + ))); + } + }; + + Ok((id, transport_params)) +} + +fn normalize_stdio_cwd( + id: &str, + cwd: Option, + config_dir: Option<&Path>, +) -> Result, ExecServerError> { + let Some(cwd) = cwd else { + return Ok(None); + }; + if cwd.is_absolute() { + return Ok(Some(cwd)); + } + let Some(config_dir) = config_dir else { + return Err(ExecServerError::Protocol(format!( + "environment `{id}` cwd must be absolute" + ))); + }; + Ok(Some(config_dir.join(cwd))) +} + +pub(crate) fn environment_provider_from_codex_home( + codex_home: &Path, +) -> Result, ExecServerError> { + let path = codex_home.join(ENVIRONMENTS_TOML_FILE); + if !path.try_exists().map_err(|err| { + ExecServerError::Protocol(format!( + "failed to inspect environment config `{}`: {err}", + path.display() + )) + })? { + return Ok(Box::new(DefaultEnvironmentProvider::from_env())); + } + + let environments = load_environments_toml(&path)?; + Ok(Box::new(TomlEnvironmentProvider::new_with_config_dir( + environments, + Some(codex_home), + )?)) +} + +fn normalize_default_environment_id( + default: Option<&str>, + ids: &HashSet, +) -> Result { + let Some(default) = default.map(str::trim) else { + return Ok(EnvironmentDefault::EnvironmentId( + LOCAL_ENVIRONMENT_ID.to_string(), + )); + }; + if default.is_empty() { + return Err(ExecServerError::Protocol( + "default environment id cannot be empty".to_string(), + )); + } + if !default.eq_ignore_ascii_case("none") && !ids.contains(default) { + return Err(ExecServerError::Protocol(format!( + "default environment `{default}` is not configured" + ))); + } + if default.eq_ignore_ascii_case("none") { + Ok(EnvironmentDefault::Disabled) + } else { + Ok(EnvironmentDefault::EnvironmentId(default.to_string())) + } +} + +fn validate_environment_id(id: &str) -> Result<(), ExecServerError> { + let trimmed_id = id.trim(); + if trimmed_id.is_empty() { + return Err(ExecServerError::Protocol( + "environment id cannot be empty".to_string(), + )); + } + if trimmed_id != id { + return Err(ExecServerError::Protocol(format!( + "environment id `{id}` must not contain surrounding whitespace" + ))); + } + if id == LOCAL_ENVIRONMENT_ID || id.eq_ignore_ascii_case("none") { + return Err(ExecServerError::Protocol(format!( + "environment id `{id}` is reserved" + ))); + } + if id.len() > MAX_ENVIRONMENT_ID_LEN { + return Err(ExecServerError::Protocol(format!( + "environment id `{id}` cannot be longer than {MAX_ENVIRONMENT_ID_LEN} characters" + ))); + } + if !id + .chars() + .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_') + { + return Err(ExecServerError::Protocol(format!( + "environment id `{id}` must contain only ASCII letters, numbers, '-' or '_'" + ))); + } + Ok(()) +} + +fn validate_websocket_url(url: String) -> Result { + let url = url.trim(); + if url.is_empty() { + return Err(ExecServerError::Protocol( + "environment url cannot be empty".to_string(), + )); + } + if !url.starts_with("ws://") && !url.starts_with("wss://") { + return Err(ExecServerError::Protocol(format!( + "environment url `{url}` must use ws:// or wss://" + ))); + } + url.into_client_request().map_err(|err| { + ExecServerError::Protocol(format!("environment url `{url}` is invalid: {err}")) + })?; + Ok(url.to_string()) +} + +fn load_environments_toml(path: &Path) -> Result { + let contents = std::fs::read_to_string(path).map_err(|err| { + ExecServerError::Protocol(format!( + "failed to read environment config `{}`: {err}", + path.display() + )) + })?; + + toml::from_str(&contents).map_err(|err| { + ExecServerError::Protocol(format!( + "failed to parse environment config `{}`: {err}", + path.display() + )) + }) +} + +#[cfg(test)] +mod tests { + use pretty_assertions::assert_eq; + use tempfile::tempdir; + + use super::*; + + fn test_runtime_paths() -> ExecServerRuntimePaths { + ExecServerRuntimePaths::new( + std::env::current_exe().expect("current exe"), + /*codex_linux_sandbox_exe*/ None, + ) + .expect("runtime paths") + } + + #[tokio::test] + async fn toml_provider_adds_implicit_local_and_configured_environments() { + let ssh_transport = ExecServerTransportParams::StdioCommand(StdioExecServerCommand { + program: "ssh".to_string(), + args: vec![ + "dev".to_string(), + "codex exec-server --listen stdio".to_string(), + ], + env: HashMap::from([("CODEX_LOG".to_string(), "debug".to_string())]), + cwd: None, + }); + let provider = TomlEnvironmentProvider::new(EnvironmentsToml { + default: Some("ssh-dev".to_string()), + environments: vec![ + EnvironmentToml { + id: "devbox".to_string(), + url: Some(" ws://127.0.0.1:8765 ".to_string()), + ..Default::default() + }, + EnvironmentToml { + id: "ssh-dev".to_string(), + program: Some(" ssh ".to_string()), + args: Some(vec![ + "dev".to_string(), + "codex exec-server --listen stdio".to_string(), + ]), + env: Some(HashMap::from([( + "CODEX_LOG".to_string(), + "debug".to_string(), + )])), + ..Default::default() + }, + ], + }) + .expect("provider"); + let runtime_paths = test_runtime_paths(); + + let snapshot = provider + .snapshot(&runtime_paths) + .await + .expect("environments"); + let EnvironmentProviderSnapshot { + environments, + default, + } = snapshot; + + assert!(!environments[LOCAL_ENVIRONMENT_ID].is_remote()); + assert_eq!( + environments["devbox"].exec_server_url(), + Some("ws://127.0.0.1:8765") + ); + assert_eq!(provider.environments["ssh-dev"], ssh_transport); + assert!(environments["ssh-dev"].is_remote()); + assert_eq!(environments["ssh-dev"].exec_server_url(), None); + assert_eq!( + default, + EnvironmentDefault::EnvironmentId("ssh-dev".to_string()) + ); + } + + #[tokio::test] + async fn toml_provider_default_omitted_selects_local() { + let provider = TomlEnvironmentProvider::new(EnvironmentsToml::default()).expect("provider"); + let snapshot = provider + .snapshot(&test_runtime_paths()) + .await + .expect("environments"); + + assert_eq!( + snapshot.default, + EnvironmentDefault::EnvironmentId(LOCAL_ENVIRONMENT_ID.to_string()) + ); + } + + #[tokio::test] + async fn toml_provider_default_none_disables_default() { + let provider = TomlEnvironmentProvider::new(EnvironmentsToml { + default: Some("none".to_string()), + environments: Vec::new(), + }) + .expect("provider"); + let snapshot = provider + .snapshot(&test_runtime_paths()) + .await + .expect("environments"); + + assert_eq!(snapshot.default, EnvironmentDefault::Disabled); + } + + #[test] + fn toml_provider_rejects_invalid_environments() { + let cases = [ + ( + EnvironmentToml { + id: "local".to_string(), + url: Some("ws://127.0.0.1:8765".to_string()), + ..Default::default() + }, + "environment id `local` is reserved", + ), + ( + EnvironmentToml { + id: " devbox ".to_string(), + url: Some("ws://127.0.0.1:8765".to_string()), + ..Default::default() + }, + "environment id ` devbox ` must not contain surrounding whitespace", + ), + ( + EnvironmentToml { + id: "dev box".to_string(), + url: Some("ws://127.0.0.1:8765".to_string()), + ..Default::default() + }, + "environment id `dev box` must contain only ASCII letters, numbers, '-' or '_'", + ), + ( + EnvironmentToml { + id: "devbox".to_string(), + url: Some("http://127.0.0.1:8765".to_string()), + ..Default::default() + }, + "environment url `http://127.0.0.1:8765` must use ws:// or wss://", + ), + ( + EnvironmentToml { + id: "devbox".to_string(), + url: Some("ws://127.0.0.1:8765".to_string()), + program: Some("codex".to_string()), + ..Default::default() + }, + "environment `devbox` must set exactly one of url or program", + ), + ( + EnvironmentToml { + id: "devbox".to_string(), + program: Some(" ".to_string()), + ..Default::default() + }, + "environment `devbox` program cannot be empty", + ), + ( + EnvironmentToml { + id: "devbox".to_string(), + args: Some(Vec::new()), + ..Default::default() + }, + "environment `devbox` args, env, and cwd require program", + ), + ]; + + for (item, expected) in cases { + let err = TomlEnvironmentProvider::new(EnvironmentsToml { + default: None, + environments: vec![item], + }) + .expect_err("invalid item should fail"); + + assert_eq!( + err.to_string(), + format!("exec-server protocol error: {expected}") + ); + } + } + + #[test] + fn toml_provider_resolves_relative_stdio_cwd_from_config_dir() { + let config_dir = tempdir().expect("tempdir"); + let provider = TomlEnvironmentProvider::new_with_config_dir( + EnvironmentsToml { + default: None, + environments: vec![EnvironmentToml { + id: "ssh-dev".to_string(), + program: Some("ssh".to_string()), + cwd: Some(PathBuf::from("workspace")), + ..Default::default() + }], + }, + Some(config_dir.path()), + ) + .expect("provider"); + + assert_eq!( + provider.environments["ssh-dev"], + ExecServerTransportParams::StdioCommand(StdioExecServerCommand { + program: "ssh".to_string(), + args: Vec::new(), + env: HashMap::new(), + cwd: Some(config_dir.path().join("workspace")), + }) + ); + } + + #[test] + fn toml_provider_rejects_relative_stdio_cwd_without_config_dir() { + let err = TomlEnvironmentProvider::new(EnvironmentsToml { + default: None, + environments: vec![EnvironmentToml { + id: "ssh-dev".to_string(), + program: Some("ssh".to_string()), + cwd: Some(PathBuf::from("workspace")), + ..Default::default() + }], + }) + .expect_err("relative cwd without config dir should fail"); + + assert_eq!( + err.to_string(), + "exec-server protocol error: environment `ssh-dev` cwd must be absolute" + ); + } + + #[test] + fn toml_provider_rejects_duplicate_ids() { + let err = TomlEnvironmentProvider::new(EnvironmentsToml { + default: None, + environments: vec![ + EnvironmentToml { + id: "devbox".to_string(), + url: Some("ws://127.0.0.1:8765".to_string()), + ..Default::default() + }, + EnvironmentToml { + id: "devbox".to_string(), + program: Some("codex".to_string()), + ..Default::default() + }, + ], + }) + .expect_err("duplicate id should fail"); + + assert_eq!( + err.to_string(), + "exec-server protocol error: environment id `devbox` is duplicated" + ); + } + + #[test] + fn toml_provider_rejects_overlong_id() { + let id = "a".repeat(MAX_ENVIRONMENT_ID_LEN + 1); + let err = TomlEnvironmentProvider::new(EnvironmentsToml { + default: None, + environments: vec![EnvironmentToml { + id: id.clone(), + url: Some("ws://127.0.0.1:8765".to_string()), + ..Default::default() + }], + }) + .expect_err("overlong id should fail"); + + assert_eq!( + err.to_string(), + format!( + "exec-server protocol error: environment id `{id}` cannot be longer than {MAX_ENVIRONMENT_ID_LEN} characters" + ) + ); + } + + #[test] + fn toml_provider_rejects_unknown_default() { + let err = TomlEnvironmentProvider::new(EnvironmentsToml { + default: Some("missing".to_string()), + environments: Vec::new(), + }) + .expect_err("unknown default should fail"); + + assert_eq!( + err.to_string(), + "exec-server protocol error: default environment `missing` is not configured" + ); + } + + #[test] + fn load_environments_toml_reads_root_environment_list() { + let codex_home = tempdir().expect("tempdir"); + let path = codex_home.path().join(ENVIRONMENTS_TOML_FILE); + std::fs::write( + &path, + r#" +default = "ssh-dev" + +[[environments]] +id = "devbox" +url = "ws://127.0.0.1:4512" + +[[environments]] +id = "ssh-dev" +program = "ssh" +args = ["dev", "codex exec-server --listen stdio"] +cwd = "/tmp" +[environments.env] +CODEX_LOG = "debug" +"#, + ) + .expect("write environments.toml"); + + let environments = load_environments_toml(&path).expect("environments.toml"); + + assert_eq!(environments.default.as_deref(), Some("ssh-dev")); + assert_eq!(environments.environments.len(), 2); + assert_eq!(environments.environments[0].id, "devbox"); + assert_eq!( + environments.environments[1], + EnvironmentToml { + id: "ssh-dev".to_string(), + program: Some("ssh".to_string()), + args: Some(vec![ + "dev".to_string(), + "codex exec-server --listen stdio".to_string(), + ]), + env: Some(HashMap::from([( + "CODEX_LOG".to_string(), + "debug".to_string(), + )])), + cwd: Some(PathBuf::from("/tmp")), + ..Default::default() + } + ); + } + + #[test] + fn load_environments_toml_rejects_unknown_fields() { + let codex_home = tempdir().expect("tempdir"); + let cases = [ + ("unknown = true\n", "unknown field `unknown`"), + ( + r#" +[[environments]] +id = "devbox" +url = "ws://127.0.0.1:4512" +unknown = true +"#, + "unknown field `unknown`", + ), + ]; + + for (index, (contents, expected)) in cases.into_iter().enumerate() { + let path = codex_home.path().join(format!("environments-{index}.toml")); + std::fs::write(&path, contents).expect("write environments.toml"); + + let err = load_environments_toml(&path).expect_err("unknown field should fail"); + + assert!( + err.to_string().contains(expected), + "expected `{err}` to contain `{expected}`" + ); + } + } + + #[test] + fn toml_provider_rejects_malformed_websocket_url() { + let err = TomlEnvironmentProvider::new(EnvironmentsToml { + default: None, + environments: vec![EnvironmentToml { + id: "devbox".to_string(), + url: Some("ws://".to_string()), + ..Default::default() + }], + }) + .expect_err("malformed websocket url should fail"); + + assert!( + err.to_string() + .contains("environment url `ws://` is invalid"), + "expected malformed URL error, got `{err}`" + ); + } + + #[tokio::test] + async fn environment_provider_from_codex_home_uses_present_environments_file() { + let codex_home = tempdir().expect("tempdir"); + std::fs::write( + codex_home.path().join(ENVIRONMENTS_TOML_FILE), + r#" +default = "none" +"#, + ) + .expect("write environments.toml"); + + let provider = + environment_provider_from_codex_home(codex_home.path()).expect("environment provider"); + + let snapshot = provider + .snapshot(&test_runtime_paths()) + .await + .expect("environments"); + + assert!(snapshot.environments.contains_key(LOCAL_ENVIRONMENT_ID)); + assert_eq!(snapshot.default, EnvironmentDefault::Disabled); + } + + #[tokio::test] + async fn environment_provider_from_codex_home_falls_back_when_file_is_missing() { + let codex_home = tempdir().expect("tempdir"); + + let provider = + environment_provider_from_codex_home(codex_home.path()).expect("environment provider"); + + let snapshot = provider + .snapshot(&test_runtime_paths()) + .await + .expect("environments"); + + assert!(snapshot.environments.contains_key(LOCAL_ENVIRONMENT_ID)); + } +} diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index d860d59aba98..4bdb1cb5101c 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -1,8 +1,11 @@ mod client; mod client_api; +mod client_transport; mod connection; mod environment; mod environment_provider; +mod environment_resolver; +mod environment_toml; mod fs_helper; mod fs_helper_main; mod fs_sandbox; @@ -42,6 +45,7 @@ pub use environment::LOCAL_ENVIRONMENT_ID; pub use environment::REMOTE_ENVIRONMENT_ID; pub use environment_provider::DefaultEnvironmentProvider; pub use environment_provider::EnvironmentProvider; +pub use environment_resolver::EnvironmentResolver; pub use fs_helper::CODEX_FS_HELPER_ARG1; pub use fs_helper_main::main as run_fs_helper_main; pub use local_file_system::LOCAL_FS; diff --git a/codex-rs/exec-server/src/rpc.rs b/codex-rs/exec-server/src/rpc.rs index 723b99f5028d..9ea41f3854c9 100644 --- a/codex-rs/exec-server/src/rpc.rs +++ b/codex-rs/exec-server/src/rpc.rs @@ -23,6 +23,7 @@ use tokio::task::JoinHandle; use crate::connection::JsonRpcConnection; use crate::connection::JsonRpcConnectionEvent; +use crate::connection::JsonRpcTransport; #[derive(Debug)] pub(crate) enum RpcCallError { @@ -58,11 +59,9 @@ pub(crate) enum RpcServerOutboundMessage { request_id: RequestId, error: JSONRPCErrorError, }, - #[allow(dead_code)] Notification(JSONRPCNotification), } -#[allow(dead_code)] #[derive(Clone)] pub(crate) struct RpcNotificationSender { outgoing_tx: mpsc::Sender, @@ -84,7 +83,6 @@ impl RpcNotificationSender { .map_err(|_| internal_error("RPC connection closed while sending response".into())) } - #[allow(dead_code)] pub(crate) async fn notify( &self, method: &str, @@ -229,12 +227,19 @@ pub(crate) struct RpcClient { disconnected_rx: watch::Receiver, next_request_id: AtomicI64, transport_tasks: Vec>, + _transport: JsonRpcTransport, reader_task: JoinHandle<()>, } impl RpcClient { pub(crate) fn new(connection: JsonRpcConnection) -> (Self, mpsc::Receiver) { - let (write_tx, mut incoming_rx, disconnected_rx, transport_tasks) = connection.into_parts(); + let JsonRpcConnection { + outgoing_tx: write_tx, + mut incoming_rx, + disconnected_rx, + task_handles: transport_tasks, + transport, + } = connection; let pending = Arc::new(Mutex::new(HashMap::::new())); let (event_tx, event_rx) = mpsc::channel(128); @@ -275,6 +280,7 @@ impl RpcClient { disconnected_rx, next_request_id: AtomicI64::new(1), transport_tasks, + _transport: transport, reader_task, }, event_rx, @@ -357,7 +363,6 @@ impl RpcClient { } #[cfg(test)] - #[allow(dead_code)] pub(crate) async fn pending_request_count(&self) -> usize { self.pending.lock().await.len() } @@ -565,11 +570,9 @@ mod tests { async fn rpc_client_matches_out_of_order_responses_by_request_id() { let (client_stdin, server_reader) = tokio::io::duplex(4096); let (mut server_writer, client_stdout) = tokio::io::duplex(4096); - let (client, _events_rx) = RpcClient::new(JsonRpcConnection::from_stdio( - client_stdout, - client_stdin, - "test-rpc".to_string(), - )); + let connection = + JsonRpcConnection::from_stdio(client_stdout, client_stdin, "test-rpc".to_string()); + let (client, _events_rx) = RpcClient::new(connection); let server = tokio::spawn(async move { let mut lines = BufReader::new(server_reader).lines(); diff --git a/codex-rs/exec-server/src/server/processor.rs b/codex-rs/exec-server/src/server/processor.rs index dc1a9b9ffe74..6fc0723f0c1e 100644 --- a/codex-rs/exec-server/src/server/processor.rs +++ b/codex-rs/exec-server/src/server/processor.rs @@ -47,8 +47,13 @@ async fn run_connection( runtime_paths: ExecServerRuntimePaths, ) { let router = Arc::new(build_router()); - let (json_outgoing_tx, mut incoming_rx, mut disconnected_rx, connection_tasks) = - connection.into_parts(); + let JsonRpcConnection { + outgoing_tx: json_outgoing_tx, + mut incoming_rx, + mut disconnected_rx, + task_handles: connection_tasks, + transport: _transport, + } = connection; let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); let notifications = RpcNotificationSender::new(outgoing_tx.clone()); diff --git a/codex-rs/tui/src/resume_picker.rs b/codex-rs/tui/src/resume_picker.rs index e06ccfd118b9..06ad0a61a7ce 100644 --- a/codex-rs/tui/src/resume_picker.rs +++ b/codex-rs/tui/src/resume_picker.rs @@ -5753,7 +5753,6 @@ session_picker_view = "dense" text: String::from("1. Do the thing"), }, ], - items_view: codex_app_server_protocol::TurnItemsView::Full, status: codex_app_server_protocol::TurnStatus::Completed, error: None, started_at: None, @@ -5805,7 +5804,6 @@ session_picker_view = "dense" summary: Vec::new(), content: vec![String::from("private raw chain of thought")], }], - items_view: codex_app_server_protocol::TurnItemsView::Full, status: codex_app_server_protocol::TurnStatus::Completed, error: None, started_at: None, @@ -5861,7 +5859,6 @@ session_picker_view = "dense" summary: vec![String::from("public summary")], content: vec![String::from("raw reasoning content")], }], - items_view: codex_app_server_protocol::TurnItemsView::Full, status: codex_app_server_protocol::TurnStatus::Completed, error: None, started_at: None,