-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paththreshold.go
More file actions
137 lines (113 loc) · 3.12 KB
/
threshold.go
File metadata and controls
137 lines (113 loc) · 3.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package ewrap
import (
"sync"
"time"
)
// CircuitBreaker implements the circuit breaker pattern for error handling.
//
// It counts failures reported through RecordFailure and moves between the
// CircuitClosed, CircuitOpen and CircuitHalfOpen states. The methods in this
// file read and write the mutable fields while holding mu.
type CircuitBreaker struct {
	// name identifies this breaker; it is passed to the observer and to the
	// onStateChange callback on every state transition.
	name string
	// maxFailures is the failure count at which a closed circuit opens.
	maxFailures int
	// timeout is how long after lastFailure an open circuit stays open
	// before CanExecute moves it to half-open.
	timeout time.Duration
	// failureCount is incremented by RecordFailure and reset to zero when
	// RecordSuccess closes a half-open circuit.
	failureCount int
	// lastFailure is the time of the most recent RecordFailure call.
	lastFailure time.Time
	// state is the current circuit state.
	state CircuitState
	// observer is notified synchronously of each state transition.
	observer Observer
	// mu guards the mutable fields of this struct.
	mu sync.RWMutex
	// onStateChange, if non-nil, is invoked in a new goroutine on each state
	// transition.
	onStateChange func(name string, from, to CircuitState)
}
// CircuitState enumerates the states a circuit breaker can be in.
type CircuitState int

// The numeric values are spelled out explicitly; CircuitClosed is the zero
// value of CircuitState.
const (
	// CircuitClosed indicates normal operation.
	CircuitClosed CircuitState = 0
	// CircuitOpen indicates the circuit is broken.
	CircuitOpen CircuitState = 1
	// CircuitHalfOpen indicates the circuit is testing recovery.
	CircuitHalfOpen CircuitState = 2
)
// NewCircuitBreaker builds a circuit breaker without a caller-supplied
// observer.
//
// It delegates to NewCircuitBreakerWithObserver, passing a nil observer.
func NewCircuitBreaker(name string, maxFailures int, timeout time.Duration) *CircuitBreaker {
	var none Observer // zero value is nil, i.e. "no observer supplied"
	return NewCircuitBreakerWithObserver(name, maxFailures, timeout, none)
}
// NewCircuitBreakerWithObserver builds a circuit breaker that starts in the
// CircuitClosed state and reports state transitions to observer.
//
// A nil observer is replaced with the value returned by newNoopObserver, so
// the returned breaker always has a non-nil observer.
func NewCircuitBreakerWithObserver(name string, maxFailures int, timeout time.Duration, observer Observer) *CircuitBreaker {
	breaker := &CircuitBreaker{
		name:        name,
		maxFailures: maxFailures,
		timeout:     timeout,
		state:       CircuitClosed,
		observer:    observer,
	}
	if breaker.observer == nil {
		breaker.observer = newNoopObserver()
	}
	return breaker
}
// OnStateChange registers callback as the state-change hook, replacing any
// hook set earlier. The assignment is made while holding the write lock.
//
// The hook receives the breaker name together with the previous and the new
// state.
func (cb *CircuitBreaker) OnStateChange(callback func(name string, from, to CircuitState)) {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	cb.onStateChange = callback
}
// SetObserver replaces the observer used by the circuit breaker.
//
// A nil observer is substituted with the value returned by newNoopObserver,
// so the stored observer is never nil after this call.
func (cb *CircuitBreaker) SetObserver(observer Observer) {
	replacement := observer
	if replacement == nil {
		replacement = newNoopObserver()
	}

	cb.mu.Lock()
	cb.observer = replacement
	cb.mu.Unlock()
}
// RecordFailure records a failure and potentially opens the circuit.
//
// Every call increments the failure count and stamps lastFailure with the
// current time. The circuit then moves to CircuitOpen when either:
//   - it is closed and the failure count has reached maxFailures, or
//   - it is half-open: a failure during the recovery probe trips the breaker
//     again. Without this, a half-open breaker could only ever leave that
//     state through RecordSuccess and would keep admitting calls while the
//     guarded operation is still failing.
func (cb *CircuitBreaker) RecordFailure() {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	cb.failureCount++
	cb.lastFailure = time.Now()

	switch cb.state {
	case CircuitClosed:
		if cb.failureCount >= cb.maxFailures {
			cb.transitionTo(CircuitOpen)
		}
	case CircuitHalfOpen:
		// The recovery probe failed; go back to open. The fresh lastFailure
		// set above restarts the timeout window.
		cb.transitionTo(CircuitOpen)
	case CircuitOpen:
		// Already open: nothing to transition. The updated lastFailure
		// pushes back the moment the timeout elapses.
	}
}
// RecordSuccess records a success and potentially closes the circuit.
//
// Only a half-open circuit reacts: its failure count is reset to zero and it
// transitions to CircuitClosed. In any other state the call changes nothing;
// in particular the failure count is left as it is.
func (cb *CircuitBreaker) RecordSuccess() {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	if cb.state != CircuitHalfOpen {
		return
	}
	cb.failureCount = 0
	cb.transitionTo(CircuitClosed)
}
// CanExecute checks if the operation can be executed.
//
// It returns true when the circuit is closed or half-open. When the circuit
// is open it returns false until more than timeout has elapsed since the last
// recorded failure; the first call after that moves the circuit to
// CircuitHalfOpen and returns true. Unknown states return false.
//
// sync.RWMutex cannot be upgraded from a read lock to a write lock, so the
// decision is made in two steps: a fast path under the read lock, and, only
// when a transition looks necessary, a second look under the write lock. The
// state is re-validated after the write lock is acquired because another
// goroutine may have changed it (or recorded a new failure) in between.
func (cb *CircuitBreaker) CanExecute() bool {
	// Fast path: take a consistent snapshot under the read lock.
	cb.mu.RLock()
	state := cb.state
	expired := state == CircuitOpen && time.Since(cb.lastFailure) > cb.timeout
	cb.mu.RUnlock()

	switch state {
	case CircuitClosed, CircuitHalfOpen:
		return true
	case CircuitOpen:
		if !expired {
			return false
		}
	default:
		return false
	}

	// Slow path: the open circuit appears to have timed out. Re-check under
	// the write lock before transitioning so a stale snapshot can never force
	// a circuit that has since closed back into half-open.
	cb.mu.Lock()
	defer cb.mu.Unlock()

	switch cb.state {
	case CircuitClosed, CircuitHalfOpen:
		// Someone else already moved the circuit out of open.
		return true
	case CircuitOpen:
		if time.Since(cb.lastFailure) > cb.timeout {
			cb.transitionTo(CircuitHalfOpen)
			return true
		}
		// A failure recorded in the meantime restarted the timeout window.
		return false
	default:
		return false
	}
}
// transitionTo moves the circuit breaker to newState and notifies listeners.
//
// It is a no-op when the breaker is already in newState. Otherwise the state
// is updated, the observer (if non-nil) is told about the transition
// synchronously, and the onStateChange callback (if non-nil) is started in
// its own goroutine.
//
// The method does not take cb.mu itself; the callers in this file invoke it
// while holding the write lock.
func (cb *CircuitBreaker) transitionTo(newState CircuitState) {
	previous := cb.state
	if previous == newState {
		return
	}
	cb.state = newState

	if obs := cb.observer; obs != nil {
		obs.RecordCircuitStateTransition(cb.name, previous, newState)
	}

	// The callback runs asynchronously so it cannot block the caller.
	if notify := cb.onStateChange; notify != nil {
		go notify(cb.name, previous, newState)
	}
}