From 28b59230d1b688fcf679dd2628e5c28e4e62fcec Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha <34186745+vaibhav-yb@users.noreply.github.com> Date: Tue, 22 Nov 2022 20:19:20 +0530 Subject: [PATCH] Revert "[#92] Connector changes to support composite types (#30)" This reverts commit 53992d818b2b7abd69865d6b13f9720529905738. --- .../debezium/connector/yugabytedb/PgOid.java | 1 - .../yugabytedb/YugabyteDBValueConverter.java | 2 - .../YugabyteDBCompositeTypesTest.java | 114 ------------------ .../yugabytedb/YugabyteDBDatatypesTest.java | 2 + .../yugabytedb/common/YugabyteDBTestBase.java | 10 +- .../resources/drop_tables_and_databases.ddl | 2 +- 6 files changed, 10 insertions(+), 121 deletions(-) delete mode 100644 src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompositeTypesTest.java diff --git a/src/main/java/io/debezium/connector/yugabytedb/PgOid.java b/src/main/java/io/debezium/connector/yugabytedb/PgOid.java index 651bd581..649f2525 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/PgOid.java +++ b/src/main/java/io/debezium/connector/yugabytedb/PgOid.java @@ -41,5 +41,4 @@ public final class PgOid extends Oid { public static final int INT8RANGE_OID = 3926; public static final int INT8RANGE_ARRAY = 3927; public static final int ENUM_OID = 3500; - public static final int RECORD_OID = 2249; } diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBValueConverter.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBValueConverter.java index a96cabb2..266407ac 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBValueConverter.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBValueConverter.java @@ -203,7 +203,6 @@ public SchemaBuilder schemaBuilder(Column column) { case PgOid.NUM_RANGE_OID: case PgOid.INT8RANGE_OID: case PgOid.ENUM_OID: - case PgOid.RECORD_OID: return SchemaBuilder.string(); case PgOid.UUID: return Uuid.builder(); @@ -394,7 +393,6 @@ public ValueConverter converter(Column column, Field fieldDefn) { case PgOid.NUM_RANGE_OID: case PgOid.INT8RANGE_OID: case PgOid.ENUM_OID: - case PgOid.RECORD_OID: return data -> convertString(column, fieldDefn, data); case PgOid.POINT: return data -> convertPoint(column, fieldDefn, data); diff --git a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompositeTypesTest.java b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompositeTypesTest.java deleted file mode 100644 index b16a62a4..00000000 --- a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBCompositeTypesTest.java +++ /dev/null @@ -1,114 +0,0 @@ -package io.debezium.connector.yugabytedb; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -import org.apache.kafka.connect.source.SourceRecord; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.YugabyteYSQLContainer; - -import io.debezium.config.Configuration; -import io.debezium.connector.yugabytedb.common.YugabyteDBTestBase; -import io.debezium.util.Strings; - -/** - * Basic unit tests to check the behaviour of Composite types with CDC in YugabyteDB. - * - * @author Vaibhav Kushwaha (vkushwaha@yugabyte.com) - */ -public class YugabyteDBCompositeTypesTest extends YugabyteDBTestBase { - private final static Logger LOGGER = - LoggerFactory.getLogger(YugabyteDBCompositeTypesTest.class); - private static YugabyteYSQLContainer ybContainer; - - private final String TABLE_NAME = "composite_types_table"; - - // Fully qualified table name with schema - private final String FQ_TABLE_NAME = "public." + TABLE_NAME; - - // Helper SQL statements for creation and insertion into the table - private final String CREATE_TYPE = - "CREATE TYPE my_name_type AS (first_name text, last_name varchar(40));"; - private final String CREATE_TABLE = - "CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, name_col my_name_type);"; - private final String INSERT_FORMAT = - "INSERT INTO " + TABLE_NAME + " VALUES (%s, ('Vaibhav', 'Kushwaha'));"; - - @BeforeClass - public static void beforeClass() throws Exception { - ybContainer = TestHelper.getYbContainer(); - ybContainer.start(); - - TestHelper.setContainerHostPort(ybContainer.getHost(), ybContainer.getMappedPort(5433)); - TestHelper.setMasterAddress(ybContainer.getHost() + ":" + ybContainer.getMappedPort(7100)); - TestHelper.dropAllSchemas(); - } - - @Before - public void beforeEachTest() throws Exception { - initializeConnectorTestFramework(); - } - - @After - public void afterEachTest() throws Exception { - stopConnector(); - TestHelper.executeDDL("drop_tables_and_databases.ddl"); - } - - @AfterClass - public static void afterClass() throws Exception { - ybContainer.stop(); - } - - @Test - public void validateBasicTestBasedUdt() throws Exception { - TestHelper.dropAllSchemas(); - TestHelper.execute(CREATE_TYPE); - TestHelper.execute(CREATE_TABLE); - - String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", TABLE_NAME); - Configuration.Builder configBuilder = TestHelper.getConfigBuilder(FQ_TABLE_NAME, dbStreamId); - start(YugabyteDBConnector.class, configBuilder.build()); - - final int recordsCount = 5; - awaitUntilConnectorIsReady(); - - TestHelper.executeBulk(INSERT_FORMAT, recordsCount); - - CompletableFuture.runAsync(() -> verifyUDTValue(recordsCount)) - .exceptionally(throwable -> { - throw new RuntimeException(throwable); - }).get(); - } - - private void verifyUDTValue(long recordsCount) { - int totalConsumedRecords = 0; - long start = System.currentTimeMillis(); - List records = new ArrayList<>(); - while (totalConsumedRecords < recordsCount) { - int consumed = super.consumeAvailableRecords(record -> { - LOGGER.info("The record being consumed is " + record); - records.add(record); - }); - if (consumed > 0) { - totalConsumedRecords += consumed; - LOGGER.debug("Consumed " + totalConsumedRecords + " records"); - } - } - LOGGER.info("Total duration to consume " + recordsCount + " records: " - + Strings.duration(System.currentTimeMillis() - start)); - - for (int i = 0; i < records.size(); ++i) { - // Verify the records with values - assertValueField(records.get(i), "after/id/value", i); - assertValueField(records.get(i), "after/name_col/value", "(Vaibhav,Kushwaha)"); - } - } -} diff --git a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java index 02e6ed03..09be2d24 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBDatatypesTest.java @@ -22,12 +22,14 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.jupiter.api.Disabled; import org.testcontainers.containers.YugabyteYSQLContainer; import io.debezium.config.Configuration; import io.debezium.connector.yugabytedb.common.YugabyteDBTestBase; import io.debezium.connector.yugabytedb.transforms.YBExtractNewRecordState; import io.debezium.transforms.ExtractNewRecordStateConfigDefinition; +import io.debezium.util.Strings; /** * Basic unit tests to check the behaviour with YugabyteDB datatypes diff --git a/src/test/java/io/debezium/connector/yugabytedb/common/YugabyteDBTestBase.java b/src/test/java/io/debezium/connector/yugabytedb/common/YugabyteDBTestBase.java index e527a60c..89d0a833 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/common/YugabyteDBTestBase.java +++ b/src/test/java/io/debezium/connector/yugabytedb/common/YugabyteDBTestBase.java @@ -9,10 +9,14 @@ public class YugabyteDBTestBase extends AbstractConnectorTest { public void awaitUntilConnectorIsReady() throws Exception { Awaitility.await() - .pollDelay(Duration.ofSeconds(10)) - .atMost(Duration.ofSeconds(15)) + .pollDelay(Duration.ofSeconds(5)) + .atMost(Duration.ofSeconds(10)) .until(() -> { - return engine.isRunning(); + if (engine.isRunning()) { + return true; + } else { + return false; + } }); } } diff --git a/src/test/resources/drop_tables_and_databases.ddl b/src/test/resources/drop_tables_and_databases.ddl index 65df5e65..032a5c5e 100644 --- a/src/test/resources/drop_tables_and_databases.ddl +++ b/src/test/resources/drop_tables_and_databases.ddl @@ -1,5 +1,5 @@ DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS all_types; DROP TABLE IF EXISTS test_enum; -DROP TABLE IF EXISTS composite_types_table; DROP DATABASE IF EXISTS secondary_database; +