-
Notifications
You must be signed in to change notification settings - Fork 63
/
Copy pathmutex.go
112 lines (100 loc) · 2.25 KB
/
mutex.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package yiigo
import (
"context"
"errors"
"time"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
)
// DistributedMutex 分布式锁
type DistributedMutex interface {
// Lock 获取锁
Lock(ctx context.Context) (bool, error)
// TryLock 尝试获取锁
TryLock(ctx context.Context, attempts int, delay time.Duration) (bool, error)
// UnLock 释放锁
UnLock(ctx context.Context) error
}
// distributed 基于「Redis」实现的分布式锁
type distributed struct {
cli *redis.Client
key string
token string
expire time.Duration
}
func (d *distributed) Lock(ctx context.Context) (bool, error) {
select {
case <-ctx.Done(): // timeout or canceled
return false, ctx.Err()
default:
}
if err := d.lock(ctx); err != nil {
return false, err
}
return len(d.token) != 0, nil
}
func (d *distributed) TryLock(ctx context.Context, attempts int, interval time.Duration) (bool, error) {
for i := 0; i < attempts; i++ {
select {
case <-ctx.Done(): // timeout or canceled
return false, ctx.Err()
default:
}
// attempt to acquire lock
if err := d.lock(ctx); err != nil {
return false, err
}
if len(d.token) != 0 {
return true, nil
}
time.Sleep(interval)
}
return false, nil
}
func (d *distributed) UnLock(ctx context.Context) error {
if len(d.token) == 0 {
return nil
}
script := `
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
`
return d.cli.Eval(context.WithoutCancel(ctx), script, []string{d.key}, d.token).Err()
}
func (d *distributed) lock(ctx context.Context) error {
token := uuid.New().String()
ok, err := d.cli.SetNX(ctx, d.key, token, d.expire).Result()
if err != nil {
// 尝试GET一次:避免因redis网络错误导致误加锁
v, _err := d.cli.Get(ctx, d.key).Result()
if _err != nil {
if errors.Is(_err, redis.Nil) {
return err
}
return _err
}
if v == token {
d.token = token
}
return nil
}
if ok {
d.token = token
}
return nil
}
// RedisMutex 基于Redis实现的分布式锁实例
func RedisMutex(cli *redis.Client, key string, ttl time.Duration) DistributedMutex {
mutex := &distributed{
cli: cli,
key: key,
expire: ttl,
}
if mutex.expire == 0 {
mutex.expire = time.Second * 10
}
return mutex
}