Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
* [BUGFIX] Security: Fix stored XSS vulnerability in Alertmanager and Store Gateway status pages by replacing `text/template` with `html/template`. #7512
* [BUGFIX] Security: Limit decompressed gzip output in `ParseProtoReader` and OTLP ingestion path. The decompressed body is now capped by `-distributor.otlp-max-recv-msg-size`. #7515
* [BUGFIX] Tenant Federation: Fix regex resolver clearing known users list when user scan fails. #7534
* [BUGFIX] Distributor: Fix a panic (`slice bounds out of range`) in the stream push path when the context deadline expires while the worker goroutine is still marshalling a `WriteRequest`. #7541

## 1.21.0 2026-04-24

Expand Down
33 changes: 15 additions & 18 deletions pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ type HealthAndIngesterClient interface {
}

type streamWriteJob struct {
req *cortexpb.StreamWriteRequest
resp *cortexpb.WriteResponse
ctx context.Context
cancel context.CancelFunc
err error
req *cortexpb.StreamWriteRequest
resp *cortexpb.WriteResponse
err error
sendDone chan struct{}
}

type closableHealthAndIngesterClient struct {
Expand Down Expand Up @@ -112,16 +111,12 @@ func (c *closableHealthAndIngesterClient) PushStreamConnection(ctx context.Conte
Request: in,
}

reqCtx, reqCancel := context.WithCancel(ctx)
defer reqCancel()

job := &streamWriteJob{
req: streamReq,
ctx: reqCtx,
cancel: reqCancel,
req: streamReq,
sendDone: make(chan struct{}),
}
c.streamPushChan <- job
<-reqCtx.Done()
<-job.sendDone
return job.resp, job.err
})
}
Expand Down Expand Up @@ -185,9 +180,11 @@ func (c *closableHealthAndIngesterClient) Close() error {
if !ok {
break drainingLoop
}
if job != nil && job.cancel != nil {
if job != nil {
job.err = errors.New("stream connection ingester client closing")
job.cancel()
if job.sendDone != nil {
close(job.sendDone)
}
}
default:
close(c.streamPushChan)
Expand Down Expand Up @@ -239,26 +236,26 @@ func (c *closableHealthAndIngesterClient) worker(ctx context.Context) error {
err = stream.Send(job.req)
if err == io.EOF {
job.resp = &cortexpb.WriteResponse{}
job.cancel()
close(job.sendDone)
return
}
if err != nil {
job.err = err
job.cancel()
close(job.sendDone)
continue
}
resp, err := stream.Recv()
if err == io.EOF {
job.resp = &cortexpb.WriteResponse{}
job.cancel()
close(job.sendDone)
return
}
job.resp = resp
job.err = err
if err == nil && job.resp.Code != http.StatusOK {
job.err = httpgrpc.Errorf(int(job.resp.Code), "%s", job.resp.Message)
}
job.cancel()
close(job.sendDone)
}
}
}()
Expand Down
160 changes: 140 additions & 20 deletions pkg/ingester/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"fmt"
"net/http/httptest"
"strconv"
"testing"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/cortexpb"
Expand Down Expand Up @@ -155,10 +157,8 @@ func TestClosableHealthAndIngesterClient_Close_WithActiveStream(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
streamChan := make(chan *streamWriteJob, 1)

jobCtx, jobCancel := context.WithCancel(context.Background())
job := &streamWriteJob{
ctx: jobCtx,
cancel: jobCancel,
sendDone: make(chan struct{}),
}
streamChan <- job

Expand All @@ -178,6 +178,14 @@ func TestClosableHealthAndIngesterClient_Close_WithActiveStream(t *testing.T) {
_, ok := <-client.streamPushChan
assert.False(t, ok, "stream channel should be closed")

// Verify job.sendDone was closed by Close()
select {
case <-job.sendDone:
// Success - sendDone was closed
case <-time.After(100 * time.Millisecond):
t.Error("job.sendDone was not closed")
}

// Verify context is cancelled
select {
case <-client.streamCtx.Done():
Expand All @@ -191,21 +199,11 @@ func TestClosableHealthAndIngesterClient_Close_WithPendingJobs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
streamChan := make(chan *streamWriteJob, 2)

job1Cancelled := false
job2Cancelled := false
job1Done := make(chan struct{})
job2Done := make(chan struct{})

job1 := &streamWriteJob{
ctx: context.Background(),
cancel: func() {
job1Cancelled = true
},
}
job2 := &streamWriteJob{
ctx: context.Background(),
cancel: func() {
job2Cancelled = true
},
}
job1 := &streamWriteJob{sendDone: job1Done}
job2 := &streamWriteJob{sendDone: job2Done}
streamChan <- job1
streamChan <- job2

Expand All @@ -230,9 +228,17 @@ func TestClosableHealthAndIngesterClient_Close_WithPendingJobs(t *testing.T) {
t.Error("stream context was not cancelled")
}

// Verify jobs were cancelled
assert.True(t, job1Cancelled, "job1 should have been cancelled")
assert.True(t, job2Cancelled, "job2 should have been cancelled")
// Verify jobs were closed (sendDone channels closed)
select {
case <-job1Done:
case <-time.After(500 * time.Millisecond):
t.Error("job1.sendDone was not closed")
}
select {
case <-job2Done:
case <-time.After(500 * time.Millisecond):
t.Error("job2.sendDone was not closed")
}
}

type mockClientStream struct {
Expand All @@ -249,6 +255,120 @@ func (m *mockClientStream) Recv() (*cortexpb.WriteResponse, error) {
return &cortexpb.WriteResponse{}, nil
}

// slowSendStream is a test double that simulates a slow gRPC stream.
// Send() pre-computes the buffer size (mirroring gRPC codec step 1),
// sleeps for sendDelay so the caller's context deadline can fire first,
// then calls MarshalToSizedBuffer (gRPC codec step 2).
// Each Send() call reports its outcome on panicCh: the recovered panic
// value if marshalling panicked, or nil otherwise.
type slowSendStream struct {
// Embedded interface; only Send and Recv are overridden by this type.
grpc.ClientStream
// sendDelay is how long Send sleeps between sizing the buffer and marshalling into it.
sendDelay time.Duration
// panicCh receives one value per Send call: the recovered panic value, or nil.
panicCh chan any
}

// Send imitates the two-phase marshalling done for a stream message: it sizes
// a buffer from the request, waits for sendDelay, and only then marshals the
// request into that buffer. Exactly one value is written to panicCh per call:
// the recovered panic value when marshalling panicked, or nil when it did not.
// A recovered panic leaves the returned error nil.
func (s *slowSendStream) Send(req *cortexpb.StreamWriteRequest) (retErr error) {
	defer func() {
		// recover yields nil when nothing panicked, so one send covers both the
		// "panicked" and the "completed normally" outcomes.
		r := recover()
		s.panicCh <- r
	}()

	// Phase 1: allocate a buffer sized for the request as it looks right now.
	buf := make([]byte, req.Size())

	// Hold the request long enough for the caller's deadline to expire; if the
	// caller returns early it may mutate (grow) the timeseries in the meantime.
	time.Sleep(s.sendDelay)

	// Phase 2: marshal into the pre-sized buffer. This panics if the request
	// has grown beyond the size computed in phase 1.
	_, retErr = req.MarshalToSizedBuffer(buf)
	return retErr
}

// Recv always succeeds immediately with an empty WriteResponse.
func (s *slowSendStream) Recv() (*cortexpb.WriteResponse, error) {
	resp := new(cortexpb.WriteResponse)
	return resp, nil
}

// TestPushStreamConnection_PanicWhenCtxExpiresAndTimeseriesGrows is an
// end-to-end regression test for the distributor panic.
//
// It starts the stream worker against a slowSendStream, pushes a request whose
// context deadline (ctxDeadline) is shorter than the stream's Send delay
// (sendDelay), appends extra labels to the timeseries once
// PushStreamConnection returns, and then asserts that Send reported no panic
// on panicCh.
func TestPushStreamConnection_PanicWhenCtxExpiresAndTimeseriesGrows(t *testing.T) {
const (
// ctxDeadline mirrors distributor.remote-timeout.
// Kept short here so the test completes quickly.
ctxDeadline = 20 * time.Millisecond
// sendDelay must exceed ctxDeadline so the deadline fires while Send()
// is still sleeping between Size() and MarshalToSizedBuffer().
sendDelay = 200 * time.Millisecond
)

// Build a single-series write request: two labels and one sample.
ts := cortexpb.TimeseriesFromPool()
ts.Labels = append(ts.Labels,
cortexpb.LabelAdapter{Name: "__name__", Value: "test_metric"},
cortexpb.LabelAdapter{Name: "job", Value: "test"},
)
ts.Samples = append(ts.Samples, cortexpb.Sample{Value: 1.0, TimestampMs: 1000})

timeseries := cortexpb.PreallocTimeseriesSliceFromPool()
timeseries = append(timeseries, cortexpb.PreallocTimeseries{TimeSeries: ts})

writeReq := &cortexpb.WriteRequest{Timeseries: timeseries}

// Capacity 1 lets Send's deferred report be delivered without a receiver
// already waiting on the channel.
panicCh := make(chan any, 1)
stream := &slowSendStream{
sendDelay: sendDelay,
panicCh: panicCh,
}

// The mocked ingester hands the slow stream to whoever calls PushStream.
mockIng := &mockIngester{}
mockIng.On("PushStream", mock.Anything, mock.Anything).Return(stream, nil)

streamCtx, streamCancel := context.WithCancel(context.Background())
defer streamCancel()

client := &closableHealthAndIngesterClient{
IngesterClient: mockIng,
conn: &mockClientConn{},
addr: "test-addr",
inflightPushRequests: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"ingester"}),
streamCtx: streamCtx,
streamCancel: streamCancel,
streamPushChan: make(chan *streamWriteJob, 1),
}

// Start the stream worker; it is stopped via the deferred streamCancel above.
workerCtx := user.InjectOrgID(streamCtx, "test-worker")
require.NoError(t, client.worker(workerCtx))

// Call PushStreamConnection with a context that expires before Send() finishes.
pushCtx, pushCancel := context.WithTimeout(
user.InjectOrgID(context.Background(), "test-tenant"),
ctxDeadline,
)
defer pushCancel()

// PushStreamConnection blocks until Send()+Recv() complete.
// The returned response and error are deliberately ignored: the assertion
// below is on the panic report, not on the push result.
client.PushStreamConnection(pushCtx, writeReq) //nolint:errcheck

// Grow the series after the push has returned. If Send() were still between
// Size() and MarshalToSizedBuffer() at this point, the request would no
// longer fit the buffer it sized earlier.
for i := range 100 {
ts.Labels = append(ts.Labels, cortexpb.LabelAdapter{
Name: fmt.Sprintf("extra_label_%d", i),
Value: fmt.Sprintf("extra_value_%d", i),
})
}

// No panic expected: Send() already completed before labels were appended.
// The timeout is sendDelay plus one second of slack.
select {
case panicVal := <-panicCh:
require.Nil(t, panicVal,
"unexpected panic in MarshalToSizedBuffer: the fix should prevent "+
"timeseries from being reused while Send() is still marshalling")
case <-time.After(sendDelay + time.Second):
t.Fatal("timed out waiting for Send() to complete")
}
}

func TestClosableHealthAndIngesterClient_ShouldNotPanicWhenClose(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
streamChan := make(chan *streamWriteJob)
Expand Down
Loading