Skip to content

Commit

Permalink
proto envelope
Browse files Browse the repository at this point in the history
Signed-off-by: Jordan Rash <[email protected]>
  • Loading branch information
jordan-rash committed Nov 26, 2024
1 parent e98108a commit d64f745
Show file tree
Hide file tree
Showing 5 changed files with 613 additions and 422 deletions.
53 changes: 47 additions & 6 deletions node/internal/actors/control_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,15 +386,28 @@ func (api *ControlAPI) handleADeploy(m *nats.Msg) {
return
}

protoResp, ok := askResp.(*actorproto.WorkloadStarted)
protoResp, ok := askResp.(*actorproto.Envelope)
if !ok {
api.logger.Error("Start workload response from agent was not the correct type")
models.RespondEnvelope(m, RunResponseType, 500, "", "Agent returned the wrong data type")
return
}

models.RespondEnvelope(m, RunResponseType, 200, startResponseFromProto(protoResp), "")
if protoResp.Error != nil {
api.logger.Error("Agent returned an error", slog.Any("error", protoResp.Error))
models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("agent returned an error: %s", protoResp.Error))
return
}

var workloadStarted actorproto.WorkloadStarted
err = protoResp.Payload.UnmarshalTo(&workloadStarted)
if err != nil {
api.logger.Error("Failed to unmarshal workload started response", slog.Any("error", err))
models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("failed to unmarshal workload started response: %s", err))
return
}

models.RespondEnvelope(m, RunResponseType, 200, startResponseFromProto(&workloadStarted), "")
}

func (api *ControlAPI) handleDeploy(m *nats.Msg) {
Expand All @@ -421,14 +434,28 @@ func (api *ControlAPI) handleDeploy(m *nats.Msg) {
return
}

protoResp, ok := askResp.(*actorproto.WorkloadStarted)
protoResp, ok := askResp.(*actorproto.Envelope)
if !ok {
api.logger.Error("Start workload response from agent was not the correct type")
models.RespondEnvelope(m, RunResponseType, 500, "", "Agent returned the wrong data type")
return
}

models.RespondEnvelope(m, RunResponseType, 200, startResponseFromProto(protoResp), "")
if protoResp.Error != nil {
api.logger.Error("Agent returned an error", slog.Any("error", protoResp.Error))
models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("agent returned an error: %s", protoResp.Error))
return
}

var workloadStarted actorproto.WorkloadStarted
err = protoResp.Payload.UnmarshalTo(&workloadStarted)
if err != nil {
api.logger.Error("Failed to unmarshal workload started response", slog.Any("error", err))
models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("failed to unmarshal workload started response: %s", err))
return
}

models.RespondEnvelope(m, RunResponseType, 200, startResponseFromProto(&workloadStarted), "")
}

func (api *ControlAPI) handleUndeploy(m *nats.Msg) {
Expand All @@ -455,14 +482,28 @@ func (api *ControlAPI) handleUndeploy(m *nats.Msg) {
return
}

protoResp, ok := askResp.(*actorproto.WorkloadStopped)
protoResp, ok := askResp.(*actorproto.Envelope)
if !ok {
api.logger.Error("Workload stop response from agent was not the correct type")
models.RespondEnvelope(m, StopResponseType, 500, "", "Agent returned the wrong data type for workload stop")
return
}

models.RespondEnvelope(m, StopResponseType, 200, stopResponseFromProto(protoResp), "")
if protoResp.Error != nil {
api.logger.Error("Agent returned an error", slog.Any("error", protoResp.Error))
models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("agent returned an error: %s", protoResp.Error))
return
}

var workloadStopped actorproto.WorkloadStopped
err = protoResp.Payload.UnmarshalTo(&workloadStopped)
if err != nil {
api.logger.Error("Failed to unmarshal workload started response", slog.Any("error", err))
models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("failed to unmarshal workload started response: %s", err))
return
}

models.RespondEnvelope(m, StopResponseType, 200, stopResponseFromProto(&workloadStopped), "")
}

func (api *ControlAPI) handleInfo(m *nats.Msg) {
Expand Down
32 changes: 27 additions & 5 deletions node/internal/actors/direct_start_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/synadia-io/nex/models"
goakt "github.com/tochemey/goakt/v2/actors"
"github.com/tochemey/goakt/v2/goaktpb"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"

actorproto "github.com/synadia-io/nex/node/internal/actors/pb"
Expand Down Expand Up @@ -52,29 +53,50 @@ func (a *DirectStartAgent) PostStop(ctx context.Context) error {
}

func (a *DirectStartAgent) Receive(ctx *goakt.ReceiveContext) {
resp := new(actorproto.Envelope)

switch m := ctx.Message().(type) {
case *goaktpb.PostStart:
a.self = ctx.Self()
a.startedAt = time.Now()
a.logger.Info("Direct start agent is running", slog.String("name", ctx.Self().Name()))
case *actorproto.StartWorkload:
var err error
a.logger.Debug("StartWorkload received", slog.String("name", ctx.Self().Name()), slog.String("workload", m.WorkloadName))
resp, err := a.startWorkload(m)
ws, err := a.startWorkload(m)
if err != nil {
a.logger.Error("Failed to start workload", slog.String("name", ctx.Self().Name()), slog.String("workload", m.WorkloadName), slog.Any("err", err))
ctx.Unhandled()
resp.Error = &actorproto.Error{Message: err.Error()}
ctx.Response(resp)
return
}
resp.Payload, err = anypb.New(ws)
if err != nil {
a.logger.Error("Failed to marshal workload started", slog.String("name", ctx.Self().Name()), slog.String("workload", m.WorkloadName), slog.Any("err", err))
resp.Error = &actorproto.Error{Message: err.Error()}
ctx.Response(resp)
return
}
a.runRequest[resp.Id] = m

a.runRequest[ws.Id] = m
ctx.Response(resp)
case *actorproto.StopWorkload:
a.logger.Debug("StopWorkload received", slog.String("name", ctx.Self().Name()), slog.String("workload", m.WorkloadId))
resp, err := a.stopWorkload(m)
ws, err := a.stopWorkload(m)
if err != nil {
a.logger.Error("Failed to stop workload", slog.String("name", ctx.Self().Name()), slog.String("workload", m.WorkloadId), slog.Any("err", err))
ctx.Unhandled()
resp.Error = &actorproto.Error{Message: err.Error()}
ctx.Response(resp)
return
}
resp.Payload, err = anypb.New(ws)
if err != nil {
a.logger.Error("Failed to marshal workload stopped", slog.String("name", ctx.Self().Name()), slog.String("workload", m.WorkloadId), slog.Any("err", err))
resp.Error = &actorproto.Error{Message: err.Error()}
ctx.Response(resp)
return
}

delete(a.runRequest, m.WorkloadId)
ctx.Response(resp)
case *actorproto.QueryWorkloads:
Expand Down
Loading

0 comments on commit d64f745

Please sign in to comment.