feat(engine): maxConcurrency backpressure semaphore#19
feat(engine): maxConcurrency backpressure semaphore#19albertgwo wants to merge 2 commits intofeat/engine-pr2-engine-lifecyclefrom
Conversation
Add an optional maxConcurrency option to EngineOptions that limits concurrent execute() calls on a CompiledFlow via a counting semaphore. When all slots are taken, subsequent callers wait until a slot frees up. Zero overhead when omitted (no semaphore created).
Unit tests for the Semaphore class (acquire/release, FIFO ordering, blocking beyond max). Integration tests verifying backpressure behavior: unlimited default, maxConcurrency limiting concurrent execute() calls, and slot release allowing queued executions to proceed.
| export { FlowprintEngine } from './engine.js' | ||
| export { CompiledFlow } from './compiled-flow.js' | ||
| export { Semaphore } from './semaphore.js' |
There was a problem hiding this comment.
@ruminaider/flowprint-engine was changed without a .changeset entry, should we add a Changeset to bump its version and review CLAUDE.md?
Finding type: AI Coding Guidelines | Severity: 🟢 Low
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
In packages/engine/src/engine/index.ts around lines 1-3 the new export `Semaphore` was
added which changes the public API of the @ruminaider/flowprint-engine package. Add a
new file in packages/engine/.changeset (e.g., add-semaphore.md) that names the changed
package (@ruminaider/flowprint-engine) and specifies a version bump (patch if
backwards-compatible, minor if you consider this a feature), with a short description
like "Export Semaphore from engine" so the release tooling will bump the package and
publish it. Also confirm CLAUDE.md lines 89-91 are satisfied and that the
repository-level .changeset config will pick up the new file.
Heads up!
Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription
| if (options.maxConcurrency != null) { | ||
| this.semaphore = new Semaphore(options.maxConcurrency) |
There was a problem hiding this comment.
options.maxConcurrency is passed into new Semaphore without validation — should we validate or clamp it to a positive integer before constructing the Semaphore and add defensive checks in the Semaphore constructor?
Finding type: Logical Bugs | Severity: 🔴 High
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
In packages/engine/src/engine/compiled-flow.ts around lines 35 to 36, the constructor
currently does new Semaphore(options.maxConcurrency) without validating the value.
Change this to validate that options.maxConcurrency is a positive integer before
constructing the Semaphore: if it is null/undefined leave semaphore undefined
(unlimited), if it is a positive integer create the semaphore, and if it is
zero/negative or non-integer either clamp to a minimum of 1 or throw a clear
configuration error (prefer throwing to avoid surprising behavior). Also add a short
defensive check in packages/engine/src/engine/semaphore.ts constructor to assert max is
a positive integer and throw on invalid input so invalid configs cannot silently
deadlock the engine.
Heads up!
Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription
Deploying flowprint with
|
| Latest commit: |
0878b15
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://04e75f8a.flowprint.pages.dev |
| Branch Preview URL: | https://feat-engine-pr7-backpressure.flowprint.pages.dev |
Code Review FindingsCritical (90-100 confidence)1. No input validation on
constructor(private readonly max: number) {
if (!Number.isInteger(max) || max < 1) {
throw new Error(`Semaphore max must be a positive integer, got ${max}`)
}
}2.
Important (80-89 confidence)3. Missing test: slot release on handler error (88)
4. Missing changeset file (85)
5.
6.
|
User description
Summary
Dependency
Base: `feat/engine-pr2-engine-lifecycle` (branches from engine core, parallel to PR 3-6)
PR 11 of 16 in the execution engine implementation.
Test plan
Generated description
Below is a concise technical summary of the changes proposed in this PR:
Add control by introducing
Semaphoreand honoringEngineOptions.maxConcurrencyinsideCompiledFlowso engine executions stay within configured concurrency limits while still releasing slots on errors. Cover the semaphore andFlowprintEngineexecution path with unit and integration tests to prove backpressure queues excess work instead of overwhelming resources.FlowprintEnginebackpressure using deferred barriers to prove FIFO queuing and slot freeing behavior under load.Modified files (1)
Latest Contributors(0)
maxConcurrencyviaSemaphoreinCompiledFlow, including acquiring/releasing slots and exposing the option throughEngineOptions, so execute() calls queue instead of overloading resources.Modified files (4)
Latest Contributors(0)