diff --git a/migrations/scheduler/2_last_executed_at.down.sql b/migrations/scheduler/2_last_executed_at.down.sql new file mode 100644 index 00000000..249e69c4 --- /dev/null +++ b/migrations/scheduler/2_last_executed_at.down.sql @@ -0,0 +1,5 @@ +ALTER TABLE jobs +DROP last_executed_at; + +ALTER TABLE jobs +ADD execute_at BIGINT NOT NULL; \ No newline at end of file diff --git a/migrations/scheduler/2_last_executed_at.up.sql b/migrations/scheduler/2_last_executed_at.up.sql new file mode 100644 index 00000000..4c0d0566 --- /dev/null +++ b/migrations/scheduler/2_last_executed_at.up.sql @@ -0,0 +1,5 @@ +ALTER TABLE jobs +DROP execute_at; + +ALTER TABLE jobs +ADD last_executed_at BIGINT NOT NULL DEFAULT 0; diff --git a/pkg/database/sqlite/storage.go b/pkg/database/sqlite/storage.go index 305bad0a..2f783488 100644 --- a/pkg/database/sqlite/storage.go +++ b/pkg/database/sqlite/storage.go @@ -35,7 +35,7 @@ func New(logger *zap.SugaredLogger, cfg DBConfig, dbName string) (*DB, error) { DB: db, } - if err := s.migrate(dbName, cfg.MigrationsPath); err != nil { + if err := s.migrate(dbName, cfg.MigrationsPath, false); err != nil { return nil, err } @@ -55,7 +55,21 @@ func (s *DB) Close() error { return nil } -func (s *DB) migrate(dbName, migrationsPath string) error { +var _ migrate.Logger = (*migrationLogger)(nil) + +type migrationLogger struct { + *zap.SugaredLogger +} + +func (l *migrationLogger) Printf(format string, v ...interface{}) { + l.Infof(format, v...) +} + +func (l *migrationLogger) Verbose() bool { + return false +} + +func (s *DB) migrate(dbName, migrationsPath string, logMigrations bool) error { s.Logger.Infof("Performing db migrations...") driver, err := sqlite3.WithInstance(s.DB.DB, &sqlite3.Config{}) @@ -70,6 +84,10 @@ func (s *DB) migrate(dbName, migrationsPath string) error { return err } + if logMigrations { + migration.Log = &migrationLogger{s.Logger} + } + version, dirty, err := migration.Version() if err != nil && !errors.Is(err, migrate.ErrNilVersion) { s.Logger.Error(err) diff --git a/pkg/scheduler/job.go b/pkg/scheduler/job.go index 165e92ea..1940796c 100644 --- a/pkg/scheduler/job.go +++ b/pkg/scheduler/job.go @@ -3,7 +3,7 @@ package scheduler import "time" type Job struct { - Name string - ExecuteAt time.Time - Period time.Duration + Name string + LastExecutedAt time.Time + Period time.Duration } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 410e2f68..75e73035 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -96,28 +96,35 @@ func (s *scheduler) Start(ctx context.Context) error { period := job.Period now := s.clock.Now() - durationUntilFirstExecution := time.Duration(0) - if job.ExecuteAt.After(now) { - durationUntilFirstExecution = job.ExecuteAt.Sub(now) + durationUntilFirstExecution := job.LastExecutedAt.Add(job.Period).Sub(now) + if durationUntilFirstExecution < 0 { + durationUntilFirstExecution = 0 } onceDone := make(chan struct{}) - handler := func() { + handler := func(tickTime time.Time) { // TODO @evlekht panic handling? - if err := s.updateJobExecutionTime(ctx, jobName); err != nil { + if err := s.updateJobExecutionTime(ctx, jobName, tickTime); err != nil { s.logger.Errorf("failed to update job execution time: %v", err) return // TODO @evlekht handle error, maybe retry ? } jobHandler() } - timer := s.clock.AfterFunc(durationUntilFirstExecution, func() { - handler() - close(onceDone) - }) + // first execution + timer := s.clock.NewTimer(durationUntilFirstExecution) s.setJobTimer(job.Name, &timerStopper{timer}) + go func() { + select { + case tickTime := <-timer.Chan(): + handler(tickTime) + case <-timersCtx.Done(): + } + close(onceDone) + }() + // periodic execution go func() { <-onceDone ticker := s.clock.NewTicker(period) @@ -125,8 +132,8 @@ func (s *scheduler) Start(ctx context.Context) error { s.setJobTimer(job.Name, ticker) for { select { - case <-ticker.Chan(): - handler() + case tickTime := <-ticker.Chan(): + handler(tickTime) case <-timersCtx.Done(): return } @@ -148,8 +155,8 @@ func (s *scheduler) Stop() error { return nil } -// Schedule schedules a job to be executed every period. If jobID is empty, a new job is created. -// Otherwise, the existing job period is updated and expiration time is set to min(current expiration time, now + period). +// Schedules a job to be executed every period. +// If there is already scheduled job with the same name, then its period is updated. func (s *scheduler) Schedule(ctx context.Context, period time.Duration, jobName string) error { session, err := s.storage.NewSession(ctx) if err != nil { @@ -164,22 +171,16 @@ func (s *scheduler) Schedule(ctx context.Context, period time.Duration, jobName return err } - executeAt := s.clock.Now().Add(period) - + lastExecutedAt := time.Time{} if job != nil { - job.Period = period - if executeAt.Before(job.ExecuteAt) { - job.ExecuteAt = executeAt - } - } else { - job = &Job{ - Name: jobName, - ExecuteAt: executeAt, - Period: period, - } + lastExecutedAt = job.LastExecutedAt } - if err := s.storage.UpsertJob(ctx, session, job); err != nil { + if err := s.storage.UpsertJob(ctx, session, &Job{ + Name: jobName, + LastExecutedAt: lastExecutedAt, + Period: period, + }); err != nil { s.logger.Errorf("failed to store scheduled job: %v", err) return err } @@ -193,7 +194,7 @@ func (s *scheduler) RegisterJobHandler(jobName string, jobHandler func()) { s.registryLock.Unlock() } -func (s *scheduler) updateJobExecutionTime(ctx context.Context, jobName string) error { +func (s *scheduler) updateJobExecutionTime(ctx context.Context, jobName string, executionTime time.Time) error { session, err := s.storage.NewSession(ctx) if err != nil { s.logger.Errorf("failed to create storage session: %v", err) @@ -207,7 +208,7 @@ func (s *scheduler) updateJobExecutionTime(ctx context.Context, jobName string) return err } - job.ExecuteAt = s.clock.Now().Add(job.Period) + job.LastExecutedAt = executionTime if err := s.storage.UpsertJob(ctx, session, job); err != nil { s.logger.Errorf("failed to store scheduled job: %v", err) diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 11dd21a3..704e1ed5 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -17,7 +17,7 @@ func TestScheduler_Start(t *testing.T) { require := require.New(t) ctx := context.Background() - clock := clockwork.NewFakeClockAt(time.Unix(0, 100)) + clock := clockwork.NewFakeClockAt(time.Unix(0, 10_000)) ctrl := gomock.NewController(t) storage := NewMockStorage(ctrl) epsilon := time.Millisecond @@ -25,38 +25,61 @@ func TestScheduler_Start(t *testing.T) { earlyJobExecuted := make(chan string) nowJobExecuted := make(chan string) + freshJobExecuted := make(chan string) lateJobExecuted := make(chan string) + // job that was executed before scheduler starts + // and next execution should be scheduled before scheduler starts + earlyJobPeriod := time.Duration(1000) earlyJob := Job{ - Name: "early_job", - ExecuteAt: clock.Now().Add(-1), - Period: 1000, + Name: "early_job", + LastExecutedAt: clock.Now().Add(-earlyJobPeriod - 1), + Period: earlyJobPeriod, } + + // job that was executed before scheduler starts + // and next execution should be scheduled right when scheduler starts + nowJobPeriod := time.Duration(1003) nowJob := Job{ - Name: "now_job", - ExecuteAt: clock.Now(), - Period: 1003, + Name: "now_job", + LastExecutedAt: clock.Now().Add(-nowJobPeriod), + Period: nowJobPeriod, + } + + // job that was never executed + // and next execution should be scheduled right when scheduler starts + freshJob := Job{ + Name: "fresh_job", + LastExecutedAt: time.Unix(0, 0), + Period: time.Duration(1005), } + + // job that was executed before scheduler starts + // and next execution should be scheduled after scheduler starts + lateJobPeriod := time.Duration(1007) lateJob := Job{ - Name: "late_job", - ExecuteAt: clock.Now().Add(1), - Period: 1007, + Name: "late_job", + LastExecutedAt: clock.Now().Add(-lateJobPeriod + 1), + Period: lateJobPeriod, } - jobs := []*Job{&earlyJob, &nowJob, &lateJob} + + jobs := []*Job{&earlyJob, &nowJob, &freshJob, &lateJob} jobsExecChansMap := map[string]chan string{ earlyJob.Name: earlyJobExecuted, nowJob.Name: nowJobExecuted, + freshJob.Name: freshJobExecuted, lateJob.Name: lateJobExecuted, } - jobsExecChans := []chan string{earlyJobExecuted, nowJobExecuted, lateJobExecuted} + jobsExecChans := []chan string{earlyJobExecuted, nowJobExecuted, freshJobExecuted, lateJobExecuted} // this is needed for correct time-advancement sequence - require.Less(earlyJob.ExecuteAt, clock.Now()) - require.Equal(nowJob.ExecuteAt, clock.Now()) + require.Less(earlyJob.LastExecutedAt.Add(earlyJob.Period), clock.Now()) + require.Equal(nowJob.LastExecutedAt.Add(nowJob.Period), clock.Now()) require.Less(earlyJob.Period, nowJob.Period) - require.Less(nowJob.Period, lateJob.Period) + require.Less(nowJob.Period, freshJob.Period) + require.Less(freshJob.Period, lateJob.Period) require.Less(lateJob.Period, timeout-epsilon) // *** mock & executionSequence setup @@ -77,29 +100,37 @@ func TestScheduler_Start(t *testing.T) { storage.EXPECT().GetAllJobs(ctx, storageSession).Return(jobs, nil) storage.EXPECT().Abort(storageSession) - // startOnce and periodic start goroutines - - // its clock.Now().Add(-1), but we need real execution time for next mock setup steps - // it will be corrected after - earlyJob.ExecuteAt = clock.Now() // real execution time + // timers and tickers goroutines + lastJobExecutionTime := map[string]time.Time{ + earlyJob.Name: earlyJob.LastExecutedAt, + nowJob.Name: nowJob.LastExecutedAt, + freshJob.Name: freshJob.LastExecutedAt, + lateJob.Name: lateJob.LastExecutedAt, + } for i := 0; i < numberOfFullCycles; i++ { for _, originalJob := range jobs { currentJob := Job{ - Name: originalJob.Name, - ExecuteAt: originalJob.ExecuteAt.Add(originalJob.Period * time.Duration(i)), - Period: originalJob.Period, + Name: originalJob.Name, + LastExecutedAt: lastJobExecutionTime[originalJob.Name], + Period: originalJob.Period, + } + + executionTime := lastJobExecutionTime[originalJob.Name].Add(originalJob.Period) + if executionTime.Before(clock.Now()) { + executionTime = clock.Now() } + lastJobExecutionTime[originalJob.Name] = executionTime newJob := &Job{ - Name: originalJob.Name, - ExecuteAt: currentJob.ExecuteAt.Add(originalJob.Period), - Period: originalJob.Period, + Name: originalJob.Name, + LastExecutedAt: executionTime, + Period: originalJob.Period, } - if len(executionSequence) == 0 || executionSequence[len(executionSequence)-1].time != currentJob.ExecuteAt { + if len(executionSequence) == 0 || executionSequence[len(executionSequence)-1].time != newJob.LastExecutedAt { executionSequence = append(executionSequence, executionStep{ - time: currentJob.ExecuteAt, + time: newJob.LastExecutedAt, jobs: []Job{currentJob}, initialTimer: i == 0, }) @@ -116,9 +147,6 @@ func TestScheduler_Start(t *testing.T) { } } - // correct earlyJob.ExecuteAt - earlyJob.ExecuteAt = clock.Now().Add(-1) - // *** scheduler sch := New(zap.NewNop().Sugar(), storage, clock).(*scheduler) @@ -128,6 +156,9 @@ func TestScheduler_Start(t *testing.T) { sch.RegisterJobHandler(nowJob.Name, func() { nowJobExecuted <- nowJob.Name + " executed" }) + sch.RegisterJobHandler(freshJob.Name, func() { + freshJobExecuted <- freshJob.Name + " executed" + }) sch.RegisterJobHandler(lateJob.Name, func() { lateJobExecuted <- lateJob.Name + " executed" }) @@ -235,7 +266,7 @@ func TestScheduler_RegisterJobHandler(t *testing.T) { func TestScheduler_Schedule(t *testing.T) { type testCase struct { - storage func(context.Context, *gomock.Controller, clockwork.Clock, *testCase) Storage + storage func(context.Context, *gomock.Controller, *testCase) Storage existingJob *Job jobName string period time.Duration @@ -244,15 +275,14 @@ func TestScheduler_Schedule(t *testing.T) { tests := map[string]testCase{ "OK: New job": { - storage: func(ctx context.Context, ctrl *gomock.Controller, clock clockwork.Clock, tt *testCase) Storage { + storage: func(ctx context.Context, ctrl *gomock.Controller, tt *testCase) Storage { storage := NewMockStorage(ctrl) storageSession := &dummySession{} storage.EXPECT().NewSession(ctx).Return(storageSession, nil) storage.EXPECT().GetJobByName(ctx, storageSession, tt.jobName).Return(nil, ErrNotFound) storage.EXPECT().UpsertJob(ctx, storageSession, &Job{ - Name: tt.jobName, - ExecuteAt: clock.Now().Add(tt.period), - Period: tt.period, + Name: tt.jobName, + Period: tt.period, }).Return(nil) storage.EXPECT().Commit(storageSession).Return(nil) storage.EXPECT().Abort(storageSession) @@ -262,24 +292,24 @@ func TestScheduler_Schedule(t *testing.T) { period: 10 * time.Second, }, "OK: Existing job": { - storage: func(ctx context.Context, ctrl *gomock.Controller, _ clockwork.Clock, tt *testCase) Storage { + storage: func(ctx context.Context, ctrl *gomock.Controller, tt *testCase) Storage { storage := NewMockStorage(ctrl) storageSession := &dummySession{} storage.EXPECT().NewSession(ctx).Return(storageSession, nil) storage.EXPECT().GetJobByName(ctx, storageSession, tt.jobName).Return(tt.existingJob, nil) storage.EXPECT().UpsertJob(ctx, storageSession, &Job{ - Name: tt.jobName, - ExecuteAt: tt.existingJob.ExecuteAt, - Period: tt.period, + Name: tt.jobName, + LastExecutedAt: tt.existingJob.LastExecutedAt, + Period: tt.period, }).Return(nil) storage.EXPECT().Commit(storageSession).Return(nil) storage.EXPECT().Abort(storageSession) return storage }, existingJob: &Job{ - Name: "existing_job", - ExecuteAt: time.Now(), - Period: 10 * time.Second, + Name: "existing_job", + LastExecutedAt: time.Now(), + Period: 10 * time.Second, }, jobName: "existing_job", period: 15 * time.Second, @@ -293,7 +323,7 @@ func TestScheduler_Schedule(t *testing.T) { sch := New( zap.NewNop().Sugar(), - tt.storage(ctx, gomock.NewController(t), clock, &tt), + tt.storage(ctx, gomock.NewController(t), &tt), clock, ).(*scheduler) diff --git a/pkg/scheduler/storage/sqlite/jobs.go b/pkg/scheduler/storage/sqlite/jobs.go index cb625cda..39d5042c 100644 --- a/pkg/scheduler/storage/sqlite/jobs.go +++ b/pkg/scheduler/storage/sqlite/jobs.go @@ -16,9 +16,9 @@ const jobsTableName = "jobs" var _ scheduler.Storage = (*storage)(nil) type job struct { - Name string `db:"name"` - ExecuteAt int64 `db:"execute_at"` - Period int64 `db:"period"` + Name string `db:"name"` + LastExecutedAt int64 `db:"last_executed_at"` + Period int64 `db:"period"` } func (s *storage) GetJobByName(ctx context.Context, session scheduler.Session, jobName string) (*scheduler.Job, error) { @@ -103,15 +103,17 @@ func (s *storage) prepareJobsStmts(ctx context.Context) error { upsertJob, err := s.base.DB.PrepareNamedContext(ctx, fmt.Sprintf(` INSERT INTO %s ( name, - execute_at, + last_executed_at, period ) VALUES ( :name, - :execute_at, + :last_executed_at, :period ) ON CONFLICT(name) - DO UPDATE SET period = excluded.period + DO UPDATE SET + period = excluded.period, + last_executed_at = excluded.last_executed_at `, jobsTableName)) if err != nil { s.base.Logger.Error(err) @@ -133,16 +135,16 @@ func (s *storage) prepareJobsStmts(ctx context.Context) error { func modelFromJob(job *job) *scheduler.Job { return &scheduler.Job{ - Name: job.Name, - ExecuteAt: time.Unix(job.ExecuteAt, 0), - Period: time.Duration(job.Period) * time.Second, + Name: job.Name, + LastExecutedAt: time.Unix(job.LastExecutedAt, 0), + Period: time.Duration(job.Period) * time.Second, } } func jobFromModel(model *scheduler.Job) *job { return &job{ - Name: model.Name, - ExecuteAt: model.ExecuteAt.Unix(), - Period: int64(model.Period / time.Second), + Name: model.Name, + LastExecutedAt: model.LastExecutedAt.Unix(), + Period: int64(model.Period / time.Second), } }