Skip to content

Commit

Permalink
Merge pull request #2139 from authzed/pg-checkpoint-offband-revision-…
Browse files Browse the repository at this point in the history
…bumps

issue a checkpoint when head revision moved outside an application transaction
  • Loading branch information
vroldanbet authored Nov 21, 2024
2 parents 5c8068a + 74dc7b2 commit 5dc47f5
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 23 deletions.
85 changes: 75 additions & 10 deletions internal/datastore/postgres/postgres_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,15 @@ func testPostgresDatastore(t *testing.T, pc []postgresConfig) {
MigrationPhase(config.migrationPhase),
))

t.Run("TestCheckpointsOnOutOfBandChanges", createDatastoreTest(
b,
CheckpointsOnOutOfBandChangesTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
))

t.Run("TestSerializationError", createDatastoreTest(
b,
SerializationErrorTest,
Expand Down Expand Up @@ -1590,25 +1599,81 @@ func RevisionTimestampAndTransactionIDTest(t *testing.T, ds datastore.Datastore)
return
}

rev := change.Revision.(postgresRevision)
timestamp, timestampPresent := rev.OptionalNanosTimestamp()
require.True(timestampPresent, "expected timestamp to be present in revision")
isCorrectAndUsesNanos := time.Unix(0, int64(timestamp)).After(anHourAgo)
require.True(isCorrectAndUsesNanos, "timestamp is not correct")
if !change.IsCheckpoint {
rev := change.Revision.(postgresRevision)
timestamp, timestampPresent := rev.OptionalNanosTimestamp()
require.True(timestampPresent, "expected timestamp to be present in revision")
isCorrectAndUsesNanos := time.Unix(0, int64(timestamp)).After(anHourAgo)
require.True(isCorrectAndUsesNanos, "timestamp is not correct")

_, transactionIDPresent := rev.OptionalTransactionID()
require.True(transactionIDPresent, "expected transactionID to be present in revision")
_, transactionIDPresent := rev.OptionalTransactionID()
require.True(transactionIDPresent, "expected transactionID to be present in revision")

if change.IsCheckpoint {
checkedCheckpoint = true
} else {
checkedUpdate = true
} else {
// we wait for a checkpoint right after the update. Checkpoints could happen at any time off band.
if checkedUpdate {
checkedCheckpoint = true
}
}

time.Sleep(1 * time.Millisecond)
case <-changeWait.C:
require.Fail("Timed out")
}
}
}

func CheckpointsOnOutOfBandChangesTest(t *testing.T, ds datastore.Datastore) {
require := require.New(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

lowestRevision, err := ds.HeadRevision(ctx)
require.NoError(err)

// Run the watch API.
changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchOptions{
Content: datastore.WatchRelationships | datastore.WatchSchema | datastore.WatchCheckpoints,
})
require.Zero(len(errchan))

// Make the current snapshot move
pds := ds.(*pgDatastore)
tx, err := pds.writePool.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.Serializable})
require.NoError(err)
_, err = tx.Exec(ctx, "LOCK pg_class;")
require.NoError(err)
require.NoError(tx.Commit(ctx))

newRevision, err := ds.HeadRevision(ctx)
require.NoError(err)
require.True(newRevision.GreaterThan(lowestRevision))

var checkedCheckpoint bool
for {
if checkedCheckpoint {
break
}

changeWait := time.NewTimer(waitForChangesTimeout)
select {
case change, ok := <-changes:
if !ok {
require.True(checkedCheckpoint, "expected checkpoint to be emitted")
return
}

if change.IsCheckpoint {
checkedCheckpoint = change.Revision.GreaterThan(lowestRevision)
}

time.Sleep(10 * time.Millisecond)
case <-changeWait.C:
require.Fail("timed out waiting for checkpoint for out of band change")
}
}
}

const waitForChangesTimeout = 5 * time.Second
22 changes: 18 additions & 4 deletions internal/datastore/postgres/revisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ccoveille/go-safecast"
"github.com/jackc/pgx/v5"

"github.com/authzed/spicedb/internal/datastore/postgres/common"
"github.com/authzed/spicedb/pkg/datastore"
implv1 "github.com/authzed/spicedb/pkg/proto/impl/v1"
"github.com/authzed/spicedb/pkg/spiceerrors"
Expand Down Expand Up @@ -91,15 +92,28 @@ func (pgd *pgDatastore) HeadRevision(ctx context.Context) (datastore.Revision, e
ctx, span := tracer.Start(ctx, "HeadRevision")
defer span.End()

result, err := pgd.getHeadRevision(ctx, pgd.readPool)
if err != nil {
return nil, err
}
if result == nil {
return datastore.NoRevision, nil
}

return *result, nil
}

