Skip to content

[AIT-208] feat: partial sync implementation#1197

Merged
ttypic merged 2 commits intomainfrom
AIT-208/partial-sync
Mar 12, 2026
Merged

[AIT-208] feat: partial sync implementation#1197
ttypic merged 2 commits intomainfrom
AIT-208/partial-sync

Conversation

@ttypic
Copy link
Contributor

@ttypic ttypic commented Mar 11, 2026

  • rename syncObjectsDataPool to syncObjectsPool
  • Standardized naming across ObjectsManager to improve code clarity.
  • Updated references in implementation and tests.

Spec: ably/specification#413
JS: ably/ably-js#2152

Summary by CodeRabbit

  • Bug Fixes

    • Improved Live Objects sync: partial sync messages are merged (with tombstone semantics), unsupported partials are skipped, sync termination now reliably clears sync state when no objects remain, preventing lingering sync sessions.
  • Tests

    • Expanded test coverage for partial-sync merging, tombstone replacement, unsupported partial handling, buffered/end-of-sync behavior, and related lifecycle edge cases.

@ttypic ttypic requested a review from sacOO7 March 11, 2026 10:52
@coderabbitai
Copy link

coderabbitai bot commented Mar 11, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Renamed internal sync pool and API (syncObjectsDataPoolsyncObjectsPool, clearSyncObjectsDataPool()clearSyncObjectsPool()), added endSync() invocation in the attach-without-objects path, and implemented partial-sync merging and tombstone semantics with expanded tests for these behaviors.

Changes

Cohort / File(s) Summary
Objects manager core
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
Renamed syncObjectsDataPoolsyncObjectsPool and clearSyncObjectsDataPool()clearSyncObjectsPool(); updated lifecycle methods (startNewSync, endSync, applySync, dispose) and added partial-sync merging logic (map merges, tombstone replaces pool entry, counter/unsupported handling).
Realtime entrypoint
liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
Replaced calls to clearSyncObjectsDataPool() with clearSyncObjectsPool() and added objectsManager.endSync() in the attach-without-objects branch to terminate synchronization.
Test helpers & realtime tests
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt, liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt
Renamed test-visible property/imports from SyncObjectsDataPoolSyncObjectsPool, updated private-field getters, switched mocks/verifications to clearSyncObjectsPool(), and adjusted counter-op naming to CounterInc.
Extended unit tests for sync behavior
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
Added comprehensive tests for partial-sync merging, tombstone replacement, error/logging paths for unsupported partials, multi-message merging, and endSync/buffering scenarios.
Minor comment update
liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt
Updated a spec reference in a comment from RTO5b → RTO5f (no behavioral change).

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐇 I hopped the pool and changed its name,
I joined the pieces into one new frame,
When tombstones came I swapped with care,
Cleared the pool, then whispered endSync there.
🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 36.36% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat: partial sync implementation' clearly and specifically describes the main feature being implemented, aligning with the PR objectives of adding partial sync functionality.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch AIT-208/partial-sync
📝 Coding Plan
  • Generate coding plan for human review comments

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions github-actions bot temporarily deployed to staging/pull/1197/features March 11, 2026 10:53 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1197/javadoc March 11, 2026 10:55 Inactive
@sacOO7 sacOO7 requested a review from Copilot March 11, 2026 10:57
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Implements partial OBJECT_SYNC handling for Live Objects by merging multiple sync messages for the same objectId (instead of overwriting), while also renaming the internal sync pool for clearer, standardized naming.

Changes:

  • Rename syncObjectsDataPool to syncObjectsPool (and corresponding accessor/clear methods) across implementation and tests.
  • Update sync processing to merge partial sync map messages for the same objectId, with special handling for tombstones and unsupported partial types.
  • Add unit tests covering partial sync merging/tombstone/skip paths and update test helpers accordingly.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt Renames sync pool and adds partial-sync merge logic for repeated objectIds during OBJECT_SYNC.
liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt Updates state-change cleanup to call the renamed sync-pool clearing method.
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt Adds new tests for partial sync merge/tombstone/skip logging behavior.
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt Updates tests to use the renamed sync-pool APIs/accessors.
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt Updates the test-only private-field accessor to the renamed sync pool.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt (1)

660-690: Cover merged siteTimeserials, not just merged entries.

This test still passes if the second fragment's site-serial metadata is dropped during the merge, even though that metadata is what the runtime uses for post-sync ordering.

🧪 Suggested coverage
-        siteTimeserials = mapOf("site1" to "serial2"),
+        siteTimeserials = mapOf("site2" to "serial2"),
         map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key2" to ObjectsMapEntry(data = ObjectData(string = "value2"))))
       )
     )

     objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:")

     val liveMap = defaultRealtimeObjects.objectsPool.get("map:test@1") as DefaultLiveMap
     assertNotNull(liveMap.data["key1"], "key1 should be present after merge")
     assertNotNull(liveMap.data["key2"], "key2 should be present after merge")
     assertEquals("value1", liveMap.data["key1"]?.data?.string)
     assertEquals("value2", liveMap.data["key2"]?.data?.string)
+    assertEquals("serial1", liveMap.siteTimeserials["site1"])
+    assertEquals("serial2", liveMap.siteTimeserials["site2"])
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 263-276: When merging partial map syncs in ObjectsManager (the
when branch handling objectState.map), also merge the siteTimeserials from the
existing fragment and the incoming fragment instead of keeping the first
fragment's value; update the mergedMap creation to combine
existingState.map?.siteTimeserials and objectState.map.siteTimeserials (e.g.,
concatenate then deduplicate/preserve order as appropriate) and set that on
mergedMap before building mergedState and writing syncObjectsPool[objectId] =
existingEntry.copy(objectState = mergedState).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 254bfbfc-c643-471d-9f08-aedfae574373

📥 Commits

Reviewing files that changed from the base of the PR and between 74bf748 and 95135d5.

📒 Files selected for processing (5)
  • liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt

Base automatically changed from AIT-208/protocol-v6 to main March 11, 2026 12:35
@ttypic ttypic force-pushed the AIT-208/partial-sync branch from 95135d5 to 872a6f6 Compare March 11, 2026 12:39
@github-actions github-actions bot temporarily deployed to staging/pull/1197/features March 11, 2026 12:40 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1197/javadoc March 11, 2026 12:42 Inactive
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt (1)

270-274: ⚠️ Potential issue | 🟠 Major

Merge siteTimeserials when coalescing partial map sync fragments.

existingState.copy(map = mergedMap) keeps only the first fragment's siteTimeserials. If later partials carry newer per-site serials, applyObjectSync() seeds the live object with stale ordering metadata and can misorder or incorrectly dedupe subsequent ops.

💡 Proposed fix
             val existingState = existingEntry.objectState!! // non-null for existing entry
             val mergedEntries = existingState.map?.entries.orEmpty() + objectState.map.entries.orEmpty()
             val mergedMap = (existingState.map ?: ObjectsMap()).copy(entries = mergedEntries)
-            val mergedState = existingState.copy(map = mergedMap)
+            val mergedState = existingState.copy(
+              siteTimeserials = existingState.siteTimeserials + objectState.siteTimeserials,
+              map = mergedMap,
+            )
             syncObjectsPool[objectId] = existingEntry.copy(objectState = mergedState)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around
