Skip to content

Commit

Permalink
Merge pull request #1607 from authzed/add-singleflight-check-dispatch
Browse files Browse the repository at this point in the history
add singleflight check dispatch
  • Loading branch information
vroldanbet authored Oct 24, 2023
2 parents d8a5af2 + da6ff63 commit f991ac7
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 2 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ require (
google.golang.org/protobuf v1.31.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
resenje.org/singleflight v0.4.0
sigs.k8s.io/controller-runtime v0.16.2
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1398,6 +1398,8 @@ mvdan.cc/lint v0.0.0-20170908181259-adc824a0674b h1:DxJ5nJdkhDlLok9K6qO+5290kphD
mvdan.cc/lint v0.0.0-20170908181259-adc824a0674b/go.mod h1:2odslEg/xrtNQqCYg2/jCoyKnw3vv5biOc3JnIcYfL4=
mvdan.cc/unparam v0.0.0-20230312165513-e84e2d14e3b8 h1:VuJo4Mt0EVPychre4fNlDWDuE5AjXtPJpRUWqZDQhaI=
mvdan.cc/unparam v0.0.0-20230312165513-e84e2d14e3b8/go.mod h1:Oh/d7dEtzsNHGOq1Cdv8aMm3KdKhVvPbRQcM8WFpBR8=
resenje.org/singleflight v0.4.0 h1:NdOEhCxEikK2S2WxGjZV9EGSsItolQKslOOi6pE1tJc=
resenje.org/singleflight v0.4.0/go.mod h1:lAgQK7VfjG6/pgredbQfmV0RvG/uVhKo6vSuZ0vCWfk=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
Expand Down
2 changes: 2 additions & 0 deletions internal/dispatch/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/authzed/spicedb/internal/dispatch/caching"
"github.com/authzed/spicedb/internal/dispatch/graph"
"github.com/authzed/spicedb/internal/dispatch/keys"
"github.com/authzed/spicedb/internal/dispatch/singleflight"
"github.com/authzed/spicedb/pkg/cache"
)

Expand Down Expand Up @@ -67,6 +68,7 @@ func NewClusterDispatcher(dispatch dispatch.Dispatcher, options ...Option) (disp
}

clusterDispatch := graph.NewDispatcher(dispatch, opts.concurrencyLimits)
clusterDispatch = singleflight.New(clusterDispatch, &keys.CanonicalKeyHandler{})

if opts.prometheusSubsystem == "" {
opts.prometheusSubsystem = "dispatch"
Expand Down
3 changes: 3 additions & 0 deletions internal/dispatch/combined/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/authzed/spicedb/internal/dispatch/graph"
"github.com/authzed/spicedb/internal/dispatch/keys"
"github.com/authzed/spicedb/internal/dispatch/remote"
"github.com/authzed/spicedb/internal/dispatch/singleflight"
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/cache"
v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
Expand Down Expand Up @@ -140,6 +141,7 @@ func NewDispatcher(options ...Option) (dispatch.Dispatcher, error) {
}

redispatch := graph.NewDispatcher(cachingRedispatch, opts.concurrencyLimits)
redispatch = singleflight.New(redispatch, &keys.CanonicalKeyHandler{})

// If an upstream is specified, create a cluster dispatcher.
if opts.upstreamAddr != "" {
Expand Down Expand Up @@ -187,6 +189,7 @@ func NewDispatcher(options ...Option) (dispatch.Dispatcher, error) {
KeyHandler: &keys.CanonicalKeyHandler{},
DispatchOverallTimeout: opts.remoteDispatchTimeout,
}, secondaryClients, secondaryExprs)
redispatch = singleflight.New(redispatch, &keys.CanonicalKeyHandler{})
}

cachingRedispatch.SetDelegate(redispatch)
Expand Down
81 changes: 81 additions & 0 deletions internal/dispatch/singleflight/singleflight.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package singleflight

import (
"context"
"encoding/hex"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"resenje.org/singleflight"

"github.com/authzed/spicedb/internal/dispatch"
"github.com/authzed/spicedb/internal/dispatch/keys"
v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
)

var singleFlightCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "spicedb",
Subsystem: "dispatch",
Name: "single_flight_total",
Help: "total number of dispatch requests that were single flighted",
}, []string{"method", "shared"})

func New(delegate dispatch.Dispatcher, handler keys.Handler) dispatch.Dispatcher {
return &Dispatcher{delegate: delegate, keyHandler: handler}
}

type Dispatcher struct {
delegate dispatch.Dispatcher
keyHandler keys.Handler
checkGroup singleflight.Group[string, *v1.DispatchCheckResponse]
}

func (d *Dispatcher) DispatchCheck(ctx context.Context, req *v1.DispatchCheckRequest) (*v1.DispatchCheckResponse, error) {
key, err := d.keyHandler.CheckDispatchKey(ctx, req)
if err != nil {
return &v1.DispatchCheckResponse{Metadata: &v1.ResponseMeta{
DispatchCount: 1,
}}, status.Error(codes.Internal, "unexpected DispatchCheck error")
}

keyString := hex.EncodeToString(key)
v, isShared, err := d.checkGroup.Do(ctx, keyString, func(innerCtx context.Context) (*v1.DispatchCheckResponse, error) {
return d.delegate.DispatchCheck(innerCtx, req)
})

singleFlightCount.WithLabelValues("DispatchCheck", strconv.FormatBool(isShared)).Inc()
if err != nil {
return &v1.DispatchCheckResponse{Metadata: &v1.ResponseMeta{
DispatchCount: 1,
}}, err
}

return v, err
}

