From b724ca83d8265d001df83415736ea3f75512e117 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Tue, 12 May 2026 17:58:15 -0700 Subject: [PATCH 01/17] fix(container-runtime): block local submits during stashed-op apply PendingStateManager now exposes a one-way apply lifecycle that opens eagerly when stashed state is present and closes when initialMessages drains. ContainerRuntime aggregates this into isReadOnly() and fans it out via notifyReadOnlyState so compliant DDSes skip submits during stashed-op replay. If a DDS bypasses the readonly gate and submits anyway, submit() throws a fatal UsageError at the call site rather than letting a poison entry race the saved-op replay and mismatch. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../container-runtime/src/containerRuntime.ts | 68 ++++++- .../src/pendingStateManager.ts | 166 +++++++++++++----- .../src/test/containerRuntime.spec.ts | 42 +++++ .../src/test/pendingStateManager.spec.ts | 128 ++++++++++++-- 4 files changed, 343 insertions(+), 61 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 9dec4133dc03..124e7766ea47 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -1360,7 +1360,9 @@ export class ContainerRuntime return this._getAttachState(); } - public readonly isReadOnly = (): boolean => this.deltaManager.readOnlyInfo.readonly === true; + public readonly isReadOnly = (): boolean => + this.deltaManager.readOnlyInfo.readonly === true || + this.pendingStateManager.isApplyingStashedOps; /** * Current session schema - defines what options are on & off. @@ -1597,6 +1599,8 @@ export class ContainerRuntime private readonly extensions = new Map(); + public readonly baseLogger: ITelemetryBaseLogger; + /***/ protected constructor( context: IContainerContext, @@ -1610,7 +1614,7 @@ export class ContainerRuntime private readonly runtimeOptions: Readonly, private readonly containerScope: FluidObject, // Create a custom ITelemetryBaseLogger to output telemetry events. - public readonly baseLogger: ITelemetryBaseLogger, + logger: ITelemetryBaseLogger, existing: boolean, blobManagerLoadInfo: IBlobManagerLoadInfo, @@ -1663,15 +1667,20 @@ export class ContainerRuntime this.isSnapshotInstanceOfISnapshot = snapshotWithContents !== undefined; - this.mc = createChildMonitoringContext({ - logger: this.baseLogger, - namespace: "ContainerRuntime", + this.baseLogger = createChildLogger({ + logger, properties: { - all: { - inStagingMode: this.inStagingMode, + error: { + inStagingMode: () => this.inStagingMode, + isApplyingStashedOps: () => this.pendingStateManager?.isApplyingStashedOps, + isReadOnly: () => this.isReadOnly(), }, }, }); + this.mc = createChildMonitoringContext({ + logger: this.baseLogger, + namespace: "ContainerRuntime", + }); // Validate that the Loader is compatible with this Runtime. const maybeLoaderCompatDetailsForRuntime = context as FluidObject; @@ -1840,6 +1849,26 @@ export class ContainerRuntime }, pendingRuntimeState?.pending, this.baseLogger, + { + // PSM has flipped `isApplyingStashedOps` to true; `isReadOnly()` + // now returns true. Fan out the new readonly state to data stores + // so DDSes see it during stashed-op replay and skip submits that + // would race the replay (e.g. realize-time writes that should not + // produce new ops). Fires synchronously from the PSM constructor + // — `notifyReadOnlyState` tolerates `channelCollection` being + // undefined since it isn't constructed until later in this + // constructor; data stores will pick up the readonly state from + // `isReadOnly()` when they're first asked. + onBeforeFirstStashedOpApply: () => { + this.notifyReadOnlyState(); + }, + // PSM has cleared the flag; `isReadOnly()` now reflects the + // network-readonly state again. Fan out so DDSes know they can + // submit once more. + onAfterStashedOpsApplied: () => { + this.notifyReadOnlyState(); + }, + }, ); let outerDeltaManager: IDeltaManagerFull = this.innerDeltaManager; @@ -2912,8 +2941,13 @@ export class ContainerRuntime } } - private readonly notifyReadOnlyState = (readonly: boolean): void => - this.channelCollection.notifyReadOnlyState(readonly); + private readonly notifyReadOnlyState = (): void => + // `channelCollection` may be undefined when invoked from the PSM's + // open hook, which fires from `new PendingStateManager(...)` earlier + // in this constructor than `channelCollection` is assigned. That's + // fine — DDSes created after this point will read the runtime's + // `isReadOnly()` aggregation and start out in the correct state. + this.channelCollection?.notifyReadOnlyState(this.isReadOnly()); public setConnectionState(canSendOps: boolean, clientId?: string): void { this.setConnectionStateToConnectedOrDisconnected(canSendOps, clientId); @@ -4814,6 +4848,22 @@ export class ContainerRuntime ): void { this.verifyNotClosed(); + // Nothing should be submitting while we're replaying stashed ops. + // The runtime is readonly during the apply window (see + // `PendingStateManager._applyLifecycle`), so a compliant DDS skips + // submits. If we land here anyway, a DDS bypassed the readonly gate + // (e.g. a realize-time write that doesn't consult `readOnly`) and + // produced a local op that has no counterpart in the saved-op + // replay — we cannot reconcile the mismatch, so fail fatally. We + // check here (rather than at flush) because outbox flushes are + // deferred and the apply window could close before the offending op + // reaches the pending queue. + if (this.pendingStateManager.isApplyingStashedOps) { + throw new UsageError("Local op submitted during stashed-op apply window", { + messageType: containerRuntimeMessage.type, + }); + } + // There should be no ops in detached container state! assert( this.attachState !== AttachState.Detached, diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index b654b2489327..0a91ba9027bc 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -142,6 +142,28 @@ export interface IRuntimeStateHandler { isAttached: () => boolean; } +/** + * Optional hooks invoked at the boundaries of the stashed-op apply lifecycle. + * + * - `onBeforeFirstStashedOpApply` fires synchronously from the PSM constructor + * when stashed state is present (i.e. `initialMessages` is non-empty at + * construction). At that moment `isApplyingStashedOps` is already `true`, + * so observers see the new state. + * - `onAfterStashedOpsApplied` fires synchronously the first time + * `initialMessages` drains during `applyStashedOpsAt`, immediately after + * `isApplyingStashedOps` flips to `false`. + * + * Both hooks fire at most once per PSM lifetime. + * + * Hooks are synchronous: the open hook must fire from a constructor, and the + * close hook fires from a `finally` block where async behavior would complicate + * error propagation. + */ +export interface PendingStateManagerHooks { + onBeforeFirstStashedOpApply?: () => void; + onAfterStashedOpsApplied?: () => void; +} + function isEmptyBatchPendingMessage(message: IPendingMessageFromStash): boolean { const content = JSON.parse(message.content) as Partial; return content.type === "groupedBatch" && content.contents?.length === 0; @@ -368,15 +390,62 @@ export class PendingStateManager implements IDisposable { private readonly logger: ITelemetryLoggerExt; + /** + * One-way lifecycle of the stashed-op apply window: + * "notStarted" → "applying" → "ended" + * + * Transitions are explicit and irreversible: + * - "notStarted" → "applying" in the constructor when stashed state is + * present (i.e. `initialMessages` is non-empty at construction). The + * open is eager so the runtime is readonly from the moment any DDS + * could possibly observe it. + * - "applying" → "ended" the first time {@link applyStashedOpsAt} drains + * `initialMessages`. After that, local edits are safe — they queue + * FIFO behind any remaining `pendingMessages`, preserving server-side + * ordering. + * + * The window never reopens. After "ended", subsequent `applyStashedOpsAt` + * calls (e.g. from late `notifyOpReplay`s) early-return at the empty guard. + * + * `pendingMessages` state is intentionally NOT part of the close condition. + * Those entries are drained transparently by {@link replayPendingStates} + * on connect via resubmit (each pop is matched by a fresh push), so the + * queue size is conserved across resubmit and DDSes can't distinguish a + * resubmit-ack from a normal ack. Holding the window open through resubmit + * would force resubmits to run while the runtime is readonly, which is the + * inverse of what we want ("never resubmit during apply stashed ops"). + * + * An apply error leaves the lifecycle at "applying" because the queue + * isn't drained. That's fine: an error here is fatal for the load, the + * container is unusable, and there's no state to restore. + */ + private _applyLifecycle: "notStarted" | "applying" | "ended" = "notStarted"; + public get isApplyingStashedOps(): boolean { + return this._applyLifecycle === "applying"; + } + + private readonly hooks: PendingStateManagerHooks; + constructor( private readonly stateHandler: IRuntimeStateHandler, stashedLocalState: IPendingLocalState | undefined, logger: ITelemetryBaseLogger, + hooks: PendingStateManagerHooks = {}, ) { this.logger = createChildLogger({ logger }); + this.hooks = hooks; if (stashedLocalState?.pendingStates) { this.initialMessages.push(...stashedLocalState.pendingStates); } + // Open the apply window eagerly if there is any stashed work. The + // runtime fans this out to readonly state so DDSes don't submit while + // we're replaying stashed ops. If a hook throws, we let it propagate to + // the caller of `new PendingStateManager` — at construction time there + // is no apply state to unwind. + if (!this.initialMessages.isEmpty()) { + this._applyLifecycle = "applying"; + this.hooks.onBeforeFirstStashedOpApply?.(); + } } public get disposed(): boolean { @@ -451,50 +520,65 @@ export class PendingStateManager implements IDisposable { * @param seqNum - Sequence number at which to apply ops. Will apply all ops if seqNum is undefined. */ public async applyStashedOpsAt(seqNum?: number): Promise { - // apply stashed ops at sequence number - while (!this.initialMessages.isEmpty()) { - if (seqNum !== undefined) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const peekMessage = this.initialMessages.peekFront()!; - if (peekMessage.referenceSequenceNumber > seqNum) { - break; // nothing left to do at this sequence number - } - if (peekMessage.referenceSequenceNumber < seqNum) { - throw new Error("loaded from snapshot too recent to apply stashed ops"); - } - } - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const nextMessage = this.initialMessages.shift()!; - // Nothing to apply if the message is an empty batch. - // We still need to track it for resubmission. - try { - if (isEmptyBatchPendingMessage(nextMessage)) { - nextMessage.localOpMetadata = { - emptyBatch: true, - } satisfies LocalEmptyBatchPlaceholder["localOpMetadata"]; // equivalent to applyStashedOp for empty batch - patchbatchInfo(nextMessage); // Back compat - this.pendingMessages.push(nextMessage); - continue; + if (this.initialMessages.isEmpty()) { + return; + } + + // The apply window was opened eagerly in the constructor when there + // was any stashed work. We close it the first time we drain + // `initialMessages`. An apply error leaves the lifecycle at + // "applying"; the load is fatal so there's no recoverable state. + try { + // apply stashed ops at sequence number + while (!this.initialMessages.isEmpty()) { + if (seqNum !== undefined) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const peekMessage = this.initialMessages.peekFront()!; + if (peekMessage.referenceSequenceNumber > seqNum) { + break; // nothing left to do at this sequence number + } + if (peekMessage.referenceSequenceNumber < seqNum) { + throw new Error("loaded from snapshot too recent to apply stashed ops"); + } } - // applyStashedOp will cause the DDS to behave as if it has sent the op but not actually send it - const localOpMetadata = await this.stateHandler.applyStashedOp(nextMessage.content); - if (this.stateHandler.isAttached()) { - nextMessage.localOpMetadata = localOpMetadata; - // NOTE: This runtimeOp has been roundtripped through string, which is technically lossy. - // e.g. At this point, handles are in their encoded form. - nextMessage.runtimeOp = JSON.parse( - nextMessage.content, - ) as LocalContainerRuntimeMessage; - // then we push onto pendingMessages which will cause PendingStateManager to resubmit when we connect - patchbatchInfo(nextMessage); // Back compat - this.pendingMessages.push(nextMessage); - } else { - if (localOpMetadata !== undefined) { - throw new Error("Local Op Metadata must be undefined when not attached"); + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const nextMessage = this.initialMessages.shift()!; + // Nothing to apply if the message is an empty batch. + // We still need to track it for resubmission. + try { + if (isEmptyBatchPendingMessage(nextMessage)) { + nextMessage.localOpMetadata = { + emptyBatch: true, + } satisfies LocalEmptyBatchPlaceholder["localOpMetadata"]; // equivalent to applyStashedOp for empty batch + patchbatchInfo(nextMessage); // Back compat + this.pendingMessages.push(nextMessage); + continue; + } + // applyStashedOp will cause the DDS to behave as if it has sent the op but not actually send it + const localOpMetadata = await this.stateHandler.applyStashedOp(nextMessage.content); + if (this.stateHandler.isAttached()) { + nextMessage.localOpMetadata = localOpMetadata; + // NOTE: This runtimeOp has been roundtripped through string, which is technically lossy. + // e.g. At this point, handles are in their encoded form. + nextMessage.runtimeOp = JSON.parse( + nextMessage.content, + ) as LocalContainerRuntimeMessage; + // then we push onto pendingMessages which will cause PendingStateManager to resubmit when we connect + patchbatchInfo(nextMessage); // Back compat + this.pendingMessages.push(nextMessage); + } else { + if (localOpMetadata !== undefined) { + throw new Error("Local Op Metadata must be undefined when not attached"); + } } + } catch (error) { + throw DataProcessingError.wrapIfUnrecognized(error, "applyStashedOp", nextMessage); } - } catch (error) { - throw DataProcessingError.wrapIfUnrecognized(error, "applyStashedOp", nextMessage); + } + } finally { + if (this._applyLifecycle === "applying" && this.initialMessages.isEmpty()) { + this._applyLifecycle = "ended"; + this.hooks.onAfterStashedOpsApplied?.(); } } } diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 93e78c4317a2..7ce6b0abe787 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -1597,6 +1597,48 @@ describe("Runtime", () => { ); }); }); + + describe("Submit during stashed-op apply", () => { + let containerRuntime: ContainerRuntime; + + beforeEach(async () => { + containerRuntime = await ContainerRuntime.loadRuntime2({ + context: getMockContext() as IContainerContext, + registry: new FluidDataStoreRegistry([]), + existing: false, + requestHandler: undefined, + runtimeOptions: {}, + provideEntryPoint: mockProvideEntryPoint, + }); + }); + + function setApplyingStashedOps(isApplying: boolean): void { + const psm = (containerRuntime as unknown as { pendingStateManager: PendingStateManager }) + .pendingStateManager; + Object.defineProperty(psm, "isApplyingStashedOps", { + configurable: true, + get: () => isApplying, + }); + } + + it("throws a fatal usage error from submitMessage", () => { + setApplyingStashedOps(true); + assert.throws( + () => submitDataStoreOp(containerRuntime, "1", testDataStoreMessage), + (error: IErrorBase) => + error.errorType === ContainerErrorTypes.usageError && + error.message === "Local op submitted during stashed-op apply window", + ); + }); + + it("does not throw when the apply window is closed", () => { + setApplyingStashedOps(false); + assert.doesNotThrow(() => + submitDataStoreOp(containerRuntime, "1", testDataStoreMessage), + ); + }); + }); + describe("Supports mixin classes", () => { it("new loadRuntime2 method works", async () => { const makeMixin = ( diff --git a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts index 21f581c8aee1..14e2b5994898 100644 --- a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts @@ -834,6 +834,106 @@ describe("Pending State Manager", () => { assert.strictEqual(applyStashedOps.length, 0); assert.strictEqual(pendingStateManager.pendingMessagesCount, 1); }); + + describe("apply lifecycle", () => { + const stashedMessage = (refSeq: number, csn: number): IPendingMessage => ({ + type: "message", + content: '{"type":"component"}', + referenceSequenceNumber: refSeq, + localOpMetadata: undefined, + opMetadata: undefined, + batchInfo: { clientId: "CLIENT_ID", batchStartCsn: csn, length: 1, staged: false }, + runtimeOp: undefined, + }); + + const stateHandler = (): IRuntimeStateHandler => ({ + applyStashedOp: async () => undefined, + clientId: () => "clientId", + connected: () => true, + reSubmitBatch: () => {}, + isActiveConnection: () => false, + isAttached: () => true, + }); + + it("eagerly enters apply window in constructor when stashed state present", () => { + let beforeCount = 0; + let afterCount = 0; + const psm = new PendingStateManager( + stateHandler(), + { pendingStates: [stashedMessage(10, 1)] }, + logger, + { + onBeforeFirstStashedOpApply: () => beforeCount++, + onAfterStashedOpsApplied: () => afterCount++, + }, + ); + assert.strictEqual(psm.isApplyingStashedOps, true); + assert.strictEqual(beforeCount, 1); + assert.strictEqual(afterCount, 0); + }); + + it("does not enter apply window when no stashed state", () => { + let beforeCount = 0; + let afterCount = 0; + const psm = new PendingStateManager(stateHandler(), undefined, logger, { + onBeforeFirstStashedOpApply: () => beforeCount++, + onAfterStashedOpsApplied: () => afterCount++, + }); + assert.strictEqual(psm.isApplyingStashedOps, false); + assert.strictEqual(beforeCount, 0); + assert.strictEqual(afterCount, 0); + }); + + it("closes apply window when initialMessages drains", async () => { + let afterCount = 0; + const psm = new PendingStateManager( + stateHandler(), + { pendingStates: [stashedMessage(10, 1), stashedMessage(11, 2)] }, + logger, + { onAfterStashedOpsApplied: () => afterCount++ }, + ); + assert.strictEqual(psm.isApplyingStashedOps, true); + await psm.applyStashedOpsAt(); + assert.strictEqual(psm.isApplyingStashedOps, false); + assert.strictEqual(afterCount, 1); + }); + + it("stays in apply window across partial drains", async () => { + let afterCount = 0; + const psm = new PendingStateManager( + stateHandler(), + { pendingStates: [stashedMessage(10, 1), stashedMessage(11, 2)] }, + logger, + { onAfterStashedOpsApplied: () => afterCount++ }, + ) as unknown as PendingStateManager_WithPrivates; + + await (psm as unknown as PendingStateManager).applyStashedOpsAt(10); + assert.strictEqual(psm.isApplyingStashedOps, true); + assert.strictEqual(afterCount, 0); + assert.strictEqual(psm.initialMessages.length, 1); + + await (psm as unknown as PendingStateManager).applyStashedOpsAt(11); + assert.strictEqual(psm.isApplyingStashedOps, false); + assert.strictEqual(afterCount, 1); + assert.strictEqual(psm.initialMessages.length, 0); + }); + + it("close hook fires exactly once even with repeated applyStashedOpsAt calls", async () => { + let afterCount = 0; + const psm = new PendingStateManager( + stateHandler(), + { pendingStates: [stashedMessage(10, 1)] }, + logger, + { onAfterStashedOpsApplied: () => afterCount++ }, + ); + await psm.applyStashedOpsAt(); + await psm.applyStashedOpsAt(); + await psm.applyStashedOpsAt(100); + assert.strictEqual(afterCount, 1); + assert.strictEqual(psm.isApplyingStashedOps, false); + }); + + }); }); describe("Pending messages state", () => { @@ -930,18 +1030,24 @@ describe("Pending State Manager", () => { it("has both pending messages and initial messages", () => { const pendingStateManager = createPendingStateManager(forInitialMessages); - // let each message be its own batch + // Flushing while initialMessages is non-empty is a usage error (apply + // window is open), so push directly into the private pendingMessages + // queue to set up the dual-populated state under test. for (const message of forFlushedMessages) { - pendingStateManager.onFlushBatch( - [ - { - runtimeOp: message.runtimeOp, - referenceSequenceNumber: message.referenceSequenceNumber, - }, - ], - 0, - false /* staged */, - ); + pendingStateManager.pendingMessages.push({ + type: "message", + content: '{"type":"component"}', + referenceSequenceNumber: message.referenceSequenceNumber, + localOpMetadata: undefined, + opMetadata: undefined, + batchInfo: { + clientId: "CLIENT_ID", + batchStartCsn: 0, + length: 1, + staged: false, + }, + runtimeOp: message.runtimeOp, + }); } assert.strictEqual( pendingStateManager.hasPendingMessages(), From 7b813f4749fcc8aeab6992a0a44724997898431d Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Tue, 12 May 2026 19:09:28 -0700 Subject: [PATCH 02/17] fix: address deep-review correctness bugs in stashed-op apply lifecycle isReadOnly() now uses optional-chain on pendingStateManager since the baseLogger evaluates this lazy property in a constructor window before the PSM is assigned, which would throw TypeError on error telemetry. applyStashedOpsAt() now tracks loop completion explicitly; a failure on the last stashed op no longer flips the lifecycle to "ended" (the failing message is shifted off initialMessages before apply runs, so the queue otherwise looks drained). New regression test covers the failure-on-last-op case. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../container-runtime/src/containerRuntime.ts | 6 +++++- .../src/pendingStateManager.ts | 17 +++++++++++++---- .../src/test/containerRuntime.spec.ts | 8 +++++--- .../src/test/pendingStateManager.spec.ts | 18 ++++++++++++++++++ 4 files changed, 41 insertions(+), 8 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 124e7766ea47..f77196848c74 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -1361,8 +1361,12 @@ export class ContainerRuntime } public readonly isReadOnly = (): boolean => + // `pendingStateManager` is assigned partway through the constructor; + // `baseLogger` is built earlier and stamps `isReadOnly` on error + // telemetry, so this can be called before the PSM exists. Optional + // chain keeps that window safe. this.deltaManager.readOnlyInfo.readonly === true || - this.pendingStateManager.isApplyingStashedOps; + this.pendingStateManager?.isApplyingStashedOps === true; /** * Current session schema - defines what options are on & off. diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index 0a91ba9027bc..5cd792bf4e57 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -525,9 +525,13 @@ export class PendingStateManager implements IDisposable { } // The apply window was opened eagerly in the constructor when there - // was any stashed work. We close it the first time we drain - // `initialMessages`. An apply error leaves the lifecycle at - // "applying"; the load is fatal so there's no recoverable state. + // was any stashed work. We close it on full successful drain only — + // `loopCompleted` distinguishes a clean exit from an apply error, + // since a message is shifted off `initialMessages` *before* it is + // applied, so on error `initialMessages.isEmpty()` would otherwise + // look like a successful drain. An apply error leaves the lifecycle + // at "applying"; the load is fatal so there's no recoverable state. + let loopCompleted = false; try { // apply stashed ops at sequence number while (!this.initialMessages.isEmpty()) { @@ -575,8 +579,13 @@ export class PendingStateManager implements IDisposable { throw DataProcessingError.wrapIfUnrecognized(error, "applyStashedOp", nextMessage); } } + loopCompleted = true; } finally { - if (this._applyLifecycle === "applying" && this.initialMessages.isEmpty()) { + if ( + loopCompleted && + this._applyLifecycle === "applying" && + this.initialMessages.isEmpty() + ) { this._applyLifecycle = "ended"; this.hooks.onAfterStashedOpsApplied?.(); } diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 7ce6b0abe787..6fb2f49a8a10 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -1602,7 +1602,7 @@ describe("Runtime", () => { let containerRuntime: ContainerRuntime; beforeEach(async () => { - containerRuntime = await ContainerRuntime.loadRuntime2({ + const { runtime } = await ContainerRuntime.loadRuntime2({ context: getMockContext() as IContainerContext, registry: new FluidDataStoreRegistry([]), existing: false, @@ -1610,11 +1610,13 @@ describe("Runtime", () => { runtimeOptions: {}, provideEntryPoint: mockProvideEntryPoint, }); + containerRuntime = runtime; }); function setApplyingStashedOps(isApplying: boolean): void { - const psm = (containerRuntime as unknown as { pendingStateManager: PendingStateManager }) - .pendingStateManager; + const psm = ( + containerRuntime as unknown as { pendingStateManager: PendingStateManager } + ).pendingStateManager; Object.defineProperty(psm, "isApplyingStashedOps", { configurable: true, get: () => isApplying, diff --git a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts index 14e2b5994898..8ac704c66d09 100644 --- a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts @@ -933,6 +933,24 @@ describe("Pending State Manager", () => { assert.strictEqual(psm.isApplyingStashedOps, false); }); + it("leaves lifecycle in 'applying' and does not fire close hook when the last apply throws", async () => { + let afterCount = 0; + const failingHandler: IRuntimeStateHandler = { + ...stateHandler(), + applyStashedOp: async () => { + throw new Error("apply failed"); + }, + }; + const psm = new PendingStateManager( + failingHandler, + { pendingStates: [stashedMessage(10, 1)] }, + logger, + { onAfterStashedOpsApplied: () => afterCount++ }, + ); + await assert.rejects(psm.applyStashedOpsAt()); + assert.strictEqual(psm.isApplyingStashedOps, true); + assert.strictEqual(afterCount, 0); + }); }); }); From 35487d44167617af4f5ae9e535d3ff4789a738a6 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Tue, 12 May 2026 19:45:33 -0700 Subject: [PATCH 03/17] fix(container-runtime): satisfy jsdoc/check-indentation on PSM docstrings ESLint rule flagged indented continuation lines inside two block comments on PendingStateManager. Reflowed both: the hooks-interface comment uses separate paragraphs instead of a bulleted list with indented continuations, and the lifecycle comment runs as flat prose instead of indented bullets. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/pendingStateManager.ts | 61 +++++++++---------- 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index 5cd792bf4e57..90cd2d43585c 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -145,19 +145,20 @@ export interface IRuntimeStateHandler { /** * Optional hooks invoked at the boundaries of the stashed-op apply lifecycle. * - * - `onBeforeFirstStashedOpApply` fires synchronously from the PSM constructor - * when stashed state is present (i.e. `initialMessages` is non-empty at - * construction). At that moment `isApplyingStashedOps` is already `true`, - * so observers see the new state. - * - `onAfterStashedOpsApplied` fires synchronously the first time - * `initialMessages` drains during `applyStashedOpsAt`, immediately after - * `isApplyingStashedOps` flips to `false`. + * `onBeforeFirstStashedOpApply` fires synchronously from the PSM constructor + * when stashed state is present (i.e. `initialMessages` is non-empty at + * construction). At that moment `isApplyingStashedOps` is already `true`, so + * observers see the new state. + * + * `onAfterStashedOpsApplied` fires synchronously the first time + * `initialMessages` drains during `applyStashedOpsAt`, immediately after + * `isApplyingStashedOps` flips to `false`. * * Both hooks fire at most once per PSM lifetime. * * Hooks are synchronous: the open hook must fire from a constructor, and the - * close hook fires from a `finally` block where async behavior would complicate - * error propagation. + * close hook fires from a `finally` block where async behavior would + * complicate error propagation. */ export interface PendingStateManagerHooks { onBeforeFirstStashedOpApply?: () => void; @@ -391,33 +392,29 @@ export class PendingStateManager implements IDisposable { private readonly logger: ITelemetryLoggerExt; /** - * One-way lifecycle of the stashed-op apply window: - * "notStarted" → "applying" → "ended" + * One-way lifecycle of the stashed-op apply window: `notStarted` → `applying` → `ended`. * - * Transitions are explicit and irreversible: - * - "notStarted" → "applying" in the constructor when stashed state is - * present (i.e. `initialMessages` is non-empty at construction). The - * open is eager so the runtime is readonly from the moment any DDS - * could possibly observe it. - * - "applying" → "ended" the first time {@link applyStashedOpsAt} drains - * `initialMessages`. After that, local edits are safe — they queue - * FIFO behind any remaining `pendingMessages`, preserving server-side - * ordering. + * Transitions are explicit and irreversible. `notStarted` → `applying` happens in the + * constructor when stashed state is present (i.e. `initialMessages` is non-empty at + * construction). The open is eager so the runtime is readonly from the moment any DDS + * could possibly observe it. `applying` → `ended` happens the first time + * {@link applyStashedOpsAt} drains `initialMessages`. After that, local edits are safe — + * they queue FIFO behind any remaining `pendingMessages`, preserving server-side ordering. * - * The window never reopens. After "ended", subsequent `applyStashedOpsAt` - * calls (e.g. from late `notifyOpReplay`s) early-return at the empty guard. + * The window never reopens. After `ended`, subsequent `applyStashedOpsAt` calls (e.g. + * from late `notifyOpReplay`s) early-return at the empty guard. * - * `pendingMessages` state is intentionally NOT part of the close condition. - * Those entries are drained transparently by {@link replayPendingStates} - * on connect via resubmit (each pop is matched by a fresh push), so the - * queue size is conserved across resubmit and DDSes can't distinguish a - * resubmit-ack from a normal ack. Holding the window open through resubmit - * would force resubmits to run while the runtime is readonly, which is the - * inverse of what we want ("never resubmit during apply stashed ops"). + * `pendingMessages` state is intentionally NOT part of the close condition. Those + * entries are drained transparently by {@link replayPendingStates} on connect via + * resubmit (each pop is matched by a fresh push), so the queue size is conserved across + * resubmit and DDSes can't distinguish a resubmit-ack from a normal ack. Holding the + * window open through resubmit would force resubmits to run while the runtime is + * readonly, which is the inverse of what we want ("never resubmit during apply stashed + * ops"). * - * An apply error leaves the lifecycle at "applying" because the queue - * isn't drained. That's fine: an error here is fatal for the load, the - * container is unusable, and there's no state to restore. + * An apply error leaves the lifecycle at `applying` because the queue isn't drained. + * That's fine: an error here is fatal for the load, the container is unusable, and + * there's no state to restore. */ private _applyLifecycle: "notStarted" | "applying" | "ended" = "notStarted"; public get isApplyingStashedOps(): boolean { From 442d94bf36f12fba31b95c7e0d66ecb7aa98f30a Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Tue, 12 May 2026 22:59:31 -0700 Subject: [PATCH 04/17] fix(container-runtime): isReadOnly must tolerate undefined _deltaManager too MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The baseLogger now stamps isReadOnly on every error event. During construction, error events fire (e.g. layer-compat failures via validateLoaderCompatibility) before _deltaManager is assigned, so the prior fix's optional chain on pendingStateManager wasn't enough — the deltaManager.readOnlyInfo read threw TypeError and masked the layer incompat error, breaking runtimeLayerCompatValidation tests in CI. Switching to this._deltaManager?.readOnlyInfo lets the lazy property return false (well-defined) during the partial-construction window. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../runtime/container-runtime/src/containerRuntime.ts | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index f77196848c74..fcac1b4f220c 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -1361,11 +1361,12 @@ export class ContainerRuntime } public readonly isReadOnly = (): boolean => - // `pendingStateManager` is assigned partway through the constructor; - // `baseLogger` is built earlier and stamps `isReadOnly` on error - // telemetry, so this can be called before the PSM exists. Optional - // chain keeps that window safe. - this.deltaManager.readOnlyInfo.readonly === true || + // `_deltaManager` and `pendingStateManager` are both assigned partway + // through the constructor; `baseLogger` is built earlier and stamps + // `isReadOnly` on every error event (e.g. layer-compat failures + // during construction), so this can be called before either is + // assigned. Optional chains keep that window safe. + this._deltaManager?.readOnlyInfo.readonly === true || this.pendingStateManager?.isApplyingStashedOps === true; /** From d4053218835eb6ef731a02db62350d486ca4cb91 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 13 May 2026 09:34:53 -0700 Subject: [PATCH 05/17] fix: address deep-review threads on stashed-op apply submit guard - Remove `onBeforeFirstStashedOpApply` hook entirely. It was a no-op at the only call site (channelCollection is not yet constructed when the PSM constructor runs), and the misleading comment risked masking a future regression. PSM still flips `_applyLifecycle` eagerly; no fanout fires until the close hook. - Allowlist `BlobAttach` and `IdAllocation` message types in the submit-during-apply guard. `BlobAttach` is produced by `sharePendingBlobs`, which runs before `applyStashedOpsAt` resolves. `IdAllocation` is already excluded by a downstream assert but is allowlisted here defensively. - Add `Fluid.ContainerRuntime.DisableSubmitDuringStashedApplyThrow` kill switch. The UsageError is always constructed; when the kill switch is enabled it is sent as a `SubmitDuringStashedOpApply` error event instead of thrown, leaving an off-switch for production rollout. - Enforce the PSM-documented invariant ("never resubmit during apply stashed ops") with an explicit early-return in `replayPendingStates` when `isApplyingStashedOps` is true. Closes the connection-transition mid-partial-drain hole where `reSubmit` for `Rejoin` / `GC` would have hit the submit guard. New tests cover BlobAttach during apply (allowlisted, no throw), the kill switch (no throw, error event logged), and `replayPendingStates` no-op during apply. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../container-runtime/src/containerRuntime.ts | 62 +++++++++++----- .../src/pendingStateManager.ts | 32 ++++---- .../src/test/containerRuntime.spec.ts | 73 +++++++++++++++++-- .../src/test/pendingStateManager.spec.ts | 10 +-- 4 files changed, 129 insertions(+), 48 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index fcac1b4f220c..d1192eb82559 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -1855,21 +1855,13 @@ export class ContainerRuntime pendingRuntimeState?.pending, this.baseLogger, { - // PSM has flipped `isApplyingStashedOps` to true; `isReadOnly()` - // now returns true. Fan out the new readonly state to data stores - // so DDSes see it during stashed-op replay and skip submits that - // would race the replay (e.g. realize-time writes that should not - // produce new ops). Fires synchronously from the PSM constructor - // — `notifyReadOnlyState` tolerates `channelCollection` being - // undefined since it isn't constructed until later in this - // constructor; data stores will pick up the readonly state from - // `isReadOnly()` when they're first asked. - onBeforeFirstStashedOpApply: () => { - this.notifyReadOnlyState(); - }, - // PSM has cleared the flag; `isReadOnly()` now reflects the - // network-readonly state again. Fan out so DDSes know they can - // submit once more. + // PSM has cleared `isApplyingStashedOps`; `isReadOnly()` now + // reflects the network-readonly state again. Fan out so DDSes + // know they can submit once more. No open hook is needed — + // the apply window opens before `channelCollection` exists, + // so a fanout there would be a no-op; data stores instead + // pick up the initial readonly state from `isReadOnly()` + // when they're first asked. onAfterStashedOpsApplied: () => { this.notifyReadOnlyState(); }, @@ -2831,6 +2823,17 @@ export class ContainerRuntime return; } + // Never resubmit during the stashed-op apply window. The PSM design + // invariant ("never resubmit during apply stashed ops") is enforced + // here so that a connection-state transition mid-partial-drain cannot + // drive `reSubmit` into `submit()` — which would otherwise be caught + // by the readonly/apply guard. The window will close on the next + // successful `applyStashedOpsAt` drain; whichever connection edge + // follows will re-run replay normally. + if (this.pendingStateManager.isApplyingStashedOps) { + return; + } + // Replaying is an internal operation and we don't want to generate noise while doing it. // So temporarily disable dirty state change events, and save the old state. // When we're done, we'll emit the event if the state changed. @@ -4863,10 +4866,35 @@ export class ContainerRuntime // check here (rather than at flush) because outbox flushes are // deferred and the apply window could close before the offending op // reaches the pending queue. - if (this.pendingStateManager.isApplyingStashedOps) { - throw new UsageError("Local op submitted during stashed-op apply window", { + // + // Allowlist: `BlobAttach` and `IdAllocation` are runtime-internal + // op types that may legitimately fire during apply. `BlobAttach` + // is produced by `sharePendingBlobs`, which is invoked from + // `loadRuntime2` before `applyStashedOpsAt` resolves. `IdAllocation` + // is asserted to bypass `submit()` entirely (see the assert below), + // but is allowlisted here defensively in case that contract shifts. + // + // Kill switch: when `DisableSubmitDuringStashedApplyThrow` is enabled, + // we still construct and surface the error event to telemetry, but + // do not throw — leaves an off-switch if a first- or third-party + // DDS in production quietly bypasses the readonly gate. + if ( + this.pendingStateManager.isApplyingStashedOps && + containerRuntimeMessage.type !== ContainerMessageType.BlobAttach && + containerRuntimeMessage.type !== ContainerMessageType.IdAllocation + ) { + const error = new UsageError("Local op submitted during stashed-op apply window", { messageType: containerRuntimeMessage.type, }); + if ( + this.mc.config.getBoolean( + "Fluid.ContainerRuntime.DisableSubmitDuringStashedApplyThrow", + ) === true + ) { + this.mc.logger.sendErrorEvent({ eventName: "SubmitDuringStashedOpApply" }, error); + } else { + throw error; + } } // There should be no ops in detached container state! diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index 90cd2d43585c..f72076c6679b 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -143,25 +143,22 @@ export interface IRuntimeStateHandler { } /** - * Optional hooks invoked at the boundaries of the stashed-op apply lifecycle. - * - * `onBeforeFirstStashedOpApply` fires synchronously from the PSM constructor - * when stashed state is present (i.e. `initialMessages` is non-empty at - * construction). At that moment `isApplyingStashedOps` is already `true`, so - * observers see the new state. + * Optional hooks invoked at the close of the stashed-op apply lifecycle. * * `onAfterStashedOpsApplied` fires synchronously the first time * `initialMessages` drains during `applyStashedOpsAt`, immediately after - * `isApplyingStashedOps` flips to `false`. - * - * Both hooks fire at most once per PSM lifetime. + * `isApplyingStashedOps` flips to `false`. Fires at most once per PSM lifetime. * - * Hooks are synchronous: the open hook must fire from a constructor, and the - * close hook fires from a `finally` block where async behavior would + * Synchronous: fires from a `finally` block where async behavior would * complicate error propagation. + * + * No corresponding open hook is exposed. The apply window is opened eagerly + * in the PSM constructor, but at that point `ContainerRuntime` has not yet + * wired up the downstream observers (`channelCollection` is undefined), so a + * fanout fired from the constructor would be a no-op. Consumers that care + * about the open transition can read `isApplyingStashedOps` directly. */ export interface PendingStateManagerHooks { - onBeforeFirstStashedOpApply?: () => void; onAfterStashedOpsApplied?: () => void; } @@ -435,13 +432,14 @@ export class PendingStateManager implements IDisposable { this.initialMessages.push(...stashedLocalState.pendingStates); } // Open the apply window eagerly if there is any stashed work. The - // runtime fans this out to readonly state so DDSes don't submit while - // we're replaying stashed ops. If a hook throws, we let it propagate to - // the caller of `new PendingStateManager` — at construction time there - // is no apply state to unwind. + // runtime is readonly while `isApplyingStashedOps` is true (see + // `ContainerRuntime.isReadOnly`); compliant DDSes consult `readOnly` + // at realize time and skip submits. No fanout fires here — downstream + // observers (`channelCollection`) are not yet constructed at this + // point in the runtime constructor, and the first real readonly read + // happens after the constructor returns. if (!this.initialMessages.isEmpty()) { this._applyLifecycle = "applying"; - this.hooks.onBeforeFirstStashedOpApply?.(); } } diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 6fb2f49a8a10..5b511acc8d68 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -1600,10 +1600,12 @@ describe("Runtime", () => { describe("Submit during stashed-op apply", () => { let containerRuntime: ContainerRuntime; + let mockLogger: MockLogger; - beforeEach(async () => { + async function createRuntime(settings: Record = {}): Promise { + mockLogger = new MockLogger(); const { runtime } = await ContainerRuntime.loadRuntime2({ - context: getMockContext() as IContainerContext, + context: getMockContext({ logger: mockLogger, settings }) as IContainerContext, registry: new FluidDataStoreRegistry([]), existing: false, requestHandler: undefined, @@ -1611,7 +1613,7 @@ describe("Runtime", () => { provideEntryPoint: mockProvideEntryPoint, }); containerRuntime = runtime; - }); + } function setApplyingStashedOps(isApplying: boolean): void { const psm = ( @@ -1623,7 +1625,24 @@ describe("Runtime", () => { }); } - it("throws a fatal usage error from submitMessage", () => { + function submitBlobAttach(): void { + // Mirrors `sendBlobAttachMessage` in ContainerRuntime; submit() is private. + ( + containerRuntime as unknown as { + submit: ( + message: { type: ContainerMessageType; contents: unknown }, + localOpMetadata: unknown, + metadata: { localId: string; blobId: string }, + ) => void; + } + ).submit({ type: ContainerMessageType.BlobAttach, contents: undefined }, undefined, { + localId: "local-1", + blobId: "blob-1", + }); + } + + it("throws a fatal usage error from submitMessage", async () => { + await createRuntime(); setApplyingStashedOps(true); assert.throws( () => submitDataStoreOp(containerRuntime, "1", testDataStoreMessage), @@ -1633,12 +1652,56 @@ describe("Runtime", () => { ); }); - it("does not throw when the apply window is closed", () => { + it("does not throw when the apply window is closed", async () => { + await createRuntime(); setApplyingStashedOps(false); assert.doesNotThrow(() => submitDataStoreOp(containerRuntime, "1", testDataStoreMessage), ); }); + + it("does not throw for BlobAttach during apply (allowlisted)", async () => { + await createRuntime(); + setApplyingStashedOps(true); + assert.doesNotThrow(() => submitBlobAttach()); + }); + + it("kill switch suppresses throw and logs error event", async () => { + await createRuntime({ + "Fluid.ContainerRuntime.DisableSubmitDuringStashedApplyThrow": true, + }); + setApplyingStashedOps(true); + assert.doesNotThrow(() => + submitDataStoreOp(containerRuntime, "1", testDataStoreMessage), + ); + mockLogger.assertMatchAny([ + { + eventName: "ContainerRuntime:SubmitDuringStashedOpApply", + category: "error", + messageType: ContainerMessageType.FluidDataStoreOp, + }, + ]); + }); + + it("replayPendingStates is a no-op during apply", async () => { + await createRuntime(); + setApplyingStashedOps(true); + const psm = ( + containerRuntime as unknown as { pendingStateManager: PendingStateManager } + ).pendingStateManager; + let psmReplayCalls = 0; + psm.replayPendingStates = ((): IPendingMessage["batchInfo"][] => { + psmReplayCalls++; + return []; + }) as PendingStateManager["replayPendingStates"]; + // Force `shouldSendOps()` to return true so the only gate left is the apply check. + (containerRuntime as unknown as { shouldSendOps: () => boolean }).shouldSendOps = + (): boolean => true; + ( + containerRuntime as unknown as { replayPendingStates: () => void } + ).replayPendingStates(); + assert.strictEqual(psmReplayCalls, 0); + }); }); describe("Supports mixin classes", () => { diff --git a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts index 8ac704c66d09..c451512f365b 100644 --- a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts @@ -856,31 +856,23 @@ describe("Pending State Manager", () => { }); it("eagerly enters apply window in constructor when stashed state present", () => { - let beforeCount = 0; let afterCount = 0; const psm = new PendingStateManager( stateHandler(), { pendingStates: [stashedMessage(10, 1)] }, logger, - { - onBeforeFirstStashedOpApply: () => beforeCount++, - onAfterStashedOpsApplied: () => afterCount++, - }, + { onAfterStashedOpsApplied: () => afterCount++ }, ); assert.strictEqual(psm.isApplyingStashedOps, true); - assert.strictEqual(beforeCount, 1); assert.strictEqual(afterCount, 0); }); it("does not enter apply window when no stashed state", () => { - let beforeCount = 0; let afterCount = 0; const psm = new PendingStateManager(stateHandler(), undefined, logger, { - onBeforeFirstStashedOpApply: () => beforeCount++, onAfterStashedOpsApplied: () => afterCount++, }); assert.strictEqual(psm.isApplyingStashedOps, false); - assert.strictEqual(beforeCount, 0); assert.strictEqual(afterCount, 0); }); From 25ccf6da0dd5d5162657523b402cf8ec5d1d7bc4 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 13 May 2026 14:47:57 -0700 Subject: [PATCH 06/17] fix(container-runtime): re-trigger replay from PSM close hook The `replayPendingStates` apply-window early-return suppresses replay during apply, but `setConnectionState(true)` only fires `replayPendingStates` on the `canSendOps` *edge*. If that edge fires mid-apply (or while stashed entries are still being pushed into `pendingMessages` by the apply loop), the trigger is consumed without effect and nothing re-fires it after the window closes. Have `onAfterStashedOpsApplied` call `replayPendingStates` after `notifyReadOnlyState`. The method self-gates on `shouldSendOps()`, and the PSM sets `_applyLifecycle = "ended"` before firing the hook so the apply-window early-return won't fire either. New test asserts the runtime's `replayPendingStates` is invoked when the close hook fires. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../container-runtime/src/containerRuntime.ts | 11 ++++++ .../src/test/containerRuntime.spec.ts | 38 +++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index d1192eb82559..aa2b7a6e7bb4 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -1862,8 +1862,19 @@ export class ContainerRuntime // so a fanout there would be a no-op; data stores instead // pick up the initial readonly state from `isReadOnly()` // when they're first asked. + // + // Also re-trigger replay: stashed entries that the apply loop + // pushed into `pendingMessages` need to be resubmitted, and + // any `setConnectionState(true)` edge that fired mid-apply + // was already consumed (the `replayPendingStates` + // apply-window early-return suppressed it). + // `replayPendingStates` self-gates on `shouldSendOps()`, and + // at this point `isApplyingStashedOps` is already `false` + // (PSM sets `_applyLifecycle = "ended"` before firing this + // hook), so the apply-window early-return won't fire either. onAfterStashedOpsApplied: () => { this.notifyReadOnlyState(); + this.replayPendingStates(); }, }, ); diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 5b511acc8d68..026e5bc32ccf 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -1702,6 +1702,44 @@ describe("Runtime", () => { ).replayPendingStates(); assert.strictEqual(psmReplayCalls, 0); }); + + it("close hook re-triggers replayPendingStates after apply drains", async () => { + await createRuntime(); + const psm = ( + containerRuntime as unknown as { + pendingStateManager: PendingStateManager; + } + ).pendingStateManager; + + // Seed an empty-batch stashed entry so the apply loop drains + // without invoking the real `applyStashedOp` handler. + const psmPrivate = psm as unknown as { + initialMessages: { push: (msg: IPendingMessage) => void }; + _applyLifecycle: "notStarted" | "applying" | "ended"; + }; + const emptyBatchStashed = { + type: "message", + referenceSequenceNumber: 0, + content: JSON.stringify({ type: "groupedBatch", contents: [] }), + }; + psmPrivate.initialMessages.push(emptyBatchStashed as IPendingMessage); + psmPrivate._applyLifecycle = "applying"; + + // Spy on the runtime's `replayPendingStates`; assert it fires + // from the close hook even when the PSM didn't see a + // connection-state edge during apply. + let runtimeReplayCalls = 0; + ( + containerRuntime as unknown as { replayPendingStates: () => void } + ).replayPendingStates = (): void => { + runtimeReplayCalls++; + }; + + await psm.applyStashedOpsAt(); + + assert.strictEqual(psm.isApplyingStashedOps, false); + assert.strictEqual(runtimeReplayCalls, 1); + }); }); describe("Supports mixin classes", () => { From e6bcc3a74307c73d7fd209e9c6eb39efdae55362 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 13 May 2026 16:03:20 -0700 Subject: [PATCH 07/17] refactor(container-runtime): assert canSendOps doesn't fire mid-apply MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Revert the `replayPendingStates` apply-window early-return and the `onAfterStashedOpsApplied` re-trigger. The canSendOps edge in `setConnectionStateCore` is the only call site for `replayPendingStates`, and the loader contract awaits `applyStashedOpsAt` before transitioning the runtime to a write-capable connection — so the mid-apply edge the defensive code was guarding against cannot occur. Replace the early-return with an assert that fails loudly if the invariant is ever violated by a future loader change, surfacing the contract rather than silently swallowing the edge. Remove the two now-obsolete tests (`replayPendingStates is a no-op during apply`, `close hook re-triggers replayPendingStates after apply drains`). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../container-runtime/src/containerRuntime.ts | 33 ++++------- .../src/test/containerRuntime.spec.ts | 58 ------------------- 2 files changed, 12 insertions(+), 79 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index aa2b7a6e7bb4..e6f09389f1f4 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -1862,19 +1862,8 @@ export class ContainerRuntime // so a fanout there would be a no-op; data stores instead // pick up the initial readonly state from `isReadOnly()` // when they're first asked. - // - // Also re-trigger replay: stashed entries that the apply loop - // pushed into `pendingMessages` need to be resubmitted, and - // any `setConnectionState(true)` edge that fired mid-apply - // was already consumed (the `replayPendingStates` - // apply-window early-return suppressed it). - // `replayPendingStates` self-gates on `shouldSendOps()`, and - // at this point `isApplyingStashedOps` is already `false` - // (PSM sets `_applyLifecycle = "ended"` before firing this - // hook), so the apply-window early-return won't fire either. onAfterStashedOpsApplied: () => { this.notifyReadOnlyState(); - this.replayPendingStates(); }, }, ); @@ -2834,16 +2823,18 @@ export class ContainerRuntime return; } - // Never resubmit during the stashed-op apply window. The PSM design - // invariant ("never resubmit during apply stashed ops") is enforced - // here so that a connection-state transition mid-partial-drain cannot - // drive `reSubmit` into `submit()` — which would otherwise be caught - // by the readonly/apply guard. The window will close on the next - // successful `applyStashedOpsAt` drain; whichever connection edge - // follows will re-run replay normally. - if (this.pendingStateManager.isApplyingStashedOps) { - return; - } + // Invariant: the canSendOps edge in `setConnectionStateCore` — the + // only caller of this method — cannot fire while + // `applyStashedOpsAt` is in flight, because the loader awaits the + // apply before transitioning the runtime to a write-capable + // connection. If this assert ever fires, that contract has changed + // and the submit guard at `submit()` would catch a runtime-internal + // resubmit (`Rejoin`, `GC`, `FluidDataStoreOp`) for an op type + // outside the apply-window allowlist. + assert( + !this.pendingStateManager.isApplyingStashedOps, + "replayPendingStates must not be called during stashed-op apply window", + ); // Replaying is an internal operation and we don't want to generate noise while doing it. // So temporarily disable dirty state change events, and save the old state. diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 026e5bc32ccf..f08ccda87a4b 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -1682,64 +1682,6 @@ describe("Runtime", () => { }, ]); }); - - it("replayPendingStates is a no-op during apply", async () => { - await createRuntime(); - setApplyingStashedOps(true); - const psm = ( - containerRuntime as unknown as { pendingStateManager: PendingStateManager } - ).pendingStateManager; - let psmReplayCalls = 0; - psm.replayPendingStates = ((): IPendingMessage["batchInfo"][] => { - psmReplayCalls++; - return []; - }) as PendingStateManager["replayPendingStates"]; - // Force `shouldSendOps()` to return true so the only gate left is the apply check. - (containerRuntime as unknown as { shouldSendOps: () => boolean }).shouldSendOps = - (): boolean => true; - ( - containerRuntime as unknown as { replayPendingStates: () => void } - ).replayPendingStates(); - assert.strictEqual(psmReplayCalls, 0); - }); - - it("close hook re-triggers replayPendingStates after apply drains", async () => { - await createRuntime(); - const psm = ( - containerRuntime as unknown as { - pendingStateManager: PendingStateManager; - } - ).pendingStateManager; - - // Seed an empty-batch stashed entry so the apply loop drains - // without invoking the real `applyStashedOp` handler. - const psmPrivate = psm as unknown as { - initialMessages: { push: (msg: IPendingMessage) => void }; - _applyLifecycle: "notStarted" | "applying" | "ended"; - }; - const emptyBatchStashed = { - type: "message", - referenceSequenceNumber: 0, - content: JSON.stringify({ type: "groupedBatch", contents: [] }), - }; - psmPrivate.initialMessages.push(emptyBatchStashed as IPendingMessage); - psmPrivate._applyLifecycle = "applying"; - - // Spy on the runtime's `replayPendingStates`; assert it fires - // from the close hook even when the PSM didn't see a - // connection-state edge during apply. - let runtimeReplayCalls = 0; - ( - containerRuntime as unknown as { replayPendingStates: () => void } - ).replayPendingStates = (): void => { - runtimeReplayCalls++; - }; - - await psm.applyStashedOpsAt(); - - assert.strictEqual(psm.isApplyingStashedOps, false); - assert.strictEqual(runtimeReplayCalls, 1); - }); }); describe("Supports mixin classes", () => { From d6b5944ba61edafe3dec568f18262683d1fff876 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 13 May 2026 16:26:31 -0700 Subject: [PATCH 08/17] refactor: always log submit-during-apply; simplify PSM apply close MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - `submit()` now unconditionally sends `SubmitDuringStashedOpApply` as an error event on every bypass; the kill switch `DisableSubmitDuringStashedApplyThrow` only suppresses the throw + close. Telemetry attribution is identical whether or not the kill switch is engaged. - Existing throw test now also asserts the error event fires, so symmetry with the kill-switch test is explicit. - `PendingStateManager.applyStashedOpsAt` no longer uses a `loopCompleted` flag + `try/finally`. The close block (set `_applyLifecycle = "ended"`, fire hook) only ran when the loop completed without throwing — moving it inline after the loop preserves that property and drops the indirection. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../container-runtime/src/containerRuntime.ts | 14 +-- .../src/pendingStateManager.ts | 112 ++++++++---------- .../src/test/containerRuntime.spec.ts | 9 +- 3 files changed, 66 insertions(+), 69 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index e6f09389f1f4..beb01b693188 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -4876,10 +4876,11 @@ export class ContainerRuntime // is asserted to bypass `submit()` entirely (see the assert below), // but is allowlisted here defensively in case that contract shifts. // - // Kill switch: when `DisableSubmitDuringStashedApplyThrow` is enabled, - // we still construct and surface the error event to telemetry, but - // do not throw — leaves an off-switch if a first- or third-party - // DDS in production quietly bypasses the readonly gate. + // Always surface the error event to telemetry on a bypass so we can + // attribute incidents regardless of the kill-switch state. The kill + // switch `DisableSubmitDuringStashedApplyThrow` only suppresses the + // throw + container close, leaving an off-switch if a first- or + // third-party DDS in production quietly bypasses the readonly gate. if ( this.pendingStateManager.isApplyingStashedOps && containerRuntimeMessage.type !== ContainerMessageType.BlobAttach && @@ -4888,13 +4889,12 @@ export class ContainerRuntime const error = new UsageError("Local op submitted during stashed-op apply window", { messageType: containerRuntimeMessage.type, }); + this.mc.logger.sendErrorEvent({ eventName: "SubmitDuringStashedOpApply" }, error); if ( this.mc.config.getBoolean( "Fluid.ContainerRuntime.DisableSubmitDuringStashedApplyThrow", - ) === true + ) !== true ) { - this.mc.logger.sendErrorEvent({ eventName: "SubmitDuringStashedOpApply" }, error); - } else { throw error; } } diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index f72076c6679b..1b421293a6cd 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -519,72 +519,62 @@ export class PendingStateManager implements IDisposable { return; } - // The apply window was opened eagerly in the constructor when there - // was any stashed work. We close it on full successful drain only — - // `loopCompleted` distinguishes a clean exit from an apply error, - // since a message is shifted off `initialMessages` *before* it is - // applied, so on error `initialMessages.isEmpty()` would otherwise - // look like a successful drain. An apply error leaves the lifecycle - // at "applying"; the load is fatal so there's no recoverable state. - let loopCompleted = false; - try { - // apply stashed ops at sequence number - while (!this.initialMessages.isEmpty()) { - if (seqNum !== undefined) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const peekMessage = this.initialMessages.peekFront()!; - if (peekMessage.referenceSequenceNumber > seqNum) { - break; // nothing left to do at this sequence number - } - if (peekMessage.referenceSequenceNumber < seqNum) { - throw new Error("loaded from snapshot too recent to apply stashed ops"); - } - } + // apply stashed ops at sequence number + while (!this.initialMessages.isEmpty()) { + if (seqNum !== undefined) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const nextMessage = this.initialMessages.shift()!; - // Nothing to apply if the message is an empty batch. - // We still need to track it for resubmission. - try { - if (isEmptyBatchPendingMessage(nextMessage)) { - nextMessage.localOpMetadata = { - emptyBatch: true, - } satisfies LocalEmptyBatchPlaceholder["localOpMetadata"]; // equivalent to applyStashedOp for empty batch - patchbatchInfo(nextMessage); // Back compat - this.pendingMessages.push(nextMessage); - continue; - } - // applyStashedOp will cause the DDS to behave as if it has sent the op but not actually send it - const localOpMetadata = await this.stateHandler.applyStashedOp(nextMessage.content); - if (this.stateHandler.isAttached()) { - nextMessage.localOpMetadata = localOpMetadata; - // NOTE: This runtimeOp has been roundtripped through string, which is technically lossy. - // e.g. At this point, handles are in their encoded form. - nextMessage.runtimeOp = JSON.parse( - nextMessage.content, - ) as LocalContainerRuntimeMessage; - // then we push onto pendingMessages which will cause PendingStateManager to resubmit when we connect - patchbatchInfo(nextMessage); // Back compat - this.pendingMessages.push(nextMessage); - } else { - if (localOpMetadata !== undefined) { - throw new Error("Local Op Metadata must be undefined when not attached"); - } - } - } catch (error) { - throw DataProcessingError.wrapIfUnrecognized(error, "applyStashedOp", nextMessage); + const peekMessage = this.initialMessages.peekFront()!; + if (peekMessage.referenceSequenceNumber > seqNum) { + break; // nothing left to do at this sequence number + } + if (peekMessage.referenceSequenceNumber < seqNum) { + throw new Error("loaded from snapshot too recent to apply stashed ops"); } } - loopCompleted = true; - } finally { - if ( - loopCompleted && - this._applyLifecycle === "applying" && - this.initialMessages.isEmpty() - ) { - this._applyLifecycle = "ended"; - this.hooks.onAfterStashedOpsApplied?.(); + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const nextMessage = this.initialMessages.shift()!; + // Nothing to apply if the message is an empty batch. + // We still need to track it for resubmission. + try { + if (isEmptyBatchPendingMessage(nextMessage)) { + nextMessage.localOpMetadata = { + emptyBatch: true, + } satisfies LocalEmptyBatchPlaceholder["localOpMetadata"]; // equivalent to applyStashedOp for empty batch + patchbatchInfo(nextMessage); // Back compat + this.pendingMessages.push(nextMessage); + continue; + } + // applyStashedOp will cause the DDS to behave as if it has sent the op but not actually send it + const localOpMetadata = await this.stateHandler.applyStashedOp(nextMessage.content); + if (this.stateHandler.isAttached()) { + nextMessage.localOpMetadata = localOpMetadata; + // NOTE: This runtimeOp has been roundtripped through string, which is technically lossy. + // e.g. At this point, handles are in their encoded form. + nextMessage.runtimeOp = JSON.parse( + nextMessage.content, + ) as LocalContainerRuntimeMessage; + // then we push onto pendingMessages which will cause PendingStateManager to resubmit when we connect + patchbatchInfo(nextMessage); // Back compat + this.pendingMessages.push(nextMessage); + } else { + if (localOpMetadata !== undefined) { + throw new Error("Local Op Metadata must be undefined when not attached"); + } + } + } catch (error) { + throw DataProcessingError.wrapIfUnrecognized(error, "applyStashedOp", nextMessage); } } + + // The apply window was opened eagerly in the constructor when there + // was any stashed work. We close it on full successful drain only. + // If an apply throws above, control never reaches here and the + // lifecycle stays at "applying" — the load is fatal so there's no + // recoverable state. + if (this._applyLifecycle === "applying" && this.initialMessages.isEmpty()) { + this._applyLifecycle = "ended"; + this.hooks.onAfterStashedOpsApplied?.(); + } } /** diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index f08ccda87a4b..00ba0a8b6701 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -1641,7 +1641,7 @@ describe("Runtime", () => { }); } - it("throws a fatal usage error from submitMessage", async () => { + it("throws a fatal usage error from submitMessage and logs", async () => { await createRuntime(); setApplyingStashedOps(true); assert.throws( @@ -1650,6 +1650,13 @@ describe("Runtime", () => { error.errorType === ContainerErrorTypes.usageError && error.message === "Local op submitted during stashed-op apply window", ); + mockLogger.assertMatchAny([ + { + eventName: "ContainerRuntime:SubmitDuringStashedOpApply", + category: "error", + messageType: ContainerMessageType.FluidDataStoreOp, + }, + ]); }); it("does not throw when the apply window is closed", async () => { From 471ab7dfe97a1d86d62171a08aec16a46bff716d Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 13 May 2026 16:59:17 -0700 Subject: [PATCH 09/17] test(local-server-tests): cover load-fails when DDS reacts during apply End-to-end coverage for the stashed-op apply submit guard: the original test in `container-runtime` only verified that `submit()` throws when `isApplyingStashedOps` is true. This drives the throw through a realistic listener pattern and asserts the resulting load rejection. Two SharedMaps live in the same data store. A `valueChanged` listener on the primary map performs a cascading `set` on the secondary map. Stashed offline ops replay on the primary during `applyStashedOpsAt`, firing `valueChanged`; the cascading `set` on the secondary reaches `ContainerRuntime.submit()` (the channel-level `stashedOpMd` capture only swallows submits from the channel currently in `applyStashedOp`, not cross-channel writes) and the guard rejects the load. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../test/submitDuringStashedOpApply.spec.ts | 152 ++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100644 packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts diff --git a/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts b/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts new file mode 100644 index 000000000000..c8cb61734993 --- /dev/null +++ b/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts @@ -0,0 +1,152 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "assert"; + +import { ContainerRuntimeFactoryWithDefaultDataStore } from "@fluidframework/aqueduct/internal"; +import { + asLegacyAlpha, + createDetachedContainer, + loadExistingContainer, +} from "@fluidframework/container-loader/internal"; +import { + LocalDocumentServiceFactory, + LocalResolver, +} from "@fluidframework/local-driver/internal"; +import { type ISharedMap, SharedMap } from "@fluidframework/map/internal"; +import type { + IFluidDataStoreChannel, + IFluidDataStoreContext, + IFluidDataStoreFactory, +} from "@fluidframework/runtime-definitions/internal"; +import { LocalDeltaConnectionServer } from "@fluidframework/server-local-server"; +import { LocalCodeLoader, TestFluidObjectFactory } from "@fluidframework/test-utils/internal"; +import type { ITestFluidObject } from "@fluidframework/test-utils/internal"; + +/** + * Wraps an inner {@link IFluidDataStoreFactory} so that on `existing=true` + * loads the data store's two SharedMaps are eagerly realized, and a + * `valueChanged` listener is registered on the "primary" map that performs + * a follow-up `set` on the "secondary" map. + * + * Two maps are needed because the channel-level `stashedOpMd` capture in + * `ChannelDeltaConnection.submit` swallows any submit issued *on the same + * channel* while that channel's `applyStashedOp` is in flight. A submit + * targeting a *different* channel goes through the normal submit path — + * which is exactly the channel that's currently replaying a stashed op + * vs. an event-handler write that propagates to another map. + * + * This models a real-world bug pattern: app code subscribed to DDS + * change events that performs cascading edits across DDSes without + * consulting `readOnly`. + */ +class ReactingMapFactory implements IFluidDataStoreFactory { + public constructor(private readonly inner: IFluidDataStoreFactory) {} + + public get IFluidDataStoreFactory(): IFluidDataStoreFactory { + return this; + } + public get type(): string { + return this.inner.type; + } + + public async instantiateDataStore( + context: IFluidDataStoreContext, + existing: boolean, + ): Promise { + const channel = await this.inner.instantiateDataStore(context, existing); + if (!existing) { + return channel; + } + // Eagerly realize both maps and wire up the cross-map listener. + // `IFluidDataStoreChannel` doesn't expose `getChannel`, but the + // concrete TestFluidObjectFactory runtime does. + const runtimeWithChannels = channel as IFluidDataStoreChannel & { + getChannel(id: string): Promise; + }; + const primary = (await runtimeWithChannels.getChannel("primary")) as ISharedMap; + const secondary = (await runtimeWithChannels.getChannel("secondary")) as ISharedMap; + primary.on("valueChanged", (changed) => { + secondary.set(`mirror:${changed.key}`, "cascaded-value"); + }); + return channel; + } +} + +describe("Submit during stashed-op apply (end-to-end)", () => { + it("rejects load when a valueChanged listener does a cross-map edit during apply", async () => { + const deltaConnectionServer = LocalDeltaConnectionServer.create(); + const documentServiceFactory = new LocalDocumentServiceFactory(deltaConnectionServer); + const urlResolver = new LocalResolver(); + const codeDetails = { package: "test" }; + + // 1. Create the container with two named SharedMaps, attach, write, + // disconnect, write offline, capture pending local state. + const goodFactory = new TestFluidObjectFactory( + [ + ["primary", SharedMap.getFactory()], + ["secondary", SharedMap.getFactory()], + ], + "default", + ); + const goodRuntimeFactory = new ContainerRuntimeFactoryWithDefaultDataStore({ + defaultFactory: goodFactory, + registryEntries: [[goodFactory.type, Promise.resolve(goodFactory)]], + }); + const goodCodeLoader = new LocalCodeLoader([[codeDetails, goodRuntimeFactory]]); + + const container = asLegacyAlpha( + await createDetachedContainer({ + codeDetails, + codeLoader: goodCodeLoader, + documentServiceFactory, + urlResolver, + }), + ); + + const initialObject = (await container.getEntryPoint()) as ITestFluidObject; + const primary = await initialObject.getSharedObject("primary"); + primary.set("pre-attach", "value"); + + await container.attach(urlResolver.createCreateNewRequest("submit-during-apply")); + primary.set("attached", "value"); + + const url = await container.getAbsoluteUrl(""); + assert(url !== undefined, "container should have a URL after attach"); + + container.disconnect(); + primary.set("offline", "value"); + + const pendingLocalState = await container.getPendingLocalState(); + container.close(); + + // 2. Build a separate loader that, on existing=true loads, wires up a + // valueChanged listener on `primary` that performs a cascading set + // on `secondary`. + const reactingFactory = new ReactingMapFactory(goodFactory); + const reactingRuntimeFactory = new ContainerRuntimeFactoryWithDefaultDataStore({ + defaultFactory: reactingFactory, + registryEntries: [[reactingFactory.type, Promise.resolve(reactingFactory)]], + }); + const reactingCodeLoader = new LocalCodeLoader([[codeDetails, reactingRuntimeFactory]]); + + // 3. The stashed `offline` op fires `valueChanged` on `primary` during + // apply; the listener's `secondary.set` reaches the runtime's + // submit guard (a different channel from the one in applyStashedOp, + // so the channel-level stashedOpMd capture doesn't swallow it), and + // the load rejects. + await assert.rejects( + loadExistingContainer({ + codeLoader: reactingCodeLoader, + documentServiceFactory, + urlResolver, + request: { url }, + pendingLocalState, + }), + (error: Error & { message?: string }) => + error.message?.includes("Local op submitted during stashed-op apply window") === true, + ); + }); +}); From 277ff9c9ae0acf0cbb4441e86fadb3b068c562fd Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 13 May 2026 17:08:22 -0700 Subject: [PATCH 10/17] docs(test): label the cross-map listener as an anti-pattern Spell out in the factory's JSDoc that submitting ops from DDS op-event handlers is bad practice and that any cascading write must gate on both `IContainer.readOnlyInfo.readonly` and `IFluidDataStoreRuntime.activeLocalOperationActivity` to be safe during stashed-op replay and rollback. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../test/submitDuringStashedOpApply.spec.ts | 35 ++++++++++++------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts b/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts index c8cb61734993..6f6a2f749c91 100644 --- a/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts +++ b/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts @@ -26,21 +26,30 @@ import { LocalCodeLoader, TestFluidObjectFactory } from "@fluidframework/test-ut import type { ITestFluidObject } from "@fluidframework/test-utils/internal"; /** - * Wraps an inner {@link IFluidDataStoreFactory} so that on `existing=true` - * loads the data store's two SharedMaps are eagerly realized, and a - * `valueChanged` listener is registered on the "primary" map that performs - * a follow-up `set` on the "secondary" map. + * NOTE — anti-pattern under test, do not copy. * - * Two maps are needed because the channel-level `stashedOpMd` capture in - * `ChannelDeltaConnection.submit` swallows any submit issued *on the same - * channel* while that channel's `applyStashedOp` is in flight. A submit - * targeting a *different* channel goes through the normal submit path — - * which is exactly the channel that's currently replaying a stashed op - * vs. an event-handler write that propagates to another map. + * Submitting ops from inside DDS op-event handlers is bad practice and + * should be avoided. Op events fire during op processing (including + * stashed-op replay and rollback), and a cascading write inside the + * handler can either land at the wrong moment in the op stream or be + * silently dropped. If a cascading write is unavoidable, the handler + * MUST gate on both: + * 1. `IContainer.readOnlyInfo.readonly` (or `ContainerRuntime.isReadOnly()`) + * — surfaced as `true` during stashed-op replay so well-behaved + * handlers can opt out. + * 2. `IFluidDataStoreRuntime.activeLocalOperationActivity` — when set + * (`"applyStashed"` or `"rollback"`), the runtime itself is + * driving the change, not the user, and the handler should not + * react with new ops. * - * This models a real-world bug pattern: app code subscribed to DDS - * change events that performs cascading edits across DDSes without - * consulting `readOnly`. + * This factory deliberately omits both gates so the load rejects with + * the expected `UsageError`. Two SharedMaps are needed because the + * channel-level `stashedOpMd` capture in `ChannelDeltaConnection.submit` + * swallows any submit issued *on the same channel* while that channel's + * `applyStashedOp` is in flight. A submit targeting a *different* + * channel goes through the normal submit path — which is exactly the + * "event handler on map A writes to map B" shape that reaches the + * runtime guard in production. */ class ReactingMapFactory implements IFluidDataStoreFactory { public constructor(private readonly inner: IFluidDataStoreFactory) {} From a41e38eb2d3b7ed741427555f184a23d157920a0 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 13 May 2026 17:09:27 -0700 Subject: [PATCH 11/17] docs(test): correct readonly guidance to point at the data store runtime DDS handlers should gate on `IFluidDataStoreRuntime.isReadOnly()` (with the `"readonly"` event as the live signal), not on `IContainer.readOnlyInfo.readonly` / `ContainerRuntime.isReadOnly()`. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/test/submitDuringStashedOpApply.spec.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts b/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts index 6f6a2f749c91..3b8d9092561e 100644 --- a/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts +++ b/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts @@ -34,9 +34,9 @@ import type { ITestFluidObject } from "@fluidframework/test-utils/internal"; * handler can either land at the wrong moment in the op stream or be * silently dropped. If a cascading write is unavoidable, the handler * MUST gate on both: - * 1. `IContainer.readOnlyInfo.readonly` (or `ContainerRuntime.isReadOnly()`) - * — surfaced as `true` during stashed-op replay so well-behaved - * handlers can opt out. + * 1. `IFluidDataStoreRuntime.isReadOnly()` — surfaced as `true` + * during stashed-op replay so well-behaved handlers can opt out. + * The `"readonly"` event is the corresponding signal. * 2. `IFluidDataStoreRuntime.activeLocalOperationActivity` — when set * (`"applyStashed"` or `"rollback"`), the runtime itself is * driving the change, not the user, and the handler should not From b2ac7dcbcddd11f2d0f025824473799f4ff9c0f6 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 13 May 2026 17:10:11 -0700 Subject: [PATCH 12/17] docs(test): drop the "when readOnly is true" enumeration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Don't list the conditions under which `isReadOnly()` returns `true` — the contract for handlers is simpler: if it's `true`, don't submit. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/test/submitDuringStashedOpApply.spec.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts b/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts index 3b8d9092561e..011b75fb1961 100644 --- a/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts +++ b/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts @@ -34,9 +34,8 @@ import type { ITestFluidObject } from "@fluidframework/test-utils/internal"; * handler can either land at the wrong moment in the op stream or be * silently dropped. If a cascading write is unavoidable, the handler * MUST gate on both: - * 1. `IFluidDataStoreRuntime.isReadOnly()` — surfaced as `true` - * during stashed-op replay so well-behaved handlers can opt out. - * The `"readonly"` event is the corresponding signal. + * 1. `IFluidDataStoreRuntime.isReadOnly()` — when `true`, the handler + * must not submit edits. The `"readonly"` event is the live signal. * 2. `IFluidDataStoreRuntime.activeLocalOperationActivity` — when set * (`"applyStashed"` or `"rollback"`), the runtime itself is * driving the change, not the user, and the handler should not From 80b7b9a6f315c455711ed0216138f490cc2aec31 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 13 May 2026 17:12:36 -0700 Subject: [PATCH 13/17] refactor(test): use createLoader util to cut loader plumbing Replace the hand-rolled `LocalDocumentServiceFactory` / `LocalResolver` / `LocalCodeLoader` / `ContainerRuntimeFactoryWithDefaultDataStore` setup with `createLoader` from `src/utils.ts`. Same coverage, fewer imports, matches the convention in the rest of `local-server-tests`. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../test/submitDuringStashedOpApply.spec.ts | 47 ++++++++----------- 1 file changed, 19 insertions(+), 28 deletions(-) diff --git a/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts b/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts index 011b75fb1961..fc8d1ffaf57a 100644 --- a/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts +++ b/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts @@ -5,16 +5,11 @@ import { strict as assert } from "assert"; -import { ContainerRuntimeFactoryWithDefaultDataStore } from "@fluidframework/aqueduct/internal"; import { asLegacyAlpha, createDetachedContainer, loadExistingContainer, } from "@fluidframework/container-loader/internal"; -import { - LocalDocumentServiceFactory, - LocalResolver, -} from "@fluidframework/local-driver/internal"; import { type ISharedMap, SharedMap } from "@fluidframework/map/internal"; import type { IFluidDataStoreChannel, @@ -22,9 +17,11 @@ import type { IFluidDataStoreFactory, } from "@fluidframework/runtime-definitions/internal"; import { LocalDeltaConnectionServer } from "@fluidframework/server-local-server"; -import { LocalCodeLoader, TestFluidObjectFactory } from "@fluidframework/test-utils/internal"; +import { TestFluidObjectFactory } from "@fluidframework/test-utils/internal"; import type { ITestFluidObject } from "@fluidframework/test-utils/internal"; +import { createLoader } from "../utils.js"; + /** * NOTE — anti-pattern under test, do not copy. * @@ -86,9 +83,6 @@ class ReactingMapFactory implements IFluidDataStoreFactory { describe("Submit during stashed-op apply (end-to-end)", () => { it("rejects load when a valueChanged listener does a cross-map edit during apply", async () => { const deltaConnectionServer = LocalDeltaConnectionServer.create(); - const documentServiceFactory = new LocalDocumentServiceFactory(deltaConnectionServer); - const urlResolver = new LocalResolver(); - const codeDetails = { package: "test" }; // 1. Create the container with two named SharedMaps, attach, write, // disconnect, write offline, capture pending local state. @@ -99,19 +93,17 @@ describe("Submit during stashed-op apply (end-to-end)", () => { ], "default", ); - const goodRuntimeFactory = new ContainerRuntimeFactoryWithDefaultDataStore({ - defaultFactory: goodFactory, - registryEntries: [[goodFactory.type, Promise.resolve(goodFactory)]], + const { + codeDetails, + loaderProps: goodLoaderProps, + urlResolver, + } = createLoader({ + deltaConnectionServer, + defaultDataStoreFactory: goodFactory, }); - const goodCodeLoader = new LocalCodeLoader([[codeDetails, goodRuntimeFactory]]); const container = asLegacyAlpha( - await createDetachedContainer({ - codeDetails, - codeLoader: goodCodeLoader, - documentServiceFactory, - urlResolver, - }), + await createDetachedContainer({ codeDetails, ...goodLoaderProps }), ); const initialObject = (await container.getEntryPoint()) as ITestFluidObject; @@ -132,13 +124,14 @@ describe("Submit during stashed-op apply (end-to-end)", () => { // 2. Build a separate loader that, on existing=true loads, wires up a // valueChanged listener on `primary` that performs a cascading set - // on `secondary`. - const reactingFactory = new ReactingMapFactory(goodFactory); - const reactingRuntimeFactory = new ContainerRuntimeFactoryWithDefaultDataStore({ - defaultFactory: reactingFactory, - registryEntries: [[reactingFactory.type, Promise.resolve(reactingFactory)]], + // on `secondary`. Share the resolver and driver so the URL produced + // above resolves on the new loader. + const { loaderProps: reactingLoaderProps } = createLoader({ + deltaConnectionServer, + defaultDataStoreFactory: new ReactingMapFactory(goodFactory), + urlResolver, + documentServiceFactory: goodLoaderProps.documentServiceFactory, }); - const reactingCodeLoader = new LocalCodeLoader([[codeDetails, reactingRuntimeFactory]]); // 3. The stashed `offline` op fires `valueChanged` on `primary` during // apply; the listener's `secondary.set` reaches the runtime's @@ -147,9 +140,7 @@ describe("Submit during stashed-op apply (end-to-end)", () => { // the load rejects. await assert.rejects( loadExistingContainer({ - codeLoader: reactingCodeLoader, - documentServiceFactory, - urlResolver, + ...reactingLoaderProps, request: { url }, pendingLocalState, }), From 19a27cad67d54eb889e82df9766de8dd6e41cee4 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 13 May 2026 17:51:58 -0700 Subject: [PATCH 14/17] test: simplify second createLoader call Drop the explicit `urlResolver` / `documentServiceFactory` pass-through on the second loader. Sharing the `deltaConnectionServer` is sufficient for URL resolution; each `createLoader` call can produce its own resolver and driver. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/test/submitDuringStashedOpApply.spec.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts b/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts index fc8d1ffaf57a..e2d4a1e612a8 100644 --- a/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts +++ b/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts @@ -129,8 +129,6 @@ describe("Submit during stashed-op apply (end-to-end)", () => { const { loaderProps: reactingLoaderProps } = createLoader({ deltaConnectionServer, defaultDataStoreFactory: new ReactingMapFactory(goodFactory), - urlResolver, - documentServiceFactory: goodLoaderProps.documentServiceFactory, }); // 3. The stashed `offline` op fires `valueChanged` on `primary` during From 5d8aad9d7703f2d4bce9c8fef18fd768b9c4de69 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 13 May 2026 18:19:24 -0700 Subject: [PATCH 15/17] fix(test): satisfy jsdoc/check-indentation The `jsdoc/check-indentation` ESLint rule disallows indented continuation lines (and numbered lists) inside JSDoc bodies. Flatten the isReadOnly / activeLocalOperationActivity bullet list into a single paragraph. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/test/submitDuringStashedOpApply.spec.ts | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts b/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts index e2d4a1e612a8..e8bf7f90c9c5 100644 --- a/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts +++ b/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts @@ -30,13 +30,12 @@ import { createLoader } from "../utils.js"; * stashed-op replay and rollback), and a cascading write inside the * handler can either land at the wrong moment in the op stream or be * silently dropped. If a cascading write is unavoidable, the handler - * MUST gate on both: - * 1. `IFluidDataStoreRuntime.isReadOnly()` — when `true`, the handler - * must not submit edits. The `"readonly"` event is the live signal. - * 2. `IFluidDataStoreRuntime.activeLocalOperationActivity` — when set - * (`"applyStashed"` or `"rollback"`), the runtime itself is - * driving the change, not the user, and the handler should not - * react with new ops. + * MUST gate on both `IFluidDataStoreRuntime.isReadOnly()` (when `true`, + * the handler must not submit edits; the `"readonly"` event is the live + * signal) and `IFluidDataStoreRuntime.activeLocalOperationActivity` + * (when set to `"applyStashed"` or `"rollback"`, the runtime itself is + * driving the change, not the user, and the handler should not react + * with new ops). * * This factory deliberately omits both gates so the load rejects with * the expected `UsageError`. Two SharedMaps are needed because the From df8d5953d94d75e6341b8e8deac0b360a7139f5a Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Thu, 14 May 2026 09:45:15 -0700 Subject: [PATCH 16/17] fix(container-runtime): call closeFn on submit-during-apply throw MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The submit-guard `throw` is outside the existing `submit()` try/catch, so without an explicit `closeFn` call the container only closes when the caller's promise chain wraps the rejection in `.catch(closeFn)`. The inline comment and the kill-switch contract both state "throw + close", but the code only delivered the throw — close happened transitively. Add `this.closeFn(error)` immediately before `throw error;`. `closeFn` is idempotent so a caller that also closes won't double-close. Extend the existing throw test to assert `closeFn` was invoked exactly once with the UsageError, so the contract is regression-protected. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../container-runtime/src/containerRuntime.ts | 5 ++++ .../src/test/containerRuntime.spec.ts | 24 +++++++++++++++++-- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index beb01b693188..f9a6ee6023e7 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -4895,6 +4895,11 @@ export class ContainerRuntime "Fluid.ContainerRuntime.DisableSubmitDuringStashedApplyThrow", ) !== true ) { + // Close the container before throwing so the "throw + close" + // contract is enforced by this code path rather than by + // whichever caller happens to wrap the throw in `.catch(closeFn)`. + // `closeFn` is idempotent; a caller that also closes won't double-close. + this.closeFn(error); throw error; } } diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 00ba0a8b6701..52d020a8f4ea 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -1601,11 +1601,21 @@ describe("Runtime", () => { describe("Submit during stashed-op apply", () => { let containerRuntime: ContainerRuntime; let mockLogger: MockLogger; + let containerErrors: ICriticalContainerError[]; async function createRuntime(settings: Record = {}): Promise { mockLogger = new MockLogger(); + containerErrors = []; + const context = { + ...getMockContext({ logger: mockLogger, settings }), + closeFn: (error?: ICriticalContainerError): void => { + if (error !== undefined) { + containerErrors.push(error); + } + }, + }; const { runtime } = await ContainerRuntime.loadRuntime2({ - context: getMockContext({ logger: mockLogger, settings }) as IContainerContext, + context: context as IContainerContext, registry: new FluidDataStoreRegistry([]), existing: false, requestHandler: undefined, @@ -1641,7 +1651,7 @@ describe("Runtime", () => { }); } - it("throws a fatal usage error from submitMessage and logs", async () => { + it("throws, logs, and closes the container on submit during apply", async () => { await createRuntime(); setApplyingStashedOps(true); assert.throws( @@ -1657,6 +1667,16 @@ describe("Runtime", () => { messageType: ContainerMessageType.FluidDataStoreOp, }, ]); + assert.strictEqual( + containerErrors.length, + 1, + "closeFn should have been invoked exactly once", + ); + assert.strictEqual( + containerErrors[0].errorType, + ContainerErrorTypes.usageError, + "closeFn should have received the UsageError", + ); }); it("does not throw when the apply window is closed", async () => { From 9c50906fe0304c8c544da15cb0d78d7f58f7a7d6 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Thu, 14 May 2026 10:37:58 -0700 Subject: [PATCH 17/17] refactor(container-runtime): single source of truth for IdAllocation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drop `IdAllocation` from the stashed-op-apply guard allowlist. The downstream assert 0x9a5 ("IdAllocation should be submitted directly to outbox") already enforces that `IdAllocation` never reaches `submit()` at all, so the allowlist entry was dead code. If that contract ever shifts, the assert is the natural place a future author will see the impact and decide whether to also allowlist for the apply window — better than a forgotten reference in a comment block. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../container-runtime/src/containerRuntime.ts | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index f9a6ee6023e7..27abb07e61f8 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -4869,12 +4869,12 @@ export class ContainerRuntime // deferred and the apply window could close before the offending op // reaches the pending queue. // - // Allowlist: `BlobAttach` and `IdAllocation` are runtime-internal - // op types that may legitimately fire during apply. `BlobAttach` - // is produced by `sharePendingBlobs`, which is invoked from - // `loadRuntime2` before `applyStashedOpsAt` resolves. `IdAllocation` - // is asserted to bypass `submit()` entirely (see the assert below), - // but is allowlisted here defensively in case that contract shifts. + // Allowlist: `BlobAttach` is a runtime-internal op type that may + // legitimately fire during apply — produced by `sharePendingBlobs`, + // which is invoked from `loadRuntime2` before `applyStashedOpsAt` + // resolves. `IdAllocation` is not in this allowlist because the + // assert at 0x9a5 below enforces that it never reaches `submit()` + // at all; treating that assert as the single source of truth. // // Always surface the error event to telemetry on a bypass so we can // attribute incidents regardless of the kill-switch state. The kill @@ -4883,8 +4883,7 @@ export class ContainerRuntime // third-party DDS in production quietly bypasses the readonly gate. if ( this.pendingStateManager.isApplyingStashedOps && - containerRuntimeMessage.type !== ContainerMessageType.BlobAttach && - containerRuntimeMessage.type !== ContainerMessageType.IdAllocation + containerRuntimeMessage.type !== ContainerMessageType.BlobAttach ) { const error = new UsageError("Local op submitted during stashed-op apply window", { messageType: containerRuntimeMessage.type,