From 4e58a32e1444f028a17dcc410927bd4d1e092523 Mon Sep 17 00:00:00 2001 From: otherview Date: Thu, 6 Jun 2024 15:42:28 +0100 Subject: [PATCH] Try to reproduce issue --- Makefile | 2 +- api/subscriptions/subscriptions_test.go | 42 +++++++++++++++++++++++-- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 62419c23d..3fd91f17f 100644 --- a/Makefile +++ b/Makefile @@ -50,7 +50,7 @@ $(CURDIR)/bin/thor \ $(CURDIR)/bin/disco test:| go_version_check #@ Run the tests - @go test -cover $(PACKAGES) + @go test -v --count=1 -cover $(PACKAGES) test-coverage:| go_version_check #@ Run the tests with coverage @go test -race -coverprofile=coverage.out -covermode=atomic $(PACKAGES) diff --git a/api/subscriptions/subscriptions_test.go b/api/subscriptions/subscriptions_test.go index 589a6d58d..ee58114c0 100644 --- a/api/subscriptions/subscriptions_test.go +++ b/api/subscriptions/subscriptions_test.go @@ -30,10 +30,11 @@ var txPool *txpool.TxPool var repo *chain.Repository var blocks []*block.Block -func TestMain(t *testing.T) { +func TestSubs(t *testing.T) { initSubscriptionsServer(t) defer ts.Close() + testHandlePendingTransactions(t) testHandleSubjectWithBlock(t) testHandleSubjectWithEvent(t) testHandleSubjectWithTransfer(t) @@ -42,6 +43,43 @@ func TestMain(t *testing.T) { testHandleSubjectWithNonValidArgument(t) } +func testHandlePendingTransactions(t *testing.T) { + // This channel makes sure the new tx is notified to mempool subscribers + // and then to pendingTx as well so that websocket has the tx to read + txChan := make(chan *txpool.TxEvent) + sub := txPool.SubscribeTxEvent(txChan) + defer sub.Unsubscribe() + + u := url.URL{Scheme: "ws", Host: strings.TrimPrefix(ts.URL, "http://"), Path: "/subscriptions/txpool"} + + conn, resp, err := websocket.DefaultDialer.Dial(u.String(), nil) + assert.NoError(t, err) + defer func() { assert.NoError(t, conn.Close()) }() + + // Check the protocol upgrade to websocket + assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode) + assert.Equal(t, "Upgrade", resp.Header.Get("Connection")) + assert.Equal(t, "websocket", resp.Header.Get("Upgrade")) + + // Add a new tx to the mempool + transaction := createTx(t, repo, 1) + assert.NoError(t, txPool.AddLocal(transaction)) + + // Wait for the tx to be notified from mempool + <-txChan + + _, msg, err := conn.ReadMessage() + + assert.NoError(t, err) + + var pendingTx *PendingTxIDMessage + if err := json.Unmarshal(msg, &pendingTx); err != nil { + t.Fatal(err) + } else { + assert.Equal(t, transaction.ID(), pendingTx.ID) + } +} + func testHandleSubjectWithBlock(t *testing.T) { genesisBlock := blocks[0] queryArg := fmt.Sprintf("pos=%s", genesisBlock.Header().ID().String()) @@ -49,7 +87,7 @@ func testHandleSubjectWithBlock(t *testing.T) { conn, resp, err := websocket.DefaultDialer.Dial(u.String(), nil) assert.NoError(t, err) - defer conn.Close() + defer func() { assert.NoError(t, conn.Close()) }() // Check the protocol upgrade to websocket assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode)