Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
private List<ColumnCategory> columnTypes;
private List<String> measurementList;
private List<TSDataType> dataTypeList;
private List<IMeasurementSchema> fieldSchemaList;
private int deviceIdSize;

private List<ModsOperationUtil.ModsInfo> modsInfoList;
Expand Down Expand Up @@ -194,7 +195,7 @@

long size = 0;
List<AbstractAlignedChunkMetadata> iChunkMetadataList =
reader.getAlignedChunkMetadata(pair.left, true);
reader.getAlignedChunkMetadata(pair.left, false);

Iterator<AbstractAlignedChunkMetadata> chunkMetadataIterator =
iChunkMetadataList.iterator();
Expand All @@ -213,27 +214,7 @@
continue;
}

Iterator<IChunkMetadata> iChunkMetadataIterator =
alignedChunkMetadata.getValueChunkMetadataList().iterator();
while (iChunkMetadataIterator.hasNext()) {
IChunkMetadata iChunkMetadata = iChunkMetadataIterator.next();
if (iChunkMetadata == null) {
iChunkMetadataIterator.remove();
continue;
}

if (!modifications.isEmpty()
&& ModsOperationUtil.isAllDeletedByMods(
pair.getLeft(),
iChunkMetadata.getMeasurementUid(),
alignedChunkMetadata.getStartTime(),
alignedChunkMetadata.getEndTime(),
modifications)) {
iChunkMetadataIterator.remove();
}
}

if (alignedChunkMetadata.getValueChunkMetadataList().isEmpty()) {
if (areAllFieldsDeletedByMods(pair.getLeft(), alignedChunkMetadata)) {
chunkMetadataIterator.remove();
continue;
}
Expand Down Expand Up @@ -267,6 +248,7 @@
dataTypeList = new ArrayList<>();
columnTypes = new ArrayList<>();
measurementList = new ArrayList<>();
fieldSchemaList = new ArrayList<>();

for (int i = 0; i < columnSchemaSize; i++) {
final IMeasurementSchema schema = tableSchema.getColumnSchemas().get(i);
Expand All @@ -280,6 +262,9 @@
measurementList.add(measurementName);
dataTypeList.add(schema.getType());
}
if (ColumnCategory.FIELD.equals(columnCategory)) {
fieldSchemaList.add(schema);
}
}
}
deviceIdSize = dataTypeList.size();
Expand Down Expand Up @@ -331,9 +316,9 @@
tablet =
new Tablet(
tableName,
measurementList,
dataTypeList,
columnTypes,
new ArrayList<>(measurementList),
new ArrayList<>(dataTypeList),
new ArrayList<>(columnTypes),
rowCountAndMemorySize.getLeft());
tablet.initBitMaps();
isFirstRow = false;
Expand Down Expand Up @@ -376,6 +361,20 @@
long size = timeChunkSize;

final List<Chunk> valueChunkList = new ArrayList<>();
final Map<String, IChunkMetadata> valueChunkMetadataMap =
alignedChunkMetadata.getValueChunkMetadataList().stream()
.filter(Objects::nonNull)
.filter(
metadata ->
!isFieldDeletedByMods(
metadata.getMeasurementUid(),
alignedChunkMetadata.getStartTime(),
alignedChunkMetadata.getEndTime()))
.collect(
Collectors.toMap(
IChunkMetadata::getMeasurementUid,
metadata -> metadata,
(left, right) -> left));

// To ensure that the Tablet has the same alignedChunk column as the current one,
// you need to create a new Tablet to fill in the data.
Expand All @@ -392,50 +391,99 @@
measurementList.subList(deviceIdSize, measurementList.size()).clear();
dataTypeList.subList(deviceIdSize, dataTypeList.size()).clear();

