diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java index d4ab2a2cf5b7..ccdd0e76b312 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java @@ -63,7 +63,7 @@ public class AlterPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { private PipeRuntimeMeta currentPipeRuntimeMeta; private PipeRuntimeMeta updatedPipeRuntimeMeta; - private ProcedureType procedureType; + private final ProcedureType procedureType; public AlterPipeProcedureV2(ProcedureType procedureType) { super(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index e0ec84948c1f..0092d0db07a0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -38,6 +38,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.executable.ExecutableManager; import org.apache.iotdb.commons.executable.ExecutableResource; import org.apache.iotdb.commons.path.MeasurementPath; @@ -45,6 +46,7 @@ import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginClassLoader; import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableManager; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest; import org.apache.iotdb.commons.schema.cache.CacheClearOptions; @@ -89,6 +91,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq; import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq; import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq; @@ -257,6 +260,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager; import org.apache.iotdb.db.trigger.service.TriggerClassLoader; import org.apache.iotdb.pipe.api.PipePlugin; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.TSStatusCode; @@ -1906,21 +1910,101 @@ public SettableFuture alterPipe(AlterPipeStatement alterPipeSt return future; } - // Validate pipe plugin before alteration - only validate replace mode + // Validate pipe existence + final PipeMeta pipeMetaFromCoordinator; + try (final ConfigNodeClient configNodeClient = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + final TGetAllPipeInfoResp getAllPipeInfoResp = configNodeClient.getAllPipeInfo(); + if (getAllPipeInfoResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new StartupException("Failed to get pipe task meta from config node."); + } + + pipeMetaFromCoordinator = + getAllPipeInfoResp.getAllPipeInfo().stream() + .map(PipeMeta::deserialize) + .filter( + pipeMeta -> + pipeMeta + .getStaticMeta() + .getPipeName() + .equals(alterPipeStatement.getPipeName())) + .findFirst() + .orElse(null); + if (pipeMetaFromCoordinator == null) { + final String exceptionMessage = + String.format( + "Failed to alter pipe %s, pipe not found in system.", + alterPipeStatement.getPipeName()); + LOGGER.warn(exceptionMessage); + future.setException( + new IoTDBException(exceptionMessage, TSStatusCode.PIPE_ERROR.getStatusCode())); + return future; + } + } catch (Exception e) { + final String exceptionMessage = + String.format( + "Failed to alter pipe %s, because %s", + alterPipeStatement.getPipeName(), e.getMessage()); + LOGGER.warn(exceptionMessage, e); + future.setException( + new IoTDBException(exceptionMessage, TSStatusCode.PIPE_ERROR.getStatusCode())); + return future; + } + + // Construct temporary pipe static meta for validation final String pipeName = alterPipeStatement.getPipeName(); try { - if (!alterPipeStatement.getExtractorAttributes().isEmpty() - && alterPipeStatement.isReplaceAllExtractorAttributes()) { - PipeDataNodeAgent.plugin().validateExtractor(alterPipeStatement.getExtractorAttributes()); - } - if (!alterPipeStatement.getProcessorAttributes().isEmpty() - && alterPipeStatement.isReplaceAllProcessorAttributes()) { - PipeDataNodeAgent.plugin().validateProcessor(alterPipeStatement.getProcessorAttributes()); - } - if (!alterPipeStatement.getConnectorAttributes().isEmpty() - && alterPipeStatement.isReplaceAllConnectorAttributes()) { - PipeDataNodeAgent.plugin() - .validateConnector(pipeName, alterPipeStatement.getConnectorAttributes()); + if (alterPipeStatement.isReplaceAllExtractorAttributes()) { + if (!alterPipeStatement.getExtractorAttributes().isEmpty()) { + PipeDataNodeAgent.plugin().validateExtractor(alterPipeStatement.getExtractorAttributes()); + } + } else { + if (!alterPipeStatement.getExtractorAttributes().isEmpty()) { + pipeMetaFromCoordinator + .getStaticMeta() + .getExtractorParameters() + .addOrReplaceEquivalentAttributes( + new PipeParameters(alterPipeStatement.getExtractorAttributes())); + PipeDataNodeAgent.plugin() + .validateExtractor( + pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters().getAttribute()); + } + } + + if (alterPipeStatement.isReplaceAllProcessorAttributes()) { + if (!alterPipeStatement.getProcessorAttributes().isEmpty()) { + PipeDataNodeAgent.plugin().validateProcessor(alterPipeStatement.getProcessorAttributes()); + } + } else { + if (!alterPipeStatement.getProcessorAttributes().isEmpty()) { + pipeMetaFromCoordinator + .getStaticMeta() + .getProcessorParameters() + .addOrReplaceEquivalentAttributes( + new PipeParameters(alterPipeStatement.getProcessorAttributes())); + PipeDataNodeAgent.plugin() + .validateProcessor( + pipeMetaFromCoordinator.getStaticMeta().getProcessorParameters().getAttribute()); + } + } + + if (alterPipeStatement.isReplaceAllConnectorAttributes()) { + if (!alterPipeStatement.getConnectorAttributes().isEmpty()) { + PipeDataNodeAgent.plugin() + .validateConnector(pipeName, alterPipeStatement.getConnectorAttributes()); + } + } else { + if (!alterPipeStatement.getConnectorAttributes().isEmpty()) { + pipeMetaFromCoordinator + .getStaticMeta() + .getConnectorParameters() + .addOrReplaceEquivalentAttributes( + new PipeParameters(alterPipeStatement.getConnectorAttributes())); + PipeDataNodeAgent.plugin() + .validateConnector( + pipeName, + pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters().getAttribute()); + } } } catch (Exception e) { LOGGER.info("Failed to validate alter pipe statement, because {}", e.getMessage(), e);