Skip to content

Commit

Permalink
feat: Add capability to pre-connect to MQTT Broker for MQTT Export (#…
Browse files Browse the repository at this point in the history
…1527)

* feat: Add capability to pre-connect to MQTT Broker for MQTT Export

Performance enhancement for when service is processing many events very
fast causing back ups when using lazy connection.

closes #1516

Signed-off-by: Leonard Goodell <[email protected]>
  • Loading branch information
Lenny Goodell authored Dec 20, 2023
1 parent f1b4116 commit 437ed90
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 95 deletions.
2 changes: 1 addition & 1 deletion app-service-template/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/armon/go-metrics v0.4.1 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/diegoholiveira/jsonlogic/v3 v3.3.2 // indirect
github.com/diegoholiveira/jsonlogic/v3 v3.4.0 // indirect
github.com/eclipse/paho.mqtt.golang v1.4.3 // indirect
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.6 // indirect
github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions app-service-template/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/diegoholiveira/jsonlogic/v3 v3.3.2 h1:srg/h16pzyuS0/+P2HOt2zdDPDnzaFZtsHtfTugRPVc=
github.com/diegoholiveira/jsonlogic/v3 v3.3.2/go.mod h1:9oE8z9G+0OMxOoLHF3fhek3KuqD5CBqM0B6XFL08MSg=
github.com/diegoholiveira/jsonlogic/v3 v3.4.0 h1:TN++nRmEMA5UHzKl8MJ1kbF5SSzWtKHE0PZ6ITbJeH4=
github.com/diegoholiveira/jsonlogic/v3 v3.4.0/go.mod h1:9oE8z9G+0OMxOoLHF3fhek3KuqD5CBqM0B6XFL08MSg=
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.6 h1:8W0q+EJUnSt5YkvbZKSWScaKt/yrBdITX7ga0Y/z188=
Expand Down
196 changes: 126 additions & 70 deletions internal/app/configurable.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,85 +21,96 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/common"
"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/interfaces"
"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/transforms"
"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/util"
bootstrapInterfaces "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/interfaces"
"github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger"
coreCommon "github.com/edgexfoundry/go-mod-core-contracts/v3/common"
)

const (
ProfileNames = "profilenames"
DeviceNames = "devicenames"
SourceNames = "sourcenames"
ResourceNames = "resourcenames"
FilterOut = "filterout"
EncryptionKey = "key"
InitVector = "initvector"
Url = "url"
ExportMethod = "method"
ExportMethodPost = "post"
ExportMethodPut = "put"
MimeType = "mimetype"
PersistOnError = "persistonerror"
ContinueOnSendError = "continueonsenderror"
ReturnInputData = "returninputdata"
SkipVerify = "skipverify"
Qos = "qos"
Retain = "retain"
AutoReconnect = "autoreconnect"
ConnectTimeout = "connecttimeout"
ProfileName = "profilename"
DeviceName = "devicename"
ResourceName = "resourcename"
ValueType = "valuetype"
MediaType = "mediatype"
Rule = "rule"
BatchThreshold = "batchthreshold"
TimeInterval = "timeinterval"
HeaderName = "headername"
SecretName = "secretname"
SecretValueKey = "secretvaluekey"
BrokerAddress = "brokeraddress"
ClientID = "clientid"
KeepAlive = "keepalive"
Topic = "topic"
TransformType = "type"
TransformXml = "xml"
TransformJson = "json"
AuthMode = "authmode"
Tags = "tags"
ResponseContentType = "responsecontenttype"
Algorithm = "algorithm"
CompressGZIP = "gzip"
CompressZLIB = "zlib"
EncryptAES256 = "aes256"
Mode = "mode"
BatchByCount = "bycount"
BatchByTime = "bytime"
BatchByTimeAndCount = "bytimecount"
IsEventData = "iseventdata"
MergeOnSend = "mergeonsend"
HttpRequestHeaders = "httprequestheaders"
WillEnabled = "willenabled"
WillTopic = "willtopic"
WillQos = "willqos"
WillPayload = "willpayload"
WillRetained = "willretained"
ProfileNames = "profilenames"
DeviceNames = "devicenames"
SourceNames = "sourcenames"
ResourceNames = "resourcenames"
FilterOut = "filterout"
EncryptionKey = "key"
InitVector = "initvector"
Url = "url"
ExportMethod = "method"
ExportMethodPost = "post"
ExportMethodPut = "put"
MimeType = "mimetype"
PersistOnError = "persistonerror"
ContinueOnSendError = "continueonsenderror"
ReturnInputData = "returninputdata"
SkipVerify = "skipverify"
Qos = "qos"
Retain = "retain"
AutoReconnect = "autoreconnect"
ConnectTimeout = "connecttimeout"
ProfileName = "profilename"
DeviceName = "devicename"
ResourceName = "resourcename"
ValueType = "valuetype"
MediaType = "mediatype"
Rule = "rule"
BatchThreshold = "batchthreshold"
TimeInterval = "timeinterval"
HeaderName = "headername"
SecretName = "secretname"
SecretValueKey = "secretvaluekey"
BrokerAddress = "brokeraddress"
ClientID = "clientid"
KeepAlive = "keepalive"
Topic = "topic"
TransformType = "type"
TransformXml = "xml"
TransformJson = "json"
AuthMode = "authmode"
Tags = "tags"
ResponseContentType = "responsecontenttype"
Algorithm = "algorithm"
CompressGZIP = "gzip"
CompressZLIB = "zlib"
EncryptAES256 = "aes256"
Mode = "mode"
BatchByCount = "bycount"
BatchByTime = "bytime"
BatchByTimeAndCount = "bytimecount"
IsEventData = "iseventdata"
MergeOnSend = "mergeonsend"
HttpRequestHeaders = "httprequestheaders"
WillEnabled = "willenabled"
WillTopic = "willtopic"
WillQos = "willqos"
WillPayload = "willpayload"
WillRetained = "willretained"
MaxReconnectInterval = "maxreconnectinterval"
PreConnect = "preconnect"
PreConnectRetryCount = "preconnectretrycount"
PreConnectRetryInterval = "preconnectretryinterval"

PreConnectRetryCountDefault = 6
PreConnectRetryIntervalDefault = time.Second * 10
)

