Skip to content

Commit

Permalink
Merge pull request 0xPolygonHermez#1565 from 0xPolygonHermez/fix/ws-h…
Browse files Browse the repository at this point in the history
…ttp-same-port

Fix/ws http same port
  • Loading branch information
MorettiGeorgiev authored Dec 17, 2024
2 parents 6239b09 + 2fcc111 commit e09961a
Showing 1 changed file with 90 additions and 72 deletions.
162 changes: 90 additions & 72 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ var rootCmd = &cobra.Command{
Short: "rpcdaemon is JSON RPC server that connects to Erigon node for remote DB access",
}

var (
stateCacheStr string
)
var stateCacheStr string

func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
utils.CobraFlags(rootCmd, debug.Flags, utils.MetricFlags, logging.Flags)
Expand Down Expand Up @@ -162,7 +160,6 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
}

rootCmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error {

err := cfg.StateCache.CacheSize.UnmarshalText([]byte(stateCacheStr))
if err != nil {
return fmt.Errorf("state.cache value of %v is not valid", stateCacheStr)
Expand Down Expand Up @@ -307,7 +304,8 @@ func EmbeddedServices(ctx context.Context,
func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger, rootCancel context.CancelFunc) (
db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient,
stateCache kvcache.Cache, blockReader services.FullBlockReader, engine consensus.EngineReader,
ff *rpchelper.Filters, agg *libstate.Aggregator, err error) {
ff *rpchelper.Filters, agg *libstate.Aggregator, err error,
) {
if !cfg.WithDatadir && cfg.PrivateApiAddr == "" {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("either remote db or local db must be specified")
}
Expand Down Expand Up @@ -632,12 +630,77 @@ func startRegularRpcServer(ctx context.Context, cfg *httpcfg.HttpCfg, rpcAPI []r
}

httpHandler := node.NewHTTPHandlerStack(srv, cfg.HttpCORSDomain, cfg.HttpVirtualHost, cfg.HttpCompression)

graphQLHandler := graphql.CreateHandler(defaultAPIList)

apiHandler, err := createHandler(*cfg, defaultAPIList, httpHandler, nil, graphQLHandler, nil)
var wsHandler http.Handler

if cfg.WebsocketEnabled {
wsSrv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, cfg.RpcStreamingDisable, false, logger, cfg.RPCSlowLogThreshold)
wsHttpHandler := wsSrv.WebsocketHandler(cfg.WebsocketCORSDomain, nil, cfg.WebsocketCompression, logger)
wsHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
wsHttpHandler.ServeHTTP(w, r)
})
allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath)
if err != nil {
return err
}

var wsApiFlags []string
for _, flag := range cfg.WebsocketApi {
if flag != "engine" {
wsApiFlags = append(wsApiFlags, flag)
}
}

if err := node.RegisterApisFromWhitelist(defaultAPIList, wsApiFlags, wsSrv, false, logger); err != nil {
return fmt.Errorf("could not start register WS apis: %w", err)
}
wsSrv.SetAllowList(allowListForRPC)

wsSrv.SetBatchLimit(cfg.BatchLimit)

var defaultAPIList []rpc.API

for _, api := range rpcAPI {
if api.Namespace != "engine" {
defaultAPIList = append(defaultAPIList, api)
}
}

var apiFlags []string
for _, flag := range cfg.API {
if flag != "engine" {
apiFlags = append(apiFlags, flag)
}
}

if err := node.RegisterApisFromWhitelist(defaultAPIList, apiFlags, wsSrv, false, logger); err != nil {
return fmt.Errorf("could not start register RPC apis: %w", err)
}

if cfg.WebsocketPort != cfg.HttpPort {
wsEndpoint := fmt.Sprintf("tcp://%s:%d", cfg.WebsocketListenAddress, cfg.WebsocketPort)

wsListener, wsHttpAddr, err := node.StartHTTPEndpoint(wsEndpoint, &node.HttpEndpointConfig{
Timeouts: cfg.HTTPTimeouts,
}, wsHandler)
if err != nil {
return fmt.Errorf("could not start ws RPC api: %w", err)
}

defer func() {
wsSrv.Stop()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = wsListener.Shutdown(shutdownCtx)
log.Info("WS endpoint closed", "url", wsHttpAddr)
}()
}
}

apiHandler, err := createApiHandler(*cfg, defaultAPIList, httpHandler, wsHandler, graphQLHandler)
if err != nil {
return err
return fmt.Errorf("could not create API handler: %w", err)
}

