Skip to content

Commit

Permalink
feat: replay decentralization - provide execution project as namespac…
Browse files Browse the repository at this point in the history
…e & project level config (#226)

* feat: read namespace config in replay

* feat: add unit test, export into function

* fix: lint

* feat: also add project-level config & do not mutate config in param

* feat: add map of supported plugins

* feat: add to optimus.sample.yaml

* feat: modify logic & update tests

* refactor: remove map usage & use config server instead

* fix: lint

* feat: add to config.sample.yaml
  • Loading branch information
ahmadnaufal authored Jun 3, 2024
1 parent e349540 commit cc10910
Show file tree
Hide file tree
Showing 6 changed files with 277 additions and 23 deletions.
9 changes: 9 additions & 0 deletions config.sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,12 @@ log:
# batch_interval_second: 1
# broker_urls:
# - localhost:9092

# replay:
# plugin_execution_project_config_names:
# bq2bq: EXECUTION_PROJECT
# bq2pg: BQ_EXECUTION_PROJECT
# pg2bq: BQ_EXECUTION_PROJECT
# bq2api: EXECUTION_PROJECT
# clevertap: EXECUTION_PROJECT
# transporterTask: EXECUTION_PROJECT
5 changes: 3 additions & 2 deletions config/config_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ type PluginConfig struct {
}

// ReplayConfig holds the replay settings decoded (via the mapstructure tags)
// from the `replay` section of the server configuration.
type ReplayConfig struct {
ReplayTimeoutInMinutes int `mapstructure:"replay_timeout_in_minutes" default:"180"`
ExecutionIntervalInSeconds int `mapstructure:"execution_interval_in_seconds" default:"120"`
ReplayTimeoutInMinutes int `mapstructure:"replay_timeout_in_minutes" default:"180"`
ExecutionIntervalInSeconds int `mapstructure:"execution_interval_in_seconds" default:"120"`
// PluginExecutionProjectConfigNames maps a plugin name to the name of the
// config that plugin uses for its execution project
// (e.g. bq2bq -> EXECUTION_PROJECT in config.sample.yaml).
PluginExecutionProjectConfigNames map[string]string `mapstructure:"plugin_execution_project_config_names"`
}

type Publisher struct {
Expand Down
77 changes: 75 additions & 2 deletions core/scheduler/service/replay_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const (
getReplaysDayLimit = 30 // TODO: make it configurable via cli

metricJobReplay = "jobrun_replay_requests_total"

tenantReplayExecutionProjectConfigKey = "REPLAY_EXECUTION_PROJECT"
)

type SchedulerRunGetter interface {
Expand All @@ -34,6 +36,10 @@ type ReplayRepository interface {
GetReplayByID(ctx context.Context, replayID uuid.UUID) (*scheduler.ReplayWithRun, error)
}

// TenantGetter fetches the detailed view of a tenant. The replay service uses
// the returned details to read the tenant's configs.
type TenantGetter interface {
// GetDetails returns the details for tnnt, or an error when they cannot be obtained.
GetDetails(ctx context.Context, tnnt tenant.Tenant) (*tenant.WithDetails, error)
}

// ReplayValidator checks a replay request before it is accepted.
type ReplayValidator interface {
// Validate inspects replayRequest together with the job's cron schedule and
// returns a non-nil error when the request must be rejected.
Validate(ctx context.Context, replayRequest *scheduler.Replay, jobCron *cron.ScheduleSpec) error
}
Expand All @@ -50,7 +56,14 @@ type ReplayService struct {
validator ReplayValidator
executor ReplayExecutor

tenantGetter TenantGetter

logger log.Logger

// stores mapping of task names (optimus plugin names) to its respective execution project config names.
// this mapping is needed because our bq plugins that support an execution project use different config names inside the plugins.
// after the config naming is standardized, this map can be omitted
pluginToExecutionProjectKeyMap map[string]string
}

func (r *ReplayService) CreateReplay(ctx context.Context, tenant tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (replayID uuid.UUID, err error) {
Expand All @@ -60,6 +73,13 @@ func (r *ReplayService) CreateReplay(ctx context.Context, tenant tenant.Tenant,
return uuid.Nil, err
}

newConfig, err := r.injectJobConfigWithTenantConfigs(ctx, tenant, jobName, config)
if err != nil {
r.logger.Error("unable to get namespace details for job %s: %s", jobName.String(), err)
return uuid.Nil, err
}
config.JobConfig = newConfig

replayReq := scheduler.NewReplayRequest(jobName, tenant, config, scheduler.ReplayStateCreated)
if err := r.validator.Validate(ctx, replayReq, jobCron); err != nil {
r.logger.Error("error validating replay request: %s", err)
Expand All @@ -84,6 +104,41 @@ func (r *ReplayService) CreateReplay(ctx context.Context, tenant tenant.Tenant,
return replayID, nil
}

// injectJobConfigWithTenantConfigs builds the job config for a replay by
// layering the tenant's replay execution project on top of the job config
// supplied in the replay request.
//
// config.JobConfig itself is never modified; a copy is returned. The tenant
// value stored under tenantReplayExecutionProjectConfigKey is written into the
// copy only when the job's task name has an entry in
// pluginToExecutionProjectKeyMap and the tenant value is non-empty.
//
// An error is returned when either the tenant details or the job cannot be
// fetched.
func (r *ReplayService) injectJobConfigWithTenantConfigs(ctx context.Context, tnnt tenant.Tenant, jobName scheduler.JobName, config *scheduler.ReplayConfig) (map[string]string, error) {
	// work on a copy so the caller's ReplayConfig stays untouched
	merged := make(map[string]string, len(config.JobConfig))
	for key, value := range config.JobConfig {
		merged[key] = value
	}

	// tenant details carry the project & namespace configuration; the existing
	// note in this code says the implementing GetDetails prioritizes namespace
	// config over project config.
	details, err := r.tenantGetter.GetDetails(ctx, tnnt)
	if err != nil {
		msg := fmt.Sprintf("failed to get tenant details for project [%s], namespace [%s]",
			tnnt.ProjectName(), tnnt.NamespaceName())
		return nil, errors.AddErrContext(err, scheduler.EntityReplay, msg)
	}

	job, err := r.jobRepo.GetJob(ctx, tnnt.ProjectName(), jobName)
	if err != nil {
		msg := fmt.Sprintf("failed to get job for job name [%s]", jobName)
		return nil, errors.AddErrContext(err, scheduler.EntityReplay, msg)
	}

	tenantConfigs := details.GetConfigs()

	// only plugins registered in the map know an execution project config key
	pluginKey, supported := r.pluginToExecutionProjectKeyMap[job.Task.Name]
	if !supported {
		return merged, nil
	}

	// NOTE(review): the previous comment here said the tenant value is injected
	// only when the replay request does not already provide an execution
	// project, but the code replaces any existing value — confirm the intent.
	if executionProject := tenantConfigs[tenantReplayExecutionProjectConfigKey]; executionProject != "" {
		merged[pluginKey] = executionProject
	}

	return merged, nil
}

// GetReplayList returns the replays of the given project, asking the
// repository for those within the last getReplaysDayLimit days.
func (r *ReplayService) GetReplayList(ctx context.Context, projectName tenant.ProjectName) (replays []*scheduler.Replay, err error) {
	replays, err = r.replayRepo.GetReplaysByProject(ctx, projectName, getReplaysDayLimit)
	return replays, err
}
Expand Down Expand Up @@ -130,8 +185,26 @@ func (r *ReplayService) CancelReplay(ctx context.Context, replayWithRun *schedul
return r.replayRepo.UpdateReplayStatus(ctx, replayWithRun.Replay.ID(), scheduler.ReplayStateCancelled, cancelMessage)
}

func NewReplayService(replayRepo ReplayRepository, jobRepo JobRepository, validator ReplayValidator, worker ReplayExecutor, runGetter SchedulerRunGetter, logger log.Logger) *ReplayService {
return &ReplayService{replayRepo: replayRepo, jobRepo: jobRepo, validator: validator, executor: worker, runGetter: runGetter, logger: logger}
// NewReplayService wires a ReplayService from its collaborators.
//
// pluginToExecutionProjectKeyMap maps a task (plugin) name to the config key
// that plugin uses for its execution project; it is stored as given, not copied.
func NewReplayService(
	replayRepo ReplayRepository,
	jobRepo JobRepository,
	tenantGetter TenantGetter,
	validator ReplayValidator,
	worker ReplayExecutor,
	runGetter SchedulerRunGetter,
	logger log.Logger,
	pluginToExecutionProjectKeyMap map[string]string,
) *ReplayService {
	svc := new(ReplayService)

	// storage and lookup dependencies
	svc.replayRepo = replayRepo
	svc.jobRepo = jobRepo
	svc.tenantGetter = tenantGetter
	svc.runGetter = runGetter

	// request processing
	svc.validator = validator
	svc.executor = worker

	svc.logger = logger
	svc.pluginToExecutionProjectKeyMap = pluginToExecutionProjectKeyMap

	return svc
}

func getJobCron(ctx context.Context, l log.Logger, jobRepo JobRepository, tnnt tenant.Tenant, jobName scheduler.JobName) (*cron.ScheduleSpec, error) {
Expand Down
Loading

0 comments on commit cc10910

Please sign in to comment.