Skip to content

Commit

Permalink
Merge pull request #9074 from ipfs/rcgmr-auto-scale
Browse files Browse the repository at this point in the history
feat: go-libp2p v0.21 (rcmgr auto scaling)
  • Loading branch information
lidel authored Aug 16, 2022
2 parents a982068 + 77251b6 commit 00f2a64
Show file tree
Hide file tree
Showing 16 changed files with 814 additions and 537 deletions.
1 change: 1 addition & 0 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,7 @@ func serveHTTPApi(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, error
var opts = []corehttp.ServeOption{
corehttp.MetricsCollectionOption("api"),
corehttp.MetricsOpenCensusCollectionOption(),
corehttp.MetricsOpenCensusDefaultPrometheusRegistry(),
corehttp.CheckVersionOption(),
corehttp.CommandsOption(*cctx),
corehttp.WebUIOption,
Expand Down
8 changes: 6 additions & 2 deletions config/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,12 @@ type ConnMgr struct {
// <https://github.com/libp2p/go-libp2p-resource-manager#readme>
type ResourceMgr struct {
// Enables the Network Resource Manager feature, default to on.
Enabled Flag `json:",omitempty"`
Limits *rcmgr.BasicLimiterConfig `json:",omitempty"`
Enabled Flag `json:",omitempty"`
Limits *rcmgr.LimitConfig `json:",omitempty"`
// A list of multiaddrs that can bypass normal system limits (but are still
// limited by the allowlist scope). Convenience config around
// https://pkg.go.dev/github.com/libp2p/go-libp2p-resource-manager#Allowlist.Add
Allowlist []string `json:",omitempty"`
}

const (
Expand Down
2 changes: 1 addition & 1 deletion core/commands/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ Changes made via command line are persisted in the Swarm.ResourceMgr.Limits fiel

// set scope limit to new values (when limit.json is passed as a second arg)
if req.Files != nil {
var newLimit rcmgr.BasicLimitConfig
var newLimit rcmgr.BaseLimit
it := req.Files.Entries()
if it.Next() {
file := files.FileFromEntry(it)
Expand Down
24 changes: 24 additions & 0 deletions core/corehttp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,30 @@ func MetricsOpenCensusCollectionOption() ServeOption {
}
}

// MetricsOpenCensusDefaultPrometheusRegistry registers the default prometheus
// registry as an exporter to OpenCensus metrics. This means that OpenCensus
// metrics will show up in the prometheus metrics endpoint.
//
// The returned ServeOption never modifies the mux; it is handed back
// unchanged. An error is returned only when the OpenCensus prometheus
// exporter itself cannot be constructed.
func MetricsOpenCensusDefaultPrometheusRegistry() ServeOption {
	return func(_ *core.IpfsNode, _ net.Listener, mux *http.ServeMux) (*http.ServeMux, error) {
		log.Info("Init OpenCensus with default prometheus registry")

		// prometheus.DefaultRegisterer is a replaceable package variable of
		// interface type. Use a checked assertion so that a custom Registerer
		// (anything that is not a *prometheus.Registry) does not panic here;
		// in that case the exporter is skipped and the problem is logged.
		registry, ok := prometheus.DefaultRegisterer.(*prometheus.Registry)
		if !ok {
			log.Errorw("OC default registry ERROR", "error", "prometheus.DefaultRegisterer is not a *prometheus.Registry, skipping OpenCensus exporter")
			return mux, nil
		}

		pe, err := ocprom.NewExporter(ocprom.Options{
			Registry: registry,
			// Export-time failures are reported through the logger rather
			// than being surfaced to the caller.
			OnError: func(err error) {
				log.Errorw("OC default registry ERROR", "error", err)
			},
		})
		if err != nil {
			return nil, err
		}

		// register prometheus with opencensus
		view.RegisterExporter(pe)

		return mux, nil
	}
}

// MetricsCollectionOption adds collection of net/http-related metrics.
func MetricsCollectionOption(handlerName string) ServeOption {
return func(_ *core.IpfsNode, _ net.Listener, mux *http.ServeMux) (*http.ServeMux, error) {
Expand Down
182 changes: 61 additions & 121 deletions core/node/libp2p/rcmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
rcmgrObs "github.com/libp2p/go-libp2p-resource-manager/obs"
"github.com/multiformats/go-multiaddr"
"go.opencensus.io/stats/view"

"go.uber.org/fx"
)
Expand All @@ -34,7 +37,7 @@ func ResourceManager(cfg config.SwarmConfig) interface{} {

enabled := cfg.ResourceMgr.Enabled.WithDefault(false)

/// ENV overrides Config (if present)
// ENV overrides Config (if present)
switch os.Getenv("LIBP2P_RCMGR") {
case "0", "false":
enabled = false
Expand All @@ -50,21 +53,41 @@ func ResourceManager(cfg config.SwarmConfig) interface{} {
return nil, opts, fmt.Errorf("opening IPFS_PATH: %w", err)
}

defaultLimits := adjustedDefaultLimits(cfg)
limits := adjustedDefaultLimits(cfg)

var limits rcmgr.BasicLimiterConfig
if cfg.ResourceMgr.Limits != nil {
limits = *cfg.ResourceMgr.Limits
l := *cfg.ResourceMgr.Limits
l.Apply(limits)
limits = l
}

limiter, err := rcmgr.NewLimiter(limits, defaultLimits)
limiter := rcmgr.NewFixedLimiter(limits)

str, err := rcmgrObs.NewStatsTraceReporter()
if err != nil {
return nil, opts, err
}

libp2p.SetDefaultServiceLimits(limiter)
ropts := []rcmgr.Option{rcmgr.WithMetrics(createRcmgrMetrics()), rcmgr.WithTraceReporter(str)}

if len(cfg.ResourceMgr.Allowlist) > 0 {
var mas []multiaddr.Multiaddr
for _, maStr := range cfg.ResourceMgr.Allowlist {
ma, err := multiaddr.NewMultiaddr(maStr)
if err != nil {
log.Errorf("failed to parse multiaddr=%v for allowlist, skipping. err=%v", maStr, err)
continue
}
mas = append(mas, ma)
}
ropts = append(ropts, rcmgr.WithAllowlistedMultiaddrs(mas))
log.Infof("Setting allowlist to: %v", mas)
}

ropts := []rcmgr.Option{rcmgr.WithMetrics(createRcmgrMetrics())}
err = view.Register(rcmgrObs.DefaultViews...)
if err != nil {
return nil, opts, fmt.Errorf("registering rcmgr obs views: %w", err)
}

if os.Getenv("LIBP2P_DEBUG_RCMGR") != "" {
traceFilePath := filepath.Join(repoPath, NetLimitTraceFilename)
Expand Down Expand Up @@ -195,39 +218,24 @@ func NetStat(mgr network.ResourceManager, scope string) (NetStatOut, error) {
}
}

func NetLimit(mgr network.ResourceManager, scope string) (rcmgr.BasicLimitConfig, error) {
var result rcmgr.BasicLimitConfig
func NetLimit(mgr network.ResourceManager, scope string) (rcmgr.BaseLimit, error) {
var result rcmgr.BaseLimit
getLimit := func(s network.ResourceScope) error {
limiter, ok := s.(rcmgr.ResourceScopeLimiter)
if !ok { // NullResourceManager
return NoResourceMgrError
}
limit := limiter.Limit()
switch l := limit.(type) {
case *rcmgr.StaticLimit:
result.Dynamic = false
case *rcmgr.BaseLimit:
result.Memory = l.Memory
result.Streams = l.BaseLimit.Streams
result.StreamsInbound = l.BaseLimit.StreamsInbound
result.StreamsOutbound = l.BaseLimit.StreamsOutbound
result.Conns = l.BaseLimit.Conns
result.ConnsInbound = l.BaseLimit.ConnsInbound
result.ConnsOutbound = l.BaseLimit.ConnsOutbound
result.FD = l.BaseLimit.FD

case *rcmgr.DynamicLimit:
result.Dynamic = true
result.MemoryFraction = l.MemoryLimit.MemoryFraction
result.MinMemory = l.MemoryLimit.MinMemory
result.MaxMemory = l.MemoryLimit.MaxMemory
result.Streams = l.BaseLimit.Streams
result.StreamsInbound = l.BaseLimit.StreamsInbound
result.StreamsOutbound = l.BaseLimit.StreamsOutbound
result.Conns = l.BaseLimit.Conns
result.ConnsInbound = l.BaseLimit.ConnsInbound
result.ConnsOutbound = l.BaseLimit.ConnsOutbound
result.FD = l.BaseLimit.FD

result.Streams = l.Streams
result.StreamsInbound = l.StreamsInbound
result.StreamsOutbound = l.StreamsOutbound
result.Conns = l.Conns
result.ConnsInbound = l.ConnsInbound
result.ConnsOutbound = l.ConnsOutbound
result.FD = l.FD
default:
return fmt.Errorf("unknown limit type %T", limit)
}
Expand All @@ -237,89 +245,36 @@ func NetLimit(mgr network.ResourceManager, scope string) (rcmgr.BasicLimitConfig

switch {
case scope == config.ResourceMgrSystemScope:
err := mgr.ViewSystem(func(s network.ResourceScope) error {
return getLimit(s)
})
return result, err

return result, mgr.ViewSystem(func(s network.ResourceScope) error { return getLimit(s) })
case scope == config.ResourceMgrTransientScope:
err := mgr.ViewTransient(func(s network.ResourceScope) error {
return getLimit(s)
})
return result, err

return result, mgr.ViewTransient(func(s network.ResourceScope) error { return getLimit(s) })
case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix):
svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix)
err := mgr.ViewService(svc, func(s network.ServiceScope) error {
return getLimit(s)
})
return result, err

return result, mgr.ViewService(svc, func(s network.ServiceScope) error { return getLimit(s) })
case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix):
proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix)
err := mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error {
return getLimit(s)
})
return result, err

return result, mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { return getLimit(s) })
case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix):
p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix)
pid, err := peer.Decode(p)
if err != nil {
return result, fmt.Errorf("invalid peer ID: %q: %w", p, err)
}
err = mgr.ViewPeer(pid, func(s network.PeerScope) error {
return getLimit(s)
})
return result, err

return result, mgr.ViewPeer(pid, func(s network.PeerScope) error { return getLimit(s) })
default:
return result, fmt.Errorf("invalid scope %q", scope)
}
}

// NetSetLimit sets new ResourceManager limits for the given scope. The limits take effect immediately, and are also persisted to the repo config.
func NetSetLimit(mgr network.ResourceManager, repo repo.Repo, scope string, limit rcmgr.BasicLimitConfig) error {
func NetSetLimit(mgr network.ResourceManager, repo repo.Repo, scope string, limit rcmgr.BaseLimit) error {
setLimit := func(s network.ResourceScope) error {
limiter, ok := s.(rcmgr.ResourceScopeLimiter)
if !ok { // NullResourceManager
return NoResourceMgrError
}

var newLimit rcmgr.Limit
if limit.Dynamic {
newLimit = &rcmgr.DynamicLimit{
MemoryLimit: rcmgr.MemoryLimit{
MemoryFraction: limit.MemoryFraction,
MinMemory: limit.MinMemory,
MaxMemory: limit.MaxMemory,
},
BaseLimit: rcmgr.BaseLimit{
Streams: limit.Streams,
StreamsInbound: limit.StreamsInbound,
StreamsOutbound: limit.StreamsOutbound,
Conns: limit.Conns,
ConnsInbound: limit.ConnsInbound,
ConnsOutbound: limit.ConnsOutbound,
FD: limit.FD,
},
}
} else {
newLimit = &rcmgr.StaticLimit{
Memory: limit.Memory,
BaseLimit: rcmgr.BaseLimit{
Streams: limit.Streams,
StreamsInbound: limit.StreamsInbound,
StreamsOutbound: limit.StreamsOutbound,
Conns: limit.Conns,
ConnsInbound: limit.ConnsInbound,
ConnsOutbound: limit.ConnsOutbound,
FD: limit.FD,
},
}
}

limiter.SetLimit(newLimit)
limiter.SetLimit(&limit)
return nil
}

Expand All @@ -329,65 +284,50 @@ func NetSetLimit(mgr network.ResourceManager, repo repo.Repo, scope string, limi
}

if cfg.Swarm.ResourceMgr.Limits == nil {
cfg.Swarm.ResourceMgr.Limits = &rcmgr.BasicLimiterConfig{}
cfg.Swarm.ResourceMgr.Limits = &rcmgr.LimitConfig{}
}
configLimits := cfg.Swarm.ResourceMgr.Limits

var setConfigFunc func()
switch {
case scope == config.ResourceMgrSystemScope:
err = mgr.ViewSystem(func(s network.ResourceScope) error {
return setLimit(s)
})
setConfigFunc = func() { configLimits.System = &limit }

err = mgr.ViewSystem(func(s network.ResourceScope) error { return setLimit(s) })
setConfigFunc = func() { configLimits.System = limit }
case scope == config.ResourceMgrTransientScope:
err = mgr.ViewTransient(func(s network.ResourceScope) error {
return setLimit(s)
})
setConfigFunc = func() { configLimits.Transient = &limit }

err = mgr.ViewTransient(func(s network.ResourceScope) error { return setLimit(s) })
setConfigFunc = func() { configLimits.Transient = limit }
case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix):
svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix)
err = mgr.ViewService(svc, func(s network.ServiceScope) error {
return setLimit(s)
})
err = mgr.ViewService(svc, func(s network.ServiceScope) error { return setLimit(s) })
setConfigFunc = func() {
if configLimits.Service == nil {
configLimits.Service = map[string]rcmgr.BasicLimitConfig{}
configLimits.Service = map[string]rcmgr.BaseLimit{}
}
configLimits.Service[svc] = limit
}

case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix):
proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix)
err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error {
return setLimit(s)
})
err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { return setLimit(s) })
setConfigFunc = func() {
if configLimits.Protocol == nil {
configLimits.Protocol = map[string]rcmgr.BasicLimitConfig{}
configLimits.Protocol = map[protocol.ID]rcmgr.BaseLimit{}
}
configLimits.Protocol[proto] = limit
configLimits.Protocol[protocol.ID(proto)] = limit
}

