Skip to content

Commit

Permalink
NOISSUE - Add opcua-adapter conn route-map, use ServerURI and NodeID (a…
Browse files Browse the repository at this point in the history
…bsmach#975)

* NOISSUE - Add opcua-adapter conn route-map, use ServerURI and NodeID

Signed-off-by: Manuel Imperiale <[email protected]>

* NOISSUE - Add dynamic subscription

Signed-off-by: Manuel Imperiale <[email protected]>
  • Loading branch information
manuio authored and drasko committed Dec 9, 2019
1 parent 5120a71 commit 76b68e1
Show file tree
Hide file tree
Showing 12 changed files with 308 additions and 282 deletions.
47 changes: 21 additions & 26 deletions cmd/opcua/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,16 @@ const (
envRouteMapDB = "MF_OPCUA_ADAPTER_ROUTE_MAP_DB"
envNodesConfig = "MF_OPCUA_ADAPTER_CONFIG_FILE"

thingsRMPrefix = "thing"
channelsRMPrefix = "channel"
thingsRMPrefix = "thing"
channelsRMPrefix = "channel"
connectionRMPrefix = "connection"

columns = 2
)

type config struct {
httpPort string
opcConfig opcua.Config
opcuaConfig opcua.Config
natsURL string
logLevel string
esURL string
Expand All @@ -95,15 +98,19 @@ func main() {
rmConn := connectToRedis(cfg.routeMapURL, cfg.routeMapPass, cfg.routeMapDB, logger)
defer rmConn.Close()

thingRM := newRouteMapRepositoy(rmConn, thingsRMPrefix, logger)
chanRM := newRouteMapRepositoy(rmConn, channelsRMPrefix, logger)
connRM := newRouteMapRepositoy(rmConn, connectionRMPrefix, logger)

esConn := connectToRedis(cfg.esURL, cfg.esPass, cfg.esDB, logger)
defer esConn.Close()

publisher := pub.NewMessagePublisher(natsConn)

thingRM := newRouteMapRepositoy(rmConn, thingsRMPrefix, logger)
chanRM := newRouteMapRepositoy(rmConn, channelsRMPrefix, logger)
ctx := context.Background()
pubsub := gopcua.NewPubSub(ctx, publisher, thingRM, chanRM, connRM, logger)

svc := opcua.New(publisher, thingRM, chanRM)
svc := opcua.New(pubsub, thingRM, chanRM, connRM, cfg.opcuaConfig, logger)
svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
svc,
Expand All @@ -121,7 +128,7 @@ func main() {
}, []string{"method"}),
)

go subscribeToOpcuaServers(svc, cfg.nodesConfig, cfg.opcConfig, logger)
//go subscribeToNodesFromFile(svc, cfg.nodesConfig, cfg.opcuaConfig, logger)
go subscribeToThingsES(svc, esConn, cfg.esConsumerName, logger)

errs := make(chan error, 2)
Expand All @@ -147,7 +154,7 @@ func loadConfig() config {
}
return config{
httpPort: mainflux.Env(envHTTPPort, defHTTPPort),
opcConfig: oc,
opcuaConfig: oc,
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
esURL: mainflux.Env(envESURL, defESURL),
Expand Down Expand Up @@ -186,15 +193,7 @@ func connectToRedis(redisURL, redisPass, redisDB string, logger logger.Logger) *
})
}

func readFromOpcuaServer(svc opcua.Service, cfg opcua.Config, logger logger.Logger) {
ctx := context.Background()
gr := gopcua.NewReader(ctx, svc, logger)
if err := gr.Read(cfg); err != nil {
logger.Warn(fmt.Sprintf("OPC-UA Read failed: %s", err))
}
}

