Skip to content

Commit bcf3a2d

Browse files
Merge pull request #17 from YaCodeDev/fix/yatgbot
Delete mutex from `messageHeap`
2 parents 0c48e80 + 93d4863 commit bcf3a2d

3 files changed

Lines changed: 7 additions & 32 deletions

File tree

yatgbot/messagequeue/heap.go

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"fmt"
77
"net/http"
88
"slices"
9-
"sync"
109
"time"
1110

1211
"github.com/YaCodeDev/GoYaCodeDevUtils/yaerrors"
@@ -91,18 +90,16 @@ func (j MessageJob) cancel() {
9190
// messageHeap is a thread-safe priority queue for MessageJob.
9291
type messageHeap struct {
9392
jobs []MessageJob
94-
mu *sync.Mutex
9593
}
9694

9795
// newMessageHeap creates a new instance of messageHeap.
9896
//
9997
// Example usage:
10098
//
10199
// heap := newMessageHeap()
102-
func newMessageHeap(mu *sync.Mutex) messageHeap {
100+
func newMessageHeap() messageHeap {
103101
return messageHeap{
104102
jobs: make([]MessageJob, 0, PriorityQueueAllocSize),
105-
mu: mu,
106103
}
107104
}
108105

@@ -144,12 +141,8 @@ func (h *messageHeap) sort() {
144141
//
145142
// heap.Push(job)
146143
func (h *messageHeap) Push(job MessageJob) {
147-
h.mu.Lock()
148-
149144
h.jobs = append(h.jobs, job)
150145
h.sort()
151-
152-
h.mu.Unlock()
153146
}
154147

155148
// Len returns the number of jobs in the heap.
@@ -158,9 +151,6 @@ func (h *messageHeap) Push(job MessageJob) {
158151
//
159152
// length := heap.Len()
160153
func (h *messageHeap) Len() int {
161-
h.mu.Lock()
162-
defer h.mu.Unlock()
163-
164154
return len(h.jobs)
165155
}
166156

@@ -178,14 +168,10 @@ func (h *messageHeap) Pop() (MessageJob, bool) {
178168
return MessageJob{}, false
179169
}
180170

181-
h.mu.Lock()
182-
183171
last := len(h.jobs) - 1
184172
job := h.jobs[last]
185173
h.jobs = h.jobs[:last]
186174

187-
h.mu.Unlock()
188-
189175
return job, true
190176
}
191177

@@ -203,8 +189,6 @@ func (h *messageHeap) Pop() (MessageJob, bool) {
203189
func (h *messageHeap) Delete(id uint64) bool {
204190
var canceledJob *MessageJob
205191

206-
h.mu.Lock()
207-
208192
for i, job := range h.jobs {
209193
if job.ID == id {
210194
h.jobs = append(h.jobs[:i], h.jobs[i+1:]...)
@@ -214,8 +198,6 @@ func (h *messageHeap) Delete(id uint64) bool {
214198
}
215199
}
216200

217-
h.mu.Unlock()
218-
219201
if canceledJob != nil {
220202
canceledJob.cancel()
221203

@@ -244,8 +226,6 @@ func (h *messageHeap) DeleteFunc(deleteFunc func(MessageJob) bool) []uint64 {
244226
canceledJobs []MessageJob
245227
)
246228

247-
h.mu.Lock()
248-
249229
newJobs := make([]MessageJob, 0, len(h.jobs))
250230

251231
for _, job := range h.jobs {
@@ -261,8 +241,6 @@ func (h *messageHeap) DeleteFunc(deleteFunc func(MessageJob) bool) []uint64 {
261241

262242
h.jobs = newJobs
263243

264-
h.mu.Unlock()
265-
266244
for _, job := range canceledJobs {
267245
job.cancel()
268246
}

yatgbot/messagequeue/heap_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package messagequeue
22

33
import (
4-
"sync"
54
"testing"
65
"time"
76
)
@@ -18,7 +17,7 @@ func mustPop(t *testing.T, h *messageHeap) MessageJob {
1817
}
1918

2019
func TestHeap_PushPopOrdering(t *testing.T) {
21-
h := newMessageHeap(&sync.Mutex{})
20+
h := newMessageHeap()
2221
now := time.Now()
2322

2423
// ID 3: highest priority (1) and *oldest* timestamp
@@ -48,7 +47,7 @@ func TestHeap_PushPopOrdering(t *testing.T) {
4847
}
4948

5049
func TestHeap_DeleteByID(t *testing.T) {
51-
h := newMessageHeap(&sync.Mutex{})
50+
h := newMessageHeap()
5251
h.Push(MessageJob{ID: 10})
5352
h.Push(MessageJob{ID: 20})
5453

@@ -66,7 +65,7 @@ func TestHeap_DeleteByID(t *testing.T) {
6665
}
6766

6867
func TestHeap_DeleteFunc(t *testing.T) {
69-
h := newMessageHeap(&sync.Mutex{})
68+
h := newMessageHeap()
7069

7170
h.Push(MessageJob{ID: 1, Priority: 5})
7271
h.Push(MessageJob{ID: 2, Priority: 3})
@@ -87,7 +86,7 @@ func TestHeap_DeleteFunc(t *testing.T) {
8786
}
8887

8988
func TestHeap_PopOnEmpty(t *testing.T) {
90-
h := newMessageHeap(&sync.Mutex{})
89+
h := newMessageHeap()
9190
if _, ok := h.Pop(); ok {
9291
t.Fatalf("expected ok==false on empty Pop")
9392
}

yatgbot/messagequeue/messagequeue.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,13 @@ func NewDispatcher(
3838
parseMode yatgmessageencoding.MessageEncoding,
3939
log yalogger.Logger,
4040
) *Dispatcher {
41-
mu := sync.Mutex{}
42-
4341
dispatcher := &Dispatcher{
4442
parseMode: parseMode,
4543
Client: client,
4644
messageQueueChannel: make(chan MessageJob),
4745
log: log,
48-
heap: newMessageHeap(&mu),
49-
cond: *sync.NewCond(&mu),
46+
heap: newMessageHeap(),
47+
cond: *sync.NewCond(&sync.Mutex{}),
5048
}
5149

5250
go dispatcher.proccessMessagesQueue()

0 commit comments

Comments
 (0)