for (; offset < alignedChunkMetadata.getValueChunkMetadataList().size(); ++offset) {
final IChunkMetadata metadata = alignedChunkMetadata.getValueChunkMetadataList().get(offset);
boolean hasSelectedField = fieldSchemaList.isEmpty();
boolean hasSelectedNonNullChunk = false;
for (; offset < fieldSchemaList.size(); ++offset) {

Check warning on line 396 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Reduce the total number of break and continue statements in this loop to use at most one.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ377Iy1BmOcEYzVqYQr&open=AZ377Iy1BmOcEYzVqYQr&pullRequest=17601
final IMeasurementSchema schema = fieldSchemaList.get(offset);
if (isFieldDeletedByMods(
schema.getMeasurementName(),
alignedChunkMetadata.getStartTime(),
alignedChunkMetadata.getEndTime())) {
continue;
}

final IChunkMetadata metadata = valueChunkMetadataMap.get(schema.getMeasurementName());
Chunk chunk = null;
if (metadata != null) {
final Chunk chunk = reader.readMemChunk((ChunkMetadata) metadata);
size += PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
if (size > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
if (valueChunkList.isEmpty()) {
chunk = reader.readMemChunk((ChunkMetadata) metadata);
final long newSize = size + PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
if (newSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
if (!hasSelectedNonNullChunk) {
// If the first chunk exceeds the memory limit, we need to allocate more memory
size = newSize;
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, size);
columnTypes.add(ColumnCategory.FIELD);
measurementList.add(metadata.getMeasurementUid());
dataTypeList.add(metadata.getDataType());
valueChunkList.add(chunk);
++offset;
} else {
break;
}
break;
} else {
// Record the column information corresponding to Meta to fill in Tablet
columnTypes.add(ColumnCategory.FIELD);
measurementList.add(metadata.getMeasurementUid());
dataTypeList.add(metadata.getDataType());
valueChunkList.add(chunk);
size = newSize;
}
hasSelectedNonNullChunk = true;
}

columnTypes.add(ColumnCategory.FIELD);
measurementList.add(schema.getMeasurementName());
dataTypeList.add(schema.getType());
valueChunkList.add(chunk);
hasSelectedField = true;
}

if (offset >= alignedChunkMetadata.getValueChunkMetadataList().size()) {
if (offset >= fieldSchemaList.size()) {
currentChunkMetadata = null;
}

if (!hasSelectedField) {
this.chunkReader = null;
this.batchData = null;
return;
}

this.chunkReader = new TableChunkReader(timeChunk, valueChunkList, null);
this.modsInfoList =
ModsOperationUtil.initializeMeasurementMods(deviceID, measurementList, modifications);
}

private boolean areAllFieldsDeletedByMods(
final IDeviceID currentDeviceID, final AbstractAlignedChunkMetadata alignedChunkMetadata) {
if (modifications.isEmpty() || fieldSchemaList.isEmpty()) {
return false;
}

for (final IMeasurementSchema schema : fieldSchemaList) {
if (!ModsOperationUtil.isAllDeletedByMods(
currentDeviceID,
schema.getMeasurementName(),
alignedChunkMetadata.getStartTime(),
alignedChunkMetadata.getEndTime(),
modifications)) {
return false;
}
}
return true;
}

private boolean isFieldDeletedByMods(
final String measurementID, final long startTime, final long endTime) {
return !modifications.isEmpty()
&& ModsOperationUtil.isAllDeletedByMods(
deviceID, measurementID, startTime, endTime, modifications);
}

private boolean fillMeasurementValueColumns(

Check failure on line 472 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 18 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ377Iy1BmOcEYzVqYQs&open=AZ377Iy1BmOcEYzVqYQs&pullRequest=17601
final BatchData data, final Tablet tablet, final int rowIndex) {
final TsPrimitiveType[] primitiveTypes = data.getVector();
final TsPrimitiveType[] primitiveTypes =
Objects.nonNull(data.getVector()) ? data.getVector() : new TsPrimitiveType[0];
boolean needFillTime = false;
boolean hasNonDeletedField = dataTypeList.size() == deviceIdSize;

for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) {
final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize];
if (primitiveType == null
|| ModsOperationUtil.isDelete(data.currentTime(), modsInfoList.get(i))) {
final TsPrimitiveType primitiveType =
i - deviceIdSize < primitiveTypes.length ? primitiveTypes[i - deviceIdSize] : null;
final boolean isDeleted = ModsOperationUtil.isDelete(data.currentTime(), modsInfoList.get(i));
if (!isDeleted) {
hasNonDeletedField = true;
}
if (primitiveType == null || isDeleted) {
switch (dataTypeList.get(i)) {
case TEXT:
case BLOB:
Expand Down Expand Up @@ -480,7 +528,7 @@
throw new UnSupportedDataTypeException("UnSupported" + primitiveType.getDataType());
}
}
return needFillTime;
return needFillTime || hasNonDeletedField;
}

private void fillDeviceIdColumns(
Expand Down
Loading