Skip to content

Commit

Permalink
Merge pull request #1706 from alecmerdler/memdb-watch-revision-checkp…
Browse files Browse the repository at this point in the history
…oints

Watch revision checkpoints using memdb datastore
  • Loading branch information
josephschorr authored Jan 9, 2024
2 parents 5a94656 + d9f2a08 commit d25d8a1
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 11 deletions.
26 changes: 19 additions & 7 deletions internal/datastore/memdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,14 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt
var stagedUpdates []*datastore.RevisionChanges
var watchChan <-chan struct{}
var err error
stagedUpdates, currentTxn, watchChan, err = mdb.loadChanges(ctx, currentTxn)
stagedUpdates, currentTxn, watchChan, err = mdb.loadChanges(ctx, currentTxn, options)
if err != nil {
errs <- err
return
}

// Write the staged updates to the channel
for _, changeToWrite := range stagedUpdates {
if len(changeToWrite.RelationshipChanges) == 0 {
continue
}

select {
case updates <- changeToWrite:
default:
Expand Down Expand Up @@ -72,7 +68,7 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt
return updates, errs
}

func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64) ([]*datastore.RevisionChanges, int64, <-chan struct{}, error) {
func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64, options datastore.WatchOptions) ([]*datastore.RevisionChanges, int64, <-chan struct{}, error) {
mdb.RLock()
defer mdb.RUnlock()

Expand All @@ -88,7 +84,23 @@ func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64) ([]*
lastRevision := currentTxn
for changeRaw := it.Next(); changeRaw != nil; changeRaw = it.Next() {
change := changeRaw.(*changelog)
changes = append(changes, &change.changes)

if options.Content&datastore.WatchRelationships == datastore.WatchRelationships && len(change.changes.RelationshipChanges) > 0 {
changes = append(changes, &change.changes)
}

if options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints && change.revisionNanos > lastRevision {
changes = append(changes, &datastore.RevisionChanges{
Revision: revisions.NewForTimestamp(change.revisionNanos),
IsCheckpoint: true,
})
}

if options.Content&datastore.WatchSchema == datastore.WatchSchema &&
len(change.changes.ChangedDefinitions) > 0 || len(change.changes.DeletedCaveats) > 0 || len(change.changes.DeletedNamespaces) > 0 {
changes = append(changes, &change.changes)
}

lastRevision = change.revisionNanos
}

Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/mysql/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestMySQLDatastoreDSNWithoutParseTime(t *testing.T) {
func TestMySQL8Datastore(t *testing.T) {
b := testdatastore.RunMySQLForTestingWithOptions(t, testdatastore.MySQLTesterOptions{MigrateForNewDatastore: true}, "")
dst := datastoreTester{b: b, t: t}
test.AllWithExceptions(t, test.DatastoreTesterFunc(dst.createDatastore), test.WithCategories(test.WatchSchemaCategory))
test.AllWithExceptions(t, test.DatastoreTesterFunc(dst.createDatastore), test.WithCategories(test.WatchSchemaCategory, test.WatchCheckpointsCategory))
additionalMySQLTests(t, b)
}

Expand Down
16 changes: 13 additions & 3 deletions pkg/datastore/test/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,18 @@ func (c Categories) WatchSchema() bool {
return ok
}

func (c Categories) WatchCheckpoints() bool {
_, ok := c[WatchCheckpointsCategory]
return ok
}

var noException = Categories{}

const (
GCCategory = "GC"
WatchCategory = "Watch"
WatchSchemaCategory = "WatchSchema"
GCCategory = "GC"
WatchCategory = "Watch"
WatchSchemaCategory = "WatchSchema"
WatchCheckpointsCategory = "WatchCheckpoints"
)

func WithCategories(cats ...string) Categories {
Expand Down Expand Up @@ -138,6 +144,10 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories)
t.Run("TestWatchSchema", func(t *testing.T) { WatchSchemaTest(t, tester) })
t.Run("TestWatchAll", func(t *testing.T) { WatchAllTest(t, tester) })
}

if !except.Watch() && !except.WatchCheckpoints() {
t.Run("TestWatchCheckpoints", func(t *testing.T) { WatchCheckpointsTest(t, tester) })
}
}

// All runs all generic datastore tests on a DatastoreTester.
Expand Down
47 changes: 47 additions & 0 deletions pkg/datastore/test/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,3 +607,50 @@ func verifyMixedUpdates(

require.False(expectDisconnect, "all changes verified without expected disconnect")
}

func WatchCheckpointsTest(t *testing.T, tester DatastoreTester) {
require := require.New(t)

ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 16)
require.NoError(err)

setupDatastore(ds, require)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

lowestRevision, err := ds.HeadRevision(ctx)
require.NoError(err)

changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchOptions{
Content: datastore.WatchCheckpoints | datastore.WatchRelationships,
CheckpointInterval: 100 * time.Millisecond,
})
require.Zero(len(errchan))

afterTouchRevision, err := common.WriteTuples(ctx, ds, core.RelationTupleUpdate_TOUCH,
tuple.Parse("document:firstdoc#viewer@user:tom"),
)
require.NoError(err)
verifyCheckpointUpdate(require, afterTouchRevision, changes)
}

func verifyCheckpointUpdate(
require *require.Assertions,
expectedRevision datastore.Revision,
changes <-chan *datastore.RevisionChanges,
) {
changeWait := time.NewTimer(waitForChangesTimeout)
for {
select {
case change, ok := <-changes:
require.True(ok)
if change.IsCheckpoint {
require.True(change.Revision.Equal(change.Revision) || change.Revision.GreaterThan(expectedRevision))
return
}
case <-changeWait.C:
require.Fail("Timed out", "waited for checkpoint")
}
}
}

0 comments on commit d25d8a1

Please sign in to comment.