Skip to content

Commit

Permalink
[INLONG-9866][Sort] Add end to end test case(redis to redis) for sort…
Browse files Browse the repository at this point in the history
…-connector-redis-v1.15. (#9869)
  • Loading branch information
XiaoYou201 authored Apr 2, 2024
1 parent 3746e3f commit 8903184
Show file tree
Hide file tree
Showing 16 changed files with 495 additions and 74 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci_ut.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ jobs:
CI: false

- name: Unit test with Maven
run: mvn --batch-mode --update-snapshots -e -V test
run: mvn --batch-mode --update-snapshots -e -V test -pl !:sort-end-to-end-tests-v1.15,!:sort-end-to-end-tests-v1.13
env:
CI: false

Expand Down
3 changes: 3 additions & 0 deletions inlong-sort/sort-end-to-end-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
</profile>
<profile>
<id>v1.15</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<modules>
<module>sort-end-to-end-tests-v1.15</module>
</modules>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@
<artifactId>clickhouse-jdbc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -233,6 +238,14 @@
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-redis-v1.15</artifactId>
<version>${project.version}</version>
<destFileName>sort-connector-redis.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
<executions>
Expand All @@ -241,7 +254,7 @@
<goals>
<goal>copy</goal>
</goals>
<phase>pre-integration-test</phase>
<phase>validate</phase>
</execution>
</executions>
</plugin>
Expand Down Expand Up @@ -273,6 +286,12 @@
<skip>true</skip>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${plugin.surefire.version}</version>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,15 @@

import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;

/**
* End-to-end tests for sort-connector-kafka uber jar.
*/
public class KafkaE2EITCase extends FlinkContainerTestEnv {
public class Kafka2StarRocksTest extends FlinkContainerTestEnv {

private static final Logger LOG = LoggerFactory.getLogger(KafkaE2EITCase.class);
private static final Logger LOG = LoggerFactory.getLogger(Kafka2StarRocksTest.class);

public static final Logger MYSQL_LOG = LoggerFactory.getLogger(MySqlContainer.class);

Expand All @@ -81,9 +80,8 @@ public class KafkaE2EITCase extends FlinkContainerTestEnv {
static {
try {
URI kafkaSqlFile =
Objects.requireNonNull(KafkaE2EITCase.class.getResource("/flinkSql/kafka_test.sql")).toURI();
Objects.requireNonNull(Kafka2StarRocksTest.class.getResource("/flinkSql/kafka_test.sql")).toURI();
sqlFile = Paths.get(kafkaSqlFile).toString();
buildStarRocksImage();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
Expand All @@ -102,7 +100,6 @@ public class KafkaE2EITCase extends FlinkContainerTestEnv {
(StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName())
.withExposedPorts(9030, 8030, 8040)
.withNetwork(NETWORK)
.withAccessToHost(true)
.withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));

Expand Down Expand Up @@ -178,7 +175,7 @@ private void initializeKafkaTable(String topic) {
}

private String getCreateStatement(String fileName, Map<String, Object> properties) {
URL url = Objects.requireNonNull(KafkaE2EITCase.class.getResource("/env/" + fileName));
URL url = Objects.requireNonNull(Kafka2StarRocksTest.class.getResource("/env/" + fileName));

try {
Path file = Paths.get(url.toURI());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@
* End-to-end tests for sort-connector-mongodb-cdc-v1.15 uber jar.
* Test flink sql Mongodb cdc to StarRocks
*/
public class MongodbToStarRocksITCase extends FlinkContainerTestEnv {
public class Mongodb2StarRocksTest extends FlinkContainerTestEnv {

private static final Logger LOG = LoggerFactory.getLogger(MongodbToStarRocksITCase.class);
private static final Logger LOG = LoggerFactory.getLogger(Mongodb2StarRocksTest.class);

private static final Path mongodbJar = TestUtils.getResource("sort-connector-mongodb-cdc.jar");
private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar");
Expand All @@ -82,9 +82,8 @@ public class MongodbToStarRocksITCase extends FlinkContainerTestEnv {

static {
try {
sqlFile = Paths.get(PostgresToStarRocksITCase.class.getResource("/flinkSql/mongodb_test.sql").toURI())
sqlFile = Paths.get(Postgres2StarRocksTest.class.getResource("/flinkSql/mongodb_test.sql").toURI())
.toString();
buildStarRocksImage();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -116,7 +115,6 @@ public static void buildStarRocksImage() {
(StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName())
.withExposedPorts(9030, 8030, 8040)
.withNetwork(NETWORK)
.withAccessToHost(true)
.withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,16 @@

import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;

/**
* End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar.
* Test flink sql Mysql cdc to StarRocks
*/
public class MysqlToRocksITCase extends FlinkContainerTestEnv {
public class Mysql2StarRocksTest extends FlinkContainerTestEnv {

private static final Logger LOG = LoggerFactory.getLogger(MysqlToRocksITCase.class);
private static final Logger LOG = LoggerFactory.getLogger(Mysql2StarRocksTest.class);

private static final Path mysqlJar = TestUtils.getResource("sort-connector-mysql-cdc.jar");
private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar");
Expand All @@ -64,19 +63,17 @@ public class MysqlToRocksITCase extends FlinkContainerTestEnv {
static {
try {
sqlFile =
Paths.get(MysqlToRocksITCase.class.getResource("/flinkSql/mysql_test.sql").toURI()).toString();
buildStarRocksImage();
Paths.get(Mysql2StarRocksTest.class.getResource("/flinkSql/mysql_test.sql").toURI()).toString();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}

@ClassRule
public static StarRocksContainer STAR_ROCKS =
public static final StarRocksContainer STAR_ROCKS =
(StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName())
.withExposedPorts(9030, 8030, 8040)
.withNetwork(NETWORK)
.withAccessToHost(true)
.withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,43 +43,29 @@
import java.util.Arrays;
import java.util.List;

import static org.apache.inlong.sort.tests.utils.StarRocksManager.INTER_CONTAINER_STAR_ROCKS_ALIAS;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.STAR_ROCKS_LOG;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.buildStarRocksImage;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.getNewStarRocksImageName;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.initializeStarRocksTable;
import static org.apache.inlong.sort.tests.utils.StarRocksManager.*;

/**
* End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar.
* Test flink sql Postgres cdc to StarRocks
*/
public class PostgresToStarRocksITCase extends FlinkContainerTestEnv {

private static final Logger LOG = LoggerFactory.getLogger(PostgresToStarRocksITCase.class);
public class Postgres2StarRocksTest extends FlinkContainerTestEnv {

private static final Logger PG_LOG = LoggerFactory.getLogger(PostgreSQLContainer.class);
private static final Logger LOG = LoggerFactory.getLogger(Postgres2StarRocksTest.class);
private static final Path postgresJar = TestUtils.getResource("sort-connector-postgres-cdc.jar");
private static final Path jdbcJar = TestUtils.getResource("sort-connector-starrocks.jar");
private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar");
private static final String sqlFile;

static {
try {
sqlFile = Paths.get(PostgresToStarRocksITCase.class.getResource("/flinkSql/postgres_test.sql").toURI())
sqlFile = Paths.get(Postgres2StarRocksTest.class.getResource("/flinkSql/postgres_test.sql").toURI())
.toString();
buildStarRocksImage();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}

@ClassRule
public static StarRocksContainer STAR_ROCKS =
(StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName())
.withExposedPorts(9030, 8030, 8040)
.withNetwork(NETWORK)
.withAccessToHost(true)
.withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));

@ClassRule
public static final PostgreSQLContainer POSTGRES_CONTAINER = (PostgreSQLContainer) new PostgreSQLContainer(
DockerImageName.parse("debezium/postgres:13").asCompatibleSubstituteFor("postgres"))
Expand All @@ -88,7 +74,14 @@ public class PostgresToStarRocksITCase extends FlinkContainerTestEnv {
.withDatabaseName("test")
.withNetwork(NETWORK)
.withNetworkAliases("postgres")
.withLogConsumer(new Slf4jLogConsumer(LOG));
.withLogConsumer(new Slf4jLogConsumer(PG_LOG));
@ClassRule
public static final StarRocksContainer STAR_ROCKS =
(StarRocksContainer) new StarRocksContainer(getNewStarRocksImageName())
.withExposedPorts(9030, 8030, 8040)
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_STAR_ROCKS_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(STAR_ROCKS_LOG));

@Before
public void setup() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.tests;

import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
import org.apache.inlong.sort.tests.utils.RedisContainer;
import org.apache.inlong.sort.tests.utils.TestUtils;

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
import redis.clients.jedis.Jedis;

import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

/**
 * End-to-end test for the sort-connector-redis-v1.15 uber jar.
 *
 * <p>Starts two Redis containers on the shared test network, seeds the source
 * container with string/hash/sorted-set data, submits the Flink SQL job from
 * {@code /flinkSql/redis_test.sql}, and asserts the transformed results appear
 * in the sink container.
 */
public class RedisToRedisTest extends FlinkContainerTestEnv {

    // Logger named after RedisContainer so container stdout routed via
    // Slf4jLogConsumer is attributed to the container, matching sibling tests.
    private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class);
    // Connector uber jar copied into the dependencies directory by the build.
    private static final Path redisJar = TestUtils.getResource("sort-connector-redis.jar");
    private static final String sqlFile;
    private static Jedis jedisSource;
    private static Jedis jedisSink;

    static {
        try {
            sqlFile = Paths.get(RedisToRedisTest.class.getResource("/flinkSql/redis_test.sql").toURI())
                    .toString();
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }

    @ClassRule
    public static final RedisContainer REDIS_CONTAINER_SOURCE = new RedisContainer(
            DockerImageName.parse("redis:6.2.14"))
                    .withExposedPorts(6379)
                    .withNetwork(NETWORK)
                    .withNetworkAliases("redis_source")
                    .withLogConsumer(new Slf4jLogConsumer(LOG));

    @ClassRule
    public static final RedisContainer REDIS_CONTAINER_SINK = new RedisContainer(
            DockerImageName.parse("redis:6.2.14"))
                    .withExposedPorts(6379)
                    .withNetwork(NETWORK)
                    .withNetworkAliases("redis_sink")
                    .withLogConsumer(new Slf4jLogConsumer(LOG));

    @Before
    public void setup() {
        waitUntilJobRunning(Duration.ofSeconds(30));
        initializeRedisTable();
    }

    /**
     * Seeds the source Redis container with the fixtures consumed by
     * {@code redis_test.sql} and opens clients against both containers.
     */
    private void initializeRedisTable() {

        int sourcePort = REDIS_CONTAINER_SOURCE.getRedisPort();
        int sinkPort = REDIS_CONTAINER_SINK.getRedisPort();

        // The containers map port 6379 to a random host port; connect via localhost.
        jedisSource = new Jedis("127.0.0.1", sourcePort);
        jedisSink = new Jedis("127.0.0.1", sinkPort);

        // Plain string values.
        jedisSource.set("1", "value_1");
        jedisSource.set("2", "value_2");

        // Hash values.
        jedisSource.hset("3", "1", "value_1");
        jedisSource.hset("3", "2", "value_2");

        // Sorted set used for the ZREVRANK test.
        jedisSource.zadd("rank", 10, "1");
        jedisSource.zadd("rank", 20, "2");
        jedisSource.zadd("rank", 30, "3");

        // Sorted set used for the ZSCORE test.
        jedisSource.zadd("rank_score", 10, "1");
        jedisSource.zadd("rank_score", 20, "2");
        jedisSource.zadd("rank_score", 30, "3");

    }

    @AfterClass
    public static void teardown() {
        // Close the Jedis clients before stopping the containers so no
        // connections are leaked across test classes.
        if (jedisSource != null) {
            jedisSource.close();
        }
        if (jedisSink != null) {
            jedisSink.close();
        }
        REDIS_CONTAINER_SOURCE.stop();
        REDIS_CONTAINER_SINK.stop();
    }

    /**
     * Test flink sql reading from the source Redis and writing to the sink Redis.
     *
     * @throws Exception The exception may throws when execute the case
     */
    @Test
    public void testRedisSourceAndSink() throws Exception {
        submitSQLJob(sqlFile, redisJar);
        waitUntilJobRunning(Duration.ofSeconds(30));
        // String source/sink: the job writes keys/values suffixed with the key.
        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
            assertEquals("value_1_1", jedisSink.get("1_1"));
            assertEquals("value_2_2", jedisSink.get("2_2"));
        });
        // Hash source/sink.
        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
            assertEquals("value_1", jedisSink.hget("3_3", "1"));
            assertEquals("value_2", jedisSink.hget("3_3", "2"));
        });
        // ZSCORE lookup results (scores rendered as doubles).
        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
            assertEquals("10.0", jedisSink.hget("rank_score_test", "1"));
            assertEquals("20.0", jedisSink.hget("rank_score_test", "2"));
            assertEquals("30.0", jedisSink.hget("rank_score_test", "3"));
        });
        // ZREVRANK lookup results (rank 0 = highest score).
        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
            assertEquals("2", jedisSink.hget("rank_test", "1"));
            assertEquals("1", jedisSink.hget("rank_test", "2"));
            assertEquals("0", jedisSink.hget("rank_test", "3"));
        });
    }

}
Loading

0 comments on commit 8903184

Please sign in to comment.