Skip to content

Commit

Permalink
Ensure source is returned for all check debug traces
Browse files Browse the repository at this point in the history
  • Loading branch information
josephschorr committed Jan 8, 2025
1 parent c18b165 commit cc6a261
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 28 deletions.
8 changes: 8 additions & 0 deletions internal/dispatch/caching/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (

"github.com/authzed/spicedb/internal/dispatch"
"github.com/authzed/spicedb/internal/dispatch/keys"
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/cache"
"github.com/authzed/spicedb/pkg/middleware/nodeid"
v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
)

Expand Down Expand Up @@ -182,11 +184,17 @@ func (cd *Dispatcher) DispatchCheck(ctx context.Context, req *v1.DispatchCheckRe
cd.checkFromCacheCounter.Inc()
// If debugging is requested, add the req and the response to the trace.
if req.Debug == v1.DispatchCheckRequest_ENABLE_BASIC_DEBUGGING {
nodeID, err := nodeid.FromContext(ctx)
if err != nil {
log.Err(err).Msg("failed to get nodeID from context")
}

response.Metadata.DebugInfo = &v1.DebugInformation{
Check: &v1.CheckDebugTrace{
Request: req,
Results: maps.Clone(response.ResultsByResourceId),
IsCachedResult: true,
SourceId: nodeID,
},
}
}
Expand Down
8 changes: 7 additions & 1 deletion internal/dispatch/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,18 @@ func (ld *localDispatcher) DispatchCheck(ctx context.Context, req *v1.DispatchCh
}

// NOTE: we return debug information here to ensure tooling can see the cycle.
nodeID, nerr := nodeid.FromContext(ctx)
if nerr != nil {
log.Err(nerr).Msg("failed to get nodeID from context")
}

return &v1.DispatchCheckResponse{
Metadata: &v1.ResponseMeta{
DispatchCount: 0,
DebugInfo: &v1.DebugInformation{
Check: &v1.CheckDebugTrace{
Request: req,
Request: req,
SourceId: nodeID,
},
},
},
Expand Down
28 changes: 17 additions & 11 deletions internal/graph/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ func checkIntersectionTupleToUserset(
}

resultsByDispatchedSubject[result.relationType].UnionWith(result.Resp.ResultsByResourceId)
combinedMetadata = combineResponseMetadata(combinedMetadata, result.Resp.Metadata)
combinedMetadata = combineResponseMetadata(ctx, combinedMetadata, result.Resp.Metadata)
}

// For each resource ID, check that there exist some sort of permission for *each* subject. If not, then the
Expand Down Expand Up @@ -884,11 +884,11 @@ func checkTupleToUserset[T relation](
), hintsToReturn)
}

