Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove l1 block, close resources #2242

Merged
merged 8 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 0 additions & 22 deletions go/common/encoding.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,9 @@
package common

import (
"fmt"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
)

// EncodedL1Block the encoded version of an L1 block.
type EncodedL1Block []byte

func EncodeBlock(b *types.Block) (EncodedL1Block, error) {
encoded, err := rlp.EncodeToBytes(b)
if err != nil {
return nil, fmt.Errorf("could not encode block to bytes. Cause: %w", err)
}
return encoded, nil
}

func (eb EncodedL1Block) DecodeBlock() (*types.Block, error) {
b := types.Block{}
if err := rlp.DecodeBytes(eb, &b); err != nil {
return nil, fmt.Errorf("could not decode block from bytes. Cause: %w", err)
}
return &b, nil
}

func EncodeRollup(r *ExtRollup) (EncodedRollup, error) {
return rlp.EncodeToBytes(r)
}
Expand Down
6 changes: 0 additions & 6 deletions go/common/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package host
import (
"context"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ten-protocol/go-ten/go/common"
hostconfig "github.com/ten-protocol/go-ten/go/host/config"
"github.com/ten-protocol/go-ten/go/host/storage"
Expand Down Expand Up @@ -38,11 +37,6 @@ type Host interface {
NewHeadsChan() chan *common.BatchHeader
}

type BlockStream struct {
Stream <-chan *types.Block // the channel which will receive the consecutive, canonical blocks
Stop func() // function to permanently stop the stream and clean up any associated processes/resources
}

type BatchMsg struct {
Batches []*common.ExtBatch // The batches being sent.
IsLive bool // true if these batches are being sent as new, false if in response to a p2p request
Expand Down
8 changes: 4 additions & 4 deletions go/common/host/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,18 @@ type L1DataService interface {
// Subscribe will register a block handler to receive new blocks as they arrive, returns unsubscribe func
Subscribe(handler L1BlockHandler) func()

FetchBlockByHeight(height *big.Int) (*types.Block, error)
FetchBlockByHeight(height *big.Int) (*types.Header, error)
// FetchNextBlock returns the next canonical block after a given block hash
// It returns the new block, a bool which is true if the block is the current L1 head and a bool if the block is on a different fork to prevBlock
FetchNextBlock(prevBlock gethcommon.Hash) (*types.Block, bool, error)
FetchNextBlock(prevBlock gethcommon.Hash) (*types.Header, bool, error)
// GetTenRelevantTransactions returns the events and transactions relevant to Ten
GetTenRelevantTransactions(block *common.L1Block) (*common.ProcessedL1Data, error)
GetTenRelevantTransactions(block *types.Header) (*common.ProcessedL1Data, error)
}

// L1BlockHandler is an interface for receiving new blocks from the repository as they arrive
type L1BlockHandler interface {
// HandleBlock will be called in a new goroutine for each new block as it arrives
HandleBlock(block *types.Block)
HandleBlock(block *types.Header)
}

// L1Publisher provides an interface for the host to interact with Ten data (management contract etc.) on L1
Expand Down
1 change: 0 additions & 1 deletion go/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type (
// MainNet aliases
L1Address = common.Address
L1BlockHash = common.Hash
L1Block = types.Block
L1Transaction = types.Transaction
L1Receipt = types.Receipt
L1Receipts = types.Receipts
Expand Down
22 changes: 0 additions & 22 deletions go/common/utils.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,19 @@
package common

import (
"math/big"
"math/rand"
"time"

"github.com/ethereum/go-ethereum/core/types"

gethcommon "github.com/ethereum/go-ethereum/common"
)

type (
Latency func() time.Duration
)

func MaxInt(x, y uint32) uint32 {
if x < y {
return y
}
return x
}

// ShortHash converts the hash to a shorter uint64 for printing.
func ShortHash(hash gethcommon.Hash) uint64 {
return hash.Big().Uint64()
}

// ShortAddress converts the address to a shorter uint64 for printing.
func ShortAddress(address gethcommon.Address) uint64 {
return address.Big().Uint64()
}

// ShortNonce converts the nonce to a shorter uint64 for printing.
func ShortNonce(nonce types.BlockNonce) uint64 {
return new(big.Int).SetBytes(nonce[4:]).Uint64()
}

// ExtractPotentialAddress - given a 32 byte hash , it checks whether it can be an address and extracts that
func ExtractPotentialAddress(hash gethcommon.Hash) *gethcommon.Address {
bitlen := hash.Big().BitLen()
Expand Down
58 changes: 33 additions & 25 deletions go/ethadapter/geth_rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type gethRPCClient struct {
timeout time.Duration // the timeout for connecting to, or communicating with, the L1 node
logger gethlog.Logger
rpcURL string
blockCache *lru.Cache[gethcommon.Hash, *types.Block]
blockCache *lru.Cache[gethcommon.Hash, *types.Header]
}

// NewEthClientFromURL instantiates a new ethadapter.EthClient that connects to an ethereum node
Expand All @@ -58,7 +58,7 @@ func NewEthClientFromURL(rpcURL string, timeout time.Duration, logger gethlog.Lo
logger.Trace(fmt.Sprintf("Initialized eth node connection - addr: %s", rpcURL))

// cache recent blocks to avoid re-fetching them (they are often re-used for checking for forks etc.)
blkCache, err := lru.New[gethcommon.Hash, *types.Block](_defaultBlockCacheSize)
blkCache, err := lru.New[gethcommon.Hash, *types.Header](_defaultBlockCacheSize)
if err != nil {
return nil, fmt.Errorf("unable to initialize block cache - %w", err)
}
Expand All @@ -77,54 +77,53 @@ func NewEthClient(ipaddress string, port uint, timeout time.Duration, logger get
return NewEthClientFromURL(fmt.Sprintf("ws://%s:%d", ipaddress, port), timeout, logger)
}

func (e *gethRPCClient) FetchHeadBlock() (*types.Block, error) {
func (e *gethRPCClient) FetchHeadBlock() (*types.Header, error) {
ctx, cancel := context.WithTimeout(context.Background(), e.timeout)
defer cancel()

return e.client.BlockByNumber(ctx, nil)
return e.client.HeaderByNumber(ctx, nil)
}

func (e *gethRPCClient) Info() Info {
return Info{}
}

func (e *gethRPCClient) BlocksBetween(startingBlock *types.Header, lastBlock *types.Block) []*types.Block {
var blocksBetween []*types.Block
func (e *gethRPCClient) BlocksBetween(startingBlock *types.Header, lastBlock *types.Header) []*types.Header {
var blocksBetween []*types.Header
var err error

for currentBlk := lastBlock; currentBlk != nil && !bytes.Equal(currentBlk.Hash().Bytes(), startingBlock.Hash().Bytes()) && !bytes.Equal(currentBlk.ParentHash().Bytes(), gethcommon.HexToHash("").Bytes()); {
currentBlk, err = e.BlockByHash(currentBlk.ParentHash())
for currentBlk := lastBlock; currentBlk != nil && !bytes.Equal(currentBlk.Hash().Bytes(), startingBlock.Hash().Bytes()) && !bytes.Equal(currentBlk.ParentHash.Bytes(), gethcommon.HexToHash("").Bytes()); {
c := currentBlk.ParentHash
currentBlk, err = e.HeaderByHash(currentBlk.ParentHash)
if err != nil {
e.logger.Crit(fmt.Sprintf("could not fetch parent block with hash %s.", currentBlk.ParentHash().String()), log.ErrKey, err)
e.logger.Crit(fmt.Sprintf("could not fetch parent block with hash %s.", c.String()), log.ErrKey, err)
}
blocksBetween = append(blocksBetween, currentBlk)
}

return blocksBetween
}

func (e *gethRPCClient) IsBlockAncestor(block *types.Block, maybeAncestor common.L1BlockHash) bool {
func (e *gethRPCClient) IsBlockAncestor(block *types.Header, maybeAncestor common.L1BlockHash) bool {
if bytes.Equal(maybeAncestor.Bytes(), block.Hash().Bytes()) || bytes.Equal(maybeAncestor.Bytes(), (common.L1BlockHash{}).Bytes()) {
return true
}

if block.Number().Int64() == int64(common.L1GenesisHeight) {
if block.Number.Int64() == int64(common.L1GenesisHeight) {
return false
}

resolvedBlock, err := e.BlockByHash(maybeAncestor)
resolvedAncestorBlock, err := e.HeaderByHash(maybeAncestor)
if err != nil {
e.logger.Crit(fmt.Sprintf("could not fetch parent block with hash %s.", maybeAncestor.String()), log.ErrKey, err)
}
if resolvedBlock == nil {
if resolvedBlock.Number().Int64() >= block.Number().Int64() {
return false
}
if resolvedAncestorBlock.Number.Int64() >= block.Number.Int64() {
return false
}

p, err := e.BlockByHash(block.ParentHash())
p, err := e.HeaderByHash(block.ParentHash)
if err != nil {
e.logger.Crit(fmt.Sprintf("could not fetch parent block with hash %s", block.ParentHash().String()), log.ErrKey, err)
e.logger.Crit(fmt.Sprintf("could not fetch parent block with hash %s", block.ParentHash.String()), log.ErrKey, err)
}
if p == nil {
return false
Expand Down Expand Up @@ -166,7 +165,7 @@ func (e *gethRPCClient) BlockListener() (chan *types.Header, ethereum.Subscripti
defer cancel()

// we do not buffer here, we expect the consumer to always be ready to receive new blocks and not fall behind
ch := make(chan *types.Header, 1)
ch := make(chan *types.Header)
var sub ethereum.Subscription
var err error
err = retry.Do(func() error {
Expand All @@ -192,29 +191,38 @@ func (e *gethRPCClient) BlockNumber() (uint64, error) {
return e.client.BlockNumber(ctx)
}

func (e *gethRPCClient) BlockByNumber(n *big.Int) (*types.Block, error) {
func (e *gethRPCClient) HeaderByNumber(n *big.Int) (*types.Header, error) {
ctx, cancel := context.WithTimeout(context.Background(), e.timeout)
defer cancel()

return e.client.BlockByNumber(ctx, n)
return e.client.HeaderByNumber(ctx, n)
}

func (e *gethRPCClient) BlockByHash(hash gethcommon.Hash) (*types.Block, error) {
func (e *gethRPCClient) HeaderByHash(hash gethcommon.Hash) (*types.Header, error) {
block, found := e.blockCache.Get(hash)
if found {
return block, nil
cp := *block
return &cp, nil
}

// not in cache, fetch from RPC
ctx, cancel := context.WithTimeout(context.Background(), e.timeout)
defer cancel()

block, err := e.client.BlockByHash(ctx, hash)
block, err := e.client.HeaderByHash(ctx, hash)
if err != nil {
return nil, err
}
e.blockCache.Add(hash, block)
return block, nil
cp := *block
return &cp, nil
}

func (e *gethRPCClient) BlockByHash(hash gethcommon.Hash) (*types.Block, error) {
ctx, cancel := context.WithTimeout(context.Background(), e.timeout)
defer cancel()

return e.client.BlockByHash(ctx, hash)
}

func (e *gethRPCClient) CallContract(msg ethereum.CallMsg) ([]byte, error) {
Expand Down
13 changes: 7 additions & 6 deletions go/ethadapter/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@ import (
// todo (#1617) - some of these methods are composed calls that should be decoupled in the future (ie: BlocksBetween or IsBlockAncestor)
type EthClient interface {
BlockNumber() (uint64, error) // retrieves the number of the head block
HeaderByHash(id gethcommon.Hash) (*types.Header, error) // retrieves a block header given a hash
BlockByHash(id gethcommon.Hash) (*types.Block, error) // retrieves a block given a hash
BlockByNumber(n *big.Int) (*types.Block, error) // retrieves a block given a number - returns head block if n is nil
HeaderByNumber(n *big.Int) (*types.Header, error) // retrieves a block given a number - returns head block if n is nil
SendTransaction(signedTx *types.Transaction) error // issues an ethereum transaction (expects signed tx)
TransactionReceipt(hash gethcommon.Hash) (*types.Receipt, error) // fetches the ethereum transaction receipt
TransactionByHash(hash gethcommon.Hash) (*types.Transaction, bool, error) // fetches the ethereum tx
Nonce(address gethcommon.Address) (uint64, error) // fetches the account nonce to use in the next transaction
BalanceAt(account gethcommon.Address, blockNumber *big.Int) (*big.Int, error) // fetches the balance of the account
GetLogs(q ethereum.FilterQuery) ([]types.Log, error) // fetches the logs for a given query

Info() Info // retrieves the node Info
FetchHeadBlock() (*types.Block, error) // retrieves the block at head height
BlocksBetween(block *types.Header, head *types.Block) []*types.Block // returns the blocks between two blocks
IsBlockAncestor(block *types.Block, proof common.L1BlockHash) bool // returns if the node considers a block the ancestor
BlockListener() (chan *types.Header, ethereum.Subscription) // subscribes to new blocks and returns a listener with the blocks heads and the subscription handler
Info() Info // retrieves the node Info
FetchHeadBlock() (*types.Header, error) // retrieves the block at head height
BlocksBetween(block *types.Header, head *types.Header) []*types.Header // returns the blocks between two blocks
IsBlockAncestor(block *types.Header, maybeAncestor common.L1BlockHash) bool // returns if the node considers a block the ancestor
BlockListener() (chan *types.Header, ethereum.Subscription) // subscribes to new blocks and returns a listener with the blocks heads and the subscription handler

CallContract(msg ethereum.CallMsg) ([]byte, error) // Runs the provided call message on the latest block.

Expand Down
18 changes: 7 additions & 11 deletions go/host/enclave/guardian.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ func (g *Guardian) PromoteToActiveSequencer() error {
// Note: The L1 processing behaviour has two modes based on the state, either
// - enclave is behind: lookup blocks to feed it 1-by-1 (see `catchupWithL1()`), ignore new live blocks that arrive here
// - enclave is up-to-date: feed it these live blocks as they arrive, no need to lookup blocks
func (g *Guardian) HandleBlock(block *types.Block) {
g.logger.Debug("Received L1 block", log.BlockHashKey, block.Hash(), log.BlockHeightKey, block.Number())
func (g *Guardian) HandleBlock(block *types.Header) {
g.logger.Debug("Received L1 block", log.BlockHashKey, block.Hash(), log.BlockHeightKey, block.Number)
// record the newest block we've seen
g.state.OnReceivedBlock(block.Hash())
if !g.state.InSyncWithL1() {
Expand Down Expand Up @@ -463,8 +463,8 @@ func (g *Guardian) catchupWithL2() error {

// returns false if the block was not processed
// todo - @matt - think about removing the TryLock
func (g *Guardian) submitL1Block(block *common.L1Block, isLatest bool) (bool, error) {
g.logger.Trace("submitting L1 block", log.BlockHashKey, block.Hash(), log.BlockHeightKey, block.Number())
func (g *Guardian) submitL1Block(block *types.Header, isLatest bool) (bool, error) {
g.logger.Trace("submitting L1 block", log.BlockHashKey, block.Hash(), log.BlockHeightKey, block.Number)
if !g.submitDataLock.TryLock() {
g.logger.Debug("Unable to submit block, enclave is busy processing data")
return false, nil
Expand All @@ -485,7 +485,7 @@ func (g *Guardian) submitL1Block(block *common.L1Block, isLatest bool) (bool, er
// this is most common when we are returning to a previous fork and the enclave has already seen some of the blocks on it
// note: logging this because we don't expect it to happen often and would like visibility on that.
g.logger.Info("L1 block already processed by enclave, trying the next block", "block", block.Hash())
nextHeight := big.NewInt(0).Add(block.Number(), big.NewInt(1))
nextHeight := big.NewInt(0).Add(block.Number, big.NewInt(1))
nextCanonicalBlock, err := g.sl.L1Data().FetchBlockByHeight(nextHeight)
if err != nil {
return false, fmt.Errorf("failed to fetch next block after forking block=%s: %w", block.Hash(), err)
Expand All @@ -499,10 +499,6 @@ func (g *Guardian) submitL1Block(block *common.L1Block, isLatest bool) (bool, er
g.state.OnProcessedBlock(block.Hash())
g.processL1BlockTransactions(block, rollupTxs, syncContracts)

if err != nil {
return false, fmt.Errorf("submitted block to enclave but could not store the block processing result. Cause: %w", err)
}

// todo: make sure this doesn't respond to old requests (once we have a proper protocol for that)
err = g.publishSharedSecretResponses(resp.ProducedSecretResponses)
if err != nil {
Expand All @@ -511,9 +507,9 @@ func (g *Guardian) submitL1Block(block *common.L1Block, isLatest bool) (bool, er
return true, nil
}

func (g *Guardian) processL1BlockTransactions(block *common.L1Block, rollupTxs []*common.L1RollupTx, syncContracts bool) {
func (g *Guardian) processL1BlockTransactions(block *types.Header, rollupTxs []*common.L1RollupTx, syncContracts bool) {
// TODO (@will) this should be removed and pulled from the L1
err := g.storage.AddBlock(block.Header())
err := g.storage.AddBlock(block)
if err != nil {
g.logger.Error("Could not add block to host db.", log.ErrKey, err)
}
Expand Down
Loading
Loading