Skip to content

feat(engine): maxConcurrency backpressure semaphore#19

Open
albertgwo wants to merge 2 commits intofeat/engine-pr2-engine-lifecyclefrom
feat/engine-pr7-backpressure
Open

feat(engine): maxConcurrency backpressure semaphore#19
albertgwo wants to merge 2 commits intofeat/engine-pr2-engine-lifecyclefrom
feat/engine-pr7-backpressure

Conversation

@albertgwo
Copy link
Copy Markdown
Contributor

@albertgwo albertgwo commented Mar 19, 2026

User description

Summary

  • Add counting semaphore for limiting concurrent node executions
  • Configurable `maxConcurrency` option on engine
  • Backpressure automatically queues excess work
  • Prevents resource exhaustion in high-parallelism blueprints

Dependency

Base: `feat/engine-pr2-engine-lifecycle` (branches from engine core, parallel to PR 3-6)

PR 11 of 16 in the execution engine implementation.

Test plan

  • Semaphore limits concurrent executions to configured max
  • Queued work executes when slots become available
  • Semaphore correctly handles errors (releases slot on failure)
  • No deadlocks under concurrent load

Generated description

Below is a concise technical summary of the changes proposed in this PR:
Add control by introducing Semaphore and honoring EngineOptions.maxConcurrency inside CompiledFlow so engine executions stay within configured concurrency limits while still releasing slots on errors. Cover the semaphore and FlowprintEngine execution path with unit and integration tests to prove backpressure queues excess work instead of overwhelming resources.

TopicDetails
Integration tests Validate semaphore semantics and FlowprintEngine backpressure using deferred barriers to prove FIFO queuing and slot freeing behavior under load.
Modified files (1)
  • packages/engine/src/__tests__/engine/semaphore.test.ts
Latest Contributors(0)
UserCommitDate
Concurrency cap Enforce maxConcurrency via Semaphore in CompiledFlow, including acquiring/releasing slots and exposing the option through EngineOptions, so execute() calls queue instead of overloading resources.
Modified files (4)
  • packages/engine/src/engine/compiled-flow.ts
  • packages/engine/src/engine/index.ts
  • packages/engine/src/engine/semaphore.ts
  • packages/engine/src/engine/types.ts
Latest Contributors(0)
UserCommitDate
This pull request is reviewed by Baz. Review like a pro on (Baz).

Add an optional maxConcurrency option to EngineOptions that limits
concurrent execute() calls on a CompiledFlow via a counting semaphore.
When all slots are taken, subsequent callers wait until a slot frees up.
Zero overhead when omitted (no semaphore created).
Unit tests for the Semaphore class (acquire/release, FIFO ordering,
blocking beyond max). Integration tests verifying backpressure behavior:
unlimited default, maxConcurrency limiting concurrent execute() calls,
and slot release allowing queued executions to proceed.
Comment on lines 1 to +3
export { FlowprintEngine } from './engine.js'
export { CompiledFlow } from './compiled-flow.js'
export { Semaphore } from './semaphore.js'
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ruminaider/flowprint-engine was changed without a .changeset entry, should we add a Changeset to bump its version and review CLAUDE.md?

Finding type: AI Coding Guidelines | Severity: 🟢 Low


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In packages/engine/src/engine/index.ts around lines 1-3 the new export `Semaphore` was
added which changes the public API of the @ruminaider/flowprint-engine package. Add a
new file in packages/engine/.changeset (e.g., add-semaphore.md) that names the changed
package (@ruminaider/flowprint-engine) and specifies a version bump (patch if
backwards-compatible, minor if you consider this a feature), with a short description
like "Export Semaphore from engine" so the release tooling will bump the package and
publish it. Also confirm CLAUDE.md lines 89-91 are satisfied and that the
repository-level .changeset config will pick up the new file.

Heads up!

Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription

Comment on lines +35 to +36
if (options.maxConcurrency != null) {
this.semaphore = new Semaphore(options.maxConcurrency)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

options.maxConcurrency is passed into new Semaphore without validation — should we validate or clamp it to a positive integer before constructing the Semaphore and add defensive checks in the Semaphore constructor?

Finding type: Logical Bugs | Severity: 🔴 High


Want Baz to fix this for you? Activate Fixer

Other fix methods

Fix in Cursor

Prompt for AI Agents:

In packages/engine/src/engine/compiled-flow.ts around lines 35 to 36, the constructor
currently does new Semaphore(options.maxConcurrency) without validating the value.
Change this to validate that options.maxConcurrency is a positive integer before
constructing the Semaphore: if it is null/undefined leave semaphore undefined
(unlimited), if it is a positive integer create the semaphore, and if it is
zero/negative or non-integer either clamp to a minimum of 1 or throw a clear
configuration error (prefer throwing to avoid surprising behavior). Also add a short
defensive check in packages/engine/src/engine/semaphore.ts constructor to assert max is
a positive integer and throw on invalid input so invalid configs cannot silently
deadlock the engine.

Heads up!

Your free trial ends tomorrow.
To keep getting your PRs reviewed by Baz, update your team's subscription

@cloudflare-workers-and-pages
Copy link
Copy Markdown

Deploying flowprint with  Cloudflare Pages  Cloudflare Pages

Latest commit: 0878b15
Status: ✅  Deploy successful!
Preview URL: https://04e75f8a.flowprint.pages.dev
Branch Preview URL: https://feat-engine-pr7-backpressure.flowprint.pages.dev

View logs

@albertgwo
Copy link
Copy Markdown
Contributor Author

Code Review Findings

Critical (90-100 confidence)

1. No input validation on maxConcurrency0 or negative causes permanent deadlock (95)

  • File: packages/engine/src/engine/semaphore.ts, constructor
  • maxConcurrency: 0 makes this.current < this.max always false — every acquire() queues forever. Negative values same. Non-integers like 2.5 produce unpredictable slot accounting.
  • Fix:
constructor(private readonly max: number) {
  if (!Number.isInteger(max) || max < 1) {
    throw new Error(`Semaphore max must be a positive integer, got ${max}`)
  }
}

2. Semaphore.release() can drive current below zero (92)

  • File: packages/engine/src/engine/semaphore.ts, release() method
  • Extra release() calls make current negative, silently allowing more concurrent slots than max.
  • Fix: Add guard: if (this.current <= 0) throw new Error('release without matching acquire')

Important (80-89 confidence)

3. Missing test: slot release on handler error (88)

  • File: packages/engine/src/__tests__/engine/semaphore.test.ts
  • PR test plan lists "handles errors (releases slot on failure)" but no test covers it. Most important backpressure behavior to verify.

4. Missing changeset file (85)

  • Adds Semaphore and maxConcurrency option to public API.

5. Semaphore exported as public API — questionable surface (82)

  • File: packages/engine/src/engine/index.ts
  • Internal implementation detail of CompiledFlow. Increases semver surface. No documented external use case.
  • Fix: Remove from public exports or add JSDoc + validation first.

6. release() in finally could fire without matching acquire() (80)

  • If acquire() were enhanced with cancellation/timeouts and could throw, release() in finally would corrupt state.
  • Fix: Guard with let acquired = false flag pattern.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant