diff --git a/e2e/init.go b/e2e/init.go index 63cb8d79..ff2a2840 100644 --- a/e2e/init.go +++ b/e2e/init.go @@ -41,16 +41,17 @@ import ( func apiNodeInit(chDSN, dbFile, chainEndpoint, taskManagerContractAddr, ioidContractAddr string) (*apinode.APINode, string, error) { cfg := apinodeconfig.Config{ - LogLevel: slog.LevelInfo, - ServiceEndpoint: ":9000", - SequencerServiceEndpoint: "localhost:9001", - ProverServiceEndpoint: "localhost:9002", - DatabaseDSN: chDSN, - PrvKey: "", - ChainEndpoint: chainEndpoint, - BeginningBlockNumber: 0, - TaskManagerContractAddr: taskManagerContractAddr, - IoIDContractAddr: ioidContractAddr, + LogLevel: slog.LevelInfo, + ServiceEndpoint: ":9000", + SequencerServiceEndpoint: "localhost:9001", + TaskAggregatorIntervalSecond: 1, + ProverServiceEndpoint: "localhost:9002", + DatabaseDSN: chDSN, + PrvKey: "", + ChainEndpoint: chainEndpoint, + BeginningBlockNumber: 0, + TaskManagerContractAddr: taskManagerContractAddr, + IoIDContractAddr: ioidContractAddr, } db, err := apinodedb.New(dbFile, chDSN) diff --git a/service/apinode/aggregator/aggregator.go b/service/apinode/aggregator/aggregator.go new file mode 100644 index 00000000..343dafca --- /dev/null +++ b/service/apinode/aggregator/aggregator.go @@ -0,0 +1,73 @@ +package aggregator + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/iotexproject/w3bstream/service/apinode/db" + "github.com/iotexproject/w3bstream/service/sequencer/api" + "github.com/pkg/errors" +) + +func Run(db *db.DB, sequencerAddr string, interval time.Duration) { + ticker := time.NewTicker(interval) + for range ticker.C { + ts, err := db.FetchAllTask() + if err != nil { + slog.Error("failed to fetch all tasks", "error", err) + continue + } + if len(ts) == 0 { + continue + } + if err := db.CreateTasks(ts); err != nil { + slog.Error("failed to create tasks", "error", err) + continue + } + time.Sleep(1 * time.Second) // after writing to clickhouse, reading immediately will not return the value. + + tids := make([]string, 0, len(ts)) + pt := map[string]string{} + for _, t := range ts { + tids = append(tids, t.TaskID) + pt[t.ProjectID] = t.TaskID + } + for _, t := range pt { + if err := notify(sequencerAddr, common.HexToHash(t)); err != nil { + slog.Error("failed to notify sequencer", "error", err) + continue + } + } + if err := db.DeleteTasks(tids); err != nil { + slog.Error("failed to delete tasks at local", "error", err) + } + } +} + +func notify(sequencerAddr string, taskID common.Hash) error { + reqSequencer := &api.CreateTaskReq{TaskID: taskID} + reqSequencerJ, err := json.Marshal(reqSequencer) + if err != nil { + return errors.Wrap(err, "failed to marshal sequencer request") + } + resp, err := http.Post(fmt.Sprintf("http://%s/task", sequencerAddr), "application/json", bytes.NewBuffer(reqSequencerJ)) + if err != nil { + return errors.Wrap(err, "failed to call sequencer service") + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, err := io.ReadAll(resp.Body) + if err == nil { + err = errors.New(string(body)) + } + return errors.Wrap(err, "failed to call sequencer service") + } + return nil +} diff --git a/service/apinode/api/http.go b/service/apinode/api/http.go index 1b998d42..c264e900 100644 --- a/service/apinode/api/http.go +++ b/service/apinode/api/http.go @@ -24,7 +24,6 @@ import ( "github.com/iotexproject/w3bstream/project" "github.com/iotexproject/w3bstream/service/apinode/db" proverapi "github.com/iotexproject/w3bstream/service/prover/api" - sequencerapi "github.com/iotexproject/w3bstream/service/sequencer/api" ) type errResp struct { @@ -151,31 +150,6 @@ func (s *httpServer) createTask(c *gin.Context) { return } - reqSequencer := &sequencerapi.CreateTaskReq{TaskID: taskID} - reqSequencerJ, err := json.Marshal(reqSequencer) - if err != nil { - slog.Error("failed to marshal sequencer request", "error", err) - c.JSON(http.StatusInternalServerError, newErrResp(errors.Wrap(err, "failed to marshal sequencer request"))) - return - } - resp, err := http.Post(fmt.Sprintf("http://%s/task", s.sequencerAddr), "application/json", bytes.NewBuffer(reqSequencerJ)) - if err != nil { - slog.Error("failed to call sequencer service", "error", err) - c.JSON(http.StatusInternalServerError, newErrResp(errors.Wrap(err, "failed to call sequencer service"))) - return - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - body, err := io.ReadAll(resp.Body) - if err == nil { - err = errors.New(string(body)) - } - slog.Error("failed to call sequencer service", "error", err) - c.JSON(http.StatusInternalServerError, newErrResp(errors.Wrap(err, "failed to call sequencer service"))) - return - } - slog.Info("successfully processed message", "task_id", taskID.String()) c.JSON(http.StatusOK, &CreateTaskResp{ TaskID: taskID.String(), diff --git a/service/apinode/apinode.go b/service/apinode/apinode.go index 3c4c1323..9328b749 100644 --- a/service/apinode/apinode.go +++ b/service/apinode/apinode.go @@ -2,9 +2,11 @@ package apinode import ( "log" + "time" "github.com/ethereum/go-ethereum/common" "github.com/iotexproject/w3bstream/monitor" + "github.com/iotexproject/w3bstream/service/apinode/aggregator" "github.com/iotexproject/w3bstream/service/apinode/api" "github.com/iotexproject/w3bstream/service/apinode/config" "github.com/iotexproject/w3bstream/service/apinode/db" @@ -42,6 +44,8 @@ func (n *APINode) Start() error { return errors.Wrap(err, "failed to run contract monitor") } + go aggregator.Run(n.db, n.cfg.SequencerServiceEndpoint, time.Duration(n.cfg.TaskAggregatorIntervalSecond)*time.Second) + go func() { if err := api.Run(n.db, n.cfg.ServiceEndpoint, n.cfg.SequencerServiceEndpoint, n.cfg.ProverServiceEndpoint); err != nil { log.Fatal(err) diff --git a/service/apinode/config/config.go b/service/apinode/config/config.go index 71ea7238..331e03f8 100644 --- a/service/apinode/config/config.go +++ b/service/apinode/config/config.go @@ -8,32 +8,34 @@ import ( ) type Config struct { - LogLevel slog.Level `env:"LOG_LEVEL,optional"` - ServiceEndpoint string `env:"HTTP_SERVICE_ENDPOINT"` - SequencerServiceEndpoint string `env:"SEQUENCER_SERVICE_ENDPOINT"` - ProverServiceEndpoint string `env:"PROVER_SERVICE_ENDPOINT"` - DatabaseDSN string `env:"DATABASE_DSN"` - PrvKey string `env:"PRIVATE_KEY,optional"` - ChainEndpoint string `env:"CHAIN_ENDPOINT,optional"` - BeginningBlockNumber uint64 `env:"BEGINNING_BLOCK_NUMBER,optional"` - TaskManagerContractAddr string `env:"TASK_MANAGER_CONTRACT_ADDRESS,optional"` - IoIDContractAddr string `env:"IOID_CONTRACT_ADDRESS,optional"` - LocalDBDir string `env:"LOCAL_DB_DIRECTORY,optional"` - env string `env:"-"` + LogLevel slog.Level `env:"LOG_LEVEL,optional"` + ServiceEndpoint string `env:"HTTP_SERVICE_ENDPOINT"` + SequencerServiceEndpoint string `env:"SEQUENCER_SERVICE_ENDPOINT"` + TaskAggregatorIntervalSecond uint64 `env:"TASK_AGGREGATOR_INTERVAL_SECOND,optional"` + ProverServiceEndpoint string `env:"PROVER_SERVICE_ENDPOINT"` + DatabaseDSN string `env:"DATABASE_DSN"` + PrvKey string `env:"PRIVATE_KEY,optional"` + ChainEndpoint string `env:"CHAIN_ENDPOINT,optional"` + BeginningBlockNumber uint64 `env:"BEGINNING_BLOCK_NUMBER,optional"` + TaskManagerContractAddr string `env:"TASK_MANAGER_CONTRACT_ADDRESS,optional"` + IoIDContractAddr string `env:"IOID_CONTRACT_ADDRESS,optional"` + LocalDBDir string `env:"LOCAL_DB_DIRECTORY,optional"` + env string `env:"-"` } var defaultTestnetConfig = &Config{ - LogLevel: slog.LevelInfo, - ServiceEndpoint: ":9000", - SequencerServiceEndpoint: "localhost:9001", - ProverServiceEndpoint: "localhost:9002", - PrvKey: "dbfe03b0406549232b8dccc04be8224fcc0afa300a33d4f335dcfdfead861c85", - ChainEndpoint: "https://babel-api.testnet.iotex.io", - BeginningBlockNumber: 28685000, - TaskManagerContractAddr: "0xF0714400a4C0C72007A9F910C5E3007614958636", - IoIDContractAddr: "0x45Ce3E6f526e597628c73B731a3e9Af7Fc32f5b7", - LocalDBDir: "./local_db", - env: "TESTNET", + LogLevel: slog.LevelInfo, + ServiceEndpoint: ":9000", + SequencerServiceEndpoint: "localhost:9001", + TaskAggregatorIntervalSecond: 1, + ProverServiceEndpoint: "localhost:9002", + PrvKey: "dbfe03b0406549232b8dccc04be8224fcc0afa300a33d4f335dcfdfead861c85", + ChainEndpoint: "https://babel-api.testnet.iotex.io", + BeginningBlockNumber: 28685000, + TaskManagerContractAddr: "0xF0714400a4C0C72007A9F910C5E3007614958636", + IoIDContractAddr: "0x45Ce3E6f526e597628c73B731a3e9Af7Fc32f5b7", + LocalDBDir: "./local_db", + env: "TESTNET", } func (c *Config) init() error { diff --git a/service/apinode/db/clickhouse.go b/service/apinode/db/clickhouse.go index f84568d7..f2ad4421 100644 --- a/service/apinode/db/clickhouse.go +++ b/service/apinode/db/clickhouse.go @@ -12,28 +12,30 @@ import ( ) type Task struct { - TaskID string `ch:"task_id"` - DeviceID string `ch:"device_id"` - Nonce uint64 `ch:"nonce"` - ProjectID string `ch:"project_id"` - ProjectVersion string `ch:"project_version"` - Payload string `ch:"payload"` - Signature string `ch:"signature"` - SignatureAlgorithm string `ch:"signature_algorithm"` - HashAlgorithm string `ch:"hash_algorithm"` - CreatedAt time.Time `ch:"create_at"` + TaskID string `ch:"task_id" gorm:"primarykey"` + DeviceID string `ch:"device_id" gorm:"not null"` + Nonce uint64 `ch:"nonce" gorm:"not null"` + ProjectID string `ch:"project_id" gorm:"not null"` + ProjectVersion string `ch:"project_version" gorm:"not null"` + Payload string `ch:"payload" gorm:"not null"` + Signature string `ch:"signature" gorm:"not null"` + SignatureAlgorithm string `ch:"signature_algorithm" gorm:"not null"` + HashAlgorithm string `ch:"hash_algorithm" gorm:"not null"` + CreatedAt time.Time `ch:"created_at" gorm:"not null"` } -func (p *DB) CreateTask(m *Task) error { +func (p *DB) CreateTasks(ts []*Task) error { batch, err := p.ch.PrepareBatch(context.Background(), "INSERT INTO w3bstream_tasks") if err != nil { return errors.Wrap(err, "failed to prepare batch") } - if err := batch.AppendStruct(m); err != nil { - return errors.Wrap(err, "failed to append struct") + for _, t := range ts { + if err := batch.AppendStruct(t); err != nil { + return errors.Wrap(err, "failed to append struct") + } } err = batch.Send() - return errors.Wrap(err, "failed to create task") + return errors.Wrap(err, "failed to create tasks") } func (p *DB) FetchTask(taskID common.Hash) (*Task, error) { @@ -57,7 +59,7 @@ func migrateCH(conn driver.Conn) error { signature String NOT NULL, signature_algorithm String NOT NULL, hash_algorithm String NOT NULL, - create_at DateTime NOT NULL + created_at DateTime NOT NULL ) ENGINE = ReplacingMergeTree() PRIMARY KEY task_id diff --git a/service/apinode/db/sqlite.go b/service/apinode/db/sqlite.go index bc9a6ab9..424b2ed3 100644 --- a/service/apinode/db/sqlite.go +++ b/service/apinode/db/sqlite.go @@ -34,6 +34,30 @@ type ProjectDevice struct { DeviceAddress string `gorm:"uniqueIndex:project_device_uniq,not null"` } +func (p *DB) CreateTask(t *Task) error { + err := p.sqlite.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "task_id"}}, + DoNothing: true, + }).Create(t).Error + return errors.Wrap(err, "failed to create task") +} + +func (p *DB) FetchAllTask() ([]*Task, error) { + ts := []*Task{} + if err := p.sqlite.Order("created_at ASC").Find(&ts).Error; err != nil { + return nil, errors.Wrap(err, "failed to query all tasks") + } + return ts, nil +} + +func (p *DB) DeleteTasks(taskIDs []string) error { + if len(taskIDs) == 0 { + return nil + } + err := p.sqlite.Unscoped().Where("task_id IN ?", taskIDs).Delete(&Task{}).Error + return errors.Wrap(err, "failed to delete tasks") +} + func (p *DB) UpsertAssignedTask(taskID common.Hash, prover common.Address) error { t := AssignedTask{ TaskID: taskID.Hex(), @@ -135,7 +159,7 @@ func newSqlite(localDBDir string) (*gorm.DB, error) { if err != nil { return nil, errors.Wrap(err, "failed to connect sqlite") } - if err := db.AutoMigrate(&scannedBlockNumber{}, &AssignedTask{}, &SettledTask{}, &ProjectDevice{}); err != nil { + if err := db.AutoMigrate(&scannedBlockNumber{}, &Task{}, &AssignedTask{}, &SettledTask{}, &ProjectDevice{}); err != nil { return nil, errors.Wrap(err, "failed to migrate model") } return db, nil