From a657d62a9dd673edd9a05a867bfff21418f6845e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Rold=C3=A1n=20Betancort?= Date: Thu, 25 Jul 2024 12:47:49 +0100 Subject: [PATCH] additional dispatch chunk safeguards the parameter is wired into various parts of the application and leaving it unset can lead to runtime errors --- internal/datastore/common/sql.go | 6 ++++++ internal/datastore/crdb/options.go | 6 ++++++ internal/datastore/mysql/options.go | 7 ++++++ internal/datastore/postgres/options.go | 6 ++++++ internal/datastore/spanner/options.go | 7 ++++++ internal/dispatch/cluster/cluster.go | 6 ++++++ internal/dispatch/combined/combined.go | 1 + internal/dispatch/graph/graph.go | 30 +++++++++++++++++--------- internal/services/v1/experimental.go | 10 ++++++++- pkg/genutil/slicez/chunking.go | 4 ++-- 10 files changed, 70 insertions(+), 13 deletions(-) diff --git a/internal/datastore/common/sql.go b/internal/datastore/common/sql.go index c3530da7d3..85288178df 100644 --- a/internal/datastore/common/sql.go +++ b/internal/datastore/common/sql.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" core "github.com/authzed/spicedb/pkg/proto/core/v1" @@ -107,6 +108,11 @@ type SchemaQueryFilterer struct { // NewSchemaQueryFilterer creates a new SchemaQueryFilterer object. func NewSchemaQueryFilterer(schema SchemaInformation, initialQuery sq.SelectBuilder, filterMaximumIDCount uint16) SchemaQueryFilterer { + if filterMaximumIDCount == 0 { + filterMaximumIDCount = 100 + log.Warn().Msg("SchemaQueryFilterer: filterMaximumIDCount not set, defaulting to 100") + } + return SchemaQueryFilterer{ schema: schema, queryBuilder: initialQuery, diff --git a/internal/datastore/crdb/options.go b/internal/datastore/crdb/options.go index 16cc330997..7e64327ae0 100644 --- a/internal/datastore/crdb/options.go +++ b/internal/datastore/crdb/options.go @@ -5,6 +5,7 @@ import ( "time" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" + log "github.com/authzed/spicedb/internal/logging" ) type crdbOptions struct { @@ -85,6 +86,11 @@ func generateConfig(options []Option) (crdbOptions, error) { ) } + if computed.filterMaximumIDCount == 0 { + computed.filterMaximumIDCount = 100 + log.Warn().Msg("filterMaximumIDCount not set, defaulting to 100") + } + return computed, nil } diff --git a/internal/datastore/mysql/options.go b/internal/datastore/mysql/options.go index 440836e88e..dbdf434975 100644 --- a/internal/datastore/mysql/options.go +++ b/internal/datastore/mysql/options.go @@ -3,6 +3,8 @@ package mysql import ( "fmt" "time" + + log "github.com/authzed/spicedb/internal/logging" ) const ( @@ -82,6 +84,11 @@ func generateConfig(options []Option) (mysqlOptions, error) { ) } + if computed.filterMaximumIDCount == 0 { + computed.filterMaximumIDCount = 100 + log.Warn().Msg("filterMaximumIDCount not set, defaulting to 100") + } + return computed, nil } diff --git a/internal/datastore/postgres/options.go b/internal/datastore/postgres/options.go index 54c8208d41..a3f4eb711a 100644 --- a/internal/datastore/postgres/options.go +++ b/internal/datastore/postgres/options.go @@ -5,6 +5,7 @@ import ( "time" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" + log "github.com/authzed/spicedb/internal/logging" ) type postgresOptions struct { @@ -106,6 +107,11 @@ func generateConfig(options []Option) (postgresOptions, error) { return computed, fmt.Errorf("unknown migration phase: %s", computed.migrationPhase) } + if computed.filterMaximumIDCount == 0 { + computed.filterMaximumIDCount = 100 + log.Warn().Msg("filterMaximumIDCount not set, defaulting to 100") + } + return computed, nil } diff --git a/internal/datastore/spanner/options.go b/internal/datastore/spanner/options.go index e18fe4cb93..c0cc8e3922 100644 --- a/internal/datastore/spanner/options.go +++ b/internal/datastore/spanner/options.go @@ -5,6 +5,8 @@ import ( "math" "runtime" "time" + + log "github.com/authzed/spicedb/internal/logging" ) type spannerOptions struct { @@ -88,6 +90,11 @@ func generateConfig(options []Option) (spannerOptions, error) { return computed, fmt.Errorf("unknown migration phase: %s", computed.migrationPhase) } + if computed.filterMaximumIDCount == 0 { + computed.filterMaximumIDCount = 100 + log.Warn().Msg("filterMaximumIDCount not set, defaulting to 100") + } + return computed, nil } diff --git a/internal/dispatch/cluster/cluster.go b/internal/dispatch/cluster/cluster.go index 49e45445fb..054b147c33 100644 --- a/internal/dispatch/cluster/cluster.go +++ b/internal/dispatch/cluster/cluster.go @@ -7,6 +7,7 @@ import ( "github.com/authzed/spicedb/internal/dispatch/caching" "github.com/authzed/spicedb/internal/dispatch/graph" "github.com/authzed/spicedb/internal/dispatch/keys" + log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/cache" ) @@ -74,6 +75,11 @@ func NewClusterDispatcher(dispatch dispatch.Dispatcher, options ...Option) (disp fn(&opts) } + chunkSize := opts.dispatchChunkSize + if chunkSize == 0 { + chunkSize = 100 + log.Warn().Msgf("ClusterDispatcher: dispatchChunkSize not set, defaulting to %d", chunkSize) + } clusterDispatch := graph.NewDispatcher(dispatch, opts.concurrencyLimits, opts.dispatchChunkSize) if opts.prometheusSubsystem == "" { diff --git a/internal/dispatch/combined/combined.go b/internal/dispatch/combined/combined.go index 0debe210a4..537416c76c 100644 --- a/internal/dispatch/combined/combined.go +++ b/internal/dispatch/combined/combined.go @@ -153,6 +153,7 @@ func NewDispatcher(options ...Option) (dispatch.Dispatcher, error) { chunkSize := opts.dispatchChunkSize if chunkSize == 0 { chunkSize = 100 + log.Warn().Msgf("CombinedDispatcher: dispatchChunkSize not set, defaulting to %d", chunkSize) } redispatch := graph.NewDispatcher(cachingRedispatch, opts.concurrencyLimits, chunkSize) redispatch = singleflight.New(redispatch, &keys.CanonicalKeyHandler{}) diff --git a/internal/dispatch/graph/graph.go b/internal/dispatch/graph/graph.go index 0623467037..135146369b 100644 --- a/internal/dispatch/graph/graph.go +++ b/internal/dispatch/graph/graph.go @@ -88,13 +88,18 @@ func NewLocalOnlyDispatcherWithLimits(concurrencyLimits ConcurrencyLimits, dispa d := &localDispatcher{} concurrencyLimits = limitsOrDefaults(concurrencyLimits, defaultConcurrencyLimit) + chunkSize := dispatchChunkSize + if chunkSize == 0 { + chunkSize = 100 + log.Warn().Msgf("LocalOnlyDispatcher: dispatchChunkSize not set, defaulting to %d", chunkSize) + } - d.checker = graph.NewConcurrentChecker(d, concurrencyLimits.Check, dispatchChunkSize) + d.checker = graph.NewConcurrentChecker(d, concurrencyLimits.Check, chunkSize) d.expander = graph.NewConcurrentExpander(d) - d.reachableResourcesHandler = graph.NewCursoredReachableResources(d, concurrencyLimits.ReachableResources, dispatchChunkSize) - d.lookupResourcesHandler = graph.NewCursoredLookupResources(d, d, concurrencyLimits.LookupResources, dispatchChunkSize) - d.lookupSubjectsHandler = graph.NewConcurrentLookupSubjects(d, concurrencyLimits.LookupSubjects, dispatchChunkSize) - d.lookupResourcesHandler2 = graph.NewCursoredLookupResources2(d, d, concurrencyLimits.LookupResources, dispatchChunkSize) + d.reachableResourcesHandler = graph.NewCursoredReachableResources(d, concurrencyLimits.ReachableResources, chunkSize) + d.lookupResourcesHandler = graph.NewCursoredLookupResources(d, d, concurrencyLimits.LookupResources, chunkSize) + d.lookupSubjectsHandler = graph.NewConcurrentLookupSubjects(d, concurrencyLimits.LookupSubjects, chunkSize) + d.lookupResourcesHandler2 = graph.NewCursoredLookupResources2(d, d, concurrencyLimits.LookupResources, chunkSize) return d } @@ -103,13 +108,18 @@ func NewLocalOnlyDispatcherWithLimits(concurrencyLimits ConcurrencyLimits, dispa // the provided redispatcher. func NewDispatcher(redispatcher dispatch.Dispatcher, concurrencyLimits ConcurrencyLimits, dispatchChunkSize uint16) dispatch.Dispatcher { concurrencyLimits = limitsOrDefaults(concurrencyLimits, defaultConcurrencyLimit) + chunkSize := dispatchChunkSize + if chunkSize == 0 { + chunkSize = 100 + log.Warn().Msgf("Dispatcher: dispatchChunkSize not set, defaulting to %d", chunkSize) + } - checker := graph.NewConcurrentChecker(redispatcher, concurrencyLimits.Check, dispatchChunkSize) + checker := graph.NewConcurrentChecker(redispatcher, concurrencyLimits.Check, chunkSize) expander := graph.NewConcurrentExpander(redispatcher) - reachableResourcesHandler := graph.NewCursoredReachableResources(redispatcher, concurrencyLimits.ReachableResources, dispatchChunkSize) - lookupResourcesHandler := graph.NewCursoredLookupResources(redispatcher, redispatcher, concurrencyLimits.LookupResources, dispatchChunkSize) - lookupSubjectsHandler := graph.NewConcurrentLookupSubjects(redispatcher, concurrencyLimits.LookupSubjects, dispatchChunkSize) - lookupResourcesHandler2 := graph.NewCursoredLookupResources2(redispatcher, redispatcher, concurrencyLimits.LookupResources, dispatchChunkSize) + reachableResourcesHandler := graph.NewCursoredReachableResources(redispatcher, concurrencyLimits.ReachableResources, chunkSize) + lookupResourcesHandler := graph.NewCursoredLookupResources(redispatcher, redispatcher, concurrencyLimits.LookupResources, chunkSize) + lookupSubjectsHandler := graph.NewConcurrentLookupSubjects(redispatcher, concurrencyLimits.LookupSubjects, chunkSize) + lookupResourcesHandler2 := graph.NewCursoredLookupResources2(redispatcher, redispatcher, concurrencyLimits.LookupResources, chunkSize) return &localDispatcher{ checker: checker, diff --git a/internal/services/v1/experimental.go b/internal/services/v1/experimental.go index 19036a11cf..cf5966544f 100644 --- a/internal/services/v1/experimental.go +++ b/internal/services/v1/experimental.go @@ -77,6 +77,14 @@ func NewExperimentalServer(dispatch dispatch.Dispatcher, permServerConfig Permis config.StreamReadTimeout = streamReadTimeoutFallbackSeconds * time.Second } + chunkSize := permServerConfig.DispatchChunkSize + if chunkSize == 0 { + log. + Warn(). + Msg("experimental server config specified invalid DispatchChunkSize, defaulting to 100") + chunkSize = 100 + } + return &experimentalServer{ WithServiceSpecificInterceptors: shared.WithServiceSpecificInterceptors{ Unary: middleware.ChainUnaryServer( @@ -98,7 +106,7 @@ func NewExperimentalServer(dispatch dispatch.Dispatcher, permServerConfig Permis maxCaveatContextSize: permServerConfig.MaxCaveatContextSize, maxConcurrency: config.BulkCheckMaxConcurrency, dispatch: dispatch, - dispatchChunkSize: permServerConfig.DispatchChunkSize, + dispatchChunkSize: chunkSize, }, } } diff --git a/pkg/genutil/slicez/chunking.go b/pkg/genutil/slicez/chunking.go index 2e59bf3c08..bdff41cb1e 100644 --- a/pkg/genutil/slicez/chunking.go +++ b/pkg/genutil/slicez/chunking.go @@ -14,8 +14,8 @@ func ForEachChunk[T any](data []T, chunkSize uint16, handler func(items []T)) { func ForEachChunkUntil[T any](data []T, chunkSize uint16, handler func(items []T) (bool, error)) (bool, error) { if chunkSize == 0 { - logging.Warn().Int("invalid-chunk-size", int(chunkSize)).Msg("ForEachChunk got an invalid chunk size; defaulting to 1") - chunkSize = 1 + logging.Warn().Int("invalid-chunk-size", int(chunkSize)).Msg("ForEachChunk got an invalid chunk size; defaulting to 100") + chunkSize = 100 } dataLength := uint64(len(data))