1 - BulkJobsave - table sql #1453

Open · wants to merge 4 commits into base: mvp_demo
74 changes: 74 additions & 0 deletions design/KruizeDatabaseDesign.md
@@ -19,6 +19,7 @@ The Kruize Autotune project has the following entities:
2. kruize_results
3. kruize_recommendations
4. kruize_performance_profiles
5. kruize_bulkjobs

## **kruize_experiments**

@@ -819,3 +820,76 @@ curl --location --request POST 'http://127.0.0.1:8080/createPerformanceProfile'
```
insert into kruize_performance_profiles;
```

## **kruize_bulkjobs**

---

This table stores job-level data, including the job status, start and end times, notifications, per-experiment details, and the total and processed experiment counts.

```sql
CREATE TABLE kruize_bulkjobs (
job_id UUID NOT NULL,
end_time TIMESTAMP(6),
start_time TIMESTAMP(6),
notifications JSONB,
experiments JSONB,
processed_count INTEGER,
status VARCHAR(255),
total_count INTEGER,
webhook VARCHAR(255),
PRIMARY KEY (job_id)
);
```
Sample data:
```json
{
  "status": "COMPLETED",
  "total_experiments": 686,
  "processed_experiments": 686,
  "notifications": null,
  "experiments": {
    "prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28902795(job)|collect-profiles": {
      "notification": null,
      "recommendations": {
        "status": "PROCESSED",
        "notifications": null
      }
    },
    "prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28908435(job)|collect-profiles": {
      "notification": null,
      "recommendations": {
        "status": "PROCESSED",
        "notifications": null
      }
    },
    "prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28904820(job)|collect-profiles": {
      "notification": null,
      "recommendations": {
        "status": "PROCESSED",
        "notifications": null
      }
    }
  },
  "webhook": null,
  "job_id": "3d14daf3-0f27-4848-8f5e-d9e890c5730e",
  "job_start_time": "2024-12-19T06:28:11.536Z",
  "job_end_time": "2024-12-19T06:30:27.764Z"
}
```
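For illustration, a single experiment's recommendation status can be read back with PostgreSQL's JSONB path operator `#>>` (a sketch; the experiment key and job ID are taken from the sample data above):

```sql
SELECT experiments #>> '{prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28902795(job)|collect-profiles,recommendations,status}' AS status
FROM kruize_bulkjobs
WHERE job_id = '3d14daf3-0f27-4848-8f5e-d9e890c5730e';
```

Because the column is `JSONB`, such lookups read only the requested path without deserializing the whole document on the client side.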
When multiple threads update a large JSON value in the `experiments` column, the primary considerations are ensuring concurrency, minimizing contention, and optimizing performance. Two techniques help:

- **Partial updates:** update only the specific fields within the JSON rather than replacing the entire document. PostgreSQL's `jsonb_set()` function supports this.
- **Batch updates:** group multiple updates into a single transaction to reduce per-statement overhead and minimize contention.

Note: this approach is specific to PostgreSQL.

**Example:**
Suppose we want to update part of the `experiments` field, for example changing the `recommendations.status` value of a specific experiment:
```sql
UPDATE kruize_bulkjobs
SET experiments = jsonb_set(experiments, '{prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28902795(job)|collect-profiles,recommendations,status}', '"NEW_STATUS"')
WHERE job_id = '3d14daf3-0f27-4848-8f5e-d9e890c5730e';
```
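The batch-update technique can be sketched the same way: several `jsonb_set()` updates are grouped into one transaction so they commit together and take the row lock once (a sketch; the experiment keys and job ID are from the sample data above, and the new status values are illustrative):

```sql
BEGIN;

-- Each statement rewrites only one path inside the JSONB document
UPDATE kruize_bulkjobs
SET experiments = jsonb_set(experiments, '{prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28902795(job)|collect-profiles,recommendations,status}', '"PROCESSED"')
WHERE job_id = '3d14daf3-0f27-4848-8f5e-d9e890c5730e';

UPDATE kruize_bulkjobs
SET experiments = jsonb_set(experiments, '{prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28908435(job)|collect-profiles,recommendations,status}', '"PROCESSED"')
WHERE job_id = '3d14daf3-0f27-4848-8f5e-d9e890c5730e';

-- The processed counter can be advanced in the same transaction
UPDATE kruize_bulkjobs
SET processed_count = processed_count + 2
WHERE job_id = '3d14daf3-0f27-4848-8f5e-d9e890c5730e';

COMMIT;
```

Keeping the counter update in the same transaction as the JSONB edits means a reader never observes a count that disagrees with the per-experiment statuses.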
5 changes: 5 additions & 0 deletions migrations/kruize_local_ddl.sql
@@ -6,3 +6,8 @@ alter table kruize_lm_experiments add column metadata_id bigint references krui
alter table if exists kruize_lm_experiments add constraint UK_lm_experiment_name unique (experiment_name);
create table IF NOT EXISTS kruize_metric_profiles (api_version varchar(255), kind varchar(255), metadata jsonb, name varchar(255) not null, k8s_type varchar(255), profile_version float(53) not null, slo jsonb, primary key (name));
create table IF NOT EXISTS kruize_lm_recommendations (interval_end_time timestamp(6) not null, experiment_name varchar(255) not null, cluster_name varchar(255), extended_data jsonb, version varchar(255),experiment_type varchar(255), primary key (experiment_name, interval_end_time)) PARTITION BY RANGE (interval_end_time);
create table IF NOT EXISTS kruize_bulkjobs (job_id uuid not null, end_time timestamp(6), start_time timestamp(6), notifications jsonb,experiments jsonb, processed_count integer, status varchar(255), total_count integer, webhook varchar(255), primary key (job_id));




@@ -42,6 +42,7 @@
import java.util.concurrent.Executors;

import static com.autotune.analyzer.utils.AnalyzerConstants.ServiceConstants.*;
import static com.autotune.operator.KruizeDeploymentInfo.cacheJobInMemory;
import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.*;

/**
@@ -153,7 +154,8 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
// Generate a unique jobID
String jobID = UUID.randomUUID().toString();
BulkJobStatus jobStatus = new BulkJobStatus(jobID, IN_PROGRESS, Instant.now());
jobStatusMap.put(jobID, jobStatus);
if(cacheJobInMemory)
jobStatusMap.put(jobID, jobStatus);
// Submit the job to be processed asynchronously
executorService.submit(new BulkJobManager(jobID, jobStatus, payload));

@@ -17,6 +17,7 @@


import com.autotune.database.table.*;
import com.autotune.database.table.lm.BulkJob;
import com.autotune.database.table.lm.KruizeLMExperimentEntry;
import com.autotune.database.table.lm.KruizeLMRecommendationEntry;
import com.autotune.operator.KruizeDeploymentInfo;
@@ -65,6 +66,7 @@ public static void buildSessionFactory() {
configuration.addAnnotatedClass(KruizeDSMetadataEntry.class);
configuration.addAnnotatedClass(KruizeMetricProfileEntry.class);
configuration.addAnnotatedClass(KruizeAuthenticationEntry.class);
configuration.addAnnotatedClass(BulkJob.class);
}
LOGGER.info("DB is trying to connect to {}", connectionURL);
sfTemp = configuration.buildSessionFactory();
33 changes: 33 additions & 0 deletions src/main/java/com/autotune/database/table/lm/BulkJob.java
@@ -0,0 +1,33 @@
package com.autotune.database.table.lm;

import jakarta.persistence.*;

import java.sql.Timestamp;
import java.util.UUID;


@Entity
@Table(name = "kruize_bulkjobs")
public class BulkJob {
@Id
@Column(name = "job_id")
private UUID jobId;
private String status;
@Column(name = "total_count")
private int totalExperiments;
@Column(name = "processed_count")
private int processedExperiments;
@Column(name = "start_time")
private Timestamp jobStartTime;
@Column(name = "end_time")
private Timestamp jobEndTime;
private String webhook;

@Column(columnDefinition = "jsonb")
private String notifications; // Stored as JSON string

@Column(columnDefinition = "jsonb")
private String experiments; // JSONB field for experiments data

// Getters and Setters
}
@@ -66,7 +66,7 @@ public class KruizeDeploymentInfo {
public static String database_admin_username;
public static String database_admin_password;
public static String database_ssl_mode;

public static boolean cacheJobInMemory = true;
public static String cloudwatch_logs_access_key_id;
public static String cloudwatch_logs_secret_access_key;
public static String cloudwatch_logs_log_group;