Skip to content

Commit

Permalink
refactor to support other rpc clients
Browse files Browse the repository at this point in the history
  • Loading branch information
smallnest committed Apr 9, 2021
1 parent 7cece5b commit fa5e597
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 37 deletions.
Binary file removed _documents/rpcx_tech_support.png
Binary file not shown.
7 changes: 7 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ type RPCClient interface {

IsClosing() bool
IsShutdown() bool

GetConn() net.Conn
}

// Client represents a RPC client.
Expand Down Expand Up @@ -129,6 +131,11 @@ func (c *Client) RemoteAddr() string {
return c.Conn.RemoteAddr().String()
}

// GetConn returns the underlying conn.
func (c *Client) GetConn() net.Conn {
return c.Conn
}

// Option contains all options for creating clients.
type Option struct {
// Group is used to select the services in the same group. Services set group info in their meta.
Expand Down
117 changes: 80 additions & 37 deletions client/xclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ type XClient interface {
DownloadFile(ctx context.Context, requestFileName string, saveTo io.Writer, meta map[string]string) error
Stream(ctx context.Context, meta map[string]string) (net.Conn, error)
Close() error

RegisterCacheClientBuilder(network string, builder CacheClientBuilder)
}

type CacheClientBuilder interface {
setCachedClient(client RPCClient, k, servicePath, serviceMethod string)
findCachedClient(k, servicePath, serviceMethod string) RPCClient
deleteCachedClient(client RPCClient, k, servicePath, serviceMethod string)
generateClient(k, servicePath, serviceMethod string) (client RPCClient, err error)
}

// KVPair contains a key and a string.
Expand Down Expand Up @@ -84,10 +93,11 @@ type xClient struct {
servicePath string
option Option

mu sync.RWMutex
servers map[string]string
discovery ServiceDiscovery
selector Selector
mu sync.RWMutex
servers map[string]string
discovery ServiceDiscovery
selector Selector
cacheClientBuilders map[string]CacheClientBuilder

slGroup singleflight.Group

Expand All @@ -106,12 +116,13 @@ type xClient struct {
// NewXClient creates a XClient that supports service discovery and service governance.
func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient {
client := &xClient{
failMode: failMode,
selectMode: selectMode,
discovery: discovery,
servicePath: servicePath,
cachedClient: make(map[string]RPCClient),
option: option,
failMode: failMode,
selectMode: selectMode,
discovery: discovery,
servicePath: servicePath,
cachedClient: make(map[string]RPCClient),
cacheClientBuilders: make(map[string]CacheClientBuilder),
option: option,
}

pairs := discovery.GetServices()
Expand Down Expand Up @@ -171,6 +182,12 @@ func NewBidirectionalXClient(servicePath string, failMode FailMode, selectMode S
return client
}

func (c *xClient) RegisterCacheClientBuilder(network string, builder CacheClientBuilder) {
c.mu.Lock()
c.mu.Unlock()
c.cacheClientBuilders[network] = builder
}

// SetSelector sets customized selector by users.
func (c *xClient) SetSelector(s Selector) {
c.mu.RLock()
Expand Down Expand Up @@ -255,7 +272,7 @@ func (c *xClient) getCachedClient(k string, servicePath, serviceMethod string, a
var needCallPlugin bool
defer func() {
if needCallPlugin {
c.Plugins.DoClientConnected((client.(*Client)).Conn)
c.Plugins.DoClientConnected(client.GetConn())
}
}()
c.mu.Lock()
Expand Down Expand Up @@ -302,20 +319,52 @@ func (c *xClient) getCachedClient(k string, servicePath, serviceMethod string, a
}

func (c *xClient) setCachedClient(client RPCClient, k, servicePath, serviceMethod string) {
network, _ := splitNetworkAndAddress(k)
if builder, ok := c.cacheClientBuilders[network]; ok {
builder.setCachedClient(client, k, servicePath, serviceMethod)
return
}

c.cachedClient[k] = client
}

func (c *xClient) findCachedClient(k, servicePath, serviceMethod string) RPCClient {
network, _ := splitNetworkAndAddress(k)
if builder, ok := c.cacheClientBuilders[network]; ok {
return builder.findCachedClient(k, servicePath, serviceMethod)
}

return c.cachedClient[k]
}

func (c *xClient) deleteCachedClient(client RPCClient, k, servicePath, serviceMethod string) {
network, _ := splitNetworkAndAddress(k)
if builder, ok := c.cacheClientBuilders[network]; ok && client != nil {
builder.deleteCachedClient(client, k, servicePath, serviceMethod)
client.Close()
return
}

delete(c.cachedClient, k)
if client != nil {
client.Close()
}
}

func (c *xClient) removeClient(k, servicePath, serviceMethod string, client RPCClient) {
c.mu.Lock()
cl := c.findCachedClient(k, servicePath, serviceMethod)
if cl == client {
c.deleteCachedClient(client, k, servicePath, serviceMethod)
}
c.mu.Unlock()

if client != nil {
client.UnregisterServerMessageChan()
client.Close()
}
}

func (c *xClient) generateClient(k, servicePath, serviceMethod string) (client RPCClient, err error) {
client = &Client{
option: c.option,
Expand All @@ -339,51 +388,45 @@ func (c *xClient) generateClient(k, servicePath, serviceMethod string) (client R
}

func (c *xClient) getCachedClientWithoutLock(k, servicePath, serviceMethod string) (RPCClient, error) {
client := c.cachedClient[k]
client := c.findCachedClient(k, servicePath, serviceMethod)
if client != nil {
if !client.IsClosing() && !client.IsShutdown() {
return client, nil
}
delete(c.cachedClient, k)
client.Close()
c.deleteCachedClient(client, k, servicePath, serviceMethod)
}

var needCallPlugin bool
defer func() {
if needCallPlugin {
c.Plugins.DoClientConnected(client.GetConn())
}
}()

// double check
client = c.cachedClient[k]
client = c.findCachedClient(k, servicePath, serviceMethod)
if client == nil || client.IsShutdown() {
network, addr := splitNetworkAndAddress(k)

client = &Client{
option: c.option,
Plugins: c.Plugins,
}
err := client.Connect(network, addr)
generatedClient, err, _ := c.slGroup.Do(k, func() (interface{}, error) {
return c.generateClient(k, servicePath, serviceMethod)
})
c.slGroup.Forget(k)
if err != nil {
return nil, err
}

client = generatedClient.(RPCClient)
if c.Plugins != nil {
needCallPlugin = true
}

client.RegisterServerMessageChan(c.serverMessageChan)

c.cachedClient[k] = client
c.setCachedClient(client, k, servicePath, serviceMethod)
}

return client, nil
}

func (c *xClient) removeClient(k, servicePath, serviceMethod string, client RPCClient) {
c.mu.Lock()
cl := c.cachedClient[k]
if cl == client {
delete(c.cachedClient, k)
}
c.mu.Unlock()

if client != nil {
client.UnregisterServerMessageChan()
client.Close()
}
}

func splitNetworkAndAddress(server string) (string, string) {
ss := strings.SplitN(server, "@", 2)
if len(ss) == 1 {
Expand Down

0 comments on commit fa5e597

Please sign in to comment.