-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathredis_mutex.go
More file actions
130 lines (117 loc) · 2.62 KB
/
redis_mutex.go
File metadata and controls
130 lines (117 loc) · 2.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package sync
import (
"context"
"time"
"github.com/go-redis/redis/v8"
)
// RedisSync creates Redis-backed mutexes. It bundles the Redis client with
// the default options that NewMutex copies into every mutex it builds.
type RedisSync struct {
// Opts holds the default options; NewMutex copies them before applying
// any per-mutex overrides.
Opts *Options
// Redis is the client handed to every mutex created by NewMutex.
Redis *redis.Client
}
// NewRedisSync returns a RedisSync that uses the given client and the
// options produced by applying opts through newOptions.
func NewRedisSync(redis *redis.Client, opts ...Option) *RedisSync {
	rs := new(RedisSync)
	rs.Opts = newOptions(opts...)
	rs.Redis = redis
	return rs
}
// NewMutex creates a mutex for key.
//
// The mutex receives its own copy of r.Opts with opts applied on top, so
// per-mutex overrides never modify the defaults held by r. Each mutex also
// gets a fresh session ID from sessionID().
func (r *RedisSync) NewMutex(key string, opts ...Option) WaitableMutexer {
	// Copy the defaults by value so the overrides below stay local to this mutex.
	merged := *r.Opts
	for i := range opts {
		opts[i](&merged)
	}
	return &redisMutex{
		sessionID: sessionID(),
		opts:      &merged,
		redis:     r.Redis,
		key:       key,
	}
}
// redisMutex is a mutex backed by a single Redis key.
type redisMutex struct {
// sessionID is written as the key's value by Lock and compared by Unlock,
// so the key is only deleted while the stored value still matches.
sessionID string
// opts holds the options for this mutex (a per-mutex copy when built by NewMutex).
opts *Options
// redis is the client used for every command this mutex issues.
redis *redis.Client
// key is the caller-supplied name; lockName prepends opts.KeyPrefix to
// form the actual Redis key.
key string
// lockCtx and lockCancel are set by a successful Lock. Unlock calls
// lockCancel to stop the background goroutine that Lock started.
lockCtx context.Context
lockCancel context.CancelFunc
}
// Lock makes a single, non-blocking attempt to acquire the lock.
//
// The key is created with SET NX, using the mutex's session ID as the value
// and opts.LockTimeout as the TTL. On success, and when LockTimeout is
// positive, a background goroutine refreshes the TTL back to LockTimeout
// every opts.WaitRetry until the lock is released or found to be lost.
//
// It returns ErrLockFailed when the key already exists, or the Redis error
// when the command itself fails.
func (rm *redisMutex) Lock() (err error) {
	// luaRenew extends the TTL only while the key still carries our session
	// ID, so a late renewal can never prolong a lock that has expired and
	// been taken over by another session.
	const luaRenew = `if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`

	lockName := rm.lockName()
	acquired, err := rm.redis.SetNX(context.Background(), lockName, rm.sessionID, rm.opts.LockTimeout).Result()
	if err != nil {
		return err
	}
	if !acquired {
		return ErrLockFailed
	}

	// Stop a renewal goroutine left over from an earlier acquisition that was
	// never unlocked; otherwise it would leak and keep renewing in parallel.
	if rm.lockCancel != nil {
		rm.lockCancel()
	}
	ctx, cancel := context.WithCancel(context.Background())
	rm.lockCtx, rm.lockCancel = ctx, cancel

	ttl := rm.opts.LockTimeout
	if ttl <= 0 {
		// The key was created without a TTL, so there is nothing to refresh
		// (and a non-positive PEXPIRE would delete the key).
		return nil
	}
	ttlMillis := int64(ttl / time.Millisecond)
	if ttlMillis < 1 {
		ttlMillis = 1
	}

	// Capture everything the goroutine needs in locals so it never reads
	// fields of rm that a later Lock call may overwrite.
	client := rm.redis
	owner := rm.sessionID
	interval := rm.opts.WaitRetry

	go func() {
		t := time.NewTimer(interval)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				// Refresh to the full lock timeout. Refreshing to the renewal
				// interval itself would let the key expire before the next
				// renewal could reach Redis.
				held, renewErr := client.Eval(ctx, luaRenew, []string{lockName}, owner, ttlMillis).Bool()
				if renewErr == nil && !held {
					// The key expired or now belongs to another session;
					// further renewals would be pointless.
					cancel()
					return
				}
				// Transient errors are retried on the next tick.
				t.Reset(interval)
			case <-ctx.Done():
				// Unlock (or a newer Lock) cancelled the renewal.
				return
			}
		}
	}()
	return nil
}
const (
// luaRelease deletes KEYS[1] only when its current value equals ARGV[1].
// It returns the result of DEL when the values match and 0 otherwise.
luaRelease = `if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end`
)
// Unlock releases the lock.
//
// The key is deleted by the luaRelease script, which removes it only while
// its value still equals this mutex's session ID. Unlock returns
// ErrUnlockFailed when the script reports that nothing was deleted (the key
// is missing or holds a different value), or the Redis error when the script
// call fails.
//
// The background renewal started by Lock is stopped in every case, including
// failures, so a failed Unlock cannot leave a goroutine that keeps touching
// the key.
func (rm *redisMutex) Unlock() (err error) {
	// Cancel before talking to Redis: whatever happens next, the caller has
	// given up the lock and it must no longer be kept alive.
	if rm.lockCancel != nil {
		rm.lockCancel()
	}

	released, err := rm.redis.Eval(context.Background(), luaRelease, []string{rm.lockName()}, rm.sessionID).Bool()
	if err != nil {
		return err
	}
	if !released {
		return ErrUnlockFailed
	}
	return nil
}
// lockName returns the Redis key for this mutex: the configured key prefix
// followed by the caller-supplied key.
func (rm *redisMutex) lockName() string {
	prefix := rm.opts.KeyPrefix
	return prefix + rm.key
}
// LockWait blocks until the lock is acquired or ctx is done.
//
// It calls Lock repeatedly, pausing opts.RetryInterval between attempts.
// Any error other than ErrLockFailed is returned immediately; when ctx is
// done during a pause, ctx.Err() is returned.
func (rm *redisMutex) LockWait(ctx context.Context) (err error) {
	for {
		switch err = rm.Lock(); err {
		case nil:
			// Acquired.
			return nil
		case ErrLockFailed:
			// Held by someone else; fall through to the wait below.
		default:
			// A real failure, not contention: retrying will not help.
			return err
		}

		// Pause before the next attempt, giving up early if ctx ends.
		select {
		case <-time.After(rm.opts.RetryInterval):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}