diff --git a/cmd/soperatorchecks/main.go b/cmd/soperatorchecks/main.go index ed5910d81..ef929a4f4 100644 --- a/cmd/soperatorchecks/main.go +++ b/cmd/soperatorchecks/main.go @@ -119,13 +119,15 @@ func main() { maintenanceIgnoreNodeLabels string controllersFlag string - reconcileTimeout time.Duration - reconcileTimeoutPodEphemeralStorageCheck time.Duration - maxConcurrency int - maxConcurrencyPodEphemeralStorageCheck int - cacheSyncTimeout time.Duration - ephemeralStorageThreshold float64 - ephemeralStorageResumeThreshold float64 + requeueAfterSlurmNodes time.Duration + requeueAfterActiveCheck time.Duration + requeueAfterActiveCheckJob time.Duration + requeueAfterPodEphemeralStorageCheck time.Duration + maxConcurrency int + maxConcurrencyPodEphemeralStorageCheck int + cacheSyncTimeout time.Duration + ephemeralStorageThreshold float64 + ephemeralStorageResumeThreshold float64 ) var watchNsCacheByName = make(map[string]cache.Config) @@ -147,8 +149,10 @@ func main() { "If set, HTTP/2 will be enabled for the metrics and webhook servers") flag.StringVar(&logFormat, "log-format", "json", "Log format: plain or json") flag.StringVar(&logLevel, "log-level", "debug", "Log level: debug, info, warn, error, dpanic, panic, fatal") - flag.DurationVar(&reconcileTimeout, "reconcile-timeout", 3*time.Minute, "The maximum duration allowed for a single reconcile") - flag.DurationVar(&reconcileTimeoutPodEphemeralStorageCheck, "pod-ephemeral-reconcile-timeout", 15*time.Second, "The maximum duration allowed for a single reconcile of Pod Ephemeral Storage Check") + flag.DurationVar(&requeueAfterSlurmNodes, "requeue-after-slurm-nodes", 3*time.Minute, "The duration after which SlurmNodesController will be requeued for reconciliation.") + flag.DurationVar(&requeueAfterActiveCheck, "requeue-after-activecheck", 10*time.Second, "The duration after which ActiveCheck will be requeued for reconciliation.") + flag.DurationVar(&requeueAfterActiveCheckJob, "requeue-after-activecheckjob", time.Minute, "The duration after which ActiveCheckJob will be requeued for reconciliation.") + flag.DurationVar(&requeueAfterPodEphemeralStorageCheck, "requeue-after-pod-ephemeral-storage-check", time.Minute, "The duration after which Pod Ephemeral Storage Check will be requeued for reconciliation.") flag.IntVar(&maxConcurrency, "max-concurrent-reconciles", 1, "Configures number of concurrent reconciles. It should improve performance for clusters with many objects.") flag.IntVar(&maxConcurrencyPodEphemeralStorageCheck, "pod-ephemeral-max-concurrent-reconciles", 10, "Configures number of concurrent reconciles for Pod Ephemeral Storage Check. It should improve performance for clusters with many pods.") flag.DurationVar(&cacheSyncTimeout, "cache-sync-timeout", 2*time.Minute, "The maximum duration allowed for caching sync") @@ -293,7 +297,7 @@ func main() { mgr.GetScheme(), mgr.GetEventRecorderFor(soperatorchecks.SlurmNodesControllerName), slurmAPIClients, - reconcileTimeout, + requeueAfterSlurmNodes, enabledNodeReplacement, enableExtensiveCheck, mgr.GetAPIReader(), @@ -320,7 +324,7 @@ func main() { mgr.GetClient(), mgr.GetScheme(), mgr.GetEventRecorderFor(soperatorchecks.SlurmActiveCheckControllerName), - reconcileTimeout, + requeueAfterActiveCheck, ).SetupWithManager(mgr, maxConcurrency, cacheSyncTimeout); err != nil { cli.Fail(setupLog, err, "unable to create activecheck controller", "controller", "ActiveCheck") } @@ -331,7 +335,7 @@ func main() { mgr.GetScheme(), mgr.GetEventRecorderFor(soperatorchecks.SlurmActiveCheckJobControllerName), slurmAPIClients, - reconcileTimeout, + requeueAfterActiveCheckJob, ).SetupWithManager(mgr, maxConcurrency, cacheSyncTimeout); err != nil { cli.Fail(setupLog, err, "unable to create activecheckjob controller", "controller", "ActiveCheckJob") } @@ -341,7 +345,6 @@ func main() { mgr.GetClient(), mgr.GetScheme(), mgr.GetEventRecorderFor(soperatorchecks.SlurmChecksServiceAccountControllerName), - reconcileTimeout, ).SetupWithManager(mgr, maxConcurrency, cacheSyncTimeout); err != nil { cli.Fail(setupLog, err, "unable to create soperatorchecks serviceaccount controller", "controller", "ServiceAccount") } @@ -351,7 +354,6 @@ func main() { mgr.GetClient(), mgr.GetScheme(), mgr.GetEventRecorderFor(soperatorchecks.SlurmActiveCheckPrologControllerName), - reconcileTimeout, ).SetupWithManager(mgr, maxConcurrency, cacheSyncTimeout); err != nil { cli.Fail(setupLog, err, "unable to create soperatorchecks prolog controller", "controller", "Prolog") } @@ -363,7 +365,7 @@ func main() { mgr.GetScheme(), mgr.GetEventRecorderFor(soperatorchecks.PodEphemeralStorageCheckName), ctrl.GetConfigOrDie(), - reconcileTimeoutPodEphemeralStorageCheck, + requeueAfterPodEphemeralStorageCheck, ephemeralStorageThreshold, ephemeralStorageResumeThreshold, slurmAPIClients, diff --git a/helm/slurm-cluster/slurm_scripts/pyxis_caching_importer.sh b/helm/slurm-cluster/slurm_scripts/pyxis_caching_importer.sh index 949859382..d48f28ea4 100644 --- a/helm/slurm-cluster/slurm_scripts/pyxis_caching_importer.sh +++ b/helm/slurm-cluster/slurm_scripts/pyxis_caching_importer.sh @@ -8,6 +8,7 @@ readonly cmd="$1" readonly cache_dir="${ENROOT_CONTAINER_IMAGES_CACHE_DIR:-/var/cache/enroot-container-images}" readonly squashfs_temp_path="${cache_dir}/${SLURM_JOB_ID}.${SLURM_STEP_ID}.sqsh" readonly lock_path="${squashfs_temp_path}.lock" +trap 'rm -f "${lock_path}"' EXIT # Since it's not an ephemeral squashfs file, we can use compression. export ENROOT_SQUASH_OPTIONS="-comp zstd -Xcompression-level 3 -b 1M" diff --git a/helm/soperatorchecks/values.yaml b/helm/soperatorchecks/values.yaml index ef5fe44b5..e94c1ac77 100644 --- a/helm/soperatorchecks/values.yaml +++ b/helm/soperatorchecks/values.yaml @@ -25,7 +25,10 @@ checks: args: - --log-format=json - --log-level=info - - --reconcile-timeout=3m + - --requeue-after-slurm-nodes=3m + - --requeue-after-activecheck=10s + - --requeue-after-activecheckjob=1m + - --requeue-after-pod-ephemeral-storage-check=1m - --max-concurrent-reconciles=1 - --cache-sync-timeout=2m - --not-ready-timeout=15m diff --git a/images/slurm_check_job/slurm_check_job_entrypoint.sh b/images/slurm_check_job/slurm_check_job_entrypoint.sh index 2dfebeac9..83d3ce9da 100644 --- a/images/slurm_check_job/slurm_check_job_entrypoint.sh +++ b/images/slurm_check_job/slurm_check_job_entrypoint.sh @@ -123,8 +123,8 @@ if [[ -z "$K8S_JOB_NAME" ]]; then exit 1 fi -echo "Annotating Job $K8S_JOB_NAME with slurm-job-id=$SLURM_JOB_ID" -kubectl annotate job "$K8S_JOB_NAME" slurm-job-id="$SLURM_JOB_ID" \ +echo "Annotating Job $K8S_JOB_NAME with slurm-job-id and unhandled-slurm-job-id = $SLURM_JOB_ID" +kubectl annotate job "$K8S_JOB_NAME" slurm-job-id="$SLURM_JOB_ID" unhandled-slurm-job-id="$SLURM_JOB_ID" \ -n "$K8S_POD_NAMESPACE" --overwrite || { echo "Failed to annotate Job" exit 1 diff --git a/internal/controller/soperatorchecks/activecheck_controller.go b/internal/controller/soperatorchecks/activecheck_controller.go index e5edbc927..3cd02ab25 100644 --- a/internal/controller/soperatorchecks/activecheck_controller.go +++ b/internal/controller/soperatorchecks/activecheck_controller.go @@ -38,7 +38,7 @@ var ( type ActiveCheckReconciler struct { *reconciler.Reconciler - reconcileTimeout time.Duration + requeueAfter time.Duration CronJob *reconciler.CronJobReconciler ConfigMap *reconciler.ConfigMapReconciler @@ -48,17 +48,17 @@ func NewActiveCheckController( client client.Client, scheme *runtime.Scheme, recorder record.EventRecorder, - reconcileTimeout time.Duration, + requeueAfter time.Duration, ) *ActiveCheckReconciler { r := reconciler.NewReconciler(client, scheme, recorder) cronJobReconciler := reconciler.NewCronJobReconciler(r) configMapReconciler := reconciler.NewConfigMapReconciler(r) return &ActiveCheckReconciler{ - Reconciler: r, - reconcileTimeout: reconcileTimeout, - CronJob: cronJobReconciler, - ConfigMap: configMapReconciler, + Reconciler: r, + requeueAfter: requeueAfter, + CronJob: cronJobReconciler, + ConfigMap: configMapReconciler, } } @@ -159,7 +159,7 @@ func (r *ActiveCheckReconciler) Reconcile( if !dependenciesReady { logger.Info("Not all dependencies are ready, requeueing in 10 seconds.") return ctrl.Result{ - RequeueAfter: time.Second * 10, + RequeueAfter: r.requeueAfter, }, nil } diff --git a/internal/controller/soperatorchecks/activecheck_jobs_controller.go b/internal/controller/soperatorchecks/activecheck_jobs_controller.go index 1c20b5620..acc424ce9 100644 --- a/internal/controller/soperatorchecks/activecheck_jobs_controller.go +++ b/internal/controller/soperatorchecks/activecheck_jobs_controller.go @@ -17,7 +17,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" - "nebius.ai/slurm-operator/internal/slurmapi" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -30,6 +29,7 @@ import ( "nebius.ai/slurm-operator/internal/controller/reconciler" "nebius.ai/slurm-operator/internal/controllerconfig" "nebius.ai/slurm-operator/internal/logfield" + "nebius.ai/slurm-operator/internal/slurmapi" ) var ( @@ -40,9 +40,9 @@ var ( type ActiveCheckJobReconciler struct { *reconciler.Reconciler - Job *reconciler.JobReconciler - slurmAPIClients *slurmapi.ClientSet - reconcileTimeout time.Duration + Job *reconciler.JobReconciler + slurmAPIClients *slurmapi.ClientSet + requeueAfter time.Duration } func NewActiveCheckJobController( @@ -50,15 +50,15 @@ func NewActiveCheckJobController( scheme *runtime.Scheme, recorder record.EventRecorder, slurmAPIClients *slurmapi.ClientSet, - reconcileTimeout time.Duration, + requeueAfter time.Duration, ) *ActiveCheckJobReconciler { r := reconciler.NewReconciler(client, scheme, recorder) return &ActiveCheckJobReconciler{ - Reconciler: r, - Job: reconciler.NewJobReconciler(r), - slurmAPIClients: slurmAPIClients, - reconcileTimeout: reconcileTimeout, + Reconciler: r, + Job: reconciler.NewJobReconciler(r), + slurmAPIClients: slurmAPIClients, + requeueAfter: requeueAfter, } } @@ -128,10 +128,15 @@ func (r *ActiveCheckJobReconciler) Reconcile( return ctrl.Result{}, fmt.Errorf("get active check job: %w", err) } - activeCheckName, err := r.getActiveCheckNameFromJob(ctx, k8sJob) + activeCheckName, found, err := r.getActiveCheckNameFromJob(ctx, k8sJob) if err != nil { - return ctrl.Result{}, fmt.Errorf("get active check name: %w", err) + return ctrl.Result{}, err } + // No need to return an error and reconcile again if the annotation is not found because it won't appear anyway. + if !found { + return ctrl.Result{}, nil + } + activeCheck := &slurmv1alpha1.ActiveCheck{} err = r.Get(ctx, types.NamespacedName{ Namespace: req.Namespace, @@ -174,7 +179,7 @@ func (r *ActiveCheckJobReconciler) Reconcile( // is still running; processing it too early would be premature. if status := getK8sJobStatus(k8sJob); status != consts.ActiveCheckK8sJobStatusComplete && status != consts.ActiveCheckK8sJobStatusFailed { - return ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}, nil + return ctrl.Result{Requeue: true, RequeueAfter: r.requeueAfter}, nil } activeCheck.Status.SlurmJobsStatus = slurmv1alpha1.ActiveCheckSlurmJobsStatus{ @@ -231,13 +236,25 @@ func (r *ActiveCheckJobReconciler) Reconcile( return ctrl.Result{}, fmt.Errorf("slurm cluster %v not found", slurmClusterName) } - slurmJobIDs, ok := k8sJob.Annotations["slurm-job-id"] + allSlurmJobIDs, ok := k8sJob.Annotations["slurm-job-id"] if !ok { logger.Error(err, "failed to get slurm job id") return ctrl.Result{}, err } - ids := strings.Split(slurmJobIDs, ",") - firstJobId := ids[0] + + oldUnhandledSlurmJobIDs, ok := k8sJob.Annotations["unhandled-slurm-job-id"] + if !ok { + // Most likely controller was updated, but the previous job is still running. + oldUnhandledSlurmJobIDs = allSlurmJobIDs + } + + oldUnhandledIDs := strings.Split(oldUnhandledSlurmJobIDs, ",") + if oldUnhandledSlurmJobIDs == "" { + oldUnhandledIDs = []string{} + } + allIDs := strings.Split(allSlurmJobIDs, ",") + unhandledIds := make([]string, 0) + firstJobId := allIDs[0] var jobName string var submitTime *metav1.Time @@ -261,11 +278,12 @@ func (r *ActiveCheckJobReconciler) Reconcile( requeue := false latestHandledFinalStateTime := lastHandledFinalStateTime var notFoundInAccountingIDs []string - for _, slurmJobID := range ids { + for _, slurmJobID := range oldUnhandledIDs { slurmJobs, err := slurmAPIClient.GetJobsByIDFromAccounting(ctx, slurmJobID) if err != nil { - logger.Error(err, "failed to get slurm job status") - return ctrl.Result{}, err + logger.Error(err, "failed to get slurm job status", "slurm_job_id", slurmJobID) + unhandledIds = append(unhandledIds, slurmJobID) + continue } // slurmdbd may not yet have a record for a just-submitted job; the propagation @@ -275,6 +293,7 @@ func (r *ActiveCheckJobReconciler) Reconcile( // surfaces in logs instead of degrading silently into an infinite requeue. if len(slurmJobs) == 0 { notFoundInAccountingIDs = append(notFoundInAccountingIDs, slurmJobID) + unhandledIds = append(unhandledIds, slurmJobID) requeue = true continue } @@ -288,6 +307,7 @@ func (r *ActiveCheckJobReconciler) Reconcile( // Job is not yet finished if !slurmJob.IsTerminalState() || slurmJob.EndTime == nil { requeue = true + unhandledIds = append(unhandledIds, slurmJobID) continue } @@ -332,18 +352,20 @@ func (r *ActiveCheckJobReconciler) Reconcile( logger.Info("Slurm jobs not found in accounting, will retry", "count", len(notFoundInAccountingIDs), "job_ids", notFoundInAccountingIDs, - "total", len(ids), + "total", len(oldUnhandledIDs), ) } + k8sJobPatch := client.MergeFrom(k8sJob.DeepCopy()) + k8sJob.Annotations["unhandled-slurm-job-id"] = strings.Join(unhandledIds, ",") if latestHandledFinalStateTime > lastHandledFinalStateTime { // Maybe we could delete the job because it will not be processed anymore // Otherwise, we will have many of these jobs and they will keep being listed in every Reconcile() - k8sJobPatch := client.MergeFrom(k8sJob.DeepCopy()) k8sJob.Annotations[K8sAnnotationSoperatorChecksFinalStateTime] = fmt.Sprintf("%d", latestHandledFinalStateTime) - if err := r.Job.Patch(ctx, k8sJob, k8sJobPatch); err != nil { - return ctrl.Result{}, fmt.Errorf("failed to patch k8s Job: %w", err) - } + } + + if err := r.Job.Patch(ctx, k8sJob, k8sJobPatch); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to patch k8s Job: %w", err) } // Updating the ActiveCheck status is relevant for normal active checks @@ -373,7 +395,7 @@ func (r *ActiveCheckJobReconciler) Reconcile( } if requeue { - return ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}, nil + return ctrl.Result{Requeue: true, RequeueAfter: r.requeueAfter}, nil } } else if activeCheck.Spec.CheckType == "k8sJob" { @@ -473,20 +495,23 @@ func updateSlurmNodeWithReactions( return nil } -func (r *ActiveCheckJobReconciler) getActiveCheckNameFromJob(ctx context.Context, k8sJob *batchv1.Job) (string, error) { +func (r *ActiveCheckJobReconciler) getActiveCheckNameFromJob(ctx context.Context, k8sJob *batchv1.Job) (string, bool, error) { podList := &corev1.PodList{} err := r.List(ctx, podList, client.InNamespace(k8sJob.Namespace), client.MatchingLabels{"job-name": k8sJob.Name}) - if err != nil || len(podList.Items) == 0 { - return "", fmt.Errorf("failed to find pod for job %s: %w", k8sJob.Name, err) + if err != nil { + return "", false, fmt.Errorf("failed to find pod for job %s: %w", k8sJob.Name, err) + } + if len(podList.Items) == 0 { + return "", false, nil } pod := &podList.Items[0] activeCheckName, ok := pod.Annotations[consts.AnnotationActiveCheckName] if !ok { - return "", fmt.Errorf("annotation %s not found on pod %s", consts.AnnotationActiveCheckName, pod.Name) + return "", false, nil } - return activeCheckName, nil + return activeCheckName, true, nil } func getK8sJobStatus(k8sJob *batchv1.Job) consts.ActiveCheckK8sJobStatus { diff --git a/internal/controller/soperatorchecks/activecheck_jobs_controller_test.go b/internal/controller/soperatorchecks/activecheck_jobs_controller_test.go index 9c3ea2f8d..e24eaf619 100644 --- a/internal/controller/soperatorchecks/activecheck_jobs_controller_test.go +++ b/internal/controller/soperatorchecks/activecheck_jobs_controller_test.go @@ -77,7 +77,8 @@ func TestActiveCheckJobReconciler_Reconcile_DoesNotFinalizeUntilAllSlurmJobsFini consts.LabelComponentKey: consts.ComponentTypeSoperatorChecks.String(), }, Annotations: map[string]string{ - "slurm-job-id": firstSlurmJobID + "," + nextSlurmJobID, + "unhandled-slurm-job-id": firstSlurmJobID + "," + nextSlurmJobID, + "slurm-job-id": firstSlurmJobID + "," + nextSlurmJobID, }, }, } @@ -114,7 +115,7 @@ func TestActiveCheckJobReconciler_Reconcile_DoesNotFinalizeUntilAllSlurmJobsFini EndTime: &firstEndTime, }}, nil }). - Twice() + Once() mockClient.EXPECT(). GetJobsByIDFromAccounting(mock.Anything, nextSlurmJobID). @@ -303,6 +304,7 @@ func TestActiveCheckJobReconciler_Reconcile_SlurmJobAggregatesTerminalResultsInS scheme := newActiveCheckJobTestScheme(t) activeCheck, cronJob, k8sJob, pod := newActiveCheckJobTestObjects("gpu-check", "gpu-check-123", "slurmJob") + k8sJob.Annotations["unhandled-slurm-job-id"] = "101,102,103,104" k8sJob.Annotations["slurm-job-id"] = "101,102,103,104" submitTime := metav1.NewTime(time.Date(2026, time.April, 13, 10, 0, 0, 0, time.UTC)) @@ -378,6 +380,7 @@ func TestActiveCheckJobReconciler_Reconcile_SlurmJobAccumulatesTerminalResultsAc scheme := newActiveCheckJobTestScheme(t) activeCheck, cronJob, k8sJob, pod := newActiveCheckJobTestObjects("gpu-check", "gpu-check-123", "slurmJob") + k8sJob.Annotations["unhandled-slurm-job-id"] = "101,102" k8sJob.Annotations["slurm-job-id"] = "101,102" submitTime := metav1.NewTime(time.Date(2026, time.April, 13, 10, 0, 0, 0, time.UTC)) @@ -399,7 +402,7 @@ func TestActiveCheckJobReconciler_Reconcile_SlurmJobAccumulatesTerminalResultsAc EndTime: &failedEndTime, }}, nil }). - Twice() + Once() mockClient.EXPECT(). GetJobsByIDFromAccounting(mock.Anything, "102"). @@ -464,6 +467,7 @@ func TestActiveCheckJobReconciler_Reconcile_FailedSlurmJobWithoutReactionsOnlyUp scheme := newActiveCheckJobTestScheme(t) activeCheck, cronJob, k8sJob, pod := newActiveCheckJobTestObjects("gpu-check", "gpu-check-123", "slurmJob") + k8sJob.Annotations["unhandled-slurm-job-id"] = "101" k8sJob.Annotations["slurm-job-id"] = "101" submitTime := metav1.NewTime(time.Date(2026, time.April, 13, 10, 0, 0, 0, time.UTC)) @@ -505,6 +509,7 @@ func TestActiveCheckJobReconciler_Reconcile_SlurmJobNotYetVisibleInAccountingReq scheme := newActiveCheckJobTestScheme(t) activeCheck, cronJob, k8sJob, pod := newActiveCheckJobTestObjects("gpu-check", "gpu-check-123", "slurmJob") + k8sJob.Annotations["unhandled-slurm-job-id"] = "101" k8sJob.Annotations["slurm-job-id"] = "101" mockClient := slurmapifake.NewMockClient(t) @@ -541,6 +546,7 @@ func TestActiveCheckJobReconciler_Reconcile_FailedSlurmJobExecutesCommentReactio CommentPrefix: "[node_problem]", }, } + k8sJob.Annotations["unhandled-slurm-job-id"] = "101" k8sJob.Annotations["slurm-job-id"] = "101" submitTime := metav1.NewTime(time.Date(2026, time.April, 13, 10, 0, 0, 0, time.UTC)) diff --git a/internal/controller/soperatorchecks/activecheck_prolog_controller.go b/internal/controller/soperatorchecks/activecheck_prolog_controller.go index 1bab0b1f2..cb3da11ba 100644 --- a/internal/controller/soperatorchecks/activecheck_prolog_controller.go +++ b/internal/controller/soperatorchecks/activecheck_prolog_controller.go @@ -9,8 +9,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" - slurmv1 "nebius.ai/slurm-operator/api/v1" - "nebius.ai/slurm-operator/internal/consts" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -18,6 +16,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" + slurmv1 "nebius.ai/slurm-operator/api/v1" + "nebius.ai/slurm-operator/internal/consts" + "nebius.ai/slurm-operator/api/v1alpha1" "nebius.ai/slurm-operator/internal/controller/reconciler" "nebius.ai/slurm-operator/internal/controllerconfig" @@ -33,20 +34,17 @@ var ( type ActiveCheckPrologReconciler struct { *reconciler.Reconciler - reconcileTimeout time.Duration } func NewActiveCheckPrologController( client client.Client, scheme *runtime.Scheme, recorder record.EventRecorder, - reconcileTimeout time.Duration, ) *ActiveCheckPrologReconciler { r := reconciler.NewReconciler(client, scheme, recorder) return &ActiveCheckPrologReconciler{ - Reconciler: r, - reconcileTimeout: reconcileTimeout, + Reconciler: r, } } diff --git a/internal/controller/soperatorchecks/activecheck_skipped_test.go b/internal/controller/soperatorchecks/activecheck_skipped_test.go index 4de105b95..370cdcf5c 100644 --- a/internal/controller/soperatorchecks/activecheck_skipped_test.go +++ b/internal/controller/soperatorchecks/activecheck_skipped_test.go @@ -99,9 +99,9 @@ func TestActiveCheckJobReconciler_SkippedAnnotation(t *testing.T) { baseReconciler := reconciler.NewReconciler(client, scheme, record.NewFakeRecorder(10)) r := &ActiveCheckJobReconciler{ - Reconciler: baseReconciler, - Job: reconciler.NewJobReconciler(baseReconciler), - reconcileTimeout: 30 * time.Second, + Reconciler: baseReconciler, + Job: reconciler.NewJobReconciler(baseReconciler), + requeueAfter: 30 * time.Second, } req := ctrl.Request{NamespacedName: types.NamespacedName{Name: jobName, Namespace: ns}} @@ -183,9 +183,9 @@ func TestActiveCheckJobReconciler_SkippedAnnotation_Idempotent(t *testing.T) { baseReconciler := reconciler.NewReconciler(client, scheme, record.NewFakeRecorder(10)) r := &ActiveCheckJobReconciler{ - Reconciler: baseReconciler, - Job: reconciler.NewJobReconciler(baseReconciler), - reconcileTimeout: 30 * time.Second, + Reconciler: baseReconciler, + Job: reconciler.NewJobReconciler(baseReconciler), + requeueAfter: 30 * time.Second, } req := ctrl.Request{NamespacedName: types.NamespacedName{Name: jobName, Namespace: ns}} diff --git a/internal/controller/soperatorchecks/pod_ephemeral_storage_check.go b/internal/controller/soperatorchecks/pod_ephemeral_storage_check.go index 6636b3029..3fbda1d84 100644 --- a/internal/controller/soperatorchecks/pod_ephemeral_storage_check.go +++ b/internal/controller/soperatorchecks/pod_ephemeral_storage_check.go @@ -26,6 +26,7 @@ import ( api "github.com/SlinkyProject/slurm-client/api/v0041" kruisev1b1 "github.com/openkruise/kruise-api/apps/v1beta1" + slurmv1 "nebius.ai/slurm-operator/api/v1" "nebius.ai/slurm-operator/internal/consts" "nebius.ai/slurm-operator/internal/controller/reconciler" @@ -78,12 +79,12 @@ type EphemeralStorageInfo struct { type PodEphemeralStorageCheck struct { *reconciler.Reconciler - reconcileTimeout time.Duration - clientset kubernetes.Interface - restConfig *rest.Config - usageThreshold float64 - resumeThreshold float64 - slurmAPIClients *slurmapi.ClientSet + requeueAfter time.Duration + clientset kubernetes.Interface + restConfig *rest.Config + usageThreshold float64 + resumeThreshold float64 + slurmAPIClients *slurmapi.ClientSet } func NewPodEphemeralStorageCheck( @@ -91,7 +92,7 @@ func NewPodEphemeralStorageCheck( scheme *runtime.Scheme, recorder record.EventRecorder, restConfig *rest.Config, - reconcileTimeout time.Duration, + requeueAfter time.Duration, usageThreshold float64, resumeThreshold float64, slurmAPIClients *slurmapi.ClientSet, @@ -104,13 +105,13 @@ func NewPodEphemeralStorageCheck( } return &PodEphemeralStorageCheck{ - Reconciler: r, - reconcileTimeout: reconcileTimeout, - clientset: clientset, - restConfig: restConfig, - usageThreshold: usageThreshold, - resumeThreshold: resumeThreshold, - slurmAPIClients: slurmAPIClients, + Reconciler: r, + requeueAfter: requeueAfter, + clientset: clientset, + restConfig: restConfig, + usageThreshold: usageThreshold, + resumeThreshold: resumeThreshold, + slurmAPIClients: slurmAPIClients, }, nil } @@ -178,8 +179,8 @@ func (r *PodEphemeralStorageCheck) Reconcile(ctx context.Context, req ctrl.Reque return ctrl.Result{}, fmt.Errorf("reconciling Pod Ephemeral Storage Check: %w", err) } - logger.Info("Pod Ephemeral Storage Check completed successfully") - return ctrl.Result{RequeueAfter: r.reconcileTimeout}, nil + logger.V(1).Info("Pod Ephemeral Storage Check completed successfully") + return ctrl.Result{RequeueAfter: r.requeueAfter}, nil } func (r *PodEphemeralStorageCheck) isPodRelevant(pod *corev1.Pod) bool { @@ -591,7 +592,7 @@ func (r *PodEphemeralStorageCheck) ensureNodeCache( nc := r.slurmAPIClients.EnsureNodeCache( slurmClusterNamespacedName, - r.reconcileTimeout, + r.requeueAfter, log.Log.WithName("NodeCache").WithValues("cluster", slurmClusterNamespacedName), ) if nc == nil { diff --git a/internal/controller/soperatorchecks/pod_ephemeral_storage_check_test.go b/internal/controller/soperatorchecks/pod_ephemeral_storage_check_test.go index 300a8cf3d..307d00b2e 100644 --- a/internal/controller/soperatorchecks/pod_ephemeral_storage_check_test.go +++ b/internal/controller/soperatorchecks/pod_ephemeral_storage_check_test.go @@ -547,11 +547,11 @@ func TestGetEphemeralStorageStatsFromNode(t *testing.T) { } controller := &PodEphemeralStorageCheck{ - clientset: fakeClientset, - restConfig: restConfig, - usageThreshold: 80.0, - resumeThreshold: 75.0, - reconcileTimeout: time.Minute, + clientset: fakeClientset, + restConfig: restConfig, + usageThreshold: 80.0, + resumeThreshold: 75.0, + requeueAfter: time.Minute, } // Note: This test will fail because we can't easily mock the kubernetes REST client diff --git a/internal/controller/soperatorchecks/serviceaccount_controller.go b/internal/controller/soperatorchecks/serviceaccount_controller.go index 14edf04da..746f39c91 100644 --- a/internal/controller/soperatorchecks/serviceaccount_controller.go +++ b/internal/controller/soperatorchecks/serviceaccount_controller.go @@ -33,7 +33,6 @@ var ( type ServiceAccountReconciler struct { *reconciler.Reconciler - reconcileTimeout time.Duration ServiceAccount *reconciler.ServiceAccountReconciler Role *reconciler.RoleReconciler @@ -44,7 +43,6 @@ func NewServiceAccountController( client client.Client, scheme *runtime.Scheme, recorder record.EventRecorder, - reconcileTimeout time.Duration, ) *ServiceAccountReconciler { r := reconciler.NewReconciler(client, scheme, recorder) serviceAccountReconciler := reconciler.NewServiceAccountReconciler(r) @@ -52,11 +50,10 @@ func NewServiceAccountController( roleBindingReconciler := reconciler.NewRoleBindingReconciler(r) return &ServiceAccountReconciler{ - Reconciler: r, - reconcileTimeout: reconcileTimeout, - ServiceAccount: serviceAccountReconciler, - Role: roleReconciler, - RoleBinding: roleBindingReconciler, + Reconciler: r, + ServiceAccount: serviceAccountReconciler, + Role: roleReconciler, + RoleBinding: roleBindingReconciler, } } diff --git a/internal/controller/soperatorchecks/slurm_nodes_controller.go b/internal/controller/soperatorchecks/slurm_nodes_controller.go index 70bd8e5ef..5c5a20a8c 100644 --- a/internal/controller/soperatorchecks/slurm_nodes_controller.go +++ b/internal/controller/soperatorchecks/slurm_nodes_controller.go @@ -36,7 +36,7 @@ const ( type SlurmNodesController struct { *reconciler.Reconciler slurmAPIClients *slurmapi.ClientSet - reconcileTimeout time.Duration + requeueAfter time.Duration enabledNodeReplacement bool enableExtensiveCheck bool apiReader client.Reader // Direct API reader for pagination @@ -48,7 +48,7 @@ func NewSlurmNodesController( scheme *runtime.Scheme, recorder record.EventRecorder, slurmAPIClients *slurmapi.ClientSet, - reconcileTimeout time.Duration, + requeueAfter time.Duration, enabledNodeReplacement bool, enableExtensiveCheck bool, apiReader client.Reader, @@ -63,7 +63,7 @@ func NewSlurmNodesController( return &SlurmNodesController{ Reconciler: r, slurmAPIClients: slurmAPIClients, - reconcileTimeout: reconcileTimeout, + requeueAfter: requeueAfter, enabledNodeReplacement: enabledNodeReplacement, enableExtensiveCheck: enableExtensiveCheck, apiReader: apiReader, @@ -127,7 +127,7 @@ func (c *SlurmNodesController) Reconcile(ctx context.Context, req ctrl.Request) // Set RequeueAfter so SlurmNodesController can perform periodical checks against // slurm nodes to find degraded nodes and k8s nodes to find maintenance. - return ctrl.Result{RequeueAfter: c.reconcileTimeout}, nil + return ctrl.Result{RequeueAfter: c.requeueAfter}, nil } // TODO: filter slurmNodes by supported slurm clusters