Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions .changeset/claims-dataobject.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
---
"@fluidframework/datastore-definitions": minor
"@fluidframework/datastore": minor
"@fluidframework/runtime-definitions": minor
"@fluidframework/aqueduct": minor
"__section": feature
---
Add first-writer-wins claims to data store runtime and DataObject

This adds a new "claim" primitive to `IFluidDataStoreRuntime` and `DataObject`
that lets a data store publish first-writer-wins key/value entries. A claim
is conceptually a small piece of immutable per-data-store state where the
first client to write a given key wins; concurrent writers from other clients
observe `"AlreadyClaimed"` and can branch their logic accordingly.

New API surface (all `@legacy` `@beta`):

- `ClaimResult = "Success" | "AlreadyClaimed"` — terminal sequenced
outcome of a claim attempt.
- `IClaimAttempt` — the synchronous return shape of `trySetClaim`. It is
a discriminated union on `status`:
- `{ status: "Success" | "AlreadyClaimed" }` when the outcome is
already known locally (detached, or the key was previously
sequenced). There is nothing to await.
- `{ status: "Pending"; result: Promise<ClaimResult> }` when the
outcome can't be determined locally yet — for example, the client
is attached but disconnected, the op has been submitted but not
yet sequenced, or claim state is still being hydrated from the
base snapshot. The `result` promise resolves to the final
sequenced `ClaimResult`, or rejects if the runtime is disposed
before the attempt is sequenced.
- `IFluidDataStoreRuntime.trySetClaim(key, value): IClaimAttempt`,
`getClaim(key)`, `hasClaim(key)`, and `claims` (a read-only iterator).
- `IFluidDataStorePolicies.enableDataStoreClaims` opt-in flag (defaults to
off; set to `true` on the data store runtime to enable the API).
- `DataObject.trySetClaim`, `getClaim`, `hasClaim` convenience helpers that
forward to the runtime.

Claim values may contain `IFluidHandle` instances; these are encoded the same
way as handles in summary blobs and contribute outbound routes to garbage
collection. Claims are persisted via a `.claims` summary blob on the data
store and rehydrated on subsequent loads.

Use a claim (rather than writing to `DataObject.root`) when you specifically
need first-writer-wins semantics — for example, when multiple clients race
to designate themselves as the owner of a particular role within the data
store and only one should succeed.
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ export interface ContainerRuntimeFactoryWithDefaultDataStoreProps {

// @beta @legacy
export abstract class DataObject<I extends DataObjectTypes = DataObjectTypes> extends PureDataObject<I> {
protected getClaim(key: string): unknown;
protected getUninitializedErrorString(item: string): string;
protected hasClaim(key: string): boolean;
initializeInternal(existing: boolean): Promise<void>;
protected get root(): ISharedDirectory;
protected trySetClaim(key: string, value: unknown): IClaimAttempt;
}

// @beta @legacy
Expand Down
68 changes: 68 additions & 0 deletions packages/framework/aqueduct/src/data-objects/dataObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
* Licensed under the MIT License.
*/

import type {
ClaimResult,
IClaimAttempt,
} from "@fluidframework/datastore-definitions/internal";
import {
type ISharedDirectory,
MapFactory,
SharedDirectory,
} from "@fluidframework/map/internal";
import { UsageError } from "@fluidframework/telemetry-utils/internal";

import { PureDataObject } from "./pureDataObject.js";
import type { DataObjectTypes } from "./types.js";
Expand All @@ -20,6 +25,21 @@ import type { DataObjectTypes } from "./types.js";
* and registering channels with the runtime any new DDS that is set on the root
* will automatically be registered.
*
* @remarks
* In addition to the {@link DataObject.root | root directory}, `DataObject`
* also exposes a first-writer-wins **claims** API:
* {@link DataObject.trySetClaim}, {@link DataObject.getClaim}, and
* {@link DataObject.hasClaim}. Use a claim instead of `root.set` when you
* need to wire up a singleton entry (typically a handle to a child DDS or
* data store) and want first-writer-wins semantics — once a claim is
* sequenced it can never be overwritten by another client. By contrast,
* `root.set` (and other DDS writes) use last-writer-wins semantics, so two
* clients racing to populate the same key will silently overwrite each
* other.
*
* Claims require the underlying data store runtime to have the
* `enableDataStoreClaims` policy enabled.
*
* @typeParam I - The optional input types used to strongly type the data object
* @legacy
* @beta
Expand All @@ -42,6 +62,54 @@ export abstract class DataObject<
return this.internalRoot;
}

