Skip to content

Commit

Permalink
Export the unique, stable, ID from datastore
Browse files Browse the repository at this point in the history
  • Loading branch information
josephschorr committed Jan 17, 2025
1 parent ec216dc commit a04dd54
Show file tree
Hide file tree
Showing 20 changed files with 154 additions and 39 deletions.
4 changes: 4 additions & 0 deletions internal/datastore/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func NewSeparatingContextDatastoreProxy(d datastore.Datastore) datastore.StrictR

type ctxProxy struct{ delegate datastore.Datastore }

func (p *ctxProxy) UniqueID(ctx context.Context) (string, error) {
return p.delegate.UniqueID(SeparateContextWithTracing(ctx))
}

func (p *ctxProxy) ReadWriteTx(
ctx context.Context,
f datastore.TxUserFunc,
Expand Down
3 changes: 3 additions & 0 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"regexp"
"strconv"
"sync/atomic"
"time"

"github.com/IBM/pgxpoolprometheus"
Expand Down Expand Up @@ -345,6 +346,8 @@ type crdbDatastore struct {
cancel context.CancelFunc
filterMaximumIDCount uint16
supportsIntegrity bool

uniqueID atomic.Pointer[string]
}

func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
Expand Down
27 changes: 19 additions & 8 deletions internal/datastore/crdb/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,33 @@ const (
colUniqueID = "unique_id"
)

var (
queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)
uniqueID string
)
var queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)

func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
if len(uniqueID) == 0 {
func (cds *crdbDatastore) UniqueID(ctx context.Context) (string, error) {
if cds.uniqueID.Load() == nil {
sql, args, err := queryReadUniqueID.ToSql()
if err != nil {
return datastore.Stats{}, fmt.Errorf("unable to prepare unique ID sql: %w", err)
return "", fmt.Errorf("unable to prepare unique ID sql: %w", err)
}

var uniqueID string
if err := cds.readPool.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
return row.Scan(&uniqueID)
}, sql, args...); err != nil {
return datastore.Stats{}, fmt.Errorf("unable to query unique ID: %w", err)
return "", fmt.Errorf("unable to query unique ID: %w", err)
}

cds.uniqueID.Store(&uniqueID)
return uniqueID, nil
}

return *cds.uniqueID.Load(), nil
}

func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
uniqueID, err := cds.UniqueID(ctx)
if err != nil {
return datastore.Stats{}, err
}

var nsDefs []datastore.RevisionedNamespace
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/memdb/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ type snapshot struct {
db *memdb.MemDB
}

func (mdb *memdbDatastore) UniqueID(_ context.Context) (string, error) {
return mdb.uniqueID, nil
}

func (mdb *memdbDatastore) SnapshotReader(dr datastore.Revision) datastore.Reader {
mdb.RLock()
defer mdb.RUnlock()
Expand Down
4 changes: 3 additions & 1 deletion internal/datastore/mysql/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,8 @@ type Datastore struct {
createTxn sq.InsertBuilder
createBaseTxn string

uniqueID atomic.Pointer[string]

*QueryBuilder
*revisions.CachedOptimizedRevisions
revisions.CommonDecoder
Expand Down Expand Up @@ -586,7 +588,7 @@ func (mds *Datastore) isSeeded(ctx context.Context) (bool, error) {
return false, nil
}

_, err = mds.getUniqueID(ctx)
_, err = mds.UniqueID(ctx)
if err != nil {
return false, nil
}
Expand Down
24 changes: 14 additions & 10 deletions internal/datastore/mysql/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) {
}
}

uniqueID, err := mds.getUniqueID(ctx)
uniqueID, err := mds.UniqueID(ctx)
if err != nil {
return datastore.Stats{}, err
}
Expand Down Expand Up @@ -88,16 +88,20 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) {
}, nil
}

