Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MG-36 - Global ticker for scheduled rules #42

Merged
merged 16 commits into from
Jan 24, 2025

Conversation

nyagamunene
Copy link
Contributor

@nyagamunene nyagamunene commented Jan 17, 2025

What type of PR is this?

This is a feature because it adds the following functionality: it adds a startdatetime variable and adds a filter for fetching rules by schedule and a global ticker that will iterates over rules every minute and check if there are rules that fit criteria it triggers them.

What does this do?

  1. Adds Start date and time
  2. Adds Filter for fetching rules by schedule
  3. Adds Global ticker that will iterate over rules every minute and check if there are rules that fit criteria using 2) and trigger them if there are such rules

Which issue(s) does this PR fix/relate to?

Have you included tests for your changes?

Yes

Did you document any new/modified feature?

No

Notes

Signed-off-by: nyagamunene <[email protected]>
@nyagamunene nyagamunene self-assigned this Jan 17, 2025
Signed-off-by: nyagamunene <[email protected]>
Signed-off-by: nyagamunene <[email protected]>
Signed-off-by: nyagamunene <[email protected]>
Signed-off-by: nyagamunene <[email protected]>
@nyagamunene nyagamunene marked this pull request as ready for review January 20, 2025 20:52
Copy link
Contributor

@dborovcanin dborovcanin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide tests for this.

Signed-off-by: nyagamunene <[email protected]>
Signed-off-by: nyagamunene <[email protected]>
re/service.go Outdated
@@ -34,10 +43,100 @@ const (
Monthly
)

func (rt ReccuringType) String() string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move Schedule-related structs and methods to a new schedule.go file.

re/service.go Outdated
RecurringType ReccuringType `json:"recurring_type"`
RecurringPeriod uint `json:"recurring_period"` // 1 meaning every Recurring value, 2 meaning every other, and so on.
StartDateTime time.Time `json:"start_datetime"` // When the schedule becomes active
RecurringTime []time.Time `json:"recurring_time,omitempty"` // Specific times for the rule to run
Copy link
Contributor

@dborovcanin dborovcanin Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use only one time instead of slices here (to simplify scheduler).

re/service.go Outdated
StartDateTime time.Time `json:"start_datetime"` // When the schedule becomes active
RecurringTime []time.Time `json:"recurring_time,omitempty"` // Specific times for the rule to run
RecurringType ReccuringType `json:"recurring_type"` // None, Daily, Weekly, Monthly
RecurringPeriod uint `json:"recurring_period"` // 1 meaning every Recurring value, 2 meaning every other, and so on.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix the comment - 1 = every RecurringType, 2 = every other RecurringType...

re/service.go Outdated
RecurringPeriod uint `json:"recurring_period"` // 1 meaning every Recurring value, 2 meaning every other, and so on.
}