func withDistinctMetadata(result CheckResult) CheckResult {
func withDistinctMetadata(ctx context.Context, result CheckResult) CheckResult {
// NOTE: This is necessary to ensure unique debug information on the request and that debug
// information from the child metadata is *not* copied over.
clonedResp := result.Resp.CloneVT()
clonedResp.Metadata = combineResponseMetadata(emptyMetadata, clonedResp.Metadata)
clonedResp.Metadata = combineResponseMetadata(ctx, emptyMetadata, clonedResp.Metadata)
return CheckResult{
Resp: clonedResp,
Err: result.Err,
Expand Down Expand Up @@ -944,7 +944,7 @@ func union[T any](
}

if len(children) == 1 {
return withDistinctMetadata(handler(ctx, crc, children[0]))
return withDistinctMetadata(ctx, handler(ctx, crc, children[0]))
}

resultChan := make(chan CheckResult, len(children))
Expand All @@ -959,7 +959,7 @@ func union[T any](
select {
case result := <-resultChan:
log.Ctx(ctx).Trace().Object("anyResult", result.Resp).Send()
responseMetadata = combineResponseMetadata(responseMetadata, result.Resp.Metadata)
responseMetadata = combineResponseMetadata(ctx, responseMetadata, result.Resp.Metadata)
if result.Err != nil {
return checkResultError(result.Err, responseMetadata)
}
Expand Down Expand Up @@ -991,7 +991,7 @@ func all[T any](
}

if len(children) == 1 {
return withDistinctMetadata(handler(ctx, crc, children[0]))
return withDistinctMetadata(ctx, handler(ctx, crc, children[0]))
}

responseMetadata := emptyMetadata
Expand All @@ -1010,7 +1010,7 @@ func all[T any](
for i := 0; i < len(children); i++ {
select {
case result := <-resultChan:
responseMetadata = combineResponseMetadata(responseMetadata, result.Resp.Metadata)
responseMetadata = combineResponseMetadata(ctx, responseMetadata, result.Resp.Metadata)
if result.Err != nil {
return checkResultError(result.Err, responseMetadata)
}
Expand Down Expand Up @@ -1077,7 +1077,7 @@ func difference[T any](
// Wait for the base set to return.
select {
case base := <-baseChan:
responseMetadata = combineResponseMetadata(responseMetadata, base.Resp.Metadata)
responseMetadata = combineResponseMetadata(ctx, responseMetadata, base.Resp.Metadata)

if base.Err != nil {
return checkResultError(base.Err, responseMetadata)
Expand All @@ -1096,7 +1096,7 @@ func difference[T any](
for i := 1; i < len(children); i++ {
select {
case sub := <-othersChan:
responseMetadata = combineResponseMetadata(responseMetadata, sub.Resp.Metadata)
responseMetadata = combineResponseMetadata(ctx, responseMetadata, sub.Resp.Metadata)

if sub.Err != nil {
return checkResultError(sub.Err, responseMetadata)
Expand Down Expand Up @@ -1196,7 +1196,7 @@ func combineResultWithFoundResources(result CheckResult, foundResources *Members
}
}

func combineResponseMetadata(existing *v1.ResponseMeta, responseMetadata *v1.ResponseMeta) *v1.ResponseMeta {
func combineResponseMetadata(ctx context.Context, existing *v1.ResponseMeta, responseMetadata *v1.ResponseMeta) *v1.ResponseMeta {
combined := &v1.ResponseMeta{
DispatchCount: existing.DispatchCount + responseMetadata.DispatchCount,
DepthRequired: max(existing.DepthRequired, responseMetadata.DepthRequired),
Expand All @@ -1207,9 +1207,15 @@ func combineResponseMetadata(existing *v1.ResponseMeta, responseMetadata *v1.Res
return combined
}

nodeID, err := nodeid.FromContext(ctx)
if err != nil {
log.Err(err).Msg("failed to get nodeID from context")
}

debugInfo := &v1.DebugInformation{
Check: &v1.CheckDebugTrace{
TraceId: uuid.NewString(),
TraceId: uuid.NewString(),
SourceId: nodeID,
},
}

Expand Down
2 changes: 1 addition & 1 deletion internal/graph/expand.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func expandSetOperation(
for _, resultChan := range resultChans {
select {
case result := <-resultChan:
responseMetadata = combineResponseMetadata(responseMetadata, result.Resp.Metadata)
responseMetadata = combineResponseMetadata(ctx, responseMetadata, result.Resp.Metadata)
if result.Err != nil {
return expandResultError(result.Err, responseMetadata)
}
Expand Down
22 changes: 11 additions & 11 deletions internal/graph/lookupsubjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func lookupViaIntersectionTupleToUserset(
results := datasets.NewSubjectSet()
collectedMetadata := emptyMetadata
for _, result := range collectingStream.Results() {
collectedMetadata = combineResponseMetadata(collectedMetadata, result.Metadata)
collectedMetadata = combineResponseMetadata(ctx, collectedMetadata, result.Metadata)
for _, foundSubjects := range result.FoundSubjectsByResourceId {
if err := results.UnionWith(foundSubjects.FoundSubjects); err != nil {
return fmt.Errorf("failed to UnionWith under lookupSubjectsIntersection: %w", err)
Expand All @@ -327,7 +327,7 @@ func lookupViaIntersectionTupleToUserset(
dispatchInfoForResource.lock.Lock()
defer dispatchInfoForResource.lock.Unlock()

dispatchInfoForResource.metadata = combineResponseMetadata(dispatchInfoForResource.metadata, collectedMetadata)
dispatchInfoForResource.metadata = combineResponseMetadata(ctx, dispatchInfoForResource.metadata, collectedMetadata)

// If the first update for the resource, set the subjects set to the results.
if dispatchInfoForResource.isFirstUpdate {
Expand Down Expand Up @@ -365,7 +365,7 @@ func lookupViaIntersectionTupleToUserset(
currentSubjects = currentSubjects.WithParentCaveatExpression(ttuCaveat)
currentSubjectsByResourceID[incomingResourceID] = currentSubjects.AsFoundSubjects()

metadata = combineResponseMetadata(metadata, tracker.metadata)
metadata = combineResponseMetadata(ctx, metadata, tracker.metadata)
}

return parentStream.Publish(&v1.DispatchLookupSubjectsResponse{
Expand Down Expand Up @@ -517,7 +517,7 @@ func (cl *ConcurrentLookupSubjects) lookupSetOperation(
return err
}

return reducer.CompletedChildOperations()
return reducer.CompletedChildOperations(ctx)
}

func (cl *ConcurrentLookupSubjects) dispatchTo(
Expand Down Expand Up @@ -644,7 +644,7 @@ func combineFoundSubjects(existing *v1.FoundSubjects, toAdd *v1.FoundSubjects) (

type lookupSubjectsReducer interface {
ForIndex(ctx context.Context, setOperationIndex int) dispatch.LookupSubjectsStream
CompletedChildOperations() error
CompletedChildOperations(ctx context.Context) error
}

// Union
Expand All @@ -666,7 +666,7 @@ func (lsu *lookupSubjectsUnion) ForIndex(ctx context.Context, setOperationIndex
return collector
}

func (lsu *lookupSubjectsUnion) CompletedChildOperations() error {
func (lsu *lookupSubjectsUnion) CompletedChildOperations(ctx context.Context) error {
foundSubjects := datasets.NewSubjectSetByResourceID()
metadata := emptyMetadata

Expand All @@ -677,7 +677,7 @@ func (lsu *lookupSubjectsUnion) CompletedChildOperations() error {
}

for _, result := range collector.Results() {
metadata = combineResponseMetadata(metadata, result.Metadata)
metadata = combineResponseMetadata(ctx, metadata, result.Metadata)
if err := foundSubjects.UnionWith(result.FoundSubjectsByResourceId); err != nil {
return fmt.Errorf("failed to UnionWith under lookupSubjectsUnion: %w", err)
}
Expand Down Expand Up @@ -713,7 +713,7 @@ func (lsi *lookupSubjectsIntersection) ForIndex(ctx context.Context, setOperatio
return collector
}

func (lsi *lookupSubjectsIntersection) CompletedChildOperations() error {
func (lsi *lookupSubjectsIntersection) CompletedChildOperations(ctx context.Context) error {
var foundSubjects datasets.SubjectSetByResourceID
metadata := emptyMetadata

Expand All @@ -725,7 +725,7 @@ func (lsi *lookupSubjectsIntersection) CompletedChildOperations() error {

results := datasets.NewSubjectSetByResourceID()
for _, result := range collector.Results() {
metadata = combineResponseMetadata(metadata, result.Metadata)
metadata = combineResponseMetadata(ctx, metadata, result.Metadata)
if err := results.UnionWith(result.FoundSubjectsByResourceId); err != nil {
return fmt.Errorf("failed to UnionWith under lookupSubjectsIntersection: %w", err)
}
Expand Down Expand Up @@ -770,15 +770,15 @@ func (lse *lookupSubjectsExclusion) ForIndex(ctx context.Context, setOperationIn
return collector
}

func (lse *lookupSubjectsExclusion) CompletedChildOperations() error {
func (lse *lookupSubjectsExclusion) CompletedChildOperations(ctx context.Context) error {
var foundSubjects datasets.SubjectSetByResourceID
metadata := emptyMetadata

for index := 0; index < len(lse.collectors); index++ {
collector := lse.collectors[index]
results := datasets.NewSubjectSetByResourceID()
for _, result := range collector.Results() {
metadata = combineResponseMetadata(metadata, result.Metadata)
metadata = combineResponseMetadata(ctx, metadata, result.Metadata)
if err := results.UnionWith(result.FoundSubjectsByResourceId); err != nil {
return fmt.Errorf("failed to UnionWith under lookupSubjectsExclusion: %w", err)
}
Expand Down
7 changes: 4 additions & 3 deletions internal/graph/lr2streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (rdc *checkAndDispatchRunner) runDispatch(
return err
}

if err := publishResultToParentStream(result, rdc.ci, responsePartialCursor, adjustedResources, nil, isFirstPublishCall, checkMetadata, rdc.parentStream); err != nil {
if err := publishResultToParentStream(ctx, result, rdc.ci, responsePartialCursor, adjustedResources, nil, isFirstPublishCall, checkMetadata, rdc.parentStream); err != nil {
return err
}
isFirstPublishCall = false
Expand Down Expand Up @@ -264,7 +264,7 @@ func unfilteredLookupResourcesDispatchStreamForEntrypoint(
default:
}

if err := publishResultToParentStream(result, ci, ci.responsePartialCursor(), foundResources, nil, isFirstPublishCall, emptyMetadata, parentStream); err != nil {
if err := publishResultToParentStream(ctx, result, ci, ci.responsePartialCursor(), foundResources, nil, isFirstPublishCall, emptyMetadata, parentStream); err != nil {
return err
}
isFirstPublishCall = false
Expand All @@ -277,6 +277,7 @@ func unfilteredLookupResourcesDispatchStreamForEntrypoint(
// publishResultToParentStream publishes the result of a lookup resources call to the parent stream,
// mapped via foundResources.
func publishResultToParentStream(
ctx context.Context,
result *v1.DispatchLookupResources2Response,
ci cursorInformation,
responseCursor *v1.Cursor,
Expand Down Expand Up @@ -309,7 +310,7 @@ func publishResultToParentStream(
metadata := result.Metadata
if isFirstPublishCall {
metadata = addCallToResponseMetadata(metadata)
metadata = combineResponseMetadata(metadata, additionalMetadata)
metadata = combineResponseMetadata(ctx, metadata, additionalMetadata)
} else {
metadata = addAdditionalDepthRequired(metadata)
}
Expand Down
1 change: 1 addition & 0 deletions internal/services/v1/bulkcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func (bc *bulkChecker) checkBulkPermissions(ctx context.Context, req *v1.CheckBu
Results: localResults,
Duration: durationpb.New(time.Duration(0)),
TraceId: uuid.New().String(),
SourceId: debugInfo.Check.SourceId,
},
}

Expand Down
3 changes: 3 additions & 0 deletions internal/services/v1/permissions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ func TestCheckPermissions(t *testing.T) {
require.Equal(tuple.V1StringObjectRef(tc.resource), tuple.V1StringObjectRef(debugInfo.Check.Resource))
require.Equal(tc.permission, debugInfo.Check.Permission)
require.Equal(tuple.V1StringSubjectRef(tc.subject), tuple.V1StringSubjectRef(debugInfo.Check.Subject))
require.NotEmpty(debugInfo.Check.Source, "source in debug trace is empty")
} else {
require.Nil(encodedDebugInfo)
}
Expand Down Expand Up @@ -2038,6 +2039,8 @@ func TestCheckBulkPermissions(t *testing.T) {
require.NotNil(t, pair.GetItem().DebugTrace, "missing debug trace in response for item %v", pair.GetItem())
require.True(t, pair.GetItem().DebugTrace.Check != nil, "missing check trace in response for item %v", pair.GetItem())
require.Equal(t, parsed.Resource.ObjectID, pair.GetItem().DebugTrace.Check.Resource.ObjectId, "resource in debug trace does not match")
require.NotEmpty(t, pair.GetItem().DebugTrace.Check.TraceOperationId, "trace operation ID in debug trace is empty")
require.NotEmpty(t, pair.GetItem().DebugTrace.Check.Source, "source in debug trace is empty")
}
}
} else {
Expand Down
4 changes: 3 additions & 1 deletion pkg/middleware/nodeid/nodeid.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ func ContextWithHandle(ctx context.Context) context.Context {
func FromContext(ctx context.Context) (string, error) {
if c := ctx.Value(nodeIDKey); c != nil {
handle := c.(*nodeIDHandle)
return handle.nodeID, nil
if handle.nodeID != "" {
return handle.nodeID, nil
}
}

if defaultNodeID == "" {
Expand Down

0 comments on commit cc6a261

Please sign in to comment.