diff --git a/CHANGELOG.md b/CHANGELOG.md index c539eb4973..9229afdc5b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ * [BUGFIX] Security: Fix stored XSS vulnerability in Alertmanager and Store Gateway status pages by replacing `text/template` with `html/template`. #7512 * [BUGFIX] Security: Limit decompressed gzip output in `ParseProtoReader` and OTLP ingestion path. The decompressed body is now capped by `-distributor.otlp-max-recv-msg-size`. #7515 * [BUGFIX] Tenant Federation: Fix regex resolver clearing known users list when user scan fails. #7534 +* [BUGFIX] Distributor: Fix a panic (`slice bounds out of range`) in the stream push path when the context deadline expires while the worker goroutine is still marshalling a `WriteRequest`. #7541 ## 1.21.0 2026-04-24 diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 1744238dfd..8928ff4e2f 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -57,11 +57,10 @@ type HealthAndIngesterClient interface { } type streamWriteJob struct { - req *cortexpb.StreamWriteRequest - resp *cortexpb.WriteResponse - ctx context.Context - cancel context.CancelFunc - err error + req *cortexpb.StreamWriteRequest + resp *cortexpb.WriteResponse + err error + sendDone chan struct{} } type closableHealthAndIngesterClient struct { @@ -112,16 +111,12 @@ func (c *closableHealthAndIngesterClient) PushStreamConnection(ctx context.Conte Request: in, } - reqCtx, reqCancel := context.WithCancel(ctx) - defer reqCancel() - job := &streamWriteJob{ - req: streamReq, - ctx: reqCtx, - cancel: reqCancel, + req: streamReq, + sendDone: make(chan struct{}), } c.streamPushChan <- job - <-reqCtx.Done() + <-job.sendDone return job.resp, job.err }) } @@ -185,9 +180,11 @@ func (c *closableHealthAndIngesterClient) Close() error { if !ok { break drainingLoop } - if job != nil && job.cancel != nil { + if job != nil { job.err = errors.New("stream connection ingester client closing") - job.cancel() 
+ if job.sendDone != nil { + close(job.sendDone) + } } default: close(c.streamPushChan) @@ -239,18 +236,18 @@ func (c *closableHealthAndIngesterClient) worker(ctx context.Context) error { err = stream.Send(job.req) if err == io.EOF { job.resp = &cortexpb.WriteResponse{} - job.cancel() + close(job.sendDone) return } if err != nil { job.err = err - job.cancel() + close(job.sendDone) continue } resp, err := stream.Recv() if err == io.EOF { job.resp = &cortexpb.WriteResponse{} - job.cancel() + close(job.sendDone) return } job.resp = resp @@ -258,7 +255,7 @@ func (c *closableHealthAndIngesterClient) worker(ctx context.Context) error { if err == nil && job.resp.Code != http.StatusOK { job.err = httpgrpc.Errorf(int(job.resp.Code), "%s", job.resp.Message) } - job.cancel() + close(job.sendDone) } } }() diff --git a/pkg/ingester/client/client_test.go b/pkg/ingester/client/client_test.go index 4f8316147a..08520b9ae0 100644 --- a/pkg/ingester/client/client_test.go +++ b/pkg/ingester/client/client_test.go @@ -2,6 +2,7 @@ package client import ( "context" + "fmt" "net/http/httptest" "strconv" "testing" @@ -11,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" "google.golang.org/grpc" "github.com/cortexproject/cortex/pkg/cortexpb" @@ -155,10 +157,8 @@ func TestClosableHealthAndIngesterClient_Close_WithActiveStream(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) streamChan := make(chan *streamWriteJob, 1) - jobCtx, jobCancel := context.WithCancel(context.Background()) job := &streamWriteJob{ - ctx: jobCtx, - cancel: jobCancel, + sendDone: make(chan struct{}), } streamChan <- job @@ -178,6 +178,14 @@ func TestClosableHealthAndIngesterClient_Close_WithActiveStream(t *testing.T) { _, ok := <-client.streamPushChan assert.False(t, ok, "stream channel should be closed") + // Verify job.sendDone was closed by Close() + select { + case 
<-job.sendDone: + // Success - sendDone was closed + case <-time.After(100 * time.Millisecond): + t.Error("job.sendDone was not closed") + } + // Verify context is cancelled select { case <-client.streamCtx.Done(): @@ -191,21 +199,11 @@ func TestClosableHealthAndIngesterClient_Close_WithPendingJobs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) streamChan := make(chan *streamWriteJob, 2) - job1Cancelled := false - job2Cancelled := false + job1Done := make(chan struct{}) + job2Done := make(chan struct{}) - job1 := &streamWriteJob{ - ctx: context.Background(), - cancel: func() { - job1Cancelled = true - }, - } - job2 := &streamWriteJob{ - ctx: context.Background(), - cancel: func() { - job2Cancelled = true - }, - } + job1 := &streamWriteJob{sendDone: job1Done} + job2 := &streamWriteJob{sendDone: job2Done} streamChan <- job1 streamChan <- job2 @@ -230,9 +228,17 @@ func TestClosableHealthAndIngesterClient_Close_WithPendingJobs(t *testing.T) { t.Error("stream context was not cancelled") } - // Verify jobs were cancelled - assert.True(t, job1Cancelled, "job1 should have been cancelled") - assert.True(t, job2Cancelled, "job2 should have been cancelled") + // Verify jobs were closed (sendDone channels closed) + select { + case <-job1Done: + case <-time.After(500 * time.Millisecond): + t.Error("job1.sendDone was not closed") + } + select { + case <-job2Done: + case <-time.After(500 * time.Millisecond): + t.Error("job2.sendDone was not closed") + } } type mockClientStream struct { @@ -249,6 +255,120 @@ func (m *mockClientStream) Recv() (*cortexpb.WriteResponse, error) { return &cortexpb.WriteResponse{}, nil } +// slowSendStream simulates a slow gRPC stream. +// Send() pre-computes the buffer size (mirroring gRPC codec step 1), +// sleeps for sendDelay so the caller's context deadline can fire first, +// then calls MarshalToSizedBuffer (gRPC codec step 2). 
+type slowSendStream struct { + grpc.ClientStream + sendDelay time.Duration + panicCh chan any +} + +func (s *slowSendStream) Send(req *cortexpb.StreamWriteRequest) (retErr error) { + defer func() { + if r := recover(); r != nil { + s.panicCh <- r // forward the panic value to the test + } else { + s.panicCh <- nil + } + }() + + // gRPC codec pre-computes buffer size. + size := req.Size() + buf := make([]byte, size) + + // Sleep so the caller's ctx deadline fires while Send() is still in flight. + // Before the fix, PushStreamConnection returned at that point and the caller could grow the timeseries during this sleep. + time.Sleep(s.sendDelay) + + // Marshal into the pre-allocated buffer. + // Panics if the request grew after Size() was taken (the bug). + _, err := req.MarshalToSizedBuffer(buf) + return err +} + +func (s *slowSendStream) Recv() (*cortexpb.WriteResponse, error) { + return &cortexpb.WriteResponse{}, nil +} + +// TestPushStreamConnection_PanicWhenCtxExpiresAndTimeseriesGrows is an +// end-to-end regression test for the distributor panic. +func TestPushStreamConnection_PanicWhenCtxExpiresAndTimeseriesGrows(t *testing.T) { + const ( + // ctxDeadline mirrors distributor.remote-timeout. + // Kept short here so the test completes quickly. + ctxDeadline = 20 * time.Millisecond + // sendDelay must exceed ctxDeadline so the deadline fires while Send() + // is still sleeping between Size() and MarshalToSizedBuffer(). 
+ sendDelay = 200 * time.Millisecond + ) + + ts := cortexpb.TimeseriesFromPool() + ts.Labels = append(ts.Labels, + cortexpb.LabelAdapter{Name: "__name__", Value: "test_metric"}, + cortexpb.LabelAdapter{Name: "job", Value: "test"}, + ) + ts.Samples = append(ts.Samples, cortexpb.Sample{Value: 1.0, TimestampMs: 1000}) + + timeseries := cortexpb.PreallocTimeseriesSliceFromPool() + timeseries = append(timeseries, cortexpb.PreallocTimeseries{TimeSeries: ts}) + + writeReq := &cortexpb.WriteRequest{Timeseries: timeseries} + + panicCh := make(chan any, 1) + stream := &slowSendStream{ + sendDelay: sendDelay, + panicCh: panicCh, + } + + mockIng := &mockIngester{} + mockIng.On("PushStream", mock.Anything, mock.Anything).Return(stream, nil) + + streamCtx, streamCancel := context.WithCancel(context.Background()) + defer streamCancel() + + client := &closableHealthAndIngesterClient{ + IngesterClient: mockIng, + conn: &mockClientConn{}, + addr: "test-addr", + inflightPushRequests: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"ingester"}), + streamCtx: streamCtx, + streamCancel: streamCancel, + streamPushChan: make(chan *streamWriteJob, 1), + } + + workerCtx := user.InjectOrgID(streamCtx, "test-worker") + require.NoError(t, client.worker(workerCtx)) + + // Call PushStreamConnection with a context that expires before Send() finishes. + pushCtx, pushCancel := context.WithTimeout( + user.InjectOrgID(context.Background(), "test-tenant"), + ctxDeadline, + ) + defer pushCancel() + + // PushStreamConnection blocks until Send()+Recv() complete. + client.PushStreamConnection(pushCtx, writeReq) //nolint:errcheck + + for i := range 100 { + ts.Labels = append(ts.Labels, cortexpb.LabelAdapter{ + Name: fmt.Sprintf("extra_label_%d", i), + Value: fmt.Sprintf("extra_value_%d", i), + }) + } + + // No panic expected: Send() already completed before labels were appended. 
+ select { + case panicVal := <-panicCh: + require.Nil(t, panicVal, + "unexpected panic in MarshalToSizedBuffer: the fix should prevent "+ + "timeseries from being reused while Send() is still marshalling") + case <-time.After(sendDelay + time.Second): + t.Fatal("timed out waiting for Send() to complete") + } +} + func TestClosableHealthAndIngesterClient_ShouldNotPanicWhenClose(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) streamChan := make(chan *streamWriteJob)