Skip to content

Commit

Permalink
Merge pull request #2069 from josephschorr/lr-secondary-dispatch
Browse files Browse the repository at this point in the history
Add support for secondary dispatching on LR2
  • Loading branch information
josephschorr authored Sep 18, 2024
2 parents 49c9652 + 543e6e0 commit 26a2a2d
Show file tree
Hide file tree
Showing 2 changed files with 442 additions and 33 deletions.
180 changes: 148 additions & 32 deletions internal/dispatch/remote/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,150 @@ func dispatchRequest[Q requestMessage, S responseMessage](ctx context.Context, c
return *new(S), foundError
}

type requestMessageWithCursor interface {
requestMessage
GetOptionalCursor() *v1.Cursor
}

type responseMessageWithCursor interface {
responseMessage
GetAfterResponseCursor() *v1.Cursor
}

type receiver[S responseMessage] interface {
Recv() (S, error)
grpc.ClientStream
}

const (
secondaryCursorPrefix = "$$secondary:"
primaryDispatcher = ""
)

func publishClient[Q requestMessageWithCursor, R responseMessageWithCursor](ctx context.Context, client receiver[R], stream dispatch.Stream[R], secondaryDispatchName string) error {
for {
select {
case <-ctx.Done():
return ctx.Err()

default:
result, err := client.Recv()
if errors.Is(err, io.EOF) {
return nil
} else if err != nil {
return err
}

merr := adjustMetadataForDispatch(result.GetMetadata())
if merr != nil {
return merr
}

if secondaryDispatchName != primaryDispatcher {
afterResponseCursor := result.GetAfterResponseCursor()
if afterResponseCursor == nil {
return spiceerrors.MustBugf("received a nil after response cursor for secondary dispatch")
}
afterResponseCursor.Sections = append([]string{secondaryCursorPrefix + secondaryDispatchName}, afterResponseCursor.Sections...)
}

serr := stream.Publish(result)
if serr != nil {
return serr
}
}
}
}

// dispatchStreamingRequest handles the dispatching of a streaming request to the primary and any
// secondary dispatchers. Unlike the non-streaming version, this will first attempt to dispatch
// from the allowed secondary dispatchers before falling back to the primary, rather than running
// them in parallel.
func dispatchStreamingRequest[Q requestMessageWithCursor, R responseMessageWithCursor](
ctx context.Context,
cr *clusterDispatcher,
reqKey string,
req Q,
stream dispatch.Stream[R],
handler func(context.Context, ClusterClient) (receiver[R], error),
) error {
withTimeout, cancelFn := context.WithTimeout(ctx, cr.dispatchOverallTimeout)
defer cancelFn()

client, err := handler(withTimeout, cr.clusterClient)
if err != nil {
return err
}

// Check the cursor to see if the dispatch went to one of the secondary endpoints.
cursor := req.GetOptionalCursor()
cursorLockedSecondaryName := ""
if cursor != nil && len(cursor.Sections) > 0 {
if strings.HasPrefix(cursor.Sections[0], secondaryCursorPrefix) {
cursorLockedSecondaryName = strings.TrimPrefix(cursor.Sections[0], secondaryCursorPrefix)
cursor.Sections = cursor.Sections[1:]
}
}

// If no secondary dispatches are defined, just invoke directly.
if len(cr.secondaryDispatchExprs) == 0 || len(cr.secondaryDispatch) == 0 {
return publishClient[Q](withTimeout, client, stream, primaryDispatcher)
}

// If the cursor is locked to a known secondary, dispatch to it.
if cursorLockedSecondaryName != "" {
secondary, ok := cr.secondaryDispatch[cursorLockedSecondaryName]
if ok {
secondaryClient, err := handler(withTimeout, secondary.Client)
if err != nil {
return err
}

log.Debug().Str("secondary-dispatcher", secondary.Name).Object("request", req).Msg("running secondary dispatcher based on cursor")
return publishClient[Q](withTimeout, secondaryClient, stream, cursorLockedSecondaryName)
}

return fmt.Errorf("unknown secondary dispatcher in cursor: %s", cursorLockedSecondaryName)
}

// Otherwise, look for a matching expression for the initial secondary dispatch
// and, if present, try to dispatch to it.
expr, ok := cr.secondaryDispatchExprs[reqKey]
if !ok {
return publishClient[Q](withTimeout, client, stream, primaryDispatcher)
}

result, err := RunDispatchExpr(expr, req)
if err != nil {
log.Warn().Err(err).Msg("error when trying to evaluate the dispatch expression")
}

for _, secondaryDispatchName := range result {
secondary, ok := cr.secondaryDispatch[secondaryDispatchName]
if !ok {
log.Warn().Str("secondary-dispatcher-name", secondaryDispatchName).Msg("received unknown secondary dispatcher")
continue
}

log.Trace().Str("secondary-dispatcher", secondary.Name).Object("request", req).Msg("running secondary dispatcher")
secondaryClient, err := handler(withTimeout, secondary.Client)
if err != nil {
log.Warn().Str("secondary-dispatcher", secondary.Name).Err(err).Msg("failed to create secondary dispatch client")
continue
}

if err := publishClient[Q](withTimeout, secondaryClient, stream, secondaryDispatchName); err != nil {
log.Warn().Str("secondary-dispatcher", secondary.Name).Err(err).Msg("failed to publish secondary dispatch response")
continue
}

return nil
}

// Fallback: use the primary client if no secondary matched.
return publishClient[Q](withTimeout, client, stream, primaryDispatcher)
}

func adjustMetadataForDispatch(metadata *v1.ResponseMeta) error {
if metadata == nil {
return spiceerrors.MustBugf("received a nil metadata")
Expand Down Expand Up @@ -370,38 +514,10 @@ func (cr *clusterDispatcher) DispatchLookupResources2(
return err
}

withTimeout, cancelFn := context.WithTimeout(ctx, cr.dispatchOverallTimeout)
defer cancelFn()

client, err := cr.clusterClient.DispatchLookupResources2(withTimeout, req)
if err != nil {
return err
}

for {
select {
case <-withTimeout.Done():
return withTimeout.Err()

default:
result, err := client.Recv()
if errors.Is(err, io.EOF) {
return nil
} else if err != nil {
return err
}

merr := adjustMetadataForDispatch(result.Metadata)
if merr != nil {
return merr
}

serr := stream.Publish(result)
if serr != nil {
return serr
}
}
}
return dispatchStreamingRequest(ctx, cr, "lookupresources", req, stream,
func(ctx context.Context, client ClusterClient) (receiver[*v1.DispatchLookupResources2Response], error) {
return client.DispatchLookupResources2(ctx, req)
})
}

func (cr *clusterDispatcher) DispatchLookupSubjects(
Expand Down
Loading

0 comments on commit 26a2a2d

Please sign in to comment.