Skip to content

Commit

Permalink
Merge pull request #46 from rodneyosodo/PROP-42
Browse files Browse the repository at this point in the history
PROP - 42 - Add External Wasm Runtime
  • Loading branch information
drasko authored Jan 16, 2025
2 parents d5a929d + 3960ea8 commit 262524e
Show file tree
Hide file tree
Showing 16 changed files with 218 additions and 86 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ jobs:
version: v1.61.0
args: --config ./.golangci.yaml

- name: Install TinyGo
run: |
wget https://github.com/tinygo-org/tinygo/releases/download/v0.35.0/tinygo_0.35.0_amd64.deb
sudo dpkg -i tinygo_0.35.0_amd64.deb
- name: Build proxy
run: |
make all
make all -j $(nproc)
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ BUILD_DIR = build
TIME=$(shell date -u '+%Y-%m-%dT%H:%M:%SZ')
VERSION ?= $(shell git describe --abbrev=0 --tags 2>/dev/null || echo 'v0.0.0')
COMMIT ?= $(shell git rev-parse HEAD)
EXAMPLES = addition long-addition
EXAMPLES = addition compute hello-world
SERVICES = manager proplet cli proxy

define compile_service
Expand All @@ -25,8 +25,8 @@ $(SERVICES):
install:
$(foreach f,$(wildcard $(BUILD_DIR)/*[!.wasm]),cp $(f) $(patsubst $(BUILD_DIR)/%,$(GOBIN)/propeller-%,$(f));)

.PHONY: all $(SERVICES)
all: $(SERVICES)
.PHONY: all $(SERVICES) $(EXAMPLES)
all: $(SERVICES) $(EXAMPLES)

clean:
rm -rf build
Expand Down
31 changes: 20 additions & 11 deletions cmd/proplet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/absmach/magistrala/pkg/server"
"github.com/absmach/propeller/pkg/mqtt"
"github.com/absmach/propeller/proplet"
"github.com/absmach/propeller/proplet/runtimes"
"github.com/caarlos0/env/v11"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
Expand All @@ -19,15 +20,16 @@ import (
const svcName = "proplet"

type config struct {
LogLevel string `env:"PROPLET_LOG_LEVEL" envDefault:"info"`
InstanceID string `env:"PROPLET_INSTANCE_ID"`
MQTTAddress string `env:"PROPLET_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"`
MQTTTimeout time.Duration `env:"PROPLET_MQTT_TIMEOUT" envDefault:"30s"`
MQTTQoS byte `env:"PROPLET_MQTT_QOS" envDefault:"2"`
LivelinessInterval time.Duration `env:"PROPLET_LIVELINESS_INTERVAL" envDefault:"10s"`
ChannelID string `env:"PROPLET_CHANNEL_ID,notEmpty"`
ThingID string `env:"PROPLET_THING_ID,notEmpty"`
ThingKey string `env:"PROPLET_THING_KEY,notEmpty"`
LogLevel string `env:"PROPLET_LOG_LEVEL" envDefault:"info"`
InstanceID string `env:"PROPLET_INSTANCE_ID"`
MQTTAddress string `env:"PROPLET_MQTT_ADDRESS" envDefault:"tcp://localhost:1883"`
MQTTTimeout time.Duration `env:"PROPLET_MQTT_TIMEOUT" envDefault:"30s"`
MQTTQoS byte `env:"PROPLET_MQTT_QOS" envDefault:"2"`
LivelinessInterval time.Duration `env:"PROPLET_LIVELINESS_INTERVAL" envDefault:"10s"`
ChannelID string `env:"PROPLET_CHANNEL_ID,notEmpty"`
ThingID string `env:"PROPLET_THING_ID,notEmpty"`
ThingKey string `env:"PROPLET_THING_KEY,notEmpty"`
ExternalWasmRuntime string `env:"PROPLET_EXTERNAL_WASM_RUNTIME" envDefault:""`
}

func main() {
Expand Down Expand Up @@ -59,9 +61,16 @@ func main() {

return
}
wazero := proplet.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID)

service, err := proplet.NewService(ctx, cfg.ChannelID, cfg.ThingID, cfg.ThingKey, cfg.LivelinessInterval, mqttPubSub, logger, wazero)
var runtime proplet.Runtime
switch cfg.ExternalWasmRuntime != "" {
case true:
runtime = runtimes.NewHostRuntime(logger, mqttPubSub, cfg.ChannelID, cfg.ExternalWasmRuntime)
default:
runtime = runtimes.NewWazeroRuntime(logger, mqttPubSub, cfg.ChannelID)
}

service, err := proplet.NewService(ctx, cfg.ChannelID, cfg.ThingID, cfg.ThingKey, cfg.LivelinessInterval, mqttPubSub, logger, runtime)
if err != nil {
logger.Error("failed to initialize service", slog.Any("error", err))

Expand Down
27 changes: 27 additions & 0 deletions examples/compute/compute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import "math"

//export compute
func compute(n uint32) uint32 {
var result uint32

for i := range n {
for j := range n {
for k := range n {
for l := range n {
for m := range n {
// Do some meaningless but CPU-intensive math
result += uint32(math.Pow(float64(i*j*k*l*m), 2)) % 10
}
}
}
}
}

return result
}

// main is required for the `wasi` target, even if it isn't used.
// See https://wazero.io/languages/tinygo/#why-do-i-have-to-define-main
func main() {}
7 changes: 7 additions & 0 deletions examples/hello-world/hello-world.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package main

import "fmt"

func main() {
fmt.Println("Hello World!")
}
13 changes: 0 additions & 13 deletions examples/long-addition/long-addition.go

This file was deleted.

21 changes: 10 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.61.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
Expand All @@ -47,15 +48,13 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0 // indirect
go.opentelemetry.io/otel/metric v1.33.0 // indirect
go.opentelemetry.io/otel/sdk v1.33.0 // indirect
go.opentelemetry.io/proto/otlp v1.4.0 // indirect
golang.org/x/net v0.32.0 // indirect
golang.org/x/sys v0.28.0 // indirect
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect
google.golang.org/grpc v1.69.0 // indirect
google.golang.org/protobuf v1.36.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 // indirect
google.golang.org/grpc v1.69.2 // indirect
google.golang.org/protobuf v1.36.2 // indirect
oras.land/oras-go/v2 v2.5.0
)

require github.com/opencontainers/go-digest v1.0.0 // indirect
34 changes: 18 additions & 16 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 h1:TmHmbvxPmaegwhDubVz0lICL0J5Ka2vwTzhoePEXsGE=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0/go.mod h1:qztMSjm835F2bXf+5HKAPIS5qsmQDqZna/PgVt4rWtI=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ=
github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f h1:7LYC+Yfkj3CTRcShK0KOL/w6iTiKyqqBA9a41Wnggw8=
github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f/go.mod h1:pFlLw2CfqZiIBOx6BuCeRLCrfxBJipTY0nIOF/VbGcI=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
Expand All @@ -54,6 +54,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
Expand Down Expand Up @@ -100,26 +102,26 @@ go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4Jjx
go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8=
go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s=
go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck=
go.opentelemetry.io/proto/otlp v1.4.0 h1:TA9WRvW6zMwP+Ssb6fLoUIuirti1gGbP28GcKG1jgeg=
go.opentelemetry.io/proto/otlp v1.4.0/go.mod h1:PPBWZIP98o2ElSqI35IHfu7hIhSwvc5N38Jw8pXuGFY=
golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI=
golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs=
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484 h1:ChAdCYNQFDk5fYvFZMywKLIijG7TC2m1C2CMEu11G3o=
google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484/go.mod h1:KRUmxRI4JmbpAm8gcZM4Jsffi859fo5LQjILwuqj9z8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 h1:Z7FRVJPSMaHQxD0uXU8WdgFh8PseLM8Q8NzhnpMrBhQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
google.golang.org/grpc v1.69.0 h1:quSiOM1GJPmPH5XtU+BCoVXcDVJJAzNcoyfC2cCjGkI=
google.golang.org/grpc v1.69.0/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ=
google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 h1:GVIKPyP/kLIyVOgOnTwFOrvQaQUzOzGMCxgFUOEmm24=
google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422/go.mod h1:b6h1vNKhxaSoEI+5jc3PJUCustfli/mRab7295pY7rw=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 h1:3UsHvIr4Wc2aW4brOaSCmcxh9ksica6fHEr8P1XhkYw=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422/go.mod h1:3ENsm/5D1mzDyhpzeRi1NR784I0BcofWBoSc5QqqMK4=
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU=
google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
3 changes: 0 additions & 3 deletions manager/api/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,6 @@ func updateTaskEndpoint(svc manager.Service) endpoint.Endpoint {
if !ok {
return taskResponse{}, errors.Join(apiutil.ErrValidation, pkgerrors.ErrInvalidData)
}
if err := req.validate(); err != nil {
return taskResponse{}, errors.Join(apiutil.ErrValidation, err)
}

task, err := svc.UpdateTask(ctx, req.Task)
if err != nil {
Expand Down
19 changes: 5 additions & 14 deletions manager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,28 +332,19 @@ func (svc *service) updateResultsHandler(ctx context.Context, msg map[string]int
return errors.New("task id is empty")
}

results, ok := msg["results"].([]interface{})
if !ok {
return errors.New("invalid results")
}
data := make([]uint64, len(results))
for i := range results {
r, ok := results[i].(float64)
if !ok {
return errors.New("invalid result")
}
data[i] = uint64(r)
}

t, err := svc.GetTask(ctx, taskID)
if err != nil {
return err
}
t.Results = data
t.Results = msg["results"]
t.State = task.Completed
t.UpdatedAt = time.Now()
t.FinishTime = time.Now()

if errMsg, ok := msg["error"].(string); ok {
t.Error = errMsg
}

if err := svc.tasksDB.Update(ctx, t.ID, t); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions proplet/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

type startRequest struct {
ID string
CLIArgs []string
FunctionName string
WasmFile []byte
imageURL string
Expand Down
12 changes: 12 additions & 0 deletions proplet/runtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package proplet

import (
"context"
)

var ResultsTopic = "channels/%s/messages/control/proplet/results"

type Runtime interface {
StartApp(ctx context.Context, wasmBinary []byte, cliArgs []string, id, functionName string, args ...uint64) error
StopApp(ctx context.Context, id string) error
}
98 changes: 98 additions & 0 deletions proplet/runtimes/host.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package runtimes

import (
"bytes"
"context"
"fmt"
"log/slog"
"os"
"os/exec"
"path/filepath"
"strconv"

"github.com/absmach/propeller/pkg/mqtt"
"github.com/absmach/propeller/proplet"
)

type hostRuntime struct {
pubsub mqtt.PubSub
channelID string
logger *slog.Logger
wasmRuntime string
}

func NewHostRuntime(logger *slog.Logger, pubsub mqtt.PubSub, channelID, wasmRuntime string) proplet.Runtime {
return &hostRuntime{
pubsub: pubsub,
channelID: channelID,
logger: logger,
wasmRuntime: wasmRuntime,
}
}

func (w *hostRuntime) StartApp(ctx context.Context, wasmBinary []byte, cliArgs []string, id, functionName string, args ...uint64) error {
currentDir, err := os.Getwd()
if err != nil {
return fmt.Errorf("error getting current directory: %w", err)
}
f, err := os.Create(filepath.Join(currentDir, id+".wasm"))
if err != nil {
return fmt.Errorf("error creating file: %w", err)
}

if _, err = f.Write(wasmBinary); err != nil {
return fmt.Errorf("error writing to file: %w", err)
}
if err := f.Close(); err != nil {
return fmt.Errorf("error closing file: %w", err)
}

cliArgs = append(cliArgs, filepath.Join(currentDir, id+".wasm"))
for i := range args {
cliArgs = append(cliArgs, strconv.FormatUint(args[i], 10))
}
cmd := exec.Command(w.wasmRuntime, cliArgs...)
results := bytes.Buffer{}
cmd.Stdout = &results

if err := cmd.Start(); err != nil {
return fmt.Errorf("error starting command: %w", err)
}

go func(fileName string) {
var payload map[string]interface{}

if err := cmd.Wait(); err != nil {
w.logger.Error("failed to wait for command", slog.String("id", id), slog.String("error", err.Error()))
payload = map[string]interface{}{
"task_id": id,
"error": err.Error(),
"results": results.String(),
}
} else {
payload = map[string]interface{}{
"task_id": id,
"results": results.String(),
}
}

topic := fmt.Sprintf(proplet.ResultsTopic, w.channelID)
if err := w.pubsub.Publish(ctx, topic, payload); err != nil {
w.logger.Error("failed to publish results", slog.String("id", id), slog.String("error", err.Error()))

return
}

if err := os.Remove(fileName); err != nil {
w.logger.Error("failed to remove file", slog.String("fileName", fileName), slog.String("error", err.Error()))
}

w.logger.Info("Finished running app", slog.String("id", id))
}(filepath.Join(currentDir, id+".wasm"))

return nil
}

func (w *hostRuntime) StopApp(ctx context.Context, id string) error {
return nil
}
Loading

0 comments on commit 262524e

Please sign in to comment.