// Configurable contains the helper functions that return the function pointers for building the configurable function pipeline.
// They transform the parameters map from the Pipeline configuration in to the actual parameters required by the function.
type Configurable struct {
lc logger.LoggingClient
sp bootstrapInterfaces.SecretProvider
}

// NewConfigurable returns a new instance of Configurable
func NewConfigurable(lc logger.LoggingClient) *Configurable {
func NewConfigurable(lc logger.LoggingClient, sp bootstrapInterfaces.SecretProvider) *Configurable {
return &Configurable{
lc: lc,
sp: sp,
}
}

Expand Down Expand Up @@ -499,23 +510,65 @@ func (app *Configurable) MQTTExport(parameters map[string]string) interfaces.App
}
}

preConnect := false
preConnectRetryCount := PreConnectRetryCountDefault
preConnectRetryInterval := PreConnectRetryIntervalDefault

boolValue := parameters[PreConnect]
if len(boolValue) > 0 {
preConnect, err = strconv.ParseBool(boolValue)
if err != nil {
app.lc.Errorf("Could not parse '%s' to a bool for '%s' parameter: %s", boolValue, PreConnect, err.Error())
return nil
}
}

countValue := parameters[PreConnectRetryCount]
if len(countValue) > 0 {
preConnectRetryCount, err = strconv.Atoi(countValue)
if err != nil {
app.lc.Errorf("Could not parse '%s' to an int for '%s' parameter: %s", countValue, PreConnectRetryCount, err.Error())
return nil
}
}

intervalValue := parameters[PreConnectRetryInterval]
if len(intervalValue) > 0 {
preConnectRetryInterval, err = time.ParseDuration(intervalValue)
if err != nil {
app.lc.Errorf("Could not parse '%s' to a Duration for '%s' parameter: %s", intervalValue, PreConnectRetryInterval, err.Error())
return nil
}
}

intervalValue = parameters[MaxReconnectInterval]
if len(intervalValue) > 0 {
_, err = time.ParseDuration(intervalValue)
if err != nil {
app.lc.Errorf("Could not parse '%s' to a Duration for '%s' parameter: %s", intervalValue, MaxReconnectInterval, err.Error())
return nil
}

}

// These are optional and blank values result in MQTT defaults being used.
keepAlive := parameters[KeepAlive]
connectTimeout := parameters[ConnectTimeout]

