Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful shutdown #18

Merged
merged 4 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@

import com.solace.messaging.MessagingService;

import io.quarkiverse.solace.i18n.SolaceLogging;
import io.quarkiverse.solace.incoming.SolaceIncomingChannel;
import io.quarkiverse.solace.outgoing.SolaceOutgoingChannel;
import io.quarkus.runtime.ShutdownEvent;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
Expand All @@ -50,8 +48,6 @@
@ConnectorAttribute(name = "consumer.queue.replay.strategy", type = "string", direction = INCOMING, description = "The receiver replay strategy")
@ConnectorAttribute(name = "consumer.queue.replay.timebased-start-time", type = "string", direction = INCOMING, description = "The receiver replay timebased start time")
@ConnectorAttribute(name = "consumer.queue.replay.replication-group-message-id", type = "string", direction = INCOMING, description = "The receiver replay replication group message id")
// TODO implement consumer concurrency
//@ConnectorAttribute(name = "consumer.queue.concurrency", type = "int", direction = INCOMING, description = "The number of concurrent consumers", defaultValue = "1")
@ConnectorAttribute(name = "consumer.queue.failure-strategy", type = "string", direction = INCOMING, description = "Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`.", defaultValue = "ignore")
@ConnectorAttribute(name = "consumer.queue.error.topic", type = "string", direction = INCOMING, description = "The error topic where message should be published in case of error")
@ConnectorAttribute(name = "consumer.queue.error.message.dmq-eligible", type = "boolean", direction = INCOMING, description = "Whether error message is eligible to move to dead message queue", defaultValue = "false")
Expand Down Expand Up @@ -81,18 +77,6 @@ public class SolaceConnector implements InboundConnector, OutboundConnector, Hea
List<SolaceIncomingChannel> incomingChannels = new CopyOnWriteArrayList<>();
List<SolaceOutgoingChannel> outgoingChannels = new CopyOnWriteArrayList<>();

public void onStop(@Observes ShutdownEvent shutdownEvent) {
if (solace.isConnected()) {
SolaceLogging.log.info("Waiting incoming channel messages to be acknowledged");
incomingChannels.forEach(SolaceIncomingChannel::waitForUnAcknowledgedMessages);
SolaceLogging.log.info("All incoming channel messages are acknowledged");

SolaceLogging.log.info("Waiting for outgoing messages to be published");
outgoingChannels.forEach(SolaceOutgoingChannel::waitForPublishedMessages);
SolaceLogging.log.info("All outgoing messages are published");
}
}

