From 0e1896c0a1270c641a117cd4913ac51badebbe1c Mon Sep 17 00:00:00 2001 From: Kian Thompson <102998837+kian-thompson@users.noreply.github.com> Date: Tue, 12 May 2026 00:50:40 +0000 Subject: [PATCH 1/5] Add ClaimResult type and claims API to data store runtime definitions Introduces the public API surface for first-writer-wins claims: - ClaimResult = "Success" | "AlreadyClaimed" - IFluidDataStoreRuntime.trySetClaim/getClaim/hasClaim/claims (all optional) - IFluidDataStorePolicies.enableDataStoreClaims opt-in flag All members are @legacy @beta. Implementation follows in @fluidframework/datastore. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../datastore-definitions.legacy.alpha.api.md | 7 ++ .../datastore-definitions.legacy.beta.api.md | 7 ++ .../src/dataStoreRuntime.ts | 68 +++++++++++++++++++ .../datastore-definitions/src/index.ts | 1 + .../runtime-definitions.legacy.alpha.api.md | 1 + .../runtime-definitions.legacy.beta.api.md | 1 + .../src/dataStoreContext.ts | 16 +++++ 7 files changed, 101 insertions(+) diff --git a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md index fbf624137081..91ebcd77b39e 100644 --- a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md +++ b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md @@ -4,6 +4,9 @@ ```ts +// @beta @legacy +export type ClaimResult = "Success" | "AlreadyClaimed"; + // @beta @legacy export interface IChannel extends IFluidLoadable { // (undocumented) @@ -76,6 +79,7 @@ export interface IFluidDataStoreRuntime extends IEventProvider; // (undocumented) readonly clientId: string | undefined; // (undocumented) @@ -86,7 +90,9 @@ export interface IFluidDataStoreRuntime extends IEventProvider; getAudience(): IAudience; getChannel(id: string): Promise; + getClaim?(key: string): unknown; getQuorum(): IQuorumClients; + hasClaim?(key: string): boolean; // (undocumented) readonly id: string; readonly idCompressor: IIdCompressor | undefined; @@ -104,6 +110,7 @@ export interface IFluidDataStoreRuntime extends IEventProvider void; + trySetClaim?(key: string, value: unknown): Promise; uploadBlob(blob: ArrayBufferLike, signal?: AbortSignal): Promise>; waitAttached(): Promise; } diff --git a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md index 174ec80ca5a6..f76d27db3297 100644 --- a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md +++ b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md @@ -4,6 +4,9 @@ ```ts +// @beta @legacy +export type ClaimResult = "Success" | "AlreadyClaimed"; + // @beta @legacy export interface IChannel extends IFluidLoadable { // (undocumented) @@ -76,6 +79,7 @@ export interface IFluidDataStoreRuntime extends IEventProvider; // (undocumented) readonly clientId: string | undefined; // (undocumented) @@ -86,7 +90,9 @@ export interface IFluidDataStoreRuntime extends IEventProvider; getAudience(): IAudience; getChannel(id: string): Promise; + getClaim?(key: string): unknown; getQuorum(): IQuorumClients; + hasClaim?(key: string): boolean; // (undocumented) readonly id: string; readonly idCompressor: IIdCompressor | undefined; @@ -104,6 +110,7 @@ export interface IFluidDataStoreRuntime extends IEventProvider void; + trySetClaim?(key: string, value: unknown): Promise; uploadBlob(blob: ArrayBufferLike, signal?: AbortSignal): Promise>; waitAttached(): Promise; } diff --git a/packages/runtime/datastore-definitions/src/dataStoreRuntime.ts b/packages/runtime/datastore-definitions/src/dataStoreRuntime.ts index 0918a49aec34..b99cda72f109 100644 --- a/packages/runtime/datastore-definitions/src/dataStoreRuntime.ts +++ b/packages/runtime/datastore-definitions/src/dataStoreRuntime.ts @@ -50,6 +50,18 @@ export interface IFluidDataStoreRuntimeEvents extends IEvent { export type IDeltaManagerErased = ErasedType<"@fluidframework/container-definitions.IDeltaManager">; +/** + * Result of attempting to set a claim on a data store via + * {@link IFluidDataStoreRuntime.trySetClaim}. + * + * - `"Success"` - this client owns the claim for the given key. + * - `"AlreadyClaimed"` - another client has already claimed the key; the + * existing value is unchanged. Claims are first-writer-wins and are + * immutable for the lifetime of the document. + * @legacy @beta + */ +export type ClaimResult = "Success" | "AlreadyClaimed"; + /** * Represents the runtime for the data store. Contains helper functions/state of the data store. * @sealed @@ -203,6 +215,62 @@ export interface IFluidDataStoreRuntime * Indicates whether the data store has uncommitted local changes. */ readonly isDirty: boolean; + + /** + * Attempt to set a first-writer-wins "claim" for the given key on this data + * store. Once a claim has been sequenced for a key, no other client can + * overwrite it for the lifetime of the document. + * + * @remarks + * Claims are intended for partner scenarios that need to wire up singleton + * components (typically a handle to a child DDS) with first-writer-wins + * semantics, instead of the last-writer-wins semantics provided by writing + * to a DDS such as `SharedDirectory`. + * + * The value is JSON-serializable. {@link @fluidframework/core-interfaces#IFluidHandle} + * instances embedded in the value are encoded the same way as handles in + * summary blobs and contribute to garbage-collection routes from this data + * store. + * + * The returned promise resolves to `"Success"` for the client whose op was + * sequenced first for the key, and `"AlreadyClaimed"` for every other + * client. A repeated call from the winning client returns `"Success"`. + * + * Optional. Implementations that do not support claims will not provide + * this method. + * + * @param key - The claim key. + * @param value - The claim value (JSON-serializable; may include handles). + * @returns A promise that resolves to the {@link ClaimResult}. + */ + trySetClaim?(key: string, value: unknown): Promise; + + /** + * Returns the value of a previously-claimed key, with embedded handles + * decoded. Returns `undefined` if the key has not (yet) been claimed. + * + * Optional. Implementations that do not support claims will not provide + * this method. + */ + getClaim?(key: string): unknown; + + /** + * Returns `true` if the given key has been sequenced as a claim on this + * data store. + * + * Optional. Implementations that do not support claims will not provide + * this method. + */ + hasClaim?(key: string): boolean; + + /** + * Read-only view of all sequenced claims on this data store, with embedded + * handles decoded. + * + * Optional. Implementations that do not support claims will not provide + * this property. + */ + readonly claims?: ReadonlyMap; } /** diff --git a/packages/runtime/datastore-definitions/src/index.ts b/packages/runtime/datastore-definitions/src/index.ts index b986216a6a0e..3bf22b58c7dc 100644 --- a/packages/runtime/datastore-definitions/src/index.ts +++ b/packages/runtime/datastore-definitions/src/index.ts @@ -24,6 +24,7 @@ export type { IFluidDataStoreRuntimeEvents, IFluidDataStoreRuntimeInternalConfig, IDeltaManagerErased, + ClaimResult, } from "./dataStoreRuntime.js"; export type { Jsonable, diff --git a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md index 9f18e360b28c..86fdff8a32ca 100644 --- a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md +++ b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md @@ -188,6 +188,7 @@ export interface IFluidDataStoreFactory extends IProvideFluidDataStoreFactory { // @beta @legacy export interface IFluidDataStorePolicies { + readonly enableDataStoreClaims?: boolean; readonly readonlyInStagingMode: boolean; } diff --git a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.beta.api.md b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.beta.api.md index 75f077ccfa54..ffa2cc01f803 100644 --- a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.beta.api.md +++ b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.beta.api.md @@ -181,6 +181,7 @@ export interface IFluidDataStoreFactory extends IProvideFluidDataStoreFactory { // @beta @legacy export interface IFluidDataStorePolicies { + readonly enableDataStoreClaims?: boolean; readonly readonlyInStagingMode: boolean; } diff --git a/packages/runtime/runtime-definitions/src/dataStoreContext.ts b/packages/runtime/runtime-definitions/src/dataStoreContext.ts index 4e514b16b38d..4b5ee3ddd184 100644 --- a/packages/runtime/runtime-definitions/src/dataStoreContext.ts +++ b/packages/runtime/runtime-definitions/src/dataStoreContext.ts @@ -377,6 +377,22 @@ export interface IFluidDataStorePolicies { * @see {@link IContainerRuntimeBase.enterStagingMode} */ readonly readonlyInStagingMode: boolean; + + /** + * When set to true, this data store enables the first-writer-wins + * "claims" feature. {@link @fluidframework/datastore-definitions#IFluidDataStoreRuntime.trySetClaim} + * (and the corresponding helpers on `DataObject`) require this flag and + * will throw a `UsageError` if it is not set. + * + * @defaultValue `false` + * + * @remarks + * The new claim op type is silently ignored by older clients that do not + * understand it. Authors enabling this flag should ensure all + * collaborating clients run a runtime that recognizes the op (otherwise + * older clients would not see the claim being set). + */ + readonly enableDataStoreClaims?: boolean; } /** From fee482dc71b2d4f7fe17472ba4683dc3d6ba8c70 Mon Sep 17 00:00:00 2001 From: Kian Thompson <102998837+kian-thompson@users.noreply.github.com> Date: Tue, 12 May 2026 00:50:55 +0000 Subject: [PATCH 2/5] Implement data store claims in FluidDataStoreRuntime Adds first-writer-wins claim support to FluidDataStoreRuntime, gated by the new IFluidDataStorePolicies.enableDataStoreClaims flag. - Adds DataStoreMessageType.Claim op type and IClaimMessage envelope. - New runtime state: sequencedClaims, wonClaims, pendingClaims, with async rehydration from a .claims summary blob. - trySetClaim awaits load, then submits a Claim op (or writes directly when detached). Local op processing classifies the writer as winner/loser. - Inbound Claim ops arriving before the snapshot load completes are buffered and drained in arrival order. - Summary persistence via getAttachSummary() and summarize() writes the .claims blob; getOutboundRoutes walks claim values for handle routes so GC sees handles inside claims. - Handle values inside claims are encoded/decoded the same way as in summary blobs (encodeHandleForSerialization / RemoteFluidObjectHandle). - dispose() resolves any outstanding pending deferreds with AlreadyClaimed. - Adds 13 unit tests covering single-client, two-client races, repeat-from- winner/loser, detached set + attach, GC routes, reconnect, feature flag off, staging mode, snapshot rehydration, summary content, and dispose. The DataStoreMessageType enum gained a new value, which breaks the current_as_old typetest; this is recorded in package.json typeValidation.broken. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../api-report/datastore.legacy.beta.api.md | 8 +- packages/runtime/datastore/package.json | 3 + .../runtime/datastore/src/dataStoreRuntime.ts | 371 +++++++++++++++- packages/runtime/datastore/src/index.ts | 1 + .../runtime/datastore/src/test/claims.spec.ts | 399 ++++++++++++++++++ .../validateDatastorePrevious.generated.ts | 1 + 6 files changed, 780 insertions(+), 3 deletions(-) create mode 100644 packages/runtime/datastore/src/test/claims.spec.ts diff --git a/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md b/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md index 1ee912869d4b..f657b9d5b94f 100644 --- a/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md +++ b/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md @@ -9,7 +9,9 @@ export enum DataStoreMessageType { // (undocumented) Attach = "attach", // (undocumented) - ChannelOp = "op" + ChannelOp = "op", + // (undocumented) + Claim = "claim" } // @beta @legacy @@ -30,6 +32,7 @@ export class FluidDataStoreRuntime extends TypedEventEmitter; // (undocumented) get clientDetails(): IClientDetails; // (undocumented) @@ -54,9 +57,11 @@ export class FluidDataStoreRuntime extends TypedEventEmitter; + getClaim(key: string): unknown | undefined; getGCData(fullGC?: boolean): Promise; // (undocumented) getQuorum(): IQuorumClients; + hasClaim(key: string): boolean; // (undocumented) readonly id: string; // (undocumented) @@ -99,6 +104,7 @@ export class FluidDataStoreRuntime extends TypedEventEmitter; + trySetClaim(key: string, value: unknown): Promise; updateUsedRoutes(usedRoutes: string[]): void; // (undocumented) uploadBlob(blob: ArrayBufferLike, signal?: AbortSignal): Promise>; diff --git a/packages/runtime/datastore/package.json b/packages/runtime/datastore/package.json index 3ac7bf6ba4ff..83ca2d9bd375 100644 --- a/packages/runtime/datastore/package.json +++ b/packages/runtime/datastore/package.json @@ -160,6 +160,9 @@ "broken": { "Class_FluidDataStoreRuntime": { "forwardCompat": false + }, + "Enum_DataStoreMessageType": { + "backCompat": false } }, "entrypoint": "legacy" diff --git a/packages/runtime/datastore/src/dataStoreRuntime.ts b/packages/runtime/datastore/src/dataStoreRuntime.ts index da14c2438e5d..0ddc5eb6f5d2 100644 --- a/packages/runtime/datastore/src/dataStoreRuntime.ts +++ b/packages/runtime/datastore/src/dataStoreRuntime.ts @@ -3,7 +3,11 @@ * Licensed under the MIT License. */ -import { TypedEventEmitter, type ILayerCompatDetails } from "@fluid-internal/client-utils"; +import { + TypedEventEmitter, + type ILayerCompatDetails, + bufferToString, +} from "@fluid-internal/client-utils"; import { AttachState, type IAudience } from "@fluidframework/container-definitions"; import type { IDeltaManager } from "@fluidframework/container-definitions/internal"; import type { @@ -29,6 +33,7 @@ import type { IFluidDataStoreRuntime, IFluidDataStoreRuntimeEvents, IDeltaManagerErased, + ClaimResult, } from "@fluidframework/datastore-definitions/internal"; import { type IClientDetails, @@ -67,7 +72,11 @@ import { currentSummarizeStepPropertyName, } from "@fluidframework/runtime-definitions/internal"; import { + encodeHandleForSerialization, GCDataBuilder, + isSerializedHandle, + isFluidHandle, + RemoteFluidObjectHandle, RequestParser, SummaryTreeBuilder, addBlobToSummary, @@ -136,6 +145,33 @@ export enum DataStoreMessageType { // Creates a new channel Attach = "attach", ChannelOp = "op", + // Submits a first-writer-wins claim entry on the data store. + Claim = "claim", +} + +/** + * Wire format for a {@link DataStoreMessageType.Claim} op. + * The `value` is the JSON-serializable claim value with embedded handles + * already encoded as serialized handle markers. + * @internal + */ +export interface IClaimMessage { + key: string; + value: unknown; +} + +/** + * Well-known summary blob name used to persist sequenced claims for a data + * store. Sibling of channel subtrees in the data store's summary. + */ +const claimsBlobName = ".claims"; + +/** + * Persisted format of the claims blob. Only sequenced entries are written; + * the winner-vs-loser distinction is local state and is not persisted. + */ +interface IClaimsBlobContent { + entries: [string, unknown][]; } /** @@ -147,7 +183,8 @@ export enum DataStoreMessageType { */ export type LocalFluidDataStoreRuntimeMessage = | { type: DataStoreMessageType.ChannelOp; content: IEnvelope } - | { type: DataStoreMessageType.Attach; content: IAttachMessage }; + | { type: DataStoreMessageType.Attach; content: IAttachMessage } + | { type: DataStoreMessageType.Claim; content: IClaimMessage }; /** * @legacy @beta @@ -230,6 +267,84 @@ function isURL(str: string): boolean { } } +/** + * Recursively walks a claim value, replacing any embedded + * {@link @fluidframework/core-interfaces#IFluidHandle} with its serialized + * form, and binding the handle to the data store runtime so its graph is + * attached. + */ +function encodeHandlesInClaimValue( + value: unknown, + bind: (handle: IFluidHandle) => void, +): unknown { + if (value === null || value === undefined) { + return value; + } + if (isFluidHandle(value)) { + bind(value); + return encodeHandleForSerialization(toFluidHandleInternal(value)); + } + if (Array.isArray(value)) { + return value.map((item) => encodeHandlesInClaimValue(item, bind)); + } + if (typeof value === "object") { + const result: Record = {}; + for (const [k, v] of Object.entries(value as Record)) { + result[k] = encodeHandlesInClaimValue(v, bind); + } + return result; + } + return value; +} + +/** + * Recursively walks an encoded claim value, replacing every + * {@link ISerializedHandle} with a {@link RemoteFluidObjectHandle} bound to + * the supplied route context. + */ +function decodeHandlesInClaimValue( + value: unknown, + routeContext: IFluidHandleContext, +): unknown { + if (value === null || value === undefined || typeof value !== "object") { + return value; + } + if (isSerializedHandle(value)) { + return new RemoteFluidObjectHandle(value.url, routeContext, value.payloadPending === true); + } + if (Array.isArray(value)) { + return value.map((item) => decodeHandlesInClaimValue(item, routeContext)); + } + const result: Record = {}; + for (const [k, v] of Object.entries(value as Record)) { + result[k] = decodeHandlesInClaimValue(v, routeContext); + } + return result; +} + +/** + * Walks an encoded claim value and collects all handle URLs (outbound GC + * routes contributed by the claim). + */ +function collectClaimHandleRoutes(value: unknown, routes: string[]): void { + if (value === null || value === undefined || typeof value !== "object") { + return; + } + if (isSerializedHandle(value)) { + routes.push(value.url); + return; + } + if (Array.isArray(value)) { + for (const item of value) { + collectClaimHandleRoutes(item, routes); + } + return; + } + for (const v of Object.values(value as Record)) { + collectClaimHandleRoutes(v, routes); + } +} + const defaultPolicies: IFluidDataStorePolicies = { readonlyInStagingMode: false, }; @@ -341,6 +456,45 @@ export class FluidDataStoreRuntime // store becomes visible. private readonly pendingHandlesToMakeVisible: Set = new Set(); + /** + * Authoritative claim entries that have been sequenced. Values are stored + * in their encoded form (handles already replaced with serialized handle + * markers). Decoded on demand via `getClaim` / `claims`. + */ + private readonly sequencedClaims = new Map(); + + /** + * Keys for which this client's op was the one that won the race for a + * claim. Used to disambiguate `"Success"` vs `"AlreadyClaimed"` for + * repeat calls. This is purely local state and is not persisted in + * summaries (the loading client was, by definition, not the writer). + */ + private readonly wonClaims = new Set(); + + /** + * Outstanding `trySetClaim` calls that have submitted an op and are + * waiting for it to be sequenced. Keyed by claim key. + */ + private readonly pendingClaims = new Map>(); + + /** + * Promise that resolves once `sequencedClaims` has been hydrated from + * the base snapshot (or immediately, if there is no base snapshot). + */ + private readonly claimsLoadP: Promise | undefined; + + /** + * `true` once `sequencedClaims` has been hydrated from the base + * snapshot. Used to gate sync claim accessors. + */ + private claimsLoaded = false; + + /** + * Buffer of inbound Claim ops that arrived before `claimsLoaded` + * became `true`. Drained in arrival order once load completes. + */ + private readonly bufferedClaimOps: { content: IClaimMessage; local: boolean }[] = []; + public readonly id: string; // TODO: use something other than `any` here (breaking change) @@ -543,6 +697,14 @@ export class FluidDataStoreRuntime this.mc.config.getNumber("Fluid.Telemetry.LocalChangesTelemetryCount") ?? 10; this.minVersionForCollab = this.dataStoreContext.minVersionForCollab; + + // Kick off the (possibly-async) load of the persisted .claims blob. + // Inbound Claim ops that arrive before this completes are buffered and + // drained in arrival order; sync claim accessors return the empty + // state until the load resolves. + this.claimsLoadP = this.loadClaimsFromSnapshot().catch((error) => { + this.mc.logger.sendErrorEvent({ eventName: "ClaimsLoadFailed" }, error); + }); } /** @@ -569,10 +731,161 @@ export class FluidDataStoreRuntime } this._disposed = true; + // Resolve any outstanding claim attempts as "AlreadyClaimed" so callers + // don't hang on a disposed runtime. (We choose resolution over + // rejection to make caller error handling simpler — the caller can + // distinguish via a subsequent `hasClaim` check if needed.) + for (const [, deferred] of this.pendingClaims) { + deferred.resolve("AlreadyClaimed"); + } + this.pendingClaims.clear(); + this.emit("dispose"); this.removeAllListeners(); } + // ---------------------------------------------------------------------- + // Claims (first-writer-wins, data-store-scoped key/value entries). + // ---------------------------------------------------------------------- + + private get claimsEnabled(): boolean { + return this.policies.enableDataStoreClaims === true; + } + + private async loadClaimsFromSnapshot(): Promise { + const snapshot = this.dataStoreContext.baseSnapshot; + const blobId = snapshot?.blobs[claimsBlobName]; + if (blobId !== undefined) { + const blob = await this.dataStoreContext.storage.readBlob(blobId); + const text = bufferToString(blob, "utf8"); + const parsed = JSON.parse(text) as IClaimsBlobContent; + for (const [k, v] of parsed.entries) { + if (!this.sequencedClaims.has(k)) { + this.sequencedClaims.set(k, v); + } + } + } + this.claimsLoaded = true; + // Drain buffered ops in arrival order. + const buffered = this.bufferedClaimOps.splice(0, this.bufferedClaimOps.length); + for (const { content, local } of buffered) { + this.applyClaimOp(content, local); + } + } + + private async ensureClaimsLoaded(): Promise { + if (this.claimsLoaded) { + return; + } + await this.claimsLoadP; + } + + /** + * Apply a sequenced Claim op. First-writer-wins: + * - If the key is already in `sequencedClaims`, ignore the op. If the + * op was local, the deferred resolves to `"AlreadyClaimed"`. + * - Otherwise, write into `sequencedClaims`. If the op was local, + * record this client as the winner and resolve to `"Success"`. + */ + private applyClaimOp(content: IClaimMessage, local: boolean): void { + const { key, value } = content; + const winner = !this.sequencedClaims.has(key); + if (winner) { + this.sequencedClaims.set(key, value); + if (local) { + this.wonClaims.add(key); + } + } + if (local) { + const deferred = this.pendingClaims.get(key); + if (deferred !== undefined) { + this.pendingClaims.delete(key); + deferred.resolve(winner ? "Success" : "AlreadyClaimed"); + } + } + } + + /** + * {@inheritDoc @fluidframework/datastore-definitions#IFluidDataStoreRuntime.trySetClaim} + */ + public async trySetClaim(key: string, value: unknown): Promise { + this.verifyNotClosed(); + if (!this.claimsEnabled) { + throw new UsageError( + "DataStore claims are not enabled. Set the `enableDataStoreClaims` policy on the data store runtime to opt in.", + ); + } + if (this.inStagingMode) { + throw new UsageError( + "trySetClaim is not supported while the container is in staging mode.", + ); + } + if (typeof key !== "string" || key.length === 0) { + throw new UsageError("Claim key must be a non-empty string."); + } + + await this.ensureClaimsLoaded(); + + // Encode handles in `value` and bind them to this runtime's graph. + const encoded = encodeHandlesInClaimValue(value, (h) => this.bind(h)); + + // Already-sequenced winner / loser disambiguation. + if (this.sequencedClaims.has(key)) { + return this.wonClaims.has(key) ? "Success" : "AlreadyClaimed"; + } + + // Detached: write directly without an op (will be persisted by + // getAttachSummary). + if (!this.isAttached) { + this.sequencedClaims.set(key, encoded); + this.wonClaims.add(key); + return "Success"; + } + + // If a local attempt for this key is already in flight, await its + // outcome. + const existing = this.pendingClaims.get(key); + if (existing !== undefined) { + return existing.promise; + } + + const deferred = new Deferred(); + this.pendingClaims.set(key, deferred); + this.submit({ + type: DataStoreMessageType.Claim, + content: { key, value: encoded }, + }); + return deferred.promise; + } + + /** + * {@inheritDoc @fluidframework/datastore-definitions#IFluidDataStoreRuntime.getClaim} + */ + public getClaim(key: string): unknown | undefined { + if (!this.sequencedClaims.has(key)) { + return undefined; + } + return decodeHandlesInClaimValue(this.sequencedClaims.get(key), this.routeContext); + } + + /** + * {@inheritDoc @fluidframework/datastore-definitions#IFluidDataStoreRuntime.hasClaim} + */ + public hasClaim(key: string): boolean { + return this.sequencedClaims.has(key); + } + + /** + * {@inheritDoc @fluidframework/datastore-definitions#IFluidDataStoreRuntime.claims} + */ + public get claims(): ReadonlyMap { + const decoded = new Map(); + for (const [k, v] of this.sequencedClaims) { + decoded.set(k, decodeHandlesInClaimValue(v, this.routeContext)); + } + return decoded; + } + public async resolveHandle(request: IRequest): Promise { return this.request(request); } @@ -1014,6 +1327,17 @@ export class FluidDataStoreRuntime this.processAttachMessages(messageCollection); break; } + case DataStoreMessageType.Claim: { + for (const { contents } of messagesContent) { + const claimMessage = contents as IClaimMessage; + if (this.claimsLoaded) { + this.applyClaimOp(claimMessage, local); + } else { + this.bufferedClaimOps.push({ content: claimMessage, local }); + } + } + break; + } default: } } catch (error) { @@ -1057,6 +1381,11 @@ export class FluidDataStoreRuntime for (const [contextId] of this.contexts) { outboundRoutes.push(`${this.absolutePath}/${contextId}`); } + // Include any handle routes embedded in sequenced claim values so GC + // keeps their referents alive. + for (const value of this.sequencedClaims.values()) { + collectClaimHandleRoutes(value, outboundRoutes); + } return outboundRoutes; } @@ -1093,9 +1422,27 @@ export class FluidDataStoreRuntime summaryBuilder.addWithStats(contextId, contextSummary); }, ); + this.addClaimsBlob(summaryBuilder); return summaryBuilder.getSummaryTree(); } + /** + * Writes the persisted `.claims` blob into the supplied summary builder + * if there are any sequenced claims. The blob stores only sequenced + * entries — the local winner-vs-loser distinction (`wonClaims`) is + * per-client state and is intentionally not persisted (the loading + * client cannot have been the writer). + */ + private addClaimsBlob(summaryBuilder: SummaryTreeBuilder): void { + if (this.sequencedClaims.size === 0) { + return; + } + const content: IClaimsBlobContent = { + entries: [...this.sequencedClaims.entries()], + }; + summaryBuilder.addBlob(claimsBlobName, JSON.stringify(content)); + } + /** * Generates data used for garbage collection. This includes a list of GC nodes that represent this channel * including any of its child channel contexts. Each node has a set of outbound routes to other GC nodes in the @@ -1178,6 +1525,7 @@ export class FluidDataStoreRuntime }, ); + this.addClaimsBlob(summaryBuilder); return summaryBuilder.getSummaryTree(); } @@ -1384,6 +1732,14 @@ export class FluidDataStoreRuntime this.submit({ type, content }, localOpMetadata); break; } + case DataStoreMessageType.Claim: { + // Resubmit the claim op verbatim. The race-resolution in + // applyClaimOp correctly distinguishes Success vs + // AlreadyClaimed once the resubmitted op is sequenced. + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + this.submit({ type, content }, localOpMetadata); + break; + } default: { unreachableCase(type); } @@ -1485,6 +1841,17 @@ export class FluidDataStoreRuntime await channelContext.getChannel(); return channelContext.applyStashedOp(envelope.contents); } + case DataStoreMessageType.Claim: { + // Replay a stashed claim op locally: re-register the + // pending deferred (it will be resolved once the op is + // sequenced or resubmitted on reconnect). + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + const claimMessage = content.content as IClaimMessage; + if (!this.pendingClaims.has(claimMessage.key)) { + this.pendingClaims.set(claimMessage.key, new Deferred()); + } + return; + } default: { unreachableCase(type); } diff --git a/packages/runtime/datastore/src/index.ts b/packages/runtime/datastore/src/index.ts index 881feb45ba89..688d5fa02a0b 100644 --- a/packages/runtime/datastore/src/index.ts +++ b/packages/runtime/datastore/src/index.ts @@ -7,6 +7,7 @@ export { FluidObjectHandle } from "./fluidHandle.js"; export { DataStoreMessageType, FluidDataStoreRuntime, + type IClaimMessage, type ISharedObjectRegistry, type LocalFluidDataStoreRuntimeMessage, mixinRequestHandler, diff --git a/packages/runtime/datastore/src/test/claims.spec.ts b/packages/runtime/datastore/src/test/claims.spec.ts new file mode 100644 index 000000000000..40af1ca51106 --- /dev/null +++ b/packages/runtime/datastore/src/test/claims.spec.ts @@ -0,0 +1,399 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "node:assert"; + +import { stringToBuffer } from "@fluid-internal/client-utils"; +import { AttachState } from "@fluidframework/container-definitions"; +import { ContainerErrorTypes } from "@fluidframework/container-definitions/internal"; +import type { IErrorBase, IFluidHandle } from "@fluidframework/core-interfaces"; +import { fluidHandleSymbol } from "@fluidframework/core-interfaces"; +import type { IFluidHandleInternal } from "@fluidframework/core-interfaces/internal"; +import { SummaryType } from "@fluidframework/driver-definitions"; +import type { ISnapshotTree } from "@fluidframework/driver-definitions/internal"; +import type { + IRuntimeMessageCollection, + IRuntimeStorageService, + ISequencedMessageEnvelope, +} from "@fluidframework/runtime-definitions/internal"; +import { + FluidHandleBase, + encodeHandleForSerialization, + toFluidHandleErased, +} from "@fluidframework/runtime-utils/internal"; +import { MockFluidDataStoreContext } from "@fluidframework/test-runtime-utils/internal"; + +import { + DataStoreMessageType, + FluidDataStoreRuntime, + type IClaimMessage, + type ISharedObjectRegistry, + type LocalFluidDataStoreRuntimeMessage, +} from "../dataStoreRuntime.js"; + +const claimsBlobName = ".claims"; + +interface SubmittedOp { + type: string; + content: unknown; +} + +class TestStorage implements IRuntimeStorageService { + public constructor(private readonly blobs: Map) {} + public async readBlob(id: string): Promise { + const b = this.blobs.get(id); + if (b === undefined) { + throw new Error(`Blob ${id} not found`); + } + return b; + } +} + +/** + * Minimal IFluidHandle stub usable in a claim value. It is "attached" so + * binding it is a no-op. + */ +class FakeHandle extends FluidHandleBase implements IFluidHandleInternal { + public readonly isAttached = true; + public constructor(public readonly absolutePath: string) { + super(); + } + public attachGraph(): void { + /* no-op */ + } + public async get(): Promise { + return undefined; + } + public get [fluidHandleSymbol](): never { + return toFluidHandleErased(this) as unknown as never; + } +} + +function makeContext(options?: { + attachState?: AttachState; + baseSnapshot?: ISnapshotTree; + blobs?: Map; +}): { + context: MockFluidDataStoreContext; + submitted: SubmittedOp[]; +} { + const ctx = new MockFluidDataStoreContext(); + ctx.attachState = options?.attachState ?? AttachState.Attached; + ctx.baseSnapshot = options?.baseSnapshot; + ctx.storage = new TestStorage(options?.blobs ?? new Map()); + const submitted: SubmittedOp[] = []; + (ctx as unknown as { submitMessage: (t: string, c: unknown) => void }).submitMessage = ( + type, + content, + ) => { + submitted.push({ type, content }); + }; + (ctx as unknown as { makeLocallyVisible: () => void }).makeLocallyVisible = () => {}; + return { context: ctx, submitted }; +} + +/** + * Yield enough microtasks for trySetClaim's async ensureClaimsLoaded / + * promise chain to submit its op. + */ +async function flush(): Promise { + for (let i = 0; i < 10; i++) { + await Promise.resolve(); + } +} + +const sharedObjectRegistry: ISharedObjectRegistry = { + get: () => undefined, +}; + +function createRuntime( + context: MockFluidDataStoreContext, + policies?: { enableDataStoreClaims?: boolean }, +): FluidDataStoreRuntime { + const effectivePolicies = policies ?? { enableDataStoreClaims: true }; + const runtime: FluidDataStoreRuntime = new FluidDataStoreRuntime( + context, + sharedObjectRegistry, + /* existing */ context.baseSnapshot !== undefined, + async () => runtime, + effectivePolicies, + ); + return runtime; +} + +function makeClaimAck(content: IClaimMessage, local: boolean): IRuntimeMessageCollection { + const envelope = { + type: DataStoreMessageType.Claim, + } as unknown as ISequencedMessageEnvelope; + return { + envelope, + local, + messagesContent: [{ contents: content, clientSequenceNumber: 1, localOpMetadata: {} }], + }; +} + +/** + * Pull the most-recently-submitted claim op off the submitted list. + */ +async function popClaim(submitted: SubmittedOp[]): Promise { + await flush(); + const op = submitted.pop(); + assert(op !== undefined, "Expected a submitted claim op"); + assert.strictEqual(op.type, DataStoreMessageType.Claim, "Expected Claim op type"); + return op.content as IClaimMessage; +} + +describe("FluidDataStoreRuntime claims", () => { + it("single-client claim resolves to Success and is exposed via getClaim/hasClaim/claims", async () => { + const { context, submitted } = makeContext(); + const runtime = createRuntime(context); + + const promise = runtime.trySetClaim?.("k", "v"); + assert(promise !== undefined); + // The op was submitted; round-trip it back as a local ack. + const claim = await popClaim(submitted); + runtime.processMessages(makeClaimAck(claim, true)); + assert.strictEqual(await promise, "Success"); + assert.strictEqual(runtime.hasClaim?.("k"), true); + assert.strictEqual(runtime.getClaim?.("k"), "v"); + assert.deepStrictEqual([...(runtime.claims?.entries() ?? [])], [["k", "v"]]); + }); + + it("two-client race: first writer wins; loser observes AlreadyClaimed", async () => { + // Client A + const { context: ctxA, submitted: subA } = makeContext(); + const runtimeA = createRuntime(ctxA); + // Client B + const { context: ctxB, submitted: subB } = makeContext(); + const runtimeB = createRuntime(ctxB); + + const pA = runtimeA.trySetClaim?.("k", "A"); + const pB = runtimeB.trySetClaim?.("k", "B"); + assert(pA !== undefined && pB !== undefined); + + const aOp = await popClaim(subA); + const bOp = await popClaim(subB); + + // Sequencer orders A then B; both clients see both ops. A is local on + // runtimeA, remote on runtimeB; B is local on runtimeB, remote on runtimeA. + runtimeA.processMessages(makeClaimAck(aOp, true)); + runtimeA.processMessages(makeClaimAck(bOp, false)); + runtimeB.processMessages(makeClaimAck(aOp, false)); + runtimeB.processMessages(makeClaimAck(bOp, true)); + + assert.strictEqual(await pA, "Success"); + assert.strictEqual(await pB, "AlreadyClaimed"); + assert.strictEqual(runtimeA.getClaim?.("k"), "A"); + assert.strictEqual(runtimeB.getClaim?.("k"), "A"); + }); + + it("repeated claim from the winner returns Success even with a different value", async () => { + const { context, submitted } = makeContext(); + const runtime = createRuntime(context); + + const p1 = runtime.trySetClaim?.("k", "v1"); + const op1 = await popClaim(submitted); + runtime.processMessages(makeClaimAck(op1, true)); + assert.strictEqual(await p1, "Success"); + + // Repeat with different value -> still Success, original value retained. + const p2 = runtime.trySetClaim?.("k", "v2"); + assert.strictEqual(await p2, "Success"); + assert.strictEqual(runtime.getClaim?.("k"), "v1"); + }); + + it("repeated claim from a loser returns AlreadyClaimed (identity, not value)", async () => { + const { context: ctxA, submitted: subA } = makeContext(); + const runtimeA = createRuntime(ctxA); + const { context: ctxB, submitted: subB } = makeContext(); + const runtimeB = createRuntime(ctxB); + + const pA = runtimeA.trySetClaim?.("k", "shared"); + const pB = runtimeB.trySetClaim?.("k", "shared"); + const aOp = await popClaim(subA); + const bOp = await popClaim(subB); + runtimeA.processMessages(makeClaimAck(aOp, true)); + runtimeA.processMessages(makeClaimAck(bOp, false)); + runtimeB.processMessages(makeClaimAck(aOp, false)); + runtimeB.processMessages(makeClaimAck(bOp, true)); + assert.strictEqual(await pA, "Success"); + assert.strictEqual(await pB, "AlreadyClaimed"); + + // Loser tries again with the same (key, value): still AlreadyClaimed. + assert.strictEqual(await runtimeB.trySetClaim?.("k", "shared"), "AlreadyClaimed"); + }); + + it("detached set resolves Success synchronously and is persisted via getAttachSummary", async () => { + const { context } = makeContext({ attachState: AttachState.Detached }); + const runtime = createRuntime(context); + assert.strictEqual(await runtime.trySetClaim?.("k", "v"), "Success"); + assert.strictEqual(runtime.hasClaim?.("k"), true); + + // Make the runtime locally visible so getAttachSummary works. + runtime.makeVisibleAndAttachGraph(); + const attachSummary = runtime.getAttachSummary(); + const claimsBlob = attachSummary.summary.tree[claimsBlobName]; + assert(claimsBlob !== undefined, "Attach summary must include .claims blob"); + assert.strictEqual(claimsBlob.type, SummaryType.Blob); + }); + + it("after detached set + attach, a remote op for the same key resolves AlreadyClaimed", async () => { + const { context, submitted } = makeContext({ attachState: AttachState.Detached }); + const runtime = createRuntime(context); + assert.strictEqual(await runtime.trySetClaim?.("k", "winner"), "Success"); + + // Simulate becoming attached. + runtime.setAttachState(AttachState.Attaching); + runtime.setAttachState(AttachState.Attached); + + // Now a remote claim for the same key arrives -> ignored. + const remoteOp: IClaimMessage = { key: "k", value: "loser" }; + runtime.processMessages(makeClaimAck(remoteOp, false)); + assert.strictEqual(runtime.getClaim?.("k"), "winner"); + assert.strictEqual(submitted.length, 0, "Detached set should not have submitted an op"); + }); + + it("GC data includes outbound routes from handles inside claim values", async () => { + const { context, submitted } = makeContext(); + const runtime = createRuntime(context); + const handle = new FakeHandle("/dataStoreA/channelB") as unknown as IFluidHandle; + const p = runtime.trySetClaim?.("k", { foo: handle }); + const op = await popClaim(submitted); + runtime.processMessages(makeClaimAck(op, true)); + assert.strictEqual(await p, "Success"); + const gcData = await runtime.getGCData(); + assert(gcData.gcNodes["/"] !== undefined); + assert( + gcData.gcNodes["/"].includes("/dataStoreA/channelB"), + `Expected outbound route from claim handle, got ${JSON.stringify(gcData.gcNodes["/"])}`, + ); + }); + + it("reconnect: a pending claim that loses the race resolves to AlreadyClaimed", async () => { + const { context, submitted } = makeContext(); + const runtime = createRuntime(context); + + // Start a claim attempt locally. + const pLocal = runtime.trySetClaim?.("k", "local"); + const localOp = await popClaim(submitted); + + // A remote winner is sequenced first. + runtime.processMessages(makeClaimAck({ key: "k", value: "remote" }, false)); + + // Resubmit the local op (e.g. reconnect). The op processor sees the + // key is already claimed and resolves the pending deferred to + // AlreadyClaimed. + runtime.reSubmit( + DataStoreMessageType.Claim, + localOp as unknown as Record, + undefined, + false, + ); + // The resubmit emits the same op back through submit; pop it and ack as local. + const resubmitted = await popClaim(submitted); + runtime.processMessages(makeClaimAck(resubmitted, true)); + + assert.strictEqual(await pLocal, "AlreadyClaimed"); + assert.strictEqual(runtime.getClaim?.("k"), "remote"); + }); + + it("feature flag off: trySetClaim throws a UsageError", async () => { + const { context } = makeContext(); + const runtime = createRuntime(context, { enableDataStoreClaims: false }); + await assert.rejects( + async () => runtime.trySetClaim?.("k", "v"), + (e: IErrorBase) => + e.errorType === ContainerErrorTypes.usageError && + e.message.includes("DataStore claims are not enabled"), + ); + }); + + it("rehydrates sequencedClaims from a base snapshot blob", async () => { + // Build a snapshot containing the .claims blob. + const blobId = "claimsBlobId"; + const blobContent = JSON.stringify({ + entries: [ + ["a", "valueA"], + [ + "b", + encodeHandleForSerialization( + new FakeHandle("/x/y") as unknown as IFluidHandleInternal, + ), + ], + ], + }); + const blobs = new Map(); + blobs.set(blobId, stringToBuffer(blobContent, "utf8")); + const baseSnapshot: ISnapshotTree = { + blobs: { [claimsBlobName]: blobId }, + trees: {}, + }; + const { context } = makeContext({ baseSnapshot, blobs }); + const runtime = createRuntime(context); + + // Force the load to complete by awaiting trySetClaim (which awaits the load). + // (Using a key that doesn't exist; we expect the op to be submitted, but we + // don't process it — we just want the load to drain.) + const setP = runtime.trySetClaim?.("c", "valueC"); + // Allow microtasks to run. + await Promise.resolve(); + await Promise.resolve(); + + assert.strictEqual(runtime.hasClaim?.("a"), true); + assert.strictEqual(runtime.getClaim?.("a"), "valueA"); + assert.strictEqual(runtime.hasClaim?.("b"), true); + const decoded = runtime.getClaim?.("b") as { absolutePath: string }; + assert.strictEqual(decoded.absolutePath, "/x/y"); + + // Don't leave the open setP unhandled - just await it later. We attach + // a no-op handler so any rejection is observed. + setP?.catch(() => undefined); + }); + + it("staging mode: trySetClaim throws", async () => { + const { context } = makeContext(); + (context.containerRuntime as unknown as { inStagingMode: boolean }).inStagingMode = true; + const runtime = createRuntime(context); + await assert.rejects( + async () => runtime.trySetClaim?.("k", "v"), + (e: IErrorBase) => + e.errorType === ContainerErrorTypes.usageError && e.message.includes("staging"), + ); + }); + + it("summary includes the .claims blob with sequenced entries", async () => { + const { context, submitted } = makeContext(); + const runtime = createRuntime(context); + const p = runtime.trySetClaim?.("k", "v"); + runtime.processMessages(makeClaimAck(await popClaim(submitted), true)); + await p; + + const summary = await runtime.summarize(true, false); + assert(summary.summary.type === SummaryType.Tree); + const blob = summary.summary.tree[claimsBlobName]; + assert(blob?.type === SummaryType.Blob); + const parsed = JSON.parse( + typeof blob.content === "string" + ? blob.content + : Buffer.from(blob.content).toString("utf8"), + ) as { entries: [string, unknown][] }; + assert.deepStrictEqual(parsed.entries, [["k", "v"]]); + }); + + it("dispose resolves outstanding pending claims as AlreadyClaimed", async () => { + const { context } = makeContext(); + const runtime = createRuntime(context); + const p = runtime.trySetClaim?.("k", "v"); + await flush(); + runtime.dispose(); + assert.strictEqual(await p, "AlreadyClaimed"); + }); + + // Reference imported types so they aren't flagged as unused. + const _typeRefs: LocalFluidDataStoreRuntimeMessage[] = []; + if (_typeRefs.length > 0) { + throw new Error("unreachable"); + } +}); diff --git a/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts b/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts index bc4a808530ff..64e8f68cbd88 100644 --- a/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts +++ b/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts @@ -88,6 +88,7 @@ declare type old_as_current_for_Enum_DataStoreMessageType = requireAssignableTo< * typeValidation.broken: * "Enum_DataStoreMessageType": {"backCompat": false} */ +// @ts-expect-error compatibility expected to be broken declare type current_as_old_for_Enum_DataStoreMessageType = requireAssignableTo, TypeOnly> /* From 522d997e68256f08148e72c3b16456ac72f4c4f6 Mon Sep 17 00:00:00 2001 From: Kian Thompson <102998837+kian-thompson@users.noreply.github.com> Date: Tue, 12 May 2026 00:51:01 +0000 Subject: [PATCH 3/5] Add trySetClaim/getClaim/hasClaim helpers to DataObject Forwards to the underlying IFluidDataStoreRuntime claims API. Throws a UsageError if the runtime does not implement claims (e.g., the enableDataStoreClaims policy was not enabled when the data store was created). Updates the DataObject class doc-comment with guidance on when to use a claim vs. writing to root. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../api-report/aqueduct.legacy.beta.api.md | 3 + .../aqueduct/src/data-objects/dataObject.ts | 56 +++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/packages/framework/aqueduct/api-report/aqueduct.legacy.beta.api.md b/packages/framework/aqueduct/api-report/aqueduct.legacy.beta.api.md index f1e238c0f96b..9723881be7dc 100644 --- a/packages/framework/aqueduct/api-report/aqueduct.legacy.beta.api.md +++ b/packages/framework/aqueduct/api-report/aqueduct.legacy.beta.api.md @@ -52,9 +52,12 @@ export interface ContainerRuntimeFactoryWithDefaultDataStoreProps { // @beta @legacy export abstract class DataObject extends PureDataObject { + protected getClaim(key: string): unknown; protected getUninitializedErrorString(item: string): string; + protected hasClaim(key: string): boolean; initializeInternal(existing: boolean): Promise; protected get root(): ISharedDirectory; + protected trySetClaim(key: string, value: unknown): Promise; } // @beta @legacy diff --git a/packages/framework/aqueduct/src/data-objects/dataObject.ts b/packages/framework/aqueduct/src/data-objects/dataObject.ts index 02b38b713ec6..d7d1f983d851 100644 --- a/packages/framework/aqueduct/src/data-objects/dataObject.ts +++ b/packages/framework/aqueduct/src/data-objects/dataObject.ts @@ -3,11 +3,13 @@ * Licensed under the MIT License. */ +import type { ClaimResult } from "@fluidframework/datastore-definitions/internal"; import { type ISharedDirectory, MapFactory, SharedDirectory, } from "@fluidframework/map/internal"; +import { UsageError } from "@fluidframework/telemetry-utils/internal"; import { PureDataObject } from "./pureDataObject.js"; import type { DataObjectTypes } from "./types.js"; @@ -20,6 +22,21 @@ import type { DataObjectTypes } from "./types.js"; * and registering channels with the runtime any new DDS that is set on the root * will automatically be registered. * + * @remarks + * In addition to the {@link DataObject.root | root directory}, `DataObject` + * also exposes a first-writer-wins **claims** API: + * {@link DataObject.trySetClaim}, {@link DataObject.getClaim}, and + * {@link DataObject.hasClaim}. Use a claim instead of `root.set` when you + * need to wire up a singleton entry (typically a handle to a child DDS or + * data store) and want first-writer-wins semantics — once a claim is + * sequenced it can never be overwritten by another client. By contrast, + * `root.set` (and other DDS writes) use last-writer-wins semantics, so two + * clients racing to populate the same key will silently overwrite each + * other. + * + * Claims require the underlying data store runtime to have the + * `enableDataStoreClaims` policy enabled. + * * @typeParam I - The optional input types used to strongly type the data object * @legacy * @beta @@ -42,6 +59,45 @@ export abstract class DataObject< return this.internalRoot; } + /** + * Attempts to set a first-writer-wins claim on this data store. The + * promise resolves to `"Success"` if this client won the race for the + * key, or `"AlreadyClaimed"` if another client (including a previous + * incarnation of the same client) won. See {@link DataObject} class + * remarks for when to use a claim vs. writing to + * {@link DataObject.root | root}. + * + * @param key - The claim key (non-empty string). + * @param value - The JSON-serializable value to claim. May contain + * {@link @fluidframework/core-interfaces#IFluidHandle} instances; these + * are encoded the same way as handles in summary blobs and contribute + * outbound routes to garbage collection. + * @returns A promise that resolves to `"Success"` or `"AlreadyClaimed"`. + */ + protected async trySetClaim(key: string, value: unknown): Promise { + if (this.runtime.trySetClaim === undefined) { + throw new UsageError( + "The data store runtime does not support claims. Enable the `enableDataStoreClaims` policy on the data store runtime to use this API.", + ); + } + return this.runtime.trySetClaim(key, value); + } + + /** + * Returns the value of a previously-claimed key, or `undefined` if the + * key has not been claimed. Embedded handles are decoded. + */ + protected getClaim(key: string): unknown { + return this.runtime.getClaim?.(key); + } + + /** + * Returns `true` if a claim has been sequenced for the given key. + */ + protected hasClaim(key: string): boolean { + return this.runtime.hasClaim?.(key) ?? false; + } + /** * Initializes internal objects and calls initialization overrides. * Caller is responsible for ensuring this is only invoked once. From 49c256cfd37a76fad65b5fc9f020729c91e2cab3 Mon Sep 17 00:00:00 2001 From: Kian Thompson <102998837+kian-thompson@users.noreply.github.com> Date: Tue, 12 May 2026 00:51:04 +0000 Subject: [PATCH 4/5] Add changeset for data store claims feature Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .changeset/claims-dataobject.md | 34 +++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 .changeset/claims-dataobject.md diff --git a/.changeset/claims-dataobject.md b/.changeset/claims-dataobject.md new file mode 100644 index 000000000000..cf7e530fa9d9 --- /dev/null +++ b/.changeset/claims-dataobject.md @@ -0,0 +1,34 @@ +--- +"@fluidframework/datastore-definitions": minor +"@fluidframework/datastore": minor +"@fluidframework/runtime-definitions": minor +"@fluidframework/aqueduct": minor +"__section": feature +--- +Add first-writer-wins claims to data store runtime and DataObject + +This adds a new "claim" primitive to `IFluidDataStoreRuntime` and `DataObject` +that lets a data store publish first-writer-wins key/value entries. A claim +is conceptually a small piece of immutable per-data-store state where the +first client to write a given key wins; concurrent writers from other clients +observe `"AlreadyClaimed"` and can branch their logic accordingly. + +New API surface (all `@legacy` `@beta`): + +- `ClaimResult = "Success" | "AlreadyClaimed"`. +- `IFluidDataStoreRuntime.trySetClaim(key, value)`, `getClaim(key)`, + `hasClaim(key)`, and `claims` (a read-only iterator). +- `IFluidDataStorePolicies.enableDataStoreClaims` opt-in flag (defaults to + off; set to `true` on the data store runtime to enable the API). +- `DataObject.trySetClaim`, `getClaim`, `hasClaim` convenience helpers that + forward to the runtime. + +Claim values may contain `IFluidHandle` instances; these are encoded the same +way as handles in summary blobs and contribute outbound routes to garbage +collection. Claims are persisted via a `.claims` summary blob on the data +store and rehydrated on subsequent loads. + +Use a claim (rather than writing to `DataObject.root`) when you specifically +need first-writer-wins semantics — for example, when multiple clients race +to designate themselves as the owner of a particular role within the data +store and only one should succeed. From 746793f08da8a092c32a499dd4703b7c4c75ae8d Mon Sep 17 00:00:00 2001 From: Kian Thompson <102998837+kian-thompson@users.noreply.github.com> Date: Mon, 18 May 2026 18:24:07 +0000 Subject: [PATCH 5/5] Add offline-aware trySetClaim returning IClaimAttempt MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change trySetClaim from async Promise to a synchronous method returning a discriminated-union IClaimAttempt: | { status: 'Success' | 'AlreadyClaimed' } | { status: 'Pending'; result: Promise } Terminal cases (already-loaded + known outcome, or detached) return synchronously with no promise. Cases where the outcome is not yet known locally (claims still loading, or op submitted but not yet sequenced — including attached+disconnected) return Pending with a result promise that resolves when the outcome is determined. Dispose now rejects pending claim deferreds with UsageError instead of misleadingly resolving them to AlreadyClaimed. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .changeset/claims-dataobject.md | 19 ++- .../api-report/aqueduct.legacy.beta.api.md | 2 +- .../aqueduct/src/data-objects/dataObject.ts | 30 ++-- .../datastore-definitions.legacy.alpha.api.md | 10 +- .../datastore-definitions.legacy.beta.api.md | 10 +- .../src/dataStoreRuntime.ts | 58 +++++++- .../datastore-definitions/src/index.ts | 1 + .../api-report/datastore.legacy.beta.api.md | 2 +- .../runtime/datastore/src/dataStoreRuntime.ts | 87 +++++++++--- .../runtime/datastore/src/test/claims.spec.ts | 133 +++++++++++++----- 10 files changed, 273 insertions(+), 79 deletions(-) diff --git a/.changeset/claims-dataobject.md b/.changeset/claims-dataobject.md index cf7e530fa9d9..8ee6fe3a6af0 100644 --- a/.changeset/claims-dataobject.md +++ b/.changeset/claims-dataobject.md @@ -15,9 +15,22 @@ observe `"AlreadyClaimed"` and can branch their logic accordingly. New API surface (all `@legacy` `@beta`): -- `ClaimResult = "Success" | "AlreadyClaimed"`. -- `IFluidDataStoreRuntime.trySetClaim(key, value)`, `getClaim(key)`, - `hasClaim(key)`, and `claims` (a read-only iterator). +- `ClaimResult = "Success" | "AlreadyClaimed"` — terminal sequenced + outcome of a claim attempt. +- `IClaimAttempt` — the synchronous return shape of `trySetClaim`. It is + a discriminated union on `status`: + - `{ status: "Success" | "AlreadyClaimed" }` when the outcome is + already known locally (detached, or the key was previously + sequenced). There is nothing to await. + - `{ status: "Pending"; result: Promise }` when the + outcome can't be determined locally yet — for example, the client + is attached but disconnected, the op has been submitted but not + yet sequenced, or claim state is still being hydrated from the + base snapshot. The `result` promise resolves to the final + sequenced `ClaimResult`, or rejects if the runtime is disposed + before the attempt is sequenced. +- `IFluidDataStoreRuntime.trySetClaim(key, value): IClaimAttempt`, + `getClaim(key)`, `hasClaim(key)`, and `claims` (a read-only iterator). - `IFluidDataStorePolicies.enableDataStoreClaims` opt-in flag (defaults to off; set to `true` on the data store runtime to enable the API). - `DataObject.trySetClaim`, `getClaim`, `hasClaim` convenience helpers that diff --git a/packages/framework/aqueduct/api-report/aqueduct.legacy.beta.api.md b/packages/framework/aqueduct/api-report/aqueduct.legacy.beta.api.md index 9723881be7dc..5ee98749011e 100644 --- a/packages/framework/aqueduct/api-report/aqueduct.legacy.beta.api.md +++ b/packages/framework/aqueduct/api-report/aqueduct.legacy.beta.api.md @@ -57,7 +57,7 @@ export abstract class DataObject ex protected hasClaim(key: string): boolean; initializeInternal(existing: boolean): Promise; protected get root(): ISharedDirectory; - protected trySetClaim(key: string, value: unknown): Promise; + protected trySetClaim(key: string, value: unknown): IClaimAttempt; } // @beta @legacy diff --git a/packages/framework/aqueduct/src/data-objects/dataObject.ts b/packages/framework/aqueduct/src/data-objects/dataObject.ts index d7d1f983d851..1e8946b54b8b 100644 --- a/packages/framework/aqueduct/src/data-objects/dataObject.ts +++ b/packages/framework/aqueduct/src/data-objects/dataObject.ts @@ -3,7 +3,10 @@ * Licensed under the MIT License. */ -import type { ClaimResult } from "@fluidframework/datastore-definitions/internal"; +import type { + ClaimResult, + IClaimAttempt, +} from "@fluidframework/datastore-definitions/internal"; import { type ISharedDirectory, MapFactory, @@ -60,21 +63,30 @@ export abstract class DataObject< } /** - * Attempts to set a first-writer-wins claim on this data store. The - * promise resolves to `"Success"` if this client won the race for the - * key, or `"AlreadyClaimed"` if another client (including a previous - * incarnation of the same client) won. See {@link DataObject} class - * remarks for when to use a claim vs. writing to - * {@link DataObject.root | root}. + * Attempts to set a first-writer-wins claim on this data store. + * + * Returns synchronously with an {@link IClaimAttempt} describing the + * immediate state. Its {@link IClaimAttempt.status} is `"Success"` or + * `"AlreadyClaimed"` when the outcome is already known locally + * (detached, or the key was previously sequenced), and `"Pending"` + * otherwise (e.g. while disconnected, while the op is in flight, or + * while claim state is still being loaded from the base snapshot). + * Callers can branch on the status immediately for race / fallback + * logic and await {@link IClaimAttempt.result} to observe the final + * sequenced outcome. + * + * See {@link DataObject} class remarks for when to use a claim vs. + * writing to {@link DataObject.root | root}. * * @param key - The claim key (non-empty string). * @param value - The JSON-serializable value to claim. May contain * {@link @fluidframework/core-interfaces#IFluidHandle} instances; these * are encoded the same way as handles in summary blobs and contribute * outbound routes to garbage collection. - * @returns A promise that resolves to `"Success"` or `"AlreadyClaimed"`. + * @returns An {@link IClaimAttempt} carrying the immediate status and + * a promise for the eventual sequenced {@link ClaimResult}. */ - protected async trySetClaim(key: string, value: unknown): Promise { + protected trySetClaim(key: string, value: unknown): IClaimAttempt { if (this.runtime.trySetClaim === undefined) { throw new UsageError( "The data store runtime does not support claims. Enable the `enableDataStoreClaims` policy on the data store runtime to use this API.", diff --git a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md index 91ebcd77b39e..471b2f79b196 100644 --- a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md +++ b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md @@ -50,6 +50,14 @@ export interface IChannelStorageService { readBlob(path: string): Promise; } +// @beta @legacy +export type IClaimAttempt = { + readonly status: "Success" | "AlreadyClaimed"; +} | { + readonly status: "Pending"; + readonly result: Promise; +}; + // @beta @legacy export interface IDeltaConnection { attach(handler: IDeltaHandler): void; @@ -110,7 +118,7 @@ export interface IFluidDataStoreRuntime extends IEventProvider void; - trySetClaim?(key: string, value: unknown): Promise; + trySetClaim?(key: string, value: unknown): IClaimAttempt; uploadBlob(blob: ArrayBufferLike, signal?: AbortSignal): Promise>; waitAttached(): Promise; } diff --git a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md index f76d27db3297..44615acecb55 100644 --- a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md +++ b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md @@ -50,6 +50,14 @@ export interface IChannelStorageService { readBlob(path: string): Promise; } +// @beta @legacy +export type IClaimAttempt = { + readonly status: "Success" | "AlreadyClaimed"; +} | { + readonly status: "Pending"; + readonly result: Promise; +}; + // @beta @legacy export interface IDeltaConnection { attach(handler: IDeltaHandler): void; @@ -110,7 +118,7 @@ export interface IFluidDataStoreRuntime extends IEventProvider void; - trySetClaim?(key: string, value: unknown): Promise; + trySetClaim?(key: string, value: unknown): IClaimAttempt; uploadBlob(blob: ArrayBufferLike, signal?: AbortSignal): Promise>; waitAttached(): Promise; } diff --git a/packages/runtime/datastore-definitions/src/dataStoreRuntime.ts b/packages/runtime/datastore-definitions/src/dataStoreRuntime.ts index b99cda72f109..1e090a5e706b 100644 --- a/packages/runtime/datastore-definitions/src/dataStoreRuntime.ts +++ b/packages/runtime/datastore-definitions/src/dataStoreRuntime.ts @@ -51,8 +51,7 @@ export type IDeltaManagerErased = ErasedType<"@fluidframework/container-definitions.IDeltaManager">; /** - * Result of attempting to set a claim on a data store via - * {@link IFluidDataStoreRuntime.trySetClaim}. + * Final, sequenced outcome of a claim attempt. * * - `"Success"` - this client owns the claim for the given key. * - `"AlreadyClaimed"` - another client has already claimed the key; the @@ -62,6 +61,41 @@ export type IDeltaManagerErased = */ export type ClaimResult = "Success" | "AlreadyClaimed"; +/** + * The synchronous handle returned by + * {@link IFluidDataStoreRuntime.trySetClaim}. + * + * The shape is a discriminated union on {@link IClaimAttempt.status}: + * + * - When `status` is `"Success"` or `"AlreadyClaimed"`, the outcome is + * already known locally (detached, or the key was previously + * sequenced); no further work is required. + * - When `status` is `"Pending"`, the outcome cannot be determined + * locally yet — for example, the client is attached but disconnected, + * the op has been submitted but not yet sequenced, or claim state is + * still being hydrated from the base snapshot. In that case, + * {@link IClaimAttempt.result} resolves to the eventual sequenced + * {@link ClaimResult}, or rejects if the runtime is disposed before the + * attempt is sequenced. + * + * Callers can branch on `status` synchronously for race / fallback + * logic without ever creating a promise on the terminal paths. + * @legacy @beta + */ +export type IClaimAttempt = + | { + readonly status: "Success" | "AlreadyClaimed"; + } + | { + readonly status: "Pending"; + /** + * Resolves to the final sequenced {@link ClaimResult} once the + * op (this client's or another's) is sequenced. Rejects if the + * runtime is disposed before the attempt is sequenced. + */ + readonly result: Promise; + }; + /** * Represents the runtime for the data store. Contains helper functions/state of the data store. * @sealed @@ -232,18 +266,28 @@ export interface IFluidDataStoreRuntime * summary blobs and contribute to garbage-collection routes from this data * store. * - * The returned promise resolves to `"Success"` for the client whose op was - * sequenced first for the key, and `"AlreadyClaimed"` for every other - * client. A repeated call from the winning client returns `"Success"`. + * Returns synchronously with an {@link IClaimAttempt} describing the + * outcome. When the outcome is known locally — the key was already + * sequenced, or the data store is detached — `status` is `"Success"` + * or `"AlreadyClaimed"` and there is nothing to await. Otherwise + * `status` is `"Pending"` and {@link IClaimAttempt.result} resolves + * to the eventual sequenced {@link ClaimResult} (`"Success"` for the + * client whose op is sequenced first for the key, and + * `"AlreadyClaimed"` for every other client). + * + * Local ops are automatically resubmitted by the runtime across + * reconnects, so the result promise will eventually resolve once the + * client reconnects — unless the runtime is disposed first, in which + * case the result promise rejects. * * Optional. Implementations that do not support claims will not provide * this method. * * @param key - The claim key. * @param value - The claim value (JSON-serializable; may include handles). - * @returns A promise that resolves to the {@link ClaimResult}. + * @returns An {@link IClaimAttempt} discriminated on `status`. */ - trySetClaim?(key: string, value: unknown): Promise; + trySetClaim?(key: string, value: unknown): IClaimAttempt; /** * Returns the value of a previously-claimed key, with embedded handles diff --git a/packages/runtime/datastore-definitions/src/index.ts b/packages/runtime/datastore-definitions/src/index.ts index 3bf22b58c7dc..01b6eb3e1362 100644 --- a/packages/runtime/datastore-definitions/src/index.ts +++ b/packages/runtime/datastore-definitions/src/index.ts @@ -25,6 +25,7 @@ export type { IFluidDataStoreRuntimeInternalConfig, IDeltaManagerErased, ClaimResult, + IClaimAttempt, } from "./dataStoreRuntime.js"; export type { Jsonable, diff --git a/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md b/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md index f657b9d5b94f..f44a2870e7d4 100644 --- a/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md +++ b/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md @@ -104,7 +104,7 @@ export class FluidDataStoreRuntime extends TypedEventEmitter; - trySetClaim(key: string, value: unknown): Promise; + trySetClaim(key: string, value: unknown): IClaimAttempt; updateUsedRoutes(usedRoutes: string[]): void; // (undocumented) uploadBlob(blob: ArrayBufferLike, signal?: AbortSignal): Promise>; diff --git a/packages/runtime/datastore/src/dataStoreRuntime.ts b/packages/runtime/datastore/src/dataStoreRuntime.ts index 0ddc5eb6f5d2..b8f9c80e0c63 100644 --- a/packages/runtime/datastore/src/dataStoreRuntime.ts +++ b/packages/runtime/datastore/src/dataStoreRuntime.ts @@ -34,6 +34,7 @@ import type { IFluidDataStoreRuntimeEvents, IDeltaManagerErased, ClaimResult, + IClaimAttempt, } from "@fluidframework/datastore-definitions/internal"; import { type IClientDetails, @@ -731,12 +732,16 @@ export class FluidDataStoreRuntime } this._disposed = true; - // Resolve any outstanding claim attempts as "AlreadyClaimed" so callers - // don't hang on a disposed runtime. (We choose resolution over - // rejection to make caller error handling simpler — the caller can - // distinguish via a subsequent `hasClaim` check if needed.) + // Reject any outstanding claim attempts so callers don't hang on a + // disposed runtime. The local op was never sequenced, so resolving + // as `"AlreadyClaimed"` would be misleading. Rejection lets the + // caller distinguish "this attempt did not complete" from a + // sequenced loss. + const disposeError = new UsageError( + "Data store runtime was disposed before the claim was sequenced.", + ); for (const [, deferred] of this.pendingClaims) { - deferred.resolve("AlreadyClaimed"); + deferred.reject(disposeError); } this.pendingClaims.clear(); @@ -773,12 +778,9 @@ export class FluidDataStoreRuntime } } - private async ensureClaimsLoaded(): Promise { - if (this.claimsLoaded) { - return; - } - await this.claimsLoadP; - } + // (Previously `ensureClaimsLoaded` was used by `trySetClaim`; the + // rewrite consults `claimsLoaded` / `claimsLoadP` directly to enable + // synchronous status reporting.) /** * Apply a sequenced Claim op. First-writer-wins: @@ -808,7 +810,7 @@ export class FluidDataStoreRuntime /** * {@inheritDoc @fluidframework/datastore-definitions#IFluidDataStoreRuntime.trySetClaim} */ - public async trySetClaim(key: string, value: unknown): Promise { + public trySetClaim(key: string, value: unknown): IClaimAttempt { this.verifyNotClosed(); if (!this.claimsEnabled) { throw new UsageError( @@ -824,38 +826,77 @@ export class FluidDataStoreRuntime throw new UsageError("Claim key must be a non-empty string."); } - await this.ensureClaimsLoaded(); - - // Encode handles in `value` and bind them to this runtime's graph. + // Encode handles eagerly so handle binding happens synchronously + // (matching the previous behavior). The encoded form is what we + // store in `sequencedClaims` and what we send on the wire. const encoded = encodeHandlesInClaimValue(value, (h) => this.bind(h)); + // Fast path: claim state is already hydrated from snapshot, so we + // can determine the immediate status synchronously. + if (this.claimsLoaded) { + return this.trySetClaimLoaded(key, encoded); + } + + // Slow path: claim state is still loading. We cannot determine the + // immediate status, so report `"Pending"` and chain the actual + // attempt off the load completion. + const result = (async (): Promise => { + await this.claimsLoadP; + if (this._disposed) { + throw new UsageError( + "Data store runtime was disposed before the claim was sequenced.", + ); + } + if (!this.claimsLoaded) { + // Load failed (the error was logged from the load wrapper). + // We cannot safely declare a winner, so surface this to the + // caller as a rejection. + throw new UsageError( + "Claim state failed to load from the base snapshot; cannot determine claim outcome.", + ); + } + const attempt = this.trySetClaimLoaded(key, encoded); + return attempt.status === "Pending" ? attempt.result : attempt.status; + })(); + return { status: "Pending", result }; + } + + /** + * Implements {@link trySetClaim} for the case where claim state has + * already been hydrated from the base snapshot. `encodedValue` is the + * caller's value with any embedded handles already encoded and bound. + */ + private trySetClaimLoaded(key: string, encodedValue: unknown): IClaimAttempt { // Already-sequenced winner / loser disambiguation. if (this.sequencedClaims.has(key)) { - return this.wonClaims.has(key) ? "Success" : "AlreadyClaimed"; + return { status: this.wonClaims.has(key) ? "Success" : "AlreadyClaimed" }; } // Detached: write directly without an op (will be persisted by // getAttachSummary). if (!this.isAttached) { - this.sequencedClaims.set(key, encoded); + this.sequencedClaims.set(key, encodedValue); this.wonClaims.add(key); - return "Success"; + return { status: "Success" }; } - // If a local attempt for this key is already in flight, await its - // outcome. + // If a local attempt for this key is already in flight, return its + // existing deferred so all callers see the same outcome. const existing = this.pendingClaims.get(key); if (existing !== undefined) { - return existing.promise; + return { status: "Pending", result: existing.promise }; } const deferred = new Deferred(); this.pendingClaims.set(key, deferred); this.submit({ type: DataStoreMessageType.Claim, - content: { key, value: encoded }, + content: { key, value: encodedValue }, }); - return deferred.promise; + // The op has been submitted to the outbox but is not yet sequenced + // (and we may not even be connected). Status is `"Pending"` until + // the op (or a competing remote op) is sequenced. + return { status: "Pending", result: deferred.promise }; } /** diff --git a/packages/runtime/datastore/src/test/claims.spec.ts b/packages/runtime/datastore/src/test/claims.spec.ts index 40af1ca51106..8bc894720962 100644 --- a/packages/runtime/datastore/src/test/claims.spec.ts +++ b/packages/runtime/datastore/src/test/claims.spec.ts @@ -11,6 +11,10 @@ import { ContainerErrorTypes } from "@fluidframework/container-definitions/inter import type { IErrorBase, IFluidHandle } from "@fluidframework/core-interfaces"; import { fluidHandleSymbol } from "@fluidframework/core-interfaces"; import type { IFluidHandleInternal } from "@fluidframework/core-interfaces/internal"; +import type { + ClaimResult, + IClaimAttempt, +} from "@fluidframework/datastore-definitions/internal"; import { SummaryType } from "@fluidframework/driver-definitions"; import type { ISnapshotTree } from "@fluidframework/driver-definitions/internal"; import type { @@ -145,17 +149,29 @@ async function popClaim(submitted: SubmittedOp[]): Promise { return op.content as IClaimMessage; } +/** + * Resolve a claim attempt to its terminal {@link ClaimResult} regardless of + * whether it landed on a synchronous (Success/AlreadyClaimed) or asynchronous + * (Pending) branch of the discriminated union. + */ +async function awaitResult(attempt: IClaimAttempt | undefined): Promise { + assert(attempt !== undefined, "Expected a claim attempt"); + return attempt.status === "Pending" ? attempt.result : attempt.status; +} + describe("FluidDataStoreRuntime claims", () => { it("single-client claim resolves to Success and is exposed via getClaim/hasClaim/claims", async () => { const { context, submitted } = makeContext(); const runtime = createRuntime(context); - const promise = runtime.trySetClaim?.("k", "v"); - assert(promise !== undefined); + const attempt = runtime.trySetClaim?.("k", "v"); + assert(attempt !== undefined); + // Attached + op not yet sequenced -> immediate status is "Pending". + assert(attempt.status === "Pending"); // The op was submitted; round-trip it back as a local ack. const claim = await popClaim(submitted); runtime.processMessages(makeClaimAck(claim, true)); - assert.strictEqual(await promise, "Success"); + assert.strictEqual(await attempt.result, "Success"); assert.strictEqual(runtime.hasClaim?.("k"), true); assert.strictEqual(runtime.getClaim?.("k"), "v"); assert.deepStrictEqual([...(runtime.claims?.entries() ?? [])], [["k", "v"]]); @@ -172,6 +188,8 @@ describe("FluidDataStoreRuntime claims", () => { const pA = runtimeA.trySetClaim?.("k", "A"); const pB = runtimeB.trySetClaim?.("k", "B"); assert(pA !== undefined && pB !== undefined); + assert.strictEqual(pA.status, "Pending"); + assert.strictEqual(pB.status, "Pending"); const aOp = await popClaim(subA); const bOp = await popClaim(subB); @@ -183,8 +201,8 @@ describe("FluidDataStoreRuntime claims", () => { runtimeB.processMessages(makeClaimAck(aOp, false)); runtimeB.processMessages(makeClaimAck(bOp, true)); - assert.strictEqual(await pA, "Success"); - assert.strictEqual(await pB, "AlreadyClaimed"); + assert.strictEqual(await awaitResult(pA), "Success"); + assert.strictEqual(await awaitResult(pB), "AlreadyClaimed"); assert.strictEqual(runtimeA.getClaim?.("k"), "A"); assert.strictEqual(runtimeB.getClaim?.("k"), "A"); }); @@ -196,11 +214,12 @@ describe("FluidDataStoreRuntime claims", () => { const p1 = runtime.trySetClaim?.("k", "v1"); const op1 = await popClaim(submitted); runtime.processMessages(makeClaimAck(op1, true)); - assert.strictEqual(await p1, "Success"); + assert.strictEqual(await awaitResult(p1), "Success"); - // Repeat with different value -> still Success, original value retained. + // Repeat with different value -> still Success synchronously; the + // terminal-status branch carries no promise to await. const p2 = runtime.trySetClaim?.("k", "v2"); - assert.strictEqual(await p2, "Success"); + assert.strictEqual(p2?.status, "Success"); assert.strictEqual(runtime.getClaim?.("k"), "v1"); }); @@ -218,17 +237,20 @@ describe("FluidDataStoreRuntime claims", () => { runtimeA.processMessages(makeClaimAck(bOp, false)); runtimeB.processMessages(makeClaimAck(aOp, false)); runtimeB.processMessages(makeClaimAck(bOp, true)); - assert.strictEqual(await pA, "Success"); - assert.strictEqual(await pB, "AlreadyClaimed"); + assert.strictEqual(await awaitResult(pA), "Success"); + assert.strictEqual(await awaitResult(pB), "AlreadyClaimed"); - // Loser tries again with the same (key, value): still AlreadyClaimed. - assert.strictEqual(await runtimeB.trySetClaim?.("k", "shared"), "AlreadyClaimed"); + // Loser tries again with the same (key, value): still AlreadyClaimed + // synchronously — no op submitted, no promise to await. + const retry = runtimeB.trySetClaim?.("k", "shared"); + assert.strictEqual(retry?.status, "AlreadyClaimed"); }); - it("detached set resolves Success synchronously and is persisted via getAttachSummary", async () => { + it("detached set resolves Success synchronously and is persisted via getAttachSummary", () => { const { context } = makeContext({ attachState: AttachState.Detached }); const runtime = createRuntime(context); - assert.strictEqual(await runtime.trySetClaim?.("k", "v"), "Success"); + const attempt = runtime.trySetClaim?.("k", "v"); + assert.strictEqual(attempt?.status, "Success"); assert.strictEqual(runtime.hasClaim?.("k"), true); // Make the runtime locally visible so getAttachSummary works. @@ -242,7 +264,7 @@ describe("FluidDataStoreRuntime claims", () => { it("after detached set + attach, a remote op for the same key resolves AlreadyClaimed", async () => { const { context, submitted } = makeContext({ attachState: AttachState.Detached }); const runtime = createRuntime(context); - assert.strictEqual(await runtime.trySetClaim?.("k", "winner"), "Success"); + assert.strictEqual(runtime.trySetClaim?.("k", "winner")?.status, "Success"); // Simulate becoming attached. runtime.setAttachState(AttachState.Attaching); @@ -262,7 +284,7 @@ describe("FluidDataStoreRuntime claims", () => { const p = runtime.trySetClaim?.("k", { foo: handle }); const op = await popClaim(submitted); runtime.processMessages(makeClaimAck(op, true)); - assert.strictEqual(await p, "Success"); + assert.strictEqual(await awaitResult(p), "Success"); const gcData = await runtime.getGCData(); assert(gcData.gcNodes["/"] !== undefined); assert( @@ -277,6 +299,7 @@ describe("FluidDataStoreRuntime claims", () => { // Start a claim attempt locally. const pLocal = runtime.trySetClaim?.("k", "local"); + assert.strictEqual(pLocal?.status, "Pending"); const localOp = await popClaim(submitted); // A remote winner is sequenced first. @@ -295,15 +318,15 @@ describe("FluidDataStoreRuntime claims", () => { const resubmitted = await popClaim(submitted); runtime.processMessages(makeClaimAck(resubmitted, true)); - assert.strictEqual(await pLocal, "AlreadyClaimed"); + assert.strictEqual(await awaitResult(pLocal), "AlreadyClaimed"); assert.strictEqual(runtime.getClaim?.("k"), "remote"); }); - it("feature flag off: trySetClaim throws a UsageError", async () => { + it("feature flag off: trySetClaim throws a UsageError", () => { const { context } = makeContext(); const runtime = createRuntime(context, { enableDataStoreClaims: false }); - await assert.rejects( - async () => runtime.trySetClaim?.("k", "v"), + assert.throws( + () => runtime.trySetClaim?.("k", "v"), (e: IErrorBase) => e.errorType === ContainerErrorTypes.usageError && e.message.includes("DataStore claims are not enabled"), @@ -333,11 +356,11 @@ describe("FluidDataStoreRuntime claims", () => { const { context } = makeContext({ baseSnapshot, blobs }); const runtime = createRuntime(context); - // Force the load to complete by awaiting trySetClaim (which awaits the load). - // (Using a key that doesn't exist; we expect the op to be submitted, but we - // don't process it — we just want the load to drain.) + // Claim state hasn't been loaded yet -> immediate status is "Pending" + // even though we'd otherwise be a fast-path "AlreadyClaimed" on key "a". const setP = runtime.trySetClaim?.("c", "valueC"); - // Allow microtasks to run. + assert.strictEqual(setP?.status, "Pending"); + // Allow microtasks to run so the load completes. await Promise.resolve(); await Promise.resolve(); @@ -347,17 +370,20 @@ describe("FluidDataStoreRuntime claims", () => { const decoded = runtime.getClaim?.("b") as { absolutePath: string }; assert.strictEqual(decoded.absolutePath, "/x/y"); - // Don't leave the open setP unhandled - just await it later. We attach - // a no-op handler so any rejection is observed. - setP?.catch(() => undefined); + // Don't leave the open setP unhandled - just attach a no-op handler so + // any rejection is observed. setP is "Pending" while claim state is + // still loading. + if (setP?.status === "Pending") { + setP.result.catch(() => undefined); + } }); - it("staging mode: trySetClaim throws", async () => { + it("staging mode: trySetClaim throws", () => { const { context } = makeContext(); (context.containerRuntime as unknown as { inStagingMode: boolean }).inStagingMode = true; const runtime = createRuntime(context); - await assert.rejects( - async () => runtime.trySetClaim?.("k", "v"), + assert.throws( + () => runtime.trySetClaim?.("k", "v"), (e: IErrorBase) => e.errorType === ContainerErrorTypes.usageError && e.message.includes("staging"), ); @@ -368,7 +394,7 @@ describe("FluidDataStoreRuntime claims", () => { const runtime = createRuntime(context); const p = runtime.trySetClaim?.("k", "v"); runtime.processMessages(makeClaimAck(await popClaim(submitted), true)); - await p; + await awaitResult(p); const summary = await runtime.summarize(true, false); assert(summary.summary.type === SummaryType.Tree); @@ -382,13 +408,54 @@ describe("FluidDataStoreRuntime claims", () => { assert.deepStrictEqual(parsed.entries, [["k", "v"]]); }); - it("dispose resolves outstanding pending claims as AlreadyClaimed", async () => { + it("dispose rejects outstanding pending claim result promises", async () => { const { context } = makeContext(); const runtime = createRuntime(context); const p = runtime.trySetClaim?.("k", "v"); + assert(p !== undefined); + assert(p.status === "Pending"); await flush(); runtime.dispose(); - assert.strictEqual(await p, "AlreadyClaimed"); + await assert.rejects( + async () => p.result, + (e: IErrorBase) => + e.errorType === ContainerErrorTypes.usageError && e.message.includes("disposed"), + ); + }); + + it("offline: trySetClaim returns Pending synchronously and the result promise stays unresolved until the op is sequenced", async () => { + // We can't directly toggle "connected" on the mock context, but the + // shape of the API guarantees that for any attached client whose op + // has not yet been sequenced (the disconnected case included), the + // immediate status is "Pending" and the consumer can immediately + // race or fall back. + const { context, submitted } = makeContext(); + const runtime = createRuntime(context); + + const attempt = runtime.trySetClaim?.("k", "v"); + assert(attempt !== undefined); + assert(attempt.status === "Pending"); + + // Until we round-trip the op, the result promise is unresolved. + // Pre-attach a state-tracking listener so we can verify it isn't + // resolved prematurely. + let resolved = false; + attempt.result.then( + () => { + resolved = true; + }, + () => { + resolved = true; + }, + ); + await flush(); + assert.strictEqual(resolved, false, "result must not resolve before the op is sequenced"); + + // Sequence the op (as if we'd just reconnected and the resubmitter + // drained the outbox). + const claim = await popClaim(submitted); + runtime.processMessages(makeClaimAck(claim, true)); + assert.strictEqual(await attempt.result, "Success"); }); // Reference imported types so they aren't flagged as unused.