-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathredis_cluster.go
More file actions
135 lines (105 loc) · 3.55 KB
/
redis_cluster.go
File metadata and controls
135 lines (105 loc) · 3.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package backend
import (
"context"
"errors"
"time"
"github.com/hyp3rd/ewrap"
"github.com/redis/go-redis/v9"
"github.com/hyp3rd/hypercache/internal/constants"
"github.com/hyp3rd/hypercache/internal/libs/serializer"
"github.com/hyp3rd/hypercache/internal/sentinel"
cache "github.com/hyp3rd/hypercache/pkg/cache/v2"
)
const (
clusterMaxRetries = 3
clusterRetriesDelay = 100 * time.Millisecond
)
// RedisCluster is a cache backend that stores items in a Redis Cluster.
// It mirrors the single-node Redis backend semantics but uses go-redis ClusterClient.
type RedisCluster struct {
rdb *redis.ClusterClient // redis cluster client
capacity int // capacity of the cache
itemPoolManager *cache.ItemPoolManager
keysSetName string
Serializer serializer.ISerializer
}
// NewRedisCluster creates a new Redis Cluster backend with the given options.
func NewRedisCluster(redisOptions ...Option[RedisCluster]) (IBackend[RedisCluster], error) {
rc := &RedisCluster{itemPoolManager: cache.NewItemPoolManager()}
ApplyOptions(rc, redisOptions...)
if rc.rdb == nil {
return nil, sentinel.ErrNilClient
}
if rc.capacity < 0 {
return nil, sentinel.ErrInvalidCapacity
}
if rc.keysSetName == "" {
rc.keysSetName = constants.RedisBackend
}
if rc.Serializer == nil {
var err error
rc.Serializer, err = serializer.New("msgpack")
if err != nil {
return nil, err
}
}
return rc, nil
}
// SetCapacity sets the capacity of the cluster backend.
func (cacheBackend *RedisCluster) SetCapacity(capacity int) {
if capacity < 0 {
return
}
cacheBackend.capacity = capacity
}
// Capacity returns the capacity of the cluster backend.
func (cacheBackend *RedisCluster) Capacity() int {
return cacheBackend.capacity
}
// Count returns the number of keys stored.
func (cacheBackend *RedisCluster) Count(ctx context.Context) int {
count, err := cacheBackend.rdb.DBSize(ctx).Result()
if err != nil {
return 0
}
return int(count)
}
// Get retrieves an item by key.
func (cacheBackend *RedisCluster) Get(ctx context.Context, key string) (*cache.Item, bool) {
isMember, err := cacheBackend.rdb.SIsMember(ctx, cacheBackend.keysSetName, key).Result()
if err != nil || !isMember {
return nil, false
}
pooled := cacheBackend.itemPoolManager.Get()
data, err := cacheBackend.rdb.HGet(ctx, key, "data").Bytes()
if err != nil {
if errors.Is(err, redis.Nil) {
return nil, false
}
return nil, false
}
err = cacheBackend.Serializer.Unmarshal(data, pooled)
if err != nil {
return nil, false
}
out := *pooled
cacheBackend.itemPoolManager.Put(pooled)
return &out, true
}
// Set stores an item in the cluster.
func (cacheBackend *RedisCluster) Set(ctx context.Context, item *cache.Item) error {
return redisSet(ctx, cacheBackend.rdb, cacheBackend.keysSetName, item, cacheBackend.Serializer)
}
// List returns items matching optional filters.
func (cacheBackend *RedisCluster) List(ctx context.Context, filters ...IFilter) ([]*cache.Item, error) {
return redisList(ctx, cacheBackend.rdb, cacheBackend.keysSetName, cacheBackend.Serializer, cacheBackend.itemPoolManager, filters...)
}
// Remove deletes the specified keys.
func (cacheBackend *RedisCluster) Remove(ctx context.Context, keys ...string) error {
return redisRemove(ctx, cacheBackend.rdb, cacheBackend.keysSetName, keys...)
}
// Clear flushes the database.
func (cacheBackend *RedisCluster) Clear(ctx context.Context) error {
_, err := cacheBackend.rdb.FlushDB(ctx).Result()
return ewrap.Wrap(err, "flushing database", ewrap.WithRetry(clusterMaxRetries, clusterRetriesDelay))
}