Skip to content

Commit

Permalink
Merge pull request #1886 from authzed/refactor-bulk-export-logic
Browse files Browse the repository at this point in the history
refactor bulk export relationships logic
  • Loading branch information
vroldanbet authored May 3, 2024
2 parents 1af8fb1 + 394cc8f commit f391921
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/security.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,4 @@ jobs:
- name: "Obtain container image to scan"
run: 'echo "IMAGE_VERSION=$(jq .version dist/linux_amd64/metadata.json --raw-output)" >> $GITHUB_ENV'
- name: "run trivy on release image"
run: "docker run -v /var/run/docker.sock:/var/run/docker.sock aquasec/trivy image --format table --exit-code 1 --ignore-unfixed --vuln-type os,library --no-progress --severity CRITICAL,HIGH,MEDIUM authzed/spicedb:v${{ env.IMAGE_VERSION }}-amd64"
run: "docker run -v /var/run/docker.sock:/var/run/docker.sock aquasec/trivy:0.50.4 image --format table --exit-code 1 --ignore-unfixed --vuln-type os,library --no-progress --severity CRITICAL,HIGH,MEDIUM authzed/spicedb:v${{ env.IMAGE_VERSION }}-amd64"
4 changes: 4 additions & 0 deletions internal/services/shared/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ type ConfigForErrors struct {
MaximumAPIDepth uint32
}

func RewriteErrorWithoutConfig(ctx context.Context, err error) error {
return RewriteError(ctx, err, nil)
}

