Skip to content

Commit

Permalink
feat: optimize Spanner changestream metadata table
Browse files Browse the repository at this point in the history
  • Loading branch information
thiagotnunes committed Aug 16, 2024
1 parent 65550a7 commit d7b4ba4
Showing 1 changed file with 68 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -78,6 +80,16 @@ public class PartitionMetadataAdminDao {
*/
public static final String COLUMN_FINISHED_AT = "FinishedAt";

/**
* Metadata table index for queries over the watermark column.
*/
public static final String WATERMARK_INDEX = "WatermarkIndex";

/**
* Metadata table index for queries over the created at / start timestamp columns.
*/
public static final String CREATED_AT_START_TIMESTAMP_INDEX = "CreatedAtStartTimestampIndex";

private static final int TIMEOUT_MINUTES = 10;
private static final int TTL_AFTER_PARTITION_FINISHED_DAYS = 1;

Expand Down Expand Up @@ -117,11 +129,10 @@ public class PartitionMetadataAdminDao {
* PartitionMetadataAdminDao#TTL_AFTER_PARTITION_FINISHED_DAYS} days.
*/
public void createPartitionMetadataTable() {
String metadataCreateStmt = "";
List<String> ddl = new ArrayList<>();
if (this.isPostgres()) {
// Literals need be added around literals to preserve casing.
metadataCreateStmt =
"CREATE TABLE \""
ddl.add("CREATE TABLE \""
+ tableName
+ "\"(\""
+ COLUMN_PARTITION_TOKEN
Expand All @@ -146,16 +157,35 @@ public void createPartitionMetadataTable() {
+ "\" SPANNER.COMMIT_TIMESTAMP,\""
+ COLUMN_FINISHED_AT
+ "\" SPANNER.COMMIT_TIMESTAMP,"
+ " PRIMARY KEY (\"PartitionToken\")"
+ " PRIMARY KEY (\""
+ COLUMN_PARTITION_TOKEN
+"\")"
+ ")"
+ " TTL INTERVAL '"
+ TTL_AFTER_PARTITION_FINISHED_DAYS
+ " days' ON \""
+ COLUMN_FINISHED_AT
+ "\"";
+ "\"");
ddl.add("CREATE INDEX \""
+ WATERMARK_INDEX
+ "\" on \""
+ tableName
+ "\" (\""
+ COLUMN_WATERMARK
+ "\") INCLUDE (\""
+ COLUMN_STATE
+ "\")");
ddl.add("CREATE INDEX \""
+ CREATED_AT_START_TIMESTAMP_INDEX
+ "\" ON \""
+ tableName
+ "\" (\""
+ COLUMN_CREATED_AT
+ "\",\""
+ COLUMN_START_TIMESTAMP
+ "\")");
} else {
metadataCreateStmt =
"CREATE TABLE "
ddl.add("CREATE TABLE "
+ tableName
+ " ("
+ COLUMN_PARTITION_TOKEN
Expand All @@ -180,16 +210,36 @@ public void createPartitionMetadataTable() {
+ " TIMESTAMP OPTIONS (allow_commit_timestamp=true),"
+ COLUMN_FINISHED_AT
+ " TIMESTAMP OPTIONS (allow_commit_timestamp=true),"
+ ") PRIMARY KEY (PartitionToken),"
+ ") PRIMARY KEY ("
+ COLUMN_PARTITION_TOKEN
+"),"
+ " ROW DELETION POLICY (OLDER_THAN("
+ COLUMN_FINISHED_AT
+ ", INTERVAL "
+ TTL_AFTER_PARTITION_FINISHED_DAYS
+ " DAY))";
+ " DAY))");
ddl.add("CREATE INDEX "
+ WATERMARK_INDEX
+ " on "
+ tableName
+ " ("
+ COLUMN_WATERMARK
+ ") STORING ("
+ COLUMN_STATE
+ ")");
ddl.add("CREATE INDEX "
+ CREATED_AT_START_TIMESTAMP_INDEX
+ " ON "
+ tableName
+ " ("
+ COLUMN_CREATED_AT
+ ","
+ COLUMN_START_TIMESTAMP
+ ")");
}
OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
databaseAdminClient.updateDatabaseDdl(
instanceId, databaseId, Collections.singletonList(metadataCreateStmt), null);
instanceId, databaseId, ddl, null);
try {
// Initiate the request which returns an OperationFuture.
op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
Expand All @@ -212,15 +262,18 @@ public void createPartitionMetadataTable() {
* PartitionMetadataAdminDao#TIMEOUT_MINUTES} minutes.
*/
public void deletePartitionMetadataTable() {
String metadataDropStmt;
List<String> ddl = new ArrayList<>();
if (this.isPostgres()) {
metadataDropStmt = "DROP TABLE \"" + tableName + "\"";
ddl.add("DROP INDEX \"" + CREATED_AT_START_TIMESTAMP_INDEX + "\"");
ddl.add("DROP INDEX \"" + WATERMARK_INDEX + "\"");
ddl.add("DROP TABLE \"" + tableName + "\"");
} else {
metadataDropStmt = "DROP TABLE " + tableName;
ddl.add("DROP INDEX " + CREATED_AT_START_TIMESTAMP_INDEX);
ddl.add("DROP INDEX " + WATERMARK_INDEX);
ddl.add("DROP TABLE " + tableName);
}
OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
databaseAdminClient.updateDatabaseDdl(
instanceId, databaseId, Collections.singletonList(metadataDropStmt), null);
databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl, null);
try {
// Initiate the request which returns an OperationFuture.
op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
Expand Down

0 comments on commit d7b4ba4

Please sign in to comment.