Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] Add base support of computed column for CDC #1109

Merged
merged 4 commits into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/content/how-to/cdc-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ To use this feature through `flink run`, run the following shell command.
--table <table-name> \
[--partition-keys <partition-keys>] \
[--primary-keys <primary-keys>] \
[--computed-column <'column-name=expr-name(args[, ...])'> [--computed-column ...]] \
[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]
Expand All @@ -56,6 +57,8 @@ To use this feature through `flink run`, run the following shell command.
* `--table` is the Paimon table name.
* `--partition-keys` are the partition keys for Paimon table. If there are multiple partition keys, connect them with comma, for example `dt,hh,mm`.
* `--primary-keys` are the primary keys for Paimon table. If there are multiple primary keys, connect them with comma, for example `buyer_id,seller_id`.
* `--computed-column` are the definitions of computed columns. The argument field is from MySQL table field name. Supported expressions are:
* year(date-column): Extract year from a DATE, DATETIME or TIMESTAMP. Output is an INT value represent the year.
* `--mysql-conf` is the configuration for Flink CDC MySQL table sources. Each configuration should be specified in the format `key=value`. `hostname`, `username`, `password`, `database-name` and `table-name` are required configurations, others are optional. See its [document](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options) for a complete list of configurations.
* `--catalog-conf` is the configuration for Paimon catalog. Each configuration should be specified in the format `key=value`. See [here]({{< ref "maintenance/configurations" >}}) for a complete list of catalog configurations.
* `--table-conf` is the configuration for Paimon table sink. Each configuration should be specified in the format `key=value`. See [here]({{< ref "maintenance/configurations" >}}) for a complete list of table configurations.
Expand Down Expand Up @@ -87,6 +90,7 @@ Example
--table test_table \
--partition-keys pt \
--primary-keys pt,uid \
--computed-columns '_year=year(age)' \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf username=root \
--mysql-conf password=123456 \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,10 @@ public static Timestamp parseTimestampData(String dateStr, int precision)
fromTemporalAccessor(DEFAULT_TIMESTAMP_FORMATTER.parse(dateStr), precision));
}

public static LocalDateTime toLocalDateTime(String dateStr, int precision) {
return fromTemporalAccessor(DEFAULT_TIMESTAMP_FORMATTER.parse(dateStr), precision);
}

/**
* This is similar to {@link LocalDateTime#from(TemporalAccessor)}, but it's less strict and
* introduces default values.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public abstract class MySqlCdcE2eTestBase extends E2eTestBase {
protected MySqlContainer mySqlContainer;

protected String warehousePath;
private String catalogDdl;
private String useCatalogCmd;
protected String catalogDdl;
protected String useCatalogCmd;

protected MySqlCdcE2eTestBase(MySqlVersion mySqlVersion) {
this(mySqlVersion, false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.tests.cdc;

import org.apache.paimon.flink.action.cdc.mysql.MySqlContainer;
import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;

import java.sql.Connection;
import java.sql.Statement;

/** E2e test for MySql CDC with computed column. */
public class MySqlComputedColumnE2ETest extends MySqlCdcE2eTestBase {

private static final Logger LOG = LoggerFactory.getLogger(MySqlComputedColumnE2ETest.class);

protected MySqlComputedColumnE2ETest() {
super(MySqlVersion.V5_7);
}

@Test
public void testSyncTable() throws Exception {
String runActionCommand =
String.join(
" ",
"bin/flink",
"run",
"-c",
"org.apache.paimon.flink.action.FlinkActions",
"-D",
"execution.checkpointing.interval=1s",
"--detached",
"lib/paimon-flink.jar",
"mysql-sync-table",
"--warehouse",
warehousePath,
"--database",
"default",
"--table",
"ts_table",
"--partition-keys",
// computed from _datetime
"_year",
"--primary-keys",
"pk,_year",
"--computed-column '_year=year(_datetime)'",
"--mysql-conf",
"hostname=mysql-1",
"--mysql-conf",
String.format("port=%d", MySqlContainer.MYSQL_PORT),
"--mysql-conf",
String.format("username='%s'", mySqlContainer.getUsername()),
"--mysql-conf",
String.format("password='%s'", mySqlContainer.getPassword()),
"--mysql-conf",
"database-name='test_computed_column'",
"--mysql-conf",
"table-name='T'",
"--table-conf",
"bucket=2");
Container.ExecResult execResult =
jobManager.execInContainer("su", "flink", "-c", runActionCommand);
LOG.info(execResult.getStdout());
LOG.info(execResult.getStderr());

try (Connection conn = getMySqlConnection();
Statement statement = conn.createStatement()) {
statement.executeUpdate("USE test_computed_column");

statement.executeUpdate("INSERT INTO T VALUES (1, '2023-05-10 12:30:20')");

String jobId =
runSql(
"INSERT INTO result1 SELECT * FROM ts_table;",
catalogDdl,
useCatalogCmd,
createResultSink("result1", "pk INT, _date TIMESTAMP(0), _year INT"));
checkResult("1, 2023-05-10T12:30:20, 2023");
clearCurrentResults();
cancelJob(jobId);
}
}

@Disabled("Not supported")
@Test
public void testSyncDatabase() {}
}
13 changes: 13 additions & 0 deletions paimon-e2e-tests/src/test/resources/mysql/setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,16 @@ CREATE TABLE T (
UPPERCASE_V0 VARCHAR(20),
PRIMARY KEY (k)
);

