diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/pom.xml
index 8933b17b70f19..6d3e7576cd36d 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/pom.xml
@@ -37,16 +37,36 @@
org.apache.nifi
nifi-schema-registry-service-api
2.0.0-SNAPSHOT
+ test
+
+
+ org.apache.nifi
+ nifi-ssl-context-service-api
+ 2.0.0-SNAPSHOT
+ test
+
+
+ org.apache.nifi
+ nifi-ssl-context-service
+ test
org.apache.nifi
nifi-utils
2.0.0-SNAPSHOT
+ test
+
+
+ org.apache.nifi
+ nifi-property-utils
+ 2.0.0-SNAPSHOT
+ test
org.apache.nifi
nifi-record
2.0.0-SNAPSHOT
+ test
org.apache.nifi
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaSSLIT.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaSSLIT.java
new file mode 100644
index 0000000000000..d96f37b3eced4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaSSLIT.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.nifi.kafka.service.Kafka3ConnectionService;
+import org.apache.nifi.kafka.service.api.KafkaConnectionService;
+import org.apache.nifi.kafka.shared.property.SecurityProtocol;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.security.util.KeystoreType;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.StandardRestrictedSSLContextService;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Quick integration test for testing against Kafka cluster defined via
+ * confluentinc/kafka-images.
+ */
+public class PublishKafkaSSLIT {
+ private static final String TEST_RECORD_VALUE = "value-" + System.currentTimeMillis();
+
+ private static final String BOOTSTRAP_SERVERS = "";
+ private static final String SSL_CONTEXT_SERVICE_PATH = "";
+ private static final String KEYSTORE_PATH = SSL_CONTEXT_SERVICE_PATH + "/kafka.producer.keystore.jks";
+ private static final String TRUSTSTORE_PATH = SSL_CONTEXT_SERVICE_PATH + "/kafka.producer.truststore.jks";
+ private static final String KEYSTORE_PASSWORD = "";
+ private static final String TRUSTSTORE_PASSWORD = "";
+
+ @Test
+ @Disabled("use this to test 'context.yield()' on misconfiguration of KafkaConnectionService; requires running Kafka cluster")
+ public void testKafkaSSLContext() throws InitializationException {
+ final TestRunner runner = TestRunners.newTestRunner(PublishKafka.class);
+ runner.setValidateExpressionUsage(false);
+ runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner));
+ runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
+
+ runner.enqueue(TEST_RECORD_VALUE);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
+ }
+
+ private String addKafkaConnectionService(final TestRunner runner) throws InitializationException {
+ final Map connectionServiceProps = new HashMap<>();
+ connectionServiceProps.put(Kafka3ConnectionService.BOOTSTRAP_SERVERS.getName(), BOOTSTRAP_SERVERS);
+ connectionServiceProps.put(Kafka3ConnectionService.SSL_CONTEXT_SERVICE.getName(), addSSLContextService(runner));
+ connectionServiceProps.put(Kafka3ConnectionService.SECURITY_PROTOCOL.getName(), SecurityProtocol.SSL.name());
+
+ final String identifier = Kafka3ConnectionService.class.getSimpleName();
+ final KafkaConnectionService connectionService = new Kafka3ConnectionService();
+ runner.addControllerService(identifier, connectionService, connectionServiceProps);
+
+ runner.enableControllerService(connectionService);
+ return identifier;
+ }
+
+ private String addSSLContextService(final TestRunner runner) throws InitializationException {
+ final String identifier = SSLContextService.class.getSimpleName();
+ final SSLContextService service = new StandardRestrictedSSLContextService();
+ runner.addControllerService(identifier, service);
+
+ runner.setProperty(service, StandardRestrictedSSLContextService.KEYSTORE, KEYSTORE_PATH);
+ runner.setProperty(service, StandardRestrictedSSLContextService.KEYSTORE_PASSWORD, KEYSTORE_PASSWORD);
+ runner.setProperty(service, StandardRestrictedSSLContextService.KEYSTORE_TYPE, KeystoreType.JKS.name());
+ runner.setProperty(service, StandardRestrictedSSLContextService.TRUSTSTORE, TRUSTSTORE_PATH);
+ runner.setProperty(service, StandardRestrictedSSLContextService.TRUSTSTORE_PASSWORD, TRUSTSTORE_PASSWORD);
+ runner.setProperty(service, StandardRestrictedSSLContextService.TRUSTSTORE_TYPE, KeystoreType.JKS.name());
+
+ runner.enableControllerService(service);
+ return identifier;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java
index a803df7b3647a..29e0b69889cec 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java
@@ -53,13 +53,18 @@ public Kafka3ProducerService(final Properties properties,
this.producer = new KafkaProducer<>(properties, serializer, serializer);
this.serviceConfiguration = serviceConfiguration;
this.producerConfiguration = producerConfiguration;
- if (producerConfiguration.getUseTransactions()) {
- producer.initTransactions();
- }
+ }
+
+ @Override
+ public void close() {
+ producer.close();
}
@Override
public RecordSummary send(final Iterator kafkaRecords, final PublishContext publishContext) {
+ if (producerConfiguration.getUseTransactions()) {
+ producer.initTransactions();
+ }
return producerConfiguration.getUseTransactions()
? sendTransaction(kafkaRecords, publishContext)
: sendNoTransaction(kafkaRecords, publishContext);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
index fb759b1d32471..06ff0bca95c5d 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java
@@ -301,8 +301,17 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
final String transactionalIdPrefix = context.getProperty(TRANSACTIONAL_ID_PREFIX).evaluateAttributeExpressions().getValue();
final ProducerConfiguration producerConfiguration = new ProducerConfiguration(useTransactions, transactionalIdPrefix);
- final KafkaProducerService producerService = connectionService.getProducerService(producerConfiguration);
+ try (final KafkaProducerService producerService = connectionService.getProducerService(producerConfiguration)) {
+ publishFlowFile(context, session, flowFile, producerService);
+ } catch (final Throwable e) {
+ getLogger().error(e.getMessage(), e);
+ context.yield();
+ }
+ }
+
+ private void publishFlowFile(final ProcessContext context, final ProcessSession session,
+ final FlowFile flowFile, final KafkaProducerService producerService) {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordSetWriterFactory keyWriterFactory = context.getProperty(RECORD_KEY_WRITER).asControllerService(RecordSetWriterFactory.class);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/KafkaProducerService.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/KafkaProducerService.java
index 17cc6cfdad5bf..a4ae52a728be9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/KafkaProducerService.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/KafkaProducerService.java
@@ -19,12 +19,15 @@
import org.apache.nifi.kafka.service.api.common.PartitionState;
import org.apache.nifi.kafka.service.api.record.KafkaRecord;
+import java.io.Closeable;
import java.util.Iterator;
import java.util.List;
-public interface KafkaProducerService {
+public interface KafkaProducerService extends Closeable {
RecordSummary send(Iterator records, PublishContext publishContext);
+ void close();
+
List getPartitionStates(String topic);
}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/ProducerRecordMetadata.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/ProducerRecordMetadata.java
index 4f7c28be8926e..3b53141a9b1e1 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/ProducerRecordMetadata.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/ProducerRecordMetadata.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.nifi.kafka.service.api.producer;
public class ProducerRecordMetadata {