Skip to content

Commit

Permalink
Merge pull request #1 from vechain/master
Browse files Browse the repository at this point in the history
sync
  • Loading branch information
YeahNotSewerSide authored Jul 24, 2024
2 parents 5f6e8c0 + e6c91cb commit 0d2ce59
Show file tree
Hide file tree
Showing 27 changed files with 276 additions and 99 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/on-master-commit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ jobs:
"branch": "${{ github.ref }}",
"repository": "${{ github.repository }}",
"commit-author": "${{ github.event.head_commit.author.name }}",
"lint-status": "${{ needs.lint.result != 'success' && ':alert: Failure' || ':white_check_mark: Success' }}"
"license-check": "${{ needs.license-check.result != 'success' && ':alert: Failure' || ':white_check_mark: Success' }}"
"lint-status": "${{ needs.lint.result != 'success' && ':alert: Failure' || ':white_check_mark: Success' }}",
"license-check": "${{ needs.license-check.result != 'success' && ':alert: Failure' || ':white_check_mark: Success' }}",
"module-check": "${{ needs.module-check.result != 'success' && ':alert: Failure' || ':white_check_mark: Success' }}"
}
13 changes: 11 additions & 2 deletions api/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package events

import (
"context"
"fmt"
"net/http"

"github.com/gorilla/mux"
Expand Down Expand Up @@ -54,19 +55,27 @@ func (e *Events) handleFilter(w http.ResponseWriter, req *http.Request) error {
return utils.BadRequest(errors.WithMessage(err, "body"))
}
if filter.Options != nil && filter.Options.Limit > e.limit {
return utils.Forbidden(errors.New("options.limit exceeds the maximum allowed value"))
return utils.Forbidden(fmt.Errorf("options.limit exceeds the maximum allowed value of %d", e.limit))
}
if filter.Options == nil {
// if filter.Options is nil, set to the default limit +1
// to detect whether there are more logs than the default limit
filter.Options = &logdb.Options{
Offset: 0,
Limit: e.limit,
Limit: e.limit + 1,
}
}

fes, err := e.filter(req.Context(), &filter)
if err != nil {
return err
}

// ensure the result size is less than the configured limit
if len(fes) > int(e.limit) {
return utils.Forbidden(fmt.Errorf("the number of filtered logs exceeds the maximum allowed value of %d, please use pagination", e.limit))
}

return utils.WriteJSON(w, fes)
}

