-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwrite_queue.go
More file actions
313 lines (245 loc) · 6.68 KB
/
write_queue.go
File metadata and controls
313 lines (245 loc) · 6.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
package cachier
import (
"fmt"
"reflect"
"runtime"
"sync"
"github.com/gammazero/deque"
)
// Identifiers for the kinds of operation that can sit in the write queue.
// They are numbered by iota in declaration order (Set = 0 ... Purge = 3) and
// are the values returned by the GetType method of each operation type.
const (
// QueueOperationSet identifies an operation that writes a value under a key.
QueueOperationSet = iota
// QueueOperationDelete identifies an operation that deletes a single key.
QueueOperationDelete
// QueueOperationDeletePredicate identifies an operation that deletes every key matching a predicate.
QueueOperationDeletePredicate
// QueueOperationPurge identifies an operation that removes everything.
QueueOperationPurge
)
// queueOperation is the behaviour shared by every operation stored in the
// write queue.
type queueOperation interface {
// Includes reports whether this operation covers op. The queue uses it to
// drop already-queued operations that a newly added operation overrides.
Includes(op queueOperation) bool
// IncludesKey reports whether this operation affects the given key.
IncludesKey(key string) bool
// String returns a short human-readable description of the operation.
String() string
}
// queueOperationWithKey is a queueOperation that targets exactly one key
// (in this file: the set and delete operations). Operations use a type
// assertion to this interface to find out which key another operation touches.
type queueOperationWithKey interface {
queueOperation
// GetKey returns the single key the operation applies to.
GetKey() string
}
// queueOperationSet is a queued write that stores Value under Key.
type queueOperationSet[T any] struct {
	Key   string // key the value is written to
	Value *T     // value to be written
}

// GetType identifies the operation as QueueOperationSet.
func (s *queueOperationSet[T]) GetType() int { return QueueOperationSet }

// GetKey returns the key this operation writes to.
func (s *queueOperationSet[T]) GetKey() string { return s.Key }

// String renders the operation as "Set(<key>)".
func (s *queueOperationSet[T]) String() string {
	return fmt.Sprintf("Set(%s)", s.Key)
}

// Includes reports whether op is a keyed operation targeting the same key,
// i.e. whether this set supersedes it. Operations without a key are never
// included.
func (s *queueOperationSet[T]) Includes(op queueOperation) bool {
	keyed, hasKey := op.(queueOperationWithKey)
	return hasKey && keyed.GetKey() == s.Key
}

// IncludesKey reports whether key is the key this operation writes to.
func (s *queueOperationSet[T]) IncludesKey(key string) bool {
	return key == s.Key
}
// queueOperationDelete is a queued deletion of a single key.
type queueOperationDelete struct {
	Key string // key to delete
}

// GetType identifies the operation as QueueOperationDelete.
func (d *queueOperationDelete) GetType() int { return QueueOperationDelete }

// GetKey returns the key this operation deletes.
func (d *queueOperationDelete) GetKey() string { return d.Key }

// String renders the operation as "Delete(<key>)".
func (d *queueOperationDelete) String() string {
	return fmt.Sprintf("Delete(%s)", d.Key)
}

// Includes reports whether op is a keyed operation targeting the same key,
// i.e. whether this delete supersedes it. Operations without a key are never
// included.
func (d *queueOperationDelete) Includes(op queueOperation) bool {
	keyed, hasKey := op.(queueOperationWithKey)
	return hasKey && keyed.GetKey() == d.Key
}

// IncludesKey reports whether key is the key this operation deletes.
func (d *queueOperationDelete) IncludesKey(key string) bool {
	return key == d.Key
}
// queueOperationDeletePredicate is a queued deletion of every key for which
// Predicate returns true.
type queueOperationDeletePredicate struct {
	Predicate Predicate // selects the keys to delete
}

// GetType identifies the operation as QueueOperationDeletePredicate.
func (p *queueOperationDeletePredicate) GetType() int {
	return QueueOperationDeletePredicate
}

