diff --git a/Makefile b/Makefile index 9a5c5b140..cdeab5bb5 100644 --- a/Makefile +++ b/Makefile @@ -66,7 +66,7 @@ define make_docker_dev -f docker/Dockerfile.dev ./build endef -ADDON_SERVICES = bootstrap journal provision certs timescale-reader timescale-writer postgres-reader postgres-writer +ADDON_SERVICES = bootstrap journal provision certs timescale-reader timescale-writer postgres-reader postgres-writer re EXTERNAL_SERVICES = vault prometheus diff --git a/cmd/re/main.go b/cmd/re/main.go index f2a687c93..c638498c7 100644 --- a/cmd/re/main.go +++ b/cmd/re/main.go @@ -181,6 +181,11 @@ func main() { go chc.CallHome(ctx) } + // Start scheduler + g.Go(func() error { + return svc.StartScheduler(ctx) + }) + // Start all servers g.Go(func() error { return httpSvc.Start() @@ -201,7 +206,7 @@ func newService(ctx context.Context, db *sqlx.DB, dbConfig pgclient.Config, auth idp := uuid.New() // csvc = authzmw.AuthorizationMiddleware(csvc, authz) - csvc := re.NewService(repo, idp, nil) + csvc := re.NewService(repo, idp, nil, re.NewTicker(time.Minute)) return csvc, nil } diff --git a/docker/.env b/docker/.env index 688dcb06d..9bc0f24f9 100644 --- a/docker/.env +++ b/docker/.env @@ -343,7 +343,7 @@ MG_RE_DB_HOST=re-db MG_RE_DB_PORT=5432 MG_RE_DB_USER=magistrala MG_RE_DB_PASS=magistrala -MG_RE_DB_NAME=rule_engine +MG_RE_DB_NAME=rules_engine MG_RE_DB_SSL_MODE=disable MG_RE_DB_SSL_CERT= MG_RE_DB_SSL_KEY= diff --git a/re/api/transport.go b/re/api/transport.go index a924e4bd3..17dcdf85c 100644 --- a/re/api/transport.go +++ b/re/api/transport.go @@ -101,7 +101,7 @@ func decodeAddRuleRequest(_ context.Context, r *http.Request) (interface{}, erro } var rule re.Rule if err := json.NewDecoder(r.Body).Decode(&rule); err != nil { - return nil, err + return nil, errors.Wrap(err, apiutil.ErrValidation) } return addRuleReq{Rule: rule}, nil } diff --git a/re/mocks/repo.go b/re/mocks/repo.go new file mode 100644 index 000000000..14f49542c --- /dev/null +++ b/re/mocks/repo.go @@ -0,0 +1,189 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +// Copyright (c) Abstract Machines + +package mocks + +import ( + context "context" + + re "github.com/absmach/magistrala/re" + mock "github.com/stretchr/testify/mock" +) + +// Repository is an autogenerated mock type for the Repository type +type Repository struct { + mock.Mock +} + +// AddRule provides a mock function with given fields: ctx, r +func (_m *Repository) AddRule(ctx context.Context, r re.Rule) (re.Rule, error) { + ret := _m.Called(ctx, r) + + if len(ret) == 0 { + panic("no return value specified for AddRule") + } + + var r0 re.Rule + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, re.Rule) (re.Rule, error)); ok { + return rf(ctx, r) + } + if rf, ok := ret.Get(0).(func(context.Context, re.Rule) re.Rule); ok { + r0 = rf(ctx, r) + } else { + r0 = ret.Get(0).(re.Rule) + } + + if rf, ok := ret.Get(1).(func(context.Context, re.Rule) error); ok { + r1 = rf(ctx, r) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ListRules provides a mock function with given fields: ctx, pm +func (_m *Repository) ListRules(ctx context.Context, pm re.PageMeta) (re.Page, error) { + ret := _m.Called(ctx, pm) + + if len(ret) == 0 { + panic("no return value specified for ListRules") + } + + var r0 re.Page + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, re.PageMeta) (re.Page, error)); ok { + return rf(ctx, pm) + } + if rf, ok := ret.Get(0).(func(context.Context, re.PageMeta) re.Page); ok { + r0 = rf(ctx, pm) + } else { + r0 = ret.Get(0).(re.Page) + } + + if rf, ok := ret.Get(1).(func(context.Context, re.PageMeta) error); ok { + r1 = rf(ctx, pm) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RemoveRule provides a mock function with given fields: ctx, id +func (_m *Repository) RemoveRule(ctx context.Context, id string) error { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for RemoveRule") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// UpdateRule provides a mock function with given fields: ctx, r +func (_m *Repository) UpdateRule(ctx context.Context, r re.Rule) (re.Rule, error) { + ret := _m.Called(ctx, r) + + if len(ret) == 0 { + panic("no return value specified for UpdateRule") + } + + var r0 re.Rule + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, re.Rule) (re.Rule, error)); ok { + return rf(ctx, r) + } + if rf, ok := ret.Get(0).(func(context.Context, re.Rule) re.Rule); ok { + r0 = rf(ctx, r) + } else { + r0 = ret.Get(0).(re.Rule) + } + + if rf, ok := ret.Get(1).(func(context.Context, re.Rule) error); ok { + r1 = rf(ctx, r) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// UpdateRuleStatus provides a mock function with given fields: ctx, id, status +func (_m *Repository) UpdateRuleStatus(ctx context.Context, id string, status re.Status) (re.Rule, error) { + ret := _m.Called(ctx, id, status) + + if len(ret) == 0 { + panic("no return value specified for UpdateRuleStatus") + } + + var r0 re.Rule + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, re.Status) (re.Rule, error)); ok { + return rf(ctx, id, status) + } + if rf, ok := ret.Get(0).(func(context.Context, string, re.Status) re.Rule); ok { + r0 = rf(ctx, id, status) + } else { + r0 = ret.Get(0).(re.Rule) + } + + if rf, ok := ret.Get(1).(func(context.Context, string, re.Status) error); ok { + r1 = rf(ctx, id, status) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ViewRule provides a mock function with given fields: ctx, id +func (_m *Repository) ViewRule(ctx context.Context, id string) (re.Rule, error) { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for ViewRule") + } + + var r0 re.Rule + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (re.Rule, error)); ok { + return rf(ctx, id) + } + if rf, ok := ret.Get(0).(func(context.Context, string) re.Rule); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Get(0).(re.Rule) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewRepository creates a new instance of Repository. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewRepository(t interface { + mock.TestingT + Cleanup(func()) +}) *Repository { + mock := &Repository{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/re/mocks/service.go b/re/mocks/service.go new file mode 100644 index 000000000..02f9a34c1 --- /dev/null +++ b/re/mocks/service.go @@ -0,0 +1,263 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +// Copyright (c) Abstract Machines + +package mocks + +import ( + context "context" + + authn "github.com/absmach/supermq/pkg/authn" + + mock "github.com/stretchr/testify/mock" + + re "github.com/absmach/magistrala/re" +) + +// Service is an autogenerated mock type for the Service type +type Service struct { + mock.Mock +} + +// AddRule provides a mock function with given fields: ctx, session, r +func (_m *Service) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) { + ret := _m.Called(ctx, session, r) + + if len(ret) == 0 { + panic("no return value specified for AddRule") + } + + var r0 re.Rule + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, authn.Session, re.Rule) (re.Rule, error)); ok { + return rf(ctx, session, r) + } + if rf, ok := ret.Get(0).(func(context.Context, authn.Session, re.Rule) re.Rule); ok { + r0 = rf(ctx, session, r) + } else { + r0 = ret.Get(0).(re.Rule) + } + + if rf, ok := ret.Get(1).(func(context.Context, authn.Session, re.Rule) error); ok { + r1 = rf(ctx, session, r) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ConsumeAsync provides a mock function with given fields: ctx, messages +func (_m *Service) ConsumeAsync(ctx context.Context, messages interface{}) { + _m.Called(ctx, messages) +} + +// DisableRule provides a mock function with given fields: ctx, session, id +func (_m *Service) DisableRule(ctx context.Context, session authn.Session, id string) (re.Rule, error) { + ret := _m.Called(ctx, session, id) + + if len(ret) == 0 { + panic("no return value specified for DisableRule") + } + + var r0 re.Rule + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, authn.Session, string) (re.Rule, error)); ok { + return rf(ctx, session, id) + } + if rf, ok := ret.Get(0).(func(context.Context, authn.Session, string) re.Rule); ok { + r0 = rf(ctx, session, id) + } else { + r0 = ret.Get(0).(re.Rule) + } + + if rf, ok := ret.Get(1).(func(context.Context, authn.Session, string) error); ok { + r1 = rf(ctx, session, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EnableRule provides a mock function with given fields: ctx, session, id +func (_m *Service) EnableRule(ctx context.Context, session authn.Session, id string) (re.Rule, error) { + ret := _m.Called(ctx, session, id) + + if len(ret) == 0 { + panic("no return value specified for EnableRule") + } + + var r0 re.Rule + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, authn.Session, string) (re.Rule, error)); ok { + return rf(ctx, session, id) + } + if rf, ok := ret.Get(0).(func(context.Context, authn.Session, string) re.Rule); ok { + r0 = rf(ctx, session, id) + } else { + r0 = ret.Get(0).(re.Rule) + } + + if rf, ok := ret.Get(1).(func(context.Context, authn.Session, string) error); ok { + r1 = rf(ctx, session, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Errors provides a mock function with given fields: +func (_m *Service) Errors() <-chan error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Errors") + } + + var r0 <-chan error + if rf, ok := ret.Get(0).(func() <-chan error); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan error) + } + } + + return r0 +} + +// ListRules provides a mock function with given fields: ctx, session, pm +func (_m *Service) ListRules(ctx context.Context, session authn.Session, pm re.PageMeta) (re.Page, error) { + ret := _m.Called(ctx, session, pm) + + if len(ret) == 0 { + panic("no return value specified for ListRules") + } + + var r0 re.Page + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, authn.Session, re.PageMeta) (re.Page, error)); ok { + return rf(ctx, session, pm) + } + if rf, ok := ret.Get(0).(func(context.Context, authn.Session, re.PageMeta) re.Page); ok { + r0 = rf(ctx, session, pm) + } else { + r0 = ret.Get(0).(re.Page) + } + + if rf, ok := ret.Get(1).(func(context.Context, authn.Session, re.PageMeta) error); ok { + r1 = rf(ctx, session, pm) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RemoveRule provides a mock function with given fields: ctx, session, id +func (_m *Service) RemoveRule(ctx context.Context, session authn.Session, id string) error { + ret := _m.Called(ctx, session, id) + + if len(ret) == 0 { + panic("no return value specified for RemoveRule") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, authn.Session, string) error); ok { + r0 = rf(ctx, session, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// StartScheduler provides a mock function with given fields: ctx +func (_m *Service) StartScheduler(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for StartScheduler") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// UpdateRule provides a mock function with given fields: ctx, session, r +func (_m *Service) UpdateRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) { + ret := _m.Called(ctx, session, r) + + if len(ret) == 0 { + panic("no return value specified for UpdateRule") + } + + var r0 re.Rule + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, authn.Session, re.Rule) (re.Rule, error)); ok { + return rf(ctx, session, r) + } + if rf, ok := ret.Get(0).(func(context.Context, authn.Session, re.Rule) re.Rule); ok { + r0 = rf(ctx, session, r) + } else { + r0 = ret.Get(0).(re.Rule) + } + + if rf, ok := ret.Get(1).(func(context.Context, authn.Session, re.Rule) error); ok { + r1 = rf(ctx, session, r) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ViewRule provides a mock function with given fields: ctx, session, id +func (_m *Service) ViewRule(ctx context.Context, session authn.Session, id string) (re.Rule, error) { + ret := _m.Called(ctx, session, id) + + if len(ret) == 0 { + panic("no return value specified for ViewRule") + } + + var r0 re.Rule + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, authn.Session, string) (re.Rule, error)); ok { + return rf(ctx, session, id) + } + if rf, ok := ret.Get(0).(func(context.Context, authn.Session, string) re.Rule); ok { + r0 = rf(ctx, session, id) + } else { + r0 = ret.Get(0).(re.Rule) + } + + if rf, ok := ret.Get(1).(func(context.Context, authn.Session, string) error); ok { + r1 = rf(ctx, session, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewService creates a new instance of Service. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewService(t interface { + mock.TestingT + Cleanup(func()) +}) *Service { + mock := &Service{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/re/mocks/ticker.go b/re/mocks/ticker.go new file mode 100644 index 000000000..1707c11c0 --- /dev/null +++ b/re/mocks/ticker.go @@ -0,0 +1,55 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +// Copyright (c) Abstract Machines + +package mocks + +import ( + mock "github.com/stretchr/testify/mock" + + time "time" +) + +// Ticker is an autogenerated mock type for the Ticker type +type Ticker struct { + mock.Mock +} + +// Stop provides a mock function with given fields: +func (_m *Ticker) Stop() { + _m.Called() +} + +// Tick provides a mock function with given fields: +func (_m *Ticker) Tick() <-chan time.Time { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Tick") + } + + var r0 <-chan time.Time + if rf, ok := ret.Get(0).(func() <-chan time.Time); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan time.Time) + } + } + + return r0 +} + +// NewTicker creates a new instance of Ticker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewTicker(t interface { + mock.TestingT + Cleanup(func()) +}) *Ticker { + mock := &Ticker{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/re/postgres/init.go b/re/postgres/init.go index 6bdf98d5a..902353188 100644 --- a/re/postgres/init.go +++ b/re/postgres/init.go @@ -32,9 +32,10 @@ func Migration() *migrate.MemoryMigrationSource { status SMALLINT NOT NULL DEFAULT 0 CHECK (status >= 0), logic_type SMALLINT NOT NULL DEFAULT 0 CHECK (status >= 0), logic_value BYTEA, - recurring_time TIMESTAMP[], - recurring_type SMALLINT, - recurring_period SMALLINT + time TIMESTAMP, + recurring SMALLINT, + recurring_period SMALLINT, + start_datetime TIMESTAMP )`, }, Down: []string{ diff --git a/re/postgres/repository.go b/re/postgres/repository.go index f3752a06a..761eb171b 100644 --- a/re/postgres/repository.go +++ b/re/postgres/repository.go @@ -18,16 +18,16 @@ import ( const ( addRuleQuery = ` INSERT INTO rules (id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value, - output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status) + output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status) VALUES (:id, :name, :domain_id, :metadata, :input_channel, :input_topic, :logic_type, :logic_value, - :output_channel, :output_topic, :recurring_time, :recurring_type, :recurring_period, :created_at, :created_by, :updated_at, :updated_by, :status) + :output_channel, :output_topic, :start_datetime, :time, :recurring, :recurring_period, :created_at, :created_by, :updated_at, :updated_by, :status) RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value, - output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status; + output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status; ` viewRuleQuery = ` SELECT id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value, output_channel, - output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status + output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status FROM rules WHERE id = $1; ` @@ -36,11 +36,11 @@ const ( UPDATE rules SET name = :name, metadata = :metadata, input_channel = :input_channel, input_topic = :input_topic, logic_type = :logic_type, logic_value = :logic_value, output_channel = :output_channel, output_topic = :output_topic, - recurring_time = :recurring_time, recurring_type = :recurring_type, + start_datetime = :start_datetime, time = :time, recurring = :recurring, recurring_period = :recurring_period, updated_at = :updated_at, updated_by = :updated_by, status = :status WHERE id = :id RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value, - output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status; + output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status; ` removeRuleQuery = ` @@ -53,12 +53,12 @@ const ( SET status = $2 WHERE id = $1 RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value, - output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status; + output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status; ` listRulesQuery = ` SELECT id, name, domain_id, input_channel, input_topic, logic_type, logic_value, output_channel, - output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status + output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status FROM rules r %s %s; ` diff --git a/re/postgres/rule.go b/re/postgres/rule.go index c25f47273..07404b353 100644 --- a/re/postgres/rule.go +++ b/re/postgres/rule.go @@ -10,29 +10,29 @@ import ( "github.com/absmach/magistrala/re" "github.com/absmach/supermq/pkg/errors" - "github.com/jackc/pgx/v5/pgtype" ) // dbRule represents the database structure for a Rule. type dbRule struct { - ID string `db:"id"` - Name string `db:"name"` - DomainID string `db:"domain_id"` - Metadata []byte `db:"metadata,omitempty"` - InputChannel string `db:"input_channel"` - InputTopic sql.NullString `db:"input_topic"` - LogicType re.ScriptType `db:"logic_type"` - LogicValue string `db:"logic_value"` - OutputChannel sql.NullString `db:"output_channel"` - OutputTopic sql.NullString `db:"output_topic"` - RecurringTime *pgtype.Array[string] `db:"recurring_time"` - RecurringType re.ReccuringType `db:"recurring_type"` - RecurringPeriod uint `db:"recurring_period"` - Status re.Status `db:"status"` - CreatedAt time.Time `db:"created_at"` - CreatedBy string `db:"created_by"` - UpdatedAt time.Time `db:"updated_at"` - UpdatedBy string `db:"updated_by"` + ID string `db:"id"` + Name string `db:"name"` + DomainID string `db:"domain_id"` + Metadata []byte `db:"metadata,omitempty"` + InputChannel string `db:"input_channel"` + InputTopic sql.NullString `db:"input_topic"` + LogicType re.ScriptType `db:"logic_type"` + LogicValue string `db:"logic_value"` + OutputChannel sql.NullString `db:"output_channel"` + OutputTopic sql.NullString `db:"output_topic"` + StartDateTime time.Time `db:"start_datetime"` + Time time.Time `db:"time"` + Recurring re.Recurring `db:"recurring"` + RecurringPeriod uint `db:"recurring_period"` + Status re.Status `db:"status"` + CreatedAt time.Time `db:"created_at"` + CreatedBy string `db:"created_by"` + UpdatedAt time.Time `db:"updated_at"` + UpdatedBy string `db:"updated_by"` } func ruleToDb(r re.Rule) (dbRule, error) { @@ -55,8 +55,9 @@ func ruleToDb(r re.Rule) (dbRule, error) { LogicValue: r.Logic.Value, OutputChannel: toNullString(r.OutputChannel), OutputTopic: toNullString(r.OutputTopic), - RecurringTime: toStringArray(r.Schedule.Time), - RecurringType: r.Schedule.RecurringType, + StartDateTime: r.Schedule.StartDateTime, + Time: r.Schedule.Time, + Recurring: r.Schedule.Recurring, RecurringPeriod: r.Schedule.RecurringPeriod, Status: r.Status, CreatedAt: r.CreatedAt, @@ -87,8 +88,9 @@ func dbToRule(dto dbRule) (re.Rule, error) { OutputChannel: fromNullString(dto.OutputChannel), OutputTopic: fromNullString(dto.OutputTopic), Schedule: re.Schedule{ - Time: toTimeSlice(dto.RecurringTime), - RecurringType: dto.RecurringType, + StartDateTime: dto.StartDateTime, + Time: dto.Time, + Recurring: dto.Recurring, RecurringPeriod: dto.RecurringPeriod, }, Status: re.Status(dto.Status), @@ -112,29 +114,3 @@ func fromNullString(nullString sql.NullString) string { } return nullString.String } - -func toStringArray(times []time.Time) *pgtype.Array[string] { - var strArray []string - for _, t := range times { - strArray = append(strArray, t.Format(time.RFC3339)) - } - ret := pgtype.Array[string]{ - Elements: strArray, - Valid: true, - } - return &ret -} - -func toTimeSlice(strArray *pgtype.Array[string]) []time.Time { - if strArray == nil || !strArray.Valid { - return []time.Time{} - } - var times []time.Time - for _, s := range strArray.Elements { - t, err := time.Parse(time.RFC3339, s) - if err == nil { - times = append(times, t) - } - } - return times -} diff --git a/re/schedule.go b/re/schedule.go new file mode 100644 index 000000000..b1cd1246f --- /dev/null +++ b/re/schedule.go @@ -0,0 +1,109 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package re + +import ( + "encoding/json" + "time" +) + +type Schedule struct { + StartDateTime time.Time `json:"start_datetime"` // When the schedule becomes active + Time time.Time `json:"time"` // Specific time for the rule to run + Recurring Recurring `json:"recurring"` // None, Daily, Weekly, Monthly + RecurringPeriod uint `json:"recurring_period"` // Controls how many intervals to skip between executions: 1 = every interval, 2 = every second interval, etc. +} + +func (s Schedule) MarshalJSON() ([]byte, error) { + type Alias Schedule + jTimes := struct { + StartDateTime string `json:"start_datetime"` + Time string `json:"time"` + *Alias + }{ + StartDateTime: s.StartDateTime.Format(timeFormat), + Time: s.Time.Format(timeFormat), + Alias: (*Alias)(&s), + } + return json.Marshal(jTimes) +} + +func (s *Schedule) UnmarshalJSON(data []byte) error { + type Alias Schedule + aux := struct { + StartDateTime string `json:"start_datetime"` + Time string `json:"time"` + *Alias + }{ + Alias: (*Alias)(s), + } + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + + if aux.StartDateTime != "" { + startDateTime, err := time.Parse(timeFormat, aux.StartDateTime) + if err != nil { + return err + } + s.StartDateTime = startDateTime + } + + if aux.Time != "" { + time, err := time.Parse(timeFormat, aux.Time) + if err != nil { + return err + } + s.Time = time + } + return nil +} + +// Type can be daily, weekly or monthly. +type Recurring uint + +const ( + None Recurring = iota + Daily + Weekly + Monthly +) + +func (rt Recurring) String() string { + switch rt { + case Daily: + return "daily" + case Weekly: + return "weekly" + case Monthly: + return "monthly" + default: + return "none" + } +} + +func (rt Recurring) MarshalJSON() ([]byte, error) { + return json.Marshal(rt.String()) +} + +func (rt *Recurring) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return err + } + + switch s { + case "daily": + *rt = Daily + case "weekly": + *rt = Weekly + case "monthly": + *rt = Monthly + case "none": + *rt = None + default: + return ErrInvalidRecurringType + } + return nil +} diff --git a/re/service.go b/re/service.go index b53ccbd78..5c29900ab 100644 --- a/re/service.go +++ b/re/service.go @@ -5,6 +5,7 @@ package re import ( "context" + "errors" "time" "github.com/absmach/supermq" @@ -15,6 +16,15 @@ import ( lua "github.com/yuin/gopher-lua" ) +const ( + timeFormat = "2006-01-02T15:04" + hoursInDay = 24 + daysInWeek = 7 + monthsInYear = 12 +) + +var ErrInvalidRecurringType = errors.New("invalid recurring type") + type ( ScriptType uint Metadata map[string]interface{} @@ -24,22 +34,6 @@ type ( } ) -// Type can be daily, weekly or monthly. -type ReccuringType uint - -const ( - None ReccuringType = iota - Daily - Weekly - Monthly -) - -type Schedule struct { - Time []time.Time `json:"date,omitempty"` - RecurringType ReccuringType `json:"recurring_type"` - RecurringPeriod uint `json:"recurring_period"` // 1 meaning every Recurring value, 2 meaning every other, and so on. -} - type Rule struct { ID string `json:"id"` Name string `json:"name"` @@ -58,6 +52,7 @@ type Rule struct { UpdatedBy string `json:"updated_by,omitempty"` } +//go:generate mockery --name Repository --output=./mocks --filename repo.go --quiet --note "Copyright (c) Abstract Machines" type Repository interface { AddRule(ctx context.Context, r Rule) (Rule, error) ViewRule(ctx context.Context, id string) (Rule, error) @@ -69,15 +64,18 @@ type Repository interface { // PageMeta contains page metadata that helps navigation. type PageMeta struct { - Total uint64 `json:"total" db:"total"` - Offset uint64 `json:"offset" db:"offset"` - Limit uint64 `json:"limit" db:"limit"` - Dir string `json:"dir" db:"dir"` - Name string `json:"name" db:"name"` - InputChannel string `json:"input_channel,omitempty" db:"input_channel"` - OutputChannel string `json:"output_channel,omitempty" db:"output_channel"` - Status Status `json:"status,omitempty" db:"status"` - Domain string `json:"domain_id,omitempty" db:"domain_id"` + Total uint64 `json:"total" db:"total"` + Offset uint64 `json:"offset" db:"offset"` + Limit uint64 `json:"limit" db:"limit"` + Dir string `json:"dir" db:"dir"` + Name string `json:"name" db:"name"` + InputChannel string `json:"input_channel,omitempty" db:"input_channel"` + OutputChannel string `json:"output_channel,omitempty" db:"output_channel"` + Status Status `json:"status,omitempty" db:"status"` + Domain string `json:"domain_id,omitempty" db:"domain_id"` + ScheduledBefore *time.Time `json:"scheduled_before,omitempty" db:"scheduled_before"` // Filter rules scheduled before this time + ScheduledAfter *time.Time `json:"scheduled_after,omitempty" db:"scheduled_after"` // Filter rules scheduled after this time + Recurring *Recurring `json:"recurring,omitempty" db:"recurring"` // Filter by recurring type } type Page struct { @@ -85,6 +83,7 @@ type Page struct { Rules []Rule `json:"rules"` } +//go:generate mockery --name Service --output=./mocks --filename service.go --quiet --note "Copyright (c) Abstract Machines" type Service interface { consumers.AsyncConsumer AddRule(ctx context.Context, session authn.Session, r Rule) (Rule, error) @@ -94,6 +93,7 @@ type Service interface { RemoveRule(ctx context.Context, session authn.Session, id string) error EnableRule(ctx context.Context, session authn.Session, id string) (Rule, error) DisableRule(ctx context.Context, session authn.Session, id string) (Rule, error) + StartScheduler(ctx context.Context) error } type re struct { @@ -101,14 +101,16 @@ type re struct { repo Repository pubSub messaging.PubSub errors chan error + ticker Ticker } -func NewService(repo Repository, idp supermq.IDProvider, pubSub messaging.PubSub) Service { +func NewService(repo Repository, idp supermq.IDProvider, pubSub messaging.PubSub, tck Ticker) Service { return &re{ repo: repo, idp: idp, pubSub: pubSub, errors: make(chan error), + ticker: tck, } } @@ -117,12 +119,23 @@ func (re *re) AddRule(ctx context.Context, session authn.Session, r Rule) (Rule, if err != nil { return Rule{}, err } - r.CreatedAt = time.Now() + now := time.Now() + r.CreatedAt = now r.ID = id r.CreatedBy = session.UserID r.DomainID = session.DomainID r.Status = EnabledStatus - return re.repo.AddRule(ctx, r) + + if r.Schedule.StartDateTime.IsZero() { + r.Schedule.StartDateTime = now + } + + rule, err := re.repo.AddRule(ctx, r) + if err != nil { + return Rule{}, err + } + + return rule, nil } func (re *re) ViewRule(ctx context.Context, session authn.Session, id string) (Rule, error) { @@ -227,3 +240,84 @@ func (re *re) process(ctx context.Context, r Rule, msg *messaging.Message) error return re.pubSub.Publish(ctx, m.Channel, m) } } + +func (re *re) StartScheduler(ctx context.Context) error { + defer re.ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-re.ticker.Tick(): + startTime := time.Now() + + pm := PageMeta{ + Status: EnabledStatus, + ScheduledBefore: &startTime, + } + + page, err := re.repo.ListRules(ctx, pm) + if err != nil { + return err + } + + for _, rule := range page.Rules { + if rule.shouldRun(startTime) { + go func(r Rule) { + msg := &messaging.Message{ + Channel: r.InputChannel, + Created: startTime.Unix(), + } + re.errors <- re.process(ctx, r, msg) + }(rule) + } + } + } + } +} + +func (r Rule) shouldRun(startTime time.Time) bool { + // Don't run if the rule's start time is in the future + // This allows scheduling rules to start at a specific future time + if r.Schedule.StartDateTime.After(startTime) { + return false + } + + t := r.Schedule.Time.Truncate(time.Minute) + if t.Equal(startTime) { + return true + } + + if r.Schedule.RecurringPeriod == 0 { + return false + } + + period := int(r.Schedule.RecurringPeriod) + + switch r.Schedule.Recurring { + case Daily: + if r.Schedule.RecurringPeriod > 0 { + daysSinceStart := startTime.Sub(r.Schedule.StartDateTime).Hours() / hoursInDay + if int(daysSinceStart)%period == 0 { + return true + } + } + case Weekly: + if r.Schedule.RecurringPeriod > 0 { + weeksSinceStart := startTime.Sub(r.Schedule.StartDateTime).Hours() / (hoursInDay * daysInWeek) + if int(weeksSinceStart)%period == 0 { + return true + } + } + case Monthly: + if r.Schedule.RecurringPeriod > 0 { + monthsSinceStart := (startTime.Year()-r.Schedule.StartDateTime.Year())*monthsInYear + + int(startTime.Month()-r.Schedule.StartDateTime.Month()) + if monthsSinceStart%period == 0 { + return true + } + } + } + + return false +} diff --git a/re/service_test.go b/re/service_test.go new file mode 100644 index 000000000..97756b21b --- /dev/null +++ b/re/service_test.go @@ -0,0 +1,187 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package re_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/0x6flab/namegenerator" + "github.com/absmach/magistrala/internal/testsutil" + "github.com/absmach/magistrala/pkg/errors" + "github.com/absmach/magistrala/re" + "github.com/absmach/magistrala/re/mocks" + repoerr "github.com/absmach/supermq/pkg/errors/repository" + pubsubmocks "github.com/absmach/supermq/pkg/messaging/mocks" + "github.com/absmach/supermq/pkg/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +var ( + namegen = namegenerator.NewGenerator() + rule = re.Rule{ + ID: testsutil.GenerateUUID(&testing.T{}), + Name: namegen.Generate(), + InputChannel: "test.channel", + Status: re.EnabledStatus, + Schedule: re.Schedule{ + StartDateTime: time.Now().Add(-time.Hour), // Started an hour ago + Recurring: re.Daily, + RecurringPeriod: 1, + Time: time.Now().Add(-time.Hour), + }, + } + futureRule = re.Rule{ + ID: testsutil.GenerateUUID(&testing.T{}), + Name: namegen.Generate(), + InputChannel: "test.channel", + Status: re.EnabledStatus, + Schedule: re.Schedule{ + StartDateTime: time.Now().Add(24 * time.Hour), + Recurring: re.None, + }, + } +) + +func newService(t *testing.T) (re.Service, *mocks.Repository, *mocks.Ticker) { + repo := new(mocks.Repository) + mockTicker := new(mocks.Ticker) + idProvider := uuid.NewMock() + pubsub := pubsubmocks.NewPubSub(t) + return re.NewService(repo, idProvider, pubsub, mockTicker), repo, mockTicker +} + +func TestStartScheduler(t *testing.T) { + now := time.Now().Truncate(time.Minute) + svc, repo, ticker := newService(t) + + cases := []struct { + desc string + err error + pageMeta re.PageMeta + page re.Page + listErr error + setupCtx func() (context.Context, context.CancelFunc) + }{ + { + desc: "start scheduler with canceled context", + err: context.Canceled, + pageMeta: re.PageMeta{ + Status: re.EnabledStatus, + ScheduledBefore: &now, + }, + setupCtx: func() (context.Context, context.CancelFunc) { + return context.WithCancel(context.Background()) + }, + }, + { + desc: "start scheduler with timeout", + err: context.DeadlineExceeded, + pageMeta: re.PageMeta{ + Status: re.EnabledStatus, + ScheduledBefore: &now, + }, + setupCtx: func() (context.Context, context.CancelFunc) { + return context.WithTimeout(context.Background(), time.Millisecond) + }, + }, + { + desc: "start scheduler with deadline exceeded", + err: context.DeadlineExceeded, + pageMeta: re.PageMeta{ + Status: re.EnabledStatus, + ScheduledBefore: &now, + }, + page: re.Page{}, + setupCtx: func() (context.Context, context.CancelFunc) { + return context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond)) + }, + }, + { + desc: "start scheduler successfully processes rules", + err: context.Canceled, + pageMeta: re.PageMeta{ + Status: re.EnabledStatus, + ScheduledBefore: &now, + }, + page: re.Page{ + Rules: []re.Rule{rule}, + }, + setupCtx: func() (context.Context, context.CancelFunc) { + return context.WithCancel(context.Background()) + }, + }, + { + desc: "start scheduler with list error", + err: repoerr.ErrViewEntity, + pageMeta: re.PageMeta{ + Status: re.EnabledStatus, + ScheduledBefore: &now, + }, + page: re.Page{}, + listErr: repoerr.ErrViewEntity, + setupCtx: func() (context.Context, context.CancelFunc) { + return context.WithCancel(context.Background()) + }, + }, + { + desc: "start scheduler with rule to be run in the future", + err: context.Canceled, + pageMeta: re.PageMeta{ + Status: re.EnabledStatus, + ScheduledBefore: &now, + }, + page: re.Page{ + Rules: []re.Rule{futureRule}, + }, + setupCtx: func() (context.Context, context.CancelFunc) { + return context.WithCancel(context.Background()) + }, + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + repoCall := repo.On("ListRules", mock.Anything, mock.Anything).Return(tc.page, tc.listErr) + tickChan := make(chan time.Time) + tickCall := ticker.On("Tick").Return((<-chan time.Time)(tickChan)) + tickCall1 := ticker.On("Stop").Return() + ctx, cancel := tc.setupCtx() + defer cancel() + errc := make(chan error) + + go func() { + errc <- svc.StartScheduler(ctx) + }() + + switch tc.desc { + case "start scheduler with canceled context": + cancel() + case "start scheduler successfully processes rules": + tickChan <- time.Now() + time.Sleep(100 * time.Millisecond) + cancel() + case "start scheduler with rule to be run in the future": + tickChan <- time.Now() + time.Sleep(100 * time.Millisecond) + cancel() + case "start scheduler with list error": + tickChan <- time.Now() + time.Sleep(100 * time.Millisecond) + if err := svc.Errors(); err != nil { + cancel() + } + } + + err := <-errc + assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("expected error %v but got %v", tc.err, err)) + repoCall.Unset() + tickCall.Unset() + tickCall1.Unset() + }) + } +} diff --git a/re/ticker.go b/re/ticker.go new file mode 100644 index 000000000..e75d1c97a --- /dev/null +++ b/re/ticker.go @@ -0,0 +1,24 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package re + +import "time" + +//go:generate mockery --name Ticker --output=./mocks --filename ticker.go --quiet --note "Copyright (c) Abstract Machines" +type Ticker interface { + Tick() <-chan time.Time + Stop() +} + +type timeTicker struct { + *time.Ticker +} + +func NewTicker(d time.Duration) Ticker { + return &timeTicker{time.NewTicker(d)} +} + +func (t *timeTicker) Tick() <-chan time.Time { + return t.C +}