Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize Spanner changestream metadata table #32213

Merged
merged 4 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -78,6 +79,12 @@ public class PartitionMetadataAdminDao {
*/
public static final String COLUMN_FINISHED_AT = "FinishedAt";

/** Metadata table index for queries over the watermark column. */
public static final String WATERMARK_INDEX = "WatermarkIndex";

/** Metadata table index for queries over the created at / start timestamp columns. */
public static final String CREATED_AT_START_TIMESTAMP_INDEX = "CreatedAtStartTimestampIndex";

private static final int TIMEOUT_MINUTES = 10;
private static final int TTL_AFTER_PARTITION_FINISHED_DAYS = 1;

Expand Down Expand Up @@ -117,10 +124,10 @@ public class PartitionMetadataAdminDao {
* PartitionMetadataAdminDao#TTL_AFTER_PARTITION_FINISHED_DAYS} days.
*/
public void createPartitionMetadataTable() {
String metadataCreateStmt = "";
List<String> ddl = new ArrayList<>();
if (this.isPostgres()) {
// Literals need be added around literals to preserve casing.
metadataCreateStmt =
ddl.add(
"CREATE TABLE \""
+ tableName
+ "\"(\""
Expand All @@ -146,15 +153,37 @@ public void createPartitionMetadataTable() {
+ "\" SPANNER.COMMIT_TIMESTAMP,\""
+ COLUMN_FINISHED_AT
+ "\" SPANNER.COMMIT_TIMESTAMP,"
+ " PRIMARY KEY (\"PartitionToken\")"
+ " PRIMARY KEY (\""
+ COLUMN_PARTITION_TOKEN
+ "\")"
+ ")"
+ " TTL INTERVAL '"
+ TTL_AFTER_PARTITION_FINISHED_DAYS
+ " days' ON \""
+ COLUMN_FINISHED_AT
+ "\"";
+ "\"");
ddl.add(
"CREATE INDEX \""
+ WATERMARK_INDEX
+ "\" on \""
+ tableName
+ "\" (\""
+ COLUMN_WATERMARK
+ "\") INCLUDE (\""
+ COLUMN_STATE
+ "\")");
ddl.add(
"CREATE INDEX \""
+ CREATED_AT_START_TIMESTAMP_INDEX
+ "\" ON \""
+ tableName
+ "\" (\""
+ COLUMN_CREATED_AT
+ "\",\""
+ COLUMN_START_TIMESTAMP
+ "\")");
} else {
metadataCreateStmt =
ddl.add(
"CREATE TABLE "
+ tableName
+ " ("
Expand All @@ -180,16 +209,37 @@ public void createPartitionMetadataTable() {
+ " TIMESTAMP OPTIONS (allow_commit_timestamp=true),"
+ COLUMN_FINISHED_AT
+ " TIMESTAMP OPTIONS (allow_commit_timestamp=true),"
+ ") PRIMARY KEY (PartitionToken),"
+ ") PRIMARY KEY ("
+ COLUMN_PARTITION_TOKEN
+ "),"
+ " ROW DELETION POLICY (OLDER_THAN("
+ COLUMN_FINISHED_AT
+ ", INTERVAL "
+ TTL_AFTER_PARTITION_FINISHED_DAYS
+ " DAY))";
+ " DAY))");
ddl.add(
"CREATE INDEX "
+ WATERMARK_INDEX
+ " on "
+ tableName
+ " ("
+ COLUMN_WATERMARK
+ ") STORING ("
+ COLUMN_STATE
+ ")");
ddl.add(
"CREATE INDEX "
+ CREATED_AT_START_TIMESTAMP_INDEX
+ " ON "
+ tableName
+ " ("
+ COLUMN_CREATED_AT
+ ","
+ COLUMN_START_TIMESTAMP
+ ")");
}
OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
databaseAdminClient.updateDatabaseDdl(
instanceId, databaseId, Collections.singletonList(metadataCreateStmt), null);
databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl, null);
try {
// Initiate the request which returns an OperationFuture.
op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
Expand All @@ -212,15 +262,18 @@ public void createPartitionMetadataTable() {
* PartitionMetadataAdminDao#TIMEOUT_MINUTES} minutes.
*/
public void deletePartitionMetadataTable() {
String metadataDropStmt;
List<String> ddl = new ArrayList<>();
if (this.isPostgres()) {
metadataDropStmt = "DROP TABLE \"" + tableName + "\"";
ddl.add("DROP INDEX \"" + CREATED_AT_START_TIMESTAMP_INDEX + "\"");
ddl.add("DROP INDEX \"" + WATERMARK_INDEX + "\"");
ddl.add("DROP TABLE \"" + tableName + "\"");
} else {
metadataDropStmt = "DROP TABLE " + tableName;
ddl.add("DROP INDEX " + CREATED_AT_START_TIMESTAMP_INDEX);
ddl.add("DROP INDEX " + WATERMARK_INDEX);
ddl.add("DROP TABLE " + tableName);
}
OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
databaseAdminClient.updateDatabaseDdl(
instanceId, databaseId, Collections.singletonList(metadataDropStmt), null);
databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl, null);
try {
// Initiate the request which returns an OperationFuture.
op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.cloud.spanner.SpannerException;
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.Before;
Expand Down Expand Up @@ -86,8 +87,11 @@ public void testCreatePartitionMetadataTable() throws Exception {
partitionMetadataAdminDao.createPartitionMetadataTable();
verify(databaseAdminClient, times(1))
.updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
assertEquals(1, ((Collection<?>) statements.getValue()).size());
assertTrue(statements.getValue().iterator().next().contains("CREATE TABLE"));
assertEquals(3, ((Collection<?>) statements.getValue()).size());
Iterator<String> it = statements.getValue().iterator();
assertTrue(it.next().contains("CREATE TABLE"));
assertTrue(it.next().contains("CREATE INDEX"));
assertTrue(it.next().contains("CREATE INDEX"));
}

@Test
Expand All @@ -96,8 +100,11 @@ public void testCreatePartitionMetadataTablePostgres() throws Exception {
partitionMetadataAdminDaoPostgres.createPartitionMetadataTable();
verify(databaseAdminClient, times(1))
.updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
assertEquals(1, ((Collection<?>) statements.getValue()).size());
assertTrue(statements.getValue().iterator().next().contains("CREATE TABLE \""));
assertEquals(3, ((Collection<?>) statements.getValue()).size());
Iterator<String> it = statements.getValue().iterator();
assertTrue(it.next().contains("CREATE TABLE \""));
assertTrue(it.next().contains("CREATE INDEX \""));
assertTrue(it.next().contains("CREATE INDEX \""));
}

@Test
Expand Down Expand Up @@ -129,8 +136,11 @@ public void testDeletePartitionMetadataTable() throws Exception {
partitionMetadataAdminDao.deletePartitionMetadataTable();
verify(databaseAdminClient, times(1))
.updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
assertEquals(1, ((Collection<?>) statements.getValue()).size());
assertTrue(statements.getValue().iterator().next().contains("DROP TABLE"));
assertEquals(3, ((Collection<?>) statements.getValue()).size());
Iterator<String> it = statements.getValue().iterator();
assertTrue(it.next().contains("DROP INDEX"));
assertTrue(it.next().contains("DROP INDEX"));
assertTrue(it.next().contains("DROP TABLE"));
}

@Test
Expand All @@ -139,8 +149,11 @@ public void testDeletePartitionMetadataTablePostgres() throws Exception {
partitionMetadataAdminDaoPostgres.deletePartitionMetadataTable();
verify(databaseAdminClient, times(1))
.updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull());
assertEquals(1, ((Collection<?>) statements.getValue()).size());
assertTrue(statements.getValue().iterator().next().contains("DROP TABLE \""));
assertEquals(3, ((Collection<?>) statements.getValue()).size());
Iterator<String> it = statements.getValue().iterator();
assertTrue(it.next().contains("DROP INDEX \""));
assertTrue(it.next().contains("DROP INDEX \""));
assertTrue(it.next().contains("DROP TABLE \""));
}

@Test
Expand Down
Loading