-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtime_window.go
More file actions
112 lines (99 loc) · 2.49 KB
/
time_window.go
File metadata and controls
112 lines (99 loc) · 2.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package ratelimit
import (
"context"
"errors"
"fmt"
slog "github.com/vearne/simplelog"
"sync"
"time"
)
// SlideTimeWindowLimiter is a rate limiter that counts taken tokens in a
// fixed number of time buckets and admits a new token only while the sum of
// all buckets is below throughput.
// nolint: govet
type SlideTimeWindowLimiter struct {
// Embedded mutex; Take holds it for its whole call.
sync.Mutex
// interval is duration / throughput; Wait uses it as the pause between retries.
interval time.Duration
// duration is the total window length passed to the constructor.
duration time.Duration
// throughput is the upper bound on the summed bucket counts.
throughput int
// windowBuckets is the number of buckets the window is divided into.
windowBuckets int
// durationPerBucket is duration / windowBuckets.
durationPerBucket time.Duration
// lastUpdateTime is set at construction and on every successful Take.
lastUpdateTime time.Time
// buckets holds the number of tokens taken per bucket.
buckets []int
}
// NewSlideTimeWindowLimiter builds a SlideTimeWindowLimiter configured with
// the given throughput bound, total window length and number of buckets.
//
// Parameters:
//   - throughput: upper bound on tokens counted in the window; must be > 0.
//   - duration: total window length; must be at least windowBuckets
//     nanoseconds so that every bucket has a non-zero width.
//   - windowBuckets: number of buckets the window is split into; must be > 0.
//
// It returns an error instead of panicking (integer division by zero or a
// negative slice length) when an argument is out of range.
func NewSlideTimeWindowLimiter(throughput int, duration time.Duration, windowBuckets int) (Limiter, error) {
	if throughput <= 0 {
		return nil, fmt.Errorf("throughput must be positive, got %d", throughput)
	}
	if windowBuckets <= 0 {
		return nil, fmt.Errorf("windowBuckets must be positive, got %d", windowBuckets)
	}
	durationPerBucket := duration / time.Duration(windowBuckets)
	if durationPerBucket <= 0 {
		// A zero-width bucket would make Take divide by zero.
		return nil, fmt.Errorf("duration %v is too short for %d buckets", duration, windowBuckets)
	}

	return &SlideTimeWindowLimiter{
		interval:          duration / time.Duration(throughput),
		duration:          duration,
		throughput:        throughput,
		windowBuckets:     windowBuckets,
		durationPerBucket: durationPerBucket,
		lastUpdateTime:    time.Now(),
		// make already zero-fills the slice, so no explicit reset loop is needed.
		buckets: make([]int, windowBuckets),
	}, nil
}
// Wait blocks until a token has been taken or ctx ends.
//
// It calls Take once; if no token is available it retries every s.interval
// until Take succeeds, Take returns an error, or ctx is done. When ctx has a
// deadline that falls before the first retry could happen, Wait gives up
// immediately instead of sleeping.
//
// It returns nil once a token has been taken, the error returned by Take, or
// a non-nil error when the deadline is too close or ctx is done.
func (s *SlideTimeWindowLimiter) Wait(ctx context.Context) (err error) {
	ok, err := s.Take(ctx)
	slog.Debug("r.Take")
	if err != nil {
		return err
	}
	if ok {
		return nil
	}

	deadline, hasDeadline := ctx.Deadline()
	minWaitTime := s.interval
	slog.Debug("minWaitTime:%v", minWaitTime)
	if hasDeadline && deadline.Before(time.Now().Add(minWaitTime)) {
		slog.Debug("can't get token before %v", deadline)
		return fmt.Errorf("can't get token before %v", deadline)
	}

	// One timer is reused across retries and stopped on every return path,
	// rather than allocating a fresh, never-stopped timer per iteration.
	timer := time.NewTimer(minWaitTime)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return errors.New("context timeout")
		case <-timer.C:
			// The retry interval elapsed: try to take a token again.
			taken, takeErr := s.Take(ctx)
			if takeErr != nil {
				return takeErr
			}
			if taken {
				return nil
			}
			// timer.C was just drained above, so Reset is safe here.
			timer.Reset(minWaitTime)
		}
	}
}
// Take tries to take one token without blocking.
//
// Time is divided into numbered slots of durationPerBucket each; slot n is
// stored in buckets[n % windowBuckets]. Before counting, every slot entered
// since lastUpdateTime is zeroed, including the current one, so that counts
// from a full window ago do not leak into the current bucket. If the clock
// advanced by windowBuckets slots or more, all buckets are zeroed. If the
// wall clock did not advance, or moved backwards, nothing is cleared.
//
// The token is granted when the sum of all buckets is below throughput; the
// current bucket is then incremented and lastUpdateTime is set to the same
// timestamp that selected the bucket.
//
// ctx is accepted for interface compatibility and is not consulted.
// The mutex is held for the whole call, including the call to Count.
//
// It returns (true, nil) when a token was taken, (false, nil) when the window
// is full, and (false, err) when the limiter has no usable bucket layout.
func (s *SlideTimeWindowLimiter) Take(ctx context.Context) (bool, error) {
	s.Lock()
	defer s.Unlock()

	perBucket := int64(s.durationPerBucket)
	width := int64(s.windowBuckets)
	if perBucket <= 0 || width <= 0 {
		// Avoid an integer divide-by-zero panic on a limiter that was not
		// built with valid settings.
		return false, errors.New("slide time window limiter is not initialized")
	}

	now := time.Now()
	lastSlot := s.lastUpdateTime.UnixNano() / perBucket
	nowSlot := now.UnixNano() / perBucket

	switch advanced := nowSlot - lastSlot; {
	case advanced >= width:
		// Every bucket is older than the window: start from scratch.
		for i := range s.buckets {
			s.buckets[i] = 0
		}
	case advanced > 0:
		// Expire each slot entered since the last update, current one included.
		for slot := lastSlot + 1; slot <= nowSlot; slot++ {
			s.buckets[slot%width] = 0
		}
	}

	if s.Count() >= s.throughput {
		return false, nil
	}

	s.buckets[nowSlot%width]++
	// Reuse the timestamp that picked the bucket so the two cannot disagree.
	s.lastUpdateTime = now
	return true, nil
}
// Count returns the sum of the first windowBuckets bucket counters.
// It does not acquire the mutex itself; Take calls it while holding the lock.
func (s *SlideTimeWindowLimiter) Count() int {
	sum, idx := 0, 0
	for idx < s.windowBuckets {
		sum += s.buckets[idx]
		idx++
	}
	return sum
}