diff --git a/CHANGELOG.md b/CHANGELOG.md index 43f5dd18418..a55db1be965 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,6 +62,7 @@ * [BUGFIX] Distributor: Return remote write V2 stats headers properly when the request is HA deduplicated. #7240 * [BUGFIX] Cache: Fix Redis Cluster EXECABORT error in MSet by using individual SET commands instead of transactions for cluster mode. #7262 * [BUGFIX] Distributor: Fix an `index out of range` panic in PRW2.0 handler caused by dirty metadata when reusing requests from `sync.Pool`. #7299 +* [BUGFIX] Distributor: Fix data corruption in the push handler caused by shallow copying `Samples` and `Histograms` when converting Remote Write V2 requests to V1. #7337 ## 1.20.1 2025-12-03 diff --git a/integration/remote_write_v2_test.go b/integration/remote_write_v2_test.go index bc27a3cff3f..5d8cdbc72ca 100644 --- a/integration/remote_write_v2_test.go +++ b/integration/remote_write_v2_test.go @@ -5,6 +5,7 @@ package integration import ( "math/rand" "path" + "sync" "testing" "time" @@ -535,6 +536,150 @@ func Test_WriteStatWithReplication(t *testing.T) { testPushHeader(t, writeStats, 20, 0, 0) } +// This test verifies PRW1 and PRW2 memory pools do not interfere with each other. +func TestIngest_PRW2_MemoryIndependence(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + consul := e2edb.NewConsulWithName("consul") + require.NoError(t, s.StartAndWaitReady(consul)) + + flags := mergeFlags( + AlertmanagerLocalFlags(), + map[string]string{ + "-store.engine": blocksStorageEngine, + "-blocks-storage.backend": "filesystem", + "-blocks-storage.tsdb.head-compaction-interval": "4m", + "-blocks-storage.bucket-store.sync-interval": "15m", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + "-distributor.remote-writev2-enabled": "true", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + }, + ) + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + path := path.Join(s.SharedDir(), "cortex-1") + flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path}) + + // Start Cortex + cortex := e2ecortex.NewSingleBinary("cortex", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + + // Wait until Cortex replicas have updated the ring state. + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) + + cPRW1, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-prw1") + require.NoError(t, err) + + cPRW2, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-prw2") + require.NoError(t, err) + var wg sync.WaitGroup + + scrapeInterval := 5 * time.Second + end := time.Now() + start := end.Add(-time.Hour * 2) + + expectedPushesPerProtocol := int(end.Sub(start) / scrapeInterval) + + // We will concurrently push two distinct metrics using two different protocols. + // test_metric_prw1 is pushed via PRW1 with Value: 1.0 + // test_metric_prw2 is pushed via PRW2 with Value: 999.0 + // If the memory pool overlaps due to shallow copy during V2->V1 conversion, + // test_metric_prw1 will occasionally read 999.0. + wg.Add(2) + + // Goroutine 1: Send PRW1 Requests + go func() { + defer wg.Done() + // Iterate from start to end by scrapeInterval + for t := start; t.Before(end); t = t.Add(scrapeInterval) { + ts := t.UnixMilli() + + seriesV1 := []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric_prw1"}, + }, + Samples: []prompb.Sample{ + {Value: 1.0, Timestamp: ts}, + }, + }, + } + _, _ = cPRW1.Push(seriesV1) + } + }() + + // Goroutine 2: Send PRW2 Requests + go func() { + defer wg.Done() + // Iterate from start to end by scrapeInterval + for t := start; t.Before(end); t = t.Add(scrapeInterval) { + ts := t.UnixMilli() + + symbols := []string{"", "__name__", "test_metric_prw2"} + seriesV2 := []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2}, + Samples: []writev2.Sample{{Value: 999.0, Timestamp: ts}}, + }, + } + _, _ = cPRW2.PushV2(symbols, seriesV2) + } + }() + + // Wait for all concurrent pushes to finish + wg.Wait() + + // Check PRW1 requests + require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(expectedPushesPerProtocol)), []string{"cortex_distributor_push_requests_total"}, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "type", "prw1")))) + // Check PRW2 requests + require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(expectedPushesPerProtocol)), []string{"cortex_distributor_push_requests_total"}, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "type", "prw2")))) + + resultV1, err := cPRW1.QueryRange(`test_metric_prw1`, start, end, scrapeInterval) + require.NoError(t, err) + require.Equal(t, model.ValMatrix, resultV1.Type()) + + matrixV1, ok := resultV1.(model.Matrix) + require.True(t, ok) + require.NotEmpty(t, matrixV1) + + // Validate no data pollution occurred. + for _, series := range matrixV1 { + for _, sample := range series.Values { + assert.Equal(t, 1.0, float64(sample.Value), "Memory pool overlapped: PRW1 metric has been corrupted!") + } + } + + resultV2, err := cPRW2.QueryRange(`test_metric_prw2`, start, end, scrapeInterval) + require.NoError(t, err) + matrixV2, ok := resultV2.(model.Matrix) + require.True(t, ok) + require.NotEmpty(t, matrixV2) + + // Validate no data pollution occurred. + for _, series := range matrixV2 { + for _, sample := range series.Values { + assert.Equal(t, 999.0, float64(sample.Value), "Memory pool overlapped: PRW2 metric has been corrupted!") + } + } +} + func testPushHeader(t *testing.T, stats remoteapi.WriteResponseStats, expectedSamples, expectedHistogram, expectedExemplars int) { require.Equal(t, expectedSamples, stats.Samples) require.Equal(t, expectedHistogram, stats.Histograms) diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index 71617496557..bbd5a3b7937 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -251,13 +251,14 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUni return v1Req, err } + ts := cortexpb.TimeseriesFromPool() + ts.Labels = cortexpb.FromLabelsToLabelAdapters(lbs) + ts.Samples = append(ts.Samples, v2Ts.Samples...) + ts.Exemplars = exemplars + ts.Histograms = append(ts.Histograms, v2Ts.Histograms...) + v1Timeseries = append(v1Timeseries, cortexpb.PreallocTimeseries{ - TimeSeries: &cortexpb.TimeSeries{ - Labels: cortexpb.FromLabelsToLabelAdapters(lbs), - Samples: v2Ts.Samples, - Exemplars: exemplars, - Histograms: v2Ts.Histograms, - }, + TimeSeries: ts, }) if shouldConvertV2Metadata(v2Ts.Metadata) { diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index eb21ca46ffd..c842fd4d48f 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -378,6 +378,19 @@ func Test_convertV2RequestToV1_WithEnableTypeAndUnitLabels(t *testing.T) { for _, test := range tests { t.Run(test.desc, func(t *testing.T) { v1Req, err := convertV2RequestToV1(test.v2Req, test.enableTypeAndUnitLabels) + + for i := range v1Req.Timeseries { + if len(v1Req.Timeseries[i].Samples) == 0 { + v1Req.Timeseries[i].Samples = nil + } + if len(v1Req.Timeseries[i].Exemplars) == 0 { + v1Req.Timeseries[i].Exemplars = nil + } + if len(v1Req.Timeseries[i].Histograms) == 0 { + v1Req.Timeseries[i].Histograms = nil + } + } + require.NoError(t, err) require.Equal(t, test.expectedV1Req, v1Req) }) @@ -1168,3 +1181,46 @@ func TestHandler_RemoteWriteV2_MetadataPoolReset(t *testing.T) { resp2 := sendRequest(&req2Proto) require.Equal(t, http.StatusNoContent, resp2.Code) } + +func Test_convertV2RequestToV1_DeepCopy(t *testing.T) { + fh := tsdbutil.GenerateTestFloatHistogram(1) + ph := cortexpb.FloatHistogramToHistogramProto(4, fh) + + v2Req := &cortexpb.PreallocWriteRequestV2{ + WriteRequestV2: cortexpb.WriteRequestV2{ + Symbols: []string{"", "__name__", "test_metric"}, + Timeseries: []cortexpb.PreallocTimeseriesV2{ + { + TimeSeriesV2: &cortexpb.TimeSeriesV2{ + LabelsRefs: []uint32{1, 2}, + Samples: []cortexpb.Sample{ + {Value: 1.0, TimestampMs: 1000}, + }, + Exemplars: []cortexpb.ExemplarV2{ + {LabelsRefs: []uint32{1, 2}, Value: 2.0, Timestamp: 1000}, + }, + Histograms: []cortexpb.Histogram{ + ph, + }, + }, + }, + }, + }, + } + + v1Req, err := convertV2RequestToV1(v2Req, false) + require.NoError(t, err) + require.Len(t, v1Req.Timeseries, 1) + + v1Ts := v1Req.Timeseries[0] + v2Ts := v2Req.Timeseries[0] + + require.True(t, len(v1Ts.Samples) > 0 && len(v2Ts.Samples) > 0) + require.NotSame(t, &v1Ts.Samples[0], &v2Ts.Samples[0], "Samples array must not share the same memory address") + + require.True(t, len(v1Ts.Exemplars) > 0 && len(v2Ts.Exemplars) > 0) + require.NotSame(t, &v1Ts.Exemplars[0], &v2Ts.Exemplars[0], "Exemplars array must not share the same memory address") + + require.True(t, len(v1Ts.Histograms) > 0 && len(v2Ts.Histograms) > 0) + require.NotSame(t, &v1Ts.Histograms[0], &v2Ts.Histograms[0], "Histograms array must not share the same memory address") +}