From 6a20148cdbaf6c2608bd9651bf8ac5c44610a74d Mon Sep 17 00:00:00 2001 From: Dmitry Fedoseev Date: Mon, 30 Mar 2026 22:14:41 +0300 Subject: [PATCH 1/5] pool: add Strategy interface for customizable connection selection Add Strategy interface to allow users choose how connections are selected in ConnectionPool. Previously, round-robin was hardcoded. RoundRobinStrategy is the default implementation matching existing behavior. Part of tarantool/go-tarantool#509 --- pool/round_robin_strategy.go | 113 +++++++++++++++++++++++++++++++++++ pool/strategy.go | 26 ++++++++ 2 files changed, 139 insertions(+) create mode 100644 pool/round_robin_strategy.go create mode 100644 pool/strategy.go diff --git a/pool/round_robin_strategy.go b/pool/round_robin_strategy.go new file mode 100644 index 000000000..e09e7cb88 --- /dev/null +++ b/pool/round_robin_strategy.go @@ -0,0 +1,113 @@ +package pool + +import ( + "sync" + "sync/atomic" + + "github.com/tarantool/go-tarantool/v3" +) + +// roundRobinStrategy implements a round-robin connection selection strategy. +// All connections are active and selected in round-robin order. +type roundRobinStrategy struct { + conns []*tarantool.Connection + indexById map[string]uint + mutex sync.RWMutex + current uint64 +} + +// newRoundRobinStrategy creates a new round-robin strategy. +func newRoundRobinStrategy(expectedSize int) *roundRobinStrategy { + return &roundRobinStrategy{ + conns: make([]*tarantool.Connection, 0, expectedSize), + indexById: make(map[string]uint, expectedSize), + current: 0, + } +} + +// Add adds or updates a connection with the given ID (upsert). +func (s *roundRobinStrategy) Add(id string, conn *tarantool.Connection) { + s.mutex.Lock() + defer s.mutex.Unlock() + + if idx, exists := s.indexById[id]; exists { + s.conns[idx] = conn // Update existing + return + } + + s.indexById[id] = uint(len(s.conns)) + s.conns = append(s.conns, conn) +} + +// Get returns a connection by ID. +func (s *roundRobinStrategy) Get(id string) *tarantool.Connection { + s.mutex.RLock() + defer s.mutex.RUnlock() + + index, exists := s.indexById[id] + if !exists { + return nil + } + + return s.conns[index] +} + +// Remove removes a connection by ID. +func (s *roundRobinStrategy) Remove(id string) *tarantool.Connection { + s.mutex.Lock() + defer s.mutex.Unlock() + + index, exists := s.indexById[id] + if !exists { + return nil + } + + delete(s.indexById, id) + + conn := s.conns[index] + s.conns = append(s.conns[:index], s.conns[index+1:]...) + + for id, idx := range s.indexById { + if idx > index { + s.indexById[id] = idx - 1 + } + } + + return conn +} + +// Next returns the next connection in round-robin order. +func (s *roundRobinStrategy) Next() *tarantool.Connection { + s.mutex.RLock() + defer s.mutex.RUnlock() + + if len(s.conns) == 0 { + return nil + } + + return s.conns[s.nextIndex()] +} + +// Connections returns a map of all connections by their ID. +func (s *roundRobinStrategy) Connections() map[string]*tarantool.Connection { + s.mutex.RLock() + defer s.mutex.RUnlock() + + result := make(map[string]*tarantool.Connection, len(s.indexById)) + for id, index := range s.indexById { + result[id] = s.conns[index] + } + return result +} + +// Len returns the number of connections managed by this strategy. +func (s *roundRobinStrategy) Len() int { + s.mutex.RLock() + defer s.mutex.RUnlock() + return len(s.conns) +} + +func (s *roundRobinStrategy) nextIndex() uint64 { + next := atomic.AddUint64(&s.current, 1) + return (next - 1) % uint64(len(s.conns)) +} diff --git a/pool/strategy.go b/pool/strategy.go new file mode 100644 index 000000000..efcf95da6 --- /dev/null +++ b/pool/strategy.go @@ -0,0 +1,26 @@ +package pool + +import "github.com/tarantool/go-tarantool/v3" + +// Strategy defines the interface for connection selection strategies. +// Strategies own connections directly and provide round-robin or other +// selection algorithms. +type Strategy interface { + // Add adds a connection with the given ID to the strategy. + Add(id string, conn *tarantool.Connection) + + // Remove removes a connection by ID. + Remove(id string) *tarantool.Connection + + // Get returns a connection by ID. + Get(id string) *tarantool.Connection + + // Next returns the next connection according to the strategy's algorithm. + Next() *tarantool.Connection + + // Connections returns all connections managed by this strategy. + Connections() map[string]*tarantool.Connection + + // Len returns the number of connections managed by this strategy. + Len() int +} From 2d19e5ac45d420a22b09ff253afc09fa5526ef99 Mon Sep 17 00:00:00 2001 From: Dmitry Fedoseev Date: Tue, 31 Mar 2026 13:29:23 +0300 Subject: [PATCH 2/5] pool: add ActiveStandbyStrategy with active/standby separation Add an strategy that limits how many connections receive traffic. Useful when you have many replicas but want to limit active connections to a few (e.g., reduce load on databases). Only the first N connections become "active" and receive requests. Remaining connections are "standby" and get promoted when active connections are removed. Connections() returns only active connections since that's what Next() can potentially return. Part of tarantool/go-tarantool#509 --- pool/active_standby_strategy.go | 203 ++++++++++++++++++++++++++++++++ 1 file changed, 203 insertions(+) create mode 100644 pool/active_standby_strategy.go diff --git a/pool/active_standby_strategy.go b/pool/active_standby_strategy.go new file mode 100644 index 000000000..4d0c56a89 --- /dev/null +++ b/pool/active_standby_strategy.go @@ -0,0 +1,203 @@ +package pool + +import ( + "sync" + "sync/atomic" + + "github.com/tarantool/go-tarantool/v3" +) + +// activeStandbyStrategy implements a strategy with active/standby separation. +// Only a subset of connections (primary) are active and receive traffic. +// Standby connections are promoted when active ones are removed. +type activeStandbyStrategy struct { + primaryCount int + + // All connections + conns []*tarantool.Connection + indexById map[string]uint + + // Active connections for round-robin + activeConns []*tarantool.Connection + activeIndex map[string]uint + + mutex sync.RWMutex + current uint64 +} + +// newActiveStandbyStrategy creates a new active/standby strategy. +// primaryCount is the maximum number of active connections. +// expectedSize is used for pre-allocation. +func newActiveStandbyStrategy(primaryCount int, expectedSize int) *activeStandbyStrategy { + return &activeStandbyStrategy{ + primaryCount: primaryCount, + conns: make([]*tarantool.Connection, 0, expectedSize), + indexById: make(map[string]uint, expectedSize), + activeConns: make([]*tarantool.Connection, 0, primaryCount), + activeIndex: make(map[string]uint, primaryCount), + } +} + +// Add adds or updates a connection with the given ID (upsert). +func (s *activeStandbyStrategy) Add(id string, conn *tarantool.Connection) { + s.mutex.Lock() + defer s.mutex.Unlock() + + if idx, exists := s.indexById[id]; exists { + s.conns[idx] = conn + if activeIdx, active := s.activeIndex[id]; active { + s.activeConns[activeIdx] = conn + } + return + } + + s.indexById[id] = uint(len(s.conns)) + s.conns = append(s.conns, conn) + + if len(s.activeConns) < s.primaryCount { + s.addToActiveLocked(id) + } +} + +// Get returns a connection by ID. +func (s *activeStandbyStrategy) Get(id string) *tarantool.Connection { + s.mutex.RLock() + defer s.mutex.RUnlock() + + index, exists := s.indexById[id] + if !exists { + return nil + } + + return s.conns[index] +} + +// Remove removes a connection by ID. +func (s *activeStandbyStrategy) Remove(id string) *tarantool.Connection { + s.mutex.Lock() + defer s.mutex.Unlock() + + index, exists := s.indexById[id] + if !exists { + return nil + } + + conn := s.conns[index] + + if _, active := s.activeIndex[id]; active { + s.removeFromActiveLocked(id) + } + + delete(s.indexById, id) + s.conns = append(s.conns[:index], s.conns[index+1:]...) + + for id, idx := range s.indexById { + if idx > index { + s.indexById[id] = idx - 1 + } + } + + s.promoteStandbyLocked() + + return conn +} + +// Next returns the next active connection in round-robin order. +func (s *activeStandbyStrategy) Next() *tarantool.Connection { + s.mutex.RLock() + defer s.mutex.RUnlock() + + if len(s.activeConns) == 0 { + return nil + } + + return s.activeConns[s.nextIndex()] +} + +func (s *activeStandbyStrategy) nextIndex() uint64 { + next := atomic.AddUint64(&s.current, 1) + return (next - 1) % uint64(len(s.activeConns)) +} + +// Connections returns eligible connections (active only). +func (s *activeStandbyStrategy) Connections() map[string]*tarantool.Connection { + s.mutex.RLock() + defer s.mutex.RUnlock() + + result := make(map[string]*tarantool.Connection, len(s.activeIndex)) + for id, idx := range s.activeIndex { + result[id] = s.activeConns[idx] + } + return result +} + +func (s *activeStandbyStrategy) addToActiveLocked(id string) { + if _, active := s.activeIndex[id]; active { + return + } + + index, exists := s.indexById[id] + if !exists { + return + } + + s.activeIndex[id] = uint(len(s.activeConns)) + s.activeConns = append(s.activeConns, s.conns[index]) +} + +func (s *activeStandbyStrategy) removeFromActiveLocked(id string) { + index, active := s.activeIndex[id] + if !active { + return + } + + delete(s.activeIndex, id) + s.activeConns = append(s.activeConns[:index], s.activeConns[index+1:]...) + + for id, idx := range s.activeIndex { + if idx > index { + s.activeIndex[id] = idx - 1 + } + } +} + +func (s *activeStandbyStrategy) promoteStandbyLocked() { + for len(s.activeConns) < s.primaryCount { + promoted := false + for id := range s.indexById { + if _, active := s.activeIndex[id]; active { + continue + } + + s.addToActiveLocked(id) + promoted = true + break + } + + if !promoted { + break + } + } +} + +// StandbyCount returns the number of standby connections. +func (s *activeStandbyStrategy) StandbyCount() int { + s.mutex.RLock() + defer s.mutex.RUnlock() + return len(s.conns) - len(s.activeConns) +} + +// IsActive checks if a connection is active. +func (s *activeStandbyStrategy) IsActive(id string) bool { + s.mutex.RLock() + defer s.mutex.RUnlock() + _, active := s.activeIndex[id] + return active +} + +// Len returns the number of active connections managed by this strategy. +func (s *activeStandbyStrategy) Len() int { + s.mutex.RLock() + defer s.mutex.RUnlock() + return len(s.activeConns) +} From 17603b87de8d803e61e9661e0b394b23148c374e Mon Sep 17 00:00:00 2001 From: Dmitry Fedoseev Date: Tue, 31 Mar 2026 13:36:34 +0300 Subject: [PATCH 3/5] pool: add Store, Selector and StrategyBuilder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Store centralizes connection state (role, health) in one place. Previously, state was scattered across multiple components, requiring manual synchronization which was error-prone. Selector connects Store to strategies. It observes Store changes and updates the appropriate strategy. Unhealthy connections are excluded from all strategies. This keeps strategies simple—they don't need to know about health. StrategyBuilder lets users choose strategy implementations: - RoundRobinBuilder: round-robin for all modes (default) - ActiveStandbyBuilder: active/standby for all modes Part of tarantool/go-tarantool#509 --- pool/selector.go | 166 ++++++++++++++++++++++++++++++++++ pool/store.go | 190 +++++++++++++++++++++++++++++++++++++++ pool/strategy_builder.go | 43 +++++++++ 3 files changed, 399 insertions(+) create mode 100644 pool/selector.go create mode 100644 pool/store.go create mode 100644 pool/strategy_builder.go diff --git a/pool/selector.go b/pool/selector.go new file mode 100644 index 000000000..ece63ebdc --- /dev/null +++ b/pool/selector.go @@ -0,0 +1,166 @@ +package pool + +import ( + "github.com/tarantool/go-tarantool/v3" +) + +// selector routes connections to appropriate strategies based on role. +// It observes the store for changes and updates strategies accordingly. +type selector struct { + rwStrategy Strategy + roStrategy Strategy + anyStrategy Strategy +} + +// newSelector creates a new selector. +func newSelector( + store *Store, + rwStrategy Strategy, + roStrategy Strategy, + anyStrategy Strategy, +) *selector { + s := &selector{ + rwStrategy: rwStrategy, + roStrategy: roStrategy, + anyStrategy: anyStrategy, + } + + store.AddObserver(s) + + return s +} + +// OnUpsert implements StoreObserver. +func (s *selector) OnUpsert(name string, curr Entry, prev Entry, existed bool) { + applyOne(s.anyStrategy, name, curr, prev, existed, eligibleAny) + applyOne(s.rwStrategy, name, curr, prev, existed, eligibleRW) + applyOne(s.roStrategy, name, curr, prev, existed, eligibleRO) +} + +// OnRemove implements StoreObserver. +func (s *selector) OnRemove(name string) { + s.anyStrategy.Remove(name) + s.rwStrategy.Remove(name) + s.roStrategy.Remove(name) +} + +// Select returns a connection based on the mode. +func (s *selector) Select(mode Mode) (*tarantool.Connection, error) { + switch mode { + case RW: + if conn := s.rwStrategy.Next(); conn != nil { + return conn, nil + } + return nil, ErrNoRwInstance + + case RO: + if conn := s.roStrategy.Next(); conn != nil { + return conn, nil + } + return nil, ErrNoRoInstance + + case PreferRW: + if conn := s.rwStrategy.Next(); conn != nil { + return conn, nil + } + if conn := s.roStrategy.Next(); conn != nil { + return conn, nil + } + return nil, ErrNoHealthyInstance + + case PreferRO: + if conn := s.roStrategy.Next(); conn != nil { + return conn, nil + } + if conn := s.rwStrategy.Next(); conn != nil { + return conn, nil + } + return nil, ErrNoHealthyInstance + + default: // ANY + if conn := s.anyStrategy.Next(); conn != nil { + return conn, nil + } + return nil, ErrNoHealthyInstance + } +} + +// Get returns a connection by name. +func (s *selector) Get(name string) *tarantool.Connection { + return s.anyStrategy.Get(name) +} + +// Connections returns all eligible connections. +func (s *selector) Connections() map[string]*tarantool.Connection { + return s.anyStrategy.Connections() +} + +// ConnectionsByMode returns connections filtered by mode. +func (s *selector) ConnectionsByMode(mode Mode) map[string]*tarantool.Connection { + switch mode { + case RW: + return s.rwStrategy.Connections() + case RO: + return s.roStrategy.Connections() + default: + return s.anyStrategy.Connections() + } +} + +// IsEmpty checks if there are no connections for the given mode. +func (s *selector) IsEmpty(mode Mode) bool { + switch mode { + case ANY: + return s.anyStrategy.Len() == 0 + case RW: + return s.rwStrategy.Len() == 0 + case RO: + return s.roStrategy.Len() == 0 + case PreferRW, PreferRO: + return s.rwStrategy.Len() == 0 && s.roStrategy.Len() == 0 + default: + return true + } +} + +func eligibleAny(e Entry) bool { + return e.Conn != nil && e.healthy +} + +func eligibleRW(e Entry) bool { + return e.Conn != nil && e.healthy && e.Role == MasterRole +} + +func eligibleRO(e Entry) bool { + return e.Conn != nil && e.healthy && e.Role == ReplicaRole +} + +func applyOne( + strategy Strategy, + name string, + curr, prev Entry, + existed bool, + eligible func(Entry) bool, +) { + newOk := eligible(curr) + oldOk := existed && eligible(prev) + + switch { + case !oldOk && newOk: + // became eligible => add + strategy.Add(name, curr.Conn) + + case oldOk && !newOk: + // became ineligible => remove + strategy.Remove(name) + + case oldOk && newOk: + // still eligible; if conn replaced => update pointer + if prev.Conn != curr.Conn { + strategy.Add(name, curr.Conn) + } + + default: + // still ineligible => do nothing + } +} diff --git a/pool/store.go b/pool/store.go new file mode 100644 index 000000000..c170baca0 --- /dev/null +++ b/pool/store.go @@ -0,0 +1,190 @@ +package pool + +import ( + "sync" + + "github.com/tarantool/go-tarantool/v3" +) + +// Entry represents a connection with its metadata. +type Entry struct { + Conn *tarantool.Connection + Role Role + healthy bool +} + +// StoreObserver is notified when store changes. +type StoreObserver interface { + // OnUpsert is called when a connection is added or updated. + // name is the connection name. + // curr is the current entry, prev is the previous entry (if existed). + // existed is true if the entry already existed. + OnUpsert(name string, curr Entry, prev Entry, existed bool) + + // OnRemove is called when a connection is removed. + // name is the connection name. + OnRemove(name string) +} + +// Store is a thread-safe storage for connection entries. +// It notifies observers about changes via StoreObserver interface. +type Store struct { + mu sync.RWMutex + entries map[string]Entry + observers []StoreObserver +} + +// NewStore creates a new Store. +func NewStore() *Store { + return &Store{ + entries: make(map[string]Entry), + } +} + +// AddObserver registers an observer. +// Observers are added only during initialization and never removed. +func (s *Store) AddObserver(observer StoreObserver) { + s.observers = append(s.observers, observer) +} + +// Get returns an entry by name. +func (s *Store) Get(name string) (Entry, bool) { + s.mu.RLock() + e, ok := s.entries[name] + s.mu.RUnlock() + return e, ok +} + +// Upsert adds or updates a connection entry. +// It notifies observers with the current and previous entry. +func (s *Store) Upsert(name string, conn *tarantool.Connection, role Role) (Entry, bool) { + s.mu.Lock() + + var prev Entry + var existed bool + if e, ok := s.entries[name]; ok { + prev = e + existed = true + } + + curr := Entry{ + Conn: conn, + Role: role, + healthy: true, + } + s.entries[name] = curr + + s.mu.Unlock() + + // Notify outside of lock + for _, o := range s.observers { + o.OnUpsert(name, curr, prev, existed) + } + + return prev, existed +} + +// UpdateHealth updates the health status of a connection. +func (s *Store) UpdateHealth(name string, healthy bool) bool { + s.mu.Lock() + + e, ok := s.entries[name] + if !ok { + s.mu.Unlock() + return false + } + + if e.healthy == healthy { + s.mu.Unlock() + return true // No change + } + + prev := e + e.healthy = healthy + s.entries[name] = e + curr := e + + s.mu.Unlock() + + // Notify outside of lock + for _, o := range s.observers { + o.OnUpsert(name, curr, prev, true) + } + + return true +} + +// UpdateRole updates the role of a connection. +func (s *Store) UpdateRole(name string, role Role) bool { + s.mu.Lock() + + e, ok := s.entries[name] + if !ok { + s.mu.Unlock() + return false + } + + if e.Role == role { + s.mu.Unlock() + return true // No change + } + + prev := e + e.Role = role + s.entries[name] = e + curr := e + + s.mu.Unlock() + + // Notify outside of lock + for _, o := range s.observers { + o.OnUpsert(name, curr, prev, true) + } + + return true +} + +// Remove removes a connection entry. +func (s *Store) Remove(name string) (Entry, bool) { + s.mu.Lock() + + old, ok := s.entries[name] + if !ok { + s.mu.Unlock() + return Entry{}, false + } + + delete(s.entries, name) + s.mu.Unlock() + + // Notify outside of lock + for _, o := range s.observers { + o.OnRemove(name) + } + + return old, true +} + +// All returns all entries. +func (s *Store) All() map[string]Entry { + s.mu.RLock() + result := make(map[string]Entry, len(s.entries)) + for k, v := range s.entries { + result[k] = v + } + s.mu.RUnlock() + return result +} + +// AllHealthy returns all entries with healthy status. +func (s *Store) AllHealthy() map[string]Entry { + s.mu.RLock() + result := make(map[string]Entry) + for k, v := range s.entries { + if v.healthy { + result[k] = v + } + } + s.mu.RUnlock() + return result +} diff --git a/pool/strategy_builder.go b/pool/strategy_builder.go new file mode 100644 index 000000000..9b41be1e8 --- /dev/null +++ b/pool/strategy_builder.go @@ -0,0 +1,43 @@ +package pool + +// StrategyBuilder creates strategies for the connection pool. +// It allows users to customize how connections are selected. +type StrategyBuilder interface { + // Build creates strategies for RW, RO, and ANY modes. + // ExpectedSize is the expected number of connections for pre-allocation. + Build(expectedSize int) (rw, ro, any Strategy) +} + +// RoundRobinBuilder creates round-robin strategies for all modes. +// This is the default strategy builder. +type RoundRobinBuilder struct{} + +// Build creates round-robin strategies. +func (b RoundRobinBuilder) Build(expectedSize int) (rw, ro, any Strategy) { + return newRoundRobinStrategy(expectedSize), + newRoundRobinStrategy(expectedSize), + newRoundRobinStrategy(expectedSize) +} + +// ActiveStandbyBuilder creates active/standby strategies for all modes. +// +// ActiveStandby maintains a limited number of "active" connections that +// receive traffic. When an active connection is removed, a standby +// connection is automatically promoted. +type ActiveStandbyBuilder struct { + // RWPrimaryCount is the maximum number of active connections for RW mode. + RWPrimaryCount int + + // ROPrimaryCount is the maximum number of active connections for RO mode. + ROPrimaryCount int +} + +// Build creates active/standby strategies for all modes. +// AnyPrimaryCount is calculated as RWPrimaryCount + ROPrimaryCount. +func (b ActiveStandbyBuilder) Build(expectedSize int) (rw, ro, any Strategy) { + anyPrimaryCount := b.RWPrimaryCount + b.ROPrimaryCount + + return newActiveStandbyStrategy(b.RWPrimaryCount, expectedSize), + newActiveStandbyStrategy(b.ROPrimaryCount, expectedSize), + newActiveStandbyStrategy(anyPrimaryCount, expectedSize) +} From b5cef18f88b714248f4f9b43b65eb44616ff808c Mon Sep 17 00:00:00 2001 From: Dmitry Fedoseev Date: Tue, 31 Mar 2026 14:07:49 +0300 Subject: [PATCH 4/5] refactor: use Store and Selector in ConnectionPool Replace direct pool manipulation with Store + Selector architecture. Previously, Pool updated rwPool/roPool/anyPool directly. Now it updates Store, and Selector propagates changes to strategies. New Opts.StrategyBuilder option lets users choose strategy type. Default is RoundRobinBuilder (same behavior as before). Remove old round_robin.go, replaced by RoundRobinStrategy. Public API unchanged. Part of tarantool/go-tarantool#509 --- pool/connection_pool.go | 287 ++++++++++++++++++---------------------- pool/round_robin.go | 112 ---------------- 2 files changed, 132 insertions(+), 267 deletions(-) delete mode 100644 pool/round_robin.go diff --git a/pool/connection_pool.go b/pool/connection_pool.go index cb7b530ec..d47da590c 100644 --- a/pool/connection_pool.go +++ b/pool/connection_pool.go @@ -81,6 +81,9 @@ type Opts struct { CheckTimeout time.Duration // ConnectionHandler provides an ability to handle connection updates. ConnectionHandler ConnectionHandler + // StrategyBuilder creates strategies for connection selection. + // If nil, RoundRobinBuilder is used by default. + StrategyBuilder StrategyBuilder } /* @@ -111,9 +114,8 @@ type ConnectionPool struct { state state done chan struct{} - roPool *roundRobinStrategy - rwPool *roundRobinStrategy - anyPool *roundRobinStrategy + store *Store + selector *selector poolsMutex sync.RWMutex watcherContainer watcherContainer } @@ -167,18 +169,24 @@ func ConnectWithOpts(ctx context.Context, instances []Instance, } size := len(instances) - rwPool := newRoundRobinStrategy(size) - roPool := newRoundRobinStrategy(size) - anyPool := newRoundRobinStrategy(size) + + store := NewStore() + + builder := opts.StrategyBuilder + if builder == nil { + builder = RoundRobinBuilder{} + } + + rwStrategy, roStrategy, anyStrategy := builder.Build(size) + selector := newSelector(store, rwStrategy, roStrategy, anyStrategy) p := &ConnectionPool{ - ends: make(map[string]*endpoint), - opts: opts, - state: connectedState, - done: make(chan struct{}), - rwPool: rwPool, - roPool: roPool, - anyPool: anyPool, + ends: make(map[string]*endpoint), + opts: opts, + state: connectedState, + done: make(chan struct{}), + store: store, + selector: selector, } fillCtx, fillCancel := context.WithCancel(ctx) @@ -242,20 +250,7 @@ func (p *ConnectionPool) ConnectedNow(mode Mode) (bool, error) { if p.state.get() != connectedState { return false, nil } - switch mode { - case ANY: - return !p.anyPool.IsEmpty(), nil - case RW: - return !p.rwPool.IsEmpty(), nil - case RO: - return !p.roPool.IsEmpty(), nil - case PreferRW: - fallthrough - case PreferRO: - return !p.rwPool.IsEmpty() || !p.roPool.IsEmpty(), nil - default: - return false, ErrNoHealthyInstance - } + return !p.selector.IsEmpty(mode), nil } // ConfiguredTimeout gets timeout of current connection. @@ -498,14 +493,8 @@ func (p *ConnectionPool) NewWatcher(key string, watcher.container.add(watcher) - rr := p.anyPool - if mode == RW { - rr = p.rwPool - } else if mode == RO { - rr = p.roPool - } + conns := p.selector.ConnectionsByMode(mode) - conns := rr.GetConnections() for _, conn := range conns { // Check that connection supports watchers. if !isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS, conn.ProtocolInfo().Features) { @@ -524,7 +513,7 @@ func (p *ConnectionPool) NewWatcher(key string, // the argument of type Mode is unused. func (p *ConnectionPool) Do(req tarantool.Request, userMode Mode) tarantool.Future { if connectedReq, ok := req.(tarantool.ConnectedRequest); ok { - conns := p.anyPool.GetConnections() + conns := p.selector.Connections() isOurConnection := false for _, conn := range conns { // Compare raw pointers. @@ -548,7 +537,7 @@ func (p *ConnectionPool) Do(req tarantool.Request, userMode Mode) tarantool.Futu // DoInstance sends the request into a target instance and returns a future. func (p *ConnectionPool) DoInstance(req tarantool.Request, name string) tarantool.Future { - conn := p.anyPool.GetConnection(name) + conn := p.selector.Get(name) if conn == nil { return tarantool.NewFutureWithErr(nil, ErrNoHealthyInstance) } @@ -611,31 +600,28 @@ func (p *ConnectionPool) getConnectionRole(conn *tarantool.Connection) (Role, er } func (p *ConnectionPool) getConnectionFromPool(name string) (*tarantool.Connection, Role) { - if conn := p.rwPool.GetConnection(name); conn != nil { - return conn, MasterRole - } - - if conn := p.roPool.GetConnection(name); conn != nil { - return conn, ReplicaRole + entry, ok := p.store.Get(name) + if !ok { + return nil, UnknownRole } - - return p.anyPool.GetConnection(name), UnknownRole + return entry.Conn, entry.Role } func (p *ConnectionPool) deleteConnection(name string) { - if conn := p.anyPool.DeleteConnection(name); conn != nil { - if conn := p.rwPool.DeleteConnection(name); conn == nil { - p.roPool.DeleteConnection(name) - } - // The internal connection deinitialization. - p.watcherContainer.mutex.RLock() - defer p.watcherContainer.mutex.RUnlock() - - _ = p.watcherContainer.foreach(func(watcher *poolWatcher) error { - watcher.unwatch(conn) - return nil - }) + entry, ok := p.store.Remove(name) + if !ok { + return } + conn := entry.Conn + + // The internal connection deinitialization. + p.watcherContainer.mutex.RLock() + defer p.watcherContainer.mutex.RUnlock() + + _ = p.watcherContainer.foreach(func(watcher *poolWatcher) error { + watcher.unwatch(conn) + return nil + }) } func (p *ConnectionPool) addConnection(name string, @@ -673,14 +659,8 @@ func (p *ConnectionPool) addConnection(name string, } } - p.anyPool.AddConnection(name, conn) + p.store.Upsert(name, conn, role) - switch role { - case MasterRole: - p.rwPool.AddConnection(name, conn) - case ReplicaRole: - p.roPool.AddConnection(name, conn) - } return nil } @@ -750,39 +730,26 @@ func (p *ConnectionPool) updateConnection(e *endpoint) { if role, err := p.getConnectionRole(e.conn); err == nil { if e.role != role { - p.deleteConnection(e.name) + oldRole := e.role + p.store.UpdateRole(e.name, role) + + // Update watchers for the connection. + p.updateConnectionWatchers(e.conn, oldRole, role) + p.poolsMutex.Unlock() - p.handlerDeactivated(e.name, e.conn, e.role) + p.handlerDeactivated(e.name, e.conn, oldRole) opened := p.handlerDiscovered(e.name, e.conn, role) if !opened { _ = e.conn.Close() + p.store.Remove(e.name) e.conn = nil e.role = UnknownRole return } - p.poolsMutex.Lock() - if p.state.get() != connectedState { - p.poolsMutex.Unlock() - - _ = e.conn.Close() - p.handlerDeactivated(e.name, e.conn, role) - e.conn = nil - e.role = UnknownRole - return - } - - if p.addConnection(e.name, e.conn, role) != nil { - p.poolsMutex.Unlock() - - _ = e.conn.Close() - p.handlerDeactivated(e.name, e.conn, role) - e.conn = nil - e.role = UnknownRole - return - } e.role = role + return } p.poolsMutex.Unlock() return @@ -798,6 +765,44 @@ func (p *ConnectionPool) updateConnection(e *endpoint) { } } +// updateConnectionWatchers updates watchers when connection role changes. +func (p *ConnectionPool) updateConnectionWatchers(conn *tarantool.Connection, + oldRole, newRole Role) { + // Check if connection supports watchers + if !isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS, conn.ProtocolInfo().Features) { + return + } + + p.watcherContainer.mutex.RLock() + defer p.watcherContainer.mutex.RUnlock() + + _ = p.watcherContainer.foreach(func(watcher *poolWatcher) error { + var wasWatching, shouldWatch bool + + switch watcher.mode { + case RW: + wasWatching = oldRole == MasterRole + shouldWatch = newRole == MasterRole + case RO: + wasWatching = oldRole == ReplicaRole + shouldWatch = newRole == ReplicaRole + default: // ANY, PreferRW, PreferRO + wasWatching = oldRole == MasterRole || oldRole == ReplicaRole + shouldWatch = newRole == MasterRole || newRole == ReplicaRole + } + + if wasWatching && !shouldWatch { + watcher.unwatch(conn) + } else if !wasWatching && shouldWatch { + if err := watcher.watch(conn); err != nil { + log.Printf("tarantool: failed to watch for %s after role change: %s", conn.Addr().String(), err) + } + } + + return nil + }) +} + func (p *ConnectionPool) tryConnect(ctx context.Context, e *endpoint) error { e.conn = nil e.role = UnknownRole @@ -856,26 +861,6 @@ func (p *ConnectionPool) tryConnect(ctx context.Context, e *endpoint) error { return err } -func (p *ConnectionPool) reconnect(ctx context.Context, e *endpoint) { - p.poolsMutex.Lock() - - if p.state.get() != connectedState { - p.poolsMutex.Unlock() - return - } - - p.deleteConnection(e.name) - p.poolsMutex.Unlock() - - p.handlerDeactivated(e.name, e.conn, e.role) - e.conn = nil - e.role = UnknownRole - - if err := p.tryConnect(ctx, e); err != nil { - log.Printf("tarantool: reconnect to %s failed: %s\n", e.name, err) - } -} - func (p *ConnectionPool) controller(ctx context.Context, e *endpoint) { timer := time.NewTicker(p.opts.CheckTimeout) defer timer.Stop() @@ -943,33 +928,17 @@ func (p *ConnectionPool) controller(ctx context.Context, e *endpoint) { // Will be processed at an upper level. case <-e.shutdown: // Will be processed at an upper level. - case <-e.notify: - if e.conn != nil && e.conn.ClosedNow() { - p.poolsMutex.Lock() - if p.state.get() == connectedState { - p.deleteConnection(e.name) - p.poolsMutex.Unlock() - p.handlerDeactivated(e.name, e.conn, e.role) - e.conn = nil - e.role = UnknownRole - } else { - p.poolsMutex.Unlock() - } - } + case event := <-e.notify: + p.handleConnEvent(e, event) case <-timer.C: - // Reopen connection. - // Relocate connection between subpools - // if ro/rw was updated. - switch { - case e.conn == nil: + // Check for role updates and reconnect if needed. + if e.conn == nil { + // Try to reconnect if connection is nil. if err := p.tryConnect(ctx, e); err != nil { - log.Printf("tarantool: reopen connection to %s failed: %s\n", - e.name, err) + log.Printf("tarantool: reconnect to %s failed: %s\n", e.name, err) } - case !e.conn.ClosedNow(): + } else if !e.conn.ClosedNow() { p.updateConnection(e) - default: - p.reconnect(ctx, e) } } } @@ -977,38 +946,46 @@ func (p *ConnectionPool) controller(ctx context.Context, e *endpoint) { } } -func (p *ConnectionPool) getNextConnection(mode Mode) (*tarantool.Connection, error) { - switch mode { - case ANY: - if next := p.anyPool.GetNextConnection(); next != nil { - return next, nil - } - case RW: - if next := p.rwPool.GetNextConnection(); next != nil { - return next, nil - } - return nil, ErrNoRwInstance - case RO: - if next := p.roPool.GetNextConnection(); next != nil { - return next, nil - } - return nil, ErrNoRoInstance - case PreferRW: - if next := p.rwPool.GetNextConnection(); next != nil { - return next, nil +// handleConnEvent processes connection events from Notify channel. +func (p *ConnectionPool) handleConnEvent(e *endpoint, event tarantool.ConnEvent) { + switch event.Kind { + case tarantool.Disconnected: + p.poolsMutex.Lock() + if p.state.get() == connectedState { + p.store.UpdateHealth(e.name, false) } - if next := p.roPool.GetNextConnection(); next != nil { - return next, nil + p.poolsMutex.Unlock() + + case tarantool.ReconnectFailed: + p.poolsMutex.Lock() + if p.state.get() == connectedState { + p.store.UpdateHealth(e.name, false) } - case PreferRO: - if next := p.roPool.GetNextConnection(); next != nil { - return next, nil + p.poolsMutex.Unlock() + + case tarantool.Connected: + p.poolsMutex.Lock() + if p.state.get() == connectedState { + p.store.UpdateHealth(e.name, true) } - if next := p.rwPool.GetNextConnection(); next != nil { - return next, nil + p.poolsMutex.Unlock() + + case tarantool.Closed: + p.poolsMutex.Lock() + if p.state.get() == connectedState && e.conn != nil { + p.deleteConnection(e.name) + p.poolsMutex.Unlock() + p.handlerDeactivated(e.name, e.conn, e.role) + e.conn = nil + e.role = UnknownRole + } else { + p.poolsMutex.Unlock() } } - return nil, ErrNoHealthyInstance +} + +func (p *ConnectionPool) getNextConnection(mode Mode) (*tarantool.Connection, error) { + return p.selector.Select(mode) } func isFeatureInSlice(expected iproto.Feature, actualSlice []iproto.Feature) bool { diff --git a/pool/round_robin.go b/pool/round_robin.go deleted file mode 100644 index f3ccb014c..000000000 --- a/pool/round_robin.go +++ /dev/null @@ -1,112 +0,0 @@ -package pool - -import ( - "sync" - "sync/atomic" - - "github.com/tarantool/go-tarantool/v3" -) - -type roundRobinStrategy struct { - conns []*tarantool.Connection - indexById map[string]uint - mutex sync.RWMutex - size uint64 - current uint64 -} - -func newRoundRobinStrategy(size int) *roundRobinStrategy { - return &roundRobinStrategy{ - conns: make([]*tarantool.Connection, 0, size), - indexById: make(map[string]uint, size), - size: 0, - current: 0, - } -} - -func (r *roundRobinStrategy) GetConnection(id string) *tarantool.Connection { - r.mutex.RLock() - defer r.mutex.RUnlock() - - index, found := r.indexById[id] - if !found { - return nil - } - - return r.conns[index] -} - -func (r *roundRobinStrategy) DeleteConnection(id string) *tarantool.Connection { - r.mutex.Lock() - defer r.mutex.Unlock() - - if r.size == 0 { - return nil - } - - index, found := r.indexById[id] - if !found { - return nil - } - - delete(r.indexById, id) - - conn := r.conns[index] - r.conns = append(r.conns[:index], r.conns[index+1:]...) - r.size -= 1 - - for k, v := range r.indexById { - if v > index { - r.indexById[k] = v - 1 - } - } - - return conn -} - -func (r *roundRobinStrategy) IsEmpty() bool { - r.mutex.RLock() - defer r.mutex.RUnlock() - - return r.size == 0 -} - -func (r *roundRobinStrategy) GetNextConnection() *tarantool.Connection { - r.mutex.RLock() - defer r.mutex.RUnlock() - - if r.size == 0 { - return nil - } - return r.conns[r.nextIndex()] -} - -func (r *roundRobinStrategy) GetConnections() map[string]*tarantool.Connection { - r.mutex.RLock() - defer r.mutex.RUnlock() - - conns := map[string]*tarantool.Connection{} - for id, index := range r.indexById { - conns[id] = r.conns[index] - } - - return conns -} - -func (r *roundRobinStrategy) AddConnection(id string, conn *tarantool.Connection) { - r.mutex.Lock() - defer r.mutex.Unlock() - - if idx, ok := r.indexById[id]; ok { - r.conns[idx] = conn - } else { - r.conns = append(r.conns, conn) - r.indexById[id] = uint(r.size) - r.size += 1 - } -} - -func (r *roundRobinStrategy) nextIndex() uint64 { - next := atomic.AddUint64(&r.current, 1) - return (next - 1) % r.size -} From 0d0504c21cb4efb5a049f41489cd238aa7078366 Mon Sep 17 00:00:00 2001 From: Dmitry Fedoseev Date: Tue, 31 Mar 2026 14:28:03 +0300 Subject: [PATCH 5/5] test: add tests for Strategy, Store, and Selector Replace old round_robin_test.go with comprehensive tests for the new components. Closes tarantool/go-tarantool#509 --- CHANGELOG.md | 4 + pool/round_robin_test.go | 90 --------- pool/strategy_test.go | 383 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 387 insertions(+), 90 deletions(-) delete mode 100644 pool/round_robin_test.go create mode 100644 pool/strategy_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index ede41c79b..6fef27b9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. ### Added +* New pluggable architecture for connection selection in `ConnectionPool`: + `Store`, `Selector`, `Strategy`, `StrategyBuilder` interfaces (#509). +* New `RoundRobinBuilder` and `ActiveStandbyBuilder` implementations and + `Opts.StrategyBuilder` option for custom strategies (#509). * New types for MessagePack extensions compatible with go-option (#459). * Added `box.MustNew` wrapper for `box.New` without an error (#448). * Added missing IPROTO feature flags to greeting negotiation diff --git a/pool/round_robin_test.go b/pool/round_robin_test.go deleted file mode 100644 index dcc219fd4..000000000 --- a/pool/round_robin_test.go +++ /dev/null @@ -1,90 +0,0 @@ -package pool - -import ( - "testing" - - "github.com/tarantool/go-tarantool/v3" -) - -const ( - validAddr1 = "x" - validAddr2 = "y" -) - -func TestRoundRobinAddDelete(t *testing.T) { - rr := newRoundRobinStrategy(10) - - addrs := []string{validAddr1, validAddr2} - conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} - - for i, addr := range addrs { - rr.AddConnection(addr, conns[i]) - } - - for i, addr := range addrs { - if conn := rr.DeleteConnection(addr); conn != conns[i] { - t.Errorf("Unexpected connection on address %s", addr) - } - } - if !rr.IsEmpty() { - t.Errorf("RoundRobin does not empty") - } -} - -func TestRoundRobinAddDuplicateDelete(t *testing.T) { - rr := newRoundRobinStrategy(10) - - conn1 := &tarantool.Connection{} - conn2 := &tarantool.Connection{} - - rr.AddConnection(validAddr1, conn1) - rr.AddConnection(validAddr1, conn2) - - if rr.DeleteConnection(validAddr1) != conn2 { - t.Errorf("Unexpected deleted connection") - } - if !rr.IsEmpty() { - t.Errorf("RoundRobin does not empty") - } - if rr.DeleteConnection(validAddr1) != nil { - t.Errorf("Unexpected value after second deletion") - } -} - -func TestRoundRobinGetNextConnection(t *testing.T) { - rr := newRoundRobinStrategy(10) - - addrs := []string{validAddr1, validAddr2} - conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} - - for i, addr := range addrs { - rr.AddConnection(addr, conns[i]) - } - - expectedConns := []*tarantool.Connection{conns[0], conns[1], conns[0], conns[1]} - for i, expected := range expectedConns { - if rr.GetNextConnection() != expected { - t.Errorf("Unexpected connection on %d call", i) - } - } -} - -func TestRoundRobinStrategy_GetConnections(t *testing.T) { - rr := newRoundRobinStrategy(10) - - addrs := []string{validAddr1, validAddr2} - conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} - - for i, addr := range addrs { - rr.AddConnection(addr, conns[i]) - } - - rr.GetConnections()[validAddr2] = conns[0] // GetConnections() returns a copy. - rrConns := rr.GetConnections() - - for i, addr := range addrs { - if conns[i] != rrConns[addr] { - t.Errorf("Unexpected connection on %s addr", addr) - } - } -} diff --git a/pool/strategy_test.go b/pool/strategy_test.go new file mode 100644 index 000000000..1012ce89c --- /dev/null +++ b/pool/strategy_test.go @@ -0,0 +1,383 @@ +package pool + +import ( + "testing" + + "github.com/tarantool/go-tarantool/v3" +) + +// roundRobinStrategy Tests + +func TestRoundRobinStrategy_AddRemove(t *testing.T) { + s := newRoundRobinStrategy(10) + + ids := []string{"conn1", "conn2"} + conns := []*tarantool.Connection{{}, {}} + + for i, id := range ids { + s.Add(id, conns[i]) + } + + for i, id := range ids { + removed := s.Remove(id) + if removed != conns[i] { + t.Errorf("Remove(%q) = %p, want %p", id, removed, conns[i]) + } + } + + if len(s.Connections()) != 0 { + t.Errorf("Connections() should be empty, got %d", len(s.Connections())) + } +} + +func TestRoundRobinStrategy_AddUpsert(t *testing.T) { + s := newRoundRobinStrategy(10) + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + + s.Add("conn1", conn1) + s.Add("conn1", conn2) // Should update (upsert) + + conns := s.Connections() + if len(conns) != 1 { + t.Errorf("Connections() len = %d, want 1", len(conns)) + } + if conns["conn1"] != conn2 { + t.Errorf("Connections()[conn1] = %p, want %p (updated)", conns["conn1"], conn2) + } +} + +func TestRoundRobinStrategy_RemoveNonExistent(t *testing.T) { + s := newRoundRobinStrategy(10) + + removed := s.Remove("nonexistent") + if removed != nil { + t.Errorf("Remove(nonexistent) = %p, want nil", removed) + } +} + +func TestRoundRobinStrategy_Next(t *testing.T) { + s := newRoundRobinStrategy(10) + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + + s.Add("conn1", conn1) + s.Add("conn2", conn2) + + expected := []*tarantool.Connection{conn1, conn2, conn1, conn2} + for i, want := range expected { + got := s.Next() + if got != want { + t.Errorf("Next() call %d = %p, want %p", i, got, want) + } + } +} + +func TestRoundRobinStrategy_NextEmpty(t *testing.T) { + s := newRoundRobinStrategy(10) + + got := s.Next() + if got != nil { + t.Errorf("Next() on empty strategy = %p, want nil", got) + } +} + +func TestRoundRobinStrategy_RemoveIndexUpdate(t *testing.T) { + s := newRoundRobinStrategy(10) + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + conn3 := &tarantool.Connection{} + + s.Add("conn1", conn1) + s.Add("conn2", conn2) + s.Add("conn3", conn3) + + s.Remove("conn2") + + expected := []*tarantool.Connection{conn1, conn3, conn1, conn3} + for i, want := range expected { + got := s.Next() + if got != want { + t.Errorf("Next() after remove, call %d = %p, want %p", i, got, want) + } + } +} + +// activeStandbyStrategy Tests + +func TestActiveStandbyStrategy_ActiveSlots(t *testing.T) { + s := newActiveStandbyStrategy(2, 10) + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + conn3 := &tarantool.Connection{} + + s.Add("conn1", conn1) + s.Add("conn2", conn2) + s.Add("conn3", conn3) // Should be standby (only 2 active slots). + + if s.Len() != 2 { + t.Errorf("ActiveCount() = %d, want 2", s.Len()) + } + if s.StandbyCount() != 1 { + t.Errorf("StandbyCount() = %d, want 1", s.StandbyCount()) + } +} + +func TestActiveStandbyStrategy_PromotionOnRemove(t *testing.T) { + s := newActiveStandbyStrategy(2, 10) + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + conn3 := &tarantool.Connection{} + + s.Add("conn1", conn1) + s.Add("conn2", conn2) + s.Add("conn3", conn3) // Standby. + + // Remove active connection. + removed := s.Remove("conn1") + if removed != conn1 { + t.Errorf("Remove(conn1) = %p, want %p", removed, conn1) + } + + // Standby should be promoted. + if s.Len() != 2 { + t.Errorf("ActiveCount() after remove = %d, want 2", s.Len()) + } + if !s.IsActive("conn3") { + t.Errorf("conn3 should be promoted after conn1 removal") + } +} + +func TestActiveStandbyStrategy_RemoveNonExistent(t *testing.T) { + s := newActiveStandbyStrategy(2, 10) + + removed := s.Remove("nonexistent") + if removed != nil { + t.Errorf("Remove(nonexistent) = %p, want nil", removed) + } +} + +func TestActiveStandbyStrategy_Next(t *testing.T) { + s := newActiveStandbyStrategy(2, 10) + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + + s.Add("conn1", conn1) + s.Add("conn2", conn2) + + expected := []*tarantool.Connection{conn1, conn2, conn1, conn2} + for i, want := range expected { + got := s.Next() + if got != want { + t.Errorf("Next() call %d = %p, want %p", i, got, want) + } + } +} + +func TestActiveStandbyStrategy_NextOnlyActive(t *testing.T) { + s := newActiveStandbyStrategy(1, 10) + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + + s.Add("conn1", conn1) + s.Add("conn2", conn2) // Standby. + + // Next should only return conn1 (active). + for i := 0; i < 4; i++ { + got := s.Next() + if got != conn1 { + t.Errorf("Next() call %d = %p, want %p (conn1)", i, got, conn1) + } + } +} + +// selector Tests + +type selectorTest struct { + store *Store + rwStrategy *roundRobinStrategy + roStrategy *roundRobinStrategy + anyStrategy *roundRobinStrategy + sel *selector +} + +func setupSelectorTest() selectorTest { + store := NewStore() + rwStrategy := newRoundRobinStrategy(10) + roStrategy := newRoundRobinStrategy(10) + anyStrategy := newRoundRobinStrategy(10) + sel := newSelector(store, rwStrategy, roStrategy, anyStrategy) + return selectorTest{ + store: store, + rwStrategy: rwStrategy, + roStrategy: roStrategy, + anyStrategy: anyStrategy, + sel: sel, + } +} + +func TestSelector_Select_RW(t *testing.T) { + test := setupSelectorTest() + + masterConn := &tarantool.Connection{} + test.store.Upsert("master", masterConn, MasterRole) + + // RW mode should return master. + conn, err := test.rwStrategy.Next(), error(nil) + if err != nil { + t.Errorf("Next() error = %v", err) + } + if conn != masterConn { + t.Errorf("Next() = %p, want %p", conn, masterConn) + } +} + +func TestSelector_Select_RO(t *testing.T) { + test := setupSelectorTest() + + masterConn := &tarantool.Connection{} + replicaConn := &tarantool.Connection{} + + test.store.Upsert("master", masterConn, MasterRole) + test.store.Upsert("replica", replicaConn, ReplicaRole) + + // RO mode should return replica. + conn := test.roStrategy.Next() + if conn != replicaConn { + t.Errorf("Next() = %p, want %p", conn, replicaConn) + } +} + +func TestSelector_Select_ANY(t *testing.T) { + test := setupSelectorTest() + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + + test.store.Upsert("conn1", conn1, MasterRole) + test.store.Upsert("conn2", conn2, ReplicaRole) + + // ANY mode should return any connection. + seen := make(map[*tarantool.Connection]bool) + for i := 0; i < 10; i++ { + conn := test.anyStrategy.Next() + seen[conn] = true + } + // Should have seen both connections. + if len(seen) != 2 { + t.Errorf("Next() should rotate between all connections, saw %d unique", len(seen)) + } +} + +func TestSelector_RoleChange(t *testing.T) { + test := setupSelectorTest() + + conn := &tarantool.Connection{} + test.store.Upsert("conn", conn, MasterRole) + + // Initially master. + if test.rwStrategy.Next() == nil { + t.Errorf("rwStrategy.Next() should return connection") + } + + // Change role to replica. + test.store.UpdateRole("conn", ReplicaRole) + + // Now should be in RO pool. + if test.rwStrategy.Next() != nil { + t.Errorf("rwStrategy.Next() should return nil after role change") + } + if test.roStrategy.Next() == nil { + t.Errorf("roStrategy.Next() should return connection after role change") + } +} + +func TestSelector_Remove(t *testing.T) { + test := setupSelectorTest() + + conn := &tarantool.Connection{} + test.store.Upsert("conn", conn, MasterRole) + + test.store.Remove("conn") + + // Should be removed from all strategies. + if !test.sel.IsEmpty(RW) { + t.Errorf("IsEmpty(RW) = false, want true") + } + if !test.sel.IsEmpty(ANY) { + t.Errorf("IsEmpty(ANY) = false, want true") + } +} + +func TestSelector_Get(t *testing.T) { + test := setupSelectorTest() + + conn := &tarantool.Connection{} + test.store.Upsert("conn", conn, MasterRole) + + got := test.sel.Get("conn") + if got != conn { + t.Errorf("Get(conn) = %p, want %p", got, conn) + } + + got = test.sel.Get("nonexistent") + if got != nil { + t.Errorf("Get(nonexistent) = %p, want nil", got) + } +} + +func TestSelector_HealthChange(t *testing.T) { + test := setupSelectorTest() + + conn := &tarantool.Connection{} + test.store.Upsert("conn", conn, MasterRole) + + // Initially healthy. + if test.rwStrategy.Next() == nil { + t.Errorf("rwStrategy.Next() should return connection") + } + + // Become unhealthy. + test.store.UpdateHealth("conn", false) + + // Should not be available. + if test.rwStrategy.Next() != nil { + t.Errorf("rwStrategy.Next() should return nil when unhealthy") + } + if !test.sel.IsEmpty(RW) { + t.Errorf("IsEmpty(RW) = false, want true when unhealthy") + } + + // Become healthy again. + test.store.UpdateHealth("conn", true) + + // Should be available again. + if test.rwStrategy.Next() == nil { + t.Errorf("rwStrategy.Next() should return connection after recovery") + } +} + +func TestSelector_ConnectionUpdate(t *testing.T) { + test := setupSelectorTest() + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + + test.store.Upsert("conn", conn1, MasterRole) + if test.sel.Get("conn") != conn1 { + t.Errorf("Get(conn) = %p, want %p", test.sel.Get("conn"), conn1) + } + + // Update connection pointer. + test.store.Upsert("conn", conn2, MasterRole) + if test.sel.Get("conn") != conn2 { + t.Errorf("Get(conn) = %p, want %p", test.sel.Get("conn"), conn2) + } +}