Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ export namespace CreateReq {
setContinuous(value?: CreateReq.Options.Continuous): Options;
getQuery(): string;
setQuery(value: string): Options;
getEngineVersion(): number;
setEngineVersion(value: number): Options;

getModeCase(): Options.ModeCase;

Expand All @@ -68,6 +70,7 @@ export namespace CreateReq {
pb_transient?: CreateReq.Options.Transient.AsObject,
continuous?: CreateReq.Options.Continuous.AsObject,
query: string,
engineVersion: number,
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,8 @@ proto.event_store.client.projections.CreateReq.Options.toObject = function(inclu
oneTime: (f = msg.getOneTime()) && kurrentdb_protocols_v1_shared_pb.Empty.toObject(includeInstance, f),
pb_transient: (f = msg.getTransient()) && proto.event_store.client.projections.CreateReq.Options.Transient.toObject(includeInstance, f),
continuous: (f = msg.getContinuous()) && proto.event_store.client.projections.CreateReq.Options.Continuous.toObject(includeInstance, f),
query: jspb.Message.getFieldWithDefault(msg, 4, "")
query: jspb.Message.getFieldWithDefault(msg, 4, ""),
engineVersion: jspb.Message.getFieldWithDefault(msg, 5, 0)
};

if (includeInstance) {
Expand Down Expand Up @@ -920,6 +921,10 @@ proto.event_store.client.projections.CreateReq.Options.deserializeBinaryFromRead
var value = /** @type {string} */ (reader.readString());
msg.setQuery(value);
break;
case 5:
var value = /** @type {number} */ (reader.readInt32());
msg.setEngineVersion(value);
break;
default:
reader.skipField();
break;
Expand Down Expand Up @@ -980,6 +985,13 @@ proto.event_store.client.projections.CreateReq.Options.serializeBinaryToWriter =
f
);
}
f = message.getEngineVersion();
if (f !== 0) {
writer.writeInt32(
5,
f
);
}
};


Expand Down Expand Up @@ -1432,6 +1444,24 @@ proto.event_store.client.projections.CreateReq.Options.prototype.setQuery = func
};


/**
* optional int32 engine_version = 5;
* @return {number}
*/
proto.event_store.client.projections.CreateReq.Options.prototype.getEngineVersion = function() {
return /** @type {number} */ (jspb.Message.getFieldWithDefault(this, 5, 0));
};


/**
* @param {number} value
* @return {!proto.event_store.client.projections.CreateReq.Options} returns this
*/
proto.event_store.client.projections.CreateReq.Options.prototype.setEngineVersion = function(value) {
return jspb.Message.setProto3IntField(this, 5, value);
};


/**
* optional Options options = 1;
* @return {?proto.event_store.client.projections.CreateReq.Options}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ message CreateReq {
Continuous continuous = 3;
}
string query = 4;
int32 engine_version = 5; // 0 or 1 = v1 (default), 2 = v2

message Transient {
option deprecated = true;
Expand Down
4 changes: 4 additions & 0 deletions packages/db-client/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ export const RETRY = "retry";
export const SKIP = "skip";
export const STOP = "stop";

// projection engine version
export const PROJECTION_ENGINE_V1 = "v1";
export const PROJECTION_ENGINE_V2 = "v2";

// projection status
export const CREATING = "Creating";
export const LOADING = "Loading";
Expand Down
25 changes: 24 additions & 1 deletion packages/db-client/src/projections/createProjection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ import {
import { CreateReq } from "../../generated/kurrentdb/protocols/v1/projectionmanagement_pb";

import { Client } from "../Client";
import type { BaseOptions } from "../types";
import { PROJECTION_ENGINE_V1, PROJECTION_ENGINE_V2 } from "../constants";
import type { BaseOptions, ProjectionEngineVersion } from "../types";
import { debug, convertToCommandError } from "../utils";

const ENGINE_VERSION_WIRE: Record<ProjectionEngineVersion, number> = {
[PROJECTION_ENGINE_V1]: 1,
[PROJECTION_ENGINE_V2]: 2,
};

export interface CreateProjectionOptions extends BaseOptions {
/**
* Enables emitting from the projection.
Expand All @@ -19,6 +25,15 @@ export interface CreateProjectionOptions extends BaseOptions {
* @defaultValue false
*/
trackEmittedStreams?: boolean;
/**
* Selects the projection engine version. Pinned at create time and
* cannot be changed later. V2 is opt-in and does not support
* `trackEmittedStreams`, bi-state projections, or live `outputState`
* result streams. See the KurrentDB documentation for the full list of
* limitations before choosing V2.
* @defaultValue {@link PROJECTION_ENGINE_V1}
*/
engineVersion?: ProjectionEngineVersion;
}

declare module "../Client" {
Expand Down Expand Up @@ -66,6 +81,7 @@ const createProjectionGRPC = async function (
{
emitEnabled = false,
trackEmittedStreams = false,
engineVersion = PROJECTION_ENGINE_V1,
...baseOptions
}: CreateProjectionOptions = {}
): Promise<void> {
Expand All @@ -79,6 +95,9 @@ const createProjectionGRPC = async function (

options.setContinuous(continuous);
options.setQuery(query);
if (engineVersion === PROJECTION_ENGINE_V2) {
options.setEngineVersion(ENGINE_VERSION_WIRE[engineVersion]);
}

req.setOptions(options);

Expand All @@ -104,6 +123,7 @@ const createProjectionHTTP = async function (
{
emitEnabled = false,
trackEmittedStreams = false,
engineVersion = PROJECTION_ENGINE_V1,
...baseOptions
}: CreateProjectionOptions = {}
) {
Expand All @@ -116,6 +136,9 @@ const createProjectionHTTP = async function (
name: projectionName,
emit: emitEnabled.toString(),
trackemittedstreams: trackEmittedStreams.toString(),
...(engineVersion === PROJECTION_ENGINE_V2 && {
engineversion: ENGINE_VERSION_WIRE[engineVersion].toString(),
}),
},
},
query
Expand Down
4 changes: 4 additions & 0 deletions packages/db-client/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ export type NodePreference =
| typeof constants.LEADER
| typeof constants.READ_ONLY_REPLICA;

export type ProjectionEngineVersion =
| typeof constants.PROJECTION_ENGINE_V1
| typeof constants.PROJECTION_ENGINE_V2;

export interface EndPoint {
address: string;
port: number;
Expand Down
20 changes: 20 additions & 0 deletions packages/test/src/projections/createProjection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { collect, createTestNode, delay } from "@test-utils";
import {
KurrentDBClient,
jsonEvent,
PROJECTION_ENGINE_V2,
StreamNotFoundError,
} from "@kurrent/kurrentdb-client";

Expand Down Expand Up @@ -118,4 +119,23 @@ describe("createProjection", () => {
expect(emittedStream).toBeDefined();
expect(emittedStream.event?.type).toBe("$StreamTracked");
});

test("v2 engine", async () => {
const PROJECTION_NAME = "v2_engine";

await expect(
client.createProjection(
PROJECTION_NAME,
`
fromAll()
.when({
$init: function (state, ev) {
return {};
}
});
`,
{ engineVersion: PROJECTION_ENGINE_V2 }
)
).resolves.toBeUndefined();
});
});
Loading