func RewriteError(ctx context.Context, err error, config *ConfigForErrors) error {
// Check if the error can be directly used.
if _, ok := status.FromError(err); ok {
Expand Down
71 changes: 35 additions & 36 deletions internal/services/v1/experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,30 @@ import (
"strings"
"time"

"github.com/authzed/spicedb/pkg/cursor"
"github.com/authzed/spicedb/pkg/datastore"
dsoptions "github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
dispatchv1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
implv1 "github.com/authzed/spicedb/pkg/proto/impl/v1"
"github.com/authzed/spicedb/pkg/tuple"
"github.com/authzed/spicedb/pkg/typesystem"

v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
grpcvalidate "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator"
"github.com/samber/lo"

"github.com/authzed/spicedb/internal/dispatch"
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/internal/middleware"
"github.com/authzed/spicedb/internal/middleware/consistency"
datastoremw "github.com/authzed/spicedb/internal/middleware/datastore"
"github.com/authzed/spicedb/internal/middleware/handwrittenvalidation"
"github.com/authzed/spicedb/internal/middleware/streamtimeout"
"github.com/authzed/spicedb/internal/middleware/usagemetrics"
"github.com/authzed/spicedb/internal/relationships"
"github.com/authzed/spicedb/internal/services/shared"
"github.com/authzed/spicedb/internal/services/v1/options"
"github.com/authzed/spicedb/pkg/cursor"
"github.com/authzed/spicedb/pkg/datastore"
dsoptions "github.com/authzed/spicedb/pkg/datastore/options"
"github.com/authzed/spicedb/pkg/middleware/consistency"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
dispatchv1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
implv1 "github.com/authzed/spicedb/pkg/proto/impl/v1"
"github.com/authzed/spicedb/pkg/tuple"
"github.com/authzed/spicedb/pkg/typesystem"
)

const (
Expand Down Expand Up @@ -190,10 +191,6 @@ func extractBatchNewReferencedNamespacesAndCaveats(
return lo.Keys(newNamespaces), lo.Keys(newCaveats)
}

func (es *experimentalServer) rewriteError(ctx context.Context, err error) error {
return shared.RewriteError(ctx, err, nil)
}

func (es *experimentalServer) BulkImportRelationships(stream v1.ExperimentalService_BulkImportRelationshipsServer) error {
ds := datastoremw.MustFromContext(stream.Context())

Expand Down Expand Up @@ -253,7 +250,7 @@ func (es *experimentalServer) BulkImportRelationships(stream v1.ExperimentalServ

return err
}, dsoptions.WithDisableRetries(true)); err != nil {
return es.rewriteError(stream.Context(), err)
return shared.RewriteErrorWithoutConfig(stream.Context(), err)
}

usagemetrics.SetInContext(stream.Context(), &dispatchv1.ResponseMeta{
Expand All @@ -270,36 +267,39 @@ func (es *experimentalServer) BulkExportRelationships(
req *v1.BulkExportRelationshipsRequest,
resp v1.ExperimentalService_BulkExportRelationshipsServer,
) error {
if req.OptionalLimit > 0 && uint64(req.OptionalLimit) > es.maxBatchSize {
return es.rewriteError(resp.Context(), NewExceedsMaximumLimitErr(uint64(req.OptionalLimit), es.maxBatchSize))
ctx := resp.Context()
atRevision, _, err := consistency.RevisionFromContext(ctx)
if err != nil {
return shared.RewriteErrorWithoutConfig(ctx, err)
}

ctx := resp.Context()
ds := datastoremw.MustFromContext(ctx)
return BulkExport(ctx, datastoremw.MustFromContext(ctx), es.maxBatchSize, req, atRevision, resp.Send)
}

var atRevision datastore.Revision
// BulkExport implements the BulkExportRelationships API functionality. Given a datastore.Datastore, it will
// export stream via the sender all relationships matched by the incoming request.
// If no cursor is provided, it will fallback to the provided revision.
func BulkExport(ctx context.Context, ds datastore.Datastore, batchSize uint64, req *v1.BulkExportRelationshipsRequest, fallbackRevision datastore.Revision, sender func(response *v1.BulkExportRelationshipsResponse) error) error {
if req.OptionalLimit > 0 && uint64(req.OptionalLimit) > batchSize {
return shared.RewriteErrorWithoutConfig(ctx, NewExceedsMaximumLimitErr(uint64(req.OptionalLimit), batchSize))
}

atRevision := fallbackRevision
var curNamespace string
var cur dsoptions.Cursor

if req.OptionalCursor != nil {
var err error
atRevision, curNamespace, cur, err = decodeCursor(ds, req.OptionalCursor)
if err != nil {
return es.rewriteError(ctx, err)
}
} else {
var err error
atRevision, _, err = consistency.RevisionFromContext(ctx)
if err != nil {
return es.rewriteError(ctx, err)
return shared.RewriteErrorWithoutConfig(ctx, err)
}
}

reader := ds.SnapshotReader(atRevision)

namespaces, err := reader.ListAllNamespaces(ctx)
if err != nil {
return es.rewriteError(ctx, err)
return shared.RewriteErrorWithoutConfig(ctx, err)
}

// Make sure the namespaces are always in a stable order
Expand All @@ -315,7 +315,7 @@ func (es *experimentalServer) BulkExportRelationships(
namespaces = namespaces[1:]
}

limit := es.defaultBatchSize
limit := batchSize
if req.OptionalLimit > 0 {
limit = uint64(req.OptionalLimit)
}
Expand Down Expand Up @@ -354,7 +354,7 @@ func (es *experimentalServer) BulkExportRelationships(
if req.OptionalRelationshipFilter != nil {
rf, err := datastore.RelationshipsFilterFromPublicFilter(req.OptionalRelationshipFilter)
if err != nil {
return es.rewriteError(ctx, err)
return shared.RewriteErrorWithoutConfig(ctx, err)
}

// Overload the namespace name with the one from the request, because each iteration is for a different namespace.
Expand Down Expand Up @@ -385,7 +385,7 @@ func (es *experimentalServer) BulkExportRelationships(
dsoptions.WithSort(dsoptions.ByResource),
)
if err != nil {
return es.rewriteError(ctx, err)
return shared.RewriteErrorWithoutConfig(ctx, err)
}

if len(rels) == 0 {
Expand All @@ -404,18 +404,17 @@ func (es *experimentalServer) BulkExportRelationships(
},
})
if err != nil {
return es.rewriteError(ctx, err)
return shared.RewriteErrorWithoutConfig(ctx, err)
}

if err := resp.Send(&v1.BulkExportRelationshipsResponse{
if err := sender(&v1.BulkExportRelationshipsResponse{
AfterResultCursor: encoded,
Relationships: rels,
}); err != nil {
return es.rewriteError(ctx, err)
return shared.RewriteErrorWithoutConfig(ctx, err)
}
}
}

return nil
}

Expand All @@ -425,7 +424,7 @@ func (es *experimentalServer) BulkCheckPermission(ctx context.Context, req *v1.B
convertedReq := toCheckBulkPermissionsRequest(req)
res, err := es.bulkChecker.checkBulkPermissions(ctx, convertedReq)
if err != nil {
return nil, es.rewriteError(ctx, err)
return nil, shared.RewriteErrorWithoutConfig(ctx, err)
}

return toBulkCheckPermissionResponse(res), nil
Expand Down

0 comments on commit f391921

Please sign in to comment.