PHOENIX-7863 Add replication consistency point guard to compaction#2489
PHOENIX-7863 Add replication consistency point guard to compaction#2489Himanshu-g81 wants to merge 6 commits into
Conversation
On standby clusters, compaction can prematurely drop delete markers newer than the replication consistency point, causing permanent stale data. This adds a guard that floors maxLookbackWindowStart to the minimum consistency point across all HA groups when replication replay is enabled. Enabled via phoenix.replication.compaction.guard.enabled (default true), active only when phoenix.replication.replay.enabled is also true. Falls back to retaining all delete markers if the consistency point is unavailable.
Move applyReplicationConsistencyGuard and adjustMaxLookbackWindowStart from CompactionScanner to ReplicationLogReplayService where the consistency point logic belongs. Relocate unit test to replication.reader package accordingly.
Now that the guard logic lives in the same class, public access is no longer needed. Tests are in the same package and can still access it.
Co-locate REPLICATION_COMPACTION_GUARD_ENABLED and its default with the other replication config constants in ReplicationLogReplayService, removing them from QueryServices/QueryServicesOptions.
- Remove incorrect @VisibleForTesting from applyReplicationConsistencyGuard (it is genuinely public, called from CompactionScanner) - Reduce adjustMaxLookbackWindowStart to package-private - Fix newHashMapWithExpectedSize(4) to 5 in both IT classes
Given this is a critical safety feature to prevent data desynchronization, when would it ever be disabled? |
There was a problem hiding this comment.
Pull request overview
This PR introduces a replication consistency-point “compaction guard” to prevent standby-cluster compactions from purging delete markers that replication replay has not yet caught up to, avoiding permanent stale data.
Changes:
- Add a new compaction guard configuration (
phoenix.replication.compaction.guard.enabled, defaulttrue) and guard logic inReplicationLogReplayServiceto floormaxLookbackWindowStartby the minimum replication consistency point. - Apply the guard from
CompactionScannerwhen replication replay is enabled (and the guard is enabled). - Add unit + integration coverage for the guard behavior when enabled/disabled and when consistency point retrieval fails.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java | Unit tests for the pure window-flooring adjustment logic. |
| phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java | IT coverage validating delete-marker retention/purge behavior with the guard enabled. |
| phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java | IT coverage ensuring normal compaction behavior when the guard is disabled. |
| phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java | Adds guard config/constants, guard application + testing hooks. |
| phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java | Wires the guard into compaction by adjusting maxLookbackWindowStart. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (replayEnabled && guardEnabled) { | ||
| this.maxLookbackWindowStart = ReplicationLogReplayService.applyReplicationConsistencyGuard( | ||
| this.maxLookbackWindowStart, conf, tableName, columnFamilyName); | ||
| } |
| protected ReplicationLogReplayService(final Configuration conf) { | ||
| this.conf = conf; | ||
| } |
| public static void setInstanceForTesting(ReplicationLogReplayService testInstance) { | ||
| instance = testInstance; | ||
| } |
| public static void resetInstanceForTesting() { | ||
| instance = null; | ||
| } |
| LOG.info( | ||
| "Replication guard: table={} store={} maxLookbackWindowStart adjusted from {} to {}" | ||
| + " (consistencyPoint={})", | ||
| tableName, columnFamilyName, currentMaxLookbackWindowStart, adjusted, consistencyPoint); |
| long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; | ||
| long consistencyPoint = System.currentTimeMillis() - 120000L; | ||
|
|
| long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; | ||
| long consistencyPoint = System.currentTimeMillis() - 604800000L; | ||
|
|
| private void injectMockConsistencyPoint(long consistencyPoint) throws IOException, SQLException { | ||
| ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); | ||
| when(mockService.getConsistencyPoint()).thenReturn(consistencyPoint); | ||
| ReplicationLogReplayService.setInstanceForTesting(mockService); | ||
| } |
| @VisibleForTesting | ||
| static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, | ||
| long consistencyPoint, String tableName, String columnFamilyName) { | ||
| long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint); |
There was a problem hiding this comment.
You are changing the store-level maxLookbackWindowStart but the value actually used for retention is computed per row as the max of this and the ttlWindowStart of the Phoenix compactor's RowContext. When ttlWindowStart in the RowContext is greater than consistencyPoint the boundary snaps to ttlWindowStart and delete markers between consistencyPoint and ttlWindowStart get purged anyway, which I think you are trying to prevent.
The robot pointed me to this issue with this text:
"The guard is effective only when replay lag stays within the TTL window. Concretely: table TTL = 1h, replay lag = 2h ⇒ consistencyPoint = now-2h, guard sets store start to now-2h, but the row uses max(now-1h, now-2h) = now-1h, so markers in the [now-2h, now-1h] band are dropped. Tests happen to pass only because the test table has no TTL."
| public static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart, | ||
| Configuration conf, String tableName, String columnFamilyName) { | ||
| try { | ||
| long consistencyPoint = getInstance(conf).getConsistencyPoint(); |
There was a problem hiding this comment.
This will iterate every HA group and in SYNC state each call hits the filesystem. That is at least an exists() + listStatus() RPC to the NameNode per HA group, executed once per store per compaction, synchronously, inside the CompactionScanner constructor.
The consistency point moves slowly relative to compaction frequency, so consider caching it with a short TTL of a few seconds and generally using the cached value, rather than recomputing on every scanner construction.
| * Configuration key for enabling/disabling the replication compaction guard | ||
| */ | ||
| public static final String REPLICATION_COMPACTION_GUARD_ENABLED = | ||
| "phoenix.replication.compaction.guard.enabled"; |
There was a problem hiding this comment.
I commented on this elsewhere, but Claude called this a "foot gun", lol.
I'd argue the flag is a foot-gun: setting
phoenix.replication.compaction.guard.enabled=falseon a standby silently re-introduces exactly the data-desync bug this PR wants to fix.
On standby clusters, compaction can prematurely purge delete markers that have timestamps newer than the replication consistency point. This causes permanent stale data because when replay eventually catches up, the delete markers that should have been applied are already gone.
This PR adds a compaction guard that floors
maxLookbackWindowStartto the minimum consistency point across all HA groups, ensuring delete markers newer than the consistency point are retained until replay has processed them.The guard is controlled by
phoenix.replication.compaction.guard.enabled(defaulttrue). It activates automatically whenphoenix.replication.replay.enabled=true— no additional configuration needed on standby clusters. The separate flag allows operators to disable the guard independently if needed without turning off replay entirely.