diff --git a/yatgbot/messagequeue/heap.go b/yatgbot/messagequeue/heap.go index ba315fd..92f7101 100644 --- a/yatgbot/messagequeue/heap.go +++ b/yatgbot/messagequeue/heap.go @@ -91,7 +91,7 @@ func (j MessageJob) cancel() { // messageHeap is a thread-safe priority queue for MessageJob. type messageHeap struct { jobs []MessageJob - mu sync.Mutex + mu *sync.Mutex } // newMessageHeap creates a new instance of messageHeap. @@ -99,9 +99,10 @@ type messageHeap struct { // Example usage: // // heap := newMessageHeap() -func newMessageHeap() messageHeap { +func newMessageHeap(mu *sync.Mutex) messageHeap { return messageHeap{ jobs: make([]MessageJob, 0, PriorityQueueAllocSize), + mu: mu, } } diff --git a/yatgbot/messagequeue/heap_test.go b/yatgbot/messagequeue/heap_test.go index e5c0f13..76fa348 100644 --- a/yatgbot/messagequeue/heap_test.go +++ b/yatgbot/messagequeue/heap_test.go @@ -1,6 +1,7 @@ package messagequeue import ( + "sync" "testing" "time" ) @@ -17,7 +18,7 @@ func mustPop(t *testing.T, h *messageHeap) MessageJob { } func TestHeap_PushPopOrdering(t *testing.T) { - h := newMessageHeap() + h := newMessageHeap(&sync.Mutex{}) now := time.Now() // ID 3: highest priority (1) and *oldest* timestamp @@ -47,7 +48,7 @@ func TestHeap_PushPopOrdering(t *testing.T) { } func TestHeap_DeleteByID(t *testing.T) { - h := newMessageHeap() + h := newMessageHeap(&sync.Mutex{}) h.Push(MessageJob{ID: 10}) h.Push(MessageJob{ID: 20}) @@ -65,7 +66,7 @@ func TestHeap_DeleteByID(t *testing.T) { } func TestHeap_DeleteFunc(t *testing.T) { - h := newMessageHeap() + h := newMessageHeap(&sync.Mutex{}) h.Push(MessageJob{ID: 1, Priority: 5}) h.Push(MessageJob{ID: 2, Priority: 3}) @@ -86,7 +87,7 @@ func TestHeap_DeleteFunc(t *testing.T) { } func TestHeap_PopOnEmpty(t *testing.T) { - h := newMessageHeap() + h := newMessageHeap(&sync.Mutex{}) if _, ok := h.Pop(); ok { t.Fatalf("expected ok==false on empty Pop") } diff --git a/yatgbot/messagequeue/messagequeue.go b/yatgbot/messagequeue/messagequeue.go index 7a10773..a350f85 100644 --- a/yatgbot/messagequeue/messagequeue.go +++ b/yatgbot/messagequeue/messagequeue.go @@ -38,13 +38,15 @@ func NewDispatcher( parseMode yatgmessageencoding.MessageEncoding, log yalogger.Logger, ) *Dispatcher { + mu := sync.Mutex{} + dispatcher := &Dispatcher{ parseMode: parseMode, Client: client, messageQueueChannel: make(chan MessageJob), log: log, - heap: newMessageHeap(), - cond: *sync.NewCond(&sync.Mutex{}), + heap: newMessageHeap(&mu), + cond: *sync.NewCond(&mu), } go dispatcher.proccessMessagesQueue() diff --git a/yatgbot/updates.go b/yatgbot/updates.go index f1a1585..f61e11a 100644 --- a/yatgbot/updates.go +++ b/yatgbot/updates.go @@ -19,18 +19,18 @@ import ( // dispatcher := tg.NewUpdateDispatcher(yourClient) // // router.Bind(dispatcher) -func (r *Dispatcher) Bind(tgDispatcher *tg.UpdateDispatcher) { - tgDispatcher.OnNewMessage(r.handleNewMessage) - tgDispatcher.OnBotCallbackQuery(r.handleBotCallbackQuery) - tgDispatcher.OnDeleteMessages(r.handleDeleteMessages) - tgDispatcher.OnEditMessage(r.handleEditMessage) - tgDispatcher.OnNewChannelMessage(r.handleNewChannelMessage) - tgDispatcher.OnEditChannelMessage(r.handleEditChannelMessage) - tgDispatcher.OnChannelParticipant(r.handleChannelParticipant) - tgDispatcher.OnDeleteChannelMessages(r.handleDeleteChannelMessages) - tgDispatcher.OnBotMessageReactions(r.handleBotMessageReactions) - tgDispatcher.OnBotPrecheckoutQuery(r.handleBotPrecheckoutQuery) - tgDispatcher.OnBotInlineQuery(r.handleBotInlineQuery) +func (r *Dispatcher) Bind(tgDispatcher *tg.UpdateDispatcher, sync bool) { + tgDispatcher.OnNewMessage(wrapAsync(sync, r.handleNewMessage)) + tgDispatcher.OnBotCallbackQuery(wrapAsync(sync, r.handleBotCallbackQuery)) + tgDispatcher.OnDeleteMessages(wrapAsync(sync, r.handleDeleteMessages)) + tgDispatcher.OnEditMessage(wrapAsync(sync, r.handleEditMessage)) + tgDispatcher.OnNewChannelMessage(wrapAsync(sync, r.handleNewChannelMessage)) + tgDispatcher.OnEditChannelMessage(wrapAsync(sync, r.handleEditChannelMessage)) + tgDispatcher.OnChannelParticipant(wrapAsync(sync, r.handleChannelParticipant)) + tgDispatcher.OnDeleteChannelMessages(wrapAsync(sync, r.handleDeleteChannelMessages)) + tgDispatcher.OnBotMessageReactions(wrapAsync(sync, r.handleBotMessageReactions)) + tgDispatcher.OnBotPrecheckoutQuery(wrapAsync(sync, r.handleBotPrecheckoutQuery)) + tgDispatcher.OnBotInlineQuery(wrapAsync(sync, r.handleBotInlineQuery)) } // handleNewMessage wraps the new message handler to match the expected signature for the update dispatcher. @@ -335,3 +335,21 @@ func (r *Dispatcher) handleBotMessageReactions( inputPeer: peer, }) } + +// wrapAsync wraps the handler to run asynchronously if sync is false. +func wrapAsync[T tg.UpdateClass]( + sync bool, + h func(context.Context, tg.Entities, T) error, +) func(context.Context, tg.Entities, T) error { + if sync { + return h + } + + return func(ctx context.Context, e tg.Entities, upd T) error { + go func() { + _ = h(ctx, e, upd) + }() + + return nil + } +} diff --git a/yatgbot/yatgbot.go b/yatgbot/yatgbot.go index 9018552..ad0199b 100644 --- a/yatgbot/yatgbot.go +++ b/yatgbot/yatgbot.go @@ -34,6 +34,7 @@ type Options struct { Cache yacache.Cache[*redis.Client] MainRouter *RouterGroup ParseMode yatgmessageencoding.MessageEncoding + Sync bool Log yalogger.Logger } @@ -53,6 +54,7 @@ type Options struct { // MainRouter: yourMainRouterGroup, // ParseMode: yourParseModeInstance, // Log: yourLoggerInstance, +// Sync: false, // EmbeddedLocales: yourEmbeddedLocalesFS, // } // @@ -152,7 +154,7 @@ func InitYaTgBot( MainRouter: options.MainRouter, } - dispatcher.Bind(&telegramDispatcher) + dispatcher.Bind(&telegramDispatcher, options.Sync) return dispatcher, nil } diff --git a/yatgclient/heplers.go b/yatgclient/heplers.go index 547fcdd..3b4eed4 100644 --- a/yatgclient/heplers.go +++ b/yatgclient/heplers.go @@ -121,10 +121,7 @@ func (c *Client) UploadFile( ctx context.Context, file io.Reader, ) (tg.MessageMediaClass, yaerrors.Error) { - var ( - peer tg.InputPeerClass - chunkID int - ) + var chunkID int randID := rand.Int63() @@ -165,14 +162,8 @@ func (c *Client) UploadFile( chunkID++ } - if c.IsBot { - peer = &tg.InputPeerEmpty{} - } else { - peer = &tg.InputPeerSelf{} - } - uploadedMedia, err := c.API().MessagesUploadMedia(ctx, &tg.MessagesUploadMediaRequest{ - Peer: peer, + Peer: &tg.InputPeerSelf{}, Media: &tg.InputMediaUploadedPhoto{ File: &tg.InputFile{ ID: randID,