Skip to content

Commit

Permalink
NIFI-14134 - add support for JSON file upload to CLI pg-import
Browse files Browse the repository at this point in the history
  • Loading branch information
pvillard31 committed Jan 7, 2025
1 parent 1ae31c3 commit b8f09ea
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.nifi.toolkit.cli.impl.command.nifi.pg;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.cli.MissingOptionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.toolkit.cli.api.Context;
Expand All @@ -34,6 +36,7 @@
import org.apache.nifi.web.api.entity.FlowRegistryClientsEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;

import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.Set;
Expand All @@ -43,6 +46,8 @@
*/
public class PGImport extends AbstractNiFiCommand<StringResult> {

private final static ObjectMapper objectMapper = new ObjectMapper();

public PGImport() {
super("pg-import", StringResult.class);
}
Expand All @@ -59,6 +64,7 @@ public String getDescription() {
@Override
protected void doInitialize(Context context) {
addOption(CommandOption.PG_ID.createOption());
addOption(CommandOption.INPUT_SOURCE.createOption());
addOption(CommandOption.REGISTRY_CLIENT_ID.createOption());
addOption(CommandOption.BUCKET_ID.createOption());
addOption(CommandOption.FLOW_ID.createOption());
Expand All @@ -73,9 +79,11 @@ protected void doInitialize(Context context) {
public StringResult doExecute(final NiFiClient client, final Properties properties)
throws NiFiClientException, IOException, MissingOptionException {

final String bucketId = getRequiredArg(properties, CommandOption.BUCKET_ID);
final String flowId = getRequiredArg(properties, CommandOption.FLOW_ID);
final String flowVersion = getRequiredArg(properties, CommandOption.FLOW_VERSION);
final String inputSource = getArg(properties, CommandOption.INPUT_SOURCE);

final String bucketId = getArg(properties, CommandOption.BUCKET_ID);
final String flowId = getArg(properties, CommandOption.FLOW_ID);
final String flowVersion = getArg(properties, CommandOption.FLOW_VERSION);
final String flowBranch = getArg(properties, CommandOption.FLOW_BRANCH);

final String posXStr = getArg(properties, CommandOption.POS_X);
Expand All @@ -85,6 +93,35 @@ public StringResult doExecute(final NiFiClient client, final Properties properti

final boolean posXExists = StringUtils.isNotBlank(posXStr);
final boolean posYExists = StringUtils.isNotBlank(posYStr);
final File input = new File(inputSource);

if (StringUtils.isBlank(inputSource)) {
if (StringUtils.isBlank(bucketId)) {
throw new IllegalArgumentException("Input path is not specified so Bucket ID must be specified");
}
if (StringUtils.isBlank(flowId)) {
throw new IllegalArgumentException("Input path is not specified so Flow ID must be specified");
}
if (StringUtils.isBlank(flowVersion)) {
throw new IllegalArgumentException("Input path is not specified so Flow Version must be specified");
}
} else {
if (!input.exists() || !input.isFile() || !input.canRead()) {
throw new IllegalArgumentException("Specified input is not a local readable file: " + inputSource);
}
if (!StringUtils.isBlank(bucketId)) {
throw new IllegalArgumentException("Input path is specified so Bucket ID should not be specified");
}
if (!StringUtils.isBlank(flowId)) {
throw new IllegalArgumentException("Input path is specified so Flow ID should not be specified");
}
if (!StringUtils.isBlank(flowVersion)) {
throw new IllegalArgumentException("Input path is specified so Flow Version should not be specified");
}
if (!StringUtils.isBlank(flowBranch)) {
throw new IllegalArgumentException("Input path is specified so Flow Branch should not be specified");
}
}

if ((posXExists && !posYExists)) {
throw new IllegalArgumentException("Missing Y position - Please specify both X and Y, or specify neither");
Expand All @@ -100,44 +137,13 @@ public StringResult doExecute(final NiFiClient client, final Properties properti
keepExistingPC = "true";
}

// if a registry client is specified use it, otherwise see if there is only one
// available and use that,
// if more than one is available then throw an exception because we don't know
// which one to use
String registryId = getArg(properties, CommandOption.REGISTRY_CLIENT_ID);
if (StringUtils.isBlank(registryId)) {
final FlowRegistryClientsEntity registries = client.getControllerClient().getRegistryClients();

final Set<FlowRegistryClientEntity> entities = registries.getRegistries();
if (entities == null || entities.isEmpty()) {
throw new NiFiClientException("No registry clients available");
}

if (entities.size() == 1) {
registryId = entities.stream().findFirst().get().getId();
} else {
throw new MissingOptionException(CommandOption.REGISTRY_CLIENT_ID.getLongName()
+ " must be provided when there is more than one available");
}
}

// get the optional id of the parent PG, otherwise fallback to the root group
String parentPgId = getArg(properties, CommandOption.PG_ID);
if (StringUtils.isBlank(parentPgId)) {
final FlowClient flowClient = client.getFlowClient();
parentPgId = flowClient.getRootGroupId();
}

final VersionControlInformationDTO versionControlInfo = new VersionControlInformationDTO();
versionControlInfo.setRegistryId(registryId);
versionControlInfo.setBucketId(bucketId);
versionControlInfo.setFlowId(flowId);
versionControlInfo.setVersion(flowVersion);

if (StringUtils.isNotBlank(flowBranch)) {
versionControlInfo.setBranch(flowBranch);
}

final PositionDTO posDto = new PositionDTO();
if (posXExists && posYExists) {
posDto.setX(Double.parseDouble(posXStr));
Expand All @@ -148,16 +154,58 @@ public StringResult doExecute(final NiFiClient client, final Properties properti
posDto.setY(Integer.valueOf(pgBox.getY()).doubleValue());
}

final ProcessGroupDTO pgDto = new ProcessGroupDTO();
pgDto.setVersionControlInformation(versionControlInfo);
pgDto.setPosition(posDto);
final ProcessGroupClient pgClient = client.getProcessGroupClient();
ProcessGroupEntity createdEntity = null;

if (StringUtils.isBlank(inputSource)) {

// if a registry client is specified use it, otherwise see if there is only one
// available and use that, if more than one is available then throw an exception
// because we don't know which one to use
String registryId = getArg(properties, CommandOption.REGISTRY_CLIENT_ID);
if (StringUtils.isBlank(registryId)) {
final FlowRegistryClientsEntity registries = client.getControllerClient().getRegistryClients();

final Set<FlowRegistryClientEntity> entities = registries.getRegistries();
if (entities == null || entities.isEmpty()) {
throw new NiFiClientException("No registry clients available");
}

if (entities.size() == 1) {
registryId = entities.stream().findFirst().get().getId();
} else {
throw new MissingOptionException(CommandOption.REGISTRY_CLIENT_ID.getLongName()
+ " must be provided when there is more than one available");
}
}

final ProcessGroupEntity pgEntity = new ProcessGroupEntity();
pgEntity.setComponent(pgDto);
pgEntity.setRevision(getInitialRevisionDTO());
final VersionControlInformationDTO versionControlInfo = new VersionControlInformationDTO();
versionControlInfo.setRegistryId(registryId);
versionControlInfo.setBucketId(bucketId);
versionControlInfo.setFlowId(flowId);
versionControlInfo.setVersion(flowVersion);

if (StringUtils.isNotBlank(flowBranch)) {
versionControlInfo.setBranch(flowBranch);
}

final ProcessGroupDTO pgDto = new ProcessGroupDTO();
pgDto.setVersionControlInformation(versionControlInfo);
pgDto.setPosition(posDto);

final ProcessGroupEntity pgEntity = new ProcessGroupEntity();
pgEntity.setComponent(pgDto);
pgEntity.setRevision(getInitialRevisionDTO());

createdEntity = pgClient.createProcessGroup(parentPgId, pgEntity, Boolean.parseBoolean(keepExistingPC));

} else {
JsonNode rootNode = objectMapper.readTree(input);
JsonNode flowContentsNode = rootNode.path("flowContents");
String pgName = flowContentsNode.path("name").asText();
createdEntity = pgClient.upload(parentPgId, input, pgName, posDto.getX(), posDto.getY());
}

final ProcessGroupClient pgClient = client.getProcessGroupClient();
final ProcessGroupEntity createdEntity = pgClient.createProcessGroup(parentPgId, pgEntity, Boolean.parseBoolean(keepExistingPC));
return new StringResult(createdEntity.getId(), getContext().isInteractive());
}

Expand Down
4 changes: 4 additions & 0 deletions nifi-toolkit/nifi-toolkit-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ language governing permissions and limitations under the License. -->
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-multipart</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,6 @@ FlowEntity copySnippet(String processGroupId, CopySnippetRequestEntity copySnipp
CopyResponseEntity copy(String processGroupId, CopyRequestEntity copyRequestEntity) throws NiFiClientException, IOException;

PasteResponseEntity paste(String processGroupId, PasteRequestEntity pasteRequestEntity) throws NiFiClientException, IOException;

ProcessGroupEntity upload(String parentPgId, File file, String pgName, Double posX, Double posY) throws NiFiClientException, IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupImportEntity;
import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

/**
* Jersey implementation of ProcessGroupClient.
Expand Down Expand Up @@ -378,4 +381,35 @@ public PasteResponseEntity paste(String processGroupId, PasteRequestEntity paste
PasteResponseEntity.class);
});
}

@Override
public ProcessGroupEntity upload(String parentPgId, File file, String pgName, Double posX, Double posY) throws NiFiClientException, IOException {
if (StringUtils.isBlank(parentPgId)) {
throw new IllegalArgumentException("Parent process group id cannot be null or blank");
}
if (file == null) {
throw new IllegalArgumentException("File cannot be null");
}
if (!file.exists() || !file.canRead()) {
throw new IllegalArgumentException("File does not exist: " + file.getAbsolutePath());
}

FormDataMultiPart form = new FormDataMultiPart();

form.field("id", parentPgId);
form.field("groupName", pgName);
form.field("positionX", Double.toString(posX));
form.field("positionY", Double.toString(posY));
form.field("clientId", UUID.randomUUID().toString());
form.bodyPart(new FileDataBodyPart("file", file, MediaType.APPLICATION_JSON_TYPE));

return executeAction("Error uploading process group", () -> {
final WebTarget target = processGroupsTarget
.path("{id}/process-groups/upload")
.resolveTemplate("id", parentPgId);
return getRequestBuilder(target).post(
Entity.entity(form, MediaType.MULTIPART_FORM_DATA),
ProcessGroupEntity.class);
});
}
}

0 comments on commit b8f09ea

Please sign in to comment.