From a9ca004166a542ca274ea6f2d33c0afc63523b42 Mon Sep 17 00:00:00 2001 From: Paul Grey Date: Wed, 22 Nov 2023 19:23:02 -0500 Subject: [PATCH] NIFI-11259 - KafkaProducerService should be closeable. On exceptions in service client, finally should close the embedded Kafka Producer<>. --- .../nifi-kafka-3-integration/pom.xml | 20 +++++ .../kafka/processors/PublishKafkaSSLIT.java | 90 +++++++++++++++++++ .../producer/Kafka3ProducerService.java | 11 ++- .../nifi/kafka/processors/PublishKafka.java | 11 ++- .../api/producer/KafkaProducerService.java | 5 +- .../api/producer/ProducerRecordMetadata.java | 16 ++++ 6 files changed, 148 insertions(+), 5 deletions(-) create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaSSLIT.java diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/pom.xml index 8933b17b70f19..6d3e7576cd36d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/pom.xml +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/pom.xml @@ -37,16 +37,36 @@ org.apache.nifi nifi-schema-registry-service-api 2.0.0-SNAPSHOT + test + + + org.apache.nifi + nifi-ssl-context-service-api + 2.0.0-SNAPSHOT + test + + + org.apache.nifi + nifi-ssl-context-service + test org.apache.nifi nifi-utils 2.0.0-SNAPSHOT + test + + + org.apache.nifi + nifi-property-utils + 2.0.0-SNAPSHOT + test org.apache.nifi nifi-record 2.0.0-SNAPSHOT + test org.apache.nifi diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaSSLIT.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaSSLIT.java new file mode 100644 index 0000000000000..d96f37b3eced4 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaSSLIT.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.processors; + +import org.apache.nifi.kafka.service.Kafka3ConnectionService; +import org.apache.nifi.kafka.service.api.KafkaConnectionService; +import org.apache.nifi.kafka.shared.property.SecurityProtocol; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.security.util.KeystoreType; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.StandardRestrictedSSLContextService; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Quick integration test for testing against Kafka cluster defined via + * confluentinc/kafka-images. + */ +public class PublishKafkaSSLIT { + private static final String TEST_RECORD_VALUE = "value-" + System.currentTimeMillis(); + + private static final String BOOTSTRAP_SERVERS = ""; + private static final String SSL_CONTEXT_SERVICE_PATH = ""; + private static final String KEYSTORE_PATH = SSL_CONTEXT_SERVICE_PATH + "/kafka.producer.keystore.jks"; + private static final String TRUSTSTORE_PATH = SSL_CONTEXT_SERVICE_PATH + "/kafka.producer.truststore.jks"; + private static final String KEYSTORE_PASSWORD = ""; + private static final String TRUSTSTORE_PASSWORD = ""; + + @Test + @Disabled("use this to test 'context.yield()' on misconfiguration of KafkaConnectionService; requires running Kafka cluster") + public void testKafkaSSLContext() throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(PublishKafka.class); + runner.setValidateExpressionUsage(false); + runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner)); + runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName()); + + runner.enqueue(TEST_RECORD_VALUE); + runner.run(); + runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1); + } + + private String addKafkaConnectionService(final TestRunner runner) throws InitializationException { + final Map connectionServiceProps = new HashMap<>(); + connectionServiceProps.put(Kafka3ConnectionService.BOOTSTRAP_SERVERS.getName(), BOOTSTRAP_SERVERS); + connectionServiceProps.put(Kafka3ConnectionService.SSL_CONTEXT_SERVICE.getName(), addSSLContextService(runner)); + connectionServiceProps.put(Kafka3ConnectionService.SECURITY_PROTOCOL.getName(), SecurityProtocol.SSL.name()); + + final String identifier = Kafka3ConnectionService.class.getSimpleName(); + final KafkaConnectionService connectionService = new Kafka3ConnectionService(); + runner.addControllerService(identifier, connectionService, connectionServiceProps); + + runner.enableControllerService(connectionService); + return identifier; + } + + private String addSSLContextService(final TestRunner runner) throws InitializationException { + final String identifier = SSLContextService.class.getSimpleName(); + final SSLContextService service = new StandardRestrictedSSLContextService(); + runner.addControllerService(identifier, service); + + runner.setProperty(service, StandardRestrictedSSLContextService.KEYSTORE, KEYSTORE_PATH); + runner.setProperty(service, StandardRestrictedSSLContextService.KEYSTORE_PASSWORD, KEYSTORE_PASSWORD); + runner.setProperty(service, StandardRestrictedSSLContextService.KEYSTORE_TYPE, KeystoreType.JKS.name()); + runner.setProperty(service, StandardRestrictedSSLContextService.TRUSTSTORE, TRUSTSTORE_PATH); + runner.setProperty(service, StandardRestrictedSSLContextService.TRUSTSTORE_PASSWORD, TRUSTSTORE_PASSWORD); + runner.setProperty(service, StandardRestrictedSSLContextService.TRUSTSTORE_TYPE, KeystoreType.JKS.name()); + + runner.enableControllerService(service); + return identifier; + } +} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java index a803df7b3647a..29e0b69889cec 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java @@ -53,13 +53,18 @@ public Kafka3ProducerService(final Properties properties, this.producer = new KafkaProducer<>(properties, serializer, serializer); this.serviceConfiguration = serviceConfiguration; this.producerConfiguration = producerConfiguration; - if (producerConfiguration.getUseTransactions()) { - producer.initTransactions(); - } + } + + @Override + public void close() { + producer.close(); } @Override public RecordSummary send(final Iterator kafkaRecords, final PublishContext publishContext) { + if (producerConfiguration.getUseTransactions()) { + producer.initTransactions(); + } return producerConfiguration.getUseTransactions() ? sendTransaction(kafkaRecords, publishContext) : sendNoTransaction(kafkaRecords, publishContext); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java index fb759b1d32471..06ff0bca95c5d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java @@ -301,8 +301,17 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean(); final String transactionalIdPrefix = context.getProperty(TRANSACTIONAL_ID_PREFIX).evaluateAttributeExpressions().getValue(); final ProducerConfiguration producerConfiguration = new ProducerConfiguration(useTransactions, transactionalIdPrefix); - final KafkaProducerService producerService = connectionService.getProducerService(producerConfiguration); + try (final KafkaProducerService producerService = connectionService.getProducerService(producerConfiguration)) { + publishFlowFile(context, session, flowFile, producerService); + } catch (final Throwable e) { + getLogger().error(e.getMessage(), e); + context.yield(); + } + } + + private void publishFlowFile(final ProcessContext context, final ProcessSession session, + final FlowFile flowFile, final KafkaProducerService producerService) { final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); final RecordSetWriterFactory keyWriterFactory = context.getProperty(RECORD_KEY_WRITER).asControllerService(RecordSetWriterFactory.class); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/KafkaProducerService.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/KafkaProducerService.java index 17cc6cfdad5bf..a4ae52a728be9 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/KafkaProducerService.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/KafkaProducerService.java @@ -19,12 +19,15 @@ import org.apache.nifi.kafka.service.api.common.PartitionState; import org.apache.nifi.kafka.service.api.record.KafkaRecord; +import java.io.Closeable; import java.util.Iterator; import java.util.List; -public interface KafkaProducerService { +public interface KafkaProducerService extends Closeable { RecordSummary send(Iterator records, PublishContext publishContext); + void close(); + List getPartitionStates(String topic); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/ProducerRecordMetadata.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/ProducerRecordMetadata.java index 4f7c28be8926e..3b53141a9b1e1 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/ProducerRecordMetadata.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/producer/ProducerRecordMetadata.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.kafka.service.api.producer; public class ProducerRecordMetadata {