From b923051e17f55ede0361574715a9fb0cc48246e8 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Tue, 24 Oct 2023 14:08:09 -0400 Subject: [PATCH 1/2] datastore/proxy: add singleflight proxy --- internal/datastore/proxy/singleflight.go | 85 ++++++++++++++++++++++++ pkg/cmd/server/server.go | 1 + 2 files changed, 86 insertions(+) create mode 100644 internal/datastore/proxy/singleflight.go diff --git a/internal/datastore/proxy/singleflight.go b/internal/datastore/proxy/singleflight.go new file mode 100644 index 0000000000..b7cb087c51 --- /dev/null +++ b/internal/datastore/proxy/singleflight.go @@ -0,0 +1,85 @@ +package proxy + +import ( + "context" + + "resenje.org/singleflight" + + "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/datastore/options" +) + +// NewSingleflightDatastoreProxy creates a new Datastore proxy which +// deduplicates calls to Datastore methods that can share results. +func NewSingleflightDatastoreProxy(d datastore.Datastore) datastore.Datastore { + return &observableProxy{delegate: d} +} + +type singleflightProxy struct { + headRevGroup singleflight.Group[string, datastore.Revision] + optRevGroup singleflight.Group[string, datastore.Revision] + checkRevGroup singleflight.Group[string, string] + statsGroup singleflight.Group[string, datastore.Stats] + featureGroup singleflight.Group[string, *datastore.Features] + delegate datastore.Datastore +} + +var _ datastore.Datastore = (*singleflightProxy)(nil) + +func (p *singleflightProxy) SnapshotReader(rev datastore.Revision) datastore.Reader { + return p.delegate.SnapshotReader(rev) +} + +func (p *singleflightProxy) ReadWriteTx(ctx context.Context, f datastore.TxUserFunc, opts ...options.RWTOptionsOption) (datastore.Revision, error) { + return p.delegate.ReadWriteTx(ctx, f, opts...) +} + +func (p *singleflightProxy) OptimizedRevision(ctx context.Context) (datastore.Revision, error) { + rev, _, err := p.optRevGroup.Do(ctx, "", func(ctx context.Context) (datastore.Revision, error) { + return p.delegate.OptimizedRevision(ctx) + }) + return rev, err +} + +func (p *singleflightProxy) CheckRevision(ctx context.Context, revision datastore.Revision) error { + _, _, err := p.checkRevGroup.Do(ctx, revision.String(), func(ctx context.Context) (string, error) { + return "", p.delegate.CheckRevision(ctx, revision) + }) + return err +} + +func (p *singleflightProxy) HeadRevision(ctx context.Context) (datastore.Revision, error) { + rev, _, err := p.headRevGroup.Do(ctx, "", func(ctx context.Context) (datastore.Revision, error) { + return p.delegate.HeadRevision(ctx) + }) + return rev, err +} + +func (p *singleflightProxy) RevisionFromString(serialized string) (datastore.Revision, error) { + return p.delegate.RevisionFromString(serialized) +} + +func (p *singleflightProxy) Watch(ctx context.Context, afterRevision datastore.Revision) (<-chan *datastore.RevisionChanges, <-chan error) { + return p.delegate.Watch(ctx, afterRevision) +} + +func (p *singleflightProxy) Features(ctx context.Context) (*datastore.Features, error) { + features, _, err := p.featureGroup.Do(ctx, "", func(ctx context.Context) (*datastore.Features, error) { + return p.delegate.Features(ctx) + }) + return features, err +} + +func (p *singleflightProxy) Statistics(ctx context.Context) (datastore.Stats, error) { + stats, _, err := p.statsGroup.Do(ctx, "", func(ctx context.Context) (datastore.Stats, error) { + return p.delegate.Statistics(ctx) + }) + return stats, err +} + +func (p *singleflightProxy) ReadyState(ctx context.Context) (datastore.ReadyState, error) { + return p.delegate.ReadyState(ctx) +} + +func (p *singleflightProxy) Close() error { return p.delegate.Close() } +func (p *singleflightProxy) Unwrap() datastore.Datastore { return p.delegate } diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 54f1093723..3c51393ff1 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -226,6 +226,7 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) { cachingMode = schemacaching.WatchIfSupported } + ds = proxy.NewSingleflightDatastoreProxy(ds) ds = schemacaching.NewCachingDatastoreProxy(ds, nscc, c.DatastoreConfig.GCWindow, cachingMode) ds = proxy.NewObservableDatastoreProxy(ds) closeables.AddWithError(ds.Close) From 6497bb207cb1f92d6dba39eaf98c171ef33d175a Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Thu, 26 Oct 2023 13:30:00 -0400 Subject: [PATCH 2/2] datastore/crdb: singleflight features Only singleflight features internal to CRDB so that it can be deduplicated in calls to Watch. No other datastore has an expensive Features impl. --- internal/datastore/crdb/crdb.go | 10 ++++++++++ internal/datastore/proxy/singleflight.go | 12 ++++-------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index 954c5f5026..5600bd9882 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -17,6 +17,7 @@ import ( "github.com/shopspring/decimal" "go.opentelemetry.io/otel" "golang.org/x/sync/errgroup" + "resenje.org/singleflight" datastoreinternal "github.com/authzed/spicedb/internal/datastore" "github.com/authzed/spicedb/internal/datastore/common" @@ -257,6 +258,8 @@ type crdbDatastore struct { beginChangefeedQuery string + featureGroup singleflight.Group[string, *datastore.Features] + pruneGroup *errgroup.Group ctx context.Context cancel context.CancelFunc @@ -419,6 +422,13 @@ func (cds *crdbDatastore) headRevisionInternal(ctx context.Context) (revision.De } func (cds *crdbDatastore) Features(ctx context.Context) (*datastore.Features, error) { + features, _, err := cds.featureGroup.Do(ctx, "", func(ictx context.Context) (*datastore.Features, error) { + return cds.features(ictx) + }) + return features, err +} + +func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, error) { var features datastore.Features head, err := cds.HeadRevision(ctx) diff --git a/internal/datastore/proxy/singleflight.go b/internal/datastore/proxy/singleflight.go index b7cb087c51..6d5fe9742f 100644 --- a/internal/datastore/proxy/singleflight.go +++ b/internal/datastore/proxy/singleflight.go @@ -20,7 +20,6 @@ type singleflightProxy struct { optRevGroup singleflight.Group[string, datastore.Revision] checkRevGroup singleflight.Group[string, string] statsGroup singleflight.Group[string, datastore.Stats] - featureGroup singleflight.Group[string, *datastore.Features] delegate datastore.Datastore } @@ -63,13 +62,6 @@ func (p *singleflightProxy) Watch(ctx context.Context, afterRevision datastore.R return p.delegate.Watch(ctx, afterRevision) } -func (p *singleflightProxy) Features(ctx context.Context) (*datastore.Features, error) { - features, _, err := p.featureGroup.Do(ctx, "", func(ctx context.Context) (*datastore.Features, error) { - return p.delegate.Features(ctx) - }) - return features, err -} - func (p *singleflightProxy) Statistics(ctx context.Context) (datastore.Stats, error) { stats, _, err := p.statsGroup.Do(ctx, "", func(ctx context.Context) (datastore.Stats, error) { return p.delegate.Statistics(ctx) @@ -77,6 +69,10 @@ func (p *singleflightProxy) Statistics(ctx context.Context) (datastore.Stats, er return stats, err } +func (p *singleflightProxy) Features(ctx context.Context) (*datastore.Features, error) { + return p.delegate.Features(ctx) +} + func (p *singleflightProxy) ReadyState(ctx context.Context) (datastore.ReadyState, error) { return p.delegate.ReadyState(ctx) }