From a404edd13d49c2db88f9712faf1a9edf5f419a42 Mon Sep 17 00:00:00 2001 From: Marcin Wolny Date: Fri, 16 Feb 2024 17:13:44 +0100 Subject: [PATCH 1/2] We have not used taints for a long time Yes, it was a dead code. --- internal/proxy/healthchecker.go | 73 ++-------------------------- internal/proxy/healthchecker_test.go | 4 +- internal/proxy/healthcheckmanager.go | 22 ++++++--- internal/proxy/proxy.go | 3 ++ internal/rpcgateway/rpcgateway.go | 54 +++++++++----------- 5 files changed, 47 insertions(+), 109 deletions(-) diff --git a/internal/proxy/healthchecker.go b/internal/proxy/healthchecker.go index b24ef23..6071df7 100644 --- a/internal/proxy/healthchecker.go +++ b/internal/proxy/healthchecker.go @@ -32,15 +32,6 @@ type HealthCheckerConfig struct { SuccessThreshold uint `yaml:"healthcheckInterval"` } -const ( - // Initially we wait for 30s then remove the taint. - initialTaintWaitTime = time.Second * 30 - // We do exponential backoff taint removal but the wait time won't be more than 10 minutes. - maxTaintWaitTime = time.Minute * 10 - // Reset taint wait time (to `initialTaintWaitTime`) if it's been 5 minutes since the last taint removal. - resetTaintWaitTimeAfterDuration = time.Minute * 5 -) - type HealthChecker struct { client *rpc.Client httpClient *http.Client @@ -51,15 +42,6 @@ type HealthChecker struct { // gasLimit received from the GasLeft.sol contract call. gasLimit uint64 - // RPCHealthChecker can be tainted by the abstraction on top. Reasons: - // Forced failover - // Blocknumber is behind the other - isTainted bool - // The time when the last taint removal happened - lastTaintRemoval time.Time - // The current wait time for the taint removal - currentTaintWaitTime time.Duration - // is the ethereum RPC node healthy according to the RPCHealthchecker isHealthy bool @@ -75,11 +57,10 @@ func NewHealthChecker(config HealthCheckerConfig) (*HealthChecker, error) { client.SetHeader("User-Agent", userAgent) healthchecker := &HealthChecker{ - client: client, - httpClient: &http.Client{}, - config: config, - isHealthy: true, - currentTaintWaitTime: initialTaintWaitTime, + client: client, + httpClient: &http.Client{}, + config: config, + isHealthy: true, } return healthchecker, nil @@ -190,11 +171,6 @@ func (h *HealthChecker) IsHealthy() bool { h.mu.RLock() defer h.mu.RUnlock() - if h.isTainted { - // If the healthchecker is tainted, we always return unhealthy - return false - } - return h.isHealthy } @@ -211,44 +187,3 @@ func (h *HealthChecker) GasLimit() uint64 { return h.gasLimit } - -func (h *HealthChecker) IsTainted() bool { - h.mu.RLock() - defer h.mu.RUnlock() - - return h.isTainted -} - -func (h *HealthChecker) Taint() { - h.mu.Lock() - defer h.mu.Unlock() - - if h.isTainted { - return - } - h.isTainted = true - // Increase the wait time (exponentially) for taint removal if the RPC was tainted - // within resetTaintWaitTimeAfterDuration since the last taint removal - if time.Since(h.lastTaintRemoval) <= resetTaintWaitTimeAfterDuration { - h.currentTaintWaitTime *= 2 - if h.currentTaintWaitTime > maxTaintWaitTime { - h.currentTaintWaitTime = maxTaintWaitTime - } - } else { - h.currentTaintWaitTime = initialTaintWaitTime - } - zap.L().Info("RPC Tainted", zap.String("name", h.config.Name), zap.Int64("taintWaitTime", int64(h.currentTaintWaitTime))) - go func() { - <-time.After(h.currentTaintWaitTime) - h.RemoveTaint() - }() -} - -func (h *HealthChecker) RemoveTaint() { - h.mu.Lock() - defer h.mu.Unlock() - - h.isTainted = false - h.lastTaintRemoval = time.Now() - zap.L().Info("RPC Taint Removed", zap.String("name", h.config.Name)) -} diff --git a/internal/proxy/healthchecker_test.go b/internal/proxy/healthchecker_test.go index 42ec390..69a14a2 100644 --- a/internal/proxy/healthchecker_test.go +++ b/internal/proxy/healthchecker_test.go @@ -40,10 +40,10 @@ func TestBasicHealthchecker(t *testing.T) { // TODO: can be flaky due to cloudflare-eth endpoint assert.True(t, healthchecker.IsHealthy()) - healthchecker.Taint() + healthchecker.isHealthy = false assert.False(t, healthchecker.IsHealthy()) - healthchecker.RemoveTaint() + healthchecker.isHealthy = true assert.True(t, healthchecker.IsHealthy()) } diff --git a/internal/proxy/healthcheckmanager.go b/internal/proxy/healthcheckmanager.go index fb4d83b..d6d841e 100644 --- a/internal/proxy/healthcheckmanager.go +++ b/internal/proxy/healthcheckmanager.go @@ -97,20 +97,26 @@ func (h *HealthCheckManager) runLoop(c context.Context) error { } } +func (h *HealthCheckManager) IsHealthy(name string) bool { + for _, hc := range h.hcs { + if hc.Name() == name && hc.IsHealthy() { + return true + } + } + + return false +} + func (h *HealthCheckManager) reportStatusMetrics() { for _, hc := range h.hcs { - healthy := 0 - tainted := 0 if hc.IsHealthy() { - healthy = 1 - } - if hc.IsTainted() { - tainted = 1 + h.metricRPCProviderStatus.WithLabelValues(hc.Name(), "healthy").Set(1) + } else { + h.metricRPCProviderStatus.WithLabelValues(hc.Name(), "healthy").Set(0) } + h.metricRPCProviderGasLimit.WithLabelValues(hc.Name()).Set(float64(hc.BlockNumber())) h.metricRPCProviderBlockNumber.WithLabelValues(hc.Name()).Set(float64(hc.BlockNumber())) - h.metricRPCProviderStatus.WithLabelValues(hc.Name(), "healthy").Set(float64(healthy)) - h.metricRPCProviderStatus.WithLabelValues(hc.Name(), "tainted").Set(float64(tainted)) } } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 5e90d7a..98dc83b 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -118,6 +118,9 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { } for _, target := range p.targets { + if !p.healthcheckManager.IsHealthy(target.Config.Name) { + continue + } start := time.Now() pw := NewResponseWriter() diff --git a/internal/rpcgateway/rpcgateway.go b/internal/rpcgateway/rpcgateway.go index 99ac5a2..048909b 100644 --- a/internal/rpcgateway/rpcgateway.go +++ b/internal/rpcgateway/rpcgateway.go @@ -19,36 +19,34 @@ import ( ) type RPCGateway struct { - config RPCGatewayConfig - httpFailoverProxy *proxy.Proxy - healthcheckManager *proxy.HealthCheckManager - server *http.Server + config RPCGatewayConfig + proxy *proxy.Proxy + hcm *proxy.HealthCheckManager + server *http.Server } func (r *RPCGateway) ServeHTTP(w http.ResponseWriter, req *http.Request) { r.server.Handler.ServeHTTP(w, req) } -func (r *RPCGateway) Start(ctx context.Context) error { +func (r *RPCGateway) Start(c context.Context) error { zap.L().Info("starting rpc gateway") go func() { zap.L().Info("starting healthcheck manager") - err := r.healthcheckManager.Start(ctx) + err := r.hcm.Start(c) if err != nil { // TODO: Handle gracefully zap.L().Fatal("failed to start healthcheck manager", zap.Error(err)) } }() - r.server.Addr = fmt.Sprintf(":%s", r.config.Proxy.Port) - return r.server.ListenAndServe() } -func (r *RPCGateway) Stop(ctx context.Context) error { +func (r *RPCGateway) Stop(c context.Context) error { zap.L().Info("stopping rpc gateway") - err := r.healthcheckManager.Stop(ctx) + err := r.hcm.Stop(c) if err != nil { zap.L().Error("healthcheck manager failed to stop gracefully", zap.Error(err)) } @@ -57,18 +55,18 @@ func (r *RPCGateway) Stop(ctx context.Context) error { } func NewRPCGateway(config RPCGatewayConfig) *RPCGateway { - healthcheckManager := proxy.NewHealthCheckManager( + hcm := proxy.NewHealthCheckManager( proxy.HealthCheckManagerConfig{ Targets: config.Targets, Config: config.HealthChecks, }) - httpFailoverProxy := proxy.NewProxy( + proxy := proxy.NewProxy( proxy.Config{ Proxy: config.Proxy, Targets: config.Targets, HealthChecks: config.HealthChecks, }, - healthcheckManager, + hcm, ) r := mux.NewRouter() @@ -84,24 +82,20 @@ func NewRPCGateway(config RPCGatewayConfig) *RPCGateway { zapmw.Request(zapcore.InfoLevel, "request"), zapmw.Recoverer(zapcore.ErrorLevel, "recover", zapmw.RecovererDefault), ) - - srv := &http.Server{ - Handler: r, - WriteTimeout: 15 * time.Second, - ReadTimeout: 15 * time.Second, - ReadHeaderTimeout: 5 * time.Second, - } - - gateway := &RPCGateway{ - config: config, - httpFailoverProxy: httpFailoverProxy, - healthcheckManager: healthcheckManager, - server: srv, + r.PathPrefix("/").Handler(proxy) + + return &RPCGateway{ + config: config, + proxy: proxy, + hcm: hcm, + server: &http.Server{ + Addr: fmt.Sprintf(":%s", config.Proxy.Port), + Handler: r, + WriteTimeout: time.Second * 15, + ReadTimeout: time.Second * 15, + ReadHeaderTimeout: time.Second * 5, + }, } - - r.PathPrefix("/").Handler(httpFailoverProxy) - - return gateway } func NewRPCGatewayFromConfigFile(path string) (*RPCGatewayConfig, error) { From c551f0f76441a20cfe9e0bf711a4e7b7e3eca106 Mon Sep 17 00:00:00 2001 From: Marcin Wolny Date: Fri, 16 Feb 2024 17:25:16 +0100 Subject: [PATCH 2/2] Update README --- README.md | 63 +++++++++++++------------------------------------------ 1 file changed, 14 insertions(+), 49 deletions(-) diff --git a/README.md b/README.md index f557be3..1888d99 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,12 @@ -RPC Gateway -=== +> [!CAUTION] +> The rpc-gateway is in development mode, and you should not consider it +> stable yet. -RPC Gateway acts as a failover proxy routing ETH RPC requests across configured RPC nodes. For every ETH RPC node(group) configured the RPC Gateway tracks its latency, current height and error rates. These are then used to determine whether or not to failover. +## RPC Gateway + +The rpc-gateway is a failover proxy for node providers. When health checks +fail, the rpc-gateway automatically routes requests to a backup node provider. -From a high level it simply looks like this: ```mermaid sequenceDiagram Alice->>RPC Gateway: eth_call @@ -22,42 +25,24 @@ Infura-->>RPC Gateway: {"result":[...]} RPC Gateway-->>Alice: {"result":[...]} ``` -The gateway assesses the health of the underlying RPC provider by: -- continuously (configurable how often) checking the blockNumber, if the request fails or timeouts it marks it as unhealthy (configurable thresholds) -- every request that fails will be rerouted to the next available healthy target after a configurable amount of retries - - if it will be rerouted the current target will be "tainted" - -## Developing +## Development Start dependent services -```zsh +```console docker-compose up ``` Make sure the test pass -```zsh -go test +```console +go test -v ./... ``` To run the app locally -```zsh -go run . --config ./example_config.yml -``` - -## Running & Configuration - -Build the binary: -``` -go build -``` - -The statically linked `rpc-gateway` binary has one flag `--config` that defaults to `./config.yml` simply run it by: -``` -./rpc-gateway --config ~/.rpc-gateway/config.yml +```console +DEBUG=true go run cmd/rpcgateway/main.go --config example_config.yml ``` - -### Configuration +## Configuration ```yaml metrics: @@ -83,23 +68,3 @@ targets: # the order here determines the failover order http: # ws is supported by default, it will be a sticky connection. url: "https://alchemy.com/rpc/" ``` - -## Websockets - -Websockets are sticky and are handled transparently. - -## Taints - -Taints are a way for the `HealthcheckManager` to mark a node as unhealthy even though it responds to RPC calls. Some reasons for that are: -- BlockNumber is way behind a "quorum". -- A number of proxied requests fail in a given time. - -Currently taint clearing is not implemented yet. - -## Build Docker images locally -We should build multi-arch image so the image can be run in both `arm64` and `amd64` arch. - -```zsh -TAG="$(git rev-parse HEAD)" -docker buildx build --platform linux/amd64,linux/arm64 -t 883408475785.dkr.ecr.us-east-1.amazonaws.com/rpc-gateway:${TAG} --push . -```