diff --git a/codex-rs/app-server-client/src/lib.rs b/codex-rs/app-server-client/src/lib.rs index 6e10cef25532..1466708d617d 100644 --- a/codex-rs/app-server-client/src/lib.rs +++ b/codex-rs/app-server-client/src/lib.rs @@ -951,7 +951,7 @@ mod tests { use codex_app_server_protocol::ToolRequestUserInputParams; use codex_app_server_protocol::ToolRequestUserInputQuestion; use codex_core::config::ConfigBuilder; - use codex_core::init_state_db_from_config; + use codex_core::init_state_db; use futures::SinkExt; use futures::StreamExt; use pretty_assertions::assert_eq; @@ -1017,7 +1017,7 @@ mod tests { ) -> TestClient { let codex_home = TempDir::new().expect("temp dir"); let config = Arc::new(build_test_config_for_codex_home(codex_home.path()).await); - let state_db = init_state_db_from_config(config.as_ref()) + let state_db = init_state_db(config.as_ref()) .await .expect("state db should initialize for in-process test"); let client = InProcessAppServerClient::start(InProcessClientStartArgs { diff --git a/codex-rs/app-server/src/in_process.rs b/codex-rs/app-server/src/in_process.rs index 35b92548b895..495ca50b6963 100644 --- a/codex-rs/app-server/src/in_process.rs +++ b/codex-rs/app-server/src/in_process.rs @@ -82,13 +82,13 @@ use codex_config::CloudRequirementsLoader; use codex_config::LoaderOverrides; use codex_config::ThreadConfigLoader; use codex_core::config::Config; -use codex_core::init_state_db_from_config; +use codex_core::init_state_db; use codex_core::resolve_installation_id; use codex_exec_server::EnvironmentManager; use codex_feedback::CodexFeedback; use codex_login::AuthManager; use codex_protocol::protocol::SessionSource; -use codex_rollout::state_db::StateDbHandle; +pub use codex_rollout::StateDbHandle; pub use codex_state::log_db::LogDbLayer; use tokio::sync::mpsc; use tokio::sync::oneshot; @@ -129,7 +129,7 @@ pub struct InProcessStartArgs { pub feedback: CodexFeedback, /// SQLite tracing layer used to flush recently emitted logs before feedback upload. pub log_db: Option, - /// Optional state DB handle to use for the in-process runtime. + /// Process-wide SQLite state handle shared with embedded app-server consumers. pub state_db: Option, /// Environment manager used by core execution and filesystem operations. pub environment_manager: Arc, @@ -370,7 +370,7 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult Some(state_db), - None => init_state_db_from_config(args.config.as_ref()).await, + None => init_state_db(args.config.as_ref()).await, }; let installation_id = resolve_installation_id(&args.config.codex_home).await?; let (client_tx, mut client_rx) = mpsc::channel::(channel_capacity); @@ -421,12 +421,6 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult(channel_capacity); let mut processor_handle = tokio::spawn(async move { - let Some(state_db) = state_db else { - warn!( - "in-process app-server state db initialization failed; shutting down processor task" - ); - return; - }; let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs { outgoing: Arc::clone(&processor_outgoing), analytics_events_client, @@ -436,7 +430,7 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult InProcessClientHandle { let codex_home = TempDir::new().expect("temp dir"); let config = Arc::new(build_test_config(codex_home.path()).await); - let state_db = init_state_db_from_config(config.as_ref()) + let state_db = codex_rollout::state_db::try_init(config.as_ref()) .await .expect("state db should initialize for in-process test"); let args = InProcessStartArgs { @@ -833,7 +827,7 @@ mod tests { } #[tokio::test] - async fn in_process_allows_device_key_requests_to_reach_device_key_api() { + async fn in_process_allows_device_key_requests_to_reach_device_key_processor() { let client = start_test_client(SessionSource::Cli).await; const MALFORMED_KEY_ID_MESSAGE: &str = concat!( "invalid device key payload: keyId must be dk_hse_, dk_tpm_, or dk_osn_ ", diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index c7efae69c615..fd5daaafa662 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -51,11 +51,11 @@ use codex_config::TextRange as CoreTextRange; use codex_core::ExecPolicyError; use codex_core::check_execpolicy_for_warnings; use codex_core::config::find_codex_home; -use codex_core::init_state_db_from_config; use codex_exec_server::EnvironmentManager; use codex_exec_server::ExecServerRuntimePaths; use codex_feedback::CodexFeedback; use codex_protocol::protocol::SessionSource; +use codex_rollout::state_db as rollout_state_db; use codex_state::log_db; use tokio::sync::mpsc; use tokio::sync::oneshot; @@ -489,9 +489,9 @@ pub async fn run_main_with_transport_options( } }; - let state_db = init_state_db_from_config(&config) - .await - .ok_or_else(|| std::io::Error::other("failed to initialize sqlite state db"))?; + let state_db_result = rollout_state_db::try_init(&config).await; + let state_db_init_error = state_db_result.as_ref().err().map(ToString::to_string); + let state_db = state_db_result.ok(); if should_run_personality_migration { let effective_toml = config.config_layer_stack.effective_config(); @@ -600,12 +600,10 @@ pub async fn run_main_with_transport_options( let feedback_layer = feedback.logger_layer(); let feedback_metadata_layer = feedback.metadata_layer(); - let log_db = log_db::start(state_db.clone()); - let log_db_layer = Some( - log_db - .clone() - .with_filter(Targets::new().with_default(Level::TRACE)), - ); + let log_db = state_db.clone().map(log_db::start); + let log_db_layer = log_db + .clone() + .map(|layer| layer.with_filter(Targets::new().with_default(Level::TRACE))); let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer()); let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer()); let _ = tracing_subscriber::registry() @@ -622,6 +620,9 @@ pub async fn run_main_with_transport_options( None => error!("{}", warning.summary), } } + if let Some(err) = &state_db_init_error { + error!("failed to initialize sqlite state db: {err}"); + } let installation_id = resolve_installation_id(&config.codex_home).await?; let transport_shutdown_token = CancellationToken::new(); let mut transport_accept_handles = Vec::>::new(); @@ -667,17 +668,25 @@ pub async fn run_main_with_transport_options( let auth_manager = AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await; - let remote_control_enabled = config.features.enabled(Feature::RemoteControl); + let remote_control_config_enabled = config.features.enabled(Feature::RemoteControl); + let remote_control_enabled = remote_control_config_enabled && state_db.is_some(); + if remote_control_config_enabled && state_db.is_none() { + error!("remote control disabled because sqlite state db is unavailable"); + } if transport_accept_handles.is_empty() && !remote_control_enabled { return Err(std::io::Error::new( ErrorKind::InvalidInput, - "no transport configured; use --listen or enable remote control", + if remote_control_config_enabled && state_db.is_none() { + "no transport configured; remote control disabled because sqlite state db is unavailable" + } else { + "no transport configured; use --listen or enable remote control" + }, )); } let (remote_control_accept_handle, remote_control_handle) = start_remote_control( config.chatgpt_base_url.clone(), - Some(state_db.clone()), + state_db.clone(), auth_manager.clone(), transport_event_tx.clone(), transport_shutdown_token.clone(), @@ -761,7 +770,7 @@ pub async fn run_main_with_transport_options( config_manager, environment_manager, feedback: feedback.clone(), - log_db: Some(log_db), + log_db, state_db: state_db.clone(), config_warnings, session_source, diff --git a/codex-rs/app-server/src/mcp_refresh.rs b/codex-rs/app-server/src/mcp_refresh.rs index b1471d88fc94..fb3068e7e1e5 100644 --- a/codex-rs/app-server/src/mcp_refresh.rs +++ b/codex-rs/app-server/src/mcp_refresh.rs @@ -108,9 +108,8 @@ mod tests { use codex_config::ThreadConfigLoadErrorCode; use codex_config::ThreadConfigLoader; use codex_config::ThreadConfigSource; - use codex_core::agent_graph_store_from_state_db; use codex_core::config::ConfigOverrides; - use codex_core::init_state_db_from_config; + use codex_core::init_state_db; use codex_core::thread_store_from_config; use codex_exec_server::EnvironmentManager; use codex_login::AuthManager; @@ -175,21 +174,19 @@ mod tests { .await?; let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy")); - let state_db = init_state_db_from_config(&good_config) + let state_db = init_state_db(&good_config) .await .expect("refresh tests require state db"); - let thread_store = thread_store_from_config(&good_config, state_db.clone()); - let agent_graph_store = agent_graph_store_from_state_db(state_db.clone()); + let thread_store = thread_store_from_config(&good_config, Some(state_db.clone())); let thread_manager = Arc::new(ThreadManager::new( &good_config, auth_manager, SessionSource::Exec, Arc::new(EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, - state_db, thread_store, - agent_graph_store, "11111111-1111-4111-8111-111111111111".to_string(), + Some(state_db), )); thread_manager.start_thread(good_config).await?; thread_manager.start_thread(bad_config).await?; diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 1ac635685f18..ecc1f152a3a9 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -61,7 +61,6 @@ use codex_app_server_protocol::experimental_required_message; use codex_arg0::Arg0DispatchPaths; use codex_chatgpt::workspace_settings; use codex_core::ThreadManager; -use codex_core::agent_graph_store_from_state_db; use codex_core::config::Config; use codex_core::thread_store_from_config; use codex_exec_server::EnvironmentManager; @@ -255,7 +254,7 @@ pub(crate) struct MessageProcessorArgs { pub(crate) environment_manager: Arc, pub(crate) feedback: CodexFeedback, pub(crate) log_db: Option, - pub(crate) state_db: StateDbHandle, + pub(crate) state_db: Option, pub(crate) config_warnings: Vec, pub(crate) session_source: SessionSource, pub(crate) auth_manager: Arc, @@ -294,17 +293,15 @@ impl MessageProcessor { // affect per-thread behavior, but they must not move newly started, // resumed, or forked threads to a different persistence backend/root. let thread_store = thread_store_from_config(config.as_ref(), state_db.clone()); - let agent_graph_store = agent_graph_store_from_state_db(state_db.clone()); let thread_manager = Arc::new(ThreadManager::new( config.as_ref(), auth_manager.clone(), session_source, environment_manager, Some(analytics_events_client.clone()), - state_db.clone(), Arc::clone(&thread_store), - agent_graph_store.clone(), installation_id, + state_db.clone(), )); thread_manager .plugins_manager() @@ -350,7 +347,7 @@ impl MessageProcessor { Arc::clone(&config), feedback, log_db, - Some(state_db.clone()), + state_db.clone(), ); let git_processor = GitRequestProcessor::new(); let initialize_processor = InitializeRequestProcessor::new( @@ -400,7 +397,7 @@ impl MessageProcessor { thread_watch_manager.clone(), Arc::clone(&thread_list_state_permit), thread_goal_processor.clone(), - Some(state_db.clone()), + state_db.clone(), ); let turn_processor = TurnRequestProcessor::new( auth_manager.clone(), diff --git a/codex-rs/app-server/src/message_processor_tracing_tests.rs b/codex-rs/app-server/src/message_processor_tracing_tests.rs index 27e2c2f47339..0d4ef8279b0b 100644 --- a/codex-rs/app-server/src/message_processor_tracing_tests.rs +++ b/codex-rs/app-server/src/message_processor_tracing_tests.rs @@ -32,7 +32,6 @@ use codex_config::CloudRequirementsLoader; use codex_config::LoaderOverrides; use codex_core::config::Config; use codex_core::config::ConfigBuilder; -use codex_core::init_state_db_from_config; use codex_exec_server::EnvironmentManager; use codex_feedback::CodexFeedback; use codex_login::AuthManager; @@ -282,9 +281,6 @@ async fn build_test_processor( outgoing_tx, analytics_events_client.clone(), )); - let state_db = init_state_db_from_config(config.as_ref()) - .await - .expect("tracing test processor requires state db"); let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs { outgoing, analytics_events_client, @@ -294,7 +290,7 @@ async fn build_test_processor( environment_manager: Arc::new(EnvironmentManager::default_for_tests()), feedback: CodexFeedback::new(), log_db: None, - state_db, + state_db: None, config_warnings: Vec::new(), session_source: SessionSource::VSCode, auth_manager, diff --git a/codex-rs/app-server/src/request_processors/device_key_processor.rs b/codex-rs/app-server/src/request_processors/device_key_processor.rs index e1da459a2e6a..ea0a96c2aff4 100644 --- a/codex-rs/app-server/src/request_processors/device_key_processor.rs +++ b/codex-rs/app-server/src/request_processors/device_key_processor.rs @@ -33,8 +33,8 @@ use codex_device_key::RemoteControlClientConnectionAudience; use codex_device_key::RemoteControlClientConnectionSignPayload; use codex_device_key::RemoteControlClientEnrollmentAudience; use codex_device_key::RemoteControlClientEnrollmentSignPayload; -use codex_rollout::state_db::StateDbHandle; use codex_state::DeviceKeyBindingRecord; +use codex_state::StateRuntime; #[derive(Clone)] pub(crate) struct DeviceKeyRequestProcessor { @@ -43,7 +43,10 @@ pub(crate) struct DeviceKeyRequestProcessor { } impl DeviceKeyRequestProcessor { - pub(crate) fn new(outgoing: Arc, state_db: StateDbHandle) -> Self { + pub(crate) fn new( + outgoing: Arc, + state_db: Option>, + ) -> Self { Self { outgoing, store: DeviceKeyStore::new(Arc::new(StateDeviceKeyBindingStore::new(state_db))), @@ -167,18 +170,25 @@ async fn sign_device_key( } struct StateDeviceKeyBindingStore { - state_db: StateDbHandle, + state_db: Option>, } impl StateDeviceKeyBindingStore { - fn new(state_db: StateDbHandle) -> Self { + fn new(state_db: Option>) -> Self { Self { state_db } } + + async fn state_db(&self) -> Result, DeviceKeyError> { + self.state_db + .clone() + .ok_or_else(|| DeviceKeyError::Platform("sqlite state db unavailable".to_string())) + } } impl fmt::Debug for StateDeviceKeyBindingStore { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("StateDeviceKeyBindingStore") + .field("has_state_db", &self.state_db.is_some()) .finish_non_exhaustive() } } @@ -186,7 +196,7 @@ impl fmt::Debug for StateDeviceKeyBindingStore { #[async_trait] impl DeviceKeyBindingStore for StateDeviceKeyBindingStore { async fn get_binding(&self, key_id: &str) -> Result, DeviceKeyError> { - let state_db = self.state_db.clone(); + let state_db = self.state_db().await?; state_db .get_device_key_binding(key_id) .await @@ -204,7 +214,7 @@ impl DeviceKeyBindingStore for StateDeviceKeyBindingStore { key_id: &str, binding: &DeviceKeyBinding, ) -> Result<(), DeviceKeyError> { - let state_db = self.state_db.clone(); + let state_db = self.state_db().await?; state_db .upsert_device_key_binding(&DeviceKeyBindingRecord { key_id: key_id.to_string(), diff --git a/codex-rs/app-server/src/request_processors/thread_goal_processor.rs b/codex-rs/app-server/src/request_processors/thread_goal_processor.rs index ff192e34b2a3..0e12e44ce512 100644 --- a/codex-rs/app-server/src/request_processors/thread_goal_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_goal_processor.rs @@ -7,7 +7,7 @@ pub(crate) struct ThreadGoalRequestProcessor { outgoing: Arc, config: Arc, thread_state_manager: ThreadStateManager, - state_db: StateDbHandle, + state_db: Option, } impl ThreadGoalRequestProcessor { @@ -16,7 +16,7 @@ impl ThreadGoalRequestProcessor { outgoing: Arc, config: Arc, thread_state_manager: ThreadStateManager, - state_db: StateDbHandle, + state_db: Option, ) -> Self { Self { thread_manager, @@ -72,6 +72,23 @@ impl ThreadGoalRequestProcessor { } } + pub(crate) async fn pending_resume_goal_state( + &self, + thread: &CodexThread, + ) -> (bool, Option) { + let emit_thread_goal_update = self.config.features.enabled(Feature::Goals); + let thread_goal_state_db = if emit_thread_goal_update { + if let Some(state_db) = thread.state_db() { + Some(state_db) + } else { + self.state_db.clone() + } + } else { + None + }; + (emit_thread_goal_update, thread_goal_state_db) + } + async fn thread_goal_set_inner( &self, request_id: ConnectionRequestId, @@ -93,7 +110,7 @@ impl ThreadGoalRequestProcessor { None => find_thread_path_by_id_str( &self.config.codex_home, &thread_id.to_string(), - Some(self.state_db.as_ref()), + self.state_db.as_deref(), ) .await .map_err(|err| { @@ -258,7 +275,7 @@ impl ThreadGoalRequestProcessor { None => find_thread_path_by_id_str( &self.config.codex_home, &thread_id.to_string(), - Some(self.state_db.as_ref()), + self.state_db.as_deref(), ) .await .map_err(|err| { @@ -322,7 +339,7 @@ impl ThreadGoalRequestProcessor { find_thread_path_by_id_str( &self.config.codex_home, &thread_id.to_string(), - Some(self.state_db.as_ref()), + self.state_db.as_deref(), ) .await .map_err(|err| { @@ -331,7 +348,9 @@ impl ThreadGoalRequestProcessor { .ok_or_else(|| invalid_request(format!("thread not found: {thread_id}")))?; } - Ok(self.state_db.clone()) + self.state_db + .clone() + .ok_or_else(|| internal_error("sqlite state db unavailable for thread goals")) } async fn emit_thread_goal_snapshot(&self, thread_id: ThreadId) { diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 4042197a7667..deeb2e9d6291 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -2671,10 +2671,10 @@ impl ThreadRequestProcessor { ))); }; - let emit_thread_goal_update = self.config.features.enabled(Feature::Goals); - let thread_goal_state_db = emit_thread_goal_update - .then(|| self.state_db.clone()) - .flatten(); + let (emit_thread_goal_update, thread_goal_state_db) = self + .thread_goal_processor + .pending_resume_goal_state(existing_thread.as_ref()) + .await; let command = crate::thread_state::ThreadListenerCommand::SendThreadResumeResponse( Box::new(crate::thread_state::PendingThreadResumeRequest { diff --git a/codex-rs/core-api/src/lib.rs b/codex-rs/core-api/src/lib.rs index 9af459830aca..f9bdc9b56b4c 100644 --- a/codex-rs/core-api/src/lib.rs +++ b/codex-rs/core-api/src/lib.rs @@ -31,7 +31,6 @@ pub use codex_core::StartThreadOptions; pub use codex_core::StateDbHandle; pub use codex_core::ThreadManager; pub use codex_core::ThreadShutdownReport; -pub use codex_core::agent_graph_store_from_state_db; pub use codex_core::config::Config; pub use codex_core::config::Constrained; pub use codex_core::config::GhostSnapshotConfig; @@ -41,7 +40,6 @@ pub use codex_core::config::TerminalResizeReflowConfig; pub use codex_core::config::ThreadStoreConfig; pub use codex_core::config::find_codex_home; pub use codex_core::init_state_db; -pub use codex_core::init_state_db_from_config; pub use codex_core::resolve_installation_id; pub use codex_core::skills::SkillsManager; pub use codex_core::thread_store_from_config; diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index 8c8114324cf1..f39c9a18264d 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -31,6 +31,7 @@ use codex_protocol::protocol::SubAgentSource; use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::user_input::UserInput; +use codex_state::DirectionalThreadSpawnEdgeStatus; use codex_thread_store::ReadThreadParams; use serde::Serialize; use std::collections::HashMap; @@ -310,6 +311,7 @@ impl AgentControl { state.notify_thread_created(new_thread.thread_id); self.persist_thread_spawn_edge_for_source( + new_thread.thread.as_ref(), new_thread.thread_id, notification_source.as_ref(), ) @@ -459,14 +461,19 @@ impl AgentControl { )) .await?; let state = self.upgrade()?; - let agent_graph_store = state.agent_graph_store(); + let Ok(resumed_thread) = state.get_thread(resumed_thread_id).await else { + return Ok(resumed_thread_id); + }; + let Some(state_db_ctx) = resumed_thread.state_db() else { + return Ok(resumed_thread_id); + }; let mut resume_queue = VecDeque::from([(thread_id, root_depth)]); while let Some((parent_thread_id, parent_depth)) = resume_queue.pop_front() { - let child_ids = match agent_graph_store - .list_thread_spawn_children( + let child_ids = match state_db_ctx + .list_thread_spawn_children_with_status( parent_thread_id, - Some(codex_agent_graph_store::ThreadSpawnEdgeStatus::Open), + DirectionalThreadSpawnEdgeStatus::Open, ) .await { @@ -530,6 +537,7 @@ impl AgentControl { let _ = config.features.disable(Feature::Collab); } let state = self.upgrade()?; + let state_db_ctx = state.state_db(); let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?; let (session_source, agent_metadata) = match session_source { SessionSource::SubAgent(SubAgentSource::ThreadSpawn { @@ -539,11 +547,14 @@ impl AgentControl { agent_role: _, agent_nickname: _, }) => { - let state_db_ctx = state.state_db(); let (resumed_agent_nickname, resumed_agent_role) = - match state_db_ctx.get_thread(thread_id).await { - Ok(Some(metadata)) => (metadata.agent_nickname, metadata.agent_role), - Ok(None) | Err(_) => (None, None), + if let Some(state_db_ctx) = state_db_ctx.as_ref() { + match state_db_ctx.get_thread(thread_id).await { + Ok(Some(metadata)) => (metadata.agent_nickname, metadata.agent_role), + Ok(None) | Err(_) => (None, None), + } + } else { + (None, None) }; self.prepare_thread_spawn( &mut reservation, @@ -610,6 +621,7 @@ impl AgentControl { ); } self.persist_thread_spawn_edge_for_source( + resumed_thread.thread.as_ref(), resumed_thread.thread_id, Some(¬ification_source), ) @@ -722,13 +734,11 @@ impl AgentControl { /// agent and any live descendants reached from the in-memory tree. pub(crate) async fn close_agent(&self, agent_id: ThreadId) -> CodexResult { let state = self.upgrade()?; - if let Err(err) = state - .agent_graph_store() - .set_thread_spawn_edge_status( - agent_id, - codex_agent_graph_store::ThreadSpawnEdgeStatus::Closed, - ) - .await + if let Ok(thread) = state.get_thread(agent_id).await + && let Some(state_db_ctx) = thread.state_db() + && let Err(err) = state_db_ctx + .set_thread_spawn_edge_status(agent_id, DirectionalThreadSpawnEdgeStatus::Closed) + .await { warn!("failed to persist thread-spawn edge status for {agent_id}: {err}"); } @@ -1144,21 +1154,21 @@ impl AgentControl { async fn persist_thread_spawn_edge_for_source( &self, + thread: &crate::CodexThread, child_thread_id: ThreadId, session_source: Option<&SessionSource>, ) { let Some(parent_thread_id) = session_source.and_then(thread_spawn_parent_thread_id) else { return; }; - let Ok(state) = self.upgrade() else { + let Some(state_db_ctx) = thread.state_db() else { return; }; - if let Err(err) = state - .agent_graph_store() + if let Err(err) = state_db_ctx .upsert_thread_spawn_edge( parent_thread_id, child_thread_id, - codex_agent_graph_store::ThreadSpawnEdgeStatus::Open, + DirectionalThreadSpawnEdgeStatus::Open, ) .await { diff --git a/codex-rs/core/src/agent/control_tests.rs b/codex-rs/core/src/agent/control_tests.rs index a8de42781462..e0c6618904c9 100644 --- a/codex-rs/core/src/agent/control_tests.rs +++ b/codex-rs/core/src/agent/control_tests.rs @@ -107,6 +107,32 @@ impl AgentControlHarness { } } + async fn new_with_state_db() -> Self { + let (home, mut config) = test_config().await; + config + .features + .enable(Feature::Sqlite) + .expect("test config should allow sqlite"); + let state_db = crate::init_state_db(&config) + .await + .expect("test config should initialize state db"); + let manager = ThreadManager::with_models_provider_home_and_state_for_tests( + CodexAuth::from_api_key("dummy"), + config.model_provider.clone(), + config.codex_home.to_path_buf(), + std::sync::Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), + Some(state_db), + ) + .await; + let control = manager.agent_control(); + Self { + _home: home, + config, + manager, + control, + } + } + async fn start_thread(&self) -> (ThreadId, Arc) { let new_thread = self .manager @@ -1538,25 +1564,7 @@ async fn spawn_thread_subagent_uses_role_specific_nickname_candidates() { #[tokio::test] async fn resume_thread_subagent_restores_stored_nickname_and_role() { - let (home, mut config) = test_config().await; - config - .features - .enable(Feature::Sqlite) - .expect("test config should allow sqlite"); - let manager = ThreadManager::with_models_provider_and_home_for_tests( - CodexAuth::from_api_key("dummy"), - config.model_provider.clone(), - config.codex_home.to_path_buf(), - std::sync::Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), - ) - .await; - let control = manager.agent_control(); - let harness = AgentControlHarness { - _home: home, - config, - manager, - control, - }; + let harness = AgentControlHarness::new_with_state_db().await; let (parent_thread_id, _parent_thread) = harness.start_thread().await; let agent_path = AgentPath::from_string("/root/explorer".to_string()) .expect("test agent path should be valid"); @@ -1704,12 +1712,14 @@ async fn resume_agent_from_rollout_reads_archived_rollout_path() { .expect("child shutdown should succeed"); let store = LocalThreadStore::new( LocalThreadStoreConfig::from_config(&harness.config), - codex_state::StateRuntime::init( - harness.config.sqlite_home.clone(), - harness.config.model_provider_id.clone(), - ) - .await - .expect("state db should initialize"), + Some( + codex_state::StateRuntime::init( + harness.config.sqlite_home.clone(), + harness.config.model_provider_id.clone(), + ) + .await + .expect("state db should initialize"), + ), ); store .archive_thread(ArchiveThreadParams { @@ -1734,7 +1744,7 @@ async fn resume_agent_from_rollout_reads_archived_rollout_path() { #[tokio::test] async fn list_agent_subtree_thread_ids_includes_anonymous_and_closed_descendants() { - let harness = AgentControlHarness::new().await; + let harness = AgentControlHarness::new_with_state_db().await; let (parent_thread_id, _parent_thread) = harness.start_thread().await; let worker_path = AgentPath::root().join("worker").expect("worker path"); let reviewer_path = AgentPath::root().join("reviewer").expect("reviewer path"); @@ -1860,7 +1870,7 @@ async fn list_agent_subtree_thread_ids_includes_anonymous_and_closed_descendants #[tokio::test] async fn shutdown_agent_tree_closes_live_descendants() { - let harness = AgentControlHarness::new().await; + let harness = AgentControlHarness::new_with_state_db().await; let (parent_thread_id, _parent_thread) = harness.start_thread().await; let child_thread_id = harness @@ -1945,7 +1955,7 @@ async fn shutdown_agent_tree_closes_live_descendants() { #[tokio::test] async fn shutdown_agent_tree_closes_descendants_when_started_at_child() { - let harness = AgentControlHarness::new().await; + let harness = AgentControlHarness::new_with_state_db().await; let (parent_thread_id, _parent_thread) = harness.start_thread().await; let child_thread_id = harness @@ -2036,7 +2046,7 @@ async fn shutdown_agent_tree_closes_descendants_when_started_at_child() { #[tokio::test] async fn resume_agent_from_rollout_does_not_reopen_closed_descendants() { - let harness = AgentControlHarness::new().await; + let harness = AgentControlHarness::new_with_state_db().await; let (parent_thread_id, parent_thread) = harness.start_thread().await; let child_thread_id = harness @@ -2131,7 +2141,7 @@ async fn resume_agent_from_rollout_does_not_reopen_closed_descendants() { #[tokio::test] async fn resume_closed_child_reopens_open_descendants() { - let harness = AgentControlHarness::new().await; + let harness = AgentControlHarness::new_with_state_db().await; let (parent_thread_id, parent_thread) = harness.start_thread().await; let child_thread_id = harness @@ -2228,7 +2238,7 @@ async fn resume_closed_child_reopens_open_descendants() { #[tokio::test] async fn resume_agent_from_rollout_reopens_open_descendants_after_manager_shutdown() { - let harness = AgentControlHarness::new().await; + let harness = AgentControlHarness::new_with_state_db().await; let (parent_thread_id, parent_thread) = harness.start_thread().await; let child_thread_id = harness @@ -2319,7 +2329,7 @@ async fn resume_agent_from_rollout_reopens_open_descendants_after_manager_shutdo #[tokio::test] async fn resume_agent_from_rollout_uses_edge_data_when_descendant_metadata_source_is_stale() { - let harness = AgentControlHarness::new().await; + let harness = AgentControlHarness::new_with_state_db().await; let (parent_thread_id, parent_thread) = harness.start_thread().await; let child_thread_id = harness @@ -2450,7 +2460,7 @@ async fn resume_agent_from_rollout_uses_edge_data_when_descendant_metadata_sourc #[tokio::test] async fn resume_agent_from_rollout_skips_descendants_when_parent_resume_fails() { - let harness = AgentControlHarness::new().await; + let harness = AgentControlHarness::new_with_state_db().await; let (parent_thread_id, parent_thread) = harness.start_thread().await; let child_thread_id = harness diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 3ead350dfad5..a89d8fc9737c 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -98,7 +98,6 @@ pub(crate) async fn run_codex_thread_interactive( parent_trace: None, environment_selections: parent_ctx.environments.clone(), analytics_events_client: Some(parent_session.services.analytics_events_client.clone()), - state_db: parent_session.services.state_db.clone(), thread_store: Arc::clone(&parent_session.services.thread_store), })) .or_cancel(&cancel_token) diff --git a/codex-rs/core/src/goals.rs b/codex-rs/core/src/goals.rs index 8f15b1b24f29..7de2737b323d 100644 --- a/codex-rs/core/src/goals.rs +++ b/codex-rs/core/src/goals.rs @@ -30,6 +30,7 @@ use codex_protocol::protocol::TokenUsage; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::validate_thread_goal_objective; use codex_rollout::state_db::reconcile_rollout; +use codex_thread_store::LocalThreadStore; use codex_utils_template::Template; use futures::future::BoxFuture; use std::sync::Arc; @@ -1338,6 +1339,17 @@ impl Session { state_db } else if let Some(state_db) = self.goal_runtime.state_db.lock().await.clone() { state_db + } else if let Some(local_store) = self + .services + .thread_store + .as_any() + .downcast_ref::() + { + local_store.state_db().await.ok_or_else(|| { + anyhow::anyhow!( + "thread goals require a local persisted thread with a state database" + ) + })? } else { anyhow::bail!("thread goals require a local persisted thread with a state database"); }; diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 1a754d1ece08..0cdf0e2d4669 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -117,9 +117,7 @@ pub use thread_manager::NewThread; pub use thread_manager::StartThreadOptions; pub use thread_manager::ThreadManager; pub use thread_manager::ThreadShutdownReport; -pub use thread_manager::agent_graph_store_from_state_db; pub use thread_manager::build_models_manager; -pub use thread_manager::init_state_db_from_config; pub use thread_manager::thread_store_from_config; pub use web_search::web_search_action_detail; pub use web_search::web_search_detail; diff --git a/codex-rs/core/src/personality_migration.rs b/codex-rs/core/src/personality_migration.rs index bb1cb43587ae..975aecd4afd1 100644 --- a/codex-rs/core/src/personality_migration.rs +++ b/codex-rs/core/src/personality_migration.rs @@ -25,7 +25,7 @@ pub enum PersonalityMigrationStatus { pub async fn maybe_migrate_personality( codex_home: &Path, config_toml: &ConfigToml, - state_db: StateDbHandle, + state_db: Option, ) -> io::Result { let marker_path = codex_home.join(PERSONALITY_MIGRATION_FILENAME); if tokio::fs::try_exists(&marker_path).await? { @@ -65,13 +65,16 @@ pub async fn maybe_migrate_personality( async fn has_recorded_sessions( codex_home: &Path, default_provider: &str, - state_db: StateDbHandle, + state_db: Option, ) -> io::Result { - let config = LocalThreadStoreConfig { - codex_home: codex_home.to_path_buf(), - default_model_provider_id: default_provider.to_string(), - }; - let store = LocalThreadStore::new(config, state_db); + let store = LocalThreadStore::new( + LocalThreadStoreConfig { + codex_home: codex_home.to_path_buf(), + sqlite_home: codex_home.to_path_buf(), + default_model_provider_id: default_provider.to_string(), + }, + state_db, + ); if has_threads(&store, /*archived*/ false).await? { return Ok(true); } diff --git a/codex-rs/core/src/personality_migration_tests.rs b/codex-rs/core/src/personality_migration_tests.rs index 3de22ba1d3b3..fcff99582041 100644 --- a/codex-rs/core/src/personality_migration_tests.rs +++ b/codex-rs/core/src/personality_migration_tests.rs @@ -112,7 +112,7 @@ async fn applies_when_sessions_exist_and_no_personality() -> io::Result<()> { let config_toml = ConfigToml::default(); let state_db = state_db_for_test(temp.path()).await?; - let status = maybe_migrate_personality(temp.path(), &config_toml, state_db).await?; + let status = maybe_migrate_personality(temp.path(), &config_toml, Some(state_db)).await?; assert_eq!(status, PersonalityMigrationStatus::Applied); assert!(temp.path().join(PERSONALITY_MIGRATION_FILENAME).exists()); @@ -129,7 +129,7 @@ async fn applies_when_only_archived_sessions_exist_and_no_personality() -> io::R let config_toml = ConfigToml::default(); let state_db = state_db_for_test(temp.path()).await?; - let status = maybe_migrate_personality(temp.path(), &config_toml, state_db).await?; + let status = maybe_migrate_personality(temp.path(), &config_toml, Some(state_db)).await?; assert_eq!(status, PersonalityMigrationStatus::Applied); assert!(temp.path().join(PERSONALITY_MIGRATION_FILENAME).exists()); @@ -146,7 +146,7 @@ async fn skips_when_marker_exists() -> io::Result<()> { let config_toml = ConfigToml::default(); let state_db = state_db_for_test(temp.path()).await?; - let status = maybe_migrate_personality(temp.path(), &config_toml, state_db).await?; + let status = maybe_migrate_personality(temp.path(), &config_toml, Some(state_db)).await?; assert_eq!(status, PersonalityMigrationStatus::SkippedMarker); assert!(!temp.path().join("config.toml").exists()); @@ -164,7 +164,7 @@ async fn skips_when_personality_explicit() -> io::Result<()> { let config_toml = read_config_toml(temp.path()).await?; let state_db = state_db_for_test(temp.path()).await?; - let status = maybe_migrate_personality(temp.path(), &config_toml, state_db).await?; + let status = maybe_migrate_personality(temp.path(), &config_toml, Some(state_db)).await?; assert_eq!( status, @@ -182,7 +182,7 @@ async fn skips_when_no_sessions() -> io::Result<()> { let temp = TempDir::new()?; let config_toml = ConfigToml::default(); let state_db = state_db_for_test(temp.path()).await?; - let status = maybe_migrate_personality(temp.path(), &config_toml, state_db).await?; + let status = maybe_migrate_personality(temp.path(), &config_toml, Some(state_db)).await?; assert_eq!(status, PersonalityMigrationStatus::SkippedNoSessions); assert!(temp.path().join(PERSONALITY_MIGRATION_FILENAME).exists()); @@ -199,7 +199,7 @@ async fn uses_configured_sqlite_home_when_checking_for_sessions() -> io::Result< let config_toml = ConfigToml::default(); let state_db = state_db_for_test_with_sqlite_home(codex_home.path(), sqlite_home.path()).await?; - let status = maybe_migrate_personality(codex_home.path(), &config_toml, state_db).await?; + let status = maybe_migrate_personality(codex_home.path(), &config_toml, Some(state_db)).await?; assert_eq!(status, PersonalityMigrationStatus::Applied); assert!( diff --git a/codex-rs/core/src/prompt_debug.rs b/codex-rs/core/src/prompt_debug.rs index 1b0c75230caf..5e68cc1e577c 100644 --- a/codex-rs/core/src/prompt_debug.rs +++ b/codex-rs/core/src/prompt_debug.rs @@ -17,9 +17,8 @@ use crate::resolve_installation_id; use crate::session::session::Session; use crate::session::turn::build_prompt; use crate::session::turn::built_tools; +use crate::state_db_bridge::init_state_db; use crate::thread_manager::ThreadManager; -use crate::thread_manager::agent_graph_store_from_state_db; -use crate::thread_manager::init_state_db_from_config; use crate::thread_manager::thread_store_from_config; /// Build the model-visible `input` list for a single debug turn. @@ -38,11 +37,8 @@ pub async fn build_prompt_input( config.codex_linux_sandbox_exe.clone(), )?; - let state_db = init_state_db_from_config(&config) - .await - .ok_or_else(|| std::io::Error::other("prompt debug requires state db"))?; + let state_db = init_state_db(&config).await; let thread_store = thread_store_from_config(&config, state_db.clone()); - let agent_graph_store = agent_graph_store_from_state_db(state_db.clone()); let installation_id = resolve_installation_id(&config.codex_home).await?; let thread_manager = ThreadManager::new( &config, @@ -50,10 +46,9 @@ pub async fn build_prompt_input( SessionSource::Exec, Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await), /*analytics_events_client*/ None, - state_db, thread_store, - agent_graph_store, installation_id, + state_db.clone(), ); let thread = thread_manager.start_thread(config).await?; diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 8782f14b3dae..ee892dd2ea09 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -132,6 +132,7 @@ use codex_terminal_detection::user_agent; use codex_thread_store::CreateThreadParams; use codex_thread_store::LiveThread; use codex_thread_store::LiveThreadInitGuard; +use codex_thread_store::LocalThreadStore; use codex_thread_store::ResumeThreadParams; use codex_thread_store::ThreadEventPersistenceMode; use codex_thread_store::ThreadPersistenceMetadata; @@ -409,7 +410,6 @@ pub(crate) struct CodexSpawnArgs { pub(crate) parent_trace: Option, pub(crate) environment_selections: ResolvedTurnEnvironments, pub(crate) analytics_events_client: Option, - pub(crate) state_db: Option, pub(crate) thread_store: Arc, } @@ -469,7 +469,6 @@ impl Codex { parent_trace: _, environment_selections, analytics_events_client, - state_db, thread_store, } = args; let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); @@ -558,7 +557,15 @@ impl Codex { }; match thread_id { Some(thread_id) => { - let state_db_ctx = state_db.clone(); + let state_db_ctx = if config.ephemeral { + None + } else if let Some(local_store) = + thread_store.as_any().downcast_ref::() + { + local_store.state_db().await + } else { + None + }; state_db::get_dynamic_tools(state_db_ctx.as_deref(), thread_id, "codex_spawn") .await } @@ -646,7 +653,6 @@ impl Codex { agent_control, environment_manager, analytics_events_client, - state_db, thread_store, parent_rollout_thread_trace, ) @@ -1308,7 +1314,7 @@ impl Session { self.services.user_shell.as_ref().clone(), self.services.shell_snapshot_tx.clone(), self.services.session_telemetry.clone(), - self.state_db(), + self.services.state_db.clone(), ); } diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 2ce01e81b3ce..e12f1fdee2c4 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -368,7 +368,6 @@ impl Session { agent_control: AgentControl, environment_manager: Arc, analytics_events_client: Option, - state_db: Option, thread_store: Arc, parent_rollout_thread_trace: ThreadTraceContext, ) -> anyhow::Result> { @@ -468,7 +467,22 @@ impl Session { otel.name = "session_init.thread_persistence", session_init.ephemeral = config.ephemeral, )); - let state_db_ctx = if config.ephemeral { None } else { state_db }; + let state_db_fut = async { + if config.ephemeral { + None + } else if let Some(local_store) = + thread_store.as_any().downcast_ref::() + { + local_store.state_db().await + } else { + None + } + } + .instrument(info_span!( + "session_init.state_db", + otel.name = "session_init.state_db", + session_init.ephemeral = config.ephemeral, + )); let auth_manager_clone = Arc::clone(&auth_manager); let config_for_mcp = Arc::clone(&config); @@ -492,8 +506,8 @@ impl Session { )); // Join all independent futures. - let (thread_persistence_result, (auth, mcp_servers, auth_statuses)) = - tokio::join!(thread_persistence_fut, auth_and_mcp_fut); + let (thread_persistence_result, state_db_ctx, (auth, mcp_servers, auth_statuses)) = + tokio::join!(thread_persistence_fut, state_db_fut, auth_and_mcp_fut); let mut live_thread_init = LiveThreadInitGuard::new(thread_persistence_result.map_err(|e| { diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index cae3e1f97810..48dc5acb51c9 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -3598,15 +3598,16 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() { AgentControl::default(), Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, - /*state_db*/ None, Arc::new(codex_thread_store::LocalThreadStore::new( codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()), - codex_state::StateRuntime::init( - config.sqlite_home.clone(), - config.model_provider_id.clone(), - ) - .await - .expect("state db should initialize"), + Some( + codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.model_provider_id.clone(), + ) + .await + .expect("state db should initialize"), + ), )), codex_rollout_trace::ThreadTraceContext::disabled(), ) @@ -3755,12 +3756,14 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { live_thread: None, thread_store: Arc::new(codex_thread_store::LocalThreadStore::new( codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()), - codex_state::StateRuntime::init( - config.sqlite_home.clone(), - config.model_provider_id.clone(), - ) - .await - .expect("state db should initialize"), + Some( + codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.model_provider_id.clone(), + ) + .await + .expect("state db should initialize"), + ), )), model_client: ModelClient::new( Some(auth_manager.clone()), @@ -3939,15 +3942,16 @@ async fn make_session_with_config_and_rx( AgentControl::default(), Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, - /*state_db*/ None, Arc::new(codex_thread_store::LocalThreadStore::new( codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()), - codex_state::StateRuntime::init( - config.sqlite_home.clone(), - config.model_provider_id.clone(), - ) - .await - .expect("state db should initialize"), + Some( + codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.model_provider_id.clone(), + ) + .await + .expect("state db should initialize"), + ), )), codex_rollout_trace::ThreadTraceContext::disabled(), ) @@ -4047,15 +4051,16 @@ async fn make_session_with_history_source_and_agent_control_and_rx( agent_control, Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, - /*state_db*/ None, Arc::new(codex_thread_store::LocalThreadStore::new( codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()), - codex_state::StateRuntime::init( - config.sqlite_home.clone(), - config.model_provider_id.clone(), - ) - .await - .expect("state db should initialize"), + Some( + codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.model_provider_id.clone(), + ) + .await + .expect("state db should initialize"), + ), )), codex_rollout_trace::ThreadTraceContext::disabled(), ) @@ -5446,7 +5451,7 @@ where live_thread: None, thread_store: Arc::new(codex_thread_store::LocalThreadStore::new( codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()), - state_db, + Some(state_db), )), model_client: ModelClient::new( Some(Arc::clone(&auth_manager)), diff --git a/codex-rs/core/src/session/tests/guardian_tests.rs b/codex-rs/core/src/session/tests/guardian_tests.rs index 1026468627bc..0688c0d9dee9 100644 --- a/codex-rs/core/src/session/tests/guardian_tests.rs +++ b/codex-rs/core/src/session/tests/guardian_tests.rs @@ -731,12 +731,14 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { let skills_watcher = Arc::new(SkillsWatcher::noop()); let thread_store = Arc::new(codex_thread_store::LocalThreadStore::new( codex_thread_store::LocalThreadStoreConfig::from_config(&config), - codex_state::StateRuntime::init( - config.sqlite_home.clone(), - config.model_provider_id.clone(), - ) - .await - .expect("state db should initialize"), + Some( + codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.model_provider_id.clone(), + ) + .await + .expect("state db should initialize"), + ), )); let CodexSpawnOk { codex, .. } = Codex::spawn(CodexSpawnArgs { @@ -767,7 +769,6 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { turn_environments: Vec::new(), }, analytics_events_client: None, - state_db: None, thread_store, }) .await diff --git a/codex-rs/core/src/stream_events_utils.rs b/codex-rs/core/src/stream_events_utils.rs index fc608aed93e4..29884da4aeb5 100644 --- a/codex-rs/core/src/stream_events_utils.rs +++ b/codex-rs/core/src/stream_events_utils.rs @@ -138,8 +138,11 @@ pub(crate) async fn record_completed_response_item( .await; } mark_thread_memory_mode_polluted_if_external_context(sess, turn_context, item).await; - let has_memory_citation = - record_stage1_output_usage_and_detect_memory_citation(sess.state_db(), item).await; + let has_memory_citation = record_stage1_output_usage_and_detect_memory_citation( + sess.services.state_db.as_ref(), + item, + ) + .await; if has_memory_citation { sess.record_memory_citation_for_turn(&turn_context.sub_id) .await; @@ -174,7 +177,7 @@ pub(crate) async fn mark_thread_memory_mode_polluted_if_external_context( } async fn record_stage1_output_usage_and_detect_memory_citation( - state_db_ctx: Option, + state_db_ctx: Option<&state_db::StateDbHandle>, item: &ResponseItem, ) -> bool { let Some(raw_text) = raw_assistant_output_text_from_item(item) else { diff --git a/codex-rs/core/src/test_support.rs b/codex-rs/core/src/test_support.rs index 74305fb51886..fd42b2516f64 100644 --- a/codex-rs/core/src/test_support.rs +++ b/codex-rs/core/src/test_support.rs @@ -74,6 +74,23 @@ pub async fn thread_manager_with_models_provider_and_home( .await } +pub async fn thread_manager_with_models_provider_home_and_state( + auth: CodexAuth, + provider: ModelProviderInfo, + codex_home: PathBuf, + environment_manager: Arc, + state_db: Option, +) -> ThreadManager { + ThreadManager::with_models_provider_home_and_state_for_tests( + auth, + provider, + codex_home, + environment_manager, + state_db, + ) + .await +} + pub async fn start_thread_with_user_shell_override( thread_manager: &ThreadManager, config: Config, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 331ed3ca15c7..054fd909066f 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -19,8 +19,6 @@ use crate::skills_watcher::SkillsWatcher; use crate::skills_watcher::SkillsWatcherEvent; use crate::tasks::InterruptedTurnHistoryMarker; use crate::tasks::interrupted_turn_history_marker; -use codex_agent_graph_store::AgentGraphStore; -use codex_agent_graph_store::LocalAgentGraphStore; use codex_analytics::AnalyticsEventsClient; use codex_app_server_protocol::ThreadHistoryBuilder; use codex_app_server_protocol::TurnStatus; @@ -53,8 +51,8 @@ use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnAbortedEvent; use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::protocol::W3cTraceContext; -use codex_rollout::state_db; use codex_rollout::state_db::StateDbHandle; +use codex_state::DirectionalThreadSpawnEdgeStatus; use codex_thread_store::InMemoryThreadStore; use codex_thread_store::LocalThreadStore; use codex_thread_store::LocalThreadStoreConfig; @@ -251,11 +249,10 @@ pub(crate) struct ThreadManagerState { mcp_manager: Arc, skills_watcher: Arc, thread_store: Arc, - state_db: StateDbHandle, - agent_graph_store: Arc, session_source: SessionSource, installation_id: String, analytics_events_client: Option, + state_db: Option, // Captures submitted ops for testing purpose when test mode is enabled. ops_log: Option, } @@ -271,11 +268,10 @@ pub fn build_models_manager( ) } -pub async fn init_state_db_from_config(config: &Config) -> Option { - state_db::init(config).await -} - -pub fn thread_store_from_config(config: &Config, state_db: StateDbHandle) -> Arc { +pub fn thread_store_from_config( + config: &Config, + state_db: Option, +) -> Arc { match &config.experimental_thread_store { ThreadStoreConfig::Local => Arc::new(LocalThreadStore::new( LocalThreadStoreConfig::from_config(config), @@ -286,27 +282,6 @@ pub fn thread_store_from_config(config: &Config, state_db: StateDbHandle) -> Arc } } -pub fn agent_graph_store_from_state_db(state_db: StateDbHandle) -> Arc { - Arc::new(LocalAgentGraphStore::new(state_db)) -} - -async fn state_db_from_roots_for_tests( - codex_home: PathBuf, - sqlite_home: PathBuf, - default_model_provider_id: String, -) -> StateDbHandle { - let config = codex_rollout::RolloutConfig { - codex_home: codex_home.clone(), - sqlite_home, - cwd: codex_home, - model_provider_id: default_model_provider_id, - generate_memories: false, - }; - state_db::try_init(&config) - .await - .unwrap_or_else(|err| panic!("test state db should initialize: {err}")) -} - impl ThreadManager { #[allow(clippy::too_many_arguments)] pub fn new( @@ -315,10 +290,9 @@ impl ThreadManager { session_source: SessionSource, environment_manager: Arc, analytics_events_client: Option, - state_db: StateDbHandle, thread_store: Arc, - agent_graph_store: Arc, installation_id: String, + state_db: Option, ) -> Self { let codex_home = config.codex_home.clone(); let restriction_product = session_source.restriction_product(); @@ -345,12 +319,11 @@ impl ThreadManager { mcp_manager, skills_watcher, thread_store, - state_db, - agent_graph_store, auth_manager, session_source, installation_id, analytics_events_client, + state_db, ops_log: should_use_test_thread_manager_behavior() .then(|| Arc::new(std::sync::Mutex::new(Vec::new()))), }), @@ -371,28 +344,13 @@ impl ThreadManager { )); std::fs::create_dir_all(&codex_home) .unwrap_or_else(|err| panic!("temp codex home dir create failed: {err}")); - let state_db = state_db_from_roots_for_tests( - codex_home.clone(), - codex_home.clone(), - OPENAI_PROVIDER_ID.to_string(), - ) - .await; - let skills_codex_home = match AbsolutePathBuf::from_absolute_path_checked(&codex_home) { - Ok(codex_home) => codex_home, - Err(err) => panic!("test codex_home should be absolute: {err}"), - }; - let installation_id = resolve_installation_id(&skills_codex_home) - .await - .unwrap_or_else(|err| panic!("resolve test installation id failed: {err}")); - let mut manager = Self::with_models_provider_and_home_and_state_db_for_tests( + let mut manager = Self::with_models_provider_and_home_for_tests( auth, provider, codex_home.clone(), Arc::new(EnvironmentManager::default_for_tests()), - state_db, - skills_codex_home, - installation_id, - ); + ) + .await; manager._test_codex_home_guard = Some(TempCodexHomeGuard { path: codex_home }); manager } @@ -405,39 +363,30 @@ impl ThreadManager { codex_home: PathBuf, environment_manager: Arc, ) -> Self { - let state_db = state_db_from_roots_for_tests( - codex_home.clone(), - codex_home.clone(), - OPENAI_PROVIDER_ID.to_string(), - ) - .await; - let skills_codex_home = match AbsolutePathBuf::from_absolute_path_checked(&codex_home) { - Ok(codex_home) => codex_home, - Err(err) => panic!("test codex_home should be absolute: {err}"), - }; - let installation_id = resolve_installation_id(&skills_codex_home) - .await - .unwrap_or_else(|err| panic!("resolve test installation id failed: {err}")); - Self::with_models_provider_and_home_and_state_db_for_tests( + Self::with_models_provider_home_and_state_for_tests( auth, provider, codex_home, environment_manager, - state_db, - skills_codex_home, - installation_id, + /*state_db*/ None, ) + .await } - fn with_models_provider_and_home_and_state_db_for_tests( + pub(crate) async fn with_models_provider_home_and_state_for_tests( auth: CodexAuth, provider: ModelProviderInfo, codex_home: PathBuf, environment_manager: Arc, - state_db: StateDbHandle, - skills_codex_home: AbsolutePathBuf, - installation_id: String, + state_db: Option, ) -> Self { + let skills_codex_home = match AbsolutePathBuf::from_absolute_path_checked(&codex_home) { + Ok(codex_home) => codex_home, + Err(err) => panic!("test codex_home should be absolute: {err}"), + }; + let installation_id = resolve_installation_id(&skills_codex_home) + .await + .unwrap_or_else(|err| panic!("resolve test installation id failed: {err}")); set_thread_manager_test_mode_for_tests(/*enabled*/ true); let auth_manager = AuthManager::from_auth_for_testing(auth); let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY); @@ -458,11 +407,11 @@ impl ThreadManager { let thread_store: Arc = Arc::new(LocalThreadStore::new( LocalThreadStoreConfig { codex_home: codex_home.clone(), + sqlite_home: codex_home.clone(), default_model_provider_id: OPENAI_PROVIDER_ID.to_string(), }, state_db.clone(), )); - let agent_graph_store = agent_graph_store_from_state_db(state_db.clone()); Self { state: Arc::new(ThreadManagerState { threads: Arc::new(RwLock::new(HashMap::new())), @@ -475,12 +424,11 @@ impl ThreadManager { mcp_manager, skills_watcher, thread_store, - state_db, - agent_graph_store, auth_manager, session_source: SessionSource::Exec, installation_id, analytics_events_client: None, + state_db, ops_log: should_use_test_thread_manager_behavior() .then(|| Arc::new(std::sync::Mutex::new(Vec::new()))), }), @@ -566,17 +514,22 @@ impl ThreadManager { subtree_thread_ids.push(thread_id); seen_thread_ids.insert(thread_id); - for descendant_id in self - .state - .agent_graph_store - .list_thread_spawn_descendants(thread_id, /*status_filter*/ None) - .await - .map_err(|err| { - CodexErr::Fatal(format!("failed to load thread-spawn descendants: {err}")) - })? - { - if seen_thread_ids.insert(descendant_id) { - subtree_thread_ids.push(descendant_id); + if let Some(state_db_ctx) = thread.state_db() { + for status in [ + DirectionalThreadSpawnEdgeStatus::Open, + DirectionalThreadSpawnEdgeStatus::Closed, + ] { + for descendant_id in state_db_ctx + .list_thread_spawn_descendants_with_status(thread_id, status) + .await + .map_err(|err| { + CodexErr::Fatal(format!("failed to load thread-spawn descendants: {err}")) + })? + { + if seen_thread_ids.insert(descendant_id) { + subtree_thread_ids.push(descendant_id); + } + } } } @@ -918,14 +871,10 @@ impl ThreadManager { } impl ThreadManagerState { - pub(crate) fn state_db(&self) -> StateDbHandle { + pub(crate) fn state_db(&self) -> Option { self.state_db.clone() } - pub(crate) fn agent_graph_store(&self) -> Arc { - self.agent_graph_store.clone() - } - pub(crate) async fn list_thread_ids(&self) -> Vec { self.threads .read() @@ -1242,7 +1191,6 @@ impl ThreadManagerState { parent_trace, environment_selections, analytics_events_client: self.analytics_events_client.clone(), - state_db: Some(self.state_db.clone()), thread_store: Arc::clone(&self.thread_store), }) .await?; diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index 0f6afa05a648..85bfa97076ae 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -1,5 +1,6 @@ use super::*; use crate::config::test_config; +use crate::init_state_db; use crate::installation_id::INSTALLATION_ID_FILENAME; use crate::rollout::RolloutRecorder; use crate::session::session::SessionSettingsUpdate; @@ -50,19 +51,12 @@ fn assistant_msg(text: &str) -> ResponseItem { } } -async fn state_backed_stores( - config: &Config, -) -> ( - StateDbHandle, - Arc, - Arc, -) { - let state_db = init_state_db_from_config(config) +async fn state_backed_stores(config: &Config) -> (StateDbHandle, Arc) { + let state_db = init_state_db(config) .await .expect("thread manager test requires state db"); - let thread_store = thread_store_from_config(config, state_db.clone()); - let agent_graph_store = agent_graph_store_from_state_db(state_db.clone()); - (state_db, thread_store, agent_graph_store) + let thread_store = thread_store_from_config(config, Some(state_db.clone())); + (state_db, thread_store) } fn contextual_user_interrupted_marker() -> ResponseItem { @@ -408,17 +402,16 @@ async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() { let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()); - let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await; + let (state_db, thread_store) = state_backed_stores(&config).await; let manager = ThreadManager::new( &config, auth_manager.clone(), SessionSource::Exec, Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, - state_db, thread_store, - agent_graph_store, TEST_INSTALLATION_ID.to_string(), + Some(state_db), ); let selected_cwd = AbsolutePathBuf::try_from(config.cwd.as_path().join("selected")).expect("absolute path"); @@ -524,17 +517,16 @@ async fn explicit_installation_id_skips_codex_home_file() { let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()); let installation_id = uuid::Uuid::new_v4().to_string(); - let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await; + let (state_db, thread_store) = state_backed_stores(&config).await; let manager = ThreadManager::new( &config, auth_manager, SessionSource::Exec, Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, - state_db, thread_store, - agent_graph_store, installation_id.clone(), + Some(state_db), ); let thread = manager @@ -563,17 +555,16 @@ async fn resume_active_thread_from_rollout_returns_running_thread() { let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()); - let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await; + let (state_db, thread_store) = state_backed_stores(&config).await; let manager = ThreadManager::new( &config, auth_manager.clone(), SessionSource::Exec, Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, - state_db, thread_store, - agent_graph_store, TEST_INSTALLATION_ID.to_string(), + Some(state_db), ); let source = manager @@ -620,17 +611,16 @@ async fn resume_stopped_thread_from_rollout_spawns_new_thread() { let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()); - let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await; + let (state_db, thread_store) = state_backed_stores(&config).await; let manager = ThreadManager::new( &config, auth_manager.clone(), SessionSource::Exec, Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, - state_db, thread_store, - agent_graph_store, TEST_INSTALLATION_ID.to_string(), + Some(state_db), ); let source = manager @@ -682,17 +672,16 @@ async fn resume_stopped_thread_from_rollout_preserves_thread_source() { let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()); - let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await; + let (state_db, thread_store) = state_backed_stores(&config).await; let manager = ThreadManager::new( &config, auth_manager.clone(), SessionSource::Exec, Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, - state_db, thread_store, - agent_graph_store, TEST_INSTALLATION_ID.to_string(), + Some(state_db), ); let source = manager @@ -768,17 +757,16 @@ async fn new_uses_active_provider_for_model_refresh() { let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()); - let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await; + let (state_db, thread_store) = state_backed_stores(&config).await; let manager = ThreadManager::new( &config, auth_manager, SessionSource::Exec, Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, - state_db, thread_store, - agent_graph_store, TEST_INSTALLATION_ID.to_string(), + Some(state_db), ); let _ = manager.list_models(RefreshStrategy::Online).await; @@ -983,17 +971,16 @@ async fn interrupted_fork_snapshot_does_not_synthesize_turn_id_for_legacy_histor let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()); - let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await; + let (state_db, thread_store) = state_backed_stores(&config).await; let manager = ThreadManager::new( &config, auth_manager.clone(), SessionSource::Exec, Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, - state_db, thread_store, - agent_graph_store, TEST_INSTALLATION_ID.to_string(), + Some(state_db), ); let source = manager @@ -1090,17 +1077,16 @@ async fn interrupted_fork_snapshot_preserves_explicit_turn_id() { let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()); - let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await; + let (state_db, thread_store) = state_backed_stores(&config).await; let manager = ThreadManager::new( &config, auth_manager.clone(), SessionSource::Exec, Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, - state_db, thread_store, - agent_graph_store, TEST_INSTALLATION_ID.to_string(), + Some(state_db), ); let source = manager @@ -1186,17 +1172,16 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_ let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()); - let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await; + let (state_db, thread_store) = state_backed_stores(&config).await; let manager = ThreadManager::new( &config, auth_manager.clone(), SessionSource::Exec, Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, - state_db, thread_store, - agent_graph_store, TEST_INSTALLATION_ID.to_string(), + Some(state_db), ); let source = manager @@ -1328,17 +1313,16 @@ async fn resumed_thread_keeps_paused_goal_paused() -> anyhow::Result<()> { let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()); - let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await; + let (state_db, thread_store) = state_backed_stores(&config).await; let manager = ThreadManager::new( &config, auth_manager.clone(), SessionSource::Exec, Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, - state_db, thread_store, - agent_graph_store, TEST_INSTALLATION_ID.to_string(), + Some(state_db), ); let source = manager diff --git a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs index b678f3ffe83c..dcde42efc4a4 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs @@ -6,7 +6,6 @@ use crate::function_tool::FunctionCallError; use crate::init_state_db; use crate::session::tests::make_session_and_context; use crate::session_prefix::format_subagent_notification_message; -use crate::thread_manager::agent_graph_store_from_state_db; use crate::thread_manager::thread_store_from_config; use crate::tools::context::ToolOutput; use crate::tools::handlers::multi_agents_v2::CloseAgentHandler as CloseAgentHandlerV2; @@ -3150,8 +3149,28 @@ async fn close_agent_submits_shutdown_and_returns_previous_status() { assert_eq!(status_after, AgentStatus::NotFound); } -#[tokio::test] -async fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtrees_closed() { +#[test] +fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtrees_closed() { + let handle = std::thread::Builder::new() + .name("multi-agent-cascade-test".to_string()) + .stack_size(8 * 1024 * 1024) + .spawn(|| { + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + .expect("tokio runtime should build"); + runtime.block_on( + tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtrees_closed_impl(), + ); + }) + .expect("multi-agent cascade test thread should spawn"); + if let Err(payload) = handle.join() { + std::panic::resume_unwind(payload); + } +} + +async fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtrees_closed_impl() { let (_session, turn) = make_session_and_context().await; let mut config = turn.config.as_ref().clone(); config.agent_max_depth = 3; @@ -3168,10 +3187,9 @@ async fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtr SessionSource::Exec, Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, - state_db.clone(), - thread_store_from_config(&config, state_db.clone()), - agent_graph_store_from_state_db(state_db.clone()), + thread_store_from_config(&config, Some(state_db.clone())), "11111111-1111-4111-8111-111111111111".to_string(), + Some(state_db), ); let parent = manager diff --git a/codex-rs/core/tests/common/test_codex.rs b/codex-rs/core/tests/common/test_codex.rs index 1728f5bc5c13..755dcdfaa8a1 100644 --- a/codex-rs/core/tests/common/test_codex.rs +++ b/codex-rs/core/tests/common/test_codex.rs @@ -15,9 +15,8 @@ use anyhow::anyhow; use codex_config::CloudRequirementsLoader; use codex_core::CodexThread; use codex_core::ThreadManager; -use codex_core::agent_graph_store_from_state_db; use codex_core::config::Config; -use codex_core::init_state_db_from_config; +use codex_core::init_state_db; use codex_core::resolve_installation_id; use codex_core::shell::Shell; use codex_core::shell::get_shell_by_model_provided_path; @@ -426,12 +425,14 @@ impl TestCodexBuilder { environment_manager: Arc, ) -> anyhow::Result { let auth = self.auth.clone(); - let thread_manager = if config.model_catalog.is_some() { - let state_db = init_state_db_from_config(&config) + let needs_state_db = config.model_catalog.is_some() + || config.features.enabled(Feature::Goals) + || config.features.enabled(Feature::Sqlite); + let thread_manager = if needs_state_db { + let state_db = init_state_db(&config) .await .expect("test codex requires state db"); - let thread_store = thread_store_from_config(&config, state_db.clone()); - let agent_graph_store = agent_graph_store_from_state_db(state_db.clone()); + let thread_store = thread_store_from_config(&config, Some(state_db.clone())); let installation_id = resolve_installation_id(&config.codex_home).await?; ThreadManager::new( &config, @@ -439,10 +440,9 @@ impl TestCodexBuilder { SessionSource::Exec, Arc::clone(&environment_manager), /*analytics_events_client*/ None, - state_db, thread_store, - agent_graph_store, installation_id, + Some(state_db), ) } else { codex_core::test_support::thread_manager_with_models_provider_and_home( diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index 143681e2e1bc..558f1411a26a 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -5,8 +5,7 @@ use codex_core::NewThread; use codex_core::Prompt; use codex_core::ResponseEvent; use codex_core::ThreadManager; -use codex_core::agent_graph_store_from_state_db; -use codex_core::init_state_db_from_config; +use codex_core::init_state_db; use codex_core::resolve_installation_id; use codex_core::thread_store_from_config; use codex_features::Feature; @@ -1116,11 +1115,10 @@ async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() { Ok(None) => panic!("No CodexAuth found in codex_home"), Err(e) => panic!("Failed to load CodexAuth: {e}"), }; - let state_db = init_state_db_from_config(&config) + let state_db = init_state_db(&config) .await .expect("client test requires state db"); - let thread_store = thread_store_from_config(&config, state_db.clone()); - let agent_graph_store = agent_graph_store_from_state_db(state_db.clone()); + let thread_store = thread_store_from_config(&config, Some(state_db.clone())); let installation_id = resolve_installation_id(&config.codex_home) .await .expect("resolve installation id"); @@ -1130,10 +1128,9 @@ async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() { SessionSource::Exec, Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, - state_db, thread_store, - agent_graph_store, installation_id, + Some(state_db), ); let NewThread { thread: codex, .. } = thread_manager .start_thread(config.clone()) diff --git a/codex-rs/core/tests/suite/personality_migration.rs b/codex-rs/core/tests/suite/personality_migration.rs index 25415cd7ee95..574e453e6dd1 100644 --- a/codex-rs/core/tests/suite/personality_migration.rs +++ b/codex-rs/core/tests/suite/personality_migration.rs @@ -46,7 +46,7 @@ async fn run_migration( config_toml: &ConfigToml, ) -> io::Result { let state_db = state_db_for_test(codex_home).await?; - maybe_migrate_personality(codex_home, config_toml, state_db).await + maybe_migrate_personality(codex_home, config_toml, Some(state_db)).await } async fn write_session_with_user_event(codex_home: &Path) -> io::Result<()> { diff --git a/codex-rs/mcp-server/src/lib.rs b/codex-rs/mcp-server/src/lib.rs index 2fd94e586a64..761de3062c44 100644 --- a/codex-rs/mcp-server/src/lib.rs +++ b/codex-rs/mcp-server/src/lib.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use codex_arg0::Arg0DispatchPaths; use codex_core::config::Config; +use codex_core::init_state_db; use codex_core::resolve_installation_id; use codex_exec_server::EnvironmentManager; use codex_exec_server::EnvironmentManagerArgs; @@ -141,19 +142,17 @@ pub async fn run_main( // Task: process incoming messages. let processor_handle = tokio::spawn({ let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx); - let processor = MessageProcessor::new( + let state_db = init_state_db(&config).await; + let mut processor = MessageProcessor::new( outgoing_message_sender, arg0_paths, Arc::new(config), environment_manager, installation_id, + state_db, ) .await; async move { - let Some(mut processor) = processor else { - error!("failed to initialize MCP processor"); - return; - }; while let Some(msg) = incoming_rx.recv().await { match msg { JsonRpcMessage::Request(r) => processor.process_request(r).await, diff --git a/codex-rs/mcp-server/src/message_processor.rs b/codex-rs/mcp-server/src/message_processor.rs index 73cdb6193bb7..5e96daa6814b 100644 --- a/codex-rs/mcp-server/src/message_processor.rs +++ b/codex-rs/mcp-server/src/message_processor.rs @@ -2,10 +2,9 @@ use std::collections::HashMap; use std::sync::Arc; use codex_arg0::Arg0DispatchPaths; +use codex_core::StateDbHandle; use codex_core::ThreadManager; -use codex_core::agent_graph_store_from_state_db; use codex_core::config::Config; -use codex_core::init_state_db_from_config; use codex_core::thread_store_from_config; use codex_exec_server::EnvironmentManager; use codex_login::AuthManager; @@ -56,34 +55,31 @@ impl MessageProcessor { config: Arc, environment_manager: Arc, installation_id: String, - ) -> Option { + state_db: Option, + ) -> Self { let outgoing = Arc::new(outgoing); let auth_manager = AuthManager::shared_from_config( config.as_ref(), /*enable_codex_api_key_env*/ false, ) .await; - let state_db = init_state_db_from_config(config.as_ref()).await?; - let thread_store = thread_store_from_config(config.as_ref(), state_db.clone()); - let agent_graph_store = agent_graph_store_from_state_db(state_db.clone()); let thread_manager = Arc::new(ThreadManager::new( config.as_ref(), auth_manager, SessionSource::Mcp, environment_manager, /*analytics_events_client*/ None, - state_db, - thread_store, - agent_graph_store, + thread_store_from_config(config.as_ref(), state_db.clone()), installation_id, + state_db.clone(), )); - Some(Self { + Self { outgoing, initialized: false, arg0_paths, thread_manager, running_requests_id_to_codex_uuid: Arc::new(Mutex::new(HashMap::new())), - }) + } } pub(crate) async fn process_request(&mut self, request: JsonRpcRequest) { diff --git a/codex-rs/thread-manager-sample/src/main.rs b/codex-rs/thread-manager-sample/src/main.rs index 4ad937f9af9f..aff3e2db1770 100644 --- a/codex-rs/thread-manager-sample/src/main.rs +++ b/codex-rs/thread-manager-sample/src/main.rs @@ -52,11 +52,10 @@ use codex_core_api::TuiNotificationSettings; use codex_core_api::UriBasedFileOpener; use codex_core_api::UserInput; use codex_core_api::WebSearchMode; -use codex_core_api::agent_graph_store_from_state_db; use codex_core_api::arg0_dispatch_or_else; use codex_core_api::built_in_model_providers; use codex_core_api::find_codex_home; -use codex_core_api::init_state_db_from_config; +use codex_core_api::init_state_db; use codex_core_api::item_event_to_server_notification; use codex_core_api::resolve_installation_id; use codex_core_api::set_default_originator; @@ -106,6 +105,7 @@ async fn run_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { }; let config = new_config(args.model, arg0_paths)?; + let state_db = init_state_db(&config).await; let auth_manager = AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await; @@ -113,11 +113,7 @@ async fn run_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { config.codex_self_exe.clone(), config.codex_linux_sandbox_exe.clone(), )?; - let Some(state_db) = init_state_db_from_config(&config).await else { - bail!("thread manager sample requires state db"); - }; let thread_store = thread_store_from_config(&config, state_db.clone()); - let agent_graph_store = agent_graph_store_from_state_db(state_db.clone()); let environment_manager = Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await); let installation_id = resolve_installation_id(&config.codex_home).await?; @@ -127,10 +123,9 @@ async fn run_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { SessionSource::Exec, environment_manager, /*analytics_events_client*/ None, - state_db, Arc::clone(&thread_store), - agent_graph_store, installation_id, + state_db, ); let NewThread { diff --git a/codex-rs/thread-store/src/local/archive_thread.rs b/codex-rs/thread-store/src/local/archive_thread.rs index 268a6ffc47ce..8fb214e98c98 100644 --- a/codex-rs/thread-store/src/local/archive_thread.rs +++ b/codex-rs/thread-store/src/local/archive_thread.rs @@ -13,11 +13,11 @@ pub(super) async fn archive_thread( params: ArchiveThreadParams, ) -> ThreadStoreResult<()> { let thread_id = params.thread_id; - let state_db = store.state_db(); + let state_db_ctx = store.state_db().await; let rollout_path = find_thread_path_by_id_str( store.config.codex_home.as_path(), &thread_id.to_string(), - Some(state_db.as_ref()), + state_db_ctx.as_deref(), ) .await .map_err(|err| ThreadStoreError::InvalidRequest { @@ -52,10 +52,11 @@ pub(super) async fn archive_thread( } })?; - let _ = store - .state_db() - .mark_archived(thread_id, archived_path.as_path(), Utc::now()) - .await; + if let Some(ctx) = state_db_ctx { + let _ = ctx + .mark_archived(thread_id, archived_path.as_path(), Utc::now()) + .await; + } Ok(()) } @@ -74,15 +75,13 @@ mod tests { use crate::ThreadSortKey; use crate::ThreadStore; use crate::local::LocalThreadStore; - use crate::local::test_support::init_test_state_db; use crate::local::test_support::test_config; - use crate::local::test_support::test_store; use crate::local::test_support::write_session_file; #[tokio::test] async fn archive_thread_moves_rollout_to_archived_collection() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let uuid = Uuid::from_u128(201); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let active_path = @@ -128,12 +127,21 @@ mod tests { async fn archive_thread_updates_sqlite_metadata_when_present() { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config.clone(), runtime.clone()); let uuid = Uuid::from_u128(202); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let active_path = write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file"); + let runtime = codex_state::StateRuntime::init( + home.path().to_path_buf(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); + runtime + .mark_backfill_complete(/*last_watermark*/ None) + .await + .expect("backfill should be complete"); let mut builder = codex_state::ThreadMetadataBuilder::new( thread_id, active_path.clone(), diff --git a/codex-rs/thread-store/src/local/create_thread.rs b/codex-rs/thread-store/src/local/create_thread.rs index 44938e6e8e0c..d181149406dd 100644 --- a/codex-rs/thread-store/src/local/create_thread.rs +++ b/codex-rs/thread-store/src/local/create_thread.rs @@ -22,12 +22,12 @@ pub(super) async fn create_thread( })?; let config = RolloutConfig { codex_home: store.config.codex_home.clone(), - sqlite_home: store.sqlite_home(), + sqlite_home: store.config.sqlite_home.clone(), cwd, model_provider_id: params.metadata.model_provider.clone(), generate_memories: matches!(params.metadata.memory_mode, ThreadMemoryMode::Enabled), }; - let state_db_ctx = Some(store.state_db()); + let state_db_ctx = store.state_db().await; let recorder = RolloutRecorder::new( &config, RolloutRecorderParams::new( diff --git a/codex-rs/thread-store/src/local/list_threads.rs b/codex-rs/thread-store/src/local/list_threads.rs index 5cf1f27aa1c2..e470ad2be93c 100644 --- a/codex-rs/thread-store/src/local/list_threads.rs +++ b/codex-rs/thread-store/src/local/list_threads.rs @@ -39,16 +39,16 @@ pub(super) async fn list_threads( SortDirection::Asc => codex_rollout::SortDirection::Asc, SortDirection::Desc => codex_rollout::SortDirection::Desc, }; + let state_db = store.state_db().await; let rollout_config = RolloutConfig { codex_home: store.config.codex_home.clone(), - sqlite_home: store.sqlite_home(), + sqlite_home: store.config.sqlite_home.clone(), cwd: store.config.codex_home.clone(), model_provider_id: store.config.default_model_provider_id.clone(), generate_memories: false, }; - let state_db_ctx = Some(store.state_db()); let page = list_rollout_threads( - state_db_ctx, + state_db, &rollout_config, store.config.default_model_provider_id.as_str(), ¶ms, @@ -80,13 +80,14 @@ pub(super) async fn list_threads( .map(|thread| thread.thread_id) .collect::>(); let mut names = HashMap::::with_capacity(thread_ids.len()); - let state_db_ctx = store.state_db(); - for &thread_id in &thread_ids { - let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await else { - continue; - }; - if let Some(title) = distinct_thread_metadata_title(&metadata) { - names.insert(thread_id, title); + if let Some(state_db_ctx) = store.state_db().await { + for &thread_id in &thread_ids { + let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await else { + continue; + }; + if let Some(title) = distinct_thread_metadata_title(&metadata) { + names.insert(thread_id, title); + } } } if names.len() < thread_ids.len() @@ -107,9 +108,9 @@ pub(super) async fn list_threads( } async fn list_rollout_threads( - state_db_ctx: Option, + state_db: Option, config: &RolloutConfig, - default_model_provider: &str, + default_model_provider_id: &str, params: &ListThreadsParams, cursor: Option<&codex_rollout::Cursor>, sort_key: codex_rollout::ThreadSortKey, @@ -117,7 +118,7 @@ async fn list_rollout_threads( ) -> ThreadStoreResult { let page = if params.use_state_db_only && params.archived { RolloutRecorder::list_archived_threads_from_state_db( - state_db_ctx.clone(), + state_db, config, params.page_size, cursor, @@ -126,13 +127,13 @@ async fn list_rollout_threads( params.allowed_sources.as_slice(), params.model_providers.as_deref(), params.cwd_filters.as_deref(), - default_model_provider, + default_model_provider_id, params.search_term.as_deref(), ) .await } else if params.use_state_db_only { RolloutRecorder::list_threads_from_state_db( - state_db_ctx.clone(), + state_db, config, params.page_size, cursor, @@ -141,13 +142,13 @@ async fn list_rollout_threads( params.allowed_sources.as_slice(), params.model_providers.as_deref(), params.cwd_filters.as_deref(), - default_model_provider, + default_model_provider_id, params.search_term.as_deref(), ) .await } else if params.archived { RolloutRecorder::list_archived_threads( - state_db_ctx.clone(), + state_db, config, params.page_size, cursor, @@ -156,13 +157,13 @@ async fn list_rollout_threads( params.allowed_sources.as_slice(), params.model_providers.as_deref(), params.cwd_filters.as_deref(), - default_model_provider, + default_model_provider_id, params.search_term.as_deref(), ) .await } else { RolloutRecorder::list_threads( - state_db_ctx, + state_db, config, params.page_size, cursor, @@ -171,7 +172,7 @@ async fn list_rollout_threads( params.allowed_sources.as_slice(), params.model_providers.as_deref(), params.cwd_filters.as_deref(), - default_model_provider, + default_model_provider_id, params.search_term.as_deref(), ) .await @@ -194,9 +195,7 @@ mod tests { use super::*; use crate::ThreadStore; use crate::local::LocalThreadStore; - use crate::local::test_support::init_test_state_db; use crate::local::test_support::test_config; - use crate::local::test_support::test_store; use crate::local::test_support::write_archived_session_file; use crate::local::test_support::write_session_file; use crate::local::test_support::write_session_file_with; @@ -204,7 +203,7 @@ mod tests { #[tokio::test] async fn list_threads_uses_default_provider_when_rollout_omits_provider() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); write_session_file_with( home.path(), home.path().join("sessions/2025/01/03"), @@ -239,13 +238,22 @@ mod tests { async fn list_threads_preserves_sqlite_title_search_results() { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config.clone(), runtime.clone()); let uuid = Uuid::from_u128(103); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let rollout_path = home.path().join("rollout-title-search.jsonl"); fs::write(&rollout_path, "").expect("placeholder rollout file"); + let runtime = codex_state::StateRuntime::init( + home.path().to_path_buf(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); + runtime + .mark_backfill_complete(/*last_watermark*/ None) + .await + .expect("backfill should be complete"); let created_at = Utc::now(); let mut builder = codex_state::ThreadMetadataBuilder::new( thread_id, @@ -259,10 +267,6 @@ mod tests { let mut metadata = builder.build(config.default_model_provider_id.as_str()); metadata.title = "needle title".to_string(); metadata.first_user_message = Some("plain preview".to_string()); - runtime - .mark_backfill_complete(/*last_watermark*/ None) - .await - .expect("backfill should be complete"); runtime .upsert_thread(&metadata) .await @@ -299,7 +303,7 @@ mod tests { #[tokio::test] async fn list_threads_selects_active_or_archived_collection() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let active_uuid = Uuid::from_u128(105); let archived_uuid = Uuid::from_u128(106); write_session_file(home.path(), "2025-01-03T12-00-00", active_uuid) @@ -368,7 +372,7 @@ mod tests { async fn list_threads_returns_local_rollout_summary() { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); - let store = LocalThreadStore::new(config.clone(), init_test_state_db(&config).await); + let store = LocalThreadStore::new(config, /*state_db*/ None); let uuid = Uuid::from_u128(101); let path = write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file"); @@ -407,7 +411,7 @@ mod tests { #[tokio::test] async fn list_threads_rejects_invalid_cursor() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let err = store .list_threads(ListThreadsParams { diff --git a/codex-rs/thread-store/src/local/live_writer.rs b/codex-rs/thread-store/src/local/live_writer.rs index 37d975897370..643207b59dec 100644 --- a/codex-rs/thread-store/src/local/live_writer.rs +++ b/codex-rs/thread-store/src/local/live_writer.rs @@ -66,12 +66,12 @@ pub(super) async fn resume_thread( })?; let config = RolloutConfig { codex_home: store.config.codex_home.clone(), - sqlite_home: store.sqlite_home(), + sqlite_home: store.config.sqlite_home.clone(), cwd, model_provider_id: params.metadata.model_provider.clone(), generate_memories: matches!(params.metadata.memory_mode, ThreadMemoryMode::Enabled), }; - let state_db_ctx = Some(store.state_db()); + let state_db_ctx = store.state_db().await; let recorder = RolloutRecorder::new( &config, RolloutRecorderParams::resume( diff --git a/codex-rs/thread-store/src/local/mod.rs b/codex-rs/thread-store/src/local/mod.rs index 88724420731a..07aa5e925f11 100644 --- a/codex-rs/thread-store/src/local/mod.rs +++ b/codex-rs/thread-store/src/local/mod.rs @@ -41,7 +41,7 @@ use crate::UpdateThreadMetadataParams; pub struct LocalThreadStore { pub(super) config: LocalThreadStoreConfig, live_recorders: Arc>>, - state_db: StateDbHandle, + state_db: Option, } /// Process-scoped configuration for local thread storage. @@ -51,6 +51,7 @@ pub struct LocalThreadStore { #[derive(Clone, Debug, PartialEq, Eq)] pub struct LocalThreadStoreConfig { pub codex_home: PathBuf, + pub sqlite_home: PathBuf, /// Provider used only when older local metadata does not contain one. pub default_model_provider_id: String, } @@ -59,6 +60,7 @@ impl LocalThreadStoreConfig { pub fn from_config(config: &impl codex_rollout::RolloutConfigView) -> Self { Self { codex_home: config.codex_home().to_path_buf(), + sqlite_home: config.sqlite_home().to_path_buf(), default_model_provider_id: config.model_provider_id().to_string(), } } @@ -73,9 +75,8 @@ impl std::fmt::Debug for LocalThreadStore { } impl LocalThreadStore { - /// Create a local store from process-scoped local storage configuration and - /// the caller-provided shared state DB handle. - pub fn new(config: LocalThreadStoreConfig, state_db: StateDbHandle) -> Self { + /// Create a local store using an already initialized state DB handle. + pub fn new(config: LocalThreadStoreConfig, state_db: Option) -> Self { Self { config, live_recorders: Arc::new(Mutex::new(HashMap::new())), @@ -84,14 +85,10 @@ impl LocalThreadStore { } /// Return the state DB handle used by local rollout writers. - pub fn state_db(&self) -> StateDbHandle { + pub async fn state_db(&self) -> Option { self.state_db.clone() } - pub(super) fn sqlite_home(&self) -> PathBuf { - self.state_db.codex_home().to_path_buf() - } - /// Read a local rollout-backed thread by path. pub async fn read_thread_by_rollout_path( &self, @@ -285,16 +282,14 @@ mod tests { use super::*; use crate::ThreadEventPersistenceMode; use crate::ThreadPersistenceMetadata; - use crate::local::test_support::init_test_state_db; use crate::local::test_support::test_config; - use crate::local::test_support::test_store; use crate::local::test_support::write_archived_session_file; use crate::local::test_support::write_session_file; #[tokio::test] async fn live_writer_lifecycle_writes_and_closes() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let thread_id = ThreadId::default(); store @@ -343,7 +338,7 @@ mod tests { #[tokio::test] async fn create_thread_rejects_missing_cwd() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let thread_id = ThreadId::default(); let mut params = create_thread_params(thread_id); params.metadata.cwd = None; @@ -363,7 +358,7 @@ mod tests { #[tokio::test] async fn discard_thread_drops_unmaterialized_live_writer() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let thread_id = ThreadId::default(); store @@ -401,9 +396,8 @@ mod tests { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); let thread_id = ThreadId::default(); - let state_db = init_test_state_db(&config).await; - let first_store = LocalThreadStore::new(config.clone(), state_db.clone()); + let first_store = LocalThreadStore::new(config.clone(), /*state_db*/ None); first_store .create_thread(create_thread_params(thread_id)) .await @@ -432,7 +426,7 @@ mod tests { .await .expect("shutdown initial writer"); - let resumed_store = LocalThreadStore::new(config, state_db); + let resumed_store = LocalThreadStore::new(config, /*state_db*/ None); resumed_store .resume_thread(ResumeThreadParams { thread_id, @@ -463,7 +457,7 @@ mod tests { #[tokio::test] async fn create_thread_rejects_duplicate_live_writer() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let thread_id = ThreadId::default(); store @@ -483,7 +477,7 @@ mod tests { #[tokio::test] async fn resume_thread_rejects_duplicate_live_writer() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let thread_id = ThreadId::default(); store @@ -512,7 +506,7 @@ mod tests { #[tokio::test] async fn resume_thread_rejects_missing_cwd() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let uuid = uuid::Uuid::from_u128(407); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let rollout_path = @@ -541,7 +535,7 @@ mod tests { async fn load_history_uses_live_writer_rollout_path() { let home = TempDir::new().expect("temp dir"); let external_home = TempDir::new().expect("external temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let uuid = uuid::Uuid::from_u128(404); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let rollout_path = write_session_file(external_home.path(), "2025-01-04T10-00-00", uuid) @@ -590,7 +584,7 @@ mod tests { async fn read_thread_uses_live_writer_rollout_path_for_external_resume() { let home = TempDir::new().expect("temp dir"); let external_home = TempDir::new().expect("external temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let uuid = uuid::Uuid::from_u128(406); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let rollout_path = write_session_file(external_home.path(), "2025-01-04T11-00-00", uuid) @@ -629,7 +623,7 @@ mod tests { #[tokio::test] async fn load_history_uses_live_writer_rollout_path_for_archived_source() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let uuid = uuid::Uuid::from_u128(405); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let rollout_path = write_archived_session_file(home.path(), "2025-01-04T10-30-00", uuid) @@ -697,7 +691,7 @@ mod tests { #[tokio::test] async fn read_thread_by_rollout_path_includes_history() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let thread_id = ThreadId::default(); store diff --git a/codex-rs/thread-store/src/local/read_thread.rs b/codex-rs/thread-store/src/local/read_thread.rs index dd6e8494ab2d..9d685aace625 100644 --- a/codex-rs/thread-store/src/local/read_thread.rs +++ b/codex-rs/thread-store/src/local/read_thread.rs @@ -176,12 +176,12 @@ async fn resolve_rollout_path( return Ok(Some(path)); } - let state_db = store.state_db(); + let state_db_ctx = store.state_db().await; if include_archived { match find_thread_path_by_id_str( store.config.codex_home.as_path(), &thread_id.to_string(), - Some(state_db.as_ref()), + state_db_ctx.as_deref(), ) .await .map_err(|err| ThreadStoreError::InvalidRequest { @@ -191,7 +191,7 @@ async fn resolve_rollout_path( None => find_archived_thread_path_by_id_str( store.config.codex_home.as_path(), &thread_id.to_string(), - Some(state_db.as_ref()), + state_db_ctx.as_deref(), ) .await .map_err(|err| ThreadStoreError::InvalidRequest { @@ -202,7 +202,7 @@ async fn resolve_rollout_path( find_thread_path_by_id_str( store.config.codex_home.as_path(), &thread_id.to_string(), - Some(state_db.as_ref()), + state_db_ctx.as_deref(), ) .await .map_err(|err| ThreadStoreError::InvalidRequest { @@ -260,7 +260,8 @@ async fn read_sqlite_metadata( store: &LocalThreadStore, thread_id: codex_protocol::ThreadId, ) -> Option { - store.state_db().get_thread(thread_id).await.ok().flatten() + let runtime = store.state_db().await?; + runtime.get_thread(thread_id).await.ok().flatten() } async fn stored_thread_from_sqlite_metadata( @@ -414,9 +415,7 @@ mod tests { use super::*; use crate::ThreadStore; use crate::local::LocalThreadStore; - use crate::local::test_support::init_test_state_db; use crate::local::test_support::test_config; - use crate::local::test_support::test_store; use crate::local::test_support::write_archived_session_file; use crate::local::test_support::write_session_file; use crate::local::test_support::write_session_file_with_fork; @@ -424,7 +423,7 @@ mod tests { #[tokio::test] async fn read_thread_returns_active_rollout_summary() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let uuid = Uuid::from_u128(205); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let active_path = @@ -452,7 +451,7 @@ mod tests { #[tokio::test] async fn read_thread_returns_rollout_path_summary() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let uuid = Uuid::from_u128(211); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let active_path = @@ -483,12 +482,17 @@ mod tests { async fn read_thread_by_rollout_path_prefers_sqlite_git_info() { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config.clone(), runtime.clone()); let uuid = Uuid::from_u128(223); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let active_path = write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file"); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); let mut builder = ThreadMetadataBuilder::new( thread_id, active_path.clone(), @@ -526,7 +530,7 @@ mod tests { #[tokio::test] async fn read_thread_returns_archived_rollout_when_requested() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let uuid = Uuid::from_u128(207); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let archived_path = write_archived_session_file(home.path(), "2025-01-03T12-00-00", uuid) @@ -567,7 +571,7 @@ mod tests { #[tokio::test] async fn read_thread_prefers_active_rollout_over_archived() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let uuid = Uuid::from_u128(208); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let active_path = @@ -592,7 +596,7 @@ mod tests { #[tokio::test] async fn read_thread_returns_forked_from_id() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let uuid = Uuid::from_u128(209); let parent_uuid = Uuid::from_u128(210); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); @@ -625,12 +629,17 @@ mod tests { async fn read_thread_applies_sqlite_thread_name() { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config.clone(), runtime.clone()); let uuid = Uuid::from_u128(212); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let rollout_path = write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file"); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); let mut builder = ThreadMetadataBuilder::new(thread_id, rollout_path, Utc::now(), SessionSource::Cli); builder.model_provider = Some(config.default_model_provider_id.clone()); @@ -660,8 +669,13 @@ mod tests { async fn read_thread_preserves_rollout_cwd_when_sqlite_metadata_exists() { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config.clone(), runtime.clone()); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); let uuid = Uuid::from_u128(224); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let day_dir = home.path().join("sessions/2025/01/03"); @@ -730,7 +744,7 @@ mod tests { #[tokio::test] async fn read_thread_uses_legacy_thread_name_when_sqlite_title_is_missing() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let uuid = Uuid::from_u128(213); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file"); @@ -754,8 +768,6 @@ mod tests { async fn read_thread_uses_sqlite_metadata_for_rollout_without_user_preview() { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config.clone(), runtime.clone()); let uuid = Uuid::from_u128(217); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let day_dir = home.path().join("sessions/2025/01/03"); @@ -777,6 +789,13 @@ mod tests { }); writeln!(file, "{meta}").expect("write session meta"); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); let mut builder = ThreadMetadataBuilder::new( thread_id, rollout_path.clone(), @@ -819,13 +838,18 @@ mod tests { let home = TempDir::new().expect("temp dir"); let external = TempDir::new().expect("external temp dir"); let config = test_config(home.path()); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config.clone(), runtime.clone()); let uuid = Uuid::from_u128(220); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let rollout_path = write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file"); let stale_path = external.path().join("missing-rollout.jsonl"); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); let mut builder = ThreadMetadataBuilder::new( thread_id, stale_path.clone(), @@ -863,8 +887,6 @@ mod tests { let home = TempDir::new().expect("temp dir"); let external = TempDir::new().expect("external temp dir"); let config = test_config(home.path()); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config.clone(), runtime.clone()); let uuid = Uuid::from_u128(221); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let rollout_path = @@ -872,6 +894,13 @@ mod tests { let other_uuid = Uuid::from_u128(222); let stale_path = write_session_file(external.path(), "2025-01-04T12-00-00", other_uuid) .expect("other session file"); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); let mut builder = ThreadMetadataBuilder::new(thread_id, stale_path, Utc::now(), SessionSource::Cli); builder.model_provider = Some("wrong-sqlite-provider".to_string()); @@ -903,7 +932,7 @@ mod tests { #[tokio::test] async fn read_thread_uses_session_meta_for_rollout_without_user_preview_or_sqlite_metadata() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let uuid = Uuid::from_u128(218); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let day_dir = home.path().join("sessions/2025/01/03"); @@ -958,13 +987,18 @@ mod tests { let home = TempDir::new().expect("temp dir"); let external = TempDir::new().expect("external temp dir"); let config = test_config(home.path()); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config.clone(), runtime.clone()); let uuid = Uuid::from_u128(214); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let rollout_path = external .path() .join(format!("rollout-2025-01-03T12-00-00-{uuid}.jsonl")); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); let mut builder = ThreadMetadataBuilder::new( thread_id, rollout_path.clone(), @@ -1011,15 +1045,20 @@ mod tests { let home = TempDir::new().expect("temp dir"); let external = TempDir::new().expect("external temp dir"); let config = test_config(home.path()); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config.clone(), runtime.clone()); let uuid = Uuid::from_u128(216); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let rollout_path = external .path() .join(format!("rollout-2025-01-03T12-00-00-{uuid}.jsonl")); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); let mut builder = ThreadMetadataBuilder::new(thread_id, rollout_path, Utc::now(), SessionSource::Cli); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); builder.archived_at = Some(Utc::now()); let mut metadata = builder.build(config.default_model_provider_id.as_str()); metadata.first_user_message = Some("Archived SQLite preview".to_string()); @@ -1062,12 +1101,17 @@ mod tests { async fn read_thread_sqlite_fallback_loads_archived_history() { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config.clone(), runtime.clone()); let uuid = Uuid::from_u128(219); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let archived_path = write_archived_session_file(home.path(), "2025-01-03T12-00-00", uuid) .expect("archived session file"); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); let mut builder = ThreadMetadataBuilder::new( thread_id, archived_path.clone(), @@ -1103,7 +1147,7 @@ mod tests { #[tokio::test] async fn read_thread_fails_without_rollout() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let uuid = Uuid::from_u128(206); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); diff --git a/codex-rs/thread-store/src/local/test_support.rs b/codex-rs/thread-store/src/local/test_support.rs index 597014efe991..98321880ffe9 100644 --- a/codex-rs/thread-store/src/local/test_support.rs +++ b/codex-rs/thread-store/src/local/test_support.rs @@ -4,34 +4,18 @@ use std::path::Path; use std::path::PathBuf; use codex_rollout::ARCHIVED_SESSIONS_SUBDIR; -use codex_rollout::StateDbHandle; use uuid::Uuid; -use super::LocalThreadStore; use super::LocalThreadStoreConfig; pub(super) fn test_config(codex_home: &Path) -> LocalThreadStoreConfig { LocalThreadStoreConfig { codex_home: codex_home.to_path_buf(), + sqlite_home: codex_home.to_path_buf(), default_model_provider_id: "test-provider".to_string(), } } -pub(super) async fn init_test_state_db(config: &LocalThreadStoreConfig) -> StateDbHandle { - codex_state::StateRuntime::init( - config.codex_home.clone(), - config.default_model_provider_id.clone(), - ) - .await - .expect("state db should initialize") -} - -pub(super) async fn test_store(codex_home: &Path) -> LocalThreadStore { - let config = test_config(codex_home); - let state_db = init_test_state_db(&config).await; - LocalThreadStore::new(config, state_db) -} - pub(super) fn write_session_file(root: &Path, ts: &str, uuid: Uuid) -> std::io::Result { write_session_file_with( root, diff --git a/codex-rs/thread-store/src/local/unarchive_thread.rs b/codex-rs/thread-store/src/local/unarchive_thread.rs index 7ac3f01a0fd7..ad41db69acb1 100644 --- a/codex-rs/thread-store/src/local/unarchive_thread.rs +++ b/codex-rs/thread-store/src/local/unarchive_thread.rs @@ -17,11 +17,11 @@ pub(super) async fn unarchive_thread( params: ArchiveThreadParams, ) -> ThreadStoreResult { let thread_id = params.thread_id; - let state_db = store.state_db(); + let state_db_ctx = store.state_db().await; let archived_path = find_archived_thread_path_by_id_str( store.config.codex_home.as_path(), &thread_id.to_string(), - Some(state_db.as_ref()), + state_db_ctx.as_deref(), ) .await .map_err(|err| ThreadStoreError::InvalidRequest { @@ -73,10 +73,11 @@ pub(super) async fn unarchive_thread( message: format!("failed to update unarchived thread timestamp: {err}"), })?; - let _ = store - .state_db() - .mark_unarchived(thread_id, restored_path.as_path()) - .await; + if let Some(ctx) = state_db_ctx { + let _ = ctx + .mark_unarchived(thread_id, restored_path.as_path()) + .await; + } let item = read_thread_item_from_rollout(restored_path.clone()) .await @@ -111,15 +112,13 @@ mod tests { use super::*; use crate::ThreadStore; use crate::local::LocalThreadStore; - use crate::local::test_support::init_test_state_db; use crate::local::test_support::test_config; - use crate::local::test_support::test_store; use crate::local::test_support::write_archived_session_file; #[tokio::test] async fn unarchive_thread_restores_rollout_and_returns_updated_thread() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let uuid = Uuid::from_u128(203); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let archived_path = write_archived_session_file(home.path(), "2025-01-03T13-00-00", uuid) @@ -150,12 +149,21 @@ mod tests { async fn unarchive_thread_updates_sqlite_metadata_when_present() { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config.clone(), runtime.clone()); let uuid = Uuid::from_u128(204); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let archived_path = write_archived_session_file(home.path(), "2025-01-03T13-00-00", uuid) .expect("archived session file"); + let runtime = codex_state::StateRuntime::init( + home.path().to_path_buf(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); + runtime + .mark_backfill_complete(/*last_watermark*/ None) + .await + .expect("backfill should be complete"); let mut builder = codex_state::ThreadMetadataBuilder::new( thread_id, archived_path.clone(), diff --git a/codex-rs/thread-store/src/local/update_thread_metadata.rs b/codex-rs/thread-store/src/local/update_thread_metadata.rs index 15a7292124b0..671bcdfbf980 100644 --- a/codex-rs/thread-store/src/local/update_thread_metadata.rs +++ b/codex-rs/thread-store/src/local/update_thread_metadata.rs @@ -55,8 +55,9 @@ pub(super) async fn update_thread_metadata( .await?; } + let state_db_ctx = store.state_db().await; codex_rollout::state_db::reconcile_rollout( - Some(store.state_db()).as_deref(), + state_db_ctx.as_deref(), resolved_rollout_path.path.as_path(), store.config.default_model_provider_id.as_str(), /*builder*/ None, @@ -72,7 +73,11 @@ pub(super) async fn update_thread_metadata( let resolved_git_info = match git_info { Some(git_info) => { - let state_db = store.state_db(); + let Some(state_db) = store.state_db().await else { + return Err(ThreadStoreError::Internal { + message: format!("sqlite state db unavailable for thread {thread_id}"), + }); + }; let metadata = state_db .get_thread(thread_id) @@ -152,7 +157,11 @@ async fn apply_thread_git_info( branch: &Option, origin_url: &Option, ) -> ThreadStoreResult<()> { - let state_db = store.state_db(); + let Some(state_db) = store.state_db().await else { + return Err(ThreadStoreError::Internal { + message: format!("sqlite state db unavailable for thread {thread_id}"), + }); + }; let updated = state_db .update_thread_git_info( thread_id, @@ -232,17 +241,18 @@ async fn apply_thread_name( thread_id: ThreadId, name: String, ) -> ThreadStoreResult<()> { - let updated = store - .state_db() - .update_thread_title(thread_id, &name) - .await - .map_err(|err| ThreadStoreError::Internal { - message: format!("failed to set thread name: {err}"), - })?; - if !updated { - return Err(ThreadStoreError::Internal { - message: format!("thread metadata unavailable before name update: {thread_id}"), - }); + if let Some(state_db) = store.state_db().await { + let updated = state_db + .update_thread_title(thread_id, &name) + .await + .map_err(|err| ThreadStoreError::Internal { + message: format!("failed to set thread name: {err}"), + })?; + if !updated { + return Err(ThreadStoreError::Internal { + message: format!("thread metadata unavailable before name update: {thread_id}"), + }); + } } append_thread_name(store.config.codex_home.as_path(), thread_id, &name) @@ -300,11 +310,11 @@ async fn resolve_rollout_path( return Ok(ResolvedRolloutPath { path, archived }); } - let state_db = store.state_db(); + let state_db_ctx = store.state_db().await; let active_path = find_thread_path_by_id_str( store.config.codex_home.as_path(), &thread_id.to_string(), - Some(state_db.as_ref()), + state_db_ctx.as_deref(), ) .await .map_err(|err| ThreadStoreError::InvalidRequest { @@ -324,7 +334,7 @@ async fn resolve_rollout_path( find_archived_thread_path_by_id_str( store.config.codex_home.as_path(), &thread_id.to_string(), - Some(state_db.as_ref()), + state_db_ctx.as_deref(), ) .await .map_err(|err| ThreadStoreError::InvalidRequest { @@ -359,16 +369,21 @@ mod tests { use crate::ThreadPersistenceMetadata; use crate::ThreadStore; use crate::local::LocalThreadStore; - use crate::local::test_support::init_test_state_db; use crate::local::test_support::test_config; - use crate::local::test_support::test_store; use crate::local::test_support::write_archived_session_file; use crate::local::test_support::write_session_file; #[tokio::test] async fn update_thread_metadata_sets_name_on_active_rollout_and_indexes_name() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let config = test_config(home.path()); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config, Some(runtime.clone())); let uuid = Uuid::from_u128(301); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); write_session_file(home.path(), "2025-01-03T14-00-00", uuid).expect("session file"); @@ -391,8 +406,7 @@ mod tests { .expect("find thread name"); assert_eq!(latest_name.as_deref(), Some("A sharper name")); - let metadata = store - .state_db() + let metadata = runtime .get_thread(thread_id) .await .expect("get metadata") @@ -404,12 +418,18 @@ mod tests { async fn update_thread_metadata_sets_memory_mode_on_active_rollout() { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config.clone(), runtime.clone()); let uuid = Uuid::from_u128(302); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let path = write_session_file(home.path(), "2025-01-03T14-30-00", uuid).expect("session file"); + let runtime = codex_state::StateRuntime::init( + home.path().to_path_buf(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); + let thread = store .update_thread_metadata(UpdateThreadMetadataParams { thread_id, @@ -442,8 +462,13 @@ mod tests { let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let path = write_session_file(home.path(), "2025-01-03T18-30-00", uuid).expect("session file"); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config.clone(), runtime.clone()); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); store .update_thread_metadata(UpdateThreadMetadataParams { @@ -502,7 +527,7 @@ mod tests { async fn update_thread_metadata_uses_live_rollout_path_for_external_resume() { let home = TempDir::new().expect("temp dir"); let external_home = TempDir::new().expect("external temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let uuid = Uuid::from_u128(307); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let path = write_session_file(external_home.path(), "2025-01-03T14-45-00", uuid) @@ -543,8 +568,13 @@ mod tests { async fn update_thread_metadata_sets_git_info() { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config, runtime); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config, Some(runtime)); let uuid = Uuid::from_u128(309); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); write_session_file(home.path(), "2025-01-03T17-00-00", uuid).expect("session file"); @@ -581,8 +611,13 @@ mod tests { async fn update_thread_metadata_partially_updates_git_info() { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config, runtime); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config, Some(runtime)); let uuid = Uuid::from_u128(310); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); write_session_file(home.path(), "2025-01-03T17-30-00", uuid).expect("session file"); @@ -634,8 +669,13 @@ mod tests { async fn update_thread_metadata_clears_git_info_fields() { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config.clone(), runtime.clone()); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); let uuid = Uuid::from_u128(311); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let path = @@ -799,7 +839,7 @@ mod tests { #[tokio::test] async fn update_thread_metadata_rejects_mismatched_session_meta_id() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let filename_uuid = Uuid::from_u128(303); let metadata_uuid = Uuid::from_u128(304); let thread_id = ThreadId::from_string(&filename_uuid.to_string()).expect("valid thread id"); @@ -831,7 +871,7 @@ mod tests { #[tokio::test] async fn update_thread_metadata_rejects_multi_field_patch_without_partial_write() { let home = TempDir::new().expect("temp dir"); - let store = test_store(home.path()).await; + let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None); let uuid = Uuid::from_u128(305); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let path = @@ -866,12 +906,21 @@ mod tests { async fn update_thread_metadata_keeps_archived_thread_archived_in_sqlite() { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config.clone(), runtime.clone()); let uuid = Uuid::from_u128(306); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let archived_path = write_archived_session_file(home.path(), "2025-01-03T16-00-00", uuid) .expect("archived session file"); + let runtime = codex_state::StateRuntime::init( + home.path().to_path_buf(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); + runtime + .mark_backfill_complete(/*last_watermark*/ None) + .await + .expect("backfill should be complete"); codex_rollout::state_db::reconcile_rollout( Some(runtime.as_ref()), archived_path.as_path(), @@ -920,12 +969,21 @@ mod tests { async fn update_thread_metadata_keeps_live_archived_thread_archived_in_sqlite() { let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); - let runtime = init_test_state_db(&config).await; - let store = LocalThreadStore::new(config.clone(), runtime.clone()); let uuid = Uuid::from_u128(308); let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); let archived_path = write_archived_session_file(home.path(), "2025-01-03T16-30-00", uuid) .expect("archived session file"); + let runtime = codex_state::StateRuntime::init( + home.path().to_path_buf(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); + runtime + .mark_backfill_complete(/*last_watermark*/ None) + .await + .expect("backfill should be complete"); codex_rollout::state_db::reconcile_rollout( Some(runtime.as_ref()), archived_path.as_path(),