Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
b724ca8
fix(container-runtime): block local submits during stashed-op apply
anthony-murphy May 13, 2026
7b813f4
fix: address deep-review correctness bugs in stashed-op apply lifecycle
anthony-murphy May 13, 2026
35487d4
fix(container-runtime): satisfy jsdoc/check-indentation on PSM docstr…
anthony-murphy May 13, 2026
442d94b
fix(container-runtime): isReadOnly must tolerate undefined _deltaMana…
anthony-murphy May 13, 2026
bc58bea
Merge branch 'main' into debug-pending-state
anthony-murphy May 13, 2026
d405321
fix: address deep-review threads on stashed-op apply submit guard
anthony-murphy May 13, 2026
25ccf6d
fix(container-runtime): re-trigger replay from PSM close hook
anthony-murphy May 13, 2026
e6bcc3a
refactor(container-runtime): assert canSendOps doesn't fire mid-apply
anthony-murphy May 13, 2026
d6b5944
refactor: always log submit-during-apply; simplify PSM apply close
anthony-murphy May 13, 2026
471ab7d
test(local-server-tests): cover load-fails when DDS reacts during apply
anthony-murphy May 13, 2026
277ff9c
docs(test): label the cross-map listener as an anti-pattern
anthony-murphy May 14, 2026
a41e38e
docs(test): correct readonly guidance to point at the data store runtime
anthony-murphy May 14, 2026
b2ac7dc
docs(test): drop the "when readOnly is true" enumeration
anthony-murphy May 14, 2026
80b7b9a
refactor(test): use createLoader util to cut loader plumbing
anthony-murphy May 14, 2026
19a27ca
test: simplify second createLoader call
anthony-murphy May 14, 2026
5d8aad9
fix(test): satisfy jsdoc/check-indentation
anthony-murphy May 14, 2026
df8d595
fix(container-runtime): call closeFn on submit-during-apply throw
anthony-murphy May 14, 2026
30e8fd3
Merge branch 'main' into debug-pending-state
anthony-murphy May 14, 2026
9c50906
refactor(container-runtime): single source of truth for IdAllocation
anthony-murphy May 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 98 additions & 9 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1360,7 +1360,14 @@ export class ContainerRuntime
return this._getAttachState();
}

public readonly isReadOnly = (): boolean => this.deltaManager.readOnlyInfo.readonly === true;
public readonly isReadOnly = (): boolean =>
// `_deltaManager` and `pendingStateManager` are both assigned partway
// through the constructor; `baseLogger` is built earlier and stamps
// `isReadOnly` on every error event (e.g. layer-compat failures
// during construction), so this can be called before either is
// assigned. Optional chains keep that window safe.
this._deltaManager?.readOnlyInfo.readonly === true ||
this.pendingStateManager?.isApplyingStashedOps === true;
Comment thread
anthony-murphy marked this conversation as resolved.

/**
* Current session schema - defines what options are on & off.
Expand Down Expand Up @@ -1597,6 +1604,8 @@ export class ContainerRuntime

private readonly extensions = new Map<ContainerExtensionId, ExtensionEntry>();

public readonly baseLogger: ITelemetryBaseLogger;

/***/
protected constructor(
context: IContainerContext,
Expand All @@ -1610,7 +1619,7 @@ export class ContainerRuntime
private readonly runtimeOptions: Readonly<ContainerRuntimeOptionsInternal>,
private readonly containerScope: FluidObject,
// Create a custom ITelemetryBaseLogger to output telemetry events.
public readonly baseLogger: ITelemetryBaseLogger,
logger: ITelemetryBaseLogger,
existing: boolean,

blobManagerLoadInfo: IBlobManagerLoadInfo,
Comment thread
anthony-murphy marked this conversation as resolved.
Expand Down Expand Up @@ -1663,15 +1672,20 @@ export class ContainerRuntime

this.isSnapshotInstanceOfISnapshot = snapshotWithContents !== undefined;

this.mc = createChildMonitoringContext({
logger: this.baseLogger,
namespace: "ContainerRuntime",
this.baseLogger = createChildLogger({
logger,
properties: {
all: {
inStagingMode: this.inStagingMode,
error: {
inStagingMode: () => this.inStagingMode,
isApplyingStashedOps: () => this.pendingStateManager?.isApplyingStashedOps,
isReadOnly: () => this.isReadOnly(),
},
},
});
this.mc = createChildMonitoringContext({
Comment thread
anthony-murphy marked this conversation as resolved.
logger: this.baseLogger,
namespace: "ContainerRuntime",
});

// Validate that the Loader is compatible with this Runtime.
const maybeLoaderCompatDetailsForRuntime = context as FluidObject<ILayerCompatDetails>;
Expand Down Expand Up @@ -1840,6 +1854,18 @@ export class ContainerRuntime
},
pendingRuntimeState?.pending,
this.baseLogger,
{
// PSM has cleared `isApplyingStashedOps`; `isReadOnly()` now
Comment thread
anthony-murphy marked this conversation as resolved.
// reflects the network-readonly state again. Fan out so DDSes
// know they can submit once more. No open hook is needed —
// the apply window opens before `channelCollection` exists,
// so a fanout there would be a no-op; data stores instead
// pick up the initial readonly state from `isReadOnly()`
// when they're first asked.
onAfterStashedOpsApplied: () => {
this.notifyReadOnlyState();
},
Comment thread
anthony-murphy marked this conversation as resolved.
},
);

let outerDeltaManager: IDeltaManagerFull = this.innerDeltaManager;
Expand Down Expand Up @@ -2797,6 +2823,19 @@ export class ContainerRuntime
return;
}

// Invariant: the canSendOps edge in `setConnectionStateCore` — the
// only caller of this method — cannot fire while
// `applyStashedOpsAt` is in flight, because the loader awaits the
// apply before transitioning the runtime to a write-capable
// connection. If this assert ever fires, that contract has changed
// and the submit guard at `submit()` would catch a runtime-internal
// resubmit (`Rejoin`, `GC`, `FluidDataStoreOp`) for an op type
// outside the apply-window allowlist.
assert(
!this.pendingStateManager.isApplyingStashedOps,
"replayPendingStates must not be called during stashed-op apply window",
);
Comment thread
anthony-murphy marked this conversation as resolved.

// Replaying is an internal operation and we don't want to generate noise while doing it.
// So temporarily disable dirty state change events, and save the old state.
// When we're done, we'll emit the event if the state changed.
Expand Down Expand Up @@ -2912,8 +2951,13 @@ export class ContainerRuntime
}
}

