From dee5c465b4676e0b4d4f4babae169790746d4a67 Mon Sep 17 00:00:00 2001 From: SammyOina Date: Thu, 10 Aug 2023 13:58:35 +0300 Subject: [PATCH] rewrite service Signed-off-by: SammyOina --- modbus/adapter.go | 115 ++++++++++++++++++++++------------------------ modbus/modbus.go | 85 ++++++++++++++++++++++++++++++++-- 2 files changed, 136 insertions(+), 64 deletions(-) diff --git a/modbus/adapter.go b/modbus/adapter.go index cd46b1b4179..6d182ab2d00 100644 --- a/modbus/adapter.go +++ b/modbus/adapter.go @@ -1,83 +1,78 @@ package modbus import ( - "errors" + "context" + "net/url" + "strings" - "github.com/goburrow/modbus" + mflog "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/messaging" ) -type IO string - const ( - Coil IO = "coil" - HoldingRegister IO = "h_register" - InputRegister IO = "i_register" - Register IO = "register" - Discrete IO = "discrete" - FIFO IO = "fifo" + readTopic = "modbus-read" + writeTopic = "modbus-write" ) -var errInvalidInput = errors.New("invalid input type") - type Service interface { - // Read gets data from modbus. - Read(address, quantity uint16, iotype IO) ([]byte, error) - // Write writes a value/s on Modbus. - Write(address, quantity uint16, value interface{}, iotype IO) ([]byte, error) + // Forward subscribes to the Subscriber and + // publishes messages using provided Publisher. + Read(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error + // Forward subscribes to the Subscriber and + // publishes messages using provided Publisher. + Write(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error } -var _ Service = (*adapterService)(nil) +type service struct { + logger mflog.Logger +} -// adapterService provides methods for reading and writing data on Modbus. -type adapterService struct { - Client modbus.Client +// NewForwarder returns new Forwarder implementation. +func New(logger mflog.Logger) Service { + return service{ + logger: logger, + } } -// NewModbusService creates a new instance of ModbusService. -func NewModbusService(client modbus.Client) Service { - return &adapterService{Client: client} +func (f service) Read(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error { + return sub.Subscribe(ctx, id, readTopic, handleRead(ctx, pub, f.logger)) } -// Write writes a value/s on Modbus. -func (s *adapterService) Write(address, quantity uint16, value interface{}, iotype IO) ([]byte, error) { - switch iotype { - case Coil: - switch val := value.(type) { - case uint16: - return s.Client.WriteSingleCoil(address, val) - case []byte: - return s.Client.WriteMultipleCoils(address, quantity, val) - default: - return nil, errInvalidInput - } - case Register: - switch val := value.(type) { - case uint16: - return s.Client.WriteSingleRegister(address, val) - case []byte: - return s.Client.WriteMultipleRegisters(address, quantity, val) - default: - return nil, errInvalidInput - } - default: - return nil, errInvalidInput +func handleRead(ctx context.Context, pub messaging.Publisher, logger mflog.Logger) handleFunc { + return func(msg *messaging.Message) error { + return nil } } -// Read gets data from modbus. -func (s *adapterService) Read(address uint16, quantity uint16, iotype IO) ([]byte, error) { - switch iotype { - case Coil: - return s.Client.ReadCoils(address, quantity) - case Discrete: - return s.Client.ReadDiscreteInputs(address, quantity) - case FIFO: - return s.Client.ReadFIFOQueue(address) - case HoldingRegister: - return s.Client.ReadHoldingRegisters(address, quantity) - case InputRegister: - return s.Client.ReadInputRegisters(address, quantity) +func (f service) Write(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error { + return sub.Subscribe(ctx, id, writeTopic, handleWrite(ctx, pub, f.logger)) +} + +func handleWrite(ctx context.Context, pub messaging.Publisher, logger mflog.Logger) handleFunc { + return func(msg *messaging.Message) error { + return nil + } +} + +type handleFunc func(msg *messaging.Message) error + +func (h handleFunc) Handle(msg *messaging.Message) error { + return h(msg) + +} + +func (h handleFunc) Cancel() error { + return nil +} + +func getClient(address string) (ModbusService, error) { + switch { + case strings.HasPrefix(address, "/dev/") || strings.Contains(address, "COM"): + return NewRTUClient(address, RTUHandlerOptions{}) default: - return nil, errInvalidInput + if _, err := url.Parse(address); err != nil { + return nil, err + } + return NewTCPClient(address, TCPHandlerOptions{}) } } diff --git a/modbus/modbus.go b/modbus/modbus.go index 83cf917866a..b49340dd566 100644 --- a/modbus/modbus.go +++ b/modbus/modbus.go @@ -1,6 +1,7 @@ package modbus import ( + "errors" "log" "reflect" "time" @@ -9,6 +10,33 @@ import ( "github.com/goburrow/serial" ) +type IO string + +const ( + Coil IO = "coil" + HoldingRegister IO = "h_register" + InputRegister IO = "i_register" + Register IO = "register" + Discrete IO = "discrete" + FIFO IO = "fifo" +) + +var errInvalidInput = errors.New("invalid input type") + +type ModbusService interface { + // Read gets data from modbus. + Read(address, quantity uint16, iotype IO) ([]byte, error) + // Write writes a value/s on Modbus. + Write(address, quantity uint16, value interface{}, iotype IO) ([]byte, error) +} + +var _ ModbusService = (*modbusService)(nil) + +// adapterService provides methods for reading and writing data on Modbus. +type modbusService struct { + Client modbus.Client +} + // TCPHandlerOptions defines optional handler values. type TCPHandlerOptions struct { IdleTimeout time.Duration @@ -19,7 +47,7 @@ type TCPHandlerOptions struct { // NewRTUClient initializes a new modbus.Client on TCP protocol from the address // and handler options provided. -func NewTCPClient(address string, config TCPHandlerOptions) (modbus.Client, error) { +func NewTCPClient(address string, config TCPHandlerOptions) (ModbusService, error) { handler := modbus.NewTCPClientHandler(address) if err := handler.Connect(); err != nil { return nil, err @@ -36,7 +64,10 @@ func NewTCPClient(address string, config TCPHandlerOptions) (modbus.Client, erro if !isZeroValue(config.Timeout) { handler.Timeout = config.Timeout } - return modbus.NewClient(handler), nil + + return &modbusService{ + Client: modbus.NewClient(handler), + }, nil } // RTUHandlerOptions defines optional handler values. @@ -55,7 +86,7 @@ type RTUHandlerOptions struct { // NewRTUClient initializes a new modbus.Client on RTU/ASCII protocol from the address // and handler options provided. -func NewRTUClient(address string, config RTUHandlerOptions) (modbus.Client, error) { +func NewRTUClient(address string, config RTUHandlerOptions) (ModbusService, error) { handler := modbus.NewRTUClientHandler(address) if err := handler.Connect(); err != nil { return nil, err @@ -90,7 +121,9 @@ func NewRTUClient(address string, config RTUHandlerOptions) (modbus.Client, erro if !isZeroValue(config.Timeout) { handler.Timeout = config.Timeout } - return modbus.NewClient(handler), nil + return &modbusService{ + Client: modbus.NewClient(handler), + }, nil } func isZeroValue(val interface{}) bool { @@ -112,3 +145,47 @@ func isZeroValue(val interface{}) bool { return reflect.DeepEqual(v.Interface(), reflect.Zero(v.Type()).Interface()) } } + +// Write writes a value/s on Modbus. +func (s *modbusService) Write(address, quantity uint16, value interface{}, iotype IO) ([]byte, error) { + switch iotype { + case Coil: + switch val := value.(type) { + case uint16: + return s.Client.WriteSingleCoil(address, val) + case []byte: + return s.Client.WriteMultipleCoils(address, quantity, val) + default: + return nil, errInvalidInput + } + case Register: + switch val := value.(type) { + case uint16: + return s.Client.WriteSingleRegister(address, val) + case []byte: + return s.Client.WriteMultipleRegisters(address, quantity, val) + default: + return nil, errInvalidInput + } + default: + return nil, errInvalidInput + } +} + +// Read gets data from modbus. +func (s *modbusService) Read(address uint16, quantity uint16, iotype IO) ([]byte, error) { + switch iotype { + case Coil: + return s.Client.ReadCoils(address, quantity) + case Discrete: + return s.Client.ReadDiscreteInputs(address, quantity) + case FIFO: + return s.Client.ReadFIFOQueue(address) + case HoldingRegister: + return s.Client.ReadHoldingRegisters(address, quantity) + case InputRegister: + return s.Client.ReadInputRegisters(address, quantity) + default: + return nil, errInvalidInput + } +}