From 7b337024d45b5fdaf846f9601f145b2518722553 Mon Sep 17 00:00:00 2001 From: pengzirui Date: Thu, 11 Apr 2024 10:11:46 +0800 Subject: [PATCH 1/3] [INLONG-9972][Sort] Pulsar connector support authentication when connecting to Pulsar cluster --- .../sort/node/provider/PulsarProvider.java | 4 ++- .../node/extract/PulsarExtractNode.java | 34 ++++++++++++++++--- .../node/extract/PulsarExtractNodeTest.java | 5 ++- .../sort/parser/PulsarSqlParserTest.java | 4 ++- inlong-sort/sort-dist/pom.xml | 5 +++ .../sort/pulsar/table/PulsarTableSource.java | 5 +++ 6 files changed, 50 insertions(+), 7 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java index a767f799f31..9ef0a1634ae 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java @@ -80,7 +80,9 @@ public ExtractNode createExtractNode(StreamNode streamNodeInfo) { startupMode.getValue(), primaryKey, pulsarSource.getSubscription(), - scanStartupSubStartOffset); + scanStartupSubStartOffset, + "", + ""); } @Override diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java index bc09b52e2c3..489c1d642d1 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java @@ -73,6 +73,22 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric, Meta @JsonProperty("scanStartupSubStartOffset") private String scanStartupSubStartOffset; + /** + * pulsar client auth plugin class name + * e.g. org.apache.pulsar.client.impl.auth.AuthenticationToken + */ + @JsonProperty("clientAuthPluginClassName") + private String clientAuthPluginClassName; + + /** + * pulsar client auth params + * e.g. token:{tokenString} + * the tokenString should be compatible with the clientAuthPluginClassName see also in: + * pulsar auth + */ + @JsonProperty("clientAuthParams") + private String clientAuthParams; + @JsonCreator public PulsarExtractNode(@JsonProperty("id") String id, @JsonProperty("name") String name, @@ -86,7 +102,10 @@ public PulsarExtractNode(@JsonProperty("id") String id, @Nonnull @JsonProperty("scanStartupMode") String scanStartupMode, @JsonProperty("primaryKey") String primaryKey, @JsonProperty("scanStartupSubName") String scanStartupSubName, - @JsonProperty("scanStartupSubStartOffset") String scanStartupSubStartOffset) { + @JsonProperty("scanStartupSubStartOffset") String scanStartupSubStartOffset, + @JsonProperty("clientAuthPluginClassName") String clientAuthPluginClassName, + @JsonProperty("clientAuthParams") String clientAuthParams) { + super(id, name, fields, watermarkField, properties); this.topic = Preconditions.checkNotNull(topic, "pulsar topic is null."); this.serviceUrl = Preconditions.checkNotNull(serviceUrl, "pulsar serviceUrl is null."); @@ -97,6 +116,9 @@ public PulsarExtractNode(@JsonProperty("id") String id, this.primaryKey = primaryKey; this.scanStartupSubName = scanStartupSubName; this.scanStartupSubStartOffset = scanStartupSubStartOffset; + this.clientAuthPluginClassName = clientAuthPluginClassName; + this.clientAuthParams = clientAuthParams; + } /** @@ -107,23 +129,27 @@ public PulsarExtractNode(@JsonProperty("id") String id, @Override public Map tableOptions() { Map options = super.tableOptions(); - if (StringUtils.isEmpty(this.primaryKey)) { + if (StringUtils.isBlank(this.primaryKey)) { options.put("connector", "pulsar-inlong"); options.putAll(format.generateOptions(false)); } else { options.put("connector", "upsert-pulsar-inlong"); options.putAll(format.generateOptions(true)); } - if (adminUrl != null) { + if (StringUtils.isNotBlank(adminUrl)) { options.put("admin-url", adminUrl); } options.put("service-url", serviceUrl); options.put("topic", topic); options.put("scan.startup.mode", scanStartupMode); - if (scanStartupSubName != null) { + if (StringUtils.isNotBlank(scanStartupSubName)) { options.put("scan.startup.sub-name", scanStartupSubName); options.put("scan.startup.sub-start-offset", scanStartupSubStartOffset); } + if (StringUtils.isNotBlank(clientAuthPluginClassName)) { + options.put("pulsar.client.authPluginClassName", clientAuthPluginClassName); + options.put("pulsar.client.authParams", clientAuthParams); + } return options; } diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java index 8ab72647262..efb3c219de6 100644 --- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java @@ -51,6 +51,9 @@ public Node getTestObject() { "earliest", null, "subscription", - "earliest"); + "earliest", + "org.apache.pulsar.client.impl.auth.AuthenticationToken", + "token auth params"); + ); } } diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java index 5db0e6ae5d4..fe3d758fae3 100644 --- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java +++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java @@ -77,7 +77,9 @@ public PulsarExtractNode buildPulsarExtractNode() { "earliest", null, "test", - "earliest"); + "earliest", + "org.apache.pulsar.client.impl.auth.AuthenticationToken", + "token auth params"); } private NodeRelation buildNodeRelation(List inputs, List outputs) { diff --git a/inlong-sort/sort-dist/pom.xml b/inlong-sort/sort-dist/pom.xml index 587d77613b1..05d5da66af2 100644 --- a/inlong-sort/sort-dist/pom.xml +++ b/inlong-sort/sort-dist/pom.xml @@ -197,6 +197,11 @@ flink-sql-avro ${flink.version} + + org.apache.inlong + audit-sdk + ${project.version} + diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java index f177eb7543d..84a076d5022 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java @@ -31,6 +31,8 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.pulsar.client.api.SubscriptionType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -55,6 +57,8 @@ public class PulsarTableSource implements ScanTableSource, SupportsReadingMetada // Format attributes // -------------------------------------------------------------------------------------------- + private static final Logger LOG = LoggerFactory.getLogger(PulsarTableSource.class); + private static final String FORMAT_METADATA_PREFIX = "value."; private final PulsarTableDeserializationSchemaFactory deserializationSchemaFactory; @@ -111,6 +115,7 @@ public ChangelogMode getChangelogMode() { public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { PulsarDeserializationSchema deserializationSchema = deserializationSchemaFactory.createPulsarDeserialization(context); + LOG.info("pulsar source init with properties: {}", properties); PulsarSource source = PulsarSource.builder() .setTopics(topics) From 9b2f9790a494685c68842036d3f32bfb08e5dc1b Mon Sep 17 00:00:00 2001 From: pengzirui Date: Thu, 11 Apr 2024 10:22:53 +0800 Subject: [PATCH 2/3] [INLONG-9972][Sort] fix code style --- .../inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java index efb3c219de6..5c84a4e0e77 100644 --- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java @@ -54,6 +54,5 @@ public Node getTestObject() { "earliest", "org.apache.pulsar.client.impl.auth.AuthenticationToken", "token auth params"); - ); } } From c076f45444e0ccdb20b0ada6ceed69f4e2f2e667 Mon Sep 17 00:00:00 2001 From: pengzirui Date: Thu, 11 Apr 2024 20:53:31 +0800 Subject: [PATCH 3/3] [INLONG-9972][Sort] Pulsar connector support authentication when connecting to Pulsar cluster --- .../protocol/node/extract/PulsarExtractNode.java | 3 ++- .../org/apache/inlong/sort/base/Constants.java | 14 ++++++++++++++ .../pulsar/table/PulsarDynamicTableFactory.java | 4 ++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java index 489c1d642d1..9a2adcc8e3c 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java @@ -146,7 +146,8 @@ public Map tableOptions() { options.put("scan.startup.sub-name", scanStartupSubName); options.put("scan.startup.sub-start-offset", scanStartupSubStartOffset); } - if (StringUtils.isNotBlank(clientAuthPluginClassName)) { + if (StringUtils.isNotBlank(clientAuthPluginClassName) + && StringUtils.isNotBlank(clientAuthParams)) { options.put("pulsar.client.authPluginClassName", clientAuthPluginClassName); options.put("pulsar.client.authParams", clientAuthParams); } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java index ba47c743561..98d7c559b85 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -429,4 +429,18 @@ public final class Constants { .withDescription( "Inner format"); + public static final ConfigOption PULSAR_CLIENT_AUTH_PLUGIN_CLASSNAME = + ConfigOptions.key("pulsar.client.authPluginClassName") + .stringType() + .noDefaultValue() + .withDescription( + "pulsar client auth plugin class name"); + + public static final ConfigOption PULSAR_AUTH_PARAMS = + ConfigOptions.key("pulsar.client.authParams") + .stringType() + .noDefaultValue() + .withDescription( + "pulsar client auth params"); + } diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java index dd06bfe7584..c731e22097f 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java @@ -79,6 +79,8 @@ import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS; import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; +import static org.apache.inlong.sort.base.Constants.PULSAR_AUTH_PARAMS; +import static org.apache.inlong.sort.base.Constants.PULSAR_CLIENT_AUTH_PLUGIN_CLASSNAME; /** * Copy from io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9 @@ -333,6 +335,8 @@ public Set> optionalOptions() { options.add(INLONG_METRIC); options.add(INLONG_AUDIT); options.add(AUDIT_KEYS); + options.add(PULSAR_AUTH_PARAMS); + options.add(PULSAR_CLIENT_AUTH_PLUGIN_CLASSNAME); return options; }