From 19f56470b9bbb93858472e1276c513e4df17ad48 Mon Sep 17 00:00:00 2001 From: Vadim Shtayura Date: Fri, 15 Nov 2024 23:33:40 +0000 Subject: [PATCH] [swarming] Support Swarming bot session in "dry run" mode. Bot session tokens are recognized, checked, and refreshed. But any errors are logged and ignored. Also bot dimensions are now fetched from datastore (eventually all bot RPC calls other than /bot/poll will fetch them from datastore: that way we can guarantee all of them use the same consistent set of dimensions). No new tests, because this is a temporary messy state. Test will be written for the final "clean" state. Also remove unused /swarming/api/v1/bot/rbe/ping. R=chanli@chromium.org BUG=b/355013282 Change-Id: I9460cb21975480a29f5718bc177d6f0460be21e9 Reviewed-on: https://chromium-review.googlesource.com/c/infra/luci/luci-go/+/6024091 Reviewed-by: Chan Li Commit-Queue: Vadim Shtayura --- swarming/server/botapi/api.go | 1 + swarming/server/botsession/botsession.go | 4 + swarming/server/botsrv/botsrv.go | 187 ++++++++++++++--------- swarming/server/botsrv/botsrv_test.go | 5 + swarming/server/cmd/default/main.go | 63 +++----- swarming/server/rbe/session.go | 77 +++++++++- 6 files changed, 224 insertions(+), 113 deletions(-) diff --git a/swarming/server/botapi/api.go b/swarming/server/botapi/api.go index ac68ea0c98..8efb5c77a0 100644 --- a/swarming/server/botapi/api.go +++ b/swarming/server/botapi/api.go @@ -55,6 +55,7 @@ func NewBotAPIServer(cfg *cfg.Provider, project string) *BotAPIServer { // UnimplementedRequest is used as a placeholder in unimplemented handlers. type UnimplementedRequest struct{} +func (r *UnimplementedRequest) ExtractSession() []byte { return nil } func (r *UnimplementedRequest) ExtractPollToken() []byte { return nil } func (r *UnimplementedRequest) ExtractSessionToken() []byte { return nil } func (r *UnimplementedRequest) ExtractDimensions() map[string][]string { return nil } diff --git a/swarming/server/botsession/botsession.go b/swarming/server/botsession/botsession.go index 1443e3a5f4..5b67204b57 100644 --- a/swarming/server/botsession/botsession.go +++ b/swarming/server/botsession/botsession.go @@ -18,6 +18,7 @@ package botsession import ( "context" "fmt" + "time" "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/encoding/prototext" @@ -31,6 +32,9 @@ import ( "go.chromium.org/luci/swarming/server/hmactoken" ) +// Expiry is how long a new Swarming session token will last. +const Expiry = time.Hour + // cryptoCtx is used whe signing and checking the token as a cryptographic // context (to make sure produced token can't be incorrectly used in other // protocols that use the same secret key). diff --git a/swarming/server/botsrv/botsrv.go b/swarming/server/botsrv/botsrv.go index ff726c5b36..f04bee9a49 100644 --- a/swarming/server/botsrv/botsrv.go +++ b/swarming/server/botsrv/botsrv.go @@ -42,6 +42,7 @@ import ( "go.chromium.org/luci/tokenserver/auth/machine" internalspb "go.chromium.org/luci/swarming/proto/internals" + "go.chromium.org/luci/swarming/server/botsession" "go.chromium.org/luci/swarming/server/cfg" "go.chromium.org/luci/swarming/server/hmactoken" "go.chromium.org/luci/swarming/server/pyproxy" @@ -51,19 +52,21 @@ import ( // RequestBody should be implemented by a JSON-serializable struct representing // format of some particular request. type RequestBody interface { + ExtractSession() []byte // the token with bot Session proto ExtractPollToken() []byte // the poll token, if present - ExtractSessionToken() []byte // the session token, if present + ExtractSessionToken() []byte // the RBE session token, if present ExtractDimensions() map[string][]string // dimensions reported by the bot, if present ExtractDebugRequest() any // serialized as JSON and logged on errors } // Request is extracted from an authenticated request from a bot. type Request struct { - BotID string // validated bot ID - SessionID string // validated RBE bot session ID, if present - SessionTokenExpired bool // true if the request has expired session token - PollState *internalspb.PollState // validated poll state - Dimensions map[string][]string // validated dimensions + BotID string // validated bot ID, TODO: delete (part of Session) + SessionID string // validated RBE bot session ID, if present, TODO: delete + SessionTokenExpired bool // true if the request has expired session token, TODO: delete + PollState *internalspb.PollState // validated poll state, TODO: delete + Dimensions map[string][]string // validated dimensions, TODO: delete + Session *internalspb.Session // the bot session from the session token } // Response is serialized as JSON and sent to the bot. @@ -78,16 +81,31 @@ type Response any // a gRPC error code that will be converted into an HTTP error. type Handler[B any] func(ctx context.Context, body *B, req *Request) (Response, error) +// KnownBotInfo is information about a bot registered in the datastore. +type KnownBotInfo struct { + // SessionID is the current bot session ID of this bot. + SessionID string + // Dimensions is "k:v" dimensions registered by this bot in the last poll. + Dimensions []string +} + +// KnownBotProvider knows how to return information about existing bots. +// +// Returns nil and no error if the bot is not registered in the datastore. All +// other errors can be considered transient. +type KnownBotProvider func(ctx context.Context, botID string) (*KnownBotInfo, error) + // Server knows how to authenticate bot requests and route them to handlers. type Server struct { router *router.Router middlewares router.MiddlewareChain hmacSecret *hmactoken.Secret cfg *cfg.Provider + knownBots KnownBotProvider } // New constructs new Server. -func New(ctx context.Context, cfg *cfg.Provider, r *router.Router, prx *pyproxy.Proxy, projectID string, hmacSecret *hmactoken.Secret) *Server { +func New(ctx context.Context, cfg *cfg.Provider, r *router.Router, prx *pyproxy.Proxy, bots KnownBotProvider, projectID string, hmacSecret *hmactoken.Secret) *Server { gaeAppDomain := fmt.Sprintf("%s.appspot.com", projectID) // Redirect to Python for eligible requests before hitting any other @@ -121,6 +139,7 @@ func New(ctx context.Context, cfg *cfg.Provider, r *router.Router, prx *pyproxy. ), hmacSecret: hmacSecret, cfg: cfg, + knownBots: bots, } } @@ -168,6 +187,8 @@ func GET(s *Server, route string, handler router.Handler) { // some JSON-serialized response. // // It performs bot authentication and authorization based on bots.cfg config. +// +// TODO: Most of this will be gone once Bot Session tokens are rolled out. func JSON[B any, RB RequestBodyConstraint[B]](s *Server, route string, h Handler[B]) { s.router.POST(route, s.middlewares, func(c *router.Context) { ctx := c.Request.Context() @@ -303,36 +324,8 @@ func JSON[B any, RB RequestBodyConstraint[B]](s *Server, route string, h Handler } } - // Authenticate the bot based on the config. Do it in a dry run mode for now - // to compare with the authentication based on the poll state token. Check - // various other bits of the poll state token as well (like enforced - // dimensions). Once it's confirmed there are no differences, the poll state - // token mechanism can be retired. - var dryRunAuthErr error - var ignoreDryRunAuthErr bool - dims := RB(body).ExtractDimensions() - botID, err := botIDFromDimensions(dims) - if err != nil { - logging.Errorf(ctx, "bot_auth: bad bot ID in dims: %s", err) - dryRunAuthErr = err - } else { - if fromPollState := extractBotID(pollState); fromPollState != botID { - logging.Errorf(ctx, "bot_auth: mismatch in bot ID from poll state (%q) and dims (%q)", fromPollState, botID) - } - botGroup := s.cfg.Cached(ctx).BotGroup(botID) - if !sameEnforcedDims(botID, botGroup.Dimensions, pollState.EnforcedDimensions) { - logging.Errorf(ctx, "bot_auth: mismatch in enforced dimensions (%v vs %v)", botGroup.Dimensions, pollState.EnforcedDimensions) - } - dryRunAuthErr = AuthorizeBot(ctx, botID, botGroup.Auth) - if transient.Tag.In(dryRunAuthErr) { - logging.Errorf(ctx, "bot_auth: ignoring transient error when checking bot creds: %s", dryRunAuthErr) - dryRunAuthErr = nil - ignoreDryRunAuthErr = true - } - } - // Extract bot ID from the validated PollToken. - botID = extractBotID(pollState) + botID := extractBotID(pollState) if botID == "" { writeErr(status.Errorf(codes.InvalidArgument, "no bot ID")) return @@ -348,21 +341,15 @@ func JSON[B any, RB RequestBodyConstraint[B]](s *Server, route string, h Handler if transient.Tag.In(err) { writeErr(status.Errorf(codes.Internal, "transient error checking bot credentials: %s", err)) } else { - if !ignoreDryRunAuthErr && dryRunAuthErr == nil { - logging.Errorf(ctx, "bot_auth: bot auth mismatch: bots.cfg method succeeded, but poll token method failed") - } writeErr(status.Errorf(codes.Unauthenticated, "bad bot credentials: %s", err)) } return } - if !ignoreDryRunAuthErr && dryRunAuthErr != nil { - logging.Errorf(ctx, "bot_auth: bot auth mismatch: bots.cfg method failed, but poll token method succeeded; poll token:\n%s", prettyProto(pollState)) - } - // Apply verified state stored in PollState on top of whatever was reported // by the bot. Normally functioning bots should report the same values as // stored in the token. + dims := RB(body).ExtractDimensions() for _, dim := range pollState.EnforcedDimensions { reported := dims[dim.Key] if !slices.Equal(reported, dim.Values) { @@ -379,6 +366,42 @@ func JSON[B any, RB RequestBodyConstraint[B]](s *Server, route string, h Handler return } + // We'll soon switch to using the bot dimensions stored in the datastore. + // Verify they match what the bot is sending via tokens. + var swarmingSID string + switch knownBot, err := s.knownBots(ctx, botID); { + case err != nil: + logging.Errorf(ctx, "Failed to fetch BotInfo of %s: %s", botID, err) + case knownBot == nil: + logging.Errorf(ctx, "Missing BotInfo of %s", botID) + default: + swarmingSID = knownBot.SessionID + fromDS := dimsFlatToString(knownBot.Dimensions) + fromTok := dimsMapToString(dims) + if fromDS != fromTok { + logging.Errorf(ctx, "Dims from datastore != dims from token:\nDatastore: %s\nToken: %s", fromDS, fromTok) + } + } + + // If have a Swarming bot session token, verify it is valid and the + // information inside matches what was extracted above from other + // (deprecated) tokens. This is a dry run check for now. + var swarmingSession *internalspb.Session + if sessionTok := RB(body).ExtractSession(); len(sessionTok) != 0 { + swarmingSession = checkSwarmingSession(ctx, + sessionTok, + s.hmacSecret, + botID, + swarmingSID, + sessionState.GetRbeBotSessionId(), + ) + if swarmingSession != nil { + logging.Infof(ctx, "Swarming session ID: %s", swarmingSession.SessionId) + } + } else { + logging.Infof(ctx, "No Swarming session token") + } + // The request is valid, dispatch it to the handler. resp, err := h(ctx, body, &Request{ BotID: botID, @@ -386,6 +409,7 @@ func JSON[B any, RB RequestBodyConstraint[B]](s *Server, route string, h Handler SessionTokenExpired: sessionTokenExpired, PollState: pollState, Dimensions: dims, + Session: swarmingSession, }) if err != nil { writeErr(err) @@ -406,6 +430,41 @@ func JSON[B any, RB RequestBodyConstraint[B]](s *Server, route string, h Handler }) } +// checkSwarmingSession unmarshals Swarming bot session and compares it to given +// values, logging discrepancies. +// +// This is a dry run check before session tokens become authoritative. +func checkSwarmingSession(ctx context.Context, tok []byte, s *hmactoken.Secret, botID, swarmingSID, rbeSID string) *internalspb.Session { + session, err := botsession.Unmarshal(tok, s) + if err != nil { + logging.Errorf(ctx, "Bad session token: %s", err) + return nil + } + if clock.Now(ctx).After(session.Expiry.AsTime()) { + logging.Errorf(ctx, "Expired session:\n%s", botsession.FormatForDebug(session)) + return nil + } + if session.BotId != botID { + logging.Errorf(ctx, "Wrong bot ID:\n%s", botsession.FormatForDebug(session)) + return nil + } + if session.RbeBotSessionId != rbeSID { + logging.Errorf(ctx, "Wrong RBE session ID:\n%s", botsession.FormatForDebug(session)) + return nil + } + // Following errors are "fine" in a sense that if they happen, something is + // still broken, but we can keep using the token for now. They will all become + // real errors once Swarming bot sessions are fully implemented. + if err := AuthorizeBot(ctx, session.BotId, session.BotConfig.GetBotAuth()); err != nil { + logging.Errorf(ctx, "Failing bot authorization: %s", err) + logging.Errorf(ctx, "Session:\n%s", botsession.FormatForDebug(session)) + } + if session.SessionId != swarmingSID { + logging.Errorf(ctx, "Wrong session ID: %s != %s", session.SessionId, swarmingSID) + } + return session +} + // prettyProto formats a proto message for logs. func prettyProto(msg proto.Message) string { blob, err := prototext.MarshalOptions{ @@ -435,31 +494,6 @@ func botIDFromDimensions(dims map[string][]string) (string, error) { } } -// sameEnforcedDims compares enforced dimensions in bots.cfg to ones in the -// poll state. -// -// This is temporary until the poll state token is removed. -func sameEnforcedDims(botID string, cfgDims map[string][]string, tokDims []*internalspb.PollState_Dimension) bool { - var fromCfg []string - fromCfg = append(fromCfg, "id:"+botID) - for key, vals := range cfgDims { - for _, val := range vals { - fromCfg = append(fromCfg, fmt.Sprintf("%s:%s", key, val)) - } - } - sort.Strings(fromCfg) - - var fromTok []string - for _, d := range tokDims { - for _, val := range d.Values { - fromTok = append(fromTok, fmt.Sprintf("%s:%s", d.Key, val)) - } - } - sort.Strings(fromTok) - - return slices.Equal(fromCfg, fromTok) -} - // checkCredentials checks the bot credentials in the context match what is // required by the PollState. // @@ -548,3 +582,20 @@ func extractBotID(s *internalspb.PollState) string { } return "" } + +// dimsFlatToString converts a flat list of dimensions into a debug string. +func dimsFlatToString(dims []string) string { + return strings.Join(dims, " | ") +} + +// dimsMapToString converts a dimensions map into a debug string. +func dimsMapToString(dims map[string][]string) string { + var kv []string + for k, vals := range dims { + for _, v := range vals { + kv = append(kv, fmt.Sprintf("%s:%s", k, v)) + } + } + sort.Strings(kv) + return dimsFlatToString(kv) +} diff --git a/swarming/server/botsrv/botsrv_test.go b/swarming/server/botsrv/botsrv_test.go index 4df83a56ed..77342c87d5 100644 --- a/swarming/server/botsrv/botsrv_test.go +++ b/swarming/server/botsrv/botsrv_test.go @@ -53,11 +53,13 @@ import ( ) type testRequest struct { + Session []byte Dimensions map[string][]string PollToken []byte SessionToken []byte } +func (r *testRequest) ExtractSession() []byte { return r.Session } func (r *testRequest) ExtractPollToken() []byte { return r.PollToken } func (r *testRequest) ExtractSessionToken() []byte { return r.SessionToken } func (r *testRequest) ExtractDimensions() map[string][]string { return r.Dimensions } @@ -85,6 +87,9 @@ func TestBotHandler(t *testing.T) { Passive: [][]byte{[]byte("also-secret")}, }), cfg: cfgtest.MockConfigs(ctx, cfgtest.NewMockedConfigs()), + knownBots: func(ctx context.Context, botID string) (*KnownBotInfo, error) { + return nil, nil + }, } var lastBody *testRequest diff --git a/swarming/server/cmd/default/main.go b/swarming/server/cmd/default/main.go index c084bbbc4c..cef2de8822 100644 --- a/swarming/server/cmd/default/main.go +++ b/swarming/server/cmd/default/main.go @@ -21,8 +21,8 @@ import ( bbpb "go.chromium.org/luci/buildbucket/proto" "go.chromium.org/luci/common/errors" - "go.chromium.org/luci/common/logging" "go.chromium.org/luci/config/server/cfgmodule" + "go.chromium.org/luci/gae/service/datastore" "go.chromium.org/luci/grpc/prpc" "go.chromium.org/luci/server" "go.chromium.org/luci/server/auth/rpcacl" @@ -135,7 +135,7 @@ func main() { sessionsConns, reservationsConn := rbeConns[:*connPoolSize], rbeConns[*connPoolSize] // A server that can authenticate bot API calls and route them to Python. - botSrv := botsrv.New(srv.Context, cfg, srv.Routes, proxy, srv.Options.CloudProject, tokenSecret) + botSrv := botsrv.New(srv.Context, cfg, srv.Routes, proxy, knownBotProvider, srv.Options.CloudProject, tokenSecret) // A server that actually handles core Bot API calls. botAPI := botapi.NewBotAPIServer(cfg, srv.Options.CloudProject) @@ -168,8 +168,7 @@ func main() { botsrv.JSON(botSrv, "/swarming/api/v1/bot/task_error/:TaskID", botAPI.TaskError) // Bot API endpoints to control RBE session. - rbeSessions := rbe.NewSessionServer(srv.Context, sessionsConns, tokenSecret) - botsrv.JSON(botSrv, "/swarming/api/v1/bot/rbe/ping", pingHandler) + rbeSessions := rbe.NewSessionServer(srv.Context, sessionsConns, tokenSecret, srv.Options.ImageVersion()) botsrv.JSON(botSrv, "/swarming/api/v1/bot/rbe/session/create", rbeSessions.CreateBotSession) botsrv.JSON(botSrv, "/swarming/api/v1/bot/rbe/session/update", rbeSessions.UpdateBotSession) @@ -296,45 +295,21 @@ func main() { }) } -//////////////////////////////////////////////////////////////////////////////// - -// pingRequest is a JSON structure of the ping request payload. -type pingRequest struct { - // Dimensions is dimensions reported by the bot. - Dimensions map[string][]string `json:"dimensions"` - // State is the state reported by the bot. - State map[string]any `json:"state"` - // Version is the bot version. - Version string `json:"version"` - // RBEState is RBE-related state reported by the bot. - RBEState struct { - // Instance if the full RBE instance name to use. - Instance string `json:"instance"` - // PollToken is base64-encoded HMAC-tagged internalspb.PollState. - PollToken []byte `json:"poll_token"` - } `json:"rbe_state"` -} - -func (r *pingRequest) ExtractPollToken() []byte { return r.RBEState.PollToken } -func (r *pingRequest) ExtractSessionToken() []byte { return nil } -func (r *pingRequest) ExtractDimensions() map[string][]string { return r.Dimensions } - -func (r *pingRequest) ExtractDebugRequest() any { - return &pingRequest{ - Dimensions: r.Dimensions, - State: r.State, - Version: r.Version, - } -} - -func pingHandler(ctx context.Context, body *pingRequest, r *botsrv.Request) (botsrv.Response, error) { - logging.Infof(ctx, "Dimensions: %v", r.Dimensions) - logging.Infof(ctx, "PollState: %v", r.PollState) - logging.Infof(ctx, "Bot version: %s", body.Version) - if body.RBEState.Instance != r.PollState.RbeInstance { - logging.Errorf(ctx, "RBE instance mismatch: reported %q, expecting %q", - body.RBEState.Instance, r.PollState.RbeInstance, - ) +// knownBotProvider returns info about a registered bot to use in Bot API. +// +// TODO: This will be very hot. May need to add a cache of some kind to avoid +// hitting the datastore all the time. +func knownBotProvider(ctx context.Context, botID string) (*botsrv.KnownBotInfo, error) { + info := &model.BotInfo{Key: model.BotInfoKey(ctx, botID)} + switch err := datastore.Get(ctx, info); { + case err == nil: + return &botsrv.KnownBotInfo{ + SessionID: info.SessionID, + Dimensions: info.Dimensions, + }, nil + case errors.Is(err, datastore.ErrNoSuchEntity): + return nil, nil + default: + return nil, err } - return nil, nil } diff --git a/swarming/server/rbe/session.go b/swarming/server/rbe/session.go index ebf479b5b1..981e34ccaa 100644 --- a/swarming/server/rbe/session.go +++ b/swarming/server/rbe/session.go @@ -35,6 +35,7 @@ import ( "go.chromium.org/luci/swarming/internal/remoteworkers" internalspb "go.chromium.org/luci/swarming/proto/internals" + "go.chromium.org/luci/swarming/server/botsession" "go.chromium.org/luci/swarming/server/botsrv" "go.chromium.org/luci/swarming/server/hmactoken" ) @@ -43,13 +44,15 @@ import ( type SessionServer struct { rbe remoteworkers.BotsClient hmacSecret *hmactoken.Secret // to generate session tokens + backendVer string // to put into the DebugInfo in the tokens } // NewSessionServer creates a new session server given an RBE client connection. -func NewSessionServer(ctx context.Context, cc []grpc.ClientConnInterface, hmacSecret *hmactoken.Secret) *SessionServer { +func NewSessionServer(ctx context.Context, cc []grpc.ClientConnInterface, hmacSecret *hmactoken.Secret, backendVer string) *SessionServer { return &SessionServer{ rbe: botsConnectionPool(cc), hmacSecret: hmacSecret, + backendVer: backendVer, } } @@ -75,18 +78,27 @@ type WorkerProperties struct { // CreateBotSessionRequest is a body of `/bot/rbe/session/create` request. type CreateBotSessionRequest struct { + // Session is a serialized Swarming Bot Session proto. + Session []byte `json:"session"` + // PollToken is a token produced by Python server in `/bot/poll`. Required. // // This token encodes configuration of the bot maintained by the Python // Swarming server. + // + // TODO: To be removed. PollToken []byte `json:"poll_token"` // SessionToken is a session token of a previous session if recreating it. // // Optional. See the corresponding field in UpdateBotSessionRequest. + // + // TODO: To be removed. SessionToken []byte `json:"session_token,omitempty"` // Dimensions is dimensions reported by the bot. Required. + // + // TODO: To be removed. Dimensions map[string][]string `json:"dimensions"` // BotVersion identifies the bot software. It is reported to RBE as is. @@ -96,6 +108,7 @@ type CreateBotSessionRequest struct { WorkerProperties *WorkerProperties `json:"worker_properties,omitempty"` } +func (r *CreateBotSessionRequest) ExtractSession() []byte { return r.Session } func (r *CreateBotSessionRequest) ExtractPollToken() []byte { return r.PollToken } func (r *CreateBotSessionRequest) ExtractSessionToken() []byte { return r.SessionToken } func (r *CreateBotSessionRequest) ExtractDimensions() map[string][]string { return r.Dimensions } @@ -110,6 +123,12 @@ func (r *CreateBotSessionRequest) ExtractDebugRequest() any { // CreateBotSessionResponse is a body of `/bot/rbe/session/create` response. type CreateBotSessionResponse struct { + // Session is a serialized Swarming Bot Session proto. + // + // It is derived from the session in the request, except it has RBE session + // info populated now. + Session []byte `json:"session"` + // SessionToken is a freshly produced session token. // // It encodes the RBE bot session ID and bot configuration provided via the @@ -117,11 +136,15 @@ type CreateBotSessionResponse struct { // // The session token is needed to call `/bot/rbe/session/update`. This call // also will periodically refresh it. + // + // TODO: To be removed. SessionToken []byte `json:"session_token"` // SessionExpiry is when this session expires, as Unix timestamp in seconds. // // The bot should call `/bot/rbe/session/update` before that time. + // + // TODO: To be removed. Currently unused. SessionExpiry int64 `json:"session_expiry"` // SessionID is an RBE bot session ID as encoded in the token. @@ -149,13 +172,28 @@ func (srv *SessionServer) CreateBotSession(ctx context.Context, body *CreateBotS logging.Errorf(ctx, "Unexpected lease when just opening the session: %s", lease) } + // Associate the RBE session with the Swarming session. + var swarmingSession []byte + if r.Session != nil { + r.Session.RbeBotSessionId = session.Name + r.Session.DebugInfo = botsession.DebugInfo(ctx, srv.backendVer) + r.Session.Expiry = timestamppb.New(clock.Now(ctx).Add(botsession.Expiry)) + swarmingSession, err = botsession.Marshal(r.Session, srv.hmacSecret) + if err != nil { + return nil, status.Errorf(codes.Internal, "could not marshal session token: %s", err) + } + } + // Return the token that wraps the session ID. The bot will use it when // calling `/bot/rbe/session/update`. + // + // TODO: Remove. sessionToken, tokenExpiry, err := srv.genSessionToken(ctx, r.PollState, session.Name) if err != nil { return nil, status.Errorf(codes.Internal, "could not generate session token: %s", err) } return &CreateBotSessionResponse{ + Session: swarmingSession, SessionToken: sessionToken, SessionExpiry: tokenExpiry.Unix(), SessionID: session.Name, @@ -197,10 +235,18 @@ type Lease struct { // If PollToken is present, it will be used to refresh the state stored in the // session token. type UpdateBotSessionRequest struct { + // Session is a serialized Swarming Bot Session proto. + // + // It should have an RBE session info inside, as populated by + // `/bot/rbe/session/create`. + Session []byte `json:"session"` + // SessionToken is a token returned by the previous API call. Required. // // This token is initially returned by `/bot/rbe/session/create` and then // refreshed with every `/bot/rbe/session/update` call. + // + // TODO: To be removed. SessionToken []byte `json:"session_token"` // PollToken is a token produced by Python server in `/bot/poll`. @@ -211,9 +257,13 @@ type UpdateBotSessionRequest struct { // // Internals of this token will be copied into the session token returned in // the response to this call. + // + // TODO: To be removed. PollToken []byte `json:"poll_token,omitempty"` // Dimensions is dimensions reported by the bot. Required. + // + // TODO: To be removed. Dimensions map[string][]string `json:"dimensions"` // BotVersion identifies the bot software. It is reported to RBE as is. @@ -249,6 +299,7 @@ type UpdateBotSessionRequest struct { Lease *Lease `json:"lease,omitempty"` } +func (r *UpdateBotSessionRequest) ExtractSession() []byte { return r.Session } func (r *UpdateBotSessionRequest) ExtractPollToken() []byte { return r.PollToken } func (r *UpdateBotSessionRequest) ExtractSessionToken() []byte { return r.SessionToken } func (r *UpdateBotSessionRequest) ExtractDimensions() map[string][]string { return r.Dimensions } @@ -266,6 +317,12 @@ func (r *UpdateBotSessionRequest) ExtractDebugRequest() any { // UpdateBotSessionResponse is a body of `/bot/rbe/session/update` response. type UpdateBotSessionResponse struct { + // Session is a serialized Swarming Bot Session proto. + // + // It is derived from the session in the request, except it has its expiration + // time bumped. + Session []byte `json:"session"` + // SessionToken is a refreshed session token, if available. // // It carries the same RBE bot session ID inside as the incoming token. The @@ -273,6 +330,8 @@ type UpdateBotSessionResponse struct { // // If the incoming token has expired already, this field will be empty, since // it is not possible to refresh an expired token. + // + // TODO: To be removed. SessionToken []byte `json:"session_token,omitempty"` // SessionExpiry is when this session expires, as Unix timestamp in seconds. @@ -280,6 +339,8 @@ type UpdateBotSessionResponse struct { // The bot should call `/bot/rbe/session/update` again before that time. // // If the session token has expired already, this field will be empty. + // + // TODO: To be removed. Currently unused. SessionExpiry int64 `json:"session_expiry,omitempty"` // The session status as seen by the server, as remoteworkers.BotStatus enum. @@ -372,6 +433,18 @@ func (srv *SessionServer) UpdateBotSession(ctx context.Context, body *UpdateBotS } } + // Bump Swarming bot session expiration time. + var swarmingSession []byte + var err error + if r.Session != nil { + r.Session.DebugInfo = botsession.DebugInfo(ctx, srv.backendVer) + r.Session.Expiry = timestamppb.New(clock.Now(ctx).Add(botsession.Expiry)) + swarmingSession, err = botsession.Marshal(r.Session, srv.hmacSecret) + if err != nil { + return nil, status.Errorf(codes.Internal, "could not marshal session token: %s", err) + } + } + // If there are no pending leases, RBE seems to block for `-10s` // (not doing anything at all if the RPC deadline is less than 10s). var timeout time.Duration @@ -412,6 +485,7 @@ func (srv *SessionServer) UpdateBotSession(ctx context.Context, body *UpdateBotS return nil, status.Errorf(codes.Internal, "could not generate session token: %s", err) } return &UpdateBotSessionResponse{ + Session: swarmingSession, SessionToken: sessionToken, SessionExpiry: tokenExpiry.Unix(), Status: "OK", @@ -524,6 +598,7 @@ func (srv *SessionServer) UpdateBotSession(ctx context.Context, body *UpdateBotS return nil, status.Errorf(codes.Internal, "could not generate session token: %s", err) } resp := &UpdateBotSessionResponse{ + Session: swarmingSession, SessionToken: sessionToken, SessionExpiry: tokenExpiry.Unix(), Status: remoteworkers.BotStatus_name[int32(session.Status)],