lines 270 - 274, When coalescing partial map sync fragments we currently only
take entries from both fragments but keep the first fragment's per-site serials,
causing stale siteTimeserials to seed live objects; update the merge to combine
siteTimeserials from existingState.map and objectState.map by merging their maps
and selecting the newer/higher serial per site (e.g., max value per key), then
build mergedMap using ObjectsMap().copy(entries = mergedEntries, siteTimeserials
= mergedSiteTimeserials) and assign mergedState = existingState.copy(map =
mergedMap) before writing back to syncObjectsPool[objectId]; ensure you
reference existingState, objectState, mergedEntries, mergedMap and
applyObjectSync remains compatible with the merged siteTimeserials.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 77-83: The problem is that startNewSync reassigns
syncCompletionWaiter unconditionally, which leaves any applyAckResult() awaiting
the previous CompletableDeferred stranded when a server-initiated OBJECT_SYNC
begins; to fix, in ObjectsManager.startNewSync do not replace an existing active
syncCompletionWaiter—only create a new CompletableDeferred when there is no
current waiter or the existing one is already completed/consumed; keep using the
same syncCompletionWaiter across resyncs so applyAckResult() can resume/fail
correctly and continue to clear syncObjectsPool and set currentSyncId/state as
before.

---

Duplicate comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 270-274: When coalescing partial map sync fragments we currently
only take entries from both fragments but keep the first fragment's per-site
serials, causing stale siteTimeserials to seed live objects; update the merge to
combine siteTimeserials from existingState.map and objectState.map by merging
their maps and selecting the newer/higher serial per site (e.g., max value per
key), then build mergedMap using ObjectsMap().copy(entries = mergedEntries,
siteTimeserials = mergedSiteTimeserials) and assign mergedState =
existingState.copy(map = mergedMap) before writing back to
syncObjectsPool[objectId]; ensure you reference existingState, objectState,
mergedEntries, mergedMap and applyObjectSync remains compatible with the merged
siteTimeserials.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: cdab0f9e-90f7-424e-97bc-6a7c001ea7f8

📥 Commits

Reviewing files that changed from the base of the PR and between 95135d5 and 872a6f6.

📒 Files selected for processing (5)
  • liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
🚧 Files skipped from review as they are similar to previous changes (1)
  • liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt

@ttypic ttypic force-pushed the AIT-208/partial-sync branch from 872a6f6 to aa8f205 Compare March 11, 2026 12:47
@github-actions github-actions bot temporarily deployed to staging/pull/1197/features March 11, 2026 12:47 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1197/javadoc March 11, 2026 12:50 Inactive
@ttypic ttypic force-pushed the AIT-208/partial-sync branch from aa8f205 to 92ed8eb Compare March 11, 2026 12:54
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt (2)

270-274: ⚠️ Potential issue | 🟠 Major

Merge siteTimeserials when coalescing partial map syncs.

ObjectState in liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt:264-304 treats siteTimeserials as part of the snapshot, but Lines 270-274 only merge map entries. Any site serials carried by later fragments are lost, so the synced object is seeded with stale ordering/dedup metadata.

