[AIT-208] feat: partial sync implementation#1197
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughRenamed internal sync pool and API ( Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
📝 Coding Plan
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
Implements partial OBJECT_SYNC handling for Live Objects by merging multiple sync messages for the same objectId (instead of overwriting), while also renaming the internal sync pool for clearer, standardized naming.
Changes:
- Rename
syncObjectsDataPooltosyncObjectsPool(and corresponding accessor/clear methods) across implementation and tests. - Update sync processing to merge partial sync map messages for the same
objectId, with special handling for tombstones and unsupported partial types. - Add unit tests covering partial sync merging/tombstone/skip paths and update test helpers accordingly.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt | Renames sync pool and adds partial-sync merge logic for repeated objectIds during OBJECT_SYNC. |
| liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt | Updates state-change cleanup to call the renamed sync-pool clearing method. |
| liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt | Adds new tests for partial sync merge/tombstone/skip logging behavior. |
| liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt | Updates tests to use the renamed sync-pool APIs/accessors. |
| liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt | Updates the test-only private-field accessor to the renamed sync pool. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
Show resolved
Hide resolved
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt (1)
660-690: Cover mergedsiteTimeserials, not just merged entries.This test still passes if the second fragment's site-serial metadata is dropped during the merge, even though that metadata is what the runtime uses for post-sync ordering.
🧪 Suggested coverage
- siteTimeserials = mapOf("site1" to "serial2"), + siteTimeserials = mapOf("site2" to "serial2"), map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key2" to ObjectsMapEntry(data = ObjectData(string = "value2")))) ) ) objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:") val liveMap = defaultRealtimeObjects.objectsPool.get("map:test@1") as DefaultLiveMap assertNotNull(liveMap.data["key1"], "key1 should be present after merge") assertNotNull(liveMap.data["key2"], "key2 should be present after merge") assertEquals("value1", liveMap.data["key1"]?.data?.string) assertEquals("value2", liveMap.data["key2"]?.data?.string) + assertEquals("serial1", liveMap.siteTimeserials["site1"]) + assertEquals("serial2", liveMap.siteTimeserials["site2"])
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 263-276: When merging partial map syncs in ObjectsManager (the
when branch handling objectState.map), also merge the siteTimeserials from the
existing fragment and the incoming fragment instead of keeping the first
fragment's value; update the mergedMap creation to combine
existingState.map?.siteTimeserials and objectState.map.siteTimeserials (e.g.,
concatenate then deduplicate/preserve order as appropriate) and set that on
mergedMap before building mergedState and writing syncObjectsPool[objectId] =
existingEntry.copy(objectState = mergedState).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 254bfbfc-c643-471d-9f08-aedfae574373
📒 Files selected for processing (5)
liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.ktliveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
95135d5 to
872a6f6
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt (1)
270-274:⚠️ Potential issue | 🟠 MajorMerge
siteTimeserialswhen coalescing partial map sync fragments.
existingState.copy(map = mergedMap)keeps only the first fragment'ssiteTimeserials. If later partials carry newer per-site serials,applyObjectSync()seeds the live object with stale ordering metadata and can misorder or incorrectly dedupe subsequent ops.💡 Proposed fix
val existingState = existingEntry.objectState!! // non-null for existing entry val mergedEntries = existingState.map?.entries.orEmpty() + objectState.map.entries.orEmpty() val mergedMap = (existingState.map ?: ObjectsMap()).copy(entries = mergedEntries) - val mergedState = existingState.copy(map = mergedMap) + val mergedState = existingState.copy( + siteTimeserials = existingState.siteTimeserials + objectState.siteTimeserials, + map = mergedMap, + ) syncObjectsPool[objectId] = existingEntry.copy(objectState = mergedState)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around lines 270 - 274, When coalescing partial map sync fragments we currently only take entries from both fragments but keep the first fragment's per-site serials, causing stale siteTimeserials to seed live objects; update the merge to combine siteTimeserials from existingState.map and objectState.map by merging their maps and selecting the newer/higher serial per site (e.g., max value per key), then build mergedMap using ObjectsMap().copy(entries = mergedEntries, siteTimeserials = mergedSiteTimeserials) and assign mergedState = existingState.copy(map = mergedMap) before writing back to syncObjectsPool[objectId]; ensure you reference existingState, objectState, mergedEntries, mergedMap and applyObjectSync remains compatible with the merged siteTimeserials.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 77-83: The problem is that startNewSync reassigns
syncCompletionWaiter unconditionally, which leaves any applyAckResult() awaiting
the previous CompletableDeferred stranded when a server-initiated OBJECT_SYNC
begins; to fix, in ObjectsManager.startNewSync do not replace an existing active
syncCompletionWaiter—only create a new CompletableDeferred when there is no
current waiter or the existing one is already completed/consumed; keep using the
same syncCompletionWaiter across resyncs so applyAckResult() can resume/fail
correctly and continue to clear syncObjectsPool and set currentSyncId/state as
before.
---
Duplicate comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 270-274: When coalescing partial map sync fragments we currently
only take entries from both fragments but keep the first fragment's per-site
serials, causing stale siteTimeserials to seed live objects; update the merge to
combine siteTimeserials from existingState.map and objectState.map by merging
their maps and selecting the newer/higher serial per site (e.g., max value per
key), then build mergedMap using ObjectsMap().copy(entries = mergedEntries,
siteTimeserials = mergedSiteTimeserials) and assign mergedState =
existingState.copy(map = mergedMap) before writing back to
syncObjectsPool[objectId]; ensure you reference existingState, objectState,
mergedEntries, mergedMap and applyObjectSync remains compatible with the merged
siteTimeserials.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: cdab0f9e-90f7-424e-97bc-6a7c001ea7f8
📒 Files selected for processing (5)
liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.ktliveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
🚧 Files skipped from review as they are similar to previous changes (1)
- liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
872a6f6 to
aa8f205
Compare
aa8f205 to
92ed8eb
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (2)
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt (2)
270-274:⚠️ Potential issue | 🟠 MajorMerge
siteTimeserialswhen coalescing partial map syncs.
ObjectStateinliveobjects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt:264-304treatssiteTimeserialsas part of the snapshot, but Lines 270-274 only merge map entries. Any site serials carried by later fragments are lost, so the synced object is seeded with stale ordering/dedup metadata.💡 Suggested fix
- val mergedMap = (existingState.map ?: ObjectsMap()).copy(entries = mergedEntries) - val mergedState = existingState.copy(map = mergedMap) + val mergedMap = (existingState.map ?: ObjectsMap()).copy( + semantics = objectState.map.semantics ?: existingState.map?.semantics, + entries = mergedEntries, + ) + val mergedState = existingState.copy( + siteTimeserials = existingState.siteTimeserials + objectState.siteTimeserials, + map = mergedMap, + )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around lines 270 - 274, The merge currently only combines map.entries and discards siteTimeserials; update the coalescing logic around existingEntry/existingState/objectState so you also merge siteTimeserials (e.g., combine existingState.siteTimeserials with objectState.siteTimeserials, preferring the newer fragment's values where keys collide), then include that merged siteTimeserials in mergedState (the copy used to update syncObjectsPool[objectId]) so ordering/dedup metadata is preserved.
80-82:⚠️ Potential issue | 🔴 CriticalDon't replace an active
syncCompletionWaiter.If a server-initiated resync starts while
applyAckResult()is already awaiting the current deferred on Lines 109-113, this assignment strands those coroutines on the old instance and their publishes never resume or fail. Reuse the existing active waiter and only allocate a new one when there isn't one.💡 Suggested fix
- syncCompletionWaiter = CompletableDeferred() + if (syncCompletionWaiter?.isActive != true) { + syncCompletionWaiter = CompletableDeferred() + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around lines 80 - 82, When starting a resync, do not unconditionally replace the active CompletableDeferred; instead check the current syncCompletionWaiter used by applyAckResult() and only allocate a new CompletableDeferred when there is no existing waiter or the existing one is completed (e.g. if (syncCompletionWaiter == null || syncCompletionWaiter!!.isCompleted) { syncCompletionWaiter = CompletableDeferred() }), so coroutines awaiting the old waiter in applyAckResult() are not stranded; keep the existing syncCompletionWaiter otherwise while still clearing syncObjectsPool and updating currentSyncId.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt`:
- Around line 371-373: The buffered increment in the test is currently a no-op
because CounterInc(number = 0.0) is queued; update the queued CounterInc to use
the expected delta (CounterInc(number = 3.0)) so the post-sync value moves from
5.0 to 8.0 and the replay is +3.0. Locate the queued operation using the symbol
CounterInc (and the surrounding ObjectOperationAction.CounterInc / objectId
"counter:test@1") and change its number to 3.0; apply the same fix to the second
occurrence referenced around the later expectations so both buffered increments
match the assertions.
---
Duplicate comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 270-274: The merge currently only combines map.entries and
discards siteTimeserials; update the coalescing logic around
existingEntry/existingState/objectState so you also merge siteTimeserials (e.g.,
combine existingState.siteTimeserials with objectState.siteTimeserials,
preferring the newer fragment's values where keys collide), then include that
merged siteTimeserials in mergedState (the copy used to update
syncObjectsPool[objectId]) so ordering/dedup metadata is preserved.
- Around line 80-82: When starting a resync, do not unconditionally replace the
active CompletableDeferred; instead check the current syncCompletionWaiter used
by applyAckResult() and only allocate a new CompletableDeferred when there is no
existing waiter or the existing one is completed (e.g. if (syncCompletionWaiter
== null || syncCompletionWaiter!!.isCompleted) { syncCompletionWaiter =
CompletableDeferred() }), so coroutines awaiting the old waiter in
applyAckResult() are not stranded; keep the existing syncCompletionWaiter
otherwise while still clearing syncObjectsPool and updating currentSyncId.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 52afa433-a7cb-4bd8-84b9-aa8a063da076
📒 Files selected for processing (5)
liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.ktliveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
🚧 Files skipped from review as they are similar to previous changes (2)
- liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
- liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt
Outdated
Show resolved
Hide resolved
- rename `syncObjectsDataPool` to `syncObjectsPool` - Standardized naming across `ObjectsManager` to improve code clarity. - Updated references in implementation and tests.
92ed8eb to
f430594
Compare
There was a problem hiding this comment.
♻️ Duplicate comments (2)
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt (2)
270-274:⚠️ Potential issue | 🟠 MajorMerge
siteTimeserialswhen combining partial map fragments.This merge keeps the first fragment's
siteTimeserialsand drops later fragments' metadata.applyObjectSync()then seeds the live map from stale per-site serials, which can break ordering/dedup after sync.🔧 Proposed fix
val existingState = existingEntry.objectState!! // non-null for existing entry val mergedEntries = existingState.map?.entries.orEmpty() + objectState.map.entries.orEmpty() - val mergedMap = (existingState.map ?: ObjectsMap()).copy(entries = mergedEntries) - val mergedState = existingState.copy(map = mergedMap) + val mergedMap = (existingState.map ?: ObjectsMap()).copy( + semantics = objectState.map.semantics ?: existingState.map?.semantics, + entries = mergedEntries, + ) + val mergedState = existingState.copy( + siteTimeserials = existingState.siteTimeserials + objectState.siteTimeserials, + map = mergedMap, + ) syncObjectsPool[objectId] = existingEntry.copy(objectState = mergedState)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around lines 270 - 274, When merging partial map fragments in ObjectsManager, also merge per-site serial metadata instead of keeping only the first fragment's siteTimeserials: in the block using existingState, objectState and building mergedMap (referencing existingState.map, objectState.map, ObjectsMap, mergedEntries and syncObjectsPool[objectId] = existingEntry.copy(...)), combine siteTimeserials from both maps into mergedMap.siteTimeserials by merging keys and taking the highest serial per site (or otherwise resolving duplicates deterministically) so applyObjectSync seeds the live map with the correct per-site serials and preserves ordering/dedup semantics.
80-82:⚠️ Potential issue | 🟠 MajorPreserve the active sync waiter across resyncs.
Line 82 replaces any in-flight
syncCompletionWaiter. If a newOBJECT_SYNCstarts whileapplyAckResult()is already awaiting the old deferred, that caller never resumes or fails.🔧 Proposed fix
internal fun startNewSync(syncId: String?) { Log.v(tag, "Starting new sync sequence: syncId=$syncId") syncObjectsPool.clear() // RTO5a2a currentSyncId = syncId - syncCompletionWaiter = CompletableDeferred() + if (syncCompletionWaiter?.isActive != true) { + syncCompletionWaiter = CompletableDeferred() + } stateChange(ObjectsState.Syncing) }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around lines 80 - 82, The resync logic in ObjectsManager (the lines calling syncObjectsPool.clear(), setting currentSyncId, and assigning syncCompletionWaiter) overwrites any existing in-flight syncCompletionWaiter and can leave callers of applyAckResult() hanging; change the assignment so you only create/replace syncCompletionWaiter when there is no active unfinished deferred (i.e., if syncCompletionWaiter is null or already completed), otherwise preserve the existing Deferred so applyAckResult() can resume; reference the symbols syncObjectsPool.clear(), currentSyncId, syncCompletionWaiter, and applyAckResult() when making the conditional creation/assignment.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 270-274: When merging partial map fragments in ObjectsManager,
also merge per-site serial metadata instead of keeping only the first fragment's
siteTimeserials: in the block using existingState, objectState and building
mergedMap (referencing existingState.map, objectState.map, ObjectsMap,
mergedEntries and syncObjectsPool[objectId] = existingEntry.copy(...)), combine
siteTimeserials from both maps into mergedMap.siteTimeserials by merging keys
and taking the highest serial per site (or otherwise resolving duplicates
deterministically) so applyObjectSync seeds the live map with the correct
per-site serials and preserves ordering/dedup semantics.
- Around line 80-82: The resync logic in ObjectsManager (the lines calling
syncObjectsPool.clear(), setting currentSyncId, and assigning
syncCompletionWaiter) overwrites any existing in-flight syncCompletionWaiter and
can leave callers of applyAckResult() hanging; change the assignment so you only
create/replace syncCompletionWaiter when there is no active unfinished deferred
(i.e., if syncCompletionWaiter is null or already completed), otherwise preserve
the existing Deferred so applyAckResult() can resume; reference the symbols
syncObjectsPool.clear(), currentSyncId, syncCompletionWaiter, and
applyAckResult() when making the conditional creation/assignment.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a8f91d7b-0bbb-49b4-a27e-f6a77b23d37f
📒 Files selected for processing (6)
liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.ktliveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.ktliveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
✅ Files skipped from review due to trivial changes (1)
- liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- Updated `createObjectFromState` to skip unsupported object types instead of throwing errors. - Improved logging to warn about unsupported object types. - Enhanced test coverage for handling unsupported object sync scenarios. - Merged partial sync map entries seamlessly across multiple protocol messages.
sacOO7
left a comment
There was a problem hiding this comment.
Thanks for addressing mentioned changes 👍
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (2)
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt (2)
77-83:⚠️ Potential issue | 🟠 MajorReuse the active sync waiter across resyncs.
Line 82 always replaces
syncCompletionWaiter. IfapplyAckResult()is already suspended when a newOBJECT_SYNCstarts, that coroutine stays parked on the old deferred forever becauseendSync()only completes the replacement.🔁 Suggested fix
internal fun startNewSync(syncId: String?) { Log.v(tag, "Starting new sync sequence: syncId=$syncId") syncObjectsPool.clear() // RTO5a2a currentSyncId = syncId - syncCompletionWaiter = CompletableDeferred() + if (syncCompletionWaiter?.isActive != true) { + syncCompletionWaiter = CompletableDeferred() + } stateChange(ObjectsState.Syncing) }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around lines 77 - 83, startNewSync currently unconditionally replaces syncCompletionWaiter which leaves coroutines suspended on the old deferred forever; change startNewSync in ObjectsManager so it only creates a new CompletableDeferred when there is no active waiter (e.g. if syncCompletionWaiter is null or syncCompletionWaiter.isCompleted), otherwise reuse the existing syncCompletionWaiter so applyAckResult awaiting the previous deferred can complete; reference startNewSync, syncCompletionWaiter, applyAckResult and endSync when making this change and ensure endSync still completes the existing deferred (instead of completing a replaced one).
270-274:⚠️ Potential issue | 🟠 MajorMerge the rest of the object state, not just
map.entries.Line 273 keeps the first fragment’s
siteTimeserialsandcreateOpwhile only appending entries. That can leave stale post-sync ordering metadata and drop create-state payload thatObjectState.mapdoes not carry.🧩 Suggested fix
val existingState = existingEntry.objectState!! // non-null for existing entry val mergedEntries = existingState.map?.entries.orEmpty() + objectState.map.entries.orEmpty() - val mergedMap = (existingState.map ?: ObjectsMap()).copy(entries = mergedEntries) - val mergedState = existingState.copy(map = mergedMap) + val mergedMap = (existingState.map ?: ObjectsMap()).copy( + semantics = objectState.map.semantics ?: existingState.map?.semantics, + entries = mergedEntries, + ) + val mergedState = existingState.copy( + siteTimeserials = existingState.siteTimeserials + objectState.siteTimeserials, + createOp = existingState.createOp ?: objectState.createOp, + map = mergedMap, + ) syncObjectsPool[objectId] = existingEntry.copy(objectState = mergedState)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around lines 270 - 274, The current merge in ObjectsManager only appends map.entries and reuses existingState for all other fields, which preserves stale siteTimeserials/createOp and can drop create-state payload; update the merge so you construct mergedState from existingState but explicitly merge/replace the other ObjectState fields from objectState (e.g., siteTimeserials, createOp and any payload/create-state fields) while keeping the mergedMap for the map field — for example, build mergedMap from existingState.map and objectState.map entries, then create mergedState = existingState.copy(map = mergedMap, siteTimeserials = objectState.siteTimeserials ?: existingState.siteTimeserials, createOp = objectState.createOp ?: existingState.createOp, ... ) and store that into syncObjectsPool[objectId] instead of only replacing map.entries.
🧹 Nitpick comments (1)
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt (1)
660-690: Assert merged sync metadata in the partial-map tests.These cases only prove that the keys were merged. They still pass when later fragments’
siteTimeserialsorcreateOpare discarded, so the partial-sync regression isn’t pinned down by the new coverage.Also applies to: 693-765
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt` around lines 660 - 690, The test `(RTO5f2a2) partial sync map entries are merged across two messages with the same objectId` currently only asserts keys exist; update it to also assert that the merged sync metadata is preserved: after calling ObjectsManager.handleObjectSyncMessages, fetch the DefaultLiveMap for "map:test@1" and assert its combined/site-level siteTimeserials include both "site1"->"serial1" and "site1"->"serial2" as appropriate (or that the resulting siteTimeserial reflects the later serial), and assert any per-entry createOp/metadata on liveMap.data["key1"] and liveMap.data["key2"] match the createOp values from msg1 and msg2 respectively; apply the same additional metadata assertions to the other partial-map tests noted (lines 693-765).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 262-275: The code currently merges an incoming map partial into an
existing pooled entry without checking the existing entry's type; update the
merge branch in ObjectsManager where syncObjectsPool[objectId] is handled to
first inspect existingEntry.objectState (and existingEntry.objectState?.counter)
and if the existing pooled fragment is a counter, reject or ignore the incoming
map partial instead of merging (preserve the tombstone replace behavior when
objectState.tombstone is true); ensure you use the same symbols
(syncObjectsPool, existingEntry, objectState, ObjectState, applySync) so the
pooled entry never ends up with both counter and map set.
---
Duplicate comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 77-83: startNewSync currently unconditionally replaces
syncCompletionWaiter which leaves coroutines suspended on the old deferred
forever; change startNewSync in ObjectsManager so it only creates a new
CompletableDeferred when there is no active waiter (e.g. if syncCompletionWaiter
is null or syncCompletionWaiter.isCompleted), otherwise reuse the existing
syncCompletionWaiter so applyAckResult awaiting the previous deferred can
complete; reference startNewSync, syncCompletionWaiter, applyAckResult and
endSync when making this change and ensure endSync still completes the existing
deferred (instead of completing a replaced one).
- Around line 270-274: The current merge in ObjectsManager only appends
map.entries and reuses existingState for all other fields, which preserves stale
siteTimeserials/createOp and can drop create-state payload; update the merge so
you construct mergedState from existingState but explicitly merge/replace the
other ObjectState fields from objectState (e.g., siteTimeserials, createOp and
any payload/create-state fields) while keeping the mergedMap for the map field —
for example, build mergedMap from existingState.map and objectState.map entries,
then create mergedState = existingState.copy(map = mergedMap, siteTimeserials =
objectState.siteTimeserials ?: existingState.siteTimeserials, createOp =
objectState.createOp ?: existingState.createOp, ... ) and store that into
syncObjectsPool[objectId] instead of only replacing map.entries.
---
Nitpick comments:
In
`@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt`:
- Around line 660-690: The test `(RTO5f2a2) partial sync map entries are merged
across two messages with the same objectId` currently only asserts keys exist;
update it to also assert that the merged sync metadata is preserved: after
calling ObjectsManager.handleObjectSyncMessages, fetch the DefaultLiveMap for
"map:test@1" and assert its combined/site-level siteTimeserials include both
"site1"->"serial1" and "site1"->"serial2" as appropriate (or that the resulting
siteTimeserial reflects the later serial), and assert any per-entry
createOp/metadata on liveMap.data["key1"] and liveMap.data["key2"] match the
createOp values from msg1 and msg2 respectively; apply the same additional
metadata assertions to the other partial-map tests noted (lines 693-765).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 3fe8bc55-a3a0-4496-9219-7903ec1845cd
📒 Files selected for processing (2)
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.ktliveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
syncObjectsDataPooltosyncObjectsPoolObjectsManagerto improve code clarity.Spec: ably/specification#413
JS: ably/ably-js#2152
Summary by CodeRabbit
Bug Fixes
Tests