Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-14134 - add support for JSON file upload to CLI pg-import #9611

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.nifi.toolkit.cli.impl.command.nifi.pg;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.cli.MissingOptionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.toolkit.cli.api.Context;
Expand All @@ -34,6 +36,7 @@
import org.apache.nifi.web.api.entity.FlowRegistryClientsEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;

import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.Set;
Expand All @@ -43,6 +46,8 @@
*/
public class PGImport extends AbstractNiFiCommand<StringResult> {

private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();

public PGImport() {
super("pg-import", StringResult.class);
}
Expand All @@ -59,6 +64,7 @@ public String getDescription() {
@Override
protected void doInitialize(Context context) {
addOption(CommandOption.PG_ID.createOption());
addOption(CommandOption.INPUT_SOURCE.createOption());
addOption(CommandOption.REGISTRY_CLIENT_ID.createOption());
addOption(CommandOption.BUCKET_ID.createOption());
addOption(CommandOption.FLOW_ID.createOption());
Expand All @@ -73,9 +79,11 @@ protected void doInitialize(Context context) {
public StringResult doExecute(final NiFiClient client, final Properties properties)
throws NiFiClientException, IOException, MissingOptionException {

final String bucketId = getRequiredArg(properties, CommandOption.BUCKET_ID);
final String flowId = getRequiredArg(properties, CommandOption.FLOW_ID);
final String flowVersion = getRequiredArg(properties, CommandOption.FLOW_VERSION);
final String inputSource = getArg(properties, CommandOption.INPUT_SOURCE);

final String bucketId = getArg(properties, CommandOption.BUCKET_ID);
final String flowId = getArg(properties, CommandOption.FLOW_ID);
final String flowVersion = getArg(properties, CommandOption.FLOW_VERSION);
final String flowBranch = getArg(properties, CommandOption.FLOW_BRANCH);

final String posXStr = getArg(properties, CommandOption.POS_X);
Expand All @@ -85,6 +93,35 @@ public StringResult doExecute(final NiFiClient client, final Properties properti

final boolean posXExists = StringUtils.isNotBlank(posXStr);
final boolean posYExists = StringUtils.isNotBlank(posYStr);
final File input = new File(inputSource);

if (StringUtils.isBlank(inputSource)) {
if (StringUtils.isBlank(bucketId)) {
throw new IllegalArgumentException("Input path is not specified so Bucket ID must be specified");
}
if (StringUtils.isBlank(flowId)) {
throw new IllegalArgumentException("Input path is not specified so Flow ID must be specified");
}
if (StringUtils.isBlank(flowVersion)) {
throw new IllegalArgumentException("Input path is not specified so Flow Version must be specified");
}
} else {
if (!input.exists() || !input.isFile() || !input.canRead()) {
throw new IllegalArgumentException("Specified input is not a local readable file: " + inputSource);
}
if (StringUtils.isNotBlank(bucketId)) {
throw new IllegalArgumentException("Input path is specified so Bucket ID should not be specified");
}
if (StringUtils.isNotBlank(flowId)) {
throw new IllegalArgumentException("Input path is specified so Flow ID should not be specified");
}
if (StringUtils.isNotBlank(flowVersion)) {
throw new IllegalArgumentException("Input path is specified so Flow Version should not be specified");
}
if (StringUtils.isNotBlank(flowBranch)) {
throw new IllegalArgumentException("Input path is specified so Flow Branch should not be specified");
}
}

if ((posXExists && !posYExists)) {
throw new IllegalArgumentException("Missing Y position - Please specify both X and Y, or specify neither");
Expand All @@ -100,44 +137,13 @@ public StringResult doExecute(final NiFiClient client, final Properties properti
keepExistingPC = "true";
}

// if a registry client is specified use it, otherwise see if there is only one
// available and use that,
// if more than one is available then throw an exception because we don't know
// which one to use
String registryId = getArg(properties, CommandOption.REGISTRY_CLIENT_ID);
if (StringUtils.isBlank(registryId)) {
final FlowRegistryClientsEntity registries = client.getControllerClient().getRegistryClients();

final Set<FlowRegistryClientEntity> entities = registries.getRegistries();
if (entities == null || entities.isEmpty()) {
throw new NiFiClientException("No registry clients available");
}

if (entities.size() == 1) {
registryId = entities.stream().findFirst().get().getId();
} else {
throw new MissingOptionException(CommandOption.REGISTRY_CLIENT_ID.getLongName()
+ " must be provided when there is more than one available");
}
}

// get the optional id of the parent PG, otherwise fallback to the root group
String parentPgId = getArg(properties, CommandOption.PG_ID);
if (StringUtils.isBlank(parentPgId)) {
final FlowClient flowClient = client.getFlowClient();
parentPgId = flowClient.getRootGroupId();
}

final VersionControlInformationDTO versionControlInfo = new VersionControlInformationDTO();
versionControlInfo.setRegistryId(registryId);
versionControlInfo.setBucketId(bucketId);
versionControlInfo.setFlowId(flowId);
versionControlInfo.setVersion(flowVersion);

if (StringUtils.isNotBlank(flowBranch)) {
versionControlInfo.setBranch(flowBranch);
}

final PositionDTO posDto = new PositionDTO();
if (posXExists && posYExists) {
posDto.setX(Double.parseDouble(posXStr));
Expand All @@ -148,16 +154,58 @@ public StringResult doExecute(final NiFiClient client, final Properties properti
posDto.setY(Integer.valueOf(pgBox.getY()).doubleValue());
}

final ProcessGroupDTO pgDto = new ProcessGroupDTO();
pgDto.setVersionControlInformation(versionControlInfo);
pgDto.setPosition(posDto);
final ProcessGroupClient pgClient = client.getProcessGroupClient();
ProcessGroupEntity createdEntity = null;

if (StringUtils.isBlank(inputSource)) {

// if a registry client is specified use it, otherwise see if there is only one
// available and use that, if more than one is available then throw an exception
// because we don't know which one to use
String registryId = getArg(properties, CommandOption.REGISTRY_CLIENT_ID);
if (StringUtils.isBlank(registryId)) {
final FlowRegistryClientsEntity registries = client.getControllerClient().getRegistryClients();

final Set<FlowRegistryClientEntity> entities = registries.getRegistries();
if (entities == null || entities.isEmpty()) {
throw new NiFiClientException("No registry clients available");
}

if (entities.size() == 1) {
registryId = entities.stream().findFirst().get().getId();
} else {
throw new MissingOptionException(CommandOption.REGISTRY_CLIENT_ID.getLongName()
+ " must be provided when there is more than one available");
}
}

final ProcessGroupEntity pgEntity = new ProcessGroupEntity();
pgEntity.setComponent(pgDto);
pgEntity.setRevision(getInitialRevisionDTO());
final VersionControlInformationDTO versionControlInfo = new VersionControlInformationDTO();
versionControlInfo.setRegistryId(registryId);
versionControlInfo.setBucketId(bucketId);
versionControlInfo.setFlowId(flowId);
versionControlInfo.setVersion(flowVersion);

if (StringUtils.isNotBlank(flowBranch)) {
versionControlInfo.setBranch(flowBranch);
}

final ProcessGroupDTO pgDto = new ProcessGroupDTO();
pgDto.setVersionControlInformation(versionControlInfo);
pgDto.setPosition(posDto);

final ProcessGroupEntity pgEntity = new ProcessGroupEntity();
pgEntity.setComponent(pgDto);
pgEntity.setRevision(getInitialRevisionDTO());

createdEntity = pgClient.createProcessGroup(parentPgId, pgEntity, Boolean.parseBoolean(keepExistingPC));

} else {
JsonNode rootNode = OBJECT_MAPPER.readTree(input);
JsonNode flowContentsNode = rootNode.path("flowContents");
String pgName = flowContentsNode.path("name").asText();
createdEntity = pgClient.upload(parentPgId, input, pgName, posDto.getX(), posDto.getY());
}

final ProcessGroupClient pgClient = client.getProcessGroupClient();
final ProcessGroupEntity createdEntity = pgClient.createProcessGroup(parentPgId, pgEntity, Boolean.parseBoolean(keepExistingPC));
return new StringResult(createdEntity.getId(), getContext().isInteractive());
}

Expand Down
4 changes: 4 additions & 0 deletions nifi-toolkit/nifi-toolkit-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ language governing permissions and limitations under the License. -->
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-multipart</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,6 @@ FlowEntity copySnippet(String processGroupId, CopySnippetRequestEntity copySnipp
CopyResponseEntity copy(String processGroupId, CopyRequestEntity copyRequestEntity) throws NiFiClientException, IOException;

PasteResponseEntity paste(String processGroupId, PasteRequestEntity pasteRequestEntity) throws NiFiClientException, IOException;

ProcessGroupEntity upload(String parentPgId, File file, String pgName, Double posX, Double posY) throws NiFiClientException, IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupImportEntity;
import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

/**
* Jersey implementation of ProcessGroupClient.
Expand Down Expand Up @@ -378,4 +381,35 @@ public PasteResponseEntity paste(String processGroupId, PasteRequestEntity paste
PasteResponseEntity.class);
});
}

@Override
public ProcessGroupEntity upload(String parentPgId, File file, String pgName, Double posX, Double posY) throws NiFiClientException, IOException {
if (StringUtils.isBlank(parentPgId)) {
throw new IllegalArgumentException("Parent process group id cannot be null or blank");
}
if (file == null) {
throw new IllegalArgumentException("File cannot be null");
}
if (!file.exists() || !file.canRead()) {
throw new IllegalArgumentException("Specified file is not a local readable file: " + file.getAbsolutePath());
}

FormDataMultiPart form = new FormDataMultiPart();

form.field("id", parentPgId);
form.field("groupName", pgName);
form.field("positionX", Double.toString(posX));
form.field("positionY", Double.toString(posY));
form.field("clientId", UUID.randomUUID().toString());
form.bodyPart(new FileDataBodyPart("file", file, MediaType.APPLICATION_JSON_TYPE));

return executeAction("Error uploading process group", () -> {
final WebTarget target = processGroupsTarget
.path("{id}/process-groups/upload")
.resolveTemplate("id", parentPgId);
return getRequestBuilder(target).post(
Entity.entity(form, MediaType.MULTIPART_FORM_DATA),
ProcessGroupEntity.class);
});
}
}
Loading