From 9cb64563e9b223860feb1f213eea48056610cbec Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Mon, 29 Jan 2024 16:25:53 +0530 Subject: [PATCH 1/5] Fixed #39 #36 --- .../resources/META-INF/quarkus-extension.yaml | 13 ++-- .../solace/quarkus/samples/HelloConsumer.java | 63 +++++++++---------- .../src/main/resources/application.properties | 26 ++++---- 3 files changed, 50 insertions(+), 52 deletions(-) diff --git a/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/runtime/src/main/resources/META-INF/quarkus-extension.yaml index 61b8e95..5ad8a22 100644 --- a/runtime/src/main/resources/META-INF/quarkus-extension.yaml +++ b/runtime/src/main/resources/META-INF/quarkus-extension.yaml @@ -1,10 +1,11 @@ name: Solace description: Solace Quarkus Extension for integrating with Solace PubSub+ message brokers. The extension provides the ability to publish or consume events from event mesh. metadata: -# keywords: -# - solace -# - pubsubplus event broker + keywords: + - solace + - pubsubplus event broker + - event driven integration # guide: https://quarkiverse.github.io/quarkiverse-docs/solace/dev/ # To create and publish this guide, see https://github.com/quarkiverse/quarkiverse/wiki#documenting-your-extension -# categories: -# - "miscellaneous" -# status: "preview" + categories: + - "messaging" + status: "preview" diff --git a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java index 31fc3c9..8978223 100644 --- a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java +++ b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java @@ -1,7 +1,7 @@ package com.solace.quarkus.samples; import java.nio.charset.StandardCharsets; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import jakarta.enterprise.context.ApplicationScoped; @@ -11,38 +11,42 @@ import com.solace.quarkus.messaging.outgoing.SolaceOutboundMetadata; import io.quarkus.logging.Log; +import io.smallrye.mutiny.Multi; @ApplicationScoped public class HelloConsumer { + /** - * Publish a simple message using TryMe in Solace broker and you should see the message published to topic + * Publishes message to topic hello/foobar which is subscribed by queue.foobar + * + * @see #consumeMessage(SolaceInboundMessage) + * @return + */ + @Outgoing("hello-out") + Multi> publishMessage() { + SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder() + .setApplicationMessageId("1").createPubSubOutboundMetadata(); + return Multi.createFrom().items("1").map(m -> Message.of(m, Metadata.of(outboundMetadata))); + } + + /** + * Receives message from queue - queue.foobar * * @param p */ @Incoming("hello-in") - @Outgoing("hello-out") @Acknowledgment(Acknowledgment.Strategy.MANUAL) - Message consumeAndPublish(SolaceInboundMessage p) { - Log.infof("Received message: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8)); - SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder() - .createPubSubOutboundMetadata(); - Message outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata), () -> { - CompletableFuture completableFuture = new CompletableFuture(); - p.ack(); - completableFuture.complete(null); - return completableFuture; - }, (throwable) -> { - CompletableFuture completableFuture = new CompletableFuture(); - p.nack(throwable, p.getMetadata()); - completableFuture.complete(null); - return completableFuture; - }); - return outboundMessage; + CompletionStage consumeMessage(SolaceInboundMessage p) { + Log.infof("Received message: %s from topic: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8), + p.getMessage().getDestinationName()); + return p.ack(); } /** - * Publish a simple string from using TryMe in Solace broker and you should see the message published to dynamic destination - * topic + * Receives message from queue - queue.dynamic.topic and overwrites the topic configured in outgoing channel + * dynamic-destination-out + * + * See [resources/application.properties#mp.messaging.outgoing.dynamic-destination-out.producer.topic] * * @param p */ @@ -50,22 +54,13 @@ Message consumeAndPublish(SolaceInboundMessage p) { @Outgoing("dynamic-destination-out") @Acknowledgment(Acknowledgment.Strategy.MANUAL) Message consumeAndPublishToDynamicTopic(SolaceInboundMessage p) { - Log.infof("Received message: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8)); + Log.infof("Received message: %s from topic: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8), + p.getMessage().getDestinationName()); SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder() .setApplicationMessageId("test") - .setDynamicDestination("solace/quarkus/producer/" + p.getMessage().getCorrelationId()) // make sure correlationID is available on incoming message + .setDynamicDestination("hello/foobar/" + p.getMessage().getApplicationMessageId()) .createPubSubOutboundMetadata(); - Message outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata), () -> { - CompletableFuture completableFuture = new CompletableFuture(); - p.ack(); - completableFuture.complete(null); - return completableFuture; - }, (throwable) -> { - CompletableFuture completableFuture = new CompletableFuture(); - p.nack(throwable, p.getMetadata()); - completableFuture.complete(null); - return completableFuture; - }); + Message outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata)); return outboundMessage; } diff --git a/samples/hello-connector-solace/src/main/resources/application.properties b/samples/hello-connector-solace/src/main/resources/application.properties index 72cebfd..c6bc966 100644 --- a/samples/hello-connector-solace/src/main/resources/application.properties +++ b/samples/hello-connector-solace/src/main/resources/application.properties @@ -1,24 +1,26 @@ -quarkus.solace.host= -quarkus.solace.vpn= -quarkus.solace.authentication.basic.username= -quarkus.solace.authentication.basic.password= +#quarkus.solace.host= +#quarkus.solace.vpn= +#quarkus.solace.authentication.basic.username= +#quarkus.solace.authentication.basic.password= mp.messaging.outgoing.hello-out.connector=quarkus-solace -mp.messaging.outgoing.hello-out.producer.topic= +mp.messaging.outgoing.hello-out.producer.topic=hello/foobar mp.messaging.incoming.hello-in.connector=quarkus-solace mp.messaging.incoming.hello-in.consumer.queue.supports-nacks=true -mp.messaging.incoming.hello-in.consumer.queue.name= +mp.messaging.incoming.hello-in.consumer.queue.name=queue.foobar +mp.messaging.incoming.hello-in.consumer.queue.missing-resource-creation-strategy=create-on-start mp.messaging.incoming.hello-in.consumer.queue.type=durable-exclusive -#mp.messaging.incoming.hello-in.consumer.queue.failure-strategy=error_topic -#mp.messaging.incoming.hello-in.consumer.queue.error.topic= +mp.messaging.incoming.hello-in.consumer.queue.add-additional-subscriptions=true +mp.messaging.incoming.hello-in.consumer.queue.subscriptions=hello/foobar mp.messaging.incoming.dynamic-destination-in.connector=quarkus-solace mp.messaging.incoming.dynamic-destination-in.consumer.queue.supports-nacks=true -mp.messaging.incoming.dynamic-destination-in.consumer.queue.name= +mp.messaging.incoming.dynamic-destination-in.consumer.queue.name=queue.dynamic.topic +mp.messaging.incoming.dynamic-destination-in.consumer.queue.missing-resource-creation-strategy=create-on-start mp.messaging.incoming.dynamic-destination-in.consumer.queue.type=durable-exclusive -mp.messaging.incoming.hello-in.consumer.queue.failure-strategy=ignore -mp.messaging.incoming.dynamic-destination-in.consumer.queue.error.topic=solace/quarkus/error +mp.messaging.incoming.dynamic-destination-in.consumer.queue.add-additional-subscriptions=true +mp.messaging.incoming.dynamic-destination-in.consumer.queue.subscriptions=hello/foobar mp.messaging.outgoing.dynamic-destination-out.connector=quarkus-solace -mp.messaging.outgoing.dynamic-destination-out.producer.topic= +mp.messaging.outgoing.dynamic-destination-out.producer.topic=hello/foobar From 88555babd65cbcbd18e7554989258268f9a502e3 Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Mon, 29 Jan 2024 16:54:56 +0530 Subject: [PATCH 2/5] Updated keyword in quarkus-extension configuration --- runtime/src/main/resources/META-INF/quarkus-extension.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/runtime/src/main/resources/META-INF/quarkus-extension.yaml index 5ad8a22..a08c086 100644 --- a/runtime/src/main/resources/META-INF/quarkus-extension.yaml +++ b/runtime/src/main/resources/META-INF/quarkus-extension.yaml @@ -5,6 +5,7 @@ metadata: - solace - pubsubplus event broker - event driven integration + - reactive-solace # guide: https://quarkiverse.github.io/quarkiverse-docs/solace/dev/ # To create and publish this guide, see https://github.com/quarkiverse/quarkiverse/wiki#documenting-your-extension categories: - "messaging" From fc36d567498b4e1fdab0b85e35fff6f41438058a Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Mon, 29 Jan 2024 20:35:14 +0530 Subject: [PATCH 3/5] Addressed comments --- .../main/java/com/solace/quarkus/samples/HelloConsumer.java | 4 +--- .../java/com/solace/quarkus/samples/PublisherResource.java | 2 +- .../src/main/resources/application.properties | 5 +++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java index 8978223..745affc 100644 --- a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java +++ b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java @@ -52,7 +52,6 @@ CompletionStage consumeMessage(SolaceInboundMessage p) { */ @Incoming("dynamic-destination-in") @Outgoing("dynamic-destination-out") - @Acknowledgment(Acknowledgment.Strategy.MANUAL) Message consumeAndPublishToDynamicTopic(SolaceInboundMessage p) { Log.infof("Received message: %s from topic: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8), p.getMessage().getDestinationName()); @@ -60,8 +59,7 @@ Message consumeAndPublishToDynamicTopic(SolaceInboundMessage p) { .setApplicationMessageId("test") .setDynamicDestination("hello/foobar/" + p.getMessage().getApplicationMessageId()) .createPubSubOutboundMetadata(); - Message outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata)); - return outboundMessage; + return p.addMetadata(outboundMetadata).withAck(() -> p.ack()); } } diff --git a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/PublisherResource.java b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/PublisherResource.java index 3b9ebc8..601ce63 100644 --- a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/PublisherResource.java +++ b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/PublisherResource.java @@ -15,7 +15,7 @@ @Path("/hello") public class PublisherResource { - @Channel("hello") + @Channel("hello-out") MutinyEmitter foobar; /** diff --git a/samples/hello-connector-solace/src/main/resources/application.properties b/samples/hello-connector-solace/src/main/resources/application.properties index c6bc966..c0d8f33 100644 --- a/samples/hello-connector-solace/src/main/resources/application.properties +++ b/samples/hello-connector-solace/src/main/resources/application.properties @@ -5,6 +5,7 @@ mp.messaging.outgoing.hello-out.connector=quarkus-solace mp.messaging.outgoing.hello-out.producer.topic=hello/foobar +mp.messaging.outgoing.hello-out.merge=true mp.messaging.incoming.hello-in.connector=quarkus-solace mp.messaging.incoming.hello-in.consumer.queue.supports-nacks=true @@ -20,7 +21,7 @@ mp.messaging.incoming.dynamic-destination-in.consumer.queue.name=queue.dynamic.t mp.messaging.incoming.dynamic-destination-in.consumer.queue.missing-resource-creation-strategy=create-on-start mp.messaging.incoming.dynamic-destination-in.consumer.queue.type=durable-exclusive mp.messaging.incoming.dynamic-destination-in.consumer.queue.add-additional-subscriptions=true -mp.messaging.incoming.dynamic-destination-in.consumer.queue.subscriptions=hello/foobar +mp.messaging.incoming.dynamic-destination-in.consumer.queue.subscriptions=test/topic/> mp.messaging.outgoing.dynamic-destination-out.connector=quarkus-solace -mp.messaging.outgoing.dynamic-destination-out.producer.topic=hello/foobar +mp.messaging.outgoing.dynamic-destination-out.producer.topic=test/dynamic/topic From 537cd3ba0692f4ccb817cb2113ebb1c43ea8b142 Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Mon, 29 Jan 2024 21:15:30 +0530 Subject: [PATCH 4/5] Added getAck and getNack methods in SolaceIncomingChannel --- .../messaging/incoming/SolaceInboundMessage.java | 12 ++++++++++++ .../com/solace/quarkus/samples/HelloConsumer.java | 2 +- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/pubsub-plus-connector/src/main/java/com/solace/quarkus/messaging/incoming/SolaceInboundMessage.java b/pubsub-plus-connector/src/main/java/com/solace/quarkus/messaging/incoming/SolaceInboundMessage.java index 646ce2c..1bb277e 100644 --- a/pubsub-plus-connector/src/main/java/com/solace/quarkus/messaging/incoming/SolaceInboundMessage.java +++ b/pubsub-plus-connector/src/main/java/com/solace/quarkus/messaging/incoming/SolaceInboundMessage.java @@ -4,6 +4,8 @@ import java.util.concurrent.CompletionStage; import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; import org.eclipse.microprofile.reactive.messaging.Metadata; @@ -82,6 +84,16 @@ public Metadata getMetadata() { return metadata; } + @Override + public Supplier> getAck() { + return this::ack; + } + + @Override + public Function> getNack() { + return this::nack; + } + @Override public CompletionStage ack() { this.unacknowledgedMessageTracker.decrement(); diff --git a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java index 745affc..a6b7675 100644 --- a/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java +++ b/samples/hello-connector-solace/src/main/java/com/solace/quarkus/samples/HelloConsumer.java @@ -59,7 +59,7 @@ Message consumeAndPublishToDynamicTopic(SolaceInboundMessage p) { .setApplicationMessageId("test") .setDynamicDestination("hello/foobar/" + p.getMessage().getApplicationMessageId()) .createPubSubOutboundMetadata(); - return p.addMetadata(outboundMetadata).withAck(() -> p.ack()); + return p.addMetadata(outboundMetadata); } } From 1b1e28cf664c9bbedd040bbcde0b83d225d377bc Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Mon, 29 Jan 2024 21:46:47 +0530 Subject: [PATCH 5/5] removed withAck callback --- .../quarkus/messaging/locals/LocalPropagationTest.java | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/pubsub-plus-connector/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationTest.java b/pubsub-plus-connector/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationTest.java index 798e5bc..b1a6d96 100644 --- a/pubsub-plus-connector/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationTest.java +++ b/pubsub-plus-connector/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationTest.java @@ -7,7 +7,6 @@ import java.util.Random; import java.util.Set; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -196,14 +195,7 @@ public Message process(Message input) { Vertx.currentContext().putLocal("uuid", value); Vertx.currentContext().putLocal("input", payload); - return input.withPayload(payload + 1) - .withAck(() -> { - CompletableFuture cf = new CompletableFuture<>(); - executor.execute(() -> { - cf.complete(null); - }); - return cf; - }); + return input.withPayload(payload + 1); } @Incoming("process")