Skip to content

Commit

Permalink
feat: add streaming to keyperf collector (#3435)
Browse files Browse the repository at this point in the history
  • Loading branch information
rahulguptajss authored Jan 23, 2025
1 parent de48ebc commit 8a798a1
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 64 deletions.
136 changes: 76 additions & 60 deletions cmd/collectors/keyperf/keyperf.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/netapp/harvest/v2/cmd/collectors/restperf/plugins/volumetopmetrics"
"github.com/netapp/harvest/v2/cmd/poller/collector"
"github.com/netapp/harvest/v2/cmd/poller/plugin"
rest2 "github.com/netapp/harvest/v2/cmd/tools/rest"
"github.com/netapp/harvest/v2/pkg/conf"
"github.com/netapp/harvest/v2/pkg/errs"
"github.com/netapp/harvest/v2/pkg/matrix"
Expand Down Expand Up @@ -236,10 +237,14 @@ func (kp *KeyPerf) buildCounters() {

func (kp *KeyPerf) PollData() (map[string]*matrix.Matrix, error) {
var (
err error
perfRecords []gjson.Result
startTime time.Time
apiD, parseD time.Duration
metricCount uint64
numPartials uint64
startTime time.Time
prevMat *matrix.Matrix
curMat *matrix.Matrix
)

startTime = time.Now()
kp.Client.Metadata.Reset()

Expand Down Expand Up @@ -267,14 +272,74 @@ func (kp *KeyPerf) PollData() (map[string]*matrix.Matrix, error) {
}
}

perfRecords, err = kp.GetRestData(href, headers)
// Track old instances before processing batches
oldInstances := set.New()
for key := range kp.Matrix[kp.Object].GetInstances() {
oldInstances.Add(key)
}

prevMat = kp.Matrix[kp.Object]
// clone matrix without numeric data
curMat = prevMat.Clone(matrix.With{Data: false, Metrics: true, Instances: true, ExportInstances: true})
curMat.Reset()

processBatch := func(perfRecords []gjson.Result) error {
if len(perfRecords) == 0 {
return nil
}

// Process the current batch of records
count, np, batchParseD := kp.processPerfRecords(perfRecords, curMat, oldInstances)
numPartials += np
metricCount += count
parseD += batchParseD
return nil
}

if err := rest2.FetchAllStream(kp.Client, kp.Prop.Href, processBatch, headers); err != nil {
return nil, fmt.Errorf("failed to fetch data: %w", err)
}
apiD += time.Since(startTime)

if err != nil {
return nil, fmt.Errorf("failed to fetch href=%s %w", href, err)
return nil, err
}

return kp.pollData(startTime, perfRecords, func(e *rest.EndPoint) ([]gjson.Result, time.Duration, error) {
return kp.ProcessEndPoint(e)
})
// Process endpoints after all batches have been processed
eCount, endpointAPID := kp.ProcessEndPoints(curMat, kp.ProcessEndPoint, oldInstances)
metricCount += eCount
apiD += endpointAPID

// Remove old instances that are not found in new instances
for key := range oldInstances.Iter() {
curMat.RemoveInstance(key)
}

_ = kp.Metadata.LazySetValueInt64("api_time", "data", apiD.Microseconds())
_ = kp.Metadata.LazySetValueInt64("parse_time", "data", parseD.Microseconds())
_ = kp.Metadata.LazySetValueUint64("metrics", "data", metricCount)
_ = kp.Metadata.LazySetValueUint64("instances", "data", uint64(len(curMat.GetInstances())))
_ = kp.Metadata.LazySetValueUint64("bytesRx", "data", kp.Client.Metadata.BytesRx)
_ = kp.Metadata.LazySetValueUint64("numCalls", "data", kp.Client.Metadata.NumCalls)
_ = kp.Metadata.LazySetValueUint64("numPartials", "data", numPartials)

kp.AddCollectCount(metricCount)

return kp.cookCounters(curMat, prevMat)
}

func (kp *KeyPerf) processPerfRecords(perfRecords []gjson.Result, curMat *matrix.Matrix, oldInstances *set.Set) (uint64, uint64, time.Duration) {
var (
count uint64
parseD time.Duration
numPartials uint64
)
startTime := time.Now()

count, numPartials = kp.HandleResults(curMat, perfRecords, kp.Prop, false, oldInstances)

parseD = time.Since(startTime)
return count, numPartials, parseD
}

// validateMatrix ensures that the previous matrix (prevMat) contains all the metrics present in the current matrix (curMat).
Expand All @@ -297,57 +362,11 @@ func (kp *KeyPerf) validateMatrix(prevMat *matrix.Matrix, curMat *matrix.Matrix)
return nil
}

