Skip to content

Commit

Permalink
Enhance support for Partitioned Topics about Server-side filters and …
Browse files Browse the repository at this point in the history
…QueueBrowser (#67)
  • Loading branch information
eolivelli authored Sep 12, 2022
1 parent 0f7b218 commit 3792bb6
Show file tree
Hide file tree
Showing 6 changed files with 580 additions and 138 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.pulsar.jms;

import java.util.Enumeration;
import java.util.List;
import java.util.NoSuchElementException;

public final class CompositeEnumeration implements Enumeration {
private final List<? extends Enumeration> enumerations;
private int currentEnumeration = 0;
private Enumeration current;

public CompositeEnumeration(List<? extends Enumeration> enumerations) {
this.enumerations = enumerations;
if (enumerations.isEmpty()) {
current = null;
currentEnumeration = -1;
} else {
startEnumeration(0);
}
}

private void startEnumeration(int n) {
if (n == enumerations.size()) {
currentEnumeration = -1;
current = null;
} else {
currentEnumeration = n;
this.current = enumerations.get(n);
skipEmpty();
}
}

private void skipEmpty() {
while (!current.hasMoreElements()) {
currentEnumeration++;
if (currentEnumeration == enumerations.size()) {
currentEnumeration = -1;
current = null;
break;
}
this.current = enumerations.get(currentEnumeration);
}
}

@Override
public synchronized boolean hasMoreElements() {
return currentEnumeration >= 0;
}

@Override
public synchronized Object nextElement() {
if (current == null) {
throw new NoSuchElementException();
}
Object next = current.nextElement();
if (!current.hasMoreElements()) {
startEnumeration(currentEnumeration + 1);
}
return next;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;

@Slf4j
Expand Down Expand Up @@ -1115,35 +1117,11 @@ public Consumer<?> createConsumer(
}

try {
if (isUseServerSideFiltering() && subscriptionMode == SubscriptionMode.Durable) {
try {
Map<String, ? extends SubscriptionStats> subscriptions =
pulsarAdmin.topics().getStats(fullQualifiedTopicName).getSubscriptions();
SubscriptionStats subscriptionStats = subscriptions.get(subscriptionName);
if (subscriptionStats != null) {
Map<String, String> subscriptionPropertiesFromBroker =
subscriptionStats.getSubscriptionProperties();
log.info("subscriptionPropertiesFromBroker {}", subscriptionPropertiesFromBroker);
if (subscriptionPropertiesFromBroker != null) {
boolean filtering =
"true".equals(subscriptionPropertiesFromBroker.get("jms.filtering"));
if (filtering) {
String selectorOnSubscription =
subscriptionPropertiesFromBroker.getOrDefault("jms.selector", "");
if (!selectorOnSubscription.isEmpty()) {
log.info(
"Detected selector {} on Subscription {} on topic {}",
selectorOnSubscription,
subscriptionName,
fullQualifiedTopicName);
selectorOnSubscriptionReceiver.set(selectorOnSubscription);
}
}
}
}
} catch (PulsarAdminException.NotFoundException notFoundException) {
}
}
downloadServerSideFilter(
fullQualifiedTopicName,
subscriptionName,
subscriptionMode,
selectorOnSubscriptionReceiver);

ConsumerConfiguration consumerConfiguration =
getConsumerConfiguration(overrideConsumerConfiguration);
Expand Down Expand Up @@ -1184,15 +1162,106 @@ public Consumer<?> createConsumer(
}
}

public Reader<?> createReaderForBrowser(
PulsarQueue destination, ConsumerConfiguration overrideConsumerConfiguration)
private void downloadServerSideFilter(
String fullQualifiedTopicName,
String subscriptionName,
SubscriptionMode subscriptionMode,
AtomicReference<String> selectorOnSubscriptionReceiver)
throws PulsarAdminException {
if (isUseServerSideFiltering() && subscriptionMode == SubscriptionMode.Durable) {
try {
Map<String, ? extends SubscriptionStats> subscriptions = null;
try {
PartitionedTopicStats partitionedStats =
pulsarAdmin.topics().getPartitionedStats(fullQualifiedTopicName, false);
subscriptions = partitionedStats.getSubscriptions();
} catch (PulsarAdminException.NotFoundException notFoundOrNonPartitioned) {
}
if (subscriptions == null) {
subscriptions = pulsarAdmin.topics().getStats(fullQualifiedTopicName).getSubscriptions();
}
SubscriptionStats subscriptionStats = subscriptions.get(subscriptionName);
if (subscriptionStats != null) {
Map<String, String> subscriptionPropertiesFromBroker =
subscriptionStats.getSubscriptionProperties();
if (subscriptionPropertiesFromBroker != null) {
log.debug("subscriptionPropertiesFromBroker {}", subscriptionPropertiesFromBroker);
boolean filtering =
"true".equals(subscriptionPropertiesFromBroker.get("jms.filtering"));
if (filtering) {
String selectorOnSubscription =
subscriptionPropertiesFromBroker.getOrDefault("jms.selector", "");
if (!selectorOnSubscription.isEmpty()) {
log.info(
"Detected selector {} on Subscription {} on topic {}",
selectorOnSubscription,
subscriptionName,
fullQualifiedTopicName);
selectorOnSubscriptionReceiver.set(selectorOnSubscription);
}
}
}
}
} catch (PulsarAdminException.NotFoundException notFoundException) {
log.info("Topic not found, cannot download server-side filters {}", fullQualifiedTopicName);
}
}
}

public List<Reader<?>> createReadersForBrowser(
PulsarQueue destination,
ConsumerConfiguration overrideConsumerConfiguration,
AtomicReference<String> selectorOnSubscriptionReceiver)
throws JMSException {

String fullQualifiedTopicName = getPulsarTopicName(destination);
String queueSubscriptionName = getQueueSubscriptionName(destination);
try {
downloadServerSideFilter(
fullQualifiedTopicName,
queueSubscriptionName,
SubscriptionMode.Durable,
selectorOnSubscriptionReceiver);
} catch (PulsarAdminException err) {
throw Utils.handleException(err);
}

try {
PartitionedTopicMetadata partitionedTopicMetadata =
getPulsarAdmin().topics().getPartitionedTopicMetadata(fullQualifiedTopicName);
List<Reader<?>> readers = new ArrayList<>();
if (partitionedTopicMetadata.partitions == 0) {
Reader<?> readerForBrowserForNonPartitionedTopic =
createReaderForBrowserForNonPartitionedTopic(
queueSubscriptionName, fullQualifiedTopicName, overrideConsumerConfiguration);
readers.add(readerForBrowserForNonPartitionedTopic);
} else {
for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
String partitionName = fullQualifiedTopicName + "-partition-" + i;
Reader<?> readerForBrowserForNonPartitionedTopic =
createReaderForBrowserForNonPartitionedTopic(
queueSubscriptionName, partitionName, overrideConsumerConfiguration);
readers.add(readerForBrowserForNonPartitionedTopic);
}
}
return readers;
} catch (PulsarAdminException.NotFoundException err) {
return Collections.emptyList();
} catch (PulsarAdminException err) {
throw Utils.handleException(err);
}
}

private Reader<?> createReaderForBrowserForNonPartitionedTopic(
String queueSubscriptionName,
String fullQualifiedTopicName,
ConsumerConfiguration overrideConsumerConfiguration)
throws JMSException {
try {

// peekMessages works only for non-partitioned topics
List<Message<byte[]>> messages =
getPulsarAdmin()
.topics()
.peekMessages(fullQualifiedTopicName, getQueueSubscriptionName(destination), 1);
getPulsarAdmin().topics().peekMessages(fullQualifiedTopicName, queueSubscriptionName, 1);

MessageId seekMessageId;
if (messages.isEmpty()) {
Expand All @@ -1204,17 +1273,21 @@ public Reader<?> createReaderForBrowser(
if (log.isDebugEnabled()) {
log.debug("createBrowser {} at {}", fullQualifiedTopicName, seekMessageId);
}

ConsumerConfiguration consumerConfiguration =
getConsumerConfiguration(overrideConsumerConfiguration);
Schema<?> schema = consumerConfiguration.getConsumerSchema();
if (schema == null) {
schema = Schema.BYTES;
}
Map<String, Object> readerConfiguration =
Utils.deepCopyMap(consumerConfiguration.getConsumerConfiguration());
readerConfiguration.remove("batchIndexAckEnabled");
ReaderBuilder<?> builder =
pulsarClient
.newReader(schema)
// these properties can be overridden by the configuration
.loadConf(consumerConfiguration.getConsumerConfiguration())
.loadConf(readerConfiguration)
// these properties cannot be overwritten by the configuration
.readerName("jms-queue-browser-" + UUID.randomUUID())
.startMessageId(seekMessageId)
Expand Down
Loading

0 comments on commit 3792bb6

Please sign in to comment.