Commit 9c22e5a

NIFI-11259 Adjusted property ordering and naming

exceptionfactory committed Jun 24, 2024
1 parent 5a1a56d commit 9c22e5a
Showing 15 changed files with 385 additions and 343 deletions.
@@ -126,7 +126,7 @@ public void testTopicPattern() throws ExecutionException, InterruptedException {

runner.setProperty(ConsumeKafka.GROUP_ID, groupId);
runner.setProperty(ConsumeKafka.TOPICS, topicPattern);
- runner.setProperty(ConsumeKafka.TOPIC_TYPE, ConsumeKafka.TOPIC_PATTERN);
+ runner.setProperty(ConsumeKafka.TOPIC_FORMAT, ConsumeKafka.TOPIC_PATTERN);
runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue());
runner.run(1, false, true);

@@ -151,7 +151,7 @@ public void testTopicNames() throws ExecutionException, InterruptedException {

runner.setProperty(ConsumeKafka.GROUP_ID, groupId);
runner.setProperty(ConsumeKafka.TOPICS, topicNames);
- runner.setProperty(ConsumeKafka.TOPIC_TYPE, ConsumeKafka.TOPIC_NAME);
+ runner.setProperty(ConsumeKafka.TOPIC_FORMAT, ConsumeKafka.TOPIC_NAME);
runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue());
runner.run(1, false, true);

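Taken together, the two hunks above show the consume-side rename: TOPIC_TYPE becomes TOPIC_FORMAT, and it selects between the TOPIC_NAME and TOPIC_PATTERN interpretations of the TOPICS property. For reference, a minimal configuration sketch using the renamed descriptor; the TestRunner setup and connection service wiring are assumed from the surrounding tests, and the group id and topic regex are illustrative:

// Sketch only: subscribing by pattern through the renamed TOPIC_FORMAT descriptor
final TestRunner runner = TestRunners.newTestRunner(ConsumeKafka.class);
runner.setProperty(ConsumeKafka.GROUP_ID, "example-group");
runner.setProperty(ConsumeKafka.TOPICS, "events-.*");
runner.setProperty(ConsumeKafka.TOPIC_FORMAT, ConsumeKafka.TOPIC_PATTERN);
runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue());
runner.run(1, false, true);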
@@ -50,7 +50,7 @@ public void test1ProduceOneFlowFile() throws InitializationException {
runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner));
runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "xx");
- runner.setProperty(PublishKafka.ATTRIBUTE_NAME_REGEX, "a.*");
+ runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*");

final Map<String, String> attributes = new HashMap<>();
attributes.put("a1", "valueA1");
@@ -47,7 +47,7 @@ public void test_1_KafkaTestContainerProduceOne() throws InitializationException
runner.setValidateExpressionUsage(false);
runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner));
runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
- runner.setProperty(PublishKafka.ATTRIBUTE_NAME_REGEX, "a.*");
+ runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*");
//runner.setProperty(PublishKafka.USE_TRANSACTIONS, Boolean.FALSE.toString());

final Map<String, String> attributes = new HashMap<>();
@@ -59,8 +59,8 @@ public void test1ProduceOneFlowFile() throws InitializationException, IOExceptio
addRecordWriterService(runner);

runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
- runner.setProperty(PublishKafka.KEY, KEY_ATTRIBUTE_KEY);
- runner.setProperty(PublishKafka.ATTRIBUTE_NAME_REGEX, "a.*");
+ runner.setProperty(PublishKafka.KAFKA_KEY, KEY_ATTRIBUTE_KEY);
+ runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*");

final Map<String, String> attributes = new HashMap<>();
attributes.put(KEY_ATTRIBUTE_KEY, KEY_ATTRIBUTE_VALUE);
@@ -63,9 +63,9 @@ public void test1ProduceOneFlowFile() throws InitializationException, IOExceptio
addRecordKeyWriterService(runner);

runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
- runner.setProperty(PublishKafka.KEY, KEY_ATTRIBUTE_KEY);
+ runner.setProperty(PublishKafka.KAFKA_KEY, KEY_ATTRIBUTE_KEY);
runner.setProperty(PublishKafka.MESSAGE_KEY_FIELD, "address");
- runner.setProperty(PublishKafka.ATTRIBUTE_NAME_REGEX, "a.*");
+ runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*");
runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.name());

final Map<String, String> attributes = new HashMap<>();
@@ -63,9 +63,9 @@ public void test_1_KafkaTestContainerProduceOneFlowFile() throws InitializationE
addRecordKeyWriterService(runner);

runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
- runner.setProperty(PublishKafka.KEY, KEY_ATTRIBUTE_KEY);
+ runner.setProperty(PublishKafka.KAFKA_KEY, KEY_ATTRIBUTE_KEY);
runner.setProperty(PublishKafka.MESSAGE_KEY_FIELD, "address");
- runner.setProperty(PublishKafka.ATTRIBUTE_NAME_REGEX, "a.*");
+ runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*");
runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.name());

final Map<String, String> attributes = new HashMap<>();
@@ -62,7 +62,7 @@ public void test1ProduceOneFlowFile() throws InitializationException, IOExceptio
runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.name());
runner.setProperty(PublishKafka.MESSAGE_KEY_FIELD, "account");
- runner.setProperty(PublishKafka.ATTRIBUTE_NAME_REGEX, "attribute.*");
+ runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "attribute.*");

final Map<String, String> attributes = new HashMap<>();
attributes.put("attributeA", "valueA");
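The publish-side hunks above follow the same naming pass: KEY becomes KAFKA_KEY, and ATTRIBUTE_NAME_REGEX becomes ATTRIBUTE_HEADER_PATTERN, a name that reflects its role of selecting which FlowFile attributes are forwarded as Kafka record headers (the semantics appear unchanged from the old regex property). A combined sketch with the renamed descriptors; runner setup is assumed from the tests above and the values are illustrative:

// Sketch only: the renamed publish-side descriptors in one place
final TestRunner runner = TestRunners.newTestRunner(PublishKafka.class);
runner.setProperty(PublishKafka.TOPIC_NAME, "example-topic");
runner.setProperty(PublishKafka.KAFKA_KEY, "kafka.key");
// FlowFile attributes matching this pattern are written as Kafka headers
runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*");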
@@ -23,10 +23,9 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaFuture;
- import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
- import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.nifi.annotation.behavior.DynamicProperty;
+ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ConfigVerificationResult;
@@ -45,8 +44,8 @@
import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
import org.apache.nifi.kafka.service.producer.Kafka3ProducerService;
- import org.apache.nifi.kafka.shared.property.KafkaClientProperty;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
+ import org.apache.nifi.kafka.shared.property.IsolationLevel;
import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
@@ -56,9 +55,7 @@

import java.time.Duration;
import java.util.ArrayList;
- import java.util.Arrays;
import java.util.Collection;
- import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -79,12 +76,13 @@
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.",
expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT)
+ @CapabilityDescription("Provides and manages connections to Kafka Brokers for producer or consumer operations.")
public class Kafka3ConnectionService extends AbstractControllerService implements KafkaConnectionService, VerifiableControllerService {

public static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
.name("bootstrap.servers")
.displayName("Bootstrap Servers")
- .description("Comma-separated list of Kafka Bootstrap Servers in the format host:port. Mapped to Kafka bootstrap.servers")
+ .description("Comma-separated list of Kafka Bootstrap Servers in the format host:port. Corresponds to Kafka bootstrap.servers property")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
@@ -112,9 +110,9 @@ public class Kafka3ConnectionService extends AbstractControllerService implement

public static final PropertyDescriptor SASL_USERNAME = new PropertyDescriptor.Builder()
.name("sasl.username")
- .displayName("Username")
+ .displayName("SASL Username")
.description("Username provided with configured password when using PLAIN or SCRAM SASL Mechanisms")
- .required(false)
+ .required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.dependsOn(
@@ -127,9 +125,9 @@ public class Kafka3ConnectionService extends AbstractControllerService implement

public static final PropertyDescriptor SASL_PASSWORD = new PropertyDescriptor.Builder()
.name("sasl.password")
- .displayName("Password")
+ .displayName("SASL Password")
.description("Password provided with configured username when using PLAIN or SCRAM SASL Mechanisms")
- .required(false)
+ .required(true)
.sensitive(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
@@ -148,11 +146,27 @@ public class Kafka3ConnectionService extends AbstractControllerService implement
.identifiesControllerService(SSLContextService.class)
.build();

+ public static final PropertyDescriptor TRANSACTION_ISOLATION_LEVEL = new PropertyDescriptor.Builder()
+ .name("isolation.level")
+ .displayName("Transaction Isolation Level")
+ .description("""
+ Specifies how the service should handle transaction isolation levels when communicating with Kafka.
+ The uncommitted option means that messages will be received as soon as they are written to Kafka, even if the producer cancels the transaction.
+ The committed option configures the service to not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the
+ consumer must wait for the producer to finish its entire transaction instead of pulling as the messages become available.
+ Corresponds to Kafka isolation.level property.
+ """)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .allowableValues(IsolationLevel.class)
+ .defaultValue(IsolationLevel.READ_COMMITTED)
+ .required(true)
+ .build();
+
public static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder()
.name("max.poll.records")
.displayName("Max Poll Records")
- .description("Specifies the maximum number of records Kafka should return in a single poll.")
- .required(false)
+ .description("Maximum number of records Kafka should return in a single poll.")
+ .required(true)
.defaultValue("10000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
@@ -167,53 +181,45 @@ public class Kafka3ConnectionService extends AbstractControllerService implement
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();

- public static final PropertyDescriptor TRANSACTION_ISOLATION_LEVEL = new PropertyDescriptor.Builder()
- .name("Transaction Isolation Level")
- .description("Specifies whether or not NiFi should honor transactional guarantees when communicating with Kafka. If false, the Processor will use an \"isolation level\" of "
- + "read_uncommitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. If "
- + "this value is true, NiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer must wait "
- + "for the producer to finish its entire transaction instead of pulling as the messages become available.")
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .allowableValues("true", "false")
- .defaultValue("true")
- .required(true)
- .build();
-
public static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder()
.name("max.block.ms")
.displayName("Max Metadata Wait Time")
- .description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the "
- + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
+ .description("""
+ The amount of time the publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the
+ entire 'send' call. Corresponds to Kafka max.block.ms property
+ """)
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .defaultValue("5 sec")
+ .defaultValue("5 s")
.build();

public static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder()
.name("ack.wait.time")
.displayName("Acknowledgment Wait Time")
- .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. "
- + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.")
+ .description("""
+ After sending a message to Kafka, this indicates the amount of time that the service will wait for a response from Kafka.
+ If Kafka does not acknowledge the message within this time period, the service will throw an exception.
+ """)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
- .defaultValue("5 secs")
+ .defaultValue("5 s")
.build();

- private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+ private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
BOOTSTRAP_SERVERS,
SECURITY_PROTOCOL,
SASL_MECHANISM,
SASL_USERNAME,
SASL_PASSWORD,
SSL_CONTEXT_SERVICE,
- TRANSACTION_ISOLATION_LEVEL,
MAX_POLL_RECORDS,
CLIENT_TIMEOUT,
+ TRANSACTION_ISOLATION_LEVEL,
METADATA_WAIT_TIME,
ACK_WAIT_TIME
- ));
+ );

private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(2);

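The new Transaction Isolation Level property is a direct pass-through to the standard Kafka consumer setting isolation.level, whose semantics the description above explains. An illustrative raw-client sketch of the same setting outside NiFi; it assumes kafka-clients on the classpath with the usual org.apache.kafka.clients.consumer imports, and the broker address, group id, and topic are placeholders:

// Illustrative only: isolation.level on a plain Kafka consumer.
// With read_committed (the service default), records from aborted transactions
// are never returned; with read_uncommitted they are visible as soon as written.
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of("example-topic"));
}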
@@ -223,17 +229,15 @@ public class Kafka3ConnectionService extends AbstractControllerService implement

private Properties clientProperties;

- private Properties consumerProperties;
-
private ServiceConfiguration serviceConfiguration;

private Kafka3ConsumerService consumerService;

@OnEnabled
public void onEnabled(final ConfigurationContext configurationContext) {
clientProperties = getClientProperties(configurationContext);
- consumerProperties = getConsumerProperties(configurationContext, clientProperties);
serviceConfiguration = getServiceConfiguration(configurationContext);
+ final Properties consumerProperties = getConsumerProperties(configurationContext, clientProperties);
consumerService = new Kafka3ConsumerService(getLogger(), consumerProperties);
}

@@ -314,10 +318,8 @@ private Properties getConsumerProperties(final PropertyContext propertyContext,
final Properties properties = new Properties();
properties.putAll(defaultProperties);

- // since config for ConsumerPool is locked in at ControllerService.enable(),
- final boolean honorTransactions = propertyContext.getProperty(TRANSACTION_ISOLATION_LEVEL).asBoolean();
- final KafkaClientProperty isolationLevel = honorTransactions ? KafkaClientProperty.READ_COMMITTED : KafkaClientProperty.READ_UNCOMMITTED;
- properties.put(KafkaClientProperty.ISOLATION_LEVEL.getProperty(), isolationLevel.getProperty());
+ final IsolationLevel isolationLevel = propertyContext.getProperty(TRANSACTION_ISOLATION_LEVEL).asAllowableValue(IsolationLevel.class);
+ properties.put(TRANSACTION_ISOLATION_LEVEL.getName(), isolationLevel.getValue());

return properties;
}
@@ -332,21 +334,6 @@ private Properties getClientProperties(final PropertyContext propertyContext) {
final String configuredBootstrapServers = propertyContext.getProperty(BOOTSTRAP_SERVERS).getValue();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, configuredBootstrapServers);

- final String securityProtocol = propertyContext.getProperty(SECURITY_PROTOCOL).getValue();
- properties.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);
-
- final String saslMechanism = propertyContext.getProperty(SASL_MECHANISM).getValue();
- properties.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
-
- final String saslUsername = propertyContext.getProperty(SASL_USERNAME).getValue();
- final String saslPassword = propertyContext.getProperty(SASL_PASSWORD).getValue();
-
- if ((saslUsername != null) && (saslPassword != null)) {
- properties.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
- "%s required username=\"%s\" password=\"%s\";",
- PlainLoginModule.class.getName(), saslUsername, saslPassword));
- }
-
setSslProperties(properties, propertyContext);

final int defaultApiTimeoutMs = getDefaultApiTimeoutMs(propertyContext);
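The block removed above assembled the SASL JAAS configuration inline; after this change that concern presumably moves behind the shared property provider (StandardKafkaPropertyProvider remains imported above, though the new call site is not part of this hunk). For reference, the removed String.format produced a sasl.jaas.config value of this shape; the credentials here are illustrative:

// Shape of the sasl.jaas.config value the removed block assembled (illustrative credentials)
final String jaasConfig = String.format(
        "%s required username=\"%s\" password=\"%s\";",
        PlainLoginModule.class.getName(), "alice", "alice-secret");
// -> org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";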
@@ -31,8 +31,6 @@
import org.apache.nifi.kafka.service.producer.transaction.KafkaNonTransactionalProducerWrapper;
import org.apache.nifi.kafka.service.producer.transaction.KafkaProducerWrapper;
import org.apache.nifi.kafka.service.producer.transaction.KafkaTransactionalProducerWrapper;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;

import java.io.UncheckedIOException;
import java.util.ArrayList;
@@ -43,9 +41,8 @@
import java.util.stream.Collectors;

public class Kafka3ProducerService implements KafkaProducerService {
- private final Logger logger = LoggerFactory.getLogger(getClass());
-
private final Producer<byte[], byte[]> producer;

private final List<ProducerCallback> callbacks;

private final ServiceConfiguration serviceConfiguration;