Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions codex-rs/app-server-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ mod tests {
use codex_app_server_protocol::ToolRequestUserInputParams;
use codex_app_server_protocol::ToolRequestUserInputQuestion;
use codex_core::config::ConfigBuilder;
use codex_core::init_state_db_from_config;
use codex_core::init_state_db;
use futures::SinkExt;
use futures::StreamExt;
use pretty_assertions::assert_eq;
Expand Down Expand Up @@ -1017,7 +1017,7 @@ mod tests {
) -> TestClient {
let codex_home = TempDir::new().expect("temp dir");
let config = Arc::new(build_test_config_for_codex_home(codex_home.path()).await);
let state_db = init_state_db_from_config(config.as_ref())
let state_db = init_state_db(config.as_ref())
.await
.expect("state db should initialize for in-process test");
let client = InProcessAppServerClient::start(InProcessClientStartArgs {
Expand Down
20 changes: 7 additions & 13 deletions codex-rs/app-server/src/in_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ use codex_config::CloudRequirementsLoader;
use codex_config::LoaderOverrides;
use codex_config::ThreadConfigLoader;
use codex_core::config::Config;
use codex_core::init_state_db_from_config;
use codex_core::init_state_db;
use codex_core::resolve_installation_id;
use codex_exec_server::EnvironmentManager;
use codex_feedback::CodexFeedback;
use codex_login::AuthManager;
use codex_protocol::protocol::SessionSource;
use codex_rollout::state_db::StateDbHandle;
pub use codex_rollout::StateDbHandle;
pub use codex_state::log_db::LogDbLayer;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -129,7 +129,7 @@ pub struct InProcessStartArgs {
pub feedback: CodexFeedback,
/// SQLite tracing layer used to flush recently emitted logs before feedback upload.
pub log_db: Option<LogDbLayer>,
/// Optional state DB handle to use for the in-process runtime.
/// Process-wide SQLite state handle shared with embedded app-server consumers.
pub state_db: Option<StateDbHandle>,
/// Environment manager used by core execution and filesystem operations.
pub environment_manager: Arc<EnvironmentManager>,
Expand Down Expand Up @@ -370,7 +370,7 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
let channel_capacity = args.channel_capacity.max(1);
let state_db = match args.state_db.clone() {
Some(state_db) => Some(state_db),
None => init_state_db_from_config(args.config.as_ref()).await,
None => init_state_db(args.config.as_ref()).await,
};
let installation_id = resolve_installation_id(&args.config.codex_home).await?;
let (client_tx, mut client_rx) = mpsc::channel::<InProcessClientMessage>(channel_capacity);
Expand Down Expand Up @@ -421,12 +421,6 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
);
let (processor_tx, mut processor_rx) = mpsc::channel::<ProcessorCommand>(channel_capacity);
let mut processor_handle = tokio::spawn(async move {
let Some(state_db) = state_db else {
warn!(
"in-process app-server state db initialization failed; shutting down processor task"
);
return;
};
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing: Arc::clone(&processor_outgoing),
analytics_events_client,
Expand All @@ -436,7 +430,7 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
environment_manager: args.environment_manager,
feedback: args.feedback,
log_db: args.log_db,
state_db,
state_db: state_db.clone(),
config_warnings: args.config_warnings,
session_source: args.session_source,
auth_manager,
Expand Down Expand Up @@ -775,7 +769,7 @@ mod tests {
) -> InProcessClientHandle {
let codex_home = TempDir::new().expect("temp dir");
let config = Arc::new(build_test_config(codex_home.path()).await);
let state_db = init_state_db_from_config(config.as_ref())
let state_db = codex_rollout::state_db::try_init(config.as_ref())
.await
.expect("state db should initialize for in-process test");
let args = InProcessStartArgs {
Expand Down Expand Up @@ -833,7 +827,7 @@ mod tests {
}

#[tokio::test]
async fn in_process_allows_device_key_requests_to_reach_device_key_api() {
async fn in_process_allows_device_key_requests_to_reach_device_key_processor() {
let client = start_test_client(SessionSource::Cli).await;
const MALFORMED_KEY_ID_MESSAGE: &str = concat!(
"invalid device key payload: keyId must be dk_hse_, dk_tpm_, or dk_osn_ ",
Expand Down
37 changes: 23 additions & 14 deletions codex-rs/app-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ use codex_config::TextRange as CoreTextRange;
use codex_core::ExecPolicyError;
use codex_core::check_execpolicy_for_warnings;
use codex_core::config::find_codex_home;
use codex_core::init_state_db_from_config;
use codex_exec_server::EnvironmentManager;
use codex_exec_server::ExecServerRuntimePaths;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;
use codex_rollout::state_db as rollout_state_db;
use codex_state::log_db;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -489,9 +489,9 @@ pub async fn run_main_with_transport_options(
}
};

let state_db = init_state_db_from_config(&config)
.await
.ok_or_else(|| std::io::Error::other("failed to initialize sqlite state db"))?;
let state_db_result = rollout_state_db::try_init(&config).await;
let state_db_init_error = state_db_result.as_ref().err().map(ToString::to_string);
let state_db = state_db_result.ok();

if should_run_personality_migration {
let effective_toml = config.config_layer_stack.effective_config();
Expand Down Expand Up @@ -600,12 +600,10 @@ pub async fn run_main_with_transport_options(

let feedback_layer = feedback.logger_layer();
let feedback_metadata_layer = feedback.metadata_layer();
let log_db = log_db::start(state_db.clone());
let log_db_layer = Some(
log_db
.clone()
.with_filter(Targets::new().with_default(Level::TRACE)),
);
let log_db = state_db.clone().map(log_db::start);
let log_db_layer = log_db
.clone()
.map(|layer| layer.with_filter(Targets::new().with_default(Level::TRACE)));
let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());
let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());
let _ = tracing_subscriber::registry()
Expand All @@ -622,6 +620,9 @@ pub async fn run_main_with_transport_options(
None => error!("{}", warning.summary),
}
}
if let Some(err) = &state_db_init_error {
error!("failed to initialize sqlite state db: {err}");
}
let installation_id = resolve_installation_id(&config.codex_home).await?;
let transport_shutdown_token = CancellationToken::new();
let mut transport_accept_handles = Vec::<JoinHandle<()>>::new();
Expand Down Expand Up @@ -667,17 +668,25 @@ pub async fn run_main_with_transport_options(
let auth_manager =
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;

let remote_control_enabled = config.features.enabled(Feature::RemoteControl);
let remote_control_config_enabled = config.features.enabled(Feature::RemoteControl);
let remote_control_enabled = remote_control_config_enabled && state_db.is_some();
if remote_control_config_enabled && state_db.is_none() {
error!("remote control disabled because sqlite state db is unavailable");
}
if transport_accept_handles.is_empty() && !remote_control_enabled {
return Err(std::io::Error::new(
ErrorKind::InvalidInput,
"no transport configured; use --listen or enable remote control",
if remote_control_config_enabled && state_db.is_none() {
"no transport configured; remote control disabled because sqlite state db is unavailable"
} else {
"no transport configured; use --listen or enable remote control"
},
));
}

let (remote_control_accept_handle, remote_control_handle) = start_remote_control(
config.chatgpt_base_url.clone(),
Some(state_db.clone()),
state_db.clone(),
auth_manager.clone(),
transport_event_tx.clone(),
transport_shutdown_token.clone(),
Expand Down Expand Up @@ -761,7 +770,7 @@ pub async fn run_main_with_transport_options(
config_manager,
environment_manager,
feedback: feedback.clone(),
log_db: Some(log_db),
log_db,
state_db: state_db.clone(),
config_warnings,
session_source,
Expand Down
11 changes: 4 additions & 7 deletions codex-rs/app-server/src/mcp_refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,8 @@ mod tests {
use codex_config::ThreadConfigLoadErrorCode;
use codex_config::ThreadConfigLoader;
use codex_config::ThreadConfigSource;
use codex_core::agent_graph_store_from_state_db;
use codex_core::config::ConfigOverrides;
use codex_core::init_state_db_from_config;
use codex_core::init_state_db;
use codex_core::thread_store_from_config;
use codex_exec_server::EnvironmentManager;
use codex_login::AuthManager;
Expand Down Expand Up @@ -175,21 +174,19 @@ mod tests {
.await?;

let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
let state_db = init_state_db_from_config(&good_config)
let state_db = init_state_db(&good_config)
.await
.expect("refresh tests require state db");
let thread_store = thread_store_from_config(&good_config, state_db.clone());
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
let thread_store = thread_store_from_config(&good_config, Some(state_db.clone()));
let thread_manager = Arc::new(ThreadManager::new(
&good_config,
auth_manager,
SessionSource::Exec,
Arc::new(EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
"11111111-1111-4111-8111-111111111111".to_string(),
Some(state_db),
));
thread_manager.start_thread(good_config).await?;
thread_manager.start_thread(bad_config).await?;
Expand Down
11 changes: 4 additions & 7 deletions codex-rs/app-server/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ use codex_app_server_protocol::experimental_required_message;
use codex_arg0::Arg0DispatchPaths;
use codex_chatgpt::workspace_settings;
use codex_core::ThreadManager;
use codex_core::agent_graph_store_from_state_db;
use codex_core::config::Config;
use codex_core::thread_store_from_config;
use codex_exec_server::EnvironmentManager;
Expand Down Expand Up @@ -255,7 +254,7 @@ pub(crate) struct MessageProcessorArgs {
pub(crate) environment_manager: Arc<EnvironmentManager>,
pub(crate) feedback: CodexFeedback,
pub(crate) log_db: Option<LogDbLayer>,
pub(crate) state_db: StateDbHandle,
pub(crate) state_db: Option<StateDbHandle>,
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
pub(crate) session_source: SessionSource,
pub(crate) auth_manager: Arc<AuthManager>,
Expand Down Expand Up @@ -294,17 +293,15 @@ impl MessageProcessor {
// affect per-thread behavior, but they must not move newly started,
// resumed, or forked threads to a different persistence backend/root.
let thread_store = thread_store_from_config(config.as_ref(), state_db.clone());
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
let thread_manager = Arc::new(ThreadManager::new(
config.as_ref(),
auth_manager.clone(),
session_source,
environment_manager,
Some(analytics_events_client.clone()),
state_db.clone(),
Arc::clone(&thread_store),
agent_graph_store.clone(),
installation_id,
state_db.clone(),
));
thread_manager
.plugins_manager()
Expand Down Expand Up @@ -350,7 +347,7 @@ impl MessageProcessor {
Arc::clone(&config),
feedback,
log_db,
Some(state_db.clone()),
state_db.clone(),
);
let git_processor = GitRequestProcessor::new();
let initialize_processor = InitializeRequestProcessor::new(
Expand Down Expand Up @@ -400,7 +397,7 @@ impl MessageProcessor {
thread_watch_manager.clone(),
Arc::clone(&thread_list_state_permit),
thread_goal_processor.clone(),
Some(state_db.clone()),
state_db.clone(),
);
let turn_processor = TurnRequestProcessor::new(
auth_manager.clone(),
Expand Down
6 changes: 1 addition & 5 deletions codex-rs/app-server/src/message_processor_tracing_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use codex_config::CloudRequirementsLoader;
use codex_config::LoaderOverrides;
use codex_core::config::Config;
use codex_core::config::ConfigBuilder;
use codex_core::init_state_db_from_config;
use codex_exec_server::EnvironmentManager;
use codex_feedback::CodexFeedback;
use codex_login::AuthManager;
Expand Down Expand Up @@ -282,9 +281,6 @@ async fn build_test_processor(
outgoing_tx,
analytics_events_client.clone(),
));
let state_db = init_state_db_from_config(config.as_ref())
.await
.expect("tracing test processor requires state db");
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing,
analytics_events_client,
Expand All @@ -294,7 +290,7 @@ async fn build_test_processor(
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
feedback: CodexFeedback::new(),
log_db: None,
state_db,
state_db: None,
config_warnings: Vec::new(),
session_source: SessionSource::VSCode,
auth_manager,
Expand Down
22 changes: 16 additions & 6 deletions codex-rs/app-server/src/request_processors/device_key_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use codex_device_key::RemoteControlClientConnectionAudience;
use codex_device_key::RemoteControlClientConnectionSignPayload;
use codex_device_key::RemoteControlClientEnrollmentAudience;
use codex_device_key::RemoteControlClientEnrollmentSignPayload;
use codex_rollout::state_db::StateDbHandle;
use codex_state::DeviceKeyBindingRecord;
use codex_state::StateRuntime;

#[derive(Clone)]
pub(crate) struct DeviceKeyRequestProcessor {
Expand All @@ -43,7 +43,10 @@ pub(crate) struct DeviceKeyRequestProcessor {
}

impl DeviceKeyRequestProcessor {
pub(crate) fn new(outgoing: Arc<OutgoingMessageSender>, state_db: StateDbHandle) -> Self {
pub(crate) fn new(
outgoing: Arc<OutgoingMessageSender>,
state_db: Option<Arc<StateRuntime>>,
) -> Self {
Self {
outgoing,
store: DeviceKeyStore::new(Arc::new(StateDeviceKeyBindingStore::new(state_db))),
Expand Down Expand Up @@ -167,26 +170,33 @@ async fn sign_device_key(
}

struct StateDeviceKeyBindingStore {
state_db: StateDbHandle,
state_db: Option<Arc<StateRuntime>>,
}

impl StateDeviceKeyBindingStore {
fn new(state_db: StateDbHandle) -> Self {
fn new(state_db: Option<Arc<StateRuntime>>) -> Self {
Self { state_db }
}

async fn state_db(&self) -> Result<Arc<StateRuntime>, DeviceKeyError> {
self.state_db
.clone()
.ok_or_else(|| DeviceKeyError::Platform("sqlite state db unavailable".to_string()))
}
}

impl fmt::Debug for StateDeviceKeyBindingStore {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StateDeviceKeyBindingStore")
.field("has_state_db", &self.state_db.is_some())
.finish_non_exhaustive()
}
}

#[async_trait]
impl DeviceKeyBindingStore for StateDeviceKeyBindingStore {
async fn get_binding(&self, key_id: &str) -> Result<Option<DeviceKeyBinding>, DeviceKeyError> {
let state_db = self.state_db.clone();
let state_db = self.state_db().await?;
state_db
.get_device_key_binding(key_id)
.await
Expand All @@ -204,7 +214,7 @@ impl DeviceKeyBindingStore for StateDeviceKeyBindingStore {
key_id: &str,
binding: &DeviceKeyBinding,
) -> Result<(), DeviceKeyError> {
let state_db = self.state_db.clone();
let state_db = self.state_db().await?;
state_db
.upsert_device_key_binding(&DeviceKeyBindingRecord {
key_id: key_id.to_string(),
Expand Down
Loading
Loading