Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
* [BUGFIX] Distributor: Return remote write V2 stats headers properly when the request is HA deduplicated. #7240
* [BUGFIX] Cache: Fix Redis Cluster EXECABORT error in MSet by using individual SET commands instead of transactions for cluster mode. #7262
* [BUGFIX] Distributor: Fix an `index out of range` panic in PRW2.0 handler caused by dirty metadata when reusing requests from `sync.Pool`. #7299
* [BUGFIX] Distributor: Fix data corruption in the push handler caused by shallow copying `Samples` and `Histograms` when converting Remote Write V2 requests to V1. #7337


## 1.20.1 2025-12-03
Expand Down
145 changes: 145 additions & 0 deletions integration/remote_write_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package integration
import (
"math/rand"
"path"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -535,6 +536,150 @@ func Test_WriteStatWithReplication(t *testing.T) {
testPushHeader(t, writeStats, 20, 0, 0)
}

// This test verifies PRW1 and PRW2 memory pools do not interfere with each other.
func TestIngest_PRW2_MemoryIndependence(t *testing.T) {
const blockRangePeriod = 5 * time.Second

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

consul := e2edb.NewConsulWithName("consul")
require.NoError(t, s.StartAndWaitReady(consul))

flags := mergeFlags(
AlertmanagerLocalFlags(),
map[string]string{
"-store.engine": blocksStorageEngine,
"-blocks-storage.backend": "filesystem",
"-blocks-storage.tsdb.head-compaction-interval": "4m",
"-blocks-storage.bucket-store.sync-interval": "15m",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul.NetworkHTTPEndpoint(),
// Distributor.
"-distributor.replication-factor": "1",
"-distributor.remote-writev2-enabled": "true",
// Store-gateway.
"-store-gateway.sharding-enabled": "false",
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
},
)

// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

path := path.Join(s.SharedDir(), "cortex-1")
flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path})

// Start Cortex
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
require.NoError(t, s.StartAndWaitReady(cortex))

// Wait until Cortex replicas have updated the ring state.
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))

cPRW1, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-prw1")
require.NoError(t, err)

cPRW2, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-prw2")
require.NoError(t, err)
var wg sync.WaitGroup

scrapeInterval := 5 * time.Second
end := time.Now()
start := end.Add(-time.Hour * 2)

expectedPushesPerProtocol := int(end.Sub(start) / scrapeInterval)

// We will concurrently push two distinct metrics using two different protocols.
// test_metric_prw1 is pushed via PRW1 with Value: 1.0
// test_metric_prw2 is pushed via PRW2 with Value: 999.0
// If the memory pool overlaps due to shallow copy during V2->V1 conversion,
// test_metric_prw1 will occasionally read 999.0.
wg.Add(2)

// Goroutine 1: Send PRW1 Requests
go func() {
defer wg.Done()
// Iterate from start to end by scrapeInterval
for t := start; t.Before(end); t = t.Add(scrapeInterval) {
ts := t.UnixMilli()

seriesV1 := []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "test_metric_prw1"},
},
Samples: []prompb.Sample{
{Value: 1.0, Timestamp: ts},
},
},
}
_, _ = cPRW1.Push(seriesV1)
}
}()

// Goroutine 2: Send PRW2 Requests
go func() {
defer wg.Done()
// Iterate from start to end by scrapeInterval
for t := start; t.Before(end); t = t.Add(scrapeInterval) {
ts := t.UnixMilli()

symbols := []string{"", "__name__", "test_metric_prw2"}
seriesV2 := []writev2.TimeSeries{
{
LabelsRefs: []uint32{1, 2},
Samples: []writev2.Sample{{Value: 999.0, Timestamp: ts}},
},
}
_, _ = cPRW2.PushV2(symbols, seriesV2)
}
}()

// Wait for all concurrent pushes to finish
wg.Wait()

// Check PRW1 requests
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(expectedPushesPerProtocol)), []string{"cortex_distributor_push_requests_total"}, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "type", "prw1"))))
// Check PRW2 requests
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(expectedPushesPerProtocol)), []string{"cortex_distributor_push_requests_total"}, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "type", "prw2"))))

resultV1, err := cPRW1.QueryRange(`test_metric_prw1`, start, end, scrapeInterval)
require.NoError(t, err)
require.Equal(t, model.ValMatrix, resultV1.Type())

matrixV1, ok := resultV1.(model.Matrix)
require.True(t, ok)
require.NotEmpty(t, matrixV1)

// Validate no data pollution occurred.
for _, series := range matrixV1 {
for _, sample := range series.Values {
assert.Equal(t, 1.0, float64(sample.Value), "Memory pool overlapped: PRW1 metric has been corrupted!")
}
}

