Skip to content

Commit

Permalink
Fix ConcurrentMessageListenerContainerTests for race condition
Browse files Browse the repository at this point in the history
We cannot use the same `topic1` for different tests since
consumers from other tests may consume data for us
  • Loading branch information
artembilan committed Aug 21, 2024
1 parent e234529 commit 3336631
Showing 1 changed file with 25 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@
* @author Soby Chacko
* @author Lokesh Alamuri
*/
@EmbeddedKafka(topics = { ConcurrentMessageListenerContainerTests.topic1,
@EmbeddedKafka(topics = {ConcurrentMessageListenerContainerTests.topic1,
ConcurrentMessageListenerContainerTests.topic2,
ConcurrentMessageListenerContainerTests.topic4, ConcurrentMessageListenerContainerTests.topic5,
ConcurrentMessageListenerContainerTests.topic6, ConcurrentMessageListenerContainerTests.topic7,
ConcurrentMessageListenerContainerTests.topic8, ConcurrentMessageListenerContainerTests.topic9,
ConcurrentMessageListenerContainerTests.topic10, ConcurrentMessageListenerContainerTests.topic11,
ConcurrentMessageListenerContainerTests.topic12 },
brokerProperties = "group.initial.rebalance.delay.ms:500")
ConcurrentMessageListenerContainerTests.topic12, ConcurrentMessageListenerContainerTests.topic13},
brokerProperties = "group.initial.rebalance.delay.ms:500")
public class ConcurrentMessageListenerContainerTests {

private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
Expand All @@ -116,6 +116,8 @@ public class ConcurrentMessageListenerContainerTests {

public static final String topic12 = "testTopic12";

public static final String topic13 = "testTopic13";

private static EmbeddedKafkaBroker embeddedKafka;

@BeforeAll
Expand Down Expand Up @@ -209,7 +211,8 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
Set<KafkaMessageListenerContainer<Integer, String>> children = new HashSet<>(containers);
assertThat(container.isInExpectedState()).isTrue();
MessageListenerContainer childContainer = container.getContainers().get(0);
container.getContainers().get(0).stopAbnormally(() -> { });
container.getContainers().get(0).stopAbnormally(() -> {
});
assertThat(container.isInExpectedState()).isFalse();
container.getContainers().get(0).start();
container.stop();
Expand Down Expand Up @@ -253,7 +256,7 @@ public void testAutoCommitWithRebalanceListener() throws Exception {

@Override
protected Consumer<Integer, String> createKafkaConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
@Nullable String clientIdSuffixArg, @Nullable Properties properties) {
@Nullable String clientIdSuffixArg, @Nullable Properties properties) {

overrides.set(properties);
Consumer<Integer, String> created = super.createKafkaConsumer(groupId, clientIdPrefix,
Expand Down Expand Up @@ -591,18 +594,19 @@ public void testConcurrencyWithPartitions() {
Consumer<Integer, String> consumer = mock(Consumer.class);
given(cf.createConsumer(anyString(), anyString(), anyString(), any())).willReturn(consumer);
given(consumer.poll(any(Duration.class)))
.willAnswer(new Answer<ConsumerRecords<Integer, String>>() {
.willAnswer(new Answer<ConsumerRecords<Integer, String>>() {

@Override
public ConsumerRecords<Integer, String> answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(100);
return null;
}
@Override
public ConsumerRecords<Integer, String> answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(100);
return null;
}

});
});
ContainerProperties containerProps = new ContainerProperties(topic1PartitionS);
containerProps.setGroupId("grp");
containerProps.setMessageListener((MessageListener<Integer, String>) message -> { });
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
});
containerProps.setMissingTopicsFatal(false);

ConcurrentMessageListenerContainer<Integer, String> container =
Expand Down Expand Up @@ -751,7 +755,7 @@ public boolean seeksAfterHandling() {
}

@Test
public void testAckOnErrorManualImmediate() throws Exception {
public void testAckOnErrorManualImmediate() throws Exception {
//ackOnError should not affect manual commits
testAckOnErrorWithManualImmediateGuts(topic10, true);
testAckOnErrorWithManualImmediateGuts(topic11, false);
Expand Down Expand Up @@ -821,16 +825,15 @@ public void testIsChildRunning() throws Exception {
this.logger.info("Start isChildRunning");
Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "true",
embeddedKafka);
AtomicReference<Properties> overrides = new AtomicReference<>();
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props) {
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props) {

@Override
protected Consumer<Integer, String> createKafkaConsumer(String groupId, String clientIdPrefix,
String clientIdSuffixArg, Properties properties) {
overrides.set(properties);
String clientIdSuffixArg, Properties properties) {
return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties);
}
};
ContainerProperties containerProps = new ContainerProperties(topic1);
ContainerProperties containerProps = new ContainerProperties(topic13);
containerProps.setLogContainerConfig(true);
containerProps.setClientId("client");
containerProps.setAckMode(ContainerProperties.AckMode.RECORD);
Expand Down Expand Up @@ -901,7 +904,7 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic1);
template.setDefaultTopic(topic13);
template.sendDefault(0, 0, "foo");
template.sendDefault(1, 2, "bar");
template.sendDefault(0, 0, "baz");
Expand Down Expand Up @@ -972,9 +975,10 @@ public void testContainerStartStop() throws Exception {
embeddedKafka);
AtomicReference<Properties> overrides = new AtomicReference<>();
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props) {

@Override
protected Consumer<Integer, String> createKafkaConsumer(String groupId, String clientIdPrefix,
String clientIdSuffixArg, Properties properties) {
String clientIdSuffixArg, Properties properties) {
overrides.set(properties);
return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties);
}
Expand Down

0 comments on commit 3336631

Please sign in to comment.