Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 1 addition & 23 deletions yatgbot/messagequeue/heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"net/http"
"slices"
"sync"
"time"

"github.com/YaCodeDev/GoYaCodeDevUtils/yaerrors"
Expand Down Expand Up @@ -91,18 +90,16 @@ func (j MessageJob) cancel() {
// messageHeap is a thread-safe priority queue for MessageJob.
type messageHeap struct {
jobs []MessageJob
mu *sync.Mutex
}

// newMessageHeap creates a new instance of messageHeap.
//
// Example usage:
//
// heap := newMessageHeap()
func newMessageHeap(mu *sync.Mutex) messageHeap {
func newMessageHeap() messageHeap {
return messageHeap{
jobs: make([]MessageJob, 0, PriorityQueueAllocSize),
mu: mu,
}
}

Expand Down Expand Up @@ -144,12 +141,8 @@ func (h *messageHeap) sort() {
//
// heap.Push(job)
func (h *messageHeap) Push(job MessageJob) {
h.mu.Lock()

h.jobs = append(h.jobs, job)
h.sort()

h.mu.Unlock()
}

// Len returns the number of jobs in the heap.
Expand All @@ -158,9 +151,6 @@ func (h *messageHeap) Push(job MessageJob) {
//
// length := heap.Len()
func (h *messageHeap) Len() int {
h.mu.Lock()
defer h.mu.Unlock()

return len(h.jobs)
}

Expand All @@ -178,14 +168,10 @@ func (h *messageHeap) Pop() (MessageJob, bool) {
return MessageJob{}, false
}

h.mu.Lock()

last := len(h.jobs) - 1
job := h.jobs[last]
h.jobs = h.jobs[:last]

h.mu.Unlock()

return job, true
}

Expand All @@ -203,8 +189,6 @@ func (h *messageHeap) Pop() (MessageJob, bool) {
func (h *messageHeap) Delete(id uint64) bool {
var canceledJob *MessageJob

h.mu.Lock()

for i, job := range h.jobs {
if job.ID == id {
h.jobs = append(h.jobs[:i], h.jobs[i+1:]...)
Expand All @@ -214,8 +198,6 @@ func (h *messageHeap) Delete(id uint64) bool {
}
}

h.mu.Unlock()

if canceledJob != nil {
canceledJob.cancel()

Expand Down Expand Up @@ -244,8 +226,6 @@ func (h *messageHeap) DeleteFunc(deleteFunc func(MessageJob) bool) []uint64 {
canceledJobs []MessageJob
)

h.mu.Lock()

newJobs := make([]MessageJob, 0, len(h.jobs))

for _, job := range h.jobs {
Expand All @@ -261,8 +241,6 @@ func (h *messageHeap) DeleteFunc(deleteFunc func(MessageJob) bool) []uint64 {

h.jobs = newJobs

h.mu.Unlock()

for _, job := range canceledJobs {
job.cancel()
}
Expand Down
9 changes: 4 additions & 5 deletions yatgbot/messagequeue/heap_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package messagequeue

import (
"sync"
"testing"
"time"
)
Expand All @@ -18,7 +17,7 @@ func mustPop(t *testing.T, h *messageHeap) MessageJob {
}

func TestHeap_PushPopOrdering(t *testing.T) {
h := newMessageHeap(&sync.Mutex{})
h := newMessageHeap()
now := time.Now()

// ID 3: highest priority (1) and *oldest* timestamp
Expand Down Expand Up @@ -48,7 +47,7 @@ func TestHeap_PushPopOrdering(t *testing.T) {
}

func TestHeap_DeleteByID(t *testing.T) {
h := newMessageHeap(&sync.Mutex{})
h := newMessageHeap()
h.Push(MessageJob{ID: 10})
h.Push(MessageJob{ID: 20})

Expand All @@ -66,7 +65,7 @@ func TestHeap_DeleteByID(t *testing.T) {
}

func TestHeap_DeleteFunc(t *testing.T) {
h := newMessageHeap(&sync.Mutex{})
h := newMessageHeap()

h.Push(MessageJob{ID: 1, Priority: 5})
h.Push(MessageJob{ID: 2, Priority: 3})
Expand All @@ -87,7 +86,7 @@ func TestHeap_DeleteFunc(t *testing.T) {
}

func TestHeap_PopOnEmpty(t *testing.T) {
h := newMessageHeap(&sync.Mutex{})
h := newMessageHeap()
if _, ok := h.Pop(); ok {
t.Fatalf("expected ok==false on empty Pop")
}
Expand Down
6 changes: 2 additions & 4 deletions yatgbot/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,13 @@ func NewDispatcher(
parseMode yatgmessageencoding.MessageEncoding,
log yalogger.Logger,
) *Dispatcher {
mu := sync.Mutex{}

dispatcher := &Dispatcher{
parseMode: parseMode,
Client: client,
messageQueueChannel: make(chan MessageJob),
log: log,
heap: newMessageHeap(&mu),
cond: *sync.NewCond(&mu),
heap: newMessageHeap(),
cond: *sync.NewCond(&sync.Mutex{}),
}

go dispatcher.proccessMessagesQueue()
Expand Down