From af20b4fa793fb6393f2f7d9a431140349482a320 Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Mon, 24 Jun 2024 16:34:26 -0500 Subject: [PATCH] NIFI-11259 Removed inconsistent PublishKafkaRecordPartialHandlingIT --- .../PublishKafkaRecordPartialHandlingIT.java | 101 ------------------ .../kafka/processors/publish/ff.partial.json | 8 -- 2 files changed, 109 deletions(-) delete mode 100644 nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaRecordPartialHandlingIT.java delete mode 100644 nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/processors/publish/ff.partial.json diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaRecordPartialHandlingIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaRecordPartialHandlingIT.java deleted file mode 100644 index 3aabfea9e8df..000000000000 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaRecordPartialHandlingIT.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.kafka.processors; - -import com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.commons.io.IOUtils; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.nifi.kafka.shared.property.FailureStrategy; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; - -import java.io.IOException; -import java.util.Collections; -import java.util.Objects; -import java.util.stream.Stream; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -/** - * Test handling of record-based FlowFile, given various NiFi and Kafka configurations. Kafka transactionality allows - * records in a FlowFile to succeed or fail as a unit. Non-transactional sends might allow leakage of well-formatted - * records at the beginning of a FlowFile with malformed content. - */ -public class PublishKafkaRecordPartialHandlingIT extends AbstractPublishKafkaIT { - private static final String TEST_RESOURCE = "org/apache/nifi/kafka/processors/publish/ff.partial.json"; - - private static Stream publishArguments() { - return Stream.of( - Arguments.of("no-transactions-route-failure", Boolean.FALSE, FailureStrategy.ROUTE_TO_FAILURE, 0, 1, Boolean.FALSE, 1), - Arguments.of("transactions-route-failure", Boolean.TRUE, FailureStrategy.ROUTE_TO_FAILURE, 0, 1, Boolean.FALSE, 0), - Arguments.of("no-transactions-rollback", Boolean.FALSE, FailureStrategy.ROLLBACK, 0, 0, Boolean.TRUE, 1), // [1] - Arguments.of("transactions-rollback", Boolean.TRUE, FailureStrategy.ROLLBACK, 0, 0, Boolean.TRUE, 0) - ); - } - - // [1] the Kafka client library requires "transactional" mode to be enabled in order to fail all records/KafkaMessages in a given FlowFile - - @ParameterizedTest - @MethodSource("publishArguments") - public void test1ProduceOneFlowFile(final String label, - final Boolean useTransactions, - final FailureStrategy failureStrategy, - final Integer expectTransferSuccess, - final Integer expectTransferFailure, - final Boolean expectYield, - final Integer expectedKafkaMessageCount) throws InitializationException, IOException { - final TestRunner runner = TestRunners.newTestRunner(PublishKafka.class); - runner.setValidateExpressionUsage(false); - runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner)); - addRecordReaderService(runner); - addRecordWriterService(runner); - - final String topicName = getClass().getName() + "." + label; - runner.setProperty(PublishKafka.TOPIC_NAME, topicName); - runner.setProperty(PublishKafka.TRANSACTIONS_ENABLED, useTransactions.toString()); - runner.setProperty(PublishKafka.FAILURE_STRATEGY, failureStrategy); - - final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull( - getClass().getClassLoader().getResource(TEST_RESOURCE))); - runner.enqueue(bytesFlowFile); - - runner.run(1); - runner.assertTransferCount(PublishKafka.REL_SUCCESS, expectTransferSuccess); - runner.assertTransferCount(PublishKafka.REL_FAILURE, expectTransferFailure); - assertEquals(expectYield, runner.isYieldCalled()); - - checkKafkaState(topicName, expectedKafkaMessageCount); - } - - public void checkKafkaState(final String topicName, final int expectedKafkaMessageCount) throws JsonProcessingException { - try (KafkaConsumer consumer = new KafkaConsumer<>(getKafkaConsumerProperties())) { - consumer.subscribe(Collections.singletonList(topicName)); - final ConsumerRecords records = consumer.poll(DURATION_POLL); - assertEquals(expectedKafkaMessageCount, records.count()); - for (ConsumerRecord record : records) { - assertNotNull(objectMapper.readTree(record.value())); - } - } - } -} diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/processors/publish/ff.partial.json b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/processors/publish/ff.partial.json deleted file mode 100644 index 94f8120a6b0c..000000000000 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/processors/publish/ff.partial.json +++ /dev/null @@ -1,8 +0,0 @@ -[ - { - "a": "A", - "b": "B" - }, - true, - -2106859769 -]