From 333663190b5549adc74c912efbf2c334e04653ee Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 21 Aug 2024 10:29:31 -0400 Subject: [PATCH] Fix ConcurrentMessageListenerContainerTests for race condition We cannot use the same `topic1` for different tests since consumers from other tests may consume data for us --- ...ncurrentMessageListenerContainerTests.java | 46 ++++++++++--------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index fbfb6c4e7c..b197095181 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -82,14 +82,14 @@ * @author Soby Chacko * @author Lokesh Alamuri */ -@EmbeddedKafka(topics = { ConcurrentMessageListenerContainerTests.topic1, +@EmbeddedKafka(topics = {ConcurrentMessageListenerContainerTests.topic1, ConcurrentMessageListenerContainerTests.topic2, ConcurrentMessageListenerContainerTests.topic4, ConcurrentMessageListenerContainerTests.topic5, ConcurrentMessageListenerContainerTests.topic6, ConcurrentMessageListenerContainerTests.topic7, ConcurrentMessageListenerContainerTests.topic8, ConcurrentMessageListenerContainerTests.topic9, ConcurrentMessageListenerContainerTests.topic10, ConcurrentMessageListenerContainerTests.topic11, - ConcurrentMessageListenerContainerTests.topic12 }, - brokerProperties = "group.initial.rebalance.delay.ms:500") + ConcurrentMessageListenerContainerTests.topic12, ConcurrentMessageListenerContainerTests.topic13}, + brokerProperties = "group.initial.rebalance.delay.ms:500") public class ConcurrentMessageListenerContainerTests { private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass())); @@ -116,6 +116,8 @@ public class ConcurrentMessageListenerContainerTests { public static final String topic12 = "testTopic12"; + public static final String topic13 = "testTopic13"; + private static EmbeddedKafkaBroker embeddedKafka; @BeforeAll @@ -209,7 +211,8 @@ protected Consumer createKafkaConsumer(String groupId, String c Set> children = new HashSet<>(containers); assertThat(container.isInExpectedState()).isTrue(); MessageListenerContainer childContainer = container.getContainers().get(0); - container.getContainers().get(0).stopAbnormally(() -> { }); + container.getContainers().get(0).stopAbnormally(() -> { + }); assertThat(container.isInExpectedState()).isFalse(); container.getContainers().get(0).start(); container.stop(); @@ -253,7 +256,7 @@ public void testAutoCommitWithRebalanceListener() throws Exception { @Override protected Consumer createKafkaConsumer(@Nullable String groupId, @Nullable String clientIdPrefix, - @Nullable String clientIdSuffixArg, @Nullable Properties properties) { + @Nullable String clientIdSuffixArg, @Nullable Properties properties) { overrides.set(properties); Consumer created = super.createKafkaConsumer(groupId, clientIdPrefix, @@ -591,18 +594,19 @@ public void testConcurrencyWithPartitions() { Consumer consumer = mock(Consumer.class); given(cf.createConsumer(anyString(), anyString(), anyString(), any())).willReturn(consumer); given(consumer.poll(any(Duration.class))) - .willAnswer(new Answer>() { + .willAnswer(new Answer>() { - @Override - public ConsumerRecords answer(InvocationOnMock invocation) throws Throwable { - Thread.sleep(100); - return null; - } + @Override + public ConsumerRecords answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(100); + return null; + } - }); + }); ContainerProperties containerProps = new ContainerProperties(topic1PartitionS); containerProps.setGroupId("grp"); - containerProps.setMessageListener((MessageListener) message -> { }); + containerProps.setMessageListener((MessageListener) message -> { + }); containerProps.setMissingTopicsFatal(false); ConcurrentMessageListenerContainer container = @@ -751,7 +755,7 @@ public boolean seeksAfterHandling() { } @Test - public void testAckOnErrorManualImmediate() throws Exception { + public void testAckOnErrorManualImmediate() throws Exception { //ackOnError should not affect manual commits testAckOnErrorWithManualImmediateGuts(topic10, true); testAckOnErrorWithManualImmediateGuts(topic11, false); @@ -821,16 +825,15 @@ public void testIsChildRunning() throws Exception { this.logger.info("Start isChildRunning"); Map props = KafkaTestUtils.consumerProps("test1", "true", embeddedKafka); - AtomicReference overrides = new AtomicReference<>(); - DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props) { + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props) { + @Override protected Consumer createKafkaConsumer(String groupId, String clientIdPrefix, - String clientIdSuffixArg, Properties properties) { - overrides.set(properties); + String clientIdSuffixArg, Properties properties) { return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties); } }; - ContainerProperties containerProps = new ContainerProperties(topic1); + ContainerProperties containerProps = new ContainerProperties(topic13); containerProps.setLogContainerConfig(true); containerProps.setClientId("client"); containerProps.setAckMode(ContainerProperties.AckMode.RECORD); @@ -901,7 +904,7 @@ protected Consumer createKafkaConsumer(String groupId, String c Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); - template.setDefaultTopic(topic1); + template.setDefaultTopic(topic13); template.sendDefault(0, 0, "foo"); template.sendDefault(1, 2, "bar"); template.sendDefault(0, 0, "baz"); @@ -972,9 +975,10 @@ public void testContainerStartStop() throws Exception { embeddedKafka); AtomicReference overrides = new AtomicReference<>(); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(props) { + @Override protected Consumer createKafkaConsumer(String groupId, String clientIdPrefix, - String clientIdSuffixArg, Properties properties) { + String clientIdSuffixArg, Properties properties) { overrides.set(properties); return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties); }