Skip to content

Commit

Permalink
Merge pull request #413 from qianbin/ws-ping-pong
Browse files Browse the repository at this point in the history
api/subscriptions: periodically ping client to prevent conn drop
  • Loading branch information
qianbin authored Nov 10, 2020
2 parents b13810f + a4a84e3 commit 7fa8d36
Showing 1 changed file with 22 additions and 3 deletions.
25 changes: 22 additions & 3 deletions api/subscriptions/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package subscriptions
import (
"net/http"
"sync"
"time"

"github.com/gorilla/mux"
"github.com/gorilla/websocket"
Expand Down Expand Up @@ -35,6 +36,13 @@ var (
log = log15.New("pkg", "subscriptions")
)

const (
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 7) / 10
)

func New(repo *chain.Repository, allowedOrigins []string, backtraceLimit uint32) *Subscriptions {
return &Subscriptions{
backtraceLimit: backtraceLimit,
Expand Down Expand Up @@ -200,6 +208,11 @@ func (s *Subscriptions) pipe(conn *websocket.Conn, reader msgReader) error {
s.wg.Add(1)
go func() {
defer s.wg.Done()
conn.SetReadDeadline(time.Now().Add(pongWait))
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
if _, _, err := conn.ReadMessage(); err != nil {
log.Debug("websocket read err", "err", err)
Expand All @@ -209,6 +222,8 @@ func (s *Subscriptions) pipe(conn *websocket.Conn, reader msgReader) error {
}
}()
ticker := s.repo.NewTicker()
pingTicker := time.NewTicker(pingPeriod)
defer pingTicker.Stop()
for {
msgs, hasMore, err := reader.Read()
if err != nil {
Expand All @@ -219,21 +234,25 @@ func (s *Subscriptions) pipe(conn *websocket.Conn, reader msgReader) error {
return err
}
}
if !hasMore {
if hasMore {
select {
case <-s.done:
return nil
case <-closed:
return nil
case <-ticker.C():
case <-pingTicker.C:
conn.WriteMessage(websocket.PingMessage, nil)
default:
}
} else {
select {
case <-s.done:
return nil
case <-closed:
return nil
default:
case <-ticker.C():
case <-pingTicker.C:
conn.WriteMessage(websocket.PingMessage, nil)
}
}
}
Expand Down

0 comments on commit 7fa8d36

Please sign in to comment.