diff --git a/re/postgres/init.go b/re/postgres/init.go index 5fb5745a9..6bdf98d5a 100644 --- a/re/postgres/init.go +++ b/re/postgres/init.go @@ -17,23 +17,24 @@ func Migration() *migrate.MemoryMigrationSource { // STATUS 0 to imply enabled and 1 to imply disabled Up: []string{ `CREATE TABLE IF NOT EXISTS rules ( - id VARCHAR(36) PRIMARY KEY, - name VARCHAR(1024), - domain_id VARCHAR(36) NOT NULL, - metadata JSONB, - created_at TIMESTAMP, - updated_at TIMESTAMP, - updated_by VARCHAR(254), - input_channel VARCHAR(36), - input_topic TEXT, - output_channel VARCHAR(36), - output_topic TEXT, - status SMALLINT NOT NULL DEFAULT 0 CHECK (status >= 0), - logic_type SMALLINT NOT NULL DEFAULT 0 CHECK (status >= 0), - logic_value BYTEA, - recurring_time TIMESTAMP[], - recurring_type SMALLINT, - recurring_period SMALLINT + id VARCHAR(36) PRIMARY KEY, + name VARCHAR(1024), + domain_id VARCHAR(36) NOT NULL, + metadata JSONB, + created_by VARCHAR(254), + created_at TIMESTAMP, + updated_at TIMESTAMP, + updated_by VARCHAR(254), + input_channel VARCHAR(36), + input_topic TEXT, + output_channel VARCHAR(36), + output_topic TEXT, + status SMALLINT NOT NULL DEFAULT 0 CHECK (status >= 0), + logic_type SMALLINT NOT NULL DEFAULT 0 CHECK (status >= 0), + logic_value BYTEA, + recurring_time TIMESTAMP[], + recurring_type SMALLINT, + recurring_period SMALLINT )`, }, Down: []string{ diff --git a/re/postgres/repository.go b/re/postgres/repository.go index bb105b7ff..706fddbf9 100644 --- a/re/postgres/repository.go +++ b/re/postgres/repository.go @@ -17,30 +17,30 @@ import ( // SQL Queries as Strings. const ( addRuleQuery = ` - INSERT INTO rules (id, name, domain_id, input_channel, input_topic, logic_type, logic_value, - output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, updated_at, updated_by, status) - VALUES (:id, :name, :domain_id, :input_channel, :input_topic, :logic_type, :logic_value, + INSERT INTO rules (id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value, + output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status) + VALUES (:id, :name, :domain_id, :metadata, :input_channel, :input_topic, :logic_type, :logic_value, :output_channel, :output_topic, :recurring_time, :recurring_type, :recurring_period, :created_at, :updated_at, :updated_by, :status) - RETURNING id, name, domain_id, input_channel, input_topic, logic_type, logic_value, - output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, updated_at, updated_by, status; + RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value, + output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status; ` viewRuleQuery = ` - SELECT id, name, domain_id, input_channel, input_topic, logic_type, logic_value, output_channel, - output_topic, recurring_time, recurring_type, recurring_period, created_at, updated_at, updated_by, status + SELECT id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value, output_channel, + output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status FROM rules WHERE id = $1; ` updateRuleQuery = ` UPDATE rules - SET name = :name, input_channel = :input_channel, input_topic = :input_topic, logic_type = :logic_type, + SET name = :name, metadata = :metadata, input_channel = :input_channel, input_topic = :input_topic, logic_type = :logic_type, logic_value = :logic_value, output_channel = :output_channel, output_topic = :output_topic, recurring_time = :recurring_time, recurring_type = :recurring_type, recurring_period = :recurring_period, updated_at = :updated_at, updated_by = :updated_by, status = :status WHERE id = :id - RETURNING id, name, domain_id, input_channel, input_topic, logic_type, logic_value, - output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, updated_at, updated_by, status; + RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value, + output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status; ` removeRuleQuery = ` @@ -52,13 +52,13 @@ const ( UPDATE rules SET status = $2 WHERE id = $1 - RETURNING id, name, domain_id, input_channel, input_topic, logic_type, logic_value, - output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, updated_at, updated_by, status; + RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value, + output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status; ` listRulesQuery = ` SELECT id, name, domain_id, input_channel, input_topic, logic_type, logic_value, output_channel, - output_topic, recurring_time, recurring_type, recurring_period, created_at, updated_at, updated_by, status + output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status FROM rules r %s %s; ` @@ -74,7 +74,10 @@ func NewRepository(db postgres.Database) re.Repository { } func (repo *PostgresRepository) AddRule(ctx context.Context, r re.Rule) (re.Rule, error) { - dbr := ruleToDb(r) + dbr, err := ruleToDb(r) + if err != nil { + return re.Rule{}, err + } row, err := repo.DB.NamedQueryContext(ctx, addRuleQuery, dbr) if err != nil { return re.Rule{}, err @@ -88,7 +91,10 @@ func (repo *PostgresRepository) AddRule(ctx context.Context, r re.Rule) (re.Rule } } - rule := dbToRule(dbRule) + rule, err := dbToRule(dbRule) + if err != nil { + return re.Rule{}, err + } return rule, nil } @@ -102,7 +108,10 @@ func (repo *PostgresRepository) ViewRule(ctx context.Context, id string) (re.Rul if err := row.StructScan(&dbr); err != nil { return re.Rule{}, err } - ret := dbToRule(dbr) + ret, err := dbToRule(dbr) + if err != nil { + return re.Rule{}, err + } return ret, nil } @@ -118,13 +127,19 @@ func (repo *PostgresRepository) UpdateRuleStatus(ctx context.Context, id string, return re.Rule{}, err } - rule := dbToRule(dbr) + rule, err := dbToRule(dbr) + if err != nil { + return re.Rule{}, err + } return rule, nil } func (repo *PostgresRepository) UpdateRule(ctx context.Context, r re.Rule) (re.Rule, error) { - dbr := ruleToDb(r) + dbr, err := ruleToDb(r) + if err != nil { + return re.Rule{}, err + } row, err := repo.DB.NamedQueryContext(ctx, updateRuleQuery, dbr) if err != nil { return re.Rule{}, err @@ -137,7 +152,10 @@ func (repo *PostgresRepository) UpdateRule(ctx context.Context, r re.Rule) (re.R return re.Rule{}, err } } - rule := dbToRule(dbRule) + rule, err := dbToRule(dbRule) + if err != nil { + return re.Rule{}, err + } return rule, nil } @@ -177,7 +195,11 @@ func (repo *PostgresRepository) ListRules(ctx context.Context, pm re.PageMeta) ( if err := rows.StructScan(&r); err != nil { return re.Page{}, errors.Wrap(repoerr.ErrViewEntity, err) } - rules = append(rules, dbToRule(r)) + ret, err := dbToRule(r) + if err != nil { + return re.Page{}, err + } + rules = append(rules, ret) } cq := fmt.Sprintf(totalQuery, pq) diff --git a/re/postgres/rule.go b/re/postgres/rule.go index 1dc2e6165..c25f47273 100644 --- a/re/postgres/rule.go +++ b/re/postgres/rule.go @@ -5,9 +5,11 @@ package postgres import ( "database/sql" + "encoding/json" "time" "github.com/absmach/magistrala/re" + "github.com/absmach/supermq/pkg/errors" "github.com/jackc/pgx/v5/pgtype" ) @@ -16,6 +18,7 @@ type dbRule struct { ID string `db:"id"` Name string `db:"name"` DomainID string `db:"domain_id"` + Metadata []byte `db:"metadata,omitempty"` InputChannel string `db:"input_channel"` InputTopic sql.NullString `db:"input_topic"` LogicType re.ScriptType `db:"logic_type"` @@ -32,11 +35,20 @@ type dbRule struct { UpdatedBy string `db:"updated_by"` } -func ruleToDb(r re.Rule) dbRule { +func ruleToDb(r re.Rule) (dbRule, error) { + metadata := []byte("{}") + if len(r.Metadata) > 0 { + b, err := json.Marshal(r.Metadata) + if err != nil { + return dbRule{}, errors.Wrap(errors.ErrMalformedEntity, err) + } + metadata = b + } return dbRule{ ID: r.ID, Name: r.Name, DomainID: r.DomainID, + Metadata: metadata, InputChannel: r.InputChannel, InputTopic: toNullString(r.InputTopic), LogicType: r.Logic.Type, @@ -51,14 +63,21 @@ func ruleToDb(r re.Rule) dbRule { CreatedBy: r.CreatedBy, UpdatedAt: r.UpdatedAt, UpdatedBy: r.UpdatedBy, - } + }, nil } -func dbToRule(dto dbRule) re.Rule { +func dbToRule(dto dbRule) (re.Rule, error) { + var metadata re.Metadata + if dto.Metadata != nil { + if err := json.Unmarshal(dto.Metadata, &metadata); err != nil { + return re.Rule{}, errors.Wrap(errors.ErrMalformedEntity, err) + } + } return re.Rule{ ID: dto.ID, Name: dto.Name, DomainID: dto.DomainID, + Metadata: metadata, InputChannel: dto.InputChannel, InputTopic: fromNullString(dto.InputTopic), Logic: re.Script{ @@ -77,7 +96,7 @@ func dbToRule(dto dbRule) re.Rule { CreatedBy: dto.CreatedBy, UpdatedAt: dto.UpdatedAt, UpdatedBy: dto.UpdatedBy, - } + }, nil } func toNullString(value string) sql.NullString { diff --git a/re/service.go b/re/service.go index e17064ad9..2277e25a7 100644 --- a/re/service.go +++ b/re/service.go @@ -15,12 +15,14 @@ import ( lua "github.com/yuin/gopher-lua" ) -type ScriptType uint - -type Script struct { - Type ScriptType `json:"type"` - Value string `json:"value"` -} +type ( + ScriptType uint + Metadata map[string]interface{} + Script struct { + Type ScriptType `json:"type"` + Value string `json:"value"` + } +) // Type can be daily, weekly or monthly. type ReccuringType uint @@ -42,6 +44,7 @@ type Rule struct { ID string `json:"id"` Name string `json:"name"` DomainID string `json:"domain"` + Metadata Metadata `json:"metadata,omitempty"` InputChannel string `json:"input_channel"` InputTopic string `json:"input_topic"` Logic Script `json:"logic"`