Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@
private static final String CURRENT_FILE_DIR =
ConsensusManager.getConfigRegionDir() + File.separator + "current";
private static final String PROGRESS_FILE_PATH =
CURRENT_FILE_DIR + File.separator + "log_inprogress_";

Check failure on line 90 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "log_inprogress_" 3 times.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ38iLDgrmaxjvmVY4qH&open=AZ38iLDgrmaxjvmVY4qH&pullRequest=17609
private static final String FILE_PATH = CURRENT_FILE_DIR + File.separator + "log_";
private static final long LOG_FILE_MAX_SIZE =
CONF.getConfigNodeSimpleConsensusLogSegmentSizeMax();
private final TEndPoint currentNodeTEndPoint;
private static Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("\\d+");
private static Pattern LOG_PATTERN = Pattern.compile("(?<=_)(\\d+)$");
private static final Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("log_inprogress_(\\d+)$");
private static final Pattern LOG_PATTERN = Pattern.compile("log_(\\d+)_(\\d+)$");

public ConfigRegionStateMachine(ConfigManager configManager, ConfigPlanExecutor executor) {
this.executor = executor;
Expand Down Expand Up @@ -121,6 +121,13 @@

/** Transmit {@link ConfigPhysicalPlan} to {@link ConfigPlanExecutor} */
protected TSStatus write(ConfigPhysicalPlan plan) {
if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
final TSStatus persistStatus = persistPlanForSimpleConsensus(plan);
if (persistStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return persistStatus;
}
}

TSStatus result;
try {
result = executor.executeNonQueryPlan(plan);
Expand All @@ -129,10 +136,6 @@
result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}

if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
writeLogForSimpleConsensus(plan);
}

if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
PipeConfigNodeAgent.runtime().listener().tryListenToPlan(plan, false);
}
Expand Down Expand Up @@ -197,22 +200,25 @@
PipeConfigNodeAgent.runtime()
.listener()
.tryListenToSnapshots(ConfigNodeSnapshotParser.getSnapshots());
return true;
} catch (IOException e) {
if (PipeConfigNodeAgent.runtime().listener().isOpened()) {
LOGGER.warn(
"Config Region Listening Queue Listen to snapshot failed, the historical data may not be transferred.",
e);
}
}
return true;
}
return false;
}

@Override
public void loadSnapshot(final File latestSnapshotRootDir) {
if (!executor.loadSnapshot(latestSnapshotRootDir)) {
return;
}

try {
executor.loadSnapshot(latestSnapshotRootDir);
// We recompute the snapshot for pipe listener when loading snapshot
// to recover the newest snapshot in cache
PipeConfigNodeAgent.runtime()
Expand Down Expand Up @@ -342,6 +348,9 @@

@Override
public void stop() {
if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) {
closeSimpleLogWriter();
}
// Shutdown leader related service for config pipe
PipeConfigNodeAgent.runtime().notifyLeaderUnavailable();
}
Expand All @@ -351,56 +360,48 @@
return CommonDescriptor.getInstance().getConfig().isReadOnly();
}

