Skip to content

Commit

Permalink
issue a checkpoint when head revision moved outside an application tr…
Browse files Browse the repository at this point in the history
…ansaction

it's possible for `pg_current_snapshot()` (used for HeadRevision) in PG
to return a new xmin:xmax outside of application transactions. For
example, running `ANALYZE;` could bump it.

In this situation, we need to emit a Checkpoint, in case clients
triggered a call to Watch API based on a different HeadRevision
from what they had cached locally (e.g. relevant to compute
change deltas between revisions).

The proposal is to compute the head revision in the same transaction
where we compute new revisions, but, as an optimization, only if
checkpoints were requested. If the transaction determines there are no new
SpiceDB transactions, we return the computed head revision, if and only
if checkpoints were requested. This turns the Watch API poll loop into 2
queries instead of 1, which can add extra load to the database, but at
least it only happens when checkpoints are being requested.
  • Loading branch information
vroldanbet committed Nov 21, 2024
1 parent 880b536 commit 74dc7b2
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 22 deletions.
85 changes: 75 additions & 10 deletions internal/datastore/postgres/postgres_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,15 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
MigrationPhase(config.migrationPhase),
))

t.Run("TestCheckpointsOnOutOfBandChanges", createDatastoreTest(
b,
CheckpointsOnOutOfBandChangesTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))

t.Run("TestSerializationError", createDatastoreTest(
b,
SerializationErrorTest,
Expand Down Expand Up @@ -1590,25 +1599,81 @@ func RevisionTimestampAndTransactionIDTest(t *testing.T, ds datastore.Datastore)
return
}

rev := change.Revision.(postgresRevision)
timestamp, timestampPresent := rev.OptionalNanosTimestamp()
require.True(timestampPresent, "expected timestamp to be present in revision")
isCorrectAndUsesNanos := time.Unix(0, int64(timestamp)).After(anHourAgo)
require.True(isCorrectAndUsesNanos, "timestamp is not correct")
if !change.IsCheckpoint {
rev := change.Revision.(postgresRevision)
timestamp, timestampPresent := rev.OptionalNanosTimestamp()
require.True(timestampPresent, "expected timestamp to be present in revision")
isCorrectAndUsesNanos := time.Unix(0, int64(timestamp)).After(anHourAgo)
require.True(isCorrectAndUsesNanos, "timestamp is not correct")

_, transactionIDPresent := rev.OptionalTransactionID()
require.True(transactionIDPresent, "expected transactionID to be present in revision")
_, transactionIDPresent := rev.OptionalTransactionID()
require.True(transactionIDPresent, "expected transactionID to be present in revision")

if change.IsCheckpoint {
checkedCheckpoint = true
} else {
checkedUpdate = true
} else {
// we wait for a checkpoint right after the update. Checkpoints could happen at any time off band.
if checkedUpdate {
checkedCheckpoint = true
}
}

time.Sleep(1 * time.Millisecond)
case <-changeWait.C:
require.Fail("Timed out")
}
}
}

// CheckpointsOnOutOfBandChangesTest checks that a watch which asked for
// checkpoints receives one after the head revision advances through a
// transaction that is issued directly against the write pool rather than
// through the datastore's write API.
func CheckpointsOnOutOfBandChangesTest(t *testing.T, ds datastore.Datastore) {
	require := require.New(t)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	startRevision, err := ds.HeadRevision(ctx)
	require.NoError(err)

	// Start watching from the current head, asking for checkpoints alongside
	// relationship and schema changes.
	watchOpts := datastore.WatchOptions{
		Content: datastore.WatchRelationships | datastore.WatchSchema | datastore.WatchCheckpoints,
	}
	changes, errchan := ds.Watch(ctx, startRevision, watchOpts)
	require.Zero(len(errchan))

	// Move the current snapshot forward by committing a transaction that only
	// takes a lock on a catalog table.
	pds := ds.(*pgDatastore)
	tx, err := pds.writePool.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.Serializable})
	require.NoError(err)
	_, err = tx.Exec(ctx, "LOCK pg_class;")
	require.NoError(err)
	require.NoError(tx.Commit(ctx))

	// Sanity check: the head revision must actually have advanced.
	movedRevision, err := ds.HeadRevision(ctx)
	require.NoError(err)
	require.True(movedRevision.GreaterThan(startRevision))

	// Drain the watch until a checkpoint newer than the starting revision shows
	// up; each individual wait is bounded by waitForChangesTimeout.
	sawCheckpoint := false
	for !sawCheckpoint {
		deadline := time.NewTimer(waitForChangesTimeout)
		select {
		case <-deadline.C:
			require.Fail("timed out waiting for checkpoint for out of band change")
		case change, open := <-changes:
			if !open {
				// The watch ended before the expected checkpoint arrived.
				require.True(sawCheckpoint, "expected checkpoint to be emitted")
				return
			}

			if change.IsCheckpoint {
				sawCheckpoint = change.Revision.GreaterThan(startRevision)
			}

			time.Sleep(10 * time.Millisecond)
		}
	}
}

