Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Aug 12, 2024
1 parent 88ae428 commit fbbedf2
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.e2e.connector.tidb;

import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.container.TestContainer;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;

import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;

@Slf4j
public class TiDBCDCIT extends TiDBTestBase implements TestResource {
private final String SQL_TEMPLATE = "select id,name,description,weight from %s.%s";
private final String DATABASE_NAME = "inventory";
private final String SOURCE_TABLE_NAME = "products";
private final String SINK_TABLE_NAME = "products_sink";

@BeforeAll
@Override
public void startUp() throws Exception {
startContainers();
}

@TestTemplate
public void testAllEvents(TestContainer container) throws Exception {
Container.ExecResult execResult = container.executeJob("/tidb/tidb_source_to_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());

try (Connection connection = getJdbcConnection("inventory");
Statement statement = connection.createStatement()) {

statement.execute(
"UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
statement.execute("UPDATE products SET weight='5.1' WHERE id=107;");
statement.execute(
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
statement.execute(
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute(
"UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM products WHERE id=111;");
}

// stream stage
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Assertions.assertIterableEquals(
query(getQuerySQL(DATABASE_NAME, SOURCE_TABLE_NAME)),
query(getQuerySQL(DATABASE_NAME, SINK_TABLE_NAME)));
});
}

private List<List<Object>> query(String sql) {
try (Connection connection = getJdbcConnection("inventory")) {
ResultSet resultSet = connection.createStatement().executeQuery(sql);
List<List<Object>> result = new ArrayList<>();
int columnCount = resultSet.getMetaData().getColumnCount();
while (resultSet.next()) {
ArrayList<Object> objects = new ArrayList<>();
for (int i = 1; i <= columnCount; i++) {
objects.add(resultSet.getObject(i));
}
log.debug(String.format("Print MySQL-CDC query, sql: %s, data: %s", sql, objects));
result.add(objects);
}
return result;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

private String getQuerySQL(String database, String tableName) {
return String.format(SQL_TEMPLATE, database, tableName);
}

@AfterAll
@Override
public void tearDown() throws Exception {
stopContainers();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.e2e.connector.tidb;

import org.apache.seatunnel.e2e.common.TestSuiteBase;

import org.apache.commons.lang3.RandomUtils;

import org.awaitility.Awaitility;
Expand Down Expand Up @@ -49,7 +51,7 @@

/** Utility class for tidb tests. */
@Slf4j
public class TiDBContainer {
public class TiDBTestBase extends TestSuiteBase {
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");

public static final String PD_SERVICE_NAME = "pd0";
Expand All @@ -64,9 +66,8 @@ public class TiDBContainer {
public static final int PD_PORT_ORIGIN = 2379;
public static int pdPort = PD_PORT_ORIGIN + RandomUtils.nextInt(0, 1000);

@ClassRule public static final Network NETWORK = Network.newNetwork();
public static final Network NETWORK = Network.newNetwork();

@ClassRule
public static final GenericContainer<?> PD =
new FixedHostPortGenericContainer<>("pingcap/pd:v6.1.0")
.withFileSystemBind("src/test/resources/config/pd.toml", "/pd.toml")
Expand All @@ -86,7 +87,6 @@ public class TiDBContainer {
.withStartupTimeout(Duration.ofSeconds(120))
.withLogConsumer(new Slf4jLogConsumer(log));

@ClassRule
public static final GenericContainer<?> TIKV =
new FixedHostPortGenericContainer<>("pingcap/tikv:v6.1.0")
.withFixedExposedPort(TIKV_PORT_ORIGIN, TIKV_PORT_ORIGIN)
Expand All @@ -104,7 +104,6 @@ public class TiDBContainer {
.withStartupTimeout(Duration.ofSeconds(120))
.withLogConsumer(new Slf4jLogConsumer(log));

@ClassRule
public static final GenericContainer<?> TIDB =
new GenericContainer<>("pingcap/tidb:v6.1.0")
.withExposedPorts(TIDB_PORT)
Expand Down Expand Up @@ -180,7 +179,7 @@ private static void dropTestDatabase(Connection connection, String databaseName)
*/
protected void initializeTidbTable(String sqlFile) {
final String ddlFile = String.format("ddl/%s.sql", sqlFile);
final URL ddlTestFile = TiDBContainer.class.getClassLoader().getResource(ddlFile);
final URL ddlTestFile = TiDBTestBase.class.getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try (Connection connection = getJdbcConnection("");
Statement statement = connection.createStatement()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,20 @@ USE inventory;
CREATE TABLE products
(
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
name VARCHAR(255) NOT NULL DEFAULT 'seatunnel',
description VARCHAR(512),
weight DECIMAL(20, 10)
);
ALTER TABLE products AUTO_INCREMENT = 101;

CREATE TABLE products_sink
(
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'seatunnel',
description VARCHAR(512),
weight DECIMAL(20, 10)
);

INSERT INTO products
VALUES (default, "scooter", "Small 2-wheel scooter", 3.14),
(default, "car battery", "12V car battery", 8.1),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
parallelism = 2
job.mode = "BATCH"
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
TDengine {
url: "jdbc:TAOS-RS://flink_e2e_tdengine_src:6041/"
username: "root"
password: "taosdata"
database: "power"
stable: "meters"
lower_bound: "2018-10-03 14:38:05.000"
upper_bound: "2018-10-03 14:38:16.801"
result_table_name = "tdengine_result"
}
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/category/source-v2
}

transform {
}

sink {
TDengine {
url: "jdbc:TAOS-RS://flink_e2e_tdengine_sink:6041/"
username: "root"
password: "taosdata"
database: "power2"
stable: "meters2"
timezone: "UTC"
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2
}

0 comments on commit fbbedf2

Please sign in to comment.