diff --git a/.changeset/claims-dataobject.md b/.changeset/claims-dataobject.md new file mode 100644 index 000000000000..8ee6fe3a6af0 --- /dev/null +++ b/.changeset/claims-dataobject.md @@ -0,0 +1,47 @@ +--- +"@fluidframework/datastore-definitions": minor +"@fluidframework/datastore": minor +"@fluidframework/runtime-definitions": minor +"@fluidframework/aqueduct": minor +"__section": feature +--- +Add first-writer-wins claims to data store runtime and DataObject + +This adds a new "claim" primitive to `IFluidDataStoreRuntime` and `DataObject` +that lets a data store publish first-writer-wins key/value entries. A claim +is conceptually a small piece of immutable per-data-store state where the +first client to write a given key wins; concurrent writers from other clients +observe `"AlreadyClaimed"` and can branch their logic accordingly. + +New API surface (all `@legacy` `@beta`): + +- `ClaimResult = "Success" | "AlreadyClaimed"` — terminal sequenced + outcome of a claim attempt. +- `IClaimAttempt` — the synchronous return shape of `trySetClaim`. It is + a discriminated union on `status`: + - `{ status: "Success" | "AlreadyClaimed" }` when the outcome is + already known locally (detached, or the key was previously + sequenced). There is nothing to await. + - `{ status: "Pending"; result: Promise }` when the + outcome can't be determined locally yet — for example, the client + is attached but disconnected, the op has been submitted but not + yet sequenced, or claim state is still being hydrated from the + base snapshot. The `result` promise resolves to the final + sequenced `ClaimResult`, or rejects if the runtime is disposed + before the attempt is sequenced. +- `IFluidDataStoreRuntime.trySetClaim(key, value): IClaimAttempt`, + `getClaim(key)`, `hasClaim(key)`, and `claims` (a read-only iterator). +- `IFluidDataStorePolicies.enableDataStoreClaims` opt-in flag (defaults to + off; set to `true` on the data store runtime to enable the API). +- `DataObject.trySetClaim`, `getClaim`, `hasClaim` convenience helpers that + forward to the runtime. + +Claim values may contain `IFluidHandle` instances; these are encoded the same +way as handles in summary blobs and contribute outbound routes to garbage +collection. Claims are persisted via a `.claims` summary blob on the data +store and rehydrated on subsequent loads. + +Use a claim (rather than writing to `DataObject.root`) when you specifically +need first-writer-wins semantics — for example, when multiple clients race +to designate themselves as the owner of a particular role within the data +store and only one should succeed. diff --git a/packages/framework/aqueduct/api-report/aqueduct.legacy.beta.api.md b/packages/framework/aqueduct/api-report/aqueduct.legacy.beta.api.md index f1e238c0f96b..5ee98749011e 100644 --- a/packages/framework/aqueduct/api-report/aqueduct.legacy.beta.api.md +++ b/packages/framework/aqueduct/api-report/aqueduct.legacy.beta.api.md @@ -52,9 +52,12 @@ export interface ContainerRuntimeFactoryWithDefaultDataStoreProps { // @beta @legacy export abstract class DataObject extends PureDataObject { + protected getClaim(key: string): unknown; protected getUninitializedErrorString(item: string): string; + protected hasClaim(key: string): boolean; initializeInternal(existing: boolean): Promise; protected get root(): ISharedDirectory; + protected trySetClaim(key: string, value: unknown): IClaimAttempt; } // @beta @legacy diff --git a/packages/framework/aqueduct/src/data-objects/dataObject.ts b/packages/framework/aqueduct/src/data-objects/dataObject.ts index 02b38b713ec6..1e8946b54b8b 100644 --- a/packages/framework/aqueduct/src/data-objects/dataObject.ts +++ b/packages/framework/aqueduct/src/data-objects/dataObject.ts @@ -3,11 +3,16 @@ * Licensed under the MIT License. */ +import type { + ClaimResult, + IClaimAttempt, +} from "@fluidframework/datastore-definitions/internal"; import { type ISharedDirectory, MapFactory, SharedDirectory, } from "@fluidframework/map/internal"; +import { UsageError } from "@fluidframework/telemetry-utils/internal"; import { PureDataObject } from "./pureDataObject.js"; import type { DataObjectTypes } from "./types.js"; @@ -20,6 +25,21 @@ import type { DataObjectTypes } from "./types.js"; * and registering channels with the runtime any new DDS that is set on the root * will automatically be registered. * + * @remarks + * In addition to the {@link DataObject.root | root directory}, `DataObject` + * also exposes a first-writer-wins **claims** API: + * {@link DataObject.trySetClaim}, {@link DataObject.getClaim}, and + * {@link DataObject.hasClaim}. Use a claim instead of `root.set` when you + * need to wire up a singleton entry (typically a handle to a child DDS or + * data store) and want first-writer-wins semantics — once a claim is + * sequenced it can never be overwritten by another client. By contrast, + * `root.set` (and other DDS writes) use last-writer-wins semantics, so two + * clients racing to populate the same key will silently overwrite each + * other. + * + * Claims require the underlying data store runtime to have the + * `enableDataStoreClaims` policy enabled. + * * @typeParam I - The optional input types used to strongly type the data object * @legacy * @beta @@ -42,6 +62,54 @@ export abstract class DataObject< return this.internalRoot; } + /** + * Attempts to set a first-writer-wins claim on this data store. + * + * Returns synchronously with an {@link IClaimAttempt} describing the + * immediate state. Its {@link IClaimAttempt.status} is `"Success"` or + * `"AlreadyClaimed"` when the outcome is already known locally + * (detached, or the key was previously sequenced), and `"Pending"` + * otherwise (e.g. while disconnected, while the op is in flight, or + * while claim state is still being loaded from the base snapshot). + * Callers can branch on the status immediately for race / fallback + * logic and await {@link IClaimAttempt.result} to observe the final + * sequenced outcome. + * + * See {@link DataObject} class remarks for when to use a claim vs. + * writing to {@link DataObject.root | root}. + * + * @param key - The claim key (non-empty string). + * @param value - The JSON-serializable value to claim. May contain + * {@link @fluidframework/core-interfaces#IFluidHandle} instances; these + * are encoded the same way as handles in summary blobs and contribute + * outbound routes to garbage collection. + * @returns An {@link IClaimAttempt} carrying the immediate status and + * a promise for the eventual sequenced {@link ClaimResult}. + */ + protected trySetClaim(key: string, value: unknown): IClaimAttempt { + if (this.runtime.trySetClaim === undefined) { + throw new UsageError( + "The data store runtime does not support claims. Enable the `enableDataStoreClaims` policy on the data store runtime to use this API.", + ); + } + return this.runtime.trySetClaim(key, value); + } + + /** + * Returns the value of a previously-claimed key, or `undefined` if the + * key has not been claimed. Embedded handles are decoded. + */ + protected getClaim(key: string): unknown { + return this.runtime.getClaim?.(key); + } + + /** + * Returns `true` if a claim has been sequenced for the given key. + */ + protected hasClaim(key: string): boolean { + return this.runtime.hasClaim?.(key) ?? false; + } + /** * Initializes internal objects and calls initialization overrides. * Caller is responsible for ensuring this is only invoked once. diff --git a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md index fbf624137081..471b2f79b196 100644 --- a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md +++ b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.alpha.api.md @@ -4,6 +4,9 @@ ```ts +// @beta @legacy +export type ClaimResult = "Success" | "AlreadyClaimed"; + // @beta @legacy export interface IChannel extends IFluidLoadable { // (undocumented) @@ -47,6 +50,14 @@ export interface IChannelStorageService { readBlob(path: string): Promise; } +// @beta @legacy +export type IClaimAttempt = { + readonly status: "Success" | "AlreadyClaimed"; +} | { + readonly status: "Pending"; + readonly result: Promise; +}; + // @beta @legacy export interface IDeltaConnection { attach(handler: IDeltaHandler): void; @@ -76,6 +87,7 @@ export interface IFluidDataStoreRuntime extends IEventProvider; // (undocumented) readonly clientId: string | undefined; // (undocumented) @@ -86,7 +98,9 @@ export interface IFluidDataStoreRuntime extends IEventProvider; getAudience(): IAudience; getChannel(id: string): Promise; + getClaim?(key: string): unknown; getQuorum(): IQuorumClients; + hasClaim?(key: string): boolean; // (undocumented) readonly id: string; readonly idCompressor: IIdCompressor | undefined; @@ -104,6 +118,7 @@ export interface IFluidDataStoreRuntime extends IEventProvider void; + trySetClaim?(key: string, value: unknown): IClaimAttempt; uploadBlob(blob: ArrayBufferLike, signal?: AbortSignal): Promise>; waitAttached(): Promise; } diff --git a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md index 174ec80ca5a6..44615acecb55 100644 --- a/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md +++ b/packages/runtime/datastore-definitions/api-report/datastore-definitions.legacy.beta.api.md @@ -4,6 +4,9 @@ ```ts +// @beta @legacy +export type ClaimResult = "Success" | "AlreadyClaimed"; + // @beta @legacy export interface IChannel extends IFluidLoadable { // (undocumented) @@ -47,6 +50,14 @@ export interface IChannelStorageService { readBlob(path: string): Promise; } +// @beta @legacy +export type IClaimAttempt = { + readonly status: "Success" | "AlreadyClaimed"; +} | { + readonly status: "Pending"; + readonly result: Promise; +}; + // @beta @legacy export interface IDeltaConnection { attach(handler: IDeltaHandler): void; @@ -76,6 +87,7 @@ export interface IFluidDataStoreRuntime extends IEventProvider; // (undocumented) readonly clientId: string | undefined; // (undocumented) @@ -86,7 +98,9 @@ export interface IFluidDataStoreRuntime extends IEventProvider; getAudience(): IAudience; getChannel(id: string): Promise; + getClaim?(key: string): unknown; getQuorum(): IQuorumClients; + hasClaim?(key: string): boolean; // (undocumented) readonly id: string; readonly idCompressor: IIdCompressor | undefined; @@ -104,6 +118,7 @@ export interface IFluidDataStoreRuntime extends IEventProvider void; + trySetClaim?(key: string, value: unknown): IClaimAttempt; uploadBlob(blob: ArrayBufferLike, signal?: AbortSignal): Promise>; waitAttached(): Promise; } diff --git a/packages/runtime/datastore-definitions/src/dataStoreRuntime.ts b/packages/runtime/datastore-definitions/src/dataStoreRuntime.ts index 0918a49aec34..1e090a5e706b 100644 --- a/packages/runtime/datastore-definitions/src/dataStoreRuntime.ts +++ b/packages/runtime/datastore-definitions/src/dataStoreRuntime.ts @@ -50,6 +50,52 @@ export interface IFluidDataStoreRuntimeEvents extends IEvent { export type IDeltaManagerErased = ErasedType<"@fluidframework/container-definitions.IDeltaManager">; +/** + * Final, sequenced outcome of a claim attempt. + * + * - `"Success"` - this client owns the claim for the given key. + * - `"AlreadyClaimed"` - another client has already claimed the key; the + * existing value is unchanged. Claims are first-writer-wins and are + * immutable for the lifetime of the document. + * @legacy @beta + */ +export type ClaimResult = "Success" | "AlreadyClaimed"; + +/** + * The synchronous handle returned by + * {@link IFluidDataStoreRuntime.trySetClaim}. + * + * The shape is a discriminated union on {@link IClaimAttempt.status}: + * + * - When `status` is `"Success"` or `"AlreadyClaimed"`, the outcome is + * already known locally (detached, or the key was previously + * sequenced); no further work is required. + * - When `status` is `"Pending"`, the outcome cannot be determined + * locally yet — for example, the client is attached but disconnected, + * the op has been submitted but not yet sequenced, or claim state is + * still being hydrated from the base snapshot. In that case, + * {@link IClaimAttempt.result} resolves to the eventual sequenced + * {@link ClaimResult}, or rejects if the runtime is disposed before the + * attempt is sequenced. + * + * Callers can branch on `status` synchronously for race / fallback + * logic without ever creating a promise on the terminal paths. + * @legacy @beta + */ +export type IClaimAttempt = + | { + readonly status: "Success" | "AlreadyClaimed"; + } + | { + readonly status: "Pending"; + /** + * Resolves to the final sequenced {@link ClaimResult} once the + * op (this client's or another's) is sequenced. Rejects if the + * runtime is disposed before the attempt is sequenced. + */ + readonly result: Promise; + }; + /** * Represents the runtime for the data store. Contains helper functions/state of the data store. * @sealed @@ -203,6 +249,72 @@ export interface IFluidDataStoreRuntime * Indicates whether the data store has uncommitted local changes. */ readonly isDirty: boolean; + + /** + * Attempt to set a first-writer-wins "claim" for the given key on this data + * store. Once a claim has been sequenced for a key, no other client can + * overwrite it for the lifetime of the document. + * + * @remarks + * Claims are intended for partner scenarios that need to wire up singleton + * components (typically a handle to a child DDS) with first-writer-wins + * semantics, instead of the last-writer-wins semantics provided by writing + * to a DDS such as `SharedDirectory`. + * + * The value is JSON-serializable. {@link @fluidframework/core-interfaces#IFluidHandle} + * instances embedded in the value are encoded the same way as handles in + * summary blobs and contribute to garbage-collection routes from this data + * store. + * + * Returns synchronously with an {@link IClaimAttempt} describing the + * outcome. When the outcome is known locally — the key was already + * sequenced, or the data store is detached — `status` is `"Success"` + * or `"AlreadyClaimed"` and there is nothing to await. Otherwise + * `status` is `"Pending"` and {@link IClaimAttempt.result} resolves + * to the eventual sequenced {@link ClaimResult} (`"Success"` for the + * client whose op is sequenced first for the key, and + * `"AlreadyClaimed"` for every other client). + * + * Local ops are automatically resubmitted by the runtime across + * reconnects, so the result promise will eventually resolve once the + * client reconnects — unless the runtime is disposed first, in which + * case the result promise rejects. + * + * Optional. Implementations that do not support claims will not provide + * this method. + * + * @param key - The claim key. + * @param value - The claim value (JSON-serializable; may include handles). + * @returns An {@link IClaimAttempt} discriminated on `status`. + */ + trySetClaim?(key: string, value: unknown): IClaimAttempt; + + /** + * Returns the value of a previously-claimed key, with embedded handles + * decoded. Returns `undefined` if the key has not (yet) been claimed. + * + * Optional. Implementations that do not support claims will not provide + * this method. + */ + getClaim?(key: string): unknown; + + /** + * Returns `true` if the given key has been sequenced as a claim on this + * data store. + * + * Optional. Implementations that do not support claims will not provide + * this method. + */ + hasClaim?(key: string): boolean; + + /** + * Read-only view of all sequenced claims on this data store, with embedded + * handles decoded. + * + * Optional. Implementations that do not support claims will not provide + * this property. + */ + readonly claims?: ReadonlyMap; } /** diff --git a/packages/runtime/datastore-definitions/src/index.ts b/packages/runtime/datastore-definitions/src/index.ts index b986216a6a0e..01b6eb3e1362 100644 --- a/packages/runtime/datastore-definitions/src/index.ts +++ b/packages/runtime/datastore-definitions/src/index.ts @@ -24,6 +24,8 @@ export type { IFluidDataStoreRuntimeEvents, IFluidDataStoreRuntimeInternalConfig, IDeltaManagerErased, + ClaimResult, + IClaimAttempt, } from "./dataStoreRuntime.js"; export type { Jsonable, diff --git a/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md b/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md index 1ee912869d4b..f44a2870e7d4 100644 --- a/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md +++ b/packages/runtime/datastore/api-report/datastore.legacy.beta.api.md @@ -9,7 +9,9 @@ export enum DataStoreMessageType { // (undocumented) Attach = "attach", // (undocumented) - ChannelOp = "op" + ChannelOp = "op", + // (undocumented) + Claim = "claim" } // @beta @legacy @@ -30,6 +32,7 @@ export class FluidDataStoreRuntime extends TypedEventEmitter; // (undocumented) get clientDetails(): IClientDetails; // (undocumented) @@ -54,9 +57,11 @@ export class FluidDataStoreRuntime extends TypedEventEmitter; + getClaim(key: string): unknown | undefined; getGCData(fullGC?: boolean): Promise; // (undocumented) getQuorum(): IQuorumClients; + hasClaim(key: string): boolean; // (undocumented) readonly id: string; // (undocumented) @@ -99,6 +104,7 @@ export class FluidDataStoreRuntime extends TypedEventEmitter; + trySetClaim(key: string, value: unknown): IClaimAttempt; updateUsedRoutes(usedRoutes: string[]): void; // (undocumented) uploadBlob(blob: ArrayBufferLike, signal?: AbortSignal): Promise>; diff --git a/packages/runtime/datastore/package.json b/packages/runtime/datastore/package.json index 3ac7bf6ba4ff..83ca2d9bd375 100644 --- a/packages/runtime/datastore/package.json +++ b/packages/runtime/datastore/package.json @@ -160,6 +160,9 @@ "broken": { "Class_FluidDataStoreRuntime": { "forwardCompat": false + }, + "Enum_DataStoreMessageType": { + "backCompat": false } }, "entrypoint": "legacy" diff --git a/packages/runtime/datastore/src/dataStoreRuntime.ts b/packages/runtime/datastore/src/dataStoreRuntime.ts index da14c2438e5d..b8f9c80e0c63 100644 --- a/packages/runtime/datastore/src/dataStoreRuntime.ts +++ b/packages/runtime/datastore/src/dataStoreRuntime.ts @@ -3,7 +3,11 @@ * Licensed under the MIT License. */ -import { TypedEventEmitter, type ILayerCompatDetails } from "@fluid-internal/client-utils"; +import { + TypedEventEmitter, + type ILayerCompatDetails, + bufferToString, +} from "@fluid-internal/client-utils"; import { AttachState, type IAudience } from "@fluidframework/container-definitions"; import type { IDeltaManager } from "@fluidframework/container-definitions/internal"; import type { @@ -29,6 +33,8 @@ import type { IFluidDataStoreRuntime, IFluidDataStoreRuntimeEvents, IDeltaManagerErased, + ClaimResult, + IClaimAttempt, } from "@fluidframework/datastore-definitions/internal"; import { type IClientDetails, @@ -67,7 +73,11 @@ import { currentSummarizeStepPropertyName, } from "@fluidframework/runtime-definitions/internal"; import { + encodeHandleForSerialization, GCDataBuilder, + isSerializedHandle, + isFluidHandle, + RemoteFluidObjectHandle, RequestParser, SummaryTreeBuilder, addBlobToSummary, @@ -136,6 +146,33 @@ export enum DataStoreMessageType { // Creates a new channel Attach = "attach", ChannelOp = "op", + // Submits a first-writer-wins claim entry on the data store. + Claim = "claim", +} + +/** + * Wire format for a {@link DataStoreMessageType.Claim} op. + * The `value` is the JSON-serializable claim value with embedded handles + * already encoded as serialized handle markers. + * @internal + */ +export interface IClaimMessage { + key: string; + value: unknown; +} + +/** + * Well-known summary blob name used to persist sequenced claims for a data + * store. Sibling of channel subtrees in the data store's summary. + */ +const claimsBlobName = ".claims"; + +/** + * Persisted format of the claims blob. Only sequenced entries are written; + * the winner-vs-loser distinction is local state and is not persisted. + */ +interface IClaimsBlobContent { + entries: [string, unknown][]; } /** @@ -147,7 +184,8 @@ export enum DataStoreMessageType { */ export type LocalFluidDataStoreRuntimeMessage = | { type: DataStoreMessageType.ChannelOp; content: IEnvelope } - | { type: DataStoreMessageType.Attach; content: IAttachMessage }; + | { type: DataStoreMessageType.Attach; content: IAttachMessage } + | { type: DataStoreMessageType.Claim; content: IClaimMessage }; /** * @legacy @beta @@ -230,6 +268,84 @@ function isURL(str: string): boolean { } } +/** + * Recursively walks a claim value, replacing any embedded + * {@link @fluidframework/core-interfaces#IFluidHandle} with its serialized + * form, and binding the handle to the data store runtime so its graph is + * attached. + */ +function encodeHandlesInClaimValue( + value: unknown, + bind: (handle: IFluidHandle) => void, +): unknown { + if (value === null || value === undefined) { + return value; + } + if (isFluidHandle(value)) { + bind(value); + return encodeHandleForSerialization(toFluidHandleInternal(value)); + } + if (Array.isArray(value)) { + return value.map((item) => encodeHandlesInClaimValue(item, bind)); + } + if (typeof value === "object") { + const result: Record = {}; + for (const [k, v] of Object.entries(value as Record)) { + result[k] = encodeHandlesInClaimValue(v, bind); + } + return result; + } + return value; +} + +/** + * Recursively walks an encoded claim value, replacing every + * {@link ISerializedHandle} with a {@link RemoteFluidObjectHandle} bound to + * the supplied route context. + */ +function decodeHandlesInClaimValue( + value: unknown, + routeContext: IFluidHandleContext, +): unknown { + if (value === null || value === undefined || typeof value !== "object") { + return value; + } + if (isSerializedHandle(value)) { + return new RemoteFluidObjectHandle(value.url, routeContext, value.payloadPending === true); + } + if (Array.isArray(value)) { + return value.map((item) => decodeHandlesInClaimValue(item, routeContext)); + } + const result: Record = {}; + for (const [k, v] of Object.entries(value as Record)) { + result[k] = decodeHandlesInClaimValue(v, routeContext); + } + return result; +} + +/** + * Walks an encoded claim value and collects all handle URLs (outbound GC + * routes contributed by the claim). + */ +function collectClaimHandleRoutes(value: unknown, routes: string[]): void { + if (value === null || value === undefined || typeof value !== "object") { + return; + } + if (isSerializedHandle(value)) { + routes.push(value.url); + return; + } + if (Array.isArray(value)) { + for (const item of value) { + collectClaimHandleRoutes(item, routes); + } + return; + } + for (const v of Object.values(value as Record)) { + collectClaimHandleRoutes(v, routes); + } +} + const defaultPolicies: IFluidDataStorePolicies = { readonlyInStagingMode: false, }; @@ -341,6 +457,45 @@ export class FluidDataStoreRuntime // store becomes visible. private readonly pendingHandlesToMakeVisible: Set = new Set(); + /** + * Authoritative claim entries that have been sequenced. Values are stored + * in their encoded form (handles already replaced with serialized handle + * markers). Decoded on demand via `getClaim` / `claims`. + */ + private readonly sequencedClaims = new Map(); + + /** + * Keys for which this client's op was the one that won the race for a + * claim. Used to disambiguate `"Success"` vs `"AlreadyClaimed"` for + * repeat calls. This is purely local state and is not persisted in + * summaries (the loading client was, by definition, not the writer). + */ + private readonly wonClaims = new Set(); + + /** + * Outstanding `trySetClaim` calls that have submitted an op and are + * waiting for it to be sequenced. Keyed by claim key. + */ + private readonly pendingClaims = new Map>(); + + /** + * Promise that resolves once `sequencedClaims` has been hydrated from + * the base snapshot (or immediately, if there is no base snapshot). + */ + private readonly claimsLoadP: Promise | undefined; + + /** + * `true` once `sequencedClaims` has been hydrated from the base + * snapshot. Used to gate sync claim accessors. + */ + private claimsLoaded = false; + + /** + * Buffer of inbound Claim ops that arrived before `claimsLoaded` + * became `true`. Drained in arrival order once load completes. + */ + private readonly bufferedClaimOps: { content: IClaimMessage; local: boolean }[] = []; + public readonly id: string; // TODO: use something other than `any` here (breaking change) @@ -543,6 +698,14 @@ export class FluidDataStoreRuntime this.mc.config.getNumber("Fluid.Telemetry.LocalChangesTelemetryCount") ?? 10; this.minVersionForCollab = this.dataStoreContext.minVersionForCollab; + + // Kick off the (possibly-async) load of the persisted .claims blob. + // Inbound Claim ops that arrive before this completes are buffered and + // drained in arrival order; sync claim accessors return the empty + // state until the load resolves. + this.claimsLoadP = this.loadClaimsFromSnapshot().catch((error) => { + this.mc.logger.sendErrorEvent({ eventName: "ClaimsLoadFailed" }, error); + }); } /** @@ -569,10 +732,201 @@ export class FluidDataStoreRuntime } this._disposed = true; + // Reject any outstanding claim attempts so callers don't hang on a + // disposed runtime. The local op was never sequenced, so resolving + // as `"AlreadyClaimed"` would be misleading. Rejection lets the + // caller distinguish "this attempt did not complete" from a + // sequenced loss. + const disposeError = new UsageError( + "Data store runtime was disposed before the claim was sequenced.", + ); + for (const [, deferred] of this.pendingClaims) { + deferred.reject(disposeError); + } + this.pendingClaims.clear(); + this.emit("dispose"); this.removeAllListeners(); } + // ---------------------------------------------------------------------- + // Claims (first-writer-wins, data-store-scoped key/value entries). + // ---------------------------------------------------------------------- + + private get claimsEnabled(): boolean { + return this.policies.enableDataStoreClaims === true; + } + + private async loadClaimsFromSnapshot(): Promise { + const snapshot = this.dataStoreContext.baseSnapshot; + const blobId = snapshot?.blobs[claimsBlobName]; + if (blobId !== undefined) { + const blob = await this.dataStoreContext.storage.readBlob(blobId); + const text = bufferToString(blob, "utf8"); + const parsed = JSON.parse(text) as IClaimsBlobContent; + for (const [k, v] of parsed.entries) { + if (!this.sequencedClaims.has(k)) { + this.sequencedClaims.set(k, v); + } + } + } + this.claimsLoaded = true; + // Drain buffered ops in arrival order. + const buffered = this.bufferedClaimOps.splice(0, this.bufferedClaimOps.length); + for (const { content, local } of buffered) { + this.applyClaimOp(content, local); + } + } + + // (Previously `ensureClaimsLoaded` was used by `trySetClaim`; the + // rewrite consults `claimsLoaded` / `claimsLoadP` directly to enable + // synchronous status reporting.) + + /** + * Apply a sequenced Claim op. First-writer-wins: + * - If the key is already in `sequencedClaims`, ignore the op. If the + * op was local, the deferred resolves to `"AlreadyClaimed"`. + * - Otherwise, write into `sequencedClaims`. If the op was local, + * record this client as the winner and resolve to `"Success"`. + */ + private applyClaimOp(content: IClaimMessage, local: boolean): void { + const { key, value } = content; + const winner = !this.sequencedClaims.has(key); + if (winner) { + this.sequencedClaims.set(key, value); + if (local) { + this.wonClaims.add(key); + } + } + if (local) { + const deferred = this.pendingClaims.get(key); + if (deferred !== undefined) { + this.pendingClaims.delete(key); + deferred.resolve(winner ? "Success" : "AlreadyClaimed"); + } + } + } + + /** + * {@inheritDoc @fluidframework/datastore-definitions#IFluidDataStoreRuntime.trySetClaim} + */ + public trySetClaim(key: string, value: unknown): IClaimAttempt { + this.verifyNotClosed(); + if (!this.claimsEnabled) { + throw new UsageError( + "DataStore claims are not enabled. Set the `enableDataStoreClaims` policy on the data store runtime to opt in.", + ); + } + if (this.inStagingMode) { + throw new UsageError( + "trySetClaim is not supported while the container is in staging mode.", + ); + } + if (typeof key !== "string" || key.length === 0) { + throw new UsageError("Claim key must be a non-empty string."); + } + + // Encode handles eagerly so handle binding happens synchronously + // (matching the previous behavior). The encoded form is what we + // store in `sequencedClaims` and what we send on the wire. + const encoded = encodeHandlesInClaimValue(value, (h) => this.bind(h)); + + // Fast path: claim state is already hydrated from snapshot, so we + // can determine the immediate status synchronously. + if (this.claimsLoaded) { + return this.trySetClaimLoaded(key, encoded); + } + + // Slow path: claim state is still loading. We cannot determine the + // immediate status, so report `"Pending"` and chain the actual + // attempt off the load completion. + const result = (async (): Promise => { + await this.claimsLoadP; + if (this._disposed) { + throw new UsageError( + "Data store runtime was disposed before the claim was sequenced.", + ); + } + if (!this.claimsLoaded) { + // Load failed (the error was logged from the load wrapper). + // We cannot safely declare a winner, so surface this to the + // caller as a rejection. + throw new UsageError( + "Claim state failed to load from the base snapshot; cannot determine claim outcome.", + ); + } + const attempt = this.trySetClaimLoaded(key, encoded); + return attempt.status === "Pending" ? attempt.result : attempt.status; + })(); + return { status: "Pending", result }; + } + + /** + * Implements {@link trySetClaim} for the case where claim state has + * already been hydrated from the base snapshot. `encodedValue` is the + * caller's value with any embedded handles already encoded and bound. + */ + private trySetClaimLoaded(key: string, encodedValue: unknown): IClaimAttempt { + // Already-sequenced winner / loser disambiguation. + if (this.sequencedClaims.has(key)) { + return { status: this.wonClaims.has(key) ? "Success" : "AlreadyClaimed" }; + } + + // Detached: write directly without an op (will be persisted by + // getAttachSummary). + if (!this.isAttached) { + this.sequencedClaims.set(key, encodedValue); + this.wonClaims.add(key); + return { status: "Success" }; + } + + // If a local attempt for this key is already in flight, return its + // existing deferred so all callers see the same outcome. + const existing = this.pendingClaims.get(key); + if (existing !== undefined) { + return { status: "Pending", result: existing.promise }; + } + + const deferred = new Deferred(); + this.pendingClaims.set(key, deferred); + this.submit({ + type: DataStoreMessageType.Claim, + content: { key, value: encodedValue }, + }); + // The op has been submitted to the outbox but is not yet sequenced + // (and we may not even be connected). Status is `"Pending"` until + // the op (or a competing remote op) is sequenced. + return { status: "Pending", result: deferred.promise }; + } + + /** + * {@inheritDoc @fluidframework/datastore-definitions#IFluidDataStoreRuntime.getClaim} + */ + public getClaim(key: string): unknown | undefined { + if (!this.sequencedClaims.has(key)) { + return undefined; + } + return decodeHandlesInClaimValue(this.sequencedClaims.get(key), this.routeContext); + } + + /** + * {@inheritDoc @fluidframework/datastore-definitions#IFluidDataStoreRuntime.hasClaim} + */ + public hasClaim(key: string): boolean { + return this.sequencedClaims.has(key); + } + + /** + * {@inheritDoc @fluidframework/datastore-definitions#IFluidDataStoreRuntime.claims} + */ + public get claims(): ReadonlyMap { + const decoded = new Map(); + for (const [k, v] of this.sequencedClaims) { + decoded.set(k, decodeHandlesInClaimValue(v, this.routeContext)); + } + return decoded; + } + public async resolveHandle(request: IRequest): Promise { return this.request(request); } @@ -1014,6 +1368,17 @@ export class FluidDataStoreRuntime this.processAttachMessages(messageCollection); break; } + case DataStoreMessageType.Claim: { + for (const { contents } of messagesContent) { + const claimMessage = contents as IClaimMessage; + if (this.claimsLoaded) { + this.applyClaimOp(claimMessage, local); + } else { + this.bufferedClaimOps.push({ content: claimMessage, local }); + } + } + break; + } default: } } catch (error) { @@ -1057,6 +1422,11 @@ export class FluidDataStoreRuntime for (const [contextId] of this.contexts) { outboundRoutes.push(`${this.absolutePath}/${contextId}`); } + // Include any handle routes embedded in sequenced claim values so GC + // keeps their referents alive. + for (const value of this.sequencedClaims.values()) { + collectClaimHandleRoutes(value, outboundRoutes); + } return outboundRoutes; } @@ -1093,9 +1463,27 @@ export class FluidDataStoreRuntime summaryBuilder.addWithStats(contextId, contextSummary); }, ); + this.addClaimsBlob(summaryBuilder); return summaryBuilder.getSummaryTree(); } + /** + * Writes the persisted `.claims` blob into the supplied summary builder + * if there are any sequenced claims. The blob stores only sequenced + * entries — the local winner-vs-loser distinction (`wonClaims`) is + * per-client state and is intentionally not persisted (the loading + * client cannot have been the writer). + */ + private addClaimsBlob(summaryBuilder: SummaryTreeBuilder): void { + if (this.sequencedClaims.size === 0) { + return; + } + const content: IClaimsBlobContent = { + entries: [...this.sequencedClaims.entries()], + }; + summaryBuilder.addBlob(claimsBlobName, JSON.stringify(content)); + } + /** * Generates data used for garbage collection. This includes a list of GC nodes that represent this channel * including any of its child channel contexts. Each node has a set of outbound routes to other GC nodes in the @@ -1178,6 +1566,7 @@ export class FluidDataStoreRuntime }, ); + this.addClaimsBlob(summaryBuilder); return summaryBuilder.getSummaryTree(); } @@ -1384,6 +1773,14 @@ export class FluidDataStoreRuntime this.submit({ type, content }, localOpMetadata); break; } + case DataStoreMessageType.Claim: { + // Resubmit the claim op verbatim. The race-resolution in + // applyClaimOp correctly distinguishes Success vs + // AlreadyClaimed once the resubmitted op is sequenced. + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + this.submit({ type, content }, localOpMetadata); + break; + } default: { unreachableCase(type); } @@ -1485,6 +1882,17 @@ export class FluidDataStoreRuntime await channelContext.getChannel(); return channelContext.applyStashedOp(envelope.contents); } + case DataStoreMessageType.Claim: { + // Replay a stashed claim op locally: re-register the + // pending deferred (it will be resolved once the op is + // sequenced or resubmitted on reconnect). + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + const claimMessage = content.content as IClaimMessage; + if (!this.pendingClaims.has(claimMessage.key)) { + this.pendingClaims.set(claimMessage.key, new Deferred()); + } + return; + } default: { unreachableCase(type); } diff --git a/packages/runtime/datastore/src/index.ts b/packages/runtime/datastore/src/index.ts index 881feb45ba89..688d5fa02a0b 100644 --- a/packages/runtime/datastore/src/index.ts +++ b/packages/runtime/datastore/src/index.ts @@ -7,6 +7,7 @@ export { FluidObjectHandle } from "./fluidHandle.js"; export { DataStoreMessageType, FluidDataStoreRuntime, + type IClaimMessage, type ISharedObjectRegistry, type LocalFluidDataStoreRuntimeMessage, mixinRequestHandler, diff --git a/packages/runtime/datastore/src/test/claims.spec.ts b/packages/runtime/datastore/src/test/claims.spec.ts new file mode 100644 index 000000000000..8bc894720962 --- /dev/null +++ b/packages/runtime/datastore/src/test/claims.spec.ts @@ -0,0 +1,466 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "node:assert"; + +import { stringToBuffer } from "@fluid-internal/client-utils"; +import { AttachState } from "@fluidframework/container-definitions"; +import { ContainerErrorTypes } from "@fluidframework/container-definitions/internal"; +import type { IErrorBase, IFluidHandle } from "@fluidframework/core-interfaces"; +import { fluidHandleSymbol } from "@fluidframework/core-interfaces"; +import type { IFluidHandleInternal } from "@fluidframework/core-interfaces/internal"; +import type { + ClaimResult, + IClaimAttempt, +} from "@fluidframework/datastore-definitions/internal"; +import { SummaryType } from "@fluidframework/driver-definitions"; +import type { ISnapshotTree } from "@fluidframework/driver-definitions/internal"; +import type { + IRuntimeMessageCollection, + IRuntimeStorageService, + ISequencedMessageEnvelope, +} from "@fluidframework/runtime-definitions/internal"; +import { + FluidHandleBase, + encodeHandleForSerialization, + toFluidHandleErased, +} from "@fluidframework/runtime-utils/internal"; +import { MockFluidDataStoreContext } from "@fluidframework/test-runtime-utils/internal"; + +import { + DataStoreMessageType, + FluidDataStoreRuntime, + type IClaimMessage, + type ISharedObjectRegistry, + type LocalFluidDataStoreRuntimeMessage, +} from "../dataStoreRuntime.js"; + +const claimsBlobName = ".claims"; + +interface SubmittedOp { + type: string; + content: unknown; +} + +class TestStorage implements IRuntimeStorageService { + public constructor(private readonly blobs: Map) {} + public async readBlob(id: string): Promise { + const b = this.blobs.get(id); + if (b === undefined) { + throw new Error(`Blob ${id} not found`); + } + return b; + } +} + +/** + * Minimal IFluidHandle stub usable in a claim value. It is "attached" so + * binding it is a no-op. + */ +class FakeHandle extends FluidHandleBase implements IFluidHandleInternal { + public readonly isAttached = true; + public constructor(public readonly absolutePath: string) { + super(); + } + public attachGraph(): void { + /* no-op */ + } + public async get(): Promise { + return undefined; + } + public get [fluidHandleSymbol](): never { + return toFluidHandleErased(this) as unknown as never; + } +} + +function makeContext(options?: { + attachState?: AttachState; + baseSnapshot?: ISnapshotTree; + blobs?: Map; +}): { + context: MockFluidDataStoreContext; + submitted: SubmittedOp[]; +} { + const ctx = new MockFluidDataStoreContext(); + ctx.attachState = options?.attachState ?? AttachState.Attached; + ctx.baseSnapshot = options?.baseSnapshot; + ctx.storage = new TestStorage(options?.blobs ?? new Map()); + const submitted: SubmittedOp[] = []; + (ctx as unknown as { submitMessage: (t: string, c: unknown) => void }).submitMessage = ( + type, + content, + ) => { + submitted.push({ type, content }); + }; + (ctx as unknown as { makeLocallyVisible: () => void }).makeLocallyVisible = () => {}; + return { context: ctx, submitted }; +} + +/** + * Yield enough microtasks for trySetClaim's async ensureClaimsLoaded / + * promise chain to submit its op. + */ +async function flush(): Promise { + for (let i = 0; i < 10; i++) { + await Promise.resolve(); + } +} + +const sharedObjectRegistry: ISharedObjectRegistry = { + get: () => undefined, +}; + +function createRuntime( + context: MockFluidDataStoreContext, + policies?: { enableDataStoreClaims?: boolean }, +): FluidDataStoreRuntime { + const effectivePolicies = policies ?? { enableDataStoreClaims: true }; + const runtime: FluidDataStoreRuntime = new FluidDataStoreRuntime( + context, + sharedObjectRegistry, + /* existing */ context.baseSnapshot !== undefined, + async () => runtime, + effectivePolicies, + ); + return runtime; +} + +function makeClaimAck(content: IClaimMessage, local: boolean): IRuntimeMessageCollection { + const envelope = { + type: DataStoreMessageType.Claim, + } as unknown as ISequencedMessageEnvelope; + return { + envelope, + local, + messagesContent: [{ contents: content, clientSequenceNumber: 1, localOpMetadata: {} }], + }; +} + +/** + * Pull the most-recently-submitted claim op off the submitted list. + */ +async function popClaim(submitted: SubmittedOp[]): Promise { + await flush(); + const op = submitted.pop(); + assert(op !== undefined, "Expected a submitted claim op"); + assert.strictEqual(op.type, DataStoreMessageType.Claim, "Expected Claim op type"); + return op.content as IClaimMessage; +} + +/** + * Resolve a claim attempt to its terminal {@link ClaimResult} regardless of + * whether it landed on a synchronous (Success/AlreadyClaimed) or asynchronous + * (Pending) branch of the discriminated union. + */ +async function awaitResult(attempt: IClaimAttempt | undefined): Promise { + assert(attempt !== undefined, "Expected a claim attempt"); + return attempt.status === "Pending" ? attempt.result : attempt.status; +} + +describe("FluidDataStoreRuntime claims", () => { + it("single-client claim resolves to Success and is exposed via getClaim/hasClaim/claims", async () => { + const { context, submitted } = makeContext(); + const runtime = createRuntime(context); + + const attempt = runtime.trySetClaim?.("k", "v"); + assert(attempt !== undefined); + // Attached + op not yet sequenced -> immediate status is "Pending". + assert(attempt.status === "Pending"); + // The op was submitted; round-trip it back as a local ack. + const claim = await popClaim(submitted); + runtime.processMessages(makeClaimAck(claim, true)); + assert.strictEqual(await attempt.result, "Success"); + assert.strictEqual(runtime.hasClaim?.("k"), true); + assert.strictEqual(runtime.getClaim?.("k"), "v"); + assert.deepStrictEqual([...(runtime.claims?.entries() ?? [])], [["k", "v"]]); + }); + + it("two-client race: first writer wins; loser observes AlreadyClaimed", async () => { + // Client A + const { context: ctxA, submitted: subA } = makeContext(); + const runtimeA = createRuntime(ctxA); + // Client B + const { context: ctxB, submitted: subB } = makeContext(); + const runtimeB = createRuntime(ctxB); + + const pA = runtimeA.trySetClaim?.("k", "A"); + const pB = runtimeB.trySetClaim?.("k", "B"); + assert(pA !== undefined && pB !== undefined); + assert.strictEqual(pA.status, "Pending"); + assert.strictEqual(pB.status, "Pending"); + + const aOp = await popClaim(subA); + const bOp = await popClaim(subB); + + // Sequencer orders A then B; both clients see both ops. A is local on + // runtimeA, remote on runtimeB; B is local on runtimeB, remote on runtimeA. + runtimeA.processMessages(makeClaimAck(aOp, true)); + runtimeA.processMessages(makeClaimAck(bOp, false)); + runtimeB.processMessages(makeClaimAck(aOp, false)); + runtimeB.processMessages(makeClaimAck(bOp, true)); + + assert.strictEqual(await awaitResult(pA), "Success"); + assert.strictEqual(await awaitResult(pB), "AlreadyClaimed"); + assert.strictEqual(runtimeA.getClaim?.("k"), "A"); + assert.strictEqual(runtimeB.getClaim?.("k"), "A"); + }); + + it("repeated claim from the winner returns Success even with a different value", async () => { + const { context, submitted } = makeContext(); + const runtime = createRuntime(context); + + const p1 = runtime.trySetClaim?.("k", "v1"); + const op1 = await popClaim(submitted); + runtime.processMessages(makeClaimAck(op1, true)); + assert.strictEqual(await awaitResult(p1), "Success"); + + // Repeat with different value -> still Success synchronously; the + // terminal-status branch carries no promise to await. + const p2 = runtime.trySetClaim?.("k", "v2"); + assert.strictEqual(p2?.status, "Success"); + assert.strictEqual(runtime.getClaim?.("k"), "v1"); + }); + + it("repeated claim from a loser returns AlreadyClaimed (identity, not value)", async () => { + const { context: ctxA, submitted: subA } = makeContext(); + const runtimeA = createRuntime(ctxA); + const { context: ctxB, submitted: subB } = makeContext(); + const runtimeB = createRuntime(ctxB); + + const pA = runtimeA.trySetClaim?.("k", "shared"); + const pB = runtimeB.trySetClaim?.("k", "shared"); + const aOp = await popClaim(subA); + const bOp = await popClaim(subB); + runtimeA.processMessages(makeClaimAck(aOp, true)); + runtimeA.processMessages(makeClaimAck(bOp, false)); + runtimeB.processMessages(makeClaimAck(aOp, false)); + runtimeB.processMessages(makeClaimAck(bOp, true)); + assert.strictEqual(await awaitResult(pA), "Success"); + assert.strictEqual(await awaitResult(pB), "AlreadyClaimed"); + + // Loser tries again with the same (key, value): still AlreadyClaimed + // synchronously — no op submitted, no promise to await. + const retry = runtimeB.trySetClaim?.("k", "shared"); + assert.strictEqual(retry?.status, "AlreadyClaimed"); + }); + + it("detached set resolves Success synchronously and is persisted via getAttachSummary", () => { + const { context } = makeContext({ attachState: AttachState.Detached }); + const runtime = createRuntime(context); + const attempt = runtime.trySetClaim?.("k", "v"); + assert.strictEqual(attempt?.status, "Success"); + assert.strictEqual(runtime.hasClaim?.("k"), true); + + // Make the runtime locally visible so getAttachSummary works. + runtime.makeVisibleAndAttachGraph(); + const attachSummary = runtime.getAttachSummary(); + const claimsBlob = attachSummary.summary.tree[claimsBlobName]; + assert(claimsBlob !== undefined, "Attach summary must include .claims blob"); + assert.strictEqual(claimsBlob.type, SummaryType.Blob); + }); + + it("after detached set + attach, a remote op for the same key resolves AlreadyClaimed", async () => { + const { context, submitted } = makeContext({ attachState: AttachState.Detached }); + const runtime = createRuntime(context); + assert.strictEqual(runtime.trySetClaim?.("k", "winner")?.status, "Success"); + + // Simulate becoming attached. + runtime.setAttachState(AttachState.Attaching); + runtime.setAttachState(AttachState.Attached); + + // Now a remote claim for the same key arrives -> ignored. + const remoteOp: IClaimMessage = { key: "k", value: "loser" }; + runtime.processMessages(makeClaimAck(remoteOp, false)); + assert.strictEqual(runtime.getClaim?.("k"), "winner"); + assert.strictEqual(submitted.length, 0, "Detached set should not have submitted an op"); + }); + + it("GC data includes outbound routes from handles inside claim values", async () => { + const { context, submitted } = makeContext(); + const runtime = createRuntime(context); + const handle = new FakeHandle("/dataStoreA/channelB") as unknown as IFluidHandle; + const p = runtime.trySetClaim?.("k", { foo: handle }); + const op = await popClaim(submitted); + runtime.processMessages(makeClaimAck(op, true)); + assert.strictEqual(await awaitResult(p), "Success"); + const gcData = await runtime.getGCData(); + assert(gcData.gcNodes["/"] !== undefined); + assert( + gcData.gcNodes["/"].includes("/dataStoreA/channelB"), + `Expected outbound route from claim handle, got ${JSON.stringify(gcData.gcNodes["/"])}`, + ); + }); + + it("reconnect: a pending claim that loses the race resolves to AlreadyClaimed", async () => { + const { context, submitted } = makeContext(); + const runtime = createRuntime(context); + + // Start a claim attempt locally. + const pLocal = runtime.trySetClaim?.("k", "local"); + assert.strictEqual(pLocal?.status, "Pending"); + const localOp = await popClaim(submitted); + + // A remote winner is sequenced first. + runtime.processMessages(makeClaimAck({ key: "k", value: "remote" }, false)); + + // Resubmit the local op (e.g. reconnect). The op processor sees the + // key is already claimed and resolves the pending deferred to + // AlreadyClaimed. + runtime.reSubmit( + DataStoreMessageType.Claim, + localOp as unknown as Record, + undefined, + false, + ); + // The resubmit emits the same op back through submit; pop it and ack as local. + const resubmitted = await popClaim(submitted); + runtime.processMessages(makeClaimAck(resubmitted, true)); + + assert.strictEqual(await awaitResult(pLocal), "AlreadyClaimed"); + assert.strictEqual(runtime.getClaim?.("k"), "remote"); + }); + + it("feature flag off: trySetClaim throws a UsageError", () => { + const { context } = makeContext(); + const runtime = createRuntime(context, { enableDataStoreClaims: false }); + assert.throws( + () => runtime.trySetClaim?.("k", "v"), + (e: IErrorBase) => + e.errorType === ContainerErrorTypes.usageError && + e.message.includes("DataStore claims are not enabled"), + ); + }); + + it("rehydrates sequencedClaims from a base snapshot blob", async () => { + // Build a snapshot containing the .claims blob. + const blobId = "claimsBlobId"; + const blobContent = JSON.stringify({ + entries: [ + ["a", "valueA"], + [ + "b", + encodeHandleForSerialization( + new FakeHandle("/x/y") as unknown as IFluidHandleInternal, + ), + ], + ], + }); + const blobs = new Map(); + blobs.set(blobId, stringToBuffer(blobContent, "utf8")); + const baseSnapshot: ISnapshotTree = { + blobs: { [claimsBlobName]: blobId }, + trees: {}, + }; + const { context } = makeContext({ baseSnapshot, blobs }); + const runtime = createRuntime(context); + + // Claim state hasn't been loaded yet -> immediate status is "Pending" + // even though we'd otherwise be a fast-path "AlreadyClaimed" on key "a". + const setP = runtime.trySetClaim?.("c", "valueC"); + assert.strictEqual(setP?.status, "Pending"); + // Allow microtasks to run so the load completes. + await Promise.resolve(); + await Promise.resolve(); + + assert.strictEqual(runtime.hasClaim?.("a"), true); + assert.strictEqual(runtime.getClaim?.("a"), "valueA"); + assert.strictEqual(runtime.hasClaim?.("b"), true); + const decoded = runtime.getClaim?.("b") as { absolutePath: string }; + assert.strictEqual(decoded.absolutePath, "/x/y"); + + // Don't leave the open setP unhandled - just attach a no-op handler so + // any rejection is observed. setP is "Pending" while claim state is + // still loading. + if (setP?.status === "Pending") { + setP.result.catch(() => undefined); + } + }); + + it("staging mode: trySetClaim throws", () => { + const { context } = makeContext(); + (context.containerRuntime as unknown as { inStagingMode: boolean }).inStagingMode = true; + const runtime = createRuntime(context); + assert.throws( + () => runtime.trySetClaim?.("k", "v"), + (e: IErrorBase) => + e.errorType === ContainerErrorTypes.usageError && e.message.includes("staging"), + ); + }); + + it("summary includes the .claims blob with sequenced entries", async () => { + const { context, submitted } = makeContext(); + const runtime = createRuntime(context); + const p = runtime.trySetClaim?.("k", "v"); + runtime.processMessages(makeClaimAck(await popClaim(submitted), true)); + await awaitResult(p); + + const summary = await runtime.summarize(true, false); + assert(summary.summary.type === SummaryType.Tree); + const blob = summary.summary.tree[claimsBlobName]; + assert(blob?.type === SummaryType.Blob); + const parsed = JSON.parse( + typeof blob.content === "string" + ? blob.content + : Buffer.from(blob.content).toString("utf8"), + ) as { entries: [string, unknown][] }; + assert.deepStrictEqual(parsed.entries, [["k", "v"]]); + }); + + it("dispose rejects outstanding pending claim result promises", async () => { + const { context } = makeContext(); + const runtime = createRuntime(context); + const p = runtime.trySetClaim?.("k", "v"); + assert(p !== undefined); + assert(p.status === "Pending"); + await flush(); + runtime.dispose(); + await assert.rejects( + async () => p.result, + (e: IErrorBase) => + e.errorType === ContainerErrorTypes.usageError && e.message.includes("disposed"), + ); + }); + + it("offline: trySetClaim returns Pending synchronously and the result promise stays unresolved until the op is sequenced", async () => { + // We can't directly toggle "connected" on the mock context, but the + // shape of the API guarantees that for any attached client whose op + // has not yet been sequenced (the disconnected case included), the + // immediate status is "Pending" and the consumer can immediately + // race or fall back. + const { context, submitted } = makeContext(); + const runtime = createRuntime(context); + + const attempt = runtime.trySetClaim?.("k", "v"); + assert(attempt !== undefined); + assert(attempt.status === "Pending"); + + // Until we round-trip the op, the result promise is unresolved. + // Pre-attach a state-tracking listener so we can verify it isn't + // resolved prematurely. + let resolved = false; + attempt.result.then( + () => { + resolved = true; + }, + () => { + resolved = true; + }, + ); + await flush(); + assert.strictEqual(resolved, false, "result must not resolve before the op is sequenced"); + + // Sequence the op (as if we'd just reconnected and the resubmitter + // drained the outbox). + const claim = await popClaim(submitted); + runtime.processMessages(makeClaimAck(claim, true)); + assert.strictEqual(await attempt.result, "Success"); + }); + + // Reference imported types so they aren't flagged as unused. + const _typeRefs: LocalFluidDataStoreRuntimeMessage[] = []; + if (_typeRefs.length > 0) { + throw new Error("unreachable"); + } +}); diff --git a/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts b/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts index bc4a808530ff..64e8f68cbd88 100644 --- a/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts +++ b/packages/runtime/datastore/src/test/types/validateDatastorePrevious.generated.ts @@ -88,6 +88,7 @@ declare type old_as_current_for_Enum_DataStoreMessageType = requireAssignableTo< * typeValidation.broken: * "Enum_DataStoreMessageType": {"backCompat": false} */ +// @ts-expect-error compatibility expected to be broken declare type current_as_old_for_Enum_DataStoreMessageType = requireAssignableTo, TypeOnly> /* diff --git a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md index 9f18e360b28c..86fdff8a32ca 100644 --- a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md +++ b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md @@ -188,6 +188,7 @@ export interface IFluidDataStoreFactory extends IProvideFluidDataStoreFactory { // @beta @legacy export interface IFluidDataStorePolicies { + readonly enableDataStoreClaims?: boolean; readonly readonlyInStagingMode: boolean; } diff --git a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.beta.api.md b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.beta.api.md index 75f077ccfa54..ffa2cc01f803 100644 --- a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.beta.api.md +++ b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.beta.api.md @@ -181,6 +181,7 @@ export interface IFluidDataStoreFactory extends IProvideFluidDataStoreFactory { // @beta @legacy export interface IFluidDataStorePolicies { + readonly enableDataStoreClaims?: boolean; readonly readonlyInStagingMode: boolean; } diff --git a/packages/runtime/runtime-definitions/src/dataStoreContext.ts b/packages/runtime/runtime-definitions/src/dataStoreContext.ts index 4e514b16b38d..4b5ee3ddd184 100644 --- a/packages/runtime/runtime-definitions/src/dataStoreContext.ts +++ b/packages/runtime/runtime-definitions/src/dataStoreContext.ts @@ -377,6 +377,22 @@ export interface IFluidDataStorePolicies { * @see {@link IContainerRuntimeBase.enterStagingMode} */ readonly readonlyInStagingMode: boolean; + + /** + * When set to true, this data store enables the first-writer-wins + * "claims" feature. {@link @fluidframework/datastore-definitions#IFluidDataStoreRuntime.trySetClaim} + * (and the corresponding helpers on `DataObject`) require this flag and + * will throw a `UsageError` if it is not set. + * + * @defaultValue `false` + * + * @remarks + * The new claim op type is silently ignored by older clients that do not + * understand it. Authors enabling this flag should ensure all + * collaborating clients run a runtime that recognizes the op (otherwise + * older clients would not see the claim being set). + */ + readonly enableDataStoreClaims?: boolean; } /**