diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java index 8a20abd414..7a4d82a57a 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2023 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,6 +47,7 @@ * @author Oleg Artyomov * @author Sergio Lourenco * @author Pawel Lozinski + * @author Seonghwan Lee * * @since 1.3 */ @@ -54,6 +55,8 @@ class EmbeddedKafkaContextCustomizer implements ContextCustomizer { private final EmbeddedKafka embeddedKafka; + private final String TRANSACTION_STATE_LOG_REPLICATION_FACTOR = "transaction.state.log.replication.factor"; + EmbeddedKafkaContextCustomizer(EmbeddedKafka embeddedKafka) { this.embeddedKafka = embeddedKafka; } @@ -121,6 +124,8 @@ public void customizeContext(ConfigurableApplicationContext context, MergedConte } } + properties.putIfAbsent(TRANSACTION_STATE_LOG_REPLICATION_FACTOR, String.valueOf(Math.min(3, embeddedKafka.count()))); + embeddedKafkaBroker.brokerProperties((Map) (Map) properties); if (StringUtils.hasText(this.embeddedKafka.bootstrapServersProperty())) { embeddedKafkaBroker.brokerListProperty(this.embeddedKafka.bootstrapServersProperty()); diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java index 6147490df6..20deb9cb8b 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java @@ -16,6 +16,8 @@ package org.springframework.kafka.test.context; +import java.util.Map; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -25,6 +27,7 @@ import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.utils.KafkaTestUtils; + import static org.assertj.core.api.Assertions.assertThat; /** @@ -32,6 +35,7 @@ * @author Sergio Lourenco * @author Artem Bilan * @author Gary Russell + * @author Seonghwan Lee * * @since 1.3 */ @@ -91,6 +95,21 @@ void testMulti() { .matches("127.0.0.1:[0-9]+,127.0.0.1:[0-9]+"); } + @Test + void testTransactionReplicationFactor() { + EmbeddedKafka annotationWithPorts = + AnnotationUtils.findAnnotation(TestWithEmbeddedKafkaTransactionFactor.class, EmbeddedKafka.class); + EmbeddedKafkaContextCustomizer customizer = new EmbeddedKafkaContextCustomizer(annotationWithPorts); + ConfigurableApplicationContext context = new GenericApplicationContext(); + customizer.customizeContext(context, null); + context.refresh(); + + EmbeddedKafkaBroker embeddedKafkaBroker = context.getBean(EmbeddedKafkaBroker.class); + Map properties = (Map) KafkaTestUtils.getPropertyValue(embeddedKafkaBroker, "brokerProperties"); + + assertThat(properties.get("transaction.state.log.replication.factor")).isEqualTo("2"); + } + @EmbeddedKafka(kraft = false) private static final class TestWithEmbeddedKafka { @@ -111,4 +130,9 @@ private static final class TestWithEmbeddedKafkaMulti { } + @EmbeddedKafka(kraft = false, count = 2) + private static final class TestWithEmbeddedKafkaTransactionFactor { + + } + }