-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathkping.go
352 lines (307 loc) · 9.23 KB
/
kping.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
package kping
import (
"context"
"fmt"
"net"
"os"
"sync"
"time"
"golang.org/x/net/ipv4"
)
// ipEvent is a single per-IP ping record. Values travel through
// kping.ipEventChan and are stored in Statistic.ipEvents.
type ipEvent struct {
	// ip identifies the address the record belongs to.
	ip string
	// seq is the sequence number; Statistic.ipEvents is keyed by ICMP seq.
	seq int
	// Timing data for the packet.
	// NOTE(review): the exact meaning of the two durations is defined by the
	// send/recv code, which is not visible here — confirm before relying on it.
	sendDuration, recvRTT time.Duration
}
// Statistic holds the ping statistic of a single IP.
type Statistic struct {
	// RecvNum and SentNum are the numbers of received and sent packets.
	RecvNum, SentNum int64
	// LostNum is the number of lost packets.
	// NOTE(review): stored as float64 unlike the other counters — confirm
	// whether it holds a count or a ratio.
	LostNum float64
	// RTTs is the RTT of each received packet.
	RTTs []float64
	// MinRTT, MaxRTT, AvgRTT and StdDevRTT are the minimum, maximum, average
	// and standard deviation of RTTs.
	MinRTT, MaxRTT, AvgRTT, StdDevRTT float64
	// ipEvents is keyed by ICMP seq.
	ipEvents map[int]ipEvent
}
// DefaultRecvMode is the recv mode a pinger created by NewPinger starts with.
// SetRecvMode accepts "afpacket" and "batch".
var DefaultRecvMode = "afpacket"
// Options represents send or recv options. Values are created by SendOptions,
// BatchRecvOptions or AfPacketRecvOptions and applied with Pinger.SetOptions.
type Options interface{}
// sendOptions holds the batch send settings; see SendOptions for the meaning
// of each field.
type sendOptions struct {
	// Batch size, buffer size and number of send goroutines.
	BatchSize, BufferSize, Parallel int64
	// Send timeout and the interval between batch sends.
	Timeout, WaitTimeout time.Duration
}

// defaultSendOptions is the send configuration NewPinger installs:
// 1024 packets per batch, a 10MB buffer, 30 goroutines, a 100s timeout and a
// 20ms wait between batches.
var defaultSendOptions = sendOptions{
	Timeout:     100 * time.Second,
	WaitTimeout: 20 * time.Millisecond,
	Parallel:    30,
	BufferSize:  10 << 20, // 10MB
	BatchSize:   1024,
}
// SendOptions creates batch send options.
//
// batchSize: number of ICMP packets per batch send, must be in [1, 1024], default: 1024
// bufferSize: batch send buffer size, default: 10MB
// parallel: number of send goroutines, must be >= 1, default: 30
// timeout: send timeout, default: 100s
// waitTimeout: batch send interval, default: 20ms
//
// An error is returned when batchSize or parallel is out of range. Run divides
// the IP count by BatchSize and hands batches to Parallel send goroutines over
// an unbuffered channel, so a zero BatchSize would panic and a zero Parallel
// would leave the sender blocked.
func SendOptions(batchSize, bufferSize, parallel int64, timeout, waitTimeout time.Duration) (options Options, err error) {
	if batchSize < 1 || batchSize > 1024 {
		return nil, fmt.Errorf("invalid send batchSize: %d, must be in [1, 1024]", batchSize)
	}
	if parallel < 1 {
		return nil, fmt.Errorf("invalid send parallel: %d, must be >= 1", parallel)
	}
	return sendOptions{
		BatchSize:   batchSize,
		BufferSize:  bufferSize,
		Parallel:    parallel,
		Timeout:     timeout,
		WaitTimeout: waitTimeout,
	}, nil
}
// batchRecvOptions holds the settings of the "batch" recv mode; see
// BatchRecvOptions for the meaning of each field.
type batchRecvOptions struct {
	// Batch size, buffer size and number of recv goroutines.
	BatchSize, BufferSize, Parallel int64
	// Recv timeout.
	Timeout time.Duration
}

// defaultBatchRecvOptions is the batch recv configuration NewPinger installs:
// 100 packets per batch, a 10MB buffer, 10 goroutines and a 100ms timeout.
var defaultBatchRecvOptions = batchRecvOptions{
	Timeout:    100 * time.Millisecond,
	Parallel:   10,
	BufferSize: 10 << 20, // 10MB
	BatchSize:  100,
}
// BatchRecvOptions creates batch recv options.
//
// batchSize: number of ICMP packets per batch recv, must be in [1, 1024], default: 100
// bufferSize: batch recv buffer size, default: 10MB
// parallel: number of recv goroutines, default: 10
// timeout: recv timeout, default: 100ms
//
// An error is returned when batchSize is outside the documented range.
func BatchRecvOptions(batchSize, bufferSize, parallel int64, timeout time.Duration) (options Options, err error) {
	if batchSize < 1 || batchSize > 1024 {
		return nil, fmt.Errorf("invalid recv batchSize: %d, must be in [1, 1024]", batchSize)
	}
	return batchRecvOptions{
		BatchSize:  batchSize,
		BufferSize: bufferSize,
		Parallel:   parallel,
		Timeout:    timeout,
	}, nil
}
// afpacketRecvOptions holds the settings of the "afpacket" recv mode; see
// AfPacketRecvOptions for the meaning of each field.
type afpacketRecvOptions struct {
	// Number of recv goroutines and total af_packet block size in MB.
	Parallel, BlockMB int64
	// af_packet poll timeout.
	PollTimeout time.Duration
	// Name of the interface to receive on.
	Iface string
}

// defaultAfPacketRecvOptions is the af_packet recv configuration NewPinger
// installs: 1 goroutine, 128MB of blocks, a 100ms poll timeout, on eth0.
var defaultAfPacketRecvOptions = afpacketRecvOptions{
	Iface:       "eth0",
	PollTimeout: 100 * time.Millisecond,
	BlockMB:     128,
	Parallel:    1,
}
// AfPacketRecvOptions creates af_packet recv options.
//
// parallel: number of recv goroutines, default: 1
// blockMB: af_packet total block size in MB, default: 128MB
// iface: name of the interface to receive on, default: eth0
// timeout: af_packet poll timeout, default: 100ms
//
// The returned error is always nil.
func AfPacketRecvOptions(parallel, blockMB int64, iface string, timeout time.Duration) (options Options, err error) {
	var opts afpacketRecvOptions
	opts.Parallel = parallel
	opts.BlockMB = blockMB
	opts.Iface = iface
	opts.PollTimeout = timeout
	return opts, nil
}
// A Pinger provides the methods of kping. NewPinger returns the implementation.
type Pinger interface {
	// SetRecvMode sets the recv mode, one of: afpacket (default) | batch.
	SetRecvMode(recvMode string) error
	// SetOptions sets send or recv options, as created by SendOptions,
	// BatchRecvOptions or AfPacketRecvOptions.
	SetOptions(options Options) error
	// AddIPs adds IP addrs to the pinger. The kping implementation replaces
	// any addresses set by an earlier call.
	AddIPs(addrs []string) error
	// Run flood pings, then calculates the statistic of each IP.
	// NOTE(review): the result map is presumably keyed by IP, like the
	// internal stats map — confirm against the statistic implementation.
	Run() (statistics map[string]*Statistic, err error)
}
// kping is the Pinger implementation returned by NewPinger.
type kping struct {
	// sourceIP is the ping source IP.
	sourceIP string
	// count is the ping count (documented limit: < 55536); size is the ICMP
	// packet payload size in bytes.
	// NOTE(review): the size bound is stated as "> 8" on this struct and
	// ">= 8" on NewPinger, and neither is enforced in the visible code — confirm.
	count, size int64
	// timeout is the total ping timeout; interval is the ping interval.
	timeout, interval time.Duration
	// recvMode is "afpacket" or "batch"; see SetRecvMode.
	recvMode string
	// Option sets, initialised to the package defaults and replaced by SetOptions.
	sendOpts sendOptions
	afpacketRecvOpts afpacketRecvOptions
	batchRecvOpts batchRecvOptions
	// rawConn is the raw socket connection opened by Run.
	rawConn *rawConn
	// addrs holds the resolved targets set by AddIPs.
	addrs []*net.IPAddr
	// stats is keyed by ip.
	stats map[string]*Statistic
	// ipCount is the number of targets counted by AddIPs.
	ipCount int64
	// recvReady is closed by Run once the receivers have been launched;
	// sendDone is closed when Run's send goroutine returns.
	recvReady, sendDone chan bool
	// sendLock is allocated by NewPinger; its users are outside this view.
	sendLock *sync.Mutex
	// ipEventChan is created by Run.
	ipEventChan chan *ipEvent
	// context and cancel are set by Run and bound the run to timeout.
	context context.Context
	cancel context.CancelFunc
}
// NewPinger returns a new pinger configured with the package defaults for the
// recv mode and for the send/recv options.
//
// sourceIP: ping source IP
// count: number of request packets to send to each IP
// size: ICMP payload size in bytes, must >= 8
// timeout: the time to wait for all ping done
// interval: the time between sending a ping packet to each IP
//
// No argument is validated here and the returned error is always nil.
func NewPinger(sourceIP string, count, size int64, timeout, interval time.Duration) (p Pinger, err error) {
	k := new(kping)

	// caller-supplied settings
	k.sourceIP = sourceIP
	k.count, k.size = count, size
	k.timeout, k.interval = timeout, interval

	// package defaults; override with SetRecvMode / SetOptions
	k.recvMode = DefaultRecvMode
	k.sendOpts = defaultSendOptions
	k.afpacketRecvOpts = defaultAfPacketRecvOptions
	k.batchRecvOpts = defaultBatchRecvOptions

	// signalling channels closed by Run, plus the send mutex
	k.recvReady = make(chan bool)
	k.sendDone = make(chan bool)
	k.sendLock = new(sync.Mutex)

	return k, nil
}
// SetRecvMode selects how replies are received: "afpacket" or "batch".
// Any other value leaves the current mode untouched and returns an error
// naming the rejected value.
func (p *kping) SetRecvMode(mode string) (err error) {
	switch mode {
	case "afpacket", "batch":
		p.recvMode = mode
	default:
		// Report the rejected argument, not the mode currently in effect.
		return fmt.Errorf("unknown recv mode: %s, should be oneof: afpacket|batch", mode)
	}
	return nil
}
// SetOptions installs send or recv options on the pinger. The concrete type of
// options decides which set is replaced: values from SendOptions replace the
// send options, values from BatchRecvOptions the batch recv options, and
// values from AfPacketRecvOptions the af_packet recv options.
//
// Any other value, including nil, is rejected with an error instead of being
// silently ignored.
func (p *kping) SetOptions(options Options) (err error) {
	switch opts := options.(type) {
	case sendOptions:
		p.sendOpts = opts
	case batchRecvOptions:
		p.batchRecvOpts = opts
	case afpacketRecvOptions:
		p.afpacketRecvOpts = opts
	default:
		return fmt.Errorf("unsupported options type: %T", options)
	}
	return nil
}
// AddIPs resolves ipaddrs as IPv4 addresses and makes them the pinger's target
// list, replacing any targets set by an earlier call. The stats map is reset
// and ipCount is set to the number of targets.
//
// If any address fails to resolve, an error naming it is returned and the
// pinger's previous targets, stats and ipCount are left unchanged.
func (p *kping) AddIPs(ipaddrs []string) error {
	// Resolve into a local slice first so a failure cannot leave the pinger
	// with a partially filled target list.
	addrs := make([]*net.IPAddr, 0, len(ipaddrs))
	for _, ipaddr := range ipaddrs {
		addr, err := net.ResolveIPAddr("ip4", ipaddr)
		if err != nil {
			return fmt.Errorf("invalid IP: %s: %v", ipaddr, err)
		}
		addrs = append(addrs, addr)
	}

	p.addrs = addrs
	p.stats = make(map[string]*Statistic, len(addrs))
	// Derive the count from the list itself: the list is replaced on every
	// call, so an incrementing counter would drift after a second call.
	p.ipCount = int64(len(addrs))
	return nil
}
// addrBatch is the unit of work Run sends on the channel it passes to p.send:
// a group of target addresses plus the sequence number of the current round.
type addrBatch struct {
	// seq is set by Run to the send round number before the batch is queued.
	seq int
	// addrs is filled by Run with up to sendOpts.BatchSize addresses.
	addrs []*net.IPAddr
}
// Run flood-pings the addresses set by AddIPs and returns the statistics
// produced by p.statistic.
//
// It starts one goroutine that launches the receivers for the configured recv
// mode, and one goroutine that waits for the receivers to be launched and then
// feeds address batches to the send workers for count+10 rounds. Run itself
// then calls p.process followed by p.statistic.
//
// The error is non-nil when the raw connection cannot be created or, in
// "batch" mode, when the ICMP filter cannot be installed.
//
// NOTE(review): recvReady and sendDone are created in NewPinger and closed
// here, so a second Run on the same pinger would close an already-closed
// channel — confirm that a pinger is meant to be single-use.
func (p *kping) Run() (statistics map[string]*Statistic, err error) {
	// used by send & recv, so buffer size is double
	p.ipEventChan = make(chan *ipEvent, p.ipCount*p.count*2)

	// bound the run by p.timeout
	ctx, cancel := context.WithTimeout(context.TODO(), p.timeout)
	p.context = ctx
	p.cancel = cancel
	defer p.cancel()

	// create raw socket connection
	rawConn, err := newRawConn(p.sourceIP, p.batchRecvOpts.BufferSize, p.sendOpts.BufferSize, p.batchRecvOpts.Timeout, p.sendOpts.Timeout)
	if err != nil {
		return nil, err
	}
	defer rawConn.close()
	p.rawConn = rawConn

	if p.recvMode == "batch" {
		// Let only ICMP Echo Reply through: block every type first, then
		// re-enable Echo Reply. SetAll(false) would accept every type and
		// make the Accept call below a no-op.
		filter := ipv4.ICMPFilter{}
		filter.SetAll(true)
		filter.Accept(ipv4.ICMPTypeEchoReply)
		if err := rawConn.setICMPFilter(&filter); err != nil {
			return nil, fmt.Errorf("setICMPFilter failed: %v", err)
		}
	}

	// receive packets
	go func() {
		// closing ipEventChan signals that every receiver has returned
		defer close(p.ipEventChan)

		var recvParallel int64
		var recvFunc func(index int, wg *sync.WaitGroup)
		switch p.recvMode {
		case "afpacket":
			recvParallel = p.afpacketRecvOpts.Parallel
			recvFunc = p.afpacketRecv
		case "batch":
			recvParallel = p.batchRecvOpts.Parallel
			recvFunc = p.batchRecv
		default:
			panic(fmt.Sprintf("ping recv: unknown recvMode: %s", p.recvMode))
		}

		fmt.Fprintf(os.Stderr, "kping recv: started, parallel %d, ipCount: %d\n", recvParallel, p.ipCount)
		wg := new(sync.WaitGroup)
		stime := time.Now()
		for i := 0; i < int(recvParallel); i++ {
			wg.Add(1)
			index := i
			go recvFunc(index, wg)
		}

		// receivers launched: unblock the sender
		close(p.recvReady)

		wg.Wait()
		fmt.Fprintf(os.Stderr, "kping recv: all done, ipCount: %d, usedTime: %s\n", p.ipCount, time.Since(stime))
	}()

	// send packets
	go func() {
		defer close(p.sendDone)

		// wait until the receivers have been launched
		<-p.recvReady

		stime := time.Now()
		addrBatchChan := make(chan addrBatch)
		fmt.Fprintf(os.Stderr, "kping send: started, parallel %d, ipCount: %d, \n", p.sendOpts.Parallel, p.ipCount)

		// send packets concurrently
		for i := 0; i < int(p.sendOpts.Parallel); i++ {
			index := i
			go p.send(index, addrBatchChan) // after close addrBatchChan, goroutine return
		}

		// calculate the batch number: ipCount / BatchSize, rounded up
		var batchNum = 0
		if p.ipCount%p.sendOpts.BatchSize == 0 {
			batchNum = int(p.ipCount / p.sendOpts.BatchSize)
		} else {
			batchNum = int(p.ipCount/p.sendOpts.BatchSize + 1)
		}

		// fill addresses for each batch: address i goes into batch i/BatchSize
		addrBatchs := make([]addrBatch, batchNum)
		for i := range addrBatchs {
			addrBatchs[i] = addrBatch{
				addrs: make([]*net.IPAddr, 0, p.sendOpts.BatchSize),
			}
		}
		for i, addr := range p.addrs {
			j := i / int(p.sendOpts.BatchSize)
			batch := addrBatchs[j]
			batch.addrs = append(batch.addrs, addr)
			addrBatchs[j] = batch
		}

		// send extra 10 packets
		for n := 0; n < int(p.count+10); n++ {
			stime := time.Now()
			for _, batch := range addrBatchs {
				// batch is a copy, so stamping seq does not alter addrBatchs
				batch.seq = n
				addrBatchChan <- batch
			}
			time.Sleep(p.interval)
			fmt.Fprintf(os.Stderr, "kping send: seq %d(%d) done, usedTime: %s\n", n, p.count+10, time.Since(stime))
		}

		// sent done, sleep 1s
		close(addrBatchChan)
		time.Sleep(1 * time.Second)
		fmt.Fprintf(os.Stderr, "kping send: all done, parallel: %d, ipCount: %d, usedTime: %s\n", p.sendOpts.Parallel, p.ipCount, time.Since(stime))
	}()

	p.process()
	statistics = p.statistic()
	return statistics, nil
}