Skip to content

Commit

Permalink
Disconnect ws connections exceeding limit of requests
Browse files Browse the repository at this point in the history
  • Loading branch information
martinboehm committed Nov 29, 2023
1 parent 08389b5 commit 3c29b07
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 11 deletions.
2 changes: 1 addition & 1 deletion api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1393,7 +1393,7 @@ func (w *Worker) GetAddress(address string, page int, txsOnPage int, option Acco
if ed.contractInfo != nil && ed.contractInfo.Type == bchain.ERC20TokenType {
r.Erc20Contract = ed.contractInfo
}
glog.Info("GetAddress ", address, ", ", time.Since(start))
glog.Info("GetAddress-", option, " ", address, ", ", time.Since(start))
return r, nil
}

Expand Down
7 changes: 7 additions & 0 deletions blockbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"os/signal"
"runtime/debug"
"strconv"
"strings"
"syscall"
"time"
Expand Down Expand Up @@ -505,6 +506,12 @@ func newInternalState(config *common.Config, d *db.RocksDB, enableSubNewTx bool)
}
is.Host = name
}

is.WsGetAccountInfoLimit, _ = strconv.Atoi(os.Getenv(strings.ToUpper(is.CoinShortcut) + "_WS_GETACCOUNTINFO_LIMIT"))
if is.WsGetAccountInfoLimit > 0 {
glog.Info("WsGetAccountInfoLimit enabled with limit ", is.WsGetAccountInfoLimit)
is.WsLimitExceedingIPs = make(map[string]int)
}
return is, nil
}

Expand Down
16 changes: 16 additions & 0 deletions common/internalstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ type InternalState struct {
BlockGolombFilterP uint8 `json:"block_golomb_filter_p"`
BlockFilterScripts string `json:"block_filter_scripts"`
BlockFilterUseZeroedKey bool `json:"block_filter_use_zeroed_key"`

// allowed number of fetched accounts over websocket
WsGetAccountInfoLimit int `json:"-"`
WsLimitExceedingIPs map[string]int `json:"-"`
}

