From 9af77e579dfb41fd1912ff3e4fcc2857ea71cbbb Mon Sep 17 00:00:00 2001 From: Shvejan Mutheboyina Date: Tue, 17 Mar 2026 04:26:22 +0000 Subject: [PATCH 1/3] per tenant cache ttl Signed-off-by: Shvejan Mutheboyina --- CHANGELOG.md | 1 + pkg/chunk/cache/background.go | 7 +- pkg/chunk/cache/cache.go | 4 +- pkg/chunk/cache/cache_test.go | 8 +- pkg/chunk/cache/fifo_cache.go | 18 +- pkg/chunk/cache/fifo_cache_test.go | 53 +++- pkg/chunk/cache/instrumented.go | 9 +- pkg/chunk/cache/memcached.go | 12 +- pkg/chunk/cache/memcached_client_test.go | 10 +- pkg/chunk/cache/memcached_test.go | 43 ++- pkg/chunk/cache/mock.go | 23 +- pkg/chunk/cache/redis_cache.go | 7 +- pkg/chunk/cache/redis_cache_test.go | 6 +- pkg/chunk/cache/redis_client.go | 11 +- pkg/chunk/cache/redis_client_test.go | 58 +++- pkg/chunk/cache/snappy.go | 9 +- pkg/chunk/cache/tiered.go | 15 +- pkg/chunk/cache/tiered_test.go | 6 +- .../instant_query_middlewares_test.go | 13 + pkg/querier/tripperware/limits.go | 13 + .../tripperware/queryrange/limits_test.go | 26 +- .../tripperware/queryrange/results_cache.go | 57 +++- .../queryrange/results_cache_test.go | 262 +++++++++++++++++- .../tripperware/test_shard_by_query_utils.go | 12 + pkg/util/validation/exporter_test.go | 2 + pkg/util/validation/limits.go | 17 ++ 26 files changed, 628 insertions(+), 74 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6fef5cebcf3..bb174e4ec42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -60,6 +60,7 @@ * [ENHANCEMENT] Distributor: Add dimension `nhcb` to keep track of nhcb samples in `cortex_distributor_received_samples_total` and `cortex_distributor_samples_in_total` metrics. * [ENHANCEMENT] Distributor: Add `-distributor.accept-unknown-remote-write-content-type` flag. When enabled, requests with unknown or invalid Content-Type header are treated as remote write v1 instead of returning 415 Unsupported Media Type. Default is false. #7293 * [ENHANCEMENT] Ingester: Added `cortex_ingester_ingested_histogram_buckets` metric to track number of histogram buckets ingested per user. #7297 +* [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #5039 * [BUGFIX] Distributor: Add bounds checking for symbol references in Remote Write V2 requests to prevent panics when UnitRef or HelpRef exceed the symbols array length. #7290 * [BUGFIX] Distributor: If remote write v2 is disabled, explicitly return HTTP 415 (Unsupported Media Type) for Remote Write V2 requests instead of attempting to parse them as V1. #7238 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088 diff --git a/pkg/chunk/cache/background.go b/pkg/chunk/cache/background.go index 1e74fe50129..b526915b4a1 100644 --- a/pkg/chunk/cache/background.go +++ b/pkg/chunk/cache/background.go @@ -4,6 +4,7 @@ import ( "context" "flag" "sync" + "time" opentracing "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" @@ -38,6 +39,7 @@ type backgroundCache struct { type backgroundWrite struct { keys []string bufs [][]byte + ttl time.Duration } // NewBackground returns a new Cache that does stores on background goroutines. @@ -81,13 +83,14 @@ func (c *backgroundCache) Stop() { const keysPerBatch = 100 // Store writes keys for the cache in the background. -func (c *backgroundCache) Store(ctx context.Context, keys []string, bufs [][]byte) { +func (c *backgroundCache) Store(ctx context.Context, keys []string, bufs [][]byte, ttl time.Duration) { for len(keys) > 0 { num := min(keysPerBatch, len(keys)) bgWrite := backgroundWrite{ keys: keys[:num], bufs: bufs[:num], + ttl: ttl, } select { case c.bgWrites <- bgWrite: @@ -115,7 +118,7 @@ func (c *backgroundCache) writeBackLoop() { return } c.queueLength.Sub(float64(len(bgWrite.keys))) - c.Cache.Store(context.Background(), bgWrite.keys, bgWrite.bufs) + c.Cache.Store(context.Background(), bgWrite.keys, bgWrite.bufs, bgWrite.ttl) case <-c.quit: return diff --git a/pkg/chunk/cache/cache.go b/pkg/chunk/cache/cache.go index 5d03bd5279d..28601f2e67c 100644 --- a/pkg/chunk/cache/cache.go +++ b/pkg/chunk/cache/cache.go @@ -18,8 +18,8 @@ import ( // Whatsmore, we found partially successful Fetches were often treated as failed // when they returned an error. type Cache interface { - Store(ctx context.Context, key []string, buf [][]byte) - Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string) + Store(ctx context.Context, key []string, buf [][]byte, ttl time.Duration) + Fetch(ctx context.Context, keys []string, ttl time.Duration) (found []string, bufs [][]byte, missing []string) Stop() } diff --git a/pkg/chunk/cache/cache_test.go b/pkg/chunk/cache/cache_test.go index 5ed7314caac..41e75b522c5 100644 --- a/pkg/chunk/cache/cache_test.go +++ b/pkg/chunk/cache/cache_test.go @@ -36,7 +36,7 @@ func fillCache(t *testing.T, cache cache.Cache) ([]string, []chunkenc.Chunk) { chunks = append(chunks, cleanChunk) } - cache.Store(context.Background(), keys, bufs) + cache.Store(context.Background(), keys, bufs, 0) return keys, chunks } @@ -45,7 +45,7 @@ func testCacheSingle(t *testing.T, cache cache.Cache, keys []string, chunks []ch index := rand.Intn(len(keys)) key := keys[index] - found, bufs, missingKeys := cache.Fetch(context.Background(), []string{key}) + found, bufs, missingKeys := cache.Fetch(context.Background(), []string{key}, 0) require.Len(t, found, 1) require.Len(t, bufs, 1) require.Len(t, missingKeys, 0) @@ -58,7 +58,7 @@ func testCacheSingle(t *testing.T, cache cache.Cache, keys []string, chunks []ch func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks []chunkenc.Chunk) { // test getting them all - found, bufs, missingKeys := cache.Fetch(context.Background(), keys) + found, bufs, missingKeys := cache.Fetch(context.Background(), keys, 0) require.Len(t, found, len(keys)) require.Len(t, bufs, len(keys)) require.Len(t, missingKeys, 0) @@ -75,7 +75,7 @@ func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks [] func testCacheMiss(t *testing.T, cache cache.Cache) { for range 100 { key := strconv.Itoa(rand.Int()) // arbitrary key which should fail: no chunk key is a single integer - found, bufs, missing := cache.Fetch(context.Background(), []string{key}) + found, bufs, missing := cache.Fetch(context.Background(), []string{key}, 0) require.Empty(t, found) require.Empty(t, bufs) require.Len(t, missing, 1) diff --git a/pkg/chunk/cache/fifo_cache.go b/pkg/chunk/cache/fifo_cache.go index 1b7bc044225..3be8764d04d 100644 --- a/pkg/chunk/cache/fifo_cache.go +++ b/pkg/chunk/cache/fifo_cache.go @@ -88,6 +88,7 @@ type cacheEntry struct { updated time.Time key string value []byte + ttl time.Duration } // NewFifoCache returns a new initialised FifoCache of size. @@ -178,7 +179,7 @@ func NewFifoCache(name string, cfg FifoCacheConfig, reg prometheus.Registerer, l } // Fetch implements Cache. -func (c *FifoCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string) { +func (c *FifoCache) Fetch(ctx context.Context, keys []string, ttl time.Duration) (found []string, bufs [][]byte, missing []string) { found, missing, bufs = make([]string, 0, len(keys)), make([]string, 0, len(keys)), make([][]byte, 0, len(keys)) for _, key := range keys { val, ok := c.Get(ctx, key) @@ -194,14 +195,19 @@ func (c *FifoCache) Fetch(ctx context.Context, keys []string) (found []string, b } // Store implements Cache. -func (c *FifoCache) Store(ctx context.Context, keys []string, values [][]byte) { +func (c *FifoCache) Store(ctx context.Context, keys []string, values [][]byte, ttl time.Duration) { + // If TTL is 0, fall back to configured validity + if ttl == 0 { + ttl = c.validity + } + c.entriesAdded.Inc() c.lock.Lock() defer c.lock.Unlock() for i := range keys { - c.put(keys[i], values[i]) + c.put(keys[i], values[i], ttl) } } @@ -220,7 +226,7 @@ func (c *FifoCache) Stop() { c.memoryBytes.Set(float64(0)) } -func (c *FifoCache) put(key string, value []byte) { +func (c *FifoCache) put(key string, value []byte, ttl time.Duration) { // See if we already have the item in the cache. element, ok := c.entries[key] if ok { @@ -235,6 +241,7 @@ func (c *FifoCache) put(key string, value []byte) { updated: time.Now(), key: key, value: value, + ttl: ttl, } entrySz := sizeOf(entry) @@ -281,7 +288,8 @@ func (c *FifoCache) Get(ctx context.Context, key string) ([]byte, bool) { element, ok := c.entries[key] if ok { entry := element.Value.(*cacheEntry) - if c.validity == 0 || time.Since(entry.updated) < c.validity { + // Use per-entry TTL (which may have been defaulted from c.validity) + if entry.ttl == 0 || time.Since(entry.updated) < entry.ttl { return entry.value, true } diff --git a/pkg/chunk/cache/fifo_cache_test.go b/pkg/chunk/cache/fifo_cache_test.go index 3515d077357..e93bfca77be 100644 --- a/pkg/chunk/cache/fifo_cache_test.go +++ b/pkg/chunk/cache/fifo_cache_test.go @@ -51,7 +51,7 @@ func TestFifoCacheEviction(t *testing.T) { keys = append(keys, key) values = append(values, value) } - c.Store(ctx, keys, values) + c.Store(ctx, keys, values, 0) require.Len(t, c.entries, cnt) assert.Equal(t, testutil.ToFloat64(c.entriesAdded), float64(1)) @@ -93,7 +93,7 @@ func TestFifoCacheEviction(t *testing.T) { keys = append(keys, key) values = append(values, value) } - c.Store(ctx, keys, values) + c.Store(ctx, keys, values, 0) require.Len(t, c.entries, cnt) assert.Equal(t, testutil.ToFloat64(c.entriesAdded), float64(2)) @@ -139,7 +139,7 @@ func TestFifoCacheEviction(t *testing.T) { copy(value, vstr) values = append(values, value) } - c.Store(ctx, keys, values) + c.Store(ctx, keys, values, 0) require.Len(t, c.entries, cnt) for i := cnt; i < cnt+evicted; i++ { @@ -191,7 +191,8 @@ func TestFifoCacheExpiry(t *testing.T) { c.Store(ctx, []string{key1, key2, key4, key3, key2, key1}, - [][]byte{genBytes(16), []byte("dummy"), genBytes(20), data3, data2, data1}) + [][]byte{genBytes(16), []byte("dummy"), genBytes(20), data3, data2, data1}, + 0) value, ok := c.Get(ctx, key1) require.True(t, ok) @@ -239,6 +240,50 @@ func genBytes(n uint8) []byte { return arr } +func TestFifoCacheTTLFallback(t *testing.T) { + cfg := FifoCacheConfig{ + MaxSizeItems: 10, + Validity: 100 * time.Millisecond, // Default TTL + } + c := NewFifoCache("test", cfg, nil, log.NewNopLogger()) + ctx := context.Background() + + // Test 1: TTL=0 should use configured Validity (100ms) + c.Store(ctx, []string{"key1"}, [][]byte{[]byte("value1")}, 0) + + // Should exist immediately + value, ok := c.Get(ctx, "key1") + require.True(t, ok) + require.Equal(t, []byte("value1"), value) + + // Should still exist at 50ms (half of 100ms) + time.Sleep(50 * time.Millisecond) + value, ok = c.Get(ctx, "key1") + require.True(t, ok) + require.Equal(t, []byte("value1"), value) + + // Should expire after 100ms + time.Sleep(60 * time.Millisecond) // Total: 110ms + _, ok = c.Get(ctx, "key1") + require.False(t, ok, "entry should have expired after configured Validity") + + // Test 2: Custom TTL should override configured Validity + customTTL := 20 * time.Millisecond + c.Store(ctx, []string{"key2"}, [][]byte{[]byte("value2")}, customTTL) + + // Should exist immediately + value, ok = c.Get(ctx, "key2") + require.True(t, ok) + require.Equal(t, []byte("value2"), value) + + // Should expire after custom TTL (20ms), not default (100ms) + time.Sleep(30 * time.Millisecond) + _, ok = c.Get(ctx, "key2") + require.False(t, ok, "entry should have expired after custom TTL") + + c.Stop() +} + func TestBytesParsing(t *testing.T) { tests := []struct { input string diff --git a/pkg/chunk/cache/instrumented.go b/pkg/chunk/cache/instrumented.go index 0f4b1d1178a..7c9062581f2 100644 --- a/pkg/chunk/cache/instrumented.go +++ b/pkg/chunk/cache/instrumented.go @@ -2,6 +2,7 @@ package cache import ( "context" + "time" ot "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" @@ -64,7 +65,7 @@ type instrumentedCache struct { requestDuration *instr.HistogramCollector } -func (i *instrumentedCache) Store(ctx context.Context, keys []string, bufs [][]byte) { +func (i *instrumentedCache) Store(ctx context.Context, keys []string, bufs [][]byte, ttl time.Duration) { for j := range bufs { i.storedValueSize.Observe(float64(len(bufs[j]))) } @@ -73,12 +74,12 @@ func (i *instrumentedCache) Store(ctx context.Context, keys []string, bufs [][]b _ = instr.CollectedRequest(ctx, method, i.requestDuration, instr.ErrorCode, func(ctx context.Context) error { sp := ot.SpanFromContext(ctx) sp.LogFields(otlog.Int("keys", len(keys))) - i.Cache.Store(ctx, keys, bufs) + i.Cache.Store(ctx, keys, bufs, ttl) return nil }) } -func (i *instrumentedCache) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string) { +func (i *instrumentedCache) Fetch(ctx context.Context, keys []string, ttl time.Duration) ([]string, [][]byte, []string) { var ( found []string bufs [][]byte @@ -90,7 +91,7 @@ func (i *instrumentedCache) Fetch(ctx context.Context, keys []string) ([]string, sp := ot.SpanFromContext(ctx) sp.LogFields(otlog.Int("keys requested", len(keys))) - found, bufs, missing = i.Cache.Fetch(ctx, keys) + found, bufs, missing = i.Cache.Fetch(ctx, keys, ttl) sp.LogFields(otlog.Int("keys found", len(found)), otlog.Int("keys missing", len(keys)-len(found))) return nil }) diff --git a/pkg/chunk/cache/memcached.go b/pkg/chunk/cache/memcached.go index 7a458ee2a42..70ee76fa41d 100644 --- a/pkg/chunk/cache/memcached.go +++ b/pkg/chunk/cache/memcached.go @@ -132,7 +132,7 @@ func memcacheStatusCode(err error) string { } // Fetch gets keys from the cache. The keys that are found must be in the order of the keys requested. -func (c *Memcached) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) { +func (c *Memcached) Fetch(ctx context.Context, keys []string, ttl time.Duration) (found []string, bufs [][]byte, missed []string) { if c.cfg.BatchSize == 0 { found, bufs, missed = c.fetch(ctx, keys) return @@ -233,13 +233,19 @@ loopResults: } // Store stores the key in the cache. -func (c *Memcached) Store(ctx context.Context, keys []string, bufs [][]byte) { +func (c *Memcached) Store(ctx context.Context, keys []string, bufs [][]byte, ttl time.Duration) { + // If TTL is 0, fall back to configured expiration + if ttl == 0 { + ttl = c.cfg.Expiration + } + expiration := int32(ttl.Seconds()) + for i := range keys { err := instr.CollectedRequest(ctx, "Memcache.Put", c.requestDuration, memcacheStatusCode, func(_ context.Context) error { item := memcache.Item{ Key: keys[i], Value: bufs[i], - Expiration: int32(c.cfg.Expiration.Seconds()), + Expiration: expiration, } return c.memcache.Set(&item) }) diff --git a/pkg/chunk/cache/memcached_client_test.go b/pkg/chunk/cache/memcached_client_test.go index 028fba8ef46..b32d26a9e00 100644 --- a/pkg/chunk/cache/memcached_client_test.go +++ b/pkg/chunk/cache/memcached_client_test.go @@ -8,7 +8,8 @@ import ( type mockMemcache struct { sync.RWMutex - contents map[string][]byte + contents map[string][]byte + lastExpiration int32 } func newMockMemcache() *mockMemcache { @@ -35,5 +36,12 @@ func (m *mockMemcache) Set(item *memcache.Item) error { m.Lock() defer m.Unlock() m.contents[item.Key] = item.Value + m.lastExpiration = item.Expiration return nil } + +func (m *mockMemcache) GetLastExpiration() int32 { + m.RLock() + defer m.RUnlock() + return m.lastExpiration +} diff --git a/pkg/chunk/cache/memcached_test.go b/pkg/chunk/cache/memcached_test.go index ebb2581e9ea..1a09dee6402 100644 --- a/pkg/chunk/cache/memcached_test.go +++ b/pkg/chunk/cache/memcached_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "testing" + "time" "github.com/bradfitz/gomemcache/memcache" "github.com/go-kit/log" @@ -52,9 +53,9 @@ func testMemcache(t *testing.T, memcache *cache.Memcached) { keys = append(keys, fmt.Sprint(i)) bufs = append(bufs, fmt.Append(nil, i)) } - memcache.Store(ctx, keys, bufs) + memcache.Store(ctx, keys, bufs, 0) - found, bufs, missing := memcache.Fetch(ctx, keysIncMissing) + found, bufs, missing := memcache.Fetch(ctx, keysIncMissing, 0) for i := range numKeys { if i%5 == 0 { require.Equal(t, fmt.Sprint(i), missing[0]) @@ -126,10 +127,10 @@ func testMemcacheFailing(t *testing.T, memcache *cache.Memcached) { keys = append(keys, fmt.Sprint(i)) bufs = append(bufs, fmt.Append(nil, i)) } - memcache.Store(ctx, keys, bufs) + memcache.Store(ctx, keys, bufs, 0) for range 10 { - found, bufs, missing := memcache.Fetch(ctx, keysIncMissing) + found, bufs, missing := memcache.Fetch(ctx, keysIncMissing, 0) require.Equal(t, len(found), len(bufs)) for i := range found { @@ -187,8 +188,38 @@ func testMemcachedStopping(t *testing.T, memcache *cache.Memcached) { bufs = append(bufs, fmt.Append(nil, i)) } - memcache.Store(ctx, keys, bufs) + memcache.Store(ctx, keys, bufs, 0) + + go memcache.Fetch(ctx, keys, 0) + memcache.Stop() +} + +func TestMemcachedTTLFallback(t *testing.T) { + client := newMockMemcache() + defaultExpiration := 100 * time.Second + + memcache := cache.NewMemcached( + cache.MemcachedConfig{ + Expiration: defaultExpiration, + }, + client, + "test", + nil, + log.NewNopLogger(), + ) + + ctx := context.Background() + + // Test 1: TTL=0 should use configured Expiration (100s) + memcache.Store(ctx, []string{"key1"}, [][]byte{[]byte("value1")}, 0) + require.Equal(t, int32(defaultExpiration.Seconds()), client.GetLastExpiration(), + "TTL=0 should use configured default expiration") + + // Test 2: Custom TTL should override configured Expiration + customTTL := 20 * time.Second + memcache.Store(ctx, []string{"key2"}, [][]byte{[]byte("value2")}, customTTL) + require.Equal(t, int32(customTTL.Seconds()), client.GetLastExpiration(), + "custom TTL should override default expiration") - go memcache.Fetch(ctx, keys) memcache.Stop() } diff --git a/pkg/chunk/cache/mock.go b/pkg/chunk/cache/mock.go index 6503aea80dc..68b97550226 100644 --- a/pkg/chunk/cache/mock.go +++ b/pkg/chunk/cache/mock.go @@ -3,22 +3,26 @@ package cache import ( "context" "sync" + "time" ) -type mockCache struct { +// MockCache is a simple in-memory cache for testing that also captures the last TTL used. +type MockCache struct { sync.Mutex - cache map[string][]byte + cache map[string][]byte + lastTTL time.Duration } -func (m *mockCache) Store(_ context.Context, keys []string, bufs [][]byte) { +func (m *MockCache) Store(_ context.Context, keys []string, bufs [][]byte, ttl time.Duration) { m.Lock() defer m.Unlock() + m.lastTTL = ttl for i := range keys { m.cache[keys[i]] = bufs[i] } } -func (m *mockCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string) { +func (m *MockCache) Fetch(ctx context.Context, keys []string, ttl time.Duration) (found []string, bufs [][]byte, missing []string) { m.Lock() defer m.Unlock() for _, key := range keys { @@ -33,12 +37,19 @@ func (m *mockCache) Fetch(ctx context.Context, keys []string) (found []string, b return } -func (m *mockCache) Stop() { +func (m *MockCache) Stop() { +} + +// GetLastTTL returns the TTL from the last Store call (useful for testing TTL behavior). +func (m *MockCache) GetLastTTL() time.Duration { + m.Lock() + defer m.Unlock() + return m.lastTTL } // NewMockCache makes a new MockCache. func NewMockCache() Cache { - return &mockCache{ + return &MockCache{ cache: map[string][]byte{}, } } diff --git a/pkg/chunk/cache/redis_cache.go b/pkg/chunk/cache/redis_cache.go index 4ac9850b1da..6cc9206cc04 100644 --- a/pkg/chunk/cache/redis_cache.go +++ b/pkg/chunk/cache/redis_cache.go @@ -2,6 +2,7 @@ package cache import ( "context" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -55,7 +56,7 @@ func redisStatusCode(err error) string { } // Fetch gets keys from the cache. The keys that are found must be in the order of the keys requested. -func (c *RedisCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) { +func (c *RedisCache) Fetch(ctx context.Context, keys []string, ttl time.Duration) (found []string, bufs [][]byte, missed []string) { const method = "RedisCache.MGet" var items [][]byte // Run a tracked request, using c.requestDuration to monitor requests. @@ -93,8 +94,8 @@ func (c *RedisCache) Fetch(ctx context.Context, keys []string) (found []string, } // Store stores the key in the cache. -func (c *RedisCache) Store(ctx context.Context, keys []string, bufs [][]byte) { - err := c.redis.MSet(ctx, keys, bufs) +func (c *RedisCache) Store(ctx context.Context, keys []string, bufs [][]byte, ttl time.Duration) { + err := c.redis.MSet(ctx, keys, bufs, ttl) if err != nil { level.Error(util_log.WithContext(ctx, c.logger)).Log("msg", "failed to put to redis", "name", c.name, "err", err) } diff --git a/pkg/chunk/cache/redis_cache_test.go b/pkg/chunk/cache/redis_cache_test.go index 154e688066d..753fc8a91e9 100644 --- a/pkg/chunk/cache/redis_cache_test.go +++ b/pkg/chunk/cache/redis_cache_test.go @@ -28,10 +28,10 @@ func TestRedisCache(t *testing.T) { ctx := context.Background() - c.Store(ctx, keys, bufs) + c.Store(ctx, keys, bufs, 0) // test hits - found, data, missed := c.Fetch(ctx, keys) + found, data, missed := c.Fetch(ctx, keys, 0) require.Len(t, found, nHit) require.Len(t, missed, 0) @@ -41,7 +41,7 @@ func TestRedisCache(t *testing.T) { } // test misses - found, _, missed = c.Fetch(ctx, miss) + found, _, missed = c.Fetch(ctx, miss, 0) require.Len(t, found, 0) require.Len(t, missed, nMiss) diff --git a/pkg/chunk/cache/redis_client.go b/pkg/chunk/cache/redis_client.go index 678b31dd1aa..1ebe0cd5874 100644 --- a/pkg/chunk/cache/redis_client.go +++ b/pkg/chunk/cache/redis_client.go @@ -88,7 +88,12 @@ func (c *RedisClient) Ping(ctx context.Context) error { return nil } -func (c *RedisClient) MSet(ctx context.Context, keys []string, values [][]byte) error { +func (c *RedisClient) MSet(ctx context.Context, keys []string, values [][]byte, ttl time.Duration) error { + // If TTL is 0, fall back to configured expiration + if ttl == 0 { + ttl = c.expiration + } + var cancel context.CancelFunc if c.timeout > 0 { ctx, cancel = context.WithTimeout(ctx, c.timeout) @@ -107,7 +112,7 @@ func (c *RedisClient) MSet(ctx context.Context, keys []string, values [][]byte) if isCluster { // For cluster mode, use individual SET commands to avoid cross-slot transaction errors for i := range keys { - err := c.rdb.Set(ctx, keys[i], values[i], c.expiration).Err() + err := c.rdb.Set(ctx, keys[i], values[i], ttl).Err() if err != nil { return err } @@ -118,7 +123,7 @@ func (c *RedisClient) MSet(ctx context.Context, keys []string, values [][]byte) // For single/sentinel mode, use transaction pipeline for atomicity pipe := c.rdb.TxPipeline() for i := range keys { - pipe.Set(ctx, keys[i], values[i], c.expiration) + pipe.Set(ctx, keys[i], values[i], ttl) } _, err := pipe.Exec(ctx) return err diff --git a/pkg/chunk/cache/redis_client_test.go b/pkg/chunk/cache/redis_client_test.go index bef316890f4..e976fe94e19 100644 --- a/pkg/chunk/cache/redis_client_test.go +++ b/pkg/chunk/cache/redis_client_test.go @@ -42,7 +42,7 @@ func TestRedisClient(t *testing.T) { miss := []string{"miss1", "miss2"} // set values - err := tt.client.MSet(ctx, keys, bufs) + err := tt.client.MSet(ctx, keys, bufs, 0) require.Nil(t, err) // get keys @@ -124,7 +124,7 @@ func TestRedisClusterCrossSlotMSet(t *testing.T) { } // This should not fail even with cross-slot keys in cluster mode - err = cluster.MSet(ctx, keys, values) + err = cluster.MSet(ctx, keys, values, 0) require.Nil(t, err, "MSet should work with cross-slot keys in cluster mode") // Verify all keys were set correctly @@ -136,3 +136,57 @@ func TestRedisClusterCrossSlotMSet(t *testing.T) { require.Equal(t, values[i], value, "Retrieved value should match set value for key %s", keys[i]) } } + +func TestRedisTTLFallback(t *testing.T) { + defaultExpiration := 100 * time.Second + client, err := mockRedisClientWithExpiration(defaultExpiration) + require.Nil(t, err) + defer client.Close() + + ctx := context.Background() + + // Test 1: TTL=0 should use configured expiration (100s) + err = client.MSet(ctx, []string{"key1"}, [][]byte{[]byte("value1")}, 0) + require.Nil(t, err, "MSet with TTL=0 should succeed and use default expiration") + + // Verify key was set with correct value + values, err := client.MGet(ctx, []string{"key1"}) + require.Nil(t, err) + require.Equal(t, []byte("value1"), values[0]) + + // Verify TTL is set to default expiration (100s, allow 2s delta for test execution time) + ttl, err := client.rdb.TTL(ctx, "key1").Result() + require.Nil(t, err) + require.InDelta(t, defaultExpiration.Seconds(), ttl.Seconds(), 10.0, + "TTL=0 should use configured default expiration") + + // Test 2: Custom TTL should override configured expiration + customTTL := 20 * time.Second + err = client.MSet(ctx, []string{"key2"}, [][]byte{[]byte("value2")}, customTTL) + require.Nil(t, err, "MSet with custom TTL should succeed") + + // Verify key was set with correct value + values, err = client.MGet(ctx, []string{"key2"}) + require.Nil(t, err) + require.Equal(t, []byte("value2"), values[0]) + + // Verify TTL is set to custom value (20s, allow 2s delta for test execution time) + ttl, err = client.rdb.TTL(ctx, "key2").Result() + require.Nil(t, err) + require.InDelta(t, customTTL.Seconds(), ttl.Seconds(), 10.0, + "custom TTL should override default expiration") +} + +func mockRedisClientWithExpiration(expiration time.Duration) (*RedisClient, error) { + redisServer, err := miniredis.Run() + if err != nil { + return nil, err + } + return &RedisClient{ + expiration: expiration, + timeout: 100 * time.Millisecond, + rdb: redis.NewClient(&redis.Options{ + Addr: redisServer.Addr(), + }), + }, nil +} diff --git a/pkg/chunk/cache/snappy.go b/pkg/chunk/cache/snappy.go index eb0bd908c13..1f6f6327700 100644 --- a/pkg/chunk/cache/snappy.go +++ b/pkg/chunk/cache/snappy.go @@ -2,6 +2,7 @@ package cache import ( "context" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -23,17 +24,17 @@ func NewSnappy(next Cache, logger log.Logger) Cache { } } -func (s *snappyCache) Store(ctx context.Context, keys []string, bufs [][]byte) { +func (s *snappyCache) Store(ctx context.Context, keys []string, bufs [][]byte, ttl time.Duration) { cs := make([][]byte, 0, len(bufs)) for _, buf := range bufs { c := snappy.Encode(nil, buf) cs = append(cs, c) } - s.next.Store(ctx, keys, cs) + s.next.Store(ctx, keys, cs, ttl) } -func (s *snappyCache) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string) { - found, bufs, missing := s.next.Fetch(ctx, keys) +func (s *snappyCache) Fetch(ctx context.Context, keys []string, ttl time.Duration) ([]string, [][]byte, []string) { + found, bufs, missing := s.next.Fetch(ctx, keys, ttl) ds := make([][]byte, 0, len(bufs)) for _, buf := range bufs { d, err := snappy.Decode(nil, buf) diff --git a/pkg/chunk/cache/tiered.go b/pkg/chunk/cache/tiered.go index bb2012ac318..82f39a48058 100644 --- a/pkg/chunk/cache/tiered.go +++ b/pkg/chunk/cache/tiered.go @@ -1,6 +1,9 @@ package cache -import "context" +import ( + "context" + "time" +) type tiered []Cache @@ -19,13 +22,13 @@ func IsEmptyTieredCache(cache Cache) bool { return ok && len(c) == 0 } -func (t tiered) Store(ctx context.Context, keys []string, bufs [][]byte) { +func (t tiered) Store(ctx context.Context, keys []string, bufs [][]byte, ttl time.Duration) { for _, c := range []Cache(t) { - c.Store(ctx, keys, bufs) + c.Store(ctx, keys, bufs, ttl) } } -func (t tiered) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string) { +func (t tiered) Fetch(ctx context.Context, keys []string, ttl time.Duration) ([]string, [][]byte, []string) { found := make(map[string][]byte, len(keys)) missing := keys previousCaches := make([]Cache, 0, len(t)) @@ -36,8 +39,8 @@ func (t tiered) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, [ passBufs [][]byte ) - passKeys, passBufs, missing = c.Fetch(ctx, missing) - tiered(previousCaches).Store(ctx, passKeys, passBufs) + passKeys, passBufs, missing = c.Fetch(ctx, missing, ttl) + tiered(previousCaches).Store(ctx, passKeys, passBufs, ttl) for i, key := range passKeys { found[key] = passBufs[i] diff --git a/pkg/chunk/cache/tiered_test.go b/pkg/chunk/cache/tiered_test.go index c4f85eb63ae..966027cd353 100644 --- a/pkg/chunk/cache/tiered_test.go +++ b/pkg/chunk/cache/tiered_test.go @@ -24,10 +24,10 @@ func TestTiered(t *testing.T) { level1, level2 := cache.NewMockCache(), cache.NewMockCache() cache := cache.NewTiered([]cache.Cache{level1, level2}) - level1.Store(context.Background(), []string{"key1"}, [][]byte{[]byte("hello")}) - level2.Store(context.Background(), []string{"key2"}, [][]byte{[]byte("world")}) + level1.Store(context.Background(), []string{"key1"}, [][]byte{[]byte("hello")}, 0) + level2.Store(context.Background(), []string{"key2"}, [][]byte{[]byte("world")}, 0) - keys, bufs, missing := cache.Fetch(context.Background(), []string{"key1", "key2", "key3"}) + keys, bufs, missing := cache.Fetch(context.Background(), []string{"key1", "key2", "key3"}, 0) require.Equal(t, []string{"key1", "key2"}, keys) require.Equal(t, [][]byte{[]byte("hello"), []byte("world")}, bufs) require.Equal(t, []string{"key3"}, missing) diff --git a/pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go b/pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go index 6646ec1bc0d..2118e737693 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go +++ b/pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/thanos-io/promql-engine/logicalplan" "github.com/thanos-io/thanos/pkg/querysharding" @@ -277,6 +278,18 @@ func (m mockLimitsShard) QueryRejection(userID string) validation.QueryRejection return m.queryRejection } +func (mockLimitsShard) ResultsCacheTTL(userID string) time.Duration { + return 0 +} + +func (mockLimitsShard) OutOfOrderResultsCacheTTL(userID string) time.Duration { + return 0 +} + +func (mockLimitsShard) OutOfOrderTimeWindow(userID string) model.Duration { + return 0 +} + type singleHostRoundTripper struct { host string next http.RoundTripper diff --git a/pkg/querier/tripperware/limits.go b/pkg/querier/tripperware/limits.go index 3695e2da0f0..546422974e2 100644 --- a/pkg/querier/tripperware/limits.go +++ b/pkg/querier/tripperware/limits.go @@ -3,6 +3,8 @@ package tripperware import ( "time" + "github.com/prometheus/common/model" + "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -26,6 +28,17 @@ type Limits interface { // to prevent caching of very recent results. MaxCacheFreshness(string) time.Duration + // ResultsCacheTTL returns the standard TTL for cached query results. + // Returns 0 if not configured, meaning use global backend TTL. + ResultsCacheTTL(userID string) time.Duration + + // OutOfOrderResultsCacheTTL returns the TTL for cached results that may contain out-of-order samples. + // Returns 0 if not configured, meaning use global backend TTL. + OutOfOrderResultsCacheTTL(userID string) time.Duration + + // OutOfOrderTimeWindow returns the allowed time window for ingestion of out-of-order samples. + OutOfOrderTimeWindow(userID string) model.Duration + // QueryVerticalShardSize returns the maximum number of queriers that can handle requests for this user. QueryVerticalShardSize(userID string) int diff --git a/pkg/querier/tripperware/queryrange/limits_test.go b/pkg/querier/tripperware/queryrange/limits_test.go index 31d3008e5cd..a4469f2aafe 100644 --- a/pkg/querier/tripperware/queryrange/limits_test.go +++ b/pkg/querier/tripperware/queryrange/limits_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -231,11 +232,14 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) { } type mockLimits struct { - maxQueryLookback time.Duration - maxQueryLength time.Duration - maxCacheFreshness time.Duration - maxQueryResponseSize int64 - queryVerticalShardSize int + maxQueryLookback time.Duration + maxQueryLength time.Duration + maxCacheFreshness time.Duration + maxQueryResponseSize int64 + queryVerticalShardSize int + resultsCacheTTL time.Duration + outOfOrderResultsCacheTTL time.Duration + outOfOrderWindow time.Duration } func (m mockLimits) MaxQueryLookback(string) time.Duration { @@ -270,6 +274,18 @@ func (m mockLimits) QueryRejection(userID string) validation.QueryRejection { return validation.QueryRejection{} } +func (m mockLimits) ResultsCacheTTL(userID string) time.Duration { + return m.resultsCacheTTL +} + +func (m mockLimits) OutOfOrderResultsCacheTTL(userID string) time.Duration { + return m.outOfOrderResultsCacheTTL +} + +func (m mockLimits) OutOfOrderTimeWindow(userID string) model.Duration { + return model.Duration(m.outOfOrderWindow) +} + type mockHandler struct { mock.Mock } diff --git a/pkg/querier/tripperware/queryrange/results_cache.go b/pkg/querier/tripperware/queryrange/results_cache.go index 15733ca34e8..66b04bcbe78 100644 --- a/pkg/querier/tripperware/queryrange/results_cache.go +++ b/pkg/querier/tripperware/queryrange/results_cache.go @@ -254,7 +254,8 @@ func (s resultsCache) Do(ctx context.Context, r tripperware.Request) (tripperwar return s.next.Do(ctx, r) } - cached, ok := s.get(ctx, key) + ttl := s.getTTLForExtents(ctx, []tripperware.Extent{{Start: r.GetStart(), End: r.GetEnd()}}) + cached, ok := s.get(ctx, key, ttl) if ok { response, extents, err = s.handleHit(ctx, r, cached, maxCacheTime) } else { @@ -726,8 +727,8 @@ func (s resultsCache) filterRecentExtents(req tripperware.Request, maxCacheFresh return extents, nil } -func (s resultsCache) get(ctx context.Context, key string) ([]tripperware.Extent, bool) { - found, bufs, _ := s.cache.Fetch(ctx, []string{cache.HashKey(key)}) +func (s resultsCache) get(ctx context.Context, key string, ttl time.Duration) ([]tripperware.Extent, bool) { + found, bufs, _ := s.cache.Fetch(ctx, []string{cache.HashKey(key)}, ttl) if len(found) != 1 { return nil, false } @@ -758,7 +759,55 @@ func (s resultsCache) get(ctx context.Context, key string) ([]tripperware.Extent return resp.Extents, true } +// getTTLForExtents calculates the appropriate TTL for given extents based on whether +// they overlap with the out-of-order time window. +func (s resultsCache) getTTLForExtents(ctx context.Context, extents []tripperware.Extent) time.Duration { + tenantIDs, err := users.TenantIDs(ctx) + if err != nil { + tenantIDs = nil + } + + var resultsCacheTTL, outOfOrderCacheTTL time.Duration + if len(tenantIDs) > 0 { + // Use smallest non-zero TTL to respect the most restrictive tenant's cache policy + resultsCacheTTL = validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, s.limits.ResultsCacheTTL) + outOfOrderCacheTTL = validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, s.limits.OutOfOrderResultsCacheTTL) + } + + if s.extentsOverlapOutOfOrderWindow(extents, tenantIDs) { + return outOfOrderCacheTTL + } + return resultsCacheTTL +} + +// extentsOverlapOutOfOrderWindow checks if any extent overlaps with the out-of-order time window. +// Returns true if any extent's end time is within the out-of-order window. +func (s resultsCache) extentsOverlapOutOfOrderWindow(extents []tripperware.Extent, tenantIDs []string) bool { + if len(tenantIDs) == 0 { + return false + } + + outOfOrderWindow := validation.MaxDurationPerTenant(tenantIDs, func(userID string) time.Duration { + return time.Duration(s.limits.OutOfOrderTimeWindow(userID)) + }) + + if outOfOrderWindow == 0 { + return false + } + + nowMs := time.Now().UnixMilli() + outOfOrderCutoffMs := nowMs - int64(outOfOrderWindow/time.Millisecond) + + for _, extent := range extents { + if extent.End >= outOfOrderCutoffMs { + return true + } + } + + return false +} func (s resultsCache) put(ctx context.Context, key string, extents []tripperware.Extent) { + ttl := s.getTTLForExtents(ctx, extents) buf, err := proto.Marshal(&tripperware.CachedResponse{ Key: key, Extents: extents, @@ -768,7 +817,7 @@ func (s resultsCache) put(ctx context.Context, key string, extents []tripperware return } - s.cache.Store(ctx, []string{cache.HashKey(key)}, [][]byte{buf}) + s.cache.Store(ctx, []string{cache.HashKey(key)}, [][]byte{buf}, ttl) } func jaegerSpanID(ctx context.Context) string { diff --git a/pkg/querier/tripperware/queryrange/results_cache_test.go b/pkg/querier/tripperware/queryrange/results_cache_test.go index 24f8dbd4879..82b831847d7 100644 --- a/pkg/querier/tripperware/queryrange/results_cache_test.go +++ b/pkg/querier/tripperware/queryrange/results_cache_test.go @@ -1441,15 +1441,15 @@ func Test_resultsCache_MissingData(t *testing.T) { Response: nil, }}) - extents, hit := rc.get(ctx, "empty") + extents, hit := rc.get(ctx, "empty", 0) require.Empty(t, extents) require.False(t, hit) - extents, hit = rc.get(ctx, "notempty") + extents, hit = rc.get(ctx, "notempty", 0) require.Equal(t, len(extents), 1) require.True(t, hit) - extents, hit = rc.get(ctx, "mixed") + extents, hit = rc.get(ctx, "mixed", 0) require.Equal(t, len(extents), 0) require.False(t, hit) } @@ -1583,7 +1583,7 @@ func TestResultsCacheFillCompatibility(t *testing.T) { key := splitter(day).GenerateCacheKey(ctx, users.JoinTenantIDs(tenantIDs), parsedRequest) cacheKey := cache.HashKey(key) - found, bufs, _ := c.Fetch(ctx, []string{cacheKey}) + found, bufs, _ := c.Fetch(ctx, []string{cacheKey}, 0) require.Equal(t, []string{cacheKey}, found) require.Len(t, bufs, 1) @@ -1762,3 +1762,257 @@ func TestPrometheusResponseExtractor_Extract_Histograms(t *testing.T) { }) } } + +func TestExtentsOverlapOutOfOrderWindow(t *testing.T) { + now := time.Now() + nowMs := now.UnixMilli() + oneHourAgo := now.Add(-1 * time.Hour).UnixMilli() + twoHoursAgo := now.Add(-2 * time.Hour).UnixMilli() + thirtyMinutesAgo := now.Add(-30 * time.Minute).UnixMilli() + + tests := []struct { + name string + extents []tripperware.Extent + outOfOrderWindow time.Duration + expectedOverlap bool + }{ + { + name: "extent ends before out-of-order window - no overlap", + extents: []tripperware.Extent{ + {Start: twoHoursAgo, End: twoHoursAgo + 1000}, // 2 hours ago + }, + outOfOrderWindow: 1 * time.Hour, + expectedOverlap: false, + }, + { + name: "extent ends exactly at cutoff boundary - overlaps (boundary case)", + extents: []tripperware.Extent{ + {Start: twoHoursAgo, End: oneHourAgo}, // ends exactly at 1h ago + }, + outOfOrderWindow: 1 * time.Hour, + expectedOverlap: true, // >= check includes boundary + }, + { + name: "extent ends within out-of-order window - overlaps", + extents: []tripperware.Extent{ + {Start: twoHoursAgo, End: thirtyMinutesAgo}, // 30 min ago + }, + outOfOrderWindow: 1 * time.Hour, + expectedOverlap: true, + }, + { + name: "extent ends at now - overlaps", + extents: []tripperware.Extent{ + {Start: oneHourAgo, End: nowMs}, + }, + outOfOrderWindow: 1 * time.Hour, + expectedOverlap: true, + }, + { + name: "multiple extents, one overlaps - overlaps", + extents: []tripperware.Extent{ + {Start: twoHoursAgo, End: twoHoursAgo + 1000}, // old, no overlap + {Start: twoHoursAgo, End: thirtyMinutesAgo}, // overlaps + }, + outOfOrderWindow: 1 * time.Hour, + expectedOverlap: true, + }, + { + name: "zero out-of-order window - no overlap", + extents: []tripperware.Extent{ + {Start: twoHoursAgo, End: nowMs}, + }, + outOfOrderWindow: 0, + expectedOverlap: false, + }, + { + name: "empty extents - no overlap", + extents: []tripperware.Extent{}, + outOfOrderWindow: 1 * time.Hour, + expectedOverlap: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + limits := mockLimits{outOfOrderWindow: tc.outOfOrderWindow} + cfg := ResultsCacheConfig{ + CacheConfig: cache.Config{ + Cache: cache.NewMockCache(), + }, + } + rm, _, err := NewResultsCacheMiddleware( + log.NewNopLogger(), + cfg, + splitter(day), + limits, + PrometheusCodec, + PrometheusResponseExtractor{}, + nil, + nil, + ) + require.NoError(t, err) + rc := rm.Wrap(nil).(*resultsCache) + + overlap := rc.extentsOverlapOutOfOrderWindow(tc.extents, []string{"tenant-a"}) + assert.Equal(t, tc.expectedOverlap, overlap) + }) + } +} + +func TestResultsCachePutTTLSelection(t *testing.T) { + now := time.Now() + oneHourAgo := now.Add(-1 * time.Hour).UnixMilli() + twoHoursAgo := now.Add(-2 * time.Hour).UnixMilli() + + tests := []struct { + name string + extents []tripperware.Extent + resultsCacheTTL time.Duration + outOfOrderCacheTTL time.Duration + outOfOrderWindow time.Duration + expectedTTL time.Duration + }{ + { + name: "old data uses results_cache_ttl", + extents: []tripperware.Extent{ + {Start: twoHoursAgo, End: twoHoursAgo + 1000}, // 2 hours ago, no overlap + }, + resultsCacheTTL: 24 * time.Hour, + outOfOrderCacheTTL: 5 * time.Minute, + outOfOrderWindow: 1 * time.Hour, + expectedTTL: 24 * time.Hour, + }, + { + name: "recent data uses out_of_order_results_cache_ttl", + extents: []tripperware.Extent{ + {Start: twoHoursAgo, End: oneHourAgo}, // overlaps with 1h window + }, + resultsCacheTTL: 24 * time.Hour, + outOfOrderCacheTTL: 5 * time.Minute, + outOfOrderWindow: 1 * time.Hour, + expectedTTL: 5 * time.Minute, + }, + { + name: "zero out-of-order window uses results_cache_ttl", + extents: []tripperware.Extent{ + {Start: twoHoursAgo, End: oneHourAgo}, + }, + resultsCacheTTL: 12 * time.Hour, + outOfOrderCacheTTL: 5 * time.Minute, + outOfOrderWindow: 0, // no out-of-order support + expectedTTL: 12 * time.Hour, + }, + { + name: "zero TTLs use backend defaults", + extents: []tripperware.Extent{ + {Start: twoHoursAgo, End: twoHoursAgo + 1000}, + }, + resultsCacheTTL: 0, + outOfOrderCacheTTL: 0, + outOfOrderWindow: 0, + expectedTTL: 0, // backend default + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + mockCache := cache.NewMockCache() + limits := mockLimits{ + resultsCacheTTL: tc.resultsCacheTTL, + outOfOrderResultsCacheTTL: tc.outOfOrderCacheTTL, + outOfOrderWindow: tc.outOfOrderWindow, + } + + cfg := ResultsCacheConfig{ + CacheConfig: cache.Config{ + Cache: mockCache, + }, + } + rm, _, err := NewResultsCacheMiddleware( + log.NewNopLogger(), + cfg, + splitter(day), + limits, + PrometheusCodec, + PrometheusResponseExtractor{}, + nil, + nil, + ) + require.NoError(t, err) + rc := rm.Wrap(nil).(*resultsCache) + + ctx := user.InjectOrgID(context.Background(), "tenant-a") + rc.put(ctx, "test-key", tc.extents) + + assert.Equal(t, tc.expectedTTL, mockCache.(*cache.MockCache).GetLastTTL()) + }) + } +} + +func TestResultsCacheWithPerTenantTTL(t *testing.T) { + t.Parallel() + + // Create a mock cache that captures TTL + mockCache := cache.NewMockCache() + + // Configure per-tenant TTLs + limits := mockLimits{ + resultsCacheTTL: 24 * time.Hour, + outOfOrderResultsCacheTTL: 5 * time.Minute, + outOfOrderWindow: 1 * time.Hour, + } + + cfg := ResultsCacheConfig{ + CacheConfig: cache.Config{ + Cache: mockCache, + }, + } + rcm, _, err := NewResultsCacheMiddleware( + log.NewNopLogger(), + cfg, + splitter(day), + limits, + PrometheusCodec, + PrometheusResponseExtractor{}, + nil, + nil, + ) + require.NoError(t, err) + + // Create a handler that returns a response + handlerCalls := 0 + rc := rcm.Wrap(tripperware.HandlerFunc(func(_ context.Context, req tripperware.Request) (tripperware.Response, error) { + handlerCalls++ + return parsedResponse, nil + })) + + ctx := user.InjectOrgID(context.Background(), "tenant-a") + + // Test 1: Query old data (> 1 hour ago) - should use long TTL + now := time.Now() + oldStart := now.Add(-3 * time.Hour).UnixNano() / 1e6 + oldEnd := now.Add(-2 * time.Hour).UnixNano() / 1e6 + oldReq := parsedRequest.WithStartEnd(oldStart, oldEnd) + + _, err = rc.Do(ctx, oldReq) + require.NoError(t, err) + require.Equal(t, 1, handlerCalls, "first request should call handler") + + // Verify long TTL was used (24h) + lastTTL := mockCache.(*cache.MockCache).GetLastTTL() + assert.Equal(t, 24*time.Hour, lastTTL, "old data should use long TTL") + + // Test 2: Query recent data (overlaps with out-of-order window) - should use short TTL + recentStart := now.Add(-2 * time.Hour).UnixNano() / 1e6 + recentEnd := now.Add(-30 * time.Minute).UnixNano() / 1e6 // 30 min ago, within 1h window + recentReq := parsedRequest.WithStartEnd(recentStart, recentEnd) + + _, err = rc.Do(ctx, recentReq) + require.NoError(t, err) + require.Equal(t, 2, handlerCalls, "second request should call handler") + + // Verify short TTL was used (5m) + lastTTL = mockCache.(*cache.MockCache).GetLastTTL() + assert.Equal(t, 5*time.Minute, lastTTL, "recent data should use short out-of-order TTL") +} diff --git a/pkg/querier/tripperware/test_shard_by_query_utils.go b/pkg/querier/tripperware/test_shard_by_query_utils.go index d39a5dd2317..65c0e12f3ed 100644 --- a/pkg/querier/tripperware/test_shard_by_query_utils.go +++ b/pkg/querier/tripperware/test_shard_by_query_utils.go @@ -515,6 +515,18 @@ func (m mockLimits) QueryRejection(userID string) validation.QueryRejection { return m.queryRejection } +func (mockLimits) ResultsCacheTTL(userID string) time.Duration { + return 0 +} + +func (mockLimits) OutOfOrderResultsCacheTTL(userID string) time.Duration { + return 0 +} + +func (mockLimits) OutOfOrderTimeWindow(userID string) model.Duration { + return 0 +} + type singleHostRoundTripper struct { host string next http.RoundTripper diff --git a/pkg/util/validation/exporter_test.go b/pkg/util/validation/exporter_test.go index 01f96b92750..0b1ef21ce8b 100644 --- a/pkg/util/validation/exporter_test.go +++ b/pkg/util/validation/exporter_test.go @@ -96,6 +96,7 @@ func TestOverridesExporter_withConfig(t *testing.T) { cortex_overrides{limit_name="max_total_label_value_length_for_unoptimized_regex",user="tenant-a"} 0 cortex_overrides{limit_name="native_histogram_ingestion_burst_size",user="tenant-a"} 0 cortex_overrides{limit_name="native_histogram_ingestion_rate",user="tenant-a"} 1.7976931348623157e+308 + cortex_overrides{limit_name="out_of_order_results_cache_ttl",user="tenant-a"} 0 cortex_overrides{limit_name="out_of_order_time_window",user="tenant-a"} 0 cortex_overrides{limit_name="parquet_converter_enabled",user="tenant-a"} 0 cortex_overrides{limit_name="parquet_converter_tenant_shard_size",user="tenant-a"} 0 @@ -106,6 +107,7 @@ func TestOverridesExporter_withConfig(t *testing.T) { cortex_overrides{limit_name="query_vertical_shard_size",user="tenant-a"} 0 cortex_overrides{limit_name="reject_old_samples",user="tenant-a"} 0 cortex_overrides{limit_name="reject_old_samples_max_age",user="tenant-a"} 1.2096e+06 + cortex_overrides{limit_name="results_cache_ttl",user="tenant-a"} 0 cortex_overrides{limit_name="ruler_evaluation_delay_duration",user="tenant-a"} 0 cortex_overrides{limit_name="ruler_max_rule_groups_per_tenant",user="tenant-a"} 0 cortex_overrides{limit_name="ruler_max_rules_per_rule_group",user="tenant-a"} 0 diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 2f14b5cab8c..73f09fe3407 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -191,6 +191,8 @@ type Limits struct { MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"` MaxQueryResponseSize int64 `yaml:"max_query_response_size" json:"max_query_response_size"` MaxCacheFreshness model.Duration `yaml:"max_cache_freshness" json:"max_cache_freshness"` + ResultsCacheTTL model.Duration `yaml:"results_cache_ttl" json:"results_cache_ttl"` + OutOfOrderResultsCacheTTL model.Duration `yaml:"out_of_order_results_cache_ttl" json:"out_of_order_results_cache_ttl"` MaxQueriersPerTenant float64 `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"` QueryVerticalShardSize int `yaml:"query_vertical_shard_size" json:"query_vertical_shard_size"` QueryPartialData bool `yaml:"query_partial_data" json:"query_partial_data" doc:"nocli|description=Enable to allow queries to be evaluated with data from a single zone, if other zones are not available.|default=false"` @@ -316,6 +318,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { _ = l.MaxCacheFreshness.Set("1m") f.Int64Var(&l.MaxQueryResponseSize, "frontend.max-query-response-size", 0, "The maximum total uncompressed query response size. If the query was sharded the limit is applied to the total response size of all shards. This limit is enforced in query-frontend for `query` and `query_range` APIs. 0 to disable.") f.Var(&l.MaxCacheFreshness, "frontend.max-cache-freshness", "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.") + // ResultsCacheTTL and OutOfOrderResultsCacheTTL default to 0 (use global cache config expiration) + f.Var(&l.ResultsCacheTTL, "frontend.results-cache-ttl", "Per-tenant TTL for cached query results in the cache backend (Memcached/Redis/FIFO). This is the standard TTL for results that do not overlap with the out-of-order time window. 0 (default) means use the global cache backend TTL configuration.") + f.Var(&l.OutOfOrderResultsCacheTTL, "frontend.out-of-order-results-cache-ttl", "Per-tenant TTL for cached query results that overlap with the out-of-order time window. These results may still receive out-of-order samples, so they typically use a shorter TTL. 0 (default) means use the global cache backend TTL configuration.") f.Float64Var(&l.MaxQueriersPerTenant, "frontend.max-queriers-per-tenant", 0, "Maximum number of queriers that can handle requests for a single tenant. If set to 0 or value higher than number of available queriers, *all* queriers will handle requests for the tenant. If the value is < 1, it will be treated as a percentage and the gets a percentage of the total queriers. Each frontend (or query-scheduler, if used) will select the same set of queriers for the same tenant (given that all queriers are connected to all frontends / query-schedulers). This option only works with queriers connecting to the query-frontend / query-scheduler, not when using downstream URL.") f.IntVar(&l.QueryVerticalShardSize, "frontend.query-vertical-shard-size", 0, "[Experimental] Number of shards to use when distributing shardable PromQL queries.") f.BoolVar(&l.QueryPriority.Enabled, "frontend.query-priority.enabled", false, "Whether queries are assigned with priorities.") @@ -825,6 +830,18 @@ func (o *Overrides) MaxCacheFreshness(userID string) time.Duration { return time.Duration(o.GetOverridesForUser(userID).MaxCacheFreshness) } +// ResultsCacheTTL returns the standard TTL for cached query results. +// Returns 0 if not configured, meaning use global backend TTL. +func (o *Overrides) ResultsCacheTTL(userID string) time.Duration { + return time.Duration(o.GetOverridesForUser(userID).ResultsCacheTTL) +} + +// OutOfOrderResultsCacheTTL returns the TTL for cached results that may contain out-of-order samples. +// Returns 0 if not configured, meaning use global backend TTL. +func (o *Overrides) OutOfOrderResultsCacheTTL(userID string) time.Duration { + return time.Duration(o.GetOverridesForUser(userID).OutOfOrderResultsCacheTTL) +} + // MaxQueriersPerUser returns the maximum number of queriers that can handle requests for this user. func (o *Overrides) MaxQueriersPerUser(userID string) float64 { return o.GetOverridesForUser(userID).MaxQueriersPerTenant From 9d125d86b84d70ff835227af095c723a1f3fddab Mon Sep 17 00:00:00 2001 From: Shvejan Mutheboyina Date: Tue, 17 Mar 2026 04:59:59 +0000 Subject: [PATCH 2/3] adding docs for per-tenant cache ttl Signed-off-by: Shvejan Mutheboyina --- docs/configuration/config-file-reference.md | 14 ++++++++++ .../tripperware/queryrange/limits_test.go | 14 +++++----- .../queryrange/results_cache_test.go | 28 +++++++++---------- 3 files changed, 35 insertions(+), 21 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index bb392be1e9b..b2bd78549c0 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4246,6 +4246,20 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -frontend.max-cache-freshness [max_cache_freshness: | default = 1m] +# Per-tenant TTL for cached query results in the cache backend +# (Memcached/Redis/FIFO). This is the standard TTL for results that do not +# overlap with the out-of-order time window. 0 (default) means use the global +# cache backend TTL configuration. +# CLI flag: -frontend.results-cache-ttl +[results_cache_ttl: | default = 0s] + +# Per-tenant TTL for cached query results that overlap with the out-of-order +# time window. These results may still receive out-of-order samples, so they +# typically use a shorter TTL. 0 (default) means use the global cache backend +# TTL configuration. +# CLI flag: -frontend.out-of-order-results-cache-ttl +[out_of_order_results_cache_ttl: | default = 0s] + # Maximum number of queriers that can handle requests for a single tenant. If # set to 0 or value higher than number of available queriers, *all* queriers # will handle requests for the tenant. If the value is < 1, it will be treated diff --git a/pkg/querier/tripperware/queryrange/limits_test.go b/pkg/querier/tripperware/queryrange/limits_test.go index a4469f2aafe..687a3a1359d 100644 --- a/pkg/querier/tripperware/queryrange/limits_test.go +++ b/pkg/querier/tripperware/queryrange/limits_test.go @@ -232,14 +232,14 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) { } type mockLimits struct { - maxQueryLookback time.Duration - maxQueryLength time.Duration - maxCacheFreshness time.Duration - maxQueryResponseSize int64 - queryVerticalShardSize int - resultsCacheTTL time.Duration + maxQueryLookback time.Duration + maxQueryLength time.Duration + maxCacheFreshness time.Duration + maxQueryResponseSize int64 + queryVerticalShardSize int + resultsCacheTTL time.Duration outOfOrderResultsCacheTTL time.Duration - outOfOrderWindow time.Duration + outOfOrderWindow time.Duration } func (m mockLimits) MaxQueryLookback(string) time.Duration { diff --git a/pkg/querier/tripperware/queryrange/results_cache_test.go b/pkg/querier/tripperware/queryrange/results_cache_test.go index 82b831847d7..fd4a0705f1d 100644 --- a/pkg/querier/tripperware/queryrange/results_cache_test.go +++ b/pkg/querier/tripperware/queryrange/results_cache_test.go @@ -1866,12 +1866,12 @@ func TestResultsCachePutTTLSelection(t *testing.T) { twoHoursAgo := now.Add(-2 * time.Hour).UnixMilli() tests := []struct { - name string - extents []tripperware.Extent - resultsCacheTTL time.Duration - outOfOrderCacheTTL time.Duration - outOfOrderWindow time.Duration - expectedTTL time.Duration + name string + extents []tripperware.Extent + resultsCacheTTL time.Duration + outOfOrderCacheTTL time.Duration + outOfOrderWindow time.Duration + expectedTTL time.Duration }{ { name: "old data uses results_cache_ttl", @@ -1919,9 +1919,9 @@ func TestResultsCachePutTTLSelection(t *testing.T) { t.Run(tc.name, func(t *testing.T) { mockCache := cache.NewMockCache() limits := mockLimits{ - resultsCacheTTL: tc.resultsCacheTTL, + resultsCacheTTL: tc.resultsCacheTTL, outOfOrderResultsCacheTTL: tc.outOfOrderCacheTTL, - outOfOrderWindow: tc.outOfOrderWindow, + outOfOrderWindow: tc.outOfOrderWindow, } cfg := ResultsCacheConfig{ @@ -1958,9 +1958,9 @@ func TestResultsCacheWithPerTenantTTL(t *testing.T) { // Configure per-tenant TTLs limits := mockLimits{ - resultsCacheTTL: 24 * time.Hour, + resultsCacheTTL: 24 * time.Hour, outOfOrderResultsCacheTTL: 5 * time.Minute, - outOfOrderWindow: 1 * time.Hour, + outOfOrderWindow: 1 * time.Hour, } cfg := ResultsCacheConfig{ @@ -1991,8 +1991,8 @@ func TestResultsCacheWithPerTenantTTL(t *testing.T) { // Test 1: Query old data (> 1 hour ago) - should use long TTL now := time.Now() - oldStart := now.Add(-3 * time.Hour).UnixNano() / 1e6 - oldEnd := now.Add(-2 * time.Hour).UnixNano() / 1e6 + oldStart := now.Add(-3*time.Hour).UnixNano() / 1e6 + oldEnd := now.Add(-2*time.Hour).UnixNano() / 1e6 oldReq := parsedRequest.WithStartEnd(oldStart, oldEnd) _, err = rc.Do(ctx, oldReq) @@ -2004,8 +2004,8 @@ func TestResultsCacheWithPerTenantTTL(t *testing.T) { assert.Equal(t, 24*time.Hour, lastTTL, "old data should use long TTL") // Test 2: Query recent data (overlaps with out-of-order window) - should use short TTL - recentStart := now.Add(-2 * time.Hour).UnixNano() / 1e6 - recentEnd := now.Add(-30 * time.Minute).UnixNano() / 1e6 // 30 min ago, within 1h window + recentStart := now.Add(-2*time.Hour).UnixNano() / 1e6 + recentEnd := now.Add(-30*time.Minute).UnixNano() / 1e6 // 30 min ago, within 1h window recentReq := parsedRequest.WithStartEnd(recentStart, recentEnd) _, err = rc.Do(ctx, recentReq) From 2563dc63ee0f6429b2529fa003373f0bc0e29fa1 Mon Sep 17 00:00:00 2001 From: Shvejan Mutheboyina Date: Tue, 17 Mar 2026 06:26:26 +0000 Subject: [PATCH 3/3] fixup! per tenant cache ttl Signed-off-by: Shvejan Mutheboyina --- CHANGELOG.md | 2 +- .../tripperware/queryrange/results_cache.go | 15 +++++---------- .../tripperware/queryrange/results_cache_test.go | 13 ++++++++----- 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bb174e4ec42..9f6676a6fb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## master / unreleased +* [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #5039 ## 1.21.0 in progress @@ -60,7 +61,6 @@ * [ENHANCEMENT] Distributor: Add dimension `nhcb` to keep track of nhcb samples in `cortex_distributor_received_samples_total` and `cortex_distributor_samples_in_total` metrics. * [ENHANCEMENT] Distributor: Add `-distributor.accept-unknown-remote-write-content-type` flag. When enabled, requests with unknown or invalid Content-Type header are treated as remote write v1 instead of returning 415 Unsupported Media Type. Default is false. #7293 * [ENHANCEMENT] Ingester: Added `cortex_ingester_ingested_histogram_buckets` metric to track number of histogram buckets ingested per user. #7297 -* [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #5039 * [BUGFIX] Distributor: Add bounds checking for symbol references in Remote Write V2 requests to prevent panics when UnitRef or HelpRef exceed the symbols array length. #7290 * [BUGFIX] Distributor: If remote write v2 is disabled, explicitly return HTTP 415 (Unsupported Media Type) for Remote Write V2 requests instead of attempting to parse them as V1. #7238 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088 diff --git a/pkg/querier/tripperware/queryrange/results_cache.go b/pkg/querier/tripperware/queryrange/results_cache.go index 66b04bcbe78..6506edfa6ef 100644 --- a/pkg/querier/tripperware/queryrange/results_cache.go +++ b/pkg/querier/tripperware/queryrange/results_cache.go @@ -254,7 +254,7 @@ func (s resultsCache) Do(ctx context.Context, r tripperware.Request) (tripperwar return s.next.Do(ctx, r) } - ttl := s.getTTLForExtents(ctx, []tripperware.Extent{{Start: r.GetStart(), End: r.GetEnd()}}) + ttl := s.getTTLForExtents(tenantIDs, []tripperware.Extent{{Start: r.GetStart(), End: r.GetEnd()}}) cached, ok := s.get(ctx, key, ttl) if ok { response, extents, err = s.handleHit(ctx, r, cached, maxCacheTime) @@ -282,7 +282,7 @@ func (s resultsCache) Do(ctx context.Context, r tripperware.Request) (tripperwar } extents[i].Response = any } - s.put(ctx, key, extents) + s.put(ctx, key, extents, tenantIDs) } if err == nil && !respWithStats { @@ -761,12 +761,7 @@ func (s resultsCache) get(ctx context.Context, key string, ttl time.Duration) ([ // getTTLForExtents calculates the appropriate TTL for given extents based on whether // they overlap with the out-of-order time window. -func (s resultsCache) getTTLForExtents(ctx context.Context, extents []tripperware.Extent) time.Duration { - tenantIDs, err := users.TenantIDs(ctx) - if err != nil { - tenantIDs = nil - } - +func (s resultsCache) getTTLForExtents(tenantIDs []string, extents []tripperware.Extent) time.Duration { var resultsCacheTTL, outOfOrderCacheTTL time.Duration if len(tenantIDs) > 0 { // Use smallest non-zero TTL to respect the most restrictive tenant's cache policy @@ -806,8 +801,8 @@ func (s resultsCache) extentsOverlapOutOfOrderWindow(extents []tripperware.Exten return false } -func (s resultsCache) put(ctx context.Context, key string, extents []tripperware.Extent) { - ttl := s.getTTLForExtents(ctx, extents) +func (s resultsCache) put(ctx context.Context, key string, extents []tripperware.Extent, tenantIDs []string) { + ttl := s.getTTLForExtents(tenantIDs, extents) buf, err := proto.Marshal(&tripperware.CachedResponse{ Key: key, Extents: extents, diff --git a/pkg/querier/tripperware/queryrange/results_cache_test.go b/pkg/querier/tripperware/queryrange/results_cache_test.go index fd4a0705f1d..1b1955bb121 100644 --- a/pkg/querier/tripperware/queryrange/results_cache_test.go +++ b/pkg/querier/tripperware/queryrange/results_cache_test.go @@ -1398,7 +1398,8 @@ func TestResultsCacheMaxFreshness(t *testing.T) { // fill cache key := splitter(day).GenerateCacheKey(ctx, "1", req) - rc.(*resultsCache).put(ctx, key, []tripperware.Extent{mkExtent(int64(modelNow)-(600*1e3), int64(modelNow))}) + tenantIDs, _ := users.TenantIDs(ctx) + rc.(*resultsCache).put(ctx, key, []tripperware.Extent{mkExtent(int64(modelNow)-(600*1e3), int64(modelNow))}, tenantIDs) resp, err := rc.Do(ctx, req) require.NoError(t, err) @@ -1427,19 +1428,20 @@ func Test_resultsCache_MissingData(t *testing.T) { require.NoError(t, err) rc := rm.Wrap(nil).(*resultsCache) ctx := context.Background() + tenantIDs, _ := users.TenantIDs(ctx) // fill up the cache rc.put(ctx, "empty", []tripperware.Extent{{ Start: 100, End: 200, Response: nil, - }}) - rc.put(ctx, "notempty", []tripperware.Extent{mkExtent(100, 120)}) + }}, tenantIDs) + rc.put(ctx, "notempty", []tripperware.Extent{mkExtent(100, 120)}, tenantIDs) rc.put(ctx, "mixed", []tripperware.Extent{mkExtent(100, 120), { Start: 120, End: 200, Response: nil, - }}) + }}, tenantIDs) extents, hit := rc.get(ctx, "empty", 0) require.Empty(t, extents) @@ -1943,7 +1945,8 @@ func TestResultsCachePutTTLSelection(t *testing.T) { rc := rm.Wrap(nil).(*resultsCache) ctx := user.InjectOrgID(context.Background(), "tenant-a") - rc.put(ctx, "test-key", tc.extents) + tenantIDs, _ := users.TenantIDs(ctx) + rc.put(ctx, "test-key", tc.extents, tenantIDs) assert.Equal(t, tc.expectedTTL, mockCache.(*cache.MockCache).GetLastTTL()) })