Skip to content

Commit

Permalink
fix: Handle race condition when lazy registering export metrics (#1539)
Browse files Browse the repository at this point in the history
fixes #1537

Signed-off-by: Leonard Goodell <[email protected]>
  • Loading branch information
Lenny Goodell authored Jan 17, 2024
1 parent 763fccf commit 7f23418
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 38 deletions.
1 change: 1 addition & 0 deletions internal/runtime/storeforward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestMain(m *testing.M) {
mockMetricsManager := &mocks2.MetricsManager{}
mockMetricsManager.On("Register", mock.Anything, mock.Anything, mock.Anything).Return(nil)
mockMetricsManager.On("Unregister", mock.Anything)
mockMetricsManager.On("IsRegistered", mock.Anything).Return(false)

dic = di.NewContainer(di.ServiceConstructorMap{
container.ConfigurationName: func(get di.Get) interface{} {
Expand Down
36 changes: 18 additions & 18 deletions pkg/transforms/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,30 @@
package transforms

import (
"errors"

"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/interfaces"
)

func createRegisterMetric(ctx interfaces.AppFunctionContext,
fullNameFunc func() string, getMetric func() any, setMetric func(),
tags map[string]string) {
// Only need to create and register the metric if it hasn't been created yet.
if getMetric() == nil {
lc := ctx.LoggingClient()
func registerMetric(ctx interfaces.AppFunctionContext, fullNameFunc func() string, getMetric func() any, tags map[string]string) {
lc := ctx.LoggingClient()
fullName := fullNameFunc()

metricsManger := ctx.MetricsManager()
if metricsManger == nil {
lc.Errorf("Metrics manager not available. Unable to register %s metric", fullName)
return
}

// Only register the metric if it hasn't been registered yet.
if !metricsManger.IsRegistered(fullName) {
var err error
fullName := fullNameFunc()
lc.Debugf("Initializing metric %s.", fullName)
setMetric()
metricsManger := ctx.MetricsManager()
if metricsManger != nil {
err = metricsManger.Register(fullName, getMetric(), tags)
} else {
err = errors.New("metrics manager not available")
}
lc.Debugf("Registering metric %s.", fullName)
err = metricsManger.Register(fullName, getMetric(), tags)

if err != nil {
lc.Errorf("Unable to register metric %s. Collection will continue, but metric will not be reported: %s", fullName, err.Error())
// In case of race condition, check again if metric was registered by another thread
if !metricsManger.IsRegistered(fullName) {
lc.Errorf("Unable to register metric %s. Collection will continue, but metric will not be reported: %s", fullName, err.Error())
}
return
}

Expand Down
18 changes: 10 additions & 8 deletions pkg/transforms/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestCreateRegisterMetric(t *testing.T) {
func TestRegisterMetric(t *testing.T) {
expectedName := "testCounter"
expectedFullName := expectedName + "-https://somewhere.com"
expectedUrl := "https://somewhere.com"
Expand All @@ -38,15 +38,18 @@ func TestCreateRegisterMetric(t *testing.T) {
Name string
NilMetricsManager bool
RegisterError error
alreadyRegistered bool
}{
{"Happy Path", false, nil},
{"Error - No Metrics Manager", true, nil},
{"Error - Register error", false, errors.New("register failed")},
{"Happy Path", false, nil, false},
{"Happy Path - already registered", false, nil, false},
{"Error - No Metrics Manager", true, nil, false},
{"Error - Register error", false, errors.New("register failed"), false},
}

for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
mockMetricsMgr := &mocks2.MetricsManager{}
mockMetricsMgr.On("IsRegistered", mock.Anything).Return(test.alreadyRegistered)
mockMetricsMgr.On("Register", expectedFullName, mock.Anything, expectedTags).Return(test.RegisterError).Once()
mockLogger := &loggerMocks.LoggingClient{}
mockLogger.On("Debugf", mock.Anything, mock.Anything)
Expand All @@ -55,7 +58,7 @@ func TestCreateRegisterMetric(t *testing.T) {
mockCtx.On("LoggingClient").Return(mockLogger)
if test.NilMetricsManager {
mockCtx.On("MetricsManager").Return(nil)
mockLogger.On("Errorf", mock.Anything, expectedFullName, "metrics manager not available")
mockLogger.On("Errorf", "Metrics manager not available. Unable to register %s metric", expectedFullName)
} else {
mockCtx.On("MetricsManager").Return(mockMetricsMgr)
}
Expand All @@ -64,11 +67,10 @@ func TestCreateRegisterMetric(t *testing.T) {
mockLogger.On("Errorf", mock.Anything, expectedFullName, "register failed")
}

var metric gometrics.Counter
createRegisterMetric(mockCtx,
metric := gometrics.NewCounter()
registerMetric(mockCtx,
func() string { return expectedFullName },
func() any { return metric },
func() { metric = gometrics.NewCounter() },
expectedTags)
require.NotNil(t, metric)

Expand Down
10 changes: 4 additions & 6 deletions pkg/transforms/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ func NewHTTPSenderWithOptions(options HTTPSenderOptions) *HTTPSender {
secretValueKey: options.SecretValueKey,
secretName: options.SecretName,
urlFormatter: options.URLFormatter,
httpErrorMetric: gometrics.NewCounter(),
httpSizeMetrics: gometrics.NewHistogram(gometrics.NewUniformSample(internal.MetricsReservoirSize)),
}
}

Expand Down Expand Up @@ -163,18 +165,14 @@ func (sender *HTTPSender) httpSend(ctx interfaces.AppFunctionContext, data inter
return false, err
}

createRegisterMetric(ctx,
registerMetric(ctx,
func() string { return fmt.Sprintf("%s-%s", internal.HttpExportErrorsName, parsedUrl.Redacted()) },
func() any { return sender.httpErrorMetric },
func() { sender.httpErrorMetric = gometrics.NewCounter() },
map[string]string{"url": parsedUrl.Redacted()})

createRegisterMetric(ctx,
registerMetric(ctx,
func() string { return fmt.Sprintf("%s-%s", internal.HttpExportSizeName, parsedUrl.Redacted()) },
func() any { return sender.httpSizeMetrics },
func() {
sender.httpSizeMetrics = gometrics.NewHistogram(gometrics.NewUniformSample(internal.MetricsReservoirSize))
},
map[string]string{"url": parsedUrl.Redacted()})

client := &http.Client{}
Expand Down
11 changes: 5 additions & 6 deletions pkg/transforms/mqttsecret.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ func NewMQTTSecretSender(mqttConfig MQTTSecretConfig, persistOnError bool) *MQTT
opts.OnReconnecting = sender.onReconnecting
sender.opts = opts

sender.mqttErrorMetric = gometrics.NewCounter()
sender.mqttSizeMetrics = gometrics.NewHistogram(gometrics.NewUniformSample(internal.MetricsReservoirSize))

return sender
}

Expand Down Expand Up @@ -246,18 +249,14 @@ func (sender *MQTTSecretSender) MQTTSend(ctx interfaces.AppFunctionContext, data
tagValue := fmt.Sprintf("%s/%s", sender.mqttConfig.BrokerAddress, publishTopic)
tag := map[string]string{"address/topic": tagValue}

createRegisterMetric(ctx,
registerMetric(ctx,
func() string { return fmt.Sprintf("%s-%s", internal.MqttExportErrorsName, tagValue) },
func() any { return sender.mqttErrorMetric },
func() { sender.mqttErrorMetric = gometrics.NewCounter() },
tag)

createRegisterMetric(ctx,
registerMetric(ctx,
func() string { return fmt.Sprintf("%s-%s", internal.MqttExportSizeName, tagValue) },
func() any { return sender.mqttSizeMetrics },
func() {
sender.mqttSizeMetrics = gometrics.NewHistogram(gometrics.NewUniformSample(internal.MetricsReservoirSize))
},
tag)

if !sender.client.IsConnected() && !sender.preConnected {
Expand Down

0 comments on commit 7f23418

Please sign in to comment.