From 93d4863641c07392b29dd0c4abde29921d1bfb57 Mon Sep 17 00:00:00 2001 From: Olderestin Date: Wed, 28 Jan 2026 01:34:14 +0200 Subject: [PATCH] fix(yatgbot): Delete mutex from `messageHeap` --- yatgbot/messagequeue/heap.go | 24 +----------------------- yatgbot/messagequeue/heap_test.go | 9 ++++----- yatgbot/messagequeue/messagequeue.go | 6 ++---- 3 files changed, 7 insertions(+), 32 deletions(-) diff --git a/yatgbot/messagequeue/heap.go b/yatgbot/messagequeue/heap.go index 92f7101..7a2daa0 100644 --- a/yatgbot/messagequeue/heap.go +++ b/yatgbot/messagequeue/heap.go @@ -6,7 +6,6 @@ import ( "fmt" "net/http" "slices" - "sync" "time" "github.com/YaCodeDev/GoYaCodeDevUtils/yaerrors" @@ -91,7 +90,6 @@ func (j MessageJob) cancel() { // messageHeap is a thread-safe priority queue for MessageJob. type messageHeap struct { jobs []MessageJob - mu *sync.Mutex } // newMessageHeap creates a new instance of messageHeap. @@ -99,10 +97,9 @@ type messageHeap struct { // Example usage: // // heap := newMessageHeap() -func newMessageHeap(mu *sync.Mutex) messageHeap { +func newMessageHeap() messageHeap { return messageHeap{ jobs: make([]MessageJob, 0, PriorityQueueAllocSize), - mu: mu, } } @@ -144,12 +141,8 @@ func (h *messageHeap) sort() { // // heap.Push(job) func (h *messageHeap) Push(job MessageJob) { - h.mu.Lock() - h.jobs = append(h.jobs, job) h.sort() - - h.mu.Unlock() } // Len returns the number of jobs in the heap. @@ -158,9 +151,6 @@ func (h *messageHeap) Push(job MessageJob) { // // length := heap.Len() func (h *messageHeap) Len() int { - h.mu.Lock() - defer h.mu.Unlock() - return len(h.jobs) } @@ -178,14 +168,10 @@ func (h *messageHeap) Pop() (MessageJob, bool) { return MessageJob{}, false } - h.mu.Lock() - last := len(h.jobs) - 1 job := h.jobs[last] h.jobs = h.jobs[:last] - h.mu.Unlock() - return job, true } @@ -203,8 +189,6 @@ func (h *messageHeap) Pop() (MessageJob, bool) { func (h *messageHeap) Delete(id uint64) bool { var canceledJob *MessageJob - h.mu.Lock() - for i, job := range h.jobs { if job.ID == id { h.jobs = append(h.jobs[:i], h.jobs[i+1:]...) @@ -214,8 +198,6 @@ func (h *messageHeap) Delete(id uint64) bool { } } - h.mu.Unlock() - if canceledJob != nil { canceledJob.cancel() @@ -244,8 +226,6 @@ func (h *messageHeap) DeleteFunc(deleteFunc func(MessageJob) bool) []uint64 { canceledJobs []MessageJob ) - h.mu.Lock() - newJobs := make([]MessageJob, 0, len(h.jobs)) for _, job := range h.jobs { @@ -261,8 +241,6 @@ func (h *messageHeap) DeleteFunc(deleteFunc func(MessageJob) bool) []uint64 { h.jobs = newJobs - h.mu.Unlock() - for _, job := range canceledJobs { job.cancel() } diff --git a/yatgbot/messagequeue/heap_test.go b/yatgbot/messagequeue/heap_test.go index 76fa348..e5c0f13 100644 --- a/yatgbot/messagequeue/heap_test.go +++ b/yatgbot/messagequeue/heap_test.go @@ -1,7 +1,6 @@ package messagequeue import ( - "sync" "testing" "time" ) @@ -18,7 +17,7 @@ func mustPop(t *testing.T, h *messageHeap) MessageJob { } func TestHeap_PushPopOrdering(t *testing.T) { - h := newMessageHeap(&sync.Mutex{}) + h := newMessageHeap() now := time.Now() // ID 3: highest priority (1) and *oldest* timestamp @@ -48,7 +47,7 @@ func TestHeap_PushPopOrdering(t *testing.T) { } func TestHeap_DeleteByID(t *testing.T) { - h := newMessageHeap(&sync.Mutex{}) + h := newMessageHeap() h.Push(MessageJob{ID: 10}) h.Push(MessageJob{ID: 20}) @@ -66,7 +65,7 @@ func TestHeap_DeleteByID(t *testing.T) { } func TestHeap_DeleteFunc(t *testing.T) { - h := newMessageHeap(&sync.Mutex{}) + h := newMessageHeap() h.Push(MessageJob{ID: 1, Priority: 5}) h.Push(MessageJob{ID: 2, Priority: 3}) @@ -87,7 +86,7 @@ func TestHeap_DeleteFunc(t *testing.T) { } func TestHeap_PopOnEmpty(t *testing.T) { - h := newMessageHeap(&sync.Mutex{}) + h := newMessageHeap() if _, ok := h.Pop(); ok { t.Fatalf("expected ok==false on empty Pop") } diff --git a/yatgbot/messagequeue/messagequeue.go b/yatgbot/messagequeue/messagequeue.go index a350f85..7a10773 100644 --- a/yatgbot/messagequeue/messagequeue.go +++ b/yatgbot/messagequeue/messagequeue.go @@ -38,15 +38,13 @@ func NewDispatcher( parseMode yatgmessageencoding.MessageEncoding, log yalogger.Logger, ) *Dispatcher { - mu := sync.Mutex{} - dispatcher := &Dispatcher{ parseMode: parseMode, Client: client, messageQueueChannel: make(chan MessageJob), log: log, - heap: newMessageHeap(&mu), - cond: *sync.NewCond(&mu), + heap: newMessageHeap(), + cond: *sync.NewCond(&sync.Mutex{}), } go dispatcher.proccessMessagesQueue()