diff --git a/topdown/cache/cache.go b/topdown/cache/cache.go index 55ed340619..347803d4d7 100644 --- a/topdown/cache/cache.go +++ b/topdown/cache/cache.go @@ -154,11 +154,14 @@ func NewInterQueryCache(config *Config) InterQueryCache { func NewInterQueryCacheWithContext(ctx context.Context, config *Config) InterQueryCache { iqCache := newCache(config) if iqCache.staleEntryEvictionTimePeriodSeconds() > 0 { - cleanupTicker := time.NewTicker(time.Duration(iqCache.staleEntryEvictionTimePeriodSeconds()) * time.Second) go func() { + cleanupTicker := time.NewTicker(time.Duration(iqCache.staleEntryEvictionTimePeriodSeconds()) * time.Second) for { select { case <-cleanupTicker.C: + // NOTE: We stop the ticker and create a new one here to ensure that applications + // get _at least_ staleEntryEvictionTimePeriodSeconds with the cache unlocked; + // see https://github.com/open-policy-agent/opa/pull/7188/files#r1855342998 cleanupTicker.Stop() iqCache.cleanStaleValues() cleanupTicker = time.NewTicker(time.Duration(iqCache.staleEntryEvictionTimePeriodSeconds()) * time.Second) diff --git a/topdown/cache/cache_test.go b/topdown/cache/cache_test.go index ede1983c44..1423a1b363 100644 --- a/topdown/cache/cache_test.go +++ b/topdown/cache/cache_test.go @@ -408,14 +408,17 @@ func TestInsertWithExpiryAndEviction(t *testing.T) { t.Fatalf("Unexpected error %v", err) } - cache := NewInterQueryCacheWithContext(context.Background(), config) + // This starts a background ticker at stale_entry_eviction_period_seconds to clean up items. + ctx, cancel := context.WithCancel(context.Background()) + cache := NewInterQueryCacheWithContext(ctx, config) + t.Cleanup(cancel) cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20) cache.InsertWithExpiry(ast.StringTerm("force_evicted_foo").Value, cacheValue, time.Now().Add(100*time.Second)) if fetchedCacheValue, found := cache.Get(ast.StringTerm("force_evicted_foo").Value); !found { t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue) } - cache.InsertWithExpiry(ast.StringTerm("expired_foo").Value, cacheValue, time.Now().Add(1*time.Second)) + cache.InsertWithExpiry(ast.StringTerm("expired_foo").Value, cacheValue, time.Now().Add(900*time.Millisecond)) if fetchedCacheValue, found := cache.Get(ast.StringTerm("expired_foo").Value); !found { t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue) } @@ -425,7 +428,7 @@ func TestInsertWithExpiryAndEviction(t *testing.T) { } // Ensure stale entries clean up routine runs at least once - time.Sleep(2 * time.Second) + time.Sleep(1100 * time.Millisecond) // Entry deleted even though not expired because force evicted when foo is inserted if fetchedCacheValue, found := cache.Get(ast.StringTerm("force_evicted_foo").Value); found { @@ -454,20 +457,23 @@ func TestInsertHighTTLWithStaleEntryCleanup(t *testing.T) { t.Fatalf("Unexpected error %v", err) } - cache := NewInterQueryCacheWithContext(context.Background(), config) + // This starts a background ticker at stale_entry_eviction_period_seconds to clean up items. + ctx, cancel := context.WithCancel(context.Background()) + cache := NewInterQueryCacheWithContext(ctx, config) + t.Cleanup(cancel) cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20) cache.InsertWithExpiry(ast.StringTerm("high_ttl_foo").Value, cacheValue, time.Now().Add(100*time.Second)) if fetchedCacheValue, found := cache.Get(ast.StringTerm("high_ttl_foo").Value); !found { t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue) } - cache.InsertWithExpiry(ast.StringTerm("expired_foo").Value, cacheValue, time.Now().Add(1*time.Second)) + cache.InsertWithExpiry(ast.StringTerm("expired_foo").Value, cacheValue, time.Now().Add(900*time.Millisecond)) if fetchedCacheValue, found := cache.Get(ast.StringTerm("expired_foo").Value); !found { t.Fatalf("Expected cache entry with value %v, found no entry", fetchedCacheValue) } // Ensure stale entries clean up routine runs at least once - time.Sleep(2 * time.Second) + time.Sleep(1100 * time.Millisecond) cache.InsertWithExpiry(ast.StringTerm("foo").Value, cacheValue, time.Now().Add(10*time.Second)) if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found { @@ -497,7 +503,10 @@ func TestInsertHighTTLWithoutStaleEntryCleanup(t *testing.T) { t.Fatalf("Unexpected error %v", err) } - cache := NewInterQueryCacheWithContext(context.Background(), config) + // This starts a background ticker at stale_entry_eviction_period_seconds to clean up items. + ctx, cancel := context.WithCancel(context.Background()) + cache := NewInterQueryCacheWithContext(ctx, config) + t.Cleanup(cancel) cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20) cache.InsertWithExpiry(ast.StringTerm("high_ttl_foo").Value, cacheValue, time.Now().Add(100*time.Second)) @@ -537,14 +546,17 @@ func TestZeroExpiryTime(t *testing.T) { t.Fatalf("Unexpected error %v", err) } - cache := NewInterQueryCacheWithContext(context.Background(), config) + // This starts a background ticker at stale_entry_eviction_period_seconds to clean up items. + ctx, cancel := context.WithCancel(context.Background()) + cache := NewInterQueryCacheWithContext(ctx, config) + t.Cleanup(cancel) cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20) cache.InsertWithExpiry(ast.StringTerm("foo").Value, cacheValue, time.Time{}) if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found { t.Fatalf("Expected cache entry with value %v for foo, found %v", cacheValue, fetchedCacheValue) } - time.Sleep(2 * time.Second) + time.Sleep(1100 * time.Millisecond) // Stale entry cleanup routine skips zero time cache entries if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found { @@ -574,7 +586,7 @@ func TestCancelNewInterQueryCacheWithContext(t *testing.T) { } cancel() - time.Sleep(2 * time.Second) + time.Sleep(1100 * time.Millisecond) // Stale entry cleanup routine stopped as context was cancelled if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found {