diff --git a/README.md b/README.md index e065fd98..e2c6d9b6 100644 --- a/README.md +++ b/README.md @@ -79,23 +79,27 @@ TLS termination and LB tasks can be offloaded to a standard ingress proxy - for The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -|---------------------------|------------------------------------------------|-----------| -| MPROXY_HTTP_HOST | HTTP inbound (IN) connection host | 0.0.0.0 | -| MPROXY_HTTP_PORT | HTTP inbound (IN) connection port | 8080 | -| MPROXY_HTTP_TARGET_SCHEMA | HTTP Target schema | ws | -| MPROXY_HTTP_TARGET_HOST | HTTP Target host | localhost | -| MPROXY_HTTP_TARGET_PORT | HTTP Target port | 8888 | -| MPROXY_HTTP_TARGET_PATH | HTTP Target path | /mqtt | -| MPROXY_MQTT_HOST | MQTT inbound connection host | 0.0.0.0 | -| MPROXY_MQTT_PORT | MQTT inbound connection port | 1883 | -| MPROXY_MQTTS_PORT | MQTTS inbound connection port | 8883 | -| MPROXY_MQTT_TARGET_HOST | MQTT broker host | 0.0.0.0 | -| MPROXY_MQTT_TARGET_PORT | MQTT broker port | 1884 | -| MPROXY_CLIENT_TLS | Flag that indicates if TLS should be turned on | false | -| MPROXY_CA_CERTS | Path to trusted CAs in PEM format | | -| MPROXY_SERVER_CERT | Path to server certificate in pem format | | -| MPROXY_SERVER_KEY | Path to server key in pem format | | +| Variable | Description | Default | +|-------------------------|------------------------------------------------|-----------| +| MPROXY_WS_HOST | WebSocket inbound (IN) connection host | 0.0.0.0 | +| MPROXY_WS_PORT | WebSocket inbound (IN) connection port | 8080 | +| MPROXY_WS_PATH | WebSocket inbound (IN) connection path | /mqtt | +| MPROXY_WSS_PORT | WebSocket Secure inbound (IN) connection port | 8080 | +| MPROXY_WSS_PATH | WebSocket Secure inbound (IN) connection path | /mqtt | +| MPROXY_WS_TARGET_SCHEME | WebSocket Target schema | ws | +| MPROXY_WS_TARGET_HOST | WebSocket Target host | localhost | +| MPROXY_WS_TARGET_PORT | WebSocket Target port | 8888 | +| MPROXY_WS_TARGET_PATH | WebSocket Target path | /mqtt | +| MPROXY_MQTT_HOST | MQTT inbound connection host | 0.0.0.0 | +| MPROXY_MQTT_PORT | MQTT inbound connection port | 1883 | +| MPROXY_MQTTS_PORT | MQTTS inbound connection port | 8883 | +| MPROXY_MQTT_TARGET_HOST | MQTT broker host | 0.0.0.0 | +| MPROXY_MQTT_TARGET_PORT | MQTT broker port | 1884 | +| MPROXY_CLIENT_TLS | Flag that indicates if TLS should be turned on | false | +| MPROXY_CA_CERTS | Path to trusted CAs in PEM format | | +| MPROXY_SERVER_CERT | Path to server certificate in pem format | | +| MPROXY_SERVER_KEY | Path to server key in pem format | | +| MPROXY_LOG_LEVEL | Log level | debug | ## License [Apache-2.0](LICENSE) diff --git a/cmd/main.go b/cmd/main.go index 667a5962..f9eff6a8 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,6 +1,7 @@ package main import ( + "crypto/tls" "fmt" "log" "net/http" @@ -13,24 +14,31 @@ import ( "github.com/mainflux/mproxy/examples/simple" "github.com/mainflux/mproxy/pkg/mqtt" "github.com/mainflux/mproxy/pkg/session" + mptls "github.com/mainflux/mproxy/pkg/tls" "github.com/mainflux/mproxy/pkg/websocket" ) const ( // WS - defWSHost = "0.0.0.0" - defWSPort = "8080" - defWSScheme = "ws" - defWSTargetHost = "localhost" - defWSTargetPort = "8888" - defWSTargetPath = "/mqtt" - - envWSHost = "MPROXY_WS_HOST" - envWSPort = "MPROXY_WS_PORT" - envWSScheme = "MPROXY_WS_SCHEMA" - envWSTargetHost = "MPROXY_WS_TARGET_HOST" - envWSTargetPort = "MPROXY_WS_TARGET_PORT" - envWSTargetPath = "MPROXY_WS_TARGET_PATH" + defWSHost = "0.0.0.0" + defWSPath = "/mqtt" + defWSPort = "8080" + defWSSPath = "/mqtt" + defWSSPort = "8081" + defWSTargetScheme = "ws" + defWSTargetHost = "localhost" + defWSTargetPort = "8888" + defWSTargetPath = "/mqtt" + + envWSHost = "MPROXY_WS_HOST" + envWSPort = "MPROXY_WS_PORT" + envWSPath = "MPROXY_WS_PATH" + envWSSPort = "MPROXY_WSS_PORT" + envWSSPath = "MPROXY_WSS_PATH" + envWSTargetScheme = "MPROXY_WS_TARGET_SCHEME" + envWSTargetHost = "MPROXY_WS_TARGET_HOST" + envWSTargetPort = "MPROXY_WS_TARGET_PORT" + envWSTargetPath = "MPROXY_WS_TARGET_PATH" // MQTT defMQTTHost = "0.0.0.0" @@ -58,12 +66,15 @@ const ( ) type config struct { - wsHost string - wsPort string - wsScheme string - wsTargetHost string - wsTargetPort string - wsTargetPath string + wsHost string + wsPort string + wsPath string + wssPort string + wssPath string + wsTargetScheme string + wsTargetHost string + wsTargetPort string + wsTargetPath string mqttHost string mqttPort string @@ -91,9 +102,17 @@ func main() { errs := make(chan error, 3) if cfg.clientTLS { + tlsCfg, err := mptls.LoadTLSCfg(cfg.caCerts, cfg.serverCert, cfg.serverKey) + if err != nil { + errs <- err + } + + // WSS + logger.Info(fmt.Sprintf("Starting encrypted WebSocket proxy on port %s ", cfg.wssPort)) + go proxyWSS(cfg, tlsCfg, logger, h, errs) // MQTTS logger.Info(fmt.Sprintf("Starting MQTTS proxy on port %s ", cfg.mqttsPort)) - go proxyMQTTS(cfg, logger, h, errs) + go proxyMQTTS(cfg, tlsCfg, logger, h, errs) } else { // WS logger.Info(fmt.Sprintf("Starting WebSocket proxy on port %s ", cfg.wsPort)) @@ -130,12 +149,15 @@ func loadConfig() config { return config{ // WS - wsHost: env(envWSHost, defWSHost), - wsPort: env(envWSPort, defWSPort), - wsScheme: env(envWSScheme, defWSScheme), - wsTargetHost: env(envWSTargetHost, defWSTargetHost), - wsTargetPort: env(envWSTargetPort, defWSTargetPort), - wsTargetPath: env(envWSTargetPath, defWSTargetPath), + wsHost: env(envWSHost, defWSHost), + wsPort: env(envWSPort, defWSPort), + wsPath: env(envWSPath, defWSPath), + wssPort: env(envWSSPort, defWSSPort), + wssPath: env(envWSSPath, defWSSPath), + wsTargetScheme: env(envWSTargetScheme, defWSTargetScheme), + wsTargetHost: env(envWSTargetHost, defWSTargetHost), + wsTargetPort: env(envWSTargetPort, defWSTargetPort), + wsTargetPath: env(envWSTargetPath, defWSTargetPath), // MQTT mqttHost: env(envMQTTHost, defMQTTHost), @@ -155,25 +177,31 @@ func loadConfig() config { func proxyWS(cfg config, logger mflog.Logger, handler session.Handler, errs chan error) { target := fmt.Sprintf("%s:%s", cfg.wsTargetHost, cfg.wsTargetPort) - wp := websocket.New(target, cfg.wsTargetPath, cfg.wsScheme, handler, logger) - http.Handle("/mqtt", wp.Handler()) + wp := websocket.New(target, cfg.wsTargetPath, cfg.wsTargetScheme, handler, logger) + http.Handle(cfg.wsPath, wp.Handler()) - p := fmt.Sprintf(":%s", cfg.wsPort) - errs <- http.ListenAndServe(p, nil) + errs <- wp.Listen(cfg.wsPort) +} + +func proxyWSS(cfg config, tlsCfg *tls.Config, logger mflog.Logger, handler session.Handler, errs chan error) { + target := fmt.Sprintf("%s:%s", cfg.wsTargetHost, cfg.wsTargetPort) + wp := websocket.New(target, cfg.wsTargetPath, cfg.wsTargetScheme, handler, logger) + http.Handle(cfg.wssPath, wp.Handler()) + errs <- wp.ListenTLS(tlsCfg, cfg.serverCert, cfg.serverKey, cfg.wssPort) } func proxyMQTT(cfg config, logger mflog.Logger, handler session.Handler, errs chan error) { address := fmt.Sprintf("%s:%s", cfg.mqttHost, cfg.mqttPort) target := fmt.Sprintf("%s:%s", cfg.mqttTargetHost, cfg.mqttTargetPort) - mp := mqtt.New(address, target, handler, logger, cfg.caCerts, cfg.serverCert, cfg.serverKey) + mp := mqtt.New(address, target, handler, logger) errs <- mp.Listen() } -func proxyMQTTS(cfg config, logger mflog.Logger, handler session.Handler, errs chan error) { +func proxyMQTTS(cfg config, tlsCfg *tls.Config, logger mflog.Logger, handler session.Handler, errs chan error) { address := fmt.Sprintf("%s:%s", cfg.mqttHost, cfg.mqttsPort) target := fmt.Sprintf("%s:%s", cfg.mqttTargetHost, cfg.mqttTargetPort) - mp := mqtt.New(address, target, handler, logger, cfg.caCerts, cfg.serverCert, cfg.serverKey) + mp := mqtt.New(address, target, handler, logger) - errs <- mp.ListenTLS() + errs <- mp.ListenTLS(tlsCfg) } diff --git a/pkg/mqtt/mqtt.go b/pkg/mqtt/mqtt.go index ed29f8d5..52fe4e9d 100644 --- a/pkg/mqtt/mqtt.go +++ b/pkg/mqtt/mqtt.go @@ -2,20 +2,18 @@ package mqtt import ( "crypto/tls" - "crypto/x509" "fmt" "io" - "io/ioutil" "net" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mproxy/pkg/session" + mptls "github.com/mainflux/mproxy/pkg/tls" ) var ( errCreateListener = errors.New("failed creating TLS listener") - errParseRoot = errors.New("failed to parse root certificate") ) // Proxy is main MQTT proxy struct @@ -25,21 +23,15 @@ type Proxy struct { handler session.Handler logger logger.Logger dialer net.Dialer - ca string - crt string - key string } // New returns a new mqtt Proxy instance. -func New(address, target string, handler session.Handler, logger logger.Logger, ca, crt, key string) *Proxy { +func New(address, target string, handler session.Handler, logger logger.Logger) *Proxy { return &Proxy{ address: address, target: target, handler: handler, logger: logger, - ca: ca, - crt: crt, - key: key, } } @@ -65,7 +57,13 @@ func (p Proxy) handle(inbound net.Conn) { } defer p.close(outbound) - s := session.New(inbound, outbound, p.handler, p.logger) + clientCert, err := mptls.ClientCert(inbound) + if err != nil { + p.logger.Error("Failed to get client certificate: " + err.Error()) + return + } + + s := session.New(inbound, outbound, p.handler, p.logger, clientCert) if err = s.Stream(); !errors.Contains(err, io.EOF) { p.logger.Warn("Broken connection for client: " + s.Client.ID + " with error: " + err.Error()) @@ -87,38 +85,10 @@ func (p Proxy) Listen() error { return nil } -func (p Proxy) certConfig() (tls.Config, error) { - caCertPEM, err := ioutil.ReadFile(p.ca) - if err != nil { - return tls.Config{}, err - } - - roots := x509.NewCertPool() - ok := roots.AppendCertsFromPEM(caCertPEM) - if !ok { - return tls.Config{}, errParseRoot - } - - cert, err := tls.LoadX509KeyPair(p.crt, p.key) - if err != nil { - return tls.Config{}, err - } - return tls.Config{ - Certificates: []tls.Certificate{cert}, - ClientAuth: tls.RequireAndVerifyClientCert, - ClientCAs: roots, - }, nil -} - // ListenTLS - version of Listen with TLS encryption -func (p Proxy) ListenTLS() error { - config, err := p.certConfig() - - if err != nil { - return err - } +func (p Proxy) ListenTLS(tlsCfg *tls.Config) error { - l, err := tls.Listen("tcp", p.address, &config) + l, err := tls.Listen("tcp", p.address, tlsCfg) if err != nil { return errors.Wrap(errCreateListener, err) } diff --git a/pkg/session/session.go b/pkg/session/session.go index 67085b02..48c3653a 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -1,7 +1,6 @@ package session import ( - "crypto/tls" "crypto/x509" "net" @@ -16,9 +15,8 @@ const ( ) var ( - errBroker = errors.New("error between mProxy and MQTT broker") - errClient = errors.New("error between mProxy and MQTT client") - errTLSdetails = errors.New("failed to get TLS details of connection") + errBroker = errors.New("failed proxying from MQTT client to MQTT broker") + errClient = errors.New("failed proxying from MQTT broker to MQTT client") ) type direction int @@ -33,12 +31,15 @@ type Session struct { } // New creates a new Session. -func New(inbound, outbound net.Conn, handler Handler, logger logger.Logger) *Session { +func New(inbound, outbound net.Conn, handler Handler, logger logger.Logger, cert x509.Certificate) *Session { return &Session{ logger: logger, inbound: inbound, outbound: outbound, handler: handler, + Client: Client{ + Cert: cert, + }, } } @@ -91,16 +92,9 @@ func (s *Session) stream(dir direction, r, w net.Conn, errs chan error) { func (s *Session) authorize(pkt packets.ControlPacket) error { switch p := pkt.(type) { case *packets.ConnectPacket: - cert, err := clientCert(s.inbound) - if err != nil { - return err - } - s.Client = Client{ - ID: p.ClientIdentifier, - Username: p.Username, - Password: p.Password, - Cert: cert, - } + s.Client.ID = p.ClientIdentifier + s.Client.Username = p.Username + s.Client.Password = p.Password if err := s.handler.AuthConnect(&s.Client); err != nil { return err } @@ -144,20 +138,3 @@ func wrap(err error, dir direction) error { return err } } - -func clientCert(conn net.Conn) (x509.Certificate, error) { - switch connVal := conn.(type) { - case *tls.Conn: - if err := connVal.Handshake(); err != nil { - return x509.Certificate{}, err - } - state := connVal.ConnectionState() - if state.Version == 0 { - return x509.Certificate{}, errTLSdetails - } - cert := *state.PeerCertificates[0] - return cert, nil - default: - return x509.Certificate{}, nil - } -} diff --git a/pkg/tls/tls.go b/pkg/tls/tls.go new file mode 100644 index 00000000..e707e682 --- /dev/null +++ b/pkg/tls/tls.go @@ -0,0 +1,56 @@ +package tls + +import ( + "crypto/tls" + "crypto/x509" + "io/ioutil" + "net" + + "github.com/mainflux/mainflux/pkg/errors" +) + +var ( + errTLSdetails = errors.New("failed to get TLS details of connection") + errParseRoot = errors.New("failed to parse root certificate") +) + +// LoadTLSCfg return a TLS configuration that can be used in TLS servers +func LoadTLSCfg(ca, crt, key string) (*tls.Config, error) { + caCertPEM, err := ioutil.ReadFile(ca) + if err != nil { + return nil, err + } + + roots := x509.NewCertPool() + if ok := roots.AppendCertsFromPEM(caCertPEM); !ok { + return nil, errParseRoot + } + + cert, err := tls.LoadX509KeyPair(crt, key) + if err != nil { + return nil, err + } + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + ClientAuth: tls.RequireAndVerifyClientCert, + ClientCAs: roots, + }, nil +} + +// ClientCert returns client certificate +func ClientCert(conn net.Conn) (x509.Certificate, error) { + switch connVal := conn.(type) { + case *tls.Conn: + if err := connVal.Handshake(); err != nil { + return x509.Certificate{}, err + } + state := connVal.ConnectionState() + if state.Version == 0 { + return x509.Certificate{}, errTLSdetails + } + cert := *state.PeerCertificates[0] + return cert, nil + default: + return x509.Certificate{}, nil + } +} diff --git a/pkg/websocket/websocket.go b/pkg/websocket/websocket.go index fc52d924..f542ac06 100644 --- a/pkg/websocket/websocket.go +++ b/pkg/websocket/websocket.go @@ -1,6 +1,8 @@ package websocket import ( + "crypto/tls" + "fmt" "net/http" "net/url" "time" @@ -8,6 +10,7 @@ import ( "github.com/gorilla/websocket" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mproxy/pkg/session" + mptls "github.com/mainflux/mproxy/pkg/tls" ) // Proxy represents WS Proxy. @@ -74,7 +77,7 @@ func (p Proxy) pass(in *websocket.Conn) { srv, _, err := dialer.Dial(url.String(), nil) if err != nil { - p.logger.Error("Unable to connect to broker, reason: " + err.Error()) + p.logger.Error("Unable to connect to broker: " + err.Error()) return } @@ -85,8 +88,30 @@ func (p Proxy) pass(in *websocket.Conn) { defer s.Close() defer c.Close() - session := session.New(c, s, p.event, p.logger) + clientCert, err := mptls.ClientCert(in.UnderlyingConn()) + if err != nil { + p.logger.Error("Failed to get client certificate: " + err.Error()) + return + } + + session := session.New(c, s, p.event, p.logger, clientCert) err = session.Stream() errc <- err p.logger.Warn("Broken connection for client: " + session.Client.ID + " with error: " + err.Error()) } + +// Listen of the server +func (p Proxy) Listen(wsPort string) error { + port := fmt.Sprintf(":%s", wsPort) + return http.ListenAndServe(port, nil) +} + +// ListenTLS - version of Listen with TLS encryption +func (p Proxy) ListenTLS(tlsCfg *tls.Config, crt, key, wssPort string) error { + port := fmt.Sprintf(":%s", wssPort) + server := &http.Server{ + Addr: port, + TLSConfig: tlsCfg, + } + return server.ListenAndServeTLS(crt, key) +}