public void terminate(
@Observes(notifyObserver = Reception.IF_EXISTS) @Priority(50) @BeforeDestroyed(ApplicationScoped.class) Object event) {
incomingChannels.forEach(SolaceIncomingChannel::close);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public Flow.Publisher<? extends Message<?>> getStream() {
public void waitForUnAcknowledgedMessages() {
try {
receiver.pause();
SolaceLogging.log.info("Waiting for incoming channel messages to be acknowledged");
if (!unacknowledgedMessageTracker.awaitEmpty(this.waitTimeout, TimeUnit.MILLISECONDS)) {
SolaceLogging.log.info(String.format("Timed out while waiting for the" +
" remaining messages to be acknowledged."));
Expand All @@ -184,6 +185,7 @@ public void waitForUnAcknowledgedMessages() {
}

public void close() {
waitForUnAcknowledgedMessages();
closed.compareAndSet(false, true);
if (this.pollerThread != null) {
this.pollerThread.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber() {

public void waitForPublishedMessages() {
try {
SolaceLogging.log.info("Waiting for outgoing messages to be published");
if (!publishedMessagesTracker.awaitEmpty(this.waitTimeout, TimeUnit.MILLISECONDS)) {
SolaceLogging.log.info(String.format("Timed out while waiting for the" +
" remaining messages to get publish acknowledgment."));
Expand All @@ -190,6 +191,7 @@ public void waitForPublishedMessages() {
}

public void close() {
waitForPublishedMessages();
if (processor != null) {
processor.cancel();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

import jakarta.enterprise.context.ApplicationScoped;

import org.awaitility.Durations;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
Expand All @@ -26,8 +31,12 @@

import io.quarkiverse.solace.base.SolaceContainer;
import io.quarkiverse.solace.base.WeldTestBase;
import io.quarkiverse.solace.incoming.SolaceIncomingChannel;
import io.quarkiverse.solace.logging.SolaceTestAppender;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.mutiny.core.Vertx;

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SolaceConsumerTest extends WeldTestBase {
Expand Down Expand Up @@ -220,6 +229,49 @@ void consumerPublishToErrorTopicPermissionException() {

@Test
@Order(7)
void consumerGracefulCloseTest() {
MapBasedConfig config = new MapBasedConfig()
.with("channel-name", "in")
.with("consumer.queue.name", queue)
.with("consumer.queue.add-additional-subscriptions", true)
.with("consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("consumer.queue.subscriptions", topic);

// Initialize incoming channel to consumes messages
SolaceIncomingChannel solaceIncomingChannel = new SolaceIncomingChannel(Vertx.vertx(),
new SolaceConnectorIncomingConfiguration(config), messagingService);

List<Object> list = new ArrayList<>();
SravanThotakura05 marked this conversation as resolved.
Show resolved Hide resolved

Flow.Publisher<? extends Message<?>> stream = solaceIncomingChannel.getStream();
Multi.createFrom().publisher(stream).subscribe().with(message -> {
list.add(message);
CompletableFuture.runAsync(message::ack);
SravanThotakura05 marked this conversation as resolved.
Show resolved Hide resolved
});
await().until(() -> {
SravanThotakura05 marked this conversation as resolved.
Show resolved Hide resolved
HealthReport.HealthReportBuilder builder = HealthReport.builder();
solaceIncomingChannel.isReady(builder);
return builder.build().isOk();
});

// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(topic);
publisher.publish("1", tp);
publisher.publish("2", tp);
publisher.publish("3", tp);
publisher.publish("4", tp);
publisher.publish("5", tp);

// Assert on consumed messages
await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() == 5);
solaceIncomingChannel.close();
}

@Test
@Order(8)
void consumerCreateMissingResourceAddSubscriptionPermissionException() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;

import jakarta.enterprise.context.ApplicationScoped;

Expand All @@ -20,8 +21,11 @@

import io.quarkiverse.solace.base.WeldTestBase;
import io.quarkiverse.solace.outgoing.SolaceOutboundMetadata;
import io.quarkiverse.solace.outgoing.SolaceOutgoingChannel;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.mutiny.core.Vertx;

public class SolacePublisherTest extends WeldTestBase {

Expand Down Expand Up @@ -99,6 +103,37 @@ void publisherWithBackPressureReject() {
await().untilAsserted(() -> assertThat(app.getAcked().size()).isLessThan(5));
}

@Test
void publisherGracefulCloseTest() {
MapBasedConfig config = new MapBasedConfig()
.with("channel-name", "out")
.with("producer.topic", topic);

List<String> expected = new CopyOnWriteArrayList<>();

// Start listening first
PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder()
.withSubscriptions(TopicSubscription.of(topic))
.build(Queue.nonDurableExclusiveQueue());
receiver.receiveAsync(inboundMessage -> expected.add(inboundMessage.getPayloadAsString()));
receiver.start();

SolaceOutgoingChannel solaceOutgoingChannel = new SolaceOutgoingChannel(Vertx.vertx(),
new SolaceConnectorOutgoingConfiguration(config), messagingService);
// Publish messages
Multi.createFrom().range(0, 10)
.map(Message::of)
.subscribe((Flow.Subscriber<? super Message<Integer>>) solaceOutgoingChannel.getSubscriber());
await().until(() -> {
SravanThotakura05 marked this conversation as resolved.
Show resolved Hide resolved
HealthReport.HealthReportBuilder builder = HealthReport.builder();
solaceOutgoingChannel.isReady(builder);
return builder.build().isOk();
});
// Assert on received messages
await().untilAsserted(() -> assertThat(expected.size()).isEqualTo(10));
solaceOutgoingChannel.close();
SravanThotakura05 marked this conversation as resolved.
Show resolved Hide resolved
}

// @Test
// void publisherWithBackPressureRejectWaitForPublisherReadiness() {
// MapBasedConfig config = new MapBasedConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public MessagingService apply(SyntheticCreationalContext<MessagingService> conte
}

var tmp = service;
shutdown.addShutdownTask(() -> {
shutdown.addLastShutdownTask(() -> {
if (tmp.isConnected()) {
tmp.disconnect();
}
Expand Down