diff --git a/armonik-client/src/main/java/fr/aneo/armonik/client/ArmoniKClient.java b/armonik-client/src/main/java/fr/aneo/armonik/client/ArmoniKClient.java index f920d39..13411bf 100644 --- a/armonik-client/src/main/java/fr/aneo/armonik/client/ArmoniKClient.java +++ b/armonik-client/src/main/java/fr/aneo/armonik/client/ArmoniKClient.java @@ -17,11 +17,17 @@ import fr.aneo.armonik.api.grpc.v1.sessions.SessionsGrpc; import fr.aneo.armonik.client.definition.SessionDefinition; +import fr.aneo.armonik.client.exception.ArmoniKException; +import fr.aneo.armonik.client.internal.concurrent.Futures; import io.grpc.ManagedChannel; import java.time.Duration; +import java.util.HashSet; +import java.util.concurrent.CompletionStage; -import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.toCreateSessionRequest; +import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.*; +import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.toSessionState; +import static fr.aneo.armonik.client.internal.grpc.mappers.TaskMapper.toTaskConfiguration; import static java.util.Objects.requireNonNull; /** @@ -104,8 +110,144 @@ public ArmoniKClient(ArmoniKConfig armoniKConfig) { public SessionHandle openSession(SessionDefinition sessionDefinition) { requireNonNull(sessionDefinition, "sessionDefinition must not be null"); - var id = createSession(sessionDefinition); - return new SessionHandle(id, sessionDefinition, channelPool); + var sessionInfo = createSession(sessionDefinition); + return new SessionHandle(sessionInfo, sessionDefinition.outputListener(), channelPool); + } + + /** + * Connects to an existing session in the ArmoniK cluster. + *

+ * This method retrieves the session metadata associated with the given {@link SessionId} + * and returns a {@link SessionHandle} that can be used to submit tasks and monitor + * output blob completions + *

+ * If a {@link BlobCompletionListener} is provided, the returned {@code SessionHandle} + * will automatically start listening for output blob completion events for this + * session. + * + *

Error Handling: + * If the session does not exist, this method throws an {@link ArmoniKException} + * + * @param sessionId the identifier of the existing session to connect to + * @param outputListener an optional listener for output blob completion events; + * if {@code null}, no listener is registered + * @return a {@link SessionHandle} bound to the existing session + * @throws NullPointerException if {@code sessionId} is {@code null} + * @throws ArmoniKException if the session does not exist, or if a communication error occurs + * @see SessionHandle + * @see BlobCompletionListener + */ + public SessionHandle getSession(SessionId sessionId, BlobCompletionListener outputListener) { + requireNonNull(sessionId, "sessionId must not be null"); + + try { + var sessionResponse = channelPool.execute( + channel -> SessionsGrpc.newBlockingStub(channel).getSession(toGetSessionRequest(sessionId)) + ); + if (!sessionResponse.hasSession()) throw new ArmoniKException("Session not found"); + + var sessionInfo = new SessionInfo( + sessionId, + new HashSet<>(sessionResponse.getSession().getPartitionIdsList()), + toTaskConfiguration(sessionResponse.getSession().getOptions()) + ); + + return new SessionHandle(sessionInfo, outputListener, channelPool); + } catch (Exception e) { + throw new ArmoniKException(e.getMessage(), e); + } + } + + /** + * Connects to an existing session in the ArmoniK cluster without registering + * an output listener. + *

+ * This is a convenience overload equivalent to calling: + *

{@code
+   * getSession(sessionId, null);
+   * }
+ * + *

Error Handling: + * If the session does not exist, this method throws an {@link ArmoniKException}. + * + * @param sessionId the identifier of the existing session to connect to + * @return a {@link SessionHandle} for interacting with the session + * @throws NullPointerException if {@code sessionId} is {@code null} + * @throws ArmoniKException if the session does not exist, or if a communication error occurs + * @see #getSession(SessionId, BlobCompletionListener) + */ + public SessionHandle getSession(SessionId sessionId) { + return getSession(sessionId, null); + } + + /** + * Closes a session in the ArmoniK cluster by its identifier. + *

+ * A closed session no longer accepts new task submissions, but all existing tasks, results, + * and metadata remain available in the control plane. Closing a session is the recommended + * finalization step once no additional tasks will be submitted. + *

+ * This client-level method is provided as a convenience when the caller does not + * hold a {@link SessionHandle}. When a {@code SessionHandle} instance is available, + * prefer calling {@link SessionHandle#close()} so that all lifecycle-related + * operations remain grouped on the handle itself. + *

+ * The returned {@link CompletionStage} completes asynchronously with the updated + * {@link SessionState} returned by the Sessions service, or completes exceptionally + * if the request fails or the session does not exist. + * + * @param sessionId the identifier of the session to close; must not be {@code null} + * @return a completion stage yielding the updated state of the session after the + * close operation has been applied + * @throws NullPointerException if {@code sessionId} is {@code null} + * @see SessionHandle#close() + * @see SessionState + */ + public CompletionStage closeSession(SessionId sessionId) { + requireNonNull(sessionId, "sessionId must not be null"); + + return channelPool.executeAsync(channel -> { + var sessionsFutureStub = SessionsGrpc.newFutureStub(channel); + return Futures.toCompletionStage(sessionsFutureStub.closeSession(toCloseSessionRequest(sessionId))) + .thenApply(response -> toSessionState(response.getSession())); + }); + } + + /** + * Cancels a session in the ArmoniK cluster by its identifier. + *

+ * Cancelling a session instructs the control plane to stop all remaining work associated + * with the session. Depending on the scheduling and execution state, running tasks may be + * interrupted, and queued tasks will no longer be scheduled. + *

+ * A cancelled session still retains its metadata, and completed task results remain + * accessible unless the session is later purged or deleted. Cancellation is useful + * when the overall workflow must be terminated early due to failure or user decision. + *

+ * This client-level convenience method is intended for scenarios where the caller + * does not have a {@link SessionHandle} instance. When such a handle is available, + * prefer calling {@link SessionHandle#cancel()} to keep lifecycle operations grouped + * with the associated session. + *

+ * The returned {@link CompletionStage} completes asynchronously with the updated + * {@link SessionState} returned by the Sessions service, or completes exceptionally + * if the request fails or the session does not exist. + * + * @param sessionId the identifier of the session to cancel; must not be {@code null} + * @return a completion stage yielding the updated state of the session after the + * cancellation operation has been applied + * @throws NullPointerException if {@code sessionId} is {@code null} + * @see SessionHandle#cancel() + * @see SessionState + */ + public CompletionStage cancelSession(SessionId sessionId) { + requireNonNull(sessionId, "sessionId must not be null"); + + return channelPool.executeAsync(channel -> { + var sessionsFutureStub = SessionsGrpc.newFutureStub(channel); + return Futures.toCompletionStage(sessionsFutureStub.cancelSession(toCancelSessionRequest(sessionId))) + .thenApply(response -> toSessionState(response.getSession())); + }); } /** diff --git a/armonik-client/src/main/java/fr/aneo/armonik/client/BatchingPolicy.java b/armonik-client/src/main/java/fr/aneo/armonik/client/BatchingPolicy.java index 18f0a83..08aa6db 100644 --- a/armonik-client/src/main/java/fr/aneo/armonik/client/BatchingPolicy.java +++ b/armonik-client/src/main/java/fr/aneo/armonik/client/BatchingPolicy.java @@ -73,7 +73,7 @@ * @see BlobCompletionCoordinator * @see SessionDefinition */ -public record BatchingPolicy( +record BatchingPolicy( int batchSize, Duration maxDelay, int maxConcurrentBatches, @@ -94,7 +94,7 @@ public record BatchingPolicy( * * @see SessionDefinition */ - public static final BatchingPolicy DEFAULT = new BatchingPolicy(50, Duration.ofMillis(200), 5, 100); + static final BatchingPolicy DEFAULT = new BatchingPolicy(50, Duration.ofMillis(200), 5, 100); /** * Validates all policy parameters and their relationships. @@ -110,7 +110,7 @@ public record BatchingPolicy( * or {@code capPerBatch} is {@code <= 0} * @throws IllegalArgumentException if {@code maxDelay} is {@code null}, zero, or negative */ - public BatchingPolicy { + BatchingPolicy { if (batchSize <= 0) { throw new IllegalArgumentException("batchSize must be > 0, got: " + batchSize); } diff --git a/armonik-client/src/main/java/fr/aneo/armonik/client/BlobCompletionCoordinator.java b/armonik-client/src/main/java/fr/aneo/armonik/client/BlobCompletionCoordinator.java index 1cba307..d8910fc 100644 --- a/armonik-client/src/main/java/fr/aneo/armonik/client/BlobCompletionCoordinator.java +++ b/armonik-client/src/main/java/fr/aneo/armonik/client/BlobCompletionCoordinator.java @@ -85,11 +85,11 @@ final class BlobCompletionCoordinator { * @see BlobCompletionListener * @see BlobCompletionEventWatcher */ - BlobCompletionCoordinator(SessionId sessionId, ChannelPool channelPool, SessionDefinition sessionDefinition) { + BlobCompletionCoordinator(SessionId sessionId, ChannelPool channelPool, BlobCompletionListener outputListener) { this( sessionId, - new BlobCompletionEventWatcher(sessionId, channelPool, sessionDefinition.outputListener()), - sessionDefinition.outputBatchingPolicy(), + new BlobCompletionEventWatcher(sessionId, channelPool, outputListener), + BatchingPolicy.DEFAULT, Schedulers.shared() ); } @@ -116,7 +116,7 @@ final class BlobCompletionCoordinator { ) { this.sessionId = requireNonNull(sessionId, "sessionId must not be null"); this.watcher = requireNonNull(watcher, "watcher must not be null"); - this.batchingPolicy = requireNonNull(batchingPolicy, "batchingPolicy must not be null"); + this.batchingPolicy = requireNonNullElse(batchingPolicy, BatchingPolicy.DEFAULT); this.scheduler = requireNonNull(scheduler, "scheduler must not be null"); this.permits = new Semaphore(batchingPolicy.maxConcurrentBatches()); } diff --git a/armonik-client/src/main/java/fr/aneo/armonik/client/SessionHandle.java b/armonik-client/src/main/java/fr/aneo/armonik/client/SessionHandle.java index 78a7797..5f0edc1 100644 --- a/armonik-client/src/main/java/fr/aneo/armonik/client/SessionHandle.java +++ b/armonik-client/src/main/java/fr/aneo/armonik/client/SessionHandle.java @@ -16,6 +16,7 @@ package fr.aneo.armonik.client; import fr.aneo.armonik.api.grpc.v1.results.ResultsGrpc; +import fr.aneo.armonik.api.grpc.v1.sessions.SessionsGrpc; import fr.aneo.armonik.client.definition.SessionDefinition; import fr.aneo.armonik.client.definition.TaskDefinition; import fr.aneo.armonik.client.definition.blob.BlobDefinition; @@ -29,6 +30,7 @@ import java.util.function.Function; import static fr.aneo.armonik.client.internal.grpc.mappers.BlobMapper.toResultMetaDataRequest; +import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.*; import static java.util.Objects.requireNonNull; /** @@ -59,24 +61,26 @@ public final class SessionHandle { /** - * Constructs a new session handle with the specified session information and configuration. + * Constructs a session handle for an existing or newly created session in the ArmoniK cluster. *

- * This constructor initializes the handle with the necessary components for task submission - * and output processing within the session context. The handle will use the provided - * gRPC channel for all cluster communications. + * It initializes the handle with the session metadata and configures task submission + * and optional output blob monitoring using the provided {@link BlobCompletionListener}. + *

+ * If an output listener is supplied, this handle automatically begins monitoring + * output blob completions using the default batching strategy (not exposed in the public API). * - * @param sessionInfo the immutable session metadata including session ID and configuration - * @param sessionDefinition the session definition used for task default configurations - * @param channelPool the gRPC channel pool for cluster communication - * @throws NullPointerException if any parameter is null + * @param sessionInfo immutable metadata describing the session, including the session ID + * @param outputListener optional listener notified when output blobs reach completion; may be {@code null} + * @param channelPool the gRPC channel pool used for communication with the cluster + * @throws NullPointerException if {@code sessionInfo} or {@code channelPool} is {@code null} * @see SessionInfo - * @see SessionDefinition + * @see BlobCompletionListener + * @see ArmoniKClient#openSession(SessionDefinition) + * @see ArmoniKClient#getSession(SessionId, BlobCompletionListener) */ - SessionHandle(SessionInfo sessionInfo, SessionDefinition sessionDefinition, ChannelPool channelPool) { - requireNonNull(sessionDefinition, "sessionDefinition must not be null"); - + SessionHandle(SessionInfo sessionInfo, BlobCompletionListener outputListener, ChannelPool channelPool) { this.sessionInfo = requireNonNull(sessionInfo, "sessionInfo must not be null"); - this.taskSubmitter = new TaskSubmitter(sessionInfo.id(), sessionDefinition, channelPool); + this.taskSubmitter = new TaskSubmitter(sessionInfo.id(), sessionInfo.taskConfiguration(), outputListener, channelPool); this.channelPool = requireNonNull(channelPool, "channelPool must not be null"); } @@ -185,6 +189,176 @@ public BlobHandle createBlob(InputBlobDefinition blobDefinition) { return blobHandle; } + /** + * Requests cancellation of this session in the ArmoniK cluster. + *

+ * Cancelling a session signals the control plane to stop all remaining work associated + * with this session. Depending on the server configuration and current task states, + * running tasks may be interrupted and queued tasks will no longer be scheduled. + * Results that have already been produced remain accessible unless the session + * is subsequently purged or deleted. + *

+ * The returned completion stage is completed asynchronously with the updated + * {@link SessionState} once the cancellation request has been processed by the + * Sessions service, or completed exceptionally if the request fails. + * + * @return a completion stage that yields the updated state of this session after + * the cancellation request has been applied + * + * @see SessionState + */ + public CompletionStage cancel() { + return channelPool.executeAsync(channel -> { + var sessionsFutureStub = SessionsGrpc.newFutureStub(channel); + return Futures.toCompletionStage(sessionsFutureStub.cancelSession(toCancelSessionRequest(sessionInfo.id()))) + .thenApply(response -> toSessionState(response.getSession())); + }); + } + + /** + * Pauses this session in the ArmoniK cluster. + *

+ * Pausing a session temporarily suspends the scheduling of new tasks in the session. + * Tasks that are already running continue until completion, but pending tasks are + * not picked up for execution until the session is resumed via {@link #resume()}. + *

+ * This operation is useful when you need to temporarily throttle or stop processing + * without cancelling the session or losing its state. + * The returned completion stage is completed asynchronously with the updated + * {@link SessionState} once the pause request has been processed by the Sessions + * service, or completed exceptionally if the request fails. + * + * @return a completion stage that yields the updated state of this session after + * the pause request has been applied + * + * @see #resume() + * @see SessionState + */ + public CompletionStage pause() { + return channelPool.executeAsync(channel -> { + var sessionsFutureStub = SessionsGrpc.newFutureStub(channel); + return Futures.toCompletionStage(sessionsFutureStub.pauseSession(toPauseSessionRequest(sessionInfo.id()))) + .thenApply(response -> toSessionState(response.getSession())); + }); + } + /** + * Resumes a previously paused session in the ArmoniK cluster. + *

+ * Resuming a session re-enables scheduling of tasks that were previously held + * while the session was paused. Any pending tasks become eligible for execution + * again according to the cluster scheduling policies. + *

+ * Calling this method on a session that is not paused is safe; the Sessions service + * will simply return the current state of the session. + * The returned completion stage is completed asynchronously with the updated + * {@link SessionState} once the resume request has been processed by the Sessions + * service, or completed exceptionally if the request fails. + * + * @return a completion stage that yields the updated state of this session after + * the resume request has been applied + * + * @see #pause() + * @see SessionState + */ + public CompletionStage resume() { + return channelPool.executeAsync(channel -> { + var sessionsFutureStub = SessionsGrpc.newFutureStub(channel); + return Futures.toCompletionStage(sessionsFutureStub.resumeSession(toResumeSessionRequest(sessionInfo.id()))) + .thenApply(response -> toSessionState(response.getSession())); + }); + } + + /** + * Closes this session in the ArmoniK cluster. + *

+ * Closing a session finalizes it and prevents any new task submissions, while + * preserving existing tasks, results, and metadata. This is the recommended way + * to indicate that no further work will be submitted for this session once all + * expected tasks have been created. + *

+ * Closing a session does not remove its data. To free up storage or completely + * remove the session, combine this operation with {@link #purge()} and + * {@link #delete()} as appropriate. + * The returned completion stage is completed asynchronously with the updated + * {@link SessionState} once the close request has been processed by the Sessions + * service, or completed exceptionally if the request fails. + * + * @return a completion stage that yields the updated state of this session after + * the close request has been applied + * + * @see #purge() + * @see #delete() + * @see SessionState + */ + public CompletionStage close() { + return channelPool.executeAsync(channel -> { + var sessionsFutureStub = SessionsGrpc.newFutureStub(channel); + return Futures.toCompletionStage(sessionsFutureStub.closeSession(toCloseSessionRequest(sessionInfo.id()))) + .thenApply(response -> toSessionState(response.getSession())); + }); + } + + /** + * Purges this session's data in the ArmoniK cluster. + *

+ * Purging a session removes the underlying data for its blobs (task inputs and + * outputs) from the storage layer while keeping the session and task metadata. + * This operation is useful to reclaim storage space once the actual data is no + * longer needed but you still want to keep an audit trail or execution history. + *

+ * After a purge, attempts to download blob data associated with this session + * will fail, but session and task information remain available until the session + * is deleted. + * The returned completion stage is completed asynchronously with the updated + * {@link SessionState} once the purge request has been processed by the Sessions + * service, or completed exceptionally if the request fails. + * + * @return a completion stage that yields the updated state of this session after + * the purge request has been applied + * + * @see #delete() + * @see SessionState + */ + public CompletionStage purge() { + return channelPool.executeAsync(channel -> { + var sessionsFutureStub = SessionsGrpc.newFutureStub(channel); + return Futures.toCompletionStage(sessionsFutureStub.purgeSession(toPurgeSessionRequest(sessionInfo.id()))) + .thenApply(response -> toSessionState(response.getSession())); + }); + } + + /** + * Deletes this session from the ArmoniK cluster. + *

+ * Deleting a session permanently removes its metadata from the Sessions, Tasks, + * and Blobs. This is typically the final step in the lifecycle of a + * session once it has been closed and, optionally, purged. + *

+ * After deletion, this handle still exists as a local object but any further + * interaction with the remote session (such as submitting tasks or querying + * state) will fail because the session no longer exists in the cluster. + * Clients are expected to discard the handle after deletion. + * The returned completion stage is completed asynchronously with the updated + * {@link SessionState} reported by the Sessions service just before removal, + * or completed exceptionally if the request fails. + * + * @return a completion stage that yields the last known state of this session + * before it is removed from the cluster + * + * @see #close() + * @see #purge() + * @see SessionState + */ + public CompletionStage delete() { + return channelPool.executeAsync(channel -> { + var sessionsFutureStub = SessionsGrpc.newFutureStub(channel); + return Futures.toCompletionStage(sessionsFutureStub.deleteSession(toDeleteSessionRequest(sessionInfo.id()))) + .thenApply(response -> toSessionState(response.getSession())); + }); + } + + + private Function> createBlobInfo(BlobDefinition blobDefinition) { return channel -> { var request = toResultMetaDataRequest(sessionInfo.id(), List.of(blobDefinition)); diff --git a/armonik-client/src/main/java/fr/aneo/armonik/client/SessionId.java b/armonik-client/src/main/java/fr/aneo/armonik/client/SessionId.java index e7833bc..3cee745 100644 --- a/armonik-client/src/main/java/fr/aneo/armonik/client/SessionId.java +++ b/armonik-client/src/main/java/fr/aneo/armonik/client/SessionId.java @@ -48,7 +48,7 @@ public String asString() { return id; } - static SessionId from(String sessionId) { + public static SessionId from(String sessionId) { return new SessionId(sessionId); } diff --git a/armonik-client/src/main/java/fr/aneo/armonik/client/SessionState.java b/armonik-client/src/main/java/fr/aneo/armonik/client/SessionState.java new file mode 100644 index 0000000..994f2d6 --- /dev/null +++ b/armonik-client/src/main/java/fr/aneo/armonik/client/SessionState.java @@ -0,0 +1,77 @@ +/* + * Copyright © 2025 ANEO (armonik@aneo.fr) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package fr.aneo.armonik.client; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; + +/** + * Immutable snapshot of a session's state in the ArmoniK cluster. + *

+ * A {@code SessionState} instance mirrors the information returned by the Sessions service + * for a given session. It aggregates the current lifecycle status, submission flags, + * configuration and key timestamps into a single value object. Instances of this record + * are typically obtained from session lifecycle operations such as + * {@link SessionHandle#cancel()}, {@link SessionHandle#pause()}, {@link SessionHandle#resume()}, + * {@link SessionHandle#close()}, {@link SessionHandle#purge()} and {@link SessionHandle#delete()}, + * or from {@link ArmoniKClient#closeSession(SessionId)}. + *

+ * Time-related fields may be {@code null} when the corresponding lifecycle transition + * has not occurred yet. For example, {@code cancelledAt} is only populated when the + * session has been cancelled, and {@code closedAt} is only populated after the session + * has been closed. The {@code duration} field is generally set when the session is in a + * terminal state (such as {@code CANCELLED} or {@code CLOSED}) and represents the elapsed + * time between creation and termination as computed by the control plane. + * + * @param sessionId the unique identifier of the session + * @param status the current lifecycle status of the session as reported by the cluster + * @param clientSubmission whether clients are currently allowed to submit new tasks in this session + * @param workerSubmission whether workers are currently allowed to submit tasks in this session + * @param partitionIds the set of partition identifiers associated with this session; determines where + * tasks may be scheduled + * @param taskConfiguration the default task configuration applied to tasks created in this session + * @param createdAt the instant at which the session was created in the cluster; never {@code null} + * for a valid session + * @param cancelledAt the instant at which the session was cancelled, or {@code null} if the session + * has not been cancelled + * @param closedAt the instant at which the session was closed, or {@code null} if the session + * has not been closed + * @param purgedAt the instant at which the session's data was purged from storage, or {@code null} + * if no purge has been performed + * @param deletedAt the instant at which the session was deleted from the control plane, or {@code null} + * if the session still exists + * @param duration the total duration of the session as computed by the control plane, typically set + * when the session reaches a terminal state; may be {@code null} otherwise + * @see SessionHandle + * @see SessionStatus + * @see TaskConfiguration + */ +public record SessionState( + SessionId sessionId, + SessionStatus status, + boolean clientSubmission, + boolean workerSubmission, + List partitionIds, + TaskConfiguration taskConfiguration, + Instant createdAt, + Instant cancelledAt, + Instant closedAt, + Instant purgedAt, + Instant deletedAt, + Duration duration +) { +} diff --git a/armonik-client/src/main/java/fr/aneo/armonik/client/SessionStatus.java b/armonik-client/src/main/java/fr/aneo/armonik/client/SessionStatus.java new file mode 100644 index 0000000..09934bd --- /dev/null +++ b/armonik-client/src/main/java/fr/aneo/armonik/client/SessionStatus.java @@ -0,0 +1,26 @@ +/* + * Copyright © 2025 ANEO (armonik@aneo.fr) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package fr.aneo.armonik.client; + +public enum SessionStatus { + UNSPECIFIED, + RUNNING, + CANCELLED, + PAUSED, + CLOSED, + PURGED, + DELETED +} diff --git a/armonik-client/src/main/java/fr/aneo/armonik/client/TaskSubmitter.java b/armonik-client/src/main/java/fr/aneo/armonik/client/TaskSubmitter.java index 1a17fc6..3dc1cc1 100644 --- a/armonik-client/src/main/java/fr/aneo/armonik/client/TaskSubmitter.java +++ b/armonik-client/src/main/java/fr/aneo/armonik/client/TaskSubmitter.java @@ -18,7 +18,6 @@ import com.google.gson.Gson; import fr.aneo.armonik.api.grpc.v1.results.ResultsGrpc; import fr.aneo.armonik.api.grpc.v1.tasks.TasksGrpc; -import fr.aneo.armonik.client.definition.SessionDefinition; import fr.aneo.armonik.client.definition.TaskDefinition; import fr.aneo.armonik.client.definition.blob.BlobDefinition; import fr.aneo.armonik.client.definition.blob.InputBlobDefinition; @@ -49,6 +48,8 @@ import static fr.aneo.armonik.client.internal.grpc.mappers.BlobMapper.toResultMetaDataRequest; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; +import static java.util.Objects.requireNonNullElse; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toMap; /** @@ -89,27 +90,35 @@ final class TaskSubmitter { /** * Creates a new task submitter for the specified session context. *

- * The task submitter will use the session's configuration for default task settings and - * will optionally set up blob completion coordination if an output listener is configured - * in the session definition. + * This constructor is used internally by {@link SessionHandle} when a session is opened + * or retrieved via {@link ArmoniKClient}. It configures the default task settings for the + * session and, if an {@link BlobCompletionListener} is provided, initializes output blob + * completion coordination using the default batching strategy. + *

+ * When {@code taskConfiguration} is {@code null}, the global + * {@link TaskConfiguration#defaultConfiguration() default configuration} is applied. * * @param sessionId the identifier of the session context for task submission - * @param sessionDefinition the session definition containing task configuration and output listener + * @param taskConfiguration the default task configuration to apply for submissions in this session; + * may be {@code null} to use the global default + * @param outputListener optional listener for output blob completion events; may be {@code null} * @param channelPool the gRPC channel pool for cluster communication - * @throws NullPointerException if any parameter is null - * @see SessionDefinition + * @throws NullPointerException if {@code sessionId} or {@code channelPool} is {@code null} + * @see SessionHandle + * @see TaskConfiguration + * @see BlobCompletionListener * @see BlobCompletionCoordinator */ TaskSubmitter(SessionId sessionId, - SessionDefinition sessionDefinition, + TaskConfiguration taskConfiguration, + BlobCompletionListener outputListener, ChannelPool channelPool) { - this.sessionId = sessionId; - this.defaultTaskConfiguration = sessionDefinition.taskConfiguration() != null ? sessionDefinition.taskConfiguration() : defaultConfiguration(); - this.channelPool = channelPool; - - if (sessionDefinition.outputListener() != null) { - this.blobCompletionCoordinator = new BlobCompletionCoordinator(sessionId, channelPool, sessionDefinition); + this.sessionId = requireNonNull(sessionId); + this.defaultTaskConfiguration = requireNonNullElse(taskConfiguration, defaultConfiguration()); + this.channelPool = requireNonNull(channelPool); + if (outputListener != null) { + this.blobCompletionCoordinator = new BlobCompletionCoordinator(sessionId, channelPool, outputListener); } } @@ -198,7 +207,11 @@ TaskHandle submit(TaskDefinition taskDefinition) { * @see SessionHandle#awaitOutputsProcessed() */ CompletionStage waitUntilFinished() { - return blobCompletionCoordinator.waitUntilIdle(); + if (blobCompletionCoordinator != null) { + return blobCompletionCoordinator.waitUntilIdle(); + } else { + return completedFuture(null); + } } /** diff --git a/armonik-client/src/main/java/fr/aneo/armonik/client/definition/SessionDefinition.java b/armonik-client/src/main/java/fr/aneo/armonik/client/definition/SessionDefinition.java index e22044c..4403daf 100644 --- a/armonik-client/src/main/java/fr/aneo/armonik/client/definition/SessionDefinition.java +++ b/armonik-client/src/main/java/fr/aneo/armonik/client/definition/SessionDefinition.java @@ -19,6 +19,8 @@ import java.util.Set; +import static java.util.Objects.requireNonNullElse; + /** * Definition specifying the configuration and parameters for creating a session in ArmoniK. @@ -35,18 +37,15 @@ * @param partitionIds the set of partition identifiers where tasks within this session can be executed * @param taskConfiguration the default task configuration applied to tasks submitted within this session * @param outputListener the listener to receive task output completion events for this session - * @param outputBatchingPolicy the batching policy used for coordinating task output completion monitoring * @see ArmoniKClient#openSession(SessionDefinition) * @see SessionHandle * @see TaskConfiguration * @see BlobCompletionListener - * @see BatchingPolicy */ public record SessionDefinition( Set partitionIds, TaskConfiguration taskConfiguration, - BlobCompletionListener outputListener, - BatchingPolicy outputBatchingPolicy + BlobCompletionListener outputListener ) { /** @@ -59,10 +58,9 @@ public record SessionDefinition( * @param partitionIds the set of partition identifiers where tasks can be executed * @throws NullPointerException if partitionIds is null * @see TaskConfiguration#defaultConfiguration() - * @see BatchingPolicy#DEFAULT */ public SessionDefinition(Set partitionIds) { - this(partitionIds, TaskConfiguration.defaultConfiguration(), null, BatchingPolicy.DEFAULT); + this(partitionIds, TaskConfiguration.defaultConfiguration(), null); } /** @@ -76,10 +74,9 @@ public SessionDefinition(Set partitionIds) { * @param taskConfiguration the default task configuration for tasks in this session * @throws NullPointerException if any parameter is null * @see TaskConfiguration - * @see BatchingPolicy#DEFAULT */ public SessionDefinition(Set partitionIds, TaskConfiguration taskConfiguration) { - this(partitionIds, taskConfiguration, null, BatchingPolicy.DEFAULT); + this(partitionIds, taskConfiguration, null); } /** @@ -94,19 +91,16 @@ public SessionDefinition(Set partitionIds, TaskConfiguration taskConfigu * Default values are applied for null parameters: *

* * @throws NullPointerException if partitionIds is null * @throws IllegalArgumentException if partitionIds is empty or if task configuration * specifies a partition not included in the session's partition set * @see TaskConfiguration#defaultConfiguration() - * @see BatchingPolicy#DEFAULT */ public SessionDefinition { - taskConfiguration = taskConfiguration == null ? TaskConfiguration.defaultConfiguration() : taskConfiguration; - partitionIds = partitionIds == null ? Set.of() : partitionIds; - outputBatchingPolicy = outputBatchingPolicy == null ? BatchingPolicy.DEFAULT : outputBatchingPolicy; + taskConfiguration = requireNonNullElse(taskConfiguration, TaskConfiguration.defaultConfiguration()); + partitionIds = requireNonNullElse(partitionIds, Set.of()); validateTaskDefaultPartition(partitionIds, taskConfiguration); } diff --git a/armonik-client/src/main/java/fr/aneo/armonik/client/internal/concurrent/Schedulers.java b/armonik-client/src/main/java/fr/aneo/armonik/client/internal/concurrent/Schedulers.java index 9f76080..fb876eb 100644 --- a/armonik-client/src/main/java/fr/aneo/armonik/client/internal/concurrent/Schedulers.java +++ b/armonik-client/src/main/java/fr/aneo/armonik/client/internal/concurrent/Schedulers.java @@ -15,8 +15,6 @@ */ package fr.aneo.armonik.client.internal.concurrent; -import fr.aneo.armonik.client.BatchingPolicy; - import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -38,7 +36,6 @@ * A JVM shutdown hook is automatically registered to properly shut down the executor when * the application terminates, ensuring clean resource cleanup. * - * @see BatchingPolicy */ public class Schedulers { diff --git a/armonik-client/src/main/java/fr/aneo/armonik/client/internal/grpc/mappers/SessionMapper.java b/armonik-client/src/main/java/fr/aneo/armonik/client/internal/grpc/mappers/SessionMapper.java index 3e85327..9c1182c 100644 --- a/armonik-client/src/main/java/fr/aneo/armonik/client/internal/grpc/mappers/SessionMapper.java +++ b/armonik-client/src/main/java/fr/aneo/armonik/client/internal/grpc/mappers/SessionMapper.java @@ -15,9 +15,18 @@ */ package fr.aneo.armonik.client.internal.grpc.mappers; +import com.google.protobuf.Timestamp; +import fr.aneo.armonik.api.grpc.v1.sessions.SessionStatusOuterClass; +import fr.aneo.armonik.client.SessionId; +import fr.aneo.armonik.client.SessionState; +import fr.aneo.armonik.client.SessionStatus; import fr.aneo.armonik.client.definition.SessionDefinition; -import static fr.aneo.armonik.api.grpc.v1.sessions.SessionsCommon.CreateSessionRequest; +import java.time.Duration; +import java.time.Instant; + +import static fr.aneo.armonik.api.grpc.v1.sessions.SessionsCommon.*; +import static fr.aneo.armonik.client.internal.grpc.mappers.TaskMapper.toTaskConfiguration; import static fr.aneo.armonik.client.internal.grpc.mappers.TaskMapper.toTaskOptions; public final class SessionMapper { @@ -31,4 +40,72 @@ public static CreateSessionRequest toCreateSessionRequest(SessionDefinition sess .setDefaultTaskOption(toTaskOptions(sessionDefinition.taskConfiguration())) .build(); } + + public static GetSessionRequest toGetSessionRequest(SessionId sessionId) { + return GetSessionRequest.newBuilder() + .setSessionId(sessionId.asString()) + .build(); + } + + public static CancelSessionRequest toCancelSessionRequest(SessionId sessionId) { + return CancelSessionRequest.newBuilder().setSessionId(sessionId.asString()).build(); + } + + public static PauseSessionRequest toPauseSessionRequest(SessionId sessionId) { + return PauseSessionRequest.newBuilder().setSessionId(sessionId.asString()).build(); + } + + public static ResumeSessionRequest toResumeSessionRequest(SessionId sessionId) { + return ResumeSessionRequest.newBuilder().setSessionId(sessionId.asString()).build(); + } + + public static CloseSessionRequest toCloseSessionRequest(SessionId sessionId) { + return CloseSessionRequest.newBuilder().setSessionId(sessionId.asString()).build(); + } + + public static PurgeSessionRequest toPurgeSessionRequest(SessionId sessionId) { + return PurgeSessionRequest.newBuilder().setSessionId(sessionId.asString()).build(); + } + + public static DeleteSessionRequest toDeleteSessionRequest(SessionId sessionId) { + return DeleteSessionRequest.newBuilder().setSessionId(sessionId.asString()).build(); + } + + public static SessionState toSessionState(SessionRaw sessionRaw) { + return new SessionState( + SessionId.from(sessionRaw.getSessionId()), + toSessionStatus(sessionRaw.getStatus()), + sessionRaw.getClientSubmission(), + sessionRaw.getWorkerSubmission(), + sessionRaw.getPartitionIdsList(), + toTaskConfiguration(sessionRaw.getOptions()), + sessionRaw.hasCreatedAt() ? toInstant(sessionRaw.getCreatedAt()) : null, + sessionRaw.hasCancelledAt() ? toInstant(sessionRaw.getCancelledAt()) : null, + sessionRaw.hasClosedAt() ? toInstant(sessionRaw.getClosedAt()) : null, + sessionRaw.hasPurgedAt() ? toInstant(sessionRaw.getPurgedAt()) : null, + sessionRaw.hasDeletedAt() ? toInstant(sessionRaw.getDeletedAt()) : null, + sessionRaw.hasDuration() ? toDuration(sessionRaw.getDuration()) : null + ); + } + + private static SessionStatus toSessionStatus(SessionStatusOuterClass.SessionStatus sessionStatus) { + return switch (sessionStatus) { + case SESSION_STATUS_UNSPECIFIED -> SessionStatus.UNSPECIFIED; + case SESSION_STATUS_RUNNING -> SessionStatus.RUNNING; + case SESSION_STATUS_PAUSED -> SessionStatus.PAUSED; + case SESSION_STATUS_CANCELLED -> SessionStatus.CANCELLED; + case SESSION_STATUS_CLOSED -> SessionStatus.CLOSED; + case SESSION_STATUS_PURGED -> SessionStatus.PURGED; + case SESSION_STATUS_DELETED -> SessionStatus.DELETED; + default -> throw new IllegalArgumentException("Unknown session status: " + sessionStatus); + }; + } + + private static Duration toDuration(com.google.protobuf.Duration protobufDuration) { + return Duration.ofSeconds(protobufDuration.getSeconds(), protobufDuration.getNanos()); + } + + private static Instant toInstant(Timestamp timestamp) { + return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()); + } } diff --git a/armonik-client/src/main/java/fr/aneo/armonik/client/internal/grpc/mappers/TaskMapper.java b/armonik-client/src/main/java/fr/aneo/armonik/client/internal/grpc/mappers/TaskMapper.java index e9e9bb3..d3edbed 100644 --- a/armonik-client/src/main/java/fr/aneo/armonik/client/internal/grpc/mappers/TaskMapper.java +++ b/armonik-client/src/main/java/fr/aneo/armonik/client/internal/grpc/mappers/TaskMapper.java @@ -59,4 +59,15 @@ public static TaskOptions toTaskOptions(TaskConfiguration taskConfiguration) { .putAllOptions(taskConfiguration.options()) .build(); } + + public static TaskConfiguration toTaskConfiguration(TaskOptions taskOptions) { + return new TaskConfiguration( + taskOptions.getMaxRetries(), + taskOptions.getPriority(), + taskOptions.getPartitionId(), + java.time.Duration.ofSeconds(taskOptions.getMaxDuration().getSeconds(), taskOptions.getMaxDuration().getNanos()), + taskOptions.getOptionsMap() + ); + } + } diff --git a/armonik-client/src/test/java/fr/aneo/armonik/client/ArmoniKClientTest.java b/armonik-client/src/test/java/fr/aneo/armonik/client/ArmoniKClientTest.java index ac5ba70..e7b1abd 100644 --- a/armonik-client/src/test/java/fr/aneo/armonik/client/ArmoniKClientTest.java +++ b/armonik-client/src/test/java/fr/aneo/armonik/client/ArmoniKClientTest.java @@ -16,6 +16,7 @@ package fr.aneo.armonik.client; import fr.aneo.armonik.client.definition.SessionDefinition; +import fr.aneo.armonik.client.exception.ArmoniKException; import fr.aneo.armonik.client.testutils.InProcessGrpcTestBase; import fr.aneo.armonik.client.testutils.SessionsGrpcMock; import io.grpc.BindableService; @@ -46,6 +47,7 @@ void setUp() { } @Test + @DisplayName("should open a new session successfully") void should_open_a_new_session_successfully() { // When var sessionHandle = client.openSession(sessionDefinition); @@ -64,6 +66,63 @@ void should_open_a_new_session_successfully() { assertThat(sessionsGrpcMock.submittedCreateSessionRequest.getDefaultTaskOption().getOptionsMap()).isEqualTo(Map.of("option1", "value1")); } + @Test + @DisplayName("should get an existing session") + void should_get_an_existing_session() { + // Given + var sessionId = SessionId.from("session_1"); + + // When + var sessionHandle = client.getSession(sessionId); + + // Then + assertThat(sessionsGrpcMock.submittedGetSessionRequest.getSessionId()).isEqualTo(sessionId.asString()); + assertThat(sessionHandle.sessionInfo()).isEqualTo( + new SessionInfo( + sessionId, + Set.of("partition1", "partition2"), + new TaskConfiguration(2, 5, "partition1", Duration.ofMinutes(60), Map.of("option1", "value1")) + )); + } + + @Test + @DisplayName("should close a session") + void should_close_an_existing_session() { + // Given + var sessionId = SessionId.from("session_1"); + + // When + client.closeSession(sessionId); + + // Then + assertThat(sessionsGrpcMock.submittedCloseSessionRequest).isNotNull(); + assertThat(sessionsGrpcMock.submittedCloseSessionRequest.getSessionId()).isEqualTo(sessionId.asString()); + } + + @Test + @DisplayName("should cancel a session") + void should_cancel_an_existing_session() { + // Given + var sessionId = SessionId.from("session_1"); + + // When + client.cancelSession(sessionId); + + // Then + assertThat(sessionsGrpcMock.submittedCancelSessionRequest).isNotNull(); + assertThat(sessionsGrpcMock.submittedCancelSessionRequest.getSessionId()).isEqualTo(sessionId.asString()); + } + + @Test + @DisplayName("should throw exception when session does not exist") + void should_throw_exception_when_session_does_not_exist() { + // Given + var sessionId = SessionId.from("does not exist"); + + // When / Then + assertThatThrownBy(() -> client.getSession(sessionId)).isInstanceOf(ArmoniKException.class); + } + @Test @DisplayName("should close channel pool when client is closed") void should_close_channel_pool_when_client_is_closed() { diff --git a/armonik-client/src/test/java/fr/aneo/armonik/client/SessionHandleTest.java b/armonik-client/src/test/java/fr/aneo/armonik/client/SessionHandleTest.java index 13928ba..d3163d6 100644 --- a/armonik-client/src/test/java/fr/aneo/armonik/client/SessionHandleTest.java +++ b/armonik-client/src/test/java/fr/aneo/armonik/client/SessionHandleTest.java @@ -16,14 +16,10 @@ package fr.aneo.armonik.client; import com.google.gson.Gson; -import fr.aneo.armonik.client.definition.SessionDefinition; import fr.aneo.armonik.client.definition.TaskDefinition; import fr.aneo.armonik.client.definition.blob.InputBlobDefinition; import fr.aneo.armonik.client.definition.blob.OutputBlobDefinition; -import fr.aneo.armonik.client.testutils.EventsGrpcMock; -import fr.aneo.armonik.client.testutils.InProcessGrpcTestBase; -import fr.aneo.armonik.client.testutils.ResultsGrpcMock; -import fr.aneo.armonik.client.testutils.TasksGrpcMock; +import fr.aneo.armonik.client.testutils.*; import io.grpc.BindableService; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -33,7 +29,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; @@ -42,7 +37,7 @@ import static fr.aneo.armonik.client.TestDataFactory.blobHandle; import static fr.aneo.armonik.client.TestDataFactory.sessionInfo; import static fr.aneo.armonik.client.WorkerLibrary.*; -import static fr.aneo.armonik.client.testutils.ResultsGrpcMock.*; +import static fr.aneo.armonik.client.testutils.ResultsGrpcMock.MetadataRequest; import static java.util.concurrent.CompletableFuture.runAsync; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -52,26 +47,22 @@ class SessionHandleTest extends InProcessGrpcTestBase { private final ResultsGrpcMock resultsGrpcMock = new ResultsGrpcMock(); private final TasksGrpcMock taskGrpcMock = new TasksGrpcMock(); private final EventsGrpcMock eventsGrpcMock = new EventsGrpcMock(); + private final SessionsGrpcMock sessionsGrpcMock = new SessionsGrpcMock(); private BlobCompletionListenerMock outputTaskListener; private SessionInfo sessionInfo; private SessionHandle sessionHandle; @Override protected List services() { - return List.of(resultsGrpcMock, taskGrpcMock, eventsGrpcMock); + return List.of(resultsGrpcMock, taskGrpcMock, eventsGrpcMock, sessionsGrpcMock); } @BeforeEach void setUp() { + var taskConfiguration = new TaskConfiguration(3, 1, "partition_1", Duration.ofMinutes(60), Map.of("option1", "value1")); outputTaskListener = new BlobCompletionListenerMock(); - var sessionDefinition = new SessionDefinition( - Set.of("partition_1"), - new TaskConfiguration(3, 1, "partition_1", Duration.ofMinutes(60), Map.of("option1", "value1")), - outputTaskListener, - new BatchingPolicy(1, Duration.ofSeconds(1), 1, 1) - ); - sessionInfo = sessionInfo("partition_1"); - sessionHandle = new SessionHandle(sessionInfo, sessionDefinition, channelPool); + sessionInfo = sessionInfo("partition_1", taskConfiguration); + sessionHandle = new SessionHandle(sessionInfo, outputTaskListener, channelPool); resultsGrpcMock.reset(); } @@ -104,6 +95,7 @@ void should_submit_a_task() { } @Test + @DisplayName("should await outputs processed until all outputs complete") void should_await_outputs_processed_until_all_outputs_complete() throws Exception { // Given var awaitDone = new CountDownLatch(1); @@ -143,6 +135,7 @@ void should_await_outputs_processed_until_all_outputs_complete() throws Exceptio } @Test + @DisplayName("should await outputs processed with some failed outputs") void should_await_outputs_processed_with_some_failed_outputs() throws Exception { // Given var awaitDone = new CountDownLatch(1); @@ -183,6 +176,7 @@ void should_await_outputs_processed_with_some_failed_outputs() throws Exception } @Test + @DisplayName("should create a blob handle") void should_create_a_blob_handle() throws InterruptedException, TimeoutException { // Given var blobDefinition = InputBlobDefinition.from("Hello World".getBytes()); @@ -201,6 +195,72 @@ void should_create_a_blob_handle() throws InterruptedException, TimeoutException assertThat(resultsGrpcMock.uploadedDataInfos.get(0).blobId).isEqualTo(blobHandle.deferredBlobInfo().toCompletableFuture().join().id().asString()); } + @Test + @DisplayName("should cancel session with the session id") + void should_cancel_session_with_the_session_id() { + // When + sessionHandle.cancel().toCompletableFuture().join(); + + // Then + assertThat(sessionsGrpcMock.submittedCancelSessionRequest).isNotNull(); + assertThat(sessionsGrpcMock.submittedCancelSessionRequest.getSessionId()).isEqualTo(sessionInfo.id().asString()); + } + + @Test + @DisplayName("should pause session with the session id") + void should_pause_session_with_the_session_id() { + // When + sessionHandle.pause().toCompletableFuture().join(); + + // Then + assertThat(sessionsGrpcMock.submittedPauseSessionRequest).isNotNull(); + assertThat(sessionsGrpcMock.submittedPauseSessionRequest.getSessionId()).isEqualTo(sessionInfo.id().asString()); + } + + @Test + @DisplayName("should resume session with the session id") + void should_resume_session_with_the_session_id() { + // When + sessionHandle.resume().toCompletableFuture().join(); + + // Then + assertThat(sessionsGrpcMock.submittedResumeSessionRequest).isNotNull(); + assertThat(sessionsGrpcMock.submittedResumeSessionRequest.getSessionId()).isEqualTo(sessionInfo.id().asString()); + } + + @Test + @DisplayName("should close session with the session id") + void should_close_session_with_the_session_id() { + // When + sessionHandle.close().toCompletableFuture().join(); + + // Then + assertThat(sessionsGrpcMock.submittedCloseSessionRequest).isNotNull(); + assertThat(sessionsGrpcMock.submittedCloseSessionRequest.getSessionId()).isEqualTo(sessionInfo.id().asString()); + } + + @Test + @DisplayName("should purge session with the session id") + void should_purge_session_with_the_session_id() { + // When + sessionHandle.purge().toCompletableFuture().join(); + + // Then + assertThat(sessionsGrpcMock.submittedPurgeSessionRequest).isNotNull(); + assertThat(sessionsGrpcMock.submittedPurgeSessionRequest.getSessionId()).isEqualTo(sessionInfo.id().asString()); + } + + @Test + @DisplayName("should delete session with the session id") + void should_delete_session_with_the_session_id() { + // When + sessionHandle.delete(); + + // Then + assertThat(sessionsGrpcMock.submittedDeleteSessionRequest).isNotNull(); + assertThat(sessionsGrpcMock.submittedDeleteSessionRequest.getSessionId()).isEqualTo(sessionInfo.id().asString()); + } + private void validateSubmittedSession() { assertThat(taskGrpcMock.submittedTasksRequest.getSessionId()).isEqualTo(sessionInfo.id().asString()); } diff --git a/armonik-client/src/test/java/fr/aneo/armonik/client/TestDataFactory.java b/armonik-client/src/test/java/fr/aneo/armonik/client/TestDataFactory.java index 18cbce2..f53bbe6 100644 --- a/armonik-client/src/test/java/fr/aneo/armonik/client/TestDataFactory.java +++ b/armonik-client/src/test/java/fr/aneo/armonik/client/TestDataFactory.java @@ -34,6 +34,9 @@ public static SessionId sessionId(String sessionId) { static SessionInfo sessionInfo(String partition) { return new SessionInfo(SessionId.from("SessionId"), Set.of(partition), defaultConfiguration()); } + static SessionInfo sessionInfo(String partition, TaskConfiguration taskConfiguration) { + return new SessionInfo(SessionId.from("SessionId"), Set.of(partition), taskConfiguration); + } public static BlobId blobId(String blobId) { return BlobId.from(blobId); diff --git a/armonik-client/src/test/java/fr/aneo/armonik/client/internal/grpc/mappers/SessionMapperTest.java b/armonik-client/src/test/java/fr/aneo/armonik/client/internal/grpc/mappers/SessionMapperTest.java new file mode 100644 index 0000000..3358c61 --- /dev/null +++ b/armonik-client/src/test/java/fr/aneo/armonik/client/internal/grpc/mappers/SessionMapperTest.java @@ -0,0 +1,129 @@ +/* + * Copyright © 2025 ANEO (armonik@aneo.fr) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package fr.aneo.armonik.client.internal.grpc.mappers; + +import com.google.protobuf.Timestamp; +import fr.aneo.armonik.api.grpc.v1.sessions.SessionStatusOuterClass; +import fr.aneo.armonik.client.SessionState; +import fr.aneo.armonik.client.SessionStatus; +import fr.aneo.armonik.client.TaskConfiguration; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import static fr.aneo.armonik.api.grpc.v1.Objects.TaskOptions; +import static fr.aneo.armonik.api.grpc.v1.sessions.SessionsCommon.SessionRaw; +import static org.assertj.core.api.Assertions.assertThat; + +class SessionMapperTest { + + @Test + void should_map_session_raw_to_session_state() { + // Given + var createdAt = Timestamp.newBuilder().setSeconds(100).setNanos(200).build(); + var cancelledAt = Timestamp.newBuilder().setSeconds(300).setNanos(400).build(); + var closedAt = Timestamp.newBuilder().setSeconds(500).setNanos(600).build(); + var purgedAt = Timestamp.newBuilder().setSeconds(700).setNanos(800).build(); + var deletedAt = Timestamp.newBuilder().setSeconds(900).setNanos(1000).build(); + var duration = com.google.protobuf.Duration.newBuilder() + .setSeconds(42) + .setNanos(123_000_000) + .build(); + + var taskOptions = TaskOptions.newBuilder() + .setMaxRetries(3) + .setPriority(5) + .setPartitionId("partition-1") + .setMaxDuration(com.google.protobuf.Duration.newBuilder() + .setSeconds(600) + .setNanos(0)) + .putAllOptions(Map.of("opt1", "val1")) + .build(); + + var raw = SessionRaw.newBuilder() + .setSessionId("session-123") + .setStatus(SessionStatusOuterClass.SessionStatus.SESSION_STATUS_CANCELLED) + .setClientSubmission(true) + .setWorkerSubmission(false) + .addAllPartitionIds(List.of("p1", "p2")) + .setOptions(taskOptions) + .setCreatedAt(createdAt) + .setCancelledAt(cancelledAt) + .setClosedAt(closedAt) + .setPurgedAt(purgedAt) + .setDeletedAt(deletedAt) + .setDuration(duration) + .build(); + + // When + SessionState sessionState = SessionMapper.toSessionState(raw); + + // Then + assertThat(sessionState.sessionId().asString()).isEqualTo("session-123"); + assertThat(sessionState.status()).isEqualTo(SessionStatus.CANCELLED); + assertThat(sessionState.clientSubmission()).isTrue(); + assertThat(sessionState.workerSubmission()).isFalse(); + assertThat(sessionState.partitionIds()).containsExactly("p1", "p2"); + assertThat(sessionState.taskConfiguration()) + .usingRecursiveComparison() + .isEqualTo(new TaskConfiguration( + 3, + 5, + "partition-1", + Duration.ofSeconds(600), + Map.of("opt1", "val1") + )); + + assertThat(sessionState.createdAt()).isEqualTo(Instant.ofEpochSecond(100, 200)); + assertThat(sessionState.cancelledAt()).isEqualTo(Instant.ofEpochSecond(300, 400)); + assertThat(sessionState.closedAt()).isEqualTo(Instant.ofEpochSecond(500, 600)); + assertThat(sessionState.purgedAt()).isEqualTo(Instant.ofEpochSecond(700, 800)); + assertThat(sessionState.deletedAt()).isEqualTo(Instant.ofEpochSecond(900, 1000)); + assertThat(sessionState.duration()).isEqualTo(Duration.ofSeconds(42, 123_000_000)); + } + + @Test + void should_set_time_fields_to_null_when_missing() { + // Given + var taskOptions = TaskOptions.newBuilder() + .setMaxRetries(3) + .setPriority(5) + .setPartitionId("partition-1") + .setMaxDuration(com.google.protobuf.Duration.newBuilder() + .setSeconds(600) + .setNanos(0)); + var raw = SessionRaw.newBuilder() + .setSessionId("session-123") + .setOptions(taskOptions) + .setStatus(SessionStatusOuterClass.SessionStatus.SESSION_STATUS_RUNNING) + .build(); + + // When + SessionState state = SessionMapper.toSessionState(raw); + + // Then + assertThat(state.status()).isEqualTo(SessionStatus.RUNNING); + assertThat(state.createdAt()).isNull(); + assertThat(state.cancelledAt()).isNull(); + assertThat(state.closedAt()).isNull(); + assertThat(state.purgedAt()).isNull(); + assertThat(state.deletedAt()).isNull(); + assertThat(state.duration()).isNull(); + } +} diff --git a/armonik-client/src/test/java/fr/aneo/armonik/client/testutils/SessionsGrpcMock.java b/armonik-client/src/test/java/fr/aneo/armonik/client/testutils/SessionsGrpcMock.java index e82857f..14a15b3 100644 --- a/armonik-client/src/test/java/fr/aneo/armonik/client/testutils/SessionsGrpcMock.java +++ b/armonik-client/src/test/java/fr/aneo/armonik/client/testutils/SessionsGrpcMock.java @@ -15,21 +15,138 @@ */ package fr.aneo.armonik.client.testutils; +import com.google.protobuf.Duration; import fr.aneo.armonik.api.grpc.v1.sessions.SessionsGrpc; import io.grpc.stub.StreamObserver; -import static fr.aneo.armonik.api.grpc.v1.sessions.SessionsCommon.CreateSessionReply; -import static fr.aneo.armonik.api.grpc.v1.sessions.SessionsCommon.CreateSessionRequest; +import java.util.List; +import java.util.Map; -public class SessionsGrpcMock extends SessionsGrpc.SessionsImplBase{ +import static fr.aneo.armonik.api.grpc.v1.Objects.TaskOptions; +import static fr.aneo.armonik.api.grpc.v1.sessions.SessionsCommon.*; + +public class SessionsGrpcMock extends SessionsGrpc.SessionsImplBase { public CreateSessionRequest submittedCreateSessionRequest; + public GetSessionRequest submittedGetSessionRequest; + public CancelSessionRequest submittedCancelSessionRequest; + public PauseSessionRequest submittedPauseSessionRequest; + public ResumeSessionRequest submittedResumeSessionRequest; + public CloseSessionRequest submittedCloseSessionRequest; + public PurgeSessionRequest submittedPurgeSessionRequest; + public DeleteSessionRequest submittedDeleteSessionRequest; @Override public void createSession(CreateSessionRequest request, StreamObserver responseObserver) { - var response = CreateSessionReply.newBuilder().setSessionId("SessionId").build(); this.submittedCreateSessionRequest = request; - responseObserver.onNext(response); + responseObserver.onNext(CreateSessionReply.newBuilder().setSessionId("SessionId").build()); + responseObserver.onCompleted(); + } + + @Override + public void getSession(GetSessionRequest request, StreamObserver responseObserver) { + this.submittedGetSessionRequest = request; + + if (request.getSessionId().equals("does not exist")) { + responseObserver.onError(io.grpc.Status.NOT_FOUND.withDescription("Session not found").asRuntimeException()); + } else { + responseObserver.onNext(GetSessionResponse.newBuilder() + .setSession( + SessionRaw.newBuilder() + .setSessionId(request.getSessionId()) + .addAllPartitionIds(List.of("partition1", "partition2")) + .setOptions(taskOptions()) + .build()) + .build()); + } + responseObserver.onCompleted(); + } + + @Override + public void cancelSession(CancelSessionRequest request, StreamObserver responseObserver) { + this.submittedCancelSessionRequest = request; + + var raw = SessionRaw.newBuilder() + .setSessionId(request.getSessionId()) + .setOptions(taskOptions()) + .build(); + + responseObserver.onNext(CancelSessionResponse.newBuilder().setSession(raw).build()); + responseObserver.onCompleted(); + } + + @Override + public void pauseSession(PauseSessionRequest request, StreamObserver responseObserver) { + this.submittedPauseSessionRequest = request; + + var raw = SessionRaw.newBuilder() + .setSessionId(request.getSessionId()) + .setOptions(taskOptions()) + .build(); + + responseObserver.onNext(PauseSessionResponse.newBuilder().setSession(raw).build()); + responseObserver.onCompleted(); + } + + @Override + public void resumeSession(ResumeSessionRequest request, StreamObserver responseObserver) { + this.submittedResumeSessionRequest = request; + + var raw = SessionRaw.newBuilder() + .setSessionId(request.getSessionId()) + .setOptions(taskOptions()) + .build(); + + responseObserver.onNext(ResumeSessionResponse.newBuilder().setSession(raw).build()); + responseObserver.onCompleted(); + } + + @Override + public void closeSession(CloseSessionRequest request, StreamObserver responseObserver) { + this.submittedCloseSessionRequest = request; + + var raw = SessionRaw.newBuilder() + .setSessionId(request.getSessionId()) + .setOptions(taskOptions()) + .build(); + + responseObserver.onNext(CloseSessionResponse.newBuilder().setSession(raw).build()); + responseObserver.onCompleted(); + } + + @Override + public void purgeSession(PurgeSessionRequest request, StreamObserver responseObserver) { + this.submittedPurgeSessionRequest = request; + + var raw = SessionRaw.newBuilder() + .setSessionId(request.getSessionId()) + .setOptions(taskOptions()) + .build(); + + responseObserver.onNext(PurgeSessionResponse.newBuilder().setSession(raw).build()); responseObserver.onCompleted(); } + + @Override + public void deleteSession(DeleteSessionRequest request, StreamObserver responseObserver) { + this.submittedDeleteSessionRequest = request; + + var raw = SessionRaw.newBuilder() + .setSessionId(request.getSessionId()) + .setOptions(taskOptions()) + .build(); + + responseObserver.onNext(DeleteSessionResponse.newBuilder().setSession(raw).build()); + responseObserver.onCompleted(); + } + + private static TaskOptions taskOptions() { + return TaskOptions.newBuilder() + .setPartitionId("partition1") + .setMaxRetries(2) + .setPriority(5) + .setMaxDuration(Duration.newBuilder().setSeconds(3600)) + .putAllOptions(Map.of("option1", "value1")) + .build(); + } }