/**
* Attempts to set a first-writer-wins claim on this data store.
*
* Returns synchronously with an {@link IClaimAttempt} describing the
* immediate state. Its {@link IClaimAttempt.status} is `"Success"` or
* `"AlreadyClaimed"` when the outcome is already known locally
* (detached, or the key was previously sequenced), and `"Pending"`
* otherwise (e.g. while disconnected, while the op is in flight, or
* while claim state is still being loaded from the base snapshot).
* Callers can branch on the status immediately for race / fallback
* logic and await {@link IClaimAttempt.result} to observe the final
* sequenced outcome.
*
* See {@link DataObject} class remarks for when to use a claim vs.
* writing to {@link DataObject.root | root}.
*
* @param key - The claim key (non-empty string).
* @param value - The JSON-serializable value to claim. May contain
* {@link @fluidframework/core-interfaces#IFluidHandle} instances; these
* are encoded the same way as handles in summary blobs and contribute
* outbound routes to garbage collection.
* @returns An {@link IClaimAttempt} carrying the immediate status and
* a promise for the eventual sequenced {@link ClaimResult}.
*/
protected trySetClaim(key: string, value: unknown): IClaimAttempt {
if (this.runtime.trySetClaim === undefined) {
throw new UsageError(
"The data store runtime does not support claims. Enable the `enableDataStoreClaims` policy on the data store runtime to use this API.",
);
}
return this.runtime.trySetClaim(key, value);
}

/**
* Returns the value of a previously-claimed key, or `undefined` if the
* key has not been claimed. Embedded handles are decoded.
*/
protected getClaim(key: string): unknown {
return this.runtime.getClaim?.(key);
}

/**
* Returns `true` if a claim has been sequenced for the given key.
*/
protected hasClaim(key: string): boolean {
return this.runtime.hasClaim?.(key) ?? false;
}

