diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 9dec4133dc03..27abb07e61f8 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -1360,7 +1360,14 @@ export class ContainerRuntime return this._getAttachState(); } - public readonly isReadOnly = (): boolean => this.deltaManager.readOnlyInfo.readonly === true; + public readonly isReadOnly = (): boolean => + // `_deltaManager` and `pendingStateManager` are both assigned partway + // through the constructor; `baseLogger` is built earlier and stamps + // `isReadOnly` on every error event (e.g. layer-compat failures + // during construction), so this can be called before either is + // assigned. Optional chains keep that window safe. + this._deltaManager?.readOnlyInfo.readonly === true || + this.pendingStateManager?.isApplyingStashedOps === true; /** * Current session schema - defines what options are on & off. @@ -1597,6 +1604,8 @@ export class ContainerRuntime private readonly extensions = new Map(); + public readonly baseLogger: ITelemetryBaseLogger; + /***/ protected constructor( context: IContainerContext, @@ -1610,7 +1619,7 @@ export class ContainerRuntime private readonly runtimeOptions: Readonly, private readonly containerScope: FluidObject, // Create a custom ITelemetryBaseLogger to output telemetry events. - public readonly baseLogger: ITelemetryBaseLogger, + logger: ITelemetryBaseLogger, existing: boolean, blobManagerLoadInfo: IBlobManagerLoadInfo, @@ -1663,15 +1672,20 @@ export class ContainerRuntime this.isSnapshotInstanceOfISnapshot = snapshotWithContents !== undefined; - this.mc = createChildMonitoringContext({ - logger: this.baseLogger, - namespace: "ContainerRuntime", + this.baseLogger = createChildLogger({ + logger, properties: { - all: { - inStagingMode: this.inStagingMode, + error: { + inStagingMode: () => this.inStagingMode, + isApplyingStashedOps: () => this.pendingStateManager?.isApplyingStashedOps, + isReadOnly: () => this.isReadOnly(), }, }, }); + this.mc = createChildMonitoringContext({ + logger: this.baseLogger, + namespace: "ContainerRuntime", + }); // Validate that the Loader is compatible with this Runtime. const maybeLoaderCompatDetailsForRuntime = context as FluidObject; @@ -1840,6 +1854,18 @@ export class ContainerRuntime }, pendingRuntimeState?.pending, this.baseLogger, + { + // PSM has cleared `isApplyingStashedOps`; `isReadOnly()` now + // reflects the network-readonly state again. Fan out so DDSes + // know they can submit once more. No open hook is needed — + // the apply window opens before `channelCollection` exists, + // so a fanout there would be a no-op; data stores instead + // pick up the initial readonly state from `isReadOnly()` + // when they're first asked. + onAfterStashedOpsApplied: () => { + this.notifyReadOnlyState(); + }, + }, ); let outerDeltaManager: IDeltaManagerFull = this.innerDeltaManager; @@ -2797,6 +2823,19 @@ export class ContainerRuntime return; } + // Invariant: the canSendOps edge in `setConnectionStateCore` — the + // only caller of this method — cannot fire while + // `applyStashedOpsAt` is in flight, because the loader awaits the + // apply before transitioning the runtime to a write-capable + // connection. If this assert ever fires, that contract has changed + // and the submit guard at `submit()` would catch a runtime-internal + // resubmit (`Rejoin`, `GC`, `FluidDataStoreOp`) for an op type + // outside the apply-window allowlist. + assert( + !this.pendingStateManager.isApplyingStashedOps, + "replayPendingStates must not be called during stashed-op apply window", + ); + // Replaying is an internal operation and we don't want to generate noise while doing it. // So temporarily disable dirty state change events, and save the old state. // When we're done, we'll emit the event if the state changed. @@ -2912,8 +2951,13 @@ export class ContainerRuntime } } - private readonly notifyReadOnlyState = (readonly: boolean): void => - this.channelCollection.notifyReadOnlyState(readonly); + private readonly notifyReadOnlyState = (): void => + // `channelCollection` may be undefined when invoked from the PSM's + // open hook, which fires from `new PendingStateManager(...)` earlier + // in this constructor than `channelCollection` is assigned. That's + // fine — DDSes created after this point will read the runtime's + // `isReadOnly()` aggregation and start out in the correct state. + this.channelCollection?.notifyReadOnlyState(this.isReadOnly()); public setConnectionState(canSendOps: boolean, clientId?: string): void { this.setConnectionStateToConnectedOrDisconnected(canSendOps, clientId); @@ -4814,6 +4858,51 @@ export class ContainerRuntime ): void { this.verifyNotClosed(); + // Nothing should be submitting while we're replaying stashed ops. + // The runtime is readonly during the apply window (see + // `PendingStateManager._applyLifecycle`), so a compliant DDS skips + // submits. If we land here anyway, a DDS bypassed the readonly gate + // (e.g. a realize-time write that doesn't consult `readOnly`) and + // produced a local op that has no counterpart in the saved-op + // replay — we cannot reconcile the mismatch, so fail fatally. We + // check here (rather than at flush) because outbox flushes are + // deferred and the apply window could close before the offending op + // reaches the pending queue. + // + // Allowlist: `BlobAttach` is a runtime-internal op type that may + // legitimately fire during apply — produced by `sharePendingBlobs`, + // which is invoked from `loadRuntime2` before `applyStashedOpsAt` + // resolves. `IdAllocation` is not in this allowlist because the + // assert at 0x9a5 below enforces that it never reaches `submit()` + // at all; treating that assert as the single source of truth. + // + // Always surface the error event to telemetry on a bypass so we can + // attribute incidents regardless of the kill-switch state. The kill + // switch `DisableSubmitDuringStashedApplyThrow` only suppresses the + // throw + container close, leaving an off-switch if a first- or + // third-party DDS in production quietly bypasses the readonly gate. + if ( + this.pendingStateManager.isApplyingStashedOps && + containerRuntimeMessage.type !== ContainerMessageType.BlobAttach + ) { + const error = new UsageError("Local op submitted during stashed-op apply window", { + messageType: containerRuntimeMessage.type, + }); + this.mc.logger.sendErrorEvent({ eventName: "SubmitDuringStashedOpApply" }, error); + if ( + this.mc.config.getBoolean( + "Fluid.ContainerRuntime.DisableSubmitDuringStashedApplyThrow", + ) !== true + ) { + // Close the container before throwing so the "throw + close" + // contract is enforced by this code path rather than by + // whichever caller happens to wrap the throw in `.catch(closeFn)`. + // `closeFn` is idempotent; a caller that also closes won't double-close. + this.closeFn(error); + throw error; + } + } + // There should be no ops in detached container state! assert( this.attachState !== AttachState.Detached, diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index b654b2489327..1b421293a6cd 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -142,6 +142,26 @@ export interface IRuntimeStateHandler { isAttached: () => boolean; } +/** + * Optional hooks invoked at the close of the stashed-op apply lifecycle. + * + * `onAfterStashedOpsApplied` fires synchronously the first time + * `initialMessages` drains during `applyStashedOpsAt`, immediately after + * `isApplyingStashedOps` flips to `false`. Fires at most once per PSM lifetime. + * + * Synchronous: fires from a `finally` block where async behavior would + * complicate error propagation. + * + * No corresponding open hook is exposed. The apply window is opened eagerly + * in the PSM constructor, but at that point `ContainerRuntime` has not yet + * wired up the downstream observers (`channelCollection` is undefined), so a + * fanout fired from the constructor would be a no-op. Consumers that care + * about the open transition can read `isApplyingStashedOps` directly. + */ +export interface PendingStateManagerHooks { + onAfterStashedOpsApplied?: () => void; +} + function isEmptyBatchPendingMessage(message: IPendingMessageFromStash): boolean { const content = JSON.parse(message.content) as Partial; return content.type === "groupedBatch" && content.contents?.length === 0; @@ -368,15 +388,59 @@ export class PendingStateManager implements IDisposable { private readonly logger: ITelemetryLoggerExt; + /** + * One-way lifecycle of the stashed-op apply window: `notStarted` → `applying` → `ended`. + * + * Transitions are explicit and irreversible. `notStarted` → `applying` happens in the + * constructor when stashed state is present (i.e. `initialMessages` is non-empty at + * construction). The open is eager so the runtime is readonly from the moment any DDS + * could possibly observe it. `applying` → `ended` happens the first time + * {@link applyStashedOpsAt} drains `initialMessages`. After that, local edits are safe — + * they queue FIFO behind any remaining `pendingMessages`, preserving server-side ordering. + * + * The window never reopens. After `ended`, subsequent `applyStashedOpsAt` calls (e.g. + * from late `notifyOpReplay`s) early-return at the empty guard. + * + * `pendingMessages` state is intentionally NOT part of the close condition. Those + * entries are drained transparently by {@link replayPendingStates} on connect via + * resubmit (each pop is matched by a fresh push), so the queue size is conserved across + * resubmit and DDSes can't distinguish a resubmit-ack from a normal ack. Holding the + * window open through resubmit would force resubmits to run while the runtime is + * readonly, which is the inverse of what we want ("never resubmit during apply stashed + * ops"). + * + * An apply error leaves the lifecycle at `applying` because the queue isn't drained. + * That's fine: an error here is fatal for the load, the container is unusable, and + * there's no state to restore. + */ + private _applyLifecycle: "notStarted" | "applying" | "ended" = "notStarted"; + public get isApplyingStashedOps(): boolean { + return this._applyLifecycle === "applying"; + } + + private readonly hooks: PendingStateManagerHooks; + constructor( private readonly stateHandler: IRuntimeStateHandler, stashedLocalState: IPendingLocalState | undefined, logger: ITelemetryBaseLogger, + hooks: PendingStateManagerHooks = {}, ) { this.logger = createChildLogger({ logger }); + this.hooks = hooks; if (stashedLocalState?.pendingStates) { this.initialMessages.push(...stashedLocalState.pendingStates); } + // Open the apply window eagerly if there is any stashed work. The + // runtime is readonly while `isApplyingStashedOps` is true (see + // `ContainerRuntime.isReadOnly`); compliant DDSes consult `readOnly` + // at realize time and skip submits. No fanout fires here — downstream + // observers (`channelCollection`) are not yet constructed at this + // point in the runtime constructor, and the first real readonly read + // happens after the constructor returns. + if (!this.initialMessages.isEmpty()) { + this._applyLifecycle = "applying"; + } } public get disposed(): boolean { @@ -451,6 +515,10 @@ export class PendingStateManager implements IDisposable { * @param seqNum - Sequence number at which to apply ops. Will apply all ops if seqNum is undefined. */ public async applyStashedOpsAt(seqNum?: number): Promise { + if (this.initialMessages.isEmpty()) { + return; + } + // apply stashed ops at sequence number while (!this.initialMessages.isEmpty()) { if (seqNum !== undefined) { @@ -497,6 +565,16 @@ export class PendingStateManager implements IDisposable { throw DataProcessingError.wrapIfUnrecognized(error, "applyStashedOp", nextMessage); } } + + // The apply window was opened eagerly in the constructor when there + // was any stashed work. We close it on full successful drain only. + // If an apply throws above, control never reaches here and the + // lifecycle stays at "applying" — the load is fatal so there's no + // recoverable state. + if (this._applyLifecycle === "applying" && this.initialMessages.isEmpty()) { + this._applyLifecycle = "ended"; + this.hooks.onAfterStashedOpsApplied?.(); + } } /** diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 93e78c4317a2..52d020a8f4ea 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -1597,6 +1597,120 @@ describe("Runtime", () => { ); }); }); + + describe("Submit during stashed-op apply", () => { + let containerRuntime: ContainerRuntime; + let mockLogger: MockLogger; + let containerErrors: ICriticalContainerError[]; + + async function createRuntime(settings: Record = {}): Promise { + mockLogger = new MockLogger(); + containerErrors = []; + const context = { + ...getMockContext({ logger: mockLogger, settings }), + closeFn: (error?: ICriticalContainerError): void => { + if (error !== undefined) { + containerErrors.push(error); + } + }, + }; + const { runtime } = await ContainerRuntime.loadRuntime2({ + context: context as IContainerContext, + registry: new FluidDataStoreRegistry([]), + existing: false, + requestHandler: undefined, + runtimeOptions: {}, + provideEntryPoint: mockProvideEntryPoint, + }); + containerRuntime = runtime; + } + + function setApplyingStashedOps(isApplying: boolean): void { + const psm = ( + containerRuntime as unknown as { pendingStateManager: PendingStateManager } + ).pendingStateManager; + Object.defineProperty(psm, "isApplyingStashedOps", { + configurable: true, + get: () => isApplying, + }); + } + + function submitBlobAttach(): void { + // Mirrors `sendBlobAttachMessage` in ContainerRuntime; submit() is private. + ( + containerRuntime as unknown as { + submit: ( + message: { type: ContainerMessageType; contents: unknown }, + localOpMetadata: unknown, + metadata: { localId: string; blobId: string }, + ) => void; + } + ).submit({ type: ContainerMessageType.BlobAttach, contents: undefined }, undefined, { + localId: "local-1", + blobId: "blob-1", + }); + } + + it("throws, logs, and closes the container on submit during apply", async () => { + await createRuntime(); + setApplyingStashedOps(true); + assert.throws( + () => submitDataStoreOp(containerRuntime, "1", testDataStoreMessage), + (error: IErrorBase) => + error.errorType === ContainerErrorTypes.usageError && + error.message === "Local op submitted during stashed-op apply window", + ); + mockLogger.assertMatchAny([ + { + eventName: "ContainerRuntime:SubmitDuringStashedOpApply", + category: "error", + messageType: ContainerMessageType.FluidDataStoreOp, + }, + ]); + assert.strictEqual( + containerErrors.length, + 1, + "closeFn should have been invoked exactly once", + ); + assert.strictEqual( + containerErrors[0].errorType, + ContainerErrorTypes.usageError, + "closeFn should have received the UsageError", + ); + }); + + it("does not throw when the apply window is closed", async () => { + await createRuntime(); + setApplyingStashedOps(false); + assert.doesNotThrow(() => + submitDataStoreOp(containerRuntime, "1", testDataStoreMessage), + ); + }); + + it("does not throw for BlobAttach during apply (allowlisted)", async () => { + await createRuntime(); + setApplyingStashedOps(true); + assert.doesNotThrow(() => submitBlobAttach()); + }); + + it("kill switch suppresses throw and logs error event", async () => { + await createRuntime({ + "Fluid.ContainerRuntime.DisableSubmitDuringStashedApplyThrow": true, + }); + setApplyingStashedOps(true); + assert.doesNotThrow(() => + submitDataStoreOp(containerRuntime, "1", testDataStoreMessage), + ); + mockLogger.assertMatchAny([ + { + eventName: "ContainerRuntime:SubmitDuringStashedOpApply", + category: "error", + messageType: ContainerMessageType.FluidDataStoreOp, + }, + ]); + }); + }); + describe("Supports mixin classes", () => { it("new loadRuntime2 method works", async () => { const makeMixin = ( diff --git a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts index 21f581c8aee1..c451512f365b 100644 --- a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts @@ -834,6 +834,116 @@ describe("Pending State Manager", () => { assert.strictEqual(applyStashedOps.length, 0); assert.strictEqual(pendingStateManager.pendingMessagesCount, 1); }); + + describe("apply lifecycle", () => { + const stashedMessage = (refSeq: number, csn: number): IPendingMessage => ({ + type: "message", + content: '{"type":"component"}', + referenceSequenceNumber: refSeq, + localOpMetadata: undefined, + opMetadata: undefined, + batchInfo: { clientId: "CLIENT_ID", batchStartCsn: csn, length: 1, staged: false }, + runtimeOp: undefined, + }); + + const stateHandler = (): IRuntimeStateHandler => ({ + applyStashedOp: async () => undefined, + clientId: () => "clientId", + connected: () => true, + reSubmitBatch: () => {}, + isActiveConnection: () => false, + isAttached: () => true, + }); + + it("eagerly enters apply window in constructor when stashed state present", () => { + let afterCount = 0; + const psm = new PendingStateManager( + stateHandler(), + { pendingStates: [stashedMessage(10, 1)] }, + logger, + { onAfterStashedOpsApplied: () => afterCount++ }, + ); + assert.strictEqual(psm.isApplyingStashedOps, true); + assert.strictEqual(afterCount, 0); + }); + + it("does not enter apply window when no stashed state", () => { + let afterCount = 0; + const psm = new PendingStateManager(stateHandler(), undefined, logger, { + onAfterStashedOpsApplied: () => afterCount++, + }); + assert.strictEqual(psm.isApplyingStashedOps, false); + assert.strictEqual(afterCount, 0); + }); + + it("closes apply window when initialMessages drains", async () => { + let afterCount = 0; + const psm = new PendingStateManager( + stateHandler(), + { pendingStates: [stashedMessage(10, 1), stashedMessage(11, 2)] }, + logger, + { onAfterStashedOpsApplied: () => afterCount++ }, + ); + assert.strictEqual(psm.isApplyingStashedOps, true); + await psm.applyStashedOpsAt(); + assert.strictEqual(psm.isApplyingStashedOps, false); + assert.strictEqual(afterCount, 1); + }); + + it("stays in apply window across partial drains", async () => { + let afterCount = 0; + const psm = new PendingStateManager( + stateHandler(), + { pendingStates: [stashedMessage(10, 1), stashedMessage(11, 2)] }, + logger, + { onAfterStashedOpsApplied: () => afterCount++ }, + ) as unknown as PendingStateManager_WithPrivates; + + await (psm as unknown as PendingStateManager).applyStashedOpsAt(10); + assert.strictEqual(psm.isApplyingStashedOps, true); + assert.strictEqual(afterCount, 0); + assert.strictEqual(psm.initialMessages.length, 1); + + await (psm as unknown as PendingStateManager).applyStashedOpsAt(11); + assert.strictEqual(psm.isApplyingStashedOps, false); + assert.strictEqual(afterCount, 1); + assert.strictEqual(psm.initialMessages.length, 0); + }); + + it("close hook fires exactly once even with repeated applyStashedOpsAt calls", async () => { + let afterCount = 0; + const psm = new PendingStateManager( + stateHandler(), + { pendingStates: [stashedMessage(10, 1)] }, + logger, + { onAfterStashedOpsApplied: () => afterCount++ }, + ); + await psm.applyStashedOpsAt(); + await psm.applyStashedOpsAt(); + await psm.applyStashedOpsAt(100); + assert.strictEqual(afterCount, 1); + assert.strictEqual(psm.isApplyingStashedOps, false); + }); + + it("leaves lifecycle in 'applying' and does not fire close hook when the last apply throws", async () => { + let afterCount = 0; + const failingHandler: IRuntimeStateHandler = { + ...stateHandler(), + applyStashedOp: async () => { + throw new Error("apply failed"); + }, + }; + const psm = new PendingStateManager( + failingHandler, + { pendingStates: [stashedMessage(10, 1)] }, + logger, + { onAfterStashedOpsApplied: () => afterCount++ }, + ); + await assert.rejects(psm.applyStashedOpsAt()); + assert.strictEqual(psm.isApplyingStashedOps, true); + assert.strictEqual(afterCount, 0); + }); + }); }); describe("Pending messages state", () => { @@ -930,18 +1040,24 @@ describe("Pending State Manager", () => { it("has both pending messages and initial messages", () => { const pendingStateManager = createPendingStateManager(forInitialMessages); - // let each message be its own batch + // Flushing while initialMessages is non-empty is a usage error (apply + // window is open), so push directly into the private pendingMessages + // queue to set up the dual-populated state under test. for (const message of forFlushedMessages) { - pendingStateManager.onFlushBatch( - [ - { - runtimeOp: message.runtimeOp, - referenceSequenceNumber: message.referenceSequenceNumber, - }, - ], - 0, - false /* staged */, - ); + pendingStateManager.pendingMessages.push({ + type: "message", + content: '{"type":"component"}', + referenceSequenceNumber: message.referenceSequenceNumber, + localOpMetadata: undefined, + opMetadata: undefined, + batchInfo: { + clientId: "CLIENT_ID", + batchStartCsn: 0, + length: 1, + staged: false, + }, + runtimeOp: message.runtimeOp, + }); } assert.strictEqual( pendingStateManager.hasPendingMessages(), diff --git a/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts b/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts new file mode 100644 index 000000000000..e8bf7f90c9c5 --- /dev/null +++ b/packages/test/local-server-tests/src/test/submitDuringStashedOpApply.spec.ts @@ -0,0 +1,148 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "assert"; + +import { + asLegacyAlpha, + createDetachedContainer, + loadExistingContainer, +} from "@fluidframework/container-loader/internal"; +import { type ISharedMap, SharedMap } from "@fluidframework/map/internal"; +import type { + IFluidDataStoreChannel, + IFluidDataStoreContext, + IFluidDataStoreFactory, +} from "@fluidframework/runtime-definitions/internal"; +import { LocalDeltaConnectionServer } from "@fluidframework/server-local-server"; +import { TestFluidObjectFactory } from "@fluidframework/test-utils/internal"; +import type { ITestFluidObject } from "@fluidframework/test-utils/internal"; + +import { createLoader } from "../utils.js"; + +/** + * NOTE — anti-pattern under test, do not copy. + * + * Submitting ops from inside DDS op-event handlers is bad practice and + * should be avoided. Op events fire during op processing (including + * stashed-op replay and rollback), and a cascading write inside the + * handler can either land at the wrong moment in the op stream or be + * silently dropped. If a cascading write is unavoidable, the handler + * MUST gate on both `IFluidDataStoreRuntime.isReadOnly()` (when `true`, + * the handler must not submit edits; the `"readonly"` event is the live + * signal) and `IFluidDataStoreRuntime.activeLocalOperationActivity` + * (when set to `"applyStashed"` or `"rollback"`, the runtime itself is + * driving the change, not the user, and the handler should not react + * with new ops). + * + * This factory deliberately omits both gates so the load rejects with + * the expected `UsageError`. Two SharedMaps are needed because the + * channel-level `stashedOpMd` capture in `ChannelDeltaConnection.submit` + * swallows any submit issued *on the same channel* while that channel's + * `applyStashedOp` is in flight. A submit targeting a *different* + * channel goes through the normal submit path — which is exactly the + * "event handler on map A writes to map B" shape that reaches the + * runtime guard in production. + */ +class ReactingMapFactory implements IFluidDataStoreFactory { + public constructor(private readonly inner: IFluidDataStoreFactory) {} + + public get IFluidDataStoreFactory(): IFluidDataStoreFactory { + return this; + } + public get type(): string { + return this.inner.type; + } + + public async instantiateDataStore( + context: IFluidDataStoreContext, + existing: boolean, + ): Promise { + const channel = await this.inner.instantiateDataStore(context, existing); + if (!existing) { + return channel; + } + // Eagerly realize both maps and wire up the cross-map listener. + // `IFluidDataStoreChannel` doesn't expose `getChannel`, but the + // concrete TestFluidObjectFactory runtime does. + const runtimeWithChannels = channel as IFluidDataStoreChannel & { + getChannel(id: string): Promise; + }; + const primary = (await runtimeWithChannels.getChannel("primary")) as ISharedMap; + const secondary = (await runtimeWithChannels.getChannel("secondary")) as ISharedMap; + primary.on("valueChanged", (changed) => { + secondary.set(`mirror:${changed.key}`, "cascaded-value"); + }); + return channel; + } +} + +describe("Submit during stashed-op apply (end-to-end)", () => { + it("rejects load when a valueChanged listener does a cross-map edit during apply", async () => { + const deltaConnectionServer = LocalDeltaConnectionServer.create(); + + // 1. Create the container with two named SharedMaps, attach, write, + // disconnect, write offline, capture pending local state. + const goodFactory = new TestFluidObjectFactory( + [ + ["primary", SharedMap.getFactory()], + ["secondary", SharedMap.getFactory()], + ], + "default", + ); + const { + codeDetails, + loaderProps: goodLoaderProps, + urlResolver, + } = createLoader({ + deltaConnectionServer, + defaultDataStoreFactory: goodFactory, + }); + + const container = asLegacyAlpha( + await createDetachedContainer({ codeDetails, ...goodLoaderProps }), + ); + + const initialObject = (await container.getEntryPoint()) as ITestFluidObject; + const primary = await initialObject.getSharedObject("primary"); + primary.set("pre-attach", "value"); + + await container.attach(urlResolver.createCreateNewRequest("submit-during-apply")); + primary.set("attached", "value"); + + const url = await container.getAbsoluteUrl(""); + assert(url !== undefined, "container should have a URL after attach"); + + container.disconnect(); + primary.set("offline", "value"); + + const pendingLocalState = await container.getPendingLocalState(); + container.close(); + + // 2. Build a separate loader that, on existing=true loads, wires up a + // valueChanged listener on `primary` that performs a cascading set + // on `secondary`. Share the resolver and driver so the URL produced + // above resolves on the new loader. + const { loaderProps: reactingLoaderProps } = createLoader({ + deltaConnectionServer, + defaultDataStoreFactory: new ReactingMapFactory(goodFactory), + }); + + // 3. The stashed `offline` op fires `valueChanged` on `primary` during + // apply; the listener's `secondary.set` reaches the runtime's + // submit guard (a different channel from the one in applyStashedOp, + // so the channel-level stashedOpMd capture doesn't swallow it), and + // the load rejects. + await assert.rejects( + loadExistingContainer({ + ...reactingLoaderProps, + request: { url }, + pendingLocalState, + }), + (error: Error & { message?: string }) => + error.message?.includes("Local op submitted during stashed-op apply window") === true, + ); + }); +});