Skip to content
This repository has been archived by the owner on Jul 22, 2024. It is now read-only.

We have not used taints for a long time #164

Merged
merged 2 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 14 additions & 49 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
RPC Gateway
===
> [!CAUTION]
> The rpc-gateway is in development mode, and you should not consider it
> stable yet.

RPC Gateway acts as a failover proxy, routing ETH RPC requests across the configured RPC nodes. For every ETH RPC node (or node group) configured, the RPC Gateway tracks its latency, current height, and error rate. These are then used to determine whether or not to fail over.
## RPC Gateway

The rpc-gateway is a failover proxy for node providers. When health checks
fail, the rpc-gateway automatically routes requests to a backup node provider.

From a high level it simply looks like this:
```mermaid
sequenceDiagram
Alice->>RPC Gateway: eth_call
Expand All @@ -22,42 +25,24 @@ Infura-->>RPC Gateway: {"result":[...]}
RPC Gateway-->>Alice: {"result":[...]}
```

The gateway assesses the health of the underlying RPC provider by:
- continuously (at a configurable interval) checking the blockNumber; if the request fails or times out, the provider is marked as unhealthy (configurable thresholds)
- every request that fails is rerouted to the next available healthy target after a configurable number of retries
- when a request is rerouted, the current target is "tainted"

## Developing
## Development

Start dependent services
```zsh
```console
docker-compose up
```

Make sure the tests pass
```zsh
go test
```console
go test -v ./...
```

To run the app locally
```zsh
go run . --config ./example_config.yml
```

## Running & Configuration

Build the binary:
```
go build
```

