Skip to content

Commit

Permalink
[Java] Queue Push Consumers (legacy) (#263)
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
MauriceVanVeen authored Aug 12, 2024
1 parent 9228541 commit 9872a14
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 3 deletions.
4 changes: 1 addition & 3 deletions examples/jetstream/queue-push-consumer/go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ func main() {
// as well as for publishing and subscription convenience methods.
js, _ := nc.JetStream()

// Declare a simple [limits-based stream][1] and populate the stream
// with a few messages.
// [1]: /examples/jetstream/limits-stream/go/
// Declare a simple stream.
streamName := "EVENTS"

js.AddStream(&nats.StreamConfig{
Expand Down
174 changes: 174 additions & 0 deletions examples/jetstream/queue-push-consumer/java/Main.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package example;

import io.nats.client.*;
import io.nats.client.api.*;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;

public class Main {
public static void main(String[] args) {
String natsURL = System.getenv("NATS_URL");
if (natsURL == null) {
natsURL = "nats://127.0.0.1:4222";
}

// Initialize a connection to the server. The connection is AutoCloseable
// on exit.
try (Connection nc = Nats.connect(natsURL)) {

// Access `JetStream` and `JetStreamManagement` which provide methods to create
// streams and consumers as well as convenience methods for publishing
// to streams and consuming messages from the streams.
JetStream js = nc.jetStream();
JetStreamManagement jsm = nc.jetStreamManagement();

// Declare a simple stream.
String streamName = "EVENTS";
StreamConfiguration config = StreamConfiguration.builder()
.name(streamName)
.subjects("events.>")
.build();

StreamInfo stream = jsm.addStream(config);

// ### Durable (implicit)
// Like the standard [push consumer][1], the `JetStream` context provides
// a simple way to create a queue push consumer. The only additional
// argument is the "group name".
// [1]: /examples/jetstream/push-consumer/java/
System.out.println("# Durable (implicit)");
PushSubscribeOptions pso = ConsumerConfiguration.builder()
.durable("durable")
.deliverGroup("event-processor")
.buildPushSubscribeOptions();
JetStreamSubscription sub1 = js.subscribe("events.>", "event-processor", pso);

// If we inspect the consumer info, we will notice a property that
// was not defined for the non-queue push consumer. The `DeliverGroup`
// is the unique name of the group of subscribers. Internally, this
// corresponds to a core NATS queue group name which we will see
// below.
ConsumerInfo info = jsm.getConsumerInfo(streamName, "durable");
ConsumerConfiguration consumerConfig = info.getConsumerConfiguration();
System.out.printf("Deliver group: '%s'\n", consumerConfig.getDeliverGroup());

// Using the same helper method, we can create another subscription
// in the group. This method implicitly checks for whether the
// consumer has been created and binds to it based on the subject
// and group name.
JetStreamSubscription sub2 = js.subscribe("events.>", "event-processor", pso);

// As noted above, a queue push consumer relies on a core NATS
// queue group for distributing messages to active members. As
// a result, we can bind a subscription by using the `DeliverSubject`
// and the `DeliverGroup`
// Since messages are publishing to a dedicated subject and is
// part of a group, we can also create a core NATS subscription
// to join the group. As a reminder, the `DeliverSubject` is
// randomly generated by default, but this can be set explicitly
// in the consumer config if desired.
Subscription sub3 = nc.subscribe(consumerConfig.getDeliverSubject(), consumerConfig.getDeliverGroup());
System.out.printf("Deliver subject: '%s'\n", consumerConfig.getDeliverSubject());

// Now we can publish some messages to the stream to observe how
// they are distributed to the subscribers.
js.publish("events.1", null);
js.publish("events.2", null);
js.publish("events.3", null);

// As noted in the [push consumer][1] example, subscriptions
// enqueue messages proactively. When there are a group of
// subscriptions, each will receive a different subset of the messages.
// When calling `nextMessage` this means, messages can be processed out
// of order. There is no correlation with message order and
// subscription creation order 😉. In fact, it is possible that
// not all subscriptions will necessarily get a message.
// [1]: /examples/jetstream/push-consumer/java/
Message msg = sub1.nextMessage(Duration.ofSeconds(1));
if (msg != null) {
System.out.printf("sub1: received message '%s'\n", msg.getSubject());
msg.ack();
} else {
System.out.println("sub1: receive timeout");
}

msg = sub2.nextMessage(Duration.ofSeconds(1));
if (msg != null) {
System.out.printf("sub2: received message '%s'\n", msg.getSubject());
msg.ack();
} else {
System.out.println("sub2: receive timeout");
}

msg = sub3.nextMessage(Duration.ofSeconds(1));
if (msg != null) {
System.out.printf("sub3: received message '%s'\n", msg.getSubject());
msg.ack();
} else {
System.out.println("sub3: receive timeout");
}

// Since we created this consumer using the helper method, when we unsubscribe
// (or call `drain`), the consumer will be deleted.
sub1.unsubscribe();
sub2.unsubscribe();
sub3.unsubscribe();

// ### Durable (explicit)
// To create a (safe) durable consumer, use the `addOrUpdateConsumer`
// method. Although it may seem redundant, a durable name *and*
// the deliver group name must be defined. This is simply because
// the durable name is used for all consumer types, while the
// deliver group is exclusive to the queue push consumer. In this
// example, the same name is used as convention which is what the helper
// method above did as well.
System.out.println("\n# Durable (explicit)");

String consumerName = "event-processor";
ConsumerConfiguration cc = ConsumerConfiguration.builder()
.durable(consumerName)
.deliverSubject("my-subject")
.deliverGroup(consumerName)
.ackPolicy(AckPolicy.Explicit)
.build();
info = jsm.addOrUpdateConsumer(streamName, cc);

// Wait for all 6 messages to be received before exiting.
CountDownLatch latch = new CountDownLatch(6);

// For this part, we will use async core NATS queue subscriptions. Since
// core NATS subscriptions are JetStream-unaware, we must call `msg.ack`
// explicitly to notify the server that the message has been processed.
Dispatcher dispatcher = nc.createDispatcher();
dispatcher.subscribe("my-subject", "event-processor", m -> {
System.out.printf("sub1: received message '%s'\n", m.getSubject());
m.ack();
latch.countDown();
});
dispatcher.subscribe("my-subject", "event-processor", m -> {
System.out.printf("sub2: received message '%s'\n", m.getSubject());
m.ack();
latch.countDown();
});
dispatcher.subscribe("my-subject", "event-processor", m -> {
System.out.printf("sub3: received message '%s'\n", m.getSubject());
m.ack();
latch.countDown();
});

// Publish some more messages.
js.publish("events.4", null);
js.publish("events.5", null);
js.publish("events.6", null);
js.publish("events.7", null);
js.publish("events.8", null);
js.publish("events.9", null);

latch.await();
} catch (InterruptedException | IOException | JetStreamApiException e) {
e.printStackTrace();
}
}
}

1 comment on commit 9872a14

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for nats-by-example ready!

✅ Preview
https://nats-by-example-4x0lntdsa-connecteverything.vercel.app

Built with commit 9872a14.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.