// String renders the operation as "DeletePredicate(<func name>)", where the
// name is looked up from the predicate's code pointer via the runtime.
func (p *queueOperationDeletePredicate) String() string {
	pc := reflect.ValueOf(p.Predicate).Pointer()
	return fmt.Sprintf("DeletePredicate(%s)", runtime.FuncForPC(pc).Name())
}

// Includes reports whether op is a keyed operation whose key matches the
// predicate. Operations without a key (other predicates, purges) are never
// included.
func (p *queueOperationDeletePredicate) Includes(op queueOperation) bool {
	keyed, hasKey := op.(queueOperationWithKey)
	if !hasKey {
		return false
	}
	return p.Predicate(keyed.GetKey())
}

// IncludesKey reports whether the predicate matches key.
func (p *queueOperationDeletePredicate) IncludesKey(key string) bool {
	return p.Predicate(key)
}
// queueOperationPurge is a queued removal of every record. It carries no data.
type queueOperationPurge struct{}

// GetType identifies the operation as QueueOperationPurge.
func (p *queueOperationPurge) GetType() int { return QueueOperationPurge }

// String renders the operation as "Purge()".
func (p *queueOperationPurge) String() string { return "Purge()" }

// Includes always reports true: a purge covers every other operation.
func (p *queueOperationPurge) Includes(op queueOperation) bool { return true }

