Skip to content

Commit

Permalink
Added health status - started, ready, alive
Browse files Browse the repository at this point in the history
  • Loading branch information
SravanThotakura05 committed Jan 11, 2024
1 parent 33a4acf commit 0e1c144
Show file tree
Hide file tree
Showing 6 changed files with 292 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage.captureContextMetadata;

import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

import org.eclipse.microprofile.reactive.messaging.Metadata;

Expand All @@ -22,16 +23,18 @@ public class SolaceInboundMessage<T> implements ContextAwareMessage<T>, Metadata
private final SolaceFailureHandler nackHandler;
private final T payload;
private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker;
private final Consumer<Throwable> reportFailure;
private Metadata metadata;

public SolaceInboundMessage(InboundMessage message, SolaceAckHandler ackHandler, SolaceFailureHandler nackHandler,
IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker) {
IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker, Consumer<Throwable> reportFailure) {
this.msg = message;
this.unacknowledgedMessageTracker = unacknowledgedMessageTracker;
this.payload = (T) convertPayload();
this.ackHandler = ackHandler;
this.nackHandler = nackHandler;
this.metadata = captureContextMetadata(new SolaceInboundMetadata(message));
this.reportFailure = reportFailure;
}

public InboundMessage getMessage() {
Expand Down Expand Up @@ -88,6 +91,7 @@ public CompletionStage<Void> ack() {
@Override
public CompletionStage<Void> nack(Throwable reason, Metadata nackMetadata) {
this.unacknowledgedMessageTracker.decrement();
this.reportFailure.accept(reason);
return nackHandler.handle(this, reason, nackMetadata);

// if (solaceErrorTopicPublisherHandler == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,19 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi
private final SolaceAckHandler ackHandler;
private final SolaceFailureHandler failureHandler;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean alive = new AtomicBoolean(false);
private final PersistentMessageReceiver receiver;
private final Flow.Publisher<? extends Message<?>> stream;
private final ExecutorService pollerThread;
private final boolean gracefulShutdown;
private final long gracefulShutdownWaitTimeout;
private volatile MessagingService solace;

// Assuming we won't ever exceed the limit of an unsigned long...
private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker = new IncomingMessagesUnsignedCounterBarrier();

public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration ic, MessagingService solace) {
this.solace = solace;
this.channel = ic.getChannel();
this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext());
this.gracefulShutdown = ic.getClientGracefulShutdown();
Expand Down Expand Up @@ -108,15 +111,23 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i
.until(__ -> closed.get())
.emitOn(context::runOnContext)
.map(consumed -> new SolaceInboundMessage<>(consumed, ackHandler, failureHandler,
unacknowledgedMessageTracker))
.plug(m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(receiver.startAsync()))
unacknowledgedMessageTracker, this::reportFailure))
.plug(m -> lazyStart
? m.onSubscription()
.call(() -> Uni.createFrom().completionStage(receiver.startAsync()))
: m)
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3);
.onItem().invoke(() -> alive.set(true))
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke(() -> alive.set(false));
if (!lazyStart) {
receiver.start();
}
}

private void reportFailure(Throwable throwable) {
// should we send cause of failure in isAlive method?
alive.set(false);
}

