Skip to content

Commit

Permalink
Make scheduler use lastExecutedAt instead of next planned executeAt (#73
Browse files Browse the repository at this point in the history
)
  • Loading branch information
evlekht authored Dec 3, 2024
1 parent befeaeb commit 73fcb4c
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 89 deletions.
5 changes: 5 additions & 0 deletions migrations/scheduler/2_last_executed_at.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE jobs
DROP last_executed_at;

ALTER TABLE jobs
ADD execute_at BIGINT NOT NULL;
5 changes: 5 additions & 0 deletions migrations/scheduler/2_last_executed_at.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE jobs
DROP execute_at;

ALTER TABLE jobs
ADD last_executed_at BIGINT NOT NULL DEFAULT 0;
22 changes: 20 additions & 2 deletions pkg/database/sqlite/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func New(logger *zap.SugaredLogger, cfg DBConfig, dbName string) (*DB, error) {
DB: db,
}

if err := s.migrate(dbName, cfg.MigrationsPath); err != nil {
if err := s.migrate(dbName, cfg.MigrationsPath, false); err != nil {
return nil, err
}

Expand All @@ -55,7 +55,21 @@ func (s *DB) Close() error {
return nil
}

func (s *DB) migrate(dbName, migrationsPath string) error {
var _ migrate.Logger = (*migrationLogger)(nil)

type migrationLogger struct {
*zap.SugaredLogger
}

func (l *migrationLogger) Printf(format string, v ...interface{}) {
l.Infof(format, v...)
}

func (l *migrationLogger) Verbose() bool {
return false
}

func (s *DB) migrate(dbName, migrationsPath string, logMigrations bool) error {
s.Logger.Infof("Performing db migrations...")

driver, err := sqlite3.WithInstance(s.DB.DB, &sqlite3.Config{})
Expand All @@ -70,6 +84,10 @@ func (s *DB) migrate(dbName, migrationsPath string) error {
return err
}

if logMigrations {
migration.Log = &migrationLogger{s.Logger}
}

version, dirty, err := migration.Version()
if err != nil && !errors.Is(err, migrate.ErrNilVersion) {
s.Logger.Error(err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package scheduler
import "time"

type Job struct {
Name string
ExecuteAt time.Time
Period time.Duration
Name string
LastExecutedAt time.Time
Period time.Duration
}
57 changes: 29 additions & 28 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,37 +96,44 @@ func (s *scheduler) Start(ctx context.Context) error {
period := job.Period

now := s.clock.Now()
durationUntilFirstExecution := time.Duration(0)
if job.ExecuteAt.After(now) {
durationUntilFirstExecution = job.ExecuteAt.Sub(now)
durationUntilFirstExecution := job.LastExecutedAt.Add(job.Period).Sub(now)
if durationUntilFirstExecution < 0 {
durationUntilFirstExecution = 0
}

onceDone := make(chan struct{})

handler := func() {
handler := func(tickTime time.Time) {
// TODO @evlekht panic handling?
if err := s.updateJobExecutionTime(ctx, jobName); err != nil {
if err := s.updateJobExecutionTime(ctx, jobName, tickTime); err != nil {
s.logger.Errorf("failed to update job execution time: %v", err)
return // TODO @evlekht handle error, maybe retry ?
}
jobHandler()
}

timer := s.clock.AfterFunc(durationUntilFirstExecution, func() {
handler()
close(onceDone)
})
// first execution
timer := s.clock.NewTimer(durationUntilFirstExecution)
s.setJobTimer(job.Name, &timerStopper{timer})
go func() {
select {
case tickTime := <-timer.Chan():
handler(tickTime)
case <-timersCtx.Done():
}
close(onceDone)
}()

// periodic execution
go func() {
<-onceDone
ticker := s.clock.NewTicker(period)
defer ticker.Stop()
s.setJobTimer(job.Name, ticker)
for {
select {
case <-ticker.Chan():
handler()
case tickTime := <-ticker.Chan():
handler(tickTime)
case <-timersCtx.Done():
return
}
Expand All @@ -148,8 +155,8 @@ func (s *scheduler) Stop() error {
return nil
}

// Schedule schedules a job to be executed every period. If jobID is empty, a new job is created.
// Otherwise, the existing job period is updated and expiration time is set to min(current expiration time, now + period).
// Schedules a job to be executed every period.
// If there is already scheduled job with the same name, then its period is updated.
func (s *scheduler) Schedule(ctx context.Context, period time.Duration, jobName string) error {
session, err := s.storage.NewSession(ctx)
if err != nil {
Expand All @@ -164,22 +171,16 @@ func (s *scheduler) Schedule(ctx context.Context, period time.Duration, jobName
return err
}

executeAt := s.clock.Now().Add(period)

lastExecutedAt := time.Time{}
if job != nil {
job.Period = period
if executeAt.Before(job.ExecuteAt) {
job.ExecuteAt = executeAt
}
} else {
job = &Job{
Name: jobName,
ExecuteAt: executeAt,
Period: period,
}
lastExecutedAt = job.LastExecutedAt
}

if err := s.storage.UpsertJob(ctx, session, job); err != nil {
if err := s.storage.UpsertJob(ctx, session, &Job{
Name: jobName,
LastExecutedAt: lastExecutedAt,
Period: period,
}); err != nil {
s.logger.Errorf("failed to store scheduled job: %v", err)
return err
}
Expand All @@ -193,7 +194,7 @@ func (s *scheduler) RegisterJobHandler(jobName string, jobHandler func()) {
s.registryLock.Unlock()
}

func (s *scheduler) updateJobExecutionTime(ctx context.Context, jobName string) error {
func (s *scheduler) updateJobExecutionTime(ctx context.Context, jobName string, executionTime time.Time) error {
session, err := s.storage.NewSession(ctx)
if err != nil {
s.logger.Errorf("failed to create storage session: %v", err)
Expand All @@ -207,7 +208,7 @@ func (s *scheduler) updateJobExecutionTime(ctx context.Context, jobName string)
return err
}

job.ExecuteAt = s.clock.Now().Add(job.Period)
job.LastExecutedAt = executionTime

if err := s.storage.UpsertJob(ctx, session, job); err != nil {
s.logger.Errorf("failed to store scheduled job: %v", err)
Expand Down
Loading

0 comments on commit 73fcb4c

Please sign in to comment.