Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add streaming to keyperf collector #3435

Merged
merged 1 commit into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 76 additions & 60 deletions cmd/collectors/keyperf/keyperf.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/netapp/harvest/v2/cmd/collectors/restperf/plugins/volumetopmetrics"
"github.com/netapp/harvest/v2/cmd/poller/collector"
"github.com/netapp/harvest/v2/cmd/poller/plugin"
rest2 "github.com/netapp/harvest/v2/cmd/tools/rest"
"github.com/netapp/harvest/v2/pkg/conf"
"github.com/netapp/harvest/v2/pkg/errs"
"github.com/netapp/harvest/v2/pkg/matrix"
Expand Down Expand Up @@ -236,10 +237,14 @@ func (kp *KeyPerf) buildCounters() {

func (kp *KeyPerf) PollData() (map[string]*matrix.Matrix, error) {
var (
err error
perfRecords []gjson.Result
startTime time.Time
apiD, parseD time.Duration
metricCount uint64
numPartials uint64
startTime time.Time
prevMat *matrix.Matrix
curMat *matrix.Matrix
)

startTime = time.Now()
kp.Client.Metadata.Reset()

Expand Down Expand Up @@ -267,14 +272,74 @@ func (kp *KeyPerf) PollData() (map[string]*matrix.Matrix, error) {
}
}

perfRecords, err = kp.GetRestData(href, headers)
// Track old instances before processing batches
oldInstances := set.New()
for key := range kp.Matrix[kp.Object].GetInstances() {
oldInstances.Add(key)
}

prevMat = kp.Matrix[kp.Object]
// clone matrix without numeric data
curMat = prevMat.Clone(matrix.With{Data: false, Metrics: true, Instances: true, ExportInstances: true})
curMat.Reset()

processBatch := func(perfRecords []gjson.Result) error {
if len(perfRecords) == 0 {
return nil
}

// Process the current batch of records
count, np, batchParseD := kp.processPerfRecords(perfRecords, curMat, oldInstances)
numPartials += np
metricCount += count
parseD += batchParseD
return nil
}

if err := rest2.FetchAllStream(kp.Client, kp.Prop.Href, processBatch, headers); err != nil {
return nil, fmt.Errorf("failed to fetch data: %w", err)
}
apiD += time.Since(startTime)

if err != nil {
return nil, fmt.Errorf("failed to fetch href=%s %w", href, err)
return nil, err
}

return kp.pollData(startTime, perfRecords, func(e *rest.EndPoint) ([]gjson.Result, time.Duration, error) {
return kp.ProcessEndPoint(e)
})
// Process endpoints after all batches have been processed
eCount, endpointAPID := kp.ProcessEndPoints(curMat, kp.ProcessEndPoint, oldInstances)
metricCount += eCount
apiD += endpointAPID

// Remove old instances that are not found in new instances
for key := range oldInstances.Iter() {
curMat.RemoveInstance(key)
}

_ = kp.Metadata.LazySetValueInt64("api_time", "data", apiD.Microseconds())
_ = kp.Metadata.LazySetValueInt64("parse_time", "data", parseD.Microseconds())
_ = kp.Metadata.LazySetValueUint64("metrics", "data", metricCount)
_ = kp.Metadata.LazySetValueUint64("instances", "data", uint64(len(curMat.GetInstances())))
_ = kp.Metadata.LazySetValueUint64("bytesRx", "data", kp.Client.Metadata.BytesRx)
_ = kp.Metadata.LazySetValueUint64("numCalls", "data", kp.Client.Metadata.NumCalls)
_ = kp.Metadata.LazySetValueUint64("numPartials", "data", numPartials)

kp.AddCollectCount(metricCount)

return kp.cookCounters(curMat, prevMat)
}

// processPerfRecords parses one batch of performance records into curMat via
// HandleResults (non-endpoint pass), pruning matched keys from oldInstances.
// It returns the number of metrics collected, the number of partial records,
// and the wall-clock time spent parsing the batch.
func (kp *KeyPerf) processPerfRecords(perfRecords []gjson.Result, curMat *matrix.Matrix, oldInstances *set.Set) (uint64, uint64, time.Duration) {
	begin := time.Now()
	count, numPartials := kp.HandleResults(curMat, perfRecords, kp.Prop, false, oldInstances)
	return count, numPartials, time.Since(begin)
}

// validateMatrix ensures that the previous matrix (prevMat) contains all the metrics present in the current matrix (curMat).
Expand All @@ -297,57 +362,11 @@ func (kp *KeyPerf) validateMatrix(prevMat *matrix.Matrix, curMat *matrix.Matrix)
return nil
}

