Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.replication.reader.ReplicationLogReplayService;
import org.apache.phoenix.schema.CompiledConditionalTTLExpression;
import org.apache.phoenix.schema.CompiledTTLExpression;
import org.apache.phoenix.schema.ConditionalTTLExpression;
Expand Down Expand Up @@ -199,6 +200,17 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store,
this.maxLookbackWindowStart = this.maxLookbackInMillis == 0
? compactionTime
: compactionTime - (this.maxLookbackInMillis + 1);
Configuration conf = env.getConfiguration();
boolean replayEnabled =
conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED,
ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED);
boolean guardEnabled =
conf.getBoolean(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED,
ReplicationLogReplayService.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED);
if (replayEnabled && guardEnabled) {
this.maxLookbackWindowStart = ReplicationLogReplayService.applyReplicationConsistencyGuard(
this.maxLookbackWindowStart, conf, tableName, columnFamilyName);
}
Comment on lines +210 to +213
ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
this.major = major && !forceMinorCompaction;
this.minVersion = cfd.getMinVersions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
Expand All @@ -50,6 +51,17 @@ public class ReplicationLogReplayService {
*/
public static final boolean DEFAULT_REPLICATION_REPLAY_ENABLED = false;

/**
* Configuration key for enabling/disabling the replication compaction guard
*/
public static final String REPLICATION_COMPACTION_GUARD_ENABLED =
"phoenix.replication.compaction.guard.enabled";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I commented on this elsewhere, but Claude called this a "foot gun", lol.

I'd argue the flag is a foot-gun: setting phoenix.replication.compaction.guard.enabled=false on a standby silently re-introduces exactly the data-desync bug this PR wants to fix.


/**
* Default value for replication compaction guard enabled flag
*/
public static final boolean DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED = true;

/**
* Number of threads in the executor pool for the replication replay service
*/
Expand Down Expand Up @@ -83,7 +95,7 @@ public class ReplicationLogReplayService {
private ScheduledExecutorService scheduler;
private volatile boolean isRunning = false;

private ReplicationLogReplayService(final Configuration conf) {
protected ReplicationLogReplayService(final Configuration conf) {
this.conf = conf;
}
Comment on lines +98 to 100

Expand All @@ -105,6 +117,16 @@ public static ReplicationLogReplayService getInstance(Configuration conf) throws
return instance;
}

/**
 * Test-only hook that installs the supplied service as the value of the static
 * {@code instance} field of this class.
 * @param testInstance the service object to store in the static {@code instance} field
 */
@VisibleForTesting
public static void setInstanceForTesting(ReplicationLogReplayService testInstance) {
  ReplicationLogReplayService.instance = testInstance;
}
Comment on lines +121 to +123

/**
 * Test-only hook that clears the static {@code instance} field of this class by setting it to
 * {@code null}.
 */
@VisibleForTesting
public static void resetInstanceForTesting() {
  ReplicationLogReplayService.instance = null;
}
Comment on lines +126 to +128

/**
* Starts the replication log replay service by initializing the scheduler and scheduling periodic
* replay operations for each HA Group.
Expand Down Expand Up @@ -229,6 +251,37 @@ protected long getConsistencyPoint() throws IOException, SQLException {
return consistencyPoint;
}

/**
* Applies the replication replay consistency point as a floor on maxLookbackWindowStart. On
* standby clusters, this prevents compaction from dropping delete markers that have timestamps
* newer than the consistency point.
*/
public static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart,
Configuration conf, String tableName, String columnFamilyName) {
try {
long consistencyPoint = getInstance(conf).getConsistencyPoint();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will iterate over every HA group, and in the SYNC state each call hits the filesystem. That is at least one exists() plus one listStatus() RPC to the NameNode per HA group, executed once per store per compaction, synchronously, inside the CompactionScanner constructor.

The consistency point moves slowly relative to compaction frequency, so consider caching it with a short TTL of a few seconds and generally using the cached value, rather than recomputing on every scanner construction.

return adjustMaxLookbackWindowStart(currentMaxLookbackWindowStart, consistencyPoint,
tableName, columnFamilyName);
} catch (Exception e) {
LOG.warn("Replication guard enabled but consistency point unavailable for table={} store={}."
+ " Retaining all delete markers.", tableName, columnFamilyName, e);
return 0L;
}
}

@VisibleForTesting
static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart,
long consistencyPoint, String tableName, String columnFamilyName) {
long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are changing the store-level maxLookbackWindowStart, but the value actually used for retention is computed per row as the max of this and the ttlWindowStart of the Phoenix compactor's RowContext. When ttlWindowStart in the RowContext is greater than consistencyPoint, the boundary snaps to ttlWindowStart, and delete markers between consistencyPoint and ttlWindowStart get purged anyway, which I think is exactly what you are trying to prevent.

The robot pointed me to this issue with this text:
"The guard is effective only when replay lag stays within the TTL window. Concretely: table TTL = 1h, replay lag = 2h ⇒ consistencyPoint = now-2h, guard sets store start to now-2h, but the row uses max(now-1h, now-2h) = now-1h, so markers in the [now-2h, now-1h] band are dropped. Tests happen to pass only because the test table has no TTL."

if (adjusted < currentMaxLookbackWindowStart) {
LOG.info(
"Replication guard: table={} store={} maxLookbackWindowStart adjusted from {} to {}"
+ " (consistencyPoint={})",
tableName, columnFamilyName, currentMaxLookbackWindowStart, adjusted, consistencyPoint);
Comment on lines +277 to +280
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix this!!

}
return adjusted;
}

