Skip to content

Commit

Permalink
Merge pull request #28 from JeffMboya/MG-26
Browse files Browse the repository at this point in the history
PROP-26 - Implement Proplet
  • Loading branch information
drasko authored Dec 11, 2024
2 parents d5984da + 4d0f3c0 commit 794cf64
Show file tree
Hide file tree
Showing 12 changed files with 976 additions and 39 deletions.
19 changes: 16 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ build:

install:
cp ${BUILD_DIR}/propellerd $(GOBIN)/propellerd

all: build

clean:
Expand All @@ -30,8 +30,21 @@ clean:
lint:
golangci-lint run --config .golangci.yaml

start-supermq:
start-magistrala:
docker compose -f docker/compose.yaml up -d

stop-supermq:
stop-magistrala:
docker compose -f docker/compose.yaml down

help:
@echo "Usage: make <target>"
@echo ""
@echo "Targets:"
@echo " build: build the binary"
@echo " install: install the binary"
@echo " all: build the binary"
@echo " clean: clean the build directory"
@echo " lint: run golangci-lint"
@echo " start-magistrala: start the magistrala docker compose"
@echo " stop-magistrala: stop the magistrala docker compose"
@echo " help: display this help message"
159 changes: 159 additions & 0 deletions cmd/proplet/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package main

import (
"context"
"errors"
"flag"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/absmach/propeller/proplet"
)

const registryTimeout = 30 * time.Second

var (
wasmFilePath string
wasmBinary []byte
logLevel slog.Level
)

func main() {
if err := run(); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}

func run() error {
flag.StringVar(&wasmFilePath, "file", "", "Path to the WASM file")
flag.Parse()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

logger := configureLogger("info")
slog.SetDefault(logger)

logger.Info("Starting Proplet service")

go func() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
sig := <-sigChan
logger.Info("Received shutdown signal", slog.String("signal", sig.String()))
cancel()
}()

hasWASMFile := wasmFilePath != ""

cfg, err := proplet.LoadConfig("proplet/config.json", hasWASMFile)
if err != nil {
logger.Error("Failed to load configuration", slog.String("path", "proplet/config.json"), slog.Any("error", err))

return fmt.Errorf("failed to load configuration: %w", err)
}

if cfg.RegistryURL != "" {
if err := checkRegistryConnectivity(cfg.RegistryURL, logger); err != nil {
logger.Error("Failed connectivity check for Registry URL", slog.String("url", cfg.RegistryURL), slog.Any("error", err))

return fmt.Errorf("registry connectivity check failed: %w", err)
}
logger.Info("Registry connectivity verified", slog.String("url", cfg.RegistryURL))
}

if hasWASMFile {
wasmBinary, err = loadWASMFile(wasmFilePath, logger)
if err != nil {
logger.Error("Failed to load WASM file", slog.String("wasm_file_path", wasmFilePath), slog.Any("error", err))

return fmt.Errorf("failed to load WASM file: %w", err)
}
logger.Info("WASM binary loaded at startup", slog.Int("size_bytes", len(wasmBinary)))
}

if cfg.RegistryURL == "" && wasmBinary == nil {
logger.Error("Neither a registry URL nor a WASM binary file was provided")

return errors.New("missing registry URL and WASM binary file")
}

service, err := proplet.NewService(ctx, cfg, wasmBinary, logger)
if err != nil {
logger.Error("Error initializing service", slog.Any("error", err))

return fmt.Errorf("service initialization error: %w", err)
}

if err := service.Run(ctx, logger); err != nil {
logger.Error("Error running service", slog.Any("error", err))

return fmt.Errorf("service run error: %w", err)
}

return nil
}

func configureLogger(level string) *slog.Logger {
if err := logLevel.UnmarshalText([]byte(level)); err != nil {
log.Printf("Invalid log level: %s. Defaulting to info.\n", level)
logLevel = slog.LevelInfo
}

logHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: logLevel,
})

return slog.New(logHandler)
}

func loadWASMFile(path string, logger *slog.Logger) ([]byte, error) {
logger.Info("Loading WASM file", slog.String("path", path))
wasmBytes, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("failed to read WASM file: %w", err)
}

return wasmBytes, nil
}

