From d05a0f92fd5d8684200cc882e98c295c3cac0c58 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 8 May 2024 16:36:52 -0400 Subject: [PATCH] Switch spanner datastore to use the built-in stats table for estimating rel count While this will be far less accurate of an estimate, it removes the need to write to a stats table on every write and delete, which should help with performance --- internal/datastore/spanner/readwrite.go | 35 ++--- internal/datastore/spanner/revisions.go | 6 +- internal/datastore/spanner/schema.go | 4 - internal/datastore/spanner/spanner.go | 34 +++-- internal/datastore/spanner/spanner_test.go | 109 ++++++++++++++- internal/datastore/spanner/stats.go | 151 +++++++++++++-------- internal/datastore/spanner/watch.go | 4 +- internal/testserver/datastore/spanner.go | 2 +- pkg/cmd/datastore/datastore.go | 9 +- pkg/cmd/datastore/zz_generated.options.go | 9 ++ pkg/datastore/test/datastore.go | 10 +- 11 files changed, 264 insertions(+), 109 deletions(-) diff --git a/internal/datastore/spanner/readwrite.go b/internal/datastore/spanner/readwrite.go index 327cd419b8..c940ebc9ca 100644 --- a/internal/datastore/spanner/readwrite.go +++ b/internal/datastore/spanner/readwrite.go @@ -20,8 +20,7 @@ import ( type spannerReadWriteTXN struct { spannerReader - spannerRWT *spanner.ReadWriteTransaction - disableStats bool + spannerRWT *spanner.ReadWriteTransaction } const inLimit = 10_000 // https://cloud.google.com/spanner/quotas#query-limits @@ -40,12 +39,6 @@ func (rwt spannerReadWriteTXN) WriteRelationships(ctx context.Context, mutations } } - if !rwt.disableStats { - if err := updateCounter(ctx, rwt.spannerRWT, rowCountChange); err != nil { - return fmt.Errorf(errUnableToWriteRelationships, err) - } - } - return nil } @@ -74,7 +67,7 @@ func spannerMutation( } func (rwt spannerReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) { - limitReached, err := deleteWithFilter(ctx, 
rwt.spannerRWT, filter, rwt.disableStats, opts...) + limitReached, err := deleteWithFilter(ctx, rwt.spannerRWT, filter, opts...) if err != nil { return false, fmt.Errorf(errUnableToDeleteRelationships, err) } @@ -82,7 +75,7 @@ func (rwt spannerReadWriteTXN) DeleteRelationships(ctx context.Context, filter * return limitReached, nil } -func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool, opts ...options.DeleteOptionsOption) (bool, error) { +func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, opts ...options.DeleteOptionsOption) (bool, error) { delOpts := options.NewDeleteOptionsWithOptionsAndDefaults(opts...) var delLimit uint64 if delOpts.DeleteLimit != nil && *delOpts.DeleteLimit > 0 { @@ -94,13 +87,13 @@ func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, fi var numDeleted int64 if delLimit > 0 { - nu, err := deleteWithFilterAndLimit(ctx, rwt, filter, disableStats, delLimit) + nu, err := deleteWithFilterAndLimit(ctx, rwt, filter, delLimit) if err != nil { return false, err } numDeleted = nu } else { - nu, err := deleteWithFilterAndNoLimit(ctx, rwt, filter, disableStats) + nu, err := deleteWithFilterAndNoLimit(ctx, rwt, filter) if err != nil { return false, err } @@ -108,12 +101,6 @@ func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, fi numDeleted = nu } - if !disableStats { - if err := updateCounter(ctx, rwt, -1*numDeleted); err != nil { - return false, err - } - } - if delLimit > 0 && uint64(numDeleted) == delLimit { return true, nil } @@ -121,7 +108,7 @@ func deleteWithFilter(ctx context.Context, rwt *spanner.ReadWriteTransaction, fi return false, nil } -func deleteWithFilterAndLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool, delLimit uint64) (int64, error) { +func deleteWithFilterAndLimit(ctx context.Context, rwt 
*spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, delLimit uint64) (int64, error) { query := queryTuplesForDelete filteredQuery, err := applyFilterToQuery(query, filter) if err != nil { @@ -172,7 +159,7 @@ func deleteWithFilterAndLimit(ctx context.Context, rwt *spanner.ReadWriteTransac return int64(len(mutations)), nil } -func deleteWithFilterAndNoLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter, disableStats bool) (int64, error) { +func deleteWithFilterAndNoLimit(ctx context.Context, rwt *spanner.ReadWriteTransaction, filter *v1.RelationshipFilter) (int64, error) { query := sql.Delete(tableRelationship) filteredQuery, err := applyFilterToQuery(query, filter) if err != nil { @@ -278,7 +265,7 @@ func (rwt spannerReadWriteTXN) WriteNamespaces(_ context.Context, newConfigs ... func (rwt spannerReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...string) error { for _, nsName := range nsNames { relFilter := &v1.RelationshipFilter{ResourceType: nsName} - if _, err := deleteWithFilter(ctx, rwt.spannerRWT, relFilter, rwt.disableStats); err != nil { + if _, err := deleteWithFilter(ctx, rwt.spannerRWT, relFilter); err != nil { return fmt.Errorf(errUnableToDeleteConfig, err) } @@ -313,12 +300,6 @@ func (rwt spannerReadWriteTXN) BulkLoad(ctx context.Context, iter datastore.Bulk return 0, fmt.Errorf(errUnableToBulkLoadRelationships, err) } - if !rwt.disableStats { - if err := updateCounter(ctx, rwt.spannerRWT, int64(numLoaded)); err != nil { - return 0, fmt.Errorf(errUnableToBulkLoadRelationships, err) - } - } - return numLoaded, nil } diff --git a/internal/datastore/spanner/revisions.go b/internal/datastore/spanner/revisions.go index 965bb553af..eadd7c809a 100644 --- a/internal/datastore/spanner/revisions.go +++ b/internal/datastore/spanner/revisions.go @@ -13,7 +13,7 @@ import ( var ParseRevisionString = revisions.RevisionParser(revisions.Timestamp) -func (sd spannerDatastore) headRevisionInternal(ctx 
context.Context) (datastore.Revision, error) { +func (sd *spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) { now, err := sd.now(ctx) if err != nil { return datastore.NoRevision, fmt.Errorf(errRevision, err) @@ -22,11 +22,11 @@ func (sd spannerDatastore) headRevisionInternal(ctx context.Context) (datastore. return revisions.NewForTime(now), nil } -func (sd spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) { +func (sd *spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) { return sd.headRevisionInternal(ctx) } -func (sd spannerDatastore) now(ctx context.Context) (time.Time, error) { +func (sd *spannerDatastore) now(ctx context.Context) (time.Time, error) { var timestamp time.Time if err := sd.client.Single().Query(ctx, spanner.NewStatement("SELECT CURRENT_TIMESTAMP()")).Do(func(r *spanner.Row) error { return r.Columns(×tamp) diff --git a/internal/datastore/spanner/schema.go b/internal/datastore/spanner/schema.go index f796953bd7..81dc8b57bb 100644 --- a/internal/datastore/spanner/schema.go +++ b/internal/datastore/spanner/schema.go @@ -24,10 +24,6 @@ const ( tableMetadata = "metadata" colUniqueID = "unique_id" - - tableCounters = "relationship_estimate_counters" - colID = "id" - colCount = "count" ) var allRelationshipCols = []string{ diff --git a/internal/datastore/spanner/spanner.go b/internal/datastore/spanner/spanner.go index 9efafde43a..0cb06b0dc3 100644 --- a/internal/datastore/spanner/spanner.go +++ b/internal/datastore/spanner/spanner.go @@ -6,6 +6,7 @@ import ( "os" "regexp" "strconv" + "sync" "time" "cloud.google.com/go/spanner" @@ -61,6 +62,8 @@ const ( defaultChangeStreamRetention = 24 * time.Hour ) +const tableSizesStatsTable = "spanner_sys.table_sizes_stats_1hour" + var ( sql = sq.StatementBuilder.PlaceholderFormat(sq.AtP) tracer = otel.Tracer("spicedb/internal/datastore/spanner") @@ -78,6 +81,11 @@ type spannerDatastore struct { client *spanner.Client config 
spannerOptions database string + + cachedEstimatedBytesPerRelationshipLock sync.RWMutex + cachedEstimatedBytesPerRelationship uint64 + + tableSizesStatsTable string } // NewSpannerDatastore returns a datastore backed by cloud spanner @@ -143,7 +151,7 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) ( maxRevisionStaleness := time.Duration(float64(config.revisionQuantization.Nanoseconds())* config.maxRevisionStalenessPercent) * time.Nanosecond - ds := spannerDatastore{ + ds := &spannerDatastore{ RemoteClockRevisions: revisions.NewRemoteClockRevisions( defaultChangeStreamRetention, maxRevisionStaleness, @@ -153,11 +161,14 @@ func NewSpannerDatastore(ctx context.Context, database string, opts ...Option) ( CommonDecoder: revisions.CommonDecoder{ Kind: revisions.Timestamp, }, - client: client, - config: config, - database: database, - watchBufferWriteTimeout: config.watchBufferWriteTimeout, - watchBufferLength: config.watchBufferLength, + client: client, + config: config, + database: database, + watchBufferWriteTimeout: config.watchBufferWriteTimeout, + watchBufferLength: config.watchBufferLength, + cachedEstimatedBytesPerRelationship: 0, + cachedEstimatedBytesPerRelationshipLock: sync.RWMutex{}, + tableSizesStatsTable: tableSizesStatsTable, } ds.RemoteClockRevisions.SetNowFunc(ds.headRevisionInternal) @@ -195,7 +206,7 @@ func (t *traceableRTX) Query(ctx context.Context, statement spanner.Statement) * return t.delegate.Query(ctx, statement) } -func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader { +func (sd *spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datastore.Reader { r := revisionRaw.(revisions.TimestampRevision) txSource := func() readTX { @@ -205,7 +216,7 @@ func (sd spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datast return spannerReader{executor, txSource} } -func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts 
...options.RWTOptionsOption) (datastore.Revision, error) { +func (sd *spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) { config := options.NewRWTOptionsWithOptions(opts...) ctx, span := tracer.Start(ctx, "ReadWriteTx") @@ -221,7 +232,6 @@ func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserF rwt := spannerReadWriteTXN{ spannerReader{executor, txSource}, spannerRWT, - sd.config.disableStats, } err := func() error { innerCtx, innerSpan := tracer.Start(ctx, "TxUserFunc") @@ -248,7 +258,7 @@ func (sd spannerDatastore) ReadWriteTx(ctx context.Context, fn datastore.TxUserF return revisions.NewForTime(ts), nil } -func (sd spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) { +func (sd *spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) { headMigration, err := migrations.SpannerMigrations.HeadRevision() if err != nil { return datastore.ReadyState{}, fmt.Errorf("invalid head migration found for spanner: %w", err) @@ -275,11 +285,11 @@ func (sd spannerDatastore) ReadyState(ctx context.Context) (datastore.ReadyState }, nil } -func (sd spannerDatastore) Features(_ context.Context) (*datastore.Features, error) { +func (sd *spannerDatastore) Features(_ context.Context) (*datastore.Features, error) { return &datastore.Features{Watch: datastore.Feature{Enabled: true}}, nil } -func (sd spannerDatastore) Close() error { +func (sd *spannerDatastore) Close() error { sd.client.Close() return nil } diff --git a/internal/datastore/spanner/spanner_test.go b/internal/datastore/spanner/spanner_test.go index e6b6008d87..1fbc6c49f2 100644 --- a/internal/datastore/spanner/spanner_test.go +++ b/internal/datastore/spanner/spanner_test.go @@ -8,6 +8,9 @@ import ( "testing" "time" + "cloud.google.com/go/spanner" + admin "cloud.google.com/go/spanner/admin/database/apiv1" + 
"cloud.google.com/go/spanner/admin/database/apiv1/databasepb" "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -15,10 +18,12 @@ import ( testdatastore "github.com/authzed/spicedb/internal/testserver/datastore" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/test" + corev1 "github.com/authzed/spicedb/pkg/proto/core/v1" + "github.com/authzed/spicedb/pkg/tuple" ) // Implement TestableDatastore interface -func (sd spannerDatastore) ExampleRetryableError() error { +func (sd *spannerDatastore) ExampleRetryableError() error { return status.New(codes.Aborted, "retryable").Err() } @@ -38,5 +43,105 @@ func TestSpannerDatastore(t *testing.T) { return ds }) return ds, nil - }), test.WithCategories(test.GCCategory, test.WatchCategory)) + }), test.WithCategories(test.GCCategory, test.WatchCategory, test.StatsCategory)) + + t.Run("TestFakeStats", createDatastoreTest( + b, + FakeStatsTest, + )) +} + +type datastoreTestFunc func(t *testing.T, ds datastore.Datastore) + +func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestFunc, options ...Option) func(*testing.T) { + return func(t *testing.T) { + ctx := context.Background() + ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore { + ds, err := NewSpannerDatastore(ctx, uri, options...) 
+ require.NoError(t, err) + return ds + }) + defer ds.Close() + + tf(t, ds) + } +} + +const createFakeStatsTable = ` +CREATE TABLE fake_stats_table ( + interval_end TIMESTAMP, + table_name STRING(MAX), + used_bytes INT64, +) PRIMARY KEY (table_name, interval_end) +` + +func FakeStatsTest(t *testing.T, ds datastore.Datastore) { + spannerDS := ds.(*spannerDatastore) + spannerDS.tableSizesStatsTable = "fake_stats_table" + + spannerClient := spannerDS.client + ctx := context.Background() + + adminClient, err := admin.NewDatabaseAdminClient(ctx) + require.NoError(t, err) + + // Manually add the stats table to simulate the table that the emulator doesn't create. + updateOp, err := adminClient.UpdateDatabaseDdl(ctx, &databasepb.UpdateDatabaseDdlRequest{ + Database: spannerClient.DatabaseName(), + Statements: []string{ + createFakeStatsTable, + }, + }) + require.NoError(t, err) + + err = updateOp.Wait(ctx) + require.NoError(t, err) + + // Call stats with no stats rows and no relationship rows. + stats, err := ds.Statistics(ctx) + require.NoError(t, err) + require.Equal(t, uint64(0), stats.EstimatedRelationshipCount) + + // Add some relationships. + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, tx datastore.ReadWriteTransaction) error { + return tx.WriteRelationships(ctx, []*corev1.RelationTupleUpdate{ + tuple.Create(tuple.MustParse("document:foo#viewer@user:tom")), + tuple.Create(tuple.MustParse("document:foo#viewer@user:sarah")), + tuple.Create(tuple.MustParse("document:foo#viewer@user:fred")), + }) + }) + require.NoError(t, err) + + // Call stats with no stats rows and some relationship rows. + stats, err = ds.Statistics(ctx) + require.NoError(t, err) + require.Equal(t, uint64(0), stats.EstimatedRelationshipCount) + + // Add some stats row with a byte count. 
+ _, err = spannerClient.Apply(ctx, []*spanner.Mutation{ + spanner.Insert("fake_stats_table", []string{"interval_end", "table_name", "used_bytes"}, []interface{}{ + time.Now().UTC().Add(-100 * time.Second), tableRelationship, 100, + }), + }) + require.NoError(t, err) + + // Call stats with a stats row and some relationship rows and ensure we get an estimate. + stats, err = ds.Statistics(ctx) + require.NoError(t, err) + require.Equal(t, uint64(3), stats.EstimatedRelationshipCount) + + // Add some more relationships. + _, err = ds.ReadWriteTx(ctx, func(ctx context.Context, tx datastore.ReadWriteTransaction) error { + return tx.WriteRelationships(ctx, []*corev1.RelationTupleUpdate{ + tuple.Create(tuple.MustParse("document:foo#viewer@user:tommy1236512365123651236512365123612365123655")), + tuple.Create(tuple.MustParse("document:foo#viewer@user:sara1236512365123651236512365123651236512365")), + tuple.Create(tuple.MustParse("document:foo#viewer@user:freddy1236512365123651236512365123651236512365")), + }) + }) + require.NoError(t, err) + + // Call stats again and ensure it uses the cached relationship size value, even if we'd added more relationships.
+ stats, err = ds.Statistics(ctx) + require.NoError(t, err) + require.Equal(t, uint64(3), stats.EstimatedRelationshipCount) } diff --git a/internal/datastore/spanner/stats.go b/internal/datastore/spanner/stats.go index 0df2d25b79..1107c35702 100644 --- a/internal/datastore/spanner/stats.go +++ b/internal/datastore/spanner/stats.go @@ -3,24 +3,29 @@ package spanner import ( "context" "fmt" - "math/rand" - "time" + "strings" "cloud.google.com/go/spanner" "go.opentelemetry.io/otel/trace" - "google.golang.org/grpc/codes" - log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/datastore" + core "github.com/authzed/spicedb/pkg/proto/core/v1" ) -var ( - queryRelationshipEstimate = fmt.Sprintf("SELECT SUM(%s) FROM %s", colCount, tableCounters) +var querySomeRandomRelationships = fmt.Sprintf(`SELECT %s FROM %s LIMIT 10`, + strings.Join([]string{ + colNamespace, + colObjectID, + colRelation, + colUsersetNamespace, + colUsersetObjectID, + colUsersetRelation, + }, ", "), + tableRelationship) - rng = rand.NewSource(time.Now().UnixNano()) -) +const defaultEstimatedBytesPerRelationships = 20 // determined by looking at some sample clusters -func (sd spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, error) { +func (sd *spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, error) { var uniqueID string if err := sd.client.Single().Read( context.Background(), @@ -46,57 +51,97 @@ func (sd spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, err return datastore.Stats{}, fmt.Errorf("unable to read namespaces: %w", err) } - var estimate spanner.NullInt64 - if err := sd.client.Single().Query(ctx, spanner.Statement{SQL: queryRelationshipEstimate}).Do(func(r *spanner.Row) error { - return r.Columns(&estimate) - }); err != nil { - return datastore.Stats{}, fmt.Errorf("unable to read row counts: %w", err) - } - - return datastore.Stats{ - UniqueID: uniqueID, - ObjectTypeStatistics: 
datastore.ComputeObjectTypeStats(allNamespaces), - EstimatedRelationshipCount: uint64(estimate.Int64), - }, nil -} - -func updateCounter(ctx context.Context, rwt *spanner.ReadWriteTransaction, change int64) error { - newValue := change - - counterID := make([]byte, 2) - // nolint:gosec - // G404 use of non cryptographically secure random number generator is not concern here, - // as this is only used to randomly distributed the counters across multiple rows and reduce write contention - _, err := rand.New(rng).Read(counterID) - if err != nil { - return fmt.Errorf("unable to select random counter: %w", err) - } + // If there is not yet a cached estimated bytes per relationship, read a few relationships and then + // compute the average bytes per relationship. + sd.cachedEstimatedBytesPerRelationshipLock.RLock() + estimatedBytesPerRelationship := sd.cachedEstimatedBytesPerRelationship + sd.cachedEstimatedBytesPerRelationshipLock.RUnlock() + + if estimatedBytesPerRelationship == 0 { + riter := sd.client.Single().Query(ctx, spanner.Statement{SQL: querySomeRandomRelationships}) + defer riter.Stop() + + totalByteCount := 0 + totalRelationships := 0 + + if err := riter.Do(func(row *spanner.Row) error { + nextTuple := &core.RelationTuple{ + ResourceAndRelation: &core.ObjectAndRelation{}, + Subject: &core.ObjectAndRelation{}, + } + err := row.Columns( + &nextTuple.ResourceAndRelation.Namespace, + &nextTuple.ResourceAndRelation.ObjectId, + &nextTuple.ResourceAndRelation.Relation, + &nextTuple.Subject.Namespace, + &nextTuple.Subject.ObjectId, + &nextTuple.Subject.Relation, + ) + if err != nil { + return err + } + + relationshipByteCount := len(nextTuple.ResourceAndRelation.Namespace) + len(nextTuple.ResourceAndRelation.ObjectId) + + len(nextTuple.ResourceAndRelation.Relation) + len(nextTuple.Subject.Namespace) + len(nextTuple.Subject.ObjectId) + + len(nextTuple.Subject.Relation) + + totalRelationships++ + totalByteCount += relationshipByteCount + + return nil + }); err != nil 
{ + return datastore.Stats{}, err + } - counterRow, err := rwt.ReadRow(ctx, tableCounters, spanner.Key{counterID}, []string{colCount}) - if err != nil { - if spanner.ErrCode(err) != codes.NotFound { - return fmt.Errorf("unable to read counter value: %w", err) + if totalRelationships == 0 { + return datastore.Stats{ + UniqueID: uniqueID, + ObjectTypeStatistics: datastore.ComputeObjectTypeStats(allNamespaces), + EstimatedRelationshipCount: 0, + }, nil } - // In this branch we leave newValue alone because the counter doesn't exist - } else { - var currentValue int64 - if err := counterRow.Columns(¤tValue); err != nil { - return fmt.Errorf("unable to decode counter value: %w", err) + + estimatedBytesPerRelationship = uint64(totalByteCount / totalRelationships) + if estimatedBytesPerRelationship > 0 { + sd.cachedEstimatedBytesPerRelationshipLock.Lock() + sd.cachedEstimatedBytesPerRelationship = estimatedBytesPerRelationship + sd.cachedEstimatedBytesPerRelationshipLock.Unlock() } - newValue += currentValue } - log.Ctx(ctx).Trace(). - Bytes("counterID", counterID). - Int64("newValue", newValue). - Int64("change", change). 
- Msg("updating counter") + if estimatedBytesPerRelationship == 0 { + estimatedBytesPerRelationship = defaultEstimatedBytesPerRelationships // Use a default + } - if err := rwt.BufferWrite([]*spanner.Mutation{ - spanner.InsertOrUpdate(tableCounters, []string{colID, colCount}, []any{counterID, newValue}), + // Reference: https://cloud.google.com/spanner/docs/introspection/table-sizes-statistics + queryRelationshipByteEstimate := fmt.Sprintf(`SELECT used_bytes FROM %s WHERE + interval_end = ( + SELECT MAX(interval_end) + FROM %s + ) + AND table_name = '%s'`, sd.tableSizesStatsTable, sd.tableSizesStatsTable, tableRelationship) + + var byteEstimate spanner.NullInt64 + if err := sd.client.Single().Query(ctx, spanner.Statement{SQL: queryRelationshipByteEstimate}).Do(func(r *spanner.Row) error { + return r.Columns(&byteEstimate) }); err != nil { - return fmt.Errorf("unable to buffer update to counter: %w", err) + return datastore.Stats{}, fmt.Errorf("unable to read tuples byte count: %w", err) + } + + // If the byte estimate is NULL, try to fallback to just selecting the single row. This is necessary for certain + // versions of the emulator. 
+ if byteEstimate.IsNull() { + lookupSingleEstimate := fmt.Sprintf(`SELECT used_bytes FROM %s WHERE table_name = '%s'`, sd.tableSizesStatsTable, tableRelationship) + if err := sd.client.Single().Query(ctx, spanner.Statement{SQL: lookupSingleEstimate}).Do(func(r *spanner.Row) error { + return r.Columns(&byteEstimate) + }); err != nil { + return datastore.Stats{}, fmt.Errorf("unable to fallback read tuples byte count: %w", err) + } } - return nil + return datastore.Stats{ + UniqueID: uniqueID, + ObjectTypeStatistics: datastore.ComputeObjectTypeStats(allNamespaces), + EstimatedRelationshipCount: uint64(byteEstimate.Int64) / estimatedBytesPerRelationship, + }, nil } diff --git a/internal/datastore/spanner/watch.go b/internal/datastore/spanner/watch.go index 8a2c437da9..0cc707eb3d 100644 --- a/internal/datastore/spanner/watch.go +++ b/internal/datastore/spanner/watch.go @@ -51,7 +51,7 @@ func parseDatabaseName(db string) (project, instance, database string, err error return matches[1], matches[2], matches[3], nil } -func (sd spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, opts datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { +func (sd *spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, opts datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { watchBufferLength := opts.WatchBufferLength if watchBufferLength <= 0 { watchBufferLength = sd.watchBufferLength @@ -65,7 +65,7 @@ func (sd spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Re return updates, errs } -func (sd spannerDatastore) watch( +func (sd *spannerDatastore) watch( ctx context.Context, afterRevisionRaw datastore.Revision, opts datastore.WatchOptions, diff --git a/internal/testserver/datastore/spanner.go b/internal/testserver/datastore/spanner.go index ca020e1937..426cb5665e 100644 --- a/internal/testserver/datastore/spanner.go +++ b/internal/testserver/datastore/spanner.go @@ -34,7 
+34,7 @@ func RunSpannerForTesting(t testing.TB, bridgeNetworkName string, targetMigratio pool, err := dockertest.NewPool("") require.NoError(t, err) - name := fmt.Sprintf("postgres-%s", uuid.New().String()) + name := fmt.Sprintf("spanner-%s", uuid.New().String()) resource, err := pool.RunWithOptions(&dockertest.RunOptions{ Name: name, Repository: "gcr.io/cloud-spanner-emulator/emulator", diff --git a/pkg/cmd/datastore/datastore.go b/pkg/cmd/datastore/datastore.go index c35590d135..00f01eb575 100644 --- a/pkg/cmd/datastore/datastore.go +++ b/pkg/cmd/datastore/datastore.go @@ -133,10 +133,11 @@ type Config struct { GCMaxOperationTime time.Duration `debugmap:"visible"` // Spanner - SpannerCredentialsFile string `debugmap:"visible"` - SpannerEmulatorHost string `debugmap:"visible"` - SpannerMinSessions uint64 `debugmap:"visible"` - SpannerMaxSessions uint64 `debugmap:"visible"` + SpannerCredentialsFile string `debugmap:"visible"` + SpannerEmulatorHost string `debugmap:"visible"` + SpannerMinSessions uint64 `debugmap:"visible"` + SpannerMaxSessions uint64 `debugmap:"visible"` + SpannerEstimatedBytesPerRelationship uint64 `debugmap:"visible"` // MySQL TablePrefix string `debugmap:"visible"` diff --git a/pkg/cmd/datastore/zz_generated.options.go b/pkg/cmd/datastore/zz_generated.options.go index e3457544c4..34644bca28 100644 --- a/pkg/cmd/datastore/zz_generated.options.go +++ b/pkg/cmd/datastore/zz_generated.options.go @@ -63,6 +63,7 @@ func (c *Config) ToOption() ConfigOption { to.SpannerEmulatorHost = c.SpannerEmulatorHost to.SpannerMinSessions = c.SpannerMinSessions to.SpannerMaxSessions = c.SpannerMaxSessions + to.SpannerEstimatedBytesPerRelationship = c.SpannerEstimatedBytesPerRelationship to.TablePrefix = c.TablePrefix to.WatchBufferLength = c.WatchBufferLength to.WatchBufferWriteTimeout = c.WatchBufferWriteTimeout @@ -105,6 +106,7 @@ func (c Config) DebugMap() map[string]any { debugMap["SpannerEmulatorHost"] = helpers.DebugValue(c.SpannerEmulatorHost, false) 
debugMap["SpannerMinSessions"] = helpers.DebugValue(c.SpannerMinSessions, false) debugMap["SpannerMaxSessions"] = helpers.DebugValue(c.SpannerMaxSessions, false) + debugMap["SpannerEstimatedBytesPerRelationship"] = helpers.DebugValue(c.SpannerEstimatedBytesPerRelationship, false) debugMap["TablePrefix"] = helpers.DebugValue(c.TablePrefix, false) debugMap["WatchBufferLength"] = helpers.DebugValue(c.WatchBufferLength, false) debugMap["WatchBufferWriteTimeout"] = helpers.DebugValue(c.WatchBufferWriteTimeout, false) @@ -366,6 +368,13 @@ func WithSpannerMaxSessions(spannerMaxSessions uint64) ConfigOption { } } +// WithSpannerEstimatedBytesPerRelationship returns an option that can set SpannerEstimatedBytesPerRelationship on a Config +func WithSpannerEstimatedBytesPerRelationship(spannerEstimatedBytesPerRelationship uint64) ConfigOption { + return func(c *Config) { + c.SpannerEstimatedBytesPerRelationship = spannerEstimatedBytesPerRelationship + } +} + // WithTablePrefix returns an option that can set TablePrefix on a Config func WithTablePrefix(tablePrefix string) ConfigOption { return func(c *Config) { diff --git a/pkg/datastore/test/datastore.go b/pkg/datastore/test/datastore.go index 9e8ec85b8e..13019d505b 100644 --- a/pkg/datastore/test/datastore.go +++ b/pkg/datastore/test/datastore.go @@ -46,6 +46,11 @@ func (c Categories) GC() bool { return ok } +func (c Categories) Stats() bool { + _, ok := c[StatsCategory] + return ok +} + func (c Categories) Watch() bool { _, ok := c[WatchCategory] return ok @@ -68,6 +73,7 @@ const ( WatchCategory = "Watch" WatchSchemaCategory = "WatchSchema" WatchCheckpointsCategory = "WatchCheckpoints" + StatsCategory = "Stats" ) func WithCategories(cats ...string) Categories { @@ -135,7 +141,9 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories) t.Run("TestBulkUploadAlreadyExistsError", func(t *testing.T) { BulkUploadAlreadyExistsErrorTest(t, tester) }) t.Run("TestBulkUploadAlreadyExistsSameCallError", func(t 
*testing.T) { BulkUploadAlreadyExistsSameCallErrorTest(t, tester) }) - t.Run("TestStats", func(t *testing.T) { StatsTest(t, tester) }) + if !except.Stats() { + t.Run("TestStats", func(t *testing.T) { StatsTest(t, tester) }) + } t.Run("TestRetries", func(t *testing.T) { RetryTest(t, tester) })