From 87c9d055fbb291813fa791942b989c7891c96436 Mon Sep 17 00:00:00 2001 From: Darren Kelly Date: Thu, 31 Oct 2024 19:18:30 +0000 Subject: [PATCH] feat(thorgen): ability to watch live events --- blocks/blocks.go | 108 ++++++++++++++++++------ blocks/blocks_test.go | 40 +++++++-- client/blocks.go | 18 +++- client/transactions.go | 2 +- cmd/thorgen/bind/source.go.tpl | 149 +++++++++++++++++++++++++++++---- 5 files changed, 268 insertions(+), 49 deletions(-) diff --git a/blocks/blocks.go b/blocks/blocks.go index 5a89b84..e8b919a 100644 --- a/blocks/blocks.go +++ b/blocks/blocks.go @@ -1,21 +1,93 @@ package blocks import ( + "context" "fmt" + "sync" "sync/atomic" "time" "github.com/darrenvechain/thorgo/client" "github.com/ethereum/go-ethereum/common" + "github.com/google/uuid" ) +type subscriber struct { + sub chan *client.ExpandedBlock + ctx context.Context +} + type Blocks struct { - client *client.Client - best atomic.Value + client *client.Client + best atomic.Value + subscribers sync.Map // Using sync.Map for concurrent access } func New(c *client.Client) *Blocks { - return &Blocks{client: c} + b := &Blocks{client: c} + go b.poll() + return b +} + +// poll sends the expanded block to all active subscribers. +func (b *Blocks) poll() { + var previous *client.ExpandedBlock + var err error + backoff := 5 * time.Second + + for { + previous, err = b.Expanded("best") + if err != nil { + time.Sleep(backoff) + continue + } + break + } + + for { + nextBlockTime := time.Unix(previous.Timestamp, 0).Add(10 * time.Second) + now := time.Now().UTC() + if now.Before(nextBlockTime) { + time.Sleep(nextBlockTime.Add(100 * time.Millisecond).Sub(now)) + } + + next, err := b.Expanded("best") + if err != nil { + time.Sleep(2 * time.Second) + continue + } + if previous.ID != next.ID { + b.subscribers.Range(func(key, value interface{}) bool { + sub := value.(subscriber) + select { + case <-sub.ctx.Done(): + b.subscribers.Delete(key) + close(sub.sub) + return false + default: + sub.sub <- next + } + return true + }) + previous = next + } else { + // Sleep for a second if the block hasn't changed. + time.Sleep(1 * time.Second) + continue + } + + } +} + +// Subscribe adds a new subscriber to the block stream. +// The subscriber will receive the latest block produced. +// The subscriber will be removed when the context is done. +func (b *Blocks) Subscribe(ctx context.Context) <-chan *client.ExpandedBlock { + sub := make(chan *client.ExpandedBlock) + id := uuid.New().String() + s := subscriber{sub: sub, ctx: ctx} + b.subscribers.Store(id, s) + return sub } // ByID returns the block by the given ID. @@ -66,28 +138,10 @@ func (b *Blocks) Expanded(revision string) (*client.ExpandedBlock, error) { // Ticker waits for the next block to be produced // Returns the next block -func (b *Blocks) Ticker() (*client.Block, error) { - best, err := b.Best() - if err != nil { - return nil, err - } - - // Sleep until the current block + 10 seconds - predictedTime := time.Unix(best.Timestamp, 0).Add(10 * time.Second) - time.Sleep(time.Until(predictedTime)) - - ticker := time.NewTicker(1 * time.Second) - timeout := time.NewTimer(30 * time.Second) - - for { - select { - case <-ticker.C: - nextBlock, err := b.client.Block(fmt.Sprintf("%d", best.Number+1)) - if err == nil { - return nextBlock, nil - } - case <-timeout.C: - return nil, fmt.Errorf("timed out waiting for next block") - } - } +func (b *Blocks) Ticker() (*client.ExpandedBlock, error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sub := b.Subscribe(ctx) + blk := <-sub + return blk, nil } diff --git a/blocks/blocks_test.go b/blocks/blocks_test.go index 62ad6d0..3599256 100644 --- a/blocks/blocks_test.go +++ b/blocks/blocks_test.go @@ -1,7 +1,9 @@ package blocks import ( + "context" "testing" + "time" "github.com/darrenvechain/thorgo/client" "github.com/darrenvechain/thorgo/internal/testcontainer" @@ -23,28 +25,28 @@ func TestMain(m *testing.M) { } // TestGetBestBlock fetches the best block from the network -func TestGetBestBlock(t *testing.T) { +func TestBlocks_Best(t *testing.T) { block, err := blocks.Best() assert.NoError(t, err) assert.NotNil(t, block) } // TestGetBlockByNumber fetches a block by its number -func TestGetBlockByNumber(t *testing.T) { +func TestBlocks_ByNumber(t *testing.T) { block, err := blocks.ByNumber(0) assert.NoError(t, err) assert.NotNil(t, block) } // TestGetBlockByID fetches a block by its ID -func TestGetBlockByID(t *testing.T) { +func TestBlocks_ByID(t *testing.T) { block, err := blocks.ByID(solo.GenesisID()) assert.NoError(t, err) assert.NotNil(t, block) } // TestGetFinalizedBlock fetches the finalized block from the network -func TestGetFinalizedBlock(t *testing.T) { +func TestBlocks_Finalized(t *testing.T) { block, err := blocks.Finalized() assert.NoError(t, err) assert.NotNil(t, block) @@ -52,15 +54,41 @@ func TestGetFinalizedBlock(t *testing.T) { // TestGetExpandedBlock fetches a block where all the transactions are expanded // It accepts a revision, which can be a block ID, block number, "best" or "finalized" -func TestGetExpandedBlock(t *testing.T) { +func TestBlocks_Expanded(t *testing.T) { block, err := blocks.Expanded(solo.GenesisID().Hex()) assert.NoError(t, err) assert.NotNil(t, block) } // TestWaitForNextBlock waits for the next block to be produced -func TestWaitForNextBlock(t *testing.T) { +func TestBlocks_Ticker(t *testing.T) { block, err := blocks.Ticker() assert.NoError(t, err) assert.NotNil(t, block) } + +func TestBlocks_Subscribe(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sub := blocks.Subscribe(ctx) + ticker := time.NewTicker(20 * time.Second) + for { + select { + case <-ticker.C: + t.Fatal("timed out waiting for block") + case blk := <-sub: + assert.NotNil(t, blk) + return + } + } +} + +func TestBlocks_Unsubscribe(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + // cancel the context before we start the subscription + cancel() + sub := blocks.Subscribe(ctx) + blk, ok := <-sub + assert.Nil(t, blk) + assert.False(t, ok) +} diff --git a/client/blocks.go b/client/blocks.go index 790c411..539ffe9 100644 --- a/client/blocks.go +++ b/client/blocks.go @@ -57,6 +57,22 @@ type BlockTransaction struct { } type ExpandedBlock struct { - Block + Number int64 `json:"number"` + ID common.Hash `json:"id"` + Size int64 `json:"size"` + ParentID common.Hash `json:"parentID"` + Timestamp int64 `json:"timestamp"` + GasLimit int64 `json:"gasLimit"` + Beneficiary common.Address `json:"beneficiary"` + GasUsed int64 `json:"gasUsed"` + TotalScore int64 `json:"totalScore"` + TxsRoot common.Hash `json:"txsRoot"` + TxsFeatures int64 `json:"txsFeatures"` + StateRoot common.Hash `json:"stateRoot"` + ReceiptsRoot common.Hash `json:"receiptsRoot"` + Com bool `json:"com"` + Signer common.Address `json:"signer"` + IsTrunk bool `json:"isTrunk"` + IsFinalized bool `json:"isFinalized"` Transactions []BlockTransaction `json:"transactions"` } diff --git a/client/transactions.go b/client/transactions.go index 7be4bf4..3abca96 100644 --- a/client/transactions.go +++ b/client/transactions.go @@ -56,7 +56,7 @@ type Output struct { type Event struct { Address common.Address `json:"address"` Topics []common.Hash `json:"topics"` - Data string `json:"data"` + Data hexutil.Bytes `json:"data"` } type ReceiptMeta struct { diff --git a/cmd/thorgen/bind/source.go.tpl b/cmd/thorgen/bind/source.go.tpl index 44511b3..6beb405 100644 --- a/cmd/thorgen/bind/source.go.tpl +++ b/cmd/thorgen/bind/source.go.tpl @@ -4,22 +4,20 @@ package {{.Package}} import ( + "context" "errors" "math/big" "strings" - ethereum "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/accounts/abi" - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/event" "github.com/darrenvechain/thorgo" "github.com/darrenvechain/thorgo/accounts" + "github.com/darrenvechain/thorgo/client" + "github.com/darrenvechain/thorgo/crypto/tx" "github.com/darrenvechain/thorgo/transactions" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/darrenvechain/thorgo/crypto/tx" - "github.com/darrenvechain/thorgo/client" ) // Reference imports to suppress errors if they are not otherwise used. @@ -27,11 +25,8 @@ var ( _ = errors.New _ = big.NewInt _ = strings.NewReader - _ = ethereum.NotFound _ = bind.Bind _ = common.Big1 - _ = types.BloomLookup - _ = event.NewSubscription _ = abi.ConvertType _ = hexutil.MustDecode ) @@ -144,7 +139,7 @@ var ( // {{.Normalized.Name}} is a free data retrieval call binding the contract method 0x{{printf "%x" .Original.ID}}. // // Solidity: {{.Original.String}} - func (_{{$contract.Type}} *{{$contract.Type}}) {{.Normalized.Name}}(opts *bind.CallOpts {{range .Normalized.Inputs}}, {{.Name}} {{bindtype .Type $structs}} {{end}}) ({{if .Structured}}struct{ {{range .Normalized.Outputs}}{{.Name}} {{bindtype .Type $structs}};{{end}} },{{else}}{{range .Normalized.Outputs}}{{bindtype .Type $structs}},{{end}}{{end}} error) { + func (_{{$contract.Type}} *{{$contract.Type}}) {{.Normalized.Name}}({{range .Normalized.Inputs}} {{.Name}} {{bindtype .Type $structs}}, {{end}}) ({{if .Structured}}struct{ {{range .Normalized.Outputs}}{{.Name}} {{bindtype .Type $structs}};{{end}} },{{else}}{{range .Normalized.Outputs}}{{bindtype .Type $structs}},{{end}}{{end}} error) { var out []interface{} err := _{{$contract.Type}}.Call(&out, "{{.Original.Name}}" {{range .Normalized.Inputs}}, {{.Name}}{{end}}) {{if .Structured}} @@ -270,8 +265,6 @@ var ( } {{ end }} - - filter := &client.EventFilter{ Range: rang, Options: opts, @@ -303,5 +296,133 @@ var ( return events, nil } + + // Watch{{.Normalized.Name}} listens for on chain events binding the contract event 0x{{printf "%x" .Original.ID}}. + // + // Solidity: {{.Original.String}} + func (_{{$contract.Type}} *{{$contract.Type}}) Watch{{.Normalized.Name}}({{ if gt $indexedArgCount 0 }}criteria []{{$contract.Type}}{{.Normalized.Name}}Criteria, {{ end }} ctx context.Context) (chan *{{$contract.Type}}{{.Normalized.Name}}, error) { + topicHash := _{{$contract.Type}}.contract.ABI.Events["{{.Normalized.Name}}"].ID + + {{ if gt $indexedArgCount 0 }} + criteriaSet := make([]client.EventCriteria, len(criteria)) + for i, c := range criteria { + crteria := client.EventCriteria{ + Address: &_{{$contract.Type}}.contract.Address, + Topic0: &topicHash, + } + {{- range $index, $element := .Normalized.Inputs }} + {{- if .Indexed }} + if c.{{capitalise .Name}} != nil { + {{- $type := bindtype .Type $structs }} + {{- if (eq (slice $type 0 1) "*") }} + matcher := c.{{capitalise .Name}} + {{- else }} + matcher := *c.{{capitalise .Name}} + {{- end }} + topics, err := abi.MakeTopics([]interface{}{matcher}) + if err != nil { + return nil, err + } + + {{- if eq $index 0}} + crteria.Topic1 = &topics[0][0] + {{- end}} + {{- if eq $index 1}} + crteria.Topic2 = &topics[0][0] + {{- end}} + {{- if eq $index 2}} + crteria.Topic3 = &topics[0][0] + {{- end}} + {{- if eq $index 3}} + crteria.Topic4 = &topics[0][0] + {{- end}} + } + {{- end }} + {{- end }} + + criteriaSet[i] = crteria + } + + if len(criteriaSet) == 0 { + criteriaSet = append(criteriaSet, client.EventCriteria{ + Address: &_{{$contract.Type}}.contract.Address, + Topic0: &topicHash, // Add Topic0 here + }) + } + {{ else }} + criteriaSet := []client.EventCriteria{ + client.EventCriteria{ + Address: &_{{$contract.Type}}.contract.Address, + Topic0: &topicHash, + }, + } + {{ end }} + + eventChan := make(chan *{{$contract.Type}}{{.Normalized.Name}}, 100) + blockSub := _{{$contract.Type}}.thor.Blocks.Subscribe(ctx) + + go func() { + defer close(eventChan) + + for { + select { + case block := <-blockSub: + // for range in block txs + for _, tx := range block.Transactions { + for index, outputs := range tx.Outputs { + for _, event := range outputs.Events { + if event.Address == _B3tr.contract.Address { + if event.Topics[0] == topicHash { + for _, c := range criteriaSet { + matches := true + if c.Topic1 != nil && *c.Topic1 != event.Topics[1] { + matches = false + } + if c.Topic2 != nil && *c.Topic2 != event.Topics[2] { + matches = false + } + if c.Topic3 != nil && *c.Topic3 != event.Topics[3] { + matches = false + } + if c.Topic4 != nil && *c.Topic4 != event.Topics[4] { + matches = false + } + + if matches { + log := client.EventLog{ + Address: &_B3tr.contract.Address, + Topics: event.Topics, + Data: event.Data, + Meta: client.LogMeta{ + BlockID: block.ID, + BlockNumber: block.Number, + BlockTime: block.Timestamp, + TxID: tx.ID, + TxOrigin: tx.Origin, + ClauseIndex: int64(index), + }, + } + + ev := new({{$contract.Type}}{{.Normalized.Name}}) + if err := _{{$contract.Type}}.contract.UnpackLog(ev, "{{.Normalized.Name}}", log); err != nil { + continue + } + ev.Log = log + eventChan <- ev + } + } + } + } + } + } + } + case <-ctx.Done(): + return + } + } + }() + + return eventChan, nil + } {{end}} {{end}}