private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfiguration ic, MessagingService solace) {
String strategy = ic.getConsumerQueueFailureStrategy();
SolaceFailureHandler.Strategy actualStrategy = SolaceFailureHandler.Strategy.from(strategy);
Expand Down Expand Up @@ -204,15 +215,15 @@ public void close() {
}

public void isStarted(HealthReport.HealthReportBuilder builder) {

builder.add(channel, solace.isConnected());
}

public void isReady(HealthReport.HealthReportBuilder builder) {

builder.add(channel, solace.isConnected() && receiver != null && receiver.isRunning());
}

public void isAlive(HealthReport.HealthReportBuilder builder) {

builder.add(channel, solace.isConnected() && alive.get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

import org.eclipse.microprofile.reactive.messaging.Message;
Expand All @@ -20,11 +21,14 @@ class SenderProcessor implements Processor<Message<?>, Message<?>>, Subscription
private final Function<Message<?>, Uni<Void>> send;
private final AtomicReference<Subscription> subscription = new AtomicReference<>();
private final AtomicReference<Subscriber<? super Message<?>>> downstream = new AtomicReference<>();
private final Consumer<Throwable> reportFailure;

public SenderProcessor(long inflights, boolean waitForCompletion, Function<Message<?>, Uni<Void>> send) {
public SenderProcessor(long inflights, boolean waitForCompletion, Function<Message<?>, Uni<Void>> send,
Consumer<Throwable> reportFailure) {
this.inflights = inflights;
this.waitForCompletion = waitForCompletion;
this.send = send;
this.reportFailure = reportFailure;
}

@Override
Expand Down Expand Up @@ -101,6 +105,7 @@ public void onError(Throwable throwable) {
Subscriber<? super Message<?>> subscriber = downstream.getAndSet(null);
if (subscriber != null) {
subscriber.onError(throwable);
reportFailure.accept(throwable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.microprofile.reactive.messaging.Message;
Expand Down Expand Up @@ -37,12 +38,15 @@ public class SolaceOutgoingChannel
private final SenderProcessor processor;
private final boolean gracefulShutdown;
private final long gracefulShutdownWaitTimeout;
private final AtomicBoolean alive = new AtomicBoolean(false);
private volatile boolean isPublisherReady = true;
private volatile MessagingService solace;

// Assuming we won't ever exceed the limit of an unsigned long...
private final OutgoingMessagesUnsignedCounterBarrier publishedMessagesTracker = new OutgoingMessagesUnsignedCounterBarrier();

public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration oc, MessagingService solace) {
this.solace = solace;
this.channel = oc.getChannel();
PersistentMessagePublisherBuilder builder = solace.createPersistentMessagePublisherBuilder();
switch (oc.getProducerBackPressureStrategy()) {
Expand All @@ -67,7 +71,7 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o
boolean lazyStart = oc.getClientLazyStart();
this.topic = Topic.of(oc.getProducerTopic().orElse(this.channel));
this.processor = new SenderProcessor(oc.getProducerMaxInflightMessages(), oc.getProducerWaitForPublishReceipt(),
m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()));
m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()), this::reportFailure);
this.subscriber = MultiUtils.via(processor, multi -> multi.plug(
m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(publisher.startAsync())) : m));
if (!lazyStart) {
Expand All @@ -82,17 +86,26 @@ public void ready() {
});
}

private void reportFailure(Throwable throwable) {
// should we send cause of failure in isAlive method?
alive.set(false);
}

private Uni<Void> sendMessage(MessagingService solace, Message<?> m, boolean waitForPublishReceipt) {

// TODO - Use isPublisherReady to check if publisher is in ready state before publishing. This is required when back-pressure is set to reject. We need to block this call till isPublisherReady is true
return publishMessage(publisher, m, solace.messageBuilder(), waitForPublishReceipt)
.onItem().transformToUni(receipt -> {
alive.set(true);
if (receipt != null) {
OutgoingMessageMetadata.setResultOnMessage(m, receipt);
}
return Uni.createFrom().completionStage(m.getAck());
})
.onFailure().recoverWithUni(t -> Uni.createFrom().completionStage(m.nack(t)));
.onFailure().recoverWithUni(t -> {
alive.set(false);
return Uni.createFrom().completionStage(m.nack(t));
});

}

Expand Down Expand Up @@ -215,15 +228,15 @@ public void onPublishReceipt(PublishReceipt publishReceipt) {
}

public void isStarted(HealthReport.HealthReportBuilder builder) {

builder.add(channel, solace.isConnected());
}

public void isReady(HealthReport.HealthReportBuilder builder) {

builder.add(channel, solace.isConnected() && this.publisher != null && this.publisher.isReady());
}

public void isAlive(HealthReport.HealthReportBuilder builder) {

builder.add(channel, solace.isConnected() && alive.get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package io.quarkiverse.solace.health;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.junit.jupiter.api.Test;

import com.solace.messaging.publisher.PersistentMessagePublisher;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.resources.Topic;

import io.quarkiverse.solace.base.WeldTestBase;
import io.quarkiverse.solace.incoming.SolaceInboundMessage;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

public class SolaceConsumerHealthTest extends WeldTestBase {

@Test
void solaceConsumerHealthCheck() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic);

// Run app that consumes messages
MyConsumer app = runApplication(config, MyConsumer.class);

await().until(() -> isStarted() && isReady());

// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(topic);
publisher.publish("1", tp);
publisher.publish("2", tp);
publisher.publish("3", tp);
publisher.publish("4", tp);
publisher.publish("5", tp);

await().until(() -> isAlive());

HealthReport startup = getHealth().getStartup();
HealthReport liveness = getHealth().getLiveness();
HealthReport readiness = getHealth().getReadiness();

assertThat(startup.isOk()).isTrue();
assertThat(liveness.isOk()).isTrue();
assertThat(readiness.isOk()).isTrue();
assertThat(startup.getChannels()).hasSize(1);
assertThat(liveness.getChannels()).hasSize(1);
assertThat(readiness.getChannels()).hasSize(1);

}

@Test
void solaceConsumerLivenessCheck() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic);

// Run app that consumes messages
MyErrorConsumer app = runApplication(config, MyErrorConsumer.class);

await().until(() -> isStarted() && isReady());

// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(topic);
publisher.publish("1", tp);
publisher.publish("2", tp);

await().until(() -> isAlive());

HealthReport startup = getHealth().getStartup();
HealthReport liveness = getHealth().getLiveness();
HealthReport readiness = getHealth().getReadiness();

assertThat(startup.isOk()).isTrue();
assertThat(liveness.isOk()).isTrue();
assertThat(readiness.isOk()).isTrue();
assertThat(startup.getChannels()).hasSize(1);
assertThat(liveness.getChannels()).hasSize(1);
assertThat(readiness.getChannels()).hasSize(1);

publisher.publish("3", tp);
await().until(() -> getHealth().getLiveness().isOk() == false);

publisher.publish("4", tp);
publisher.publish("5", tp);
await().until(() -> getHealth().getLiveness().isOk() == true);
}

@ApplicationScoped
static class MyConsumer {
private final List<String> received = new CopyOnWriteArrayList<>();

@Incoming("in")
void in(InboundMessage msg) {
received.add(msg.getPayloadAsString());
}

public List<String> getReceived() {
return received;
}
}

@ApplicationScoped
static class MyErrorConsumer {
private final List<String> received = new CopyOnWriteArrayList<>();

@Incoming("in")
CompletionStage<Void> in(SolaceInboundMessage<byte[]> msg) {
String payload = new String(msg.getPayload(), StandardCharsets.UTF_8);
if (payload.equals("3")) {
return msg.nack(new IllegalArgumentException("Nacking message with payload 3"));
}

return msg.ack();
}
}
}
Loading

0 comments on commit 0e1c144

Please sign in to comment.