From 3769c240bbfe57b24a167da08585b35955e9d9c7 Mon Sep 17 00:00:00 2001 From: dltmd202 <75921696+Dltmd202@users.noreply.github.com> Date: Thu, 31 Oct 2024 05:31:39 +0900 Subject: [PATCH] GH-3557: Adjust the replication factor for transactions topic on @EmbeddedKafka Fixes: #3557 https://github.com/spring-projects/spring-kafka/issues/3557 * Adjust the replication factor for the transaction state topic on `EmbeddedKafka` based on the broker count in `EmbeddedKafka`. * Keep the default replication factor of 3. * Adding tests to verify --- .../EmbeddedKafkaContextCustomizer.java | 7 +++++- .../EmbeddedKafkaContextCustomizerTests.java | 24 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java index 8a20abd414..7a4d82a57a 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2023 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,6 +47,7 @@ * @author Oleg Artyomov * @author Sergio Lourenco * @author Pawel Lozinski + * @author Seonghwan Lee * * @since 1.3 */ @@ -54,6 +55,8 @@ class EmbeddedKafkaContextCustomizer implements ContextCustomizer { private final EmbeddedKafka embeddedKafka; + private final String TRANSACTION_STATE_LOG_REPLICATION_FACTOR = "transaction.state.log.replication.factor"; + EmbeddedKafkaContextCustomizer(EmbeddedKafka embeddedKafka) { this.embeddedKafka = embeddedKafka; } @@ -121,6 +124,8 @@ public void customizeContext(ConfigurableApplicationContext context, MergedConte } } + properties.putIfAbsent(TRANSACTION_STATE_LOG_REPLICATION_FACTOR, String.valueOf(Math.min(3, embeddedKafka.count()))); + embeddedKafkaBroker.brokerProperties((Map) (Map) properties); if (StringUtils.hasText(this.embeddedKafka.bootstrapServersProperty())) { embeddedKafkaBroker.brokerListProperty(this.embeddedKafka.bootstrapServersProperty()); diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java index 6147490df6..20deb9cb8b 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java @@ -16,6 +16,8 @@ package org.springframework.kafka.test.context; +import java.util.Map; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -25,6 +27,7 @@ import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.utils.KafkaTestUtils; + import static org.assertj.core.api.Assertions.assertThat; /** @@ -32,6 +35,7 @@ * @author Sergio Lourenco * @author Artem Bilan * @author Gary Russell + * @author Seonghwan Lee * * @since 1.3 */ @@ -91,6 +95,21 @@ void testMulti() { .matches("127.0.0.1:[0-9]+,127.0.0.1:[0-9]+"); } + @Test + void testTransactionReplicationFactor() { + EmbeddedKafka annotationWithPorts = + AnnotationUtils.findAnnotation(TestWithEmbeddedKafkaTransactionFactor.class, EmbeddedKafka.class); + EmbeddedKafkaContextCustomizer customizer = new EmbeddedKafkaContextCustomizer(annotationWithPorts); + ConfigurableApplicationContext context = new GenericApplicationContext(); + customizer.customizeContext(context, null); + context.refresh(); + + EmbeddedKafkaBroker embeddedKafkaBroker = context.getBean(EmbeddedKafkaBroker.class); + Map properties = (Map) KafkaTestUtils.getPropertyValue(embeddedKafkaBroker, "brokerProperties"); + + assertThat(properties.get("transaction.state.log.replication.factor")).isEqualTo("2"); + } + @EmbeddedKafka(kraft = false) private static final class TestWithEmbeddedKafka { @@ -111,4 +130,9 @@ private static final class TestWithEmbeddedKafkaMulti { } + @EmbeddedKafka(kraft = false, count = 2) + private static final class TestWithEmbeddedKafkaTransactionFactor { + + } + }