-- ################################################################################
-- MySqlComputedColumnE2ETest#testSyncTable
-- ################################################################################

CREATE DATABASE test_computed_column;
USE test_computed_column;

CREATE TABLE T (
pk INT,
_datetime DATETIME,
PRIMARY KEY (pk)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.mysql;

import org.apache.paimon.types.DataType;

import javax.annotation.Nullable;

import java.io.Serializable;

/**
* A Computed column's value is computed from input columns. Only expression with at most two inputs
* (with referenced field at the first) is supported currently.
*/
public class ComputedColumn implements Serializable {

private static final long serialVersionUID = 1L;

private final String columnName;
private final Expression expression;

public ComputedColumn(String columnName, Expression expression) {
this.columnName = columnName;
this.expression = expression;
}

public String columnName() {
return columnName;
}

public DataType columnType() {
return expression.outputType();
}

String fieldReference() {
return expression.fieldReference();
}

/** Compute column's value from given argument. Return null if input is null. */
@Nullable
String eval(@Nullable String input) {
if (input == null) {
return null;
}
return expression.eval(input);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.mysql;

import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.DateTimeUtils;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;

/** Produce a computation result for computed column. */
public interface Expression extends Serializable {

List<String> SUPPORTED_EXPRESSION = Collections.singletonList("year");

/** Return name of referenced field. */
String fieldReference();

/** Return {@link DataType} of computed value. */
DataType outputType();

/** Compute value from given input. Input and output are serialized to string. */
String eval(String input);

static Expression create(
String exprName, String fieldReference, DataType fieldType, @Nullable String literal) {
switch (exprName) {
case "year":
return year(fieldReference);
// TODO: support more expression
default:
throw new UnsupportedOperationException(
String.format(
"Unsupported expression: %s. Supported expressions are: %s",
exprName, String.join(",", SUPPORTED_EXPRESSION)));
}
}

static Expression year(String fieldReference) {
return new YearComputer(fieldReference);
}

/** Compute year from a time input. */
final class YearComputer implements Expression {

private static final long serialVersionUID = 1L;

private final String fieldReference;

private YearComputer(String fieldReference) {
this.fieldReference = fieldReference;
}

@Override
public String fieldReference() {
return fieldReference;
}

@Override
public DataType outputType() {
return DataTypes.INT();
}

@Override
public String eval(String input) {
LocalDateTime localDateTime = DateTimeUtils.toLocalDateTime(input, 0);
return String.valueOf(localDateTime.getYear());
}
}
}
Loading