-
Notifications
You must be signed in to change notification settings - Fork 72
/
Copy pathvirtualaddr.go
179 lines (146 loc) · 3.94 KB
/
virtualaddr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package tunnel
import (
"net"
"strconv"
"sync"
"sync/atomic"
"github.com/koding/logging"
)
type listener struct {
net.Listener
*vaddrOptions
done int32
// ips keeps track of registered clients for ip-based routing;
// when last client is deleted from the ip routing map, we stop
// listening on connections
ips map[string]struct{}
}
type vaddrOptions struct {
connCh chan<- net.Conn
log logging.Logger
}
type vaddrStorage struct {
*vaddrOptions
listeners map[net.Listener]*listener
ports map[int]string // port-based routing: maps port number to identifier
ips map[string]string // ip-based routing: maps ip address to identifier
mu sync.RWMutex
}
func newVirtualAddrs(opts *vaddrOptions) *vaddrStorage {
return &vaddrStorage{
vaddrOptions: opts,
listeners: make(map[net.Listener]*listener),
ports: make(map[int]string),
ips: make(map[string]string),
}
}
func (l *listener) serve() {
for {
conn, err := l.Accept()
if err != nil {
l.log.Error("failue listening on %q: %s", l.Addr(), err)
return
}
if atomic.LoadInt32(&l.done) != 0 {
l.log.Debug("stopped serving %q", l.Addr())
conn.Close()
return
}
l.connCh <- conn
}
}
func (l *listener) localAddr() string {
if addr, ok := l.Addr().(*net.TCPAddr); ok {
if addr.IP.Equal(net.IPv4zero) {
return net.JoinHostPort("127.0.0.1", strconv.Itoa(addr.Port))
}
}
return l.Addr().String()
}
func (l *listener) stop() {
if atomic.CompareAndSwapInt32(&l.done, 0, 1) {
// stop is called when no more connections should be accepted by
// the user-provided listener; as we can't simple close the listener
// to not break the guarantee given by the (*Server).DeleteAddr
// method, we make a dummy connection to break out of serve loop.
// It is safe to make a dummy connection, as either the following
// dial will time out when the listener is busy accepting connections,
// or will get closed immadiately after idle listeners accepts connection
// and returns from the serve loop.
conn, err := net.DialTimeout("tcp", l.localAddr(), defaultTimeout)
if err == nil {
conn.Close()
}
}
}
func (vaddr *vaddrStorage) Add(l net.Listener, ip net.IP, ident string) {
vaddr.mu.Lock()
defer vaddr.mu.Unlock()
lis, ok := vaddr.listeners[l]
if !ok {
lis = vaddr.newListener(l)
vaddr.listeners[l] = lis
go lis.serve()
}
if ip != nil {
lis.ips[ip.String()] = struct{}{}
vaddr.ips[ip.String()] = ident
} else {
vaddr.ports[mustPort(l)] = ident
}
}
func (vaddr *vaddrStorage) Delete(l net.Listener, ip net.IP) {
vaddr.mu.Lock()
defer vaddr.mu.Unlock()
lis, ok := vaddr.listeners[l]
if !ok {
return
}
var stop bool
if ip != nil {
delete(lis.ips, ip.String())
delete(vaddr.ips, ip.String())
stop = len(lis.ips) == 0
} else {
delete(vaddr.ports, mustPort(l))
stop = true
}
// Only stop listening for connections when listener has clients
// registered to tunnel the connections to.
if stop {
lis.stop()
delete(vaddr.listeners, l)
}
}
func (vaddr *vaddrStorage) newListener(l net.Listener) *listener {
return &listener{
Listener: l,
vaddrOptions: vaddr.vaddrOptions,
ips: make(map[string]struct{}),
}
}
func (vaddr *vaddrStorage) getIdent(conn net.Conn) (string, bool) {
vaddr.mu.Lock()
defer vaddr.mu.Unlock()
ip, port, err := parseHostPort(conn.LocalAddr().String())
if err != nil {
vaddr.log.Debug("failed to get identifier for connection %q: %s", conn.LocalAddr(), err)
return "", false
}
// First lookup if there's a ip-based route, then try port-base one.
if ident, ok := vaddr.ips[ip]; ok {
return ident, true
}
ident, ok := vaddr.ports[port]
return ident, ok
}
func mustPort(l net.Listener) int {
_, port, err := parseHostPort(l.Addr().String())
if err != nil {
// This can happened when user passed custom type that
// implements net.Listener, which returns ill-formed
// net.Addr value.
panic("ill-formed net.Addr: " + err.Error())
}
return port
}