diff --git a/src/main/java/io/kurrent/dbclient/CreateProjection.java b/src/main/java/io/kurrent/dbclient/CreateProjection.java index 480927b6..dd1b2b89 100644 --- a/src/main/java/io/kurrent/dbclient/CreateProjection.java +++ b/src/main/java/io/kurrent/dbclient/CreateProjection.java @@ -12,6 +12,7 @@ class CreateProjection { private final String query; private final boolean trackEmittedStreams; private final boolean emitEnabled; + private final int engineVersion; private final CreateProjectionOptions options; public CreateProjection(final GrpcClient client, final String projectionName, final String query, @@ -22,6 +23,7 @@ public CreateProjection(final GrpcClient client, final String projectionName, fi this.query = query; this.trackEmittedStreams = options.isTrackingEmittedStreams(); this.emitEnabled = options.isEmitEnabled(); + this.engineVersion = options.getEngineVersion(); this.options = options; } @@ -36,6 +38,7 @@ public CompletableFuture execute() { Projectionmanagement.CreateReq.Options.Builder optionsBuilder = Projectionmanagement.CreateReq.Options.newBuilder() .setQuery(query) + .setEngineVersion(engineVersion) .setContinuous(continuousBuilder); Projectionmanagement.CreateReq request = Projectionmanagement.CreateReq.newBuilder() diff --git a/src/main/java/io/kurrent/dbclient/CreateProjectionOptions.java b/src/main/java/io/kurrent/dbclient/CreateProjectionOptions.java index 532a87e6..5d5a9247 100644 --- a/src/main/java/io/kurrent/dbclient/CreateProjectionOptions.java +++ b/src/main/java/io/kurrent/dbclient/CreateProjectionOptions.java @@ -6,6 +6,7 @@ public class CreateProjectionOptions extends OptionsBase { private boolean trackEmittedStreams; private boolean emitEnabled; + private int engineVersion; private CreateProjectionOptions() { this.trackEmittedStreams = false; @@ -26,6 +27,10 @@ boolean isEmitEnabled() { return emitEnabled; } + int getEngineVersion() { + return engineVersion; + } + /** * If true, the projection tracks all streams it creates. */ @@ -41,4 +46,17 @@ public CreateProjectionOptions emitEnabled(boolean value) { this.emitEnabled = value; return this; } + + /** + * Selects the projection engine version. {@code 0} (default) or {@code 1} selects V1; + * {@code 2} selects the V2 engine, which provides partition-based parallel processing. + *

+ * The engine version is pinned at create time and cannot be changed via update. + * V2 has limitations versus V1: {@code trackEmittedStreams} is rejected, + * result streams are not emitted, and bi-state projections are not supported. + */ + public CreateProjectionOptions engineVersion(int value) { + this.engineVersion = value; + return this; + } } diff --git a/src/main/proto/kurrentdb/protocol/v1/projectionmanagement.proto b/src/main/proto/kurrentdb/protocol/v1/projectionmanagement.proto index b61e940b..2b120e5f 100644 --- a/src/main/proto/kurrentdb/protocol/v1/projectionmanagement.proto +++ b/src/main/proto/kurrentdb/protocol/v1/projectionmanagement.proto @@ -28,6 +28,7 @@ message CreateReq { Continuous continuous = 3; } string query = 4; + int32 engine_version = 5; // 0 or 1 = v1 (default), 2 = v2 message Transient { string name = 1;