func (kp *KeyPerf) pollData(
startTime time.Time,
perfRecords []gjson.Result,
endpointFunc func(e *rest.EndPoint) ([]gjson.Result, time.Duration, error),
) (map[string]*matrix.Matrix, error) {
func (kp *KeyPerf) cookCounters(curMat *matrix.Matrix, prevMat *matrix.Matrix) (map[string]*matrix.Matrix, error) {
var (
count uint64
apiD, parseD time.Duration
err error
skips int
numPartials uint64
instIndex int
prevMat *matrix.Matrix
curMat *matrix.Matrix
err error
skips int
)

prevMat = kp.Matrix[kp.Object]
// Track old instances before processing batches
oldInstances := set.New()
for key := range prevMat.GetInstances() {
oldInstances.Add(key)
}

// clone matrix without numeric data
curMat = prevMat.Clone(matrix.With{Data: false, Metrics: true, Instances: true, ExportInstances: true})
curMat.Reset()

apiD = time.Since(startTime)

startTime = time.Now()

if len(perfRecords) == 0 {
return nil, errs.New(errs.ErrNoInstance, "no "+kp.Object+" instances on cluster")
}
count, numPartials = kp.HandleResults(curMat, perfRecords, kp.Prop, false, oldInstances)

// process endpoints
eCount, endpointAPID := kp.ProcessEndPoints(curMat, endpointFunc, oldInstances)
count += eCount

parseD = time.Since(startTime)
_ = kp.Metadata.LazySetValueInt64("api_time", "data", (apiD + endpointAPID).Microseconds())
_ = kp.Metadata.LazySetValueInt64("parse_time", "data", parseD.Microseconds())
_ = kp.Metadata.LazySetValueUint64("metrics", "data", count)
_ = kp.Metadata.LazySetValueUint64("instances", "data", uint64(len(curMat.GetInstances())))
_ = kp.Metadata.LazySetValueUint64("bytesRx", "data", kp.Client.Metadata.BytesRx)
_ = kp.Metadata.LazySetValueUint64("numCalls", "data", kp.Client.Metadata.NumCalls)
_ = kp.Metadata.LazySetValueUint64("numPartials", "data", numPartials)

kp.AddCollectCount(count)

// skip calculating from delta if no data from previous poll
if kp.perfProp.isCacheEmpty {
kp.Logger.Debug("skip postprocessing until next poll (previous cache empty)")
Expand Down Expand Up @@ -454,7 +473,6 @@ func (kp *KeyPerf) pollData(
slog.String("key", key),
slog.String("property", property),
slog.String("denominator", counter.denominator),
slog.Int("instIndex", instIndex),
)
skips = curMat.Skip(key)
totalSkips += skips
Expand Down Expand Up @@ -499,7 +517,6 @@ func (kp *KeyPerf) pollData(
"Unknown property",
slog.String("key", key),
slog.String("property", property),
slog.Int("instIndex", instIndex),
)
}

Expand All @@ -519,7 +536,6 @@ func (kp *KeyPerf) pollData(
slogx.Err(err),
slog.Int("i", i),
slog.String("key", key),
slog.Int("instIndex", instIndex),
)
continue
}
Expand Down
17 changes: 13 additions & 4 deletions cmd/collectors/keyperf/keyperf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
"github.com/netapp/harvest/v2/cmd/poller/options"
"github.com/netapp/harvest/v2/pkg/conf"
"github.com/netapp/harvest/v2/pkg/matrix"
"github.com/netapp/harvest/v2/pkg/set"
"github.com/netapp/harvest/v2/pkg/tree"
"github.com/netapp/harvest/v2/pkg/tree/node"
"github.com/netapp/harvest/v2/third_party/tidwall/gjson"
"sort"
"testing"
"time"
)

const (
Expand Down Expand Up @@ -65,16 +66,16 @@ func TestPartialAggregationSequence(t *testing.T) {

func (kp *KeyPerf) testPollInstanceAndDataWithMetrics(t *testing.T, pollDataFile string, expectedExportedInst, expectedExportedMetrics int) *matrix.Matrix {
// Additional logic to count metrics
prevMat := kp.Matrix[kp.Object]
pollData := collectors.JSONToGson(pollDataFile, true)
now := time.Now().Truncate(time.Second)
data, err := kp.pollData(now, pollData, nil)
got, _, err := processAndCookCounters(kp, pollData, prevMat)
if err != nil {
t.Fatal(err)
}

totalMetrics := 0
exportableInstance := 0
mat := data[kp.Object]
mat := got[kp.Object]
if mat != nil {
for _, instance := range mat.GetInstances() {
if instance.IsExportable() {
Expand Down Expand Up @@ -105,6 +106,14 @@ func (kp *KeyPerf) testPollInstanceAndDataWithMetrics(t *testing.T, pollDataFile
return mat
}

// processAndCookCounters is a test helper mirroring PollData's streaming path:
// it clones prevMat without numeric data, parses pollData into the clone, then
// cooks the counters. It returns the cooked matrices, the parsed metric count,
// and any error from cooking.
func processAndCookCounters(kp *KeyPerf, pollData []gjson.Result, prevMat *matrix.Matrix) (map[string]*matrix.Matrix, uint64, error) {
	current := prevMat.Clone(matrix.With{Data: false, Metrics: true, Instances: true, ExportInstances: true})
	current.Reset()
	// numPartials and parse duration are irrelevant for these tests; drop them.
	parsed, _, _ := kp.processPerfRecords(pollData, current, set.New())
	cooked, err := kp.cookCounters(current, prevMat)
	return cooked, parsed, err
}

func TestKeyPerf_pollData(t *testing.T) {
conf.TestLoadHarvestConfig("testdata/config.yml")
tests := []struct {
Expand Down
Loading