Skip to content

Commit

Permalink
set up host services conn and subscribe to trigger subjects (#446)
Browse files Browse the repository at this point in the history
* set up host services conn and subscribe to trigger subjects

* changing logic for triggerable
  • Loading branch information
autodidaddict authored Dec 3, 2024
1 parent a364f72 commit e5bcb4b
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 23 deletions.
8 changes: 8 additions & 0 deletions api/agent/go/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package agentapi

const (
WorkloadTypeDirect = "direct_start"
WorkloadTypeMicroVM = "microvm"
WorkloadTypeJavaScript = "javascript"
WorkloadTypeWasm = "wasm"
)
2 changes: 0 additions & 2 deletions node/internal/actors/control_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,6 @@ func (api *ControlAPI) handleDeploy(m *nats.Msg) {
return
}

// TODO: obtain host services credentials from workload (creds service call)

askResp, err := api.self.Ask(ctx, agent, startRequestToProto(req))
if err != nil {
api.logger.Error("Failed to start workload", slog.Any("error", err))
Expand Down
162 changes: 141 additions & 21 deletions node/internal/actors/external_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log/slog"
"net/url"
"strings"
"time"

"github.com/nats-io/nats.go"
Expand All @@ -16,6 +17,7 @@ import (
goakt "github.com/tochemey/goakt/v2/actors"
"github.com/tochemey/goakt/v2/goaktpb"

agentapi "github.com/synadia-io/nex/api/agent/go"
agentapigen "github.com/synadia-io/nex/api/agent/go/gen"
actorproto "github.com/synadia-io/nex/node/internal/actors/pb"
)
Expand All @@ -29,16 +31,18 @@ const (
// via environment variables, while the actor itself connects to the internal NATS via the
// `internalNatsUrl` and the `hostUserKeypair`
type ExternalAgent struct {
agentOptions models.AgentOptions
agentBinaryCreds AgentCredential
hostUserKeypair nkeys.KeyPair
logger *slog.Logger
self *goakt.PID
internalConn *nats.Conn
controlConn *nats.Conn
agentClient *agentcommon.AgentClient
internalNatsUrl string
nodeOptions *models.NodeOptions
agentOptions models.AgentOptions
agentBinaryCreds AgentCredential
hostUserKeypair nkeys.KeyPair
logger *slog.Logger
self *goakt.PID
internalConn *nats.Conn
controlConn *nats.Conn
workloadHostServicesConns map[string]*nats.Conn
agentClient *agentcommon.AgentClient
internalNatsUrl string
nodeOptions *models.NodeOptions
subz map[string][]*nats.Subscription
}

func CreateExternalAgent(logger *slog.Logger,
Expand All @@ -50,13 +54,15 @@ func CreateExternalAgent(logger *slog.Logger,
nodeOptions *models.NodeOptions) *ExternalAgent {

return &ExternalAgent{
agentOptions: agentOptions,
agentBinaryCreds: agentBinaryCreds,
hostUserKeypair: hostUserKeyPair,
logger: logger,
internalNatsUrl: internalNatsUrl,
nodeOptions: nodeOptions,
controlConn: nc,
agentOptions: agentOptions,
agentBinaryCreds: agentBinaryCreds,
hostUserKeypair: hostUserKeyPair,
logger: logger,
internalNatsUrl: internalNatsUrl,
nodeOptions: nodeOptions,
controlConn: nc,
workloadHostServicesConns: make(map[string]*nats.Conn),
subz: make(map[string][]*nats.Subscription),
}
}

Expand Down Expand Up @@ -115,7 +121,8 @@ func (a *ExternalAgent) RegisteredAgentReceive(ctx *goakt.ReceiveContext) {
a.startWorkload(ctx, msg)
case *actorproto.StopWorkload:
a.stopWorkload(ctx, msg)
// TODO: implement TriggerWorkload
// NOTE: we don't respond to a trigger workload here because this agent actor
// is directly subscribing to the trigger subjects for all workloads of this type
default:
ctx.Unhandled()
}
Expand All @@ -129,6 +136,13 @@ func (a *ExternalAgent) stopWorkload(ctx *goakt.ReceiveContext, req *actorproto.
slog.Any("error", err))
return
}
for _, sub := range a.subz[req.WorkloadId] {
_ = sub.Unsubscribe()
}
if hsConn, ok := a.workloadHostServicesConns[req.WorkloadId]; ok {
_ = hsConn.Drain()
delete(a.workloadHostServicesConns, req.WorkloadId)
}
}

func (a *ExternalAgent) startWorkload(ctx *goakt.ReceiveContext, req *actorproto.StartWorkload) {
Expand All @@ -153,10 +167,29 @@ func (a *ExternalAgent) startWorkload(ctx *goakt.ReceiveContext, req *actorproto
WorkloadId: nuid.Next(),
WorkloadType: req.WorkloadType,
}
// TODO: create a host services client for the new workload
hsConn, err := a.createHostServicesConnection(req)
if err != nil {
a.logger.Error("Failed to create connection to host services for workload",
slog.String("agent_name", a.agentOptions.Name),
slog.String("workload_name", req.WorkloadName),
slog.Any("error", err))
return
}
a.workloadHostServicesConns[reqJson.WorkloadId] = hsConn

// TODO: if there are trigger subjects, subscribe to them on the host services connection
// (creds come from the workload request)
// TODO: remove magic string
if req.WorkloadType != agentapi.WorkloadTypeDirect &&
req.WorkloadType != agentapi.WorkloadTypeMicroVM &&
len(req.TriggerSubjects) > 0 {
err := a.subscribeToTriggerSubjects(req, reqJson.WorkloadId, hsConn)
if err != nil {
a.logger.Error("Failed to subscribe to trigger subjects on host services connection",
slog.String("agent_name", a.agentOptions.Name),
slog.String("workload_name", req.WorkloadName),
slog.Any("error", err))
return
}
}

err = a.agentClient.StartWorkload(reqJson)
if err != nil {
Expand All @@ -167,6 +200,93 @@ func (a *ExternalAgent) startWorkload(ctx *goakt.ReceiveContext, req *actorproto
}
}

func (a *ExternalAgent) createHostServicesConnection(request *actorproto.StartWorkload) (*nats.Conn, error) {
natsOpts := []nats.Option{}
if len(strings.TrimSpace(request.WorkloadName)) > 0 {
natsOpts = append(natsOpts, nats.Name("nex-hostservices-"+strings.ToLower(request.WorkloadName)))
} else {
natsOpts = append(natsOpts, nats.Name("nex-hostservices"))
}
var url string
if request.HostServiceConfig != nil {
natsOpts = append(natsOpts,
nats.UserJWTAndSeed(request.HostServiceConfig.NatsUserJwt,
request.HostServiceConfig.NatsUserSeed))
url = request.HostServiceConfig.NatsUrl
} else if len(strings.TrimSpace(a.nodeOptions.HostServiceOptions.NatsUserSeed)) > 0 {
natsOpts = append(natsOpts,
nats.UserJWTAndSeed(a.nodeOptions.HostServiceOptions.NatsUserJwt,
a.nodeOptions.HostServiceOptions.NatsUserSeed))
if len(strings.TrimSpace(a.nodeOptions.HostServiceOptions.NatsUrl)) != 0 {
url = a.nodeOptions.HostServiceOptions.NatsUrl
} else {
url = a.controlConn.Servers()[0]
}
} else {
if a.controlConn.Opts.UserJWT != nil {
natsOpts = append(natsOpts,
nats.UserJWT(a.controlConn.Opts.UserJWT, a.controlConn.Opts.SignatureCB))
}

url = a.controlConn.Opts.Url
}

a.logger.Debug("Attempting to establish host services connection for workload",
slog.String("workload_name", request.WorkloadName),
slog.String("url", url))

nc, err := nats.Connect(url, natsOpts...)
if err != nil {
a.logger.Error("Failed to establish host services connection for workload",
slog.String("workload_name", request.WorkloadName),
slog.String("url", url),
slog.Any("error", err))
return nil, err
}

return nc, nil
}

func (a *ExternalAgent) subscribeToTriggerSubjects(request *actorproto.StartWorkload, workloadId string, conn *nats.Conn) error {
for _, tsub := range request.TriggerSubjects {
sub, err := conn.QueueSubscribe(tsub, tsub, a.generateTriggerHandler(workloadId, tsub))
if err != nil {
return err
}
a.subz[workloadId] = append(a.subz[workloadId], sub)
}
return nil
}

func (a *ExternalAgent) generateTriggerHandler(workloadId string, tsub string) func(m *nats.Msg) {

agentClient := a.agentClient
agentName := a.agentOptions.Name

return func(msg *nats.Msg) {

bytes, err := agentClient.TriggerWorkload(workloadId, msg.Data)
if err != nil {
// TODO: respond with error envelope
a.logger.Error("Failed to trigger workload",
slog.String("subscribe_subject", tsub),
slog.String("receive_subject", msg.Subject),
slog.String("agent_name", agentName),
slog.Any("error", err))
return
}
err = msg.Respond(bytes)
if err != nil {
a.logger.Error("Failed to respond to trigger subject request",
slog.String("subject", tsub),
slog.String("agent_name", agentName),
slog.String("receive_subject", msg.Subject),
slog.Any("error", err))
return
}
}
}

func (a *ExternalAgent) startBinary() error {
artRef, err := GetArtifact(a.agentOptions.Name, a.agentOptions.Uri, a.controlConn)
if err != nil {
Expand Down

0 comments on commit e5bcb4b

Please sign in to comment.