diff --git a/apps/server/src/git/Layers/GitCore.test.ts b/apps/server/src/git/Layers/GitCore.test.ts index 635b9e8bc..dc97b9364 100644 --- a/apps/server/src/git/Layers/GitCore.test.ts +++ b/apps/server/src/git/Layers/GitCore.test.ts @@ -415,6 +415,9 @@ it.layer(TestLayer)("git integration", (it) => { it.effect("refreshes upstream behind count after checkout when remote branch advanced", () => Effect.gen(function* () { + const services = yield* Effect.services(); + const runPromise = Effect.runPromiseWith(services); + const remote = yield* makeTmpDir(); const source = yield* makeTmpDir(); const clone = yield* makeTmpDir(); @@ -449,7 +452,7 @@ it.layer(TestLayer)("git integration", (it) => { const core = yield* GitCore; yield* Effect.promise(() => vi.waitFor(async () => { - const details = await Effect.runPromise(core.statusDetails(source)); + const details = await runPromise(core.statusDetails(source)); expect(details.branch).toBe(featureBranch); expect(details.aheadCount).toBe(0); expect(details.behindCount).toBe(1); diff --git a/apps/server/src/main.test.ts b/apps/server/src/main.test.ts index c644b4778..0279e78f7 100644 --- a/apps/server/src/main.test.ts +++ b/apps/server/src/main.test.ts @@ -368,7 +368,7 @@ it.layer(testLayer)("server CLI command", (it) => { it.effect("does not start server for invalid --mode values", () => Effect.gen(function* () { - yield* runCli(["--mode", "invalid"]); + yield* runCli(["--mode", "invalid"]).pipe(Effect.catch(() => Effect.void)); assert.equal(start.mock.calls.length, 0); assert.equal(stop.mock.calls.length, 0); @@ -386,7 +386,7 @@ it.layer(testLayer)("server CLI command", (it) => { it.effect("does not start server for out-of-range --port values", () => Effect.gen(function* () { - yield* runCli(["--port", "70000"]); + yield* runCli(["--port", "70000"]).pipe(Effect.catch(() => Effect.void)); // effect/unstable/cli renders help/errors for parse failures and returns success. assert.equal(start.mock.calls.length, 0); diff --git a/apps/server/src/persistence/NodeSqliteClient.ts b/apps/server/src/persistence/NodeSqliteClient.ts index 1d6e22d9b..5577ac5b0 100644 --- a/apps/server/src/persistence/NodeSqliteClient.ts +++ b/apps/server/src/persistence/NodeSqliteClient.ts @@ -20,7 +20,7 @@ import * as Stream from "effect/Stream"; import * as Reactivity from "effect/unstable/reactivity/Reactivity"; import * as Client from "effect/unstable/sql/SqlClient"; import type { Connection } from "effect/unstable/sql/SqlConnection"; -import { SqlError } from "effect/unstable/sql/SqlError"; +import { SqlError, classifySqliteError } from "effect/unstable/sql/SqlError"; import * as Statement from "effect/unstable/sql/Statement"; const ATTR_DB_SYSTEM_NAME = "db.system.name"; @@ -29,6 +29,9 @@ export const TypeId: TypeId = "~local/sqlite-node/SqliteClient"; export type TypeId = "~local/sqlite-node/SqliteClient"; +const classifyError = (cause: unknown, message: string, operation: string) => + classifySqliteError(cause, { message, operation }); + /** * SqliteClient - Effect service tag for the sqlite SQL client. */ @@ -109,7 +112,10 @@ const makeWithDatabase = ( lookup: (sql: string) => Effect.try({ try: () => db.prepare(sql), - catch: (cause) => new SqlError({ cause, message: "Failed to prepare statement" }), + catch: (cause) => + new SqlError({ + reason: classifyError(cause, "Failed to prepare statement", "prepare"), + }), }), }); @@ -127,7 +133,11 @@ const makeWithDatabase = ( const result = statement.run(...(params as any)); return Effect.succeed(raw ? (result as unknown as ReadonlyArray) : []); } catch (cause) { - return Effect.fail(new SqlError({ cause, message: "Failed to execute statement" })); + return Effect.fail( + new SqlError({ + reason: classifyError(cause, "Failed to execute statement", "execute"), + }), + ); } }); @@ -150,7 +160,10 @@ const makeWithDatabase = ( statement.run(...(params as any)); return []; }, - catch: (cause) => new SqlError({ cause, message: "Failed to execute statement" }), + catch: (cause) => + new SqlError({ + reason: classifyError(cause, "Failed to execute statement", "execute"), + }), }), (statement) => Effect.sync(() => { diff --git a/apps/server/src/provider/Layers/ClaudeAdapter.test.ts b/apps/server/src/provider/Layers/ClaudeAdapter.test.ts index acb82709f..d064a8239 100644 --- a/apps/server/src/provider/Layers/ClaudeAdapter.test.ts +++ b/apps/server/src/provider/Layers/ClaudeAdapter.test.ts @@ -1100,10 +1100,13 @@ describe("ClaudeAdapterLive", () => { it.effect("closes the session when the Claude stream aborts after a turn starts", () => { const harness = makeHarness(); return Effect.gen(function* () { + const services = yield* Effect.services(); + const runFork = Effect.runForkWith(services); + const adapter = yield* ClaudeAdapter; const runtimeEvents: Array = []; - const runtimeEventsFiber = Effect.runFork( + const runtimeEventsFiber = runFork( Stream.runForEach(adapter.streamEvents, (event) => Effect.sync(() => { runtimeEvents.push(event); @@ -1197,9 +1200,12 @@ describe("ClaudeAdapterLive", () => { ); return Effect.gen(function* () { + const services = yield* Effect.services(); + const runFork = Effect.runForkWith(services); + const adapter = yield* ClaudeAdapter; - const runtimeEventsFiber = Effect.runFork( + const runtimeEventsFiber = runFork( Stream.runForEach(adapter.streamEvents, () => Effect.void), ); diff --git a/apps/server/src/provider/Layers/ClaudeAdapter.ts b/apps/server/src/provider/Layers/ClaudeAdapter.ts index d6ff446ad..b0f080118 100644 --- a/apps/server/src/provider/Layers/ClaudeAdapter.ts +++ b/apps/server/src/provider/Layers/ClaudeAdapter.ts @@ -2380,6 +2380,10 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( existingResumeSessionId === undefined ? yield* Random.nextUUIDv4 : undefined; const sessionId = existingResumeSessionId ?? newSessionId; + const services = yield* Effect.services(); + const runFork = Effect.runForkWith(services); + const runPromise = Effect.runPromiseWith(services); + const promptQueue = yield* Queue.unbounded(); const prompt = Stream.fromQueue(promptQueue).pipe( Stream.filter((item) => item.type === "message"), @@ -2461,7 +2465,7 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( } aborted = true; pendingUserInputs.delete(requestId); - Effect.runFork(Deferred.succeed(answersDeferred, {} as ProviderUserInputAnswers)); + runFork(Deferred.succeed(answersDeferred, {} as ProviderUserInputAnswers)); }; callbackOptions.signal.addEventListener("abort", onAbort, { once: true }); @@ -2607,7 +2611,7 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( return; } pendingApprovals.delete(requestId); - Effect.runFork(Deferred.succeed(decisionDeferred, "cancel")); + runFork(Deferred.succeed(decisionDeferred, "cancel")); }; callbackOptions.signal.addEventListener("abort", onAbort, { @@ -2662,7 +2666,7 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( }); const canUseTool: CanUseTool = (toolName, toolInput, callbackOptions) => - Effect.runPromise(canUseToolEffect(toolName, toolInput, callbackOptions)); + runPromise(canUseToolEffect(toolName, toolInput, callbackOptions)); const claudeSettings = yield* serverSettingsService.getSettings.pipe( Effect.map((settings) => settings.providers.claudeAgent), @@ -2813,7 +2817,7 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( providerRefs: {}, }); - const streamFiber = Effect.runFork(runSdkStream(context)); + const streamFiber = runFork(runSdkStream(context)); context.streamFiber = streamFiber; streamFiber.addObserver((exit) => { if (context.stopped) { @@ -2822,7 +2826,7 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( if (context.streamFiber === streamFiber) { context.streamFiber = undefined; } - Effect.runFork(handleStreamExit(context, exit)); + runFork(handleStreamExit(context, exit)); }); return { diff --git a/apps/server/src/wsServer.ts b/apps/server/src/wsServer.ts index a4f6f987b..7290660cf 100644 --- a/apps/server/src/wsServer.ts +++ b/apps/server/src/wsServer.ts @@ -252,6 +252,11 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< } = serverConfig; const availableEditors = resolveAvailableEditors(); + const runtimeServices = yield* Effect.services< + ServerRuntimeServices | ServerConfig | FileSystem.FileSystem | Path.Path + >(); + const runPromise = Effect.runPromiseWith(runtimeServices); + const gitManager = yield* GitManager; const terminalManager = yield* TerminalManager; const keybindingsManager = yield* Keybindings; @@ -429,7 +434,7 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< res.end(body); }; - void Effect.runPromise( + void runPromise( Effect.gen(function* () { const url = new URL(req.url ?? "/", `http://localhost:${port}`); if (tryHandleProjectFaviconRequest(url, res)) { @@ -713,13 +718,8 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< ); } - const runtimeServices = yield* Effect.services< - ServerRuntimeServices | ServerConfig | FileSystem.FileSystem | Path.Path - >(); - const runPromise = Effect.runPromiseWith(runtimeServices); - - const unsubscribeTerminalEvents = yield* terminalManager.subscribe( - (event) => void Effect.runPromise(pushBus.publishAll(WS_CHANNELS.terminalEvent, event)), + const unsubscribeTerminalEvents = yield* terminalManager.subscribe((event) => + runPromise(pushBus.publishAll(WS_CHANNELS.terminalEvent, event)), ); yield* Effect.addFinalizer(() => Effect.sync(() => unsubscribeTerminalEvents())); yield* readiness.markTerminalSubscriptionsReady; diff --git a/bun.lock b/bun.lock index 857d3a83c..4c640ba3c 100644 --- a/bun.lock +++ b/bun.lock @@ -173,13 +173,13 @@ "vite": "^8.0.0", }, "catalog": { - "@effect/language-service": "0.75.1", - "@effect/platform-node": "https://pkg.pr.new/Effect-TS/effect-smol/@effect/platform-node@8881a9b", - "@effect/sql-sqlite-bun": "https://pkg.pr.new/Effect-TS/effect-smol/@effect/sql-sqlite-bun@8881a9b", - "@effect/vitest": "https://pkg.pr.new/Effect-TS/effect-smol/@effect/vitest@8881a9b", + "@effect/language-service": "0.84.1", + "@effect/platform-node": "4.0.0-beta.42", + "@effect/sql-sqlite-bun": "4.0.0-beta.42", + "@effect/vitest": "4.0.0-beta.42", "@types/bun": "^1.3.9", "@types/node": "^24.10.13", - "effect": "https://pkg.pr.new/Effect-TS/effect-smol/effect@8881a9b", + "effect": "4.0.0-beta.42", "tsdown": "^0.20.3", "typescript": "^5.7.3", "vitest": "^4.0.0", @@ -265,15 +265,15 @@ "@dnd-kit/utilities": ["@dnd-kit/utilities@3.2.2", "", { "dependencies": { "tslib": "^2.0.0" }, "peerDependencies": { "react": ">=16.8.0" } }, "sha512-+MKAJEOfaBe5SmV6t34p80MMKhjvUz0vRrvVJbPT0WElzaOJ/1xs+D+KDv+tD/NE5ujfrChEcshd4fLn0wpiqg=="], - "@effect/language-service": ["@effect/language-service@0.75.1", "", { "bin": { "effect-language-service": "cli.js" } }, "sha512-g9xD2tAQgRFpYC2YgpZq02VeSL5fBbFJ0B/g1o+14NuNmwtaYJc7SjiLWAA9eyhJHosNrn6h1Ye+Kx6j5mN0AA=="], + "@effect/language-service": ["@effect/language-service@0.84.1", "", { "bin": { "effect-language-service": "cli.js" } }, "sha512-YUqjJU24HeYgPV453cR2fDqkZ+zZKMuxGnmxWAPscWJ6gt6FB7JZohMCOczRTIOGPrQMcloJX7BjCaPu+RNhpw=="], - "@effect/platform-node": ["@effect/platform-node@https://pkg.pr.new/Effect-TS/effect-smol/@effect/platform-node@8881a9b", { "dependencies": { "@effect/platform-node-shared": "https://pkg.pr.new/Effect-TS/effect-smol/@effect/platform-node-shared@8881a9b606d84a6f5eb6615279138322984f5368", "mime": "^4.1.0", "undici": "^7.20.0" }, "peerDependencies": { "effect": "^4.0.0-beta.25", "ioredis": "^5.7.0" } }], + "@effect/platform-node": ["@effect/platform-node@4.0.0-beta.42", "", { "dependencies": { "@effect/platform-node-shared": "^4.0.0-beta.42", "mime": "^4.1.0", "undici": "^7.24.0" }, "peerDependencies": { "effect": "^4.0.0-beta.42", "ioredis": "^5.7.0" } }, "sha512-kbdRML2FBa4q8U8rZQcnmLKZ5zN/z1bAA7t5D1/UsBHZqJgnfRgu1CP6kaEfb1Nie6YyaWshxTktZQryjvW/Yg=="], - "@effect/platform-node-shared": ["@effect/platform-node-shared@https://pkg.pr.new/Effect-TS/effect-smol/@effect/platform-node-shared@8881a9b606d84a6f5eb6615279138322984f5368", { "dependencies": { "@types/ws": "^8.18.1", "ws": "^8.19.0" }, "peerDependencies": { "effect": "^4.0.0-beta.25" } }], + "@effect/platform-node-shared": ["@effect/platform-node-shared@4.0.0-beta.42", "", { "dependencies": { "@types/ws": "^8.18.1", "ws": "^8.19.0" }, "peerDependencies": { "effect": "^4.0.0-beta.42" } }, "sha512-PC+lxLsrwob3+nBChAPrQq32olCeyApgXBvs1NrRsoArLViNT76T/68CttuCAksCZj5e1bZ1ZibLPel3vUmx2g=="], - "@effect/sql-sqlite-bun": ["@effect/sql-sqlite-bun@https://pkg.pr.new/Effect-TS/effect-smol/@effect/sql-sqlite-bun@8881a9b", { "peerDependencies": { "effect": "^4.0.0-beta.25" } }], + "@effect/sql-sqlite-bun": ["@effect/sql-sqlite-bun@4.0.0-beta.42", "", { "peerDependencies": { "effect": "^4.0.0-beta.42" } }, "sha512-Ah2QfkeV+I9r5OBVJijSDnFXCv51giBXngSwhju5gefc0uWiM3G1tsYAqrNX24HlvFFEnOAZqNf/Sq1h4NqOAA=="], - "@effect/vitest": ["@effect/vitest@https://pkg.pr.new/Effect-TS/effect-smol/@effect/vitest@8881a9b", { "peerDependencies": { "effect": "^4.0.0-beta.25", "vitest": "^3.0.0 || ^4.0.0" } }], + "@effect/vitest": ["@effect/vitest@4.0.0-beta.42", "", { "peerDependencies": { "effect": "^4.0.0-beta.42", "vitest": "^3.0.0 || ^4.0.0" } }, "sha512-/11arjUnCRhIrBRvOn/nrbg5p/FadjAPvStddZlpl1VrCxtB2s0n39cbG9uTyDdf1ZrRBG73Upo1ZDF1CTWy8w=="], "@electron/get": ["@electron/get@2.0.3", "", { "dependencies": { "debug": "^4.1.1", "env-paths": "^2.2.0", "fs-extra": "^8.1.0", "got": "^11.8.5", "progress": "^2.0.3", "semver": "^6.2.0", "sumchecker": "^3.0.1" }, "optionalDependencies": { "global-agent": "^3.0.0" } }, "sha512-Qkzpg2s9GnVV2I2BjRksUi43U5e6+zaQMcjoJy0C+C5oxaKl+fmckGDQFtRpZpZV0NQekuZZ+tGz7EA9TVnQtQ=="], @@ -1017,7 +1017,7 @@ "dunder-proto": ["dunder-proto@1.0.1", "", { "dependencies": { "call-bind-apply-helpers": "^1.0.1", "es-errors": "^1.3.0", "gopd": "^1.2.0" } }, "sha512-KIN/nDJBQRcXw0MLVhZE9iQHmG68qAVIBg9CqmUYjmQIhgij9U5MFvrqkUL5FbtyyzZuOeOt0zdeRe4UY7ct+A=="], - "effect": ["effect@https://pkg.pr.new/Effect-TS/effect-smol/effect@8881a9b", { "dependencies": { "@standard-schema/spec": "^1.1.0", "fast-check": "^4.5.3", "find-my-way-ts": "^0.1.6", "ini": "^6.0.0", "kubernetes-types": "^1.30.0", "msgpackr": "^1.11.8", "multipasta": "^0.2.7", "toml": "^3.0.0", "uuid": "^13.0.0", "yaml": "^2.8.2" } }], + "effect": ["effect@4.0.0-beta.42", "", { "dependencies": { "@standard-schema/spec": "^1.1.0", "fast-check": "^4.5.3", "find-my-way-ts": "^0.1.6", "ini": "^6.0.0", "kubernetes-types": "^1.30.0", "msgpackr": "^1.11.8", "multipasta": "^0.2.7", "toml": "^3.0.0", "uuid": "^13.0.0", "yaml": "^2.8.2" } }, "sha512-c1UrRP+tLzyHb4Fepl8XBDJlLQLkrcMXrRBba441GQRxMbeQ/aIOSFcBwSda1iMJ5l9F0lYc3Bhe33/whrmavQ=="], "electron": ["electron@40.6.0", "", { "dependencies": { "@electron/get": "^2.0.0", "@types/node": "^24.9.0", "extract-zip": "^2.0.1" }, "bin": { "electron": "cli.js" } }, "sha512-ett8W+yOFGDuM0vhJMamYSkrbV3LoaffzJd9GfjI96zRAxyrNqUSKqBpf/WGbQCweDxX2pkUCUfrv4wwKpsFZA=="], @@ -1905,14 +1905,6 @@ "@babel/types/@babel/helper-validator-identifier": ["@babel/helper-validator-identifier@7.28.5", "", {}, "sha512-qSs4ifwzKJSV39ucNjsvc6WVHs6b7S03sOh2OcHF9UHfVPqWWALUsNUVzhSBiItjRZoLHx7nIarVjqKVusUZ1Q=="], - "@effect/platform-node/effect": ["effect@4.0.0-beta.33", "", { "dependencies": { "@standard-schema/spec": "^1.1.0", "fast-check": "^4.5.3", "find-my-way-ts": "^0.1.6", "ini": "^6.0.0", "kubernetes-types": "^1.30.0", "msgpackr": "^1.11.8", "multipasta": "^0.2.7", "toml": "^3.0.0", "uuid": "^13.0.0", "yaml": "^2.8.2" } }, "sha512-ln9emWPd1SemokSdOV43r2CbH1j8GTe9qbPvttmh9/j2OR0WNmj7UpjbN34llQgF9QV4IdcN6QdV2w8G7B7RyQ=="], - - "@effect/platform-node-shared/effect": ["effect@4.0.0-beta.33", "", { "dependencies": { "@standard-schema/spec": "^1.1.0", "fast-check": "^4.5.3", "find-my-way-ts": "^0.1.6", "ini": "^6.0.0", "kubernetes-types": "^1.30.0", "msgpackr": "^1.11.8", "multipasta": "^0.2.7", "toml": "^3.0.0", "uuid": "^13.0.0", "yaml": "^2.8.2" } }, "sha512-ln9emWPd1SemokSdOV43r2CbH1j8GTe9qbPvttmh9/j2OR0WNmj7UpjbN34llQgF9QV4IdcN6QdV2w8G7B7RyQ=="], - - "@effect/sql-sqlite-bun/effect": ["effect@4.0.0-beta.33", "", { "dependencies": { "@standard-schema/spec": "^1.1.0", "fast-check": "^4.5.3", "find-my-way-ts": "^0.1.6", "ini": "^6.0.0", "kubernetes-types": "^1.30.0", "msgpackr": "^1.11.8", "multipasta": "^0.2.7", "toml": "^3.0.0", "uuid": "^13.0.0", "yaml": "^2.8.2" } }, "sha512-ln9emWPd1SemokSdOV43r2CbH1j8GTe9qbPvttmh9/j2OR0WNmj7UpjbN34llQgF9QV4IdcN6QdV2w8G7B7RyQ=="], - - "@effect/vitest/effect": ["effect@4.0.0-beta.33", "", { "dependencies": { "@standard-schema/spec": "^1.1.0", "fast-check": "^4.5.3", "find-my-way-ts": "^0.1.6", "ini": "^6.0.0", "kubernetes-types": "^1.30.0", "msgpackr": "^1.11.8", "multipasta": "^0.2.7", "toml": "^3.0.0", "uuid": "^13.0.0", "yaml": "^2.8.2" } }, "sha512-ln9emWPd1SemokSdOV43r2CbH1j8GTe9qbPvttmh9/j2OR0WNmj7UpjbN34llQgF9QV4IdcN6QdV2w8G7B7RyQ=="], - "@electron/get/fs-extra": ["fs-extra@8.1.0", "", { "dependencies": { "graceful-fs": "^4.2.0", "jsonfile": "^4.0.0", "universalify": "^0.1.0" } }, "sha512-yhlQgA6mnOJUKOsRUFsgJdQCvkKhcz8tlZG5HBQfReYZy46OwLcY+Zia0mtdHsOo9y/hP+CxMN0TU9QxoOtG4g=="], "@electron/get/semver": ["semver@6.3.1", "", { "bin": { "semver": "bin/semver.js" } }, "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA=="], diff --git a/package.json b/package.json index 02e71cf09..a90677996 100644 --- a/package.json +++ b/package.json @@ -8,11 +8,11 @@ "scripts" ], "catalog": { - "effect": "https://pkg.pr.new/Effect-TS/effect-smol/effect@8881a9b", - "@effect/platform-node": "https://pkg.pr.new/Effect-TS/effect-smol/@effect/platform-node@8881a9b", - "@effect/sql-sqlite-bun": "https://pkg.pr.new/Effect-TS/effect-smol/@effect/sql-sqlite-bun@8881a9b", - "@effect/vitest": "https://pkg.pr.new/Effect-TS/effect-smol/@effect/vitest@8881a9b", - "@effect/language-service": "0.75.1", + "effect": "4.0.0-beta.42", + "@effect/platform-node": "4.0.0-beta.42", + "@effect/sql-sqlite-bun": "4.0.0-beta.42", + "@effect/vitest": "4.0.0-beta.42", + "@effect/language-service": "0.84.1", "@types/bun": "^1.3.9", "@types/node": "^24.10.13", "tsdown": "^0.20.3", diff --git a/packages/shared/src/DrainableWorker.ts b/packages/shared/src/DrainableWorker.ts index 6b9ede9f4..7eb311ca5 100644 --- a/packages/shared/src/DrainableWorker.ts +++ b/packages/shared/src/DrainableWorker.ts @@ -1,14 +1,12 @@ /** - * DrainableWorker - A queue-based worker that exposes a `drain()` effect. + * DrainableWorker - A queue-based worker with deterministic `drain()`. * - * Wraps the common `Queue.unbounded` + `Effect.forever` pattern and adds - * a signal that resolves when the queue is empty **and** the current item - * has finished processing. This lets tests replace timing-sensitive - * `Effect.sleep` calls with deterministic `drain()`. + * Tracks outstanding work in STM so `drain()` resolves only when no items + * are queued or in flight. Useful in tests instead of timing-based waits. * * @module DrainableWorker */ -import { Deferred, Effect, Queue, Ref } from "effect"; +import { Effect, TxQueue, TxRef } from "effect"; import type { Scope } from "effect"; export interface DrainableWorker { @@ -39,63 +37,40 @@ export const makeDrainableWorker = ( process: (item: A) => Effect.Effect, ): Effect.Effect, never, Scope.Scope | R> => Effect.gen(function* () { - const queue = yield* Queue.unbounded(); - const initialIdle = yield* Deferred.make(); - yield* Deferred.succeed(initialIdle, undefined).pipe(Effect.orDie); - const state = yield* Ref.make({ - outstanding: 0, - idle: initialIdle, - }); + const ref = yield* TxRef.make(0); - yield* Effect.addFinalizer(() => Queue.shutdown(queue).pipe(Effect.asVoid)); + const queue = yield* Effect.acquireRelease(TxQueue.unbounded(), (queue) => + TxQueue.shutdown(queue), + ); - const finishOne = Ref.modify(state, (current) => { - const remaining = Math.max(0, current.outstanding - 1); - return [ - remaining === 0 ? current.idle : null, - { - outstanding: remaining, - idle: current.idle, - }, - ] as const; - }).pipe( - Effect.flatMap((idle) => - idle === null ? Effect.void : Deferred.succeed(idle, undefined).pipe(Effect.orDie), - ), + const takeItem = Effect.tx( + Effect.gen(function* () { + const item = yield* TxQueue.take(queue); + yield* TxRef.update(ref, (n) => n + 1); + return item; + }), ); - yield* Effect.forkScoped( - Effect.forever( - Queue.take(queue).pipe( - Effect.flatMap((item) => process(item).pipe(Effect.ensuring(finishOne))), - ), + yield* takeItem.pipe( + Effect.flatMap((item) => + process(item).pipe(Effect.ensuring(TxRef.update(ref, (n) => n - 1))), ), + Effect.forever, + Effect.forkScoped, ); - const enqueue: DrainableWorker["enqueue"] = (item) => + const drain: DrainableWorker["drain"] = Effect.tx( Effect.gen(function* () { - const nextIdle = yield* Deferred.make(); - yield* Ref.update(state, (current) => - current.outstanding === 0 - ? { - outstanding: 1, - idle: nextIdle, - } - : { - outstanding: current.outstanding + 1, - idle: current.idle, - }, - ); - - const accepted = yield* Queue.offer(queue, item); - if (!accepted) { - yield* finishOne; + const inFlight = yield* TxRef.get(ref); + const isEmpty = yield* TxQueue.isEmpty(queue); + if (inFlight > 0 || !isEmpty) { + return yield* Effect.txRetry; } - }); - - const drain: DrainableWorker["drain"] = Ref.get(state).pipe( - Effect.flatMap(({ idle }) => Deferred.await(idle)), + }), ); - return { enqueue, drain } satisfies DrainableWorker; + return { + enqueue: (item) => TxQueue.offer(queue, item), + drain, + } satisfies DrainableWorker; });