From 74dc7b26a8d1ec70266f7ac99edee10d11cb269f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Rold=C3=A1n=20Betancort?= Date: Thu, 21 Nov 2024 15:24:59 +0000 Subject: [PATCH] issue a checkpoint when head revision moved outside an application transaction it's possible for `pg_current_snapshot()` (used for HeadRevision) in PG to return a new xmin:xmax outside of application transactions. For example, running `ANALYZE;` could bump it. In this situation, we need to emit a Checkpoint, in case clients triggered a call to Watch API based on a different HeadRevision from what they had cached locally (e.g. relevant to compute change deltas between revisions). The proposal is to compute head revision in the same transaction where we compute new revisions, but only if checkpoints were requested as an optimization. If the transaction determines there are no new SpiceDB transactions, we return the compute head revision, if and only if checkpoints were requested. This Watch API poll loop into 2 queries instead of 1, which can add an extra load to the database, but at least it only happens when checkpoints are being requested. --- .../postgres/postgres_shared_test.go | 85 ++++++++++++++++--- internal/datastore/postgres/revisions.go | 22 ++++- internal/datastore/postgres/watch.go | 46 ++++++++-- 3 files changed, 131 insertions(+), 22 deletions(-) diff --git a/internal/datastore/postgres/postgres_shared_test.go b/internal/datastore/postgres/postgres_shared_test.go index b600f372bc..495eb3c593 100644 --- a/internal/datastore/postgres/postgres_shared_test.go +++ b/internal/datastore/postgres/postgres_shared_test.go @@ -188,6 +188,15 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) { MigrationPhase(config.migrationPhase), )) + t.Run("TestCheckpointsOnOutOfBandChanges", createDatastoreTest( + b, + CheckpointsOnOutOfBandChangesTest, + RevisionQuantization(0), + GCWindow(1*time.Millisecond), + WatchBufferLength(50), + MigrationPhase(config.migrationPhase), + )) + t.Run("TestSerializationError", createDatastoreTest( b, SerializationErrorTest, @@ -1590,20 +1599,24 @@ func RevisionTimestampAndTransactionIDTest(t *testing.T, ds datastore.Datastore) return } - rev := change.Revision.(postgresRevision) - timestamp, timestampPresent := rev.OptionalNanosTimestamp() - require.True(timestampPresent, "expected timestamp to be present in revision") - isCorrectAndUsesNanos := time.Unix(0, int64(timestamp)).After(anHourAgo) - require.True(isCorrectAndUsesNanos, "timestamp is not correct") + if !change.IsCheckpoint { + rev := change.Revision.(postgresRevision) + timestamp, timestampPresent := rev.OptionalNanosTimestamp() + require.True(timestampPresent, "expected timestamp to be present in revision") + isCorrectAndUsesNanos := time.Unix(0, int64(timestamp)).After(anHourAgo) + require.True(isCorrectAndUsesNanos, "timestamp is not correct") - _, transactionIDPresent := rev.OptionalTransactionID() - require.True(transactionIDPresent, "expected transactionID to be present in revision") + _, transactionIDPresent := rev.OptionalTransactionID() + require.True(transactionIDPresent, "expected transactionID to be present in revision") - if change.IsCheckpoint { - checkedCheckpoint = true - } else { checkedUpdate = true + } else { + // we wait for a checkpoint right after the update. Checkpoints could happen at any time off band. + if checkedUpdate { + checkedCheckpoint = true + } } + time.Sleep(1 * time.Millisecond) case <-changeWait.C: require.Fail("Timed out") @@ -1611,4 +1624,56 @@ func RevisionTimestampAndTransactionIDTest(t *testing.T, ds datastore.Datastore) } } +func CheckpointsOnOutOfBandChangesTest(t *testing.T, ds datastore.Datastore) { + require := require.New(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + lowestRevision, err := ds.HeadRevision(ctx) + require.NoError(err) + + // Run the watch API. + changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchOptions{ + Content: datastore.WatchRelationships | datastore.WatchSchema | datastore.WatchCheckpoints, + }) + require.Zero(len(errchan)) + + // Make the current snapshot move + pds := ds.(*pgDatastore) + tx, err := pds.writePool.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.Serializable}) + require.NoError(err) + _, err = tx.Exec(ctx, "LOCK pg_class;") + require.NoError(err) + require.NoError(tx.Commit(ctx)) + + newRevision, err := ds.HeadRevision(ctx) + require.NoError(err) + require.True(newRevision.GreaterThan(lowestRevision)) + + var checkedCheckpoint bool + for { + if checkedCheckpoint { + break + } + + changeWait := time.NewTimer(waitForChangesTimeout) + select { + case change, ok := <-changes: + if !ok { + require.True(checkedCheckpoint, "expected checkpoint to be emitted") + return + } + + if change.IsCheckpoint { + checkedCheckpoint = change.Revision.GreaterThan(lowestRevision) + } + + time.Sleep(10 * time.Millisecond) + case <-changeWait.C: + require.Fail("timed out waiting for checkpoint for out of band change") + } + } +} + const waitForChangesTimeout = 5 * time.Second diff --git a/internal/datastore/postgres/revisions.go b/internal/datastore/postgres/revisions.go index 2329d1cbaa..abfbd03ed9 100644 --- a/internal/datastore/postgres/revisions.go +++ b/internal/datastore/postgres/revisions.go @@ -12,6 +12,7 @@ import ( "github.com/ccoveille/go-safecast" "github.com/jackc/pgx/v5" + "github.com/authzed/spicedb/internal/datastore/postgres/common" "github.com/authzed/spicedb/pkg/datastore" implv1 "github.com/authzed/spicedb/pkg/proto/impl/v1" "github.com/authzed/spicedb/pkg/spiceerrors" @@ -91,15 +92,28 @@ func (pgd *pgDatastore) HeadRevision(ctx context.Context) (datastore.Revision, e ctx, span := tracer.Start(ctx, "HeadRevision") defer span.End() + result, err := pgd.getHeadRevision(ctx, pgd.readPool) + if err != nil { + return nil, err + } + if result == nil { + return datastore.NoRevision, nil + } + + return *result, nil +} + +func (pgd *pgDatastore) getHeadRevision(ctx context.Context, querier common.Querier) (*postgresRevision, error) { var snapshot pgSnapshot - if err := pgd.readPool.QueryRow(ctx, queryCurrentSnapshot).Scan(&snapshot); err != nil { + if err := querier.QueryRow(ctx, queryCurrentSnapshot).Scan(&snapshot); err != nil { if errors.Is(err, pgx.ErrNoRows) { - return datastore.NoRevision, nil + return nil, nil } - return datastore.NoRevision, fmt.Errorf(errRevision, err) + + return nil, fmt.Errorf(errRevision, err) } - return postgresRevision{snapshot: snapshot}, nil + return &postgresRevision{snapshot: snapshot}, nil } func (pgd *pgDatastore) CheckRevision(ctx context.Context, revisionRaw datastore.Revision) error { diff --git a/internal/datastore/postgres/watch.go b/internal/datastore/postgres/watch.go index fc568ad30c..f409bccef3 100644 --- a/internal/datastore/postgres/watch.go +++ b/internal/datastore/postgres/watch.go @@ -123,9 +123,9 @@ func (pgd *pgDatastore) Watch( defer close(errs) currentTxn := afterRevision - + requestedCheckpoints := options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints for { - newTxns, err := pgd.getNewRevisions(ctx, currentTxn) + newTxns, optionalHeadRevision, err := pgd.getNewRevisions(ctx, currentTxn, requestedCheckpoints) if err != nil { if errors.Is(ctx.Err(), context.Canceled) { errs <- datastore.NewWatchCanceledErr() @@ -172,7 +172,7 @@ func (pgd *pgDatastore) Watch( // If checkpoints were requested, output a checkpoint. While the Postgres datastore does not // move revisions forward outside of changes, these could be necessary if the caller is // watching only a *subset* of changes. - if options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints { + if requestedCheckpoints { if !sendChange(&datastore.RevisionChanges{ Revision: currentTxn, IsCheckpoint: true, @@ -181,10 +181,31 @@ func (pgd *pgDatastore) Watch( } } } else { - sleep := time.NewTimer(watchSleep) + // PG head revision could move outside of changes (e.g. VACUUM ANALYZE), and we still need to + // send a checkpoint to the client, as could have determined also via Head that changes exist and + // called Watch API + // + // we need to compute the head revision in the same transaction where we determine any new spicedb, + // transactions, otherwise there could be a race that means we issue a checkpoint before we issue + // the corresponding changes. + if requestedCheckpoints { + if optionalHeadRevision == nil { + errs <- spiceerrors.MustBugf("expected to have an optional head revision") + return + } + + if optionalHeadRevision.GreaterThan(currentTxn) { + if !sendChange(&datastore.RevisionChanges{ + Revision: *optionalHeadRevision, + IsCheckpoint: true, + }) { + return + } + } + } select { - case <-sleep.C: + case <-time.NewTimer(watchSleep).C: break case <-ctx.Done(): errs <- datastore.NewWatchCanceledErr() @@ -197,9 +218,18 @@ func (pgd *pgDatastore) Watch( return updates, errs } -func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRevision) ([]postgresRevision, error) { +func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRevision, returnHeadRevision bool) ([]postgresRevision, *postgresRevision, error) { var ids []postgresRevision + var optionalHeadRevision *postgresRevision + var err error if err := pgx.BeginTxFunc(ctx, pgd.readPool, pgx.TxOptions{IsoLevel: pgx.RepeatableRead}, func(tx pgx.Tx) error { + if returnHeadRevision { + optionalHeadRevision, err = pgd.getHeadRevision(ctx, tx) + if err != nil { + return fmt.Errorf("unable to get head revision: %w", err) + } + } + rows, err := tx.Query(ctx, newRevisionsQuery, afterTX.snapshot) if err != nil { return fmt.Errorf("unable to load new revisions: %w", err) @@ -232,10 +262,10 @@ func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRev } return nil }); err != nil { - return nil, fmt.Errorf("transaction error: %w", err) + return nil, optionalHeadRevision, fmt.Errorf("transaction error: %w", err) } - return ids, nil + return ids, optionalHeadRevision, nil } func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []postgresRevision, options datastore.WatchOptions) ([]datastore.RevisionChanges, error) {