Skip to content

Commit

Permalink
feat!: adds support for gRPC streaming and removes dependency on RPC (#…
Browse files Browse the repository at this point in the history
…3915)

Co-authored-by: Maciej Zwoliński <[email protected]>
  • Loading branch information
2 people authored and vgonkivs committed Jan 15, 2025
1 parent e11cd71 commit 46c6e9b
Show file tree
Hide file tree
Showing 30 changed files with 479 additions and 318 deletions.
71 changes: 56 additions & 15 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,65 @@ package core
import (
"fmt"

retryhttp "github.com/hashicorp/go-retryablehttp"
"github.com/tendermint/tendermint/rpc/client"
"github.com/tendermint/tendermint/rpc/client/http"
coregrpc "github.com/tendermint/tendermint/rpc/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// Client is an alias to Core Client.
type Client = client.Client
// Client is a core gRPC client.
type Client struct {
coregrpc.BlockAPIClient
host, port string
conn *grpc.ClientConn
}

// NewRemote creates a new Client that communicates with a remote Core endpoint over HTTP.
func NewRemote(ip, port string) (Client, error) {
httpClient := retryhttp.NewClient()
httpClient.RetryMax = 2
// suppress logging
httpClient.Logger = nil
// NewClient creates a new Client that communicates with a remote Core endpoint over gRPC.
// The connection is not started when creating the client.
// Use the Start method to start the connection.
func NewClient(host, port string) *Client {
return &Client{
host: host,
port: port,
}
}

return http.NewWithClient(
fmt.Sprintf("tcp://%s:%s", ip, port),
"/websocket",
httpClient.StandardClient(),
// Start created the Client's gRPC connection with optional dial options.
// If the connection is already started, it does nothing.
func (c *Client) Start(opts ...grpc.DialOption) error {
if c.IsRunning() {
return nil
}
if len(opts) == 0 {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
conn, err := grpc.NewClient(
fmt.Sprintf("%s:%s", c.host, c.port),
opts...,
)
if err != nil {
return err
}
c.conn = conn

c.BlockAPIClient = coregrpc.NewBlockAPIClient(conn)
return nil
}

// IsRunning checks if the client's connection is established and ready for use.
// It returns true if the connection is active, false otherwise.
func (c *Client) IsRunning() bool {
return c.conn != nil && c.BlockAPIClient != nil
}

// Stop terminates the Client's gRPC connection and releases all related resources.
// If the connection is already stopped, it does nothing.
func (c *Client) Stop() error {
if !c.IsRunning() {
return nil
}
defer func() {
c.conn = nil
c.BlockAPIClient = nil
}()
return c.conn.Close()
}
4 changes: 2 additions & 2 deletions core/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
// extendBlock extends the given block data, returning the resulting
// ExtendedDataSquare (EDS). If there are no transactions in the block,
// nil is returned in place of the eds.
func extendBlock(data types.Data, appVersion uint64, options ...nmt.Option) (*rsmt2d.ExtendedDataSquare, error) {
if app.IsEmptyBlockRef(&data, appVersion) {
func extendBlock(data *types.Data, appVersion uint64, options ...nmt.Option) (*rsmt2d.ExtendedDataSquare, error) {
if app.IsEmptyBlockRef(data, appVersion) {
return share.EmptyEDS(), nil
}

Expand Down
4 changes: 2 additions & 2 deletions core/eds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestTrulyEmptySquare(t *testing.T) {
SquareSize: 1,
}

eds, err := extendBlock(data, appconsts.LatestVersion)
eds, err := extendBlock(&data, appconsts.LatestVersion)
require.NoError(t, err)
require.True(t, eds.Equals(share.EmptyEDS()))
}
Expand All @@ -38,7 +38,7 @@ func TestEmptySquareWithZeroTxs(t *testing.T) {
Txs: []types.Tx{},
}

eds, err := extendBlock(data, appconsts.LatestVersion)
eds, err := extendBlock(&data, appconsts.LatestVersion)
require.NoError(t, err)
require.True(t, eds.Equals(share.EmptyEDS()))

Expand Down
25 changes: 8 additions & 17 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"time"

"github.com/tendermint/tendermint/types"
"golang.org/x/sync/errgroup"

libhead "github.com/celestiaorg/go-header"
Expand Down Expand Up @@ -62,8 +61,7 @@ func NewExchange(

func (ce *Exchange) GetByHeight(ctx context.Context, height uint64) (*header.ExtendedHeader, error) {
log.Debugw("requesting header", "height", height)
intHeight := int64(height)
return ce.getExtendedHeaderByHeight(ctx, &intHeight)
return ce.getExtendedHeaderByHeight(ctx, int64(height))
}

func (ce *Exchange) GetRangeByHeight(
Expand Down Expand Up @@ -129,12 +127,12 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
return nil, fmt.Errorf("fetching block by hash %s: %w", hash.String(), err)
}

comm, vals, err := ce.fetcher.GetBlockInfo(ctx, &block.Height)
comm, vals, err := ce.fetcher.GetBlockInfo(ctx, block.Height)
if err != nil {
return nil, fmt.Errorf("fetching block info for height %d: %w", &block.Height, err)
}

eds, err := extendBlock(block.Data, block.Header.Version.App)
eds, err := extendBlock(&block.Data, block.Header.Version.App)
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", &block.Height, err)
}
Expand Down Expand Up @@ -162,29 +160,22 @@ func (ce *Exchange) Head(
_ ...libhead.HeadOption[*header.ExtendedHeader],
) (*header.ExtendedHeader, error) {
log.Debug("requesting head")
return ce.getExtendedHeaderByHeight(ctx, nil)
return ce.getExtendedHeaderByHeight(ctx, 0)
}

func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64) (*header.ExtendedHeader, error) {
func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height int64) (*header.ExtendedHeader, error) {
b, err := ce.fetcher.GetSignedBlock(ctx, height)
if err != nil {
if height == nil {
return nil, fmt.Errorf("fetching signed block for head from core: %w", err)
}
return nil, fmt.Errorf("fetching signed block at height %d from core: %w", *height, err)
return nil, fmt.Errorf("fetching signed block at height %d from core: %w", height, err)
}
log.Debugw("fetched signed block from core", "height", b.Header.Height)

eds, err := extendBlock(b.Data, b.Header.Version.App)
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", b.Header.Height, err)
}

// TODO(@Wondertan): This is a hack to deref Data, allowing GC to pick it up.
// The better footgun-less solution is to change core.ResultSignedBlock fields to be pointers instead of values.
b.Data = types.Data{}

eh, err := ce.construct(&b.Header, &b.Commit, &b.ValidatorSet, eds)
// create extended header
eh, err := ce.construct(b.Header, b.Commit, b.ValidatorSet, eds)
if err != nil {
panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err))
}
Expand Down
16 changes: 12 additions & 4 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"bytes"
"context"
"net"
"testing"
"time"

Expand Down Expand Up @@ -34,7 +35,7 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {

// initialize store with genesis block
genHeight := int64(1)
genBlock, err := fetcher.GetBlock(ctx, &genHeight)
genBlock, err := fetcher.GetBlock(ctx, genHeight)
require.NoError(t, err)
genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes())
require.NoError(t, err)
Expand All @@ -61,6 +62,7 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {
require.NoError(t, err)
assert.True(t, has)
}
require.NoError(t, fetcher.Stop(ctx))
}

// TestExchange_DoNotStoreHistoric tests that the CoreExchange will not
Expand All @@ -87,7 +89,7 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) {

// initialize store with genesis block
genHeight := int64(1)
genBlock, err := fetcher.GetBlock(ctx, &genHeight)
genBlock, err := fetcher.GetBlock(ctx, genHeight)
require.NoError(t, err)
genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes())
require.NoError(t, err)
Expand Down Expand Up @@ -166,7 +168,13 @@ func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testn
// flakiness with accessing account state)
_, err := cctx.WaitForHeightWithTimeout(2, time.Second*2) // TODO @renaynay: configure?
require.NoError(t, err)
return NewBlockFetcher(cctx.Client), cctx
host, port, err := net.SplitHostPort(cctx.GRPCClient.Target())
require.NoError(t, err)
client := NewClient(host, port)
require.NoError(t, client.Start())
fetcher, err := NewBlockFetcher(client)
require.NoError(t, err)
return fetcher, cctx
}

// fillBlocks fills blocks until the context is canceled.
Expand Down Expand Up @@ -202,7 +210,7 @@ func generateNonEmptyBlocks(
sub, err := fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)
defer func() {
err = fetcher.UnsubscribeNewBlockEvent(ctx)
err = fetcher.Stop(ctx)
require.NoError(t, err)
}()

Expand Down
Loading

0 comments on commit 46c6e9b

Please sign in to comment.