From dbd0e1612f1c1088c3aafb2e0f6d64e3eb89da64 Mon Sep 17 00:00:00 2001 From: Shreya Date: Mon, 11 Nov 2024 09:23:58 +0530 Subject: [PATCH 1/5] Add MetricLabels class to store metric labels --- .../common/data/metrics/MetricLabels.java | 138 ++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 src/main/java/com/autotune/common/data/metrics/MetricLabels.java diff --git a/src/main/java/com/autotune/common/data/metrics/MetricLabels.java b/src/main/java/com/autotune/common/data/metrics/MetricLabels.java new file mode 100644 index 000000000..767048cc4 --- /dev/null +++ b/src/main/java/com/autotune/common/data/metrics/MetricLabels.java @@ -0,0 +1,138 @@ +package com.autotune.common.data.metrics; + +/** + * + * This class stores labels such as container, pod, namespace, owner and workload kind/name from the output of a query. + * + * Example query output for imageOwners metric + * { + * "metric": { + * "container": "openshift-apiserver", + * "pod": "apiserver-5bfcb9858b-57pqc", + * "namespace": "openshift-apiserver", + * "owner_kind": "ReplicaSet", + * "owner_name": "apiserver-5bfcb9858b" + * }, + * "values": [ + * [ + * 1730691554, + * "1" + * ] + * ] + * } + */ +public class MetricLabels { + private String name; + private String container; + private String namespace; + private String pod; + private String owner_kind; + private String owner_name; + private String workload_kind; + private String workload_name; + + // Constructor with a flag to distinguish between owner_kind/owner_name and workload_kind/workload_name + public MetricLabels(String name, String container, String namespace, String ownerOrWorkloadKind, + String ownerOrWorkloadName, String pod, boolean isOwner) { + this.name = name; + this.container = container; + this.namespace = namespace; + this.pod = pod; + + if (isOwner) { + // field3 and field4 are owner_kind and owner_name + this.owner_kind = ownerOrWorkloadKind; + this.owner_name = ownerOrWorkloadName; + } else { + // field3 and field4 are value1 and value2 + this.workload_kind = ownerOrWorkloadKind; + this.workload_name = ownerOrWorkloadName; + } + } + + public MetricLabels(String name, String container, String namespace, String pod) { + this.name = name; + this.container = container; + this.namespace = namespace; + this.pod = pod; + } + + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getContainer() { + return container; + } + + public void setContainer(String container){ + this.container = container; + } + + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public String getPod() { + return pod; + } + + public void setPod(String pod) { + this.pod = pod; + } + + public String getOwner_kind() { + return owner_kind; + } + + public void setOwner_kind(String owner_kind) { + this.owner_kind = owner_kind; + } + + public String getOwner_name() { + return owner_name; + } + + public void setOwner_name(String owner_name) { + this.owner_name = owner_name; + } + + public String getWorkload_kind() { + return workload_kind; + } + + public void setWorkload_kind(String workload_kind) { + this.workload_kind = workload_kind; + } + + public String getWorkload_name() { + return workload_name; + } + + public void setWorkload_name(String workload_name) { + this.workload_name = workload_name; + } + + @Override + public String toString() { + return "MetricLabels{" + + "container='" + container + '\'' + + ", 
namespace='" + namespace + '\'' + + ", pod='" + pod + '\'' + + ", owner_kind='" + owner_kind + '\'' + + ", owner_name='" + owner_name + '\'' + + ", workload_kind='" + workload_kind + '\'' + + ", workload_name='" + workload_name + '\'' + + '}'; + } +} \ No newline at end of file From 1d85faf12f68b4274a23dc2d4c3bfadc00df64f9 Mon Sep 17 00:00:00 2001 From: Shreya Date: Mon, 11 Nov 2024 09:25:32 +0530 Subject: [PATCH 2/5] Add image owner and workload constants --- .../java/com/autotune/analyzer/utils/AnalyzerConstants.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/com/autotune/analyzer/utils/AnalyzerConstants.java b/src/main/java/com/autotune/analyzer/utils/AnalyzerConstants.java index 740bb859a..2ab5402c3 100644 --- a/src/main/java/com/autotune/analyzer/utils/AnalyzerConstants.java +++ b/src/main/java/com/autotune/analyzer/utils/AnalyzerConstants.java @@ -188,6 +188,8 @@ public enum MetricName { memoryUsage, memoryRSS, maxDate, + imageOwners, + imageWorkloads, namespaceCpuRequest, namespaceCpuLimit, namespaceCpuUsage, From 1e660b7410eb9e7cda21018d96b99597351776d6 Mon Sep 17 00:00:00 2001 From: Shreya Date: Mon, 11 Nov 2024 10:00:10 +0530 Subject: [PATCH 3/5] Update container recommendations by aggregating metric data by pods --- .../engine/RecommendationEngine.java | 655 +++++++++++++++++- 1 file changed, 632 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/autotune/analyzer/recommendations/engine/RecommendationEngine.java b/src/main/java/com/autotune/analyzer/recommendations/engine/RecommendationEngine.java index f157550a4..397c08f64 100644 --- a/src/main/java/com/autotune/analyzer/recommendations/engine/RecommendationEngine.java +++ b/src/main/java/com/autotune/analyzer/recommendations/engine/RecommendationEngine.java @@ -42,8 +42,11 @@ import org.slf4j.LoggerFactory; import javax.servlet.http.HttpServletResponse; +import java.io.*; import java.lang.reflect.Method; import java.net.URLEncoder; +import java.nio.file.Files; +import java.nio.file.Paths; import java.sql.Timestamp; import java.text.SimpleDateFormat; import java.util.*; @@ -1971,7 +1974,7 @@ private void fetchNamespaceMetricsBasedOnDataSourceAndProfile(KruizeObject kruiz // Prepare interval results prepareIntervalResults(namespaceDataResults, namespaceIntervalResults, namespaceResMap, namespaceMetricResults, - namespaceMetricAggregationInfoResults, sTime, eTime, metricEntry, aggregationFunctionsEntry, value, format); + namespaceMetricAggregationInfoResults, sTime, eTime, metricEntry, "", aggregationFunctionsEntry, value, format); } } } catch (Exception e) { @@ -2090,8 +2093,15 @@ private void fetchContainerMetricsBasedOnDataSourceAndProfile(KruizeObject kruiz interval_start_time_epoc = interval_start_time.getTime() / KruizeConstants.TimeConv.NO_OF_MSECS_IN_SEC - ((long) interval_start_time.getTimezoneOffset() * KruizeConstants.TimeConv.NO_OF_MSECS_IN_SEC); } + // >> + HashMap> containerDataResultsByPod = new HashMap<>(); HashMap containerDataResults = new HashMap<>(); + + // >> + HashMap>> metricLabelMapByNameAndPod = new HashMap<>(); + IntervalResults intervalResults = null; + HashMap intervalResultsMap = null; HashMap resMap = null; HashMap acceleratorMetricResultHashMap; MetricResults metricResults = null; @@ -2275,25 +2285,52 @@ private void fetchContainerMetricsBasedOnDataSourceAndProfile(KruizeObject kruiz } } } else { - resultArray = jsonObject.getAsJsonObject(KruizeConstants.JSONKeys.DATA).getAsJsonArray( - KruizeConstants.DataSourceConstants.DataSourceQueryJSONKeys.RESULT).get(0) - 
.getAsJsonObject().getAsJsonArray(KruizeConstants.DataSourceConstants - .DataSourceQueryJSONKeys.VALUES); - sdf.setTimeZone(TimeZone.getTimeZone(KruizeConstants.TimeUnitsExt.TimeZones.UTC)); + JsonObject metricObject; + MetricLabels metricLabels; + HashMap imageMetricLabelsMap = new HashMap<>(); + + for (JsonElement jsonElement : resultArray) { + metricObject = jsonElement.getAsJsonObject().get(KruizeConstants.DataSourceConstants.DataSourceQueryJSONKeys.METRIC).getAsJsonObject(); + String containerLabel = metricObject.get(KruizeConstants.JSONKeys.CONTAINER).getAsString(); + String namespaceLabel = metricObject.get(KruizeConstants.JSONKeys.NAMESPACE).getAsString(); + String podLabel = metricObject.get("pod").getAsString(); //TODO + + if (metricEntry.getName().equals("imageOwners")) { + // Extract fields from the metric object + String owner_kindLabel = metricObject.get("owner_kind").getAsString(); + String owner_nameLabel = metricObject.get("owner_name").getAsString(); + metricLabels = new MetricLabels(metricEntry.getName(), containerLabel, namespaceLabel, owner_kindLabel, owner_nameLabel, podLabel, true); + } else if (metricEntry.getName().equals("imageWorkloads")) { + String workload_kindLabel = metricObject.get("workload").getAsString(); + String workload_nameLabel = metricObject.get("workload_type").getAsString(); + metricLabels = new MetricLabels(metricEntry.getName(), containerLabel, namespaceLabel, workload_kindLabel, workload_nameLabel, podLabel, false); + } else { + metricLabels = new MetricLabels(metricEntry.getName(), containerLabel, namespaceLabel, podLabel); + } + imageMetricLabelsMap.put(podLabel, metricLabels); - // Iterate over fetched metrics - Timestamp sTime = new Timestamp(interval_start_time_epoc); - for (JsonElement element : resultArray) { - JsonArray valueArray = element.getAsJsonArray(); - long epochTime = valueArray.get(0).getAsLong(); - double value = valueArray.get(1).getAsDouble(); - String timestamp = sdf.format(new Date(epochTime * KruizeConstants.TimeConv.NO_OF_MSECS_IN_SEC)); - Date date = sdf.parse(timestamp); - Timestamp eTime = new Timestamp(date.getTime()); + JsonArray valueResultArray = jsonElement.getAsJsonObject().getAsJsonArray(KruizeConstants.DataSourceConstants + .DataSourceQueryJSONKeys.VALUES); + sdf.setTimeZone(TimeZone.getTimeZone(KruizeConstants.TimeUnitsExt.TimeZones.UTC)); - // Prepare interval results - prepareIntervalResults(containerDataResults, intervalResults, resMap, metricResults, - metricAggregationInfoResults, sTime, eTime, metricEntry, aggregationFunctionsEntry, value, format); + // Iterate over fetched metrics + //Timestamp sTime = new Timestamp(interval_start_time_epoc); + String start_timestamp = sdf.format(new Date(interval_start_time_epoc * KruizeConstants.TimeConv.NO_OF_MSECS_IN_SEC)); + Date start_date = sdf.parse(start_timestamp); + Timestamp sTime = new Timestamp(start_date.getTime()); + for (JsonElement element : valueResultArray) { + JsonArray valueArray = element.getAsJsonArray(); + long epochTime = valueArray.get(0).getAsLong(); + double value = valueArray.get(1).getAsDouble(); + String timestamp = sdf.format(new Date(epochTime * KruizeConstants.TimeConv.NO_OF_MSECS_IN_SEC)); + Date date = sdf.parse(timestamp); + Timestamp eTime = new Timestamp(date.getTime()); + + // Prepare interval results + prepareContainerIntervalResultsByPod(containerDataResultsByPod, intervalResultsMap, intervalResults, podLabel, metricLabelMapByNameAndPod, imageMetricLabelsMap, resMap, + metricAggregationInfoResults, sTime, eTime, metricEntry, 
aggregationFunctionsEntry, value, format); + sTime = eTime; + } } } } catch (Exception e) { @@ -2302,9 +2339,11 @@ private void fetchContainerMetricsBasedOnDataSourceAndProfile(KruizeObject kruiz } } - containerData.setResults(containerDataResults); - if (!containerDataResults.isEmpty()) - setInterval_end_time(Collections.max(containerDataResults.keySet())); //TODO Temp fix invalid date is set if experiment having two container with different last seen date + List> aggr_data = convertContainerResultsByPodMapToCSV(containerDataResultsByPod, metricLabelMapByNameAndPod, "input.csv"); + HashMap aggrContainerDataResults = convertAggrDataToIntervalResults(aggr_data); + containerData.setResults(aggrContainerDataResults); + if (!aggrContainerDataResults.isEmpty()) + setInterval_end_time(Collections.max(aggrContainerDataResults.keySet())); //TODO Temp fix invalid date is set if experiment having two container with different last seen date } } @@ -2334,7 +2373,7 @@ private String getMaxDateQuery(PerformanceProfile metricProfile, String metricNa */ private void prepareIntervalResults(Map dataResultsMap, IntervalResults intervalResults, HashMap resMap, MetricResults metricResults, - MetricAggregationInfoResults metricAggregationInfoResults, Timestamp sTime, Timestamp eTime, Metric metricEntry, + MetricAggregationInfoResults metricAggregationInfoResults, Timestamp sTime, Timestamp eTime, Metric metricEntry, String aggrMetricName, Map.Entry aggregationFunctionsEntry, double value, String format) throws Exception { try { if (dataResultsMap.containsKey(eTime)) { @@ -2344,7 +2383,84 @@ private void prepareIntervalResults(Map dataResultsM intervalResults = new IntervalResults(); resMap = new HashMap<>(); } + + String metricEntryName; + String metricEntryMethod; + + if(null == metricEntry && null == aggregationFunctionsEntry) { + metricEntryName = aggrMetricName.split("_")[0]; + metricEntryMethod = aggrMetricName.split("_")[1]; + } else { + metricEntryName = metricEntry.getName(); + metricEntryMethod = aggregationFunctionsEntry.getKey(); + } + + AnalyzerConstants.MetricName metricName = AnalyzerConstants.MetricName.valueOf(metricEntryName); + if (resMap.containsKey(metricName)) { + metricResults = resMap.get(metricName); + metricAggregationInfoResults = metricResults.getAggregationInfoResult(); + } else { + metricResults = new MetricResults(); + metricAggregationInfoResults = new MetricAggregationInfoResults(); + } + + Method method = MetricAggregationInfoResults.class.getDeclaredMethod(KruizeConstants.APIMessages.SET + metricEntryMethod.substring(0, 1).toUpperCase() + metricEntryMethod.substring(1), Double.class); + method.invoke(metricAggregationInfoResults, value); + metricAggregationInfoResults.setFormat(format); + metricResults.setAggregationInfoResult(metricAggregationInfoResults); + metricResults.setName(metricEntryName); + metricResults.setFormat(format); + resMap.put(metricName, metricResults); + intervalResults.setMetricResultsMap(resMap); + intervalResults.setIntervalStartTime(sTime); //Todo this will change + intervalResults.setIntervalEndTime(eTime); + intervalResults.setDurationInMinutes((double) ((eTime.getTime() - sTime.getTime()) + / ((long) KruizeConstants.TimeConv.NO_OF_SECONDS_PER_MINUTE + * KruizeConstants.TimeConv.NO_OF_MSECS_IN_SEC))); + dataResultsMap.put(eTime, intervalResults); + sTime = eTime; + } catch (Exception e) { + e.printStackTrace(); + throw new Exception(AnalyzerErrorConstants.APIErrors.UpdateRecommendationsAPI.METRIC_EXCEPTION + e.getMessage()); + } + } + + /** + * 
prepares interval results for container experiments by pod + */ + private void prepareContainerIntervalResultsByPod(HashMap> dataResultsMap, HashMap intervalResultsMap, IntervalResults intervalResults, + String podLabel, HashMap>> metricLabelMap, HashMap imageMetricLabelsMap, HashMap resMap, + MetricAggregationInfoResults metricAggregationInfoResults, Timestamp sTime, Timestamp eTime, Metric metricEntry, + Map.Entry aggregationFunctionsEntry, double value, String format) throws Exception { + try { + if (dataResultsMap.containsKey(eTime)) { + intervalResultsMap = dataResultsMap.get(eTime); + if(intervalResultsMap.containsKey(podLabel)) { + intervalResults = intervalResultsMap.get(podLabel); + resMap = intervalResults.getMetricResultsMap(); + } else { + intervalResults = new IntervalResults(); + resMap = new HashMap<>(); + } + } else { + intervalResultsMap = new HashMap<>(); + intervalResults = new IntervalResults(); + resMap = new HashMap<>(); + } + + //check for metric labels + HashMap> labelsMap; + if (metricLabelMap.containsKey(eTime)) { + labelsMap = metricLabelMap.get(eTime); + } else { + labelsMap = new HashMap<>(); + } + + labelsMap.put(metricEntry.getName(), imageMetricLabelsMap); + metricLabelMap.put(eTime, labelsMap); + AnalyzerConstants.MetricName metricName = AnalyzerConstants.MetricName.valueOf(metricEntry.getName()); + MetricResults metricResults; if (resMap.containsKey(metricName)) { metricResults = resMap.get(metricName); metricAggregationInfoResults = metricResults.getAggregationInfoResult(); @@ -2366,7 +2482,8 @@ private void prepareIntervalResults(Map dataResultsM intervalResults.setDurationInMinutes((double) ((eTime.getTime() - sTime.getTime()) / ((long) KruizeConstants.TimeConv.NO_OF_SECONDS_PER_MINUTE * KruizeConstants.TimeConv.NO_OF_MSECS_IN_SEC))); - dataResultsMap.put(eTime, intervalResults); + intervalResultsMap.put(podLabel, intervalResults); + dataResultsMap.put(eTime, intervalResultsMap); sTime = eTime; } catch (Exception e) { e.printStackTrace(); @@ -2396,5 +2513,497 @@ public List filterMetricsBasedOnExpTypeAndK8sObject(PerformanceProfile m }) .toList(); } + + public static List> convertContainerResultsByPodMapToCSV(HashMap> containerDataResults, + HashMap>> metricLabelMap, String outputFilePath) { + + // Debugging: Print the absolute path where the file will be created + File file = new File(outputFilePath); + LOGGER.info("Attempting to create file at: {}", file.getAbsolutePath()); + + // List of expected queries + List queries = Arrays.asList( + "cpuRequest", + "cpuLimit", + "cpuUsage", + "cpuThrottle", + "memoryRequest", + "memoryLimit", + "memoryUsage", + "memoryRSS" + ); + + int totalColumns = 42; + // A map to store existing rows, with a key being the podLabel + Map csvRowMap = new HashMap<>(); + + try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) { + // Predefined labels for the CSV header + String header = "interval_end,interval_start,container,namespace,pod,owner_kind,owner_name,workload_type,workload," + + "cpuRequest_min,cpuRequest_max,cpuRequest_avg,cpuRequest_sum,cpuLimit_min," + + "cpuLimit_max,cpuLimit_avg,cpuLimit_sum,cpuUsage_min,cpuUsage_max,cpuUsage_avg,cpuUsage_sum," + + "cpuThrottle_min,cpuThrottle_max,cpuThrottle_avg,cpuThrottle_sum,memoryRequest_min,memoryRequest_max," + + "memoryRequest_avg,memoryRequest_sum,memoryLimit_min,memoryLimit_max,memoryLimit_avg,memoryLimit_sum," + + "memoryUsage_min,memoryUsage_max,memoryUsage_avg,memoryUsage_sum,memoryRSS_min,memoryRSS_max," + + 
"memoryRSS_avg,memoryRSS_sum,k8_object_type,k8_object_name"; + writer.write(header); + writer.newLine(); + + // Iterate over the timestamp entries + for (Timestamp timestamp : metricLabelMap.keySet()) { + HashMap> metricMap = metricLabelMap.get(timestamp); + // Iterate over the metric names + for (String metricName : metricMap.keySet()) { + HashMap labelMap = metricMap.get(metricName); + + if(labelMap == null) { + continue; + } + + // Iterate over the labels and corresponding objects + for (String label : labelMap.keySet()) { + // Create a key combining the timestamp and metric name + String rowKey = timestamp.toString() + "_" + label; + // Use the label as the key for the csvRowMap + String[] dataRow = csvRowMap.getOrDefault(rowKey, new String[totalColumns]); + + MetricLabels metricLabel = labelMap.get(label); + String container = metricLabel.getContainer(); + String namespace = metricLabel.getNamespace(); + + // If this is a new row, initialize with empty values and set the initial columns + if (!csvRowMap.containsKey(rowKey)) { + Arrays.fill(dataRow, ""); + + // Add timestamp and static values at predefined positions + dataRow[0] = timestamp.toString(); + dataRow[2] = container; + dataRow[3] = namespace; + dataRow[4] = label; + } + + if(metricName.equals("imageOwners")) { + dataRow[5] = metricLabel.getOwner_kind(); + dataRow[6] = metricLabel.getOwner_name(); + } else if(metricName.equals("imageWorkloads")) { + dataRow[7] = metricLabel.getWorkload_name(); + dataRow[8] = metricLabel.getWorkload_kind(); + } + + // Add placeholder values for additional columns + csvRowMap.put(rowKey, dataRow); + } + } + } + + for(Timestamp timestamp : containerDataResults.keySet()) { + HashMap intervalResultsMap = containerDataResults.get(timestamp); + + for(String podLabel: intervalResultsMap.keySet()) { + // Create a key combining the timestamp and pod name + String rowKey = timestamp.toString() + "_" + podLabel; + // Use the rowKey as the key for the csvRowMap + String[] dataRow = csvRowMap.get(rowKey); + IntervalResults intervalResults = intervalResultsMap.get(podLabel); + dataRow[1] = intervalResults.getIntervalStartTime().toString(); + + HashMap metricResultsMap = intervalResults.getMetricResultsMap(); + // Fill in values for each query + for (int i = 0; i < queries.size(); i++) { + String query = queries.get(i); + MetricResults metricResults = metricResultsMap.get(AnalyzerConstants.MetricName.valueOf(query)); + if(null != metricResults) { + MetricAggregationInfoResults aggregationInfoResults = metricResults.getAggregationInfoResult(); + // Append min, max, avg, sum for the current query + dataRow[i * 4 + 9] = aggregationInfoResults.getMin() != null ? aggregationInfoResults.getMin().toString() : ""; + dataRow[i * 4 + 10] = aggregationInfoResults.getMax()!= null ? aggregationInfoResults.getMax().toString() : ""; + dataRow[i * 4 + 11] = aggregationInfoResults.getAvg() != null ? aggregationInfoResults.getAvg().toString() : ""; + dataRow[i * 4 + 12] = aggregationInfoResults.getSum() != null ? 
aggregationInfoResults.getSum().toString() : ""; + } else { + dataRow[i * 4 + 9] = ""; + dataRow[i * 4 + 10] = ""; + dataRow[i * 4 + 11] = ""; + dataRow[i * 4 + 12] = ""; + } + } + csvRowMap.put(rowKey, dataRow); + } + } + + // After processing all data, write the rows to the CSV + // Write all rows to the CSV file + for (String[] dataRow : csvRowMap.values()) { + + if(null == dataRow[1] || dataRow[1].isEmpty()) { + continue; + } + // Convert the dataRow array into a comma-separated string + StringBuilder csvRow = new StringBuilder(); + for (String value : dataRow) { + csvRow.append(value).append(","); + } + + // Remove trailing comma + csvRow.setLength(csvRow.length() - 1); + + // Write the row to the CSV + writer.write(csvRow.toString()); + writer.newLine(); // Move to the next line + } + + LOGGER.info("CSV file created successfully: {}", outputFilePath); + // After creating the file, read and print its content + LOGGER.info("\nPrinting CSV contents:\n"); + printCsv(outputFilePath); + + LOGGER.info("***********************************"); + LOGGER.info("Aggregation of workloads"); + LOGGER.info("***********************************"); + List> aggregatedMetricData = aggregateWorkloads(header, outputFilePath, "output.csv"); + + return aggregatedMetricData; + } catch (IOException e) { + LOGGER.error(String.valueOf(e)); + } + return null; + } + + public static List> aggregateWorkloads(String header, String filename, String outputResults) throws IOException { + LOGGER.info("Aggregating the data for file: {}", filename); + + // Load the CSV file into a List of Maps + List> records = readCsv(filename); + + Iterator> iterator = records.iterator(); + + // Remove the rows if there is no owner_kind, owner_name and workload + // Expected to ignore rows which can be pods / invalid + while (iterator.hasNext()) { + Map row = iterator.next(); + String owner_kind = row.get("owner_kind"); + String owner_name = row.get("owner_name"); + String workload = row.get("workload"); + String workload_type = row.get("workload_type"); + + if (null == owner_kind || null == owner_name || null == workload || null == workload_type || owner_kind.isEmpty() || owner_name.isEmpty() || + workload.isEmpty() || workload_type.isEmpty()) { + iterator.remove(); + LOGGER.info("removing a row"); + } + } + + + // Ignore rows with 'workload_type' value of 'job' + records.removeIf(row -> "job".equals(row.get("workload_type"))); + + // Add 'k8_object_type' column + + // Based on the data observed, these are the assumptions: + // If owner_kind is 'ReplicaSet' and workload is '', actual workload_type is ReplicaSet + // If owner_kind is 'ReplicationCOntroller' and workload is '', actual workload_type is ReplicationController + // If owner_kind and workload has some names, workload_type is same as derived through queries. + for (Map row : records) { + String ownerKind = row.get("owner_kind"); + String workload = row.get("workload"); + if ("ReplicaSet".equals(ownerKind) && (null==workload)) { + row.put("k8_object_type", "replicaset"); + } else if ("ReplicationController".equals(ownerKind) && (null==workload)) { + row.put("k8_object_type", "replicationcontroller"); + } else { + row.put("k8_object_type", row.get("workload_type")); + } + } + + // Update 'k8_object_name' based on 'workload' + // If the workload is (which indicates ReplicaSet and ReplicationCOntroller - ignoring pods/invalid cases), the name of the k8_object can be owner_name. + // If the workload has some other name, the k8_object_name is same as workload. 
In this case, owner_name cannot be used as there can be multiple owner_names for the same deployment(considering there are multiple replicasets) + for (Map row : records) { + if (null != (row.get("workload"))) { + row.put("k8_object_name", row.get("workload")); + } else { + row.put("k8_object_name", row.get("owner_name")); + } + } + + // Sort and group the data based on below columns to get a container for a workload and for an interval. + records.sort(Comparator.comparing((Map row) -> row.get("namespace")) + .thenComparing(row -> row.get("k8_object_type")) + .thenComparing(row -> row.get("workload")) + .thenComparing(row -> row.get("container")) + .thenComparing(row -> row.get("interval_start"))); + + + // Create output directory to store the output CSV files + String outputDir = "output"; + Files.createDirectories(Paths.get(outputDir)); + + // Group records and write to separate CSV files + int counter = 0; + Map>> groupedData = new HashMap<>(); + for (Map record : records) { + String key = String.join("|", record.get("namespace"), record.get("k8_object_type"), + record.get("workload"), record.get("container"), record.get("interval_start")); + groupedData.computeIfAbsent(key, k -> new ArrayList<>()).add(record); + } + LOGGER.info("groupedData: {}", groupedData); + + for (List> group : groupedData.values()) { + counter++; + String fileName = "file_" + counter + ".csv"; + writeCsv(outputDir + "/" + fileName, group); + LOGGER.info("{}/{}", outputDir, fileName); + } + + // Initialize variables for aggregation + List headerRow = new ArrayList<>(); + Set columnsToIgnore = new HashSet<>(Arrays.asList("pod", "owner_name", "node")); + List> aggData = new ArrayList<>(); // To store non-duplicate rows + + // Map to hold aggregated data for each workload + Map>> workloadDataMap = new HashMap<>(); + + // Step 1: Iterate through CSV files in the output directory + for (String fileName : new File(outputDir).list()) { + if (fileName.endsWith(".csv")) { + List> currentData = readCsv(outputDir + "/" + fileName); + LOGGER.info("Processing file: {}", fileName); + + // Step 2: Initialize headers and check for "resource_id" + if (headerRow.isEmpty() && !currentData.isEmpty()) { + headerRow.addAll(currentData.get(0).keySet()); + if (headerRow.contains("resource_id")) { + columnsToIgnore.add("resource_id"); + } + } + + // Step 3: Group the data by 'workload' and aggregate metrics for each workload + for (Map row : currentData) { + String interval_start = row.get("interval_start"); + String container = row.get("container"); + String k8_object_type = row.get("k8_object_type"); + String namespace = row.get("namespace"); + String workload = row.get("workload"); // Assuming 'workload' is a key in your data + String key = interval_start + "|" + container + "|" + k8_object_type + "|" + namespace+ "|" + workload; + workloadDataMap.putIfAbsent(key, new ArrayList<>()); + workloadDataMap.get(key).add(row); + } + } + } + + // Step 4: Calculate aggregates (avg, min, max, sum) for each workload + for (String uniqueKey : workloadDataMap.keySet()) { + List> workloadRows = workloadDataMap.get(uniqueKey); + + Map aggregatedRow = new HashMap<>(); + // Split the unique key to extract namespace, workload, and start_time + String[] keyParts = uniqueKey.split("\\|"); + aggregatedRow.put("interval_start", keyParts[0]); + aggregatedRow.put("container", keyParts[1]); + aggregatedRow.put("k8_object_type", keyParts[2]); + aggregatedRow.put("namespace", keyParts[3]); + aggregatedRow.put("workload", keyParts[4]); + + for (String key : 
headerRow) { + if (columnsToIgnore.contains(key)) continue; // Skip ignored columns + + if (key.endsWith("avg")) { + double avg = workloadRows.stream().mapToDouble(r -> { + String value = r.get(key); + return (null == value || value.isEmpty() || value.equals("null")) ? 0.0 : Double.parseDouble(value); + }).average().orElse(0); + aggregatedRow.put(key, String.valueOf(avg)); + } else if (key.endsWith("min")) { + double min = workloadRows.stream().mapToDouble(r -> { + String value = r.get(key); + return (null == value || value.isEmpty() || value.equals("null")) ? Double.POSITIVE_INFINITY : Double.parseDouble(value); + }).min().orElse(Double.POSITIVE_INFINITY); + aggregatedRow.put(key, String.valueOf(min)); + } else if (key.endsWith("max")) { + double max = workloadRows.stream().mapToDouble(r -> { + String value = r.get(key); + return (null == value || value.isEmpty() || value.equals("null")) ? Double.NEGATIVE_INFINITY : Double.parseDouble(value); + }).max().orElse(Double.NEGATIVE_INFINITY); + aggregatedRow.put(key, String.valueOf(max)); + } else if (key.endsWith("sum")) { + double sum = workloadRows.stream().mapToDouble(r -> { + String value = r.get(key); + return (null == value || value.isEmpty() || value.equals("null")) ? 0.0 : Double.parseDouble(value); + }).sum(); + aggregatedRow.put(key, String.valueOf(sum)); + } else { + // For non-aggregatable columns, just use the value from the first row + aggregatedRow.put(key, workloadRows.get(0).get(key)); + } + } + + // Step 5: Add aggregated row to aggData + aggData.add(aggregatedRow); + } + for (Map row : aggData) { + // Remove keys from the row that are in the columnsToIgnore set + for (String column : columnsToIgnore) { + row.remove(column); + } + // Replace "Infinity" and "-Infinity" with empty strings in the row + for (String key : row.keySet()) { + String value = row.get(key); + if ("Infinity".equals(value) || "-Infinity".equals(value)) { + row.put(key, ""); // Set to empty string + } else if((key.contains("avg") || key.contains("sum")) && value.equals("0.0")) { + row.put(key, ""); + } + } + } + + // Write the final aggregated results to the output CSV + writeFinalResults(header, aggData, outputResults, columnsToIgnore); + + // Write aggregated data to final output CSV + printCsv(outputResults); + + return aggData; + } + + // Method to read CSV file and return List of Maps + private static List> readCsv(String filePath) throws IOException { + List> records = new ArrayList<>(); + List headers = new ArrayList<>(); + + try (BufferedReader br = Files.newBufferedReader(Paths.get(filePath))) { + String line; + boolean isHeader = true; + while ((line = br.readLine()) != null) { + String[] values = line.split(","); + if (isHeader) { + headers.addAll(Arrays.asList(values)); + isHeader = false; + } else { + Map rowMap = new HashMap<>(); + for (int i = 0; i < headers.size(); i++) { + if (i < values.length) { + rowMap.put(headers.get(i), values[i]); + } else { + rowMap.put(headers.get(i), null); // Handle missing columns + } + } + records.add(rowMap); + } + } + } + return records; + } + + // Write List of Maps to a CSV file + private static void writeCsv(String filePath, List> data){ + if (data.isEmpty()) return; + + try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(filePath))) { + // Write header + writer.write(String.join(",", data.get(0).keySet())); + writer.newLine(); + + // Write each row + for (Map row : data) { + writer.write(String.join(",", row.values())); + writer.newLine(); + } + } catch (Exception e) { + 
LOGGER.error(e.getMessage()); + } + } + + // Print CSV file contents + private static void printCsv(String filePath) { + LOGGER.info("Contents of {}", filePath + ":"); + try (BufferedReader br = Files.newBufferedReader(Paths.get(filePath))) { + String line; + while ((line = br.readLine()) != null) { + LOGGER.info(line); + } + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } + + + // Write the final aggregated results to the output CSV file + private static void writeFinalResults(String headerRow, List> aggData, + String outputFile, Set columnsToIgnore) { + try (BufferedWriter writer = new BufferedWriter(new FileWriter(outputFile))) { + // Step 1: Write the header + List filteredHeader = new ArrayList<>(); + List headerColumns = List.of(headerRow.split(",")); + for (String column : headerColumns) { + if (!columnsToIgnore.contains(column)) { + filteredHeader.add(column); // Add only non-ignored columns to the filtered header + } + } + + // Step 2: Write the filtered header to the file + writer.write(String.join(",", filteredHeader)); + writer.newLine(); + + // Step 3: Write each row's data for only non-ignored columns + for (Map row : aggData) { + List filteredValues = new ArrayList<>(); + for (String column : filteredHeader) { + filteredValues.add(row.getOrDefault(column, "")); // Add only non-ignored column values + } + writer.write(String.join(",", filteredValues)); + writer.newLine(); + } + } catch (IOException e) { + LOGGER.error(e.getMessage()); + } + } + + /* Converts aggregated data to container intervalResults */ + public HashMap convertAggrDataToIntervalResults(List> aggr_data) throws Exception { + HashMap containerDataResults = new HashMap<>(); + + List cpuFunction = Arrays.asList(AnalyzerConstants.MetricName.cpuUsage.toString(), AnalyzerConstants.MetricName.cpuThrottle.toString(), AnalyzerConstants.MetricName.cpuLimit.toString(), AnalyzerConstants.MetricName.cpuRequest.toString()); + List memFunction = Arrays.asList(AnalyzerConstants.MetricName.memoryLimit.toString(), AnalyzerConstants.MetricName.memoryRequest.toString(), AnalyzerConstants.MetricName.memoryRSS.toString(), AnalyzerConstants.MetricName.memoryUsage.toString()); + String format = null; + + IntervalResults intervalResults = new IntervalResults(); + MetricAggregationInfoResults metricAggregationInfoResults = new MetricAggregationInfoResults(); + HashMap resMap = new HashMap<>(); + MetricResults metricResults = null; + + + for(Map data_map:aggr_data) { + String e_timestamp = data_map.get("interval_end"); + Timestamp eTime = Timestamp.valueOf(e_timestamp); + + String s_timestamp = data_map.get("interval_start"); + Timestamp sTime = Timestamp.valueOf(s_timestamp); + + for(Map.Entry metric: data_map.entrySet()){ + String metricName = metric.getKey(); + String aggrMetricValue = metric.getValue(); + String metricNameWoFunction = metricName.split("_")[0]; + + + if(!cpuFunction.contains(metricNameWoFunction) && !memFunction.contains(metricNameWoFunction)) { + continue; + } + + if (null == aggrMetricValue || aggrMetricValue.isEmpty()) { + continue; + } + + double metricValue = Double.parseDouble(aggrMetricValue); + if (cpuFunction.contains(metricNameWoFunction)) { + format = KruizeConstants.JSONKeys.CORES; + } else if (memFunction.contains(metricNameWoFunction)) { + format = KruizeConstants.JSONKeys.BYTES; + } + + prepareIntervalResults(containerDataResults, intervalResults, resMap, metricResults, metricAggregationInfoResults, sTime, eTime, null, metricName, null, metricValue, format); + } + + } + return 
containerDataResults; + } } From ac207da0cc94419f953a7b8e8e22d029cb6bfd06 Mon Sep 17 00:00:00 2001 From: Shreya Date: Mon, 11 Nov 2024 19:42:07 +0530 Subject: [PATCH 4/5] Remove check to ignore 'job' workload_type --- .../analyzer/recommendations/engine/RecommendationEngine.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/com/autotune/analyzer/recommendations/engine/RecommendationEngine.java b/src/main/java/com/autotune/analyzer/recommendations/engine/RecommendationEngine.java index 397c08f64..75033d727 100644 --- a/src/main/java/com/autotune/analyzer/recommendations/engine/RecommendationEngine.java +++ b/src/main/java/com/autotune/analyzer/recommendations/engine/RecommendationEngine.java @@ -2692,10 +2692,6 @@ public static List> aggregateWorkloads(String header, String } } - - // Ignore rows with 'workload_type' value of 'job' - records.removeIf(row -> "job".equals(row.get("workload_type"))); - // Add 'k8_object_type' column // Based on the data observed, these are the assumptions: From ef3552af5ae01f4636ec736776d7398e9607ffc1 Mon Sep 17 00:00:00 2001 From: Shreya Date: Tue, 12 Nov 2024 10:26:43 +0530 Subject: [PATCH 5/5] Refactor code, moving helper functions to recommendation utils --- .../engine/RecommendationEngine.java | 569 ++++-------------- .../utils/RecommendationUtils.java | 360 ++++++++++- 2 files changed, 492 insertions(+), 437 deletions(-) diff --git a/src/main/java/com/autotune/analyzer/recommendations/engine/RecommendationEngine.java b/src/main/java/com/autotune/analyzer/recommendations/engine/RecommendationEngine.java index 75033d727..cd8d2bbac 100644 --- a/src/main/java/com/autotune/analyzer/recommendations/engine/RecommendationEngine.java +++ b/src/main/java/com/autotune/analyzer/recommendations/engine/RecommendationEngine.java @@ -54,6 +54,7 @@ import java.util.stream.Collectors; import static com.autotune.analyzer.recommendations.RecommendationConstants.RecommendationValueConstants.*; +import static com.autotune.analyzer.recommendations.utils.RecommendationUtils.*; import static com.autotune.analyzer.utils.AnalyzerConstants.ServiceConstants.CHARACTER_ENCODING; import static com.autotune.analyzer.utils.AnalyzerErrorConstants.AutotuneObjectErrors.MISSING_EXPERIMENT_NAME; @@ -2107,6 +2108,9 @@ private void fetchContainerMetricsBasedOnDataSourceAndProfile(KruizeObject kruiz MetricResults metricResults = null; MetricAggregationInfoResults metricAggregationInfoResults = null; + String inputFile = "input.csv"; + String outputFile = "output.csv"; + List metricList = filterMetricsBasedOnExpTypeAndK8sObject(metricProfile, AnalyzerConstants.MetricName.maxDate.name(), kruizeObject.getExperimentType()); @@ -2339,7 +2343,9 @@ private void fetchContainerMetricsBasedOnDataSourceAndProfile(KruizeObject kruiz } } - List> aggr_data = convertContainerResultsByPodMapToCSV(containerDataResultsByPod, metricLabelMapByNameAndPod, "input.csv"); + String header = generateCSVHeader(); + convertContainerResultsByPodMapToCSV(containerDataResultsByPod, metricLabelMapByNameAndPod, inputFile, header); + List> aggr_data = aggregateWorkloads(header, inputFile, outputFile); HashMap aggrContainerDataResults = convertAggrDataToIntervalResults(aggr_data); containerData.setResults(aggrContainerDataResults); if (!aggrContainerDataResults.isEmpty()) @@ -2514,242 +2520,8 @@ public List filterMetricsBasedOnExpTypeAndK8sObject(PerformanceProfile m .toList(); } - public static List> convertContainerResultsByPodMapToCSV(HashMap> containerDataResults, - HashMap>> metricLabelMap, String 
outputFilePath) { - - // Debugging: Print the absolute path where the file will be created - File file = new File(outputFilePath); - LOGGER.info("Attempting to create file at: {}", file.getAbsolutePath()); - - // List of expected queries - List queries = Arrays.asList( - "cpuRequest", - "cpuLimit", - "cpuUsage", - "cpuThrottle", - "memoryRequest", - "memoryLimit", - "memoryUsage", - "memoryRSS" - ); - - int totalColumns = 42; - // A map to store existing rows, with a key being the podLabel - Map csvRowMap = new HashMap<>(); - - try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) { - // Predefined labels for the CSV header - String header = "interval_end,interval_start,container,namespace,pod,owner_kind,owner_name,workload_type,workload," + - "cpuRequest_min,cpuRequest_max,cpuRequest_avg,cpuRequest_sum,cpuLimit_min," - + "cpuLimit_max,cpuLimit_avg,cpuLimit_sum,cpuUsage_min,cpuUsage_max,cpuUsage_avg,cpuUsage_sum," + - "cpuThrottle_min,cpuThrottle_max,cpuThrottle_avg,cpuThrottle_sum,memoryRequest_min,memoryRequest_max," + - "memoryRequest_avg,memoryRequest_sum,memoryLimit_min,memoryLimit_max,memoryLimit_avg,memoryLimit_sum," + - "memoryUsage_min,memoryUsage_max,memoryUsage_avg,memoryUsage_sum,memoryRSS_min,memoryRSS_max," + - "memoryRSS_avg,memoryRSS_sum,k8_object_type,k8_object_name"; - writer.write(header); - writer.newLine(); - - // Iterate over the timestamp entries - for (Timestamp timestamp : metricLabelMap.keySet()) { - HashMap> metricMap = metricLabelMap.get(timestamp); - // Iterate over the metric names - for (String metricName : metricMap.keySet()) { - HashMap labelMap = metricMap.get(metricName); - - if(labelMap == null) { - continue; - } - - // Iterate over the labels and corresponding objects - for (String label : labelMap.keySet()) { - // Create a key combining the timestamp and metric name - String rowKey = timestamp.toString() + "_" + label; - // Use the label as the key for the csvRowMap - String[] dataRow = csvRowMap.getOrDefault(rowKey, new String[totalColumns]); - - MetricLabels metricLabel = labelMap.get(label); - String container = metricLabel.getContainer(); - String namespace = metricLabel.getNamespace(); - - // If this is a new row, initialize with empty values and set the initial columns - if (!csvRowMap.containsKey(rowKey)) { - Arrays.fill(dataRow, ""); - - // Add timestamp and static values at predefined positions - dataRow[0] = timestamp.toString(); - dataRow[2] = container; - dataRow[3] = namespace; - dataRow[4] = label; - } - - if(metricName.equals("imageOwners")) { - dataRow[5] = metricLabel.getOwner_kind(); - dataRow[6] = metricLabel.getOwner_name(); - } else if(metricName.equals("imageWorkloads")) { - dataRow[7] = metricLabel.getWorkload_name(); - dataRow[8] = metricLabel.getWorkload_kind(); - } - - // Add placeholder values for additional columns - csvRowMap.put(rowKey, dataRow); - } - } - } - - for(Timestamp timestamp : containerDataResults.keySet()) { - HashMap intervalResultsMap = containerDataResults.get(timestamp); - - for(String podLabel: intervalResultsMap.keySet()) { - // Create a key combining the timestamp and pod name - String rowKey = timestamp.toString() + "_" + podLabel; - // Use the rowKey as the key for the csvRowMap - String[] dataRow = csvRowMap.get(rowKey); - IntervalResults intervalResults = intervalResultsMap.get(podLabel); - dataRow[1] = intervalResults.getIntervalStartTime().toString(); - - HashMap metricResultsMap = intervalResults.getMetricResultsMap(); - // Fill in values for each query - for (int i = 0; i < 
queries.size(); i++) { - String query = queries.get(i); - MetricResults metricResults = metricResultsMap.get(AnalyzerConstants.MetricName.valueOf(query)); - if(null != metricResults) { - MetricAggregationInfoResults aggregationInfoResults = metricResults.getAggregationInfoResult(); - // Append min, max, avg, sum for the current query - dataRow[i * 4 + 9] = aggregationInfoResults.getMin() != null ? aggregationInfoResults.getMin().toString() : ""; - dataRow[i * 4 + 10] = aggregationInfoResults.getMax()!= null ? aggregationInfoResults.getMax().toString() : ""; - dataRow[i * 4 + 11] = aggregationInfoResults.getAvg() != null ? aggregationInfoResults.getAvg().toString() : ""; - dataRow[i * 4 + 12] = aggregationInfoResults.getSum() != null ? aggregationInfoResults.getSum().toString() : ""; - } else { - dataRow[i * 4 + 9] = ""; - dataRow[i * 4 + 10] = ""; - dataRow[i * 4 + 11] = ""; - dataRow[i * 4 + 12] = ""; - } - } - csvRowMap.put(rowKey, dataRow); - } - } - - // After processing all data, write the rows to the CSV - // Write all rows to the CSV file - for (String[] dataRow : csvRowMap.values()) { - - if(null == dataRow[1] || dataRow[1].isEmpty()) { - continue; - } - // Convert the dataRow array into a comma-separated string - StringBuilder csvRow = new StringBuilder(); - for (String value : dataRow) { - csvRow.append(value).append(","); - } - - // Remove trailing comma - csvRow.setLength(csvRow.length() - 1); - - // Write the row to the CSV - writer.write(csvRow.toString()); - writer.newLine(); // Move to the next line - } - - LOGGER.info("CSV file created successfully: {}", outputFilePath); - // After creating the file, read and print its content - LOGGER.info("\nPrinting CSV contents:\n"); - printCsv(outputFilePath); - - LOGGER.info("***********************************"); - LOGGER.info("Aggregation of workloads"); - LOGGER.info("***********************************"); - List> aggregatedMetricData = aggregateWorkloads(header, outputFilePath, "output.csv"); - - return aggregatedMetricData; - } catch (IOException e) { - LOGGER.error(String.valueOf(e)); - } - return null; - } - public static List> aggregateWorkloads(String header, String filename, String outputResults) throws IOException { LOGGER.info("Aggregating the data for file: {}", filename); - - // Load the CSV file into a List of Maps - List> records = readCsv(filename); - - Iterator> iterator = records.iterator(); - - // Remove the rows if there is no owner_kind, owner_name and workload - // Expected to ignore rows which can be pods / invalid - while (iterator.hasNext()) { - Map row = iterator.next(); - String owner_kind = row.get("owner_kind"); - String owner_name = row.get("owner_name"); - String workload = row.get("workload"); - String workload_type = row.get("workload_type"); - - if (null == owner_kind || null == owner_name || null == workload || null == workload_type || owner_kind.isEmpty() || owner_name.isEmpty() || - workload.isEmpty() || workload_type.isEmpty()) { - iterator.remove(); - LOGGER.info("removing a row"); - } - } - - // Add 'k8_object_type' column - - // Based on the data observed, these are the assumptions: - // If owner_kind is 'ReplicaSet' and workload is '', actual workload_type is ReplicaSet - // If owner_kind is 'ReplicationCOntroller' and workload is '', actual workload_type is ReplicationController - // If owner_kind and workload has some names, workload_type is same as derived through queries. 
- for (Map row : records) { - String ownerKind = row.get("owner_kind"); - String workload = row.get("workload"); - if ("ReplicaSet".equals(ownerKind) && (null==workload)) { - row.put("k8_object_type", "replicaset"); - } else if ("ReplicationController".equals(ownerKind) && (null==workload)) { - row.put("k8_object_type", "replicationcontroller"); - } else { - row.put("k8_object_type", row.get("workload_type")); - } - } - - // Update 'k8_object_name' based on 'workload' - // If the workload is (which indicates ReplicaSet and ReplicationCOntroller - ignoring pods/invalid cases), the name of the k8_object can be owner_name. - // If the workload has some other name, the k8_object_name is same as workload. In this case, owner_name cannot be used as there can be multiple owner_names for the same deployment(considering there are multiple replicasets) - for (Map row : records) { - if (null != (row.get("workload"))) { - row.put("k8_object_name", row.get("workload")); - } else { - row.put("k8_object_name", row.get("owner_name")); - } - } - - // Sort and group the data based on below columns to get a container for a workload and for an interval. - records.sort(Comparator.comparing((Map row) -> row.get("namespace")) - .thenComparing(row -> row.get("k8_object_type")) - .thenComparing(row -> row.get("workload")) - .thenComparing(row -> row.get("container")) - .thenComparing(row -> row.get("interval_start"))); - - - // Create output directory to store the output CSV files - String outputDir = "output"; - Files.createDirectories(Paths.get(outputDir)); - - // Group records and write to separate CSV files - int counter = 0; - Map>> groupedData = new HashMap<>(); - for (Map record : records) { - String key = String.join("|", record.get("namespace"), record.get("k8_object_type"), - record.get("workload"), record.get("container"), record.get("interval_start")); - groupedData.computeIfAbsent(key, k -> new ArrayList<>()).add(record); - } - LOGGER.info("groupedData: {}", groupedData); - - for (List> group : groupedData.values()) { - counter++; - String fileName = "file_" + counter + ".csv"; - writeCsv(outputDir + "/" + fileName, group); - LOGGER.info("{}/{}", outputDir, fileName); - } - // Initialize variables for aggregation List headerRow = new ArrayList<>(); Set columnsToIgnore = new HashSet<>(Arrays.asList("pod", "owner_name", "node")); @@ -2758,246 +2530,171 @@ public static List> aggregateWorkloads(String header, String // Map to hold aggregated data for each workload Map>> workloadDataMap = new HashMap<>(); - // Step 1: Iterate through CSV files in the output directory - for (String fileName : new File(outputDir).list()) { - if (fileName.endsWith(".csv")) { - List> currentData = readCsv(outputDir + "/" + fileName); - LOGGER.info("Processing file: {}", fileName); - - // Step 2: Initialize headers and check for "resource_id" - if (headerRow.isEmpty() && !currentData.isEmpty()) { - headerRow.addAll(currentData.get(0).keySet()); - if (headerRow.contains("resource_id")) { - columnsToIgnore.add("resource_id"); - } - } - - // Step 3: Group the data by 'workload' and aggregate metrics for each workload - for (Map row : currentData) { - String interval_start = row.get("interval_start"); - String container = row.get("container"); - String k8_object_type = row.get("k8_object_type"); - String namespace = row.get("namespace"); - String workload = row.get("workload"); // Assuming 'workload' is a key in your data - String key = interval_start + "|" + container + "|" + k8_object_type + "|" + namespace+ "|" + workload; - 
workloadDataMap.putIfAbsent(key, new ArrayList<>()); - workloadDataMap.get(key).add(row); - } + try { + // Load the CSV file into a List of Maps + List> records = readCsv(filename); + + // Remove the rows if there is no owner_kind, owner_name and workload + // Expected to ignore rows which can be pods / invalid + for (Map row: records) { + String owner_kind = row.get("owner_kind"); + String owner_name = row.get("owner_name"); + String workload = row.get("workload"); + String workload_type = row.get("workload_type"); + + records.removeIf(map -> null == owner_kind || null == owner_name || null == workload || null == workload_type || owner_kind.isEmpty() || owner_name.isEmpty() || + workload.isEmpty() || workload_type.isEmpty()); } - } - // Step 4: Calculate aggregates (avg, min, max, sum) for each workload - for (String uniqueKey : workloadDataMap.keySet()) { - List> workloadRows = workloadDataMap.get(uniqueKey); - - Map aggregatedRow = new HashMap<>(); - // Split the unique key to extract namespace, workload, and start_time - String[] keyParts = uniqueKey.split("\\|"); - aggregatedRow.put("interval_start", keyParts[0]); - aggregatedRow.put("container", keyParts[1]); - aggregatedRow.put("k8_object_type", keyParts[2]); - aggregatedRow.put("namespace", keyParts[3]); - aggregatedRow.put("workload", keyParts[4]); - - for (String key : headerRow) { - if (columnsToIgnore.contains(key)) continue; // Skip ignored columns - - if (key.endsWith("avg")) { - double avg = workloadRows.stream().mapToDouble(r -> { - String value = r.get(key); - return (null == value || value.isEmpty() || value.equals("null")) ? 0.0 : Double.parseDouble(value); - }).average().orElse(0); - aggregatedRow.put(key, String.valueOf(avg)); - } else if (key.endsWith("min")) { - double min = workloadRows.stream().mapToDouble(r -> { - String value = r.get(key); - return (null == value || value.isEmpty() || value.equals("null")) ? Double.POSITIVE_INFINITY : Double.parseDouble(value); - }).min().orElse(Double.POSITIVE_INFINITY); - aggregatedRow.put(key, String.valueOf(min)); - } else if (key.endsWith("max")) { - double max = workloadRows.stream().mapToDouble(r -> { - String value = r.get(key); - return (null == value || value.isEmpty() || value.equals("null")) ? Double.NEGATIVE_INFINITY : Double.parseDouble(value); - }).max().orElse(Double.NEGATIVE_INFINITY); - aggregatedRow.put(key, String.valueOf(max)); - } else if (key.endsWith("sum")) { - double sum = workloadRows.stream().mapToDouble(r -> { - String value = r.get(key); - return (null == value || value.isEmpty() || value.equals("null")) ? 0.0 : Double.parseDouble(value); - }).sum(); - aggregatedRow.put(key, String.valueOf(sum)); + // Add 'k8_object_type' column + + // Based on the data observed, these are the assumptions: + // If owner_kind is 'ReplicaSet' and workload is '', actual workload_type is ReplicaSet + // If owner_kind is 'ReplicationCOntroller' and workload is '', actual workload_type is ReplicationController + // If owner_kind and workload has some names, workload_type is same as derived through queries. 
+ for (Map row : records) { + String ownerKind = row.get("owner_kind"); + String workload = row.get("workload"); + if ("ReplicaSet".equals(ownerKind) && (null == workload)) { + row.put("k8_object_type", "replicaset"); + } else if ("ReplicationController".equals(ownerKind) && (null == workload)) { + row.put("k8_object_type", "replicationcontroller"); } else { - // For non-aggregatable columns, just use the value from the first row - aggregatedRow.put(key, workloadRows.get(0).get(key)); + row.put("k8_object_type", row.get("workload_type")); } } - // Step 5: Add aggregated row to aggData - aggData.add(aggregatedRow); - } - for (Map row : aggData) { - // Remove keys from the row that are in the columnsToIgnore set - for (String column : columnsToIgnore) { - row.remove(column); - } - // Replace "Infinity" and "-Infinity" with empty strings in the row - for (String key : row.keySet()) { - String value = row.get(key); - if ("Infinity".equals(value) || "-Infinity".equals(value)) { - row.put(key, ""); // Set to empty string - } else if((key.contains("avg") || key.contains("sum")) && value.equals("0.0")) { - row.put(key, ""); + // Update 'k8_object_name' based on 'workload' + // If the workload is (which indicates ReplicaSet and ReplicationCOntroller - ignoring pods/invalid cases), the name of the k8_object can be owner_name. + // If the workload has some other name, the k8_object_name is same as workload. In this case, owner_name cannot be used as there can be multiple owner_names for the same deployment(considering there are multiple replicasets) + for (Map row : records) { + if (null != (row.get("workload"))) { + row.put("k8_object_name", row.get("workload")); + } else { + row.put("k8_object_name", row.get("owner_name")); } } - } - - // Write the final aggregated results to the output CSV - writeFinalResults(header, aggData, outputResults, columnsToIgnore); - // Write aggregated data to final output CSV - printCsv(outputResults); + // Sort and group the data based on below columns to get a container for a workload and for an interval. 
+ records.sort(Comparator.comparing((Map row) -> row.get("namespace")) + .thenComparing(row -> row.get("k8_object_type")) + .thenComparing(row -> row.get("workload")) + .thenComparing(row -> row.get("container")) + .thenComparing(row -> row.get("interval_start"))); + + + // Create output directory to store the output CSV files + String outputDir = "output"; + Files.createDirectories(Paths.get(outputDir)); + + // Group records and write to separate CSV files + int counter = 0; + Map>> groupedData = new HashMap<>(); + for (Map record : records) { + String key = String.join("|", record.get("namespace"), record.get("k8_object_type"), + record.get("workload"), record.get("container"), record.get("interval_start")); + groupedData.computeIfAbsent(key, k -> new ArrayList<>()).add(record); + } + LOGGER.info("groupedData: {}", groupedData); - return aggData; - } + for (List> group : groupedData.values()) { + counter++; + String fileName = "file_" + counter + ".csv"; + writeCsv(outputDir + "/" + fileName, group); + LOGGER.info("{}/{}", outputDir, fileName); + } - // Method to read CSV file and return List of Maps - private static List> readCsv(String filePath) throws IOException { - List> records = new ArrayList<>(); - List headers = new ArrayList<>(); - - try (BufferedReader br = Files.newBufferedReader(Paths.get(filePath))) { - String line; - boolean isHeader = true; - while ((line = br.readLine()) != null) { - String[] values = line.split(","); - if (isHeader) { - headers.addAll(Arrays.asList(values)); - isHeader = false; - } else { - Map rowMap = new HashMap<>(); - for (int i = 0; i < headers.size(); i++) { - if (i < values.length) { - rowMap.put(headers.get(i), values[i]); - } else { - rowMap.put(headers.get(i), null); // Handle missing columns + // Step 1: Iterate through CSV files in the output directory + for (String fileName : new File(outputDir).list()) { + if (fileName.endsWith(".csv")) { + List> currentData = readCsv(outputDir + "/" + fileName); + LOGGER.info("Processing file: {}", fileName); + + // Step 2: Initialize headers and check for "resource_id" + if (headerRow.isEmpty() && !currentData.isEmpty()) { + headerRow.addAll(currentData.get(0).keySet()); + if (headerRow.contains("resource_id")) { + columnsToIgnore.add("resource_id"); } } - records.add(rowMap); + + // Step 3: Group the data by 'workload' and aggregate metrics for each workload + for (Map row : currentData) { + String interval_start = row.get("interval_start"); + String container = row.get("container"); + String k8_object_type = row.get("k8_object_type"); + String namespace = row.get("namespace"); + String workload = row.get("workload"); + String key = interval_start + "|" + container + "|" + k8_object_type + "|" + namespace + "|" + workload; + workloadDataMap.putIfAbsent(key, new ArrayList<>()); + workloadDataMap.get(key).add(row); + } } } - } - return records; - } - // Write List of Maps to a CSV file - private static void writeCsv(String filePath, List> data){ - if (data.isEmpty()) return; + // Step 4: Calculate aggregates (avg, min, max, sum) for each workload + calculateAggregateValuesForWorkload(workloadDataMap, aggData, headerRow, columnsToIgnore); - try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(filePath))) { - // Write header - writer.write(String.join(",", data.get(0).keySet())); - writer.newLine(); + // Write the final aggregated results to the output CSV + writeFinalResults(header, aggData, outputResults, columnsToIgnore); - // Write each row - for (Map row : data) { - 
-    // Method to read CSV file and return List of Maps
-    private static List<Map<String, String>> readCsv(String filePath) throws IOException {
-        List<Map<String, String>> records = new ArrayList<>();
-        List<String> headers = new ArrayList<>();
-
-        try (BufferedReader br = Files.newBufferedReader(Paths.get(filePath))) {
-            String line;
-            boolean isHeader = true;
-            while ((line = br.readLine()) != null) {
-                String[] values = line.split(",");
-                if (isHeader) {
-                    headers.addAll(Arrays.asList(values));
-                    isHeader = false;
-                } else {
-                    Map<String, String> rowMap = new HashMap<>();
-                    for (int i = 0; i < headers.size(); i++) {
-                        if (i < values.length) {
-                            rowMap.put(headers.get(i), values[i]);
-                        } else {
-                            rowMap.put(headers.get(i), null); // Handle missing columns
+            // Step 1: Iterate through CSV files in the output directory
+            for (String fileName : new File(outputDir).list()) {
+                if (fileName.endsWith(".csv")) {
+                    List<Map<String, String>> currentData = readCsv(outputDir + "/" + fileName);
+                    LOGGER.info("Processing file: {}", fileName);
+
+                    // Step 2: Initialize headers and check for "resource_id"
+                    if (headerRow.isEmpty() && !currentData.isEmpty()) {
+                        headerRow.addAll(currentData.get(0).keySet());
+                        if (headerRow.contains("resource_id")) {
+                            columnsToIgnore.add("resource_id");
                         }
                     }
-                    records.add(rowMap);
+
+                    // Step 3: Group the rows by workload so the metrics of each workload can be aggregated
+                    for (Map<String, String> row : currentData) {
+                        String interval_start = row.get("interval_start");
+                        String container = row.get("container");
+                        String k8_object_type = row.get("k8_object_type");
+                        String namespace = row.get("namespace");
+                        String workload = row.get("workload");
+                        String key = interval_start + "|" + container + "|" + k8_object_type + "|" + namespace + "|" + workload;
+                        workloadDataMap.putIfAbsent(key, new ArrayList<>());
+                        workloadDataMap.get(key).add(row);
+                    }
                 }
             }
-        }
-        return records;
-    }
-    // Write List of Maps to a CSV file
-    private static void writeCsv(String filePath, List<Map<String, String>> data) {
-        if (data.isEmpty()) return;
+            // Step 4: Calculate aggregates (avg, min, max, sum) for each workload
+            calculateAggregateValuesForWorkload(workloadDataMap, aggData, headerRow, columnsToIgnore);
-        try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(filePath))) {
-            // Write header
-            writer.write(String.join(",", data.get(0).keySet()));
-            writer.newLine();
+            // Write the final aggregated results to the output CSV
+            writeFinalResults(header, aggData, outputResults, columnsToIgnore);
-            // Write each row
-            for (Map<String, String> row : data) {
-                writer.write(String.join(",", row.values()));
-                writer.newLine();
-            }
-        } catch (Exception e) {
-            LOGGER.error(e.getMessage());
-        }
-    }
-    // Print CSV file contents
-    private static void printCsv(String filePath) {
-        LOGGER.info("Contents of {}", filePath + ":");
-        try (BufferedReader br = Files.newBufferedReader(Paths.get(filePath))) {
-            String line;
-            while ((line = br.readLine()) != null) {
-                LOGGER.info(line);
-            }
+            // Write aggregated data to final output CSV
+            printCsv(outputResults);

         } catch (Exception e) {
             LOGGER.error(e.getMessage());
         }
-    }
-
-    // Write the final aggregated results to the output CSV file
-    private static void writeFinalResults(String headerRow, List<Map<String, String>> aggData,
-                                          String outputFile, Set<String> columnsToIgnore) {
-        try (BufferedWriter writer = new BufferedWriter(new FileWriter(outputFile))) {
-            // Step 1: Write the header
-            List<String> filteredHeader = new ArrayList<>();
-            List<String> headerColumns = List.of(headerRow.split(","));
-            for (String column : headerColumns) {
-                if (!columnsToIgnore.contains(column)) {
-                    filteredHeader.add(column); // Add only non-ignored columns to the filtered header
-                }
-            }
-
-            // Step 2: Write the filtered header to the file
-            writer.write(String.join(",", filteredHeader));
-            writer.newLine();
-
-            // Step 3: Write each row's data for only non-ignored columns
-            for (Map<String, String> row : aggData) {
-                List<String> filteredValues = new ArrayList<>();
-                for (String column : filteredHeader) {
-                    filteredValues.add(row.getOrDefault(column, "")); // Add only non-ignored column values
-                }
-                writer.write(String.join(",", filteredValues));
-                writer.newLine();
-            }
-        } catch (IOException e) {
-            LOGGER.error(e.getMessage());
-        }
-    }
+        return aggData.isEmpty() ? null : aggData;
     }
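Each element of the returned aggData is one flattened row per container, workload and interval. A hypothetical example of such a row (all values invented for illustration):

    {interval_start=2024-11-05 10:00:00.0, interval_end=2024-11-05 10:15:00.0, namespace=default,
     workload=my-app, k8_object_type=deployment, container=kruize,
     cpuUsage_min=0.12, cpuUsage_max=0.91, cpuUsage_avg=0.42, cpuUsage_sum=1.26,
     memoryUsage_avg=3.2E8, ...}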
     /* Converts aggregated data to container intervalResults */
     public HashMap<Timestamp, IntervalResults> convertAggrDataToIntervalResults(List<Map<String, String>> aggr_data) throws Exception {
         HashMap<Timestamp, IntervalResults> containerDataResults = new HashMap<>();
-        List<String> cpuFunction = Arrays.asList(AnalyzerConstants.MetricName.cpuUsage.toString(), AnalyzerConstants.MetricName.cpuThrottle.toString(), AnalyzerConstants.MetricName.cpuLimit.toString(), AnalyzerConstants.MetricName.cpuRequest.toString());
-        List<String> memFunction = Arrays.asList(AnalyzerConstants.MetricName.memoryLimit.toString(), AnalyzerConstants.MetricName.memoryRequest.toString(), AnalyzerConstants.MetricName.memoryRSS.toString(), AnalyzerConstants.MetricName.memoryUsage.toString());
-        String format = null;
+        try {
+            List<String> cpuFunction = Arrays.asList(AnalyzerConstants.MetricName.cpuUsage.toString(), AnalyzerConstants.MetricName.cpuThrottle.toString(), AnalyzerConstants.MetricName.cpuLimit.toString(), AnalyzerConstants.MetricName.cpuRequest.toString());
+            List<String> memFunction = Arrays.asList(AnalyzerConstants.MetricName.memoryLimit.toString(), AnalyzerConstants.MetricName.memoryRequest.toString(), AnalyzerConstants.MetricName.memoryRSS.toString(), AnalyzerConstants.MetricName.memoryUsage.toString());
+            String format = null;

-        IntervalResults intervalResults = new IntervalResults();
-        MetricAggregationInfoResults metricAggregationInfoResults = new MetricAggregationInfoResults();
-        HashMap<AnalyzerConstants.MetricName, MetricResults> resMap = new HashMap<>();
-        MetricResults metricResults = null;
+            IntervalResults intervalResults = new IntervalResults();
+            MetricAggregationInfoResults metricAggregationInfoResults = new MetricAggregationInfoResults();
+            HashMap<AnalyzerConstants.MetricName, MetricResults> resMap = new HashMap<>();
+            MetricResults metricResults = null;

-        for (Map<String, String> data_map : aggr_data) {
-            String e_timestamp = data_map.get("interval_end");
-            Timestamp eTime = Timestamp.valueOf(e_timestamp);
+            for (Map<String, String> data_map : aggr_data) {
+                String e_timestamp = data_map.get("interval_end");
+                Timestamp eTime = Timestamp.valueOf(e_timestamp);

-            String s_timestamp = data_map.get("interval_start");
-            Timestamp sTime = Timestamp.valueOf(s_timestamp);
+                String s_timestamp = data_map.get("interval_start");
+                Timestamp sTime = Timestamp.valueOf(s_timestamp);

-            for (Map.Entry<String, String> metric : data_map.entrySet()) {
-                String metricName = metric.getKey();
-                String aggrMetricValue = metric.getValue();
-                String metricNameWoFunction = metricName.split("_")[0];
+                for (Map.Entry<String, String> metric : data_map.entrySet()) {
+                    String metricName = metric.getKey();
+                    String aggrMetricValue = metric.getValue();
+                    String metricNameWoFunction = metricName.split("_")[0];

-                if (!cpuFunction.contains(metricNameWoFunction) && !memFunction.contains(metricNameWoFunction)) {
-                    continue;
-                }
+                    if (!cpuFunction.contains(metricNameWoFunction) && !memFunction.contains(metricNameWoFunction)) {
+                        continue;
+                    }

-                if (null == aggrMetricValue || aggrMetricValue.isEmpty()) {
-                    continue;
-                }
+                    if (null == aggrMetricValue || aggrMetricValue.isEmpty()) {
+                        continue;
+                    }

-                double metricValue = Double.parseDouble(aggrMetricValue);
-                if (cpuFunction.contains(metricNameWoFunction)) {
-                    format = KruizeConstants.JSONKeys.CORES;
-                } else if (memFunction.contains(metricNameWoFunction)) {
-                    format = KruizeConstants.JSONKeys.BYTES;
-                }
+                    double metricValue = Double.parseDouble(aggrMetricValue);
+                    if (cpuFunction.contains(metricNameWoFunction)) {
+                        format = KruizeConstants.JSONKeys.CORES;
+                    } else if (memFunction.contains(metricNameWoFunction)) {
+                        format = KruizeConstants.JSONKeys.BYTES;
+                    }

-                prepareIntervalResults(containerDataResults, intervalResults, resMap, metricResults, metricAggregationInfoResults, sTime, eTime, null, metricName, null, metricValue, format);
+                    prepareIntervalResults(containerDataResults, intervalResults, resMap, metricResults, metricAggregationInfoResults, sTime, eTime, null, metricName, null, metricValue, format);
+                }
             }
-
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage());
         }
         return containerDataResults;
     }
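The loop above keys off the aggregated column names, which follow the metricName_aggregationFunction pattern produced by generateCSVHeader. A minimal sketch of how a column name is interpreted (sample values invented):

    String metricName = "cpuUsage_avg";
    String metricNameWoFunction = metricName.split("_")[0]; // "cpuUsage" -> in cpuFunction, so format = CORES
    // "memoryRSS_max" -> "memoryRSS" -> in memFunction, so format = BYTES
    // "owner_kind"    -> "owner"     -> in neither list, so the column is skipped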
diff --git a/src/main/java/com/autotune/analyzer/recommendations/utils/RecommendationUtils.java b/src/main/java/com/autotune/analyzer/recommendations/utils/RecommendationUtils.java
index 45085f33c..7bddcf9c1 100644
--- a/src/main/java/com/autotune/analyzer/recommendations/utils/RecommendationUtils.java
+++ b/src/main/java/com/autotune/analyzer/recommendations/utils/RecommendationUtils.java
@@ -5,6 +5,8 @@ import com.autotune.analyzer.recommendations.RecommendationConstants;
 import com.autotune.analyzer.recommendations.term.Terms;
 import com.autotune.analyzer.utils.AnalyzerConstants;
+import com.autotune.common.data.metrics.MetricAggregationInfoResults;
+import com.autotune.common.data.metrics.MetricLabels;
 import com.autotune.common.data.metrics.MetricResults;
 import com.autotune.common.data.result.ContainerData;
 import com.autotune.common.data.result.IntervalResults;
@@ -20,8 +22,10 @@ import com.autotune.common.data.system.info.device.accelerator.metadata.AcceleratorMetaDataService;
 import com.autotune.common.data.system.info.device.accelerator.metadata.AcceleratorProfile;

-import java.io.IOException;
+import java.io.*;
 import java.net.URLEncoder;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.security.KeyManagementException;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
@@ -370,5 +374,359 @@ public static String getSupportedModelBasedOnModelName(String modelName) {
         return null;
     }

+    /**
+     * Generates the CSV header row, including one column per metric and aggregation function
+     * @return header row with all the metrics as columns
+     */
+    public static String generateCSVHeader() {
+        // StringBuilder to collect all columns
+        StringBuilder finalHeaderRow = new StringBuilder();
+
+        try {
+            List<String> metricsWithoutAggrFunctions = Arrays.asList("interval_end", "interval_start", "container",
+                    "namespace", "pod", "owner_kind", "owner_name", "workload_type", "workload", "k8_object_type", "k8_object_name");
+
+            List<String> metricsWithAggrFunctions = Arrays.asList("cpuRequest", "cpuLimit", "cpuUsage", "cpuThrottle",
+                    "memoryRequest", "memoryLimit", "memoryUsage", "memoryRSS");
+
+            List<String> aggrFunctions = Arrays.asList("min", "max", "avg", "sum");
+
+            // Append the label columns first
+            for (String currentHeader : metricsWithoutAggrFunctions) {
+                finalHeaderRow.append(currentHeader).append(",");
+            }
+
+            // Then append one column per metric and aggregation function
+            for (String query : metricsWithAggrFunctions) {
+                for (String subHeader : aggrFunctions) {
+                    String dynamicHeader = query + "_" + subHeader;
+                    finalHeaderRow.append(dynamicHeader).append(",");
+                }
+            }
+
+            // Remove the trailing comma
+            if (!finalHeaderRow.isEmpty()) {
+                finalHeaderRow.deleteCharAt(finalHeaderRow.length() - 1);
+            }
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage());
+        }
+        return finalHeaderRow.toString();
+    }
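With these lists, the generated header starts with the eleven label columns and is followed by four aggregate columns per metric:

    interval_end,interval_start,container,namespace,pod,owner_kind,owner_name,workload_type,workload,k8_object_type,k8_object_name,cpuRequest_min,cpuRequest_max,cpuRequest_avg,cpuRequest_sum,cpuLimit_min, ... ,memoryRSS_min,memoryRSS_max,memoryRSS_avg,memoryRSS_sum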
+
+    /**
+     * Converts the queried metric data of all pods into a CSV file
+     * @param containerDataResults per-interval metric aggregation results, keyed by timestamp and pod label
+     * @param metricLabelMap       labels returned by the imageOwners / imageWorkloads queries, keyed by timestamp, metric name and pod label
+     * @param inputFilePath        path of the CSV file to create
+     * @param header               CSV header row to write (see generateCSVHeader)
+     */
+    public static void convertContainerResultsByPodMapToCSV(HashMap<Timestamp, HashMap<String, IntervalResults>> containerDataResults,
+                                                             HashMap<Timestamp, HashMap<String, HashMap<String, MetricLabels>>> metricLabelMap, String inputFilePath, String header) {
+
+        File file = new File(inputFilePath);
+        LOGGER.info("Attempting to create file at: {}", file.getAbsolutePath());
+
+        // List of expected queries
+        List<String> queries = Arrays.asList(
+                "cpuRequest",
+                "cpuLimit",
+                "cpuUsage",
+                "cpuThrottle",
+                "memoryRequest",
+                "memoryLimit",
+                "memoryUsage",
+                "memoryRSS"
+        );
+
+        // A map to store existing rows, keyed by "<timestamp>_<podLabel>"
+        Map<String, String[]> csvRowMap = new HashMap<>();
+
+        try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
+            // Predefined labels for the CSV header
+            //String header = generateCSVHeader();
+            int totalColumns = (Arrays.asList(header.split("\\,")).size()) + 1;
+            writer.write(header);
+            writer.newLine();
+
+            // Iterate over the timestamp entries
+            for (Timestamp timestamp : metricLabelMap.keySet()) {
+                HashMap<String, HashMap<String, MetricLabels>> metricMap = metricLabelMap.get(timestamp);
+                // Iterate over the metric names
+                for (String metricName : metricMap.keySet()) {
+                    HashMap<String, MetricLabels> labelMap = metricMap.get(metricName);
+
+                    if (labelMap == null) {
+                        continue;
+                    }
+
+                    // Iterate over the labels and corresponding objects
+                    for (String label : labelMap.keySet()) {
+                        // Create a key combining the timestamp and the pod label
+                        String rowKey = timestamp.toString() + "_" + label;
+                        // Use the rowKey as the key for the csvRowMap
+                        String[] dataRow = csvRowMap.getOrDefault(rowKey, new String[totalColumns]);
+
+                        MetricLabels metricLabel = labelMap.get(label);
+                        String container = metricLabel.getContainer();
+                        String namespace = metricLabel.getNamespace();
+
+                        // If this is a new row, initialize with empty values and set the initial columns
+                        if (!csvRowMap.containsKey(rowKey)) {
+                            Arrays.fill(dataRow, "");
+
+                            // Add timestamp and static values at predefined positions
+                            dataRow[0] = timestamp.toString();
+                            dataRow[2] = container;
+                            dataRow[3] = namespace;
+                            dataRow[4] = label;
+                        }
+
+                        if (metricName.equals("imageOwners")) {
+                            dataRow[5] = metricLabel.getOwner_kind();
+                            dataRow[6] = metricLabel.getOwner_name();
+                        } else if (metricName.equals("imageWorkloads")) {
+                            // Column 7 is workload_type (the kind) and column 8 is workload (the name) in the generated header
+                            dataRow[7] = metricLabel.getWorkload_kind();
+                            dataRow[8] = metricLabel.getWorkload_name();
+                        }
+
+                        // Store (or refresh) the row
+                        csvRowMap.put(rowKey, dataRow);
+                    }
+                }
+            }
+
+            for (Timestamp timestamp : containerDataResults.keySet()) {
+                HashMap<String, IntervalResults> intervalResultsMap = containerDataResults.get(timestamp);
+
+                for (String podLabel : intervalResultsMap.keySet()) {
+                    // Create a key combining the timestamp and the pod name
+                    String rowKey = timestamp.toString() + "_" + podLabel;
+                    // Use the rowKey as key for the csvRowMap
+                    String[] dataRow = csvRowMap.get(rowKey);
+                    if (null == dataRow) {
+                        // No label metadata was collected for this pod at this timestamp; skip it
+                        continue;
+                    }
+                    IntervalResults intervalResults = intervalResultsMap.get(podLabel);
+                    dataRow[1] = intervalResults.getIntervalStartTime().toString();
+
+                    HashMap<AnalyzerConstants.MetricName, MetricResults> metricResultsMap = intervalResults.getMetricResultsMap();
+                    // Fill in values for each query
+                    for (int i = 0; i < queries.size(); i++) {
+                        String query = queries.get(i);
+                        MetricResults metricResults = metricResultsMap.get(AnalyzerConstants.MetricName.valueOf(query));
+                        int index = i * 4 + 11;
+                        if (null != metricResults) {
+                            MetricAggregationInfoResults aggregationInfoResults = metricResults.getAggregationInfoResult();
+                            // Append min, max, avg, sum for the current query
+                            dataRow[index] = aggregationInfoResults.getMin() != null ? aggregationInfoResults.getMin().toString() : "";
+                            dataRow[index + 1] = aggregationInfoResults.getMax() != null ? aggregationInfoResults.getMax().toString() : "";
+                            dataRow[index + 2] = aggregationInfoResults.getAvg() != null ? aggregationInfoResults.getAvg().toString() : "";
+                            dataRow[index + 3] = aggregationInfoResults.getSum() != null ? aggregationInfoResults.getSum().toString() : "";
+                        } else {
+                            dataRow[index] = "";
+                            dataRow[index + 1] = "";
+                            dataRow[index + 2] = "";
+                            dataRow[index + 3] = "";
+                        }
+                    }
+                    csvRowMap.put(rowKey, dataRow);
+                }
+            }
+
+            // After processing all data, write all rows to the CSV file
+            for (String[] dataRow : csvRowMap.values()) {
+                // Skip rows that never received usage data (interval_start was not set)
+                if (null == dataRow[1] || dataRow[1].isEmpty()) {
+                    continue;
+                }
+                // Convert the dataRow array into a comma-separated string
+                StringBuilder csvRow = new StringBuilder();
+                for (String value : dataRow) {
+                    csvRow.append(value).append(",");
+                }
+
+                // Remove trailing comma
+                csvRow.setLength(csvRow.length() - 1);
+
+                // Write the row to the CSV
+                writer.write(csvRow.toString());
+                writer.newLine(); // Move to the next line
+            }
+
+            LOGGER.info("CSV file created successfully: {}", inputFilePath);
+            // TODO: remove before merging
+            printCsv(inputFilePath);
+        } catch (IOException e) {
+            LOGGER.error(String.valueOf(e));
+        }
+    }
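A hypothetical row of the resulting per-pod CSV (values invented; the k8_object_type and k8_object_name columns are left empty at this stage and are derived later by the engine):

    2024-11-05 10:15:00.0,2024-11-05 10:00:00.0,kruize,default,kruize-7d9f8c6b5-abcde,ReplicaSet,kruize-7d9f8c6b5,deployment,kruize,,,0.1,0.1,0.1,0.4,...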
+
+    /**
+     * Calculates aggregates (avg, min, max, sum) for each workload
+     */
+    public static void calculateAggregateValuesForWorkload(Map<String, List<Map<String, String>>> workloadDataMap,
+                                                           List<Map<String, String>> aggData, List<String> headerRow, Set<String> columnsToIgnore) {
+        try {
+            for (String uniqueKey : workloadDataMap.keySet()) {
+                List<Map<String, String>> workloadRows = workloadDataMap.get(uniqueKey);
+
+                Map<String, String> aggregatedRow = new HashMap<>();
+                // Split the unique key to recover interval_start, container, k8_object_type, namespace and workload
+                String[] keyParts = uniqueKey.split("\\|");
+                aggregatedRow.put("interval_start", keyParts[0]);
+                aggregatedRow.put("container", keyParts[1]);
+                aggregatedRow.put("k8_object_type", keyParts[2]);
+                aggregatedRow.put("namespace", keyParts[3]);
+                aggregatedRow.put("workload", keyParts[4]);
+
+                for (String key : headerRow) {
+                    if (columnsToIgnore.contains(key)) continue; // Skip ignored columns
+
+                    if (key.endsWith("avg")) {
+                        double avg = workloadRows.stream().mapToDouble(r -> {
+                            String value = r.get(key);
+                            return (null == value || value.isEmpty() || value.equals("null")) ? 0.0 : Double.parseDouble(value);
+                        }).average().orElse(0);
+                        aggregatedRow.put(key, String.valueOf(avg));
+                    } else if (key.endsWith("min")) {
+                        double min = workloadRows.stream().mapToDouble(r -> {
+                            String value = r.get(key);
+                            return (null == value || value.isEmpty() || value.equals("null")) ? Double.POSITIVE_INFINITY : Double.parseDouble(value);
+                        }).min().orElse(Double.POSITIVE_INFINITY);
+                        aggregatedRow.put(key, String.valueOf(min));
+                    } else if (key.endsWith("max")) {
+                        double max = workloadRows.stream().mapToDouble(r -> {
+                            String value = r.get(key);
+                            return (null == value || value.isEmpty() || value.equals("null")) ? Double.NEGATIVE_INFINITY : Double.parseDouble(value);
+                        }).max().orElse(Double.NEGATIVE_INFINITY);
+                        aggregatedRow.put(key, String.valueOf(max));
+                    } else if (key.endsWith("sum")) {
+                        double sum = workloadRows.stream().mapToDouble(r -> {
+                            String value = r.get(key);
+                            return (null == value || value.isEmpty() || value.equals("null")) ? 0.0 : Double.parseDouble(value);
+                        }).sum();
+                        aggregatedRow.put(key, String.valueOf(sum));
+                    } else {
+                        // For columns which cannot be aggregated, just use the value from the first row
+                        aggregatedRow.put(key, workloadRows.get(0).get(key));
+                    }
+                }
+
+                aggData.add(aggregatedRow);
+            }
+            for (Map<String, String> row : aggData) {
+                // Remove keys from the row that are in the columnsToIgnore set
+                for (String column : columnsToIgnore) {
+                    row.remove(column);
+                }
+                // Replace "Infinity" and "-Infinity" placeholders with empty strings in the row
+                for (String key : row.keySet()) {
+                    String value = row.get(key);
+                    if (String.valueOf(Double.POSITIVE_INFINITY).equals(value) || String.valueOf(Double.NEGATIVE_INFINITY).equals(value)) {
+                        row.put(key, ""); // Set to empty string
+                    } else if ((key.contains("avg") || key.contains("sum")) && value.equals("0.0")) {
+                        row.put(key, "");
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage());
+        }
+    }
+
+    // Method to read a CSV file and return a List of Maps (one map per row, keyed by header column)
+    public static List<Map<String, String>> readCsv(String filePath) {
+        List<Map<String, String>> records = new ArrayList<>();
+        List<String> headers = new ArrayList<>();
+
+        try (BufferedReader br = Files.newBufferedReader(Paths.get(filePath))) {
+            String line;
+            boolean isHeader = true;
+            while ((line = br.readLine()) != null) {
+                String[] values = line.split(",");
+                if (isHeader) {
+                    headers.addAll(Arrays.asList(values));
+                    isHeader = false;
+                } else {
+                    Map<String, String> rowMap = new HashMap<>();
+                    for (int i = 0; i < headers.size(); i++) {
+                        if (i < values.length) {
+                            rowMap.put(headers.get(i), values[i]);
+                        } else {
+                            rowMap.put(headers.get(i), null); // Handle missing columns
+                        }
+                    }
+                    records.add(rowMap);
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage());
+        }
+        return records;
+    }
+
+    // Write a List of Maps to a CSV file
+    public static void writeCsv(String filePath, List<Map<String, String>> data) {
+        if (data.isEmpty()) return;
+
+        try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(filePath))) {
+            // Write header
+            writer.write(String.join(",", data.get(0).keySet()));
+            writer.newLine();
+
+            // Write each row
+            for (Map<String, String> row : data) {
+                writer.write(String.join(",", row.values()));
+                writer.newLine();
+            }
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage());
+        }
+    }
+
+    // TODO: remove below functions after code reviews
+    // Print CSV file contents
+    public static void printCsv(String filePath) {
+        LOGGER.info("Contents of {}", filePath + ":");
+        try (BufferedReader br = Files.newBufferedReader(Paths.get(filePath))) {
+            String line;
+            while ((line = br.readLine()) != null) {
+                System.out.println(line);
+            }
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage());
+        }
+    }
+
+    // Write the final aggregated results to the output CSV file
+    public static void writeFinalResults(String headerRow, List<Map<String, String>> aggData,
+                                         String outputFile, Set<String> columnsToIgnore) {
+        try (BufferedWriter writer = new BufferedWriter(new FileWriter(outputFile))) {
+            // Step 1: Build the header with only the non-ignored columns
+            List<String> filteredHeader = new ArrayList<>();
+            List<String> headerColumns = List.of(headerRow.split(","));
+            for (String column : headerColumns) {
+                if (!columnsToIgnore.contains(column)) {
+                    filteredHeader.add(column); // Add only non-ignored columns to the filtered header
+                }
+            }
+
+            // Step 2: Write the filtered header to the file
+            writer.write(String.join(",", filteredHeader));
+            writer.newLine();
+
+            // Step 3: Write each row's data for only non-ignored columns
+            for (Map<String, String> row : aggData) {
+                List<String> filteredValues = new ArrayList<>();
+                for (String column : filteredHeader) {
+                    filteredValues.add(row.getOrDefault(column,
"")); // Add only non-ignored column values + } + writer.write(String.join(",", filteredValues)); + writer.newLine(); + } + } catch (IOException e) { + LOGGER.error(e.getMessage()); + } + } }