From 9b33d54f53d9730a7891e913e466b7ab0148a188 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 31 Oct 2023 17:21:59 -0400 Subject: [PATCH 1/6] NIFI-12301 Corrected hasProperty() check when Migrating Properties When calling migrateProperties, provide the properties that were configured in the VersionedComponent, rather than creating a new property map based on the component's new properties This closes #7964 Signed-off-by: David Handermann --- .../controller/StandardProcessorNode.java | 16 +++-- .../reporting/AbstractReportingTaskNode.java | 11 ++- .../StandardControllerServiceNode.java | 10 ++- ...tandardVersionedComponentSynchronizer.java | 72 ++++++------------- .../StandardControllerServiceFactory.java | 2 +- .../apache/nifi/controller/ProcessorNode.java | 2 +- .../nifi/controller/ReportingTaskNode.java | 3 +- .../service/ControllerServiceNode.java | 2 +- .../VersionedFlowSynchronizer.java | 15 ++-- .../tests/system/MigrateProperties.java | 15 +++- .../tests/system/MigrateProperties.java | 1 + .../system/migration/PropertyMigrationIT.java | 5 ++ 12 files changed, 81 insertions(+), 73 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 3781655bf36af..28f45cf82ce57 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -88,9 +88,9 @@ import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.util.ThreadUtils; import org.apache.nifi.util.file.classloader.ClassLoaderUtils; -import org.springframework.scheduling.support.CronExpression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.scheduling.support.CronExpression; import java.lang.management.ThreadInfo; import java.lang.reflect.InvocationTargetException; @@ -1111,7 +1111,6 @@ protected Collection computeValidationErrors(final ValidationC } @Override - @SuppressWarnings("deprecation") public List validateConfig() { final List results = new ArrayList<>(); @@ -2078,9 +2077,9 @@ public void onConfigurationRestored(final ProcessContext context) { } @Override - public void migrateConfiguration(final ControllerServiceFactory serviceFactory) { + public void migrateConfiguration(final Map rawPropertyValues, final ControllerServiceFactory serviceFactory) { try { - migrateProperties(serviceFactory); + migrateProperties(rawPropertyValues, serviceFactory); } catch (final Exception e) { LOG.error("Failed to migrate Property Configuration for {}.", this, e); } @@ -2092,11 +2091,14 @@ public void migrateConfiguration(final ControllerServiceFactory serviceFactory) } } - private void migrateProperties(final ControllerServiceFactory serviceFactory) { + private void migrateProperties(final Map originalPropertyValues, final ControllerServiceFactory serviceFactory) { final Processor processor = getProcessor(); - final StandardPropertyConfiguration propertyConfig = new StandardPropertyConfiguration(toPropertyNameMap(getEffectivePropertyValues()), - toPropertyNameMap(getRawPropertyValues()), this::mapRawValueToEffectiveValue, toString(), serviceFactory); + final Map effectiveValues 
= new HashMap<>(); + originalPropertyValues.forEach((key, value) -> effectiveValues.put(key, mapRawValueToEffectiveValue(value))); + + final StandardPropertyConfiguration propertyConfig = new StandardPropertyConfiguration(effectiveValues, + originalPropertyValues, this::mapRawValueToEffectiveValue, toString(), serviceFactory); try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), processor.getClass(), getIdentifier())) { processor.migrateProperties(propertyConfig); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index b18db120c7788..6ea63a26e0765 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@ -65,7 +65,9 @@ import java.net.URL; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -428,11 +430,14 @@ public Optional getParentProcessGroup() { } @Override - public void migrateConfiguration(final ControllerServiceFactory serviceFactory) { + public void migrateConfiguration(final Map originalPropertyValues, final ControllerServiceFactory serviceFactory) { final ReportingTask task = getReportingTask(); - final StandardPropertyConfiguration propertyConfig = new StandardPropertyConfiguration(toPropertyNameMap(getEffectivePropertyValues()), - toPropertyNameMap(getRawPropertyValues()), this::mapRawValueToEffectiveValue, toString(), serviceFactory); + final Map effectiveValues = new HashMap<>(); + originalPropertyValues.forEach((key, value) -> effectiveValues.put(key, mapRawValueToEffectiveValue(value))); + + final StandardPropertyConfiguration propertyConfig = new StandardPropertyConfiguration(effectiveValues, + originalPropertyValues, this::mapRawValueToEffectiveValue, toString(), serviceFactory); try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), task.getClass(), getIdentifier())) { task.migrateProperties(propertyConfig); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index d1a112b8354d8..f6231c5e3d9bb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -75,6 +75,7 @@ import java.net.URL; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -836,9 +837,12 @@ public void 
notifyPrimaryNodeChanged(final PrimaryNodeState nodeState) { } @Override - public void migrateConfiguration(final ControllerServiceFactory serviceFactory) { - final StandardPropertyConfiguration propertyConfig = new StandardPropertyConfiguration(toPropertyNameMap(getEffectivePropertyValues()), - toPropertyNameMap(getRawPropertyValues()), super::mapRawValueToEffectiveValue, toString(), serviceFactory); + public void migrateConfiguration(final Map originalPropertyValues, final ControllerServiceFactory serviceFactory) { + final Map effectiveValues = new HashMap<>(); + originalPropertyValues.forEach((key, value) -> effectiveValues.put(key, mapRawValueToEffectiveValue(value))); + + final StandardPropertyConfiguration propertyConfig = new StandardPropertyConfiguration(effectiveValues, + originalPropertyValues, super::mapRawValueToEffectiveValue, toString(), serviceFactory); final ControllerService implementation = getControllerServiceImplementation(); try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), implementation.getClass(), getIdentifier())) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index 17f10cee157df..184f53aa0f486 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -75,6 +75,7 @@ import org.apache.nifi.groups.FlowFileConcurrency; import org.apache.nifi.groups.FlowFileOutboundPolicy; import org.apache.nifi.groups.FlowSynchronizationOptions; +import org.apache.nifi.groups.FlowSynchronizationOptions.ComponentStopTimeoutAction; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.PropertyDecryptor; import org.apache.nifi.groups.RemoteProcessGroup; @@ -93,11 +94,7 @@ import org.apache.nifi.parameter.StandardParameterProviderConfiguration; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.registry.flow.FlowRegistryClientContextFactory; import org.apache.nifi.registry.flow.FlowRegistryClientNode; -import org.apache.nifi.registry.flow.FlowRegistryException; -import org.apache.nifi.registry.flow.FlowSnapshotContainer; -import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; import org.apache.nifi.registry.flow.StandardVersionControlInformation; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedFlowState; @@ -119,11 +116,9 @@ import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FlowDifferenceFilters; -import org.apache.nifi.web.ResourceNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.net.URL; import java.time.Duration; import java.util.ArrayList; @@ -155,7 +150,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen private final VersionedFlowSynchronizationContext context; private final Set 
updatedVersionedComponentIds = new HashSet<>(); - private final Set createdExtensions = new HashSet<>(); + private final List createdExtensions = new ArrayList<>(); private FlowSynchronizationOptions syncOptions; private final ConnectableAdditionTracker connectableAdditionTracker = new ConnectableAdditionTracker(); @@ -251,16 +246,19 @@ public void synchronize(final ProcessGroup group, final VersionedExternalFlow ve } }); - for (final ComponentNode extension : createdExtensions) { + for (final CreatedExtension createdExtension : createdExtensions) { + final ComponentNode extension = createdExtension.extension(); + final Map originalPropertyValues = createdExtension.propertyValues(); + final ControllerServiceFactory serviceFactory = new StandardControllerServiceFactory(context.getExtensionManager(), context.getFlowManager(), context.getControllerServiceProvider(), extension); if (extension instanceof final ProcessorNode processor) { - processor.migrateConfiguration(serviceFactory); + processor.migrateConfiguration(originalPropertyValues, serviceFactory); } else if (extension instanceof final ControllerServiceNode service) { - service.migrateConfiguration(serviceFactory); + service.migrateConfiguration(originalPropertyValues, serviceFactory); } else if (extension instanceof final ReportingTaskNode task) { - task.migrateConfiguration(serviceFactory); + task.migrateConfiguration(originalPropertyValues, serviceFactory); } } @@ -1195,7 +1193,7 @@ private ControllerServiceNode addControllerService(final ProcessGroup destinatio } updateControllerService(newService, proposed, topLevelGroup); - createdExtensions.add(newService); + createdExtensions.add(new CreatedExtension(newService, proposed.getProperties())); return newService; } @@ -2115,29 +2113,6 @@ private BundleCoordinate toCoordinate(final Bundle bundle) { return new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion()); } - private Map getVersionedParameterContexts(final VersionedFlowCoordinates versionedFlowCoordinates) { - final String registryId = determineRegistryId(versionedFlowCoordinates); - final FlowRegistryClientNode flowRegistry = context.getFlowManager().getFlowRegistryClient(registryId); - if (flowRegistry == null) { - throw new ResourceNotFoundException("Could not find any Flow Registry registered with identifier " + registryId); - } - - final String bucketId = versionedFlowCoordinates.getBucketId(); - final String flowId = versionedFlowCoordinates.getFlowId(); - final int flowVersion = versionedFlowCoordinates.getVersion(); - - try { - final FlowSnapshotContainer snapshotContainer = flowRegistry.getFlowContents(FlowRegistryClientContextFactory.getAnonymousContext(), bucketId, flowId, flowVersion, false); - final RegisteredFlowSnapshot childSnapshot = snapshotContainer.getFlowSnapshot(); - return childSnapshot.getParameterContexts(); - } catch (final FlowRegistryException e) { - throw new IllegalArgumentException("The Flow Registry with ID " + registryId + " reports that no Flow exists with Bucket " - + bucketId + ", Flow " + flowId + ", Version " + flowVersion, e); - } catch (final IOException ioe) { - throw new IllegalStateException("Failed to communicate with Flow Registry when attempting to retrieve a versioned flow"); - } - } - @Override public void synchronize(final Funnel funnel, final VersionedFunnel proposed, final ProcessGroup group, final FlowSynchronizationOptions synchronizationOptions) throws FlowSynchronizationException, TimeoutException, InterruptedException { @@ -2404,7 +2379,7 @@ 
private ProcessorNode addProcessor(final ProcessGroup destination, final Version destination.addProcessor(procNode); updateProcessor(procNode, proposed, topLevelGroup); - createdExtensions.add(procNode); + createdExtensions.add(new CreatedExtension(procNode, proposed.getProperties())); // Notify the processor node that the configuration (properties, e.g.) has been restored final ProcessContext processContext = context.getProcessContextFactory().apply(procNode); @@ -2573,6 +2548,7 @@ private void notifyScheduledStateChange(final ComponentNode component, final Flo if (intendedState == org.apache.nifi.flow.ScheduledState.RUNNING && reportingTaskNode.getScheduledState() == ScheduledState.DISABLED) { return; } + synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(reportingTaskNode, intendedState); } } catch (final Exception e) { @@ -2637,14 +2613,12 @@ private boolean stopOrTerminate(final ProcessorNode processor, final long timeou return stopProcessor(processor, timeout); } catch (final TimeoutException te) { - switch (synchronizationOptions.getComponentStopTimeoutAction()) { - case THROW_TIMEOUT_EXCEPTION: - throw te; - case TERMINATE: - default: - processor.terminate(); - return true; + if (synchronizationOptions.getComponentStopTimeoutAction() == ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION) { + throw te; } + + processor.terminate(); + return true; } finally { notifyScheduledStateChange((ComponentNode) processor, synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED); } @@ -3309,9 +3283,7 @@ private Connectable getConnectable(final ProcessGroup group, final ConnectableCo } final Optional addedComponent = connectableAdditionTracker.getComponent(group.getIdentifier(), connectableComponent.getId()); - if (addedComponent.isPresent()) { - LOG.debug("Found Connectable in Process Group {} as newly added component {}", group, addedComponent.get()); - } + addedComponent.ifPresent(value -> LOG.debug("Found Connectable in Process Group {} as newly added component {}", group, value)); return addedComponent.orElse(null); } @@ -3402,7 +3374,7 @@ private Connectable getConnectable(final ProcessGroup group, final ConnectableCo NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())))) .findAny(); - if (!rpgOption.isPresent()) { + if (rpgOption.isEmpty()) { throw new IllegalArgumentException("Connection refers to a Port with ID " + id + " within Remote Process Group with ID " + rpgId + " but could not find a Remote Process Group corresponding to that ID"); } @@ -3428,7 +3400,7 @@ private Connectable getConnectable(final ProcessGroup group, final ConnectableCo NiFiRegistryFlowMapper.generateVersionedComponentId(component.getIdentifier())))) .findAny(); - if (!rpgOption.isPresent()) { + if (rpgOption.isEmpty()) { throw new IllegalArgumentException("Connection refers to a Port with ID " + id + " within Remote Process Group with ID " + rpgId + " but could not find a Remote Process Group corresponding to that ID"); } @@ -3487,7 +3459,7 @@ private ReportingTaskNode addReportingTask(final VersionedReportingTask reportin final BundleCoordinate coordinate = toCoordinate(reportingTask.getBundle()); final ReportingTaskNode taskNode = context.getFlowManager().createReportingTask(reportingTask.getType(), reportingTask.getInstanceIdentifier(), coordinate, false); updateReportingTask(taskNode, reportingTask); - createdExtensions.add(taskNode); + createdExtensions.add(new CreatedExtension(taskNode, reportingTask.getProperties())); return taskNode; } @@ 
-3732,4 +3704,6 @@ private ControllerServiceNode getVersionedControllerService(final ProcessGroup g return getVersionedControllerService(group.getParent(), versionedComponentId); } + private record CreatedExtension(ComponentNode extension, Map propertyValues) { + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/migration/StandardControllerServiceFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/migration/StandardControllerServiceFactory.java index dcd2a3de650f4..956d8c0b8b400 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/migration/StandardControllerServiceFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/migration/StandardControllerServiceFactory.java @@ -98,7 +98,7 @@ public ControllerServiceNode create(final ControllerServiceCreationDetails creat serviceNode.setProperties(creationDetails.serviceProperties()); final ControllerServiceFactory serviceFactory = new StandardControllerServiceFactory(extensionManager, flowManager, serviceProvider, serviceNode); - serviceNode.migrateConfiguration(serviceFactory); + serviceNode.migrateConfiguration(creationDetails.serviceProperties(), serviceFactory); if (isEnable()) { final ValidationStatus validationStatus = serviceNode.performValidation(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index fa5b130a371bf..be53eabf81592 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -298,6 +298,6 @@ protected void performFlowAnalysisOnThis() { public abstract void notifyPrimaryNodeChanged(PrimaryNodeState primaryNodeState, LifecycleState lifecycleState); - public abstract void migrateConfiguration(ControllerServiceFactory serviceFactory); + public abstract void migrateConfiguration(Map originalPropertyValues, ControllerServiceFactory serviceFactory); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java index 548476f967aad..e7096bf60beeb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java @@ -28,6 +28,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -138,5 +139,5 @@ public interface ReportingTaskNode extends ComponentNode { void notifyPrimaryNodeChanged(PrimaryNodeState primaryNodeState, LifecycleState lifecycleState); - void 
migrateConfiguration(ControllerServiceFactory controllerServiceFactory); + void migrateConfiguration(Map originalPropertyValues, ControllerServiceFactory controllerServiceFactory); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java index 40f3be083d96d..02dafdfc54207 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -257,6 +257,6 @@ void setControllerServiceAndProxy(final LoggableComponent imp void notifyPrimaryNodeChanged(PrimaryNodeState primaryNodeState); - void migrateConfiguration(ControllerServiceFactory serviceFactory); + void migrateConfiguration(Map originalPropertyValues, ControllerServiceFactory serviceFactory); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java index 0aaaafc73c10c..d4b5c78ab4718 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java @@ -616,7 +616,7 @@ private void addReportingTask(final FlowController controller, final VersionedRe final ControllerServiceFactory serviceFactory = new StandardControllerServiceFactory(controller.getExtensionManager(), controller.getFlowManager(), controller.getControllerServiceProvider(), taskNode); - taskNode.migrateConfiguration(serviceFactory); + taskNode.migrateConfiguration(reportingTask.getProperties(), serviceFactory); } private void updateReportingTask(final ReportingTaskNode taskNode, final VersionedReportingTask reportingTask, final FlowController controller) { @@ -957,26 +957,29 @@ private void inheritControllerServices(final FlowController controller, final Ve // Service B's references won't be updated. To avoid this, we create them all first, and then configure/update // them so that when AbstractComponentNode#setProperty is called, it properly establishes that reference. 
final List controllerServices = dataflow.getControllerServices(); - final Set controllerServicesAdded = new HashSet<>(); + final Map> controllerServicesAddedAndProperties = new HashMap<>(); for (final VersionedControllerService versionedControllerService : controllerServices) { final ControllerServiceNode serviceNode = flowManager.getRootControllerService(versionedControllerService.getInstanceIdentifier()); if (serviceNode == null) { final ControllerServiceNode added = addRootControllerService(controller, versionedControllerService); - controllerServicesAdded.add(added); + controllerServicesAddedAndProperties.put(added, versionedControllerService.getProperties()); } } for (final VersionedControllerService versionedControllerService : controllerServices) { final ControllerServiceNode serviceNode = flowManager.getRootControllerService(versionedControllerService.getInstanceIdentifier()); - if (controllerServicesAdded.contains(serviceNode) || affectedComponentSet.isControllerServiceAffected(serviceNode.getIdentifier())) { + if (controllerServicesAddedAndProperties.containsKey(serviceNode) || affectedComponentSet.isControllerServiceAffected(serviceNode.getIdentifier())) { updateRootControllerService(serviceNode, versionedControllerService, controller.getEncryptor()); } } - for (final ControllerServiceNode service : controllerServicesAdded) { + for (final Map.Entry> entry : controllerServicesAddedAndProperties.entrySet()) { + final ControllerServiceNode service = entry.getKey(); + final Map originalPropertyValues = entry.getValue(); + final ControllerServiceFactory serviceFactory = new StandardControllerServiceFactory(controller.getExtensionManager(), controller.getFlowManager(), controller.getControllerServiceProvider(), service); - service.migrateConfiguration(serviceFactory); + service.migrateConfiguration(originalPropertyValues, serviceFactory); } for (final VersionedControllerService versionedControllerService : controllerServices) { diff --git a/nifi-system-tests/nifi-alternate-config-extensions-bundle/nifi-alternate-config-extensions/src/main/java/org/apache/nifi/processors/tests/system/MigrateProperties.java b/nifi-system-tests/nifi-alternate-config-extensions-bundle/nifi-alternate-config-extensions/src/main/java/org/apache/nifi/processors/tests/system/MigrateProperties.java index cfa974166aa35..af7e9dadd339f 100644 --- a/nifi-system-tests/nifi-alternate-config-extensions-bundle/nifi-alternate-config-extensions/src/main/java/org/apache/nifi/processors/tests/system/MigrateProperties.java +++ b/nifi-system-tests/nifi-alternate-config-extensions-bundle/nifi-alternate-config-extensions/src/main/java/org/apache/nifi/processors/tests/system/MigrateProperties.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.tests.system; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.cs.tests.system.MigrationService; import org.apache.nifi.flowfile.FlowFile; @@ -63,6 +64,13 @@ public class MigrateProperties extends AbstractProcessor { .identifiesControllerService(ControllerService.class) .build(); + static PropertyDescriptor DEPRECATED = new PropertyDescriptor.Builder() + .name("Deprecated") + .required(false) + .addValidator(Validator.VALID) + .defaultValue("Deprecated Value") + .build(); + static Relationship REL_ODD = new Relationship.Builder().name("odd").build(); static Relationship REL_EVEN = new Relationship.Builder().name("even").build(); static Relationship 
REL_BROKEN = new Relationship.Builder().name("broken").build(); @@ -72,7 +80,8 @@ public class MigrateProperties extends AbstractProcessor { INGEST, ATTRIBUTE_NAME, ATTRIBUTE_VALUE, - SERVICE + SERVICE, + DEPRECATED ); private final AtomicLong counter = new AtomicLong(0L); @@ -98,6 +107,10 @@ public void migrateProperties(final PropertyConfiguration config) { final String ignoredValue = config.getPropertyValue("ignored").orElse(null); config.removeProperty("ignored"); + if (config.hasProperty("Deprecated")) { + config.setProperty("Deprecated Found", "true"); + } + // If the 'ignored' value was set, create a new Controller Service whose Start value is set to that value. if (ignoredValue != null && ignoredValue.matches("\\d+")) { final String serviceId = config.createControllerService(MigrationService.class.getName(), Map.of("Start", ignoredValue)); diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/MigrateProperties.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/MigrateProperties.java index 6f677656feec5..beac6d1260e9c 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/MigrateProperties.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/MigrateProperties.java @@ -62,6 +62,7 @@ public class MigrateProperties extends AbstractProcessor { .addValidator(Validator.VALID) .build(); + static Relationship REL_SUCCESS = new Relationship.Builder().name("success").build(); static Relationship REL_FAILURE = new Relationship.Builder().name("failure").build(); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/migration/PropertyMigrationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/migration/PropertyMigrationIT.java index 45344cdc6a2c2..55f52d5ef7b2a 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/migration/PropertyMigrationIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/migration/PropertyMigrationIT.java @@ -45,6 +45,7 @@ import java.util.regex.Pattern; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -105,6 +106,9 @@ public void testControllerServiceCreated() throws NiFiClientException, IOExcepti final String serviceId = propertiesMap.get(SERVICE); assertNotNull(serviceId); serviceIds.add(serviceId); + + assertEquals("Deprecated Value", propertiesMap.get("Deprecated")); + assertFalse(propertiesMap.containsKey("Deprecated Found")); } // Should be 3 different services @@ -191,6 +195,7 @@ public void testPropertyMigration() throws NiFiClientException, IOException { expectedUpdatedProperties.put("Attribute Value", "Hi"); expectedUpdatedProperties.put("New Property", "true"); expectedUpdatedProperties.put("Service", null); + expectedUpdatedProperties.put("Deprecated", "Deprecated Value"); assertEquals(expectedUpdatedProperties, updatedProperties); final ProcessorConfigDTO updatedConfig = updated.getComponent().getConfig(); From 
2c0ff6f6249c04b7121113ce199761747364eb5d Mon Sep 17 00:00:00 2001 From: Peter Turcsanyi Date: Wed, 1 Nov 2023 13:14:45 +0100 Subject: [PATCH 2/6] NIFI-12303 Removed deprecated Consumer Hostname property from ConsumeAzureEventHub This closes #7966 Signed-off-by: David Handermann --- .../processors/azure/eventhub/ConsumeAzureEventHub.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java index 87391728768ad..a3aef2affd521 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java @@ -148,14 +148,6 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem .defaultValue("$Default") .required(true) .build(); - static final PropertyDescriptor CONSUMER_HOSTNAME = new PropertyDescriptor.Builder() - .name("event-hub-consumer-hostname") - .displayName("Consumer Hostname") - .description("DEPRECATED: This property is no longer used.") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .required(false) - .build(); static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() .name("record-reader") @@ -291,7 +283,6 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem POLICY_PRIMARY_KEY, USE_MANAGED_IDENTITY, CONSUMER_GROUP, - CONSUMER_HOSTNAME, RECORD_READER, RECORD_WRITER, INITIAL_OFFSET, From 75c661bbbe56a7951974a701921af9da74dd0d68 Mon Sep 17 00:00:00 2001 From: Paul Grey Date: Mon, 30 Oct 2023 15:15:52 -0400 Subject: [PATCH 3/6] NIFI-12194 Added Yield on Exceptions in Kafka Processors - Catching KafkaException and yielding for publisher lease requests improves behavior when the Processor is unable to connect to Kafka Brokers This closes #7955 Signed-off-by: David Handermann --- .../kafka/pubsub/ConsumeKafkaRecord_2_6.java | 6 ++++-- .../processors/kafka/pubsub/ConsumeKafka_2_6.java | 6 ++++-- .../kafka/pubsub/PublishKafkaRecord_2_6.java | 13 ++++++++++++- .../processors/kafka/pubsub/PublishKafka_2_6.java | 13 ++++++++++++- 4 files changed, 32 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java index 525f621e1feae..50fece3b35181 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java @@ -540,9 +540,11 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. 
" + "Will roll back session and discard any partially received data.", lease); } catch (final KafkaException kex) { - getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex); + getLogger().error("Exception while interacting with Kafka so will close the lease {}", lease, kex); + context.yield(); } catch (final Throwable t) { - getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", lease, t, t); + getLogger().error("Exception while processing data from kafka so will close the lease {}", lease, t); + context.yield(); } finally { activeLeases.remove(lease); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java index 4421ae92f88ce..a5c6b15891fab 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java @@ -483,9 +483,11 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. " + "Will roll back session and discard any partially received data.", lease); } catch (final KafkaException kex) { - getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex); + getLogger().error("Exception while interacting with Kafka so will close the lease {}", lease, kex); + context.yield(); } catch (final Throwable t) { - getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", lease, t, t); + getLogger().error("Exception while processing data from kafka so will close the lease {}", lease, t); + context.yield(); } finally { activeLeases.remove(lease); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java index af61faeb9598b..34053d6a3b1f8 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.kafka.pubsub; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.ProducerFencedException; @@ -505,7 +506,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } final long startTime = System.nanoTime(); - try (final PublisherLease lease = pool.obtainPublisher()) { + try (final PublisherLease lease = obtainPublisher(context, pool)) { try { if (useTransactions) { lease.beginTransaction(); @@ -588,6 +589,16 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session } } + private PublisherLease obtainPublisher(final ProcessContext context, final PublisherPool pool) { + try { + return pool.obtainPublisher(); + } catch (final KafkaException e) { + getLogger().error("Failed to obtain Kafka Producer", e); + context.yield(); + throw e; + } + } + private Function getPartitioner(final ProcessContext context, final FlowFile flowFile) { final String partitionClass = context.getProperty(PARTITION_CLASS).getValue(); if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java index b2721a7199051..b6b84ce1e0359 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java @@ -20,6 +20,7 @@ import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.ProducerFencedException; @@ -439,7 +440,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final PublishFailureStrategy failureStrategy = getFailureStrategy(context); final long startTime = System.nanoTime(); - try (final PublisherLease lease = pool.obtainPublisher()) { + try (final PublisherLease lease = obtainPublisher(context, pool)) { try { if (useTransactions) { lease.beginTransaction(); @@ -512,6 +513,16 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } } + private PublisherLease obtainPublisher(final ProcessContext context, final PublisherPool pool) { + try { + return pool.obtainPublisher(); + } catch (final KafkaException e) { + getLogger().error("Failed to obtain Kafka Producer", e); + context.yield(); + throw e; + } + } + private PublishFailureStrategy getFailureStrategy(final ProcessContext context) { final String strategy = context.getProperty(FAILURE_STRATEGY).getValue(); if (FailureStrategy.ROLLBACK.getValue().equals(strategy)) { From df3f5b4638ba457dd78524765d918b590080eb99 Mon Sep 17 00:00:00 2001 From: Chris Sampson Date: Tue, 31 Oct 2023 20:06:19 +0000 Subject: [PATCH 4/6] NIFI-12299 Run integration-tests when assemblies are updated This closes #7962 Signed-off-by: David Handermann --- .github/workflows/integration-tests.yml | 6 ++++++ pom.xml | 5 ++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 9717970d4eb8d..e0d5eb04c16d4 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -22,12 +22,18 @@ on: push: paths: - '.github/workflows/integration-tests.yml' + - 'pom.xml' + - '**/*-assembly/pom.xml' + - '**/*-bom/pom.xml' - '**/test/**/*IT.java' - '**/test/**/*ITCase.java' - '**/test/**/IT*.java' pull_request: paths: - '.github/workflows/integration-tests.yml' + - 'pom.xml' + - '**/*-assembly/pom.xml' + - '**/*-bom/pom.xml' - '**/test/**/*IT.java' 
- '**/test/**/*ITCase.java' - '**/test/**/IT*.java' diff --git a/pom.xml b/pom.xml index 7f37c854479c3..5b3e9b77d0bed 100644 --- a/pom.xml +++ b/pom.xml @@ -1200,7 +1200,10 @@ !ITPreventProxiedAnonymousAccess, !ITPreventDirectAnonymousAccess, !ITAllowDirectAnonymousAccess, - !ITProcessGroupAccessControl + !ITProcessGroupAccessControl, + !StatelessNiFiSinkTaskIT#testSimpleFlow, + !StatelessNiFiSinkTaskIT#testParameters, + !StatelessNiFiSinkTaskIT#testWrongOutputPort From 945d8b54bce63d7454238e11e08d073b88748378 Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Wed, 1 Nov 2023 13:10:00 -0500 Subject: [PATCH 5/6] NIFI-12294 Standardized NAR Entry Loading (#7958) - Consolidated duplicative NAR file entry normalization --- .../org/apache/nifi/nar/NarUnpackerTest.java | 49 ++++----- .../java/org/apache/nifi/nar/NarUnpacker.java | 99 ++++++++++--------- 2 files changed, 72 insertions(+), 76 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/NarUnpackerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/NarUnpackerTest.java index beb673f0a8d1e..5733d1bb6033e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/NarUnpackerTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/NarUnpackerTest.java @@ -33,6 +33,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Stream; @@ -44,17 +45,17 @@ public class NarUnpackerTest { + private static final String PROPERTIES_PATH = "/NarUnpacker/conf/nifi.properties"; + @BeforeAll public static void copyResources() throws IOException { - final Path sourcePath = Paths.get("./src/test/resources"); final Path targetPath = Paths.get("./target"); - Files.walkFileTree(sourcePath, new SimpleFileVisitor() { + Files.walkFileTree(sourcePath, new SimpleFileVisitor<>() { @Override - public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) - throws IOException { + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { Path relativeSource = sourcePath.relativize(dir); Path target = targetPath.resolve(relativeSource); @@ -62,12 +63,10 @@ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) Files.createDirectories(target); return FileVisitResult.CONTINUE; - } @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) - throws IOException { + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { Path relativeSource = sourcePath.relativize(file); Path target = targetPath.resolve(relativeSource); @@ -81,8 +80,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) @Test public void testUnpackNars() { - - NiFiProperties properties = loadSpecifiedProperties("/NarUnpacker/conf/nifi.properties", Collections.EMPTY_MAP); + NiFiProperties properties = loadSpecifiedProperties(Collections.emptyMap()); assertEquals("./target/NarUnpacker/lib/", properties.getProperty("nifi.nar.library.directory")); @@ -93,10 +91,10 @@ public void testUnpackNars() { assertEquals(2, extensionMapping.getAllExtensionNames().size()); - 
assertTrue(extensionMapping.getAllExtensionNames().keySet().contains("org.apache.nifi.processors.dummy.one")); - assertTrue(extensionMapping.getAllExtensionNames().keySet().contains("org.apache.nifi.processors.dummy.two")); + assertTrue(extensionMapping.getAllExtensionNames().containsKey("org.apache.nifi.processors.dummy.one")); + assertTrue(extensionMapping.getAllExtensionNames().containsKey("org.apache.nifi.processors.dummy.two")); final File extensionsWorkingDir = properties.getExtensionsWorkingDirectory(); - File[] extensionFiles = extensionsWorkingDir.listFiles(); + File[] extensionFiles = Objects.requireNonNull(extensionsWorkingDir.listFiles()); Set expectedNars = new HashSet<>(); expectedNars.add("dummy-one.nar-unpacked"); @@ -110,24 +108,22 @@ public void testUnpackNars() { } @Test - public void testUnpackNarsFromEmptyDir() throws IOException { - + public void testUnpackNarsFromEmptyDir() { final File emptyDir = new File("./target/empty/dir"); - emptyDir.delete(); emptyDir.deleteOnExit(); assertTrue(emptyDir.mkdirs()); final Map others = new HashMap<>(); others.put("nifi.nar.library.directory.alt", emptyDir.toString()); - NiFiProperties properties = loadSpecifiedProperties("/NarUnpacker/conf/nifi.properties", others); + NiFiProperties properties = loadSpecifiedProperties(others); final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, SystemBundle.create(properties), NarUnpackMode.UNPACK_INDIVIDUAL_JARS); assertEquals(1, extensionMapping.getAllExtensionNames().size()); - assertTrue(extensionMapping.getAllExtensionNames().keySet().contains("org.apache.nifi.processors.dummy.one")); + assertTrue(extensionMapping.getAllExtensionNames().containsKey("org.apache.nifi.processors.dummy.one")); final File extensionsWorkingDir = properties.getExtensionsWorkingDirectory(); - File[] extensionFiles = extensionsWorkingDir.listFiles(); + File[] extensionFiles = Objects.requireNonNull(extensionsWorkingDir.listFiles()); assertEquals(2, extensionFiles.length); @@ -139,23 +135,21 @@ public void testUnpackNarsFromEmptyDir() throws IOException { @Test public void testUnpackNarsFromNonExistantDir() { - final File nonExistantDir = new File("./target/this/dir/should/not/exist/"); - nonExistantDir.delete(); nonExistantDir.deleteOnExit(); final Map others = new HashMap<>(); others.put("nifi.nar.library.directory.alt", nonExistantDir.toString()); - NiFiProperties properties = loadSpecifiedProperties("/NarUnpacker/conf/nifi.properties", others); + NiFiProperties properties = loadSpecifiedProperties(others); final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, SystemBundle.create(properties), NarUnpackMode.UNPACK_INDIVIDUAL_JARS); - assertTrue(extensionMapping.getAllExtensionNames().keySet().contains("org.apache.nifi.processors.dummy.one")); + assertTrue(extensionMapping.getAllExtensionNames().containsKey("org.apache.nifi.processors.dummy.one")); assertEquals(1, extensionMapping.getAllExtensionNames().size()); final File extensionsWorkingDir = properties.getExtensionsWorkingDirectory(); - File[] extensionFiles = extensionsWorkingDir.listFiles(); + File[] extensionFiles = Objects.requireNonNull(extensionsWorkingDir.listFiles()); assertEquals(2, extensionFiles.length); @@ -167,24 +161,23 @@ public void testUnpackNarsFromNonExistantDir() { @Test public void testUnpackNarsFromNonDir() throws IOException { - final File nonDir = new File("./target/file.txt"); - nonDir.createNewFile(); + assertTrue(nonDir.createNewFile()); nonDir.deleteOnExit(); final Map others = new 
HashMap<>(); others.put("nifi.nar.library.directory.alt", nonDir.toString()); - NiFiProperties properties = loadSpecifiedProperties("/NarUnpacker/conf/nifi.properties", others); + NiFiProperties properties = loadSpecifiedProperties(others); final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, SystemBundle.create(properties), NarUnpackMode.UNPACK_INDIVIDUAL_JARS); assertNull(extensionMapping); } - private NiFiProperties loadSpecifiedProperties(final String propertiesFile, final Map others) { + private NiFiProperties loadSpecifiedProperties(final Map others) { String filePath; try { - filePath = NarUnpackerTest.class.getResource(propertiesFile).toURI().getPath(); + filePath = Objects.requireNonNull(NarUnpackerTest.class.getResource(PROPERTIES_PATH)).toURI().getPath(); } catch (URISyntaxException ex) { throw new RuntimeException("Cannot load properties file due to " + ex.getLocalizedMessage(), ex); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java index 52b0dd58a08b1..7f2a49014c6f4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java @@ -40,6 +40,7 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; @@ -55,13 +56,11 @@ import java.util.jar.JarOutputStream; import java.util.jar.Manifest; -import static java.lang.String.format; - -/** - * - */ public final class NarUnpacker { public static final String BUNDLED_DEPENDENCIES_DIRECTORY = "NAR-INF/bundled-dependencies"; + + private static final String BUNDLED_DEPENDENCIES_PREFIX = "META-INF/bundled-dependencies"; + private static final Logger logger = LoggerFactory.getLogger(NarUnpacker.class); private static final String HASH_FILENAME = "nar-digest"; private static final FileFilter NAR_FILTER = pathname -> { @@ -93,8 +92,6 @@ public static ExtensionMapping unpackNars(final Bundle systemBundle, final File final boolean requireFrameworkNar, final String frameworkNarId, final boolean requireJettyNar, final boolean verifyHash, final NarUnpackMode unpackMode, final Predicate narFilter) { - final Map unpackedNars = new HashMap<>(); - try { File unpackedJetty = null; File unpackedFramework = null; @@ -128,13 +125,13 @@ public static ExtensionMapping unpackNars(final Bundle systemBundle, final File if (!narFiles.isEmpty()) { final long startTime = System.nanoTime(); - logger.info("Expanding " + narFiles.size() + " NAR files with all processors..."); + logger.info("Expanding {} NAR files started", narFiles.size()); for (File narFile : narFiles) { if (!narFile.canRead()) { throw new IllegalStateException("Unable to read NAR file: " + narFile.getAbsolutePath()); } - logger.debug("Expanding NAR file: " + narFile.getAbsolutePath()); + logger.debug("Expanding NAR file: {}", narFile.getAbsolutePath()); // get the manifest for this nar try (final JarFile nar = new JarFile(narFile)) { @@ -211,11 +208,11 @@ public static ExtensionMapping unpackNars(final Bundle systemBundle, final File } final long duration = System.nanoTime() - startTime; - logger.info("NAR loading process took " + duration + 
" nanoseconds " - + "(" + (int) TimeUnit.SECONDS.convert(duration, TimeUnit.NANOSECONDS) + " seconds)."); + final double durationSeconds = TimeUnit.NANOSECONDS.toMillis(duration) / 1000.0; + logger.info("Expanded {} NAR files in {} seconds ({} ns)", narFiles.size(), durationSeconds, duration); } - unpackedNars.putAll(createUnpackedNarBundleCoordinateMap(extensionsWorkingDir)); + final Map unpackedNars = new HashMap<>(createUnpackedNarBundleCoordinateMap(extensionsWorkingDir)); final ExtensionMapping extensionMapping = new ExtensionMapping(); mapExtensions(unpackedNars, docsWorkingDir, extensionMapping); @@ -224,7 +221,7 @@ public static ExtensionMapping unpackNars(final Bundle systemBundle, final File return extensionMapping; } catch (IOException e) { - logger.warn("Unable to load NAR library bundles due to {} Will proceed without loading any further Nar bundles", e.toString(), e); + logger.warn("Unable to load NAR bundles. Proceeding without loading any further NAR bundles", e); } return null; @@ -236,8 +233,12 @@ public static ExtensionMapping unpackNars(final Bundle systemBundle, final File * @return map of coordinates for bundles */ private static Map createUnpackedNarBundleCoordinateMap(File extensionsWorkingDir) { - Map result = new HashMap<>(); File[] unpackedDirs = extensionsWorkingDir.listFiles(file -> file.isDirectory() && file.getName().endsWith("nar-unpacked")); + if (unpackedDirs == null) { + return Collections.emptyMap(); + } + + final Map result = new HashMap<>(); for (File unpackedDir : unpackedDirs) { Path mf = Paths.get(unpackedDir.getAbsolutePath(), "META-INF", "MANIFEST.MF"); try(InputStream is = Files.newInputStream(mf)) { @@ -245,19 +246,18 @@ private static Map createUnpackedNarBundleCoordinateMap( BundleCoordinate bundleCoordinate = createBundleCoordinate(manifest); result.put(unpackedDir, bundleCoordinate); } catch (IOException e) { - logger.error(format("Unable to parse NAR information from unpacked nar directory [%s].", unpackedDir.getAbsoluteFile()), e); + logger.error("Unable to parse NAR information from unpacked directory [{}]", unpackedDir.getAbsoluteFile(), e); } } return result; } - private static BundleCoordinate createBundleCoordinate(Manifest manifest) { + private static BundleCoordinate createBundleCoordinate(final Manifest manifest) { Attributes mainAttributes = manifest.getMainAttributes(); String groupId = mainAttributes.getValue(NarManifestEntry.NAR_GROUP.getManifestName()); String narId = mainAttributes.getValue(NarManifestEntry.NAR_ID.getManifestName()); String version = mainAttributes.getValue(NarManifestEntry.NAR_VERSION.getManifestName()); - BundleCoordinate bundleCoordinate = new BundleCoordinate(groupId, narId, version); - return bundleCoordinate; + return new BundleCoordinate(groupId, narId, version); } private static void mapExtensions(final Map unpackedNars, final File docsDirectory, final ExtensionMapping mapping) throws IOException { @@ -319,7 +319,7 @@ public static File unpackNar(final File nar, final File baseWorkingDirectory, fi } else { final byte[] hashFileContents = Files.readAllBytes(workingHashFile.toPath()); if (!Arrays.equals(hashFileContents, narDigest)) { - logger.info("Contents of nar {} have changed. 
Reloading.", new Object[] { nar.getAbsolutePath() }); + logger.info("Reloading changed NAR [{}]", nar.getAbsolutePath()); FileUtils.deleteFile(narWorkingDirectory, true); unpackIndividualJars(nar, narWorkingDirectory, narDigest, unpackMode); } @@ -353,16 +353,11 @@ private static void unpackIndividualJars(final File nar, final File workingDirec final Enumeration jarEntries = jarFile.entries(); while (jarEntries.hasMoreElements()) { final JarEntry jarEntry = jarEntries.nextElement(); - String name = jarEntry.getName(); - if (name.contains("META-INF/bundled-dependencies")){ - name = name.replace("META-INF/bundled-dependencies", BUNDLED_DEPENDENCIES_DIRECTORY); - } - - final File f = new File(workingDirectory, name); + final File jarEntryFile = getMappedJarEntryFile(workingDirectory, jarEntry); if (jarEntry.isDirectory()) { - FileUtils.ensureDirectoryExistAndCanReadAndWrite(f); + FileUtils.ensureDirectoryExistAndCanReadAndWrite(jarEntryFile); } else { - makeFile(jarFile.getInputStream(jarEntry), f); + makeFile(jarFile.getInputStream(jarEntry), jarEntryFile); } } } @@ -398,31 +393,27 @@ private static void unpackToUberJar(final File nar, final File workingDirectory, final Enumeration jarEntries = jarFile.entries(); while (jarEntries.hasMoreElements()) { final JarEntry jarEntry = jarEntries.nextElement(); - String name = jarEntry.getName(); - if (name.contains("META-INF/bundled-dependencies")){ - name = name.replace("META-INF/bundled-dependencies", BUNDLED_DEPENDENCIES_DIRECTORY); - } - - logger.debug("Unpacking NAR entry {}", name); + final File jarEntryFile = getMappedJarEntryFile(workingDirectory, jarEntry); + logger.debug("Unpacking NAR entry {}", jarEntryFile); // If we've not yet created this entry, create it now. If we've already created the entry, ignore it. - if (!entriesCreated.add(name)) { + if (!entriesCreated.add(jarEntry.getName())) { continue; } // Explode anything from META-INF and any WAR files into the nar's output directory instead of copying it to the uber jar. // The WAR files are important so that NiFi can load its UI. The META-INF/ directory is important in order to ensure that our // NarClassLoader has all of the information that it needs. - if (name.contains("META-INF/") || (name.contains("NAR-INF") && name.endsWith(".war"))) { + final String jarEntryFilePath = jarEntryFile.getAbsolutePath(); + if (jarEntryFilePath.contains("META-INF") || (jarEntryFilePath.contains("NAR-INF") && jarEntryFilePath.endsWith(".war"))) { if (jarEntry.isDirectory()) { continue; } - final File outFile = new File(workingDirectory, name); - Files.createDirectories(outFile.getParentFile().toPath()); + Files.createDirectories(jarEntryFile.getParentFile().toPath()); try (final InputStream entryIn = jarFile.getInputStream(jarEntry); - final OutputStream manifestOut = new FileOutputStream(outFile)) { + final OutputStream manifestOut = new FileOutputStream(jarEntryFile)) { copy(entryIn, manifestOut); } @@ -431,9 +422,9 @@ private static void unpackToUberJar(final File nar, final File workingDirectory, if (jarEntry.isDirectory()) { uberJarOut.putNextEntry(new JarEntry(jarEntry.getName())); - } else if (name.endsWith(".jar")) { + } else if (jarEntryFilePath.endsWith(".jar")) { // Unpack each .jar file into the uber jar, taking care to deal with META-INF/ files, etc. carefully. 
- logger.debug("Unpacking Jar {}", name); + logger.debug("Unpacking JAR {}", jarEntryFile); try (final InputStream entryIn = jarFile.getInputStream(jarEntry); final InputStream in = new BufferedInputStream(entryIn)) { @@ -475,6 +466,7 @@ private static void copyJarContents(final InputStream in, final JarOutputStream JarEntry jarEntry; while ((jarEntry = jarInputStream.getNextJarEntry()) != null) { final String entryName = jarEntry.getName(); + final File outFile = getJarEntryFile(workingDirectory, entryName); // The META-INF/ directory can contain several different types of files. For example, it contains: // MANIFEST.MF @@ -491,8 +483,6 @@ private static void copyJarContents(final InputStream in, final JarOutputStream if ((entryName.contains("META-INF/") && !entryName.contains("META-INF/MANIFEST.MF") ) && !jarEntry.isDirectory()) { logger.debug("Found META-INF/services file {}", entryName); - final File outFile = new File(workingDirectory, entryName); - // Because we're combining multiple jar files into one, we can run into situations where there may be conflicting filenames // such as 1 jar has a file named META-INF/license and another jar file has a META-INF/license/my-license.txt. We can generally // just ignore these, though, as they are not necessary in this temporarily created jar file. So we log it at a debug level and @@ -569,10 +559,12 @@ private static void unpackDocumentation(final BundleCoordinate coordinate, final // go through each entry in this jar for (final Enumeration jarEnumeration = jarFile.entries(); jarEnumeration.hasMoreElements();) { final JarEntry jarEntry = jarEnumeration.nextElement(); + final File jarEntryFile = getJarEntryFile(docsDirectory, jarEntry.getName()); + final String jarEntryName = jarEntryFile.getName(); // if this entry is documentation for this component - if (jarEntry.getName().startsWith(entryName)) { - final String name = StringUtils.substringAfter(jarEntry.getName(), "docs/"); + if (jarEntryName.startsWith(entryName)) { + final String name = StringUtils.substringAfter(jarEntryName, "docs/"); final String path = coordinate.getGroup() + "/" + coordinate.getId() + "/" + coordinate.getVersion() + "/" + name; // if this is a directory create it @@ -581,7 +573,7 @@ private static void unpackDocumentation(final BundleCoordinate coordinate, final // ensure the documentation directory can be created if (!componentDocsDirectory.exists() && !componentDocsDirectory.mkdirs()) { - logger.warn("Unable to create docs directory " + componentDocsDirectory.getAbsolutePath()); + logger.warn("Unable to create docs directory {}", componentDocsDirectory.getAbsolutePath()); break; } } else { @@ -596,9 +588,6 @@ private static void unpackDocumentation(final BundleCoordinate coordinate, final } } - /* - * Returns true if this jar file contains a NiFi component - */ private static ExtensionMapping determineDocumentedNiFiComponents(final BundleCoordinate coordinate, final File jar) throws IOException { final ExtensionMapping mapping = new ExtensionMapping(); @@ -670,6 +659,20 @@ private static void makeFile(final InputStream inputStream, final File file) thr } } + private static File getMappedJarEntryFile(final File workingDirectory, final JarEntry jarEntry) { + final String jarEntryName = jarEntry.getName().replace(BUNDLED_DEPENDENCIES_PREFIX, BUNDLED_DEPENDENCIES_DIRECTORY); + return getJarEntryFile(workingDirectory, jarEntryName); + } + + private static File getJarEntryFile(final File workingDirectory, final String jarEntryName) { + final Path workingDirectoryPath = 
workingDirectory.toPath().normalize(); + final Path jarEntryPath = workingDirectoryPath.resolve(jarEntryName).normalize(); + if (jarEntryPath.startsWith(workingDirectoryPath)) { + return jarEntryPath.toFile(); + } + throw new IllegalArgumentException(String.format("NAR Entry path not valid [%s]", jarEntryName)); + } + private NarUnpacker() { } } From ca3441b2a315bfe10127c6df7f13ccc02e8dd513 Mon Sep 17 00:00:00 2001 From: Chris Sampson Date: Tue, 31 Oct 2023 21:08:20 +0000 Subject: [PATCH 6/6] NIFI-12192 upload test-reports after test-based GitHub Actions complete --- .github/workflows/ci-workflow.yml | 18 +++++----- .github/workflows/integration-tests.yml | 9 ++--- .github/workflows/system-tests.yml | 8 ++--- .github/workflows/test-report.yml | 44 +++++++++++++++++++++++++ 4 files changed, 62 insertions(+), 17 deletions(-) create mode 100644 .github/workflows/test-report.yml diff --git a/.github/workflows/ci-workflow.yml b/.github/workflows/ci-workflow.yml index 330d0eef145b1..81d897e4d524c 100644 --- a/.github/workflows/ci-workflow.yml +++ b/.github/workflows/ci-workflow.yml @@ -175,15 +175,15 @@ jobs: ${{ env.MAVEN_VERIFY_COMMAND }} ${{ env.MAVEN_BUILD_PROFILES }} ${{ env.MAVEN_PROJECTS }} - - name: Upload Test Reports + - name: Upload Test Results + if: success() || failure() || cancelled() uses: actions/upload-artifact@v3 with: - name: surefire-reports-ubuntu-21 + name: test-results-ubuntu-en path: | ./**/target/surefire-reports/*.txt ./**/target/surefire-reports/*.xml retention-days: 3 - if: failure() - name: Post Disk Usage run: df if: ${{ always() }} @@ -239,15 +239,15 @@ jobs: ${{ env.MAVEN_VERIFY_COMMAND }} ${{ env.MAVEN_BUILD_PROFILES }} ${{ env.MAVEN_PROJECTS }} - - name: Upload Test Reports + - name: Upload Test Results + if: success() || failure() || cancelled() uses: actions/upload-artifact@v3 with: - name: surefire-reports-macos-jp + name: test-results-macos-jp path: | ./**/target/surefire-reports/*.txt ./**/target/surefire-reports/*.xml retention-days: 3 - if: failure() - name: Post Disk Usage run: df if: ${{ always() }} @@ -305,15 +305,15 @@ jobs: ${{ env.MAVEN_VERIFY_COMMAND }} ${{ env.MAVEN_BUILD_PROFILES }} ${{ env.MAVEN_PROJECTS }} - - name: Upload Test Reports + - name: Upload Test Results + if: always() uses: actions/upload-artifact@v3 with: - name: surefire-reports-windows-fr + name: test-results-windows-fr path: | ./**/target/surefire-reports/*.txt ./**/target/surefire-reports/*.xml retention-days: 3 - if: failure() - name: Post Disk Usage run: df if: ${{ always() }} diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index e0d5eb04c16d4..ef52b5afb9211 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -121,11 +121,12 @@ jobs: ${{ env.MAVEN_COMMAND }} ${{ env.MAVEN_BUILD_ARGUMENTS }} ${{ env.MAVEN_BUILD_EXCLUDE_PROJECTS }} - - name: Upload Troubleshooting Logs - if: failure() || cancelled() + - name: Upload Test Results + if: always() uses: actions/upload-artifact@v3 with: - name: ubuntu-21-failsafe-logs + name: test-results-ubuntu-21 path: | - **/target/failsafe-reports/**/*.txt + ./**/target/failsafe-reports/**/*.txt + ./**/target/failsafe-reports/**/*.xml retention-days: 7 diff --git a/.github/workflows/system-tests.yml b/.github/workflows/system-tests.yml index e9c07148a9ca5..0285eeed98e5d 100644 --- a/.github/workflows/system-tests.yml +++ b/.github/workflows/system-tests.yml @@ -123,13 +123,13 @@ jobs: ${{ env.MAVEN_COMMAND }} ${{ env.MAVEN_RUN_ARGUMENTS }} ${{ 
env.MAVEN_PROJECTS }} - - name: Upload Troubleshooting Logs - if: failure() || cancelled() + - name: Upload Test Results + if: always() uses: actions/upload-artifact@v3 with: - name: ${{ matrix.os }}-${{ matrix.version }}-troubleshooting-logs + name: test-results-${{ matrix.os }}-${{ matrix.version }} path: | nifi-system-tests/nifi-system-test-suite/target/failsafe-reports/**/*.txt nifi-system-tests/nifi-system-test-suite/target/surefire-reports/**/*.txt nifi-system-tests/nifi-system-test-suite/target/troubleshooting/**/* - retention-days: 7 + retention-days: 7 \ No newline at end of file diff --git a/.github/workflows/test-report.yml b/.github/workflows/test-report.yml new file mode 100644 index 0000000000000..67c346c6666e8 --- /dev/null +++ b/.github/workflows/test-report.yml @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +name: test-report + +on: + workflow_run: + workflows: + - ci-workflow + - integration-tests + - system-tests + types: + - completed + +permissions: + contents: read + actions: read + checks: write + +jobs: + report: + timeout-minutes: 30 + runs-on: ubuntu-latest + name: Test Report + steps: + - uses: dorny/test-reporter@v1 + with: + artifact: /test-results-(.*)/ + name: Java Tests + path: '**/*.xml' + reporter: java-junit + max-annotations: '50'
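
For reference, the path check added to NarUnpacker above (getJarEntryFile) follows the usual guard against zip-slip style archive entries: resolve the entry name against the unpack root, normalize the result, and reject anything that escapes that root. The following is a minimal standalone sketch of that idea, not part of the patch; the class name, the example working directory, and the sample entry names are illustrative assumptions only.

import java.io.File;
import java.nio.file.Path;

public class JarEntryPathCheckSketch {

    // Resolve a jar entry name against the unpack root and reject paths that
    // normalize to a location outside of that root (zip-slip protection).
    static File toSafeFile(final File workingDirectory, final String jarEntryName) {
        final Path root = workingDirectory.toPath().normalize();
        final Path resolved = root.resolve(jarEntryName).normalize();
        if (!resolved.startsWith(root)) {
            throw new IllegalArgumentException(String.format("NAR Entry path not valid [%s]", jarEntryName));
        }
        return resolved.toFile();
    }

    public static void main(final String[] args) {
        final File workDir = new File("work/nar/example-nar-unpacked");
        // A normal entry stays inside the unpack root and is accepted
        System.out.println(toSafeFile(workDir, "META-INF/MANIFEST.MF"));
        // A traversal entry normalizes to a path outside the root and is rejected
        try {
            toSafeFile(workDir, "../../outside.txt");
        } catch (final IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}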