diff --git a/armonik-client/src/main/java/fr/aneo/armonik/client/ArmoniKClient.java b/armonik-client/src/main/java/fr/aneo/armonik/client/ArmoniKClient.java index f920d39..13411bf 100644 --- a/armonik-client/src/main/java/fr/aneo/armonik/client/ArmoniKClient.java +++ b/armonik-client/src/main/java/fr/aneo/armonik/client/ArmoniKClient.java @@ -17,11 +17,17 @@ import fr.aneo.armonik.api.grpc.v1.sessions.SessionsGrpc; import fr.aneo.armonik.client.definition.SessionDefinition; +import fr.aneo.armonik.client.exception.ArmoniKException; +import fr.aneo.armonik.client.internal.concurrent.Futures; import io.grpc.ManagedChannel; import java.time.Duration; +import java.util.HashSet; +import java.util.concurrent.CompletionStage; -import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.toCreateSessionRequest; +import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.*; +import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.toSessionState; +import static fr.aneo.armonik.client.internal.grpc.mappers.TaskMapper.toTaskConfiguration; import static java.util.Objects.requireNonNull; /** @@ -104,8 +110,144 @@ public ArmoniKClient(ArmoniKConfig armoniKConfig) { public SessionHandle openSession(SessionDefinition sessionDefinition) { requireNonNull(sessionDefinition, "sessionDefinition must not be null"); - var id = createSession(sessionDefinition); - return new SessionHandle(id, sessionDefinition, channelPool); + var sessionInfo = createSession(sessionDefinition); + return new SessionHandle(sessionInfo, sessionDefinition.outputListener(), channelPool); + } + + /** + * Connects to an existing session in the ArmoniK cluster. + *
+ * This method retrieves the session metadata associated with the given {@link SessionId} + * and returns a {@link SessionHandle} that can be used to submit tasks and monitor + * output blob completions + *
+ * If a {@link BlobCompletionListener} is provided, the returned {@code SessionHandle} + * will automatically start listening for output blob completion events for this + * session. + * + *
Error Handling: + * If the session does not exist, this method throws an {@link ArmoniKException} + * + * @param sessionId the identifier of the existing session to connect to + * @param outputListener an optional listener for output blob completion events; + * if {@code null}, no listener is registered + * @return a {@link SessionHandle} bound to the existing session + * @throws NullPointerException if {@code sessionId} is {@code null} + * @throws ArmoniKException if the session does not exist, or if a communication error occurs + * @see SessionHandle + * @see BlobCompletionListener + */ + public SessionHandle getSession(SessionId sessionId, BlobCompletionListener outputListener) { + requireNonNull(sessionId, "sessionId must not be null"); + + try { + var sessionResponse = channelPool.execute( + channel -> SessionsGrpc.newBlockingStub(channel).getSession(toGetSessionRequest(sessionId)) + ); + if (!sessionResponse.hasSession()) throw new ArmoniKException("Session not found"); + + var sessionInfo = new SessionInfo( + sessionId, + new HashSet<>(sessionResponse.getSession().getPartitionIdsList()), + toTaskConfiguration(sessionResponse.getSession().getOptions()) + ); + + return new SessionHandle(sessionInfo, outputListener, channelPool); + } catch (Exception e) { + throw new ArmoniKException(e.getMessage(), e); + } + } + + /** + * Connects to an existing session in the ArmoniK cluster without registering + * an output listener. + *
+ * This is a convenience overload equivalent to calling: + *
{@code
+ * getSession(sessionId, null);
+ * }
+ *
+ * Error Handling: + * If the session does not exist, this method throws an {@link ArmoniKException}. + * + * @param sessionId the identifier of the existing session to connect to + * @return a {@link SessionHandle} for interacting with the session + * @throws NullPointerException if {@code sessionId} is {@code null} + * @throws ArmoniKException if the session does not exist, or if a communication error occurs + * @see #getSession(SessionId, BlobCompletionListener) + */ + public SessionHandle getSession(SessionId sessionId) { + return getSession(sessionId, null); + } + + /** + * Closes a session in the ArmoniK cluster by its identifier. + *
+ * A closed session no longer accepts new task submissions, but all existing tasks, results, + * and metadata remain available in the control plane. Closing a session is the recommended + * finalization step once no additional tasks will be submitted. + *
+ * This client-level method is provided as a convenience when the caller does not + * hold a {@link SessionHandle}. When a {@code SessionHandle} instance is available, + * prefer calling {@link SessionHandle#close()} so that all lifecycle-related + * operations remain grouped on the handle itself. + *
+ * The returned {@link CompletionStage} completes asynchronously with the updated
+ * {@link SessionState} returned by the Sessions service, or completes exceptionally
+ * if the request fails or the session does not exist.
+ *
+ * @param sessionId the identifier of the session to close; must not be {@code null}
+ * @return a completion stage yielding the updated state of the session after the
+ * close operation has been applied
+ * @throws NullPointerException if {@code sessionId} is {@code null}
+ * @see SessionHandle#close()
+ * @see SessionState
+ */
+ public CompletionStage
+ * Cancelling a session instructs the control plane to stop all remaining work associated
+ * with the session. Depending on the scheduling and execution state, running tasks may be
+ * interrupted, and queued tasks will no longer be scheduled.
+ *
+ * A cancelled session still retains its metadata, and completed task results remain
+ * accessible unless the session is later purged or deleted. Cancellation is useful
+ * when the overall workflow must be terminated early due to failure or user decision.
+ *
+ * This client-level convenience method is intended for scenarios where the caller
+ * does not have a {@link SessionHandle} instance. When such a handle is available,
+ * prefer calling {@link SessionHandle#cancel()} to keep lifecycle operations grouped
+ * with the associated session.
+ *
+ * The returned {@link CompletionStage} completes asynchronously with the updated
+ * {@link SessionState} returned by the Sessions service, or completes exceptionally
+ * if the request fails or the session does not exist.
+ *
+ * @param sessionId the identifier of the session to cancel; must not be {@code null}
+ * @return a completion stage yielding the updated state of the session after the
+ * cancellation operation has been applied
+ * @throws NullPointerException if {@code sessionId} is {@code null}
+ * @see SessionHandle#cancel()
+ * @see SessionState
+ */
+ public CompletionStage
- * This constructor initializes the handle with the necessary components for task submission
- * and output processing within the session context. The handle will use the provided
- * gRPC channel for all cluster communications.
+ * It initializes the handle with the session metadata and configures task submission
+ * and optional output blob monitoring using the provided {@link BlobCompletionListener}.
+ *
+ * If an output listener is supplied, this handle automatically begins monitoring
+ * output blob completions using the default batching strategy (not exposed in the public API).
*
- * @param sessionInfo the immutable session metadata including session ID and configuration
- * @param sessionDefinition the session definition used for task default configurations
- * @param channelPool the gRPC channel pool for cluster communication
- * @throws NullPointerException if any parameter is null
+ * @param sessionInfo immutable metadata describing the session, including the session ID
+ * @param outputListener optional listener notified when output blobs reach completion; may be {@code null}
+ * @param channelPool the gRPC channel pool used for communication with the cluster
+ * @throws NullPointerException if {@code sessionInfo} or {@code channelPool} is {@code null}
* @see SessionInfo
- * @see SessionDefinition
+ * @see BlobCompletionListener
+ * @see ArmoniKClient#openSession(SessionDefinition)
+ * @see ArmoniKClient#getSession(SessionId, BlobCompletionListener)
*/
- SessionHandle(SessionInfo sessionInfo, SessionDefinition sessionDefinition, ChannelPool channelPool) {
- requireNonNull(sessionDefinition, "sessionDefinition must not be null");
-
+ SessionHandle(SessionInfo sessionInfo, BlobCompletionListener outputListener, ChannelPool channelPool) {
this.sessionInfo = requireNonNull(sessionInfo, "sessionInfo must not be null");
- this.taskSubmitter = new TaskSubmitter(sessionInfo.id(), sessionDefinition, channelPool);
+ this.taskSubmitter = new TaskSubmitter(sessionInfo.id(), sessionInfo.taskConfiguration(), outputListener, channelPool);
this.channelPool = requireNonNull(channelPool, "channelPool must not be null");
}
@@ -185,6 +189,176 @@ public BlobHandle createBlob(InputBlobDefinition blobDefinition) {
return blobHandle;
}
+ /**
+ * Requests cancellation of this session in the ArmoniK cluster.
+ *
+ * Cancelling a session signals the control plane to stop all remaining work associated
+ * with this session. Depending on the server configuration and current task states,
+ * running tasks may be interrupted and queued tasks will no longer be scheduled.
+ * Results that have already been produced remain accessible unless the session
+ * is subsequently purged or deleted.
+ *
+ * The returned completion stage is completed asynchronously with the updated
+ * {@link SessionState} once the cancellation request has been processed by the
+ * Sessions service, or completed exceptionally if the request fails.
+ *
+ * @return a completion stage that yields the updated state of this session after
+ * the cancellation request has been applied
+ *
+ * @see SessionState
+ */
+ public CompletionStage
+ * Pausing a session temporarily suspends the scheduling of new tasks in the session.
+ * Tasks that are already running continue until completion, but pending tasks are
+ * not picked up for execution until the session is resumed via {@link #resume()}.
+ *
+ * This operation is useful when you need to temporarily throttle or stop processing
+ * without cancelling the session or losing its state.
+ * The returned completion stage is completed asynchronously with the updated
+ * {@link SessionState} once the pause request has been processed by the Sessions
+ * service, or completed exceptionally if the request fails.
+ *
+ * @return a completion stage that yields the updated state of this session after
+ * the pause request has been applied
+ *
+ * @see #resume()
+ * @see SessionState
+ */
+ public CompletionStage
+ * Resuming a session re-enables scheduling of tasks that were previously held
+ * while the session was paused. Any pending tasks become eligible for execution
+ * again according to the cluster scheduling policies.
+ *
+ * Calling this method on a session that is not paused is safe; the Sessions service
+ * will simply return the current state of the session.
+ * The returned completion stage is completed asynchronously with the updated
+ * {@link SessionState} once the resume request has been processed by the Sessions
+ * service, or completed exceptionally if the request fails.
+ *
+ * @return a completion stage that yields the updated state of this session after
+ * the resume request has been applied
+ *
+ * @see #pause()
+ * @see SessionState
+ */
+ public CompletionStage
+ * Closing a session finalizes it and prevents any new task submissions, while
+ * preserving existing tasks, results, and metadata. This is the recommended way
+ * to indicate that no further work will be submitted for this session once all
+ * expected tasks have been created.
+ *
+ * Closing a session does not remove its data. To free up storage or completely
+ * remove the session, combine this operation with {@link #purge()} and
+ * {@link #delete()} as appropriate.
+ * The returned completion stage is completed asynchronously with the updated
+ * {@link SessionState} once the close request has been processed by the Sessions
+ * service, or completed exceptionally if the request fails.
+ *
+ * @return a completion stage that yields the updated state of this session after
+ * the close request has been applied
+ *
+ * @see #purge()
+ * @see #delete()
+ * @see SessionState
+ */
+ public CompletionStage
+ * Purging a session removes the underlying data for its blobs (task inputs and
+ * outputs) from the storage layer while keeping the session and task metadata.
+ * This operation is useful to reclaim storage space once the actual data is no
+ * longer needed but you still want to keep an audit trail or execution history.
+ *
+ * After a purge, attempts to download blob data associated with this session
+ * will fail, but session and task information remain available until the session
+ * is deleted.
+ * The returned completion stage is completed asynchronously with the updated
+ * {@link SessionState} once the purge request has been processed by the Sessions
+ * service, or completed exceptionally if the request fails.
+ *
+ * @return a completion stage that yields the updated state of this session after
+ * the purge request has been applied
+ *
+ * @see #delete()
+ * @see SessionState
+ */
+ public CompletionStage
+ * Deleting a session permanently removes its metadata from the Sessions, Tasks,
+ * and Blobs. This is typically the final step in the lifecycle of a
+ * session once it has been closed and, optionally, purged.
+ *
+ * After deletion, this handle still exists as a local object but any further
+ * interaction with the remote session (such as submitting tasks or querying
+ * state) will fail because the session no longer exists in the cluster.
+ * Clients are expected to discard the handle after deletion.
+ * The returned completion stage is completed asynchronously with the updated
+ * {@link SessionState} reported by the Sessions service just before removal,
+ * or completed exceptionally if the request fails.
+ *
+ * @return a completion stage that yields the last known state of this session
+ * before it is removed from the cluster
+ *
+ * @see #close()
+ * @see #purge()
+ * @see SessionState
+ */
+ public CompletionStage
+ * A {@code SessionState} instance mirrors the information returned by the Sessions service
+ * for a given session. It aggregates the current lifecycle status, submission flags,
+ * configuration and key timestamps into a single value object. Instances of this record
+ * are typically obtained from session lifecycle operations such as
+ * {@link SessionHandle#cancel()}, {@link SessionHandle#pause()}, {@link SessionHandle#resume()},
+ * {@link SessionHandle#close()}, {@link SessionHandle#purge()} and {@link SessionHandle#delete()},
+ * or from {@link ArmoniKClient#closeSession(SessionId)}.
+ *
+ * Time-related fields may be {@code null} when the corresponding lifecycle transition
+ * has not occurred yet. For example, {@code cancelledAt} is only populated when the
+ * session has been cancelled, and {@code closedAt} is only populated after the session
+ * has been closed. The {@code duration} field is generally set when the session is in a
+ * terminal state (such as {@code CANCELLED} or {@code CLOSED}) and represents the elapsed
+ * time between creation and termination as computed by the control plane.
+ *
+ * @param sessionId the unique identifier of the session
+ * @param status the current lifecycle status of the session as reported by the cluster
+ * @param clientSubmission whether clients are currently allowed to submit new tasks in this session
+ * @param workerSubmission whether workers are currently allowed to submit tasks in this session
+ * @param partitionIds the set of partition identifiers associated with this session; determines where
+ * tasks may be scheduled
+ * @param taskConfiguration the default task configuration applied to tasks created in this session
+ * @param createdAt the instant at which the session was created in the cluster; never {@code null}
+ * for a valid session
+ * @param cancelledAt the instant at which the session was cancelled, or {@code null} if the session
+ * has not been cancelled
+ * @param closedAt the instant at which the session was closed, or {@code null} if the session
+ * has not been closed
+ * @param purgedAt the instant at which the session's data was purged from storage, or {@code null}
+ * if no purge has been performed
+ * @param deletedAt the instant at which the session was deleted from the control plane, or {@code null}
+ * if the session still exists
+ * @param duration the total duration of the session as computed by the control plane, typically set
+ * when the session reaches a terminal state; may be {@code null} otherwise
+ * @see SessionHandle
+ * @see SessionStatus
+ * @see TaskConfiguration
+ */
+public record SessionState(
+ SessionId sessionId,
+ SessionStatus status,
+ boolean clientSubmission,
+ boolean workerSubmission,
+ List
- * The task submitter will use the session's configuration for default task settings and
- * will optionally set up blob completion coordination if an output listener is configured
- * in the session definition.
+ * This constructor is used internally by {@link SessionHandle} when a session is opened
+ * or retrieved via {@link ArmoniKClient}. It configures the default task settings for the
+ * session and, if an {@link BlobCompletionListener} is provided, initializes output blob
+ * completion coordination using the default batching strategy.
+ *
+ * When {@code taskConfiguration} is {@code null}, the global
+ * {@link TaskConfiguration#defaultConfiguration() default configuration} is applied.
*
* @param sessionId the identifier of the session context for task submission
- * @param sessionDefinition the session definition containing task configuration and output listener
+ * @param taskConfiguration the default task configuration to apply for submissions in this session;
+ * may be {@code null} to use the global default
+ * @param outputListener optional listener for output blob completion events; may be {@code null}
* @param channelPool the gRPC channel pool for cluster communication
- * @throws NullPointerException if any parameter is null
- * @see SessionDefinition
+ * @throws NullPointerException if {@code sessionId} or {@code channelPool} is {@code null}
+ * @see SessionHandle
+ * @see TaskConfiguration
+ * @see BlobCompletionListener
* @see BlobCompletionCoordinator
*/
TaskSubmitter(SessionId sessionId,
- SessionDefinition sessionDefinition,
+ TaskConfiguration taskConfiguration,
+ BlobCompletionListener outputListener,
ChannelPool channelPool) {
- this.sessionId = sessionId;
- this.defaultTaskConfiguration = sessionDefinition.taskConfiguration() != null ? sessionDefinition.taskConfiguration() : defaultConfiguration();
- this.channelPool = channelPool;
-
- if (sessionDefinition.outputListener() != null) {
- this.blobCompletionCoordinator = new BlobCompletionCoordinator(sessionId, channelPool, sessionDefinition);
+ this.sessionId = requireNonNull(sessionId);
+ this.defaultTaskConfiguration = requireNonNullElse(taskConfiguration, defaultConfiguration());
+ this.channelPool = requireNonNull(channelPool);
+ if (outputListener != null) {
+ this.blobCompletionCoordinator = new BlobCompletionCoordinator(sessionId, channelPool, outputListener);
}
}
@@ -198,7 +207,11 @@ TaskHandle submit(TaskDefinition taskDefinition) {
* @see SessionHandle#awaitOutputsProcessed()
*/
CompletionStage
*
*
* @throws NullPointerException if partitionIds is null
* @throws IllegalArgumentException if partitionIds is empty or if task configuration
* specifies a partition not included in the session's partition set
* @see TaskConfiguration#defaultConfiguration()
- * @see BatchingPolicy#DEFAULT
*/
public SessionDefinition {
- taskConfiguration = taskConfiguration == null ? TaskConfiguration.defaultConfiguration() : taskConfiguration;
- partitionIds = partitionIds == null ? Set.of() : partitionIds;
- outputBatchingPolicy = outputBatchingPolicy == null ? BatchingPolicy.DEFAULT : outputBatchingPolicy;
+ taskConfiguration = requireNonNullElse(taskConfiguration, TaskConfiguration.defaultConfiguration());
+ partitionIds = requireNonNullElse(partitionIds, Set.of());
validateTaskDefaultPartition(partitionIds, taskConfiguration);
}
diff --git a/armonik-client/src/main/java/fr/aneo/armonik/client/internal/concurrent/Schedulers.java b/armonik-client/src/main/java/fr/aneo/armonik/client/internal/concurrent/Schedulers.java
index 9f76080..fb876eb 100644
--- a/armonik-client/src/main/java/fr/aneo/armonik/client/internal/concurrent/Schedulers.java
+++ b/armonik-client/src/main/java/fr/aneo/armonik/client/internal/concurrent/Schedulers.java
@@ -15,8 +15,6 @@
*/
package fr.aneo.armonik.client.internal.concurrent;
-import fr.aneo.armonik.client.BatchingPolicy;
-
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -38,7 +36,6 @@
* A JVM shutdown hook is automatically registered to properly shut down the executor when
* the application terminates, ensuring clean resource cleanup.
*
- * @see BatchingPolicy
*/
public class Schedulers {
diff --git a/armonik-client/src/main/java/fr/aneo/armonik/client/internal/grpc/mappers/SessionMapper.java b/armonik-client/src/main/java/fr/aneo/armonik/client/internal/grpc/mappers/SessionMapper.java
index 3e85327..9c1182c 100644
--- a/armonik-client/src/main/java/fr/aneo/armonik/client/internal/grpc/mappers/SessionMapper.java
+++ b/armonik-client/src/main/java/fr/aneo/armonik/client/internal/grpc/mappers/SessionMapper.java
@@ -15,9 +15,18 @@
*/
package fr.aneo.armonik.client.internal.grpc.mappers;
+import com.google.protobuf.Timestamp;
+import fr.aneo.armonik.api.grpc.v1.sessions.SessionStatusOuterClass;
+import fr.aneo.armonik.client.SessionId;
+import fr.aneo.armonik.client.SessionState;
+import fr.aneo.armonik.client.SessionStatus;
import fr.aneo.armonik.client.definition.SessionDefinition;
-import static fr.aneo.armonik.api.grpc.v1.sessions.SessionsCommon.CreateSessionRequest;
+import java.time.Duration;
+import java.time.Instant;
+
+import static fr.aneo.armonik.api.grpc.v1.sessions.SessionsCommon.*;
+import static fr.aneo.armonik.client.internal.grpc.mappers.TaskMapper.toTaskConfiguration;
import static fr.aneo.armonik.client.internal.grpc.mappers.TaskMapper.toTaskOptions;
public final class SessionMapper {
@@ -31,4 +40,72 @@ public static CreateSessionRequest toCreateSessionRequest(SessionDefinition sess
.setDefaultTaskOption(toTaskOptions(sessionDefinition.taskConfiguration()))
.build();
}
+
+ public static GetSessionRequest toGetSessionRequest(SessionId sessionId) {
+ return GetSessionRequest.newBuilder()
+ .setSessionId(sessionId.asString())
+ .build();
+ }
+
+ public static CancelSessionRequest toCancelSessionRequest(SessionId sessionId) {
+ return CancelSessionRequest.newBuilder().setSessionId(sessionId.asString()).build();
+ }
+
+ public static PauseSessionRequest toPauseSessionRequest(SessionId sessionId) {
+ return PauseSessionRequest.newBuilder().setSessionId(sessionId.asString()).build();
+ }
+
+ public static ResumeSessionRequest toResumeSessionRequest(SessionId sessionId) {
+ return ResumeSessionRequest.newBuilder().setSessionId(sessionId.asString()).build();
+ }
+
+ public static CloseSessionRequest toCloseSessionRequest(SessionId sessionId) {
+ return CloseSessionRequest.newBuilder().setSessionId(sessionId.asString()).build();
+ }
+
+ public static PurgeSessionRequest toPurgeSessionRequest(SessionId sessionId) {
+ return PurgeSessionRequest.newBuilder().setSessionId(sessionId.asString()).build();
+ }
+
+ public static DeleteSessionRequest toDeleteSessionRequest(SessionId sessionId) {
+ return DeleteSessionRequest.newBuilder().setSessionId(sessionId.asString()).build();
+ }
+
+ public static SessionState toSessionState(SessionRaw sessionRaw) {
+ return new SessionState(
+ SessionId.from(sessionRaw.getSessionId()),
+ toSessionStatus(sessionRaw.getStatus()),
+ sessionRaw.getClientSubmission(),
+ sessionRaw.getWorkerSubmission(),
+ sessionRaw.getPartitionIdsList(),
+ toTaskConfiguration(sessionRaw.getOptions()),
+ sessionRaw.hasCreatedAt() ? toInstant(sessionRaw.getCreatedAt()) : null,
+ sessionRaw.hasCancelledAt() ? toInstant(sessionRaw.getCancelledAt()) : null,
+ sessionRaw.hasClosedAt() ? toInstant(sessionRaw.getClosedAt()) : null,
+ sessionRaw.hasPurgedAt() ? toInstant(sessionRaw.getPurgedAt()) : null,
+ sessionRaw.hasDeletedAt() ? toInstant(sessionRaw.getDeletedAt()) : null,
+ sessionRaw.hasDuration() ? toDuration(sessionRaw.getDuration()) : null
+ );
+ }
+
+ private static SessionStatus toSessionStatus(SessionStatusOuterClass.SessionStatus sessionStatus) {
+ return switch (sessionStatus) {
+ case SESSION_STATUS_UNSPECIFIED -> SessionStatus.UNSPECIFIED;
+ case SESSION_STATUS_RUNNING -> SessionStatus.RUNNING;
+ case SESSION_STATUS_PAUSED -> SessionStatus.PAUSED;
+ case SESSION_STATUS_CANCELLED -> SessionStatus.CANCELLED;
+ case SESSION_STATUS_CLOSED -> SessionStatus.CLOSED;
+ case SESSION_STATUS_PURGED -> SessionStatus.PURGED;
+ case SESSION_STATUS_DELETED -> SessionStatus.DELETED;
+ default -> throw new IllegalArgumentException("Unknown session status: " + sessionStatus);
+ };
+ }
+
+ private static Duration toDuration(com.google.protobuf.Duration protobufDuration) {
+ return Duration.ofSeconds(protobufDuration.getSeconds(), protobufDuration.getNanos());
+ }
+
+ private static Instant toInstant(Timestamp timestamp) {
+ return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos());
+ }
}
diff --git a/armonik-client/src/main/java/fr/aneo/armonik/client/internal/grpc/mappers/TaskMapper.java b/armonik-client/src/main/java/fr/aneo/armonik/client/internal/grpc/mappers/TaskMapper.java
index e9e9bb3..d3edbed 100644
--- a/armonik-client/src/main/java/fr/aneo/armonik/client/internal/grpc/mappers/TaskMapper.java
+++ b/armonik-client/src/main/java/fr/aneo/armonik/client/internal/grpc/mappers/TaskMapper.java
@@ -59,4 +59,15 @@ public static TaskOptions toTaskOptions(TaskConfiguration taskConfiguration) {
.putAllOptions(taskConfiguration.options())
.build();
}
+
+ public static TaskConfiguration toTaskConfiguration(TaskOptions taskOptions) {
+ return new TaskConfiguration(
+ taskOptions.getMaxRetries(),
+ taskOptions.getPriority(),
+ taskOptions.getPartitionId(),
+ java.time.Duration.ofSeconds(taskOptions.getMaxDuration().getSeconds(), taskOptions.getMaxDuration().getNanos()),
+ taskOptions.getOptionsMap()
+ );
+ }
+
}
diff --git a/armonik-client/src/test/java/fr/aneo/armonik/client/ArmoniKClientTest.java b/armonik-client/src/test/java/fr/aneo/armonik/client/ArmoniKClientTest.java
index ac5ba70..e7b1abd 100644
--- a/armonik-client/src/test/java/fr/aneo/armonik/client/ArmoniKClientTest.java
+++ b/armonik-client/src/test/java/fr/aneo/armonik/client/ArmoniKClientTest.java
@@ -16,6 +16,7 @@
package fr.aneo.armonik.client;
import fr.aneo.armonik.client.definition.SessionDefinition;
+import fr.aneo.armonik.client.exception.ArmoniKException;
import fr.aneo.armonik.client.testutils.InProcessGrpcTestBase;
import fr.aneo.armonik.client.testutils.SessionsGrpcMock;
import io.grpc.BindableService;
@@ -46,6 +47,7 @@ void setUp() {
}
@Test
+ @DisplayName("should open a new session successfully")
void should_open_a_new_session_successfully() {
// When
var sessionHandle = client.openSession(sessionDefinition);
@@ -64,6 +66,63 @@ void should_open_a_new_session_successfully() {
assertThat(sessionsGrpcMock.submittedCreateSessionRequest.getDefaultTaskOption().getOptionsMap()).isEqualTo(Map.of("option1", "value1"));
}
+ @Test
+ @DisplayName("should get an existing session")
+ void should_get_an_existing_session() {
+ // Given
+ var sessionId = SessionId.from("session_1");
+
+ // When
+ var sessionHandle = client.getSession(sessionId);
+
+ // Then
+ assertThat(sessionsGrpcMock.submittedGetSessionRequest.getSessionId()).isEqualTo(sessionId.asString());
+ assertThat(sessionHandle.sessionInfo()).isEqualTo(
+ new SessionInfo(
+ sessionId,
+ Set.of("partition1", "partition2"),
+ new TaskConfiguration(2, 5, "partition1", Duration.ofMinutes(60), Map.of("option1", "value1"))
+ ));
+ }
+
+ @Test
+ @DisplayName("should close a session")
+ void should_close_an_existing_session() {
+ // Given
+ var sessionId = SessionId.from("session_1");
+
+ // When
+ client.closeSession(sessionId);
+
+ // Then
+ assertThat(sessionsGrpcMock.submittedCloseSessionRequest).isNotNull();
+ assertThat(sessionsGrpcMock.submittedCloseSessionRequest.getSessionId()).isEqualTo(sessionId.asString());
+ }
+
+ @Test
+ @DisplayName("should cancel a session")
+ void should_cancel_an_existing_session() {
+ // Given
+ var sessionId = SessionId.from("session_1");
+
+ // When
+ client.cancelSession(sessionId);
+
+ // Then
+ assertThat(sessionsGrpcMock.submittedCancelSessionRequest).isNotNull();
+ assertThat(sessionsGrpcMock.submittedCancelSessionRequest.getSessionId()).isEqualTo(sessionId.asString());
+ }
+
+ @Test
+ @DisplayName("should throw exception when session does not exist")
+ void should_throw_exception_when_session_does_not_exist() {
+ // Given
+ var sessionId = SessionId.from("does not exist");
+
+ // When / Then
+ assertThatThrownBy(() -> client.getSession(sessionId)).isInstanceOf(ArmoniKException.class);
+ }
+
@Test
@DisplayName("should close channel pool when client is closed")
void should_close_channel_pool_when_client_is_closed() {
diff --git a/armonik-client/src/test/java/fr/aneo/armonik/client/SessionHandleTest.java b/armonik-client/src/test/java/fr/aneo/armonik/client/SessionHandleTest.java
index 13928ba..d3163d6 100644
--- a/armonik-client/src/test/java/fr/aneo/armonik/client/SessionHandleTest.java
+++ b/armonik-client/src/test/java/fr/aneo/armonik/client/SessionHandleTest.java
@@ -16,14 +16,10 @@
package fr.aneo.armonik.client;
import com.google.gson.Gson;
-import fr.aneo.armonik.client.definition.SessionDefinition;
import fr.aneo.armonik.client.definition.TaskDefinition;
import fr.aneo.armonik.client.definition.blob.InputBlobDefinition;
import fr.aneo.armonik.client.definition.blob.OutputBlobDefinition;
-import fr.aneo.armonik.client.testutils.EventsGrpcMock;
-import fr.aneo.armonik.client.testutils.InProcessGrpcTestBase;
-import fr.aneo.armonik.client.testutils.ResultsGrpcMock;
-import fr.aneo.armonik.client.testutils.TasksGrpcMock;
+import fr.aneo.armonik.client.testutils.*;
import io.grpc.BindableService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
@@ -33,7 +29,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
@@ -42,7 +37,7 @@
import static fr.aneo.armonik.client.TestDataFactory.blobHandle;
import static fr.aneo.armonik.client.TestDataFactory.sessionInfo;
import static fr.aneo.armonik.client.WorkerLibrary.*;
-import static fr.aneo.armonik.client.testutils.ResultsGrpcMock.*;
+import static fr.aneo.armonik.client.testutils.ResultsGrpcMock.MetadataRequest;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -52,26 +47,22 @@ class SessionHandleTest extends InProcessGrpcTestBase {
private final ResultsGrpcMock resultsGrpcMock = new ResultsGrpcMock();
private final TasksGrpcMock taskGrpcMock = new TasksGrpcMock();
private final EventsGrpcMock eventsGrpcMock = new EventsGrpcMock();
+ private final SessionsGrpcMock sessionsGrpcMock = new SessionsGrpcMock();
private BlobCompletionListenerMock outputTaskListener;
private SessionInfo sessionInfo;
private SessionHandle sessionHandle;
@Override
protected List