Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions pkg/ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,17 @@ func (d *Desc) mergeWithTime(mergeable memberlist.Mergeable, localCAS bool, now
}

// resolveConflicts allocates lot of memory, so if we can avoid it, do that.
if tokensChanged && conflictingTokensExist(thisIngesterMap) {
// Check when tokens changed or any ingester is in JOINING state (observe period).
shouldCheckConflicts := tokensChanged
if !shouldCheckConflicts {
for _, ing := range thisIngesterMap {
if ing.State == JOINING {
shouldCheckConflicts = true
break
}
}
}
if shouldCheckConflicts && conflictingTokensExist(thisIngesterMap) {
resolveConflicts(thisIngesterMap)
}

Expand Down Expand Up @@ -781,23 +791,40 @@ func (d *Desc) FindDifference(o codec.MultiKey) (any, []string, error) {
} else if oing.Timestamp == ing.Timestamp && ing.State != LEFT && oing.State == LEFT {
// we accept LEFT even if timestamp hasn't changed
toUpdated.Ingesters[name] = oing
if !tokensEqual(ing.Tokens, oing.Tokens) {
tokensChanged = true
}
}
}
}

// Check for token conflicts when tokens changed or when any ingester is still
// in JOINING state (observe period). During the observe period, concurrent joins
// may have produced duplicate tokens that need resolution.
shouldCheckConflicts := tokensChanged
if !shouldCheckConflicts {
for _, ing := range out.Ingesters {
if ing.State == JOINING {
shouldCheckConflicts = true
break
}
}
}

// resolveConflicts allocates a lot of memory, so if we can avoid it, do that.
if tokensChanged && conflictingTokensExist(out.Ingesters) {
if shouldCheckConflicts && conflictingTokensExist(out.Ingesters) {
resolveConflicts(out.Ingesters)

//Recheck if any instance was updated by the resolveConflict
//All ingesters in toUpdated have already passed the timestamp check, so we can skip checking again
// Refresh all entries already in toUpdated (their tokens may have changed).
for name := range toUpdated.Ingesters {
//name must appear in out Ingesters, so we can skip the contains key check
toUpdated.Ingesters[name] = out.Ingesters[name]
}
// Also include any existing ingester whose tokens were changed by resolveConflicts.
for name, oing := range out.Ingesters {
if _, alreadyUpdated := toUpdated.Ingesters[name]; alreadyUpdated {
continue
}
if ing, ok := d.Ingesters[name]; ok && !tokensEqual(ing.Tokens, oing.Tokens) {
toUpdated.Ingesters[name] = oing
}
}
}

return toUpdated, toDelete, nil
Expand Down
45 changes: 45 additions & 0 deletions pkg/ring/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestInstanceDesc_IsHealthy_ForIngesterOperations(t *testing.T) {
Expand Down Expand Up @@ -746,6 +747,50 @@ func TestDesc_FindDifference(t *testing.T) {
}
}

func TestDesc_FindDifference_ConcurrentConflictResolutionIsDeterministic(t *testing.T) {
// Simulate two ingesters with duplicate tokens both doing CAS at the same time
// during the observe period (JOINING state). Both read the same DDB state (current)
// and produce the same new state (out). FindDifference must produce identical
// toUpdate results so they don't write conflicting resolutions to DDB.
current := &Desc{Ingesters: map[string]InstanceDesc{
"ing-A": {Addr: "addr-A", Tokens: []uint32{1, 2, 3, 10, 20}, Timestamp: 100, State: JOINING},
"ing-B": {Addr: "addr-B", Tokens: []uint32{1, 2, 3, 30, 40}, Timestamp: 100, State: JOINING},
}}

// ing-A does a heartbeat CAS (only timestamp changes)
outA := &Desc{Ingesters: map[string]InstanceDesc{
"ing-A": {Addr: "addr-A", Tokens: []uint32{1, 2, 3, 10, 20}, Timestamp: 110, State: JOINING},
"ing-B": {Addr: "addr-B", Tokens: []uint32{1, 2, 3, 30, 40}, Timestamp: 100, State: JOINING},
}}

// ing-B does a heartbeat CAS (only timestamp changes)
outB := &Desc{Ingesters: map[string]InstanceDesc{
"ing-A": {Addr: "addr-A", Tokens: []uint32{1, 2, 3, 10, 20}, Timestamp: 100, State: JOINING},
"ing-B": {Addr: "addr-B", Tokens: []uint32{1, 2, 3, 30, 40}, Timestamp: 110, State: JOINING},
}}

toUpdateA, _, errA := current.FindDifference(outA)
toUpdateB, _, errB := current.FindDifference(outB)

require.NoError(t, errA)
require.NoError(t, errB)

// Both must resolve the conflict: ing-A wins tokens 1,2,3 (lower name), ing-B loses them.
updatedA := toUpdateA.(*Desc)
updatedB := toUpdateB.(*Desc)

// ing-A's resolution should strip tokens 1,2,3 from ing-B
require.Contains(t, updatedA.Ingesters, "ing-B")
assert.Equal(t, []uint32{30, 40}, updatedA.Ingesters["ing-B"].Tokens)

// ing-B's resolution should also strip tokens 1,2,3 from ing-B
require.Contains(t, updatedB.Ingesters, "ing-B")
assert.Equal(t, []uint32{30, 40}, updatedB.Ingesters["ing-B"].Tokens)

// Both produce the same token assignment for ing-B — deterministic resolution.
assert.Equal(t, updatedA.Ingesters["ing-B"].Tokens, updatedB.Ingesters["ing-B"].Tokens)
}

func Test_resolveConflicts(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading