Skip to content

Commit

Permalink
Remove the reference for changes being emitted by Watch
Browse files Browse the repository at this point in the history
A change should always be found in the channel, so there is little benefit to it being a reference
  • Loading branch information
josephschorr committed Feb 3, 2025
1 parent 98e88a1 commit b09a735
Show file tree
Hide file tree
Showing 17 changed files with 74 additions and 66 deletions.
2 changes: 1 addition & 1 deletion internal/datastore/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (p *ctxProxy) RevisionFromString(serialized string) (datastore.Revision, er
return p.delegate.RevisionFromString(serialized)
}

func (p *ctxProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
func (p *ctxProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
return p.delegate.Watch(ctx, afterRevision, options)
}

Expand Down
24 changes: 12 additions & 12 deletions internal/datastore/crdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ type changeDetails struct {
}
}

func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
watchBufferLength := options.WatchBufferLength
if watchBufferLength <= 0 {
watchBufferLength = cds.watchBufferLength
}

updates := make(chan *datastore.RevisionChanges, watchBufferLength)
updates := make(chan datastore.RevisionChanges, watchBufferLength)
errs := make(chan error, 1)

features, err := cds.Features(ctx)
Expand Down Expand Up @@ -102,7 +102,7 @@ func (cds *crdbDatastore) watch(
ctx context.Context,
afterRevision datastore.Revision,
opts datastore.WatchOptions,
updates chan *datastore.RevisionChanges,
updates chan datastore.RevisionChanges,
errs chan error,
) {
defer close(updates)
Expand Down Expand Up @@ -173,7 +173,7 @@ func (cds *crdbDatastore) watch(
watchBufferWriteTimeout = cds.watchBufferWriteTimeout
}

sendChange := func(change *datastore.RevisionChanges) error {
sendChange := func(change datastore.RevisionChanges) error {
select {
case updates <- change:
return nil
Expand Down Expand Up @@ -252,7 +252,7 @@ func (s streamingChangeProvider) AddRelationshipChange(ctx context.Context, rev
return spiceerrors.MustBugf("unknown change operation")
}

return s.sendChange(&changes)
return s.sendChange(changes)
}

func (s streamingChangeProvider) AddChangedDefinition(_ context.Context, rev revisions.HLCRevision, def datastore.SchemaDefinition) error {
Expand All @@ -265,7 +265,7 @@ func (s streamingChangeProvider) AddChangedDefinition(_ context.Context, rev rev
ChangedDefinitions: []datastore.SchemaDefinition{def},
}

return s.sendChange(&changes)
return s.sendChange(changes)
}

func (s streamingChangeProvider) AddDeletedNamespace(_ context.Context, rev revisions.HLCRevision, namespaceName string) error {
Expand All @@ -278,7 +278,7 @@ func (s streamingChangeProvider) AddDeletedNamespace(_ context.Context, rev revi
DeletedNamespaces: []string{namespaceName},
}

return s.sendChange(&changes)
return s.sendChange(changes)
}

func (s streamingChangeProvider) AddDeletedCaveat(_ context.Context, rev revisions.HLCRevision, caveatName string) error {
Expand All @@ -291,7 +291,7 @@ func (s streamingChangeProvider) AddDeletedCaveat(_ context.Context, rev revisio
DeletedCaveats: []string{caveatName},
}

return s.sendChange(&changes)
return s.sendChange(changes)
}

func (s streamingChangeProvider) SetRevisionMetadata(_ context.Context, rev revisions.HLCRevision, metadata map[string]any) error {
Expand All @@ -306,14 +306,14 @@ func (s streamingChangeProvider) SetRevisionMetadata(_ context.Context, rev revi
Metadata: parsedMetadata,
}

return s.sendChange(&changes)
return s.sendChange(changes)
}

return nil
}

type (
sendChangeFunc func(change *datastore.RevisionChanges) error
sendChangeFunc func(change datastore.RevisionChanges) error
sendErrorFunc func(err error)
)

Expand Down Expand Up @@ -404,14 +404,14 @@ func (cds *crdbDatastore) processChanges(ctx context.Context, changes pgx.Rows,
}
}

if err := sendChange(&revChange); err != nil {
if err := sendChange(revChange); err != nil {
sendError(err)
return
}
}

if opts.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints {
if err := sendChange(&datastore.RevisionChanges{
if err := sendChange(datastore.RevisionChanges{
Revision: rev,
IsCheckpoint: true,
}); err != nil {
Expand Down
18 changes: 9 additions & 9 deletions internal/datastore/memdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ import (

const errWatchError = "watch error: %w"

func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
watchBufferLength := options.WatchBufferLength
if watchBufferLength == 0 {
watchBufferLength = mdb.watchBufferLength
}

updates := make(chan *datastore.RevisionChanges, watchBufferLength)
updates := make(chan datastore.RevisionChanges, watchBufferLength)
errs := make(chan error, 1)

if options.EmissionStrategy == datastore.EmitImmediatelyStrategy {
Expand All @@ -34,7 +34,7 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt
watchBufferWriteTimeout = mdb.watchBufferWriteTimeout
}

sendChange := func(change *datastore.RevisionChanges) bool {
sendChange := func(change datastore.RevisionChanges) bool {
select {
case updates <- change:
return true
Expand Down Expand Up @@ -63,7 +63,7 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt
currentTxn := ar.(revisions.TimestampRevision).TimestampNanoSec()

for {
var stagedUpdates []*datastore.RevisionChanges
var stagedUpdates []datastore.RevisionChanges
var watchChan <-chan struct{}
var err error
stagedUpdates, currentTxn, watchChan, err = mdb.loadChanges(ctx, currentTxn, options)
Expand Down Expand Up @@ -99,7 +99,7 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt
return updates, errs
}

func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64, options datastore.WatchOptions) ([]*datastore.RevisionChanges, int64, <-chan struct{}, error) {
func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64, options datastore.WatchOptions) ([]datastore.RevisionChanges, int64, <-chan struct{}, error) {
mdb.RLock()
defer mdb.RUnlock()

Expand All @@ -111,22 +111,22 @@ func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64, opti
return nil, 0, nil, fmt.Errorf(errWatchError, err)
}

var changes []*datastore.RevisionChanges
var changes []datastore.RevisionChanges
lastRevision := currentTxn
for changeRaw := it.Next(); changeRaw != nil; changeRaw = it.Next() {
change := changeRaw.(*changelog)

if options.Content&datastore.WatchRelationships == datastore.WatchRelationships && len(change.changes.RelationshipChanges) > 0 {
changes = append(changes, &change.changes)
changes = append(changes, change.changes)
}

if options.Content&datastore.WatchSchema == datastore.WatchSchema &&
len(change.changes.ChangedDefinitions) > 0 || len(change.changes.DeletedCaveats) > 0 || len(change.changes.DeletedNamespaces) > 0 {
changes = append(changes, &change.changes)
changes = append(changes, change.changes)
}

if options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints && change.revisionNanos > lastRevision {
changes = append(changes, &datastore.RevisionChanges{
changes = append(changes, datastore.RevisionChanges{
Revision: revisions.NewForTimestamp(change.revisionNanos),
IsCheckpoint: true,
})
Expand Down
8 changes: 4 additions & 4 deletions internal/datastore/mysql/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ const (
// Watch notifies the caller about all changes to tuples.
//
// All events following afterRevision will be sent to the caller.
func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
watchBufferLength := options.WatchBufferLength
if watchBufferLength <= 0 {
watchBufferLength = mds.watchBufferLength
}

updates := make(chan *datastore.RevisionChanges, watchBufferLength)
updates := make(chan datastore.RevisionChanges, watchBufferLength)
errs := make(chan error, 1)

if options.Content&datastore.WatchSchema == datastore.WatchSchema {
Expand All @@ -52,7 +52,7 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi
watchBufferWriteTimeout = mds.watchBufferWriteTimeout
}

sendChange := func(change *datastore.RevisionChanges) bool {
sendChange := func(change datastore.RevisionChanges) bool {
select {
case updates <- change:
return true
Expand Down Expand Up @@ -95,7 +95,7 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi
// Write the staged updates to the channel
for _, changeToWrite := range stagedUpdates {
changeToWrite := changeToWrite
if !sendChange(&changeToWrite) {
if !sendChange(changeToWrite) {
return
}
}
Expand Down
12 changes: 6 additions & 6 deletions internal/datastore/postgres/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ func (pgd *pgDatastore) Watch(
ctx context.Context,
afterRevisionRaw datastore.Revision,
options datastore.WatchOptions,
) (<-chan *datastore.RevisionChanges, <-chan error) {
) (<-chan datastore.RevisionChanges, <-chan error) {
watchBufferLength := options.WatchBufferLength
if watchBufferLength <= 0 {
watchBufferLength = pgd.watchBufferLength
}

updates := make(chan *datastore.RevisionChanges, watchBufferLength)
updates := make(chan datastore.RevisionChanges, watchBufferLength)
errs := make(chan error, 1)

if !pgd.watchEnabled {
Expand All @@ -97,7 +97,7 @@ func (pgd *pgDatastore) Watch(
watchBufferWriteTimeout = pgd.watchBufferWriteTimeout
}

sendChange := func(change *datastore.RevisionChanges) bool {
sendChange := func(change datastore.RevisionChanges) bool {
select {
case updates <- change:
return true
Expand Down Expand Up @@ -151,7 +151,7 @@ func (pgd *pgDatastore) Watch(

for _, changeToWrite := range changesToWrite {
changeToWrite := changeToWrite
if !sendChange(&changeToWrite) {
if !sendChange(changeToWrite) {
return
}
}
Expand All @@ -174,7 +174,7 @@ func (pgd *pgDatastore) Watch(
// move revisions forward outside of changes, these could be necessary if the caller is
// watching only a *subset* of changes.
if requestedCheckpoints {
if !sendChange(&datastore.RevisionChanges{
if !sendChange(datastore.RevisionChanges{
Revision: currentTxn,
IsCheckpoint: true,
}) {
Expand All @@ -196,7 +196,7 @@ func (pgd *pgDatastore) Watch(
}

if optionalHeadRevision.GreaterThan(currentTxn) {
if !sendChange(&datastore.RevisionChanges{
if !sendChange(datastore.RevisionChanges{
Revision: *optionalHeadRevision,
IsCheckpoint: true,
}) {
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/proxy/checkingreplicated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (f fakeDatastore) RevisionFromString(_ string) (datastore.Revision, error)
return nil, nil
}

func (f fakeDatastore) Watch(_ context.Context, _ datastore.Revision, _ datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
func (f fakeDatastore) Watch(_ context.Context, _ datastore.Revision, _ datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
return nil, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/proxy/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (p *observableProxy) RevisionFromString(serialized string) (datastore.Revis
return p.delegate.RevisionFromString(serialized)
}

func (p *observableProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
func (p *observableProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
return p.delegate.Watch(ctx, afterRevision, options)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/proxy/proxy_test/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ func (dm *MockDatastore) RevisionFromString(s string) (datastore.Revision, error
return args.Get(0).(datastore.Revision), args.Error(1)
}

func (dm *MockDatastore) Watch(_ context.Context, afterRevision datastore.Revision, _ datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
func (dm *MockDatastore) Watch(_ context.Context, afterRevision datastore.Revision, _ datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
args := dm.Called(afterRevision)
return args.Get(0).(<-chan *datastore.RevisionChanges), args.Get(1).(<-chan error)
return args.Get(0).(<-chan datastore.RevisionChanges), args.Get(1).(<-chan error)
}

func (dm *MockDatastore) ReadyState(_ context.Context) (datastore.ReadyState, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/proxy/readonly_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestWatchPassthrough(t *testing.T) {
ctx := context.Background()

delegate.On("Watch", expectedRevision).Return(
make(<-chan *datastore.RevisionChanges),
make(<-chan datastore.RevisionChanges),
make(<-chan error),
).Times(1)

Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/proxy/relationshipintegrity.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ func (r *relationshipIntegrityProxy) validateRelationTuple(rel tuple.Relationshi
return nil
}

func (r *relationshipIntegrityProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
func (r *relationshipIntegrityProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
resultsChan, errChan := r.ds.Watch(ctx, afterRevision, options)
checkedResultsChan := make(chan *datastore.RevisionChanges)
checkedResultsChan := make(chan datastore.RevisionChanges)
checkedErrChan := make(chan error, 1)

go func() {
Expand Down
10 changes: 9 additions & 1 deletion internal/datastore/proxy/schemacaching/watchingcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ func (p *watchingCachingProxy) startSync(ctx context.Context) error {
Content: datastore.WatchSchema | datastore.WatchCheckpoints,
CheckpointInterval: p.watchHeartbeat,
})
spiceerrors.DebugAssertNotNil(ssc, "ssc is nil")
spiceerrors.DebugAssertNotNil(serrc, "serrc is nil")

log.Debug().Msg("schema watch started")

p.namespaceCache.startAtRevision(headRev)
Expand All @@ -261,7 +264,12 @@ func (p *watchingCachingProxy) startSync(ctx context.Context) error {
return

case ss := <-ssc:
log.Trace().Object("update", ss).Msg("received update from schema watch")
log.Trace().
Bool("is-checkpoint", ss.IsCheckpoint).
Int("changed-definition-count", len(ss.ChangedDefinitions)).
Int("deleted-namespace-count", len(ss.DeletedNamespaces)).
Int("deleted-caveat-count", len(ss.DeletedCaveats)).
Msg("received update from schema watch")

if ss.IsCheckpoint {
if converted, ok := ss.Revision.(revisions.WithInexactFloat64); ok {
Expand Down
Loading

0 comments on commit b09a735

Please sign in to comment.