diff --git a/core/client.go b/core/client.go
index 9636619b02..2bef6dba0c 100644
--- a/core/client.go
+++ b/core/client.go
@@ -3,24 +3,65 @@ package core
 import (
 	"fmt"
 
-	retryhttp "github.com/hashicorp/go-retryablehttp"
-	"github.com/tendermint/tendermint/rpc/client"
-	"github.com/tendermint/tendermint/rpc/client/http"
+	coregrpc "github.com/tendermint/tendermint/rpc/grpc"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
 )
 
-// Client is an alias to Core Client.
-type Client = client.Client
+// Client is a core gRPC client.
+type Client struct {
+	coregrpc.BlockAPIClient
+	host, port string
+	conn       *grpc.ClientConn
+}
 
-// NewRemote creates a new Client that communicates with a remote Core endpoint over HTTP.
-func NewRemote(ip, port string) (Client, error) {
-	httpClient := retryhttp.NewClient()
-	httpClient.RetryMax = 2
-	// suppress logging
-	httpClient.Logger = nil
+// NewClient creates a new Client that communicates with a remote Core endpoint over gRPC.
+// The connection is not started when creating the client.
+// Use the Start method to start the connection.
+func NewClient(host, port string) *Client {
+	return &Client{
+		host: host,
+		port: port,
+	}
+}
 
-	return http.NewWithClient(
-		fmt.Sprintf("tcp://%s:%s", ip, port),
-		"/websocket",
-		httpClient.StandardClient(),
+// Start creates the Client's gRPC connection with optional dial options.
+// If the connection is already started, it does nothing.
+func (c *Client) Start(opts ...grpc.DialOption) error {
+	if c.IsRunning() {
+		return nil
+	}
+	if len(opts) == 0 {
+		opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	}
+	conn, err := grpc.NewClient(
+		fmt.Sprintf("%s:%s", c.host, c.port),
+		opts...,
 	)
+	if err != nil {
+		return err
+	}
+	c.conn = conn
+
+	c.BlockAPIClient = coregrpc.NewBlockAPIClient(conn)
+	return nil
+}
+
+// IsRunning checks if the client's connection is established and ready for use.
+// It returns true if the connection is active, false otherwise.
+func (c *Client) IsRunning() bool {
+	return c.conn != nil && c.BlockAPIClient != nil
+}
+
+// Stop terminates the Client's gRPC connection and releases all related resources.
+// If the connection is already stopped, it does nothing.
+func (c *Client) Stop() error {
+	if !c.IsRunning() {
+		return nil
+	}
+	defer func() {
+		c.conn = nil
+		c.BlockAPIClient = nil
+	}()
+	return c.conn.Close()
 }
diff --git a/core/eds.go b/core/eds.go
index 5e95f30103..186b5d1df2 100644
--- a/core/eds.go
+++ b/core/eds.go
@@ -24,8 +24,8 @@ import (
 // extendBlock extends the given block data, returning the resulting
 // ExtendedDataSquare (EDS). If there are no transactions in the block,
 // nil is returned in place of the eds.
-func extendBlock(data types.Data, appVersion uint64, options ...nmt.Option) (*rsmt2d.ExtendedDataSquare, error) { - if app.IsEmptyBlockRef(&data, appVersion) { +func extendBlock(data *types.Data, appVersion uint64, options ...nmt.Option) (*rsmt2d.ExtendedDataSquare, error) { + if app.IsEmptyBlockRef(data, appVersion) { return share.EmptyEDS(), nil } diff --git a/core/eds_test.go b/core/eds_test.go index 3bed7ea94e..f3ff767860 100644 --- a/core/eds_test.go +++ b/core/eds_test.go @@ -22,7 +22,7 @@ func TestTrulyEmptySquare(t *testing.T) { SquareSize: 1, } - eds, err := extendBlock(data, appconsts.LatestVersion) + eds, err := extendBlock(&data, appconsts.LatestVersion) require.NoError(t, err) require.True(t, eds.Equals(share.EmptyEDS())) } @@ -38,7 +38,7 @@ func TestEmptySquareWithZeroTxs(t *testing.T) { Txs: []types.Tx{}, } - eds, err := extendBlock(data, appconsts.LatestVersion) + eds, err := extendBlock(&data, appconsts.LatestVersion) require.NoError(t, err) require.True(t, eds.Equals(share.EmptyEDS())) diff --git a/core/exchange.go b/core/exchange.go index a813955968..90045d006e 100644 --- a/core/exchange.go +++ b/core/exchange.go @@ -6,7 +6,6 @@ import ( "fmt" "time" - "github.com/tendermint/tendermint/types" "golang.org/x/sync/errgroup" libhead "github.com/celestiaorg/go-header" @@ -60,8 +59,7 @@ func NewExchange( func (ce *Exchange) GetByHeight(ctx context.Context, height uint64) (*header.ExtendedHeader, error) { log.Debugw("requesting header", "height", height) - intHeight := int64(height) - return ce.getExtendedHeaderByHeight(ctx, &intHeight) + return ce.getExtendedHeaderByHeight(ctx, int64(height)) } func (ce *Exchange) GetRangeByHeight( @@ -127,12 +125,12 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende return nil, fmt.Errorf("fetching block by hash %s: %w", hash.String(), err) } - comm, vals, err := ce.fetcher.GetBlockInfo(ctx, &block.Height) + comm, vals, err := ce.fetcher.GetBlockInfo(ctx, block.Height) if err != nil { return nil, fmt.Errorf("fetching block info for height %d: %w", &block.Height, err) } - eds, err := extendBlock(block.Data, block.Header.Version.App) + eds, err := extendBlock(&block.Data, block.Header.Version.App) if err != nil { return nil, fmt.Errorf("extending block data for height %d: %w", &block.Height, err) } @@ -160,16 +158,13 @@ func (ce *Exchange) Head( _ ...libhead.HeadOption[*header.ExtendedHeader], ) (*header.ExtendedHeader, error) { log.Debug("requesting head") - return ce.getExtendedHeaderByHeight(ctx, nil) + return ce.getExtendedHeaderByHeight(ctx, 0) } -func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64) (*header.ExtendedHeader, error) { +func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height int64) (*header.ExtendedHeader, error) { b, err := ce.fetcher.GetSignedBlock(ctx, height) if err != nil { - if height == nil { - return nil, fmt.Errorf("fetching signed block for head from core: %w", err) - } - return nil, fmt.Errorf("fetching signed block at height %d from core: %w", *height, err) + return nil, fmt.Errorf("fetching signed block at height %d from core: %w", height, err) } log.Debugw("fetched signed block from core", "height", b.Header.Height) @@ -177,12 +172,8 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64 if err != nil { return nil, fmt.Errorf("extending block data for height %d: %w", b.Header.Height, err) } - - // TODO(@Wondertan): This is a hack to deref Data, allowing GC to pick it up. 
- // The better footgun-less solution is to change core.ResultSignedBlock fields to be pointers instead of values. - b.Data = types.Data{} - - eh, err := ce.construct(&b.Header, &b.Commit, &b.ValidatorSet, eds) + // create extended header + eh, err := ce.construct(b.Header, b.Commit, b.ValidatorSet, eds) if err != nil { panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err)) } diff --git a/core/exchange_test.go b/core/exchange_test.go index f7e69be8a4..4d9f9970dd 100644 --- a/core/exchange_test.go +++ b/core/exchange_test.go @@ -3,6 +3,7 @@ package core import ( "bytes" "context" + "net" "testing" "time" @@ -34,7 +35,7 @@ func TestCoreExchange_RequestHeaders(t *testing.T) { // initialize store with genesis block genHeight := int64(1) - genBlock, err := fetcher.GetBlock(ctx, &genHeight) + genBlock, err := fetcher.GetBlock(ctx, genHeight) require.NoError(t, err) genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes()) require.NoError(t, err) @@ -61,6 +62,7 @@ func TestCoreExchange_RequestHeaders(t *testing.T) { require.NoError(t, err) assert.True(t, has) } + require.NoError(t, fetcher.Stop(ctx)) } // TestExchange_DoNotStoreHistoric tests that the CoreExchange will not @@ -87,7 +89,7 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) { // initialize store with genesis block genHeight := int64(1) - genBlock, err := fetcher.GetBlock(ctx, &genHeight) + genBlock, err := fetcher.GetBlock(ctx, genHeight) require.NoError(t, err) genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes()) require.NoError(t, err) @@ -117,7 +119,13 @@ func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testn // flakiness with accessing account state) _, err := cctx.WaitForHeightWithTimeout(2, time.Second*2) // TODO @renaynay: configure? require.NoError(t, err) - return NewBlockFetcher(cctx.Client), cctx + host, port, err := net.SplitHostPort(cctx.GRPCClient.Target()) + require.NoError(t, err) + client := NewClient(host, port) + require.NoError(t, client.Start()) + fetcher, err := NewBlockFetcher(client) + require.NoError(t, err) + return fetcher, cctx } // fillBlocks fills blocks until the context is canceled. 
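As a hedged sketch of the construction path this diff introduces (mirroring `createCoreFetcher` above), callers that previously handed the exchange a Tendermint RPC client now resolve the gRPC target, start a `core.Client`, and wrap it in a `BlockFetcher`. The helper name `newTestFetcher` and the target value are illustrative placeholders, not part of the diff:

```go
import (
	"net"

	"github.com/celestiaorg/celestia-node/core"
)

// newTestFetcher is a hypothetical helper mirroring createCoreFetcher above:
// it splits the node's gRPC target, starts a core gRPC client, and wraps it
// in a BlockFetcher. target is a placeholder such as "127.0.0.1:9090".
func newTestFetcher(target string) (*core.BlockFetcher, error) {
	host, port, err := net.SplitHostPort(target)
	if err != nil {
		return nil, err
	}
	client := core.NewClient(host, port)
	if err := client.Start(); err != nil {
		return nil, err
	}
	return core.NewBlockFetcher(client)
}
```

Heights are now plain `int64` values rather than pointers, so a caller fetches the genesis block with `fetcher.GetBlock(ctx, 1)`, as the updated tests above do.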
@@ -153,7 +161,7 @@ func generateNonEmptyBlocks( sub, err := fetcher.SubscribeNewBlockEvent(ctx) require.NoError(t, err) defer func() { - err = fetcher.UnsubscribeNewBlockEvent(ctx) + err = fetcher.Stop(ctx) require.NoError(t, err) }() diff --git a/core/fetcher.go b/core/fetcher.go index f2b160e108..19f7c12868 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -4,9 +4,14 @@ import ( "context" "errors" "fmt" + "io" + "sync/atomic" + "time" + "github.com/gogo/protobuf/proto" logging "github.com/ipfs/go-log/v2" - coretypes "github.com/tendermint/tendermint/rpc/core/types" + tmproto "github.com/tendermint/tendermint/proto/tendermint/types" + coregrpc "github.com/tendermint/tendermint/rpc/grpc" "github.com/tendermint/tendermint/types" libhead "github.com/celestiaorg/go-header" @@ -14,27 +19,53 @@ import ( const newBlockSubscriber = "NewBlock/Events" +var ErrClientNotRunning = errors.New("gRPC connection to core node is not running") + +type SignedBlock struct { + Header *types.Header `json:"header"` + Commit *types.Commit `json:"commit"` + Data *types.Data `json:"data"` + ValidatorSet *types.ValidatorSet `json:"validator_set"` +} + var ( log = logging.Logger("core") newDataSignedBlockQuery = types.QueryForEvent(types.EventSignedBlock).String() ) type BlockFetcher struct { - client Client + client *Client - doneCh chan struct{} - cancel context.CancelFunc + doneCh chan struct{} + cancel context.CancelFunc + isListeningForBlocks atomic.Bool } // NewBlockFetcher returns a new `BlockFetcher`. -func NewBlockFetcher(client Client) *BlockFetcher { +func NewBlockFetcher(client *Client) (*BlockFetcher, error) { return &BlockFetcher{ client: client, + }, nil +} + +// Stop stops the block fetcher. +// The underlying gRPC connection needs to be stopped separately. +func (f *BlockFetcher) Stop(ctx context.Context) error { + f.cancel() + select { + case <-f.doneCh: + return nil + case <-ctx.Done(): + return fmt.Errorf("fetcher: unsubscribe from new block events: %w", ctx.Err()) } } // GetBlockInfo queries Core for additional block information, like Commit and ValidatorSet. -func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height *int64) (*types.Commit, *types.ValidatorSet, error) { +func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height int64) (*types.Commit, *types.ValidatorSet, error) { + // return error if the client is still not started + if !f.client.IsRunning() { + return nil, nil, ErrClientNotRunning + } commit, err := f.Commit(ctx, height) if err != nil { return nil, nil, fmt.Errorf("core/fetcher: getting commit at height %d: %w", height, err) @@ -45,7 +76,7 @@ func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height *int64) (*types. // commit and getting the latest validator set. Therefore, it is // best to get the validator set at the latest commit's height to // prevent this potential inconsistency. - valSet, err := f.ValidatorSet(ctx, &commit.Height) + valSet, err := f.ValidatorSet(ctx, commit.Height) if err != nil { return nil, nil, fmt.Errorf("core/fetcher: getting validator set at height %d: %w", height, err) } @@ -54,41 +85,66 @@ func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height *int64) (*types. } // GetBlock queries Core for a `Block` at the given height. 
-func (f *BlockFetcher) GetBlock(ctx context.Context, height *int64) (*types.Block, error) {
-	res, err := f.client.Block(ctx, height)
+// If the height is 0, use the latest height.
+func (f *BlockFetcher) GetBlock(ctx context.Context, height int64) (*SignedBlock, error) {
+	// return error if the client is still not started
+	if !f.client.IsRunning() {
+		return nil, ErrClientNotRunning
+	}
+	stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height})
 	if err != nil {
 		return nil, err
 	}
-
-	if res != nil && res.Block == nil {
-		return nil, fmt.Errorf("core/fetcher: block not found, height: %d", height)
+	block, err := receiveBlockByHeight(stream)
+	if err != nil {
+		return nil, err
 	}
-
-	return res.Block, nil
+	return block, nil
 }
 
 func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (*types.Block, error) {
-	res, err := f.client.BlockByHash(ctx, hash)
+	// return error if the client is still not started
+	if !f.client.IsRunning() {
+		return nil, ErrClientNotRunning
+	}
+	if hash == nil {
+		return nil, fmt.Errorf("cannot get block with nil hash")
+	}
+	stream, err := f.client.BlockByHash(ctx, &coregrpc.BlockByHashRequest{Hash: hash})
 	if err != nil {
 		return nil, err
 	}
-
-	if res != nil && res.Block == nil {
-		return nil, fmt.Errorf("core/fetcher: block not found, hash: %s", hash.String())
+	block, err := receiveBlockByHash(stream)
+	if err != nil {
+		return nil, err
 	}
-	return res.Block, nil
+	return block, nil
 }
 
 // GetSignedBlock queries Core for a `Block` at the given height.
-func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height *int64) (*coretypes.ResultSignedBlock, error) {
-	return f.client.SignedBlock(ctx, height)
+// If the height is 0, use the latest height.
+func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height int64) (*SignedBlock, error) {
+	// return error if the client is still not started
+	if !f.client.IsRunning() {
+		return nil, ErrClientNotRunning
+	}
+	stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height})
+	if err != nil {
+		return nil, err
+	}
+	return receiveBlockByHeight(stream)
 }
 
 // Commit queries Core for a `Commit` from the block at
 // the given height.
-func (f *BlockFetcher) Commit(ctx context.Context, height *int64) (*types.Commit, error) {
-	res, err := f.client.Commit(ctx, height)
+// If the height is 0, use the latest height.
+func (f *BlockFetcher) Commit(ctx context.Context, height int64) (*types.Commit, error) {
+	// return error if the client is still not started
+	if !f.client.IsRunning() {
+		return nil, ErrClientNotRunning
+	}
+	res, err := f.client.Commit(ctx, &coregrpc.CommitRequest{Height: height})
 	if err != nil {
 		return nil, err
 	}
@@ -97,45 +153,55 @@ func (f *BlockFetcher) Commit(ctx context.Context, height *int64) (*types.Commit
 		return nil, fmt.Errorf("core/fetcher: commit not found at height %d", height)
 	}
 
-	return res.Commit, nil
+	commit, err := types.CommitFromProto(res.Commit)
+	if err != nil {
+		return nil, err
+	}
+
+	return commit, nil
 }
 
 // ValidatorSet queries Core for the ValidatorSet from the
 // block at the given height.
-func (f *BlockFetcher) ValidatorSet(ctx context.Context, height *int64) (*types.ValidatorSet, error) {
-	perPage := 100
-
-	vals, total := make([]*types.Validator, 0), -1
-	for page := 1; len(vals) != total; page++ {
-		res, err := f.client.Validators(ctx, height, &page, &perPage)
-		if err != nil {
-			return nil, err
-		}
+// If the height is 0, use the latest height.
+func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.ValidatorSet, error) { + // return error if the client is still not started + if !f.client.IsRunning() { + return nil, ErrClientNotRunning + } + res, err := f.client.ValidatorSet(ctx, &coregrpc.ValidatorSetRequest{Height: height}) + if err != nil { + return nil, err + } - if res != nil && len(res.Validators) == 0 { - return nil, fmt.Errorf("core/fetcher: validator set not found at height %d", height) - } + if res != nil && res.ValidatorSet == nil { + return nil, fmt.Errorf("core/fetcher: validator set not found at height %d", height) + } - total = res.Total - vals = append(vals, res.Validators...) + validatorSet, err := types.ValidatorSetFromProto(res.ValidatorSet) + if err != nil { + return nil, err } - return types.NewValidatorSet(vals), nil + return validatorSet, nil } // SubscribeNewBlockEvent subscribes to new block events from Core, returning // a new block event channel on success. func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types.EventDataSignedBlock, error) { - // start the client if not started yet + // return error if the client is still not started if !f.client.IsRunning() { - return nil, errors.New("client not running") + return nil, ErrClientNotRunning + } + if f.isListeningForBlocks.Load() { + return nil, fmt.Errorf("already subscribed to new blocks") } - ctx, cancel := context.WithCancel(ctx) f.cancel = cancel f.doneCh = make(chan struct{}) + f.isListeningForBlocks.Store(true) - eventChan, err := f.client.Subscribe(ctx, newBlockSubscriber, newDataSignedBlockQuery) + subscription, err := f.client.SubscribeNewHeights(ctx, &coregrpc.SubscribeNewHeightsRequest{}) if err != nil { return nil, err } @@ -144,18 +210,34 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types go func() { defer close(f.doneCh) defer close(signedBlockCh) + defer func() { f.isListeningForBlocks.Store(false) }() for { select { case <-ctx.Done(): return - case newEvent, ok := <-eventChan: - if !ok { - log.Errorw("fetcher: new blocks subscription channel closed unexpectedly") - return + default: + resp, err := subscription.Recv() + if err != nil { + log.Errorw("fetcher: error receiving new height", "err", err.Error()) + continue + } + withTimeout, ctxCancel := context.WithTimeout(ctx, 10*time.Second) + signedBlock, err := f.GetSignedBlock(withTimeout, resp.Height) + ctxCancel() + if err != nil { + log.Errorw("fetcher: error receiving signed block", "height", resp.Height, "err", err.Error()) + // sleeping a bit to avoid retrying instantly and give time for the gRPC connection + // to recover automatically. + time.Sleep(time.Second) + continue } - signedBlock := newEvent.Data.(types.EventDataSignedBlock) select { - case signedBlockCh <- signedBlock: + case signedBlockCh <- types.EventDataSignedBlock{ + Header: *signedBlock.Header, + Commit: *signedBlock.Commit, + ValidatorSet: *signedBlock.ValidatorSet, + Data: *signedBlock.Data, + }: case <-ctx.Done(): return } @@ -166,24 +248,105 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types return signedBlockCh, nil } -// UnsubscribeNewBlockEvent stops the subscription to new block events from Core. 
-func (f *BlockFetcher) UnsubscribeNewBlockEvent(ctx context.Context) error {
-	f.cancel()
-	select {
-	case <-f.doneCh:
-	case <-ctx.Done():
-		return fmt.Errorf("fetcher: unsubscribe from new block events: %w", ctx.Err())
-	}
-	return f.client.Unsubscribe(ctx, newBlockSubscriber, newDataSignedBlockQuery)
-}
-
 // IsSyncing returns the sync status of the Core connection: true for
 // syncing, and false for already caught up. It can also return an error
 // in the case of a failed status request.
 func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) {
-	resp, err := f.client.Status(ctx)
+	// return error if the client is still not started
+	if !f.client.IsRunning() {
+		return false, ErrClientNotRunning
+	}
+	resp, err := f.client.Status(ctx, &coregrpc.StatusRequest{})
 	if err != nil {
 		return false, err
 	}
 	return resp.SyncInfo.CatchingUp, nil
 }
+
+func receiveBlockByHeight(streamer coregrpc.BlockAPI_BlockByHeightClient) (
+	*SignedBlock,
+	error,
+) {
+	parts := make([]*tmproto.Part, 0)
+
+	// receive the first part to get the block meta, commit, and validator set
+	firstPart, err := streamer.Recv()
+	if err != nil {
+		return nil, err
+	}
+	commit, err := types.CommitFromProto(firstPart.Commit)
+	if err != nil {
+		return nil, err
+	}
+	validatorSet, err := types.ValidatorSetFromProto(firstPart.ValidatorSet)
+	if err != nil {
+		return nil, err
+	}
+	parts = append(parts, firstPart.BlockPart)
+
+	// receive the rest of the block
+	isLast := firstPart.IsLast
+	for !isLast {
+		resp, err := streamer.Recv()
+		if err != nil {
+			return nil, err
+		}
+		parts = append(parts, resp.BlockPart)
+		isLast = resp.IsLast
+	}
+	block, err := partsToBlock(parts)
+	if err != nil {
+		return nil, err
+	}
+	return &SignedBlock{
+		Header:       &block.Header,
+		Commit:       commit,
+		Data:         &block.Data,
+		ValidatorSet: validatorSet,
+	}, nil
+}
+
+func receiveBlockByHash(streamer coregrpc.BlockAPI_BlockByHashClient) (*types.Block, error) {
+	parts := make([]*tmproto.Part, 0)
+	isLast := false
+	for !isLast {
+		resp, err := streamer.Recv()
+		if err != nil {
+			return nil, err
+		}
+		parts = append(parts, resp.BlockPart)
+		isLast = resp.IsLast
+	}
+	return partsToBlock(parts)
+}
+
+// partsToBlock takes a slice of parts and assembles the corresponding block.
+// The parts are added to a PartSet and the block is unmarshaled from its combined bytes.
+func partsToBlock(parts []*tmproto.Part) (*types.Block, error) { + partSet := types.NewPartSetFromHeader(types.PartSetHeader{ + Total: uint32(len(parts)), + }) + for _, part := range parts { + ok, err := partSet.AddPartWithoutProof(&types.Part{Index: part.Index, Bytes: part.Bytes}) + if err != nil { + return nil, err + } + if !ok { + return nil, err + } + } + pbb := new(tmproto.Block) + bz, err := io.ReadAll(partSet.GetReader()) + if err != nil { + return nil, err + } + err = proto.Unmarshal(bz, pbb) + if err != nil { + return nil, err + } + block, err := types.BlockFromProto(pbb) + if err != nil { + return nil, err + } + return block, nil +} diff --git a/core/fetcher_no_race_test.go b/core/fetcher_no_race_test.go index 8b3af8e5e1..2f34e4bc05 100644 --- a/core/fetcher_no_race_test.go +++ b/core/fetcher_no_race_test.go @@ -4,6 +4,7 @@ package core import ( "context" + "net" "testing" "time" @@ -18,8 +19,13 @@ func TestBlockFetcherHeaderValues(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) t.Cleanup(cancel) - client := StartTestNode(t).Client - fetcher := NewBlockFetcher(client) + node := StartTestNode(t) + host, port, err := net.SplitHostPort(node.GRPCClient.Target()) + require.NoError(t, err) + client := NewClient(host, port) + require.NoError(t, client.Start()) + fetcher, err := NewBlockFetcher(client) + require.NoError(t, err) // generate some blocks newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx) @@ -33,10 +39,10 @@ func TestBlockFetcherHeaderValues(t *testing.T) { require.NoError(t, ctx.Err()) } // get Commit from current height - commit, err := fetcher.Commit(ctx, &h) + commit, err := fetcher.Commit(ctx, h) require.NoError(t, err) // get ValidatorSet from current height - valSet, err := fetcher.ValidatorSet(ctx, &h) + valSet, err := fetcher.ValidatorSet(ctx, h) require.NoError(t, err) // get next block var nextBlock types.EventDataSignedBlock @@ -51,5 +57,5 @@ func TestBlockFetcherHeaderValues(t *testing.T) { // compare ValidatorSet hash to the ValidatorsHash from first block height hexBytes := valSet.Hash() assert.Equal(t, nextBlock.ValidatorSet.Hash(), hexBytes) - require.NoError(t, fetcher.UnsubscribeNewBlockEvent(ctx)) + require.NoError(t, fetcher.Stop(ctx)) } diff --git a/core/fetcher_test.go b/core/fetcher_test.go index 42afa42bcd..14eeab0bd7 100644 --- a/core/fetcher_test.go +++ b/core/fetcher_test.go @@ -2,6 +2,7 @@ package core import ( "context" + "net" "testing" "time" @@ -13,8 +14,12 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) t.Cleanup(cancel) - client := StartTestNode(t).Client - fetcher := NewBlockFetcher(client) + host, port, err := net.SplitHostPort(StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + client := NewClient(host, port) + require.NoError(t, client.Start()) + fetcher, err := NewBlockFetcher(client) + require.NoError(t, err) // generate some blocks newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx) @@ -24,16 +29,16 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) { select { case newBlockFromChan := <-newBlockChan: h := newBlockFromChan.Header.Height - block, err := fetcher.GetSignedBlock(ctx, &h) + block, err := fetcher.GetSignedBlock(ctx, h) require.NoError(t, err) - assert.Equal(t, newBlockFromChan.Data, block.Data) - assert.Equal(t, newBlockFromChan.Header, block.Header) - assert.Equal(t, newBlockFromChan.Commit, block.Commit) - assert.Equal(t, 
newBlockFromChan.ValidatorSet, block.ValidatorSet) + assert.Equal(t, newBlockFromChan.Data, *block.Data) + assert.Equal(t, newBlockFromChan.Header, *block.Header) + assert.Equal(t, newBlockFromChan.Commit, *block.Commit) + assert.Equal(t, newBlockFromChan.ValidatorSet, *block.ValidatorSet) require.GreaterOrEqual(t, newBlockFromChan.Header.Height, int64(i)) case <-ctx.Done(): require.NoError(t, ctx.Err()) } } - require.NoError(t, fetcher.UnsubscribeNewBlockEvent(ctx)) + require.NoError(t, fetcher.Stop(ctx)) } diff --git a/core/header_test.go b/core/header_test.go index 7b7eb3a7b7..d3c8ab116a 100644 --- a/core/header_test.go +++ b/core/header_test.go @@ -3,6 +3,7 @@ package core import ( "context" "fmt" + "net" "testing" "github.com/stretchr/testify/assert" @@ -20,24 +21,28 @@ func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - client := StartTestNode(t).Client - fetcher := NewBlockFetcher(client) + host, port, err := net.SplitHostPort(StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + client := NewClient(host, port) + require.NoError(t, client.Start()) + fetcher, err := NewBlockFetcher(client) + require.NoError(t, err) sub, err := fetcher.SubscribeNewBlockEvent(ctx) require.NoError(t, err) <-sub height := int64(1) - b, err := fetcher.GetBlock(ctx, &height) + b, err := fetcher.GetBlock(ctx, height) require.NoError(t, err) - comm, val, err := fetcher.GetBlockInfo(ctx, &height) + comm, val, err := fetcher.GetBlockInfo(ctx, height) require.NoError(t, err) eds, err := extendBlock(b.Data, b.Header.Version.App) require.NoError(t, err) - headerExt, err := header.MakeExtendedHeader(&b.Header, comm, val, eds) + headerExt, err := header.MakeExtendedHeader(b.Header, comm, val, eds) require.NoError(t, err) assert.Equal(t, share.EmptyEDSRoots(), headerExt.DAH) diff --git a/core/listener.go b/core/listener.go index 1d7dc1b061..e2dd561dd1 100644 --- a/core/listener.go +++ b/core/listener.go @@ -109,9 +109,9 @@ func (cl *Listener) Start(context.Context) error { // Stop stops the listener loop. 
func (cl *Listener) Stop(ctx context.Context) error { - err := cl.fetcher.UnsubscribeNewBlockEvent(ctx) + err := cl.fetcher.Stop(ctx) if err != nil { - log.Warnw("listener: unsubscribing from new block event", "err", err) + log.Warnw("listener: stopping gRPC block event", "err", err) } cl.cancel() @@ -154,7 +154,7 @@ func (cl *Listener) runSubscriber(ctx context.Context, sub <-chan types.EventDat } func (cl *Listener) resubscribe(ctx context.Context) <-chan types.EventDataSignedBlock { - err := cl.fetcher.UnsubscribeNewBlockEvent(ctx) + err := cl.fetcher.Stop(ctx) if err != nil { log.Warnw("listener: unsubscribe", "err", err) } @@ -226,7 +226,7 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS attribute.Int64("height", b.Header.Height), ) - eds, err := extendBlock(b.Data, b.Header.Version.App) + eds, err := extendBlock(&b.Data, b.Header.Version.App) if err != nil { return fmt.Errorf("extending block data: %w", err) } diff --git a/core/testing.go b/core/testing.go index 6d2aa8cc36..d4b5f6334b 100644 --- a/core/testing.go +++ b/core/testing.go @@ -1,13 +1,9 @@ package core import ( - "net" - "net/url" "testing" "time" - "github.com/stretchr/testify/require" - tmconfig "github.com/tendermint/tendermint/config" tmrand "github.com/tendermint/tendermint/libs/rand" "github.com/celestiaorg/celestia-app/v3/test/util/genesis" @@ -50,37 +46,12 @@ func StartTestNode(t *testing.T) testnode.Context { func StartTestNodeWithConfig(t *testing.T, cfg *testnode.Config) testnode.Context { cctx, _, _ := testnode.NewNetwork(t, cfg) // we want to test over remote http client, - // so we are as close to the real environment as possible - // however, it might be useful to use local tendermint client - // if you need to debug something inside of it - ip, port, err := getEndpoint(cfg.TmConfig) - require.NoError(t, err) - client, err := NewRemote(ip, port) - require.NoError(t, err) - - err = client.Start() - require.NoError(t, err) - t.Cleanup(func() { - err := client.Stop() - require.NoError(t, err) - }) - - cctx.WithClient(client) + // so we are as close to the real environment as possible, + // however, it might be useful to use a local tendermint client + // if you need to debug something inside it return cctx } -func getEndpoint(cfg *tmconfig.Config) (string, string, error) { - url, err := url.Parse(cfg.RPC.ListenAddress) - if err != nil { - return "", "", err - } - host, _, err := net.SplitHostPort(url.Host) - if err != nil { - return "", "", err - } - return host, url.Port(), nil -} - // generateRandomAccounts generates n random account names. 
func generateRandomAccounts(n int) []string { accounts := make([]string, n) diff --git a/go.mod b/go.mod index b2ad5e8a62..ae4ec1a064 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,6 @@ require ( github.com/gorilla/mux v1.8.1 github.com/grafana/otel-profiling-go v0.5.1 github.com/grafana/pyroscope-go v1.1.2 - github.com/hashicorp/go-retryablehttp v0.7.7 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/imdario/mergo v0.3.16 github.com/ipfs/boxo v0.24.0 @@ -354,12 +353,12 @@ require ( ) replace ( - github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16 + github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16.0.20241009080849-0cd36203b354 github.com/filecoin-project/dagstore => github.com/celestiaorg/dagstore v0.0.0-20230824094345-537c012aa403 github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 // broken goleveldb needs to be replaced for the cosmos-sdk and celestia-app github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 - github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35 + github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.44.0-tm-v0.34.35.0.20241127151511-4ca8293c2468 ) replace github.com/ipfs/boxo => github.com/celestiaorg/boxo v0.0.0-20241118122411-70a650316c3b diff --git a/go.sum b/go.sum index bbb12f2221..b35cec4976 100644 --- a/go.sum +++ b/go.sum @@ -349,10 +349,10 @@ github.com/celestiaorg/boxo v0.0.0-20241118122411-70a650316c3b h1:M9X7s1WJ/7Ju84 github.com/celestiaorg/boxo v0.0.0-20241118122411-70a650316c3b/go.mod h1:OpUrJtGmZZktUqJvPOtmP8wSfEFcdF/55d3PNCcYLwc= github.com/celestiaorg/celestia-app/v3 v3.0.2 h1:N9KOGcedhbQpK4XfDZ/OG5za/bV94N4QE72o4gSZ+EA= github.com/celestiaorg/celestia-app/v3 v3.0.2/go.mod h1:Ut3ytZG2+RcmeCxrYyJ5KOGaFoGnVcShIN+IufyDDSY= -github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35 h1:L4GTm+JUXhB0a/nGPMq6jEqqe6THuYSQ8m2kUCtZYqw= -github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35/go.mod h1:bFr0lAGwaJ0mOHSBmib5/ca5pbBf1yKWGPs93Td0HPw= -github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16 h1:f+fTe7GGk0/qgdzyqB8kk8EcDf9d6MC22khBTQiDXsU= -github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16/go.mod h1:07Z8HJqS8Rw4XlZ+ok3D3NM/X/in8mvcGLvl0Zb5wrA= +github.com/celestiaorg/celestia-core v1.44.0-tm-v0.34.35.0.20241127151511-4ca8293c2468 h1:3g5si/M4gT4S3vpZs0QWkEKXMTHz2EdHHmjUGkR3sgg= +github.com/celestiaorg/celestia-core v1.44.0-tm-v0.34.35.0.20241127151511-4ca8293c2468/go.mod h1:bFr0lAGwaJ0mOHSBmib5/ca5pbBf1yKWGPs93Td0HPw= +github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16.0.20241009080849-0cd36203b354 h1:3lqz1cEs0wx1PWQQezEaYttAMYMsdxU677Jh58keyyc= +github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16.0.20241009080849-0cd36203b354/go.mod h1:xDj0DMkHYv8u4oMOV88QU7g2f9nbHjVYlJ3S/6HeTEs= github.com/celestiaorg/go-fraud v0.2.1 h1:oYhxI0gM/EpGRgbVQdRI/LSlqyT65g/WhQGSVGfx09w= github.com/celestiaorg/go-fraud v0.2.1/go.mod h1:lNY1i4K6kUeeE60Z2VK8WXd+qXb8KRzfBhvwPkK6aUc= github.com/celestiaorg/go-header v0.6.3 h1:VI+fsNxFLeUS7cNn0LgHP6Db66uslnKp/fgMg5nxqHg= @@ -600,8 +600,6 @@ github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 h1:7HZCaLC5+BZpm github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= 
-github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= -github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/filecoin-project/go-jsonrpc v0.6.0 h1:/fFJIAN/k6EgY90m7qbyfY28woMwyseZmh2gVs5sYjY= @@ -919,8 +917,6 @@ github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9n github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-getter v1.7.5 h1:dT58k9hQ/vbxNMwoI5+xFYAJuv6152UNvdHokfI5wE4= github.com/hashicorp/go-getter v1.7.5/go.mod h1:W7TalhMmbPmsSMdNjD0ZskARur/9GJ17cfHTRtXV744= -github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= -github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc= github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= @@ -929,8 +925,6 @@ github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHh github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= -github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= -github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU= github.com/hashicorp/go-safetemp v1.0.0 h1:2HR189eFNrjHQyENnQMMpCiBAsRxzbTMIgBhEyExpmo= github.com/hashicorp/go-safetemp v1.0.0/go.mod h1:oaerMy3BhqiTbVye6QuFhFtIceqFoDHxNAB65b+Rj1I= diff --git a/nodebuilder/config_test.go b/nodebuilder/config_test.go index 519466d88e..227fefd3c7 100644 --- a/nodebuilder/config_test.go +++ b/nodebuilder/config_test.go @@ -61,8 +61,7 @@ func TestUpdateConfig(t *testing.T) { var outdatedConfig = ` [Core] IP = "0.0.0.0" - RPCPort = "0" - GRPCPort = "0" + Port = "0" [State] DefaultKeyName = "thisshouldnthavechanged" diff --git a/nodebuilder/core/config.go b/nodebuilder/core/config.go index b82be1d809..d13ab45f66 100644 --- a/nodebuilder/core/config.go +++ b/nodebuilder/core/config.go @@ -8,26 +8,23 @@ import ( ) const ( - DefaultRPCPort = "26657" - DefaultGRPCPort = "9090" + DefaultPort = "9090" ) var MetricsEnabled bool // Config combines all configuration fields for managing the relationship with a Core node. type Config struct { - IP string - RPCPort string - GRPCPort string + IP string + Port string } // DefaultConfig returns default configuration for managing the // node's connection to a Celestia-Core endpoint. 
func DefaultConfig() Config { return Config{ - IP: "", - RPCPort: DefaultRPCPort, - GRPCPort: DefaultGRPCPort, + IP: "", + Port: DefaultPort, } } @@ -37,10 +34,7 @@ func (cfg *Config) Validate() error { return nil } - if cfg.RPCPort == "" { - return fmt.Errorf("nodebuilder/core: rpc port is not set") - } - if cfg.GRPCPort == "" { + if cfg.Port == "" { return fmt.Errorf("nodebuilder/core: grpc port is not set") } @@ -49,11 +43,7 @@ func (cfg *Config) Validate() error { return err } cfg.IP = ip - _, err = strconv.Atoi(cfg.RPCPort) - if err != nil { - return fmt.Errorf("nodebuilder/core: invalid rpc port: %s", err.Error()) - } - _, err = strconv.Atoi(cfg.GRPCPort) + _, err = strconv.Atoi(cfg.Port) if err != nil { return fmt.Errorf("nodebuilder/core: invalid grpc port: %s", err.Error()) } diff --git a/nodebuilder/core/config_test.go b/nodebuilder/core/config_test.go index 6535e28807..90994d6e15 100644 --- a/nodebuilder/core/config_test.go +++ b/nodebuilder/core/config_test.go @@ -15,9 +15,8 @@ func TestValidate(t *testing.T) { { name: "valid config", cfg: Config{ - IP: "127.0.0.1", - RPCPort: DefaultRPCPort, - GRPCPort: DefaultGRPCPort, + IP: "127.0.0.1", + Port: DefaultPort, }, expectErr: false, }, @@ -29,52 +28,31 @@ func TestValidate(t *testing.T) { { name: "hostname preserved", cfg: Config{ - IP: "celestia.org", - RPCPort: DefaultRPCPort, - GRPCPort: DefaultGRPCPort, + IP: "celestia.org", + Port: DefaultPort, }, expectErr: false, }, - { - name: "missing RPC port", - cfg: Config{ - IP: "127.0.0.1", - GRPCPort: DefaultGRPCPort, - }, - expectErr: true, - }, { name: "missing GRPC port", cfg: Config{ - IP: "127.0.0.1", - RPCPort: DefaultRPCPort, + IP: "127.0.0.1", }, expectErr: true, }, { name: "invalid IP, but will be accepted as host and not raise an error", cfg: Config{ - IP: "invalid-ip", - RPCPort: DefaultRPCPort, - GRPCPort: DefaultGRPCPort, + IP: "invalid-ip", + Port: DefaultPort, }, expectErr: false, }, { - name: "invalid RPC port", - cfg: Config{ - IP: "127.0.0.1", - RPCPort: "invalid-port", - GRPCPort: DefaultGRPCPort, - }, - expectErr: true, - }, - { - name: "invalid GRPC port", + name: "invalid port", cfg: Config{ - IP: "127.0.0.1", - RPCPort: DefaultRPCPort, - GRPCPort: "invalid-port", + IP: "127.0.0.1", + Port: "invalid-port", }, expectErr: true, }, diff --git a/nodebuilder/core/constructors.go b/nodebuilder/core/constructors.go index 53c914a041..ae598a6029 100644 --- a/nodebuilder/core/constructors.go +++ b/nodebuilder/core/constructors.go @@ -4,6 +4,6 @@ import ( "github.com/celestiaorg/celestia-node/core" ) -func remote(cfg Config) (core.Client, error) { - return core.NewRemote(cfg.IP, cfg.RPCPort) +func remote(cfg Config) *core.Client { + return core.NewClient(cfg.IP, cfg.Port) } diff --git a/nodebuilder/core/flags.go b/nodebuilder/core/flags.go index 127ee5ee60..ffe608f54c 100644 --- a/nodebuilder/core/flags.go +++ b/nodebuilder/core/flags.go @@ -9,7 +9,6 @@ import ( var ( coreFlag = "core.ip" - coreRPCFlag = "core.rpc.port" coreGRPCFlag = "core.grpc.port" ) @@ -22,16 +21,11 @@ func Flags() *flag.FlagSet { "", "Indicates node to connect to the given core node. "+ "Example: , 127.0.0.1. , subdomain.domain.tld "+ - "Assumes RPC port 26657 and gRPC port 9090 as default unless otherwise specified.", - ) - flags.String( - coreRPCFlag, - DefaultRPCPort, - "Set a custom RPC port for the core node connection. 
The --core.ip flag must also be provided.", + "Assumes gRPC port 9090 as default unless otherwise specified.", ) flags.String( coreGRPCFlag, - DefaultGRPCPort, + DefaultPort, "Set a custom gRPC port for the core node connection. The --core.ip flag must also be provided.", ) return flags @@ -44,20 +38,15 @@ func ParseFlags( ) error { coreIP := cmd.Flag(coreFlag).Value.String() if coreIP == "" { - if cmd.Flag(coreGRPCFlag).Changed || cmd.Flag(coreRPCFlag).Changed { - return fmt.Errorf("cannot specify RPC/gRPC ports without specifying an IP address for --core.ip") + if cmd.Flag(coreGRPCFlag).Changed { + return fmt.Errorf("cannot specify gRPC port without specifying an IP address for --core.ip") } return nil } - if cmd.Flag(coreRPCFlag).Changed { - rpc := cmd.Flag(coreRPCFlag).Value.String() - cfg.RPCPort = rpc - } - if cmd.Flag(coreGRPCFlag).Changed { grpc := cmd.Flag(coreGRPCFlag).Value.String() - cfg.GRPCPort = grpc + cfg.Port = grpc } cfg.IP = coreIP diff --git a/nodebuilder/core/flags_test.go b/nodebuilder/core/flags_test.go index ce906de037..a1a11a9a23 100644 --- a/nodebuilder/core/flags_test.go +++ b/nodebuilder/core/flags_test.go @@ -20,9 +20,8 @@ func TestParseFlags(t *testing.T) { args: []string{}, inputCfg: Config{}, expectedCfg: Config{ - IP: "", - RPCPort: "", - GRPCPort: "", + IP: "", + Port: "", }, expectError: false, }, @@ -30,13 +29,11 @@ func TestParseFlags(t *testing.T) { name: "only core.ip", args: []string{"--core.ip=127.0.0.1"}, inputCfg: Config{ - RPCPort: DefaultRPCPort, - GRPCPort: DefaultGRPCPort, + Port: DefaultPort, }, expectedCfg: Config{ - IP: "127.0.0.1", - RPCPort: DefaultRPCPort, - GRPCPort: DefaultGRPCPort, + IP: "127.0.0.1", + Port: DefaultPort, }, expectError: false, }, @@ -45,9 +42,8 @@ func TestParseFlags(t *testing.T) { args: []string{"--core.ip=127.0.0.1"}, inputCfg: Config{}, expectedCfg: Config{ - IP: "127.0.0.1", - RPCPort: DefaultRPCPort, - GRPCPort: DefaultGRPCPort, + IP: "127.0.0.1", + Port: DefaultPort, }, expectError: true, }, @@ -55,14 +51,12 @@ func TestParseFlags(t *testing.T) { name: "no flags, values from input config.toml ", args: []string{}, inputCfg: Config{ - IP: "127.162.36.1", - RPCPort: "1234", - GRPCPort: "5678", + IP: "127.162.36.1", + Port: "5678", }, expectedCfg: Config{ - IP: "127.162.36.1", - RPCPort: "1234", - GRPCPort: "5678", + IP: "127.162.36.1", + Port: "5678", }, expectError: false, }, @@ -70,27 +64,11 @@ func TestParseFlags(t *testing.T) { name: "only core.ip, with config.toml overridden defaults for ports", args: []string{"--core.ip=127.0.0.1"}, inputCfg: Config{ - RPCPort: "1234", - GRPCPort: "5678", + Port: "5678", }, expectedCfg: Config{ - IP: "127.0.0.1", - RPCPort: "1234", - GRPCPort: "5678", - }, - expectError: false, - }, - { - name: "core.ip and core.rpc.port", - args: []string{"--core.ip=127.0.0.1", "--core.rpc.port=12345"}, - inputCfg: Config{ - RPCPort: DefaultRPCPort, - GRPCPort: DefaultGRPCPort, - }, - expectedCfg: Config{ - IP: "127.0.0.1", - RPCPort: "12345", - GRPCPort: DefaultGRPCPort, + IP: "127.0.0.1", + Port: "5678", }, expectError: false, }, @@ -98,32 +76,14 @@ func TestParseFlags(t *testing.T) { name: "core.ip and core.grpc.port", args: []string{"--core.ip=127.0.0.1", "--core.grpc.port=54321"}, inputCfg: Config{ - RPCPort: DefaultRPCPort, - GRPCPort: DefaultGRPCPort, - }, - expectedCfg: Config{ - IP: "127.0.0.1", - RPCPort: DefaultRPCPort, - GRPCPort: "54321", + Port: DefaultPort, }, - expectError: false, - }, - { - name: "core.ip, core.rpc.port, and core.grpc.port", - args: 
[]string{"--core.ip=127.0.0.1", "--core.rpc.port=12345", "--core.grpc.port=54321"}, expectedCfg: Config{ - IP: "127.0.0.1", - RPCPort: "12345", - GRPCPort: "54321", + IP: "127.0.0.1", + Port: "54321", }, expectError: false, }, - { - name: "core.rpc.port without core.ip", - args: []string{"--core.rpc.port=12345"}, - expectedCfg: Config{}, - expectError: true, - }, { name: "core.grpc.port without core.ip", args: []string{"--core.grpc.port=54321"}, diff --git a/nodebuilder/core/module.go b/nodebuilder/core/module.go index 441907ce32..61a4e3468c 100644 --- a/nodebuilder/core/module.go +++ b/nodebuilder/core/module.go @@ -76,10 +76,10 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option )), fx.Provide(fx.Annotate( remote, - fx.OnStart(func(_ context.Context, client core.Client) error { + fx.OnStart(func(_ context.Context, client *core.Client) error { return client.Start() }), - fx.OnStop(func(_ context.Context, client core.Client) error { + fx.OnStop(func(_ context.Context, client *core.Client) error { return client.Stop() }), )), diff --git a/nodebuilder/core/opts.go b/nodebuilder/core/opts.go index 56347a5cb6..26b25b4541 100644 --- a/nodebuilder/core/opts.go +++ b/nodebuilder/core/opts.go @@ -8,8 +8,8 @@ import ( "github.com/celestiaorg/celestia-node/libs/fxutil" ) -// WithClient sets custom client for core process -func WithClient(client core.Client) fx.Option { +// WithClient sets a custom client for core process +func WithClient(client *core.Client) fx.Option { return fxutil.ReplaceAs(client, new(core.Client)) } diff --git a/nodebuilder/node_bridge_test.go b/nodebuilder/node_bridge_test.go index d2b7ebaf4e..a4cac93c99 100644 --- a/nodebuilder/node_bridge_test.go +++ b/nodebuilder/node_bridge_test.go @@ -2,6 +2,7 @@ package nodebuilder import ( "context" + "net" "testing" "github.com/stretchr/testify/require" @@ -19,7 +20,10 @@ func TestBridge_WithMockedCoreClient(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - client := core.StartTestNode(t).Client + host, port, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + client := core.NewClient(host, port) + require.NoError(t, client.Start()) node, err := New(node.Bridge, p2p.Private, repo, coremodule.WithClient(client)) require.NoError(t, err) require.NotNil(t, node) diff --git a/nodebuilder/node_test.go b/nodebuilder/node_test.go index bcf5f01942..d78be08903 100644 --- a/nodebuilder/node_test.go +++ b/nodebuilder/node_test.go @@ -4,6 +4,7 @@ package nodebuilder import ( "context" + "net" "net/http" "net/http/httptest" "strconv" @@ -15,6 +16,7 @@ import ( collectormetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" "google.golang.org/protobuf/proto" + "github.com/celestiaorg/celestia-node/core" "github.com/celestiaorg/celestia-node/nodebuilder/node" ) @@ -29,7 +31,16 @@ func TestLifecycle(t *testing.T) { for i, tt := range test { t.Run(strconv.Itoa(i), func(t *testing.T) { - node := TestNode(t, tt.tp) + // we're also creating a test node because the gRPC connection + // is started automatically when starting the node. 
+ host, port, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + + cfg := DefaultConfig(tt.tp) + cfg.Core.IP = host + cfg.Core.Port = port + + node := TestNodeWithConfig(t, tt.tp, cfg) require.NotNil(t, node) require.NotNil(t, node.Config) require.NotNil(t, node.Host) @@ -41,7 +52,7 @@ func TestLifecycle(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := node.Start(ctx) + err = node.Start(ctx) require.NoError(t, err) err = node.Stop(ctx) @@ -67,9 +78,19 @@ func TestLifecycle_WithMetrics(t *testing.T) { for i, tt := range test { t.Run(strconv.Itoa(i), func(t *testing.T) { - node := TestNode( + // we're also creating a test node because the gRPC connection + // is started automatically when starting the node. + host, port, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + + cfg := DefaultConfig(tt.tp) + cfg.Core.IP = host + cfg.Core.Port = port + + node := TestNodeWithConfig( t, tt.tp, + cfg, WithMetrics( []otlpmetrichttp.Option{ otlpmetrichttp.WithEndpoint(otelCollectorURL), @@ -88,7 +109,7 @@ func TestLifecycle_WithMetrics(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := node.Start(ctx) + err = node.Start(ctx) require.NoError(t, err) err = node.Stop(ctx) diff --git a/nodebuilder/state/core.go b/nodebuilder/state/core.go index 39ab732368..c7c89b3ae2 100644 --- a/nodebuilder/state/core.go +++ b/nodebuilder/state/core.go @@ -30,7 +30,7 @@ func coreAccessor( *modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader], error, ) { - ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, corecfg.IP, corecfg.GRPCPort, + ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, corecfg.IP, corecfg.Port, network.String(), opts...) sBreaker := &modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader]{ diff --git a/nodebuilder/testing.go b/nodebuilder/testing.go index 0f2e046882..d205fca120 100644 --- a/nodebuilder/testing.go +++ b/nodebuilder/testing.go @@ -1,6 +1,7 @@ package nodebuilder import ( + "net" "testing" "github.com/cosmos/cosmos-sdk/crypto/hd" @@ -69,12 +70,15 @@ func TestNodeWithConfig(t *testing.T, tp node.Type, cfg *Config, opts ...fx.Opti fxutil.ReplaceAs(headertest.NewStore(t), new(libhead.Store[*header.ExtendedHeader])), ) - // in fact, we don't need core.Client in tests, but Bridge requires is a valid one - // or fails otherwise with failed attempt to connect with custom build client + // in fact, we don't need core.Client in tests, but the Bridge node requires a valid one. + // otherwise, it fails with a failed attempt to connect with a custom build client. 
if tp == node.Bridge { - cctx := core.StartTestNode(t) + host, port, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + client := core.NewClient(host, port) + require.NoError(t, client.Start()) opts = append(opts, - fxutil.ReplaceAs(cctx.Client, new(core.Client)), + fxutil.ReplaceAs(client, new(core.Client)), ) } diff --git a/nodebuilder/tests/fraud_test.go b/nodebuilder/tests/fraud_test.go index 524b306ac0..b156efe226 100644 --- a/nodebuilder/tests/fraud_test.go +++ b/nodebuilder/tests/fraud_test.go @@ -4,6 +4,7 @@ package tests import ( "context" + "net" "testing" "time" @@ -68,6 +69,8 @@ func TestFraudProofHandling(t *testing.T) { }) cfg := nodebuilder.DefaultConfig(node.Bridge) + cfg.Core.IP, cfg.Core.Port, err = net.SplitHostPort(sw.ClientContext.GRPCClient.Target()) + require.NoError(t, err) // 1. bridge := sw.NewNodeWithConfig( node.Bridge, diff --git a/nodebuilder/tests/p2p_test.go b/nodebuilder/tests/p2p_test.go index 98e9fc15b4..4f7e87b5fb 100644 --- a/nodebuilder/tests/p2p_test.go +++ b/nodebuilder/tests/p2p_test.go @@ -4,6 +4,7 @@ package tests import ( "context" + "net" "testing" "time" @@ -77,8 +78,11 @@ func TestFullDiscoveryViaBootstrapper(t *testing.T) { // create and start a BN cfg := nodebuilder.DefaultConfig(node.Bridge) setTimeInterval(cfg, defaultTimeInterval) + var err error + cfg.Core.IP, cfg.Core.Port, err = net.SplitHostPort(sw.ClientContext.GRPCClient.Target()) + require.NoError(t, err) bridge := sw.NewNodeWithConfig(node.Bridge, cfg) - err := bridge.Start(ctx) + err = bridge.Start(ctx) require.NoError(t, err) // use BN as the bootstrapper @@ -154,8 +158,11 @@ func TestRestartNodeDiscovery(t *testing.T) { // create and start a BN as a bootstrapper fullCfg := nodebuilder.DefaultConfig(node.Bridge) setTimeInterval(fullCfg, defaultTimeInterval) + var err error + fullCfg.Core.IP, fullCfg.Core.Port, err = net.SplitHostPort(sw.ClientContext.GRPCClient.Target()) + require.NoError(t, err) bridge := sw.NewNodeWithConfig(node.Bridge, fullCfg) - err := bridge.Start(ctx) + err = bridge.Start(ctx) require.NoError(t, err) bridgeAddr := host.InfoFromHost(bridge.Host) diff --git a/nodebuilder/tests/prune_test.go b/nodebuilder/tests/prune_test.go index a92117b7f2..6c6cccea50 100644 --- a/nodebuilder/tests/prune_test.go +++ b/nodebuilder/tests/prune_test.go @@ -5,6 +5,7 @@ package tests import ( "bytes" "context" + "net" "testing" "time" @@ -185,9 +186,12 @@ func TestConvertFromPrunedToArchival(t *testing.T) { for _, nt := range []node.Type{node.Bridge, node.Full} { pruningCfg := nodebuilder.DefaultConfig(nt) pruningCfg.Pruner.EnableService = true + var err error + pruningCfg.Core.IP, pruningCfg.Core.Port, err = net.SplitHostPort(sw.ClientContext.GRPCClient.Target()) + require.NoError(t, err) store := nodebuilder.MockStore(t, pruningCfg) pruningNode := sw.MustNewNodeWithStore(nt, store) - err := pruningNode.Start(ctx) + err = pruningNode.Start(ctx) require.NoError(t, err) err = pruningNode.Stop(ctx) require.NoError(t, err) diff --git a/nodebuilder/tests/swamp/swamp.go b/nodebuilder/tests/swamp/swamp.go index 57d46bb64b..1f4a56b319 100644 --- a/nodebuilder/tests/swamp/swamp.go +++ b/nodebuilder/tests/swamp/swamp.go @@ -179,8 +179,15 @@ func (s *Swamp) setupGenesis() { store, err := store.NewStore(store.DefaultParameters(), s.t.TempDir()) require.NoError(s.t, err) + host, port, err := net.SplitHostPort(s.ClientContext.GRPCClient.Target()) + require.NoError(s.t, err) + client := core.NewClient(host, port) + require.NoError(s.t, 
client.Start()) + fetcher, err := core.NewBlockFetcher(client) + require.NoError(s.t, err) + ex, err := core.NewExchange( - core.NewBlockFetcher(s.ClientContext.Client), + fetcher, store, header.MakeExtendedHeader, ) @@ -199,7 +206,7 @@ func (s *Swamp) DefaultTestConfig(tp node.Type) *nodebuilder.Config { require.NoError(s.t, err) cfg.Core.IP = ip - cfg.Core.GRPCPort = port + cfg.Core.Port = port return cfg } @@ -207,6 +214,9 @@ func (s *Swamp) DefaultTestConfig(tp node.Type) *nodebuilder.Config { // and a mockstore to the MustNewNodeWithStore method func (s *Swamp) NewBridgeNode(options ...fx.Option) *nodebuilder.Node { cfg := s.DefaultTestConfig(node.Bridge) + var err error + cfg.Core.IP, cfg.Core.Port, err = net.SplitHostPort(s.ClientContext.GRPCClient.Target()) + require.NoError(s.t, err) store := nodebuilder.MockStore(s.t, cfg) return s.MustNewNodeWithStore(node.Bridge, store, options...) @@ -278,8 +288,16 @@ func (s *Swamp) NewNodeWithStore( switch tp { case node.Bridge: + host, port, err := net.SplitHostPort(s.ClientContext.GRPCClient.Target()) + if err != nil { + return nil, err + } + client := core.NewClient(host, port) + if err := client.Start(); err != nil { + return nil, err + } options = append(options, - coremodule.WithClient(s.ClientContext.Client), + coremodule.WithClient(client), ) default: } diff --git a/state/core_access.go b/state/core_access.go index a363577b1c..1c127e20ce 100644 --- a/state/core_access.go +++ b/state/core_access.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "net" "sync" "time" @@ -69,7 +70,7 @@ type CoreAccessor struct { coreConn *grpc.ClientConn coreIP string - grpcPort string + port string network string // these fields are mutatable and thus need to be protected by a mutex @@ -91,7 +92,7 @@ func NewCoreAccessor( keyname string, getter libhead.Head[*header.ExtendedHeader], coreIP, - grpcPort string, + port string, network string, options ...Option, ) (*CoreAccessor, error) { @@ -105,7 +106,7 @@ func NewCoreAccessor( defaultSignerAccount: keyname, getter: getter, coreIP: coreIP, - grpcPort: grpcPort, + port: port, prt: prt, network: network, } @@ -123,7 +124,7 @@ func (ca *CoreAccessor) Start(ctx context.Context) error { ca.ctx, ca.cancel = context.WithCancel(context.Background()) // dial given celestia-core endpoint - endpoint := fmt.Sprintf("%s:%s", ca.coreIP, ca.grpcPort) + endpoint := net.JoinHostPort(ca.coreIP, ca.port) client, err := grpc.NewClient( endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()),
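
As a usage sketch (not part of the diff), the lifecycle of the new client mirrors what `nodebuilder/core/module.go` wires up: construct, `Start` (with optional dial options; insecure transport credentials are the default when none are given), use a `BlockFetcher` on top of it, then `Stop`. The endpoint below is a hypothetical placeholder:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/celestiaorg/celestia-node/core"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	// Hypothetical endpoint; with no dial options, Start falls back to
	// insecure transport credentials (see core/client.go above).
	client := core.NewClient("consensus.example.org", "9090")
	if err := client.Start(); err != nil {
		panic(err)
	}
	defer client.Stop() //nolint:errcheck // sketch only

	fetcher, err := core.NewBlockFetcher(client)
	if err != nil {
		panic(err)
	}

	// Stream signed blocks as they are produced by the core node.
	sub, err := fetcher.SubscribeNewBlockEvent(ctx)
	if err != nil {
		panic(err)
	}
	for blk := range sub {
		fmt.Println("new block at height", blk.Header.Height)
	}
	_ = fetcher.Stop(ctx)
}
```

Note that `Stop` on the fetcher only tears down the subscription loop; the gRPC connection itself is released by `Client.Stop`, matching the split introduced in `core/fetcher.go` and `core/client.go`.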