Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Messages Erroring out Actors #443

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 48 additions & 6 deletions node/internal/actors/control_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (a *ControlAPI) Receive(ctx *goakt.ReceiveContext) {
if err != nil {
_ = a.shutdown()
ctx.Err(err)
return
}
a.logger.Info("Control API NATS server is running", slog.String("name", ctx.Self().Name()))
default:
Expand Down Expand Up @@ -385,15 +386,28 @@ func (api *ControlAPI) handleADeploy(m *nats.Msg) {
return
}

protoResp, ok := askResp.(*actorproto.WorkloadStarted)
protoResp, ok := askResp.(*actorproto.Envelope)
if !ok {
api.logger.Error("Start workload response from agent was not the correct type")
models.RespondEnvelope(m, RunResponseType, 500, "", "Agent returned the wrong data type")
return
}

models.RespondEnvelope(m, RunResponseType, 200, startResponseFromProto(protoResp), "")
if protoResp.Error != nil {
api.logger.Error("Agent returned an error", slog.Any("error", protoResp.Error))
models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("agent returned an error: %s", protoResp.Error))
return
}

var workloadStarted actorproto.WorkloadStarted
err = protoResp.Payload.UnmarshalTo(&workloadStarted)
if err != nil {
api.logger.Error("Failed to unmarshal workload started response", slog.Any("error", err))
models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("failed to unmarshal workload started response: %s", err))
return
}

models.RespondEnvelope(m, RunResponseType, 200, startResponseFromProto(&workloadStarted), "")
}

func (api *ControlAPI) handleDeploy(m *nats.Msg) {
Expand All @@ -420,14 +434,28 @@ func (api *ControlAPI) handleDeploy(m *nats.Msg) {
return
}

protoResp, ok := askResp.(*actorproto.WorkloadStarted)
protoResp, ok := askResp.(*actorproto.Envelope)
if !ok {
api.logger.Error("Start workload response from agent was not the correct type")
models.RespondEnvelope(m, RunResponseType, 500, "", "Agent returned the wrong data type")
return
}

models.RespondEnvelope(m, RunResponseType, 200, startResponseFromProto(protoResp), "")
if protoResp.Error != nil {
api.logger.Error("Agent returned an error", slog.Any("error", protoResp.Error))
models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("agent returned an error: %s", protoResp.Error))
return
}

var workloadStarted actorproto.WorkloadStarted
err = protoResp.Payload.UnmarshalTo(&workloadStarted)
if err != nil {
api.logger.Error("Failed to unmarshal workload started response", slog.Any("error", err))
models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("failed to unmarshal workload started response: %s", err))
return
}

models.RespondEnvelope(m, RunResponseType, 200, startResponseFromProto(&workloadStarted), "")
}

func (api *ControlAPI) handleUndeploy(m *nats.Msg) {
Expand All @@ -454,14 +482,28 @@ func (api *ControlAPI) handleUndeploy(m *nats.Msg) {
return
}

protoResp, ok := askResp.(*actorproto.WorkloadStopped)
protoResp, ok := askResp.(*actorproto.Envelope)
if !ok {
api.logger.Error("Workload stop response from agent was not the correct type")
models.RespondEnvelope(m, StopResponseType, 500, "", "Agent returned the wrong data type for workload stop")
return
}

models.RespondEnvelope(m, StopResponseType, 200, stopResponseFromProto(protoResp), "")
if protoResp.Error != nil {
api.logger.Error("Agent returned an error", slog.Any("error", protoResp.Error))
models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("agent returned an error: %s", protoResp.Error))
return
}

var workloadStopped actorproto.WorkloadStopped
err = protoResp.Payload.UnmarshalTo(&workloadStopped)
if err != nil {
api.logger.Error("Failed to unmarshal workload started response", slog.Any("error", err))
models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("failed to unmarshal workload started response: %s", err))
return
}

models.RespondEnvelope(m, StopResponseType, 200, stopResponseFromProto(&workloadStopped), "")
}

