batcher : batchers.values()) {
+ // We must flush the batchers to respect CONFIG_MAX_BATCH_SIZE.
+ // We flush asynchronously and await the results instead.
+ batcher.sendOutstanding();
+ }
+ }
+
+ /**
+ * Applies the mutations using inserts.
+ *
+ * <p>Note that no batching is used.
+ *
+ * @param mutations Mutations to be applied.
+ * @param perRecordResults {@link Map} the per-record results will be written to.
+ */
+ @VisibleForTesting
+ void insertRows(
+ Map<SinkRecord, MutationData> mutations, Map<SinkRecord, Future<Void>> perRecordResults) {
+ logger.trace("insertRows(#records={})", mutations.size());
+ for (Map.Entry<SinkRecord, MutationData> recordEntry : mutations.entrySet()) {
+ // We keep compatibility with Confluent's sink and disallow batching operations that check if
+ // the row already exists.
+ SinkRecord record = recordEntry.getKey();
+ MutationData recordMutationData = recordEntry.getValue();
+ ConditionalRowMutation insert =
+ // We want to perform the mutation if and only if the row does not already exist.
+ ConditionalRowMutation.create(
+ recordMutationData.getTargetTable(), recordMutationData.getRowKey())
+ // We first check if any cell of this row exists...
+ .condition(Filters.FILTERS.pass())
+ // ... and perform the mutation only if no cell exists.
+ .otherwise(recordMutationData.getInsertMutation());
+ boolean insertSuccessful;
+ Optional<ApiException> exceptionThrown = Optional.empty();
+ try {
+ insertSuccessful = !bigtableData.checkAndMutateRow(insert);
+ } catch (ApiException e) {
+ insertSuccessful = false;
+ exceptionThrown = Optional.of(e);
+ }
+ perRecordResults.put(
+ record,
+ insertSuccessful
+ ? CompletableFuture.completedFuture(null)
+ : CompletableFuture.failedFuture(
+ exceptionThrown.orElse(
+ new ConnectException("Insert failed since the row already existed."))));
+ }
+ }
+
+ /**
+ * Handles results of the whole operation.
+ *
+ * @param perRecordResults Results to be handled.
+ */
+ @VisibleForTesting
+ void handleResults(Map<SinkRecord, Future<Void>> perRecordResults) {
+ logger.trace("handleResults(#records={})", perRecordResults.size());
+ for (Map.Entry<SinkRecord, Future<Void>> recordResult : perRecordResults.entrySet()) {
+ try {
+ recordResult.getValue().get();
+ } catch (ExecutionException | InterruptedException e) {
+ SinkRecord record = recordResult.getKey();
+ reportError(record, e);
+ }
+ }
+ }
+}
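The insert path above relies entirely on Cloud Bigtable's check-and-mutate primitive: Filters.FILTERS.pass() matches any existing cell of the row, the write is attached via otherwise(...), and checkAndMutateRow() returns whether the predicate matched, hence the negation. Below is a minimal self-contained sketch of the same insert-if-absent pattern; the table, column family, and qualifier names are made-up placeholders, not values used by the connector.

import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.Mutation;

class InsertIfAbsentSketch {
  /** Returns true if the row was created, false if it already existed. */
  static boolean insertIfAbsent(BigtableDataClient data, String tableId, String rowKey) {
    ConditionalRowMutation mutation =
        ConditionalRowMutation.create(tableId, rowKey)
            // The predicate matches if any cell of the row exists...
            .condition(Filters.FILTERS.pass())
            // ...and this mutation is applied only when the predicate does NOT match.
            .otherwise(Mutation.create().setCell("family", "qualifier", "value"));
    // checkAndMutateRow() returns whether the predicate matched, i.e. true means the row
    // already existed and nothing was written.
    return !data.checkAndMutateRow(mutation);
  }
}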
diff --git a/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/autocreate/BigtableSchemaManager.java b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/autocreate/BigtableSchemaManager.java
new file mode 100644
index 000000000..7db1f3110
--- /dev/null
+++ b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/autocreate/BigtableSchemaManager.java
@@ -0,0 +1,470 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.kafka.connect.bigtable.autocreate;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.models.ColumnFamily;
+import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
+import com.google.cloud.bigtable.admin.v2.models.ModifyColumnFamiliesRequest;
+import com.google.cloud.bigtable.admin.v2.models.Table;
+import com.google.cloud.kafka.connect.bigtable.exception.BatchException;
+import com.google.cloud.kafka.connect.bigtable.mapping.MutationData;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class responsible for the creation of Cloud Bigtable {@link Table Table(s)} and {@link
+ * ColumnFamily ColumnFamily(s)} needed by the transformed Kafka Connect records.
+ *
+ * <p>This class contains nontrivial logic since we try to avoid API calls if possible.
+ *
+ *
+ * <p>This class does not automatically rediscover deleted resources. If another user of the Cloud
+ * Bigtable instance deletes a table or a column, the sink using an instance of this class to
+ * auto-create the resources might end up sending requests targeting nonexistent {@link Table
+ * Table(s)} and/or {@link ColumnFamily ColumnFamily(s)}.
+ */
+public class BigtableSchemaManager {
+ @VisibleForTesting protected Logger logger = LoggerFactory.getLogger(BigtableSchemaManager.class);
+
+ private final BigtableTableAdminClient bigtable;
+
+ /**
+ * A {@link Map} storing the names of existing Cloud Bigtable tables as keys and existing column
+ * families within these tables as the values.
+ *
+ * <p>We have a single data structure for table and column family caches to ensure that they are
+ * consistent.
+ *
+ * <p>An {@link Optional#empty()} value means that a table exists, but we don't know what column
+ * families it contains.
+ */
+ @VisibleForTesting protected Map<String, Optional<Set<String>>> tableNameToColumnFamilies;
+
+ /**
+ * The default constructor.
+ *
+ * @param bigtable The Cloud Bigtable admin client used to auto-create {@link Table Table(s)} and
+ * {@link ColumnFamily ColumnFamily(s)}.
+ */
+ public BigtableSchemaManager(BigtableTableAdminClient bigtable) {
+ this.bigtable = bigtable;
+ tableNameToColumnFamilies = new HashMap<>();
+ }
+
+ /**
+ * Ensures that all the {@link Table Table(s)} needed by the input records exist by attempting to
+ * create the missing ones.
+ *
+ * @param recordsAndOutputs A {@link Map} containing {@link SinkRecord SinkRecord(s)} and their
+ * matching {@link MutationData} specifying which {@link Table Table(s)} need to exist.
+ * @return A {@link ResourceCreationResult} containing the {@link SinkRecord SinkRecord(s)} for
+ * which auto-creation of the {@link Table Table(s)} needed by their {@link MutationData} failed.
+ */
+ public ResourceCreationResult ensureTablesExist(Map<SinkRecord, MutationData> recordsAndOutputs) {
+ Map<String, List<SinkRecord>> recordsByTableNames = getTableNamesToRecords(recordsAndOutputs);
+
+ Map<String, List<SinkRecord>> recordsByMissingTableNames =
+ missingTablesToRecords(recordsByTableNames);
+ if (recordsByMissingTableNames.isEmpty()) {
+ return ResourceCreationResult.empty();
+ }
+ logger.debug("Missing {} tables", recordsByMissingTableNames.size());
+ Map<ApiFuture<Table>, ResourceAndRecords<String>> recordsByCreateTableFutures =
+ sendCreateTableRequests(recordsByMissingTableNames);
+ // No cache update here since we create tables with no column families, so every (non-delete)
+ // write to the table will need to create the needed column families first; saving the data from
+ // the response thus gives us no benefit.
+ // We ignore errors to handle races between multiple tasks of a single connector and refresh
+ // the cache in a further step.
+ Set<SinkRecord> dataErrors =
+ awaitResourceCreationAndHandleInvalidInputErrors(
+ recordsByCreateTableFutures, "Error creating a Cloud Bigtable table: %s");
+ refreshTableNamesCache();
+ Set<SinkRecord> bigtableErrors =
+ missingTablesToRecords(recordsByMissingTableNames).values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
+ bigtableErrors.removeAll(dataErrors);
+ return new ResourceCreationResult(bigtableErrors, dataErrors);
+ }
+
+ /**
+ * Ensures that all the {@link ColumnFamily ColumnFamily(s)} needed by the input records exist by
+ * attempting to create the missing ones.
+ *
+ * <p>This method will not try to create missing {@link Table Table(s)} if some of the
+ * needed ones do not exist, but it will handle that case gracefully.
+ *
+ * @param recordsAndOutputs A {@link Map} containing {@link SinkRecord SinkRecord(s)} and their
+ * matching {@link MutationData} specifying which {@link ColumnFamily ColumnFamily(s)} need to
+ * exist.
+ * @return A {@link ResourceCreationResult} containing the {@link SinkRecord SinkRecord(s)} whose
+ * {@link MutationData} needs {@link Table Table(s)} that are missing or for which auto-creation
+ * of {@link ColumnFamily ColumnFamily(s)} failed.
+ */
+ public ResourceCreationResult ensureColumnFamiliesExist(
+ Map<SinkRecord, MutationData> recordsAndOutputs) {
+ Map<Map.Entry<String, String>, List<SinkRecord>> recordsByColumnFamilies =
+ getTableColumnFamiliesToRecords(recordsAndOutputs);
+
+ Map<Map.Entry<String, String>, List<SinkRecord>> recordsByMissingColumnFamilies =
+ missingTableColumnFamiliesToRecords(recordsByColumnFamilies);
+ if (recordsByMissingColumnFamilies.isEmpty()) {
+ return ResourceCreationResult.empty();
+ }
+ logger.debug("Missing {} column families", recordsByMissingColumnFamilies.size());
+ Map<ApiFuture<Table>, ResourceAndRecords<Map.Entry<String, String>>>
+ recordsByCreateColumnFamilyFutures =
+ sendCreateColumnFamilyRequests(recordsByMissingColumnFamilies);
+
+ // No cache update here since the requests are handled by Cloud Bigtable in a random order.
+ // We ignore errors to handle races between multiple tasks of a single connector
+ // and refresh the cache in a further step.
+ Set<SinkRecord> dataErrors =
+ awaitResourceCreationAndHandleInvalidInputErrors(
+ recordsByCreateColumnFamilyFutures, "Error creating a Cloud Bigtable column family %s");
+
+ Set<String> tablesRequiringRefresh =
+ recordsByMissingColumnFamilies.keySet().stream()
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+ refreshTableColumnFamiliesCache(tablesRequiringRefresh);
+
+ Map<Map.Entry<String, String>, List<SinkRecord>> missing =
+ missingTableColumnFamiliesToRecords(recordsByMissingColumnFamilies);
+ Set<SinkRecord> bigtableErrors =
+ missing.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+ bigtableErrors.removeAll(dataErrors);
+ return new ResourceCreationResult(bigtableErrors, dataErrors);
+ }
+
+ /**
+ * @param recordsAndOutputs A {@link Map} containing {@link SinkRecord SinkRecords} and
+ * corresponding Cloud Bigtable mutations.
+ * @return A {@link Map} containing Cloud Bigtable table names and {@link SinkRecord SinkRecords}
+ * that need these tables to exist.
+ */
+ private static Map<String, List<SinkRecord>> getTableNamesToRecords(
+ Map<SinkRecord, MutationData> recordsAndOutputs) {
+ Map<String, List<SinkRecord>> tableNamesToRecords = new HashMap<>();
+ for (Map.Entry<SinkRecord, MutationData> rowEntry : recordsAndOutputs.entrySet()) {
+ SinkRecord record = rowEntry.getKey();
+ String tableName = rowEntry.getValue().getTargetTable();
+ List<SinkRecord> records =
+ tableNamesToRecords.computeIfAbsent(tableName, k -> new ArrayList<>());
+ records.add(record);
+ }
+ return tableNamesToRecords;
+ }
+
+ /**
+ * @param recordsAndOutputs A {@link Map} containing {@link SinkRecord SinkRecords} and
+ * corresponding Cloud Bigtable mutations.
+ * @return A {@link Map} containing {@link Map.Entry Map.Entry(s)} consisting of Bigtable table
+ * names and column families and {@link SinkRecord SinkRecords} that need these tables
+ * and column families to exist.
+ */
+ private static Map<Map.Entry<String, String>, List<SinkRecord>> getTableColumnFamiliesToRecords(
+ Map<SinkRecord, MutationData> recordsAndOutputs) {
+ Map<Map.Entry<String, String>, List<SinkRecord>> tableColumnFamiliesToRecords = new HashMap<>();
+ for (Map.Entry<SinkRecord, MutationData> e : recordsAndOutputs.entrySet()) {
+ SinkRecord record = e.getKey();
+ MutationData recordMutationData = e.getValue();
+ String tableName = recordMutationData.getTargetTable();
+ for (String columnFamily : recordMutationData.getRequiredColumnFamilies()) {
+ Map.Entry<String, String> key =
+ new AbstractMap.SimpleImmutableEntry<>(tableName, columnFamily);
+ List<SinkRecord> records =
+ tableColumnFamiliesToRecords.computeIfAbsent(key, k -> new ArrayList<>());
+ records.add(record);
+ }
+ }
+ return tableColumnFamiliesToRecords;
+ }
+
+ /**
+ * Refreshes the existing table names in the cache.
+ *
+ * <p>Note that it deletes the entries from the cache if the tables disappear.
+ */
+ @VisibleForTesting
+ void refreshTableNamesCache() {
+ Set<String> tables;
+ try {
+ tables = new HashSet<>(bigtable.listTables());
+ } catch (ApiException e) {
+ logger.error("listTables() exception", e);
+ // We don't allow listTables() to fail. If it does, something is seriously wrong, so we fail the
+ // whole batch.
+ throw new BatchException(e);
+ }
+ for (String key : new HashSet<>(tableNameToColumnFamilies.keySet())) {
+ if (!tables.contains(key)) {
+ tableNameToColumnFamilies.remove(key);
+ }
+ }
+ for (String table : tables) {
+ tableNameToColumnFamilies.putIfAbsent(table, Optional.empty());
+ }
+ }
+
+ /**
+ * Refreshes existing table names and a subset of existing column families in the cache.
+ *
+ * <p>Note that it deletes the entries from the cache if the tables disappeared and that it
+ * doesn't modify column family caches of tables that aren't provided as an argument.
+ *
+ * @param tablesRequiringRefresh A {@link Set} of table names whose column family caches will be
+ * refreshed.
+ */
+ @VisibleForTesting
+ void refreshTableColumnFamiliesCache(Set tablesRequiringRefresh) {
+ refreshTableNamesCache();
+ List<Map.Entry<String, ApiFuture<Table>>> tableFutures =
+ tableNameToColumnFamilies.keySet().stream()
+ .filter(tablesRequiringRefresh::contains)
+ .map(t -> new AbstractMap.SimpleImmutableEntry<>(t, bigtable.getTableAsync(t)))
+ .collect(Collectors.toList());
+ Map<String, Optional<Set<String>>> newCache = new HashMap<>(tableNameToColumnFamilies);
+ for (Map.Entry<String, ApiFuture<Table>> entry : tableFutures) {
+ String tableName = entry.getKey();
+ try {
+ Table tableDetails = entry.getValue().get();
+ Set<String> tableColumnFamilies =
+ tableDetails.getColumnFamilies().stream()
+ .map(ColumnFamily::getId)
+ .collect(Collectors.toSet());
+ newCache.put(tableName, Optional.of(tableColumnFamilies));
+ } catch (ExecutionException | InterruptedException e) {
+ // We don't allow getTable() to fail. If it does, the entry is removed from the cache. This
+ // way its SinkRecord will be failed by ensureColumnFamiliesExist(). The alternative is to
+ // throw an exception and fail the whole batch that way.
+ logger.warn("getTable({}) exception", tableName, e);
+ newCache.remove(tableName);
+ }
+ }
+ // Note that we update the cache atomically to avoid partial errors. If an unexpected exception
+ // is thrown, the whole batch is failed. It's not ideal, but in line with the behavior of other
+ // connectors.
+ tableNameToColumnFamilies = newCache;
+ }
+
+ /**
+ * @param tableNamesToRecords A {@link Map} containing Cloud Bigtable table names and {@link
+ * SinkRecord SinkRecords} that need these tables to exist.
+ * @return A subset of the input argument with the entries corresponding to existing tables
+ * removed.
+ */
+ private Map<String, List<SinkRecord>> missingTablesToRecords(
+ Map<String, List<SinkRecord>> tableNamesToRecords) {
+ Map<String, List<SinkRecord>> recordsByMissingTableNames = new HashMap<>(tableNamesToRecords);
+ recordsByMissingTableNames.keySet().removeAll(tableNameToColumnFamilies.keySet());
+ return recordsByMissingTableNames;
+ }
+
+ /**
+ * @param tableColumnFamiliesToRecords A {@link Map} containing {@link Map.Entry} consisting of
+ * Bigtable table names and column families and {@link SinkRecord SinkRecords} that need these
+ * tables and column families to exist.
+ * @return A subset of the input argument with the entries corresponding to existing column
+ * families removed.
+ */
+ private Map<Map.Entry<String, String>, List<SinkRecord>> missingTableColumnFamiliesToRecords(
+ Map<Map.Entry<String, String>, List<SinkRecord>> tableColumnFamiliesToRecords) {
+ Map<Map.Entry<String, String>, List<SinkRecord>> recordsByMissingColumnFamilies =
+ new HashMap<>(tableColumnFamiliesToRecords);
+ for (Map.Entry<String, Optional<Set<String>>> existingEntry :
+ tableNameToColumnFamilies.entrySet()) {
+ String tableName = existingEntry.getKey();
+ for (String columnFamily : existingEntry.getValue().orElse(new HashSet<>())) {
+ recordsByMissingColumnFamilies.remove(
+ new AbstractMap.SimpleImmutableEntry<>(tableName, columnFamily));
+ }
+ }
+ return recordsByMissingColumnFamilies;
+ }
+
+ private Map<ApiFuture<Table>, ResourceAndRecords<String>> sendCreateTableRequests(
+ Map<String, List<SinkRecord>> recordsByMissingTables) {
+ Map<ApiFuture<Table>, ResourceAndRecords<String>> result = new HashMap<>();
+ for (Map.Entry<String, List<SinkRecord>> e : recordsByMissingTables.entrySet()) {
+ ResourceAndRecords<String> resourceAndRecords =
+ new ResourceAndRecords<>(e.getKey(), e.getValue());
+ result.put(createTable(e.getKey()), resourceAndRecords);
+ }
+ return result;
+ }
+
+ private Map<ApiFuture<Table>, ResourceAndRecords<Map.Entry<String, String>>>
+ sendCreateColumnFamilyRequests(
+ Map<Map.Entry<String, String>, List<SinkRecord>> recordsByMissingColumnFamilies) {
+ Map<ApiFuture<Table>, ResourceAndRecords<Map.Entry<String, String>>> result = new HashMap<>();
+ for (Map.Entry<Map.Entry<String, String>, List<SinkRecord>> e :
+ recordsByMissingColumnFamilies.entrySet()) {
+ ResourceAndRecords<Map.Entry<String, String>> resourceAndRecords =
+ new ResourceAndRecords<>(e.getKey(), e.getValue());
+ result.put(createColumnFamily(e.getKey()), resourceAndRecords);
+ }
+ return result;
+ }
+
+ private ApiFuture<Table> createTable(String tableName) {
+ logger.info("Creating table '{}'", tableName);
+ CreateTableRequest createTableRequest = CreateTableRequest.of(tableName);
+ return bigtable.createTableAsync(createTableRequest);
+ }
+
+ // We only issue one request at a time because each multi-column-family operation on a single
+ // Table is atomic and fails if any of the Column Families to be created already exists.
+ // Thus, by sending multiple single-family requests, we simplify error handling when races
+ // between multiple tasks of a single connector happen.
+ private ApiFuture<Table> createColumnFamily(Map.Entry<String, String> tableNameAndColumnFamily) {
+ String tableName = tableNameAndColumnFamily.getKey();
+ String columnFamily = tableNameAndColumnFamily.getValue();
+ logger.info("Creating column family '{}' in table '{}'", columnFamily, tableName);
+ ModifyColumnFamiliesRequest request =
+ ModifyColumnFamiliesRequest.of(tableName).addFamily(columnFamily);
+ return bigtable.modifyFamiliesAsync(request);
+ }
+
+ /**
+ * Awaits resource auto-creation result futures and handles the errors.
+ *
+ * <p>The errors might be handled in two ways:
+ *
+ * <ul>
+ * <li>If a resource's creation failed with an exception signifying that the request was
+ * invalid, it is assumed that input {@link SinkRecord SinkRecord(s)} map to invalid values,
+ * so all the {@link SinkRecord SinkRecord(s)} needing the resource whose creation failed
+ * are returned.
+ * <li>Other resource creation errors are logged.
+ * </ul>
+ *
+ * @param createdColumnFamilyFuturesAndRecords {@link Map} of {@link ApiFuture ApiFuture(s)} and
+ * information about which resource is created and for which {@link SinkRecord SinkRecord(s)}.
+ * @param errorMessageTemplate The Java format string template of error message with which Cloud
+ * Bigtable exceptions for valid input data are logged.
+ * @return A {@link Set} of {@link SinkRecord SinkRecord(s)} for which auto resource creation
+ * failed due to their invalid data.
+ * @param <Fut> {@link ApiFuture} containing the result of the resource creation operation.
+ * @param <Id> The resources' type identifier.
+ */
+ @VisibleForTesting
+ <Fut extends ApiFuture<?>, Id> Set<SinkRecord> awaitResourceCreationAndHandleInvalidInputErrors(
+ Map<Fut, ResourceAndRecords<Id>> createdColumnFamilyFuturesAndRecords,
+ String errorMessageTemplate) {
+ Set<SinkRecord> dataErrors = new HashSet<>();
+ createdColumnFamilyFuturesAndRecords.forEach(
+ (fut, resourceAndRecords) -> {
+ Object resource = resourceAndRecords.getResource();
+ List<SinkRecord> sinkRecords = resourceAndRecords.getRecords();
+ try {
+ fut.get();
+ } catch (ExecutionException | InterruptedException e) {
+ String errorMessage = String.format(errorMessageTemplate, resource.toString());
+ if (SchemaApiExceptions.isCausedByInputError(e)) {
+ dataErrors.addAll(sinkRecords);
+ } else {
+ logger.info(errorMessage, e);
+ }
+ }
+ });
+ return dataErrors;
+ }
+
+ /**
+ * A record class connecting an auto-created resource and {@link SinkRecord SinkRecord(s)}
+ * requiring it to exist.
+ *
+ * @param <Id> The resources' type identifier.
+ */
+ @VisibleForTesting
+ static class ResourceAndRecords<Id> {
+ private final Id resource;
+ private final List<SinkRecord> records;
+
+ public ResourceAndRecords(Id resource, List<SinkRecord> records) {
+ this.resource = resource;
+ this.records = records;
+ }
+
+ public Id getResource() {
+ return resource;
+ }
+
+ public List<SinkRecord> getRecords() {
+ return records;
+ }
+ }
+
+ /**
+ * A helper class containing logic for classifying {@link ApiException ApiException(s)} encountered
+ * when modifying Cloud Bigtable schema.
+ */
+ @VisibleForTesting
+ static class SchemaApiExceptions {
+ /**
+ * @param t Exception thrown by some function using Cloud Bigtable API.
+ * @return true if the input exception was caused by an invalid Cloud Bigtable request, false
+ * otherwise.
+ */
+ @VisibleForTesting
+ static boolean isCausedByInputError(Throwable t) {
+ return maybeExtractBigtableStatusCode(t)
+ .map(sc -> isStatusCodeCausedByInputError(sc.getCode()))
+ .orElse(false);
+ }
+
+ @VisibleForTesting
+ static Optional<StatusCode> maybeExtractBigtableStatusCode(Throwable t) {
+ while (t != null) {
+ if (t instanceof ApiException) {
+ ApiException apiException = (ApiException) t;
+ return Optional.of(apiException.getStatusCode());
+ }
+ t = t.getCause();
+ }
+ return Optional.empty();
+ }
+
+ @VisibleForTesting
+ static boolean isStatusCodeCausedByInputError(StatusCode.Code code) {
+ switch (code) {
+ case INVALID_ARGUMENT:
+ case FAILED_PRECONDITION:
+ case OUT_OF_RANGE:
+ return true;
+ default:
+ return false;
+ }
+ }
+ }
+}
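To make the control flow of BigtableSchemaManager concrete, here is a hedged sketch of how a sink task could drive it for one batch; the mutations map and the decision to simply drop failed records are illustrative assumptions, not the connector's actual error handling.

import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.kafka.connect.bigtable.autocreate.BigtableSchemaManager;
import com.google.cloud.kafka.connect.bigtable.autocreate.ResourceCreationResult;
import com.google.cloud.kafka.connect.bigtable.mapping.MutationData;
import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;

class AutoCreateSketch {
  static void ensureSchema(
      BigtableTableAdminClient admin, Map<SinkRecord, MutationData> mutations) {
    BigtableSchemaManager schemaManager = new BigtableSchemaManager(admin);

    ResourceCreationResult tables = schemaManager.ensureTablesExist(mutations);
    // Data errors stem from invalid records and must never be retried; Bigtable-side
    // errors could instead be retried or routed through error.mode / the DLQ.
    tables.getDataErrors().forEach(mutations::remove);
    tables.getBigtableErrors().forEach(mutations::remove);

    ResourceCreationResult families = schemaManager.ensureColumnFamiliesExist(mutations);
    families.getDataErrors().forEach(mutations::remove);
    families.getBigtableErrors().forEach(mutations::remove);
    // Whatever remains in `mutations` targets tables and column families known to exist.
  }
}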
diff --git a/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/autocreate/ResourceCreationResult.java b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/autocreate/ResourceCreationResult.java
new file mode 100644
index 000000000..f4de7efca
--- /dev/null
+++ b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/autocreate/ResourceCreationResult.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.kafka.connect.bigtable.autocreate;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+/** A record class storing the output of {@link BigtableSchemaManager} operations. */
+public class ResourceCreationResult {
+ private final Set<SinkRecord> bigtableErrors;
+ private final Set<SinkRecord> dataErrors;
+
+ public static ResourceCreationResult empty() {
+ return new ResourceCreationResult(new HashSet<>(), new HashSet<>());
+ }
+
+ public ResourceCreationResult(Set<SinkRecord> bigtableErrors, Set<SinkRecord> dataErrors) {
+ this.bigtableErrors = bigtableErrors;
+ this.dataErrors = dataErrors;
+ }
+
+ /**
+ * @return A {@link Set} of {@link SinkRecord SinkRecord(s)} for which resource auto-creation
+ * failed due to some problem on Cloud Bigtable's part.
+ */
+ public Set<SinkRecord> getBigtableErrors() {
+ return bigtableErrors;
+ }
+
+ /**
+ * @return A {@link Set} of {@link SinkRecord SinkRecord(s)} for which resource auto-creation
+ * failed due to invalid input data. These records should never be retried.
+ */
+ public Set<SinkRecord> getDataErrors() {
+ return dataErrors;
+ }
+}
diff --git a/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableErrorMode.java b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableErrorMode.java
new file mode 100644
index 000000000..f8447c854
--- /dev/null
+++ b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableErrorMode.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.kafka.connect.bigtable.config;
+
+public enum BigtableErrorMode {
+ FAIL,
+ WARN,
+ IGNORE,
+}
diff --git a/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableSinkConfig.java b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableSinkConfig.java
new file mode 100644
index 000000000..0fe27aff4
--- /dev/null
+++ b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableSinkConfig.java
@@ -0,0 +1,548 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.kafka.connect.bigtable.config;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.admin.v2.stub.BigtableTableAdminStubSettings;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.ByteArrayInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.Config;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.threeten.bp.Duration;
+import org.threeten.bp.temporal.ChronoUnit;
+
+/**
+ * A class defining the configuration of {@link
+ * com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector}.
+ *
+ * <p>It's responsible for the validation and parsing of the user-provided values.
+ */
+public class BigtableSinkConfig extends AbstractConfig {
+ public static final String CONFIG_GCP_PROJECT_ID = "gcp.bigtable.project.id";
+ public static final String CONFIG_GCP_CREDENTIALS_PATH = "gcp.bigtable.credentials.path";
+ public static final String CONFIG_GCP_CREDENTIALS_JSON = "gcp.bigtable.credentials.json";
+ public static final String CONFIG_BIGTABLE_INSTANCE_ID = "gcp.bigtable.instance.id";
+ public static final String CONFIG_BIGTABLE_APP_PROFILE_ID = "gcp.bigtable.app.profile.id";
+ public static final String CONFIG_INSERT_MODE = "insert.mode";
+ public static final String CONFIG_MAX_BATCH_SIZE = "max.batch.size";
+ public static final String CONFIG_VALUE_NULL_MODE = "value.null.mode";
+ public static final String CONFIG_ERROR_MODE = "error.mode";
+ public static final String CONFIG_TABLE_NAME_FORMAT = "table.name.format";
+ public static final String CONFIG_ROW_KEY_DEFINITION = "row.key.definition";
+ public static final String CONFIG_ROW_KEY_DELIMITER = "row.key.delimiter";
+ public static final String CONFIG_AUTO_CREATE_TABLES = "auto.create.tables";
+ public static final String CONFIG_AUTO_CREATE_COLUMN_FAMILIES = "auto.create.column.families";
+ public static final String CONFIG_DEFAULT_COLUMN_FAMILY = "default.column.family";
+ public static final String CONFIG_DEFAULT_COLUMN_QUALIFIER = "default.column.qualifier";
+ public static final String CONFIG_RETRY_TIMEOUT_MILLIS = "retry.timeout.ms";
+ private static final InsertMode DEFAULT_INSERT_MODE = InsertMode.INSERT;
+ private static final NullValueMode DEFAULT_NULL_VALUE_MODE = NullValueMode.WRITE;
+ private static final BigtableErrorMode DEFAULT_ERROR_MODE = BigtableErrorMode.FAIL;
+ private static final Integer DEFAULT_MAX_BATCH_SIZE = 1;
+ private static final List<String> BIGTABLE_CONFIGURATION_PROPERTIES =
+ List.of(
+ CONFIG_GCP_CREDENTIALS_JSON,
+ CONFIG_GCP_CREDENTIALS_PATH,
+ CONFIG_GCP_PROJECT_ID,
+ CONFIG_BIGTABLE_INSTANCE_ID,
+ CONFIG_BIGTABLE_APP_PROFILE_ID);
+ private static final int BIGTABLE_CREDENTIALS_CHECK_TIMEOUT_SECONDS = 2;
+
+ protected BigtableSinkConfig(ConfigDef definition, Map<String, String> properties) {
+ super(definition, properties);
+ }
+
+ /**
+ * The main constructor.
+ *
+ * @param properties The properties provided by the user.
+ */
+ public BigtableSinkConfig(Map<String, String> properties) {
+ this(getDefinition(), properties);
+ }
+
+ /**
+ * Validates that a valid {@link BigtableSinkConfig} can be created using the input properties.
+ *
+ * @param props The properties provided by the user.
+ * @return {@link Config} containing validation results.
+ */
+ public static Config validate(Map<String, String> props) {
+ return validate(props, true);
+ }
+
+ /**
+ * Validates that a valid {@link BigtableSinkConfig} can be created using the input properties.
+ *
+ * @param props The properties provided by the user.
+ * @param accessBigtableToValidateConfiguration If set to true, validation includes checking
+ * whether the Cloud Bigtable configuration is valid by connecting to Cloud Bigtable and
+ * attempting to execute a simple read-only operation.
+ * @return {@link Config} containing validation results.
+ */
+ @VisibleForTesting
+ static Config validate(Map<String, String> props, boolean accessBigtableToValidateConfiguration) {
+ // Note that we only need to verify the properties we define; the generic Sink configuration is
+ // handled in SinkConnectorConfig::validate().
+ String credentialsPath = props.get(CONFIG_GCP_CREDENTIALS_PATH);
+ String credentialsJson = props.get(CONFIG_GCP_CREDENTIALS_JSON);
+ String insertMode = props.get(CONFIG_INSERT_MODE);
+ String maxBatchSize = props.get(CONFIG_MAX_BATCH_SIZE);
+ String effectiveInsertMode =
+ Optional.ofNullable(insertMode).orElse(DEFAULT_INSERT_MODE.name()).toUpperCase();
+ String effectiveMaxBatchSize =
+ Optional.ofNullable(maxBatchSize).orElse(DEFAULT_MAX_BATCH_SIZE.toString()).trim();
+
+ Map<String, ConfigValue> validationResult = getDefinition().validateAll(props);
+ if (!Utils.isBlank(credentialsPath) && !Utils.isBlank(credentialsJson)) {
+ String errorMessage =
+ CONFIG_GCP_CREDENTIALS_JSON
+ + " and "
+ + CONFIG_GCP_CREDENTIALS_PATH
+ + " are mutually exclusive options, but both are set.";
+ addErrorMessage(validationResult, CONFIG_GCP_CREDENTIALS_JSON, credentialsJson, errorMessage);
+ addErrorMessage(validationResult, CONFIG_GCP_CREDENTIALS_PATH, credentialsPath, errorMessage);
+ }
+ if (effectiveInsertMode.equals(InsertMode.INSERT.name())
+ && !effectiveMaxBatchSize.equals("1")) {
+ String errorMessage =
+ "When using `"
+ + CONFIG_INSERT_MODE
+ + "` of `insert`, "
+ + CONFIG_MAX_BATCH_SIZE
+ + " must be set to `1`.";
+ addErrorMessage(validationResult, CONFIG_INSERT_MODE, insertMode, errorMessage);
+ addErrorMessage(validationResult, CONFIG_MAX_BATCH_SIZE, maxBatchSize, errorMessage);
+ }
+
+ if (accessBigtableToValidateConfiguration
+ && validationResult.values().stream().allMatch(v -> v.errorMessages().isEmpty())) {
+ // We validate the user's credentials in order to warn them early rather than fill the DLQ
+ // with records whose processing would fail due to invalid credentials.
+ // We only call it after validating that all other parameters are fine since creating
+ // a Cloud Bigtable client uses many of these parameters, and we don't want to warn
+ // the user unnecessarily.
+ BigtableSinkConfig config = new BigtableSinkConfig(props);
+ if (!config.isBigtableConfigurationValid()) {
+ String errorMessage = "Cloud Bigtable configuration is invalid.";
+ for (String bigtableProp : BIGTABLE_CONFIGURATION_PROPERTIES) {
+ addErrorMessage(validationResult, bigtableProp, props.get(bigtableProp), errorMessage);
+ }
+ }
+ }
+ return new Config(new ArrayList<>(validationResult.values()));
+ }
+
+ /**
+ * @return {@link ConfigDef} used by Kafka Connect to advertise configuration options to the user
+ * and by us to perform basic validation of the user-provided values.
+ */
+ public static ConfigDef getDefinition() {
+ return new ConfigDef()
+ .define(
+ CONFIG_GCP_PROJECT_ID,
+ ConfigDef.Type.STRING,
+ ConfigDef.NO_DEFAULT_VALUE,
+ ConfigDef.CompositeValidator.of(
+ new ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString()),
+ ConfigDef.Importance.HIGH,
+ "The ID of the GCP project.")
+ .define(
+ CONFIG_BIGTABLE_INSTANCE_ID,
+ ConfigDef.Type.STRING,
+ ConfigDef.NO_DEFAULT_VALUE,
+ ConfigDef.CompositeValidator.of(
+ new ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString()),
+ ConfigDef.Importance.HIGH,
+ "The ID of the Cloud Bigtable instance.")
+ .define(
+ CONFIG_BIGTABLE_APP_PROFILE_ID,
+ ConfigDef.Type.STRING,
+ null,
+ ConfigDef.Importance.MEDIUM,
+ "The application profile that the connector should use. If none is supplied,"
+ + " the default app profile will be used.")
+ .define(
+ CONFIG_GCP_CREDENTIALS_PATH,
+ ConfigDef.Type.STRING,
+ null,
+ ConfigDef.Importance.HIGH,
+ "The path to the JSON service key file. Configure at most one of `"
+ + CONFIG_GCP_CREDENTIALS_PATH
+ + "` and `"
+ + CONFIG_GCP_CREDENTIALS_JSON
+ + "`. If neither is provided, Application Default Credentials will be used.")
+ .define(
+ CONFIG_GCP_CREDENTIALS_JSON,
+ ConfigDef.Type.STRING,
+ null,
+ ConfigDef.Importance.HIGH,
+ "The path to the JSON service key file. Configure at most one of `"
+ + CONFIG_GCP_CREDENTIALS_PATH
+ + "` and `"
+ + CONFIG_GCP_CREDENTIALS_JSON
+ + "`. If neither is provided, Application Default Credentials will be used.")
+ .define(
+ CONFIG_INSERT_MODE,
+ ConfigDef.Type.STRING,
+ DEFAULT_INSERT_MODE.name(),
+ enumValidator(InsertMode.values()),
+ ConfigDef.Importance.HIGH,
+ "Defines the insertion mode to use. Supported modes are:"
+ + "\n- insert - Insert new record only."
+ + " If the row to be written already exists in the table, an error is thrown."
+ + "\n- upsert - If the row to be written already exists,"
+ + " then its column values are overwritten with the ones provided.")
+ .define(
+ CONFIG_MAX_BATCH_SIZE,
+ ConfigDef.Type.INT,
+ DEFAULT_MAX_BATCH_SIZE,
+ ConfigDef.Range.atLeast(1),
+ ConfigDef.Importance.MEDIUM,
+ "The maximum number of records that can be batched into a batch of upserts."
+ + " Note that since only a batch size of 1 for inserts is supported, `"
+ + CONFIG_MAX_BATCH_SIZE
+ + "` must be exactly `1` when `"
+ + CONFIG_INSERT_MODE
+ + "` is set to `INSERT`.")
+ .define(
+ CONFIG_VALUE_NULL_MODE,
+ ConfigDef.Type.STRING,
+ DEFAULT_NULL_VALUE_MODE.name(),
+ enumValidator(NullValueMode.values()),
+ ConfigDef.Importance.MEDIUM,
+ "Defines what to do with `null` Kafka values. Supported modes are:"
+ + "\n- write - Serialize `null`s to empty byte arrays."
+ + "\n- ignore - Ignore `null`s."
+ + "\n- delete - Use them to issue DELETE commands. Root-level `null` deletes a"
+ + " row. `null` nested one level deletes a column family named after the"
+ + " `null`-valued field. `null` nested two levels deletes a column named after the"
+ + " `null`-valued field in column family named after the `null-valued` field parent"
+ + " field. `null` values nested more than two levels are serialized like other"
+ + " values and don't result in any DELETE commands.")
+ .define(
+ CONFIG_ERROR_MODE,
+ ConfigDef.Type.STRING,
+ DEFAULT_ERROR_MODE.name(),
+ enumValidator(BigtableErrorMode.values()),
+ ConfigDef.Importance.MEDIUM,
+ "Specifies how to handle errors that result from writes, after retries. It is ignored"
+ + " if DLQ is configured. Supported modes are:"
+ + "\n- fail - The connector fails and must be manually restarted."
+ + "\n- warn - The connector logs a warning and continues operating normally."
+ + "\n- ignore - The connector does not log a warning but continues operating"
+ + " normally.")
+ .define(
+ CONFIG_TABLE_NAME_FORMAT,
+ ConfigDef.Type.STRING,
+ "${topic}",
+ ConfigDef.CompositeValidator.of(
+ new ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString()),
+ ConfigDef.Importance.MEDIUM,
+ "Name of the destination table. Use `${topic}` within the table name to specify"
+ + " the originating topic name.\nFor example, `user_${topic}` for the topic `stats`"
+ + " will map to the table name `user_stats`.")
+ .define(
+ CONFIG_ROW_KEY_DEFINITION,
+ ConfigDef.Type.LIST,
+ "",
+ ConfigDef.Importance.MEDIUM,
+ "A comma separated list of Kafka Record key field names that specifies the order of"
+ + " Kafka key fields to be concatenated to form the row key."
+ + "\nFor example the list: `username, post_id, time_stamp` when applied to a Kafka"
+ + " key: `{'username': 'bob','post_id': '213', 'time_stamp': '123123'}` and with"
+ + " delimiter `#` gives the row key `bob#213#123123`. You can also access terms"
+ + " nested in the key by using `.` as a delimiter. If this configuration is empty"
+ + " or unspecified and the Kafka Message Key is a"
+ + "\n- struct, all the fields in the struct are used to construct the row key."
+ + "\n- byte array, the row key is set to the byte array as is."
+ + "\n- primitive, the row key is set to the primitive stringified."
+ + "If prefixes, more complicated delimiters, and string constants are required in"
+ + " your Row Key, consider configuring an SMT to add relevant fields to the Kafka"
+ + " Record key.")
+ .define(
+ CONFIG_ROW_KEY_DELIMITER,
+ ConfigDef.Type.STRING,
+ "",
+ ConfigDef.Importance.LOW,
+ "The delimiter used in concatenating Kafka key fields in the row key. If this"
+ + " configuration is empty or unspecified, the key fields will be concatenated"
+ + " together directly.")
+ .define(
+ CONFIG_AUTO_CREATE_TABLES,
+ ConfigDef.Type.BOOLEAN,
+ false,
+ new ConfigDef.NonNullValidator(),
+ ConfigDef.Importance.MEDIUM,
+ "Whether to automatically create the destination table if it is found to be missing."
+ + "\nWhen enabled, the records for which the auto-creation fails, are failed."
+ + "\nRecreation of tables deleted by other Cloud Bigtable users is not supported.")
+ .define(
+ CONFIG_AUTO_CREATE_COLUMN_FAMILIES,
+ ConfigDef.Type.BOOLEAN,
+ false,
+ new ConfigDef.NonNullValidator(),
+ ConfigDef.Importance.MEDIUM,
+ "Whether to automatically create missing columns families in the table relative to the"
+ + " record schema."
+ + "\nDoes not imply auto-creation of tables."
+ + "\nWhen enabled, the records for which the auto-creation fails, are failed."
+ + "\nRecreation of column families deleted by other Cloud Bigtable users is not"
+ + " supported.")
+ .define(
+ CONFIG_DEFAULT_COLUMN_FAMILY,
+ ConfigDef.Type.STRING,
+ "default",
+ ConfigDef.Importance.MEDIUM,
+ "Any root-level fields on the SinkRecord that aren't objects will be added to this"
+ + " column family. If empty, the fields will be ignored.")
+ .define(
+ CONFIG_DEFAULT_COLUMN_QUALIFIER,
+ ConfigDef.Type.STRING,
+ "KAFKA_VALUE",
+ ConfigDef.Importance.MEDIUM,
+ "Any root-level values on the SinkRecord that aren't objects will be added to this"
+ + " column within default column family. If empty, the value will be ignored.")
+ .define(
+ CONFIG_RETRY_TIMEOUT_MILLIS,
+ ConfigDef.Type.LONG,
+ 90000,
+ ConfigDef.Range.atLeast(0),
+ ConfigDef.Importance.MEDIUM,
+ "Maximum time in milliseconds allocated for retrying database operations before trying"
+ + " other error handling mechanisms.");
+ }
+
+ /**
+ * Adds a validation error in the format expected by {@link BigtableSinkConfig#validate(Map)}.
+ *
+ * @param validatedConfig Input/output parameter containing current validation result.
+ * @param name Configuration parameter name.
+ * @param value Configuration parameter value.
+ * @param errorMessage Error message to be added.
+ */
+ private static void addErrorMessage(
+ Map<String, ConfigValue> validatedConfig, String name, String value, String errorMessage) {
+ validatedConfig
+ .computeIfAbsent(
+ name, p -> new ConfigValue(name, value, Collections.emptyList(), new ArrayList<>()))
+ .addErrorMessage(errorMessage);
+ }
+
+ public NullValueMode getNullValueMode() {
+ return getEnum(CONFIG_VALUE_NULL_MODE, NullValueMode::valueOf);
+ }
+
+ public BigtableErrorMode getBigtableErrorMode() {
+ return getEnum(CONFIG_ERROR_MODE, BigtableErrorMode::valueOf);
+ }
+
+ public InsertMode getInsertMode() {
+ return getEnum(CONFIG_INSERT_MODE, InsertMode::valueOf);
+ }
+
+ /**
+ * @return {@link BigtableTableAdminClient} connected to a Cloud Bigtable instance configured as
+ * described in {@link BigtableSinkConfig#getDefinition()}.
+ */
+ public BigtableTableAdminClient getBigtableAdminClient() {
+ RetrySettings retrySettings = getRetrySettings();
+ return getBigtableAdminClient(retrySettings);
+ }
+
+ @VisibleForTesting
+ BigtableTableAdminClient getBigtableAdminClient(RetrySettings retrySettings) {
+ Optional<CredentialsProvider> credentialsProvider =
+ getUserConfiguredBigtableCredentialsProvider();
+
+ BigtableTableAdminSettings.Builder adminSettingsBuilder =
+ BigtableTableAdminSettings.newBuilder()
+ .setProjectId(getString(BigtableSinkTaskConfig.CONFIG_GCP_PROJECT_ID))
+ .setInstanceId(getString(BigtableSinkTaskConfig.CONFIG_BIGTABLE_INSTANCE_ID));
+ if (credentialsProvider.isPresent()) {
+ adminSettingsBuilder.setCredentialsProvider(credentialsProvider.get());
+ } else {
+ // Use the default credential provider that utilizes Application Default Credentials.
+ }
+
+ BigtableTableAdminStubSettings.Builder adminStubSettings = adminSettingsBuilder.stubSettings();
+ adminStubSettings.createTableSettings().setRetrySettings(retrySettings);
+ adminStubSettings.modifyColumnFamiliesSettings().setRetrySettings(retrySettings);
+ adminStubSettings.listTablesSettings().setRetrySettings(retrySettings);
+ adminStubSettings.getTableSettings().setRetrySettings(retrySettings);
+ try {
+ return BigtableTableAdminClient.create(adminSettingsBuilder.build());
+ } catch (IOException e) {
+ throw new RetriableException(e);
+ }
+ }
+
+ /**
+ * @return {@link BigtableDataClient} connected to Cloud Bigtable instance configured as described
+ * in {@link BigtableSinkConfig#getDefinition()}.
+ */
+ public BigtableDataClient getBigtableDataClient() {
+ RetrySettings retrySettings = getRetrySettings();
+ Optional<CredentialsProvider> credentialsProvider =
+ getUserConfiguredBigtableCredentialsProvider();
+
+ BigtableDataSettings.Builder dataSettingsBuilder =
+ BigtableDataSettings.newBuilder()
+ .setProjectId(getString(BigtableSinkTaskConfig.CONFIG_GCP_PROJECT_ID))
+ .setInstanceId(getString(BigtableSinkTaskConfig.CONFIG_BIGTABLE_INSTANCE_ID));
+ if (credentialsProvider.isPresent()) {
+ dataSettingsBuilder.setCredentialsProvider(credentialsProvider.get());
+ } else {
+ // Use the default credential provider that utilizes Application Default Credentials.
+ }
+ String appProfileId = getString(BigtableSinkTaskConfig.CONFIG_BIGTABLE_APP_PROFILE_ID);
+ if (appProfileId == null) {
+ dataSettingsBuilder.setDefaultAppProfileId();
+ } else {
+ dataSettingsBuilder.setAppProfileId(appProfileId);
+ }
+
+ EnhancedBigtableStubSettings.Builder dataStubSettings = dataSettingsBuilder.stubSettings();
+ dataStubSettings.mutateRowSettings().setRetrySettings(retrySettings);
+ dataStubSettings.bulkMutateRowsSettings().setRetrySettings(retrySettings);
+ dataStubSettings.readRowSettings().setRetrySettings(retrySettings);
+ dataStubSettings.readRowsSettings().setRetrySettings(retrySettings);
+
+ try {
+ return BigtableDataClient.create(dataSettingsBuilder.build());
+ } catch (IOException e) {
+ throw new RetriableException(e);
+ }
+ }
+
+ /**
+ * Checks whether Cloud Bigtable configuration is valid by connecting to Cloud Bigtable and
+ * attempting to execute a simple read-only operation.
+ *
+ * @return true if Cloud Bigtable configuration is valid, false otherwise.
+ */
+ @VisibleForTesting
+ boolean isBigtableConfigurationValid() {
+ BigtableTableAdminClient bigtable = null;
+ try {
+ RetrySettings retrySettings =
+ RetrySettings.newBuilder()
+ .setMaxAttempts(0)
+ .setTotalTimeout(
+ Duration.of(BIGTABLE_CREDENTIALS_CHECK_TIMEOUT_SECONDS, ChronoUnit.SECONDS))
+ .build();
+ bigtable = getBigtableAdminClient(retrySettings);
+ bigtable.listTables();
+ return true;
+ } catch (Throwable t) {
+ return false;
+ } finally {
+ if (bigtable != null) {
+ bigtable.close();
+ }
+ }
+ }
+
+ /**
+ * @return {@link RetrySettings} of Cloud Bigtable clients configured as described in {@link
+ * BigtableSinkConfig#getDefinition()}.
+ */
+ protected RetrySettings getRetrySettings() {
+ return RetrySettings.newBuilder()
+ .setTotalTimeout(
+ Duration.of(
+ getLong(BigtableSinkTaskConfig.CONFIG_RETRY_TIMEOUT_MILLIS), ChronoUnit.MILLIS))
+ .build();
+ }
+
+ /**
+ * Extracts typed enum value from this object.
+ *
+ * @param configName Enum parameter name in {@link BigtableSinkConfig}.
+ * @param converter Function that parses parameter value into an enum value. It's assumed to throw
+ * only {@link NullPointerException} and {@link IllegalArgumentException}.
+ * @return Parsed enum value.
+ * @param <T> Enum type.
+ */
+ private <T> T getEnum(String configName, Function<String, T> converter) {
+ String s = this.getString(configName);
+ try {
+ return converter.apply(s.toUpperCase());
+ } catch (NullPointerException | IllegalArgumentException e) {
+ throw new ConfigException(configName, s);
+ }
+ }
+
+ private static ConfigDef.Validator enumValidator(Enum<?>[] enumValues) {
+ return ConfigDef.CaseInsensitiveValidString.in(
+ Arrays.stream(enumValues).map(Enum::name).toArray(String[]::new));
+ }
+
+ /**
+ * @return {@link Optional#empty()} if the user didn't configure the Cloud Bigtable credentials,
+ * {@link Optional} containing {@link CredentialsProvider} configured as described in {@link
+ * BigtableSinkConfig#getDefinition()} otherwise.
+ */
+ protected Optional<CredentialsProvider> getUserConfiguredBigtableCredentialsProvider() {
+ String credentialsJson = getString(BigtableSinkTaskConfig.CONFIG_GCP_CREDENTIALS_JSON);
+ String credentialsPath = getString(BigtableSinkTaskConfig.CONFIG_GCP_CREDENTIALS_PATH);
+ byte[] credentials;
+ if (!Utils.isBlank(credentialsJson)) {
+ credentials = credentialsJson.getBytes(StandardCharsets.UTF_8);
+ } else if (!Utils.isBlank(credentialsPath)) {
+ try (FileInputStream is = new FileInputStream(credentialsPath)) {
+ credentials = is.readAllBytes();
+ } catch (IOException e) {
+ throw new ConfigException(
+ String.format("Error getting credentials from file: %s.", credentialsPath));
+ }
+ } else {
+ // We will use the default CredentialsProvider, which doesn't need any application-level
+ // configuration.
+ return Optional.empty();
+ }
+ try {
+ return Optional.of(
+ FixedCredentialsProvider.create(
+ GoogleCredentials.fromStream(new ByteArrayInputStream(credentials))));
+ } catch (IOException e) {
+ throw new ConfigException("Cloud Bigtable credentials creation failed.");
+ }
+ }
+}
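As a quick illustration of the options defined above, the following sketch validates and parses a property map programmatically; the project and instance IDs are placeholders, and when all basic checks pass, validate() will additionally try to reach Cloud Bigtable to verify the credentials.

import com.google.cloud.kafka.connect.bigtable.config.BigtableSinkConfig;
import java.util.Map;
import org.apache.kafka.common.config.Config;

class ConfigSketch {
  public static void main(String[] args) {
    Map<String, String> props =
        Map.of(
            "gcp.bigtable.project.id", "my-project",
            "gcp.bigtable.instance.id", "my-instance",
            "insert.mode", "upsert",
            "max.batch.size", "100",
            "auto.create.tables", "true",
            "auto.create.column.families", "true");
    // Flags problems such as setting both credential options, or combining
    // insert.mode=insert with max.batch.size > 1.
    Config validation = BigtableSinkConfig.validate(props);
    validation.configValues().forEach(v -> System.out.println(v.name() + " " + v.errorMessages()));
    // Unspecified options fall back to their defaults (error.mode=FAIL, value.null.mode=WRITE, ...).
    BigtableSinkConfig config = new BigtableSinkConfig(props);
    System.out.println(config.getInsertMode() + " " + config.getBigtableErrorMode());
  }
}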
diff --git a/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableSinkTaskConfig.java b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableSinkTaskConfig.java
new file mode 100644
index 000000000..48129c32f
--- /dev/null
+++ b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableSinkTaskConfig.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.kafka.connect.bigtable.config;
+
+import java.util.Map;
+import org.apache.kafka.common.config.ConfigDef;
+
+/**
+ * A class defining configuration of {@link
+ * com.google.cloud.kafka.connect.bigtable.BigtableSinkTask}.
+ */
+public class BigtableSinkTaskConfig extends BigtableSinkConfig {
+ public static String CONFIG_TASK_ID = "taskId";
+
+ /**
+ * The main constructor.
+ *
+ * @param properties The properties provided by the caller.
+ */
+ public BigtableSinkTaskConfig(Map<String, String> properties) {
+ super(getDefinition(), properties);
+ }
+
+ /**
+ * @return {@link ConfigDef} used by Kafka Connect to advertise configuration options to the user
+ * and by us to perform basic validation of the user-provided values.
+ */
+ public static ConfigDef getDefinition() {
+ return BigtableSinkConfig.getDefinition()
+ .defineInternal(
+ CONFIG_TASK_ID,
+ ConfigDef.Type.INT,
+ ConfigDef.NO_DEFAULT_VALUE,
+ ConfigDef.Importance.LOW);
+ }
+}
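BigtableSinkTaskConfig only adds the internal taskId property on top of the connector configuration; a connector's taskConfigs() would typically inject it when fanning out per-task property maps. A small sketch of that pattern follows, under the assumption that this is how the sink's connector class wires it up.

import com.google.cloud.kafka.connect.bigtable.config.BigtableSinkTaskConfig;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TaskConfigsSketch {
  static List<Map<String, String>> taskConfigs(Map<String, String> connectorProps, int maxTasks) {
    List<Map<String, String>> configs = new ArrayList<>();
    for (int i = 0; i < maxTasks; i++) {
      Map<String, String> taskProps = new HashMap<>(connectorProps);
      // CONFIG_TASK_ID ("taskId") is defined with defineInternal(), so it is validated as an
      // INT but never advertised to users.
      taskProps.put(BigtableSinkTaskConfig.CONFIG_TASK_ID, Integer.toString(i));
      configs.add(taskProps);
    }
    return configs;
  }
}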
diff --git a/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/InsertMode.java b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/InsertMode.java
new file mode 100644
index 000000000..a34481aa3
--- /dev/null
+++ b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/InsertMode.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.kafka.connect.bigtable.config;
+
+public enum InsertMode {
+ INSERT,
+ UPSERT,
+}
diff --git a/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/NullValueMode.java b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/NullValueMode.java
new file mode 100644
index 000000000..ad7a208f7
--- /dev/null
+++ b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/NullValueMode.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.kafka.connect.bigtable.config;
+
+public enum NullValueMode {
+ WRITE,
+ IGNORE,
+ DELETE,
+}
diff --git a/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/exception/BatchException.java b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/exception/BatchException.java
new file mode 100644
index 000000000..67575ac26
--- /dev/null
+++ b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/exception/BatchException.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.kafka.connect.bigtable.exception;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * A wrapper exception class that may be thrown to explicitly mark an exception as one that should
+ * fail an entire batch of input records.
+ */
+public class BatchException extends ConnectException {
+ public BatchException(Throwable t) {
+ super(t);
+ }
+}
diff --git a/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/exception/InvalidBigtableSchemaModificationException.java b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/exception/InvalidBigtableSchemaModificationException.java
new file mode 100644
index 000000000..8c2fcfc5b
--- /dev/null
+++ b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/exception/InvalidBigtableSchemaModificationException.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.kafka.connect.bigtable.exception;
+
+import org.apache.kafka.connect.errors.DataException;
+
+/**
+ * An {@link Exception} that signifies that input {@link org.apache.kafka.connect.sink.SinkRecord
+ * SinkRecord(s)} caused an attempt at an invalid Cloud Bigtable schema modification and thus are
+ * invalid and should not be retried.
+ */
+public class InvalidBigtableSchemaModificationException extends DataException {
+ public InvalidBigtableSchemaModificationException(String message) {
+ super(message);
+ }
+}
diff --git a/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/KeyMapper.java b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/KeyMapper.java
new file mode 100644
index 000000000..81436dc90
--- /dev/null
+++ b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/KeyMapper.java
@@ -0,0 +1,262 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.kafka.connect.bigtable.mapping;
+
+import com.google.cloud.kafka.connect.bigtable.config.BigtableSinkConfig;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
+
+/**
+ * A class responsible for converting Kafka {@link org.apache.kafka.connect.sink.SinkRecord
+ * SinkRecord(s)} into Cloud Bigtable row keys.
+ */
+public class KeyMapper {
+ final List<List<String>> definition;
+ final byte[] delimiter;
+
+ /**
+ * The main constructor.
+ *
+ * @param delimiter Delimiter in the mapping as per {@link
+ * com.google.cloud.kafka.connect.bigtable.config.BigtableSinkConfig#CONFIG_ROW_KEY_DELIMITER}
+ * @param definition Definition of the mapping as per {@link
+ * com.google.cloud.kafka.connect.bigtable.config.BigtableSinkConfig#CONFIG_ROW_KEY_DEFINITION}.
+ */
+ public KeyMapper(String delimiter, List<String> definition) {
+ this.delimiter = delimiter.getBytes(StandardCharsets.UTF_8);
+ this.definition =
+ definition.stream()
+ .map(s -> s.split("\\."))
+ .map(Arrays::asList)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Converts input data into Cloud Bigtable row key bytes as described in {@link
+ * BigtableSinkConfig#getDefinition()}.
+ *
+ * @param kafkaKey An {@link Object} to be converted into a Cloud Bigtable row key.
+ * @return The Cloud Bigtable row key bytes the input converts into. Note that an empty result
+ * means the input does not convert into a valid Cloud Bigtable row key, since row keys must
+ * be nonempty.
+ */
+ public byte[] getKey(Object kafkaKey) {
+ ensureKeyElementIsNotNull(kafkaKey);
+ Stream<byte[]> keyParts =
+ this.getDefinition(kafkaKey).stream()
+ .map((d) -> serializeTopLevelKeyElement(extractField(kafkaKey, d.iterator())));
+ return concatenateByteArrays(new byte[0], keyParts, delimiter, new byte[0]);
+ }
+
+ /**
+ * Returns the key definition as configured during object creation or, if it was configured as an
+ * empty {@link List}, extracted from the object being mapped.
+ *
+ * @param kafkaKey {@link org.apache.kafka.connect.sink.SinkRecord SinkRecord's} key.
+ * @return {@link List} containing {@link List Lists} of key fields that need to be retrieved and
+ * concatenated to construct the Cloud Bigtable row key.
+ * See {@link KeyMapper#extractField(Object, Iterator)} for details on semantics of the
+ * inner list.
+ */
+ private List<List<String>> getDefinition(Object kafkaKey) {
+ if (this.definition.isEmpty()) {
+ Optional<List<String>> maybeRootFields = getFieldsOfRootValue(kafkaKey);
+ if (maybeRootFields.isEmpty()) {
+ List<String> rootElementDefinition = List.of();
+ return List.of(rootElementDefinition);
+ } else {
+ return maybeRootFields.get().stream()
+ .map(Collections::singletonList)
+ .collect(Collectors.toList());
+ }
+ }
+ return this.definition;
+ }
+
+ /**
+ * Extracts names of child fields of the value.
+ *
+ * @param kafkaKey {@link org.apache.kafka.connect.sink.SinkRecord SinkRecord's} key.
+ * @return {@link Optional#empty()} if the input value has no children, {@link Optional}
+ * containing names of its child fields otherwise.
+ */
+ private static Optional<List<String>> getFieldsOfRootValue(Object kafkaKey) {
+ if (kafkaKey instanceof Struct) {
+ return Optional.of(
+ ((Struct) kafkaKey)
+ .schema().fields().stream().map(Field::name).collect(Collectors.toList()));
+ } else if (kafkaKey instanceof Map) {
+ return Optional.of(
+ ((Map<?, ?>) kafkaKey)
+ .keySet().stream().map(Object::toString).collect(Collectors.toList()));
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Extracts a possibly nested field from the input value.
+ *
+ * @param value {@link org.apache.kafka.connect.sink.SinkRecord SinkRecord's} key or one of its
+ * children.
+ * @param fields Fields that need to be accessed before the target value is reached.
+ * @return Extracted nested field.
+ */
+ private Object extractField(Object value, Iterator<String> fields) {
+ ensureKeyElementIsNotNull(value);
+ if (!fields.hasNext()) {
+ return value;
+ }
+ String field = fields.next();
+ if (value instanceof Struct) {
+ Struct struct = (Struct) value;
+ // Note that getWithoutDefault() throws if such a field does not exist.
+ return extractField(struct.getWithoutDefault(field), fields);
+ } else if (value instanceof Map) {
+ Map<?, ?> map = (Map<?, ?>) value;
+ if (!map.containsKey(field)) {
+ throw new DataException("Map contains no value for key `" + field + "`.");
+ }
+ return extractField(map.get(field), fields);
+ } else {
+ throw new DataException(
+ "Unexpected class `"
+ + value.getClass()
+ + "` doesn't "
+ + "support extracting field `"
+ + field
+ + "` using a dot.");
+ }
+ }
+
+ private static byte[] serializeTopLevelKeyElement(Object keyElement) {
+ ensureKeyElementIsNotNull(keyElement);
+ return serializeKeyElement(keyElement);
+ }
+
+ /**
+ * Serializes a Kafka Connect key element.
+ *
+ * We implement custom serialization since {@link Object#toString()} mangles arrays.
+ *
+ * @param keyElement {@link org.apache.kafka.connect.sink.SinkRecord SinkRecord's} key to be
+ * serialized.
+ * @return Serialization of the input value.
+ */
+ private static byte[] serializeKeyElement(Object keyElement) {
+ if (keyElement == null) {
+ // Note that it's needed for serializing null-containing Maps and Lists.
+ return "null".getBytes(StandardCharsets.UTF_8);
+ } else if (keyElement instanceof byte[]) {
+ // Note that it breaks compatibility with Confluent's sink.
+ return (byte[]) keyElement;
+ } else if (keyElement instanceof ByteBuffer) {
+ return ((ByteBuffer) keyElement).array();
+ } else if (keyElement instanceof List) {
+ List<?> list = (List<?>) keyElement;
+ return concatenateByteArrays(
+ "[", list.stream().map(o -> o.toString().getBytes(StandardCharsets.UTF_8)), ", ", "]");
+ } else if (keyElement instanceof Map) {
+ Map<?, ?> map = (Map<?, ?>) keyElement;
+ return concatenateByteArrays(
+ "{",
+ map.entrySet().stream()
+ .map(
+ e ->
+ concatenateByteArrays(
+ new byte[0],
+ Stream.of(
+ serializeKeyElement(e.getKey()), serializeKeyElement(e.getValue())),
+ "=".getBytes(StandardCharsets.UTF_8),
+ new byte[0])),
+ // Note that Map and Struct have different delimiters for compatibility's sake.
+ ", ",
+ "}");
+ } else if (keyElement instanceof Struct) {
+ Struct struct = (Struct) keyElement;
+ return concatenateByteArrays(
+ "Struct{",
+ struct.schema().fields().stream()
+ .flatMap(
+ f ->
+ Optional.ofNullable(struct.get(f))
+ .map(v -> new AbstractMap.SimpleImmutableEntry<>(f.name(), v))
+ .stream())
+ .map(
+ e ->
+ concatenateByteArrays(
+ new byte[0],
+ Stream.of(
+ serializeKeyElement(e.getKey()), serializeKeyElement(e.getValue())),
+ "=".getBytes(StandardCharsets.UTF_8),
+ new byte[0])),
+ // Note that Map and Struct have different delimiters for compatibility's sake.
+ ",",
+ "}");
+ } else {
+ // TODO: handle logical data types.
+ return keyElement.toString().getBytes(StandardCharsets.UTF_8);
+ }
+ }
+
+ private static void ensureKeyElementIsNotNull(Object value) {
+ if (value == null) {
+ // Matching Confluent's sink behavior.
+ throw new DataException("Error with row key definition: row key fields cannot be null.");
+ }
+ }
+
+ private static byte[] concatenateByteArrays(
+ byte[] start, Stream<byte[]> byteArrays, byte[] delimiter, byte[] end) {
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+ bos.write(start);
+ for (Iterator<byte[]> it = byteArrays.iterator(); it.hasNext(); ) {
+ byte[] keyPart = it.next();
+ bos.write(keyPart);
+ if (it.hasNext()) {
+ bos.write(delimiter);
+ }
+ }
+ bos.write(end);
+ return bos.toByteArray();
+ } catch (IOException e) {
+ throw new DataException("Concatenation of Cloud Bigtable key failed.", e);
+ }
+ }
+
+ private static byte[] concatenateByteArrays(
+ String start, Stream<byte[]> byteArrays, String delimiter, String end) {
+ return concatenateByteArrays(
+ start.getBytes(StandardCharsets.UTF_8),
+ byteArrays,
+ delimiter.getBytes(StandardCharsets.UTF_8),
+ end.getBytes(StandardCharsets.UTF_8));
+ }
+}
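As a reading aid (not part of the diff), here is a minimal usage sketch of the KeyMapper above; the delimiter "#" and the field names "id", "region", and "ignored" are assumed example values:

import com.google.cloud.kafka.connect.bigtable.mapping.KeyMapper;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

class KeyMapperUsageSketch {
  public static void main(String[] args) {
    // Assumed example configuration: delimiter "#", row key definition ["id", "region"].
    KeyMapper mapper = new KeyMapper("#", List.of("id", "region"));
    // Each configured field is extracted from the record key, serialized, and joined with the
    // delimiter, so this prints "42#eu". The "ignored" field is not part of the definition.
    byte[] rowKey = mapper.getKey(Map.of("id", 42, "region", "eu", "ignored", "x"));
    System.out.println(new String(rowKey, StandardCharsets.UTF_8));
  }
}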
diff --git a/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/MutationData.java b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/MutationData.java
new file mode 100644
index 000000000..65d4fc1c4
--- /dev/null
+++ b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/MutationData.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.kafka.connect.bigtable.mapping;
+
+import com.google.cloud.bigtable.data.v2.models.Mutation;
+import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
+import com.google.protobuf.ByteString;
+import java.util.Set;
+
+/**
+ * A class representing single Kafka {@link org.apache.kafka.connect.sink.SinkRecord SinkRecord's}
+ * output to be written into Cloud Bigtable.
+ */
+public class MutationData {
+ private final String targetTable;
+ private final ByteString rowKey;
+ private final Mutation mutation;
+ private final Set<String> requiredColumnFamilies;
+
+ public MutationData(
+ String targetTable,
+ ByteString rowKey,
+ Mutation mutation,
+ Set<String> requiredColumnFamilies) {
+ this.targetTable = targetTable;
+ this.rowKey = rowKey;
+ this.mutation = mutation;
+ this.requiredColumnFamilies = requiredColumnFamilies;
+ }
+
+ public String getTargetTable() {
+ return targetTable;
+ }
+
+ public ByteString getRowKey() {
+ return rowKey;
+ }
+
+ public RowMutationEntry getUpsertMutation() {
+ return RowMutationEntry.createFromMutationUnsafe(this.rowKey, this.mutation);
+ }
+
+ public Mutation getInsertMutation() {
+ return mutation;
+ }
+
+ public Set<String> getRequiredColumnFamilies() {
+ return requiredColumnFamilies;
+ }
+}
diff --git a/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/MutationDataBuilder.java b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/MutationDataBuilder.java
new file mode 100644
index 000000000..229853dae
--- /dev/null
+++ b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/MutationDataBuilder.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.kafka.connect.bigtable.mapping;
+
+import com.google.cloud.bigtable.data.v2.models.Mutation;
+import com.google.cloud.bigtable.data.v2.models.Range;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+/** A builder class for {@link MutationData}. */
+public class MutationDataBuilder {
+ private final Mutation mutation;
+ private boolean mutationIsEmpty;
+ private final Set<String> requiredColumnFamilies;
+
+ @VisibleForTesting
+ MutationDataBuilder(Mutation mutation) {
+ this.mutation = mutation;
+ mutationIsEmpty = true;
+ requiredColumnFamilies = new HashSet<>();
+ }
+
+ public MutationDataBuilder() {
+ this(Mutation.create());
+ }
+
+ /**
+ * Tries to convert this object into {@link MutationData}.
+ *
+ * @param targetTable - Cloud Bigtable {@link com.google.cloud.bigtable.admin.v2.models.Table}
+ * this mutation is to be written to.
+ * @param rowKey - Cloud Bigtable row key this mutation is to be written to.
+ * @return {@link Optional#empty()} if this mutation is empty, an {@link Optional} containing this
+ * mutation ready to be written to Cloud Bigtable otherwise.
+ */
+ public Optional maybeBuild(String targetTable, ByteString rowKey) {
+ return this.mutationIsEmpty
+ ? Optional.empty()
+ : Optional.of(
+ new MutationData(targetTable, rowKey, this.mutation, this.requiredColumnFamilies));
+ }
+
+ public void deleteRow() {
+ mutationIsEmpty = false;
+ mutation.deleteRow();
+ }
+
+ public void deleteFamily(String columnFamily) {
+ mutationIsEmpty = false;
+ mutation.deleteFamily(columnFamily);
+ }
+
+ public void deleteCells(
+ String columnFamily, ByteString columnQualifier, Range.TimestampRange timestampRange) {
+ mutationIsEmpty = false;
+ mutation.deleteCells(columnFamily, columnQualifier, timestampRange);
+ }
+
+ public void setCell(
+ String columnFamily, ByteString columnQualifier, long timestampMicros, ByteString value) {
+ mutationIsEmpty = false;
+ requiredColumnFamilies.add(columnFamily);
+ mutation.setCell(columnFamily, columnQualifier, timestampMicros, value);
+ }
+}
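A minimal sketch (not part of the diff) of the maybeBuild() contract described above; the table name, row key, column family, and cell values are assumed examples:

import com.google.cloud.kafka.connect.bigtable.mapping.MutationData;
import com.google.cloud.kafka.connect.bigtable.mapping.MutationDataBuilder;
import com.google.protobuf.ByteString;
import java.util.Optional;

class MutationDataBuilderUsageSketch {
  public static void main(String[] args) {
    ByteString rowKey = ByteString.copyFromUtf8("row-key");

    // No operation was recorded, so there is nothing to write and maybeBuild() returns empty.
    Optional<MutationData> nothing = new MutationDataBuilder().maybeBuild("my-table", rowKey);
    System.out.println(nothing.isEmpty()); // true

    // Recording any operation makes the builder non-empty; setCell() also tracks the column
    // family so that it can be auto-created later if it is missing.
    MutationDataBuilder builder = new MutationDataBuilder();
    builder.setCell(
        "my-family", ByteString.copyFromUtf8("my-column"), 1_000L, ByteString.copyFromUtf8("v"));
    Optional<MutationData> mutation = builder.maybeBuild("my-table", rowKey);
    System.out.println(mutation.get().getRequiredColumnFamilies()); // [my-family]
  }
}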
diff --git a/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/ValueMapper.java b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/ValueMapper.java
new file mode 100644
index 000000000..f97fc78f5
--- /dev/null
+++ b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/mapping/ValueMapper.java
@@ -0,0 +1,245 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.kafka.connect.bigtable.mapping;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import com.google.cloud.ByteArray;
+import com.google.cloud.bigtable.data.v2.models.Range;
+import com.google.cloud.kafka.connect.bigtable.config.NullValueMode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
+
+/**
+ * A class responsible for converting Kafka {@link org.apache.kafka.connect.sink.SinkRecord
+ * SinkRecord(s)} into Cloud Bigtable {@link com.google.cloud.bigtable.data.v2.models.Mutation
+ * Mutation(s)}.
+ */
+public class ValueMapper {
+ public final String defaultColumnFamily;
+ public final ByteString defaultColumnQualifier;
+ private final NullValueMode nullMode;
+ private static final ObjectMapper jsonMapper = getJsonMapper();
+
+ /**
+ * The main constructor.
+ *
+ * @param defaultColumnFamily Default column family as per {@link
+ * com.google.cloud.kafka.connect.bigtable.config.BigtableSinkConfig#CONFIG_DEFAULT_COLUMN_FAMILY}.
+ * @param defaultColumnQualifier Default column qualifier as per {@link
+ * com.google.cloud.kafka.connect.bigtable.config.BigtableSinkConfig#CONFIG_DEFAULT_COLUMN_QUALIFIER}.
+ */
+ public ValueMapper(
+ String defaultColumnFamily, String defaultColumnQualifier, @Nonnull NullValueMode nullMode) {
+ this.defaultColumnFamily = Utils.isBlank(defaultColumnFamily) ? null : defaultColumnFamily;
+ this.defaultColumnQualifier =
+ Utils.isBlank(defaultColumnQualifier)
+ ? null
+ : ByteString.copyFrom(defaultColumnQualifier.getBytes(StandardCharsets.UTF_8));
+ this.nullMode = nullMode;
+ }
+
+ /**
+ * Creates a {@link MutationDataBuilder} that can be used to create a {@link MutationData}
+ * representing the input Kafka Connect value as Cloud Bigtable mutations that need to be applied.
+ *
+ * @param rootKafkaValue The value to be converted into Cloud Bigtable {@link
+ * com.google.cloud.bigtable.data.v2.models.Mutation Mutation(s)}.
+ * @param timestampMicros The timestamp the mutations will be created at in microseconds.
+ */
+ public MutationDataBuilder getRecordMutationDataBuilder(
+ Object rootKafkaValue, long timestampMicros) {
+ MutationDataBuilder mutationDataBuilder = createMutationDataBuilder();
+ if (rootKafkaValue == null && nullMode == NullValueMode.IGNORE) {
+ // Do nothing
+ } else if (rootKafkaValue == null && nullMode == NullValueMode.DELETE) {
+ mutationDataBuilder.deleteRow();
+ } else if (rootKafkaValue instanceof Map || rootKafkaValue instanceof Struct) {
+ for (Map.Entry<Object, Object> field : getChildren(rootKafkaValue)) {
+ String kafkaFieldName = field.getKey().toString();
+ Object kafkaFieldValue = field.getValue();
+ if (kafkaFieldValue == null && nullMode == NullValueMode.IGNORE) {
+ continue;
+ } else if (kafkaFieldValue == null && nullMode == NullValueMode.DELETE) {
+ mutationDataBuilder.deleteFamily(kafkaFieldName);
+ } else if (kafkaFieldValue instanceof Map || kafkaFieldValue instanceof Struct) {
+ for (Map.Entry<Object, Object> subfield : getChildren(kafkaFieldValue)) {
+ ByteString kafkaSubfieldName =
+ ByteString.copyFrom(subfield.getKey().toString().getBytes(StandardCharsets.UTF_8));
+ Object kafkaSubfieldValue = subfield.getValue();
+ if (kafkaSubfieldValue == null && nullMode == NullValueMode.IGNORE) {
+ continue;
+ } else if (kafkaSubfieldValue == null && nullMode == NullValueMode.DELETE) {
+ mutationDataBuilder.deleteCells(
+ kafkaFieldName,
+ kafkaSubfieldName,
+ Range.TimestampRange.create(0, timestampMicros));
+ } else {
+ mutationDataBuilder.setCell(
+ kafkaFieldName,
+ kafkaSubfieldName,
+ timestampMicros,
+ ByteString.copyFrom(serialize(kafkaSubfieldValue)));
+ }
+ }
+ } else {
+ if (defaultColumnFamily != null) {
+ mutationDataBuilder.setCell(
+ defaultColumnFamily,
+ ByteString.copyFrom(kafkaFieldName.getBytes(StandardCharsets.UTF_8)),
+ timestampMicros,
+ ByteString.copyFrom(serialize(kafkaFieldValue)));
+ }
+ }
+ }
+ } else {
+ if (defaultColumnFamily != null && defaultColumnQualifier != null) {
+ mutationDataBuilder.setCell(
+ defaultColumnFamily,
+ defaultColumnQualifier,
+ timestampMicros,
+ ByteString.copyFrom(serialize(rootKafkaValue)));
+ }
+ }
+ return mutationDataBuilder;
+ }
+
+ @VisibleForTesting
+ // Method only needed for use in tests. It could be inlined otherwise.
+ protected MutationDataBuilder createMutationDataBuilder() {
+ return new MutationDataBuilder();
+ }
+
+ /**
+ * @param mapOrStruct {@link Map} or {@link Struct} whose children we want to list
+ * @return {@link List} of the input value's child entries (field name or map key paired with value).
+ */
+ private static List<Map.Entry<Object, Object>> getChildren(Object mapOrStruct) {
+ if (mapOrStruct instanceof Map) {
+ @SuppressWarnings("unchecked")
+ Map<Object, Object> kafkaMapValue = (Map<Object, Object>) mapOrStruct;
+ return new ArrayList<>(kafkaMapValue.entrySet());
+ } else if (mapOrStruct instanceof Struct) {
+ Struct kafkaStructValue = (Struct) mapOrStruct;
+ return kafkaStructValue.schema().fields().stream()
+ .map(
+ f ->
+ new AbstractMap.SimpleImmutableEntry<>(
+ (Object) f.name(), kafkaStructValue.get(f)))
+ .collect(Collectors.toList());
+ } else {
+ throw new IllegalStateException();
+ }
+ }
+
+ /**
+ * @param value Input value.
+ * @return Input value's serialization's bytes that will be written to Cloud Bigtable as a cell's
+ * value.
+ */
+ private static byte[] serialize(Object value) {
+ if (value == null) {
+ return new byte[0];
+ }
+ if (value instanceof byte[]) {
+ return (byte[]) value;
+ } else if (value instanceof ByteArray) {
+ return serialize(((ByteArray) value).toByteArray());
+ } else if (value instanceof Integer) {
+ return Bytes.toBytes((Integer) value);
+ } else if (value instanceof Long) {
+ return Bytes.toBytes((Long) value);
+ } else if (value instanceof Short) {
+ return Bytes.toBytes((Short) value);
+ } else if (value instanceof Byte) {
+ return Bytes.toBytes((Byte) value);
+ } else if (value instanceof Float) {
+ return Bytes.toBytes((Float) value);
+ } else if (value instanceof Double) {
+ return Bytes.toBytes((Double) value);
+ } else if (value instanceof Boolean) {
+ return Bytes.toBytes((Boolean) value);
+ } else if (value instanceof String) {
+ return Bytes.toBytes((String) value);
+ } else if (value instanceof Character) {
+ return serialize(Character.toString((Character) value));
+ } else if (value instanceof Date) {
+ // TODO: implement.
+ throw new DataException("TODO");
+ } else if (value instanceof BigDecimal) {
+ // TODO: implement.
+ throw new DataException("TODO");
+ } else if (value instanceof Map || value instanceof Struct || value instanceof List) {
+ try {
+ return jsonMapper.writeValueAsBytes(value);
+ } catch (JsonProcessingException e) {
+ throw new DataException("Failed to deserialize a(n) " + value.getClass(), e);
+ }
+ } else {
+ throw new DataException(
+ "Unsupported serialization of an unexpected class `" + value.getClass() + "`.");
+ }
+ }
+
+ /**
+ * @return {@link ObjectMapper} that can serialize all the Kafka Connect types.
+ */
+ private static ObjectMapper getJsonMapper() {
+ ObjectMapper mapper = new ObjectMapper();
+ SimpleModule mapperModule = new SimpleModule("KafkaConnectSerializer");
+ mapperModule.addSerializer(Struct.class, new StructJsonSerializer(Struct.class));
+ mapper.registerModule(mapperModule);
+ return mapper;
+ }
+
+ private static class StructJsonSerializer extends StdSerializer<Struct> {
+ protected StructJsonSerializer(Class<Struct> t) {
+ super(t);
+ }
+
+ @Override
+ public void serialize(Struct value, JsonGenerator gen, SerializerProvider provider)
+ throws IOException {
+ Schema schema = value.schema();
+ gen.writeStartObject();
+ for (Field field : schema.fields()) {
+ String fieldName = field.name();
+ gen.writeObjectField(fieldName, value.getWithoutDefault(fieldName));
+ }
+ gen.writeEndObject();
+ }
+ }
+}
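A minimal sketch (not part of the diff) of how the ValueMapper above maps a nested value; the default column family "default_cf", default qualifier "default_col", table name, and field names are assumed examples:

import com.google.cloud.kafka.connect.bigtable.config.NullValueMode;
import com.google.cloud.kafka.connect.bigtable.mapping.MutationData;
import com.google.cloud.kafka.connect.bigtable.mapping.MutationDataBuilder;
import com.google.cloud.kafka.connect.bigtable.mapping.ValueMapper;
import com.google.protobuf.ByteString;
import java.util.Map;
import java.util.Optional;

class ValueMapperUsageSketch {
  public static void main(String[] args) {
    ValueMapper mapper = new ValueMapper("default_cf", "default_col", NullValueMode.IGNORE);

    // Two-level values map to (column family, column qualifier); scalar top-level fields fall
    // back to the default column family with the field name as the qualifier.
    Map<String, Object> value =
        Map.of(
            "stats", Map.of("clicks", 7L), // -> family "stats", qualifier "clicks"
            "name", "widget");             // -> family "default_cf", qualifier "name"

    MutationDataBuilder builder = mapper.getRecordMutationDataBuilder(value, 1_000L);
    Optional<MutationData> mutation =
        builder.maybeBuild("my-table", ByteString.copyFromUtf8("row-key"));
    System.out.println(mutation.isPresent()); // true
  }
}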
diff --git a/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/version/PackageMetadata.java b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/version/PackageMetadata.java
new file mode 100644
index 000000000..0954d8d01
--- /dev/null
+++ b/google-cloud-bigtable-kafka-connect-sink/src/main/java/com/google/cloud/kafka/connect/bigtable/version/PackageMetadata.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.kafka.connect.bigtable.version;
+
+import java.util.Optional;
+
+/** A class responsible for extracting maven-generated package metadata. */
+public class PackageMetadata {
+ public static String UNKNOWN_VERSION = "unknown";
+
+ /**
+ * Extracts version information from the package metadata.
+ *
+ * @return String representation of the version of the package. Is equal to {@link
+ * PackageMetadata#UNKNOWN_VERSION} when the information is missing from package metadata.
+ */
+ public static String getVersion() {
+ Optional<String> discoveredVersion = Optional.empty();
+ try {
+ discoveredVersion =
+ Optional.ofNullable(PackageMetadata.class.getPackage().getImplementationVersion());
+ } catch (NullPointerException ignored) {
+ }
+ return discoveredVersion.orElse(UNKNOWN_VERSION);
+ }
+}
diff --git a/google-cloud-bigtable-kafka-connect-sink/src/test/java/com/google/cloud/kafka/connect/bigtable/BigtableSinkConnectorTest.java b/google-cloud-bigtable-kafka-connect-sink/src/test/java/com/google/cloud/kafka/connect/bigtable/BigtableSinkConnectorTest.java
new file mode 100644
index 000000000..6b3141d80
--- /dev/null
+++ b/google-cloud-bigtable-kafka-connect-sink/src/test/java/com/google/cloud/kafka/connect/bigtable/BigtableSinkConnectorTest.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.kafka.connect.bigtable;
+
+import static com.google.cloud.kafka.connect.bigtable.config.BigtableSinkTaskConfig.CONFIG_TASK_ID;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.google.cloud.kafka.connect.bigtable.util.BasicPropertiesFactory;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class BigtableSinkConnectorTest {
+ BigtableSinkConnector connector;
+
+ @Before
+ public void setUp() {
+ connector = new BigtableSinkConnector();
+ }
+
+ @Test
+ public void testConfig() {
+ assertNotNull(connector.config());
+ }
+
+ @Test
+ public void testValidate() {
+ connector.validate(BasicPropertiesFactory.getSinkProps());
+ }
+
+ @Test
+ public void testStart() {
+ connector.start(BasicPropertiesFactory.getSinkProps());
+ }
+
+ @Test
+ public void testStop() {
+ connector.stop();
+ }
+
+ @Test
+ public void testTaskClass() {
+ assertEquals(BigtableSinkTask.class, connector.taskClass());
+ }
+
+ @Test
+ public void testTaskConfigs() {
+ Map<String, String> connectorConfig = BasicPropertiesFactory.getSinkProps();
+ connector.start(new HashMap<>(connectorConfig));
+ int maxTasks = 1000;
+ List<Map<String, String>> taskConfigs = connector.taskConfigs(maxTasks);
+ assertEquals(maxTasks, taskConfigs.size());
+ for (Integer i = 0; i < maxTasks; i++) {
+ Map<String, String> taskConfig = taskConfigs.get(i);
+ assertEquals(i.toString(), taskConfig.get(CONFIG_TASK_ID));
+ taskConfig.remove(CONFIG_TASK_ID);
+ assertEquals(connectorConfig, taskConfig);
+ }
+ }
+
+ @Test
+ public void testVersion() {
+ assertNotNull(connector.version());
+ }
+}
diff --git a/google-cloud-bigtable-kafka-connect-sink/src/test/java/com/google/cloud/kafka/connect/bigtable/BigtableSinkTaskTest.java b/google-cloud-bigtable-kafka-connect-sink/src/test/java/com/google/cloud/kafka/connect/bigtable/BigtableSinkTaskTest.java
new file mode 100644
index 000000000..3348d4bf6
--- /dev/null
+++ b/google-cloud-bigtable-kafka-connect-sink/src/test/java/com/google/cloud/kafka/connect/bigtable/BigtableSinkTaskTest.java
@@ -0,0 +1,612 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.kafka.connect.bigtable;
+
+import static com.google.cloud.kafka.connect.bigtable.config.BigtableSinkConfig.CONFIG_AUTO_CREATE_COLUMN_FAMILIES;
+import static com.google.cloud.kafka.connect.bigtable.config.BigtableSinkConfig.CONFIG_AUTO_CREATE_TABLES;
+import static com.google.cloud.kafka.connect.bigtable.config.BigtableSinkConfig.CONFIG_ERROR_MODE;
+import static com.google.cloud.kafka.connect.bigtable.config.BigtableSinkConfig.CONFIG_INSERT_MODE;
+import static com.google.cloud.kafka.connect.bigtable.config.BigtableSinkConfig.CONFIG_TABLE_NAME_FORMAT;
+import static com.google.cloud.kafka.connect.bigtable.util.FutureUtil.completedApiFuture;
+import static com.google.cloud.kafka.connect.bigtable.util.MockUtil.assertTotalNumberOfInvocations;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.MockitoAnnotations.openMocks;
+
+import com.google.api.gax.batching.Batcher;
+import com.google.api.gax.rpc.ApiException;
+import com.google.bigtable.admin.v2.Table;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.models.Mutation;
+import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
+import com.google.cloud.kafka.connect.bigtable.autocreate.BigtableSchemaManager;
+import com.google.cloud.kafka.connect.bigtable.autocreate.ResourceCreationResult;
+import com.google.cloud.kafka.connect.bigtable.config.BigtableErrorMode;
+import com.google.cloud.kafka.connect.bigtable.config.BigtableSinkTaskConfig;
+import com.google.cloud.kafka.connect.bigtable.config.InsertMode;
+import com.google.cloud.kafka.connect.bigtable.exception.InvalidBigtableSchemaModificationException;
+import com.google.cloud.kafka.connect.bigtable.mapping.KeyMapper;
+import com.google.cloud.kafka.connect.bigtable.mapping.MutationData;
+import com.google.cloud.kafka.connect.bigtable.mapping.MutationDataBuilder;
+import com.google.cloud.kafka.connect.bigtable.mapping.ValueMapper;
+import com.google.cloud.kafka.connect.bigtable.util.ApiExceptionFactory;
+import com.google.cloud.kafka.connect.bigtable.util.BasicPropertiesFactory;
+import com.google.cloud.kafka.connect.bigtable.util.FutureUtil;
+import com.google.protobuf.ByteString;
+import java.nio.charset.StandardCharsets;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.slf4j.Logger;
+
+@RunWith(JUnit4.class)
+public class BigtableSinkTaskTest {
+ TestBigtableSinkTask task;
+ BigtableSinkTaskConfig config;
+ @Mock BigtableDataClient bigtableData;
+ @Mock BigtableTableAdminClient bigtableAdmin;
+ @Mock KeyMapper keyMapper;
+ @Mock ValueMapper valueMapper;
+ @Mock BigtableSchemaManager schemaManager;
+ @Mock SinkTaskContext context;
+ @Mock ErrantRecordReporter errorReporter;
+
+ @Before
+ public void setUp() {
+ openMocks(this);
+ config = new BigtableSinkTaskConfig(BasicPropertiesFactory.getTaskProps());
+ }
+
+ @Test
+ public void testStart() {
+ task = spy(new TestBigtableSinkTask(null, null, null, null, null, null, null));
+ task.start(BasicPropertiesFactory.getTaskProps());
+ }
+
+ @Test
+ public void testStop() throws InterruptedException {
+ for (List<Boolean> test :
+ List.of(
+ List.of(false, false),
+ List.of(false, true),
+ List.of(true, false),
+ List.of(true, true))) {
+ assertEquals(2, test.size());
+ boolean adminIsNotNull = test.get(0);
+ boolean dataIsNotNull = test.get(1);
+ int expectedAdminCloseCallCount = adminIsNotNull ? 1 : 0;
+ int expectedDataCloseCallCount = dataIsNotNull ? 1 : 0;
+
+ BigtableTableAdminClient maybeAdmin = adminIsNotNull ? bigtableAdmin : null;
+ BigtableDataClient maybeData = dataIsNotNull ? bigtableData : null;
+ task = new TestBigtableSinkTask(null, maybeData, maybeAdmin, null, null, null, null);
+ Batcher<RowMutationEntry, Void> batcher = mock(Batcher.class);
+ doReturn(completedApiFuture(null)).when(batcher).closeAsync();
+ task.getBatchers().put("batcherTable", batcher);
+
+ doThrow(new RuntimeException()).when(bigtableAdmin).close();
+ doThrow(new RuntimeException()).when(bigtableData).close();
+
+ assertFalse(task.getBatchers().isEmpty());
+ task.stop();
+ assertTrue(task.getBatchers().isEmpty());
+ verify(bigtableAdmin, times(expectedAdminCloseCallCount)).close();
+ verify(bigtableData, times(expectedDataCloseCallCount)).close();
+ verify(batcher, times(1)).closeAsync();
+
+ reset(bigtableAdmin);
+ reset(bigtableData);
+ }
+ }
+
+ @Test
+ public void testVersion() {
+ task = spy(new TestBigtableSinkTask(null, null, null, null, null, null, null));
+ assertNotNull(task.version());
+ }
+
+ @Test
+ public void testGetTableName() {
+ SinkRecord record = new SinkRecord("topic${test}", 1, null, null, null, null, 1);
+ for (Map.Entry<String, String> test :
+ List.of(
+ new AbstractMap.SimpleImmutableEntry<>(
+ "prefix_${topic}_suffix", "prefix_topic${test}_suffix"),
+ new AbstractMap.SimpleImmutableEntry<>(
+ "prefix_${topic_suffix", "prefix_${topic_suffix"),
+ new AbstractMap.SimpleImmutableEntry<>("prefix_$topic_suffix", "prefix_$topic_suffix"),
+ new AbstractMap.SimpleImmutableEntry<>("prefix_${bad}_suffix", "prefix_${bad}_suffix"),
+ new AbstractMap.SimpleImmutableEntry<>("noSubstitution", "noSubstitution"))) {
+ Map<String, String> props = BasicPropertiesFactory.getTaskProps();
+ props.put(CONFIG_TABLE_NAME_FORMAT, test.getKey());
+ task =
+ new TestBigtableSinkTask(
+ new BigtableSinkTaskConfig(props), null, null, null, null, null, null);
+ assertEquals(test.getValue(), task.getTableName(record));
+ }
+ }
+
+ @Test
+ public void testCreateRecordMutationDataEmptyKey() {
+ task = new TestBigtableSinkTask(config, null, null, keyMapper, null, null, null);
+ doReturn(new byte[0]).when(keyMapper).getKey(any());
+ SinkRecord record = new SinkRecord("topic", 1, null, new Object(), null, null, 1);
+ assertThrows(ConnectException.class, () -> task.createRecordMutationData(record));
+ }
+
+ @Test
+ public void testCreateRecordMutationDataNonemptyKey() {
+ SinkRecord record = new SinkRecord("topic", 1, null, new Object(), null, null, 1);
+ task = new TestBigtableSinkTask(config, null, null, keyMapper, valueMapper, null, null);
+
+ byte[] rowKey = "rowKey".getBytes(StandardCharsets.UTF_8);
+ doReturn(rowKey).when(keyMapper).getKey(any());
+ doAnswer(
+ i -> {
+ MutationDataBuilder builder = new MutationDataBuilder();
+ return builder;
+ })
+ .when(valueMapper)
+ .getRecordMutationDataBuilder(any(), anyLong());
+ assertTrue(task.createRecordMutationData(record).isEmpty());
+
+ doAnswer(
+ i -> {
+ MutationDataBuilder builder = new MutationDataBuilder();
+ builder.deleteRow();
+ return builder;
+ })
+ .when(valueMapper)
+ .getRecordMutationDataBuilder(any(), anyLong());
+ assertTrue(task.createRecordMutationData(record).isPresent());
+ }
+
+ @Test
+ public void testErrorReporterWithDLQ() {
+ doReturn(errorReporter).when(context).errantRecordReporter();
+ task = new TestBigtableSinkTask(null, null, null, null, null, null, context);
+ SinkRecord record = new SinkRecord(null, 1, null, null, null, null, 1);
+ Throwable t = new Exception("testErrorReporterWithDLQ");
+ verifyNoMoreInteractions(task.getLogger());
+ task.reportError(record, t);
+ verify(errorReporter, times(1)).report(record, t);
+ }
+
+ @Test
+ public void testErrorReporterNoDLQIgnoreMode() {
+ Map<String, String> props = BasicPropertiesFactory.getTaskProps();
+ props.put(CONFIG_ERROR_MODE, BigtableErrorMode.IGNORE.name());
+ BigtableSinkTaskConfig config = new BigtableSinkTaskConfig(props);
+
+ doThrow(new NoSuchMethodError()).when(context).errantRecordReporter();
+ task = new TestBigtableSinkTask(config, null, null, null, null, null, context);
+ SinkRecord record = new SinkRecord(null, 1, null, null, null, null, 1);
+ verifyNoMoreInteractions(task.getLogger());
+ verifyNoMoreInteractions(errorReporter);
+ task.reportError(record, new Exception("testErrorReporterWithDLQ"));
+ }
+
+ @Test
+ public void testErrorReporterNoDLQWarnMode() {
+ Map<String, String> props = BasicPropertiesFactory.getTaskProps();
+ props.put(CONFIG_ERROR_MODE, BigtableErrorMode.WARN.name());
+ BigtableSinkTaskConfig config = new BigtableSinkTaskConfig(props);
+
+ doReturn(null).when(context).errantRecordReporter();
+ task = new TestBigtableSinkTask(config, null, null, null, null, null, context);
+ SinkRecord record = new SinkRecord(null, 1, null, "key", null, null, 1);
+ Throwable t = new Exception("testErrorReporterNoDLQWarnMode");
+ verifyNoMoreInteractions(errorReporter);
+ task.reportError(record, t);
+ verify(task.getLogger(), times(1)).warn(anyString(), eq(record.key()), eq(t));
+ }
+
+ @Test
+ public void testErrorReporterNoDLQFailMode() {
+ Map<String, String> props = BasicPropertiesFactory.getTaskProps();
+ props.put(CONFIG_ERROR_MODE, BigtableErrorMode.FAIL.name());
+ BigtableSinkTaskConfig config = new BigtableSinkTaskConfig(props);
+
+ doReturn(null).when(context).errantRecordReporter();
+ task = new TestBigtableSinkTask(config, null, null, null, null, null, context);
+ SinkRecord record = new SinkRecord(null, 1, null, "key", null, null, 1);
+ Throwable t = new Exception("testErrorReporterNoDLQFailMode");
+ verifyNoMoreInteractions(errorReporter);
+ verifyNoMoreInteractions(task.getLogger());
+ assertThrows(ConnectException.class, () -> task.reportError(record, t));
+ }
+
+ @Test
+ public void testGetTimestamp() {
+ task = new TestBigtableSinkTask(null, null, null, null, null, null, null);
+ long timestampMillis = 123L;
+ SinkRecord recordWithTimestamp =
+ new SinkRecord(
+ null, 1, null, null, null, null, 1, timestampMillis, TimestampType.CREATE_TIME);
+ SinkRecord recordWithNullTimestamp = new SinkRecord(null, 1, null, null, null, null, 2);
+
+ assertEquals(
+ (Long) (1000L * timestampMillis), (Long) task.getTimestampMicros(recordWithTimestamp));
+ assertNotNull(task.getTimestampMicros(recordWithNullTimestamp));
+
+ // Assertion that the Java Bigtable client doesn't support microsecond timestamp granularity.
+ // When it starts supporting it, getTimestamp() will need to get modified.
+ assertEquals(
+ Arrays.stream(Table.TimestampGranularity.values()).collect(Collectors.toSet()),
+ Set.of(
+ Table.TimestampGranularity.TIMESTAMP_GRANULARITY_UNSPECIFIED,
+ Table.TimestampGranularity.MILLIS,
+ Table.TimestampGranularity.UNRECOGNIZED));
+ }
+
+ @Test
+ public void testHandleResults() {
+ SinkRecord errorSinkRecord = new SinkRecord("", 1, null, null, null, null, 1);
+ SinkRecord successSinkRecord = new SinkRecord("", 1, null, null, null, null, 2);
+ Map<SinkRecord, Future<Void>> perRecordResults =
+ Map.of(
+ errorSinkRecord, CompletableFuture.failedFuture(new Exception("testHandleResults")),
+ successSinkRecord, CompletableFuture.completedFuture(null));
+ doReturn(errorReporter).when(context).errantRecordReporter();
+ task = new TestBigtableSinkTask(null, null, null, null, null, null, context);
+ task.handleResults(perRecordResults);
+ verify(errorReporter, times(1)).report(eq(errorSinkRecord), any());
+ assertTotalNumberOfInvocations(errorReporter, 1);
+ }
+
+ @Test
+ public void testPrepareRecords() {
+ task = spy(new TestBigtableSinkTask(null, null, null, null, null, null, context));
+ doReturn(errorReporter).when(context).errantRecordReporter();
+
+ MutationData okMutationData = mock(MutationData.class);
+ Exception exception = new RuntimeException();
+ doThrow(exception)
+ .doReturn(Optional.empty())
+ .doReturn(Optional.of(okMutationData))
+ .when(task)
+ .createRecordMutationData(any());
+
+ SinkRecord exceptionRecord = new SinkRecord("", 1, null, null, null, null, 1);
+ SinkRecord emptyRecord = new SinkRecord("", 1, null, null, null, null, 3);
+ SinkRecord okRecord = new SinkRecord("", 1, null, null, null, null, 2);
+
+ Map<SinkRecord, MutationData> result =
+ task.prepareRecords(List.of(exceptionRecord, emptyRecord, okRecord));
+ assertEquals(Map.of(okRecord, okMutationData), result);
+ verify(errorReporter, times(1)).report(exceptionRecord, exception);
+ assertTotalNumberOfInvocations(errorReporter, 1);
+ }
+
+ @Test
+ public void testAutoCreateTablesAndHandleErrors() {
+ task = spy(new TestBigtableSinkTask(null, null, null, null, null, schemaManager, context));
+ doReturn(errorReporter).when(context).errantRecordReporter();
+
+ doReturn(errorReporter).when(context).errantRecordReporter();
+ SinkRecord okRecord = new SinkRecord("", 1, null, null, null, null, 1);
+ SinkRecord bigtableErrorRecord = new SinkRecord("", 1, null, null, null, null, 2);
+ SinkRecord dataErrorRecord = new SinkRecord("", 1, null, null, null, null, 3);
+ MutationData okMutationData = mock(MutationData.class);
+ MutationData bigtableErrorMutationData = mock(MutationData.class);
+ MutationData dataErrorMutationData = mock(MutationData.class);
+
+ Map<SinkRecord, MutationData> mutations = new HashMap<>();
+ mutations.put(okRecord, okMutationData);
+ mutations.put(bigtableErrorRecord, bigtableErrorMutationData);
+ mutations.put(dataErrorRecord, dataErrorMutationData);
+
+ ResourceCreationResult resourceCreationResult =
+ new ResourceCreationResult(Set.of(bigtableErrorRecord), Set.of(dataErrorRecord));
+ doReturn(resourceCreationResult).when(schemaManager).ensureTablesExist(any());
+ Map<SinkRecord, MutationData> mutationsToApply =
+ task.autoCreateTablesAndHandleErrors(mutations);
+
+ assertEquals(Map.of(okRecord, okMutationData), mutationsToApply);
+ verify(errorReporter, times(1))
+ .report(eq(bigtableErrorRecord), argThat(e -> e instanceof ConnectException));
+ verify(errorReporter, times(1))
+ .report(
+ eq(dataErrorRecord),
+ argThat(e -> e instanceof InvalidBigtableSchemaModificationException));
+ assertTotalNumberOfInvocations(errorReporter, 2);
+ }
+
+ @Test
+ public void testAutoCreateColumnFamiliesAndHandleErrors() {
+ task = spy(new TestBigtableSinkTask(null, null, null, null, null, schemaManager, context));
+ doReturn(errorReporter).when(context).errantRecordReporter();
+
+ doReturn(errorReporter).when(context).errantRecordReporter();
+ SinkRecord okRecord = new SinkRecord("", 1, null, null, null, null, 1);
+ SinkRecord bigtableErrorRecord = new SinkRecord("", 1, null, null, null, null, 2);
+ SinkRecord dataErrorRecord = new SinkRecord("", 1, null, null, null, null, 3);
+ MutationData okMutationData = mock(MutationData.class);
+ MutationData bigtableErrorMutationData = mock(MutationData.class);
+ MutationData dataErrorMutationData = mock(MutationData.class);
+
+ Map<SinkRecord, MutationData> mutations = new HashMap<>();
+ mutations.put(okRecord, okMutationData);
+ mutations.put(bigtableErrorRecord, bigtableErrorMutationData);
+ mutations.put(dataErrorRecord, dataErrorMutationData);
+
+ ResourceCreationResult resourceCreationResult =
+ new ResourceCreationResult(Set.of(bigtableErrorRecord), Set.of(dataErrorRecord));
+ doReturn(resourceCreationResult).when(schemaManager).ensureColumnFamiliesExist(any());
+ Map<SinkRecord, MutationData> mutationsToApply =
+ task.autoCreateColumnFamiliesAndHandleErrors(mutations);
+
+ assertEquals(Map.of(okRecord, okMutationData), mutationsToApply);
+ verify(errorReporter, times(1))
+ .report(eq(bigtableErrorRecord), argThat(e -> e instanceof ConnectException));
+ verify(errorReporter, times(1))
+ .report(
+ eq(dataErrorRecord),
+ argThat(e -> e instanceof InvalidBigtableSchemaModificationException));
+ assertTotalNumberOfInvocations(errorReporter, 2);
+ }
+
+ @Test
+ public void testInsertRows() throws ExecutionException, InterruptedException {
+ task = new TestBigtableSinkTask(null, bigtableData, null, null, null, null, null);
+ ApiException exception = ApiExceptionFactory.create();
+ doReturn(false).doReturn(true).doThrow(exception).when(bigtableData).checkAndMutateRow(any());
+
+ SinkRecord successRecord = new SinkRecord("", 1, null, null, null, null, 1);
+ SinkRecord errorRecord = new SinkRecord("", 1, null, null, null, null, 2);
+ SinkRecord exceptionRecord = new SinkRecord("", 1, null, null, null, null, 3);
+ MutationData commonMutationData = mock(MutationData.class);
+ doReturn("ignored").when(commonMutationData).getTargetTable();
+ doReturn(ByteString.copyFrom("ignored".getBytes(StandardCharsets.UTF_8)))
+ .when(commonMutationData)
+ .getRowKey();
+ doReturn(mock(Mutation.class)).when(commonMutationData).getInsertMutation();
+
+ // LinkedHashMap, because we mock consecutive return values of Bigtable client mock and thus
+ // rely on the order.
+ Map<SinkRecord, MutationData> input = new LinkedHashMap<>();
+ input.put(successRecord, commonMutationData);
+ input.put(errorRecord, commonMutationData);
+ input.put(exceptionRecord, commonMutationData);
+ Map<SinkRecord, Future<Void>> output = new HashMap<>();
+ task.insertRows(input, output);
+
+ assertEquals(input.keySet(), output.keySet());
+ verify(bigtableData, times(input.size())).checkAndMutateRow(any());
+ assertTotalNumberOfInvocations(bigtableData, input.size());
+
+ output.get(successRecord).get();
+ assertThrows(ExecutionException.class, () -> output.get(errorRecord).get());
+ assertThrows(ExecutionException.class, () -> output.get(exceptionRecord).get());
+ }
+
+ @Test
+ public void testUpsertRows() {
+ Map<String, String> props = BasicPropertiesFactory.getTaskProps();
+ int maxBatchSize = 3;
+ int totalRecords = 1000;
+ props.put(BigtableSinkTaskConfig.CONFIG_MAX_BATCH_SIZE, Integer.toString(maxBatchSize));
+ BigtableSinkTaskConfig config = new BigtableSinkTaskConfig(props);
+
+ task = spy(new TestBigtableSinkTask(config, null, null, null, null, null, null));
+ String batcherTable = "batcherTable";
+ Batcher<RowMutationEntry, Void> batcher = mock(Batcher.class);
+ doAnswer(
+ invocation -> {
+ TestBigtableSinkTask task = (TestBigtableSinkTask) invocation.getMock();
+ task.getBatchers().computeIfAbsent(batcherTable, ignored -> batcher);
+ return null;
+ })
+ .when(task)
+ .performUpsertBatch(any(), any());
+
+ MutationData commonMutationData = mock(MutationData.class);
+
+ Map<SinkRecord, MutationData> input =
+ IntStream.range(0, totalRecords)
+ .mapToObj(i -> new SinkRecord("", 1, null, null, null, null, i))
+ .collect(Collectors.toMap(i -> i, ignored -> commonMutationData));
+
+ Map<SinkRecord, Future<Void>> fakeMutationData = mock(Map.class);
+ assertTrue(task.getBatchers().isEmpty());
+ task.upsertRows(input, fakeMutationData);
+ assertEquals(Set.of(batcher), task.getBatchers().values().stream().collect(Collectors.toSet()));
+
+ int expectedFullBatches = totalRecords / maxBatchSize;
+ int expectedPartialBatches = totalRecords % maxBatchSize == 0 ? 0 : 1;
+
+ verify(task, times(expectedFullBatches))
+ .performUpsertBatch(argThat(v -> v.size() == maxBatchSize), any());
+ verify(task, times(expectedPartialBatches))
+ .performUpsertBatch(argThat(v -> v.size() != maxBatchSize), any());
+ }
+
+ @Test
+ public void testPerformUpsertBatch() throws ExecutionException, InterruptedException {
+ String okTable = "okTable";
+ String errorTable = "errorTable";
+
+ Batcher<RowMutationEntry, Void> okBatcher = mock(Batcher.class);
+ doReturn(completedApiFuture(null)).when(okBatcher).add(any());
+ Batcher<RowMutationEntry, Void> errorBatcher = mock(Batcher.class);
+ doReturn(FutureUtil.failedApiFuture(new Exception())).when(errorBatcher).add(any());
+
+ doReturn(okBatcher).when(bigtableData).newBulkMutationBatcher(okTable);
+ doReturn(errorBatcher).when(bigtableData).newBulkMutationBatcher(errorTable);
+ task = new TestBigtableSinkTask(null, bigtableData, null, null, null, null, null);
+
+ SinkRecord okRecord = new SinkRecord(okTable, 1, null, null, null, null, 1);
+ SinkRecord errorRecord = new SinkRecord(errorTable, 1, null, null, null, null, 2);
+
+ MutationData okMutationData = mock(MutationData.class);
+ doReturn(okTable).when(okMutationData).getTargetTable();
+ doReturn(mock(RowMutationEntry.class)).when(okMutationData).getUpsertMutation();
+ MutationData errorMutationData = mock(MutationData.class);
+ doReturn(errorTable).when(errorMutationData).getTargetTable();
+ doReturn(mock(RowMutationEntry.class)).when(errorMutationData).getUpsertMutation();
+
+ Map<SinkRecord, MutationData> input =
+ Map.of(
+ okRecord, okMutationData,
+ errorRecord, errorMutationData);
+ Map<SinkRecord, Future<Void>> output = new HashMap<>();
+
+ assertTrue(task.getBatchers().isEmpty());
+ task.performUpsertBatch(new ArrayList<>(input.entrySet()), output);
+ assertEquals(
+ Set.of(okBatcher, errorBatcher),
+ task.getBatchers().values().stream().collect(Collectors.toSet()));
+
+ assertEquals(input.keySet(), output.keySet());
+ verify(okBatcher, times(1)).add(any());
+ verify(okBatcher, times(1)).sendOutstanding();
+ assertTotalNumberOfInvocations(okBatcher, 2);
+ verify(errorBatcher, times(1)).add(any());
+ verify(errorBatcher, times(1)).sendOutstanding();
+ assertTotalNumberOfInvocations(errorBatcher, 2);
+
+ output.get(okRecord).get();
+ assertThrows(ExecutionException.class, () -> output.get(errorRecord).get());
+ }
+
+ @Test
+ public void testPutBranches() {
+ SinkRecord record1 = new SinkRecord("table1", 1, null, null, null, null, 1);
+ SinkRecord record2 = new SinkRecord("table2", 1, null, null, null, null, 2);
+
+ for (List<Boolean> test :
+ List.of(
+ List.of(false, false, false),
+ List.of(false, false, true),
+ List.of(false, true, false),
+ List.of(false, true, true),
+ List.of(true, false, false),
+ List.of(true, false, true),
+ List.of(true, true, false),
+ List.of(true, true, true))) {
+ boolean autoCreateTables = test.get(0);
+ boolean autoCreateColumnFamilies = test.get(1);
+ boolean useInsertMode = test.get(2);
+
+ Map<String, String> props = BasicPropertiesFactory.getTaskProps();
+ props.put(CONFIG_AUTO_CREATE_TABLES, Boolean.toString(autoCreateTables));
+ props.put(CONFIG_AUTO_CREATE_COLUMN_FAMILIES, Boolean.toString(autoCreateColumnFamilies));
+ props.put(CONFIG_INSERT_MODE, (useInsertMode ? InsertMode.INSERT : InsertMode.UPSERT).name());
+ config = new BigtableSinkTaskConfig(props);
+
+ byte[] rowKey = "rowKey".getBytes(StandardCharsets.UTF_8);
+ doReturn(rowKey).when(keyMapper).getKey(any());
+ doAnswer(
+ i -> {
+ MutationDataBuilder builder = new MutationDataBuilder();
+ builder.deleteRow();
+ return builder;
+ })
+ .when(valueMapper)
+ .getRecordMutationDataBuilder(any(), anyLong());
+
+ Batcher<RowMutationEntry, Void> batcher = mock(Batcher.class);
+ doReturn(completedApiFuture(null)).when(batcher).add(any());
+ doReturn(batcher).when(bigtableData).newBulkMutationBatcher(anyString());
+ doReturn(new ResourceCreationResult(Collections.emptySet(), Collections.emptySet()))
+ .when(schemaManager)
+ .ensureTablesExist(any());
+ doReturn(new ResourceCreationResult(Collections.emptySet(), Collections.emptySet()))
+ .when(schemaManager)
+ .ensureColumnFamiliesExist(any());
+
+ task =
+ spy(
+ new TestBigtableSinkTask(
+ config, bigtableData, null, keyMapper, valueMapper, schemaManager, null));
+
+ task.put(List.of(record1, record2));
+
+ verify(task, times(1)).prepareRecords(any());
+ verify(schemaManager, times(autoCreateTables ? 1 : 0)).ensureTablesExist(any());
+ verify(schemaManager, times(autoCreateColumnFamilies ? 1 : 0))
+ .ensureColumnFamiliesExist(any());
+ verify(task, times(useInsertMode ? 1 : 0)).insertRows(any(), any());
+ verify(task, times(useInsertMode ? 0 : 1)).upsertRows(any(), any());
+ verify(task, times(1)).handleResults(any());
+
+ reset(task);
+ reset(schemaManager);
+ }
+ }
+
+ private static class TestBigtableSinkTask extends BigtableSinkTask {
+ public TestBigtableSinkTask(
+ BigtableSinkTaskConfig config,
+ BigtableDataClient bigtableData,
+ BigtableTableAdminClient bigtableAdmin,
+ KeyMapper keyMapper,
+ ValueMapper valueMapper,
+ BigtableSchemaManager schemaManager,
+ SinkTaskContext context) {
+ super(config, bigtableData, bigtableAdmin, keyMapper, valueMapper, schemaManager, context);
+ this.logger = mock(Logger.class);
+ }
+
+ public Logger getLogger() {
+ return logger;
+ }
+
+ public Map<String, Batcher<RowMutationEntry, Void>> getBatchers() {
+ return batchers;
+ }
+ }
+}
diff --git a/google-cloud-bigtable-kafka-connect-sink/src/test/java/com/google/cloud/kafka/connect/bigtable/autocreate/BigtableSchemaManagerTest.java b/google-cloud-bigtable-kafka-connect-sink/src/test/java/com/google/cloud/kafka/connect/bigtable/autocreate/BigtableSchemaManagerTest.java
new file mode 100644
index 000000000..c8b6eaf45
--- /dev/null
+++ b/google-cloud-bigtable-kafka-connect-sink/src/test/java/com/google/cloud/kafka/connect/bigtable/autocreate/BigtableSchemaManagerTest.java
@@ -0,0 +1,749 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.kafka.connect.bigtable.autocreate;
+
+import static com.google.cloud.kafka.connect.bigtable.util.FutureUtil.completedApiFuture;
+import static com.google.cloud.kafka.connect.bigtable.util.FutureUtil.failedApiFuture;
+import static com.google.cloud.kafka.connect.bigtable.util.MockUtil.assertTotalNumberOfInvocations;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.models.ColumnFamily;
+import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
+import com.google.cloud.bigtable.admin.v2.models.ModifyColumnFamiliesRequest;
+import com.google.cloud.bigtable.admin.v2.models.Table;
+import com.google.cloud.kafka.connect.bigtable.autocreate.BigtableSchemaManager.ResourceAndRecords;
+import com.google.cloud.kafka.connect.bigtable.mapping.MutationData;
+import com.google.cloud.kafka.connect.bigtable.util.ApiExceptionFactory;
+import io.grpc.Status;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class BigtableSchemaManagerTest {
+ BigtableTableAdminClient bigtable;
+ TestBigtableSchemaManager bigtableSchemaManager;
+
+ @Before
+ public void setUp() {
+ bigtable = mock(BigtableTableAdminClient.class);
+ bigtableSchemaManager = spy(new TestBigtableSchemaManager(bigtable));
+ }
+
+ @Test
+ public void testTableCachePopulationSuccess() {
+ List<String> tables = List.of("table1", "table2");
+ doReturn(tables).when(bigtable).listTables();
+ bigtableSchemaManager.refreshTableNamesCache();
+ assertEquals(new HashSet<>(tables), bigtableSchemaManager.getCache().keySet());
+ assertTotalNumberOfInvocations(bigtable, 1);
+
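+ // The table-name cache is now populated, so ensuring the same tables exist should report
+ // no errors without querying Bigtable again.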
+ reset(bigtable);
+ verifyNoInteractions(bigtable);
+ Map<SinkRecord, MutationData> input =
+ generateInput(
+ tables.stream()
+ .map(l -> new AbstractMap.SimpleImmutableEntry<>(l, Set.of("cf")))
+ .collect(Collectors.toList()));
+ ResourceCreationResult result = bigtableSchemaManager.ensureTablesExist(input);
+ assertTrue(result.getBigtableErrors().isEmpty());
+ assertTrue(result.getDataErrors().isEmpty());
+ }
+
+ @Test
+ public void testTableCachePopulationMayRemoveElements() {
+ List<String> tables1 = List.of("table1", "table2");
+ List<String> tables2 = List.of(tables1.get(0));
+
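+ // A second refresh should drop tables that are no longer returned by listTables().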
+ doReturn(tables1).when(bigtable).listTables();
+ bigtableSchemaManager.refreshTableNamesCache();
+ assertEquals(new HashSet<>(tables1), bigtableSchemaManager.getCache().keySet());
+ reset(bigtable);
+
+ doReturn(tables2).when(bigtable).listTables();
+ bigtableSchemaManager.refreshTableNamesCache();
+ assertEquals(new HashSet<>(tables2), bigtableSchemaManager.getCache().keySet());
+ verify(bigtable, times(1)).listTables();
+ assertTotalNumberOfInvocations(bigtable, 1);
+ }
+
+ @Test
+ public void testTableCachePopulationError() {
+ doThrow(ApiExceptionFactory.create()).when(bigtable).listTables();
+ assertThrows(ConnectException.class, () -> bigtableSchemaManager.refreshTableNamesCache());
+ }
+
+ @Test
+ public void testTableColumnFamiliesCachePopulationSuccess() {
+ Map<String, Set<String>> tablesAndColumnFamilies =
+ Map.of(
+ "table1", Set.of("cf1", "cf2"),
+ "table2", Set.of("cf3", "cf4"));
+ doReturn(new ArrayList<>(tablesAndColumnFamilies.keySet())).when(bigtable).listTables();
+ for (Map.Entry<String, Set<String>> entry : tablesAndColumnFamilies.entrySet()) {
+ mockGetTableSuccess(bigtable, entry.getKey(), entry.getValue());
+ }
+
+ Set<String> refreshedTables = tablesAndColumnFamilies.keySet();
+ bigtableSchemaManager.refreshTableColumnFamiliesCache(refreshedTables);
+ verify(bigtableSchemaManager, times(1)).refreshTableColumnFamiliesCache(refreshedTables);
+ verify(bigtableSchemaManager, times(1)).refreshTableNamesCache();
+ assertTotalNumberOfInvocations(bigtableSchemaManager, 2);
+ assertEquals(
+ bigtableSchemaManager.getCache(),
+ tablesAndColumnFamilies.entrySet().stream()
+ .map(e -> new AbstractMap.SimpleImmutableEntry<>(e.getKey(), Optional.of(e.getValue())))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+ verify(bigtable, times(1)).listTables();
+ for (String tableName : tablesAndColumnFamilies.keySet()) {
+ verify(bigtable, times(1)).getTableAsync(tableName);
+ }
+ // One for listTables() and one for each table lookup.
+ int expectedBigtableCalls = tablesAndColumnFamilies.size() + 1;
+ assertTotalNumberOfInvocations(bigtable, expectedBigtableCalls);
+
+ reset(bigtable);
+ verifyNoInteractions(bigtable);
+ Map<SinkRecord, MutationData> input =
+ generateInput(new ArrayList<>(tablesAndColumnFamilies.entrySet()));
+ ResourceCreationResult result = bigtableSchemaManager.ensureTablesExist(input);
+ assertTrue(result.getBigtableErrors().isEmpty());
+ assertTrue(result.getDataErrors().isEmpty());
+ }
+
+ @Test
+ public void testTableColumnFamiliesCachePopulationErrors() {
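+ // A failure of listTables() should propagate as a ConnectException.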
+ doThrow(ApiExceptionFactory.create()).when(bigtable).listTables();
+ assertThrows(
+ ConnectException.class,
+ () -> bigtableSchemaManager.refreshTableColumnFamiliesCache(Collections.emptySet()));
+ verify(bigtable, times(1)).listTables();
+ reset(bigtable);
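+ // A failure of a single getTableAsync() call should only leave that table out of the cache.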
+
+ String successTable = "table1";
+ String errorTable = "table2";
+ List<String> allTables = List.of(successTable, errorTable);
+
+ doReturn(allTables).when(bigtable).listTables();
+ mockGetTableSuccess(bigtable, successTable, Collections.emptySet());
+ // We simulate an error caused, for example, by another user deleting the table.
+ doReturn(failedApiFuture(ApiExceptionFactory.create()))
+ .when(bigtable)
+ .getTableAsync(errorTable);
+ bigtableSchemaManager.refreshTableColumnFamiliesCache(new HashSet<>(allTables));
+ assertEquals(Set.of(successTable), bigtableSchemaManager.getCache().keySet());
+ verify(bigtable, times(1)).listTables();
+ verify(bigtable, times(1)).getTableAsync(successTable);
+ verify(bigtable, times(1)).getTableAsync(errorTable);
+ assertTotalNumberOfInvocations(bigtable, 3);
+ }
+
+ @Test
+ public void testEnsureTablesExistAllExisted() {
+ // Prepopulate the cache.
+ List<String> tables = List.of("table1", "table2");
+ doReturn(tables).when(bigtable).listTables();
+ bigtableSchemaManager.refreshTableNamesCache();
+ reset(bigtable);
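+ // All target tables are already cached, so no Bigtable calls are expected.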
+
+ Map<SinkRecord, MutationData> ensureTablesExistInput =
+ generateInput(
+ List.of(
+ new AbstractMap.SimpleImmutableEntry<>(tables.get(0), Set.of("missingCF")),
+ new AbstractMap.SimpleImmutableEntry<>(tables.get(0), Set.of("missingCF")),
+ new AbstractMap.SimpleImmutableEntry<>(tables.get(1), Set.of("missingCF")),
+ new AbstractMap.SimpleImmutableEntry<>(tables.get(1), Set.of("missingCF"))));
+ ResourceCreationResult result = bigtableSchemaManager.ensureTablesExist(ensureTablesExistInput);
+ assertTrue(result.getBigtableErrors().isEmpty());
+ assertTrue(result.getDataErrors().isEmpty());
+ assertTotalNumberOfInvocations(bigtable, 0);
+ }
+
+ @Test
+ public void testEnsureTablesExistAllCreatedSuccessfully() {
+ List<String> tables = List.of("table1", "table2");
+ // We call listTables() only once, after sending all the create requests. In this case all the
+ // requests were successful.
+ doReturn(tables).when(bigtable).listTables();
+ for (String table : tables) {
+ mockCreateTableSuccess(bigtable, table, Collections.emptySet());
+ }
+
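+ // The cache starts out empty, so a creation request is expected for every table.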
+ assertTrue(bigtableSchemaManager.getCache().isEmpty());
+ Map<SinkRecord, MutationData> ensureTablesExistInput =
+ generateInput(
+ List.of(
+ new AbstractMap.SimpleImmutableEntry<>(tables.get(0), Set.of("missingCF")),
+ new AbstractMap.SimpleImmutableEntry<>(tables.get(0), Set.of("missingCF")),
+ new AbstractMap.SimpleImmutableEntry<>(tables.get(1), Set.of("missingCF")),
+ new AbstractMap.SimpleImmutableEntry<>(tables.get(1), Set.of("missingCF"))));
+ ResourceCreationResult result = bigtableSchemaManager.ensureTablesExist(ensureTablesExistInput);
+ assertTrue(result.getBigtableErrors().isEmpty());
+ assertTrue(result.getDataErrors().isEmpty());
+ for (String table : tables) {
+ assertTrue(bigtableSchemaManager.getCache().containsKey(table));
+ verify(bigtable, times(1))
+ .createTableAsync(argThat(ctr -> createTableMockRefersTable(table, ctr)));
+ }
+
+ // One for each table creation and one for result check.
+ int expectedBigtableCalls = tables.size() + 1;
+ assertTotalNumberOfInvocations(bigtable, expectedBigtableCalls);
+ }
+
+ @Test
+ public void testEnsureTablesExistSomeCreatedSuccessfullySomeErrorsDueToRaces() {
+ List<String> tables = List.of("table1", "table2");
+ // We call listTables() only once, after sending all the create requests. In this case some
+ // requests failed since another thread concurrently created one of these tables.
+ doReturn(tables).when(bigtable).listTables();
+ String tableWhoseCreationFailed = tables.get(1);
+ for (String table : tables) {
+ if (!table.equals(tableWhoseCreationFailed)) {
+ mockCreateTableSuccess(bigtable, table, Collections.emptySet());
+ }
+ }
+ doReturn(failedApiFuture(ApiExceptionFactory.create()))
+ .when(bigtable)
+ .createTableAsync(
+ argThat(ctr -> createTableMockRefersTable(tableWhoseCreationFailed, ctr)));
+
+ assertTrue(bigtableSchemaManager.getCache().isEmpty());
+ Map<SinkRecord, MutationData> ensureTablesExistInput =
+ generateInput(
+ List.of(
+ new AbstractMap.SimpleImmutableEntry<>(tables.get(0), Set.of("missingCF")),
+ new AbstractMap.SimpleImmutableEntry<>(tables.get(0), Set.of("missingCF")),
+ new AbstractMap.SimpleImmutableEntry<>(tables.get(1), Set.of("missingCF")),
+ new AbstractMap.SimpleImmutableEntry<>(tables.get(1), Set.of("missingCF"))));
+ ResourceCreationResult result = bigtableSchemaManager.ensureTablesExist(ensureTablesExistInput);
+ assertTrue(result.getBigtableErrors().isEmpty());
+ assertTrue(result.getDataErrors().isEmpty());
+ for (String table : tables) {
+ assertTrue(bigtableSchemaManager.getCache().containsKey(table));
+ verify(bigtable, times(1))
+ .createTableAsync(argThat(ctr -> createTableMockRefersTable(table, ctr)));
+ }
+
+ // One for each table creation and one for result check.
+ int expectedBigtableCalls = tables.size() + 1;
+ assertTotalNumberOfInvocations(bigtable, expectedBigtableCalls);
+ }
+
+ @Test
+ public void testEnsureTablesExistSomeCreatedSuccessfullySomeErrors() {
+ String successfulTable = "table1";
+ String bigtableErrorTable = "table2";
+ String dataErrorTable = "table3";
+ Set columnFamilies = Set.of("cf1");
+
+ doReturn(List.of(successfulTable)).when(bigtable).listTables();
+ mockCreateTableSuccess(bigtable, successfulTable, columnFamilies);
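+ // A RESOURCE_EXHAUSTED failure is expected to be reported as a Bigtable error and an
+ // INVALID_ARGUMENT failure as a data error.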
+ doReturn(failedApiFuture(ApiExceptionFactory.create(Status.Code.RESOURCE_EXHAUSTED)))
+ .when(bigtable)
+ .createTableAsync(argThat(ctr -> createTableMockRefersTable(bigtableErrorTable, ctr)));
+ doReturn(failedApiFuture(ApiExceptionFactory.create(Status.Code.INVALID_ARGUMENT)))
+ .when(bigtable)
+ .createTableAsync(argThat(ctr -> createTableMockRefersTable(dataErrorTable, ctr)));
+
+ assertTrue(bigtableSchemaManager.getCache().isEmpty());
+ Map<SinkRecord, MutationData> ensureTablesExistInput =
+ generateInput(
+ List.of(
+ new AbstractMap.SimpleImmutableEntry<>(successfulTable, columnFamilies),
+ new AbstractMap.SimpleImmutableEntry<>(successfulTable, columnFamilies),
+ new AbstractMap.SimpleImmutableEntry<>(bigtableErrorTable, columnFamilies),
+ new AbstractMap.SimpleImmutableEntry<>(bigtableErrorTable, columnFamilies),
+ new AbstractMap.SimpleImmutableEntry<>(dataErrorTable, columnFamilies),
+ new AbstractMap.SimpleImmutableEntry<>(dataErrorTable, columnFamilies)));
+ ResourceCreationResult result = bigtableSchemaManager.ensureTablesExist(ensureTablesExistInput);
+ Set<SinkRecord> bigtableErrors = result.getBigtableErrors();
+ Set<SinkRecord> dataErrors = result.getDataErrors();
+ assertEquals(
+ ensureTablesExistInput.entrySet().stream()
+ .filter(e -> e.getValue().getTargetTable().equals(bigtableErrorTable))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet()),
+ bigtableErrors);
+ assertEquals(
+ ensureTablesExistInput.entrySet().stream()
+ .filter(e -> e.getValue().getTargetTable().equals(dataErrorTable))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet()),
+ dataErrors);
+ Map<String, Optional<Set<String>>> cache = bigtableSchemaManager.getCache();
+ assertTrue(cache.containsKey(successfulTable));
+ verify(bigtable, times(1))
+ .createTableAsync(argThat(ctr -> createTableMockRefersTable(successfulTable, ctr)));
+ assertFalse(cache.containsKey(bigtableErrorTable));
+ verify(bigtable, times(1))
+ .createTableAsync(argThat(ctr -> createTableMockRefersTable(bigtableErrorTable, ctr)));
+ assertFalse(cache.containsKey(dataErrorTable));
+ verify(bigtable, times(1))
+ .createTableAsync(argThat(ctr -> createTableMockRefersTable(dataErrorTable, ctr)));
+
+ // One for each table creation and one for result check.
+ int expectedBigtableCalls = 4;
+ assertTotalNumberOfInvocations(bigtable, expectedBigtableCalls);
+ }
+
+ @Test
+ public void testEnsureTablesExistConcurrentDeletion() {
+ String createdTable = "table1";
+ String createdAndThenConcurrentlyDeletedTable = "table2";
+ Set columnFamilies = Set.of("cf1");
+
+ // Note that only a single table is returned - we simulate concurrent deletion of the other
+ // table.
+ doAnswer(ignored -> List.of(createdTable)).when(bigtable).listTables();
+ mockCreateTableSuccess(bigtable, createdTable, columnFamilies);
+ mockCreateTableSuccess(bigtable, createdAndThenConcurrentlyDeletedTable, columnFamilies);
+
+ assertTrue(bigtableSchemaManager.getCache().isEmpty());
+ Map<SinkRecord, MutationData> ensureTablesExistInput =
+ generateInput(
+ List.of(
+ new AbstractMap.SimpleImmutableEntry<>(createdTable, columnFamilies),
+ new AbstractMap.SimpleImmutableEntry<>(createdTable, columnFamilies),
+ new AbstractMap.SimpleImmutableEntry<>(
+ createdAndThenConcurrentlyDeletedTable, columnFamilies),
+ new AbstractMap.SimpleImmutableEntry<>(
+ createdAndThenConcurrentlyDeletedTable, columnFamilies)));
+ ResourceCreationResult result = bigtableSchemaManager.ensureTablesExist(ensureTablesExistInput);
+ assertTrue(result.getDataErrors().isEmpty());
+ Set<SinkRecord> missingTables = result.getBigtableErrors();
+ assertEquals(
+ ensureTablesExistInput.entrySet().stream()
+ .filter(
+ e -> e.getValue().getTargetTable().equals(createdAndThenConcurrentlyDeletedTable))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet()),
+ missingTables);
+ Map<String, Optional<Set<String>>> cache = bigtableSchemaManager.getCache();
+ assertTrue(cache.containsKey(createdTable));
+ verify(bigtable, times(1))
+ .createTableAsync(argThat(ctr -> createTableMockRefersTable(createdTable, ctr)));
+ assertFalse(cache.containsKey(createdAndThenConcurrentlyDeletedTable));
+ verify(bigtable, times(1))
+ .createTableAsync(
+ argThat(
+ ctr -> createTableMockRefersTable(createdAndThenConcurrentlyDeletedTable, ctr)));
+
+ // One for each table creation and one for result check.
+ int expectedBigtableCalls = 3;
+ assertTotalNumberOfInvocations(bigtable, expectedBigtableCalls);
+ }
+
+ @Test
+ public void testEnsureColumnFamiliesExistAllExisted() {
+ Map<String, Set<String>> tablesAndColumnFamilies =
+ Map.of(
+ "table1", Set.of("cf1", "cf2"),
+ "table2", Set.of("cf3", "cf4"));
+ doReturn(new ArrayList<>(tablesAndColumnFamilies.keySet())).when(bigtable).listTables();
+ for (Map.Entry<String, Set<String>> entry : tablesAndColumnFamilies.entrySet()) {
+ mockGetTableSuccess(bigtable, entry.getKey(), entry.getValue());
+ }
+ Set<String> refreshedTables = tablesAndColumnFamilies.keySet();
+ bigtableSchemaManager.refreshTableColumnFamiliesCache(refreshedTables);
+ reset(bigtable);
+ verifyNoInteractions(bigtable);
+
+ Map<SinkRecord, MutationData> ensureColumnFamiliesExistInput =
+ generateInput(new ArrayList<>(tablesAndColumnFamilies.entrySet()));
+ ResourceCreationResult result =
+ bigtableSchemaManager.ensureColumnFamiliesExist(ensureColumnFamiliesExistInput);
+ assertTrue(result.getDataErrors().isEmpty());
+ assertTrue(result.getBigtableErrors().isEmpty());
+ }
+
+ @Test
+ public void testEnsureColumnFamiliesExistAllCreatedSuccessfully() {
+ Map<String, Set<String>> tablesAndColumnFamilies =
+ Map.of(
+ "table1", Set.of("cf1", "cf2"),
+ "table2", Set.of("cf3", "cf4"));
+ doReturn(new ArrayList<>(tablesAndColumnFamilies.keySet())).when(bigtable).listTables();
+ for (Map.Entry<String, Set<String>> entry : tablesAndColumnFamilies.entrySet()) {
+ mockCreateColumnFamilySuccess(bigtable, entry.getKey(), entry.getValue());
+ mockGetTableSuccess(bigtable, entry.getKey(), entry.getValue());
+ }
+ Map<SinkRecord, MutationData> ensureColumnFamiliesExistInput =
+ generateInput(new ArrayList<>(tablesAndColumnFamilies.entrySet()));
+
+ ResourceCreationResult result =
+ bigtableSchemaManager.ensureColumnFamiliesExist(ensureColumnFamiliesExistInput);
+ assertTrue(result.getDataErrors().isEmpty());
+ assertTrue(result.getBigtableErrors().isEmpty());
+ for (Map.Entry<String, Set<String>> entry : tablesAndColumnFamilies.entrySet()) {
+ String tableName = entry.getKey();
+ for (String columnFamily : entry.getValue()) {
+ verify(bigtable, times(1))
+ .modifyFamiliesAsync(
+ argThat(
+ mcfr ->
+ createColumnFamilyMockRefersTableAndColumnFamily(
+ tableName, columnFamily, mcfr)));
+ }
+ verify(bigtable, times(1)).getTableAsync(tableName);
+ }
+ int expectedBigtableInteractions =
+ 1 // listTables()
+ + tablesAndColumnFamilies.values().stream()
+ .mapToInt(Set::size)
+ .sum() // modifyColumnFamily()
+ + tablesAndColumnFamilies.keySet().size(); // getTable()
+ assertTotalNumberOfInvocations(bigtable, expectedBigtableInteractions);
+ }
+
+ @Test
+ public void
+ testEnsureColumnFamiliesExistSomeCreatedSuccessfullySomeErrorsDueToRacesOrInvalidRequests() {
+ String successTable = "table1";
+ String bigtableErrorTable = "table2";
+ String dataErrorTable = "table3";
+ String invalidArgumentColumnFamilyName = "INVALID_ARGUMENT_COLUMN_FAMILY_NAME";
+ Map<String, Set<String>> tablesAndColumnFamilies =
+ Map.of(
+ successTable, Set.of("cf1", "cf2"),
+ bigtableErrorTable, Set.of("cf3", "cf4"),
+ dataErrorTable, Set.of("cf5", invalidArgumentColumnFamilyName));
+ doReturn(new ArrayList<>(tablesAndColumnFamilies.keySet())).when(bigtable).listTables();
+ for (Map.Entry<String, Set<String>> entry : tablesAndColumnFamilies.entrySet()) {
+ String table = entry.getKey();
+ for (String columnFamily : entry.getValue()) {
+ if (table.equals(bigtableErrorTable)) {
+ doReturn(failedApiFuture(ApiExceptionFactory.create(Status.Code.RESOURCE_EXHAUSTED)))
+ .when(bigtable)
+ .modifyFamiliesAsync(
+ argThat(
+ mcfr ->
+ createColumnFamilyMockRefersTableAndColumnFamily(
+ table, columnFamily, mcfr)));
+ } else if (table.equals(dataErrorTable)) {
+ doReturn(failedApiFuture(ApiExceptionFactory.create(Status.Code.INVALID_ARGUMENT)))
+ .when(bigtable)
+ .modifyFamiliesAsync(
+ argThat(
+ mcfr ->
+ createColumnFamilyMockRefersTableAndColumnFamily(
+ table, columnFamily, mcfr)));
+ } else {
+ mockCreateColumnFamilySuccess(bigtable, entry.getKey(), entry.getValue());
+ }
+ }
+ Set<String> columnFamilies = new HashSet<>(entry.getValue());
+ columnFamilies.remove(invalidArgumentColumnFamilyName);
+ mockGetTableSuccess(bigtable, table, columnFamilies);
+ }
+ Map<SinkRecord, MutationData> ensureColumnFamiliesExistInput =
+ generateInput(new ArrayList<>(tablesAndColumnFamilies.entrySet()));
+ ResourceCreationResult result =
+ bigtableSchemaManager.ensureColumnFamiliesExist(ensureColumnFamiliesExistInput);
+ assertTrue(result.getBigtableErrors().isEmpty());
+ Set<SinkRecord> missingColumnFamilies = result.getDataErrors();
+ assertEquals(
+ ensureColumnFamiliesExistInput.entrySet().stream()
+ .filter(e -> e.getValue().getTargetTable().equals(dataErrorTable))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet()),
+ missingColumnFamilies);
+
+ for (Map.Entry<String, Set<String>> entry : tablesAndColumnFamilies.entrySet()) {
+ String tableName = entry.getKey();
+ for (String columnFamily : entry.getValue()) {
+ verify(bigtable, times(1))
+ .modifyFamiliesAsync(
+ argThat(
+ mcfr ->
+ createColumnFamilyMockRefersTableAndColumnFamily(
+ tableName, columnFamily, mcfr)));
+ }
+ verify(bigtable, times(1)).getTableAsync(tableName);
+ }
+ int expectedBigtableInteractions =
+ 1 // listTables()
+ + tablesAndColumnFamilies.values().stream()
+ .mapToInt(Set::size)
+ .sum() // modifyColumnFamily()
+ + tablesAndColumnFamilies.keySet().size(); // getTable()
+ assertTotalNumberOfInvocations(bigtable, expectedBigtableInteractions);
+ }
+
+ @Test
+ public void testEnsureColumnFamiliesExistSomeErrorsDueToConcurrentColumnFamilyDeletion() {
+ String successTable = "table1";
+ String errorTable = "table2";
+ Map<String, Set<String>> tablesAndColumnFamilies =
+ Map.of(
+ successTable, Set.of("cf1", "cf2"),
+ errorTable, Set.of("cf3", "cf4"));
+ doReturn(new ArrayList<>(tablesAndColumnFamilies.keySet())).when(bigtable).listTables();
+ for (Map.Entry<String, Set<String>> entry : tablesAndColumnFamilies.entrySet()) {
+ String table = entry.getKey();
+ mockCreateColumnFamilySuccess(bigtable, table, entry.getValue());
+ if (table.equals(errorTable)) {
+ doReturn(failedApiFuture(ApiExceptionFactory.create())).when(bigtable).getTableAsync(table);
+ } else {
+ mockGetTableSuccess(bigtable, table, entry.getValue());
+ }
+ }
+ Map<SinkRecord, MutationData> ensureColumnFamiliesExistInput =
+ generateInput(new ArrayList<>(tablesAndColumnFamilies.entrySet()));
+ ResourceCreationResult result =
+ bigtableSchemaManager.ensureColumnFamiliesExist(ensureColumnFamiliesExistInput);
+ assertTrue(result.getDataErrors().isEmpty());
+ Set<SinkRecord> missingColumnFamilies = result.getBigtableErrors();
+ assertEquals(1, missingColumnFamilies.size());
+ assertEquals(
+ ensureColumnFamiliesExistInput.entrySet().stream()
+ .filter(e -> e.getValue().getTargetTable().equals(errorTable))
+ .findFirst()
+ .get()
+ .getKey(),
+ missingColumnFamilies.stream().findFirst().get());
+
+ for (Map.Entry<String, Set<String>> entry : tablesAndColumnFamilies.entrySet()) {
+ String tableName = entry.getKey();
+ for (String columnFamily : entry.getValue()) {
+ verify(bigtable, times(1))
+ .modifyFamiliesAsync(
+ argThat(
+ mcfr ->
+ createColumnFamilyMockRefersTableAndColumnFamily(
+ tableName, columnFamily, mcfr)));
+ }
+ verify(bigtable, times(1)).getTableAsync(tableName);
+ }
+ int expectedBigtableInteractions =
+ 1 // listTables()
+ + tablesAndColumnFamilies.values().stream()
+ .mapToInt(Set::size)
+ .sum() // modifyColumnFamily()
+ + tablesAndColumnFamilies.keySet().size(); // getTable()
+ assertTotalNumberOfInvocations(bigtable, expectedBigtableInteractions);
+ }
+
+ @Test
+ public void testEnsureColumnFamiliesExistMissingTable() {
+ String successTable = "table1";
+ String errorTable = "table2";
+ Map<String, Set<String>> tablesAndColumnFamilies =
+ Map.of(
+ successTable, Set.of("cf1", "cf2"),
+ errorTable, Set.of("cf3", "cf4"));
+ doReturn(List.of(successTable)).when(bigtable).listTables();
+ for (Map.Entry<String, Set<String>> entry : tablesAndColumnFamilies.entrySet()) {
+ String table = entry.getKey();
+ mockGetTableSuccess(bigtable, table, entry.getValue());
+ if (table.equals(errorTable)) {
+ doReturn(failedApiFuture(ApiExceptionFactory.create()))
+ .when(bigtable)
+ .modifyFamiliesAsync(argThat(mcfr -> createColumnFamilyMockRefersTable(table, mcfr)));
+ } else {
+ mockCreateColumnFamilySuccess(bigtable, table, entry.getValue());
+ }
+ }
+ Map<SinkRecord, MutationData> ensureColumnFamiliesExistInput =
+ generateInput(new ArrayList<>(tablesAndColumnFamilies.entrySet()));
+ ResourceCreationResult result =
+ bigtableSchemaManager.ensureColumnFamiliesExist(ensureColumnFamiliesExistInput);
+ assertTrue(result.getDataErrors().isEmpty());
+ Set<SinkRecord> missingColumnFamilies = result.getBigtableErrors();
+ assertEquals(1, missingColumnFamilies.size());
+ assertEquals(
+ ensureColumnFamiliesExistInput.entrySet().stream()
+ .filter(e -> e.getValue().getTargetTable().equals(errorTable))
+ .findFirst()
+ .get()
+ .getKey(),
+ missingColumnFamilies.stream().findFirst().get());
+
+ for (Map.Entry<String, Set<String>> entry : tablesAndColumnFamilies.entrySet()) {
+ String table = entry.getKey();
+ for (String columnFamily : entry.getValue()) {
+ verify(bigtable, times(1))
+ .modifyFamiliesAsync(
+ argThat(
+ mcfr ->
+ createColumnFamilyMockRefersTableAndColumnFamily(
+ table, columnFamily, mcfr)));
+ }
+ if (!table.equals(errorTable)) {
+ verify(bigtable, times(1)).getTableAsync(table);
+ }
+ }
+ int expectedBigtableInteractions =
+ 1 // listTables()
+ + tablesAndColumnFamilies.values().stream()
+ .mapToInt(Set::size)
+ .sum() // modifyColumnFamily()
+ + 1; // getTable()
+ assertTotalNumberOfInvocations(bigtable, expectedBigtableInteractions);
+ }
+
+ @Test
+ public void testErrorsCreatingColumnFamilies() {}
+
+ @Test
+ public void testAwaitResourceCreationAndHandleInvalidInputErrors() {
+ int uniqueKafkaOffset = 0;
+ SinkRecord ok1 = spoofSinkRecord("topic1", uniqueKafkaOffset++);
+ SinkRecord ok2 = spoofSinkRecord("topic2", uniqueKafkaOffset++);
+ SinkRecord dataError1 = spoofSinkRecord("topic3", uniqueKafkaOffset++);
+ SinkRecord dataError2 = spoofSinkRecord("topic4", uniqueKafkaOffset++);
+ SinkRecord bigtableError1 = spoofSinkRecord("topic5", uniqueKafkaOffset++);
+ SinkRecord bigtableError2 = spoofSinkRecord("topic6", uniqueKafkaOffset++);
+
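+ // Each ResourceAndRecords groups the records that depend on the creation of a single resource.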
+ ResourceAndRecords<String> ok = new ResourceAndRecords<>("ok", List.of(ok1, ok2));
+ ResourceAndRecords<String> dataError =
+ new ResourceAndRecords<>("data", List.of(dataError1, dataError2));
+ ResourceAndRecords<String> bigtableError =
+ new ResourceAndRecords<>("bigtable", List.of(bigtableError1, bigtableError2));
+
+ Map