From 92f71560bd54f703da86df3ff6bdd59bf68cd559 Mon Sep 17 00:00:00 2001 From: Sandy Chen Date: Fri, 22 May 2026 13:31:00 +0900 Subject: [PATCH 1/2] Fix ingester inflight query counter leak Signed-off-by: Sandy Chen --- CHANGELOG.md | 1 + pkg/ingester/ingester.go | 4 ++-- pkg/ingester/ingester_test.go | 9 ++++++++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c539eb49730..8f25f5e58af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ * [BUGFIX] Security: Fix stored XSS vulnerability in Alertmanager and Store Gateway status pages by replacing `text/template` with `html/template`. #7512 * [BUGFIX] Security: Limit decompressed gzip output in `ParseProtoReader` and OTLP ingestion path. The decompressed body is now capped by `-distributor.otlp-max-recv-msg-size`. #7515 * [BUGFIX] Tenant Federation: Fix regex resolver clearing known users list when user scan fails. #7534 +* [BUGFIX] Ingester: Fix inflight query counter leak when resource-based query protection rejects a request. #7539 ## 1.21.0 2026-04-24 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 6116a318992..d1147806043 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -2515,8 +2515,6 @@ func (i *Ingester) trackInflightQueryRequest() (func(), error) { } } - i.maxInflightQueryRequests.Track(i.inflightQueryRequests.Inc()) - if i.resourceBasedLimiter != nil { if err := i.resourceBasedLimiter.AcceptNewRequest(); err != nil { level.Warn(i.logger).Log("msg", "failed to accept request", "err", err) @@ -2524,6 +2522,8 @@ func (i *Ingester) trackInflightQueryRequest() (func(), error) { } } + i.maxInflightQueryRequests.Track(i.inflightQueryRequests.Inc()) + return func() { i.inflightQueryRequests.Dec() }, nil diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 5dd0d1ec36b..d0d4d2d2373 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -3533,7 +3533,9 @@ func Test_Ingester_Query_ResourceThresholdBreached(t *testing.T) { {labels.FromStrings("__name__", "test_1", "route", "get_user", "status", "200"), 1, 100000}, } - i, err := prepareIngesterWithBlocksStorage(t, defaultIngesterTestConfig(t), prometheus.NewRegistry()) + cfg := defaultIngesterTestConfig(t) + cfg.DefaultLimits.MaxInflightQueryRequests = 1 + i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -3566,6 +3568,11 @@ func Test_Ingester_Query_ResourceThresholdBreached(t *testing.T) { // Expected error from isRetryableError in blocks_store_queryable.go require.ErrorIs(t, err, limiter.ErrResourceLimitReached) + require.Equal(t, int64(0), i.inflightQueryRequests.Load()) + + i.resourceBasedLimiter = nil + s = &mockQueryStreamServer{ctx: ctx} + require.NoError(t, i.QueryStream(rreq, s)) } func TestIngester_LabelValues_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) { From b955dfa442a47f5222e26f4d5d61abd85d0a326c Mon Sep 17 00:00:00 2001 From: Sandy Chen Date: Fri, 22 May 2026 15:55:03 +0900 Subject: [PATCH 2/2] Address review feedback on inflight query leak fix - Add clarifying comment before the post-rejection QueryStream block, per inline review nit. - Replace `i.resourceBasedLimiter = nil` with mutating the underlying mock monitor's heap utilization, so the same live limiter admits the second query. The regression test now exercises the limiter machinery on the retry instead of bypassing it. Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Sandy Chen --- pkg/ingester/ingester_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index d0d4d2d2373..c7f88246cd5 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -3544,7 +3544,8 @@ func Test_Ingester_Query_ResourceThresholdBreached(t *testing.T) { resource.CPU: 0.5, resource.Heap: 0.5, } - i.resourceBasedLimiter, err = limiter.NewResourceBasedLimiter(&mockResourceMonitor{cpu: 0.4, heap: 0.6}, limits, nil, "ingester") + monitor := &mockResourceMonitor{cpu: 0.4, heap: 0.6} + i.resourceBasedLimiter, err = limiter.NewResourceBasedLimiter(monitor, limits, nil, "ingester") require.NoError(t, err) // Wait until it's ACTIVE @@ -3570,7 +3571,8 @@ func Test_Ingester_Query_ResourceThresholdBreached(t *testing.T) { require.ErrorIs(t, err, limiter.ErrResourceLimitReached) require.Equal(t, int64(0), i.inflightQueryRequests.Load()) - i.resourceBasedLimiter = nil + // Verify that a query not blocked by the limiter still succeeds after the rejected request. + monitor.heap = 0.4 s = &mockQueryStreamServer{ctx: ctx} require.NoError(t, i.QueryStream(rreq, s)) }