// IncludesKey always reports true: a purge affects every key.
func (p *queueOperationPurge) IncludesKey(key string) bool { return true }
// writeQueue keeps the cache write operations that still have to be carried
// out, together with the values those operations will write. The embedded
// mutex is locked by the queue's methods (except the internal helper
// removeOverridden, which is called with the lock already held).
type writeQueue[T any] struct {
sync.Mutex
Queue deque.Deque[queueOperation] // Pending operations, oldest at the front
Values map[string]*T // Values set but not yet written, keyed by cache key; entries are dropped when the key is deleted or its write completes
CurrentlyWriting queueOperation // Operation handed out by StartWriting and not yet finished via DoneWriting; nil when no write is in progress
}
// newWriteQueue returns an empty writeQueue: no queued operations, an
// initialised (empty) Values map and no write in progress.
func newWriteQueue[T any]() *writeQueue[T] {
	q := new(writeQueue[T])
	q.Values = make(map[string]*T)
	return q
}
// removeOverridden drops every queued operation that op includes (as reported
// by op.Includes), keeping the remaining operations in their original order.
//
// It does not lock the queue itself; the callers in this file invoke it while
// holding the mutex.
func (q *writeQueue[T]) removeOverridden(op queueOperation) {
	for idx := 0; idx < q.Queue.Len(); {
		if !op.Includes(q.Queue.At(idx)) {
			idx++
			continue
		}
		// Removal shifts the following element into idx, so do not advance.
		q.Queue.Remove(idx)
	}
}
// Get looks up key among the writes that have not been carried out yet.
//
// It returns:
//   - (value, true) when a value for key is waiting to be written;
//   - (nil, true) when no value is waiting but some queued operation affects
//     the key (for example a pending delete or purge), so the key must be
//     treated as invalid;
//   - (nil, false) when the queue knows nothing about the key.
func (q *writeQueue[T]) Get(key string) (*T, bool) {
	q.Lock()
	defer q.Unlock()

	pending, found := q.Values[key]
	if found {
		return pending, true
	}

	// No pending value: scan the queue front to back for an operation that
	// touches the key.
	for i := 0; i < q.Queue.Len(); i++ {
		if q.Queue.At(i).IncludesKey(key) {
			return nil, true
		}
	}
	return nil, false
}
// Set queues a write of value under key. Queued keyed operations for the same
// key are dropped first, since this write supersedes them, and the value is
// recorded as the pending value for key.
func (q *writeQueue[T]) Set(key string, value *T) {
	q.Lock()
	defer q.Unlock()

	write := &queueOperationSet[T]{
		Key:   key,
		Value: value,
	}
	q.removeOverridden(write)
	q.Queue.PushBack(write)

	q.Values[key] = value
}
// Delete queues the deletion of key. Queued keyed operations for the same key
// are dropped first, since this delete supersedes them, and any pending value
// for key is forgotten.
func (q *writeQueue[T]) Delete(key string) {
	q.Lock()
	defer q.Unlock()

	removal := &queueOperationDelete{Key: key}
	q.removeOverridden(removal)
	q.Queue.PushBack(removal)

	delete(q.Values, key)
}
// DeletePredicate queues the deletion of every key for which pred returns
// true. Queued keyed operations whose key matches are dropped first, and all
// matching pending values are forgotten.
func (q *writeQueue[T]) DeletePredicate(pred Predicate) {
	q.Lock()
	defer q.Unlock()

	removal := &queueOperationDeletePredicate{Predicate: pred}
	q.removeOverridden(removal)
	q.Queue.PushBack(removal)

	for k := range q.Values {
		if !pred(k) {
			continue
		}
		delete(q.Values, k)
	}
}
// Count returns how many keys currently have a pending (not yet written)
// value, i.e. the size of the Values map.
func (q *writeQueue[T]) Count() int {
	q.Lock()
	pending := len(q.Values)
	q.Unlock()
	return pending
}
// CountPredicate returns how many keys with a pending (not yet written) value
// satisfy pred.
func (q *writeQueue[T]) CountPredicate(pred Predicate) int {
	q.Lock()
	defer q.Unlock()

	matched := 0
	for k := range q.Values {
		if !pred(k) {
			continue
		}
		matched++
	}
	return matched
}
// Purge queues the removal of all records. Every previously queued operation
// is discarded because the purge supersedes it, leaving the purge as the only
// queued operation, and all pending values are forgotten.
func (q *writeQueue[T]) Purge() {
	q.Lock()
	defer q.Unlock()

	q.Queue.Clear()
	q.Queue.PushBack(new(queueOperationPurge))

	// Start over with a fresh, empty map of pending values.
	q.Values = make(map[string]*T)
}
// Keys returns the keys that currently have a pending (not yet written)
// value. The order follows map iteration and is therefore unspecified.
func (q *writeQueue[T]) Keys() []string {
	q.Lock()
	defer q.Unlock()

	total := len(q.Values)
	result := make([]string, total, total+1)
	i := 0
	for k := range q.Values {
		result[i] = k
		i++
	}
	return result
}
// StartWriting hands out the oldest queued operation for writing and records
// it as the operation in progress. The operation stays at the front of the
// queue; it is only removed by DoneWriting.
//
// It returns (nil, false) when the queue is empty. It panics if a previous
// StartWriting has not been finished with DoneWriting.
func (q *writeQueue[T]) StartWriting() (queueOperation, bool) {
	q.Lock()
	defer q.Unlock()

	switch {
	case q.CurrentlyWriting != nil:
		panic("write operation already in progress")
	case q.Queue.Len() == 0:
		return nil, false
	}

	q.CurrentlyWriting = q.Queue.At(0)
	return q.CurrentlyWriting, true
}
// DoneWriting finishes the write started by StartWriting. ok tells whether
// the write succeeded.
//
// On success the operation is removed from the front of the queue, provided
// it is still there: operations queued in the meantime may have overridden
// and removed it. For a completed set operation the pending value is also
// forgotten, unless a newer value has replaced it. On failure the queue is
// left untouched. In every case the in-progress marker is cleared.
func (q *writeQueue[T]) DoneWriting(ok bool) {
	q.Lock()
	defer q.Unlock()

	finished := q.CurrentlyWriting

	// Only pop when the front of the queue is still the very operation that
	// was handed out.
	if ok && q.Queue.Len() > 0 && q.Queue.At(0) == finished {
		q.Queue.PopFront()

		if setOp, isSet := finished.(*queueOperationSet[T]); isSet {
			// Forget the pending value only if it is the one just written.
			pending, present := q.Values[setOp.Key]
			if present && pending == setOp.Value {
				delete(q.Values, setOp.Key)
			}
		}
	}

	q.CurrentlyWriting = nil
}
// GetStats returns two sizes: the number of operations waiting in the queue,
// and the number of keys that have a pending (not yet written) value.
func (q *writeQueue[T]) GetStats() (int, int) {
	q.Lock()
	defer q.Unlock()

	queued, pending := q.Queue.Len(), len(q.Values)
	return queued, pending
}