Skip to content

Commit

Permalink
feat(abciclient): expire checktx abci calls on messages from p2p
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek committed Jan 16, 2024
1 parent c39ef2c commit 052af38
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 24 deletions.
7 changes: 5 additions & 2 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ type requestAndResponse struct {
*types.Request
*types.Response

mtx sync.Mutex
mtx sync.Mutex
// context for the request; we check if it's not expired before sending
ctx context.Context
signal chan struct{}
}

Expand Down Expand Up @@ -90,10 +92,11 @@ func (reqResp *requestAndResponse) priority() (priority int8) {
return priority
}

func makeReqRes(req *types.Request) *requestAndResponse {
func makeReqRes(ctx context.Context, req *types.Request) *requestAndResponse {
return &requestAndResponse{
Request: req,
Response: nil,
ctx: ctx,
signal: make(chan struct{}),
}
}
Expand Down
48 changes: 36 additions & 12 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@ import (
sync "github.com/sasha-s/go-deadlock"

"github.com/dashpay/tenderdash/abci/types"
tmsync "github.com/dashpay/tenderdash/internal/libs/sync"
"github.com/dashpay/tenderdash/libs/log"
tmnet "github.com/dashpay/tenderdash/libs/net"
"github.com/dashpay/tenderdash/libs/service"
)

const (
// Maximum number of requests stored in the request queue
RequestQueueSize = 1000
)

// This is goroutine-safe, but users should beware that the application in
// general is not meant to be interfaced with concurrent callers.
type socketClient struct {
Expand All @@ -31,8 +37,8 @@ type socketClient struct {

// Requests queue
reqQueue *prque.Prque[int8, *requestAndResponse]
// Signal emitted when new request is added to the queue.
reqSignal chan struct{}
// Wake up sender when new request is added to the queue.
reqWaker *tmsync.Waker

mtx sync.Mutex
err error
Expand All @@ -52,9 +58,9 @@ func NewSocketClient(logger log.Logger, addr string, mustConnect bool, metrics *
}

cli := &socketClient{
logger: logger,
reqQueue: prque.New[int8, *requestAndResponse](nil),
reqSignal: make(chan struct{}),
logger: logger,
reqQueue: prque.New[int8, *requestAndResponse](nil),
reqWaker: tmsync.NewWaker(),

mustConnect: mustConnect,
addr: addr,
Expand Down Expand Up @@ -141,14 +147,29 @@ func (cli *socketClient) dequeue() *requestAndResponse {
return reqres
}

func (cli *socketClient) reqQueueEmpty() bool {
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.reqQueue.Empty()
}

func (cli *socketClient) sendRequestsRoutine(ctx context.Context, conn io.Writer) {
bw := bufio.NewWriter(conn)
for {
// wait for new message to arrive
select {
case <-ctx.Done():
return
case <-cli.reqSignal:
case <-cli.reqWaker.Sleep():
}

for !cli.reqQueueEmpty() {
reqres := cli.dequeue()
if err := reqres.ctx.Err(); err != nil {
// request expired, skip it
cli.logger.Debug("abci.socketClient request expired, skipping", "req", reqres.Request.Value, "error", err)
continue
}

// N.B. We must track request before sending it out, otherwise the
// server may reply before we do it, and the receiver will fail for an
Expand Down Expand Up @@ -236,15 +257,13 @@ func (cli *socketClient) doRequest(ctx context.Context, req *types.Request) (*ty
return nil, errors.New("client has stopped")
}

reqres := makeReqRes(req)
reqres := makeReqRes(ctx, req)
cli.enqueue(reqres)

select {
case cli.reqSignal <- struct{}{}:
case <-ctx.Done():
return nil, fmt.Errorf("can't queue req: %w", ctx.Err())
}
// Asynchronously wake up the sender.
cli.reqWaker.Wake()

// wait for response for our request
select {
case <-reqres.signal:
if err := cli.Error(); err != nil {
Expand All @@ -263,6 +282,11 @@ func (cli *socketClient) drainQueue() {
cli.mtx.Lock()
defer cli.mtx.Unlock()

if err := cli.reqWaker.Close(); err != nil {
cli.logger.Debug("abci.socketClient failed to close waker", "err", err)
}
cli.reqWaker = tmsync.NewWaker()

cli.reqQueue.Reset()
// mark all in-flight messages as resolved (they will get cli.Error())
for req := cli.reqSent.Front(); req != nil; req = req.Next() {
Expand Down
17 changes: 9 additions & 8 deletions internal/mempool/p2p_msg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ func (h *mempoolP2PMessageHandler) Handle(ctx context.Context, _ *client.Client,
known := 0
failed := 0
for _, tx := range protoTxs {
if err := h.checker.CheckTx(ctx, tx, nil, txInfo); err != nil {
subCtx, subCtxCancel := context.WithTimeout(ctx, 5*time.Second)
defer subCtxCancel()

if err := h.checker.CheckTx(subCtx, tx, nil, txInfo); err != nil {
if errors.Is(err, types.ErrTxInCache) {
// if the tx is in the cache,
// then we've been gossiped a
Expand All @@ -69,13 +72,11 @@ func (h *mempoolP2PMessageHandler) Handle(ctx context.Context, _ *client.Client,
known++
continue
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// Do not propagate context
// cancellation errors, but do
// not continue to check
// transactions from this
// message if we are shutting down.
return err

// In case of ctx cancelation, we return error as we are most likely shutting down.
// Otherwise we just reject the tx.
if errCtx := ctx.Err(); errCtx != nil {
return errCtx
}
failed++
logger.Error("checktx failed for tx",
Expand Down
9 changes: 7 additions & 2 deletions internal/p2p/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,15 @@ func (c *Client) GetSyncStatus(ctx context.Context) error {
}

// SendTxs sends a transaction to the peer
func (c *Client) SendTxs(ctx context.Context, peerID types.NodeID, tx types.Tx) error {
func (c *Client) SendTxs(ctx context.Context, peerID types.NodeID, tx ...types.Tx) error {
txs := make([][]byte, len(tx))
for i := 0; i < len(tx); i++ {
txs[i] = tx[i]
}

return c.Send(ctx, p2p.Envelope{
To: peerID,
Message: &protomem.Txs{Txs: [][]byte{tx}},
Message: &protomem.Txs{Txs: txs},
})
}

Expand Down

0 comments on commit 052af38

Please sign in to comment.