func (s Schedule) MarshalJSON() ([]byte, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the DB, we store it as TIMESTAMP (not TIMESTAMPZ) because we don't want to take DBMS server timezone into account; we'll rather always work with UTC internally. That being said, we need to take care all the inputs are taken with timezone and converted to UTC.

re/service.go Outdated
@@ -15,6 +17,13 @@ import (
lua "github.com/yuin/gopher-lua"
)

const (
timeFormat = "2006-01-02T15:04"
timeZone = 3 * time.Hour
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we use timezone at all?

re/service.go Outdated
}

type re struct {
idp supermq.IDProvider
repo Repository
pubSub messaging.PubSub
errors chan error
ticker *time.Ticker
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for a ticker here, it's enclosed in StartScheduler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for a ticker here, it's enclosed in StartScheduler.

This is needed here so that I can be able to mock the ticker

re/service.go Outdated
Comment on lines 386 to 391
func (re *re) newTicker(t time.Duration) *time.Ticker {
if re.ticker != nil {
return re.ticker
}
return time.NewTicker(t)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove. StartScheduler will be called only once anyway (no need to make it "singleton" in any way).

`

listRulesQuery = `
SELECT id, name, domain_id, input_channel, input_topic, logic_type, logic_value, output_channel,
output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status
output_topic, start_datetime, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll probably need to improve and make this query smarter (so we eliminate some rules already here), but we'll do it in the follow-up PRs.

Signed-off-by: nyagamunene <[email protected]>
Signed-off-by: nyagamunene <[email protected]>
rule, err := s.AddRule(ctx, session, req.Rule)
if err != nil {
fmt.Printf("error is %+v\n", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove.

re/schedule.go Outdated

type Schedule struct {
StartDateTime time.Time `json:"start_datetime"` // When the schedule becomes active
RecurringTime time.Time `json:"recurring_time"` // Specific time for the rule to run
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to Time.

re/schedule.go Outdated
type Schedule struct {
StartDateTime time.Time `json:"start_datetime"` // When the schedule becomes active
RecurringTime time.Time `json:"recurring_time"` // Specific time for the rule to run
RecurringType ReccuringType `json:"recurring_type"` // None, Daily, Weekly, Monthly
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to Recurring.

re/schedule.go Outdated
StartDateTime time.Time `json:"start_datetime"` // When the schedule becomes active
RecurringTime time.Time `json:"recurring_time"` // Specific time for the rule to run
RecurringType ReccuringType `json:"recurring_type"` // None, Daily, Weekly, Monthly
RecurringPeriod uint `json:"recurring_period"` // Controls how many intervals to skip between executions: 1 runs at every interval, 2 runs at every second interval, etc.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Controls how many intervals to skip between executions: 1 = every Recurring value, 2 = every other Recurring value...

re/service.go Outdated
func (re *re) shouldRunRule(rule Rule, startTime time.Time) bool {
now := time.Now().Add(timeZone).Truncate(time.Minute)
now := time.Now().Truncate(time.Minute)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can pass only Schedule func (re *re) shouldRun(s Schedule) bool, no need for the whole rule or start time here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has not been addressed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be the method attached to the Rule, so we have rule.ShouldRun(time.Time).

re/service.go Outdated
Comment on lines 284 to 289
if t.Year() == now.Year() &&
t.Month() == now.Month() &&
t.Day() == now.Day() &&
t.Hour() == now.Hour() &&
t.Minute() == now.Minute() {
return true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you simplify via:

if s.StartDateTime.Equal(now) {
    return true
}

Signed-off-by: nyagamunene <[email protected]>
Signed-off-by: nyagamunene <[email protected]>
re/service.go Outdated Show resolved Hide resolved
re/service.go Outdated
case Daily:
if s.RecurringPeriod > 0 {
daysSinceStart := startTime.Sub(s.StartDateTime).Hours() / 24
if int(daysSinceStart)%int(s.RecurringPeriod) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extract period := int(s.RecurringPeriod) before switch.

re/service.go Outdated
}
case Monthly:
if s.RecurringPeriod > 0 {
monthsSinceStart := (startTime.Year()-s.StartDateTime.Year())*12 +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we multiplying by 12?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are converting from years to months

re/service.go Outdated
switch s.Recurring {
case Daily:
if s.RecurringPeriod > 0 {
daysSinceStart := startTime.Sub(s.StartDateTime).Hours() / 24
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extract 24 and 27 * 7 to constants.

Signed-off-by: nyagamunene <[email protected]>
Signed-off-by: nyagamunene <[email protected]>
cmd/re/main.go Outdated
Comment on lines 186 to 187
err = svc.StartScheduler(ctx)
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return svc.StartScheduler(ctx)

re/service.go Outdated
func (re *re) shouldRunRule(rule Rule, startTime time.Time) bool {
now := time.Now().Add(timeZone).Truncate(time.Minute)
now := time.Now().Truncate(time.Minute)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be the method attached to the Rule, so we have rule.ShouldRun(time.Time).

Signed-off-by: nyagamunene <[email protected]>
re/service.go Outdated
}
}

func (r Rule) shouldRunRule(startTime time.Time) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to shouldRun.

Signed-off-by: nyagamunene <[email protected]>
@dborovcanin dborovcanin merged commit 1b6b7f5 into absmach:main Jan 24, 2025
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Create a global ticker for scheduled rules
2 participants