diff --git a/api/agent/go/types.go b/api/agent/go/types.go new file mode 100644 index 00000000..1e3f3b83 --- /dev/null +++ b/api/agent/go/types.go @@ -0,0 +1,8 @@ +package agentapi + +const ( + WorkloadTypeDirect = "direct_start" + WorkloadTypeMicroVM = "microvm" + WorkloadTypeJavaScript = "javascript" + WorkloadTypeWasm = "wasm" +) diff --git a/node/internal/actors/control_api.go b/node/internal/actors/control_api.go index fd42fc40..925c4948 100644 --- a/node/internal/actors/control_api.go +++ b/node/internal/actors/control_api.go @@ -416,8 +416,6 @@ func (api *ControlAPI) handleDeploy(m *nats.Msg) { return } - // TODO: obtain host services credentials from workload (creds service call) - askResp, err := api.self.Ask(ctx, agent, startRequestToProto(req)) if err != nil { api.logger.Error("Failed to start workload", slog.Any("error", err)) diff --git a/node/internal/actors/external_agent.go b/node/internal/actors/external_agent.go index e776a1e1..7f71305f 100644 --- a/node/internal/actors/external_agent.go +++ b/node/internal/actors/external_agent.go @@ -6,6 +6,7 @@ import ( "fmt" "log/slog" "net/url" + "strings" "time" "github.com/nats-io/nats.go" @@ -16,6 +17,7 @@ import ( goakt "github.com/tochemey/goakt/v2/actors" "github.com/tochemey/goakt/v2/goaktpb" + agentapi "github.com/synadia-io/nex/api/agent/go" agentapigen "github.com/synadia-io/nex/api/agent/go/gen" actorproto "github.com/synadia-io/nex/node/internal/actors/pb" ) @@ -29,16 +31,18 @@ const ( // via environment variables, while the actor itself connects to the internal NATS via the // `internalNatsUrl` and the `hostUserKeypair` type ExternalAgent struct { - agentOptions models.AgentOptions - agentBinaryCreds AgentCredential - hostUserKeypair nkeys.KeyPair - logger *slog.Logger - self *goakt.PID - internalConn *nats.Conn - controlConn *nats.Conn - agentClient *agentcommon.AgentClient - internalNatsUrl string - nodeOptions *models.NodeOptions + agentOptions models.AgentOptions + agentBinaryCreds AgentCredential + hostUserKeypair nkeys.KeyPair + logger *slog.Logger + self *goakt.PID + internalConn *nats.Conn + controlConn *nats.Conn + workloadHostServicesConns map[string]*nats.Conn + agentClient *agentcommon.AgentClient + internalNatsUrl string + nodeOptions *models.NodeOptions + subz map[string][]*nats.Subscription } func CreateExternalAgent(logger *slog.Logger, @@ -50,13 +54,15 @@ func CreateExternalAgent(logger *slog.Logger, nodeOptions *models.NodeOptions) *ExternalAgent { return &ExternalAgent{ - agentOptions: agentOptions, - agentBinaryCreds: agentBinaryCreds, - hostUserKeypair: hostUserKeyPair, - logger: logger, - internalNatsUrl: internalNatsUrl, - nodeOptions: nodeOptions, - controlConn: nc, + agentOptions: agentOptions, + agentBinaryCreds: agentBinaryCreds, + hostUserKeypair: hostUserKeyPair, + logger: logger, + internalNatsUrl: internalNatsUrl, + nodeOptions: nodeOptions, + controlConn: nc, + workloadHostServicesConns: make(map[string]*nats.Conn), + subz: make(map[string][]*nats.Subscription), } } @@ -115,7 +121,8 @@ func (a *ExternalAgent) RegisteredAgentReceive(ctx *goakt.ReceiveContext) { a.startWorkload(ctx, msg) case *actorproto.StopWorkload: a.stopWorkload(ctx, msg) - // TODO: implement TriggerWorkload + // NOTE: we don't respond to a trigger workload here because this agent actor + // is directly subscribing to the trigger subjects for all workloads of this type default: ctx.Unhandled() } @@ -129,6 +136,13 @@ func (a *ExternalAgent) stopWorkload(ctx *goakt.ReceiveContext, req *actorproto. slog.Any("error", err)) return } + for _, sub := range a.subz[req.WorkloadId] { + _ = sub.Unsubscribe() + } + if hsConn, ok := a.workloadHostServicesConns[req.WorkloadId]; ok { + _ = hsConn.Drain() + delete(a.workloadHostServicesConns, req.WorkloadId) + } } func (a *ExternalAgent) startWorkload(ctx *goakt.ReceiveContext, req *actorproto.StartWorkload) { @@ -153,10 +167,29 @@ func (a *ExternalAgent) startWorkload(ctx *goakt.ReceiveContext, req *actorproto WorkloadId: nuid.Next(), WorkloadType: req.WorkloadType, } - // TODO: create a host services client for the new workload + hsConn, err := a.createHostServicesConnection(req) + if err != nil { + a.logger.Error("Failed to create connection to host services for workload", + slog.String("agent_name", a.agentOptions.Name), + slog.String("workload_name", req.WorkloadName), + slog.Any("error", err)) + return + } + a.workloadHostServicesConns[reqJson.WorkloadId] = hsConn - // TODO: if there are trigger subjects, subscribe to them on the host services connection - // (creds come from the workload request) + // TODO: remove magic string + if req.WorkloadType != agentapi.WorkloadTypeDirect && + req.WorkloadType != agentapi.WorkloadTypeMicroVM && + len(req.TriggerSubjects) > 0 { + err := a.subscribeToTriggerSubjects(req, reqJson.WorkloadId, hsConn) + if err != nil { + a.logger.Error("Failed to subscribe to trigger subjects on host services connection", + slog.String("agent_name", a.agentOptions.Name), + slog.String("workload_name", req.WorkloadName), + slog.Any("error", err)) + return + } + } err = a.agentClient.StartWorkload(reqJson) if err != nil { @@ -167,6 +200,93 @@ func (a *ExternalAgent) startWorkload(ctx *goakt.ReceiveContext, req *actorproto } } +func (a *ExternalAgent) createHostServicesConnection(request *actorproto.StartWorkload) (*nats.Conn, error) { + natsOpts := []nats.Option{} + if len(strings.TrimSpace(request.WorkloadName)) > 0 { + natsOpts = append(natsOpts, nats.Name("nex-hostservices-"+strings.ToLower(request.WorkloadName))) + } else { + natsOpts = append(natsOpts, nats.Name("nex-hostservices")) + } + var url string + if request.HostServiceConfig != nil { + natsOpts = append(natsOpts, + nats.UserJWTAndSeed(request.HostServiceConfig.NatsUserJwt, + request.HostServiceConfig.NatsUserSeed)) + url = request.HostServiceConfig.NatsUrl + } else if len(strings.TrimSpace(a.nodeOptions.HostServiceOptions.NatsUserSeed)) > 0 { + natsOpts = append(natsOpts, + nats.UserJWTAndSeed(a.nodeOptions.HostServiceOptions.NatsUserJwt, + a.nodeOptions.HostServiceOptions.NatsUserSeed)) + if len(strings.TrimSpace(a.nodeOptions.HostServiceOptions.NatsUrl)) != 0 { + url = a.nodeOptions.HostServiceOptions.NatsUrl + } else { + url = a.controlConn.Servers()[0] + } + } else { + if a.controlConn.Opts.UserJWT != nil { + natsOpts = append(natsOpts, + nats.UserJWT(a.controlConn.Opts.UserJWT, a.controlConn.Opts.SignatureCB)) + } + + url = a.controlConn.Opts.Url + } + + a.logger.Debug("Attempting to establish host services connection for workload", + slog.String("workload_name", request.WorkloadName), + slog.String("url", url)) + + nc, err := nats.Connect(url, natsOpts...) + if err != nil { + a.logger.Error("Failed to establish host services connection for workload", + slog.String("workload_name", request.WorkloadName), + slog.String("url", url), + slog.Any("error", err)) + return nil, err + } + + return nc, nil +} + +func (a *ExternalAgent) subscribeToTriggerSubjects(request *actorproto.StartWorkload, workloadId string, conn *nats.Conn) error { + for _, tsub := range request.TriggerSubjects { + sub, err := conn.QueueSubscribe(tsub, tsub, a.generateTriggerHandler(workloadId, tsub)) + if err != nil { + return err + } + a.subz[workloadId] = append(a.subz[workloadId], sub) + } + return nil +} + +func (a *ExternalAgent) generateTriggerHandler(workloadId string, tsub string) func(m *nats.Msg) { + + agentClient := a.agentClient + agentName := a.agentOptions.Name + + return func(msg *nats.Msg) { + + bytes, err := agentClient.TriggerWorkload(workloadId, msg.Data) + if err != nil { + // TODO: respond with error envelope + a.logger.Error("Failed to trigger workload", + slog.String("subscribe_subject", tsub), + slog.String("receive_subject", msg.Subject), + slog.String("agent_name", agentName), + slog.Any("error", err)) + return + } + err = msg.Respond(bytes) + if err != nil { + a.logger.Error("Failed to respond to trigger subject request", + slog.String("subject", tsub), + slog.String("agent_name", agentName), + slog.String("receive_subject", msg.Subject), + slog.Any("error", err)) + return + } + } +} + func (a *ExternalAgent) startBinary() error { artRef, err := GetArtifact(a.agentOptions.Name, a.agentOptions.Uri, a.controlConn) if err != nil {