Skip to content

Commit

Permalink
[3.1.12] Restore reverted stats (#7353)
Browse files Browse the repository at this point in the history
  • Loading branch information
bbrks authored Feb 5, 2025
1 parent 6e045be commit afacff6
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 19 deletions.
24 changes: 19 additions & 5 deletions base/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ type CacheStats struct {
NonMobileIgnoredCount *SgwIntStat `json:"non_mobile_ignored_count"`
// The total number of active channels.
NumActiveChannels *SgwIntStat `json:"num_active_channels"`
// The total number of skipped sequences.
// The total number of skipped sequences. This is a cumulative value.
NumSkippedSeqs *SgwIntStat `json:"num_skipped_seqs"`
// The total number of pending sequences. These are out-of-sequence entries waiting to be cached.
PendingSeqLen *SgwIntStat `json:"pending_seq_len"`
Expand All @@ -393,8 +393,12 @@ type CacheStats struct {
RevisionCacheHits *SgwIntStat `json:"rev_cache_hits"`
// The total number of revision cache misses.
RevisionCacheMisses *SgwIntStat `json:"rev_cache_misses"`
// The current length of the pending skipped sequence queue.
SkippedSeqLen *SgwIntStat `json:"skipped_seq_len"`
// Deprecated: The current length of the pending skipped sequence queue. This is equivalent to NumCurrentSeqsSkipped.
DeprecatedSkippedSeqLen *SgwIntStat `json:"skipped_seq_len"`
// Deprecated: DeprecatedSkippedSeqCap UNUSED
DeprecatedSkippedSeqCap *SgwIntStat `json:"skipped_seq_cap"`
// NumCurrentSeqsSkipped is the number of items in the pending skipped sequence queue. This used to be DeprecatedSkippedSeqLen.
NumCurrentSeqsSkipped *SgwIntStat `json:"current_skipped_seq_count"`
// The total view_queries.
ViewQueries *SgwIntStat `json:"view_queries"`
}
Expand Down Expand Up @@ -1144,7 +1148,15 @@ func (d *DbStats) initCacheStats() error {
if err != nil {
return err
}
resUtil.SkippedSeqLen, err = NewIntStat(SubsystemCacheKey, "skipped_seq_len", labelKeys, labelVals, prometheus.GaugeValue, 0)
resUtil.DeprecatedSkippedSeqLen, err = NewIntStat(SubsystemCacheKey, "skipped_seq_len", labelKeys, labelVals, prometheus.GaugeValue, 0)
if err != nil {
return err
}
resUtil.DeprecatedSkippedSeqCap, err = NewIntStat(SubsystemCacheKey, "skipped_seq_cap", labelKeys, labelVals, prometheus.GaugeValue, 0)
if err != nil {
return err
}
resUtil.NumCurrentSeqsSkipped, err = NewIntStat(SubsystemCacheKey, "current_skipped_seq_count", labelKeys, labelVals, prometheus.CounterValue, 0)
if err != nil {
return err
}
Expand Down Expand Up @@ -1178,12 +1190,14 @@ func (d *DbStats) unregisterCacheStats() {
prometheus.Unregister(d.CacheStats.NonMobileIgnoredCount)
prometheus.Unregister(d.CacheStats.NumActiveChannels)
prometheus.Unregister(d.CacheStats.NumSkippedSeqs)
prometheus.Unregister(d.CacheStats.NumCurrentSeqsSkipped)
prometheus.Unregister(d.CacheStats.PendingSeqLen)
prometheus.Unregister(d.CacheStats.RevisionCacheBypass)
prometheus.Unregister(d.CacheStats.RevisionCacheHits)
prometheus.Unregister(d.CacheStats.RevisionCacheMisses)
prometheus.Unregister(d.CacheStats.SkippedSeqLen)
prometheus.Unregister(d.CacheStats.ViewQueries)
prometheus.Unregister(d.CacheStats.DeprecatedSkippedSeqLen) //nolint
prometheus.Unregister(d.CacheStats.DeprecatedSkippedSeqCap) //nolint
}

func (d *DbStats) Cache() *CacheStats {
Expand Down
10 changes: 6 additions & 4 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ func (c *changeCache) CleanSkippedSequenceQueue(ctx context.Context) error {
c.db.DbStats.Cache().AbandonedSeqs.Add(numRemoved)

base.InfofCtx(ctx, base.KeyCache, "CleanSkippedSequenceQueue complete. Not Found:%d for database %s.", len(oldSkippedSequences), base.MD(c.db.Name))
oldSkippedSequences = nil
return nil
}

Expand Down Expand Up @@ -880,14 +879,16 @@ func (h *LogPriorityQueue) Pop() interface{} {

func (c *changeCache) RemoveSkipped(x uint64) error {
err := c.skippedSeqs.Remove(x)
c.db.DbStats.Cache().SkippedSeqLen.Set(int64(c.skippedSeqs.skippedList.Len()))
c.db.DbStats.Cache().NumCurrentSeqsSkipped.Set(int64(c.skippedSeqs.skippedList.Len()))
c.db.DbStats.Cache().DeprecatedSkippedSeqLen.Set(int64(c.skippedSeqs.skippedList.Len())) //nolint
return err
}

// Removes a set of sequences. Logs warning on removal error, returns count of successfully removed.
func (c *changeCache) RemoveSkippedSequences(ctx context.Context, sequences []uint64) (removedCount int64) {
numRemoved := c.skippedSeqs.RemoveSequences(ctx, sequences)
c.db.DbStats.Cache().SkippedSeqLen.Set(int64(c.skippedSeqs.skippedList.Len()))
c.db.DbStats.Cache().NumCurrentSeqsSkipped.Set(int64(c.skippedSeqs.skippedList.Len()))
c.db.DbStats.Cache().DeprecatedSkippedSeqLen.Set(int64(c.skippedSeqs.skippedList.Len())) //nolint //nolint
return numRemoved
}

Expand All @@ -901,7 +902,8 @@ func (c *changeCache) PushSkipped(ctx context.Context, sequence uint64) {
base.InfofCtx(ctx, base.KeyCache, "Error pushing skipped sequence: %d, %v", sequence, err)
return
}
c.db.DbStats.Cache().SkippedSeqLen.Set(int64(c.skippedSeqs.skippedList.Len()))
c.db.DbStats.Cache().NumCurrentSeqsSkipped.Set(int64(c.skippedSeqs.skippedList.Len()))
c.db.DbStats.Cache().DeprecatedSkippedSeqLen.Set(int64(c.skippedSeqs.skippedList.Len())) //nolint
}

func (c *changeCache) GetSkippedSequencesOlderThanMaxWait() (oldSequences []uint64) {
Expand Down
10 changes: 7 additions & 3 deletions db/change_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1325,7 +1325,7 @@ func TestStopChangeCache(t *testing.T) {
WriteDirect(t, db, []string{"ABC"}, 3)

// Artificially add 3 skipped, and back date skipped entry by 2 hours to trigger attempted view retrieval during Clean call
timeAdded := time.Now().Add(time.Duration(time.Hour * -2))
timeAdded := time.Now().Add(time.Hour * -2)
err := db.changeCache.skippedSeqs.Push(&SkippedSequence{3, timeAdded.Unix()})
require.NoError(t, err)

Expand Down Expand Up @@ -1395,16 +1395,20 @@ func TestSkippedSequenceCompaction(t *testing.T) {

// assert that length is 1
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, int64(1), dbContext.DbStats.Cache().SkippedSeqLen.Value())
assert.Equal(c, int64(1), dbContext.DbStats.Cache().NumCurrentSeqsSkipped.Value())
assert.Equal(c, int64(1), dbContext.DbStats.Cache().DeprecatedSkippedSeqLen.Value()) //nolint
assert.Equal(c, int64(0), dbContext.DbStats.Cache().DeprecatedSkippedSeqCap.Value()) //nolint
}, time.Second*10, time.Millisecond*100)

// run clean skipped sequence task
require.NoError(t, testChangeCache.CleanSkippedSequenceQueue(ctx))

// assert that the above skipped sequence (4) is remains in list, abandoned sequences should still be 1
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, int64(1), dbContext.DbStats.Cache().SkippedSeqLen.Value())
assert.Equal(c, int64(1), dbContext.DbStats.Cache().NumCurrentSeqsSkipped.Value())
assert.Equal(c, int64(1), dbContext.DbStats.Cache().DeprecatedSkippedSeqLen.Value()) //nolint
assert.Equal(c, int64(1), dbContext.DbStats.Cache().AbandonedSeqs.Value())
assert.Equal(c, int64(0), dbContext.DbStats.Cache().DeprecatedSkippedSeqCap.Value()) //nolint
}, time.Second*10, time.Millisecond*100)

// assert that sequence 4 still exists on list
Expand Down
14 changes: 7 additions & 7 deletions rest/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,16 +317,16 @@ func TestJumpInSequencesAtAllocatorSkippedSequenceFill(t *testing.T) {
// wait for value to move from pending to cache and skipped list to fill
require.EventuallyWithT(t, func(c *assert.CollectT) {
rt.GetDatabase().UpdateCalculatedStats(ctx)
assert.Equal(c, int64(18), rt.GetDatabase().DbStats.CacheStats.SkippedSeqLen.Value())
assert.Equal(c, int64(18), rt.GetDatabase().DbStats.CacheStats.NumCurrentSeqsSkipped.Value())
assert.Equal(c, int64(18), rt.GetDatabase().DbStats.CacheStats.DeprecatedSkippedSeqLen.Value()) //nolint
}, time.Second*10, time.Millisecond*100)
docVrs := rt.UpdateDoc("doc", vrs.Rev, `{"prob": "lol"}`)
// wait skipped list to be emptied by release of sequence range
require.EventuallyWithT(t, func(c *assert.CollectT) {
rt.GetDatabase().UpdateCalculatedStats(ctx)
assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.PendingSeqLen.Value())
//assert.Equal(c, int64(1), rt.GetDatabase().DbStats.CacheStats.SkippedSeqCap.Value())
//assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.NumCurrentSeqsSkipped.Value())
assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.SkippedSeqLen.Value())
assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.NumCurrentSeqsSkipped.Value())
assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.DeprecatedSkippedSeqLen.Value()) //nolint
}, time.Second*10, time.Millisecond*100)
doc1Vrs := rt.PutDoc("doc1", `{"prop":true}`)
changes, err := rt.WaitForChanges(2, "/{{.keyspace}}/_changes", "", true)
Expand Down Expand Up @@ -381,9 +381,9 @@ func TestJumpInSequencesAtAllocatorRangeInPending(t *testing.T) {
// assert that nothing has been pushed to skipped
require.EventuallyWithT(t, func(c *assert.CollectT) {
rt.GetDatabase().UpdateCalculatedStats(ctx)
//assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.SkippedSeqCap.Value())
//assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.NumCurrentSeqsSkipped.Value())
assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.SkippedSeqLen.Value())
assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.NumCurrentSeqsSkipped.Value())
assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.DeprecatedSkippedSeqCap.Value()) // nolint
assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.DeprecatedSkippedSeqLen.Value()) // nolint
}, time.Second*10, time.Millisecond*100)
doc1Vrs := rt.PutDoc("doc1", `{"prop":true}`)
changes, err := rt.WaitForChanges(2, "/{{.keyspace}}/_changes", "", true)
Expand Down

0 comments on commit afacff6

Please sign in to comment.