32 changes: 17 additions & 15 deletions cmd/soperatorchecks/main.go
@@ -119,13 +119,15 @@ func main() {
maintenanceIgnoreNodeLabels string
controllersFlag string

reconcileTimeout time.Duration
reconcileTimeoutPodEphemeralStorageCheck time.Duration
maxConcurrency int
maxConcurrencyPodEphemeralStorageCheck int
cacheSyncTimeout time.Duration
ephemeralStorageThreshold float64
ephemeralStorageResumeThreshold float64
requeueAfterSlurmNodes time.Duration
requeueAfterActiveCheck time.Duration
requeueAfterActiveCheckJob time.Duration
requeueAfterPodEphemeralStorageCheck time.Duration
maxConcurrency int
maxConcurrencyPodEphemeralStorageCheck int
cacheSyncTimeout time.Duration
ephemeralStorageThreshold float64
ephemeralStorageResumeThreshold float64
)

var watchNsCacheByName = make(map[string]cache.Config)
@@ -147,8 +149,10 @@ func main() {
"If set, HTTP/2 will be enabled for the metrics and webhook servers")
flag.StringVar(&logFormat, "log-format", "json", "Log format: plain or json")
flag.StringVar(&logLevel, "log-level", "debug", "Log level: debug, info, warn, error, dpanic, panic, fatal")
flag.DurationVar(&reconcileTimeout, "reconcile-timeout", 3*time.Minute, "The maximum duration allowed for a single reconcile")
flag.DurationVar(&reconcileTimeoutPodEphemeralStorageCheck, "pod-ephemeral-reconcile-timeout", 15*time.Second, "The maximum duration allowed for a single reconcile of Pod Ephemeral Storage Check")
flag.DurationVar(&requeueAfterSlurmNodes, "requeue-after-slurm-nodes", 3*time.Minute, "The duration after which SlurmNodesController will be requeued for reconciliation.")
flag.DurationVar(&requeueAfterActiveCheck, "requeue-after-activecheck", 10*time.Second, "The duration after which ActiveCheck will be requeued for reconciliation.")
flag.DurationVar(&requeueAfterActiveCheckJob, "requeue-after-activecheckjob", time.Minute, "The duration after which ActiveCheckJob will be requeued for reconciliation.")
flag.DurationVar(&requeueAfterPodEphemeralStorageCheck, "requeue-after-pod-ephemeral-storage-check", time.Minute, "The duration after which Pod Ephemeral Storage Check will be requeued for reconciliation.")
flag.IntVar(&maxConcurrency, "max-concurrent-reconciles", 1, "Configures number of concurrent reconciles. It should improve performance for clusters with many objects.")
flag.IntVar(&maxConcurrencyPodEphemeralStorageCheck, "pod-ephemeral-max-concurrent-reconciles", 10, "Configures number of concurrent reconciles for Pod Ephemeral Storage Check. It should improve performance for clusters with many pods.")
flag.DurationVar(&cacheSyncTimeout, "cache-sync-timeout", 2*time.Minute, "The maximum duration allowed for caching sync")
@@ -293,7 +297,7 @@ func main() {
mgr.GetScheme(),
mgr.GetEventRecorderFor(soperatorchecks.SlurmNodesControllerName),
slurmAPIClients,
reconcileTimeout,
requeueAfterSlurmNodes,
enabledNodeReplacement,
enableExtensiveCheck,
mgr.GetAPIReader(),
@@ -320,7 +324,7 @@ func main() {
mgr.GetClient(),
mgr.GetScheme(),
mgr.GetEventRecorderFor(soperatorchecks.SlurmActiveCheckControllerName),
reconcileTimeout,
requeueAfterActiveCheck,
).SetupWithManager(mgr, maxConcurrency, cacheSyncTimeout); err != nil {
cli.Fail(setupLog, err, "unable to create activecheck controller", "controller", "ActiveCheck")
}
@@ -331,7 +335,7 @@ func main() {
mgr.GetScheme(),
mgr.GetEventRecorderFor(soperatorchecks.SlurmActiveCheckJobControllerName),
slurmAPIClients,
reconcileTimeout,
requeueAfterActiveCheckJob,
).SetupWithManager(mgr, maxConcurrency, cacheSyncTimeout); err != nil {
cli.Fail(setupLog, err, "unable to create activecheckjob controller", "controller", "ActiveCheckJob")
}
@@ -341,7 +345,6 @@ func main() {
mgr.GetClient(),
mgr.GetScheme(),
mgr.GetEventRecorderFor(soperatorchecks.SlurmChecksServiceAccountControllerName),
reconcileTimeout,
).SetupWithManager(mgr, maxConcurrency, cacheSyncTimeout); err != nil {
cli.Fail(setupLog, err, "unable to create soperatorchecks serviceaccount controller", "controller", "ServiceAccount")
}
@@ -351,7 +354,6 @@ func main() {
mgr.GetClient(),
mgr.GetScheme(),
mgr.GetEventRecorderFor(soperatorchecks.SlurmActiveCheckPrologControllerName),
reconcileTimeout,
).SetupWithManager(mgr, maxConcurrency, cacheSyncTimeout); err != nil {
cli.Fail(setupLog, err, "unable to create soperatorchecks prolog controller", "controller", "Prolog")
}
@@ -363,7 +365,7 @@ func main() {
mgr.GetScheme(),
mgr.GetEventRecorderFor(soperatorchecks.PodEphemeralStorageCheckName),
ctrl.GetConfigOrDie(),
reconcileTimeoutPodEphemeralStorageCheck,
requeueAfterPodEphemeralStorageCheck,
ephemeralStorageThreshold,
ephemeralStorageResumeThreshold,
slurmAPIClients,
1 change: 1 addition & 0 deletions helm/slurm-cluster/slurm_scripts/pyxis_caching_importer.sh
@@ -8,6 +8,7 @@ readonly cmd="$1"
readonly cache_dir="${ENROOT_CONTAINER_IMAGES_CACHE_DIR:-/var/cache/enroot-container-images}"
readonly squashfs_temp_path="${cache_dir}/${SLURM_JOB_ID}.${SLURM_STEP_ID}.sqsh"
readonly lock_path="${squashfs_temp_path}.lock"
trap 'rm -f "${lock_path}"' EXIT

# Since it's not an ephemeral squashfs file, we can use compression.
export ENROOT_SQUASH_OPTIONS="-comp zstd -Xcompression-level 3 -b 1M"
5 changes: 4 additions & 1 deletion helm/soperatorchecks/values.yaml
@@ -25,7 +25,10 @@ checks:
args:
- --log-format=json
- --log-level=info
- --reconcile-timeout=3m
- --requeue-after-slurm-nodes=3m
- --requeue-after-activecheck=10s
- --requeue-after-activecheckjob=1m
- --requeue-after-pod-ephemeral-storage-check=1m
- --max-concurrent-reconciles=1
- --cache-sync-timeout=2m
- --not-ready-timeout=15m
4 changes: 2 additions & 2 deletions images/slurm_check_job/slurm_check_job_entrypoint.sh
@@ -123,8 +123,8 @@ if [[ -z "$K8S_JOB_NAME" ]]; then
exit 1
fi

echo "Annotating Job $K8S_JOB_NAME with slurm-job-id=$SLURM_JOB_ID"
kubectl annotate job "$K8S_JOB_NAME" slurm-job-id="$SLURM_JOB_ID" \
echo "Annotating Job $K8S_JOB_NAME with slurm-job-id and unhandled-slurm-job-id = $SLURM_JOB_ID"
kubectl annotate job "$K8S_JOB_NAME" slurm-job-id="$SLURM_JOB_ID" unhandled-slurm-job-id="$SLURM_JOB_ID" \
-n "$K8S_POD_NAMESPACE" --overwrite || {
echo "Failed to annotate Job"
exit 1
14 changes: 7 additions & 7 deletions internal/controller/soperatorchecks/activecheck_controller.go
@@ -38,7 +38,7 @@ var (

type ActiveCheckReconciler struct {
*reconciler.Reconciler
reconcileTimeout time.Duration
requeueAfter time.Duration

CronJob *reconciler.CronJobReconciler
ConfigMap *reconciler.ConfigMapReconciler
@@ -48,17 +48,17 @@ func NewActiveCheckController(
client client.Client,
scheme *runtime.Scheme,
recorder record.EventRecorder,
reconcileTimeout time.Duration,
requeueAfter time.Duration,
) *ActiveCheckReconciler {
r := reconciler.NewReconciler(client, scheme, recorder)
cronJobReconciler := reconciler.NewCronJobReconciler(r)
configMapReconciler := reconciler.NewConfigMapReconciler(r)

return &ActiveCheckReconciler{
Reconciler: r,
reconcileTimeout: reconcileTimeout,
CronJob: cronJobReconciler,
ConfigMap: configMapReconciler,
Reconciler: r,
requeueAfter: requeueAfter,
CronJob: cronJobReconciler,
ConfigMap: configMapReconciler,
}
}

@@ -159,7 +159,7 @@ func (r *ActiveCheckReconciler) Reconcile(
if !dependenciesReady {
logger.Info("Not all dependencies are ready, requeueing in 10 seconds.")
return ctrl.Result{
RequeueAfter: time.Second * 10,
RequeueAfter: r.requeueAfter,
}, nil
}

83 changes: 54 additions & 29 deletions internal/controller/soperatorchecks/activecheck_jobs_controller.go
@@ -17,7 +17,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
"nebius.ai/slurm-operator/internal/slurmapi"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -30,6 +29,7 @@ import (
"nebius.ai/slurm-operator/internal/controller/reconciler"
"nebius.ai/slurm-operator/internal/controllerconfig"
"nebius.ai/slurm-operator/internal/logfield"
"nebius.ai/slurm-operator/internal/slurmapi"
)

var (
@@ -40,25 +40,25 @@ var (

type ActiveCheckJobReconciler struct {
*reconciler.Reconciler
Job *reconciler.JobReconciler
slurmAPIClients *slurmapi.ClientSet
reconcileTimeout time.Duration
Job *reconciler.JobReconciler
slurmAPIClients *slurmapi.ClientSet
requeueAfter time.Duration
}

func NewActiveCheckJobController(
client client.Client,
scheme *runtime.Scheme,
recorder record.EventRecorder,
slurmAPIClients *slurmapi.ClientSet,
reconcileTimeout time.Duration,
requeueAfter time.Duration,
) *ActiveCheckJobReconciler {
r := reconciler.NewReconciler(client, scheme, recorder)

return &ActiveCheckJobReconciler{
Reconciler: r,
Job: reconciler.NewJobReconciler(r),
slurmAPIClients: slurmAPIClients,
reconcileTimeout: reconcileTimeout,
Reconciler: r,
Job: reconciler.NewJobReconciler(r),
slurmAPIClients: slurmAPIClients,
requeueAfter: requeueAfter,
}
}

@@ -128,10 +128,15 @@ func (r *ActiveCheckJobReconciler) Reconcile(
return ctrl.Result{}, fmt.Errorf("get active check job: %w", err)
}

activeCheckName, err := r.getActiveCheckNameFromJob(ctx, k8sJob)
activeCheckName, found, err := r.getActiveCheckNameFromJob(ctx, k8sJob)
if err != nil {
return ctrl.Result{}, fmt.Errorf("get active check name: %w", err)
return ctrl.Result{}, err
}
// No need to return an error and reconcile again if the annotation is not found because it won't appear anyway.
if !found {
return ctrl.Result{}, nil
}

activeCheck := &slurmv1alpha1.ActiveCheck{}
err = r.Get(ctx, types.NamespacedName{
Namespace: req.Namespace,
@@ -174,7 +179,7 @@ func (r *ActiveCheckJobReconciler) Reconcile(
// is still running; processing it too early would be premature.
if status := getK8sJobStatus(k8sJob); status != consts.ActiveCheckK8sJobStatusComplete &&
status != consts.ActiveCheckK8sJobStatusFailed {
return ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}, nil
return ctrl.Result{Requeue: true, RequeueAfter: r.requeueAfter}, nil
}

activeCheck.Status.SlurmJobsStatus = slurmv1alpha1.ActiveCheckSlurmJobsStatus{
Expand Down Expand Up @@ -231,13 +236,25 @@ func (r *ActiveCheckJobReconciler) Reconcile(
return ctrl.Result{}, fmt.Errorf("slurm cluster %v not found", slurmClusterName)
}

slurmJobIDs, ok := k8sJob.Annotations["slurm-job-id"]
allSlurmJobIDs, ok := k8sJob.Annotations["slurm-job-id"]
if !ok {
logger.Error(err, "failed to get slurm job id")
return ctrl.Result{}, err
}
ids := strings.Split(slurmJobIDs, ",")
firstJobId := ids[0]

oldUnhandledSlurmJobIDs, ok := k8sJob.Annotations["unhandled-slurm-job-id"]
if !ok {
// Most likely the controller was updated, but the previous job is still running.
oldUnhandledSlurmJobIDs = allSlurmJobIDs
}

oldUnhandledIDs := strings.Split(oldUnhandledSlurmJobIDs, ",")
if oldUnhandledSlurmJobIDs == "" {
oldUnhandledIDs = []string{}
}
allIDs := strings.Split(allSlurmJobIDs, ",")
unhandledIds := make([]string, 0)
firstJobId := allIDs[0]

var jobName string
var submitTime *metav1.Time
@@ -261,11 +278,12 @@ func (r *ActiveCheckJobReconciler) Reconcile(
requeue := false
latestHandledFinalStateTime := lastHandledFinalStateTime
var notFoundInAccountingIDs []string
for _, slurmJobID := range ids {
for _, slurmJobID := range oldUnhandledIDs {
slurmJobs, err := slurmAPIClient.GetJobsByIDFromAccounting(ctx, slurmJobID)
if err != nil {
logger.Error(err, "failed to get slurm job status")
return ctrl.Result{}, err
logger.Error(err, "failed to get slurm job status", "slurm_job_id", slurmJobID)
unhandledIds = append(unhandledIds, slurmJobID)
continue
}

// slurmdbd may not yet have a record for a just-submitted job; the propagation
Expand All @@ -275,6 +293,7 @@ func (r *ActiveCheckJobReconciler) Reconcile(
// surfaces in logs instead of degrading silently into an infinite requeue.
if len(slurmJobs) == 0 {
notFoundInAccountingIDs = append(notFoundInAccountingIDs, slurmJobID)
unhandledIds = append(unhandledIds, slurmJobID)
requeue = true
continue
}
@@ -288,6 +307,7 @@ func (r *ActiveCheckJobReconciler) Reconcile(
// Job is not yet finished
if !slurmJob.IsTerminalState() || slurmJob.EndTime == nil {
requeue = true
unhandledIds = append(unhandledIds, slurmJobID)
continue
}

@@ -332,18 +352,20 @@ func (r *ActiveCheckJobReconciler) Reconcile(
logger.Info("Slurm jobs not found in accounting, will retry",
"count", len(notFoundInAccountingIDs),
"job_ids", notFoundInAccountingIDs,
"total", len(ids),
"total", len(oldUnhandledIDs),
)
}

k8sJobPatch := client.MergeFrom(k8sJob.DeepCopy())
k8sJob.Annotations["unhandled-slurm-job-id"] = strings.Join(unhandledIds, ",")
if latestHandledFinalStateTime > lastHandledFinalStateTime {
// Maybe we could delete the job because it will not be processed anymore
// Otherwise, we will have many of these jobs and they will keep being listed in every Reconcile()
k8sJobPatch := client.MergeFrom(k8sJob.DeepCopy())
k8sJob.Annotations[K8sAnnotationSoperatorChecksFinalStateTime] = fmt.Sprintf("%d", latestHandledFinalStateTime)
if err := r.Job.Patch(ctx, k8sJob, k8sJobPatch); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to patch k8s Job: %w", err)
}
}

if err := r.Job.Patch(ctx, k8sJob, k8sJobPatch); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to patch k8s Job: %w", err)
}

// Updating the ActiveCheck status is relevant for normal active checks
@@ -373,7 +395,7 @@ func (r *ActiveCheckJobReconciler) Reconcile(
}

if requeue {
return ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}, nil
return ctrl.Result{Requeue: true, RequeueAfter: r.requeueAfter}, nil
}

} else if activeCheck.Spec.CheckType == "k8sJob" {
@@ -473,20 +495,23 @@ updateSlurmNodeWithReactions(
return nil
}

func (r *ActiveCheckJobReconciler) getActiveCheckNameFromJob(ctx context.Context, k8sJob *batchv1.Job) (string, error) {
func (r *ActiveCheckJobReconciler) getActiveCheckNameFromJob(ctx context.Context, k8sJob *batchv1.Job) (string, bool, error) {
podList := &corev1.PodList{}
err := r.List(ctx, podList, client.InNamespace(k8sJob.Namespace), client.MatchingLabels{"job-name": k8sJob.Name})
if err != nil || len(podList.Items) == 0 {
return "", fmt.Errorf("failed to find pod for job %s: %w", k8sJob.Name, err)
if err != nil {
return "", false, fmt.Errorf("failed to find pod for job %s: %w", k8sJob.Name, err)
}
if len(podList.Items) == 0 {
return "", false, nil
}

pod := &podList.Items[0]
activeCheckName, ok := pod.Annotations[consts.AnnotationActiveCheckName]
if !ok {
return "", fmt.Errorf("annotation %s not found on pod %s", consts.AnnotationActiveCheckName, pod.Name)
return "", false, nil
}

return activeCheckName, nil
return activeCheckName, true, nil
}

func getK8sJobStatus(k8sJob *batchv1.Job) consts.ActiveCheckK8sJobStatus {