From df651f891b2aee1e90b47e057714e8c77cddbfc1 Mon Sep 17 00:00:00 2001 From: William Chong Date: Tue, 5 May 2026 12:59:43 +0400 Subject: [PATCH 1/3] feat: support V2 projection engine on createProjection Adds ProjectionEngineVersion enum and engineVersion option to createProjection, plumbed through both gRPC (CreateReq.Options.engine_version) and the HTTP fallback (engineversion query param). --- .../protocols/v1/projectionmanagement_pb.d.ts | 3 ++ .../protocols/v1/projectionmanagement_pb.js | 32 ++++++++++++++++++- .../protocols/v1/projectionmanagement.proto | 1 + .../src/projections/createProjection.ts | 28 ++++++++++++++++ .../src/projections/createProjection.test.ts | 20 ++++++++++++ 5 files changed, 83 insertions(+), 1 deletion(-) diff --git a/packages/db-client/generated/kurrentdb/protocols/v1/projectionmanagement_pb.d.ts b/packages/db-client/generated/kurrentdb/protocols/v1/projectionmanagement_pb.d.ts index 236a7edf..aa378170 100644 --- a/packages/db-client/generated/kurrentdb/protocols/v1/projectionmanagement_pb.d.ts +++ b/packages/db-client/generated/kurrentdb/protocols/v1/projectionmanagement_pb.d.ts @@ -49,6 +49,8 @@ export namespace CreateReq { setContinuous(value?: CreateReq.Options.Continuous): Options; getQuery(): string; setQuery(value: string): Options; + getEngineVersion(): number; + setEngineVersion(value: number): Options; getModeCase(): Options.ModeCase; @@ -68,6 +70,7 @@ export namespace CreateReq { pb_transient?: CreateReq.Options.Transient.AsObject, continuous?: CreateReq.Options.Continuous.AsObject, query: string, + engineVersion: number, } diff --git a/packages/db-client/generated/kurrentdb/protocols/v1/projectionmanagement_pb.js b/packages/db-client/generated/kurrentdb/protocols/v1/projectionmanagement_pb.js index d249709b..f8fdfbf2 100644 --- a/packages/db-client/generated/kurrentdb/protocols/v1/projectionmanagement_pb.js +++ b/packages/db-client/generated/kurrentdb/protocols/v1/projectionmanagement_pb.js @@ -864,7 +864,8 @@ proto.event_store.client.projections.CreateReq.Options.toObject = function(inclu oneTime: (f = msg.getOneTime()) && kurrentdb_protocols_v1_shared_pb.Empty.toObject(includeInstance, f), pb_transient: (f = msg.getTransient()) && proto.event_store.client.projections.CreateReq.Options.Transient.toObject(includeInstance, f), continuous: (f = msg.getContinuous()) && proto.event_store.client.projections.CreateReq.Options.Continuous.toObject(includeInstance, f), - query: jspb.Message.getFieldWithDefault(msg, 4, "") + query: jspb.Message.getFieldWithDefault(msg, 4, ""), + engineVersion: jspb.Message.getFieldWithDefault(msg, 5, 0) }; if (includeInstance) { @@ -920,6 +921,10 @@ proto.event_store.client.projections.CreateReq.Options.deserializeBinaryFromRead var value = /** @type {string} */ (reader.readString()); msg.setQuery(value); break; + case 5: + var value = /** @type {number} */ (reader.readInt32()); + msg.setEngineVersion(value); + break; default: reader.skipField(); break; @@ -980,6 +985,13 @@ proto.event_store.client.projections.CreateReq.Options.serializeBinaryToWriter = f ); } + f = message.getEngineVersion(); + if (f !== 0) { + writer.writeInt32( + 5, + f + ); + } }; @@ -1432,6 +1444,24 @@ proto.event_store.client.projections.CreateReq.Options.prototype.setQuery = func }; +/** + * optional int32 engine_version = 5; + * @return {number} + */ +proto.event_store.client.projections.CreateReq.Options.prototype.getEngineVersion = function() { + return /** @type {number} */ (jspb.Message.getFieldWithDefault(this, 5, 0)); +}; + + +/** + * @param {number} value + * @return {!proto.event_store.client.projections.CreateReq.Options} returns this + */ +proto.event_store.client.projections.CreateReq.Options.prototype.setEngineVersion = function(value) { + return jspb.Message.setProto3IntField(this, 5, value); +}; + + /** * optional Options options = 1; * @return {?proto.event_store.client.projections.CreateReq.Options} diff --git a/packages/db-client/protos/kurrentdb/protocols/v1/projectionmanagement.proto b/packages/db-client/protos/kurrentdb/protocols/v1/projectionmanagement.proto index 08fe1627..dbabd644 100644 --- a/packages/db-client/protos/kurrentdb/protocols/v1/projectionmanagement.proto +++ b/packages/db-client/protos/kurrentdb/protocols/v1/projectionmanagement.proto @@ -28,6 +28,7 @@ message CreateReq { Continuous continuous = 3; } string query = 4; + int32 engine_version = 5; // 0 or 1 = v1 (default), 2 = v2 message Transient { option deprecated = true; diff --git a/packages/db-client/src/projections/createProjection.ts b/packages/db-client/src/projections/createProjection.ts index db1f3483..a9d28c6e 100644 --- a/packages/db-client/src/projections/createProjection.ts +++ b/packages/db-client/src/projections/createProjection.ts @@ -8,6 +8,24 @@ import { Client } from "../Client"; import type { BaseOptions } from "../types"; import { debug, convertToCommandError } from "../utils"; +/** + * The projection engine version used to execute a projection. + * + * The engine version is pinned at create time and cannot be changed later. + */ +export enum ProjectionEngineVersion { + /** The original projection engine. This is the default. */ + V1 = 1, + /** + * The next-generation projection engine that processes partitions in + * parallel. V2 is opt-in and does not support `trackEmittedStreams`, + * bi-state projections, or live `outputState` result streams. See the + * KurrentDB documentation for the full list of limitations before + * choosing V2. + */ + V2 = 2, +} + export interface CreateProjectionOptions extends BaseOptions { /** * Enables emitting from the projection. @@ -19,6 +37,12 @@ export interface CreateProjectionOptions extends BaseOptions { * @defaultValue false */ trackEmittedStreams?: boolean; + /** + * Selects the projection engine version. Pinned at create time and + * cannot be changed later. + * @defaultValue {@link ProjectionEngineVersion.V1} + */ + engineVersion?: ProjectionEngineVersion; } declare module "../Client" { @@ -66,6 +90,7 @@ const createProjectionGRPC = async function ( { emitEnabled = false, trackEmittedStreams = false, + engineVersion = ProjectionEngineVersion.V1, ...baseOptions }: CreateProjectionOptions = {} ): Promise { @@ -79,6 +104,7 @@ const createProjectionGRPC = async function ( options.setContinuous(continuous); options.setQuery(query); + options.setEngineVersion(engineVersion); req.setOptions(options); @@ -104,6 +130,7 @@ const createProjectionHTTP = async function ( { emitEnabled = false, trackEmittedStreams = false, + engineVersion = ProjectionEngineVersion.V1, ...baseOptions }: CreateProjectionOptions = {} ) { @@ -116,6 +143,7 @@ const createProjectionHTTP = async function ( name: projectionName, emit: emitEnabled.toString(), trackemittedstreams: trackEmittedStreams.toString(), + engineversion: engineVersion.toString(), }, }, query diff --git a/packages/test/src/projections/createProjection.test.ts b/packages/test/src/projections/createProjection.test.ts index f2817bf6..5a80bfda 100644 --- a/packages/test/src/projections/createProjection.test.ts +++ b/packages/test/src/projections/createProjection.test.ts @@ -3,6 +3,7 @@ import { collect, createTestNode, delay } from "@test-utils"; import { KurrentDBClient, jsonEvent, + ProjectionEngineVersion, StreamNotFoundError, } from "@kurrent/kurrentdb-client"; @@ -118,4 +119,23 @@ describe("createProjection", () => { expect(emittedStream).toBeDefined(); expect(emittedStream.event?.type).toBe("$StreamTracked"); }); + + test("v2 engine", async () => { + const PROJECTION_NAME = "v2_engine"; + + await expect( + client.createProjection( + PROJECTION_NAME, + ` + fromAll() + .when({ + $init: function (state, ev) { + return {}; + } + }); + `, + { engineVersion: ProjectionEngineVersion.V2 } + ) + ).resolves.toBeUndefined(); + }); }); From 44e8d5d311c924b789b906b41fe140910c54f4e1 Mon Sep 17 00:00:00 2001 From: William Chong Date: Tue, 5 May 2026 13:36:35 +0400 Subject: [PATCH 2/3] refactor: decouple ProjectionEngineVersion from wire encoding Switch the public enum to string values (V1="v1", V2="v2") and map to the int32 wire format at the gRPC/HTTP boundary. Resolve option defaults once at the entry instead of duplicating them in each helper. --- .../src/projections/createProjection.ts | 61 +++++++++++++------ 1 file changed, 42 insertions(+), 19 deletions(-) diff --git a/packages/db-client/src/projections/createProjection.ts b/packages/db-client/src/projections/createProjection.ts index a9d28c6e..b8891512 100644 --- a/packages/db-client/src/projections/createProjection.ts +++ b/packages/db-client/src/projections/createProjection.ts @@ -15,7 +15,7 @@ import { debug, convertToCommandError } from "../utils"; */ export enum ProjectionEngineVersion { /** The original projection engine. This is the default. */ - V1 = 1, + V1 = "v1", /** * The next-generation projection engine that processes partitions in * parallel. V2 is opt-in and does not support `trackEmittedStreams`, @@ -23,9 +23,14 @@ export enum ProjectionEngineVersion { * KurrentDB documentation for the full list of limitations before * choosing V2. */ - V2 = 2, + V2 = "v2", } +const ENGINE_VERSION_WIRE: Record = { + [ProjectionEngineVersion.V1]: 1, + [ProjectionEngineVersion.V2]: 2, +}; + export interface CreateProjectionOptions extends BaseOptions { /** * Enables emitting from the projection. @@ -45,6 +50,12 @@ export interface CreateProjectionOptions extends BaseOptions { engineVersion?: ProjectionEngineVersion; } +interface ResolvedCreateProjectionOptions extends BaseOptions { + emitEnabled: boolean; + trackEmittedStreams: boolean; + engineVersion: ProjectionEngineVersion; +} + declare module "../Client" { interface Client { /** @@ -65,22 +76,34 @@ Client.prototype.createProjection = async function ( this: Client, projectionName: string, query: string, - options: CreateProjectionOptions = {} + { + emitEnabled = false, + trackEmittedStreams = false, + engineVersion = ProjectionEngineVersion.V1, + ...baseOptions + }: CreateProjectionOptions = {} ): Promise { + const resolved: ResolvedCreateProjectionOptions = { + emitEnabled, + trackEmittedStreams, + engineVersion, + ...baseOptions, + }; + debug.command("createProjection: %O", { projectionName, query, - options, + options: resolved, }); if ( - options.trackEmittedStreams && + trackEmittedStreams && !(await this.supports(ProjectionsService.create, "track_emitted_streams")) ) { - return createProjectionHTTP.call(this, projectionName, query, options); + return createProjectionHTTP.call(this, projectionName, query, resolved); } - return createProjectionGRPC.call(this, projectionName, query, options); + return createProjectionGRPC.call(this, projectionName, query, resolved); }; const createProjectionGRPC = async function ( @@ -88,11 +111,11 @@ const createProjectionGRPC = async function ( projectionName: string, query: string, { - emitEnabled = false, - trackEmittedStreams = false, - engineVersion = ProjectionEngineVersion.V1, + emitEnabled, + trackEmittedStreams, + engineVersion, ...baseOptions - }: CreateProjectionOptions = {} + }: ResolvedCreateProjectionOptions ): Promise { const req = new CreateReq(); const options = new CreateReq.Options(); @@ -104,7 +127,7 @@ const createProjectionGRPC = async function ( options.setContinuous(continuous); options.setQuery(query); - options.setEngineVersion(engineVersion); + options.setEngineVersion(ENGINE_VERSION_WIRE[engineVersion]); req.setOptions(options); @@ -128,11 +151,11 @@ const createProjectionHTTP = async function ( projectionName: string, query: string, { - emitEnabled = false, - trackEmittedStreams = false, - engineVersion = ProjectionEngineVersion.V1, + emitEnabled, + trackEmittedStreams, + engineVersion, ...baseOptions - }: CreateProjectionOptions = {} + }: ResolvedCreateProjectionOptions ) { await this.HTTPRequest( "POST", @@ -141,9 +164,9 @@ const createProjectionHTTP = async function ( ...baseOptions, searchParams: { name: projectionName, - emit: emitEnabled.toString(), - trackemittedstreams: trackEmittedStreams.toString(), - engineversion: engineVersion.toString(), + emit: String(emitEnabled), + trackemittedstreams: String(trackEmittedStreams), + engineversion: String(ENGINE_VERSION_WIRE[engineVersion]), }, }, query From 8bde4881d8200d8fedd26254fa6a1d8148cab51a Mon Sep 17 00:00:00 2001 From: William Chong Date: Tue, 5 May 2026 13:42:23 +0400 Subject: [PATCH 3/3] refactor: align ProjectionEngineVersion with codebase conventions Replace the local TypeScript enum with constants in src/constants.ts (PROJECTION_ENGINE_V1 / PROJECTION_ENGINE_V2) plus a union type in src/types, matching the pattern used by NodePreference, ConsumerStrategy, and other public string-valued enums. --- packages/db-client/src/constants.ts | 4 + .../src/projections/createProjection.ts | 88 +++++++------------ packages/db-client/src/types/index.ts | 4 + .../src/projections/createProjection.test.ts | 4 +- 4 files changed, 40 insertions(+), 60 deletions(-) diff --git a/packages/db-client/src/constants.ts b/packages/db-client/src/constants.ts index 7c2f75b6..104f7994 100644 --- a/packages/db-client/src/constants.ts +++ b/packages/db-client/src/constants.ts @@ -29,6 +29,10 @@ export const RETRY = "retry"; export const SKIP = "skip"; export const STOP = "stop"; +// projection engine version +export const PROJECTION_ENGINE_V1 = "v1"; +export const PROJECTION_ENGINE_V2 = "v2"; + // projection status export const CREATING = "Creating"; export const LOADING = "Loading"; diff --git a/packages/db-client/src/projections/createProjection.ts b/packages/db-client/src/projections/createProjection.ts index b8891512..7ae8c53c 100644 --- a/packages/db-client/src/projections/createProjection.ts +++ b/packages/db-client/src/projections/createProjection.ts @@ -5,30 +5,13 @@ import { import { CreateReq } from "../../generated/kurrentdb/protocols/v1/projectionmanagement_pb"; import { Client } from "../Client"; -import type { BaseOptions } from "../types"; +import { PROJECTION_ENGINE_V1, PROJECTION_ENGINE_V2 } from "../constants"; +import type { BaseOptions, ProjectionEngineVersion } from "../types"; import { debug, convertToCommandError } from "../utils"; -/** - * The projection engine version used to execute a projection. - * - * The engine version is pinned at create time and cannot be changed later. - */ -export enum ProjectionEngineVersion { - /** The original projection engine. This is the default. */ - V1 = "v1", - /** - * The next-generation projection engine that processes partitions in - * parallel. V2 is opt-in and does not support `trackEmittedStreams`, - * bi-state projections, or live `outputState` result streams. See the - * KurrentDB documentation for the full list of limitations before - * choosing V2. - */ - V2 = "v2", -} - const ENGINE_VERSION_WIRE: Record = { - [ProjectionEngineVersion.V1]: 1, - [ProjectionEngineVersion.V2]: 2, + [PROJECTION_ENGINE_V1]: 1, + [PROJECTION_ENGINE_V2]: 2, }; export interface CreateProjectionOptions extends BaseOptions { @@ -44,18 +27,15 @@ export interface CreateProjectionOptions extends BaseOptions { trackEmittedStreams?: boolean; /** * Selects the projection engine version. Pinned at create time and - * cannot be changed later. - * @defaultValue {@link ProjectionEngineVersion.V1} + * cannot be changed later. V2 is opt-in and does not support + * `trackEmittedStreams`, bi-state projections, or live `outputState` + * result streams. See the KurrentDB documentation for the full list of + * limitations before choosing V2. + * @defaultValue {@link PROJECTION_ENGINE_V1} */ engineVersion?: ProjectionEngineVersion; } -interface ResolvedCreateProjectionOptions extends BaseOptions { - emitEnabled: boolean; - trackEmittedStreams: boolean; - engineVersion: ProjectionEngineVersion; -} - declare module "../Client" { interface Client { /** @@ -76,34 +56,22 @@ Client.prototype.createProjection = async function ( this: Client, projectionName: string, query: string, - { - emitEnabled = false, - trackEmittedStreams = false, - engineVersion = ProjectionEngineVersion.V1, - ...baseOptions - }: CreateProjectionOptions = {} + options: CreateProjectionOptions = {} ): Promise { - const resolved: ResolvedCreateProjectionOptions = { - emitEnabled, - trackEmittedStreams, - engineVersion, - ...baseOptions, - }; - debug.command("createProjection: %O", { projectionName, query, - options: resolved, + options, }); if ( - trackEmittedStreams && + options.trackEmittedStreams && !(await this.supports(ProjectionsService.create, "track_emitted_streams")) ) { - return createProjectionHTTP.call(this, projectionName, query, resolved); + return createProjectionHTTP.call(this, projectionName, query, options); } - return createProjectionGRPC.call(this, projectionName, query, resolved); + return createProjectionGRPC.call(this, projectionName, query, options); }; const createProjectionGRPC = async function ( @@ -111,11 +79,11 @@ const createProjectionGRPC = async function ( projectionName: string, query: string, { - emitEnabled, - trackEmittedStreams, - engineVersion, + emitEnabled = false, + trackEmittedStreams = false, + engineVersion = PROJECTION_ENGINE_V1, ...baseOptions - }: ResolvedCreateProjectionOptions + }: CreateProjectionOptions = {} ): Promise { const req = new CreateReq(); const options = new CreateReq.Options(); @@ -127,7 +95,9 @@ const createProjectionGRPC = async function ( options.setContinuous(continuous); options.setQuery(query); - options.setEngineVersion(ENGINE_VERSION_WIRE[engineVersion]); + if (engineVersion === PROJECTION_ENGINE_V2) { + options.setEngineVersion(ENGINE_VERSION_WIRE[engineVersion]); + } req.setOptions(options); @@ -151,11 +121,11 @@ const createProjectionHTTP = async function ( projectionName: string, query: string, { - emitEnabled, - trackEmittedStreams, - engineVersion, + emitEnabled = false, + trackEmittedStreams = false, + engineVersion = PROJECTION_ENGINE_V1, ...baseOptions - }: ResolvedCreateProjectionOptions + }: CreateProjectionOptions = {} ) { await this.HTTPRequest( "POST", @@ -164,9 +134,11 @@ const createProjectionHTTP = async function ( ...baseOptions, searchParams: { name: projectionName, - emit: String(emitEnabled), - trackemittedstreams: String(trackEmittedStreams), - engineversion: String(ENGINE_VERSION_WIRE[engineVersion]), + emit: emitEnabled.toString(), + trackemittedstreams: trackEmittedStreams.toString(), + ...(engineVersion === PROJECTION_ENGINE_V2 && { + engineversion: ENGINE_VERSION_WIRE[engineVersion].toString(), + }), }, }, query diff --git a/packages/db-client/src/types/index.ts b/packages/db-client/src/types/index.ts index a9696d6c..3e2ba3d4 100644 --- a/packages/db-client/src/types/index.ts +++ b/packages/db-client/src/types/index.ts @@ -262,6 +262,10 @@ export type NodePreference = | typeof constants.LEADER | typeof constants.READ_ONLY_REPLICA; +export type ProjectionEngineVersion = + | typeof constants.PROJECTION_ENGINE_V1 + | typeof constants.PROJECTION_ENGINE_V2; + export interface EndPoint { address: string; port: number; diff --git a/packages/test/src/projections/createProjection.test.ts b/packages/test/src/projections/createProjection.test.ts index 5a80bfda..983acab1 100644 --- a/packages/test/src/projections/createProjection.test.ts +++ b/packages/test/src/projections/createProjection.test.ts @@ -3,7 +3,7 @@ import { collect, createTestNode, delay } from "@test-utils"; import { KurrentDBClient, jsonEvent, - ProjectionEngineVersion, + PROJECTION_ENGINE_V2, StreamNotFoundError, } from "@kurrent/kurrentdb-client"; @@ -134,7 +134,7 @@ describe("createProjection", () => { } }); `, - { engineVersion: ProjectionEngineVersion.V2 } + { engineVersion: PROJECTION_ENGINE_V2 } ) ).resolves.toBeUndefined(); });