func (kp *KeyPerf) pollData(
startTime time.Time,
perfRecords []gjson.Result,
endpointFunc func(e *rest.EndPoint) ([]gjson.Result, time.Duration, error),
) (map[string]*matrix.Matrix, error) {
func (kp *KeyPerf) cookCounters(curMat *matrix.Matrix, prevMat *matrix.Matrix) (map[string]*matrix.Matrix, error) {
var (
count uint64
apiD, parseD time.Duration
err error
skips int
numPartials uint64
instIndex int
prevMat *matrix.Matrix
curMat *matrix.Matrix
err error
skips int
)

prevMat = kp.Matrix[kp.Object]
// Track old instances before processing batches
oldInstances := set.New()
for key := range prevMat.GetInstances() {
oldInstances.Add(key)
}

// clone matrix without numeric data
curMat = prevMat.Clone(matrix.With{Data: false, Metrics: true, Instances: true, ExportInstances: true})
curMat.Reset()

apiD = time.Since(startTime)

startTime = time.Now()

if len(perfRecords) == 0 {
return nil, errs.New(errs.ErrNoInstance, "no "+kp.Object+" instances on cluster")
}
count, numPartials = kp.HandleResults(curMat, perfRecords, kp.Prop, false, oldInstances)

// process endpoints
eCount, endpointAPID := kp.ProcessEndPoints(curMat, endpointFunc, oldInstances)
count += eCount

parseD = time.Since(startTime)
_ = kp.Metadata.LazySetValueInt64("api_time", "data", (apiD + endpointAPID).Microseconds())
_ = kp.Metadata.LazySetValueInt64("parse_time", "data", parseD.Microseconds())
_ = kp.Metadata.LazySetValueUint64("metrics", "data", count)
_ = kp.Metadata.LazySetValueUint64("instances", "data", uint64(len(curMat.GetInstances())))
_ = kp.Metadata.LazySetValueUint64("bytesRx", "data", kp.Client.Metadata.BytesRx)
_ = kp.Metadata.LazySetValueUint64("numCalls", "data", kp.Client.Metadata.NumCalls)
_ = kp.Metadata.LazySetValueUint64("numPartials", "data", numPartials)

kp.AddCollectCount(count)

// skip calculating from delta if no data from previous poll
if kp.perfProp.isCacheEmpty {
kp.Logger.Debug("skip postprocessing until next poll (previous cache empty)")
Expand Down Expand Up @@ -454,7 +473,6 @@ func (kp *KeyPerf) pollData(
slog.String("key", key),
slog.String("property", property),
slog.String("denominator", counter.denominator),
slog.Int("instIndex", instIndex),
)
skips = curMat.Skip(key)
totalSkips += skips
Expand Down Expand Up @@ -499,7 +517,6 @@ func (kp *KeyPerf) pollData(
"Unknown property",
slog.String("key", key),
slog.String("property", property),
slog.Int("instIndex", instIndex),
)
}

Expand All @@ -519,7 +536,6 @@ func (kp *KeyPerf) pollData(
slogx.Err(err),
slog.Int("i", i),
slog.String("key", key),
slog.Int("instIndex", instIndex),
)
continue
}
Expand Down
17 changes: 13 additions & 4 deletions cmd/collectors/keyperf/keyperf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
"github.com/netapp/harvest/v2/cmd/poller/options"
"github.com/netapp/harvest/v2/pkg/conf"
"github.com/netapp/harvest/v2/pkg/matrix"
"github.com/netapp/harvest/v2/pkg/set"
"github.com/netapp/harvest/v2/pkg/tree"
"github.com/netapp/harvest/v2/pkg/tree/node"
"github.com/netapp/harvest/v2/third_party/tidwall/gjson"
"sort"
"testing"
"time"
)

const (
Expand Down Expand Up @@ -65,16 +66,16 @@ func TestPartialAggregationSequence(t *testing.T) {

func (kp *KeyPerf) testPollInstanceAndDataWithMetrics(t *testing.T, pollDataFile string, expectedExportedInst, expectedExportedMetrics int) *matrix.Matrix {
// Additional logic to count metrics
prevMat := kp.Matrix[kp.Object]
pollData := collectors.JSONToGson(pollDataFile, true)
now := time.Now().Truncate(time.Second)
data, err := kp.pollData(now, pollData, nil)
got, _, err := processAndCookCounters(kp, pollData, prevMat)
if err != nil {
t.Fatal(err)
}

totalMetrics := 0
exportableInstance := 0
mat := data[kp.Object]
mat := got[kp.Object]
if mat != nil {
for _, instance := range mat.GetInstances() {
if instance.IsExportable() {
Expand Down Expand Up @@ -105,6 +106,14 @@ func (kp *KeyPerf) testPollInstanceAndDataWithMetrics(t *testing.T, pollDataFile
return mat
}

func processAndCookCounters(kp *KeyPerf, pollData []gjson.Result, prevMat *matrix.Matrix) (map[string]*matrix.Matrix, uint64, error) {
curMat := prevMat.Clone(matrix.With{Data: false, Metrics: true, Instances: true, ExportInstances: true})
curMat.Reset()
metricCount, _, _ := kp.processPerfRecords(pollData, curMat, set.New())
got, err := kp.cookCounters(curMat, prevMat)
return got, metricCount, err
}

func TestKeyPerf_pollData(t *testing.T) {
conf.TestLoadHarvestConfig("testdata/config.yml")
tests := []struct {
Expand Down

0 comments on commit 8a798a1

Please sign in to comment.