private readonly notifyReadOnlyState = (readonly: boolean): void =>
this.channelCollection.notifyReadOnlyState(readonly);
private readonly notifyReadOnlyState = (): void =>
// `channelCollection` may be undefined when invoked from the PSM's
// open hook, which fires from `new PendingStateManager(...)` earlier
// in this constructor than `channelCollection` is assigned. That's
// fine — DDSes created after this point will read the runtime's
// `isReadOnly()` aggregation and start out in the correct state.
Comment on lines +2955 to +2959
this.channelCollection?.notifyReadOnlyState(this.isReadOnly());
Comment thread
anthony-murphy marked this conversation as resolved.

public setConnectionState(canSendOps: boolean, clientId?: string): void {
this.setConnectionStateToConnectedOrDisconnected(canSendOps, clientId);
Expand Down Expand Up @@ -4814,6 +4858,51 @@ export class ContainerRuntime
): void {
Comment thread
anthony-murphy marked this conversation as resolved.
this.verifyNotClosed();

// Nothing should be submitting while we're replaying stashed ops.
Comment thread
anthony-murphy marked this conversation as resolved.
// The runtime is readonly during the apply window (see
// `PendingStateManager._applyLifecycle`), so a compliant DDS skips
// submits. If we land here anyway, a DDS bypassed the readonly gate
// (e.g. a realize-time write that doesn't consult `readOnly`) and
// produced a local op that has no counterpart in the saved-op
// replay — we cannot reconcile the mismatch, so fail fatally. We
// check here (rather than at flush) because outbox flushes are
// deferred and the apply window could close before the offending op
// reaches the pending queue.
//
// Allowlist: `BlobAttach` is a runtime-internal op type that may
// legitimately fire during apply — produced by `sharePendingBlobs`,
// which is invoked from `loadRuntime2` before `applyStashedOpsAt`
// resolves. `IdAllocation` is not in this allowlist because the
// assert at 0x9a5 below enforces that it never reaches `submit()`
// at all; treating that assert as the single source of truth.
//
// Always surface the error event to telemetry on a bypass so we can
// attribute incidents regardless of the kill-switch state. The kill
// switch `DisableSubmitDuringStashedApplyThrow` only suppresses the
// throw + container close, leaving an off-switch if a first- or
// third-party DDS in production quietly bypasses the readonly gate.
Comment thread
anthony-murphy marked this conversation as resolved.
if (
this.pendingStateManager.isApplyingStashedOps &&
containerRuntimeMessage.type !== ContainerMessageType.BlobAttach
) {
const error = new UsageError("Local op submitted during stashed-op apply window", {
messageType: containerRuntimeMessage.type,
});
Comment thread
anthony-murphy marked this conversation as resolved.
this.mc.logger.sendErrorEvent({ eventName: "SubmitDuringStashedOpApply" }, error);
if (
this.mc.config.getBoolean(
"Fluid.ContainerRuntime.DisableSubmitDuringStashedApplyThrow",
) !== true
) {
// Close the container before throwing so the "throw + close"
// contract is enforced by this code path rather than by
// whichever caller happens to wrap the throw in `.catch(closeFn)`.
// `closeFn` is idempotent; a caller that also closes won't double-close.
this.closeFn(error);
throw error;
Comment thread
anthony-murphy marked this conversation as resolved.
}
}

// There should be no ops in detached container state!
assert(
this.attachState !== AttachState.Detached,
Expand Down
78 changes: 78 additions & 0 deletions packages/runtime/container-runtime/src/pendingStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,26 @@ export interface IRuntimeStateHandler {
isAttached: () => boolean;
}

/**
* Optional hooks invoked at the close of the stashed-op apply lifecycle.
*
* `onAfterStashedOpsApplied` fires synchronously the first time
* `initialMessages` drains during `applyStashedOpsAt`, immediately after
* `isApplyingStashedOps` flips to `false`. Fires at most once per PSM lifetime.
*
* Synchronous: fires from a `finally` block where async behavior would
* complicate error propagation.
Comment on lines +152 to +153
*
* No corresponding open hook is exposed. The apply window is opened eagerly
* in the PSM constructor, but at that point `ContainerRuntime` has not yet
* wired up the downstream observers (`channelCollection` is undefined), so a
* fanout fired from the constructor would be a no-op. Consumers that care
* about the open transition can read `isApplyingStashedOps` directly.
*/
export interface PendingStateManagerHooks {
onAfterStashedOpsApplied?: () => void;
}

function isEmptyBatchPendingMessage(message: IPendingMessageFromStash): boolean {
const content = JSON.parse(message.content) as Partial<EmptyGroupedBatch>;
return content.type === "groupedBatch" && content.contents?.length === 0;
Expand Down Expand Up @@ -368,15 +388,59 @@ export class PendingStateManager implements IDisposable {

Comment thread
anthony-murphy marked this conversation as resolved.
private readonly logger: ITelemetryLoggerExt;

/**
* One-way lifecycle of the stashed-op apply window: `notStarted` → `applying` → `ended`.
*
* Transitions are explicit and irreversible. `notStarted` → `applying` happens in the
* constructor when stashed state is present (i.e. `initialMessages` is non-empty at
* construction). The open is eager so the runtime is readonly from the moment any DDS
* could possibly observe it. `applying` → `ended` happens the first time
* {@link applyStashedOpsAt} drains `initialMessages`. After that, local edits are safe —
* they queue FIFO behind any remaining `pendingMessages`, preserving server-side ordering.
*
* The window never reopens. After `ended`, subsequent `applyStashedOpsAt` calls (e.g.
* from late `notifyOpReplay`s) early-return at the empty guard.
*
* `pendingMessages` state is intentionally NOT part of the close condition. Those
* entries are drained transparently by {@link replayPendingStates} on connect via
* resubmit (each pop is matched by a fresh push), so the queue size is conserved across
* resubmit and DDSes can't distinguish a resubmit-ack from a normal ack. Holding the
* window open through resubmit would force resubmits to run while the runtime is
* readonly, which is the inverse of what we want ("never resubmit during apply stashed
* ops").
*
* An apply error leaves the lifecycle at `applying` because the queue isn't drained.
* That's fine: an error here is fatal for the load, the container is unusable, and
* there's no state to restore.
*/
private _applyLifecycle: "notStarted" | "applying" | "ended" = "notStarted";
public get isApplyingStashedOps(): boolean {
return this._applyLifecycle === "applying";
}

private readonly hooks: PendingStateManagerHooks;

constructor(
private readonly stateHandler: IRuntimeStateHandler,
stashedLocalState: IPendingLocalState | undefined,
logger: ITelemetryBaseLogger,
hooks: PendingStateManagerHooks = {},
) {
this.logger = createChildLogger({ logger });
this.hooks = hooks;
if (stashedLocalState?.pendingStates) {
this.initialMessages.push(...stashedLocalState.pendingStates);
}
// Open the apply window eagerly if there is any stashed work. The
// runtime is readonly while `isApplyingStashedOps` is true (see
// `ContainerRuntime.isReadOnly`); compliant DDSes consult `readOnly`
// at realize time and skip submits. No fanout fires here — downstream
// observers (`channelCollection`) are not yet constructed at this
// point in the runtime constructor, and the first real readonly read
// happens after the constructor returns.
if (!this.initialMessages.isEmpty()) {
this._applyLifecycle = "applying";
}
}

public get disposed(): boolean {
Expand Down Expand Up @@ -451,6 +515,10 @@ export class PendingStateManager implements IDisposable {
* @param seqNum - Sequence number at which to apply ops. Will apply all ops if seqNum is undefined.
*/
public async applyStashedOpsAt(seqNum?: number): Promise<void> {
if (this.initialMessages.isEmpty()) {
return;
}

// apply stashed ops at sequence number
while (!this.initialMessages.isEmpty()) {
if (seqNum !== undefined) {
Expand Down Expand Up @@ -497,6 +565,16 @@ export class PendingStateManager implements IDisposable {
throw DataProcessingError.wrapIfUnrecognized(error, "applyStashedOp", nextMessage);
}
Comment thread
anthony-murphy marked this conversation as resolved.
}

// The apply window was opened eagerly in the constructor when there
// was any stashed work. We close it on full successful drain only.
// If an apply throws above, control never reaches here and the
// lifecycle stays at "applying" — the load is fatal so there's no
// recoverable state.
if (this._applyLifecycle === "applying" && this.initialMessages.isEmpty()) {
this._applyLifecycle = "ended";
this.hooks.onAfterStashedOpsApplied?.();
}
}

/**
Expand Down
Loading
Loading