/**
* Initializes internal objects and calls initialization overrides.
* Caller is responsible for ensuring this is only invoked once.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

```ts

// @beta @legacy
export type ClaimResult = "Success" | "AlreadyClaimed";

// @beta @legacy
export interface IChannel extends IFluidLoadable {
// (undocumented)
Expand Down Expand Up @@ -47,6 +50,14 @@ export interface IChannelStorageService {
readBlob(path: string): Promise<ArrayBufferLike>;
}

// @beta @legacy
export type IClaimAttempt = {
readonly status: "Success" | "AlreadyClaimed";
} | {
readonly status: "Pending";
readonly result: Promise<ClaimResult>;
};

// @beta @legacy
export interface IDeltaConnection {
attach(handler: IDeltaHandler): void;
Expand Down Expand Up @@ -76,6 +87,7 @@ export interface IFluidDataStoreRuntime extends IEventProvider<IFluidDataStoreRu
bindChannel(channel: IChannel): void;
// (undocumented)
readonly channelsRoutingContext: IFluidHandleContext;
readonly claims?: ReadonlyMap<string, unknown>;
// (undocumented)
readonly clientId: string | undefined;
// (undocumented)
Expand All @@ -86,7 +98,9 @@ export interface IFluidDataStoreRuntime extends IEventProvider<IFluidDataStoreRu
readonly entryPoint: IFluidHandle<FluidObject>;
getAudience(): IAudience;
getChannel(id: string): Promise<IChannel>;
getClaim?(key: string): unknown;
getQuorum(): IQuorumClients;
hasClaim?(key: string): boolean;
// (undocumented)
readonly id: string;
readonly idCompressor: IIdCompressor | undefined;
Expand All @@ -104,6 +118,7 @@ export interface IFluidDataStoreRuntime extends IEventProvider<IFluidDataStoreRu
// (undocumented)
readonly rootRoutingContext: IFluidHandleContext;
submitSignal: (type: string, content: unknown, targetClientId?: string) => void;
trySetClaim?(key: string, value: unknown): IClaimAttempt;
uploadBlob(blob: ArrayBufferLike, signal?: AbortSignal): Promise<IFluidHandle<ArrayBufferLike>>;
waitAttached(): Promise<void>;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

```ts

// @beta @legacy
export type ClaimResult = "Success" | "AlreadyClaimed";

// @beta @legacy
export interface IChannel extends IFluidLoadable {
// (undocumented)
Expand Down Expand Up @@ -47,6 +50,14 @@ export interface IChannelStorageService {
readBlob(path: string): Promise<ArrayBufferLike>;
}

// @beta @legacy
export type IClaimAttempt = {
readonly status: "Success" | "AlreadyClaimed";
} | {
readonly status: "Pending";
readonly result: Promise<ClaimResult>;
};

// @beta @legacy
export interface IDeltaConnection {
attach(handler: IDeltaHandler): void;
Expand Down Expand Up @@ -76,6 +87,7 @@ export interface IFluidDataStoreRuntime extends IEventProvider<IFluidDataStoreRu
bindChannel(channel: IChannel): void;
// (undocumented)
readonly channelsRoutingContext: IFluidHandleContext;
readonly claims?: ReadonlyMap<string, unknown>;
// (undocumented)
readonly clientId: string | undefined;
// (undocumented)
Expand All @@ -86,7 +98,9 @@ export interface IFluidDataStoreRuntime extends IEventProvider<IFluidDataStoreRu
readonly entryPoint: IFluidHandle<FluidObject>;
getAudience(): IAudience;
getChannel(id: string): Promise<IChannel>;
getClaim?(key: string): unknown;
getQuorum(): IQuorumClients;
hasClaim?(key: string): boolean;
// (undocumented)
readonly id: string;
readonly idCompressor: IIdCompressor | undefined;
Expand All @@ -104,6 +118,7 @@ export interface IFluidDataStoreRuntime extends IEventProvider<IFluidDataStoreRu
// (undocumented)
readonly rootRoutingContext: IFluidHandleContext;
submitSignal: (type: string, content: unknown, targetClientId?: string) => void;
trySetClaim?(key: string, value: unknown): IClaimAttempt;
uploadBlob(blob: ArrayBufferLike, signal?: AbortSignal): Promise<IFluidHandle<ArrayBufferLike>>;
waitAttached(): Promise<void>;
}
Expand Down
112 changes: 112 additions & 0 deletions packages/runtime/datastore-definitions/src/dataStoreRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,52 @@ export interface IFluidDataStoreRuntimeEvents extends IEvent {
export type IDeltaManagerErased =
ErasedType<"@fluidframework/container-definitions.IDeltaManager<ISequencedDocumentMessage, IDocumentMessage>">;

/**
* Final, sequenced outcome of a claim attempt.
*
* - `"Success"` - this client owns the claim for the given key.
* - `"AlreadyClaimed"` - another client has already claimed the key; the
* existing value is unchanged. Claims are first-writer-wins and are
* immutable for the lifetime of the document.
* @legacy @beta
*/
export type ClaimResult = "Success" | "AlreadyClaimed";

