From afacff645aa5e304e1969b6c17a226618e59dfc1 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Wed, 5 Feb 2025 16:08:21 +0000 Subject: [PATCH] [3.1.12] Restore reverted stats (#7353) --- base/stats.go | 24 +++++++++++++++++++----- db/change_cache.go | 10 ++++++---- db/change_cache_test.go | 10 +++++++--- rest/changes_test.go | 14 +++++++------- 4 files changed, 39 insertions(+), 19 deletions(-) diff --git a/base/stats.go b/base/stats.go index b3cf558f11..76798eb689 100644 --- a/base/stats.go +++ b/base/stats.go @@ -383,7 +383,7 @@ type CacheStats struct { NonMobileIgnoredCount *SgwIntStat `json:"non_mobile_ignored_count"` // The total number of active channels. NumActiveChannels *SgwIntStat `json:"num_active_channels"` - // The total number of skipped sequences. + // The total number of skipped sequences. This is a cumulative value. NumSkippedSeqs *SgwIntStat `json:"num_skipped_seqs"` // The total number of pending sequences. These are out-of-sequence entries waiting to be cached. PendingSeqLen *SgwIntStat `json:"pending_seq_len"` @@ -393,8 +393,12 @@ type CacheStats struct { RevisionCacheHits *SgwIntStat `json:"rev_cache_hits"` // The total number of revision cache misses. RevisionCacheMisses *SgwIntStat `json:"rev_cache_misses"` - // The current length of the pending skipped sequence queue. - SkippedSeqLen *SgwIntStat `json:"skipped_seq_len"` + // Deprecated: The current length of the pending skipped sequence queue. This is equivalent to NumCurrentSeqsSkipped. + DeprecatedSkippedSeqLen *SgwIntStat `json:"skipped_seq_len"` + // Deprecated: DeprecatedSkippedSeqCap UNUSED + DeprecatedSkippedSeqCap *SgwIntStat `json:"skipped_seq_cap"` + // NumCurrentSeqsSkipped is the number of items in the pending skipped sequence queue. This used to be DeprecatedSkippedSeqLen. + NumCurrentSeqsSkipped *SgwIntStat `json:"current_skipped_seq_count"` // The total view_queries. ViewQueries *SgwIntStat `json:"view_queries"` } @@ -1144,7 +1148,15 @@ func (d *DbStats) initCacheStats() error { if err != nil { return err } - resUtil.SkippedSeqLen, err = NewIntStat(SubsystemCacheKey, "skipped_seq_len", labelKeys, labelVals, prometheus.GaugeValue, 0) + resUtil.DeprecatedSkippedSeqLen, err = NewIntStat(SubsystemCacheKey, "skipped_seq_len", labelKeys, labelVals, prometheus.GaugeValue, 0) + if err != nil { + return err + } + resUtil.DeprecatedSkippedSeqCap, err = NewIntStat(SubsystemCacheKey, "skipped_seq_cap", labelKeys, labelVals, prometheus.GaugeValue, 0) + if err != nil { + return err + } + resUtil.NumCurrentSeqsSkipped, err = NewIntStat(SubsystemCacheKey, "current_skipped_seq_count", labelKeys, labelVals, prometheus.CounterValue, 0) if err != nil { return err } @@ -1178,12 +1190,14 @@ func (d *DbStats) unregisterCacheStats() { prometheus.Unregister(d.CacheStats.NonMobileIgnoredCount) prometheus.Unregister(d.CacheStats.NumActiveChannels) prometheus.Unregister(d.CacheStats.NumSkippedSeqs) + prometheus.Unregister(d.CacheStats.NumCurrentSeqsSkipped) prometheus.Unregister(d.CacheStats.PendingSeqLen) prometheus.Unregister(d.CacheStats.RevisionCacheBypass) prometheus.Unregister(d.CacheStats.RevisionCacheHits) prometheus.Unregister(d.CacheStats.RevisionCacheMisses) - prometheus.Unregister(d.CacheStats.SkippedSeqLen) prometheus.Unregister(d.CacheStats.ViewQueries) + prometheus.Unregister(d.CacheStats.DeprecatedSkippedSeqLen) //nolint + prometheus.Unregister(d.CacheStats.DeprecatedSkippedSeqCap) //nolint } func (d *DbStats) Cache() *CacheStats { diff --git a/db/change_cache.go b/db/change_cache.go index e2bfc08732..9aa2b91de2 100644 --- a/db/change_cache.go +++ b/db/change_cache.go @@ -317,7 +317,6 @@ func (c *changeCache) CleanSkippedSequenceQueue(ctx context.Context) error { c.db.DbStats.Cache().AbandonedSeqs.Add(numRemoved) base.InfofCtx(ctx, base.KeyCache, "CleanSkippedSequenceQueue complete. Not Found:%d for database %s.", len(oldSkippedSequences), base.MD(c.db.Name)) - oldSkippedSequences = nil return nil } @@ -880,14 +879,16 @@ func (h *LogPriorityQueue) Pop() interface{} { func (c *changeCache) RemoveSkipped(x uint64) error { err := c.skippedSeqs.Remove(x) - c.db.DbStats.Cache().SkippedSeqLen.Set(int64(c.skippedSeqs.skippedList.Len())) + c.db.DbStats.Cache().NumCurrentSeqsSkipped.Set(int64(c.skippedSeqs.skippedList.Len())) + c.db.DbStats.Cache().DeprecatedSkippedSeqLen.Set(int64(c.skippedSeqs.skippedList.Len())) //nolint return err } // Removes a set of sequences. Logs warning on removal error, returns count of successfully removed. func (c *changeCache) RemoveSkippedSequences(ctx context.Context, sequences []uint64) (removedCount int64) { numRemoved := c.skippedSeqs.RemoveSequences(ctx, sequences) - c.db.DbStats.Cache().SkippedSeqLen.Set(int64(c.skippedSeqs.skippedList.Len())) + c.db.DbStats.Cache().NumCurrentSeqsSkipped.Set(int64(c.skippedSeqs.skippedList.Len())) + c.db.DbStats.Cache().DeprecatedSkippedSeqLen.Set(int64(c.skippedSeqs.skippedList.Len())) //nolint //nolint return numRemoved } @@ -901,7 +902,8 @@ func (c *changeCache) PushSkipped(ctx context.Context, sequence uint64) { base.InfofCtx(ctx, base.KeyCache, "Error pushing skipped sequence: %d, %v", sequence, err) return } - c.db.DbStats.Cache().SkippedSeqLen.Set(int64(c.skippedSeqs.skippedList.Len())) + c.db.DbStats.Cache().NumCurrentSeqsSkipped.Set(int64(c.skippedSeqs.skippedList.Len())) + c.db.DbStats.Cache().DeprecatedSkippedSeqLen.Set(int64(c.skippedSeqs.skippedList.Len())) //nolint } func (c *changeCache) GetSkippedSequencesOlderThanMaxWait() (oldSequences []uint64) { diff --git a/db/change_cache_test.go b/db/change_cache_test.go index 3c86a964fb..43de5e0b47 100644 --- a/db/change_cache_test.go +++ b/db/change_cache_test.go @@ -1325,7 +1325,7 @@ func TestStopChangeCache(t *testing.T) { WriteDirect(t, db, []string{"ABC"}, 3) // Artificially add 3 skipped, and back date skipped entry by 2 hours to trigger attempted view retrieval during Clean call - timeAdded := time.Now().Add(time.Duration(time.Hour * -2)) + timeAdded := time.Now().Add(time.Hour * -2) err := db.changeCache.skippedSeqs.Push(&SkippedSequence{3, timeAdded.Unix()}) require.NoError(t, err) @@ -1395,7 +1395,9 @@ func TestSkippedSequenceCompaction(t *testing.T) { // assert that length is 1 require.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Equal(c, int64(1), dbContext.DbStats.Cache().SkippedSeqLen.Value()) + assert.Equal(c, int64(1), dbContext.DbStats.Cache().NumCurrentSeqsSkipped.Value()) + assert.Equal(c, int64(1), dbContext.DbStats.Cache().DeprecatedSkippedSeqLen.Value()) //nolint + assert.Equal(c, int64(0), dbContext.DbStats.Cache().DeprecatedSkippedSeqCap.Value()) //nolint }, time.Second*10, time.Millisecond*100) // run clean skipped sequence task @@ -1403,8 +1405,10 @@ func TestSkippedSequenceCompaction(t *testing.T) { // assert that the above skipped sequence (4) is remains in list, abandoned sequences should still be 1 require.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Equal(c, int64(1), dbContext.DbStats.Cache().SkippedSeqLen.Value()) + assert.Equal(c, int64(1), dbContext.DbStats.Cache().NumCurrentSeqsSkipped.Value()) + assert.Equal(c, int64(1), dbContext.DbStats.Cache().DeprecatedSkippedSeqLen.Value()) //nolint assert.Equal(c, int64(1), dbContext.DbStats.Cache().AbandonedSeqs.Value()) + assert.Equal(c, int64(0), dbContext.DbStats.Cache().DeprecatedSkippedSeqCap.Value()) //nolint }, time.Second*10, time.Millisecond*100) // assert that sequence 4 still exists on list diff --git a/rest/changes_test.go b/rest/changes_test.go index d5cb214dc6..9bc5868bfa 100644 --- a/rest/changes_test.go +++ b/rest/changes_test.go @@ -317,16 +317,16 @@ func TestJumpInSequencesAtAllocatorSkippedSequenceFill(t *testing.T) { // wait for value to move from pending to cache and skipped list to fill require.EventuallyWithT(t, func(c *assert.CollectT) { rt.GetDatabase().UpdateCalculatedStats(ctx) - assert.Equal(c, int64(18), rt.GetDatabase().DbStats.CacheStats.SkippedSeqLen.Value()) + assert.Equal(c, int64(18), rt.GetDatabase().DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) + assert.Equal(c, int64(18), rt.GetDatabase().DbStats.CacheStats.DeprecatedSkippedSeqLen.Value()) //nolint }, time.Second*10, time.Millisecond*100) docVrs := rt.UpdateDoc("doc", vrs.Rev, `{"prob": "lol"}`) // wait skipped list to be emptied by release of sequence range require.EventuallyWithT(t, func(c *assert.CollectT) { rt.GetDatabase().UpdateCalculatedStats(ctx) assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.PendingSeqLen.Value()) - //assert.Equal(c, int64(1), rt.GetDatabase().DbStats.CacheStats.SkippedSeqCap.Value()) - //assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) - assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.SkippedSeqLen.Value()) + assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) + assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.DeprecatedSkippedSeqLen.Value()) //nolint }, time.Second*10, time.Millisecond*100) doc1Vrs := rt.PutDoc("doc1", `{"prop":true}`) changes, err := rt.WaitForChanges(2, "/{{.keyspace}}/_changes", "", true) @@ -381,9 +381,9 @@ func TestJumpInSequencesAtAllocatorRangeInPending(t *testing.T) { // assert that nothing has been pushed to skipped require.EventuallyWithT(t, func(c *assert.CollectT) { rt.GetDatabase().UpdateCalculatedStats(ctx) - //assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.SkippedSeqCap.Value()) - //assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) - assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.SkippedSeqLen.Value()) + assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.NumCurrentSeqsSkipped.Value()) + assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.DeprecatedSkippedSeqCap.Value()) // nolint + assert.Equal(c, int64(0), rt.GetDatabase().DbStats.CacheStats.DeprecatedSkippedSeqLen.Value()) // nolint }, time.Second*10, time.Millisecond*100) doc1Vrs := rt.PutDoc("doc1", `{"prop":true}`) changes, err := rt.WaitForChanges(2, "/{{.keyspace}}/_changes", "", true)