Skip to content

Commit

Permalink
initial commit (#444)
Browse files Browse the repository at this point in the history
  • Loading branch information
autodidaddict authored Dec 2, 2024
1 parent 7b3431f commit a364f72
Show file tree
Hide file tree
Showing 13 changed files with 306 additions and 27 deletions.
10 changes: 9 additions & 1 deletion Taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,15 @@ tasks:
--schema-package=https://github.com/synadia-io/nex/api/agent/stop-workload-request=github.com/synadia-io/nex/agentapi/go/gen \
--schema-output=https://github.com/synadia-io/nex/api/agent/stop-workload-request=gen/stop_workload_request.go \
../stop-workload-request.json"

- "go-jsonschema \
--schema-package=https://github.com/synadia-io/nex/api/agent/start-workload-response=github.com/synadia-io/nex/agentapi/go/gen \
--schema-output=https://github.com/synadia-io/nex/api/agent/start-workload-response=gen/start_workload_response.go \
../start-workload-response.json"
- "go-jsonschema \
--schema-package=https://github.com/synadia-io/nex/api/agent/stop-workload-response=github.com/synadia-io/nex/agentapi/go/gen \
--schema-output=https://github.com/synadia-io/nex/api/agent/stop-workload-response=gen/stop_workload_response.go \
../stop-workload-response.json"

gen-schema-nodecontrol:
dir: api/nodecontrol
cmds:
Expand Down
73 changes: 72 additions & 1 deletion agents/common/agent_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
package agentcommon

import "github.com/nats-io/nats.go"
import (
"encoding/json"
"errors"
"time"

"github.com/nats-io/nats.go"
agentapi "github.com/synadia-io/nex/api/agent/go"
agentapigen "github.com/synadia-io/nex/api/agent/go/gen"
)

// An agent client is used by a Nex node host to communicate with
// an agent.
Expand All @@ -12,3 +20,66 @@ type AgentClient struct {
func NewAgentClient(nc *nats.Conn, name string) (*AgentClient, error) {
return &AgentClient{internalNatsConn: nc, agentName: name}, nil
}

func (ac *AgentClient) StartWorkload(req *agentapigen.StartWorkloadRequestJson) error {
subject := agentapi.StartWorkloadSubject(ac.agentName)
payload, err := json.Marshal(&req)
if err != nil {
return err
}

res, err := ac.internalNatsConn.Request(subject, payload, 2*time.Second)
if err != nil {
return err
}

var result agentapigen.StartWorkloadResponseJson
err = json.Unmarshal(res.Data, &result)
if err != nil {
return err
}

if !result.Success && result.Error != nil {
return errors.New(*result.Error)
} else if !result.Success {
return errors.New("unknown failure starting workload")
}
return nil
}

func (ac *AgentClient) StopWorkload(workloadId string) error {
subject := agentapi.StopWorkloadSubject(workloadId)
immediate := true // TODO: accept options
reason := "normal" // TODO: accept options

req := &agentapigen.StopWorkloadRequestJson{
Immediate: &immediate,
Reason: &reason,
WorkloadId: workloadId,
}
payload, err := json.Marshal(&req)
if err != nil {
return err
}
res, err := ac.internalNatsConn.Request(subject, payload, 2*time.Second)
if err != nil {
return err
}
var response agentapigen.StopWorkloadResponseJson
err = json.Unmarshal(res.Data, &response)
if err != nil {
return err
}

if !response.Stopped {
return errors.New("Agent indicated workload did not stop")
}

return nil
}

func (ac *AgentClient) TriggerWorkload(workloadId string, payload []byte) ([]byte, error) {
// TODO: implement

return nil, nil
}
37 changes: 24 additions & 13 deletions agents/common/nexagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type AgentCallback interface {
StartWorkload(request *agentapigen.StartWorkloadRequestJson) error
StopWorkload(request *agentapigen.StopWorkloadRequestJson) error
ListWorkloads() error
Trigger(workloadId string, data []byte) error
Trigger(workloadId string, data []byte) ([]byte, error)
}

func NewNexAgent(name, version, description string,
Expand Down Expand Up @@ -142,30 +142,41 @@ func (agent *NexAgent) createSubscriptions() error {

func (agent *NexAgent) handleStartWorkload(m *nats.Msg) {
var req agentapigen.StartWorkloadRequestJson
err := json.Unmarshal(m.Data, &req)
if err != nil {
_ = 0 // for linter
// TODO: return error envelope
response := agentapigen.StartWorkloadResponseJson{
Error: nil,
Success: true,
}
err = agent.callback.StartWorkload(&req)
err := json.Unmarshal(m.Data, &req)
if err != nil {
_ = 0 // for linter
// TODO: return error envelope
}
// TODO: return success envelope
msg := "Failed to unmarshal workload start request"
response.Error = &msg
response.Success = false
} else {
response.WorkloadId = &req.WorkloadId
err = agent.callback.StartWorkload(&req)
if err != nil {
msg := fmt.Sprintf("Failed to start workload: %s", err)
response.Error = &msg
response.Success = false
}
}
outBytes, _ := json.Marshal(&response)
_ = m.Respond(outBytes)
}

func (agent *NexAgent) handleTrigger(m *nats.Msg) {

tokens := strings.Split(m.Subject, ".")
// agent.{workload type}.workloads.{workload id}.trigger
workloadId := tokens[3]

err := agent.callback.Trigger(workloadId, m.Data)
res, err := agent.callback.Trigger(workloadId, m.Data)
if err != nil {
_ = 0
// TODO: return error envelope
// TODO: look into using an error envelope here?
}
// TODO: return success envelope

_ = m.Respond(res)
}

func (agent *NexAgent) handleStopWorkload(m *nats.Msg) {
Expand Down
23 changes: 19 additions & 4 deletions agents/javascript/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,26 @@ func (a *JavaScriptAgent) Preflight() error {
func (a *JavaScriptAgent) StartWorkload(req *agentapigen.StartWorkloadRequestJson) error {
a.workloads[req.WorkloadId] = req
// TODO: get script file
f, err := os.ReadFile(req.LocalFilePath)
if err != nil {
return err
}

// TODO: call AddScript on runner
err = a.runner.AddScript(req.WorkloadId, string(f))
if err != nil {
return err
}

return nil
}

func (a *JavaScriptAgent) StopWorkload(req *agentapigen.StopWorkloadRequestJson) error {
delete(a.workloads, req.WorkloadId)

// TODO: call RemoveScript on runner
err := a.runner.RemoveScript(req.WorkloadId)
if err != nil {
return err
}

return nil
}
Expand All @@ -64,6 +74,11 @@ func (a *JavaScriptAgent) ListWorkloads() error {
return nil
}

func (a *JavaScriptAgent) Trigger(workloadId string, payload []byte) error {
return nil
func (a *JavaScriptAgent) Trigger(workloadId string, payload []byte) ([]byte, error) {
payload, err := a.runner.TriggerScript(workloadId, payload)
if err != nil {
return nil, err
}

return payload, nil
}
8 changes: 7 additions & 1 deletion api/agent/go/gen/start_workload_request.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions api/agent/go/gen/start_workload_response.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions api/agent/go/gen/stop_workload_response.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions api/agent/go/subjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,20 @@ func StartWorkloadSubscribeSubject(workloadType string) string {
return fmt.Sprintf("agent.%s.workloads.start", workloadType)
}

func StartWorkloadSubject(workloadType string) string {
return fmt.Sprintf("agent.%s.workloads.start", workloadType)
}

// These _could_ be shared in a single function, but if we decide to change
// the wildcards in the future, we're ok and don't need to refactor
func StopWorkloadSubscribeSubject(workloadType string) string {
return fmt.Sprintf("agent.%s.workloads.stop", workloadType)
}

func StopWorkloadSubject(workloadType string) string {
return fmt.Sprintf("agent.%s.workloads.stop", workloadType)
}

func ListWorkloadsSubscribeSubject(workloadType string) string {
return fmt.Sprintf("agent.%s.workloads.list", workloadType)
}
Expand Down
9 changes: 6 additions & 3 deletions api/agent/start-workload-request.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
"properties": {
"workloadId": {
"type": "string",
"format": "uuid",
"description": "The unique identifier of the workload to start."
"description": "The unique identifier (nuid) of the workload to start."
},
"name": {
"type": "string",
Expand All @@ -33,6 +32,10 @@
"type": "string"
}
},
"localFilePath": {
"type": "string",
"description": "The path to the cached artifact to execute (e.g. wasm binary, .js file, etc)"
},
"workloadType": {
"type": "string",
"description": "Type of the workload"
Expand All @@ -49,7 +52,7 @@
"description": "A map containing environment variables, applicable for native workload types"
}
},
"required": ["workloadId", "name", "namespace", "totalBytes", "hash", "workloadType"],
"required": ["workloadId", "name", "namespace", "totalBytes", "hash", "workloadType", "localFilePath"],
"additionalProperties": false
}

23 changes: 23 additions & 0 deletions api/agent/start-workload-response.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://github.com/synadia-io/nex/api/agent/start-workload-response",
"title": "StartWorkloadResponse",
"type": "object",
"properties": {
"workloadId": {
"type": "string",
"description": "The unique identifier of the workload started. This is a nuid"
},
"error": {
"type": "string",
"description": "Optional error text in case of workload start failure"
},
"success": {
"type": "boolean",
"description": "Indicates whether the workload was successfully started"
}
},
"required": ["success"],
"additionalProperties": false
}

16 changes: 16 additions & 0 deletions api/agent/stop-workload-response.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://github.com/synadia-io/nex/api/agent/stop-workload-response",
"title": "StopWorkloadResponse",
"type": "object",
"properties": {
"stopped": {
"type": "boolean"
}
},
"required": [
"stopped"
],
"definitions": {},
"additionalProperties": false
}
Loading

0 comments on commit a364f72

Please sign in to comment.