/**
* The synchronous handle returned by
* {@link IFluidDataStoreRuntime.trySetClaim}.
*
* The shape is a discriminated union on {@link IClaimAttempt.status}:
*
* - When `status` is `"Success"` or `"AlreadyClaimed"`, the outcome is
* already known locally (detached, or the key was previously
* sequenced); no further work is required.
* - When `status` is `"Pending"`, the outcome cannot be determined
* locally yet — for example, the client is attached but disconnected,
* the op has been submitted but not yet sequenced, or claim state is
* still being hydrated from the base snapshot. In that case,
* {@link IClaimAttempt.result} resolves to the eventual sequenced
* {@link ClaimResult}, or rejects if the runtime is disposed before the
* attempt is sequenced.
*
* Callers can branch on `status` synchronously for race / fallback
* logic without ever creating a promise on the terminal paths.
* @legacy @beta
*/
export type IClaimAttempt =
| {
readonly status: "Success" | "AlreadyClaimed";
}
| {
readonly status: "Pending";
/**
* Resolves to the final sequenced {@link ClaimResult} once the
* op (this client's or another's) is sequenced. Rejects if the
* runtime is disposed before the attempt is sequenced.
*/
readonly result: Promise<ClaimResult>;
};

/**
* Represents the runtime for the data store. Contains helper functions/state of the data store.
* @sealed
Expand Down Expand Up @@ -203,6 +249,72 @@ export interface IFluidDataStoreRuntime
* Indicates whether the data store has uncommitted local changes.
*/
readonly isDirty: boolean;

/**
* Attempt to set a first-writer-wins "claim" for the given key on this data
* store. Once a claim has been sequenced for a key, no other client can
* overwrite it for the lifetime of the document.
*
* @remarks
* Claims are intended for partner scenarios that need to wire up singleton
* components (typically a handle to a child DDS) with first-writer-wins
* semantics, instead of the last-writer-wins semantics provided by writing
* to a DDS such as `SharedDirectory`.
*
* The value is JSON-serializable. {@link @fluidframework/core-interfaces#IFluidHandle}
* instances embedded in the value are encoded the same way as handles in
* summary blobs and contribute to garbage-collection routes from this data
* store.
*
* Returns synchronously with an {@link IClaimAttempt} describing the
* outcome. When the outcome is known locally — the key was already
* sequenced, or the data store is detached — `status` is `"Success"`
* or `"AlreadyClaimed"` and there is nothing to await. Otherwise
* `status` is `"Pending"` and {@link IClaimAttempt.result} resolves
* to the eventual sequenced {@link ClaimResult} (`"Success"` for the
* client whose op is sequenced first for the key, and
* `"AlreadyClaimed"` for every other client).
*
* Local ops are automatically resubmitted by the runtime across
* reconnects, so the result promise will eventually resolve once the
* client reconnects — unless the runtime is disposed first, in which
* case the result promise rejects.
*
* Optional. Implementations that do not support claims will not provide
* this method.
*
* @param key - The claim key.
* @param value - The claim value (JSON-serializable; may include handles).
* @returns An {@link IClaimAttempt} discriminated on `status`.
*/
trySetClaim?(key: string, value: unknown): IClaimAttempt;

/**
* Returns the value of a previously-claimed key, with embedded handles
* decoded. Returns `undefined` if the key has not (yet) been claimed.
*
* Optional. Implementations that do not support claims will not provide
* this method.
*/
getClaim?(key: string): unknown;

/**
* Returns `true` if the given key has been sequenced as a claim on this
* data store.
*
* Optional. Implementations that do not support claims will not provide
* this method.
*/
hasClaim?(key: string): boolean;

/**
* Read-only view of all sequenced claims on this data store, with embedded
* handles decoded.
*
* Optional. Implementations that do not support claims will not provide
* this property.
*/
readonly claims?: ReadonlyMap<string, unknown>;
}

/**
Expand Down
2 changes: 2 additions & 0 deletions packages/runtime/datastore-definitions/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ export type {
IFluidDataStoreRuntimeEvents,
IFluidDataStoreRuntimeInternalConfig,
IDeltaManagerErased,
ClaimResult,
IClaimAttempt,
} from "./dataStoreRuntime.js";
export type {
Jsonable,
Expand Down
Loading
Loading