Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: include start time, exported metrics, and poll duration in coll… #2493

Merged
merged 1 commit into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions cmd/collectors/ems/ems.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

const defaultDataPollDuration = 3 * time.Minute
const maxURLSize = 8_000 //bytes
const maxURLSize = 8_000 // bytes
const severityFilterPrefix = "message.severity="
const defaultSeverityFilter = "alert|emergency|error|informational|notice"
const MaxBookendInstances = 1000
Expand Down Expand Up @@ -213,7 +213,7 @@ func (e *Ems) InitCache() error {
continue
}

//populate prop counter for asup
// populate prop counter for asup
eventName := line.GetChildContentS("name")
e.Prop.Counters[eventName] = eventName

Expand Down Expand Up @@ -243,7 +243,7 @@ func (e *Ems) InitCache() error {
}
e.emsProp[prop.Name] = append(e.emsProp[prop.Name], &prop)
}
//add severity filter
// add severity filter
e.Filter = append(e.Filter, e.severityFilter)
return nil
}
Expand Down Expand Up @@ -294,10 +294,13 @@ func (e *Ems) PollInstance() (map[string]*matrix.Matrix, error) {
ReturnTimeout(e.ReturnTimeOut).
Build()

apiT := time.Now()
if records, err = e.GetRestData(href); err != nil {
return nil, err
}
apiD := time.Since(apiT)

parseT := time.Now()
if len(records) == 0 {
return nil, errs.New(errs.ErrNoInstance, e.Object+" no ems message found on cluster")
}
Expand All @@ -316,7 +319,8 @@ func (e *Ems) PollInstance() (map[string]*matrix.Matrix, error) {
names = append(names, key)
}

//filter out names which exists on the cluster. ONTAP rest ems throws error for a message.name filter if that event is not supported by that cluster
// Filter out names which exist on the cluster.
// ONTAP rest ems throws error for a message.name filter if that event is not supported by that cluster
filteredNames, _ := util.Intersection(names, emsEventCatalogue)
e.Logger.Debug().Strs("querying for events", filteredNames).Msg("")
_, missingNames := util.Intersection(filteredNames, names)
Expand All @@ -337,6 +341,12 @@ func (e *Ems) PollInstance() (map[string]*matrix.Matrix, error) {
if bookendCacheSize > MaxBookendInstances {
e.Logger.Warn().Int("total instances", bookendCacheSize).Msg("cache has more than 1000 instances")
}

// update metadata for collector logs
_ = e.Metadata.LazySetValueInt64("api_time", "instance", apiD.Microseconds())
_ = e.Metadata.LazySetValueInt64("parse_time", "instance", time.Since(parseT).Microseconds())
_ = e.Metadata.LazySetValueUint64("instances", "instance", uint64(bookendCacheSize))

return nil, nil
}

Expand Down Expand Up @@ -440,13 +450,13 @@ func parseProperties(instanceData gjson.Result, property string) gjson.Result {
value := gjson.Get(instanceData.String(), property)
return value
}
//strip parameters. from property name
// strip parameters. from property name
_, after, found := strings.Cut(property, "parameters.")
if found {
property = after
}

//process parameter search
// process parameter search
t := gjson.Get(instanceData.String(), "parameters.#.name")

for _, name := range t.Array() {
Expand Down Expand Up @@ -536,7 +546,7 @@ func (e *Ems) HandleResults(result []gjson.Result, prop map[string][]*emsProp) (
}
} else {
if _, ok := m[msgName]; !ok {
//create matrix if not exists for the ems event
// create matrix if not exists for the ems event
mx = matrix.New(msgName, e.Prop.Object, msgName)
mx.SetGlobalLabels(e.Matrix[e.Object].GetGlobalLabels())
m[msgName] = mx
Expand Down Expand Up @@ -594,12 +604,12 @@ func (e *Ems) HandleResults(result []gjson.Result, prop map[string][]*emsProp) (
}
}

//set labels
// set labels
for k, v := range p.Labels {
instance.SetLabel(k, v)
}

//matches filtering
// matches filtering
if len(p.Matches) == 0 {
isMatchPs = true
} else {
Expand All @@ -610,7 +620,7 @@ func (e *Ems) HandleResults(result []gjson.Result, prop map[string][]*emsProp) (
break
}
} else {
//value not found
// value not found
e.Logger.Warn().
Str("Instance key", instanceKey).
Str("name", match.Name).
Expand Down
27 changes: 22 additions & 5 deletions cmd/collectors/restperf/restperf.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,21 +244,26 @@ func (r *RestPerf) PollCounter() (map[string]*matrix.Matrix, error) {
return nil, errs.New(errs.ErrConfig, "empty url")
}

apiT := time.Now()
records, err = rest.Fetch(r.Client, href)
if err != nil {
return r.handleError(err, href)
}

return r.pollCounter(records)
return r.pollCounter(records, time.Since(apiT))
}

func (r *RestPerf) pollCounter(records []gjson.Result) (map[string]*matrix.Matrix, error) {
func (r *RestPerf) pollCounter(records []gjson.Result, apiD time.Duration) (map[string]*matrix.Matrix, error) {
var (
err error
counterSchema gjson.Result
parseT time.Time
)
mat := r.Matrix[r.Object]
firstRecord := records[0]

parseT = time.Now()

if firstRecord.Exists() {
counterSchema = firstRecord.Get("counter_schemas")
} else {
Expand Down Expand Up @@ -344,6 +349,11 @@ func (r *RestPerf) pollCounter(records []gjson.Result) (map[string]*matrix.Matri
return nil, err
}

// update metadata for collector logs
_ = r.Metadata.LazySetValueInt64("api_time", "counter", apiD.Microseconds())
_ = r.Metadata.LazySetValueInt64("parse_time", "counter", time.Since(parseT).Microseconds())
_ = r.Metadata.LazySetValueUint64("metrics", "counter", uint64(len(r.perfProp.counterInfo)))

return nil, nil
}

Expand Down Expand Up @@ -1366,15 +1376,16 @@ func (r *RestPerf) PollInstance() (map[string]*matrix.Matrix, error) {
return nil, errs.New(errs.ErrConfig, "empty url")
}

apiT := time.Now()
records, err = rest.Fetch(r.Client, href)
if err != nil {
return r.handleError(err, href)
}

return r.pollInstance(records)
return r.pollInstance(records, time.Since(apiT))
}

func (r *RestPerf) pollInstance(records []gjson.Result) (map[string]*matrix.Matrix, error) {
func (r *RestPerf) pollInstance(records []gjson.Result, apiD time.Duration) (map[string]*matrix.Matrix, error) {
var (
err error
oldInstances *set.Set
Expand All @@ -1383,6 +1394,7 @@ func (r *RestPerf) pollInstance(records []gjson.Result) (map[string]*matrix.Matr

mat := r.Matrix[r.Object]
oldInstances = set.New()
parseT := time.Now()
for key := range mat.GetInstances() {
oldInstances.Add(key)
}
Expand Down Expand Up @@ -1450,7 +1462,12 @@ func (r *RestPerf) pollInstance(records []gjson.Result) (map[string]*matrix.Matr
newSize = len(mat.GetInstances())
added = newSize - (oldSize - removed)

r.Logger.Debug().Msgf("added %d new, removed %d (total instances %d)", added, removed, newSize)
r.Logger.Debug().Int("new", added).Int("removed", removed).Int("total", newSize).Msg("instances")

// update metadata for collector logs
_ = r.Metadata.LazySetValueInt64("api_time", "instance", apiD.Microseconds())
_ = r.Metadata.LazySetValueInt64("parse_time", "instance", time.Since(parseT).Microseconds())
_ = r.Metadata.LazySetValueUint64("instances", "instance", uint64(newSize))

if newSize == 0 {
return nil, errs.New(errs.ErrNoInstance, "")
Expand Down
14 changes: 7 additions & 7 deletions cmd/collectors/restperf/restperf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ func TestMain(m *testing.M) {

benchPerf = newRestPerf("Volume", "volume.yaml")
counters := jsonToPerfRecords("testdata/volume-counters.json")
_, _ = benchPerf.pollCounter(counters[0].Records.Array())
_, _ = benchPerf.pollCounter(counters[0].Records.Array(), 0)
now := time.Now().Truncate(time.Second)
propertiesData = jsonToPerfRecords("testdata/volume-poll-properties.json.gz")
fullPollData = jsonToPerfRecords("testdata/volume-poll-full.json.gz")
fullPollData[0].Timestamp = now.UnixNano()
_, _ = benchPerf.pollInstance(propertiesData[0].Records.Array())
_, _ = benchPerf.pollInstance(propertiesData[0].Records.Array(), 0)
_, _ = benchPerf.pollData(now, fullPollData)

os.Exit(m.Run())
Expand All @@ -97,7 +97,7 @@ func BenchmarkRestPerf_PollData(b *testing.B) {
for i := 0; i < b.N; i++ {
now = now.Add(time.Minute * 15)
fullPollData[0].Timestamp = now.UnixNano()
mi, _ := benchPerf.pollInstance(propertiesData[0].Records.Array())
mi, _ := benchPerf.pollInstance(propertiesData[0].Records.Array(), 0)
for _, mm := range mi {
ms = append(ms, mm)
}
Expand Down Expand Up @@ -142,13 +142,13 @@ func TestRestPerf_pollData(t *testing.T) {
r := newRestPerf("Volume", "volume.yaml")

counters := jsonToPerfRecords(tt.pollCounters)
_, err := r.pollCounter(counters[0].Records.Array())
_, err := r.pollCounter(counters[0].Records.Array(), 0)
if err != nil {
t.Fatal(err)
}
pollInstance := jsonToPerfRecords(tt.pollInstance)
pollData := jsonToPerfRecords(tt.pollDataPath1)
_, err = r.pollInstance(pollInstance[0].Records.Array())
_, err = r.pollInstance(pollInstance[0].Records.Array(), 0)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -277,13 +277,13 @@ func TestQosVolume(t *testing.T) {
r := newRestPerf("WorkloadVolume", "workload_volume.yaml")

counters := jsonToPerfRecords(tt.pollCounters)
_, err := r.pollCounter(counters[0].Records.Array())
_, err := r.pollCounter(counters[0].Records.Array(), 0)
if err != nil {
t.Fatal(err)
}

pollInst := jsonToPerfRecords(tt.pollInstance)
_, err = r.pollInstance(pollInst[0].Records.Array())
_, err = r.pollInstance(pollInst[0].Records.Array(), 0)
if err != nil {
t.Fatal(err)
}
Expand Down
15 changes: 5 additions & 10 deletions cmd/collectors/zapi/collector/zapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,17 +255,12 @@ func (z *Zapi) PollData() (map[string]*matrix.Matrix, error) {
count, skipped uint64
tag string
err error
apiD, parseD time.Duration // Request/API time, Parse time, Fetch time
ad, pd time.Duration // Request/API time, Parse time, Fetch time
fetch func(*matrix.Instance, *node.Node, []string, bool)
instances []*node.Node
)

count = 0
skipped = 0

apiT := 0 * time.Second
parseT := 0 * time.Second

oldInstances := set.New()
mat := z.Matrix[z.Object]
// copy keys of current instances. This is used to remove deleted instances from matrix later
Expand Down Expand Up @@ -352,8 +347,8 @@ func (z *Zapi) PollData() (map[string]*matrix.Matrix, error) {
break
}

apiT += ad
parseT += pd
apiD += ad
parseD += pd

instances = response.SearchChildren(z.shortestPathPrefix)

Expand Down Expand Up @@ -409,8 +404,8 @@ func (z *Zapi) PollData() (map[string]*matrix.Matrix, error) {

numInstances := len(mat.GetInstances())
// update metadata
_ = z.Metadata.LazySetValueInt64("api_time", "data", apiT.Microseconds())
_ = z.Metadata.LazySetValueInt64("parse_time", "data", parseT.Microseconds())
_ = z.Metadata.LazySetValueInt64("api_time", "data", apiD.Microseconds())
_ = z.Metadata.LazySetValueInt64("parse_time", "data", parseD.Microseconds())
_ = z.Metadata.LazySetValueUint64("metrics", "data", count)
_ = z.Metadata.LazySetValueUint64("instances", "data", uint64(numInstances))
z.AddCollectCount(count)
Expand Down
Loading