PHOENIX-7751: [SyncTable Tool] Feature to validate table data using PhoenixSyncTable tool between source and target clusters (#2379)
Conversation
```java
/**
 * PhoenixSyncTableTool chunk metadata cell qualifiers. These define the wire protocol between
 * PhoenixSyncTableRegionScanner (server-side coprocessor) and PhoenixSyncTableMapper (client-side
```
```java
public static Long getPhoenixSyncTableFromTime(Configuration conf) {
  Preconditions.checkNotNull(conf);
  String value = conf.get(PHOENIX_SYNC_TABLE_FROM_TIME);
```
Why didn't you use `conf.getLong()`?
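One likely reason, sketched below with a plain `Map` standing in for Hadoop's `Configuration` (the helper name is hypothetical, not from the patch): `Configuration.getLong(key, default)` always requires a default value, so a getter that must return null when the key is unset ends up parsing manually.

```java
import java.util.Map;

class NullableConfSketch {
    // Returns null when the key is absent, which Configuration.getLong(key, default)
    // cannot express without reserving a sentinel default value.
    static Long getNullableLong(Map<String, String> conf, String key) {
        String value = conf.get(key);   // null when the key is not set
        return value == null ? null : Long.valueOf(value);
    }
}
```

If a sentinel such as -1 were acceptable, `conf.getLong(key, -1L)` would be the simpler option the reviewer is pointing at.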
```java
conf.setLong(PHOENIX_SYNC_TABLE_TO_TIME, toTime);
}
```
```java
public static Long getPhoenixSyncTableToTime(Configuration conf) {
```
Here also, why didn't you use `conf.getLong()`?
```java
return configuration.getBoolean(MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER,
  DEFAULT_MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER);
}
```
IMO these APIs can remain in the PhoenixSyncTableTool class only; they are specific to the sync tool.
I saw that other tools also have their setters/getters in PhoenixConfigurationUtil.java, so I followed the same pattern. I am okay to move them.
By definition, a util is something that is useful in multiple contexts. I don't think we should follow the same pattern.
```java
return false;
}
```
```java
buildChunkMetadataResult(results, isTargetScan);
```
If we break out early due to a page timeout, won't we have a partial chunk?
It seems that isTargetScan is for a different purpose, or at least the naming can be improved.
> If we break out early due to page timeout won't we have a partial chunk ?

I have kept the source not to have a partial chunk: whatever can be processed within the page timeout is considered the source chunk, and the target will scan with that source chunk size.
Though we could allow a partial chunk for the source, I was thinking that if chunking is taking ~5-10 mins, it is better not to hit the same server immediately, to let the server cool off.
For the target chunk, we always assume a partial chunk and calculate the final checksum in the Mapper itself once all row boundaries are read.
That is why isTargetScan is synonymous with partialChunk.
I am not sure what you mean by "not to hit the same server immediately to let the server cool off". I would suggest refactoring the code a little bit. isTargetScan is already a field of this class, so there is no need to pass it as a parameter to the buildChunkMetadataResult function; just reference the field directly inside the function. IMO the current naming is a little confusing. Also, please add comments on why the target chunk is always assumed to be a partial chunk.
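The refactor being suggested might look like the following minimal sketch, with hypothetical, heavily simplified names (the real method presumably writes metadata cells into `results`; the string here merely marks the two modes):

```java
import java.util.List;

// Hypothetical, simplified scanner: buildChunkMetadataResult reads the existing
// isTargetScan field instead of receiving it as a redundant parameter.
class SyncScannerSketch {
    // Target chunks are always treated as partial: per the discussion above, the
    // mapper finalizes the checksum itself once all row boundaries are read.
    private final boolean isTargetScan;

    SyncScannerSketch(boolean isTargetScan) {
        this.isTargetScan = isTargetScan;
    }

    String buildChunkMetadataResult(List<byte[]> results) {
        // no isTargetScan parameter needed; the field is in scope
        return isTargetScan ? "PARTIAL_CHUNK" : "COMPLETE_CHUNK";
    }
}
```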
> I am not sure what you mean by "not to hit the same server immediately to let server cool off"

I meant: if we see a page timeout (15 mins) on the source cluster, we return with whatever has been collected, be it 1 row or 1 GB of rows, and then look for the same rows within the target region boundary.
We have the option to keep building the source chunk after a page timeout until we get 1 GB of data or reach the end of the region, but I avoided that. If we are seeing page timeouts, it indicates the server is not able to keep up with the requests, so instead we go to the target cluster to get the chunk and validate the checksum.
Let's say it took a minute to calculate the checksum from the target cluster; we have then delayed hitting the source RS again by 1 minute.
> Lets say it took a min to calculate checksum from target cluster, we delayed hitting source RS again by 1 min.

I realized: shall we also add an explicit delay at the Mapper side to throttle, if the source chunk times out before processing a 1 GB chunk?
```java
private byte[] chunkStartKey = null;
private byte[] chunkEndKey = null;
private long currentChunkSize = 0L;
private long currentChunkRowCount = 0L;
```
An improvement can be made here: introduce the notion of a chunk object.
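The suggestion could look like the following sketch: a small value object owning the four fields above, so the scanner creates a fresh `Chunk` per iteration instead of resetting state (field and method names are hypothetical, not from the patch):

```java
// Hypothetical Chunk value object bundling the scanner's per-chunk state.
final class Chunk {
    private byte[] startKey;     // first row key added to the chunk
    private byte[] endKey;       // last row key added so far
    private long sizeBytes;
    private long rowCount;

    void addRow(byte[] rowKey, long rowSizeBytes) {
        if (startKey == null) {
            startKey = rowKey;   // first row fixes the chunk start
        }
        endKey = rowKey;
        sizeBytes += rowSizeBytes;
        rowCount++;
    }

    long getSizeBytes()  { return sizeBytes; }
    long getRowCount()   { return rowCount; }
    byte[] getStartKey() { return startKey; }
    byte[] getEndKey()   { return endKey; }
}
```

With this shape, a reset becomes simply `chunk = new Chunk()`.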
```java
byte[] rowKey = CellUtil.cloneRow(rowCells.get(0));
long rowSize = calculateRowSize(rowCells);
addRowToChunk(rowKey, rowCells, rowSize);
if (!isTargetScan && willExceedChunkLimits(rowSize)) {
```
So addRowToChunk is already adding rowSize to the chunk size, and then willExceedChunkLimits adds rowSize to the chunk size again.
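One way to avoid counting `rowSize` twice, sketched with hypothetical names: fold the size in once (in `addRowToChunk`) and make the limit check a pure comparison on the already-accumulated totals.

```java
// Hypothetical sketch: the limit check compares accumulated totals and does
// NOT add rowSize a second time, since addRowToChunk has already folded it in.
class ChunkLimitSketch {
    static boolean chunkLimitReached(long accumulatedBytes, long accumulatedRows,
                                     long maxBytes, long maxRows) {
        return accumulatedBytes >= maxBytes || accumulatedRows >= maxRows;
    }
}
```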
```java
public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
  region.startRegionOperation();
  try {
    resetChunkState();
```
If you have the notion of a chunk object, then you don't need a reset; you can simply create a new chunk.
```java
/**
 * PhoenixSyncTableTool scan attributes for server-side chunk formation and checksum
 */
public static final String SYNC_TABLE_CHUNK_FORMATION = "_SyncTableChunkFormation";
```
Should all of these instead be named SYNC_TOOL?
I have used SyncTableTool for the user-facing class/config; for the others I have used SyncTable. Are you recommending moving all classes and configs to SyncTool instead of SyncTable, i.e. PhoenixSyncTableRegionScanner -> PhoenixSyncToolRegionScanner?
I felt SyncTable is more self-explanatory than SyncTool. We could also change it to SyncTableTool in all places?
I see. It's okay, not a big deal. We can stick with the same naming convention.
```java
if (chunkStartKey == null) {
  LOGGER.warn("Paging timed out while fetching first row of chunk, initStartRowKey: {}",
    Bytes.toStringBinary(initStartRowKey));
  updateDummyWithPrevRowKey(results, initStartRowKey, includeInitStartRowKey, scan);
  return true;
```
Is this ever hit? Even with a 0 page timeout we get at least one row.
Yeah, I was not able to repro it in my integration test; I kept it as a defensive check.

> Even with 0 page timeout we get at least one row

What would this row contain, if we couldn't get any row from the table?
Either we will get some exception or we will get a row. You can simplify your code by not handling this case. I think this will also get rid of the updateDummyWithPrevRowKey function, as this seems to be the only place calling it.
I see; then we can remove the handling of dummy rows in SyncTableMapper as well.
```java
@Override
protected void map(NullWritable key, DBInputFormat.NullDBWritable value, Context context)
    throws IOException, InterruptedException {
  context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
```
What is the meaning of INPUT_RECORDS in the context of the sync tool?
It indicates the number of mappers created.
Why do you need a Phoenix-specific counter for it? The MapReduce framework already tells us the number of mappers.
Via this we are also maintaining counters for mappers created, mappers with no failed chunks, and mappers with failed chunks.
Based on your suggestion on the other comment, we can rename them to VERIFIED_MAPPER and FAILED_MAPPER.
```java
if (sourceRowsProcessed > 0) {
  if (mismatchedChunk == 0) {
    context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(1);
```
What does OUTPUT_RECORDS mean in the context of the sync tool?
Number of mappers successfully processed. We also have FAILED_RECORD for failed mappers.
RECORDS generally implies rows. One suggestion could be to use VERIFIED_CHUNKS and FAILED_CHUNKS.
```java
+ " TO_TIME BIGINT NOT NULL,\n" + " START_ROW_KEY VARBINARY_ENCODED,\n"
+ " END_ROW_KEY VARBINARY_ENCODED,\n" + " IS_DRY_RUN BOOLEAN, \n"
+ " EXECUTION_START_TIME TIMESTAMP,\n" + " EXECUTION_END_TIME TIMESTAMP,\n"
+ " STATUS VARCHAR(20),\n" + " COUNTERS VARCHAR(255), \n"
```
I don't think COUNTERS should have a fixed limit. Just make it VARCHAR so that we can add more counters in the future.
I think we should also add tenantId as one of the PK columns.
```java
public enum Type {
  CHUNK,
  MAPPER_REGION
```
```java
String query = "SELECT START_ROW_KEY, END_ROW_KEY FROM " + SYNC_TABLE_CHECKPOINT_TABLE_NAME
  + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ?"
  + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? AND STATUS IN ( ?, ?)";
```
There are only 2 possible statuses, so does it make sense to filter on them in the query? If you don't, then you are querying only PK columns without any additional filter.
```java
qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);
PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable);
PhoenixMapReduceUtil.validateMaxLookbackAge(configuration, endTime, qTable);
if (LOGGER.isDebugEnabled()) {
```
Let's move this log to INFO level. It will be useful.
```java
formatter.printHelp("hadoop jar phoenix-server.jar " + PhoenixSyncTableTool.class.getName(),
  "Synchronize a Phoenix table between source and target clusters", options,
  "\nExample usage:\n"
    + "hadoop jar phoenix-server.jar org.apache.phoenix.mapreduce.PhoenixSyncTableTool \\\n"
```
Generally we run IndexTool via `/hbase/bin/hbase IndexTool`.
```java
qTable = SchemaUtil.getQualifiedTableName(schemaName, tableName);
qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);
PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable);
PhoenixMapReduceUtil.validateMaxLookbackAge(configuration, endTime, qTable);
```
Do we need the end time to be within the max lookback window? How would the sync tool break if the end time were outside of the max lookback window?

Right, this check is not useful.
```java
PhoenixConfigurationUtil.setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes);
}
if (tenantId != null) {
  PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
```
Can you verify that the tenantId is being correctly set as a key prefix on the scan?
If you have a table region with multiple tenants and we pass a tenant id, then our scan range should start with the tenantId prefix.
Yes, it only creates input ranges and scans for tenant-specific rows. We have an IT for the same.
```java
 * Configures a Configuration object with ZooKeeper settings from a ZK quorum string.
 * @param baseConf Base configuration to create from (typically job configuration)
 * @param zkQuorum ZooKeeper quorum string in format: "zk_quorum:port:znode" Example:
 *                 "zk1,zk2,zk3:2181:/hbase"
```
This is actually not the only format for a ZK quorum; there are other valid formats where the port number is specified separately for each server. There is a very useful API in HBase, HBaseConfiguration.createClusterConf(job.getConfiguration(), targetZkQuorum). We should use that, as it also works for the ZK registry.
```java
String query = "SELECT START_ROW_KEY, END_ROW_KEY FROM " + SYNC_TABLE_CHECKPOINT_TABLE_NAME
  + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ?"
  + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? AND STATUS IN ( ?, ?)";
```
I am not 100% positive that you can assume that the output of this query is always sorted by row key; you might have to add an ORDER BY clause here. If you do add one, it would be better to include all the PK columns to make the sorting more efficient.
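The suggestion might look like the following sketch (column names follow the snippet above; the exact PK column order is an assumption, and the STATUS filter is dropped per the earlier comment):

```java
// Hypothetical query builder with an explicit ORDER BY over the PK columns, so
// the row-key ordering no longer relies on unstated server behavior.
class CheckpointQuerySketch {
    static String checkpointQuery(String checkpointTable) {
        return "SELECT START_ROW_KEY, END_ROW_KEY FROM " + checkpointTable
            + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ?"
            + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ?"
            + " ORDER BY TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME, START_ROW_KEY";
    }
}
```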
```java
int completedIdx = 0;

// Two pointer comparison across splitRange and completedRange
while (splitIdx < allSplits.size() && completedIdx < completedRegions.size()) {
```
I think you are assuming here that completedRegions is already sorted. Please see my comment on the getProcessedMapperRegions function.
```java
PhoenixInputSplit split = (PhoenixInputSplit) allSplits.get(splitIdx);
KeyRange splitRange = split.getKeyRange();
KeyRange completedRange = completedRegions.get(completedIdx);
byte[] splitStart = splitRange.getLowerRange();
```
Will the end key of the split range always be exclusive? If yes, can you please add a comment?
```java
 * @return List of (startKey, endKey) pairs representing unprocessed ranges
 */
@VisibleForTesting
public List<Pair<byte[], byte[]>> calculateUnprocessedRanges(byte[] mapperRegionStart,
```
Maybe we could return a `List<KeyRange>`.
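The range subtraction itself can be sketched independently of the HBase types; here the byte[] keys are simplified to longs, with the assumptions flagged in the review above (completed ranges sorted, end keys exclusive):

```java
import java.util.ArrayList;
import java.util.List;

class UnprocessedRangesSketch {
    // Subtracts sorted, non-overlapping completed [start, end) ranges from the
    // mapper region range and returns the remaining gaps as {start, end} pairs.
    static List<long[]> unprocessedRanges(long regionStart, long regionEnd,
                                          List<long[]> completed) {
        List<long[]> gaps = new ArrayList<>();
        long cursor = regionStart;
        for (long[] range : completed) {          // assumed sorted by start key
            if (range[0] > cursor) {
                gaps.add(new long[] { cursor, Math.min(range[0], regionEnd) });
            }
            cursor = Math.max(cursor, range[1]);  // end keys are exclusive
        }
        if (cursor < regionEnd) {
            gaps.add(new long[] { cursor, regionEnd });
        }
        return gaps;
    }
}
```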
```java
if (hasStartBoundary) {
  queryBuilder.append(" AND END_ROW_KEY >= ?");
}
queryBuilder.append(" AND STATUS IN (?, ?)");
```
Same as above: we don't need to pass the status.
```java
scan.setCacheBlocks(false);
scan.setTimeRange(fromTime, toTime);
if (isTargetScan) {
  scan.setLimit(1);
```
Can you add a comment here on why we are setting the limit to 1 and caching to 1?
```java
Scan scan = new Scan();
scan.withStartRow(startKey, isStartKeyInclusive);
scan.withStopRow(endKey, isEndKeyInclusive);
scan.setRaw(true);
```
Are we sure we have to do a raw scan?

Also, can we make this configurable via the SyncTool command line?
```java
scan.withStartRow(startKey, isStartKeyInclusive);
scan.withStopRow(endKey, isEndKeyInclusive);
scan.setRaw(true);
scan.readAllVersions();
```
Same here: can we make the behavior of reading all versions configurable?
```diff
@@ -0,0 +1,2267 @@
+/*
```
Can you add a test where rows are deleted on both the source and target tables but compaction has run on only one of them? We actually have 2 cases: compaction run on the source but not on the target, and vice versa. I saw that you are doing a raw scan; max lookback settings will also impact this.