// StartedSync signals start of synchronization
Expand Down Expand Up @@ -341,3 +345,15 @@ func SetInShutdown() {
func IsInShutdown() bool {
return atomic.LoadInt32(&inShutdown) != 0
}

func (is *InternalState) AddWsLimitExceedingIP(ip string) {
is.mux.Lock()
defer is.mux.Unlock()
is.WsLimitExceedingIPs[ip] = is.WsLimitExceedingIPs[ip] + 1
}

func (is *InternalState) ResetWsLimitExceedingIPs() {
is.mux.Lock()
defer is.mux.Unlock()
is.WsLimitExceedingIPs = make(map[string]int)
}
29 changes: 29 additions & 0 deletions server/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"html/template"
"net/http"
"path/filepath"
"sort"

"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -67,6 +68,7 @@ func NewInternalServer(binding, certFiles string, db *db.RocksDB, chain bchain.B
serveMux.HandleFunc(path+"metrics", promhttp.Handler().ServeHTTP)
serveMux.HandleFunc(path, s.index)
serveMux.HandleFunc(path+"admin", s.htmlTemplateHandler(s.adminIndex))
serveMux.HandleFunc(path+"admin/ws-limit-exceeding-ips", s.htmlTemplateHandler(s.wsLimitExceedingIPs))
if s.chainParser.GetChainType() == bchain.ChainEthereumType {
serveMux.HandleFunc(path+"admin/internal-data-errors", s.htmlTemplateHandler(s.internalDataErrors))
}
Expand Down Expand Up @@ -115,10 +117,17 @@ func (s *InternalServer) index(w http.ResponseWriter, r *http.Request) {
const (
adminIndexTpl = iota + errorInternalTpl + 1
adminInternalErrorsTpl
adminLimitExceedingIPS

internalTplCount
)

// WsLimitExceedingIP is used to transfer data to the templates
type WsLimitExceedingIP struct {
IP string
Count int
}

// InternalTemplateData is used to transfer data to the templates
type InternalTemplateData struct {
CoinName string
Expand All @@ -128,6 +137,8 @@ type InternalTemplateData struct {
Error *api.APIError
InternalDataErrors []db.BlockInternalDataError
RefetchingInternalData bool
WsGetAccountInfoLimit int
WsLimitExceedingIPs []WsLimitExceedingIP
}

func (s *InternalServer) newTemplateData(r *http.Request) *InternalTemplateData {
Expand Down Expand Up @@ -161,6 +172,7 @@ func (s *InternalServer) parseTemplates() []*template.Template {
t[errorInternalTpl] = createTemplate("./static/internal_templates/error.html", "./static/internal_templates/base.html")
t[adminIndexTpl] = createTemplate("./static/internal_templates/index.html", "./static/internal_templates/base.html")
t[adminInternalErrorsTpl] = createTemplate("./static/internal_templates/block_internal_data_errors.html", "./static/internal_templates/base.html")
t[adminLimitExceedingIPS] = createTemplate("./static/internal_templates/ws_limit_exceeding_ips.html", "./static/internal_templates/base.html")
return t
}

Expand All @@ -185,3 +197,20 @@ func (s *InternalServer) internalDataErrors(w http.ResponseWriter, r *http.Reque
data.RefetchingInternalData = s.api.IsRefetchingInternalData()
return adminInternalErrorsTpl, data, nil
}

func (s *InternalServer) wsLimitExceedingIPs(w http.ResponseWriter, r *http.Request) (tpl, *InternalTemplateData, error) {
if r.Method == http.MethodPost {
s.is.ResetWsLimitExceedingIPs()
}
data := s.newTemplateData(r)
ips := make([]WsLimitExceedingIP, 0, len(s.is.WsLimitExceedingIPs))
for k, v := range s.is.WsLimitExceedingIPs {
ips = append(ips, WsLimitExceedingIP{k, v})
}
sort.Slice(ips, func(i, j int) bool {
return ips[i].Count > ips[j].Count
})
data.WsLimitExceedingIPs = ips
data.WsGetAccountInfoLimit = s.is.WsGetAccountInfoLimit
return adminLimitExceedingIPS, data, nil
}
44 changes: 34 additions & 10 deletions server/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@ var (
)

type websocketChannel struct {
id uint64
conn *websocket.Conn
out chan *WsRes
ip string
requestHeader http.Header
alive bool
aliveLock sync.Mutex
addrDescs []string // subscribed address descriptors as strings
id uint64
conn *websocket.Conn
out chan *WsRes
ip string
requestHeader http.Header
alive bool
aliveLock sync.Mutex
addrDescs []string // subscribed address descriptors as strings
getAddressInfoDescriptorsMux sync.Mutex
getAddressInfoDescriptors map[string]struct{}
}

// WebsocketServer is a handle to websocket server
Expand Down Expand Up @@ -112,7 +114,11 @@ func checkOrigin(r *http.Request) bool {
}

func getIP(r *http.Request) string {
ip := r.Header.Get("X-Real-Ip")
ip := r.Header.Get("cf-connecting-ip")
if ip != "" {
return ip
}
ip = r.Header.Get("X-Real-Ip")
if ip != "" {
return ip
}
Expand All @@ -138,6 +144,9 @@ func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
requestHeader: r.Header,
alive: true,
}
if s.is.WsGetAccountInfoLimit > 0 {
c.getAddressInfoDescriptors = make(map[string]struct{})
}
go s.inputLoop(c)
go s.outputLoop(c)
s.onConnect(c)
Expand All @@ -148,11 +157,13 @@ func (s *WebsocketServer) GetHandler() http.Handler {
return s
}

func (s *WebsocketServer) closeChannel(c *websocketChannel) {
func (s *WebsocketServer) closeChannel(c *websocketChannel) bool {
if c.CloseOut() {
c.conn.Close()
s.onDisconnect(c)
return true
}
return false
}

func (c *websocketChannel) CloseOut() bool {
Expand Down Expand Up @@ -259,6 +270,19 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *WsRe
"getAccountInfo": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
r, err := unmarshalGetAccountInfoRequest(req.Params)
if err == nil {
if s.is.WsGetAccountInfoLimit > 0 {
c.getAddressInfoDescriptorsMux.Lock()
c.getAddressInfoDescriptors[r.Descriptor] = struct{}{}
l := len(c.getAddressInfoDescriptors)
c.getAddressInfoDescriptorsMux.Unlock()
if l > s.is.WsGetAccountInfoLimit {
if s.closeChannel(c) {
glog.Info("Client ", c.id, " exceeded getAddressInfo limit, ", c.ip)
s.is.AddWsLimitExceedingIP(c.ip)
}
return
}
}
rv, err = s.getAccountInfo(r)
}
return
Expand Down
3 changes: 3 additions & 0 deletions static/internal_templates/index.html
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
{{define "specific"}}
<div class="row">
<div class="col"><a href="/admin/ws-limit-exceeding-ips">IP addresses that exceeded websocket usage limit</a></div>
</div>
{{if eq .ChainType 1}}
<div class="row">
<div class="col"><a href="/admin/internal-data-errors">Internal Data Errors</a></div>
Expand Down
29 changes: 29 additions & 0 deletions static/internal_templates/ws_limit_exceeding_ips.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{{define "specific"}}
<h3>IP addresses disconnected for exceeding websocket limit</h3>
<div class="row g-0">
<div class="col-md-11">Distinct ip addresses that exceeded limit of {{.WsGetAccountInfoLimit}} requests since last reset: {{len .WsLimitExceedingIPs}}</div>
<div class="col-md-1 justify-content-right">
<form method="POST" action="/admin/ws-limit-exceeding-ips">
<button type="submit" class="btn btn-outline-secondary">Reset</button>
</form>
</div>
</row>
<div>
<table class="table table-hover">
<thead>
<tr>
<th>IP</th>
<th>Count</th>
</tr>
</thead>
<tbody>
{{range $d := .WsLimitExceedingIPs}}
<tr>
<td>{{$d.IP}}</a></td>
<td>{{$d.Count}}</td>
</tr>
{{end}}
</tbody>
</table>
</div>
{{end}}

0 comments on commit 3c29b07

Please sign in to comment.