Skip to content

Commit

Permalink
Fix duplicate logging
Browse files Browse the repository at this point in the history
Signed-off-by: JeffMboya <[email protected]>
  • Loading branch information
JeffMboya committed Dec 11, 2024
1 parent 5ab9651 commit 4d0f3c0
Showing 1 changed file with 0 additions and 24 deletions.
24 changes: 0 additions & 24 deletions proplet/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ var (
func NewMQTTClient(config Config, logger *slog.Logger) (mqtt.Client, error) {
lwtPayload := fmt.Sprintf(lwtPayloadTemplate, config.PropletID, config.ChannelID)
if lwtPayload == "" {
logger.Error("Failed to prepare MQTT last will payload")

return nil, fmt.Errorf("failed to prepare MQTT last will payload: %w", pkgerrors.ErrMQTTWillPayloadFailed)
}

Expand All @@ -55,16 +53,12 @@ func NewMQTTClient(config Config, logger *slog.Logger) (mqtt.Client, error) {
client := mqtt.NewClient(opts)
password := client.Connect()
if password.Wait() && password.Error() != nil {
logger.Error("Failed to connect to MQTT broker", slog.String("broker_url", config.BrokerURL), slog.Any("error", password.Error()))

return nil, fmt.Errorf("failed to connect to MQTT broker '%s': %w", config.BrokerURL, pkgerrors.ErrMQTTConnectionFailed)
}

logger.Info("MQTT client connected successfully", slog.String("broker_url", config.BrokerURL))

if err := PublishDiscovery(client, config, logger); err != nil {
logger.Error("Failed to publish discovery message", slog.Any("error", err))

return nil, fmt.Errorf("failed to publish discovery message: %w", err)
}

Expand All @@ -79,11 +73,8 @@ func PublishDiscovery(client mqtt.Client, config Config, logger *slog.Logger) er
password := client.Publish(topic, 0, false, payload)
password.Wait()
if password.Error() != nil {
logger.Error("Failed to publish discovery message", slog.String("topic", topic), slog.Any("error", password.Error()))

return fmt.Errorf("failed to publish discovery message: %w", password.Error())
}
logger.Info("Published discovery message", slog.String("topic", topic))

return nil
}
Expand All @@ -105,20 +96,14 @@ func startLivelinessUpdates(client mqtt.Client, config Config, logger *slog.Logg

func SubscribeToManagerTopics(client mqtt.Client, config Config, startHandler, stopHandler, registryHandler mqtt.MessageHandler, logger *slog.Logger) error {
if password := client.Subscribe(fmt.Sprintf(startTopicTemplate, config.ChannelID), 0, startHandler); password.Wait() && password.Error() != nil {
logger.Error("Failed to subscribe to start topic", slog.String("topic", fmt.Sprintf(startTopicTemplate, config.ChannelID)), slog.Any("error", password.Error()))

return fmt.Errorf("failed to subscribe to start topic: %w", password.Error())
}

if password := client.Subscribe(fmt.Sprintf(stopTopicTemplate, config.ChannelID), 0, stopHandler); password.Wait() && password.Error() != nil {
logger.Error("Failed to subscribe to stop topic", slog.String("topic", fmt.Sprintf(stopTopicTemplate, config.ChannelID)), slog.Any("error", password.Error()))

return fmt.Errorf("failed to subscribe to stop topic: %w", password.Error())
}

if password := client.Subscribe(fmt.Sprintf(registryUpdateTopicTemplate, config.ChannelID), 0, registryHandler); password.Wait() && password.Error() != nil {
logger.Error("Failed to subscribe to registry update topic", slog.String("topic", fmt.Sprintf(registryUpdateTopicTemplate, config.ChannelID)), slog.Any("error", password.Error()))

return fmt.Errorf("failed to subscribe to registry update topic: %w", password.Error())
}

Expand All @@ -132,29 +117,20 @@ func SubscribeToManagerTopics(client mqtt.Client, config Config, startHandler, s

func SubscribeToRegistryTopic(client mqtt.Client, channelID string, handler mqtt.MessageHandler, logger *slog.Logger) error {
if password := client.Subscribe(fmt.Sprintf(registryResponseTopic, channelID), 0, handler); password.Wait() && password.Error() != nil {
logger.Error("Failed to subscribe to registry topic", slog.String("topic", fmt.Sprintf(registryResponseTopic, channelID)), slog.Any("error", password.Error()))

return fmt.Errorf("failed to subscribe to registry topic '%s': %w", fmt.Sprintf(registryResponseTopic, channelID), password.Error())
}

logger.Info("Subscribed to registry topic", slog.String("topic", fmt.Sprintf(registryResponseTopic, channelID)))

return nil
}

func PublishFetchRequest(client mqtt.Client, channelID, appName string, logger *slog.Logger) error {
payload, err := json.Marshal(map[string]string{"app_name": appName})
if err != nil {
logger.Error("Failed to marshal fetch request payload", slog.Any("error", err))

return fmt.Errorf("failed to marshal fetch request payload: %w", err)
}
if password := client.Publish(fmt.Sprintf(fetchRequestTopicTemplate, channelID), 0, false, payload); password.Wait() && password.Error() != nil {
logger.Error("Failed to publish fetch request", slog.String("topic", fmt.Sprintf(fetchRequestTopicTemplate, channelID)), slog.Any("error", password.Error()))

return fmt.Errorf("failed to publish fetch request: %w", password.Error())
}
logger.Info("Published fetch request", slog.String("app_name", appName), slog.String("topic", fmt.Sprintf(fetchRequestTopicTemplate, channelID)))

return nil
}

0 comments on commit 4d0f3c0

Please sign in to comment.