Skip to content

Commit

Permalink
MF-25 - Add secure WS listener (#23)
Browse files Browse the repository at this point in the history
* init add wss listener

Signed-off-by: Ivan Milošević <[email protected]>

* resolve conflicts after rebase

Signed-off-by: Ivan Milošević <[email protected]>

* add wss listener

Signed-off-by: Ivan Milošević <[email protected]>

* add tls flag

Signed-off-by: Ivan Milošević <[email protected]>

* example

Signed-off-by: Ivan Milošević <[email protected]>

* remove example

Signed-off-by: Ivan Milošević <[email protected]>

* refactor code

Signed-off-by: Ivan Milošević <[email protected]>

* remove debug print

Signed-off-by: Ivan Milošević <[email protected]>

* move extracting client certs before session init

Signed-off-by: Ivan Milošević <[email protected]>

* remove empty line

Signed-off-by: Ivan Milošević <[email protected]>

* add tls package with tls configuration and client certificate manipulation

Signed-off-by: Ivan Milošević <[email protected]>

* remove unused function

Signed-off-by: Ivan Milošević <[email protected]>

* start using ClientCert function from new tls package

Signed-off-by: Ivan Milošević <[email protected]>

* remove unused errors vars

Signed-off-by: Ivan Milošević <[email protected]>

* configurable ws & wss listen paths
change error messages

Signed-off-by: Ivan Milošević <[email protected]>

* Use configuration for ws and wss listener paths

Signed-off-by: Ivan Milošević <[email protected]>
  • Loading branch information
blokovi authored Sep 21, 2020
1 parent b38ddff commit 58cafc6
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 126 deletions.
38 changes: 21 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,27 @@ TLS termination and LB tasks can be offloaded to a standard ingress proxy - for

The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values.

| Variable | Description | Default |
|---------------------------|------------------------------------------------|-----------|
| MPROXY_HTTP_HOST | HTTP inbound (IN) connection host | 0.0.0.0 |
| MPROXY_HTTP_PORT | HTTP inbound (IN) connection port | 8080 |
| MPROXY_HTTP_TARGET_SCHEMA | HTTP Target schema | ws |
| MPROXY_HTTP_TARGET_HOST | HTTP Target host | localhost |
| MPROXY_HTTP_TARGET_PORT | HTTP Target port | 8888 |
| MPROXY_HTTP_TARGET_PATH | HTTP Target path | /mqtt |
| MPROXY_MQTT_HOST | MQTT inbound connection host | 0.0.0.0 |
| MPROXY_MQTT_PORT | MQTT inbound connection port | 1883 |
| MPROXY_MQTTS_PORT | MQTTS inbound connection port | 8883 |
| MPROXY_MQTT_TARGET_HOST | MQTT broker host | 0.0.0.0 |
| MPROXY_MQTT_TARGET_PORT | MQTT broker port | 1884 |
| MPROXY_CLIENT_TLS | Flag that indicates if TLS should be turned on | false |
| MPROXY_CA_CERTS | Path to trusted CAs in PEM format | |
| MPROXY_SERVER_CERT | Path to server certificate in pem format | |
| MPROXY_SERVER_KEY | Path to server key in pem format | |
| Variable | Description | Default |
|-------------------------|------------------------------------------------|-----------|
| MPROXY_WS_HOST | WebSocket inbound (IN) connection host | 0.0.0.0 |
| MPROXY_WS_PORT | WebSocket inbound (IN) connection port | 8080 |
| MPROXY_WS_PATH | WebSocket inbound (IN) connection path | /mqtt |
| MPROXY_WSS_PORT | WebSocket Secure inbound (IN) connection port | 8080 |
| MPROXY_WSS_PATH | WebSocket Secure inbound (IN) connection path | /mqtt |
| MPROXY_WS_TARGET_SCHEME | WebSocket Target schema | ws |
| MPROXY_WS_TARGET_HOST | WebSocket Target host | localhost |
| MPROXY_WS_TARGET_PORT | WebSocket Target port | 8888 |
| MPROXY_WS_TARGET_PATH | WebSocket Target path | /mqtt |
| MPROXY_MQTT_HOST | MQTT inbound connection host | 0.0.0.0 |
| MPROXY_MQTT_PORT | MQTT inbound connection port | 1883 |
| MPROXY_MQTTS_PORT | MQTTS inbound connection port | 8883 |
| MPROXY_MQTT_TARGET_HOST | MQTT broker host | 0.0.0.0 |
| MPROXY_MQTT_TARGET_PORT | MQTT broker port | 1884 |
| MPROXY_CLIENT_TLS | Flag that indicates if TLS should be turned on | false |
| MPROXY_CA_CERTS | Path to trusted CAs in PEM format | |
| MPROXY_SERVER_CERT | Path to server certificate in pem format | |
| MPROXY_SERVER_KEY | Path to server key in pem format | |
| MPROXY_LOG_LEVEL | Log level | debug |

## License
[Apache-2.0](LICENSE)
96 changes: 62 additions & 34 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"crypto/tls"
"fmt"
"log"
"net/http"
Expand All @@ -13,24 +14,31 @@ import (
"github.com/mainflux/mproxy/examples/simple"
"github.com/mainflux/mproxy/pkg/mqtt"
"github.com/mainflux/mproxy/pkg/session"
mptls "github.com/mainflux/mproxy/pkg/tls"
"github.com/mainflux/mproxy/pkg/websocket"
)

const (
// WS
defWSHost = "0.0.0.0"
defWSPort = "8080"
defWSScheme = "ws"
defWSTargetHost = "localhost"
defWSTargetPort = "8888"
defWSTargetPath = "/mqtt"

envWSHost = "MPROXY_WS_HOST"
envWSPort = "MPROXY_WS_PORT"
envWSScheme = "MPROXY_WS_SCHEMA"
envWSTargetHost = "MPROXY_WS_TARGET_HOST"
envWSTargetPort = "MPROXY_WS_TARGET_PORT"
envWSTargetPath = "MPROXY_WS_TARGET_PATH"
defWSHost = "0.0.0.0"
defWSPath = "/mqtt"
defWSPort = "8080"
defWSSPath = "/mqtt"
defWSSPort = "8081"
defWSTargetScheme = "ws"
defWSTargetHost = "localhost"
defWSTargetPort = "8888"
defWSTargetPath = "/mqtt"

envWSHost = "MPROXY_WS_HOST"
envWSPort = "MPROXY_WS_PORT"
envWSPath = "MPROXY_WS_PATH"
envWSSPort = "MPROXY_WSS_PORT"
envWSSPath = "MPROXY_WSS_PATH"
envWSTargetScheme = "MPROXY_WS_TARGET_SCHEME"
envWSTargetHost = "MPROXY_WS_TARGET_HOST"
envWSTargetPort = "MPROXY_WS_TARGET_PORT"
envWSTargetPath = "MPROXY_WS_TARGET_PATH"

// MQTT
defMQTTHost = "0.0.0.0"
Expand Down Expand Up @@ -58,12 +66,15 @@ const (
)

type config struct {
wsHost string
wsPort string
wsScheme string
wsTargetHost string
wsTargetPort string
wsTargetPath string
wsHost string
wsPort string
wsPath string
wssPort string
wssPath string
wsTargetScheme string
wsTargetHost string
wsTargetPort string
wsTargetPath string

mqttHost string
mqttPort string
Expand Down Expand Up @@ -91,9 +102,17 @@ func main() {
errs := make(chan error, 3)

if cfg.clientTLS {
tlsCfg, err := mptls.LoadTLSCfg(cfg.caCerts, cfg.serverCert, cfg.serverKey)
if err != nil {
errs <- err
}

// WSS
logger.Info(fmt.Sprintf("Starting encrypted WebSocket proxy on port %s ", cfg.wssPort))
go proxyWSS(cfg, tlsCfg, logger, h, errs)
// MQTTS
logger.Info(fmt.Sprintf("Starting MQTTS proxy on port %s ", cfg.mqttsPort))
go proxyMQTTS(cfg, logger, h, errs)
go proxyMQTTS(cfg, tlsCfg, logger, h, errs)
} else {
// WS
logger.Info(fmt.Sprintf("Starting WebSocket proxy on port %s ", cfg.wsPort))
Expand Down Expand Up @@ -130,12 +149,15 @@ func loadConfig() config {

return config{
// WS
wsHost: env(envWSHost, defWSHost),
wsPort: env(envWSPort, defWSPort),
wsScheme: env(envWSScheme, defWSScheme),
wsTargetHost: env(envWSTargetHost, defWSTargetHost),
wsTargetPort: env(envWSTargetPort, defWSTargetPort),
wsTargetPath: env(envWSTargetPath, defWSTargetPath),
wsHost: env(envWSHost, defWSHost),
wsPort: env(envWSPort, defWSPort),
wsPath: env(envWSPath, defWSPath),
wssPort: env(envWSSPort, defWSSPort),
wssPath: env(envWSSPath, defWSSPath),
wsTargetScheme: env(envWSTargetScheme, defWSTargetScheme),
wsTargetHost: env(envWSTargetHost, defWSTargetHost),
wsTargetPort: env(envWSTargetPort, defWSTargetPort),
wsTargetPath: env(envWSTargetPath, defWSTargetPath),

// MQTT
mqttHost: env(envMQTTHost, defMQTTHost),
Expand All @@ -155,25 +177,31 @@ func loadConfig() config {

func proxyWS(cfg config, logger mflog.Logger, handler session.Handler, errs chan error) {
target := fmt.Sprintf("%s:%s", cfg.wsTargetHost, cfg.wsTargetPort)
wp := websocket.New(target, cfg.wsTargetPath, cfg.wsScheme, handler, logger)
http.Handle("/mqtt", wp.Handler())
wp := websocket.New(target, cfg.wsTargetPath, cfg.wsTargetScheme, handler, logger)
http.Handle(cfg.wsPath, wp.Handler())

p := fmt.Sprintf(":%s", cfg.wsPort)
errs <- http.ListenAndServe(p, nil)
errs <- wp.Listen(cfg.wsPort)
}

func proxyWSS(cfg config, tlsCfg *tls.Config, logger mflog.Logger, handler session.Handler, errs chan error) {
target := fmt.Sprintf("%s:%s", cfg.wsTargetHost, cfg.wsTargetPort)
wp := websocket.New(target, cfg.wsTargetPath, cfg.wsTargetScheme, handler, logger)
http.Handle(cfg.wssPath, wp.Handler())
errs <- wp.ListenTLS(tlsCfg, cfg.serverCert, cfg.serverKey, cfg.wssPort)
}

func proxyMQTT(cfg config, logger mflog.Logger, handler session.Handler, errs chan error) {
address := fmt.Sprintf("%s:%s", cfg.mqttHost, cfg.mqttPort)
target := fmt.Sprintf("%s:%s", cfg.mqttTargetHost, cfg.mqttTargetPort)
mp := mqtt.New(address, target, handler, logger, cfg.caCerts, cfg.serverCert, cfg.serverKey)
mp := mqtt.New(address, target, handler, logger)

errs <- mp.Listen()
}

func proxyMQTTS(cfg config, logger mflog.Logger, handler session.Handler, errs chan error) {
func proxyMQTTS(cfg config, tlsCfg *tls.Config, logger mflog.Logger, handler session.Handler, errs chan error) {
address := fmt.Sprintf("%s:%s", cfg.mqttHost, cfg.mqttsPort)
target := fmt.Sprintf("%s:%s", cfg.mqttTargetHost, cfg.mqttTargetPort)
mp := mqtt.New(address, target, handler, logger, cfg.caCerts, cfg.serverCert, cfg.serverKey)
mp := mqtt.New(address, target, handler, logger)

errs <- mp.ListenTLS()
errs <- mp.ListenTLS(tlsCfg)
}
52 changes: 11 additions & 41 deletions pkg/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,18 @@ package mqtt

import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"io/ioutil"
"net"

"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mproxy/pkg/session"
mptls "github.com/mainflux/mproxy/pkg/tls"
)

var (
errCreateListener = errors.New("failed creating TLS listener")
errParseRoot = errors.New("failed to parse root certificate")
)

// Proxy is main MQTT proxy struct
Expand All @@ -25,21 +23,15 @@ type Proxy struct {
handler session.Handler
logger logger.Logger
dialer net.Dialer
ca string
crt string
key string
}

// New returns a new mqtt Proxy instance.
func New(address, target string, handler session.Handler, logger logger.Logger, ca, crt, key string) *Proxy {
func New(address, target string, handler session.Handler, logger logger.Logger) *Proxy {
return &Proxy{
address: address,
target: target,
handler: handler,
logger: logger,
ca: ca,
crt: crt,
key: key,
}
}

Expand All @@ -65,7 +57,13 @@ func (p Proxy) handle(inbound net.Conn) {
}
defer p.close(outbound)

s := session.New(inbound, outbound, p.handler, p.logger)
clientCert, err := mptls.ClientCert(inbound)
if err != nil {
p.logger.Error("Failed to get client certificate: " + err.Error())
return
}

s := session.New(inbound, outbound, p.handler, p.logger, clientCert)

if err = s.Stream(); !errors.Contains(err, io.EOF) {
p.logger.Warn("Broken connection for client: " + s.Client.ID + " with error: " + err.Error())
Expand All @@ -87,38 +85,10 @@ func (p Proxy) Listen() error {
return nil
}

func (p Proxy) certConfig() (tls.Config, error) {
caCertPEM, err := ioutil.ReadFile(p.ca)
if err != nil {
return tls.Config{}, err
}

roots := x509.NewCertPool()
ok := roots.AppendCertsFromPEM(caCertPEM)
if !ok {
return tls.Config{}, errParseRoot
}

cert, err := tls.LoadX509KeyPair(p.crt, p.key)
if err != nil {
return tls.Config{}, err
}
return tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: roots,
}, nil
}

// ListenTLS - version of Listen with TLS encryption
func (p Proxy) ListenTLS() error {
config, err := p.certConfig()

if err != nil {
return err
}
func (p Proxy) ListenTLS(tlsCfg *tls.Config) error {

l, err := tls.Listen("tcp", p.address, &config)
l, err := tls.Listen("tcp", p.address, tlsCfg)
if err != nil {
return errors.Wrap(errCreateListener, err)
}
Expand Down
41 changes: 9 additions & 32 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package session

import (
"crypto/tls"
"crypto/x509"
"net"

Expand All @@ -16,9 +15,8 @@ const (
)

var (
errBroker = errors.New("error between mProxy and MQTT broker")
errClient = errors.New("error between mProxy and MQTT client")
errTLSdetails = errors.New("failed to get TLS details of connection")
errBroker = errors.New("failed proxying from MQTT client to MQTT broker")
errClient = errors.New("failed proxying from MQTT broker to MQTT client")
)

type direction int
Expand All @@ -33,12 +31,15 @@ type Session struct {
}

// New creates a new Session.
func New(inbound, outbound net.Conn, handler Handler, logger logger.Logger) *Session {
func New(inbound, outbound net.Conn, handler Handler, logger logger.Logger, cert x509.Certificate) *Session {
return &Session{
logger: logger,
inbound: inbound,
outbound: outbound,
handler: handler,
Client: Client{
Cert: cert,
},
}
}

Expand Down Expand Up @@ -91,16 +92,9 @@ func (s *Session) stream(dir direction, r, w net.Conn, errs chan error) {
func (s *Session) authorize(pkt packets.ControlPacket) error {
switch p := pkt.(type) {
case *packets.ConnectPacket:
cert, err := clientCert(s.inbound)
if err != nil {
return err
}
s.Client = Client{
ID: p.ClientIdentifier,
Username: p.Username,
Password: p.Password,
Cert: cert,
}
s.Client.ID = p.ClientIdentifier
s.Client.Username = p.Username
s.Client.Password = p.Password
if err := s.handler.AuthConnect(&s.Client); err != nil {
return err
}
Expand Down Expand Up @@ -144,20 +138,3 @@ func wrap(err error, dir direction) error {
return err
}
}

func clientCert(conn net.Conn) (x509.Certificate, error) {
switch connVal := conn.(type) {
case *tls.Conn:
if err := connVal.Handshake(); err != nil {
return x509.Certificate{}, err
}
state := connVal.ConnectionState()
if state.Version == 0 {
return x509.Certificate{}, errTLSdetails
}
cert := *state.PeerCertificates[0]
return cert, nil
default:
return x509.Certificate{}, nil
}
}
Loading

0 comments on commit 58cafc6

Please sign in to comment.