Skip to content

Commit

Permalink
feat: bulk delete jobs API & modify Optimus Apply to use bulk delete (#…
Browse files Browse the repository at this point in the history
…255)

* feat: add service logic

* feat: integrate bulk delete to client

* feat: update job service logic & handler

* feat: update proton

* feat: fix client

* feat: fix test

* feat: refactor approach

* feat: restructure downstream deletion

* feat: fix after sync with master

* feat: fix tracker already exists

* feat: unit test

* feat: fix variable naming & remove excessive logs

* feat: update logic to only use deletionTrakerMap

* fix: intermittent unit test fix

* fix: handling empty job

* feat: update proton commit
  • Loading branch information
ahmadnaufal authored Jul 25, 2024
1 parent 6fcebc4 commit 06bccc0
Show file tree
Hide file tree
Showing 11 changed files with 1,500 additions and 491 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/goto/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "01ffd0ca223431ae24a8de44827de0d96afef9b2"
PROTON_COMMIT := "10aa548f6af9b12a1560a9f2f80610a7e46fdf13"


.PHONY: build test test-ci generate-proto unit-test-ci integration-test vet coverage clean install lint
Expand Down
55 changes: 38 additions & 17 deletions client/cmd/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (c *applyCommand) RunE(cmd *cobra.Command, _ []string) error {
var (
addJobRequest = []*pb.AddJobSpecificationsRequest{}
updateJobRequest = []*pb.UpdateJobSpecificationsRequest{}
deleteJobRequest = []*pb.DeleteJobSpecificationRequest{}
deleteJobRequest = []*pb.BulkDeleteJobsRequest_JobToDelete{}
migrateJobRequest = []*pb.ChangeJobNamespaceRequest{}
addResourceRequest = []*pb.CreateResourceRequest{}
updateResourceRequest = []*pb.UpdateResourceRequest{}
Expand All @@ -130,7 +130,7 @@ func (c *applyCommand) RunE(cmd *cobra.Command, _ []string) error {
addJobRequest = append(addJobRequest, c.getAddJobRequest(namespace, plans)...)
updateJobRequest = append(updateJobRequest, c.getUpdateJobRequest(namespace, plans)...)
updateJobRequest = append(updateJobRequest, updateFromMigrateJobs...)
deleteJobRequest = append(deleteJobRequest, c.getDeleteJobRequest(namespace, plans)...)
deleteJobRequest = append(deleteJobRequest, c.getBulkDeleteJobsRequest(namespace, plans)...)
migrateJobRequest = append(migrateJobRequest, migrateJobs...)
// resource request preparation
migrateResources, updateFromMigrateResources := c.getMigrateResourceRequest(namespace, plans)
Expand All @@ -149,7 +149,7 @@ func (c *applyCommand) RunE(cmd *cobra.Command, _ []string) error {
migratedJobs := c.executeJobMigrate(ctx, jobClient, migrateJobRequest)
updatedJobs := c.executeJobUpdate(ctx, jobClient, updateJobRequest)
// job deletion < resource deletion
deletedJobs := c.executeJobDelete(ctx, jobClient, deleteJobRequest)
deletedJobs := c.executeJobBulkDelete(ctx, jobClient, &pb.BulkDeleteJobsRequest{ProjectName: plans.ProjectName, Jobs: deleteJobRequest})
deletedResources := c.executeResourceDelete(ctx, resourceClient, deleteResourceRequest)

// update plan file, delete successful operations
Expand Down Expand Up @@ -185,17 +185,41 @@ func (c *applyCommand) printFailed(namespaceName, operation, kind, name, cause s
c.isOperationFail = true
}

func (c *applyCommand) executeJobDelete(ctx context.Context, client pb.JobSpecificationServiceClient, requests []*pb.DeleteJobSpecificationRequest) []string {
func (c *applyCommand) printFailedAll(operation, kind, cause string) {
c.logger.Error("[all] %s: %s %s ❌", operation, kind)
if c.verbose && cause != "" {
c.logger.Error(cause)
}
c.isOperationFail = true
}

func (c *applyCommand) executeJobBulkDelete(ctx context.Context, client pb.JobSpecificationServiceClient, request *pb.BulkDeleteJobsRequest) []string {
if len(request.Jobs) == 0 {
return []string{}
}

response, err := client.BulkDeleteJobs(ctx, request)
if err != nil {
c.printFailedAll("bulk-delete", "job", err.Error())
return nil
}

// if no failure, check the status of each bulk deletion
deletedJobs := []string{}
for _, request := range requests {
_, err := client.DeleteJobSpecification(ctx, request)
if err != nil {
c.printFailed(request.NamespaceName, "delete", "job", request.GetJobName(), err.Error())
for _, jobToDelete := range request.Jobs {
result, found := response.ResultsByJobName[jobToDelete.JobName]
if !found {
continue
}
c.printSuccess(request.NamespaceName, "delete", "job", request.GetJobName())
deletedJobs = append(deletedJobs, request.GetJobName())

if result.GetSuccess() {
c.printSuccess(jobToDelete.NamespaceName, "bulk-delete", "job", jobToDelete.JobName)
deletedJobs = append(deletedJobs, jobToDelete.JobName)
} else {
c.printFailed(jobToDelete.NamespaceName, "bulk-delete", "job", jobToDelete.JobName, result.GetMessage())
}
}

return deletedJobs
}

Expand Down Expand Up @@ -386,18 +410,15 @@ func (c *applyCommand) getUpdateJobRequest(namespace *config.Namespace, plans pl
}
}

func (c *applyCommand) getDeleteJobRequest(namespace *config.Namespace, plans plan.Plan) []*pb.DeleteJobSpecificationRequest {
jobsToBeDeleted := []*pb.DeleteJobSpecificationRequest{}
func (*applyCommand) getBulkDeleteJobsRequest(namespace *config.Namespace, plans plan.Plan) []*pb.BulkDeleteJobsRequest_JobToDelete {
jobsToDelete := []*pb.BulkDeleteJobsRequest_JobToDelete{}
for _, currentPlan := range plans.Job.Delete.GetByNamespace(namespace.Name) {
jobsToBeDeleted = append(jobsToBeDeleted, &pb.DeleteJobSpecificationRequest{
ProjectName: c.config.Project.Name,
jobsToDelete = append(jobsToDelete, &pb.BulkDeleteJobsRequest_JobToDelete{
NamespaceName: namespace.Name,
JobName: currentPlan.Name,
CleanHistory: false,
Force: false,
})
}
return jobsToBeDeleted
return jobsToDelete
}

func (c *applyCommand) getMigrateJobRequest(namespace *config.Namespace, plans plan.Plan) ([]*pb.ChangeJobNamespaceRequest, []*pb.UpdateJobSpecificationsRequest) {
Expand Down
17 changes: 17 additions & 0 deletions core/job/dto/request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package dto

import (
"github.com/goto/optimus/core/job"
"github.com/goto/optimus/core/tenant"
)

type JobToDeleteRequest struct {
Namespace tenant.NamespaceName
JobName job.Name
}

type BulkDeleteTracker struct {
JobName string
Message string
Success bool
}
36 changes: 36 additions & 0 deletions core/job/handler/v1beta1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type JobModificationService interface {
Delete(ctx context.Context, jobTenant tenant.Tenant, jobName job.Name, cleanFlag, forceFlag bool) (affectedDownstream []job.FullName, err error)
ReplaceAll(ctx context.Context, jobTenant tenant.Tenant, jobs []*job.Spec, jobNamesWithInvalidSpec []job.Name, logWriter writer.LogWriter) error
ChangeNamespace(ctx context.Context, jobSourceTenant, jobNewTenant tenant.Tenant, jobName job.Name) error
BulkDeleteJobs(ctx context.Context, projectName tenant.ProjectName, jobsToDelete []*dto.JobToDeleteRequest) (map[string]dto.BulkDeleteTracker, error)
}

type JobQueryService interface {
Expand Down Expand Up @@ -158,6 +159,41 @@ func (jh *JobHandler) DeleteJobSpecification(ctx context.Context, deleteRequest
}, nil
}

func (jh *JobHandler) BulkDeleteJobs(ctx context.Context, bulkDeleteRequest *pb.BulkDeleteJobsRequest) (*pb.BulkDeleteJobsResponse, error) {
jobsToDelete := []*dto.JobToDeleteRequest{}
for _, j := range bulkDeleteRequest.GetJobs() {
jobsToDelete = append(jobsToDelete, &dto.JobToDeleteRequest{
Namespace: tenant.NamespaceName(j.NamespaceName),
JobName: job.Name(j.JobName),
})
}
projectName := tenant.ProjectName(bulkDeleteRequest.GetProjectName())

bulkDeleteTracker, err := jh.jobService.BulkDeleteJobs(ctx, projectName, jobsToDelete)
if err != nil {
errorMsg := "failed to do bulk delete"
jh.l.Error(fmt.Sprintf("%s: %s", errorMsg, err.Error()))
return nil, errors.GRPCErr(err, errorMsg)
}

responseMap := map[string]*pb.BulkDeleteJobsResponse_JobDeletionStatus{}
for _, jobToDelete := range bulkDeleteRequest.GetJobs() {
jobResult, found := bulkDeleteTracker[jobToDelete.GetJobName()]
if !found {
continue
}

responseMap[jobToDelete.GetJobName()] = &pb.BulkDeleteJobsResponse_JobDeletionStatus{
Message: jobResult.Message,
Success: jobResult.Success,
}
}

return &pb.BulkDeleteJobsResponse{
ResultsByJobName: responseMap,
}, nil
}

func (jh *JobHandler) ChangeJobNamespace(ctx context.Context, changeRequest *pb.ChangeJobNamespaceRequest) (*pb.ChangeJobNamespaceResponse, error) {
jobSourceTenant, err := tenant.NewTenant(changeRequest.ProjectName, changeRequest.NamespaceName)
if err != nil {
Expand Down
30 changes: 30 additions & 0 deletions core/job/handler/v1beta1/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2580,6 +2580,36 @@ func (_m *JobService) Validate(ctx context.Context, request dto.ValidateRequest)
return r0, r1
}

// BulkDeleteJobs provides a mock function with given fields: ctx, projectName, jobsToDelete
func (_m *JobService) BulkDeleteJobs(ctx context.Context, projectName tenant.ProjectName, jobsToDelete []*dto.JobToDeleteRequest) (map[string]dto.BulkDeleteTracker, error) {
ret := _m.Called(ctx, projectName, jobsToDelete)

if len(ret) == 0 {
panic("no return value specified for BulkDeleteJobs")
}

var r0 map[string]dto.BulkDeleteTracker
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, tenant.ProjectName, []*dto.JobToDeleteRequest) (map[string]dto.BulkDeleteTracker, error)); ok {
return rf(ctx, projectName, jobsToDelete)
}
if rf, ok := ret.Get(0).(func(context.Context, tenant.ProjectName, []*dto.JobToDeleteRequest) map[string]dto.BulkDeleteTracker); ok {
r0 = rf(ctx, projectName, jobsToDelete)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[string]dto.BulkDeleteTracker)
}
}

if rf, ok := ret.Get(1).(func(context.Context, tenant.ProjectName, []*dto.JobToDeleteRequest) error); ok {
r1 = rf(ctx, projectName, jobsToDelete)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// ReplaceAllJobSpecificationsServer is an autogenerated mock type for the ReplaceAllJobSpecificationsServer type
type ReplaceAllJobSpecificationsServer struct {
mock.Mock
Expand Down
Loading

0 comments on commit 06bccc0

Please sign in to comment.