Skip to content

Commit

Permalink
fix: add timeout for reads to prevent halts (#111)
Browse files Browse the repository at this point in the history
* fix: add timeout for reads to prevent halts

* fix: reconnect to another node when local is ahead

* added more logs
  • Loading branch information
javiersuweijie authored Mar 14, 2024
1 parent 7936b42 commit c22b45e
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
10 changes: 9 additions & 1 deletion block_feed/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ func (ags *AggregateSubscription) Subscribe(rpcIndex int) (chan *BlockResult, er
// if not, the local blockchain is behind, in such case we would need to sync from Rpc.
if firstBlock := <-cWS; firstBlock.Block.Header.Height != ags.lastKnownBlock+1 {
log.Printf("[block_feed/aggregate] received the first block(%d), but local blockchain is at (%d)\n", firstBlock.Block.Header.Height, ags.lastKnownBlock)
if ags.lastKnownBlock >= firstBlock.Block.Header.Height {
log.Printf("[block_feed/aggregate] local blockchain is ahead, reconnecting to another node. \n")
ags.setSyncState(false)
ags.Close()
ags.Reconnect()
return ags.aggregateBlockChannel, nil
}
go func() {
go ags.rpc.SyncFromUntil(ags.lastKnownBlock+1, firstBlock.Block.Header.Height, rpcIndex)
for {
Expand All @@ -82,7 +89,7 @@ func (ags *AggregateSubscription) Subscribe(rpcIndex int) (chan *BlockResult, er
}
}

log.Printf("[block_feed/aggregate] switching to ws...")
log.Printf("[block_feed/aggregate] switching to ws at height %d...", ags.lastKnownBlock)

// patch ws to aggregate
for {
Expand All @@ -99,6 +106,7 @@ func (ags *AggregateSubscription) Subscribe(rpcIndex int) (chan *BlockResult, er
} else {
// if block feeder got upto this point,
// it is relatively safe that mantle is synced
log.Printf("[block_feed/aggregate] received block at height %d...", r.Block.Height)
ags.setSyncState(true)
ags.aggregateBlockChannel <- r
ags.lastKnownBlock = r.Block.Height
Expand Down
3 changes: 3 additions & 0 deletions block_feed/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"github.com/gorilla/websocket"
"log"
"time"
)

var _ BlockFeed = (*WSSubscription)(nil)
Expand Down Expand Up @@ -97,6 +98,8 @@ func handleInitialHandhake(ws *websocket.Conn) error {
func receiveBlockEvents(ws *websocket.Conn, c chan *BlockResult) {
defer close(c)
for {
// There should be a block every ~6s so 20 seconds would mean the connection is faulty
ws.SetReadDeadline(time.Now().Add(time.Second * 20))
_, message, err := ws.ReadMessage()

// if read message failed,
Expand Down

0 comments on commit c22b45e

Please sign in to comment.