/** Returns the list of HA groups on the cluster */
protected List<String> getReplicationGroups() throws SQLException {
return HAGroupStoreManager.getInstance(conf).getHAGroupNames();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.replication.reader;

import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY;
import static org.apache.phoenix.util.TestUtil.assertRawRowCount;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;

/**
 * Integration test verifying that the replication compaction guard does NOT interfere with normal
 * compaction when explicitly disabled via configuration.
 * <p>
 * The mini cluster is started with replication replay enabled and the compaction guard disabled.
 * A mocked {@link ReplicationLogReplayService} reporting a consistency point older than the
 * delete is installed as the singleton, and the test asserts that the delete marker is still
 * purged once the max lookback window has passed.
 */
@Category(NeedsOwnMiniClusterTest.class)
public class CompactionReplicationGuardDisabledIT extends BaseTest {

  /**
   * Value stored under the max lookback age configuration key. The test multiplies it by 1000
   * when advancing the injected millisecond clock, i.e. it is treated as seconds.
   */
  private static final int MAX_LOOKBACK_AGE = 15;
  /** Number of rows written by {@link #populateTable(String)}. */
  private static final int ROWS_POPULATED = 2;
  /** Manually advanced clock injected into {@link EnvironmentEdgeManager} for each test. */
  private ManualEnvironmentEdge injectEdge;

  /**
   * Starts the test driver with Phoenix compaction and replication replay enabled, and the
   * replication compaction guard explicitly disabled.
   */
  @BeforeClass
  public static synchronized void doSetup() throws Exception {
    Map<String, String> props = Maps.newHashMapWithExpectedSize(5);
    props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE));
    props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true));
    props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED,
      Boolean.toString(true));
    props.put(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED,
      Boolean.toString(false));
    props.put("hbase.procedure.remote.dispatcher.delay.msec", "0");
    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
  }

  /** Installs a fresh manual clock, initialized to the current wall-clock time. */
  @Before
  public void beforeTest() throws Exception {
    EnvironmentEdgeManager.reset();
    injectEdge = new ManualEnvironmentEdge();
    injectEdge.setValue(System.currentTimeMillis());
    EnvironmentEdgeManager.injectEdge(injectEdge);
  }

  /**
   * Removes the mocked service singleton and the injected clock, then fails the test if any
   * store reference count leaked.
   */
  @After
  public synchronized void afterTest() throws Exception {
    ReplicationLogReplayService.resetInstanceForTesting();
    EnvironmentEdgeManager.reset();
    boolean refCountLeaked = isAnyStoreRefCountLeaked();
    assertFalse("refCount leaked", refCountLeaked);
  }

  /**
   * When guard is disabled, delete markers are purged normally by maxLookback even though the
   * consistency point would have protected them if the guard were enabled.
   */
  @Test(timeout = 120000L)
  public void testGuardDisabledDeleteMarkersPurgedByMaxLookback() throws Exception {
    try (Connection conn = DriverManager.getConnection(getUrl())) {
      String dataTableName = generateUniqueName();
      createTable(dataTableName);
      TableName dataTable = TableName.valueOf(dataTableName);
      populateTable(dataTableName);

      injectEdge.incrementValue(1);
      long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis();

      // Delete a row
      executeAndCommit(conn, "DELETE FROM " + dataTableName + " WHERE id = 'a'");
      injectEdge.incrementValue(1);

      // Set consistency point BEFORE delete — guard would retain if enabled
      long consistencyPoint = beforeDeleteTime - 1;
      ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class);
      when(mockService.getConsistencyPoint()).thenReturn(consistencyPoint);
      ReplicationLogReplayService.setInstanceForTesting(mockService);

      // Advance past maxLookback
      injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000);

      flush(dataTable);
      majorCompact(dataTable);

      // Guard disabled — delete marker purged by maxLookback as normal
      assertRawRowCount(conn, dataTable, ROWS_POPULATED - 1);
    }
  }

  /** Flushes the given table through the mini cluster's admin. */
  private void flush(TableName table) throws IOException {
    getUtility().getAdmin().flush(table);
  }

  /** Runs a major compaction on the given table via {@link TestUtil}. */
  private void majorCompact(TableName table) throws Exception {
    TestUtil.majorCompact(getUtility(), table);
  }

  /** Creates a four-column VARCHAR table keyed on {@code id}, using its own connection. */
  private void createTable(String tableName) throws SQLException {
    try (Connection conn = DriverManager.getConnection(getUrl())) {
      executeAndCommit(conn,
        "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, val1 VARCHAR(10),"
          + " val2 VARCHAR(10), val3 VARCHAR(10))");
    }
  }

  /** Upserts rows 'a' and 'b', committing after each one, using its own connection. */
  private void populateTable(String tableName) throws SQLException {
    try (Connection conn = DriverManager.getConnection(getUrl())) {
      executeAndCommit(conn, "UPSERT INTO " + tableName + " VALUES ('a', 'ab', 'abc', 'abcd')");
      executeAndCommit(conn, "UPSERT INTO " + tableName + " VALUES ('b', 'bc', 'bcd', 'bcde')");
    }
  }

  /**
   * Executes one SQL statement and commits the connection. The statement is opened in
   * try-with-resources so it is always closed, instead of being left open as it would be with a
   * bare {@code conn.createStatement().execute(...)} call.
   * @param conn open connection to run the statement on
   * @param sql  SQL text to execute
   * @throws SQLException if execution or commit fails
   */
  private static void executeAndCommit(Connection conn, String sql) throws SQLException {
    try (Statement stmt = conn.createStatement()) {
      stmt.execute(sql);
    }
    conn.commit();
  }
}
Loading