Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added health status - started, ready, alive #26

Merged
merged 2 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage.captureContextMetadata;

import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

import org.eclipse.microprofile.reactive.messaging.Metadata;

Expand All @@ -22,16 +23,18 @@ public class SolaceInboundMessage<T> implements ContextAwareMessage<T>, Metadata
private final SolaceFailureHandler nackHandler;
private final T payload;
private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker;
private final Consumer<Throwable> reportFailure;
private Metadata metadata;

public SolaceInboundMessage(InboundMessage message, SolaceAckHandler ackHandler, SolaceFailureHandler nackHandler,
IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker) {
IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker, Consumer<Throwable> reportFailure) {
this.msg = message;
this.unacknowledgedMessageTracker = unacknowledgedMessageTracker;
this.payload = (T) convertPayload();
this.ackHandler = ackHandler;
this.nackHandler = nackHandler;
this.metadata = captureContextMetadata(new SolaceInboundMetadata(message));
this.reportFailure = reportFailure;
}

public InboundMessage getMessage() {
Expand Down Expand Up @@ -88,6 +91,7 @@ public CompletionStage<Void> ack() {
@Override
public CompletionStage<Void> nack(Throwable reason, Metadata nackMetadata) {
this.unacknowledgedMessageTracker.decrement();
this.reportFailure.accept(reason);
return nackHandler.handle(this, reason, nackMetadata);

// if (solaceErrorTopicPublisherHandler == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,19 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi
private final SolaceAckHandler ackHandler;
private final SolaceFailureHandler failureHandler;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean alive = new AtomicBoolean(false);
private final PersistentMessageReceiver receiver;
private final Flow.Publisher<? extends Message<?>> stream;
private final ExecutorService pollerThread;
private final boolean gracefulShutdown;
private final long gracefulShutdownWaitTimeout;
private volatile MessagingService solace;

// Assuming we won't ever exceed the limit of an unsigned long...
private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker = new IncomingMessagesUnsignedCounterBarrier();

public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration ic, MessagingService solace) {
this.solace = solace;
this.channel = ic.getChannel();
this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext());
this.gracefulShutdown = ic.getClientGracefulShutdown();
Expand Down Expand Up @@ -108,15 +111,23 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i
.until(__ -> closed.get())
.emitOn(context::runOnContext)
.map(consumed -> new SolaceInboundMessage<>(consumed, ackHandler, failureHandler,
unacknowledgedMessageTracker))
.plug(m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(receiver.startAsync()))
unacknowledgedMessageTracker, this::reportFailure))
.plug(m -> lazyStart
? m.onSubscription()
.call(() -> Uni.createFrom().completionStage(receiver.startAsync()))
: m)
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3);
.onItem().invoke(() -> alive.set(true))
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke(() -> alive.set(false));
if (!lazyStart) {
receiver.start();
}
}

private void reportFailure(Throwable throwable) {
// should we send cause of failure in isAlive method?
SravanThotakura05 marked this conversation as resolved.
Show resolved Hide resolved
alive.set(false);
}

