Skip to content

Commit

Permalink
[flink] Add base support of computed column for CDC (#1109)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored and JingsongLi committed May 12, 2023
1 parent fa61241 commit 621d574
Show file tree
Hide file tree
Showing 16 changed files with 529 additions and 67 deletions.
4 changes: 4 additions & 0 deletions docs/content/how-to/cdc-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ To use this feature through `flink run`, run the following shell command.
--table <table-name> \
[--partition-keys <partition-keys>] \
[--primary-keys <primary-keys>] \
[--computed-column <'column-name=expr-name(args[, ...])'> [--computed-column ...]] \
[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]
Expand All @@ -56,6 +57,8 @@ To use this feature through `flink run`, run the following shell command.
* `--table` is the Paimon table name.
* `--partition-keys` are the partition keys for Paimon table. If there are multiple partition keys, connect them with comma, for example `dt,hh,mm`.
* `--primary-keys` are the primary keys for Paimon table. If there are multiple primary keys, connect them with comma, for example `buyer_id,seller_id`.
* `--computed-column` defines a computed column; repeat the option to define multiple computed columns. The field argument of the expression must be a field name of the MySQL table. Supported expressions are:
* year(date-column): Extracts the year from a DATE, DATETIME or TIMESTAMP column. The output is an INT value representing the year.
* `--mysql-conf` is the configuration for Flink CDC MySQL table sources. Each configuration should be specified in the format `key=value`. `hostname`, `username`, `password`, `database-name` and `table-name` are required configurations, others are optional. See its [document](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options) for a complete list of configurations.
* `--catalog-conf` is the configuration for Paimon catalog. Each configuration should be specified in the format `key=value`. See [here]({{< ref "maintenance/configurations" >}}) for a complete list of catalog configurations.
* `--table-conf` is the configuration for Paimon table sink. Each configuration should be specified in the format `key=value`. See [here]({{< ref "maintenance/configurations" >}}) for a complete list of table configurations.
Expand Down Expand Up @@ -87,6 +90,7 @@ Example
--table test_table \
--partition-keys pt \
--primary-keys pt,uid \
--computed-column '_year=year(age)' \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf username=root \
--mysql-conf password=123456 \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,10 @@ public static Timestamp parseTimestampData(String dateStr, int precision)
fromTemporalAccessor(DEFAULT_TIMESTAMP_FORMATTER.parse(dateStr), precision));
}

/**
 * Parses {@code dateStr} with {@code DEFAULT_TIMESTAMP_FORMATTER} and converts the parsed
 * fields to a {@link LocalDateTime} through {@code fromTemporalAccessor}.
 *
 * @param dateStr textual timestamp to parse
 * @param precision precision forwarded unchanged to {@code fromTemporalAccessor}
 * @return the {@link LocalDateTime} built from the parsed fields
 */
public static LocalDateTime toLocalDateTime(String dateStr, int precision) {
    TemporalAccessor parsedFields = DEFAULT_TIMESTAMP_FORMATTER.parse(dateStr);
    return fromTemporalAccessor(parsedFields, precision);
}

/**
* This is similar to {@link LocalDateTime#from(TemporalAccessor)}, but it's less strict and
* introduces default values.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public abstract class MySqlCdcE2eTestBase extends E2eTestBase {
protected MySqlContainer mySqlContainer;

protected String warehousePath;
private String catalogDdl;
private String useCatalogCmd;
protected String catalogDdl;
protected String useCatalogCmd;

protected MySqlCdcE2eTestBase(MySqlVersion mySqlVersion) {
this(mySqlVersion, false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.tests.cdc;

import org.apache.paimon.flink.action.cdc.mysql.MySqlContainer;
import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;

import java.sql.Connection;
import java.sql.Statement;

/**
 * End-to-end test for the MySQL CDC table synchronization action when a computed column is
 * configured.
 *
 * <p>The synchronized Paimon table {@code ts_table} is partitioned by {@code _year}, a column
 * computed with {@code year(_datetime)} from the {@code _datetime} column of the MySQL table.
 */
public class MySqlComputedColumnE2ETest extends MySqlCdcE2eTestBase {

    private static final Logger LOG = LoggerFactory.getLogger(MySqlComputedColumnE2ETest.class);

    protected MySqlComputedColumnE2ETest() {
        super(MySqlVersion.V5_7);
    }

    /**
     * Submits the {@code mysql-sync-table} action, inserts one row into MySQL and verifies that
     * the row arrives in Paimon together with the computed {@code _year} value.
     */
    @Test
    public void testSyncTable() throws Exception {
        // Build the command first, then submit the detached job as user "flink".
        String submitCommand = computedColumnSyncCommand();
        Container.ExecResult submission =
                jobManager.execInContainer("su", "flink", "-c", submitCommand);
        LOG.info(submission.getStdout());
        LOG.info(submission.getStderr());

        try (Connection connection = getMySqlConnection();
                Statement stmt = connection.createStatement()) {
            stmt.executeUpdate("USE test_computed_column");

            stmt.executeUpdate("INSERT INTO T VALUES (1, '2023-05-10 12:30:20')");

            // Copy the Paimon table into a result sink so the synchronized row can be checked.
            String resultSinkDdl =
                    createResultSink("result1", "pk INT, _date TIMESTAMP(0), _year INT");
            String jobId =
                    runSql(
                            "INSERT INTO result1 SELECT * FROM ts_table;",
                            catalogDdl,
                            useCatalogCmd,
                            resultSinkDdl);
            checkResult("1, 2023-05-10T12:30:20, 2023");
            clearCurrentResults();
            cancelJob(jobId);
        }
    }

