Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] Add base support of computed column for CDC #1109

Merged
merged 4 commits into from
May 11, 2023

Conversation

yuzelin
Copy link
Contributor

@yuzelin yuzelin commented May 9, 2023

Purpose

Base of #1077 and #1038

Tests

MySqlComputedColumnE2ETest
MySqlSyncTableActionITCase#testComputedColumn

API and Format

New argument in mysql-sync-table:
--computed-column

Documentation

CDC Ingestion

Copy link
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please resolve conflicts.

@@ -416,6 +416,10 @@ public static Timestamp parseTimestampData(String dateStr, int precision)
fromTemporalAccessor(DEFAULT_TIMESTAMP_FORMATTER.parse(dateStr), precision));
}

public static LocalDateTime toLocalDateTime(String dateStr) {
return fromTemporalAccessor(DEFAULT_TIMESTAMP_FORMATTER.parse(dateStr), 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the default precision is zero?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thought was currently we don't need the nanoseconds part. Maybe the default can be 9.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can just introduce a method with precision, and use toLocalDateTime(str, 0)

import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Compute value for computed column. */
public interface ColumnValueComputer extends Serializable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just Expression?

List<String> SUPPORTED_EXPRESSION = Collections.singletonList("year");

/** Return input column name. */
String inputName();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fieldReference?

DataType outputType();

/** Compute value from given input. Input and output are serialize to string. */
String computeValue(String input);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Output eval(Input input);

import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Compute value for computed column. */
public interface ColumnValueComputer extends Serializable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<Input, Output>

String computeValue(String input);

static ColumnValueComputer create(
String exprName, Map<String, DataType> typeMapping, String[] args) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just exprName and fieldReference?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I think we need inputType and args.
inputType: truncate can process numeric, string. We need input type.
args: still truncate: truncate(width, fieldReference). We need the arg width.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String exprName, String fieldReference, DataType fieldType, Object literal?
Is this enough?
Expression is a very complex thing, and its parameter definitions are very complex. I don't want to touch it to the point of complexity, we only need to meet the requirements now.

@@ -26,6 +26,7 @@
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can make this class pure, without caseSensitive.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done by: #1123

@@ -56,6 +57,8 @@ To use this feature through `flink run`, run the following shell command.
* `--table` is the Paimon table name.
* `--partition-keys` are the partition keys for Paimon table. If there are multiple partition keys, connect them with comma, for example `dt,hh,mm`.
* `--primary-keys` are the primary keys for Paimon table. If there are multiple primary keys, connect them with comma, for example `buyer_id,seller_id`.
* `--computed-column` are the definitions of computed columns. Supported expressions are:
Copy link
Contributor

@JingsongLi JingsongLi May 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document from mysql field name instead of paimon field names?

Schema fromMySql =
MySqlActionUtils.buildPaimonSchema(
mySqlSchema, partitionKeys, primaryKeys, tableConfig, caseSensitive);
List<ComputedColumn> computedColumns = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List<ComputedColumn> computedColumns = MySqlActionUtils.buildComputedColumns(
                            computedColumnArgs, mySqlSchema.typeMapping());
Schema fromMySql =
                    MySqlActionUtils.buildPaimonSchema(
                            mySqlSchema,
                            partitionKeys,
                            primaryKeys,
                            computedColumns,
                            tableConfig,
                            caseSensitive);
try {
            table = (FileStoreTable) catalog.getTable(identifier);
            checkArgument(
                    computedColumns.isEmpty(),
                    "Cannot add computed column when table already exists.");
            MySqlActionUtils.assertSchemaCompatible(table.schema(), fromMySql);
        } catch (Catalog.TableNotExistException e) {
            catalog.createTable(identifier, fromMySql, false);
            table = (FileStoreTable) catalog.getTable(identifier);
        }

for (int i = 0; i < args.length; i++) {
args[i] = args[i].trim();
}
computedColumns.add(new ComputedColumn(columnName, exprName, typeMapping, args));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just move the codes in ComputedColumn to here, and just introduce ComputedColumn(String columnName, Expression expression)

@JingsongLi JingsongLi closed this May 11, 2023
@JingsongLi JingsongLi reopened this May 11, 2023
Copy link
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@JingsongLi
Copy link
Contributor

Please merge by yourself after test passing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants