-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathmain.go
301 lines (253 loc) · 8.05 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
/*
* Zabbix Agent Bench (C) 2014 Ryan Armstrong <[email protected]>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"flag"
"fmt"
"github.com/mitchellh/colorstring"
"os"
"os/signal"
"runtime"
"strings"
"time"
)
// Program identity strings and Zabbix-related constants.
const (
// APP is the program name; main prints it for the -version flag.
APP = "zabbix_agent_bench"
// APP_VERSION is the release version; main prints it for the -version flag.
APP_VERSION = "0.4.0"
// APP_AUTHOR is the author contact string.
// NOTE(review): not referenced in the visible part of this file.
APP_AUTHOR = "Ryan Armstrong <[email protected]>"
// ZBX_NOTSUPPORTED is presumably the marker a Zabbix agent returns for an
// unsupported item - TODO confirm. Note that StartConsumer compares
// responses against ErrorMessage, not against this constant.
ZBX_NOTSUPPORTED = "ZBX_NOTSUPPORTED"
)
// Command line arguments. Each variable is bound to a flag in main and is
// populated by flag.Parse.
var (
// debug enables dprintf output (-debug).
debug bool
// exitErrorCount adds the unsupported-value tally to the exit code (-strict).
exitErrorCount bool
// host is the remote Zabbix agent host (-host, default "localhost").
host string
// iterationLimit is the maximum number of passes over the key list
// (-iterations); zero or less means no limit.
iterationLimit int
// key is a single agent item key to benchmark (-key).
key string
// keyFilePath is the path of a file to read item keys from (-keys).
keyFilePath string
// port is the remote Zabbix agent TCP port (-port, default 10050).
// NOTE(review): not referenced anywhere else in the visible part of this
// file - confirm that the port is actually honoured when querying.
port int
// delayMsArg is the delay between queries on each thread, in
// milliseconds (-delay).
delayMsArg int
// staggerMsArg is the delay before starting each thread, in
// milliseconds (-offset).
staggerMsArg int
// threadCount is the number of consumer threads (-threads, default
// runtime.NumCPU()).
threadCount int
// timeLimitArg is the overall run time limit in seconds (-timelimit);
// zero disables the limit.
timeLimitArg int
// timeoutMsArg is the per-request timeout in milliseconds (-timeout,
// default 3000).
timeoutMsArg int
// verbose prints each agent response as it is received (-verbose).
verbose bool
// version prints the program version and exits (-version).
version bool
)
// Durations derived from the millisecond command line arguments. main fills
// them in after flag.Parse.
var (
// timeout is timeoutMsArg as a time.Duration; passed to Get and to the
// discovery expansion of key files.
timeout time.Duration
// delayDuration is delayMsArg as a time.Duration; consumers sleep for this
// long between queries.
delayDuration time.Duration
)
// stop is the flag that signals all threads to stop gracefully. It is set by
// the signal handler goroutine and by the time limit timer goroutine, and
// polled by main, the producer and the consumers.
// NOTE(review): this plain bool is read and written from several goroutines
// without any synchronization.
var stop = false
// main parses the command line, assembles the list of agent item keys to
// test, runs the producer/consumer benchmark and prints per-key results and
// totals.
//
// The process exit code is the number of transport errors, plus the number of
// unsupported values when -strict is given.
func main() {
	// Configure from command line
	flag.BoolVar(&version, "version", false, "print version")
	flag.StringVar(&host, "host", "localhost", "remote Zabbix agent host")
	flag.IntVar(&port, "port", 10050, "remote Zabbix agent TCP port")
	flag.IntVar(&timeoutMsArg, "timeout", 3000, "timeout in milliseconds for each zabbix_get request")
	flag.IntVar(&delayMsArg, "delay", 0, "delay between queries on each thread in milliseconds")
	flag.IntVar(&staggerMsArg, "offset", 0, "delay start of each thread in milliseconds")
	flag.IntVar(&threadCount, "threads", runtime.NumCPU(), "number of test threads")
	flag.IntVar(&timeLimitArg, "timelimit", 0, "time limit in seconds")
	flag.IntVar(&iterationLimit, "iterations", 0, "maximum test iterations of each key")
	flag.StringVar(&keyFilePath, "keys", "", "read keys from file path")
	flag.StringVar(&key, "key", "", "benchmark a single agent item key")
	flag.BoolVar(&exitErrorCount, "strict", false, "exit code to include tally of unsupported items")
	flag.BoolVar(&verbose, "verbose", false, "print more output")
	flag.BoolVar(&debug, "debug", false, "print program debug messages")
	flag.Parse()

	// Convert the integer arguments into durations
	timeout = time.Duration(timeoutMsArg) * time.Millisecond
	stagger := time.Duration(staggerMsArg) * time.Millisecond
	delayDuration = time.Duration(delayMsArg) * time.Millisecond
	timeLimit := time.Duration(timeLimitArg) * time.Second

	// print version and exit
	if version {
		fmt.Printf("%s v%s\n", APP, APP_VERSION)
		os.Exit(0)
	}

	// At least one consumer is required: with none, the producer would block
	// forever on its unbuffered channel and the run would never finish.
	if threadCount < 1 {
		fmt.Fprintf(os.Stderr, "Thread count must be at least 1\n")
		os.Exit(1)
	}

	// Let the Go scheduler run on every CPU core
	runtime.GOMAXPROCS(runtime.NumCPU())

	// Create a list of keys for processing
	queuedKeys := ItemKeys{}

	// user specified a single key
	if key != "" {
		queuedKeys = append(queuedKeys, NewItemKey(key))
	}

	// load item keys from text file
	if keyFilePath != "" {
		keyFile, err := NewKeyFile(keyFilePath)
		PanicOn(err, "Failed to open key file")

		// expand discovery item prototypes by doing an actual agent discovery
		expanded, err := keyFile.Keys.Expand(host, timeout)
		PanicOn(err, "Failed to expand discovery items")

		// Append rather than overwrite so that a key given with -key is not
		// lost when -keys is also given.
		queuedKeys = append(queuedKeys, expanded...)
	}

	// Make sure we have work to do
	if len(queuedKeys) == 0 {
		fmt.Fprintf(os.Stderr, "No agent item keys specified for testing\n")
		os.Exit(1)
	}

	// TODO: deduplicate the key list

	// start producer thread
	fmt.Printf("Testing %d keys with %d threads (press Ctrl-C to cancel)...\n", len(queuedKeys), threadCount)
	HandleSignals()

	statsChan := make(chan *ThreadStats)
	producer := StartProducer(queuedKeys, statsChan)

	// raise the stop flag once the time limit (if any) expires
	if timeLimit > 0 {
		timer := time.NewTimer(timeLimit)
		go func() {
			<-timer.C
			stop = true
		}()
	}

	start := time.Now()

	// Fan out consumer threads to start work. Fewer than threadCount
	// consumers are launched if the stop flag is raised during the staggered
	// start, so keep count of how many really exist.
	started := 0
	for ; !stop && started < threadCount; started++ {
		// Stagger thread start
		time.Sleep(stagger)

		dprintf("Starting thread %d...\n", started+1)
		go StartConsumer(producer, statsChan)
	}

	// With no consumer running, nobody reads from the producer. Drain it here
	// so it can observe the stop flag, close its channel and report.
	if started == 0 {
		for range producer {
		}
	}

	// Fan in threads to gather stats: one report per started consumer plus
	// one from the producer. Waiting for threadCount+1 reports would block
	// forever when not every consumer was started.
	totals := NewThreadStats()
	for i := 0; i < started+1; i++ {
		totals.Add(<-statsChan)
	}

	duration := time.Since(start)

	// Sort the key list
	keyNames := queuedKeys.SortedKeyNames()

	// Print results per key
	longestKeyName := queuedKeys.LongestKeyName()
	for _, name := range keyNames {
		keyStats := totals.KeyStats[name]

		// escape %'s in key name, as the row is used as a format string below
		escaped := strings.Replace(name, "%", "%%", -1)

		// show stats
		row := fmt.Sprintf("%-*s :\t%s\t%s\t%s\n", longestKeyName, escaped, hl(keyStats.Success, "green"), hl(keyStats.NotSupported, "yellow"), hl(keyStats.Error, "red"))
		colorstring.Printf(row)
	}

	// Print totals
	fmt.Printf("\n=== Totals ===\n\n")
	fmt.Printf("Total values processed:\t\t%d\n", totals.TotalValues)
	fmt.Printf("Total unsupported values:\t%d\n", totals.UnsupportedValues)
	fmt.Printf("Total transport errors:\t\t%d\n", totals.ErrorCount)
	fmt.Printf("Total key list iterations:\t%d\n", totals.Iterations)

	colorstring.Printf("\n[green]Finished![default] Processed %d values across %d threads in %s (%f NVPS)\n", totals.TotalValues, threadCount, duration.String(), (float64(totals.TotalValues) / duration.Seconds()))

	// exit code: transport errors, plus unsupported values in strict mode
	exitCode := totals.ErrorCount
	if exitErrorCount {
		exitCode += totals.UnsupportedValues
	}
	os.Exit(int(exitCode))
}
// HandleSignals registers for os.Interrupt and starts a goroutine that reacts
// to it. The first interrupt raises the stop flag so that the other goroutines
// can wind down gracefully; an interrupt received while the stop flag is
// already raised terminates the process immediately with exit code 1.
func HandleSignals() {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt)

	go func() {
		// The channel is never closed, so this loop runs for the life of the
		// process.
		for range sigs {
			if !stop {
				fmt.Printf("Caught SIGINT. Cleaning up...\n")
				stop = true
				continue
			}

			// Force exit if user sent SIGINT during cleanup
			fmt.Printf("Aborting...\n")
			os.Exit(1)
		}
	}()
}
// StartProducer starts a goroutine which repeatedly walks the list of queued
// agent item check keys and publishes them one at a time to the returned
// unbuffered channel. It keeps going until the stop flag is raised or, when
// iterationLimit is positive, until that many passes have been made.
//
// When it finishes, the goroutine closes the returned channel and then sends
// its own ThreadStats (carrying the pass count in Iterations) to statsChan.
// A pass that is cut short by the stop flag is still counted.
func StartProducer(keys ItemKeys, statsChan chan *ThreadStats) <-chan *ItemKey {
	out := make(chan *ItemKey)

	go func() {
		var stats ThreadStats

		for pass := 0; !stop; pass++ {
			// a non-positive limit means "run until stopped"
			if iterationLimit > 0 && pass >= iterationLimit {
				break
			}

			for _, item := range keys {
				if stop {
					break
				}

				// hand the key to whichever consumer is ready
				out <- item
			}

			stats.Iterations++
		}

		// closing the channel lets the consumers finish their range loops
		close(out)
		statsChan <- &stats
	}()

	return out
}
// StartConsumer consumes ItemKeys from the producer channel until it is
// closed, queries the agent on host for each key via Get (using the global
// timeout) and tallies the outcome per key and in total. When the producer
// channel is closed it sends the accumulated ThreadStats to statsChan.
//
// A failed request counts as a transport error and is reported through
// dprintf. A response starting with ErrorMessage counts as an unsupported
// value; any other response counts as a success. Responses are printed when
// verbose is set. If a delay was configured, the consumer sleeps for
// delayDuration after each request unless the stop flag is raised.
func StartConsumer(producer <-chan *ItemKey, statsChan chan *ThreadStats) {
	threadStats := NewThreadStats()

	// process items as long as the producer produces them
	for key := range producer {
		keyStats := threadStats.KeyStats[key.Key]

		// Get the value from Zabbix agent
		val, err := Get(host, key.Key, timeout)

		// tally stats
		if err != nil {
			threadStats.ErrorCount++
			keyStats.Error++

			// Surface the cause in debug mode instead of discarding it
			dprintf("Error getting %s: %v\n", key.Key, err)
		} else {
			threadStats.TotalValues++

			if strings.HasPrefix(val, ErrorMessage) {
				threadStats.UnsupportedValues++
				keyStats.NotSupported++
			} else {
				keyStats.Success++
			}

			// Print response
			if verbose {
				typ := "item"
				switch {
				case key.IsPrototype:
					typ = "proto"
				case key.IsDiscoveryRule:
					typ = "disco"
				}

				fmt.Printf("[%s] %s: %s\n", typ, key.Key, val)
			}
		}

		// keyStats is a copy of the map entry, so store it back
		threadStats.KeyStats[key.Key] = keyStats

		// sleep
		if delayMsArg > 0 && !stop {
			time.Sleep(delayDuration)
		}
	}

	// Push stats to collector channel
	statsChan <- threadStats
}
// dprintf writes a formatted debug message to standard error. It does nothing
// unless the debug flag is set.
func dprintf(format string, a ...interface{}) {
	if !debug {
		return
	}

	fmt.Fprintf(os.Stderr, format, a...)
}
// hl renders val as a decimal string for the results table. A positive value
// is wrapped in "[color]...[default]" markup so that it is highlighted when
// printed through colorstring; zero and negative values are returned as plain
// digits.
func hl(val int64, color string) string {
	if val <= 0 {
		return fmt.Sprint(val)
	}

	return fmt.Sprintf("[%s]%d[default]", color, val)
}