func (mds *Datastore) getUniqueID(ctx context.Context) (string, error) {
sql, args, err := sb.Select(metadataUniqueIDColumn).From(mds.driver.Metadata()).ToSql()
if err != nil {
return "", fmt.Errorf("unable to generate query sql: %w", err)
}
func (mds *Datastore) UniqueID(ctx context.Context) (string, error) {
if mds.uniqueID.Load() == nil {
sql, args, err := sb.Select(metadataUniqueIDColumn).From(mds.driver.Metadata()).ToSql()
if err != nil {
return "", fmt.Errorf("unable to generate query sql: %w", err)
}

var uniqueID string
if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil {
return "", fmt.Errorf("unable to query unique ID: %w", err)
var uniqueID string
if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil {
return "", fmt.Errorf("unable to query unique ID: %w", err)
}
mds.uniqueID.Store(&uniqueID)
return uniqueID, nil
}

return uniqueID, nil
return *mds.uniqueID.Load(), nil
}
4 changes: 3 additions & 1 deletion internal/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ type pgDatastore struct {
includeQueryParametersInTraces bool

credentialsProvider datastore.CredentialsProvider
uniqueID atomic.Pointer[string]

gcGroup *errgroup.Group
gcCtx context.Context
Expand Down Expand Up @@ -691,8 +692,9 @@ func (pgd *pgDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, e
if !state.IsReady {
return state, nil
}

// Ensure a datastore ID is present. This ensures the tables have not been truncated.
uniqueID, err := pgd.datastoreUniqueID(ctx)
uniqueID, err := pgd.UniqueID(ctx)
if err != nil {
return datastore.ReadyState{}, fmt.Errorf("database validation failed: %w; if you have previously run `TRUNCATE`, this database is no longer valid and must be remigrated. See: https://spicedb.dev/d/truncate-unsupported", err)
}
Expand Down
25 changes: 17 additions & 8 deletions internal/datastore/postgres/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,25 @@ var (
Where(sq.Eq{colRelname: tableTuple})
)

func (pgd *pgDatastore) datastoreUniqueID(ctx context.Context) (string, error) {
idSQL, idArgs, err := queryUniqueID.ToSql()
if err != nil {
return "", fmt.Errorf("unable to generate query sql: %w", err)
func (pgd *pgDatastore) UniqueID(ctx context.Context) (string, error) {
if pgd.uniqueID.Load() == nil {
idSQL, idArgs, err := queryUniqueID.ToSql()
if err != nil {
return "", fmt.Errorf("unable to generate query sql: %w", err)
}

var uniqueID string
if err := pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error {
return tx.QueryRow(ctx, idSQL, idArgs...).Scan(&uniqueID)
}); err != nil {
return "", fmt.Errorf("unable to query unique ID: %w", err)
}

pgd.uniqueID.Store(&uniqueID)
return uniqueID, nil
}

var uniqueID string
return uniqueID, pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error {
return tx.QueryRow(ctx, idSQL, idArgs...).Scan(&uniqueID)
})
return *pgd.uniqueID.Load(), nil
}

func (pgd *pgDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/proxy/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func NewObservableDatastoreProxy(d datastore.Datastore) datastore.Datastore {

type observableProxy struct{ delegate datastore.Datastore }

func (p *observableProxy) UniqueID(ctx context.Context) (string, error) {
return p.delegate.UniqueID(ctx)
}

func (p *observableProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
delegateReader := p.delegate.SnapshotReader(rev)
return &observableReader{delegateReader}
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/proxy/proxy_test/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type MockDatastore struct {
mock.Mock
}

func (dm *MockDatastore) UniqueID(_ context.Context) (string, error) {
return "mockds", nil
}

func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
args := dm.Called(rev)
return args.Get(0).(datastore.Reader)
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/proxy/replicated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ func (f fakeDatastore) Statistics(_ context.Context) (datastore.Stats, error) {
return datastore.Stats{}, nil
}

func (f fakeDatastore) UniqueID(_ context.Context) (string, error) {
return "fake", nil
}

func (f fakeDatastore) Close() error {
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/proxy/schemacaching/watchingcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ type fakeDatastore struct {
lock sync.RWMutex
}

func (fds *fakeDatastore) UniqueID(_ context.Context) (string, error) {
return "fakedsforwatch", nil
}

func (fds *fakeDatastore) updateNamespace(name string, def *corev1.NamespaceDefinition, revision datastore.Revision) {
fds.lock.Lock()
defer fds.lock.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/proxy/singleflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ type singleflightProxy struct {

var _ datastore.Datastore = (*singleflightProxy)(nil)

func (p *singleflightProxy) UniqueID(ctx context.Context) (string, error) {
return p.delegate.UniqueID(ctx)
}

func (p *singleflightProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
return p.delegate.SnapshotReader(rev)
}
Expand Down
17 changes: 15 additions & 2 deletions internal/datastore/spanner/revisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,27 @@ var (
nowStmt = spanner.NewStatement("SELECT CURRENT_TIMESTAMP()")
)

func (sd *spannerDatastore) headRevisionInternal(ctx context.Context) (datastore.Revision, error) {
now, err := sd.now(ctx)
if err != nil {
return datastore.NoRevision, fmt.Errorf(errRevision, err)
}

return revisions.NewForTime(now), nil
}

func (sd *spannerDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
return sd.headRevisionInternal(ctx)
}

func (sd *spannerDatastore) now(ctx context.Context) (time.Time, error) {
var timestamp time.Time
if err := sd.client.Single().Query(ctx, nowStmt).Do(func(r *spanner.Row) error {
return r.Columns(&timestamp)
}); err != nil {
return datastore.NoRevision, fmt.Errorf(errRevision, err)
return timestamp, fmt.Errorf(errRevision, err)
}
return revisions.NewForTime(timestamp), nil
return timestamp, nil
}

func (sd *spannerDatastore) staleHeadRevision(ctx context.Context) (datastore.Revision, error) {
Expand Down
2 changes: 2 additions & 0 deletions internal/datastore/spanner/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"regexp"
"strconv"
"sync"
"sync/atomic"
"time"

"cloud.google.com/go/spanner"
Expand Down Expand Up @@ -97,6 +98,7 @@ type spannerDatastore struct {

tableSizesStatsTable string
filterMaximumIDCount uint16
uniqueID atomic.Pointer[string]
}

// NewSpannerDatastore returns a datastore backed by cloud spanner
Expand Down
31 changes: 22 additions & 9 deletions internal/datastore/spanner/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,29 @@ var querySomeRandomRelationships = fmt.Sprintf(`SELECT %s FROM %s LIMIT 10`,

const defaultEstimatedBytesPerRelationships = 20 // determined by looking at some sample clusters

func (sd *spannerDatastore) UniqueID(ctx context.Context) (string, error) {
if sd.uniqueID.Load() == nil {
var uniqueID string
if err := sd.client.Single().Read(
ctx,
tableMetadata,
spanner.AllKeys(),
[]string{colUniqueID},
).Do(func(r *spanner.Row) error {
return r.Columns(&uniqueID)
}); err != nil {
return "", fmt.Errorf("unable to read unique ID: %w", err)
}
sd.uniqueID.Store(&uniqueID)
return uniqueID, nil
}

return *sd.uniqueID.Load(), nil
}

func (sd *spannerDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
var uniqueID string
if err := sd.client.Single().Read(
context.Background(),
tableMetadata,
spanner.AllKeys(),
[]string{colUniqueID},
).Do(func(r *spanner.Row) error {
return r.Columns(&uniqueID)
}); err != nil {
uniqueID, err := sd.UniqueID(ctx)
if err != nil {
return datastore.Stats{}, fmt.Errorf("unable to read unique ID: %w", err)
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,11 @@ func (wo WatchOptions) WithCheckpointInterval(interval time.Duration) WatchOptio

// ReadOnlyDatastore is an interface for reading relationships from the datastore.
type ReadOnlyDatastore interface {
// UniqueID returns a unique identifier for the datastore. This identifier
// must be stable across restarts of the datastore if the datastore is
// persistent.
UniqueID(context.Context) (string, error)

// SnapshotReader creates a read-only handle that reads the datastore at the specified revision.
// Any errors establishing the reader will be returned by subsequent calls.
SnapshotReader(Revision) Reader
Expand Down
4 changes: 4 additions & 0 deletions pkg/datastore/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,10 @@ type fakeDatastore struct {
delegate Datastore
}

func (f fakeDatastore) UniqueID(_ context.Context) (string, error) {
return "fake", nil
}

func (f fakeDatastore) Unwrap() Datastore {
return f.delegate
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/datastore/test/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,21 @@ func DeleteAllDataTest(t *testing.T, tester DatastoreTester) {
}
}
}

func UniqueIDTest(t *testing.T, tester DatastoreTester) {
require := require.New(t)

// Create the datastore.
ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 1)
require.NoError(err)

// Ensure the unique ID is not empty.
uniqueID, err := ds.UniqueID(context.Background())
require.NoError(err)
require.NotEmpty(uniqueID)

// Ensure the unique ID is stable.
uniqueID2, err := ds.UniqueID(context.Background())
require.NoError(err)
require.Equal(uniqueID, uniqueID2)
}
1 change: 1 addition & 0 deletions pkg/datastore/test/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories,
runner = parallel
}

t.Run("TestUniqueID", func(t *testing.T) { UniqueIDTest(t, tester) })
t.Run("TestUseAfterClose", runner(tester, UseAfterCloseTest))

t.Run("TestNamespaceNotFound", runner(tester, NamespaceNotFoundTest))
Expand Down

0 comments on commit a04dd54

Please sign in to comment.