func checkRegistryConnectivity(registryURL string, logger *slog.Logger) error {
ctx, cancel := context.WithTimeout(context.Background(), registryTimeout)
defer cancel()

client := http.Client{}

logger.Info("Checking registry connectivity", slog.String("url", registryURL))

req, err := http.NewRequestWithContext(ctx, http.MethodGet, registryURL, http.NoBody)
if err != nil {
logger.Error("Failed to create HTTP request", slog.String("url", registryURL), slog.Any("error", err))

return fmt.Errorf("failed to create HTTP request: %w", err)
}

resp, err := client.Do(req)
if err != nil {
logger.Error("Failed to connect to registry", slog.String("url", registryURL), slog.Any("error", err))

return fmt.Errorf("failed to connect to registry URL '%s': %w", registryURL, err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
logger.Error("Registry returned unexpected status", slog.String("url", registryURL), slog.Int("status_code", resp.StatusCode))

return fmt.Errorf("registry URL '%s' returned status: %s", registryURL, resp.Status)
}

logger.Info("Registry connectivity verified", slog.String("url", registryURL))

return nil
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,5 @@ require (
google.golang.org/grpc v1.68.1 // indirect
google.golang.org/protobuf v1.35.2 // indirect
)

require github.com/gorilla/websocket v1.5.3 // indirect
36 changes: 32 additions & 4 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,36 @@ package errors
import "errors"

var (
ErrNotFound = errors.New("not found")
ErrEmptyKey = errors.New("empty key")
ErrInvalidData = errors.New("invalid data type")
ErrEntityExists = errors.New("entity already exists")
ErrNotFound = errors.New("not found")
ErrEmptyKey = errors.New("empty key")
ErrInvalidData = errors.New("invalid data type")
ErrEntityExists = errors.New("entity already exists")
ErrMissingValue = errors.New("missing required value")
ErrInvalidValue = errors.New("invalid value provided")
ErrTimeout = errors.New("operation timed out")
ErrInternal = errors.New("internal server error")
ErrMQTTConnectionFailed = errors.New("failed to connect to MQTT broker")
ErrMQTTInvalidBrokerURL = errors.New("invalid MQTT broker URL")
ErrMQTTWillPayloadFailed = errors.New("failed to set MQTT last will payload")
ErrInvalidMethod = errors.New("invalid RPC method")
ErrInvalidParams = errors.New("invalid RPC parameters")
ErrMissingAppName = errors.New("app name is missing")
ErrInvalidStatus = errors.New("invalid response status")
ErrMissingResult = errors.New("missing result in RPC response")
ErrAppAlreadyRunning = errors.New("application is already running")
ErrAppNotRunning = errors.New("application is not running")
ErrFunctionNotFound = errors.New("specified function not found in Wasm module")
ErrModuleInstantiation = errors.New("failed to instantiate Wasm module")
ErrModuleStopFailed = errors.New("failed to stop Wasm module")
ErrChunkValidationFailed = errors.New("chunk validation failed")
ErrArtifactRequestFailed = errors.New("failed to request artifact")
ErrAppStartFailed = errors.New("failed to start application")
ErrAppStopFailed = errors.New("failed to stop application")
ErrChunksTimeout = errors.New("timeout waiting for Wasm chunks")
ErrReassemblyFailed = errors.New("failed to reassemble Wasm binary from chunks")
ErrNilMQTTClient = errors.New("MQTT client is nil")
ErrConfigValidation = errors.New("configuration validation failed")
ErrPublishDiscovery = errors.New("failed to publish discovery message")
ErrMissingChannelID = errors.New("ChannelID is missing in the configuration")
ErrMissingPropletID = errors.New("PropletID is missing in the configuration")
)
47 changes: 16 additions & 31 deletions pkg/mqtt/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,10 @@ var (
)

type pubsub struct {
address string
qos byte
id string
username string
password string
timeout time.Duration
logger *slog.Logger
client mqtt.Client
qos byte
timeout time.Duration
logger *slog.Logger
}

type Handler func(topic string, msg map[string]interface{}) error
Expand All @@ -43,14 +40,16 @@ func NewPubSub(url string, qos byte, id, username, password string, timeout time
return nil, errEmptyID
}

client, err := newClient(url, id, username, password, timeout)
if err != nil {
return nil, err
}

return &pubsub{
address: url,
qos: qos,
id: id,
username: username,
password: password,
timeout: timeout,
logger: logger,
client: client,
qos: qos,
timeout: timeout,
logger: logger,
}, nil
}

Expand All @@ -59,16 +58,12 @@ func (ps *pubsub) Publish(ctx context.Context, topic string, msg any) error {
return errEmptyTopic
}

client, err := newClient(ps.address, ps.id, ps.username, ps.password, ps.timeout)
if err != nil {
return err
}
data, err := json.Marshal(msg)
if err != nil {
return err
}

token := client.Publish(topic, ps.qos, false, data)
token := ps.client.Publish(topic, ps.qos, false, data)
if token.Error() != nil {
return token.Error()
}
Expand All @@ -85,12 +80,7 @@ func (ps *pubsub) Subscribe(ctx context.Context, topic string, handler Handler)
return errEmptyTopic
}

client, err := newClient(ps.address, ps.id, ps.username, ps.password, ps.timeout)
if err != nil {
return err
}

token := client.Subscribe(topic, ps.qos, ps.mqttHandler(handler))
token := ps.client.Subscribe(topic, ps.qos, ps.mqttHandler(handler))
if token.Error() != nil {
return token.Error()
}
Expand All @@ -106,12 +96,7 @@ func (ps *pubsub) Unsubscribe(ctx context.Context, topic string) error {
return errEmptyTopic
}

client, err := newClient(ps.address, ps.id, ps.username, ps.password, ps.timeout)
if err != nil {
return err
}

token := client.Unsubscribe(topic)
token := ps.client.Unsubscribe(topic)
if token.Error() != nil {
return token.Error()
}
Expand Down
3 changes: 2 additions & 1 deletion propellerd/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package propellerd

import (
"context"
"log/slog"
"time"

"github.com/absmach/magistrala/pkg/server"
Expand Down Expand Up @@ -40,7 +41,7 @@ var managerCmd = []cobra.Command{
}
ctx, cancel := context.WithCancel(cmd.Context())
if err := manager.StartManager(ctx, cancel, cfg); err != nil {
cmd.PrintErrf("failed to start manager: %s", err.Error())
slog.Error("failed to start manager", slog.String("error", err.Error()))
}
cancel()
},
Expand Down
Loading

0 comments on commit 794cf64

Please sign in to comment.