💡 Suggested fix
-            val mergedMap = (existingState.map ?: ObjectsMap()).copy(entries = mergedEntries)
-            val mergedState = existingState.copy(map = mergedMap)
+            val mergedMap = (existingState.map ?: ObjectsMap()).copy(
+              semantics = objectState.map.semantics ?: existingState.map?.semantics,
+              entries = mergedEntries,
+            )
+            val mergedState = existingState.copy(
+              siteTimeserials = existingState.siteTimeserials + objectState.siteTimeserials,
+              map = mergedMap,
+            )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around
lines 270 - 274, The merge currently only combines map.entries and discards
siteTimeserials; update the coalescing logic around
existingEntry/existingState/objectState so you also merge siteTimeserials (e.g.,
combine existingState.siteTimeserials with objectState.siteTimeserials,
preferring the newer fragment's values where keys collide), then include that
merged siteTimeserials in mergedState (the copy used to update
syncObjectsPool[objectId]) so ordering/dedup metadata is preserved.

80-82: ⚠️ Potential issue | 🔴 Critical

Don't replace an active syncCompletionWaiter.

If a server-initiated resync starts while applyAckResult() is already awaiting the current deferred on Lines 109-113, this assignment strands those coroutines on the old instance and their publishes never resume or fail. Reuse the existing active waiter and only allocate a new one when there isn't one.

💡 Suggested fix
-    syncCompletionWaiter = CompletableDeferred()
+    if (syncCompletionWaiter?.isActive != true) {
+      syncCompletionWaiter = CompletableDeferred()
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around
lines 80 - 82, When starting a resync, do not unconditionally replace the active
CompletableDeferred; instead check the current syncCompletionWaiter used by
applyAckResult() and only allocate a new CompletableDeferred when there is no
existing waiter or the existing one is completed (e.g. if (syncCompletionWaiter
== null || syncCompletionWaiter!!.isCompleted) { syncCompletionWaiter =
CompletableDeferred() }), so coroutines awaiting the old waiter in
applyAckResult() are not stranded; keep the existing syncCompletionWaiter
otherwise while still clearing syncObjectsPool and updating currentSyncId.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt`:
- Around line 371-373: The buffered increment in the test is currently a no-op
because CounterInc(number = 0.0) is queued; update the queued CounterInc to use
the expected delta (CounterInc(number = 3.0)) so the post-sync value moves from
5.0 to 8.0 and the replay is +3.0. Locate the queued operation using the symbol
CounterInc (and the surrounding ObjectOperationAction.CounterInc / objectId
"counter:test@1") and change its number to 3.0; apply the same fix to the second
occurrence referenced around the later expectations so both buffered increments
match the assertions.

---

Duplicate comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 270-274: The merge currently only combines map.entries and
discards siteTimeserials; update the coalescing logic around
existingEntry/existingState/objectState so you also merge siteTimeserials (e.g.,
combine existingState.siteTimeserials with objectState.siteTimeserials,
preferring the newer fragment's values where keys collide), then include that
merged siteTimeserials in mergedState (the copy used to update
syncObjectsPool[objectId]) so ordering/dedup metadata is preserved.
- Around line 80-82: When starting a resync, do not unconditionally replace the
active CompletableDeferred; instead check the current syncCompletionWaiter used
by applyAckResult() and only allocate a new CompletableDeferred when there is no
existing waiter or the existing one is completed (e.g. if (syncCompletionWaiter
== null || syncCompletionWaiter!!.isCompleted) { syncCompletionWaiter =
CompletableDeferred() }), so coroutines awaiting the old waiter in
applyAckResult() are not stranded; keep the existing syncCompletionWaiter
otherwise while still clearing syncObjectsPool and updating currentSyncId.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 52afa433-a7cb-4bd8-84b9-aa8a063da076

📥 Commits

Reviewing files that changed from the base of the PR and between 872a6f6 and aa8f205.

📒 Files selected for processing (5)
  • liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
🚧 Files skipped from review as they are similar to previous changes (2)
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt

@github-actions github-actions bot temporarily deployed to staging/pull/1197/features March 11, 2026 12:54 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1197/javadoc March 11, 2026 12:56 Inactive
- rename `syncObjectsDataPool` to `syncObjectsPool`
- Standardized naming across `ObjectsManager` to improve code clarity.
- Updated references in implementation and tests.
@ttypic ttypic force-pushed the AIT-208/partial-sync branch from 92ed8eb to f430594 Compare March 11, 2026 14:42
@github-actions github-actions bot temporarily deployed to staging/pull/1197/features March 11, 2026 14:43 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1197/javadoc March 11, 2026 14:45 Inactive
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (2)
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt (2)

270-274: ⚠️ Potential issue | 🟠 Major

Merge siteTimeserials when combining partial map fragments.

This merge keeps the first fragment's siteTimeserials and drops later fragments' metadata. applyObjectSync() then seeds the live map from stale per-site serials, which can break ordering/dedup after sync.

🔧 Proposed fix
             val existingState = existingEntry.objectState!! // non-null for existing entry
             val mergedEntries = existingState.map?.entries.orEmpty() + objectState.map.entries.orEmpty()
-            val mergedMap = (existingState.map ?: ObjectsMap()).copy(entries = mergedEntries)
-            val mergedState = existingState.copy(map = mergedMap)
+            val mergedMap = (existingState.map ?: ObjectsMap()).copy(
+              semantics = objectState.map.semantics ?: existingState.map?.semantics,
+              entries = mergedEntries,
+            )
+            val mergedState = existingState.copy(
+              siteTimeserials = existingState.siteTimeserials + objectState.siteTimeserials,
+              map = mergedMap,
+            )
             syncObjectsPool[objectId] = existingEntry.copy(objectState = mergedState)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around
lines 270 - 274, When merging partial map fragments in ObjectsManager, also
merge per-site serial metadata instead of keeping only the first fragment's
siteTimeserials: in the block using existingState, objectState and building
mergedMap (referencing existingState.map, objectState.map, ObjectsMap,
mergedEntries and syncObjectsPool[objectId] = existingEntry.copy(...)), combine
siteTimeserials from both maps into mergedMap.siteTimeserials by merging keys
and taking the highest serial per site (or otherwise resolving duplicates
deterministically) so applyObjectSync seeds the live map with the correct
per-site serials and preserves ordering/dedup semantics.

80-82: ⚠️ Potential issue | 🟠 Major

Preserve the active sync waiter across resyncs.

Line 82 replaces any in-flight syncCompletionWaiter. If a new OBJECT_SYNC starts while applyAckResult() is already awaiting the old deferred, that caller never resumes or fails.

🔧 Proposed fix
 internal fun startNewSync(syncId: String?) {
   Log.v(tag, "Starting new sync sequence: syncId=$syncId")

   syncObjectsPool.clear() // RTO5a2a
   currentSyncId = syncId
-  syncCompletionWaiter = CompletableDeferred()
+  if (syncCompletionWaiter?.isActive != true) {
+    syncCompletionWaiter = CompletableDeferred()
+  }
   stateChange(ObjectsState.Syncing)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around
lines 80 - 82, The resync logic in ObjectsManager (the lines calling
syncObjectsPool.clear(), setting currentSyncId, and assigning
syncCompletionWaiter) overwrites any existing in-flight syncCompletionWaiter and
can leave callers of applyAckResult() hanging; change the assignment so you only
create/replace syncCompletionWaiter when there is no active unfinished deferred
(i.e., if syncCompletionWaiter is null or already completed), otherwise preserve
the existing Deferred so applyAckResult() can resume; reference the symbols
syncObjectsPool.clear(), currentSyncId, syncCompletionWaiter, and
applyAckResult() when making the conditional creation/assignment.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 270-274: When merging partial map fragments in ObjectsManager,
also merge per-site serial metadata instead of keeping only the first fragment's
siteTimeserials: in the block using existingState, objectState and building
mergedMap (referencing existingState.map, objectState.map, ObjectsMap,
mergedEntries and syncObjectsPool[objectId] = existingEntry.copy(...)), combine
siteTimeserials from both maps into mergedMap.siteTimeserials by merging keys
and taking the highest serial per site (or otherwise resolving duplicates
deterministically) so applyObjectSync seeds the live map with the correct
per-site serials and preserves ordering/dedup semantics.
- Around line 80-82: The resync logic in ObjectsManager (the lines calling
syncObjectsPool.clear(), setting currentSyncId, and assigning
syncCompletionWaiter) overwrites any existing in-flight syncCompletionWaiter and
can leave callers of applyAckResult() hanging; change the assignment so you only
create/replace syncCompletionWaiter when there is no active unfinished deferred
(i.e., if syncCompletionWaiter is null or already completed), otherwise preserve
the existing Deferred so applyAckResult() can resume; reference the symbols
syncObjectsPool.clear(), currentSyncId, syncCompletionWaiter, and
applyAckResult() when making the conditional creation/assignment.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a8f91d7b-0bbb-49b4-a27e-f6a77b23d37f

📥 Commits

Reviewing files that changed from the base of the PR and between aa8f205 and f430594.

📒 Files selected for processing (6)
  • liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
✅ Files skipped from review due to trivial changes (1)
  • liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated no new comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

- Updated `createObjectFromState` to skip unsupported object types instead of throwing errors.
- Improved logging to warn about unsupported object types.
- Enhanced test coverage for handling unsupported object sync scenarios.
- Merged partial sync map entries seamlessly across multiple protocol messages.
Copy link
Collaborator

@sacOO7 sacOO7 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing mentioned changes 👍

Copy link
Collaborator

@sacOO7 sacOO7 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt (2)

77-83: ⚠️ Potential issue | 🟠 Major

Reuse the active sync waiter across resyncs.

Line 82 always replaces syncCompletionWaiter. If applyAckResult() is already suspended when a new OBJECT_SYNC starts, that coroutine stays parked on the old deferred forever because endSync() only completes the replacement.

🔁 Suggested fix
 internal fun startNewSync(syncId: String?) {
   Log.v(tag, "Starting new sync sequence: syncId=$syncId")

   syncObjectsPool.clear() // RTO5a2a
   currentSyncId = syncId
-  syncCompletionWaiter = CompletableDeferred()
+  if (syncCompletionWaiter?.isActive != true) {
+    syncCompletionWaiter = CompletableDeferred()
+  }
   stateChange(ObjectsState.Syncing)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around
lines 77 - 83, startNewSync currently unconditionally replaces
syncCompletionWaiter which leaves coroutines suspended on the old deferred
forever; change startNewSync in ObjectsManager so it only creates a new
CompletableDeferred when there is no active waiter (e.g. if syncCompletionWaiter
is null or syncCompletionWaiter.isCompleted), otherwise reuse the existing
syncCompletionWaiter so applyAckResult awaiting the previous deferred can
complete; reference startNewSync, syncCompletionWaiter, applyAckResult and
endSync when making this change and ensure endSync still completes the existing
deferred (instead of completing a replaced one).

270-274: ⚠️ Potential issue | 🟠 Major

Merge the rest of the object state, not just map.entries.

Line 273 keeps the first fragment’s siteTimeserials and createOp while only appending entries. That can leave stale post-sync ordering metadata and drop create-state payload that ObjectState.map does not carry.

🧩 Suggested fix
             val existingState = existingEntry.objectState!! // non-null for existing entry
             val mergedEntries = existingState.map?.entries.orEmpty() + objectState.map.entries.orEmpty()
-            val mergedMap = (existingState.map ?: ObjectsMap()).copy(entries = mergedEntries)
-            val mergedState = existingState.copy(map = mergedMap)
+            val mergedMap = (existingState.map ?: ObjectsMap()).copy(
+              semantics = objectState.map.semantics ?: existingState.map?.semantics,
+              entries = mergedEntries,
+            )
+            val mergedState = existingState.copy(
+              siteTimeserials = existingState.siteTimeserials + objectState.siteTimeserials,
+              createOp = existingState.createOp ?: objectState.createOp,
+              map = mergedMap,
+            )
             syncObjectsPool[objectId] = existingEntry.copy(objectState = mergedState)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around
lines 270 - 274, The current merge in ObjectsManager only appends map.entries
and reuses existingState for all other fields, which preserves stale
siteTimeserials/createOp and can drop create-state payload; update the merge so
you construct mergedState from existingState but explicitly merge/replace the
other ObjectState fields from objectState (e.g., siteTimeserials, createOp and
any payload/create-state fields) while keeping the mergedMap for the map field —
for example, build mergedMap from existingState.map and objectState.map entries,
then create mergedState = existingState.copy(map = mergedMap, siteTimeserials =
objectState.siteTimeserials ?: existingState.siteTimeserials, createOp =
objectState.createOp ?: existingState.createOp, ... ) and store that into
syncObjectsPool[objectId] instead of only replacing map.entries.
🧹 Nitpick comments (1)
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt (1)

660-690: Assert merged sync metadata in the partial-map tests.

These cases only prove that the keys were merged. They still pass when later fragments’ siteTimeserials or createOp are discarded, so the partial-sync regression isn’t pinned down by the new coverage.

Also applies to: 693-765

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt`
around lines 660 - 690, The test `(RTO5f2a2) partial sync map entries are merged
across two messages with the same objectId` currently only asserts keys exist;
update it to also assert that the merged sync metadata is preserved: after
calling ObjectsManager.handleObjectSyncMessages, fetch the DefaultLiveMap for
"map:test@1" and assert its combined/site-level siteTimeserials include both
"site1"->"serial1" and "site1"->"serial2" as appropriate (or that the resulting
siteTimeserial reflects the later serial), and assert any per-entry
createOp/metadata on liveMap.data["key1"] and liveMap.data["key2"] match the
createOp values from msg1 and msg2 respectively; apply the same additional
metadata assertions to the other partial-map tests noted (lines 693-765).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 262-275: The code currently merges an incoming map partial into an
existing pooled entry without checking the existing entry's type; update the
merge branch in ObjectsManager where syncObjectsPool[objectId] is handled to
first inspect existingEntry.objectState (and existingEntry.objectState?.counter)
and if the existing pooled fragment is a counter, reject or ignore the incoming
map partial instead of merging (preserve the tombstone replace behavior when
objectState.tombstone is true); ensure you use the same symbols
(syncObjectsPool, existingEntry, objectState, ObjectState, applySync) so the
pooled entry never ends up with both counter and map set.

---

Duplicate comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 77-83: startNewSync currently unconditionally replaces
syncCompletionWaiter which leaves coroutines suspended on the old deferred
forever; change startNewSync in ObjectsManager so it only creates a new
CompletableDeferred when there is no active waiter (e.g. if syncCompletionWaiter
is null or syncCompletionWaiter.isCompleted), otherwise reuse the existing
syncCompletionWaiter so applyAckResult awaiting the previous deferred can
complete; reference startNewSync, syncCompletionWaiter, applyAckResult and
endSync when making this change and ensure endSync still completes the existing
deferred (instead of completing a replaced one).
- Around line 270-274: The current merge in ObjectsManager only appends
map.entries and reuses existingState for all other fields, which preserves stale
siteTimeserials/createOp and can drop create-state payload; update the merge so
you construct mergedState from existingState but explicitly merge/replace the
other ObjectState fields from objectState (e.g., siteTimeserials, createOp and
any payload/create-state fields) while keeping the mergedMap for the map field —
for example, build mergedMap from existingState.map and objectState.map entries,
then create mergedState = existingState.copy(map = mergedMap, siteTimeserials =
objectState.siteTimeserials ?: existingState.siteTimeserials, createOp =
objectState.createOp ?: existingState.createOp, ... ) and store that into
syncObjectsPool[objectId] instead of only replacing map.entries.

---

Nitpick comments:
In
`@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt`:
- Around line 660-690: The test `(RTO5f2a2) partial sync map entries are merged
across two messages with the same objectId` currently only asserts keys exist;
update it to also assert that the merged sync metadata is preserved: after
calling ObjectsManager.handleObjectSyncMessages, fetch the DefaultLiveMap for
"map:test@1" and assert its combined/site-level siteTimeserials include both
"site1"->"serial1" and "site1"->"serial2" as appropriate (or that the resulting
siteTimeserial reflects the later serial), and assert any per-entry
createOp/metadata on liveMap.data["key1"] and liveMap.data["key2"] match the
createOp values from msg1 and msg2 respectively; apply the same additional
metadata assertions to the other partial-map tests noted (lines 693-765).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 3fe8bc55-a3a0-4496-9219-7903ec1845cd

📥 Commits

Reviewing files that changed from the base of the PR and between f430594 and f8555a9.

📒 Files selected for processing (2)
  • liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt

@ttypic ttypic merged commit 161d6ec into main Mar 12, 2026
14 of 15 checks passed
@ttypic ttypic deleted the AIT-208/partial-sync branch March 12, 2026 19:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

3 participants