Skip to content

Commit

Permalink
feat(shed): actor state diff stats
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Jan 6, 2025
1 parent fc70735 commit 2ec2aa6
Showing 1 changed file with 81 additions and 12 deletions.
93 changes: 81 additions & 12 deletions cmd/lotus-shed/state-stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ var statSnapshotCmd = &cli.Command{
Flags: []cli.Flag{
&cli.StringFlag{
Name: "tipset",
Usage: "specify tipset to call method on (pass comma separated array of cids)",
Usage: "specify tipset to call method on (pass comma separated array of cids or @<height> to specify tipset by height)",
},
&cli.IntFlag{
Name: "workers",
Expand Down Expand Up @@ -618,11 +618,19 @@ The top level stats reported for an actor is computed independently of all field
accounting of the true size of the actor in the state datastore.
The calculation of these stats results in the actor state being traversed twice. The dag-cache-size flag can be used
to reduce the number of decode operations performed by caching the decoded object after first access.`,
to reduce the number of decode operations performed by caching the decoded object after first access.
When using the diff-tipset flag, the stats output will only include the mutated state between the two tipsets, not
the total state of the actor in either tipset.
`,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "tipset",
Usage: "specify tipset to call method on (pass comma separated array of cids)",
Usage: "specify tipset to call method on (pass comma separated array of cids or @<height> to specify tipset by height)",
},
&cli.StringFlag{
Name: "diff-tipset",
Usage: "specify tipset to diff against, stat output will include only the mutated state between the two tipsets (pass comma separated array of cids or @<height> to specify tipset by height)",
},
&cli.IntFlag{
Name: "workers",
Expand Down Expand Up @@ -688,12 +696,12 @@ to reduce the number of decode operations performed by caching the decoded objec
numWorkers := cctx.Int("workers")
dagCacheSize := cctx.Int("dag-cache-size")

eg, egctx := errgroup.WithContext(ctx)

jobs := make(chan address.Address, numWorkers)
results := make(chan actorStats, numWorkers)

worker := func(ctx context.Context, id int) error {
sc := &statCollector{}

worker := func(ctx context.Context, id int, ts *types.TipSet) error {
completed := 0
defer func() {
log.Infow("worker done", "id", id, "completed", completed)
Expand All @@ -720,7 +728,7 @@ to reduce the number of decode operations performed by caching the decoded objec
}
}

actStats, err := collectStats(ctx, addr, actor, dag)
actStats, err := sc.collectStats(ctx, addr, actor, dag)
if err != nil {
return err
}
Expand All @@ -738,20 +746,68 @@ to reduce the number of decode operations performed by caching the decoded objec
}
}

eg, egctx := errgroup.WithContext(ctx)
for w := 0; w < numWorkers; w++ {
id := w
eg.Go(func() error {
return worker(egctx, id)
return worker(egctx, id, ts)
})
}

done := make(chan struct{})
go func() {
defer close(jobs)
defer func() {
close(jobs)
close(done)
}()
for _, addr := range addrs {
jobs <- addr
}
}()

// if diff-tipset is set, we need to load the actors from the diff tipset and compare, so we'll
// discard the results for this run, then run the workers again with a new set of jobs and take
// the results from the second run which should just include the diff.

if diffTs := cctx.String("diff-tipset"); diffTs != "" {
// read and discard results
go func() {
for range results { // nolint:revive
}
}()

_ = eg.Wait()
log.Infow("done with first pass, starting diff")
close(results)

<-done

dts, err := lcli.ParseTipSetRef(ctx, tsr, diffTs)
if err != nil {
return err
}
// TODO: if anyone cares for the "all" case, re-load actors here
log.Infow("diff tipset", "parentstate", dts.ParentState())

jobs = make(chan address.Address, numWorkers)
results = make(chan actorStats, numWorkers)

eg, egctx = errgroup.WithContext(ctx)
for w := 0; w < numWorkers; w++ {
id := w
eg.Go(func() error {
return worker(egctx, id, dts)
})
}

go func() {
defer close(jobs)
for _, addr := range addrs {
jobs <- addr
}
}()
}

go func() {
// error is checked later
_ = eg.Wait()
Expand Down Expand Up @@ -866,7 +922,12 @@ func collectSnapshotJobStats(ctx context.Context, in job, dag format.NodeGetter,
return results, nil
}

func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) {
// statCollector holds the visited-CID sets that collectStats hands to
// merkledag.Walk. Because the sets live on the collector instead of being
// created fresh for every walk, they persist across collectStats calls made
// on the same collector.
//
// Both fields start out nil; collectStats populates them the first time it
// observes a nil rootCidSet.
//
// NOTE(review): a single collector appears to be shared by all worker
// goroutines, and no synchronisation around the lazy initialisation is
// visible in this view — confirm this is safe under concurrent use.
type statCollector struct {
// rootCidSet is the visit set used for the walk that starts at actor.Head.
rootCidSet *cid.Set
// fieldCidSets maps a state field name to the visit set used for the walk
// that starts at that field's CID. Entries are created only during the
// first-call initialisation, from the fields seen on that call.
// NOTE(review): a field name not seen on that first call would presumably
// look up a nil set — verify against the actor types processed.
fieldCidSets map[string]*cid.Set
}

func (sc *statCollector) collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) {
log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code))

nd, err := dag.Get(ctx, actor.Head)
Expand Down Expand Up @@ -903,6 +964,14 @@ func collectStats(ctx context.Context, addr address.Address, actor *types.Actor,
}
}

if sc.rootCidSet == nil {
sc.rootCidSet = cid.NewSet()
sc.fieldCidSets = make(map[string]*cid.Set)
for _, field := range fields {
sc.fieldCidSets[field.Name] = cid.NewSet()
}
}

actStats := actorStats{
Address: addr,
Actor: actor,
Expand All @@ -913,7 +982,7 @@ func collectStats(ctx context.Context, addr address.Address, actor *types.Actor,
walk: carWalkFunc,
}

if err := merkledag.Walk(ctx, dsc.walkLinks, actor.Head, cid.NewSet().Visit, merkledag.Concurrent()); err != nil {
if err := merkledag.Walk(ctx, dsc.walkLinks, actor.Head, sc.rootCidSet.Visit, merkledag.Concurrent()); err != nil {
return actorStats{}, err
}

Expand All @@ -925,7 +994,7 @@ func collectStats(ctx context.Context, addr address.Address, actor *types.Actor,
walk: carWalkFunc,
}

if err := merkledag.Walk(ctx, dsc.walkLinks, field.Cid, cid.NewSet().Visit, merkledag.Concurrent()); err != nil {
if err := merkledag.Walk(ctx, dsc.walkLinks, field.Cid, sc.fieldCidSets[field.Name].Visit, merkledag.Concurrent()); err != nil {
return actorStats{}, err
}

Expand Down

0 comments on commit 2ec2aa6

Please sign in to comment.