diff --git a/internal/datastore/context.go b/internal/datastore/context.go index ef452a8417..fd8d578893 100644 --- a/internal/datastore/context.go +++ b/internal/datastore/context.go @@ -77,7 +77,7 @@ func (p *ctxProxy) RevisionFromString(serialized string) (datastore.Revision, er return p.delegate.RevisionFromString(serialized) } -func (p *ctxProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { +func (p *ctxProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { return p.delegate.Watch(ctx, afterRevision, options) } diff --git a/internal/datastore/crdb/watch.go b/internal/datastore/crdb/watch.go index 2008a743ad..ba830c3278 100644 --- a/internal/datastore/crdb/watch.go +++ b/internal/datastore/crdb/watch.go @@ -65,13 +65,13 @@ type changeDetails struct { } } -func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { +func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { watchBufferLength := options.WatchBufferLength if watchBufferLength <= 0 { watchBufferLength = cds.watchBufferLength } - updates := make(chan *datastore.RevisionChanges, watchBufferLength) + updates := make(chan datastore.RevisionChanges, watchBufferLength) errs := make(chan error, 1) features, err := cds.Features(ctx) @@ -102,7 +102,7 @@ func (cds *crdbDatastore) watch( ctx context.Context, afterRevision datastore.Revision, opts datastore.WatchOptions, - updates chan *datastore.RevisionChanges, + updates chan datastore.RevisionChanges, errs chan error, ) { defer close(updates) @@ -173,7 +173,7 @@ func (cds *crdbDatastore) watch( watchBufferWriteTimeout = cds.watchBufferWriteTimeout } - sendChange := func(change *datastore.RevisionChanges) error { + sendChange := func(change datastore.RevisionChanges) error { select { case updates <- change: return nil @@ -252,7 +252,7 @@ func (s streamingChangeProvider) AddRelationshipChange(ctx context.Context, rev return spiceerrors.MustBugf("unknown change operation") } - return s.sendChange(&changes) + return s.sendChange(changes) } func (s streamingChangeProvider) AddChangedDefinition(_ context.Context, rev revisions.HLCRevision, def datastore.SchemaDefinition) error { @@ -265,7 +265,7 @@ func (s streamingChangeProvider) AddChangedDefinition(_ context.Context, rev rev ChangedDefinitions: []datastore.SchemaDefinition{def}, } - return s.sendChange(&changes) + return s.sendChange(changes) } func (s streamingChangeProvider) AddDeletedNamespace(_ context.Context, rev revisions.HLCRevision, namespaceName string) error { @@ -278,7 +278,7 @@ func (s streamingChangeProvider) AddDeletedNamespace(_ context.Context, rev revi DeletedNamespaces: []string{namespaceName}, } - return s.sendChange(&changes) + return s.sendChange(changes) } func (s streamingChangeProvider) AddDeletedCaveat(_ context.Context, rev revisions.HLCRevision, caveatName string) error { @@ -291,7 +291,7 @@ func (s streamingChangeProvider) AddDeletedCaveat(_ context.Context, rev revisio DeletedCaveats: []string{caveatName}, } - return s.sendChange(&changes) + return s.sendChange(changes) } func (s streamingChangeProvider) SetRevisionMetadata(_ context.Context, rev revisions.HLCRevision, metadata map[string]any) error { @@ -306,14 +306,14 @@ func (s streamingChangeProvider) SetRevisionMetadata(_ context.Context, rev revi Metadata: parsedMetadata, } - return s.sendChange(&changes) + return s.sendChange(changes) } return nil } type ( - sendChangeFunc func(change *datastore.RevisionChanges) error + sendChangeFunc func(change datastore.RevisionChanges) error sendErrorFunc func(err error) ) @@ -404,14 +404,14 @@ func (cds *crdbDatastore) processChanges(ctx context.Context, changes pgx.Rows, } } - if err := sendChange(&revChange); err != nil { + if err := sendChange(revChange); err != nil { sendError(err) return } } if opts.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints { - if err := sendChange(&datastore.RevisionChanges{ + if err := sendChange(datastore.RevisionChanges{ Revision: rev, IsCheckpoint: true, }); err != nil { diff --git a/internal/datastore/memdb/watch.go b/internal/datastore/memdb/watch.go index 049a04ed54..feafda54f3 100644 --- a/internal/datastore/memdb/watch.go +++ b/internal/datastore/memdb/watch.go @@ -14,13 +14,13 @@ import ( const errWatchError = "watch error: %w" -func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { +func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { watchBufferLength := options.WatchBufferLength if watchBufferLength == 0 { watchBufferLength = mdb.watchBufferLength } - updates := make(chan *datastore.RevisionChanges, watchBufferLength) + updates := make(chan datastore.RevisionChanges, watchBufferLength) errs := make(chan error, 1) if options.EmissionStrategy == datastore.EmitImmediatelyStrategy { @@ -34,7 +34,7 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt watchBufferWriteTimeout = mdb.watchBufferWriteTimeout } - sendChange := func(change *datastore.RevisionChanges) bool { + sendChange := func(change datastore.RevisionChanges) bool { select { case updates <- change: return true @@ -63,7 +63,7 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt currentTxn := ar.(revisions.TimestampRevision).TimestampNanoSec() for { - var stagedUpdates []*datastore.RevisionChanges + var stagedUpdates []datastore.RevisionChanges var watchChan <-chan struct{} var err error stagedUpdates, currentTxn, watchChan, err = mdb.loadChanges(ctx, currentTxn, options) @@ -99,7 +99,7 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt return updates, errs } -func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64, options datastore.WatchOptions) ([]*datastore.RevisionChanges, int64, <-chan struct{}, error) { +func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64, options datastore.WatchOptions) ([]datastore.RevisionChanges, int64, <-chan struct{}, error) { mdb.RLock() defer mdb.RUnlock() @@ -111,22 +111,22 @@ func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64, opti return nil, 0, nil, fmt.Errorf(errWatchError, err) } - var changes []*datastore.RevisionChanges + var changes []datastore.RevisionChanges lastRevision := currentTxn for changeRaw := it.Next(); changeRaw != nil; changeRaw = it.Next() { change := changeRaw.(*changelog) if options.Content&datastore.WatchRelationships == datastore.WatchRelationships && len(change.changes.RelationshipChanges) > 0 { - changes = append(changes, &change.changes) + changes = append(changes, change.changes) } if options.Content&datastore.WatchSchema == datastore.WatchSchema && len(change.changes.ChangedDefinitions) > 0 || len(change.changes.DeletedCaveats) > 0 || len(change.changes.DeletedNamespaces) > 0 { - changes = append(changes, &change.changes) + changes = append(changes, change.changes) } if options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints && change.revisionNanos > lastRevision { - changes = append(changes, &datastore.RevisionChanges{ + changes = append(changes, datastore.RevisionChanges{ Revision: revisions.NewForTimestamp(change.revisionNanos), IsCheckpoint: true, }) diff --git a/internal/datastore/mysql/watch.go b/internal/datastore/mysql/watch.go index 8d03a34472..3613b312dc 100644 --- a/internal/datastore/mysql/watch.go +++ b/internal/datastore/mysql/watch.go @@ -20,13 +20,13 @@ const ( // Watch notifies the caller about all changes to tuples. // // All events following afterRevision will be sent to the caller. -func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { +func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { watchBufferLength := options.WatchBufferLength if watchBufferLength <= 0 { watchBufferLength = mds.watchBufferLength } - updates := make(chan *datastore.RevisionChanges, watchBufferLength) + updates := make(chan datastore.RevisionChanges, watchBufferLength) errs := make(chan error, 1) if options.Content&datastore.WatchSchema == datastore.WatchSchema { @@ -52,7 +52,7 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi watchBufferWriteTimeout = mds.watchBufferWriteTimeout } - sendChange := func(change *datastore.RevisionChanges) bool { + sendChange := func(change datastore.RevisionChanges) bool { select { case updates <- change: return true @@ -95,7 +95,7 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi // Write the staged updates to the channel for _, changeToWrite := range stagedUpdates { changeToWrite := changeToWrite - if !sendChange(&changeToWrite) { + if !sendChange(changeToWrite) { return } } diff --git a/internal/datastore/postgres/watch.go b/internal/datastore/postgres/watch.go index 45b3bb64dd..3684021d9d 100644 --- a/internal/datastore/postgres/watch.go +++ b/internal/datastore/postgres/watch.go @@ -65,13 +65,13 @@ func (pgd *pgDatastore) Watch( ctx context.Context, afterRevisionRaw datastore.Revision, options datastore.WatchOptions, -) (<-chan *datastore.RevisionChanges, <-chan error) { +) (<-chan datastore.RevisionChanges, <-chan error) { watchBufferLength := options.WatchBufferLength if watchBufferLength <= 0 { watchBufferLength = pgd.watchBufferLength } - updates := make(chan *datastore.RevisionChanges, watchBufferLength) + updates := make(chan datastore.RevisionChanges, watchBufferLength) errs := make(chan error, 1) if !pgd.watchEnabled { @@ -97,7 +97,7 @@ func (pgd *pgDatastore) Watch( watchBufferWriteTimeout = pgd.watchBufferWriteTimeout } - sendChange := func(change *datastore.RevisionChanges) bool { + sendChange := func(change datastore.RevisionChanges) bool { select { case updates <- change: return true @@ -151,7 +151,7 @@ func (pgd *pgDatastore) Watch( for _, changeToWrite := range changesToWrite { changeToWrite := changeToWrite - if !sendChange(&changeToWrite) { + if !sendChange(changeToWrite) { return } } @@ -174,7 +174,7 @@ func (pgd *pgDatastore) Watch( // move revisions forward outside of changes, these could be necessary if the caller is // watching only a *subset* of changes. if requestedCheckpoints { - if !sendChange(&datastore.RevisionChanges{ + if !sendChange(datastore.RevisionChanges{ Revision: currentTxn, IsCheckpoint: true, }) { @@ -196,7 +196,7 @@ func (pgd *pgDatastore) Watch( } if optionalHeadRevision.GreaterThan(currentTxn) { - if !sendChange(&datastore.RevisionChanges{ + if !sendChange(datastore.RevisionChanges{ Revision: *optionalHeadRevision, IsCheckpoint: true, }) { diff --git a/internal/datastore/proxy/checkingreplicated_test.go b/internal/datastore/proxy/checkingreplicated_test.go index fdf5bd528c..a1597bcdc3 100644 --- a/internal/datastore/proxy/checkingreplicated_test.go +++ b/internal/datastore/proxy/checkingreplicated_test.go @@ -114,7 +114,7 @@ func (f fakeDatastore) RevisionFromString(_ string) (datastore.Revision, error) return nil, nil } -func (f fakeDatastore) Watch(_ context.Context, _ datastore.Revision, _ datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { +func (f fakeDatastore) Watch(_ context.Context, _ datastore.Revision, _ datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { return nil, nil } diff --git a/internal/datastore/proxy/observable.go b/internal/datastore/proxy/observable.go index 3f92beb71e..0b2744637a 100644 --- a/internal/datastore/proxy/observable.go +++ b/internal/datastore/proxy/observable.go @@ -110,7 +110,7 @@ func (p *observableProxy) RevisionFromString(serialized string) (datastore.Revis return p.delegate.RevisionFromString(serialized) } -func (p *observableProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { +func (p *observableProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { return p.delegate.Watch(ctx, afterRevision, options) } diff --git a/internal/datastore/proxy/proxy_test/mock.go b/internal/datastore/proxy/proxy_test/mock.go index dc59c780ff..de32caec88 100644 --- a/internal/datastore/proxy/proxy_test/mock.go +++ b/internal/datastore/proxy/proxy_test/mock.go @@ -57,9 +57,9 @@ func (dm *MockDatastore) RevisionFromString(s string) (datastore.Revision, error return args.Get(0).(datastore.Revision), args.Error(1) } -func (dm *MockDatastore) Watch(_ context.Context, afterRevision datastore.Revision, _ datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { +func (dm *MockDatastore) Watch(_ context.Context, afterRevision datastore.Revision, _ datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { args := dm.Called(afterRevision) - return args.Get(0).(<-chan *datastore.RevisionChanges), args.Get(1).(<-chan error) + return args.Get(0).(<-chan datastore.RevisionChanges), args.Get(1).(<-chan error) } func (dm *MockDatastore) ReadyState(_ context.Context) (datastore.ReadyState, error) { diff --git a/internal/datastore/proxy/readonly_test.go b/internal/datastore/proxy/readonly_test.go index 23f90612aa..94b750f66d 100644 --- a/internal/datastore/proxy/readonly_test.go +++ b/internal/datastore/proxy/readonly_test.go @@ -117,7 +117,7 @@ func TestWatchPassthrough(t *testing.T) { ctx := context.Background() delegate.On("Watch", expectedRevision).Return( - make(<-chan *datastore.RevisionChanges), + make(<-chan datastore.RevisionChanges), make(<-chan error), ).Times(1) diff --git a/internal/datastore/proxy/relationshipintegrity.go b/internal/datastore/proxy/relationshipintegrity.go index 4b129b14d1..c7573de084 100644 --- a/internal/datastore/proxy/relationshipintegrity.go +++ b/internal/datastore/proxy/relationshipintegrity.go @@ -252,9 +252,9 @@ func (r *relationshipIntegrityProxy) validateRelationTuple(rel tuple.Relationshi return nil } -func (r *relationshipIntegrityProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { +func (r *relationshipIntegrityProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { resultsChan, errChan := r.ds.Watch(ctx, afterRevision, options) - checkedResultsChan := make(chan *datastore.RevisionChanges) + checkedResultsChan := make(chan datastore.RevisionChanges) checkedErrChan := make(chan error, 1) go func() { diff --git a/internal/datastore/proxy/schemacaching/watchingcache.go b/internal/datastore/proxy/schemacaching/watchingcache.go index b0f8dc5f98..8499b31b7f 100644 --- a/internal/datastore/proxy/schemacaching/watchingcache.go +++ b/internal/datastore/proxy/schemacaching/watchingcache.go @@ -243,6 +243,9 @@ func (p *watchingCachingProxy) startSync(ctx context.Context) error { Content: datastore.WatchSchema | datastore.WatchCheckpoints, CheckpointInterval: p.watchHeartbeat, }) + spiceerrors.DebugAssertNotNil(ssc, "ssc is nil") + spiceerrors.DebugAssertNotNil(serrc, "serrc is nil") + log.Debug().Msg("schema watch started") p.namespaceCache.startAtRevision(headRev) @@ -261,7 +264,12 @@ func (p *watchingCachingProxy) startSync(ctx context.Context) error { return case ss := <-ssc: - log.Trace().Object("update", ss).Msg("received update from schema watch") + log.Trace(). + Bool("is-checkpoint", ss.IsCheckpoint). + Int("changed-definition-count", len(ss.ChangedDefinitions)). + Int("deleted-namespace-count", len(ss.DeletedNamespaces)). + Int("deleted-caveat-count", len(ss.DeletedCaveats)). + Msg("received update from schema watch") if ss.IsCheckpoint { if converted, ok := ss.Revision.(revisions.WithInexactFloat64); ok { diff --git a/internal/datastore/proxy/schemacaching/watchingcache_test.go b/internal/datastore/proxy/schemacaching/watchingcache_test.go index 8512e998ef..9aa33ee456 100644 --- a/internal/datastore/proxy/schemacaching/watchingcache_test.go +++ b/internal/datastore/proxy/schemacaching/watchingcache_test.go @@ -25,7 +25,7 @@ func TestWatchingCacheBasicOperation(t *testing.T) { headRevision: rev("0"), namespaces: map[string][]fakeEntry[datastore.RevisionedNamespace, *corev1.NamespaceDefinition]{}, caveats: map[string][]fakeEntry[datastore.RevisionedCaveat, *corev1.CaveatDefinition]{}, - schemaChan: make(chan *datastore.RevisionChanges, 1), + schemaChan: make(chan datastore.RevisionChanges, 1), errChan: make(chan error, 1), } @@ -128,7 +128,7 @@ func TestWatchingCacheParallelOperations(t *testing.T) { headRevision: rev("0"), namespaces: map[string][]fakeEntry[datastore.RevisionedNamespace, *corev1.NamespaceDefinition]{}, caveats: map[string][]fakeEntry[datastore.RevisionedCaveat, *corev1.CaveatDefinition]{}, - schemaChan: make(chan *datastore.RevisionChanges, 1), + schemaChan: make(chan datastore.RevisionChanges, 1), errChan: make(chan error, 1), } @@ -184,7 +184,7 @@ func TestWatchingCacheParallelReaderWriter(t *testing.T) { headRevision: rev("0"), namespaces: map[string][]fakeEntry[datastore.RevisionedNamespace, *corev1.NamespaceDefinition]{}, caveats: map[string][]fakeEntry[datastore.RevisionedCaveat, *corev1.CaveatDefinition]{}, - schemaChan: make(chan *datastore.RevisionChanges, 1), + schemaChan: make(chan datastore.RevisionChanges, 1), errChan: make(chan error, 1), } @@ -236,7 +236,7 @@ func TestWatchingCacheFallbackToStandardCache(t *testing.T) { headRevision: rev("0"), namespaces: map[string][]fakeEntry[datastore.RevisionedNamespace, *corev1.NamespaceDefinition]{}, caveats: map[string][]fakeEntry[datastore.RevisionedCaveat, *corev1.CaveatDefinition]{}, - schemaChan: make(chan *datastore.RevisionChanges, 1), + schemaChan: make(chan datastore.RevisionChanges, 1), errChan: make(chan error, 1), } @@ -280,7 +280,7 @@ func TestWatchingCachePrepopulated(t *testing.T) { headRevision: rev("4"), namespaces: map[string][]fakeEntry[datastore.RevisionedNamespace, *corev1.NamespaceDefinition]{}, caveats: map[string][]fakeEntry[datastore.RevisionedCaveat, *corev1.CaveatDefinition]{}, - schemaChan: make(chan *datastore.RevisionChanges, 1), + schemaChan: make(chan datastore.RevisionChanges, 1), errChan: make(chan error, 1), existingNamespaces: []datastore.RevisionedNamespace{ datastore.RevisionedDefinition[*corev1.NamespaceDefinition]{ @@ -324,7 +324,7 @@ type fakeDatastore struct { namespaces map[string][]fakeEntry[datastore.RevisionedNamespace, *corev1.NamespaceDefinition] caveats map[string][]fakeEntry[datastore.RevisionedCaveat, *corev1.CaveatDefinition] - schemaChan chan *datastore.RevisionChanges + schemaChan chan datastore.RevisionChanges errChan chan error readsDisabled bool @@ -350,7 +350,7 @@ func (fds *fakeDatastore) updateCaveat(name string, def *corev1.CaveatDefinition } func (fds *fakeDatastore) sendCheckpoint(revision datastore.Revision) { - fds.schemaChan <- &datastore.RevisionChanges{ + fds.schemaChan <- datastore.RevisionChanges{ Revision: revision, IsCheckpoint: true, } @@ -373,7 +373,7 @@ func updateDef[T datastore.SchemaDefinition]( def T, isDelete bool, revision datastore.Revision, - schemaChan chan *datastore.RevisionChanges, + schemaChan chan datastore.RevisionChanges, ) { slice, ok := defs[name] if !ok { @@ -390,12 +390,12 @@ func updateDef[T datastore.SchemaDefinition]( defs[name] = slice if isDelete { - schemaChan <- &datastore.RevisionChanges{ + schemaChan <- datastore.RevisionChanges{ Revision: revision, DeletedNamespaces: []string{name}, } } else { - schemaChan <- &datastore.RevisionChanges{ + schemaChan <- datastore.RevisionChanges{ Revision: revision, ChangedDefinitions: []datastore.SchemaDefinition{def}, } @@ -526,7 +526,7 @@ func (*fakeDatastore) Statistics(context.Context) (datastore.Stats, error) { return datastore.Stats{}, fmt.Errorf("not implemented") } -func (fds *fakeDatastore) Watch(_ context.Context, _ datastore.Revision, opts datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { +func (fds *fakeDatastore) Watch(_ context.Context, _ datastore.Revision, opts datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { if opts.Content&datastore.WatchSchema != datastore.WatchSchema { panic("unexpected option") } diff --git a/internal/datastore/proxy/singleflight.go b/internal/datastore/proxy/singleflight.go index ee02f32e1d..c2e13e7205 100644 --- a/internal/datastore/proxy/singleflight.go +++ b/internal/datastore/proxy/singleflight.go @@ -56,7 +56,7 @@ func (p *singleflightProxy) RevisionFromString(serialized string) (datastore.Rev return p.delegate.RevisionFromString(serialized) } -func (p *singleflightProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { +func (p *singleflightProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { return p.delegate.Watch(ctx, afterRevision, options) } diff --git a/internal/datastore/spanner/watch.go b/internal/datastore/spanner/watch.go index c312430c99..6d92f261ec 100644 --- a/internal/datastore/spanner/watch.go +++ b/internal/datastore/spanner/watch.go @@ -52,13 +52,13 @@ func parseDatabaseName(db string) (project, instance, database string, err error return matches[1], matches[2], matches[3], nil } -func (sd *spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, opts datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { +func (sd *spannerDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, opts datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) { watchBufferLength := opts.WatchBufferLength if watchBufferLength <= 0 { watchBufferLength = sd.watchBufferLength } - updates := make(chan *datastore.RevisionChanges, watchBufferLength) + updates := make(chan datastore.RevisionChanges, watchBufferLength) errs := make(chan error, 1) if opts.EmissionStrategy == datastore.EmitImmediatelyStrategy { @@ -76,7 +76,7 @@ func (sd *spannerDatastore) watch( ctx context.Context, afterRevisionRaw datastore.Revision, opts datastore.WatchOptions, - updates chan *datastore.RevisionChanges, + updates chan datastore.RevisionChanges, errs chan error, ) { defer close(updates) @@ -102,7 +102,7 @@ func (sd *spannerDatastore) watch( watchBufferWriteTimeout = sd.watchBufferWriteTimeout } - sendChange := func(change *datastore.RevisionChanges) bool { + sendChange := func(change datastore.RevisionChanges) bool { select { case updates <- change: return true @@ -361,7 +361,7 @@ func (sd *spannerDatastore) watch( for _, revChange := range changes { revChange := revChange - if !sendChange(&revChange) { + if !sendChange(revChange) { return datastore.NewWatchDisconnectedErr() } } @@ -369,7 +369,7 @@ func (sd *spannerDatastore) watch( if opts.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints { for _, hbr := range record.HeartbeatRecords { - if !sendChange(&datastore.RevisionChanges{ + if !sendChange(datastore.RevisionChanges{ Revision: revisions.NewForTime(hbr.Timestamp), IsCheckpoint: true, }) { diff --git a/pkg/datastore/datastore.go b/pkg/datastore/datastore.go index 6505db7bd4..a299bbf32c 100644 --- a/pkg/datastore/datastore.go +++ b/pkg/datastore/datastore.go @@ -68,7 +68,7 @@ type RevisionChanges struct { Metadata *structpb.Struct } -func (rc *RevisionChanges) DebugString() string { +func (rc RevisionChanges) DebugString() string { if rc.IsCheckpoint { return "[checkpoint]" } @@ -94,7 +94,7 @@ func (rc *RevisionChanges) DebugString() string { return debugString } -func (rc *RevisionChanges) MarshalZerologObject(e *zerolog.Event) { +func (rc RevisionChanges) MarshalZerologObject(e *zerolog.Event) { e.Str("revision", rc.Revision.String()) e.Bool("is-checkpoint", rc.IsCheckpoint) e.Array("deleted-namespaces", strArray(rc.DeletedNamespaces)) @@ -651,7 +651,7 @@ type ReadOnlyDatastore interface { // Watch notifies the caller about changes to the datastore, based on the specified options. // // All events following afterRevision will be sent to the caller. - Watch(ctx context.Context, afterRevision Revision, options WatchOptions) (<-chan *RevisionChanges, <-chan error) + Watch(ctx context.Context, afterRevision Revision, options WatchOptions) (<-chan RevisionChanges, <-chan error) // ReadyState returns a state indicating whether the datastore is ready to accept data. // Datastores that require database schema creation will return not-ready until the migrations diff --git a/pkg/datastore/datastore_test.go b/pkg/datastore/datastore_test.go index 0dd7786643..8c75061360 100644 --- a/pkg/datastore/datastore_test.go +++ b/pkg/datastore/datastore_test.go @@ -610,8 +610,8 @@ func (f fakeDatastore) RevisionFromString(_ string) (Revision, error) { return nil, nil } -func (f fakeDatastore) Watch(_ context.Context, _ Revision, _ WatchOptions) (<-chan *RevisionChanges, <-chan error) { - return nil, nil +func (f fakeDatastore) Watch(_ context.Context, _ Revision, _ WatchOptions) (<-chan RevisionChanges, <-chan error) { + panic("should never be called") } func (f fakeDatastore) ReadyState(_ context.Context) (ReadyState, error) { diff --git a/pkg/datastore/test/watch.go b/pkg/datastore/test/watch.go index 54cf39f3a5..44ef4c986b 100644 --- a/pkg/datastore/test/watch.go +++ b/pkg/datastore/test/watch.go @@ -140,7 +140,7 @@ func WatchTest(t *testing.T, tester DatastoreTester) { func VerifyUpdates( require *require.Assertions, testUpdates [][]tuple.RelationshipUpdate, - changes <-chan *datastore.RevisionChanges, + changes <-chan datastore.RevisionChanges, errchan <-chan error, expectDisconnect bool, ) { @@ -182,7 +182,7 @@ func VerifyUpdates( func VerifyUpdatesWithMetadata( require *require.Assertions, testUpdates []updateWithMetadata, - changes <-chan *datastore.RevisionChanges, + changes <-chan datastore.RevisionChanges, errchan <-chan error, expectDisconnect bool, ) { @@ -530,7 +530,7 @@ func WatchWithDeleteTest(t *testing.T, tester DatastoreTester) { func verifyNoUpdates( require *require.Assertions, - changes <-chan *datastore.RevisionChanges, + changes <-chan datastore.RevisionChanges, errchan <-chan error, expectDisconnect bool, ) { @@ -729,7 +729,7 @@ func WatchAllTest(t *testing.T, tester DatastoreTester) { func verifyMixedUpdates( require *require.Assertions, expectedUpdates [][]string, - changes <-chan *datastore.RevisionChanges, + changes <-chan datastore.RevisionChanges, errchan <-chan error, expectDisconnect bool, ) { @@ -911,7 +911,7 @@ func WatchEmissionStrategyTest(t *testing.T, tester DatastoreTester) { func verifyCheckpointUpdate( require *require.Assertions, expectedRevision datastore.Revision, - changes <-chan *datastore.RevisionChanges, + changes <-chan datastore.RevisionChanges, ) { var relChangeEmitted, schemaChangeEmitted bool changeWait := time.NewTimer(waitForChangesTimeout)