Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
135192: ttljob: adjust logs; reset progress when resuming; add time-based limiter for progress updates r=rafiss a=rafiss

### ttljob: adjust fractionCompleted logs

This makes the logs added in 3cbe0b4 slightly more useful by adjusting the variable names and showing the running count of the number of spans processed.

### ttljob: reset progress when resuming job

The job starts from the beginning when it is paused and resumed, so we need
to reset the processed span count and the fraction completed to 0.

### ttljob: add time-based limiting for updating progress

There was a risk of a thundering herd of processors all updating progress in rapid
succession (which may happen if they all process spans that don't have any
rows to delete). This patch adds a check so that each processor updates
progress at most once every 60 seconds (plus up to 10 seconds of random jitter).

informs: cockroachdb#86884
Release note: None

Co-authored-by: Rafi Shamim <[email protected]>
  • Loading branch information
craig[bot] and rafiss committed Nov 15, 2024
2 parents d72000b + 3512665 commit b5d75ef
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
4 changes: 4 additions & 0 deletions pkg/sql/ttl/ttljob/ttljob.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) (re
progress := md.Progress
rowLevelTTL := progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL
rowLevelTTL.JobTotalSpanCount = int64(jobSpanCount)
rowLevelTTL.JobProcessedSpanCount = 0
progress.Progress = &jobspb.Progress_FractionCompleted{
FractionCompleted: 0,
}
ju.UpdateProgress(progress)
return nil
},
Expand Down
22 changes: 15 additions & 7 deletions pkg/sql/ttl/ttljob/ttljob_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"bytes"
"context"
"math"
"math/rand"
"runtime"
"sync/atomic"
"time"
Expand Down Expand Up @@ -169,14 +170,18 @@ func (t *ttlProcessor) work(ctx context.Context) error {
var spansProccessedSinceLastUpdate atomic.Int64
var rowsProccessedSinceLastUpdate atomic.Int64

// Update progress for approximately every 1% of spans processed.
// Update progress for approximately every 1% of spans processed, at least
// 60 seconds apart with jitter.
updateEvery := max(1, processorSpanCount/100)
updateEveryDuration := 60*time.Second + time.Duration(rand.Int63n(10*1000))*time.Millisecond
lastUpdated := timeutil.Now()
updateFractionCompleted := func() error {
jobID := ttlSpec.JobID
lastUpdated = timeutil.Now()
spansToAdd := spansProccessedSinceLastUpdate.Swap(0)
rowsToAdd := rowsProccessedSinceLastUpdate.Swap(0)

var jobRowCount, jobSpanCount int64
var deletedRowCount, processedSpanCount, totalSpanCount int64
var fractionCompleted float32

err := jobRegistry.UpdateJobWithTxn(
Expand All @@ -188,8 +193,9 @@ func (t *ttlProcessor) work(ctx context.Context) error {
rowLevelTTL := progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL
rowLevelTTL.JobProcessedSpanCount += spansToAdd
rowLevelTTL.JobDeletedRowCount += rowsToAdd
jobRowCount = rowLevelTTL.JobDeletedRowCount
jobSpanCount = rowLevelTTL.JobTotalSpanCount
deletedRowCount = rowLevelTTL.JobDeletedRowCount
processedSpanCount = rowLevelTTL.JobProcessedSpanCount
totalSpanCount = rowLevelTTL.JobTotalSpanCount

fractionCompleted = float32(rowLevelTTL.JobProcessedSpanCount) / float32(rowLevelTTL.JobTotalSpanCount)
progress.Progress = &jobspb.Progress_FractionCompleted{
Expand All @@ -206,8 +212,8 @@ func (t *ttlProcessor) work(ctx context.Context) error {
processorID := t.ProcessorID
log.Infof(
ctx,
"TTL fractionCompleted updated processorID=%d tableID=%d jobRowCount=%d jobSpanCount=%d fractionCompleted=%.3f",
processorID, tableID, jobRowCount, jobSpanCount, fractionCompleted,
"TTL fractionCompleted updated processorID=%d tableID=%d deletedRowCount=%d processedSpanCount=%d totalSpanCount=%d fractionCompleted=%.3f",
processorID, tableID, deletedRowCount, processedSpanCount, totalSpanCount, fractionCompleted,
)
return nil
}
Expand Down Expand Up @@ -291,7 +297,9 @@ func (t *ttlProcessor) work(ctx context.Context) error {
// count.
spansProccessedSinceLastUpdate.Add(1)
}
if spansProccessedSinceLastUpdate.Load() >= updateEvery {

if spansProccessedSinceLastUpdate.Load() >= updateEvery &&
timeutil.Since(lastUpdated) >= updateEveryDuration {
if err := updateFractionCompleted(); err != nil {
return err
}
Expand Down

0 comments on commit b5d75ef

Please sign in to comment.