private void writeLogForSimpleConsensus(ConfigPhysicalPlan plan) {
if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
try {
simpleLogWriter.force();
File completedFilePath = new File(FILE_PATH + startIndex + "_" + endIndex);
Files.move(
simpleLogFile.toPath(), completedFilePath.toPath(), StandardCopyOption.ATOMIC_MOVE);
} catch (IOException e) {
LOGGER.error("Can't force logWriter for ConfigNode SimpleConsensus mode", e);
private TSStatus persistPlanForSimpleConsensus(ConfigPhysicalPlan plan) {
try {
if (simpleLogWriter == null || simpleLogFile == null) {
throw new IOException("SimpleConsensus log writer is not initialized.");
}
for (int retry = 0; retry < 5; retry++) {
try {
simpleLogWriter.close();
} catch (IOException e) {
LOGGER.warn(
"Can't close StandAloneLog for ConfigNode SimpleConsensus mode, "
+ "filePath: {}, retry: {}",
simpleLogFile.getAbsolutePath(),
retry);
try {
// Sleep 1s and retry
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e2) {
Thread.currentThread().interrupt();
LOGGER.warn("Unexpected interruption during the close method of logWriter");
}
continue;
}
break;

if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) {
rollSimpleConsensusLogFile();
}
startIndex = endIndex + 1;
createLogFile(startIndex);
}

try {
ByteBuffer buffer = plan.serializeToByteBuffer();
buffer.position(buffer.limit());
simpleLogWriter.write(buffer);
simpleLogWriter.force();

endIndex = endIndex + 1;
} catch (Exception e) {
LOGGER.error(
"Can't serialize current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode", e);
"Persist current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode failed", e);
return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.setMessage(
"Persist ConfigNode SimpleConsensus log failed: " + String.valueOf(e.getMessage()));

Check warning on line 384 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Directly append the argument of String.valueOf().

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ38iLDgrmaxjvmVY4qG&open=AZ38iLDgrmaxjvmVY4qG&pullRequest=17609
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}

private void rollSimpleConsensusLogFile() throws IOException {
simpleLogWriter.force();
closeSimpleLogWriter();
Files.move(
simpleLogFile.toPath(),
new File(FILE_PATH + startIndex + "_" + endIndex).toPath(),
StandardCopyOption.ATOMIC_MOVE);
startIndex = endIndex + 1;
createLogFile(startIndex);
}

private void initStandAloneConfigNode() {
File dir = new File(CURRENT_FILE_DIR);
dir.mkdirs();
String[] list = new File(CURRENT_FILE_DIR).list();
endIndex = 0;
if (list != null && list.length != 0) {
Arrays.sort(list, new FileComparator());
for (String logFileName : list) {
Expand All @@ -417,7 +418,7 @@
continue;
}

startIndex = endIndex;
final int recoveredStartIndex = parseStartIndex(logFileName);
while (logReader.hasNext()) {
endIndex++;
// Read and re-serialize the PhysicalPlan
Expand All @@ -435,13 +436,13 @@
}
}
logReader.close();
if (isInProgressLogFile(logFileName)) {
sealRecoveredInProgressLogFile(logFile, recoveredStartIndex, endIndex);
}
}
} else {
startIndex = 0;
endIndex = 0;
}
startIndex = startIndex + 1;
createLogFile(endIndex);
startIndex = endIndex + 1;
createLogFile(startIndex);

ScheduledExecutorService simpleConsensusThread =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
Expand Down Expand Up @@ -482,26 +483,72 @@
}
}

private void sealRecoveredInProgressLogFile(
File logFile, int recoveredStartIndex, int recoveredEndIndex) {
try {
if (recoveredStartIndex > recoveredEndIndex) {
Files.deleteIfExists(logFile.toPath());
return;
}
Files.move(
logFile.toPath(),
new File(FILE_PATH + recoveredStartIndex + "_" + recoveredEndIndex).toPath(),
StandardCopyOption.ATOMIC_MOVE);
} catch (IOException e) {
LOGGER.warn("Seal recovered ConfigNode SimpleConsensus log failed: {}", logFile, e);
}
}

private boolean isInProgressLogFile(String filename) {
return filename.startsWith("log_inprogress_");
}

private void closeSimpleLogWriter() {
if (simpleLogWriter == null) {
return;
}
for (int retry = 0; retry < 5; retry++) {
try {
simpleLogWriter.close();
simpleLogWriter = null;
return;
} catch (IOException e) {
LOGGER.warn(
"Can't close StandAloneLog for ConfigNode SimpleConsensus mode, "
+ "filePath: {}, retry: {}",
simpleLogFile == null ? null : simpleLogFile.getAbsolutePath(),
retry);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e2) {
Thread.currentThread().interrupt();
LOGGER.warn("Unexpected interruption during the close method of logWriter");
break;
}
}
}
}

static class FileComparator implements Comparator<String> {

@Override
public int compare(String filename1, String filename2) {
long id1 = parseEndIndex(filename1);
long id2 = parseEndIndex(filename2);
long id1 = parseStartIndex(filename1);
long id2 = parseStartIndex(filename2);
return Long.compare(id1, id2);
}
}

static long parseEndIndex(String filename) {
static int parseStartIndex(String filename) {
if (filename.startsWith("log_inprogress_")) {
Matcher matcher = LOG_INPROGRESS_PATTERN.matcher(filename);
if (matcher.find()) {
return Long.parseLong(matcher.group());
return Integer.parseInt(matcher.group(1));
}
} else {
Matcher matcher = LOG_PATTERN.matcher(filename);
if (matcher.find()) {
return Long.parseLong(matcher.group());
return Integer.parseInt(matcher.group(1));
}
}
return 0;
Expand Down
Loading
Loading