Skip to content

Commit

Permalink
[INLONG-11668][SDK] Add max life time support for the connections in …
Browse files Browse the repository at this point in the history
…conn pool of Golang SDK (#11669)
  • Loading branch information
gunli authored Jan 15, 2025
1 parent a110f9f commit 9995c22
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 34 deletions.
2 changes: 2 additions & 0 deletions inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ type Options struct {
BlockIfQueueIsFull bool // whether Send and SendAsync block if producer's message queue is full, default: false
AddColumns map[string]string // addition columns to add to the message, for example: __addcol1__worldid=xxx&__addcol2__ip=yyy, all the message will be added 2 more columns with worldid=xxx and ip=yyy
addColumnStr string // the string format of the AddColumns, just a cache, used internal
Auth Auth // dataproxy authentication interface
MaxConnLifetime time.Duration // connection max lifetime, default: 0, set to 5m/10m when the servers provide service through CLBs (Cloud Load Balancers)
}
```

Expand Down
191 changes: 163 additions & 28 deletions inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/connpool/connpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,31 +45,50 @@ var (

// Dialer is the interface of a dialer that return a NetConn
type Dialer interface {
Dial(addr string) (gnet.Conn, error)
// Dial dials to the addr and bind ctx to the returned connection, which network(TCP/UDP) to use is determined by the Dialer
// Dial should use gnet.Client.DialContext() to get a connection that can be driven by a gnet event engine.
Dial(addr string, ctx any) (gnet.Conn, error)
}

// ConnContext carries the additional attributes that are set on a gnet.Conn.
type ConnContext struct {
	// CreatedAt is the created time of the connection.
	CreatedAt time.Time
	// Endpoint is the address of the remote endpoint.
	Endpoint string
}

// EndpointRestrictedConnPool is the interface of a simple endpoint restricted connection pool that
// the connection's remote address must be in an endpoint list, if not, it will be closed and can
// not be used anymore, it is useful for holding the connections to a service whose endpoints can
// be changed at runtime.
// Best practice:
// gnet is a high-performance networking package, the best way to use this pool is:
// 1. call Get() to get a gnet.Conn;
// 2. use the conn to read/write for a duration, 1m, for example, and then put the conn back to the pool and get a new one for load balancing, avoid putting/getting frequently;
// 3. do not switch(put and get) to a new conn in the callback of gnet.Conn.AsyncWrite(buf []byte, callback AsyncCallback) or gnet.Conn.AsyncWritev(bs [][]byte, callback AsyncCallback), it may be blocked;
// 4. if you use TCP conns and cannot update endpoints by service discovery directly, for example, because your endpoints are behind an LB, it is better to set a max lifetime
// for your pool, so that you can restart your endpoints (RS) without data loss by:
// 1). set the weight of your endpoint(RS) to 0, so that no new connection incoming;
// 2). wait for the existing connections to close by lifetime timeout;
// 3). restart your endpoint.
type EndpointRestrictedConnPool interface {
// Get gets a connection
// Get gets a connection, it's concurrency-safe, but you can not call it in the callback of gnet.Conn.AsyncWrite() or gnet.Conn.AsyncWritev().
Get() (gnet.Conn, error)
// Put puts a connection back to the pool, if err is not nil, the connection will be closed by the pool
// Put puts a connection back to the pool, if err is not nil, the connection will be closed by the pool, it's concurrency-safe,
// but you can not call it in the callback of gnet.Conn.AsyncWrite() or gnet.Conn.AsyncWritev().
Put(conn gnet.Conn, err error)
// UpdateEndpoints updates the endpoints the pool to dial to
// UpdateEndpoints updates the endpoints the pool to dial to, it's not concurrency-safe.
UpdateEndpoints(all, add, del []string)
// NumPooled returns the connection number in the pool, not the number of all the connection that the pool created
// NumPooled returns the connection number in the pool, not the number of all the connection that the pool created, it's concurrency-safe.
NumPooled() int
// OnConnClosed used to notify that a connection is closed, the connection will be removed from the pool, if err is not nil, the remote endpoint will mark as unavailable
// OnConnClosed used to notify that a connection is closed, the connection will be removed from the pool, if err is not nil, the remote endpoint will mark as unavailable, it's concurrency-safe.
OnConnClosed(conn gnet.Conn, err error)
// Close closes the pool
Close()
}

// NewConnPool news a EndpointRestrictedConnPool
func NewConnPool(initEndpoints []string, connsPerEndpoint, size int,
dialer Dialer, log logger.Logger) (EndpointRestrictedConnPool, error) {
dialer Dialer, log logger.Logger, maxConnLifetime time.Duration) (EndpointRestrictedConnPool, error) {
if len(initEndpoints) == 0 {
return nil, ErrInitEndpointEmpty
}
Expand Down Expand Up @@ -107,7 +126,8 @@ func NewConnPool(initEndpoints []string, connsPerEndpoint, size int,
Multiplier: 2,
Randomization: 0.5,
},
closeCh: make(chan struct{}),
closeCh: make(chan struct{}),
maxConnLifetime: maxConnLifetime,
}

// store endpoints
Expand Down Expand Up @@ -143,7 +163,25 @@ type connPool struct {
backoff util.ExponentialBackoff
closeCh chan struct{}
closeOnce sync.Once
endpointConnCounts sync.Map // store the conn count of each endpoint
endpointConnCounts sync.Map // store the conn count of each endpoint
maxConnLifetime time.Duration // the max lifetime of a connection
}

// expired reports whether conn has lived longer than the pool's max connection lifetime.
// It returns false when conn is nil, when maxConnLifetime is not positive, or when the
// context bound to conn is nil or is not a ConnContext value.
func (p *connPool) expired(conn gnet.Conn) bool {
	if conn == nil || p.maxConnLifetime <= 0 {
		return false
	}

	// a nil context fails the type assertion as well, so a single check covers both cases
	meta, isConnCtx := conn.Context().(ConnContext)
	if !isConnCtx {
		return false
	}

	deadline := meta.CreatedAt.Add(p.maxConnLifetime)
	return time.Now().After(deadline)
}

func (p *connPool) Get() (gnet.Conn, error) {
Expand Down Expand Up @@ -204,7 +242,7 @@ func (p *connPool) newConn() (gnet.Conn, error) {

func (p *connPool) dialNewConn(ep string) (gnet.Conn, error) {
p.log.Debug("dialNewConn()")
conn, err := p.dialer.Dial(ep)
conn, err := p.dialer.Dial(ep, ConnContext{CreatedAt: time.Now(), Endpoint: ep})
if err != nil {
p.markUnavailable(ep)
return nil, err
Expand Down Expand Up @@ -278,6 +316,15 @@ func (p *connPool) put(conn gnet.Conn, err error, isNewConn bool) {
return
}

// if conn is expired, close it
if p.expired(conn) {
p.log.Debug("connection expired, close it, addr:", addr, ", err:", err)
CloseConn(conn, defaultConnCloseDelay)
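// after the connection is closed, fewer connections are available and the connection count of the endpoint at addr may become unbalanced; although this calls the current function recursively, we still create a replacement connection here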
_ = p.appendNewConn(addr)
return
}

select {
case p.connChan <- conn:
// update the conn count
Expand Down Expand Up @@ -488,6 +535,17 @@ func (p *connPool) recoverAndRebalance() {
reBalanceTicker := time.NewTicker(defaultConnCloseDelay + 30*time.Second)
defer reBalanceTicker.Stop()

// clean expired conn every minute
var cleanExpiredConnTicker *time.Ticker
if p.maxConnLifetime > 0 {
cleanExpiredConnTicker = time.NewTicker(1 * time.Minute)
}
defer func() {
if cleanExpiredConnTicker != nil {
cleanExpiredConnTicker.Stop()
}
}()

for {
select {
case <-recoverTicker.C:
Expand All @@ -502,10 +560,79 @@ func (p *connPool) recoverAndRebalance() {
p.rebalance()
case <-p.closeCh:
return
default:
if cleanExpiredConnTicker != nil {
select {
case <-cleanExpiredConnTicker.C:
p.cleanExpiredConns()
default:
time.Sleep(time.Second)
}
}
}
}
}

// getRemoteAddr returns the remote address of conn as a string.
// It uses conn.RemoteAddr() when that is not nil, otherwise it falls back to the Endpoint
// stored in the ConnContext bound to conn. It returns "" when conn is nil or when neither
// source is available.
func getRemoteAddr(conn gnet.Conn) string {
	if conn == nil {
		return ""
	}

	if remote := conn.RemoteAddr(); remote != nil {
		return remote.String()
	}

	// no remote address, fall back to the endpoint recorded in the conn context
	if meta, ok := conn.Context().(ConnContext); ok {
		return meta.Endpoint
	}
	return ""
}

// cleanExpiredConns takes the pooled connections out of connChan without blocking, puts the
// unexpired ones back, then closes every expired one and tries to add a new connection to the
// same address.
func (p *connPool) cleanExpiredConns() {
	p.log.Debug("cleanExpiredConns()")

	var (
		alive []gnet.Conn
		stale []gnet.Conn
	)

	// take at most cap(connChan) conns, stop as soon as the chan has nothing more to give
drain:
	for taken := 0; taken < cap(p.connChan); taken++ {
		select {
		case c := <-p.connChan:
			if p.expired(c) {
				stale = append(stale, c)
			} else {
				alive = append(alive, c)
			}
		default:
			break drain
		}
	}

	// return the unexpired conns to the chan first
	for i := range alive {
		select {
		case p.connChan <- alive[i]:
		default:
			// the send would block, so close the conn instead of waiting
			CloseConn(alive[i], defaultConnCloseDelay)
		}
	}

	// close each expired conn and append a new conn with the same addr
	for i := range stale {
		addr := getRemoteAddr(stale[i])
		p.log.Debug("connection expired, close it, addr:", addr, ", err:", nil)
		CloseConn(stale[i], defaultConnCloseDelay)
		// the returned error is deliberately ignored here
		_ = p.appendNewConn(addr)
	}
}

func (p *connPool) dump() {
p.log.Debug("all endpoints:")
eps := p.endpoints.Load()
Expand Down Expand Up @@ -542,7 +669,7 @@ func (p *connPool) recover() bool {
}
if time.Since(lastUnavailable) > p.backoff.Next(retries) {
// try to create new conn
conn, err := p.dialer.Dial(key.(string))
conn, err := p.dialer.Dial(key.(string), ConnContext{CreatedAt: time.Now(), Endpoint: key.(string)})
if err == nil {
p.log.Debug("endpoint recovered, addr: ", key)
p.put(conn, nil, true)
Expand Down Expand Up @@ -675,15 +802,11 @@ func (p *connPool) rebalance() {

// add new conn
for i := currentCount; i < expectedConnPerEndpoint; i++ {
conn, err := p.dialNewConn(addr)
if err == nil {
p.log.Debug("adding connection for addr: ", addr)
p.put(conn, nil, true)
rebalanced = true
} else {
p.log.Warn("failed to add connection during rebalancing, addr: ", addr, ", err: ", err)
break
err := p.appendNewConn(addr)
if err != nil {
continue
}
rebalanced = true
}
} else if currentCount > expectedConnPerEndpoint {
rebalanced = true
Expand All @@ -699,15 +822,11 @@ func (p *connPool) rebalance() {
return true
}
for i := 0; i < expectedConnPerEndpoint; i++ {
conn, err := p.dialNewConn(addr)
if err == nil {
p.log.Debug("adding connection for addr: ", addr)
p.put(conn, nil, true)
rebalanced = true
} else {
p.log.Warn("failed to add connection during rebalancing, addr: ", addr, ", err: ", err)
break
err := p.appendNewConn(addr)
if err != nil {
continue
}
rebalanced = true
}
return true
})
Expand All @@ -717,6 +836,22 @@ func (p *connPool) rebalance() {
}
}

// appendNewConn dials a new connection to addr and puts it into the pool as a new conn.
// It returns an error when addr is empty or when dialing fails; a dial failure is also
// logged as a warning.
func (p *connPool) appendNewConn(addr string) error {
	if len(addr) == 0 {
		return errors.New("addr is empty")
	}

	newConn, dialErr := p.dialNewConn(addr)
	if dialErr == nil {
		p.log.Debug("adding connection for addr: ", addr)
		p.put(newConn, nil, true)
		return nil
	}

	p.log.Warn("failed to add connection, addr: ", addr, ", err: ", dialErr)
	return dialErr
}

func (p *connPool) removeEndpointConn(addr string, count int) {
var leftConns []gnet.Conn
var removed int
Expand All @@ -730,7 +865,7 @@ loop:
}

if remoteAddr.String() == addr {
p.log.Info("reducing connection for addr: ", addr)
p.log.Debug("reducing connection for addr: ", addr)
// we do not decrease conn count here, if the frequency of rebalancing is less than defaultConnCloseDelay, it may lead to an inaccurate expected conn count per endpoint
CloseConn(conn, defaultConnCloseDelay)
removed++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (c *client) initConns() error {

// minimum connection number per endpoint is 1
connsPerEndpoint := int(math.Ceil(float64(c.options.WorkerNum) * 1.2 / float64(epLen)))
pool, err := connpool.NewConnPool(endpoints, connsPerEndpoint, 512, c, c.log)
pool, err := connpool.NewConnPool(endpoints, connsPerEndpoint, 512, c, c.log, c.options.MaxConnLifetime)
if err != nil {
return err
}
Expand All @@ -176,7 +176,7 @@ func (c *client) initConns() error {
}

func (c *client) initFramer() error {
framer, err := framer.NewLengthField(framer.LengthFieldCfg{
fr, err := framer.NewLengthField(framer.LengthFieldCfg{
MaxFrameLen: 64 * 1024,
FieldOffset: 0,
FieldLength: 4,
Expand All @@ -186,7 +186,7 @@ func (c *client) initFramer() error {
if err != nil {
return err
}
c.framer = framer
c.framer = fr
return nil
}

Expand All @@ -211,8 +211,8 @@ func (c *client) initWorkers() error {
return nil
}

func (c *client) Dial(addr string) (gnet.Conn, error) {
return c.netClient.Dial("tcp", addr)
func (c *client) Dial(addr string, ctx any) (gnet.Conn, error) {
return c.netClient.DialContext("tcp", addr, ctx)
}

func (c *client) Send(ctx context.Context, msg Message) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (

"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util"

"github.com/prometheus/client_golang/prometheus"

"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/bufferpool"
"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/logger"
"github.com/prometheus/client_golang/prometheus"
)

const (
Expand Down Expand Up @@ -78,6 +79,7 @@ type Options struct {
AddColumns map[string]string // addition columns to add to the message, for example: __addcol1__worldid=xxx&__addcol2__ip=yyy, all the message will be added 2 more columns with worldid=xxx and ip=yyy
addColumnStr string // the string format of the AddColumns, just a cache, used internal
Auth Auth // dataproxy authentication interface
MaxConnLifetime time.Duration // connection max lifetime, default: 0, set to 5m/10m when the servers provide service through CLBs (Cloud Load Balancers)
}

// ValidateAndSetDefault validates an options and set up the default values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,10 @@ func WithAuth(auth Auth) Option {
o.Auth = auth
}
}

// WithMaxConnLifetime returns an Option that sets Options.MaxConnLifetime to lifetime.
func WithMaxConnLifetime(lifetime time.Duration) Option {
	apply := func(opts *Options) {
		opts.MaxConnLifetime = lifetime
	}
	return apply
}

0 comments on commit 9995c22

Please sign in to comment.