diff --git a/.changeset/squash-non-tree-dds.md b/.changeset/squash-non-tree-dds.md new file mode 100644 index 000000000000..fafbb727683d --- /dev/null +++ b/.changeset/squash-non-tree-dds.md @@ -0,0 +1,38 @@ +--- +"@fluidframework/cell": minor +"@fluidframework/counter": minor +"@fluid-experimental/ink": minor +"@fluidframework/legacy-dds": minor +"@fluidframework/map": minor +"@fluidframework/matrix": minor +"@fluidframework/merge-tree": minor +"@fluidframework/ordered-collection": minor +"@fluid-experimental/pact-map": minor +"@fluidframework/register-collection": minor +"@fluidframework/sequence": minor +"@fluidframework/task-manager": minor +--- + +Implement squash-on-resubmit for non-tree DDSes + +All non-tree DDSes now have explicit `reSubmitSquashed` overrides so staging-mode commits (`commitChanges({squash: true})`) can drop intermediate values before they reach the wire. Values written and removed within a single staging session (for example, a sensitive string set and then deleted before commit) are no longer transmitted as part of the squashed batch. + +The model is uniform across DDSes: the runtime walks staged pending changes oldest-to-newest and asks the DDS, for each change, whether a later staged change subsumes it. Subsumed changes are dropped (with the same kind of pending-state cleanup that rollback performs); non-subsumed changes are resubmitted unchanged. Pre-staging ops still in flight are never touched. + +Per-DDS treatment: + +- `SharedCell`, `SharedMap`, `SharedDirectory`, `SharedMatrix`: subsumption-aware squash drops superseded ops (per-cell / per-key LWW; for `clear` and `delete`, a later clear or a later op on the same key subsumes). For `SharedDirectory` subdirectory lifecycle ops, a staged `createSubDirectory(name) + deleteSubDirectory(name)` pair is also dropped so user-supplied subdirectory names don't leak when the pair nets to no-op. +- `SharedCounter`, `SharedTaskManager`: identity squash—increments and volunteer/abandon ops carry intent that is not subsumable by a later staged op of the same shape. +- `SharedSequence` text and endpoints: unchanged—squash was already wired end-to-end via merge-tree's `regeneratePendingOp(squash)`. +- `SharedSequence` / `SharedString` segment properties: merge-tree's `resetPendingDeltaToOps` now filters `annotate` `props` and `insert` `seg.properties` against keys touched by later staged annotates on the same segment. If every key is overridden the op is dropped entirely. +- `SharedSequence` interval properties: `IntervalCollection.resubmitMessage` now filters `add` / `change` `value.properties` against keys touched by later staged `add` / `change` ops on the same interval, and drops an `add` / `change` entirely when a later `delete` on the same interval subsumes it. +- legacy `SharedArray`: subsumption-aware squash drops a staged `insertEntry` (with its user-supplied `value`) when a later staged `deleteEntry`, deleting `toggle`, or move chain ending in either subsumes it; intervening `toggleMove` ops disable the optimization and the chain is resubmitted unchanged. +- `Ink`, `ConsensusRegisterCollection`, `ConsensusOrderedCollection`, `PactMap`, legacy `SharedSignal`: identity squash with documented rationale. These DDSes have append-only, order-preserving, or consensus-bound semantics where collapsing pending ops would change observable behavior. + +Together this removes the dependency on the `Fluid.SharedObject.AllowStagingModeWithoutSquashing` configuration flag fallback for the listed DDSes. + +Known limitations (documented in code; not addressed by these changes): + +- `ConsensusOrderedCollection.add` carries a serialized user value; an `add(secret) → acquire → complete` chain inside a staging session still transmits the `add` op on commit. +- `ConsensusRegisterCollection` writes participate in `readVersions()` history; collapsing pending writes would alter observable semantics, so intermediate writes during staging remain visible. +- `Ink` and legacy `SharedSignal` ops carry user-supplied pen / metadata; staging-mode notifications are intentionally transmitted on commit. diff --git a/packages/dds/cell/package.json b/packages/dds/cell/package.json index 19657a2bf007..e8d8004945aa 100644 --- a/packages/dds/cell/package.json +++ b/packages/dds/cell/package.json @@ -107,7 +107,9 @@ "devDependencies": { "@arethetypeswrong/cli": "^0.18.2", "@biomejs/biome": "~2.4.5", + "@fluid-internal/client-utils": "workspace:~", "@fluid-internal/mocha-test-setup": "workspace:~", + "@fluid-private/stochastic-test-utils": "workspace:~", "@fluid-private/test-dds-utils": "workspace:~", "@fluid-tools/build-cli": "catalog:buildTools", "@fluidframework/build-common": "^2.0.3", @@ -115,6 +117,7 @@ "@fluidframework/cell-previous": "npm:@fluidframework/cell@2.92.0", "@fluidframework/container-definitions": "workspace:~", "@fluidframework/eslint-config-fluid": "catalog:eslint", + "@fluidframework/runtime-utils": "workspace:~", "@fluidframework/test-runtime-utils": "workspace:~", "@microsoft/api-extractor": "7.58.1", "@types/mocha": "^10.0.10", diff --git a/packages/dds/cell/src/cell.ts b/packages/dds/cell/src/cell.ts index 6888040b9fc0..4f4f144131a1 100644 --- a/packages/dds/cell/src/cell.ts +++ b/packages/dds/cell/src/cell.ts @@ -334,6 +334,23 @@ export class SharedCell } } + /** + * Cell is last-write-wins: any pending op other than the latest is superseded. + * We drop superseded ops entirely (no wire emission) and resubmit only the final pending op, + * which by construction represents the cell's tip state. + */ + protected override reSubmitSquashed(content: unknown, localOpMetadata: unknown): void { + const cellOpMetadata = localOpMetadata as ICellLocalOpMetadata; + const lastPendingMessageId = this.pendingMessageIds[this.pendingMessageIds.length - 1]; + if (cellOpMetadata.pendingMessageId === lastPendingMessageId) { + this.submitLocalMessage(content, localOpMetadata); + } else { + const index = this.pendingMessageIds.indexOf(cellOpMetadata.pendingMessageId); + assert(index !== -1, 0xd01 /* Pending message id missing from queue during squash */); + this.pendingMessageIds.splice(index, 1); + } + } + /** * Rollback a local op. * diff --git a/packages/dds/cell/src/test/cell.spec.ts b/packages/dds/cell/src/test/cell.spec.ts index 214c8d4e1cce..3fd2ea634a0c 100644 --- a/packages/dds/cell/src/test/cell.spec.ts +++ b/packages/dds/cell/src/test/cell.spec.ts @@ -5,7 +5,12 @@ import { strict as assert } from "node:assert"; -import { type IGCTestProvider, runGCTests } from "@fluid-private/test-dds-utils"; +import { + enterStagingMode, + type IGCTestProvider, + reconnectAndSquash, + runGCTests, +} from "@fluid-private/test-dds-utils"; import { AttachState } from "@fluidframework/container-definitions"; import { MockContainerRuntimeFactory, @@ -488,6 +493,142 @@ describe("Cell", () => { }); }); + describe("Squash on resubmit", () => { + let containerRuntimeFactory: MockContainerRuntimeFactoryForReconnection; + let dataStoreRuntime1: MockFluidDataStoreRuntime; + let containerRuntime1: MockContainerRuntimeForReconnection; + let cell1: ISharedCell; + let cell2: ISharedCell; + let peerObservations: { event: "valueChanged" | "delete"; value: unknown }[]; + + function createCellForSquash(id: string): { + cell: ISharedCell; + dataStoreRuntime: MockFluidDataStoreRuntime; + containerRuntime: MockContainerRuntimeForReconnection; + } { + const dataStoreRuntime = new MockFluidDataStoreRuntime(); + const containerRuntime = + containerRuntimeFactory.createContainerRuntime(dataStoreRuntime); + const services = { + deltaConnection: dataStoreRuntime.createDeltaConnection(), + objectStorage: new MockStorage(), + }; + const cell = new SharedCell(id, dataStoreRuntime, CellFactory.Attributes); + cell.connect(services); + return { cell, dataStoreRuntime, containerRuntime }; + } + + beforeEach("createCellsForSquash", () => { + containerRuntimeFactory = new MockContainerRuntimeFactoryForReconnection(); + const response1 = createCellForSquash("cell1"); + cell1 = response1.cell; + dataStoreRuntime1 = response1.dataStoreRuntime; + containerRuntime1 = response1.containerRuntime; + const response2 = createCellForSquash("cell2"); + cell2 = response2.cell; + peerObservations = []; + cell2.on("valueChanged", (value) => + peerObservations.push({ event: "valueChanged", value }), + ); + cell2.on("delete", () => peerObservations.push({ event: "delete", value: undefined })); + }); + + it("drops intermediate set when a later delete supersedes it", () => { + const secret = "SSN: 123-45-6789"; + containerRuntime1.connected = false; + cell1.set(secret); + cell1.delete(); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(cell1.get(), undefined, "cell1 final state should be empty"); + assert.equal(cell2.get(), undefined, "cell2 final state should be empty"); + assert.deepEqual( + peerObservations, + [{ event: "delete", value: undefined }], + "peer must only see the final delete; intermediate set must not leak", + ); + for (const observation of peerObservations) { + assert.notEqual( + observation.value, + secret, + "secret value must never appear on the wire", + ); + } + }); + + it("drops intermediate sets when a later set supersedes them", () => { + const secret = "secret-intermediate"; + const finalValue = "final-value"; + containerRuntime1.connected = false; + cell1.set(secret); + cell1.set(finalValue); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(cell1.get(), finalValue); + assert.equal(cell2.get(), finalValue); + assert.deepEqual( + peerObservations, + [{ event: "valueChanged", value: finalValue }], + "peer should see exactly one set carrying the final value", + ); + }); + + it("squashes a long pending chain to the final state only", () => { + containerRuntime1.connected = false; + cell1.set("a"); + cell1.set("b"); + cell1.delete(); + cell1.set("c"); + cell1.delete(); + cell1.set("d"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(cell1.get(), "d"); + assert.equal(cell2.get(), "d"); + assert.equal(peerObservations.length, 1); + assert.deepEqual(peerObservations[0], { event: "valueChanged", value: "d" }); + }); + + it("passes through a single pending op unchanged", () => { + containerRuntime1.connected = false; + cell1.set("only"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(cell2.get(), "only"); + assert.deepEqual(peerObservations, [{ event: "valueChanged", value: "only" }]); + }); + + it("preserves a pre-staging set still in flight when a staging set is squashed", () => { + // Submit a pre-staging set while connected so it's in flight at the runtime layer + // but not yet ACKed when we disconnect. The staging-mode set + delete should leave + // the pre-staging value to land normally, and "secret" must never reach the peer. + cell1.set("pre"); + enterStagingMode(containerRuntime1); + cell1.set("secret"); + cell1.delete(); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(cell1.get(), undefined); + assert.equal(cell2.get(), undefined); + const sawPre = peerObservations.some( + (observation) => observation.event === "valueChanged" && observation.value === "pre", + ); + assert.equal( + sawPre, + true, + "pre-staging set must land on the peer when only the staged ops are squashed", + ); + for (const observation of peerObservations) { + assert.notEqual(observation.value, "secret", "staged secret must not leak"); + } + }); + }); + describe("Garbage Collection", () => { class GCSharedCellProvider implements IGCTestProvider { private subCellCount = 0; diff --git a/packages/dds/cell/src/test/cell.squash.fuzz.spec.ts b/packages/dds/cell/src/test/cell.squash.fuzz.spec.ts new file mode 100644 index 000000000000..db806bdae8dd --- /dev/null +++ b/packages/dds/cell/src/test/cell.squash.fuzz.spec.ts @@ -0,0 +1,142 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "node:assert"; +import * as path from "node:path"; + +import { TypedEventEmitter } from "@fluid-internal/client-utils"; +import { + type Generator, + createWeightedAsyncGenerator, + done, + takeAsync, +} from "@fluid-private/stochastic-test-utils"; +import { + type DDSFuzzHarnessEvents, + type SquashFuzzModel, + type SquashFuzzTestState, + createSquashFuzzSuite, +} from "@fluid-private/test-dds-utils"; +import { isFluidHandle } from "@fluidframework/runtime-utils/internal"; + +import { CellFactory } from "../cellFactory.js"; + +import { _dirname } from "./dirname.cjs"; + +interface SetOp { + type: "set"; + value: string | number; +} +interface SetPoisonedOp { + type: "setPoisoned"; +} +interface DeleteOp { + type: "delete"; +} + +type SquashOp = SetOp | SetPoisonedOp | DeleteOp; + +type FuzzState = SquashFuzzTestState; + +function isPoisonedHandle(value: unknown): boolean { + return ( + isFluidHandle(value) && (value as unknown as { poisoned?: unknown }).poisoned === true + ); +} + +function makeGenerator(): (state: FuzzState) => Promise { + const isInStaging = (state: FuzzState): boolean => + state.client.stagingModeStatus === "staging"; + + const setOp = async (state: FuzzState): Promise => ({ + type: "set", + value: state.random.pick([ + (): string => state.random.string(state.random.integer(1, 4)), + (): number => state.random.integer(0, 100), + ])(), + }); + const setPoisoned = async (): Promise => ({ type: "setPoisoned" }); + const deleteOp = async (): Promise => ({ type: "delete" }); + + return createWeightedAsyncGenerator([ + [setOp, 6], + [setPoisoned, 4, isInStaging], + [deleteOp, 3], + ]); +} + +function makeExitingGenerator(): Generator { + return (state): SquashOp | typeof done => { + const value = state.client.channel.get(); + if (isPoisonedHandle(value)) { + return { type: "delete" }; + } + return done; + }; +} + +function reducer(state: FuzzState, op: SquashOp): void { + const { client } = state; + switch (op.type) { + case "set": { + client.channel.set(op.value); + break; + } + case "setPoisoned": { + client.channel.set(state.random.poisonedHandle()); + break; + } + case "delete": { + client.channel.delete(); + break; + } + default: { + break; + } + } +} + +const squashModel: SquashFuzzModel = { + workloadName: "cell squashing", + generatorFactory: () => takeAsync(60, makeGenerator()), + reducer, + validateConsistency: async (a, b) => { + const vA: unknown = a.channel.get(); + const vB: unknown = b.channel.get(); + if (isFluidHandle(vA)) { + assert(isFluidHandle(vB)); + } else { + assert.equal(vA, vB); + } + }, + factory: new CellFactory(), + exitingStagingModeGeneratorFactory: makeExitingGenerator, + validatePoisonedContentRemoved: (client) => { + const value: unknown = client.channel.get(); + assert( + !isPoisonedHandle(value), + "Poisoned handle in cell not removed before exiting staging", + ); + }, +}; + +const emitter = new TypedEventEmitter(); + +describe("SharedCell squash fuzz", () => { + createSquashFuzzSuite(squashModel, { + validationStrategy: { type: "fixedInterval", interval: 10 }, + reconnectProbability: 0.1, + numberOfClients: 3, + clientJoinOptions: { + maxNumberOfClients: 4, + clientAddProbability: 0.05, + }, + detachedStartOptions: { numOpsBeforeAttach: 0 }, + defaultTestCount: 50, + saveFailures: { directory: path.join(_dirname, "../../src/test/results-squash-cell") }, + emitter, + stagingMode: { changeStagingModeProbability: 0.15 }, + }); +}); diff --git a/packages/dds/cell/src/test/dirname.cts b/packages/dds/cell/src/test/dirname.cts new file mode 100644 index 000000000000..ac1703eb418b --- /dev/null +++ b/packages/dds/cell/src/test/dirname.cts @@ -0,0 +1,15 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +/** + * Problem: + * - `__dirname` is not defined in ESM + * - `import.meta.url` is not defined in CJS + * Solution: + * - Export '__dirname' from a .cjs file in the same directory. + * + * Note that *.cjs files are always CommonJS, but can be imported from ESM. + */ +export const _dirname = __dirname; diff --git a/packages/dds/counter/src/counter.ts b/packages/dds/counter/src/counter.ts index 4258984f9254..5eeb55450946 100644 --- a/packages/dds/counter/src/counter.ts +++ b/packages/dds/counter/src/counter.ts @@ -207,6 +207,16 @@ export class SharedCounter this.increment(counterOp.incrementAmount); } + /** + * Counter increments are commutative and additive — each pending increment carries the user's + * intent to add that amount to the counter. No increment is "subsumed" by a later one (a later + * `+3` doesn't cancel an earlier `+5`), so the squash on resubmit is the identity transform: + * each pending increment is resubmitted unchanged. + */ + protected override reSubmitSquashed(content: unknown, localOpMetadata: unknown): void { + this.reSubmitCore(content, localOpMetadata); + } + /** * {@inheritDoc @fluidframework/shared-object-base#SharedObject.rollback} * @sealed diff --git a/packages/dds/counter/src/test/counter.spec.ts b/packages/dds/counter/src/test/counter.spec.ts index a5185e14848d..02abcd6aa74c 100644 --- a/packages/dds/counter/src/test/counter.spec.ts +++ b/packages/dds/counter/src/test/counter.spec.ts @@ -5,6 +5,7 @@ import { strict as assert } from "node:assert"; +import { enterStagingMode, reconnectAndSquash } from "@fluid-private/test-dds-utils"; import { AttachState } from "@fluidframework/container-definitions"; import type { IChannelFactory } from "@fluidframework/datastore-definitions/internal"; import { @@ -306,6 +307,80 @@ describe("SharedCounter", () => { }); }); + describe("SharedCounter squash on resubmit", () => { + let containerRuntimeFactory: MockContainerRuntimeFactoryForReconnection; + let containerRuntime1: MockContainerRuntimeForReconnection; + let dataStoreRuntime1: MockFluidDataStoreRuntime; + let counter1: ISharedCounter; + let counter2: ISharedCounter; + let peerIncrements: number[]; + + beforeEach("createCountersForSquash", () => { + containerRuntimeFactory = new MockContainerRuntimeFactoryForReconnection(); + dataStoreRuntime1 = new MockFluidDataStoreRuntime(); + containerRuntime1 = containerRuntimeFactory.createContainerRuntime(dataStoreRuntime1); + counter1 = factory.create(dataStoreRuntime1, "c1") as ISharedCounter; + counter1.connect({ + deltaConnection: dataStoreRuntime1.createDeltaConnection(), + objectStorage: new MockStorage(), + }); + const dsr2 = new MockFluidDataStoreRuntime(); + containerRuntimeFactory.createContainerRuntime(dsr2); + counter2 = factory.create(dsr2, "c2") as ISharedCounter; + counter2.connect({ + deltaConnection: dsr2.createDeltaConnection(), + objectStorage: new MockStorage(), + }); + + peerIncrements = []; + counter2.on("incremented", (delta: number) => { + peerIncrements.push(delta); + }); + }); + + // Counter increments are commutative and not subsumable — each pending increment carries + // the user's explicit intent. Squash is the identity transform: every staged increment is + // resubmitted unchanged, just like a regular reSubmit. + + it("resubmits each staged increment unchanged", () => { + containerRuntime1.connected = false; + counter1.increment(5); + counter1.increment(3); + counter1.increment(-1); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(counter1.value, 7); + assert.equal(counter2.value, 7); + assert.deepEqual(peerIncrements, [5, 3, -1]); + }); + + it("preserves pre-staging increments still in flight at squash time", () => { + // Submit a pre-staging increment while connected (so it's in flight at the runtime + // layer but not yet ACKed when we disconnect). + counter1.increment(2); + enterStagingMode(containerRuntime1); + counter1.increment(5); + counter1.increment(-1); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(counter1.value, 6); + assert.equal(counter2.value, 6); + assert.deepEqual(peerIncrements, [2, 5, -1]); + }); + + it("passes through a single pending increment unchanged", () => { + containerRuntime1.connected = false; + counter1.increment(42); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(counter2.value, 42); + assert.deepEqual(peerIncrements, [42]); + }); + }); + describe("SharedCounter Rollback", () => { let counterFactory: IChannelFactory; let containerRuntimeFactory: MockContainerRuntimeFactory; diff --git a/packages/dds/ink/src/ink.ts b/packages/dds/ink/src/ink.ts index a9a91b0a5783..ab70163d4a76 100644 --- a/packages/dds/ink/src/ink.ts +++ b/packages/dds/ink/src/ink.ts @@ -259,6 +259,15 @@ export class Ink extends SharedObject implements IInk { return; } + /** + * Ink ops (createStroke / stylus / clear) are append-style: each op records an intentional + * user action and is never superseded by a later pending op. Squash is therefore the identity + * transform — we just replay the original ops via reSubmitCore. + */ + protected override reSubmitSquashed(content: unknown, localOpMetadata: unknown): void { + this.reSubmitCore(content, localOpMetadata); + } + /** * Update the model for a clear operation. * @param operation - The operation object diff --git a/packages/dds/legacy-dds/src/array/sharedArray.ts b/packages/dds/legacy-dds/src/array/sharedArray.ts index 4ffd6bce12ba..09e56fe6aec6 100644 --- a/packages/dds/legacy-dds/src/array/sharedArray.ts +++ b/packages/dds/legacy-dds/src/array/sharedArray.ts @@ -46,6 +46,21 @@ import { SharedArrayRevertible } from "./sharedArrayRevertible.js"; const snapshotFileName = "header"; +/** + * Per-op pending state used by the squash-on-resubmit path. The same object is the + * `localOpMetadata` passed to `submitLocalMessage`, which lets us splice an entry out + * of {@link SharedArrayClass.pendingOps} when a later staged op subsumes it. + * + * - `entryId` is the op's primary entry (source for moves/toggleMoves). + * - `targetEntryId` is the destination for moves/toggleMoves; undefined otherwise. + */ +interface SharedArrayPendingOp { + readonly op: ISharedArrayOperation; + readonly type: OperationType; + readonly entryId: string; + readonly targetEntryId?: string; +} + /** * Represents a shared array that allows communication between distributed clients. * @@ -77,6 +92,47 @@ export class SharedArrayClass */ private readonly remoteDeleteWithLocalPendingDelete: Set = new Set(); + /** + * FIFO of in-flight local ops. The `localOpMetadata` passed to each `submitLocalMessage` + * call is the same object stored here, so {@link reSubmitSquashed} can splice an entry + * out when a later staged op subsumes it. + * + * Push on submit / `applyStashedOp`; shift on local ack; splice on squash-drop. + */ + private readonly pendingOps: SharedArrayPendingOp[] = []; + + /** + * Lazily-computed plan for the current resubmit batch. Identifies the set of + * pendingOps to drop together (so insertAfter dependency chains are preserved) + * and the insertAfter rewrites needed for non-dropped dependents. Invalidated on + * any pendingOps mutation outside the squash path (submit / local-ack). + */ + private cachedSquashPlan?: { + drops: Set>; + rewrites: Map, string | undefined>; + }; + + /** + * Lowest pendingOps index seen so far by {@link reSubmitSquashed} in the current + * resubmit batch. Anything below this index is a pre-staging op (already on the wire + * before staging began) that must not be considered as a chain root by + * {@link computeSquashPlan} — dropping its chain would silently retract an op the + * peer has already observed (or will observe via the runtime's pre-staging resubmit). + * + * Reset whenever pendingOps mutates outside the squash path so the next batch + * recomputes from scratch. + */ + private stagingBoundaryIdx?: number; + + /** + * Entries whose creation op was dropped by a prior squash batch and therefore + * never reached peers. Lives across staging cycles: a later cycle that needs to + * rewrite an `insertAfterEntryId` must skip these when picking a wire-valid + * predecessor, because they remain in {@link sharedArray} as deleted entries + * but are not visible to any peer. + */ + private readonly wireBlacklist = new Set(); + /** * Create a new shared array * @@ -220,7 +276,7 @@ export class SharedArrayClass return; } - this.submitLocalMessage(op); + this.submitArrayOp(op); } public delete(index: number): void { @@ -247,7 +303,7 @@ export class SharedArrayClass return; } - this.submitLocalMessage(op); + this.submitArrayOp(op); } public rearrangeToFront(values: T[]): void { @@ -324,7 +380,7 @@ export class SharedArrayClass return; } - this.submitLocalMessage(op); + this.submitArrayOp(op); } /** @@ -360,7 +416,7 @@ export class SharedArrayClass return; } - this.submitLocalMessage(op); + this.submitArrayOp(op); } /** * Method to do undo/redo of move operation. All entries of the same payload/value are stored @@ -398,7 +454,7 @@ export class SharedArrayClass return; } - this.submitLocalMessage(op); + this.submitArrayOp(op); } public rollback(op: unknown, _localOpMetadata: unknown): void { @@ -522,6 +578,274 @@ export class SharedArrayClass */ protected onDisconnect(): void {} + /** + * Per-op squash. The only op that carries user content is `insertEntry` (its `value`); the + * others reference entryIds. So the subsumption walk starts at each staged `insertEntry` + * and follows the chain forward through `moveEntry` (which re-homes the entry's value + * under a new entryId) until the chain terminates. If the chain's final state is "deleted" + * — via `deleteEntry` or `toggle(isDeleted=true)` — the entire chain is dropped (including + * the insert that carried the value). If the chain remains live, or if a `toggleMove` + * intervenes (which can resurrect an earlier link in unpredictable ways), the chain is + * resubmitted unchanged. + * + * Dropped ops are spliced from {@link pendingOps} in this pass; later `reSubmitSquashed` + * calls for those same ops short-circuit via the membership check. + */ + protected override reSubmitSquashed(content: unknown, localOpMetadata: unknown): void { + const pendingOp = localOpMetadata as SharedArrayPendingOp; + if (this.stagingBoundaryIdx === undefined) { + // First staged op in this resubmit batch — anchor the staging boundary at its + // position so chain walking from earlier (pre-staging) inserts can't pull a + // staged delete into a drop set and silently corrupt state. + const idx = this.pendingOps.indexOf(pendingOp); + this.stagingBoundaryIdx = idx === -1 ? this.pendingOps.length : idx; + this.cachedSquashPlan = undefined; + } + this.cachedSquashPlan ??= this.computeSquashPlan(this.stagingBoundaryIdx); + const { drops, rewrites } = this.cachedSquashPlan; + if (drops.has(pendingOp)) { + const idx = this.pendingOps.indexOf(pendingOp); + if (idx !== -1) { + this.pendingOps.splice(idx, 1); + } + return; + } + if (rewrites.has(pendingOp)) { + const newInsertAfter = rewrites.get(pendingOp); + const op = content as ISharedArrayOperation; + if (op.type === OperationType.insertEntry || op.type === OperationType.moveEntry) { + this.reSubmitCore({ ...op, insertAfterEntryId: newInsertAfter }, localOpMetadata); + return; + } + } + this.reSubmitCore(content, localOpMetadata); + } + + /** + * Push a pending-op record and submit the op with that record as `localOpMetadata`. + * The record is consumed FIFO on local ack (see {@link processMessage}) and may be + * spliced out earlier by a squash decision (see {@link reSubmitSquashed}). + */ + private submitArrayOp(op: ISharedArrayOperation): void { + const pendingOp = this.buildPendingOp(op); + this.pendingOps.push(pendingOp); + this.cachedSquashPlan = undefined; + this.stagingBoundaryIdx = undefined; + this.submitLocalMessage(op, pendingOp); + } + + private buildPendingOp(op: ISharedArrayOperation): SharedArrayPendingOp { + switch (op.type) { + case OperationType.insertEntry: + case OperationType.deleteEntry: + case OperationType.toggle: { + return { op, type: op.type, entryId: op.entryId }; + } + case OperationType.moveEntry: + case OperationType.toggleMove: { + return { + op, + type: op.type, + entryId: op.entryId, + targetEntryId: op.changedToEntryId, + }; + } + default: { + unreachableCase(op); + } + } + } + + /** + * Compute the squash plan for the current pendingOps state. Two passes: + * pass 1 identifies each insertEntry chain (insert + moves + terminal + * delete/toggle); chains that terminate in a deleted state contribute their + * ops to `drops` and their entryIds to `droppedEntries`. Pass 2 computes + * insertAfter rewrites for non-dropped insert/move ops whose + * `insertAfterEntryId` references a dropped entry, by walking sharedArray + * backward to the nearest non-dropped entry. + */ + private computeSquashPlan(stagingBoundaryIdx: number = 0): { + drops: Set>; + rewrites: Map, string | undefined>; + } { + const drops = new Set>(); + const droppedEntries = new Set(); + const claimed = new Set>(); + + for (let opIdx = 0; opIdx < this.pendingOps.length; opIdx++) { + const op = this.pendingOps[opIdx]; + assert(op !== undefined, "pendingOps index in range"); + if (op.type !== OperationType.insertEntry || claimed.has(op)) { + continue; + } + if (opIdx < stagingBoundaryIdx) { + // Pre-staging insert — already on the wire, can't be retracted via squash. + continue; + } + const chain = this.walkInsertChain(op); + if (chain === undefined) { + continue; + } + for (const chainOp of chain.ops) { + drops.add(chainOp); + claimed.add(chainOp); + } + for (const entry of chain.entries) { + droppedEntries.add(entry); + this.wireBlacklist.add(entry); + } + } + + const rewrites = new Map, string | undefined>(); + if (droppedEntries.size > 0) { + const entryBirthIdx = this.buildEntryBirthIndexMap(); + for (let pIdx = 0; pIdx < this.pendingOps.length; pIdx++) { + const pendingOp = this.pendingOps[pIdx]; + assert(pendingOp !== undefined, 0xcfd /* pendingOps index in range */); + if (drops.has(pendingOp)) { + continue; + } + const op = pendingOp.op; + if (op.type !== OperationType.insertEntry && op.type !== OperationType.moveEntry) { + continue; + } + if ( + op.insertAfterEntryId === undefined || + (!droppedEntries.has(op.insertAfterEntryId) && + !this.wireBlacklist.has(op.insertAfterEntryId)) + ) { + continue; + } + const anchorEntryId = + op.type === OperationType.moveEntry ? op.changedToEntryId : pendingOp.entryId; + rewrites.set( + pendingOp, + this.resolveRewriteTarget(anchorEntryId, pIdx, droppedEntries, entryBirthIdx), + ); + } + } + return { drops, rewrites }; + } + + /** + * Map from entryId to the pendingOps index where that entry is created + * (insertEntry or moveEntry's target). Entries not in the map are pre-staging + * acked entries that are already on the wire. + */ + private buildEntryBirthIndexMap(): Map { + const map = new Map(); + for (let i = 0; i < this.pendingOps.length; i++) { + const p = this.pendingOps[i]; + assert(p !== undefined, 0xcfe /* pendingOps index in range */); + if (p.type === OperationType.insertEntry) { + map.set(p.entryId, i); + } else if (p.type === OperationType.moveEntry && p.targetEntryId !== undefined) { + map.set(p.targetEntryId, i); + } + } + return map; + } + + /** + * Walk sharedArray backward from the given entry's position to find the nearest + * entry that will be on the wire when the rewritten op is submitted. A candidate + * qualifies when it is not in `droppedEntries`, not in `wireBlacklist`, and is + * either acked pre-staging (not in `entryBirthIdx`) or born earlier than this + * op in the same squash batch. Returns undefined when no qualifying predecessor + * exists, which means the rewrite anchors to the front. + */ + private resolveRewriteTarget( + anchorEntryId: string, + opPendingIdx: number, + droppedEntries: Set, + entryBirthIdx: Map, + ): string | undefined { + const idx = this.findIndexOfEntryId(anchorEntryId); + if (idx <= 0) { + return undefined; + } + for (let i = idx - 1; i >= 0; i--) { + const entry = this.sharedArray[i]; + if (entry === undefined) { + continue; + } + if (droppedEntries.has(entry.entryId)) { + continue; + } + if (this.wireBlacklist.has(entry.entryId)) { + // Dropped by an earlier squash batch — sharedArray still has the (deleted) + // entry locally, but it never reached peers. + continue; + } + const birthIdx = entryBirthIdx.get(entry.entryId); + if (birthIdx !== undefined && birthIdx >= opPendingIdx) { + // Predecessor's wire op is submitted later than this one; not yet on + // the wire at the time of submission. + continue; + } + return entry.entryId; + } + return undefined; + } + + /** + * Walk forward from the given insertEntry collecting its chain — insert + any + * `moveEntry` hops + a terminating `deleteEntry` or deleting `toggle`. Returns + * the chain ops and entryIds touched if the chain ends in a deleted state; + * undefined otherwise. Ignores forward dependencies; rewrite handling lives in + * the caller. + */ + private walkInsertChain( + insertOp: SharedArrayPendingOp, + ): { ops: SharedArrayPendingOp[]; entries: Set } | undefined { + const startIdx = this.pendingOps.indexOf(insertOp); + if (startIdx === -1) { + return undefined; + } + + let currentEntry = insertOp.entryId; + let isCurrentlyDeleted = false; + const ops: SharedArrayPendingOp[] = [insertOp]; + const entries = new Set([currentEntry]); + + for (let i = startIdx + 1; i < this.pendingOps.length; i++) { + const candidate = this.pendingOps[i]; + assert(candidate !== undefined, 0xcf9 /* pendingOps index in range */); + const sourceMatches = candidate.entryId === currentEntry; + const targetMatches = candidate.targetEntryId === currentEntry; + if (!sourceMatches && !targetMatches) { + continue; + } + if (candidate.type === OperationType.deleteEntry && sourceMatches) { + ops.push(candidate); + isCurrentlyDeleted = true; + continue; + } + if (candidate.type === OperationType.toggle && sourceMatches) { + ops.push(candidate); + isCurrentlyDeleted = (candidate.op as IToggleOperation).isDeleted; + continue; + } + if (candidate.type === OperationType.moveEntry && sourceMatches) { + ops.push(candidate); + assert( + candidate.targetEntryId !== undefined, + 0xcfa /* moveEntry pendingOp has target */, + ); + currentEntry = candidate.targetEntryId; + entries.add(currentEntry); + isCurrentlyDeleted = false; + continue; + } + // toggleMove or a move-into-chain rewires the skip list in ways the walker + // can't safely compose; bail. + return undefined; + } + + return isCurrentlyDeleted ? { ops, entries } : undefined; + } + /** * Tracks the doubly linked skip list for the given entry to identify local pending counter attribute. * It signifies if a local pending operation exists for the payload/value being tracked in the skip list @@ -618,7 +942,14 @@ export class SharedArrayClass throw new Error("Unknown operation"); } } - if (!local) { + if (local) { + // Pending ops are FIFO-consumed on local ack. Squash-drops splice their + // entries out earlier in {@link reSubmitSquashed}, so the shifted entry + // here always matches the op being acked. + this.pendingOps.shift(); + this.cachedSquashPlan = undefined; + this.stagingBoundaryIdx = undefined; + } else { this.emitValueChangedEvent(op, local); } } @@ -1024,6 +1355,6 @@ export class SharedArrayClass unreachableCase(op); } } - this.submitLocalMessage(op); + this.submitArrayOp(op); } } diff --git a/packages/dds/legacy-dds/src/signal/sharedSignal.ts b/packages/dds/legacy-dds/src/signal/sharedSignal.ts index 556c7b37fcc5..2ea6f702e287 100644 --- a/packages/dds/legacy-dds/src/signal/sharedSignal.ts +++ b/packages/dds/legacy-dds/src/signal/sharedSignal.ts @@ -132,6 +132,18 @@ export class SharedSignalClass */ protected onDisconnect(): void {} + /** + * Signals are fire-and-forget notifications. Each pending notify represents an intentional + * user event and is never superseded by a later op. Squash is therefore the identity transform. + * + * Caveat for staging mode: a notify made during staging will be transmitted on commit. Callers + * who use signals to carry sensitive metadata should avoid notifying inside a staging session + * unless the metadata is intended to be transmitted. + */ + protected override reSubmitSquashed(content: unknown, localOpMetadata: unknown): void { + this.reSubmitCore(content, localOpMetadata); + } + protected override processMessagesCore(messagesCollection: IRuntimeMessageCollection): void { const { envelope, local, messagesContent } = messagesCollection; for (const messageContent of messagesContent) { diff --git a/packages/dds/legacy-dds/src/test/array/sharedArraySquash.fuzz.spec.ts b/packages/dds/legacy-dds/src/test/array/sharedArraySquash.fuzz.spec.ts new file mode 100644 index 000000000000..73919fcba5a9 --- /dev/null +++ b/packages/dds/legacy-dds/src/test/array/sharedArraySquash.fuzz.spec.ts @@ -0,0 +1,261 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "node:assert"; +import * as path from "node:path"; + +import { TypedEventEmitter } from "@fluid-internal/client-utils"; +import { + type Generator, + createWeightedAsyncGenerator, + done, + takeAsync, +} from "@fluid-private/stochastic-test-utils"; +import { + type DDSFuzzHarnessEvents, + createSquashFuzzSuite, + type SquashFuzzModel, + type SquashFuzzTestState, +} from "@fluid-private/test-dds-utils"; +import type { IFluidHandle } from "@fluidframework/core-interfaces"; + +import type { ISharedArray, SerializableTypeForSharedArray } from "../../index.js"; +import { OperationType, SharedArrayFactory } from "../../index.js"; + +import { _dirname } from "./dirname.cjs"; + +type PoisonValue = SerializableTypeForSharedArray; + +interface AddPoisonedHandle { + type: "addPoisonedHandle"; + index: number; +} + +interface InsertString { + type: "insertString"; + index: number; + value: string; +} + +interface DeleteAt { + type: "deleteAt"; + index: number; +} + +interface MoveEntry { + type: "moveEntry"; + oldIndex: number; + newIndex: number; +} + +type SquashOperation = AddPoisonedHandle | InsertString | DeleteAt | MoveEntry; + +type SquashFactory = SharedArrayFactory; + +type TrackablePoisonedSharedArray = ISharedArray & { + poisonedEntryIds: Set; +}; + +function isTrackable( + channel: ISharedArray, +): channel is TrackablePoisonedSharedArray { + return (channel as TrackablePoisonedSharedArray).poisonedEntryIds !== undefined; +} + +function isPoisonedHandle(value: unknown): value is IFluidHandle & { poisoned: true } { + return ( + value !== null && + typeof value === "object" && + (value as { poisoned?: unknown }).poisoned === true + ); +} + +const eventEmitterForFuzzHarness = new TypedEventEmitter(); + +eventEmitterForFuzzHarness.on("clientCreate", (client) => { + const channel = client.channel as TrackablePoisonedSharedArray; + channel.poisonedEntryIds = new Set(); + channel.on("valueChanged", (op) => { + switch (op.type) { + case OperationType.insertEntry: { + // Poisoned-ness is tagged by reducer at addPoisonedHandle time; + // nothing to do here for non-poisoned inserts. + break; + } + case OperationType.deleteEntry: { + channel.poisonedEntryIds.delete(op.entryId); + break; + } + case OperationType.moveEntry: { + if (channel.poisonedEntryIds.has(op.entryId)) { + channel.poisonedEntryIds.delete(op.entryId); + channel.poisonedEntryIds.add(op.changedToEntryId); + } + break; + } + case OperationType.toggle: { + // Toggling a delete back to insert resurrects an entry; if it was poisoned, + // it still is. We don't track the reverse direction because the generator + // never targets poisoned entries with toggle ops. + break; + } + case OperationType.toggleMove: { + if (channel.poisonedEntryIds.has(op.changedToEntryId)) { + channel.poisonedEntryIds.delete(op.changedToEntryId); + channel.poisonedEntryIds.add(op.entryId); + } + break; + } + default: { + break; + } + } + }); +}); + +function makeSquashGenerator(): ( + state: SquashFuzzTestState, +) => Promise { + const insertOp = async ( + state: SquashFuzzTestState, + ): Promise => ({ + type: "insertString", + index: state.random.integer(0, Math.max(0, state.client.channel.get().length)), + value: state.random.string(state.random.integer(1, 5)), + }); + + const addPoisonedHandleOp = async ( + state: SquashFuzzTestState, + ): Promise => ({ + type: "addPoisonedHandle", + index: state.random.integer(0, Math.max(0, state.client.channel.get().length)), + }); + + const deleteOp = async (state: SquashFuzzTestState): Promise => ({ + type: "deleteAt", + index: state.random.integer(0, Math.max(0, state.client.channel.get().length - 1)), + }); + + const moveEntryOp = async ( + state: SquashFuzzTestState, + ): Promise => { + const len = state.client.channel.get().length; + return { + type: "moveEntry", + oldIndex: state.random.integer(0, Math.max(0, len - 1)), + newIndex: state.random.integer(0, Math.max(0, len)), + }; + }; + + const isInStagingMode = (state: SquashFuzzTestState): boolean => + state.client.stagingModeStatus === "staging"; + const hasEntries = (state: SquashFuzzTestState): boolean => + state.client.channel.get().length > 0; + + return createWeightedAsyncGenerator>([ + [insertOp, 6], + [addPoisonedHandleOp, 3, isInStagingMode], + [deleteOp, 3, hasEntries], + // moveEntry intentionally omitted: its skip-list rewiring composes with insert chains + // in ways that aren't yet covered by the chain walker. Tracked separately. + [moveEntryOp, 0, hasEntries], + ]); +} + +function makeExitingStagingModeGenerator(): Generator< + SquashOperation, + SquashFuzzTestState +> { + return (state): SquashOperation | typeof done => { + const channel = state.client.channel; + const values = channel.get(); + for (let i = 0; i < values.length; i++) { + if (isPoisonedHandle(values[i])) { + return { type: "deleteAt", index: i }; + } + } + return done; + }; +} + +function squashReducer(state: SquashFuzzTestState, op: SquashOperation): void { + const { client } = state; + assert(isTrackable(client.channel), "channel must be set up via clientCreate emitter"); + switch (op.type) { + case "insertString": { + client.channel.insert(op.index, op.value); + break; + } + case "addPoisonedHandle": { + const handle = state.random.poisonedHandle(); + const before = new Set(); + const captureEntryId = (eventOp: { type: number; entryId?: string }): void => { + if (eventOp.type === OperationType.insertEntry && eventOp.entryId !== undefined) { + before.add(eventOp.entryId); + } + }; + client.channel.on("valueChanged", captureEntryId); + try { + client.channel.insert(op.index, handle); + } finally { + client.channel.off("valueChanged", captureEntryId); + } + for (const entryId of before) { + client.channel.poisonedEntryIds.add(entryId); + } + break; + } + case "deleteAt": { + client.channel.delete(op.index); + break; + } + case "moveEntry": { + client.channel.move(op.oldIndex, op.newIndex); + break; + } + default: { + break; + } + } +} + +function validatePoisonedContentRemoved(client: { channel: ISharedArray }): void { + const values = client.channel.get(); + for (let i = 0; i < values.length; i++) { + assert( + !isPoisonedHandle(values[i]), + `Poisoned handle at index ${i} not removed before exiting staging mode`, + ); + } +} + +const squashModel: SquashFuzzModel = { + workloadName: "sharedArray squashing", + generatorFactory: () => takeAsync(60, makeSquashGenerator()), + reducer: squashReducer, + validateConsistency: async (a, b) => { + assert.deepEqual(a.channel.get(), b.channel.get()); + }, + factory: new SharedArrayFactory(), + exitingStagingModeGeneratorFactory: makeExitingStagingModeGenerator, + validatePoisonedContentRemoved, +}; + +describe("SharedArray squash fuzz", () => { + createSquashFuzzSuite(squashModel, { + validationStrategy: { type: "fixedInterval", interval: 10 }, + reconnectProbability: 0, + numberOfClients: 1, + clientJoinOptions: { + maxNumberOfClients: 1, + clientAddProbability: 0, + }, + detachedStartOptions: { numOpsBeforeAttach: 0 }, + defaultTestCount: 50, + saveFailures: { directory: path.join(_dirname, "../../src/test/results-squash") }, + emitter: eventEmitterForFuzzHarness, + stagingMode: { changeStagingModeProbability: 0.2 }, + }); +}); diff --git a/packages/dds/legacy-dds/src/test/array/sharedArraySquash.spec.ts b/packages/dds/legacy-dds/src/test/array/sharedArraySquash.spec.ts new file mode 100644 index 000000000000..2f2971b5635b --- /dev/null +++ b/packages/dds/legacy-dds/src/test/array/sharedArraySquash.spec.ts @@ -0,0 +1,187 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "node:assert"; + +import { enterStagingMode, reconnectAndSquash } from "@fluid-private/test-dds-utils"; +import type { IChannelFactory } from "@fluidframework/datastore-definitions/internal"; +import { + MockContainerRuntimeFactoryForReconnection, + type MockContainerRuntimeForReconnection, + MockFluidDataStoreRuntime, + MockStorage, +} from "@fluidframework/test-runtime-utils/internal"; + +import type { ISharedArray, ISharedArrayOperation } from "../../index.js"; +import { SharedArrayBuilder } from "../../index.js"; + +describe("SharedArray squash on resubmit", () => { + let containerRuntimeFactory: MockContainerRuntimeFactoryForReconnection; + let dataStoreRuntime1: MockFluidDataStoreRuntime; + let containerRuntime1: MockContainerRuntimeForReconnection; + let sharedArray1: ISharedArray; + let sharedArray2: ISharedArray; + let peerOps: ISharedArrayOperation[]; + let factory: IChannelFactory>; + + function createSharedArrayForSquash(id: string): { + array: ISharedArray; + dataStoreRuntime: MockFluidDataStoreRuntime; + containerRuntime: MockContainerRuntimeForReconnection; + } { + const dataStoreRuntime = new MockFluidDataStoreRuntime(); + dataStoreRuntime.local = false; + const containerRuntime = containerRuntimeFactory.createContainerRuntime(dataStoreRuntime); + const services = { + deltaConnection: containerRuntime.createDeltaConnection(), + objectStorage: new MockStorage(), + }; + const array = factory.create(dataStoreRuntime, id); + array.connect(services); + return { array, dataStoreRuntime, containerRuntime }; + } + + beforeEach(() => { + containerRuntimeFactory = new MockContainerRuntimeFactoryForReconnection(); + factory = SharedArrayBuilder().getFactory(); + const response1 = createSharedArrayForSquash("sharedArray1"); + sharedArray1 = response1.array; + dataStoreRuntime1 = response1.dataStoreRuntime; + containerRuntime1 = response1.containerRuntime; + const response2 = createSharedArrayForSquash("sharedArray2"); + sharedArray2 = response2.array; + peerOps = []; + sharedArray2.on("valueChanged", (op: ISharedArrayOperation) => { + peerOps.push(op); + }); + }); + + it("drops a single insertEntry whose value is deleted within the staging session", () => { + const secret = "SSN: 123-45-6789"; + + containerRuntime1.connected = false; + sharedArray1.insert(0, secret); + sharedArray1.delete(0); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.deepEqual([...sharedArray1.get()], []); + assert.deepEqual([...sharedArray2.get()], []); + assert.deepEqual( + peerOps, + [], + "peer must observe no ops; insert+delete pair should be dropped", + ); + }); + + it("drops every entry in a sequence of insert+delete pairs", () => { + containerRuntime1.connected = false; + sharedArray1.insert(0, "secret-A"); + sharedArray1.delete(0); + sharedArray1.insert(0, "secret-B"); + sharedArray1.delete(0); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.deepEqual([...sharedArray1.get()], []); + assert.deepEqual([...sharedArray2.get()], []); + assert.deepEqual(peerOps, []); + }); + + it("keeps an insertEntry whose entry remains live at commit time", () => { + containerRuntime1.connected = false; + sharedArray1.insert(0, "keep-me"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.deepEqual([...sharedArray1.get()], ["keep-me"]); + assert.deepEqual([...sharedArray2.get()], ["keep-me"]); + assert.equal(peerOps.length, 1); + assert.equal(peerOps[0]?.type, 0 /* insertEntry */); + }); + + it("drops only the squashable pair; keeps unrelated staged inserts", () => { + containerRuntime1.connected = false; + sharedArray1.insert(0, "live-1"); + sharedArray1.insert(1, "secret"); + sharedArray1.delete(1); + sharedArray1.insert(1, "live-2"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.deepEqual([...sharedArray1.get()], ["live-1", "live-2"]); + assert.deepEqual([...sharedArray2.get()], ["live-1", "live-2"]); + for (const op of peerOps) { + if ("value" in op) { + assert.notEqual(op.value, "secret", "staged secret must not leak"); + } + } + }); + + it("drops a move-chain ending in a delete", () => { + // Seed a non-staged entry so move targets a valid position. + sharedArray1.insert(0, "anchor"); + containerRuntimeFactory.processAllMessages(); + + containerRuntime1.connected = false; + sharedArray1.insert(1, "secret"); + // Move the staged entry to a different position (creates a new entryId for it). + sharedArray1.move(1, 0); + // Delete the moved value (now at index 0 after the move). + sharedArray1.delete(0); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.deepEqual([...sharedArray1.get()], ["anchor"]); + assert.deepEqual([...sharedArray2.get()], ["anchor"]); + for (const op of peerOps) { + if ("value" in op) { + assert.notEqual(op.value, "secret", "staged secret must not leak"); + } + } + }); + + it("does not drop a pre-staging insert", () => { + // Pre-staging insert lands while connected, then disconnect, then staged insert+delete. + sharedArray1.insert(0, "pre-staging"); + containerRuntime1.connected = false; + sharedArray1.insert(1, "secret"); + sharedArray1.delete(1); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.deepEqual([...sharedArray1.get()], ["pre-staging"]); + assert.deepEqual([...sharedArray2.get()], ["pre-staging"]); + for (const op of peerOps) { + if ("value" in op) { + assert.notEqual(op.value, "secret", "staged secret must not leak"); + } + } + }); + + it("delivers a staged delete of a pre-staging insert still in flight", () => { + // The pre-staging insert is submitted while connected (so it's in the runtime's + // pending queue, not yet ACKed) and the staged delete targets that same entry. + // computeSquashPlan must not consider the pre-staging insert as a chain root — + // otherwise the chain walker pulls the staged delete into drops, splices it from + // pendingOps, and the peer keeps the entry while the local view shows it deleted. + sharedArray1.insert(0, "pre-staging"); + enterStagingMode(containerRuntime1); + sharedArray1.delete(0); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.deepEqual( + [...sharedArray1.get()], + [], + "local view should reflect the staged delete", + ); + assert.deepEqual( + [...sharedArray2.get()], + [], + "peer should observe the staged delete and end up with an empty array", + ); + }); +}); diff --git a/packages/dds/map/src/directory.ts b/packages/dds/map/src/directory.ts index 01f7e7fa0f07..a5cc3506ca8f 100644 --- a/packages/dds/map/src/directory.ts +++ b/packages/dds/map/src/directory.ts @@ -217,6 +217,12 @@ interface PendingKeySet { path: string; value: unknown; subdir: SubDirectory; + /** + * Back-pointer to the {@link PendingKeyLifetime} that contains this keySet. Lets the + * squash path locate the containing lifetime in O(1) given just the keySet metadata, + * avoiding an O(K) `keysets.indexOf` scan inside the resubmit loop. + */ + lifetime: PendingKeyLifetime; } interface PendingKeyDelete { @@ -694,6 +700,41 @@ export class SharedDirectory handler.resubmit(message, localOpMetadata); } + /** + * Per-op squash. Three subsumption channels: + * + * - Storage ops (set/delete/clear): another storage op on the same key that supersedes + * this one (handled by {@link SubDirectory.dropIfSubsumedByLaterStorageOp}). + * - Storage ops on a pending-deleted-or-disposed subdir: the same helper splices any + * storage op whose owning subdir is no longer reachable in the optimistic view. + * - Subdir lifecycle ops (createSubDirectory / deleteSubDirectory): a staged + * `createSubDirectory(name)` immediately followed (in any order, with no surviving + * delete in between) by a `deleteSubDirectory(name)` cancels out, so neither op + * reaches the wire — this is the path that keeps user-supplied subdirectory names off + * the wire when the pair nets to no-op. + */ + protected override reSubmitSquashed( + content: unknown, + localOpMetadata: DirectoryLocalOpMetadata, + ): void { + const message = content as IDirectoryOperation; + if (message.type === "set" || message.type === "delete" || message.type === "clear") { + const storageMetadata = localOpMetadata as StorageLocalOpMetadata; + if (storageMetadata.subdir.dropIfSubsumedByLaterStorageOp(localOpMetadata)) { + return; + } + } else if ( + message.type === "createSubDirectory" || + message.type === "deleteSubDirectory" + ) { + const subdirMetadata = localOpMetadata as SubDirLocalOpMetadata; + if (subdirMetadata.parentSubdir.dropIfSubsumedSubdirOp(subdirMetadata)) { + return; + } + } + this.reSubmitCore(content, localOpMetadata); + } + /** * {@inheritDoc @fluidframework/shared-object-base#SharedObject.loadCore} */ @@ -1084,12 +1125,25 @@ export class SharedDirectory interface ICreateSubDirLocalOpMetadata { type: "createSubDir"; parentSubdir: SubDirectory; + /** + * Back-pointer to the {@link PendingSubDirectoryCreate} entry in + * `parentSubdir.pendingSubDirectoryData` this metadata originated from. Lets squash and + * resubmit identify the exact entry by reference rather than name+type, which is + * ambiguous when multiple same-name lifecycle ops are pending (e.g. delete→create→delete). + */ + pendingEntry: PendingSubDirectoryCreate; } interface IDeleteSubDirLocalOpMetadata { type: "deleteSubDir"; subDirectory: SubDirectory | undefined; parentSubdir: SubDirectory; + /** + * Back-pointer to the {@link PendingSubDirectoryDelete} entry in + * `parentSubdir.pendingSubDirectoryData` this metadata originated from. See + * {@link ICreateSubDirLocalOpMetadata.pendingEntry}. + */ + pendingEntry: PendingSubDirectoryDelete; } type SubDirLocalOpMetadata = ICreateSubDirLocalOpMetadata | IDeleteSubDirLocalOpMetadata; @@ -1256,6 +1310,7 @@ class SubDirectory extends TypedEventEmitter implements IDirec path: this.absolutePath, value, subdir: this, + lifetime: latestPendingEntry, }; latestPendingEntry.keySets.push(pendingKeySet); @@ -1345,7 +1400,7 @@ class SubDirectory extends TypedEventEmitter implements IDirec path: this.absolutePath, type: "createSubDirectory", }; - this.submitCreateSubDirectoryMessage(op); + this.submitCreateSubDirectoryMessage(op, pendingSubDirectoryCreate); } else { // If we are detached, don't submit the op and directly commit // the subdir to _sequencedSubdirectories. @@ -1422,7 +1477,11 @@ class SubDirectory extends TypedEventEmitter implements IDirec type: "deleteSubDirectory", path: this.absolutePath, }; - this.submitDeleteSubDirectoryMessage(op, previousOptimisticSubDirectory); + this.submitDeleteSubDirectoryMessage( + op, + previousOptimisticSubDirectory, + pendingSubdirDelete, + ); this.emit("subDirectoryDeleted", subdirName, true, this); // We don't want to fully dispose the subdir tree since this is only a pending // local delete. Instead we will only emit the dispose event to reflect the @@ -2285,6 +2344,115 @@ class SubDirectory extends TypedEventEmitter implements IDirec this.directory.submitDirectoryMessage(op, localOpMetadata); } + /** + * Per-op squash decision for a single staged storage op on this subdirectory. Locates the + * op's metadata in pendingStorageData, asks whether a later pending op subsumes it, and + * either drops it (splicing the tracking out, similar to a targeted rollback) or returns + * false so the caller can perform a normal resubmit. + * + * Subsumption rules match {@link MapKernel.tryResubmitSquashedMessage}. + * + * @returns True if the op was dropped (caller should not resubmit). False if it should be + * resubmitted normally. + */ + public dropIfSubsumedByLaterStorageOp(metadata: unknown): boolean { + // Identity-based reachability check: this op was queued against THIS specific + // SubDirectory instance. If the path no longer resolves to this exact instance — + // because a pending or sequenced delete removed it, or because the staging-mode + // squash already dropped its create+delete pair, or because another client's + // concurrent create has taken over the name with a different instance — the op + // must be dropped. Resubmitting it would land the value on the wrong subdir + // (e.g. a concurrently-sequenced one with the same path) and diverge clients. + const subdirRemoved = + this.disposed || this.directory.getWorkingDirectory(this.absolutePath) !== this; + const m = metadata as StorageLocalOpMetadata; + if (m.type === "set") { + // Fast path: PendingKeySet carries a back-pointer to its containing lifetime, so the + // tip check is O(1) (no scan of pendingStorageData and no O(K) keysets.indexOf). + const lifetime = m.lifetime; + const lastKeySet = lifetime.keySets[lifetime.keySets.length - 1]; + if (subdirRemoved || lastKeySet !== m) { + const keySetIdx = lifetime.keySets.indexOf(m); + assert( + keySetIdx !== -1, + 0xcfc /* keySet must be present in its back-pointed lifetime */, + ); + lifetime.keySets.splice(keySetIdx, 1); + if (lifetime.keySets.length === 0) { + const emptyLifetimeIdx = this.pendingStorageData.indexOf(lifetime); + if (emptyLifetimeIdx !== -1) { + this.pendingStorageData.splice(emptyLifetimeIdx, 1); + } + } + return true; + } + // Tip case: need pendingStorageData order to check for later entries. + const lifetimeIdx = this.pendingStorageData.indexOf(lifetime); + assert(lifetimeIdx !== -1, 0xd00 /* lifetime must be present in pendingStorageData */); + if (this.isStorageKeySetSubsumed(lifetime, lifetime.keySets.length - 1, lifetimeIdx)) { + lifetime.keySets.length -= 1; + if (lifetime.keySets.length === 0) { + this.pendingStorageData.splice(lifetimeIdx, 1); + } + return true; + } + return false; + } + // Standalone clear or delete. + const entryIdx = this.pendingStorageData.indexOf(m); + if (entryIdx === -1) { + return false; + } + if (subdirRemoved || this.isStandaloneStorageEntrySubsumed(m, entryIdx)) { + this.pendingStorageData.splice(entryIdx, 1); + return true; + } + return false; + } + + private isStandaloneStorageEntrySubsumed( + entry: PendingClear | PendingKeyDelete, + entryIdx: number, + ): boolean { + for (let j = entryIdx + 1; j < this.pendingStorageData.length; j++) { + const later = this.pendingStorageData[j]; + assert( + later !== undefined, + 0xcff /* pendingStorageData entry must exist within bounds */, + ); + if (later.type === "clear") { + return true; + } + if (entry.type === "delete" && later.key === entry.key) { + return true; + } + } + return false; + } + + private isStorageKeySetSubsumed( + lifetime: PendingKeyLifetime, + keySetIdx: number, + lifetimeIdx: number, + ): boolean { + // A later keySet in the same lifetime always subsumes (same key, same logical region). + if (keySetIdx < lifetime.keySets.length - 1) { + return true; + } + // Otherwise, any later pendingStorageData entry that affects this key subsumes. + for (let j = lifetimeIdx + 1; j < this.pendingStorageData.length; j++) { + const later = this.pendingStorageData[j]; + assert( + later !== undefined, + 0xcfb /* pendingStorageData entry must exist within bounds */, + ); + if (later.type === "clear" || later.key === lifetime.key) { + return true; + } + } + return false; + } + /** * Submit a key message to remote clients based on a previous submit. * @param op - The map key message @@ -2314,10 +2482,14 @@ class SubDirectory extends TypedEventEmitter implements IDirec * Submit a create subdirectory operation. * @param op - The operation */ - private submitCreateSubDirectoryMessage(op: IDirectorySubDirectoryOperation): void { + private submitCreateSubDirectoryMessage( + op: IDirectorySubDirectoryOperation, + pendingEntry: PendingSubDirectoryCreate, + ): void { const localOpMetadata: ICreateSubDirLocalOpMetadata = { type: "createSubDir", parentSubdir: this, + pendingEntry, }; this.directory.submitDirectoryMessage(op, localOpMetadata); } @@ -2330,56 +2502,101 @@ class SubDirectory extends TypedEventEmitter implements IDirec private submitDeleteSubDirectoryMessage( op: IDirectorySubDirectoryOperation, subDir: SubDirectory, + pendingEntry: PendingSubDirectoryDelete, ): void { const localOpMetadata: IDeleteSubDirLocalOpMetadata = { type: "deleteSubDir", subDirectory: subDir, parentSubdir: this, + pendingEntry, }; this.directory.submitDirectoryMessage(op, localOpMetadata); } /** - * Submit a subdirectory operation again - * @param op - The operation - * @param localOpMetadata - metadata submitted with the op originally - */ + * Decide whether a staged subdirectory lifecycle op should be dropped from the squash + * output. The goal is to prevent user-supplied `subdirName` strings from reaching the + * wire when the staging-session ops cancel out — i.e. when a `createSubDirectory(name)` + * is immediately followed (in pendingSubDirectoryData) by a `deleteSubDirectory(name)`. + * + * Reference-identity over the originating pendingEntry is load-bearing: name+type alone + * mis-pairs same-name lifecycle cycles like `delete→create→delete` and can splice a + * pre-staging in-flight entry instead of the staged sibling. Callers pass the metadata + * the runtime handed back; the back-pointer to the originating pendingSubDirectoryData + * entry resolves which op is being squashed without ambiguity. + * + * Subsumption rule (per-op, mirroring `MapKernel.dropIfSubsumed`): + * + * - For a staged `createSubDirectory(name)`: walk forward from its entry. If the first + * later same-name entry is a `deleteSubDirectory(name)`, the create+delete pair nets + * to no-op; splice both so neither op reaches the wire and the matching delete call + * sees its entry gone. + * - For a staged `deleteSubDirectory(name)`: stand. It either targets a pre-existing + * subdir (no leak — the name was on the wire pre-staging) or was already spliced as + * part of a paired create (handled below). + * - For either op type: if the pendingEntry is no longer in pendingSubDirectoryData, it + * was spliced by an earlier squash decision; drop. + */ + public dropIfSubsumedSubdirOp(localOpMetadata: SubDirLocalOpMetadata): boolean { + const pendingEntry = localOpMetadata.pendingEntry; + const entryIdx = this.pendingSubDirectoryData.indexOf(pendingEntry); + if (entryIdx === -1) { + // Already spliced by a prior paired-create decision in this squash batch. + return true; + } + if (pendingEntry.type !== "createSubDirectory") { + // deleteSubDirectory stands. Only paired-create can subsume it (handled above by + // finding the entry already spliced). + return false; + } + // Look for the first later entry on the same name. If it's a delete, the pair cancels. + for (let i = entryIdx + 1; i < this.pendingSubDirectoryData.length; i++) { + const later = this.pendingSubDirectoryData[i]; + assert(later !== undefined, "pendingSubDirectoryData entry must exist within bounds"); + if (later.subdirName !== pendingEntry.subdirName) { + continue; + } + if (later.type === "deleteSubDirectory") { + // Splice the later delete first to keep the create's index stable. + this.pendingSubDirectoryData.splice(i, 1); + this.pendingSubDirectoryData.splice(entryIdx, 1); + return true; + } + // A same-name `createSubDirectory` later would require an intervening delete, but + // that delete would have appeared first in this loop. Reaching here means the data + // model has been violated. + assert(false, "createSubDirectory cannot follow createSubDirectory without a delete"); + } + return false; + } + public resubmitSubDirectoryMessage( op: IDirectorySubDirectoryOperation, localOpMetadata: SubDirLocalOpMetadata, ): void { - // Only submit the op, if we have record for it, otherwise it is possible that the older instance - // is already deleted, in which case we don't need to submit the op. + // Identify the originating pending entry by reference so we never confuse same-name + // lifecycle ops (e.g. delete→create→delete) with each other. If the entry has been + // spliced (ACKed or dropped via squash), there's nothing to resubmit. + if (!this.pendingSubDirectoryData.includes(localOpMetadata.pendingEntry)) { + return; + } if (localOpMetadata.type === "createSubDir") { - // For create operations, look specifically for createSubDirectory entries - const pendingEntry = findLast( - this.pendingSubDirectoryData, - (entry) => entry.subdirName === op.subdirName && entry.type === "createSubDirectory", - ); - if (pendingEntry !== undefined) { - assert( - pendingEntry.type === "createSubDirectory", - 0xc33 /* pending entry should be createSubDirectory */, - ); - // We should add the client id, since when reconnecting it can have a different client id. - pendingEntry.subdir.clientIds.add(this.runtime.clientId ?? "detached"); - // We also need to undelete the subdirectory tree if it was previously deleted - this.undisposeSubdirectoryTree(pendingEntry.subdir); - this.submitCreateSubDirectoryMessage(op); - } - } else if (localOpMetadata.type === "deleteSubDir") { + const pendingEntry = localOpMetadata.pendingEntry; + // We should add the client id, since when reconnecting it can have a different client id. + pendingEntry.subdir.clientIds.add(this.runtime.clientId ?? "detached"); + // We also need to undelete the subdirectory tree if it was previously deleted + this.undisposeSubdirectoryTree(pendingEntry.subdir); + this.submitCreateSubDirectoryMessage(op, pendingEntry); + } else { assert( localOpMetadata.subDirectory !== undefined, 0xc34 /* Subdirectory should exist */, ); - // For delete operations, look specifically for deleteSubDirectory entries - const pendingEntry = findLast( - this.pendingSubDirectoryData, - (entry) => entry.subdirName === op.subdirName && entry.type === "deleteSubDirectory", + this.submitDeleteSubDirectoryMessage( + op, + localOpMetadata.subDirectory, + localOpMetadata.pendingEntry, ); - if (pendingEntry !== undefined) { - this.submitDeleteSubDirectoryMessage(op, localOpMetadata.subDirectory); - } } } diff --git a/packages/dds/map/src/map.ts b/packages/dds/map/src/map.ts index a6c4ab57b938..6bb0f3dddf06 100644 --- a/packages/dds/map/src/map.ts +++ b/packages/dds/map/src/map.ts @@ -278,6 +278,15 @@ export class SharedMap extends SharedObject implements IShared this.kernel.tryResubmitMessage(content as IMapOperation, localOpMetadata); } + /** + * Per-op squash: for each staged pending op the runtime hands us, either drop it (if a later + * pending op subsumes it — same key set/deleted later, or covered by a later clear) or + * resubmit it unchanged. Pre-staging ops in flight are never touched. + */ + protected override reSubmitSquashed(content: unknown, localOpMetadata: unknown): void { + this.kernel.tryResubmitSquashedMessage(content as IMapOperation, localOpMetadata); + } + /** * {@inheritDoc @fluidframework/shared-object-base#SharedObjectCore.applyStashedOp} */ diff --git a/packages/dds/map/src/mapKernel.ts b/packages/dds/map/src/mapKernel.ts index af62cd315006..258a078e2dd5 100644 --- a/packages/dds/map/src/mapKernel.ts +++ b/packages/dds/map/src/mapKernel.ts @@ -74,6 +74,12 @@ export type IMapDataObjectSerialized = Record; interface PendingKeySet { type: "set"; value: ILocalValue; + /** + * Back-pointer to the {@link PendingKeyLifetime} that contains this keySet. Lets the + * squash path locate the containing lifetime in O(1) given just the keySet metadata, + * avoiding an O(K) `keysets.indexOf` scan inside the resubmit loop. + */ + lifetime: PendingKeyLifetime; } interface PendingKeyDelete { @@ -425,6 +431,7 @@ export class MapKernel { const pendingKeySet: PendingKeySet = { type: "set", value: localValue, + lifetime: latestPendingEntry, }; latestPendingEntry.keySets.push(pendingKeySet); @@ -580,6 +587,125 @@ export class MapKernel { return true; } + /** + * Per-op squash decision for a single staged pending change. Locates the change's metadata + * in pendingData, asks whether a later pending change subsumes it, and either drops it + * (splicing the tracking out of pendingData, similar to a targeted rollback) or resubmits + * it unchanged. + * + * Subsumption rules: + * + * - A `clear` is subsumed by any later `clear`. + * - A `delete` for key `k` is subsumed by any later `clear`, `delete` for `k`, or + * `lifetime` for `k`. + * - A `set` for key `k` (a keySet inside a lifetime) is subsumed by any later keySet in the + * same lifetime, or by any later `clear`, `delete` for `k`, or `lifetime` for `k`. + * + * @returns True if the op was handled (dropped or resubmitted), false if no handler is + * registered for the op type. + */ + public tryResubmitSquashedMessage(op: IMapOperation, localOpMetadata: unknown): boolean { + const handler = this.messageHandlers.get(op.type); + if (handler === undefined) { + return false; + } + if (this.dropIfSubsumed(localOpMetadata)) { + return true; + } + handler.resubmit(op, localOpMetadata as PendingLocalOpMetadata); + return true; + } + + /** + * If the pending op identified by `metadata` is subsumed by a later pending op in + * `pendingData`, splice its tracking out of pendingData and return true. Otherwise return + * false and leave pendingData untouched. + * + * Removing the tracking is necessary because the runtime drops the op from its own pending + * queue when we choose not to resubmit; without the cleanup, the kernel would later try to + * match an ACK that will never arrive. + */ + private dropIfSubsumed(metadata: unknown): boolean { + const m = metadata as PendingLocalOpMetadata; + if (m.type === "set") { + // Fast path: PendingKeySet carries a back-pointer to its containing lifetime, so the + // tip check is O(1) (no scan of pendingData and no O(K) keysets.indexOf). + const lifetime = m.lifetime; + const lastKeySet = lifetime.keySets[lifetime.keySets.length - 1]; + if (lastKeySet !== m) { + // Not the lifetime's tip — strictly subsumed by a later keySet on this key. + const keySetIdx = lifetime.keySets.indexOf(m); + assert( + keySetIdx !== -1, + 0xd02 /* keySet must be present in its back-pointed lifetime */, + ); + lifetime.keySets.splice(keySetIdx, 1); + return true; + } + // Tip case: need pendingData order to check for later entries on this key (delete, + // clear, or another lifetime). The lifetime's position is still found by linear scan, + // but only when the cheap tip check above didn't decide it. + const lifetimeIdx = this.pendingData.indexOf(lifetime); + assert(lifetimeIdx !== -1, 0xd03 /* lifetime must be present in pendingData */); + if (this.isKeySetSubsumed(lifetime, lifetime.keySets.length - 1, lifetimeIdx)) { + lifetime.keySets.length -= 1; + if (lifetime.keySets.length === 0) { + this.pendingData.splice(lifetimeIdx, 1); + } + return true; + } + return false; + } + // Standalone clear or delete — these *are* pendingData entries, so indexOf gives the + // position directly without scanning lifetime keysets. + const entryIdx = this.pendingData.indexOf(m); + if (entryIdx === -1) { + return false; + } + if (this.isStandaloneEntrySubsumed(m, entryIdx)) { + this.pendingData.splice(entryIdx, 1); + return true; + } + return false; + } + + private isStandaloneEntrySubsumed( + entry: PendingClear | PendingKeyDelete, + entryIdx: number, + ): boolean { + for (let j = entryIdx + 1; j < this.pendingData.length; j++) { + const later = this.pendingData[j]; + assert(later !== undefined, 0xd04 /* pendingData entry must exist within bounds */); + if (later.type === "clear") { + return true; + } + if (entry.type === "delete" && later.key === entry.key) { + return true; + } + } + return false; + } + + private isKeySetSubsumed( + lifetime: PendingKeyLifetime, + keySetIdx: number, + lifetimeIdx: number, + ): boolean { + // A later keySet in the same lifetime always subsumes (same key, same logical region). + if (keySetIdx < lifetime.keySets.length - 1) { + return true; + } + // Otherwise, any later pendingData entry that affects this key subsumes. + for (let j = lifetimeIdx + 1; j < this.pendingData.length; j++) { + const later = this.pendingData[j]; + assert(later !== undefined, 0xd05 /* pendingData entry must exist within bounds */); + if (later.type === "clear" || later.key === lifetime.key) { + return true; + } + } + return false; + } + public tryApplyStashedOp(op: IMapOperation): void { switch (op.type) { case "clear": { diff --git a/packages/dds/map/src/test/mocha/directory.squash.fuzz.spec.ts b/packages/dds/map/src/test/mocha/directory.squash.fuzz.spec.ts new file mode 100644 index 000000000000..9dce3143a1d0 --- /dev/null +++ b/packages/dds/map/src/test/mocha/directory.squash.fuzz.spec.ts @@ -0,0 +1,295 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "node:assert"; +import * as path from "node:path"; + +import { TypedEventEmitter } from "@fluid-internal/client-utils"; +import { + type Generator, + createWeightedAsyncGenerator, + done, + takeAsync, +} from "@fluid-private/stochastic-test-utils"; +import { + type DDSFuzzHarnessEvents, + type SquashFuzzModel, + type SquashFuzzTestState, + createSquashFuzzSuite, +} from "@fluid-private/test-dds-utils"; +import { isFluidHandle } from "@fluidframework/runtime-utils/internal"; + +import { DirectoryFactory, type IDirectory } from "../../index.js"; + +import { _dirname } from "./dirname.cjs"; + +interface DirSetPoisonedKey { + type: "setPoisonedKey"; + path: string; + key: string; +} + +interface DirSetKey { + type: "setKey"; + path: string; + key: string; + value: string | number; +} + +interface DirDeleteKey { + type: "deleteKey"; + path: string; + key: string; +} + +interface DirClear { + type: "clear"; + path: string; +} + +interface DirCreateSub { + type: "createSub"; + parentPath: string; + name: string; +} + +interface DirDeleteSub { + type: "deleteSub"; + parentPath: string; + name: string; +} + +type SquashOp = + | DirSetPoisonedKey + | DirSetKey + | DirDeleteKey + | DirClear + | DirCreateSub + | DirDeleteSub; + +type SquashFactory = DirectoryFactory; +type FuzzState = SquashFuzzTestState; + +const keyPool = ["k0", "k1", "k2"]; +const subdirPool = ["s0", "s1", "s2"]; + +function isPoisonedHandle(value: unknown): boolean { + return ( + isFluidHandle(value) && (value as unknown as { poisoned?: unknown }).poisoned === true + ); +} + +function pickExistingPath(state: FuzzState): string { + const { random, client } = state; + let cur: IDirectory = client.channel; + for (;;) { + const subs: IDirectory[] = []; + for (const [, sub] of cur.subdirectories()) { + subs.push(sub); + } + const choice = random.pick([undefined, ...subs]); + if (choice === undefined) { + return cur.absolutePath; + } + cur = choice; + } +} + +function makeGenerator(): (state: FuzzState) => Promise { + const isInStaging = (state: FuzzState): boolean => + state.client.stagingModeStatus === "staging"; + + const setKey = async (state: FuzzState): Promise => ({ + type: "setKey", + path: pickExistingPath(state), + key: state.random.pick(keyPool), + value: state.random.pick([ + (): string => state.random.string(state.random.integer(1, 4)), + (): number => state.random.integer(0, 100), + ])(), + }); + + const setPoisoned = async (state: FuzzState): Promise => ({ + type: "setPoisonedKey", + path: pickExistingPath(state), + key: state.random.pick(keyPool), + }); + + const deleteKey = async (state: FuzzState): Promise => ({ + type: "deleteKey", + path: pickExistingPath(state), + key: state.random.pick(keyPool), + }); + + const clear = async (state: FuzzState): Promise => ({ + type: "clear", + path: pickExistingPath(state), + }); + + const createSub = async (state: FuzzState): Promise => ({ + type: "createSub", + parentPath: pickExistingPath(state), + name: state.random.pick(subdirPool), + }); + + const deleteSub = async (state: FuzzState): Promise => ({ + type: "deleteSub", + parentPath: pickExistingPath(state), + name: state.random.pick(subdirPool), + }); + + return createWeightedAsyncGenerator([ + [setKey, 6], + [setPoisoned, 4, isInStaging], + [deleteKey, 3], + [clear, 1], + [createSub, 3], + [deleteSub, 2], + ]); +} + +function findFirstPoisoned(dir: IDirectory): { path: string; key: string } | undefined { + for (const [key, value] of dir.entries()) { + if (isPoisonedHandle(value)) { + return { path: dir.absolutePath, key }; + } + } + for (const [, sub] of dir.subdirectories()) { + const found = findFirstPoisoned(sub); + if (found !== undefined) { + return found; + } + } + return undefined; +} + +function makeExitingGenerator(): Generator { + return (state): SquashOp | typeof done => { + const found = findFirstPoisoned(state.client.channel); + if (found === undefined) { + return done; + } + return { type: "deleteKey", path: found.path, key: found.key }; + }; +} + +function reducer(state: FuzzState, op: SquashOp): void { + const { client } = state; + const root = client.channel; + switch (op.type) { + case "setKey": { + const dir = root.getWorkingDirectory(op.path); + if (dir !== undefined) { + dir.set(op.key, op.value); + } + break; + } + case "setPoisonedKey": { + const dir = root.getWorkingDirectory(op.path); + if (dir !== undefined) { + dir.set(op.key, state.random.poisonedHandle()); + } + break; + } + case "deleteKey": { + const dir = root.getWorkingDirectory(op.path); + if (dir !== undefined) { + dir.delete(op.key); + } + break; + } + case "clear": { + const dir = root.getWorkingDirectory(op.path); + if (dir !== undefined) { + dir.clear(); + } + break; + } + case "createSub": { + const parent = root.getWorkingDirectory(op.parentPath); + if (parent !== undefined) { + parent.createSubDirectory(op.name); + } + break; + } + case "deleteSub": { + const parent = root.getWorkingDirectory(op.parentPath); + if (parent?.hasSubDirectory(op.name) === true) { + parent.deleteSubDirectory(op.name); + } + break; + } + default: { + break; + } + } +} + +function assertNoPoisonContent(dir: IDirectory): void { + for (const [key, value] of dir.entries()) { + assert( + !isPoisonedHandle(value), + `Poisoned handle at ${dir.absolutePath}/${key} not removed before exiting staging`, + ); + } + for (const [, sub] of dir.subdirectories()) { + assertNoPoisonContent(sub); + } +} + +const squashModel: SquashFuzzModel = { + workloadName: "directory squashing", + generatorFactory: () => takeAsync(60, makeGenerator()), + reducer, + validateConsistency: async (a, b) => { + const compare = (da: IDirectory, db: IDirectory): void => { + assert.equal(da.size, db.size); + for (const [key, vA] of da.entries()) { + const vB: unknown = db.get(key); + if (isFluidHandle(vA)) { + assert(isFluidHandle(vB)); + } else { + assert.equal(vA, vB); + } + } + const subsA: string[] = []; + const subsB: string[] = []; + for (const [n] of da.subdirectories()) subsA.push(n); + for (const [n] of db.subdirectories()) subsB.push(n); + subsA.sort(); + subsB.sort(); + assert.deepEqual(subsA, subsB); + for (const name of subsA) { + const subA = da.getSubDirectory(name); + const subB = db.getSubDirectory(name); + assert(subA !== undefined && subB !== undefined); + compare(subA, subB); + } + }; + compare(a.channel, b.channel); + }, + factory: new DirectoryFactory(), + exitingStagingModeGeneratorFactory: makeExitingGenerator, + validatePoisonedContentRemoved: (client) => assertNoPoisonContent(client.channel), +}; + +const emitter = new TypedEventEmitter(); + +describe("SharedDirectory squash fuzz", () => { + createSquashFuzzSuite(squashModel, { + validationStrategy: { type: "fixedInterval", interval: 10 }, + reconnectProbability: 0.1, + numberOfClients: 3, + clientJoinOptions: { + maxNumberOfClients: 4, + clientAddProbability: 0.05, + }, + detachedStartOptions: { numOpsBeforeAttach: 0 }, + defaultTestCount: 50, + saveFailures: { directory: path.join(_dirname, "../../src/test/results-squash-dir") }, + emitter, + stagingMode: { changeStagingModeProbability: 0.15 }, + }); +}); diff --git a/packages/dds/map/src/test/mocha/map.squash.fuzz.spec.ts b/packages/dds/map/src/test/mocha/map.squash.fuzz.spec.ts new file mode 100644 index 000000000000..122f9b31e835 --- /dev/null +++ b/packages/dds/map/src/test/mocha/map.squash.fuzz.spec.ts @@ -0,0 +1,179 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "node:assert"; +import * as path from "node:path"; + +import { TypedEventEmitter } from "@fluid-internal/client-utils"; +import { + type Generator, + createWeightedAsyncGenerator, + done, + takeAsync, +} from "@fluid-private/stochastic-test-utils"; +import { + type DDSFuzzHarnessEvents, + type SquashFuzzModel, + type SquashFuzzTestState, + createSquashFuzzSuite, +} from "@fluid-private/test-dds-utils"; +import { isFluidHandle } from "@fluidframework/runtime-utils/internal"; + +import { MapFactory } from "../../index.js"; + +import { _dirname } from "./dirname.cjs"; + +interface SetPoisonedKey { + type: "setPoisonedKey"; + key: string; +} + +interface SetKey { + type: "setKey"; + key: string; + value: string | number; +} + +interface DeleteKey { + type: "deleteKey"; + key: string; +} + +interface ClearMap { + type: "clear"; +} + +type SquashOperation = SetPoisonedKey | SetKey | DeleteKey | ClearMap; + +type SquashFactory = MapFactory; +type FuzzState = SquashFuzzTestState; + +const keyPool = ["k0", "k1", "k2", "k3"]; + +function isPoisonedHandle(value: unknown): boolean { + return ( + isFluidHandle(value) && (value as unknown as { poisoned?: unknown }).poisoned === true + ); +} + +function makeGenerator(): (state: FuzzState) => Promise { + const isInStaging = (state: FuzzState): boolean => + state.client.stagingModeStatus === "staging"; + + const setPoisoned = async (state: FuzzState): Promise => ({ + type: "setPoisonedKey", + key: state.random.pick(keyPool), + }); + + const setKey = async (state: FuzzState): Promise => ({ + type: "setKey", + key: state.random.pick(keyPool), + value: state.random.pick([ + (): string => state.random.string(state.random.integer(1, 4)), + (): number => state.random.integer(0, 100), + ])(), + }); + + const deleteKey = async (state: FuzzState): Promise => ({ + type: "deleteKey", + key: state.random.pick(keyPool), + }); + + const clear = async (): Promise => ({ type: "clear" }); + + return createWeightedAsyncGenerator([ + [setKey, 6], + [setPoisoned, 4, isInStaging], + [deleteKey, 3], + [clear, 1], + ]); +} + +function makeExitingGenerator(): Generator { + return (state): SquashOperation | typeof done => { + const channel = state.client.channel; + for (const [key, value] of channel.entries()) { + if (isPoisonedHandle(value)) { + return { type: "deleteKey", key }; + } + } + return done; + }; +} + +function reducer(state: FuzzState, op: SquashOperation): void { + const { client } = state; + switch (op.type) { + case "setKey": { + client.channel.set(op.key, op.value); + break; + } + case "setPoisonedKey": { + client.channel.set(op.key, state.random.poisonedHandle()); + break; + } + case "deleteKey": { + client.channel.delete(op.key); + break; + } + case "clear": { + client.channel.clear(); + break; + } + default: { + break; + } + } +} + +function validatePoisonedContentRemoved(client: { + channel: ReturnType; +}): void { + for (const [key, value] of client.channel.entries()) { + assert( + !isPoisonedHandle(value), + `Poisoned handle at key "${key}" not removed before exiting staging mode`, + ); + } +} + +const squashModel: SquashFuzzModel = { + workloadName: "map squashing", + generatorFactory: () => takeAsync(60, makeGenerator()), + reducer, + validateConsistency: async (a, b) => { + assert.equal(a.channel.size, b.channel.size); + for (const [key, valueA] of a.channel.entries()) { + const valueB: unknown = b.channel.get(key); + if (isFluidHandle(valueA)) { + assert(isFluidHandle(valueB)); + } else { + assert.equal(valueA, valueB); + } + } + }, + factory: new MapFactory(), + exitingStagingModeGeneratorFactory: makeExitingGenerator, + validatePoisonedContentRemoved, +}; + +const emitter = new TypedEventEmitter(); + +describe("SharedMap squash fuzz", () => { + createSquashFuzzSuite(squashModel, { + validationStrategy: { type: "fixedInterval", interval: 10 }, + reconnectProbability: 0.1, + numberOfClients: 3, + clientJoinOptions: { + maxNumberOfClients: 4, + clientAddProbability: 0.05, + }, + detachedStartOptions: { numOpsBeforeAttach: 0 }, + defaultTestCount: 50, + saveFailures: { directory: path.join(_dirname, "../../src/test/results-squash-map") }, + emitter, + stagingMode: { changeStagingModeProbability: 0.15 }, + }); +}); diff --git a/packages/dds/map/src/test/mocha/squash.spec.ts b/packages/dds/map/src/test/mocha/squash.spec.ts new file mode 100644 index 000000000000..d9e23e739734 --- /dev/null +++ b/packages/dds/map/src/test/mocha/squash.spec.ts @@ -0,0 +1,578 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "node:assert"; + +import { enterStagingMode, reconnectAndSquash } from "@fluid-private/test-dds-utils"; +import { + MockContainerRuntimeFactoryForReconnection, + type MockContainerRuntimeForReconnection, + MockFluidDataStoreRuntime, + MockStorage, +} from "@fluidframework/test-runtime-utils/internal"; + +import { + type ISharedDirectory, + type ISharedMap, + SharedDirectory, + SharedMap, +} from "../../index.js"; + +interface KeyChange { + key: string; + previousValue: unknown; + newValue: unknown; +} + +describe("SharedMap squash on resubmit", () => { + let containerRuntimeFactory: MockContainerRuntimeFactoryForReconnection; + let dataStoreRuntime1: MockFluidDataStoreRuntime; + let containerRuntime1: MockContainerRuntimeForReconnection; + let map1: ISharedMap; + let map2: ISharedMap; + let peerChanges: KeyChange[]; + let peerClears: number; + + beforeEach("createMapsForSquash", () => { + containerRuntimeFactory = new MockContainerRuntimeFactoryForReconnection(); + const factory = SharedMap.getFactory(); + + dataStoreRuntime1 = new MockFluidDataStoreRuntime(); + containerRuntime1 = containerRuntimeFactory.createContainerRuntime(dataStoreRuntime1); + map1 = factory.create(dataStoreRuntime1, "map1"); + map1.connect({ + deltaConnection: dataStoreRuntime1.createDeltaConnection(), + objectStorage: new MockStorage(), + }); + + const dataStoreRuntime2 = new MockFluidDataStoreRuntime(); + containerRuntimeFactory.createContainerRuntime(dataStoreRuntime2); + map2 = factory.create(dataStoreRuntime2, "map2"); + map2.connect({ + deltaConnection: dataStoreRuntime2.createDeltaConnection(), + objectStorage: new MockStorage(), + }); + + peerChanges = []; + peerClears = 0; + map2.on("valueChanged", (changed, local) => { + if (!local) { + peerChanges.push({ + key: changed.key, + previousValue: changed.previousValue, + newValue: map2.get(changed.key), + }); + } + }); + map2.on("clear", (local) => { + if (!local) { + peerClears++; + } + }); + }); + + it("drops intermediate set when a later delete supersedes it on the same key", () => { + const secret = "SSN: 123-45-6789"; + containerRuntime1.connected = false; + map1.set("k1", secret); + map1.delete("k1"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(map1.get("k1"), undefined); + assert.equal(map2.get("k1"), undefined); + // k1 didn't exist on the peer before the squash, so the delete is harmless either way. + // What matters: the secret must never have appeared as a newValue on the peer. + for (const change of peerChanges) { + assert.notEqual(change.newValue, secret, "secret must never leak through squash"); + } + }); + + it("collapses set-then-set to a single set with the final value", () => { + const secret = "intermediate-secret"; + const finalValue = "final"; + containerRuntime1.connected = false; + map1.set("k1", secret); + map1.set("k1", finalValue); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(map2.get("k1"), finalValue); + assert.equal(peerChanges.length, 1, "peer should see exactly one valueChanged for k1"); + assert.equal(peerChanges[0]?.key, "k1"); + assert.equal(peerChanges[0]?.newValue, finalValue); + for (const change of peerChanges) { + assert.notEqual(change.newValue, secret); + } + }); + + it("squashes independent keys to one op per key (LWW)", () => { + containerRuntime1.connected = false; + map1.set("a", "a0"); + map1.set("b", "b0"); + map1.set("a", "a1"); + map1.set("c", "c0"); + map1.set("a", "a2"); + map1.delete("b"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(map2.get("a"), "a2"); + assert.equal(map2.get("b"), undefined); + assert.equal(map2.get("c"), "c0"); + + // One event per key: a's final value, b's speculative delete (matches existing semantics + // where deletes are sent even when locally nothing changes), c's final value. + const observedKeys = peerChanges.map((c) => c.key).sort(); + assert.deepEqual(observedKeys, ["a", "b", "c"]); + assert.equal(peerChanges.find((c) => c.key === "a")?.newValue, "a2"); + assert.equal(peerChanges.find((c) => c.key === "b")?.newValue, undefined); + assert.equal(peerChanges.find((c) => c.key === "c")?.newValue, "c0"); + // None of the intermediate values should have surfaced. + for (const change of peerChanges) { + assert.notEqual(change.newValue, "a0"); + assert.notEqual(change.newValue, "a1"); + assert.notEqual(change.newValue, "b0"); + } + }); + + it("drops set-then-set-then-set chains; intermediate values never appear on the wire", () => { + containerRuntime1.connected = false; + map1.set("k", "v1"); + map1.set("k", "v2"); + map1.set("k", "v3"); + map1.set("k", "v4"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(map2.get("k"), "v4"); + const observedValues = peerChanges.map((c) => c.newValue); + assert.deepEqual(observedValues, ["v4"]); + }); + + it("emits a clear when one occurred during staging", () => { + // Pre-populate map2 via map1 with content the squashed clear should remove on the peer. + map1.set("seed", "value"); + containerRuntimeFactory.processAllMessages(); + assert.equal(map2.get("seed"), "value"); + // Reset peer observations after pre-population. + peerChanges = []; + peerClears = 0; + + containerRuntime1.connected = false; + map1.set("staging-set", "leaked"); + map1.clear(); + map1.set("after-clear", "kept"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(peerClears, 1, "peer should observe exactly one clear"); + assert.equal(map2.get("seed"), undefined); + assert.equal(map2.get("staging-set"), undefined); + assert.equal(map2.get("after-clear"), "kept"); + for (const change of peerChanges) { + assert.notEqual(change.newValue, "leaked", "pre-clear staged value must not leak"); + } + }); + + it("drops a delete that follows a clear (clear already removed the key on the peer)", () => { + map1.set("seed", "value"); + containerRuntimeFactory.processAllMessages(); + peerChanges = []; + peerClears = 0; + + containerRuntime1.connected = false; + map1.clear(); + map1.delete("seed"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(peerClears, 1); + // We expect only the clear to reach the peer; the delete is subsumed. + assert.equal(map2.get("seed"), undefined); + }); + + it("passes through a single pending set unchanged", () => { + containerRuntime1.connected = false; + map1.set("only", "value"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(map2.get("only"), "value"); + assert.deepEqual( + peerChanges.map((c) => ({ key: c.key, newValue: c.newValue })), + [{ key: "only", newValue: "value" }], + ); + }); + + it("preserves a pre-staging set still in flight when a staging set on a different key is squashed", () => { + // Submit a pre-staging set on key "a" while connected so it's in flight at the runtime + // layer but not yet ACKed when we disconnect. + map1.set("a", "pre"); + enterStagingMode(containerRuntime1); + // Staging-mode edits on a different key plus a self-subsumption pair on "a". + map1.set("b", "secret-b"); + map1.set("b", "final-b"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(map2.get("a"), "pre", "pre-staging set must still be delivered"); + assert.equal(map2.get("b"), "final-b"); + for (const change of peerChanges) { + assert.notEqual(change.newValue, "secret-b", "intermediate staging value must not leak"); + } + }); + + it("preserves a pre-staging set when a staging set on the same key is squashed against itself", () => { + // Pre-staging set on "k". Mixed-lifetime case: the pre-staging keySet and staging keySets + // share one PendingKeyLifetime in the kernel. + map1.set("k", "pre"); + enterStagingMode(containerRuntime1); + map1.set("k", "secret"); + map1.set("k", "final"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + // The pre-staging "pre" is sent then overwritten by the staging "final" (which subsumed + // "secret"). Peer's final view is "final"; "secret" never appears in a peer event. + assert.equal(map2.get("k"), "final"); + const sawPre = peerChanges.some( + (change) => change.key === "k" && change.newValue === "pre", + ); + assert.equal( + sawPre, + true, + "pre-staging set on the same key must land before the staged squash result", + ); + for (const change of peerChanges) { + assert.notEqual(change.newValue, "secret"); + } + }); +}); + +describe("SharedDirectory squash on resubmit (storage)", () => { + let containerRuntimeFactory: MockContainerRuntimeFactoryForReconnection; + let dataStoreRuntime1: MockFluidDataStoreRuntime; + let containerRuntime1: MockContainerRuntimeForReconnection; + let dir1: ISharedDirectory; + let dir2: ISharedDirectory; + let peerValueChanges: { path: string; key: string; newValue: unknown }[]; + + beforeEach("createDirectoriesForSquash", () => { + containerRuntimeFactory = new MockContainerRuntimeFactoryForReconnection(); + const factory = SharedDirectory.getFactory(); + + dataStoreRuntime1 = new MockFluidDataStoreRuntime(); + containerRuntime1 = containerRuntimeFactory.createContainerRuntime(dataStoreRuntime1); + dir1 = factory.create(dataStoreRuntime1, "dir1"); + dir1.connect({ + deltaConnection: dataStoreRuntime1.createDeltaConnection(), + objectStorage: new MockStorage(), + }); + + const dataStoreRuntime2 = new MockFluidDataStoreRuntime(); + containerRuntimeFactory.createContainerRuntime(dataStoreRuntime2); + dir2 = factory.create(dataStoreRuntime2, "dir2"); + dir2.connect({ + deltaConnection: dataStoreRuntime2.createDeltaConnection(), + objectStorage: new MockStorage(), + }); + + peerValueChanges = []; + dir2.on("valueChanged", (changed, local) => { + if (!local) { + const subdir = dir2.getWorkingDirectory(changed.path); + peerValueChanges.push({ + path: changed.path, + key: changed.key, + newValue: subdir?.get(changed.key), + }); + } + }); + }); + + it("drops intermediate set when later delete supersedes it at the root", () => { + const secret = "SSN: 123-45-6789"; + containerRuntime1.connected = false; + dir1.set("k1", secret); + dir1.delete("k1"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(dir1.get("k1"), undefined); + assert.equal(dir2.get("k1"), undefined); + for (const change of peerValueChanges) { + assert.notEqual(change.newValue, secret, "secret must not leak through squash"); + } + }); + + it("collapses set-then-set to a single set with the final value", () => { + const secret = "intermediate"; + containerRuntime1.connected = false; + dir1.set("k", secret); + dir1.set("k", "final"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(dir2.get("k"), "final"); + for (const change of peerValueChanges) { + assert.notEqual(change.newValue, secret); + } + }); + + it("squashes storage independently per subdirectory", () => { + const subA = dir1.createSubDirectory("a"); + const subB = dir1.createSubDirectory("b"); + containerRuntimeFactory.processAllMessages(); + peerValueChanges = []; + + containerRuntime1.connected = false; + subA.set("k", "secretA"); + subA.set("k", "finalA"); + subB.set("k", "secretB"); + subB.delete("k"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + const subA2 = dir2.getWorkingDirectory("/a"); + const subB2 = dir2.getWorkingDirectory("/b"); + assert.equal(subA2?.get("k"), "finalA"); + assert.equal(subB2?.get("k"), undefined); + for (const change of peerValueChanges) { + assert.notEqual(change.newValue, "secretA"); + assert.notEqual(change.newValue, "secretB"); + } + }); + + it("emits a clear when a clear occurred during staging", () => { + dir1.set("seed", "value"); + containerRuntimeFactory.processAllMessages(); + assert.equal(dir2.get("seed"), "value"); + peerValueChanges = []; + + containerRuntime1.connected = false; + dir1.set("staging", "leaked"); + dir1.clear(); + dir1.set("after-clear", "kept"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(dir2.get("seed"), undefined); + assert.equal(dir2.get("staging"), undefined); + assert.equal(dir2.get("after-clear"), "kept"); + for (const change of peerValueChanges) { + assert.notEqual(change.newValue, "leaked"); + } + }); + + it("passes through a single pending set unchanged", () => { + containerRuntime1.connected = false; + dir1.set("only", "value"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(dir2.get("only"), "value"); + assert.equal(peerValueChanges.length, 1); + assert.equal(peerValueChanges[0]?.newValue, "value"); + }); + + it("preserves a pre-staging set still in flight when a staging set on the same key is squashed", () => { + // Mixed-lifetime case: pre-staging keySet and staging keySets share one + // PendingKeyLifetime in the kernel. + dir1.set("k", "pre"); + containerRuntime1.connected = false; + dir1.set("k", "secret"); + dir1.set("k", "final"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(dir2.get("k"), "final"); + for (const change of peerValueChanges) { + assert.notEqual(change.newValue, "secret"); + } + }); + + it("drops a staged createSubDirectory + deleteSubDirectory pair so the subdir name doesn't leak", () => { + // The subdir name itself is user-supplied content (e.g. a user id or tenant slug). + // A staged create+delete pair on a name that didn't exist pre-staging nets to no-op + // and must not transmit the name on commit. + const peerSubdirCreatedNames: string[] = []; + const peerSubdirDeletedNames: string[] = []; + dir2.on("subDirectoryCreated", (name, local) => { + if (!local) peerSubdirCreatedNames.push(name); + }); + dir2.on("subDirectoryDeleted", (name, local) => { + if (!local) peerSubdirDeletedNames.push(name); + }); + + containerRuntime1.connected = false; + dir1.createSubDirectory("secret-id"); + dir1.deleteSubDirectory("secret-id"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(dir2.getSubDirectory("secret-id"), undefined); + assert.deepEqual( + peerSubdirCreatedNames, + [], + "createSubDirectory must not reach the peer when paired with a staged delete", + ); + assert.deepEqual( + peerSubdirDeletedNames, + [], + "deleteSubDirectory must not reach the peer when paired with a staged create", + ); + }); + + it("keeps the final create when staged ops are create+delete+create on the same name", () => { + const peerSubdirCreatedNames: string[] = []; + dir2.on("subDirectoryCreated", (name, local) => { + if (!local) peerSubdirCreatedNames.push(name); + }); + + containerRuntime1.connected = false; + dir1.createSubDirectory("x"); + dir1.deleteSubDirectory("x"); + dir1.createSubDirectory("x"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.notEqual(dir2.getSubDirectory("x"), undefined, "final create should land on peer"); + assert.deepEqual( + peerSubdirCreatedNames, + ["x"], + "peer should observe exactly one createSubDirectory event", + ); + }); + + it("preserves a delete of a pre-existing subdirectory (no leak, no false subsumption)", () => { + // Pre-create + ACK so "pre-existing" exists on the peer. + dir1.createSubDirectory("pre"); + containerRuntimeFactory.processAllMessages(); + assert.notEqual(dir2.getSubDirectory("pre"), undefined); + + const peerSubdirDeletedNames: string[] = []; + dir2.on("subDirectoryDeleted", (name, local) => { + if (!local) peerSubdirDeletedNames.push(name); + }); + + containerRuntime1.connected = false; + dir1.deleteSubDirectory("pre"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(dir2.getSubDirectory("pre"), undefined); + assert.deepEqual( + peerSubdirDeletedNames, + ["pre"], + "delete of pre-existing subdir must emit", + ); + }); + + it("drops staged storage ops on a subdirectory that is also pending-deleted in staging", () => { + // Pre-create the subdirectory so the staging-mode set has a target. The pre-staging + // createSubDirectory ACK lands before staging begins. + dir1.createSubDirectory("sub"); + containerRuntimeFactory.processAllMessages(); + peerValueChanges = []; + + containerRuntime1.connected = false; + // In staging: write a secret into the subdirectory, then delete the whole subdirectory. + // The delete subsumes the set — the value must not reach the wire on commit. + const sub = dir1.getSubDirectory("sub"); + assert(sub !== undefined); + sub.set("k", "secret"); + dir1.deleteSubDirectory("sub"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal( + dir2.getSubDirectory("sub"), + undefined, + "subdirectory should be removed on peer", + ); + for (const change of peerValueChanges) { + assert.notEqual( + change.newValue, + "secret", + "staged value on a pending-deleted subdir must not leak", + ); + } + }); + + it("squashes staged delete→create→delete on a pre-existing subdir to a single delete", () => { + // Pre-create the subdir and let it ACK so it's "pre-existing" on both clients. + dir1.createSubDirectory("x"); + containerRuntimeFactory.processAllMessages(); + assert.notEqual(dir2.getSubDirectory("x"), undefined); + + const peerSubdirCreatedNames: string[] = []; + const peerSubdirDeletedNames: string[] = []; + dir2.on("subDirectoryCreated", (name, local) => { + if (!local) peerSubdirCreatedNames.push(name); + }); + dir2.on("subDirectoryDeleted", (name, local) => { + if (!local) peerSubdirDeletedNames.push(name); + }); + + // Without reference identity, the inner create would pair with the later delete and + // leave the first delete unsplicable in pendingSubDirectoryData, causing a duplicate + // delete on the wire and a 0xc31 assert on ACK. + containerRuntime1.connected = false; + dir1.deleteSubDirectory("x"); + dir1.createSubDirectory("x"); + dir1.deleteSubDirectory("x"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(dir2.getSubDirectory("x"), undefined, "subdir should be deleted on peer"); + assert.deepEqual( + peerSubdirCreatedNames, + [], + "the inner staged create must not reach the peer (it paired with the final delete)", + ); + assert.deepEqual( + peerSubdirDeletedNames, + ["x"], + "peer should observe exactly one deleteSubDirectory event", + ); + }); + + it("preserves a pre-staging createSubDirectory in flight when staged delete+create on the same name is squashed", () => { + // Pre-staging op still in flight at the runtime layer. The pre-staging create's + // PendingSubDirectoryCreate entry must never be spliced by the staged squash logic — + // only the staged entries are eligible. Without reference identity, the staged create's + // findIndex-by-name would return the pre-staging entry and incorrectly pair it with the + // staged delete, dropping the staged create from the wire and asserting 0xc33 on the + // pre-staging ACK. + const peerEvents: string[] = []; + dir2.on("subDirectoryCreated", (name, local) => { + if (!local) peerEvents.push(`created:${name}`); + }); + dir2.on("subDirectoryDeleted", (name, local) => { + if (!local) peerEvents.push(`deleted:${name}`); + }); + + dir1.createSubDirectory("y"); + enterStagingMode(containerRuntime1); + dir1.deleteSubDirectory("y"); + dir1.createSubDirectory("y"); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.notEqual( + dir2.getSubDirectory("y"), + undefined, + "the staged create should land on the peer", + ); + // The peer sees the pre-staging create, the staged delete that cancels it, and the + // staged re-create — three events in order. + assert.deepEqual( + peerEvents, + ["created:y", "deleted:y", "created:y"], + "peer should observe pre-staging create, staged delete, then staged re-create", + ); + }); +}); diff --git a/packages/dds/matrix/src/matrix.ts b/packages/dds/matrix/src/matrix.ts index 5816dc24ab84..b8a91015564d 100644 --- a/packages/dds/matrix/src/matrix.ts +++ b/packages/dds/matrix/src/matrix.ts @@ -820,7 +820,24 @@ export class SharedMatrix return client.findReconnectionPosition(segment, localSeq) + offset; } + /** + * Per-cell LWW squash: a pending setCell that has been overwritten by a later setCell for the + * same cell is dropped (its value never reaches the wire). Row/column vector ops pass + * `squash=true` to merge-tree's regeneratePendingOp so the merge-tree-internal squash applies. + */ + protected override reSubmitSquashed(incoming: unknown, localOpMetadata: unknown): void { + this.reSubmitInternal(incoming, localOpMetadata, true); + } + protected reSubmitCore(incoming: unknown, localOpMetadata: unknown): void { + this.reSubmitInternal(incoming, localOpMetadata, false); + } + + private reSubmitInternal( + incoming: unknown, + localOpMetadata: unknown, + squash: boolean, + ): void { const originalRefSeq = this.inFlightRefSeqs.shift(); assert( originalRefSeq !== undefined, @@ -846,9 +863,16 @@ export class SharedMatrix assert(local !== undefined, 0xba5 /* local operation must have a pending array */); const localSeqIndex = local.findIndex((p) => p.localSeq === localSeq); assert(localSeqIndex >= 0, 0xba6 /* local operation must have a pending entry */); + // In squash mode, only emit the latest pending set per cell. Earlier sets for the same + // cell are superseded and their values must never reach the wire. + const isLatestPendingSetForCell = localSeqIndex === local.length - 1; const [change] = local.splice(localSeqIndex, 1); assert(change.localSeq === localSeq, 0xba7 /* must match */); + if (squash && !isLatestPendingSetForCell) { + return; + } + if ( row !== undefined && col !== undefined && @@ -868,13 +892,13 @@ export class SharedMatrix switch (content.target) { case SnapshotPath.cols: { this.submitColMessage( - this.cols.regeneratePendingOp(content, localOpMetadata, false), + this.cols.regeneratePendingOp(content, localOpMetadata, squash), ); break; } case SnapshotPath.rows: { this.submitRowMessage( - this.rows.regeneratePendingOp(content, localOpMetadata, false), + this.rows.regeneratePendingOp(content, localOpMetadata, squash), ); break; } diff --git a/packages/dds/matrix/src/test/matrix.squash.fuzz.spec.ts b/packages/dds/matrix/src/test/matrix.squash.fuzz.spec.ts new file mode 100644 index 000000000000..cf0e15906b16 --- /dev/null +++ b/packages/dds/matrix/src/test/matrix.squash.fuzz.spec.ts @@ -0,0 +1,213 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "node:assert"; +import * as path from "node:path"; + +import { TypedEventEmitter } from "@fluid-internal/client-utils"; +import { + type Generator, + createWeightedAsyncGenerator, + done, + takeAsync, +} from "@fluid-private/stochastic-test-utils"; +import { + type DDSFuzzHarnessEvents, + type SquashFuzzModel, + type SquashFuzzTestState, + createSquashFuzzSuite, +} from "@fluid-private/test-dds-utils"; +import { isFluidHandle } from "@fluidframework/runtime-utils/internal"; + +import { SharedMatrix, type SharedMatrixFactory } from "../runtime.js"; + +import { _dirname } from "./dirname.cjs"; + +interface InsertRowsOp { + type: "insertRows"; + start: number; + count: number; +} +interface InsertColsOp { + type: "insertCols"; + start: number; + count: number; +} +interface SetCellOp { + type: "set"; + row: number; + col: number; + value: string | number; +} +interface SetPoisonedCellOp { + type: "setPoisoned"; + row: number; + col: number; +} +interface ClearCellOp { + type: "clearCell"; + row: number; + col: number; +} + +type SquashOp = InsertRowsOp | InsertColsOp | SetCellOp | SetPoisonedCellOp | ClearCellOp; + +type FuzzState = SquashFuzzTestState; + +function isPoisonedHandle(value: unknown): boolean { + return ( + isFluidHandle(value) && (value as unknown as { poisoned?: unknown }).poisoned === true + ); +} + +function findFirstPoisoned(channel: SharedMatrix): { row: number; col: number } | undefined { + for (let row = 0; row < channel.rowCount; row++) { + for (let col = 0; col < channel.colCount; col++) { + if (isPoisonedHandle(channel.getCell(row, col))) { + return { row, col }; + } + } + } + return undefined; +} + +function makeGenerator(): (state: FuzzState) => Promise { + const isInStaging = (state: FuzzState): boolean => + state.client.stagingModeStatus === "staging"; + const hasCells = (state: FuzzState): boolean => + state.client.channel.rowCount > 0 && state.client.channel.colCount > 0; + + const insertRows = async (state: FuzzState): Promise => ({ + type: "insertRows", + start: state.random.integer(0, state.client.channel.rowCount), + count: state.random.integer(1, 3), + }); + const insertCols = async (state: FuzzState): Promise => ({ + type: "insertCols", + start: state.random.integer(0, state.client.channel.colCount), + count: state.random.integer(1, 3), + }); + const setCell = async (state: FuzzState): Promise => ({ + type: "set", + row: state.random.integer(0, state.client.channel.rowCount - 1), + col: state.random.integer(0, state.client.channel.colCount - 1), + value: state.random.pick([ + (): string => state.random.string(state.random.integer(1, 3)), + (): number => state.random.integer(0, 100), + ])(), + }); + const setPoisoned = async (state: FuzzState): Promise => ({ + type: "setPoisoned", + row: state.random.integer(0, state.client.channel.rowCount - 1), + col: state.random.integer(0, state.client.channel.colCount - 1), + }); + const clearCell = async (state: FuzzState): Promise => ({ + type: "clearCell", + row: state.random.integer(0, state.client.channel.rowCount - 1), + col: state.random.integer(0, state.client.channel.colCount - 1), + }); + + return createWeightedAsyncGenerator([ + [insertRows, 4], + [insertCols, 4], + [setCell, 10, hasCells], + [setPoisoned, 6, (state) => isInStaging(state) && hasCells(state)], + [clearCell, 3, hasCells], + ]); +} + +function makeExitingGenerator(): Generator { + return (state): SquashOp | typeof done => { + const found = findFirstPoisoned(state.client.channel); + if (found === undefined) { + return done; + } + return { type: "clearCell", row: found.row, col: found.col }; + }; +} + +function reducer(state: FuzzState, op: SquashOp): void { + const { client } = state; + switch (op.type) { + case "insertRows": { + client.channel.insertRows(op.start, op.count); + break; + } + case "insertCols": { + client.channel.insertCols(op.start, op.count); + break; + } + case "set": { + if (op.row < client.channel.rowCount && op.col < client.channel.colCount) { + client.channel.setCell(op.row, op.col, op.value); + } + break; + } + case "setPoisoned": { + if (op.row < client.channel.rowCount && op.col < client.channel.colCount) { + client.channel.setCell(op.row, op.col, state.random.poisonedHandle()); + } + break; + } + case "clearCell": { + if (op.row < client.channel.rowCount && op.col < client.channel.colCount) { + client.channel.setCell(op.row, op.col, undefined); + } + break; + } + default: { + break; + } + } +} + +const squashModel: SquashFuzzModel = { + workloadName: "matrix squashing", + generatorFactory: () => takeAsync(60, makeGenerator()), + reducer, + validateConsistency: async (a, b) => { + assert.equal(a.channel.rowCount, b.channel.rowCount); + assert.equal(a.channel.colCount, b.channel.colCount); + for (let r = 0; r < a.channel.rowCount; r++) { + for (let c = 0; c < a.channel.colCount; c++) { + const va = a.channel.getCell(r, c); + const vb = b.channel.getCell(r, c); + if (isFluidHandle(va)) { + assert(isFluidHandle(vb)); + } else { + assert.deepEqual(va, vb); + } + } + } + }, + factory: SharedMatrix.getFactory(), + exitingStagingModeGeneratorFactory: makeExitingGenerator, + validatePoisonedContentRemoved: (client) => { + const found = findFirstPoisoned(client.channel); + assert( + found === undefined, + `Poisoned handle at (${found?.row}, ${found?.col}) not removed before exiting staging`, + ); + }, +}; + +const emitter = new TypedEventEmitter(); + +describe("SharedMatrix squash fuzz", () => { + createSquashFuzzSuite(squashModel, { + validationStrategy: { type: "fixedInterval", interval: 10 }, + reconnectProbability: 0.1, + numberOfClients: 3, + clientJoinOptions: { + maxNumberOfClients: 4, + clientAddProbability: 0.05, + }, + detachedStartOptions: { numOpsBeforeAttach: 0 }, + defaultTestCount: 50, + saveFailures: { directory: path.join(_dirname, "../../src/test/results-squash-matrix") }, + emitter, + stagingMode: { changeStagingModeProbability: 0.15 }, + }); +}); diff --git a/packages/dds/matrix/src/test/matrix.squash.spec.ts b/packages/dds/matrix/src/test/matrix.squash.spec.ts new file mode 100644 index 000000000000..dade52c4a137 --- /dev/null +++ b/packages/dds/matrix/src/test/matrix.squash.spec.ts @@ -0,0 +1,148 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "node:assert"; + +import { reconnectAndSquash } from "@fluid-private/test-dds-utils"; +import { + MockContainerRuntimeFactoryForReconnection, + MockFluidDataStoreRuntime, + MockStorage, +} from "@fluidframework/test-runtime-utils/internal"; + +import { extract, matrixFactory } from "./utils.js"; + +describe("SharedMatrix squash on resubmit", () => { + it("drops intermediate setCell when a later setCell supersedes it on the same cell", () => { + const containerRuntimeFactory = new MockContainerRuntimeFactoryForReconnection(); + const dataRuntime1 = new MockFluidDataStoreRuntime(); + const containerRuntime1 = containerRuntimeFactory.createContainerRuntime(dataRuntime1); + const dataRuntime2 = new MockFluidDataStoreRuntime(); + containerRuntimeFactory.createContainerRuntime(dataRuntime2); + + const matrix1 = matrixFactory.create(dataRuntime1, "A"); + matrix1.connect({ + deltaConnection: dataRuntime1.createDeltaConnection(), + objectStorage: new MockStorage(), + }); + const matrix2 = matrixFactory.create(dataRuntime2, "B"); + matrix2.connect({ + deltaConnection: dataRuntime2.createDeltaConnection(), + objectStorage: new MockStorage(), + }); + + matrix1.insertRows(0, 1); + matrix1.insertCols(0, 1); + containerRuntimeFactory.processAllMessages(); + + const peerCellValues: unknown[] = []; + matrix2.openMatrix({ + rowsChanged: () => {}, + colsChanged: () => {}, + cellsChanged: (rowStart, colStart, rowCount, colCount) => { + for (let r = rowStart; r < rowStart + rowCount; r++) { + for (let c = colStart; c < colStart + colCount; c++) { + peerCellValues.push(matrix2.getCell(r, c)); + } + } + }, + }); + + containerRuntime1.connected = false; + matrix1.setCell(0, 0, "secret"); + matrix1.setCell(0, 0, "final"); + reconnectAndSquash(containerRuntime1, dataRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.deepEqual(extract(matrix1), [["final"]]); + assert.deepEqual(extract(matrix2), [["final"]]); + for (const value of peerCellValues) { + assert.notEqual(value, "secret", "intermediate cell value must not leak through squash"); + } + }); + + it("squashes setCell chains to a single final value per cell", () => { + const containerRuntimeFactory = new MockContainerRuntimeFactoryForReconnection(); + const dataRuntime1 = new MockFluidDataStoreRuntime(); + const containerRuntime1 = containerRuntimeFactory.createContainerRuntime(dataRuntime1); + const dataRuntime2 = new MockFluidDataStoreRuntime(); + containerRuntimeFactory.createContainerRuntime(dataRuntime2); + + const matrix1 = matrixFactory.create(dataRuntime1, "A"); + matrix1.connect({ + deltaConnection: dataRuntime1.createDeltaConnection(), + objectStorage: new MockStorage(), + }); + const matrix2 = matrixFactory.create(dataRuntime2, "B"); + matrix2.connect({ + deltaConnection: dataRuntime2.createDeltaConnection(), + objectStorage: new MockStorage(), + }); + + matrix1.insertRows(0, 1); + matrix1.insertCols(0, 2); + containerRuntimeFactory.processAllMessages(); + + const peerObservedValues: unknown[] = []; + matrix2.openMatrix({ + rowsChanged: () => {}, + colsChanged: () => {}, + cellsChanged: (rowStart, colStart, rowCount, colCount) => { + for (let r = rowStart; r < rowStart + rowCount; r++) { + for (let c = colStart; c < colStart + colCount; c++) { + peerObservedValues.push({ r, c, v: matrix2.getCell(r, c) }); + } + } + }, + }); + + containerRuntime1.connected = false; + matrix1.setCell(0, 0, "a0"); + matrix1.setCell(0, 1, "b0"); + matrix1.setCell(0, 0, "a1"); + matrix1.setCell(0, 0, "a2"); + matrix1.setCell(0, 1, "b1"); + reconnectAndSquash(containerRuntime1, dataRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.deepEqual(extract(matrix2), [["a2", "b1"]]); + for (const obs of peerObservedValues) { + assert.notEqual(obs, undefined); + assert.notEqual((obs as { v: unknown }).v, "a0"); + assert.notEqual((obs as { v: unknown }).v, "a1"); + assert.notEqual((obs as { v: unknown }).v, "b0"); + } + }); + + it("passes through a single pending setCell unchanged", () => { + const containerRuntimeFactory = new MockContainerRuntimeFactoryForReconnection(); + const dataRuntime1 = new MockFluidDataStoreRuntime(); + const containerRuntime1 = containerRuntimeFactory.createContainerRuntime(dataRuntime1); + const dataRuntime2 = new MockFluidDataStoreRuntime(); + containerRuntimeFactory.createContainerRuntime(dataRuntime2); + + const matrix1 = matrixFactory.create(dataRuntime1, "A"); + matrix1.connect({ + deltaConnection: dataRuntime1.createDeltaConnection(), + objectStorage: new MockStorage(), + }); + const matrix2 = matrixFactory.create(dataRuntime2, "B"); + matrix2.connect({ + deltaConnection: dataRuntime2.createDeltaConnection(), + objectStorage: new MockStorage(), + }); + + matrix1.insertRows(0, 1); + matrix1.insertCols(0, 1); + containerRuntimeFactory.processAllMessages(); + + containerRuntime1.connected = false; + matrix1.setCell(0, 0, "only"); + reconnectAndSquash(containerRuntime1, dataRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.deepEqual(extract(matrix2), [["only"]]); + }); +}); diff --git a/packages/dds/merge-tree/src/client.ts b/packages/dds/merge-tree/src/client.ts index dfffdc7cba1c..65bfde3b3feb 100644 --- a/packages/dds/merge-tree/src/client.ts +++ b/packages/dds/merge-tree/src/client.ts @@ -1183,18 +1183,48 @@ export class Client extends TypedEventEmitter { // unless the remove was local, in which case the annotate must have come // before the remove if (!isRemovedAndAcked(segment)) { - newOp = - resetOp.props === undefined - ? createAdjustRangeOp( - segmentPosition, - segmentPosition + segment.cachedLength, - resetOp.adjust, - ) - : createAnnotateRangeOp( - segmentPosition, - segmentPosition + segment.cachedLength, - resetOp.props, - ); + if ( + squash && + resetOp.props !== undefined && + segment.segmentGroups !== undefined && + segmentGroup.localSeq !== undefined + ) { + // Property-level squash: filter out keys overridden by a later staged + // annotate on this segment. If every key is overridden, drop the op + // entirely so the older value never reaches the wire. + const laterKeys = segment.segmentGroups.keysAnnotatedLaterThan( + segmentGroup.localSeq, + ); + const filteredProps: PropertySet = {}; + let kept = 0; + for (const key of Object.keys(resetOp.props)) { + if (!laterKeys.has(key)) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + filteredProps[key] = resetOp.props[key]; + kept++; + } + } + if (kept > 0) { + newOp = createAnnotateRangeOp( + segmentPosition, + segmentPosition + segment.cachedLength, + filteredProps, + ); + } + } else { + newOp = + resetOp.props === undefined + ? createAdjustRangeOp( + segmentPosition, + segmentPosition + segment.cachedLength, + resetOp.adjust, + ) + : createAnnotateRangeOp( + segmentPosition, + segmentPosition + segment.cachedLength, + resetOp.props, + ); + } } break; } @@ -1244,11 +1274,31 @@ export class Client extends TypedEventEmitter { } const segInsertOp: ISegment = segment.clone(); - const opProps = + const opProps: PropertySet | undefined = isObject(resetOp.seg) && "props" in resetOp.seg && isObject(resetOp.seg.props) ? { ...resetOp.seg.props } : undefined; - segInsertOp.properties = opProps; + if ( + squash && + opProps !== undefined && + segment.segmentGroups !== undefined && + segmentGroup.localSeq !== undefined + ) { + // Property-level squash: drop keys later overwritten by a staged annotate. + // The segment text itself stays (we're emitting the insert), but the + // per-key values it carries are filtered so superseded values never leak. + const laterKeys = segment.segmentGroups.keysAnnotatedLaterThan( + segmentGroup.localSeq, + ); + if (laterKeys.size > 0) { + for (const key of laterKeys) { + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete + delete opProps[key]; + } + } + } + segInsertOp.properties = + opProps !== undefined && Object.keys(opProps).length === 0 ? undefined : opProps; newOp = createInsertSegmentOp(segmentPosition, segInsertOp); break; } diff --git a/packages/dds/merge-tree/src/segmentGroupCollection.ts b/packages/dds/merge-tree/src/segmentGroupCollection.ts index 5456892c8080..0287ebf6fb8f 100644 --- a/packages/dds/merge-tree/src/segmentGroupCollection.ts +++ b/packages/dds/merge-tree/src/segmentGroupCollection.ts @@ -48,6 +48,42 @@ export class SegmentGroupCollection { walkList(this.segmentGroups, (sg) => segmentGroups.enqueueOnCopy(sg.data, this.segment)); } + /** + * Returns the set of property keys touched by annotate ops on this segment with a `localSeq` + * strictly greater than the given `localSeq`. Used by the squash resubmit path to filter + * out keys that have been overridden by later staged annotates — those values must not be + * carried on the wire by the older op. + * + * For each later segment-group that contains this segment, the per-segment entry in + * `previousProps` records the property values that were in effect before the annotate + * applied; its keys are therefore the keys the annotate touched. + */ + public keysAnnotatedLaterThan(localSeq: number): Set { + const keys = new Set(); + walkList(this.segmentGroups, (node) => { + const group = node.data; + if ( + group.localSeq === undefined || + group.localSeq <= localSeq || + group.previousProps === undefined + ) { + return; + } + const idx = group.segments.indexOf(this.segment); + if (idx < 0) { + return; + } + const props = group.previousProps[idx]; + if (props === undefined) { + return; + } + for (const k of Object.keys(props)) { + keys.add(k); + } + }); + return keys; + } + private enqueueOnCopy(segmentGroup: SegmentGroup, sourceSegment: ISegmentLeaf): void { this.enqueue(segmentGroup); if (segmentGroup.previousProps) { diff --git a/packages/dds/ordered-collection/api-report/ordered-collection.legacy.beta.api.md b/packages/dds/ordered-collection/api-report/ordered-collection.legacy.beta.api.md index dcdbc6a47d0a..e918725a0102 100644 --- a/packages/dds/ordered-collection/api-report/ordered-collection.legacy.beta.api.md +++ b/packages/dds/ordered-collection/api-report/ordered-collection.legacy.beta.api.md @@ -29,6 +29,7 @@ export class ConsensusOrderedCollection extends SharedObject } } + /** + * ConsensusOrderedCollection ops (add / acquire / complete / release) participate in a + * server-ordered queue. Collapsing pending ops would change the queue's observable state + * (e.g. an add+acquire pair is meaningfully different from no ops at all, since the queue + * positions are externally observable). Squash on resubmit is therefore the identity + * transform — each pending op is replayed in order via reSubmitCore. + * + * Known leak: `add` carries a serialized user value. A staging-mode sequence such as + * `add(secret) -> acquire -> complete` (or `add(secret) -> acquire -> release`) still + * transmits the `add` op on commit because identity squash replays it in order. Callers + * that need leak-free staging behavior for queue values should hold the `add` locally + * and only call `add` after committing the staging session. + */ + protected override reSubmitSquashed(content: unknown, localOpMetadata: unknown): void { + this.reSubmitCore(content, localOpMetadata); + } + protected override processMessagesCore(messagesCollection: IRuntimeMessageCollection): void { const { envelope, local, messagesContent } = messagesCollection; for (const messageContent of messagesContent) { diff --git a/packages/dds/pact-map/src/pactMap.ts b/packages/dds/pact-map/src/pactMap.ts index ca690871d5f8..e8ff5d208df9 100644 --- a/packages/dds/pact-map/src/pactMap.ts +++ b/packages/dds/pact-map/src/pactMap.ts @@ -367,6 +367,17 @@ export class PactMapClass */ protected onDisconnect(): void {} + /** + * PactMap implements consensus over proposed values: each set is a proposal that must be + * accepted by all connected clients at the time it was sequenced. Collapsing pending + * proposals would change observable consensus semantics (and PactMap already drops a new + * set early if there is an outstanding pending proposal for the same key, so the squash + * surface is small in practice). Squash on resubmit is the identity transform. + */ + protected override reSubmitSquashed(content: unknown, localOpMetadata: unknown): void { + this.reSubmitCore(content, localOpMetadata); + } + /** * {@inheritDoc @fluidframework/shared-object-base#SharedObjectCore.reSubmitCore} */ diff --git a/packages/dds/register-collection/api-report/register-collection.legacy.beta.api.md b/packages/dds/register-collection/api-report/register-collection.legacy.beta.api.md index 3623a0f3cfaa..6b49cb84b8bc 100644 --- a/packages/dds/register-collection/api-report/register-collection.legacy.beta.api.md +++ b/packages/dds/register-collection/api-report/register-collection.legacy.beta.api.md @@ -25,6 +25,7 @@ export class ConsensusRegisterCollectionClass extends SharedObject protected onDisconnect(): void {} + /** + * ConsensusRegisterCollection writes participate in server-side consensus and the version + * history is exposed via readVersions(). Collapsing pending writes would change observable + * semantics, so squash on resubmit is intentionally the identity transform (replay each + * pending write in order via reSubmitCore). + * + * If callers need staging-mode-like behavior with no intermediate version exposure, they + * should hold writes locally and apply only the final value when committing. + */ + protected override reSubmitSquashed(content: unknown, localOpMetadata: unknown): void { + this.reSubmitCore(content, localOpMetadata); + } + protected override processMessagesCore(messagesCollection: IRuntimeMessageCollection): void { const { envelope, local, messagesContent } = messagesCollection; for (const messageContent of messagesContent) { diff --git a/packages/dds/sequence/src/intervalCollection.ts b/packages/dds/sequence/src/intervalCollection.ts index 0f28f3c805e5..8908ef986aec 100644 --- a/packages/dds/sequence/src/intervalCollection.ts +++ b/packages/dds/sequence/src/intervalCollection.ts @@ -11,6 +11,7 @@ import { assert, DoublyLinkedList, unreachableCase, + walkList, type ListNode, } from "@fluidframework/core-utils/internal"; import type { ISequencedDocumentMessage } from "@fluidframework/driver-definitions/internal"; @@ -730,6 +731,35 @@ function hasEndpointChanges( return serialized.start !== undefined && serialized.end !== undefined; } +/** + * Strip everything from the property bag of a delete op's payload except the keys peers + * use to identify the target interval (the reserved `intervalId` key — read by + * `getSerializedProperties` — and `reservedRangeLabelsKey`). {@link + * IntervalCollection.ackDelete} only consumes the id, so the rest of the bag — user- + * supplied properties on the live interval at the time of the staged delete — is + * redundant on the wire and must not ride out. + */ +function stripUserPropertiesFromDeletePayload< + T extends SerializedIntervalDelta | ISerializedInterval, +>(value: T): T { + if (value.properties === undefined) { + return value; + } + const onlyIdentity: PropertySet = {}; + const intervalId = value.properties.intervalId; + if (intervalId !== undefined) { + onlyIdentity.intervalId = intervalId; + } + const labels = value.properties[reservedRangeLabelsKey]; + if (labels !== undefined) { + onlyIdentity[reservedRangeLabelsKey] = labels; + } + return { + ...value, + properties: Object.keys(onlyIdentity).length === 0 ? undefined : onlyIdentity, + }; +} + /** * {@inheritdoc IIntervalCollection} */ @@ -929,13 +959,40 @@ export class IntervalCollection const localOpMetadata = removeMetadataFromPendingChanges(maybeMetadata); + // Squash filtering on the property channel: an ADD or CHANGE that's later subsumed by + // a DELETE for the same interval is dropped entirely; per-key property overrides by + // later staged ADD/CHANGE on the same interval are stripped from this op; a DELETE + // either drops entirely (if its interval was added in the same staging batch and so + // never reached peers) or strips its property bag (ackDelete identifies the interval + // by id; the serialized payload's properties carry the live interval's full bag and + // would leak any user-supplied values). + let workingValue = value; + if (squash) { + const { id } = getSerializedProperties(value); + if (opName === "add" || opName === "change") { + if (this.hasLaterDeleteForInterval(id, localOpMetadata.localSeq)) { + clearEmptyPendingEntry(this.pending, id); + return; + } + workingValue = this.filterPropertiesForSquash(value, id, localOpMetadata.localSeq); + } else { + if (this.hasEarlierAddForInterval(id, localOpMetadata.localSeq)) { + // The interval was created within this staging batch — its paired add has + // already been dropped above, so peers never saw it. Skip the delete too. + clearEmptyPendingEntry(this.pending, id); + return; + } + workingValue = stripUserPropertiesFromDeletePayload(value); + } + } + const rebasedValue = localOpMetadata.endpointChangesNode === undefined - ? value - : this.rebaseLocalInterval(value, localOpMetadata, squash); + ? workingValue + : this.rebaseLocalInterval(workingValue, localOpMetadata, squash); if (rebasedValue === undefined) { - const { id } = getSerializedProperties(value); + const { id } = getSerializedProperties(workingValue); clearEmptyPendingEntry(this.pending, id); return; } @@ -943,6 +1000,84 @@ export class IntervalCollection this.submitDelta({ opName, value: rebasedValue as any }, localOpMetadata); } + private hasLaterDeleteForInterval(id: string, localSeq: number): boolean { + const pending = this.pending[id]; + if (pending === undefined) { + return false; + } + let found = false; + walkList(pending.local, (node) => { + if (node.data.localSeq > localSeq && node.data.type === "delete") { + found = true; + return false; + } + }); + return found; + } + + private hasEarlierAddForInterval(id: string, localSeq: number): boolean { + const pending = this.pending[id]; + if (pending === undefined) { + return false; + } + let found = false; + walkList(pending.local, (node) => { + if (node.data.localSeq < localSeq && node.data.type === "add") { + found = true; + return false; + } + }); + return found; + } + + private collectLaterPropertyKeysForInterval(id: string, localSeq: number): Set { + const keys = new Set(); + const pending = this.pending[id]; + if (pending === undefined) { + return keys; + } + walkList(pending.local, (node) => { + const md = node.data; + if ( + md.localSeq > localSeq && + (md.type === "add" || md.type === "change") && + md.propertyKeys !== undefined + ) { + for (const k of md.propertyKeys) { + keys.add(k); + } + } + }); + return keys; + } + + private filterPropertiesForSquash( + value: T, + id: string, + localSeq: number, + ): T { + if (value.properties === undefined) { + return value; + } + const laterKeys = this.collectLaterPropertyKeysForInterval(id, localSeq); + if (laterKeys.size === 0) { + return value; + } + const filtered: PropertySet = {}; + let kept = 0; + for (const key of Object.keys(value.properties)) { + if (!laterKeys.has(key)) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + filtered[key] = value.properties[key]; + kept++; + } + } + return { + ...value, + properties: kept === 0 ? undefined : filtered, + }; + } + public applyStashedOp(op: IIntervalCollectionTypeOperationValue): void { const { opName, value } = op; const { id, properties } = getSerializedProperties(value); @@ -1225,6 +1360,7 @@ export class IntervalCollection type: "add", localSeq, interval, + propertyKeys: props === undefined ? undefined : new Set(Object.keys(props)), }, ); } @@ -1353,6 +1489,7 @@ export class IntervalCollection type: "change", localSeq, interval: newInterval ?? interval, + propertyKeys: props === undefined ? undefined : new Set(Object.keys(props)), }; this.submitDelta( diff --git a/packages/dds/sequence/src/intervalCollectionMapInterfaces.ts b/packages/dds/sequence/src/intervalCollectionMapInterfaces.ts index a7950d8a9825..d98144acb856 100644 --- a/packages/dds/sequence/src/intervalCollectionMapInterfaces.ts +++ b/packages/dds/sequence/src/intervalCollectionMapInterfaces.ts @@ -24,12 +24,24 @@ export interface IntervalAddLocalMetadata { localSeq: number; endpointChangesNode?: ListNode; interval: SequenceIntervalClass; + /** + * Property keys this op submitted. Used by the squash resubmit path to detect later + * pending ops on the same interval that overrode these keys, so the older values can + * be filtered out before the op is re-emitted on commit. + */ + propertyKeys?: ReadonlySet; } export interface IntervalChangeLocalMetadata { type: typeof IntervalDeltaOpType.CHANGE; localSeq: number; endpointChangesNode?: ListNode; interval: SequenceIntervalClass; + /** + * Property keys this op submitted. Used by the squash resubmit path to detect later + * pending ops on the same interval that overrode these keys, so the older values can + * be filtered out before the op is re-emitted on commit. + */ + propertyKeys?: ReadonlySet; } export interface IntervalDeleteLocalMetadata { type: typeof IntervalDeltaOpType.DELETE; diff --git a/packages/dds/sequence/src/test/sharedString.spec.ts b/packages/dds/sequence/src/test/sharedString.spec.ts index c94f9543c30f..db73ae40c358 100644 --- a/packages/dds/sequence/src/test/sharedString.spec.ts +++ b/packages/dds/sequence/src/test/sharedString.spec.ts @@ -5,6 +5,7 @@ import { strict as assert } from "node:assert"; +import { reconnectAndSquash } from "@fluid-private/test-dds-utils"; import { AttachState } from "@fluidframework/container-definitions"; import { IChannelServices } from "@fluidframework/datastore-definitions/internal"; import { ISummaryTree } from "@fluidframework/driver-definitions"; @@ -839,6 +840,226 @@ describe("SharedString", () => { // Verify that the changes were correctly received by the second SharedString assert.equal(sharedString2.getText(), "hello friend"); }); + + describe("squash property channel", () => { + it("drops a staged annotate value overridden by a later staged annotate on the same range", async () => { + // Pre-existing text so we can annotate over it. + sharedString.insertText(0, "hello world"); + containerRuntimeFactory.processAllMessages(); + assert.equal(sharedString2.getText(), "hello world"); + + // Capture every property value the peer ever sees applied to the segment at pos 0. + const peerSeenColors: unknown[] = []; + sharedString2.on("sequenceDelta", (event) => { + if (!event.isLocal) { + for (const range of event.ranges) { + if (range.segment.properties?.color !== undefined) { + peerSeenColors.push(range.segment.properties.color); + } + } + } + }); + + // Disconnect, annotate twice with overlapping keys, reconnect with squash. + containerRuntime1.connected = false; + sharedString.annotateRange(0, 5, { color: "secret-color" }); + sharedString.annotateRange(0, 5, { color: "public-color" }); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(sharedString2.getPropertiesAtPosition(0)?.color, "public-color"); + for (const value of peerSeenColors) { + assert.notEqual(value, "secret-color", "secret color must not leak through squash"); + } + }); + + it("drops a staged insert's property value overridden by a later staged annotate", async () => { + // Capture every property value the peer ever sees on the inserted segment. + const peerSeenColors: unknown[] = []; + sharedString2.on("sequenceDelta", (event) => { + if (!event.isLocal) { + for (const range of event.ranges) { + if (range.segment.properties?.color !== undefined) { + peerSeenColors.push(range.segment.properties.color); + } + } + } + }); + + containerRuntime1.connected = false; + sharedString.insertText(0, "x", { color: "secret-color" }); + sharedString.annotateRange(0, 1, { color: "public-color" }); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + assert.equal(sharedString2.getPropertiesAtPosition(0)?.color, "public-color"); + for (const value of peerSeenColors) { + assert.notEqual( + value, + "secret-color", + "secret color on insert must not leak through squash", + ); + } + }); + + it("drops a staged interval add subsumed by a later staged delete", async () => { + sharedString.insertText(0, "hello world"); + containerRuntimeFactory.processAllMessages(); + + const collection1 = sharedString.getIntervalCollection("test"); + const collection2 = sharedString2.getIntervalCollection("test"); + + const peerSeenProps: unknown[] = []; + collection2.on("addInterval", (addedInterval) => { + if (addedInterval.properties?.color !== undefined) { + peerSeenProps.push(addedInterval.properties.color); + } + }); + collection2.on("propertyChanged", (_interval, propsDeltas) => { + if (propsDeltas?.color !== undefined) { + peerSeenProps.push(propsDeltas.color); + } + }); + + containerRuntime1.connected = false; + const interval = collection1.add({ + start: 0, + end: 5, + props: { color: "secret-color" }, + }); + collection1.removeIntervalById(interval.getIntervalId()); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + for (const value of peerSeenProps) { + assert.notEqual(value, "secret-color", "secret interval prop must not leak"); + } + }); + + it("does not leak the property bag of a staged interval add+delete pair on the wire", async () => { + // The ackDelete path on peers only consumes the interval id, so a property leak + // on a paired add+delete is invisible to peer events — capture the raw wire ops + // to assert the secret property never goes out. + sharedString.insertText(0, "hello world"); + containerRuntimeFactory.processAllMessages(); + + const collection1 = sharedString.getIntervalCollection("test"); + + const wireOps: unknown[] = []; + const originalPushMessage = + containerRuntimeFactory.pushMessage.bind(containerRuntimeFactory); + containerRuntimeFactory.pushMessage = (msg) => { + wireOps.push(JSON.parse(JSON.stringify(msg))); + originalPushMessage(msg); + }; + try { + containerRuntime1.connected = false; + const interval = collection1.add({ + start: 0, + end: 5, + props: { color: "secret-color" }, + }); + collection1.removeIntervalById(interval.getIntervalId()); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + } finally { + containerRuntimeFactory.pushMessage = originalPushMessage; + } + + const stringified = JSON.stringify(wireOps); + assert.equal( + stringified.includes("secret-color"), + false, + "secret interval prop must not appear in any wire op payload", + ); + }); + + it("scrubs the property bag from a staged delete of a pre-existing interval", async () => { + // A delete of a pre-existing (sequenced) interval is needed on the wire so peers + // remove the interval, but ackDelete identifies it by id; the serialized payload's + // property bag carries the live interval's full bag (including any staged-only + // values) and must be scrubbed. + sharedString.insertText(0, "hello world"); + containerRuntimeFactory.processAllMessages(); + const collection1 = sharedString.getIntervalCollection("test"); + const collection2 = sharedString2.getIntervalCollection("test"); + const baseInterval = collection1.add({ + start: 0, + end: 5, + props: { color: "public-base" }, + }); + const baseId = baseInterval.getIntervalId(); + containerRuntimeFactory.processAllMessages(); + assert.notEqual(collection2.getIntervalById(baseId), undefined); + + const wireOps: unknown[] = []; + const originalPushMessage = + containerRuntimeFactory.pushMessage.bind(containerRuntimeFactory); + containerRuntimeFactory.pushMessage = (msg) => { + wireOps.push(JSON.parse(JSON.stringify(msg))); + originalPushMessage(msg); + }; + try { + containerRuntime1.connected = false; + collection1.removeIntervalById(baseId); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + } finally { + containerRuntimeFactory.pushMessage = originalPushMessage; + } + + assert.equal( + collection2.getIntervalById(baseId), + undefined, + "delete should still reach the peer", + ); + const stringified = JSON.stringify(wireOps); + assert.equal( + stringified.includes("public-base"), + false, + "pre-existing interval's properties must not ride out on the staged delete payload", + ); + }); + + it("drops a staged interval change's property value overridden by a later staged change", async () => { + sharedString.insertText(0, "hello world"); + containerRuntimeFactory.processAllMessages(); + + const collection1 = sharedString.getIntervalCollection("test"); + const collection2 = sharedString2.getIntervalCollection("test"); + + const baseInterval = collection1.add({ + start: 0, + end: 5, + props: { color: "base" }, + }); + const baseId = baseInterval.getIntervalId(); + containerRuntimeFactory.processAllMessages(); + + const peerSeenColors: unknown[] = []; + collection2.on("propertyChanged", (_interval, propsDeltas) => { + if (propsDeltas?.color !== undefined) { + peerSeenColors.push(propsDeltas.color); + } + }); + + containerRuntime1.connected = false; + collection1.change(baseId, { props: { color: "secret-color" } }); + collection1.change(baseId, { props: { color: "public-color" } }); + reconnectAndSquash(containerRuntime1, dataStoreRuntime1); + containerRuntimeFactory.processAllMessages(); + + const interval2 = collection2.getIntervalById(baseId); + assert.equal(interval2?.properties?.color, "public-color"); + for (const value of peerSeenColors) { + assert.notEqual( + value, + "secret-color", + "intermediate interval prop must not leak through squash", + ); + } + }); + }); }); // revertibles are deeply test in the merge tree package diff --git a/packages/dds/task-manager/src/taskManager.ts b/packages/dds/task-manager/src/taskManager.ts index 0a9f3d9ca11a..2a68e36c7d06 100644 --- a/packages/dds/task-manager/src/taskManager.ts +++ b/packages/dds/task-manager/src/taskManager.ts @@ -672,6 +672,21 @@ export class TaskManagerClass this.connectionWatcher.emit("connect"); } + /** + * TaskManager's reSubmitCore already collapses volunteer+abandon pairs (it was designed for + * the disconnect case, where the server drops the client from queues). The same collapse is + * the correct squash for staging-mode commit: redundant pending ops are not re-emitted, and + * a still-pending volunteer is re-emitted to express the final intent. + * + * One known limitation: an in-staging abandon for a task the client previously volunteered + * to (pre-staging) is not propagated by this path, because reSubmitCore assumes the server + * has already removed the client from the queue. TaskManager's queue position is inherently + * tied to live connection state, so this is consistent with its overall design. + */ + protected override reSubmitSquashed(content: unknown, localOpMetadata: unknown): void { + this.reSubmitCore(content, localOpMetadata as number); + } + /** * Override resubmit core to avoid resubmission on reconnect. On disconnect we accept our removal from the * queues, and leave it up to the user to decide whether they want to attempt to re-enter a queue on reconnect. diff --git a/packages/dds/test-dds-utils/src/index.ts b/packages/dds/test-dds-utils/src/index.ts index b18aebe65e2e..dbf7ee9175b7 100644 --- a/packages/dds/test-dds-utils/src/index.ts +++ b/packages/dds/test-dds-utils/src/index.ts @@ -35,3 +35,4 @@ export { export type { ISnapshotSuite } from "./ddsSnapshotHarness.js"; export { createSnapshotSuite } from "./ddsSnapshotHarness.js"; export type { Client, FuzzSerializedIdCompressor } from "./clientLoading.js"; +export { enterStagingMode, reconnectAndSquash } from "./utils.js"; diff --git a/packages/dds/test-dds-utils/src/utils.ts b/packages/dds/test-dds-utils/src/utils.ts index 5f9571856ed5..9fa1906074ea 100644 --- a/packages/dds/test-dds-utils/src/utils.ts +++ b/packages/dds/test-dds-utils/src/utils.ts @@ -20,27 +20,70 @@ export function makeUnreachableCodePathProxy(name: string): T }); } +/** + * Index in the container runtime's pendingMessages queue where the staging slice begins. + * Anything at a lower index is a pre-staging op that should resubmit with `squash=false`; + * anything from this index onward is a staging op that should resubmit with `squash=true`. + */ +const stagingBoundaries = new WeakMap(); + +/** + * Disconnect a mock container runtime and record the pre-staging/staging boundary. Use this + * instead of `containerRuntime.connected = false` for tests that need a pre-staging op to + * remain in flight across the staging session — the boundary lets {@link reconnectAndSquash} + * apply `squash=true` only to the staging slice rather than every pending op. + * + * Tests that do not have a pre-staging op in flight can keep using `connected = false` + * directly; in that case the entire pending queue is staged, which is also the fallback + * behavior of `reconnectAndSquash` when no boundary has been recorded. + * + * @internal + */ +export function enterStagingMode(containerRuntime: MockContainerRuntimeForReconnection): void { + containerRuntime.connected = false; + // Setting `connected = false` flushes the outbox into pendingMessages. The current length + // is the count of pre-staging ops; everything submitted after this point is staged. + const pendingMessages = ( + containerRuntime as unknown as { readonly pendingMessages: readonly unknown[] } + ).pendingMessages; + stagingBoundaries.set(containerRuntime, pendingMessages.length); +} + +/** + * Reconnects the given containerRuntime, forcing the staging slice of pending ops to resubmit + * with `squash=true` while pre-staging ops resubmit normally with `squash=false`. The staging + * slice is determined by {@link enterStagingMode}; if it was not called, every pending op is + * treated as staged (legacy behavior — preserved for tests that don't need pre-staging + * fidelity). + * + * Used by tests that need to exercise a DDS's squash codepath end-to-end without the + * runtime-level staging-mode APIs being plumbed through the mocks. + * + * @internal + */ export function reconnectAndSquash( containerRuntime: MockContainerRuntimeForReconnection, dataStoreRuntime: MockFluidDataStoreRuntime, ): void { - // The mocks don't fully plumb squashing and/or APIs for staging mode yet. To still exercise the squashing code path, - // we patch data store runtime's resubmit to always squash while we transition to "off". - const patchReSubmit = ( - runtime: MockFluidDataStoreRuntime, - options: { squash: boolean }, - ): (() => void) => { + const stagingBoundary = stagingBoundaries.get(containerRuntime) ?? 0; + stagingBoundaries.delete(containerRuntime); + let resubmitIndex = 0; + // The mocks don't fully plumb squashing and/or APIs for staging mode yet. To still exercise + // the squashing code path, we patch the data store runtime's reSubmit so each pending op is + // resubmitted with `squash=true` iff it falls in the staging slice. + const patchReSubmit = (runtime: MockFluidDataStoreRuntime): (() => void) => { // eslint-disable-next-line @typescript-eslint/unbound-method const originalReSubmit = runtime.reSubmit; - runtime.reSubmit = (content: unknown, localOpMetadata: unknown, squash: boolean) => - originalReSubmit.call(runtime, content, localOpMetadata, options.squash); + runtime.reSubmit = (content: unknown, localOpMetadata: unknown, squash: boolean) => { + const stagedSquash = resubmitIndex >= stagingBoundary; + resubmitIndex++; + return originalReSubmit.call(runtime, content, localOpMetadata, stagedSquash); + }; return () => { runtime.reSubmit = originalReSubmit; }; }; - const cleanup = patchReSubmit(dataStoreRuntime, { - squash: true, - }); + const cleanup = patchReSubmit(dataStoreRuntime); try { containerRuntime.connected = true; } finally { diff --git a/packages/dds/tree/src/feature-libraries/chunked-forest/chunkTree.ts b/packages/dds/tree/src/feature-libraries/chunked-forest/chunkTree.ts index fa0d9f408317..796a858846c5 100644 --- a/packages/dds/tree/src/feature-libraries/chunked-forest/chunkTree.ts +++ b/packages/dds/tree/src/feature-libraries/chunked-forest/chunkTree.ts @@ -636,7 +636,7 @@ export function splitFieldAtIndex( // their own refs from chunkRange, so the slot's ref to the original needs to be released. chunk.referenceRemoved(); } - assert(remaining === 0, "nodeIndex exceeds total node count in field"); + assert(remaining === 0, 0xd07 /* nodeIndex exceeds total node count in field */); return chunks.length; } diff --git a/packages/dds/tree/src/feature-libraries/chunked-forest/chunkedForest.ts b/packages/dds/tree/src/feature-libraries/chunked-forest/chunkedForest.ts index ef450e5faf2f..f4cc6712f59f 100644 --- a/packages/dds/tree/src/feature-libraries/chunked-forest/chunkedForest.ts +++ b/packages/dds/tree/src/feature-libraries/chunked-forest/chunkedForest.ts @@ -203,7 +203,7 @@ export class ChunkedForest implements IEditableForest { this.forest.#events.emit("beforeChange"); const parent = this.getParent(); const sourceField = parent.mutableChunk.fields.get(parent.key) ?? []; - assert(source.start <= source.end, "detach range start must not exceed end"); + assert(source.start <= source.end, 0xd06 /* detach range start must not exceed end */); const policy: ChunkCompressor = { policy: this.forest.chunker, diff --git a/packages/runtime/test-runtime-utils/src/assertionShortCodesMap.ts b/packages/runtime/test-runtime-utils/src/assertionShortCodesMap.ts index 012417287d33..253e089272b2 100644 --- a/packages/runtime/test-runtime-utils/src/assertionShortCodesMap.ts +++ b/packages/runtime/test-runtime-utils/src/assertionShortCodesMap.ts @@ -1937,5 +1937,20 @@ export const shortCodeMap = { "0xcf5": "Unexpected indexOfChunkStack.length", "0xcf6": "Unexpected indexWithinChunkStack.length", "0xcf7": "Parent chunk not found in latest summary tracking", - "0xcf8": "Unexpected pending operation during revert" + "0xcf8": "Unexpected pending operation during revert", + "0xcf9": "pendingOps index in range", + "0xcfa": "moveEntry pendingOp has target", + "0xcfb": "pendingStorageData entry must exist within bounds", + "0xcfc": "keySet must be present in its back-pointed lifetime", + "0xcfd": "pendingOps index in range", + "0xcfe": "pendingOps index in range", + "0xcff": "pendingStorageData entry must exist within bounds", + "0xd00": "lifetime must be present in pendingStorageData", + "0xd01": "Pending message id missing from queue during squash", + "0xd02": "keySet must be present in its back-pointed lifetime", + "0xd03": "lifetime must be present in pendingData", + "0xd04": "pendingData entry must exist within bounds", + "0xd05": "pendingData entry must exist within bounds", + "0xd06": "detach range start must not exceed end", + "0xd07": "nodeIndex exceeds total node count in field" }; diff --git a/packages/test/local-server-stress-tests/src/baseModel.ts b/packages/test/local-server-stress-tests/src/baseModel.ts index 18862d6b978a..dba66129fbc0 100644 --- a/packages/test/local-server-stress-tests/src/baseModel.ts +++ b/packages/test/local-server-stress-tests/src/baseModel.ts @@ -62,7 +62,8 @@ const orderSequentiallyReducer = async ( export const reducer = combineReducersAsync({ enterStagingMode: async (state, op) => state.client.entryPoint.enterStagingMode(), - exitStagingMode: async (state, op) => state.client.entryPoint.exitStagingMode(op.commit), + exitStagingMode: async (state, op) => + state.client.entryPoint.exitStagingMode(op.commit, op.squash), createDataStore: async (state, op) => { const { handle } = await state.datastore.createDataStore(op.tag, op.asChild); if (op.storeHandle) { @@ -194,6 +195,9 @@ export function makeGenerator( async ({ random }) => ({ type: "exitStagingMode", commit: random.bool(), + // Exercise the squash codepath on roughly half of committing exits. Discard exits + // ignore this flag, so randomizing unconditionally is fine. + squash: random.bool(), }), 25, (state) => diff --git a/packages/test/local-server-stress-tests/src/stressDataObject.ts b/packages/test/local-server-stress-tests/src/stressDataObject.ts index 1fc7b7fb229c..556a74d5f318 100644 --- a/packages/test/local-server-stress-tests/src/stressDataObject.ts +++ b/packages/test/local-server-stress-tests/src/stressDataObject.ts @@ -26,7 +26,7 @@ import type { IChannel } from "@fluidframework/datastore-definitions/internal"; // Valid export as per package.json export map import { modifyClusterSize } from "@fluidframework/id-compressor/internal/test-utils"; import { ISharedMap, SharedMap } from "@fluidframework/map/internal"; -import type { StageControls } from "@fluidframework/runtime-definitions/internal"; +import type { StageControlsInternal } from "@fluidframework/runtime-definitions/internal"; import { RuntimeHeaders, toFluidHandleInternal } from "@fluidframework/runtime-utils/internal"; import { timeoutAwait } from "@fluidframework/test-utils/internal"; @@ -59,6 +59,12 @@ export interface EnterStagingMode { export interface ExitStagingMode { type: "exitStagingMode"; commit: boolean; + /** + * Only meaningful when `commit` is true. When true, the runtime squashes intermediate + * state out of the resubmitted ops so values inserted and then removed during the + * staging session don't reach the wire. + */ + squash: boolean; } export type StressDataObjectOperations = @@ -331,14 +337,17 @@ export class DefaultStressDataObject extends StressDataObject { this._locallyCreatedObjects.push(obj); } - private stageControls: StageControls | undefined; + private stageControls: StageControlsInternal | undefined; private readonly containerRuntimeExp = this.context.containerRuntime; public enterStagingMode(): void { assert( this.containerRuntimeExp.enterStagingMode !== undefined, "enterStagingMode must be defined", ); - this.stageControls = this.containerRuntimeExp.enterStagingMode(); + // IContainerRuntimeBase.enterStagingMode is typed as returning StageControls (beta); + // the actual returned object is the internal variant which carries the squash option + // on commitChanges. + this.stageControls = this.containerRuntimeExp.enterStagingMode() as StageControlsInternal; } public inStagingMode(): boolean { @@ -349,10 +358,10 @@ export class DefaultStressDataObject extends StressDataObject { return this.containerRuntimeExp.inStagingMode; } - public exitStagingMode(commit: boolean): void { + public exitStagingMode(commit: boolean, squash: boolean = false): void { assert(this.stageControls !== undefined, "must have staging mode controls"); if (commit) { - this.stageControls.commitChanges(); + this.stageControls.commitChanges({ squash }); } else { this.stageControls.discardChanges(); } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9fa709a184cb..e402a7811264 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -7904,9 +7904,15 @@ importers: '@biomejs/biome': specifier: ~2.4.5 version: 2.4.5 + '@fluid-internal/client-utils': + specifier: workspace:~ + version: link:../../common/client-utils '@fluid-internal/mocha-test-setup': specifier: workspace:~ version: link:../../test/mocha-test-setup + '@fluid-private/stochastic-test-utils': + specifier: workspace:~ + version: link:../../test/stochastic-test-utils '@fluid-private/test-dds-utils': specifier: workspace:~ version: link:../test-dds-utils @@ -7928,6 +7934,9 @@ importers: '@fluidframework/eslint-config-fluid': specifier: catalog:eslint version: 9.0.0(@typescript-eslint/utils@8.56.1(eslint@9.39.1(jiti@2.6.1))(typescript@5.4.5))(eslint@9.39.1(jiti@2.6.1))(typescript@5.4.5) + '@fluidframework/runtime-utils': + specifier: workspace:~ + version: link:../../runtime/runtime-utils '@fluidframework/test-runtime-utils': specifier: workspace:~ version: link:../../runtime/test-runtime-utils