Skip to content

Commit

Permalink
Fixes #RHINENG-14713 - Error if no valid workload_type found
Browse files Browse the repository at this point in the history
  • Loading branch information
upadhyeammit committed Dec 18, 2024
1 parent d50df50 commit 2c71a47
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 41 deletions.
41 changes: 29 additions & 12 deletions internal/utils/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@ func Aggregate_data(df dataframe.DataFrame) (dataframe.DataFrame, error) {
if df.Nrow() == 0 {
return df, fmt.Errorf("no valid records present in CSV to process further")
}

df = determine_k8s_object_type(df)

// filter out only valid workload type
df = filter_valid_k8s_object_types(df)

dfGroups := df.GroupBy(
"namespace",
"k8s_object_type",
Expand Down Expand Up @@ -85,8 +81,8 @@ func Aggregate_data(df dataframe.DataFrame) (dataframe.DataFrame, error) {
return df, nil
}

func filter_valid_csv_records(main_df dataframe.DataFrame) (dataframe.DataFrame, int) {
df := main_df.FilterAggregation(
func filter_valid_csv_records(mainDf dataframe.DataFrame) (dataframe.DataFrame, int) {
df := mainDf.FilterAggregation(
dataframe.And,
dataframe.F{Colname: "memory_rss_usage_container_sum", Comparator: series.GreaterEq, Comparando: 0},
dataframe.F{Colname: "memory_rss_usage_container_max", Comparator: series.GreaterEq, Comparando: 0},
Expand All @@ -104,17 +100,34 @@ func filter_valid_csv_records(main_df dataframe.DataFrame) (dataframe.DataFrame,
dataframe.F{Colname: "owner_name", Comparator: series.Neq, Comparando: ""},
dataframe.F{Colname: "owner_kind", Comparator: series.Neq, Comparando: "<none>"},
dataframe.F{Colname: "owner_name", Comparator: series.Neq, Comparando: "<none>"},
dataframe.F{Colname: "workload_type", Comparator: series.Neq, Comparando: "<none>"},
dataframe.F{Colname: "workload_type", Comparator: series.Neq, Comparando: ""},
)

no_of_dropped_records := main_df.Nrow() - df.Nrow()
// The above filters can delete all the rows
if df.Nrow() == 0 {
return df, mainDf.Nrow()
}

return df, no_of_dropped_records
}
// Change the case of all workload_type to lowercase
lcaseWorkloadTypes := df.Rapply(func(s series.Series) series.Series {
columns := df.Names()
indexOfWorkloadType := findInStringSlice("workload_type", columns)
workloadType := s.Elem(indexOfWorkloadType).String()
lcaseWorkloadType := strings.ToLower(workloadType)
return series.Strings([]string{lcaseWorkloadType})
})

func filter_valid_k8s_object_types(df dataframe.DataFrame) dataframe.DataFrame {
return df.Filter(
// Delete existing workload_type column
df = df.Mutate(df.Col("workload_type")).Drop("workload_type")

// Rename lowercase converted column to workload_type
df = df.Mutate(lcaseWorkloadTypes.Col("X0")).Rename("workload_type", "X0")

df = df.FilterAggregation(
dataframe.And,
dataframe.F{
Colname: "k8s_object_type",
Colname: "workload_type",
Comparator: series.In,
Comparando: []string{
w.Daemonset.String(),
Expand All @@ -125,6 +138,10 @@ func filter_valid_k8s_object_types(df dataframe.DataFrame) dataframe.DataFrame {
w.Statefulset.String(),
}},
)

noOfDroppedRecords := mainDf.Nrow() - df.Nrow()

return df, noOfDroppedRecords
}

func determine_k8s_object_type(df dataframe.DataFrame) dataframe.DataFrame {
Expand Down
81 changes: 52 additions & 29 deletions internal/utils/aggregator_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package utils

import (
"fmt"
"testing"

"github.com/go-gota/gota/dataframe"
Expand Down Expand Up @@ -47,9 +46,36 @@ type UsageData struct {
Memory_rss_usage_container_sum string `dataframe:"memory_rss_usage_container_sum,float"`
}

func Test_filter_valid_k8s_object_types(t *testing.T) {
// Check valid k8s object type
func Test_filter_valid_csv_records(t *testing.T) {
usage_data := []UsageData{
// k8s object with missing data
{
"2023-02-01 00:00:00 +0000 UTC", "2023-03-01 00:00:00 +0000 UTC", "2023-06-02 00:00:01 +0000 UTC", "2023-06-02 00:15:00 +0000 UTC",
"Yuptoo-service", "Yuptoo-app-standalone-1", "Yuptoo-app", "ReplicaSet", "testdeployment", "deployment", "Yuptoo-prod",
"quay.io/cloudservices/yuptoo", "ip-10-0-176-227.us-east-2.compute.internal", "i-0dfbb3fa4d0e8fc94",
"1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "",
},
{
"2023-02-01 00:00:00 +0000 UTC", "2023-03-01 00:00:00 +0000 UTC", "2023-06-02 00:00:01 +0000 UTC", "2023-06-02 00:15:00 +0000 UTC",
"Yuptoo-service", "Yuptoo-app-standalone-1", "Yuptoo-app", "ReplicaSet", "testdeployment", "deployment", "Yuptoo-prod",
"quay.io/cloudservices/yuptoo", "ip-10-0-176-227.us-east-2.compute.internal", "i-0dfbb3fa4d0e8fc94",
"1", "1", "1", "1", "", "", "", "", "1", "1", "1", "1", "1", "1", "1", "", "", "", "", "", "", "", "",
},
// k8s object with 0 CPU, Memory and RSS usage
{
"2023-02-01 00:00:00 +0000 UTC", "2023-03-01 00:00:00 +0000 UTC", "2023-06-02 00:00:01 +0000 UTC", "2023-06-02 00:15:00 +0000 UTC",
"Yuptoo-service", "Yuptoo-app-standalone-1", "Yuptoo-app", "ReplicaSet", "testdeployment", "deployment", "Yuptoo-prod",
"quay.io/cloudservices/yuptoo", "ip-10-0-176-227.us-east-2.compute.internal", "i-0dfbb3fa4d0e8fc94",
"1", "1", "1", "1", "0", "0", "0", "0", "1", "1", "1", "1", "1", "1", "1", "0", "0", "0", "0", "0", "0", "0", "0",
},
}
df := dataframe.LoadStructs(usage_data)
result, no_of_dropped_records := filter_valid_csv_records(df)
if result.Nrow() != 1 || no_of_dropped_records != 2 {
t.Error("Invalid k8s object type did not get dropped")
}

usage_data = []UsageData{
// k8s object type DaemonSet
{
"2023-02-01 00:00:00 +0000 UTC", "2023-03-01 00:00:00 +0000 UTC", "2023-06-02 00:00:01 +0000 UTC", "2023-06-02 00:15:00 +0000 UTC",
Expand Down Expand Up @@ -93,10 +119,8 @@ func Test_filter_valid_k8s_object_types(t *testing.T) {
"1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1",
},
}
df := dataframe.LoadStructs(usage_data)
df = determine_k8s_object_type(df)
result := filter_valid_k8s_object_types(df)
fmt.Println(result.Nrow())
df = dataframe.LoadStructs(usage_data)
result, _ = filter_valid_csv_records(df)
if result.Nrow() != 6 {
t.Error("Data not filtered properly. Some of the valid k8s object type got dropped")
}
Expand All @@ -112,43 +136,31 @@ func Test_filter_valid_k8s_object_types(t *testing.T) {
},
}
df = dataframe.LoadStructs(usage_data)
df = determine_k8s_object_type(df)
result = filter_valid_k8s_object_types(df)
result, _ = filter_valid_csv_records(df)
if result.Nrow() != 0 {
t.Error("Invalid k8s object type did not get dropped")
}
}

func Test_filter_valid_csv_records(t *testing.T) {
usage_data := []UsageData{
// k8s object with missing data
{
"2023-02-01 00:00:00 +0000 UTC", "2023-03-01 00:00:00 +0000 UTC", "2023-06-02 00:00:01 +0000 UTC", "2023-06-02 00:15:00 +0000 UTC",
"Yuptoo-service", "Yuptoo-app-standalone-1", "Yuptoo-app", "ReplicaSet", "testdeployment", "deployment", "Yuptoo-prod",
"quay.io/cloudservices/yuptoo", "ip-10-0-176-227.us-east-2.compute.internal", "i-0dfbb3fa4d0e8fc94",
"1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "",
},
// check if empty workload_type is dropped
usage_data = []UsageData{
{
"2023-02-01 00:00:00 +0000 UTC", "2023-03-01 00:00:00 +0000 UTC", "2023-06-02 00:00:01 +0000 UTC", "2023-06-02 00:15:00 +0000 UTC",
"Yuptoo-service", "Yuptoo-app-standalone-1", "Yuptoo-app", "ReplicaSet", "testdeployment", "deployment", "Yuptoo-prod",
"Yuptoo-service", "Yuptoo-app-standalone-1", "Yuptoo-app", "ReplicaSet", "testdeployment", "", "Yuptoo-prod",
"quay.io/cloudservices/yuptoo", "ip-10-0-176-227.us-east-2.compute.internal", "i-0dfbb3fa4d0e8fc94",
"1", "1", "1", "1", "", "", "", "", "1", "1", "1", "1", "1", "1", "1", "", "", "", "", "", "", "", "",
"1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1",
},
// k8s object with 0 CPU, Memory and RSS usage
{
"2023-02-01 00:00:00 +0000 UTC", "2023-03-01 00:00:00 +0000 UTC", "2023-06-02 00:00:01 +0000 UTC", "2023-06-02 00:15:00 +0000 UTC",
"Yuptoo-service", "Yuptoo-app-standalone-1", "Yuptoo-app", "ReplicaSet", "testdeployment", "deployment", "Yuptoo-prod",
"Yuptoo-service", "Yuptoo-app-standalone-1", "Yuptoo-app", "ReplicaSet", "testdeployment", "<none>", "Yuptoo-prod",
"quay.io/cloudservices/yuptoo", "ip-10-0-176-227.us-east-2.compute.internal", "i-0dfbb3fa4d0e8fc94",
"1", "1", "1", "1", "0", "0", "0", "0", "1", "1", "1", "1", "1", "1", "1", "0", "0", "0", "0", "0", "0", "0", "0",
"1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1",
},
}
df := dataframe.LoadStructs(usage_data)
df = determine_k8s_object_type(df)
result, no_of_dropped_records := filter_valid_csv_records(df)
if result.Nrow() != 1 || no_of_dropped_records != 2 {
df = dataframe.LoadStructs(usage_data)
result, _ = filter_valid_csv_records(df)
if result.Nrow() != 0 {
t.Error("Invalid k8s object type did not get dropped")
}

}

func Test_check_if_all_required_columns_in_CSV(t *testing.T) {
Expand Down Expand Up @@ -178,3 +190,14 @@ func Test_check_if_all_required_columns_in_CSV(t *testing.T) {
t.Error("Expecting error to be returned as all required column not present")
}
}

// TestAggregateDataNoRecords verifies that Aggregate_data returns an
// error, rather than panicking, when the input dataframe has no rows.
func TestAggregateDataNoRecords(t *testing.T) {
	usage_data := []UsageData{}

	// Loading an empty struct slice yields a dataframe with zero rows,
	// which Aggregate_data is expected to reject with an error.
	df := dataframe.LoadStructs(usage_data)
	if _, err := Aggregate_data(df); err == nil {
		t.Error("Expecting error to be returned when all rows are invalid")
	}
}

0 comments on commit 2c71a47

Please sign in to comment.