diff --git a/abci/client/client.go b/abci/client/client.go index 19c6a7bf61..5f74015272 100644 --- a/abci/client/client.go +++ b/abci/client/client.go @@ -51,7 +51,9 @@ type requestAndResponse struct { *types.Request *types.Response - mtx sync.Mutex + mtx sync.Mutex + // context for the request; we check if it's not expired before sending + ctx context.Context signal chan struct{} } @@ -90,10 +92,11 @@ func (reqResp *requestAndResponse) priority() (priority int8) { return priority } -func makeReqRes(req *types.Request) *requestAndResponse { +func makeReqRes(ctx context.Context, req *types.Request) *requestAndResponse { return &requestAndResponse{ Request: req, Response: nil, + ctx: ctx, signal: make(chan struct{}), } } diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index a96fe7977d..7aa7deedf2 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -14,11 +14,17 @@ import ( sync "github.com/sasha-s/go-deadlock" "github.com/dashpay/tenderdash/abci/types" + tmsync "github.com/dashpay/tenderdash/internal/libs/sync" "github.com/dashpay/tenderdash/libs/log" tmnet "github.com/dashpay/tenderdash/libs/net" "github.com/dashpay/tenderdash/libs/service" ) +const ( + // Maximum number of requests stored in the request queue + RequestQueueSize = 1000 +) + // This is goroutine-safe, but users should beware that the application in // general is not meant to be interfaced with concurrent callers. type socketClient struct { @@ -31,8 +37,8 @@ type socketClient struct { // Requests queue reqQueue *prque.Prque[int8, *requestAndResponse] - // Signal emitted when new request is added to the queue. - reqSignal chan struct{} + // Wake up sender when new request is added to the queue. + reqWaker *tmsync.Waker mtx sync.Mutex err error @@ -52,9 +58,9 @@ func NewSocketClient(logger log.Logger, addr string, mustConnect bool, metrics * } cli := &socketClient{ - logger: logger, - reqQueue: prque.New[int8, *requestAndResponse](nil), - reqSignal: make(chan struct{}), + logger: logger, + reqQueue: prque.New[int8, *requestAndResponse](nil), + reqWaker: tmsync.NewWaker(), mustConnect: mustConnect, addr: addr, @@ -141,14 +147,29 @@ func (cli *socketClient) dequeue() *requestAndResponse { return reqres } +func (cli *socketClient) reqQueueEmpty() bool { + cli.mtx.Lock() + defer cli.mtx.Unlock() + return cli.reqQueue.Empty() +} + func (cli *socketClient) sendRequestsRoutine(ctx context.Context, conn io.Writer) { bw := bufio.NewWriter(conn) for { + // wait for new message to arrive select { case <-ctx.Done(): return - case <-cli.reqSignal: + case <-cli.reqWaker.Sleep(): + } + + for !cli.reqQueueEmpty() { reqres := cli.dequeue() + if err := reqres.ctx.Err(); err != nil { + // request expired, skip it + cli.logger.Debug("abci.socketClient request expired, skipping", "req", reqres.Request.Value, "error", err) + continue + } // N.B. We must track request before sending it out, otherwise the // server may reply before we do it, and the receiver will fail for an @@ -236,15 +257,13 @@ func (cli *socketClient) doRequest(ctx context.Context, req *types.Request) (*ty return nil, errors.New("client has stopped") } - reqres := makeReqRes(req) + reqres := makeReqRes(ctx, req) cli.enqueue(reqres) - select { - case cli.reqSignal <- struct{}{}: - case <-ctx.Done(): - return nil, fmt.Errorf("can't queue req: %w", ctx.Err()) - } + // Asynchronously wake up the sender. + cli.reqWaker.Wake() + // wait for response for our request select { case <-reqres.signal: if err := cli.Error(); err != nil { @@ -263,6 +282,11 @@ func (cli *socketClient) drainQueue() { cli.mtx.Lock() defer cli.mtx.Unlock() + if err := cli.reqWaker.Close(); err != nil { + cli.logger.Debug("abci.socketClient failed to close waker", "err", err) + } + cli.reqWaker = tmsync.NewWaker() + cli.reqQueue.Reset() // mark all in-flight messages as resolved (they will get cli.Error()) for req := cli.reqSent.Front(); req != nil; req = req.Next() { diff --git a/internal/mempool/p2p_msg_handler.go b/internal/mempool/p2p_msg_handler.go index f449be5f11..1da679fedf 100644 --- a/internal/mempool/p2p_msg_handler.go +++ b/internal/mempool/p2p_msg_handler.go @@ -58,7 +58,10 @@ func (h *mempoolP2PMessageHandler) Handle(ctx context.Context, _ *client.Client, known := 0 failed := 0 for _, tx := range protoTxs { - if err := h.checker.CheckTx(ctx, tx, nil, txInfo); err != nil { + subCtx, subCtxCancel := context.WithTimeout(ctx, 5*time.Second) + defer subCtxCancel() + + if err := h.checker.CheckTx(subCtx, tx, nil, txInfo); err != nil { if errors.Is(err, types.ErrTxInCache) { // if the tx is in the cache, // then we've been gossiped a @@ -69,13 +72,11 @@ func (h *mempoolP2PMessageHandler) Handle(ctx context.Context, _ *client.Client, known++ continue } - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - // Do not propagate context - // cancellation errors, but do - // not continue to check - // transactions from this - // message if we are shutting down. - return err + + // In case of ctx cancelation, we return error as we are most likely shutting down. + // Otherwise we just reject the tx. + if errCtx := ctx.Err(); errCtx != nil { + return errCtx } failed++ logger.Error("checktx failed for tx", diff --git a/internal/p2p/client/client.go b/internal/p2p/client/client.go index fffc4a36df..3ea4009090 100644 --- a/internal/p2p/client/client.go +++ b/internal/p2p/client/client.go @@ -224,10 +224,15 @@ func (c *Client) GetSyncStatus(ctx context.Context) error { } // SendTxs sends a transaction to the peer -func (c *Client) SendTxs(ctx context.Context, peerID types.NodeID, tx types.Tx) error { +func (c *Client) SendTxs(ctx context.Context, peerID types.NodeID, tx ...types.Tx) error { + txs := make([][]byte, len(tx)) + for i := 0; i < len(tx); i++ { + txs[i] = tx[i] + } + return c.Send(ctx, p2p.Envelope{ To: peerID, - Message: &protomem.Txs{Txs: [][]byte{tx}}, + Message: &protomem.Txs{Txs: txs}, }) }