Skip to content

Commit

Permalink
sequencer: Improve visibility of long-running background tasks
Browse files Browse the repository at this point in the history
This change corrects a context-break, where callbacks executed by the key
scheduler were executed using a context from the scheduler's workgroup. In
cases where the lease loop recycles and the target database is executing
slowly, the old queries were not guaranteed to have been canceled when
the next iteration of the loop started.

Replicator will now exit if a stuck task is detected by the lease loop since
there is no other way to guarantee that the task would not continue at some
arbitrary point in the future.

This commit depends on cockroachdb/field-eng-powertools#4
  • Loading branch information
bobvawter committed Aug 29, 2024
1 parent 26514ef commit 644ad7c
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 28 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/cockroachdb/apd v1.1.0
github.com/cockroachdb/crlfmt v0.3.0
github.com/cockroachdb/datadriven v1.0.2
github.com/cockroachdb/field-eng-powertools v0.1.1
github.com/cockroachdb/field-eng-powertools v0.0.0-20240829142217-c680a7021280
github.com/dop251/goja v0.0.0-20230919151941-fc55792775de
github.com/evanw/esbuild v0.23.1
github.com/go-mysql-org/go-mysql v1.8.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ github.com/cockroachdb/crlfmt v0.3.0 h1:IaPIlidTHn7s493ozBcpnkF/HNzCNe7aBJmUMqgI
github.com/cockroachdb/crlfmt v0.3.0/go.mod h1:iRZvVcb8vDQK4dh64mPRZBucM85Xd/VLTE0cmO95/Kk=
github.com/cockroachdb/datadriven v1.0.2 h1:H9MtNqVoVhvd9nCBwOyDjUEdZCREqbIdCJD93PBm/jA=
github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU=
github.com/cockroachdb/field-eng-powertools v0.0.0-20240828183038-0bbf63872b77 h1:7sffr3Hxd+2uNv1vAbdAgjyD/dXWxSNBBkyDGfE564w=
github.com/cockroachdb/field-eng-powertools v0.0.0-20240828183038-0bbf63872b77/go.mod h1:DXZPzzi9pGluoaYnp6Nn9So+XGGcvgry8juo6hlfRi8=
github.com/cockroachdb/field-eng-powertools v0.0.0-20240829142217-c680a7021280 h1:NU+dfZLij19FSZvr3E4MESM168MAvyHSz/UzBKbFc2o=
github.com/cockroachdb/field-eng-powertools v0.0.0-20240829142217-c680a7021280/go.mod h1:DXZPzzi9pGluoaYnp6Nn9So+XGGcvgry8juo6hlfRi8=
github.com/cockroachdb/field-eng-powertools v0.1.1 h1:xF8Rb8c9oD55W0BE3yIDWYmF1U2RcV3BJ+pzNEKVpLQ=
github.com/cockroachdb/field-eng-powertools v0.1.1/go.mod h1:qAoRYmqNSHeI44wOoW4LvXnzRySbB2ocTXpd22OQ5aA=
github.com/cockroachdb/gostdlib v1.19.0 h1:cSISxkVnTlWhTkyple/T6NXzOi5659FkhxvUgZv+Eb0=
Expand Down
4 changes: 2 additions & 2 deletions internal/sequencer/besteffort/best_effort.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func (s *bestEffort) Start(
ctx *stopper.Context, opts *sequencer.StartOptions,
) (types.MultiAcceptor, *notify.Var[sequencer.Stat], error) {
stats := notify.VarOf(sequencer.NewStat(opts.Group, &ident.TableMap[hlc.Range]{}))

sequtil.LeaseGroup(ctx, s.leases, opts.Group, func(ctx *stopper.Context, group *types.TableGroup) {
grace := s.cfg.TaskGracePeriod
sequtil.LeaseGroup(ctx, s.leases, grace, opts.Group, func(ctx *stopper.Context, group *types.TableGroup) {
for _, table := range opts.Group.Tables {
table := table // Capture.

Expand Down
7 changes: 7 additions & 0 deletions internal/sequencer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const AssumeIdempotent = "assumeIdempotent"
const (
DefaultFlushPeriod = 1 * time.Second
DefaultFlushSize = 1_000
DefaultTaskGracePeriod = time.Minute
DefaultParallelism = 16
DefaultQuiescentPeriod = 10 * time.Second
DefaultRetireOffset = 24 * time.Hour
Expand All @@ -47,6 +48,7 @@ type Config struct {
QuiescentPeriod time.Duration // How often to sweep for queued mutations.
RetireOffset time.Duration // Delay removal of applied mutations.
ScanSize int // Limit on staging-table read queries.
TaskGracePeriod time.Duration // How long to allow previous iteration to clean up.
TimestampLimit int // The maximum number of timestamps to operate on.
}

Expand All @@ -66,6 +68,8 @@ func (c *Config) Bind(flags *pflag.FlagSet) {
"delay removal of applied mutations")
flags.IntVar(&c.ScanSize, "scanSize", DefaultScanSize,
"the number of rows to retrieve from staging")
flags.DurationVar(&c.TaskGracePeriod, "taskGracePeriod", DefaultTaskGracePeriod,
"how long to allow for task cleanup when recovering from errors")
flags.IntVar(&c.TimestampLimit, "timestampLimit", DefaultTimestampLimit,
"the maximum number of source timestamps to coalesce into a target transaction")
}
Expand All @@ -91,6 +95,9 @@ func (c *Config) Preflight() error {
if c.ScanSize == 0 {
c.ScanSize = DefaultScanSize
}
if c.TaskGracePeriod <= 0 {
c.TaskGracePeriod = DefaultTaskGracePeriod
}
if c.TimestampLimit <= 0 {
c.TimestampLimit = DefaultTimestampLimit
}
Expand Down
4 changes: 2 additions & 2 deletions internal/sequencer/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ func (s *Core) Start(
ctx *stopper.Context, opts *sequencer.StartOptions,
) (types.MultiAcceptor, *notify.Var[sequencer.Stat], error) {
progress := notify.VarOf(sequencer.NewStat(opts.Group, &ident.TableMap[hlc.Range]{}))

grace := s.cfg.TaskGracePeriod
// Acquire a lease on the group name to prevent multiple sweepers
// from operating.
sequtil.LeaseGroup(ctx, s.leases, opts.Group, func(ctx *stopper.Context, group *types.TableGroup) {
sequtil.LeaseGroup(ctx, s.leases, grace, opts.Group, func(ctx *stopper.Context, group *types.TableGroup) {
// Report which instance of Replicator is processing the tables within the group.
activeGauges := make([]prometheus.Gauge, len(group.Tables))
for idx, tbl := range group.Tables {
Expand Down
13 changes: 11 additions & 2 deletions internal/sequencer/core/round.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"time"

"github.com/cockroachdb/field-eng-powertools/stopper"
"github.com/cockroachdb/replicator/internal/types"
"github.com/cockroachdb/replicator/internal/util/hlc"
"github.com/cockroachdb/replicator/internal/util/ident"
Expand Down Expand Up @@ -78,10 +79,10 @@ func (r *round) accumulate(segment *types.MultiBatch) error {

// scheduleCommit handles the error-retry logic around tryCommit.
func (r *round) scheduleCommit(
ctx context.Context, progressReport chan<- hlc.Range,
ctx *stopper.Context, progressReport chan<- hlc.Range,
) lockset.Outcome {
start := time.Now()
return r.Core.scheduler.Batch(r.batch, func() error {
work := func(ctx *stopper.Context) error {
// We want to close the reporting channel, unless we're asking
// to be retried.
finalReport := true
Expand Down Expand Up @@ -130,6 +131,14 @@ func (r *round) scheduleCommit(
// General error case: poison the keys.
r.poisoned.MarkPoisoned(r.batch)
return err
}

// We want to make this asynchronous work visible to any enclosing
// stoppers. This allows other components to be aware of and wait
// for background tasks like (relatively) long-running database
// queries to complete.
return r.Core.scheduler.Batch(r.batch, func() error {
return ctx.Call(work)
})
}

Expand Down
79 changes: 58 additions & 21 deletions internal/sequencer/sequtil/lease_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package sequtil
import (
"context"
"fmt"
"runtime"
"time"

"github.com/cockroachdb/field-eng-powertools/stopper"
Expand All @@ -34,44 +35,80 @@ import (
func LeaseGroup(
outer *stopper.Context,
leases types.Leases,
gracePeriod time.Duration,
group *types.TableGroup,
fn func(*stopper.Context, *types.TableGroup),
) {
entry := log.WithFields(log.Fields{
"enclosing": group.Enclosing,
"name": group.Name,
"tables": group.Tables,
})

// Start a goroutine in the outer context.
outer.Go(func(outer *stopper.Context) error {
// Run this in a loop in case of non-renewal. This is likely
// caused by database overload or any other case where we can't
// run SQL in a timely fashion.
for !outer.IsStopping() {
entry.Trace("waiting to acquire lease group")
// Acquire a lease.
leases.Singleton(outer, fmt.Sprintf("sequtil.Lease.%s", group.Name),
func(leaseContext context.Context) error {
log.Tracef("acquired gloabal lease for %s", group.Name)
defer log.Tracef("lost global lease for %s", group.Name)
entry.Debug("acquired lease group")
defer entry.Debug("released lease group")

for {
// Create a nested stopper whose lifetime is bound
// to that of the lease.
sub := stopper.WithContext(leaseContext)

// Allow the stopper chain to track this task.
_ = sub.Call(func(ctx *stopper.Context) error {
fn(ctx, group)
return nil
})

// Shut down the nested stopper. The call to
// Stop() is non-blocking.
entry.Debugf("stopping; waiting for %d tasks to complete", sub.Len())
sub.Stop(gracePeriod)

select {
case <-sub.Done(): // All task goroutines have exited.
case <-time.After(gracePeriod):
// If we have a stuck task, there's not much
// we can do other than to kill the process.
// Any caller waiting for the parent stopper
// to become done would also be blocked on
// waiting for the stuck task. We'll include
// a complete stack dump to help with
// diagnostics.
buf := make([]byte, 1024*1024)
buf = buf[:runtime.Stack(buf, true)]
log.WithFields(log.Fields{
"stack": string(buf),
"truncated": len(buf) == cap(buf),
}).Fatalf("background task stuck after %s", gracePeriod)
}

// Create a nested stopper whose lifetime is bound
// to that of the lease.
sub := stopper.WithContext(leaseContext)
// If the outer context is being shut down,
// release the lease.
if outer.IsStopping() {
entry.Trace("clean shutdown")
return types.ErrCancelSingleton
}

// Execute the callback from a goroutine. Tear down
// the stopper once the main callback has exited.
sub.Go(func(sub *stopper.Context) error {
defer sub.Stop(time.Second)
fn(sub, group)
return nil
})
// If the lease was canceled, return.
if err := leaseContext.Err(); err != nil {
return err
}

select {
case <-sub.Stopping():
// Defer release until all work has stopped.
// This avoids spammy cancellation errors.
<-sub.Done()
return types.ErrCancelSingleton
case <-sub.Done():
// The lease has expired, we'll just exit.
return sub.Err()
// We still hold the lease.
entry.Debug("restarting")
}
})
return nil
}
return nil
})
Expand Down

0 comments on commit 644ad7c

Please sign in to comment.