From c904d5effc935dd41994b35266c6ef63f89a1e16 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 5 May 2026 11:44:28 +0200 Subject: [PATCH 1/3] Migrate to microsecond precision for timestamps --- crates/core/src/kv.rs | 36 ------------ crates/core/src/lib.rs | 1 - crates/core/src/migrations.rs | 65 +++++++++++++++++++- crates/core/src/state.rs | 25 +++++++- crates/core/src/sync/interface.rs | 27 +++++---- crates/core/src/sync/mod.rs | 1 + crates/core/src/sync/storage_adapter.rs | 45 +++++++------- crates/core/src/sync/streaming_sync.rs | 65 +++++++++++--------- crates/core/src/sync/subscriptions.rs | 27 +++++---- crates/core/src/{ => sync}/sync_local.rs | 10 +++- crates/core/src/sync/sync_status.rs | 16 ++--- dart/test/goldens/simple_iteration.json | 2 +- dart/test/sync_stream_test.dart | 20 +++---- dart/test/sync_test.dart | 24 ++------ dart/test/utils/migration_fixtures.dart | 75 +++++++++++++++++++++++- dart/test/utils/native_test_utils.dart | 19 ++++++ 16 files changed, 305 insertions(+), 153 deletions(-) rename crates/core/src/{ => sync}/sync_local.rs (98%) diff --git a/crates/core/src/kv.rs b/crates/core/src/kv.rs index 884ee45a..c926c466 100644 --- a/crates/core/src/kv.rs +++ b/crates/core/src/kv.rs @@ -7,10 +7,8 @@ use powersync_sqlite_nostd as sqlite; use powersync_sqlite_nostd::{Connection, Context}; use sqlite::ResultCode; -use crate::create_sqlite_optional_text_fn; use crate::create_sqlite_text_fn; use crate::error::PowerSyncError; -use crate::sync::BucketPriority; fn powersync_client_id_impl( ctx: *mut sqlite::context, @@ -39,30 +37,6 @@ create_sqlite_text_fn!( "powersync_client_id" ); -fn powersync_last_synced_at_impl( - ctx: *mut sqlite::context, - _args: &[*mut sqlite::value], -) -> Result, ResultCode> { - let db = ctx.db_handle(); - - // language=SQLite - let statement = db.prepare_v2("select last_synced_at from ps_sync_state where priority = ?")?; - statement.bind_int(1, BucketPriority::SENTINEL.into())?; - - if statement.step()? 
== ResultCode::ROW { - let client_id = statement.column_text(0)?; - Ok(Some(client_id.to_string())) - } else { - Ok(None) - } -} - -create_sqlite_optional_text_fn!( - powersync_last_synced_at, - powersync_last_synced_at_impl, - "powersync_last_synced_at" -); - pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { db.create_function_v2( "powersync_client_id", @@ -74,16 +48,6 @@ pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { None, None, )?; - db.create_function_v2( - "powersync_last_synced_at", - 0, - sqlite::UTF8 | sqlite::DETERMINISTIC, - None, - Some(powersync_last_synced_at), - None, - None, - None, - )?; Ok(()) } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 2a5c3450..655f3356 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -24,7 +24,6 @@ mod pre_close_vtab; mod schema; mod state; mod sync; -mod sync_local; mod update_hooks; mod utils; mod uuid; diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index 95df2208..4dda06b5 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -4,16 +4,19 @@ use alloc::format; use alloc::string::{String, ToString}; use alloc::vec::Vec; -use powersync_sqlite_nostd as sqlite; +use powersync_sqlite_nostd::{self as sqlite, Destructor}; use powersync_sqlite_nostd::{Connection, Context}; +use serde::Serialize; +use serde::ser::SerializeSeq; use sqlite::ResultCode; use crate::error::{PSResult, PowerSyncError}; +use crate::ext::SafeManagedStmt; use crate::fix_data::apply_v035_fix; use crate::schema::inspection::ExistingView; use crate::sync::BucketPriority; -pub const LATEST_VERSION: i32 = 12; +pub const LATEST_VERSION: i32 = 13; pub fn powersync_migrate( ctx: *mut sqlite::context, @@ -424,5 +427,63 @@ json_object('sql', 'DELETE FROM ps_migration WHERE id >= 12') local_db.exec_safe(stmt).into_db_result(local_db)?; } + if current_version < 13 && target_version >= 13 { + let up = "\ +UPDATE ps_stream_subscriptions SET expires_at = expires_at * 1_000_000, last_synced_at = last_synced_at * 1_000_000; +ALTER TABLE ps_sync_state RENAME TO ps_sync_state_old; +CREATE TABLE ps_sync_state ( + priority INTEGER NOT NULL PRIMARY KEY, + last_synced_at INTEGER NOT NULL +) STRICT; +INSERT INTO ps_sync_state (priority, last_synced_at) + SELECT priority, unixepoch(last_synced_at) * 1_000_000 FROM ps_sync_state_old; +DROP TABLE ps_sync_state_old; +"; + local_db.exec_safe(up).into_db_result(local_db)?; + + const DOWN_STATEMENTS: &[&str] = &[ + "UPDATE ps_stream_subscriptions SET expires_at = expires_at / 1_000_000, last_synced_at = last_synced_at / 1_000_000", + "ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new", + "CREATE TABLE ps_sync_state ( + priority INTEGER NOT NULL PRIMARY KEY, + last_synced_at TEXT NOT NULL +) STRICT;", + "INSERT INTO ps_sync_state (priority, last_synced_at) SELECT priority, datetime(last_synced_at / 1_000_000, 'unixepoch') FROM ps_sync_state", + "DROP TABLE ps_sync_state_new", + "DELETE FROM ps_migration WHERE id >= 13", + ]; + let down = serialize_down_statements(DOWN_STATEMENTS)?; + let track_migration = + local_db.prepare_v2("INSERT INTO ps_migration(id, down_migrations) VALUES (?, ?)")?; + track_migration.bind_int(1, 13)?; + track_migration.bind_text(2, &down, Destructor::STATIC)?; + track_migration.exec()?; + } + Ok(()) } + +fn serialize_down_statements(statements: &[&'static str]) -> Result { + struct DownStatements<'a>(&'a [&'static str]); + + #[derive(Serialize)] + struct DownStatement { + sql: &'static str, + } + + 
impl<'a> Serialize for DownStatements<'a> {
+        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+        where
+            S: serde::Serializer,
+        {
+            let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
+            for element in self.0 {
+                seq.serialize_element(&DownStatement { sql: element })?;
+            }
+
+            seq.end()
+        }
+    }
+
+    serde_json::to_string(&DownStatements(statements)).map_err(PowerSyncError::internal)
+}
diff --git a/crates/core/src/state.rs b/crates/core/src/state.rs
index 9024133a..4a1d292e 100644
--- a/crates/core/src/state.rs
+++ b/crates/core/src/state.rs
@@ -12,8 +12,9 @@ use powersync_sqlite_nostd::{self as sqlite, Context};
 use sqlite::{Connection, ResultCode};
 
 use crate::{
+    error::PowerSyncError,
     schema::{InferredSchemaCache, Schema},
-    sync::SyncClient,
+    sync::{SyncClient, storage_adapter::StorageAdapter},
 };
 
 /// State that is shared for a SQLite database connection after the core extension has been
@@ -27,6 +28,7 @@ pub struct DatabaseState {
     schema: RefCell>,
     pending_updates: RefCell>,
     commited_updates: RefCell>,
+    pub storage_adapter: RefCell<Option<Rc<StorageAdapter>>>,
     pub sync_client: RefCell<Option<SyncClient>>,
     /// Cached put and delete statements for raw tables, used by the `sync_local` step of the sync
     /// client.
@@ -97,10 +99,29 @@ impl DatabaseState {
         core::mem::replace(&mut *committed, Default::default())
     }
 
+    pub fn storage_adapter(
+        &self,
+        db: *mut sqlite::sqlite3,
+    ) -> Result<Rc<StorageAdapter>, PowerSyncError> {
+        let mut adapter = self.storage_adapter.borrow_mut();
+        Ok(match *adapter {
+            Some(ref adapter) => {
+                debug_assert!(db == adapter.db);
+                adapter.clone()
+            }
+            None => {
+                let created = Rc::new(StorageAdapter::new(db)?);
+                *adapter = Some(created.clone());
+                created
+            }
+        })
+    }
+
     /// Releases global resources (like prepared statements for the sync client) referenced from
     /// this state.
pub fn release_resources(&self) { - self.sync_client.replace(None); + self.sync_client.take(); + self.storage_adapter.take(); } /// ## Safety diff --git a/crates/core/src/sync/interface.rs b/crates/core/src/sync/interface.rs index 3fce1368..546c843e 100644 --- a/crates/core/src/sync/interface.rs +++ b/crates/core/src/sync/interface.rs @@ -9,7 +9,6 @@ use crate::error::PowerSyncError; use crate::schema::Schema; use crate::state::DatabaseState; use crate::sync::diagnostics::{DiagnosticOptions, DiagnosticsEvent}; -use crate::sync::storage_adapter::StorageAdapter; use crate::sync::subscriptions::{StreamKey, apply_subscriptions}; use alloc::borrow::Cow; use alloc::boxed::Box; @@ -267,9 +266,10 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc) -> Result<() } }), "subscriptions" => { + let adapter = state.storage_adapter(db)?; let request = serde_json::from_str(payload.text()) .map_err(PowerSyncError::as_argument_error)?; - return apply_subscriptions(db, request); + return apply_subscriptions(&adapter, request); } _ => { return Err(PowerSyncError::argument_error("Unknown operation")); @@ -278,14 +278,15 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc) -> Result<() let instructions = { let mut client = state.sync_client.borrow_mut(); - - client - .get_or_insert_with(|| { + let client = match *client { + Some(ref mut client) => client, + None => { let state = unsafe { DatabaseState::clone_from(ctx.user_data()) }; - - SyncClient::new(db, &state) - }) - .push_event(event) + let created = SyncClient::new(db, &state)?; + client.insert(created) + } + }; + client.push_event(event) }?; let formatted = @@ -316,11 +317,11 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc) -> Result<() "powersync_offline_sync_status", 0, sqlite::UTF8 | sqlite::DIRECTONLY | SQLITE_RESULT_SUBTYPE, - None, + Some(Rc::into_raw(state.clone()) as *mut c_void), Some(powersync_offline_sync_status), None, None, - None, + Some(DatabaseState::destroy_rc), )?; Ok(()) @@ -330,7 +331,9 @@ fn powersync_offline_sync_status_impl( ctx: *mut sqlite::context, _args: &[*mut sqlite::value], ) -> Result { - let adapter = StorageAdapter::new(ctx.db_handle())?; + let db_state = unsafe { DatabaseState::from_context(&ctx) }; + let adapter = db_state.storage_adapter(ctx.db_handle())?; + let state = adapter.offline_sync_state()?; let serialized = serde_json::to_string(&state).map_err(PowerSyncError::internal)?; diff --git a/crates/core/src/sync/mod.rs b/crates/core/src/sync/mod.rs index 3b746740..874eb29a 100644 --- a/crates/core/src/sync/mod.rs +++ b/crates/core/src/sync/mod.rs @@ -11,6 +11,7 @@ pub mod operations; pub mod storage_adapter; mod streaming_sync; mod subscriptions; +mod sync_local; mod sync_status; pub use bucket_priority::BucketPriority; diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs index 094904ac..74f34027 100644 --- a/crates/core/src/sync/storage_adapter.rs +++ b/crates/core/src/sync/storage_adapter.rs @@ -14,15 +14,16 @@ use crate::{ interface::{RequestedStreamSubscription, StreamSubscriptionRequest}, streaming_sync::{OwnedStreamDescription, RequestedStreamSubscriptions}, subscriptions::{LocallyTrackedSubscription, StreamKey}, - sync_status::{ActiveStreamSubscription, DownloadSyncStatus, SyncPriorityStatus}, + sync_local::{PartialSyncOperation, SyncOperation}, + sync_status::{ + ActiveStreamSubscription, DownloadSyncStatus, SyncPriorityStatus, TimestampMicros, + }, }, - sync_local::{PartialSyncOperation, SyncOperation}, utils::{JsonString, column_nullable}, }; use 
super::{
    bucket_priority::BucketPriority, interface::BucketRequest, streaming_sync::OwnedCheckpoint,
-    sync_status::Timestamp,
};
 
 /// An adapter for storing sync state.
@@ -46,7 +47,7 @@ impl StorageAdapter {
             .into_db_result(db)?;
 
         // language=SQLite
-        let time = db.prepare_v2("SELECT unixepoch()")?;
+        let time = db.prepare_v2("SELECT CAST(unixepoch('subsec') * 1_000_000 as integer)")?;
 
         // language=SQLite
         let delete_subscription =
@@ -90,11 +91,9 @@ impl StorageAdapter {
         let priority_items = {
             // language=SQLite
             let statement = self
-                .db
-                .prepare_v2(
-                    "SELECT priority, unixepoch(last_synced_at) FROM ps_sync_state ORDER BY priority",
-                )
-                .into_db_result(self.db)?;
+                .db
+                .prepare_v2("SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority")
+                .into_db_result(self.db)?;
 
             let mut items = Vec::<SyncPriorityStatus>::new();
             while statement.step()? == ResultCode::ROW {
@@ -105,7 +104,7 @@ impl StorageAdapter {
 
                 items.push(SyncPriorityStatus {
                     priority,
-                    last_synced_at: Some(Timestamp(timestamp)),
+                    last_synced_at: Some(TimestampMicros(timestamp)),
                     has_synced: Some(true),
                 });
             }
@@ -272,10 +271,11 @@ WHERE bucket = ?1",
             priority: BucketPriority,
             buckets: Vec<&'a str>,
         }
+        let now = self.now()?;
 
         let sync_result = match priority {
             None => {
-                let mut sync = SyncOperation::new(state, self.db, None);
+                let mut sync = SyncOperation::new(state, self.db, None, now);
                 sync.use_schema(schema);
                 sync.apply()
             }
@@ -305,6 +305,7 @@ WHERE bucket = ?1",
                         priority,
                         args: &serialized_args,
                     }),
+                    now,
                 );
                 sync.use_schema(schema);
                 sync.apply()
@@ -331,7 +332,7 @@ WHERE bucket = ?1",
                 }
             }
 
-            Ok(SyncLocalResult::ChangesApplied)
+            Ok(SyncLocalResult::ChangesApplied { timestamp: now })
         } else {
             Ok(SyncLocalResult::PendingLocalChanges)
         }
@@ -372,9 +373,9 @@ WHERE bucket = ?1",
         })
     }
 
-    pub fn now(&self) -> Result<Timestamp, PowerSyncError> {
+    pub fn now(&self) -> Result<TimestampMicros, PowerSyncError> {
         self.time_stmt.step()?;
-        let res = Timestamp(self.time_stmt.column_int64(0));
+        let res = TimestampMicros(self.time_stmt.column_int64(0));
         self.time_stmt.reset()?;
 
         Ok(res)
@@ -405,20 +406,24 @@ WHERE bucket = ?1",
     }
 
     fn delete_outdated_subscriptions(&self) -> Result<(), PowerSyncError> {
-        self.db
-            .exec_safe("DELETE FROM ps_stream_subscriptions WHERE (expires_at < unixepoch()) OR (ttl IS NULL AND NOT active)")?;
+        let now = self.now()?;
+        let stmt = self.db.prepare_v2("DELETE FROM ps_stream_subscriptions WHERE (expires_at < ?) OR (ttl IS NULL AND NOT active)")?;
+        stmt.bind_int64(1, now.0)?;
+        stmt.exec()?;
         Ok(())
     }
 
     /// Increases the TTL for explicit subscriptions that are currently marked as active.
     pub fn increase_ttl(&self, streams: &[StreamKey]) -> Result<(), PowerSyncError> {
+        let now = self.now()?;
         let stmt = self.db.prepare_v2(
-            "UPDATE ps_stream_subscriptions SET expires_at = unixepoch() + ttl WHERE stream_name = ? AND local_params = ? AND ttl IS NOT NULL",
+            "UPDATE ps_stream_subscriptions SET expires_at = ? + ttl * 1_000_000 WHERE stream_name = ? AND local_params = ? AND ttl IS NOT NULL",
         )?;
 
         for stream in streams {
-            stmt.bind_text(1, &stream.name, sqlite::Destructor::STATIC)?;
-            stmt.bind_text(2, &stream.serialized_params(), sqlite::Destructor::STATIC)?;
+            stmt.bind_int64(1, now.0)?;
+            stmt.bind_text(2, &stream.name, sqlite::Destructor::STATIC)?;
+            stmt.bind_text(3, &stream.serialized_params(), sqlite::Destructor::STATIC)?;
 
             stmt.exec()?;
         }
@@ -569,7 +574,7 @@ pub enum SyncLocalResult {
     /// pending local CRUD data to be uploaded and acknowledged in a write checkpoint.
     PendingLocalChanges,
     /// The checkpoint has been applied and changes have been published.
-    ChangesApplied,
+    ChangesApplied { timestamp: TimestampMicros },
 }
 
 /// Information about the amount of operations a bucket had at the last checkpoint and how many
diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs
index 0c300f6d..5c680ce3 100644
--- a/crates/core/src/sync/streaming_sync.rs
+++ b/crates/core/src/sync/streaming_sync.rs
@@ -19,6 +19,7 @@ use futures_lite::FutureExt;
 
 use crate::{
     error::{PowerSyncError, PowerSyncErrorCause},
+    ext::SafeManagedStmt,
     kv::client_id,
     state::DatabaseState,
     sync::{
@@ -31,10 +32,10 @@ use crate::{
             StreamSubscriptionErrorCause, SyncLineWithSource,
         },
         subscriptions::LocallyTrackedSubscription,
-        sync_status::{ActiveStreamSubscription, Timestamp},
+        sync_status::{ActiveStreamSubscription, TimestampMicros},
     },
 };
-use powersync_sqlite_nostd::{self as sqlite, Connection, ResultCode};
+use powersync_sqlite_nostd::{self as sqlite, Connection};
 
 use super::{
     interface::{Instruction, LogSeverity, StreamingSyncRequest, SyncControlRequest, SyncEvent},
@@ -51,18 +52,25 @@ use super::{
 /// initialized.
 pub struct SyncClient {
     db: *mut sqlite::sqlite3,
+    adapter: Rc<StorageAdapter>,
     db_state: Weak<DatabaseState>,
     /// The current [ClientState] (essentially an optional [StreamingSyncIteration]).
     state: ClientState,
 }
 
 impl SyncClient {
-    pub fn new(db: *mut sqlite::sqlite3, state: &Rc<DatabaseState>) -> Self {
-        Self {
+    pub fn new(
+        db: *mut sqlite::sqlite3,
+        state: &Rc<DatabaseState>,
+    ) -> Result<Self, PowerSyncError> {
+        let adapter = state.storage_adapter(db)?;
+
+        Ok(Self {
             db,
+            adapter,
             db_state: Rc::downgrade(state),
             state: ClientState::Idle,
-        }
+        })
     }
 
     pub fn push_event<'a>(
@@ -73,7 +81,12 @@ impl SyncClient {
             SyncControlRequest::StartSyncStream(options) => {
                 self.state.tear_down()?;
 
-                let mut handle = SyncIterationHandle::new(self.db, options, self.db_state.clone())?;
+                let mut handle = SyncIterationHandle::new(
+                    self.db,
+                    options,
+                    self.adapter.clone(),
+                    self.db_state.clone(),
+                );
                 let instructions = handle.initialize()?;
                 self.state = ClientState::IterationActive(handle);
 
@@ -150,20 +163,20 @@ impl SyncIterationHandle {
     fn new(
         db: *mut sqlite::sqlite3,
         options: StartSyncStream,
+        adapter: Rc<StorageAdapter>,
         state: Weak<DatabaseState>,
-    ) -> Result<Self, PowerSyncError> {
+    ) -> Self {
         let runner = StreamingSyncIteration {
             db,
             validated_but_not_applied: None,
             diagnostics: DiagnosticsCollector::for_options(&options),
             options,
             state,
-            adapter: StorageAdapter::new(db)?,
+            adapter,
             status: SyncStatusContainer::new(),
         };
         let future = runner.run().boxed_local();
-
-        Ok(Self { future })
+        Self { future }
     }
 
     /// Forwards a [SyncEvent::Initialize] to the current sync iteration, returning the initial
@@ -231,7 +244,7 @@ impl<'a> ActiveEvent<'a> {
 struct StreamingSyncIteration {
     db: *mut sqlite::sqlite3,
     state: Weak<DatabaseState>,
-    adapter: StorageAdapter,
+    adapter: Rc<StorageAdapter>,
     options: StartSyncStream,
     status: SyncStatusContainer,
     // A checkpoint that has been fully received and validated, but couldn't be applied due to
@@ -344,7 +357,7 @@ impl StreamingSyncIteration {
                     validated_but_not_applied: target.clone(),
                 }
             }
-            SyncLocalResult::ChangesApplied => {
+            SyncLocalResult::ChangesApplied { timestamp } => {
                 event.instructions.push(Instruction::LogLine {
                     severity: LogSeverity::DEBUG,
                     line: "Validated and applied checkpoint".into(),
                 });
                 event.instructions.push(Instruction::FlushFileSystem {});
                 SyncStateMachineTransition::SyncLocalChangesApplied {
                     partial: None,
-                    timestamp: self.adapter.now()?,
+                    timestamp,
                 }
             }
         }
@@ -385,11 +398,10 @@ impl StreamingSyncIteration {
                 // of priority 0.
We'll resolve this for a complete checkpoint later. SyncStateMachineTransition::Empty } - SyncLocalResult::ChangesApplied => { - let now = self.adapter.now()?; + SyncLocalResult::ChangesApplied { timestamp } => { SyncStateMachineTransition::SyncLocalChangesApplied { partial: Some(priority), - timestamp: now, + timestamp, } } } @@ -629,13 +641,13 @@ impl StreamingSyncIteration { let result = self.sync_local(&checkpoint, None)?; match result { - SyncLocalResult::ChangesApplied => { + SyncLocalResult::ChangesApplied { timestamp } => { event.instructions.push(Instruction::LogLine { severity: LogSeverity::DEBUG, line: "Applied pending checkpoint after completed upload".into(), }); - self.handle_checkpoint_applied(event, self.adapter.now()?); + self.handle_checkpoint_applied(event, timestamp); } _ => { event.instructions.push(Instruction::LogLine { @@ -841,24 +853,21 @@ impl StreamingSyncIteration { .adapter .sync_local(&*state, target, priority, &self.options.schema)?; - if matches!(&result, SyncLocalResult::ChangesApplied) { + if let SyncLocalResult::ChangesApplied { timestamp } = result { // Update affected stream subscriptions to mark them as synced. let mut status = self.status.inner().borrow_mut(); if !status.streams.is_empty() { let stmt = self.adapter.db.prepare_v2( - "UPDATE ps_stream_subscriptions SET last_synced_at = unixepoch() WHERE id = ? RETURNING last_synced_at", + "UPDATE ps_stream_subscriptions SET last_synced_at = ?2 WHERE id = ?1", )?; for stream in &mut status.streams { if stream.is_in_priority(priority) { stmt.bind_int64(1, stream.id)?; - if stmt.step()? == ResultCode::ROW { - let timestamp = Timestamp(stmt.column_int64(0)); - stream.last_synced_at = Some(timestamp); - } - - stmt.reset()?; + stmt.bind_int64(2, timestamp.0)?; + stream.last_synced_at = Some(timestamp); + stmt.exec()?; } } } @@ -922,7 +931,7 @@ impl StreamingSyncIteration { }) } - fn handle_checkpoint_applied(&mut self, event: &mut ActiveEvent, timestamp: Timestamp) { + fn handle_checkpoint_applied(&mut self, event: &mut ActiveEvent, timestamp: TimestampMicros) { event.instructions.push(Instruction::DidCompleteSync {}); self.status.update( @@ -1105,7 +1114,7 @@ enum SyncStateMachineTransition<'a> { }, SyncLocalChangesApplied { partial: Option, - timestamp: Timestamp, + timestamp: TimestampMicros, }, CloseIteration(CloseSyncStream), Empty, diff --git a/crates/core/src/sync/subscriptions.rs b/crates/core/src/sync/subscriptions.rs index 52dc5a81..65d758bc 100644 --- a/crates/core/src/sync/subscriptions.rs +++ b/crates/core/src/sync/subscriptions.rs @@ -8,7 +8,7 @@ use serde_with::{DurationSeconds, serde_as}; use crate::{ error::{PSResult, PowerSyncError}, ext::SafeManagedStmt, - sync::BucketPriority, + sync::{BucketPriority, storage_adapter::StorageAdapter}, utils::JsonString, }; @@ -79,21 +79,25 @@ pub struct SubscribeToStream { } pub fn apply_subscriptions( - db: *mut sqlite::sqlite3, + adapter: &StorageAdapter, subscription: SubscriptionChangeRequest, ) -> Result<(), PowerSyncError> { + let db = adapter.db; + match subscription { SubscriptionChangeRequest::Subscribe(subscription) => { + let now = adapter.now()?; + let stmt = db .prepare_v2( " INSERT INTO ps_stream_subscriptions (stream_name, local_priority, local_params, ttl, expires_at) - VALUES (?, ?2, ?, ?4, unixepoch() + ?4) + VALUES (?, ?2, ?, ?4, ?5) ON CONFLICT DO UPDATE SET local_priority = min(coalesce(?2, local_priority), local_priority), ttl = ?4, - expires_at = unixepoch() + ?4 + expires_at = ?5 ", ) .into_db_result(db)?; @@ -108,13 +112,14 @@ 
INSERT INTO ps_stream_subscriptions (stream_name, local_priority, local_params, subscription.stream.serialized_params(), sqlite::Destructor::STATIC, )?; - stmt.bind_int64( - 4, - subscription - .ttl - .map(|f| f.as_secs() as i64) - .unwrap_or(LocallyTrackedSubscription::DEFAULT_TTL) as i64, - )?; + let ttl_seconds = subscription + .ttl + .map(|f| f.as_secs() as i64) + .unwrap_or(LocallyTrackedSubscription::DEFAULT_TTL) + as i64; + stmt.bind_int64(4, ttl_seconds)?; + stmt.bind_int64(5, now.0 + ttl_seconds * 1_000_000)?; + stmt.exec()?; } SubscriptionChangeRequest::Unsubscribe(subscription) => { diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync/sync_local.rs similarity index 98% rename from crates/core/src/sync_local.rs rename to crates/core/src/sync/sync_local.rs index 874073c0..1293ef33 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync/sync_local.rs @@ -12,6 +12,7 @@ use crate::schema::{ }; use crate::state::DatabaseState; use crate::sync::BucketPriority; +use crate::sync::sync_status::TimestampMicros; use crate::utils::SqlBuffer; use powersync_sqlite_nostd::{self as sqlite, Destructor, ManagedStmt}; use powersync_sqlite_nostd::{Connection, ResultCode}; @@ -31,6 +32,7 @@ pub struct SyncOperation<'a> { db: *mut sqlite::sqlite3, schema: ParsedDatabaseSchema<'a>, partial: Option>, + time: TimestampMicros, } impl<'a> SyncOperation<'a> { @@ -38,12 +40,14 @@ impl<'a> SyncOperation<'a> { state: &'a DatabaseState, db: *mut sqlite::sqlite3, partial: Option>, + time: TimestampMicros, ) -> Self { Self { state, db, schema: ParsedDatabaseSchema::new(), partial, + time, } } @@ -380,8 +384,12 @@ SELECT // language=SQLite let stmt = self .db - .prepare_v2("INSERT OR REPLACE INTO ps_sync_state (priority, last_synced_at) VALUES (?, datetime());") .into_db_result(self.db)?; + .prepare_v2( + "INSERT OR REPLACE INTO ps_sync_state (priority, last_synced_at) VALUES (?, ?);", + ) + .into_db_result(self.db)?; stmt.bind_int(1, priority_code)?; + stmt.bind_int64(2, self.time.0)?; stmt.exec()?; Ok(()) diff --git a/crates/core/src/sync/sync_status.rs b/crates/core/src/sync/sync_status.rs index 50d11ea0..eb7daeea 100644 --- a/crates/core/src/sync/sync_status.rs +++ b/crates/core/src/sync/sync_status.rs @@ -102,7 +102,7 @@ impl DownloadSyncStatus { } } - pub fn partial_checkpoint_complete(&mut self, priority: BucketPriority, now: Timestamp) { + pub fn partial_checkpoint_complete(&mut self, priority: BucketPriority, now: TimestampMicros) { self.debug_assert_priority_status_is_sorted(); // We can delete entries with a higher priority because this partial sync includes them. 
self.priority_status.retain(|i| i.priority < priority);
@@ -117,7 +117,7 @@ impl DownloadSyncStatus {
         self.debug_assert_priority_status_is_sorted();
     }
 
-    pub fn applied_checkpoint(&mut self, now: Timestamp) {
+    pub fn applied_checkpoint(&mut self, now: TimestampMicros) {
         self.downloading = None;
         self.priority_status.clear();
 
@@ -257,12 +257,12 @@ impl SyncStatusContainer {
 
 #[repr(transparent)]
 #[derive(Serialize, Hash, Clone, Copy)]
-pub struct Timestamp(pub i64);
+pub struct TimestampMicros(pub i64);
 
 #[derive(Serialize, Hash)]
 pub struct SyncPriorityStatus {
     pub priority: BucketPriority,
-    pub last_synced_at: Option<Timestamp>,
+    pub last_synced_at: Option<TimestampMicros>,
     pub has_synced: Option<bool>,
 }
 
@@ -405,8 +405,8 @@ pub struct ActiveStreamSubscription {
     pub active: bool,
     pub is_default: bool,
     pub has_explicit_subscription: bool,
-    pub expires_at: Option<Timestamp>,
-    pub last_synced_at: Option<Timestamp>,
+    pub expires_at: Option<TimestampMicros>,
+    pub last_synced_at: Option<TimestampMicros>,
 }
 
 impl ActiveStreamSubscription {
@@ -421,8 +421,8 @@
             active: local.active,
             has_explicit_subscription: local.has_subscribed_manually(),
 
-            expires_at: local.expires_at.clone().map(|e| Timestamp(e)),
-            last_synced_at: local.last_synced_at.map(|e| Timestamp(e)),
+            expires_at: local.expires_at.clone().map(|e| TimestampMicros(e)),
+            last_synced_at: local.last_synced_at.map(|e| TimestampMicros(e)),
         }
     }
 
diff --git a/dart/test/goldens/simple_iteration.json b/dart/test/goldens/simple_iteration.json
index da52fa35..32c1db89 100644
--- a/dart/test/goldens/simple_iteration.json
+++ b/dart/test/goldens/simple_iteration.json
@@ -149,7 +149,7 @@
       "priority_status": [
         {
           "priority": 2147483647,
-          "last_synced_at": 1740823200,
+          "last_synced_at": 1740823200000000,
           "has_synced": true
         }
       ],
diff --git a/dart/test/sync_stream_test.dart b/dart/test/sync_stream_test.dart
index a7bf90de..f9d29999 100644
--- a/dart/test/sync_stream_test.dart
+++ b/dart/test/sync_stream_test.dart
@@ -107,12 +107,12 @@ void main() {
         lastStatus,
         containsPair(
           'streams',
-          [containsPair('last_synced_at', 1740823200)],
+          [containsPair('last_synced_at', timestamp())],
         ),
       );
 
       final [stored] = db.select('SELECT * FROM ps_stream_subscriptions');
-      expect(stored, containsPair('last_synced_at', 1740823200));
+      expect(stored, containsPair('last_synced_at', timestamp()));
     });
 
     syncTest('are deleted', (_) {
@@ -314,7 +314,7 @@ void main() {
       );
 
       final [row] = db.select('SELECT * FROM ps_stream_subscriptions');
-      expect(row, containsPair('expires_at', 1740826800));
+      expect(row, containsPair('expires_at', timestamp(plusSeconds: 3600)));
 
       var startInstructions = control('start', null);
       expect(
@@ -368,8 +368,6 @@ void main() {
     });
 
     syncTest('increase ttl', (controller) {
-      const startTime = 1740826800;
-
       control(
         'subscriptions',
         json.encode({
@@ -381,7 +379,7 @@ void main() {
       );
 
       var [row] = db.select('SELECT * FROM ps_stream_subscriptions');
-      expect(row, containsPair('expires_at', startTime));
+      expect(row, containsPair('expires_at', timestamp(plusHours: 1)));
 
       controller.elapse(const Duration(minutes: 30));
 
@@ -397,7 +395,7 @@ void main() {
 
       // Which should increase its expiry date.
      [row] = db.select('SELECT * FROM ps_stream_subscriptions');
-      expect(row, containsPair('expires_at', startTime + 1800));
+      expect(row, containsPair('expires_at', timestamp(plusHours: 1)));
 
       // The sync client uses token_expires_in lines to extend the expiry date
      // of active stream subscriptions.
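
The expectations above all build on one invariant: every persisted clock value is now an integer count of microseconds since the Unix epoch, and TTLs (which stay in seconds) are scaled by 1,000,000 before being added. A minimal self-contained sketch of that arithmetic; the helper methods here are illustrative only and not part of the extension:

```rust
/// Microseconds since the Unix epoch, mirroring `sync_status::TimestampMicros`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TimestampMicros(i64);

impl TimestampMicros {
    /// What the v13 up-migration does to second-precision columns.
    fn from_seconds(seconds: i64) -> Self {
        Self(seconds * 1_000_000)
    }

    /// What the down-migration does; truncating division is lossless for
    /// values produced by `from_seconds`.
    fn to_seconds(self) -> i64 {
        self.0 / 1_000_000
    }

    /// TTL bookkeeping as in `increase_ttl`: expires_at = now + ttl scaled to microseconds.
    fn plus_ttl(self, ttl_seconds: i64) -> Self {
        Self(self.0 + ttl_seconds * 1_000_000)
    }
}

fn main() {
    // 2025-03-01 10:00:00 UTC, the mocked start time used by the Dart tests.
    let now = TimestampMicros::from_seconds(1_740_823_200);
    assert_eq!(now.0, 1_740_823_200_000_000); // matches the updated golden file

    // A one-hour TTL moves the expiry a full hour in microseconds.
    let expires_at = now.plus_ttl(3_600);
    assert_eq!(expires_at.to_seconds() - now.to_seconds(), 3_600);
}
```
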
@@ -405,14 +403,14 @@ void main() { control('line_text', json.encode({'token_expires_in': 3600})); [row] = db.select('SELECT * FROM ps_stream_subscriptions'); - expect(row, containsPair('expires_at', startTime + 3600)); + expect(row, containsPair('expires_at', timestamp(plusHours: 1))); // Stopping should not increase the expiry date. controller.elapse(const Duration(minutes: 30)); control('stop', null); [row] = db.select('SELECT * FROM ps_stream_subscriptions'); - expect(row, containsPair('expires_at', startTime + 3600)); + expect(row, containsPair('expires_at', timestamp(plusMinutes: 30))); }); syncTest('can be made implicit', (_) { @@ -639,8 +637,8 @@ void main() { 'active': true, 'is_default': false, 'has_explicit_subscription': true, - 'expires_at': 1740909600, - 'last_synced_at': 1740823200 + 'expires_at': timestamp(plusDays: 1), + 'last_synced_at': timestamp() } ])); }); diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index cbe8ae9a..d9478d86 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -302,22 +302,12 @@ void _syncTests({ // Make sure there's only a single row in last_synced_at expect( db.select( - "SELECT datetime(last_synced_at) AS last_synced_at FROM ps_sync_state WHERE priority = ?", + "SELECT datetime(last_synced_at / 1_000_000, 'unixepoch') AS last_synced_at FROM ps_sync_state WHERE priority = ?", [prio ?? 2147483647]), [ {'last_synced_at': '2025-03-01 ${10 + i}:00:00'} ], ); - - if (prio == null) { - expect( - db.select( - "SELECT datetime(powersync_last_synced_at()) AS last_synced_at"), - [ - {'last_synced_at': '2025-03-01 ${10 + i}:00:00'} - ], - ); - } } controller.elapse(const Duration(hours: 1)); @@ -348,12 +338,12 @@ void _syncTests({ [ { 'priority': 2, - 'last_synced_at': 1740823800, + 'last_synced_at': timestamp(), 'has_synced': true }, { 'priority': 2147483647, - 'last_synced_at': 1740823200, + 'last_synced_at': timestamp(plusMinutes: -10), 'has_synced': true }, ], @@ -368,10 +358,10 @@ void _syncTests({ 'connected': false, 'connecting': false, 'priority_status': [ - {'priority': 2, 'last_synced_at': 1740823800, 'has_synced': true}, + {'priority': 2, 'last_synced_at': timestamp(), 'has_synced': true}, { 'priority': 2147483647, - 'last_synced_at': 1740823200, + 'last_synced_at': timestamp(plusMinutes: -10), 'has_synced': true } ], @@ -385,14 +375,10 @@ void _syncTests({ pushCheckpoint(buckets: priorityBuckets); pushCheckpointComplete(); - expect(db.select('SELECT powersync_last_synced_at() AS r').single, - {'r': isNotNull}); expect(db.select('SELECT priority FROM ps_sync_state').single, {'priority': 2147483647}); db.execute('SELECT powersync_clear(0)'); - expect(db.select('SELECT powersync_last_synced_at() AS r').single, - {'r': isNull}); expect(db.select('SELECT * FROM ps_sync_state'), hasLength(0)); }); diff --git a/dart/test/utils/migration_fixtures.dart b/dart/test/utils/migration_fixtures.dart index 09d67be9..2f049c2a 100644 --- a/dart/test/utils/migration_fixtures.dart +++ b/dart/test/utils/migration_fixtures.dart @@ -1,5 +1,5 @@ /// The current database version -const databaseVersion = 12; +const databaseVersion = 13; /// This is the base database state that we expect at various schema versions. /// Generated by loading the specific library version, and exporting the schema. 
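
Before the fixtures, it helps to see the v13 up-migration run end to end. This sketch exercises the same rename-and-backfill against an in-memory database; rusqlite stands in for the extension's own no_std SQLite bindings purely for illustration, and `unixepoch()` requires SQLite 3.38+. (Underscore digit separators like `1_000_000` are only accepted by SQLite 3.46+, which is presumably why the final commit in this series normalizes them to `1000000`.)

```rust
// Illustrative only: runs the v13 up-migration SQL via rusqlite, not the
// bindings the extension actually uses.
use rusqlite::Connection;

fn main() -> rusqlite::Result<()> {
    let db = Connection::open_in_memory()?;

    // v12 layout: last_synced_at is a text datetime.
    db.execute_batch(
        "CREATE TABLE ps_sync_state (
           priority INTEGER NOT NULL,
           last_synced_at TEXT NOT NULL
         ) STRICT;
         INSERT INTO ps_sync_state VALUES (2147483647, '2025-03-01 10:00:00');",
    )?;

    // The v13 up-migration: rebuild the table with integer microsecond values.
    db.execute_batch(
        "ALTER TABLE ps_sync_state RENAME TO ps_sync_state_old;
         CREATE TABLE ps_sync_state (
           priority INTEGER NOT NULL PRIMARY KEY,
           last_synced_at INTEGER NOT NULL
         ) STRICT;
         INSERT INTO ps_sync_state (priority, last_synced_at)
           SELECT priority, unixepoch(last_synced_at) * 1000000 FROM ps_sync_state_old;
         DROP TABLE ps_sync_state_old;",
    )?;

    let micros: i64 =
        db.query_row("SELECT last_synced_at FROM ps_sync_state", [], |row| row.get(0))?;
    assert_eq!(micros, 1_740_823_200_000_000); // 2025-03-01 10:00:00 UTC in microseconds
    Ok(())
}
```
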
@@ -476,6 +476,67 @@ const expectedState = { ;INSERT INTO ps_migration(id, down_migrations) VALUES(11, '[{"sql":"DROP TABLE ps_stream_subscriptions"},{"sql":"DELETE FROM ps_migration WHERE id >= 11"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(12, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN downloaded_size"},{"sql":"DELETE FROM ps_migration WHERE id >= 12"}]') ''', + 13: r''' +;CREATE TABLE ps_buckets( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + last_applied_op INTEGER NOT NULL DEFAULT 0, + last_op INTEGER NOT NULL DEFAULT 0, + target_op INTEGER NOT NULL DEFAULT 0, + add_checksum INTEGER NOT NULL DEFAULT 0, + op_checksum INTEGER NOT NULL DEFAULT 0, + pending_delete INTEGER NOT NULL DEFAULT 0 +, count_at_last INTEGER NOT NULL DEFAULT 0, count_since_last INTEGER NOT NULL DEFAULT 0, downloaded_size INTEGER NOT NULL DEFAULT 0) STRICT +;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER) +;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB) +;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT) +;CREATE TABLE ps_oplog( + bucket INTEGER NOT NULL, + op_id INTEGER NOT NULL, + row_type TEXT, + row_id TEXT, + key TEXT, + data TEXT, + hash INTEGER NOT NULL) STRICT +;CREATE TABLE ps_stream_subscriptions ( + id INTEGER NOT NULL PRIMARY KEY, + stream_name TEXT NOT NULL, + active INTEGER NOT NULL DEFAULT FALSE, + is_default INTEGER NOT NULL DEFAULT FALSE, + local_priority INTEGER, + local_params TEXT NOT NULL DEFAULT 'null', + ttl INTEGER, + expires_at INTEGER, + last_synced_at INTEGER, + UNIQUE (stream_name, local_params) +) STRICT +;CREATE TABLE ps_sync_state ( + priority INTEGER NOT NULL PRIMARY KEY, + last_synced_at INTEGER NOT NULL +) STRICT +;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER) +;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id)) +;CREATE TABLE ps_updated_rows( + row_type TEXT, + row_id TEXT, + PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID +;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name) +;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key) +;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id) +;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id) +;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null) +;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER 
NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(8, '[{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL,\n last_synced_at TEXT NOT NULL\n) STRICT"},{"sql":"INSERT INTO ps_sync_state SELECT * FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 8"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(9, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_at_last"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN count_since_last"},{"sql":"DELETE FROM ps_migration WHERE id >= 9"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(11, '[{"sql":"DROP TABLE ps_stream_subscriptions"},{"sql":"DELETE FROM ps_migration WHERE id >= 11"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(12, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN downloaded_size"},{"sql":"DELETE FROM ps_migration WHERE id >= 12"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(13, '[{"sql":"UPDATE ps_stream_subscriptions SET expires_at = expires_at / 1_000_000, last_synced_at = last_synced_at / 1_000_000"},{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL PRIMARY KEY,\n last_synced_at TEXT NOT NULL\n) STRICT;"},{"sql":"INSERT INTO ps_sync_state (priority, 
last_synced_at) SELECT priority, datetime(last_synced_at / 1_000_000, ''unixepoch'') FROM ps_sync_state"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 13"}]')''', }; final finalState = expectedState[databaseVersion]!; @@ -599,6 +660,17 @@ const data1 = { (2, 3, 'lists', 'l1', '', '{}', 3) ;INSERT INTO ps_updated_rows(row_type, row_id) VALUES ('lists', 'l2') +''', + 13: r''' +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete, count_at_last, count_since_last, downloaded_size) VALUES + (1, 'b1', 0, 0, 0, 0, 120, 0, 0, 0, 0), + (2, 'b2', 0, 0, 0, 1005, 3, 0, 0, 0, 0) +;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES + (1, 1, 'todos', 't1', '', '{}', 100), + (1, 2, 'todos', 't2', '', '{}', 20), + (2, 3, 'lists', 'l1', '', '{}', 3) +;INSERT INTO ps_updated_rows(row_type, row_id) VALUES + ('lists', 'l2') ''' }; @@ -646,6 +718,7 @@ final dataDown1 = { 9: data1[9]!, 10: data1[9]!, 11: data1[10]!, + 12: data1[12]!, }; final finalData1 = data1[databaseVersion]!; diff --git a/dart/test/utils/native_test_utils.dart b/dart/test/utils/native_test_utils.dart index 8cb1412e..a0865d01 100644 --- a/dart/test/utils/native_test_utils.dart +++ b/dart/test/utils/native_test_utils.dart @@ -1,6 +1,7 @@ import 'dart:ffi'; import 'dart:io'; +import 'package:clock/clock.dart'; import 'package:fake_async/fake_async.dart'; import 'package:meta/meta.dart'; import 'package:sqlite3/common.dart'; @@ -90,3 +91,21 @@ void syncTest(String description, void Function(FakeAsync controller) body) { fakeAsync(body, initialTime: DateTime.utc(2025, 3, 1, 10)); }); } + +/// Generates an expected statement relative to the current mocked time (in +/// microseconds). +int timestamp({ + int plusSeconds = 0, + int plusMinutes = 0, + int plusHours = 0, + int plusDays = 0, +}) { + final plus = Duration( + seconds: plusSeconds, + minutes: plusMinutes, + hours: plusHours, + days: plusDays, + ); + + return clock.now().microsecondsSinceEpoch + plus.inMicroseconds; +} From bb11e2b5a6f47e36342b8279c6695d45ef353aa2 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 5 May 2026 12:48:34 +0200 Subject: [PATCH 2/3] Add datetime migration test --- dart/test/sync_stream_test.dart | 58 +++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/dart/test/sync_stream_test.dart b/dart/test/sync_stream_test.dart index f9d29999..ef77e6d3 100644 --- a/dart/test/sync_stream_test.dart +++ b/dart/test/sync_stream_test.dart @@ -1,5 +1,6 @@ import 'dart:convert'; +import 'package:clock/clock.dart'; import 'package:file/local.dart'; import 'package:sqlite3/common.dart'; import 'package:sqlite3/sqlite3.dart'; @@ -656,4 +657,61 @@ void main() { db.execute('select powersync_clear(0);'); expect(db.select('select * from ps_stream_subscriptions'), isEmpty); }); + + syncTest('can migrate from old timestamps', (async) { + // Migrate ps_sync_state to text-based date values and stream subscriptions + // to timestamps with second precision. 
+ db.execute('SELECT powersync_test_migration(?)', [12]); + + // Mark as synced + db + ..execute( + 'INSERT INTO ps_sync_state(priority, last_synced_at) VALUES (?, ?)', [ + 2147483647, + clock.now().toIso8601String(), + ]) + ..execute( + 'INSERT INTO ps_stream_subscriptions(stream_name, active, ttl, expires_at, last_synced_at) ' + 'VALUES (?, ?, ?, ?, ?)', + [ + 'stream', + 1, + 3600, + clock.now().millisecondsSinceEpoch ~/ 1000 + 1800, + clock.now().millisecondsSinceEpoch ~/ 1000, + ], + ); + + db.execute('SELECT powersync_test_migration(?)', [13]); + + final [statusRow] = db.select('SELECT powersync_offline_sync_status()'); + expect( + json.decode(statusRow.columnAt(0)), + allOf( + containsPair( + 'priority_status', + [ + { + 'priority': 2147483647, + 'has_synced': true, + 'last_synced_at': timestamp() + } + ], + ), + containsPair('streams', [ + { + 'name': 'stream', + 'parameters': null, + 'priority': null, + 'active': true, + 'is_default': false, + 'has_explicit_subscription': true, + 'expires_at': timestamp(plusMinutes: 30), + 'last_synced_at': timestamp(), + 'progress': anything, + } + ]), + ), + ); + }); } From 5e1c6016664536a8980c802678dee675dc772932 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 5 May 2026 13:16:42 +0200 Subject: [PATCH 3/3] AI feedback --- crates/core/src/migrations.rs | 22 ++++++---------------- crates/core/src/sync/storage_adapter.rs | 4 ++-- dart/test/utils/migration_fixtures.dart | 2 +- dart/test/utils/native_test_utils.dart | 2 +- 4 files changed, 10 insertions(+), 20 deletions(-) diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs index 4dda06b5..06f22146 100644 --- a/crates/core/src/migrations.rs +++ b/crates/core/src/migrations.rs @@ -7,7 +7,7 @@ use alloc::vec::Vec; use powersync_sqlite_nostd::{self as sqlite, Destructor}; use powersync_sqlite_nostd::{Connection, Context}; use serde::Serialize; -use serde::ser::SerializeSeq; +use serde_json::json; use sqlite::ResultCode; use crate::error::{PSResult, PowerSyncError}; @@ -429,26 +429,26 @@ json_object('sql', 'DELETE FROM ps_migration WHERE id >= 12') if current_version < 13 && target_version >= 13 { let up = "\ -UPDATE ps_stream_subscriptions SET expires_at = expires_at * 1_000_000, last_synced_at = last_synced_at * 1_000_000; +UPDATE ps_stream_subscriptions SET expires_at = expires_at * 1000000, last_synced_at = last_synced_at * 1000000; ALTER TABLE ps_sync_state RENAME TO ps_sync_state_old; CREATE TABLE ps_sync_state ( priority INTEGER NOT NULL PRIMARY KEY, last_synced_at INTEGER NOT NULL ) STRICT; INSERT INTO ps_sync_state (priority, last_synced_at) - SELECT priority, unixepoch(last_synced_at) * 1_000_000 FROM ps_sync_state_old; + SELECT priority, unixepoch(last_synced_at) * 1000000 FROM ps_sync_state_old; DROP TABLE ps_sync_state_old; "; local_db.exec_safe(up).into_db_result(local_db)?; const DOWN_STATEMENTS: &[&str] = &[ - "UPDATE ps_stream_subscriptions SET expires_at = expires_at / 1_000_000, last_synced_at = last_synced_at / 1_000_000", + "UPDATE ps_stream_subscriptions SET expires_at = expires_at / 1000000, last_synced_at = last_synced_at / 1000000", "ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new", "CREATE TABLE ps_sync_state ( priority INTEGER NOT NULL PRIMARY KEY, last_synced_at TEXT NOT NULL ) STRICT;", - "INSERT INTO ps_sync_state (priority, last_synced_at) SELECT priority, datetime(last_synced_at / 1_000_000, 'unixepoch') FROM ps_sync_state", + "INSERT INTO ps_sync_state (priority, last_synced_at) SELECT priority, datetime(last_synced_at / 1000000, 
'unixepoch') FROM ps_sync_state_new", "DROP TABLE ps_sync_state_new", "DELETE FROM ps_migration WHERE id >= 13", ]; @@ -466,22 +466,12 @@ DROP TABLE ps_sync_state_old; fn serialize_down_statements(statements: &[&'static str]) -> Result { struct DownStatements<'a>(&'a [&'static str]); - #[derive(Serialize)] - struct DownStatement { - sql: &'static str, - } - impl<'a> Serialize for DownStatements<'a> { fn serialize(&self, serializer: S) -> Result where S: serde::Serializer, { - let mut seq = serializer.serialize_seq(Some(self.0.len()))?; - for element in self.0 { - seq.serialize_element(&DownStatement { sql: element })?; - } - - seq.end() + serializer.collect_seq(self.0.iter().map(|s| json!({"sql": s})).into_iter()) } } diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs index 74f34027..7b5298f3 100644 --- a/crates/core/src/sync/storage_adapter.rs +++ b/crates/core/src/sync/storage_adapter.rs @@ -47,7 +47,7 @@ impl StorageAdapter { .into_db_result(db)?; // language=SQLite - let time = db.prepare_v2("SELECT CAST(unixepoch('subsec') * 1_000_000 as integer)")?; + let time = db.prepare_v2("SELECT CAST(unixepoch('subsec') * 1000000 as integer)")?; // language=SQLite let delete_subscription = @@ -417,7 +417,7 @@ WHERE bucket = ?1", pub fn increase_ttl(&self, streams: &[StreamKey]) -> Result<(), PowerSyncError> { let now = self.now()?; let stmt = self.db.prepare_v2( - "UPDATE ps_stream_subscriptions SET expires_at = ? + ttl * 1_000_000 WHERE stream_name = ? AND local_params = ? AND ttl IS NOT NULL", + "UPDATE ps_stream_subscriptions SET expires_at = ? + ttl * 1000000 WHERE stream_name = ? AND local_params = ? AND ttl IS NOT NULL", )?; for stream in streams { diff --git a/dart/test/utils/migration_fixtures.dart b/dart/test/utils/migration_fixtures.dart index 2f049c2a..9566a182 100644 --- a/dart/test/utils/migration_fixtures.dart +++ b/dart/test/utils/migration_fixtures.dart @@ -536,7 +536,7 @@ const expectedState = { ;INSERT INTO ps_migration(id, down_migrations) VALUES(10, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"DELETE FROM ps_migration WHERE id >= 10"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(11, '[{"sql":"DROP TABLE ps_stream_subscriptions"},{"sql":"DELETE FROM ps_migration WHERE id >= 11"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(12, '[{"sql":"ALTER TABLE ps_buckets DROP COLUMN downloaded_size"},{"sql":"DELETE FROM ps_migration WHERE id >= 12"}]') -;INSERT INTO ps_migration(id, down_migrations) VALUES(13, '[{"sql":"UPDATE ps_stream_subscriptions SET expires_at = expires_at / 1_000_000, last_synced_at = last_synced_at / 1_000_000"},{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL PRIMARY KEY,\n last_synced_at TEXT NOT NULL\n) STRICT;"},{"sql":"INSERT INTO ps_sync_state (priority, last_synced_at) SELECT priority, datetime(last_synced_at / 1_000_000, ''unixepoch'') FROM ps_sync_state"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 13"}]')''', +;INSERT INTO ps_migration(id, down_migrations) VALUES(13, '[{"sql":"UPDATE ps_stream_subscriptions SET expires_at = expires_at / 1000000, last_synced_at = last_synced_at / 1000000"},{"sql":"ALTER TABLE ps_sync_state RENAME TO ps_sync_state_new"},{"sql":"CREATE TABLE ps_sync_state (\n priority INTEGER NOT NULL PRIMARY KEY,\n last_synced_at TEXT 
NOT NULL\n) STRICT;"},{"sql":"INSERT INTO ps_sync_state (priority, last_synced_at) SELECT priority, datetime(last_synced_at / 1000000, ''unixepoch'') FROM ps_sync_state_new"},{"sql":"DROP TABLE ps_sync_state_new"},{"sql":"DELETE FROM ps_migration WHERE id >= 13"}]')''', }; final finalState = expectedState[databaseVersion]!; diff --git a/dart/test/utils/native_test_utils.dart b/dart/test/utils/native_test_utils.dart index a0865d01..d1852395 100644 --- a/dart/test/utils/native_test_utils.dart +++ b/dart/test/utils/native_test_utils.dart @@ -92,7 +92,7 @@ void syncTest(String description, void Function(FakeAsync controller) body) { }); } -/// Generates an expected statement relative to the current mocked time (in +/// Generates an expected timestamp relative to the current mocked time (in /// microseconds). int timestamp({ int plusSeconds = 0,
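
The series ends with `serialize_down_statements` rewritten around `collect_seq`. A self-contained sketch of that serialization path, verifying the JSON shape stored in `ps_migration.down_migrations`; note that the patch's trailing `.into_iter()` is harmless but redundant, since `collect_seq` already accepts any `IntoIterator`:

```rust
use serde::{Serialize, Serializer};
use serde_json::json;

// Wraps the list of down-migration SQL statements, as in migrations.rs.
struct DownStatements<'a>(&'a [&'static str]);

impl<'a> Serialize for DownStatements<'a> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // Each statement becomes a {"sql": ...} object in a JSON array.
        serializer.collect_seq(self.0.iter().map(|s| json!({ "sql": s })))
    }
}

fn main() {
    let encoded =
        serde_json::to_string(&DownStatements(&["DROP TABLE ps_sync_state_new"])).unwrap();
    assert_eq!(encoded, r#"[{"sql":"DROP TABLE ps_sync_state_new"}]"#);
}
```
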