Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-9972][Sort] Pulsar connector support authentication when connecting to Pulsar cluster #9973

Merged
merged 3 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
startupMode.getValue(),
primaryKey,
pulsarSource.getSubscription(),
scanStartupSubStartOffset);
scanStartupSubStartOffset,
"",
"");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,22 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric, Meta
@JsonProperty("scanStartupSubStartOffset")
private String scanStartupSubStartOffset;

/**
* pulsar client auth plugin class name
* e.g. org.apache.pulsar.client.impl.auth.AuthenticationToken
*/
@JsonProperty("clientAuthPluginClassName")
private String clientAuthPluginClassName;

/**
* pulsar client auth params
* e.g. token:{tokenString}
* the tokenString should be compatible with the clientAuthPluginClassName see also in:
* <a href="https://pulsar.apache.org/docs/next/security-jwt/"> pulsar auth </a>
*/
@JsonProperty("clientAuthParams")
private String clientAuthParams;

@JsonCreator
public PulsarExtractNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
Expand All @@ -86,7 +102,10 @@ public PulsarExtractNode(@JsonProperty("id") String id,
@Nonnull @JsonProperty("scanStartupMode") String scanStartupMode,
@JsonProperty("primaryKey") String primaryKey,
@JsonProperty("scanStartupSubName") String scanStartupSubName,
@JsonProperty("scanStartupSubStartOffset") String scanStartupSubStartOffset) {
@JsonProperty("scanStartupSubStartOffset") String scanStartupSubStartOffset,
@JsonProperty("clientAuthPluginClassName") String clientAuthPluginClassName,
@JsonProperty("clientAuthParams") String clientAuthParams) {

super(id, name, fields, watermarkField, properties);
this.topic = Preconditions.checkNotNull(topic, "pulsar topic is null.");
this.serviceUrl = Preconditions.checkNotNull(serviceUrl, "pulsar serviceUrl is null.");
Expand All @@ -97,6 +116,9 @@ public PulsarExtractNode(@JsonProperty("id") String id,
this.primaryKey = primaryKey;
this.scanStartupSubName = scanStartupSubName;
this.scanStartupSubStartOffset = scanStartupSubStartOffset;
this.clientAuthPluginClassName = clientAuthPluginClassName;
this.clientAuthParams = clientAuthParams;

}

/**
Expand All @@ -107,23 +129,27 @@ public PulsarExtractNode(@JsonProperty("id") String id,
@Override
public Map<String, String> tableOptions() {
Map<String, String> options = super.tableOptions();
if (StringUtils.isEmpty(this.primaryKey)) {
if (StringUtils.isBlank(this.primaryKey)) {
options.put("connector", "pulsar-inlong");
options.putAll(format.generateOptions(false));
} else {
options.put("connector", "upsert-pulsar-inlong");
options.putAll(format.generateOptions(true));
}
if (adminUrl != null) {
if (StringUtils.isNotBlank(adminUrl)) {
options.put("admin-url", adminUrl);
}
options.put("service-url", serviceUrl);
options.put("topic", topic);
options.put("scan.startup.mode", scanStartupMode);
if (scanStartupSubName != null) {
if (StringUtils.isNotBlank(scanStartupSubName)) {
options.put("scan.startup.sub-name", scanStartupSubName);
options.put("scan.startup.sub-start-offset", scanStartupSubStartOffset);
}
if (StringUtils.isNotBlank(clientAuthPluginClassName)) {
options.put("pulsar.client.authPluginClassName", clientAuthPluginClassName);
options.put("pulsar.client.authParams", clientAuthParams);
EMsnap marked this conversation as resolved.
Show resolved Hide resolved
}
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public Node getTestObject() {
"earliest",
null,
"subscription",
"earliest");
"earliest",
"org.apache.pulsar.client.impl.auth.AuthenticationToken",
"token auth params");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ public PulsarExtractNode buildPulsarExtractNode() {
"earliest",
null,
"test",
"earliest");
"earliest",
"org.apache.pulsar.client.impl.auth.AuthenticationToken",
"token auth params");
}

private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
Expand Down
5 changes: 5 additions & 0 deletions inlong-sort/sort-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@
<artifactId>flink-sql-avro</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>audit-sdk</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.LinkedHashMap;
Expand All @@ -55,6 +57,8 @@ public class PulsarTableSource implements ScanTableSource, SupportsReadingMetada
// Format attributes
// --------------------------------------------------------------------------------------------

private static final Logger LOG = LoggerFactory.getLogger(PulsarTableSource.class);

private static final String FORMAT_METADATA_PREFIX = "value.";

private final PulsarTableDeserializationSchemaFactory deserializationSchemaFactory;
Expand Down Expand Up @@ -111,6 +115,7 @@ public ChangelogMode getChangelogMode() {
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
PulsarDeserializationSchema<RowData> deserializationSchema =
deserializationSchemaFactory.createPulsarDeserialization(context);
LOG.info("pulsar source init with properties: {}", properties);
PulsarSource<RowData> source =
PulsarSource.builder()
.setTopics(topics)
Expand Down
Loading