Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions yatgbot/messagequeue/heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,18 @@ func (j MessageJob) cancel() {
// messageHeap is a thread-safe priority queue for MessageJob.
type messageHeap struct {
jobs []MessageJob
mu sync.Mutex
mu *sync.Mutex
}

// newMessageHeap creates a new instance of messageHeap.
//
// Example usage:
//
// heap := newMessageHeap()
func newMessageHeap() messageHeap {
func newMessageHeap(mu *sync.Mutex) messageHeap {
return messageHeap{
jobs: make([]MessageJob, 0, PriorityQueueAllocSize),
mu: mu,
}
}

Expand Down
9 changes: 5 additions & 4 deletions yatgbot/messagequeue/heap_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package messagequeue

import (
"sync"
"testing"
"time"
)
Expand All @@ -17,7 +18,7 @@ func mustPop(t *testing.T, h *messageHeap) MessageJob {
}

func TestHeap_PushPopOrdering(t *testing.T) {
h := newMessageHeap()
h := newMessageHeap(&sync.Mutex{})
now := time.Now()

// ID 3: highest priority (1) and *oldest* timestamp
Expand Down Expand Up @@ -47,7 +48,7 @@ func TestHeap_PushPopOrdering(t *testing.T) {
}

func TestHeap_DeleteByID(t *testing.T) {
h := newMessageHeap()
h := newMessageHeap(&sync.Mutex{})
h.Push(MessageJob{ID: 10})
h.Push(MessageJob{ID: 20})

Expand All @@ -65,7 +66,7 @@ func TestHeap_DeleteByID(t *testing.T) {
}

func TestHeap_DeleteFunc(t *testing.T) {
h := newMessageHeap()
h := newMessageHeap(&sync.Mutex{})

h.Push(MessageJob{ID: 1, Priority: 5})
h.Push(MessageJob{ID: 2, Priority: 3})
Expand All @@ -86,7 +87,7 @@ func TestHeap_DeleteFunc(t *testing.T) {
}

func TestHeap_PopOnEmpty(t *testing.T) {
h := newMessageHeap()
h := newMessageHeap(&sync.Mutex{})
if _, ok := h.Pop(); ok {
t.Fatalf("expected ok==false on empty Pop")
}
Expand Down
6 changes: 4 additions & 2 deletions yatgbot/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ func NewDispatcher(
parseMode yatgmessageencoding.MessageEncoding,
log yalogger.Logger,
) *Dispatcher {
mu := sync.Mutex{}

dispatcher := &Dispatcher{
parseMode: parseMode,
Client: client,
messageQueueChannel: make(chan MessageJob),
log: log,
heap: newMessageHeap(),
cond: *sync.NewCond(&sync.Mutex{}),
heap: newMessageHeap(&mu),
cond: *sync.NewCond(&mu),
}

go dispatcher.proccessMessagesQueue()
Expand Down
42 changes: 30 additions & 12 deletions yatgbot/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ import (
// dispatcher := tg.NewUpdateDispatcher(yourClient)
//
// router.Bind(dispatcher)
func (r *Dispatcher) Bind(tgDispatcher *tg.UpdateDispatcher) {
tgDispatcher.OnNewMessage(r.handleNewMessage)
tgDispatcher.OnBotCallbackQuery(r.handleBotCallbackQuery)
tgDispatcher.OnDeleteMessages(r.handleDeleteMessages)
tgDispatcher.OnEditMessage(r.handleEditMessage)
tgDispatcher.OnNewChannelMessage(r.handleNewChannelMessage)
tgDispatcher.OnEditChannelMessage(r.handleEditChannelMessage)
tgDispatcher.OnChannelParticipant(r.handleChannelParticipant)
tgDispatcher.OnDeleteChannelMessages(r.handleDeleteChannelMessages)
tgDispatcher.OnBotMessageReactions(r.handleBotMessageReactions)
tgDispatcher.OnBotPrecheckoutQuery(r.handleBotPrecheckoutQuery)
tgDispatcher.OnBotInlineQuery(r.handleBotInlineQuery)
func (r *Dispatcher) Bind(tgDispatcher *tg.UpdateDispatcher, sync bool) {
tgDispatcher.OnNewMessage(wrapAsync(sync, r.handleNewMessage))
tgDispatcher.OnBotCallbackQuery(wrapAsync(sync, r.handleBotCallbackQuery))
tgDispatcher.OnDeleteMessages(wrapAsync(sync, r.handleDeleteMessages))
tgDispatcher.OnEditMessage(wrapAsync(sync, r.handleEditMessage))
tgDispatcher.OnNewChannelMessage(wrapAsync(sync, r.handleNewChannelMessage))
tgDispatcher.OnEditChannelMessage(wrapAsync(sync, r.handleEditChannelMessage))
tgDispatcher.OnChannelParticipant(wrapAsync(sync, r.handleChannelParticipant))
tgDispatcher.OnDeleteChannelMessages(wrapAsync(sync, r.handleDeleteChannelMessages))
tgDispatcher.OnBotMessageReactions(wrapAsync(sync, r.handleBotMessageReactions))
tgDispatcher.OnBotPrecheckoutQuery(wrapAsync(sync, r.handleBotPrecheckoutQuery))
tgDispatcher.OnBotInlineQuery(wrapAsync(sync, r.handleBotInlineQuery))
}

// handleNewMessage wraps the new message handler to match the expected signature for the update dispatcher.
Expand Down Expand Up @@ -335,3 +335,21 @@ func (r *Dispatcher) handleBotMessageReactions(
inputPeer: peer,
})
}

// wrapAsync wraps the handler to run asynchronously if sync is false.
func wrapAsync[T tg.UpdateClass](
sync bool,
h func(context.Context, tg.Entities, T) error,
) func(context.Context, tg.Entities, T) error {
if sync {
return h
}

return func(ctx context.Context, e tg.Entities, upd T) error {
go func() {
_ = h(ctx, e, upd)
}()

return nil
}
}
4 changes: 3 additions & 1 deletion yatgbot/yatgbot.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Options struct {
Cache yacache.Cache[*redis.Client]
MainRouter *RouterGroup
ParseMode yatgmessageencoding.MessageEncoding
Sync bool
Log yalogger.Logger
}

Expand All @@ -53,6 +54,7 @@ type Options struct {
// MainRouter: yourMainRouterGroup,
// ParseMode: yourParseModeInstance,
// Log: yourLoggerInstance,
// Sync: false,
// EmbeddedLocales: yourEmbeddedLocalesFS,
// }
//
Expand Down Expand Up @@ -152,7 +154,7 @@ func InitYaTgBot(
MainRouter: options.MainRouter,
}

dispatcher.Bind(&telegramDispatcher)
dispatcher.Bind(&telegramDispatcher, options.Sync)

return dispatcher, nil
}
13 changes: 2 additions & 11 deletions yatgclient/heplers.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,7 @@ func (c *Client) UploadFile(
ctx context.Context,
file io.Reader,
) (tg.MessageMediaClass, yaerrors.Error) {
var (
peer tg.InputPeerClass
chunkID int
)
var chunkID int

randID := rand.Int63()

Expand Down Expand Up @@ -165,14 +162,8 @@ func (c *Client) UploadFile(
chunkID++
}

if c.IsBot {
peer = &tg.InputPeerEmpty{}
} else {
peer = &tg.InputPeerSelf{}
}

uploadedMedia, err := c.API().MessagesUploadMedia(ctx, &tg.MessagesUploadMediaRequest{
Peer: peer,
Peer: &tg.InputPeerSelf{},
Media: &tg.InputMediaUploadedPhoto{
File: &tg.InputFile{
ID: randID,
Expand Down