Skip to content

Commit

Permalink
[INLONG-10069][Sort] Support audit metrics for sort-connector-pulsar-…
Browse files Browse the repository at this point in the history
…1.18 (#10070)
  • Loading branch information
aloyszhang authored Apr 26, 2024
1 parent f2110a4 commit 5674bc2
Show file tree
Hide file tree
Showing 8 changed files with 620 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.inlong.sort.pulsar.table;

import org.apache.inlong.sort.pulsar.table.source.PulsarTableDeserializationSchemaFactory;
import org.apache.inlong.sort.pulsar.table.source.PulsarTableSource;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
Expand All @@ -31,8 +34,6 @@
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.table.sink.PulsarTableSerializationSchemaFactory;
import org.apache.flink.connector.pulsar.table.sink.PulsarTableSink;
import org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory;
import org.apache.flink.connector.pulsar.table.source.PulsarTableSource;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
Expand Down Expand Up @@ -99,6 +100,7 @@
*
* <p>The main role of this class is to retrieve config options and validate options from config and
* the table schema. It also sets default values if a config option is not present.
* Modify from {@link org.apache.flink.connector.pulsar.table.PulsarTableFactory}
*/
public class PulsarTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

Expand Down Expand Up @@ -154,14 +156,21 @@ public DynamicTableSource createDynamicTableSource(Context context) {
final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);

String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
String auditKeys = tableOptions.get(AUDIT_KEYS);

final PulsarTableDeserializationSchemaFactory deserializationSchemaFactory =
new PulsarTableDeserializationSchemaFactory(
physicalDataType,
keyDecodingFormat,
keyProjection,
valueDecodingFormat,
valueProjection,
UPSERT_DISABLED);
UPSERT_DISABLED,
inlongMetric,
auditHostAndPorts,
auditKeys);

// Set default values for configuration not exposed to user.
final DecodingFormat<DeserializationSchema<RowData>> decodingFormatForMetadataPushdown =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
* <li>Create key and value encoding/decoding format.
* <li>Create key and value projection.
* </ul>
* Modify from {@link org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils}
*/
public class PulsarTableOptionUtils {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* org.apache.flink.connector.pulsar.common.config.PulsarOptions}, {@link
* org.apache.flink.connector.pulsar.source.PulsarSourceOptions}, and {@link
* org.apache.flink.connector.pulsar.sink.PulsarSinkOptions}.
* Modify from {@link org.apache.flink.connector.pulsar.table.PulsarTableOptions}
*/
@PublicEvolving
public final class PulsarTableOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.TOPIC;
import static org.apache.pulsar.common.naming.TopicName.isValid;

/** Util class for source and sink validation rules. */
/** Util class for source and sink validation rules.
* Modify from {@link org.apache.flink.connector.pulsar.table.PulsarTableValidationUtils}
* */
public class PulsarTableValidationUtils {

private PulsarTableValidationUtils() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.pulsar.table.source;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
import org.apache.inlong.sort.base.metric.SourceMetricData;

import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.pulsar.client.api.Message;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A specific {@link PulsarDeserializationSchema} for {@link PulsarTableSource}.
*
* <p>Both Flink's key decoding format and value decoding format are wrapped in this class. It is
* responsible for getting metadata fields from a physical pulsar message body, and the final
* projection mapping from Pulsar message fields to Flink row.
*
* <p>After retrieving key and value bytes and convert them into a list of {@link RowData}, it then
* delegates metadata appending, key and value {@link RowData} combining to a {@link
* PulsarRowDataConverter} instance.
* Modify from {@link org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchema}
*/
public class PulsarTableDeserializationSchema implements PulsarDeserializationSchema<RowData> {

private static final long serialVersionUID = 1L;

private final TypeInformation<RowData> producedTypeInfo;

@Nullable
private final DeserializationSchema<RowData> keyDeserialization;

private final DeserializationSchema<RowData> valueDeserialization;

private final PulsarRowDataConverter rowDataConverter;

private final boolean upsertMode;

private SourceMetricData sourceMetricData;

private MetricOption metricOption;

public PulsarTableDeserializationSchema(
@Nullable DeserializationSchema<RowData> keyDeserialization,
DeserializationSchema<RowData> valueDeserialization,
TypeInformation<RowData> producedTypeInfo,
PulsarRowDataConverter rowDataConverter,
boolean upsertMode,
MetricOption metricOption) {
if (upsertMode) {
checkNotNull(keyDeserialization, "upsert mode must specify a key format");
}
this.keyDeserialization = keyDeserialization;
this.valueDeserialization = checkNotNull(valueDeserialization);
this.rowDataConverter = checkNotNull(rowDataConverter);
this.producedTypeInfo = checkNotNull(producedTypeInfo);
this.upsertMode = upsertMode;
this.metricOption = metricOption;
}

@Override
public void open(PulsarInitializationContext context, SourceConfiguration configuration)
throws Exception {
if (keyDeserialization != null) {
keyDeserialization.open(context);
}
if (metricOption != null) {
sourceMetricData = new SourceMetricData(metricOption);
}
valueDeserialization.open(context);
}

@Override
public void deserialize(Message<byte[]> message, Collector<RowData> collector)
throws IOException {

// Get the key row data
List<RowData> keyRowData = new ArrayList<>();
if (keyDeserialization != null) {
keyDeserialization.deserialize(message.getKeyBytes(), new ListCollector<>(keyRowData));
}

// Get the value row data
List<RowData> valueRowData = new ArrayList<>();

if (upsertMode && message.getData().length == 0) {
rowDataConverter.projectToRowWithNullValueRow(message, keyRowData, collector);
return;
}

valueDeserialization.deserialize(message.getData(),
new MetricsCollector<>(new ListCollector<>(valueRowData), sourceMetricData));

rowDataConverter.projectToProducedRowAndCollect(
message, keyRowData, valueRowData, collector);
}

@Override
public TypeInformation<RowData> getProducedType() {
return producedTypeInfo;
}
}
Loading

0 comments on commit 5674bc2

Please sign in to comment.