From ed1a18a0bfa2ecb88c92cbe91450653adba30704 Mon Sep 17 00:00:00 2001 From: M Essam Hamed Date: Mon, 25 Nov 2024 18:52:40 +0200 Subject: [PATCH 1/5] relay client: choose nearest relay based on latency Signed-off-by: M Essam Hamed --- relay/client/picker.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/relay/client/picker.go b/relay/client/picker.go index eb5062dbb75..b6de0962c0f 100644 --- a/relay/client/picker.go +++ b/relay/client/picker.go @@ -24,6 +24,7 @@ type connResult struct { RelayClient *Client Url string Err error + Latency time.Duration } type ServerPicker struct { @@ -61,7 +62,7 @@ func (sp *ServerPicker) PickServer(parentCtx context.Context) (*Client, error) { if !ok { return nil, errors.New("failed to connect to any relay server: all attempts failed") } - log.Infof("chosen home Relay server: %s", cr.Url) + log.Infof("chosen home Relay server: %s with latency %s", cr.Url, cr.Latency) return cr.RelayClient, nil case <-ctx.Done(): return nil, fmt.Errorf("failed to connect to any relay server: %w", ctx.Err()) @@ -71,34 +72,46 @@ func (sp *ServerPicker) PickServer(parentCtx context.Context) (*Client, error) { func (sp *ServerPicker) startConnection(ctx context.Context, resultChan chan connResult, url string) { log.Infof("try to connecting to relay server: %s", url) relayClient := NewClient(ctx, url, sp.TokenStore, sp.PeerID) + start := time.Now() err := relayClient.Connect() resultChan <- connResult{ RelayClient: relayClient, Url: url, Err: err, + Latency: time.Since(start), } } func (sp *ServerPicker) processConnResults(resultChan chan connResult, successChan chan connResult) { var hasSuccess bool + var bestLatencyResult connResult + bestLatencyResult.Latency = time.Hour for numOfResults := 0; numOfResults < cap(resultChan); numOfResults++ { cr := <-resultChan if cr.Err != nil { log.Tracef("failed to connect to Relay server: %s: %v", cr.Url, cr.Err) continue } - log.Infof("connected to Relay server: %s", cr.Url) + log.Infof("connected to Relay server: %s with latency %s", cr.Url, cr.Latency) - if hasSuccess { + // Already connected to a lower latency server + if hasSuccess && cr.Latency > bestLatencyResult.Latency { log.Infof("closing unnecessary Relay connection to: %s", cr.Url) if err := cr.RelayClient.Close(); err != nil { log.Errorf("failed to close connection to %s: %v", cr.Url, err) } continue + } else if hasSuccess { // Connected to a higher latency server in bestLatencyResult, disconnect from it + log.Infof("closing unnecessary Relay connection to: %s", bestLatencyResult.Url) + if err := bestLatencyResult.RelayClient.Close(); err != nil { + log.Errorf("failed to close connection to %s: %v", bestLatencyResult.Url, err) + } } hasSuccess = true - successChan <- cr + bestLatencyResult = cr } + + successChan <- bestLatencyResult close(successChan) } From db61cf20c5dd4d458f6a20e8678651bccf7acb22 Mon Sep 17 00:00:00 2001 From: M Essam Hamed Date: Tue, 26 Nov 2024 15:16:39 +0200 Subject: [PATCH 2/5] Do not send nil relayClient to successChan --- relay/client/picker.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/relay/client/picker.go b/relay/client/picker.go index b6de0962c0f..52ac1088eb2 100644 --- a/relay/client/picker.go +++ b/relay/client/picker.go @@ -112,6 +112,8 @@ func (sp *ServerPicker) processConnResults(resultChan chan connResult, successCh bestLatencyResult = cr } - successChan <- bestLatencyResult + if bestLatencyResult.RelayClient != nil { + successChan <- bestLatencyResult + } close(successChan) } From 18216ba6429467557d5011d00ff226caa8ff8022 Mon Sep 17 00:00:00 2001 From: M Essam Hamed Date: Tue, 26 Nov 2024 21:15:38 +0200 Subject: [PATCH 3/5] Add timeout after first relay success --- relay/client/client.go | 14 ++++++++------ relay/client/dialer/ws/ws.go | 19 +++++++++++++------ relay/client/picker.go | 29 +++++++++++++++++++++++++---- 3 files changed, 46 insertions(+), 16 deletions(-) diff --git a/relay/client/client.go b/relay/client/client.go index db5252f504d..337c5c96622 100644 --- a/relay/client/client.go +++ b/relay/client/client.go @@ -123,11 +123,12 @@ func (cc *connContainer) close() { // the client can be reused by calling Connect again. When the client is closed, all connections are closed too. // While the Connect is in progress, the OpenConn function will block until the connection is established with relay server. type Client struct { - log *log.Entry - parentCtx context.Context - connectionURL string - authTokenStore *auth.TokenStore - hashedID []byte + log *log.Entry + parentCtx context.Context + connectionURL string + authTokenStore *auth.TokenStore + hashedID []byte + InitialConnectionTime time.Duration bufPool *sync.Pool @@ -264,11 +265,12 @@ func (c *Client) Close() error { } func (c *Client) connect() error { - conn, err := ws.Dial(c.connectionURL) + conn, latency, err := ws.Dial(c.connectionURL) if err != nil { return err } c.relayConn = conn + c.InitialConnectionTime = latency err = c.handShake() if err != nil { diff --git a/relay/client/dialer/ws/ws.go b/relay/client/dialer/ws/ws.go index d9388aafdf7..3390f211c63 100644 --- a/relay/client/dialer/ws/ws.go +++ b/relay/client/dialer/ws/ws.go @@ -5,8 +5,10 @@ import ( "fmt" "net" "net/http" + "net/http/httptrace" "net/url" "strings" + "time" log "github.com/sirupsen/logrus" "nhooyr.io/websocket" @@ -15,10 +17,10 @@ import ( nbnet "github.com/netbirdio/netbird/util/net" ) -func Dial(address string) (net.Conn, error) { +func Dial(address string) (net.Conn, time.Duration, error) { wsURL, err := prepareURL(address) if err != nil { - return nil, err + return nil, 0, err } opts := &websocket.DialOptions{ @@ -27,21 +29,26 @@ func Dial(address string) (net.Conn, error) { parsedURL, err := url.Parse(wsURL) if err != nil { - return nil, err + return nil, 0, err } parsedURL.Path = ws.URLPath - wsConn, resp, err := websocket.Dial(context.Background(), parsedURL.String(), opts) + var connStart, firstByte time.Time + ctx := httptrace.WithClientTrace(context.Background(), &httptrace.ClientTrace{ + ConnectStart: func(network, addr string) { connStart = time.Now() }, + GotFirstResponseByte: func() { firstByte = time.Now() }, + }) + wsConn, resp, err := websocket.Dial(ctx, parsedURL.String(), opts) if err != nil { log.Errorf("failed to dial to Relay server '%s': %s", wsURL, err) - return nil, err + return nil, 0, err } if resp.Body != nil { _ = resp.Body.Close() } conn := NewConn(wsConn, address) - return conn, nil + return conn, firstByte.Sub(connStart), nil } func prepareURL(address string) (string, error) { diff --git a/relay/client/picker.go b/relay/client/picker.go index 52ac1088eb2..505de501dd5 100644 --- a/relay/client/picker.go +++ b/relay/client/picker.go @@ -17,7 +17,8 @@ const ( ) var ( - connectionTimeout = 30 * time.Second + connectionTimeout = 30 * time.Second + connectionSortingtimeout = 500 * time.Millisecond ) type connResult struct { @@ -72,13 +73,12 @@ func (sp *ServerPicker) PickServer(parentCtx context.Context) (*Client, error) { func (sp *ServerPicker) startConnection(ctx context.Context, resultChan chan connResult, url string) { log.Infof("try to connecting to relay server: %s", url) relayClient := NewClient(ctx, url, sp.TokenStore, sp.PeerID) - start := time.Now() err := relayClient.Connect() resultChan <- connResult{ RelayClient: relayClient, Url: url, Err: err, - Latency: time.Since(start), + Latency: relayClient.InitialConnectionTime, } } @@ -86,8 +86,20 @@ func (sp *ServerPicker) processConnResults(resultChan chan connResult, successCh var hasSuccess bool var bestLatencyResult connResult bestLatencyResult.Latency = time.Hour + processingCtx := context.Background() + var processingCtxCancel context.CancelFunc for numOfResults := 0; numOfResults < cap(resultChan); numOfResults++ { - cr := <-resultChan + var cr connResult + select { + case <-processingCtx.Done(): + log.Tracef("terminating Relay server sorting early") + successChan <- bestLatencyResult + close(successChan) + successChan = nil // Prevent any more sending to successChan + // Continue receiving connections to terminate any more + cr = <-resultChan + case cr = <-resultChan: + } if cr.Err != nil { log.Tracef("failed to connect to Relay server: %s: %v", cr.Url, cr.Err) continue @@ -108,10 +120,19 @@ func (sp *ServerPicker) processConnResults(resultChan chan connResult, successCh } } + // First successful connection, start a timer to return the result + if !hasSuccess { + processingCtx, processingCtxCancel = context.WithTimeout(processingCtx, connectionSortingtimeout) + } hasSuccess = true bestLatencyResult = cr } + processingCtxCancel() + if successChan == nil { + return + } + if bestLatencyResult.RelayClient != nil { successChan <- bestLatencyResult } From adf31ce0a2206c38affc562a00851383bf5f85fe Mon Sep 17 00:00:00 2001 From: M Essam Hamed Date: Wed, 27 Nov 2024 10:03:20 +0200 Subject: [PATCH 4/5] check if we have a successful connection so we do not need to connect to other servers --- relay/client/picker.go | 54 +++++++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/relay/client/picker.go b/relay/client/picker.go index 505de501dd5..6a8be22e2a7 100644 --- a/relay/client/picker.go +++ b/relay/client/picker.go @@ -45,19 +45,25 @@ func (sp *ServerPicker) PickServer(parentCtx context.Context) (*Client, error) { concurrentLimiter := make(chan struct{}, maxConcurrentServers) log.Debugf("pick server from list: %v", sp.ServerURLs.Load().([]string)) + go sp.processConnResults(connResultChan, successChan) for _, url := range sp.ServerURLs.Load().([]string) { - // todo check if we have a successful connection so we do not need to connect to other servers - concurrentLimiter <- struct{}{} - go func(url string) { - defer func() { - <-concurrentLimiter - }() - sp.startConnection(parentCtx, connResultChan, url) - }(url) + select { + case concurrentLimiter <- struct{}{}: + go func(url string) { + defer func() { + <-concurrentLimiter + }() + sp.startConnection(parentCtx, connResultChan, url) + }(url) + case cr, ok := <-successChan: + if !ok { + return nil, errors.New("failed to connect to any relay server: all attempts failed") + } + log.Infof("chosen home Relay server: %s with latency %s", cr.Url, cr.Latency) + return cr.RelayClient, nil + } } - go sp.processConnResults(connResultChan, successChan) - select { case cr, ok := <-successChan: if !ok { @@ -106,18 +112,8 @@ func (sp *ServerPicker) processConnResults(resultChan chan connResult, successCh } log.Infof("connected to Relay server: %s with latency %s", cr.Url, cr.Latency) - // Already connected to a lower latency server - if hasSuccess && cr.Latency > bestLatencyResult.Latency { - log.Infof("closing unnecessary Relay connection to: %s", cr.Url) - if err := cr.RelayClient.Close(); err != nil { - log.Errorf("failed to close connection to %s: %v", cr.Url, err) - } - continue - } else if hasSuccess { // Connected to a higher latency server in bestLatencyResult, disconnect from it - log.Infof("closing unnecessary Relay connection to: %s", bestLatencyResult.Url) - if err := bestLatencyResult.RelayClient.Close(); err != nil { - log.Errorf("failed to close connection to %s: %v", bestLatencyResult.Url, err) - } + if hasSuccess { + cr = lowestLatency(cr, bestLatencyResult) } // First successful connection, start a timer to return the result @@ -138,3 +134,17 @@ func (sp *ServerPicker) processConnResults(resultChan chan connResult, successCh } close(successChan) } + +func lowestLatency(a, b connResult) connResult { + if a.Latency > b.Latency { + if err := b.RelayClient.Close(); err != nil { + log.Errorf("failed to close connection to %s: %v", b.Url, err) + } + return a + } + + if err := a.RelayClient.Close(); err != nil { + log.Errorf("failed to close connection to %s: %v", a.Url, err) + } + return b +} From df1c9059ffd3e8ad1b4cb425891049338cf7844a Mon Sep 17 00:00:00 2001 From: M Essam Hamed Date: Thu, 5 Dec 2024 16:21:25 +0200 Subject: [PATCH 5/5] Fix panic on nil processingCtxCancel --- relay/client/picker.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/relay/client/picker.go b/relay/client/picker.go index 6a8be22e2a7..f211f7b6c16 100644 --- a/relay/client/picker.go +++ b/relay/client/picker.go @@ -124,7 +124,9 @@ func (sp *ServerPicker) processConnResults(resultChan chan connResult, successCh bestLatencyResult = cr } - processingCtxCancel() + if processingCtxCancel != nil { + processingCtxCancel() + } if successChan == nil { return }