From b6e14b7663d3e791b58196fc73a528c6187f1191 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 30 Apr 2026 19:00:55 +0800 Subject: [PATCH 1/3] Fixed the bug that create attribute does not support attribute.None & Pipe: Reset tablet pipeDataStructureTabletSizeInBytes to 16MB & Enable stopping exception restart by manual stop pipe (#17588) * stop pipe * 16 * fix * legacy --- .../consensus/request/ConfigPhysicalPlan.java | 4 + .../request/ConfigPhysicalPlanType.java | 1 + ...usWithStoppedByRuntimeExceptionPlanV2.java | 109 ++++++++++++++++++ .../coordinator/task/PipeTaskCoordinator.java | 16 +-- .../executor/ConfigPlanExecutor.java | 4 + .../confignode/persistence/pipe/PipeInfo.java | 20 ++++ .../persistence/pipe/PipeTaskInfo.java | 20 ++++ .../impl/pipe/task/StopPipeProcedureV2.java | 26 ++++- .../request/ConfigPhysicalPlanSerDeTest.java | 25 +++- .../pipe/task/StopPipeProcedureV2Test.java | 105 +++++++++++++++++ .../iotdb/commons/conf/CommonConfig.java | 4 +- 11 files changed, 313 insertions(+), 21 deletions(-) create mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/SetPipeStatusWithStoppedByRuntimeExceptionPlanV2.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java index b7a0936ca450f..06c10c3972395 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java @@ -73,6 +73,7 @@ import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2; +import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2; import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan; import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan; import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan; @@ -354,6 +355,9 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept case SetPipeStatusV2: plan = new SetPipeStatusPlanV2(); break; + case SetPipeStatusWithStoppedByRuntimeExceptionV2: + plan = new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(); + break; case DropPipeV2: plan = new DropPipePlanV2(); break; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java index 24e5b9ba9c631..b986299596675 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java @@ -217,6 +217,7 @@ public enum ConfigPhysicalPlanType { ShowPipeV2((short) 1503), AlterPipeV2((short) 1504), OperateMultiplePipesV2((short) 1505), + SetPipeStatusWithStoppedByRuntimeExceptionV2((short) 1506), /** Pipe Runtime. */ PipeHandleLeaderChange((short) 1600), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/SetPipeStatusWithStoppedByRuntimeExceptionPlanV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/SetPipeStatusWithStoppedByRuntimeExceptionPlanV2.java new file mode 100644 index 0000000000000..35ee503d5c5fc --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/task/SetPipeStatusWithStoppedByRuntimeExceptionPlanV2.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.consensus.request.write.pipe.task; + +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; + +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Objects; + +public class SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 extends ConfigPhysicalPlan { + + private String pipeName; + private PipeStatus status; + private boolean stoppedByRuntimeException; + + public SetPipeStatusWithStoppedByRuntimeExceptionPlanV2() { + super(ConfigPhysicalPlanType.SetPipeStatusWithStoppedByRuntimeExceptionV2); + } + + public SetPipeStatusWithStoppedByRuntimeExceptionPlanV2( + final String pipeName, final PipeStatus status, final boolean stoppedByRuntimeException) { + super(ConfigPhysicalPlanType.SetPipeStatusWithStoppedByRuntimeExceptionV2); + this.pipeName = pipeName; + this.status = status; + this.stoppedByRuntimeException = stoppedByRuntimeException; + } + + public String getPipeName() { + return pipeName; + } + + public PipeStatus getPipeStatus() { + return status; + } + + public boolean isStoppedByRuntimeException() { + return stoppedByRuntimeException; + } + + @Override + protected void serializeImpl(final DataOutputStream stream) throws IOException { + stream.writeShort(getType().getPlanType()); + ReadWriteIOUtils.write(pipeName, stream); + ReadWriteIOUtils.write(status.getType(), stream); + ReadWriteIOUtils.write(stoppedByRuntimeException, stream); + } + + @Override + protected void deserializeImpl(final ByteBuffer buffer) throws IOException { + pipeName = ReadWriteIOUtils.readString(buffer); + status = PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(buffer)); + stoppedByRuntimeException = ReadWriteIOUtils.readBool(buffer); + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 that = + (SetPipeStatusWithStoppedByRuntimeExceptionPlanV2) obj; + return stoppedByRuntimeException == that.stoppedByRuntimeException + && pipeName.equals(that.pipeName) + && status.equals(that.status); + } + + @Override + public int hashCode() { + return Objects.hash(pipeName, status, stoppedByRuntimeException); + } + + @Override + public String toString() { + return "SetPipeStatusWithStoppedByRuntimeExceptionPlanV2{" + + "pipeName='" + + pipeName + + "', status='" + + status + + "', stoppedByRuntimeException='" + + stoppedByRuntimeException + + "'}"; + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java index ee1ccd2a8fba4..8473865f49b63 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java @@ -118,21 +118,11 @@ public TSStatus startPipe(String pipeName) { return status; } + /** Caller should ensure that the method is called in the lock {@link #lock()}. */ - public TSStatus stopPipe(String pipeName) { - final boolean isStoppedByRuntimeException = pipeTaskInfo.isStoppedByRuntimeException(pipeName); + private TSStatus stopPipe(String pipeName) { final TSStatus status = configManager.getProcedureManager().stopPipe(pipeName); - if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - if (isStoppedByRuntimeException) { - // Even if the return status is success, it doesn't imply the success of the - // `executeFromOperateOnDataNodes` phase of stopping pipe. However, we still need to set - // `isStoppedByRuntimeException` to false to avoid auto-restart. Meanwhile, - // `isStoppedByRuntimeException` does not need to be synchronized with DNs. - LOGGER.info("Pipe {} has stopped manually, stop its auto restart process.", pipeName); - pipeTaskInfo.setIsStoppedByRuntimeExceptionToFalse(pipeName); - configManager.getProcedureManager().pipeHandleMetaChange(true, false); - } - } else { + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.warn("Failed to stop pipe {}. Result status: {}.", pipeName, status); } return status; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index 4facb308d8a2c..09017a841c73c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -98,6 +98,7 @@ import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2; +import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2; import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan; import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan; import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan; @@ -497,6 +498,9 @@ public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan) return pipeInfo.createPipe((CreatePipePlanV2) physicalPlan); case SetPipeStatusV2: return pipeInfo.setPipeStatus((SetPipeStatusPlanV2) physicalPlan); + case SetPipeStatusWithStoppedByRuntimeExceptionV2: + return pipeInfo.setPipeStatusWithStoppedByRuntimeException( + (SetPipeStatusWithStoppedByRuntimeExceptionPlanV2) physicalPlan); case DropPipeV2: return pipeInfo.dropPipe((DropPipePlanV2) physicalPlan); case AlterPipeV2: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java index d09e5c82845a0..032534f9161e6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java @@ -29,6 +29,7 @@ import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2; +import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2; import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent; import org.apache.iotdb.confignode.manager.pipe.agent.runtime.PipeConfigRegionListener; import org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeSubtask; @@ -124,6 +125,25 @@ public TSStatus setPipeStatus(final SetPipeStatusPlanV2 plan) { } } + public TSStatus setPipeStatusWithStoppedByRuntimeException( + final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 plan) { + try { + pipeTaskInfo.setPipeStatusWithStoppedByRuntimeException(plan); + + PipeConfigNodeAgent.task() + .handleSinglePipeMetaChanges(pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeName())); + PipeTemporaryMetaInCoordinatorMetrics.getInstance() + .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList()); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } catch (final Exception e) { + LOGGER.error("Failed to set pipe status with stopped-by-runtime-exception flag", e); + return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()) + .setMessage( + "Failed to set pipe status with stopped-by-runtime-exception flag, because " + + e.getMessage()); + } + } + public TSStatus dropPipe(final DropPipePlanV2 plan) { try { final Optional pipeMetaBeforeDrop = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 2bfdbf9e49a3b..91586c9c91497 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -48,6 +48,7 @@ import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2; +import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2; import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp; import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager; import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleMetaChangeProcedure; @@ -523,6 +524,25 @@ public TSStatus setPipeStatus(final SetPipeStatusPlanV2 plan) { } } + public TSStatus setPipeStatusWithStoppedByRuntimeException( + final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 plan) { + acquireWriteLock(); + try { + pipeMetaKeeper + .getPipeMeta(plan.getPipeName()) + .getRuntimeMeta() + .getStatus() + .set(plan.getPipeStatus()); + pipeMetaKeeper + .getPipeMeta(plan.getPipeName()) + .getRuntimeMeta() + .setIsStoppedByRuntimeException(plan.isStoppedByRuntimeException()); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } finally { + releaseWriteLock(); + } + } + public TSStatus dropPipe(final DropPipePlanV2 plan) { acquireWriteLock(); try { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java index 5349cc65640d0..b2e1a584ec54a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java @@ -21,7 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; -import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2; +import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2; import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation; @@ -44,6 +44,7 @@ public class StopPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { private static final Logger LOGGER = LoggerFactory.getLogger(StopPipeProcedureV2.class); private String pipeName; + private boolean isStoppedByRuntimeExceptionBeforeStop; public StopPipeProcedureV2() { super(); @@ -71,7 +72,8 @@ public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws PipeEx @Override public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) throws PipeException { LOGGER.info("StopPipeProcedureV2: executeFromCalculateInfoForTask({})", pipeName); - // Do nothing + isStoppedByRuntimeExceptionBeforeStop = + pipeTaskInfo.get().isStoppedByRuntimeException(pipeName); } @Override @@ -83,7 +85,9 @@ public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) thro response = env.getConfigManager() .getConsensusManager() - .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED)); + .write( + new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2( + pipeName, PipeStatus.STOPPED, false)); } catch (ConsensusException e) { LOGGER.warn("Failed in the write API executing the consensus layer due to: ", e); response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); @@ -128,7 +132,9 @@ public void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) { response = env.getConfigManager() .getConsensusManager() - .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING)); + .write( + new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2( + pipeName, PipeStatus.RUNNING, isStoppedByRuntimeExceptionBeforeStop)); } catch (ConsensusException e) { LOGGER.warn("Failed in the write API executing the consensus layer due to: ", e); response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); @@ -159,12 +165,16 @@ public void serialize(DataOutputStream stream) throws IOException { stream.writeShort(ProcedureType.STOP_PIPE_PROCEDURE_V2.getTypeCode()); super.serialize(stream); ReadWriteIOUtils.write(pipeName, stream); + ReadWriteIOUtils.write(isStoppedByRuntimeExceptionBeforeStop, stream); } @Override public void deserialize(ByteBuffer byteBuffer) { super.deserialize(byteBuffer); pipeName = ReadWriteIOUtils.readString(byteBuffer); + // Legacy persisted procedures do not carry this field. + isStoppedByRuntimeExceptionBeforeStop = + byteBuffer.hasRemaining() && ReadWriteIOUtils.readBool(byteBuffer); } @Override @@ -179,11 +189,17 @@ public boolean equals(Object o) { return getProcId() == that.getProcId() && getCurrentState().equals(that.getCurrentState()) && getCycles() == that.getCycles() + && isStoppedByRuntimeExceptionBeforeStop == that.isStoppedByRuntimeExceptionBeforeStop && pipeName.equals(that.pipeName); } @Override public int hashCode() { - return Objects.hash(getProcId(), getCurrentState(), getCycles(), pipeName); + return Objects.hash( + getProcId(), + getCurrentState(), + getCycles(), + pipeName, + isStoppedByRuntimeExceptionBeforeStop); } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java index be2ba4b696017..3b0554baa11fa 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java @@ -95,6 +95,7 @@ import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2; +import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2; import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan; import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan; import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan; @@ -902,6 +903,28 @@ public void SetPipeStatusPlanV2Test() throws IOException { Assert.assertEquals(setPipeStatusPlanV2.getPipeStatus(), setPipeStatusPlanV21.getPipeStatus()); } + @Test + public void SetPipeStatusWithStoppedByRuntimeExceptionPlanV2Test() throws IOException { + final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 + setPipeStatusWithStoppedByRuntimeExceptionPlanV2 = + new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2( + "pipe", org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus.STOPPED, true); + final SetPipeStatusWithStoppedByRuntimeExceptionPlanV2 + setPipeStatusWithStoppedByRuntimeExceptionPlanV21 = + (SetPipeStatusWithStoppedByRuntimeExceptionPlanV2) + ConfigPhysicalPlan.Factory.create( + setPipeStatusWithStoppedByRuntimeExceptionPlanV2.serializeToByteBuffer()); + Assert.assertEquals( + setPipeStatusWithStoppedByRuntimeExceptionPlanV2.getPipeName(), + setPipeStatusWithStoppedByRuntimeExceptionPlanV21.getPipeName()); + Assert.assertEquals( + setPipeStatusWithStoppedByRuntimeExceptionPlanV2.getPipeStatus(), + setPipeStatusWithStoppedByRuntimeExceptionPlanV21.getPipeStatus()); + Assert.assertEquals( + setPipeStatusWithStoppedByRuntimeExceptionPlanV2.isStoppedByRuntimeException(), + setPipeStatusWithStoppedByRuntimeExceptionPlanV21.isStoppedByRuntimeException()); + } + @Test public void DropPipePlanV2Test() throws IOException { DropPipePlanV2 dropPipePlanV2 = new DropPipePlanV2("demo"); @@ -944,7 +967,7 @@ public void OperateMultiplePipesPlanV2Test() throws IOException { new SetPipeStatusPlanV2( "testSet", org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus.RUNNING); - List subPlans = new ArrayList<>(); + final List subPlans = new ArrayList<>(); subPlans.add(createPipePlanV2); subPlans.add(alterPipePlanV2); subPlans.add(dropPipePlanV2); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2Test.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2Test.java index e6d676ea27b1c..e3da356059b5e 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2Test.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2Test.java @@ -19,18 +19,73 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.task; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; +import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2; +import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusWithStoppedByRuntimeExceptionPlanV2; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; +import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.store.ProcedureFactory; +import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.utils.PublicBAOS; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import java.io.DataOutputStream; import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; public class StopPipeProcedureV2Test { + + private static class TestStopPipeProcedureV2 extends StopPipeProcedureV2 { + + private TestStopPipeProcedureV2(final String pipeName) throws PipeException { + super(pipeName); + } + + private void setPipeTaskInfo(final PipeTaskInfo pipeTaskInfo) { + this.pipeTaskInfo = new AtomicReference<>(pipeTaskInfo); + } + } + + private static PipeTaskInfo createExceptionStoppedPipeTaskInfo(final String pipeName) { + final PipeTaskInfo pipeTaskInfo = new PipeTaskInfo(); + + final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1); + final ConcurrentMap pipeTasks = new ConcurrentHashMap<>(); + pipeTasks.put(1, pipeTaskMeta); + + final PipeStaticMeta pipeStaticMeta = + new PipeStaticMeta( + pipeName, + System.currentTimeMillis(), + Collections.singletonMap("extractor", "iotdb-source"), + Collections.singletonMap("processor", "do-nothing-processor"), + Collections.singletonMap("connector", "iotdb-thrift-connector")); + final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks); + pipeRuntimeMeta.getStatus().set(PipeStatus.STOPPED); + pipeRuntimeMeta.setIsStoppedByRuntimeException(true); + + pipeTaskInfo.createPipe(new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta)); + return pipeTaskInfo; + } + @Test public void serializeDeserializeTest() { PublicBAOS byteArrayOutputStream = new PublicBAOS(); @@ -50,4 +105,54 @@ public void serializeDeserializeTest() { fail(); } } + + @Test + public void serializeDeserializeLegacyFormatTest() { + final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + + final StopPipeProcedureV2 proc = new StopPipeProcedureV2("testPipe"); + + try { + proc.serialize(outputStream); + final ByteBuffer buffer = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size() - 1); + final StopPipeProcedureV2 proc2 = + (StopPipeProcedureV2) ProcedureFactory.getInstance().create(buffer); + + assertEquals(proc, proc2); + } catch (Exception e) { + fail(); + } + } + + @Test + public void testStopPipeWritesStatusAndRuntimeExceptionFlagAtomically() throws Exception { + final String pipeName = "testPipe"; + final TestStopPipeProcedureV2 proc = new TestStopPipeProcedureV2(pipeName); + proc.setPipeTaskInfo(createExceptionStoppedPipeTaskInfo(pipeName)); + proc.executeFromCalculateInfoForTask(Mockito.mock(ConfigNodeProcedureEnv.class)); + + final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); + final ConfigManager configManager = Mockito.mock(ConfigManager.class); + final ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); + Mockito.when(env.getConfigManager()).thenReturn(configManager); + Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); + Mockito.when(consensusManager.write(Mockito.any(ConfigPhysicalPlan.class))) + .thenReturn(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + + proc.executeFromWriteConfigNodeConsensus(env); + proc.rollbackFromWriteConfigNodeConsensus(env); + + final ArgumentCaptor planCaptor = + ArgumentCaptor.forClass(ConfigPhysicalPlan.class); + Mockito.verify(consensusManager, Mockito.times(2)).write(planCaptor.capture()); + + assertEquals( + new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(pipeName, PipeStatus.STOPPED, false), + planCaptor.getAllValues().get(0)); + assertEquals( + new SetPipeStatusWithStoppedByRuntimeExceptionPlanV2(pipeName, PipeStatus.RUNNING, true), + planCaptor.getAllValues().get(1)); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index ec9ce06fd1d41..22e282d85a83f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -215,8 +215,8 @@ public class CommonConfig { private int pipeDataStructureTabletRowSize = 2048; - // 128MB - private int pipeDataStructureTabletSizeInBytes = 60 * 1024 * 1024; + // 16MB + private int pipeDataStructureTabletSizeInBytes = 16 * 1024 * 1024; private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.3; private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 0.3; private volatile double pipeTotalFloatingMemoryProportion = 0.5; From 6ed6f4a138b23a72539d2a9ffe57fa6a4b1ea856 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 6 May 2026 14:45:14 +0800 Subject: [PATCH 2/3] Update PipeTaskCoordinator.java --- .../manager/pipe/coordinator/task/PipeTaskCoordinator.java | 1 - 1 file changed, 1 deletion(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java index 8473865f49b63..7137f9ff73e07 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java @@ -118,7 +118,6 @@ public TSStatus startPipe(String pipeName) { return status; } - /** Caller should ensure that the method is called in the lock {@link #lock()}. */ private TSStatus stopPipe(String pipeName) { final TSStatus status = configManager.getProcedureManager().stopPipe(pipeName); From 39a91f0a1c2250b7faedf3da7b5f5873f3cb15b9 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 6 May 2026 17:53:16 +0800 Subject: [PATCH 3/3] fix-access --- .../manager/pipe/coordinator/task/PipeTaskCoordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java index 7137f9ff73e07..f3430db3932fe 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java @@ -119,7 +119,7 @@ public TSStatus startPipe(String pipeName) { } /** Caller should ensure that the method is called in the lock {@link #lock()}. */ - private TSStatus stopPipe(String pipeName) { + public TSStatus stopPipe(String pipeName) { final TSStatus status = configManager.getProcedureManager().stopPipe(pipeName); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.warn("Failed to stop pipe {}. Result status: {}.", pipeName, status);