func (pgd *pgDatastore) getHeadRevision(ctx context.Context, querier common.Querier) (*postgresRevision, error) {
var snapshot pgSnapshot
if err := pgd.readPool.QueryRow(ctx, queryCurrentSnapshot).Scan(&snapshot); err != nil {
if err := querier.QueryRow(ctx, queryCurrentSnapshot).Scan(&snapshot); err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return datastore.NoRevision, nil
return nil, nil
}
return datastore.NoRevision, fmt.Errorf(errRevision, err)

return nil, fmt.Errorf(errRevision, err)
}

return postgresRevision{snapshot: snapshot}, nil
return &postgresRevision{snapshot: snapshot}, nil
}

func (pgd *pgDatastore) CheckRevision(ctx context.Context, revisionRaw datastore.Revision) error {
Expand Down
46 changes: 38 additions & 8 deletions internal/datastore/postgres/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ func (pgd *pgDatastore) Watch(
defer close(errs)

currentTxn := afterRevision

requestedCheckpoints := options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints
for {
newTxns, err := pgd.getNewRevisions(ctx, currentTxn)
newTxns, optionalHeadRevision, err := pgd.getNewRevisions(ctx, currentTxn, requestedCheckpoints)
if err != nil {
if errors.Is(ctx.Err(), context.Canceled) {
errs <- datastore.NewWatchCanceledErr()
Expand Down Expand Up @@ -172,7 +172,7 @@ func (pgd *pgDatastore) Watch(
// If checkpoints were requested, output a checkpoint. While the Postgres datastore does not
// move revisions forward outside of changes, these could be necessary if the caller is
// watching only a *subset* of changes.
if options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints {
if requestedCheckpoints {
if !sendChange(&datastore.RevisionChanges{
Revision: currentTxn,
IsCheckpoint: true,
Expand All @@ -181,10 +181,31 @@ func (pgd *pgDatastore) Watch(
}
}
} else {
sleep := time.NewTimer(watchSleep)
// PG head revision could move outside of changes (e.g. VACUUM ANALYZE), and we still need to
// send a checkpoint to the client, as could have determined also via Head that changes exist and
// called Watch API
//
// we need to compute the head revision in the same transaction where we determine any new spicedb,
// transactions, otherwise there could be a race that means we issue a checkpoint before we issue
// the corresponding changes.
if requestedCheckpoints {
if optionalHeadRevision == nil {
errs <- spiceerrors.MustBugf("expected to have an optional head revision")
return
}

if optionalHeadRevision.GreaterThan(currentTxn) {
if !sendChange(&datastore.RevisionChanges{
Revision: *optionalHeadRevision,
IsCheckpoint: true,
}) {
return
}
}
}

select {
case <-sleep.C:
case <-time.NewTimer(watchSleep).C:
break
case <-ctx.Done():
errs <- datastore.NewWatchCanceledErr()
Expand All @@ -197,9 +218,18 @@ func (pgd *pgDatastore) Watch(
return updates, errs
}

func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRevision) ([]postgresRevision, error) {
func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRevision, returnHeadRevision bool) ([]postgresRevision, *postgresRevision, error) {
var ids []postgresRevision
var optionalHeadRevision *postgresRevision
var err error
if err := pgx.BeginTxFunc(ctx, pgd.readPool, pgx.TxOptions{IsoLevel: pgx.RepeatableRead}, func(tx pgx.Tx) error {
if returnHeadRevision {
optionalHeadRevision, err = pgd.getHeadRevision(ctx, tx)
if err != nil {
return fmt.Errorf("unable to get head revision: %w", err)
}
}

rows, err := tx.Query(ctx, newRevisionsQuery, afterTX.snapshot)
if err != nil {
return fmt.Errorf("unable to load new revisions: %w", err)
Expand Down Expand Up @@ -232,10 +262,10 @@ func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRev
}
return nil
}); err != nil {
return nil, fmt.Errorf("transaction error: %w", err)
return nil, optionalHeadRevision, fmt.Errorf("transaction error: %w", err)
}

return ids, nil
return ids, optionalHeadRevision, nil
}

func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []postgresRevision, options datastore.WatchOptions) ([]datastore.RevisionChanges, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/testserver/datastore/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
POSTGRES_TEST_USER = "postgres"
POSTGRES_TEST_PASSWORD = "secret"
POSTGRES_TEST_PORT = "5432"
POSTGRES_TEST_MAX_CONNECTIONS = "2000"
POSTGRES_TEST_MAX_CONNECTIONS = "3000"
PGBOUNCER_TEST_PORT = "6432"
)

Expand Down

0 comments on commit 5dc47f5

Please sign in to comment.