// waitForChangesTimeout is how long a test waits on the watch channel for a single change before failing.
const waitForChangesTimeout = 5 * time.Second
22 changes: 18 additions & 4 deletions internal/datastore/postgres/revisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ccoveille/go-safecast"
"github.com/jackc/pgx/v5"

"github.com/authzed/spicedb/internal/datastore/postgres/common"
"github.com/authzed/spicedb/pkg/datastore"
implv1 "github.com/authzed/spicedb/pkg/proto/impl/v1"
"github.com/authzed/spicedb/pkg/spiceerrors"
Expand Down Expand Up @@ -91,15 +92,28 @@ func (pgd *pgDatastore) HeadRevision(ctx context.Context) (datastore.Revision, e
ctx, span := tracer.Start(ctx, "HeadRevision")
defer span.End()

result, err := pgd.getHeadRevision(ctx, pgd.readPool)
if err != nil {
return nil, err
}
if result == nil {
return datastore.NoRevision, nil
}

return *result, nil
}

func (pgd *pgDatastore) getHeadRevision(ctx context.Context, querier common.Querier) (*postgresRevision, error) {
var snapshot pgSnapshot
if err := pgd.readPool.QueryRow(ctx, queryCurrentSnapshot).Scan(&snapshot); err != nil {
if err := querier.QueryRow(ctx, queryCurrentSnapshot).Scan(&snapshot); err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return datastore.NoRevision, nil
return nil, nil
}
return datastore.NoRevision, fmt.Errorf(errRevision, err)

return nil, fmt.Errorf(errRevision, err)
}

return postgresRevision{snapshot: snapshot}, nil
return &postgresRevision{snapshot: snapshot}, nil
}

func (pgd *pgDatastore) CheckRevision(ctx context.Context, revisionRaw datastore.Revision) error {
Expand Down
46 changes: 38 additions & 8 deletions internal/datastore/postgres/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ func (pgd *pgDatastore) Watch(
defer close(errs)

currentTxn := afterRevision

requestedCheckpoints := options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints
for {
newTxns, err := pgd.getNewRevisions(ctx, currentTxn)
newTxns, optionalHeadRevision, err := pgd.getNewRevisions(ctx, currentTxn, requestedCheckpoints)
if err != nil {
if errors.Is(ctx.Err(), context.Canceled) {
errs <- datastore.NewWatchCanceledErr()
Expand Down Expand Up @@ -172,7 +172,7 @@ func (pgd *pgDatastore) Watch(
// If checkpoints were requested, output a checkpoint. While the Postgres datastore does not
// move revisions forward outside of changes, these could be necessary if the caller is
// watching only a *subset* of changes.
if options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints {
if requestedCheckpoints {
if !sendChange(&datastore.RevisionChanges{
Revision: currentTxn,
IsCheckpoint: true,
Expand All @@ -181,10 +181,31 @@ func (pgd *pgDatastore) Watch(
}
}
} else {
sleep := time.NewTimer(watchSleep)
// PG head revision could move outside of changes (e.g. VACUUM ANALYZE), and we still need to
// send a checkpoint to the client, as it could have also determined via HeadRevision that changes
// exist and called the Watch API.
//
// We need to compute the head revision in the same transaction where we determine any new SpiceDB
// transactions, otherwise there could be a race that means we issue a checkpoint before we issue
// the corresponding changes.
if requestedCheckpoints {
if optionalHeadRevision == nil {
errs <- spiceerrors.MustBugf("expected to have an optional head revision")
return
}

if optionalHeadRevision.GreaterThan(currentTxn) {
if !sendChange(&datastore.RevisionChanges{
Revision: *optionalHeadRevision,
IsCheckpoint: true,
}) {
return
}
}
}

select {
case <-sleep.C:
case <-time.NewTimer(watchSleep).C:
break
case <-ctx.Done():
errs <- datastore.NewWatchCanceledErr()
Expand All @@ -197,9 +218,18 @@ func (pgd *pgDatastore) Watch(
return updates, errs
}

func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRevision) ([]postgresRevision, error) {
func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRevision, returnHeadRevision bool) ([]postgresRevision, *postgresRevision, error) {
var ids []postgresRevision
var optionalHeadRevision *postgresRevision
var err error
if err := pgx.BeginTxFunc(ctx, pgd.readPool, pgx.TxOptions{IsoLevel: pgx.RepeatableRead}, func(tx pgx.Tx) error {
if returnHeadRevision {
optionalHeadRevision, err = pgd.getHeadRevision(ctx, tx)
if err != nil {
return fmt.Errorf("unable to get head revision: %w", err)
}
}

rows, err := tx.Query(ctx, newRevisionsQuery, afterTX.snapshot)
if err != nil {
return fmt.Errorf("unable to load new revisions: %w", err)
Expand Down Expand Up @@ -232,10 +262,10 @@ func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRev
}
return nil
}); err != nil {
return nil, fmt.Errorf("transaction error: %w", err)
return nil, optionalHeadRevision, fmt.Errorf("transaction error: %w", err)
}

return ids, nil
return ids, optionalHeadRevision, nil
}

func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []postgresRevision, options datastore.WatchOptions) ([]datastore.RevisionChanges, error) {
Expand Down

0 comments on commit 74dc7b2

Please sign in to comment.