private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfiguration ic, MessagingService solace) {
String strategy = ic.getConsumerQueueFailureStrategy();
SolaceFailureHandler.Strategy actualStrategy = SolaceFailureHandler.Strategy.from(strategy);
Expand Down Expand Up @@ -204,15 +215,15 @@ public void close() {
}

public void isStarted(HealthReport.HealthReportBuilder builder) {

builder.add(channel, solace.isConnected());
}

public void isReady(HealthReport.HealthReportBuilder builder) {

builder.add(channel, solace.isConnected() && receiver != null && receiver.isRunning());
}

public void isAlive(HealthReport.HealthReportBuilder builder) {

builder.add(channel, solace.isConnected() && alive.get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

import org.eclipse.microprofile.reactive.messaging.Message;
Expand All @@ -20,11 +21,14 @@ class SenderProcessor implements Processor<Message<?>, Message<?>>, Subscription
private final Function<Message<?>, Uni<Void>> send;
private final AtomicReference<Subscription> subscription = new AtomicReference<>();
private final AtomicReference<Subscriber<? super Message<?>>> downstream = new AtomicReference<>();
private final Consumer<Throwable> reportFailure;
SravanThotakura05 marked this conversation as resolved.
Show resolved Hide resolved

public SenderProcessor(long inflights, boolean waitForCompletion, Function<Message<?>, Uni<Void>> send) {
public SenderProcessor(long inflights, boolean waitForCompletion, Function<Message<?>, Uni<Void>> send,
Consumer<Throwable> reportFailure) {
this.inflights = inflights;
this.waitForCompletion = waitForCompletion;
this.send = send;
this.reportFailure = reportFailure;
}

@Override
Expand Down Expand Up @@ -101,6 +105,7 @@ public void onError(Throwable throwable) {
Subscriber<? super Message<?>> subscriber = downstream.getAndSet(null);
if (subscriber != null) {
subscriber.onError(throwable);
reportFailure.accept(throwable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.microprofile.reactive.messaging.Message;
Expand Down Expand Up @@ -37,12 +38,15 @@ public class SolaceOutgoingChannel
private final SenderProcessor processor;
private final boolean gracefulShutdown;
private final long gracefulShutdownWaitTimeout;
private final AtomicBoolean alive = new AtomicBoolean(false);
private volatile boolean isPublisherReady = true;
private volatile MessagingService solace;

// Assuming we won't ever exceed the limit of an unsigned long...
private final OutgoingMessagesUnsignedCounterBarrier publishedMessagesTracker = new OutgoingMessagesUnsignedCounterBarrier();

public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration oc, MessagingService solace) {
this.solace = solace;
this.channel = oc.getChannel();
PersistentMessagePublisherBuilder builder = solace.createPersistentMessagePublisherBuilder();
switch (oc.getProducerBackPressureStrategy()) {
Expand All @@ -67,7 +71,7 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o
boolean lazyStart = oc.getClientLazyStart();
this.topic = Topic.of(oc.getProducerTopic().orElse(this.channel));
this.processor = new SenderProcessor(oc.getProducerMaxInflightMessages(), oc.getProducerWaitForPublishReceipt(),
m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()));
m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()), this::reportFailure);
this.subscriber = MultiUtils.via(processor, multi -> multi.plug(
m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(publisher.startAsync())) : m));
SravanThotakura05 marked this conversation as resolved.
Show resolved Hide resolved
if (!lazyStart) {
Expand All @@ -82,17 +86,26 @@ public void ready() {
});
}

private void reportFailure(Throwable throwable) {
// should we send cause of failure in isAlive method?
alive.set(false);
}

private Uni<Void> sendMessage(MessagingService solace, Message<?> m, boolean waitForPublishReceipt) {

// TODO - Use isPublisherReady to check if publisher is in ready state before publishing. This is required when back-pressure is set to reject. We need to block this call till isPublisherReady is true
return publishMessage(publisher, m, solace.messageBuilder(), waitForPublishReceipt)
.onItem().transformToUni(receipt -> {
alive.set(true);
if (receipt != null) {
OutgoingMessageMetadata.setResultOnMessage(m, receipt);
}
return Uni.createFrom().completionStage(m.getAck());
})
.onFailure().recoverWithUni(t -> Uni.createFrom().completionStage(m.nack(t)));
.onFailure().recoverWithUni(t -> {
alive.set(false);
return Uni.createFrom().completionStage(m.nack(t));
});

}

Expand Down Expand Up @@ -215,15 +228,15 @@ public void onPublishReceipt(PublishReceipt publishReceipt) {
}

public void isStarted(HealthReport.HealthReportBuilder builder) {

builder.add(channel, solace.isConnected());
}

public void isReady(HealthReport.HealthReportBuilder builder) {

builder.add(channel, solace.isConnected() && this.publisher != null && this.publisher.isReady());
}

public void isAlive(HealthReport.HealthReportBuilder builder) {

builder.add(channel, solace.isConnected() && alive.get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package io.quarkiverse.solace.health;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.junit.jupiter.api.Test;

import com.solace.messaging.publisher.PersistentMessagePublisher;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.resources.Topic;

import io.quarkiverse.solace.base.WeldTestBase;
import io.quarkiverse.solace.incoming.SolaceInboundMessage;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

public class SolaceConsumerHealthTest extends WeldTestBase {

@Test
void solaceConsumerHealthCheck() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic);

// Run app that consumes messages
MyConsumer app = runApplication(config, MyConsumer.class);

await().until(() -> isStarted() && isReady());

// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(topic);
publisher.publish("1", tp);
publisher.publish("2", tp);
publisher.publish("3", tp);
publisher.publish("4", tp);
publisher.publish("5", tp);

await().until(() -> isAlive());

HealthReport startup = getHealth().getStartup();
HealthReport liveness = getHealth().getLiveness();
HealthReport readiness = getHealth().getReadiness();

assertThat(startup.isOk()).isTrue();
assertThat(liveness.isOk()).isTrue();
assertThat(readiness.isOk()).isTrue();
assertThat(startup.getChannels()).hasSize(1);
assertThat(liveness.getChannels()).hasSize(1);
assertThat(readiness.getChannels()).hasSize(1);

}

@Test
void solaceConsumerLivenessCheck() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic);

// Run app that consumes messages
MyErrorConsumer app = runApplication(config, MyErrorConsumer.class);

await().until(() -> isStarted() && isReady());

// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(topic);
publisher.publish("1", tp);
publisher.publish("2", tp);

await().until(() -> isAlive());

HealthReport startup = getHealth().getStartup();
HealthReport liveness = getHealth().getLiveness();
HealthReport readiness = getHealth().getReadiness();

assertThat(startup.isOk()).isTrue();
assertThat(liveness.isOk()).isTrue();
assertThat(readiness.isOk()).isTrue();
assertThat(startup.getChannels()).hasSize(1);
assertThat(liveness.getChannels()).hasSize(1);
assertThat(readiness.getChannels()).hasSize(1);

publisher.publish("3", tp);
await().until(() -> getHealth().getLiveness().isOk() == false);

publisher.publish("4", tp);
publisher.publish("5", tp);
await().until(() -> getHealth().getLiveness().isOk() == true);
}

@ApplicationScoped
static class MyConsumer {
private final List<String> received = new CopyOnWriteArrayList<>();

@Incoming("in")
void in(InboundMessage msg) {
received.add(msg.getPayloadAsString());
}

public List<String> getReceived() {
return received;
}
}

@ApplicationScoped
static class MyErrorConsumer {
private final List<String> received = new CopyOnWriteArrayList<>();

@Incoming("in")
CompletionStage<Void> in(SolaceInboundMessage<byte[]> msg) {
String payload = new String(msg.getPayload(), StandardCharsets.UTF_8);
if (payload.equals("3")) {
return msg.nack(new IllegalArgumentException("Nacking message with payload 3"));
}

return msg.ack();
}
}
}
Loading