if cfg.HttpServerEnabled {
Expand Down Expand Up @@ -722,73 +785,29 @@ func startRegularRpcServer(ctx context.Context, cfg *httpcfg.HttpCfg, rpcAPI []r

log.Info("HTTP endpoint opened", info...)

if cfg.WebsocketEnabled {
wsSrv := rpc.NewServer(cfg.RpcBatchConcurrency, cfg.TraceRequests, cfg.RpcStreamingDisable, false, logger, cfg.RPCSlowLogThreshold)

allowListForRPC, err := parseAllowListForRPC(cfg.RpcAllowListFilePath)
if err != nil {
return err
}

var wsApiFlags []string
for _, flag := range cfg.WebsocketApi {
if flag != "engine" {
wsApiFlags = append(wsApiFlags, flag)
}
}

if err := node.RegisterApisFromWhitelist(defaultAPIList, wsApiFlags, wsSrv, false, logger); err != nil {
return fmt.Errorf("could not start register WS apis: %w", err)
}
wsSrv.SetAllowList(allowListForRPC)

wsSrv.SetBatchLimit(cfg.BatchLimit)

var defaultAPIList []rpc.API

for _, api := range rpcAPI {
if api.Namespace != "engine" {
defaultAPIList = append(defaultAPIList, api)
}
}

var apiFlags []string
for _, flag := range cfg.API {
if flag != "engine" {
apiFlags = append(apiFlags, flag)
}
}

if err := node.RegisterApisFromWhitelist(defaultAPIList, apiFlags, wsSrv, false, logger); err != nil {
return fmt.Errorf("could not start register RPC apis: %w", err)
}

wsEndpoint := fmt.Sprintf("tcp://%s:%d", cfg.WebsocketListenAddress, cfg.WebsocketPort)

wsHttpHandler := wsSrv.WebsocketHandler(cfg.WebsocketCORSDomain, nil, cfg.WebsocketCompression, logger)
wsHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
wsHttpHandler.ServeHTTP(w, r)
})
<-ctx.Done()
logger.Info("Exiting...")
return nil
}

wsListener, wsHttpAddr, err := node.StartHTTPEndpoint(wsEndpoint, &node.HttpEndpointConfig{
Timeouts: cfg.HTTPTimeouts,
}, wsHandler)
func createApiHandler(
cfg httpcfg.HttpCfg,
defaultAPIList []rpc.API,
httpHandler, wsHandler, graphQLHandler http.Handler,
) (http.Handler, error) {
if !cfg.WebsocketEnabled || cfg.WebsocketPort != cfg.HttpPort {
apiHandler, err := createHandler(cfg, defaultAPIList, httpHandler, nil, graphQLHandler, nil)
if err != nil {
return fmt.Errorf("could not start ws RPC api: %w", err)
return nil, fmt.Errorf("could not create API handler without WS handler: %w", err)
}

defer func() {
wsSrv.Stop()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = wsListener.Shutdown(shutdownCtx)
log.Info("WS endpoint closed", "url", wsHttpAddr)
}()
return apiHandler, nil
}

<-ctx.Done()
logger.Info("Exiting...")
return nil
apiHandler, err := createHandler(cfg, defaultAPIList, httpHandler, wsHandler, graphQLHandler, nil)
if err != nil {
return nil, fmt.Errorf("could not create API handler with WS handler: %w", err)
}
return apiHandler, nil
}

type engineInfo struct {
Expand Down Expand Up @@ -854,7 +873,7 @@ func ObtainJWTSecret(cfg *httpcfg.HttpCfg, logger log.Logger) ([]byte, error) {
jwtSecret := make([]byte, 32)
rand.Read(jwtSecret)

if err := os.WriteFile(cfg.JWTSecretPath, []byte(hexutility.Encode(jwtSecret)), 0600); err != nil {
if err := os.WriteFile(cfg.JWTSecretPath, []byte(hexutility.Encode(jwtSecret)), 0o600); err != nil {
return nil, err
}
logger.Info("Generated JWT secret", "path", cfg.JWTSecretPath)
Expand Down Expand Up @@ -971,7 +990,6 @@ func (e *remoteConsensusEngine) init(db kv.RoDB, blockReader services.FullBlockR
borKv, err := remotedb.NewRemote(gointerfaces.VersionFromProto(remotedbserver.KvServiceAPIVersion), logger, remoteKV).
WithBucketsConfig(kv.BorTablesCfg).
Open()

if err != nil {
return false
}
Expand Down

0 comments on commit e09961a

Please sign in to comment.