Skip to content

Commit

Permalink
Fixed #39 #36
Browse files Browse the repository at this point in the history
  • Loading branch information
SravanThotakura05 committed Jan 29, 2024
1 parent a7e39f9 commit 9cb6456
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 52 deletions.
13 changes: 7 additions & 6 deletions runtime/src/main/resources/META-INF/quarkus-extension.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
name: Solace
description: Solace Quarkus Extension for integrating with Solace PubSub+ message brokers. The extension provides the ability to publish or consume events from event mesh.
metadata:
# keywords:
# - solace
# - pubsubplus event broker
keywords:
- solace
- pubsubplus event broker
- event driven integration
# guide: https://quarkiverse.github.io/quarkiverse-docs/solace/dev/ # To create and publish this guide, see https://github.com/quarkiverse/quarkiverse/wiki#documenting-your-extension
# categories:
# - "miscellaneous"
# status: "preview"
categories:
- "messaging"
status: "preview"
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.solace.quarkus.samples;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

Expand All @@ -11,61 +11,56 @@
import com.solace.quarkus.messaging.outgoing.SolaceOutboundMetadata;

import io.quarkus.logging.Log;
import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class HelloConsumer {

/**
* Publish a simple message using TryMe in Solace broker and you should see the message published to topic
* Publishes message to topic hello/foobar which is subscribed by queue.foobar
*
* @see #consumeMessage(SolaceInboundMessage)
* @return
*/
@Outgoing("hello-out")
Multi<Message<String>> publishMessage() {
SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
.setApplicationMessageId("1").createPubSubOutboundMetadata();
return Multi.createFrom().items("1").map(m -> Message.of(m, Metadata.of(outboundMetadata)));
}

/**
* Receives message from queue - queue.foobar
*
* @param p
*/
@Incoming("hello-in")
@Outgoing("hello-out")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
Message<?> consumeAndPublish(SolaceInboundMessage<?> p) {
Log.infof("Received message: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8));
SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
.createPubSubOutboundMetadata();
Message<?> outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata), () -> {
CompletableFuture completableFuture = new CompletableFuture();
p.ack();
completableFuture.complete(null);
return completableFuture;
}, (throwable) -> {
CompletableFuture completableFuture = new CompletableFuture();
p.nack(throwable, p.getMetadata());
completableFuture.complete(null);
return completableFuture;
});
return outboundMessage;
CompletionStage<Void> consumeMessage(SolaceInboundMessage<?> p) {
Log.infof("Received message: %s from topic: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8),
p.getMessage().getDestinationName());
return p.ack();
}

/**
* Publish a simple string from using TryMe in Solace broker and you should see the message published to dynamic destination
* topic
* Receives message from queue - queue.dynamic.topic and overwrites the topic configured in outgoing channel
* dynamic-destination-out
*
* See [resources/application.properties#mp.messaging.outgoing.dynamic-destination-out.producer.topic]
*
* @param p
*/
@Incoming("dynamic-destination-in")
@Outgoing("dynamic-destination-out")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
Message<?> consumeAndPublishToDynamicTopic(SolaceInboundMessage<?> p) {
Log.infof("Received message: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8));
Log.infof("Received message: %s from topic: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8),
p.getMessage().getDestinationName());
SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
.setApplicationMessageId("test")
.setDynamicDestination("solace/quarkus/producer/" + p.getMessage().getCorrelationId()) // make sure correlationID is available on incoming message
.setDynamicDestination("hello/foobar/" + p.getMessage().getApplicationMessageId())
.createPubSubOutboundMetadata();
Message<?> outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata), () -> {
CompletableFuture completableFuture = new CompletableFuture();
p.ack();
completableFuture.complete(null);
return completableFuture;
}, (throwable) -> {
CompletableFuture completableFuture = new CompletableFuture();
p.nack(throwable, p.getMetadata());
completableFuture.complete(null);
return completableFuture;
});
Message<?> outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata));
return outboundMessage;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
quarkus.solace.host=
quarkus.solace.vpn=
quarkus.solace.authentication.basic.username=
quarkus.solace.authentication.basic.password=
#quarkus.solace.host=
#quarkus.solace.vpn=
#quarkus.solace.authentication.basic.username=
#quarkus.solace.authentication.basic.password=

mp.messaging.outgoing.hello-out.connector=quarkus-solace
mp.messaging.outgoing.hello-out.producer.topic=
mp.messaging.outgoing.hello-out.producer.topic=hello/foobar

mp.messaging.incoming.hello-in.connector=quarkus-solace
mp.messaging.incoming.hello-in.consumer.queue.supports-nacks=true
mp.messaging.incoming.hello-in.consumer.queue.name=
mp.messaging.incoming.hello-in.consumer.queue.name=queue.foobar
mp.messaging.incoming.hello-in.consumer.queue.missing-resource-creation-strategy=create-on-start
mp.messaging.incoming.hello-in.consumer.queue.type=durable-exclusive
#mp.messaging.incoming.hello-in.consumer.queue.failure-strategy=error_topic
#mp.messaging.incoming.hello-in.consumer.queue.error.topic=
mp.messaging.incoming.hello-in.consumer.queue.add-additional-subscriptions=true
mp.messaging.incoming.hello-in.consumer.queue.subscriptions=hello/foobar

mp.messaging.incoming.dynamic-destination-in.connector=quarkus-solace
mp.messaging.incoming.dynamic-destination-in.consumer.queue.supports-nacks=true
mp.messaging.incoming.dynamic-destination-in.consumer.queue.name=
mp.messaging.incoming.dynamic-destination-in.consumer.queue.name=queue.dynamic.topic
mp.messaging.incoming.dynamic-destination-in.consumer.queue.missing-resource-creation-strategy=create-on-start
mp.messaging.incoming.dynamic-destination-in.consumer.queue.type=durable-exclusive
mp.messaging.incoming.hello-in.consumer.queue.failure-strategy=ignore
mp.messaging.incoming.dynamic-destination-in.consumer.queue.error.topic=solace/quarkus/error
mp.messaging.incoming.dynamic-destination-in.consumer.queue.add-additional-subscriptions=true
mp.messaging.incoming.dynamic-destination-in.consumer.queue.subscriptions=hello/foobar

mp.messaging.outgoing.dynamic-destination-out.connector=quarkus-solace
mp.messaging.outgoing.dynamic-destination-out.producer.topic=
mp.messaging.outgoing.dynamic-destination-out.producer.topic=hello/foobar

0 comments on commit 9cb6456

Please sign in to comment.