Skip to content

Commit

Permalink
fix: replay config unchanged given for the same run issue (#253)
Browse files Browse the repository at this point in the history
  • Loading branch information
arinda-arif committed Jul 8, 2024
1 parent 7c993e7 commit 0bdeab1
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 20 deletions.
22 changes: 5 additions & 17 deletions internal/store/postgres/scheduler/replay_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,24 +215,12 @@ func (r ReplayRepository) UpdateReplay(ctx context.Context, id uuid.UUID, replay
}

func (r ReplayRepository) GetReplayJobConfig(ctx context.Context, jobTenant tenant.Tenant, jobName scheduler.JobName, scheduledAt time.Time) (map[string]string, error) {
getReplayRequest := `SELECT job_config FROM replay_request WHERE job_name=$1 AND namespace_name=$2 AND project_name=$3 AND start_time<=$4 AND $4<=end_time ORDER BY created_at ASC`
rows, err := r.db.Query(ctx, getReplayRequest, jobName, jobTenant.NamespaceName(), jobTenant.ProjectName(), scheduledAt)
if err != nil {
return nil, errors.Wrap(job.EntityJob, "unable to get replay job configs", err)
}
defer rows.Close()

getReplayRequest := `SELECT job_config FROM replay_request WHERE job_name=$1 AND namespace_name=$2 AND project_name=$3 AND start_time<=$4 AND $4<=end_time AND status=$5 ORDER BY created_at DESC LIMIT 1`
configs := map[string]string{}
for rows.Next() {
var rr replayRequest
if err := rows.Scan(&rr.JobConfig); err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, errors.NotFound(job.EntityJob, fmt.Sprintf("no replay found for scheduledAt %s", scheduledAt.String()))
}
return nil, errors.Wrap(scheduler.EntityJobRun, "unable to get the stored replay job cnfig", err)
}
for k, v := range rr.JobConfig {
configs[k] = v
if err := r.db.QueryRow(ctx, getReplayRequest, jobName, jobTenant.NamespaceName(), jobTenant.ProjectName(), scheduledAt, scheduler.ReplayStateInProgress.String()).
Scan(&configs); err != nil {
if !errors.Is(err, pgx.ErrNoRows) {
return nil, errors.Wrap(job.EntityJob, "unable to get replay job configs", err)
}
}
return configs, nil
Expand Down
29 changes: 26 additions & 3 deletions internal/store/postgres/scheduler/replay_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,18 +154,41 @@ func TestPostgresSchedulerRepository(t *testing.T) {
scheduledAt, _ := time.Parse(scheduler.ISODateFormat, "2022-01-02T15:04:05Z")

replayConfig := scheduler.NewReplayConfig(startTime, endTime, true, map[string]string{"EXECUTION_PROJECT": "example1"}, description)
replayReq := scheduler.NewReplayRequest(jobBName, tnnt, replayConfig, scheduler.ReplayStateCreated)
replayReq := scheduler.NewReplayRequest(jobBName, tnnt, replayConfig, scheduler.ReplayStateInProgress)
_, err := replayRepo.RegisterReplay(ctx, replayReq, jobRunsAllPending)
assert.Nil(t, err)
replayConfig = scheduler.NewReplayConfig(startTime, endTime, true, replayJobConfig, description)
replayReq = scheduler.NewReplayRequest(jobBName, tnnt, replayConfig, scheduler.ReplayStateCreated)
replayReq = scheduler.NewReplayRequest(jobBName, tnnt, replayConfig, scheduler.ReplayStateInProgress)
_, err = replayRepo.RegisterReplay(ctx, replayReq, jobRunsAllPending)
assert.Nil(t, err)

actualReplayJobConfig, err := replayRepo.GetReplayJobConfig(ctx, tnnt, jobBName, scheduledAt)
assert.Nil(t, err)
assert.Equal(t, replayJobConfig, actualReplayJobConfig)
})
t.Run("return latest in progress replay task config for the same filter", func(t *testing.T) {
db := dbSetup()
replayRepo := postgres.NewReplayRepository(db)
startTime, _ := time.Parse(scheduler.ISODateFormat, "2022-01-01T15:04:05Z")
endTime, _ := time.Parse(scheduler.ISODateFormat, "2022-01-03T15:04:05Z")
scheduledAt, _ := time.Parse(scheduler.ISODateFormat, "2022-01-02T15:04:05Z")

firstJobConfig := map[string]string{"EXECUTION_PROJECT": "example1"}
firstReplayConfig := scheduler.NewReplayConfig(startTime, endTime, true, firstJobConfig, description)
replayReq := scheduler.NewReplayRequest(jobBName, tnnt, firstReplayConfig, scheduler.ReplayStateInProgress)
_, err := replayRepo.RegisterReplay(ctx, replayReq, jobRunsAllPending)
assert.Nil(t, err)

secondJobConfig := map[string]string{"EXECUTION_PROJECT": "example2"}
secondReplayConfig := scheduler.NewReplayConfig(startTime, endTime, true, secondJobConfig, description)
replayReq = scheduler.NewReplayRequest(jobBName, tnnt, secondReplayConfig, scheduler.ReplayStateInProgress)
_, err = replayRepo.RegisterReplay(ctx, replayReq, jobRunsAllPending)
assert.Nil(t, err)

actualReplayJobConfig, err := replayRepo.GetReplayJobConfig(ctx, tnnt, jobBName, scheduledAt)
assert.Nil(t, err)
assert.Equal(t, secondJobConfig, actualReplayJobConfig)
})
t.Run("return empty replay task config when there's no extra config in replay config", func(t *testing.T) {
db := dbSetup()
replayRepo := postgres.NewReplayRepository(db)
Expand All @@ -174,7 +197,7 @@ func TestPostgresSchedulerRepository(t *testing.T) {
scheduledAt, _ := time.Parse(scheduler.ISODateFormat, "2022-01-02T15:04:05Z")

replayConfig := scheduler.NewReplayConfig(startTime, endTime, true, map[string]string{}, description)
replayReq := scheduler.NewReplayRequest(jobBName, tnnt, replayConfig, scheduler.ReplayStateCreated)
replayReq := scheduler.NewReplayRequest(jobBName, tnnt, replayConfig, scheduler.ReplayStateInProgress)
_, err := replayRepo.RegisterReplay(ctx, replayReq, jobRunsAllPending)
assert.Nil(t, err)

Expand Down

0 comments on commit 0bdeab1

Please sign in to comment.