Skip to content

Commit

Permalink
Merge pull request #1997 from authzed/dispatch-chunk-safeguards
Browse files Browse the repository at this point in the history
additional dispatch chunk safeguards
  • Loading branch information
vroldanbet authored Jul 25, 2024
2 parents 5d16bb1 + a657d62 commit f4bbdf3
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 13 deletions.
6 changes: 6 additions & 0 deletions internal/datastore/common/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"

log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
Expand Down Expand Up @@ -107,6 +108,11 @@ type SchemaQueryFilterer struct {

// NewSchemaQueryFilterer creates a new SchemaQueryFilterer object.
func NewSchemaQueryFilterer(schema SchemaInformation, initialQuery sq.SelectBuilder, filterMaximumIDCount uint16) SchemaQueryFilterer {
if filterMaximumIDCount == 0 {
filterMaximumIDCount = 100
log.Warn().Msg("SchemaQueryFilterer: filterMaximumIDCount not set, defaulting to 100")
}

return SchemaQueryFilterer{
schema: schema,
queryBuilder: initialQuery,
Expand Down
6 changes: 6 additions & 0 deletions internal/datastore/crdb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
log "github.com/authzed/spicedb/internal/logging"
)

type crdbOptions struct {
Expand Down Expand Up @@ -85,6 +86,11 @@ func generateConfig(options []Option) (crdbOptions, error) {
)
}

if computed.filterMaximumIDCount == 0 {
computed.filterMaximumIDCount = 100
log.Warn().Msg("filterMaximumIDCount not set, defaulting to 100")
}

return computed, nil
}

Expand Down
7 changes: 7 additions & 0 deletions internal/datastore/mysql/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package mysql
import (
"fmt"
"time"

log "github.com/authzed/spicedb/internal/logging"
)

const (
Expand Down Expand Up @@ -82,6 +84,11 @@ func generateConfig(options []Option) (mysqlOptions, error) {
)
}

if computed.filterMaximumIDCount == 0 {
computed.filterMaximumIDCount = 100
log.Warn().Msg("filterMaximumIDCount not set, defaulting to 100")
}

return computed, nil
}

Expand Down
6 changes: 6 additions & 0 deletions internal/datastore/postgres/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
log "github.com/authzed/spicedb/internal/logging"
)

type postgresOptions struct {
Expand Down Expand Up @@ -106,6 +107,11 @@ func generateConfig(options []Option) (postgresOptions, error) {
return computed, fmt.Errorf("unknown migration phase: %s", computed.migrationPhase)
}

if computed.filterMaximumIDCount == 0 {
computed.filterMaximumIDCount = 100
log.Warn().Msg("filterMaximumIDCount not set, defaulting to 100")
}

return computed, nil
}

Expand Down
7 changes: 7 additions & 0 deletions internal/datastore/spanner/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"math"
"runtime"
"time"

log "github.com/authzed/spicedb/internal/logging"
)

type spannerOptions struct {
Expand Down Expand Up @@ -88,6 +90,11 @@ func generateConfig(options []Option) (spannerOptions, error) {
return computed, fmt.Errorf("unknown migration phase: %s", computed.migrationPhase)
}

if computed.filterMaximumIDCount == 0 {
computed.filterMaximumIDCount = 100
log.Warn().Msg("filterMaximumIDCount not set, defaulting to 100")
}

return computed, nil
}

Expand Down
6 changes: 6 additions & 0 deletions internal/dispatch/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/authzed/spicedb/internal/dispatch/caching"
"github.com/authzed/spicedb/internal/dispatch/graph"
"github.com/authzed/spicedb/internal/dispatch/keys"
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/cache"
)

Expand Down Expand Up @@ -74,6 +75,11 @@ func NewClusterDispatcher(dispatch dispatch.Dispatcher, options ...Option) (disp
fn(&opts)
}

chunkSize := opts.dispatchChunkSize
if chunkSize == 0 {
chunkSize = 100
log.Warn().Msgf("ClusterDispatcher: dispatchChunkSize not set, defaulting to %d", chunkSize)
}
clusterDispatch := graph.NewDispatcher(dispatch, opts.concurrencyLimits, opts.dispatchChunkSize)

