Skip to content

Commit

Permalink
Pipe: validate all pipe parameters before alter (#13932)
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveYurongSu authored Oct 29, 2024
1 parent fdc7175 commit d35da56
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class AlterPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
private PipeRuntimeMeta currentPipeRuntimeMeta;
private PipeRuntimeMeta updatedPipeRuntimeMeta;

private ProcedureType procedureType;
private final ProcedureType procedureType;

public AlterPipeProcedureV2(ProcedureType procedureType) {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginClassLoader;
import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableManager;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.commons.schema.cache.CacheClearOptions;
Expand Down Expand Up @@ -89,6 +90,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TFetchTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
Expand Down Expand Up @@ -257,6 +259,7 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
import org.apache.iotdb.pipe.api.PipePlugin;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
Expand Down Expand Up @@ -1906,21 +1909,102 @@ public SettableFuture<ConfigTaskResult> alterPipe(AlterPipeStatement alterPipeSt
return future;
}

// Validate pipe plugin before alteration - only validate replace mode
// Validate pipe existence
final PipeMeta pipeMetaFromCoordinator;
try (final ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TGetAllPipeInfoResp getAllPipeInfoResp = configNodeClient.getAllPipeInfo();
if (getAllPipeInfoResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
String exceptionMessage =
String.format(
"Failed to get pipe info from config node, status is %s.",
getAllPipeInfoResp.getStatus());
LOGGER.warn(exceptionMessage);
future.setException(
new IoTDBException(exceptionMessage, TSStatusCode.PIPE_ERROR.getStatusCode()));
return future;
}

pipeMetaFromCoordinator =
getAllPipeInfoResp.getAllPipeInfo().stream()
.map(PipeMeta::deserialize)
.filter(
pipeMeta ->
pipeMeta
.getStaticMeta()
.getPipeName()
.equals(alterPipeStatement.getPipeName()))
.findFirst()
.orElse(null);
if (pipeMetaFromCoordinator == null) {
final String exceptionMessage =
String.format(
"Failed to alter pipe %s, pipe not found in system.",
alterPipeStatement.getPipeName());
LOGGER.warn(exceptionMessage);
future.setException(
new IoTDBException(exceptionMessage, TSStatusCode.PIPE_ERROR.getStatusCode()));
return future;
}
} catch (Exception e) {
final String exceptionMessage =
String.format(
"Failed to alter pipe %s, because %s",
alterPipeStatement.getPipeName(), e.getMessage());
LOGGER.warn(exceptionMessage, e);
future.setException(
new IoTDBException(exceptionMessage, TSStatusCode.PIPE_ERROR.getStatusCode()));
return future;
}

// Construct temporary pipe static meta for validation
final String pipeName = alterPipeStatement.getPipeName();
try {
if (!alterPipeStatement.getExtractorAttributes().isEmpty()
&& alterPipeStatement.isReplaceAllExtractorAttributes()) {
PipeDataNodeAgent.plugin().validateExtractor(alterPipeStatement.getExtractorAttributes());
}
if (!alterPipeStatement.getProcessorAttributes().isEmpty()
&& alterPipeStatement.isReplaceAllProcessorAttributes()) {
PipeDataNodeAgent.plugin().validateProcessor(alterPipeStatement.getProcessorAttributes());
}
if (!alterPipeStatement.getConnectorAttributes().isEmpty()
&& alterPipeStatement.isReplaceAllConnectorAttributes()) {
PipeDataNodeAgent.plugin()
.validateConnector(pipeName, alterPipeStatement.getConnectorAttributes());
if (!alterPipeStatement.getExtractorAttributes().isEmpty()) {
if (alterPipeStatement.isReplaceAllExtractorAttributes()) {
PipeDataNodeAgent.plugin().validateExtractor(alterPipeStatement.getExtractorAttributes());
} else {
pipeMetaFromCoordinator
.getStaticMeta()
.getExtractorParameters()
.addOrReplaceEquivalentAttributes(
new PipeParameters(alterPipeStatement.getExtractorAttributes()));
PipeDataNodeAgent.plugin()
.validateExtractor(
pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters().getAttribute());
}
}

if (!alterPipeStatement.getProcessorAttributes().isEmpty()) {
if (alterPipeStatement.isReplaceAllProcessorAttributes()) {
PipeDataNodeAgent.plugin().validateProcessor(alterPipeStatement.getProcessorAttributes());
} else {
pipeMetaFromCoordinator
.getStaticMeta()
.getProcessorParameters()
.addOrReplaceEquivalentAttributes(
new PipeParameters(alterPipeStatement.getProcessorAttributes()));
PipeDataNodeAgent.plugin()
.validateProcessor(
pipeMetaFromCoordinator.getStaticMeta().getProcessorParameters().getAttribute());
}
}

if (!alterPipeStatement.getConnectorAttributes().isEmpty()) {
if (alterPipeStatement.isReplaceAllConnectorAttributes()) {
PipeDataNodeAgent.plugin()
.validateConnector(pipeName, alterPipeStatement.getConnectorAttributes());
} else {
pipeMetaFromCoordinator
.getStaticMeta()
.getConnectorParameters()
.addOrReplaceEquivalentAttributes(
new PipeParameters(alterPipeStatement.getConnectorAttributes()));
PipeDataNodeAgent.plugin()
.validateConnector(
pipeName,
pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters().getAttribute());
}
}
} catch (Exception e) {
LOGGER.info("Failed to validate alter pipe statement, because {}", e.getMessage(), e);
Expand Down

0 comments on commit d35da56

Please sign in to comment.