func subscribeToOpcuaServers(svc opcua.Service, nodes string, cfg opcua.Config, logger logger.Logger) {
func subscribeToNodesFromFile(svc opcua.Service, nodes string, cfg opcua.Config, logger logger.Logger) {
if _, err := os.Stat(nodes); os.IsNotExist(err) {
logger.Warn(fmt.Sprintf("Config file not found: %s", err))
return
Expand All @@ -208,10 +207,6 @@ func subscribeToOpcuaServers(svc opcua.Service, nodes string, cfg opcua.Config,
defer file.Close()

reader := csv.NewReader(file)

ctx := context.Background()
gc := gopcua.NewClient(ctx, svc, logger)

for {
l, err := reader.Read()
if err == io.EOF {
Expand All @@ -222,17 +217,17 @@ func subscribeToOpcuaServers(svc opcua.Service, nodes string, cfg opcua.Config,
return
}

if len(l) < 4 {
if len(l) < columns {
logger.Warn(fmt.Sprintf("Empty or incomplete line found in file"))
return
}

cfg.ServerURI = l[0]
cfg.NodeNamespace = l[1]
cfg.NodeIdentifierType = l[2]
cfg.NodeIdentifier = l[3]
cfg.NodeID = l[1]

go subscribeToOpcuaServer(gc, cfg, logger)
if err := svc.Subscribe(cfg); err != nil {
logger.Warn(fmt.Sprintf("OPC-UA Subscription failed: %s", err))
}
}
}

Expand Down
5 changes: 1 addition & 4 deletions docker/addons/opcua-adapter/nodes.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
opc.tcp://opcua.rocks:4840,0,i,2255
opc.tcp://opcua.rocks:4840,0,i,2256
opc.tcp://opcua.rocks:4840,1,i,2255
opc.tcp://opcua.rocks:4840,1,i,2256
opc.tcp://opcua.rocks:4840,ns=0;i=2256
61 changes: 43 additions & 18 deletions opcua/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package api

import (
"context"
"fmt"
"time"

Expand All @@ -27,35 +26,35 @@ func LoggingMiddleware(svc opcua.Service, logger logger.Logger) opcua.Service {
}
}

func (lm loggingMiddleware) CreateThing(mfxThing string, opcID string) (err error) {
func (lm loggingMiddleware) CreateThing(mfxThing, opcuaNodeID string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("create_thing mfx:opcua:%s:%s took %s to complete", mfxThing, opcID, time.Since(begin))
message := fmt.Sprintf("create_thing %s with NodeID %s, took %s to complete", mfxThing, opcuaNodeID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.CreateThing(mfxThing, opcID)
return lm.svc.CreateThing(mfxThing, opcuaNodeID)
}

func (lm loggingMiddleware) UpdateThing(mfxThing string, opcID string) (err error) {
func (lm loggingMiddleware) UpdateThing(mfxThing, opcuaNodeID string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("update_thing mfx:opcua:%s:%s took %s to complete", mfxThing, opcID, time.Since(begin))
message := fmt.Sprintf("update_thing %s with NodeID %s, took %s to complete", mfxThing, opcuaNodeID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.UpdateThing(mfxThing, opcID)
return lm.svc.UpdateThing(mfxThing, opcuaNodeID)
}

func (lm loggingMiddleware) RemoveThing(mfxThing string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("remove_thing mfx:opcua:%s took %s to complete", mfxThing, time.Since(begin))
message := fmt.Sprintf("remove_thing %s, took %s to complete", mfxThing, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
Expand All @@ -66,35 +65,35 @@ func (lm loggingMiddleware) RemoveThing(mfxThing string) (err error) {
return lm.svc.RemoveThing(mfxThing)
}

func (lm loggingMiddleware) CreateChannel(mfxChan string, opcNamespace string) (err error) {
func (lm loggingMiddleware) CreateChannel(mfxChan, opcuaServerURI string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("create_channel mfx:opcua:%s:%s took %s to complete", mfxChan, opcNamespace, time.Since(begin))
message := fmt.Sprintf("create_channel %s with ServerURI %s, took %s to complete", mfxChan, opcuaServerURI, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.CreateChannel(mfxChan, opcNamespace)
return lm.svc.CreateChannel(mfxChan, opcuaServerURI)
}

func (lm loggingMiddleware) UpdateChannel(mfxChanID string, opcNamespace string) (err error) {
func (lm loggingMiddleware) UpdateChannel(mfxChanID, opcuaServerURI string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("update_channel mfx:opcua:%s:%s took %s to complete", mfxChanID, opcNamespace, time.Since(begin))
message := fmt.Sprintf("update_channel %s with ServerURI %s, took %s to complete", mfxChanID, opcuaServerURI, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.UpdateChannel(mfxChanID, opcNamespace)
return lm.svc.UpdateChannel(mfxChanID, opcuaServerURI)
}

func (lm loggingMiddleware) RemoveChannel(mfxChanID string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("remove_channel mfx_channel_%s took %s to complete", mfxChanID, time.Since(begin))
message := fmt.Sprintf("remove_channel %s, took %s to complete", mfxChanID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
Expand All @@ -105,15 +104,41 @@ func (lm loggingMiddleware) RemoveChannel(mfxChanID string) (err error) {
return lm.svc.RemoveChannel(mfxChanID)
}

func (lm loggingMiddleware) Publish(ctx context.Context, token string, m opcua.Message) (err error) {
func (lm loggingMiddleware) ConnectThing(mfxChanID, mfxThingID string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("publish namespace/%s/id/%s/rx took %s to complete", m.Namespace, m.ID, time.Since(begin))
message := fmt.Sprintf("connect_thing for channel %s and thing %s, took %s to complete", mfxChanID, mfxThingID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.Publish(ctx, token, m)
return lm.svc.ConnectThing(mfxChanID, mfxThingID)
}

func (lm loggingMiddleware) DisconnectThing(mfxChanID, mfxThingID string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("disconnect_thing mfx-%s : mfx-%s, took %s to complete", mfxChanID, mfxThingID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.DisconnectThing(mfxChanID, mfxThingID)
}

func (lm loggingMiddleware) Subscribe(cfg opcua.Config) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("subscribe to server %s and node_id %s, took %s to complete", cfg.ServerURI, cfg.NodeID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.Subscribe(cfg)
}
43 changes: 30 additions & 13 deletions opcua/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package api

import (
"context"
"time"

"github.com/go-kit/kit/metrics"
Expand All @@ -28,22 +27,22 @@ func MetricsMiddleware(svc opcua.Service, counter metrics.Counter, latency metri
}
}

func (mm *metricsMiddleware) CreateThing(mfxDevID string, opcID string) error {
func (mm *metricsMiddleware) CreateThing(mfxDevID, opcuaNodeID string) error {
defer func(begin time.Time) {
mm.counter.With("method", "create_thing").Add(1)
mm.latency.With("method", "create_thing").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.CreateThing(mfxDevID, opcID)
return mm.svc.CreateThing(mfxDevID, opcuaNodeID)
}

func (mm *metricsMiddleware) UpdateThing(mfxDevID string, opcID string) error {
func (mm *metricsMiddleware) UpdateThing(mfxDevID, opcuaNodeID string) error {
defer func(begin time.Time) {
mm.counter.With("method", "update_thing").Add(1)
mm.latency.With("method", "update_thing").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.UpdateThing(mfxDevID, opcID)
return mm.svc.UpdateThing(mfxDevID, opcuaNodeID)
}

func (mm *metricsMiddleware) RemoveThing(mfxDevID string) error {
Expand All @@ -55,22 +54,22 @@ func (mm *metricsMiddleware) RemoveThing(mfxDevID string) error {
return mm.svc.RemoveThing(mfxDevID)
}

func (mm *metricsMiddleware) CreateChannel(mfxChanID string, opcNamespace string) error {
func (mm *metricsMiddleware) CreateChannel(mfxChanID, opcuaServerURI string) error {
defer func(begin time.Time) {
mm.counter.With("method", "create_channel").Add(1)
mm.latency.With("method", "create_channel").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.CreateChannel(mfxChanID, opcNamespace)
return mm.svc.CreateChannel(mfxChanID, opcuaServerURI)
}

func (mm *metricsMiddleware) UpdateChannel(mfxChanID string, opcNamespace string) error {
func (mm *metricsMiddleware) UpdateChannel(mfxChanID, opcuaServerURI string) error {
defer func(begin time.Time) {
mm.counter.With("method", "update_channel").Add(1)
mm.latency.With("method", "update_channel").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.UpdateChannel(mfxChanID, opcNamespace)
return mm.svc.UpdateChannel(mfxChanID, opcuaServerURI)
}

func (mm *metricsMiddleware) RemoveChannel(mfxChanID string) error {
Expand All @@ -82,11 +81,29 @@ func (mm *metricsMiddleware) RemoveChannel(mfxChanID string) error {
return mm.svc.RemoveChannel(mfxChanID)
}

func (mm *metricsMiddleware) Publish(ctx context.Context, token string, m opcua.Message) error {
func (mm *metricsMiddleware) ConnectThing(mfxChanID, mfxThingID string) error {
defer func(begin time.Time) {
mm.counter.With("method", "publish").Add(1)
mm.latency.With("method", "publish").Observe(time.Since(begin).Seconds())
mm.counter.With("method", "connect_thing").Add(1)
mm.latency.With("method", "connect_thing").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.Publish(ctx, token, m)
return mm.svc.ConnectThing(mfxChanID, mfxThingID)
}

func (mm *metricsMiddleware) DisconnectThing(mfxChanID, mfxThingID string) error {
defer func(begin time.Time) {
mm.counter.With("method", "disconnect_thing").Add(1)
mm.latency.With("method", "disconnect_thing").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.DisconnectThing(mfxChanID, mfxThingID)
}

func (mm *metricsMiddleware) Subscribe(cfg opcua.Config) error {
defer func(begin time.Time) {
mm.counter.With("method", "subscribe").Add(1)
mm.latency.With("method", "subscribe").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.Subscribe(cfg)
}
Loading

0 comments on commit 76b68e1

Please sign in to comment.