Skip to content

Commit

Permalink
[pulsar] Added getTopics() implementation for Pulsar
Browse files Browse the repository at this point in the history
  • Loading branch information
suvodeep-pyne committed Jan 22, 2025
1 parent 14b86f0 commit 2e91eb3
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
public class PulsarConfig {
public static final String STREAM_TYPE = "pulsar";
public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
public static final String SERVICE_HTTP_URL = "serviceHttpUrl";
public static final String AUTHENTICATION_TOKEN = "authenticationToken";
public static final String TLS_TRUST_CERTS_FILE_PATH = "tlsTrustCertsFilePath";
public static final String OAUTH_ISSUER_URL = "issuerUrl";
Expand All @@ -53,6 +54,7 @@ public class PulsarConfig {
private final String _subscriberId;
private final String _pulsarTopicName;
private final String _bootstrapServers;
private final String _serviceHttpUrl;
private final SubscriptionInitialPosition _subscriptionInitialPosition;
private final String _authenticationToken;
private final String _tlsTrustCertsFilePath;
Expand All @@ -79,6 +81,7 @@ public PulsarConfig(StreamConfig streamConfig, String subscriberId) {
_bootstrapServers = getConfigValue(streamConfigMap, BOOTSTRAP_SERVERS);
Preconditions.checkNotNull(_bootstrapServers, "No brokers provided in the config");

_serviceHttpUrl = getConfigValue(streamConfigMap, SERVICE_HTTP_URL);
_subscriptionInitialPosition = PulsarUtils.offsetCriteriaToSubscription(streamConfig.getOffsetCriteria());
_authenticationToken = getConfigValue(streamConfigMap, AUTHENTICATION_TOKEN);
_tlsTrustCertsFilePath = getConfigValue(streamConfigMap, TLS_TRUST_CERTS_FILE_PATH);
Expand Down Expand Up @@ -149,6 +152,10 @@ public String getBootstrapServers() {
return _bootstrapServers;
}

public String getServiceHttpUrl() {
return _serviceHttpUrl;
}

public SubscriptionInitialPosition getInitialSubscriberPosition() {
return _subscriptionInitialPosition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -33,6 +34,8 @@
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;

import static com.google.common.base.Preconditions.checkArgument;


/**
* Manages the Pulsar client connection, given the partition id and {@link PulsarConfig}
Expand All @@ -41,7 +44,6 @@ public class PulsarPartitionLevelConnectionHandler implements Closeable {
protected final PulsarConfig _config;
protected final String _clientId;
protected final PulsarClient _pulsarClient;
protected final PulsarAdmin _pulsarAdmin;

/**
* Creates a new instance of {@link PulsarClient} and {@link Reader}
Expand All @@ -50,7 +52,6 @@ protected PulsarPartitionLevelConnectionHandler(String clientId, StreamConfig st
_config = new PulsarConfig(streamConfig, clientId);
_clientId = clientId;
_pulsarClient = createPulsarClient();
_pulsarAdmin = createPulsarAdmin();
}

private PulsarClient createPulsarClient() {
Expand All @@ -59,20 +60,23 @@ private PulsarClient createPulsarClient() {
Optional.ofNullable(_config.getTlsTrustCertsFilePath())
.filter(StringUtils::isNotBlank)
.ifPresent(clientBuilder::tlsTrustCertsFilePath);
clientBuilder.authentication(authenticationConfig());
Optional.ofNullable(authenticationConfig()).ifPresent(clientBuilder::authentication);
return clientBuilder.build();
} catch (Exception e) {
throw new RuntimeException("Caught exception while creating Pulsar client", e);
}
}

private PulsarAdmin createPulsarAdmin() {
PulsarAdminBuilder adminBuilder = PulsarAdmin.builder().serviceHttpUrl(_config.getBootstrapServers());
protected PulsarAdmin createPulsarAdmin() {
checkArgument(StringUtils.isNotBlank(_config.getServiceHttpUrl()),
"Service HTTP URL must be provided to perform admin operations");

PulsarAdminBuilder adminBuilder = PulsarAdmin.builder().serviceHttpUrl(_config.getServiceHttpUrl());
try {
Optional.ofNullable(_config.getTlsTrustCertsFilePath())
.filter(StringUtils::isNotBlank)
.ifPresent(adminBuilder::tlsTrustCertsFilePath);
adminBuilder.authentication(authenticationConfig());
Optional.ofNullable(authenticationConfig()).ifPresent(adminBuilder::authentication);
return adminBuilder.build();
} catch (Exception e) {
throw new RuntimeException("Caught exception while creating Pulsar admin", e);
Expand All @@ -84,7 +88,8 @@ private PulsarAdmin createPulsarAdmin() {
*
* @return an Authentication object
*/
private Authentication authenticationConfig() {
private Authentication authenticationConfig()
throws MalformedURLException {
String authenticationToken = _config.getAuthenticationToken();
if (StringUtils.isNotBlank(authenticationToken)) {
return AuthenticationFactory.token(authenticationToken);
Expand All @@ -98,21 +103,18 @@ private Authentication authenticationConfig() {
*
* @return an OAuth2 Authentication object
*/
private Authentication oAuth2AuthenticationConfig() {
private Authentication oAuth2AuthenticationConfig()
throws MalformedURLException {
String issuerUrl = _config.getIssuerUrl();
String credentialsFilePath = _config.getCredentialsFilePath();
String audience = _config.getAudience();

if (StringUtils.isNotBlank(issuerUrl) && StringUtils.isNotBlank(credentialsFilePath) && StringUtils.isNotBlank(
audience)) {
try {
return AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), new URL(credentialsFilePath),
audience);
} catch (Exception e) {
throw new RuntimeException("Failed to create OAuth2 authentication", e);
}
return AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), new URL(credentialsFilePath),
audience);
}
throw new IllegalArgumentException("Invalid OAuth2 configuration");
return null;
}

@Override
Expand All @@ -121,8 +123,5 @@ public void close()
if (_pulsarClient != null) {
_pulsarClient.close();
}
if (_pulsarAdmin != null) {
_pulsarAdmin.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.TransientConsumerException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -181,14 +181,25 @@ public void close()

@Override
public List<TopicMetadata> getTopics() {
try {
return _pulsarAdmin.topics()
.getList(null)
.stream()
.map(topicName -> new PulsarTopicMetadata().setName(topicName))
.collect(Collectors.toList());
try (PulsarAdmin pulsarAdmin = createPulsarAdmin()) {
// List to store all topics
List<TopicMetadata> allTopics = new ArrayList<>();

for (String tenant : pulsarAdmin.tenants().getTenants()) {
for (String namespace : pulsarAdmin.namespaces().getNamespaces(tenant)) {
// Fetch all topics for the namespace
List<String> topicNames = pulsarAdmin.topics().getList(namespace);

// Map topics to PulsarTopicMetadata and add to the list
topicNames.stream()
.map(topicName -> new PulsarTopicMetadata().setName(topicName))
.forEach(allTopics::add);
}
}

return allTopics;
} catch (Exception e) {
throw new RuntimeException("Failed to list Pulsar topics", e);
throw new RuntimeException("Failed to list Pulsar topics across all tenants and namespaces", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pinot.spi.stream.BytesStreamMessage;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -153,6 +155,7 @@ public StreamConfig getStreamConfig(String topicName) {
streamConfigMap.put("stream.pulsar.consumer.type", "simple");
streamConfigMap.put("stream.pulsar.topic.name", topicName);
streamConfigMap.put("stream.pulsar.bootstrap.servers", _pulsar.getPulsarBrokerUrl());
streamConfigMap.put("stream.pulsar.serviceHttpUrl", _pulsar.getHttpServiceUrl());
streamConfigMap.put("stream.pulsar.consumer.prop.auto.offset.reset", "smallest");
streamConfigMap.put("stream.pulsar.consumer.factory.class.name", PulsarConsumerFactory.class.getName());
streamConfigMap.put("stream.pulsar.decoder.class.name", "dummy");
Expand Down Expand Up @@ -208,6 +211,19 @@ public void testPartitionLevelConsumerBatchMessages()
}
}

@Test
public void testGetTopics()
throws Exception {
try (PulsarStreamMetadataProvider metadataProvider = new PulsarStreamMetadataProvider(CLIENT_ID,
getStreamConfig("NON_EXISTING_TOPIC"))) {
List<StreamMetadataProvider.TopicMetadata> topics = metadataProvider.getTopics();
List<String> topicNames = topics.stream()
.map(StreamMetadataProvider.TopicMetadata::getName)
.collect(Collectors.toList());
assertTrue(topicNames.size() == 4);
}
}

private void testConsumer(PulsarPartitionLevelConsumer consumer, int startIndex, List<MessageId> messageIds) {
MessageId startMessageId = startIndex == 0 ? MessageId.earliest : messageIds.get(startIndex);
int numMessagesFetched = startIndex;
Expand Down

0 comments on commit 2e91eb3

Please sign in to comment.