Skip to content

Commit

Permalink
Revert "feat: clean query assets before execution (#301)" (#316)
Browse files Browse the repository at this point in the history
This reverts commit 4d19658.
  • Loading branch information
deryrahman authored Jan 3, 2025
1 parent c2e873b commit 757da04
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 229 deletions.
24 changes: 12 additions & 12 deletions core/scheduler/service/deployment_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestDeploymentService(t *testing.T) {
defer jobRepo.AssertExpectations(t)

runService := service.NewJobRunService(logger,
jobRepo, nil, nil, nil, nil, nil, nil, nil, nil, nil)
jobRepo, nil, nil, nil, nil, nil, nil, nil, nil)

err := runService.UploadToScheduler(ctx, proj1Name)
assert.NotNil(t, err)
Expand All @@ -98,7 +98,7 @@ func TestDeploymentService(t *testing.T) {
defer priorityResolver.AssertExpectations(t)

runService := service.NewJobRunService(logger,
jobRepo, nil, nil, nil, nil, priorityResolver, nil, nil, nil, nil)
jobRepo, nil, nil, nil, nil, priorityResolver, nil, nil, nil)

err := runService.UploadToScheduler(ctx, proj1Name)
assert.NotNil(t, err)
Expand All @@ -119,7 +119,7 @@ func TestDeploymentService(t *testing.T) {
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil,
mScheduler, priorityResolver, nil, nil, nil, nil)
mScheduler, priorityResolver, nil, nil, nil)

err := runService.UploadToScheduler(ctx, proj1Name)
assert.NotNil(t, err)
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestDeploymentService(t *testing.T) {
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil,
mScheduler, priorityResolver, nil, nil, nil, nil)
mScheduler, priorityResolver, nil, nil, nil)

err := runService.UploadToScheduler(ctx, proj1Name)
assert.Nil(t, err)
Expand All @@ -172,7 +172,7 @@ func TestDeploymentService(t *testing.T) {
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil,
mScheduler, priorityResolver, nil, nil, nil, nil)
mScheduler, priorityResolver, nil, nil, nil)

err := runService.UploadToScheduler(ctx, proj1Name)
assert.NotNil(t, err)
Expand All @@ -196,7 +196,7 @@ func TestDeploymentService(t *testing.T) {
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil,
mScheduler, priorityResolver, nil, nil, nil, nil)
mScheduler, priorityResolver, nil, nil, nil)

err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete)
assert.Error(t, err)
Expand All @@ -218,7 +218,7 @@ func TestDeploymentService(t *testing.T) {
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil,
mScheduler, priorityResolver, nil, nil, nil, nil)
mScheduler, priorityResolver, nil, nil, nil)

err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete)
assert.Error(t, err)
Expand All @@ -241,7 +241,7 @@ func TestDeploymentService(t *testing.T) {
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil,
mScheduler, priorityResolver, nil, nil, nil, nil)
mScheduler, priorityResolver, nil, nil, nil)

err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete)
assert.Error(t, err)
Expand All @@ -255,7 +255,7 @@ func TestDeploymentService(t *testing.T) {
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, nil, nil, nil, nil,
mScheduler, nil, nil, nil, nil, nil)
mScheduler, nil, nil, nil, nil)

err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete)
assert.Error(t, err)
Expand All @@ -279,7 +279,7 @@ func TestDeploymentService(t *testing.T) {
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil,
mScheduler, priorityResolver, nil, nil, nil, nil)
mScheduler, priorityResolver, nil, nil, nil)

err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete)
assert.Nil(t, err)
Expand All @@ -302,7 +302,7 @@ func TestDeploymentService(t *testing.T) {
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil,
mScheduler, priorityResolver, nil, nil, nil, nil)
mScheduler, priorityResolver, nil, nil, nil)

err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete)
assert.Nil(t, err)
Expand All @@ -316,7 +316,7 @@ func TestDeploymentService(t *testing.T) {
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, nil, nil, nil, nil,
mScheduler, nil, nil, nil, nil, nil)
mScheduler, nil, nil, nil, nil)

err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete)
assert.Nil(t, err)
Expand Down
27 changes: 2 additions & 25 deletions core/scheduler/service/job_run_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ const (
scheduleDelay metricType = "schedule_delay"
)

const (
devEnableCleanUpQuery = "DEV__ENABLE_CLEAN_UP_QUERY"
)

var jobRunEventsMetric = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "operator_stats",
Help: "total job run events received",
Expand Down Expand Up @@ -103,10 +99,6 @@ type ProjectGetter interface {
Get(context.Context, tenant.ProjectName) (*tenant.Project, error)
}

type AssetCleaner interface {
CleanAssets(ctx context.Context, taskName string, compiledAssets map[string]string) (map[string]string, error)
}

type JobRunService struct {
l log.Logger
repo JobRunRepository
Expand All @@ -118,7 +110,6 @@ type JobRunService struct {
priorityResolver PriorityResolver
compiler JobInputCompiler
projectGetter ProjectGetter
assetCleaner AssetCleaner
}

func (s *JobRunService) JobRunInput(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, config scheduler.RunConfig) (*scheduler.ExecutorInput, error) {
Expand Down Expand Up @@ -155,20 +146,7 @@ func (s *JobRunService) JobRunInput(ctx context.Context, projectName tenant.Proj
details.Job.Task.Config[k] = v
}

executorInput, err := s.compiler.Compile(ctx, details, config, executedAt)
if err != nil {
return nil, err
}

if enable, ok := details.Job.Task.Config[devEnableCleanUpQuery]; ok && enable == "true" {
cleanedAssets, err := s.assetCleaner.CleanAssets(ctx, details.Job.Task.Name, executorInput.Files)
if err != nil {
return nil, err
}
executorInput.Files = cleanedAssets
}

return executorInput, nil
return s.compiler.Compile(ctx, details, config, executedAt)
}

func (s *JobRunService) GetJobRunsByFilter(ctx context.Context, projectName tenant.ProjectName, jobName scheduler.JobName, filters ...filter.FilterOpt) ([]*scheduler.JobRun, error) {
Expand Down Expand Up @@ -684,7 +662,7 @@ func (s *JobRunService) UpdateJobState(ctx context.Context, event *scheduler.Eve

func NewJobRunService(logger log.Logger, jobRepo JobRepository, jobRunRepo JobRunRepository, replayRepo JobReplayRepository,
operatorRunRepo OperatorRunRepository, scheduler Scheduler, resolver PriorityResolver, compiler JobInputCompiler, eventHandler EventHandler,
projectGetter ProjectGetter, assetCleaner AssetCleaner,
projectGetter ProjectGetter,
) *JobRunService {
return &JobRunService{
l: logger,
Expand All @@ -697,6 +675,5 @@ func NewJobRunService(logger log.Logger, jobRepo JobRepository, jobRunRepo JobRu
priorityResolver: resolver,
compiler: compiler,
projectGetter: projectGetter,
assetCleaner: assetCleaner,
}
}
Loading

0 comments on commit 757da04

Please sign in to comment.