From 978ed97b4dd5593a5256d2863aee435431848d39 Mon Sep 17 00:00:00 2001 From: Andrew Nicoll Date: Mon, 26 Aug 2024 15:18:01 +0930 Subject: [PATCH] feat: index creation --- partitionstorage/spanner.go | 21 ++++++++++++++++- partitionstorage/spanner_test.go | 39 +++++++++++++++++++++++++++++++- 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/partitionstorage/spanner.go b/partitionstorage/spanner.go index 36c4330..6f1ca16 100644 --- a/partitionstorage/spanner.go +++ b/partitionstorage/spanner.go @@ -71,6 +71,9 @@ const ( columnScheduledAt = "ScheduledAt" columnRunningAt = "RunningAt" columnFinishedAt = "FinishedAt" + + IndexWatermarkIndex = "WatermarkIndex" + IndexCreatedAtStartTimestampIndex = "CreatedAtStartTimestampIndex" ) func (s *SpannerPartitionStorage) CreateTableIfNotExists(ctx context.Context) error { @@ -107,9 +110,25 @@ func (s *SpannerPartitionStorage) CreateTableIfNotExists(ctx context.Context) er columnFinishedAt, ) + // -- For GoogleSQL dialect + // CREATE INDEX WatermarkIndex ON (Watermark) STORING (State); + // CREATE INDEX CreatedAtStartTimestampIndex ON (CreatedAt, StartTimestamp); + watermarkIndexStmt := fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %[1]s ON %[2]s(%[3]s) STORING (%[4]s)`, + IndexWatermarkIndex, + s.tableName, + columnWatermark, + columnState, + ) + timestampIndexStmt := fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %[1]s ON %[2]s(%[3]s, %[4]s)`, + IndexCreatedAtStartTimestampIndex, + s.tableName, + columnCreatedAt, + columnStartTimestamp, + ) + req := &databasepb.UpdateDatabaseDdlRequest{ Database: s.client.DatabaseName(), - Statements: []string{stmt}, + Statements: []string{stmt, watermarkIndexStmt, timestampIndexStmt}, } op, err := databaseAdminClient.UpdateDatabaseDdl(ctx, req) if err != nil { diff --git a/partitionstorage/spanner_test.go b/partitionstorage/spanner_test.go index ec2bf5d..24cb598 100644 --- a/partitionstorage/spanner_test.go +++ b/partitionstorage/spanner_test.go @@ -2,6 +2,7 @@ package partitionstorage import ( "context" + "errors" "fmt" "log" "os" @@ -157,6 +158,15 @@ func TestSpannerPartitionStorage_CreateTableIfNotExists(t *testing.T) { if !existsTable { t.Errorf("SpannerPartitionStorage.existsTable() = %v, want %v", existsTable, false) } + + indexNames, err := getTableIndexNames(ctx, client, storage.tableName) + if err != nil { + t.Error(err) + return + } + if len(indexNames) != 3 { + t.Errorf("SpannerPartitionStorage.getTableIndexNames() = %v, want %v", len(indexNames), 3) + } } func existsTable(ctx context.Context, client *spanner.Client, tableName string) (bool, error) { @@ -178,6 +188,34 @@ func existsTable(ctx context.Context, client *spanner.Client, tableName string) return true, nil } +func getTableIndexNames(ctx context.Context, client *spanner.Client, tableName string) ([]string, error) { + iter := client.Single().Query(ctx, spanner.Statement{ + SQL: "SELECT index_name FROM information_schema.indexes where table_name = @tableName", + Params: map[string]interface{}{ + "tableName": tableName, + }, + }) + defer iter.Stop() + + expectedNames := make([]string, 3) + for { + name := "" + r, err := iter.Next() + if err != nil { + if errors.Is(err, iterator.Done) { + break + } + return nil, err + } + if err := r.ColumnByName("index_name", &name); err != nil { + return nil, err + } + expectedNames = append(expectedNames, name) + } + + return expectedNames, nil +} + func setupSpannerPartitionStorage(t *testing.T, ctx context.Context) *SpannerPartitionStorage { t.Helper() @@ -582,5 +620,4 @@ func TestSpannerPartitionStorage_Update(t *testing.T) { t.Errorf("UpdateWatermark(ctx, %+v, %q): got = %+v, want %+v", partitions[0], timestamp, got, want) } }) - }