diff --git a/pkg/ring/model.go b/pkg/ring/model.go index 82d0f9ccb3..5a4f134c8a 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -289,7 +289,17 @@ func (d *Desc) mergeWithTime(mergeable memberlist.Mergeable, localCAS bool, now } // resolveConflicts allocates lot of memory, so if we can avoid it, do that. - if tokensChanged && conflictingTokensExist(thisIngesterMap) { + // Check when tokens changed or any ingester is in JOINING state (observe period). + shouldCheckConflicts := tokensChanged + if !shouldCheckConflicts { + for _, ing := range thisIngesterMap { + if ing.State == JOINING { + shouldCheckConflicts = true + break + } + } + } + if shouldCheckConflicts && conflictingTokensExist(thisIngesterMap) { resolveConflicts(thisIngesterMap) } @@ -781,23 +791,40 @@ func (d *Desc) FindDifference(o codec.MultiKey) (any, []string, error) { } else if oing.Timestamp == ing.Timestamp && ing.State != LEFT && oing.State == LEFT { // we accept LEFT even if timestamp hasn't changed toUpdated.Ingesters[name] = oing - if !tokensEqual(ing.Tokens, oing.Tokens) { - tokensChanged = true - } + } + } + } + + // Check for token conflicts when tokens changed or when any ingester is still + // in JOINING state (observe period). During the observe period, concurrent joins + // may have produced duplicate tokens that need resolution. + shouldCheckConflicts := tokensChanged + if !shouldCheckConflicts { + for _, ing := range out.Ingesters { + if ing.State == JOINING { + shouldCheckConflicts = true + break } } } // resolveConflicts allocates a lot of memory, so if we can avoid it, do that. - if tokensChanged && conflictingTokensExist(out.Ingesters) { + if shouldCheckConflicts && conflictingTokensExist(out.Ingesters) { resolveConflicts(out.Ingesters) - //Recheck if any instance was updated by the resolveConflict - //All ingesters in toUpdated have already passed the timestamp check, so we can skip checking again + // Refresh all entries already in toUpdated (their tokens may have changed). for name := range toUpdated.Ingesters { - //name must appear in out Ingesters, so we can skip the contains key check toUpdated.Ingesters[name] = out.Ingesters[name] } + // Also include any existing ingester whose tokens were changed by resolveConflicts. + for name, oing := range out.Ingesters { + if _, alreadyUpdated := toUpdated.Ingesters[name]; alreadyUpdated { + continue + } + if ing, ok := d.Ingesters[name]; ok && !tokensEqual(ing.Tokens, oing.Tokens) { + toUpdated.Ingesters[name] = oing + } + } } return toUpdated, toDelete, nil diff --git a/pkg/ring/model_test.go b/pkg/ring/model_test.go index 16295ff354..2c71474778 100644 --- a/pkg/ring/model_test.go +++ b/pkg/ring/model_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestInstanceDesc_IsHealthy_ForIngesterOperations(t *testing.T) { @@ -746,6 +747,50 @@ func TestDesc_FindDifference(t *testing.T) { } } +func TestDesc_FindDifference_ConcurrentConflictResolutionIsDeterministic(t *testing.T) { + // Simulate two ingesters with duplicate tokens both doing CAS at the same time + // during the observe period (JOINING state). Both read the same DDB state (current) + // and produce the same new state (out). FindDifference must produce identical + // toUpdate results so they don't write conflicting resolutions to DDB. + current := &Desc{Ingesters: map[string]InstanceDesc{ + "ing-A": {Addr: "addr-A", Tokens: []uint32{1, 2, 3, 10, 20}, Timestamp: 100, State: JOINING}, + "ing-B": {Addr: "addr-B", Tokens: []uint32{1, 2, 3, 30, 40}, Timestamp: 100, State: JOINING}, + }} + + // ing-A does a heartbeat CAS (only timestamp changes) + outA := &Desc{Ingesters: map[string]InstanceDesc{ + "ing-A": {Addr: "addr-A", Tokens: []uint32{1, 2, 3, 10, 20}, Timestamp: 110, State: JOINING}, + "ing-B": {Addr: "addr-B", Tokens: []uint32{1, 2, 3, 30, 40}, Timestamp: 100, State: JOINING}, + }} + + // ing-B does a heartbeat CAS (only timestamp changes) + outB := &Desc{Ingesters: map[string]InstanceDesc{ + "ing-A": {Addr: "addr-A", Tokens: []uint32{1, 2, 3, 10, 20}, Timestamp: 100, State: JOINING}, + "ing-B": {Addr: "addr-B", Tokens: []uint32{1, 2, 3, 30, 40}, Timestamp: 110, State: JOINING}, + }} + + toUpdateA, _, errA := current.FindDifference(outA) + toUpdateB, _, errB := current.FindDifference(outB) + + require.NoError(t, errA) + require.NoError(t, errB) + + // Both must resolve the conflict: ing-A wins tokens 1,2,3 (lower name), ing-B loses them. + updatedA := toUpdateA.(*Desc) + updatedB := toUpdateB.(*Desc) + + // ing-A's resolution should strip tokens 1,2,3 from ing-B + require.Contains(t, updatedA.Ingesters, "ing-B") + assert.Equal(t, []uint32{30, 40}, updatedA.Ingesters["ing-B"].Tokens) + + // ing-B's resolution should also strip tokens 1,2,3 from ing-B + require.Contains(t, updatedB.Ingesters, "ing-B") + assert.Equal(t, []uint32{30, 40}, updatedB.Ingesters["ing-B"].Tokens) + + // Both produce the same token assignment for ing-B — deterministic resolution. + assert.Equal(t, updatedA.Ingesters["ing-B"].Tokens, updatedB.Ingesters["ing-B"].Tokens) +} + func Test_resolveConflicts(t *testing.T) { tests := []struct { name string