diff --git a/config.sample.yaml b/config.sample.yaml index 640c793e63..b69b3b78a3 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -69,3 +69,12 @@ log: # batch_interval_second: 1 # broker_urls: # - localhost:9092 + +# replay: +# plugin_execution_project_config_names: +# bq2bq: EXECUTION_PROJECT +# bq2pg: BQ_EXECUTION_PROJECT +# pg2bq: BQ_EXECUTION_PROJECT +# bq2api: EXECUTION_PROJECT +# clevertap: EXECUTION_PROJECT +# transporterTask: EXECUTION_PROJECT diff --git a/config/config_server.go b/config/config_server.go index a47e0e9752..c64b921c1f 100644 --- a/config/config_server.go +++ b/config/config_server.go @@ -61,8 +61,9 @@ type PluginConfig struct { } type ReplayConfig struct { - ReplayTimeoutInMinutes int `mapstructure:"replay_timeout_in_minutes" default:"180"` - ExecutionIntervalInSeconds int `mapstructure:"execution_interval_in_seconds" default:"120"` + ReplayTimeoutInMinutes int `mapstructure:"replay_timeout_in_minutes" default:"180"` + ExecutionIntervalInSeconds int `mapstructure:"execution_interval_in_seconds" default:"120"` + PluginExecutionProjectConfigNames map[string]string `mapstructure:"plugin_execution_project_config_names"` } type Publisher struct { diff --git a/core/scheduler/service/replay_service.go b/core/scheduler/service/replay_service.go index ca52baf89e..395e8c566c 100644 --- a/core/scheduler/service/replay_service.go +++ b/core/scheduler/service/replay_service.go @@ -18,6 +18,8 @@ const ( getReplaysDayLimit = 30 // TODO: make it configurable via cli metricJobReplay = "jobrun_replay_requests_total" + + tenantReplayExecutionProjectConfigKey = "REPLAY_EXECUTION_PROJECT" ) type SchedulerRunGetter interface { @@ -34,6 +36,10 @@ type ReplayRepository interface { GetReplayByID(ctx context.Context, replayID uuid.UUID) (*scheduler.ReplayWithRun, error) } +type TenantGetter interface { + GetDetails(ctx context.Context, tnnt tenant.Tenant) (*tenant.WithDetails, error) +} + type ReplayValidator interface { Validate(ctx context.Context, replayRequest *scheduler.Replay, jobCron *cron.ScheduleSpec) error } @@ -50,7 +56,14 @@ type ReplayService struct { validator ReplayValidator executor ReplayExecutor + tenantGetter TenantGetter + logger log.Logger + + // stores mapping of task names (optimus plugin names) to its respective execution project config names. + // this mapping is needed because our bq plugins supporting execution project uses different config names inside the plugins. + // after the config naming is standardized, this map can be omitted + pluginToExecutionProjectKeyMap map[string]string } func (r *ReplayService) CreateReplay(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (replayID uuid.UUID, err error) { @@ -60,6 +73,13 @@ func (r *ReplayService) CreateReplay(ctx context.Context, tenant tenant.Tenant, return uuid.Nil, err } + newConfig, err := r.injectJobConfigWithTenantConfigs(ctx, tenant, jobName, config) + if err != nil { + r.logger.Error("unable to get namespace details for job %s: %s", jobName.String(), err) + return uuid.Nil, err + } + config.JobConfig = newConfig + replayReq := scheduler.NewReplayRequest(jobName, tenant, config, scheduler.ReplayStateCreated) if err := r.validator.Validate(ctx, replayReq, jobCron); err != nil { r.logger.Error("error validating replay request: %s", err) @@ -84,6 +104,41 @@ func (r *ReplayService) CreateReplay(ctx context.Context, tenant tenant.Tenant, return replayID, nil } +func (r *ReplayService) injectJobConfigWithTenantConfigs(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (map[string]string, error) { + // copy ReplayConfig to a new map to mutate it + newConfig := map[string]string{} + for cfgKey, cfgVal := range config.JobConfig { + newConfig[cfgKey] = cfgVal + } + + // get tenant (project & namespace) configuration to obtain the execution project specifically for replay. + // note that the current behavior of GetDetails in the implementing struct prioritized namespace config over project config. + tenantWithDetails, err := r.tenantGetter.GetDetails(ctx, tnnt) + if err != nil { + return nil, errors.AddErrContext(err, scheduler.EntityReplay, + fmt.Sprintf("failed to get tenant details for project [%s], namespace [%s]", + tnnt.ProjectName(), tnnt.NamespaceName())) + } + + job, err := r.jobRepo.GetJob(ctx, tnnt.ProjectName(), jobName) + if err != nil { + return nil, errors.AddErrContext(err, scheduler.EntityReplay, + fmt.Sprintf("failed to get job for job name [%s]", jobName)) + } + + tenantConfig := tenantWithDetails.GetConfigs() + + // override the default execution project with the one in tenant config. + // only inject tenant-level config if execution project is not provided in ReplayConfig + overridedConfigKey, isSupported := r.pluginToExecutionProjectKeyMap[job.Task.Name] + tenantExecutionProject := tenantConfig[tenantReplayExecutionProjectConfigKey] + if isSupported && tenantExecutionProject != "" { + newConfig[overridedConfigKey] = tenantExecutionProject + } + + return newConfig, nil +} + func (r *ReplayService) GetReplayList(ctx context.Context, projectName tenant.ProjectName) (replays []*scheduler.Replay, err error) { return r.replayRepo.GetReplaysByProject(ctx, projectName, getReplaysDayLimit) } @@ -130,8 +185,26 @@ func (r *ReplayService) CancelReplay(ctx context.Context, replayWithRun *schedul return r.replayRepo.UpdateReplayStatus(ctx, replayWithRun.Replay.ID(), scheduler.ReplayStateCancelled, cancelMessage) } -func NewReplayService(replayRepo ReplayRepository, jobRepo JobRepository, validator ReplayValidator, worker ReplayExecutor, runGetter SchedulerRunGetter, logger log.Logger) *ReplayService { - return &ReplayService{replayRepo: replayRepo, jobRepo: jobRepo, validator: validator, executor: worker, runGetter: runGetter, logger: logger} +func NewReplayService( + replayRepo ReplayRepository, + jobRepo JobRepository, + tenantGetter TenantGetter, + validator ReplayValidator, + worker ReplayExecutor, + runGetter SchedulerRunGetter, + logger log.Logger, + pluginToExecutionProjectKeyMap map[string]string, +) *ReplayService { + return &ReplayService{ + replayRepo: replayRepo, + jobRepo: jobRepo, + tenantGetter: tenantGetter, + validator: validator, + executor: worker, + runGetter: runGetter, + logger: logger, + pluginToExecutionProjectKeyMap: pluginToExecutionProjectKeyMap, + } } func getJobCron(ctx context.Context, l log.Logger, jobRepo JobRepository, tnnt tenant.Tenant, jobName scheduler.JobName) (*cron.ScheduleSpec, error) { diff --git a/core/scheduler/service/replay_service_test.go b/core/scheduler/service/replay_service_test.go index dac666a5e3..09001c2d10 100644 --- a/core/scheduler/service/replay_service_test.go +++ b/core/scheduler/service/replay_service_test.go @@ -21,7 +21,7 @@ import ( func TestReplayService(t *testing.T) { ctx := context.Background() projName := tenant.ProjectName("proj") - namespaceName := tenant.ProjectName("ns1") + namespaceName := tenant.NamespaceName("ns1") jobName := scheduler.JobName("sample_select") startTimeStr := "2023-01-02T15:00:00Z" startTime, _ := time.Parse(scheduler.ISODateFormat, startTimeStr) @@ -35,6 +35,9 @@ func TestReplayService(t *testing.T) { job := scheduler.Job{ Name: jobName, Tenant: tnnt, + Task: &scheduler.Task{ + Name: "bq2bq", + }, } jobWithDetails := &scheduler.JobWithDetails{ Job: &job, @@ -50,6 +53,22 @@ func TestReplayService(t *testing.T) { jobCron, _ := cron.ParseCronSchedule(jobCronStr) message := "sample message" + namespaceCfg := map[string]string{ + "REPLAY_EXECUTION_PROJECT": "example_project_from_namespace", + } + projectCfg := map[string]string{ + "REPLAY_EXECUTION_PROJECT": "example_project_from_project", + "STORAGE_PATH": "file:///tmp/", + "SCHEDULER_HOST": "http://localhost", + } + namespaceEntity, _ := tenant.NewNamespace(namespaceName.String(), projName, namespaceCfg) + projectEntity, _ := tenant.NewProject(projName.String(), projectCfg) + tenantWithDetails, _ := tenant.NewTenantDetails(projectEntity, namespaceEntity, tenant.PlainTextSecrets{}) + + taskNameToExecutionProjectMap := map[string]string{ + "bq2bq": "EXECUTION_PROJECT", + } + logger := log.NewLogrus() t.Run("CreateReplay", func(t *testing.T) { @@ -66,6 +85,9 @@ func TestReplayService(t *testing.T) { replayWorker := new(ReplayExecutor) defer replayWorker.AssertExpectations(t) + tenantGetter := new(TenantGetter) + defer tenantGetter.AssertExpectations(t) + scheduledTime1Str := "2023-01-03T12:00:00Z" scheduledTime1, _ := time.Parse(scheduler.ISODateFormat, scheduledTime1Str) scheduledTime2 := scheduledTime1.Add(24 * time.Hour) @@ -76,16 +98,113 @@ func TestReplayService(t *testing.T) { replayReq := scheduler.NewReplayRequest(jobName, tnnt, replayConfig, scheduler.ReplayStateCreated) jobRepository.On("GetJobDetails", ctx, projName, jobName).Return(jobWithDetails, nil) + tenantGetter.On("GetDetails", ctx, tnnt).Return(tenantWithDetails, nil) + jobRepository.On("GetJob", ctx, projName, jobName).Return(&job, nil) replayValidator.On("Validate", ctx, replayReq, jobCron).Return(nil) replayRepository.On("RegisterReplay", ctx, replayReq, replayRuns).Return(replayID, nil) replayWorker.On("Execute", replayID, tnnt, jobName).Return().Maybe() - replayService := service.NewReplayService(replayRepository, jobRepository, replayValidator, replayWorker, nil, logger) + replayService := service.NewReplayService(replayRepository, jobRepository, tenantGetter, replayValidator, replayWorker, nil, logger, taskNameToExecutionProjectMap) result, err := replayService.CreateReplay(ctx, tnnt, jobName, replayConfig) assert.NoError(t, err) assert.Equal(t, replayID, result) }) + t.Run("should use namespace config EXECUTION_PROJECT if not provided in replay config", func(t *testing.T) { + replayRepository := new(ReplayRepository) + defer replayRepository.AssertExpectations(t) + + jobRepository := new(JobRepository) + defer jobRepository.AssertExpectations(t) + + replayValidator := new(ReplayValidator) + defer replayValidator.AssertExpectations(t) + + replayWorker := new(ReplayExecutor) + defer replayWorker.AssertExpectations(t) + + tenantGetter := new(TenantGetter) + defer tenantGetter.AssertExpectations(t) + + replayConfigEmptyJobConfig := scheduler.NewReplayConfig(startTime, endTime, parallel, map[string]string{}, description) + replayConfigWithNamespaceConfig := scheduler.NewReplayConfig(startTime, endTime, parallel, map[string]string{"EXECUTION_PROJECT": "example_project_from_namespace"}, description) + + scheduledTime1Str := "2023-01-03T12:00:00Z" + scheduledTime1, _ := time.Parse(scheduler.ISODateFormat, scheduledTime1Str) + scheduledTime2 := scheduledTime1.Add(24 * time.Hour) + replayRuns := []*scheduler.JobRunStatus{ + {ScheduledAt: scheduledTime1, State: scheduler.StatePending}, + {ScheduledAt: scheduledTime2, State: scheduler.StatePending}, + } + replayReq := scheduler.NewReplayRequest(jobName, tnnt, replayConfigWithNamespaceConfig, scheduler.ReplayStateCreated) + + jobRepository.On("GetJobDetails", ctx, projName, jobName).Return(jobWithDetails, nil) + tenantGetter.On("GetDetails", ctx, tnnt).Return(tenantWithDetails, nil) + jobRepository.On("GetJob", ctx, projName, jobName).Return(&job, nil) + replayValidator.On("Validate", ctx, replayReq, jobCron).Return(nil) + replayRepository.On("RegisterReplay", ctx, replayReq, replayRuns).Return(replayID, nil) + replayWorker.On("Execute", replayID, tnnt, jobName).Return().Maybe() + + replayService := service.NewReplayService(replayRepository, jobRepository, tenantGetter, replayValidator, replayWorker, nil, logger, taskNameToExecutionProjectMap) + result, err := replayService.CreateReplay(ctx, tnnt, jobName, replayConfigEmptyJobConfig) + assert.NoError(t, err) + assert.Equal(t, replayID, result) + }) + + t.Run("should return error if get tenant error", func(t *testing.T) { + replayRepository := new(ReplayRepository) + defer replayRepository.AssertExpectations(t) + + jobRepository := new(JobRepository) + defer jobRepository.AssertExpectations(t) + + replayValidator := new(ReplayValidator) + defer replayValidator.AssertExpectations(t) + + replayWorker := new(ReplayExecutor) + defer replayWorker.AssertExpectations(t) + + tenantGetter := new(TenantGetter) + defer tenantGetter.AssertExpectations(t) + + replayConfigEmptyJobConfig := scheduler.NewReplayConfig(startTime, endTime, parallel, map[string]string{}, description) + + internalErr := errors.New("internal error") + jobRepository.On("GetJobDetails", ctx, projName, jobName).Return(jobWithDetails, nil) + tenantGetter.On("GetDetails", ctx, tnnt).Return(nil, internalErr) + + replayService := service.NewReplayService(replayRepository, jobRepository, tenantGetter, replayValidator, replayWorker, nil, logger, taskNameToExecutionProjectMap) + result, err := replayService.CreateReplay(ctx, tnnt, jobName, replayConfigEmptyJobConfig) + assert.ErrorIs(t, err, internalErr) + assert.Equal(t, uuid.Nil, result) + }) + + t.Run("should return error if get job in inject config returns error", func(t *testing.T) { + replayRepository := new(ReplayRepository) + defer replayRepository.AssertExpectations(t) + + jobRepository := new(JobRepository) + defer jobRepository.AssertExpectations(t) + + replayValidator := new(ReplayValidator) + defer replayValidator.AssertExpectations(t) + + replayWorker := new(ReplayExecutor) + defer replayWorker.AssertExpectations(t) + + tenantGetter := new(TenantGetter) + defer tenantGetter.AssertExpectations(t) + + jobRepository.On("GetJobDetails", ctx, projName, jobName).Return(jobWithDetails, nil) + tenantGetter.On("GetDetails", ctx, tnnt).Return(tenantWithDetails, nil) + jobRepository.On("GetJob", ctx, projName, jobName).Return(nil, errors.New("internal error")) + + replayService := service.NewReplayService(replayRepository, jobRepository, tenantGetter, replayValidator, replayWorker, nil, logger, taskNameToExecutionProjectMap) + result, err := replayService.CreateReplay(ctx, tnnt, jobName, replayConfig) + assert.ErrorContains(t, err, "failed to get job for job name") + assert.Equal(t, uuid.Nil, result) + }) + t.Run("should return error if not pass validation", func(t *testing.T) { replayRepository := new(ReplayRepository) defer replayRepository.AssertExpectations(t) @@ -99,12 +218,17 @@ func TestReplayService(t *testing.T) { replayWorker := new(ReplayExecutor) defer replayWorker.AssertExpectations(t) + tenantGetter := new(TenantGetter) + defer tenantGetter.AssertExpectations(t) + replayReq := scheduler.NewReplayRequest(jobName, tnnt, replayConfig, scheduler.ReplayStateCreated) jobRepository.On("GetJobDetails", ctx, projName, jobName).Return(jobWithDetails, nil) + tenantGetter.On("GetDetails", ctx, tnnt).Return(tenantWithDetails, nil) + jobRepository.On("GetJob", ctx, projName, jobName).Return(&job, nil) replayValidator.On("Validate", ctx, replayReq, jobCron).Return(errors.New("not passed validation")) - replayService := service.NewReplayService(replayRepository, jobRepository, replayValidator, replayWorker, nil, logger) + replayService := service.NewReplayService(replayRepository, jobRepository, tenantGetter, replayValidator, replayWorker, nil, logger, taskNameToExecutionProjectMap) result, err := replayService.CreateReplay(ctx, tnnt, jobName, replayConfig) assert.ErrorContains(t, err, "not passed validation") assert.Equal(t, uuid.Nil, result) @@ -120,10 +244,13 @@ func TestReplayService(t *testing.T) { replayValidator := new(ReplayValidator) defer replayValidator.AssertExpectations(t) + tenantGetter := new(TenantGetter) + defer tenantGetter.AssertExpectations(t) + internalErr := errors.New("internal error") jobRepository.On("GetJobDetails", ctx, projName, jobName).Return(nil, internalErr) - replayService := service.NewReplayService(replayRepository, jobRepository, replayValidator, nil, nil, logger) + replayService := service.NewReplayService(replayRepository, jobRepository, tenantGetter, replayValidator, nil, nil, logger, taskNameToExecutionProjectMap) result, err := replayService.CreateReplay(ctx, tnnt, jobName, replayConfig) assert.ErrorIs(t, err, internalErr) assert.Equal(t, uuid.Nil, result) @@ -142,11 +269,14 @@ func TestReplayService(t *testing.T) { replayWorker := new(ReplayExecutor) defer replayWorker.AssertExpectations(t) + tenantGetter := new(TenantGetter) + defer tenantGetter.AssertExpectations(t) + invalidTenant, _ := tenant.NewTenant(projName.String(), "invalid-namespace") jobRepository.On("GetJobDetails", ctx, projName, jobName).Return(jobWithDetails, nil) - replayService := service.NewReplayService(replayRepository, jobRepository, replayValidator, replayWorker, nil, logger) + replayService := service.NewReplayService(replayRepository, jobRepository, tenantGetter, replayValidator, replayWorker, nil, logger, taskNameToExecutionProjectMap) result, err := replayService.CreateReplay(ctx, invalidTenant, jobName, replayConfig) assert.ErrorContains(t, err, "job sample_select does not exist in invalid-namespace namespace") assert.Equal(t, uuid.Nil, result) @@ -163,7 +293,7 @@ func TestReplayService(t *testing.T) { replayRepository.On("GetReplaysByProject", ctx, mock.Anything, mock.Anything).Return(replays, nil) defer replayRepository.AssertExpectations(t) - replayService := service.NewReplayService(replayRepository, nil, nil, nil, nil, logger) + replayService := service.NewReplayService(replayRepository, nil, nil, nil, nil, nil, logger, nil) result, err := replayService.GetReplayList(ctx, tnnt.ProjectName()) assert.NoError(t, err) assert.Len(t, result, 3) @@ -174,7 +304,7 @@ func TestReplayService(t *testing.T) { replayRepository.On("GetReplaysByProject", ctx, mock.Anything, mock.Anything).Return(nil, errors.New("some error")) defer replayRepository.AssertExpectations(t) - replayService := service.NewReplayService(replayRepository, nil, nil, nil, nil, logger) + replayService := service.NewReplayService(replayRepository, nil, nil, nil, nil, nil, logger, nil) result, err := replayService.GetReplayList(ctx, tnnt.ProjectName()) assert.Error(t, err) assert.Nil(t, result) @@ -188,7 +318,7 @@ func TestReplayService(t *testing.T) { replayID := uuid.New() replayRepository.On("GetReplayByID", ctx, replayID).Return(nil, errs.NotFound("entity", "not found")) - replayService := service.NewReplayService(replayRepository, nil, nil, nil, nil, logger) + replayService := service.NewReplayService(replayRepository, nil, nil, nil, nil, nil, logger, nil) result, err := replayService.GetReplayByID(ctx, replayID) assert.True(t, errs.IsErrorType(err, errs.ErrNotFound)) assert.Empty(t, result) @@ -200,7 +330,7 @@ func TestReplayService(t *testing.T) { replayID := uuid.New() replayRepository.On("GetReplayByID", ctx, replayID).Return(nil, errors.New("internal error")) - replayService := service.NewReplayService(replayRepository, nil, nil, nil, nil, logger) + replayService := service.NewReplayService(replayRepository, nil, nil, nil, nil, nil, logger, nil) result, err := replayService.GetReplayByID(ctx, replayID) assert.Error(t, err) assert.Nil(t, result) @@ -221,7 +351,7 @@ func TestReplayService(t *testing.T) { }, }, nil) - replayService := service.NewReplayService(replayRepository, nil, nil, nil, nil, logger) + replayService := service.NewReplayService(replayRepository, nil, nil, nil, nil, nil, logger, nil) result, err := replayService.GetReplayByID(ctx, replayID) assert.NoError(t, err) assert.NotNil(t, result) @@ -245,7 +375,7 @@ func TestReplayService(t *testing.T) { }, } - replayService := service.NewReplayService(replayRepository, nil, nil, nil, nil, logger) + replayService := service.NewReplayService(replayRepository, nil, nil, nil, nil, nil, logger, nil) err := replayService.CancelReplay(ctx, replayWithRun) assert.ErrorContains(t, err, "replay has already been terminated with status success") }) @@ -267,7 +397,7 @@ func TestReplayService(t *testing.T) { errorMsg := "internal error" replayRepository.On("UpdateReplayStatus", mock.Anything, replay.ID(), scheduler.ReplayStateCancelled, mock.Anything).Return(errors.New(errorMsg)).Once() - replayService := service.NewReplayService(replayRepository, nil, nil, nil, nil, logger) + replayService := service.NewReplayService(replayRepository, nil, nil, nil, nil, nil, logger, nil) err := replayService.CancelReplay(ctx, replayWithRun) assert.ErrorContains(t, err, errorMsg) }) @@ -288,7 +418,7 @@ func TestReplayService(t *testing.T) { replayRepository.On("UpdateReplayStatus", mock.Anything, replay.ID(), scheduler.ReplayStateCancelled, mock.Anything).Return(nil).Once() - replayService := service.NewReplayService(replayRepository, nil, nil, nil, nil, logger) + replayService := service.NewReplayService(replayRepository, nil, nil, nil, nil, nil, logger, nil) err := replayService.CancelReplay(ctx, replayWithRun) assert.NoError(t, err) }) @@ -301,7 +431,7 @@ func TestReplayService(t *testing.T) { jobRepository.On("GetJobDetails", mock.Anything, projName, jobName).Return(nil, errors.New("internal error")) - replayService := service.NewReplayService(nil, jobRepository, nil, nil, nil, logger) + replayService := service.NewReplayService(nil, jobRepository, nil, nil, nil, nil, logger, nil) result, err := replayService.GetRunsStatus(ctx, tnnt, jobName, replayConfig) assert.Error(t, err) assert.Nil(t, result) @@ -316,7 +446,7 @@ func TestReplayService(t *testing.T) { jobRepository.On("GetJobDetails", mock.Anything, projName, jobName).Return(jobWithDetails, nil) schedulerRunGetter.On("GetJobRuns", ctx, tnnt, mock.Anything, mock.Anything).Return(nil, errors.New("internal error")) - replayService := service.NewReplayService(nil, jobRepository, nil, nil, schedulerRunGetter, logger) + replayService := service.NewReplayService(nil, jobRepository, nil, nil, nil, schedulerRunGetter, logger, nil) result, err := replayService.GetRunsStatus(ctx, tnnt, jobName, replayConfig) assert.Error(t, err) assert.Nil(t, result) @@ -333,7 +463,7 @@ func TestReplayService(t *testing.T) { jobRepository.On("GetJobDetails", mock.Anything, projName, jobName).Return(jobWithDetails, nil) schedulerRunGetter.On("GetJobRuns", ctx, tnnt, mock.Anything, mock.Anything).Return(runs, nil) - replayService := service.NewReplayService(nil, jobRepository, nil, nil, schedulerRunGetter, logger) + replayService := service.NewReplayService(nil, jobRepository, nil, nil, nil, schedulerRunGetter, logger, nil) result, err := replayService.GetRunsStatus(ctx, tnnt, jobName, replayConfig) assert.NoError(t, err) assert.NotNil(t, result) @@ -358,7 +488,7 @@ func TestReplayService(t *testing.T) { jobRepository.On("GetJobDetails", mock.Anything, projName, jobName).Return(jobWithDetails, nil) schedulerRunGetter.On("GetJobRuns", ctx, tnnt, mock.Anything, mock.Anything).Return(runs, nil) - replayService := service.NewReplayService(nil, jobRepository, nil, nil, schedulerRunGetter, logger) + replayService := service.NewReplayService(nil, jobRepository, nil, nil, nil, schedulerRunGetter, logger, nil) result, err := replayService.GetRunsStatus(ctx, tnnt, jobName, replayConfig) assert.NoError(t, err) assert.NotNil(t, result) @@ -553,3 +683,38 @@ type ReplayExecutor struct { func (_m *ReplayExecutor) Execute(replayID uuid.UUID, jobTenant tenant.Tenant, jobName scheduler.JobName) { _m.Called(replayID, jobTenant, jobName) } + +// TenantGetter is an autogenerated mock type for the TenantGetter type +type TenantGetter struct { + mock.Mock +} + +// GetDetails provides a mock function with given fields: ctx, tnnt +func (_m *TenantGetter) GetDetails(ctx context.Context, tnnt tenant.Tenant) (*tenant.WithDetails, error) { + ret := _m.Called(ctx, tnnt) + + if len(ret) == 0 { + panic("no return value specified for GetDetails") + } + + var r0 *tenant.WithDetails + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, tenant.Tenant) (*tenant.WithDetails, error)); ok { + return rf(ctx, tnnt) + } + if rf, ok := ret.Get(0).(func(context.Context, tenant.Tenant) *tenant.WithDetails); ok { + r0 = rf(ctx, tnnt) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*tenant.WithDetails) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, tenant.Tenant) error); ok { + r1 = rf(ctx, tnnt) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/optimus.sample.yaml b/optimus.sample.yaml index dd2a77a71c..592bb6a067 100644 --- a/optimus.sample.yaml +++ b/optimus.sample.yaml @@ -38,7 +38,9 @@ project: # # relative path where resource spec for BQ are stored # path: "bq" # # namespace variables usable in specifications -# config: {} +# config: +# # for bq-related jobs, execution project specific to replay jobs are stored here +# replay_execution_project: "data_replay_project" #- name: sample_namespace_2 # job: # path: "ns2/job" diff --git a/server/optimus.go b/server/optimus.go index 2857c2e5db..8b6d070161 100644 --- a/server/optimus.go +++ b/server/optimus.go @@ -338,7 +338,11 @@ func (s *OptimusServer) setupHandlers() error { replayRepository := schedulerRepo.NewReplayRepository(s.dbPool) replayWorker := schedulerService.NewReplayWorker(s.logger, replayRepository, jobProviderRepo, newScheduler, s.conf.Replay) replayValidator := schedulerService.NewValidator(replayRepository, newScheduler, jobProviderRepo) - replayService := schedulerService.NewReplayService(replayRepository, jobProviderRepo, replayValidator, replayWorker, newScheduler, s.logger) + replayService := schedulerService.NewReplayService( + replayRepository, jobProviderRepo, tenantService, + replayValidator, replayWorker, newScheduler, + s.logger, s.conf.Replay.PluginExecutionProjectConfigNames, + ) newJobRunService := schedulerService.NewJobRunService( s.logger, jobProviderRepo, jobRunRepo, replayRepository, operatorRunRepository,