Skip to content

Commit cc9e367

Browse files
committed
feat(client): add full session lifecycle management
1 parent 254aa9e commit cc9e367

9 files changed

Lines changed: 756 additions & 41 deletions

File tree

armonik-client/src/main/java/fr/aneo/armonik/client/ArmoniKClient.java

Lines changed: 82 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
import fr.aneo.armonik.api.grpc.v1.sessions.SessionsGrpc;
1919
import fr.aneo.armonik.client.definition.SessionDefinition;
2020
import fr.aneo.armonik.client.exception.ArmoniKException;
21+
import fr.aneo.armonik.client.internal.concurrent.Futures;
2122
import io.grpc.ManagedChannel;
2223

2324
import java.time.Duration;
2425
import java.util.HashSet;
26+
import java.util.concurrent.CompletionStage;
2527

26-
import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.toCreateSessionRequest;
27-
import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.toGetSessionRequest;
28+
import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.*;
29+
import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.toSessionState;
2830
import static fr.aneo.armonik.client.internal.grpc.mappers.TaskMapper.toTaskConfiguration;
2931
import static java.util.Objects.requireNonNull;
3032

@@ -126,19 +128,12 @@ public SessionHandle openSession(SessionDefinition sessionDefinition) {
126128
* <p><strong>Error Handling:</strong>
127129
* If the session does not exist, this method throws an {@link ArmoniKException}
128130
*
129-
* @param sessionId
130-
* the identifier of the existing session to connect to
131-
* @param outputListener
132-
* an optional listener for output blob completion events;
133-
* if {@code null}, no listener is registered
134-
*
131+
* @param sessionId the identifier of the existing session to connect to
132+
* @param outputListener an optional listener for output blob completion events;
133+
* if {@code null}, no listener is registered
135134
* @return a {@link SessionHandle} bound to the existing session
136-
*
137-
* @throws NullPointerException
138-
* if {@code sessionId} is {@code null}
139-
* @throws ArmoniKException
140-
* if the session does not exist, or if a communication error occurs
141-
*
135+
* @throws NullPointerException if {@code sessionId} is {@code null}
136+
* @throws ArmoniKException if the session does not exist, or if a communication error occurs
142137
* @see SessionHandle
143138
* @see BlobCompletionListener
144139
*/
@@ -175,22 +170,86 @@ public SessionHandle getSession(SessionId sessionId, BlobCompletionListener outp
175170
* <p><strong>Error Handling:</strong>
176171
* If the session does not exist, this method throws an {@link ArmoniKException}.
177172
*
178-
* @param sessionId
179-
* the identifier of the existing session to connect to
180-
*
173+
* @param sessionId the identifier of the existing session to connect to
181174
* @return a {@link SessionHandle} for interacting with the session
182-
*
183-
* @throws NullPointerException
184-
* if {@code sessionId} is {@code null}
185-
* @throws ArmoniKException
186-
* if the session does not exist, or if a communication error occurs
187-
*
175+
* @throws NullPointerException if {@code sessionId} is {@code null}
176+
* @throws ArmoniKException if the session does not exist, or if a communication error occurs
188177
* @see #getSession(SessionId, BlobCompletionListener)
189178
*/
190179
public SessionHandle getSession(SessionId sessionId) {
191180
return getSession(sessionId, null);
192181
}
193182

183+
/**
184+
* Closes a session in the ArmoniK cluster by its identifier.
185+
* <p>
186+
* A closed session no longer accepts new task submissions, but all existing tasks, results,
187+
* and metadata remain available in the control plane. Closing a session is the recommended
188+
* finalization step once no additional tasks will be submitted.
189+
* <p>
190+
* This client-level method is provided as a convenience when the caller does not
191+
* hold a {@link SessionHandle}. When a {@code SessionHandle} instance is available,
192+
* prefer calling {@link SessionHandle#close()} so that all lifecycle-related
193+
* operations remain grouped on the handle itself.
194+
* <p>
195+
* The returned {@link CompletionStage} completes asynchronously with the updated
196+
* {@link SessionState} returned by the Sessions service, or completes exceptionally
197+
* if the request fails or the session does not exist.
198+
*
199+
* @param sessionId the identifier of the session to close; must not be {@code null}
200+
* @return a completion stage yielding the updated state of the session after the
201+
* close operation has been applied
202+
* @throws NullPointerException if {@code sessionId} is {@code null}
203+
* @see SessionHandle#close()
204+
* @see SessionState
205+
*/
206+
public CompletionStage<SessionState> closeSession(SessionId sessionId) {
207+
requireNonNull(sessionId, "sessionId must not be null");
208+
209+
return channelPool.executeAsync(channel -> {
210+
var sessionsFutureStub = SessionsGrpc.newFutureStub(channel);
211+
return Futures.toCompletionStage(sessionsFutureStub.closeSession(toCloseSessionRequest(sessionId)))
212+
.thenApply(response -> toSessionState(response.getSession()));
213+
});
214+
}
215+
216+
/**
217+
* Cancels a session in the ArmoniK cluster by its identifier.
218+
* <p>
219+
* Cancelling a session instructs the control plane to stop all remaining work associated
220+
* with the session. Depending on the scheduling and execution state, running tasks may be
221+
* interrupted, and queued tasks will no longer be scheduled.
222+
* <p>
223+
* A cancelled session still retains its metadata, and completed task results remain
224+
* accessible unless the session is later purged or deleted. Cancellation is useful
225+
* when the overall workflow must be terminated early due to failure or user decision.
226+
* <p>
227+
* This client-level convenience method is intended for scenarios where the caller
228+
* does not have a {@link SessionHandle} instance. When such a handle is available,
229+
* prefer calling {@link SessionHandle#cancel()} to keep lifecycle operations grouped
230+
* with the associated session.
231+
* <p>
232+
* The returned {@link CompletionStage} completes asynchronously with the updated
233+
* {@link SessionState} returned by the Sessions service, or completes exceptionally
234+
* if the request fails or the session does not exist.
235+
*
236+
* @param sessionId the identifier of the session to cancel; must not be {@code null}
237+
* @return a completion stage yielding the updated state of the session after the
238+
* cancellation operation has been applied
239+
* @throws NullPointerException if {@code sessionId} is {@code null}
240+
* @see SessionHandle#cancel()
241+
* @see SessionState
242+
*/
243+
public CompletionStage<SessionState> cancelSession(SessionId sessionId) {
244+
requireNonNull(sessionId, "sessionId must not be null");
245+
246+
return channelPool.executeAsync(channel -> {
247+
var sessionsFutureStub = SessionsGrpc.newFutureStub(channel);
248+
return Futures.toCompletionStage(sessionsFutureStub.cancelSession(toCancelSessionRequest(sessionId)))
249+
.thenApply(response -> toSessionState(response.getSession()));
250+
});
251+
}
252+
194253
/**
195254
* Closes this client and releases all associated resources.
196255
* <p>

armonik-client/src/main/java/fr/aneo/armonik/client/SessionHandle.java

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package fr.aneo.armonik.client;
1717

1818
import fr.aneo.armonik.api.grpc.v1.results.ResultsGrpc;
19+
import fr.aneo.armonik.api.grpc.v1.sessions.SessionsGrpc;
1920
import fr.aneo.armonik.client.definition.SessionDefinition;
2021
import fr.aneo.armonik.client.definition.TaskDefinition;
2122
import fr.aneo.armonik.client.definition.blob.BlobDefinition;
@@ -29,6 +30,7 @@
2930
import java.util.function.Function;
3031

3132
import static fr.aneo.armonik.client.internal.grpc.mappers.BlobMapper.toResultMetaDataRequest;
33+
import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.*;
3234
import static java.util.Objects.requireNonNull;
3335

3436
/**
@@ -187,6 +189,176 @@ public BlobHandle createBlob(InputBlobDefinition blobDefinition) {
187189
return blobHandle;
188190
}
189191

192+
/**
193+
* Requests cancellation of this session in the ArmoniK cluster.
194+
* <p>
195+
* Cancelling a session signals the control plane to stop all remaining work associated
196+
* with this session. Depending on the server configuration and current task states,
197+
* running tasks may be interrupted and queued tasks will no longer be scheduled.
198+
* Results that have already been produced remain accessible unless the session
199+
* is subsequently purged or deleted.
200+
* <p>
201+
* The returned completion stage is completed asynchronously with the updated
202+
* {@link SessionState} once the cancellation request has been processed by the
203+
* Sessions service, or completed exceptionally if the request fails.
204+
*
205+
* @return a completion stage that yields the updated state of this session after
206+
* the cancellation request has been applied
207+
*
208+
* @see SessionState
209+
*/
210+
public CompletionStage<SessionState> cancel() {
211+
return channelPool.executeAsync(channel -> {
212+
var sessionsFutureStub = SessionsGrpc.newFutureStub(channel);
213+
return Futures.toCompletionStage(sessionsFutureStub.cancelSession(toCancelSessionRequest(sessionInfo.id())))
214+
.thenApply(response -> toSessionState(response.getSession()));
215+
});
216+
}
217+
218+
/**
219+
* Pauses this session in the ArmoniK cluster.
220+
* <p>
221+
* Pausing a session temporarily suspends the scheduling of new tasks in the session.
222+
* Tasks that are already running continue until completion, but pending tasks are
223+
* not picked up for execution until the session is resumed via {@link #resume()}.
224+
* <p>
225+
* This operation is useful when you need to temporarily throttle or stop processing
226+
* without cancelling the session or losing its state.
227+
* The returned completion stage is completed asynchronously with the updated
228+
* {@link SessionState} once the pause request has been processed by the Sessions
229+
* service, or completed exceptionally if the request fails.
230+
*
231+
* @return a completion stage that yields the updated state of this session after
232+
* the pause request has been applied
233+
*
234+
* @see #resume()
235+
* @see SessionState
236+
*/
237+
public CompletionStage<SessionState> pause() {
238+
return channelPool.executeAsync(channel -> {
239+
var sessionsFutureStub = SessionsGrpc.newFutureStub(channel);
240+
return Futures.toCompletionStage(sessionsFutureStub.pauseSession(toPauseSessionRequest(sessionInfo.id())))
241+
.thenApply(response -> toSessionState(response.getSession()));
242+
});
243+
}
244+
/**
245+
* Resumes a previously paused session in the ArmoniK cluster.
246+
* <p>
247+
* Resuming a session re-enables scheduling of tasks that were previously held
248+
* while the session was paused. Any pending tasks become eligible for execution
249+
* again according to the cluster scheduling policies.
250+
* <p>
251+
* Calling this method on a session that is not paused is safe; the Sessions service
252+
* will simply return the current state of the session.
253+
* The returned completion stage is completed asynchronously with the updated
254+
* {@link SessionState} once the resume request has been processed by the Sessions
255+
* service, or completed exceptionally if the request fails.
256+
*
257+
* @return a completion stage that yields the updated state of this session after
258+
* the resume request has been applied
259+
*
260+
* @see #pause()
261+
* @see SessionState
262+
*/
263+
public CompletionStage<SessionState> resume() {
264+
return channelPool.executeAsync(channel -> {
265+
var sessionsFutureStub = SessionsGrpc.newFutureStub(channel);
266+
return Futures.toCompletionStage(sessionsFutureStub.resumeSession(toResumeSessionRequest(sessionInfo.id())))
267+
.thenApply(response -> toSessionState(response.getSession()));
268+
});
269+
}
270+
271+
/**
272+
* Closes this session in the ArmoniK cluster.
273+
* <p>
274+
* Closing a session finalizes it and prevents any new task submissions, while
275+
* preserving existing tasks, results, and metadata. This is the recommended way
276+
* to indicate that no further work will be submitted for this session once all
277+
* expected tasks have been created.
278+
* <p>
279+
* Closing a session does not remove its data. To free up storage or completely
280+
* remove the session, combine this operation with {@link #purge()} and
281+
* {@link #delete()} as appropriate.
282+
* The returned completion stage is completed asynchronously with the updated
283+
* {@link SessionState} once the close request has been processed by the Sessions
284+
* service, or completed exceptionally if the request fails.
285+
*
286+
* @return a completion stage that yields the updated state of this session after
287+
* the close request has been applied
288+
*
289+
* @see #purge()
290+
* @see #delete()
291+
* @see SessionState
292+
*/
293+
public CompletionStage<SessionState> close() {
294+
return channelPool.executeAsync(channel -> {
295+
var sessionsFutureStub = SessionsGrpc.newFutureStub(channel);
296+
return Futures.toCompletionStage(sessionsFutureStub.closeSession(toCloseSessionRequest(sessionInfo.id())))
297+
.thenApply(response -> toSessionState(response.getSession()));
298+
});
299+
}
300+
301+
/**
302+
* Purges this session's data in the ArmoniK cluster.
303+
* <p>
304+
* Purging a session removes the underlying data for its blobs (task inputs and
305+
* outputs) from the storage layer while keeping the session and task metadata.
306+
* This operation is useful to reclaim storage space once the actual data is no
307+
* longer needed but you still want to keep an audit trail or execution history.
308+
* <p>
309+
* After a purge, attempts to download blob data associated with this session
310+
* will fail, but session and task information remain available until the session
311+
* is deleted.
312+
* The returned completion stage is completed asynchronously with the updated
313+
* {@link SessionState} once the purge request has been processed by the Sessions
314+
* service, or completed exceptionally if the request fails.
315+
*
316+
* @return a completion stage that yields the updated state of this session after
317+
* the purge request has been applied
318+
*
319+
* @see #delete()
320+
* @see SessionState
321+
*/
322+
public CompletionStage<SessionState> purge() {
323+
return channelPool.executeAsync(channel -> {
324+
var sessionsFutureStub = SessionsGrpc.newFutureStub(channel);
325+
return Futures.toCompletionStage(sessionsFutureStub.purgeSession(toPurgeSessionRequest(sessionInfo.id())))
326+
.thenApply(response -> toSessionState(response.getSession()));
327+
});
328+
}
329+
330+
/**
331+
* Deletes this session from the ArmoniK cluster.
332+
* <p>
333+
* Deleting a session permanently removes its metadata from the Sessions, Tasks,
334+
* and Blobs. This is typically the final step in the lifecycle of a
335+
* session once it has been closed and, optionally, purged.
336+
* <p>
337+
* After deletion, this handle still exists as a local object but any further
338+
* interaction with the remote session (such as submitting tasks or querying
339+
* state) will fail because the session no longer exists in the cluster.
340+
* Clients are expected to discard the handle after deletion.
341+
* The returned completion stage is completed asynchronously with the updated
342+
* {@link SessionState} reported by the Sessions service just before removal,
343+
* or completed exceptionally if the request fails.
344+
*
345+
* @return a completion stage that yields the last known state of this session
346+
* before it is removed from the cluster
347+
*
348+
* @see #close()
349+
* @see #purge()
350+
* @see SessionState
351+
*/
352+
public CompletionStage<SessionState> delete() {
353+
return channelPool.executeAsync(channel -> {
354+
var sessionsFutureStub = SessionsGrpc.newFutureStub(channel);
355+
return Futures.toCompletionStage(sessionsFutureStub.deleteSession(toDeleteSessionRequest(sessionInfo.id())))
356+
.thenApply(response -> toSessionState(response.getSession()));
357+
});
358+
}
359+
360+
361+
190362
private Function<ManagedChannel, CompletionStage<BlobInfo>> createBlobInfo(BlobDefinition blobDefinition) {
191363
return channel -> {
192364
var request = toResultMetaDataRequest(sessionInfo.id(), List.of(blobDefinition));

0 commit comments

Comments
 (0)