feat(history-sync): emit messaging-history.set event on sync completion and fix race condition #2440
Open
alexandrereyes wants to merge 6 commits into EvolutionAPI:main from
…on and fix race condition
Reorder webhook emissions (CHATS_SET, MESSAGES_SET) to fire after database persistence, fixing a race condition where consumers received the event before data was queryable.
Emit a new MESSAGING_HISTORY_SET event when progress reaches 100%, allowing consumers to know exactly when history sync is complete and messages are available in the database.
Register the new event across all transport types (Webhook, WebSocket, RabbitMQ, NATS, SQS, Kafka, Pusher) and validation schemas.
Reviewer's Guide
Ensures history sync webhooks are emitted only after data persistence to avoid race conditions, introduces a new messaging-history.set event on sync completion across all transports, adds safeguards for audio message upserts, and adds a GitHub Actions workflow to publish Docker images to GHCR.
Sequence diagram for history sync persistence and messaging-history.set
sequenceDiagram
participant BaileysStartupService
participant PrismaChatRepo as PrismaChatRepository
participant PrismaMessageRepo as PrismaMessageRepository
participant WebhookSystem
participant ExternalConsumer
BaileysStartupService->>BaileysStartupService: build chatsRaw
BaileysStartupService->>PrismaChatRepo: createMany(chatsRaw)
PrismaChatRepo-->>BaileysStartupService: chats persisted
BaileysStartupService->>WebhookSystem: sendDataWebhook(CHATS_SET, chatsRaw)
BaileysStartupService->>BaileysStartupService: build messagesRaw, isLatest, progress
alt SAVE_DATA_HISTORIC enabled
BaileysStartupService->>PrismaMessageRepo: createMany(messagesRaw)
PrismaMessageRepo-->>BaileysStartupService: messages persisted
else SAVE_DATA_HISTORIC disabled
BaileysStartupService-->>BaileysStartupService: skip persistence
end
BaileysStartupService->>WebhookSystem: sendDataWebhook(MESSAGES_SET, messagesRaw, isLatest, progress)
WebhookSystem-->>ExternalConsumer: emit messages.set webhook
opt sync completion
BaileysStartupService->>WebhookSystem: sendDataWebhook(MESSAGING_HISTORY_SET, summaryCounts)
WebhookSystem-->>ExternalConsumer: emit messaging-history.set webhook
end
ExternalConsumer->>ExternalConsumer: detect sync completion via messaging-history.set
ExternalConsumer->>PrismaMessageRepo: query messages
PrismaMessageRepo-->>ExternalConsumer: persisted messages returned
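The consumer side of the diagram above could be handled roughly as in the sketch below. `SyncTracker`, `WebhookEvent`, and `HistorySummary` are hypothetical names for illustration only; the field names mirror the `messaging-history.set` payload shown in the PR description, and the in-memory map is an assumption about how a consumer might track state.

```typescript
// Hypothetical consumer-side types; not part of the Evolution API codebase.
interface HistorySummary {
  messageCount: number;
  chatCount: number;
  contactCount: number;
}

interface WebhookEvent {
  event: string;
  instance: string;
  data: unknown;
}

// Remembers which instances have reported history sync completion.
class SyncTracker {
  private readonly completed = new Map<string, HistorySummary>();

  // Returns true only for the completion event; partial batches return false.
  handle(evt: WebhookEvent): boolean {
    if (evt.event !== 'messaging-history.set') {
      return false; // e.g. messages.set or chats.set: keep waiting
    }
    // Assumption: the payload matches the summary shape from the PR description.
    this.completed.set(evt.instance, evt.data as HistorySummary);
    return true;
  }

  // Summary for an instance, or undefined while sync is still in progress.
  summary(instance: string): HistorySummary | undefined {
    return this.completed.get(instance);
  }
}
```

A consumer would start querying messages for an instance only once `handle` returns true for it.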
Sequence diagram for audio messages.upsert and fallback emission
sequenceDiagram
participant WhatsApp
participant BaileysStartupService
participant BaileysCache
participant WebhookSystem
WhatsApp-->>BaileysStartupService: incoming audio message upsert
BaileysStartupService->>BaileysStartupService: prepare messageRaw
BaileysStartupService->>WebhookSystem: sendDataWebhook(MESSAGES_UPSERT, messageRaw)
BaileysStartupService->>BaileysCache: set(getUpsertEmittedCacheKey(messageId), true, ttl)
%% later message update for same audio...
WhatsApp-->>BaileysStartupService: message status update
BaileysStartupService->>BaileysCache: get(getUpsertEmittedCacheKey(messageId))
alt upsert already emitted
BaileysCache-->>BaileysStartupService: true
BaileysStartupService-->>BaileysStartupService: skip fallback upsert
else upsert not emitted
BaileysCache-->>BaileysStartupService: null or false
BaileysStartupService->>BaileysStartupService: build fallbackUpsertPayload from stored message
BaileysStartupService->>WebhookSystem: sendDataWebhook(MESSAGES_UPSERT, fallbackUpsertPayload)
BaileysStartupService->>BaileysCache: set(getUpsertEmittedCacheKey(messageId), true, ttl)
end
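The flag-based deduplication in the diagram above can be sketched as follows. This is a simplified, synchronous stand-in: `TtlCache`, `UpsertEmitter`, and the key format returned by `upsertKey` are assumptions for illustration, whereas the real `BaileysCache` in the class diagram is promise-based.

```typescript
// Hypothetical in-memory TTL cache; time is passed in explicitly to keep the sketch deterministic.
class TtlCache {
  private readonly entries = new Map<string, { value: boolean; expiresAt: number }>();

  set(key: string, value: boolean, ttlMs: number, now: number): void {
    this.entries.set(key, { value, expiresAt: now + ttlMs });
  }

  get(key: string, now: number): boolean | undefined {
    const entry = this.entries.get(key);
    if (!entry || entry.expiresAt <= now) {
      this.entries.delete(key); // drop expired (or missing) entries
      return undefined;
    }
    return entry.value;
  }
}

// Assumed key format; the actual getUpsertEmittedCacheKey output is not shown in the source.
const upsertKey = (messageId: string): string => `upsert_emitted_${messageId}`;

class UpsertEmitter {
  readonly emitted: string[] = []; // stands in for sendDataWebhook(MESSAGES_UPSERT, ...)
  private readonly cache: TtlCache;
  private readonly ttlMs: number;

  constructor(cache: TtlCache, ttlMs: number) {
    this.cache = cache;
    this.ttlMs = ttlMs;
  }

  // Normal path: emit the upsert and record that it was emitted.
  onUpsert(messageId: string, now: number): void {
    this.emitted.push(messageId);
    this.cache.set(upsertKey(messageId), true, this.ttlMs, now);
  }

  // Update path: emit a fallback upsert only if none was recorded. Returns true if emitted.
  onUpdate(messageId: string, now: number): boolean {
    if (this.cache.get(upsertKey(messageId), now)) {
      return false; // upsert already emitted, skip fallback
    }
    this.emitted.push(messageId);
    this.cache.set(upsertKey(messageId), true, this.ttlMs, now);
    return true;
  }
}
```

Because the flag expires with the TTL, a very late update can still trigger a fallback emission; whether that is acceptable depends on the TTL chosen.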
Updated class diagram for BaileysStartupService history sync and audio handling
classDiagram
class BaileysStartupService {
- string instanceId
- any instance
- ConfigService configService
- PrismaRepository prismaRepository
- BaileysCache baileysCache
- Logger logger
+ getUpsertEmittedCacheKey(messageId string) string
+ sendDataWebhook(event Events, payload any, bulk boolean, headers any, extra any) Promise~void~
+ handleHistorySync(chatsRaw any[], messagesRaw any[], isLatest boolean, progress number) Promise~void~
+ handleMessageUpsert(messageRaw any) Promise~void~
+ handleMessageUpdate(key any, message any) Promise~void~
}
class ConfigService {
+ getDatabaseConfig() Database
+ getCacheConfig() CacheConf
+ getChatwootConfig() Chatwoot
+ getS3Config() S3
}
class PrismaRepository {
+ chat ChatRepository
+ message MessageRepository
+ media MediaRepository
}
class ChatRepository {
+ createMany(data any[], skipDuplicates boolean) Promise~void~
}
class MessageRepository {
+ createMany(data any[], skipDuplicates boolean) Promise~void~
+ update(where any, data any) Promise~void~
}
class MediaRepository {
+ create(data any) Promise~void~
}
class BaileysCache {
+ get(key string) Promise~any~
+ set(key string, value any, ttlSeconds number) Promise~void~
}
class Logger {
+ warn(message any) void
+ verbose(message any) void
+ error(message any) void
}
BaileysStartupService --> ConfigService : uses
BaileysStartupService --> PrismaRepository : uses
BaileysStartupService --> BaileysCache : uses
BaileysStartupService --> Logger : logs
PrismaRepository --> ChatRepository : aggregates
PrismaRepository --> MessageRepository : aggregates
PrismaRepository --> MediaRepository : aggregates
BaileysStartupService ..> ChatRepository : persists chats
BaileysStartupService ..> MessageRepository : persists messages
BaileysStartupService ..> MediaRepository : persists media
BaileysStartupService ..> BaileysCache : stores upsertEmitted flag
BaileysStartupService ..> Logger : logs media and fallback events
Hey - I've found 1 issue, and left some high level feedback:
- The `MESSAGING_HISTORY_SET` payload uses `messagesRaw.length`, `chatsRaw.length`, and `contacts?.length` from the final batch when `progress === 100`; if history sync can span multiple batches this will only report counts for that last batch rather than the full synced totals, so consider accumulating totals across the sync or deriving them from a stable source.
- For the `MESSAGING_HISTORY_SET` event you’re calling `sendDataWebhook(Events.MESSAGING_HISTORY_SET, {...})` with a different argument shape than the nearby `MESSAGES_SET` call (which passes `true, undefined, extra`); double-check whether this helper expects the additional flags/metadata so the new event behaves consistently across transports.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `MESSAGING_HISTORY_SET` payload uses `messagesRaw.length`, `chatsRaw.length`, and `contacts?.length` from the final batch when `progress === 100`; if history sync can span multiple batches this will only report counts for that last batch rather than the full synced totals, so consider accumulating totals across the sync or deriving them from a stable source.
- For the `MESSAGING_HISTORY_SET` event you’re calling `sendDataWebhook(Events.MESSAGING_HISTORY_SET, {...})` with a different argument shape than the nearby `MESSAGES_SET` call (which passes `true, undefined, extra`); double-check whether this helper expects the additional flags/metadata so the new event behaves consistently across transports.
## Individual Comments
### Comment 1
<location path="src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts" line_range="1078-1079" />
<code_context>
contacts.filter((c) => !!c.notify || !!c.name).map((c) => ({ id: c.id, name: c.name ?? c.notify })),
);
+ if (progress === 100) {
+ this.sendDataWebhook(Events.MESSAGING_HISTORY_SET, {
+ messageCount: messagesRaw.length,
+ chatCount: chatsRaw.length,
</code_context>
<issue_to_address>
**suggestion:** MESSAGING_HISTORY_SET payload may be misleading if messages/chats are chunked or filtered.
Since `MESSAGING_HISTORY_SET` is only sent when `progress === 100`, `messageCount` and `chatCount` currently reflect `messagesRaw.length` / `chatsRaw.length` for that final batch only. If sync uses batching, filtering, or de-duplication, consumers may misinterpret these as global history totals. Either track and emit cumulative counts over the whole sync, or document that the event exposes counts for the final (or current) chunk only to avoid misuse downstream.
Suggested implementation:
```typescript
if (progress === 100) {
  this.sendDataWebhook(Events.MESSAGING_HISTORY_SET, {
    // cumulative counts over the entire history sync
    messageCount: this.messagingHistoryTotalMessageCount,
    chatCount: this.messagingHistoryTotalChatCount,
    contactCount: this.messagingHistoryTotalContactCount ?? contacts?.length ?? 0,
  });
}
```
To fully implement cumulative counts and make the payload semantically correct, you should also:
1. **Add fields to the WhatsAppBaileysService class** (or the relevant service class that owns this code):
- Initialize at class level, e.g.:
- `private messagingHistoryTotalMessageCount = 0;`
- `private messagingHistoryTotalChatCount = 0;`
- `private messagingHistoryTotalContactCount = 0;`
2. **Reset the counters when a new history sync starts**:
- At the beginning of the history sync workflow (where `progress` likely starts at `0` or where a sync request is initiated), explicitly set the counters to `0`:
- `this.messagingHistoryTotalMessageCount = 0;`
- `this.messagingHistoryTotalChatCount = 0;`
- `this.messagingHistoryTotalContactCount = 0;`
3. **Increment the counters for each batch**:
- Wherever `messagesRaw`, `chatsRaw`, and `contacts` for the current chunk are populated (before this snippet), increment the totals:
- `this.messagingHistoryTotalMessageCount += messagesRaw.length;`
- `this.messagingHistoryTotalChatCount += chatsRaw.length;`
- For contacts, either:
- Use unique contacts only (e.g. increment by the number of *new* contacts discovered in this batch), or
- If deduplication is already done, `this.messagingHistoryTotalContactCount += contacts.length;`
- Make sure this logic aligns with any existing filtering/de-duplication so that the totals represent what downstream consumers expect.
4. **Optional documentation**:
- Add a short doc comment on the `MESSAGING_HISTORY_SET` handler or event definition clarifying that `messageCount`, `chatCount`, and `contactCount` are cumulative over the entire sync, not per-batch, so consumers don’t misinterpret the payload.
</issue_to_address>
Track message, chat and contact counts across all history sync batches using instance-level counters, so the final event reports accurate totals instead of only the last batch counts. Addresses Sourcery review feedback on PR EvolutionAPI#2440.
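The cumulative counting described in this commit could look roughly like the sketch below. `HistorySyncCounters`, `addBatch`, and the batch shape are hypothetical names chosen for illustration, and resetting the totals once `progress` reaches 100 is an assumption, not necessarily what the PR does.

```typescript
// Hypothetical per-batch input: sizes of the current chunk plus the reported progress.
interface HistoryBatch {
  messages: number;
  chats: number;
  contacts: number;
  progress: number;
}

// Shape of the summary payload, matching the field names in the PR description.
interface HistoryTotals {
  messageCount: number;
  chatCount: number;
  contactCount: number;
}

class HistorySyncCounters {
  private totals: HistoryTotals = { messageCount: 0, chatCount: 0, contactCount: 0 };

  // Accumulates one batch. Returns the cumulative totals when progress reaches 100,
  // otherwise null. Totals are reset after completion so the next sync starts from zero.
  addBatch(batch: HistoryBatch): HistoryTotals | null {
    this.totals.messageCount += batch.messages;
    this.totals.chatCount += batch.chats;
    this.totals.contactCount += batch.contacts;

    if (batch.progress !== 100) {
      return null; // sync still in progress
    }

    const finalTotals = this.totals;
    this.totals = { messageCount: 0, chatCount: 0, contactCount: 0 };
    return finalTotals;
  }
}
```

Note that this sketch counts raw batch sizes; if batches can overlap, the totals would need de-duplication, as the review comment above points out for contacts.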
Problem
When an external application subscribes to `messages.set` events and tries to query messages from the API upon receiving the event, it hits a race condition: the webhook is emitted before the data is persisted to the database via `createMany`. This means consumers receive the notification but can't find the messages yet.
Additionally, there is no dedicated event signaling that history sync is complete. Consumers receive N partial `messages.set` events with incremental `progress` and must guess when it's safe to start querying. The Chatwoot integration already handles this internally via `historySyncNotification` (`progress === 100`), but external consumers have no equivalent signal.
Solution
1. Fix race condition: persist before emitting
Reorder `CHATS_SET` and `MESSAGES_SET` webhook emissions to fire after `prismaRepository.*.createMany()`, ensuring data is queryable when the event reaches consumers.
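As a minimal sketch of the persist-before-emit ordering (not the PR's actual diff): `persistThenEmit`, `persist`, and `emit` are hypothetical names, with `persist` standing in for a `createMany` call and `emit` for `sendDataWebhook`.

```typescript
// Hypothetical callback types for illustration only.
type Persist<T> = (rows: T[]) => Promise<void>;
type Emit<T> = (event: string, rows: T[]) => void;

// Awaits persistence before emitting, so a consumer that queries
// on receipt of the event finds the rows already stored.
async function persistThenEmit<T>(
  event: string,
  rows: T[],
  persist: Persist<T>,
  emit: Emit<T>,
): Promise<void> {
  await persist(rows); // resolves once the rows are written
  emit(event, rows); // only now notify consumers
}
```

The race described above corresponds to calling `emit` before `await persist(rows)` has resolved.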
2. Emit `messaging-history.set` when sync completes
When `progress === 100`, emit a new `MESSAGING_HISTORY_SET` event with summary counts:

    {
      "event": "messaging-history.set",
      "instance": "my-instance",
      "data": { "messageCount": 142, "chatCount": 35, "contactCount": 50 }
    }

The `Events.MESSAGING_HISTORY_SET` enum value already existed (`messaging-history.set`) but was only used internally as the Baileys handler name. This PR exposes it to external consumers.
3. Register event across all transports
Added `MESSAGING_HISTORY_SET` to:
- `EventController.events` (subscribable event list)
- `EventsRabbitmq`, `EventsWebhook`, `EventsPusher`, `Sqs.EVENTS`
- `instance.schema.ts` (webhook, rabbitmq, nats, sqs enums)
🔗 Related Issue
Builds on top of #2260 which added `isLatest` and `progress` to the `messages.set` payload.
🧪 Type of Change
🧪 Testing
`tsc --noEmit && tsup`)
✅ Checklist
`console.log` added (uses existing structured logging)
`extra` spread (all controllers already use `...(extra ?? {})`)
📝 Files Changed
- `whatsapp.baileys.service.ts`: `createMany`; emit `MESSAGING_HISTORY_SET` on `progress === 100`
- `event.controller.ts`: `MESSAGING_HISTORY_SET` to subscribable events list
- `env.config.ts`
- `instance.schema.ts`
Summary by Sourcery
Ensure messaging history sync events are emitted after data persistence and expose a completion event across all transports.
New Features:
Bug Fixes:
Enhancements:
CI: