diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/CDCVersionFilter.java b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/CDCVersionFilter.java new file mode 100644 index 00000000000..222b9d1d140 --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/CDCVersionFilter.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.filter; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.io.Writable; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; + +/** + * Filter for CDC data table scans that prunes redundant cell versions. 
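For each row, given a set of + * change timestamps (from the CDC batch), this filter includes only: + * <ul> + * <li>the cell whose timestamp equals a change timestamp (the change itself);</li> + * <li>the first cell found below a change timestamp, which serves as the pre-image for every + * change timestamp above it that has not been given one yet;</li> + * <li>every DeleteFamily and DeleteFamilyVersion marker, regardless of its timestamp.</li> + * </ul> + * Rows that have no entry (or an empty entry) in the timestamp map are passed through unfiltered. + * All other cell versions are skipped, reducing network I/O, memory, and processing overhead. + * <p>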

+ * Within each column (family:qualifier), HBase delivers cells in timestamp-descending order. The + * filter maintains a pointer into the sorted change timestamps and tracks whether a pre-image is + * still needed, advancing through the timestamps as cells arrive. + */ +public class CDCVersionFilter extends FilterBase implements Writable { + + private Map timestampMap; + + // Per-row state + private long[] currentTimestamps; + private byte[] currentRowKey; + + // Per-column state + private byte[] prevFamily; + private byte[] prevQualifier; + private int tsIdx; + private boolean needPreImage; + + public CDCVersionFilter() { + } + + /** + * @param timestampMap mapping of data row key to sorted (descending) array of change timestamps + * for that row + */ + public CDCVersionFilter(Map timestampMap) { + this.timestampMap = timestampMap; + } + + @Override + public void reset() throws IOException { + currentTimestamps = null; + currentRowKey = null; + resetColumnState(); + } + + private void resetColumnState() { + prevFamily = null; + prevQualifier = null; + tsIdx = 0; + needPreImage = false; + } + + private boolean isNewRow(Cell cell) { + if (currentRowKey == null) { + return true; + } + return !Bytes.equals(currentRowKey, 0, currentRowKey.length, cell.getRowArray(), + cell.getRowOffset(), cell.getRowLength()); + } + + private void onNewRow(Cell cell) { + currentRowKey = CellUtil.cloneRow(cell); + ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(currentRowKey); + currentTimestamps = timestampMap != null ? 
timestampMap.get(rowKeyPtr) : null; + resetColumnState(); + } + + private boolean isNewColumn(Cell cell) { + if (prevFamily == null) { + return true; + } + if ( + !Bytes.equals(prevFamily, 0, prevFamily.length, cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength()) + ) { + return true; + } + return !Bytes.equals(prevQualifier, 0, prevQualifier.length, cell.getQualifierArray(), + cell.getQualifierOffset(), cell.getQualifierLength()); + } + + private void trackColumn(Cell cell) { + prevFamily = CellUtil.cloneFamily(cell); + prevQualifier = CellUtil.cloneQualifier(cell); + } + + // No @Override for HBase 3 compatibility + public ReturnCode filterKeyValue(Cell v) throws IOException { + return filterCell(v); + } + + @Override + public ReturnCode filterCell(Cell cell) throws IOException { + if (isNewRow(cell)) { + onNewRow(cell); + } + + if (currentTimestamps == null || currentTimestamps.length == 0) { + return ReturnCode.INCLUDE; + } + + Cell.Type type = cell.getType(); + if (type == Cell.Type.DeleteFamily || type == Cell.Type.DeleteFamilyVersion) { + return ReturnCode.INCLUDE; + } + + if (isNewColumn(cell)) { + trackColumn(cell); + tsIdx = 0; + needPreImage = false; + } + + long cellTs = cell.getTimestamp(); + + // Advance past change timestamps that are above this cell's timestamp. + // These timestamps had no cell for this column, but we still need a pre-image + // below them (the first cell we encounter serves as pre-image for all of them). 
+ while (tsIdx < currentTimestamps.length && currentTimestamps[tsIdx] > cellTs) { + needPreImage = true; + tsIdx++; + } + + if (tsIdx < currentTimestamps.length && cellTs == currentTimestamps[tsIdx]) { + needPreImage = true; + tsIdx++; + return ReturnCode.INCLUDE; + } + + if (needPreImage) { + needPreImage = false; + return ReturnCode.INCLUDE; + } + + if (tsIdx >= currentTimestamps.length) { + return ReturnCode.NEXT_COL; + } + + return ReturnCode.SKIP; + } + + @Override + public byte[] toByteArray() throws IOException { + return Writables.getBytes(this); + } + + public static CDCVersionFilter parseFrom(byte[] pbBytes) throws DeserializationException { + try { + return (CDCVersionFilter) Writables.getWritable(pbBytes, new CDCVersionFilter()); + } catch (IOException e) { + throw new DeserializationException(e); + } + } + + @Override + public void write(DataOutput out) throws IOException { + if (timestampMap == null) { + out.writeInt(0); + return; + } + out.writeInt(timestampMap.size()); + for (Map.Entry entry : timestampMap.entrySet()) { + ImmutableBytesPtr key = entry.getKey(); + long[] timestamps = entry.getValue(); + out.writeInt(key.getLength()); + out.write(key.get(), key.getOffset(), key.getLength()); + out.writeInt(timestamps.length); + for (long ts : timestamps) { + out.writeLong(ts); + } + } + } + + @Override + public void readFields(DataInput in) throws IOException { + int numRows = in.readInt(); + timestampMap = new HashMap<>(numRows); + for (int i = 0; i < numRows; i++) { + int keyLen = in.readInt(); + byte[] keyBytes = new byte[keyLen]; + in.readFully(keyBytes); + int numTs = in.readInt(); + long[] timestamps = new long[numTs]; + for (int j = 0; j < numTs; j++) { + timestamps[j] = in.readLong(); + } + timestampMap.put(new ImmutableBytesPtr(keyBytes), timestamps); + } + } +} diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java index 97b388dc43e..f0c0c4b4ac5 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java @@ -28,9 +28,11 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilder; import org.apache.hadoop.hbase.CellBuilderFactory; @@ -40,6 +42,8 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -49,6 +53,7 @@ import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.SingleCellColumnExpression; +import org.apache.phoenix.filter.CDCVersionFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.CDCTableInfo; import org.apache.phoenix.index.IndexMaintainer; @@ -123,21 +128,60 @@ protected Scan prepareDataTableScan(Collection dataRowKeys) throws IOExc ) { return null; } - // TODO: Get Timerange from the start row and end row of the index scan object - // and set it in the datatable scan object. 
- // if (scan.getStartRow().length == 8) { - // startTimeRange = PLong.INSTANCE.getCodec().decodeLong( - // scan.getStartRow(), 0, SortOrder.getDefault()); - // } - // if (scan.getStopRow().length == 8) { - // stopTimeRange = PLong.INSTANCE.getCodec().decodeLong( - // scan.getStopRow(), 0, SortOrder.getDefault()); - // } Scan dataScan = prepareDataTableScan(dataRowKeys, true); if (dataScan == null) { return null; } - return CDCUtil.setupScanForCDC(dataScan); + CDCUtil.setupScanForCDC(dataScan); + Map timestampMap = buildDataRowTimestampMap(dataRowKeys); + if (!timestampMap.isEmpty()) { + CDCVersionFilter versionFilter = new CDCVersionFilter(timestampMap); + Filter existingFilter = dataScan.getFilter(); + if (existingFilter != null) { + dataScan.setFilter( + new FilterList(FilterList.Operator.MUST_PASS_ALL, existingFilter, versionFilter)); + } else { + dataScan.setFilter(versionFilter); + } + } + return dataScan; + } + + private Map buildDataRowTimestampMap(Collection dataRowKeys) { + Set scopedRowKeys = new HashSet<>(dataRowKeys.size()); + for (byte[] dataRowKey : dataRowKeys) { + scopedRowKeys.add(new ImmutableBytesPtr(dataRowKey)); + } + Map> tempMap = new HashMap<>(); + for (List indexRow : indexRows) { + if (indexRow.isEmpty()) { + continue; + } + Cell firstCell = indexRow.get(0); + byte[] indexRowKey = ImmutableBytesPtr.cloneCellRowIfNecessary(firstCell); + byte[] dataRowKey = indexToDataRowKeyMap.get(indexRowKey); + if (dataRowKey == null) { + continue; + } + ImmutableBytesPtr dataRowKeyPtr = new ImmutableBytesPtr(dataRowKey); + if (!scopedRowKeys.contains(dataRowKeyPtr)) { + continue; + } + long changeTs = firstCell.getTimestamp(); + tempMap.computeIfAbsent(dataRowKeyPtr, k -> new TreeSet<>(Collections.reverseOrder())) + .add(changeTs); + } + Map result = new HashMap<>(tempMap.size()); + for (Map.Entry> entry : tempMap.entrySet()) { + TreeSet tsSet = entry.getValue(); + long[] tsArray = new long[tsSet.size()]; + int i = 0; + for (long ts : tsSet) { + 
tsArray[i++] = ts; + } + result.put(entry.getKey(), tsArray); + } + return result; } protected boolean getNextCoveredIndexRow(List result) throws IOException { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java index 167f059be73..824e0f3e3d3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java @@ -614,6 +614,149 @@ protected List generateChanges(long startTS, String[] tenantids, Stri return changes; } + protected List generateChangesForPrePostImage(long startTS, String[] tenantids, + String tableName, CommitAdapter committer) throws Exception { + List changes = new ArrayList<>(); + EnvironmentEdgeManager.injectEdge(injectEdge); + injectEdge.setValue(startTS); + committer.init(); + Map rowid1 = new HashMap() { + { + put("K", 1); + } + }; + Map rowid2 = new HashMap() { + { + put("K", 2); + } + }; + long ts = startTS; + for (String tid : tenantids) { + try (Connection conn = committer.getConnection(tid)) { + // Initial inserts for two rows at the same timestamp (one batch, multiple data rows). + changes.add(addChange(conn, tableName, + new ChangeRow(tid, ts, rowid1, new TreeMap() { + { + put("V1", 1L); + put("V2", 10L); + put("V3", 1000L); + put("B.VB", 100L); + } + }))); + changes.add(addChange(conn, tableName, + new ChangeRow(tid, ts, rowid2, new TreeMap() { + { + put("V1", 200L); + put("V2", 2000L); + } + }))); + committer.commit(conn); + + // Partial update: only V1 changes, so V2/V3/B.VB pre-images come from the prior version. + changes.add(addChange(conn, tableName, + new ChangeRow(tid, ts += 100, rowid1, new TreeMap() { + { + put("V1", 2L); + } + }))); + committer.commit(conn); + + // V3 and B.VB remain untouched here, so they stay sparse relative to the V1/V2 churn. 
+ changes.add(addChange(conn, tableName, + new ChangeRow(tid, ts += 100, rowid1, new TreeMap() { + { + put("V1", 3L); + put("V2", 20L); + } + }))); + committer.commit(conn); + + // Column-level null -> DeleteColumn marker on V2. + changes.add(addChange(conn, tableName, + new ChangeRow(tid, ts += 100, rowid1, new TreeMap() { + { + put("V2", null); + } + }))); + committer.commit(conn); + + changes.add(addChange(conn, tableName, + new ChangeRow(tid, ts += 100, rowid1, new TreeMap() { + { + put("V1", 4L); + put("B.VB", 400L); + } + }))); + committer.commit(conn); + + // Full-row delete -> DeleteFamily marker. + changes.add(addChange(conn, tableName, new ChangeRow(tid, ts += 100, rowid1, null))); + committer.commit(conn); + + // Re-insert after delete: PRE image must be empty (bounded by the delete family marker). + changes.add(addChange(conn, tableName, + new ChangeRow(tid, ts += 100, rowid1, new TreeMap() { + { + put("V1", 5L); + put("V2", 50L); + } + }))); + committer.commit(conn); + + changes.add(addChange(conn, tableName, + new ChangeRow(tid, ts += 100, rowid1, new TreeMap() { + { + put("B.VB", 500L); + } + }))); + committer.commit(conn); + + changes.add(addChange(conn, tableName, + new ChangeRow(tid, ts += 100, rowid1, new TreeMap() { + { + put("V1", 6L); + put("V2", 60L); + put("V3", 6000L); + put("B.VB", 600L); + } + }))); + committer.commit(conn); + + // Second row mutated again so the scan batch keeps covering multiple data rows. + changes.add(addChange(conn, tableName, + new ChangeRow(tid, ts += 100, rowid2, new TreeMap() { + { + put("V1", 201L); + } + }))); + committer.commit(conn); + + // Consecutive full-row deletes: CDC collapses these into a single delete event. + changes.add(addChange(conn, tableName, new ChangeRow(tid, ts += 100, rowid1, null))); + committer.commit(conn); + changes.add(addChange(conn, tableName, new ChangeRow(tid, ts += 100, rowid1, null))); + committer.commit(conn); + + // Re-insert a single column after the deletes. 
+ changes.add(addChange(conn, tableName, + new ChangeRow(tid, ts += 100, rowid1, new TreeMap() { + { + put("V1", 7L); + } + }))); + committer.commit(conn); + } + ts += 100; + } + committer.reset(); + for (int i = 0; i < changes.size(); ++i) { + LOGGER.debug("----- generated change: " + i + " tenantId:" + changes.get(i).tenantId + + " changeTS: " + changes.get(i).changeTS + " pks: " + changes.get(i).pks + " change: " + + changes.get(i).change); + } + return changes; + } + protected void verifyChangesViaSCN(String tenantId, Connection conn, String cdcFullName, Map pkColumns, String dataTableName, Map dataColumns, List changes, long startTS, long endTS) throws Exception { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java index 9a86354778d..385e4401ec2 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java @@ -337,7 +337,6 @@ public void testSelectCDC() throws Exception { String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName); try (Connection conn = newConnection(tenantId)) { - // For debug: uncomment to see the exact results logged to console. dumpCDCResults(conn, cdcName, new TreeMap() { { put("K", "INTEGER"); @@ -434,6 +433,85 @@ public void testSelectCDC() throws Exception { } } + /** + * Exercises CDC PRE/POST image reconstruction over a row with a deep stack of cell versions + * interleaved with column-level nulls, full-row deletes, consecutive deletes and re-inserts. This + * specifically stresses the server-side version pruning applied to the data table scan: the PRE + * and POST images are recomputed independently via SCN queries on the data table and compared + * against the CDC output, so any over-pruning of needed versions surfaces as a mismatch. 
+ */ + @Test + public void testSelectCDCPreAndPostImageWithVersionPruning() throws Exception { + String cdcName, cdc_sql; + String schemaName = getSchemaName(); + String tableName = getTableOrViewName(schemaName); + String datatableName = tableName; + try (Connection conn = newConnection()) { + createTable(conn, + "CREATE TABLE " + tableName + " (" + (multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") + + "k INTEGER NOT NULL, v1 INTEGER, v2 INTEGER, v3 INTEGER, B.vb INTEGER, " + + "CONSTRAINT PK PRIMARY KEY " + (multitenant ? "(TENANT_ID, k) " : "(k)") + ")", + encodingScheme, multitenant, tableSaltBuckets, false, null); + if (forView) { + String viewName = getTableOrViewName(schemaName); + createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName, + encodingScheme); + tableName = viewName; + } + cdcName = getCDCName(); + cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName; + createCDC(conn, cdc_sql, encodingScheme); + } + + String tenantId = multitenant ? "1000" : null; + String[] tenantids = { tenantId }; + if (multitenant) { + tenantids = new String[] { tenantId, "2000" }; + } + + long startTS = System.currentTimeMillis(); + List changes = + generateChangesForPrePostImage(startTS, tenantids, tableName, COMMIT_SUCCESS); + long currentTime = System.currentTimeMillis(); + long endTS = changes.get(changes.size() - 1).getTimestamp() + 1; + if (endTS > currentTime) { + Thread.sleep(endTS - currentTime); + } + + Map dataColumns = new TreeMap() { + { + put("V1", "INTEGER"); + put("V2", "INTEGER"); + put("V3", "INTEGER"); + put("B.VB", "INTEGER"); + } + }; + String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName); + try (Connection conn = newConnection(tenantId)) { + dumpCDCResults(conn, cdcName, new TreeMap() { + { + put("K", "INTEGER"); + } + }, addPartitionInList(conn, cdcFullName, + "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + "\"CDC JSON\" FROM " + + cdcFullName)); + + // Verify PRE and POST images are 
correctly reconstructed from the version-pruned data scan. + verifyChangesViaSCN(tenantId, + getCDCQueryPreparedStatement(conn, cdcFullName, + "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName, startTS, endTS) + .executeQuery(), + datatableName, dataColumns, changes, PRE_POST_IMG); + + // Cross-check CHANGE, PRE and POST images together. + verifyChangesViaSCN(tenantId, + getCDCQueryPreparedStatement(conn, cdcFullName, + "SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ * FROM " + cdcFullName, startTS, endTS) + .executeQuery(), + datatableName, dataColumns, changes, ALL_IMG); + } + } + @Test public void testSelectGeneric() throws Exception { String cdcName, cdc_sql; @@ -494,7 +572,6 @@ public void testSelectGeneric() throws Exception { String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName); try (Connection conn = newConnection(tenantId)) { - // For debug: uncomment to see the exact results logged to console. dumpCDCResults(conn, cdcName, pkColumns, addPartitionInList(conn, cdcFullName, "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName)); @@ -598,7 +675,6 @@ private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStor }; try (Connection conn = newConnection(tenantId)) { - // For debug: uncomment to see the exact results logged to console. dumpCDCResults(conn, cdcName, new TreeMap() { { put("K", "INTEGER"); @@ -681,7 +757,6 @@ public void testSelectWithTimeRange() throws Exception { String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName); try (Connection conn = newConnection(tenantId)) { - // For debug: uncomment to see the exact results logged to console. 
dumpCDCResults(conn, cdcName, pkColumns, "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/filter/CDCVersionFilterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/filter/CDCVersionFilterTest.java new file mode 100644 index 00000000000..e0e10af9db9 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/filter/CDCVersionFilterTest.java @@ -0,0 +1,417 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.filter; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.filter.Filter.ReturnCode; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.junit.Test; + +public class CDCVersionFilterTest { + + private static final byte[] ROW1 = Bytes.toBytes("row1"); + private static final byte[] ROW2 = Bytes.toBytes("row2"); + private static final byte[] CF1 = Bytes.toBytes("cf"); + private static final byte[] CF2 = Bytes.toBytes("cf2"); + private static final byte[] CQ1 = Bytes.toBytes("cq1"); + private static final byte[] CQ2 = Bytes.toBytes("cq2"); + private static final byte[] VAL = Bytes.toBytes("v"); + + // Convenience alias so existing tests don't need to change + private static final byte[] CF = CF1; + + private Cell put(byte[] row, byte[] cf, byte[] cq, long ts) { + return new KeyValue(row, cf, cq, ts, KeyValue.Type.Put, VAL); + } + + private Cell deleteColumn(byte[] row, byte[] cf, byte[] cq, long ts) { + return new KeyValue(row, cf, cq, ts, KeyValue.Type.DeleteColumn); + } + + private Cell pointDelete(byte[] row, byte[] cf, byte[] cq, long ts) { + return new KeyValue(row, cf, cq, ts, KeyValue.Type.Delete); + } + + private Cell deleteFamily(byte[] row, byte[] cf, long ts) { + return new KeyValue(row, cf, null, ts, KeyValue.Type.DeleteFamily); + } + + private CDCVersionFilter createFilter(Map timestampMap) { + return new CDCVersionFilter(timestampMap); + } + + @Test + public void testSingleChangeTimestamp() throws IOException { + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 }); + CDCVersionFilter filter = createFilter(map); + + // ts=100 is the change, ts=90 is the pre-image, ts=80 
should be skipped + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90))); + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 80))); + } + + @Test + public void testMultipleChangeTimestamps() throws IOException { + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100, 60 }); + CDCVersionFilter filter = createFilter(map); + + // Column CQ1: cells at ts 100, 90, 80, 70, 60, 50, 40 + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90))); // pre for 100 + assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 80))); + assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 70))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 60))); // change + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 50))); // pre for 60 + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 40))); + } + + @Test + public void testNoGapBetweenTimestamps() throws IOException { + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100, 80, 60 }); + CDCVersionFilter filter = createFilter(map); + + // Column with cells exactly at change timestamps plus pre-images + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90))); // pre for 100 + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 80))); // change + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 70))); // pre for 80 + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 60))); // change + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 50))); // pre for 60 + assertEquals(ReturnCode.NEXT_COL, 
filter.filterCell(put(ROW1, CF, CQ1, 40))); + } + + @Test + public void testMissingChangeTimestamp() throws IOException { + // Column has no cell at ts=60 but needs pre-image below it + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100, 60 }); + CDCVersionFilter filter = createFilter(map); + + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 70))); // pre for 100 + // No cell at ts=60; ts=55 should be included as pre-image for ts=60 + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 55))); + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 40))); + } + + @Test + public void testDeleteFamilyAlwaysIncluded() throws IOException { + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 }); + CDCVersionFilter filter = createFilter(map); + + assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteFamily(ROW1, CF, 100))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteFamily(ROW1, CF, 50))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteFamily(ROW1, CF, 10))); + } + + @Test + public void testDeleteColumnAtChangeTimestamp() throws IOException { + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 }); + CDCVersionFilter filter = createFilter(map); + + assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteColumn(ROW1, CF, CQ1, 100))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90))); // pre-image + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 80))); + } + + @Test + public void testMultipleColumns() throws IOException { + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 }); + CDCVersionFilter filter = createFilter(map); + + // Column CQ1 + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100))); + 
assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90))); + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 80))); + + // Column CQ2 — state should reset + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ2, 100))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ2, 85))); + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ2, 70))); + } + + @Test + public void testMultipleRows() throws IOException { + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 }); + map.put(new ImmutableBytesPtr(ROW2), new long[] { 80, 50 }); + CDCVersionFilter filter = createFilter(map); + + // ROW1 + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90))); + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 80))); + + // ROW2 — different change timestamps + filter.reset(); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 80))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 70))); // pre for 80 + assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW2, CF, CQ1, 60))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 50))); // change + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 40))); // pre for 50 + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW2, CF, CQ1, 30))); + } + + @Test + public void testRowNotInMap() throws IOException { + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 }); + CDCVersionFilter filter = createFilter(map); + + // ROW2 is not in the map, should include everything + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 100))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 90))); + assertEquals(ReturnCode.INCLUDE, 
filter.filterCell(put(ROW2, CF, CQ1, 80))); + } + + @Test + public void testCellAboveAllChangeTimestamps() throws IOException { + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 80 }); + CDCVersionFilter filter = createFilter(map); + + // Cell at ts=100 is above the change timestamp 80; should be skipped. + // But we still need a pre-image for ts=80. + assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 100))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 80))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 70))); + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 60))); + } + + @Test + public void testSerializationRoundTrip() throws IOException, DeserializationException { + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100, 60 }); + map.put(new ImmutableBytesPtr(ROW2), new long[] { 80 }); + CDCVersionFilter original = createFilter(map); + + byte[] serialized = original.toByteArray(); + CDCVersionFilter deserialized = CDCVersionFilter.parseFrom(serialized); + + // Verify deserialized filter behaves the same as original + assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW1, CF, CQ1, 100))); + assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW1, CF, CQ1, 90))); + assertEquals(ReturnCode.SKIP, deserialized.filterCell(put(ROW1, CF, CQ1, 80))); + assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW1, CF, CQ1, 60))); + assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW1, CF, CQ1, 50))); + assertEquals(ReturnCode.NEXT_COL, deserialized.filterCell(put(ROW1, CF, CQ1, 40))); + + deserialized.reset(); + assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW2, CF, CQ1, 80))); + assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW2, CF, CQ1, 70))); + assertEquals(ReturnCode.NEXT_COL, deserialized.filterCell(put(ROW2, CF, CQ1, 60))); + } + + @Test + 
public void testMixedDeleteAndPutTypes() throws IOException { + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 }); + CDCVersionFilter filter = createFilter(map); + + // DeleteFamily at ts=100 — always included + assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteFamily(ROW1, CF, 100))); + // DeleteColumn for CQ1 at ts=100 — matches change timestamp + assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteColumn(ROW1, CF, CQ1, 100))); + // Put for CQ1 at ts=90 — pre-image + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90))); + // Put for CQ1 at ts=80 — not needed + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 80))); + } + + @Test + public void testPreImageServedByChangeTimestamp() throws IOException { + // Change timestamps are adjacent: pre-image for 100 is the cell at 80 + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100, 80 }); + CDCVersionFilter filter = createFilter(map); + + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100))); + // Cell at 80 serves as both pre-image for 100 AND change at 80 + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 80))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 70))); // pre for 80 + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 60))); + } + + @Test + public void testSparseColumnPreImageForSkippedTimestamp() throws IOException { + // ts=100 and ts=60 are change timestamps, but column only has cells at ts=100, 55, 40 + // Cell at 55 must be included as pre-image for both ts=100 and ts=60 + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100, 60 }); + CDCVersionFilter filter = createFilter(map); + + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100))); + // ts=55: pre-image for ts=100, and also covers ts=60 (no cell at 60 for this column) + 
assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 55))); + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 40))); + } + + @Test + public void testNullTimestampMap() throws IOException { + CDCVersionFilter filter = new CDCVersionFilter(null); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90))); + } + + @Test + public void testEmptyTimestampMap() throws IOException { + CDCVersionFilter filter = createFilter(new HashMap<>()); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90))); + } + + @Test + public void testColumnWithNoCellAtAnyChangeTimestamp() throws IOException { + // change timestamps = [100, 60], column has cells only at [95, 55] + // 95 should be pre-image for 100, 55 should be pre-image for 60 + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100, 60 }); + CDCVersionFilter filter = createFilter(map); + + // ts=95: while loop advances past ts=100 (100 > 95), needPreImage=true. + // No match (95 != 60). needPreImage=true -> INCLUDE as pre-image for 100. + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 95))); + // ts=55: while loop advances past ts=60 (60 > 55), needPreImage=true. + // tsIdx >= len. needPreImage=true -> INCLUDE as pre-image for 60. 
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 55))); + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 40))); + } + + @Test + public void testDeleteColumnAsPreImage() throws IOException { + // DeleteColumn at ts=90 serves as pre-image for change ts=100 + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 }); + CDCVersionFilter filter = createFilter(map); + + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteColumn(ROW1, CF, CQ1, 90))); + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 80))); + } + + @Test + public void testAllCellsAboveAllChangeTimestamps() throws IOException { + // change timestamps = [50], column cells at [100, 90, 80] — all above 50, no match + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 50 }); + CDCVersionFilter filter = createFilter(map); + + assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 100))); + assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 90))); + assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 80))); + } + + @Test + public void testPointDeleteAtChangeTimestamp() throws IOException { + // Point delete (Type.Delete) at change timestamp + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 }); + CDCVersionFilter filter = createFilter(map); + + assertEquals(ReturnCode.INCLUDE, filter.filterCell(pointDelete(ROW1, CF, CQ1, 100))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90))); + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 80))); + } + + @Test + public void testPointDeleteAsPreImage() throws IOException { + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 }); + CDCVersionFilter filter = createFilter(map); + + assertEquals(ReturnCode.INCLUDE, 
filter.filterCell(put(ROW1, CF, CQ1, 100))); + // Point delete serving as pre-image + assertEquals(ReturnCode.INCLUDE, filter.filterCell(pointDelete(ROW1, CF, CQ1, 90))); + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 80))); + } + + @Test + public void testDeleteColumnBetweenChangeTimestampsNotNeeded() throws IOException { + // DeleteColumn at ts=75 is between change ts 100 and 60, not at either, + // and not a pre-image candidate (pre-image for 100 already served by ts=90) + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100, 60 }); + CDCVersionFilter filter = createFilter(map); + + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90))); // pre for 100 + assertEquals(ReturnCode.SKIP, filter.filterCell(deleteColumn(ROW1, CF, CQ1, 75))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 60))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 50))); + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 40))); + } + + @Test + public void testMultipleColumnFamilies() throws IOException { + // Same qualifier name in different families — state should reset on family change + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 }); + CDCVersionFilter filter = createFilter(map); + + // CF1:CQ1 + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF1, CQ1, 100))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF1, CQ1, 90))); + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF1, CQ1, 80))); + + // CF2:CQ1 — same qualifier but different family, state should reset + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF2, CQ1, 100))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF2, CQ1, 85))); + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, 
CF2, CQ1, 70))); + } + + @Test + public void testSingleCellAtChangeTimestamp() throws IOException { + // Only one cell for this column, exactly at the change timestamp + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 }); + CDCVersionFilter filter = createFilter(map); + + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100))); + // Next column starts immediately — no pre-image available, that's fine + } + + @Test + public void testDeleteFamilyDoesNotAffectColumnState() throws IOException { + // DeleteFamily markers should not interfere with per-column version tracking + Map map = new HashMap<>(); + map.put(new ImmutableBytesPtr(ROW1), new long[] { 100 }); + CDCVersionFilter filter = createFilter(map); + + // DeleteFamily at ts=100 + assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteFamily(ROW1, CF, 100))); + // Then normal column cells — should still track correctly + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 100))); + assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 90))); + assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 80))); + } +}