diff --git a/pkg/mqtt/pubsub.go b/pkg/mqtt/pubsub.go index 4b3cdee..8f325fa 100644 --- a/pkg/mqtt/pubsub.go +++ b/pkg/mqtt/pubsub.go @@ -33,6 +33,7 @@ type PubSub interface { Publish(ctx context.Context, topic string, msg any) error Subscribe(ctx context.Context, topic string, handler Handler) error Unsubscribe(ctx context.Context, topic string) error + Close() error } func NewPubSub(url string, qos byte, id, username, password string, timeout time.Duration, logger *slog.Logger) (PubSub, error) { diff --git a/proplet/mqtt.go b/proplet/mqtt.go index a4d4012..394415d 100644 --- a/proplet/mqtt.go +++ b/proplet/mqtt.go @@ -1,131 +1,158 @@ package proplet import ( - "encoding/json" + "context" "fmt" "log/slog" "time" - pkgerrors "github.com/absmach/propeller/pkg/errors" - mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/absmach/propeller/pkg/mqtt" ) -const livelinessInterval = 10 * time.Second +const ( + livelinessInterval = 10 * time.Second + mqttTimeout = 30 * time.Second + qos = 0 +) var ( RegistryFailurePayload = `{"status":"failure","error":"%v"}` RegistrySuccessPayload = `{"status":"success"}` - RegistryAckTopicTemplate = "channels/%s/messages/control/manager/registry" - lwtPayloadTemplate = `{"status":"online","proplet_id":"%s","chan_id":"%s"}` - discoveryPayloadTemplate = `{"proplet_id":"%s","chan_id":"%s"}` - alivePayloadTemplate = `{"status":"alive","proplet_id":"%s","chan_id":"%s"}` - aliveTopicTemplate = "channels/%s/messages/control/proplet/alive" - discoveryTopicTemplate = "channels/%s/messages/control/proplet/create" - startTopicTemplate = "channels/%s/messages/control/manager/start" - stopTopicTemplate = "channels/%s/messages/control/manager/stop" - registryUpdateTopicTemplate = "channels/%s/messages/control/manager/updateRegistry" - registryResponseTopic = "channels/%s/messages/registry/server" - fetchRequestTopicTemplate = "channels/%s/messages/registry/proplet" + RegistryUpdateRequestTopic = "channels/%s/messages/control/manager/updateRegistry" + RegistryUpdateResponseTopic = "channels/%s/messages/control/proplet/updateRegistry" + AliveTopic = "channels/%s/messages/control/proplet/alive" + AlivePayload = `{"status":"alive","proplet_id":"%s","chan_id":"%s"}` + DiscoveryTopic = "channels/%s/messages/control/proplet/create" + DiscoveryPayload = `{"proplet_id":"%s","chan_id":"%s"}` + LWTTopic = "channels/%s/messages/control/proplet/create" + LWTPayload = `{"status":"online","proplet_id":"%s","chan_id":"%s"}` + StartTopic = "channels/%s/messages/control/manager/start" + StopTopic = "channels/%s/messages/control/manager/stop" + RegistryResponseTopic = "channels/%s/messages/registry/server" + RegistryRequestTopic = "channels/%s/messages/registry/proplet" ) -func NewMQTTClient(config Config, logger *slog.Logger) (mqtt.Client, error) { - lwtPayload := fmt.Sprintf(lwtPayloadTemplate, config.PropletID, config.ChannelID) - if lwtPayload == "" { - return nil, fmt.Errorf("failed to prepare MQTT last will payload: %w", pkgerrors.ErrMQTTWillPayloadFailed) - } - - opts := mqtt.NewClientOptions(). - AddBroker(config.BrokerURL). - SetClientID("Proplet-"+config.PropletID). - SetUsername(config.PropletID). - SetPassword(config.Password). - SetCleanSession(true). - SetWill(aliveTopicTemplate+config.ChannelID, lwtPayloadTemplate+config.PropletID+config.ChannelID, 0, false) - - logger.Info("Configured Last Will and Testament") - - opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { - logger.Error("MQTT connection lost", slog.Any("error", err)) - }) - - opts.SetReconnectingHandler(func(client mqtt.Client, options *mqtt.ClientOptions) { - logger.Info("MQTT reconnecting") - }) +type MQTTService struct { + pubsub mqtt.PubSub + config Config + logger *slog.Logger +} - client := mqtt.NewClient(opts) - password := client.Connect() - if password.Wait() && password.Error() != nil { - return nil, fmt.Errorf("failed to connect to MQTT broker '%s': %w", config.BrokerURL, pkgerrors.ErrMQTTConnectionFailed) +func NewMQTTService(ctx context.Context, config Config, logger *slog.Logger) (*MQTTService, error) { + pubsub, err := mqtt.NewPubSub( + config.BrokerURL, + qos, + "Proplet-"+config.PropletID, + config.PropletID, + config.Password, + mqttTimeout, + logger, + ) + if err != nil { + return nil, fmt.Errorf("failed to initialize MQTT PubSub: %w", err) } - PublishDiscovery(client, config, logger) + service := &MQTTService{ + pubsub: pubsub, + config: config, + logger: logger, + } - go startLivelinessUpdates(client, config, logger) + lwtTopic := fmt.Sprintf(LWTTopic, config.ChannelID) + lwtPayload := map[string]string{ + "status": "offline", + "proplet_id": config.PropletID, + "chan_id": config.ChannelID, + } + if err := pubsub.Publish(ctx, lwtTopic, lwtPayload); err != nil { + logger.Error("Failed to set LWT message", slog.Any("error", err)) - return client, nil -} + return nil, err + } -func PublishDiscovery(client mqtt.Client, config Config, logger *slog.Logger) { - topic := fmt.Sprintf(discoveryTopicTemplate, config.ChannelID) - payload := fmt.Sprintf(discoveryPayloadTemplate, config.PropletID, config.ChannelID) - password := client.Publish(topic, 0, false, payload) - password.Wait() - if password.Error() != nil { - logger.Info("failed to publish discovery message: %w", slog.Any("error", password.Error())) + if err := service.PublishDiscoveryMessage(ctx); err != nil { + logger.Error("Failed to publish discovery message", slog.Any("error", err)) - return + return nil, err } - logger.Info("Discovery message published successfully") + go service.StartLivelinessUpdates(ctx) + + return service, nil } -func startLivelinessUpdates(client mqtt.Client, config Config, logger *slog.Logger) { +func (m *MQTTService) StartLivelinessUpdates(ctx context.Context) { ticker := time.NewTicker(livelinessInterval) defer ticker.Stop() - for range ticker.C { - password := client.Publish(fmt.Sprintf(aliveTopicTemplate, config.ChannelID), 0, false, fmt.Sprintf(alivePayloadTemplate, config.PropletID, config.ChannelID)) - password.Wait() - if password.Error() != nil { - logger.Error("Failed to publish liveliness message", slog.String("topic", fmt.Sprintf(aliveTopicTemplate, config.ChannelID)), slog.Any("error", password.Error())) - } else { - logger.Info("Published liveliness message") + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + err := m.pubsub.Publish(ctx, fmt.Sprintf(AliveTopic, m.config.ChannelID), map[string]string{ + "status": "alive", + "proplet_id": m.config.PropletID, + "chan_id": m.config.ChannelID, + }) + if err != nil { + m.logger.Error("Failed to publish liveliness message", slog.Any("error", err)) + } else { + m.logger.Info("Published liveliness message") + } } } } -func SubscribeToManagerTopics(client mqtt.Client, config Config, startHandler, stopHandler, registryHandler mqtt.MessageHandler) error { - if password := client.Subscribe(fmt.Sprintf(startTopicTemplate, config.ChannelID), 0, startHandler); password.Wait() && password.Error() != nil { - return fmt.Errorf("failed to subscribe to start topic: %w", password.Error()) +func (m *MQTTService) PublishDiscoveryMessage(ctx context.Context) error { + topic := fmt.Sprintf(DiscoveryTopic, m.config.ChannelID) + payload := map[string]string{ + "proplet_id": m.config.PropletID, + "chan_id": m.config.ChannelID, } - - if password := client.Subscribe(fmt.Sprintf(stopTopicTemplate, config.ChannelID), 0, stopHandler); password.Wait() && password.Error() != nil { - return fmt.Errorf("failed to subscribe to stop topic: %w", password.Error()) + if err := m.pubsub.Publish(ctx, topic, payload); err != nil { + return fmt.Errorf("failed to publish discovery message: %w", err) } + m.logger.Info("Discovery message published successfully") + + return nil +} - if password := client.Subscribe(fmt.Sprintf(registryUpdateTopicTemplate, config.ChannelID), 0, registryHandler); password.Wait() && password.Error() != nil { - return fmt.Errorf("failed to subscribe to registry update topic: %w", password.Error()) +func (m *MQTTService) SubscribeToManagerTopics(ctx context.Context, startHandler, stopHandler, registryHandler mqtt.Handler) error { + handlers := map[string]mqtt.Handler{ + fmt.Sprintf(StartTopic, m.config.ChannelID): startHandler, + fmt.Sprintf(StopTopic, m.config.ChannelID): stopHandler, + fmt.Sprintf(RegistryUpdateRequestTopic, m.config.ChannelID): registryHandler, + } + for topic, handler := range handlers { + if err := m.pubsub.Subscribe(ctx, topic, handler); err != nil { + return fmt.Errorf("failed to subscribe to topic %s: %w", topic, err) + } } return nil } -func SubscribeToRegistryTopic(client mqtt.Client, channelID string, handler mqtt.MessageHandler, logger *slog.Logger) error { - if password := client.Subscribe(fmt.Sprintf(registryResponseTopic, channelID), 0, handler); password.Wait() && password.Error() != nil { - return fmt.Errorf("failed to subscribe to registry topic '%s': %w", fmt.Sprintf(registryResponseTopic, channelID), password.Error()) +func (m *MQTTService) SubscribeToRegistryTopic(ctx context.Context, handler mqtt.Handler) error { + topic := fmt.Sprintf(RegistryResponseTopic, m.config.ChannelID) + if err := m.pubsub.Subscribe(ctx, topic, handler); err != nil { + return fmt.Errorf("failed to subscribe to registry topic: %w", err) } return nil } -func PublishFetchRequest(client mqtt.Client, channelID, appName string, logger *slog.Logger) error { - payload, err := json.Marshal(map[string]string{"app_name": appName}) - if err != nil { - return fmt.Errorf("failed to marshal fetch request payload: %w", err) - } - if password := client.Publish(fmt.Sprintf(fetchRequestTopicTemplate, channelID), 0, false, payload); password.Wait() && password.Error() != nil { - return fmt.Errorf("failed to publish fetch request: %w", password.Error()) +func (m *MQTTService) PublishFetchRequest(ctx context.Context, appName string) error { + topic := fmt.Sprintf(RegistryRequestTopic, m.config.ChannelID) + payload := map[string]string{"app_name": appName} + if err := m.pubsub.Publish(ctx, topic, payload); err != nil { + return fmt.Errorf("failed to publish fetch request: %w", err) } + m.logger.Info("Fetch request published successfully") return nil } + +func (m *MQTTService) Close() error { + return m.pubsub.Close() +} diff --git a/proplet/service.go b/proplet/service.go index 91ee9d2..be80756 100644 --- a/proplet/service.go +++ b/proplet/service.go @@ -14,19 +14,19 @@ import ( pkgerrors "github.com/absmach/propeller/pkg/errors" propletapi "github.com/absmach/propeller/proplet/api" - mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/tetratelabs/wazero" wazeroapi "github.com/tetratelabs/wazero/api" ) const ( - filePermissions = 0o644 - pollingInterval = 5 * time.Second + filePermissions = 0o644 + pollingInterval = 5 * time.Second + chunkWaitTimeout = 10 * time.Minute ) type PropletService struct { config Config - mqttClient mqtt.Client + mqttService *MQTTService runtime *WazeroRuntime wasmBinary []byte chunks map[string][][]byte @@ -46,13 +46,6 @@ type WazeroRuntime struct { mutex sync.Mutex } -func NewWazeroRuntime(ctx context.Context) *WazeroRuntime { - return &WazeroRuntime{ - runtime: wazero.NewRuntime(ctx), - modules: make(map[string]wazeroapi.Module), - } -} - func (w *WazeroRuntime) StartApp(ctx context.Context, appName string, wasmBinary []byte, functionName string) (wazeroapi.Function, error) { if appName == "" { return nil, fmt.Errorf("start app: appName is required but missing: %w", pkgerrors.ErrMissingValue) @@ -111,49 +104,47 @@ func (w *WazeroRuntime) StopApp(ctx context.Context, appName string) error { } func NewService(ctx context.Context, cfg Config, wasmBinary []byte, logger *slog.Logger) (*PropletService, error) { - mqttClient, err := NewMQTTClient(cfg, logger) + mqttService, err := NewMQTTService(ctx, cfg, logger) if err != nil { return nil, fmt.Errorf("failed to initialize MQTT client: %w", err) } - runtime := NewWazeroRuntime(ctx) - return &PropletService{ config: cfg, - mqttClient: mqttClient, - runtime: runtime, + mqttService: mqttService, + runtime: NewWazeroRuntime(ctx), wasmBinary: wasmBinary, chunks: make(map[string][][]byte), chunkMetadata: make(map[string]*ChunkPayload), }, nil } +func NewWazeroRuntime(ctx context.Context) *WazeroRuntime { + return &WazeroRuntime{ + runtime: wazero.NewRuntime(ctx), + modules: make(map[string]wazeroapi.Module), + } +} + func (p *PropletService) Run(ctx context.Context, logger *slog.Logger) error { - if err := SubscribeToManagerTopics( - p.mqttClient, - p.config, - func(client mqtt.Client, msg mqtt.Message) { - p.handleStartCommand(ctx, client, msg, logger) + if err := p.mqttService.SubscribeToManagerTopics(ctx, + func(topic string, msg map[string]interface{}) error { + return p.handleStartCommand(ctx, topic, msg, logger) }, - func(client mqtt.Client, msg mqtt.Message) { - p.handleStopCommand(ctx, client, msg, logger) + func(topic string, msg map[string]interface{}) error { + return p.handleStopCommand(ctx, topic, msg, logger) }, - func(client mqtt.Client, msg mqtt.Message) { - p.registryUpdate(ctx, client, msg, logger) + func(topic string, msg map[string]interface{}) error { + return p.registryUpdate(ctx, topic, msg, logger) }, ); err != nil { return fmt.Errorf("failed to subscribe to Manager topics: %w", err) } - if err := SubscribeToRegistryTopic( - p.mqttClient, - p.config.ChannelID, - func(client mqtt.Client, msg mqtt.Message) { - p.handleChunk(ctx, client, msg) - }, - logger, - ); err != nil { - return fmt.Errorf("failed to subscribe to Registry topics: %w", err) + if err := p.mqttService.SubscribeToRegistryTopic(ctx, func(topic string, msg map[string]interface{}) error { + return p.handleChunk(ctx, topic, msg) + }); err != nil { + return fmt.Errorf("failed to subscribe to registry topic: %w", err) } logger.Info("Proplet service is running.") @@ -162,12 +153,14 @@ func (p *PropletService) Run(ctx context.Context, logger *slog.Logger) error { return nil } -func (p *PropletService) handleStartCommand(ctx context.Context, _ mqtt.Client, msg mqtt.Message, logger *slog.Logger) { +func (p *PropletService) handleStartCommand(ctx context.Context, _ string, msg map[string]interface{}, logger *slog.Logger) error { var req propletapi.StartRequest - if err := json.Unmarshal(msg.Payload(), &req); err != nil { - logger.Error("Invalid start command payload", slog.Any("error", err)) - - return + data, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("failed to serialize message payload: %w", err) + } + if err := json.Unmarshal(data, &req); err != nil { + return fmt.Errorf("invalid start command payload: %w", err) } logger.Info("Received start command", slog.String("app_name", req.AppName)) @@ -176,85 +169,87 @@ func (p *PropletService) handleStartCommand(ctx context.Context, _ mqtt.Client, logger.Info("Using preloaded WASM binary", slog.String("app_name", req.AppName)) function, err := p.runtime.StartApp(ctx, req.AppName, p.wasmBinary, "main") if err != nil { - logger.Error("Failed to start app", slog.String("app_name", req.AppName), slog.Any("error", err)) - - return + return fmt.Errorf("failed to start app '%s': %w", req.AppName, err) } _, err = function.Call(ctx) if err != nil { - logger.Error("Error executing app", slog.String("app_name", req.AppName), slog.Any("error", err)) - } else { - logger.Info("App started successfully", slog.String("app_name", req.AppName)) + return fmt.Errorf("error executing app '%s': %w", req.AppName, err) } - return + return nil } - if p.config.RegistryURL != "" { - err := PublishFetchRequest(p.mqttClient, p.config.ChannelID, req.AppName, logger) - if err != nil { - logger.Error("Failed to publish fetch request", slog.String("app_name", req.AppName), slog.Any("error", err)) + if p.config.RegistryURL == "" { + logger.Warn("Registry URL is empty, and no binary provided", slog.String("app_name", req.AppName)) - return - } + return nil + } - go func() { - logger.Info("Waiting for chunks", slog.String("app_name", req.AppName)) + if err := p.mqttService.PublishFetchRequest(ctx, req.AppName); err != nil { + return fmt.Errorf("failed to publish fetch request for app '%s': %w", req.AppName, err) + } - for { - p.chunksMutex.Lock() - metadata, exists := p.chunkMetadata[req.AppName] - receivedChunks := len(p.chunks[req.AppName]) - p.chunksMutex.Unlock() + logger.Info("Waiting for chunks", slog.String("app_name", req.AppName)) + timeout := time.After(chunkWaitTimeout) - if exists && receivedChunks == metadata.TotalChunks { - logger.Info("All chunks received, deploying app", slog.String("app_name", req.AppName)) - go p.deployAndRunApp(ctx, req.AppName) + for { + select { + case <-timeout: + return fmt.Errorf("timed out waiting for chunks for app '%s'", req.AppName) + default: + p.chunksMutex.Lock() + metadata, exists := p.chunkMetadata[req.AppName] + receivedChunks := len(p.chunks[req.AppName]) + p.chunksMutex.Unlock() - break - } + if exists && receivedChunks == metadata.TotalChunks { + go p.deployAndRunApp(ctx, req.AppName) - time.Sleep(pollingInterval) + return nil } - }() - } else { - logger.Warn("Registry URL is empty, and no binary provided", slog.String("app_name", req.AppName)) + + time.Sleep(pollingInterval) + } } } -func (p *PropletService) handleStopCommand(ctx context.Context, _ mqtt.Client, msg mqtt.Message, logger *slog.Logger) { +func (p *PropletService) handleStopCommand(ctx context.Context, _ string, msg map[string]interface{}, logger *slog.Logger) error { var req propletapi.StopRequest - if err := json.Unmarshal(msg.Payload(), &req); err != nil { - logger.Error("Invalid stop command payload", slog.Any("error", err)) + data, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("failed to serialize message payload: %w", err) + } - return + if err := json.Unmarshal(data, &req); err != nil { + return fmt.Errorf("invalid stop command payload: %w", err) } logger.Info("Received stop command", slog.String("app_name", req.AppName)) - err := p.runtime.StopApp(ctx, req.AppName) + err = p.runtime.StopApp(ctx, req.AppName) if err != nil { - logger.Error("Failed to stop app", slog.String("app_name", req.AppName), slog.Any("error", err)) - - return + return fmt.Errorf("failed to stop app '%s': %w", req.AppName, err) } logger.Info("App stopped successfully", slog.String("app_name", req.AppName)) + + return nil } -func (p *PropletService) handleChunk(ctx context.Context, _ mqtt.Client, msg mqtt.Message) { +func (p *PropletService) handleChunk(ctx context.Context, _ string, msg map[string]interface{}) error { var chunk ChunkPayload - if err := json.Unmarshal(msg.Payload(), &chunk); err != nil { - log.Printf("Failed to unmarshal chunk payload: %v", err) + data, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("failed to serialize chunk payload: %w", err) + } - return + if err := json.Unmarshal(data, &chunk); err != nil { + return fmt.Errorf("failed to unmarshal chunk payload: %w", err) } if err := chunk.Validate(); err != nil { - log.Printf("Invalid chunk payload: %v\n", err) - - return + return fmt.Errorf("invalid chunk payload: %w", err) } p.chunksMutex.Lock() @@ -272,6 +267,8 @@ func (p *PropletService) handleChunk(ctx context.Context, _ mqtt.Client, msg mqt log.Printf("All chunks received for app '%s'. Deploying...\n", chunk.AppName) go p.deployAndRunApp(ctx, chunk.AppName) } + + return nil } func (p *PropletService) deployAndRunApp(ctx context.Context, appName string) { @@ -349,23 +346,33 @@ func (p *PropletService) UpdateRegistry(ctx context.Context, registryURL, regist return nil } -func (p *PropletService) registryUpdate(ctx context.Context, client mqtt.Client, msg mqtt.Message, logger *slog.Logger) { +func (p *PropletService) registryUpdate(ctx context.Context, _ string, msg map[string]interface{}, _ *slog.Logger) error { var payload struct { RegistryURL string `json:"registry_url"` RegistryToken string `json:"registry_token"` } - if err := json.Unmarshal(msg.Payload(), &payload); err != nil { - logger.Error("Invalid registry update payload", slog.Any("error", err)) - return + data, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("failed to serialize registry update payload: %w", err) + } + + if err := json.Unmarshal(data, &payload); err != nil { + return fmt.Errorf("invalid registry update payload: %w", err) } - ackTopic := fmt.Sprintf(RegistryAckTopicTemplate, p.config.ChannelID) + ackTopic := fmt.Sprintf(RegistryUpdateResponseTopic, p.config.ChannelID) if err := p.UpdateRegistry(ctx, payload.RegistryURL, payload.RegistryToken); err != nil { - client.Publish(ackTopic, 0, false, fmt.Sprintf(RegistryFailurePayload, err)) - logger.Error("Failed to update registry configuration", slog.String("ack_topic", ackTopic), slog.String("registry_url", payload.RegistryURL), slog.Any("error", err)) - } else { - client.Publish(ackTopic, 0, false, RegistrySuccessPayload) - logger.Info("App Registry configuration updated successfully", slog.String("ack_topic", ackTopic), slog.String("registry_url", payload.RegistryURL)) + if pubErr := p.mqttService.pubsub.Publish(ctx, ackTopic, fmt.Sprintf(RegistryFailurePayload, err)); pubErr != nil { + return fmt.Errorf("failed to publish registry update failure acknowledgment on topic '%s': %w", ackTopic, pubErr) + } + + return fmt.Errorf("failed to update registry configuration on topic '%s' with registry URL '%s': %w", ackTopic, payload.RegistryURL, err) + } + + if pubErr := p.mqttService.pubsub.Publish(ctx, ackTopic, RegistrySuccessPayload); pubErr != nil { + return fmt.Errorf("failed to publish registry update success acknowledgment on topic '%s': %w", ackTopic, pubErr) } + + return nil }