case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix):
p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix)
var pid peer.ID
pid, err = peer.Decode(p)
if err != nil {
return fmt.Errorf("invalid peer ID: %q: %w", p, err)
}
err = mgr.ViewPeer(pid, func(s network.PeerScope) error {
return setLimit(s)
})
err = mgr.ViewPeer(pid, func(s network.PeerScope) error { return setLimit(s) })
setConfigFunc = func() {
if configLimits.Peer == nil {
configLimits.Peer = map[string]rcmgr.BasicLimitConfig{}
configLimits.Peer = map[peer.ID]rcmgr.BaseLimit{}
}
configLimits.Peer[p] = limit
configLimits.Peer[pid] = limit
}

default:
return fmt.Errorf("invalid scope %q", scope)
}
Expand All @@ -397,7 +337,7 @@ func NetSetLimit(mgr network.ResourceManager, repo repo.Repo, scope string, limi
}

if cfg.Swarm.ResourceMgr.Limits == nil {
cfg.Swarm.ResourceMgr.Limits = &rcmgr.BasicLimiterConfig{}
cfg.Swarm.ResourceMgr.Limits = &rcmgr.LimitConfig{}
}
setConfigFunc()

Expand Down
Loading

0 comments on commit 00f2a64

Please sign in to comment.