Skip to content

Commit

Permalink
GH-3557: Adjust the replication factor for transactions topic on @emb…
Browse files Browse the repository at this point in the history
…eddedkafka

Fixes: #3557 

#3557

 * Adjust the replication factor for the transaction state topic on `EmbeddedKafka` based on the broker count in 
  `EmbeddedKafka`.
* Keep the default replication factor of 3. 
* Adding tests to verify
  • Loading branch information
Dltmd202 authored Oct 30, 2024
1 parent 202a6e5 commit 3769c24
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,13 +47,16 @@
* @author Oleg Artyomov
* @author Sergio Lourenco
* @author Pawel Lozinski
* @author Seonghwan Lee
*
* @since 1.3
*/
class EmbeddedKafkaContextCustomizer implements ContextCustomizer {

private final EmbeddedKafka embeddedKafka;

private final String TRANSACTION_STATE_LOG_REPLICATION_FACTOR = "transaction.state.log.replication.factor";

EmbeddedKafkaContextCustomizer(EmbeddedKafka embeddedKafka) {
this.embeddedKafka = embeddedKafka;
}
Expand Down Expand Up @@ -121,6 +124,8 @@ public void customizeContext(ConfigurableApplicationContext context, MergedConte
}
}

properties.putIfAbsent(TRANSACTION_STATE_LOG_REPLICATION_FACTOR, String.valueOf(Math.min(3, embeddedKafka.count())));

embeddedKafkaBroker.brokerProperties((Map<String, String>) (Map<?, ?>) properties);
if (StringUtils.hasText(this.embeddedKafka.bootstrapServersProperty())) {
embeddedKafkaBroker.brokerListProperty(this.embeddedKafka.bootstrapServersProperty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.kafka.test.context;

import java.util.Map;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -25,13 +27,15 @@
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;


import static org.assertj.core.api.Assertions.assertThat;

/**
* @author Oleg Artyomov
* @author Sergio Lourenco
* @author Artem Bilan
* @author Gary Russell
* @author Seonghwan Lee
*
* @since 1.3
*/
Expand Down Expand Up @@ -91,6 +95,21 @@ void testMulti() {
.matches("127.0.0.1:[0-9]+,127.0.0.1:[0-9]+");
}

@Test
void testTransactionReplicationFactor() {
EmbeddedKafka annotationWithPorts =
AnnotationUtils.findAnnotation(TestWithEmbeddedKafkaTransactionFactor.class, EmbeddedKafka.class);
EmbeddedKafkaContextCustomizer customizer = new EmbeddedKafkaContextCustomizer(annotationWithPorts);
ConfigurableApplicationContext context = new GenericApplicationContext();
customizer.customizeContext(context, null);
context.refresh();

EmbeddedKafkaBroker embeddedKafkaBroker = context.getBean(EmbeddedKafkaBroker.class);
Map<String, Object> properties = (Map<String, Object>) KafkaTestUtils.getPropertyValue(embeddedKafkaBroker, "brokerProperties");

assertThat(properties.get("transaction.state.log.replication.factor")).isEqualTo("2");
}

@EmbeddedKafka(kraft = false)
private static final class TestWithEmbeddedKafka {

Expand All @@ -111,4 +130,9 @@ private static final class TestWithEmbeddedKafkaMulti {

}

@EmbeddedKafka(kraft = false, count = 2)
private static final class TestWithEmbeddedKafkaTransactionFactor {

}

}

0 comments on commit 3769c24

Please sign in to comment.