diff --git a/packages/db-client/generated/kurrentdb/protocols/v1/projectionmanagement_pb.d.ts b/packages/db-client/generated/kurrentdb/protocols/v1/projectionmanagement_pb.d.ts index 236a7edf..aa378170 100644 --- a/packages/db-client/generated/kurrentdb/protocols/v1/projectionmanagement_pb.d.ts +++ b/packages/db-client/generated/kurrentdb/protocols/v1/projectionmanagement_pb.d.ts @@ -49,6 +49,8 @@ export namespace CreateReq { setContinuous(value?: CreateReq.Options.Continuous): Options; getQuery(): string; setQuery(value: string): Options; + getEngineVersion(): number; + setEngineVersion(value: number): Options; getModeCase(): Options.ModeCase; @@ -68,6 +70,7 @@ export namespace CreateReq { pb_transient?: CreateReq.Options.Transient.AsObject, continuous?: CreateReq.Options.Continuous.AsObject, query: string, + engineVersion: number, } diff --git a/packages/db-client/generated/kurrentdb/protocols/v1/projectionmanagement_pb.js b/packages/db-client/generated/kurrentdb/protocols/v1/projectionmanagement_pb.js index d249709b..f8fdfbf2 100644 --- a/packages/db-client/generated/kurrentdb/protocols/v1/projectionmanagement_pb.js +++ b/packages/db-client/generated/kurrentdb/protocols/v1/projectionmanagement_pb.js @@ -864,7 +864,8 @@ proto.event_store.client.projections.CreateReq.Options.toObject = function(inclu oneTime: (f = msg.getOneTime()) && kurrentdb_protocols_v1_shared_pb.Empty.toObject(includeInstance, f), pb_transient: (f = msg.getTransient()) && proto.event_store.client.projections.CreateReq.Options.Transient.toObject(includeInstance, f), continuous: (f = msg.getContinuous()) && proto.event_store.client.projections.CreateReq.Options.Continuous.toObject(includeInstance, f), - query: jspb.Message.getFieldWithDefault(msg, 4, "") + query: jspb.Message.getFieldWithDefault(msg, 4, ""), + engineVersion: jspb.Message.getFieldWithDefault(msg, 5, 0) }; if (includeInstance) { @@ -920,6 +921,10 @@ proto.event_store.client.projections.CreateReq.Options.deserializeBinaryFromRead var value = /** @type {string} */ (reader.readString()); msg.setQuery(value); break; + case 5: + var value = /** @type {number} */ (reader.readInt32()); + msg.setEngineVersion(value); + break; default: reader.skipField(); break; @@ -980,6 +985,13 @@ proto.event_store.client.projections.CreateReq.Options.serializeBinaryToWriter = f ); } + f = message.getEngineVersion(); + if (f !== 0) { + writer.writeInt32( + 5, + f + ); + } }; @@ -1432,6 +1444,24 @@ proto.event_store.client.projections.CreateReq.Options.prototype.setQuery = func }; +/** + * optional int32 engine_version = 5; + * @return {number} + */ +proto.event_store.client.projections.CreateReq.Options.prototype.getEngineVersion = function() { + return /** @type {number} */ (jspb.Message.getFieldWithDefault(this, 5, 0)); +}; + + +/** + * @param {number} value + * @return {!proto.event_store.client.projections.CreateReq.Options} returns this + */ +proto.event_store.client.projections.CreateReq.Options.prototype.setEngineVersion = function(value) { + return jspb.Message.setProto3IntField(this, 5, value); +}; + + /** * optional Options options = 1; * @return {?proto.event_store.client.projections.CreateReq.Options} diff --git a/packages/db-client/protos/kurrentdb/protocols/v1/projectionmanagement.proto b/packages/db-client/protos/kurrentdb/protocols/v1/projectionmanagement.proto index 08fe1627..dbabd644 100644 --- a/packages/db-client/protos/kurrentdb/protocols/v1/projectionmanagement.proto +++ b/packages/db-client/protos/kurrentdb/protocols/v1/projectionmanagement.proto @@ -28,6 +28,7 @@ message CreateReq { Continuous continuous = 3; } string query = 4; + int32 engine_version = 5; // 0 or 1 = v1 (default), 2 = v2 message Transient { option deprecated = true; diff --git a/packages/db-client/src/constants.ts b/packages/db-client/src/constants.ts index 7c2f75b6..104f7994 100644 --- a/packages/db-client/src/constants.ts +++ b/packages/db-client/src/constants.ts @@ -29,6 +29,10 @@ export const RETRY = "retry"; export const SKIP = "skip"; export const STOP = "stop"; +// projection engine version +export const PROJECTION_ENGINE_V1 = "v1"; +export const PROJECTION_ENGINE_V2 = "v2"; + // projection status export const CREATING = "Creating"; export const LOADING = "Loading"; diff --git a/packages/db-client/src/projections/createProjection.ts b/packages/db-client/src/projections/createProjection.ts index db1f3483..7ae8c53c 100644 --- a/packages/db-client/src/projections/createProjection.ts +++ b/packages/db-client/src/projections/createProjection.ts @@ -5,9 +5,15 @@ import { import { CreateReq } from "../../generated/kurrentdb/protocols/v1/projectionmanagement_pb"; import { Client } from "../Client"; -import type { BaseOptions } from "../types"; +import { PROJECTION_ENGINE_V1, PROJECTION_ENGINE_V2 } from "../constants"; +import type { BaseOptions, ProjectionEngineVersion } from "../types"; import { debug, convertToCommandError } from "../utils"; +const ENGINE_VERSION_WIRE: Record = { + [PROJECTION_ENGINE_V1]: 1, + [PROJECTION_ENGINE_V2]: 2, +}; + export interface CreateProjectionOptions extends BaseOptions { /** * Enables emitting from the projection. @@ -19,6 +25,15 @@ export interface CreateProjectionOptions extends BaseOptions { * @defaultValue false */ trackEmittedStreams?: boolean; + /** + * Selects the projection engine version. Pinned at create time and + * cannot be changed later. V2 is opt-in and does not support + * `trackEmittedStreams`, bi-state projections, or live `outputState` + * result streams. See the KurrentDB documentation for the full list of + * limitations before choosing V2. + * @defaultValue {@link PROJECTION_ENGINE_V1} + */ + engineVersion?: ProjectionEngineVersion; } declare module "../Client" { @@ -66,6 +81,7 @@ const createProjectionGRPC = async function ( { emitEnabled = false, trackEmittedStreams = false, + engineVersion = PROJECTION_ENGINE_V1, ...baseOptions }: CreateProjectionOptions = {} ): Promise { @@ -79,6 +95,9 @@ const createProjectionGRPC = async function ( options.setContinuous(continuous); options.setQuery(query); + if (engineVersion === PROJECTION_ENGINE_V2) { + options.setEngineVersion(ENGINE_VERSION_WIRE[engineVersion]); + } req.setOptions(options); @@ -104,6 +123,7 @@ const createProjectionHTTP = async function ( { emitEnabled = false, trackEmittedStreams = false, + engineVersion = PROJECTION_ENGINE_V1, ...baseOptions }: CreateProjectionOptions = {} ) { @@ -116,6 +136,9 @@ const createProjectionHTTP = async function ( name: projectionName, emit: emitEnabled.toString(), trackemittedstreams: trackEmittedStreams.toString(), + ...(engineVersion === PROJECTION_ENGINE_V2 && { + engineversion: ENGINE_VERSION_WIRE[engineVersion].toString(), + }), }, }, query diff --git a/packages/db-client/src/types/index.ts b/packages/db-client/src/types/index.ts index a9696d6c..3e2ba3d4 100644 --- a/packages/db-client/src/types/index.ts +++ b/packages/db-client/src/types/index.ts @@ -262,6 +262,10 @@ export type NodePreference = | typeof constants.LEADER | typeof constants.READ_ONLY_REPLICA; +export type ProjectionEngineVersion = + | typeof constants.PROJECTION_ENGINE_V1 + | typeof constants.PROJECTION_ENGINE_V2; + export interface EndPoint { address: string; port: number; diff --git a/packages/test/src/projections/createProjection.test.ts b/packages/test/src/projections/createProjection.test.ts index f2817bf6..983acab1 100644 --- a/packages/test/src/projections/createProjection.test.ts +++ b/packages/test/src/projections/createProjection.test.ts @@ -3,6 +3,7 @@ import { collect, createTestNode, delay } from "@test-utils"; import { KurrentDBClient, jsonEvent, + PROJECTION_ENGINE_V2, StreamNotFoundError, } from "@kurrent/kurrentdb-client"; @@ -118,4 +119,23 @@ describe("createProjection", () => { expect(emittedStream).toBeDefined(); expect(emittedStream.event?.type).toBe("$StreamTracked"); }); + + test("v2 engine", async () => { + const PROJECTION_NAME = "v2_engine"; + + await expect( + client.createProjection( + PROJECTION_NAME, + ` + fromAll() + .when({ + $init: function (state, ev) { + return {}; + } + }); + `, + { engineVersion: PROJECTION_ENGINE_V2 } + ) + ).resolves.toBeUndefined(); + }); });