-
Notifications
You must be signed in to change notification settings - Fork 241
/
Copy pathredis.go
118 lines (103 loc) · 2.49 KB
/
redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package goworker
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"net/url"
"time"
"github.com/gomodule/redigo/redis"
"vitess.io/vitess/go/pools"
)
// errorInvalidScheme is returned by redisConnFromURI when the URI scheme is
// not one of "redis", "rediss" or "unix".
var errorInvalidScheme = errors.New("invalid Redis database URI scheme")
// RedisConn wraps a redigo connection. The redis.Conn is embedded, so its
// methods are available directly on RedisConn; Close is redefined on
// *RedisConn without a return value.
type RedisConn struct {
	redis.Conn
}
// Close closes the embedded Redis connection. The method has no return value,
// so any error reported by the underlying Close is discarded.
func (r *RedisConn) Close() {
	closeErr := r.Conn.Close()
	_ = closeErr // deliberately dropped: there is nowhere to report it
}
// newRedisFactory returns a pools.Factory that opens a new Redis connection
// to uri each time it is invoked, using redisConnFromURI.
//
// On failure the factory returns a literal nil resource together with the
// error. Passing redisConnFromURI's results straight through would wrap a nil
// *RedisConn in the pools.Resource interface, yielding a non-nil interface
// value that holds a nil pointer.
func newRedisFactory(uri string) pools.Factory {
	return func() (pools.Resource, error) {
		conn, err := redisConnFromURI(uri)
		if err != nil {
			return nil, err
		}
		return conn, nil
	}
}
// newRedisPool builds a pools.ResourcePool whose resources are Redis
// connections to uri. capacity, maxCapacity and idleTimout are handed to
// pools.NewResourcePool unchanged.
func newRedisPool(uri string, capacity int, maxCapacity int, idleTimout time.Duration) *pools.ResourcePool {
	factory := newRedisFactory(uri)
	return pools.NewResourcePool(factory, capacity, maxCapacity, idleTimout)
}
// redisConnFromURI dials the Redis server described by uriString and returns
// the connection wrapped in a RedisConn.
//
// Supported schemes:
//   - "redis":  TCP connection to the URI host.
//   - "rediss": TCP connection with TLS to the URI host. Certificate
//     verification is controlled by workerSettings.SkipTLSVerify, and when
//     workerSettings.TLSCertPath is non-empty the PEM file at that path is
//     added to the root CA pool used for the handshake.
//   - "unix":   Unix domain socket at the URI path.
//
// For the TCP schemes, a password in the URI userinfo is sent with AUTH (the
// username is not used) and a non-empty path, minus its leading "/", is sent
// with SELECT once the connection is established. Neither is done for "unix".
//
// It returns the url.Parse error for a malformed URI, errorInvalidScheme for
// any other scheme, and otherwise whatever error the certificate loading,
// dial, AUTH or SELECT step produced. The connection is closed before
// returning an AUTH or SELECT error.
func redisConnFromURI(uriString string) (*RedisConn, error) {
	uri, err := url.Parse(uriString)
	if err != nil {
		return nil, err
	}

	var (
		network, host, password, db string
		dialOptions                 []redis.DialOption
	)

	switch uri.Scheme {
	case "redis", "rediss":
		network = "tcp"
		host = uri.Host
		if uri.User != nil {
			password, _ = uri.User.Password()
		}
		if len(uri.Path) > 1 {
			db = uri.Path[1:]
		}
		if uri.Scheme == "rediss" {
			dialOptions = append(dialOptions,
				redis.DialUseTLS(true),
				redis.DialTLSSkipVerify(workerSettings.SkipTLSVerify),
			)
			if len(workerSettings.TLSCertPath) > 0 {
				pool, err := getCertPool(workerSettings.TLSCertPath)
				if err != nil {
					return nil, err
				}
				config := &tls.Config{
					RootCAs: pool,
					// redigo applies DialTLSSkipVerify only when no explicit
					// TLS config is supplied, so the setting has to be
					// carried in the config itself to take effect here.
					InsecureSkipVerify: workerSettings.SkipTLSVerify,
				}
				dialOptions = append(dialOptions, redis.DialTLSConfig(config))
			}
		}
	case "unix":
		network = "unix"
		host = uri.Path
	default:
		return nil, errorInvalidScheme
	}

	conn, err := redis.Dial(network, host, dialOptions...)
	if err != nil {
		return nil, err
	}

	if password != "" {
		if _, err := conn.Do("AUTH", password); err != nil {
			// The AUTH failure is the error worth reporting; a secondary
			// Close error would only obscure it.
			_ = conn.Close()
			return nil, err
		}
	}

	if db != "" {
		if _, err := conn.Do("SELECT", db); err != nil {
			// Same reasoning as above: report the SELECT failure.
			_ = conn.Close()
			return nil, err
		}
	}

	return &RedisConn{Conn: conn}, nil
}
// getCertPool returns a certificate pool containing the system root CAs plus
// the PEM-encoded certificates read from certPath.
//
// If the system pool cannot be obtained, an empty pool is used as the base.
// An error is returned when certPath cannot be read (the read error is
// wrapped, so errors.Is works on it) or when the file contains no certificate
// that could be appended.
func getCertPool(certPath string) (*x509.CertPool, error) {
	// The error is ignored on purpose: a nil pool is handled just below by
	// starting from an empty one.
	rootCAs, _ := x509.SystemCertPool()
	if rootCAs == nil {
		rootCAs = x509.NewCertPool()
	}

	certs, err := ioutil.ReadFile(certPath)
	if err != nil {
		return nil, fmt.Errorf("Failed to read %q for the RootCA pool: %w", certPath, err)
	}

	// AppendCertsFromPEM reports only success or failure, so there is no
	// underlying error value to attach to the message.
	if ok := rootCAs.AppendCertsFromPEM(certs); !ok {
		return nil, fmt.Errorf("Failed to append %q to the RootCA pool: no valid PEM certificates found", certPath)
	}

	return rootCAs, nil
}