diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 7f5049caf04..e8418eb97ec 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -14,12 +14,14 @@ import ( ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" "go.opencensus.io/stats" + "go.opencensus.io/tag" "go.uber.org/multierr" "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" bstore "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" @@ -266,7 +268,13 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co ss.txnViewsCond.L = &ss.txnViewsMx ss.txnSyncCond.L = &ss.txnSyncMx ss.chainSyncCond.L = &ss.chainSyncMx - ss.ctx, ss.cancel = context.WithCancel(context.Background()) + + baseCtx := context.Background() + ctx, err := tag.New(baseCtx, tag.Insert(metrics.Network, buildconstants.NetworkBundle)) + if err != nil { + return nil, xerrors.Errorf("failed to create context with network tag: %w", err) + } + ss.ctx, ss.cancel = context.WithCancel(ctx) ss.reifyCond.L = &ss.reifyMx ss.reifyPend = make(map[cid.Cid]struct{}) diff --git a/node/modules/lp2p/rcmgr.go b/node/modules/lp2p/rcmgr.go index 75a09068cc7..1d4a277b388 100644 --- a/node/modules/lp2p/rcmgr.go +++ b/node/modules/lp2p/rcmgr.go @@ -9,6 +9,10 @@ import ( "path/filepath" "sync" + "github.com/filecoin-project/lotus/build/buildconstants" + "github.com/filecoin-project/lotus/metrics" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/repo" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/network" @@ -21,10 +25,6 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/tag" "go.uber.org/fx" - - "github.com/filecoin-project/lotus/metrics" - "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/lotus/node/repo" ) var rcmgrMetricsOnce sync.Once @@ -179,6 +179,7 @@ type rcmgrMetrics struct{} func (r rcmgrMetrics) AllowConn(dir network.Direction, usefd bool) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) if dir == network.DirInbound { ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) } else { @@ -194,6 +195,7 @@ func (r rcmgrMetrics) AllowConn(dir network.Direction, usefd bool) { func (r rcmgrMetrics) BlockConn(dir network.Direction, usefd bool) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) if dir == network.DirInbound { ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) } else { @@ -209,6 +211,7 @@ func (r rcmgrMetrics) BlockConn(dir network.Direction, usefd bool) { func (r rcmgrMetrics) AllowStream(p peer.ID, dir network.Direction) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) if dir == network.DirInbound { ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) } else { @@ -219,6 +222,7 @@ func (r rcmgrMetrics) AllowStream(p peer.ID, dir network.Direction) { func (r rcmgrMetrics) BlockStream(p peer.ID, dir network.Direction) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) if dir == network.DirInbound { ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) } else { @@ -229,46 +233,54 @@ func (r rcmgrMetrics) BlockStream(p peer.ID, dir network.Direction) { func (r rcmgrMetrics) AllowPeer(p peer.ID) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) stats.Record(ctx, metrics.RcmgrAllowPeer.M(1)) } func (r rcmgrMetrics) BlockPeer(p peer.ID) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) stats.Record(ctx, metrics.RcmgrBlockPeer.M(1)) } func (r rcmgrMetrics) AllowProtocol(proto protocol.ID) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto))) stats.Record(ctx, metrics.RcmgrAllowProto.M(1)) } func (r rcmgrMetrics) BlockProtocol(proto protocol.ID) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto))) stats.Record(ctx, metrics.RcmgrBlockProto.M(1)) } func (r rcmgrMetrics) BlockProtocolPeer(proto protocol.ID, p peer.ID) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto))) stats.Record(ctx, metrics.RcmgrBlockProtoPeer.M(1)) } func (r rcmgrMetrics) AllowService(svc string) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc)) stats.Record(ctx, metrics.RcmgrAllowSvc.M(1)) } func (r rcmgrMetrics) BlockService(svc string) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc)) stats.Record(ctx, metrics.RcmgrBlockSvc.M(1)) } func (r rcmgrMetrics) BlockServicePeer(svc string, p peer.ID) { ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc)) stats.Record(ctx, metrics.RcmgrBlockSvcPeer.M(1)) } diff --git a/node/rpc.go b/node/rpc.go index ede1b924cd4..c9af4de3774 100644 --- a/node/rpc.go +++ b/node/rpc.go @@ -22,6 +22,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/v0api" "github.com/filecoin-project/lotus/api/v1api" + "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/lib/rpcenc" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics/proxy" @@ -48,7 +49,9 @@ func ServeRPC(h http.Handler, id string, addr multiaddr.Multiaddr) (StopFunc, er Handler: h, ReadHeaderTimeout: 30 * time.Second, BaseContext: func(listener net.Listener) context.Context { - ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, id)) + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + ctx, _ = tag.New(ctx, tag.Upsert(metrics.APIInterface, id)) return ctx }, } diff --git a/paychmgr/manager.go b/paychmgr/manager.go index 97073801272..482c2c0b7f7 100644 --- a/paychmgr/manager.go +++ b/paychmgr/manager.go @@ -20,6 +20,10 @@ import ( "github.com/filecoin-project/lotus/chain/actors/builtin/paych" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" + + "github.com/filecoin-project/lotus/build/buildconstants" + "github.com/filecoin-project/lotus/metrics" + "go.opencensus.io/tag" ) var log = logging.Logger("paych") @@ -70,6 +74,10 @@ type Manager struct { func NewManager(ctx context.Context, shutdown func(), sm stmgr.StateManagerAPI, pchstore *Store, api PaychAPI) *Manager { impl := &managerAPIImpl{StateManagerAPI: sm, PaychAPI: api} + + // Add network tag to context + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + return &Manager{ ctx: ctx, shutdown: shutdown, @@ -82,13 +90,16 @@ func NewManager(ctx context.Context, shutdown func(), sm stmgr.StateManagerAPI, // newManager is used by the tests to supply mocks func newManager(pchstore *Store, pchapi managerAPI) (*Manager, error) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + pm := &Manager{ store: pchstore, sa: &stateAccessor{sm: pchapi}, channels: make(map[string]*channelAccessor), pchapi: pchapi, } - pm.ctx, pm.shutdown = context.WithCancel(context.Background()) + pm.ctx, pm.shutdown = context.WithCancel(ctx) return pm, pm.Start() } diff --git a/storage/paths/db_index.go b/storage/paths/db_index.go index e6def455112..4c379619be1 100644 --- a/storage/paths/db_index.go +++ b/storage/paths/db_index.go @@ -17,6 +17,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/journal/alerting" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/metrics" @@ -46,6 +47,12 @@ func NewDBIndex(al *alerting.Alerting, db *harmonydb.DB) *DBIndex { } } +// addNetworkTag adds the network tag to the context for metrics +func addNetworkTag(ctx context.Context) context.Context { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + return ctx +} + func (dbi *DBIndex) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) { var sectorEntries []struct { @@ -120,6 +127,8 @@ func splitString(str string) []string { } func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error { + ctx = addNetworkTag(ctx) + var allow, deny = make([]string, 0, len(si.AllowTypes)), make([]string, 0, len(si.DenyTypes)) if _, hasAlert := dbi.pathAlerts[si.ID]; dbi.alerting != nil && !hasAlert { @@ -312,6 +321,8 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri } func (dbi *DBIndex) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { + ctx = addNetworkTag(ctx) + retryWait := time.Millisecond * 20 retryReportHealth: _, err := dbi.harmonyDB.Exec(ctx, @@ -378,6 +389,7 @@ func (dbi *DBIndex) checkFileType(fileType storiface.SectorFileType) bool { } func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error { + ctx = addNetworkTag(ctx) if !dbi.checkFileType(ft) { return xerrors.Errorf("invalid filetype") @@ -678,6 +690,8 @@ func (dbi *DBIndex) StorageInfo(ctx context.Context, id storiface.ID) (storiface } func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType, miner abi.ActorID) ([]storiface.StorageInfo, error) { + ctx = addNetworkTag(ctx) + var err error var spaceReq uint64 switch pathType { diff --git a/storage/wdpost/wdpost_changehandler.go b/storage/wdpost/wdpost_changehandler.go index ce58f148920..176562468ac 100644 --- a/storage/wdpost/wdpost_changehandler.go +++ b/storage/wdpost/wdpost_changehandler.go @@ -8,8 +8,11 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/dline" + "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/metrics" + "go.opencensus.io/tag" ) const ( @@ -51,6 +54,7 @@ func (ch *changeHandler) start() { } func (ch *changeHandler) update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error { + ctx = addNetworkTag(ctx) // Get the current deadline period di, err := ch.api.StateMinerProvingDeadline(ctx, ch.actor, advance.Key()) if err != nil { @@ -210,6 +214,7 @@ func (p *proveHandler) run() { } func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) { + ctx = addNetworkTag(ctx) // If the post window has expired, abort the current proof if p.current != nil && newTS.Height() >= p.current.di.Close { // Cancel the context on the current proof @@ -387,6 +392,7 @@ func (s *submitHandler) run() { // processHeadChange is called when the chain head changes func (s *submitHandler) processHeadChange(ctx context.Context, revert *types.TipSet, advance *types.TipSet, di *dline.Info) { + ctx = addNetworkTag(ctx) s.currentCtx = ctx s.currentTS = advance s.currentDI = di @@ -540,3 +546,8 @@ func NextDeadline(currentDeadline *dline.Info) *dline.Info { func NewDeadlineInfo(periodStart abi.ChainEpoch, deadlineIdx uint64, currEpoch abi.ChainEpoch) *dline.Info { return dline.NewInfo(periodStart, deadlineIdx, currEpoch, miner.WPoStPeriodDeadlines, miner.WPoStProvingPeriod(), miner.WPoStChallengeWindow(), miner.WPoStChallengeLookback, miner.FaultDeclarationCutoff) } + +func addNetworkTag(ctx context.Context) context.Context { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Network, buildconstants.NetworkBundle)) + return ctx +}