diff --git a/api/subscriptions/beat2_reader_test.go b/api/subscriptions/beat2_reader_test.go new file mode 100644 index 000000000..dbeecbd7f --- /dev/null +++ b/api/subscriptions/beat2_reader_test.go @@ -0,0 +1,66 @@ +// Copyright (c) 2024 The VeChainThor developers + +// Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying +// file LICENSE or + +package subscriptions + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/vechain/thor/v2/thor" +) + +func TestBeat2Reader_Read(t *testing.T) { + // Arrange + repo, generatedBlocks, _ := initChain(t) + genesisBlk := generatedBlocks[0] + newBlock := generatedBlocks[1] + + // Act + beatReader := newBeat2Reader(repo, genesisBlk.Header().ID()) + res, ok, err := beatReader.Read() + + // Assert + assert.NoError(t, err) + assert.True(t, ok) + if beatMsg, ok := res[0].(*Beat2Message); !ok { + t.Fatal("unexpected type") + } else { + assert.Equal(t, newBlock.Header().Number(), beatMsg.Number) + assert.Equal(t, newBlock.Header().ID(), beatMsg.ID) + assert.Equal(t, newBlock.Header().ParentID(), beatMsg.ParentID) + assert.Equal(t, newBlock.Header().Timestamp(), beatMsg.Timestamp) + assert.Equal(t, uint32(newBlock.Header().TxsFeatures()), beatMsg.TxsFeatures) + } +} + +func TestBeat2Reader_Read_NoNewBlocksToRead(t *testing.T) { + // Arrange + repo, generatedBlocks, _ := initChain(t) + newBlock := generatedBlocks[1] + + // Act + beatReader := newBeat2Reader(repo, newBlock.Header().ID()) + res, ok, err := beatReader.Read() + + // Assert + assert.NoError(t, err) + assert.False(t, ok) + assert.Empty(t, res) +} + +func TestBeat2Reader_Read_ErrorWhenReadingBlocks(t *testing.T) { + // Arrange + repo, _, _ := initChain(t) + + // Act + beatReader := newBeat2Reader(repo, thor.MustParseBytes32("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")) + res, ok, err := beatReader.Read() + + // Assert + assert.Error(t, err) + assert.False(t, ok) + assert.Empty(t, res) +} diff --git a/api/subscriptions/beat_reader_test.go b/api/subscriptions/beat_reader_test.go new file mode 100644 index 000000000..6e6974af9 --- /dev/null +++ b/api/subscriptions/beat_reader_test.go @@ -0,0 +1,66 @@ +// Copyright (c) 2024 The VeChainThor developers + +// Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying +// file LICENSE or + +package subscriptions + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/vechain/thor/v2/thor" +) + +func TestBeatReader_Read(t *testing.T) { + // Arrange + repo, generatedBlocks, _ := initChain(t) + genesisBlk := generatedBlocks[0] + newBlock := generatedBlocks[1] + + // Act + beatReader := newBeatReader(repo, genesisBlk.Header().ID()) + res, ok, err := beatReader.Read() + + // Assert + assert.NoError(t, err) + assert.True(t, ok) + if beatMsg, ok := res[0].(*BeatMessage); !ok { + t.Fatal("unexpected type") + } else { + assert.Equal(t, newBlock.Header().Number(), beatMsg.Number) + assert.Equal(t, newBlock.Header().ID(), beatMsg.ID) + assert.Equal(t, newBlock.Header().ParentID(), beatMsg.ParentID) + assert.Equal(t, newBlock.Header().Timestamp(), beatMsg.Timestamp) + assert.Equal(t, uint32(newBlock.Header().TxsFeatures()), beatMsg.TxsFeatures) + } +} + +func TestBeatReader_Read_NoNewBlocksToRead(t *testing.T) { + // Arrange + repo, generatedBlocks, _ := initChain(t) + newBlock := generatedBlocks[1] + + // Act + beatReader := newBeatReader(repo, newBlock.Header().ID()) + res, ok, err := beatReader.Read() + + // Assert + assert.NoError(t, err) + assert.False(t, ok) + assert.Empty(t, res) +} + +func TestBeatReader_Read_ErrorWhenReadingBlocks(t *testing.T) { + // Arrange + repo, _, _ := initChain(t) + + // Act + beatReader := newBeatReader(repo, thor.MustParseBytes32("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")) + res, ok, err := beatReader.Read() + + // Assert + assert.Error(t, err) + assert.False(t, ok) + assert.Empty(t, res) +} diff --git a/api/subscriptions/block_reader_test.go b/api/subscriptions/block_reader_test.go new file mode 100644 index 000000000..d181f4fa1 --- /dev/null +++ b/api/subscriptions/block_reader_test.go @@ -0,0 +1,148 @@ +// Copyright (c) 2024 The VeChainThor developers + +// Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying +// file LICENSE or + +package subscriptions + +import ( + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/inconshreveable/log15" + "github.com/stretchr/testify/assert" + "github.com/vechain/thor/v2/block" + "github.com/vechain/thor/v2/chain" + "github.com/vechain/thor/v2/genesis" + "github.com/vechain/thor/v2/muxdb" + "github.com/vechain/thor/v2/packer" + "github.com/vechain/thor/v2/state" + "github.com/vechain/thor/v2/thor" + "github.com/vechain/thor/v2/tx" + "github.com/vechain/thor/v2/txpool" +) + +func init() { + log15.Root().SetHandler(log15.DiscardHandler()) +} + +func TestBlockReader_Read(t *testing.T) { + repo, generatedBlocks, _ := initChain(t) + genesisBlk := generatedBlocks[0] + newBlock := generatedBlocks[1] + + // Test case 1: Successful read next blocks + br := newBlockReader(repo, genesisBlk.Header().ID()) + res, ok, err := br.Read() + + assert.NoError(t, err) + assert.True(t, ok) + if resBlock, ok := res[0].(*BlockMessage); !ok { + t.Fatal("unexpected type") + } else { + assert.Equal(t, newBlock.Header().Number(), resBlock.Number) + assert.Equal(t, newBlock.Header().ParentID(), resBlock.ParentID) + } + + // Test case 2: There is no new block + br = newBlockReader(repo, newBlock.Header().ID()) + res, ok, err = br.Read() + + assert.NoError(t, err) + assert.False(t, ok) + assert.Empty(t, res) + + // Test case 3: Error when reading blocks + br = newBlockReader(repo, thor.MustParseBytes32("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")) + res, ok, err = br.Read() + + assert.Error(t, err) + assert.False(t, ok) + assert.Empty(t, res) +} + +func initChain(t *testing.T) (*chain.Repository, []*block.Block, *txpool.TxPool) { + db := muxdb.NewMem() + stater := state.NewStater(db) + gene := genesis.NewDevnet() + + b, _, _, err := gene.Build(stater) + if err != nil { + t.Fatal(err) + } + repo, _ := chain.NewRepository(db, b) + + txPool := txpool.New(repo, stater, txpool.Options{ + Limit: 100, + LimitPerAccount: 16, + MaxLifetime: time.Hour, + }) + + addr := thor.BytesToAddress([]byte("to")) + cla := tx.NewClause(&addr).WithValue(big.NewInt(10000)) + tr := new(tx.Builder). + ChainTag(repo.ChainTag()). + GasPriceCoef(1). + Expiration(10). + Gas(21000). + Nonce(1). + Clause(cla). + BlockRef(tx.NewBlockRef(0)). + Build() + + sig, err := crypto.Sign(tr.SigningHash().Bytes(), genesis.DevAccounts()[0].PrivateKey) + if err != nil { + t.Fatal(err) + } + tr = tr.WithSignature(sig) + packer := packer.New(repo, stater, genesis.DevAccounts()[0].Address, &genesis.DevAccounts()[0].Address, thor.NoFork) + sum, _ := repo.GetBlockSummary(b.Header().ID()) + flow, err := packer.Schedule(sum, uint64(time.Now().Unix())) + if err != nil { + t.Fatal(err) + } + err = flow.Adopt(tr) + if err != nil { + t.Fatal(err) + } + blk, stage, receipts, err := flow.Pack(genesis.DevAccounts()[0].PrivateKey, 0, false) + if err != nil { + t.Fatal(err) + } + if _, err := stage.Commit(); err != nil { + t.Fatal(err) + } + insertMockOutputEvent(receipts) + if err := repo.AddBlock(blk, receipts, 0); err != nil { + t.Fatal(err) + } + if err := repo.SetBestBlockID(blk.Header().ID()); err != nil { + t.Fatal(err) + } + return repo, []*block.Block{b, blk}, txPool +} + +// This is a helper function to forcly insert an event into the output receipts +func insertMockOutputEvent(receipts tx.Receipts) { + oldReceipt := receipts[0] + events := make(tx.Events, 0) + events = append(events, &tx.Event{ + Address: thor.BytesToAddress([]byte("to")), + Topics: []thor.Bytes32{thor.BytesToBytes32([]byte("topic"))}, + Data: []byte("data"), + }) + outputs := &tx.Output{ + Transfers: oldReceipt.Outputs[0].Transfers, + Events: events, + } + receipts[0] = &tx.Receipt{ + Reverted: oldReceipt.Reverted, + GasUsed: oldReceipt.GasUsed, + Outputs: []*tx.Output{outputs}, + GasPayer: oldReceipt.GasPayer, + Paid: oldReceipt.Paid, + Reward: oldReceipt.Reward, + } +} diff --git a/api/subscriptions/event_reader_test.go b/api/subscriptions/event_reader_test.go new file mode 100644 index 000000000..b1fe280ba --- /dev/null +++ b/api/subscriptions/event_reader_test.go @@ -0,0 +1,57 @@ +// Copyright (c) 2024 The VeChainThor developers + +// Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying +// file LICENSE or + +package subscriptions + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/vechain/thor/v2/chain" +) + +func TestEventReader_Read(t *testing.T) { + repo, generatedBlocks, _ := initChain(t) + genesisBlk := generatedBlocks[0] + newBlock := generatedBlocks[1] + + er := &eventReader{ + repo: repo, + filter: &EventFilter{}, + blockReader: &mockBlockReaderWithError{}, + } + + // Test case 1: An error occurred while reading blocks + events, ok, err := er.Read() + assert.Error(t, err) + assert.Empty(t, events) + assert.False(t, ok) + + // Test case 2: Events are available to read + er = newEventReader(repo, genesisBlk.Header().ID(), &EventFilter{}) + + events, ok, err = er.Read() + + assert.NoError(t, err) + assert.True(t, ok) + var eventMessages []*EventMessage + for _, event := range events { + if msg, ok := event.(*EventMessage); ok { + eventMessages = append(eventMessages, msg) + } else { + t.Fatal("unexpected type") + } + } + assert.Equal(t, 1, len(eventMessages)) + eventMsg := eventMessages[0] + assert.Equal(t, newBlock.Header().ID(), eventMsg.Meta.BlockID) + assert.Equal(t, newBlock.Header().Number(), eventMsg.Meta.BlockNumber) +} + +type mockBlockReaderWithError struct{} + +func (m *mockBlockReaderWithError) Read() ([]*chain.ExtendedBlock, error) { + return nil, assert.AnError +} diff --git a/api/subscriptions/pending_tx_test.go b/api/subscriptions/pending_tx_test.go new file mode 100644 index 000000000..629b96e0c --- /dev/null +++ b/api/subscriptions/pending_tx_test.go @@ -0,0 +1,151 @@ +// Copyright (c) 2024 The VeChainThor developers + +// Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying +// file LICENSE or + +package subscriptions + +import ( + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/stretchr/testify/assert" + "github.com/vechain/thor/v2/block" + "github.com/vechain/thor/v2/chain" + "github.com/vechain/thor/v2/genesis" + "github.com/vechain/thor/v2/muxdb" + "github.com/vechain/thor/v2/packer" + "github.com/vechain/thor/v2/state" + "github.com/vechain/thor/v2/thor" + "github.com/vechain/thor/v2/tx" + "github.com/vechain/thor/v2/txpool" +) + +func TestPendingTx_Subscribe(t *testing.T) { + _, _, txPool := initChain(t) + p := newPendingTx(txPool) + + // When initialized, there should be no listeners + assert.Empty(t, p.listeners, "There should be no listeners when initialized") + + ch := make(chan *tx.Transaction) + p.Subscribe(ch) + + assert.Contains(t, p.listeners, ch, "Subscribe should add the channel to the listeners") +} + +func TestPendingTx_Unsubscribe(t *testing.T) { + _, _, txPool := initChain(t) + p := newPendingTx(txPool) + + ch := make(chan *tx.Transaction) + ch2 := make(chan *tx.Transaction) + p.Subscribe(ch) + p.Subscribe(ch2) + + p.Unsubscribe(ch) + + assert.NotContains(t, p.listeners, ch, "Unsubscribe should remove the channel from the listeners") + assert.Contains(t, p.listeners, ch2, "Unsubscribe should not remove other channels") +} + +func TestPendingTx_DispatchLoop(t *testing.T) { + db := muxdb.NewMem() + gene := genesis.NewDevnet() + stater := state.NewStater(db) + b0, _, _, _ := gene.Build(stater) + repo, _ := chain.NewRepository(db, b0) + + txPool := txpool.New(repo, state.NewStater(db), txpool.Options{ + Limit: 100, + LimitPerAccount: 16, + MaxLifetime: time.Hour, + }) + p := newPendingTx(txPool) + + // Add new block to be in a sync state + addNewBlock(repo, stater, b0, t) + + // Create a channel to signal the end of the test + done := make(chan struct{}) + defer close(done) + + // Create a channel to receive the transaction + txCh := make(chan *tx.Transaction) + p.Subscribe(txCh) + + // Add a new tx to the mempool + transaction := createTx(t, repo, 0) + txPool.AddLocal(transaction) + + // Start the dispatch loop + go p.DispatchLoop(done) + + // Wait for the transaction to be dispatched + select { + case dispatchedTx := <-txCh: + assert.Equal(t, dispatchedTx, transaction) + case <-time.After(time.Second * 2): + t.Fatal("Timeout waiting for transaction dispatch") + } + + // Unsubscribe the channel + p.Unsubscribe(txCh) + + // Add another tx to the mempool + tx2 := createTx(t, repo, 1) + txPool.AddLocal(tx2) + + // Assert that the channel did not receive the second transaction + select { + case <-txCh: + t.Fatal("Received unexpected transaction") + case <-time.After(time.Second): + t.Log("No transaction received, which is expected") + } +} + +func addNewBlock(repo *chain.Repository, stater *state.Stater, b0 *block.Block, t *testing.T) { + packer := packer.New(repo, stater, genesis.DevAccounts()[0].Address, &genesis.DevAccounts()[0].Address, thor.NoFork) + sum, _ := repo.GetBlockSummary(b0.Header().ID()) + flow, err := packer.Schedule(sum, uint64(time.Now().Unix())) + if err != nil { + t.Fatal(err) + } + blk, stage, receipts, err := flow.Pack(genesis.DevAccounts()[0].PrivateKey, 0, false) + if err != nil { + t.Fatal(err) + } + if _, err := stage.Commit(); err != nil { + t.Fatal(err) + } + if err := repo.AddBlock(blk, receipts, 0); err != nil { + t.Fatal(err) + } + if err := repo.SetBestBlockID(blk.Header().ID()); err != nil { + t.Fatal(err) + } +} + +func createTx(t *testing.T, repo *chain.Repository, addressNumber uint) *tx.Transaction { + addr := thor.BytesToAddress([]byte("to")) + cla := tx.NewClause(&addr).WithValue(big.NewInt(10000)) + tx := new(tx.Builder). + ChainTag(repo.ChainTag()). + GasPriceCoef(1). + Expiration(10). + Gas(21000). + Nonce(1). + Clause(cla). + BlockRef(tx.NewBlockRef(0)). + Build() + sig, err := crypto.Sign(tx.SigningHash().Bytes(), genesis.DevAccounts()[addressNumber].PrivateKey) + if err != nil { + t.Fatal(err) + } + tx = tx.WithSignature(sig) + + return tx +} diff --git a/api/subscriptions/subscriptions.go b/api/subscriptions/subscriptions.go index ab8cda0d6..3665d2584 100644 --- a/api/subscriptions/subscriptions.go +++ b/api/subscriptions/subscriptions.go @@ -381,6 +381,6 @@ func (s *Subscriptions) Close() { func (s *Subscriptions) Mount(root *mux.Router, pathPrefix string) { sub := root.PathPrefix(pathPrefix).Subrouter() - sub.Path("/txpool").Methods("Get").HandlerFunc(utils.WrapHandlerFunc(s.handlePendingTransactions)) - sub.Path("/{subject}").Methods("Get").HandlerFunc(utils.WrapHandlerFunc(s.handleSubject)) + sub.Path("/txpool").Methods(http.MethodGet).HandlerFunc(utils.WrapHandlerFunc(s.handlePendingTransactions)) + sub.Path("/{subject}").Methods(http.MethodGet).HandlerFunc(utils.WrapHandlerFunc(s.handleSubject)) } diff --git a/api/subscriptions/subscriptions_test.go b/api/subscriptions/subscriptions_test.go new file mode 100644 index 000000000..f60c5a352 --- /dev/null +++ b/api/subscriptions/subscriptions_test.go @@ -0,0 +1,249 @@ +// Copyright (c) 2024 The VeChainThor developers + +// Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying +// file LICENSE or + +package subscriptions + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + + "github.com/gorilla/mux" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" + "github.com/vechain/thor/v2/block" + "github.com/vechain/thor/v2/chain" + "github.com/vechain/thor/v2/thor" + "github.com/vechain/thor/v2/txpool" +) + +var ts *httptest.Server +var client *http.Client +var sub *Subscriptions +var txPool *txpool.TxPool +var repo *chain.Repository +var blocks []*block.Block + +func TestMain(t *testing.T) { + initSubscriptionsServer(t) + defer ts.Close() + + testHandlePendingTransactions(t) + testHandleSubjectWithBlock(t) + testHandleSubjectWithEvent(t) + testHandleSubjectWithTransfer(t) + testHandleSubjectWithBeat(t) + testHandleSubjectWithBeat2(t) + testHandleSubjectWithNonValidArgument(t) +} + +func testHandlePendingTransactions(t *testing.T) { + u := url.URL{Scheme: "ws", Host: strings.TrimPrefix(ts.URL, "http://"), Path: "/subscriptions/txpool"} + + conn, resp, err := websocket.DefaultDialer.Dial(u.String(), nil) + assert.NoError(t, err) + defer conn.Close() + + // Check the protocol upgrade to websocket + assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode) + assert.Equal(t, "Upgrade", resp.Header.Get("Connection")) + assert.Equal(t, "websocket", resp.Header.Get("Upgrade")) + + // Add a new tx to the mempool + transaction := createTx(t, repo, 1) + txPool.AddLocal(transaction) + + _, msg, err := conn.ReadMessage() + + assert.NoError(t, err) + + var pendingTx *PendingTxIDMessage + if err := json.Unmarshal(msg, &pendingTx); err != nil { + t.Fatal(err) + } else { + assert.Equal(t, transaction.ID(), pendingTx.ID) + } +} + +func testHandleSubjectWithBlock(t *testing.T) { + genesisBlock := blocks[0] + queryArg := fmt.Sprintf("pos=%s", genesisBlock.Header().ID().String()) + u := url.URL{Scheme: "ws", Host: strings.TrimPrefix(ts.URL, "http://"), Path: "/subscriptions/block", RawQuery: queryArg} + + conn, resp, err := websocket.DefaultDialer.Dial(u.String(), nil) + assert.NoError(t, err) + defer conn.Close() + + // Check the protocol upgrade to websocket + assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode) + assert.Equal(t, "Upgrade", resp.Header.Get("Connection")) + assert.Equal(t, "websocket", resp.Header.Get("Upgrade")) + + _, msg, err := conn.ReadMessage() + + assert.NoError(t, err) + + var blockMsg *BlockMessage + if err := json.Unmarshal(msg, &blockMsg); err != nil { + t.Fatal(err) + } else { + newBlock := blocks[1] + assert.Equal(t, newBlock.Header().Number(), blockMsg.Number) + assert.Equal(t, newBlock.Header().ID(), blockMsg.ID) + assert.Equal(t, newBlock.Header().Timestamp(), blockMsg.Timestamp) + } +} + +func testHandleSubjectWithEvent(t *testing.T) { + genesisBlock := blocks[0] + queryArg := fmt.Sprintf("pos=%s", genesisBlock.Header().ID().String()) + u := url.URL{Scheme: "ws", Host: strings.TrimPrefix(ts.URL, "http://"), Path: "/subscriptions/event", RawQuery: queryArg} + + conn, resp, err := websocket.DefaultDialer.Dial(u.String(), nil) + assert.NoError(t, err) + defer conn.Close() + + // Check the protocol upgrade to websocket + assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode) + assert.Equal(t, "Upgrade", resp.Header.Get("Connection")) + assert.Equal(t, "websocket", resp.Header.Get("Upgrade")) + + _, msg, err := conn.ReadMessage() + + assert.NoError(t, err) + + var eventMsg *EventMessage + if err := json.Unmarshal(msg, &eventMsg); err != nil { + t.Fatal(err) + } else { + newBlock := blocks[1] + assert.Equal(t, newBlock.Header().Number(), eventMsg.Meta.BlockNumber) + assert.Equal(t, newBlock.Header().ID(), eventMsg.Meta.BlockID) + } +} + +func testHandleSubjectWithTransfer(t *testing.T) { + genesisBlock := blocks[0] + queryArg := fmt.Sprintf("pos=%s", genesisBlock.Header().ID().String()) + u := url.URL{Scheme: "ws", Host: strings.TrimPrefix(ts.URL, "http://"), Path: "/subscriptions/transfer", RawQuery: queryArg} + + conn, resp, err := websocket.DefaultDialer.Dial(u.String(), nil) + assert.NoError(t, err) + defer conn.Close() + + // Check the protocol upgrade to websocket + assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode) + assert.Equal(t, "Upgrade", resp.Header.Get("Connection")) + assert.Equal(t, "websocket", resp.Header.Get("Upgrade")) + + _, msg, err := conn.ReadMessage() + + assert.NoError(t, err) + + var transferMsg *TransferMessage + if err := json.Unmarshal(msg, &transferMsg); err != nil { + t.Fatal(err) + } else { + newBlock := blocks[1] + assert.Equal(t, newBlock.Header().Number(), transferMsg.Meta.BlockNumber) + assert.Equal(t, newBlock.Header().ID(), transferMsg.Meta.BlockID) + } +} + +func testHandleSubjectWithBeat(t *testing.T) { + genesisBlock := blocks[0] + queryArg := fmt.Sprintf("pos=%s", genesisBlock.Header().ID().String()) + u := url.URL{Scheme: "ws", Host: strings.TrimPrefix(ts.URL, "http://"), Path: "/subscriptions/beat", RawQuery: queryArg} + + conn, resp, err := websocket.DefaultDialer.Dial(u.String(), nil) + assert.NoError(t, err) + defer conn.Close() + + // Check the protocol upgrade to websocket + assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode) + assert.Equal(t, "Upgrade", resp.Header.Get("Connection")) + assert.Equal(t, "websocket", resp.Header.Get("Upgrade")) + + _, msg, err := conn.ReadMessage() + + assert.NoError(t, err) + + var beatMsg *BeatMessage + if err := json.Unmarshal(msg, &beatMsg); err != nil { + t.Fatal(err) + } else { + newBlock := blocks[1] + assert.Equal(t, newBlock.Header().Number(), beatMsg.Number) + assert.Equal(t, newBlock.Header().ID(), beatMsg.ID) + assert.Equal(t, newBlock.Header().Timestamp(), beatMsg.Timestamp) + } +} + +func testHandleSubjectWithBeat2(t *testing.T) { + genesisBlock := blocks[0] + queryArg := fmt.Sprintf("pos=%s", genesisBlock.Header().ID().String()) + u := url.URL{Scheme: "ws", Host: strings.TrimPrefix(ts.URL, "http://"), Path: "/subscriptions/beat2", RawQuery: queryArg} + + conn, resp, err := websocket.DefaultDialer.Dial(u.String(), nil) + assert.NoError(t, err) + defer conn.Close() + + // Check the protocol upgrade to websocket + assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode) + assert.Equal(t, "Upgrade", resp.Header.Get("Connection")) + assert.Equal(t, "websocket", resp.Header.Get("Upgrade")) + + _, msg, err := conn.ReadMessage() + + assert.NoError(t, err) + + var beatMsg *Beat2Message + if err := json.Unmarshal(msg, &beatMsg); err != nil { + t.Fatal(err) + } else { + newBlock := blocks[1] + assert.Equal(t, newBlock.Header().Number(), beatMsg.Number) + assert.Equal(t, newBlock.Header().ID(), beatMsg.ID) + assert.Equal(t, newBlock.Header().GasLimit(), beatMsg.GasLimit) + } +} + +func testHandleSubjectWithNonValidArgument(t *testing.T) { + genesisBlock := blocks[0] + queryArg := fmt.Sprintf("pos=%s", genesisBlock.Header().ID().String()) + u := url.URL{Scheme: "ws", Host: strings.TrimPrefix(ts.URL, "http://"), Path: "/subscriptions/randomArgument", RawQuery: queryArg} + + conn, resp, err := websocket.DefaultDialer.Dial(u.String(), nil) + + assert.Error(t, err) + assert.Nil(t, conn) + assert.Equal(t, http.StatusNotFound, resp.StatusCode) +} + +func TestParseAddress(t *testing.T) { + addrStr := "0x0123456789abcdef0123456789abcdef01234567" + expectedAddr := thor.MustParseAddress(addrStr) + + result, err := parseAddress(addrStr) + + assert.NoError(t, err) + assert.Equal(t, expectedAddr, *result) +} + +func initSubscriptionsServer(t *testing.T) { + r, generatedBlocks, pool := initChain(t) + repo = r + txPool = pool + blocks = generatedBlocks + router := mux.NewRouter() + sub = New(repo, []string{}, 5, txPool) + sub.Mount(router, "/subscriptions") + ts = httptest.NewServer(router) + client = &http.Client{} +} diff --git a/api/subscriptions/transfer_reader_test.go b/api/subscriptions/transfer_reader_test.go new file mode 100644 index 000000000..8bef2175c --- /dev/null +++ b/api/subscriptions/transfer_reader_test.go @@ -0,0 +1,88 @@ +// Copyright (c) 2024 The VeChainThor developers + +// Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying +// file LICENSE or + +package subscriptions + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/vechain/thor/v2/thor" +) + +func TestTransferReader_Read(t *testing.T) { + // Arrange + repo, generatedBlocks, _ := initChain(t) + genesisBlk := generatedBlocks[0] + newBlock := generatedBlocks[1] + filter := &TransferFilter{} + + // Act + br := newTransferReader(repo, genesisBlk.Header().ID(), filter) + res, ok, err := br.Read() + + // Assert + assert.NoError(t, err) + assert.True(t, ok) + if transferMsg, ok := res[0].(*TransferMessage); !ok { + t.Fatal("unexpected type") + } else { + assert.Equal(t, newBlock.Header().Number(), transferMsg.Meta.BlockNumber) + assert.Equal(t, newBlock.Header().ID(), transferMsg.Meta.BlockID) + assert.Equal(t, newBlock.Header().Timestamp(), transferMsg.Meta.BlockTimestamp) + assert.Equal(t, newBlock.Transactions()[0].ID(), transferMsg.Meta.TxID) + } +} + +func TestTransferReader_Read_NoNewBlocksToRead(t *testing.T) { + // Arrange + repo, generatedBlocks, _ := initChain(t) + newBlock := generatedBlocks[1] + filter := &TransferFilter{} + + // Act + br := newTransferReader(repo, newBlock.Header().ID(), filter) + res, ok, err := br.Read() + + // Assert + assert.NoError(t, err) + assert.False(t, ok) + assert.Empty(t, res) +} + +func TestTransferReader_Read_ErrorWhenReadingBlocks(t *testing.T) { + // Arrange + repo, _, _ := initChain(t) + filter := &TransferFilter{} + + // Act + br := newTransferReader(repo, thor.MustParseBytes32("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), filter) + res, ok, err := br.Read() + + // Assert + assert.Error(t, err) + assert.False(t, ok) + assert.Empty(t, res) +} + +func TestTransferReader_Read_NoTransferMatchingTheFilter(t *testing.T) { + // Arrange + repo, generatedBlocks, _ := initChain(t) + genesisBlk := generatedBlocks[0] + + nonExistingAddress := thor.MustParseAddress("0xffffffffffffffffffffffffffffffffffffffff") + badFilter := &TransferFilter{ + TxOrigin: &nonExistingAddress, + } + + // Act + br := newTransferReader(repo, genesisBlk.Header().ID(), badFilter) + res, ok, err := br.Read() + + // Assert + assert.NoError(t, err) + assert.True(t, ok) + assert.Empty(t, res) +} diff --git a/api/subscriptions/types_test.go b/api/subscriptions/types_test.go new file mode 100644 index 000000000..bffe862a5 --- /dev/null +++ b/api/subscriptions/types_test.go @@ -0,0 +1,338 @@ +// Copyright (c) 2024 The VeChainThor developers + +// Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying +// file LICENSE or + +package subscriptions + +import ( + "crypto/rand" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/common/math" + "github.com/ethereum/go-ethereum/crypto" + "github.com/stretchr/testify/assert" + "github.com/vechain/thor/v2/block" + "github.com/vechain/thor/v2/chain" + "github.com/vechain/thor/v2/genesis" + "github.com/vechain/thor/v2/muxdb" + "github.com/vechain/thor/v2/state" + "github.com/vechain/thor/v2/thor" + "github.com/vechain/thor/v2/tx" +) + +func TestConvertBlockWithBadSignature(t *testing.T) { + // Arrange + var sig [65]byte + rand.Read(sig[:]) + + b := new(block.Builder). + Build(). + WithSignature(sig[:]) + + extendedBlock := &chain.ExtendedBlock{Block: b, Obsolete: false} + + // Act + blockMessage, err := convertBlock(extendedBlock) + + // Assert + assert.Nil(t, blockMessage) + assert.Error(t, err) +} + +func TestConvertBlock(t *testing.T) { + // Arrange + b := new(block.Builder). + Build() + + sig, err := crypto.Sign(b.Header().SigningHash().Bytes(), genesis.DevAccounts()[0].PrivateKey) + if err != nil { + t.Fatal(err) + } + + b = b.WithSignature(sig) + extendedBlock := &chain.ExtendedBlock{Block: b, Obsolete: false} + + // Act + blockMessage, err := convertBlock(extendedBlock) + + // Assert + assert.NoError(t, err) + assert.Equal(t, b.Header().Number(), blockMessage.Number) + assert.Equal(t, b.Header().ParentID(), blockMessage.ParentID) + assert.Equal(t, uint32(b.Size()), blockMessage.Size) + assert.Equal(t, b.Header().ParentID(), blockMessage.ParentID) + assert.Equal(t, b.Header().Timestamp(), blockMessage.Timestamp) + assert.Equal(t, b.Header().GasLimit(), blockMessage.GasLimit) + assert.Equal(t, b.Header().Beneficiary(), blockMessage.Beneficiary) + assert.Equal(t, b.Header().GasUsed(), blockMessage.GasUsed) + assert.Equal(t, b.Header().TotalScore(), blockMessage.TotalScore) + assert.Equal(t, b.Header().TxsRoot(), blockMessage.TxsRoot) + assert.Equal(t, uint32(b.Header().TxsFeatures()), blockMessage.TxsFeatures) + assert.Equal(t, b.Header().StateRoot(), blockMessage.StateRoot) + assert.Equal(t, b.Header().ReceiptsRoot(), blockMessage.ReceiptsRoot) + assert.Equal(t, b.Header().COM(), blockMessage.COM) + blockSigner, err := b.Header().Signer() + if err != nil { + t.Fatal(err) + } + assert.Equal(t, blockSigner, blockMessage.Signer) + assert.Equal(t, len(b.Transactions()), len(blockMessage.Transactions)) + assert.Equal(t, false, blockMessage.Obsolete) +} + +func TestConvertTransfer(t *testing.T) { + // Arrange + db := muxdb.NewMem() + stater := state.NewStater(db) + gene := genesis.NewDevnet() + + b, _, _, err := gene.Build(stater) + if err != nil { + t.Fatal(err) + } + repo, _ := chain.NewRepository(db, b) + + // New tx + transaction := new(tx.Builder). + ChainTag(repo.ChainTag()). + GasPriceCoef(1). + Expiration(10). + Gas(21000). + Nonce(1). + BlockRef(tx.NewBlockRef(0)). + Build() + + sig, err := crypto.Sign(transaction.SigningHash().Bytes(), genesis.DevAccounts()[0].PrivateKey) + if err != nil { + t.Fatal(err) + } + transaction = transaction.WithSignature(sig) + + // New block + blk := new(block.Builder). + Transaction(transaction). + Build() + + transfer := &tx.Transfer{ + Sender: thor.BytesToAddress([]byte("sender")), + Recipient: thor.BytesToAddress([]byte("recipient")), + Amount: big.NewInt(50), + } + + // Act + transferMessage, err := convertTransfer(blk.Header(), transaction, 0, transfer, false) + + // Assert + assert.NoError(t, err) + assert.Equal(t, transfer.Sender, transferMessage.Sender) + assert.Equal(t, transfer.Recipient, transferMessage.Recipient) + amount := (*math.HexOrDecimal256)(transfer.Amount) + assert.Equal(t, amount, transferMessage.Amount) + assert.Equal(t, blk.Header().ID(), transferMessage.Meta.BlockID) + assert.Equal(t, blk.Header().Number(), transferMessage.Meta.BlockNumber) + assert.Equal(t, blk.Header().Timestamp(), transferMessage.Meta.BlockTimestamp) + assert.Equal(t, transaction.ID(), transferMessage.Meta.TxID) + origin, err := transaction.Origin() + if err != nil { + t.Fatal(err) + } + assert.Equal(t, origin, transferMessage.Meta.TxOrigin) + assert.Equal(t, uint32(0), transferMessage.Meta.ClauseIndex) + assert.Equal(t, false, transferMessage.Obsolete) +} + +func TestConvertEventWithBadSignature(t *testing.T) { + // Arrange + var sig [65]byte + rand.Read(sig[:]) + + // New tx + transaction := new(tx.Builder). + ChainTag(1). + GasPriceCoef(1). + Expiration(10). + Gas(21000). + Nonce(1). + BlockRef(tx.NewBlockRef(0)). + Build(). + WithSignature(sig[:]) + + // New block + blk := new(block.Builder). + Transaction(transaction). + Build() + + // New event + event := &tx.Event{} + + // Act + eventMessage, err := convertEvent(blk.Header(), transaction, 0, event, false) + + // Assert + assert.Error(t, err) + assert.Nil(t, eventMessage) +} + +func TestConvertEvent(t *testing.T) { + // Arrange + db := muxdb.NewMem() + stater := state.NewStater(db) + gene := genesis.NewDevnet() + + b, _, _, err := gene.Build(stater) + if err != nil { + t.Fatal(err) + } + repo, _ := chain.NewRepository(db, b) + + // New tx + transaction := new(tx.Builder). + ChainTag(repo.ChainTag()). + GasPriceCoef(1). + Expiration(10). + Gas(21000). + Nonce(1). + BlockRef(tx.NewBlockRef(0)). + Build() + + sig, err := crypto.Sign(transaction.SigningHash().Bytes(), genesis.DevAccounts()[0].PrivateKey) + if err != nil { + t.Fatal(err) + } + transaction = transaction.WithSignature(sig) + + // New block + blk := new(block.Builder). + Transaction(transaction). + Build() + + // New event + event := &tx.Event{ + Address: thor.BytesToAddress([]byte("address")), + Topics: []thor.Bytes32{ + {0x01}, + {0x02}, + {0x03}, + {0x04}, + {0x05}, + }, + Data: []byte("data"), + } + + // Act + eventMessage, err := convertEvent(blk.Header(), transaction, 0, event, false) + + // Assert + assert.NoError(t, err) + assert.Equal(t, event.Address, eventMessage.Address) + assert.Equal(t, hexutil.Encode(event.Data), eventMessage.Data) + assert.Equal(t, blk.Header().ID(), eventMessage.Meta.BlockID) + assert.Equal(t, blk.Header().Number(), eventMessage.Meta.BlockNumber) + assert.Equal(t, blk.Header().Timestamp(), eventMessage.Meta.BlockTimestamp) + assert.Equal(t, transaction.ID(), eventMessage.Meta.TxID) + signer, err := transaction.Origin() + if err != nil { + t.Fatal(err) + } + assert.Equal(t, signer, eventMessage.Meta.TxOrigin) + assert.Equal(t, uint32(0), eventMessage.Meta.ClauseIndex) + assert.Equal(t, event.Topics, eventMessage.Topics) + assert.Equal(t, false, eventMessage.Obsolete) +} + +func TestEventFilter_Match(t *testing.T) { + // Create an event filter + addr := thor.BytesToAddress([]byte("address")) + filter := &EventFilter{ + Address: &addr, + Topic0: &thor.Bytes32{0x01}, + Topic1: &thor.Bytes32{0x02}, + Topic2: &thor.Bytes32{0x03}, + Topic3: &thor.Bytes32{0x04}, + Topic4: &thor.Bytes32{0x05}, + } + + // Create an event that matches the filter + event := &tx.Event{ + Address: addr, + Topics: []thor.Bytes32{ + {0x01}, + {0x02}, + {0x03}, + {0x04}, + {0x05}, + }, + } + assert.True(t, filter.Match(event)) + + // Create an event that does not match the filter address + event = &tx.Event{ + Address: thor.BytesToAddress([]byte("other_address")), + Topics: []thor.Bytes32{ + {0x01}, + {0x02}, + {0x03}, + {0x04}, + {0x05}, + }, + } + assert.False(t, filter.Match(event)) + + // Create an event that does not match a filter topic + event = &tx.Event{ + Address: addr, + Topics: []thor.Bytes32{ + {0x05}, + {0x04}, + {0x03}, + {0x02}, + {0x01}, + }, + } + assert.False(t, filter.Match(event)) + + // Create an event that does not match a filter topic len + event = &tx.Event{ + Address: addr, + Topics: []thor.Bytes32{{0x01}}, + } + assert.False(t, filter.Match(event)) +} + +func TestTransferFilter_Match(t *testing.T) { + // Create a transfer filter + origin := thor.BytesToAddress([]byte("origin")) + sender := thor.BytesToAddress([]byte("sender")) + recipient := thor.BytesToAddress([]byte("recipient")) + filter := &TransferFilter{ + TxOrigin: &origin, + Sender: &sender, + Recipient: &recipient, + } + + // Create a transfer that matches the filter + transfer := &tx.Transfer{ + Sender: thor.BytesToAddress([]byte("sender")), + Recipient: thor.BytesToAddress([]byte("recipient")), + Amount: big.NewInt(100), + } + assert.True(t, filter.Match(transfer, origin)) + + // Create a transfer that does not match the filter + transfer = &tx.Transfer{ + Sender: thor.BytesToAddress([]byte("other_sender")), + Recipient: thor.BytesToAddress([]byte("recipient")), + Amount: big.NewInt(100), + } + assert.False(t, filter.Match(transfer, origin)) + assert.False(t, filter.Match(transfer, thor.BytesToAddress(nil))) + transfer = &tx.Transfer{ + Sender: sender, + Recipient: thor.BytesToAddress([]byte("other_recipient")), + Amount: big.NewInt(100), + } + assert.False(t, filter.Match(transfer, origin)) +} diff --git a/txpool/tx_pool.go b/txpool/tx_pool.go index e4f8ce5c7..f3dcd824e 100644 --- a/txpool/tx_pool.go +++ b/txpool/tx_pool.go @@ -252,11 +252,11 @@ func (p *TxPool) add(newTx *tx.Transaction, rejectNonExecutable bool, localSubmi return txRejectedError{"tx is not executable"} } + txObj.executable = executable if err := p.all.Add(txObj, p.options.LimitPerAccount); err != nil { return txRejectedError{err.Error()} } - txObj.executable = executable p.goes.Go(func() { p.txFeed.Send(&TxEvent{newTx, &executable}) })