resultV2, err := cPRW2.QueryRange(`test_metric_prw2`, start, end, scrapeInterval)
require.NoError(t, err)
matrixV2, ok := resultV2.(model.Matrix)
require.True(t, ok)
require.NotEmpty(t, matrixV2)

// Validate no data pollution occurred.
for _, series := range matrixV2 {
for _, sample := range series.Values {
assert.Equal(t, 999.0, float64(sample.Value), "Memory pool overlapped: PRW2 metric has been corrupted!")
}
}
}

func testPushHeader(t *testing.T, stats remoteapi.WriteResponseStats, expectedSamples, expectedHistogram, expectedExemplars int) {
require.Equal(t, expectedSamples, stats.Samples)
require.Equal(t, expectedHistogram, stats.Histograms)
Expand Down
13 changes: 7 additions & 6 deletions pkg/util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,14 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUni
return v1Req, err
}

ts := cortexpb.TimeseriesFromPool()
ts.Labels = cortexpb.FromLabelsToLabelAdapters(lbs)
ts.Samples = append(ts.Samples, v2Ts.Samples...)
ts.Exemplars = exemplars
ts.Histograms = append(ts.Histograms, v2Ts.Histograms...)

v1Timeseries = append(v1Timeseries, cortexpb.PreallocTimeseries{
TimeSeries: &cortexpb.TimeSeries{
Labels: cortexpb.FromLabelsToLabelAdapters(lbs),
Samples: v2Ts.Samples,
Exemplars: exemplars,
Histograms: v2Ts.Histograms,
},
TimeSeries: ts,
})

if shouldConvertV2Metadata(v2Ts.Metadata) {
Expand Down
56 changes: 56 additions & 0 deletions pkg/util/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,19 @@ func Test_convertV2RequestToV1_WithEnableTypeAndUnitLabels(t *testing.T) {
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
v1Req, err := convertV2RequestToV1(test.v2Req, test.enableTypeAndUnitLabels)

for i := range v1Req.Timeseries {
if len(v1Req.Timeseries[i].Samples) == 0 {
v1Req.Timeseries[i].Samples = nil
}
if len(v1Req.Timeseries[i].Exemplars) == 0 {
v1Req.Timeseries[i].Exemplars = nil
}
if len(v1Req.Timeseries[i].Histograms) == 0 {
v1Req.Timeseries[i].Histograms = nil
}
}

require.NoError(t, err)
require.Equal(t, test.expectedV1Req, v1Req)
})
Expand Down Expand Up @@ -1168,3 +1181,46 @@ func TestHandler_RemoteWriteV2_MetadataPoolReset(t *testing.T) {
resp2 := sendRequest(&req2Proto)
require.Equal(t, http.StatusNoContent, resp2.Code)
}

func Test_convertV2RequestToV1_DeepCopy(t *testing.T) {
fh := tsdbutil.GenerateTestFloatHistogram(1)
ph := cortexpb.FloatHistogramToHistogramProto(4, fh)

v2Req := &cortexpb.PreallocWriteRequestV2{
WriteRequestV2: cortexpb.WriteRequestV2{
Symbols: []string{"", "__name__", "test_metric"},
Timeseries: []cortexpb.PreallocTimeseriesV2{
{
TimeSeriesV2: &cortexpb.TimeSeriesV2{
LabelsRefs: []uint32{1, 2},
Samples: []cortexpb.Sample{
{Value: 1.0, TimestampMs: 1000},
},
Exemplars: []cortexpb.ExemplarV2{
{LabelsRefs: []uint32{1, 2}, Value: 2.0, Timestamp: 1000},
},
Histograms: []cortexpb.Histogram{
ph,
},
},
},
},
},
}

v1Req, err := convertV2RequestToV1(v2Req, false)
require.NoError(t, err)
require.Len(t, v1Req.Timeseries, 1)

v1Ts := v1Req.Timeseries[0]
v2Ts := v2Req.Timeseries[0]

require.True(t, len(v1Ts.Samples) > 0 && len(v2Ts.Samples) > 0)
require.NotSame(t, &v1Ts.Samples[0], &v2Ts.Samples[0], "Samples array must not share the same memory address")

require.True(t, len(v1Ts.Exemplars) > 0 && len(v2Ts.Exemplars) > 0)
require.NotSame(t, &v1Ts.Exemplars[0], &v2Ts.Exemplars[0], "Exemplars array must not share the same memory address")

require.True(t, len(v1Ts.Histograms) > 0 && len(v2Ts.Histograms) > 0)
require.NotSame(t, &v1Ts.Histograms[0], &v2Ts.Histograms[0], "Histograms array must not share the same memory address")
}
Loading