From 8cf5ed742b18ddb2ebbfcc5517cbbaff62b594b5 Mon Sep 17 00:00:00 2001 From: gunli <24350715@qq.com> Date: Wed, 22 Jan 2025 14:41:29 +0800 Subject: [PATCH] [INLONG-11703][SDK] Improve the cleanExpiredConnTicker select logic in the connpool of the Golang SDK (#11704) Co-authored-by: gunli --- .../dataproxy-sdk-golang/connpool/connpool.go | 30 ++++++------------- 1 file changed, 9 insertions(+), 21 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go index 6c28e10d53..5588091dc7 100755 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go @@ -144,7 +144,7 @@ func NewConnPool(initEndpoints []string, connsPerEndpoint, size int, } // starts a backbround task, do rebalancing and recovering periodically - go pool.recoverAndRebalance() + go pool.innerWork() return pool, nil } @@ -527,8 +527,8 @@ func (p *connPool) markUnavailable(ep string) { p.retryCounts.Store(ep, 0) } -// recoverAndRebalance recovers the down endpoint and rebalaces the conns periodically -func (p *connPool) recoverAndRebalance() { +// innerWork recovers the down endpoint, rebalaces the conns and cleans the expired conns periodically +func (p *connPool) innerWork() { // server failure is a low-probability event, so there's basically no endpoint need to recover, a higher frequency is also acceptable recoverTicker := time.NewTicker(10 * time.Second) defer recoverTicker.Stop() @@ -540,15 +540,12 @@ func (p *connPool) recoverAndRebalance() { defer reBalanceTicker.Stop() // clean expired conn every minute - var cleanExpiredConnTicker *time.Ticker + var cleanExpiredConnChan <-chan time.Time if p.maxConnLifetime > 0 { - cleanExpiredConnTicker = time.NewTicker(1 * time.Minute) + cleanExpiredConnTicker := time.NewTicker(1 * time.Minute) + defer cleanExpiredConnTicker.Stop() + cleanExpiredConnChan = cleanExpiredConnTicker.C } - defer func() { - if cleanExpiredConnTicker != nil { - cleanExpiredConnTicker.Stop() - } - }() for { select { @@ -564,17 +561,8 @@ func (p *connPool) recoverAndRebalance() { p.rebalance() case <-p.closeCh: return - default: - if cleanExpiredConnTicker != nil { - select { - case <-cleanExpiredConnTicker.C: - p.cleanExpiredConns() - default: - time.Sleep(time.Second) - } - } else { - time.Sleep(time.Second) - } + case <-cleanExpiredConnChan: + p.cleanExpiredConns() } } }