func (d *Dispatcher) DispatchExpand(ctx context.Context, req *v1.DispatchExpandRequest) (*v1.DispatchExpandResponse, error) {
return d.delegate.DispatchExpand(ctx, req)
}

func (d *Dispatcher) DispatchReachableResources(req *v1.DispatchReachableResourcesRequest, stream dispatch.ReachableResourcesStream) error {
return d.delegate.DispatchReachableResources(req, stream)
}

func (d *Dispatcher) DispatchLookupResources(req *v1.DispatchLookupResourcesRequest, stream dispatch.LookupResourcesStream) error {
return d.delegate.DispatchLookupResources(req, stream)
}

func (d *Dispatcher) DispatchLookupSubjects(req *v1.DispatchLookupSubjectsRequest, stream dispatch.LookupSubjectsStream) error {
return d.delegate.DispatchLookupSubjects(req, stream)
}

func (d *Dispatcher) Close() error {
return d.delegate.Close()
}

func (d *Dispatcher) ReadyState() dispatch.ReadyState {
return d.delegate.ReadyState()
}
146 changes: 146 additions & 0 deletions internal/dispatch/singleflight/singlegflight_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package singleflight

import (
"context"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/authzed/spicedb/internal/dispatch"
"github.com/authzed/spicedb/internal/dispatch/keys"
v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
"github.com/authzed/spicedb/pkg/tuple"
)

func TestSingleFlightDispatcher(t *testing.T) {
var called atomic.Uint64
f := func() {
time.Sleep(100 * time.Millisecond)
called.Add(1)
}
disp := New(mockDispatcher{f: f}, &keys.DirectKeyHandler{})

req := &v1.DispatchCheckRequest{
ResourceRelation: tuple.RelationReference("document", "view"),
ResourceIds: []string{"foo", "bar"},
Subject: tuple.ObjectAndRelation("user", "tom", "..."),
Metadata: &v1.ResolverMeta{
AtRevision: "1234",
},
}

wg := sync.WaitGroup{}
wg.Add(4)
go func() {
_, _ = disp.DispatchCheck(context.Background(), req)
wg.Done()
}()
go func() {
_, _ = disp.DispatchCheck(context.Background(), req)
wg.Done()
}()
go func() {
_, _ = disp.DispatchCheck(context.Background(), req)

wg.Done()
}()
go func() {
_, _ = disp.DispatchCheck(context.Background(), &v1.DispatchCheckRequest{
ResourceRelation: tuple.RelationReference("document", "view"),
ResourceIds: []string{"foo", "baz"},
Subject: tuple.ObjectAndRelation("user", "tom", "..."),
Metadata: &v1.ResolverMeta{
AtRevision: "1234",
},
})
wg.Done()
}()

wg.Wait()

require.Equal(t, uint64(2), called.Load())
}

func TestSingleFlightDispatcherCancelation(t *testing.T) {
var called atomic.Uint64
run := make(chan struct{}, 1)
f := func() {
time.Sleep(100 * time.Millisecond)
called.Add(1)
run <- struct{}{}
}
disp := New(mockDispatcher{f: f}, &keys.DirectKeyHandler{})

req := &v1.DispatchCheckRequest{
ResourceRelation: tuple.RelationReference("document", "view"),
ResourceIds: []string{"foo", "bar"},
Subject: tuple.ObjectAndRelation("user", "tom", "..."),
Metadata: &v1.ResolverMeta{
AtRevision: "1234",
},
}

wg := sync.WaitGroup{}
wg.Add(3)
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
_, err := disp.DispatchCheck(ctx, req)
wg.Done()
require.ErrorIs(t, err, context.DeadlineExceeded)
}()
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
_, err := disp.DispatchCheck(ctx, req)
wg.Done()
require.ErrorIs(t, err, context.DeadlineExceeded)
}()
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
_, err := disp.DispatchCheck(ctx, req)
wg.Done()
require.ErrorIs(t, err, context.DeadlineExceeded)
}()

wg.Wait()
<-run
require.Equal(t, uint64(1), called.Load())
}

type mockDispatcher struct {
f func()
}

func (m mockDispatcher) DispatchCheck(_ context.Context, _ *v1.DispatchCheckRequest) (*v1.DispatchCheckResponse, error) {
m.f()
return &v1.DispatchCheckResponse{}, nil
}

func (m mockDispatcher) DispatchExpand(_ context.Context, _ *v1.DispatchExpandRequest) (*v1.DispatchExpandResponse, error) {
return &v1.DispatchExpandResponse{}, nil
}

func (m mockDispatcher) DispatchReachableResources(_ *v1.DispatchReachableResourcesRequest, _ dispatch.ReachableResourcesStream) error {
return nil
}

func (m mockDispatcher) DispatchLookupResources(_ *v1.DispatchLookupResourcesRequest, _ dispatch.LookupResourcesStream) error {
return nil
}

func (m mockDispatcher) DispatchLookupSubjects(_ *v1.DispatchLookupSubjectsRequest, _ dispatch.LookupSubjectsStream) error {
return nil
}

func (m mockDispatcher) Close() error {
return nil
}

func (m mockDispatcher) ReadyState() dispatch.ReadyState {
return dispatch.ReadyState{}
}
4 changes: 2 additions & 2 deletions internal/services/dispatch/v1/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ func (ds *dispatchServer) Close() error {

func rewriteGraphError(ctx context.Context, err error) error {
// Check if the error can be directly used.
if _, ok := status.FromError(err); ok {
return err
if st, ok := status.FromError(err); ok {
return st.Err()
}

switch {
Expand Down

0 comments on commit f991ac7

Please sign in to comment.