From 7a55f30426282e389d0f623d37487a14b4f5d36d Mon Sep 17 00:00:00 2001 From: msvinaykumar Date: Mon, 7 Oct 2024 19:34:00 +0530 Subject: [PATCH 1/5] introducing limit and chunks to support scalabity feature Signed-off-by: msvinaykumar --- .../serviceObjects/BulkJobStatus.java | 28 +++++---- .../analyzer/services/BulkService.java | 7 ++- .../analyzer/workerimpl/BulkJobManager.java | 63 +++++++++++++++---- .../operator/KruizeDeploymentInfo.java | 4 +- .../com/autotune/utils/KruizeConstants.java | 7 +-- 5 files changed, 75 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java b/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java index 17b0d787c..205fb541d 100644 --- a/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java +++ b/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java @@ -21,6 +21,7 @@ import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.List; +import java.util.Map; import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.JOB_ID; @@ -32,17 +33,20 @@ public class BulkJobStatus { private String jobID; private String status; private int progress; - private Data data; + // Mapping each experiment group (like "1-10", "11-20") to its corresponding data + @JsonProperty("data") + private Map batchData; + @JsonProperty("start_time") private String startTime; // Change to String to store formatted time @JsonProperty("end_time") private String endTime; // Change to String to store formatted time - public BulkJobStatus(String jobID, String status, int progress, Data data, Instant startTime) { + public BulkJobStatus(String jobID, String status, int progress, Map data, Instant startTime) { this.jobID = jobID; this.status = status; this.progress = progress; - this.data = data; + this.batchData = data; setStartTime(startTime); } @@ -66,14 +70,6 @@ public void setProgress(int progress) { this.progress = progress; } - public Data getData() { - return data; - } - - public void setData(Data data) { - this.data = data; - } - public String getStartTime() { return startTime; } @@ -99,6 +95,14 @@ private String formatInstantAsUTCString(Instant instant) { return formatter.format(instant); } + public Map getBatchData() { + return batchData; + } + + public void setBatchData(Map batchData) { + this.batchData = batchData; + } + // Inner class for the data field public static class Data { private Experiments experiments; @@ -281,6 +285,4 @@ public int completionPercentage() { return (int) ((completed.size() * 100.0) / totalTasks); } } - - } diff --git a/src/main/java/com/autotune/analyzer/services/BulkService.java b/src/main/java/com/autotune/analyzer/services/BulkService.java index 40325bcce..aa0cbdcb0 100644 --- a/src/main/java/com/autotune/analyzer/services/BulkService.java +++ b/src/main/java/com/autotune/analyzer/services/BulkService.java @@ -32,12 +32,15 @@ import java.io.IOException; import java.time.Instant; import java.util.ArrayList; +import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static com.autotune.operator.KruizeDeploymentInfo.BULK_API_CHUNK_SIZE; + import static com.autotune.analyzer.utils.AnalyzerConstants.ServiceConstants.CHARACTER_ENCODING; import static com.autotune.analyzer.utils.AnalyzerConstants.ServiceConstants.JSON_CONTENT_TYPE; import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.*; @@ -119,7 +122,9 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response) new ArrayList<>() )) ); - jobStatusMap.put(jobID, new BulkJobStatus(jobID, IN_PROGRESS, 0, data, Instant.now())); + Map batchData = new HashMap<>(); + batchData.put(String.format("0-%s", BULK_API_CHUNK_SIZE), data); + jobStatusMap.put(jobID, new BulkJobStatus(jobID, IN_PROGRESS, 0, batchData, Instant.now())); // Submit the job to be processed asynchronously executorService.submit(new BulkJobManager(jobID, jobStatusMap, payload)); diff --git a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java index c2e1d2d8c..2a4d11b86 100644 --- a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java +++ b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java @@ -27,26 +27,19 @@ import com.autotune.common.k8sObjects.TrialSettings; import com.autotune.common.utils.CommonUtils; import com.autotune.database.service.ExperimentDBService; -import com.autotune.operator.KruizeDeploymentInfo; import com.autotune.utils.KruizeConstants; import com.autotune.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.HttpURLConnection; -import java.net.MalformedURLException; -import java.net.ProtocolException; -import java.net.URL; import java.sql.Timestamp; -import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; + +import static com.autotune.operator.KruizeDeploymentInfo.BULK_API_CHUNK_SIZE; import static com.autotune.operator.KruizeDeploymentInfo.bulk_thread_pool_size; import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.*; @@ -84,6 +77,7 @@ public class BulkJobManager implements Runnable { private Map jobStatusMap; private BulkInput bulkInput; + public BulkJobManager(String jobID, Map jobStatusMap, BulkInput payload) { this.jobID = jobID; this.jobStatusMap = jobStatusMap; @@ -145,6 +139,7 @@ public void run() { metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, uniqueKey, 0, 0, 0); } List recommendationsRequiredExperiments = new CopyOnWriteArrayList<>(); + List newExperiments = new CopyOnWriteArrayList<>(); if (null == metadataInfo) { jobStatusMap.get(jobID).setStatus(COMPLETED); } else { @@ -194,9 +189,11 @@ public void run() { try { ValidationOutputData output = new ExperimentDBService().addExperimentToDB(createExperimentAPIObject); if (output.isSuccess()) { + /*jobStatusMap.get(jobID).getData().getExperiments().setNewExperiments( jobStatusMap.get(jobID).getData().getExperiments().setNewExperiments( appendExperiments(jobStatusMap.get(jobID).getData().getExperiments().getNewExperiments(), experiment_name) - ); + );*/ + newExperiments.add(experiment_name); } recommendationsRequiredExperiments.add(experiment_name); } catch (Exception e) { @@ -211,10 +208,13 @@ public void run() { } } jobStatusMap.get(jobID).setStatus(IN_PROGRESS); - jobStatusMap.get(jobID).getData().getRecommendations().getData().setInqueue(recommendationsRequiredExperiments); - jobStatusMap.get(jobID).getData().getRecommendations().setTotalCount(recommendationsRequiredExperiments.size()); + //jobStatusMap.get(jobID).getData().getRecommendations().getData().setInqueue(recommendationsRequiredExperiments); + //jobStatusMap.get(jobID).getData().getRecommendations().setTotalCount(recommendationsRequiredExperiments.size()); + chunkAndStore(recommendationsRequiredExperiments, BULK_API_CHUNK_SIZE, jobStatusMap.get(jobID)); } + /* ExecutorService executor = Executors.newFixedThreadPool(3); + ExecutorService executor = Executors.newFixedThreadPool(3); ExecutorService executor = Executors.newFixedThreadPool(bulk_thread_pool_size); for (String name : recommendationsRequiredExperiments) { executor.submit(() -> { @@ -263,11 +263,48 @@ public void run() { // Close the connection connection.disconnect(); }); - } + }*/ } catch (Exception e) { LOGGER.error(e.getMessage()); e.printStackTrace(); jobStatusMap.get(jobID).setStatus("FAILED"); } } + + private void chunkAndStore(List recommendationsRequiredExperiments, int chunkSize, BulkJobStatus bulkJobStatus) { + int totalExperiments = recommendationsRequiredExperiments.size(); + Map batchData = bulkJobStatus.getBatchData(); + int chunkCount = 0; + // Process each chunk + for (int i = 0; i < totalExperiments; i += chunkSize) { + chunkCount++; + + // Define the chunk start and end indices + int start = i + 1; + int end = Math.min(i + chunkSize, totalExperiments); + + // Generate the key in the format "start-end" + String key = start + "-" + end; + + // Get the sublist (chunk) for the current range + List currentChunk = recommendationsRequiredExperiments.subList(i, end); + + if (batchData.containsKey(key)) { + batchData.get(key).getRecommendations().getData().setInqueue(currentChunk); + } else { + BulkJobStatus.Data data = new BulkJobStatus.Data( + new BulkJobStatus.Experiments(new ArrayList<>(), new ArrayList<>()), + new BulkJobStatus.Recommendations(0, 0, new BulkJobStatus.RecommendationData( + new ArrayList<>(), + new ArrayList<>(), + new ArrayList<>(), + new ArrayList<>() + )) + ); + data.getRecommendations().getData().setInqueue(currentChunk); + batchData.put(key, data); + } + + } + } } diff --git a/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java b/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java index 5ce3e4ee5..8d203fc5c 100644 --- a/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java +++ b/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java @@ -80,8 +80,8 @@ public class KruizeDeploymentInfo { public static Boolean local = false; public static Boolean log_http_req_resp = false; public static String recommendations_url; - public static Integer bulk_thread_pool_size = 3; - + public static int BULK_API_LIMIT = 1000; + public static int BULK_API_CHUNK_SIZE = 10; public static int generate_recommendations_date_range_limit_in_days = 15; public static Integer delete_partition_threshold_in_days = DELETE_PARTITION_THRESHOLD_IN_DAYS; private static Hashtable tunableLayerPair; diff --git a/src/main/java/com/autotune/utils/KruizeConstants.java b/src/main/java/com/autotune/utils/KruizeConstants.java index 154d8c000..92932dafc 100644 --- a/src/main/java/com/autotune/utils/KruizeConstants.java +++ b/src/main/java/com/autotune/utils/KruizeConstants.java @@ -169,7 +169,6 @@ public static final class JSONKeys { public static final String POD_METRICS = "pod_metrics"; public static final String CONTAINER_METRICS = "container_metrics"; public static final String METRICS = "metrics"; - public static final String METRIC = "metric"; public static final String CONFIG = "config"; public static final String CURRENT = "current"; public static final String NAME = "name"; @@ -265,10 +264,6 @@ public static final class JSONKeys { public static final String PLOTS_DATAPOINTS = "datapoints"; public static final String PLOTS_DATA = "plots_data"; public static final String CONFIDENCE_LEVEL = "confidence_level"; - public static final String HOSTNAME = "Hostname"; - public static final String UUID = "UUID"; - public static final String DEVICE = "device"; - public static final String MODEL_NAME = "modelName"; private JSONKeys() { } @@ -677,6 +672,8 @@ public static final class KRUIZE_CONFIG_ENV_NAME { public static final String LOCAL = "local"; public static final String LOG_HTTP_REQ_RESP = "logAllHttpReqAndResp"; public static final String RECOMMENDATIONS_URL = "recommendationsURL"; + public static final String BULK_API_LIMIT = "bulkapilimit"; + public static final String BULK_API_CHUNK_SIZE = "bulkapichunksize"; public static final String BULK_THREAD_POOL_SIZE = "bulkThreadPoolSize"; } From 4925036662a94df40de9ec7a47157c084ab5d95f Mon Sep 17 00:00:00 2001 From: msvinaykumar Date: Wed, 9 Oct 2024 23:59:58 +0530 Subject: [PATCH 2/5] incorporated review comments Signed-off-by: msvinaykumar --- .../serviceObjects/BulkJobStatus.java | 175 +++++---- .../analyzer/services/BulkService.java | 30 +- .../analyzer/workerimpl/BulkJobManager.java | 362 +++++++++--------- .../database/dao/ExperimentDAOImpl.java | 18 +- .../operator/KruizeDeploymentInfo.java | 3 +- .../com/autotune/utils/KruizeConstants.java | 5 + 6 files changed, 301 insertions(+), 292 deletions(-) diff --git a/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java b/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java index 205fb541d..aea5a9559 100644 --- a/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java +++ b/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java @@ -15,38 +15,38 @@ *******************************************************************************/ package com.autotune.analyzer.serviceObjects; +import com.fasterxml.jackson.annotation.JsonFilter; import com.fasterxml.jackson.annotation.JsonProperty; import java.time.Instant; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import java.util.Map; import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.JOB_ID; /** * Bulk API Response payload Object. */ +@JsonFilter("jobFilter") public class BulkJobStatus { @JsonProperty(JOB_ID) private String jobID; private String status; - private int progress; - // Mapping each experiment group (like "1-10", "11-20") to its corresponding data - @JsonProperty("data") - private Map batchData; - - @JsonProperty("start_time") + private int total_experiments; + private int processed_experiments; + private Data data; + @JsonProperty("job_start_time") private String startTime; // Change to String to store formatted time - @JsonProperty("end_time") + @JsonProperty("job_end_time") private String endTime; // Change to String to store formatted time - public BulkJobStatus(String jobID, String status, int progress, Map data, Instant startTime) { + public BulkJobStatus(String jobID, String status, Data data, Instant startTime) { this.jobID = jobID; this.status = status; - this.progress = progress; - this.batchData = data; + this.data = data; setStartTime(startTime); } @@ -62,14 +62,6 @@ public void setStatus(String status) { this.status = status; } - public int getProgress() { - return progress; - } - - public void setProgress(int progress) { - this.progress = progress; - } - public String getStartTime() { return startTime; } @@ -78,6 +70,10 @@ public void setStartTime(Instant startTime) { this.startTime = formatInstantAsUTCString(startTime); } + public void setStartTime(String startTime) { + this.startTime = startTime; + } + public String getEndTime() { return endTime; } @@ -86,6 +82,34 @@ public void setEndTime(Instant endTime) { this.endTime = formatInstantAsUTCString(endTime); } + public void setEndTime(String endTime) { + this.endTime = endTime; + } + + public int getTotal_experiments() { + return total_experiments; + } + + public void setTotal_experiments(int total_experiments) { + this.total_experiments = total_experiments; + } + + public int getProcessed_experiments() { + return processed_experiments; + } + + public void setProcessed_experiments(int processed_experiments) { + this.processed_experiments = processed_experiments; + } + + public Data getData() { + return data; + } + + public void setData(Data data) { + this.data = data; + } + // Utility function to format Instant into the required UTC format private String formatInstantAsUTCString(Instant instant) { DateTimeFormatter formatter = DateTimeFormatter @@ -95,12 +119,13 @@ private String formatInstantAsUTCString(Instant instant) { return formatter.format(instant); } - public Map getBatchData() { - return batchData; - } - - public void setBatchData(Map batchData) { - this.batchData = batchData; + @Override + public BulkJobStatus clone() { + try { + return (BulkJobStatus) super.clone(); + } catch (CloneNotSupportedException e) { + throw new AssertionError(); + } } // Inner class for the data field @@ -163,34 +188,12 @@ public void setUpdatedExperiments(List updatedExperiments) { // Inner class for recommendations public static class Recommendations { - @JsonProperty("count") - private int totalCount; - @JsonProperty("completed") - private int completedCount; private RecommendationData data; - public Recommendations(int totalCount, int completedCount, RecommendationData data) { - this.totalCount = totalCount; - this.completedCount = completedCount; + public Recommendations(RecommendationData data) { this.data = data; } - public int getTotalCount() { - return totalCount; - } - - public void setTotalCount(int totalCount) { - this.totalCount = totalCount; - } - - public int getCompletedCount() { - return this.data.getCompleted().size(); - } - - public void setCompletedCount(int completedCount) { - this.completedCount = completedCount; - } - public RecommendationData getData() { return data; } @@ -202,74 +205,74 @@ public void setData(RecommendationData data) { // Inner class for recommendation data public static class RecommendationData { - private List completed; - private List progress; - private List inqueue; - private List failed; - - public RecommendationData(List completed, List progress, List inqueue, List failed) { - this.completed = completed; - this.progress = progress; - this.inqueue = inqueue; + private List processed = Collections.synchronizedList(new ArrayList<>()); + private List processing = Collections.synchronizedList(new ArrayList<>()); + private List unprocessed = Collections.synchronizedList(new ArrayList<>()); + private List failed = Collections.synchronizedList(new ArrayList<>()); + + public RecommendationData(List processed, List processing, List unprocessed, List failed) { + this.processed = processed; + this.processing = processing; + this.unprocessed = unprocessed; this.failed = failed; } - public List getCompleted() { - return completed; + public List getProcessed() { + return processed; } - public void setCompleted(List completed) { - this.completed = completed; + public synchronized void setProcessed(List processed) { + this.processed = processed; } - public List getProgress() { - return progress; + public List getProcessing() { + return processing; } - public void setProgress(List progress) { - this.progress = progress; + public synchronized void setProcessing(List processing) { + this.processing = processing; } - public List getInqueue() { - return inqueue; + public List getUnprocessed() { + return unprocessed; } - public void setInqueue(List inqueue) { - this.inqueue = inqueue; + public synchronized void setUnprocessed(List unprocessed) { + this.unprocessed = unprocessed; } public List getFailed() { return failed; } - public void setFailed(List failed) { + public synchronized void setFailed(List failed) { this.failed = failed; } // Move elements from inqueue to progress - public void moveToProgress(String element) { - if (inqueue.contains(element)) { - inqueue.remove(element); - if (!progress.contains(element)) { - progress.add(element); + public synchronized void moveToProgress(String element) { + if (unprocessed.contains(element)) { + unprocessed.remove(element); + if (!processing.contains(element)) { + processing.add(element); } } } // Move elements from progress to completed - public void moveToCompleted(String element) { - if (progress.contains(element)) { - progress.remove(element); - if (!completed.contains(element)) { - completed.add(element); + public synchronized void moveToCompleted(String element) { + if (processing.contains(element)) { + processing.remove(element); + if (!processed.contains(element)) { + processed.add(element); } } } // Move elements from progress to failed - public void moveToFailed(String element) { - if (progress.contains(element)) { - progress.remove(element); + public synchronized void moveToFailed(String element) { + if (processing.contains(element)) { + processing.remove(element); if (!failed.contains(element)) { failed.add(element); } @@ -278,11 +281,13 @@ public void moveToFailed(String element) { // Calculate the percentage of completion public int completionPercentage() { - int totalTasks = completed.size() + progress.size() + inqueue.size() + failed.size(); + int totalTasks = processed.size() + processing.size() + unprocessed.size() + failed.size(); if (totalTasks == 0) { return (int) 0.0; } - return (int) ((completed.size() * 100.0) / totalTasks); + return (int) ((processed.size() * 100.0) / totalTasks); } + + } } diff --git a/src/main/java/com/autotune/analyzer/services/BulkService.java b/src/main/java/com/autotune/analyzer/services/BulkService.java index aa0cbdcb0..1f7e3debf 100644 --- a/src/main/java/com/autotune/analyzer/services/BulkService.java +++ b/src/main/java/com/autotune/analyzer/services/BulkService.java @@ -19,6 +19,8 @@ import com.autotune.analyzer.serviceObjects.BulkJobStatus; import com.autotune.analyzer.workerimpl.BulkJobManager; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter; +import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,17 +34,13 @@ import java.io.IOException; import java.time.Instant; import java.util.ArrayList; -import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import static com.autotune.operator.KruizeDeploymentInfo.BULK_API_CHUNK_SIZE; - -import static com.autotune.analyzer.utils.AnalyzerConstants.ServiceConstants.CHARACTER_ENCODING; -import static com.autotune.analyzer.utils.AnalyzerConstants.ServiceConstants.JSON_CONTENT_TYPE; +import static com.autotune.analyzer.utils.AnalyzerConstants.ServiceConstants.*; import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.*; /** @@ -69,11 +67,15 @@ public void init(ServletConfig config) throws ServletException { @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { String jobID = req.getParameter(JOB_ID); - BulkJobStatus jobStatus = jobStatusMap.get(jobID); + String verboseParam = req.getParameter(VERBOSE); + // If the parameter is not provided (null), default it to false + boolean verbose = verboseParam != null && Boolean.parseBoolean(verboseParam); + BulkJobStatus jobDetails = jobStatusMap.get(jobID); resp.setContentType(JSON_CONTENT_TYPE); resp.setCharacterEncoding(CHARACTER_ENCODING); + SimpleFilterProvider filters = new SimpleFilterProvider(); - if (jobStatus == null) { + if (jobDetails == null) { sendErrorResponse( resp, null, @@ -85,7 +87,13 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws Se resp.setStatus(HttpServletResponse.SC_OK); // Return the JSON representation of the JobStatus object ObjectMapper objectMapper = new ObjectMapper(); - String jsonResponse = objectMapper.writeValueAsString(jobStatus); + if (!verbose) { + filters.addFilter("jobFilter", SimpleBeanPropertyFilter.serializeAllExcept("data")); + } else { + filters.addFilter("jobFilter", SimpleBeanPropertyFilter.serializeAll()); + } + objectMapper.setFilterProvider(filters); + String jsonResponse = objectMapper.writeValueAsString(jobDetails); resp.getWriter().write(jsonResponse); } catch (Exception e) { e.printStackTrace(); @@ -115,16 +123,14 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response) String jobID = UUID.randomUUID().toString(); BulkJobStatus.Data data = new BulkJobStatus.Data( new BulkJobStatus.Experiments(new ArrayList<>(), new ArrayList<>()), - new BulkJobStatus.Recommendations(0, 0, new BulkJobStatus.RecommendationData( + new BulkJobStatus.Recommendations(new BulkJobStatus.RecommendationData( new ArrayList<>(), new ArrayList<>(), new ArrayList<>(), new ArrayList<>() )) ); - Map batchData = new HashMap<>(); - batchData.put(String.format("0-%s", BULK_API_CHUNK_SIZE), data); - jobStatusMap.put(jobID, new BulkJobStatus(jobID, IN_PROGRESS, 0, batchData, Instant.now())); + jobStatusMap.put(jobID, new BulkJobStatus(jobID, IN_PROGRESS, data, Instant.now())); // Submit the job to be processed asynchronously executorService.submit(new BulkJobManager(jobID, jobStatusMap, payload)); diff --git a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java index 2a4d11b86..80b3565c7 100644 --- a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java +++ b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java @@ -27,22 +27,28 @@ import com.autotune.common.k8sObjects.TrialSettings; import com.autotune.common.utils.CommonUtils; import com.autotune.database.service.ExperimentDBService; +import com.autotune.operator.KruizeDeploymentInfo; import com.autotune.utils.KruizeConstants; import com.autotune.utils.Utils; +import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; import java.sql.Timestamp; +import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; - -import static com.autotune.operator.KruizeDeploymentInfo.BULK_API_CHUNK_SIZE; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static com.autotune.operator.KruizeDeploymentInfo.bulk_thread_pool_size; -import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.*; +import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.COMPLETED; +import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.CREATE_EXPERIMENT_CONFIG_BEAN; /** @@ -92,178 +98,89 @@ public static List appendExperiments(List allExperiments, String @Override public void run() { try { - - String uniqueKey = null; - // Process labels in the 'include' section - if (this.bulkInput.getFilter() != null && this.bulkInput.getFilter().getInclude() != null) { - // Initialize StringBuilder for uniqueKey - StringBuilder includeLabelsBuilder = new StringBuilder(); - Map includeLabels = this.bulkInput.getFilter().getInclude().getLabels(); - if (includeLabels != null && !includeLabels.isEmpty()) { - includeLabels.forEach((key, value) -> - includeLabelsBuilder.append(key).append("=").append("\"" + value + "\"").append(",") - ); - // Remove trailing comma - if (includeLabelsBuilder.length() > 0) { - includeLabelsBuilder.setLength(includeLabelsBuilder.length() - 1); - } - LOGGER.debug("Include Labels: " + includeLabelsBuilder.toString()); - uniqueKey = includeLabelsBuilder.toString(); - } - } + BulkJobStatus jobData = jobStatusMap.get(jobID); + String uniqueKey = getLabels(this.bulkInput.getFilter()); if (null == this.bulkInput.getDatasource()) { this.bulkInput.setDatasource(CREATE_EXPERIMENT_CONFIG_BEAN.getDatasourceName()); } DataSourceMetadataInfo metadataInfo = null; DataSourceManager dataSourceManager = new DataSourceManager(); DataSourceInfo datasource = CommonUtils.getDataSourceInfo(this.bulkInput.getDatasource()); - - - if (null != this.bulkInput.getTime_range() && this.bulkInput.getTime_range().getStart() != null && this.bulkInput.getTime_range().getEnd() != null) { - // Extract interval start and end times - String intervalEndTimeStr = this.bulkInput.getTime_range().getStart(); - String intervalStartTimeStr = this.bulkInput.getTime_range().getEnd(); - long interval_end_time_epoc = 0; - long interval_start_time_epoc = 0; - LocalDateTime localDateTime = LocalDateTime.parse(intervalEndTimeStr, DateTimeFormatter.ofPattern(KruizeConstants.DateFormats.STANDARD_JSON_DATE_FORMAT)); - interval_end_time_epoc = localDateTime.toEpochSecond(ZoneOffset.UTC); - Timestamp interval_end_time = Timestamp.from(localDateTime.toInstant(ZoneOffset.UTC)); - localDateTime = LocalDateTime.parse(intervalStartTimeStr, DateTimeFormatter.ofPattern(KruizeConstants.DateFormats.STANDARD_JSON_DATE_FORMAT)); - interval_start_time_epoc = localDateTime.toEpochSecond(ZoneOffset.UTC); - Timestamp interval_start_time = Timestamp.from(localDateTime.toInstant(ZoneOffset.UTC)); - int steps = CREATE_EXPERIMENT_CONFIG_BEAN.getMeasurementDuration() * KruizeConstants.TimeConv.NO_OF_SECONDS_PER_MINUTE; // todo fetch experiment recommendations setting measurement - //Get metaData - metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, uniqueKey, interval_start_time_epoc, interval_end_time_epoc, steps); - } else { - //Get metaData + JSONObject daterange = processDateRange(this.bulkInput.getTime_range()); + if (null != daterange) + metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, uniqueKey, (Long) daterange.get("start_time"), (Long) daterange.get("end_time"), (Integer) daterange.get("steps")); + else { metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, uniqueKey, 0, 0, 0); } - List recommendationsRequiredExperiments = new CopyOnWriteArrayList<>(); - List newExperiments = new CopyOnWriteArrayList<>(); if (null == metadataInfo) { - jobStatusMap.get(jobID).setStatus(COMPLETED); + jobData.setStatus(COMPLETED); } else { - Collection dataSourceCollection = metadataInfo.getDataSourceHashMap().values(); - for (DataSource ds : dataSourceCollection) { - HashMap clusterHashMap = ds.getDataSourceClusterHashMap(); - for (DataSourceCluster dsc : clusterHashMap.values()) { - HashMap namespaceHashMap = dsc.getDataSourceNamespaceHashMap(); - for (DataSourceNamespace namespace : namespaceHashMap.values()) { - HashMap dataSourceWorkloadHashMap = namespace.getDataSourceWorkloadHashMap(); - if (dataSourceWorkloadHashMap != null) { - for (DataSourceWorkload dsw : dataSourceWorkloadHashMap.values()) { - HashMap dataSourceContainerHashMap = dsw.getDataSourceContainerHashMap(); - if (dataSourceContainerHashMap != null) { - for (DataSourceContainer dc : dataSourceContainerHashMap.values()) { - CreateExperimentAPIObject createExperimentAPIObject = new CreateExperimentAPIObject(); - createExperimentAPIObject.setMode(CREATE_EXPERIMENT_CONFIG_BEAN.getMode()); - createExperimentAPIObject.setTargetCluster(CREATE_EXPERIMENT_CONFIG_BEAN.getTarget()); - createExperimentAPIObject.setApiVersion(CREATE_EXPERIMENT_CONFIG_BEAN.getVersion()); - String experiment_name = this.bulkInput.getDatasource() + "|" + dsc.getDataSourceClusterName() + "|" + namespace.getDataSourceNamespaceName() - + "|" + dsw.getDataSourceWorkloadName() + "(" + dsw.getDataSourceWorkloadType() + ")" + "|" + dc.getDataSourceContainerName(); - createExperimentAPIObject.setExperimentName(experiment_name); - createExperimentAPIObject.setDatasource(this.bulkInput.getDatasource()); - createExperimentAPIObject.setClusterName(dsc.getDataSourceClusterName()); - createExperimentAPIObject.setPerformanceProfile(CREATE_EXPERIMENT_CONFIG_BEAN.getPerformanceProfile()); - List kubernetesAPIObjectList = new ArrayList<>(); - KubernetesAPIObject kubernetesAPIObject = new KubernetesAPIObject(); - ContainerAPIObject cao = new ContainerAPIObject(dc.getDataSourceContainerName(), - dc.getDataSourceContainerImageName(), null, null); - kubernetesAPIObject.setContainerAPIObjects(Arrays.asList(cao)); - kubernetesAPIObject.setName(dsw.getDataSourceWorkloadName()); - kubernetesAPIObject.setType(dsw.getDataSourceWorkloadType()); - kubernetesAPIObject.setNamespace(namespace.getDataSourceNamespaceName()); - kubernetesAPIObjectList.add(kubernetesAPIObject); - createExperimentAPIObject.setKubernetesObjects(kubernetesAPIObjectList); - RecommendationSettings rs = new RecommendationSettings(); - rs.setThreshold(CREATE_EXPERIMENT_CONFIG_BEAN.getThreshold()); - createExperimentAPIObject.setRecommendationSettings(rs); - TrialSettings trialSettings = new TrialSettings(); - trialSettings.setMeasurement_durationMinutes(CREATE_EXPERIMENT_CONFIG_BEAN.getMeasurementDurationStr()); - createExperimentAPIObject.setTrialSettings(trialSettings); - List kruizeExpList = new ArrayList<>(); + Map createExperimentAPIObjectMap = getExperimentMap(metadataInfo); //Todo Store this map in buffer and use it if BulkAPI pods restarts and support experiment_type + jobData.setTotal_experiments(createExperimentAPIObjectMap.size()); + jobData.setProcessed_experiments(0); + ExecutorService createExecutor = Executors.newFixedThreadPool(bulk_thread_pool_size); + ExecutorService generateExecutor = Executors.newFixedThreadPool(bulk_thread_pool_size); + for (CreateExperimentAPIObject apiObject : createExperimentAPIObjectMap.values()) { + createExecutor.submit(() -> { + String experiment_name = apiObject.getExperimentName(); + BulkJobStatus.Experiments newExperiments = jobData.getData().getExperiments(); + BulkJobStatus.RecommendationData recommendationData = jobData.getData().getRecommendations().getData(); + try { + ValidationOutputData output = new ExperimentDBService().addExperimentToDB(apiObject); + if (output.isSuccess()) { + jobData.getData().getExperiments().setNewExperiments( + appendExperiments(newExperiments.getNewExperiments(), experiment_name) + ); + } + generateExecutor.submit(() -> { + + jobData.getData().getRecommendations().getData().setUnprocessed( + appendExperiments(recommendationData.getUnprocessed(), experiment_name) + ); + + URL url = null; + HttpURLConnection connection = null; + int statusCode = 0; + try { + url = new URL(String.format(KruizeDeploymentInfo.recommendations_url, experiment_name)); + connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + + recommendationData.moveToProgress(experiment_name); + + statusCode = connection.getResponseCode(); + } catch (IOException e) { + LOGGER.error(e.getMessage()); + + recommendationData.moveToFailed(experiment_name); + + throw new RuntimeException(e); + } finally { + if (null != connection) connection.disconnect(); + } + if (statusCode == HttpURLConnection.HTTP_CREATED) { - createExperimentAPIObject.setExperiment_id(Utils.generateID(createExperimentAPIObject.toString())); - createExperimentAPIObject.setStatus(AnalyzerConstants.ExperimentStatus.IN_PROGRESS); + recommendationData.moveToCompleted(experiment_name); + jobData.setProcessed_experiments(jobData.getProcessed_experiments() + 1); - try { - ValidationOutputData output = new ExperimentDBService().addExperimentToDB(createExperimentAPIObject); - if (output.isSuccess()) { - /*jobStatusMap.get(jobID).getData().getExperiments().setNewExperiments( - jobStatusMap.get(jobID).getData().getExperiments().setNewExperiments( - appendExperiments(jobStatusMap.get(jobID).getData().getExperiments().getNewExperiments(), experiment_name) - );*/ - newExperiments.add(experiment_name); - } - recommendationsRequiredExperiments.add(experiment_name); - } catch (Exception e) { - LOGGER.error(e.getMessage()); - } - } + if (jobData.getTotal_experiments() == jobData.getProcessed_experiments()) { + jobData.setStatus(COMPLETED); + jobStatusMap.get(jobID).setEndTime(Instant.now()); } + } else { + + recommendationData.moveToFailed(experiment_name); + } - } + }); + } catch (Exception e) { + e.printStackTrace(); + recommendationData.moveToFailed(experiment_name); } - } + }); } - jobStatusMap.get(jobID).setStatus(IN_PROGRESS); - //jobStatusMap.get(jobID).getData().getRecommendations().getData().setInqueue(recommendationsRequiredExperiments); - //jobStatusMap.get(jobID).getData().getRecommendations().setTotalCount(recommendationsRequiredExperiments.size()); - - chunkAndStore(recommendationsRequiredExperiments, BULK_API_CHUNK_SIZE, jobStatusMap.get(jobID)); } - /* ExecutorService executor = Executors.newFixedThreadPool(3); - ExecutorService executor = Executors.newFixedThreadPool(3); - ExecutorService executor = Executors.newFixedThreadPool(bulk_thread_pool_size); - for (String name : recommendationsRequiredExperiments) { - executor.submit(() -> { - URL url = null; - try { - url = new URL(String.format(KruizeDeploymentInfo.recommendations_url, name)); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } - HttpURLConnection connection = null; - try { - connection = (HttpURLConnection) url.openConnection(); - } catch (IOException e) { - LOGGER.error(e.getMessage()); - throw new RuntimeException(e); - } - try { - connection.setRequestMethod("POST"); - } catch (ProtocolException e) { - LOGGER.error(e.getMessage()); - throw new RuntimeException(e); - } - // Get the response code from /helloworld - int statusCode = 0; - try { - jobStatusMap.get(jobID).getData().getRecommendations().getData().moveToProgress(name); - statusCode = connection.getResponseCode(); - } catch (IOException e) { - LOGGER.error(e.getMessage()); - throw new RuntimeException(e); - } - - if (statusCode == HttpURLConnection.HTTP_CREATED) { - jobStatusMap.get(jobID).getData().getRecommendations().getData().moveToCompleted(name); - } else { - jobStatusMap.get(jobID).getData().getRecommendations().getData().moveToFailed(name); - } - jobStatusMap.get(jobID).setProgress(jobStatusMap.get(jobID).getData().getRecommendations().getData().completionPercentage()); - if (jobStatusMap.get(jobID).getProgress() == 100) { - jobStatusMap.get(jobID).setStatus(COMPLETED); // Mark the job as completed - jobStatusMap.get(jobID).setEndTime(Instant.now()); - jobStatusMap.get(jobID).getData().getRecommendations().setCompletedCount( - jobStatusMap.get(jobID).getData().getRecommendations().getData().getCompleted().size() - ); - } - // Close the connection - connection.disconnect(); - }); - }*/ } catch (Exception e) { LOGGER.error(e.getMessage()); e.printStackTrace(); @@ -271,40 +188,111 @@ public void run() { } } - private void chunkAndStore(List recommendationsRequiredExperiments, int chunkSize, BulkJobStatus bulkJobStatus) { - int totalExperiments = recommendationsRequiredExperiments.size(); - Map batchData = bulkJobStatus.getBatchData(); - int chunkCount = 0; - // Process each chunk - for (int i = 0; i < totalExperiments; i += chunkSize) { - chunkCount++; - - // Define the chunk start and end indices - int start = i + 1; - int end = Math.min(i + chunkSize, totalExperiments); - // Generate the key in the format "start-end" - String key = start + "-" + end; + Map getExperimentMap(DataSourceMetadataInfo metadataInfo) { + Map createExperimentAPIObjectMap = new HashMap<>(); + Collection dataSourceCollection = metadataInfo.getDataSourceHashMap().values(); + for (DataSource ds : dataSourceCollection) { + HashMap clusterHashMap = ds.getDataSourceClusterHashMap(); + for (DataSourceCluster dsc : clusterHashMap.values()) { + HashMap namespaceHashMap = dsc.getDataSourceNamespaceHashMap(); + for (DataSourceNamespace namespace : namespaceHashMap.values()) { + HashMap dataSourceWorkloadHashMap = namespace.getDataSourceWorkloadHashMap(); + if (dataSourceWorkloadHashMap != null) { + for (DataSourceWorkload dsw : dataSourceWorkloadHashMap.values()) { + HashMap dataSourceContainerHashMap = dsw.getDataSourceContainerHashMap(); + if (dataSourceContainerHashMap != null) { + for (DataSourceContainer dc : dataSourceContainerHashMap.values()) { + CreateExperimentAPIObject createExperimentAPIObject = new CreateExperimentAPIObject(); + createExperimentAPIObject.setMode(CREATE_EXPERIMENT_CONFIG_BEAN.getMode()); + createExperimentAPIObject.setTargetCluster(CREATE_EXPERIMENT_CONFIG_BEAN.getTarget()); + createExperimentAPIObject.setApiVersion(CREATE_EXPERIMENT_CONFIG_BEAN.getVersion()); + String experiment_name = this.bulkInput.getDatasource() + "|" + dsc.getDataSourceClusterName() + "|" + namespace.getDataSourceNamespaceName() + + "|" + dsw.getDataSourceWorkloadName() + "(" + dsw.getDataSourceWorkloadType() + ")" + "|" + dc.getDataSourceContainerName(); + createExperimentAPIObject.setExperimentName(experiment_name); + createExperimentAPIObject.setDatasource(this.bulkInput.getDatasource()); + createExperimentAPIObject.setClusterName(dsc.getDataSourceClusterName()); + createExperimentAPIObject.setPerformanceProfile(CREATE_EXPERIMENT_CONFIG_BEAN.getPerformanceProfile()); + List kubernetesAPIObjectList = new ArrayList<>(); + KubernetesAPIObject kubernetesAPIObject = new KubernetesAPIObject(); + ContainerAPIObject cao = new ContainerAPIObject(dc.getDataSourceContainerName(), + dc.getDataSourceContainerImageName(), null, null); + kubernetesAPIObject.setContainerAPIObjects(Arrays.asList(cao)); + kubernetesAPIObject.setName(dsw.getDataSourceWorkloadName()); + kubernetesAPIObject.setType(dsw.getDataSourceWorkloadType()); + kubernetesAPIObject.setNamespace(namespace.getDataSourceNamespaceName()); + kubernetesAPIObjectList.add(kubernetesAPIObject); + createExperimentAPIObject.setKubernetesObjects(kubernetesAPIObjectList); + RecommendationSettings rs = new RecommendationSettings(); + rs.setThreshold(CREATE_EXPERIMENT_CONFIG_BEAN.getThreshold()); + createExperimentAPIObject.setRecommendationSettings(rs); + TrialSettings trialSettings = new TrialSettings(); + trialSettings.setMeasurement_durationMinutes(CREATE_EXPERIMENT_CONFIG_BEAN.getMeasurementDurationStr()); + createExperimentAPIObject.setTrialSettings(trialSettings); + List kruizeExpList = new ArrayList<>(); - // Get the sublist (chunk) for the current range - List currentChunk = recommendationsRequiredExperiments.subList(i, end); + createExperimentAPIObject.setExperiment_id(Utils.generateID(createExperimentAPIObject.toString())); + createExperimentAPIObject.setStatus(AnalyzerConstants.ExperimentStatus.IN_PROGRESS); + createExperimentAPIObject.setExperimentType(AnalyzerConstants.ExperimentTypes.CONTAINER_EXPERIMENT); + createExperimentAPIObjectMap.put(experiment_name, createExperimentAPIObject); + } + } + } + } + } + } + } + return createExperimentAPIObjectMap; + } - if (batchData.containsKey(key)) { - batchData.get(key).getRecommendations().getData().setInqueue(currentChunk); - } else { - BulkJobStatus.Data data = new BulkJobStatus.Data( - new BulkJobStatus.Experiments(new ArrayList<>(), new ArrayList<>()), - new BulkJobStatus.Recommendations(0, 0, new BulkJobStatus.RecommendationData( - new ArrayList<>(), - new ArrayList<>(), - new ArrayList<>(), - new ArrayList<>() - )) - ); - data.getRecommendations().getData().setInqueue(currentChunk); - batchData.put(key, data); + private String getLabels(BulkInput.FilterWrapper filter) { + String uniqueKey = null; + try { + // Process labels in the 'include' section + if (filter != null && filter.getInclude() != null) { + // Initialize StringBuilder for uniqueKey + StringBuilder includeLabelsBuilder = new StringBuilder(); + Map includeLabels = filter.getInclude().getLabels(); + if (includeLabels != null && !includeLabels.isEmpty()) { + includeLabels.forEach((key, value) -> + includeLabelsBuilder.append(key).append("=").append("\"" + value + "\"").append(",") + ); + // Remove trailing comma + if (includeLabelsBuilder.length() > 0) { + includeLabelsBuilder.setLength(includeLabelsBuilder.length() - 1); + } + LOGGER.debug("Include Labels: " + includeLabelsBuilder.toString()); + uniqueKey = includeLabelsBuilder.toString(); + } } + } catch (Exception e) { + e.printStackTrace(); + LOGGER.error(e.getMessage()); + } + return uniqueKey; + } + private JSONObject processDateRange(BulkInput.TimeRange timeRange) { + JSONObject dateRange = null; + if (null != timeRange && timeRange.getStart() != null && timeRange.getEnd() != null) { + String intervalEndTimeStr = timeRange.getStart(); + String intervalStartTimeStr = timeRange.getEnd(); + long interval_end_time_epoc = 0; + long interval_start_time_epoc = 0; + LocalDateTime localDateTime = LocalDateTime.parse(intervalEndTimeStr, DateTimeFormatter.ofPattern(KruizeConstants.DateFormats.STANDARD_JSON_DATE_FORMAT)); + interval_end_time_epoc = localDateTime.toEpochSecond(ZoneOffset.UTC); + Timestamp interval_end_time = Timestamp.from(localDateTime.toInstant(ZoneOffset.UTC)); + localDateTime = LocalDateTime.parse(intervalStartTimeStr, DateTimeFormatter.ofPattern(KruizeConstants.DateFormats.STANDARD_JSON_DATE_FORMAT)); + interval_start_time_epoc = localDateTime.toEpochSecond(ZoneOffset.UTC); + Timestamp interval_start_time = Timestamp.from(localDateTime.toInstant(ZoneOffset.UTC)); + int steps = CREATE_EXPERIMENT_CONFIG_BEAN.getMeasurementDuration() * KruizeConstants.TimeConv.NO_OF_SECONDS_PER_MINUTE; // todo fetch experiment recommendations setting measurement + dateRange = new JSONObject(); + dateRange.put("start_time", interval_start_time_epoc); + dateRange.put("end_time", interval_end_time_epoc); + dateRange.put("steps", steps); } + return dateRange; } + + } diff --git a/src/main/java/com/autotune/database/dao/ExperimentDAOImpl.java b/src/main/java/com/autotune/database/dao/ExperimentDAOImpl.java index 9a6174e8f..7b72baf77 100644 --- a/src/main/java/com/autotune/database/dao/ExperimentDAOImpl.java +++ b/src/main/java/com/autotune/database/dao/ExperimentDAOImpl.java @@ -344,7 +344,9 @@ public ValidationOutputData addRecommendationToDB(KruizeRecommendationEntry reco tx = session.beginTransaction(); session.persist(recommendationEntry); tx.commit(); - updateExperimentTypeInKruizeRecommendationEntry(recommendationEntry); + if (null == recommendationEntry.getExperimentType() || recommendationEntry.getExperimentType().isEmpty()) { + updateExperimentTypeInKruizeRecommendationEntry(recommendationEntry); + } validationOutputData.setSuccess(true); statusValue = "success"; } else { @@ -1091,12 +1093,14 @@ private void getExperimentTypeInKruizeExperimentEntry(List experimentType = query.getResultList(); - if (null != experimentType && !experimentType.isEmpty()) { - entry.setExperimentType(experimentType.get(0)); + if (null == entry.getExperimentType() || entry.getExperimentType().isEmpty()) { + String sql = DBConstants.SQLQUERY.SELECT_EXPERIMENT_EXP_TYPE; + Query query = session.createNativeQuery(sql); + query.setParameter("experiment_id", entry.getExperiment_id()); + List experimentType = query.getResultList(); + if (null != experimentType && !experimentType.isEmpty()) { + entry.setExperimentType(experimentType.get(0)); + } } } } diff --git a/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java b/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java index 8d203fc5c..214fab595 100644 --- a/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java +++ b/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java @@ -81,7 +81,8 @@ public class KruizeDeploymentInfo { public static Boolean log_http_req_resp = false; public static String recommendations_url; public static int BULK_API_LIMIT = 1000; - public static int BULK_API_CHUNK_SIZE = 10; + public static int BULK_API_MAX_BATCH_SIZE = 100; + public static Integer bulk_thread_pool_size = 3; public static int generate_recommendations_date_range_limit_in_days = 15; public static Integer delete_partition_threshold_in_days = DELETE_PARTITION_THRESHOLD_IN_DAYS; private static Hashtable tunableLayerPair; diff --git a/src/main/java/com/autotune/utils/KruizeConstants.java b/src/main/java/com/autotune/utils/KruizeConstants.java index 92932dafc..047e1bd1a 100644 --- a/src/main/java/com/autotune/utils/KruizeConstants.java +++ b/src/main/java/com/autotune/utils/KruizeConstants.java @@ -170,6 +170,7 @@ public static final class JSONKeys { public static final String CONTAINER_METRICS = "container_metrics"; public static final String METRICS = "metrics"; public static final String CONFIG = "config"; + public static final String METRIC = "metric"; public static final String CURRENT = "current"; public static final String NAME = "name"; public static final String QUERY = "query"; @@ -264,6 +265,10 @@ public static final class JSONKeys { public static final String PLOTS_DATAPOINTS = "datapoints"; public static final String PLOTS_DATA = "plots_data"; public static final String CONFIDENCE_LEVEL = "confidence_level"; + public static final String HOSTNAME = "Hostname"; + public static final String UUID = "UUID"; + public static final String DEVICE = "device"; + public static final String MODEL_NAME = "modelName"; private JSONKeys() { } From 779f1bd0928dedd1edba1f8dd911488d7ea94b99 Mon Sep 17 00:00:00 2001 From: msvinaykumar Date: Thu, 10 Oct 2024 00:23:54 +0530 Subject: [PATCH 3/5] incorporated review comments Signed-off-by: msvinaykumar --- .../serviceObjects/BulkJobStatus.java | 14 +-- .../analyzer/workerimpl/BulkJobManager.java | 108 +++++++++--------- .../com/autotune/utils/KruizeConstants.java | 2 + 3 files changed, 65 insertions(+), 59 deletions(-) diff --git a/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java b/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java index aea5a9559..d45f37774 100644 --- a/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java +++ b/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java @@ -42,6 +42,7 @@ public class BulkJobStatus { private String startTime; // Change to String to store formatted time @JsonProperty("job_end_time") private String endTime; // Change to String to store formatted time + private String message; public BulkJobStatus(String jobID, String status, Data data, Instant startTime) { this.jobID = jobID; @@ -119,13 +120,12 @@ private String formatInstantAsUTCString(Instant instant) { return formatter.format(instant); } - @Override - public BulkJobStatus clone() { - try { - return (BulkJobStatus) super.clone(); - } catch (CloneNotSupportedException e) { - throw new AssertionError(); - } + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; } // Inner class for the data field diff --git a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java index 80b3565c7..4d8bfb2dc 100644 --- a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java +++ b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java @@ -47,8 +47,7 @@ import java.util.concurrent.Executors; import static com.autotune.operator.KruizeDeploymentInfo.bulk_thread_pool_size; -import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.COMPLETED; -import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.CREATE_EXPERIMENT_CONFIG_BEAN; +import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.*; /** @@ -118,67 +117,72 @@ public void run() { Map createExperimentAPIObjectMap = getExperimentMap(metadataInfo); //Todo Store this map in buffer and use it if BulkAPI pods restarts and support experiment_type jobData.setTotal_experiments(createExperimentAPIObjectMap.size()); jobData.setProcessed_experiments(0); - ExecutorService createExecutor = Executors.newFixedThreadPool(bulk_thread_pool_size); - ExecutorService generateExecutor = Executors.newFixedThreadPool(bulk_thread_pool_size); - for (CreateExperimentAPIObject apiObject : createExperimentAPIObjectMap.values()) { - createExecutor.submit(() -> { - String experiment_name = apiObject.getExperimentName(); - BulkJobStatus.Experiments newExperiments = jobData.getData().getExperiments(); - BulkJobStatus.RecommendationData recommendationData = jobData.getData().getRecommendations().getData(); - try { - ValidationOutputData output = new ExperimentDBService().addExperimentToDB(apiObject); - if (output.isSuccess()) { - jobData.getData().getExperiments().setNewExperiments( - appendExperiments(newExperiments.getNewExperiments(), experiment_name) - ); - } - generateExecutor.submit(() -> { + if (jobData.getTotal_experiments() > KruizeDeploymentInfo.BULK_API_LIMIT) { + jobStatusMap.get(jobID).setStatus(FAILED); + jobStatusMap.get(jobID).setMessage(String.format(LIMIT_MESSAGE, KruizeDeploymentInfo.BULK_API_LIMIT)); + } else { + ExecutorService createExecutor = Executors.newFixedThreadPool(bulk_thread_pool_size); + ExecutorService generateExecutor = Executors.newFixedThreadPool(bulk_thread_pool_size); + for (CreateExperimentAPIObject apiObject : createExperimentAPIObjectMap.values()) { + createExecutor.submit(() -> { + String experiment_name = apiObject.getExperimentName(); + BulkJobStatus.Experiments newExperiments = jobData.getData().getExperiments(); + BulkJobStatus.RecommendationData recommendationData = jobData.getData().getRecommendations().getData(); + try { + ValidationOutputData output = new ExperimentDBService().addExperimentToDB(apiObject); + if (output.isSuccess()) { + jobData.getData().getExperiments().setNewExperiments( + appendExperiments(newExperiments.getNewExperiments(), experiment_name) + ); + } + generateExecutor.submit(() -> { - jobData.getData().getRecommendations().getData().setUnprocessed( - appendExperiments(recommendationData.getUnprocessed(), experiment_name) - ); + jobData.getData().getRecommendations().getData().setUnprocessed( + appendExperiments(recommendationData.getUnprocessed(), experiment_name) + ); - URL url = null; - HttpURLConnection connection = null; - int statusCode = 0; - try { - url = new URL(String.format(KruizeDeploymentInfo.recommendations_url, experiment_name)); - connection = (HttpURLConnection) url.openConnection(); - connection.setRequestMethod("POST"); + URL url = null; + HttpURLConnection connection = null; + int statusCode = 0; + try { + url = new URL(String.format(KruizeDeploymentInfo.recommendations_url, experiment_name)); + connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); - recommendationData.moveToProgress(experiment_name); + recommendationData.moveToProgress(experiment_name); - statusCode = connection.getResponseCode(); - } catch (IOException e) { - LOGGER.error(e.getMessage()); + statusCode = connection.getResponseCode(); + } catch (IOException e) { + LOGGER.error(e.getMessage()); - recommendationData.moveToFailed(experiment_name); + recommendationData.moveToFailed(experiment_name); - throw new RuntimeException(e); - } finally { - if (null != connection) connection.disconnect(); - } - if (statusCode == HttpURLConnection.HTTP_CREATED) { + throw new RuntimeException(e); + } finally { + if (null != connection) connection.disconnect(); + } + if (statusCode == HttpURLConnection.HTTP_CREATED) { - recommendationData.moveToCompleted(experiment_name); - jobData.setProcessed_experiments(jobData.getProcessed_experiments() + 1); + recommendationData.moveToCompleted(experiment_name); + jobData.setProcessed_experiments(jobData.getProcessed_experiments() + 1); - if (jobData.getTotal_experiments() == jobData.getProcessed_experiments()) { - jobData.setStatus(COMPLETED); - jobStatusMap.get(jobID).setEndTime(Instant.now()); - } + if (jobData.getTotal_experiments() == jobData.getProcessed_experiments()) { + jobData.setStatus(COMPLETED); + jobStatusMap.get(jobID).setEndTime(Instant.now()); + } - } else { + } else { - recommendationData.moveToFailed(experiment_name); + recommendationData.moveToFailed(experiment_name); - } - }); - } catch (Exception e) { - e.printStackTrace(); - recommendationData.moveToFailed(experiment_name); - } - }); + } + }); + } catch (Exception e) { + e.printStackTrace(); + recommendationData.moveToFailed(experiment_name); + } + }); + } } } } catch (Exception e) { diff --git a/src/main/java/com/autotune/utils/KruizeConstants.java b/src/main/java/com/autotune/utils/KruizeConstants.java index 047e1bd1a..a2303e88a 100644 --- a/src/main/java/com/autotune/utils/KruizeConstants.java +++ b/src/main/java/com/autotune/utils/KruizeConstants.java @@ -774,6 +774,8 @@ public static final class KRUIZE_BULK_API { public static final String JOB_NOT_FOUND_MSG = "Job not found"; public static final String IN_PROGRESS = "IN_PROGRESS"; public static final String COMPLETED = "COMPLETED"; + public static final String FAILED = "FAILED"; + public static final String LIMIT_MESSAGE = "The number of experiments exceeds %s"; // TODO : Bulk API Create Experiments defaults public static final CreateExperimentConfigBean CREATE_EXPERIMENT_CONFIG_BEAN; From 5daed6a35349ed1fedd4adce2f1948b3d7e733a5 Mon Sep 17 00:00:00 2001 From: msvinaykumar Date: Thu, 10 Oct 2024 11:52:33 +0530 Subject: [PATCH 4/5] MD file update Signed-off-by: msvinaykumar --- design/BulkAPI.md | 182 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 140 insertions(+), 42 deletions(-) diff --git a/design/BulkAPI.md b/design/BulkAPI.md index 4faec7a1f..bd7327c89 100644 --- a/design/BulkAPI.md +++ b/design/BulkAPI.md @@ -93,64 +93,162 @@ GET /bulk?job_id=123e4567-e89b-12d3-a456-426614174000 ```json { - "job_id": "123e4567-e89b-12d3-a456-426614174000", - "status": "IN-PROGRESS", - "progress": 30, + "status": "COMPLETED", + "total_experiments": 23, + "processed_experiments": 23, + "job_id": "54905959-77d4-42ba-8e06-90bb97b823b9", + "job_start_time": "2024-10-10T06:07:09.066Z", + "job_end_time": "2024-10-10T06:07:17.471Z" +} +``` + +```bash +GET /bulk?job_id=123e4567-e89b-12d3-a456-426614174000&verbose=true +``` + +**Body (JSON):** +When verbose=true, additional detailed information about the job is provided. + +```json +{ + "status": "IN_PROGRESS", + "total_experiments": 23, + "processed_experiments": 22, "data": { "experiments": { "new": [ - "a", - "b", - "c" + "prometheus-1|default|monitoring|node-exporter(daemonset)|node-exporter", + "prometheus-1|default|cadvisor|cadvisor(daemonset)|cadvisor", + "prometheus-1|default|monitoring|alertmanager-main(statefulset)|config-reloader", + "prometheus-1|default|monitoring|alertmanager-main(statefulset)|alertmanager", + "prometheus-1|default|monitoring|prometheus-operator(deployment)|kube-rbac-proxy", + "prometheus-1|default|kube-system|coredns(deployment)|coredns", + "prometheus-1|default|monitoring|prometheus-k8s(statefulset)|config-reloader", + "prometheus-1|default|monitoring|blackbox-exporter(deployment)|kube-rbac-proxy", + "prometheus-1|default|monitoring|prometheus-operator(deployment)|prometheus-operator", + "prometheus-1|default|monitoring|node-exporter(daemonset)|kube-rbac-proxy", + "prometheus-1|default|monitoring|kube-state-metrics(deployment)|kube-rbac-proxy-self", + "prometheus-1|default|monitoring|kube-state-metrics(deployment)|kube-state-metrics", + "prometheus-1|default|monitoring|kruize(deployment)|kruize", + "prometheus-1|default|monitoring|blackbox-exporter(deployment)|module-configmap-reloader", + "prometheus-1|default|monitoring|prometheus-k8s(statefulset)|prometheus", + "prometheus-1|default|monitoring|kube-state-metrics(deployment)|kube-rbac-proxy-main", + "prometheus-1|default|kube-system|kube-proxy(daemonset)|kube-proxy", + "prometheus-1|default|monitoring|prometheus-adapter(deployment)|prometheus-adapter", + "prometheus-1|default|monitoring|grafana(deployment)|grafana", + "prometheus-1|default|kube-system|kindnet(daemonset)|kindnet-cni", + "prometheus-1|default|monitoring|kruize-db-deployment(deployment)|kruize-db", + "prometheus-1|default|monitoring|blackbox-exporter(deployment)|blackbox-exporter" ], "updated": [], - "failed": [] + "failed": null }, "recommendations": { - "count": 9, - "completed": 3, - "experiments": { - "completed": [ - "exp1", - "exp2", - "exp3" + "data": { + "processed": [ + "prometheus-1|default|monitoring|alertmanager-main(statefulset)|config-reloader", + "prometheus-1|default|monitoring|node-exporter(daemonset)|node-exporter", + "prometheus-1|default|local-path-storage|local-path-provisioner(deployment)|local-path-provisioner", + "prometheus-1|default|monitoring|alertmanager-main(statefulset)|alertmanager", + "prometheus-1|default|monitoring|prometheus-operator(deployment)|kube-rbac-proxy", + "prometheus-1|default|kube-system|coredns(deployment)|coredns", + "prometheus-1|default|monitoring|blackbox-exporter(deployment)|kube-rbac-proxy", + "prometheus-1|default|monitoring|prometheus-k8s(statefulset)|config-reloader", + "prometheus-1|default|monitoring|prometheus-operator(deployment)|prometheus-operator", + "prometheus-1|default|monitoring|node-exporter(daemonset)|kube-rbac-proxy", + "prometheus-1|default|monitoring|kube-state-metrics(deployment)|kube-rbac-proxy-self", + "prometheus-1|default|monitoring|kube-state-metrics(deployment)|kube-state-metrics", + "prometheus-1|default|monitoring|kruize(deployment)|kruize", + "prometheus-1|default|monitoring|blackbox-exporter(deployment)|module-configmap-reloader", + "prometheus-1|default|monitoring|prometheus-k8s(statefulset)|prometheus", + "prometheus-1|default|monitoring|kube-state-metrics(deployment)|kube-rbac-proxy-main", + "prometheus-1|default|kube-system|kube-proxy(daemonset)|kube-proxy", + "prometheus-1|default|monitoring|prometheus-adapter(deployment)|prometheus-adapter", + "prometheus-1|default|monitoring|grafana(deployment)|grafana", + "prometheus-1|default|kube-system|kindnet(daemonset)|kindnet-cni", + "prometheus-1|default|monitoring|kruize-db-deployment(deployment)|kruize-db", + "prometheus-1|default|monitoring|blackbox-exporter(deployment)|blackbox-exporter" ], - "progress": [ - "exp1", - "exp2", - "exp3" + "processing": [ + "prometheus-1|default|cadvisor|cadvisor(daemonset)|cadvisor" ], - "new": [ - "exp1", - "exp2", - "exp3" + "unprocessed": [ ], "failed": [] } } }, - "job_start_time": "2024-09-23T10:58:47.048Z", - "job_end_time": "2024-09-23T11:01:52.205Z" + "job_id": "5798a2df-6c67-467b-a3c2-befe634a0e3a", + "job_start_time": "2024-10-09T18:09:31.549Z", + "job_end_time": null } ``` ### Response Parameters -- **job_id:** Unique identifier for the job. -- **status:** Current status of the job. Possible values: `"IN-PROGRESS"`, `"COMPLETED"`, `"FAILED"`. -- **progress:** Percentage of job completion. -- **data:** Contains detailed information about the experiments and recommendations. - - **experiments:** Tracks the status of experiments. - - **new:** List of newly created experiments. - - **updated:** List of updated experiments. - - **failed:** List of experiments that failed. - - **recommendations:** Provides details on recommendations. - - **count:** Total number of recommendations. - - **completed:** Number of completed recommendations. - - **experiments:** - - **completed:** List of experiments with completed recommendations. - - **progress:** List of experiments in progress. - - **new:** List of new experiments. - - **failed:** List of failed experiments. -- **job_start_time:** Timestamp indicating when the job started. -- **job_end_time:** Timestamp indicating when the job finished. +## API Description: Experiment and Recommendation Processing Status + +This API response describes the status of a job that processes multiple experiments and generates recommendations for +resource optimization in Kubernetes environments. Below is a breakdown of the JSON response: + +### Fields: + +- **status**: + - **Type**: `String` + - **Description**: Current status of the job. Can be "IN_PROGRESS", "COMPLETED", "FAILED", etc. + +- **total_experiments**: + - **Type**: `Integer` + - **Description**: Total number of experiments to be processed in the job. + +- **processed_experiments**: + - **Type**: `Integer` + - **Description**: Number of experiments that have been processed so far. + +- **data**: + - **Type**: `Object` + - **Description**: Contains detailed information about the experiments and recommendations being processed. + + - **experiments**: + - **new**: + - **Type**: `Array of Strings` + - **Description**: List of new experiments that have been identified but not yet processed. + + - **updated**: + - **Type**: `Array of Strings` + - **Description**: List of experiments that were previously processed but have now been updated. + + - **failed**: + - **Type**: `null or Array` + - **Description**: List of experiments that failed during processing. If no failures, the value is `null`. + + - **recommendations**: + - **data**: + - **processed**: + - **Type**: `Array of Strings` + - **Description**: List of experiments for which recommendations have already been processed. + + - **processing**: + - **Type**: `Array of Strings` + - **Description**: List of experiments that are currently being processed for recommendations. + + - **unprocessed**: + - **Type**: `Array of Strings` + - **Description**: List of experiments that have not yet been processed for recommendations. + + - **failed**: + - **Type**: `Array of Strings` + - **Description**: List of experiments for which the recommendation process failed. + +- **job_id**: + - **Type**: `String` + - **Description**: Unique identifier for the job. + +- **job_start_time**: + - **Type**: `String (ISO 8601 format)` + - **Description**: Start timestamp of the job. + +- **job_end_time**: + - **Type**: `String (ISO 8601 format) or null` + - **Description**: End timestamp of the job. If the job is still in progress, this will be `null`. + From 6e9cc0625aaeb8b7456d2c60ab65c8c2d0530e29 Mon Sep 17 00:00:00 2001 From: msvinaykumar Date: Thu, 10 Oct 2024 17:31:33 +0530 Subject: [PATCH 5/5] incorporated review comments Signed-off-by: msvinaykumar --- .../java/com/autotune/analyzer/workerimpl/BulkJobManager.java | 1 + src/main/java/com/autotune/utils/KruizeConstants.java | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java index 4d8bfb2dc..c827fd289 100644 --- a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java +++ b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java @@ -113,6 +113,7 @@ public void run() { } if (null == metadataInfo) { jobData.setStatus(COMPLETED); + jobData.setMessage(NOTHING); } else { Map createExperimentAPIObjectMap = getExperimentMap(metadataInfo); //Todo Store this map in buffer and use it if BulkAPI pods restarts and support experiment_type jobData.setTotal_experiments(createExperimentAPIObjectMap.size()); diff --git a/src/main/java/com/autotune/utils/KruizeConstants.java b/src/main/java/com/autotune/utils/KruizeConstants.java index a2303e88a..ab3732843 100644 --- a/src/main/java/com/autotune/utils/KruizeConstants.java +++ b/src/main/java/com/autotune/utils/KruizeConstants.java @@ -775,7 +775,8 @@ public static final class KRUIZE_BULK_API { public static final String IN_PROGRESS = "IN_PROGRESS"; public static final String COMPLETED = "COMPLETED"; public static final String FAILED = "FAILED"; - public static final String LIMIT_MESSAGE = "The number of experiments exceeds %s"; + public static final String LIMIT_MESSAGE = "The number of experiments exceeds %s."; + public static final String NOTHING = "Nothing to do."; // TODO : Bulk API Create Experiments defaults public static final CreateExperimentConfigBean CREATE_EXPERIMENT_CONFIG_BEAN;