Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[relay client] choose nearest relay based on latency #2952

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions relay/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,12 @@ func (cc *connContainer) close() {
// the client can be reused by calling Connect again. When the client is closed, all connections are closed too.
// While the Connect is in progress, the OpenConn function will block until the connection is established with relay server.
type Client struct {
log *log.Entry
parentCtx context.Context
connectionURL string
authTokenStore *auth.TokenStore
hashedID []byte
log *log.Entry
parentCtx context.Context
connectionURL string
authTokenStore *auth.TokenStore
hashedID []byte
InitialConnectionTime time.Duration

bufPool *sync.Pool

Expand Down Expand Up @@ -264,11 +265,12 @@ func (c *Client) Close() error {
}

func (c *Client) connect() error {
conn, err := ws.Dial(c.connectionURL)
conn, latency, err := ws.Dial(c.connectionURL)
if err != nil {
return err
}
c.relayConn = conn
c.InitialConnectionTime = latency

err = c.handShake()
if err != nil {
Expand Down
19 changes: 13 additions & 6 deletions relay/client/dialer/ws/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"fmt"
"net"
"net/http"
"net/http/httptrace"
"net/url"
"strings"
"time"

log "github.com/sirupsen/logrus"
"nhooyr.io/websocket"
Expand All @@ -15,10 +17,10 @@ import (
nbnet "github.com/netbirdio/netbird/util/net"
)

func Dial(address string) (net.Conn, error) {
func Dial(address string) (net.Conn, time.Duration, error) {
wsURL, err := prepareURL(address)
if err != nil {
return nil, err
return nil, 0, err
}

opts := &websocket.DialOptions{
Expand All @@ -27,21 +29,26 @@ func Dial(address string) (net.Conn, error) {

parsedURL, err := url.Parse(wsURL)
if err != nil {
return nil, err
return nil, 0, err
}
parsedURL.Path = ws.URLPath

wsConn, resp, err := websocket.Dial(context.Background(), parsedURL.String(), opts)
var connStart, firstByte time.Time
ctx := httptrace.WithClientTrace(context.Background(), &httptrace.ClientTrace{
ConnectStart: func(network, addr string) { connStart = time.Now() },
GotFirstResponseByte: func() { firstByte = time.Now() },
})
wsConn, resp, err := websocket.Dial(ctx, parsedURL.String(), opts)
if err != nil {
log.Errorf("failed to dial to Relay server '%s': %s", wsURL, err)
return nil, err
return nil, 0, err
}
if resp.Body != nil {
_ = resp.Body.Close()
}

conn := NewConn(wsConn, address)
return conn, nil
return conn, firstByte.Sub(connStart), nil
}

func prepareURL(address string) (string, error) {
Expand Down
88 changes: 68 additions & 20 deletions relay/client/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ const (
)

var (
connectionTimeout = 30 * time.Second
connectionTimeout = 30 * time.Second
connectionSortingtimeout = 500 * time.Millisecond
)

// connResult carries the outcome of a single connection attempt to one relay
// server. It is what startConnection sends on the result channel and what
// processConnResults consumes.
type connResult struct {
// RelayClient is the client that performed the connection attempt.
RelayClient *Client
// Url is the relay server address the attempt was made against.
Url string
// Err is non-nil when the connection attempt failed; such results are
// logged and skipped by processConnResults.
Err error
// Latency is copied from RelayClient.InitialConnectionTime, i.e. the
// duration reported by the websocket dial for this connection.
Latency time.Duration
}

type ServerPicker struct {
Expand All @@ -43,25 +45,31 @@ func (sp *ServerPicker) PickServer(parentCtx context.Context) (*Client, error) {
concurrentLimiter := make(chan struct{}, maxConcurrentServers)

log.Debugf("pick server from list: %v", sp.ServerURLs.Load().([]string))
go sp.processConnResults(connResultChan, successChan)
for _, url := range sp.ServerURLs.Load().([]string) {
// todo check if we have a successful connection so we do not need to connect to other servers
concurrentLimiter <- struct{}{}
go func(url string) {
defer func() {
<-concurrentLimiter
}()
sp.startConnection(parentCtx, connResultChan, url)
}(url)
select {
case concurrentLimiter <- struct{}{}:
go func(url string) {
defer func() {
<-concurrentLimiter
}()
sp.startConnection(parentCtx, connResultChan, url)
}(url)
case cr, ok := <-successChan:
if !ok {
return nil, errors.New("failed to connect to any relay server: all attempts failed")
}
log.Infof("chosen home Relay server: %s with latency %s", cr.Url, cr.Latency)
return cr.RelayClient, nil
}
}

go sp.processConnResults(connResultChan, successChan)

select {
case cr, ok := <-successChan:
if !ok {
return nil, errors.New("failed to connect to any relay server: all attempts failed")
}
log.Infof("chosen home Relay server: %s", cr.Url)
log.Infof("chosen home Relay server: %s with latency %s", cr.Url, cr.Latency)
return cr.RelayClient, nil
case <-ctx.Done():
return nil, fmt.Errorf("failed to connect to any relay server: %w", ctx.Err())
Expand All @@ -76,29 +84,69 @@ func (sp *ServerPicker) startConnection(ctx context.Context, resultChan chan con
RelayClient: relayClient,
Url: url,
Err: err,
Latency: relayClient.InitialConnectionTime,
}
}

// processConnResults collects the outcome of every connection attempt from
// resultChan and hands exactly one result to successChan.
//
// It expects cap(resultChan) results in total. Failed attempts are logged and
// skipped. The first successful attempt starts a timer of
// connectionSortingtimeout; until that timer fires, every further successful
// attempt is compared with the current candidate through lowestLatency, which
// keeps one of the two and closes the other. When the timer fires, or when all
// results have arrived, the current candidate is sent on successChan and the
// channel is closed. If no attempt succeeded, successChan is closed without a
// value. Successful connections that arrive after the candidate has been
// delivered are closed, because the delivered client is already in use.
func (sp *ServerPicker) processConnResults(resultChan chan connResult, successChan chan connResult) {
	var (
		best      connResult
		hasBest   bool
		delivered bool
		sortTimer *time.Timer
		// sortDone stays nil until the first success; receiving from a nil
		// channel blocks forever, so the select below ignores it until then.
		sortDone <-chan time.Time
	)
	defer func() {
		if sortTimer != nil {
			sortTimer.Stop()
		}
	}()

	// deliver publishes the current candidate (if any) and closes successChan.
	// It disarms sortDone so the early-termination case can never run twice.
	deliver := func() {
		if hasBest {
			successChan <- best
		}
		close(successChan)
		delivered = true
		sortDone = nil
	}

	for numOfResults := 0; numOfResults < cap(resultChan); {
		var cr connResult
		select {
		case <-sortDone:
			log.Tracef("terminating Relay server sorting early")
			deliver()
			// No result was consumed, so do not advance the counter; keep
			// draining resultChan to close the connections that arrive late.
			continue
		case cr = <-resultChan:
			numOfResults++
		}

		if cr.Err != nil {
			log.Tracef("failed to connect to Relay server: %s: %v", cr.Url, cr.Err)
			continue
		}
		log.Infof("connected to Relay server: %s with latency %s", cr.Url, cr.Latency)

		if delivered {
			// The chosen client is already owned by the caller; never compare
			// against it (that could close it), just drop the newcomer.
			log.Infof("closing unnecessary Relay connection to: %s", cr.Url)
			if err := cr.RelayClient.Close(); err != nil {
				log.Errorf("failed to close connection to %s: %v", cr.Url, err)
			}
			continue
		}

		if hasBest {
			best = lowestLatency(cr, best)
			continue
		}

		// First successful connection: remember it and start the sorting window.
		best, hasBest = cr, true
		sortTimer = time.NewTimer(connectionSortingtimeout)
		sortDone = sortTimer.C
	}

	if !delivered {
		deliver()
	}
}

// lowestLatency returns whichever of a and b has the lower Latency and closes
// the relay connection held by the other one. When both latencies are equal,
// b is kept and a is closed. A failure to close the discarded connection is
// logged and otherwise ignored.
func lowestLatency(a, b connResult) connResult {
	keep, drop := b, a
	if a.Latency < b.Latency {
		keep, drop = a, b
	}

	// A connResult without a client has nothing to close.
	if drop.RelayClient != nil {
		if err := drop.RelayClient.Close(); err != nil {
			log.Errorf("failed to close connection to %s: %v", drop.Url, err)
		}
	}
	return keep
}
Loading