-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclient.go
154 lines (131 loc) · 3.72 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// Copyright (c) 2014-2017 Bitmark Inc.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package bitmarkdClient
import (
"crypto/tls"
"fmt"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"sync"
"sync/atomic"
"time"
)
// Sentinel errors reported by the RPC clients in this package.
var (
	// ErrRPCRequestTimeout is returned when an RPC request does not complete
	// within the client's timeout.
	ErrRPCRequestTimeout = fmt.Errorf("rpc requests timed out")

	// ErrRPCConnectionClosed is returned when the connection is closed while
	// a request is still waiting for its reply.
	ErrRPCConnectionClosed = fmt.Errorf("rpc connection closed")

	// ErrCannotEstablishConnection is returned when a connection to the
	// remote cannot be established.
	ErrCannotEstablishConnection = fmt.Errorf("connection can not be established")
)
// RPCEmptyArguments is an empty struct to be used as the argument value of
// RPC requests that take no parameters.
type RPCEmptyArguments struct{}
// PersistentRPCClient is an RPC client bound to a single address that keeps
// one long-lived connection and reuses it across requests.
type PersistentRPCClient struct {
	// Mutex guards the connection state held in the fields below.
	sync.Mutex
	// Client is the RPC client of the current connection; it is assigned
	// when a connection is dialed.
	*rpc.Client

	// address is the remote address that gets dialed.
	address string
	// timeout is the maximum time to wait for the reply of a request.
	timeout time.Duration

	// connected is set once a connection has been dialed and cleared again
	// by Close.
	connected bool
	// closed is closed by Close to signal requests that are still waiting.
	closed chan struct{}
}
// NewPersistentRPCClient returns a client for the given address. No connection
// is opened here: the returned client only records the address and the
// timeout, and starts out with a fresh, open `closed` channel.
func NewPersistentRPCClient(address string, timeout time.Duration) *PersistentRPCClient {
	c := new(PersistentRPCClient)
	c.address = address
	c.timeout = timeout
	c.closed = make(chan struct{})
	return c
}
// Call performs the RPC request serviceMethod with args and stores the result
// in reply. It reuses the current connection when there is one and otherwise
// tries to establish a new one first.
//
// It returns:
//   - ErrCannotEstablishConnection if no connection could be obtained,
//   - ErrRPCConnectionClosed if the client is closed while the request waits,
//   - ErrRPCRequestTimeout if no reply arrives within the client's timeout,
//   - otherwise the error (possibly nil) reported by the RPC call itself.
func (c *PersistentRPCClient) Call(serviceMethod string, args interface{}, reply interface{}) error {
	client, err := c.client()
	if err != nil {
		return ErrCannotEstablishConnection
	}

	// c.closed is replaced every time a new connection is dialed, so take a
	// snapshot under the lock instead of reading the field while another
	// goroutine may be writing it.
	c.Lock()
	closed := c.closed
	c.Unlock()

	pending := client.Go(serviceMethod, args, reply, make(chan *rpc.Call, 1))

	// An explicit timer can be released as soon as this function returns,
	// instead of lingering until the timeout expires.
	timer := time.NewTimer(c.timeout)
	defer timer.Stop()

	select {
	case done := <-pending.Done:
		return done.Error
	case <-closed:
		return ErrRPCConnectionClosed
	case <-timer.C:
		return ErrRPCRequestTimeout
	}
}
// Close shuts down the current connection, if any. It marks the client as
// disconnected, closes the `closed` channel so that waiting requests are
// released, and returns the error from closing the underlying RPC client.
// Calling Close on a client that is not connected is a no-op returning nil.
func (c *PersistentRPCClient) Close() error {
	c.Lock()
	defer c.Unlock()

	if c.connected {
		c.connected = false
		close(c.closed)
		return c.Client.Close()
	}
	return nil
}
// client returns the RPC client of the current connection. When the client is
// not connected it first dials a TLS connection over TCP to c.address, wraps
// it in a JSON-RPC client, and installs a fresh `closed` channel.
//
// The mutex is held for the whole call, including the dial. A non-nil error
// means the dial failed and the client stays disconnected.
func (c *PersistentRPCClient) client() (*rpc.Client, error) {
	c.Lock()
	defer c.Unlock()

	if c.connected {
		return c.Client, nil
	}

	// Bound the dial (TCP connect plus TLS handshake) by the client timeout.
	// The mutex is held while dialing, so an unbounded dial would block every
	// other Call and Close on this client. A non-positive timeout keeps the
	// dial unbounded.
	dialer := &net.Dialer{}
	if c.timeout > 0 {
		dialer.Timeout = c.timeout
	}

	// NOTE(security): certificate verification is disabled here, so the
	// remote end is not authenticated. Left unchanged because callers may
	// rely on connecting to servers with self-signed certificates.
	conn, err := tls.DialWithDialer(dialer, "tcp", c.address, &tls.Config{
		InsecureSkipVerify: true,
	})
	if err != nil {
		return nil, fmt.Errorf("can not dial to: %s, error: %s", c.address, err.Error())
	}

	c.Client = jsonrpc.NewClient(conn)
	c.closed = make(chan struct{})
	c.connected = true
	return c.Client, nil
}
// BitmarkdRPCClient issues bitmarkd RPC requests. It owns one
// PersistentRPCClient per configured address and spreads requests across
// them.
type BitmarkdRPCClient struct {
	// RWMutex guards access to the client set.
	sync.RWMutex

	// addresses lists the remote addresses in selection order.
	addresses []string
	// clients maps each address to its persistent client.
	clients map[string]*PersistentRPCClient
	// counter is incremented atomically to pick the next address.
	counter uint32
}
// New creates a BitmarkdRPCClient holding one PersistentRPCClient per entry
// in addresses, each configured with the given timeout. No connection is
// opened here.
//
// The addresses slice is copied, so later modifications by the caller cannot
// make the round-robin selection pick an address that has no client.
// addresses should be non-empty: selecting a client from an empty list panics.
func New(addresses []string, timeout time.Duration) *BitmarkdRPCClient {
	addrs := make([]string, len(addresses))
	copy(addrs, addresses)

	clients := make(map[string]*PersistentRPCClient, len(addrs))
	for _, addr := range addrs {
		clients[addr] = NewPersistentRPCClient(addr, timeout)
	}

	return &BitmarkdRPCClient{
		addresses: addrs,
		clients:   clients,
	}
}
// Close shuts down every underlying RPC client while holding the write lock.
// Errors returned by the individual clients are not reported.
func (bc *BitmarkdRPCClient) Close() {
	bc.Lock()
	defer bc.Unlock()

	for addr := range bc.clients {
		bc.clients[addr].Close()
	}
}
// client picks the next PersistentRPCClient from the address list in a
// round-robin manner. The shared counter is incremented atomically before it
// is used, so concurrent callers get distinct counter values.
//
// It panics when the address list is empty (modulo by zero).
func (bc *BitmarkdRPCClient) client() *PersistentRPCClient {
	bc.RLock()
	defer bc.RUnlock()

	counter := atomic.AddUint32(&bc.counter, 1)

	// Reduce in unsigned arithmetic before converting to int: on platforms
	// with a 32-bit int, int(counter) turns negative once the counter exceeds
	// MaxInt32, which would give a negative index.
	index := int(counter % uint32(len(bc.addresses)))

	return bc.clients[bc.addresses[index]]
}
// call sends the RPC request command through the client chosen by `client`
// and returns the error of that request unchanged. When the request fails
// with a *net.OpError or with rpc.ErrShutdown, the chosen client is closed
// before returning.
func (bc *BitmarkdRPCClient) call(command string, args interface{}, reply interface{}) error {
	rpcClient := bc.client()

	err := rpcClient.Call(command, args, reply)
	if err == nil {
		return nil
	}

	_, isNetErr := err.(*net.OpError)
	if isNetErr || err == rpc.ErrShutdown {
		rpcClient.Close()
	}
	return err
}