Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## master / unreleased
* [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #5039

## 1.21.0 in progress

Expand Down
14 changes: 14 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4246,6 +4246,20 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -frontend.max-cache-freshness
[max_cache_freshness: <duration> | default = 1m]

# Per-tenant TTL for cached query results in the cache backend
# (Memcached/Redis/FIFO). This is the standard TTL for results that do not
# overlap with the out-of-order time window. 0 (default) means use the global
# cache backend TTL configuration.
# CLI flag: -frontend.results-cache-ttl
[results_cache_ttl: <duration> | default = 0s]

# Per-tenant TTL for cached query results that overlap with the out-of-order
# time window. These results may still receive out-of-order samples, so they
# typically use a shorter TTL. 0 (default) means use the global cache backend
# TTL configuration.
# CLI flag: -frontend.out-of-order-results-cache-ttl
[out_of_order_results_cache_ttl: <duration> | default = 0s]

# Maximum number of queriers that can handle requests for a single tenant. If
# set to 0 or value higher than number of available queriers, *all* queriers
# will handle requests for the tenant. If the value is < 1, it will be treated
Expand Down
7 changes: 5 additions & 2 deletions pkg/chunk/cache/background.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"sync"
"time"

opentracing "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
Expand Down Expand Up @@ -38,6 +39,7 @@ type backgroundCache struct {
type backgroundWrite struct {
keys []string
bufs [][]byte
ttl time.Duration
}

// NewBackground returns a new Cache that does stores on background goroutines.
Expand Down Expand Up @@ -81,13 +83,14 @@ func (c *backgroundCache) Stop() {
const keysPerBatch = 100

// Store writes keys for the cache in the background.
func (c *backgroundCache) Store(ctx context.Context, keys []string, bufs [][]byte) {
func (c *backgroundCache) Store(ctx context.Context, keys []string, bufs [][]byte, ttl time.Duration) {
for len(keys) > 0 {
num := min(keysPerBatch, len(keys))

bgWrite := backgroundWrite{
keys: keys[:num],
bufs: bufs[:num],
ttl: ttl,
}
select {
case c.bgWrites <- bgWrite:
Expand Down Expand Up @@ -115,7 +118,7 @@ func (c *backgroundCache) writeBackLoop() {
return
}
c.queueLength.Sub(float64(len(bgWrite.keys)))
c.Cache.Store(context.Background(), bgWrite.keys, bgWrite.bufs)
c.Cache.Store(context.Background(), bgWrite.keys, bgWrite.bufs, bgWrite.ttl)

case <-c.quit:
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/chunk/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
// Whatsmore, we found partially successful Fetches were often treated as failed
// when they returned an error.
type Cache interface {
Store(ctx context.Context, key []string, buf [][]byte)
Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string)
Store(ctx context.Context, key []string, buf [][]byte, ttl time.Duration)
Fetch(ctx context.Context, keys []string, ttl time.Duration) (found []string, bufs [][]byte, missing []string)
Stop()
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/chunk/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func fillCache(t *testing.T, cache cache.Cache) ([]string, []chunkenc.Chunk) {
chunks = append(chunks, cleanChunk)
}

cache.Store(context.Background(), keys, bufs)
cache.Store(context.Background(), keys, bufs, 0)
return keys, chunks
}

Expand All @@ -45,7 +45,7 @@ func testCacheSingle(t *testing.T, cache cache.Cache, keys []string, chunks []ch
index := rand.Intn(len(keys))
key := keys[index]

found, bufs, missingKeys := cache.Fetch(context.Background(), []string{key})
found, bufs, missingKeys := cache.Fetch(context.Background(), []string{key}, 0)
require.Len(t, found, 1)
require.Len(t, bufs, 1)
require.Len(t, missingKeys, 0)
Expand All @@ -58,7 +58,7 @@ func testCacheSingle(t *testing.T, cache cache.Cache, keys []string, chunks []ch

func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks []chunkenc.Chunk) {
// test getting them all
found, bufs, missingKeys := cache.Fetch(context.Background(), keys)
found, bufs, missingKeys := cache.Fetch(context.Background(), keys, 0)
require.Len(t, found, len(keys))
require.Len(t, bufs, len(keys))
require.Len(t, missingKeys, 0)
Expand All @@ -75,7 +75,7 @@ func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks []
func testCacheMiss(t *testing.T, cache cache.Cache) {
for range 100 {
key := strconv.Itoa(rand.Int()) // arbitrary key which should fail: no chunk key is a single integer
found, bufs, missing := cache.Fetch(context.Background(), []string{key})
found, bufs, missing := cache.Fetch(context.Background(), []string{key}, 0)
require.Empty(t, found)
require.Empty(t, bufs)
require.Len(t, missing, 1)
Expand Down
18 changes: 13 additions & 5 deletions pkg/chunk/cache/fifo_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type cacheEntry struct {
updated time.Time
key string
value []byte
ttl time.Duration
}

// NewFifoCache returns a new initialised FifoCache of size.
Expand Down Expand Up @@ -178,7 +179,7 @@ func NewFifoCache(name string, cfg FifoCacheConfig, reg prometheus.Registerer, l
}

// Fetch implements Cache.
func (c *FifoCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string) {
func (c *FifoCache) Fetch(ctx context.Context, keys []string, ttl time.Duration) (found []string, bufs [][]byte, missing []string) {
found, missing, bufs = make([]string, 0, len(keys)), make([]string, 0, len(keys)), make([][]byte, 0, len(keys))
for _, key := range keys {
val, ok := c.Get(ctx, key)
Expand All @@ -194,14 +195,19 @@ func (c *FifoCache) Fetch(ctx context.Context, keys []string) (found []string, b
}

// Store implements Cache.
func (c *FifoCache) Store(ctx context.Context, keys []string, values [][]byte) {
func (c *FifoCache) Store(ctx context.Context, keys []string, values [][]byte, ttl time.Duration) {
// If TTL is 0, fall back to configured validity
if ttl == 0 {
ttl = c.validity
}

c.entriesAdded.Inc()

c.lock.Lock()
defer c.lock.Unlock()

for i := range keys {
c.put(keys[i], values[i])
c.put(keys[i], values[i], ttl)
}
}

Expand All @@ -220,7 +226,7 @@ func (c *FifoCache) Stop() {
c.memoryBytes.Set(float64(0))
}

func (c *FifoCache) put(key string, value []byte) {
func (c *FifoCache) put(key string, value []byte, ttl time.Duration) {
// See if we already have the item in the cache.
element, ok := c.entries[key]
if ok {
Expand All @@ -235,6 +241,7 @@ func (c *FifoCache) put(key string, value []byte) {
updated: time.Now(),
key: key,
value: value,
ttl: ttl,
}
entrySz := sizeOf(entry)

Expand Down Expand Up @@ -281,7 +288,8 @@ func (c *FifoCache) Get(ctx context.Context, key string) ([]byte, bool) {
element, ok := c.entries[key]
if ok {
entry := element.Value.(*cacheEntry)
if c.validity == 0 || time.Since(entry.updated) < c.validity {
// Use per-entry TTL (which may have been defaulted from c.validity)
if entry.ttl == 0 || time.Since(entry.updated) < entry.ttl {
return entry.value, true
}

Expand Down
53 changes: 49 additions & 4 deletions pkg/chunk/cache/fifo_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestFifoCacheEviction(t *testing.T) {
keys = append(keys, key)
values = append(values, value)
}
c.Store(ctx, keys, values)
c.Store(ctx, keys, values, 0)
require.Len(t, c.entries, cnt)

assert.Equal(t, testutil.ToFloat64(c.entriesAdded), float64(1))
Expand Down Expand Up @@ -93,7 +93,7 @@ func TestFifoCacheEviction(t *testing.T) {
keys = append(keys, key)
values = append(values, value)
}
c.Store(ctx, keys, values)
c.Store(ctx, keys, values, 0)
require.Len(t, c.entries, cnt)

assert.Equal(t, testutil.ToFloat64(c.entriesAdded), float64(2))
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestFifoCacheEviction(t *testing.T) {
copy(value, vstr)
values = append(values, value)
}
c.Store(ctx, keys, values)
c.Store(ctx, keys, values, 0)
require.Len(t, c.entries, cnt)

for i := cnt; i < cnt+evicted; i++ {
Expand Down Expand Up @@ -191,7 +191,8 @@ func TestFifoCacheExpiry(t *testing.T) {

c.Store(ctx,
[]string{key1, key2, key4, key3, key2, key1},
[][]byte{genBytes(16), []byte("dummy"), genBytes(20), data3, data2, data1})
[][]byte{genBytes(16), []byte("dummy"), genBytes(20), data3, data2, data1},
0)

value, ok := c.Get(ctx, key1)
require.True(t, ok)
Expand Down Expand Up @@ -239,6 +240,50 @@ func genBytes(n uint8) []byte {
return arr
}

func TestFifoCacheTTLFallback(t *testing.T) {
cfg := FifoCacheConfig{
MaxSizeItems: 10,
Validity: 100 * time.Millisecond, // Default TTL
}
c := NewFifoCache("test", cfg, nil, log.NewNopLogger())
ctx := context.Background()

// Test 1: TTL=0 should use configured Validity (100ms)
c.Store(ctx, []string{"key1"}, [][]byte{[]byte("value1")}, 0)

// Should exist immediately
value, ok := c.Get(ctx, "key1")
require.True(t, ok)
require.Equal(t, []byte("value1"), value)

// Should still exist at 50ms (half of 100ms)
time.Sleep(50 * time.Millisecond)
value, ok = c.Get(ctx, "key1")
require.True(t, ok)
require.Equal(t, []byte("value1"), value)

// Should expire after 100ms
time.Sleep(60 * time.Millisecond) // Total: 110ms
_, ok = c.Get(ctx, "key1")
require.False(t, ok, "entry should have expired after configured Validity")

// Test 2: Custom TTL should override configured Validity
customTTL := 20 * time.Millisecond
c.Store(ctx, []string{"key2"}, [][]byte{[]byte("value2")}, customTTL)

// Should exist immediately
value, ok = c.Get(ctx, "key2")
require.True(t, ok)
require.Equal(t, []byte("value2"), value)

// Should expire after custom TTL (20ms), not default (100ms)
time.Sleep(30 * time.Millisecond)
_, ok = c.Get(ctx, "key2")
require.False(t, ok, "entry should have expired after custom TTL")

c.Stop()
}

func TestBytesParsing(t *testing.T) {
tests := []struct {
input string
Expand Down
9 changes: 5 additions & 4 deletions pkg/chunk/cache/instrumented.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"context"
"time"

ot "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
Expand Down Expand Up @@ -64,7 +65,7 @@ type instrumentedCache struct {
requestDuration *instr.HistogramCollector
}

func (i *instrumentedCache) Store(ctx context.Context, keys []string, bufs [][]byte) {
func (i *instrumentedCache) Store(ctx context.Context, keys []string, bufs [][]byte, ttl time.Duration) {
for j := range bufs {
i.storedValueSize.Observe(float64(len(bufs[j])))
}
Expand All @@ -73,12 +74,12 @@ func (i *instrumentedCache) Store(ctx context.Context, keys []string, bufs [][]b
_ = instr.CollectedRequest(ctx, method, i.requestDuration, instr.ErrorCode, func(ctx context.Context) error {
sp := ot.SpanFromContext(ctx)
sp.LogFields(otlog.Int("keys", len(keys)))
i.Cache.Store(ctx, keys, bufs)
i.Cache.Store(ctx, keys, bufs, ttl)
return nil
})
}

func (i *instrumentedCache) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string) {
func (i *instrumentedCache) Fetch(ctx context.Context, keys []string, ttl time.Duration) ([]string, [][]byte, []string) {
var (
found []string
bufs [][]byte
Expand All @@ -90,7 +91,7 @@ func (i *instrumentedCache) Fetch(ctx context.Context, keys []string) ([]string,
sp := ot.SpanFromContext(ctx)
sp.LogFields(otlog.Int("keys requested", len(keys)))

found, bufs, missing = i.Cache.Fetch(ctx, keys)
found, bufs, missing = i.Cache.Fetch(ctx, keys, ttl)
sp.LogFields(otlog.Int("keys found", len(found)), otlog.Int("keys missing", len(keys)-len(found)))
return nil
})
Expand Down
12 changes: 9 additions & 3 deletions pkg/chunk/cache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func memcacheStatusCode(err error) string {
}

// Fetch gets keys from the cache. The keys that are found must be in the order of the keys requested.
func (c *Memcached) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string) {
func (c *Memcached) Fetch(ctx context.Context, keys []string, ttl time.Duration) (found []string, bufs [][]byte, missed []string) {
if c.cfg.BatchSize == 0 {
found, bufs, missed = c.fetch(ctx, keys)
return
Expand Down Expand Up @@ -233,13 +233,19 @@ loopResults:
}

// Store stores the key in the cache.
func (c *Memcached) Store(ctx context.Context, keys []string, bufs [][]byte) {
func (c *Memcached) Store(ctx context.Context, keys []string, bufs [][]byte, ttl time.Duration) {
// If TTL is 0, fall back to configured expiration
if ttl == 0 {
ttl = c.cfg.Expiration
}
expiration := int32(ttl.Seconds())

for i := range keys {
err := instr.CollectedRequest(ctx, "Memcache.Put", c.requestDuration, memcacheStatusCode, func(_ context.Context) error {
item := memcache.Item{
Key: keys[i],
Value: bufs[i],
Expiration: int32(c.cfg.Expiration.Seconds()),
Expiration: expiration,
}
return c.memcache.Set(&item)
})
Expand Down
10 changes: 9 additions & 1 deletion pkg/chunk/cache/memcached_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (

type mockMemcache struct {
sync.RWMutex
contents map[string][]byte
contents map[string][]byte
lastExpiration int32
}

func newMockMemcache() *mockMemcache {
Expand All @@ -35,5 +36,12 @@ func (m *mockMemcache) Set(item *memcache.Item) error {
m.Lock()
defer m.Unlock()
m.contents[item.Key] = item.Value
m.lastExpiration = item.Expiration
return nil
}

func (m *mockMemcache) GetLastExpiration() int32 {
m.RLock()
defer m.RUnlock()
return m.lastExpiration
}
Loading
Loading