mqttConfig := transforms.MQTTSecretConfig{
Retain: retain,
SkipCertVerify: skipCertVerify,
AutoReconnect: autoReconnect,
ConnectTimeout: connectTimeout,
KeepAlive: keepAlive,
QoS: byte(qos),
BrokerAddress: brokerAddress,
ClientId: clientID,
SecretName: secretName,
Topic: topic,
AuthMode: authMode,
Will: will,
Retain: retain,
SkipCertVerify: skipCertVerify,
AutoReconnect: autoReconnect,
ConnectTimeout: connectTimeout,
KeepAlive: keepAlive,
QoS: byte(qos),
BrokerAddress: brokerAddress,
ClientId: clientID,
SecretName: secretName,
Topic: topic,
AuthMode: authMode,
Will: will,
MaxReconnectInterval: parameters[MaxReconnectInterval], // Validated above
}

// PersistOnError is optional and is false by default.
Expand All @@ -529,6 +582,9 @@ func (app *Configurable) MQTTExport(parameters map[string]string) interfaces.App
}
}
transform := transforms.NewMQTTSecretSender(mqttConfig, persistOnError)
if preConnect {
transform.ConnectToBroker(app.lc, app.sp, preConnectRetryCount, preConnectRetryInterval)
}
return transform.MQTTSend
}

Expand Down
3 changes: 3 additions & 0 deletions internal/app/configurable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,9 @@ func TestMQTTExport(t *testing.T) {
params[WillEnabled] = "true"
params[WillTopic] = "will"
params[WillPayload] = "goodbye"
params[PreConnectRetryCount] = "10"
params[PreConnectRetryInterval] = "6s"
params[MaxReconnectInterval] = "10s"

trx := configurable.MQTTExport(params)
assert.NotNil(t, trx, "return result from MQTTSecretSend should not be nil")
Expand Down
2 changes: 1 addition & 1 deletion internal/app/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.F
return nil, fmt.Errorf("pipline TargetType of '%s' is not supported", svc.config.Writable.Pipeline.TargetType)
}

configurable := reflect.ValueOf(NewConfigurable(svc.lc))
configurable := reflect.ValueOf(NewConfigurable(svc.lc, svc.SecretProvider()))
pipelineConfig := svc.config.Writable.Pipeline

defaultExecutionOrder := strings.TrimSpace(pipelineConfig.ExecutionOrder)
Expand Down
15 changes: 10 additions & 5 deletions internal/app/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,8 @@ func TestGetAppSettingStringsNoAppSettings(t *testing.T) {

func TestLoadConfigurableFunctionPipelinesDefaultNotFound(t *testing.T) {
service := Service{
lc: lc,
lc: lc,
dic: dic,
config: &common.ConfigurationStruct{
Writable: common.WritableInfo{
Pipeline: common.PipelineInfo{
Expand Down Expand Up @@ -572,7 +573,8 @@ func TestLoadConfigurableFunctionPipelinesNotABuiltInSdkFunction(t *testing.T) {
functions["Bogus"] = common.PipelineFunction{}

sdk := Service{
lc: lc,
lc: lc,
dic: dic,
config: &common.ConfigurationStruct{
Writable: common.WritableInfo{
Pipeline: common.PipelineInfo{
Expand Down Expand Up @@ -604,7 +606,8 @@ func TestLoadConfigurableFunctionPipelinesNumFunctions(t *testing.T) {
transforms["SetResponseData"] = common.PipelineFunction{}

sdk := Service{
lc: lc,
lc: lc,
dic: dic,
config: &common.ConfigurationStruct{
Writable: common.WritableInfo{
Pipeline: common.PipelineInfo{
Expand Down Expand Up @@ -661,7 +664,8 @@ func TestTargetType(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sdk := Service{
lc: lc,
lc: lc,
dic: dic,
config: &common.ConfigurationStruct{
Writable: common.WritableInfo{
Pipeline: common.PipelineInfo{
Expand Down Expand Up @@ -821,11 +825,12 @@ func TestStop(t *testing.T) {
func TestFindMatchingFunction(t *testing.T) {
svc := Service{
lc: lc,
dic: dic,
serviceKey: "MyAppService",
profileSuffixPlaceholder: interfaces.ProfileSuffixPlaceholder,
}

configurable := reflect.ValueOf(NewConfigurable(svc.lc))
configurable := reflect.ValueOf(NewConfigurable(svc.lc, svc.SecretProvider()))

tests := []struct {
Name string
Expand Down
Loading

0 comments on commit 437ed90

Please sign in to comment.