Skip to content

Commit

Permalink
Merge pull request #1610 from jzelinskie/singleflight-proxy
Browse files Browse the repository at this point in the history
datastore/proxy: add singleflight proxy
  • Loading branch information
jzelinskie authored Oct 26, 2023
2 parents 172be6c + 6497bb2 commit 294aa37
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 0 deletions.
10 changes: 10 additions & 0 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/shopspring/decimal"
"go.opentelemetry.io/otel"
"golang.org/x/sync/errgroup"
"resenje.org/singleflight"

datastoreinternal "github.com/authzed/spicedb/internal/datastore"
"github.com/authzed/spicedb/internal/datastore/common"
Expand Down Expand Up @@ -257,6 +258,8 @@ type crdbDatastore struct {

beginChangefeedQuery string

featureGroup singleflight.Group[string, *datastore.Features]

pruneGroup *errgroup.Group
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -419,6 +422,13 @@ func (cds *crdbDatastore) headRevisionInternal(ctx context.Context) (revision.De
}

func (cds *crdbDatastore) Features(ctx context.Context) (*datastore.Features, error) {
features, _, err := cds.featureGroup.Do(ctx, "", func(ictx context.Context) (*datastore.Features, error) {
return cds.features(ictx)
})
return features, err
}

func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, error) {
var features datastore.Features

head, err := cds.HeadRevision(ctx)
Expand Down
81 changes: 81 additions & 0 deletions internal/datastore/proxy/singleflight.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package proxy

import (
"context"

"resenje.org/singleflight"

"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
)

// NewSingleflightDatastoreProxy creates a new Datastore proxy which
// deduplicates calls to Datastore methods that can share results.
func NewSingleflightDatastoreProxy(d datastore.Datastore) datastore.Datastore {
return &observableProxy{delegate: d}
}

type singleflightProxy struct {
headRevGroup singleflight.Group[string, datastore.Revision]
optRevGroup singleflight.Group[string, datastore.Revision]
checkRevGroup singleflight.Group[string, string]
statsGroup singleflight.Group[string, datastore.Stats]
delegate datastore.Datastore
}

var _ datastore.Datastore = (*singleflightProxy)(nil)

func (p *singleflightProxy) SnapshotReader(rev datastore.Revision) datastore.Reader {
return p.delegate.SnapshotReader(rev)
}

func (p *singleflightProxy) ReadWriteTx(ctx context.Context, f datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) {
return p.delegate.ReadWriteTx(ctx, f, opts...)
}

func (p *singleflightProxy) OptimizedRevision(ctx context.Context) (datastore.Revision, error) {
rev, _, err := p.optRevGroup.Do(ctx, "", func(ctx context.Context) (datastore.Revision, error) {
return p.delegate.OptimizedRevision(ctx)
})
return rev, err
}

func (p *singleflightProxy) CheckRevision(ctx context.Context, revision datastore.Revision) error {
_, _, err := p.checkRevGroup.Do(ctx, revision.String(), func(ctx context.Context) (string, error) {
return "", p.delegate.CheckRevision(ctx, revision)
})
return err
}

func (p *singleflightProxy) HeadRevision(ctx context.Context) (datastore.Revision, error) {
rev, _, err := p.headRevGroup.Do(ctx, "", func(ctx context.Context) (datastore.Revision, error) {
return p.delegate.HeadRevision(ctx)
})
return rev, err
}

func (p *singleflightProxy) RevisionFromString(serialized string) (datastore.Revision, error) {
return p.delegate.RevisionFromString(serialized)
}

func (p *singleflightProxy) Watch(ctx context.Context, afterRevision datastore.Revision) (<-chan *datastore.RevisionChanges, <-chan error) {
return p.delegate.Watch(ctx, afterRevision)
}

func (p *singleflightProxy) Statistics(ctx context.Context) (datastore.Stats, error) {
stats, _, err := p.statsGroup.Do(ctx, "", func(ctx context.Context) (datastore.Stats, error) {
return p.delegate.Statistics(ctx)
})
return stats, err
}

func (p *singleflightProxy) Features(ctx context.Context) (*datastore.Features, error) {
return p.delegate.Features(ctx)
}

func (p *singleflightProxy) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
return p.delegate.ReadyState(ctx)
}

func (p *singleflightProxy) Close() error { return p.delegate.Close() }
func (p *singleflightProxy) Unwrap() datastore.Datastore { return p.delegate }
1 change: 1 addition & 0 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) {
cachingMode = schemacaching.WatchIfSupported
}

ds = proxy.NewSingleflightDatastoreProxy(ds)
ds = schemacaching.NewCachingDatastoreProxy(ds, nscc, c.DatastoreConfig.GCWindow, cachingMode)
ds = proxy.NewObservableDatastoreProxy(ds)
closeables.AddWithError(ds.Close)
Expand Down

0 comments on commit 294aa37

Please sign in to comment.