feat(tier-2): Lane A — SWAPDB, MOVE, COPY DB n, CLUSTER REPLICAS/SLAVES, COUNT-FAILURE-REPORTS#100

Open
TinDang97 wants to merge 6 commits into main from feat/tier-2-lane-a

TinDang97 (Collaborator) commented on May 24, 2026

Summary

Tier 2 Lane A — 5 Redis-parity commands + 1 hot-path perf fix surfaced during pre-merge bench. T2.6 (READONLY/READWRITE) intentionally deferred to a follow-up PR because it changes route_slot() signature and cascades through 4 handler files.

Commits (6)

  • c381b31 T2.1 SWAPDB — cross-shard atomic swap via ShardMessage::SwapDb, WAL-durable, BGREWRITEAOF CAS guard, restart-replay test
  • 4958dc9 T2.2 MOVE key db: with_two_dbs_locked helper (lower-index first), WAL-durable, intercept in all 4 handler paths
  • bbc6117 T2.3 COPY ... DB n — extends copy() to accept target db index, reuses with_two_dbs_locked, WAL-durable
  • f538589 T2.4 CLUSTER REPLICAS / SLAVES — extracts shared format_node_line(node, self_node_id) helper between CLUSTER NODES and CLUSTER REPLICAS
  • ebd240a T2.5 CLUSTER COUNT-FAILURE-REPORTS — counts non-stale pfail_reports; exposes DEFAULT_NODE_TIMEOUT_MS as pub(crate)
  • 608e2d1 perf(handler): collapse duplicate is_write gate on MOVE/COPY hot path — restores s=1 SET p=1 throughput

CI gates green (post-fix)

  • cargo fmt --check
  • cargo clippy --release -- -D warnings
  • cargo clippy --no-default-features --features runtime-tokio,jemalloc -- -D warnings
  • cargo test --release --lib: 3273 PASS / 0 FAIL
  • cargo test --no-default-features --features runtime-tokio,jemalloc --lib: 2668 PASS / 0 FAIL

Performance verification (L-A.G3)

Initial finding — s=1 SET p=1 regression

First scaling-bench-median run on Lane A head vs scaling-matrix-orbstack-2026-05-23.md showed 3 s=1 cells (-8.5% / -11.4% / -11.7%) flagged as regressions. Strict in-session A/B against pre-Lane-A merge base bddda41 cleared most cells, but s=1 SET p=1 measured 218K vs base 241K = -9.5% — the only real signal once cross-session VM variance was controlled for.

Root cause

handler_monoio/mod.rs:1077 and handler_sharded/mod.rs:1133 (added by T2.2/T2.3) wrapped MOVE/COPY interception in an outer if metadata::is_write(cmd) { guard. The existing write-path block below already calls metadata::is_write(cmd), so every SET, HSET, ZADD, etc. paid two PHF table lookups instead of one. At pipeline=1 where per-command overhead dominates, that compounded to -9.5%. At pipeline >= 16 the cost amortized into noise.

Fix (commit 608e2d1)

Remove the redundant outer wrapper. MOVE/COPY rely directly on inner eq_ignore_ascii_case checks; for non-MOVE/COPY workloads the branch predictor learns "false" on both inline checks for free (~5ns each, fully predictable). handler_single.rs and shard/spsc_handler.rs were already correct — they gate MOVE/COPY directly without an outer wrapper.
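
The shape of the collapsed gate can be sketched as below. This is a minimal illustration, not the handler code: `is_write` is a hypothetical stand-in for `metadata::is_write` (a linear scan here rather than a PHF lookup), and `Route` and `route` are names invented for the example. It also simplifies COPY, which in the PR only takes the cross-db path when a `DB n` clause targets another database.

```rust
/// Hypothetical stand-in for `metadata::is_write`; the real one is a PHF lookup.
fn is_write(cmd: &[u8]) -> bool {
    const WRITES: [&[u8]; 5] = [b"SET", b"HSET", b"ZADD", b"MOVE", b"COPY"];
    WRITES.iter().any(|w| cmd.eq_ignore_ascii_case(w))
}

#[derive(Debug, PartialEq)]
enum Route {
    CrossDb,
    Write,
    Read,
}

/// Post-fix shape: MOVE/COPY are matched directly with no outer `is_write`
/// wrapper, so an ordinary write reaches `is_write` exactly once.
fn route(cmd: &[u8]) -> Route {
    if cmd.eq_ignore_ascii_case(b"MOVE") || cmd.eq_ignore_ascii_case(b"COPY") {
        return Route::CrossDb;
    }
    if is_write(cmd) {
        Route::Write
    } else {
        Route::Read
    }
}
```

With this shape a `SET` costs two byte-string comparisons plus a single table lookup, instead of the two lookups the pre-fix wrapper incurred.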

Post-fix in-session A/B vs 46ed28a (May-23 baseline commit)

Strict back-to-back same-VM-state, s=1, c=400, median-of-3:

| cell | 46ed28a | Lane A pre-fix | Lane A post-fix | post-fix vs base |
|---|---|---|---|---|
| GET p=1 | 198K | 211K | 201K | +1.5% |
| GET p=16 | 2.65M | 2.88M | 2.52M | -4.9% |
| GET p=64 | 7.25M | 7.54M | 7.18M | -1.0% |
| SET p=1 | 241K | 218K (-9.5%) | 243K | +0.9% |
| SET p=16 | 1.49M | 1.50M | 1.48M | -0.7% |
| SET p=64 | 2.74M | 2.76M | 2.63M | -4.0% |

All 6 cells now within ±5% of the merge-base in-session.

Post-fix scaling matrix (run-2 canonical, run-1 hit VM contention)

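| cell | Redis | s=1 | s=2 | s=4 | best Moon | vs Redis |
|---|---|---|---|---|---|---|
| GET p=1 | 202K | 217K | 167K | 160K | 217K | 1.07× |
| GET p=16 | 2.38M | 2.75M | 2.28M | 2.14M | 2.75M | 1.15× |
| GET p=64 | 4.48M | 7.35M | 4.63M | 6.17M | 7.35M | 1.64× |
| SET p=1 | 184K | 232K | 170K | 162K | 232K | 1.26× |
| SET p=16 | 953K | 1.46M | 1.44M | 1.51M | 1.51M | 1.58× |
| SET p=64 | 1.69M | 2.82M | 2.63M | 3.33M | 3.33M | 1.97× |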

Run-1 of the same matrix saw a transient s=2 SET p=1 collapse (125K, monotonically declining within the cell) which also dragged Redis SET p=16 down -26% — confirmed VM-environmental, not code, by the run-2 recovery.

Methodology note for the L-A.G3 gate

Cross-session comparison against scaling-matrix-orbstack-2026-05-23.md is not reliable at ±5% — the file itself flags ±10-15% as the VM variance band, and we observed ±15-26% cross-session Redis movement on the same hardware. Recommend the L-A.G3 wording change to: "±5% of in-session A/B against the merge base, measured strict back-to-back on the same VM boot." The baseline file remains a long-term trend reference.
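
The proposed wording reduces to a small calculation, sketched here under the assumption that each cell is summarized by a median of three throughput samples. The function names are illustrative and not part of the bench scripts.

```rust
/// Median of three throughput samples (ops/sec) for one bench cell.
fn median3(mut samples: [f64; 3]) -> f64 {
    samples.sort_by(f64::total_cmp);
    samples[1]
}

/// Relative change of `candidate` against `base`, in percent.
fn delta_pct(base: f64, candidate: f64) -> f64 {
    (candidate - base) / base * 100.0
}

/// True when the candidate is within `tolerance_pct` of the in-session base.
fn within_gate(base: f64, candidate: f64, tolerance_pct: f64) -> bool {
    delta_pct(base, candidate).abs() <= tolerance_pct
}
```

Applied to the s=1 SET p=1 cell, 218K against a 241K base falls outside a 5% tolerance, while 243K against the same base passes.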

Functional verification (post-fix, --shards 1)

  • MOVE k0 1 → :1, GET (db0) = nil, GET (db1) = v0: PASS
  • COPY src dst DB 2 → :1, GET (db2) = srcval, src preserved: PASS
  • SWAPDB 0 3 → +OK, GET (db0) = v3, GET (db3) = v0: PASS

Out of scope

  • T2.6 READONLY/READWRITE — deferred (changes route_slot() signature, must land last)
  • Tier 2 Lane B (T2.7-T2.9)
  • Tier 2 Lane C (T2.10-T2.11)

Test plan

  • Unit tests for all 5 commands
  • Restart-replay tests for SWAPDB / MOVE / COPY DB n
  • Full cargo test (both runtimes)
  • In-session s=1 A/B bench: pre-Lane-A → Lane A → Lane A+fix
  • Full scaling-bench-median post-fix (×2, confirming VM noise on run-1)
  • Functional smoke: MOVE / COPY DB / SWAPDB
  • Reviewer: spot-check with_two_dbs_locked lock-ordering on MOVE same-db edge case
  • Reviewer: confirm WAL records replay deterministically for SWAPDB cross-shard order
  • Reviewer: confirm 608e2d1 deindent didn't drop any inner statements (diff is 126/122 raw lines, 15/11 ignoring whitespace)

Summary by CodeRabbit

  • New Features

    • Implemented SWAPDB command for atomic database swapping across single/sharded deployments.
    • Added CLUSTER REPLICAS/CLUSTER SLAVES and CLUSTER COUNT-FAILURE-REPORTS subcommands for enhanced cluster introspection.
    • Extended MOVE and COPY commands to support cross-database operations via optional DB parameter.
  • Bug Fixes

    • Added concurrency guard for BGREWRITEAOF to prevent simultaneous rewrites.
  • Tests

    • Added SWAPDB consistency and no-op validation tests.
    • Extended cluster command test coverage for replicas and failure reports.
    • Added WAL/AOF replay coverage for SWAPDB.

TinDang97 added 5 commits May 24, 2026 13:09
Implements Redis-compatible SWAPDB with full multi-shard broadcast,
WAL durability, BGREWRITEAOF guard, bounds checking, same-index no-op,
and crash-recovery replay.

## Architecture

SWAPDB is intercepted at the handler layer (not inside `cmd_dispatch`)
because it requires async execution, multi-db access, and cross-shard
coordination:

- **handler_monoio/dispatch.rs** — `try_handle_swapdb()` async fn
- **handler_sharded/dispatch.rs** — `try_handle_swapdb()` async fn
- **handler_single.rs** — inline SWAPDB path for single-shard tokio mode

## Multi-shard broadcast

`shard::coordinator::coordinate_swapdb()` fans out to all shards:

- **Remote shards**: `ShardMessage::SwapDb { a, b, reply_tx }` sent via
  SPSC.  The SPSC handler (`spsc_handler::SwapDb` arm) emits a per-shard
  WAL record via `wal_append_and_fanout` BEFORE performing the swap,
  then calls `ShardDatabases::swap_dbs(shard_id, a, b)` and signals the
  coordinator via an OneshotSender<()>.
- **Local shard**: handled inline in the coordinator (ChannelMesh has no
  self-send slot — `target_index` panics on `my_id == target_id`).
  WAL bytes are sent via `ShardDatabases::wal_append()` (the per-shard
  MPSC channel that the event-loop drains on its 1ms tick into the WAL
  writer).  The swap is performed via `ShardDatabases::swap_dbs()`.

The coordinator awaits all remote-shard acks before returning +OK.

Brief-skew acceptance: between the first and last ack, interleaved GET
may see pre-swap on one shard + post-swap on another.  This matches
Redis cluster relaxed semantics and is acceptable for SWAPDB semantics.
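
The fan-out described above can be sketched with standard-library channels and threads. This is an illustration under stated assumptions, not the Moon implementation: `std::sync::mpsc` stands in for the SPSC mesh and the oneshot reply, `Shard` is a toy struct whose `wal` vector stands in for the WAL writer, and `spawn_shard` is a helper invented for the example.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical message; a plain `Sender<()>` stands in for the oneshot reply.
struct SwapDb {
    a: usize,
    b: usize,
    reply_tx: Sender<()>,
}

/// Toy shard: a WAL log plus one key list per database.
struct Shard {
    wal: Vec<String>,
    dbs: Vec<Vec<&'static str>>,
}

impl Shard {
    fn apply_swap(&mut self, a: usize, b: usize) {
        self.wal.push(format!("SWAPDB {a} {b}")); // WAL record first
        self.dbs.swap(a, b); // then the swap itself
    }
}

/// Runs a remote shard loop until every sender is dropped.
fn spawn_shard(mut shard: Shard) -> (Sender<SwapDb>, JoinHandle<Shard>) {
    let (tx, rx) = channel::<SwapDb>();
    let handle = thread::spawn(move || {
        for msg in rx {
            shard.apply_swap(msg.a, msg.b);
            let _ = msg.reply_tx.send(()); // ack the coordinator
        }
        shard
    });
    (tx, handle)
}

/// Fan out to remote shards, swap the local shard inline, then await all acks.
fn coordinate_swapdb(local: &mut Shard, remotes: &[Sender<SwapDb>], a: usize, b: usize) {
    let (ack_tx, ack_rx) = channel();
    for tx in remotes {
        tx.send(SwapDb { a, b, reply_tx: ack_tx.clone() })
            .expect("remote shard is running");
    }
    local.apply_swap(a, b);
    for _ in remotes {
        ack_rx.recv().expect("remote shard acknowledged");
    }
}
```

Between the first `send` and the last `recv`, a reader on another connection could observe one shard swapped and another not, which is the brief skew the commit message accepts.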

## WAL durability

Each shard emits a "SWAPDB <a> <b>" WAL record before performing the
swap (both remote shards via SPSC handler and local shard via the
wal_append channel).  On crash recovery,
`persistence::replay::DispatchReplayEngine` intercepts SWAPDB before
calling `cmd_dispatch` (which only sees one `&mut Database`), operates
directly on the `&mut [Database]` slice, and uses `split_at_mut` +
`std::mem::swap` to apply the exchange.

Verified with restart-survival test: SET key in db0 → SWAPDB 0 1 →
SIGTERM → restart → key found in db1 (WAL v2 path, `--disk-offload
disable --appendonly yes`).
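
The replay-side exchange can be sketched as a generic helper over a mutable slice. The function name and the `bool` return are assumptions for illustration; the same-index and out-of-range handling mirrors the behaviour named in the replay tests.

```rust
/// Exchanges `dbs[a]` and `dbs[b]` in place.
/// Returns false (and changes nothing) when either index is out of range.
fn replay_swapdb<T>(dbs: &mut [T], a: usize, b: usize) -> bool {
    if a >= dbs.len() || b >= dbs.len() {
        return false;
    }
    if a == b {
        return true; // same-index swap is a no-op
    }
    let (lo, hi) = if a < b { (a, b) } else { (b, a) };
    // Split so that `lo` lives in the left half and `hi` is right[0];
    // this yields two disjoint mutable borrows without unsafe code.
    let (left, right) = dbs.split_at_mut(hi);
    std::mem::swap(&mut left[lo], &mut right[0]);
    true
}
```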

## BGREWRITEAOF guard (CAS fix)

Replaced unconditional `store(true)` in `bgrewriteaof_start` and
`bgrewriteaof_start_sharded` with `compare_exchange(false, true)` to
prevent a second concurrent BGREWRITEAOF call from clearing the flag
mid-rewrite.  The flag is only cleared by the caller that set it
(on channel-send failure) or by the AOF writer task on completion.
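
A minimal sketch of the CAS guard, assuming the flag is a plain `AtomicBool`. The function names and the choice of memory orderings are illustrative, not taken from the PR.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Returns true only for the caller that flipped the flag from false to true.
/// A second concurrent caller gets false and must not touch the flag.
fn try_begin_rewrite(flag: &AtomicBool) -> bool {
    flag.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
        .is_ok()
}

/// Called by the owner: on channel-send failure, or when the rewrite completes.
fn finish_rewrite(flag: &AtomicBool) {
    flag.store(false, Ordering::Release);
}
```

With an unconditional `store(true)`, a second caller would also believe it owned the flag and could clear it on its own failure path while the first rewrite was still running.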

## Correctness properties

- `ShardDatabases::swap_dbs()` acquires write locks in ascending-index
  order (lower index first) to prevent deadlock on concurrent SWAPDB
  calls with the same pair from opposite directions.
- Same-index SWAPDB (SWAPDB 0 0) returns +OK immediately with no WAL
  emitted and no lock contention.
- Out-of-range indices (SWAPDB 0 999 with db_count=16) return ERR.
- BGREWRITEAOF guard: rejects SWAPDB if `AOF_REWRITE_IN_PROGRESS` is
  set.  The flag is set by `bgrewriteaof_start{,_sharded}` (via CAS)
  and cleared by the AOF writer task after rewrite completes (both
  monoio and tokio paths).
- `ShardMessage::SwapDb { a: usize, b: usize, reply_tx: OneshotSender<()> }`
  fits within the 64-byte (1 cache-line) cap asserted at compile time.
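
The compile-time size cap in the last bullet can be expressed with a const assertion. The enum below is a hypothetical two-variant stand-in (with `std::sync::mpsc::Sender<()>` in place of `OneshotSender<()>`), so the actual byte count differs from the real `ShardMessage`.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical message enum used only to demonstrate the size check.
#[allow(dead_code)]
enum ShardMessage {
    SwapDb { a: usize, b: usize, reply_tx: Sender<()> },
    Shutdown,
}

// The build fails if the enum ever grows past one 64-byte cache line.
const _: () = assert!(std::mem::size_of::<ShardMessage>() <= 64);
```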

## Tests added

- `persistence::replay::tests::replay_swapdb_exchanges_databases`
- `persistence::replay::tests::replay_swapdb_same_index_noop`
- `persistence::replay::tests::replay_swapdb_out_of_range_skips`
- `shard::shared_databases::tests::test_swap_dbs_exchanges_contents`
- `shard::shared_databases::tests::test_swap_dbs_reverse_order_same_result`
- `command::tests::swapdb_dispatch_stub_returns_error` (replaces old stub test)
- `scripts/test-consistency.sh`: SWAPDB section added (key movement,
  same-index no-op, out-of-range ERR, reverse swap)

## E2E smoke (4-shard, `--disk-free-min-pct 0`)

- `SWAPDB 0 1` — OK, db0 DBSIZE 0 db1 DBSIZE 1, key moved correctly
- `SWAPDB 0 0` — OK (no-op)
- `SWAPDB 0 9999` — ERR DB index is out of range
- `SWAPDB 1 0` (reverse) — OK, key restored to db0

## CI gates

- cargo fmt --check: PASS
- cargo clippy -- -D warnings (default features): PASS
- cargo clippy --no-default-features --features runtime-tokio,jemalloc -- -D warnings: PASS
- cargo test --no-default-features --features runtime-tokio,jemalloc --lib: 2618 PASS, 0 FAIL

author: Tin Dang
Atomically moves a key from the connection's selected database to another
database on the same shard. Both source and destination are on the same
Moon shard — cross-shard moves return ERR directing users to MIGRATE (T2.8).

## Semantics (Redis-compatible)
- Returns :1 on success
- Returns :0 if the key is absent in the source database
- Returns :0 if the key already exists in the destination database (no clobber)
- Returns :0 if src_db == dst_db (no-op, Redis-compatible)
- Returns ERR on wrong arity or out-of-range db index
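
The data-plane rules above can be sketched over two hash maps. This is a simplified model: `Db` is a hypothetical alias, values are plain strings, and the same-database case is left to the caller because two `&mut` borrows of one database cannot coexist.

```rust
use std::collections::HashMap;

type Db = HashMap<String, String>;

/// Returns 1 when the key moved, 0 when it is absent from `src`
/// or already present in `dst` (the destination is never clobbered).
fn move_core(src: &mut Db, dst: &mut Db, key: &str) -> i64 {
    if dst.contains_key(key) {
        return 0;
    }
    match src.remove(key) {
        Some(value) => {
            dst.insert(key.to_string(), value);
            1
        }
        None => 0,
    }
}
```

Checking the destination before removing from the source means a collision leaves both databases untouched.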

## Architecture
MOVE cannot go through the central dispatch() function which only receives
one &mut Database. Each of the four handler paths intercepts MOVE before
reaching dispatch():

- handler_single: drops the single-db write guard, calls with_two_dbs_locked,
  then re-acquires the guard to restore the loop invariant
- handler_monoio: intercepted inside the is_write gate before the write path
- handler_sharded: same pattern as handler_monoio
- spsc_handler: intercepted before cmd_dispatch; uses with_two_slice_dbs on
  the ShardSlice path (no RwLock needed — SPSC is single-threaded) and
  with_two_dbs_locked on the legacy ShardDatabases path

## Lock ordering
Lower database index is always locked first in with_two_dbs_locked to prevent
deadlocks with concurrent reverse MOVE operations from other connections.
The ShardSlice path uses split_at_mut (no locking) so deadlock is impossible.
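
The lock-ordering rule can be sketched as follows. The signature is an assumption; in particular, returning `None` for equal or out-of-range indices is one possible way to keep the helper from locking the same `RwLock` twice, not necessarily what the PR does.

```rust
use std::sync::RwLock;

/// Runs `f(src_db, dst_db)` with both write locks held.
/// The lower index is always locked first, whatever the argument order.
/// Returns None for equal or out-of-range indices (assumed contract).
fn with_two_dbs_locked<T, R>(
    dbs: &[RwLock<T>],
    src: usize,
    dst: usize,
    f: impl FnOnce(&mut T, &mut T) -> R,
) -> Option<R> {
    if src == dst || src >= dbs.len() || dst >= dbs.len() {
        return None;
    }
    let (lo, hi) = (src.min(dst), src.max(dst));
    let mut lo_guard = dbs[lo].write().unwrap();
    let mut hi_guard = dbs[hi].write().unwrap();
    Some(if src < dst {
        f(&mut *lo_guard, &mut *hi_guard)
    } else {
        f(&mut *hi_guard, &mut *lo_guard)
    })
}
```

Two connections running `MOVE k 1` from db0 and `MOVE k 0` from db1 both take the db0 lock first, so neither can hold one lock while waiting on the other.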

## WAL
WAL is appended (wal_append_and_fanout / aof_tx) only on the :1 (success)
path, matching Redis AOF semantics.

## New module: src/command/keyspace/
Shared helpers used by both MOVE (T2.2) and future COPY DB n (T2.3):
- move_core()         — pure data-plane MOVE logic
- parse_move_args()   — validates MOVE key db argument structure
- with_two_dbs_locked() — deadlock-safe RwLock acquisition (lower index first)
- with_two_slice_dbs()  — split_at_mut borrow for ShardSlice path

## Tests (13 unit tests in move_cmd.rs)
- move_core: success, missing key, collision → key restored in src
- parse_move_args: ok, wrong arity, negative db, out-of-range, non-numeric
- with_two_dbs_locked: lower-first and higher-first orderings
- with_two_slice_dbs: lower-src and higher-src orderings

Smoke-tested against a live server (--appendonly no):
  MOVE mykey 1       → 1 (key moved)
  MOVE nonexistent 1 → 0
  MOVE colkey 1      → 0 (collision preserved both sides)
  MOVE samekey 0     → 0 (same-db no-op)
  MOVE (bad arity)   → ERR
  MOVE db2key 2      → 1 (move to db2 works)

author: Tin Dang
Extends COPY to support cross-database copy via the `DB n` clause:
  COPY src dst DB n [REPLACE]

The `DB n` form copies a key from the connection's selected database to
database n on the same shard. Without `DB n` the command falls through to
the existing same-db path in key_extra::copy (no behaviour change for the
common case).

## Semantics (Redis-compatible)
- Returns :1 on success (key copied to destination database)
- Returns :0 if source key does not exist
- Returns :0 on collision in destination without REPLACE
- Returns :1 with REPLACE on collision (overwrites destination)
- `DB n == current_db` falls through to key_extra::copy (same-db path)
- No DB clause falls through to key_extra::copy (same-db path)
- Returns ERR on invalid DB index or syntax errors
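
The return-value rules above can be sketched in the same simplified model as MOVE. `Db` is a hypothetical alias over string values, and the signature is illustrative only.

```rust
use std::collections::HashMap;

type Db = HashMap<String, String>;

/// Returns 1 when the value was copied, 0 when the source key is missing
/// or the destination key exists and `replace` is false.
fn copy_core(src: &Db, dst: &mut Db, src_key: &str, dst_key: &str, replace: bool) -> i64 {
    let Some(value) = src.get(src_key) else {
        return 0;
    };
    if dst.contains_key(dst_key) && !replace {
        return 0;
    }
    dst.insert(dst_key.to_string(), value.clone());
    1
}
```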

## Architecture
COPY with DB n cannot go through dispatch() which only receives one
&mut Database. The same handler-level interception strategy as MOVE is used:

- handler_single: COPY DB n block after the MOVE block; same guard
  drop/re-acquire pattern; db.as_slice() dereferences SharedDatabases
- handler_monoio: COPY block inside the is_write gate, before normal write path
- handler_sharded: COPY block inside the is_write gate
- spsc_handler: COPY block after MOVE block; same slice/locked dual-path

parse_copy_db_args() returns:
  None            — no DB clause or same-db (caller falls through to dispatch)
  Some(Ok(args))  — cross-db: intercept and execute copy_core
  Some(Err(f))    — invalid DB index or syntax: return error frame immediately
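
The three-way result can be sketched as an `Option<Result<..>>` over string arguments. Everything here is an assumption made for illustration: the real parser works on frames and byte slices, the `current_db` and `db_count` parameters are invented, and the error strings are placeholders.

```rust
#[derive(Debug, PartialEq)]
struct CopyDbArgs {
    src: String,
    dst: String,
    db: usize,
    replace: bool,
}

/// None: no DB clause or same db, the caller falls through to dispatch.
/// Some(Ok(..)): cross-db copy. Some(Err(..)): reply with the error.
fn parse_copy_db_args(
    args: &[&str],
    current_db: usize,
    db_count: usize,
) -> Option<Result<CopyDbArgs, String>> {
    if args.len() < 2 {
        return None; // let the normal path report the arity error
    }
    let (mut db, mut replace, mut i) = (None, false, 2);
    while i < args.len() {
        if args[i].eq_ignore_ascii_case("DB") {
            let Some(raw) = args.get(i + 1) else {
                return Some(Err("ERR syntax error".to_string()));
            };
            match raw.parse::<usize>() {
                Ok(n) if n < db_count => db = Some(n),
                _ => return Some(Err("ERR invalid DB index".to_string())),
            }
            i += 2;
        } else if args[i].eq_ignore_ascii_case("REPLACE") {
            replace = true;
            i += 1;
        } else {
            return Some(Err("ERR syntax error".to_string()));
        }
    }
    match db {
        Some(n) if n != current_db => Some(Ok(CopyDbArgs {
            src: args[0].to_string(),
            dst: args[1].to_string(),
            db: n,
            replace,
        })),
        _ => None,
    }
}
```

Returning `None` for the same-db case keeps the common path on the existing `key_extra::copy` code, so only true cross-database copies pay for the handler-level interception.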

## key_extra.rs — same-db DB clause fallthrough
When parse_copy_db_args returns None (same-db case), dispatch() reaches
key_extra::copy(). The DB token and its index argument are consumed there
so existing same-db COPY DB 0 (from current db 0) works correctly.

## New items in src/command/keyspace/move_cmd.rs
- copy_core()          — pure data-plane COPY logic (no locking, no WAL)
- CopyDbArgs struct    — parsed cross-db COPY arguments
- parse_copy_db_args() — tokenises COPY args, identifies cross-db case

## Tests (9 new unit tests in move_cmd.rs)
- copy_core: success, missing src, collision without replace, collision with replace
- parse_copy_db_args: no DB clause (→ None), same db (→ None), cross db,
  with REPLACE, invalid DB index

Smoke-tested against a live server (--appendonly no):
  COPY src dst DB 1             → 1 (key copied, src preserved)
  COPY src dst DB 1 (collision) → 0 (dst untouched)
  COPY src dst DB 1 REPLACE     → 1 (dst overwritten)
  COPY origkey newkey DB 2      → 1 (different src/dst names across dbs)
  COPY nosuchkey dst DB 1       → 0 (missing src)
  COPY cpyfallthrough cpy2      → 1 (no DB clause, same-db fallthrough)
  COPY samedbkey samedbdst DB 0 → 1 (same-db DB clause, fallthrough)

author: Tin Dang
Add CLUSTER REPLICAS <node-id> and its deprecated alias CLUSTER SLAVES.

Wire protocol:
- Returns Frame::Array of Frame::BulkString entries (one per replica).
- Each entry is a CLUSTER NODES-format line with no trailing newline.
- Empty array when the master has no replicas.
- ERR Unknown node <id> when the requested node-id is not in cluster state.

Implementation:
- Extract format_node_line(node, self_node_id) helper (no trailing \n)
  so that CLUSTER NODES and CLUSTER REPLICAS share one formatter.
- Refactor handle_cluster_nodes to use format_node_line + push('\n').
- handle_cluster_replicas filters nodes.values() by
  NodeFlags::Replica { master_id } == target_id.
- Both REPLICAS and SLAVES dispatch to handle_cluster_replicas in the
  CLUSTER subcommand match arm — single code path, zero duplication.
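
The filtering step can be sketched as below. `Node` and the two-variant `NodeFlags` are reduced stand-ins for the cluster state types, and the sketch returns node ids rather than formatted CLUSTER NODES lines to avoid guessing at the formatter.

```rust
#[derive(Debug, Clone, PartialEq)]
enum NodeFlags {
    Master,
    Replica { master_id: String },
}

struct Node {
    id: String,
    flags: NodeFlags,
}

/// Returns the ids of all replicas whose master is `target_id`,
/// or an error when `target_id` is not a known node.
fn replicas_of<'a>(nodes: &'a [Node], target_id: &str) -> Result<Vec<&'a str>, String> {
    if !nodes.iter().any(|n| n.id == target_id) {
        return Err(format!("ERR Unknown node {target_id}"));
    }
    Ok(nodes
        .iter()
        .filter(|n| {
            matches!(&n.flags, NodeFlags::Replica { master_id } if master_id.as_str() == target_id)
        })
        .map(|n| n.id.as_str())
        .collect())
}
```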

Tests added (5):
- cluster_replicas_returns_empty_for_master_with_no_replicas
- cluster_replicas_lists_replicas (≥9 fields, no trailing \n, both ids present)
- cluster_replicas_rejects_unknown_node_id
- cluster_slaves_is_alias_for_replicas (dispatch via handle_cluster_command)
- cluster_replicas_includes_myself_marker_when_self_is_replica

author: Tin Dang
Add CLUSTER COUNT-FAILURE-REPORTS <node-id>.

Wire protocol:
- Returns Frame::Integer(count) — the number of active PFAIL reports for
  the given node.
- Returns :0 for an unknown node-id (matches real Redis behaviour).
- A report is active when its age is strictly less than 2 * node_timeout_ms.
  Stale reports (age >= 2 * node_timeout_ms) are excluded from the count.

Staleness window:
- Uses DEFAULT_NODE_TIMEOUT_MS (30_000 ms) from cluster::failover, now
  exported as pub(crate) so both try_mark_fail_with_consensus and this
  handler apply the same 2× cutoff — no divergence between the two paths
  that read pfail_reports.
- Follow-up: centralize node_timeout into ClusterState so it is
  configurable at runtime (current --cluster-node-timeout is 15_000 ms
  in Config but not yet threaded to this handler; hardcoded 30_000 used
  here matches the existing failover.rs constant).
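
The staleness rule can be sketched as a pure function over report timestamps. The function name and the flat slice of timestamps are assumptions; the constant value is the one quoted above. Saturating subtraction treats a report stamped in the future as age 0, which keeps it active under clock skew.

```rust
const DEFAULT_NODE_TIMEOUT_MS: u64 = 30_000;

/// Counts reports whose age is strictly less than 2 * node_timeout_ms.
fn count_active_reports(report_ts_ms: &[u64], now_ms: u64, node_timeout_ms: u64) -> usize {
    let cutoff = node_timeout_ms.saturating_mul(2);
    report_ts_ms
        .iter()
        .filter(|&&ts| now_ms.saturating_sub(ts) < cutoff)
        .count()
}
```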

Implementation:
- handle_cluster_count_failure_reports reads ClusterState under a shared
  read lock; no writes, no WAL emission.
- Dispatch wired as b"COUNT-FAILURE-REPORTS" arm in handle_cluster_command.
- failover.rs: DEFAULT_NODE_TIMEOUT_MS promoted from private const to
  pub(crate) const with doc comment.

Tests added (4):
- cluster_count_failure_reports_returns_zero_for_unknown_node
- cluster_count_failure_reports_returns_zero_for_healthy_node
- cluster_count_failure_reports_counts_active_reports
- cluster_count_failure_reports_excludes_stale_reports
  (uses ts=0 for stale, ts=u64::MAX/2 for active — clock-skew-safe)

author: Tin Dang
coderabbitai Bot commented May 24, 2026

📝 Walkthrough

Implements MOVE/COPY core functions and two-db access helpers; adds handler-layer SWAPDB with cross-shard coordination and WAL emission; gates SWAPDB during AOF rewrite via an atomic flag cleared by writer loops; adds SWAPDB replay handling; and adds CLUSTER REPLICAS / COUNT-FAILURE-REPORTS handlers and consistency tests.

Changes

Cross-Database Commands (MOVE, COPY, SWAPDB)

| Layer / File(s) | Summary |
|---|---|
| **Data-plane core operations and argument parsing**<br>`src/command/keyspace/mod.rs`, `src/command/keyspace/move_cmd.rs` | move_core and copy_core implement atomic key transfer with collision handling; parse_move_args and parse_copy_db_args validate cross-db arguments; with_two_dbs_locked and with_two_slice_dbs provide deadlock-safe two-database access; unit tests added. |
| **Command dispatch fallbacks & metadata**<br>`src/command/mod.rs`, `src/command/metadata.rs`, `src/command/key_extra.rs` | Exports keyspace module, updates direct-dispatch fallbacks: MOVE/SWAPDB now require handler-layer invocation; COPY DB token parsing adjusted to error on missing index and fall through for same-db. |
| **Single-mode connection handler**<br>`src/server/conn/handler_single.rs` | SWAPDB intercept acquires two write locks in ascending order, swaps databases, and emits WAL; write-path MOVE/COPY acquire two-db locks, perform operation, reacquire current-db guard, and append AOF on success. |
| **Sharded monoio handler**<br>`src/server/conn/handler_monoio/dispatch.rs`, `src/server/conn/handler_monoio/mod.rs` | Async SWAPDB dispatcher validates args, blocks during BGREWRITEAOF, coordinates cross-shard swap; local MOVE/COPY fast-paths use shard-slice or two-db locking and append AOF on success. |
| **Sharded handler**<br>`src/server/conn/handler_sharded/dispatch.rs`, `src/server/conn/handler_sharded/mod.rs` | Async handler-layer SWAPDB intercept rejects MULTI/queued execution, validates indices, and delegates to coordinate_swapdb; local MOVE/COPY handled early via keyspace::move_cmd. |
| **Shard coordinator and message dispatch**<br>`src/shard/shared_databases.rs`, `src/shard/dispatch.rs`, `src/shard/coordinator.rs`, `src/shard/spsc_handler.rs` | ShardDatabases::swap_dbs atomically swaps two DBs under ordered locks; ShardMessage::SwapDb carries swap indices and oneshot reply; coordinate_swapdb broadcasts swap to shards, emits WAL before local swap, and awaits acknowledgements; SPSC handler appends WAL and swaps. |
| **AOF concurrency control and replay**<br>`src/command/persistence.rs`, `src/persistence/aof.rs`, `src/persistence/replay.rs` | Introduces AOF_REWRITE_IN_PROGRESS atomic flag to gate BGREWRITEAOF via CAS with rollback on send failure; writer loops clear the flag after rewrite; DispatchReplayEngine::replay_command intercepts SWAPDB and swaps DBs via safe slice splitting for replay. |
| **Consistency testing**<br>`scripts/test-consistency.sh` | Adds SWAPDB consistency checks: seeds db0/db1, executes SWAPDB, verifies key movement and absence, checks same-index no-op and out-of-range error behavior, and swaps back to restore state. |

Cluster Commands Enhancement

| Layer / File(s) | Summary |
|---|---|
| **CLUSTER REPLICAS and COUNT-FAILURE-REPORTS**<br>`src/cluster/command.rs`, `src/cluster/failover.rs` | handle_cluster_replicas (alias SLAVES) filters replica nodes by master id and returns formatted node lines; handle_cluster_count_failure_reports counts recent pfail reports using a stale cutoff derived from DEFAULT_NODE_TIMEOUT_MS (now pub(crate)); format_node_line centralizes CLUSTER NODES output formatting; tests added. |

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant Handler
  participant Coordinator
  participant LocalShard
  participant RemoteShard
  participant AOFWriter

  Client->>Handler: SWAPDB a b
  Handler->>Handler: parse indices, check MULTI/AOF_REWRITE_IN_PROGRESS
  Handler->>Coordinator: coordinate_swapdb(a,b)
  Coordinator->>LocalShard: emit WAL (SWAPDB a b)
  LocalShard->>LocalShard: swap_dbs(a,b)
  Coordinator->>RemoteShard: ShardMessage::SwapDb(a,b)
  RemoteShard->>RemoteShard: emit WAL, swap_dbs(a,b)
  RemoteShard->>Coordinator: oneshot ack
  Coordinator->>Handler: +OK
  Coordinator->>AOFWriter: (WAL persisted)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

  • pilotspace/moon#99: Refactors dispatch-layer error messages and fallback handling for MOVE and SWAPDB in src/command/mod.rs.
  • pilotspace/moon#63: Overlaps BGREWRITEAOF startup and AOF rewrite flow changes relevant to AOF_REWRITE_IN_PROGRESS coordination.
  • pilotspace/moon#68: Prior COPY command work this PR builds on for cross-database DB clause parsing and behavior.

Suggested labels

enhancement

Suggested reviewers

  • pilotspacex-byte

Poem

A rabbit hops between two DBs at dawn, 🐇
Locks tidy, no deadlocks to fawn,
WALs recorded before the leap,
Shards all nod—no data to keep,
Keys swapped, tests pass, the meadow's lawn.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title clearly and specifically summarizes the main changes: 5 Tier-2 Lane A features (SWAPDB, MOVE, COPY DB n, CLUSTER REPLICAS/SLAVES, COUNT-FAILURE-REPORTS) plus a hot-path perf fix, matching the detailed changeset. |
| Description check | ✅ Passed | The PR description is comprehensive and well-structured: it covers all five features, includes detailed performance analysis with benching methodology, explains the perf fix, lists CI verification, and documents test coverage and reviewer follow-ups. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 18

🧹 Nitpick comments (1)
src/cluster/command.rs (1)

925-961: 💤 Low value

Test won't catch the duplicate master_id bug.

The test checks fields.len() >= 9 but doesn't validate field content. With the current bug in format_node_line, replica lines will have 10 fields (duplicate master_id), and this test will pass.

Consider adding a check after fixing the bug:

// For replica nodes, verify master_id appears exactly once in the correct position
if fields[3] == "slave" || fields[3].starts_with("myself,slave") {
    assert_eq!(fields[3], "slave", "flags should be 'slave' for non-self replicas");
    assert!(fields[4].len() == 40, "field 4 should be 40-char master_id");
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/cluster/command.rs` around lines 925 - 961, The test
cluster_replicas_lists_replicas is too loose and won't catch the duplicate
master_id bug from format_node_line; update the test (inside the loop that
parses fields) to assert replica-specific layout: when fields[3] indicates a
replica (either exactly "slave" or starts with "myself,slave" for the self
case), assert flags are "slave" for non-self replicas and verify the master_id
field is in the expected position and is exactly 40 chars long (i.e., check
fields[3] == "slave" for non-self replicas and fields[4].len() == 40), and also
ensure the master_id does not appear elsewhere in the line so duplicates would
fail; add these checks after the fields: Vec<&str> =
line.split_whitespace().collect() block to catch the duplicate-master_id bug
from format_node_line.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@scripts/test-consistency.sh`:
- Around line 548-553: The seed step uses the helper/command "both" but relies
on a separate SELECT that won’t persist across invocations, so the final "both
DEL swapkey" runs against DB0 instead of DB1; update the seed to target DB1
explicitly when deleting (i.e., remove the separate SELECT 1 and invoke DEL with
DB index specified for "both" so the DEL runs against DB1 for key "swapkey") and
ensure the commands around "both SET swapkey hello" and the subsequent delete
use explicit DB targeting.
- Around line 570-576: The test currently reads redis_oor but only checks
rust_oor for "ERR" so redis_oor is unused and parity isn't asserted; update the
conditional to verify both redis_oor and rust_oor contain "ERR" (e.g., use grep
-qi "ERR" on both $redis_oor and $rust_oor) and only increment PASS when they
both match, otherwise increment FAIL and echo a message showing both $redis_oor
and $rust_oor to highlight the discrepancy; keep the existing variables
(redis_oor, rust_oor, PASS, FAIL, SWAPDB) when implementing this change.

In `@src/cluster/command.rs`:
- Around line 102-118: The flags string in format_node_line incorrectly includes
the replica's master_id for NodeFlags::Replica, causing an extra token; change
the match arms that handle NodeFlags::Replica { master_id } so they produce
"myself,slave" and "slave" (no master_id appended) when building flags_str, and
leave the separate master-id field logic (the master_id_field variable/output)
intact so the master node id is emitted only in its dedicated column.

In `@src/command/key_extra.rs`:
- Around line 42-49: The code currently consumes the DB index token by doing i
+= 1 without validating its content; instead, validate the consumed token
(args[i] before or after increment as appropriate) is a non-negative integer
(e.g., parse bytes to a usize/u64 after converting from bytes) and return
Frame::Error(Bytes::from_static(b"ERR invalid DB index")) on parse failure.
Update the branch that comments about parse_copy_db_args/same-db path to
explicitly parse and validate the DB token (the args and i variables in this
block) before proceeding so malformed values like "DB abc" are rejected.

In `@src/command/keyspace/move_cmd.rs`:
- Around line 255-264: The code in with_two_dbs_locked currently takes two write
locks even when src_idx == dst_idx, causing a self-deadlock; add an explicit
guard at the start of with_two_dbs_locked that checks if src_idx == dst_idx and
either panic! or return an Err/early-return to enforce the contract. Locate the
with_two_dbs_locked function and insert the equality check before any locking on
dbs (use the same src_idx, dst_idx variables used in the diff), so the
subsequent code that acquires lo/hi locks and calls f(&mut lo, &mut hi) never
runs for equal indices.

In `@src/server/conn/handler_monoio/dispatch.rs`:
- Around line 1016-1034: The SWAPDB handler currently parses DB indices without
verifying argument count, allowing extra args and returning the wrong error for
too few args; before calling parse_db_index on cmd_args, check that
cmd_args.len() == 2 and if not push the standard arity error via
responses.push(Frame::Error(Bytes::from_static(b"ERR wrong number of arguments
for 'swapdb' command"))) and return true; then proceed to parse using the
existing parse_db_index closure and the existing variables (parse_db_index,
cmd_args, a, b, responses, Frame::Error) as-is.

In `@src/server/conn/handler_monoio/mod.rs`:
- Around line 789-792: SWAPDB is being handled before ACL checks, allowing
unauthorized access; move the SWAPDB intercept so it runs after the ACL gate
(i.e., after the call to dispatch::try_enforce_acl) or explicitly call
dispatch::try_enforce_acl before invoking dispatch::try_handle_swapdb in the
monoio path (the symbols to change are dispatch::try_handle_swapdb and
dispatch::try_enforce_acl in handler_monoio/mod.rs) so behavior matches the
sharded handler and unauthorized clients cannot reach SWAPDB.

In `@src/server/conn/handler_sharded/dispatch.rs`:
- Around line 455-472: The SWAPDB handler currently accepts extra args and
misreports missing args because it parses only the first two values; update the
dispatch logic to require exact arity by checking cmd_args.len() == 2 before
parsing. Specifically, add a guard that if cmd_args.len() != 2 pushes the same
Frame::Error(Bytes::from_static(b"ERR wrong number of arguments for 'swapdb'
command")) (or the existing error message you prefer) into responses and returns
true; then continue to use the parse_db_index closure to parse both entries
(parse_db_index, cmd_args.first(), cmd_args.get(1)) only after the arity check
so trailing args are rejected and missing args are reported correctly.

In `@src/server/conn/handler_sharded/mod.rs`:
- Around line 1131-1200: The MOVE/COPY handlers bypass normal write-path
undo/intent bookkeeping when a cross-store transaction is active; add an early
rejection that checks the active_cross_txn flag and returns an error before any
MOVE/COPY work or AOF append. Specifically, in the metadata::is_write branch
before handling cmd.eq_ignore_ascii_case(b"MOVE") and
cmd.eq_ignore_ascii_case(b"COPY"), detect the active cross-store txn (e.g.,
conn.active_cross_txn or ctx.active_cross_txn as available) and push a
Frame::Error like "ERR cross-store transaction active; MOVE/COPY not allowed" to
responses and continue, ensuring no MOVE/COPY logic or AofMessage::Append is
executed.

In `@src/server/conn/handler_single.rs`:
- Around line 630-633: The SWAPDB handler currently only reads the first two
arguments (using idx_a = cmd_args.first() and idx_b = cmd_args.get(1)) which
lets extra trailing args pass; update the handler to enforce exact arity before
parsing by checking that cmd_args.len() == 2 (or equivalent) and returning an
argument-count error if not, then proceed to call parse_idx on both entries
(idx_a/idx_b) and the existing match branch (Some(a), Some(b)); reference the
variables idx_a, idx_b and the parse_idx usage to locate where to add the length
check and early error return.
- Around line 656-677: The code drops the result of aof_tx.try_send when
enqueueing the SWAPDB WAL, which can cause the handler to acknowledge success
even if the WAL append failed; update the SWAPDB path (the block using aof_tx,
itoa::Buffer, Frame::Array, serialize_command and AofMessage::Append) to handle
the try_send Result: check for Err and on failure propagate an error (or return
a failed response) so the state mutation is not acknowledged, or retry/send
synchronously (await) until persisted; ensure the caller sees the failure and
does not reply OK when tx.try_send returns an error.

In `@src/shard/coordinator.rs`:
- Around line 1660-1728: The file exceeds the repository size cap and the new
coordination logic (coordinate_swapdb) should be moved into a dedicated
submodule; extract the SWAPDB coordination code (the function coordinate_swapdb
plus any small helpers it needs like the WAL emit/serialize callsite and imports
for ShardMessage, spsc_send, ShardDatabases, channel types) into a new submodule
(e.g., shard::coordinator::swapdb) and update the parent coordinator module to
re-export or call into swapdb::coordinate_swapdb. Ensure you move or import all
referenced symbols (spsc_send, ShardMessage, channel::oneshot/OneshotReceiver,
shard_databases.swap_dbs/wal_append, crate::persistence::aof::serialize_command,
Frame, etc.), adjust visibility (pub(crate) or pub) and mod declarations (mod
swapdb; pub use swapdb::coordinate_swapdb;) so existing callers compile, and run
cargo check to fix any missing use paths.
- Around line 1710-1713: coordinate_swapdb currently serializes the WAL frame
and calls shard_databases.wal_append(my_shard, serialized) but proceeds to
shard_databases.swap_dbs(my_shard, a, b) even if wal_append failed (it can
silently drop send failures); change the flow to check the result of wal_append
(or make wal_append return a Result/boolean) and only call
shard_databases.swap_dbs when WAL enqueue succeeds, returning or propagating an
error from coordinate_swapdb on failure; update uses of serialize_command,
wal_append, and swap_dbs to handle and propagate the WAL enqueue failure so
local swap never happens without durable WAL record.

In `@src/shard/shared_databases.rs`:
- Around line 797-806: The swap_dbs function currently only has a debug_assert
to catch a == b but will deadlock in release builds by taking the same write
lock twice; add an explicit no-op early return at the top of swap_dbs (before
computing lo/hi and before acquiring locks) that checks if a == b and returns
immediately, preserving the existing debug_assert and then proceeding to acquire
self.shards[shard_id][lo].write() and self.shards[shard_id][hi].write() only for
lo != hi.

In `@src/shard/spsc_handler.rs`:
- Around line 2296-2325: The SwapDb handler currently performs
shard_databases.swap_dbs(a,b) without coordinating with snapshots; change it to
block or snapshot-protect SWAPDB by acquiring the snapshot protection/guard
before serialising/applying the swap (or by checking SnapshotState and
waiting/returning an error if a SnapshotState is active). Concretely, in the
ShardMessage::SwapDb branch wrap the WAL emission + shard_databases.swap_dbs
call with the existing SnapshotState API (e.g. obtain a SnapshotGuard or call
snapshot_state.block_mutations()/wait_for_no_snapshots) so swaps cannot run
concurrently with an in-flight snapshot, and keep the existing ascending-index
write-lock ordering when holding that snapshot guard. Ensure you reference the
ShardMessage::SwapDb handler, shard_databases.swap_dbs, and the
SnapshotState/snapshot guard API when implementing the coordination.
- Around line 464-509: The MOVE handler returns early (Integer(0)) for dst_db ==
db_idx and bypasses snapshot copy-on-write; before deciding or executing the
move (both the dst_db == db_idx branch and the actual move branch that calls
ksmv::move_core), invoke the snapshot COW interceptor for the source key so
snapshots capture the pre-move state—i.e., after parsing Ok((key, dst_db)) call
the existing cow_intercept function (e.g. crate::shard::snapshot::cow_intercept
or the project’s equivalent) for (db_idx, &key) while still on the shard context
(the same place where you later call ksmv::with_two_slice_dbs or
ksmv::with_two_dbs_locked) so the subsequent Integer(0) return or
ksmv::move_core cannot delete the pre-snapshot data.
- Around line 464-549: The MOVE and COPY intercept paths call
move_core/copy_core without refreshing the source and destination DBs, so
expired keys may be treated incorrectly; update the closures passed to
ksmv::with_two_slice_dbs and ksmv::with_two_dbs_locked in both the MOVE (where
move_core is invoked) and COPY (where copy_core is invoked) branches to call
refresh_now_from_cache(cached_clock) on both src and dst before calling
ksmv::move_core or ksmv::copy_core (i.e., inside each closure, first run
src.refresh_now_from_cache(cached_clock);
dst.refresh_now_from_cache(cached_clock); then invoke the core function).

---

Nitpick comments:
In `@src/cluster/command.rs`:
- Around line 925-961: The test cluster_replicas_lists_replicas is too loose and
won't catch the duplicate master_id bug from format_node_line; update the test
(inside the loop that parses fields) to assert replica-specific layout: when
fields[3] indicates a replica (either exactly "slave" or starts with
"myself,slave" for the self case), assert flags are "slave" for non-self
replicas and verify the master_id field is in the expected position and is
exactly 40 chars long (i.e., check fields[3] == "slave" for non-self replicas
and fields[4].len() == 40), and also ensure the master_id does not appear
elsewhere in the line so duplicates would fail; add these checks after the
fields: Vec<&str> = line.split_whitespace().collect() block to catch the
duplicate-master_id bug from format_node_line.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 7468036a-b29c-4c8d-85aa-c4dfe0a9944f

📥 Commits

Reviewing files that changed from the base of the PR and between bddda41 and ebd240a.

📒 Files selected for processing (20)
  • scripts/test-consistency.sh
  • src/cluster/command.rs
  • src/cluster/failover.rs
  • src/command/key_extra.rs
  • src/command/keyspace/mod.rs
  • src/command/keyspace/move_cmd.rs
  • src/command/metadata.rs
  • src/command/mod.rs
  • src/command/persistence.rs
  • src/persistence/aof.rs
  • src/persistence/replay.rs
  • src/server/conn/handler_monoio/dispatch.rs
  • src/server/conn/handler_monoio/mod.rs
  • src/server/conn/handler_sharded/dispatch.rs
  • src/server/conn/handler_sharded/mod.rs
  • src/server/conn/handler_single.rs
  • src/shard/coordinator.rs
  • src/shard/dispatch.rs
  • src/shard/shared_databases.rs
  • src/shard/spsc_handler.rs
💤 Files with no reviewable changes (1)
  • src/command/metadata.rs

Comment on lines +548 to +553
# Seed: db0 has swapkey=hello, db1 is empty
both SELECT 0
both SET swapkey hello
both SELECT 1
both DEL swapkey

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Use explicit DB targeting when seeding SWAPDB state

both SELECT ... does not persist DB selection across later redis-cli invocations. On Line 552, DEL swapkey runs against default DB 0, which can invalidate the intended seed and make the swap check non-representative.

Suggested fix
-# Seed: db0 has swapkey=hello, db1 is empty
-both SELECT 0
-both SET swapkey hello
-both SELECT 1
-both DEL swapkey
+# Seed: db0 has swapkey=hello, db1 is empty
+redis-cli -p "$PORT_REDIS" -n 0 SET swapkey hello >/dev/null 2>&1 || true
+redis-cli -p "$PORT_RUST"  -n 0 SET swapkey hello >/dev/null 2>&1 || true
+redis-cli -p "$PORT_REDIS" -n 1 DEL swapkey >/dev/null 2>&1 || true
+redis-cli -p "$PORT_RUST"  -n 1 DEL swapkey >/dev/null 2>&1 || true
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@scripts/test-consistency.sh` around lines 548 - 553, The seed step uses the
helper/command "both" but relies on a separate SELECT that won’t persist across
invocations, so the final "both DEL swapkey" runs against DB0 instead of DB1;
update the seed to target DB1 explicitly when deleting (i.e., remove the
separate SELECT 1 and invoke DEL with DB index specified for "both" so the DEL
runs against DB1 for key "swapkey") and ensure the commands around "both SET
swapkey hello" and the subsequent delete use explicit DB targeting.

Comment on lines +570 to +576
redis_oor=$(redis-cli -p "$PORT_REDIS" SWAPDB 0 9999 2>&1) || true
rust_oor=$(redis-cli -p "$PORT_RUST" SWAPDB 0 9999 2>&1) || true
if echo "$rust_oor" | grep -qi "ERR"; then
    PASS=$((PASS + 1))
else
    FAIL=$((FAIL + 1)); echo "  FAIL: SWAPDB out-of-range should return ERR, got: $rust_oor"
fi

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Assert out-of-range parity against Redis (and use redis_oor)

Line 570 captures redis_oor, but the check only validates moon output contains ERR. This misses direct parity verification and leaves redis_oor unused.

Suggested fix
 redis_oor=$(redis-cli -p "$PORT_REDIS" SWAPDB 0 9999 2>&1) || true
 rust_oor=$(redis-cli -p "$PORT_RUST" SWAPDB 0 9999 2>&1) || true
-if echo "$rust_oor" | grep -qi "ERR"; then
-    PASS=$((PASS + 1))
-else
-    FAIL=$((FAIL + 1)); echo "  FAIL: SWAPDB out-of-range should return ERR, got: $rust_oor"
-fi
+assert_eq "SWAPDB out-of-range parity" "$redis_oor" "$rust_oor"
🧰 Tools
🪛 Shellcheck (0.11.0)

[warning] 570-570: redis_oor appears unused. Verify use (or export if used externally).

(SC2034)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@scripts/test-consistency.sh` around lines 570 - 576, The test currently reads
redis_oor but only checks rust_oor for "ERR" so redis_oor is unused and parity
isn't asserted; update the conditional to verify both redis_oor and rust_oor
contain "ERR" (e.g., use grep -qi "ERR" on both $redis_oor and $rust_oor) and
only increment PASS when they both match, otherwise increment FAIL and echo a
message showing both $redis_oor and $rust_oor to highlight the discrepancy; keep
the existing variables (redis_oor, rust_oor, PASS, FAIL, SWAPDB) when
implementing this change.

Comment thread src/cluster/command.rs
Comment on lines +102 to +118
fn format_node_line(node: &ClusterNode, self_node_id: &str) -> String {
    let flags_str = if node.node_id == self_node_id {
        // self: prepend "myself,"
        match &node.flags {
            NodeFlags::Master => "myself,master".to_string(),
            NodeFlags::Replica { master_id } => format!("myself,slave {}", master_id),
            NodeFlags::Pfail => "myself,pfail".to_string(),
            NodeFlags::Fail => "myself,fail".to_string(),
        }
    } else {
        match &node.flags {
            NodeFlags::Master => "master".to_string(),
            NodeFlags::Replica { master_id } => format!("slave {}", master_id),
            NodeFlags::Pfail => "pfail".to_string(),
            NodeFlags::Fail => "fail".to_string(),
        }
    };

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

Redis CLUSTER NODES output format for replica nodes

💡 Result:

In Redis Cluster, redis-cli CLUSTER NODES output is line-based; each line corresponds to one node entry in the cluster state as seen by the contacted node. Replica nodes are represented as one of the entries whose flags include “slave” (older name) or “replica”, and whose “master” field contains the node ID of the primary it replicates (or “-” if the primary isn’t known). The line also includes fields such as “config-epoch”, status/link info, and the list of hash slots (replicas typically don’t list slots). [1]

Replica node-specific fields/markers you’ll see in each CLUSTER NODES line:

  1) Flags token: “slave” / “replica” (indicates the node is a replica). [1]
  2) Master node ID field: if the node is a replica and the primary is known, the “master” field is the primary’s node ID; otherwise it is “-”. [1]
  3) “config-epoch” field: for a replica, this is the configuration epoch of the current primary (the “real” meaning is tied to the primary; replicas broadcast the primary’s config epochs via heartbeats). [1]

Example shape (tokens/fields), taken from the documented explanation of how CLUSTER NODES lines are structured:

  • Node ID
  • ip:port@cluster-bus-port
  • flags (includes “slave”/“replica” for replicas)
  • master (primary node ID, or “-” if unknown)
  • … other timing/config/link fields
  • … hash slots list (typically only present for primaries). [1]

So, to identify replica nodes in CLUSTER NODES output: look for lines whose flags include “slave”/“replica”, then confirm the replicated primary via the “master” field in that same line. [1]

Citations:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Show the helper implementation around the referenced line range
nl -ba src/cluster/command.rs | sed -n '80,180p'

Repository: pilotspace/moon

Length of output: 102


🏁 Script executed:

#!/bin/bash
set -euo pipefail

awk '{printf "%d\t%s\n", NR, $0}' src/cluster/command.rs | sed -n '80,180p'

Repository: pilotspace/moon

Length of output: 4208


Fix CLUSTER NODES replica formatting: don’t embed master-id in the flags field

format_node_line builds flags_str for NodeFlags::Replica { master_id } as "myself,slave {master_id}" / "slave {master_id}" (lines 103-118) and then also outputs master_id_field right after (lines 120-123, 132-145). Because flags_str contains a space, the line splits into an extra token, duplicating and misaligning the master-id field. Redis expects the flags field to be a single token (slave / myself,slave) with the replicated primary’s node id in the following master-id field.

🔧 Proposed fix
     let flags_str = if node.node_id == self_node_id {
         // self: prepend "myself,"
         match &node.flags {
             NodeFlags::Master => "myself,master".to_string(),
-            NodeFlags::Replica { master_id } => format!("myself,slave {}", master_id),
+            NodeFlags::Replica { master_id: _ } => "myself,slave".to_string(),
             NodeFlags::Pfail => "myself,pfail".to_string(),
             NodeFlags::Fail => "myself,fail".to_string(),
         }
     } else {
         match &node.flags {
             NodeFlags::Master => "master".to_string(),
-            NodeFlags::Replica { master_id } => format!("slave {}", master_id),
+            NodeFlags::Replica { master_id: _ } => "slave".to_string(),
             NodeFlags::Pfail => "pfail".to_string(),
             NodeFlags::Fail => "fail".to_string(),
         }
     };
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/cluster/command.rs` around lines 102 - 118, The flags string in
format_node_line incorrectly includes the replica's master_id for
NodeFlags::Replica, causing an extra token; change the match arms that handle
NodeFlags::Replica { master_id } so they produce "myself,slave" and "slave" (no
master_id appended) when building flags_str, and leave the separate master-id
field logic (the master_id_field variable/output) intact so the master node id
is emitted only in its dedicated column.
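
To make the intended column layout concrete, here is a minimal sketch, assuming a simplified NodeFlags enum and a hypothetical format_node_prefix helper that emits only the first four columns (id, address, flags, master-id). It is not the project's format_node_line; it only illustrates keeping the flags column as a single whitespace-free token while the primary's id goes in its own column.

```rust
/// Simplified stand-in for the `NodeFlags` enum quoted above (assumption).
pub enum NodeFlags {
    Master,
    Replica { master_id: String },
    Pfail,
    Fail,
}

/// Builds the flags column as one token with no embedded whitespace.
pub fn flags_token(flags: &NodeFlags, is_self: bool) -> String {
    let base = match flags {
        NodeFlags::Master => "master",
        // The primary's id is deliberately NOT appended here.
        NodeFlags::Replica { .. } => "slave",
        NodeFlags::Pfail => "pfail",
        NodeFlags::Fail => "fail",
    };
    if is_self {
        format!("myself,{base}")
    } else {
        base.to_string()
    }
}

/// Returns the dedicated master-id column: the primary's id for a replica,
/// "-" for every other node.
pub fn master_id_field(flags: &NodeFlags) -> &str {
    match flags {
        NodeFlags::Replica { master_id } => master_id.as_str(),
        _ => "-",
    }
}

/// Hypothetical reduced line: id, address, flags, master-id only.
pub fn format_node_prefix(node_id: &str, addr: &str, flags: &NodeFlags, is_self: bool) -> String {
    format!(
        "{node_id} {addr} {} {}",
        flags_token(flags, is_self),
        master_id_field(flags)
    )
}
```

Splitting the resulting line on whitespace then yields exactly one token per column, which is the property the stricter cluster_replicas_lists_replicas assertions would check.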

Comment thread src/command/key_extra.rs
Comment on lines +42 to +49
// Cross-db COPY is intercepted by handler-level code before dispatch().
// When this branch is reached, the DB index has already been validated
// as the same database (parse_copy_db_args returned None → same-db path).
// Consume the DB index argument and continue as a same-db copy.
i += 1; // skip the db-index token
if i >= args.len() {
    return Frame::Error(Bytes::from_static(b"ERR syntax error"));
}

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Validate the consumed DB index token instead of accepting arbitrary bytes.

This branch only checks presence, so malformed values (e.g., DB abc) are silently accepted if interception is bypassed. Validate that the skipped token is a non-negative integer before continuing.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/command/key_extra.rs` around lines 42 - 49, The code currently consumes
the DB index token by doing i += 1 without validating its content; instead,
validate the consumed token (args[i] before or after increment as appropriate)
is a non-negative integer (e.g., parse bytes to a usize/u64 after converting
from bytes) and return Frame::Error(Bytes::from_static(b"ERR invalid DB index"))
on parse failure. Update the branch that comments about
parse_copy_db_args/same-db path to explicitly parse and validate the DB token
(the args and i variables in this block) before proceeding so malformed values
like "DB abc" are rejected.
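
One way the token check could look, as a sketch only: parse_db_index below is a hypothetical free function (not the project's closure of the same name), and the choice to reject a leading "+" is an assumption about the desired strictness.

```rust
/// Parses a DB index token made of ASCII digits only.
/// Returns `None` for empty input, signs, letters, or values that overflow `usize`.
pub fn parse_db_index(token: &[u8]) -> Option<usize> {
    if token.is_empty() || !token.iter().all(|b| b.is_ascii_digit()) {
        return None;
    }
    // All bytes are ASCII digits, so the UTF-8 conversion cannot fail;
    // `parse` can still fail on overflow, which maps to `None`.
    std::str::from_utf8(token).ok()?.parse::<usize>().ok()
}
```

In the same-db COPY branch, a `None` result would map to the error frame suggested above before `i` is advanced past the token.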

Comment on lines +255 to +264
if src_idx < dst_idx {
    let mut lo = dbs[src_idx].write();
    let mut hi = dbs[dst_idx].write();
    f(&mut lo, &mut hi)
} else {
    // dst_idx < src_idx (equality rejected by caller: src == dst → :0)
    let mut lo = dbs[dst_idx].write();
    let mut hi = dbs[src_idx].write();
    f(&mut hi, &mut lo)
}

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Guard equal DB indices in with_two_dbs_locked to prevent self-deadlock.

When src_idx == dst_idx, this path takes two write locks on the same RwLock, which can deadlock. Add an explicit equality check up front (panic or early return) to enforce the contract.

Suggested fix
 pub fn with_two_dbs_locked<R>(
     dbs: &[RwLock<Database>],
     src_idx: usize,
     dst_idx: usize,
     f: impl FnOnce(&mut Database, &mut Database) -> R,
 ) -> R {
+    assert_ne!(
+        src_idx, dst_idx,
+        "with_two_dbs_locked: src and dst must differ"
+    );
     assert!(
         src_idx < dbs.len(),
         "src_idx {src_idx} out of range ({} dbs)",
         dbs.len()
     );
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/command/keyspace/move_cmd.rs` around lines 255 - 264, The code in
with_two_dbs_locked currently takes two write locks even when src_idx ==
dst_idx, causing a self-deadlock; add an explicit guard at the start of
with_two_dbs_locked that checks if src_idx == dst_idx and either panic! or
return an Err/early-return to enforce the contract. Locate the
with_two_dbs_locked function and insert the equality check before any locking on
dbs (use the same src_idx, dst_idx variables used in the diff), so the
subsequent code that acquires lo/hi locks and calls f(&mut lo, &mut hi) never
runs for equal indices.
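
As an alternative to panicking, the guard can be expressed as an early return. The sketch below assumes std::sync::RwLock (the quoted code calls .write() without unwrapping, so the project probably uses a different lock type), a placeholder Database struct, and an Option return type that the real helper does not have.

```rust
use std::sync::RwLock;

/// Hypothetical stand-in for the project's `Database` type.
#[derive(Debug, Default)]
pub struct Database {
    pub keys: Vec<String>,
}

/// Locks two distinct databases in ascending index order and runs `f(src, dst)`.
/// Returns `None` instead of locking when the indices are equal or out of range.
pub fn with_two_dbs_locked<R>(
    dbs: &[RwLock<Database>],
    src_idx: usize,
    dst_idx: usize,
    f: impl FnOnce(&mut Database, &mut Database) -> R,
) -> Option<R> {
    // Guard first: taking the same write lock twice would never complete.
    if src_idx == dst_idx || src_idx >= dbs.len() || dst_idx >= dbs.len() {
        return None;
    }
    let (lo, hi) = if src_idx < dst_idx {
        (src_idx, dst_idx)
    } else {
        (dst_idx, src_idx)
    };
    // Lower index first, so concurrent callers always lock in the same order.
    let mut lo_guard = dbs[lo].write().unwrap();
    let mut hi_guard = dbs[hi].write().unwrap();
    if src_idx < dst_idx {
        Some(f(&mut *lo_guard, &mut *hi_guard))
    } else {
        Some(f(&mut *hi_guard, &mut *lo_guard))
    }
}
```

Returning None pushes the decision to the caller, which can map it to the `:0` reply that MOVE already uses for src == dst; assert_ne! as in the suggested fix is the stricter option when equal indices should be treated as a programming error.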

Comment thread src/shard/coordinator.rs
Comment on lines +1710 to +1713
let serialized = crate::persistence::aof::serialize_command(&wal_frame);
shard_databases.wal_append(my_shard, serialized);
shard_databases.swap_dbs(my_shard, a, b);
}

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Do not swap locally if WAL enqueue fails.

coordinate_swapdb assumes WAL is recorded before swap, but wal_append can silently drop send failures. That allows a local swap without durable WAL record.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/shard/coordinator.rs` around lines 1710 - 1713, coordinate_swapdb
currently serializes the WAL frame and calls
shard_databases.wal_append(my_shard, serialized) but proceeds to
shard_databases.swap_dbs(my_shard, a, b) even if wal_append failed (it can
silently drop send failures); change the flow to check the result of wal_append
(or make wal_append return a Result/boolean) and only call
shard_databases.swap_dbs when WAL enqueue succeeds, returning or propagating an
error from coordinate_swapdb on failure; update uses of serialize_command,
wal_append, and swap_dbs to handle and propagate the WAL enqueue failure so
local swap never happens without durable WAL record.
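
The ordering being asked for can be sketched as follows, assuming a bounded std::sync::mpsc channel as a stand-in for the WAL queue and plain Vec<String> values as stand-in databases. swap_dbs_durably and its error strings are hypothetical, and a successful enqueue is only a precondition for durability, not a guarantee of it.

```rust
use std::sync::mpsc::SyncSender;

/// Enqueues a "SWAPDB a b" record and swaps only if the enqueue succeeded.
pub fn swap_dbs_durably(
    wal_tx: &SyncSender<Vec<u8>>,
    dbs: &mut [Vec<String>],
    a: usize,
    b: usize,
) -> Result<(), String> {
    if a >= dbs.len() || b >= dbs.len() {
        return Err("ERR DB index is out of range".to_string());
    }
    let record = format!("SWAPDB {a} {b}").into_bytes();
    // `try_send` fails when the queue is full or the receiver is gone;
    // in either case the in-memory state must stay untouched.
    wal_tx
        .try_send(record)
        .map_err(|e| format!("ERR WAL enqueue failed: {e}"))?;
    dbs.swap(a, b);
    Ok(())
}
```

The `?` after map_err is what keeps the swap from running on a failed enqueue; the same shape applies if wal_append is changed to return a Result or bool.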

Comment on lines +797 to +806
debug_assert_ne!(
    a, b,
    "swap_dbs called with equal indices — caller should short-circuit"
);

let (lo, hi) = if a < b { (a, b) } else { (b, a) };
// Acquire in ascending index order to prevent deadlock.
let mut guard_lo = self.shards[shard_id][lo].write();
let mut guard_hi = self.shards[shard_id][hi].write();
std::mem::swap(&mut *guard_lo, &mut *guard_hi);

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Prevent same-index SWAPDB from hanging in release builds.

When a == b, this path can attempt to take the same write lock twice and stall. Make same-index an explicit no-op in swap_dbs (not only a debug assertion).

Proposed fix
 pub fn swap_dbs(&self, shard_id: usize, a: usize, b: usize) {
     debug_assert!(shard_id < self.shards.len(), "shard_id out of bounds");
     debug_assert!(a < self.db_count, "db index a out of bounds");
     debug_assert!(b < self.db_count, "db index b out of bounds");
-    debug_assert_ne!(
-        a, b,
-        "swap_dbs called with equal indices — caller should short-circuit"
-    );
+    if a == b {
+        return;
+    }

     let (lo, hi) = if a < b { (a, b) } else { (b, a) };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-debug_assert_ne!(
-    a, b,
-    "swap_dbs called with equal indices — caller should short-circuit"
-);
-let (lo, hi) = if a < b { (a, b) } else { (b, a) };
-// Acquire in ascending index order to prevent deadlock.
-let mut guard_lo = self.shards[shard_id][lo].write();
-let mut guard_hi = self.shards[shard_id][hi].write();
-std::mem::swap(&mut *guard_lo, &mut *guard_hi);
+debug_assert!(shard_id < self.shards.len(), "shard_id out of bounds");
+debug_assert!(a < self.db_count, "db index a out of bounds");
+debug_assert!(b < self.db_count, "db index b out of bounds");
+if a == b {
+    return;
+}
+let (lo, hi) = if a < b { (a, b) } else { (b, a) };
+// Acquire in ascending index order to prevent deadlock.
+let mut guard_lo = self.shards[shard_id][lo].write();
+let mut guard_hi = self.shards[shard_id][hi].write();
+std::mem::swap(&mut *guard_lo, &mut *guard_hi);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/shard/shared_databases.rs` around lines 797 - 806, The swap_dbs function
currently only has a debug_assert to catch a == b but will deadlock in release
builds by taking the same write lock twice; add an explicit no-op early return
at the top of swap_dbs (before computing lo/hi and before acquiring locks) that
checks if a == b and returns immediately, preserving the existing debug_assert
and then proceeding to acquire self.shards[shard_id][lo].write() and
self.shards[shard_id][hi].write() only for lo != hi.
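
A self-contained sketch of the same-index no-op, assuming std::sync::RwLock, a single shard, and Vec<String> as a placeholder for the database contents. ShardDbs is a hypothetical reduction of the real shared-databases type.

```rust
use std::sync::RwLock;

/// Hypothetical single-shard reduction of the shared-databases container.
pub struct ShardDbs {
    pub dbs: Vec<RwLock<Vec<String>>>,
}

impl ShardDbs {
    pub fn new(db_count: usize) -> Self {
        Self {
            dbs: (0..db_count).map(|_| RwLock::new(Vec::new())).collect(),
        }
    }

    /// Swaps the contents of databases `a` and `b`.
    /// Equal indices are a no-op in every build profile, so the same
    /// lock is never requested twice.
    pub fn swap_dbs(&self, a: usize, b: usize) {
        if a == b {
            return;
        }
        let (lo, hi) = if a < b { (a, b) } else { (b, a) };
        // Ascending index order, matching the other two-lock helpers.
        let mut guard_lo = self.dbs[lo].write().unwrap();
        let mut guard_hi = self.dbs[hi].write().unwrap();
        std::mem::swap(&mut *guard_lo, &mut *guard_hi);
    }
}
```

Unlike a debug assertion, the early return also holds in release builds, which is where the original hang would occur.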

Comment thread src/shard/spsc_handler.rs
Comment on lines +464 to +509
if cmd.eq_ignore_ascii_case(b"MOVE") {
    use crate::command::keyspace::move_cmd as ksmv;
    let response = match ksmv::parse_move_args(args, db_count) {
        Err(e) => e,
        Ok((_key, dst_db)) if dst_db == db_idx => {
            crate::protocol::Frame::Integer(0)
        }
        Ok((key, dst_db)) => {
            // SPSC runs single-threaded per shard; no concurrent MOVE can
            // deadlock. slice path uses split_at_mut (no locking needed).
            if crate::shard::slice::is_initialized() {
                crate::shard::slice::with_shard(|s| {
                    ksmv::with_two_slice_dbs(
                        &mut s.databases,
                        db_idx,
                        dst_db,
                        |src, dst| ksmv::move_core(src, dst, &key),
                    )
                })
            } else {
                // Lock ordering (lower index first) prevents deadlock with
                // handler_monoio/sharded connections on the same shard.
                ksmv::with_two_dbs_locked(
                    &shard_databases.all_shard_dbs()[shard_id],
                    db_idx,
                    dst_db,
                    |src, dst| ksmv::move_core(src, dst, &key),
                )
            }
        }
    };
    if matches!(response, crate::protocol::Frame::Integer(1)) {
        let serialized = aof::serialize_command(&command);
        wal_append_and_fanout(
            &serialized,
            wal_writer,
            wal_v3_writer,
            repl_backlog,
            replica_txs,
            repl_state,
            shard_id,
        );
    }
    let _ = reply_tx.send(response);
    return;
}

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Preserve snapshot COW for MOVE.

This early return bypasses the usual cow_intercept on the source key. If a snapshot is active and the source segment has not been serialized yet, MOVE can delete the key before the snapshot captures its pre-command state.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/shard/spsc_handler.rs` around lines 464 - 509, The MOVE handler returns
early (Integer(0)) for dst_db == db_idx and bypasses snapshot copy-on-write;
before deciding or executing the move (both the dst_db == db_idx branch and the
actual move branch that calls ksmv::move_core), invoke the snapshot COW
interceptor for the source key so snapshots capture the pre-move state—i.e.,
after parsing Ok((key, dst_db)) call the existing cow_intercept function (e.g.
crate::shard::snapshot::cow_intercept or the project’s equivalent) for (db_idx,
&key) while still on the shard context (the same place where you later call
ksmv::with_two_slice_dbs or ksmv::with_two_dbs_locked) so the subsequent
Integer(0) return or ksmv::move_core cannot delete the pre-snapshot data.

Comment thread src/shard/spsc_handler.rs
Comment on lines +464 to +549
if cmd.eq_ignore_ascii_case(b"MOVE") {
    use crate::command::keyspace::move_cmd as ksmv;
    let response = match ksmv::parse_move_args(args, db_count) {
        Err(e) => e,
        Ok((_key, dst_db)) if dst_db == db_idx => {
            crate::protocol::Frame::Integer(0)
        }
        Ok((key, dst_db)) => {
            // SPSC runs single-threaded per shard; no concurrent MOVE can
            // deadlock. slice path uses split_at_mut (no locking needed).
            if crate::shard::slice::is_initialized() {
                crate::shard::slice::with_shard(|s| {
                    ksmv::with_two_slice_dbs(
                        &mut s.databases,
                        db_idx,
                        dst_db,
                        |src, dst| ksmv::move_core(src, dst, &key),
                    )
                })
            } else {
                // Lock ordering (lower index first) prevents deadlock with
                // handler_monoio/sharded connections on the same shard.
                ksmv::with_two_dbs_locked(
                    &shard_databases.all_shard_dbs()[shard_id],
                    db_idx,
                    dst_db,
                    |src, dst| ksmv::move_core(src, dst, &key),
                )
            }
        }
    };
    if matches!(response, crate::protocol::Frame::Integer(1)) {
        let serialized = aof::serialize_command(&command);
        wal_append_and_fanout(
            &serialized,
            wal_writer,
            wal_v3_writer,
            repl_backlog,
            replica_txs,
            repl_state,
            shard_id,
        );
    }
    let _ = reply_tx.send(response);
    return;
}

if cmd.eq_ignore_ascii_case(b"COPY") {
    use crate::command::keyspace::move_cmd as ksmv;
    if let Some(copy_result) = ksmv::parse_copy_db_args(args, db_idx, db_count) {
        let response = match copy_result {
            Err(e) => e,
            Ok(ca) => {
                if crate::shard::slice::is_initialized() {
                    crate::shard::slice::with_shard(|s| {
                        ksmv::with_two_slice_dbs(
                            &mut s.databases,
                            db_idx,
                            ca.dst_db,
                            |src, dst| {
                                ksmv::copy_core(
                                    src,
                                    dst,
                                    &ca.src_key,
                                    &ca.dst_key,
                                    ca.replace,
                                )
                            },
                        )
                    })
                } else {
                    ksmv::with_two_dbs_locked(
                        &shard_databases.all_shard_dbs()[shard_id],
                        db_idx,
                        ca.dst_db,
                        |src, dst| {
                            ksmv::copy_core(
                                src,
                                dst,
                                &ca.src_key,
                                &ca.dst_key,
                                ca.replace,
                            )
                        },
                    )
                }

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Refresh both DBs before move_core/copy_core.

The normal dispatch path calls refresh_now_from_cache(cached_clock) before touching a database, but these intercepts skip it for both source and destination DBs. That can make expired source keys movable/copyable and expired destination keys still count as existing.

Suggested fix
                             if crate::shard::slice::is_initialized() {
                                 crate::shard::slice::with_shard(|s| {
                                     ksmv::with_two_slice_dbs(
                                         &mut s.databases,
                                         db_idx,
                                         dst_db,
-                                        |src, dst| ksmv::move_core(src, dst, &key),
+                                        |src, dst| {
+                                            src.refresh_now_from_cache(cached_clock);
+                                            dst.refresh_now_from_cache(cached_clock);
+                                            ksmv::move_core(src, dst, &key)
+                                        },
                                     )
                                 })
                             } else {
                                 ksmv::with_two_dbs_locked(
                                     &shard_databases.all_shard_dbs()[shard_id],
                                     db_idx,
                                     dst_db,
-                                    |src, dst| ksmv::move_core(src, dst, &key),
+                                    |src, dst| {
+                                        src.refresh_now_from_cache(cached_clock);
+                                        dst.refresh_now_from_cache(cached_clock);
+                                        ksmv::move_core(src, dst, &key)
+                                    },
                                 )
                             }

Apply the same refresh in the COPY closures too.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/shard/spsc_handler.rs` around lines 464 - 549, The MOVE and COPY
intercept paths call move_core/copy_core without refreshing the source and
destination DBs, so expired keys may be treated incorrectly; update the closures
passed to ksmv::with_two_slice_dbs and ksmv::with_two_dbs_locked in both the
MOVE (where move_core is invoked) and COPY (where copy_core is invoked) branches
to call refresh_now_from_cache(cached_clock) on both src and dst before calling
ksmv::move_core or ksmv::copy_core (i.e., inside each closure, first run
src.refresh_now_from_cache(cached_clock);
dst.refresh_now_from_cache(cached_clock); then invoke the core function).
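
To make the failure mode concrete, here is a minimal sketch of lazy expiry against a cached clock. The `Db` type, its fields, and this simplified `move_core` are hypothetical stand-ins and not the project's real types; only the name `refresh_now_from_cache` is taken from the comment above. It illustrates how a stale cached clock lets an already-expired source key be moved.

```rust
use std::collections::HashMap;

/// Toy database: each key maps to an optional expiry deadline in milliseconds.
/// Hypothetical model, not the project's real database type.
#[derive(Default)]
struct Db {
    now_ms: u64, // cached "now"; stale until refreshed
    expiry_ms: HashMap<String, Option<u64>>,
}

impl Db {
    /// Bring the cached clock up to date (name borrowed from the review comment).
    fn refresh_now_from_cache(&mut self, cached_clock_ms: u64) {
        self.now_ms = cached_clock_ms;
    }

    fn set(&mut self, key: &str, deadline_ms: Option<u64>) {
        self.expiry_ms.insert(key.to_string(), deadline_ms);
    }

    /// A key is live if it exists and its deadline is after the cached clock.
    fn is_live(&self, key: &str) -> bool {
        match self.expiry_ms.get(key) {
            Some(Some(deadline)) => *deadline > self.now_ms,
            Some(None) => true,
            None => false,
        }
    }
}

/// Simplified MOVE: 1 on success, 0 if the source is missing or expired,
/// or if the destination already holds a live key.
fn move_core(src: &mut Db, dst: &mut Db, key: &str) -> i64 {
    if !src.is_live(key) {
        src.expiry_ms.remove(key);
        return 0;
    }
    if dst.is_live(key) {
        return 0;
    }
    let deadline = src.expiry_ms.remove(key).expect("source key was checked live");
    dst.expiry_ms.insert(key.to_string(), deadline);
    1
}

/// MOVE with both clocks refreshed first, as the review suggests.
fn move_with_refresh(src: &mut Db, dst: &mut Db, key: &str, cached_clock_ms: u64) -> i64 {
    src.refresh_now_from_cache(cached_clock_ms);
    dst.refresh_now_from_cache(cached_clock_ms);
    move_core(src, dst, key)
}
```

In this model, a key with deadline 100 is still moved when the cached clock sits at 0, but is rejected once both databases are refreshed to 200.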

Comment thread src/shard/spsc_handler.rs
Comment on lines +2296 to +2325
ShardMessage::SwapDb { a, b, reply_tx } => {
    // WAL-before-swap: emit the SWAPDB record so that crash-recovery
    // replay can re-apply the swap in the correct order. The record
    // is written even when wal_writer/wal_v3_writer are None (the
    // fast-path in wal_append_and_fanout will skip it cheaply).
    //
    // Serialise "SWAPDB <a> <b>" without heap allocation on the number
    // formatting (itoa writes into a stack buffer).
    let mut a_buf = itoa::Buffer::new();
    let mut b_buf = itoa::Buffer::new();
    let a_str = a_buf.format(a);
    let b_str = b_buf.format(b);
    let wal_frame = crate::protocol::Frame::Array(crate::framevec![
        crate::protocol::Frame::BulkString(bytes::Bytes::from_static(b"SWAPDB")),
        crate::protocol::Frame::BulkString(bytes::Bytes::copy_from_slice(a_str.as_bytes())),
        crate::protocol::Frame::BulkString(bytes::Bytes::copy_from_slice(b_str.as_bytes())),
    ]);
    let serialized = aof::serialize_command(&wal_frame);
    wal_append_and_fanout(
        &serialized,
        wal_writer,
        wal_v3_writer,
        repl_backlog,
        replica_txs,
        repl_state,
        shard_id,
    );

    // Perform the in-place swap under ascending-index write locks.
    shard_databases.swap_dbs(shard_id, a, b);

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Block or snapshot-protect SWAPDB.

SWAPDB mutates entire databases but never goes through the normal write/COW path. Running this while SnapshotState is active can expose swapped contents to an in-flight snapshot with no way to reconstruct the pre-swap view.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/shard/spsc_handler.rs` around lines 2296 - 2325, The SwapDb handler
currently performs shard_databases.swap_dbs(a,b) without coordinating with
snapshots; change it to block or snapshot-protect SWAPDB by acquiring the
snapshot protection/guard before serialising/applying the swap (or by checking
SnapshotState and waiting/returning an error if a SnapshotState is active).
Concretely, in the ShardMessage::SwapDb branch wrap the WAL emission +
shard_databases.swap_dbs call with the existing SnapshotState API (e.g. obtain a
SnapshotGuard or call snapshot_state.block_mutations()/wait_for_no_snapshots) so
swaps cannot run concurrently with an in-flight snapshot, and keep the existing
ascending-index write-lock ordering when holding that snapshot guard. Ensure you
reference the ShardMessage::SwapDb handler, shard_databases.swap_dbs, and the
SnapshotState/snapshot guard API when implementing the coordination.
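
One way to read the "block" option is sketched below. `SnapshotState`, `SwapError`, and `swap_dbs_guarded` are hypothetical names invented for illustration, and the databases are modelled as plain vectors; the project's real snapshot API may look quite different. The sketch simply refuses the swap while a snapshot is flagged as active.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-in for the project's snapshot state.
struct SnapshotState {
    active: AtomicBool,
}

#[derive(Debug, PartialEq)]
enum SwapError {
    SnapshotInProgress,
    IndexOutOfRange,
}

/// Swap two logical databases unless a snapshot is in flight.
/// Each database is modelled as a list of (key, value) pairs.
fn swap_dbs_guarded(
    dbs: &mut [Vec<(String, String)>],
    snapshot: &SnapshotState,
    a: usize,
    b: usize,
) -> Result<(), SwapError> {
    if a >= dbs.len() || b >= dbs.len() {
        return Err(SwapError::IndexOutOfRange);
    }
    // Assumption: rejecting the command is acceptable; waiting for the
    // snapshot to finish would be the other option named in the review.
    if snapshot.active.load(Ordering::Acquire) {
        return Err(SwapError::SnapshotInProgress);
    }
    dbs.swap(a, b);
    Ok(())
}
```

Note that a bare flag check like this is only safe if a snapshot cannot start between the check and the swap; in a real handler that would need whatever mutual exclusion the snapshot machinery already provides.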

@TinDang97 TinDang97 marked this pull request as draft May 24, 2026 07:03
Lane A's T2.2 (MOVE) and T2.3 (COPY ... DB n) wrapped their command-name
checks in an outer `if metadata::is_write(cmd) { ... }` guard inside the
per-frame dispatch loop of handler_monoio and handler_sharded. The
existing write-path block below already calls `metadata::is_write(cmd)`,
so every write command (SET, HSET, ZADD, etc.) was paying TWO PHF table
lookups instead of one.

In-session A/B on moon-dev (s=1, c=400, median-of-3) measured Lane A
SET p=1 at 218K rps versus the pre-Lane-A baseline (46ed28a) at 241K
rps — a -9.5% regression in the per-command-overhead-dominated cell.
Other cells (GET p=*, SET p>=16) showed no regression; at pipeline depth
>= 16 the extra PHF lookup amortizes away.

## Fix

Remove the redundant outer `if metadata::is_write(cmd) {` wrapper.
MOVE and COPY are themselves write commands; the inner
`cmd.eq_ignore_ascii_case(b"MOVE")` / `b"COPY"` checks already gate
correctly. For non-MOVE/COPY workloads (SET, GET, HSET, etc.) the
branch predictor learns "false" on both inline checks for free
(~5ns each, fully predictable).
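
The shape of the change can be sketched as follows. This is a simplified model, not the handler code: `is_write` here is a hypothetical stand-in that counts its own calls (the real one is described above as a PHF lookup), `Route` is invented for illustration, and the real COPY intercept applies only to the `COPY ... DB n` form.

```rust
use std::cell::Cell;

/// Hypothetical stand-in for metadata::is_write; counts calls so the
/// duplicate lookup becomes visible.
fn is_write(cmd: &[u8], lookups: &Cell<u32>) -> bool {
    lookups.set(lookups.get() + 1);
    const WRITES: [&[u8]; 5] = [b"SET", b"HSET", b"ZADD", b"MOVE", b"COPY"];
    WRITES.iter().any(|w| cmd.eq_ignore_ascii_case(w))
}

#[derive(Debug, PartialEq)]
enum Route {
    MoveIntercept,
    CopyIntercept,
    WritePath,
    ReadPath,
}

/// Pre-fix shape: the name checks sit inside an outer is_write guard,
/// and the write-path block below calls is_write again.
fn dispatch_pre_fix(cmd: &[u8], lookups: &Cell<u32>) -> Route {
    if is_write(cmd, lookups) {
        if cmd.eq_ignore_ascii_case(b"MOVE") {
            return Route::MoveIntercept;
        }
        if cmd.eq_ignore_ascii_case(b"COPY") {
            return Route::CopyIntercept;
        }
    }
    if is_write(cmd, lookups) { Route::WritePath } else { Route::ReadPath }
}

/// Post-fix shape: the name checks gate directly, so ordinary commands
/// reach the write-path block after a single is_write call.
fn dispatch_post_fix(cmd: &[u8], lookups: &Cell<u32>) -> Route {
    if cmd.eq_ignore_ascii_case(b"MOVE") {
        return Route::MoveIntercept;
    }
    if cmd.eq_ignore_ascii_case(b"COPY") {
        return Route::CopyIntercept;
    }
    if is_write(cmd, lookups) { Route::WritePath } else { Route::ReadPath }
}
```

In this model a SET costs two `is_write` calls before the fix and one after, while both versions route every command identically.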

## Result (post-fix, in-session vs 46ed28a baseline)

| cell        | 46ed28a | Lane A pre-fix | Lane A post-fix | delta vs base |
|-------------|---------|----------------|-----------------|---------------|
| GET p=1     | 198K    | 211K           | 201K            | +1.5%         |
| GET p=16    | 2.65M   | 2.88M          | 2.52M           | -4.9%         |
| GET p=64    | 7.25M   | 7.54M          | 7.18M           | -1.0%         |
| SET p=1     | 241K    | 218K (-9.5%)   | 243K            | +0.9%         |
| SET p=16    | 1.49M   | 1.50M          | 1.48M           | -0.7%         |
| SET p=64    | 2.74M   | 2.76M          | 2.63M           | -4.0%         |

All cells now within +/-5% of 46ed28a in-session.

## Files

- src/server/conn/handler_monoio/mod.rs: lines 1075-1173 (wrapper removed)
- src/server/conn/handler_sharded/mod.rs: lines 1131-1200 (wrapper removed)

## Functional verification (post-fix, --shards 1)

- MOVE k0 1                     -> :1, GET (db0) = (nil), GET (db1) = v0  PASS
- COPY src dst DB 2             -> :1, GET (db2) = srcval, src preserved PASS
- SWAPDB 0 3                    -> +OK, GET (db0) = v3, GET (db3) = v0 PASS

## CI gates

- cargo fmt --check: PASS
- cargo clippy --release -- -D warnings: PASS
- cargo test --release --lib: 3273 PASS / 0 FAIL
- cargo test --no-default-features --features runtime-tokio,jemalloc --lib: 2668 PASS / 0 FAIL

handler_single.rs and shard/spsc_handler.rs were already correct (they
gate MOVE/COPY directly with eq_ignore_ascii_case, no outer is_write
wrapper).

author: Tin Dang
@TinDang97 TinDang97 marked this pull request as ready for review May 24, 2026 07:55

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/server/conn/handler_monoio/mod.rs (1)

1120-1174: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Run the normal eviction/OOM gate before cross-DB COPY.

COPY ... DB n can grow memory, but this fast-path returns before the write path's try_evict_if_needed / try_evict_if_needed_async_spill checks. Under maxmemory or spill-offload, it can duplicate a large value without the usual backpressure.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/server/conn/handler_monoio/mod.rs` around lines 1120 - 1174, The
fast-path for cross-DB COPY currently performs ksmv::copy_core without invoking
the usual eviction/OOM gate; before executing the cross-db branch (i.e., after
parse_copy_db_args returns Ok(ca) and before calling ksmv::with_two_slice_dbs /
ksmv::with_two_dbs_locked and ksmv::copy_core) call the same eviction checks
used by normal writes (try_evict_if_needed and/or
try_evict_if_needed_async_spill on ctx) with the expected size delta for the
copy so we apply backpressure or async spill before duplicating large values;
keep the existing AOF append (ctx.aof_tx / aof::serialize_command /
AofMessage::Append) behavior unchanged and only proceed to call ksmv::copy_core
if the eviction gate allows the write.
src/server/conn/handler_sharded/mod.rs (1)

1170-1202: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Preserve maxmemory/OOM enforcement for cross-DB COPY.

This intercept mutates the destination DB before the normal write path runs, so it skips try_evict_if_needed. A large COPY ... DB n can therefore bypass the shard's memory guard.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/server/conn/handler_sharded/mod.rs` around lines 1170 - 1202, The current
COPY DB branch mutates the destination DB via ksmv::copy_core (called through
ksmv::with_two_slice_dbs or ksmv::with_two_dbs_locked) before the shard-level
memory enforcement runs, letting large cross-DB COPY bypass try_evict_if_needed;
fix by invoking the shard eviction/OOM enforcement for the destination DB prior
to performing the copy (e.g., detect cross-DB copy from parse_copy_db_args, and
call the existing try_evict_if_needed mechanism on the destination DB via
ctx.shard_databases (or by obtaining the dst DB handle and running
try_evict_if_needed) before calling
with_two_slice_dbs/with_two_dbs_locked/copy_core), ensuring the call path
mirrors the normal write path’s eviction check so COPY cannot exceed maxmemory.
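
A rough sketch of the idea behind both COPY findings: check a memory budget before duplicating the value. `MemoryBudget`, `CopyOutcome`, and `copy_with_gate` are hypothetical names for illustration only. The project's `try_evict_if_needed` presumably evicts or spills rather than just refusing, and the REPLACE handling of COPY is ignored here.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum CopyOutcome {
    Copied,
    SourceMissing,
    OutOfMemory,
}

/// Hypothetical memory accounting; a real gate would try eviction first.
struct MemoryBudget {
    used: usize,
    max: usize,
}

impl MemoryBudget {
    /// Reserve `delta` bytes if they fit under `max`; otherwise refuse.
    fn try_reserve(&mut self, delta: usize) -> bool {
        match self.used.checked_add(delta) {
            Some(total) if total <= self.max => {
                self.used = total;
                true
            }
            _ => false,
        }
    }
}

/// Cross-DB copy that consults the budget before touching the destination.
fn copy_with_gate(
    src: &HashMap<String, Vec<u8>>,
    dst: &mut HashMap<String, Vec<u8>>,
    budget: &mut MemoryBudget,
    src_key: &str,
    dst_key: &str,
) -> CopyOutcome {
    let Some(value) = src.get(src_key) else {
        return CopyOutcome::SourceMissing;
    };
    // Gate first: the destination is only mutated once the reservation succeeds.
    if !budget.try_reserve(value.len()) {
        return CopyOutcome::OutOfMemory;
    }
    dst.insert(dst_key.to_string(), value.clone());
    CopyOutcome::Copied
}
```

The point of the ordering is that a refused copy leaves the destination database untouched, which matches the backpressure behaviour the review asks for.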
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/server/conn/handler_monoio/mod.rs`:
- Around line 1110-1114: The current AOF append gate `if !matches!(response,
Frame::Error(_)) { ... ctx.aof_tx ... aof::serialize_command(&frame) ...
AofMessage::Append(...) }` wrongly persists no-op responses like
`Frame::Integer(0)` and also appends MOVE/COPY even when they returned 0; change
the condition so you skip appending when the response is `Frame::Integer(0)`
and, for MOVE/COPY commands, only send the serialized command to `ctx.aof_tx`
when the response equals `Frame::Integer(1)` (success); keep the existing
skip-on-`Frame::Error(_)` behavior and use the same checks in the parallel block
at the other location (around lines 1164-1167).

In `@src/server/conn/handler_sharded/mod.rs`:
- Around line 1161-1165: The current AOF append branch treats any non-Error
response as a write; change it to append only when the command actually
succeeded, i.e. when the reply is `:1`: replace the condition around
aof_bytes/AofMessage::Append so it checks that the response is
`Frame::Integer(1)` (not Frame::Error and not a `:0` no-op, mirroring the
success-check logic used in handler_single.rs), and apply the identical fix at
the other occurrence (the block at 1193-1196); keep using aof_bytes, ctx.aof_tx
and AofMessage::Append for the append path.

---

Outside diff comments:
In `@src/server/conn/handler_monoio/mod.rs`:
- Around line 1120-1174: The fast-path for cross-DB COPY currently performs
ksmv::copy_core without invoking the usual eviction/OOM gate; before executing
the cross-db branch (i.e., after parse_copy_db_args returns Ok(ca) and before
calling ksmv::with_two_slice_dbs / ksmv::with_two_dbs_locked and
ksmv::copy_core) call the same eviction checks used by normal writes
(try_evict_if_needed and/or try_evict_if_needed_async_spill on ctx) with the
expected size delta for the copy so we apply backpressure or async spill before
duplicating large values; keep the existing AOF append (ctx.aof_tx /
aof::serialize_command / AofMessage::Append) behavior unchanged and only proceed
to call ksmv::copy_core if the eviction gate allows the write.

In `@src/server/conn/handler_sharded/mod.rs`:
- Around line 1170-1202: The current COPY DB branch mutates the destination DB
via ksmv::copy_core (called through ksmv::with_two_slice_dbs or
ksmv::with_two_dbs_locked) before the shard-level memory enforcement runs,
letting large cross-DB COPY bypass try_evict_if_needed; fix by invoking the
shard eviction/OOM enforcement for the destination DB prior to performing the
copy (e.g., detect cross-DB copy from parse_copy_db_args, and call the existing
try_evict_if_needed mechanism on the destination DB via ctx.shard_databases (or
by obtaining the dst DB handle and running try_evict_if_needed) before calling
with_two_slice_dbs/with_two_dbs_locked/copy_core), ensuring the call path
mirrors the normal write path’s eviction check so COPY cannot exceed maxmemory.
ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 920bf565-5bc3-4fdd-9eed-fdb19d4645a4

📥 Commits

Reviewing files that changed from the base of the PR and between ebd240a and 608e2d1.

📒 Files selected for processing (2)
  • src/server/conn/handler_monoio/mod.rs
  • src/server/conn/handler_sharded/mod.rs

Comment on lines +1110 to +1114
if !matches!(response, Frame::Error(_)) {
    if let Some(ref tx) = ctx.aof_tx {
        let serialized = aof::serialize_command(&frame);
        let _ = tx.try_send(AofMessage::Append(serialized));
    }

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Only append MOVE/COPY to AOF on :1.

This gate also persists Frame::Integer(0) no-ops, which drifts from the success-only behavior in src/server/conn/handler_single.rs:2072-2132.

Suggested fix
-                    if !matches!(response, Frame::Error(_)) {
+                    if matches!(response, Frame::Integer(1)) {
                         if let Some(ref tx) = ctx.aof_tx {
                             let serialized = aof::serialize_command(&frame);
                             let _ = tx.try_send(AofMessage::Append(serialized));
                         }
                     }
@@
-                        if !matches!(response, Frame::Error(_)) {
+                        if matches!(response, Frame::Integer(1)) {
                             if let Some(ref tx) = ctx.aof_tx {
                                 let serialized = aof::serialize_command(&frame);
                                 let _ = tx.try_send(AofMessage::Append(serialized));
                             }
                         }

Also applies to: 1164-1167

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/server/conn/handler_monoio/mod.rs` around lines 1110 - 1114, The current
AOF append gate `if !matches!(response, Frame::Error(_)) { ... ctx.aof_tx ...
aof::serialize_command(&frame) ... AofMessage::Append(...) }` wrongly persists
no-op responses like `Frame::Integer(0)` and also appends MOVE/COPY even when
they returned 0; change the condition so you skip appending when the response is
`Frame::Integer(0)` and, for MOVE/COPY commands, only send the serialized
command to `ctx.aof_tx` when the response equals `Frame::Integer(1)` (success);
keep the existing skip-on-`Frame::Error(_)` behavior and use the same checks in
the parallel block at the other location (around lines 1164-1167).

Comment on lines +1161 to +1165
if !matches!(response, Frame::Error(_)) {
    if let Some(ref bytes) = aof_bytes {
        if let Some(ref tx) = ctx.aof_tx { let _ = tx.try_send(AofMessage::Append(bytes.clone())); }
    }
}

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Only append MOVE/COPY to AOF on :1.

This currently logs :0 no-op results as writes, which is inconsistent with the success-only handling in src/server/conn/handler_single.rs:2072-2132.

Suggested fix
-                            if !matches!(response, Frame::Error(_)) {
+                            if matches!(response, Frame::Integer(1)) {
                                 if let Some(ref bytes) = aof_bytes {
                                     if let Some(ref tx) = ctx.aof_tx { let _ = tx.try_send(AofMessage::Append(bytes.clone())); }
                                 }
                             }
@@
-                                if !matches!(response, Frame::Error(_)) {
+                                if matches!(response, Frame::Integer(1)) {
                                     if let Some(ref bytes) = aof_bytes {
                                         if let Some(ref tx) = ctx.aof_tx { let _ = tx.try_send(AofMessage::Append(bytes.clone())); }
                                     }
                                 }

Also applies to: 1193-1196

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/server/conn/handler_sharded/mod.rs` around lines 1161 - 1165, The current
AOF append branch treats any non-Error response as a write; change it to append
only when the command actually succeeded, i.e. when the reply is `:1`: replace
the condition around aof_bytes/AofMessage::Append so it checks that the response
is `Frame::Integer(1)` (not Frame::Error and not a `:0` no-op, mirroring the
success-check logic used in handler_single.rs), and apply the identical fix at
the other occurrence (the block at 1193-1196); keep using aof_bytes, ctx.aof_tx
and AofMessage::Append for the append path.
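
The difference between the two gates can be shown with a small model. The `Frame` enum below is a two-variant stand-in for the project's type, and `record` with its `Vec<String>` log is a hypothetical substitute for the AOF channel; only the two `matches!` conditions are taken from the suggested fix.

```rust
/// Minimal stand-in for the project's Frame type (only two variants modelled).
#[derive(Debug, PartialEq)]
enum Frame {
    Integer(i64),
    Error(String),
}

/// Pre-fix gate: anything that is not an error gets logged.
fn should_append_pre_fix(response: &Frame) -> bool {
    !matches!(response, Frame::Error(_))
}

/// Post-fix gate: only a `:1` reply (the command changed something) gets logged.
fn should_append_post_fix(response: &Frame) -> bool {
    matches!(response, Frame::Integer(1))
}

/// Append `command` to a toy AOF log if the chosen gate allows it.
fn record(aof: &mut Vec<String>, command: &str, response: &Frame, gate: fn(&Frame) -> bool) {
    if gate(response) {
        aof.push(command.to_string());
    }
}
```

With the pre-fix gate, a `MOVE` that replies `:0` still lands in the log; with the post-fix gate, only the `:1` case is recorded and both `:0` and error replies are skipped.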
