Skip to content

Commit

Permalink
datastore/proxy: add singleflight proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
jzelinskie committed Oct 26, 2023
1 parent 172be6c commit b923051
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 0 deletions.
85 changes: 85 additions & 0 deletions internal/datastore/proxy/singleflight.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package proxy

import (
"context"

"resenje.org/singleflight"

"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
)

// NewSingleflightDatastoreProxy creates a new Datastore proxy which
// deduplicates calls to Datastore methods that can share results.
func NewSingleflightDatastoreProxy(d datastore.Datastore) datastore.Datastore {
return &observableProxy{delegate: d}
}

type singleflightProxy struct {
headRevGroup singleflight.Group[string, datastore.Revision]
optRevGroup singleflight.Group[string, datastore.Revision]
checkRevGroup singleflight.Group[string, string]
statsGroup singleflight.Group[string, datastore.Stats]
featureGroup singleflight.Group[string, *datastore.Features]
delegate datastore.Datastore
}

var _ datastore.Datastore = (*singleflightProxy)(nil)

func (p *singleflightProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
return p.delegate.SnapshotReader(rev)
}

func (p *singleflightProxy) ReadWriteTx(ctx context.Context, f datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) {
return p.delegate.ReadWriteTx(ctx, f, opts...)
}

func (p *singleflightProxy) OptimizedRevision(ctx context.Context) (datastore.Revision, error) {
rev, _, err := p.optRevGroup.Do(ctx, "", func(ctx context.Context) (datastore.Revision, error) {
return p.delegate.OptimizedRevision(ctx)
})
return rev, err
}

func (p *singleflightProxy) CheckRevision(ctx context.Context, revision datastore.Revision) error {
_, _, err := p.checkRevGroup.Do(ctx, revision.String(), func(ctx context.Context) (string, error) {
return "", p.delegate.CheckRevision(ctx, revision)
})
return err
}

func (p *singleflightProxy) HeadRevision(ctx context.Context) (datastore.Revision, error) {
rev, _, err := p.headRevGroup.Do(ctx, "", func(ctx context.Context) (datastore.Revision, error) {
return p.delegate.HeadRevision(ctx)
})
return rev, err
}

func (p *singleflightProxy) RevisionFromString(serialized string) (datastore.Revision, error) {
return p.delegate.RevisionFromString(serialized)
}

func (p *singleflightProxy) Watch(ctx context.Context, afterRevision datastore.Revision) (<-chan *datastore.RevisionChanges, <-chan error) {
return p.delegate.Watch(ctx, afterRevision)
}

func (p *singleflightProxy) Features(ctx context.Context) (*datastore.Features, error) {
features, _, err := p.featureGroup.Do(ctx, "", func(ctx context.Context) (*datastore.Features, error) {
return p.delegate.Features(ctx)
})
return features, err
}

func (p *singleflightProxy) Statistics(ctx context.Context) (datastore.Stats, error) {
stats, _, err := p.statsGroup.Do(ctx, "", func(ctx context.Context) (datastore.Stats, error) {
return p.delegate.Statistics(ctx)
})
return stats, err
}

func (p *singleflightProxy) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
return p.delegate.ReadyState(ctx)
}

func (p *singleflightProxy) Close() error { return p.delegate.Close() }
func (p *singleflightProxy) Unwrap() datastore.Datastore { return p.delegate }
1 change: 1 addition & 0 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) {
cachingMode = schemacaching.WatchIfSupported
}

ds = proxy.NewSingleflightDatastoreProxy(ds)
ds = schemacaching.NewCachingDatastoreProxy(ds, nscc, c.DatastoreConfig.GCWindow, cachingMode)
ds = proxy.NewObservableDatastoreProxy(ds)
closeables.AddWithError(ds.Close)
Expand Down

0 comments on commit b923051

Please sign in to comment.