diff --git a/CHANGELOG.md b/CHANGELOG.md index c539eb4973..8f25f5e58a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ * [BUGFIX] Security: Fix stored XSS vulnerability in Alertmanager and Store Gateway status pages by replacing `text/template` with `html/template`. #7512 * [BUGFIX] Security: Limit decompressed gzip output in `ParseProtoReader` and OTLP ingestion path. The decompressed body is now capped by `-distributor.otlp-max-recv-msg-size`. #7515 * [BUGFIX] Tenant Federation: Fix regex resolver clearing known users list when user scan fails. #7534 +* [BUGFIX] Ingester: Fix inflight query counter leak when resource-based query protection rejects a request. #7539 ## 1.21.0 2026-04-24 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 6116a31899..d114780604 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -2515,8 +2515,6 @@ func (i *Ingester) trackInflightQueryRequest() (func(), error) { } } - i.maxInflightQueryRequests.Track(i.inflightQueryRequests.Inc()) - if i.resourceBasedLimiter != nil { if err := i.resourceBasedLimiter.AcceptNewRequest(); err != nil { level.Warn(i.logger).Log("msg", "failed to accept request", "err", err) @@ -2524,6 +2522,8 @@ func (i *Ingester) trackInflightQueryRequest() (func(), error) { } } + i.maxInflightQueryRequests.Track(i.inflightQueryRequests.Inc()) + return func() { i.inflightQueryRequests.Dec() }, nil diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 5dd0d1ec36..c7f88246cd 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -3533,7 +3533,9 @@ func Test_Ingester_Query_ResourceThresholdBreached(t *testing.T) { {labels.FromStrings("__name__", "test_1", "route", "get_user", "status", "200"), 1, 100000}, } - i, err := prepareIngesterWithBlocksStorage(t, defaultIngesterTestConfig(t), prometheus.NewRegistry()) + cfg := defaultIngesterTestConfig(t) + cfg.DefaultLimits.MaxInflightQueryRequests = 1 + i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -3542,7 +3544,8 @@ func Test_Ingester_Query_ResourceThresholdBreached(t *testing.T) { resource.CPU: 0.5, resource.Heap: 0.5, } - i.resourceBasedLimiter, err = limiter.NewResourceBasedLimiter(&mockResourceMonitor{cpu: 0.4, heap: 0.6}, limits, nil, "ingester") + monitor := &mockResourceMonitor{cpu: 0.4, heap: 0.6} + i.resourceBasedLimiter, err = limiter.NewResourceBasedLimiter(monitor, limits, nil, "ingester") require.NoError(t, err) // Wait until it's ACTIVE @@ -3566,6 +3569,12 @@ func Test_Ingester_Query_ResourceThresholdBreached(t *testing.T) { // Expected error from isRetryableError in blocks_store_queryable.go require.ErrorIs(t, err, limiter.ErrResourceLimitReached) + require.Equal(t, int64(0), i.inflightQueryRequests.Load()) + + // Verify that a query not blocked by the limiter still succeeds after the rejected request. + monitor.heap = 0.4 + s = &mockQueryStreamServer{ctx: ctx} + require.NoError(t, i.QueryStream(rreq, s)) } func TestIngester_LabelValues_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) {