From 7bbf8302a7312e1e0f661f722724e78e96139b4f Mon Sep 17 00:00:00 2001 From: Dmitry Shihovtsev Date: Thu, 13 Feb 2020 09:58:45 +0600 Subject: [PATCH 01/13] Do not expose NAT Proxy --- cmd/di.go | 1 - nat/traversal/nat_proxy.go | 26 +++++++++++++------------- nat/traversal/pinger.go | 6 +++--- nat/traversal/pinger_test.go | 10 +++++++--- 4 files changed, 23 insertions(+), 20 deletions(-) diff --git a/cmd/di.go b/cmd/di.go index cfdb34b5cc..00e7c740c5 100644 --- a/cmd/di.go +++ b/cmd/di.go @@ -811,7 +811,6 @@ func (di *Dependencies) bootstrapNATComponents(options node.Options) { log.Debug().Msg("Experimental NAT punching enabled, creating a pinger") di.NATPinger = traversal.NewPinger( traversal.DefaultPingConfig(), - traversal.NewNATProxy(), di.EventBus, ) } else { diff --git a/nat/traversal/nat_proxy.go b/nat/traversal/nat_proxy.go index b0dc491c48..f40a6fd092 100644 --- a/nat/traversal/nat_proxy.go +++ b/nat/traversal/nat_proxy.go @@ -28,21 +28,21 @@ import ( const bufferLen = 2048 * 1024 -// NATProxy provides traffic proxying functionality for registered services -type NATProxy struct { +// natProxy provides traffic proxying functionality for registered services +type natProxy struct { servicePorts map[string]int addrLast *net.UDPAddr socketProtect func(socket int) bool } // NewNATProxy constructs an instance of NATProxy -func NewNATProxy() *NATProxy { - return &NATProxy{ +func newNATProxy() *natProxy { + return &natProxy{ servicePorts: make(map[string]int), } } -func (np *NATProxy) consumerHandOff(consumerAddr string, remoteConn *net.UDPConn) chan struct{} { +func (np *natProxy) consumerHandOff(consumerAddr string, remoteConn *net.UDPConn) chan struct{} { time.Sleep(400 * time.Millisecond) stop := make(chan struct{}) if np.socketProtect == nil { @@ -57,7 +57,7 @@ func (np *NATProxy) consumerHandOff(consumerAddr string, remoteConn *net.UDPConn // consumerProxy launches listener on pinger port and wait for openvpn connect // Read from listener socket and write to remoteConn // Read from remoteConn and write to listener socket -func (np *NATProxy) consumerProxy(consumerAddr string, remoteConn *net.UDPConn, stop chan struct{}) { +func (np *natProxy) consumerProxy(consumerAddr string, remoteConn *net.UDPConn, stop chan struct{}) { laddr, err := net.ResolveUDPAddr("udp4", consumerAddr) if err != nil { log.Error().Err(err).Msg("Failed to get local address for consumer NATProxy") @@ -104,7 +104,7 @@ func (np *NATProxy) consumerProxy(consumerAddr string, remoteConn *net.UDPConn, } } -func (np *NATProxy) joinUDPStreams(conn *net.UDPConn, remoteConn *net.UDPConn, stop chan struct{}) { +func (np *natProxy) joinUDPStreams(conn *net.UDPConn, remoteConn *net.UDPConn, stop chan struct{}) { log.Info().Msg("Start copying stream from consumer NATProxy to remote remoteConn") buf := make([]byte, bufferLen) for { @@ -133,7 +133,7 @@ func (np *NATProxy) joinUDPStreams(conn *net.UDPConn, remoteConn *net.UDPConn, s } } -func (np *NATProxy) readWriteToAddr(conn *net.UDPConn, remoteConn *net.UDPConn, addr *net.UDPAddr, stop chan struct{}) { +func (np *natProxy) readWriteToAddr(conn *net.UDPConn, remoteConn *net.UDPConn, addr *net.UDPAddr, stop chan struct{}) { buf := make([]byte, bufferLen) for { select { @@ -163,7 +163,7 @@ func (np *NATProxy) readWriteToAddr(conn *net.UDPConn, remoteConn *net.UDPConn, } // handOff traffic incoming through NATPinger punched hole should be handed off to NATPoxy -func (np *NATProxy) handOff(key string, incomingConn *net.UDPConn) { +func (np *natProxy) handOff(key string, incomingConn *net.UDPConn) { proxyConn, err := np.getConnection(key) if err != nil { log.Error().Err(err).Msg("Failed to connect to NATProxy") @@ -189,12 +189,12 @@ func copyStreams(dstConn *net.UDPConn, srcConn *net.UDPConn) { totalBytes) } -func (np *NATProxy) registerServicePort(key string, port int) { +func (np *natProxy) registerServicePort(key string, port int) { log.Info().Msgf("Registering service %s for port %d to NATProxy", key, port) np.servicePorts[key] = port } -func (np *NATProxy) getConnection(key string) (*net.UDPConn, error) { +func (np *natProxy) getConnection(key string) (*net.UDPConn, error) { udpAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("127.0.0.1:%d", np.servicePorts[key])) if err != nil { return nil, err @@ -202,10 +202,10 @@ func (np *NATProxy) getConnection(key string) (*net.UDPConn, error) { return net.DialUDP("udp", nil, udpAddr) } -func (np *NATProxy) isAvailable(key string) bool { +func (np *natProxy) isAvailable(key string) bool { return np.servicePorts[key] > 0 } -func (np *NATProxy) setProtectSocketCallback(socketProtect func(socket int) bool) { +func (np *natProxy) setProtectSocketCallback(socketProtect func(socket int) bool) { np.socketProtect = socketProtect } diff --git a/nat/traversal/pinger.go b/nat/traversal/pinger.go index ce29ff0d47..df4402729a 100644 --- a/nat/traversal/pinger.go +++ b/nat/traversal/pinger.go @@ -76,7 +76,7 @@ type Pinger struct { stop chan struct{} stopNATProxy chan struct{} once sync.Once - natProxy *NATProxy + natProxy *natProxy eventPublisher eventbus.Publisher } @@ -86,13 +86,13 @@ type PortSupplier interface { } // NewPinger returns Pinger instance -func NewPinger(pingConfig *PingConfig, proxy *NATProxy, publisher eventbus.Publisher) NATPinger { +func NewPinger(pingConfig *PingConfig, publisher eventbus.Publisher) NATPinger { return &Pinger{ pingConfig: pingConfig, pingTarget: make(chan *Params), stop: make(chan struct{}), stopNATProxy: make(chan struct{}), - natProxy: proxy, + natProxy: newNATProxy(), eventPublisher: publisher, } } diff --git a/nat/traversal/pinger_test.go b/nat/traversal/pinger_test.go index 6e89e8a235..0cddf91307 100644 --- a/nat/traversal/pinger_test.go +++ b/nat/traversal/pinger_test.go @@ -23,7 +23,6 @@ import ( "testing" "time" - "github.com/mysteriumnetwork/node/mocks" "github.com/stretchr/testify/assert" ) @@ -119,6 +118,11 @@ func TestPinger_PingProvider_Timeout(t *testing.T) { } func newPinger(config *PingConfig) NATPinger { - proxy := NewNATProxy() - return NewPinger(config, proxy, mocks.NewEventBus()) + return NewPinger(config, &mockPublisher{}) +} + +type mockPublisher struct { +} + +func (p mockPublisher) Publish(topic string, data interface{}) { } From 440b0d88e1f8f7ffa264a2c2d04fdaf7e877ec8c Mon Sep 17 00:00:00 2001 From: Dmitry Shihovtsev Date: Mon, 17 Feb 2020 20:42:13 +0600 Subject: [PATCH 02/13] TTL based NAT hole punching with parallel pings --- nat/traversal/noop.go | 6 +- nat/traversal/pinger.go | 181 +++++++++++++--------------- services/noop/service.go | 2 +- services/openvpn/client.go | 82 +++++++++---- services/openvpn/client_config.go | 2 - services/openvpn/openvpn.go | 3 +- services/openvpn/service/manager.go | 32 +++-- 7 files changed, 163 insertions(+), 145 deletions(-) diff --git a/nat/traversal/noop.go b/nat/traversal/noop.go index 40d42759cd..d64f74f4a6 100644 --- a/nat/traversal/noop.go +++ b/nat/traversal/noop.go @@ -17,6 +17,8 @@ package traversal +import "net" + // NoopPinger does nothing type NoopPinger struct{} @@ -43,8 +45,8 @@ func (np *NoopPinger) Start() {} func (np *NoopPinger) Stop() {} // PingProvider does nothing -func (np *NoopPinger) PingProvider(ip string, port, consumerPort, proxyPort int, stop <-chan struct{}) error { - return nil +func (np *NoopPinger) PingProvider(ip string, cPorts, pPorts []int, proxyPort int) (*net.UDPConn, error) { + return nil, nil } // PingTarget does nothing diff --git a/nat/traversal/pinger.go b/nat/traversal/pinger.go index df4402729a..92ef19f439 100644 --- a/nat/traversal/pinger.go +++ b/nat/traversal/pinger.go @@ -41,7 +41,7 @@ var ( // NATProviderPinger pings provider and optionally hands off connection to consumer proxy. type NATProviderPinger interface { - PingProvider(ip string, providerPort, consumerPort, proxyPort int, stop <-chan struct{}) error + PingProvider(ip string, cPorts, pPorts []int, proxyPort int) (*net.UDPConn, error) } // NATPinger is responsible for pinging nat holes @@ -99,11 +99,10 @@ func NewPinger(pingConfig *PingConfig, publisher eventbus.Publisher) NATPinger { // Params contains session parameters needed to NAT ping remote peer type Params struct { - ProviderPort int - ConsumerPort int - ConsumerPublicIP string + ProviderPorts []int + ConsumerPorts []int + IP string ProxyPortMappingKey string - Cancel chan struct{} } // Start starts NAT pinger and waits for PingTarget to ping @@ -124,7 +123,7 @@ func (p *Pinger) Start() { } func isPunchingRequired(params *Params) bool { - return params.ConsumerPort > 0 + return true } // Stop stops pinger loop @@ -136,81 +135,47 @@ func (p *Pinger) Stop() { } // PingProvider pings provider determined by destination provided in sessionConfig -func (p *Pinger) PingProvider(ip string, providerPort, consumerPort, proxyPort int, stop <-chan struct{}) error { +func (p *Pinger) PingProvider(ip string, cPorts, pPorts []int, proxyPort int) (*net.UDPConn, error) { log.Info().Msg("NAT pinging to provider") - conn, err := p.getConnection(ip, providerPort, consumerPort) - if err != nil { - return errors.Wrap(err, "failed to get connection") - } + stop := make(chan struct{}) + defer close(stop) - // Add read deadline to prevent possible conn.Read hang when remote peer doesn't send ping ack. - conn.SetReadDeadline(time.Now().Add(p.pingConfig.Timeout * 2)) - - pingStop := make(chan struct{}) - defer close(pingStop) - go func() { - err := p.ping(conn, pingStop) - if err != nil { - log.Warn().Err(err).Msg("Error while pinging") - } - }() - - time.Sleep(p.pingConfig.Interval) - err = p.pingReceiver(conn, stop) + conn, err := p.multiPing(ip, cPorts, pPorts, 128, stop) if err != nil { - return err + log.Err(err).Msg("Failed to ping remote peer") + return nil, err } - // send one last ping request to end hole punching procedure gracefully - err = p.sendPingRequest(conn, 128) - if err != nil { - return errors.Wrap(err, "remote ping failed") - } + consumerAddr := fmt.Sprintf("127.0.0.1:%d", proxyPort) + log.Info().Msg("Handing connection to consumer NATProxy: " + consumerAddr) - if proxyPort > 0 { - consumerAddr := fmt.Sprintf("127.0.0.1:%d", proxyPort) - log.Info().Msg("Handing connection to consumer NATProxy: " + consumerAddr) - - // Set higher read deadline when NAT proxy is used. - conn.SetReadDeadline(time.Now().Add(12 * time.Hour)) - p.stopNATProxy = p.natProxy.consumerHandOff(consumerAddr, conn) - } else { - log.Info().Msg("Closing ping connection") - if err := conn.Close(); err != nil { - return errors.Wrap(err, "could not close ping conn") - } - } - return nil + p.stopNATProxy = p.natProxy.consumerHandOff(consumerAddr, conn) + + return conn, err } -func (p *Pinger) ping(conn *net.UDPConn, stop <-chan struct{}) error { +func (p *Pinger) ping(conn *net.UDPConn, ttl int, stop <-chan struct{}) error { // Windows detects that 1 TTL is too low and throws an exception during send - ttl := 0 i := 0 + err := ipv4.NewConn(conn).SetTTL(ttl) + if err != nil { + return errors.Wrap(err, "pinger setting ttl failed") + } + for { select { case <-stop: return nil case <-time.After(p.pingConfig.Interval): - log.Debug().Msg("Pinging... ") - // This is the essence of the TTL based udp punching. - // We're slowly increasing the TTL so that the packet is held. - // After a few attempts we're setting the value to 128 and assuming we're through. - // We could stop sending ping to Consumer beyond 4 hops to prevent from possible Consumer's router's - // DOS block, but we plan, that Consumer at the same time will be Provider too in near future. - ttl++ - - if ttl > 4 { - ttl = 128 - } + log.Debug().Msgf("Pinging %s from %s...", conn.RemoteAddr().String(), conn.LocalAddr().String()) - err := p.sendPingRequest(conn, ttl) + _, err := conn.Write([]byte("continuously pinging to " + conn.RemoteAddr().String())) if err != nil { p.eventPublisher.Publish(event.AppTopicTraversal, event.BuildFailureEvent(StageName, err)) - return err + return errors.Wrap(err, "pinging request failed") } i++ @@ -224,25 +189,15 @@ func (p *Pinger) ping(conn *net.UDPConn, stop <-chan struct{}) error { } } -func (p *Pinger) sendPingRequest(conn *net.UDPConn, ttl int) error { - err := ipv4.NewConn(conn).SetTTL(ttl) - if err != nil { - return errors.Wrap(err, "pinger setting ttl failed") - } - - _, err = conn.Write([]byte("continuously pinging to " + conn.RemoteAddr().String())) - return errors.Wrap(err, "pinging request failed") -} - -func (p *Pinger) getConnection(ip string, port int, pingerPort int) (*net.UDPConn, error) { - udpAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("%s:%d", ip, port)) +func (p *Pinger) getConnection(ip string, remotePort int, localPort int) (*net.UDPConn, error) { + udpAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("%s:%d", ip, remotePort)) if err != nil { return nil, err } log.Info().Msg("Remote socket: " + udpAddr.String()) - conn, err := net.DialUDP("udp", &net.UDPAddr{Port: pingerPort}, udpAddr) + conn, err := net.DialUDP("udp", &net.UDPAddr{Port: localPort}, udpAddr) if err != nil { return nil, err } @@ -280,16 +235,18 @@ func (p *Pinger) pingReceiver(conn *net.UDPConn, stop <-chan struct{}) error { case <-stop: return errNATPunchAttemptStopped default: + // Add read deadline to prevent possible conn.Read hang when remote peer doesn't send ping ack. + conn.SetReadDeadline(time.Now().Add(p.pingConfig.Timeout * 2)) n, err := conn.Read(buf) - if err != nil { + // Set higher read deadline when NAT proxy is used. + conn.SetReadDeadline(time.Now().Add(12 * time.Hour)) + if err != nil || n == 0 { log.Error().Err(err).Msgf("Failed to read remote peer: %s - attempting to continue", conn.RemoteAddr().String()) continue } - if n > 0 { - log.Info().Msgf("Remote peer data received: %s, len: %d", string(buf[:n]), n) - return nil - } + log.Info().Msgf("Remote peer data received: %s, len: %d", string(buf[:n]), n) + return nil } } } @@ -312,36 +269,66 @@ func (p *Pinger) pingTargetConsumer(pingParams *Params) { return } - log.Info().Msgf("Ping target received: IP: %v, port: %v", pingParams.ConsumerPublicIP, pingParams.ConsumerPort) - if !p.natProxy.isAvailable(pingParams.ProxyPortMappingKey) { - log.Warn().Msgf("NATProxy is not available for this transport protocol key %v", pingParams.ProxyPortMappingKey) - return - } + stop := make(chan struct{}) + defer close(stop) - conn, err := p.getConnection(pingParams.ConsumerPublicIP, pingParams.ConsumerPort, pingParams.ProviderPort) + conn, err := p.multiPing(pingParams.IP, pingParams.ProviderPorts, pingParams.ConsumerPorts, 2, stop) if err != nil { - log.Error().Err(err).Msg("Failed to get connection") + log.Err(err).Msg("Failed to ping remote peer") return } - pingStop := make(chan struct{}) - defer close(pingStop) - go func() { - err := p.ping(conn, pingStop) - if err != nil { - log.Warn().Err(err).Msg("Error while pinging") - } - }() - - err = p.pingReceiver(conn, pingParams.Cancel) + err = ipv4.NewConn(conn).SetTTL(128) if err != nil { - log.Error().Err(err).Msg("Ping receiver error") + log.Error().Err(err).Msg("Failed to set connection TTL") return } - p.eventPublisher.Publish(event.AppTopicTraversal, event.BuildSuccessfulEvent(StageName)) + conn.Write([]byte("Using this connection")) + p.eventPublisher.Publish(event.AppTopicTraversal, event.BuildSuccessfulEvent(StageName)) log.Info().Msg("Ping received, waiting for a new connection") go p.natProxy.handOff(pingParams.ProxyPortMappingKey, conn) } + +func (p *Pinger) multiPing(remoteIP string, localPorts, remotePorts []int, initialTTL int, stop <-chan struct{}) (*net.UDPConn, error) { + if len(localPorts) != len(remotePorts) { + return nil, errors.New("number of local and remote ports does not match") + } + + type res struct { + conn *net.UDPConn + err error + } + + ch := make(chan res, len(localPorts)) + + for i := range localPorts { + go func(i int) { + conn, err := p.singlePing(remoteIP, localPorts[i], remotePorts[i], initialTTL+i, stop) + ch <- res{conn, err} + }(i) + } + + // First responce wins. Other are not important. + r := <-ch + return r.conn, r.err +} + +func (p *Pinger) singlePing(remoteIP string, localPort, remotePort, ttl int, stop <-chan struct{}) (*net.UDPConn, error) { + conn, err := p.getConnection(remoteIP, remotePort, localPort) + if err != nil { + return nil, errors.Wrap(err, "failed to get connection") + } + + go func() { + err := p.ping(conn, ttl, stop) + if err != nil { + log.Warn().Err(err).Msg("Error while pinging") + } + }() + + err = p.pingReceiver(conn, stop) + return conn, errors.Wrap(err, "ping receiver error") +} diff --git a/services/noop/service.go b/services/noop/service.go index 049b8e929e..ab6e9aa92a 100644 --- a/services/noop/service.go +++ b/services/noop/service.go @@ -48,7 +48,7 @@ type Manager struct { // ProvideConfig provides the session configuration func (manager *Manager) ProvideConfig(_ string, _ json.RawMessage) (*session.ConfigParams, error) { - return &session.ConfigParams{TraversalParams: &traversal.Params{Cancel: make(chan struct{})}}, nil + return &session.ConfigParams{TraversalParams: &traversal.Params{}}, nil } // Serve starts service - does block diff --git a/services/openvpn/client.go b/services/openvpn/client.go index 5933ad1dc0..249d61d56d 100644 --- a/services/openvpn/client.go +++ b/services/openvpn/client.go @@ -19,6 +19,9 @@ package openvpn import ( "encoding/json" + "fmt" + "net" + "strconv" "sync" "time" @@ -29,6 +32,7 @@ import ( "github.com/mysteriumnetwork/go-openvpn/openvpn/middlewares/state" "github.com/mysteriumnetwork/node/core/connection" "github.com/mysteriumnetwork/node/core/ip" + "github.com/mysteriumnetwork/node/core/port" "github.com/mysteriumnetwork/node/firewall" "github.com/mysteriumnetwork/node/identity" "github.com/mysteriumnetwork/node/nat/traversal" @@ -42,7 +46,7 @@ import ( var ErrProcessNotStarted = errors.New("process not started yet") // processFactory creates a new openvpn process -type processFactory func(options connection.ConnectOptions) (openvpn.Process, *ClientConfig, error) +type processFactory func(options connection.ConnectOptions, sessionConfig *VPNConfig) (openvpn.Process, *ClientConfig, error) // NewClient creates a new openvpn connection func NewClient(openvpnBinary, configDirectory, runtimeDirectory string, @@ -63,12 +67,7 @@ func NewClient(openvpnBinary, configDirectory, runtimeDirectory string, removeAllowedIPRule: func() {}, } - procFactory := func(options connection.ConnectOptions) (openvpn.Process, *ClientConfig, error) { - sessionConfig := &VPNConfig{} - err := json.Unmarshal(options.SessionConfig, sessionConfig) - if err != nil { - return nil, nil, err - } + procFactory := func(options connection.ConnectOptions, sessionConfig *VPNConfig) (openvpn.Process, *ClientConfig, error) { // override vpnClientConfig params with proxy local IP and pinger port // do this only if connecting to natted provider @@ -108,7 +107,7 @@ type Client struct { processFactory processFactory ipResolver ip.Resolver natPinger traversal.NATProviderPinger - pingerStop chan struct{} + ports []int removeAllowedIPRule func() stopOnce sync.Once } @@ -130,7 +129,43 @@ func (c *Client) Statistics() (connection.Statistics, error) { // Start starts the connection func (c *Client) Start(options connection.ConnectOptions) error { log.Info().Msg("Starting connection") - proc, clientConfig, err := c.processFactory(options) + + sessionConfig := &VPNConfig{} + err := json.Unmarshal(options.SessionConfig, sessionConfig) + if err != nil { + return err + } + + if len(sessionConfig.Ports) == 0 || len(c.ports) == 0 { + c.ports = []int{sessionConfig.LocalPort} + sessionConfig.Ports = []int{sessionConfig.RemotePort} + } + + conn, err := c.natPinger.PingProvider( + sessionConfig.RemoteIP, + c.ports, + sessionConfig.Ports, + sessionConfig.LocalPort, + ) + if err != nil { + return err + } + + _, lPort, err := net.SplitHostPort(conn.LocalAddr().String()) + if err != nil { + return err + } + + _, rPort, err := net.SplitHostPort(conn.RemoteAddr().String()) + if err != nil { + return err + } + + sessionConfig.LocalPort, _ = strconv.Atoi(lPort) + sessionConfig.RemotePort, _ = strconv.Atoi(rPort) + + proc, clientConfig, err := c.processFactory(options, sessionConfig) + log.Info().Msg(fmt.Sprint(clientConfig.VpnConfig)) if err != nil { log.Info().Err(err).Msg("Client config factory error") return err @@ -143,19 +178,6 @@ func (c *Client) Start(options connection.ConnectOptions) error { } c.removeAllowedIPRule = removeAllowedIPRule - if clientConfig.VpnConfig.LocalPort > 0 { - err = c.natPinger.PingProvider( - clientConfig.VpnConfig.OriginalRemoteIP, - clientConfig.VpnConfig.OriginalRemotePort, - clientConfig.LocalPort, - clientConfig.LocalPort+1, - c.pingerStop, - ) - if err != nil { - removeAllowedIPRule() - return err - } - } err = c.process.Start() if err != nil { removeAllowedIPRule() @@ -178,7 +200,6 @@ func (c *Client) Stop() { c.process.Stop() } c.removeAllowedIPRule() - close(c.pingerStop) }) } @@ -205,8 +226,20 @@ func (c *Client) GetConfig() (connection.ConsumerConfig, error) { return nil, errors.Wrap(err, "failed to get consumer public IP") } + pool := port.NewPool() + + for i := 0; i < 10; i++ { + cp, err := pool.Acquire() + if err != nil { + return nil, err + } + + c.ports = append(c.ports, cp.Num()) + } + return &ConsumerConfig{ - IP: publicIP, + IP: publicIP, + Ports: c.ports, }, nil } @@ -220,6 +253,7 @@ type VPNConfig struct { RemoteIP string `json:"remote"` RemotePort int `json:"port"` LocalPort int `json:"lport"` + Ports []int `json:"ports"` RemoteProtocol string `json:"protocol"` TLSPresharedKey string `json:"TLSPresharedKey"` CACertificate string `json:"CACertificate"` diff --git a/services/openvpn/client_config.go b/services/openvpn/client_config.go index 976071b513..35925fd8f6 100644 --- a/services/openvpn/client_config.go +++ b/services/openvpn/client_config.go @@ -27,7 +27,6 @@ import ( // ClientConfig represents specific "openvpn as client" configuration type ClientConfig struct { *config.GenericConfig - LocalPort int VpnConfig *VPNConfig } @@ -42,7 +41,6 @@ func (c *ClientConfig) SetClientMode(serverIP string, serverPort, localPort int) c.SetFlag("float") // more on this: https://www.v13.gr/blog/?p=386 c.SetParam("remote-cert-ku", "84") - c.LocalPort = localPort c.SetFlag("auth-user-pass") c.SetFlag("management-query-passwords") } diff --git a/services/openvpn/openvpn.go b/services/openvpn/openvpn.go index a403fe3b48..5bd68e84cf 100644 --- a/services/openvpn/openvpn.go +++ b/services/openvpn/openvpn.go @@ -19,5 +19,6 @@ package openvpn // ConsumerConfig is used for sending some configuration from consumer to provider type ConsumerConfig struct { - IP string `json:"Ip,omitempty"` + IP string `json:"Ip,omitempty"` + Ports []int `json:"Ports",omitempty"` } diff --git a/services/openvpn/service/manager.go b/services/openvpn/service/manager.go index 3ddbcaf197..3c3dfe496c 100644 --- a/services/openvpn/service/manager.go +++ b/services/openvpn/service/manager.go @@ -244,10 +244,7 @@ func (m *Manager) ProvideConfig(_ string, sessionConfig json.RawMessage) (*sessi return nil, errors.New("Service port not initialized") } - traversalParams := &traversal.Params{ - Cancel: make(chan struct{}), - ProviderPort: m.vpnServerPort, - } + traversalParams := &traversal.Params{} publicIP, err := m.ipResolver.GetPublicIP() if err != nil { @@ -265,6 +262,7 @@ func (m *Manager) ProvideConfig(_ string, sessionConfig json.RawMessage) (*sessi if m.dnsOK { vpnConfig.DNSIPs = m.dnsIP.String() } + vpnConfig.Ports = []int{} // TODO This line will not be required once we will have unique VPN config for every connection. // Older clients do not send any sessionConfig, but we should keep back compatibility and not fail in this case. if sessionConfig != nil && len(sessionConfig) > 0 && m.natPinger.Valid() { @@ -274,27 +272,25 @@ func (m *Manager) ProvideConfig(_ string, sessionConfig json.RawMessage) (*sessi return nil, errors.Wrap(err, "could not parse consumer config") } - if m.behindNAT(publicIP) && m.portMappingFailed() { - pp, err := m.natPingerPorts.Acquire() - if err != nil { - return nil, err - } + if m.location.BehindNAT() && m.portMappingFailed() { + for range consumerConfig.Ports { + pp, err := m.natPingerPorts.Acquire() + if err != nil { + return nil, err + } - cp, err := m.natPingerPorts.Acquire() - if err != nil { - return nil, err + vpnConfig.Ports = append(vpnConfig.Ports, pp.Num()) } - traversalParams.ProviderPort = pp.Num() - traversalParams.ConsumerPort = cp.Num() // For OpenVPN only one running NAT proxy required. - traversalParams.ProxyPortMappingKey = openvpn_service.ServiceType - vpnConfig.LocalPort = traversalParams.ConsumerPort - vpnConfig.RemotePort = traversalParams.ProviderPort if consumerConfig.IP == "" { return nil, errors.New("remote party does not support NAT Hole punching, public IP is missing") } - traversalParams.ConsumerPublicIP = consumerConfig.IP + + traversalParams.IP = consumerConfig.IP + traversalParams.ProviderPorts = vpnConfig.Ports + traversalParams.ConsumerPorts = consumerConfig.Ports + traversalParams.ProxyPortMappingKey = openvpn_service.ServiceType } } From 95b606ca73e833c09d77de07816cfbba1b08c8b9 Mon Sep 17 00:00:00 2001 From: Dmitry Shihovtsev Date: Mon, 17 Feb 2020 20:43:57 +0600 Subject: [PATCH 03/13] Backward NAT hole punching compatibility for older clients --- services/openvpn/client.go | 48 +++++++++++++++-------------- services/openvpn/service/manager.go | 11 +++++++ session/manager.go | 1 - 3 files changed, 36 insertions(+), 24 deletions(-) diff --git a/services/openvpn/client.go b/services/openvpn/client.go index 249d61d56d..fba03a7659 100644 --- a/services/openvpn/client.go +++ b/services/openvpn/client.go @@ -136,33 +136,35 @@ func (c *Client) Start(options connection.ConnectOptions) error { return err } - if len(sessionConfig.Ports) == 0 || len(c.ports) == 0 { - c.ports = []int{sessionConfig.LocalPort} - sessionConfig.Ports = []int{sessionConfig.RemotePort} - } + if sessionConfig.LocalPort == 0 && len(sessionConfig.Ports) > 0 { + if len(sessionConfig.Ports) == 0 || len(c.ports) == 0 { + c.ports = []int{sessionConfig.LocalPort} + sessionConfig.Ports = []int{sessionConfig.RemotePort} + } - conn, err := c.natPinger.PingProvider( - sessionConfig.RemoteIP, - c.ports, - sessionConfig.Ports, - sessionConfig.LocalPort, - ) - if err != nil { - return err - } + conn, err := c.natPinger.PingProvider( + sessionConfig.RemoteIP, + c.ports, + sessionConfig.Ports, + sessionConfig.LocalPort, + ) + if err != nil { + return err + } - _, lPort, err := net.SplitHostPort(conn.LocalAddr().String()) - if err != nil { - return err - } + _, lPort, err := net.SplitHostPort(conn.LocalAddr().String()) + if err != nil { + return err + } - _, rPort, err := net.SplitHostPort(conn.RemoteAddr().String()) - if err != nil { - return err - } + _, rPort, err := net.SplitHostPort(conn.RemoteAddr().String()) + if err != nil { + return err + } - sessionConfig.LocalPort, _ = strconv.Atoi(lPort) - sessionConfig.RemotePort, _ = strconv.Atoi(rPort) + sessionConfig.LocalPort, _ = strconv.Atoi(lPort) + sessionConfig.RemotePort, _ = strconv.Atoi(rPort) + } proc, clientConfig, err := c.processFactory(options, sessionConfig) log.Info().Msg(fmt.Sprint(clientConfig.VpnConfig)) diff --git a/services/openvpn/service/manager.go b/services/openvpn/service/manager.go index 3c3dfe496c..a9d7b35655 100644 --- a/services/openvpn/service/manager.go +++ b/services/openvpn/service/manager.go @@ -272,6 +272,16 @@ func (m *Manager) ProvideConfig(_ string, sessionConfig json.RawMessage) (*sessi return nil, errors.Wrap(err, "could not parse consumer config") } + if len(consumerConfig.Ports) == 0 { + cp, err := m.natPingerPorts.Acquire() + if err != nil { + return nil, err + } + + consumerConfig.Ports = []int{cp.Num(), cp.Num(), cp.Num(), cp.Num()} + vpnConfig.LocalPort = cp.Num() + } + if m.location.BehindNAT() && m.portMappingFailed() { for range consumerConfig.Ports { pp, err := m.natPingerPorts.Acquire() @@ -280,6 +290,7 @@ func (m *Manager) ProvideConfig(_ string, sessionConfig json.RawMessage) (*sessi } vpnConfig.Ports = append(vpnConfig.Ports, pp.Num()) + vpnConfig.RemotePort = pp.Num() } // For OpenVPN only one running NAT proxy required. diff --git a/session/manager.go b/session/manager.go index 1dc21f8701..c3996d21ce 100644 --- a/session/manager.go +++ b/session/manager.go @@ -146,7 +146,6 @@ func (manager *Manager) Start(session *Session, consumerID identity.Identity, co // stop the balance tracker once the session is finished go func() { <-session.done - close(pingerParams.Cancel) engine.Stop() }() From e656d1519737dd21578da4905f159f0580ae3ecc Mon Sep 17 00:00:00 2001 From: Dmitry Shihovtsev Date: Tue, 18 Feb 2020 09:51:52 +0600 Subject: [PATCH 04/13] Wireguard TTL based NAT hole punching --- nat/traversal/pinger_test.go | 2 +- services/wireguard/connection/connection.go | 49 ++++++++++++++++----- services/wireguard/service/service_unix.go | 35 ++++++--------- services/wireguard/wireguard.go | 12 +++-- 4 files changed, 62 insertions(+), 36 deletions(-) diff --git a/nat/traversal/pinger_test.go b/nat/traversal/pinger_test.go index 0cddf91307..a493e390c3 100644 --- a/nat/traversal/pinger_test.go +++ b/nat/traversal/pinger_test.go @@ -40,7 +40,7 @@ func TestPinger_Start_Stop(t *testing.T) { pinger.Stop() } -func TestPinger_Provider_Consumser_Ping_Flow(t *testing.T) { +func TestPinger_Provider_Consumer_Ping_Flow(t *testing.T) { providerProxyPort := 51199 providerPort := 51200 consumerPort := 51201 diff --git a/services/wireguard/connection/connection.go b/services/wireguard/connection/connection.go index 5fa58da231..5ac1c28b32 100644 --- a/services/wireguard/connection/connection.go +++ b/services/wireguard/connection/connection.go @@ -20,11 +20,13 @@ package connection import ( "encoding/json" "net" + "strconv" "sync" "time" "github.com/mysteriumnetwork/node/core/connection" "github.com/mysteriumnetwork/node/core/ip" + "github.com/mysteriumnetwork/node/core/port" "github.com/mysteriumnetwork/node/firewall" "github.com/mysteriumnetwork/node/nat/traversal" wg "github.com/mysteriumnetwork/node/services/wireguard" @@ -48,7 +50,6 @@ func NewConnection(opts Options, ipResolver ip.Resolver, natPinger traversal.NAT return &Connection{ done: make(chan struct{}), - pingerStop: make(chan struct{}), stateCh: make(chan connection.State, 100), privateKey: privateKey, opts: opts, @@ -62,11 +63,11 @@ func NewConnection(opts Options, ipResolver ip.Resolver, natPinger traversal.NAT // Connection which does wireguard tunneling. type Connection struct { - stopOnce sync.Once - done chan struct{} - pingerStop chan struct{} - stateCh chan connection.State + stopOnce sync.Once + done chan struct{} + stateCh chan connection.State + ports []int privateKey string ipResolver ip.Resolver connectionEndpoint wg.ConnectionEndpoint @@ -119,17 +120,31 @@ func (c *Connection) Start(options connection.ConnectOptions) (err error) { c.stateCh <- connection.Connecting - if config.LocalPort > 0 { - err = c.natPinger.PingProvider( + if config.LocalPort > 0 || len(config.Ports) > 0 { + conn, err := c.natPinger.PingProvider( config.Provider.Endpoint.IP.String(), - config.RemotePort, - config.LocalPort, + c.ports, + config.Ports, 0, - c.pingerStop, ) if err != nil { return errors.Wrap(err, "could not ping provider") } + + _, lPort, err := net.SplitHostPort(conn.LocalAddr().String()) + if err != nil { + return err + } + + _, rPort, err := net.SplitHostPort(conn.RemoteAddr().String()) + if err != nil { + return err + } + + config.LocalPort, _ = strconv.Atoi(lPort) + config.Provider.Endpoint.Port, _ = strconv.Atoi(rPort) + + conn.Close() } log.Info().Msg("Starting new connection") @@ -217,9 +232,22 @@ func (c *Connection) GetConfig() (connection.ConsumerConfig, error) { return nil, errors.Wrap(err, "failed to get consumer public IP") } } + + pool := port.NewPool() + + for i := 0; i < 10; i++ { + cp, err := pool.Acquire() + if err != nil { + return nil, err + } + + c.ports = append(c.ports, cp.Num()) + } + return wg.ConsumerConfig{ PublicKey: publicKey, IP: publicIP, + Ports: c.ports, }, nil } @@ -249,7 +277,6 @@ func (c *Connection) Stop() { c.stateCh <- connection.NotConnected - close(c.pingerStop) close(c.stateCh) close(c.done) }) diff --git a/services/wireguard/service/service_unix.go b/services/wireguard/service/service_unix.go index 645503238a..f1057d0e9b 100644 --- a/services/wireguard/service/service_unix.go +++ b/services/wireguard/service/service_unix.go @@ -179,7 +179,7 @@ func (m *Manager) ProvideConfig(sessionID string, sessionConfig json.RawMessage) config.Consumer.ConnectDelay = m.connectDelayMS } - if err := m.addConsumerPeer(conn, traversalParams.ConsumerPort, traversalParams.ProviderPort, consumerConfig.PublicKey); err != nil { + if err := m.addConsumerPeer(conn, 0, 0, consumerConfig.PublicKey); err != nil { return nil, errors.Wrap(err, "could not add consumer peer") } @@ -294,8 +294,7 @@ func (m *Manager) addConsumerPeer(conn wg.ConnectionEndpoint, consumerPort, prov } func (m *Manager) addTraversalParams(config wg.ServiceConfig, traversalParams traversal.Params) (wg.ServiceConfig, error) { - config.LocalPort = traversalParams.ConsumerPort - config.RemotePort = traversalParams.ProviderPort + config.Ports = traversalParams.ProviderPorts // Provide new provider endpoint which points to providers NAT Proxy. newProviderEndpoint, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("%s:%d", config.Provider.Endpoint.IP, config.RemotePort)) @@ -309,33 +308,27 @@ func (m *Manager) addTraversalParams(config wg.ServiceConfig, traversalParams tr return config, nil } -func (m *Manager) newTraversalParams(natPingerEnabled bool, consumserConfig wg.ConsumerConfig) (traversal.Params, error) { - params := traversal.Params{ - Cancel: make(chan struct{}), - } - +func (m *Manager) newTraversalParams(natPingerEnabled bool, consumerConfig wg.ConsumerConfig) (params traversal.Params, err error) { if !natPingerEnabled { return params, nil } - pp, err := m.natPingerPorts.Acquire() - if err != nil { - return params, errors.Wrap(err, "could not acquire NAT pinger provider port") - } + for range consumerConfig.Ports { + pp, err := m.natPingerPorts.Acquire() + if err != nil { + return params, errors.Wrap(err, "could not acquire NAT pinger provider port") + } - cp, err := m.natPingerPorts.Acquire() - if err != nil { - return params, errors.Wrap(err, "could not acquire NAT pinger consumer port") + params.ProviderPorts = append(params.ProviderPorts, pp.Num()) } - params.ProviderPort = pp.Num() - params.ConsumerPort = cp.Num() - params.ProxyPortMappingKey = fmt.Sprintf("%s_%d", wg.ServiceType, params.ProviderPort) - - if consumserConfig.IP == "" { + if consumerConfig.IP == "" { return params, errors.New("remote party does not support NAT Hole punching, public IP is missing") } - params.ConsumerPublicIP = consumserConfig.IP + + params.IP = consumerConfig.IP + params.ConsumerPorts = consumerConfig.Ports + params.ProxyPortMappingKey = fmt.Sprintf("%s_%s", wg.ServiceType, consumerConfig.PublicKey) return params, nil } diff --git a/services/wireguard/wireguard.go b/services/wireguard/wireguard.go index 00dc0b2518..fe4d9157f0 100644 --- a/services/wireguard/wireguard.go +++ b/services/wireguard/wireguard.go @@ -81,14 +81,16 @@ type ProviderModeConfig struct { type ConsumerConfig struct { PublicKey string `json:"PublicKey"` // IP is needed when provider is behind NAT. In such case provider parses this IP and tries to ping consumer. - IP string `json:"IP,omitempty"` + IP string `json:"IP,omitempty"` + Ports []int `json:"Ports"` } // ServiceConfig represent a Wireguard service provider configuration that will be passed to the consumer for establishing a connection. type ServiceConfig struct { // LocalPort and RemotePort are needed for NAT hole punching only. - LocalPort int `json:"-"` - RemotePort int `json:"-"` + LocalPort int `json:"-"` + RemotePort int `json:"-"` + Ports []int `json:"Ports"` Provider struct { PublicKey string @@ -116,9 +118,11 @@ func (s ServiceConfig) MarshalJSON() ([]byte, error) { return json.Marshal(&struct { LocalPort int `json:"local_port"` RemotePort int `json:"remote_port"` + Ports []int `json:"ports"` Provider provider `json:"provider"` Consumer consumer `json:"consumer"` }{ + Ports: s.Ports, LocalPort: s.LocalPort, RemotePort: s.RemotePort, Provider: provider{ @@ -147,6 +151,7 @@ func (s *ServiceConfig) UnmarshalJSON(data []byte) error { var config struct { LocalPort int `json:"local_port"` RemotePort int `json:"remote_port"` + Ports []int `json:"ports"` Provider provider `json:"provider"` Consumer consumer `json:"consumer"` } @@ -164,6 +169,7 @@ func (s *ServiceConfig) UnmarshalJSON(data []byte) error { return err } + s.Ports = config.Ports s.LocalPort = config.LocalPort s.RemotePort = config.RemotePort s.Provider.Endpoint = *endpoint From 934d4afaeb37930d6a8f79547e07ec79a9980308 Mon Sep 17 00:00:00 2001 From: Dmitry Shihovtsev Date: Tue, 18 Feb 2020 10:46:44 +0600 Subject: [PATCH 05/13] Wireguard backward compatibility for TTL based NAT hole punching --- services/wireguard/service/service_unix.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/services/wireguard/service/service_unix.go b/services/wireguard/service/service_unix.go index f1057d0e9b..3578652924 100644 --- a/services/wireguard/service/service_unix.go +++ b/services/wireguard/service/service_unix.go @@ -179,7 +179,7 @@ func (m *Manager) ProvideConfig(sessionID string, sessionConfig json.RawMessage) config.Consumer.ConnectDelay = m.connectDelayMS } - if err := m.addConsumerPeer(conn, 0, 0, consumerConfig.PublicKey); err != nil { + if err := m.addConsumerPeer(conn, config.LocalPort, config.RemotePort, consumerConfig.PublicKey); err != nil { return nil, errors.Wrap(err, "could not add consumer peer") } @@ -305,6 +305,11 @@ func (m *Manager) addTraversalParams(config wg.ServiceConfig, traversalParams tr // There is no need to add any connect delay when port mapping failed. config.Consumer.ConnectDelay = 0 + // TODO this backward compatibility block needs to be removed once we will start using port ranges for all peers. + config.LocalPort = traversalParams.ConsumerPorts[len(traversalParams.ConsumerPorts)-1] + config.RemotePort = traversalParams.ProviderPorts[len(traversalParams.ProviderPorts)-1] + config.Provider.Endpoint.Port = config.RemotePort + return config, nil } @@ -313,6 +318,16 @@ func (m *Manager) newTraversalParams(natPingerEnabled bool, consumerConfig wg.Co return params, nil } + if len(consumerConfig.Ports) == 0 { + cp, err := m.natPingerPorts.Acquire() + if err != nil { + return params, err + } + + // TODO this backward compatibility block needs to be removed once we will start using port ranges for all peers. + consumerConfig.Ports = []int{cp.Num(), cp.Num(), cp.Num(), cp.Num()} + } + for range consumerConfig.Ports { pp, err := m.natPingerPorts.Acquire() if err != nil { From 20c022d7de312f9de2b8630cf49d50e137e4777a Mon Sep 17 00:00:00 2001 From: Dmitry Shihovtsev Date: Tue, 18 Feb 2020 11:17:46 +0600 Subject: [PATCH 06/13] Fix unit tests for TTL based NAT hole punching --- nat/traversal/pinger_test.go | 12 +++++------ services/openvpn/client_test.go | 7 +++--- services/wireguard/connection/connection.go | 24 +++++++++++---------- services/wireguard/wireguard_test.go | 8 +++---- 4 files changed, 27 insertions(+), 24 deletions(-) diff --git a/nat/traversal/pinger_test.go b/nat/traversal/pinger_test.go index a493e390c3..f24415dde9 100644 --- a/nat/traversal/pinger_test.go +++ b/nat/traversal/pinger_test.go @@ -72,11 +72,10 @@ func TestPinger_Provider_Consumer_Ping_Flow(t *testing.T) { go func() { pinger.BindServicePort("wg1", providerProxyPort) p := &Params{ - ProviderPort: providerPort, - ConsumerPort: consumerPort, - ConsumerPublicIP: "127.0.0.1", + ProviderPorts: []int{providerPort}, + ConsumerPorts: []int{consumerPort}, + IP: "127.0.0.1", ProxyPortMappingKey: "wg1", - Cancel: make(chan struct{}), } pinger.PingTarget(p) }() @@ -87,7 +86,7 @@ func TestPinger_Provider_Consumer_Ping_Flow(t *testing.T) { // Start pinging provider. stop := make(chan struct{}) defer close(stop) - err := pinger.PingProvider("127.0.0.1", providerPort, consumerPort, consumerPort+1, stop) + _, err := pinger.PingProvider("127.0.0.1", []int{consumerPort}, []int{providerPort}, consumerPort+1) assert.NoError(t, err) assert.Contains(t, string(proxyBuf), fmt.Sprintf("continuously pinging to 127.0.0.1:%d", providerPort)) @@ -113,7 +112,8 @@ func TestPinger_PingProvider_Timeout(t *testing.T) { stop := make(chan struct{}) defer close(stop) - err := pinger.PingProvider("127.0.0.1", providerPort, consumerPort, 0, stop) + _, err := pinger.PingProvider("127.0.0.1", []int{providerPort}, []int{consumerPort}, 0) + assert.Error(t, errNATPunchAttemptTimedOut, err) } diff --git a/services/openvpn/client_test.go b/services/openvpn/client_test.go index 07392ff0f3..dea36218bc 100644 --- a/services/openvpn/client_test.go +++ b/services/openvpn/client_test.go @@ -18,6 +18,7 @@ package openvpn import ( + "net" "testing" "github.com/mysteriumnetwork/node/core/connection" @@ -44,10 +45,10 @@ func TestConnection_CreatesConnection(t *testing.T) { assert.NotNil(t, conn) } -// MockNATPinger returns a mock nat pinger, that really doesnt do much +// MockNATPinger returns a mock nat pinger, that really doesn't do much type MockNATPinger struct{} // PingProvider does nothing -func (mnp *MockNATPinger) PingProvider(_ string, port, consumerPort, proxyPort int, _ <-chan struct{}) error { - return nil +func (mnp *MockNATPinger) PingProvider(_ string, _, _ []int, _ int) (*net.UDPConn, error) { + return nil, nil } diff --git a/services/wireguard/connection/connection.go b/services/wireguard/connection/connection.go index 5ac1c28b32..aa02174a2c 100644 --- a/services/wireguard/connection/connection.go +++ b/services/wireguard/connection/connection.go @@ -131,20 +131,22 @@ func (c *Connection) Start(options connection.ConnectOptions) (err error) { return errors.Wrap(err, "could not ping provider") } - _, lPort, err := net.SplitHostPort(conn.LocalAddr().String()) - if err != nil { - return err - } + if conn != nil { + _, lPort, err := net.SplitHostPort(conn.LocalAddr().String()) + if err != nil { + return err + } - _, rPort, err := net.SplitHostPort(conn.RemoteAddr().String()) - if err != nil { - return err - } + _, rPort, err := net.SplitHostPort(conn.RemoteAddr().String()) + if err != nil { + return err + } - config.LocalPort, _ = strconv.Atoi(lPort) - config.Provider.Endpoint.Port, _ = strconv.Atoi(rPort) + config.LocalPort, _ = strconv.Atoi(lPort) + config.Provider.Endpoint.Port, _ = strconv.Atoi(rPort) - conn.Close() + conn.Close() + } } log.Info().Msg("Starting new connection") diff --git a/services/wireguard/wireguard_test.go b/services/wireguard/wireguard_test.go index 026e876b4d..bf70c3093e 100644 --- a/services/wireguard/wireguard_test.go +++ b/services/wireguard/wireguard_test.go @@ -42,7 +42,7 @@ func Test_PaymentMethod_Serialize(t *testing.T) { }, `{ "bytes":0, - "duration":0, + "duration":0, "type":"", "price": { "amount": 50000000, @@ -54,7 +54,7 @@ func Test_PaymentMethod_Serialize(t *testing.T) { pingpong.PaymentMethod{}, `{ "bytes":0, - "duration":0, + "duration":0, "type":"", "price": {} }`, @@ -80,7 +80,7 @@ func Test_PaymentMethod_Unserialize(t *testing.T) { { `{ "bytes":1, - "duration":2, + "duration":2, "type":"test", "price": { "amount": 50000000, @@ -272,7 +272,7 @@ func TestServiceConfig_MarshalJSON(t *testing.T) { configBytes, err := json.Marshal(config) assert.NoError(t, err) assert.Equal(t, - `{"local_port":51000,"remote_port":51001,"provider":{"public_key":"wg1","endpoint":"127.0.0.1:51001"},"consumer":{"ip_address":"127.0.0.1/25","dns_ips":"128.0.0.1","connect_delay":3000}}`, + `{"local_port":51000,"remote_port":51001,"ports":null,"provider":{"public_key":"wg1","endpoint":"127.0.0.1:51001"},"consumer":{"ip_address":"127.0.0.1/25","dns_ips":"128.0.0.1","connect_delay":3000}}`, string(configBytes), ) } From 712df226f1ecd852b26ce562937dc1f7744fabf7 Mon Sep 17 00:00:00 2001 From: Dmitry Shihovtsev Date: Tue, 18 Feb 2020 11:53:52 +0600 Subject: [PATCH 07/13] TTL based NAT hole punching for mobile --- mobile/mysterium/openvpn_connection_setup.go | 61 ++++++++++++------- .../mysterium/wireguard_connection_setup.go | 31 +++++++--- services/openvpn/client.go | 2 - services/openvpn/openvpn.go | 2 +- 4 files changed, 62 insertions(+), 34 deletions(-) diff --git a/mobile/mysterium/openvpn_connection_setup.go b/mobile/mysterium/openvpn_connection_setup.go index f29055ab3d..ae0c87d5e7 100644 --- a/mobile/mysterium/openvpn_connection_setup.go +++ b/mobile/mysterium/openvpn_connection_setup.go @@ -19,6 +19,8 @@ package mysterium import ( "encoding/json" + "net" + "strconv" "sync" "time" @@ -38,7 +40,7 @@ type natPinger interface { SetProtectSocketCallback(SocketProtect func(socket int) bool) } -type openvpn3SessionFactory func(connection.ConnectOptions) (*openvpn3.Session, *openvpn.ClientConfig, error) +type openvpn3SessionFactory func(connection.ConnectOptions, *openvpn.VPNConfig) (*openvpn3.Session, *openvpn.ClientConfig, error) var errSessionWrapperNotStarted = errors.New("session wrapper not started") @@ -50,21 +52,12 @@ func NewOpenVPNConnection(sessionTracker *sessionTracker, signerFactory identity ipResolver: ipResolver, pingerStop: make(chan struct{}), } - sessionFactory := func(options connection.ConnectOptions) (*openvpn3.Session, *openvpn.ClientConfig, error) { - sessionConfig := &openvpn.VPNConfig{} - err := json.Unmarshal(options.SessionConfig, sessionConfig) - if err != nil { - return nil, nil, err - } - + sessionFactory := func(options connection.ConnectOptions, sessionConfig *openvpn.VPNConfig) (*openvpn3.Session, *openvpn.ClientConfig, error) { // override vpnClientConfig params with proxy local IP and pinger port // do this only if connecting to natted provider if sessionConfig.LocalPort > 0 { sessionConfig.OriginalRemoteIP = sessionConfig.RemoteIP sessionConfig.OriginalRemotePort = sessionConfig.RemotePort - sessionConfig.RemoteIP = "127.0.0.1" - // TODO: randomize this too? - sessionConfig.RemotePort = sessionConfig.LocalPort + 1 } vpnClientConfig, err := openvpn.NewClientConfigFromSession(sessionConfig, "", "", connection.DNSOptionAuto) @@ -106,7 +99,7 @@ func NewOpenVPNConnection(sessionTracker *sessionTracker, signerFactory identity } type openvpnConnection struct { - pingerStop chan struct{} + ports []int stateCh chan connection.State stats connection.Statistics statsMu sync.RWMutex @@ -159,24 +152,48 @@ func (c *openvpnConnection) Statistics() (connection.Statistics, error) { } func (c *openvpnConnection) Start(options connection.ConnectOptions) error { - newSession, clientConfig, err := c.createSession(options) + sessionConfig := &openvpn.VPNConfig{} + err := json.Unmarshal(options.SessionConfig, sessionConfig) if err != nil { return err } - log.Info().Msgf("Client config after session create: %v", clientConfig) - if clientConfig.LocalPort > 0 { - err := c.natPinger.PingProvider( - clientConfig.VpnConfig.OriginalRemoteIP, - clientConfig.VpnConfig.OriginalRemotePort, - clientConfig.LocalPort, - clientConfig.LocalPort+1, - c.pingerStop, + if sessionConfig.LocalPort == 0 && len(sessionConfig.Ports) > 0 { + if len(sessionConfig.Ports) == 0 || len(c.ports) == 0 { + c.ports = []int{sessionConfig.LocalPort} + sessionConfig.Ports = []int{sessionConfig.RemotePort} + } + + conn, err := c.natPinger.PingProvider( + sessionConfig.RemoteIP, + c.ports, + sessionConfig.Ports, + sessionConfig.LocalPort, ) if err != nil { return err } + + _, lPort, err := net.SplitHostPort(conn.LocalAddr().String()) + if err != nil { + return err + } + + _, rPort, err := net.SplitHostPort(conn.RemoteAddr().String()) + if err != nil { + return err + } + + sessionConfig.LocalPort, _ = strconv.Atoi(lPort) + sessionConfig.RemotePort, _ = strconv.Atoi(rPort) + } + + newSession, clientConfig, err := c.createSession(options, sessionConfig) + if err != nil { + log.Info().Err(err).Msg("Client config factory error") + return err } + log.Info().Interface("data", clientConfig).Msgf("Openvpn client configuration") c.session = newSession c.session.Start() @@ -188,8 +205,6 @@ func (c *openvpnConnection) Stop() { if c.session != nil { c.session.Stop() } - log.Info().Msg("Stopping NATProxy") - close(c.pingerStop) }) } diff --git a/mobile/mysterium/wireguard_connection_setup.go b/mobile/mysterium/wireguard_connection_setup.go index 732de4ceb9..f31bedf3c4 100644 --- a/mobile/mysterium/wireguard_connection_setup.go +++ b/mobile/mysterium/wireguard_connection_setup.go @@ -20,6 +20,8 @@ package mysterium import ( "bufio" "encoding/json" + "net" + "strconv" "strings" "sync" "time" @@ -68,7 +70,6 @@ func NewWireGuardConnection(opts wireGuardOptions, device wireguardDevice, ipRes return &wireguardConnection{ done: make(chan struct{}), - pingerStop: make(chan struct{}), stateCh: make(chan connection.State, 100), opts: opts, device: device, @@ -82,7 +83,6 @@ func NewWireGuardConnection(opts wireGuardOptions, device wireguardDevice, ipRes type wireguardConnection struct { closeOnce sync.Once done chan struct{} - pingerStop chan struct{} stateCh chan connection.State opts wireGuardOptions privateKey string @@ -125,17 +125,33 @@ func (c *wireguardConnection) Start(options connection.ConnectOptions) (err erro } }() - if config.LocalPort > 0 { - err = c.natPinger.PingProvider( + if config.LocalPort > 0 || len(config.Ports) > 0 { + conn, err := c.natPinger.PingProvider( config.Provider.Endpoint.IP.String(), - config.RemotePort, - config.LocalPort, + c.ports, + config.Ports, 0, - c.pingerStop, ) if err != nil { return errors.Wrap(err, "could not ping provider") } + + if conn != nil { + _, lPort, err := net.SplitHostPort(conn.LocalAddr().String()) + if err != nil { + return err + } + + _, rPort, err := net.SplitHostPort(conn.RemoteAddr().String()) + if err != nil { + return err + } + + config.LocalPort, _ = strconv.Atoi(lPort) + config.Provider.Endpoint.Port, _ = strconv.Atoi(rPort) + + conn.Close() + } } if err := c.device.Start(c.privateKey, config); err != nil { @@ -162,7 +178,6 @@ func (c *wireguardConnection) Stop() { c.device.Stop() c.stateCh <- connection.NotConnected - close(c.pingerStop) close(c.stateCh) close(c.done) }) diff --git a/services/openvpn/client.go b/services/openvpn/client.go index fba03a7659..419070157d 100644 --- a/services/openvpn/client.go +++ b/services/openvpn/client.go @@ -19,7 +19,6 @@ package openvpn import ( "encoding/json" - "fmt" "net" "strconv" "sync" @@ -167,7 +166,6 @@ func (c *Client) Start(options connection.ConnectOptions) error { } proc, clientConfig, err := c.processFactory(options, sessionConfig) - log.Info().Msg(fmt.Sprint(clientConfig.VpnConfig)) if err != nil { log.Info().Err(err).Msg("Client config factory error") return err diff --git a/services/openvpn/openvpn.go b/services/openvpn/openvpn.go index 5bd68e84cf..cdeffbb616 100644 --- a/services/openvpn/openvpn.go +++ b/services/openvpn/openvpn.go @@ -20,5 +20,5 @@ package openvpn // ConsumerConfig is used for sending some configuration from consumer to provider type ConsumerConfig struct { IP string `json:"Ip,omitempty"` - Ports []int `json:"Ports",omitempty"` + Ports []int `json:"Ports,omitempty"` } From c71e594505e96654f69cdcc8224cee5ced2c177a Mon Sep 17 00:00:00 2001 From: Dmitry Shihovtsev Date: Wed, 19 Feb 2020 12:19:54 +0600 Subject: [PATCH 08/13] Refactor NAT traversal params to use for both provider and consumer --- cmd/di.go | 2 +- core/service/stub_service.go | 2 +- mobile/mysterium/openvpn_connection_setup.go | 30 +++++----- .../mysterium/wireguard_connection_setup.go | 28 ++++------ nat/traversal/nat_proxy.go | 22 ++++---- nat/traversal/noop.go | 4 +- nat/traversal/pinger.go | 55 +++++++++---------- nat/traversal/pinger_test.go | 18 ++++-- services/noop/service.go | 2 +- services/openvpn/client.go | 29 +++++----- services/openvpn/client_test.go | 3 +- services/openvpn/service/manager.go | 12 ++-- services/wireguard/connection/connection.go | 28 ++++------ services/wireguard/service/service_unix.go | 12 ++-- services/wireguard/service/service_windows.go | 2 +- services/wireguard/wireguard.go | 2 +- session/create_consumer.go | 2 +- session/create_consumer_test.go | 4 +- session/manager.go | 8 +-- session/manager_test.go | 18 +++--- 20 files changed, 136 insertions(+), 147 deletions(-) diff --git a/cmd/di.go b/cmd/di.go index 00e7c740c5..621370e09a 100644 --- a/cmd/di.go +++ b/cmd/di.go @@ -574,7 +574,7 @@ func newSessionManagerFactory( sessionStorage *session.EventBasedStorage, providerInvoiceStorage *pingpong.ProviderInvoiceStorage, accountantPromiseStorage *pingpong.AccountantPromiseStorage, - natPingerChan func(*traversal.Params), + natPingerChan func(traversal.Params), natTracker *event.Tracker, serviceID string, eventbus eventbus.EventBus, diff --git a/core/service/stub_service.go b/core/service/stub_service.go index bb1aeaea6b..0419e692b2 100644 --- a/core/service/stub_service.go +++ b/core/service/stub_service.go @@ -56,7 +56,7 @@ func (service *serviceFake) GetType() string { } func (service *serviceFake) ProvideConfig(_ string, _ json.RawMessage) (*session.ConfigParams, error) { - return &session.ConfigParams{TraversalParams: &traversal.Params{}}, nil + return &session.ConfigParams{TraversalParams: traversal.Params{}}, nil } type mockDialogWaiter struct { diff --git a/mobile/mysterium/openvpn_connection_setup.go b/mobile/mysterium/openvpn_connection_setup.go index ae0c87d5e7..8623de3713 100644 --- a/mobile/mysterium/openvpn_connection_setup.go +++ b/mobile/mysterium/openvpn_connection_setup.go @@ -20,7 +20,6 @@ package mysterium import ( "encoding/json" "net" - "strconv" "sync" "time" @@ -158,34 +157,31 @@ func (c *openvpnConnection) Start(options connection.ConnectOptions) error { return err } - if sessionConfig.LocalPort == 0 && len(sessionConfig.Ports) > 0 { - if len(sessionConfig.Ports) == 0 || len(c.ports) == 0 { + // TODO this backward compatibility check needs to be removed once we will start using port ranges for all peers. + if sessionConfig.LocalPort > 0 || len(sessionConfig.Ports) > 0 { + if len(c.ports) == 0 { c.ports = []int{sessionConfig.LocalPort} sessionConfig.Ports = []int{sessionConfig.RemotePort} } - conn, err := c.natPinger.PingProvider( - sessionConfig.RemoteIP, - c.ports, - sessionConfig.Ports, - sessionConfig.LocalPort, - ) - if err != nil { - return err + params := traversal.Params{ + IP: sessionConfig.RemoteIP, + LocalPorts: c.ports, + RemotePorts: sessionConfig.Ports, } - _, lPort, err := net.SplitHostPort(conn.LocalAddr().String()) + conn, err := c.natPinger.PingProvider(params, sessionConfig.LocalPort) if err != nil { return err } - _, rPort, err := net.SplitHostPort(conn.RemoteAddr().String()) - if err != nil { - return err + if addr, ok := conn.LocalAddr().(*net.UDPAddr); ok { + sessionConfig.LocalPort = addr.Port } - sessionConfig.LocalPort, _ = strconv.Atoi(lPort) - sessionConfig.RemotePort, _ = strconv.Atoi(rPort) + if addr, ok := conn.RemoteAddr().(*net.UDPAddr); ok { + sessionConfig.RemotePort = addr.Port + } } newSession, clientConfig, err := c.createSession(options, sessionConfig) diff --git a/mobile/mysterium/wireguard_connection_setup.go b/mobile/mysterium/wireguard_connection_setup.go index f31bedf3c4..5099c13fc6 100644 --- a/mobile/mysterium/wireguard_connection_setup.go +++ b/mobile/mysterium/wireguard_connection_setup.go @@ -21,7 +21,6 @@ import ( "bufio" "encoding/json" "net" - "strconv" "strings" "sync" "time" @@ -125,31 +124,28 @@ func (c *wireguardConnection) Start(options connection.ConnectOptions) (err erro } }() + // TODO this backward compatibility check needs to be removed once we will start using port ranges for all peers. if config.LocalPort > 0 || len(config.Ports) > 0 { - conn, err := c.natPinger.PingProvider( - config.Provider.Endpoint.IP.String(), - c.ports, - config.Ports, - 0, - ) + params := traversal.Params{ + IP: config.Provider.Endpoint.IP.String(), + LocalPorts: c.ports, + RemotePorts: config.Ports, + } + + conn, err := c.natPinger.PingProvider(params, 0) if err != nil { return errors.Wrap(err, "could not ping provider") } if conn != nil { - _, lPort, err := net.SplitHostPort(conn.LocalAddr().String()) - if err != nil { - return err + if addr, ok := conn.LocalAddr().(*net.UDPAddr); ok { + config.LocalPort = addr.Port } - _, rPort, err := net.SplitHostPort(conn.RemoteAddr().String()) - if err != nil { - return err + if addr, ok := conn.RemoteAddr().(*net.UDPAddr); ok { + config.Provider.Endpoint.Port = addr.Port } - config.LocalPort, _ = strconv.Atoi(lPort) - config.Provider.Endpoint.Port, _ = strconv.Atoi(rPort) - conn.Close() } } diff --git a/nat/traversal/nat_proxy.go b/nat/traversal/nat_proxy.go index f40a6fd092..78b34da231 100644 --- a/nat/traversal/nat_proxy.go +++ b/nat/traversal/nat_proxy.go @@ -35,7 +35,7 @@ type natProxy struct { socketProtect func(socket int) bool } -// NewNATProxy constructs an instance of NATProxy +// NewNATProxy constructs an instance of natProxy func newNATProxy() *natProxy { return &natProxy{ servicePorts: make(map[string]int), @@ -46,7 +46,7 @@ func (np *natProxy) consumerHandOff(consumerAddr string, remoteConn *net.UDPConn time.Sleep(400 * time.Millisecond) stop := make(chan struct{}) if np.socketProtect == nil { - // shutdown pinger session since openvpn client will connect directly (without NATProxy) + // shutdown pinger session since openvpn client will connect directly (without natProxy) remoteConn.Close() return stop } @@ -60,7 +60,7 @@ func (np *natProxy) consumerHandOff(consumerAddr string, remoteConn *net.UDPConn func (np *natProxy) consumerProxy(consumerAddr string, remoteConn *net.UDPConn, stop chan struct{}) { laddr, err := net.ResolveUDPAddr("udp4", consumerAddr) if err != nil { - log.Error().Err(err).Msg("Failed to get local address for consumer NATProxy") + log.Error().Err(err).Msg("Failed to get local address for consumer natProxy") return } @@ -89,7 +89,7 @@ func (np *natProxy) consumerProxy(consumerAddr string, remoteConn *net.UDPConn, select { case <-stop: - log.Info().Msg("Stopping NATProxy handOff loop") + log.Info().Msg("Stopping natProxy handOff loop") proxyConn.Close() remoteConn.Close() return @@ -105,12 +105,12 @@ func (np *natProxy) consumerProxy(consumerAddr string, remoteConn *net.UDPConn, } func (np *natProxy) joinUDPStreams(conn *net.UDPConn, remoteConn *net.UDPConn, stop chan struct{}) { - log.Info().Msg("Start copying stream from consumer NATProxy to remote remoteConn") + log.Info().Msg("Start copying stream from consumer natProxy to remote remoteConn") buf := make([]byte, bufferLen) for { select { case <-stop: - log.Info().Msg("Stopping NATProxy joinUDPStreams") + log.Info().Msg("Stopping natProxy joinUDPStreams") return default: } @@ -138,7 +138,7 @@ func (np *natProxy) readWriteToAddr(conn *net.UDPConn, remoteConn *net.UDPConn, for { select { case <-stop: - log.Info().Msg("Stopping NATProxy readWriteToAddr loop") + log.Info().Msg("Stopping natProxy readWriteToAddr loop") return default: } @@ -162,11 +162,11 @@ func (np *natProxy) readWriteToAddr(conn *net.UDPConn, remoteConn *net.UDPConn, } } -// handOff traffic incoming through NATPinger punched hole should be handed off to NATPoxy +// handOff traffic incoming through NATPinger punched hole should be handed off to natProxy func (np *natProxy) handOff(key string, incomingConn *net.UDPConn) { proxyConn, err := np.getConnection(key) if err != nil { - log.Error().Err(err).Msg("Failed to connect to NATProxy") + log.Error().Err(err).Msg("Failed to connect to natProxy") return } log.Info().Msg("Handing off a connection to a service on " + proxyConn.RemoteAddr().String()) @@ -181,7 +181,7 @@ func copyStreams(dstConn *net.UDPConn, srcConn *net.UDPConn) { defer srcConn.Close() totalBytes, err := io.CopyBuffer(dstConn, srcConn, buf) if err != nil { - log.Error().Err(err).Msg("Failed to write/read a stream to/from NATProxy") + log.Error().Err(err).Msg("Failed to write/read a stream to/from natProxy") } log.Debug().Msgf("Total bytes transferred from %s to %s: %d", srcConn.RemoteAddr().String(), @@ -190,7 +190,7 @@ func copyStreams(dstConn *net.UDPConn, srcConn *net.UDPConn) { } func (np *natProxy) registerServicePort(key string, port int) { - log.Info().Msgf("Registering service %s for port %d to NATProxy", key, port) + log.Info().Msgf("Registering service %s for port %d to natProxy", key, port) np.servicePorts[key] = port } diff --git a/nat/traversal/noop.go b/nat/traversal/noop.go index d64f74f4a6..4ba30c1fad 100644 --- a/nat/traversal/noop.go +++ b/nat/traversal/noop.go @@ -45,12 +45,12 @@ func (np *NoopPinger) Start() {} func (np *NoopPinger) Stop() {} // PingProvider does nothing -func (np *NoopPinger) PingProvider(ip string, cPorts, pPorts []int, proxyPort int) (*net.UDPConn, error) { +func (np *NoopPinger) PingProvider(_ Params, proxyPort int) (*net.UDPConn, error) { return nil, nil } // PingTarget does nothing -func (np *NoopPinger) PingTarget(*Params) {} +func (np *NoopPinger) PingTarget(Params) {} // BindServicePort does nothing func (np *NoopPinger) BindServicePort(key string, port int) {} diff --git a/nat/traversal/pinger.go b/nat/traversal/pinger.go index 92ef19f439..a9637d2316 100644 --- a/nat/traversal/pinger.go +++ b/nat/traversal/pinger.go @@ -41,13 +41,13 @@ var ( // NATProviderPinger pings provider and optionally hands off connection to consumer proxy. type NATProviderPinger interface { - PingProvider(ip string, cPorts, pPorts []int, proxyPort int) (*net.UDPConn, error) + PingProvider(params Params, proxyPort int) (*net.UDPConn, error) } // NATPinger is responsible for pinging nat holes type NATPinger interface { NATProviderPinger - PingTarget(*Params) + PingTarget(Params) BindServicePort(key string, port int) Start() Stop() @@ -72,7 +72,7 @@ func DefaultPingConfig() *PingConfig { // Pinger represents NAT pinger structure type Pinger struct { pingConfig *PingConfig - pingTarget chan *Params + pingTarget chan Params stop chan struct{} stopNATProxy chan struct{} once sync.Once @@ -89,7 +89,7 @@ type PortSupplier interface { func NewPinger(pingConfig *PingConfig, publisher eventbus.Publisher) NATPinger { return &Pinger{ pingConfig: pingConfig, - pingTarget: make(chan *Params), + pingTarget: make(chan Params), stop: make(chan struct{}), stopNATProxy: make(chan struct{}), natProxy: newNATProxy(), @@ -99,8 +99,8 @@ func NewPinger(pingConfig *PingConfig, publisher eventbus.Publisher) NATPinger { // Params contains session parameters needed to NAT ping remote peer type Params struct { - ProviderPorts []int - ConsumerPorts []int + RemotePorts []int + LocalPorts []int IP string ProxyPortMappingKey string } @@ -114,18 +114,12 @@ func (p *Pinger) Start() { case <-p.stop: log.Info().Msg("NAT pinger is stopped") return - case pingParams := <-p.pingTarget: - if isPunchingRequired(pingParams) { - go p.pingTargetConsumer(pingParams) - } + case params := <-p.pingTarget: + go p.pingTargetConsumer(params) } } } -func isPunchingRequired(params *Params) bool { - return true -} - // Stop stops pinger loop func (p *Pinger) Stop() { p.once.Do(func() { @@ -135,13 +129,13 @@ func (p *Pinger) Stop() { } // PingProvider pings provider determined by destination provided in sessionConfig -func (p *Pinger) PingProvider(ip string, cPorts, pPorts []int, proxyPort int) (*net.UDPConn, error) { +func (p *Pinger) PingProvider(params Params, proxyPort int) (*net.UDPConn, error) { log.Info().Msg("NAT pinging to provider") stop := make(chan struct{}) defer close(stop) - conn, err := p.multiPing(ip, cPorts, pPorts, 128, stop) + conn, err := p.multiPing(params, 128, stop) if err != nil { log.Err(err).Msg("Failed to ping remote peer") return nil, err @@ -208,7 +202,7 @@ func (p *Pinger) getConnection(ip string, remotePort int, localPort int) (*net.U } // PingTarget relays ping target address data -func (p *Pinger) PingTarget(target *Params) { +func (p *Pinger) PingTarget(target Params) { select { case p.pingTarget <- target: return @@ -238,8 +232,9 @@ func (p *Pinger) pingReceiver(conn *net.UDPConn, stop <-chan struct{}) error { // Add read deadline to prevent possible conn.Read hang when remote peer doesn't send ping ack. conn.SetReadDeadline(time.Now().Add(p.pingConfig.Timeout * 2)) n, err := conn.Read(buf) - // Set higher read deadline when NAT proxy is used. - conn.SetReadDeadline(time.Now().Add(12 * time.Hour)) + // Reset read deadline. + conn.SetReadDeadline(time.Time{}) + if err != nil || n == 0 { log.Error().Err(err).Msgf("Failed to read remote peer: %s - attempting to continue", conn.RemoteAddr().String()) continue @@ -261,10 +256,10 @@ func (p *Pinger) Valid() bool { return true } -func (p *Pinger) pingTargetConsumer(pingParams *Params) { - log.Info().Msgf("Pinging peer with: %+v", pingParams) +func (p *Pinger) pingTargetConsumer(params Params) { + log.Info().Msgf("Pinging peer with: %+v", params) - if pingParams.ProxyPortMappingKey == "" { + if params.ProxyPortMappingKey == "" { log.Error().Msg("Service proxy connection port mapping key is missing") return } @@ -272,7 +267,7 @@ func (p *Pinger) pingTargetConsumer(pingParams *Params) { stop := make(chan struct{}) defer close(stop) - conn, err := p.multiPing(pingParams.IP, pingParams.ProviderPorts, pingParams.ConsumerPorts, 2, stop) + conn, err := p.multiPing(params, 2, stop) if err != nil { log.Err(err).Msg("Failed to ping remote peer") return @@ -289,11 +284,11 @@ func (p *Pinger) pingTargetConsumer(pingParams *Params) { p.eventPublisher.Publish(event.AppTopicTraversal, event.BuildSuccessfulEvent(StageName)) log.Info().Msg("Ping received, waiting for a new connection") - go p.natProxy.handOff(pingParams.ProxyPortMappingKey, conn) + go p.natProxy.handOff(params.ProxyPortMappingKey, conn) } -func (p *Pinger) multiPing(remoteIP string, localPorts, remotePorts []int, initialTTL int, stop <-chan struct{}) (*net.UDPConn, error) { - if len(localPorts) != len(remotePorts) { +func (p *Pinger) multiPing(params Params, initialTTL int, stop <-chan struct{}) (*net.UDPConn, error) { + if len(params.LocalPorts) != len(params.RemotePorts) { return nil, errors.New("number of local and remote ports does not match") } @@ -302,16 +297,16 @@ func (p *Pinger) multiPing(remoteIP string, localPorts, remotePorts []int, initi err error } - ch := make(chan res, len(localPorts)) + ch := make(chan res, len(params.LocalPorts)) - for i := range localPorts { + for i := range params.LocalPorts { go func(i int) { - conn, err := p.singlePing(remoteIP, localPorts[i], remotePorts[i], initialTTL+i, stop) + conn, err := p.singlePing(params.IP, params.LocalPorts[i], params.RemotePorts[i], initialTTL+i, stop) ch <- res{conn, err} }(i) } - // First responce wins. Other are not important. + // First response wins. Other are not important. r := <-ch return r.conn, r.err } diff --git a/nat/traversal/pinger_test.go b/nat/traversal/pinger_test.go index f24415dde9..36398d46b1 100644 --- a/nat/traversal/pinger_test.go +++ b/nat/traversal/pinger_test.go @@ -71,9 +71,9 @@ func TestPinger_Provider_Consumer_Ping_Flow(t *testing.T) { // Start pinging consumer. go func() { pinger.BindServicePort("wg1", providerProxyPort) - p := &Params{ - ProviderPorts: []int{providerPort}, - ConsumerPorts: []int{consumerPort}, + p := Params{ + LocalPorts: []int{providerPort}, + RemotePorts: []int{consumerPort}, IP: "127.0.0.1", ProxyPortMappingKey: "wg1", } @@ -86,7 +86,11 @@ func TestPinger_Provider_Consumer_Ping_Flow(t *testing.T) { // Start pinging provider. stop := make(chan struct{}) defer close(stop) - _, err := pinger.PingProvider("127.0.0.1", []int{consumerPort}, []int{providerPort}, consumerPort+1) + _, err := pinger.PingProvider(Params{ + IP: "127.0.0.1", + LocalPorts: []int{consumerPort}, + RemotePorts: []int{providerPort}, + }, consumerPort+1) assert.NoError(t, err) assert.Contains(t, string(proxyBuf), fmt.Sprintf("continuously pinging to 127.0.0.1:%d", providerPort)) @@ -112,7 +116,11 @@ func TestPinger_PingProvider_Timeout(t *testing.T) { stop := make(chan struct{}) defer close(stop) - _, err := pinger.PingProvider("127.0.0.1", []int{providerPort}, []int{consumerPort}, 0) + _, err := pinger.PingProvider(Params{ + IP: "127.0.0.1", + RemotePorts: []int{providerPort}, + LocalPorts: []int{consumerPort}, + }, 0) assert.Error(t, errNATPunchAttemptTimedOut, err) } diff --git a/services/noop/service.go b/services/noop/service.go index ab6e9aa92a..fb03ce8161 100644 --- a/services/noop/service.go +++ b/services/noop/service.go @@ -48,7 +48,7 @@ type Manager struct { // ProvideConfig provides the session configuration func (manager *Manager) ProvideConfig(_ string, _ json.RawMessage) (*session.ConfigParams, error) { - return &session.ConfigParams{TraversalParams: &traversal.Params{}}, nil + return &session.ConfigParams{TraversalParams: traversal.Params{}}, nil } // Serve starts service - does block diff --git a/services/openvpn/client.go b/services/openvpn/client.go index 419070157d..1a73f77818 100644 --- a/services/openvpn/client.go +++ b/services/openvpn/client.go @@ -20,7 +20,6 @@ package openvpn import ( "encoding/json" "net" - "strconv" "sync" "time" @@ -135,34 +134,32 @@ func (c *Client) Start(options connection.ConnectOptions) error { return err } - if sessionConfig.LocalPort == 0 && len(sessionConfig.Ports) > 0 { + // TODO this backward compatibility check needs to be removed once we will start using port ranges for all peers. + if sessionConfig.LocalPort > 0 || len(sessionConfig.Ports) > 0 { if len(sessionConfig.Ports) == 0 || len(c.ports) == 0 { c.ports = []int{sessionConfig.LocalPort} sessionConfig.Ports = []int{sessionConfig.RemotePort} } - conn, err := c.natPinger.PingProvider( - sessionConfig.RemoteIP, - c.ports, - sessionConfig.Ports, - sessionConfig.LocalPort, - ) - if err != nil { - return err + params := traversal.Params{ + IP: sessionConfig.RemoteIP, + LocalPorts: c.ports, + RemotePorts: sessionConfig.Ports, } - _, lPort, err := net.SplitHostPort(conn.LocalAddr().String()) + conn, err := c.natPinger.PingProvider(params, sessionConfig.LocalPort) if err != nil { return err } - _, rPort, err := net.SplitHostPort(conn.RemoteAddr().String()) - if err != nil { - return err + if addr, ok := conn.LocalAddr().(*net.UDPAddr); ok { + sessionConfig.LocalPort = addr.Port + } + + if addr, ok := conn.RemoteAddr().(*net.UDPAddr); ok { + sessionConfig.RemotePort = addr.Port } - sessionConfig.LocalPort, _ = strconv.Atoi(lPort) - sessionConfig.RemotePort, _ = strconv.Atoi(rPort) } proc, clientConfig, err := c.processFactory(options, sessionConfig) diff --git a/services/openvpn/client_test.go b/services/openvpn/client_test.go index dea36218bc..7ddf38203f 100644 --- a/services/openvpn/client_test.go +++ b/services/openvpn/client_test.go @@ -24,6 +24,7 @@ import ( "github.com/mysteriumnetwork/node/core/connection" "github.com/mysteriumnetwork/node/core/ip" "github.com/mysteriumnetwork/node/identity" + "github.com/mysteriumnetwork/node/nat/traversal" "github.com/stretchr/testify/assert" ) @@ -49,6 +50,6 @@ func TestConnection_CreatesConnection(t *testing.T) { type MockNATPinger struct{} // PingProvider does nothing -func (mnp *MockNATPinger) PingProvider(_ string, _, _ []int, _ int) (*net.UDPConn, error) { +func (mnp *MockNATPinger) PingProvider(_ traversal.Params, _ int) (*net.UDPConn, error) { return nil, nil } diff --git a/services/openvpn/service/manager.go b/services/openvpn/service/manager.go index a9d7b35655..2594144808 100644 --- a/services/openvpn/service/manager.go +++ b/services/openvpn/service/manager.go @@ -189,7 +189,7 @@ func (m *Manager) Serve(instance *service.Instance) (err error) { } }() - if err := m.startServer(m.openvpnProcess, stateChannel); err != nil { + if err := m.startServer(stateChannel); err != nil { return errors.Wrap(err, "failed to start Openvpn server") } @@ -244,7 +244,7 @@ func (m *Manager) ProvideConfig(_ string, sessionConfig json.RawMessage) (*sessi return nil, errors.New("Service port not initialized") } - traversalParams := &traversal.Params{} + traversalParams := traversal.Params{} publicIP, err := m.ipResolver.GetPublicIP() if err != nil { @@ -299,8 +299,8 @@ func (m *Manager) ProvideConfig(_ string, sessionConfig json.RawMessage) (*sessi } traversalParams.IP = consumerConfig.IP - traversalParams.ProviderPorts = vpnConfig.Ports - traversalParams.ConsumerPorts = consumerConfig.Ports + traversalParams.LocalPorts = vpnConfig.Ports + traversalParams.RemotePorts = consumerConfig.Ports traversalParams.ProxyPortMappingKey = openvpn_service.ServiceType } } @@ -308,8 +308,8 @@ func (m *Manager) ProvideConfig(_ string, sessionConfig json.RawMessage) (*sessi return &session.ConfigParams{SessionServiceConfig: vpnConfig, TraversalParams: traversalParams}, nil } -func (m *Manager) startServer(server openvpn.Process, stateChannel chan openvpn.State) error { - if err := server.Start(); err != nil { +func (m *Manager) startServer(stateChannel chan openvpn.State) error { + if err := m.openvpnProcess.Start(); err != nil { return err } diff --git a/services/wireguard/connection/connection.go b/services/wireguard/connection/connection.go index aa02174a2c..2e9f4cce8a 100644 --- a/services/wireguard/connection/connection.go +++ b/services/wireguard/connection/connection.go @@ -20,7 +20,6 @@ package connection import ( "encoding/json" "net" - "strconv" "sync" "time" @@ -120,31 +119,28 @@ func (c *Connection) Start(options connection.ConnectOptions) (err error) { c.stateCh <- connection.Connecting + // TODO this backward compatibility check needs to be removed once we will start using port ranges for all peers. if config.LocalPort > 0 || len(config.Ports) > 0 { - conn, err := c.natPinger.PingProvider( - config.Provider.Endpoint.IP.String(), - c.ports, - config.Ports, - 0, - ) + params := traversal.Params{ + IP: config.Provider.Endpoint.IP.String(), + LocalPorts: c.ports, + RemotePorts: config.Ports, + } + + conn, err := c.natPinger.PingProvider(params, 0) if err != nil { return errors.Wrap(err, "could not ping provider") } if conn != nil { - _, lPort, err := net.SplitHostPort(conn.LocalAddr().String()) - if err != nil { - return err + if addr, ok := conn.LocalAddr().(*net.UDPAddr); ok { + config.LocalPort = addr.Port } - _, rPort, err := net.SplitHostPort(conn.RemoteAddr().String()) - if err != nil { - return err + if addr, ok := conn.RemoteAddr().(*net.UDPAddr); ok { + config.Provider.Endpoint.Port = addr.Port } - config.LocalPort, _ = strconv.Atoi(lPort) - config.Provider.Endpoint.Port, _ = strconv.Atoi(rPort) - conn.Close() } } diff --git a/services/wireguard/service/service_unix.go b/services/wireguard/service/service_unix.go index 3578652924..4528ee7f25 100644 --- a/services/wireguard/service/service_unix.go +++ b/services/wireguard/service/service_unix.go @@ -249,7 +249,7 @@ func (m *Manager) ProvideConfig(sessionID string, sessionConfig json.RawMessage) m.sessionCleanup[sessionID] = destroy m.sessionCleanupMu.Unlock() - return &session.ConfigParams{SessionServiceConfig: config, SessionDestroyCallback: destroy, TraversalParams: &traversalParams}, nil + return &session.ConfigParams{SessionServiceConfig: config, SessionDestroyCallback: destroy, TraversalParams: traversalParams}, nil } func (m *Manager) tryAddPortMapping(pubIP string, port int) (release func(), ok bool) { @@ -294,7 +294,7 @@ func (m *Manager) addConsumerPeer(conn wg.ConnectionEndpoint, consumerPort, prov } func (m *Manager) addTraversalParams(config wg.ServiceConfig, traversalParams traversal.Params) (wg.ServiceConfig, error) { - config.Ports = traversalParams.ProviderPorts + config.Ports = traversalParams.LocalPorts // Provide new provider endpoint which points to providers NAT Proxy. newProviderEndpoint, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("%s:%d", config.Provider.Endpoint.IP, config.RemotePort)) @@ -306,8 +306,8 @@ func (m *Manager) addTraversalParams(config wg.ServiceConfig, traversalParams tr config.Consumer.ConnectDelay = 0 // TODO this backward compatibility block needs to be removed once we will start using port ranges for all peers. - config.LocalPort = traversalParams.ConsumerPorts[len(traversalParams.ConsumerPorts)-1] - config.RemotePort = traversalParams.ProviderPorts[len(traversalParams.ProviderPorts)-1] + config.LocalPort = traversalParams.RemotePorts[len(traversalParams.RemotePorts)-1] + config.RemotePort = traversalParams.LocalPorts[len(traversalParams.LocalPorts)-1] config.Provider.Endpoint.Port = config.RemotePort return config, nil @@ -334,7 +334,7 @@ func (m *Manager) newTraversalParams(natPingerEnabled bool, consumerConfig wg.Co return params, errors.Wrap(err, "could not acquire NAT pinger provider port") } - params.ProviderPorts = append(params.ProviderPorts, pp.Num()) + params.LocalPorts = append(params.LocalPorts, pp.Num()) } if consumerConfig.IP == "" { @@ -342,7 +342,7 @@ func (m *Manager) newTraversalParams(natPingerEnabled bool, consumerConfig wg.Co } params.IP = consumerConfig.IP - params.ConsumerPorts = consumerConfig.Ports + params.RemotePorts = consumerConfig.Ports params.ProxyPortMappingKey = fmt.Sprintf("%s_%s", wg.ServiceType, consumerConfig.PublicKey) return params, nil diff --git a/services/wireguard/service/service_windows.go b/services/wireguard/service/service_windows.go index 4a5b554d24..586598f82e 100644 --- a/services/wireguard/service/service_windows.go +++ b/services/wireguard/service/service_windows.go @@ -69,7 +69,7 @@ type Manager struct { } // ProvideConfig provides the config for consumer -func (manager *Manager) ProvideConfig(publicKey json.RawMessage, traversalParams *traversal.Params) (*session.ConfigParams, error) { +func (manager *Manager) ProvideConfig(publicKey json.RawMessage, traversalParams traversal.Params) (*session.ConfigParams, error) { key := &wg.ConsumerConfig{} err := json.Unmarshal(publicKey, key) if err != nil { diff --git a/services/wireguard/wireguard.go b/services/wireguard/wireguard.go index fe4d9157f0..f02025989f 100644 --- a/services/wireguard/wireguard.go +++ b/services/wireguard/wireguard.go @@ -90,7 +90,7 @@ type ServiceConfig struct { // LocalPort and RemotePort are needed for NAT hole punching only. LocalPort int `json:"-"` RemotePort int `json:"-"` - Ports []int `json:"Ports"` + Ports []int `json:"ports"` Provider struct { PublicKey string diff --git a/session/create_consumer.go b/session/create_consumer.go index cf70abea55..bd825a1daa 100644 --- a/session/create_consumer.go +++ b/session/create_consumer.go @@ -36,7 +36,7 @@ type createConsumer struct { // Starter starts the session. type Starter interface { - Start(session *Session, consumerID identity.Identity, consumerInfo ConsumerInfo, proposalID int, config ServiceConfiguration, pingerParams *traversal.Params) error + Start(session *Session, consumerID identity.Identity, consumerInfo ConsumerInfo, proposalID int, config ServiceConfiguration, pingerParams traversal.Params) error } // GetMessageEndpoint returns endpoint where to receive messages diff --git a/session/create_consumer_test.go b/session/create_consumer_test.go index 2b25b98dd2..4e04cf0d4d 100644 --- a/session/create_consumer_test.go +++ b/session/create_consumer_test.go @@ -138,7 +138,7 @@ type mockConfigProvider struct { } func (mockConfigProvider) ProvideConfig(_ string, _ json.RawMessage) (*ConfigParams, error) { - return &ConfigParams{SessionServiceConfig: config, TraversalParams: &traversal.Params{}}, nil + return &ConfigParams{SessionServiceConfig: config, TraversalParams: traversal.Params{}}, nil } // managerFake represents fake Manager usually useful in tests @@ -151,7 +151,7 @@ type managerFake struct { } // Start function creates and returns fake session -func (manager *managerFake) Start(session *Session, consumerID identity.Identity, consumerInfo ConsumerInfo, proposalID int, config ServiceConfiguration, pingerParams *traversal.Params) error { +func (manager *managerFake) Start(session *Session, consumerID identity.Identity, consumerInfo ConsumerInfo, proposalID int, config ServiceConfiguration, pingerParams traversal.Params) error { session.ID = manager.fakeSession.ID session.ConsumerID = manager.fakeSession.ConsumerID manager.lastConsumerID = consumerID diff --git a/session/manager.go b/session/manager.go index c3996d21ce..8fa5f0691a 100644 --- a/session/manager.go +++ b/session/manager.go @@ -47,7 +47,7 @@ type IDGenerator func() (ID, error) type ConfigParams struct { SessionServiceConfig ServiceConfiguration SessionDestroyCallback DestroyCallback - TraversalParams *traversal.Params + TraversalParams traversal.Params } type publisher interface { @@ -90,7 +90,7 @@ func NewManager( currentProposal market.ServiceProposal, sessionStorage Storage, paymentEngineFactory PaymentEngineFactory, - natPingerChan func(*traversal.Params), + natPingerChan func(traversal.Params), natEventGetter NATEventGetter, serviceId string, publisher publisher, @@ -112,7 +112,7 @@ type Manager struct { currentProposal market.ServiceProposal sessionStorage Storage paymentEngineFactory PaymentEngineFactory - natPingerChan func(*traversal.Params) + natPingerChan func(traversal.Params) natEventGetter NATEventGetter serviceId string publisher publisher @@ -121,7 +121,7 @@ type Manager struct { // Start starts a session on the provider side for the given consumer. // Multiple sessions per peerID is possible in case different services are used -func (manager *Manager) Start(session *Session, consumerID identity.Identity, consumerInfo ConsumerInfo, proposalID int, config ServiceConfiguration, pingerParams *traversal.Params) (err error) { +func (manager *Manager) Start(session *Session, consumerID identity.Identity, consumerInfo ConsumerInfo, proposalID int, config ServiceConfiguration, pingerParams traversal.Params) (err error) { manager.creationLock.Lock() defer manager.creationLock.Unlock() diff --git a/session/manager_test.go b/session/manager_test.go index db5e312e1b..cf969562c1 100644 --- a/session/manager_test.go +++ b/session/manager_test.go @@ -66,12 +66,12 @@ func TestManager_Start_StoresSession(t *testing.T) { sessionStore := NewStorageMemory() - natPinger := func(*traversal.Params) {} + natPinger := func(traversal.Params) {} manager := NewManager(currentProposal, sessionStore, mockPaymentEngineFactory, natPinger, &MockNatEventTracker{}, "test service id", mocks.NewEventBus()) - pingerParams := &traversal.Params{} + pingerParams := traversal.Params{} session, err := newSession() assert.NoError(t, err) err = manager.Start(session, consumerID, ConsumerInfo{IssuerID: consumerID}, currentProposalID, nil, pingerParams) @@ -87,12 +87,12 @@ func TestManager_Start_StoresSession(t *testing.T) { func TestManager_Start_RejectsUnknownProposal(t *testing.T) { sessionStore := NewStorageMemory() - natPinger := func(*traversal.Params) {} + natPinger := func(traversal.Params) {} manager := NewManager(currentProposal, sessionStore, mockPaymentEngineFactory, natPinger, &MockNatEventTracker{}, "test service id", mocks.NewEventBus()) - pingerParams := &traversal.Params{} + pingerParams := traversal.Params{} session, err := newSession() assert.NoError(t, err) err = manager.Start(session, consumerID, ConsumerInfo{IssuerID: consumerID}, 69, nil, pingerParams) @@ -109,7 +109,7 @@ func (mnet *MockNatEventTracker) LastEvent() *event.Event { func TestManager_AcknowledgeSession_RejectsUnknown(t *testing.T) { sessionStore := NewStorageMemory() - natPinger := func(*traversal.Params) {} + natPinger := func(traversal.Params) {} manager := NewManager(currentProposal, sessionStore, mockPaymentEngineFactory, natPinger, &MockNatEventTracker{}, "test service id", mocks.NewEventBus()) @@ -119,12 +119,12 @@ func TestManager_AcknowledgeSession_RejectsUnknown(t *testing.T) { func TestManager_AcknowledgeSession_RejectsBadClient(t *testing.T) { sessionStore := NewStorageMemory() - natPinger := func(*traversal.Params) {} + natPinger := func(traversal.Params) {} manager := NewManager(currentProposal, sessionStore, mockPaymentEngineFactory, natPinger, &MockNatEventTracker{}, "test service id", mocks.NewEventBus()) - pingerParams := &traversal.Params{} + pingerParams := traversal.Params{} session, err := newSession() err = manager.Start(session, consumerID, ConsumerInfo{IssuerID: consumerID}, currentProposalID, nil, pingerParams) assert.Nil(t, err) @@ -135,13 +135,13 @@ func TestManager_AcknowledgeSession_RejectsBadClient(t *testing.T) { func TestManager_AcknowledgeSession_PublishesEvent(t *testing.T) { sessionStore := NewStorageMemory() - natPinger := func(*traversal.Params) {} + natPinger := func(traversal.Params) {} mp := mocks.NewEventBus() manager := NewManager(currentProposal, sessionStore, mockPaymentEngineFactory, natPinger, &MockNatEventTracker{}, "test service id", mp) - pingerParams := &traversal.Params{} + pingerParams := traversal.Params{} session, err := newSession() assert.NoError(t, err) err = manager.Start(session, consumerID, ConsumerInfo{IssuerID: consumerID}, currentProposalID, nil, pingerParams) From 14cdfba2aa91e41f792075d6f05ba559a4075920 Mon Sep 17 00:00:00 2001 From: Dmitry Shihovtsev Date: Thu, 20 Feb 2020 09:35:52 +0600 Subject: [PATCH 09/13] Use correct connection for noop pinger --- mobile/mysterium/wireguard_connection_setup.go | 16 +++++++--------- nat/traversal/noop.go | 2 +- services/wireguard/connection/connection.go | 16 +++++++--------- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/mobile/mysterium/wireguard_connection_setup.go b/mobile/mysterium/wireguard_connection_setup.go index 5099c13fc6..bdc1ce6e4e 100644 --- a/mobile/mysterium/wireguard_connection_setup.go +++ b/mobile/mysterium/wireguard_connection_setup.go @@ -137,17 +137,15 @@ func (c *wireguardConnection) Start(options connection.ConnectOptions) (err erro return errors.Wrap(err, "could not ping provider") } - if conn != nil { - if addr, ok := conn.LocalAddr().(*net.UDPAddr); ok { - config.LocalPort = addr.Port - } - - if addr, ok := conn.RemoteAddr().(*net.UDPAddr); ok { - config.Provider.Endpoint.Port = addr.Port - } + if addr, ok := conn.LocalAddr().(*net.UDPAddr); ok { + config.LocalPort = addr.Port + } - conn.Close() + if addr, ok := conn.RemoteAddr().(*net.UDPAddr); ok { + config.Provider.Endpoint.Port = addr.Port } + + conn.Close() } if err := c.device.Start(c.privateKey, config); err != nil { diff --git a/nat/traversal/noop.go b/nat/traversal/noop.go index 4ba30c1fad..df6e235319 100644 --- a/nat/traversal/noop.go +++ b/nat/traversal/noop.go @@ -46,7 +46,7 @@ func (np *NoopPinger) Stop() {} // PingProvider does nothing func (np *NoopPinger) PingProvider(_ Params, proxyPort int) (*net.UDPConn, error) { - return nil, nil + return &net.UDPConn{}, nil } // PingTarget does nothing diff --git a/services/wireguard/connection/connection.go b/services/wireguard/connection/connection.go index 2e9f4cce8a..cd42cea339 100644 --- a/services/wireguard/connection/connection.go +++ b/services/wireguard/connection/connection.go @@ -132,17 +132,15 @@ func (c *Connection) Start(options connection.ConnectOptions) (err error) { return errors.Wrap(err, "could not ping provider") } - if conn != nil { - if addr, ok := conn.LocalAddr().(*net.UDPAddr); ok { - config.LocalPort = addr.Port - } - - if addr, ok := conn.RemoteAddr().(*net.UDPAddr); ok { - config.Provider.Endpoint.Port = addr.Port - } + if addr, ok := conn.LocalAddr().(*net.UDPAddr); ok { + config.LocalPort = addr.Port + } - conn.Close() + if addr, ok := conn.RemoteAddr().(*net.UDPAddr); ok { + config.Provider.Endpoint.Port = addr.Port } + + conn.Close() } log.Info().Msg("Starting new connection") From fd86ff18cbaeaf433bfb7f5310e024317f832b65 Mon Sep 17 00:00:00 2001 From: Dmitry Shihovtsev Date: Thu, 20 Feb 2020 09:36:12 +0600 Subject: [PATCH 10/13] Stop pings on NAT pinger stop --- nat/traversal/pinger.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/nat/traversal/pinger.go b/nat/traversal/pinger.go index a9637d2316..eafcd0c6e1 100644 --- a/nat/traversal/pinger.go +++ b/nat/traversal/pinger.go @@ -162,6 +162,8 @@ func (p *Pinger) ping(conn *net.UDPConn, ttl int, stop <-chan struct{}) error { select { case <-stop: return nil + case <-p.stop: + return nil case <-time.After(p.pingConfig.Interval): log.Debug().Msgf("Pinging %s from %s...", conn.RemoteAddr().String(), conn.LocalAddr().String()) @@ -206,6 +208,8 @@ func (p *Pinger) PingTarget(target Params) { select { case p.pingTarget <- target: return + case <-p.stop: + return // do not block if ping target is not received case <-time.After(100 * time.Millisecond): log.Info().Msgf("Ping target timeout: %v", target) @@ -228,6 +232,8 @@ func (p *Pinger) pingReceiver(conn *net.UDPConn, stop <-chan struct{}) error { return errNATPunchAttemptTimedOut case <-stop: return errNATPunchAttemptStopped + case <-p.stop: + return errNATPunchAttemptStopped default: // Add read deadline to prevent possible conn.Read hang when remote peer doesn't send ping ack. conn.SetReadDeadline(time.Now().Add(p.pingConfig.Timeout * 2)) From e9c23cf3cd5c23ffc7ec985d079429fc655108ff Mon Sep 17 00:00:00 2001 From: Dmitry Shihovtsev Date: Thu, 20 Feb 2020 10:14:49 +0600 Subject: [PATCH 11/13] Avoid VPNConfig mutation during validation --- mobile/mysterium/openvpn_connection_setup.go | 14 +++------ services/openvpn/client.go | 18 ++--------- services/openvpn/client_config.go | 4 +-- services/openvpn/config_validator.go | 33 +++++++++++--------- services/openvpn/config_validator_test.go | 32 +++++++++---------- 5 files changed, 44 insertions(+), 57 deletions(-) diff --git a/mobile/mysterium/openvpn_connection_setup.go b/mobile/mysterium/openvpn_connection_setup.go index 8623de3713..5452854cce 100644 --- a/mobile/mysterium/openvpn_connection_setup.go +++ b/mobile/mysterium/openvpn_connection_setup.go @@ -39,7 +39,7 @@ type natPinger interface { SetProtectSocketCallback(SocketProtect func(socket int) bool) } -type openvpn3SessionFactory func(connection.ConnectOptions, *openvpn.VPNConfig) (*openvpn3.Session, *openvpn.ClientConfig, error) +type openvpn3SessionFactory func(connection.ConnectOptions, openvpn.VPNConfig) (*openvpn3.Session, *openvpn.ClientConfig, error) var errSessionWrapperNotStarted = errors.New("session wrapper not started") @@ -51,14 +51,8 @@ func NewOpenVPNConnection(sessionTracker *sessionTracker, signerFactory identity ipResolver: ipResolver, pingerStop: make(chan struct{}), } - sessionFactory := func(options connection.ConnectOptions, sessionConfig *openvpn.VPNConfig) (*openvpn3.Session, *openvpn.ClientConfig, error) { - // override vpnClientConfig params with proxy local IP and pinger port - // do this only if connecting to natted provider - if sessionConfig.LocalPort > 0 { - sessionConfig.OriginalRemoteIP = sessionConfig.RemoteIP - sessionConfig.OriginalRemotePort = sessionConfig.RemotePort - } + sessionFactory := func(options connection.ConnectOptions, sessionConfig openvpn.VPNConfig) (*openvpn3.Session, *openvpn.ClientConfig, error) { vpnClientConfig, err := openvpn.NewClientConfigFromSession(sessionConfig, "", "", connection.DNSOptionAuto) if err != nil { return nil, nil, err @@ -151,8 +145,8 @@ func (c *openvpnConnection) Statistics() (connection.Statistics, error) { } func (c *openvpnConnection) Start(options connection.ConnectOptions) error { - sessionConfig := &openvpn.VPNConfig{} - err := json.Unmarshal(options.SessionConfig, sessionConfig) + sessionConfig := openvpn.VPNConfig{} + err := json.Unmarshal(options.SessionConfig, &sessionConfig) if err != nil { return err } diff --git a/services/openvpn/client.go b/services/openvpn/client.go index 1a73f77818..2d42ed34b3 100644 --- a/services/openvpn/client.go +++ b/services/openvpn/client.go @@ -44,7 +44,7 @@ import ( var ErrProcessNotStarted = errors.New("process not started yet") // processFactory creates a new openvpn process -type processFactory func(options connection.ConnectOptions, sessionConfig *VPNConfig) (openvpn.Process, *ClientConfig, error) +type processFactory func(options connection.ConnectOptions, sessionConfig VPNConfig) (openvpn.Process, *ClientConfig, error) // NewClient creates a new openvpn connection func NewClient(openvpnBinary, configDirectory, runtimeDirectory string, @@ -65,15 +65,7 @@ func NewClient(openvpnBinary, configDirectory, runtimeDirectory string, removeAllowedIPRule: func() {}, } - procFactory := func(options connection.ConnectOptions, sessionConfig *VPNConfig) (openvpn.Process, *ClientConfig, error) { - - // override vpnClientConfig params with proxy local IP and pinger port - // do this only if connecting to natted provider - if sessionConfig.LocalPort > 0 { - sessionConfig.OriginalRemoteIP = sessionConfig.RemoteIP - sessionConfig.OriginalRemotePort = sessionConfig.RemotePort - } - + procFactory := func(options connection.ConnectOptions, sessionConfig VPNConfig) (openvpn.Process, *ClientConfig, error) { vpnClientConfig, err := NewClientConfigFromSession(sessionConfig, configDirectory, runtimeDirectory, options.DNS) if err != nil { return nil, nil, err @@ -128,7 +120,7 @@ func (c *Client) Statistics() (connection.Statistics, error) { func (c *Client) Start(options connection.ConnectOptions) error { log.Info().Msg("Starting connection") - sessionConfig := &VPNConfig{} + sessionConfig := VPNConfig{} err := json.Unmarshal(options.SessionConfig, sessionConfig) if err != nil { return err @@ -242,10 +234,6 @@ func (c *Client) GetConfig() (connection.ConsumerConfig, error) { //VPNConfig structure represents VPN configuration options for given session type VPNConfig struct { - // OriginalRemoteIP and OriginalRemotePort are used for NAT punching from consumer side. - OriginalRemoteIP string - OriginalRemotePort int - DNSIPs string `json:"dns_ips"` RemoteIP string `json:"remote"` RemotePort int `json:"port"` diff --git a/services/openvpn/client_config.go b/services/openvpn/client_config.go index 35925fd8f6..99655d4d91 100644 --- a/services/openvpn/client_config.go +++ b/services/openvpn/client_config.go @@ -77,7 +77,7 @@ func defaultClientConfig(runtimeDir string, scriptSearchPath string) *ClientConf // NewClientConfigFromSession creates client configuration structure for given VPNConfig, configuration dir to store serialized file args, and // configuration filename to store other args // TODO this will become the part of openvpn service consumer separate package -func NewClientConfigFromSession(vpnConfig *VPNConfig, configDir string, runtimeDir string, dnsOption connection.DNSOption) (*ClientConfig, error) { +func NewClientConfigFromSession(vpnConfig VPNConfig, configDir string, runtimeDir string, dnsOption connection.DNSOption) (*ClientConfig, error) { // TODO Rename `vpnConfig` to `sessionConfig` err := NewDefaultValidator().IsValid(vpnConfig) if err != nil { @@ -92,7 +92,7 @@ func NewClientConfigFromSession(vpnConfig *VPNConfig, configDir string, runtimeD for _, ip := range dnsIPs { clientFileConfig.SetParam("dhcp-option", "DNS", ip) } - clientFileConfig.VpnConfig = vpnConfig + clientFileConfig.VpnConfig = &vpnConfig clientFileConfig.SetReconnectRetry(2) clientFileConfig.SetClientMode(vpnConfig.RemoteIP, vpnConfig.RemotePort, vpnConfig.LocalPort) clientFileConfig.SetProtocol(vpnConfig.RemoteProtocol) diff --git a/services/openvpn/config_validator.go b/services/openvpn/config_validator.go index 67513533df..3cf774fc9b 100644 --- a/services/openvpn/config_validator.go +++ b/services/openvpn/config_validator.go @@ -30,7 +30,7 @@ import ( ) // ValidateConfig is function which takes VPNConfig as argument, checks it and returns error if validation fails -type ValidateConfig func(config *VPNConfig) error +type ValidateConfig func(config VPNConfig) error // ConfigValidator represents structure which contains list of validating functions type ConfigValidator struct { @@ -51,7 +51,7 @@ func NewDefaultValidator() *ConfigValidator { } // IsValid function checks if provided config is valid against given config validator and returns first encountered error -func (v *ConfigValidator) IsValid(config *VPNConfig) error { +func (v *ConfigValidator) IsValid(config VPNConfig) error { for _, validator := range v.validators { if err := validator(config); err != nil { return err @@ -60,7 +60,7 @@ func (v *ConfigValidator) IsValid(config *VPNConfig) error { return nil } -func validProtocol(config *VPNConfig) error { +func validProtocol(config VPNConfig) error { switch config.RemoteProtocol { case "udp", @@ -70,14 +70,14 @@ func validProtocol(config *VPNConfig) error { return errors.New("invalid protocol: " + config.RemoteProtocol) } -func validPort(config *VPNConfig) error { +func validPort(config VPNConfig) error { if config.RemotePort > 65535 || config.RemotePort < 1024 { return errors.New("invalid port range, should fall within 1024 .. 65535 range") } return nil } -func validIPFormat(config *VPNConfig) error { +func validIPFormat(config VPNConfig) error { parsed := net.ParseIP(config.RemoteIP) if parsed == nil { return errors.New("unable to parse ip address " + config.RemoteIP) @@ -88,10 +88,15 @@ func validIPFormat(config *VPNConfig) error { return nil } +func validTLSPresharedKey(config VPNConfig) error { + _, err := formatTLSPresharedKey(config) + return err +} + // preshared key format (PEM blocks with data encoded to hex) are taken from -// openvpn --genkey --secret static.key, which is openvpn specific -// side effect: it reformats key from single line to multiline fixed length strings -func validTLSPresharedKey(config *VPNConfig) error { +// openvpn --genkey --secret static.key, which is openvpn specific. +// it reformats key from single line to multiline fixed length strings. +func formatTLSPresharedKey(config VPNConfig) (VPNConfig, error) { contentScanner := bufio.NewScanner(bytes.NewBufferString(config.TLSPresharedKey)) for contentScanner.Scan() { line := contentScanner.Text() @@ -101,11 +106,11 @@ func validTLSPresharedKey(config *VPNConfig) error { } } if err := contentScanner.Err(); err != nil { - return contentScanner.Err() + return VPNConfig{}, contentScanner.Err() } header := contentScanner.Text() if header != "-----BEGIN OpenVPN Static key V1-----" { - return errors.New("Invalid key header: " + header) + return VPNConfig{}, errors.New("Invalid key header: " + header) } var key string @@ -118,11 +123,11 @@ func validTLSPresharedKey(config *VPNConfig) error { } } if err := contentScanner.Err(); err != nil { - return err + return VPNConfig{}, err } // 256 bytes key is 512 bytes if encoded to hex if len(key) != 512 { - return errors.New("invalid key length") + return VPNConfig{}, errors.New("invalid key length") } var buff = &bytes.Buffer{} @@ -135,10 +140,10 @@ func validTLSPresharedKey(config *VPNConfig) error { fmt.Fprintln(buff, "-----END OpenVPN Static key V1-----") config.TLSPresharedKey = buff.String() - return nil + return config, nil } -func validCACertificate(config *VPNConfig) error { +func validCACertificate(config VPNConfig) error { pemBlock, _ := pem.Decode([]byte(config.CACertificate)) if pemBlock.Type != "CERTIFICATE" { return errors.New("invalid CA certificate. Certificate block expected") diff --git a/services/openvpn/config_validator_test.go b/services/openvpn/config_validator_test.go index b08a0834eb..c0dad5c569 100644 --- a/services/openvpn/config_validator_test.go +++ b/services/openvpn/config_validator_test.go @@ -61,42 +61,42 @@ YFcPCscvdnZ1U8hTUaREZmDB2w9eaGyCM4YXAg== ` func TestValidatorReturnsNilErrorOnValidVPNConfig(t *testing.T) { - vpnConfig := &VPNConfig{ - OriginalRemoteIP: "", - OriginalRemotePort: 0, - DNSIPs: "", - RemoteIP: "1.2.3.4", - RemotePort: 10999, - LocalPort: 1194, - RemoteProtocol: "tcp", - TLSPresharedKey: tlsTestKey, - CACertificate: caCertificate, + vpnConfig := VPNConfig{ + DNSIPs: "", + RemoteIP: "1.2.3.4", + RemotePort: 10999, + LocalPort: 1194, + RemoteProtocol: "tcp", + TLSPresharedKey: tlsTestKey, + CACertificate: caCertificate, } assert.NoError(t, NewDefaultValidator().IsValid(vpnConfig)) } func TestIPv6AreNotAllowed(t *testing.T) { vpnConfig := VPNConfig{RemoteIP: "2001:db8:85a3::8a2e:370:7334"} - assert.Error(t, validIPFormat(&vpnConfig)) + assert.Error(t, validIPFormat(vpnConfig)) } func TestUnknownProtocolIsNotAllowed(t *testing.T) { vpnConfig := VPNConfig{RemoteProtocol: "fake_protocol"} - assert.Error(t, validProtocol(&vpnConfig)) + assert.Error(t, validProtocol(vpnConfig)) } func TestPortOutOfRangeIsNotAllowed(t *testing.T) { vpnConfig := VPNConfig{RemotePort: -1} - assert.Error(t, validPort(&vpnConfig)) + assert.Error(t, validPort(vpnConfig)) } func TestTLSPresharedKeyIsValid(t *testing.T) { vpnConfig := VPNConfig{TLSPresharedKey: tlsTestKey} - assert.NoError(t, validTLSPresharedKey(&vpnConfig)) - assert.Equal(t, tlsTestKeyPreformatted, vpnConfig.TLSPresharedKey) + assert.NoError(t, validTLSPresharedKey(vpnConfig)) + newVPNConfig, err := formatTLSPresharedKey(vpnConfig) + assert.NoError(t, err) + assert.Equal(t, tlsTestKeyPreformatted, newVPNConfig.TLSPresharedKey) } func TestCACertificateIsValid(t *testing.T) { vpnConfig := VPNConfig{CACertificate: caCertificate} - assert.NoError(t, validCACertificate(&vpnConfig)) + assert.NoError(t, validCACertificate(vpnConfig)) } From 9a269c9478f2a074870a3df816367073413ee4be Mon Sep 17 00:00:00 2001 From: Dmitry Shihovtsev Date: Thu, 20 Feb 2020 10:58:58 +0600 Subject: [PATCH 12/13] Add flag for NAT hole punching max TTL value --- config/flags_network.go | 8 ++++++++ core/port/pool.go | 15 +++++++++++++++ core/port/pool_fixed.go | 14 ++++++++++++++ core/port/pool_test.go | 4 ++-- services/openvpn/client.go | 17 ++++++++--------- services/wireguard/connection/connection.go | 15 +++++++-------- services/wireguard/service/service_unix.go | 8 +++++--- 7 files changed, 59 insertions(+), 22 deletions(-) diff --git a/config/flags_network.go b/config/flags_network.go index fd48ef14d1..5807da9f41 100644 --- a/config/flags_network.go +++ b/config/flags_network.go @@ -57,6 +57,12 @@ var ( Usage: "Enables NAT hole punching", Value: true, } + // FlagNATPunchingMaxTTL sets max number of devices to try pass for NAT hole punching. + FlagNATPunchingMaxTTL = cli.IntFlag{ + Name: "natpunching.max-ttl", + Usage: "Max number of devices to try pass for NAT hole punching", + Value: 10, + } ) // RegisterFlagsNetwork function register network flags to flag list @@ -66,6 +72,7 @@ func RegisterFlagsNetwork(flags *[]cli.Flag) { &FlagTestnet, &FlagLocalnet, &FlagNATPunching, + &FlagNATPunchingMaxTTL, &FlagAPIAddress, &FlagBrokerAddress, &FlagEtherRPC, @@ -80,4 +87,5 @@ func ParseFlagsNetwork(ctx *cli.Context) { Current.ParseStringFlag(ctx, FlagBrokerAddress) Current.ParseStringFlag(ctx, FlagEtherRPC) Current.ParseBoolFlag(ctx, FlagNATPunching) + Current.ParseIntFlag(ctx, FlagNATPunchingMaxTTL) } diff --git a/core/port/pool.go b/core/port/pool.go index 3e74be7546..11f1c147c2 100644 --- a/core/port/pool.go +++ b/core/port/pool.go @@ -34,6 +34,7 @@ type Pool struct { // ServicePortSupplier provides port needed to run a service on type ServicePortSupplier interface { Acquire() (Port, error) + AcquireMultiple(n int) (ports []Port, err error) } // NewPool creates a port pool that will provide ports from range 40000-50000 @@ -82,3 +83,17 @@ func (pool *Pool) seekAvailablePort() (int, error) { } return 0, errors.New("port pool is exhausted") } + +// AcquireMultiple returns n unused ports from pool's range. +func (pool *Pool) AcquireMultiple(n int) (ports []Port, err error) { + for i := 0; i < n; i++ { + p, err := pool.Acquire() + if err != nil { + return ports, err + } + + ports = append(ports, p) + } + + return ports, nil +} diff --git a/core/port/pool_fixed.go b/core/port/pool_fixed.go index 6af3d1cbdc..89068291eb 100644 --- a/core/port/pool_fixed.go +++ b/core/port/pool_fixed.go @@ -41,3 +41,17 @@ func (pool *PoolFixed) Acquire() (port Port, err error) { pool.port = port return } + +// AcquireMultiple returns n unused ports from pool's range. +func (pool *PoolFixed) AcquireMultiple(n int) (ports []Port, err error) { + for i := 0; i < n; i++ { + p, err := pool.Acquire() + if err != nil { + return ports, err + } + + ports = append(ports, p) + } + + return ports, nil +} diff --git a/core/port/pool_test.go b/core/port/pool_test.go index 5b8955c536..da0a34b3e6 100644 --- a/core/port/pool_test.go +++ b/core/port/pool_test.go @@ -29,12 +29,12 @@ func TestAcquiredPortsAreUsable(t *testing.T) { pool := NewPool() port, _ := pool.Acquire() - err := listenUdp(port.Num()) + err := listenUDP(port.Num()) assert.NoError(t, err) } -func listenUdp(port int) error { +func listenUDP(port int) error { udpAddr, err := net.ResolveUDPAddr("udp", ":"+strconv.Itoa(port)) if err != nil { return err diff --git a/services/openvpn/client.go b/services/openvpn/client.go index 2d42ed34b3..e68b2b8a0b 100644 --- a/services/openvpn/client.go +++ b/services/openvpn/client.go @@ -28,6 +28,7 @@ import ( "github.com/mysteriumnetwork/go-openvpn/openvpn/middlewares/client/auth" openvpn_bytescount "github.com/mysteriumnetwork/go-openvpn/openvpn/middlewares/client/bytescount" "github.com/mysteriumnetwork/go-openvpn/openvpn/middlewares/state" + "github.com/mysteriumnetwork/node/config" "github.com/mysteriumnetwork/node/core/connection" "github.com/mysteriumnetwork/node/core/ip" "github.com/mysteriumnetwork/node/core/port" @@ -121,7 +122,7 @@ func (c *Client) Start(options connection.ConnectOptions) error { log.Info().Msg("Starting connection") sessionConfig := VPNConfig{} - err := json.Unmarshal(options.SessionConfig, sessionConfig) + err := json.Unmarshal(options.SessionConfig, &sessionConfig) if err != nil { return err } @@ -215,15 +216,13 @@ func (c *Client) GetConfig() (connection.ConsumerConfig, error) { return nil, errors.Wrap(err, "failed to get consumer public IP") } - pool := port.NewPool() - - for i := 0; i < 10; i++ { - cp, err := pool.Acquire() - if err != nil { - return nil, err - } + ports, err := port.NewPool().AcquireMultiple(config.GetInt(config.FlagNATPunchingMaxTTL)) + if err != nil { + return nil, err + } - c.ports = append(c.ports, cp.Num()) + for _, p := range ports { + c.ports = append(c.ports, p.Num()) } return &ConsumerConfig{ diff --git a/services/wireguard/connection/connection.go b/services/wireguard/connection/connection.go index cd42cea339..dec65ce50d 100644 --- a/services/wireguard/connection/connection.go +++ b/services/wireguard/connection/connection.go @@ -23,6 +23,7 @@ import ( "sync" "time" + "github.com/mysteriumnetwork/node/config" "github.com/mysteriumnetwork/node/core/connection" "github.com/mysteriumnetwork/node/core/ip" "github.com/mysteriumnetwork/node/core/port" @@ -229,15 +230,13 @@ func (c *Connection) GetConfig() (connection.ConsumerConfig, error) { } } - pool := port.NewPool() - - for i := 0; i < 10; i++ { - cp, err := pool.Acquire() - if err != nil { - return nil, err - } + ports, err := port.NewPool().AcquireMultiple(config.GetInt(config.FlagNATPunchingMaxTTL)) + if err != nil { + return nil, err + } - c.ports = append(c.ports, cp.Num()) + for _, p := range ports { + c.ports = append(c.ports, p.Num()) } return wg.ConsumerConfig{ diff --git a/services/wireguard/service/service_unix.go b/services/wireguard/service/service_unix.go index 4528ee7f25..9975711caa 100644 --- a/services/wireguard/service/service_unix.go +++ b/services/wireguard/service/service_unix.go @@ -306,9 +306,11 @@ func (m *Manager) addTraversalParams(config wg.ServiceConfig, traversalParams tr config.Consumer.ConnectDelay = 0 // TODO this backward compatibility block needs to be removed once we will start using port ranges for all peers. - config.LocalPort = traversalParams.RemotePorts[len(traversalParams.RemotePorts)-1] - config.RemotePort = traversalParams.LocalPorts[len(traversalParams.LocalPorts)-1] - config.Provider.Endpoint.Port = config.RemotePort + if len(traversalParams.RemotePorts) > 0 && len(traversalParams.LocalPorts) > 0 { + config.LocalPort = traversalParams.RemotePorts[len(traversalParams.RemotePorts)-1] + config.RemotePort = traversalParams.LocalPorts[len(traversalParams.LocalPorts)-1] + config.Provider.Endpoint.Port = config.RemotePort + } return config, nil } From 65bb88123bdc84d7720eccf11c5736f39e9d02d1 Mon Sep 17 00:00:00 2001 From: Dmitry Shihovtsev Date: Fri, 21 Feb 2020 09:58:00 +0600 Subject: [PATCH 13/13] Handle NAT pinger connection internally --- mobile/mysterium/openvpn_connection_setup.go | 37 +++++++++-------- .../mysterium/wireguard_connection_setup.go | 35 ++++++++-------- nat/traversal/noop.go | 6 +-- nat/traversal/pinger.go | 40 ++++++++++++------- nat/traversal/pinger_test.go | 12 +----- services/openvpn/client.go | 24 ++++------- services/openvpn/client_test.go | 6 +-- services/openvpn/service/manager.go | 2 +- services/wireguard/connection/connection.go | 21 +++------- 9 files changed, 85 insertions(+), 98 deletions(-) diff --git a/mobile/mysterium/openvpn_connection_setup.go b/mobile/mysterium/openvpn_connection_setup.go index 5452854cce..11fe847c64 100644 --- a/mobile/mysterium/openvpn_connection_setup.go +++ b/mobile/mysterium/openvpn_connection_setup.go @@ -19,13 +19,14 @@ package mysterium import ( "encoding/json" - "net" "sync" "time" "github.com/mysteriumnetwork/go-openvpn/openvpn3" + "github.com/mysteriumnetwork/node/config" "github.com/mysteriumnetwork/node/core/connection" "github.com/mysteriumnetwork/node/core/ip" + "github.com/mysteriumnetwork/node/core/port" "github.com/mysteriumnetwork/node/identity" "github.com/mysteriumnetwork/node/nat/traversal" "github.com/mysteriumnetwork/node/services/openvpn" @@ -49,7 +50,6 @@ func NewOpenVPNConnection(sessionTracker *sessionTracker, signerFactory identity stateCh: make(chan connection.State, 100), natPinger: natPinger, ipResolver: ipResolver, - pingerStop: make(chan struct{}), } sessionFactory := func(options connection.ConnectOptions, sessionConfig openvpn.VPNConfig) (*openvpn3.Session, *openvpn.ClientConfig, error) { @@ -158,24 +158,17 @@ func (c *openvpnConnection) Start(options connection.ConnectOptions) error { sessionConfig.Ports = []int{sessionConfig.RemotePort} } - params := traversal.Params{ - IP: sessionConfig.RemoteIP, - LocalPorts: c.ports, - RemotePorts: sessionConfig.Ports, - } + ip := sessionConfig.RemoteIP + localPorts := c.ports + remotePorts := sessionConfig.Ports - conn, err := c.natPinger.PingProvider(params, sessionConfig.LocalPort) + lPort, rPort, err := c.natPinger.PingProvider(ip, localPorts, remotePorts, sessionConfig.LocalPort) if err != nil { - return err - } - - if addr, ok := conn.LocalAddr().(*net.UDPAddr); ok { - sessionConfig.LocalPort = addr.Port + return errors.Wrap(err, "could not ping provider") } - if addr, ok := conn.RemoteAddr().(*net.UDPAddr); ok { - sessionConfig.RemotePort = addr.Port - } + sessionConfig.LocalPort = lPort + sessionConfig.RemotePort = rPort } newSession, clientConfig, err := c.createSession(options, sessionConfig) @@ -217,8 +210,18 @@ func (c *openvpnConnection) GetConfig() (connection.ConsumerConfig, error) { return nil, errors.Wrap(err, "failed to get consumer public IP") } + ports, err := port.NewPool().AcquireMultiple(config.GetInt(config.FlagNATPunchingMaxTTL)) + if err != nil { + return nil, err + } + + for _, p := range ports { + c.ports = append(c.ports, p.Num()) + } + return &openvpn.ConsumerConfig{ - IP: publicIP, + IP: publicIP, + Ports: c.ports, }, nil } diff --git a/mobile/mysterium/wireguard_connection_setup.go b/mobile/mysterium/wireguard_connection_setup.go index bdc1ce6e4e..1086ecc747 100644 --- a/mobile/mysterium/wireguard_connection_setup.go +++ b/mobile/mysterium/wireguard_connection_setup.go @@ -20,13 +20,14 @@ package mysterium import ( "bufio" "encoding/json" - "net" "strings" "sync" "time" + "github.com/mysteriumnetwork/node/config" "github.com/mysteriumnetwork/node/core/connection" "github.com/mysteriumnetwork/node/core/ip" + "github.com/mysteriumnetwork/node/core/port" "github.com/mysteriumnetwork/node/nat/traversal" "github.com/mysteriumnetwork/node/services/wireguard" wireguard_connection "github.com/mysteriumnetwork/node/services/wireguard/connection" @@ -80,6 +81,7 @@ func NewWireGuardConnection(opts wireGuardOptions, device wireguardDevice, ipRes } type wireguardConnection struct { + ports []int closeOnce sync.Once done chan struct{} stateCh chan connection.State @@ -126,26 +128,17 @@ func (c *wireguardConnection) Start(options connection.ConnectOptions) (err erro // TODO this backward compatibility check needs to be removed once we will start using port ranges for all peers. if config.LocalPort > 0 || len(config.Ports) > 0 { - params := traversal.Params{ - IP: config.Provider.Endpoint.IP.String(), - LocalPorts: c.ports, - RemotePorts: config.Ports, - } + ip := config.Provider.Endpoint.IP.String() + localPorts := c.ports + remotePorts := config.Ports - conn, err := c.natPinger.PingProvider(params, 0) + lPort, rPort, err := c.natPinger.PingProvider(ip, localPorts, remotePorts, 0) if err != nil { return errors.Wrap(err, "could not ping provider") } - if addr, ok := conn.LocalAddr().(*net.UDPAddr); ok { - config.LocalPort = addr.Port - } - - if addr, ok := conn.RemoteAddr().(*net.UDPAddr); ok { - config.Provider.Endpoint.Port = addr.Port - } - - conn.Close() + config.LocalPort = lPort + config.Provider.Endpoint.Port = rPort } if err := c.device.Start(c.privateKey, config); err != nil { @@ -193,11 +186,21 @@ func (c *wireguardConnection) GetConfig() (connection.ConsumerConfig, error) { if err != nil { return nil, errors.Wrap(err, "failed to get consumer public IP") } + + ports, err := port.NewPool().AcquireMultiple(config.GetInt(config.FlagNATPunchingMaxTTL)) + if err != nil { + return nil, err + } + + for _, p := range ports { + c.ports = append(c.ports, p.Num()) + } } return wireguard.ConsumerConfig{ PublicKey: publicKey, IP: publicIP, + Ports: c.ports, }, nil } diff --git a/nat/traversal/noop.go b/nat/traversal/noop.go index df6e235319..03ef0c83a8 100644 --- a/nat/traversal/noop.go +++ b/nat/traversal/noop.go @@ -17,8 +17,6 @@ package traversal -import "net" - // NoopPinger does nothing type NoopPinger struct{} @@ -45,8 +43,8 @@ func (np *NoopPinger) Start() {} func (np *NoopPinger) Stop() {} // PingProvider does nothing -func (np *NoopPinger) PingProvider(_ Params, proxyPort int) (*net.UDPConn, error) { - return &net.UDPConn{}, nil +func (np *NoopPinger) PingProvider(_ string, _, _ []int, proxyPort int) (int, int, error) { + return 0, 0, nil } // PingTarget does nothing diff --git a/nat/traversal/pinger.go b/nat/traversal/pinger.go index eafcd0c6e1..8ad8940a14 100644 --- a/nat/traversal/pinger.go +++ b/nat/traversal/pinger.go @@ -41,7 +41,7 @@ var ( // NATProviderPinger pings provider and optionally hands off connection to consumer proxy. type NATProviderPinger interface { - PingProvider(params Params, proxyPort int) (*net.UDPConn, error) + PingProvider(ip string, localPorts, remotePorts []int, proxyPort int) (localPort, remotePort int, err error) } // NATPinger is responsible for pinging nat holes @@ -129,24 +129,36 @@ func (p *Pinger) Stop() { } // PingProvider pings provider determined by destination provided in sessionConfig -func (p *Pinger) PingProvider(params Params, proxyPort int) (*net.UDPConn, error) { +func (p *Pinger) PingProvider(ip string, localPorts, remotePorts []int, proxyPort int) (localPort, remotePort int, err error) { log.Info().Msg("NAT pinging to provider") stop := make(chan struct{}) defer close(stop) - conn, err := p.multiPing(params, 128, stop) + conn, err := p.multiPing(ip, localPorts, remotePorts, 128, stop) if err != nil { log.Err(err).Msg("Failed to ping remote peer") - return nil, err + return 0, 0, err + } + + if addr, ok := conn.LocalAddr().(*net.UDPAddr); ok { + localPort = addr.Port } - consumerAddr := fmt.Sprintf("127.0.0.1:%d", proxyPort) - log.Info().Msg("Handing connection to consumer NATProxy: " + consumerAddr) + if addr, ok := conn.RemoteAddr().(*net.UDPAddr); ok { + remotePort = addr.Port + } + + if proxyPort > 0 { + consumerAddr := fmt.Sprintf("127.0.0.1:%d", proxyPort) + log.Info().Msg("Handing connection to consumer NATProxy: " + consumerAddr) - p.stopNATProxy = p.natProxy.consumerHandOff(consumerAddr, conn) + p.stopNATProxy = p.natProxy.consumerHandOff(consumerAddr, conn) + } else { + conn.Close() + } - return conn, err + return localPort, remotePort, err } func (p *Pinger) ping(conn *net.UDPConn, ttl int, stop <-chan struct{}) error { @@ -273,7 +285,7 @@ func (p *Pinger) pingTargetConsumer(params Params) { stop := make(chan struct{}) defer close(stop) - conn, err := p.multiPing(params, 2, stop) + conn, err := p.multiPing(params.IP, params.LocalPorts, params.RemotePorts, 2, stop) if err != nil { log.Err(err).Msg("Failed to ping remote peer") return @@ -293,8 +305,8 @@ func (p *Pinger) pingTargetConsumer(params Params) { go p.natProxy.handOff(params.ProxyPortMappingKey, conn) } -func (p *Pinger) multiPing(params Params, initialTTL int, stop <-chan struct{}) (*net.UDPConn, error) { - if len(params.LocalPorts) != len(params.RemotePorts) { +func (p *Pinger) multiPing(ip string, localPorts, remotePorts []int, initialTTL int, stop <-chan struct{}) (*net.UDPConn, error) { + if len(localPorts) != len(remotePorts) { return nil, errors.New("number of local and remote ports does not match") } @@ -303,11 +315,11 @@ func (p *Pinger) multiPing(params Params, initialTTL int, stop <-chan struct{}) err error } - ch := make(chan res, len(params.LocalPorts)) + ch := make(chan res, len(localPorts)) - for i := range params.LocalPorts { + for i := range localPorts { go func(i int) { - conn, err := p.singlePing(params.IP, params.LocalPorts[i], params.RemotePorts[i], initialTTL+i, stop) + conn, err := p.singlePing(ip, localPorts[i], remotePorts[i], initialTTL+i, stop) ch <- res{conn, err} }(i) } diff --git a/nat/traversal/pinger_test.go b/nat/traversal/pinger_test.go index 36398d46b1..ada62962e9 100644 --- a/nat/traversal/pinger_test.go +++ b/nat/traversal/pinger_test.go @@ -86,11 +86,7 @@ func TestPinger_Provider_Consumer_Ping_Flow(t *testing.T) { // Start pinging provider. stop := make(chan struct{}) defer close(stop) - _, err := pinger.PingProvider(Params{ - IP: "127.0.0.1", - LocalPorts: []int{consumerPort}, - RemotePorts: []int{providerPort}, - }, consumerPort+1) + _, _, err := pinger.PingProvider("127.0.0.1", []int{consumerPort}, []int{providerPort}, consumerPort+1) assert.NoError(t, err) assert.Contains(t, string(proxyBuf), fmt.Sprintf("continuously pinging to 127.0.0.1:%d", providerPort)) @@ -116,11 +112,7 @@ func TestPinger_PingProvider_Timeout(t *testing.T) { stop := make(chan struct{}) defer close(stop) - _, err := pinger.PingProvider(Params{ - IP: "127.0.0.1", - RemotePorts: []int{providerPort}, - LocalPorts: []int{consumerPort}, - }, 0) + _, _, err := pinger.PingProvider("127.0.0.1", []int{consumerPort}, []int{providerPort}, 0) assert.Error(t, errNATPunchAttemptTimedOut, err) } diff --git a/services/openvpn/client.go b/services/openvpn/client.go index e68b2b8a0b..ed236cead2 100644 --- a/services/openvpn/client.go +++ b/services/openvpn/client.go @@ -19,7 +19,6 @@ package openvpn import ( "encoding/json" - "net" "sync" "time" @@ -62,7 +61,6 @@ func NewClient(openvpnBinary, configDirectory, runtimeDirectory string, stateCh: stateCh, ipResolver: ipResolver, natPinger: natPinger, - pingerStop: make(chan struct{}), removeAllowedIPRule: func() {}, } @@ -134,25 +132,17 @@ func (c *Client) Start(options connection.ConnectOptions) error { sessionConfig.Ports = []int{sessionConfig.RemotePort} } - params := traversal.Params{ - IP: sessionConfig.RemoteIP, - LocalPorts: c.ports, - RemotePorts: sessionConfig.Ports, - } + ip := sessionConfig.RemoteIP + localPorts := c.ports + remotePorts := sessionConfig.Ports - conn, err := c.natPinger.PingProvider(params, sessionConfig.LocalPort) + lPort, rPort, err := c.natPinger.PingProvider(ip, localPorts, remotePorts, sessionConfig.LocalPort) if err != nil { - return err - } - - if addr, ok := conn.LocalAddr().(*net.UDPAddr); ok { - sessionConfig.LocalPort = addr.Port - } - - if addr, ok := conn.RemoteAddr().(*net.UDPAddr); ok { - sessionConfig.RemotePort = addr.Port + return errors.Wrap(err, "could not ping provider") } + sessionConfig.LocalPort = lPort + sessionConfig.RemotePort = rPort } proc, clientConfig, err := c.processFactory(options, sessionConfig) diff --git a/services/openvpn/client_test.go b/services/openvpn/client_test.go index 7ddf38203f..e48caa80a5 100644 --- a/services/openvpn/client_test.go +++ b/services/openvpn/client_test.go @@ -18,13 +18,11 @@ package openvpn import ( - "net" "testing" "github.com/mysteriumnetwork/node/core/connection" "github.com/mysteriumnetwork/node/core/ip" "github.com/mysteriumnetwork/node/identity" - "github.com/mysteriumnetwork/node/nat/traversal" "github.com/stretchr/testify/assert" ) @@ -50,6 +48,6 @@ func TestConnection_CreatesConnection(t *testing.T) { type MockNATPinger struct{} // PingProvider does nothing -func (mnp *MockNATPinger) PingProvider(_ traversal.Params, _ int) (*net.UDPConn, error) { - return nil, nil +func (mnp *MockNATPinger) PingProvider(_ string, _, _ []int, _ int) (int, int, error) { + return 0, 0, nil } diff --git a/services/openvpn/service/manager.go b/services/openvpn/service/manager.go index 2594144808..d701368ca9 100644 --- a/services/openvpn/service/manager.go +++ b/services/openvpn/service/manager.go @@ -282,7 +282,7 @@ func (m *Manager) ProvideConfig(_ string, sessionConfig json.RawMessage) (*sessi vpnConfig.LocalPort = cp.Num() } - if m.location.BehindNAT() && m.portMappingFailed() { + if m.behindNAT(publicIP) && m.portMappingFailed() { for range consumerConfig.Ports { pp, err := m.natPingerPorts.Acquire() if err != nil { diff --git a/services/wireguard/connection/connection.go b/services/wireguard/connection/connection.go index dec65ce50d..2883adb6cd 100644 --- a/services/wireguard/connection/connection.go +++ b/services/wireguard/connection/connection.go @@ -122,26 +122,17 @@ func (c *Connection) Start(options connection.ConnectOptions) (err error) { // TODO this backward compatibility check needs to be removed once we will start using port ranges for all peers. if config.LocalPort > 0 || len(config.Ports) > 0 { - params := traversal.Params{ - IP: config.Provider.Endpoint.IP.String(), - LocalPorts: c.ports, - RemotePorts: config.Ports, - } + ip := config.Provider.Endpoint.IP.String() + localPorts := c.ports + remotePorts := config.Ports - conn, err := c.natPinger.PingProvider(params, 0) + lPort, rPort, err := c.natPinger.PingProvider(ip, localPorts, remotePorts, 0) if err != nil { return errors.Wrap(err, "could not ping provider") } - if addr, ok := conn.LocalAddr().(*net.UDPAddr); ok { - config.LocalPort = addr.Port - } - - if addr, ok := conn.RemoteAddr().(*net.UDPAddr); ok { - config.Provider.Endpoint.Port = addr.Port - } - - conn.Close() + config.LocalPort = lPort + config.Provider.Endpoint.Port = rPort } log.Info().Msg("Starting new connection")