Conversation
|
👋 thomaska, thanks for creating this pull request! To help reviewers, please consider creating future PRs as drafts first. This allows you to self-review and make any final changes before notifying the team. Once you're ready, you can mark it as "Ready for review" to request feedback. Thanks! |
There was a problem hiding this comment.
Pull request overview
This pull request replaces the per-event ChIP Ingress emission with a batched approach to reduce overhead from N gRPC calls + N Kafka transactions to 1 call + 1 transaction per flush interval. The implementation introduces a new ChipIngressBatchEmitter that buffers events per (domain, entity) pair and flushes them periodically using PublishBatch.
Changes:
- Introduced
ChipIngressBatchEmitterwith per-(domain, entity) worker goroutines for batching events - Added
chipIngressEmitterWorkerto handle batch assembly and sending with configurable timeouts - Removed goroutine wrapper from
DualSourceEmitter.Emit()since batching is now non-blocking (channel send) - Added 4 new configuration parameters with sensible defaults (BufferSize: 100, MaxBatchSize: 50, SendInterval: 500ms, SendTimeout: 10s)
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/beholder/chip_ingress_batch_emitter.go | New batch emitter with per-worker buffering and periodic flushing via PublishBatch |
| pkg/beholder/chip_ingress_emitter_worker.go | Worker implementation handling batch assembly, channel draining, and exponential backoff logging for drops |
| pkg/beholder/chip_ingress_batch_emitter_test.go | Comprehensive test coverage (10 tests) for batching, max batch size, isolation, buffer overflow, lifecycle, errors, and defaults |
| pkg/beholder/dual_source_emitter.go | Simplified Emit() by removing goroutine wrapper since ChipIngressBatchEmitter.Emit() is non-blocking |
| pkg/beholder/client.go | Updated to create and start ChipIngressBatchEmitter instead of ChipIngressEmitter; added comment about closure ordering |
| pkg/beholder/config.go | Added 4 new config fields with inline documentation and default values |
| pkg/beholder/config_test.go | Updated expected output to include new config fields |
Comments suppressed due to low confidence (2)
pkg/beholder/config.go:50
- The comment states "Zero disables batching" but the implementation in NewChipIngressBatchEmitter treats zero as "use default" and sets it to 500ms. The comment should be corrected to match the actual behavior, e.g., "Flush interval per worker (default 500ms when zero or unset)".
ChipIngressSendInterval time.Duration // Flush interval per worker (default 500ms). Zero disables batching.
pkg/beholder/client.go:248
- The messageLoggerProvider appears twice in the shutdowner slice. This will cause it to be shut down twice, which could lead to errors or undefined behavior. Remove one of the duplicate entries.
for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider} {
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
✅ API Diff Results -
|
| return nil, err | ||
| } | ||
|
|
||
| chipIngressEmitter, err := NewChipIngressEmitter(chipIngressClient) |
| // via chipingress.Client.PublishBatch on a periodic interval. | ||
| // It satisfies the Emitter interface so it can be used as a drop-in replacement | ||
| // for ChipIngressEmitter. | ||
| type ChipIngressBatchEmitter struct { |
| return e, nil | ||
| } | ||
|
|
||
| func (e *ChipIngressBatchEmitter) start(_ context.Context) error { |
There was a problem hiding this comment.
what's the role of this function if it always returns null?
There was a problem hiding this comment.
it was mostly added as a placeholder, but can be omitted as well.
And after checking, in the core/services/workflows/syncer/v2/handler.go in EventHandler it's also omitted, so. probably it's more consistent.
|
|
||
| // NewChipIngressBatchEmitter creates a batch emitter backed by the given chipingress client. | ||
| // Call Start() to begin health monitoring, and Close() to stop all workers. | ||
| func NewChipIngressBatchEmitter(client chipingress.Client, lggr logger.Logger, cfg Config) (*ChipIngressBatchEmitter, error) { |
There was a problem hiding this comment.
this is pure stylistic, and feel free to ignore it, make the logger the last param and after renaming the struct so ChipIngressBatchService, make sure to adjust the name of the constructor
| services.Service | ||
| eng *services.Engine | ||
|
|
||
| client chipingress.Client |
There was a problem hiding this comment.
func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) {
define an interface as the client
| if cfg.ChipIngressBatchEmitterEnabled { | ||
| // TODO: accept a logger from the caller instead of creating a new root logger, | ||
| // so batch emitter logs respect the node's logging configuration. | ||
| lggr, lErr := ccllogger.New() |
There was a problem hiding this comment.
this shouldn't be done, use the node's logger
Ticket: https://smartcontract-it.atlassian.net/browse/INFOPLAT-3436
Summary
Replace the per-event
ChipIngressEmitterwithChipIngressBatchEmittertobatch events sent to ChIP Ingress via
PublishBatch. Reduces N gRPC calls +N Kafka transactions to 1 call + 1 transaction per flush interval.
Batching:
PublishBatchon a configurable intervalEmit()(channel send); drops with exponential-backoff logging when buffer is fullselect/defaultto avoid blocking on concurrent readsResilience (parity with OTLP path):
PublishBatchfailure (default: 5s/30s/1m, matching OTel SDK)Observability:
events_sent,events_dropped,batch_retries,batch_failures,events_drained, all withdomain/entityattributesConfig:
ChipIngressBufferSize(default 100),ChipIngressMaxBatchSize(default 50),ChipIngressSendInterval(default 500ms),ChipIngressSendTimeout(default 10s)ChipIngressRetryConfig(default 5s/30s/1m),ChipIngressDrainTimeout(default 5s)ChipIngressBatchEmitterEnabledfeature flag (defaultfalse-- opt-in)Other:
DualSourceEmitter: removed goroutine wrapper sinceEmit()is non-blockingWhy
The old
ChipIngressEmittercalledPublishper event with no retry, no drain, and no buffering. Under load, this created unnecessary gRPC/Kafka overhead and silently lost events on any transient failure or shutdown.PublishBatchamortizes the cost and the added retry/drain mechanisms bring chip-ingress delivery resilience to parity with the existing OTLP path.Supports
smartcontractkit/chainlink#21327