Commit: Addressing PR review
sobychacko committed Sep 17, 2024
1 parent 795a913 commit b8e6102
Showing 5 changed files with 77 additions and 70 deletions.
Original file line number Diff line number Diff line change
@@ -87,13 +87,9 @@
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>context-propagation</artifactId>
<scope>optional</scope>
</dependency>

<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-integration-test</artifactId>
@@ -121,11 +121,11 @@ void endToEndReactorKafkaBinder(boolean excludeKafkaAutoConfig) throws Interrupt

StreamBridge streamBridge = context.getBean(StreamBridge.class);
streamBridge.send("words1", MessageBuilder.withPayload("foobar")
.setCorrelationId(42)
.build());
.setCorrelationId(42)
.build());
streamBridge.send("words2", MessageBuilder.withPayload("BAZQUX")
.setCorrelationId(43)
.build());
.setCorrelationId(43)
.build());

assertThat(KafkaTestUtils.getSingleRecord(consumer1, "uppercased-words"))
.isNotNull()
@@ -173,7 +173,7 @@ RecordMessageConverter fullRR() {

@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type payloadType) {
Consumer<?, ?> consumer, Type payloadType) {

return MessageBuilder.withPayload(record).build();
}
@@ -199,7 +199,7 @@ Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
@Bean
java.util.function.Consumer<Flux<String>> patternConsumer() {
return f -> f.doOnNext(s -> patternedDeliveries.add(s))
.subscribe();
.subscribe();
}

@Bean
@@ -60,23 +60,23 @@
/**
* @author Artem Bilan
* @author Soby Chacko
* @since 4.1.1
* @since 4.2.0
*/
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
"spring.kafka.consumer.metadata.max.age.ms=1000",
"spring.cloud.function.definition=receive",
"spring.cloud.stream.function.reactive.uppercase=true",
"spring.cloud.stream.bindings.receive-in-0.group=grp1",
"spring.cloud.stream.bindings.receive-in-0.destination=words1",
"spring.cloud.stream.bindings.receive-out-0.destination=foobar",
"spring.cloud.stream.bindings.receive-in-0.group=rkbot-in-group",
"spring.cloud.stream.bindings.receive-in-0.destination=rkbot-in-topic",
"spring.cloud.stream.bindings.receive-out-0.destination=rkbot-out-topic",
"spring.cloud.stream.kafka.binder.enable-observation=true",
"spring.cloud.stream.kafka.binder.brokers=${spring.kafka.bootstrap-servers}",
"management.tracing.sampling.probability=1",
"spring.cloud.stream.kafka.bindings.receive-in-0.consumer.converterBeanName=fullRR"
})
@DirtiesContext
@AutoConfigureObservability
@EmbeddedKafka(topics = { "foobar" })
@EmbeddedKafka(topics = { "rkbot-out-topic" })
public class ReactorKafkaBinderObservationTests {

private static final TestSpanHandler SPANS = new TestSpanHandler();
@@ -96,7 +96,7 @@ public class ReactorKafkaBinderObservationTests {
@Test
void endToEndReactorKafkaBinder1() {

streamBridge.send("words1", MessageBuilder.withPayload("data")
streamBridge.send("rkbot-in-topic", MessageBuilder.withPayload("data")
.build());

await().timeout(Duration.ofSeconds(10)).untilAsserted(() -> assertThat(SPANS.spans()).hasSize(3));
@@ -1,20 +1,20 @@
<configuration>
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
</encoder>
</appender>
<logger name="org.apache.kafka" level="WARN"/>
<logger name="reactor.kafka" level="DEBUG"/>
<logger name="org.springframework.integration.kafka" level="INFO"/>
<logger name="org.springframework.kafka" level="DEBUG"/>
<logger name="org.springframework.cloud.stream" level="INFO" />
<logger name="org.springframework.integration.channel" level="DEBUG" />
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
</encoder>
</appender>
<logger name="org.apache.kafka" level="WARN"/>
<logger name="reactor.kafka" level="DEBUG"/>
<logger name="org.springframework.integration.kafka" level="INFO"/>
<logger name="org.springframework.kafka" level="DEBUG"/>
<logger name="org.springframework.cloud.stream" level="INFO" />
<logger name="org.springframework.integration.channel" level="DEBUG" />
<logger name="kafka.server.ReplicaFetcherThread" level="ERROR"/>
<logger name="kafka.server.LogDirFailureChannel" level="FATAL"/>
<logger name="kafka.server.BrokerMetadataCheckpoint" level="ERROR"/>
<logger name="kafka.utils.CoreUtils$" level="ERROR"/>
<root level="WARN">
<appender-ref ref="stdout"/>
</root>
<root level="WARN">
<appender-ref ref="stdout"/>
</root>
</configuration>
@@ -1,70 +1,81 @@
[[reactive-kafka-binder-observability]]
= Observability in Reactive Kafka Binder

In this section, we will describe how Micrometer based observability is enabled in the reactive Kafka binder.
This section describes how Micrometer-based observability is enabled in the reactive Kafka binder.

There is built in support for observability when it comes to producer binding, but you need to opt-in for this by enabling the following property.
== Producer Binding

There is built-in support for observability in producer binding.
To enable it, set the following property:

```
spring.cloud.stream.kafka.binder.enable-observation
```

When this property is set to `true`, you can trace the publishing of records.
When this property is set to `true`, you can observe the publishing of records.
Both publishing records using `StreamBridge` and regular `Supplier<?>` beans can be observed.
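For instance, a minimal opt-in might look like this in `application.properties` (a sketch; only the property name above is taken from this commit):

```
# Opt in to producer-side observation (off unless explicitly enabled)
spring.cloud.stream.kafka.binder.enable-observation=true
```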

== Consumer Binding

Both publishing records using `StreamBridge` and regular `Supplier<?>` beans can be now traced when enabling the above property.
Enabling observability on the consumer side is more complex than on the producer side.
There are two starting points for consumer binding:

However, on the consumer side, enabling observability is not as straightforward as on the producer side.
1. A topic where data is published via a producer binding
2. A topic where data is produced outside of Spring Cloud Stream

There are two starting points for consumer binding - one a topic where the data is published via a producer binding, another one where the data is produced via not Spring Cloud Stream.
In the first case, the application ideally wants to carry the observability headers down to the consumer inbound.
In the second case, if no upstream observation was started, a new observation is started on the consumer side.
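As a sketch, a consumer-side setup combining the observation flag with a custom record converter might look like the following; the property names are taken from this commit's tests, while the topic, group, and bean names are illustrative:

```
# Enable observation on the binder so propagated headers reach the consumer
spring.cloud.stream.kafka.binder.enable-observation=true
# Bind the reactive function to example in/out topics
spring.cloud.stream.bindings.receive-in-0.group=rkbot-in-group
spring.cloud.stream.bindings.receive-in-0.destination=rkbot-in-topic
spring.cloud.stream.bindings.receive-out-0.destination=rkbot-out-topic
# Use a custom record converter bean (fullRR is the bean name from this commit's tests)
spring.cloud.stream.kafka.bindings.receive-in-0.consumer.converterBeanName=fullRR
# Sample every trace while experimenting
management.tracing.sampling.probability=1
```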

Let's look at the following function.
=== Example: Function with Observability

```
@Bean
Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {
return s -> s
.flatMap(record -> {
Observation receiverObservation =
KafkaReceiverObservation.RECEIVER_OBSERVATION.start(null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() ->
new KafkaRecordReceiverContext(
record, "user.receiver", "localhost:9092"),
observationRegistry);

return Mono.deferContextual(contextView -> Mono.just(record)
.map(rec -> new String(rec.value()).toLowerCase())
.map(rec -> MessageBuilder.withPayload(rec).setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView).build()))
.doOnTerminate(receiverObservation::stop)
.doOnError(receiverObservation::error)
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
});

return s -> s.flatMap(record -> {
Observation receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION.start(
null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(record, "user.receiver", "localhost:9092"),
observationRegistry
);

return Mono.deferContextual(contextView -> Mono.just(record)
.map(rec -> new String(rec.value()).toLowerCase())
.map(rec -> MessageBuilder.withPayload(rec)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView)
.build()))
.doOnTerminate(receiverObservation::stop)
.doOnError(receiverObservation::error)
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
});
}
```

In this example, when we receive a record, we first create an observation.
If there is an upstream observation, then that will be part of the `KafkaRecordReceiverContext`.
After that, a `Mono` is created with context deferred, and when the `map` operation is invoked, the context has access to the correct observation.
Finally, the result of the `flatMap` operation will be sent back to the binding as `Flux<Message<?>`.
The outbound record will have the same observability headers from the input binding.
In this example:

1. When a record is received, an observation is created.
2. If there's an upstream observation, it will be part of the `KafkaRecordReceiverContext`.
3. A `Mono` is created with context deferred.
4. When the `map` operation is invoked, the context has access to the correct observation.
5. The result of the `flatMap` operation is sent back to the binding as `Flux<Message<?>>`.
6. The outbound record will have the same observability headers from the input binding.

If you have a `Consumer`, here is how you can do the same.
=== Example: Consumer with Observability

```
@Bean
Consumer<Flux<ReceiverRecord<?, String>>> receive(ObservationRegistry observationRegistry, @Value("${spring.kafka.bootstrap-servers}") String bootstrap) {
return f -> f.doOnNext(record -> KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() ->
new KafkaRecordReceiverContext(
record, "user.receiver", bootstrap),
observationRegistry)
.observe(() -> System.out.println(record)))
return f -> f.doOnNext(record -> KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(
null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(record, "user.receiver", bootstrap),
observationRegistry).observe(() -> System.out.println(record)))
.subscribe();
}
```

In this case, since there is no output binding, instead of using the `flatMap`, you can simply call the `doOnNext` operation on `Flux`.
The direct call to `observe` in this case will start the observation and properly shut it down when finished.
In this case:

1. Since there's no output binding, `doOnNext` is used on the `Flux` instead of `flatMap`.
2. The direct call to `observe` starts the observation and properly shuts it down when finished.
