Chipingress publish batch #1862

Open
thomaska wants to merge 18 commits into main from infoplat-3436-chipingress-publishBatch

@thomaska thomaska commented Feb 27, 2026

Ticket: https://smartcontract-it.atlassian.net/browse/INFOPLAT-3436

Summary

Replace the per-event ChipIngressEmitter with ChipIngressBatchEmitter to
batch events sent to ChIP Ingress via PublishBatch. Reduces N gRPC calls +
N Kafka transactions to 1 call + 1 transaction per flush interval.

Batching:

  • Per-(domain, entity) worker buffers events in a channel, flushes via PublishBatch on a configurable interval
  • Non-blocking Emit() (channel send); when the buffer is full the event is dropped, and drop logging is rate-limited with exponential backoff
  • Non-blocking channel drain via select/default to avoid blocking on concurrent reads
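
For reviewers, a minimal sketch of the non-blocking emit and drain pattern described in the bullets above. It is not the code in this PR: the `event` and `worker` types and the `emit` and `drainBatch` names are hypothetical, and the drop counter is a plain int rather than an OTel counter.

```go
package main

import "fmt"

// event is a hypothetical stand-in for the real event type.
type event struct{ payload string }

// worker buffers events for one (domain, entity) pair.
// dropped is a plain int for brevity; it is not goroutine-safe.
type worker struct {
	ch      chan event
	dropped int
}

// emit never blocks: if the buffer is full, the event is dropped and counted.
func (w *worker) emit(e event) bool {
	select {
	case w.ch <- e:
		return true
	default:
		w.dropped++
		return false
	}
}

// drainBatch collects up to max events without blocking.
// It returns as soon as the channel is empty.
func (w *worker) drainBatch(max int) []event {
	batch := make([]event, 0, max)
	for len(batch) < max {
		select {
		case e := <-w.ch:
			batch = append(batch, e)
		default:
			return batch
		}
	}
	return batch
}

func main() {
	w := &worker{ch: make(chan event, 3)}
	for i := 0; i < 5; i++ {
		w.emit(event{payload: fmt.Sprintf("e%d", i)})
	}
	first := w.drainBatch(2)
	second := w.drainBatch(2)
	fmt.Println(len(first), len(second), w.dropped)
}
```

With a buffer of 3 and 5 emits, three events are accepted and two are dropped; the two drains then return batches of 2 and 1.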

Resilience (parity with OTLP path):

  • Retry with exponential backoff + jitter on PublishBatch failure (default: 5s/30s/1m, matching OTel SDK)
  • Graceful drain on shutdown -- flushes buffered events with a configurable timeout before stopping
  • Drain continues attempting subsequent batches even if one fails

Observability:

  • OTel counters: events_sent, events_dropped, batch_retries, batch_failures, events_drained, all with domain/entity attributes

Config:

  • ChipIngressBufferSize (default 100), ChipIngressMaxBatchSize (default 50), ChipIngressSendInterval (default 500ms), ChipIngressSendTimeout (default 10s)
  • ChipIngressRetryConfig (default 5s/30s/1m), ChipIngressDrainTimeout (default 5s)
  • ChipIngressBatchEmitterEnabled feature flag (default false -- opt-in)

Other:

  • Simplified DualSourceEmitter: removed goroutine wrapper since Emit() is non-blocking
  • 17 tests covering batching, retry success/exhaustion, graceful drain, drain timeout, drain with partial failure, shutdown during backoff, buffer-full drops, per-domain isolation, context cancellation, and config defaults

Why

The old ChipIngressEmitter called Publish per event with no retry, no drain, and no buffering. Under load, this created unnecessary gRPC/Kafka overhead and silently lost events on any transient failure or shutdown. PublishBatch amortizes the cost and the added retry/drain mechanisms bring chip-ingress delivery resilience to parity with the existing OTLP path.

Supports

smartcontractkit/chainlink#21327

Copilot AI review requested due to automatic review settings February 27, 2026 14:43
@thomaska thomaska requested a review from a team as a code owner February 27, 2026 14:43

Copilot AI left a comment

Pull request overview

This pull request replaces the per-event ChIP Ingress emission with a batched approach to reduce overhead from N gRPC calls + N Kafka transactions to 1 call + 1 transaction per flush interval. The implementation introduces a new ChipIngressBatchEmitter that buffers events per (domain, entity) pair and flushes them periodically using PublishBatch.

Changes:

  • Introduced ChipIngressBatchEmitter with per-(domain, entity) worker goroutines for batching events
  • Added chipIngressEmitterWorker to handle batch assembly and sending with configurable timeouts
  • Removed goroutine wrapper from DualSourceEmitter.Emit() since batching is now non-blocking (channel send)
  • Added 4 new configuration parameters with sensible defaults (BufferSize: 100, MaxBatchSize: 50, SendInterval: 500ms, SendTimeout: 10s)

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| pkg/beholder/chip_ingress_batch_emitter.go | New batch emitter with per-worker buffering and periodic flushing via PublishBatch |
| pkg/beholder/chip_ingress_emitter_worker.go | Worker implementation handling batch assembly, channel draining, and exponential backoff logging for drops |
| pkg/beholder/chip_ingress_batch_emitter_test.go | Comprehensive test coverage (10 tests) for batching, max batch size, isolation, buffer overflow, lifecycle, errors, and defaults |
| pkg/beholder/dual_source_emitter.go | Simplified Emit() by removing goroutine wrapper since ChipIngressBatchEmitter.Emit() is non-blocking |
| pkg/beholder/client.go | Updated to create and start ChipIngressBatchEmitter instead of ChipIngressEmitter; added comment about closure ordering |
| pkg/beholder/config.go | Added 4 new config fields with inline documentation and default values |
| pkg/beholder/config_test.go | Updated expected output to include new config fields |
Comments suppressed due to low confidence (2)

pkg/beholder/config.go:50

  • The comment states "Zero disables batching" but the implementation in NewChipIngressBatchEmitter treats zero as "use default" and sets it to 500ms. The comment should be corrected to match the actual behavior, e.g., "Flush interval per worker (default 500ms when zero or unset)".
	ChipIngressSendInterval time.Duration // Flush interval per worker (default 500ms). Zero disables batching.

pkg/beholder/client.go:248

  • The messageLoggerProvider appears twice in the shutdowner slice. This will cause it to be shut down twice, which could lead to errors or undefined behavior. Remove one of the duplicate entries.
		for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider} {

github-actions bot commented Feb 27, 2026

✅ API Diff Results - github.com/smartcontractkit/chainlink-common

✅ Compatible Changes (17)

pkg/beholder (2)
  • ChipIngressBatchEmitter — ➕ Added

  • NewChipIngressBatchEmitter — ➕ Added

pkg/beholder.Config (7)
  • ChipIngressBatchEmitterEnabled — ➕ Added

  • ChipIngressBufferSize — ➕ Added

  • ChipIngressDrainTimeout — ➕ Added

  • ChipIngressMaxBatchSize — ➕ Added

  • ChipIngressMaxWorkers — ➕ Added

  • ChipIngressSendInterval — ➕ Added

  • ChipIngressSendTimeout — ➕ Added

pkg/beholder.writerClientConfig (7)
  • ChipIngressBatchEmitterEnabled — ➕ Added

  • ChipIngressBufferSize — ➕ Added

  • ChipIngressDrainTimeout — ➕ Added

  • ChipIngressMaxBatchSize — ➕ Added

  • ChipIngressMaxWorkers — ➕ Added

  • ChipIngressSendInterval — ➕ Added

  • ChipIngressSendTimeout — ➕ Added

pkg/loop.EnvConfig (1)
  • ChipIngressBatchEmitterEnabled — ➕ Added

@smartcontractkit smartcontractkit deleted a comment from github-actions bot Feb 27, 2026
return nil, err
}

chipIngressEmitter, err := NewChipIngressEmitter(chipIngressClient)
Author

add a feature flag

// via chipingress.Client.PublishBatch on a periodic interval.
// It satisfies the Emitter interface so it can be used as a drop-in replacement
// for ChipIngressEmitter.
type ChipIngressBatchEmitter struct {
Author

name it Service

return e, nil
}

func (e *ChipIngressBatchEmitter) start(_ context.Context) error {

What is the role of this function if it always returns nil?

Author

It was mostly added as a placeholder, but it can be omitted.
After checking, EventHandler in core/services/workflows/syncer/v2/handler.go also omits it, so omitting it here is probably more consistent.


// NewChipIngressBatchEmitter creates a batch emitter backed by the given chipingress client.
// Call Start() to begin health monitoring, and Close() to stop all workers.
func NewChipIngressBatchEmitter(client chipingress.Client, lggr logger.Logger, cfg Config) (*ChipIngressBatchEmitter, error) {

This is purely stylistic, so feel free to ignore it: make the logger the last parameter and, after renaming the struct to ChipIngressBatchService, adjust the constructor name to match.

@thomaska thomaska requested a review from a team as a code owner March 2, 2026 11:18
services.Service
eng *services.Engine

client chipingress.Client
Author

@thomaska thomaska Mar 5, 2026

func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) {

define an interface as the client

if cfg.ChipIngressBatchEmitterEnabled {
// TODO: accept a logger from the caller instead of creating a new root logger,
// so batch emitter logs respect the node's logging configuration.
lggr, lErr := ccllogger.New()
Author

this shouldn't be done, use the node's logger

3 participants