Skip to content

Commit

Permalink
Merge pull request #1947 from josephschorr/small-rev-improvements
Browse files Browse the repository at this point in the history
Small optimized revision handling improvements
  • Loading branch information
vroldanbet authored Jun 18, 2024
2 parents b4b78c1 + 37c79f5 commit 0b73cf2
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 13 deletions.
8 changes: 3 additions & 5 deletions internal/datastore/proxy/singleflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ func NewSingleflightDatastoreProxy(d datastore.Datastore) datastore.Datastore {

type singleflightProxy struct {
headRevGroup singleflight.Group[string, datastore.Revision]
optRevGroup singleflight.Group[string, datastore.Revision]
checkRevGroup singleflight.Group[string, string]
statsGroup singleflight.Group[string, datastore.Stats]
delegate datastore.Datastore
Expand All @@ -34,10 +33,9 @@ func (p *singleflightProxy) ReadWriteTx(ctx context.Context, f datastore.TxUserF
}

func (p *singleflightProxy) OptimizedRevision(ctx context.Context) (datastore.Revision, error) {
rev, _, err := p.optRevGroup.Do(ctx, "", func(ctx context.Context) (datastore.Revision, error) {
return p.delegate.OptimizedRevision(ctx)
})
return rev, err
// NOTE: Optimized revisions are singleflighted by the underlying datastore via the
// CachedOptimizedRevisions struct.
return p.delegate.OptimizedRevision(ctx)
}

func (p *singleflightProxy) CheckRevision(ctx context.Context, revision datastore.Revision) error {
Expand Down
16 changes: 8 additions & 8 deletions internal/datastore/revisions/optimized.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,16 @@ func (cor *CachedOptimizedRevisions) OptimizedRevision(ctx context.Context) (dat
adjustedNow = localNow.Add(-1 * time.Duration(rand.Int63n(cor.maxRevisionStaleness.Nanoseconds())) * time.Nanosecond)
}

cor.Lock()
cor.RLock()
for _, candidate := range cor.candidates {
if candidate.validThrough.After(adjustedNow) {
cor.RUnlock()
log.Ctx(ctx).Debug().Time("now", localNow).Time("valid", candidate.validThrough).Msg("returning cached revision")
span.AddEvent("returning cached revision")
cor.Unlock()
return candidate.revision, nil
}
}
cor.Unlock()
cor.RUnlock()

newQuantizedRevision, err, _ := cor.updateGroup.Do("", func() (interface{}, error) {
log.Ctx(ctx).Debug().Time("now", localNow).Msg("computing new revision")
Expand All @@ -72,10 +72,9 @@ func (cor *CachedOptimizedRevisions) OptimizedRevision(ctx context.Context) (dat
}

rvt := localNow.Add(validFor)
cor.Lock()
defer cor.Unlock()

// Prune the candidates that have definitely expired
cor.Lock()
var numToDrop uint
for _, candidate := range cor.candidates {
if candidate.validThrough.Add(cor.maxRevisionStaleness).Before(localNow) {
Expand All @@ -84,11 +83,12 @@ func (cor *CachedOptimizedRevisions) OptimizedRevision(ctx context.Context) (dat
break
}
}
cor.candidates = cor.candidates[numToDrop:]

cor.candidates = cor.candidates[numToDrop:]
cor.candidates = append(cor.candidates, validRevision{optimized, rvt})
log.Ctx(ctx).Debug().Time("now", localNow).Time("valid", rvt).Stringer("validFor", validFor).Msg("setting valid through")
cor.Unlock()

log.Ctx(ctx).Debug().Time("now", localNow).Time("valid", rvt).Stringer("validFor", validFor).Msg("setting valid through")
return optimized, nil
})
if err != nil {
Expand All @@ -99,7 +99,7 @@ func (cor *CachedOptimizedRevisions) OptimizedRevision(ctx context.Context) (dat

// CachedOptimizedRevisions does caching and deduplication for requests for optimized revisions.
type CachedOptimizedRevisions struct {
sync.Mutex
sync.RWMutex

maxRevisionStaleness time.Duration
optimizedFunc OptimizedRevisionFunction
Expand Down

0 comments on commit 0b73cf2

Please sign in to comment.