if opts.prometheusSubsystem == "" {
Expand Down
1 change: 1 addition & 0 deletions internal/dispatch/combined/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func NewDispatcher(options ...Option) (dispatch.Dispatcher, error) {
chunkSize := opts.dispatchChunkSize
if chunkSize == 0 {
chunkSize = 100
log.Warn().Msgf("CombinedDispatcher: dispatchChunkSize not set, defaulting to %d", chunkSize)
}
redispatch := graph.NewDispatcher(cachingRedispatch, opts.concurrencyLimits, chunkSize)
redispatch = singleflight.New(redispatch, &keys.CanonicalKeyHandler{})
Expand Down
30 changes: 20 additions & 10 deletions internal/dispatch/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,18 @@ func NewLocalOnlyDispatcherWithLimits(concurrencyLimits ConcurrencyLimits, dispa
d := &localDispatcher{}

concurrencyLimits = limitsOrDefaults(concurrencyLimits, defaultConcurrencyLimit)
chunkSize := dispatchChunkSize
if chunkSize == 0 {
chunkSize = 100
log.Warn().Msgf("LocalOnlyDispatcher: dispatchChunkSize not set, defaulting to %d", chunkSize)
}

d.checker = graph.NewConcurrentChecker(d, concurrencyLimits.Check, dispatchChunkSize)
d.checker = graph.NewConcurrentChecker(d, concurrencyLimits.Check, chunkSize)
d.expander = graph.NewConcurrentExpander(d)
d.reachableResourcesHandler = graph.NewCursoredReachableResources(d, concurrencyLimits.ReachableResources, dispatchChunkSize)
d.lookupResourcesHandler = graph.NewCursoredLookupResources(d, d, concurrencyLimits.LookupResources, dispatchChunkSize)
d.lookupSubjectsHandler = graph.NewConcurrentLookupSubjects(d, concurrencyLimits.LookupSubjects, dispatchChunkSize)
d.lookupResourcesHandler2 = graph.NewCursoredLookupResources2(d, d, concurrencyLimits.LookupResources, dispatchChunkSize)
d.reachableResourcesHandler = graph.NewCursoredReachableResources(d, concurrencyLimits.ReachableResources, chunkSize)
d.lookupResourcesHandler = graph.NewCursoredLookupResources(d, d, concurrencyLimits.LookupResources, chunkSize)
d.lookupSubjectsHandler = graph.NewConcurrentLookupSubjects(d, concurrencyLimits.LookupSubjects, chunkSize)
d.lookupResourcesHandler2 = graph.NewCursoredLookupResources2(d, d, concurrencyLimits.LookupResources, chunkSize)

return d
}
Expand All @@ -103,13 +108,18 @@ func NewLocalOnlyDispatcherWithLimits(concurrencyLimits ConcurrencyLimits, dispa
// the provided redispatcher.
func NewDispatcher(redispatcher dispatch.Dispatcher, concurrencyLimits ConcurrencyLimits, dispatchChunkSize uint16) dispatch.Dispatcher {
concurrencyLimits = limitsOrDefaults(concurrencyLimits, defaultConcurrencyLimit)
chunkSize := dispatchChunkSize
if chunkSize == 0 {
chunkSize = 100
log.Warn().Msgf("Dispatcher: dispatchChunkSize not set, defaulting to %d", chunkSize)
}

checker := graph.NewConcurrentChecker(redispatcher, concurrencyLimits.Check, dispatchChunkSize)
checker := graph.NewConcurrentChecker(redispatcher, concurrencyLimits.Check, chunkSize)
expander := graph.NewConcurrentExpander(redispatcher)
reachableResourcesHandler := graph.NewCursoredReachableResources(redispatcher, concurrencyLimits.ReachableResources, dispatchChunkSize)
lookupResourcesHandler := graph.NewCursoredLookupResources(redispatcher, redispatcher, concurrencyLimits.LookupResources, dispatchChunkSize)
lookupSubjectsHandler := graph.NewConcurrentLookupSubjects(redispatcher, concurrencyLimits.LookupSubjects, dispatchChunkSize)
lookupResourcesHandler2 := graph.NewCursoredLookupResources2(redispatcher, redispatcher, concurrencyLimits.LookupResources, dispatchChunkSize)
reachableResourcesHandler := graph.NewCursoredReachableResources(redispatcher, concurrencyLimits.ReachableResources, chunkSize)
lookupResourcesHandler := graph.NewCursoredLookupResources(redispatcher, redispatcher, concurrencyLimits.LookupResources, chunkSize)
lookupSubjectsHandler := graph.NewConcurrentLookupSubjects(redispatcher, concurrencyLimits.LookupSubjects, chunkSize)
lookupResourcesHandler2 := graph.NewCursoredLookupResources2(redispatcher, redispatcher, concurrencyLimits.LookupResources, chunkSize)

return &localDispatcher{
checker: checker,
Expand Down
10 changes: 9 additions & 1 deletion internal/services/v1/experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ func NewExperimentalServer(dispatch dispatch.Dispatcher, permServerConfig Permis
config.StreamReadTimeout = streamReadTimeoutFallbackSeconds * time.Second
}

chunkSize := permServerConfig.DispatchChunkSize
if chunkSize == 0 {
log.
Warn().
Msg("experimental server config specified invalid DispatchChunkSize, defaulting to 100")
chunkSize = 100
}

return &experimentalServer{
WithServiceSpecificInterceptors: shared.WithServiceSpecificInterceptors{
Unary: middleware.ChainUnaryServer(
Expand All @@ -98,7 +106,7 @@ func NewExperimentalServer(dispatch dispatch.Dispatcher, permServerConfig Permis
maxCaveatContextSize: permServerConfig.MaxCaveatContextSize,
maxConcurrency: config.BulkCheckMaxConcurrency,
dispatch: dispatch,
dispatchChunkSize: permServerConfig.DispatchChunkSize,
dispatchChunkSize: chunkSize,
},
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/genutil/slicez/chunking.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ func ForEachChunk[T any](data []T, chunkSize uint16, handler func(items []T)) {

func ForEachChunkUntil[T any](data []T, chunkSize uint16, handler func(items []T) (bool, error)) (bool, error) {
if chunkSize == 0 {
logging.Warn().Int("invalid-chunk-size", int(chunkSize)).Msg("ForEachChunk got an invalid chunk size; defaulting to 1")
chunkSize = 1
logging.Warn().Int("invalid-chunk-size", int(chunkSize)).Msg("ForEachChunk got an invalid chunk size; defaulting to 100")
chunkSize = 100
}

dataLength := uint64(len(data))
Expand Down

0 comments on commit f4bbdf3

Please sign in to comment.