From cb6d7f0bc736a6f9033b7ac829fd7733afdba1e2 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 7 Jan 2025 20:50:48 +1100 Subject: [PATCH] fixup! feat(shed): actor state diff stats --- cmd/lotus-shed/state-stats.go | 344 ++++++++++++++++++++++++++++++---- 1 file changed, 305 insertions(+), 39 deletions(-) diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go index 453d89d3839..2e8d08d2a6e 100644 --- a/cmd/lotus-shed/state-stats.go +++ b/cmd/lotus-shed/state-stats.go @@ -18,17 +18,32 @@ import ( offline "github.com/ipfs/boxo/exchange/offline" "github.com/ipfs/boxo/ipld/merkledag" "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" format "github.com/ipfs/go-ipld-format" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/ipld/go-ipld-prime/codec/dagjson" + "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/node/bindnode" "github.com/ipld/go-ipld-prime/printer" + "github.com/ipld/go-ipld-prime/schema" + schemadmt "github.com/ipld/go-ipld-prime/schema/dmt" + schemadsl "github.com/ipld/go-ipld-prime/schema/dsl" + "github.com/ipld/go-ipld-prime/traversal" "github.com/urfave/cli/v2" + cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/sync/errgroup" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-amt-ipld/v4" + "github.com/filecoin-project/go-bitfield" + "github.com/filecoin-project/go-hamt-ipld/v3" "github.com/filecoin-project/go-state-types/abi" gstactors "github.com/filecoin-project/go-state-types/actors" + gstbuiltin "github.com/filecoin-project/go-state-types/builtin" + miner16 "github.com/filecoin-project/go-state-types/builtin/v16/miner" + "github.com/filecoin-project/go-state-types/builtin/v16/util/adt" "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/lotus/api" @@ -65,7 +80,9 @@ type fieldItem struct { type blockRepr struct { Cid cid.Cid + Size uint64 Representation string `json:",omitempty"` + KnownType string `json:",omitempty"` } type job struct { @@ -133,18 +150,44 @@ func (cng *cacheNodeGetter) GetMany(ctx context.Context, list []cid.Cid) <-chan return out } +type representationType string + +const ( + representationTypeNone representationType = "none" + representationTypeCid representationType = "cid" + representationTypeShort representationType = "short" + representationTypeFull representationType = "full" + representationTypeDagJson representationType = "dagjson" +) + +func parseRepresentationType(s string) (representationType, error) { + switch s { + case "", "none": + return representationTypeNone, nil + case "cid": + return representationTypeCid, nil + case "short": + return representationTypeShort, nil + case "full": + return representationTypeFull, nil + case "dagjson": + return representationTypeDagJson, nil + default: + return "", xerrors.Errorf("unknown representation type: %s", s) + } +} + type dagStatCollector struct { - ds format.NodeGetter - walk func(format.Node) ([]*format.Link, error) - collectCids bool - representBlocks bool + ds format.NodeGetter + walk func(format.Node) ([]*format.Link, error) + representationType representationType statsLk sync.Mutex stats api.ObjStat blocks []blockRepr } -func (dsc *dagStatCollector) record(c cid.Cid, nd format.Node) error { +func (dsc *dagStatCollector) record(ctx context.Context, nd format.Node) error { size, err := nd.Size() if err != nil { return err @@ -155,15 +198,33 @@ func (dsc *dagStatCollector) record(c cid.Cid, nd format.Node) error { dsc.stats.Size = dsc.stats.Size + size dsc.stats.Links = dsc.stats.Links + 1 - if dsc.collectCids { - br := blockRepr{Cid: c} - if dsc.representBlocks { + if dsc.representationType != representationTypeNone { + br := blockRepr{Cid: nd.Cid(), Size: size} + + if dsc.representationType != representationTypeCid { node, err := ipld.Decode(nd.RawData(), dagcbor.Decode) if err != nil { return xerrors.Errorf("decoding node: %w", err) } - br.Representation = printer.Config{OmitScalarValues: true}.Sprint(node) + + switch dsc.representationType { + case representationTypeDagJson: + dj, err := ipld.Encode(node, dagjson.Encode) + if err != nil { + return xerrors.Errorf("encoding node to dag-json: %w", err) + } + br.Representation = string(dj) + case representationTypeShort, representationTypeFull: + br.Representation = printer.Config{OmitScalarValues: dsc.representationType == representationTypeShort}.Sprint(node) + } } + + if typ, err := matchKnownBlockType(ctx, nd); err != nil { + log.Warnf("failed to match block type: %s", err) + } else { + br.KnownType = typ + } + dsc.blocks = append(dsc.blocks, br) } @@ -176,7 +237,7 @@ func (dsc *dagStatCollector) walkLinks(ctx context.Context, c cid.Cid) ([]*forma return nil, err } - if err := dsc.record(c, nd); err != nil { + if err := dsc.record(ctx, nd); err != nil { return nil, err } @@ -658,13 +719,9 @@ the total state of the actor in either tipset. Name: "diff-tipset", Usage: "specify tipset to diff against, stat output will include only the mutated state between the two tipsets (pass comma separated array of cids or @ to specify tipset by height)", }, - &cli.BoolFlag{ - Name: "list-blocks", - Usage: "list the CIDs of blocks in the stat set being processed, in the case of a diff-tipset this will be the blocks that are different between the two tipsets", - }, - &cli.BoolFlag{ - Name: "print-blocks", - Usage: "provide a human-readable representation of each of the blocks in the stat set being processed, implies --list-blocks", + &cli.StringFlag{ + Name: "show-blocks", + Usage: "show blocks as one of 'none', 'cid' a 'short' or 'full' representation of the block contents, or 'dagjson' to dump the full block contents as DAG-JSON. In the case of a diff-tipset this will be the blocks that are different between the two tipsets.", }, &cli.IntFlag{ Name: "workers", @@ -729,16 +786,18 @@ the total state of the actor in either tipset. numWorkers := cctx.Int("workers") dagCacheSize := cctx.Int("dag-cache-size") - printBlocks := cctx.Bool("print-blocks") - listBlocks := (printBlocks || cctx.Bool("list-blocks")) && !cctx.IsSet("diff-tipset") // if diff, don't list on first pass + reprType, err := parseRepresentationType(cctx.String("show-blocks")) + if err != nil { + return err + } + if cctx.IsSet("diff-tipset") { // if diff, don't list on first pass + reprType = representationTypeNone + } jobs := make(chan address.Address, numWorkers) results := make(chan actorStats, numWorkers) - sc := &statCollector{ - collectCids: listBlocks, - representBlocks: printBlocks, - } + sc := &statCollector{representationType: reprType} worker := func(ctx context.Context, id int, ts *types.TipSet) error { completed := 0 @@ -830,8 +889,8 @@ the total state of the actor in either tipset. jobs = make(chan address.Address, numWorkers) results = make(chan actorStats, numWorkers) - listBlocks = cctx.Bool("list-blocks") || printBlocks - sc.collectCids = listBlocks + reprType, _ := parseRepresentationType(cctx.String("show-blocks")) + sc.representationType = reprType eg, egctx = errgroup.WithContext(ctx) for w := 0; w < numWorkers; w++ { @@ -964,10 +1023,9 @@ func collectSnapshotJobStats(ctx context.Context, in job, dag format.NodeGetter, } type statCollector struct { - rootCidSet *cid.Set - collectCids bool - representBlocks bool - fieldCidSets map[string]*cid.Set + rootCidSet *cid.Set + representationType representationType + fieldCidSets map[string]*cid.Set } func (sc *statCollector) collectStats( @@ -1026,10 +1084,9 @@ func (sc *statCollector) collectStats( } dsc := &dagStatCollector{ - ds: dag, - walk: carWalkFunc, - collectCids: sc.collectCids, - representBlocks: sc.representBlocks, + ds: dag, + walk: carWalkFunc, + representationType: sc.representationType, } if err := merkledag.Walk(ctx, dsc.walkLinks, actor.Head, sc.rootCidSet.Visit, merkledag.Concurrent()); err != nil { @@ -1037,16 +1094,15 @@ func (sc *statCollector) collectStats( } actStats.Stats = dsc.stats - if sc.collectCids { + if sc.representationType != representationTypeNone { actStats.Blocks = dsc.blocks } for _, field := range fields { dsc := &dagStatCollector{ - ds: dag, - walk: carWalkFunc, - collectCids: sc.collectCids, - representBlocks: sc.representBlocks, + ds: dag, + walk: carWalkFunc, + representationType: sc.representationType, } if err := merkledag.Walk(ctx, func(ctx context.Context, c cid.Cid) ([]*format.Link, error) { @@ -1057,7 +1113,7 @@ func (sc *statCollector) collectStats( } field.Stats = dsc.stats - if sc.collectCids { + if sc.representationType != representationTypeNone { field.Blocks = dsc.blocks } @@ -1117,3 +1173,213 @@ func DumpSnapshotStats(stats map[string]api.ObjStat) { fmt.Printf("%-*s%-*s%-*s%-*d\n", 32, k, 10, sizeStr, 14, fmt.Sprintf("(%d)", stat.Size), 24, stat.Links) } } + +// matchKnownBlockType attempts to determine the type of a block by inspecting its bytes. First we +// attempt to decode it as part of a HAMT or AMT, and if we get one, we inspect the types of the +// values. Otherwise we attempt to decode it as a known type using matchKnownBlockTypeFromBytes. +func matchKnownBlockType(ctx context.Context, nd format.Node) (string, error) { + // block store with just one block in it, for interacting with the hamt and amt libraries + store := cbor.NewMemCborStore() + if err := store.(*cbor.BasicIpldStore).Blocks.Put(ctx, nd); err != nil { + return "", err + } + + // try to load as a HAMT root/node (they are the same thing) + if _, err := hamt.LoadNode(ctx, store, nd.Cid(), append(adt.DefaultHamtOptions, hamt.UseTreeBitWidth(gstbuiltin.DefaultHamtBitwidth))...); err == nil { + // got a HAMT, now inspect it + hamtNode, err := ipld.DecodeUsingPrototype(nd.RawData(), dagcbor.Decode, bindnode.Prototype(nil, knownTypeSystem.TypeByName("HamtNode"))) + if err != nil { + return "", xerrors.Errorf("failed to decode HamtNode: %w", err) + } + typ, err := matchHamtValues(hamtNode) + if err != nil { + return "", err + } + return fmt.Sprintf("HAMTNode{%d}%s", gstbuiltin.DefaultHamtBitwidth, typ), nil + } + + // try to load as an AMT root, we have to try all bitwidths used in the chain + for _, bitwidth := range []uint{2, 3, 4, 5, 6} { + if _, err := amt.LoadAMT(ctx, store, nd.Cid(), append(adt.DefaultAmtOptions, amt.UseTreeBitWidth(bitwidth))...); err == nil { + // got an AMT root, now inspect it + amtRoot, err := ipld.DecodeUsingPrototype(nd.RawData(), dagcbor.Decode, bindnode.Prototype(nil, knownTypeSystem.TypeByName("AmtRoot"))) + if err != nil { + return "", xerrors.Errorf("failed to decode AmtRoot: %w", err) + } + values, err := traversal.Get(amtRoot, datamodel.ParsePath("Node/Values")) + if err != nil { + return "", xerrors.Errorf("failed to get AmtRoot.Node.Values: %w", err) + } + typ, err := matchAmtValues(values) + if err != nil { + return "", err + } + return fmt.Sprintf("AMTRoot{%d}%s", bitwidth, typ), nil + } + } + + // try to load as an AMT intermediate node, which we can't do using the amt package so we'll + // infer by schema + if amtNode, err := ipld.DecodeUsingPrototype(nd.RawData(), dagcbor.Decode, bindnode.Prototype(nil, knownTypeSystem.TypeByName("AmtNode"))); err == nil { + // got an AMT node, now inspect it + values, err := amtNode.LookupByString("Values") + if err != nil { + return "", xerrors.Errorf("failed to get AmtNode.Values: %w", err) + } + typ, err := matchAmtValues(values) + if err != nil { + return "", err + } + return "AmtNode" + typ, nil + } + + return matchKnownBlockTypeFromBytes(nd.RawData()) +} + +// given a datamodel.Node form of the Values array within an AMT node, attempt to determine the +// type of the values by iterating through them all and checking from their bytes. +func matchAmtValues(values datamodel.Node) (string, error) { + var match string + itr := values.ListIterator() + for !itr.Done() { + _, v, err := itr.Next() + if err != nil { + return "", err + } + enc, err := ipld.Encode(v, dagcbor.Encode) + if err != nil { + return "", err + } + if m, _ := matchKnownBlockTypeFromBytes(enc); m != "" { + if match == "" { + match = m + } else if match != m { + return "", xerrors.Errorf("inconsistent types in AMT values") + } + } + } + if match != "" { + return "[" + match + "]", nil + } + return "", nil +} + +// given a datamodel.Node form of a HAMT node, attempt to determine the type of the values, if there +// are any, by iterating through them all and checking from their bytes. +func matchHamtValues(hamtNode datamodel.Node) (string, error) { + pointers, err := hamtNode.LookupByString("Pointers") + if err != nil { + return "", xerrors.Errorf("failed to get HamtNode.Pointers: %w", err) + } + var match string + itr := pointers.ListIterator() + for !itr.Done() { + _, v, err := itr.Next() + if err != nil { + return "", err + } + b, err := v.LookupByString("Bucket") + if err == nil { + bitr := b.ListIterator() + for !bitr.Done() { + _, kv, err := bitr.Next() + if err != nil { + return "", err + } + bval, err := kv.LookupByString("Value") + if err != nil { + return "", err + } + enc, err := ipld.Encode(bval, dagcbor.Encode) + if err != nil { + return "", err + } + if m, _ := matchKnownBlockTypeFromBytes(enc); m != "" { + if match == "" { + match = m + } else if match != m { + return "", xerrors.Errorf("inconsistent types in HAMT values") + } + } + } + } + } + if match != "" { + return "[" + match + "]", nil + } + return "", nil +} + +// matchKnownBlockTypeFromBytes attempts to determine the type of a block by inspecting its bytes. +// We use a fixed list of known types that have a CBORUnmarshaler that we believe may be possible. +// This list is not exhaustive and should be expanded as unknown types are encountered. +func matchKnownBlockTypeFromBytes(b []byte) (string, error) { + if _, err := cbg.ReadCid(bytes.NewReader(b)); err == nil { + return "Cid", nil + } + known := map[string]cbg.CBORUnmarshaler{ + // Fill this out with known types when you see them missing and can identify them + "BlockHeader": &types.BlockHeader{}, + "miner16.State": &miner16.State{}, + "miner16.MinerInfo": &miner16.MinerInfo{}, + "miner16.Deadlines": &miner16.Deadlines{}, + "miner16.Deadline": &miner16.Deadline{}, + "miner16.Partition": &miner16.Partition{}, + "miner16.ExpirationSet": &miner16.ExpirationSet{}, + "miner16.WindowedPoSt": &miner16.WindowedPoSt{}, + "miner16.SectorOnChainInfo": &miner16.SectorOnChainInfo{}, + "miner16.SectorPreCommitOnChainInfo": &miner16.SectorPreCommitOnChainInfo{}, + "Bitfield": &bitfield.BitField{}, + } + for name, v := range known { + if err := v.UnmarshalCBOR(bytes.NewReader(b)); err == nil { + return name, nil + } + } + return "", nil +} + +const knownTypesSchema = ` +type HamtNode struct { + Bitfield Bytes + Pointers [Pointer] +} representation tuple + +type Pointer union { + | Any link # link to HamtNode + | Bucket list +} representation kinded + +type Bucket [KV] + +type KV struct { + Key Bytes + Value Any +} representation tuple + +type AmtNode struct { + Bmap Bytes + Links [Link] + Values [Any] +} representation tuple + +type AmtRoot struct { + BitWidth Int + Height Int + Count Int + Node AmtNode +} representation tuple +` + +var knownTypeSystem schema.TypeSystem + +func init() { + sch, err := schemadsl.ParseBytes([]byte(knownTypesSchema)) + if err != nil { + panic(err) + } + knownTypeSystem.Init() + if err := schemadmt.Compile(&knownTypeSystem, sch); err != nil { + panic(err) + } +}