Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the reference for changes being emitted by Watch #2232

Merged
merged 1 commit into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/datastore/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (p *ctxProxy) RevisionFromString(serialized string) (datastore.Revision, er
return p.delegate.RevisionFromString(serialized)
}

func (p *ctxProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
func (p *ctxProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
return p.delegate.Watch(ctx, afterRevision, options)
}

Expand Down
24 changes: 12 additions & 12 deletions internal/datastore/crdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ type changeDetails struct {
}
}

func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
func (cds *crdbDatastore) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
watchBufferLength := options.WatchBufferLength
if watchBufferLength <= 0 {
watchBufferLength = cds.watchBufferLength
}

updates := make(chan *datastore.RevisionChanges, watchBufferLength)
updates := make(chan datastore.RevisionChanges, watchBufferLength)
errs := make(chan error, 1)

features, err := cds.Features(ctx)
Expand Down Expand Up @@ -102,7 +102,7 @@ func (cds *crdbDatastore) watch(
ctx context.Context,
afterRevision datastore.Revision,
opts datastore.WatchOptions,
updates chan *datastore.RevisionChanges,
updates chan datastore.RevisionChanges,
errs chan error,
) {
defer close(updates)
Expand Down Expand Up @@ -173,7 +173,7 @@ func (cds *crdbDatastore) watch(
watchBufferWriteTimeout = cds.watchBufferWriteTimeout
}

sendChange := func(change *datastore.RevisionChanges) error {
sendChange := func(change datastore.RevisionChanges) error {
select {
case updates <- change:
return nil
Expand Down Expand Up @@ -252,7 +252,7 @@ func (s streamingChangeProvider) AddRelationshipChange(ctx context.Context, rev
return spiceerrors.MustBugf("unknown change operation")
}

return s.sendChange(&changes)
return s.sendChange(changes)
}

func (s streamingChangeProvider) AddChangedDefinition(_ context.Context, rev revisions.HLCRevision, def datastore.SchemaDefinition) error {
Expand All @@ -265,7 +265,7 @@ func (s streamingChangeProvider) AddChangedDefinition(_ context.Context, rev rev
ChangedDefinitions: []datastore.SchemaDefinition{def},
}

return s.sendChange(&changes)
return s.sendChange(changes)
}

func (s streamingChangeProvider) AddDeletedNamespace(_ context.Context, rev revisions.HLCRevision, namespaceName string) error {
Expand All @@ -278,7 +278,7 @@ func (s streamingChangeProvider) AddDeletedNamespace(_ context.Context, rev revi
DeletedNamespaces: []string{namespaceName},
}

return s.sendChange(&changes)
return s.sendChange(changes)
}

func (s streamingChangeProvider) AddDeletedCaveat(_ context.Context, rev revisions.HLCRevision, caveatName string) error {
Expand All @@ -291,7 +291,7 @@ func (s streamingChangeProvider) AddDeletedCaveat(_ context.Context, rev revisio
DeletedCaveats: []string{caveatName},
}

return s.sendChange(&changes)
return s.sendChange(changes)
}

func (s streamingChangeProvider) SetRevisionMetadata(_ context.Context, rev revisions.HLCRevision, metadata map[string]any) error {
Expand All @@ -306,14 +306,14 @@ func (s streamingChangeProvider) SetRevisionMetadata(_ context.Context, rev revi
Metadata: parsedMetadata,
}

return s.sendChange(&changes)
return s.sendChange(changes)
}

return nil
}

type (
sendChangeFunc func(change *datastore.RevisionChanges) error
sendChangeFunc func(change datastore.RevisionChanges) error
sendErrorFunc func(err error)
)

Expand Down Expand Up @@ -404,14 +404,14 @@ func (cds *crdbDatastore) processChanges(ctx context.Context, changes pgx.Rows,
}
}

if err := sendChange(&revChange); err != nil {
if err := sendChange(revChange); err != nil {
sendError(err)
return
}
}

if opts.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints {
if err := sendChange(&datastore.RevisionChanges{
if err := sendChange(datastore.RevisionChanges{
Revision: rev,
IsCheckpoint: true,
}); err != nil {
Expand Down
18 changes: 9 additions & 9 deletions internal/datastore/memdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ import (

const errWatchError = "watch error: %w"

func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
watchBufferLength := options.WatchBufferLength
if watchBufferLength == 0 {
watchBufferLength = mdb.watchBufferLength
}

updates := make(chan *datastore.RevisionChanges, watchBufferLength)
updates := make(chan datastore.RevisionChanges, watchBufferLength)
errs := make(chan error, 1)

if options.EmissionStrategy == datastore.EmitImmediatelyStrategy {
Expand All @@ -34,7 +34,7 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt
watchBufferWriteTimeout = mdb.watchBufferWriteTimeout
}

sendChange := func(change *datastore.RevisionChanges) bool {
sendChange := func(change datastore.RevisionChanges) bool {
select {
case updates <- change:
return true
Expand Down Expand Up @@ -63,7 +63,7 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt
currentTxn := ar.(revisions.TimestampRevision).TimestampNanoSec()

for {
var stagedUpdates []*datastore.RevisionChanges
var stagedUpdates []datastore.RevisionChanges
var watchChan <-chan struct{}
var err error
stagedUpdates, currentTxn, watchChan, err = mdb.loadChanges(ctx, currentTxn, options)
Expand Down Expand Up @@ -99,7 +99,7 @@ func (mdb *memdbDatastore) Watch(ctx context.Context, ar datastore.Revision, opt
return updates, errs
}

func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64, options datastore.WatchOptions) ([]*datastore.RevisionChanges, int64, <-chan struct{}, error) {
func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64, options datastore.WatchOptions) ([]datastore.RevisionChanges, int64, <-chan struct{}, error) {
mdb.RLock()
defer mdb.RUnlock()

Expand All @@ -111,22 +111,22 @@ func (mdb *memdbDatastore) loadChanges(_ context.Context, currentTxn int64, opti
return nil, 0, nil, fmt.Errorf(errWatchError, err)
}

var changes []*datastore.RevisionChanges
var changes []datastore.RevisionChanges
lastRevision := currentTxn
for changeRaw := it.Next(); changeRaw != nil; changeRaw = it.Next() {
change := changeRaw.(*changelog)

if options.Content&datastore.WatchRelationships == datastore.WatchRelationships && len(change.changes.RelationshipChanges) > 0 {
changes = append(changes, &change.changes)
changes = append(changes, change.changes)
}

if options.Content&datastore.WatchSchema == datastore.WatchSchema &&
len(change.changes.ChangedDefinitions) > 0 || len(change.changes.DeletedCaveats) > 0 || len(change.changes.DeletedNamespaces) > 0 {
changes = append(changes, &change.changes)
changes = append(changes, change.changes)
}

if options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints && change.revisionNanos > lastRevision {
changes = append(changes, &datastore.RevisionChanges{
changes = append(changes, datastore.RevisionChanges{
Revision: revisions.NewForTimestamp(change.revisionNanos),
IsCheckpoint: true,
})
Expand Down
8 changes: 4 additions & 4 deletions internal/datastore/mysql/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ const (
// Watch notifies the caller about all changes to tuples.
//
// All events following afterRevision will be sent to the caller.
func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
watchBufferLength := options.WatchBufferLength
if watchBufferLength <= 0 {
watchBufferLength = mds.watchBufferLength
}

updates := make(chan *datastore.RevisionChanges, watchBufferLength)
updates := make(chan datastore.RevisionChanges, watchBufferLength)
errs := make(chan error, 1)

if options.Content&datastore.WatchSchema == datastore.WatchSchema {
Expand All @@ -52,7 +52,7 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi
watchBufferWriteTimeout = mds.watchBufferWriteTimeout
}

sendChange := func(change *datastore.RevisionChanges) bool {
sendChange := func(change datastore.RevisionChanges) bool {
select {
case updates <- change:
return true
Expand Down Expand Up @@ -95,7 +95,7 @@ func (mds *Datastore) Watch(ctx context.Context, afterRevisionRaw datastore.Revi
// Write the staged updates to the channel
for _, changeToWrite := range stagedUpdates {
changeToWrite := changeToWrite
if !sendChange(&changeToWrite) {
if !sendChange(changeToWrite) {
return
}
}
Expand Down
12 changes: 6 additions & 6 deletions internal/datastore/postgres/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ func (pgd *pgDatastore) Watch(
ctx context.Context,
afterRevisionRaw datastore.Revision,
options datastore.WatchOptions,
) (<-chan *datastore.RevisionChanges, <-chan error) {
) (<-chan datastore.RevisionChanges, <-chan error) {
watchBufferLength := options.WatchBufferLength
if watchBufferLength <= 0 {
watchBufferLength = pgd.watchBufferLength
}

updates := make(chan *datastore.RevisionChanges, watchBufferLength)
updates := make(chan datastore.RevisionChanges, watchBufferLength)
errs := make(chan error, 1)

if !pgd.watchEnabled {
Expand All @@ -97,7 +97,7 @@ func (pgd *pgDatastore) Watch(
watchBufferWriteTimeout = pgd.watchBufferWriteTimeout
}

sendChange := func(change *datastore.RevisionChanges) bool {
sendChange := func(change datastore.RevisionChanges) bool {
select {
case updates <- change:
return true
Expand Down Expand Up @@ -151,7 +151,7 @@ func (pgd *pgDatastore) Watch(

for _, changeToWrite := range changesToWrite {
changeToWrite := changeToWrite
if !sendChange(&changeToWrite) {
if !sendChange(changeToWrite) {
return
}
}
Expand All @@ -174,7 +174,7 @@ func (pgd *pgDatastore) Watch(
// move revisions forward outside of changes, these could be necessary if the caller is
// watching only a *subset* of changes.
if requestedCheckpoints {
if !sendChange(&datastore.RevisionChanges{
if !sendChange(datastore.RevisionChanges{
Revision: currentTxn,
IsCheckpoint: true,
}) {
Expand All @@ -196,7 +196,7 @@ func (pgd *pgDatastore) Watch(
}

if optionalHeadRevision.GreaterThan(currentTxn) {
if !sendChange(&datastore.RevisionChanges{
if !sendChange(datastore.RevisionChanges{
Revision: *optionalHeadRevision,
IsCheckpoint: true,
}) {
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/proxy/checkingreplicated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (f fakeDatastore) RevisionFromString(_ string) (datastore.Revision, error)
return nil, nil
}

func (f fakeDatastore) Watch(_ context.Context, _ datastore.Revision, _ datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
func (f fakeDatastore) Watch(_ context.Context, _ datastore.Revision, _ datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
return nil, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/proxy/observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (p *observableProxy) RevisionFromString(serialized string) (datastore.Revis
return p.delegate.RevisionFromString(serialized)
}

func (p *observableProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
func (p *observableProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
return p.delegate.Watch(ctx, afterRevision, options)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/proxy/proxy_test/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ func (dm *MockDatastore) RevisionFromString(s string) (datastore.Revision, error
return args.Get(0).(datastore.Revision), args.Error(1)
}

func (dm *MockDatastore) Watch(_ context.Context, afterRevision datastore.Revision, _ datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
func (dm *MockDatastore) Watch(_ context.Context, afterRevision datastore.Revision, _ datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
args := dm.Called(afterRevision)
return args.Get(0).(<-chan *datastore.RevisionChanges), args.Get(1).(<-chan error)
return args.Get(0).(<-chan datastore.RevisionChanges), args.Get(1).(<-chan error)
}

func (dm *MockDatastore) ReadyState(_ context.Context) (datastore.ReadyState, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/proxy/readonly_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestWatchPassthrough(t *testing.T) {
ctx := context.Background()

delegate.On("Watch", expectedRevision).Return(
make(<-chan *datastore.RevisionChanges),
make(<-chan datastore.RevisionChanges),
make(<-chan error),
).Times(1)

Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/proxy/relationshipintegrity.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ func (r *relationshipIntegrityProxy) validateRelationTuple(rel tuple.Relationshi
return nil
}

func (r *relationshipIntegrityProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) {
func (r *relationshipIntegrityProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan datastore.RevisionChanges, <-chan error) {
resultsChan, errChan := r.ds.Watch(ctx, afterRevision, options)
checkedResultsChan := make(chan *datastore.RevisionChanges)
checkedResultsChan := make(chan datastore.RevisionChanges)
checkedErrChan := make(chan error, 1)

go func() {
Expand Down
10 changes: 9 additions & 1 deletion internal/datastore/proxy/schemacaching/watchingcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ func (p *watchingCachingProxy) startSync(ctx context.Context) error {
Content: datastore.WatchSchema | datastore.WatchCheckpoints,
CheckpointInterval: p.watchHeartbeat,
})
spiceerrors.DebugAssertNotNil(ssc, "ssc is nil")
spiceerrors.DebugAssertNotNil(serrc, "serrc is nil")

log.Debug().Msg("schema watch started")

p.namespaceCache.startAtRevision(headRev)
Expand All @@ -261,7 +264,12 @@ func (p *watchingCachingProxy) startSync(ctx context.Context) error {
return

case ss := <-ssc:
log.Trace().Object("update", ss).Msg("received update from schema watch")
log.Trace().
Bool("is-checkpoint", ss.IsCheckpoint).
Int("changed-definition-count", len(ss.ChangedDefinitions)).
Int("deleted-namespace-count", len(ss.DeletedNamespaces)).
Int("deleted-caveat-count", len(ss.DeletedCaveats)).
Msg("received update from schema watch")

if ss.IsCheckpoint {
if converted, ok := ss.Revision.(revisions.WithInexactFloat64); ok {
Expand Down
Loading
Loading