From 120864de5ee84ed0a01d71818c9f9a42948561cb Mon Sep 17 00:00:00 2001 From: Rodney Osodo Date: Wed, 8 Jan 2025 13:55:54 +0300 Subject: [PATCH 1/9] feat(examples): add long process example and with no input Add two examples: - one example is a process that takes long to compute - the other one is a hello world example that prints out the response to stdout --- Makefile | 6 +++--- examples/compute/compute.go | 27 +++++++++++++++++++++++++ examples/hello-world/hello-world.go | 7 +++++++ examples/long-addition/long-addition.go | 13 ------------ 4 files changed, 37 insertions(+), 16 deletions(-) create mode 100644 examples/compute/compute.go create mode 100644 examples/hello-world/hello-world.go delete mode 100644 examples/long-addition/long-addition.go diff --git a/Makefile b/Makefile index 0bd41d5..d34bb3c 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ BUILD_DIR = build TIME=$(shell date -u '+%Y-%m-%dT%H:%M:%SZ') VERSION ?= $(shell git describe --abbrev=0 --tags 2>/dev/null || echo 'v0.0.0') COMMIT ?= $(shell git rev-parse HEAD) -EXAMPLES = addition long-addition +EXAMPLES = addition compute hello-world SERVICES = manager proplet cli proxy define compile_service @@ -25,8 +25,8 @@ $(SERVICES): install: $(foreach f,$(wildcard $(BUILD_DIR)/*[!.wasm]),cp $(f) $(patsubst $(BUILD_DIR)/%,$(GOBIN)/propeller-%,$(f));) -.PHONY: all $(SERVICES) -all: $(SERVICES) +.PHONY: all $(SERVICES) $(EXAMPLES) +all: $(SERVICES) $(EXAMPLES) clean: rm -rf build diff --git a/examples/compute/compute.go b/examples/compute/compute.go new file mode 100644 index 0000000..d8fd2af --- /dev/null +++ b/examples/compute/compute.go @@ -0,0 +1,27 @@ +package main + +import "math" + +//export compute +func compute(n uint32) uint32 { + var result uint32 + + for i := range n { + for j := range n { + for k := range n { + for l := range n { + for m := range n { + // Do some meaningless but CPU-intensive math + result += uint32(math.Pow(float64(i*j*k*l*m), 2)) % 10 + } + } + } + } + } + + return result +} + +// main is required for the `wasi` target, even if it isn't used. +// See https://wazero.io/languages/tinygo/#why-do-i-have-to-define-main +func main() {} diff --git a/examples/hello-world/hello-world.go b/examples/hello-world/hello-world.go new file mode 100644 index 0000000..b1b14d0 --- /dev/null +++ b/examples/hello-world/hello-world.go @@ -0,0 +1,7 @@ +package main + +import "fmt" + +func main() { + fmt.Println("Hello World!") +} diff --git a/examples/long-addition/long-addition.go b/examples/long-addition/long-addition.go deleted file mode 100644 index cf9ff05..0000000 --- a/examples/long-addition/long-addition.go +++ /dev/null @@ -1,13 +0,0 @@ -package main - -import "time" - -//export add -func add(x, y uint32) uint32 { - time.Sleep(time.Minute) - return x + y -} - -// main is required for the `wasi` target, even if it isn't used. -// See https://wazero.io/languages/tinygo/#why-do-i-have-to-define-main -func main() {} From d2abc2a98620b6740720d07a69aadcdfa90b79eb Mon Sep 17 00:00:00 2001 From: Rodney Osodo Date: Wed, 8 Jan 2025 13:56:27 +0300 Subject: [PATCH 2/9] chore(gomod): update packages --- go.mod | 21 ++++++++++----------- go.sum | 34 ++++++++++++++++++---------------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/go.mod b/go.mod index 37009f0..21dd315 100644 --- a/go.mod +++ b/go.mod @@ -32,12 +32,13 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gorilla/websocket v1.5.3 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/klauspost/compress v1.17.11 // indirect - github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.61.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect @@ -47,15 +48,13 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0 // indirect go.opentelemetry.io/otel/metric v1.33.0 // indirect go.opentelemetry.io/otel/sdk v1.33.0 // indirect - go.opentelemetry.io/proto/otlp v1.4.0 // indirect - golang.org/x/net v0.32.0 // indirect - golang.org/x/sys v0.28.0 // indirect + go.opentelemetry.io/proto/otlp v1.5.0 // indirect + golang.org/x/net v0.34.0 // indirect + golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.21.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect - google.golang.org/grpc v1.69.0 // indirect - google.golang.org/protobuf v1.36.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 // indirect + google.golang.org/grpc v1.69.2 // indirect + google.golang.org/protobuf v1.36.2 // indirect oras.land/oras-go/v2 v2.5.0 ) - -require github.com/opencontainers/go-digest v1.0.0 // indirect diff --git a/go.sum b/go.sum index 3e47db0..a7c2ef0 100644 --- a/go.sum +++ b/go.sum @@ -42,8 +42,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 h1:TmHmbvxPmaegwhDubVz0lICL0J5Ka2vwTzhoePEXsGE= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0/go.mod h1:qztMSjm835F2bXf+5HKAPIS5qsmQDqZna/PgVt4rWtI= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ= github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f h1:7LYC+Yfkj3CTRcShK0KOL/w6iTiKyqqBA9a41Wnggw8= github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f/go.mod h1:pFlLw2CfqZiIBOx6BuCeRLCrfxBJipTY0nIOF/VbGcI= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -54,6 +54,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= +github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= @@ -100,26 +102,26 @@ go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4Jjx go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s= go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck= -go.opentelemetry.io/proto/otlp v1.4.0 h1:TA9WRvW6zMwP+Ssb6fLoUIuirti1gGbP28GcKG1jgeg= -go.opentelemetry.io/proto/otlp v1.4.0/go.mod h1:PPBWZIP98o2ElSqI35IHfu7hIhSwvc5N38Jw8pXuGFY= -golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= -golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs= +go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= +go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484 h1:ChAdCYNQFDk5fYvFZMywKLIijG7TC2m1C2CMEu11G3o= -google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484/go.mod h1:KRUmxRI4JmbpAm8gcZM4Jsffi859fo5LQjILwuqj9z8= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA= -google.golang.org/grpc v1.69.0 h1:quSiOM1GJPmPH5XtU+BCoVXcDVJJAzNcoyfC2cCjGkI= -google.golang.org/grpc v1.69.0/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= -google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ= -google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 h1:GVIKPyP/kLIyVOgOnTwFOrvQaQUzOzGMCxgFUOEmm24= +google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422/go.mod h1:b6h1vNKhxaSoEI+5jc3PJUCustfli/mRab7295pY7rw= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 h1:3UsHvIr4Wc2aW4brOaSCmcxh9ksica6fHEr8P1XhkYw= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422/go.mod h1:3ENsm/5D1mzDyhpzeRi1NR784I0BcofWBoSc5QqqMK4= +google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= +google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= +google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU= +google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 562b80e8d4c62426d9855756912021d01bcf5866 Mon Sep 17 00:00:00 2001 From: Rodney Osodo Date: Wed, 8 Jan 2025 16:58:53 +0300 Subject: [PATCH 3/9] feat: enable external wasm runtime Signed-off-by: Rodney Osodo --- cmd/proplet/main.go | 21 +++++----- manager/api/endpoint.go | 3 -- manager/service.go | 15 +------ proplet/requests.go | 1 + proplet/service.go | 5 ++- proplet/wasm.go | 88 +++++++++++++++++++++++++++++++++++------ task/task.go | 3 +- 7 files changed, 94 insertions(+), 42 deletions(-) diff --git a/cmd/proplet/main.go b/cmd/proplet/main.go index 2f1a09e..50a22c9 100644 --- a/cmd/proplet/main.go +++ b/cmd/proplet/main.go @@ -19,15 +19,16 @@ import ( const svcName = "proplet" type config struct { - LogLevel string `env:"PROPLET_LOG_LEVEL" envDefault:"info"` - InstanceID string `env:"PROPLET_INSTANCE_ID"` - MQTTAddress string `env:"PROPLET_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"` - MQTTTimeout time.Duration `env:"PROPLET_MQTT_TIMEOUT" envDefault:"30s"` - MQTTQoS byte `env:"PROPLET_MQTT_QOS" envDefault:"2"` - LivelinessInterval time.Duration `env:"PROPLET_LIVELINESS_INTERVAL" envDefault:"10s"` - ChannelID string `env:"PROPLET_CHANNEL_ID,notEmpty"` - ThingID string `env:"PROPLET_THING_ID,notEmpty"` - ThingKey string `env:"PROPLET_THING_KEY,notEmpty"` + LogLevel string `env:"PROPLET_LOG_LEVEL" envDefault:"info"` + InstanceID string `env:"PROPLET_INSTANCE_ID"` + MQTTAddress string `env:"PROPLET_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"` + MQTTTimeout time.Duration `env:"PROPLET_MQTT_TIMEOUT" envDefault:"30s"` + MQTTQoS byte `env:"PROPLET_MQTT_QOS" envDefault:"2"` + LivelinessInterval time.Duration `env:"PROPLET_LIVELINESS_INTERVAL" envDefault:"10s"` + ChannelID string `env:"PROPLET_CHANNEL_ID,notEmpty"` + ThingID string `env:"PROPLET_THING_ID,notEmpty"` + ThingKey string `env:"PROPLET_THING_KEY,notEmpty"` + ExternalWasmRuntime string `env:"PROPLET_EXTERNAL_WASM_RUNTIME" envDefault:""` } func main() { @@ -59,7 +60,7 @@ func main() { return } - wazero := proplet.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID) + wazero := proplet.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID, cfg.ExternalWasmRuntime) service, err := proplet.NewService(ctx, cfg.ChannelID, cfg.ThingID, cfg.ThingKey, cfg.LivelinessInterval, mqttPubSub, logger, wazero) if err != nil { diff --git a/manager/api/endpoint.go b/manager/api/endpoint.go index 6f469f5..d524b54 100644 --- a/manager/api/endpoint.go +++ b/manager/api/endpoint.go @@ -122,9 +122,6 @@ func updateTaskEndpoint(svc manager.Service) endpoint.Endpoint { if !ok { return taskResponse{}, errors.Join(apiutil.ErrValidation, pkgerrors.ErrInvalidData) } - if err := req.validate(); err != nil { - return taskResponse{}, errors.Join(apiutil.ErrValidation, err) - } task, err := svc.UpdateTask(ctx, req.Task) if err != nil { diff --git a/manager/service.go b/manager/service.go index 658b0db..2276ca1 100644 --- a/manager/service.go +++ b/manager/service.go @@ -332,24 +332,11 @@ func (svc *service) updateResultsHandler(ctx context.Context, msg map[string]int return errors.New("task id is empty") } - results, ok := msg["results"].([]interface{}) - if !ok { - return errors.New("invalid results") - } - data := make([]uint64, len(results)) - for i := range results { - r, ok := results[i].(float64) - if !ok { - return errors.New("invalid result") - } - data[i] = uint64(r) - } - t, err := svc.GetTask(ctx, taskID) if err != nil { return err } - t.Results = data + t.Results = msg["results"] t.State = task.Completed t.UpdatedAt = time.Now() t.FinishTime = time.Now() diff --git a/proplet/requests.go b/proplet/requests.go index 48a05b4..e3b2e35 100644 --- a/proplet/requests.go +++ b/proplet/requests.go @@ -6,6 +6,7 @@ import ( type startRequest struct { ID string + CLIArgs []string FunctionName string WasmFile []byte imageURL string diff --git a/proplet/service.go b/proplet/service.go index 13b3d7b..efddabd 100644 --- a/proplet/service.go +++ b/proplet/service.go @@ -139,6 +139,7 @@ func (p *PropletService) handleStartCommand(ctx context.Context) func(topic stri req := startRequest{ ID: payload.ID, + CLIArgs: payload.CLIArgs, FunctionName: payload.Name, WasmFile: payload.File, imageURL: payload.ImageURL, @@ -151,7 +152,7 @@ func (p *PropletService) handleStartCommand(ctx context.Context) func(topic stri p.logger.Info("Received start command", slog.String("app_name", req.FunctionName)) if req.WasmFile != nil { - if err := p.runtime.StartApp(ctx, req.WasmFile, req.ID, req.FunctionName, req.Params...); err != nil { + if err := p.runtime.StartApp(ctx, req.WasmFile, req.CLIArgs, req.ID, req.FunctionName, req.Params...); err != nil { return err } @@ -178,7 +179,7 @@ func (p *PropletService) handleStartCommand(ctx context.Context) func(topic stri if exists && receivedChunks == metadata.TotalChunks { p.logger.Info("All chunks received, deploying app", slog.String("app_name", req.imageURL)) wasmBinary := assembleChunks(p.chunks[req.imageURL]) - if err := p.runtime.StartApp(ctx, wasmBinary, req.ID, req.FunctionName, req.Params...); err != nil { + if err := p.runtime.StartApp(ctx, wasmBinary, req.CLIArgs, req.ID, req.FunctionName, req.Params...); err != nil { p.logger.Error("Failed to start app", slog.String("app_name", req.imageURL), slog.Any("error", err)) } diff --git a/proplet/wasm.go b/proplet/wasm.go index 1b9abc3..b074baf 100644 --- a/proplet/wasm.go +++ b/proplet/wasm.go @@ -1,10 +1,14 @@ package proplet import ( + "bytes" "context" "errors" "fmt" "log/slog" + "os" + "os/exec" + "strconv" "sync" "github.com/absmach/propeller/pkg/mqtt" @@ -15,28 +19,34 @@ import ( var resultsTopic = "channels/%s/messages/control/proplet/results" type Runtime interface { - StartApp(ctx context.Context, wasmBinary []byte, id, functionName string, args ...uint64) error + StartApp(ctx context.Context, wasmBinary []byte, cliArgs []string, id, functionName string, args ...uint64) error StopApp(ctx context.Context, id string) error } type wazeroRuntime struct { - mutex sync.Mutex - runtimes map[string]wazero.Runtime - pubsub mqtt.PubSub - channelID string - logger *slog.Logger + mutex sync.Mutex + runtimes map[string]wazero.Runtime + pubsub mqtt.PubSub + channelID string + logger *slog.Logger + hostWasmRuntime string } -func NewWazeroRuntime(logger *slog.Logger, pubsub mqtt.PubSub, channelID string) Runtime { +func NewWazeroRuntime(logger *slog.Logger, pubsub mqtt.PubSub, channelID, hostWasmRuntime string) Runtime { return &wazeroRuntime{ - runtimes: make(map[string]wazero.Runtime), - pubsub: pubsub, - channelID: channelID, - logger: logger, + runtimes: make(map[string]wazero.Runtime), + pubsub: pubsub, + channelID: channelID, + logger: logger, + hostWasmRuntime: hostWasmRuntime, } } -func (w *wazeroRuntime) StartApp(ctx context.Context, wasmBinary []byte, id, functionName string, args ...uint64) error { +func (w *wazeroRuntime) StartApp(ctx context.Context, wasmBinary []byte, cliArgs []string, id, functionName string, args ...uint64) error { + if w.hostWasmRuntime != "" { + return w.runOnHostRuntime(ctx, wasmBinary, cliArgs, id, args...) + } + r := wazero.NewRuntime(ctx) w.mutex.Lock() @@ -104,3 +114,57 @@ func (w *wazeroRuntime) StopApp(ctx context.Context, id string) error { return nil } + +func (w *wazeroRuntime) runOnHostRuntime(ctx context.Context, wasmBinary []byte, cliArgs []string, id string, args ...uint64) error { + currentDir, err := os.Getwd() + if err != nil { + return fmt.Errorf("error getting current directory: %w", err) + } + f, err := os.Create(fmt.Sprintf("%s/%s.wasm", currentDir, id)) + if err != nil { + return fmt.Errorf("error creating file: %w", err) + } + + if _, err = f.Write(wasmBinary); err != nil { + return fmt.Errorf("error writing to file: %w", err) + } + f.Close() + + cliArgs = append(cliArgs, fmt.Sprintf("%s/%s.wasm", currentDir, id)) + for i := range args { + cliArgs = append(cliArgs, strconv.FormatUint(args[i], 10)) + } + cmd := exec.Command(w.hostWasmRuntime, cliArgs...) + results := bytes.Buffer{} + cmd.Stdout = &results + + if err := cmd.Start(); err != nil { + return fmt.Errorf("error starting command: %w", err) + } + + go func(fileName string) { + if err := cmd.Wait(); err != nil { + w.logger.Error("failed to wait for command", slog.String("id", id), slog.String("error", err.Error())) + } + + payload := map[string]interface{}{ + "task_id": id, + "results": results.String(), + } + + topic := fmt.Sprintf(resultsTopic, w.channelID) + if err := w.pubsub.Publish(ctx, topic, payload); err != nil { + w.logger.Error("failed to publish results", slog.String("id", id), slog.String("error", err.Error())) + + return + } + + if err := os.Remove(fileName); err != nil { + w.logger.Error("failed to remove file", slog.String("fileName", fileName), slog.String("error", err.Error())) + } + + w.logger.Info("Finished running app", slog.String("id", id)) + }(fmt.Sprintf("%s/%s.wasm", currentDir, id)) + + return nil +} diff --git a/task/task.go b/task/task.go index fbac322..aaec420 100644 --- a/task/task.go +++ b/task/task.go @@ -35,8 +35,9 @@ type Task struct { State State `json:"state"` ImageURL string `json:"image_url,omitempty"` File []byte `json:"file,omitempty"` + CLIArgs []string `json:"cli_args"` Inputs []uint64 `json:"inputs,omitempty"` - Results []uint64 `json:"results,omitempty"` + Results any `json:"results,omitempty"` StartTime time.Time `json:"start_time"` FinishTime time.Time `json:"finish_time"` CreatedAt time.Time `json:"created_at"` From 30a493a1f200b40cb11fd1bbabed384c4717980c Mon Sep 17 00:00:00 2001 From: Rodney Osodo Date: Wed, 8 Jan 2025 22:54:52 +0300 Subject: [PATCH 4/9] fix: use filepath rather than concatenation Signed-off-by: Rodney Osodo --- proplet/wasm.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/proplet/wasm.go b/proplet/wasm.go index b074baf..471ed87 100644 --- a/proplet/wasm.go +++ b/proplet/wasm.go @@ -8,6 +8,7 @@ import ( "log/slog" "os" "os/exec" + "path/filepath" "strconv" "sync" @@ -120,7 +121,7 @@ func (w *wazeroRuntime) runOnHostRuntime(ctx context.Context, wasmBinary []byte, if err != nil { return fmt.Errorf("error getting current directory: %w", err) } - f, err := os.Create(fmt.Sprintf("%s/%s.wasm", currentDir, id)) + f, err := os.Create(filepath.Join(currentDir, id+".wasm")) if err != nil { return fmt.Errorf("error creating file: %w", err) } @@ -128,9 +129,11 @@ func (w *wazeroRuntime) runOnHostRuntime(ctx context.Context, wasmBinary []byte, if _, err = f.Write(wasmBinary); err != nil { return fmt.Errorf("error writing to file: %w", err) } - f.Close() + if err := f.Close(); err != nil { + return fmt.Errorf("error closing file: %w", err) + } - cliArgs = append(cliArgs, fmt.Sprintf("%s/%s.wasm", currentDir, id)) + cliArgs = append(cliArgs, filepath.Join(currentDir, id+".wasm")) for i := range args { cliArgs = append(cliArgs, strconv.FormatUint(args[i], 10)) } @@ -164,7 +167,7 @@ func (w *wazeroRuntime) runOnHostRuntime(ctx context.Context, wasmBinary []byte, } w.logger.Info("Finished running app", slog.String("id", id)) - }(fmt.Sprintf("%s/%s.wasm", currentDir, id)) + }(filepath.Join(currentDir, id+".wasm")) return nil } From 593e3b4ca8cb78666236a1fe95e66b93c2cf579a Mon Sep 17 00:00:00 2001 From: Rodney Osodo Date: Wed, 8 Jan 2025 23:03:46 +0300 Subject: [PATCH 5/9] fix(task): add error to task if the task fails Signed-off-by: Rodney Osodo --- manager/service.go | 4 ++++ proplet/wasm.go | 17 ++++++++++++----- task/task.go | 1 + 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/manager/service.go b/manager/service.go index 2276ca1..36a5743 100644 --- a/manager/service.go +++ b/manager/service.go @@ -341,6 +341,10 @@ func (svc *service) updateResultsHandler(ctx context.Context, msg map[string]int t.UpdatedAt = time.Now() t.FinishTime = time.Now() + if errMsg, ok := msg["error"].(string); ok { + t.Error = errMsg + } + if err := svc.tasksDB.Update(ctx, t.ID, t); err != nil { return err } diff --git a/proplet/wasm.go b/proplet/wasm.go index 471ed87..12717a9 100644 --- a/proplet/wasm.go +++ b/proplet/wasm.go @@ -146,13 +146,20 @@ func (w *wazeroRuntime) runOnHostRuntime(ctx context.Context, wasmBinary []byte, } go func(fileName string) { + var payload map[string]interface{} + if err := cmd.Wait(); err != nil { w.logger.Error("failed to wait for command", slog.String("id", id), slog.String("error", err.Error())) - } - - payload := map[string]interface{}{ - "task_id": id, - "results": results.String(), + payload = map[string]interface{}{ + "task_id": id, + "error": err.Error(), + "results": results.String(), + } + } else { + payload = map[string]interface{}{ + "task_id": id, + "results": results.String(), + } } topic := fmt.Sprintf(resultsTopic, w.channelID) diff --git a/task/task.go b/task/task.go index aaec420..c834b4f 100644 --- a/task/task.go +++ b/task/task.go @@ -38,6 +38,7 @@ type Task struct { CLIArgs []string `json:"cli_args"` Inputs []uint64 `json:"inputs,omitempty"` Results any `json:"results,omitempty"` + Error string `json:"error,omitempty"` StartTime time.Time `json:"start_time"` FinishTime time.Time `json:"finish_time"` CreatedAt time.Time `json:"created_at"` From 5dca464c4a524e464e8e60765a2f923bad4da7bf Mon Sep 17 00:00:00 2001 From: Rodney Osodo Date: Wed, 8 Jan 2025 23:17:39 +0300 Subject: [PATCH 6/9] chore(ci): add tinygo installtion Signed-off-by: Rodney Osodo --- .github/workflows/ci.yml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2ce0bc0..b903b2e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,6 +32,11 @@ jobs: version: v1.61.0 args: --config ./.golangci.yaml + - name: Install TinyGo + run: | + wget https://github.com/tinygo-org/tinygo/releases/download/v0.35.0/tinygo_0.35.0_amd64.deb + sudo dpkg -i tinygo_0.35.0_amd64.deb + - name: Build proxy run: | - make all + make all -j $(nproc) From 0206fe439f87ae032d623f622996f281e2936773 Mon Sep 17 00:00:00 2001 From: Rodney Osodo Date: Mon, 13 Jan 2025 14:00:15 +0300 Subject: [PATCH 7/9] fix: seperate host and wazero runtime Signed-off-by: Rodney Osodo --- cmd/proplet/main.go | 9 +- proplet/runtime.go | 12 +++ proplet/runtimes/host.go | 98 ++++++++++++++++++++ proplet/runtimes/wazero.go | 100 +++++++++++++++++++++ proplet/wasm.go | 180 ------------------------------------- 5 files changed, 217 insertions(+), 182 deletions(-) create mode 100644 proplet/runtime.go create mode 100644 proplet/runtimes/host.go create mode 100644 proplet/runtimes/wazero.go delete mode 100644 proplet/wasm.go diff --git a/cmd/proplet/main.go b/cmd/proplet/main.go index 50a22c9..c54d26a 100644 --- a/cmd/proplet/main.go +++ b/cmd/proplet/main.go @@ -11,6 +11,7 @@ import ( "github.com/absmach/magistrala/pkg/server" "github.com/absmach/propeller/pkg/mqtt" "github.com/absmach/propeller/proplet" + "github.com/absmach/propeller/proplet/runtimes" "github.com/caarlos0/env/v11" "github.com/google/uuid" "golang.org/x/sync/errgroup" @@ -60,9 +61,13 @@ func main() { return } - wazero := proplet.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID, cfg.ExternalWasmRuntime) - service, err := proplet.NewService(ctx, cfg.ChannelID, cfg.ThingID, cfg.ThingKey, cfg.LivelinessInterval, mqttPubSub, logger, wazero) + runtime := runtimes.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID) + if cfg.ExternalWasmRuntime != "" { + runtime = runtimes.NewHostRuntime(logger, mqttPubSub, cfg.ChannelID, cfg.ExternalWasmRuntime) + } + + service, err := proplet.NewService(ctx, cfg.ChannelID, cfg.ThingID, cfg.ThingKey, cfg.LivelinessInterval, mqttPubSub, logger, runtime) if err != nil { logger.Error("failed to initialize service", slog.Any("error", err)) diff --git a/proplet/runtime.go b/proplet/runtime.go new file mode 100644 index 0000000..8a11ddb --- /dev/null +++ b/proplet/runtime.go @@ -0,0 +1,12 @@ +package proplet + +import ( + "context" +) + +var ResultsTopic = "channels/%s/messages/control/proplet/results" + +type Runtime interface { + StartApp(ctx context.Context, wasmBinary []byte, cliArgs []string, id, functionName string, args ...uint64) error + StopApp(ctx context.Context, id string) error +} diff --git a/proplet/runtimes/host.go b/proplet/runtimes/host.go new file mode 100644 index 0000000..ab2bc2a --- /dev/null +++ b/proplet/runtimes/host.go @@ -0,0 +1,98 @@ +package runtimes + +import ( + "bytes" + "context" + "fmt" + "log/slog" + "os" + "os/exec" + "path/filepath" + "strconv" + + "github.com/absmach/propeller/pkg/mqtt" + "github.com/absmach/propeller/proplet" +) + +type hostRuntime struct { + pubsub mqtt.PubSub + channelID string + logger *slog.Logger + wasmRuntime string +} + +func NewHostRuntime(logger *slog.Logger, pubsub mqtt.PubSub, channelID, wasmRuntime string) proplet.Runtime { + return &hostRuntime{ + pubsub: pubsub, + channelID: channelID, + logger: logger, + wasmRuntime: wasmRuntime, + } +} + +func (w *hostRuntime) StartApp(ctx context.Context, wasmBinary []byte, cliArgs []string, id, functionName string, args ...uint64) error { + currentDir, err := os.Getwd() + if err != nil { + return fmt.Errorf("error getting current directory: %w", err) + } + f, err := os.Create(filepath.Join(currentDir, id+".wasm")) + if err != nil { + return fmt.Errorf("error creating file: %w", err) + } + + if _, err = f.Write(wasmBinary); err != nil { + return fmt.Errorf("error writing to file: %w", err) + } + if err := f.Close(); err != nil { + return fmt.Errorf("error closing file: %w", err) + } + + cliArgs = append(cliArgs, filepath.Join(currentDir, id+".wasm")) + for i := range args { + cliArgs = append(cliArgs, strconv.FormatUint(args[i], 10)) + } + cmd := exec.Command(w.wasmRuntime, cliArgs...) + results := bytes.Buffer{} + cmd.Stdout = &results + + if err := cmd.Start(); err != nil { + return fmt.Errorf("error starting command: %w", err) + } + + go func(fileName string) { + var payload map[string]interface{} + + if err := cmd.Wait(); err != nil { + w.logger.Error("failed to wait for command", slog.String("id", id), slog.String("error", err.Error())) + payload = map[string]interface{}{ + "task_id": id, + "error": err.Error(), + "results": results.String(), + } + } else { + payload = map[string]interface{}{ + "task_id": id, + "results": results.String(), + } + } + + topic := fmt.Sprintf(proplet.ResultsTopic, w.channelID) + if err := w.pubsub.Publish(ctx, topic, payload); err != nil { + w.logger.Error("failed to publish results", slog.String("id", id), slog.String("error", err.Error())) + + return + } + + if err := os.Remove(fileName); err != nil { + w.logger.Error("failed to remove file", slog.String("fileName", fileName), slog.String("error", err.Error())) + } + + w.logger.Info("Finished running app", slog.String("id", id)) + }(filepath.Join(currentDir, id+".wasm")) + + return nil +} + +func (w *hostRuntime) StopApp(ctx context.Context, id string) error { + return nil +} diff --git a/proplet/runtimes/wazero.go b/proplet/runtimes/wazero.go new file mode 100644 index 0000000..fe1f1d9 --- /dev/null +++ b/proplet/runtimes/wazero.go @@ -0,0 +1,100 @@ +package runtimes + +import ( + "context" + "errors" + "fmt" + "log/slog" + "sync" + + "github.com/absmach/propeller/pkg/mqtt" + "github.com/absmach/propeller/proplet" + "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" +) + +type wazeroRuntime struct { + mutex sync.Mutex + runtimes map[string]wazero.Runtime + pubsub mqtt.PubSub + channelID string + logger *slog.Logger +} + +func NewWazeroRuntime(logger *slog.Logger, pubsub mqtt.PubSub, channelID string) proplet.Runtime { + return &wazeroRuntime{ + runtimes: make(map[string]wazero.Runtime), + pubsub: pubsub, + channelID: channelID, + logger: logger, + } +} + +func (w *wazeroRuntime) StartApp(ctx context.Context, wasmBinary []byte, cliArgs []string, id, functionName string, args ...uint64) error { + r := wazero.NewRuntime(ctx) + + w.mutex.Lock() + w.runtimes[id] = r + w.mutex.Unlock() + + // Instantiate WASI, which implements host functions needed for TinyGo to + // implement `panic`. + wasi_snapshot_preview1.MustInstantiate(ctx, r) + + module, err := r.Instantiate(ctx, wasmBinary) + if err != nil { + return errors.Join(errors.New("failed to instantiate Wasm module"), err) + } + + function := module.ExportedFunction(functionName) + if function == nil { + return errors.New("failed to find exported function") + } + + go func() { + results, err := function.Call(ctx, args...) + if err != nil { + w.logger.Error("failed to call function", slog.String("id", id), slog.String("function", functionName), slog.String("error", err.Error())) + + return + } + + if err := w.StopApp(ctx, id); err != nil { + w.logger.Error("failed to stop app", slog.String("id", id), slog.String("error", err.Error())) + } + + payload := map[string]interface{}{ + "task_id": id, + "results": results, + } + + topic := fmt.Sprintf(proplet.ResultsTopic, w.channelID) + if err := w.pubsub.Publish(ctx, topic, payload); err != nil { + w.logger.Error("failed to publish results", slog.String("id", id), slog.String("error", err.Error())) + + return + } + + w.logger.Info("Finished running app", slog.String("id", id)) + }() + + return nil +} + +func (w *wazeroRuntime) StopApp(ctx context.Context, id string) error { + w.mutex.Lock() + defer w.mutex.Unlock() + + r, exists := w.runtimes[id] + if !exists { + return errors.New("there is no runtime for this id") + } + + if err := r.Close(ctx); err != nil { + return err + } + + delete(w.runtimes, id) + + return nil +} diff --git a/proplet/wasm.go b/proplet/wasm.go deleted file mode 100644 index 12717a9..0000000 --- a/proplet/wasm.go +++ /dev/null @@ -1,180 +0,0 @@ -package proplet - -import ( - "bytes" - "context" - "errors" - "fmt" - "log/slog" - "os" - "os/exec" - "path/filepath" - "strconv" - "sync" - - "github.com/absmach/propeller/pkg/mqtt" - "github.com/tetratelabs/wazero" - "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" -) - -var resultsTopic = "channels/%s/messages/control/proplet/results" - -type Runtime interface { - StartApp(ctx context.Context, wasmBinary []byte, cliArgs []string, id, functionName string, args ...uint64) error - StopApp(ctx context.Context, id string) error -} - -type wazeroRuntime struct { - mutex sync.Mutex - runtimes map[string]wazero.Runtime - pubsub mqtt.PubSub - channelID string - logger *slog.Logger - hostWasmRuntime string -} - -func NewWazeroRuntime(logger *slog.Logger, pubsub mqtt.PubSub, channelID, hostWasmRuntime string) Runtime { - return &wazeroRuntime{ - runtimes: make(map[string]wazero.Runtime), - pubsub: pubsub, - channelID: channelID, - logger: logger, - hostWasmRuntime: hostWasmRuntime, - } -} - -func (w *wazeroRuntime) StartApp(ctx context.Context, wasmBinary []byte, cliArgs []string, id, functionName string, args ...uint64) error { - if w.hostWasmRuntime != "" { - return w.runOnHostRuntime(ctx, wasmBinary, cliArgs, id, args...) - } - - r := wazero.NewRuntime(ctx) - - w.mutex.Lock() - w.runtimes[id] = r - w.mutex.Unlock() - - // Instantiate WASI, which implements host functions needed for TinyGo to - // implement `panic`. - wasi_snapshot_preview1.MustInstantiate(ctx, r) - - module, err := r.Instantiate(ctx, wasmBinary) - if err != nil { - return errors.Join(errors.New("failed to instantiate Wasm module"), err) - } - - function := module.ExportedFunction(functionName) - if function == nil { - return errors.New("failed to find exported function") - } - - go func() { - results, err := function.Call(ctx, args...) - if err != nil { - w.logger.Error("failed to call function", slog.String("id", id), slog.String("function", functionName), slog.String("error", err.Error())) - - return - } - - if err := w.StopApp(ctx, id); err != nil { - w.logger.Error("failed to stop app", slog.String("id", id), slog.String("error", err.Error())) - } - - payload := map[string]interface{}{ - "task_id": id, - "results": results, - } - - topic := fmt.Sprintf(resultsTopic, w.channelID) - if err := w.pubsub.Publish(ctx, topic, payload); err != nil { - w.logger.Error("failed to publish results", slog.String("id", id), slog.String("error", err.Error())) - - return - } - - w.logger.Info("Finished running app", slog.String("id", id)) - }() - - return nil -} - -func (w *wazeroRuntime) StopApp(ctx context.Context, id string) error { - w.mutex.Lock() - defer w.mutex.Unlock() - - r, exists := w.runtimes[id] - if !exists { - return errors.New("there is no runtime for this id") - } - - if err := r.Close(ctx); err != nil { - return err - } - - delete(w.runtimes, id) - - return nil -} - -func (w *wazeroRuntime) runOnHostRuntime(ctx context.Context, wasmBinary []byte, cliArgs []string, id string, args ...uint64) error { - currentDir, err := os.Getwd() - if err != nil { - return fmt.Errorf("error getting current directory: %w", err) - } - f, err := os.Create(filepath.Join(currentDir, id+".wasm")) - if err != nil { - return fmt.Errorf("error creating file: %w", err) - } - - if _, err = f.Write(wasmBinary); err != nil { - return fmt.Errorf("error writing to file: %w", err) - } - if err := f.Close(); err != nil { - return fmt.Errorf("error closing file: %w", err) - } - - cliArgs = append(cliArgs, filepath.Join(currentDir, id+".wasm")) - for i := range args { - cliArgs = append(cliArgs, strconv.FormatUint(args[i], 10)) - } - cmd := exec.Command(w.hostWasmRuntime, cliArgs...) - results := bytes.Buffer{} - cmd.Stdout = &results - - if err := cmd.Start(); err != nil { - return fmt.Errorf("error starting command: %w", err) - } - - go func(fileName string) { - var payload map[string]interface{} - - if err := cmd.Wait(); err != nil { - w.logger.Error("failed to wait for command", slog.String("id", id), slog.String("error", err.Error())) - payload = map[string]interface{}{ - "task_id": id, - "error": err.Error(), - "results": results.String(), - } - } else { - payload = map[string]interface{}{ - "task_id": id, - "results": results.String(), - } - } - - topic := fmt.Sprintf(resultsTopic, w.channelID) - if err := w.pubsub.Publish(ctx, topic, payload); err != nil { - w.logger.Error("failed to publish results", slog.String("id", id), slog.String("error", err.Error())) - - return - } - - if err := os.Remove(fileName); err != nil { - w.logger.Error("failed to remove file", slog.String("fileName", fileName), slog.String("error", err.Error())) - } - - w.logger.Info("Finished running app", slog.String("id", id)) - }(filepath.Join(currentDir, id+".wasm")) - - return nil -} From e88034361ec54a3b55fe829e6f5617027906881c Mon Sep 17 00:00:00 2001 From: Rodney Osodo Date: Mon, 13 Jan 2025 19:33:03 +0300 Subject: [PATCH 8/9] fix: use switch case for assigning runtime based on external runtime Signed-off-by: Rodney Osodo --- cmd/proplet/main.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cmd/proplet/main.go b/cmd/proplet/main.go index c54d26a..32c4875 100644 --- a/cmd/proplet/main.go +++ b/cmd/proplet/main.go @@ -62,9 +62,12 @@ func main() { return } - runtime := runtimes.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID) - if cfg.ExternalWasmRuntime != "" { + var runtime proplet.Runtime + switch cfg.ExternalWasmRuntime != "" { + case true: runtime = runtimes.NewHostRuntime(logger, mqttPubSub, cfg.ChannelID, cfg.ExternalWasmRuntime) + case false: + runtime = runtimes.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID) } service, err := proplet.NewService(ctx, cfg.ChannelID, cfg.ThingID, cfg.ThingKey, cfg.LivelinessInterval, mqttPubSub, logger, runtime) From 3960ea8472e178373f62a76de037dc25ff0ec6b2 Mon Sep 17 00:00:00 2001 From: Rodney Osodo Date: Mon, 13 Jan 2025 21:07:45 +0300 Subject: [PATCH 9/9] fix: use default for catch all Signed-off-by: Rodney Osodo --- cmd/proplet/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/proplet/main.go b/cmd/proplet/main.go index 32c4875..33be8cc 100644 --- a/cmd/proplet/main.go +++ b/cmd/proplet/main.go @@ -66,7 +66,7 @@ func main() { switch cfg.ExternalWasmRuntime != "" { case true: runtime = runtimes.NewHostRuntime(logger, mqttPubSub, cfg.ChannelID, cfg.ExternalWasmRuntime) - case false: + default: runtime = runtimes.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID) }