Skip to content

Commit

Permalink
sequencer: Improve visibility of long-running background tasks
Browse files Browse the repository at this point in the history
This change corrects a context-break, where callbacks executed by the key
scheduler were executed using a context from the scheduler's workgroup. In
cases where the lease loop recycles and the target database is executing
slowly, the old queries were not guaranteed to have been canceled when
the next iteration of the loop starts.

This commit depends on cockroachdb/field-eng-powertools#4
  • Loading branch information
bobvawter committed Aug 29, 2024
1 parent 4778a93 commit e7fd574
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 21 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/cockroachdb/apd v1.1.0
github.com/cockroachdb/crlfmt v0.3.0
github.com/cockroachdb/datadriven v1.0.2
github.com/cockroachdb/field-eng-powertools v0.1.1
github.com/cockroachdb/field-eng-powertools v0.0.0-20240829142217-c680a7021280
github.com/dop251/goja v0.0.0-20230919151941-fc55792775de
github.com/evanw/esbuild v0.23.1
github.com/go-mysql-org/go-mysql v1.8.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ github.com/cockroachdb/crlfmt v0.3.0 h1:IaPIlidTHn7s493ozBcpnkF/HNzCNe7aBJmUMqgI
github.com/cockroachdb/crlfmt v0.3.0/go.mod h1:iRZvVcb8vDQK4dh64mPRZBucM85Xd/VLTE0cmO95/Kk=
github.com/cockroachdb/datadriven v1.0.2 h1:H9MtNqVoVhvd9nCBwOyDjUEdZCREqbIdCJD93PBm/jA=
github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU=
github.com/cockroachdb/field-eng-powertools v0.0.0-20240828183038-0bbf63872b77 h1:7sffr3Hxd+2uNv1vAbdAgjyD/dXWxSNBBkyDGfE564w=
github.com/cockroachdb/field-eng-powertools v0.0.0-20240828183038-0bbf63872b77/go.mod h1:DXZPzzi9pGluoaYnp6Nn9So+XGGcvgry8juo6hlfRi8=
github.com/cockroachdb/field-eng-powertools v0.0.0-20240829142217-c680a7021280 h1:NU+dfZLij19FSZvr3E4MESM168MAvyHSz/UzBKbFc2o=
github.com/cockroachdb/field-eng-powertools v0.0.0-20240829142217-c680a7021280/go.mod h1:DXZPzzi9pGluoaYnp6Nn9So+XGGcvgry8juo6hlfRi8=
github.com/cockroachdb/field-eng-powertools v0.1.1 h1:xF8Rb8c9oD55W0BE3yIDWYmF1U2RcV3BJ+pzNEKVpLQ=
github.com/cockroachdb/field-eng-powertools v0.1.1/go.mod h1:qAoRYmqNSHeI44wOoW4LvXnzRySbB2ocTXpd22OQ5aA=
github.com/cockroachdb/gostdlib v1.19.0 h1:cSISxkVnTlWhTkyple/T6NXzOi5659FkhxvUgZv+Eb0=
Expand Down
13 changes: 11 additions & 2 deletions internal/sequencer/core/round.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"time"

"github.com/cockroachdb/field-eng-powertools/stopper"
"github.com/cockroachdb/replicator/internal/types"
"github.com/cockroachdb/replicator/internal/util/hlc"
"github.com/cockroachdb/replicator/internal/util/ident"
Expand Down Expand Up @@ -78,10 +79,10 @@ func (r *round) accumulate(segment *types.MultiBatch) error {

// scheduleCommit handles the error-retry logic around tryCommit.
func (r *round) scheduleCommit(
ctx context.Context, progressReport chan<- hlc.Range,
ctx *stopper.Context, progressReport chan<- hlc.Range,
) lockset.Outcome {
start := time.Now()
return r.Core.scheduler.Batch(r.batch, func() error {
work := func(ctx *stopper.Context) error {
// We want to close the reporting channel, unless we're asking
// to be retried.
finalReport := true
Expand Down Expand Up @@ -130,6 +131,14 @@ func (r *round) scheduleCommit(
// General error case: poison the keys.
r.poisoned.MarkPoisoned(r.batch)
return err
}

// We want to make this asynchronous work visible to any enclosing
// stoppers. This allows other components to be aware of and wait
// for background tasks like (relatively) long-running database
// queries to complete.
return r.Core.scheduler.Batch(r.batch, func() error {
return ctx.Call(work)
})
}

Expand Down
56 changes: 38 additions & 18 deletions internal/sequencer/sequtil/lease_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,41 +37,61 @@ func LeaseGroup(
group *types.TableGroup,
fn func(*stopper.Context, *types.TableGroup),
) {
entry := log.WithFields(log.Fields{
"enclosing": group.Enclosing,
"name": group.Name,
"tables": group.Tables,
})

// Start a goroutine in the outer context.
outer.Go(func(outer *stopper.Context) error {
// Run this in a loop in case of non-renewal. This is likely
// caused by database overload or any other case where we can't
// run SQL in a timely fashion.
for !outer.IsStopping() {
entry.Trace("waiting to acquire lease group")
// Acquire a lease.
leases.Singleton(outer, fmt.Sprintf("sequtil.Lease.%s", group.Name),
func(leaseContext context.Context) error {
log.Tracef("acquired gloabal lease for %s", group.Name)
defer log.Tracef("lost global lease for %s", group.Name)
entry.Debug("acquired lease group")
defer entry.Debug("released lease group")

for {
// Create a nested stopper whose lifetime is bound
// to that of the lease.
sub := stopper.WithContext(leaseContext)

// Create a nested stopper whose lifetime is bound
// to that of the lease.
sub := stopper.WithContext(leaseContext)
// Allow the stopper chain to track this task.
_ = sub.Call(func(ctx *stopper.Context) error {
fn(ctx, group)
return nil
})

// Execute the callback from a goroutine. Tear down
// the stopper once the main callback has exited.
sub.Go(func(sub *stopper.Context) error {
defer sub.Stop(time.Second)
fn(sub, group)
return nil
})
// Shut down the nested stopper.
entry.Debugf("stopping; waiting for %d tasks to complete", sub.Len())
sub.Stop(time.Minute)

select {
case <-sub.Stopping():
// Defer release until all work has stopped.
// This avoids spammy cancellation errors.
<-sub.Done()
return types.ErrCancelSingleton
case <-sub.Done():
// The lease has expired, we'll just exit.
return sub.Err()

// If the outer context is being shut down,
// release the lease.
if outer.IsStopping() {
entry.Trace("clean shutdown")
return types.ErrCancelSingleton
}

// If the lease was canceled, return.
if err := leaseContext.Err(); err != nil {
return err
}

// We still hold the lease.
entry.Debug("restarting")
}
})
return nil
}
return nil
})
Expand Down

0 comments on commit e7fd574

Please sign in to comment.