diff --git a/internal/datastore/context.go b/internal/datastore/context.go index ef452a8417..4f082d8982 100644 --- a/internal/datastore/context.go +++ b/internal/datastore/context.go @@ -39,6 +39,10 @@ func NewSeparatingContextDatastoreProxy(d datastore.Datastore) datastore.StrictR type ctxProxy struct{ delegate datastore.Datastore } +func (p *ctxProxy) UniqueID(ctx context.Context) (string, error) { + return p.delegate.UniqueID(SeparateContextWithTracing(ctx)) +} + func (p *ctxProxy) ReadWriteTx( ctx context.Context, f datastore.TxUserFunc, diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index ad3bff7047..ad5afd5434 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -6,6 +6,7 @@ import ( "fmt" "regexp" "strconv" + "sync/atomic" "time" "github.com/IBM/pgxpoolprometheus" @@ -345,6 +346,8 @@ type crdbDatastore struct { cancel context.CancelFunc filterMaximumIDCount uint16 supportsIntegrity bool + + uniqueID atomic.Pointer[string] } func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader { diff --git a/internal/datastore/crdb/stats.go b/internal/datastore/crdb/stats.go index c468747b1a..dd5e66746e 100644 --- a/internal/datastore/crdb/stats.go +++ b/internal/datastore/crdb/stats.go @@ -20,22 +20,33 @@ const ( colUniqueID = "unique_id" ) -var ( - queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata) - uniqueID string -) +var queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata) -func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) { - if len(uniqueID) == 0 { +func (cds *crdbDatastore) UniqueID(ctx context.Context) (string, error) { + if cds.uniqueID.Load() == nil { sql, args, err := queryReadUniqueID.ToSql() if err != nil { - return datastore.Stats{}, fmt.Errorf("unable to prepare unique ID sql: %w", err) + return "", fmt.Errorf("unable to prepare unique ID sql: %w", err) } + + var uniqueID string if err := cds.readPool.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error { return row.Scan(&uniqueID) }, sql, args...); err != nil { - return datastore.Stats{}, fmt.Errorf("unable to query unique ID: %w", err) + return "", fmt.Errorf("unable to query unique ID: %w", err) } + + cds.uniqueID.Store(&uniqueID) + return uniqueID, nil + } + + return *cds.uniqueID.Load(), nil +} + +func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) { + uniqueID, err := cds.UniqueID(ctx) + if err != nil { + return datastore.Stats{}, err } var nsDefs []datastore.RevisionedNamespace diff --git a/internal/datastore/memdb/memdb.go b/internal/datastore/memdb/memdb.go index 796b1b2bc2..0205799dbc 100644 --- a/internal/datastore/memdb/memdb.go +++ b/internal/datastore/memdb/memdb.go @@ -100,6 +100,10 @@ type snapshot struct { db *memdb.MemDB } +func (mdb *memdbDatastore) UniqueID(_ context.Context) (string, error) { + return mdb.uniqueID, nil +} + func (mdb *memdbDatastore) SnapshotReader(dr datastore.Revision) datastore.Reader { mdb.RLock() defer mdb.RUnlock() diff --git a/internal/datastore/mysql/datastore.go b/internal/datastore/mysql/datastore.go index 3ccd1b8327..12219c4df5 100644 --- a/internal/datastore/mysql/datastore.go +++ b/internal/datastore/mysql/datastore.go @@ -503,6 +503,8 @@ type Datastore struct { createTxn sq.InsertBuilder createBaseTxn string + uniqueID atomic.Pointer[string] + *QueryBuilder *revisions.CachedOptimizedRevisions revisions.CommonDecoder @@ -586,7 +588,7 @@ func (mds *Datastore) isSeeded(ctx context.Context) (bool, error) { return false, nil } - _, err = mds.getUniqueID(ctx) + _, err = mds.UniqueID(ctx) if err != nil { return false, nil } diff --git a/internal/datastore/mysql/stats.go b/internal/datastore/mysql/stats.go index b6841eb2d2..cb525ae734 100644 --- a/internal/datastore/mysql/stats.go +++ b/internal/datastore/mysql/stats.go @@ -30,7 +30,7 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) { } } - uniqueID, err := mds.getUniqueID(ctx) + uniqueID, err := mds.UniqueID(ctx) if err != nil { return datastore.Stats{}, err } @@ -88,16 +88,20 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) { }, nil } -func (mds *Datastore) getUniqueID(ctx context.Context) (string, error) { - sql, args, err := sb.Select(metadataUniqueIDColumn).From(mds.driver.Metadata()).ToSql() - if err != nil { - return "", fmt.Errorf("unable to generate query sql: %w", err) - } +func (mds *Datastore) UniqueID(ctx context.Context) (string, error) { + if mds.uniqueID.Load() == nil { + sql, args, err := sb.Select(metadataUniqueIDColumn).From(mds.driver.Metadata()).ToSql() + if err != nil { + return "", fmt.Errorf("unable to generate query sql: %w", err) + } - var uniqueID string - if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil { - return "", fmt.Errorf("unable to query unique ID: %w", err) + var uniqueID string + if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil { + return "", fmt.Errorf("unable to query unique ID: %w", err) + } + mds.uniqueID.Store(&uniqueID) + return uniqueID, nil } - return uniqueID, nil + return *mds.uniqueID.Load(), nil } diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go index 8f5d66b068..95e94b5286 100644 --- a/internal/datastore/postgres/postgres.go +++ b/internal/datastore/postgres/postgres.go @@ -416,6 +416,7 @@ type pgDatastore struct { includeQueryParametersInTraces bool credentialsProvider datastore.CredentialsProvider + uniqueID atomic.Pointer[string] gcGroup *errgroup.Group gcCtx context.Context @@ -691,8 +692,9 @@ func (pgd *pgDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, e if !state.IsReady { return state, nil } + // Ensure a datastore ID is present. This ensures the tables have not been truncated. - uniqueID, err := pgd.datastoreUniqueID(ctx) + uniqueID, err := pgd.UniqueID(ctx) if err != nil { return datastore.ReadyState{}, fmt.Errorf("database validation failed: %w; if you have previously run `TRUNCATE`, this database is no longer valid and must be remigrated. See: https://spicedb.dev/d/truncate-unsupported", err) } diff --git a/internal/datastore/postgres/stats.go b/internal/datastore/postgres/stats.go index 0e0bea63f6..2b0a1fa68e 100644 --- a/internal/datastore/postgres/stats.go +++ b/internal/datastore/postgres/stats.go @@ -28,16 +28,25 @@ var ( Where(sq.Eq{colRelname: tableTuple}) ) -func (pgd *pgDatastore) datastoreUniqueID(ctx context.Context) (string, error) { - idSQL, idArgs, err := queryUniqueID.ToSql() - if err != nil { - return "", fmt.Errorf("unable to generate query sql: %w", err) +func (pgd *pgDatastore) UniqueID(ctx context.Context) (string, error) { + if pgd.uniqueID.Load() == nil { + idSQL, idArgs, err := queryUniqueID.ToSql() + if err != nil { + return "", fmt.Errorf("unable to generate query sql: %w", err) + } + + var uniqueID string + if err := pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error { + return tx.QueryRow(ctx, idSQL, idArgs...).Scan(&uniqueID) + }); err != nil { + return "", fmt.Errorf("unable to query unique ID: %w", err) + } + + pgd.uniqueID.Store(&uniqueID) + return uniqueID, nil } - var uniqueID string - return uniqueID, pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error { - return tx.QueryRow(ctx, idSQL, idArgs...).Scan(&uniqueID) - }) + return *pgd.uniqueID.Load(), nil } func (pgd *pgDatastore) Statistics(ctx context.Context) (datastore.Stats, error) { diff --git a/internal/datastore/proxy/observable.go b/internal/datastore/proxy/observable.go index 3f92beb71e..86d9187326 100644 --- a/internal/datastore/proxy/observable.go +++ b/internal/datastore/proxy/observable.go @@ -68,6 +68,10 @@ func NewObservableDatastoreProxy(d datastore.Datastore) datastore.Datastore { type observableProxy struct{ delegate datastore.Datastore } +func (p *observableProxy) UniqueID(ctx context.Context) (string, error) { + return p.delegate.UniqueID(ctx) +} + func (p *observableProxy) SnapshotReader(rev datastore.Revision) datastore.Reader { delegateReader := p.delegate.SnapshotReader(rev) return &observableReader{delegateReader} diff --git a/internal/datastore/proxy/proxy_test/mock.go b/internal/datastore/proxy/proxy_test/mock.go index dc59c780ff..b204fca607 100644 --- a/internal/datastore/proxy/proxy_test/mock.go +++ b/internal/datastore/proxy/proxy_test/mock.go @@ -17,6 +17,10 @@ type MockDatastore struct { mock.Mock } +func (dm *MockDatastore) UniqueID(_ context.Context) (string, error) { + return "mockds", nil +} + func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader { args := dm.Called(rev) return args.Get(0).(datastore.Reader) diff --git a/internal/datastore/proxy/replicated_test.go b/internal/datastore/proxy/replicated_test.go index e882998e9d..2e530416b9 100644 --- a/internal/datastore/proxy/replicated_test.go +++ b/internal/datastore/proxy/replicated_test.go @@ -142,6 +142,10 @@ func (f fakeDatastore) Statistics(_ context.Context) (datastore.Stats, error) { return datastore.Stats{}, nil } +func (f fakeDatastore) UniqueID(_ context.Context) (string, error) { + return "fake", nil +} + func (f fakeDatastore) Close() error { return nil } diff --git a/internal/datastore/proxy/schemacaching/watchingcache_test.go b/internal/datastore/proxy/schemacaching/watchingcache_test.go index 8512e998ef..58f2158060 100644 --- a/internal/datastore/proxy/schemacaching/watchingcache_test.go +++ b/internal/datastore/proxy/schemacaching/watchingcache_test.go @@ -333,6 +333,10 @@ type fakeDatastore struct { lock sync.RWMutex } +func (fds *fakeDatastore) UniqueID(_ context.Context) (string, error) { + return "fakedsforwatch", nil +} + func (fds *fakeDatastore) updateNamespace(name string, def *corev1.NamespaceDefinition, revision datastore.Revision) { fds.lock.Lock() defer fds.lock.Unlock() diff --git a/internal/datastore/proxy/singleflight.go b/internal/datastore/proxy/singleflight.go index ee02f32e1d..835fe0e962 100644 --- a/internal/datastore/proxy/singleflight.go +++ b/internal/datastore/proxy/singleflight.go @@ -24,6 +24,10 @@ type singleflightProxy struct { var _ datastore.Datastore = (*singleflightProxy)(nil) +func (p *singleflightProxy) UniqueID(ctx context.Context) (string, error) { + return p.delegate.UniqueID(ctx) +} + func (p *singleflightProxy) SnapshotReader(rev datastore.Revision) datastore.Reader { return p.delegate.SnapshotReader(rev) } diff --git a/internal/datastore/spanner/revisions.go b/internal/datastore/spanner/revisions.go index 2f1c1abae7..b365a420c2 100644 --- a/internal/datastore/spanner/revisions.go +++ b/internal/datastore/spanner/revisions.go @@ -16,14 +16,27 @@ var ( nowStmt = spanner.NewStatement("SELECT CURRENT_TIMESTAMP()") ) +func (sd *spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) { + now, err := sd.now(ctx) + if err != nil { + return datastore.NoRevision, fmt.Errorf(errRevision, err) + } + + return revisions.NewForTime(now), nil +} + func (sd *spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) { + return sd.headRevisionInternal(ctx) +} + +func (sd *spannerDatastore) now(ctx context.Context) (time.Time, error) { var timestamp time.Time if err := sd.client.Single().Query(ctx, nowStmt).Do(func(r *spanner.Row) error { return r.Columns(×tamp) }); err != nil { - return datastore.NoRevision, fmt.Errorf(errRevision, err) + return timestamp, fmt.Errorf(errRevision, err) } - return revisions.NewForTime(timestamp), nil + return timestamp, nil } func (sd *spannerDatastore) staleHeadRevision(ctx context.Context) (datastore.Revision, error) { diff --git a/internal/datastore/spanner/spanner.go b/internal/datastore/spanner/spanner.go index e894de1cee..cb9ab234fd 100644 --- a/internal/datastore/spanner/spanner.go +++ b/internal/datastore/spanner/spanner.go @@ -8,6 +8,7 @@ import ( "regexp" "strconv" "sync" + "sync/atomic" "time" "cloud.google.com/go/spanner" @@ -97,6 +98,7 @@ type spannerDatastore struct { tableSizesStatsTable string filterMaximumIDCount uint16 + uniqueID atomic.Pointer[string] } // NewSpannerDatastore returns a datastore backed by cloud spanner diff --git a/internal/datastore/spanner/stats.go b/internal/datastore/spanner/stats.go index 6f0e420ce7..3359dd145f 100644 --- a/internal/datastore/spanner/stats.go +++ b/internal/datastore/spanner/stats.go @@ -28,16 +28,29 @@ var querySomeRandomRelationships = fmt.Sprintf(`SELECT %s FROM %s LIMIT 10`, const defaultEstimatedBytesPerRelationships = 20 // determined by looking at some sample clusters +func (sd *spannerDatastore) UniqueID(ctx context.Context) (string, error) { + if sd.uniqueID.Load() == nil { + var uniqueID string + if err := sd.client.Single().Read( + ctx, + tableMetadata, + spanner.AllKeys(), + []string{colUniqueID}, + ).Do(func(r *spanner.Row) error { + return r.Columns(&uniqueID) + }); err != nil { + return "", fmt.Errorf("unable to read unique ID: %w", err) + } + sd.uniqueID.Store(&uniqueID) + return uniqueID, nil + } + + return *sd.uniqueID.Load(), nil +} + func (sd *spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, error) { - var uniqueID string - if err := sd.client.Single().Read( - context.Background(), - tableMetadata, - spanner.AllKeys(), - []string{colUniqueID}, - ).Do(func(r *spanner.Row) error { - return r.Columns(&uniqueID) - }); err != nil { + uniqueID, err := sd.UniqueID(ctx) + if err != nil { return datastore.Stats{}, fmt.Errorf("unable to read unique ID: %w", err) } diff --git a/pkg/datastore/datastore.go b/pkg/datastore/datastore.go index 6505db7bd4..aee665f3fe 100644 --- a/pkg/datastore/datastore.go +++ b/pkg/datastore/datastore.go @@ -628,6 +628,11 @@ func (wo WatchOptions) WithCheckpointInterval(interval time.Duration) WatchOptio // ReadOnlyDatastore is an interface for reading relationships from the datastore. type ReadOnlyDatastore interface { + // UniqueID returns a unique identifier for the datastore. This identifier + // must be stable across restarts of the datastore if the datastore is + // persistent. + UniqueID(context.Context) (string, error) + // SnapshotReader creates a read-only handle that reads the datastore at the specified revision. // Any errors establishing the reader will be returned by subsequent calls. SnapshotReader(Revision) Reader diff --git a/pkg/datastore/datastore_test.go b/pkg/datastore/datastore_test.go index 0dd7786643..0c2cc3fd70 100644 --- a/pkg/datastore/datastore_test.go +++ b/pkg/datastore/datastore_test.go @@ -582,6 +582,10 @@ type fakeDatastore struct { delegate Datastore } +func (f fakeDatastore) UniqueID(_ context.Context) (string, error) { + return "fake", nil +} + func (f fakeDatastore) Unwrap() Datastore { return f.delegate } diff --git a/pkg/datastore/test/basic.go b/pkg/datastore/test/basic.go index 2dc5a50e04..db2e477f1e 100644 --- a/pkg/datastore/test/basic.go +++ b/pkg/datastore/test/basic.go @@ -73,3 +73,21 @@ func DeleteAllDataTest(t *testing.T, tester DatastoreTester) { } } } + +func UniqueIDTest(t *testing.T, tester DatastoreTester) { + require := require.New(t) + + // Create the datastore. + ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1) + require.NoError(err) + + // Ensure the unique ID is not empty. + uniqueID, err := ds.UniqueID(context.Background()) + require.NoError(err) + require.NotEmpty(uniqueID) + + // Ensure the unique ID is stable. + uniqueID2, err := ds.UniqueID(context.Background()) + require.NoError(err) + require.Equal(uniqueID, uniqueID2) +} diff --git a/pkg/datastore/test/datastore.go b/pkg/datastore/test/datastore.go index 034e265a43..b46c0441d9 100644 --- a/pkg/datastore/test/datastore.go +++ b/pkg/datastore/test/datastore.go @@ -105,6 +105,7 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories, runner = parallel } + t.Run("TestUniqueID", func(t *testing.T) { UniqueIDTest(t, tester) }) t.Run("TestUseAfterClose", runner(tester, UseAfterCloseTest)) t.Run("TestNamespaceNotFound", runner(tester, NamespaceNotFoundTest))