-
Notifications
You must be signed in to change notification settings - Fork 2
/
stela.go
226 lines (195 loc) · 4.9 KB
/
stela.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
package stela
import (
"bytes"
"crypto/rand"
"encoding/gob"
"fmt"
"sync"
"time"
)
const (
	// Version is the stela release version string.
	Version = "0.11.6"
	// DefaultStelaAddress is the default host:port of a stela instance.
	// It points at loopback because stela assumes a local instance is running.
	DefaultStelaAddress = "127.0.0.1:31000"
	// DefaultStelaPort is the default port of a stela instance; it is the
	// same port number that appears in DefaultStelaAddress.
	DefaultStelaPort = 31000
	// DefaultMulticastAddress is the multicast IPv6 address stela communicates on.
	// Note that the literal is already wrapped in square brackets.
	DefaultMulticastAddress = "[ff12::9000]"
	// DefaultMulticastPort is the default multicast port.
	DefaultMulticastPort = 31053
	// ServiceName is the name under which stela instances are registered as services.
	ServiceName = "stela.services.fg"
	// DefaultMaxValueBytes is the default size limit, in bytes, for a Service's
	// Value slice. NOTE(review): the limit is not enforced anywhere in this file.
	DefaultMaxValueBytes = 256
	// DefaultServerName is used for TLS gRPC. It is the name that must match the
	// common name in the server's certificate.
	DefaultServerName = "Stela"
)
// Actions for Service. Presumably these are the values carried in
// Service.Action — confirm against callers.
const (
	// RegisterAction evaluates to 0 (iota on the first line of the block).
	RegisterAction = iota
	// DeregisterAction evaluates to 1: iota increments with every line of the
	// const block, so repeating it here still yields the next value.
	DeregisterAction = iota
)
// Service describes a single service instance and is used in request/response.
//
// Valid requires Name, Hostname and at least one of IPv4/IPv6 to be set.
// Equal compares only Name, Hostname, IPv4, IPv6 and Port.
type Service struct {
	Name string
	Hostname string
	IPv4 string
	IPv6 string
	Port int32
	Priority int32
	Timeout int32 // The length of time, in milliseconds, before a service is deregistered
	Action int32 // presumably RegisterAction or DeregisterAction — confirm against callers
	Client *Client // Store reference to the client that registered the service
	Value []byte // opaque payload; presumably produced by EncodeValue — confirm against callers
	id string // Automatically set when the service is registered
	// registerCh, deregisterCh and stopped are not used in this file.
	registerCh chan struct{}
	deregisterCh chan struct{}
	stopped bool
	// mu is a pointer, so copies of a Service share the same mutex and the
	// zero value of Service has a nil mu.
	mu *sync.Mutex // protects registerCh, deregisterCh, stopped
}
// String implements fmt.Stringer. It renders the service's name, hostname,
// both IP addresses, port and action on a single line.
func (s Service) String() string {
	const layout = "Name: %s, Hostname: %s, IPv4: %s, IPv6: %s, Port: %d, Action: %d"
	return fmt.Sprintf(layout, s.Name, s.Hostname, s.IPv4, s.IPv6, s.Port, s.Action)
}
// Equal reports whether testService describes the same endpoint as s.
//
// Only Name, Hostname, IPv4, IPv6 and Port are compared. The id is
// deliberately left out of the comparison, and Priority, Timeout, Action,
// Client and Value are ignored as well. A nil testService is never equal.
func (s Service) Equal(testService *Service) bool {
	// Guard against a nil argument instead of panicking on dereference.
	if testService == nil {
		return false
	}
	return s.Name == testService.Name &&
		s.Hostname == testService.Hostname &&
		s.IPv4 == testService.IPv4 &&
		s.IPv6 == testService.IPv6 &&
		s.Port == testService.Port
}
// Valid reports whether the required fields of a Service are set: Name and
// Hostname must be non-empty, and at least one of IPv4 or IPv6 must be
// non-empty.
func (s *Service) Valid() bool {
	if s.Name == "" || s.Hostname == "" {
		return false
	}
	// One address family is enough; only the case where both are empty is invalid.
	return s.IPv4 != "" || s.IPv6 != ""
}
// GenerateID replaces the service's id with a freshly generated one built
// from 10 random bytes (generateID renders them as hex). If generation
// fails the existing id is left untouched and the error is returned.
func (s *Service) GenerateID() error {
	newID, err := generateID(10)
	if err == nil {
		s.id = newID
	}
	return err
}
// ID returns the service's unexported id, which is empty until one has been
// assigned (for example by GenerateID).
func (s *Service) ID() string {
	current := s.id
	return current
}
// IPv4Address joins the service's IPv4 field and Port into "host:port" form.
func (s *Service) IPv4Address() string {
	host, port := s.IPv4, s.Port
	return fmt.Sprintf("%s:%d", host, port)
}
// IPv6Address joins the service's IPv6 field and Port into "[host]:port" form.
//
// An IPv6 literal contains colons, so it must be wrapped in square brackets
// to be distinguishable from the port separator. The brackets are added
// here unless the field is empty or already starts with '[' (as
// DefaultMulticastAddress does), in which case the value is used as is.
func (s *Service) IPv6Address() string {
	host := s.IPv6
	if host != "" && host[0] != '[' {
		host = "[" + host + "]"
	}
	return fmt.Sprintf("%s:%d", host, s.Port)
}
// Client struct holds all the information about a client registering a service,
// plus the channel used to push service changes to that client.
//
// Client embeds a sync.Mutex by value, so a Client must not be copied after
// first use; the Client methods in this file all use pointer receivers.
type Client struct {
	Address string // IPv4 address
	ID string // identifier; GenerateID overwrites it with a random one
	mu sync.Mutex // protect subscribeCh
	subscribeCh chan *Service // used to send changes in services; created lazily by init
}
// Equal reports whether testClient identifies the same client as c, that is
// whether both Address and ID match. The mutex and subscription channel are
// not part of the comparison.
//
// Two nil clients are considered equal; a nil client never equals a non-nil one.
func (c *Client) Equal(testClient *Client) bool {
	// Handle nil on either side explicitly instead of panicking on dereference.
	if c == nil || testClient == nil {
		return c == testClient
	}
	return c.Address == testClient.Address && c.ID == testClient.ID
}
// init lazily creates the unbuffered subscription channel. It holds c.mu
// while checking and assigning, and it never replaces a channel that already
// exists, so it is safe to call repeatedly.
func (c *Client) init() {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.subscribeCh != nil {
		return
	}
	c.subscribeCh = make(chan *Service)
}
// GenerateID replaces the client's ID with a freshly generated one built
// from 10 random bytes (generateID renders them as hex). If generation
// fails the existing ID is left untouched and the error is returned.
func (c *Client) GenerateID() error {
	newID, err := generateID(10)
	if err == nil {
		c.ID = newID
	}
	return err
}
// SubscribeCh exposes the receive-only side of the client's subscription
// channel, creating the channel first if it does not exist yet. Services
// passed to Notify are delivered on it.
func (c *Client) SubscribeCh() <-chan *Service {
	c.init()
	var updates <-chan *Service = c.subscribeCh
	return updates
}
// Notify offers s to the client's subscription channel.
//
// The channel is unbuffered, so the send only succeeds if a receiver is
// waiting on SubscribeCh. If nothing reads the service within 10
// milliseconds the notification is dropped and Notify returns.
func (c *Client) Notify(s *Service) {
	c.init()

	timeout := time.NewTimer(10 * time.Millisecond)
	// Release the timer on every path, including a successful send; stopping
	// it only after it has fired would have no effect.
	defer timeout.Stop()

	select {
	case c.subscribeCh <- s:
	case <-timeout.C:
	}
}
// value wraps an arbitrary payload in a struct so that gob can transmit it
// through an interface-typed field.
type value struct {
	Val interface{}
}

// EncodeValue converts v to a gob-encoded byte slice.
//
// It returns nil when v cannot be encoded, for example when v's concrete
// type has not been registered with gob. Checking the error avoids handing
// back a truncated stream that holds only the type descriptor.
func EncodeValue(v interface{}) []byte {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(value{Val: v}); err != nil {
		return nil
	}
	return buf.Bytes()
}

// DecodeValue converts a byte slice produced by EncodeValue back to the
// original value. It returns nil when b is empty or is not a valid encoding.
func DecodeValue(b []byte) interface{} {
	var data value
	if err := gob.NewDecoder(bytes.NewReader(b)).Decode(&data); err != nil {
		return nil
	}
	return data.Val
}
// generateID reads length bytes from crypto/rand and returns them formatted
// as upper-case hexadecimal, so the result is 2*length characters long.
// Any error from the random source is returned with an empty string.
func generateID(length int) (string, error) {
	raw := make([]byte, length)
	_, err := rand.Read(raw)
	if err != nil {
		return "", err
	}
	id := fmt.Sprintf("%X", raw)
	return id, nil
}