    @Disabled("Not supported")
    @Test
    public void testSyncDatabase() {}

    /** Assembles the {@code flink run} command line that starts the table synchronization job. */
    private String computedColumnSyncCommand() {
        String portConf = String.format("port=%d", MySqlContainer.MYSQL_PORT);
        String usernameConf = String.format("username='%s'", mySqlContainer.getUsername());
        String passwordConf = String.format("password='%s'", mySqlContainer.getPassword());

        return String.join(
                " ",
                "bin/flink",
                "run",
                "-c",
                "org.apache.paimon.flink.action.FlinkActions",
                "-D",
                "execution.checkpointing.interval=1s",
                "--detached",
                "lib/paimon-flink.jar",
                "mysql-sync-table",
                "--warehouse",
                warehousePath,
                "--database",
                "default",
                "--table",
                "ts_table",
                "--partition-keys",
                // computed from _datetime
                "_year",
                "--primary-keys",
                "pk,_year",
                "--computed-column '_year=year(_datetime)'",
                "--mysql-conf",
                "hostname=mysql-1",
                "--mysql-conf",
                portConf,
                "--mysql-conf",
                usernameConf,
                "--mysql-conf",
                passwordConf,
                "--mysql-conf",
                "database-name='test_computed_column'",
                "--mysql-conf",
                "table-name='T'",
                "--table-conf",
                "bucket=2");
    }
}
13 changes: 13 additions & 0 deletions paimon-e2e-tests/src/test/resources/mysql/setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,16 @@ CREATE TABLE T (
UPPERCASE_V0 VARCHAR(20),
PRIMARY KEY (k)
);

-- ################################################################################
-- MySqlComputedColumnE2ETest#testSyncTable
-- ################################################################################

-- Separate database for the computed-column test; the table name T is also used earlier in this script.
CREATE DATABASE test_computed_column;
USE test_computed_column;

-- Source table of the test: the test job derives the computed column _year from _datetime
-- via year(_datetime).
CREATE TABLE T (
pk INT,
_datetime DATETIME,
PRIMARY KEY (pk)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.mysql;

import org.apache.paimon.types.DataType;

import javax.annotation.Nullable;

import java.io.Serializable;

/**
 * A column whose value is derived from input columns by an {@link Expression}. Currently only
 * expressions with at most two inputs are supported, and the referenced field must be the first
 * input.
 */
public class ComputedColumn implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String columnName;
    private final Expression expression;

    public ComputedColumn(String columnName, Expression expression) {
        this.columnName = columnName;
        this.expression = expression;
    }

    /** Returns the name of this computed column. */
    public String columnName() {
        return this.columnName;
    }

    /** Returns the type of this column, which is the output type of the underlying expression. */
    public DataType columnType() {
        return this.expression.outputType();
    }

    /** Returns the name of the field the underlying expression refers to. */
    String fieldReference() {
        return this.expression.fieldReference();
    }

    /**
     * Computes the value of this column from the given argument.
     *
     * @param input value of the referenced field, may be null
     * @return the computed value, or null if {@code input} is null
     */
    @Nullable
    String eval(@Nullable String input) {
        return input == null ? null : this.expression.eval(input);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.mysql;

import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.DateTimeUtils;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;

/** Computes the value of a computed column from the value of a referenced field. */
public interface Expression extends Serializable {

    /** Names of the expressions that {@link #create} accepts. */
    List<String> SUPPORTED_EXPRESSION = Collections.singletonList("year");

    /** Return name of referenced field. */
    String fieldReference();

    /** Return {@link DataType} of computed value. */
    DataType outputType();

    /** Compute value from given input. Input and output are serialized to string. */
    String eval(String input);

    /**
     * Creates the expression registered under {@code exprName}.
     *
     * <p>{@code fieldType} and {@code literal} are not used by the only expression supported at
     * the moment.
     *
     * @param exprName name of the expression, see {@link #SUPPORTED_EXPRESSION}
     * @param fieldReference name of the field the expression reads
     * @param fieldType type of the referenced field
     * @param literal optional second argument of the expression
     * @throws UnsupportedOperationException if {@code exprName} is not a supported expression
     */
    static Expression create(
            String exprName, String fieldReference, DataType fieldType, @Nullable String literal) {
        // TODO: support more expression
        switch (exprName) {
            case "year":
                return year(fieldReference);
            default:
                break;
        }

        // No supported expression matched: report the name together with the supported ones.
        String supportedNames = String.join(",", SUPPORTED_EXPRESSION);
        throw new UnsupportedOperationException(
                String.format(
                        "Unsupported expression: %s. Supported expressions are: %s",
                        exprName, supportedNames));
    }

    /** Creates an expression that extracts the year from the referenced field. */
    static Expression year(String fieldReference) {
        return new YearComputer(fieldReference);
    }

    /** Compute year from a time input. */
    final class YearComputer implements Expression {

        private static final long serialVersionUID = 1L;

        private final String fieldReference;

        private YearComputer(String fieldReference) {
            this.fieldReference = fieldReference;
        }

        @Override
        public String fieldReference() {
            return this.fieldReference;
        }

        @Override
        public DataType outputType() {
            return DataTypes.INT();
        }

        /**
         * Parses the input with {@link DateTimeUtils#toLocalDateTime(String, int)} at precision 0
         * and returns its year as a decimal string.
         */
        @Override
        public String eval(String input) {
            final LocalDateTime parsed = DateTimeUtils.toLocalDateTime(input, 0);
            final int year = parsed.getYear();
            return Integer.toString(year);
        }
    }
}
Loading

0 comments on commit 621d574

Please sign in to comment.