The statically linked `rpc-gateway` binary has a single flag, `--config`, which defaults to `./config.yml`. Run it with:
```
./rpc-gateway --config ~/.rpc-gateway/config.yml
```console
DEBUG=true go run cmd/rpcgateway/main.go --config example_config.yml
```


### Configuration
## Configuration

```yaml
metrics:
Expand All @@ -83,23 +68,3 @@ targets: # the order here determines the failover order
http: # ws is supported by default, it will be a sticky connection.
url: "https://alchemy.com/rpc/<apikey>"
```

## Websockets

Websockets are sticky and are handled transparently.

## Taints

Taints are a way for the `HealthcheckManager` to mark a node as unhealthy even though it responds to RPC calls. Some reasons for that are:
- BlockNumber is way behind a "quorum".
- A number of proxied requests fail in a given time.

Currently taint clearing is not implemented yet.

## Build Docker images locally
Build a multi-arch image so that it can run on both the `arm64` and `amd64` architectures.

```zsh
TAG="$(git rev-parse HEAD)"
docker buildx build --platform linux/amd64,linux/arm64 -t 883408475785.dkr.ecr.us-east-1.amazonaws.com/rpc-gateway:${TAG} --push .
```
73 changes: 4 additions & 69 deletions internal/proxy/healthchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,6 @@ type HealthCheckerConfig struct {
SuccessThreshold uint `yaml:"healthcheckInterval"`
}

// Timing parameters for the exponential backoff applied to taint removal.
const (
	// initialTaintWaitTime is how long a taint lasts when the backoff starts
	// (or restarts) from scratch.
	initialTaintWaitTime = 30 * time.Second
	// maxTaintWaitTime is the upper bound of the backoff: a taint is never
	// held for longer than this.
	maxTaintWaitTime = 10 * time.Minute
	// resetTaintWaitTimeAfterDuration is the time since the last taint
	// removal after which the wait falls back to initialTaintWaitTime.
	resetTaintWaitTimeAfterDuration = 5 * time.Minute
)

type HealthChecker struct {
client *rpc.Client
httpClient *http.Client
Expand All @@ -51,15 +42,6 @@ type HealthChecker struct {
// gasLimit received from the GasLeft.sol contract call.
gasLimit uint64

// RPCHealthChecker can be tainted by the abstraction on top. Reasons:
// Forced failover
// Blocknumber is behind the other
isTainted bool
// The time when the last taint removal happened
lastTaintRemoval time.Time
// The current wait time for the taint removal
currentTaintWaitTime time.Duration

// is the ethereum RPC node healthy according to the RPCHealthchecker
isHealthy bool

Expand All @@ -75,11 +57,10 @@ func NewHealthChecker(config HealthCheckerConfig) (*HealthChecker, error) {
client.SetHeader("User-Agent", userAgent)

healthchecker := &HealthChecker{
client: client,
httpClient: &http.Client{},
config: config,
isHealthy: true,
currentTaintWaitTime: initialTaintWaitTime,
client: client,
httpClient: &http.Client{},
config: config,
isHealthy: true,
}

return healthchecker, nil
Expand Down Expand Up @@ -190,11 +171,6 @@ func (h *HealthChecker) IsHealthy() bool {
h.mu.RLock()
defer h.mu.RUnlock()

if h.isTainted {
// If the healthchecker is tainted, we always return unhealthy
return false
}

return h.isHealthy
}

Expand All @@ -211,44 +187,3 @@ func (h *HealthChecker) GasLimit() uint64 {

return h.gasLimit
}

// IsTainted reports whether the health checker is currently tainted.
// The flag is read under the read lock.
func (h *HealthChecker) IsTainted() bool {
	h.mu.RLock()
	tainted := h.isTainted
	h.mu.RUnlock()

	return tainted
}

// Taint marks the health checker as tainted and schedules the removal of the
// taint.
//
// Calling Taint on a checker that is already tainted is a no-op. The wait
// before removal doubles (capped at maxTaintWaitTime) when the previous taint
// was removed no more than resetTaintWaitTimeAfterDuration ago; otherwise it
// starts over at initialTaintWaitTime.
func (h *HealthChecker) Taint() {
	h.mu.Lock()
	defer h.mu.Unlock()

	if h.isTainted {
		return
	}
	h.isTainted = true

	// Back off exponentially if the RPC got tainted again shortly after the
	// last taint removal; otherwise restart from the initial wait time.
	if time.Since(h.lastTaintRemoval) <= resetTaintWaitTimeAfterDuration {
		h.currentTaintWaitTime *= 2
		if h.currentTaintWaitTime > maxTaintWaitTime {
			h.currentTaintWaitTime = maxTaintWaitTime
		}
	} else {
		h.currentTaintWaitTime = initialTaintWaitTime
	}

	// Snapshot the wait while the lock is still held: the removal runs on
	// another goroutine after this method has returned, so reading the field
	// there would be unsynchronized.
	wait := h.currentTaintWaitTime
	zap.L().Info("RPC Tainted", zap.String("name", h.config.Name), zap.Int64("taintWaitTime", int64(wait)))

	// RemoveTaint takes the lock itself; AfterFunc invokes it on its own
	// goroutine once the wait has elapsed.
	time.AfterFunc(wait, h.RemoveTaint)
}

// RemoveTaint clears the taint flag and records the moment of removal in
// lastTaintRemoval. The removal is logged together with the checker's
// configured name.
func (h *HealthChecker) RemoveTaint() {
	h.mu.Lock()
	defer h.mu.Unlock()

	h.lastTaintRemoval = time.Now()
	h.isTainted = false

	nameField := zap.String("name", h.config.Name)
	zap.L().Info("RPC Taint Removed", nameField)
}
4 changes: 2 additions & 2 deletions internal/proxy/healthchecker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ func TestBasicHealthchecker(t *testing.T) {
// TODO: can be flaky due to cloudflare-eth endpoint
assert.True(t, healthchecker.IsHealthy())

healthchecker.Taint()
healthchecker.isHealthy = false
assert.False(t, healthchecker.IsHealthy())

healthchecker.RemoveTaint()
healthchecker.isHealthy = true
assert.True(t, healthchecker.IsHealthy())
}

Expand Down
22 changes: 14 additions & 8 deletions internal/proxy/healthcheckmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,26 @@ func (h *HealthCheckManager) runLoop(c context.Context) error {
}
}

// IsHealthy reports whether a health checker registered under the given name
// currently considers itself healthy. It returns false when no checker with
// that name is healthy, including when the name is unknown.
func (h *HealthCheckManager) IsHealthy(name string) bool {
	for _, checker := range h.hcs {
		// Only consult the health of checkers whose name matches.
		if checker.Name() != name {
			continue
		}
		if checker.IsHealthy() {
			return true
		}
	}

	return false
}

// reportStatusMetrics publishes the current state of every health checker:
// the "healthy" status (1 when healthy, 0 otherwise), the gas limit and the
// block number, each labelled with the provider name.
func (h *HealthCheckManager) reportStatusMetrics() {
	for _, hc := range h.hcs {
		name := hc.Name()

		healthy := 0.0
		if hc.IsHealthy() {
			healthy = 1
		}
		h.metricRPCProviderStatus.WithLabelValues(name, "healthy").Set(healthy)

		// The gas limit metric must carry the gas limit; it used to be fed
		// the block number by mistake.
		h.metricRPCProviderGasLimit.WithLabelValues(name).Set(float64(hc.GasLimit()))
		h.metricRPCProviderBlockNumber.WithLabelValues(name).Set(float64(hc.BlockNumber()))
	}
}

Expand Down
3 changes: 3 additions & 0 deletions internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

for _, target := range p.targets {
if !p.healthcheckManager.IsHealthy(target.Config.Name) {
continue
}
start := time.Now()

pw := NewResponseWriter()
Expand Down
54 changes: 24 additions & 30 deletions internal/rpcgateway/rpcgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,34 @@ import (
)

type RPCGateway struct {
config RPCGatewayConfig
httpFailoverProxy *proxy.Proxy
healthcheckManager *proxy.HealthCheckManager
server *http.Server
config RPCGatewayConfig
proxy *proxy.Proxy
hcm *proxy.HealthCheckManager
server *http.Server
}

// ServeHTTP hands the request to the handler configured on the gateway's
// HTTP server.
func (r *RPCGateway) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	handler := r.server.Handler
	handler.ServeHTTP(w, req)
}

func (r *RPCGateway) Start(ctx context.Context) error {
func (r *RPCGateway) Start(c context.Context) error {
zap.L().Info("starting rpc gateway")

go func() {
zap.L().Info("starting healthcheck manager")
err := r.healthcheckManager.Start(ctx)
err := r.hcm.Start(c)
if err != nil {
// TODO: Handle gracefully
zap.L().Fatal("failed to start healthcheck manager", zap.Error(err))
}
}()

r.server.Addr = fmt.Sprintf(":%s", r.config.Proxy.Port)

return r.server.ListenAndServe()
}

func (r *RPCGateway) Stop(ctx context.Context) error {
func (r *RPCGateway) Stop(c context.Context) error {
zap.L().Info("stopping rpc gateway")
err := r.healthcheckManager.Stop(ctx)
err := r.hcm.Stop(c)
if err != nil {
zap.L().Error("healthcheck manager failed to stop gracefully", zap.Error(err))
}
Expand All @@ -57,18 +55,18 @@ func (r *RPCGateway) Stop(ctx context.Context) error {
}

func NewRPCGateway(config RPCGatewayConfig) *RPCGateway {
healthcheckManager := proxy.NewHealthCheckManager(
hcm := proxy.NewHealthCheckManager(
proxy.HealthCheckManagerConfig{
Targets: config.Targets,
Config: config.HealthChecks,
})
httpFailoverProxy := proxy.NewProxy(
proxy := proxy.NewProxy(
proxy.Config{
Proxy: config.Proxy,
Targets: config.Targets,
HealthChecks: config.HealthChecks,
},
healthcheckManager,
hcm,
)

r := mux.NewRouter()
Expand All @@ -84,24 +82,20 @@ func NewRPCGateway(config RPCGatewayConfig) *RPCGateway {
zapmw.Request(zapcore.InfoLevel, "request"),
zapmw.Recoverer(zapcore.ErrorLevel, "recover", zapmw.RecovererDefault),
)

srv := &http.Server{
Handler: r,
WriteTimeout: 15 * time.Second,
ReadTimeout: 15 * time.Second,
ReadHeaderTimeout: 5 * time.Second,
}

gateway := &RPCGateway{
config: config,
httpFailoverProxy: httpFailoverProxy,
healthcheckManager: healthcheckManager,
server: srv,
r.PathPrefix("/").Handler(proxy)

return &RPCGateway{
config: config,
proxy: proxy,
hcm: hcm,
server: &http.Server{
Addr: fmt.Sprintf(":%s", config.Proxy.Port),
Handler: r,
WriteTimeout: time.Second * 15,
ReadTimeout: time.Second * 15,
ReadHeaderTimeout: time.Second * 5,
},
}

r.PathPrefix("/").Handler(httpFailoverProxy)

return gateway
}

func NewRPCGatewayFromConfigFile(path string) (*RPCGatewayConfig, error) {
Expand Down
Loading