additional dispatch chunk safeguards
the parameter is wired into various parts of the application
and leaving it unset can lead to runtime errors
vroldanbet committed Jul 25, 2024
1 parent 5d16bb1 commit a657d62
Showing 10 changed files with 70 additions and 13 deletions.
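As the commit message above notes, the chunk-size parameters are wired through several constructors, and a zero value only surfaces as an error much later at runtime. The safeguard added across the files below is the same everywhere: treat zero as "unset", fall back to 100, and log a warning. A minimal standalone sketch of that pattern (the option struct and function names here are illustrative, not SpiceDB's actual types):

package main

import "log"

// fallbackChunkSize mirrors the default of 100 used throughout this commit.
const fallbackChunkSize uint16 = 100

// options stands in for the various per-datastore and per-dispatcher option structs.
type options struct {
	dispatchChunkSize uint16
}

// resolveChunkSize applies the guard added in this commit: a zero value is
// treated as unset and replaced with the default, and a warning is logged.
func resolveChunkSize(opts options) uint16 {
	chunkSize := opts.dispatchChunkSize
	if chunkSize == 0 {
		chunkSize = fallbackChunkSize
		log.Printf("dispatchChunkSize not set, defaulting to %d", chunkSize)
	}
	return chunkSize
}

func main() {
	log.Println(resolveChunkSize(options{}))                      // falls back to 100
	log.Println(resolveChunkSize(options{dispatchChunkSize: 32})) // keeps 32
}

The same guard appears below for filterMaximumIDCount in each datastore's generateConfig and for dispatchChunkSize in the dispatchers and the experimental service.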
6 changes: 6 additions & 0 deletions internal/datastore/common/sql.go
@@ -11,6 +11,7 @@ import (
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"

+	log "github.com/authzed/spicedb/internal/logging"
 	"github.com/authzed/spicedb/pkg/datastore"
 	"github.com/authzed/spicedb/pkg/datastore/options"
 	core "github.com/authzed/spicedb/pkg/proto/core/v1"
@@ -107,6 +108,11 @@ type SchemaQueryFilterer struct {

 // NewSchemaQueryFilterer creates a new SchemaQueryFilterer object.
 func NewSchemaQueryFilterer(schema SchemaInformation, initialQuery sq.SelectBuilder, filterMaximumIDCount uint16) SchemaQueryFilterer {
+	if filterMaximumIDCount == 0 {
+		filterMaximumIDCount = 100
+		log.Warn().Msg("SchemaQueryFilterer: filterMaximumIDCount not set, defaulting to 100")
+	}
+
 	return SchemaQueryFilterer{
 		schema: schema,
 		queryBuilder: initialQuery,
6 changes: 6 additions & 0 deletions internal/datastore/crdb/options.go
@@ -5,6 +5,7 @@ import (
 	"time"

 	pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
+	log "github.com/authzed/spicedb/internal/logging"
 )

 type crdbOptions struct {
@@ -85,6 +86,11 @@ func generateConfig(options []Option) (crdbOptions, error) {
 		)
 	}

+	if computed.filterMaximumIDCount == 0 {
+		computed.filterMaximumIDCount = 100
+		log.Warn().Msg("filterMaximumIDCount not set, defaulting to 100")
+	}
+
 	return computed, nil
 }

7 changes: 7 additions & 0 deletions internal/datastore/mysql/options.go
@@ -3,6 +3,8 @@ package mysql
 import (
 	"fmt"
 	"time"
+
+	log "github.com/authzed/spicedb/internal/logging"
 )

 const (
@@ -82,6 +84,11 @@ func generateConfig(options []Option) (mysqlOptions, error) {
 		)
 	}

+	if computed.filterMaximumIDCount == 0 {
+		computed.filterMaximumIDCount = 100
+		log.Warn().Msg("filterMaximumIDCount not set, defaulting to 100")
+	}
+
 	return computed, nil
 }

6 changes: 6 additions & 0 deletions internal/datastore/postgres/options.go
@@ -5,6 +5,7 @@ import (
 	"time"

 	pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
+	log "github.com/authzed/spicedb/internal/logging"
 )

 type postgresOptions struct {
@@ -106,6 +107,11 @@ func generateConfig(options []Option) (postgresOptions, error) {
 		return computed, fmt.Errorf("unknown migration phase: %s", computed.migrationPhase)
 	}

+	if computed.filterMaximumIDCount == 0 {
+		computed.filterMaximumIDCount = 100
+		log.Warn().Msg("filterMaximumIDCount not set, defaulting to 100")
+	}
+
 	return computed, nil
 }

7 changes: 7 additions & 0 deletions internal/datastore/spanner/options.go
@@ -5,6 +5,8 @@ import (
 	"math"
 	"runtime"
 	"time"
+
+	log "github.com/authzed/spicedb/internal/logging"
 )

 type spannerOptions struct {
@@ -88,6 +90,11 @@ func generateConfig(options []Option) (spannerOptions, error) {
 		return computed, fmt.Errorf("unknown migration phase: %s", computed.migrationPhase)
 	}

+	if computed.filterMaximumIDCount == 0 {
+		computed.filterMaximumIDCount = 100
+		log.Warn().Msg("filterMaximumIDCount not set, defaulting to 100")
+	}
+
 	return computed, nil
 }

6 changes: 6 additions & 0 deletions internal/dispatch/cluster/cluster.go
@@ -7,6 +7,7 @@ import (
 	"github.com/authzed/spicedb/internal/dispatch/caching"
 	"github.com/authzed/spicedb/internal/dispatch/graph"
 	"github.com/authzed/spicedb/internal/dispatch/keys"
+	log "github.com/authzed/spicedb/internal/logging"
 	"github.com/authzed/spicedb/pkg/cache"
 )

@@ -74,6 +75,11 @@ func NewClusterDispatcher(dispatch dispatch.Dispatcher, options ...Option) (disp
 		fn(&opts)
 	}

+	chunkSize := opts.dispatchChunkSize
+	if chunkSize == 0 {
+		chunkSize = 100
+		log.Warn().Msgf("ClusterDispatcher: dispatchChunkSize not set, defaulting to %d", chunkSize)
+	}
 	clusterDispatch := graph.NewDispatcher(dispatch, opts.concurrencyLimits, opts.dispatchChunkSize)

 	if opts.prometheusSubsystem == "" {
1 change: 1 addition & 0 deletions internal/dispatch/combined/combined.go
@@ -153,6 +153,7 @@ func NewDispatcher(options ...Option) (dispatch.Dispatcher, error) {
 	chunkSize := opts.dispatchChunkSize
 	if chunkSize == 0 {
 		chunkSize = 100
+		log.Warn().Msgf("CombinedDispatcher: dispatchChunkSize not set, defaulting to %d", chunkSize)
 	}
 	redispatch := graph.NewDispatcher(cachingRedispatch, opts.concurrencyLimits, chunkSize)
 	redispatch = singleflight.New(redispatch, &keys.CanonicalKeyHandler{})
30 changes: 20 additions & 10 deletions internal/dispatch/graph/graph.go
@@ -88,13 +88,18 @@ func NewLocalOnlyDispatcherWithLimits(concurrencyLimits ConcurrencyLimits, dispa
 	d := &localDispatcher{}

 	concurrencyLimits = limitsOrDefaults(concurrencyLimits, defaultConcurrencyLimit)
+	chunkSize := dispatchChunkSize
+	if chunkSize == 0 {
+		chunkSize = 100
+		log.Warn().Msgf("LocalOnlyDispatcher: dispatchChunkSize not set, defaulting to %d", chunkSize)
+	}

-	d.checker = graph.NewConcurrentChecker(d, concurrencyLimits.Check, dispatchChunkSize)
+	d.checker = graph.NewConcurrentChecker(d, concurrencyLimits.Check, chunkSize)
 	d.expander = graph.NewConcurrentExpander(d)
-	d.reachableResourcesHandler = graph.NewCursoredReachableResources(d, concurrencyLimits.ReachableResources, dispatchChunkSize)
-	d.lookupResourcesHandler = graph.NewCursoredLookupResources(d, d, concurrencyLimits.LookupResources, dispatchChunkSize)
-	d.lookupSubjectsHandler = graph.NewConcurrentLookupSubjects(d, concurrencyLimits.LookupSubjects, dispatchChunkSize)
-	d.lookupResourcesHandler2 = graph.NewCursoredLookupResources2(d, d, concurrencyLimits.LookupResources, dispatchChunkSize)
+	d.reachableResourcesHandler = graph.NewCursoredReachableResources(d, concurrencyLimits.ReachableResources, chunkSize)
+	d.lookupResourcesHandler = graph.NewCursoredLookupResources(d, d, concurrencyLimits.LookupResources, chunkSize)
+	d.lookupSubjectsHandler = graph.NewConcurrentLookupSubjects(d, concurrencyLimits.LookupSubjects, chunkSize)
+	d.lookupResourcesHandler2 = graph.NewCursoredLookupResources2(d, d, concurrencyLimits.LookupResources, chunkSize)

 	return d
 }
@@ -103,13 +108,18 @@ func NewLocalOnlyDispatcherWithLimits(concurrencyLimits ConcurrencyLimits, dispa
 // the provided redispatcher.
 func NewDispatcher(redispatcher dispatch.Dispatcher, concurrencyLimits ConcurrencyLimits, dispatchChunkSize uint16) dispatch.Dispatcher {
 	concurrencyLimits = limitsOrDefaults(concurrencyLimits, defaultConcurrencyLimit)
+	chunkSize := dispatchChunkSize
+	if chunkSize == 0 {
+		chunkSize = 100
+		log.Warn().Msgf("Dispatcher: dispatchChunkSize not set, defaulting to %d", chunkSize)
+	}

-	checker := graph.NewConcurrentChecker(redispatcher, concurrencyLimits.Check, dispatchChunkSize)
+	checker := graph.NewConcurrentChecker(redispatcher, concurrencyLimits.Check, chunkSize)
 	expander := graph.NewConcurrentExpander(redispatcher)
-	reachableResourcesHandler := graph.NewCursoredReachableResources(redispatcher, concurrencyLimits.ReachableResources, dispatchChunkSize)
-	lookupResourcesHandler := graph.NewCursoredLookupResources(redispatcher, redispatcher, concurrencyLimits.LookupResources, dispatchChunkSize)
-	lookupSubjectsHandler := graph.NewConcurrentLookupSubjects(redispatcher, concurrencyLimits.LookupSubjects, dispatchChunkSize)
-	lookupResourcesHandler2 := graph.NewCursoredLookupResources2(redispatcher, redispatcher, concurrencyLimits.LookupResources, dispatchChunkSize)
+	reachableResourcesHandler := graph.NewCursoredReachableResources(redispatcher, concurrencyLimits.ReachableResources, chunkSize)
+	lookupResourcesHandler := graph.NewCursoredLookupResources(redispatcher, redispatcher, concurrencyLimits.LookupResources, chunkSize)
+	lookupSubjectsHandler := graph.NewConcurrentLookupSubjects(redispatcher, concurrencyLimits.LookupSubjects, chunkSize)
+	lookupResourcesHandler2 := graph.NewCursoredLookupResources2(redispatcher, redispatcher, concurrencyLimits.LookupResources, chunkSize)

 	return &localDispatcher{
 		checker: checker,
10 changes: 9 additions & 1 deletion internal/services/v1/experimental.go
@@ -77,6 +77,14 @@ func NewExperimentalServer(dispatch dispatch.Dispatcher, permServerConfig Permis
 		config.StreamReadTimeout = streamReadTimeoutFallbackSeconds * time.Second
 	}

+	chunkSize := permServerConfig.DispatchChunkSize
+	if chunkSize == 0 {
+		log.
+			Warn().
+			Msg("experimental server config specified invalid DispatchChunkSize, defaulting to 100")
+		chunkSize = 100
+	}
+
 	return &experimentalServer{
 		WithServiceSpecificInterceptors: shared.WithServiceSpecificInterceptors{
 			Unary: middleware.ChainUnaryServer(
@@ -98,7 +106,7 @@ func NewExperimentalServer(dispatch dispatch.Dispatcher, permServerConfig Permis
 			maxCaveatContextSize: permServerConfig.MaxCaveatContextSize,
 			maxConcurrency: config.BulkCheckMaxConcurrency,
 			dispatch: dispatch,
-			dispatchChunkSize: permServerConfig.DispatchChunkSize,
+			dispatchChunkSize: chunkSize,
 		},
 	}
 }
4 changes: 2 additions & 2 deletions pkg/genutil/slicez/chunking.go
@@ -14,8 +14,8 @@ func ForEachChunk[T any](data []T, chunkSize uint16, handler func(items []T)) {

 func ForEachChunkUntil[T any](data []T, chunkSize uint16, handler func(items []T) (bool, error)) (bool, error) {
 	if chunkSize == 0 {
-		logging.Warn().Int("invalid-chunk-size", int(chunkSize)).Msg("ForEachChunk got an invalid chunk size; defaulting to 1")
-		chunkSize = 1
+		logging.Warn().Int("invalid-chunk-size", int(chunkSize)).Msg("ForEachChunk got an invalid chunk size; defaulting to 100")
+		chunkSize = 100
 	}

 	dataLength := uint64(len(data))
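ForEachChunkUntil is the low-level consumer of these chunk sizes: it walks the slice in windows of at most chunkSize items, so a zero value would be meaningless there, and its fallback now matches the 100 used elsewhere instead of the previous 1. A self-contained sketch of such a chunked loop with the same guard (an illustration only, not the SpiceDB implementation):

package main

import "fmt"

// forEachChunkUntil invokes handler on successive windows of at most chunkSize
// items and stops early if the handler returns false or an error.
func forEachChunkUntil[T any](data []T, chunkSize uint16, handler func(items []T) (bool, error)) (bool, error) {
	if chunkSize == 0 {
		chunkSize = 100 // mirrors the safeguard in the hunk above
	}
	for start := 0; start < len(data); start += int(chunkSize) {
		end := start + int(chunkSize)
		if end > len(data) {
			end = len(data)
		}
		ok, err := handler(data[start:end])
		if err != nil || !ok {
			return ok, err
		}
	}
	return true, nil
}

func main() {
	ids := []int{1, 2, 3, 4, 5}
	_, _ = forEachChunkUntil(ids, 2, func(items []int) (bool, error) {
		fmt.Println(items) // prints [1 2], then [3 4], then [5]
		return true, nil
	})
}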
