Skip to content

Commit

Permalink
feat(thorgen): ability to watch live events
Browse files Browse the repository at this point in the history
  • Loading branch information
darrenvechain committed Oct 31, 2024
1 parent d4d5cdc commit 87c9d05
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 49 deletions.
108 changes: 81 additions & 27 deletions blocks/blocks.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,93 @@
package blocks

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/darrenvechain/thorgo/client"
"github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
)

type subscriber struct {
sub chan *client.ExpandedBlock
ctx context.Context
}

type Blocks struct {
client *client.Client
best atomic.Value
client *client.Client
best atomic.Value
subscribers sync.Map // Using sync.Map for concurrent access
}

func New(c *client.Client) *Blocks {
return &Blocks{client: c}
b := &Blocks{client: c}
go b.poll()
return b
}

// poll sends the expanded block to all active subscribers.
func (b *Blocks) poll() {
var previous *client.ExpandedBlock
var err error
backoff := 5 * time.Second

for {
previous, err = b.Expanded("best")
if err != nil {
time.Sleep(backoff)
continue
}
break
}

for {
nextBlockTime := time.Unix(previous.Timestamp, 0).Add(10 * time.Second)
now := time.Now().UTC()
if now.Before(nextBlockTime) {
time.Sleep(nextBlockTime.Add(100 * time.Millisecond).Sub(now))
}

next, err := b.Expanded("best")
if err != nil {
time.Sleep(2 * time.Second)
continue
}
if previous.ID != next.ID {
b.subscribers.Range(func(key, value interface{}) bool {
sub := value.(subscriber)
select {
case <-sub.ctx.Done():
b.subscribers.Delete(key)
close(sub.sub)
return false
default:
sub.sub <- next
}
return true
})
previous = next
} else {
// Sleep for a second if the block hasn't changed.
time.Sleep(1 * time.Second)
continue
}

}

Check failure on line 79 in blocks/blocks.go

View workflow job for this annotation

GitHub Actions / unit-tests

unnecessary trailing newline (whitespace)
}

// Subscribe adds a new subscriber to the block stream.
// The subscriber will receive the latest block produced.
// The subscriber will be removed when the context is done.
func (b *Blocks) Subscribe(ctx context.Context) <-chan *client.ExpandedBlock {
sub := make(chan *client.ExpandedBlock)
id := uuid.New().String()
s := subscriber{sub: sub, ctx: ctx}
b.subscribers.Store(id, s)
return sub
}

// ByID returns the block by the given ID.
Expand Down Expand Up @@ -66,28 +138,10 @@ func (b *Blocks) Expanded(revision string) (*client.ExpandedBlock, error) {

// Ticker waits for the next block to be produced
// Returns the next block
func (b *Blocks) Ticker() (*client.Block, error) {
best, err := b.Best()
if err != nil {
return nil, err
}

// Sleep until the current block + 10 seconds
predictedTime := time.Unix(best.Timestamp, 0).Add(10 * time.Second)
time.Sleep(time.Until(predictedTime))

ticker := time.NewTicker(1 * time.Second)
timeout := time.NewTimer(30 * time.Second)

for {
select {
case <-ticker.C:
nextBlock, err := b.client.Block(fmt.Sprintf("%d", best.Number+1))
if err == nil {
return nextBlock, nil
}
case <-timeout.C:
return nil, fmt.Errorf("timed out waiting for next block")
}
}
func (b *Blocks) Ticker() (*client.ExpandedBlock, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sub := b.Subscribe(ctx)
blk := <-sub
return blk, nil
}
40 changes: 34 additions & 6 deletions blocks/blocks_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package blocks

import (
"context"
"testing"
"time"

"github.com/darrenvechain/thorgo/client"
"github.com/darrenvechain/thorgo/internal/testcontainer"
Expand All @@ -23,44 +25,70 @@ func TestMain(m *testing.M) {
}

// TestGetBestBlock fetches the best block from the network
func TestGetBestBlock(t *testing.T) {
func TestBlocks_Best(t *testing.T) {
block, err := blocks.Best()
assert.NoError(t, err)
assert.NotNil(t, block)
}

// TestGetBlockByNumber fetches a block by its number
func TestGetBlockByNumber(t *testing.T) {
func TestBlocks_ByNumber(t *testing.T) {
block, err := blocks.ByNumber(0)
assert.NoError(t, err)
assert.NotNil(t, block)
}

// TestGetBlockByID fetches a block by its ID
func TestGetBlockByID(t *testing.T) {
func TestBlocks_ByID(t *testing.T) {
block, err := blocks.ByID(solo.GenesisID())
assert.NoError(t, err)
assert.NotNil(t, block)
}

// TestGetFinalizedBlock fetches the finalized block from the network
func TestGetFinalizedBlock(t *testing.T) {
func TestBlocks_Finalized(t *testing.T) {
block, err := blocks.Finalized()
assert.NoError(t, err)
assert.NotNil(t, block)
}

// TestGetExpandedBlock fetches a block where all the transactions are expanded
// It accepts a revision, which can be a block ID, block number, "best" or "finalized"
func TestGetExpandedBlock(t *testing.T) {
func TestBlocks_Expanded(t *testing.T) {
block, err := blocks.Expanded(solo.GenesisID().Hex())
assert.NoError(t, err)
assert.NotNil(t, block)
}

// TestWaitForNextBlock waits for the next block to be produced
func TestWaitForNextBlock(t *testing.T) {
func TestBlocks_Ticker(t *testing.T) {
block, err := blocks.Ticker()
assert.NoError(t, err)
assert.NotNil(t, block)
}

func TestBlocks_Subscribe(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sub := blocks.Subscribe(ctx)
ticker := time.NewTicker(20 * time.Second)
for {
select {
case <-ticker.C:
t.Fatal("timed out waiting for block")
case blk := <-sub:
assert.NotNil(t, blk)
return
}
}
}

func TestBlocks_Unsubscribe(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// cancel the context before we start the subscription
cancel()
sub := blocks.Subscribe(ctx)
blk, ok := <-sub
assert.Nil(t, blk)
assert.False(t, ok)
}
18 changes: 17 additions & 1 deletion client/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,22 @@ type BlockTransaction struct {
}

type ExpandedBlock struct {
Block
Number int64 `json:"number"`
ID common.Hash `json:"id"`
Size int64 `json:"size"`
ParentID common.Hash `json:"parentID"`
Timestamp int64 `json:"timestamp"`
GasLimit int64 `json:"gasLimit"`
Beneficiary common.Address `json:"beneficiary"`
GasUsed int64 `json:"gasUsed"`
TotalScore int64 `json:"totalScore"`
TxsRoot common.Hash `json:"txsRoot"`
TxsFeatures int64 `json:"txsFeatures"`
StateRoot common.Hash `json:"stateRoot"`
ReceiptsRoot common.Hash `json:"receiptsRoot"`
Com bool `json:"com"`
Signer common.Address `json:"signer"`
IsTrunk bool `json:"isTrunk"`
IsFinalized bool `json:"isFinalized"`
Transactions []BlockTransaction `json:"transactions"`
}
2 changes: 1 addition & 1 deletion client/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Output struct {
type Event struct {
Address common.Address `json:"address"`
Topics []common.Hash `json:"topics"`
Data string `json:"data"`
Data hexutil.Bytes `json:"data"`
}

type ReceiptMeta struct {
Expand Down
Loading

0 comments on commit 87c9d05

Please sign in to comment.