Skip to content

Commit

Permalink
NIFI-11259 - KafkaProducerService should be closeable. On exceptions …
Browse files Browse the repository at this point in the history
…in service client, finally should close the embedded Kafka Producer<>.
  • Loading branch information
greyp9 committed Nov 27, 2023
1 parent 8c24255 commit a9ca004
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,36 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-property-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.kafka.processors;

import org.apache.nifi.kafka.service.Kafka3ConnectionService;
import org.apache.nifi.kafka.service.api.KafkaConnectionService;
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.KeystoreType;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardRestrictedSSLContextService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

/**
* Quick integration test for testing against Kafka cluster defined via
* <a href="https://github.com/confluentinc/kafka-images">confluentinc/kafka-images</a>.
*/
public class PublishKafkaSSLIT {
private static final String TEST_RECORD_VALUE = "value-" + System.currentTimeMillis();

private static final String BOOTSTRAP_SERVERS = "";
private static final String SSL_CONTEXT_SERVICE_PATH = "";
private static final String KEYSTORE_PATH = SSL_CONTEXT_SERVICE_PATH + "/kafka.producer.keystore.jks";
private static final String TRUSTSTORE_PATH = SSL_CONTEXT_SERVICE_PATH + "/kafka.producer.truststore.jks";
private static final String KEYSTORE_PASSWORD = "";
private static final String TRUSTSTORE_PASSWORD = "";

@Test
@Disabled("use this to test 'context.yield()' on misconfiguration of KafkaConnectionService; requires running Kafka cluster")
public void testKafkaSSLContext() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(PublishKafka.class);
runner.setValidateExpressionUsage(false);
runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner));
runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());

runner.enqueue(TEST_RECORD_VALUE);
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
}

private String addKafkaConnectionService(final TestRunner runner) throws InitializationException {
final Map<String, String> connectionServiceProps = new HashMap<>();
connectionServiceProps.put(Kafka3ConnectionService.BOOTSTRAP_SERVERS.getName(), BOOTSTRAP_SERVERS);
connectionServiceProps.put(Kafka3ConnectionService.SSL_CONTEXT_SERVICE.getName(), addSSLContextService(runner));
connectionServiceProps.put(Kafka3ConnectionService.SECURITY_PROTOCOL.getName(), SecurityProtocol.SSL.name());

final String identifier = Kafka3ConnectionService.class.getSimpleName();
final KafkaConnectionService connectionService = new Kafka3ConnectionService();
runner.addControllerService(identifier, connectionService, connectionServiceProps);

runner.enableControllerService(connectionService);
return identifier;
}

private String addSSLContextService(final TestRunner runner) throws InitializationException {
final String identifier = SSLContextService.class.getSimpleName();
final SSLContextService service = new StandardRestrictedSSLContextService();
runner.addControllerService(identifier, service);

runner.setProperty(service, StandardRestrictedSSLContextService.KEYSTORE, KEYSTORE_PATH);
runner.setProperty(service, StandardRestrictedSSLContextService.KEYSTORE_PASSWORD, KEYSTORE_PASSWORD);
runner.setProperty(service, StandardRestrictedSSLContextService.KEYSTORE_TYPE, KeystoreType.JKS.name());
runner.setProperty(service, StandardRestrictedSSLContextService.TRUSTSTORE, TRUSTSTORE_PATH);
runner.setProperty(service, StandardRestrictedSSLContextService.TRUSTSTORE_PASSWORD, TRUSTSTORE_PASSWORD);
runner.setProperty(service, StandardRestrictedSSLContextService.TRUSTSTORE_TYPE, KeystoreType.JKS.name());

runner.enableControllerService(service);
return identifier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,18 @@ public Kafka3ProducerService(final Properties properties,
this.producer = new KafkaProducer<>(properties, serializer, serializer);
this.serviceConfiguration = serviceConfiguration;
this.producerConfiguration = producerConfiguration;
if (producerConfiguration.getUseTransactions()) {
producer.initTransactions();
}
}

@Override
public void close() {
producer.close();
}

@Override
public RecordSummary send(final Iterator<KafkaRecord> kafkaRecords, final PublishContext publishContext) {
if (producerConfiguration.getUseTransactions()) {
producer.initTransactions();
}
return producerConfiguration.getUseTransactions()
? sendTransaction(kafkaRecords, publishContext)
: sendNoTransaction(kafkaRecords, publishContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,17 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
final String transactionalIdPrefix = context.getProperty(TRANSACTIONAL_ID_PREFIX).evaluateAttributeExpressions().getValue();
final ProducerConfiguration producerConfiguration = new ProducerConfiguration(useTransactions, transactionalIdPrefix);
final KafkaProducerService producerService = connectionService.getProducerService(producerConfiguration);

try (final KafkaProducerService producerService = connectionService.getProducerService(producerConfiguration)) {
publishFlowFile(context, session, flowFile, producerService);
} catch (final Throwable e) {
getLogger().error(e.getMessage(), e);
context.yield();
}
}

private void publishFlowFile(final ProcessContext context, final ProcessSession session,
final FlowFile flowFile, final KafkaProducerService producerService) {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordSetWriterFactory keyWriterFactory = context.getProperty(RECORD_KEY_WRITER).asControllerService(RecordSetWriterFactory.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
import org.apache.nifi.kafka.service.api.common.PartitionState;
import org.apache.nifi.kafka.service.api.record.KafkaRecord;

import java.io.Closeable;
import java.util.Iterator;
import java.util.List;

public interface KafkaProducerService {
public interface KafkaProducerService extends Closeable {

RecordSummary send(Iterator<KafkaRecord> records, PublishContext publishContext);

void close();

List<PartitionState> getPartitionStates(String topic);
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.kafka.service.api.producer;

public class ProducerRecordMetadata {
Expand Down

0 comments on commit a9ca004

Please sign in to comment.