diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/CDCVersionFilter.java b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/CDCVersionFilter.java
new file mode 100644
index 00000000000..222b9d1d140
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/CDCVersionFilter.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.Writable;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Filter for CDC data table scans that prunes redundant cell versions. For each row, given a set of
+ * change timestamps (from the CDC batch), this filter includes only:
+ *
+ * - Cells at a change timestamp (the change itself)
+ * - The first cell below each change timestamp per column (the pre-image)
+ * - All DeleteFamily markers (needed for CDC deletion tracking)
+ *
+ * All other cell versions are skipped, reducing network I/O, memory, and processing overhead.
+ *
+ * Within each column (family:qualifier), HBase delivers cells in timestamp-descending order. The
+ * filter maintains a pointer into the sorted change timestamps and tracks whether a pre-image is
+ * still needed, advancing through the timestamps as cells arrive.
+ */
+public class CDCVersionFilter extends FilterBase implements Writable {
+
+ private Map timestampMap;
+
+ // Per-row state
+ private long[] currentTimestamps;
+ private byte[] currentRowKey;
+
+ // Per-column state
+ private byte[] prevFamily;
+ private byte[] prevQualifier;
+ private int tsIdx;
+ private boolean needPreImage;
+
+ public CDCVersionFilter() {
+ }
+
+ /**
+ * @param timestampMap mapping of data row key to sorted (descending) array of change timestamps
+ * for that row
+ */
+ public CDCVersionFilter(Map timestampMap) {
+ this.timestampMap = timestampMap;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ currentTimestamps = null;
+ currentRowKey = null;
+ resetColumnState();
+ }
+
+ private void resetColumnState() {
+ prevFamily = null;
+ prevQualifier = null;
+ tsIdx = 0;
+ needPreImage = false;
+ }
+
+ private boolean isNewRow(Cell cell) {
+ if (currentRowKey == null) {
+ return true;
+ }
+ return !Bytes.equals(currentRowKey, 0, currentRowKey.length, cell.getRowArray(),
+ cell.getRowOffset(), cell.getRowLength());
+ }
+
+ private void onNewRow(Cell cell) {
+ currentRowKey = CellUtil.cloneRow(cell);
+ ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(currentRowKey);
+ currentTimestamps = timestampMap != null ? timestampMap.get(rowKeyPtr) : null;
+ resetColumnState();
+ }
+
+ private boolean isNewColumn(Cell cell) {
+ if (prevFamily == null) {
+ return true;
+ }
+ if (
+ !Bytes.equals(prevFamily, 0, prevFamily.length, cell.getFamilyArray(), cell.getFamilyOffset(),
+ cell.getFamilyLength())
+ ) {
+ return true;
+ }
+ return !Bytes.equals(prevQualifier, 0, prevQualifier.length, cell.getQualifierArray(),
+ cell.getQualifierOffset(), cell.getQualifierLength());
+ }
+
+ private void trackColumn(Cell cell) {
+ prevFamily = CellUtil.cloneFamily(cell);
+ prevQualifier = CellUtil.cloneQualifier(cell);
+ }
+
+ // No @Override for HBase 3 compatibility
+ public ReturnCode filterKeyValue(Cell v) throws IOException {
+ return filterCell(v);
+ }
+
+ @Override
+ public ReturnCode filterCell(Cell cell) throws IOException {
+ if (isNewRow(cell)) {
+ onNewRow(cell);
+ }
+
+ if (currentTimestamps == null || currentTimestamps.length == 0) {
+ return ReturnCode.INCLUDE;
+ }
+
+ Cell.Type type = cell.getType();
+ if (type == Cell.Type.DeleteFamily || type == Cell.Type.DeleteFamilyVersion) {
+ return ReturnCode.INCLUDE;
+ }
+
+ if (isNewColumn(cell)) {
+ trackColumn(cell);
+ tsIdx = 0;
+ needPreImage = false;
+ }
+
+ long cellTs = cell.getTimestamp();
+
+ // Advance past change timestamps that are above this cell's timestamp.
+ // These timestamps had no cell for this column, but we still need a pre-image
+ // below them (the first cell we encounter serves as pre-image for all of them).
+ while (tsIdx < currentTimestamps.length && currentTimestamps[tsIdx] > cellTs) {
+ needPreImage = true;
+ tsIdx++;
+ }
+
+ if (tsIdx < currentTimestamps.length && cellTs == currentTimestamps[tsIdx]) {
+ needPreImage = true;
+ tsIdx++;
+ return ReturnCode.INCLUDE;
+ }
+
+ if (needPreImage) {
+ needPreImage = false;
+ return ReturnCode.INCLUDE;
+ }
+
+ if (tsIdx >= currentTimestamps.length) {
+ return ReturnCode.NEXT_COL;
+ }
+
+ return ReturnCode.SKIP;
+ }
+
+ @Override
+ public byte[] toByteArray() throws IOException {
+ return Writables.getBytes(this);
+ }
+
+ public static CDCVersionFilter parseFrom(byte[] pbBytes) throws DeserializationException {
+ try {
+ return (CDCVersionFilter) Writables.getWritable(pbBytes, new CDCVersionFilter());
+ } catch (IOException e) {
+ throw new DeserializationException(e);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ if (timestampMap == null) {
+ out.writeInt(0);
+ return;
+ }
+ out.writeInt(timestampMap.size());
+ for (Map.Entry entry : timestampMap.entrySet()) {
+ ImmutableBytesPtr key = entry.getKey();
+ long[] timestamps = entry.getValue();
+ out.writeInt(key.getLength());
+ out.write(key.get(), key.getOffset(), key.getLength());
+ out.writeInt(timestamps.length);
+ for (long ts : timestamps) {
+ out.writeLong(ts);
+ }
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int numRows = in.readInt();
+ timestampMap = new HashMap<>(numRows);
+ for (int i = 0; i < numRows; i++) {
+ int keyLen = in.readInt();
+ byte[] keyBytes = new byte[keyLen];
+ in.readFully(keyBytes);
+ int numTs = in.readInt();
+ long[] timestamps = new long[numTs];
+ for (int j = 0; j < numTs; j++) {
+ timestamps[j] = in.readLong();
+ }
+ timestampMap.put(new ImmutableBytesPtr(keyBytes), timestamps);
+ }
+ }
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
index 97b388dc43e..f0c0c4b4ac5 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -28,9 +28,11 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderFactory;
@@ -40,6 +42,8 @@
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -49,6 +53,7 @@
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.SingleCellColumnExpression;
+import org.apache.phoenix.filter.CDCVersionFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.CDCTableInfo;
import org.apache.phoenix.index.IndexMaintainer;
@@ -123,21 +128,60 @@ protected Scan prepareDataTableScan(Collection dataRowKeys) throws IOExc
) {
return null;
}
- // TODO: Get Timerange from the start row and end row of the index scan object
- // and set it in the datatable scan object.
- // if (scan.getStartRow().length == 8) {
- // startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
- // scan.getStartRow(), 0, SortOrder.getDefault());
- // }
- // if (scan.getStopRow().length == 8) {
- // stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
- // scan.getStopRow(), 0, SortOrder.getDefault());
- // }
Scan dataScan = prepareDataTableScan(dataRowKeys, true);
if (dataScan == null) {
return null;
}
- return CDCUtil.setupScanForCDC(dataScan);
+ CDCUtil.setupScanForCDC(dataScan);
+ Map timestampMap = buildDataRowTimestampMap(dataRowKeys);
+ if (!timestampMap.isEmpty()) {
+ CDCVersionFilter versionFilter = new CDCVersionFilter(timestampMap);
+ Filter existingFilter = dataScan.getFilter();
+ if (existingFilter != null) {
+ dataScan.setFilter(
+ new FilterList(FilterList.Operator.MUST_PASS_ALL, existingFilter, versionFilter));
+ } else {
+ dataScan.setFilter(versionFilter);
+ }
+ }
+ return dataScan;
+ }
+
+ private Map buildDataRowTimestampMap(Collection dataRowKeys) {
+ Set scopedRowKeys = new HashSet<>(dataRowKeys.size());
+ for (byte[] dataRowKey : dataRowKeys) {
+ scopedRowKeys.add(new ImmutableBytesPtr(dataRowKey));
+ }
+ Map> tempMap = new HashMap<>();
+ for (List indexRow : indexRows) {
+ if (indexRow.isEmpty()) {
+ continue;
+ }
+ Cell firstCell = indexRow.get(0);
+ byte[] indexRowKey = ImmutableBytesPtr.cloneCellRowIfNecessary(firstCell);
+ byte[] dataRowKey = indexToDataRowKeyMap.get(indexRowKey);
+ if (dataRowKey == null) {
+ continue;
+ }
+ ImmutableBytesPtr dataRowKeyPtr = new ImmutableBytesPtr(dataRowKey);
+ if (!scopedRowKeys.contains(dataRowKeyPtr)) {
+ continue;
+ }
+ long changeTs = firstCell.getTimestamp();
+ tempMap.computeIfAbsent(dataRowKeyPtr, k -> new TreeSet<>(Collections.reverseOrder()))
+ .add(changeTs);
+ }
+ Map result = new HashMap<>(tempMap.size());
+ for (Map.Entry> entry : tempMap.entrySet()) {
+ TreeSet tsSet = entry.getValue();
+ long[] tsArray = new long[tsSet.size()];
+ int i = 0;
+ for (long ts : tsSet) {
+ tsArray[i++] = ts;
+ }
+ result.put(entry.getKey(), tsArray);
+ }
+ return result;
}
protected boolean getNextCoveredIndexRow(List| result) throws IOException {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
index 167f059be73..824e0f3e3d3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
@@ -614,6 +614,149 @@ protected List generateChanges(long startTS, String[] tenantids, Stri
return changes;
}
+ protected List generateChangesForPrePostImage(long startTS, String[] tenantids,
+ String tableName, CommitAdapter committer) throws Exception {
+ List changes = new ArrayList<>();
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ injectEdge.setValue(startTS);
+ committer.init();
+ Map rowid1 = new HashMap() {
+ {
+ put("K", 1);
+ }
+ };
+ Map rowid2 = new HashMap() {
+ {
+ put("K", 2);
+ }
+ };
+ long ts = startTS;
+ for (String tid : tenantids) {
+ try (Connection conn = committer.getConnection(tid)) {
+ // Initial inserts for two rows at the same timestamp (one batch, multiple data rows).
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts, rowid1, new TreeMap() {
+ {
+ put("V1", 1L);
+ put("V2", 10L);
+ put("V3", 1000L);
+ put("B.VB", 100L);
+ }
+ })));
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts, rowid2, new TreeMap() {
+ {
+ put("V1", 200L);
+ put("V2", 2000L);
+ }
+ })));
+ committer.commit(conn);
+
+ // Partial update: only V1 changes, so V2/V3/B.VB pre-images come from the prior version.
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts += 100, rowid1, new TreeMap() {
+ {
+ put("V1", 2L);
+ }
+ })));
+ committer.commit(conn);
+
+ // V3 and B.VB remain untouched here, so they stay sparse relative to the V1/V2 churn.
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts += 100, rowid1, new TreeMap() {
+ {
+ put("V1", 3L);
+ put("V2", 20L);
+ }
+ })));
+ committer.commit(conn);
+
+ // Column-level null -> DeleteColumn marker on V2.
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts += 100, rowid1, new TreeMap() {
+ {
+ put("V2", null);
+ }
+ })));
+ committer.commit(conn);
+
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts += 100, rowid1, new TreeMap() {
+ {
+ put("V1", 4L);
+ put("B.VB", 400L);
+ }
+ })));
+ committer.commit(conn);
+
+ // Full-row delete -> DeleteFamily marker.
+ changes.add(addChange(conn, tableName, new ChangeRow(tid, ts += 100, rowid1, null)));
+ committer.commit(conn);
+
+ // Re-insert after delete: PRE image must be empty (bounded by the delete family marker).
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts += 100, rowid1, new TreeMap() {
+ {
+ put("V1", 5L);
+ put("V2", 50L);
+ }
+ })));
+ committer.commit(conn);
+
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts += 100, rowid1, new TreeMap() {
+ {
+ put("B.VB", 500L);
+ }
+ })));
+ committer.commit(conn);
+
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts += 100, rowid1, new TreeMap() {
+ {
+ put("V1", 6L);
+ put("V2", 60L);
+ put("V3", 6000L);
+ put("B.VB", 600L);
+ }
+ })));
+ committer.commit(conn);
+
+ // Second row mutated again so the scan batch keeps covering multiple data rows.
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts += 100, rowid2, new TreeMap() {
+ {
+ put("V1", 201L);
+ }
+ })));
+ committer.commit(conn);
+
+ // Consecutive full-row deletes: CDC collapses these into a single delete event.
+ changes.add(addChange(conn, tableName, new ChangeRow(tid, ts += 100, rowid1, null)));
+ committer.commit(conn);
+ changes.add(addChange(conn, tableName, new ChangeRow(tid, ts += 100, rowid1, null)));
+ committer.commit(conn);
+
+ // Re-insert a single column after the deletes.
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts += 100, rowid1, new TreeMap() {
+ {
+ put("V1", 7L);
+ }
+ })));
+ committer.commit(conn);
+ }
+ ts += 100;
+ }
+ committer.reset();
+ for (int i = 0; i < changes.size(); ++i) {
+ LOGGER.debug("----- generated change: " + i + " tenantId:" + changes.get(i).tenantId
+ + " changeTS: " + changes.get(i).changeTS + " pks: " + changes.get(i).pks + " change: "
+ + changes.get(i).change);
+ }
+ return changes;
+ }
+
protected void verifyChangesViaSCN(String tenantId, Connection conn, String cdcFullName,
Map pkColumns, String dataTableName, Map dataColumns,
List changes, long startTS, long endTS) throws Exception {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
index 9a86354778d..385e4401ec2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -337,7 +337,6 @@ public void testSelectCDC() throws Exception {
String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
try (Connection conn = newConnection(tenantId)) {
- // For debug: uncomment to see the exact results logged to console.
dumpCDCResults(conn, cdcName, new TreeMap() {
{
put("K", "INTEGER");
@@ -434,6 +433,85 @@ public void testSelectCDC() throws Exception {
}
}
+ /**
+ * Exercises CDC PRE/POST image reconstruction over a row with a deep stack of cell versions
+ * interleaved with column-level nulls, full-row deletes, consecutive deletes and re-inserts. This
+ * specifically stresses the server-side version pruning applied to the data table scan: the PRE
+ * and POST images are recomputed independently via SCN queries on the data table and compared
+ * against the CDC output, so any over-pruning of needed versions surfaces as a mismatch.
+ */
+ @Test
+ public void testSelectCDCPreAndPostImageWithVersionPruning() throws Exception {
+ String cdcName, cdc_sql;
+ String schemaName = getSchemaName();
+ String tableName = getTableOrViewName(schemaName);
+ String datatableName = tableName;
+ try (Connection conn = newConnection()) {
+ createTable(conn,
+ "CREATE TABLE " + tableName + " (" + (multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "")
+ + "k INTEGER NOT NULL, v1 INTEGER, v2 INTEGER, v3 INTEGER, B.vb INTEGER, "
+ + "CONSTRAINT PK PRIMARY KEY " + (multitenant ? "(TENANT_ID, k) " : "(k)") + ")",
+ encodingScheme, multitenant, tableSaltBuckets, false, null);
+ if (forView) {
+ String viewName = getTableOrViewName(schemaName);
+ createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName,
+ encodingScheme);
+ tableName = viewName;
+ }
+ cdcName = getCDCName();
+ cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+ createCDC(conn, cdc_sql, encodingScheme);
+ }
+
+ String tenantId = multitenant ? "1000" : null;
+ String[] tenantids = { tenantId };
+ if (multitenant) {
+ tenantids = new String[] { tenantId, "2000" };
+ }
+
+ long startTS = System.currentTimeMillis();
+ List changes =
+ generateChangesForPrePostImage(startTS, tenantids, tableName, COMMIT_SUCCESS);
+ long currentTime = System.currentTimeMillis();
+ long endTS = changes.get(changes.size() - 1).getTimestamp() + 1;
+ if (endTS > currentTime) {
+ Thread.sleep(endTS - currentTime);
+ }
+
+ Map dataColumns = new TreeMap() {
+ {
+ put("V1", "INTEGER");
+ put("V2", "INTEGER");
+ put("V3", "INTEGER");
+ put("B.VB", "INTEGER");
+ }
+ };
+ String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
+ try (Connection conn = newConnection(tenantId)) {
+ dumpCDCResults(conn, cdcName, new TreeMap() {
+ {
+ put("K", "INTEGER");
+ }
+ }, addPartitionInList(conn, cdcFullName,
+ "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + "\"CDC JSON\" FROM "
+ + cdcFullName));
+
+ // Verify PRE and POST images are correctly reconstructed from the version-pruned data scan.
+ verifyChangesViaSCN(tenantId,
+ getCDCQueryPreparedStatement(conn, cdcFullName,
+ "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName, startTS, endTS)
+ .executeQuery(),
+ datatableName, dataColumns, changes, PRE_POST_IMG);
+
+ // Cross-check CHANGE, PRE and POST images together.
+ verifyChangesViaSCN(tenantId,
+ getCDCQueryPreparedStatement(conn, cdcFullName,
+ "SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ * FROM " + cdcFullName, startTS, endTS)
+ .executeQuery(),
+ datatableName, dataColumns, changes, ALL_IMG);
+ }
+ }
+
@Test
public void testSelectGeneric() throws Exception {
String cdcName, cdc_sql;
@@ -494,7 +572,6 @@ public void testSelectGeneric() throws Exception {
String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
try (Connection conn = newConnection(tenantId)) {
- // For debug: uncomment to see the exact results logged to console.
dumpCDCResults(conn, cdcName, pkColumns, addPartitionInList(conn, cdcFullName,
"SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName));
@@ -598,7 +675,6 @@ private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStor
};
try (Connection conn = newConnection(tenantId)) {
- // For debug: uncomment to see the exact results logged to console.
dumpCDCResults(conn, cdcName, new TreeMap() {
{
put("K", "INTEGER");
@@ -681,7 +757,6 @@ public void testSelectWithTimeRange() throws Exception {
String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
try (Connection conn = newConnection(tenantId)) {
- // For debug: uncomment to see the exact results logged to console.
dumpCDCResults(conn, cdcName, pkColumns,
"SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/filter/CDCVersionFilterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/filter/CDCVersionFilterTest.java
new file mode 100644
index 00000000000..e0e10af9db9
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/filter/CDCVersionFilterTest.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.filter;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.junit.Test;
+
+public class CDCVersionFilterTest {
+
+ private static final byte[] ROW1 = Bytes.toBytes("row1");
+ private static final byte[] ROW2 = Bytes.toBytes("row2");
+ private static final byte[] CF1 = Bytes.toBytes("cf");
+ private static final byte[] CF2 = Bytes.toBytes("cf2");
+ private static final byte[] CQ1 = Bytes.toBytes("cq1");
+ private static final byte[] CQ2 = Bytes.toBytes("cq2");
+ private static final byte[] VAL = Bytes.toBytes("v");
+
+ // Convenience alias so existing tests don't need to change
+ private static final byte[] CF = CF1;
+
+ private Cell put(byte[] row, byte[] cf, byte[] cq, long ts) {
+ return new KeyValue(row, cf, cq, ts, KeyValue.Type.Put, VAL);
+ }
+
+ private Cell deleteColumn(byte[] row, byte[] cf, byte[] cq, long ts) {
+ return new KeyValue(row, cf, cq, ts, KeyValue.Type.DeleteColumn);
+ }
+
+ private Cell pointDelete(byte[] row, byte[] cf, byte[] cq, long ts) {
+ return new KeyValue(row, cf, cq, ts, KeyValue.Type.Delete);
+ }
+
+ private Cell deleteFamily(byte[] row, byte[] cf, long ts) {
+ return new KeyValue(row, cf, null, ts, KeyValue.Type.DeleteFamily);
+ }
+
+ private CDCVersionFilter createFilter(Map timestampMap) {
+ return new CDCVersionFilter(timestampMap);
+ }
+
+ @Test
+ public void testSingleChangeTimestamp() throws IOException {
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 });
+ CDCVersionFilter filter = createFilter(map);
+
+ // ts=100 is the change, ts=90 is the pre-image, ts=80 should be skipped
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 80)));
+ }
+
+ @Test
+ public void testMultipleChangeTimestamps() throws IOException {
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100, 60 });
+ CDCVersionFilter filter = createFilter(map);
+
+ // Column CQ1: cells at ts 100, 90, 80, 70, 60, 50, 40
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90))); // pre for 100
+ assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 80)));
+ assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 70)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 60))); // change
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 50))); // pre for 60
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 40)));
+ }
+
+ @Test
+ public void testNoGapBetweenTimestamps() throws IOException {
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100, 80, 60 });
+ CDCVersionFilter filter = createFilter(map);
+
+ // Column with cells exactly at change timestamps plus pre-images
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90))); // pre for 100
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 80))); // change
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 70))); // pre for 80
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 60))); // change
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 50))); // pre for 60
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 40)));
+ }
+
+ @Test
+ public void testMissingChangeTimestamp() throws IOException {
+ // Column has no cell at ts=60 but needs pre-image below it
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100, 60 });
+ CDCVersionFilter filter = createFilter(map);
+
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 70))); // pre for 100
+ // No cell at ts=60; ts=55 should be included as pre-image for ts=60
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 55)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 40)));
+ }
+
+ @Test
+ public void testDeleteFamilyAlwaysIncluded() throws IOException {
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 });
+ CDCVersionFilter filter = createFilter(map);
+
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteFamily(ROW1, CF, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteFamily(ROW1, CF, 50)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteFamily(ROW1, CF, 10)));
+ }
+
+ @Test
+ public void testDeleteColumnAtChangeTimestamp() throws IOException {
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 });
+ CDCVersionFilter filter = createFilter(map);
+
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteColumn(ROW1, CF, CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90))); // pre-image
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 80)));
+ }
+
+ @Test
+ public void testMultipleColumns() throws IOException {
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 });
+ CDCVersionFilter filter = createFilter(map);
+
+ // Column CQ1
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 80)));
+
+ // Column CQ2 — state should reset
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ2, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ2, 85)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ2, 70)));
+ }
+
+ @Test
+ public void testMultipleRows() throws IOException {
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 });
+ map.put(new ImmutableBytesPtr(ROW2), new long[] { 80, 50 });
+ CDCVersionFilter filter = createFilter(map);
+
+ // ROW1
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 80)));
+
+ // ROW2 — different change timestamps
+ filter.reset();
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 80)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 70))); // pre for 80
+ assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW2, CF, CQ1, 60)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 50))); // change
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 40))); // pre for 50
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW2, CF, CQ1, 30)));
+ }
+
+ @Test
+ public void testRowNotInMap() throws IOException {
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 });
+ CDCVersionFilter filter = createFilter(map);
+
+ // ROW2 is not in the map, should include everything
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 90)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 80)));
+ }
+
+ @Test
+ public void testCellAboveAllChangeTimestamps() throws IOException {
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 80 });
+ CDCVersionFilter filter = createFilter(map);
+
+ // Cell at ts=100 is above the change timestamp 80; should be skipped.
+ // But we still need a pre-image for ts=80.
+ assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 80)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 70)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 60)));
+ }
+
+ @Test
+ public void testSerializationRoundTrip() throws IOException, DeserializationException {
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100, 60 });
+ map.put(new ImmutableBytesPtr(ROW2), new long[] { 80 });
+ CDCVersionFilter original = createFilter(map);
+
+ byte[] serialized = original.toByteArray();
+ CDCVersionFilter deserialized = CDCVersionFilter.parseFrom(serialized);
+
+ // Verify deserialized filter behaves the same as original
+ assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW1, CF, CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW1, CF, CQ1, 90)));
+ assertEquals(ReturnCode.SKIP, deserialized.filterCell(put(ROW1, CF, CQ1, 80)));
+ assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW1, CF, CQ1, 60)));
+ assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW1, CF, CQ1, 50)));
+ assertEquals(ReturnCode.NEXT_COL, deserialized.filterCell(put(ROW1, CF, CQ1, 40)));
+
+ deserialized.reset();
+ assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW2, CF, CQ1, 80)));
+ assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW2, CF, CQ1, 70)));
+ assertEquals(ReturnCode.NEXT_COL, deserialized.filterCell(put(ROW2, CF, CQ1, 60)));
+ }
+
+ @Test
+ public void testMixedDeleteAndPutTypes() throws IOException {
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 });
+ CDCVersionFilter filter = createFilter(map);
+
+ // DeleteFamily at ts=100 — always included
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteFamily(ROW1, CF, 100)));
+ // DeleteColumn for CQ1 at ts=100 — matches change timestamp
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteColumn(ROW1, CF, CQ1, 100)));
+ // Put for CQ1 at ts=90 — pre-image
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90)));
+ // Put for CQ1 at ts=80 — not needed
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 80)));
+ }
+
+ @Test
+ public void testPreImageServedByChangeTimestamp() throws IOException {
+ // Change timestamps are adjacent: pre-image for 100 is the cell at 80
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100, 80 });
+ CDCVersionFilter filter = createFilter(map);
+
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ // Cell at 80 serves as both pre-image for 100 AND change at 80
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 80)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 70))); // pre for 80
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 60)));
+ }
+
+ @Test
+ public void testSparseColumnPreImageForSkippedTimestamp() throws IOException {
+ // ts=100 and ts=60 are change timestamps, but column only has cells at ts=100, 55, 40
+ // Cell at 55 must be included as pre-image for both ts=100 and ts=60
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100, 60 });
+ CDCVersionFilter filter = createFilter(map);
+
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ // ts=55: pre-image for ts=100, and also covers ts=60 (no cell at 60 for this column)
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 55)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 40)));
+ }
+
+ @Test
+ public void testNullTimestampMap() throws IOException {
+ CDCVersionFilter filter = new CDCVersionFilter(null);
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90)));
+ }
+
+ @Test
+ public void testEmptyTimestampMap() throws IOException {
+ CDCVersionFilter filter = createFilter(new HashMap<>());
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90)));
+ }
+
+ @Test
+ public void testColumnWithNoCellAtAnyChangeTimestamp() throws IOException {
+ // change timestamps = [100, 60], column has cells only at [95, 55]
+ // 95 should be pre-image for 100, 55 should be pre-image for 60
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100, 60 });
+ CDCVersionFilter filter = createFilter(map);
+
+ // ts=95: while loop advances past ts=100 (100 > 95), needPreImage=true.
+ // No match (95 != 60). needPreImage=true -> INCLUDE as pre-image for 100.
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 95)));
+ // ts=55: while loop advances past ts=60 (60 > 55), needPreImage=true.
+ // tsIdx >= len. needPreImage=true -> INCLUDE as pre-image for 60.
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 55)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 40)));
+ }
+
+ @Test
+ public void testDeleteColumnAsPreImage() throws IOException {
+ // DeleteColumn at ts=90 serves as pre-image for change ts=100
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 });
+ CDCVersionFilter filter = createFilter(map);
+
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteColumn(ROW1, CF, CQ1, 90)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 80)));
+ }
+
+ @Test
+ public void testAllCellsAboveAllChangeTimestamps() throws IOException {
+ // change timestamps = [50], column cells at [100, 90, 80] — all above 50, no match
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 50 });
+ CDCVersionFilter filter = createFilter(map);
+
+ assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 90)));
+ assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 80)));
+ }
+
+ @Test
+ public void testPointDeleteAtChangeTimestamp() throws IOException {
+ // Point delete (Type.Delete) at change timestamp
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 });
+ CDCVersionFilter filter = createFilter(map);
+
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(pointDelete(ROW1, CF, CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 80)));
+ }
+
+ @Test
+ public void testPointDeleteAsPreImage() throws IOException {
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 });
+ CDCVersionFilter filter = createFilter(map);
+
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ // Point delete serving as pre-image
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(pointDelete(ROW1, CF, CQ1, 90)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 80)));
+ }
+
+ @Test
+ public void testDeleteColumnBetweenChangeTimestampsNotNeeded() throws IOException {
+ // DeleteColumn at ts=75 is between change ts 100 and 60, not at either,
+ // and not a pre-image candidate (pre-image for 100 already served by ts=90)
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100, 60 });
+ CDCVersionFilter filter = createFilter(map);
+
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90))); // pre for 100
+ assertEquals(ReturnCode.SKIP, filter.filterCell(deleteColumn(ROW1, CF, CQ1, 75)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 60)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 50)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 40)));
+ }
+
+ @Test
+ public void testMultipleColumnFamilies() throws IOException {
+ // Same qualifier name in different families — state should reset on family change
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 });
+ CDCVersionFilter filter = createFilter(map);
+
+ // CF1:CQ1
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF1, CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF1, CQ1, 90)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF1, CQ1, 80)));
+
+ // CF2:CQ1 — same qualifier but different family, state should reset
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF2, CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF2, CQ1, 85)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF2, CQ1, 70)));
+ }
+
+ @Test
+ public void testSingleCellAtChangeTimestamp() throws IOException {
+ // Only one cell for this column, exactly at the change timestamp
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 });
+ CDCVersionFilter filter = createFilter(map);
+
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ // Next column starts immediately — no pre-image available, that's fine
+ }
+
+ @Test
+ public void testDeleteFamilyDoesNotAffectColumnState() throws IOException {
+ // DeleteFamily markers should not interfere with per-column version tracking
+ Map map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 });
+ CDCVersionFilter filter = createFilter(map);
+
+ // DeleteFamily at ts=100
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteFamily(ROW1, CF, 100)));
+ // Then normal column cells — should still track correctly
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 80)));
+ }
+}
| |