diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java index 946fdde94cd7..2e49ef772d5b 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java @@ -126,7 +126,7 @@ public void testTopicPattern() throws ExecutionException, InterruptedException { runner.setProperty(ConsumeKafka.GROUP_ID, groupId); runner.setProperty(ConsumeKafka.TOPICS, topicPattern); - runner.setProperty(ConsumeKafka.TOPIC_TYPE, ConsumeKafka.TOPIC_PATTERN); + runner.setProperty(ConsumeKafka.TOPIC_FORMAT, ConsumeKafka.TOPIC_PATTERN); runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue()); runner.run(1, false, true); @@ -151,7 +151,7 @@ public void testTopicNames() throws ExecutionException, InterruptedException { runner.setProperty(ConsumeKafka.GROUP_ID, groupId); runner.setProperty(ConsumeKafka.TOPICS, topicNames); - runner.setProperty(ConsumeKafka.TOPIC_TYPE, ConsumeKafka.TOPIC_NAME); + runner.setProperty(ConsumeKafka.TOPIC_FORMAT, ConsumeKafka.TOPIC_NAME); runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue()); runner.run(1, false, true); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaDelimitedIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaDelimitedIT.java index 9caa837e292d..80de86053d64 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaDelimitedIT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaDelimitedIT.java @@ -50,7 +50,7 @@ public void test1ProduceOneFlowFile() throws InitializationException { runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner)); runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName()); runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "xx"); - runner.setProperty(PublishKafka.ATTRIBUTE_NAME_REGEX, "a.*"); + runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*"); final Map attributes = new HashMap<>(); attributes.put("a1", "valueA1"); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java index bb4ee9075052..40bf041a0a51 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaIT.java @@ -47,7 +47,7 @@ public void test_1_KafkaTestContainerProduceOne() throws InitializationException runner.setValidateExpressionUsage(false); runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner)); 
runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName()); - runner.setProperty(PublishKafka.ATTRIBUTE_NAME_REGEX, "a.*"); + runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*"); //runner.setProperty(PublishKafka.USE_TRANSACTIONS, Boolean.FALSE.toString()); final Map attributes = new HashMap<>(); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaRecordIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaRecordIT.java index 5ddb1ea5a340..41fd9dbbc434 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaRecordIT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaRecordIT.java @@ -59,8 +59,8 @@ public void test1ProduceOneFlowFile() throws InitializationException, IOExceptio addRecordWriterService(runner); runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName()); - runner.setProperty(PublishKafka.KEY, KEY_ATTRIBUTE_KEY); - runner.setProperty(PublishKafka.ATTRIBUTE_NAME_REGEX, "a.*"); + runner.setProperty(PublishKafka.KAFKA_KEY, KEY_ATTRIBUTE_KEY); + runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*"); final Map attributes = new HashMap<>(); attributes.put(KEY_ATTRIBUTE_KEY, KEY_ATTRIBUTE_VALUE); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaValueRecordIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaValueRecordIT.java index 1f06fec676db..4c7188c22e6d 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaValueRecordIT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaValueRecordIT.java @@ -63,9 +63,9 @@ public void test1ProduceOneFlowFile() throws InitializationException, IOExceptio addRecordKeyWriterService(runner); runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName()); - runner.setProperty(PublishKafka.KEY, KEY_ATTRIBUTE_KEY); + runner.setProperty(PublishKafka.KAFKA_KEY, KEY_ATTRIBUTE_KEY); runner.setProperty(PublishKafka.MESSAGE_KEY_FIELD, "address"); - runner.setProperty(PublishKafka.ATTRIBUTE_NAME_REGEX, "a.*"); + runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*"); runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.name()); final Map attributes = new HashMap<>(); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaWrapperRecordIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaWrapperRecordIT.java index aff79791aa51..c1d1dffe8bc7 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaWrapperRecordIT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaWrapperRecordIT.java @@ -63,9 +63,9 @@ public void test_1_KafkaTestContainerProduceOneFlowFile() throws InitializationE 
addRecordKeyWriterService(runner); runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName()); - runner.setProperty(PublishKafka.KEY, KEY_ATTRIBUTE_KEY); + runner.setProperty(PublishKafka.KAFKA_KEY, KEY_ATTRIBUTE_KEY); runner.setProperty(PublishKafka.MESSAGE_KEY_FIELD, "address"); - runner.setProperty(PublishKafka.ATTRIBUTE_NAME_REGEX, "a.*"); + runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "a.*"); runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.name()); final Map attributes = new HashMap<>(); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaContentX1IT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaContentX1IT.java index bb13694ba2c4..f7fba11276c0 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaContentX1IT.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaContentX1IT.java @@ -62,7 +62,7 @@ public void test1ProduceOneFlowFile() throws InitializationException, IOExceptio runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName()); runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.name()); runner.setProperty(PublishKafka.MESSAGE_KEY_FIELD, "account"); - runner.setProperty(PublishKafka.ATTRIBUTE_NAME_REGEX, "attribute.*"); + runner.setProperty(PublishKafka.ATTRIBUTE_HEADER_PATTERN, "attribute.*"); final Map attributes = new HashMap<>(); attributes.put("attributeA", "valueA"); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java index 56994c84c4e4..dbc9dca2e377 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java @@ -23,10 +23,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.security.auth.SecurityProtocol; -import org.apache.kafka.common.security.plain.PlainLoginModule; import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.ConfigVerificationResult; @@ -45,8 +44,8 @@ import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration; import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService; import org.apache.nifi.kafka.service.producer.Kafka3ProducerService; -import org.apache.nifi.kafka.shared.property.KafkaClientProperty; import org.apache.nifi.kafka.shared.property.SaslMechanism; +import org.apache.nifi.kafka.shared.property.IsolationLevel; import 
org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider; import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider; import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier; @@ -56,9 +55,7 @@ import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; @@ -79,12 +76,13 @@ + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged." + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.", expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT) +@CapabilityDescription("Provides and manages connections to Kafka Brokers for producer or consumer operations.") public class Kafka3ConnectionService extends AbstractControllerService implements KafkaConnectionService, VerifiableControllerService { public static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder() .name("bootstrap.servers") .displayName("Bootstrap Servers") - .description("Comma-separated list of Kafka Bootstrap Servers in the format host:port. Mapped to Kafka bootstrap.servers") + .description("Comma-separated list of Kafka Bootstrap Servers in the format host:port. Corresponds to Kafka bootstrap.servers property") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.NONE) @@ -112,9 +110,9 @@ public class Kafka3ConnectionService extends AbstractControllerService implement public static final PropertyDescriptor SASL_USERNAME = new PropertyDescriptor.Builder() .name("sasl.username") - .displayName("Username") + .displayName("SASL Username") .description("Username provided with configured password when using PLAIN or SCRAM SASL Mechanisms") - .required(false) + .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) .dependsOn( @@ -127,9 +125,9 @@ public class Kafka3ConnectionService extends AbstractControllerService implement public static final PropertyDescriptor SASL_PASSWORD = new PropertyDescriptor.Builder() .name("sasl.password") - .displayName("Password") + .displayName("SASL Password") .description("Password provided with configured username when using PLAIN or SCRAM SASL Mechanisms") - .required(false) + .required(true) .sensitive(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) @@ -148,11 +146,27 @@ public class Kafka3ConnectionService extends AbstractControllerService implement .identifiesControllerService(SSLContextService.class) .build(); + public static final PropertyDescriptor TRANSACTION_ISOLATION_LEVEL = new PropertyDescriptor.Builder() + .name("isolation.level") + .displayName("Transaction Isolation Level") + .description(""" + Specifies how the service should handle transaction isolation levels when communicating with Kafka. + The uncommitted option means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions.
+ The committed option configures the service to not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the + consumer must wait for the producer to finish its entire transaction instead of pulling as the messages become available. + Corresponds to Kafka isolation.level property. + """) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .allowableValues(IsolationLevel.class) + .defaultValue(IsolationLevel.READ_COMMITTED) + .required(true) + .build(); + public static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder() .name("max.poll.records") .displayName("Max Poll Records") - .description("Specifies the maximum number of records Kafka should return in a single poll.") - .required(false) + .description("Maximum number of records Kafka should return in a single poll.") + .required(true) .defaultValue("10000") .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); @@ -167,53 +181,45 @@ public class Kafka3ConnectionService extends AbstractControllerService implement .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); - public static final PropertyDescriptor TRANSACTION_ISOLATION_LEVEL = new PropertyDescriptor.Builder() - .name("Transaction Isolation Level") - .description("Specifies whether or not NiFi should honor transactional guarantees when communicating with Kafka. If false, the Processor will use an \"isolation level\" of " - + "read_uncommitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. If " - + "this value is true, NiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer must wait " - + "for the producer to finish its entire transaction instead of pulling as the messages become available.") - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .allowableValues("true", "false") - .defaultValue("true") - .required(true) - .build(); - public static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder() .name("max.block.ms") .displayName("Max Metadata Wait Time") - .description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the " - + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property") + .description(""" + The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the + entire 'send' call. Corresponds to Kafka max.block.ms property + """) .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .defaultValue("5 sec") + .defaultValue("5 s") .build(); public static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder() .name("ack.wait.time") .displayName("Acknowledgment Wait Time") - .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. " - + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.") + .description(""" + After sending a message to Kafka, this indicates the amount of time that the service will wait for a response from Kafka. + If Kafka does not acknowledge the message within this time period, the service will throw an exception. 
+ """) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) - .defaultValue("5 secs") + .defaultValue("5 s") .build(); - private static final List PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( + private static final List PROPERTY_DESCRIPTORS = List.of( BOOTSTRAP_SERVERS, SECURITY_PROTOCOL, SASL_MECHANISM, SASL_USERNAME, SASL_PASSWORD, SSL_CONTEXT_SERVICE, + TRANSACTION_ISOLATION_LEVEL, MAX_POLL_RECORDS, CLIENT_TIMEOUT, - TRANSACTION_ISOLATION_LEVEL, METADATA_WAIT_TIME, ACK_WAIT_TIME - )); + ); private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(2); @@ -223,8 +229,6 @@ public class Kafka3ConnectionService extends AbstractControllerService implement private Properties clientProperties; - private Properties consumerProperties; - private ServiceConfiguration serviceConfiguration; private Kafka3ConsumerService consumerService; @@ -232,8 +236,8 @@ public class Kafka3ConnectionService extends AbstractControllerService implement @OnEnabled public void onEnabled(final ConfigurationContext configurationContext) { clientProperties = getClientProperties(configurationContext); - consumerProperties = getConsumerProperties(configurationContext, clientProperties); serviceConfiguration = getServiceConfiguration(configurationContext); + final Properties consumerProperties = getConsumerProperties(configurationContext, clientProperties); consumerService = new Kafka3ConsumerService(getLogger(), consumerProperties); } @@ -314,10 +318,8 @@ private Properties getConsumerProperties(final PropertyContext propertyContext, final Properties properties = new Properties(); properties.putAll(defaultProperties); - // since config for ConsumerPool is locked in at ControllerService.enable(), - final boolean honorTransactions = propertyContext.getProperty(TRANSACTION_ISOLATION_LEVEL).asBoolean(); - final KafkaClientProperty isolationLevel = honorTransactions ? 
KafkaClientProperty.READ_COMMITTED : KafkaClientProperty.READ_UNCOMMITTED; - properties.put(KafkaClientProperty.ISOLATION_LEVEL.getProperty(), isolationLevel.getProperty()); + final IsolationLevel isolationLevel = propertyContext.getProperty(TRANSACTION_ISOLATION_LEVEL).asAllowableValue(IsolationLevel.class); + properties.put(TRANSACTION_ISOLATION_LEVEL.getName(), isolationLevel.getValue()); return properties; } @@ -332,21 +334,6 @@ private Properties getClientProperties(final PropertyContext propertyContext) { final String configuredBootstrapServers = propertyContext.getProperty(BOOTSTRAP_SERVERS).getValue(); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, configuredBootstrapServers); - final String securityProtocol = propertyContext.getProperty(SECURITY_PROTOCOL).getValue(); - properties.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol); - - final String saslMechanism = propertyContext.getProperty(SASL_MECHANISM).getValue(); - properties.put(SaslConfigs.SASL_MECHANISM, saslMechanism); - - final String saslUsername = propertyContext.getProperty(SASL_USERNAME).getValue(); - final String saslPassword = propertyContext.getProperty(SASL_PASSWORD).getValue(); - - if ((saslUsername != null) && (saslPassword != null)) { - properties.put(SaslConfigs.SASL_JAAS_CONFIG, String.format( - "%s required username=\"%s\" password=\"%s\";", - PlainLoginModule.class.getName(), saslUsername, saslPassword)); - } - setSslProperties(properties, propertyContext); final int defaultApiTimeoutMs = getDefaultApiTimeoutMs(propertyContext); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java index b0d3f85a46d3..6f97e94f5c9b 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java @@ -31,8 +31,6 @@ import org.apache.nifi.kafka.service.producer.transaction.KafkaNonTransactionalProducerWrapper; import org.apache.nifi.kafka.service.producer.transaction.KafkaProducerWrapper; import org.apache.nifi.kafka.service.producer.transaction.KafkaTransactionalProducerWrapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.UncheckedIOException; import java.util.ArrayList; @@ -43,9 +41,8 @@ import java.util.stream.Collectors; public class Kafka3ProducerService implements KafkaProducerService { - private final Logger logger = LoggerFactory.getLogger(getClass()); - private final Producer producer; + private final List callbacks; private final ServiceConfiguration serviceConfiguration; diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java index 12fbb143eacb..10b7c86bba5f 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java @@ -101,7 +101,7 @@ public class ConsumeKafka 
extends AbstractProcessor implements VerifiableProcessor { static final AllowableValue TOPIC_NAME = new AllowableValue("names", "names", "Topic is a full topic name or comma separated list of names"); - static final AllowableValue TOPIC_PATTERN = new AllowableValue("pattern", "pattern", "Topic is a regex using the Java Pattern syntax"); + static final AllowableValue TOPIC_PATTERN = new AllowableValue("pattern", "pattern", "Topic is a regular expression according to the Java Pattern syntax"); static final PropertyDescriptor CONNECTION_SERVICE = new PropertyDescriptor.Builder() .name("Kafka Connection Service") @@ -119,35 +119,20 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess .expressionLanguageSupported(NONE) .build(); - static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder() - .name("topic") - .displayName("Topic Names") - .description("The name of the Kafka Topics from which the Processor consumes Kafka Records. More than one can be supplied if comma separated.") - .required(true) - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .build(); - - static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder() - .name("Topic Name Format") + static final PropertyDescriptor TOPIC_FORMAT = new PropertyDescriptor.Builder() + .name("Topic Format") .description("Specifies whether the Topics provided are a comma separated list of names or a single regular expression") .required(true) .allowableValues(TOPIC_NAME, TOPIC_PATTERN) .defaultValue(TOPIC_NAME) .build(); - static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() - .name("Record Reader") - .description("The Record Reader to use for incoming Kafka messages") - .identifiesControllerService(RecordReaderFactory.class) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .build(); - - static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() - .name("Record Writer") - .description("The Record Writer to use in order to serialize the outgoing FlowFiles") - .identifiesControllerService(RecordSetWriterFactory.class) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) + static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder() + .name("Topics") + .description("The name or pattern of the Kafka Topics from which the Processor consumes Kafka Records. More than one can be supplied if comma separated.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) .build(); static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() @@ -160,26 +145,44 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess .expressionLanguageSupported(NONE) .build(); - static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() - .name("Message Demarcator") - .required(false) - .addValidator(Validator.VALID) - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .description("Since KafkaConsumer receives messages in batches, you have an option to output FlowFiles which contains " - + "all Kafka messages in a single batch for a given topic and partition and this property allows you to provide a string (interpreted as UTF-8) to use " - + "for demarcating apart multiple Kafka messages. 
This is an optional property and if not provided each Kafka message received " - + "will result in a single FlowFile which " - + "time it is triggered. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS") + static final PropertyDescriptor COMMIT_OFFSETS = new PropertyDescriptor.Builder() + .name("Commit Offsets") + .description("Specifies whether this Processor should commit the offsets to Kafka after receiving messages. Typically, this value should be set to true " + + "so that messages that are received are not duplicated. However, in certain scenarios, we may want to avoid committing the offsets, so that the data can be " + + "processed and later acknowledged by PublishKafka in order to provide Exactly Once semantics.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") .build(); - static final PropertyDescriptor SEPARATE_BY_KEY = new PropertyDescriptor.Builder() - .name("Separate By Key") - .description("If true, and the [Message Demarcator] property is set, two messages will only be added to the same FlowFile if both of the Kafka Messages have identical keys.") - .required(false) - .allowableValues("true", "false") - .defaultValue("false") + static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder() + .name("Max Uncommitted Time") + .description("Specifies the maximum amount of time allowed to pass before offsets must be committed. " + + "This value impacts how often offsets will be committed. Committing offsets less often increases " + + "throughput but also increases the window of potential data duplication in the event of a rebalance " + + "or JVM restart between commits. This value is also related to maximum poll records and the use " + + "of a message demarcator. 
When using a message demarcator we can have far more uncommitted messages " + + "than when we're not as there is much less for us to keep track of in memory.") + .required(true) + .defaultValue("1 s") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .dependsOn(COMMIT_OFFSETS, "true") .build(); + static final PropertyDescriptor HEADER_ENCODING = new PropertyDescriptor.Builder() + .name("Header Encoding") + .description("Character encoding applied when reading Kafka Record Header values and writing FlowFile attributes") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue(StandardCharsets.UTF_8.name()) + .required(true) + .build(); + + static final PropertyDescriptor HEADER_NAME_PATTERN = new PropertyDescriptor.Builder() + .name("Header Name Pattern") + .description("Regular Expression Pattern applied to Kafka Record Header Names for selecting Header Values to be written as FlowFile attributes") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .required(false) + .build(); static final PropertyDescriptor PROCESSING_STRATEGY = new PropertyDescriptor.Builder() .name("Processing Strategy") @@ -190,25 +193,43 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess .expressionLanguageSupported(NONE) .build(); - static final PropertyDescriptor HEADER_ENCODING = new PropertyDescriptor.Builder() - .name("Header Encoding") - .description("Character encoding applied when reading Kafka Record Header values and writing FlowFile attributes") - .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) - .defaultValue(StandardCharsets.UTF_8.name()) + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("Record Reader") + .description("The Record Reader to use for incoming Kafka messages") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD) + .build(); + + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("Record Writer") + .description("The Record Writer to use in order to serialize the outgoing FlowFiles") + .identifiesControllerService(RecordSetWriterFactory.class) .required(true) + .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD) .build(); static final PropertyDescriptor OUTPUT_STRATEGY = new PropertyDescriptor.Builder() .name("Output Strategy") - .description("The format used to output the Kafka record into a FlowFile record.") + .description("The format used to output the Kafka Record into a FlowFile Record.") .required(true) .defaultValue(OutputStrategy.USE_VALUE) .allowableValues(OutputStrategy.class) + .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.RECORD) + .build(); + + static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder() + .name("Key Attribute Encoding") + .description("Encoding for value of configured FlowFile attribute containing Kafka Record Key.") + .required(true) + .defaultValue(KeyEncoding.UTF8) + .allowableValues(KeyEncoding.class) + .dependsOn(OUTPUT_STRATEGY, OutputStrategy.USE_VALUE) .build(); static final PropertyDescriptor KEY_FORMAT = new PropertyDescriptor.Builder() .name("Key Format") - .description("Specifies how to represent the Kafka Record's Key in the output") + .description("Specifies how to represent the Kafka Record Key in the output FlowFile") .required(true) .defaultValue(KeyFormat.BYTE_ARRAY) .allowableValues(KeyFormat.class) @@ -217,49 +238,32 @@ public class ConsumeKafka extends 
AbstractProcessor implements VerifiableProcess static final PropertyDescriptor KEY_RECORD_READER = new PropertyDescriptor.Builder() .name("Key Record Reader") - .description("The Record Reader to use for parsing the Kafka Record's key into a Record") + .description("The Record Reader to use for parsing the Kafka Record Key into a Record") .identifiesControllerService(RecordReaderFactory.class) .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .required(true) .dependsOn(KEY_FORMAT, KeyFormat.RECORD) .build(); - static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder() - .name("Key Attribute Encoding") - .description("FlowFiles that are emitted have an attribute named '" + KafkaFlowFileAttribute.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.") + static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() + .name("Message Demarcator") .required(true) - .defaultValue(KeyEncoding.UTF8) - .allowableValues(KeyEncoding.class) - .dependsOn(OUTPUT_STRATEGY, OutputStrategy.USE_VALUE) - .build(); - - static final PropertyDescriptor HEADER_NAME_PATTERN = new PropertyDescriptor.Builder() - .name("Header Name Pattern") - .description("Regular Expression Pattern applied to Kafka Record Header Names for selecting Header Values to be written as FlowFile attributes") - .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .required(false) + .addValidator(Validator.VALID) + .description("Since KafkaConsumer receives messages in batches, this Processor has an option to output FlowFiles which contain " + + "all Kafka messages in a single batch for a given topic and partition and this property allows you to provide a string (interpreted as UTF-8) to use " + + "for demarcating multiple Kafka messages. This is an optional property and if not provided each Kafka message received " + + "will result in a single FlowFile each " + + "time it is triggered. To enter a special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS") + .dependsOn(PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR) .build(); - static final PropertyDescriptor COMMIT_OFFSETS = new PropertyDescriptor.Builder() - .name("Commit Offsets") - .description("Specifies whether or not this Processor should commit the offsets to Kafka after receiving messages. Typically, we want this value set to true " + - "so that messages that are received are not duplicated. However, in certain scenarios, we may want to avoid committing the offsets, that the data can be " + - "processed and later acknowledged by PublishKafkaRecord in order to provide Exactly Once semantics. See Processor's Usage / Additional Details for more information.") + static final PropertyDescriptor SEPARATE_BY_KEY = new PropertyDescriptor.Builder() + .name("Separate By Key") + .description("When this property is enabled, two messages will only be added to the same FlowFile if both of the Kafka Messages have identical keys.") + .required(true) .allowableValues("true", "false") - .defaultValue("true") - .build(); - - static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder() - .name("Max Uncommitted Time") - .description("Specifies the maximum amount of time allowed to pass before offsets must be committed. " - + "This value impacts how often offsets will be committed. 
Committing offsets less often increases " - + "throughput but also increases the window of potential data duplication in the event of a rebalance " - + "or JVM restart between commits. This value is also related to maximum poll records and the use " - + "of a message demarcator. When using a message demarcator we can have far more uncommitted messages " - + "than when we're not as there is much less for us to keep track of in memory.") - .required(false) - .defaultValue("1 secs") - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .dependsOn(COMMIT_OFFSETS, "true") + .defaultValue("false") + .dependsOn(MESSAGE_DEMARCATOR) .build(); public static final Relationship SUCCESS = new Relationship.Builder() @@ -270,22 +274,22 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess private static final List DESCRIPTORS = List.of( CONNECTION_SERVICE, GROUP_ID, + TOPIC_FORMAT, TOPICS, - TOPIC_TYPE, - MESSAGE_DEMARCATOR, - SEPARATE_BY_KEY, - RECORD_READER, - RECORD_WRITER, AUTO_OFFSET_RESET, - PROCESSING_STRATEGY, - HEADER_ENCODING, - HEADER_NAME_PATTERN, - KEY_ATTRIBUTE_ENCODING, COMMIT_OFFSETS, MAX_UNCOMMITTED_TIME, + HEADER_NAME_PATTERN, + HEADER_ENCODING, + PROCESSING_STRATEGY, + RECORD_READER, + RECORD_WRITER, + OUTPUT_STRATEGY, + KEY_ATTRIBUTE_ENCODING, KEY_FORMAT, KEY_RECORD_READER, - OUTPUT_STRATEGY + MESSAGE_DEMARCATOR, + SEPARATE_BY_KEY ); private static final Set RELATIONSHIPS = Collections.singleton(SUCCESS); @@ -397,7 +401,7 @@ private void processConsumerRecords(final ProcessContext context, final ProcessS private Iterator transformDemarcator(final ProcessContext context, final Iterator consumerRecords) { final PropertyValue propertyValueDemarcator = context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR); if (propertyValueDemarcator.isSet()) { - final byte[] demarcator = propertyValueDemarcator.evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8); + final byte[] demarcator = propertyValueDemarcator.getValue().getBytes(StandardCharsets.UTF_8); final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean(); return new ByteRecordBundler(demarcator, separateByKey, keyEncoding, headerNamePattern, headerEncoding, commitOffsets).bundle(consumerRecords); } else { @@ -455,18 +459,18 @@ private PollingContext getPollingContext(final ProcessContext context) { final String offsetReset = context.getProperty(AUTO_OFFSET_RESET).getValue(); final AutoOffsetReset autoOffsetReset = AutoOffsetReset.valueOf(offsetReset.toUpperCase()); final String topics = context.getProperty(TOPICS).evaluateAttributeExpressions().getValue(); - final String topicType = context.getProperty(TOPIC_TYPE).getValue(); + final String topicFormat = context.getProperty(TOPIC_FORMAT).getValue(); final Duration maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asDuration(); final PollingContext pollingContext; - if (topicType.equals(TOPIC_PATTERN.getValue())) { + if (topicFormat.equals(TOPIC_PATTERN.getValue())) { final Pattern topicPattern = Pattern.compile(topics.trim()); pollingContext = new PollingContext(groupId, topicPattern, autoOffsetReset, maxUncommittedTime); - } else if (topicType.equals(TOPIC_NAME.getValue())) { + } else if (topicFormat.equals(TOPIC_NAME.getValue())) { final Collection topicList = KafkaUtils.toTopicList(topics); pollingContext = new PollingContext(groupId, topicList, autoOffsetReset, maxUncommittedTime); } else { - throw new ProcessException(String.format("Subscription type has an unknown value %s", topicType)); + throw 
new ProcessException(String.format("Topic Format [%s] not supported", topicFormat)); } return pollingContext; } diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java index 6a11855edde5..4c41bcd246c2 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java @@ -113,19 +113,85 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); - // https://github.com/apache/kafka/blob/5fa48214448ddf19270a35f1dd5156a4eece4ca7/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java#L117 - public static final String ACKS_CONFIG = "acks"; - static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() - .name(ACKS_CONFIG) + .name("acks") .displayName("Delivery Guarantee") - .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.") + .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka Client acks property.") .required(true) .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues(DeliveryGuarantee.class) .defaultValue(DeliveryGuarantee.DELIVERY_REPLICATED) .build(); + static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() + .name("compression.type") + .displayName("Compression Type") + .description("Specifies the compression strategy for records sent to Kafka. Corresponds to Kafka Client compression.type property.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues("none", "gzip", "snappy", "lz4") + .defaultValue("none") + .build(); + + public static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder() + .name("max.request.size") + .displayName("Max Request Size") + .description("The maximum size of a request in bytes. Corresponds to Kafka Client max.request.size property.") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .build(); + + public static final PropertyDescriptor TRANSACTIONS_ENABLED = new PropertyDescriptor.Builder() + .name("Transactions Enabled") + .description("Specifies whether to provide transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, " + + "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. " + + "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. 
Setting this to true " + + "requires that the [Delivery Guarantee] property be set to [Guarantee Replicated Delivery.]") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); + + static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new PropertyDescriptor.Builder() + .name("Transactional ID Prefix") + .description("Specifies the KafkaProducer config transactional.id will be a generated UUID and will be prefixed with the configured string.") + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .dependsOn(TRANSACTIONS_ENABLED, "true") + .required(false) + .build(); + + static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder() + .name("partitioner.class") + .displayName("Partitioner Class") + .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka Client partitioner.class property.") + .allowableValues(PartitionStrategy.class) + .defaultValue(PartitionStrategy.RANDOM_PARTITIONING.getValue()) + .required(true) + .build(); + + public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder() + .name("partition") + .displayName("Partition") + .description("Specifies the Kafka Partition destination for Records.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() + .name("Message Demarcator") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within " + + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the " + + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. 
" + + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter, depending on your OS.") + .build(); + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() .name("Record Reader") .description("The Record Reader to use for incoming FlowFiles") @@ -144,7 +210,7 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo .name("Publish Strategy") .description("The format used to publish the incoming FlowFile record to Kafka.") .required(true) - .defaultValue(PublishStrategy.USE_VALUE.getValue()) + .defaultValue(PublishStrategy.USE_VALUE) .allowableValues(PublishStrategy.class) .build(); @@ -153,104 +219,58 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo .description("The name of a field in the Input Records that should be used as the Key for the Kafka message.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.getValue()) + .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE) .required(false) .build(); - static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() - .name("Message Demarcator") + public static final PropertyDescriptor ATTRIBUTE_HEADER_PATTERN = new PropertyDescriptor.Builder() + .name("FlowFile Attribute Header Pattern") + .description("A Regular Expression that is matched against all FlowFile attribute names. " + + "Any attribute whose name matches the pattern will be added to the Kafka messages as a Header. " + + "If not specified, no FlowFile attributes will be added as headers.") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE) .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within " - + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the " - + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. " - + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter, depending on your OS.") .build(); - public static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder() - .name("Max Request Size") - .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).") + static final PropertyDescriptor HEADER_ENCODING = new PropertyDescriptor.Builder() + .name("Header Encoding") + .description("For any attribute that is added as a Kafka Record Header, this property indicates the Character Encoding to use for serializing the headers.") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue(StandardCharsets.UTF_8.displayName()) .required(true) - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .defaultValue("1 MB") + .dependsOn(ATTRIBUTE_HEADER_PATTERN) .build(); - static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() + static final PropertyDescriptor KAFKA_KEY = new PropertyDescriptor.Builder() .name("Kafka Key") .description("The Key to use for the Message. 
" - + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present." + + "If not specified, the FlowFile attribute 'kafka.key' is used as the message key, if it is present." + "Beware that setting Kafka key and demarcating at the same time may potentially lead to many Kafka messages with the same key." + "Normally this is not a problem as Kafka does not enforce or assume message and key uniqueness. Still, setting the demarcator and Kafka key at the same time poses a risk of " + "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER) .build(); static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder() - .name("Key Attribute Encoding") + .name("Kafka Key Attribute Encoding") .description("FlowFiles that are emitted have an attribute named '" + KafkaFlowFileAttribute.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.") .required(true) .defaultValue(KeyEncoding.UTF8.getValue()) .allowableValues(KeyEncoding.class) - .build(); - - static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() - .name("compression.type") - .displayName("Compression Type") - .description("This parameter allows you to specify the compression codec for all data generated by this producer.") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .allowableValues("none", "gzip", "snappy", "lz4") - .defaultValue("none") - .build(); - - public static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder() - .name("FlowFile Attribute Header Pattern") - .description("A Regular Expression that is matched against all FlowFile attribute names. " - + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. " - + "If not specified, no FlowFile attributes will be added as headers.") - .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.getValue()) - .required(false) - .build(); - - public static final PropertyDescriptor TRANSACTIONS_ENABLED = new PropertyDescriptor.Builder() - .name("Transactions Enabled") - .description("Specifies whether or not NiFi should provide Transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, " - + "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. " - + "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. 
Setting this to true " - + "requires that the [Delivery Guarantee] property be set to [Guarantee Replicated Delivery.]") - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .allowableValues("true", "false") - .defaultValue("true") - .required(true) - .build(); - static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new PropertyDescriptor.Builder() - .name("Transactional ID Prefix") - .description("When [Transactions Enabled] is set to true, KafkaProducer config 'transactional.id' will be a generated UUID and will be prefixed with this string.") - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) - .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) - .dependsOn(TRANSACTIONS_ENABLED, "true") - .required(false) - .build(); - - static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder() - .name("Message Header Encoding") - .description("For any attribute that is added as a message header, as configured via the property, " - + "this property indicates the Character Encoding to use for serializing the headers.") - .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) - .defaultValue("UTF-8") - .required(false) + .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER) .build(); static final PropertyDescriptor RECORD_KEY_WRITER = new PropertyDescriptor.Builder() .name("Record Key Writer") .description("The Record Key Writer to use for outgoing FlowFiles") + .required(false) .identifiesControllerService(RecordSetWriterFactory.class) - .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.getValue()) + .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER) .build(); public static final PropertyDescriptor RECORD_METADATA_STRATEGY = new PropertyDescriptor.Builder() @@ -258,75 +278,94 @@ public class PublishKafka extends AbstractProcessor implements KafkaPublishCompo .description("Specifies whether the Record's metadata (topic and partition) should come from the Record's metadata field or if it should come from the configured " + "Topic Name and Partition / Partitioner class properties") .required(true) - .defaultValue(RecordMetadataStrategy.FROM_PROPERTIES.getValue()) + .defaultValue(RecordMetadataStrategy.FROM_PROPERTIES) .allowableValues(RecordMetadataStrategy.class) - .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.getValue()) + .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER) .build(); - static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder() - .name("partitioner.class") - .displayName("Partitioner class") - .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.") - .allowableValues(PartitionStrategy.class) - .defaultValue(PartitionStrategy.RANDOM_PARTITIONING.getValue()) - .required(false) + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles for which all content was sent to Kafka.") .build(); - public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder() - .name("partition") - .displayName("Partition") - .description("Specifies which Partition Records will go to. 
How this value is interpreted is dictated by the property.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") .build(); private static final List DESCRIPTORS = List.of( CONNECTION_SERVICE, - DELIVERY_GUARANTEE, TOPIC_NAME, - RECORD_READER, - RECORD_WRITER, - PUBLISH_STRATEGY, - RECORD_KEY_WRITER, - RECORD_METADATA_STRATEGY, - MESSAGE_DEMARCATOR, FAILURE_STRATEGY, - KEY, - KEY_ATTRIBUTE_ENCODING, - ATTRIBUTE_NAME_REGEX, + DELIVERY_GUARANTEE, + COMPRESSION_CODEC, + MAX_REQUEST_SIZE, TRANSACTIONS_ENABLED, TRANSACTIONAL_ID_PREFIX, - MESSAGE_HEADER_ENCODING, - MESSAGE_KEY_FIELD, - MAX_REQUEST_SIZE, - COMPRESSION_CODEC, PARTITION_CLASS, - PARTITION + PARTITION, + MESSAGE_DEMARCATOR, + RECORD_READER, + RECORD_WRITER, + PUBLISH_STRATEGY, + MESSAGE_KEY_FIELD, + ATTRIBUTE_HEADER_PATTERN, + HEADER_ENCODING, + KAFKA_KEY, + KEY_ATTRIBUTE_ENCODING, + RECORD_KEY_WRITER, + RECORD_METADATA_STRATEGY ); + private static final Set RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE); + @Override public List getSupportedPropertyDescriptors() { return DESCRIPTORS; } - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("FlowFiles for which all content was sent to Kafka.") - .build(); - - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") - .build(); - - private static final Set RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE); - @Override public Set getRelationships() { return RELATIONSHIPS; } + @Override + public List verify(final ProcessContext context, final ComponentLog verificationLogger, + final Map attributes) { + final List verificationResults = new ArrayList<>(); + + final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class); + + final boolean transactionsEnabled = context.getProperty(TRANSACTIONS_ENABLED).asBoolean(); + final String transactionalIdPrefix = context.getProperty(TRANSACTIONAL_ID_PREFIX).evaluateAttributeExpressions().getValue(); + final Supplier transactionalIdSupplier = new TransactionIdSupplier(transactionalIdPrefix); + final String deliveryGuarantee = context.getProperty(DELIVERY_GUARANTEE).getValue(); + final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue(); + final String partitionClass = context.getProperty(PARTITION_CLASS).getValue(); + final ProducerConfiguration producerConfiguration = new ProducerConfiguration( + transactionsEnabled, transactionalIdSupplier.get(), deliveryGuarantee, compressionCodec, partitionClass); + final KafkaProducerService producerService = connectionService.getProducerService(producerConfiguration); + + final ConfigVerificationResult.Builder verificationPartitions = new ConfigVerificationResult.Builder() + .verificationStepName("Verify Topic Partitions"); + + final String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions(attributes).getValue(); + try { + final List partitionStates = producerService.getPartitionStates(topicName); + verificationPartitions + .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) + .explanation(String.format("Partitions [%d] 
found for Topic [%s]", partitionStates.size(), topicName)); + } catch (final Exception e) { + getLogger().error("Topic [%s] Partition verification failed", topicName, e); + verificationPartitions + .outcome(ConfigVerificationResult.Outcome.FAILED) + .explanation(String.format("Topic [%s] Partition access failed: %s", topicName, e)); + } + verificationResults.add(verificationPartitions.build()); + + return verificationResults; + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final List flowFiles = PublishKafkaUtil.pollFlowFiles(session); @@ -360,7 +399,6 @@ private void publishFlowFiles(final ProcessContext context, final ProcessSession } final RecordSummary recordSummary = producerService.complete(); if (recordSummary.isFailure()) { - // might this be a place we want to behave differently? (route FlowFile to failure only on failure) routeFailureStrategy(context, session, flowFiles); } else { routeResults(session, recordSummary.getFlowFileResults()); @@ -403,20 +441,20 @@ private void publishFlowFile(final ProcessContext context, final ProcessSession final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty(PUBLISH_STRATEGY).getValue()); final RecordMetadataStrategy metadataStrategy = RecordMetadataStrategy.valueOf(context.getProperty(RECORD_METADATA_STRATEGY).getValue()); - final String keyAttribute = context.getProperty(KEY).getValue(); + final String kafkaKeyAttribute = context.getProperty(KAFKA_KEY).getValue(); final String keyAttributeEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue(); final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue(); final KeyFactory keyFactory = ((PublishStrategy.USE_VALUE == publishStrategy) && (messageKeyField != null)) ? new MessageKeyFactory(flowFile, messageKeyField, keyWriterFactory, getLogger()) - : new AttributeKeyFactory(keyAttribute, keyAttributeEncoding); + : new AttributeKeyFactory(kafkaKeyAttribute, keyAttributeEncoding); - final String attributeNameRegex = context.getProperty(ATTRIBUTE_NAME_REGEX).getValue(); - final Pattern attributeNamePattern = (attributeNameRegex == null) ? null : Pattern.compile(attributeNameRegex); - final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue(); - final Charset charset = Charset.forName(charsetName); - final HeadersFactory headersFactory = new AttributesHeadersFactory(attributeNamePattern, charset); + final String attributeHeaderPatternProperty = context.getProperty(ATTRIBUTE_HEADER_PATTERN).getValue(); + final Pattern attributeHeaderPattern = (attributeHeaderPatternProperty == null) ? 
null : Pattern.compile(attributeHeaderPatternProperty); + final String headerEncoding = context.getProperty(HEADER_ENCODING).evaluateAttributeExpressions().getValue(); + final Charset headerEncodingCharacterSet = Charset.forName(headerEncoding); + final HeadersFactory headersFactory = new AttributesHeadersFactory(attributeHeaderPattern, headerEncodingCharacterSet); - final KafkaRecordConverter kafkaRecordConverter = getKafkaRecordConverterFor( + final KafkaRecordConverter kafkaRecordConverter = getKafkaRecordConverter( publishStrategy, metadataStrategy, readerFactory, writerFactory, keyWriterFactory, keyFactory, headersFactory, propertyDemarcator, flowFile, maxMessageSize); final PublishCallback callback = new PublishCallback( @@ -433,12 +471,18 @@ private Integer getPartition(final ProcessContext context, final FlowFile flowFi return null; } - private KafkaRecordConverter getKafkaRecordConverterFor( - final PublishStrategy publishStrategy, final RecordMetadataStrategy metadataStrategy, - final RecordReaderFactory readerFactory, final RecordSetWriterFactory writerFactory, + private KafkaRecordConverter getKafkaRecordConverter( + final PublishStrategy publishStrategy, + final RecordMetadataStrategy metadataStrategy, + final RecordReaderFactory readerFactory, + final RecordSetWriterFactory writerFactory, final RecordSetWriterFactory keyWriterFactory, - final KeyFactory keyFactory, final HeadersFactory headersFactory, - final PropertyValue propertyValueDemarcator, final FlowFile flowFile, final int maxMessageSize) { + final KeyFactory keyFactory, + final HeadersFactory headersFactory, + final PropertyValue propertyValueDemarcator, + final FlowFile flowFile, + final int maxMessageSize + ) { final KafkaRecordConverter kafkaRecordConverter; if ((readerFactory != null) && (writerFactory != null)) { if (publishStrategy == PublishStrategy.USE_WRAPPER) { @@ -484,46 +528,9 @@ public void process(final InputStream in) { final Iterator records = kafkaConverter.convert(attributes, is, inputLength); producerService.send(records, publishContext); } catch (final Exception e) { - publishContext.setException(e); // on data pre-process failure, indicate this to controller service + publishContext.setException(e); // on data pre-process failure, indicate this to controller service producerService.send(Collections.emptyIterator(), publishContext); } } } - - @Override - public List verify(final ProcessContext context, final ComponentLog verificationLogger, - final Map attributes) { - final List verificationResults = new ArrayList<>(); - - final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class); - - final boolean transactionsEnabled = context.getProperty(TRANSACTIONS_ENABLED).asBoolean(); - final String transactionalIdPrefix = context.getProperty(TRANSACTIONAL_ID_PREFIX).evaluateAttributeExpressions().getValue(); - final Supplier transactionalIdSupplier = new TransactionIdSupplier(transactionalIdPrefix); - final String deliveryGuarantee = context.getProperty(DELIVERY_GUARANTEE).getValue(); - final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue(); - final String partitionClass = context.getProperty(PARTITION_CLASS).getValue(); - final ProducerConfiguration producerConfiguration = new ProducerConfiguration( - transactionsEnabled, transactionalIdSupplier.get(), deliveryGuarantee, compressionCodec, partitionClass); - final KafkaProducerService producerService = 
connectionService.getProducerService(producerConfiguration); - - final ConfigVerificationResult.Builder verificationPartitions = new ConfigVerificationResult.Builder() - .verificationStepName("Verify Topic Partitions"); - - final String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions(attributes).getValue(); - try { - final List partitionStates = producerService.getPartitionStates(topicName); - verificationPartitions - .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL) - .explanation(String.format("Partitions [%d] found for Topic [%s]", partitionStates.size(), topicName)); - } catch (final Exception e) { - getLogger().error("Topic [%s] Partition verification failed", topicName, e); - verificationPartitions - .outcome(ConfigVerificationResult.Outcome.FAILED) - .explanation(String.format("Topic [%s] Partition access failed: %s", topicName, e)); - } - verificationResults.add(verificationPartitions.build()); - - return verificationResults; - } } diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/header/AttributesHeadersFactory.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/header/AttributesHeadersFactory.java index 8540cecbe074..401a6c1eea5f 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/header/AttributesHeadersFactory.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/header/AttributesHeadersFactory.java @@ -18,7 +18,6 @@ import org.apache.nifi.kafka.service.api.header.RecordHeader; -import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; @@ -37,7 +36,7 @@ public AttributesHeadersFactory(final Pattern attributeNamePattern, final Charse this.messageHeaderCharset = messageHeaderCharset; } - public List getHeaders(final Map attributes) throws IOException { + public List getHeaders(final Map attributes) { final List headers = new ArrayList<>(); if (attributeNamePattern != null) { for (final Map.Entry entry : attributes.entrySet()) { diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/wrapper/RecordFieldConverter.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/wrapper/RecordFieldConverter.java index ff3a6ae4c529..e91c2d67609a 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/wrapper/RecordFieldConverter.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/wrapper/RecordFieldConverter.java @@ -69,7 +69,7 @@ private byte[] toBytes(final Record field, final RecordSetWriterFactory writerFa throws MalformedRecordException, SchemaNotFoundException, IOException { if (writerFactory == null) { throw new MalformedRecordException("Record has a key that is itself a record, but the 'Record Key Writer' " - + "of the processor was not configured. If Records are expected to have a Record as the key, the " + + "of the processor was not configured. 
If Records are expected to have a Record as the key, the " + "'Record Key Writer' property must be set."); } final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), field.getSchema()); diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/IsolationLevel.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/IsolationLevel.java new file mode 100644 index 000000000000..e0e11d9c9d84 --- /dev/null +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/IsolationLevel.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.shared.property; + +import org.apache.nifi.components.DescribedValue; + +/** + * Enumeration of supported Kafka Isolation Levels + */ +public enum IsolationLevel implements DescribedValue { + READ_COMMITTED("read_committed", "Read Committed", "Read records from Kafka only after the producer has committed a transaction."), + READ_UNCOMMITTED("read_uncommitted", "Read Uncommitted", "Read records from Kafka as soon as the producer has sent the record, regardless of transaction status."); + + private final String value; + private final String displayName; + private final String description; + + IsolationLevel(final String value, final String displayName, final String description) { + this.value = value; + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return value; + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java index b8a25f20d233..4769f0299d83 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java @@ -30,11 +30,7 @@ public enum KafkaClientProperty { SSL_KEY_PASSWORD("ssl.key.password"), SSL_TRUSTSTORE_LOCATION("ssl.truststore.location"), SSL_TRUSTSTORE_PASSWORD("ssl.truststore.password"), - SSL_TRUSTSTORE_TYPE("ssl.truststore.type"), - - ISOLATION_LEVEL("isolation.level"), - READ_COMMITTED("read_committed"), - READ_UNCOMMITTED("read_uncommitted"); + 
SSL_TRUSTSTORE_TYPE("ssl.truststore.type"); private final String property;
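Note on the new IsolationLevel enum: because it implements DescribedValue, it can back a processor property's allowable values and supply the raw isolation.level setting that this change removes from KafkaClientProperty. The sketch below is illustrative only; the descriptor name, class name, and consumer wiring are assumptions, not the actual ConsumeKafka or connection-service code.

import java.util.Properties;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.kafka.shared.property.IsolationLevel;

// Hypothetical usage sketch; not part of this change.
class IsolationLevelUsageSketch {

    // Enum constants implementing DescribedValue become the allowable values;
    // their display names and descriptions surface in the generated processor documentation.
    static final PropertyDescriptor ISOLATION_LEVEL = new PropertyDescriptor.Builder()
            .name("Isolation Level")
            .description("Transactional isolation applied when reading records from Kafka.")
            .required(true)
            .allowableValues(IsolationLevel.class)
            .defaultValue(IsolationLevel.READ_COMMITTED.getValue())
            .build();

    // The enum value maps directly onto the Kafka client property formerly listed in KafkaClientProperty.
    static Properties toConsumerProperties(final IsolationLevel isolationLevel) {
        final Properties properties = new Properties();
        properties.setProperty("isolation.level", isolationLevel.getValue()); // "read_committed" or "read_uncommitted"
        return properties;
    }
}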