-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsampler.go
190 lines (156 loc) · 4.03 KB
/
sampler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
package brahms
import (
"context"
"crypto/sha256"
"math/big"
"math/rand"
"sync"
"time"
)
// SampleRank describes a rank based on 32 bytes of data as a big number
type SampleRank [32]byte
// ToInt converts the bytes to the big nr
func (sr SampleRank) ToInt() *big.Int {
return new(big.Int).SetBytes(sr[:])
}
// MaxSampleRank is the maximum rank a sample can reach
var MaxSampleRank = SampleRank{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
}
// Prober allows for probing peers to determine if they are still online
type Prober interface {
Probe(ctx context.Context, c chan<- NID, id NID, n Node)
}
// Sampler holds a sample from a node stream such that it is not biased by the
// nr of times a appears in the stream.
type Sampler struct {
seeds [][32]byte
mins []SampleRank
sample []Node
invalid map[NID]time.Time
ito time.Duration
prober Prober
mu sync.RWMutex
}
// NewSampler initializes a sampler with the provided source of randomness
func NewSampler(rnd *rand.Rand, l2 int, pr Prober, ito time.Duration) (s *Sampler) {
s = &Sampler{
mins: make([]SampleRank, l2),
sample: make([]Node, l2),
seeds: make([][32]byte, l2),
invalid: make(map[NID]time.Time),
ito: ito,
prober: pr,
}
for i := 0; i < l2; i++ {
s.mins[i] = MaxSampleRank
rnd.Read(s.seeds[i][:])
}
return
}
// Validate if a random subset of the sampled nodes are still alive
func (s *Sampler) Validate(rnd *rand.Rand, n int, to time.Duration) {
sample := s.Sample().Pick(rnd, n)
probes := make(chan NID, len(sample))
// @TODO probe only an unpredictable subset every call
done := make(chan struct{})
go func() {
ctx, cancel := context.WithTimeout(context.Background(), to)
defer cancel()
// probe all currently sampled nodes
var wg sync.WaitGroup
for id, n := range sample {
wg.Add(1)
go func(id NID, n Node) { s.prober.Probe(ctx, probes, id, n); wg.Done() }(id, n)
}
// wait for all the return early, or context to cancel whatever is still probing
wg.Wait()
close(done)
}()
<-done
//read the indexes of all probes that returned a response
alive := map[NID]struct{}{}
DRAIN:
for {
select {
case id := <-probes:
alive[id] = struct{}{}
default:
break DRAIN
}
}
// remove any sample that didn't respond (in time) to the probe
s.mu.Lock()
for i, n := range s.sample {
id := n.Hash()
if _, ok := sample[id]; !ok {
continue //node was not probed, keep it for now
}
if _, ok := alive[id]; ok {
continue //this sample replied to the probe, keep it
}
// reset the sample otherwise and mark as invalidated
s.invalid[id] = time.Now()
s.sample[i] = Node{}
s.mins[i] = MaxSampleRank
}
// clear old invalidated nodes
for id, t := range s.invalid {
if time.Now().Sub(t) < s.ito {
continue //still fresh
}
//eviction expired
delete(s.invalid, id)
}
s.mu.Unlock()
}
// Update the sampler with a new set of ids
func (s *Sampler) Update(v View) {
s.mu.Lock()
defer s.mu.Unlock()
for _, n := range v.Sorted() {
id := n.Hash()
for i, v := range s.mins {
// we use a seeded crypto hash to rank a sample
hv := SampleRank(sha256.Sum256(append(id[:], s.seeds[i][:]...)))
if hv.ToInt().Cmp(v.ToInt()) < 0 {
s.mins[i] = hv
s.sample[i] = n
}
}
}
return
}
// Sample returns a un-biases sample from all seen nodes
func (s *Sampler) Sample() (v View) {
s.mu.RLock()
defer s.mu.RUnlock()
v = View{}
for _, n := range s.sample {
if n.IsZero() {
continue
}
v[n.Hash()] = n
}
return
}
// Clear the sampler of all samples and mins
func (s *Sampler) Clear() {
s.mu.Lock()
defer s.mu.Unlock()
for i := range s.mins {
s.mins[i] = MaxSampleRank
}
s.sample = make([]Node, len(s.mins))
}
// RecentlyInvalidated returns whether a given node was recently invalidated
// due to a failing probe
func (s *Sampler) RecentlyInvalidated(id NID) (ok bool) {
s.mu.RLock()
defer s.mu.RUnlock()
_, ok = s.invalid[id]
return
}