Skip to content

Commit

Permalink
refactor: only show direct downstream when deleting jobs (#246)
Browse files Browse the repository at this point in the history
* refactor: only show direct downstream when deleting jobs

* refactor: add 2 case unit test

* refactor: use list of list of downstreams

* refactor: skip lint
  • Loading branch information
ahmadnaufal committed Jul 3, 2024
1 parent 0e5c6d7 commit 7c993e7
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 24 deletions.
79 changes: 55 additions & 24 deletions core/job/service/job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,12 +845,20 @@ func (j *JobService) RefreshResourceDownstream(ctx context.Context, resourceURNs
return me.ToErr()
}

func validateDeleteJob(jobTenant tenant.Tenant, downstreams []*job.Downstream, toDeleteMap map[job.FullName]*job.Spec, jobToDelete *job.Spec, logWriter writer.LogWriter, me *errors.MultiError) bool {
notDeleted, safeToDelete := isJobSafeToDelete(toDeleteMap, job.DownstreamList(downstreams).GetDownstreamFullNames())
func validateDeleteJob(jobTenant tenant.Tenant, downstreamsPerLevel [][]*job.Downstream, toDeleteMap map[job.FullName]*job.Spec, jobToDelete *job.Spec, logWriter writer.LogWriter, me *errors.MultiError) bool {
notDeleted, safeToDelete := isJobSafeToDelete(toDeleteMap, downstreamsPerLevel)

if !safeToDelete {
// only show direct downstreams that is not deleted in the error.
// if all direct downstreams are to be deleted, show all of them
directDownstreams := notDeleted[0]
if len(directDownstreams) == 0 {
directDownstreams = downstreamsPerLevel[0]
}

// TODO: refactor to put the log writer outside
errorMsg := fmt.Sprintf("deletion of job %s will fail. job is being used by %s", jobToDelete.Name().String(), job.FullNames(notDeleted).String())
jobFullNames := job.DownstreamList(directDownstreams).GetDownstreamFullNames()
errorMsg := fmt.Sprintf("deletion of job %s will fail. job is being used by %s", jobToDelete.Name().String(), jobFullNames.String())
logWriter.Write(writer.LogLevelError, fmt.Sprintf("[%s] %s", jobTenant.NamespaceName().String(), errorMsg))
me.Append(errors.NewError(errors.ErrFailedPrecond, job.EntityJob, errorMsg))
return false
Expand All @@ -859,39 +867,54 @@ func validateDeleteJob(jobTenant tenant.Tenant, downstreams []*job.Downstream, t
return true
}

func isJobSafeToDelete(toDeleteMap map[job.FullName]*job.Spec, downstreamFullNames []job.FullName) ([]job.FullName, bool) {
notDeleted := []job.FullName{}
for _, downstreamFullName := range downstreamFullNames {
if _, ok := toDeleteMap[downstreamFullName]; !ok {
notDeleted = append(notDeleted, downstreamFullName)
func isJobSafeToDelete(toDeleteMap map[job.FullName]*job.Spec, downstreamsPerLevel [][]*job.Downstream) ([][]*job.Downstream, bool) {
notDeleted := make([][]*job.Downstream, len(downstreamsPerLevel))
isSafeToDelete := true

for i, downstreams := range downstreamsPerLevel {
for _, downstream := range downstreams {
downstreamFullName := downstream.FullName()
if _, ok := toDeleteMap[downstreamFullName]; !ok {
notDeleted[i] = append(notDeleted[i], downstream)
isSafeToDelete = false
}
}
}

return notDeleted, len(notDeleted) == 0
return notDeleted, isSafeToDelete
}

func (j *JobService) getAllDownstreams(ctx context.Context, projectName tenant.ProjectName, jobName job.Name, visited map[job.FullName]bool) ([]*job.Downstream, error) {
func (j *JobService) getAllDownstreams(ctx context.Context, projectName tenant.ProjectName, jobName job.Name, visited map[job.FullName]bool, level int) ([][]*job.Downstream, error) {
currentJobFullName := job.FullNameFrom(projectName, jobName)
downstreams := []*job.Downstream{}
visited[currentJobFullName] = true
downstreamsPerLevel := make([][]*job.Downstream, level+1)

childJobs, err := j.downstreamRepo.GetDownstreamByJobName(ctx, projectName, jobName)
if err != nil {
j.logger.Error("error getting downstream jobs for job [%s]: %s", jobName, err)
return nil, err
}
if len(childJobs) > 0 {
downstreamsPerLevel[level] = append(downstreamsPerLevel[level], childJobs...)
}

for _, childJob := range childJobs {
downstreams = append(downstreams, childJob)
if visited[childJob.FullName()] {
continue
}
childDownstreams, err := j.getAllDownstreams(ctx, childJob.ProjectName(), childJob.Name(), visited)
childDownstreamsPerLevel, err := j.getAllDownstreams(ctx, childJob.ProjectName(), childJob.Name(), visited, level+1)
if err != nil {
j.logger.Error("error getting all downstreams for job [%s]: %s", childJob.Name(), err)
return nil, err
}
downstreams = append(downstreams, childDownstreams...)
for i, lr := range childDownstreamsPerLevel {
if len(downstreamsPerLevel) <= i {
downstreamsPerLevel = append(downstreamsPerLevel, []*job.Downstream{}) //nolint:makezero
}
downstreamsPerLevel[i] = append(downstreamsPerLevel[i], lr...)
}
}
return downstreams, nil
return downstreamsPerLevel, nil
}

func (*JobService) getIdentifierToJobsMap(jobsToValidateMap map[job.Name]*job.WithUpstream) map[string][]*job.WithUpstream {
Expand Down Expand Up @@ -999,37 +1022,45 @@ func (j *JobService) bulkDelete(ctx context.Context, jobTenant tenant.Tenant, to
for _, spec := range toDelete {
// TODO: reuse Delete method and pass forceFlag as false
fullName := job.FullNameFrom(jobTenant.ProjectName(), spec.Name())
downstreams, err := j.getAllDownstreams(ctx, jobTenant.ProjectName(), spec.Name(), map[job.FullName]bool{})
downstreamsPerLevel, err := j.getAllDownstreams(ctx, jobTenant.ProjectName(), spec.Name(), map[job.FullName]bool{}, 0)
if err != nil {
j.logger.Error("error getting downstreams for job [%s]: %s", spec.Name(), err)
logWriter.Write(writer.LogLevelError, fmt.Sprintf("[%s] pre-delete check for job %s failed: %s", jobTenant.NamespaceName().String(), spec.Name().String(), err.Error()))
me.Append(err)
continue
}

isSafeToDelete := validateDeleteJob(jobTenant, downstreams, toDeleteMap, spec, logWriter, me)
isSafeToDelete := validateDeleteJob(jobTenant, downstreamsPerLevel, toDeleteMap, spec, logWriter, me)
if !isSafeToDelete {
j.logger.Warn("job [%s] is not safe to be deleted", spec.Name())
continue
}

logWriter.Write(writer.LogLevelDebug, fmt.Sprintf("[%s] deleting job %s", jobTenant.NamespaceName().String(), spec.Name().String()))

// flatten downstreams per level into a single list,
// with direct downstreams will be put first & the leaf downstreams at the end
downstreams := []*job.Downstream{}
for _, dss := range downstreamsPerLevel {
downstreams = append(downstreams, dss...)
}

isDeletionFail := false
for i := len(downstreams) - 1; i >= 0 && !isDeletionFail; i-- {
if alreadyDeleted[downstreams[i].FullName()] {
downstream := downstreams[i]
if alreadyDeleted[downstream.FullName()] {
continue
}
if err = j.jobRepo.Delete(ctx, downstreams[i].ProjectName(), downstreams[i].Name(), false); err != nil {
j.logger.Error("error deleting [%s] as downstream of [%s]", downstreams[i].Name(), spec.Name())
logWriter.Write(writer.LogLevelError, fmt.Sprintf("[%s] deleting job %s failed: %s", downstreams[i].NamespaceName().String(), downstreams[i].Name().String(), err.Error()))
if err = j.jobRepo.Delete(ctx, downstream.ProjectName(), downstream.Name(), false); err != nil {
j.logger.Error("error deleting [%s] as downstream of [%s]", downstream.Name(), spec.Name())
logWriter.Write(writer.LogLevelError, fmt.Sprintf("[%s] deleting job %s failed: %s", downstream.NamespaceName().String(), downstream.Name().String(), err.Error()))
me.Append(err)
isDeletionFail = true
} else {
alreadyDeleted[downstreams[i].FullName()] = true
alreadyDeleted[downstream.FullName()] = true
j.raiseDeleteEvent(jobTenant, spec.Name())
raiseJobEventMetric(jobTenant, job.MetricJobEventStateDeleted, 1)
deletedJobNames = append(deletedJobNames, downstreams[i].Name())
deletedJobNames = append(deletedJobNames, downstream.Name())
}
}

Expand Down Expand Up @@ -1487,7 +1518,7 @@ func (j *JobService) validateOneJobForDeletion(
jobTenant tenant.Tenant, spec *job.Spec,
specByFullName map[job.FullName]*job.Spec,
) []dto.ValidateResult {
downstreams, err := j.getAllDownstreams(ctx, jobTenant.ProjectName(), spec.Name(), make(map[job.FullName]bool))
downstreams, err := j.getAllDownstreams(ctx, jobTenant.ProjectName(), spec.Name(), make(map[job.FullName]bool), 0)
if err != nil {
result := dto.ValidateResult{
Stage: dto.StageDeletionValidation,
Expand Down
128 changes: 128 additions & 0 deletions core/job/service/job_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3843,6 +3843,134 @@ func TestJobService(t *testing.T) {
assert.EqualValues(t, expectedResult["job2"], actualResult["job2"])
assert.NoError(t, actualError)
})

t.Run("show only job's direct dependencies when the job has multiple-level dependencies", func(t *testing.T) {
// testcase for case A -> B -> C, and A & B are the jobs to be deleted
// when showing direct downstream,
tenantDetailsGetter := new(TenantDetailsGetter)
defer tenantDetailsGetter.AssertExpectations(t)

jobRepo := new(JobRepository)
defer jobRepo.AssertExpectations(t)

downstreamRepo := new(DownstreamRepository)
defer downstreamRepo.AssertExpectations(t)

jobRunInputCompiler := NewJobRunInputCompiler(t)
resourceExistenceChecker := NewResourceExistenceChecker(t)

jobService := service.NewJobService(jobRepo, nil, downstreamRepo, nil, nil, tenantDetailsGetter, nil, log, nil, compiler.NewEngine(), jobRunInputCompiler, resourceExistenceChecker)

jobSpec1, err := job.NewSpecBuilder(1, "job1", "optimus@goto", jobSchedule, jobWindow, jobTask).Build()
assert.NoError(t, err)
jobSpec2, err := job.NewSpecBuilder(1, "job2", "optimus@goto", jobSchedule, jobWindow, jobTask).Build()
assert.NoError(t, err)
jobSpec3, err := job.NewSpecBuilder(1, "job3", "optimus@goto", jobSchedule, jobWindow, jobTask).Build()
assert.NoError(t, err)

job1 := job.NewJob(sampleTenant, jobSpec1, resource.ZeroURN(), nil, false)
job1Downstream := job.NewDownstream(jobSpec2.Name(), sampleTenant.ProjectName(), sampleTenant.NamespaceName(), taskName)
job2Downstream := job.NewDownstream(jobSpec3.Name(), sampleTenant.ProjectName(), sampleTenant.NamespaceName(), taskName)

jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), job.Name("job1")).Return(job1, nil)

tenantDetailsGetter.On("GetDetails", ctx, sampleTenant).Return(detailedTenant, nil)

downstreamRepo.On("GetDownstreamByJobName", ctx, sampleTenant.ProjectName(), jobSpec1.Name()).Return([]*job.Downstream{job1Downstream}, nil)
downstreamRepo.On("GetDownstreamByJobName", ctx, sampleTenant.ProjectName(), jobSpec2.Name()).Return([]*job.Downstream{job2Downstream}, nil)
downstreamRepo.On("GetDownstreamByJobName", ctx, sampleTenant.ProjectName(), jobSpec3.Name()).Return([]*job.Downstream{}, nil)

request := dto.ValidateRequest{
Tenant: sampleTenant,
JobSpecs: nil,
JobNames: []string{"job1"},
DeletionMode: true,
}

expectedResult := map[job.Name][]dto.ValidateResult{
"job1": {
{
Stage: "validation for deletion",
Messages: []string{"job is not safe for deletion", "validating job for deletion errors:\n failed precondition for entity job: deletion of job job1 will fail. job is being used by test-proj/job2"},
Success: false,
},
},
}

actualResult, actualError := jobService.Validate(ctx, request)

assert.EqualValues(t, expectedResult["job1"], actualResult["job1"])
assert.NoError(t, actualError)
})

t.Run("show each deleted jobs' direct downstream if 2 jobs to be deleted has a common remaining dependency", func(t *testing.T) {
// testcase for case A -> B -> C, and A & B are the jobs to be deleted
// when showing direct downstream,
tenantDetailsGetter := new(TenantDetailsGetter)
defer tenantDetailsGetter.AssertExpectations(t)

jobRepo := new(JobRepository)
defer jobRepo.AssertExpectations(t)

downstreamRepo := new(DownstreamRepository)
defer downstreamRepo.AssertExpectations(t)

jobRunInputCompiler := NewJobRunInputCompiler(t)
resourceExistenceChecker := NewResourceExistenceChecker(t)

jobService := service.NewJobService(jobRepo, nil, downstreamRepo, nil, nil, tenantDetailsGetter, nil, log, nil, compiler.NewEngine(), jobRunInputCompiler, resourceExistenceChecker)

jobSpec1, err := job.NewSpecBuilder(1, "job1", "optimus@goto", jobSchedule, jobWindow, jobTask).Build()
assert.NoError(t, err)
jobSpec2, err := job.NewSpecBuilder(1, "job2", "optimus@goto", jobSchedule, jobWindow, jobTask).Build()
assert.NoError(t, err)
jobSpec3, err := job.NewSpecBuilder(1, "job3", "optimus@goto", jobSchedule, jobWindow, jobTask).Build()
assert.NoError(t, err)

job1 := job.NewJob(sampleTenant, jobSpec1, resource.ZeroURN(), nil, false)
job2 := job.NewJob(sampleTenant, jobSpec2, resource.ZeroURN(), nil, false)
job1Downstream := job.NewDownstream(jobSpec2.Name(), sampleTenant.ProjectName(), sampleTenant.NamespaceName(), taskName)
job2Downstream := job.NewDownstream(jobSpec3.Name(), sampleTenant.ProjectName(), sampleTenant.NamespaceName(), taskName)

jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), job.Name("job1")).Return(job1, nil)
jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), job.Name("job2")).Return(job2, nil)

tenantDetailsGetter.On("GetDetails", ctx, sampleTenant).Return(detailedTenant, nil)

downstreamRepo.On("GetDownstreamByJobName", ctx, sampleTenant.ProjectName(), jobSpec1.Name()).Return([]*job.Downstream{job1Downstream}, nil)
downstreamRepo.On("GetDownstreamByJobName", ctx, sampleTenant.ProjectName(), jobSpec2.Name()).Return([]*job.Downstream{job2Downstream}, nil)
downstreamRepo.On("GetDownstreamByJobName", ctx, sampleTenant.ProjectName(), jobSpec3.Name()).Return([]*job.Downstream{}, nil)

request := dto.ValidateRequest{
Tenant: sampleTenant,
JobSpecs: nil,
JobNames: []string{"job1", "job2"},
DeletionMode: true,
}

expectedResult := map[job.Name][]dto.ValidateResult{
"job1": {
{
Stage: "validation for deletion",
Messages: []string{"job is not safe for deletion", "validating job for deletion errors:\n failed precondition for entity job: deletion of job job1 will fail. job is being used by test-proj/job2"},
Success: false,
},
},
"job2": {
{
Stage: "validation for deletion",
Messages: []string{"job is not safe for deletion", "validating job for deletion errors:\n failed precondition for entity job: deletion of job job2 will fail. job is being used by test-proj/job3"},
Success: false,
},
},
}

actualResult, actualError := jobService.Validate(ctx, request)

assert.EqualValues(t, expectedResult["job1"], actualResult["job1"])
assert.EqualValues(t, expectedResult["job2"], actualResult["job2"])
assert.NoError(t, actualError)
})
})

t.Run("non-deletion mode validation", func(t *testing.T) {
Expand Down

0 comments on commit 7c993e7

Please sign in to comment.