From 36ae79f1df297289e1aa4a7e0ae96473c445bdfd Mon Sep 17 00:00:00 2001 From: William Chong Date: Wed, 6 May 2026 11:52:13 +0400 Subject: [PATCH] feat: support projection v2 engine --- kurrentdb/protos/projections.proto | 1 + .../src/event_store/generated/projections.rs | 2 ++ kurrentdb/src/options/projections.rs | 27 +++++++++++++++++++ kurrentdb/src/projection_client.rs | 1 + 4 files changed, 31 insertions(+) diff --git a/kurrentdb/protos/projections.proto b/kurrentdb/protos/projections.proto index 1fe8f367..2341246f 100644 --- a/kurrentdb/protos/projections.proto +++ b/kurrentdb/protos/projections.proto @@ -28,6 +28,7 @@ message CreateReq { Continuous continuous = 3; } string query = 4; + int32 engine_version = 5; message Transient { string name = 1; diff --git a/kurrentdb/src/event_store/generated/projections.rs b/kurrentdb/src/event_store/generated/projections.rs index cbef0daa..2c0500f8 100644 --- a/kurrentdb/src/event_store/generated/projections.rs +++ b/kurrentdb/src/event_store/generated/projections.rs @@ -10,6 +10,8 @@ pub mod create_req { pub struct Options { #[prost(string, tag = "4")] pub query: ::prost::alloc::string::String, + #[prost(int32, tag = "5")] + pub engine_version: i32, #[prost(oneof = "options::Mode", tags = "1, 2, 3")] pub mode: ::core::option::Option, } diff --git a/kurrentdb/src/options/projections.rs b/kurrentdb/src/options/projections.rs index b2711ff6..cf84f394 100644 --- a/kurrentdb/src/options/projections.rs +++ b/kurrentdb/src/options/projections.rs @@ -1,10 +1,30 @@ use kurrentdb_macros::options; +/// Selects which projection engine the server should use when creating a +/// continuous projection. `V1` is the default and is supported by every server +/// version. `V2` requires a server that supports the next-generation engine. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum ProjectionEngineVersion { + #[default] + V1, + V2, +} + +impl ProjectionEngineVersion { + pub(crate) fn as_i32(self) -> i32 { + match self { + ProjectionEngineVersion::V1 => 1, + ProjectionEngineVersion::V2 => 2, + } + } +} + options! { #[derive(Clone, Default)] pub struct CreateProjectionOptions { pub(crate) track_emitted_streams: bool, pub(crate) emit: bool, + pub(crate) engine_version: ProjectionEngineVersion, } } @@ -23,6 +43,13 @@ impl CreateProjectionOptions { pub fn emit(self, emit: bool) -> Self { Self { emit, ..self } } + + pub fn engine_version(self, engine_version: ProjectionEngineVersion) -> Self { + Self { + engine_version, + ..self + } + } } options! { diff --git a/kurrentdb/src/projection_client.rs b/kurrentdb/src/projection_client.rs index f9249cc7..501bf693 100644 --- a/kurrentdb/src/projection_client.rs +++ b/kurrentdb/src/projection_client.rs @@ -74,6 +74,7 @@ impl ProjectionClient { options, projections::create_req::Options { query: query.clone(), + engine_version: options.engine_version.as_i32(), mode: Some(projections::create_req::options::Mode::Continuous( projections::create_req::options::Continuous { name: name.as_ref().to_string(),