Skip to content

Commit

Permalink
apinode add task aggregator (#777)
Browse files Browse the repository at this point in the history
  • Loading branch information
huangzhiran authored Dec 15, 2024
1 parent a74cdd1 commit 69dc91c
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 75 deletions.
21 changes: 11 additions & 10 deletions e2e/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,17 @@ import (

func apiNodeInit(chDSN, dbFile, chainEndpoint, taskManagerContractAddr, ioidContractAddr string) (*apinode.APINode, string, error) {
cfg := apinodeconfig.Config{
LogLevel: slog.LevelInfo,
ServiceEndpoint: ":9000",
SequencerServiceEndpoint: "localhost:9001",
ProverServiceEndpoint: "localhost:9002",
DatabaseDSN: chDSN,
PrvKey: "",
ChainEndpoint: chainEndpoint,
BeginningBlockNumber: 0,
TaskManagerContractAddr: taskManagerContractAddr,
IoIDContractAddr: ioidContractAddr,
LogLevel: slog.LevelInfo,
ServiceEndpoint: ":9000",
SequencerServiceEndpoint: "localhost:9001",
TaskAggregatorIntervalSecond: 1,
ProverServiceEndpoint: "localhost:9002",
DatabaseDSN: chDSN,
PrvKey: "",
ChainEndpoint: chainEndpoint,
BeginningBlockNumber: 0,
TaskManagerContractAddr: taskManagerContractAddr,
IoIDContractAddr: ioidContractAddr,
}

db, err := apinodedb.New(dbFile, chDSN)
Expand Down
73 changes: 73 additions & 0 deletions service/apinode/aggregator/aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package aggregator

import (
"bytes"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/iotexproject/w3bstream/service/apinode/db"
"github.com/iotexproject/w3bstream/service/sequencer/api"
"github.com/pkg/errors"
)

func Run(db *db.DB, sequencerAddr string, interval time.Duration) {
ticker := time.NewTicker(interval)
for range ticker.C {
ts, err := db.FetchAllTask()
if err != nil {
slog.Error("failed to fetch all tasks", "error", err)
continue
}
if len(ts) == 0 {
continue
}
if err := db.CreateTasks(ts); err != nil {
slog.Error("failed to create tasks", "error", err)
continue
}
time.Sleep(1 * time.Second) // after writing to clickhouse, reading immediately will not return the value.

tids := make([]string, 0, len(ts))
pt := map[string]string{}
for _, t := range ts {
tids = append(tids, t.TaskID)
pt[t.ProjectID] = t.TaskID
}
for _, t := range pt {
if err := notify(sequencerAddr, common.HexToHash(t)); err != nil {
slog.Error("failed to notify sequencer", "error", err)
continue
}
}
if err := db.DeleteTasks(tids); err != nil {
slog.Error("failed to delete tasks at local", "error", err)
}
}
}

func notify(sequencerAddr string, taskID common.Hash) error {
reqSequencer := &api.CreateTaskReq{TaskID: taskID}
reqSequencerJ, err := json.Marshal(reqSequencer)
if err != nil {
return errors.Wrap(err, "failed to marshal sequencer request")
}
resp, err := http.Post(fmt.Sprintf("http://%s/task", sequencerAddr), "application/json", bytes.NewBuffer(reqSequencerJ))
if err != nil {
return errors.Wrap(err, "failed to call sequencer service")
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, err := io.ReadAll(resp.Body)
if err == nil {
err = errors.New(string(body))
}
return errors.Wrap(err, "failed to call sequencer service")
}
return nil
}
26 changes: 0 additions & 26 deletions service/apinode/api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/iotexproject/w3bstream/project"
"github.com/iotexproject/w3bstream/service/apinode/db"
proverapi "github.com/iotexproject/w3bstream/service/prover/api"
sequencerapi "github.com/iotexproject/w3bstream/service/sequencer/api"
)

type errResp struct {
Expand Down Expand Up @@ -151,31 +150,6 @@ func (s *httpServer) createTask(c *gin.Context) {
return
}

reqSequencer := &sequencerapi.CreateTaskReq{TaskID: taskID}
reqSequencerJ, err := json.Marshal(reqSequencer)
if err != nil {
slog.Error("failed to marshal sequencer request", "error", err)
c.JSON(http.StatusInternalServerError, newErrResp(errors.Wrap(err, "failed to marshal sequencer request")))
return
}
resp, err := http.Post(fmt.Sprintf("http://%s/task", s.sequencerAddr), "application/json", bytes.NewBuffer(reqSequencerJ))
if err != nil {
slog.Error("failed to call sequencer service", "error", err)
c.JSON(http.StatusInternalServerError, newErrResp(errors.Wrap(err, "failed to call sequencer service")))
return
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, err := io.ReadAll(resp.Body)
if err == nil {
err = errors.New(string(body))
}
slog.Error("failed to call sequencer service", "error", err)
c.JSON(http.StatusInternalServerError, newErrResp(errors.Wrap(err, "failed to call sequencer service")))
return
}

slog.Info("successfully processed message", "task_id", taskID.String())
c.JSON(http.StatusOK, &CreateTaskResp{
TaskID: taskID.String(),
Expand Down
4 changes: 4 additions & 0 deletions service/apinode/apinode.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package apinode

import (
"log"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/iotexproject/w3bstream/monitor"
"github.com/iotexproject/w3bstream/service/apinode/aggregator"
"github.com/iotexproject/w3bstream/service/apinode/api"
"github.com/iotexproject/w3bstream/service/apinode/config"
"github.com/iotexproject/w3bstream/service/apinode/db"
Expand Down Expand Up @@ -42,6 +44,8 @@ func (n *APINode) Start() error {
return errors.Wrap(err, "failed to run contract monitor")
}

go aggregator.Run(n.db, n.cfg.SequencerServiceEndpoint, time.Duration(n.cfg.TaskAggregatorIntervalSecond)*time.Second)

go func() {
if err := api.Run(n.db, n.cfg.ServiceEndpoint, n.cfg.SequencerServiceEndpoint, n.cfg.ProverServiceEndpoint); err != nil {
log.Fatal(err)
Expand Down
48 changes: 25 additions & 23 deletions service/apinode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,34 @@ import (
)

type Config struct {
LogLevel slog.Level `env:"LOG_LEVEL,optional"`
ServiceEndpoint string `env:"HTTP_SERVICE_ENDPOINT"`
SequencerServiceEndpoint string `env:"SEQUENCER_SERVICE_ENDPOINT"`
ProverServiceEndpoint string `env:"PROVER_SERVICE_ENDPOINT"`
DatabaseDSN string `env:"DATABASE_DSN"`
PrvKey string `env:"PRIVATE_KEY,optional"`
ChainEndpoint string `env:"CHAIN_ENDPOINT,optional"`
BeginningBlockNumber uint64 `env:"BEGINNING_BLOCK_NUMBER,optional"`
TaskManagerContractAddr string `env:"TASK_MANAGER_CONTRACT_ADDRESS,optional"`
IoIDContractAddr string `env:"IOID_CONTRACT_ADDRESS,optional"`
LocalDBDir string `env:"LOCAL_DB_DIRECTORY,optional"`
env string `env:"-"`
LogLevel slog.Level `env:"LOG_LEVEL,optional"`
ServiceEndpoint string `env:"HTTP_SERVICE_ENDPOINT"`
SequencerServiceEndpoint string `env:"SEQUENCER_SERVICE_ENDPOINT"`
TaskAggregatorIntervalSecond uint64 `env:"TASK_AGGREGATOR_INTERVAL_SECOND,optional"`
ProverServiceEndpoint string `env:"PROVER_SERVICE_ENDPOINT"`
DatabaseDSN string `env:"DATABASE_DSN"`
PrvKey string `env:"PRIVATE_KEY,optional"`
ChainEndpoint string `env:"CHAIN_ENDPOINT,optional"`
BeginningBlockNumber uint64 `env:"BEGINNING_BLOCK_NUMBER,optional"`
TaskManagerContractAddr string `env:"TASK_MANAGER_CONTRACT_ADDRESS,optional"`
IoIDContractAddr string `env:"IOID_CONTRACT_ADDRESS,optional"`
LocalDBDir string `env:"LOCAL_DB_DIRECTORY,optional"`
env string `env:"-"`
}

var defaultTestnetConfig = &Config{
LogLevel: slog.LevelInfo,
ServiceEndpoint: ":9000",
SequencerServiceEndpoint: "localhost:9001",
ProverServiceEndpoint: "localhost:9002",
PrvKey: "dbfe03b0406549232b8dccc04be8224fcc0afa300a33d4f335dcfdfead861c85",
ChainEndpoint: "https://babel-api.testnet.iotex.io",
BeginningBlockNumber: 28685000,
TaskManagerContractAddr: "0xF0714400a4C0C72007A9F910C5E3007614958636",
IoIDContractAddr: "0x45Ce3E6f526e597628c73B731a3e9Af7Fc32f5b7",
LocalDBDir: "./local_db",
env: "TESTNET",
LogLevel: slog.LevelInfo,
ServiceEndpoint: ":9000",
SequencerServiceEndpoint: "localhost:9001",
TaskAggregatorIntervalSecond: 1,
ProverServiceEndpoint: "localhost:9002",
PrvKey: "dbfe03b0406549232b8dccc04be8224fcc0afa300a33d4f335dcfdfead861c85",
ChainEndpoint: "https://babel-api.testnet.iotex.io",
BeginningBlockNumber: 28685000,
TaskManagerContractAddr: "0xF0714400a4C0C72007A9F910C5E3007614958636",
IoIDContractAddr: "0x45Ce3E6f526e597628c73B731a3e9Af7Fc32f5b7",
LocalDBDir: "./local_db",
env: "TESTNET",
}

func (c *Config) init() error {
Expand Down
32 changes: 17 additions & 15 deletions service/apinode/db/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,30 @@ import (
)

type Task struct {
TaskID string `ch:"task_id"`
DeviceID string `ch:"device_id"`
Nonce uint64 `ch:"nonce"`
ProjectID string `ch:"project_id"`
ProjectVersion string `ch:"project_version"`
Payload string `ch:"payload"`
Signature string `ch:"signature"`
SignatureAlgorithm string `ch:"signature_algorithm"`
HashAlgorithm string `ch:"hash_algorithm"`
CreatedAt time.Time `ch:"create_at"`
TaskID string `ch:"task_id" gorm:"primarykey"`
DeviceID string `ch:"device_id" gorm:"not null"`
Nonce uint64 `ch:"nonce" gorm:"not null"`
ProjectID string `ch:"project_id" gorm:"not null"`
ProjectVersion string `ch:"project_version" gorm:"not null"`
Payload string `ch:"payload" gorm:"not null"`
Signature string `ch:"signature" gorm:"not null"`
SignatureAlgorithm string `ch:"signature_algorithm" gorm:"not null"`
HashAlgorithm string `ch:"hash_algorithm" gorm:"not null"`
CreatedAt time.Time `ch:"created_at" gorm:"not null"`
}

func (p *DB) CreateTask(m *Task) error {
func (p *DB) CreateTasks(ts []*Task) error {
batch, err := p.ch.PrepareBatch(context.Background(), "INSERT INTO w3bstream_tasks")
if err != nil {
return errors.Wrap(err, "failed to prepare batch")
}
if err := batch.AppendStruct(m); err != nil {
return errors.Wrap(err, "failed to append struct")
for _, t := range ts {
if err := batch.AppendStruct(t); err != nil {
return errors.Wrap(err, "failed to append struct")
}
}
err = batch.Send()
return errors.Wrap(err, "failed to create task")
return errors.Wrap(err, "failed to create tasks")
}

func (p *DB) FetchTask(taskID common.Hash) (*Task, error) {
Expand All @@ -57,7 +59,7 @@ func migrateCH(conn driver.Conn) error {
signature String NOT NULL,
signature_algorithm String NOT NULL,
hash_algorithm String NOT NULL,
create_at DateTime NOT NULL
created_at DateTime NOT NULL
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY task_id
Expand Down
26 changes: 25 additions & 1 deletion service/apinode/db/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,30 @@ type ProjectDevice struct {
DeviceAddress string `gorm:"uniqueIndex:project_device_uniq,not null"`
}

func (p *DB) CreateTask(t *Task) error {
err := p.sqlite.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "task_id"}},
DoNothing: true,
}).Create(t).Error
return errors.Wrap(err, "failed to create task")
}

func (p *DB) FetchAllTask() ([]*Task, error) {
ts := []*Task{}
if err := p.sqlite.Order("created_at ASC").Find(&ts).Error; err != nil {
return nil, errors.Wrap(err, "failed to query all tasks")
}
return ts, nil
}

func (p *DB) DeleteTasks(taskIDs []string) error {
if len(taskIDs) == 0 {
return nil
}
err := p.sqlite.Unscoped().Where("task_id IN ?", taskIDs).Delete(&Task{}).Error
return errors.Wrap(err, "failed to delete tasks")
}

func (p *DB) UpsertAssignedTask(taskID common.Hash, prover common.Address) error {
t := AssignedTask{
TaskID: taskID.Hex(),
Expand Down Expand Up @@ -135,7 +159,7 @@ func newSqlite(localDBDir string) (*gorm.DB, error) {
if err != nil {
return nil, errors.Wrap(err, "failed to connect sqlite")
}
if err := db.AutoMigrate(&scannedBlockNumber{}, &AssignedTask{}, &SettledTask{}, &ProjectDevice{}); err != nil {
if err := db.AutoMigrate(&scannedBlockNumber{}, &Task{}, &AssignedTask{}, &SettledTask{}, &ProjectDevice{}); err != nil {
return nil, errors.Wrap(err, "failed to migrate model")
}
return db, nil
Expand Down

0 comments on commit 69dc91c

Please sign in to comment.