diff --git a/cmd/opcua/main.go b/cmd/opcua/main.go index 4a21f1775c..ee35c032d0 100644 --- a/cmd/opcua/main.go +++ b/cmd/opcua/main.go @@ -62,13 +62,16 @@ const ( envRouteMapDB = "MF_OPCUA_ADAPTER_ROUTE_MAP_DB" envNodesConfig = "MF_OPCUA_ADAPTER_CONFIG_FILE" - thingsRMPrefix = "thing" - channelsRMPrefix = "channel" + thingsRMPrefix = "thing" + channelsRMPrefix = "channel" + connectionRMPrefix = "connection" + + columns = 2 ) type config struct { httpPort string - opcConfig opcua.Config + opcuaConfig opcua.Config natsURL string logLevel string esURL string @@ -95,15 +98,19 @@ func main() { rmConn := connectToRedis(cfg.routeMapURL, cfg.routeMapPass, cfg.routeMapDB, logger) defer rmConn.Close() + thingRM := newRouteMapRepositoy(rmConn, thingsRMPrefix, logger) + chanRM := newRouteMapRepositoy(rmConn, channelsRMPrefix, logger) + connRM := newRouteMapRepositoy(rmConn, connectionRMPrefix, logger) + esConn := connectToRedis(cfg.esURL, cfg.esPass, cfg.esDB, logger) defer esConn.Close() publisher := pub.NewMessagePublisher(natsConn) - thingRM := newRouteMapRepositoy(rmConn, thingsRMPrefix, logger) - chanRM := newRouteMapRepositoy(rmConn, channelsRMPrefix, logger) + ctx := context.Background() + pubsub := gopcua.NewPubSub(ctx, publisher, thingRM, chanRM, connRM, logger) - svc := opcua.New(publisher, thingRM, chanRM) + svc := opcua.New(pubsub, thingRM, chanRM, connRM, cfg.opcuaConfig, logger) svc = api.LoggingMiddleware(svc, logger) svc = api.MetricsMiddleware( svc, @@ -121,7 +128,7 @@ func main() { }, []string{"method"}), ) - go subscribeToOpcuaServers(svc, cfg.nodesConfig, cfg.opcConfig, logger) + //go subscribeToNodesFromFile(svc, cfg.nodesConfig, cfg.opcuaConfig, logger) go subscribeToThingsES(svc, esConn, cfg.esConsumerName, logger) errs := make(chan error, 2) @@ -147,7 +154,7 @@ func loadConfig() config { } return config{ httpPort: mainflux.Env(envHTTPPort, defHTTPPort), - opcConfig: oc, + opcuaConfig: oc, natsURL: mainflux.Env(envNatsURL, defNatsURL), logLevel: mainflux.Env(envLogLevel, defLogLevel), esURL: mainflux.Env(envESURL, defESURL), @@ -186,15 +193,7 @@ func connectToRedis(redisURL, redisPass, redisDB string, logger logger.Logger) * }) } -func readFromOpcuaServer(svc opcua.Service, cfg opcua.Config, logger logger.Logger) { - ctx := context.Background() - gr := gopcua.NewReader(ctx, svc, logger) - if err := gr.Read(cfg); err != nil { - logger.Warn(fmt.Sprintf("OPC-UA Read failed: %s", err)) - } -} - -func subscribeToOpcuaServers(svc opcua.Service, nodes string, cfg opcua.Config, logger logger.Logger) { +func subscribeToNodesFromFile(svc opcua.Service, nodes string, cfg opcua.Config, logger logger.Logger) { if _, err := os.Stat(nodes); os.IsNotExist(err) { logger.Warn(fmt.Sprintf("Config file not found: %s", err)) return @@ -208,10 +207,6 @@ func subscribeToOpcuaServers(svc opcua.Service, nodes string, cfg opcua.Config, defer file.Close() reader := csv.NewReader(file) - - ctx := context.Background() - gc := gopcua.NewClient(ctx, svc, logger) - for { l, err := reader.Read() if err == io.EOF { @@ -222,17 +217,17 @@ func subscribeToOpcuaServers(svc opcua.Service, nodes string, cfg opcua.Config, return } - if len(l) < 4 { + if len(l) < columns { logger.Warn(fmt.Sprintf("Empty or incomplete line found in file")) return } cfg.ServerURI = l[0] - cfg.NodeNamespace = l[1] - cfg.NodeIdentifierType = l[2] - cfg.NodeIdentifier = l[3] + cfg.NodeID = l[1] - go subscribeToOpcuaServer(gc, cfg, logger) + if err := svc.Subscribe(cfg); err != nil { + logger.Warn(fmt.Sprintf("OPC-UA Subscription failed: %s", err)) + } } } diff --git a/docker/addons/opcua-adapter/nodes.csv b/docker/addons/opcua-adapter/nodes.csv index bba6b5d31e..e65be3a9b0 100644 --- a/docker/addons/opcua-adapter/nodes.csv +++ b/docker/addons/opcua-adapter/nodes.csv @@ -1,4 +1 @@ -opc.tcp://opcua.rocks:4840,0,i,2255 -opc.tcp://opcua.rocks:4840,0,i,2256 -opc.tcp://opcua.rocks:4840,1,i,2255 -opc.tcp://opcua.rocks:4840,1,i,2256 +opc.tcp://opcua.rocks:4840,ns=0;i=2256 diff --git a/opcua/api/logging.go b/opcua/api/logging.go index b411ffe1db..eef2ad5334 100644 --- a/opcua/api/logging.go +++ b/opcua/api/logging.go @@ -4,7 +4,6 @@ package api import ( - "context" "fmt" "time" @@ -27,9 +26,9 @@ func LoggingMiddleware(svc opcua.Service, logger logger.Logger) opcua.Service { } } -func (lm loggingMiddleware) CreateThing(mfxThing string, opcID string) (err error) { +func (lm loggingMiddleware) CreateThing(mfxThing, opcuaNodeID string) (err error) { defer func(begin time.Time) { - message := fmt.Sprintf("create_thing mfx:opcua:%s:%s took %s to complete", mfxThing, opcID, time.Since(begin)) + message := fmt.Sprintf("create_thing %s with NodeID %s, took %s to complete", mfxThing, opcuaNodeID, time.Since(begin)) if err != nil { lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) return @@ -37,12 +36,12 @@ func (lm loggingMiddleware) CreateThing(mfxThing string, opcID string) (err erro lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) - return lm.svc.CreateThing(mfxThing, opcID) + return lm.svc.CreateThing(mfxThing, opcuaNodeID) } -func (lm loggingMiddleware) UpdateThing(mfxThing string, opcID string) (err error) { +func (lm loggingMiddleware) UpdateThing(mfxThing, opcuaNodeID string) (err error) { defer func(begin time.Time) { - message := fmt.Sprintf("update_thing mfx:opcua:%s:%s took %s to complete", mfxThing, opcID, time.Since(begin)) + message := fmt.Sprintf("update_thing %s with NodeID %s, took %s to complete", mfxThing, opcuaNodeID, time.Since(begin)) if err != nil { lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) return @@ -50,12 +49,12 @@ func (lm loggingMiddleware) UpdateThing(mfxThing string, opcID string) (err erro lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) - return lm.svc.UpdateThing(mfxThing, opcID) + return lm.svc.UpdateThing(mfxThing, opcuaNodeID) } func (lm loggingMiddleware) RemoveThing(mfxThing string) (err error) { defer func(begin time.Time) { - message := fmt.Sprintf("remove_thing mfx:opcua:%s took %s to complete", mfxThing, time.Since(begin)) + message := fmt.Sprintf("remove_thing %s, took %s to complete", mfxThing, time.Since(begin)) if err != nil { lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) return @@ -66,9 +65,9 @@ func (lm loggingMiddleware) RemoveThing(mfxThing string) (err error) { return lm.svc.RemoveThing(mfxThing) } -func (lm loggingMiddleware) CreateChannel(mfxChan string, opcNamespace string) (err error) { +func (lm loggingMiddleware) CreateChannel(mfxChan, opcuaServerURI string) (err error) { defer func(begin time.Time) { - message := fmt.Sprintf("create_channel mfx:opcua:%s:%s took %s to complete", mfxChan, opcNamespace, time.Since(begin)) + message := fmt.Sprintf("create_channel %s with ServerURI %s, took %s to complete", mfxChan, opcuaServerURI, time.Since(begin)) if err != nil { lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) return @@ -76,12 +75,12 @@ func (lm loggingMiddleware) CreateChannel(mfxChan string, opcNamespace string) ( lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) - return lm.svc.CreateChannel(mfxChan, opcNamespace) + return lm.svc.CreateChannel(mfxChan, opcuaServerURI) } -func (lm loggingMiddleware) UpdateChannel(mfxChanID string, opcNamespace string) (err error) { +func (lm loggingMiddleware) UpdateChannel(mfxChanID, opcuaServerURI string) (err error) { defer func(begin time.Time) { - message := fmt.Sprintf("update_channel mfx:opcua:%s:%s took %s to complete", mfxChanID, opcNamespace, time.Since(begin)) + message := fmt.Sprintf("update_channel %s with ServerURI %s, took %s to complete", mfxChanID, opcuaServerURI, time.Since(begin)) if err != nil { lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) return @@ -89,12 +88,12 @@ func (lm loggingMiddleware) UpdateChannel(mfxChanID string, opcNamespace string) lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) - return lm.svc.UpdateChannel(mfxChanID, opcNamespace) + return lm.svc.UpdateChannel(mfxChanID, opcuaServerURI) } func (lm loggingMiddleware) RemoveChannel(mfxChanID string) (err error) { defer func(begin time.Time) { - message := fmt.Sprintf("remove_channel mfx_channel_%s took %s to complete", mfxChanID, time.Since(begin)) + message := fmt.Sprintf("remove_channel %s, took %s to complete", mfxChanID, time.Since(begin)) if err != nil { lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) return @@ -105,9 +104,9 @@ func (lm loggingMiddleware) RemoveChannel(mfxChanID string) (err error) { return lm.svc.RemoveChannel(mfxChanID) } -func (lm loggingMiddleware) Publish(ctx context.Context, token string, m opcua.Message) (err error) { +func (lm loggingMiddleware) ConnectThing(mfxChanID, mfxThingID string) (err error) { defer func(begin time.Time) { - message := fmt.Sprintf("publish namespace/%s/id/%s/rx took %s to complete", m.Namespace, m.ID, time.Since(begin)) + message := fmt.Sprintf("connect_thing for channel %s and thing %s, took %s to complete", mfxChanID, mfxThingID, time.Since(begin)) if err != nil { lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) return @@ -115,5 +114,31 @@ func (lm loggingMiddleware) Publish(ctx context.Context, token string, m opcua.M lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) - return lm.svc.Publish(ctx, token, m) + return lm.svc.ConnectThing(mfxChanID, mfxThingID) +} + +func (lm loggingMiddleware) DisconnectThing(mfxChanID, mfxThingID string) (err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("disconnect_thing mfx-%s : mfx-%s, took %s to complete", mfxChanID, mfxThingID, time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.DisconnectThing(mfxChanID, mfxThingID) +} + +func (lm loggingMiddleware) Subscribe(cfg opcua.Config) (err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("subscribe to server %s and node_id %s, took %s to complete", cfg.ServerURI, cfg.NodeID, time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.svc.Subscribe(cfg) } diff --git a/opcua/api/metrics.go b/opcua/api/metrics.go index 3ce185cc03..37b7a60026 100644 --- a/opcua/api/metrics.go +++ b/opcua/api/metrics.go @@ -4,7 +4,6 @@ package api import ( - "context" "time" "github.com/go-kit/kit/metrics" @@ -28,22 +27,22 @@ func MetricsMiddleware(svc opcua.Service, counter metrics.Counter, latency metri } } -func (mm *metricsMiddleware) CreateThing(mfxDevID string, opcID string) error { +func (mm *metricsMiddleware) CreateThing(mfxDevID, opcuaNodeID string) error { defer func(begin time.Time) { mm.counter.With("method", "create_thing").Add(1) mm.latency.With("method", "create_thing").Observe(time.Since(begin).Seconds()) }(time.Now()) - return mm.svc.CreateThing(mfxDevID, opcID) + return mm.svc.CreateThing(mfxDevID, opcuaNodeID) } -func (mm *metricsMiddleware) UpdateThing(mfxDevID string, opcID string) error { +func (mm *metricsMiddleware) UpdateThing(mfxDevID, opcuaNodeID string) error { defer func(begin time.Time) { mm.counter.With("method", "update_thing").Add(1) mm.latency.With("method", "update_thing").Observe(time.Since(begin).Seconds()) }(time.Now()) - return mm.svc.UpdateThing(mfxDevID, opcID) + return mm.svc.UpdateThing(mfxDevID, opcuaNodeID) } func (mm *metricsMiddleware) RemoveThing(mfxDevID string) error { @@ -55,22 +54,22 @@ func (mm *metricsMiddleware) RemoveThing(mfxDevID string) error { return mm.svc.RemoveThing(mfxDevID) } -func (mm *metricsMiddleware) CreateChannel(mfxChanID string, opcNamespace string) error { +func (mm *metricsMiddleware) CreateChannel(mfxChanID, opcuaServerURI string) error { defer func(begin time.Time) { mm.counter.With("method", "create_channel").Add(1) mm.latency.With("method", "create_channel").Observe(time.Since(begin).Seconds()) }(time.Now()) - return mm.svc.CreateChannel(mfxChanID, opcNamespace) + return mm.svc.CreateChannel(mfxChanID, opcuaServerURI) } -func (mm *metricsMiddleware) UpdateChannel(mfxChanID string, opcNamespace string) error { +func (mm *metricsMiddleware) UpdateChannel(mfxChanID, opcuaServerURI string) error { defer func(begin time.Time) { mm.counter.With("method", "update_channel").Add(1) mm.latency.With("method", "update_channel").Observe(time.Since(begin).Seconds()) }(time.Now()) - return mm.svc.UpdateChannel(mfxChanID, opcNamespace) + return mm.svc.UpdateChannel(mfxChanID, opcuaServerURI) } func (mm *metricsMiddleware) RemoveChannel(mfxChanID string) error { @@ -82,11 +81,29 @@ func (mm *metricsMiddleware) RemoveChannel(mfxChanID string) error { return mm.svc.RemoveChannel(mfxChanID) } -func (mm *metricsMiddleware) Publish(ctx context.Context, token string, m opcua.Message) error { +func (mm *metricsMiddleware) ConnectThing(mfxChanID, mfxThingID string) error { defer func(begin time.Time) { - mm.counter.With("method", "publish").Add(1) - mm.latency.With("method", "publish").Observe(time.Since(begin).Seconds()) + mm.counter.With("method", "connect_thing").Add(1) + mm.latency.With("method", "connect_thing").Observe(time.Since(begin).Seconds()) }(time.Now()) - return mm.svc.Publish(ctx, token, m) + return mm.svc.ConnectThing(mfxChanID, mfxThingID) +} + +func (mm *metricsMiddleware) DisconnectThing(mfxChanID, mfxThingID string) error { + defer func(begin time.Time) { + mm.counter.With("method", "disconnect_thing").Add(1) + mm.latency.With("method", "disconnect_thing").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return mm.svc.DisconnectThing(mfxChanID, mfxThingID) +} + +func (mm *metricsMiddleware) Subscribe(cfg opcua.Config) error { + defer func(begin time.Time) { + mm.counter.With("method", "subscribe").Add(1) + mm.latency.With("method", "subscribe").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return mm.svc.Subscribe(cfg) } diff --git a/opcua/gopcua/sub.go b/opcua/gopcua/pubsub.go similarity index 56% rename from opcua/gopcua/sub.go rename to opcua/gopcua/pubsub.go index d16dc8e5da..b565b4a801 100644 --- a/opcua/gopcua/sub.go +++ b/opcua/gopcua/pubsub.go @@ -10,6 +10,7 @@ import ( opcuaGopcua "github.com/gopcua/opcua" uaGopcua "github.com/gopcua/opcua/ua" + "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/errors" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/opcua" @@ -29,22 +30,28 @@ var ( var _ opcua.Subscriber = (*client)(nil) type client struct { - ctx context.Context - svc opcua.Service - logger logger.Logger + ctx context.Context + publisher mainflux.MessagePublisher + thingsRM opcua.RouteMapRepository + channelsRM opcua.RouteMapRepository + connectRM opcua.RouteMapRepository + logger logger.Logger } -// NewClient returns new OPC-UA client instance. -func NewClient(ctx context.Context, svc opcua.Service, log logger.Logger) opcua.Subscriber { +// NewPubSub returns new OPC-UA client instance. +func NewPubSub(ctx context.Context, pub mainflux.MessagePublisher, thingsRM, channelsRM, connectRM opcua.RouteMapRepository, log logger.Logger) opcua.Subscriber { return client{ - ctx: ctx, - svc: svc, - logger: log, + ctx: ctx, + publisher: pub, + thingsRM: thingsRM, + channelsRM: channelsRM, + connectRM: connectRM, + logger: log, } } // Subscribe subscribes to the OPC-UA Server. -func (b client) Subscribe(cfg opcua.Config) error { +func (c client) Subscribe(cfg opcua.Config) error { opts := []opcuaGopcua.Option{ opcuaGopcua.SecurityMode(uaGopcua.MessageSecurityModeNone), } @@ -70,13 +77,13 @@ func (b client) Subscribe(cfg opcua.Config) error { } } - c := opcuaGopcua.NewClient(cfg.ServerURI, opts...) - if err := c.Connect(b.ctx); err != nil { + oc := opcuaGopcua.NewClient(cfg.ServerURI, opts...) + if err := oc.Connect(c.ctx); err != nil { return errors.Wrap(errFailedConn, err) } - defer c.Close() + defer oc.Close() - sub, err := c.Subscribe(&opcuaGopcua.SubscriptionParameters{ + sub, err := oc.Subscribe(&opcuaGopcua.SubscriptionParameters{ Interval: 2000 * time.Millisecond, }) if err != nil { @@ -84,19 +91,15 @@ func (b client) Subscribe(cfg opcua.Config) error { } defer sub.Cancel() - b.logger.Info(fmt.Sprintf("OPC-UA server URI: %s", cfg.ServerURI)) - b.logger.Info(fmt.Sprintf("Created subscription with id %v", sub.SubscriptionID)) - - if err := b.runHandler(sub, cfg); err != nil { + if err := c.runHandler(sub, cfg); err != nil { return err } return nil } -func (b client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) error { - nid := fmt.Sprintf("ns=%s;%s=%s", cfg.NodeNamespace, cfg.NodeIdentifierType, cfg.NodeIdentifier) - nodeID, err := uaGopcua.ParseNodeID(nid) +func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) error { + nodeID, err := uaGopcua.ParseNodeID(cfg.NodeID) if err != nil { return errors.Wrap(errFailedParseNodeID, err) } @@ -112,15 +115,15 @@ func (b client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro return errResponseStatus } - go sub.Run(b.ctx) + go sub.Run(c.ctx) for { select { - case <-b.ctx.Done(): + case <-c.ctx.Done(): return nil case res := <-sub.Notifs: if res.Error != nil { - b.logger.Error(res.Error.Error()) + c.logger.Error(res.Error.Error()) continue } @@ -128,8 +131,9 @@ func (b client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro case *uaGopcua.DataChangeNotification: for _, item := range x.MonitoredItems { msg := opcua.Message{ - Namespace: cfg.NodeNamespace, - ID: cfg.NodeIdentifier, + ServerURI: cfg.ServerURI, + NodeID: cfg.NodeID, + Type: item.Value.Value.Type().String(), } switch item.Value.Value.Type() { @@ -139,7 +143,7 @@ func (b client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro msg.Data = item.Value.Value.Int() case uaGopcua.TypeIDUint64: msg.Data = item.Value.Value.Uint() - case uaGopcua.TypeIDFloat: + case uaGopcua.TypeIDFloat, uaGopcua.TypeIDDouble: msg.Data = item.Value.Value.Float() case uaGopcua.TypeIDString: msg.Data = item.Value.Value.String() @@ -147,13 +151,51 @@ func (b client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro msg.Data = 0 } - // Publish on Mainflux NATS broker - b.svc.Publish(b.ctx, "", msg) + c.Publish(c.ctx, "", msg) } default: - b.logger.Info(fmt.Sprintf("what's this publish result? %T", res.Value)) + c.logger.Info(fmt.Sprintf("unknown publish result: %T", res.Value)) } } } } + +// Publish forwards messages from OPC-UA MQTT broker to Mainflux NATS broker +func (c client) Publish(ctx context.Context, token string, m opcua.Message) error { + // Get route-map of the OPC-UA ServerURI + chanID, err := c.channelsRM.Get(m.ServerURI) + if err != nil { + return opcua.ErrNotFoundServerURI + } + + // Get route-map of the OPC-UA NodeID + thingID, err := c.thingsRM.Get(m.NodeID) + if err != nil { + return opcua.ErrNotFoundNodeID + } + + // Check connection between ServerURI and NodeID + cKey := fmt.Sprintf("%s:%s", chanID, thingID) + if _, err := c.connectRM.Get(cKey); err != nil { + return opcua.ErrNotFoundConn + } + + // Publish on Mainflux NATS broker + SenML := fmt.Sprintf(`[{"n":"%s","v":%v}]`, m.Type, m.Data) + payload := []byte(SenML) + msg := mainflux.Message{ + Publisher: thingID, + Protocol: "opcua", + ContentType: "Content-Type", + Channel: chanID, + Payload: payload, + } + + if err := c.publisher.Publish(ctx, token, msg); err != nil { + return err + } + + c.logger.Info(fmt.Sprintf("publish from server %s and node_id %s with value %v", m.ServerURI, m.NodeID, m.Data)) + return nil +} diff --git a/opcua/gopcua/read.go b/opcua/gopcua/read.go deleted file mode 100644 index d18fd995c1..0000000000 --- a/opcua/gopcua/read.go +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -package gopcua - -import ( - "context" - "fmt" - - opcuaGopcua "github.com/gopcua/opcua" - uaGopcua "github.com/gopcua/opcua/ua" - "github.com/mainflux/mainflux/errors" - "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/opcua" -) - -var _ opcua.Reader = (*reader)(nil) - -type reader struct { - ctx context.Context - svc opcua.Service - logger logger.Logger -} - -// NewReader returns new OPC-UA reader instance. -func NewReader(ctx context.Context, svc opcua.Service, log logger.Logger) opcua.Reader { - return reader{ - ctx: ctx, - svc: svc, - logger: log, - } -} - -// Read reads a given OPC-UA Server endpoint. -func (r reader) Read(cfg opcua.Config) error { - c := opcuaGopcua.NewClient(cfg.ServerURI, opcuaGopcua.SecurityMode(uaGopcua.MessageSecurityModeNone)) - if err := c.Connect(r.ctx); err != nil { - return errors.Wrap(errFailedConn, err) - } - defer c.Close() - - nid := fmt.Sprintf("ns=%s;%s=%s", cfg.NodeNamespace, cfg.NodeIdentifierType, cfg.NodeIdentifier) - id, err := uaGopcua.ParseNodeID(nid) - if err != nil { - return errors.Wrap(errFailedParseNodeID, err) - } - - req := &uaGopcua.ReadRequest{ - MaxAge: 2000, - NodesToRead: []*uaGopcua.ReadValueID{ - &uaGopcua.ReadValueID{NodeID: id}, - }, - TimestampsToReturn: uaGopcua.TimestampsToReturnBoth, - } - - resp, err := c.Read(req) - if err != nil { - return errors.Wrap(errFailedRead, err) - } - if resp.Results[0].Status != uaGopcua.StatusOK { - return errResponseStatus - } - - // Publish on Mainflux NATS broker - msg := opcua.Message{ - Namespace: cfg.NodeNamespace, - ID: cfg.NodeIdentifier, - Data: resp.Results[0].Value.Float(), - } - r.svc.Publish(r.ctx, "", msg) - - return nil -} diff --git a/opcua/message.go b/opcua/message.go index 11238511d0..5ff58396d6 100644 --- a/opcua/message.go +++ b/opcua/message.go @@ -5,7 +5,8 @@ package opcua // Message represent an OPC-UA message type Message struct { - Namespace string `json:"namespace"` - ID string `json:"id"` - Data interface{} `json:"data"` + ServerURI string + NodeID string + Type string + Data interface{} } diff --git a/opcua/reader.go b/opcua/reader.go deleted file mode 100644 index 0a21b164be..0000000000 --- a/opcua/reader.go +++ /dev/null @@ -1,10 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -package opcua - -// Reader represents the OPC-UA client. -type Reader interface { - // Read given OPC-UA Server NodeID (Namespace + ID). - Read(Config) error -} diff --git a/opcua/redis/events.go b/opcua/redis/events.go index 715ea8e2d7..dfbd148593 100644 --- a/opcua/redis/events.go +++ b/opcua/redis/events.go @@ -4,17 +4,22 @@ package redis type createThingEvent struct { - id string - opcuaNodeIdentifier string + id string + opcuaNodeID string } type removeThingEvent struct { id string } +type connectThingEvent struct { + chanID string + thingID string +} + type createChannelEvent struct { - id string - opcuaNodeNamespace string + id string + opcuaServerURI string } type removeChannelEvent struct { diff --git a/opcua/redis/routemap.go b/opcua/redis/routemap.go index 500cd7e067..15c5e01ee1 100644 --- a/opcua/redis/routemap.go +++ b/opcua/redis/routemap.go @@ -10,11 +10,6 @@ import ( "github.com/mainflux/mainflux/opcua" ) -const ( - mfxMapPrefix = "mfx:opcua" - opcMapPrefix = "opcua:mfx" -) - var _ opcua.RouteMapRepository = (*routerMap)(nil) type routerMap struct { @@ -30,12 +25,13 @@ func NewRouteMapRepository(client *redis.Client, prefix string) opcua.RouteMapRe } } -func (mr *routerMap) Save(mfxID, opcID string) error { - tkey := fmt.Sprintf("%s:%s:%s", mr.prefix, mfxMapPrefix, mfxID) - if err := mr.client.Set(tkey, opcID, 0).Err(); err != nil { +func (mr *routerMap) Save(mfxID, opcuaID string) error { + tkey := fmt.Sprintf("%s:%s", mr.prefix, mfxID) + if err := mr.client.Set(tkey, opcuaID, 0).Err(); err != nil { return err } - lkey := fmt.Sprintf("%s:%s:%s", mr.prefix, opcMapPrefix, opcID) + + lkey := fmt.Sprintf("%s:%s", mr.prefix, opcuaID) if err := mr.client.Set(lkey, mfxID, 0).Err(); err != nil { return err } @@ -43,8 +39,8 @@ func (mr *routerMap) Save(mfxID, opcID string) error { return nil } -func (mr *routerMap) Get(mfxID string) (string, error) { - lKey := fmt.Sprintf("%s:%s:%s", mr.prefix, opcMapPrefix, mfxID) +func (mr *routerMap) Get(opcuaID string) (string, error) { + lKey := fmt.Sprintf("%s:%s", mr.prefix, opcuaID) mval, err := mr.client.Get(lKey).Result() if err != nil { return "", err @@ -54,12 +50,12 @@ func (mr *routerMap) Get(mfxID string) (string, error) { } func (mr *routerMap) Remove(mfxID string) error { - mkey := fmt.Sprintf("%s:%s:%s", mr.prefix, mfxMapPrefix, mfxID) + mkey := fmt.Sprintf("%s:%s", mr.prefix, mfxID) lval, err := mr.client.Get(mkey).Result() if err != nil { return err } - lkey := fmt.Sprintf("%s:%s:%s", mr.prefix, opcMapPrefix, lval) + lkey := fmt.Sprintf("%s:%s", mr.prefix, lval) return mr.client.Del(mkey, lkey).Err() } diff --git a/opcua/redis/streams.go b/opcua/redis/streams.go index 555dbb6bad..38b8262403 100644 --- a/opcua/redis/streams.go +++ b/opcua/redis/streams.go @@ -14,17 +14,19 @@ import ( ) const ( - keyProtocol = "opcua" - keyIdentifier = "identifier" - keyNamespace = "namespace" + keyProtocol = "opcua" + keyNodeID = "nodeID" + keyServerURI = "serverURI" group = "mainflux.opcua" stream = "mainflux.things" - thingPrefix = "thing." - thingCreate = thingPrefix + "create" - thingUpdate = thingPrefix + "update" - thingRemove = thingPrefix + "remove" + thingPrefix = "thing." + thingCreate = thingPrefix + "create" + thingUpdate = thingPrefix + "update" + thingRemove = thingPrefix + "remove" + thingConnect = thingPrefix + "connect" + thingDisconnect = thingPrefix + "disconnect" channelPrefix = "channel." channelCreate = channelPrefix + "create" @@ -39,9 +41,9 @@ var ( errMetadataFormat = errors.New("malformed metadata") - errMetadataNamespace = errors.New("Node Namespace not found in channel metadatada") + errMetadataServerURI = errors.New("ServerURI not found in channel metadatada") - errMetadataIdentifier = errors.New("Node Identifier not found in thing metadatada") + errMetadataNodeID = errors.New("NodeID not found in thing metadatada") ) var _ opcua.EventStore = (*eventStore)(nil) @@ -119,6 +121,12 @@ func (es eventStore) Subscribe(subject string) error { case channelRemove: rce := decodeRemoveChannel(event) err = es.handleRemoveChannel(rce) + case thingConnect: + rce := decodeConnectThing(event) + err = es.handleConnectThing(rce) + case thingDisconnect: + rce := decodeDisconnectThing(event) + err = es.handleDisconnectThing(rce) } if err != nil && err != errMetadataType { es.logger.Warn(fmt.Sprintf("Failed to handle event sourcing: %s", err.Error())) @@ -150,12 +158,12 @@ func decodeCreateThing(event map[string]interface{}) (createThingEvent, error) { return createThingEvent{}, errMetadataFormat } - val, ok := metadataVal[keyIdentifier].(string) + val, ok := metadataVal[keyNodeID].(string) if !ok || val == "" { - return createThingEvent{}, errMetadataIdentifier + return createThingEvent{}, errMetadataNodeID } - cte.opcuaNodeIdentifier = val + cte.opcuaNodeID = val return cte, nil } @@ -186,12 +194,12 @@ func decodeCreateChannel(event map[string]interface{}) (createChannelEvent, erro return createChannelEvent{}, errMetadataFormat } - val, ok := metadataVal[keyNamespace].(string) + val, ok := metadataVal[keyServerURI].(string) if !ok || val == "" { - return createChannelEvent{}, errMetadataNamespace + return createChannelEvent{}, errMetadataServerURI } - cce.opcuaNodeNamespace = val + cce.opcuaServerURI = val return cce, nil } @@ -201,8 +209,22 @@ func decodeRemoveChannel(event map[string]interface{}) removeChannelEvent { } } +func decodeConnectThing(event map[string]interface{}) connectThingEvent { + return connectThingEvent{ + chanID: read(event, "chan_id", ""), + thingID: read(event, "thing_id", ""), + } +} + +func decodeDisconnectThing(event map[string]interface{}) connectThingEvent { + return connectThingEvent{ + chanID: read(event, "chan_id", ""), + thingID: read(event, "thing_id", ""), + } +} + func (es eventStore) handleCreateThing(cte createThingEvent) error { - return es.svc.CreateThing(cte.id, cte.opcuaNodeIdentifier) + return es.svc.CreateThing(cte.id, cte.opcuaNodeID) } func (es eventStore) handleRemoveThing(rte removeThingEvent) error { @@ -210,13 +232,21 @@ func (es eventStore) handleRemoveThing(rte removeThingEvent) error { } func (es eventStore) handleCreateChannel(cce createChannelEvent) error { - return es.svc.CreateChannel(cce.id, cce.opcuaNodeNamespace) + return es.svc.CreateChannel(cce.id, cce.opcuaServerURI) } func (es eventStore) handleRemoveChannel(rce removeChannelEvent) error { return es.svc.RemoveChannel(rce.id) } +func (es eventStore) handleConnectThing(rte connectThingEvent) error { + return es.svc.ConnectThing(rte.chanID, rte.thingID) +} + +func (es eventStore) handleDisconnectThing(rte connectThingEvent) error { + return es.svc.DisconnectThing(rte.chanID, rte.thingID) +} + func read(event map[string]interface{}, key, def string) string { val, ok := event[key].(string) if !ok { diff --git a/opcua/service.go b/opcua/service.go index d6f95f6605..1f0559e8df 100644 --- a/opcua/service.go +++ b/opcua/service.go @@ -4,32 +4,21 @@ package opcua import ( - "context" - "errors" "fmt" - "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/errors" + "github.com/mainflux/mainflux/logger" ) -const ( - protocol = "opcua" - thingSuffix = "thing" - channelSuffix = "channel" -) +const protocol = "opcua" var ( - // ErrMalformedIdentity indicates malformed identity received (e.g. - // invalid namespace or ID). - ErrMalformedIdentity = errors.New("malformed identity received") - - // ErrMalformedMessage indicates malformed OPC-UA message. - ErrMalformedMessage = errors.New("malformed message received") - - // ErrNotFoundIdentifier indicates a non-existent route map for a Node Identifier. - ErrNotFoundIdentifier = errors.New("route map not found for this node identifier") - - // ErrNotFoundNamespace indicates a non-existent route map for an Node Namespace. - ErrNotFoundNamespace = errors.New("route map not found for this node namespace") + // ErrNotFoundServerURI indicates missing ServerURI route-map + ErrNotFoundServerURI = errors.New("route map not found for this Server URI") + // ErrNotFoundNodeID indicates missing NodeID route-map + ErrNotFoundNodeID = errors.New("route map not found for this Node ID") + // ErrNotFoundConn indicates missing connection + ErrNotFoundConn = errors.New("connection not found") ) // Service specifies an API that must be fullfiled by the domain service @@ -44,96 +33,108 @@ type Service interface { // RemoveThing removes thing mfx:opc & opc:mfx route-map RemoveThing(string) error - // CreateChannel creates channel mfx:opc & opc:mfx route-map + // CreateChannel creates channel route-map CreateChannel(string, string) error - // UpdateChannel updates mfx:opc & opc:mfx route-map + // UpdateChannel updates chroute-map UpdateChannel(string, string) error - // RemoveChannel removes channel mfx:opc & opc:mfx route-map + // RemoveChannel removes channel route-map RemoveChannel(string) error - // Publish forwards messages from the OPC-UA MQTT broker to Mainflux NATS broker - Publish(context.Context, string, Message) error + // ConnectThing creates thing and channel connection route-map + ConnectThing(string, string) error + + // DisconnectThing removes thing and channel connection route-map + DisconnectThing(string, string) error + + // Subscribe subscribes to a given OPC-UA server + Subscribe(Config) error } // Config OPC-UA Server type Config struct { - ServerURI string - NodeNamespace string - NodeIdentifier string - NodeIdentifierType string - Policy string - Mode string - CertFile string - KeyFile string + ServerURI string + NodeID string + Policy string + Mode string + CertFile string + KeyFile string } var _ Service = (*adapterService)(nil) type adapterService struct { - publisher mainflux.MessagePublisher + subscriber Subscriber thingsRM RouteMapRepository channelsRM RouteMapRepository + connectRM RouteMapRepository + cfg Config + logger logger.Logger } // New instantiates the OPC-UA adapter implementation. -func New(pub mainflux.MessagePublisher, thingsRM, channelsRM RouteMapRepository) Service { +func New(sub Subscriber, thingsRM, channelsRM, connectRM RouteMapRepository, cfg Config, log logger.Logger) Service { return &adapterService{ - publisher: pub, + subscriber: sub, thingsRM: thingsRM, channelsRM: channelsRM, + connectRM: connectRM, + cfg: cfg, + logger: log, } } -// Publish forwards messages from OPC-UA MQTT broker to Mainflux NATS broker -func (as *adapterService) Publish(ctx context.Context, token string, m Message) error { - // Get route map of OPC-UA Node Namespace - channelID, err := as.channelsRM.Get(m.Namespace) - if err != nil { - return ErrNotFoundNamespace - } - - // Get route map of OPC-UA Node Identifier - thingID, err := as.thingsRM.Get(m.ID) - if err != nil { - return ErrNotFoundIdentifier - } - - // Publish on Mainflux NATS broker - SenML := fmt.Sprintf(`[{"n":"opcua","v":%v}]`, m.Data) - payload := []byte(SenML) - msg := mainflux.Message{ - Publisher: thingID, - Protocol: protocol, - ContentType: "Content-Type", - Channel: channelID, - Payload: payload, - } - - return as.publisher.Publish(ctx, token, msg) +func (as *adapterService) CreateThing(mfxDevID, opcuaNodeID string) error { + return as.thingsRM.Save(mfxDevID, opcuaNodeID) } -func (as *adapterService) CreateThing(mfxDevID string, opcID string) error { - return as.thingsRM.Save(mfxDevID, opcID) -} - -func (as *adapterService) UpdateThing(mfxDevID string, opcID string) error { - return as.thingsRM.Save(mfxDevID, opcID) +func (as *adapterService) UpdateThing(mfxDevID, opcuaNodeID string) error { + return as.thingsRM.Save(mfxDevID, opcuaNodeID) } func (as *adapterService) RemoveThing(mfxDevID string) error { return as.thingsRM.Remove(mfxDevID) } -func (as *adapterService) CreateChannel(mfxChanID string, opcNamespace string) error { - return as.channelsRM.Save(mfxChanID, opcNamespace) +func (as *adapterService) CreateChannel(mfxChanID, opcuaServerURI string) error { + return as.channelsRM.Save(mfxChanID, opcuaServerURI) } -func (as *adapterService) UpdateChannel(mfxChanID string, opcNamespace string) error { - return as.channelsRM.Save(mfxChanID, opcNamespace) +func (as *adapterService) UpdateChannel(mfxChanID, opcuaServerURI string) error { + return as.channelsRM.Save(mfxChanID, opcuaServerURI) } func (as *adapterService) RemoveChannel(mfxChanID string) error { return as.channelsRM.Remove(mfxChanID) } + +func (as *adapterService) ConnectThing(mfxChanID, mfxThingID string) error { + serverURI, err := as.channelsRM.Get(mfxChanID) + if err != nil { + return err + } + + nodeID, err := as.thingsRM.Get(mfxThingID) + if err != nil { + return err + } + + as.cfg.NodeID = nodeID + as.cfg.ServerURI = serverURI + go as.subscriber.Subscribe(as.cfg) + + c := fmt.Sprintf("%s:%s", mfxChanID, mfxThingID) + return as.connectRM.Save(c, c) +} + +func (as *adapterService) DisconnectThing(mfxChanID, mfxThingID string) error { + c := fmt.Sprintf("%s:%s", mfxChanID, mfxThingID) + return as.connectRM.Remove(c) +} + +// Subscribe subscribes to the OPC-UA Server. +func (as *adapterService) Subscribe(cfg Config) error { + go as.subscriber.Subscribe(cfg) + return nil +}