feat(history-sync): emit messaging-history.set event on sync completion and fix race condition #2440

Open
alexandrereyes wants to merge 6 commits into EvolutionAPI:main from alexandrereyes:main

@alexandrereyes alexandrereyes commented Feb 23, 2026

Problem

When an external application subscribes to messages.set events and tries to query messages from the API upon receiving the event, it hits a race condition: the webhook is emitted before the data is persisted to the database via createMany. This means consumers receive the notification but can't find the messages yet.

Additionally, there is no dedicated event signaling that history sync is complete. Consumers receive N partial messages.set events with incremental progress and must guess when it's safe to start querying. The Chatwoot integration already handles this internally via historySyncNotification (progress === 100), but external consumers have no equivalent signal.

Solution

1. Fix race condition: persist before emitting

Reorder CHATS_SET and MESSAGES_SET webhook emissions to fire after prismaRepository.*.createMany(), ensuring data is queryable when the event reaches consumers.

Before:

sendDataWebhook(MESSAGES_SET, data)  →  consumer receives event
createMany(data)                     →  data now in DB (too late)

After:

createMany(data)                     →  data now in DB
sendDataWebhook(MESSAGES_SET, data)  →  consumer receives event (data available)
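
The reordering can be illustrated with a minimal sketch. `InMemoryMessageStore`, `Emit`, and `persistThenEmit` are hypothetical stand-ins for `prismaRepository.message.createMany` and `sendDataWebhook`, not the service's actual code; the only point shown is that the write is awaited before the event is sent.

```typescript
// Hypothetical stand-ins; names and shapes are assumptions for illustration.
type Message = { id: string; body: string };

class InMemoryMessageStore {
  private rows = new Map<string, Message>();

  // Mimics a bulk insert that skips rows whose id already exists.
  async createMany(data: Message[]): Promise<number> {
    let inserted = 0;
    for (const row of data) {
      if (!this.rows.has(row.id)) {
        this.rows.set(row.id, row);
        inserted++;
      }
    }
    return inserted;
  }

  has(id: string): boolean {
    return this.rows.has(id);
  }
}

type Emit = (event: string, data: Message[]) => void;

// Persist first, then notify: when the consumer sees the event,
// every message in the payload is already queryable.
async function persistThenEmit(
  store: InMemoryMessageStore,
  emit: Emit,
  batch: Message[],
): Promise<void> {
  await store.createMany(batch);
  emit("messages.set", batch);
}
```

A consumer callback that looks up each message id in the store at the moment the event arrives will find all of them, which is the property the fix is meant to guarantee.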

2. Emit messaging-history.set when sync completes

When progress === 100, emit a new MESSAGING_HISTORY_SET event with summary counts:

{
  "event": "messaging-history.set",
  "instance": "my-instance",
  "data": {
    "messageCount": 142,
    "chatCount": 35,
    "contactCount": 50
  }
}

The Events.MESSAGING_HISTORY_SET enum value already existed (messaging-history.set) but was only used internally as the Baileys handler name. This PR exposes it to external consumers.
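
On the consumer side, the completion event could be recognized with a small guard like the one below. This is a sketch only: the payload fields come from the example above, while `WebhookEnvelope`, `HistorySyncSummary`, and `asHistorySyncComplete` are hypothetical names that do not exist in the project.

```typescript
// Field names follow the example payload in this PR description.
interface HistorySyncSummary {
  messageCount: number;
  chatCount: number;
  contactCount: number;
}

// Assumed envelope shape for an incoming webhook body.
interface WebhookEnvelope {
  event: string;
  instance: string;
  data: unknown;
}

// Returns the summary when the body is a well-formed completion event, else null.
function asHistorySyncComplete(body: WebhookEnvelope): HistorySyncSummary | null {
  if (body.event !== "messaging-history.set") return null;
  const d = body.data as Partial<HistorySyncSummary> | null;
  if (
    d === null ||
    typeof d !== "object" ||
    typeof d.messageCount !== "number" ||
    typeof d.chatCount !== "number" ||
    typeof d.contactCount !== "number"
  ) {
    return null;
  }
  return {
    messageCount: d.messageCount,
    chatCount: d.chatCount,
    contactCount: d.contactCount,
  };
}
```

A consumer would ignore partial messages.set events for querying purposes and start reading from the API only once this guard returns a non-null summary.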

3. Register event across all transports

Added MESSAGING_HISTORY_SET to:

  • EventController.events (subscribable event list)
  • Type definitions: EventsRabbitmq, EventsWebhook, EventsPusher, Sqs.EVENTS
  • Environment config: all 6 transports (RabbitMQ, NATS, SQS, Kafka, Pusher, Webhook)
  • Validation schemas: instance.schema.ts (webhook, rabbitmq, nats, sqs enums)
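
The relationship between the wire name (messaging-history.set) and the flag name (MESSAGING_HISTORY_SET) can be sketched as follows. The conversion rule and the `EventFlags` shape are assumptions made for illustration; the project's real config types may differ.

```typescript
// Assumption: flag keys are the upper-snake-case form of the wire name.
function toConfigKey(eventName: string): string {
  return eventName.replace(/[.-]/g, "_").toUpperCase();
}

// Hypothetical per-transport flags, e.g. as loaded from environment config.
type EventFlags = Record<string, boolean>;

// An event is delivered on a transport only if its flag is explicitly true.
function isEnabled(flags: EventFlags, eventName: string): boolean {
  return flags[toConfigKey(eventName)] === true;
}
```

Under this sketch a transport that never sets the flag does not receive the event, which matches the opt-in behavior described in the checklist.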

🔗 Related Issue

Builds on top of #2260 which added isLatest and progress to the messages.set payload.

🧪 Type of Change

  • 🐛 Bug fix (non-breaking change which fixes an issue)
  • ✨ New feature (non-breaking change which adds functionality)
  • 💥 Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • 📚 Documentation update
  • 🔧 Refactoring (no functional changes)
  • ⚡ Performance improvement
  • 🧹 Code cleanup
  • 🔒 Security fix

🧪 Testing

  • Manual testing completed
  • Functionality verified in development environment
  • No breaking changes introduced
  • TypeScript build passes (tsc --noEmit && tsup)
  • Lint passes

✅ Checklist

  • My code follows the project's style guidelines
  • I have performed a self-review of my code
  • My changes generate no new warnings
  • I have manually tested my changes thoroughly
  • Fully backward compatible — the new event is opt-in via event subscription
  • No console.log added (uses existing structured logging)
  • No unguarded extra spread (all controllers already use ...(extra ?? {}))

📝 Files Changed

| File | Change |
| --- | --- |
| whatsapp.baileys.service.ts | Reorder webhooks after createMany; emit MESSAGING_HISTORY_SET on progress === 100 |
| event.controller.ts | Add MESSAGING_HISTORY_SET to subscribable events list |
| env.config.ts | Add event to all transport type definitions and env config blocks |
| instance.schema.ts | Add event to all 4 inline validation enums |

Summary by Sourcery

Ensure messaging history sync events are emitted after data persistence and expose a completion event across all transports.

New Features:

  • Emit a messaging-history.set event with summary counts when message history sync reaches 100% progress.
  • Expose the MESSAGING_HISTORY_SET event as subscribable across webhook, RabbitMQ, NATS, SQS, Kafka, and Pusher transports.

Bug Fixes:

  • Resolve a race condition by emitting chats.set and messages.set webhooks only after bulk message and chat data are persisted.
  • Avoid duplicate or missing messages.upsert emissions for incoming audio messages by caching emitted events and adding a fallback emitter on message updates.

Enhancements:

  • Improve media upload handling for messages with invalid or missing media content to reduce noisy processing and logging.

CI:

  • Add a GitHub Actions workflow to build and publish a multi-architecture Docker image to GHCR on pushes, tags, and manual dispatch.

…on and fix race condition

Reorder webhook emissions (CHATS_SET, MESSAGES_SET) to fire after database
persistence, fixing a race condition where consumers received the event
before data was queryable.

Emit a new MESSAGING_HISTORY_SET event when progress reaches 100%,
allowing consumers to know exactly when history sync is complete and
messages are available in the database.

Register the new event across all transport types (Webhook, WebSocket,
RabbitMQ, NATS, SQS, Kafka, Pusher) and validation schemas.

sourcery-ai bot commented Feb 23, 2026

Reviewer's Guide

Ensures history sync webhooks are emitted only after data persistence to avoid race conditions, introduces a new messaging-history.set event on sync completion across all transports, adds safeguards for audio message upserts, and adds a GitHub Actions workflow to publish Docker images to GHCR.

Sequence diagram for history sync persistence and messaging-history.set

sequenceDiagram
  participant BaileysStartupService
  participant PrismaChatRepo as PrismaChatRepository
  participant PrismaMessageRepo as PrismaMessageRepository
  participant WebhookSystem
  participant ExternalConsumer

  BaileysStartupService->>BaileysStartupService: build chatsRaw
  BaileysStartupService->>PrismaChatRepo: createMany(chatsRaw)
  PrismaChatRepo-->>BaileysStartupService: chats persisted
  BaileysStartupService->>WebhookSystem: sendDataWebhook(CHATS_SET, chatsRaw)

  BaileysStartupService->>BaileysStartupService: build messagesRaw, isLatest, progress
  alt SAVE_DATA_HISTORIC enabled
    BaileysStartupService->>PrismaMessageRepo: createMany(messagesRaw)
    PrismaMessageRepo-->>BaileysStartupService: messages persisted
  else SAVE_DATA_HISTORIC disabled
    BaileysStartupService-->>BaileysStartupService: skip persistence
  end

  BaileysStartupService->>WebhookSystem: sendDataWebhook(MESSAGES_SET, messagesRaw, isLatest, progress)
  WebhookSystem-->>ExternalConsumer: emit messages.set webhook

  opt sync completion
    BaileysStartupService->>WebhookSystem: sendDataWebhook(MESSAGING_HISTORY_SET, summaryCounts)
    WebhookSystem-->>ExternalConsumer: emit messaging-history.set webhook
  end

  ExternalConsumer->>ExternalConsumer: detect sync completion via messaging-history.set
  ExternalConsumer->>PrismaMessageRepo: query messages
  PrismaMessageRepo-->>ExternalConsumer: persisted messages returned

Sequence diagram for audio messages.upsert and fallback emission

sequenceDiagram
  participant WhatsApp
  participant BaileysStartupService
  participant BaileysCache
  participant WebhookSystem

  WhatsApp-->>BaileysStartupService: incoming audio message upsert
  BaileysStartupService->>BaileysStartupService: prepare messageRaw
  BaileysStartupService->>WebhookSystem: sendDataWebhook(MESSAGES_UPSERT, messageRaw)
  BaileysStartupService->>BaileysCache: set(getUpsertEmittedCacheKey(messageId), true, ttl)

  %% later message update for same audio...
  WhatsApp-->>BaileysStartupService: message status update
  BaileysStartupService->>BaileysCache: get(getUpsertEmittedCacheKey(messageId))
  alt upsert already emitted
    BaileysCache-->>BaileysStartupService: true
    BaileysStartupService-->>BaileysStartupService: skip fallback upsert
  else upsert not emitted
    BaileysCache-->>BaileysStartupService: null or false
    BaileysStartupService->>BaileysStartupService: build fallbackUpsertPayload from stored message
    BaileysStartupService->>WebhookSystem: sendDataWebhook(MESSAGES_UPSERT, fallbackUpsertPayload)
    BaileysStartupService->>BaileysCache: set(getUpsertEmittedCacheKey(messageId), true, ttl)
  end
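
The fallback logic in the diagram above can be sketched in a few lines. `EmittedFlags` is a hypothetical in-memory stand-in for BaileysCache with TTL handling omitted, and the key format in `upsertEmittedKey` is an assumption; the PR only states that keys are built per instance and message.

```typescript
// Hypothetical stand-in for the cache; the real one is async and uses a TTL.
class EmittedFlags {
  private flags = new Set<string>();

  has(key: string): boolean {
    return this.flags.has(key);
  }

  mark(key: string): void {
    this.flags.add(key);
  }
}

// Assumed key format, for illustration only.
function upsertEmittedKey(instanceId: string, messageId: string): string {
  return `upsert_emitted_${instanceId}_${messageId}`;
}

class UpsertEmitter {
  // Records every messages.upsert emission so the behavior can be inspected.
  readonly sent: string[] = [];
  private instanceId: string;
  private cache: EmittedFlags;

  constructor(instanceId: string, cache: EmittedFlags) {
    this.instanceId = instanceId;
    this.cache = cache;
  }

  // Normal path: emit messages.upsert and remember that it was sent.
  onUpsert(messageId: string): void {
    this.emit(messageId);
  }

  // Fallback path: on a later update, emit only if no upsert was recorded.
  onUpdate(messageId: string): void {
    if (!this.cache.has(upsertEmittedKey(this.instanceId, messageId))) {
      this.emit(messageId);
    }
  }

  private emit(messageId: string): void {
    this.sent.push(messageId);
    this.cache.mark(upsertEmittedKey(this.instanceId, messageId));
  }
}
```

Because both paths set the flag after emitting, a message yields at most one messages.upsert in this sketch, whether the first signal seen is the upsert or a later update.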

Updated class diagram for BaileysStartupService history sync and audio handling

classDiagram
  class BaileysStartupService {
    - string instanceId
    - any instance
    - ConfigService configService
    - PrismaRepository prismaRepository
    - BaileysCache baileysCache
    - Logger logger
    + getUpsertEmittedCacheKey(messageId string) string
    + sendDataWebhook(event Events, payload any, bulk boolean, headers any, extra any) Promise~void~
    + handleHistorySync(chatsRaw any[], messagesRaw any[], isLatest boolean, progress number) Promise~void~
    + handleMessageUpsert(messageRaw any) Promise~void~
    + handleMessageUpdate(key any, message any) Promise~void~
  }

  class ConfigService {
    + getDatabaseConfig() Database
    + getCacheConfig() CacheConf
    + getChatwootConfig() Chatwoot
    + getS3Config() S3
  }

  class PrismaRepository {
    + chat ChatRepository
    + message MessageRepository
    + media MediaRepository
  }

  class ChatRepository {
    + createMany(data any[], skipDuplicates boolean) Promise~void~
  }

  class MessageRepository {
    + createMany(data any[], skipDuplicates boolean) Promise~void~
    + update(where any, data any) Promise~void~
  }

  class MediaRepository {
    + create(data any) Promise~void~
  }

  class BaileysCache {
    + get(key string) Promise~any~
    + set(key string, value any, ttlSeconds number) Promise~void~
  }

  class Logger {
    + warn(message any) void
    + verbose(message any) void
    + error(message any) void
  }

  BaileysStartupService --> ConfigService : uses
  BaileysStartupService --> PrismaRepository : uses
  BaileysStartupService --> BaileysCache : uses
  BaileysStartupService --> Logger : logs
  PrismaRepository --> ChatRepository : aggregates
  PrismaRepository --> MessageRepository : aggregates
  PrismaRepository --> MediaRepository : aggregates
  BaileysStartupService ..> ChatRepository : persists chats
  BaileysStartupService ..> MessageRepository : persists messages
  BaileysStartupService ..> MediaRepository : persists media
  BaileysStartupService ..> BaileysCache : stores upsertEmitted flag
  BaileysStartupService ..> Logger : logs media and fallback events

File-Level Changes

Change: Reordered history-sync webhook emission to occur after database persistence and emit a completion summary event when sync reaches 100% progress.
Details:
  • Move CHATS_SET webhook send after chat createMany call when historic persistence is enabled.
  • Move MESSAGES_SET webhook send after message createMany call to guarantee messages are persisted before external notification.
  • Emit MESSAGING_HISTORY_SET webhook with message/chat/contact counts when progress equals 100 during history sync.
Files:
  • src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts

Change: Added a cache-based safeguard to avoid missing messages.upsert events for audio messages and prevent duplicate emissions.
Details:
  • Introduce getUpsertEmittedCacheKey helper to build cache keys per instance/message.
  • On audio messages.upsert, set a cache flag after emitting to track that the event was sent.
  • On message updates, check cache and emit a fallback messages.upsert payload for audio messages that never had an upsert emitted, then set the cache flag.
Files:
  • src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts

Change: Exposed the MESSAGING_HISTORY_SET event to external consumers across controllers, configuration, and validation.
Details:
  • Add MESSAGING_HISTORY_SET to the EventController subscribable events list.
  • Extend transport event type definitions (RabbitMQ, SQS, Webhook, Pusher) with a MESSAGING_HISTORY_SET boolean flag.
  • Wire MESSAGING_HISTORY_SET into env-based configuration for RabbitMQ, NATS, SQS, Kafka, Pusher, and Webhook transports.
  • Update instanceSchema enums so instances can enable/disable MESSAGING_HISTORY_SET for webhook, rabbitmq, nats, and sqs.
Files:
  • src/api/integrations/event/event.controller.ts
  • src/config/env.config.ts
  • src/validate/instance.schema.ts

Change: Added CI automation to build and publish multi-arch Docker images to GitHub Container Registry on pushes and tags.
Details:
  • Create a GitHub Actions workflow to build and push the evolution-api image to ghcr.io on main and version tags.
  • Set up QEMU and Buildx for multi-architecture builds (amd64 and arm64).
  • Configure Docker metadata, tags, labels, cache, and GHCR authentication using GITHUB_TOKEN.
Files:
  • .github/workflows/publish_ghcr_image.yml

@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 1 issue, and left some high level feedback:

  • The MESSAGING_HISTORY_SET payload uses messagesRaw.length, chatsRaw.length, and contacts?.length from the final batch when progress === 100; if history sync can span multiple batches this will only report counts for that last batch rather than the full synced totals, so consider accumulating totals across the sync or deriving them from a stable source.
  • For the MESSAGING_HISTORY_SET event you’re calling sendDataWebhook(Events.MESSAGING_HISTORY_SET, {...}) with a different argument shape than the nearby MESSAGES_SET call (which passes true, undefined, extra); double-check whether this helper expects the additional flags/metadata so the new event behaves consistently across transports.

## Individual Comments

### Comment 1
<location path="src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts" line_range="1078-1079" />
<code_context>
           contacts.filter((c) => !!c.notify || !!c.name).map((c) => ({ id: c.id, name: c.name ?? c.notify })),
         );

+        if (progress === 100) {
+          this.sendDataWebhook(Events.MESSAGING_HISTORY_SET, {
+            messageCount: messagesRaw.length,
+            chatCount: chatsRaw.length,
</code_context>
<issue_to_address>
**suggestion:** MESSAGING_HISTORY_SET payload may be misleading if messages/chats are chunked or filtered.

Since `MESSAGING_HISTORY_SET` is only sent when `progress === 100`, `messageCount` and `chatCount` currently reflect `messagesRaw.length` / `chatsRaw.length` for that final batch only. If sync uses batching, filtering, or de-duplication, consumers may misinterpret these as global history totals. Either track and emit cumulative counts over the whole sync, or document that the event exposes counts for the final (or current) chunk only to avoid misuse downstream.

Suggested implementation:

```typescript
        if (progress === 100) {
          this.sendDataWebhook(Events.MESSAGING_HISTORY_SET, {
            // cumulative counts over the entire history sync
            messageCount: this.messagingHistoryTotalMessageCount,
            chatCount: this.messagingHistoryTotalChatCount,
            contactCount: this.messagingHistoryTotalContactCount ?? contacts?.length ?? 0,
          });
        }

```

To fully implement cumulative counts and make the payload semantically correct, you should also:

1. **Add fields to the WhatsAppBaileysService class** (or the relevant service class that owns this code):
   - Initialize at class level, e.g.:
     - `private messagingHistoryTotalMessageCount = 0;`
     - `private messagingHistoryTotalChatCount = 0;`
     - `private messagingHistoryTotalContactCount = 0;`
2. **Reset the counters when a new history sync starts**:
   - At the beginning of the history sync workflow (where `progress` likely starts at `0` or where a sync request is initiated), explicitly set the counters to `0`:
     - `this.messagingHistoryTotalMessageCount = 0;`
     - `this.messagingHistoryTotalChatCount = 0;`
     - `this.messagingHistoryTotalContactCount = 0;`
3. **Increment the counters for each batch**:
   - Wherever `messagesRaw`, `chatsRaw`, and `contacts` for the current chunk are populated (before this snippet), increment the totals:
     - `this.messagingHistoryTotalMessageCount += messagesRaw.length;`
     - `this.messagingHistoryTotalChatCount += chatsRaw.length;`
     - For contacts, either:
       - Use unique contacts only (e.g. increment by the number of *new* contacts discovered in this batch), or
       - If deduplication is already done, `this.messagingHistoryTotalContactCount += contacts.length;`
   - Make sure this logic aligns with any existing filtering/de-duplication so that the totals represent what downstream consumers expect.
4. **Optional documentation**:
   - Add a short doc comment on the `MESSAGING_HISTORY_SET` handler or event definition clarifying that `messageCount`, `chatCount`, and `contactCount` are cumulative over the entire sync, not per-batch, so consumers don’t misinterpret the payload.
</issue_to_address>
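
The accumulation described in the steps above could look roughly like this. `HistorySyncCounter`, `Batch`, and `Summary` are hypothetical names, and as a simplification the counters are reset when the sync completes rather than when a new sync starts; contacts are summed without de-duplication.

```typescript
// Per-batch sizes plus the progress value reported with that batch.
interface Batch {
  messages: number;
  chats: number;
  contacts: number;
  progress: number;
}

interface Summary {
  messageCount: number;
  chatCount: number;
  contactCount: number;
}

class HistorySyncCounter {
  private totals: Summary = { messageCount: 0, chatCount: 0, contactCount: 0 };

  // Adds one batch. Returns the cumulative summary when progress reaches 100,
  // otherwise null. Contacts are summed as-is (no de-duplication here).
  add(batch: Batch): Summary | null {
    this.totals.messageCount += batch.messages;
    this.totals.chatCount += batch.chats;
    this.totals.contactCount += batch.contacts;

    if (batch.progress !== 100) return null;

    const done: Summary = { ...this.totals };
    // Reset so that a later sync starts counting from zero.
    this.totals = { messageCount: 0, chatCount: 0, contactCount: 0 };
    return done;
  }
}
```

Feeding two batches of 3 and 5 messages, with progress 40 and 100, yields a final `messageCount` of 8 instead of the last batch's 5, which is the discrepancy the review comment points out.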

Track message, chat and contact counts across all history sync batches
using instance-level counters, so the final event reports accurate
totals instead of only the last batch counts.

Addresses Sourcery review feedback on PR EvolutionAPI#2440.