GH-2650: Observability enhancements in reactive Kafka binder
Fixes #2650

* Enable native observability support for the output binding in the reactive Kafka binder
* Add a test verifying this support with downstream consumers
* Add reference docs
sobychacko committed Sep 13, 2024
1 parent 4df2e76 commit 4ae53f3
Showing 10 changed files with 346 additions and 35 deletions.
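
With this commit, an application can turn on producer-side observability for the reactive Kafka binder declaratively. A minimal sketch of the intended usage (hypothetical application class; the enable-observation property and the words1 binding name come from the test in this commit, and the ObservationRegistry is assumed to be supplied by Spring Boot's observability auto-configuration):

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ObservedProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ObservedProducerApplication.class,
                // Property checked by the new code in ReactorKafkaBinder
                "--spring.cloud.stream.kafka.binder.enable-observation=true");
    }

    @Bean
    ApplicationRunner runner(StreamBridge streamBridge) {
        // With observation enabled, each outgoing record is traced, so a
        // span started upstream continues into the Kafka producer span
        return args -> streamBridge.send("words1", "hello");
    }
}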
@@ -84,6 +84,43 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>context-propagation</artifactId>
</dependency>

<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-integration-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>io.opentelemetry</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.wavefront</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-otel</artifactId>
</exclusion>
<exclusion>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-reporter-wavefront</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

</project>
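
A note on the new dependencies above: io.micrometer:context-propagation is the library behind Reactor's contextCapture() operator (used in ReactorKafkaBinder below), while the micrometer-tracing artifacts are test-scoped, with exclusions trimming the tracing setup down to the Brave bridge only.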
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

import io.micrometer.observation.ObservationRegistry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -84,6 +85,7 @@
* @author Gary Russell
* @author Byungjun You
* @author Omer Celik
* @author Soby Chacko
* @since 4.0
*
*/
@@ -111,11 +113,14 @@ public class ReactorKafkaBinder

private final Map<String, MessageProducerSupport> messageProducers = new ConcurrentHashMap<>();

private final ObservationRegistry observationRegistry;

public ReactorKafkaBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioner) {
KafkaTopicProvisioner provisioner, @Nullable ObservationRegistry observationRegistry) {

super(new String[0], provisioner, null, null);
this.configurationProperties = configurationProperties;
this.observationRegistry = observationRegistry;
}

public void setConsumerConfigCustomizer(ConsumerConfigCustomizer consumerConfigCustomizer) {
@@ -194,6 +199,9 @@ protected MessageHandler createProducerMessageHandler(ProducerDestination destin

SenderOptions<Object, Object> opts = this.senderOptionsCustomizer.apply(producerProperties.getBindingName(),
SenderOptions.create(configs));
if (this.configurationProperties.isEnableObservation() && this.observationRegistry != null) {
opts = opts.withObservation(this.observationRegistry);
}
// TODO bean for converter; MCB doesn't use one on the producer side.
RecordMessageConverter converter = new MessagingMessageConverter();
AbstractApplicationContext applicationContext = getApplicationContext();
@@ -405,7 +413,7 @@ protected void handleMessageInternal(Message<?> message) {
@SuppressWarnings("unchecked")
SenderRecord<Object, Object, Object> sr = SenderRecord.create(
(ProducerRecord<Object, Object>) converter.fromMessage(message, topic), correlation);
Flux<SenderResult<Object>> result = sender.send(Flux.just(sr));
Flux<SenderResult<Object>> result = sender.send(Flux.just(sr)).contextCapture();
result.subscribe(res -> {
if (this.results != null) {
this.results.send(MessageBuilder.withPayload(res)
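Taken together, the two producer-side changes above (withObservation(...) on the SenderOptions and contextCapture() on the send) boil down to the following plain reactor-kafka pattern. A self-contained sketch under assumed settings (topic name and bootstrap server are hypothetical; this is not code from this commit):

import java.util.HashMap;
import java.util.Map;

import io.micrometer.observation.ObservationRegistry;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

public class ObservedSenderSketch {

    public static void main(String[] args) {
        ObservationRegistry registry = ObservationRegistry.create();

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Same call the binder now makes when enable-observation is set
        // and a unique ObservationRegistry bean is present
        SenderOptions<String, String> options =
                SenderOptions.<String, String>create(props).withObservation(registry);

        KafkaSender<String, String> sender = KafkaSender.create(options);

        SenderRecord<String, String, Integer> record = SenderRecord.create(
                new ProducerRecord<>("demo-topic", "key", "value"), 1);

        // contextCapture() lifts an Observation found in thread-locals into
        // the Reactor context, so the send span joins the caller's trace
        sender.send(Flux.just(record))
                .contextCapture()
                .doFinally(signal -> sender.close())
                .subscribe();
    }
}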
@@ -16,6 +16,8 @@

package org.springframework.cloud.stream.binder.reactorkafka;

import io.micrometer.observation.ObservationRegistry;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
@@ -37,6 +39,7 @@
*
* @author Gary Russell
* @author Chris Bono
* @author Soby Chacko
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(Binder.class)
@@ -73,13 +76,15 @@ KafkaTopicProvisioner provisioningProvider(

@Bean
ReactorKafkaBinder reactorKafkaBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider,
KafkaExtendedBindingProperties extendedBindingProperties,
ObjectProvider<ConsumerConfigCustomizer> consumerConfigCustomizer,
ObjectProvider<ProducerConfigCustomizer> producerConfigCustomizer,
ObjectProvider<ReceiverOptionsCustomizer> receiverOptionsCustomizers,
ObjectProvider<SenderOptionsCustomizer> senderOptionsCustomizers) {
ReactorKafkaBinder reactorKafkaBinder = new ReactorKafkaBinder(configurationProperties, provisioningProvider);
KafkaTopicProvisioner provisioningProvider,
KafkaExtendedBindingProperties extendedBindingProperties,
ObjectProvider<ConsumerConfigCustomizer> consumerConfigCustomizer,
ObjectProvider<ProducerConfigCustomizer> producerConfigCustomizer,
ObjectProvider<ReceiverOptionsCustomizer> receiverOptionsCustomizers,
ObjectProvider<SenderOptionsCustomizer> senderOptionsCustomizers,
ObjectProvider<ObservationRegistry> observationRegistryObjectProvider) {
ReactorKafkaBinder reactorKafkaBinder = new ReactorKafkaBinder(configurationProperties, provisioningProvider,
observationRegistryObjectProvider.getIfUnique());
reactorKafkaBinder.setExtendedBindingProperties(extendedBindingProperties);
reactorKafkaBinder.setConsumerConfigCustomizer(consumerConfigCustomizer.getIfUnique());
reactorKafkaBinder.setProducerConfigCustomizer(producerConfigCustomizer.getIfUnique());
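The registry is resolved with getIfUnique(), so observation is wired only when exactly one ObservationRegistry bean exists; Boot's observability auto-configuration normally provides it (the test in this commit uses @AutoConfigureObservability). A minimal sketch of supplying one by hand in a context without that auto-configuration (hypothetical configuration class):

import io.micrometer.observation.ObservationRegistry;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
class ManualObservationConfig {

    // Resolved by the ObjectProvider in the binder configuration above; if
    // zero or multiple candidates exist, getIfUnique() returns null and the
    // binder simply skips observation
    @Bean
    ObservationRegistry observationRegistry() {
        return ObservationRegistry.create();
    }
}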
@@ -121,11 +121,11 @@ void endToEndReactorKafkaBinder(boolean excludeKafkaAutoConfig) throws Interrupt

StreamBridge streamBridge = context.getBean(StreamBridge.class);
streamBridge.send("words1", MessageBuilder.withPayload("foobar")
.setCorrelationId(42)
.build());
streamBridge.send("words2", MessageBuilder.withPayload("BAZQUX")
.setCorrelationId(43)
.build());

assertThat(KafkaTestUtils.getSingleRecord(consumer1, "uppercased-words"))
.isNotNull()
@@ -173,7 +173,7 @@ RecordMessageConverter fullRR()

@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type payloadType) {

return MessageBuilder.withPayload(record).build();
}
@@ -199,7 +199,7 @@ Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase()
@Bean
java.util.function.Consumer<Flux<String>> patternConsumer() {
return f -> f.doOnNext(s -> patternedDeliveries.add(s))
.subscribe();
}

@Bean
@@ -0,0 +1,160 @@
/*
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder.reactorkafka;

import java.lang.reflect.Type;
import java.time.Duration;
import java.util.function.Function;
import java.util.stream.Collectors;

import brave.handler.SpanHandler;
import brave.test.TestSpanHandler;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import io.micrometer.tracing.brave.bridge.BraveFinishedSpan;
import io.micrometer.tracing.test.simple.SpansAssert;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.receiver.observation.KafkaReceiverObservation;
import reactor.kafka.receiver.observation.KafkaRecordReceiverContext;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.autoconfigure.actuate.observability.AutoConfigureObservability;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.messaging.Message;
import org.springframework.test.annotation.DirtiesContext;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

/**
* @author Artem Bilan
* @author Soby Chacko
* @since 4.1.1
*/
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
"spring.kafka.consumer.metadata.max.age.ms=1000",
"spring.cloud.function.definition=receive",
"spring.cloud.stream.function.reactive.uppercase=true",
"spring.cloud.stream.bindings.receive-in-0.group=grp1",
"spring.cloud.stream.bindings.receive-in-0.destination=words1",
"spring.cloud.stream.bindings.receive-out-0.destination=foobar",
"spring.cloud.stream.kafka.binder.enable-observation=true",
"spring.cloud.stream.kafka.binder.brokers=${spring.kafka.bootstrap-servers}",
"management.tracing.sampling.probability=1",
"spring.cloud.stream.kafka.bindings.receive-in-0.consumer.converterBeanName=fullRR"
})
@DirtiesContext
@AutoConfigureObservability
@EmbeddedKafka(topics = { "foobar" })
public class ReactorKafkaBinderObservationTests {

private static final TestSpanHandler SPANS = new TestSpanHandler();

@Autowired
StreamBridge streamBridge;

@Autowired
ObservationRegistry observationRegistry;

@Autowired
TestConfiguration testConfiguration;

@Autowired
private EmbeddedKafkaBroker embeddedKafka;

@Test
void endToEndReactorKafkaBinder1() {

streamBridge.send("words1", MessageBuilder.withPayload("data")
.build());

await().timeout(Duration.ofSeconds(10)).untilAsserted(() -> assertThat(SPANS.spans()).hasSize(3));
SpansAssert.assertThat(SPANS.spans().stream().map(BraveFinishedSpan::fromBrave).collect(Collectors.toList()))
.haveSameTraceId();
}

@SpringBootConfiguration
@EnableAutoConfiguration(exclude = org.springframework.cloud.function.observability.ObservationAutoConfiguration.class)
public static class TestConfiguration {

@Bean
SpanHandler testSpanHandler() {
return SPANS;
}

@Bean
RecordMessageConverter fullRR() {
return new RecordMessageConverter() {

private final RecordMessageConverter converter = new MessagingMessageConverter();

@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, Type payloadType) {

return MessageBuilder.withPayload(record).build();
}

@Override
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
return this.converter.fromMessage(message, defaultTopic);
}

};
}

@Bean
Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {
return s -> s
.flatMap(record -> {
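// Manually start a receiver observation per record: this commit adds native
// observation only for the output (producer) binding, so the consumer side
// of the trace is created explicitly here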
Observation receiverObservation =
KafkaReceiverObservation.RECEIVER_OBSERVATION.start(null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() ->
new KafkaRecordReceiverContext(
record, "user.receiver", "localhost:9092"),
observationRegistry);

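// Expose the observation through the Reactor context (and as a message
// header) so the send on the output binding continues the same trace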
return Mono.deferContextual(contextView -> Mono.just(record)
.map(rec -> new String(rec.value()).toLowerCase())
.map(rec -> MessageBuilder.withPayload(rec).setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView).build()))
.doOnTerminate(receiverObservation::stop)
.doOnError(receiverObservation::error)
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
});
}
}

}
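
Note the asymmetry this test exercises: the output (producer) side is observed natively by the binder once enable-observation is set, while the receiver side builds its observation by hand and propagates it through the Reactor context. The three spans asserted above (the initial StreamBridge send, the manual receiver observation, and the observed output send) therefore end up with the same trace ID.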

@@ -83,7 +83,7 @@ void consumerBinding() throws Exception {
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderProps, kafkaProperties, prop -> {
});
provisioner.setMetadataRetryOperations(new RetryTemplate());
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner);
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner, null);
binder.setApplicationContext(mock(GenericApplicationContext.class));

CountDownLatch latch = new CountDownLatch(2);
@@ -148,7 +148,7 @@ void concurrency(String topic, String group, boolean atMostOnce) throws Exceptio
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderProps, kafkaProperties, prop -> {
});
provisioner.setMetadataRetryOperations(new RetryTemplate());
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner);
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner, null);
binder.setApplicationContext(mock(GenericApplicationContext.class));

CountDownLatch subscriptionLatch = new CountDownLatch(1);
@@ -229,7 +229,7 @@ void autoCommit() throws Exception {
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderProps, kafkaProperties, prop -> {
});
provisioner.setMetadataRetryOperations(new RetryTemplate());
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner);
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner, null);
binder.setApplicationContext(mock(GenericApplicationContext.class));

CountDownLatch subscriptionLatch = new CountDownLatch(1);
@@ -298,7 +298,7 @@ void producerBinding() throws InterruptedException {
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderProps, kafkaProperties, prop -> {
});
provisioner.setMetadataRetryOperations(new RetryTemplate());
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner);
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner, null);
CountDownLatch latch = new CountDownLatch(1);
GenericApplicationContext context = new GenericApplicationContext();
context.registerBean("sendResults", FluxMessageChannel.class);