Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 145 additions & 3 deletions armonik-client/src/main/java/fr/aneo/armonik/client/ArmoniKClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@

import fr.aneo.armonik.api.grpc.v1.sessions.SessionsGrpc;
import fr.aneo.armonik.client.definition.SessionDefinition;
import fr.aneo.armonik.client.exception.ArmoniKException;
import fr.aneo.armonik.client.internal.concurrent.Futures;
import io.grpc.ManagedChannel;

import java.time.Duration;
import java.util.HashSet;
import java.util.concurrent.CompletionStage;

import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.toCreateSessionRequest;
import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.*;
import static fr.aneo.armonik.client.internal.grpc.mappers.SessionMapper.toSessionState;
import static fr.aneo.armonik.client.internal.grpc.mappers.TaskMapper.toTaskConfiguration;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -104,8 +110,144 @@ public ArmoniKClient(ArmoniKConfig armoniKConfig) {
public SessionHandle openSession(SessionDefinition sessionDefinition) {
requireNonNull(sessionDefinition, "sessionDefinition must not be null");

var id = createSession(sessionDefinition);
return new SessionHandle(id, sessionDefinition, channelPool);
var sessionInfo = createSession(sessionDefinition);
return new SessionHandle(sessionInfo, sessionDefinition.outputListener(), channelPool);
}

/**
* Connects to an existing session in the ArmoniK cluster.
* <p>
* This method retrieves the session metadata associated with the given {@link SessionId}
* and returns a {@link SessionHandle} that can be used to submit tasks and monitor
* output blob completions
* <p>
* If a {@link BlobCompletionListener} is provided, the returned {@code SessionHandle}
* will automatically start listening for output blob completion events for this
* session.
*
* <p><strong>Error Handling:</strong>
* If the session does not exist, this method throws an {@link ArmoniKException}
*
* @param sessionId the identifier of the existing session to connect to
* @param outputListener an optional listener for output blob completion events;
* if {@code null}, no listener is registered
* @return a {@link SessionHandle} bound to the existing session
* @throws NullPointerException if {@code sessionId} is {@code null}
* @throws ArmoniKException if the session does not exist, or if a communication error occurs
* @see SessionHandle
* @see BlobCompletionListener
*/
public SessionHandle getSession(SessionId sessionId, BlobCompletionListener outputListener) {
requireNonNull(sessionId, "sessionId must not be null");

try {
var sessionResponse = channelPool.execute(
channel -> SessionsGrpc.newBlockingStub(channel).getSession(toGetSessionRequest(sessionId))
);
if (!sessionResponse.hasSession()) throw new ArmoniKException("Session not found");

var sessionInfo = new SessionInfo(
sessionId,
new HashSet<>(sessionResponse.getSession().getPartitionIdsList()),
toTaskConfiguration(sessionResponse.getSession().getOptions())
);

return new SessionHandle(sessionInfo, outputListener, channelPool);
} catch (Exception e) {
throw new ArmoniKException(e.getMessage(), e);
}
}

/**
* Connects to an existing session in the ArmoniK cluster without registering
* an output listener.
* <p>
* This is a convenience overload equivalent to calling:
* <pre>{@code
* getSession(sessionId, null);
* }</pre>
*
* <p><strong>Error Handling:</strong>
* If the session does not exist, this method throws an {@link ArmoniKException}.
*
* @param sessionId the identifier of the existing session to connect to
* @return a {@link SessionHandle} for interacting with the session
* @throws NullPointerException if {@code sessionId} is {@code null}
* @throws ArmoniKException if the session does not exist, or if a communication error occurs
* @see #getSession(SessionId, BlobCompletionListener)
*/
public SessionHandle getSession(SessionId sessionId) {
return getSession(sessionId, null);
}

/**
* Closes a session in the ArmoniK cluster by its identifier.
* <p>
* A closed session no longer accepts new task submissions, but all existing tasks, results,
* and metadata remain available in the control plane. Closing a session is the recommended
* finalization step once no additional tasks will be submitted.
* <p>
* This client-level method is provided as a convenience when the caller does not
* hold a {@link SessionHandle}. When a {@code SessionHandle} instance is available,
* prefer calling {@link SessionHandle#close()} so that all lifecycle-related
* operations remain grouped on the handle itself.
* <p>
* The returned {@link CompletionStage} completes asynchronously with the updated
* {@link SessionState} returned by the Sessions service, or completes exceptionally
* if the request fails or the session does not exist.
*
* @param sessionId the identifier of the session to close; must not be {@code null}
* @return a completion stage yielding the updated state of the session after the
* close operation has been applied
* @throws NullPointerException if {@code sessionId} is {@code null}
* @see SessionHandle#close()
* @see SessionState
*/
public CompletionStage<SessionState> closeSession(SessionId sessionId) {
requireNonNull(sessionId, "sessionId must not be null");

return channelPool.executeAsync(channel -> {
var sessionsFutureStub = SessionsGrpc.newFutureStub(channel);
return Futures.toCompletionStage(sessionsFutureStub.closeSession(toCloseSessionRequest(sessionId)))
.thenApply(response -> toSessionState(response.getSession()));
});
}

/**
* Cancels a session in the ArmoniK cluster by its identifier.
* <p>
* Cancelling a session instructs the control plane to stop all remaining work associated
* with the session. Depending on the scheduling and execution state, running tasks may be
* interrupted, and queued tasks will no longer be scheduled.
* <p>
* A cancelled session still retains its metadata, and completed task results remain
* accessible unless the session is later purged or deleted. Cancellation is useful
* when the overall workflow must be terminated early due to failure or user decision.
* <p>
* This client-level convenience method is intended for scenarios where the caller
* does not have a {@link SessionHandle} instance. When such a handle is available,
* prefer calling {@link SessionHandle#cancel()} to keep lifecycle operations grouped
* with the associated session.
* <p>
* The returned {@link CompletionStage} completes asynchronously with the updated
* {@link SessionState} returned by the Sessions service, or completes exceptionally
* if the request fails or the session does not exist.
*
* @param sessionId the identifier of the session to cancel; must not be {@code null}
* @return a completion stage yielding the updated state of the session after the
* cancellation operation has been applied
* @throws NullPointerException if {@code sessionId} is {@code null}
* @see SessionHandle#cancel()
* @see SessionState
*/
public CompletionStage<SessionState> cancelSession(SessionId sessionId) {
requireNonNull(sessionId, "sessionId must not be null");

return channelPool.executeAsync(channel -> {
var sessionsFutureStub = SessionsGrpc.newFutureStub(channel);
return Futures.toCompletionStage(sessionsFutureStub.cancelSession(toCancelSessionRequest(sessionId)))
.thenApply(response -> toSessionState(response.getSession()));
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
* @see BlobCompletionCoordinator
* @see SessionDefinition
*/
public record BatchingPolicy(
record BatchingPolicy(
int batchSize,
Duration maxDelay,
int maxConcurrentBatches,
Expand All @@ -94,7 +94,7 @@ public record BatchingPolicy(
*
* @see SessionDefinition
*/
public static final BatchingPolicy DEFAULT = new BatchingPolicy(50, Duration.ofMillis(200), 5, 100);
static final BatchingPolicy DEFAULT = new BatchingPolicy(50, Duration.ofMillis(200), 5, 100);

/**
* Validates all policy parameters and their relationships.
Expand All @@ -110,7 +110,7 @@ public record BatchingPolicy(
* or {@code capPerBatch} is {@code <= 0}
* @throws IllegalArgumentException if {@code maxDelay} is {@code null}, zero, or negative
*/
public BatchingPolicy {
BatchingPolicy {
if (batchSize <= 0) {
throw new IllegalArgumentException("batchSize must be > 0, got: " + batchSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ final class BlobCompletionCoordinator {
* @see BlobCompletionListener
* @see BlobCompletionEventWatcher
*/
BlobCompletionCoordinator(SessionId sessionId, ChannelPool channelPool, SessionDefinition sessionDefinition) {
BlobCompletionCoordinator(SessionId sessionId, ChannelPool channelPool, BlobCompletionListener outputListener) {
this(
sessionId,
new BlobCompletionEventWatcher(sessionId, channelPool, sessionDefinition.outputListener()),
sessionDefinition.outputBatchingPolicy(),
new BlobCompletionEventWatcher(sessionId, channelPool, outputListener),
BatchingPolicy.DEFAULT,
Schedulers.shared()
);
}
Expand All @@ -116,7 +116,7 @@ final class BlobCompletionCoordinator {
) {
this.sessionId = requireNonNull(sessionId, "sessionId must not be null");
this.watcher = requireNonNull(watcher, "watcher must not be null");
this.batchingPolicy = requireNonNull(batchingPolicy, "batchingPolicy must not be null");
this.batchingPolicy = requireNonNullElse(batchingPolicy, BatchingPolicy.DEFAULT);
this.scheduler = requireNonNull(scheduler, "scheduler must not be null");
this.permits = new Semaphore(batchingPolicy.maxConcurrentBatches());
}
Expand Down
Loading
Loading