Skip to content

Commit

Permalink
feat: Add ABCIMethodLatency histogram [BLO-791] (#26)
Browse files Browse the repository at this point in the history
* Add ABCIMethodLatency Histogram

* instrument Prepare/ProcessProposal

* testing

* integration tests

* fix docker-build

* report latencies in all cases

* extra tests
  • Loading branch information
nivasan1 authored Jan 23, 2024
1 parent efc7ea8 commit 2b0af7d
Show file tree
Hide file tree
Showing 13 changed files with 316 additions and 125 deletions.
1 change: 0 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
go.work*
go.sum
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ To see all network metrics, open a new terminal and run the following command an
$ make run-prom-client
```

## Metrics

### Oracle Service Metrics

* metrics relevant to the oracle service's health + operation are [here](./oracle/metrics/README.md)
* metrics relevant to the operation / health of the oracle's providers are [here](./providers/base/metrics/README.md)

### Oracle Application / Network Metrics

* metrics relevant to the network's (that is running the instance slinky) performance are [here](./service/metrics/README.md)

## Future Work

The oracle side car is a combination of the oracle and provider packages. This is being moved to a [separate repository](https://github.com/skip-mev/slinky-sidecar).
68 changes: 41 additions & 27 deletions abci/proposals/proposals.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ package proposals
import (
"bytes"
"fmt"

"github.com/cometbft/cometbft/types/time"
"time"

servicemetrics "github.com/skip-mev/slinky/service/metrics"

Expand Down Expand Up @@ -93,11 +92,24 @@ func NewProposalHandler(
// enabled, the handler will inject the extended commit info into the proposal.
func (h *ProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler {
return func(ctx sdk.Context, req *cometabci.RequestPrepareProposal) (*cometabci.ResponsePrepareProposal, error) {
startTime := time.Now()
var (
extInfoBz []byte
err error
extInfoBz []byte
err error
wrappedPrepareProposalLatency time.Duration
)
startTime := time.Now()

// report the slinky specific PrepareProposal latency
defer func() {
totalLatency := time.Since(startTime)
h.logger.Info(
"recording handle time metrics of prepare-proposal (seconds)",
"total latency", totalLatency.Seconds(),
"wrapped prepare proposal latency", wrappedPrepareProposalLatency.Seconds(),
"slinky prepare proposal latency", (totalLatency - wrappedPrepareProposalLatency).Seconds(),
)
h.metrics.ObserveABCIMethodLatency(servicemetrics.PrepareProposal, totalLatency-wrappedPrepareProposalLatency)
}()

if req == nil {
h.logger.Error("PrepareProposalHandler received a nil request")
Expand Down Expand Up @@ -154,29 +166,19 @@ func (h *ProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler {
}
}

prepareBlockStart := time.Now()
// Build the proposal.
// Build the proposal. Get the duration that the wrapped prepare proposal handler executed for.
wrappedPrepareProposalStartTime := time.Now()
resp, err := h.prepareProposalHandler(ctx, req)
wrappedPrepareProposalLatency = time.Since(wrappedPrepareProposalStartTime)
if err != nil {
h.logger.Error("failed to prepare proposal", "err", err)
return &cometabci.ResponsePrepareProposal{Txs: make([][]byte, 0)}, err
}
prepareBlockEnd := time.Now()
h.logger.Info("wrapped prepareProposalHandler produced response ", "txs", len(resp.Txs))

// Inject our VE Tx ( if extInfoBz is non-empty), and resize our response Txs to respect req.MaxTxBytes
resp.Txs = h.injectAndResize(resp.Txs, extInfoBz, req.MaxTxBytes+int64(len(extInfoBz)))

// Record total time--not including h.prepareProposalHandler
preHandleTime := prepareBlockStart.Sub(startTime)
postHandleTime := time.Now().Sub(prepareBlockEnd)
h.logger.Info(
"recording handle time metrics",
"pre_handle_time", preHandleTime,
"post_handle_time", postHandleTime,
"total_time", preHandleTime+postHandleTime,
)
h.metrics.ObservePrepareProposalTime(preHandleTime + postHandleTime)
h.logger.Info(
"prepared proposal",
"txs", len(resp.Txs),
Expand Down Expand Up @@ -223,18 +225,26 @@ func (h *ProposalHandler) injectAndResize(appTxs [][]byte, injectTx []byte, maxS
func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler {
return func(ctx sdk.Context, req *cometabci.RequestProcessProposal) (*cometabci.ResponseProcessProposal, error) {
start := time.Now()
if req == nil {
h.logger.Error("ProcessProposalHandler received a nil request")
return nil, fmt.Errorf("received a nil request")
}
var wrappedProcessProposalLatency time.Duration

// Defer a function to record the total time it took to process the proposal.
defer func() {
processDuration := time.Now().Sub(start)
totalLatency := time.Since(start)
h.logger.Info(
"recording process proposal time",
"duration", processDuration,
"recording handle time metrics of process-proposal (seconds)",
"total latency", totalLatency.Seconds(),
"wrapped prepare proposal latency", wrappedProcessProposalLatency.Seconds(),
"slinky prepare proposal latency", (totalLatency - wrappedProcessProposalLatency).Seconds(),
)
h.metrics.ObserveProcessProposalTime(processDuration)
h.metrics.ObserveABCIMethodLatency(servicemetrics.ProcessProposal, totalLatency-wrappedProcessProposalLatency)
}()

// this should never happen, but just in case
if req == nil {
h.logger.Error("ProcessProposalHandler received a nil request")
return nil, fmt.Errorf("received a nil request")
}

voteExtensionsEnabled := ve.VoteExtensionsEnabled(ctx)

h.logger.Info(
Expand Down Expand Up @@ -276,6 +286,10 @@ func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler {
req.Txs = req.Txs[NumInjectedTxs:]
}

return h.processProposalHandler(ctx, req)
// call the wrapped process-proposal
wrappedProcessProposalStartTime := time.Now()
resp, err := h.processProposalHandler(ctx, req)
wrappedProcessProposalLatency = time.Since(wrappedProcessProposalStartTime)
return resp, err
}
}
153 changes: 153 additions & 0 deletions abci/proposals/proposals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"math/big"
"testing"
"time"

servicemetrics "github.com/skip-mev/slinky/service/metrics"

Expand All @@ -23,6 +24,7 @@ import (
currencypairmocks "github.com/skip-mev/slinky/abci/strategies/currencypair/mocks"
"github.com/skip-mev/slinky/abci/testutils"
"github.com/skip-mev/slinky/abci/ve"
servicemetricsmocks "github.com/skip-mev/slinky/service/metrics/mocks"
oracletypes "github.com/skip-mev/slinky/x/oracle/types"
)

Expand Down Expand Up @@ -854,6 +856,157 @@ func (s *ProposalsTestSuite) TestProcessProposal() {
}
}

func (s *ProposalsTestSuite) TestProposalLatency() {
// check that no latency is reported for a failed PrepareProposal
metricsmocks := servicemetricsmocks.NewMetrics(s.T())

// check that latency reported in upstream logic is ignored
s.Run("wrapped prepare proposal latency is ignored", func() {
propHandler := proposals.NewProposalHandler(
log.NewTestLogger(s.T()),
func(ctx sdk.Context, rpp *cometabci.RequestPrepareProposal) (*cometabci.ResponsePrepareProposal, error) {
// simulate a long-running prepare proposal
time.Sleep(200 * time.Millisecond)
return &cometabci.ResponsePrepareProposal{
Txs: nil,
}, nil
},
nil,
func(ctx sdk.Context, height int64, extInfo cometabci.ExtendedCommitInfo) error {
time.Sleep(100 * time.Millisecond)
return nil
},
codec.NewDefaultVoteExtensionCodec(),
codec.NewDefaultExtendedCommitCodec(),
currencypairmocks.NewCurrencyPairStrategy(s.T()),
metricsmocks,
)

req := s.createRequestPrepareProposal( // the votes here are invalid, but that's fine
cometabci.ExtendedCommitInfo{
Round: 1,
Votes: nil,
},
nil,
4, // vote extensions will be enabled
)

s.ctx = s.ctx.WithBlockHeight(4)
metricsmocks.On("ObserveABCIMethodLatency", servicemetrics.PrepareProposal, mock.Anything).Return().Run(func(args mock.Arguments) {
// the second arg shld be a duration
latency := args.Get(1).(time.Duration)
s.Require().True(latency >= 100*time.Millisecond) // shld have included latency from validate vote extensions
s.Require().True(latency < 300*time.Millisecond) // shld have ignored wrapped prepare-proposal latency
}).Once()

_, err := propHandler.PrepareProposalHandler()(s.ctx, req)
s.Require().NoError(err)
})

s.Run("prepare proposal latency is reported in the case of failures", func() {
propHandler := proposals.NewProposalHandler(
log.NewTestLogger(s.T()),
nil,
nil,
func(ctx sdk.Context, height int64, extInfo cometabci.ExtendedCommitInfo) error {
time.Sleep(100 * time.Millisecond)
return fmt.Errorf("error in validate vote extensions")
},
codec.NewDefaultVoteExtensionCodec(),
codec.NewDefaultExtendedCommitCodec(),
currencypairmocks.NewCurrencyPairStrategy(s.T()),
metricsmocks,
)

req := s.createRequestPrepareProposal( // the votes here are invalid, but that's fine
cometabci.ExtendedCommitInfo{
Round: 1,
Votes: nil,
},
nil,
4, // vote extensions will be enabled
)
metricsmocks.On("ObserveABCIMethodLatency", servicemetrics.PrepareProposal, mock.Anything).Return().Run(func(args mock.Arguments) {
// the second arg shld be a duration
latency := args.Get(1).(time.Duration)
s.Require().True(latency >= 100*time.Millisecond) // shld have included latency from validate vote extensions
}).Once()

_, err := propHandler.PrepareProposalHandler()(s.ctx, req)
s.Require().Error(err, "error in validate vote extensions")
})

s.Run("wrapped process proposal latency is ignored", func() {
propHandler := proposals.NewProposalHandler(
log.NewTestLogger(s.T()),
baseapp.NoOpPrepareProposal(),
func(ctx sdk.Context, req *cometabci.RequestProcessProposal) (*cometabci.ResponseProcessProposal, error) {
// simulate a long-running process proposal
time.Sleep(200 * time.Millisecond)
return &cometabci.ResponseProcessProposal{}, nil
},
func(ctx sdk.Context, height int64, extInfo cometabci.ExtendedCommitInfo) error {
// simulate a long-running validate vote extensions
time.Sleep(100 * time.Millisecond)
return nil
},
codec.NewDefaultVoteExtensionCodec(),
codec.NewDefaultExtendedCommitCodec(),
currencypairmocks.NewCurrencyPairStrategy(s.T()),
metricsmocks,
)

_, extInfoBz, err := testutils.CreateExtendedCommitInfo(
[]cometabci.ExtendedVoteInfo{},
codec.NewDefaultExtendedCommitCodec(),
)
s.Require().NoError(err)

req := s.createRequestProcessProposal([][]byte{extInfoBz}, 4)
metricsmocks.On("ObserveABCIMethodLatency", servicemetrics.ProcessProposal, mock.Anything).Return().Run(func(args mock.Arguments) {
// the second arg shld be a duration
latency := args.Get(1).(time.Duration)
s.Require().True(latency >= 100*time.Millisecond) // shld have included validate vote extensions latency
s.Require().True(latency < 300*time.Millisecond) // shld have ignored the wrapped processProposal latency
}).Once()

_, err = propHandler.ProcessProposalHandler()(s.ctx, req)
s.Require().NoError(err)
})

s.Run("process proposal latency is reported in the case of failures", func() {
propHandler := proposals.NewProposalHandler(
log.NewTestLogger(s.T()),
nil,
nil,
func(ctx sdk.Context, height int64, extInfo cometabci.ExtendedCommitInfo) error {
time.Sleep(100 * time.Millisecond)
return fmt.Errorf("error in validate vote extensions")
},
codec.NewDefaultVoteExtensionCodec(),
codec.NewDefaultExtendedCommitCodec(),
currencypairmocks.NewCurrencyPairStrategy(s.T()),
metricsmocks,
)

_, extInfoBz, err := testutils.CreateExtendedCommitInfo(
[]cometabci.ExtendedVoteInfo{},
codec.NewDefaultExtendedCommitCodec(),
)
s.Require().NoError(err)

req := s.createRequestProcessProposal([][]byte{extInfoBz}, 4)
metricsmocks.On("ObserveABCIMethodLatency", servicemetrics.ProcessProposal, mock.Anything).Return().Run(func(args mock.Arguments) {
// the second arg shld be a duration
latency := args.Get(1).(time.Duration)
s.Require().True(latency >= 100*time.Millisecond) // shld have included validate vote extensions latency
}).Once()

_, err = propHandler.ProcessProposalHandler()(s.ctx, req)
s.Require().Error(err, "error in validate vote extensions")
})
}

func (s *ProposalsTestSuite) createRequestPrepareProposal(
extendedCommitInfo cometabci.ExtendedCommitInfo,
txs [][]byte,
Expand Down
4 changes: 2 additions & 2 deletions abci/ve/vote_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type VoteExtensionHandler struct {
// voteExtensionCodec is an interface to handle the marshalling / unmarshalling of vote-extensions
voteExtensionCodec compression.VoteExtensionCodec

// preBlocker is utilzed to update and retrieve the latest on-chain price information.
// preBlocker is utilized to update and retrieve the latest on-chain price information.
preBlocker sdk.PreBlocker
}

Expand Down Expand Up @@ -106,7 +106,7 @@ func (h *VoteExtensionHandler) ExtendVoteHandler() sdk.ExtendVoteHandler {
reqCtx, cancel := context.WithTimeout(ctx.Context(), h.timeout)
defer cancel()

// To ensure liveliness, we return a vote even if the oracle is not running
// To ensure liveness, we return a vote even if the oracle is not running
// or if the oracle returns a bad response.
oracleResp, err := h.oracleClient.Prices(reqCtx, &service.QueryPricesRequest{})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion contrib/images/slinky.e2e.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ RUN make build-test-app
## This will expose the tendermint and cosmos ports alongside
## starting up the sim app and the slinky daemon
FROM ubuntu:rolling
EXPOSE 26656 26657 1317 9090 7171 26655
EXPOSE 26656 26657 1317 9090 7171 26655 8081
ENTRYPOINT ["slinkyd", "start"]

COPY --from=builder /src/slinky/build/* /usr/local/bin/
Expand Down
2 changes: 1 addition & 1 deletion contrib/prometheus/prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ scrape_configs:
# scheme defaults to 'http'.

static_configs:
- targets: ["host.docker.internal:8000"]
- targets: ["host.docker.internal:8000"]
22 changes: 22 additions & 0 deletions service/metrics/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Oracle Application / Service Metrics

## `oracle_response_latency`

* **purpose**
* This prometheus histogram measures the RTT time taken (per request) from the `metrics_client`'s request to the oracle's server's response.
* Observations from this histogram are measured in nano-seconds

## `oracle_responses`

* **purpose**
* This prometheus counter measures the the # of oracle responses that a `metrics_client` has received
* **labels**
* `status` := (failure, success)

## `oracle_ABCI_method_latency`

* **purpose**
* This prometheus histogram measures the latency (per request) in seconds of ABCI method calls
* The latency is measured over all of the slinky-specific code, and ignores any down-stream dependencies
* **labels**
* `method`: one of (ExtendVote, PrepareProposal, ProcessProposal, VerifyVoteExtension, FinalizeBlock), this is the ABCI method that this latency report resulted from
Loading

0 comments on commit 2b0af7d

Please sign in to comment.