From 2ec2aa6855ea980f0ff6198fee4a3aaba20c602d Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Mon, 6 Jan 2025 16:07:07 +1100 Subject: [PATCH] feat(shed): actor state diff stats --- cmd/lotus-shed/state-stats.go | 93 ++++++++++++++++++++++++++++++----- 1 file changed, 81 insertions(+), 12 deletions(-) diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go index 88b21f4076..4f6c69cdd4 100644 --- a/cmd/lotus-shed/state-stats.go +++ b/cmd/lotus-shed/state-stats.go @@ -360,7 +360,7 @@ var statSnapshotCmd = &cli.Command{ Flags: []cli.Flag{ &cli.StringFlag{ Name: "tipset", - Usage: "specify tipset to call method on (pass comma separated array of cids)", + Usage: "specify tipset to call method on (pass comma separated array of cids or @ to specify tipset by height)", }, &cli.IntFlag{ Name: "workers", @@ -618,11 +618,19 @@ The top level stats reported for an actor is computed independently of all field accounting of the true size of the actor in the state datastore. The calculation of these stats results in the actor state being traversed twice. The dag-cache-size flag can be used -to reduce the number of decode operations performed by caching the decoded object after first access.`, +to reduce the number of decode operations performed by caching the decoded object after first access. + +When using the diff-tipset flag, the stats output will only include the mutated state between the two tipsets, not +the total state of the actor in either tipset. +`, Flags: []cli.Flag{ &cli.StringFlag{ Name: "tipset", - Usage: "specify tipset to call method on (pass comma separated array of cids)", + Usage: "specify tipset to call method on (pass comma separated array of cids or @ to specify tipset by height)", + }, + &cli.StringFlag{ + Name: "diff-tipset", + Usage: "specify tipset to diff against, stat output will include only the mutated state between the two tipsets (pass comma separated array of cids or @ to specify tipset by height)", }, &cli.IntFlag{ Name: "workers", @@ -688,12 +696,12 @@ to reduce the number of decode operations performed by caching the decoded objec numWorkers := cctx.Int("workers") dagCacheSize := cctx.Int("dag-cache-size") - eg, egctx := errgroup.WithContext(ctx) - jobs := make(chan address.Address, numWorkers) results := make(chan actorStats, numWorkers) - worker := func(ctx context.Context, id int) error { + sc := &statCollector{} + + worker := func(ctx context.Context, id int, ts *types.TipSet) error { completed := 0 defer func() { log.Infow("worker done", "id", id, "completed", completed) @@ -720,7 +728,7 @@ to reduce the number of decode operations performed by caching the decoded objec } } - actStats, err := collectStats(ctx, addr, actor, dag) + actStats, err := sc.collectStats(ctx, addr, actor, dag) if err != nil { return err } @@ -738,20 +746,68 @@ to reduce the number of decode operations performed by caching the decoded objec } } + eg, egctx := errgroup.WithContext(ctx) for w := 0; w < numWorkers; w++ { id := w eg.Go(func() error { - return worker(egctx, id) + return worker(egctx, id, ts) }) } + done := make(chan struct{}) go func() { - defer close(jobs) + defer func() { + close(jobs) + close(done) + }() for _, addr := range addrs { jobs <- addr } }() + // if diff-tipset is set, we need to load the actors from the diff tipset and compare, so we'll + // discard the results for this run, then run the workers again with a new set of jobs and take + // the results from the second run which should just include the diff. + + if diffTs := cctx.String("diff-tipset"); diffTs != "" { + // read and discard results + go func() { + for range results { // nolint:revive + } + }() + + _ = eg.Wait() + log.Infow("done with first pass, starting diff") + close(results) + + <-done + + dts, err := lcli.ParseTipSetRef(ctx, tsr, diffTs) + if err != nil { + return err + } + // TODO: if anyone cares for the "all" case, re-load actors here + log.Infow("diff tipset", "parentstate", dts.ParentState()) + + jobs = make(chan address.Address, numWorkers) + results = make(chan actorStats, numWorkers) + + eg, egctx = errgroup.WithContext(ctx) + for w := 0; w < numWorkers; w++ { + id := w + eg.Go(func() error { + return worker(egctx, id, dts) + }) + } + + go func() { + defer close(jobs) + for _, addr := range addrs { + jobs <- addr + } + }() + } + go func() { // error is check later _ = eg.Wait() @@ -866,7 +922,12 @@ func collectSnapshotJobStats(ctx context.Context, in job, dag format.NodeGetter, return results, nil } -func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) { +type statCollector struct { + rootCidSet *cid.Set + fieldCidSets map[string]*cid.Set +} + +func (sc *statCollector) collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) { log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code)) nd, err := dag.Get(ctx, actor.Head) @@ -903,6 +964,14 @@ func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, } } + if sc.rootCidSet == nil { + sc.rootCidSet = cid.NewSet() + sc.fieldCidSets = make(map[string]*cid.Set) + for _, field := range fields { + sc.fieldCidSets[field.Name] = cid.NewSet() + } + } + actStats := actorStats{ Address: addr, Actor: actor, @@ -913,7 +982,7 @@ func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, walk: carWalkFunc, } - if err := merkledag.Walk(ctx, dsc.walkLinks, actor.Head, cid.NewSet().Visit, merkledag.Concurrent()); err != nil { + if err := merkledag.Walk(ctx, dsc.walkLinks, actor.Head, sc.rootCidSet.Visit, merkledag.Concurrent()); err != nil { return actorStats{}, err } @@ -925,7 +994,7 @@ func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, walk: carWalkFunc, } - if err := merkledag.Walk(ctx, dsc.walkLinks, field.Cid, cid.NewSet().Visit, merkledag.Concurrent()); err != nil { + if err := merkledag.Walk(ctx, dsc.walkLinks, field.Cid, sc.fieldCidSets[field.Name].Visit, merkledag.Concurrent()); err != nil { return actorStats{}, err }