Skip to content

Commit 00f0373

Browse files
committed
feat(client): add full session lifecycle management
1 parent 254aa9e commit 00f0373

9 files changed

Lines changed: 707 additions & 41 deletions

File tree

armonik-client/src/main/java/fr/aneo/armonik/client/ArmoniKClient.java

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
import fr.aneo.armonik.api.grpc.v1.sessions.SessionsGrpc;
1919
import fr.aneo.armonik.client.definition.SessionDefinition;
2020
import fr.aneo.armonik.client.exception.ArmoniKException;
21+
import fr.aneo.armonik.client.internal.concurrent.Futures;
2122
import io.grpc.ManagedChannel;
2223

2324
import java.time.Duration;
2425
import java.util.HashSet;
26+
import java.util.concurrent.CompletionStage;
2527

26-
import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.toCreateSessionRequest;
27-
import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.toGetSessionRequest;
28+
import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.*;
29+
import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.toSessionState;
2830
import static fr.aneo.armonik.client.internal.grpc.mappers.TaskMapper.toTaskConfiguration;
2931
import static java.util.Objects.requireNonNull;
3032

@@ -126,19 +128,12 @@ public SessionHandle openSession(SessionDefinition sessionDefinition) {
126128
* <p><strong>Error Handling:</strong>
127129
* If the session does not exist, this method throws an {@link ArmoniKException}
128130
*
129-
* @param sessionId
130-
* the identifier of the existing session to connect to
131-
* @param outputListener
132-
* an optional listener for output blob completion events;
133-
* if {@code null}, no listener is registered
134-
*
131+
* @param sessionId the identifier of the existing session to connect to
132+
* @param outputListener an optional listener for output blob completion events;
133+
* if {@code null}, no listener is registered
135134
* @return a {@link SessionHandle} bound to the existing session
136-
*
137-
* @throws NullPointerException
138-
* if {@code sessionId} is {@code null}
139-
* @throws ArmoniKException
140-
* if the session does not exist, or if a communication error occurs
141-
*
135+
* @throws NullPointerException if {@code sessionId} is {@code null}
136+
* @throws ArmoniKException if the session does not exist, or if a communication error occurs
142137
* @see SessionHandle
143138
* @see BlobCompletionListener
144139
*/
@@ -175,22 +170,51 @@ public SessionHandle getSession(SessionId sessionId, BlobCompletionListener outp
175170
* <p><strong>Error Handling:</strong>
176171
* If the session does not exist, this method throws an {@link ArmoniKException}.
177172
*
178-
* @param sessionId
179-
* the identifier of the existing session to connect to
180-
*
173+
* @param sessionId the identifier of the existing session to connect to
181174
* @return a {@link SessionHandle} for interacting with the session
182-
*
183-
* @throws NullPointerException
184-
* if {@code sessionId} is {@code null}
185-
* @throws ArmoniKException
186-
* if the session does not exist, or if a communication error occurs
187-
*
175+
* @throws NullPointerException if {@code sessionId} is {@code null}
176+
* @throws ArmoniKException if the session does not exist, or if a communication error occurs
188177
* @see #getSession(SessionId, BlobCompletionListener)
189178
*/
190179
public SessionHandle getSession(SessionId sessionId) {
191180
return getSession(sessionId, null);
192181
}
193182

183+
/**
184+
* Closes a session in the ArmoniK cluster by its identifier.
185+
* <p>
186+
* This method sends a request to the Sessions service to transition the session
187+
* into the <strong>closed</strong> state. A closed session no longer accepts new task
188+
* submissions, but all existing tasks, results, and metadata remain available in the
189+
* control plane. Closing a session is the recommended finalization step once no
190+
* additional tasks will be submitted.
191+
* <p>
192+
* This client-level method is provided as a convenience when the caller does not
193+
* hold a {@link SessionHandle}. When a {@code SessionHandle} instance is available,
194+
* prefer calling {@link SessionHandle#close()} so that all lifecycle-related
195+
* operations remain grouped on the handle itself.
196+
* <p>
197+
* The returned {@link CompletionStage} completes asynchronously with the updated
198+
* {@link SessionState} returned by the Sessions service, or completes exceptionally
199+
* if the request fails or the session does not exist.
200+
*
201+
* @param sessionId the identifier of the session to close; must not be {@code null}
202+
* @return a completion stage yielding the updated state of the session after the
203+
* close operation has been applied
204+
* @throws NullPointerException if {@code sessionId} is {@code null}
205+
* @see SessionHandle#close()
206+
* @see SessionState
207+
*/
208+
public CompletionStage<SessionState> closeSession(SessionId sessionId) {
209+
requireNonNull(sessionId, "sessionId must not be null");
210+
211+
return channelPool.executeAsync(channel -> {
212+
var sessionsFutureStub = SessionsGrpc.newFutureStub(channel);
213+
return Futures.toCompletionStage(sessionsFutureStub.closeSession(toCloseSessionRequest(sessionId)))
214+
.thenApply(response -> toSessionState(response.getSession()));
215+
});
216+
}
217+
194218
/**
195219
* Closes this client and releases all associated resources.
196220
* <p>

armonik-client/src/main/java/fr/aneo/armonik/client/SessionHandle.java

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package fr.aneo.armonik.client;
1717

1818
import fr.aneo.armonik.api.grpc.v1.results.ResultsGrpc;
19+
import fr.aneo.armonik.api.grpc.v1.sessions.SessionsGrpc;
1920
import fr.aneo.armonik.client.definition.SessionDefinition;
2021
import fr.aneo.armonik.client.definition.TaskDefinition;
2122
import fr.aneo.armonik.client.definition.blob.BlobDefinition;
@@ -29,6 +30,7 @@
2930
import java.util.function.Function;
3031

3132
import static fr.aneo.armonik.client.internal.grpc.mappers.BlobMapper.toResultMetaDataRequest;
33+
import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.*;
3234
import static java.util.Objects.requireNonNull;
3335

3436
/**
@@ -187,6 +189,176 @@ public BlobHandle createBlob(InputBlobDefinition blobDefinition) {
187189
return blobHandle;
188190
}
189191

192+
/**
193+
* Requests cancellation of this session in the ArmoniK cluster.
194+
* <p>
195+
* Cancelling a session signals the control plane to stop all remaining work associated
196+
* with this session. Depending on the server configuration and current task states,
197+
* running tasks may be interrupted and queued tasks will no longer be scheduled.
198+
* Results that have already been produced remain accessible unless the session
199+
* is subsequently purged or deleted.
200+
* <p>
201+
* The returned completion stage is completed asynchronously with the updated
202+
* {@link SessionState} once the cancellation request has been processed by the
203+
* Sessions service, or completed exceptionally if the request fails.
204+
*
205+
* @return a completion stage that yields the updated state of this session after
206+
* the cancellation request has been applied
207+
*
208+
* @see SessionState
209+
*/
210+
public CompletionStage<SessionState> cancel() {
211+
return channelPool.executeAsync(channel -> {
212+
var sessionsFutureStub = SessionsGrpc.newFutureStub(channel);
213+
return Futures.toCompletionStage(sessionsFutureStub.cancelSession(toCancelSessionRequest(sessionInfo.id())))
214+
.thenApply(response -> toSessionState(response.getSession()));
215+
});
216+
}
217+
218+
/**
219+
* Pauses this session in the ArmoniK cluster.
220+
* <p>
221+
* Pausing a session temporarily suspends the scheduling of new tasks in the session.
222+
* Tasks that are already running continue until completion, but pending tasks are
223+
* not picked up for execution until the session is resumed via {@link #resume()}.
224+
* <p>
225+
* This operation is useful when you need to temporarily throttle or stop processing
226+
* without cancelling the session or losing its state.
227+
* The returned completion stage is completed asynchronously with the updated
228+
* {@link SessionState} once the pause request has been processed by the Sessions
229+
* service, or completed exceptionally if the request fails.
230+
*
231+
* @return a completion stage that yields the updated state of this session after
232+
* the pause request has been applied
233+
*
234+
* @see #resume()
235+
* @see SessionState
236+
*/
237+
public CompletionStage<SessionState> pause() {
238+
return channelPool.executeAsync(channel -> {
239+
var sessionsFutureStub = SessionsGrpc.newFutureStub(channel);
240+
return Futures.toCompletionStage(sessionsFutureStub.pauseSession(toPauseSessionRequest(sessionInfo.id())))
241+
.thenApply(response -> toSessionState(response.getSession()));
242+
});
243+
}
244+
/**
245+
* Resumes a previously paused session in the ArmoniK cluster.
246+
* <p>
247+
* Resuming a session re-enables scheduling of tasks that were previously held
248+
* while the session was paused. Any pending tasks become eligible for execution
249+
* again according to the cluster scheduling policies.
250+
* <p>
251+
* Calling this method on a session that is not paused is safe; the Sessions service
252+
* will simply return the current state of the session.
253+
* The returned completion stage is completed asynchronously with the updated
254+
* {@link SessionState} once the resume request has been processed by the Sessions
255+
* service, or completed exceptionally if the request fails.
256+
*
257+
* @return a completion stage that yields the updated state of this session after
258+
* the resume request has been applied
259+
*
260+
* @see #pause()
261+
* @see SessionState
262+
*/
263+
public CompletionStage<SessionState> resume() {
264+
return channelPool.executeAsync(channel -> {
265+
var sessionsFutureStub = SessionsGrpc.newFutureStub(channel);
266+
return Futures.toCompletionStage(sessionsFutureStub.resumeSession(toResumeSessionRequest(sessionInfo.id())))
267+
.thenApply(response -> toSessionState(response.getSession()));
268+
});
269+
}
270+
271+
/**
272+
* Closes this session in the ArmoniK cluster.
273+
* <p>
274+
* Closing a session finalizes it and prevents any new task submissions, while
275+
* preserving existing tasks, results, and metadata. This is the recommended way
276+
* to indicate that no further work will be submitted for this session once all
277+
* expected tasks have been created.
278+
* <p>
279+
* Closing a session does not remove its data. To free up storage or completely
280+
* remove the session, combine this operation with {@link #purge()} and
281+
* {@link #delete()} as appropriate.
282+
* The returned completion stage is completed asynchronously with the updated
283+
* {@link SessionState} once the close request has been processed by the Sessions
284+
* service, or completed exceptionally if the request fails.
285+
*
286+
* @return a completion stage that yields the updated state of this session after
287+
* the close request has been applied
288+
*
289+
* @see #purge()
290+
* @see #delete()
291+
* @see SessionState
292+
*/
293+
public CompletionStage<SessionState> close() {
294+
return channelPool.executeAsync(channel -> {
295+
var sessionsFutureStub = SessionsGrpc.newFutureStub(channel);
296+
return Futures.toCompletionStage(sessionsFutureStub.closeSession(toCloseSessionRequest(sessionInfo.id())))
297+
.thenApply(response -> toSessionState(response.getSession()));
298+
});
299+
}
300+
301+
/**
302+
* Purges this session's data in the ArmoniK cluster.
303+
* <p>
304+
* Purging a session removes the underlying data for its blobs (task inputs and
305+
* outputs) from the storage layer while keeping the session and task metadata.
306+
* This operation is useful to reclaim storage space once the actual data is no
307+
* longer needed but you still want to keep an audit trail or execution history.
308+
* <p>
309+
* After a purge, attempts to download blob data associated with this session
310+
* will fail, but session and task information remain available until the session
311+
* is deleted.
312+
* The returned completion stage is completed asynchronously with the updated
313+
* {@link SessionState} once the purge request has been processed by the Sessions
314+
* service, or completed exceptionally if the request fails.
315+
*
316+
* @return a completion stage that yields the updated state of this session after
317+
* the purge request has been applied
318+
*
319+
* @see #delete()
320+
* @see SessionState
321+
*/
322+
public CompletionStage<SessionState> purge() {
323+
return channelPool.executeAsync(channel -> {
324+
var sessionsFutureStub = SessionsGrpc.newFutureStub(channel);
325+
return Futures.toCompletionStage(sessionsFutureStub.purgeSession(toPurgeSessionRequest(sessionInfo.id())))
326+
.thenApply(response -> toSessionState(response.getSession()));
327+
});
328+
}
329+
330+
/**
331+
* Deletes this session from the ArmoniK cluster.
332+
* <p>
333+
* Deleting a session permanently removes its metadata from the Sessions, Tasks,
334+
* and Blobs. This is typically the final step in the lifecycle of a
335+
* session once it has been closed and, optionally, purged.
336+
* <p>
337+
* After deletion, this handle still exists as a local object but any further
338+
* interaction with the remote session (such as submitting tasks or querying
339+
* state) will fail because the session no longer exists in the cluster.
340+
* Clients are expected to discard the handle after deletion.
341+
* The returned completion stage is completed asynchronously with the updated
342+
* {@link SessionState} reported by the Sessions service just before removal,
343+
* or completed exceptionally if the request fails.
344+
*
345+
* @return a completion stage that yields the last known state of this session
346+
* before it is removed from the cluster
347+
*
348+
* @see #close()
349+
* @see #purge()
350+
* @see SessionState
351+
*/
352+
public CompletionStage<SessionState> delete() {
353+
return channelPool.executeAsync(channel -> {
354+
var sessionsFutureStub = SessionsGrpc.newFutureStub(channel);
355+
return Futures.toCompletionStage(sessionsFutureStub.deleteSession(toDeleteSessionRequest(sessionInfo.id())))
356+
.thenApply(response -> toSessionState(response.getSession()));
357+
});
358+
}
359+
360+
361+
190362
private Function<ManagedChannel, CompletionStage<BlobInfo>> createBlobInfo(BlobDefinition blobDefinition) {
191363
return channel -> {
192364
var request = toResultMetaDataRequest(sessionInfo.id(), List.of(blobDefinition));
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright © 2025 ANEO (armonik@aneo.fr)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package fr.aneo.armonik.client;
17+
18+
import java.time.Duration;
19+
import java.time.Instant;
20+
import java.util.List;
21+
22+
/**
23+
* Immutable snapshot of a session's state in the ArmoniK cluster.
24+
* <p>
25+
* A {@code SessionState} instance mirrors the information returned by the Sessions service
26+
* for a given session. It aggregates the current lifecycle status, submission flags,
27+
* configuration and key timestamps into a single value object. Instances of this record
28+
* are typically obtained from session lifecycle operations such as
29+
* {@link SessionHandle#cancel()}, {@link SessionHandle#pause()}, {@link SessionHandle#resume()},
30+
* {@link SessionHandle#close()}, {@link SessionHandle#purge()} and {@link SessionHandle#delete()},
31+
* or from {@link ArmoniKClient#closeSession(SessionId)}.
32+
* <p>
33+
* Time-related fields may be {@code null} when the corresponding lifecycle transition
34+
* has not occurred yet. For example, {@code cancelledAt} is only populated when the
35+
* session has been cancelled, and {@code closedAt} is only populated after the session
36+
* has been closed. The {@code duration} field is generally set when the session is in a
37+
* terminal state (such as {@code CANCELLED} or {@code CLOSED}) and represents the elapsed
38+
* time between creation and termination as computed by the control plane.
39+
*
40+
* @param sessionId the unique identifier of the session
41+
* @param status the current lifecycle status of the session as reported by the cluster
42+
* @param clientSubmission whether clients are currently allowed to submit new tasks in this session
43+
* @param workerSubmission whether workers are currently allowed to submit tasks in this session
44+
* @param partitionIds the set of partition identifiers associated with this session; determines where
45+
* tasks may be scheduled
46+
* @param taskConfiguration the default task configuration applied to tasks created in this session
47+
* @param createdAt the instant at which the session was created in the cluster; never {@code null}
48+
* for a valid session
49+
* @param cancelledAt the instant at which the session was cancelled, or {@code null} if the session
50+
* has not been cancelled
51+
* @param closedAt the instant at which the session was closed, or {@code null} if the session
52+
* has not been closed
53+
* @param purgedAt the instant at which the session's data was purged from storage, or {@code null}
54+
* if no purge has been performed
55+
* @param deletedAt the instant at which the session was deleted from the control plane, or {@code null}
56+
* if the session still exists
57+
* @param duration the total duration of the session as computed by the control plane, typically set
58+
* when the session reaches a terminal state; may be {@code null} otherwise
59+
* @see SessionHandle
60+
* @see SessionStatus
61+
* @see TaskConfiguration
62+
*/
63+
public record SessionState(
64+
SessionId sessionId,
65+
SessionStatus status,
66+
boolean clientSubmission,
67+
boolean workerSubmission,
68+
List<String> partitionIds,
69+
TaskConfiguration taskConfiguration,
70+
Instant createdAt,
71+
Instant cancelledAt,
72+
Instant closedAt,
73+
Instant purgedAt,
74+
Instant deletedAt,
75+
Duration duration
76+
) {
77+
}

0 commit comments

Comments
 (0)