Skip to content

Commit

Permalink
NIFI-11259 - enable Kafka SSL connections
Browse files Browse the repository at this point in the history
  • Loading branch information
greyp9 committed Nov 27, 2023
1 parent 7fdc26d commit 8c24255
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
Expand All @@ -46,6 +47,7 @@
import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;

import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -59,6 +61,13 @@

import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_LOCATION;
import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_PASSWORD;
import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_TYPE;
import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEY_PASSWORD;
import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_LOCATION;
import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_PASSWORD;
import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_TYPE;

public class Kafka3ConnectionService extends AbstractControllerService implements KafkaConnectionService, VerifiableControllerService {

Expand Down Expand Up @@ -122,6 +131,14 @@ public class Kafka3ConnectionService extends AbstractControllerService implement
)
.build();

public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl.context.service")
.displayName("SSL Context Service")
.description("Service supporting SSL communication with Kafka brokers")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();

public static final PropertyDescriptor CLIENT_TIMEOUT = new PropertyDescriptor.Builder()
.name("default.api.timeout.ms")
.displayName("Client Timeout")
Expand Down Expand Up @@ -160,6 +177,7 @@ public class Kafka3ConnectionService extends AbstractControllerService implement
SASL_MECHANISM,
SASL_USERNAME,
SASL_PASSWORD,
SSL_CONTEXT_SERVICE,
CLIENT_TIMEOUT,
METADATA_WAIT_TIME,
ACK_WAIT_TIME
Expand Down Expand Up @@ -267,6 +285,8 @@ private Properties getClientProperties(final PropertyContext propertyContext) {
PlainLoginModule.class.getName(), saslUsername, saslPassword));
}

setSslProperties(properties, propertyContext);

final int defaultApiTimeoutMs = getDefaultApiTimeoutMs(propertyContext);
properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, defaultApiTimeoutMs);

Expand All @@ -279,6 +299,29 @@ private Properties getClientProperties(final PropertyContext propertyContext) {
return properties;
}

private void setSslProperties(final Properties properties, final PropertyContext context) {
final PropertyValue sslContextServiceProperty = context.getProperty(SSL_CONTEXT_SERVICE);
if (sslContextServiceProperty.isSet()) {
final SSLContextService sslContextService = sslContextServiceProperty.asControllerService(SSLContextService.class);
if (sslContextService.isKeyStoreConfigured()) {
properties.put(SSL_KEYSTORE_LOCATION.getProperty(), sslContextService.getKeyStoreFile());
properties.put(SSL_KEYSTORE_TYPE.getProperty(), sslContextService.getKeyStoreType());

final String keyStorePassword = sslContextService.getKeyStorePassword();
properties.put(SSL_KEYSTORE_PASSWORD.getProperty(), keyStorePassword);

final String keyPassword = sslContextService.getKeyPassword();
final String configuredKeyPassword = keyPassword == null ? keyStorePassword : keyPassword;
properties.put(SSL_KEY_PASSWORD.getProperty(), configuredKeyPassword);
}
if (sslContextService.isTrustStoreConfigured()) {
properties.put(SSL_TRUSTSTORE_LOCATION.getProperty(), sslContextService.getTrustStoreFile());
properties.put(SSL_TRUSTSTORE_TYPE.getProperty(), sslContextService.getTrustStoreType());
properties.put(SSL_TRUSTSTORE_PASSWORD.getProperty(), sslContextService.getTrustStorePassword());
}
}
}

private ServiceConfiguration getServiceConfiguration(final PropertyContext propertyContext) {
final long maxAckWaitMillis = propertyContext.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
return new ServiceConfiguration(maxAckWaitMillis);
Expand Down

0 comments on commit 8c24255

Please sign in to comment.