-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathclient.go
105 lines (94 loc) · 2.76 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package etcd
import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net"
"net/http"
etcd "github.com/coreos/etcd/client"
)
type client struct {
keysAPI etcd.KeysAPI
ctx context.Context
}
// NewClient returns Client with a connection to the named machines. It will
// return an error if a connection to the cluster cannot be made. The parameter
// machines needs to be a full URL with schemas. e.g. "http://localhost:2379"
// will work, but "localhost:2379" will not.
func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) {
if options.DialTimeout == 0 {
options.DialTimeout = defaultTTL
}
if options.DialKeepAlive == 0 {
options.DialKeepAlive = defaultTTL
}
if options.HeaderTimeoutPerRequest == 0 {
options.HeaderTimeoutPerRequest = defaultTTL
}
transport := etcd.DefaultTransport
if options.Cert != "" && options.Key != "" {
tlsCert, err := tls.LoadX509KeyPair(options.Cert, options.Key)
if err != nil {
return nil, err
}
tlsCfg := &tls.Config{
Certificates: []tls.Certificate{tlsCert},
}
if caCertCt, err := ioutil.ReadFile(options.CACert); err == nil {
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCertCt)
tlsCfg.RootCAs = caCertPool
}
transport = &http.Transport{
TLSClientConfig: tlsCfg,
Dial: func(network, address string) (net.Conn, error) {
return (&net.Dialer{
Timeout: options.DialTimeout,
KeepAlive: options.DialKeepAlive,
}).Dial(network, address)
},
}
}
ce, err := etcd.New(etcd.Config{
Endpoints: machines,
Transport: transport,
HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest,
})
if err != nil {
return nil, err
}
return &client{
keysAPI: etcd.NewKeysAPI(ce),
ctx: ctx,
}, nil
}
// GetEntries implements the etcd Client interface.
func (c *client) GetEntries(key string) ([]string, error) {
resp, err := c.keysAPI.Get(c.ctx, key, &etcd.GetOptions{Recursive: true})
if err != nil {
return nil, err
}
// Special case. Note that it's possible that len(resp.Node.Nodes) == 0 and
// resp.Node.Value is also empty, in which case the key is empty and we
// should not return any entries.
if len(resp.Node.Nodes) == 0 && resp.Node.Value != "" {
return []string{resp.Node.Value}, nil
}
entries := make([]string, len(resp.Node.Nodes))
for i, node := range resp.Node.Nodes {
entries[i] = node.Value
}
return entries, nil
}
// WatchPrefix implements the etcd Client interface.
func (c *client) WatchPrefix(prefix string, ch chan struct{}) {
watch := c.keysAPI.Watcher(prefix, &etcd.WatcherOptions{AfterIndex: 0, Recursive: true})
ch <- struct{}{} // make sure caller invokes GetEntries
for {
if _, err := watch.Next(c.ctx); err != nil {
return
}
ch <- struct{}{}
}
}