Skip to content

Commit

Permalink
Fix flakes in ./topdown/cache
Browse files Browse the repository at this point in the history
Per discussion in #7188, do *not* adjust the timing of cache scans to be more frequent; we want to wait at least staleEntryEvictionTimePeriodSeconds between cache scans, even if we hold the lock for a substantial period of time.

Signed-off-by: Evan Anderson <[email protected]>
  • Loading branch information
evankanderson authored and ashutosh-narkar committed Dec 4, 2024
1 parent bb4273a commit 84b554c
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 11 deletions.
5 changes: 4 additions & 1 deletion topdown/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,14 @@ func NewInterQueryCache(config *Config) InterQueryCache {
func NewInterQueryCacheWithContext(ctx context.Context, config *Config) InterQueryCache {
iqCache := newCache(config)
if iqCache.staleEntryEvictionTimePeriodSeconds() > 0 {
cleanupTicker := time.NewTicker(time.Duration(iqCache.staleEntryEvictionTimePeriodSeconds()) * time.Second)
go func() {
cleanupTicker := time.NewTicker(time.Duration(iqCache.staleEntryEvictionTimePeriodSeconds()) * time.Second)
for {
select {
case <-cleanupTicker.C:
// NOTE: We stop the ticker and create a new one here to ensure that applications
// get _at least_ staleEntryEvictionTimePeriodSeconds with the cache unlocked;
// see https://github.com/open-policy-agent/opa/pull/7188/files#r1855342998
cleanupTicker.Stop()
iqCache.cleanStaleValues()
cleanupTicker = time.NewTicker(time.Duration(iqCache.staleEntryEvictionTimePeriodSeconds()) * time.Second)
Expand Down
32 changes: 22 additions & 10 deletions topdown/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,14 +408,17 @@ func TestInsertWithExpiryAndEviction(t *testing.T) {
t.Fatalf("Unexpected error %v", err)
}

cache := NewInterQueryCacheWithContext(context.Background(), config)
// This starts a background ticker at stale_entry_eviction_period_seconds to clean up items.
ctx, cancel := context.WithCancel(context.Background())
cache := NewInterQueryCacheWithContext(ctx, config)
t.Cleanup(cancel)

cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20)
cache.InsertWithExpiry(ast.StringTerm("force_evicted_foo").Value, cacheValue, time.Now().Add(100*time.Second))
if fetchedCacheValue, found := cache.Get(ast.StringTerm("force_evicted_foo").Value); !found {
t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue)
}
cache.InsertWithExpiry(ast.StringTerm("expired_foo").Value, cacheValue, time.Now().Add(1*time.Second))
cache.InsertWithExpiry(ast.StringTerm("expired_foo").Value, cacheValue, time.Now().Add(900*time.Millisecond))
if fetchedCacheValue, found := cache.Get(ast.StringTerm("expired_foo").Value); !found {
t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue)
}
Expand All @@ -425,7 +428,7 @@ func TestInsertWithExpiryAndEviction(t *testing.T) {
}

// Ensure stale entries clean up routine runs at least once
time.Sleep(2 * time.Second)
time.Sleep(1100 * time.Millisecond)

// Entry deleted even though not expired because force evicted when foo is inserted
if fetchedCacheValue, found := cache.Get(ast.StringTerm("force_evicted_foo").Value); found {
Expand Down Expand Up @@ -454,20 +457,23 @@ func TestInsertHighTTLWithStaleEntryCleanup(t *testing.T) {
t.Fatalf("Unexpected error %v", err)
}

cache := NewInterQueryCacheWithContext(context.Background(), config)
// This starts a background ticker at stale_entry_eviction_period_seconds to clean up items.
ctx, cancel := context.WithCancel(context.Background())
cache := NewInterQueryCacheWithContext(ctx, config)
t.Cleanup(cancel)

cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20)
cache.InsertWithExpiry(ast.StringTerm("high_ttl_foo").Value, cacheValue, time.Now().Add(100*time.Second))
if fetchedCacheValue, found := cache.Get(ast.StringTerm("high_ttl_foo").Value); !found {
t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue)
}
cache.InsertWithExpiry(ast.StringTerm("expired_foo").Value, cacheValue, time.Now().Add(1*time.Second))
cache.InsertWithExpiry(ast.StringTerm("expired_foo").Value, cacheValue, time.Now().Add(900*time.Millisecond))
if fetchedCacheValue, found := cache.Get(ast.StringTerm("expired_foo").Value); !found {
t.Fatalf("Expected cache entry with value %v, found no entry", fetchedCacheValue)
}

// Ensure stale entries clean up routine runs at least once
time.Sleep(2 * time.Second)
time.Sleep(1100 * time.Millisecond)

cache.InsertWithExpiry(ast.StringTerm("foo").Value, cacheValue, time.Now().Add(10*time.Second))
if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found {
Expand Down Expand Up @@ -497,7 +503,10 @@ func TestInsertHighTTLWithoutStaleEntryCleanup(t *testing.T) {
t.Fatalf("Unexpected error %v", err)
}

cache := NewInterQueryCacheWithContext(context.Background(), config)
// This starts a background ticker at stale_entry_eviction_period_seconds to clean up items.
ctx, cancel := context.WithCancel(context.Background())
cache := NewInterQueryCacheWithContext(ctx, config)
t.Cleanup(cancel)

cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20)
cache.InsertWithExpiry(ast.StringTerm("high_ttl_foo").Value, cacheValue, time.Now().Add(100*time.Second))
Expand Down Expand Up @@ -537,14 +546,17 @@ func TestZeroExpiryTime(t *testing.T) {
t.Fatalf("Unexpected error %v", err)
}

cache := NewInterQueryCacheWithContext(context.Background(), config)
// This starts a background ticker at stale_entry_eviction_period_seconds to clean up items.
ctx, cancel := context.WithCancel(context.Background())
cache := NewInterQueryCacheWithContext(ctx, config)
t.Cleanup(cancel)
cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20)
cache.InsertWithExpiry(ast.StringTerm("foo").Value, cacheValue, time.Time{})
if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found {
t.Fatalf("Expected cache entry with value %v for foo, found %v", cacheValue, fetchedCacheValue)
}

time.Sleep(2 * time.Second)
time.Sleep(1100 * time.Millisecond)

// Stale entry cleanup routine skips zero time cache entries
if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found {
Expand Down Expand Up @@ -574,7 +586,7 @@ func TestCancelNewInterQueryCacheWithContext(t *testing.T) {
}

cancel()
time.Sleep(2 * time.Second)
time.Sleep(1100 * time.Millisecond)

// Stale entry cleanup routine stopped as context was cancelled
if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found {
Expand Down

0 comments on commit 84b554c

Please sign in to comment.