Skip to content

Commit

Permalink
allows each host service to be configured via a portion of the node c…
Browse files Browse the repository at this point in the history
…onfig (#223)

* allows each host service to be configured via a portion of the node config

* linter

* reuse main connection url if nats url is empty in hsconfig

* default host services nats url to empty sentinel string

* overflow! run away!

* removing debug
  • Loading branch information
autodidaddict authored May 14, 2024
1 parent c4b5a7f commit 947a70e
Show file tree
Hide file tree
Showing 14 changed files with 387 additions and 122 deletions.
42 changes: 42 additions & 0 deletions examples/nodeconfigs/hostservices_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"default_resource_dir": "/tmp/wd",
"machine_pool_size": 1,
"cni": {
"network_name": "fcnet",
"interface_name": "veth0"
},
"machine_template": {
"vcpu_count": 1,
"memsize_mib": 256
},
"tags": {
"simple": "true"
},
"host_services": {
"nats_url": "nats://0.0.0.0:4222",
"nats_user_jwt": "",
"nats_user_seed": "",
"services": {
"kv": {
"enabled": true,
"config": {
"bucket_name": "SAMPLEHSKV_${namespace}_${workload_name}",
"max_bytes": 0,
"jit_provision": false
}
},
"messaging": {
"enabled": true,
"config": {}
},
"objectstore": {
"enabled": true,
"config": {}
},
"http": {
"enabled": false,
"config": {}
}
}
}
}
16 changes: 11 additions & 5 deletions host-services/builtins/builtins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,17 @@ func TestKvBuiltin(t *testing.T) {
bClient := NewBuiltinServicesClient(client)

service, _ := NewKeyValueService(nc, slog.Default())
_ = server.AddService("kv", service, make(map[string]string))
err := server.AddService("kv", service, nil)
if err != nil {
t.Fatalf("Failed to add service: %s", err)
}

_ = server.Start()
err = server.Start()
if err != nil {
t.Fatalf("Failed to start server: %s", err)
}

_, err := bClient.KVSet("testone", []byte{9, 8, 7, 6, 5})
_, err = bClient.KVSet("testone", []byte{9, 8, 7, 6, 5})
if err != nil {
t.Fatalf("Got an error setting kv: %s", err.Error())
}
Expand Down Expand Up @@ -79,7 +85,7 @@ func TestMessagingBuiltin(t *testing.T) {
bClient := NewBuiltinServicesClient(client)

service, _ := NewMessagingService(nc, slog.Default())
_ = server.AddService("messaging", service, make(map[string]string))
_ = server.AddService("messaging", service, nil)
_ = server.Start()

wg := new(sync.WaitGroup)
Expand All @@ -105,7 +111,7 @@ func TestObjectBuiltin(t *testing.T) {
bClient := NewBuiltinServicesClient(client)

service, _ := NewObjectStoreService(nc, slog.Default())
_ = server.AddService("objectstore", service, make(map[string]string))
_ = server.AddService("objectstore", service, []byte{})
_ = server.Start()

res, err := bClient.ObjectPut("objecttest", []byte{100, 101, 102})
Expand Down
2 changes: 1 addition & 1 deletion host-services/builtins/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewHTTPService(nc *nats.Conn, log *slog.Logger) (*HTTPService, error) {
return http, nil
}

func (h *HTTPService) Initialize(_ map[string]string) error {
func (h *HTTPService) Initialize(_ json.RawMessage) error {
return nil
}

Expand Down
38 changes: 31 additions & 7 deletions host-services/builtins/keyvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log/slog"
"regexp"

"github.com/nats-io/nats.go"
hostservices "github.com/synadia-io/nex/host-services"
Expand All @@ -17,8 +18,15 @@ const kvServiceMethodDelete = "delete"
const kvServiceMethodKeys = "keys"

type KeyValueService struct {
log *slog.Logger
nc *nats.Conn
log *slog.Logger
nc *nats.Conn
config kvConfig
}

type kvConfig struct {
BucketName string `json:"bucket_name"`
MaxBytes int `json:"max_bytes"`
JitProvision bool `json:"jit_provision"`
}

func NewKeyValueService(nc *nats.Conn, log *slog.Logger) (*KeyValueService, error) {
Expand All @@ -30,7 +38,19 @@ func NewKeyValueService(nc *nats.Conn, log *slog.Logger) (*KeyValueService, erro
return kv, nil
}

func (k *KeyValueService) Initialize(_ map[string]string) error {
func (k *KeyValueService) Initialize(config json.RawMessage) error {

k.config.BucketName = "hs_${namespace}_${workload_name}_kv"
k.config.JitProvision = true
k.config.MaxBytes = 524288

if len(config) > 0 {
err := json.Unmarshal(config, &k.config)
if err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -170,12 +190,16 @@ func (k *KeyValueService) resolveKeyValueStore(namespace, workload string) (nats
return nil, err
}

kvStoreName := fmt.Sprintf("hs_%s_%s_kv", namespace, workload)
reWorkload := regexp.MustCompile(`(?i)\$\{workload_name\}`)
reNamespace := regexp.MustCompile(`(?i)\$\{namespace\}`)

kvStoreName := reWorkload.ReplaceAllString(k.config.BucketName, workload)
kvStoreName = reNamespace.ReplaceAllString(kvStoreName, namespace)

kvStore, err := js.KeyValue(kvStoreName)
if err != nil {
if errors.Is(err, nats.ErrBucketNotFound) {
// TODO: make this configurable after kubecon
kvStore, err = js.CreateKeyValue(&nats.KeyValueConfig{Bucket: kvStoreName, MaxBytes: 524288})
if errors.Is(err, nats.ErrBucketNotFound) && k.config.JitProvision {
kvStore, err = js.CreateKeyValue(&nats.KeyValueConfig{Bucket: kvStoreName, MaxBytes: int64(k.config.MaxBytes)})
if err != nil {
return nil, err
}
Expand Down
33 changes: 27 additions & 6 deletions host-services/builtins/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,25 @@ import (
agentapi "github.com/synadia-io/nex/internal/agent-api"
)

const messagingServiceMethodPublish = "publish"
const messagingServiceMethodRequest = "request"
const messagingServiceMethodRequestMany = "requestMany"
const (
messagingServiceMethodPublish = "publish"
messagingServiceMethodRequest = "request"
messagingServiceMethodRequestMany = "requestMany"

const messagingRequestTimeout = time.Millisecond * 750 // FIXME-- make timeout configurable per request?
defaultMessagingRequestTimeout = int64(time.Millisecond * 750)
defaultMessagingRequestManyTimeout = int64(time.Second * 3)
)

type MessagingService struct {
log *slog.Logger
nc *nats.Conn

config messagingConfig
}

type messagingConfig struct {
RequestTimeoutMs int64 `json:"request_timeout_ms"`
RequestManyTimeoutMs int64 `json:"request_many_timeout_ms"`
}

func NewMessagingService(nc *nats.Conn, log *slog.Logger) (*MessagingService, error) {
Expand All @@ -31,7 +41,18 @@ func NewMessagingService(nc *nats.Conn, log *slog.Logger) (*MessagingService, er
return messaging, nil
}

func (m *MessagingService) Initialize(_ map[string]string) error {
func (m *MessagingService) Initialize(config json.RawMessage) error {

m.config.RequestManyTimeoutMs = defaultMessagingRequestManyTimeout
m.config.RequestTimeoutMs = defaultMessagingRequestTimeout

if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
if err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -88,7 +109,7 @@ func (m *MessagingService) handleRequest(_, _ string,
return hostservices.ServiceResultFail(400, "subject is required"), nil
}

resp, err := m.nc.Request(subject, data, messagingRequestTimeout)
resp, err := m.nc.Request(subject, data, time.Duration(m.config.RequestTimeoutMs*int64(time.Millisecond)))
if err != nil {
m.log.Debug(fmt.Sprintf("failed to send %d-byte request on subject %s: %s", len(data), subject, err.Error()))
return hostservices.ServiceResultFail(500, "failed to send request"), nil
Expand Down
52 changes: 41 additions & 11 deletions host-services/builtins/objectstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,34 @@ import (
"fmt"
"io"
"log/slog"
"regexp"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
hostservices "github.com/synadia-io/nex/host-services"
agentapi "github.com/synadia-io/nex/internal/agent-api"
)

const objectStoreServiceMethodGet = "get"
const objectStoreServiceMethodPut = "put"
const objectStoreServiceMethodDelete = "delete"
const objectStoreServiceMethodList = "list"
const (
objectStoreServiceMethodGet = "get"
objectStoreServiceMethodPut = "put"
objectStoreServiceMethodDelete = "delete"
objectStoreServiceMethodList = "list"

defaultMaxBytes = 524288
defaultBucketName = "hs_${namespace}_${workload_name}_obj"
)

type ObjectStoreService struct {
log *slog.Logger
nc *nats.Conn
log *slog.Logger
nc *nats.Conn
config objectStoreConfig
}

type objectStoreConfig struct {
BucketName string `json:"bucket_name"`
MaxBytes int `json:"max_bytes"`
JitProvision bool `json:"jit_provision"`
}

func NewObjectStoreService(nc *nats.Conn, log *slog.Logger) (*ObjectStoreService, error) {
Expand All @@ -34,7 +47,19 @@ func NewObjectStoreService(nc *nats.Conn, log *slog.Logger) (*ObjectStoreService
return objectStore, nil
}

func (o *ObjectStoreService) Initialize(_ map[string]string) error {
func (o *ObjectStoreService) Initialize(config json.RawMessage) error {

o.config.BucketName = defaultBucketName
o.config.JitProvision = true
o.config.MaxBytes = defaultMaxBytes

if len(config) > 0 {
err := json.Unmarshal(config, &o.config)
if err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -181,18 +206,23 @@ func (o *ObjectStoreService) handleList(_, workload string,
return hostservices.ServiceResultPass(200, "", resp), nil
}

// resolve the object store for the given workload; initialize it if necessary
// resolve the object store for the given workload; initialize it if necessary & configured to do so
func (o *ObjectStoreService) resolveObjectStore(namespace, workload string) (nats.ObjectStore, error) {
js, err := o.nc.JetStream()
if err != nil {
return nil, err
}

objectStoreName := fmt.Sprintf("hs_%s_%s_os", namespace, workload)
reWorkload := regexp.MustCompile(`(?i)\$\{workload_name\}`)
reNamespace := regexp.MustCompile(`(?i)\$\{namespace\}`)

objectStoreName := reWorkload.ReplaceAllString(o.config.BucketName, workload)
objectStoreName = reNamespace.ReplaceAllString(objectStoreName, namespace)

objectStore, err := js.ObjectStore(objectStoreName)
if err != nil {
if errors.Is(err, nats.ErrStreamNotFound) {
objectStore, err = js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: objectStoreName, MaxBytes: 524288}) // FIXME-- make configurable
if errors.Is(err, nats.ErrStreamNotFound) && o.config.JitProvision {
objectStore, err = js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: objectStoreName, MaxBytes: int64(o.config.MaxBytes)})
if err != nil {
return nil, err
}
Expand Down
24 changes: 17 additions & 7 deletions host-services/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package hostservices

import (
"encoding/json"
"fmt"
"log/slog"
"strings"
Expand All @@ -9,21 +10,30 @@ import (
)

type HostServicesServer struct {
nc *nats.Conn
services map[string]HostService
ncInternal *nats.Conn
services map[string]HostService

log *slog.Logger
}

func NewHostServicesServer(nc *nats.Conn, log *slog.Logger) *HostServicesServer {
return &HostServicesServer{
nc: nc,
log: log,
services: make(map[string]HostService),
ncInternal: nc,
log: log,
services: make(map[string]HostService),
}
}

func (h *HostServicesServer) AddService(name string, svc HostService, config map[string]string) error {
func (h *HostServicesServer) Services() []string {
result := make([]string, 0)
for k := range h.services {
result = append(result, k)
}

return result
}

func (h *HostServicesServer) AddService(name string, svc HostService, config json.RawMessage) error {
err := svc.Initialize(config)
if err != nil {
return err
Expand All @@ -34,7 +44,7 @@ func (h *HostServicesServer) AddService(name string, svc HostService, config map
}

func (h *HostServicesServer) Start() error {
_, err := h.nc.Subscribe("agentint.*.rpc.*.*.*.*", h.handleRPC)
_, err := h.ncInternal.Subscribe("agentint.*.rpc.*.*.*.*", h.handleRPC)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion host-services/service.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package hostservices

import (
"encoding/json"
"fmt"
)

Expand Down Expand Up @@ -41,7 +42,7 @@ func ServiceResultPass(code uint, message string, data []byte) ServiceResult {
}

type HostService interface {
Initialize(map[string]string) error
Initialize(json.RawMessage) error
HandleRequest(namespace string,
workloadId string,
method string,
Expand Down
Loading

0 comments on commit 947a70e

Please sign in to comment.