diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGImport.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGImport.java index 54d202db253d..471b44221cf0 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGImport.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGImport.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.toolkit.cli.impl.command.nifi.pg; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.cli.MissingOptionException; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.toolkit.cli.api.Context; @@ -34,6 +36,7 @@ import org.apache.nifi.web.api.entity.FlowRegistryClientsEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import java.io.File; import java.io.IOException; import java.util.Properties; import java.util.Set; @@ -43,6 +46,8 @@ */ public class PGImport extends AbstractNiFiCommand { + private final static ObjectMapper objectMapper = new ObjectMapper(); + public PGImport() { super("pg-import", StringResult.class); } @@ -59,6 +64,7 @@ public String getDescription() { @Override protected void doInitialize(Context context) { addOption(CommandOption.PG_ID.createOption()); + addOption(CommandOption.INPUT_SOURCE.createOption()); addOption(CommandOption.REGISTRY_CLIENT_ID.createOption()); addOption(CommandOption.BUCKET_ID.createOption()); addOption(CommandOption.FLOW_ID.createOption()); @@ -73,9 +79,11 @@ protected void doInitialize(Context context) { public StringResult doExecute(final NiFiClient client, final Properties properties) throws NiFiClientException, IOException, MissingOptionException { - final String bucketId = getRequiredArg(properties, CommandOption.BUCKET_ID); - final String flowId = getRequiredArg(properties, CommandOption.FLOW_ID); - final String flowVersion = getRequiredArg(properties, CommandOption.FLOW_VERSION); + final String inputSource = getArg(properties, CommandOption.INPUT_SOURCE); + + final String bucketId = getArg(properties, CommandOption.BUCKET_ID); + final String flowId = getArg(properties, CommandOption.FLOW_ID); + final String flowVersion = getArg(properties, CommandOption.FLOW_VERSION); final String flowBranch = getArg(properties, CommandOption.FLOW_BRANCH); final String posXStr = getArg(properties, CommandOption.POS_X); @@ -85,6 +93,35 @@ public StringResult doExecute(final NiFiClient client, final Properties properti final boolean posXExists = StringUtils.isNotBlank(posXStr); final boolean posYExists = StringUtils.isNotBlank(posYStr); + final File input = new File(inputSource); + + if (StringUtils.isBlank(inputSource)) { + if (StringUtils.isBlank(bucketId)) { + throw new IllegalArgumentException("Input path is not specified so Bucket ID must be specified"); + } + if (StringUtils.isBlank(flowId)) { + throw new IllegalArgumentException("Input path is not specified so Flow ID must be specified"); + } + if (StringUtils.isBlank(flowVersion)) { + throw new IllegalArgumentException("Input path is not specified so Flow Version must be specified"); + } + } else { + if (!input.exists() || !input.isFile() || !input.canRead()) { + throw new IllegalArgumentException("Specified input is not a local readable file: " + inputSource); + } + if (!StringUtils.isBlank(bucketId)) { + throw new IllegalArgumentException("Input path is specified so Bucket ID should not be specified"); + } + if (!StringUtils.isBlank(flowId)) { + throw new IllegalArgumentException("Input path is specified so Flow ID should not be specified"); + } + if (!StringUtils.isBlank(flowVersion)) { + throw new IllegalArgumentException("Input path is specified so Flow Version should not be specified"); + } + if (!StringUtils.isBlank(flowBranch)) { + throw new IllegalArgumentException("Input path is specified so Flow Branch should not be specified"); + } + } if ((posXExists && !posYExists)) { throw new IllegalArgumentException("Missing Y position - Please specify both X and Y, or specify neither"); @@ -100,27 +137,6 @@ public StringResult doExecute(final NiFiClient client, final Properties properti keepExistingPC = "true"; } - // if a registry client is specified use it, otherwise see if there is only one - // available and use that, - // if more than one is available then throw an exception because we don't know - // which one to use - String registryId = getArg(properties, CommandOption.REGISTRY_CLIENT_ID); - if (StringUtils.isBlank(registryId)) { - final FlowRegistryClientsEntity registries = client.getControllerClient().getRegistryClients(); - - final Set entities = registries.getRegistries(); - if (entities == null || entities.isEmpty()) { - throw new NiFiClientException("No registry clients available"); - } - - if (entities.size() == 1) { - registryId = entities.stream().findFirst().get().getId(); - } else { - throw new MissingOptionException(CommandOption.REGISTRY_CLIENT_ID.getLongName() - + " must be provided when there is more than one available"); - } - } - // get the optional id of the parent PG, otherwise fallback to the root group String parentPgId = getArg(properties, CommandOption.PG_ID); if (StringUtils.isBlank(parentPgId)) { @@ -128,16 +144,6 @@ public StringResult doExecute(final NiFiClient client, final Properties properti parentPgId = flowClient.getRootGroupId(); } - final VersionControlInformationDTO versionControlInfo = new VersionControlInformationDTO(); - versionControlInfo.setRegistryId(registryId); - versionControlInfo.setBucketId(bucketId); - versionControlInfo.setFlowId(flowId); - versionControlInfo.setVersion(flowVersion); - - if (StringUtils.isNotBlank(flowBranch)) { - versionControlInfo.setBranch(flowBranch); - } - final PositionDTO posDto = new PositionDTO(); if (posXExists && posYExists) { posDto.setX(Double.parseDouble(posXStr)); @@ -148,16 +154,58 @@ public StringResult doExecute(final NiFiClient client, final Properties properti posDto.setY(Integer.valueOf(pgBox.getY()).doubleValue()); } - final ProcessGroupDTO pgDto = new ProcessGroupDTO(); - pgDto.setVersionControlInformation(versionControlInfo); - pgDto.setPosition(posDto); + final ProcessGroupClient pgClient = client.getProcessGroupClient(); + ProcessGroupEntity createdEntity = null; + + if (StringUtils.isBlank(inputSource)) { + + // if a registry client is specified use it, otherwise see if there is only one + // available and use that, if more than one is available then throw an exception + // because we don't know which one to use + String registryId = getArg(properties, CommandOption.REGISTRY_CLIENT_ID); + if (StringUtils.isBlank(registryId)) { + final FlowRegistryClientsEntity registries = client.getControllerClient().getRegistryClients(); + + final Set entities = registries.getRegistries(); + if (entities == null || entities.isEmpty()) { + throw new NiFiClientException("No registry clients available"); + } + + if (entities.size() == 1) { + registryId = entities.stream().findFirst().get().getId(); + } else { + throw new MissingOptionException(CommandOption.REGISTRY_CLIENT_ID.getLongName() + + " must be provided when there is more than one available"); + } + } - final ProcessGroupEntity pgEntity = new ProcessGroupEntity(); - pgEntity.setComponent(pgDto); - pgEntity.setRevision(getInitialRevisionDTO()); + final VersionControlInformationDTO versionControlInfo = new VersionControlInformationDTO(); + versionControlInfo.setRegistryId(registryId); + versionControlInfo.setBucketId(bucketId); + versionControlInfo.setFlowId(flowId); + versionControlInfo.setVersion(flowVersion); + + if (StringUtils.isNotBlank(flowBranch)) { + versionControlInfo.setBranch(flowBranch); + } + + final ProcessGroupDTO pgDto = new ProcessGroupDTO(); + pgDto.setVersionControlInformation(versionControlInfo); + pgDto.setPosition(posDto); + + final ProcessGroupEntity pgEntity = new ProcessGroupEntity(); + pgEntity.setComponent(pgDto); + pgEntity.setRevision(getInitialRevisionDTO()); + + createdEntity = pgClient.createProcessGroup(parentPgId, pgEntity, Boolean.parseBoolean(keepExistingPC)); + + } else { + JsonNode rootNode = objectMapper.readTree(input); + JsonNode flowContentsNode = rootNode.path("flowContents"); + String pgName = flowContentsNode.path("name").asText(); + createdEntity = pgClient.upload(parentPgId, input, pgName, posDto.getX(), posDto.getY()); + } - final ProcessGroupClient pgClient = client.getProcessGroupClient(); - final ProcessGroupEntity createdEntity = pgClient.createProcessGroup(parentPgId, pgEntity, Boolean.parseBoolean(keepExistingPC)); return new StringResult(createdEntity.getId(), getContext().isInteractive()); } diff --git a/nifi-toolkit/nifi-toolkit-client/pom.xml b/nifi-toolkit/nifi-toolkit-client/pom.xml index 53cebb8f39cd..44b2d7ce8578 100644 --- a/nifi-toolkit/nifi-toolkit-client/pom.xml +++ b/nifi-toolkit/nifi-toolkit-client/pom.xml @@ -59,6 +59,10 @@ language governing permissions and limitations under the License. --> org.glassfish.jersey.core jersey-client + + org.glassfish.jersey.media + jersey-media-multipart + org.glassfish.jersey.media jersey-media-json-jackson diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessGroupClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessGroupClient.java index db8b88208bc0..2d530eff05bb 100644 --- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessGroupClient.java +++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessGroupClient.java @@ -75,4 +75,6 @@ FlowEntity copySnippet(String processGroupId, CopySnippetRequestEntity copySnipp CopyResponseEntity copy(String processGroupId, CopyRequestEntity copyRequestEntity) throws NiFiClientException, IOException; PasteResponseEntity paste(String processGroupId, PasteRequestEntity pasteRequestEntity) throws NiFiClientException, IOException; + + ProcessGroupEntity upload(String parentPgId, File file, String pgName, Double posX, Double posY) throws NiFiClientException, IOException; } diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessGroupClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessGroupClient.java index ce0e1bf87120..2f2b7b0759db 100644 --- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessGroupClient.java +++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessGroupClient.java @@ -36,12 +36,15 @@ import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ProcessGroupImportEntity; import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity; +import org.glassfish.jersey.media.multipart.FormDataMultiPart; +import org.glassfish.jersey.media.multipart.file.FileDataBodyPart; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.StandardCopyOption; +import java.util.UUID; /** * Jersey implementation of ProcessGroupClient. @@ -378,4 +381,35 @@ public PasteResponseEntity paste(String processGroupId, PasteRequestEntity paste PasteResponseEntity.class); }); } + + @Override + public ProcessGroupEntity upload(String parentPgId, File file, String pgName, Double posX, Double posY) throws NiFiClientException, IOException { + if (StringUtils.isBlank(parentPgId)) { + throw new IllegalArgumentException("Parent process group id cannot be null or blank"); + } + if (file == null) { + throw new IllegalArgumentException("File cannot be null"); + } + if (!file.exists() || !file.canRead()) { + throw new IllegalArgumentException("File does not exist: " + file.getAbsolutePath()); + } + + FormDataMultiPart form = new FormDataMultiPart(); + + form.field("id", parentPgId); + form.field("groupName", pgName); + form.field("positionX", Double.toString(posX)); + form.field("positionY", Double.toString(posY)); + form.field("clientId", UUID.randomUUID().toString()); + form.bodyPart(new FileDataBodyPart("file", file, MediaType.APPLICATION_JSON_TYPE)); + + return executeAction("Error uploading process group", () -> { + final WebTarget target = processGroupsTarget + .path("{id}/process-groups/upload") + .resolveTemplate("id", parentPgId); + return getRequestBuilder(target).post( + Entity.entity(form, MediaType.MULTIPART_FORM_DATA), + ProcessGroupEntity.class); + }); + } } \ No newline at end of file