func (api *ControlAPI) handleInfo(m *nats.Msg) {
Expand Down
39 changes: 32 additions & 7 deletions node/internal/actors/direct_start_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/synadia-io/nex/models"
goakt "github.com/tochemey/goakt/v2/actors"
"github.com/tochemey/goakt/v2/goaktpb"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"

actorproto "github.com/synadia-io/nex/node/internal/actors/pb"
Expand Down Expand Up @@ -52,34 +53,58 @@ func (a *DirectStartAgent) PostStop(ctx context.Context) error {
}

func (a *DirectStartAgent) Receive(ctx *goakt.ReceiveContext) {
resp := new(actorproto.Envelope)

switch m := ctx.Message().(type) {
case *goaktpb.PostStart:
a.self = ctx.Self()
a.startedAt = time.Now()
a.logger.Info("Direct start agent is running", slog.String("name", ctx.Self().Name()))
case *actorproto.StartWorkload:
var err error
a.logger.Debug("StartWorkload received", slog.String("name", ctx.Self().Name()), slog.String("workload", m.WorkloadName))
resp, err := a.startWorkload(m)
ws, err := a.startWorkload(m)
if err != nil {
a.logger.Error("Failed to start workload", slog.String("name", ctx.Self().Name()), slog.String("workload", m.WorkloadName), slog.Any("err", err))
resp.Error = &actorproto.Error{Message: err.Error()}
ctx.Response(resp)
return
}
resp.Payload, err = anypb.New(ws)
if err != nil {
ctx.Err(err)
a.logger.Error("Failed to marshal workload started", slog.String("name", ctx.Self().Name()), slog.String("workload", m.WorkloadName), slog.Any("err", err))
resp.Error = &actorproto.Error{Message: err.Error()}
ctx.Response(resp)
return
}
a.runRequest[resp.Id] = m

a.runRequest[ws.Id] = m
ctx.Response(resp)
case *actorproto.StopWorkload:
a.logger.Debug("StopWorkload received", slog.String("name", ctx.Self().Name()), slog.String("workload", m.WorkloadId))
resp, err := a.stopWorkload(m)
ws, err := a.stopWorkload(m)
if err != nil {
a.logger.Error("Failed to stop workload", slog.String("name", ctx.Self().Name()), slog.String("workload", m.WorkloadId), slog.Any("err", err))
resp.Error = &actorproto.Error{Message: err.Error()}
ctx.Response(resp)
return
}
resp.Payload, err = anypb.New(ws)
if err != nil {
ctx.Err(err)
a.logger.Error("Failed to marshal workload stopped", slog.String("name", ctx.Self().Name()), slog.String("workload", m.WorkloadId), slog.Any("err", err))
resp.Error = &actorproto.Error{Message: err.Error()}
ctx.Response(resp)
return
}

delete(a.runRequest, m.WorkloadId)
ctx.Response(resp)
case *actorproto.QueryWorkloads:
a.logger.Debug("QueryWorkloads received", slog.String("name", ctx.Self().Name()))
resp, err := a.queryWorkloads(m)
if err != nil {
ctx.Err(err)
a.logger.Error("Failed to query workloads", slog.String("name", ctx.Self().Name()), slog.Any("err", err))
ctx.Unhandled()
return
}
ctx.Response(resp)
Expand Down Expand Up @@ -109,7 +134,7 @@ func (a *DirectStartAgent) Receive(ctx *goakt.ReceiveContext) {
workloads, err := a.queryWorkloads(&actorproto.QueryWorkloads{})
if err != nil {
a.logger.Error("Failed to query workloads", slog.String("name", ctx.Self().Name()), slog.Any("err", err))
ctx.Err(err)
ctx.Unhandled()
return
}

Expand Down
2 changes: 0 additions & 2 deletions node/internal/actors/external_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,6 @@ func (a *ExternalAgent) RegisteredAgentReceive(ctx *goakt.ReceiveContext) {

func (a *ExternalAgent) startWorkload(ctx *goakt.ReceiveContext, req *actorproto.StartWorkload) {
// TODO: send start workload request to agent

// TODO: handle result (ctx.Error, etc)
}

func (a *ExternalAgent) startBinary() error {
Expand Down
Loading
Loading