Expand Down
14 changes: 11 additions & 3 deletions api/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestOption(t *testing.T) {
db := createDb(t)
initEventServer(t, db, 5)
defer ts.Close()
insertBlocks(t, db, 10)
insertBlocks(t, db, 5)

filter := events.EventFilter{
CriteriaSet: make([]*events.EventCriteria, 0),
Expand All @@ -73,22 +73,30 @@ func TestOption(t *testing.T) {
}

res, statusCode := httpPost(t, ts.URL+"/events", filter)
assert.Equal(t, "options.limit exceeds the maximum allowed value", strings.Trim(string(res), "\n"))
assert.Equal(t, "options.limit exceeds the maximum allowed value of 5", strings.Trim(string(res), "\n"))
assert.Equal(t, http.StatusForbidden, statusCode)

filter.Options.Limit = 5
_, statusCode = httpPost(t, ts.URL+"/events", filter)
assert.Equal(t, http.StatusOK, statusCode)

// with nil options, should use default limit
// with nil options, should use default limit, when the filtered lower
// or equal to the limit, should return the filtered events
filter.Options = nil
res, statusCode = httpPost(t, ts.URL+"/events", filter)
assert.Equal(t, http.StatusOK, statusCode)
var tLogs []*events.FilteredEvent
if err := json.Unmarshal(res, &tLogs); err != nil {
t.Fatal(err)
}
assert.Equal(t, http.StatusOK, statusCode)
assert.Equal(t, 5, len(tLogs))

// when the filtered events exceed the limit, should return the forbidden
insertBlocks(t, db, 6)
res, statusCode = httpPost(t, ts.URL+"/events", filter)
assert.Equal(t, http.StatusForbidden, statusCode)
assert.Equal(t, "the number of filtered logs exceeds the maximum allowed value of 5, please use pagination", strings.Trim(string(res), "\n"))
}

// Test functions
Expand Down
25 changes: 20 additions & 5 deletions api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@ import (
"net"
"net/http"
"strconv"
"strings"
"time"

"github.com/gorilla/mux"
"github.com/vechain/thor/v2/metrics"
)

var (
metricHttpReqCounter = metrics.LazyLoadCounterVec("api_request_count", []string{"name", "code", "method"})
metricHttpReqDuration = metrics.LazyLoadHistogramVec("api_duration_ms", []string{"name", "code", "method"}, metrics.BucketHTTPReqs)
metricHttpReqCounter = metrics.LazyLoadCounterVec("api_request_count", []string{"name", "code", "method"})
metricHttpReqDuration = metrics.LazyLoadHistogramVec("api_duration_ms", []string{"name", "code", "method"}, metrics.BucketHTTPReqs)
metricsActiveWebsocketCount = metrics.LazyLoadGaugeVec("api_active_websocket_count", []string{"subject"})
)

// metricsResponseWriter is a wrapper around http.ResponseWriter that captures the status code.
Expand Down Expand Up @@ -58,22 +60,35 @@ func metricsMiddleware(next http.Handler) http.Handler {
rt := mux.CurrentRoute(r)

var (
enabled = false
name = ""
enabled = false
name = ""
subscription = ""
)

// all named route will be recorded
if rt != nil && rt.GetName() != "" {
enabled = true
name = rt.GetName()
if strings.HasPrefix(name, "subscriptions") {
// example path: /subscriptions/txpool -> subject = txpool
paths := strings.Split(r.URL.Path, "/")
if len(paths) > 2 {
subscription = paths[2]
}
}
}

now := time.Now()
mrw := newMetricsResponseWriter(w)
if subscription != "" {
metricsActiveWebsocketCount().AddWithLabel(1, map[string]string{"subject": subscription})
}

next.ServeHTTP(mrw, r)

if enabled {
if subscription != "" {
metricsActiveWebsocketCount().AddWithLabel(-1, map[string]string{"subject": subscription})
} else if enabled {
metricHttpReqCounter().AddWithLabel(1, map[string]string{"name": name, "code": strconv.Itoa(mrw.statusCode), "method": r.Method})
metricHttpReqDuration().ObserveWithLabels(time.Since(now).Milliseconds(), map[string]string{"name": name, "code": strconv.Itoa(mrw.statusCode), "method": r.Method})
}
Expand Down
88 changes: 88 additions & 0 deletions api/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,25 @@ import (
"math"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"

"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
"github.com/vechain/thor/v2/api/accounts"
"github.com/vechain/thor/v2/api/subscriptions"
"github.com/vechain/thor/v2/chain"
"github.com/vechain/thor/v2/cmd/thor/solo"
"github.com/vechain/thor/v2/genesis"
"github.com/vechain/thor/v2/metrics"
"github.com/vechain/thor/v2/muxdb"
"github.com/vechain/thor/v2/state"
"github.com/vechain/thor/v2/thor"
"github.com/vechain/thor/v2/txpool"
)

func init() {
Expand Down Expand Up @@ -103,6 +109,88 @@ func TestMetricsMiddleware(t *testing.T) {
assert.Equal(t, "accounts_get_account", labels[2].GetValue())
}

func TestWebsocketMetrics(t *testing.T) {
db := muxdb.NewMem()
stater := state.NewStater(db)
gene := genesis.NewDevnet()

b, _, _, err := gene.Build(stater)
if err != nil {
t.Fatal(err)
}
repo, _ := chain.NewRepository(db, b)

router := mux.NewRouter()
sub := subscriptions.New(repo, []string{"*"}, 10, txpool.New(repo, stater, txpool.Options{}))
sub.Mount(router, "/subscriptions")
router.PathPrefix("/metrics").Handler(metrics.HTTPHandler())
router.Use(metricsMiddleware)
ts := httptest.NewServer(router)

// initiate 1 beat subscription, active websocket should be 1
u := url.URL{Scheme: "ws", Host: strings.TrimPrefix(ts.URL, "http://"), Path: "/subscriptions/beat"}
conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
assert.Nil(t, err)

body, _ := httpGet(t, ts.URL+"/metrics")
parser := expfmt.TextParser{}
metrics, err := parser.TextToMetricFamilies(bytes.NewReader(body))
assert.Nil(t, err)

m := metrics["thor_metrics_api_active_websocket_count"].GetMetric()
assert.Equal(t, 1, len(m), "should be 1 metric entries")
assert.Equal(t, float64(1), m[0].GetGauge().GetValue())

labels := m[0].GetLabel()
assert.Equal(t, "subject", labels[0].GetName())
assert.Equal(t, "beat", labels[0].GetValue())

// initiate 1 beat subscription, active websocket should be 2
_, _, err = websocket.DefaultDialer.Dial(u.String(), nil)
assert.Nil(t, err)

body, _ = httpGet(t, ts.URL+"/metrics")
metrics, err = parser.TextToMetricFamilies(bytes.NewReader(body))
assert.Nil(t, err)

m = metrics["thor_metrics_api_active_websocket_count"].GetMetric()
assert.Equal(t, 1, len(m), "should be 1 metric entries")
assert.Equal(t, float64(2), m[0].GetGauge().GetValue())

// close 1 beat subscription, active websocket should be 1
conn.Close()
// ensure close is done
<-time.After(100 * time.Millisecond)

body, _ = httpGet(t, ts.URL+"/metrics")
metrics, err = parser.TextToMetricFamilies(bytes.NewReader(body))
assert.Nil(t, err)

m = metrics["thor_metrics_api_active_websocket_count"].GetMetric()
assert.Equal(t, 1, len(m), "should be 1 metric entries")
assert.Equal(t, float64(1), m[0].GetGauge().GetValue())

// initiate 1 block subscription, active websocket should be 2
u = url.URL{Scheme: "ws", Host: strings.TrimPrefix(ts.URL, "http://"), Path: "/subscriptions/block"}
_, _, err = websocket.DefaultDialer.Dial(u.String(), nil)
assert.Nil(t, err)

body, _ = httpGet(t, ts.URL+"/metrics")
metrics, err = parser.TextToMetricFamilies(bytes.NewReader(body))
assert.Nil(t, err)

m = metrics["thor_metrics_api_active_websocket_count"].GetMetric()
assert.Equal(t, 2, len(m), "should be 2 metric entries")
// both m[0] and m[1] should have the value of 1
assert.Equal(t, float64(1), m[0].GetGauge().GetValue())
assert.Equal(t, float64(1), m[1].GetGauge().GetValue())

// m[1] should have the subject of block
labels = m[1].GetLabel()
assert.Equal(t, "subject", labels[0].GetName())
assert.Equal(t, "block", labels[0].GetValue())
}

func httpGet(t *testing.T, url string) ([]byte, int) {
res, err := http.Get(url) // nolint:gosec
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion api/subscriptions/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func (s *Subscriptions) Mount(root *mux.Router, pathPrefix string) {
Methods(http.MethodGet).
Name("subscriptions_pending_tx").
HandlerFunc(utils.WrapHandlerFunc(s.handlePendingTransactions))
sub.Path("/{subject}").
sub.Path("/{subject:beat|beat2|block|event|transfer}").
Methods(http.MethodGet).
Name("subscriptions_subject").
HandlerFunc(utils.WrapHandlerFunc(s.handleSubject))
Expand Down
13 changes: 11 additions & 2 deletions api/transfers/transfers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package transfers

import (
"context"
"fmt"
"net/http"

"github.com/gorilla/mux"
Expand Down Expand Up @@ -60,19 +61,27 @@ func (t *Transfers) handleFilterTransferLogs(w http.ResponseWriter, req *http.Re
return utils.BadRequest(errors.WithMessage(err, "body"))
}
if filter.Options != nil && filter.Options.Limit > t.limit {
return utils.Forbidden(errors.New("options.limit exceeds the maximum allowed value"))
return utils.Forbidden(fmt.Errorf("options.limit exceeds the maximum allowed value of %d", t.limit))
}
if filter.Options == nil {
// if filter.Options is nil, set to the default limit +1
// to detect whether there are more logs than the default limit
filter.Options = &logdb.Options{
Offset: 0,
Limit: t.limit,
Limit: t.limit + 1,
}
}

tLogs, err := t.filter(req.Context(), &filter)
if err != nil {
return err
}

// ensure the result size is less than the configured limit
if len(tLogs) > int(t.limit) {
return utils.Forbidden(fmt.Errorf("the number of filtered logs exceeds the maximum allowed value of %d, please use pagination", t.limit))
}

return utils.WriteJSON(w, tLogs)
}

Expand Down
14 changes: 11 additions & 3 deletions api/transfers/transfers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestOption(t *testing.T) {
db := createDb(t)
initTransferServer(t, db, 5)
defer ts.Close()
insertBlocks(t, db, 10)
insertBlocks(t, db, 5)

filter := transfers.TransferFilter{
CriteriaSet: make([]*logdb.TransferCriteria, 0),
Expand All @@ -68,22 +68,30 @@ func TestOption(t *testing.T) {
}

res, statusCode := httpPost(t, ts.URL+"/transfers", filter)
assert.Equal(t, "options.limit exceeds the maximum allowed value", strings.Trim(string(res), "\n"))
assert.Equal(t, "options.limit exceeds the maximum allowed value of 5", strings.Trim(string(res), "\n"))
assert.Equal(t, http.StatusForbidden, statusCode)

filter.Options.Limit = 5
_, statusCode = httpPost(t, ts.URL+"/transfers", filter)
assert.Equal(t, http.StatusOK, statusCode)

// with nil options, should use default limit
// with nil options, should use default limit, when the filtered lower
// or equal to the limit, should return the filtered transfers
filter.Options = nil
res, statusCode = httpPost(t, ts.URL+"/transfers", filter)
assert.Equal(t, http.StatusOK, statusCode)
var tLogs []*events.FilteredEvent
if err := json.Unmarshal(res, &tLogs); err != nil {
t.Fatal(err)
}
assert.Equal(t, http.StatusOK, statusCode)
assert.Equal(t, 5, len(tLogs))

// when the filtered transfers exceed the limit, should return the forbidden
insertBlocks(t, db, 6)
res, statusCode = httpPost(t, ts.URL+"/transfers", filter)
assert.Equal(t, http.StatusForbidden, statusCode)
assert.Equal(t, "the number of filtered logs exceeds the maximum allowed value of 5, please use pagination", strings.Trim(string(res), "\n"))
}

// Test functions
Expand Down
2 changes: 1 addition & 1 deletion cmd/disco/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var (
Name: "netrestrict",
Usage: "restrict network communication to the given IP networks (CIDR masks)",
}
verbosityFlag = cli.IntFlag{
verbosityFlag = cli.UintFlag{
Name: "verbosity",
Value: log.LegacyLevelWarn,
Usage: "log verbosity (0-9)",
Expand Down
9 changes: 6 additions & 3 deletions cmd/disco/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/p2p/netutil"
"github.com/pkg/errors"
"github.com/vechain/thor/v2/log"
cli "gopkg.in/urfave/cli.v1"
)

Expand All @@ -38,7 +37,11 @@ var (
)

func run(ctx *cli.Context) error {
initLogger(ctx)
lvl, err := readIntFromUInt64Flag(ctx.Uint64(verbosityFlag.Name))
if err != nil {
return errors.Wrap(err, "parse verbosity flag")
}
initLogger(lvl)

natm, err := nat.Parse(ctx.String("nat"))
if err != nil {
Expand Down Expand Up @@ -89,7 +92,7 @@ func run(ctx *cli.Context) error {
if err != nil {
return err
}
log.Info("Running", "URI", net.Self().String())
fmt.Println("Running", net.Self().String())

select {}
}
Expand Down
Loading

0 comments on commit 0d2ce59

Please sign in to comment.