diff --git a/api/external/cinder/messages.go b/api/external/cinder/messages.go index 260e93815..08fe6edee 100644 --- a/api/external/cinder/messages.go +++ b/api/external/cinder/messages.go @@ -30,8 +30,11 @@ type ExternalSchedulerRequest struct { Weights map[string]float64 `json:"weights"` // The name of the pipeline to execute. Pipeline string `json:"pipeline"` + // Options configure the pipeline behavior for this scheduling call. + Options lib.Options `json:"options,omitempty"` } +func (r ExternalSchedulerRequest) GetOptions() lib.Options { return r.Options } func (r ExternalSchedulerRequest) GetHosts() []string { hosts := make([]string, len(r.Hosts)) for i, host := range r.Hosts { diff --git a/api/external/ironcore/messages.go b/api/external/ironcore/messages.go index ac517f61a..05797e15a 100644 --- a/api/external/ironcore/messages.go +++ b/api/external/ironcore/messages.go @@ -13,8 +13,11 @@ import ( type MachinePipelineRequest struct { // The available machine pools. Pools []ironcorev1alpha1.MachinePool `json:"pools"` + // Options configure the pipeline behavior for this scheduling call. + Options lib.Options `json:"options,omitempty"` } +func (r MachinePipelineRequest) GetOptions() lib.Options { return r.Options } func (r MachinePipelineRequest) GetHosts() []string { hosts := make([]string, len(r.Pools)) for i, host := range r.Pools { diff --git a/api/external/manila/messages.go b/api/external/manila/messages.go index 5255a0d4f..013fa70fb 100644 --- a/api/external/manila/messages.go +++ b/api/external/manila/messages.go @@ -30,8 +30,11 @@ type ExternalSchedulerRequest struct { Weights map[string]float64 `json:"weights"` // The name of the pipeline to execute. Pipeline string `json:"pipeline"` + // Options configure the pipeline behavior for this scheduling call. + Options lib.Options `json:"options,omitempty"` } +func (r ExternalSchedulerRequest) GetOptions() lib.Options { return r.Options } func (r ExternalSchedulerRequest) GetHosts() []string { hosts := make([]string, len(r.Hosts)) for i, host := range r.Hosts { diff --git a/api/external/nova/messages.go b/api/external/nova/messages.go index e82568941..202f85cf1 100644 --- a/api/external/nova/messages.go +++ b/api/external/nova/messages.go @@ -37,8 +37,14 @@ type ExternalSchedulerRequest struct { // The name of the pipeline to execute. Pipeline string `json:"pipeline"` + + // Options configure the pipeline behavior for this scheduling call. + // Set by the caller (CR controller, failover controller, Nova). + // Nova does not set these; Cortex fills in config-derived defaults server-side. + Options lib.Options `json:"options,omitempty"` } +func (r ExternalSchedulerRequest) GetOptions() lib.Options { return r.Options } func (r ExternalSchedulerRequest) GetHosts() []string { hosts := make([]string, len(r.Hosts)) for i, host := range r.Hosts { diff --git a/api/external/pods/messages.go b/api/external/pods/messages.go index 3ec329b39..0b5466415 100644 --- a/api/external/pods/messages.go +++ b/api/external/pods/messages.go @@ -15,8 +15,11 @@ type PodPipelineRequest struct { Nodes []corev1.Node `json:"nodes"` // The pod to be scheduled. Pod corev1.Pod `json:"pod"` + // Options configure the pipeline behavior for this scheduling call. + Options lib.Options `json:"options,omitempty"` } +func (r PodPipelineRequest) GetOptions() lib.Options { return r.Options } func (r PodPipelineRequest) GetHosts() []string { hosts := make([]string, len(r.Nodes)) for i, host := range r.Nodes { diff --git a/internal/scheduling/lib/filter_weigher_pipeline.go b/internal/scheduling/lib/filter_weigher_pipeline.go index ee769433d..d12b566ad 100644 --- a/internal/scheduling/lib/filter_weigher_pipeline.go +++ b/internal/scheduling/lib/filter_weigher_pipeline.go @@ -19,6 +19,7 @@ import ( type FilterWeigherPipeline[RequestType FilterWeigherPipelineRequest] interface { // Run the scheduling pipeline with the given request. + // Call-time options are read from request.GetOptions(). Run(request RequestType) (v1alpha1.DecisionResult, error) } @@ -263,6 +264,10 @@ func (s *filterWeigherPipeline[RequestType]) sortHostsByWeights(weights map[stri // Evaluate the pipeline and return a list of hosts in order of preference. func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1.DecisionResult, error) { + opts := request.GetOptions() + if err := opts.Validate(); err != nil { + return v1alpha1.DecisionResult{}, err + } slogArgs := request.GetTraceLogArgs() slogArgsAny := make([]any, 0, len(slogArgs)) for _, arg := range slogArgs { @@ -297,6 +302,21 @@ func (p *filterWeigherPipeline[RequestType]) Run(request RequestType) (v1alpha1. hosts := p.sortHostsByWeights(outWeights) traceLog.Info("scheduler: sorted hosts", "hosts", hosts) + if opts.MaxCandidates > 0 && len(hosts) > opts.MaxCandidates { + traceLog.Info("scheduler: trimming candidate list", "maxCandidates", opts.MaxCandidates, "before", len(hosts)) + hosts = hosts[:opts.MaxCandidates] + // Drop trimmed hosts from outWeights so AggregatedOutWeights stays consistent. + kept := make(map[string]struct{}, len(hosts)) + for _, h := range hosts { + kept[h] = struct{}{} + } + for host := range outWeights { + if _, ok := kept[host]; !ok { + delete(outWeights, host) + } + } + } + // Collect some metrics about the pipeline execution. go p.monitor.observePipelineResult(request, hosts) diff --git a/internal/scheduling/lib/filter_weigher_pipeline_request.go b/internal/scheduling/lib/filter_weigher_pipeline_request.go index 26688c358..69a9522e5 100644 --- a/internal/scheduling/lib/filter_weigher_pipeline_request.go +++ b/internal/scheduling/lib/filter_weigher_pipeline_request.go @@ -21,4 +21,6 @@ type FilterWeigherPipelineRequest interface { // Get logging args to be used in the step's trace log. // Usually, this will be the request context including the request ID. GetTraceLogArgs() []slog.Attr + // Get the call-time options for this pipeline run. + GetOptions() Options } diff --git a/internal/scheduling/lib/filter_weigher_pipeline_request_test.go b/internal/scheduling/lib/filter_weigher_pipeline_request_test.go index 87ab0d786..765752a45 100644 --- a/internal/scheduling/lib/filter_weigher_pipeline_request_test.go +++ b/internal/scheduling/lib/filter_weigher_pipeline_request_test.go @@ -11,6 +11,7 @@ type mockFilterWeigherPipelineRequest struct { Hosts []string Weights map[string]float64 Pipeline string + Options Options } func (m mockFilterWeigherPipelineRequest) GetWeightKeys() []string { return m.WeightKeys } @@ -18,6 +19,7 @@ func (m mockFilterWeigherPipelineRequest) GetTraceLogArgs() []slog.Attr { retu func (m mockFilterWeigherPipelineRequest) GetHosts() []string { return m.Hosts } func (m mockFilterWeigherPipelineRequest) GetWeights() map[string]float64 { return m.Weights } func (m mockFilterWeigherPipelineRequest) GetPipeline() string { return m.Pipeline } +func (m mockFilterWeigherPipelineRequest) GetOptions() Options { return m.Options } func (m mockFilterWeigherPipelineRequest) Filter(hosts map[string]float64) FilterWeigherPipelineRequest { filteredHosts := make([]string, 0, len(hosts)) diff --git a/internal/scheduling/lib/filter_weigher_pipeline_step.go b/internal/scheduling/lib/filter_weigher_pipeline_step.go index 26dc5de40..54816519c 100644 --- a/internal/scheduling/lib/filter_weigher_pipeline_step.go +++ b/internal/scheduling/lib/filter_weigher_pipeline_step.go @@ -30,6 +30,8 @@ type FilterWeigherPipelineStep[RequestType FilterWeigherPipelineRequest] interfa // // A traceLog is provided that contains the global request id and should // be used to log the step's execution. + // + // Per-call options are available via request.GetOptions(). Run(traceLog *slog.Logger, request RequestType) (*FilterWeigherPipelineStepResult, error) } diff --git a/internal/scheduling/lib/filter_weigher_pipeline_test.go b/internal/scheduling/lib/filter_weigher_pipeline_test.go index 0e2775944..a110aeec3 100644 --- a/internal/scheduling/lib/filter_weigher_pipeline_test.go +++ b/internal/scheduling/lib/filter_weigher_pipeline_test.go @@ -7,6 +7,7 @@ import ( "context" "log/slog" "math" + "slices" "testing" "github.com/cobaltcore-dev/cortex/api/v1alpha1" @@ -372,3 +373,54 @@ func TestFilterWeigherPipelineMonitor_SubPipeline(t *testing.T) { t.Error("original monitor should not be modified") } } + +func TestPipeline_MaxCandidates(t *testing.T) { + // Pipeline that passes all 4 hosts with descending weights. + pipeline := &filterWeigherPipeline[mockFilterWeigherPipelineRequest]{ + filters: map[string]Filter[mockFilterWeigherPipelineRequest]{}, + filtersOrder: []string{}, + weighersOrder: []string{}, + weighers: map[string]Weigher[mockFilterWeigherPipelineRequest]{}, + } + request := mockFilterWeigherPipelineRequest{ + Hosts: []string{"host1", "host2", "host3", "host4"}, + Weights: map[string]float64{"host1": 4.0, "host2": 3.0, "host3": 2.0, "host4": 1.0}, + } + + tests := []struct { + name string + maxCandidates int + wantLen int + wantFirst string + }{ + {"no limit", 0, 4, "host1"}, + {"limit to 2", 2, 2, "host1"}, + {"limit to 1", 1, 1, "host1"}, + {"limit larger than hosts", 10, 4, "host1"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := request + req.Options = Options{MaxCandidates: tt.maxCandidates} + result, err := pipeline.Run(req) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + if len(result.OrderedHosts) != tt.wantLen { + t.Errorf("expected %d hosts, got %d: %v", tt.wantLen, len(result.OrderedHosts), result.OrderedHosts) + } + if len(result.OrderedHosts) > 0 && result.OrderedHosts[0] != tt.wantFirst { + t.Errorf("expected first host %s, got %s", tt.wantFirst, result.OrderedHosts[0]) + } + if tt.maxCandidates > 0 && len(result.OrderedHosts) <= tt.maxCandidates { + // AggregatedOutWeights must only contain returned hosts. + for host := range result.AggregatedOutWeights { + if !slices.Contains(result.OrderedHosts, host) { + t.Errorf("AggregatedOutWeights contains trimmed host %s", host) + } + } + } + }) + } +} diff --git a/internal/scheduling/lib/options.go b/internal/scheduling/lib/options.go new file mode 100644 index 000000000..c4e43080b --- /dev/null +++ b/internal/scheduling/lib/options.go @@ -0,0 +1,46 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package lib + +import ( + "errors" + + "github.com/cobaltcore-dev/cortex/api/v1alpha1" +) + +// Options configure the behavior of a single pipeline run at call time. +// These are distinct from per-step YAML options (FilterWeigherPipelineStepOpts), +// which are static and set when the pipeline is initialized. +type Options struct { + // ReadOnly means the pipeline run does not modify shared scheduling state (reservations, + // history, inflight records). Concurrent read-only runs are safe under a shared read lock. + // Note: the controller may still write the Decision status after Run() regardless of this flag. + ReadOnly bool + // LockReservations prevents reservation unlocking, e.g. in the capacity filter. + // Set when finding hosts for new reservations (failover, CR) to see true available capacity. + LockReservations bool + // AssumeEmptyHosts treats all hosts as having no running VMs. + AssumeEmptyHosts bool + // IgnoredReservationTypes lists reservation types the capacity filter skips entirely. + IgnoredReservationTypes []v1alpha1.ReservationType + // MaxCandidates limits the number of hosts returned after weighing. 0 means no limit. + MaxCandidates int + + // RecordHistory records the placement decision in placement history. + // Replaces pipeline.Spec.CreateHistory once pipelines consolidate. + RecordHistory bool + // CreateInflight creates pessimistic blocking reservations for all returned candidates. + CreateInflight bool +} + +// Validate checks for mutually exclusive or inconsistent option combinations. +func (o Options) Validate() error { + if o.ReadOnly && o.RecordHistory { + return errors.New("ReadOnly and RecordHistory are mutually exclusive: read-only runs must not write scheduling history") + } + if o.ReadOnly && o.CreateInflight { + return errors.New("ReadOnly and CreateInflight are mutually exclusive: read-only runs must not create inflight reservations") + } + return nil +} diff --git a/internal/scheduling/lib/options_test.go b/internal/scheduling/lib/options_test.go new file mode 100644 index 000000000..6eb366b0c --- /dev/null +++ b/internal/scheduling/lib/options_test.go @@ -0,0 +1,34 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package lib + +import "testing" + +func TestOptions_Validate(t *testing.T) { + tests := []struct { + name string + opts Options + wantErr bool + }{ + {"zero value is valid", Options{}, false}, + {"write run with history", Options{RecordHistory: true}, false}, + {"write run with inflight", Options{CreateInflight: true}, false}, + {"read-only run, no side effects", Options{ReadOnly: true}, false}, + {"ReadOnly + RecordHistory is invalid", Options{ReadOnly: true, RecordHistory: true}, true}, + {"ReadOnly + CreateInflight is invalid", Options{ReadOnly: true, CreateInflight: true}, true}, + {"ReadOnly + both invalid", Options{ReadOnly: true, RecordHistory: true, CreateInflight: true}, true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.opts.Validate() + if tt.wantErr && err == nil { + t.Error("expected error, got nil") + } + if !tt.wantErr && err != nil { + t.Errorf("expected no error, got %v", err) + } + }) + } +} diff --git a/internal/scheduling/nova/filter_weigher_pipeline_controller.go b/internal/scheduling/nova/filter_weigher_pipeline_controller.go index 279ac1c3e..40252fdc2 100644 --- a/internal/scheduling/nova/filter_weigher_pipeline_controller.go +++ b/internal/scheduling/nova/filter_weigher_pipeline_controller.go @@ -7,7 +7,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "sync" "time" @@ -38,8 +37,9 @@ type FilterWeigherPipelineController struct { // Toolbox shared between all pipeline controllers. lib.BasePipelineController[lib.FilterWeigherPipeline[api.ExternalSchedulerRequest]] - // Mutex to only allow one process at a time - processMu sync.Mutex + // Mutex to only allow one process at a time. + // Read-only runs (opts.ReadOnly == true) acquire a read lock; write runs acquire the full lock. + processMu sync.RWMutex // Monitor to pass down to all pipelines. Monitor lib.FilterWeigherPipelineMonitor @@ -54,13 +54,23 @@ func (c *FilterWeigherPipelineController) PipelineType() v1alpha1.PipelineType { // Callback executed when kubernetes asks to reconcile a decision resource. func (c *FilterWeigherPipelineController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - c.processMu.Lock() - defer c.processMu.Unlock() - + // Peek at the decision before acquiring the lock so we can choose the right lock type. + // Read-only runs can proceed concurrently; write runs need the exclusive lock. decision := &v1alpha1.Decision{} if err := c.Get(ctx, req.NamespacedName, decision); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } + if c.peekReadOnly(decision) { + c.processMu.RLock() + defer c.processMu.RUnlock() + } else { + c.processMu.Lock() + defer c.processMu.Unlock() + // Re-fetch after acquiring the exclusive lock to see consistent state. + if err := c.Get(ctx, req.NamespacedName, decision); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + } old := decision.DeepCopy() if err := c.process(ctx, decision); err != nil { return ctrl.Result{}, err @@ -74,13 +84,16 @@ func (c *FilterWeigherPipelineController) Reconcile(ctx context.Context, req ctr // Process the decision from the API. Should create and return the updated decision. func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context.Context, decision *v1alpha1.Decision) error { - c.processMu.Lock() - defer c.processMu.Unlock() - - pipelineConf, ok := c.PipelineConfigs[decision.Spec.PipelineRef.Name] - if !ok { - return fmt.Errorf("pipeline %s not configured", decision.Spec.PipelineRef.Name) + // Read-only runs share the cached decision state; no re-fetch needed because they + // don't observe writes from concurrent exclusive-lock runs. + if c.peekReadOnly(decision) { + c.processMu.RLock() + defer c.processMu.RUnlock() + } else { + c.processMu.Lock() + defer c.processMu.Unlock() } + err := c.process(ctx, decision) if err != nil { meta.SetStatusCondition(&decision.Status.Conditions, metav1.Condition{ @@ -97,9 +110,6 @@ func (c *FilterWeigherPipelineController) ProcessNewDecisionFromAPI(ctx context. Message: "pipeline run succeeded", }) } - if pipelineConf.Spec.CreateHistory { - c.upsertHistory(ctx, decision, err) - } return err } @@ -166,7 +176,14 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision log.Info("gathered all placement candidates", "numHosts", len(request.Hosts)) } + // Fill RecordHistory from config if the caller didn't set it. + if !request.Options.RecordHistory { + request.Options.RecordHistory = pipelineConf.Spec.CreateHistory + } result, err := pipeline.Run(request) + if request.Options.RecordHistory { + c.upsertHistory(ctx, decision, err) + } if err != nil { log.Error(err, "failed to run pipeline") return err @@ -182,7 +199,19 @@ func (c *FilterWeigherPipelineController) process(ctx context.Context, decision return nil } -// The base controller will delegate the pipeline creation down to this method. +// peekReadOnly determines whether a decision should use a read lock instead of +// the exclusive write lock. Defaults to false (exclusive) on any parse error. +func (c *FilterWeigherPipelineController) peekReadOnly(decision *v1alpha1.Decision) bool { + if decision.Spec.NovaRaw == nil { + return false + } + var request api.ExternalSchedulerRequest + if err := json.Unmarshal(decision.Spec.NovaRaw.Raw, &request); err != nil { + return false + } + return request.Options.ReadOnly +} + func (c *FilterWeigherPipelineController) InitPipeline( ctx context.Context, p v1alpha1.Pipeline, diff --git a/internal/scheduling/nova/filter_weigher_pipeline_controller_test.go b/internal/scheduling/nova/filter_weigher_pipeline_controller_test.go index 752725df8..031527b84 100644 --- a/internal/scheduling/nova/filter_weigher_pipeline_controller_test.go +++ b/internal/scheduling/nova/filter_weigher_pipeline_controller_test.go @@ -528,7 +528,7 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) expectResult: false, expectHistoryCreated: false, expectUpdatedStatus: false, - errorContains: "pipeline nonexistent-pipeline not configured", + errorContains: "pipeline not found or not ready", }, { name: "decision without novaRaw spec", @@ -573,7 +573,7 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) createHistory: true, expectError: true, expectResult: false, - expectHistoryCreated: true, + expectHistoryCreated: false, expectUpdatedStatus: false, errorContains: "no novaRaw spec defined", }, @@ -611,7 +611,7 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) createHistory: true, expectError: true, expectResult: false, - expectHistoryCreated: true, + expectHistoryCreated: false, expectUpdatedStatus: false, errorContains: "pipeline not found or not ready", }, @@ -649,7 +649,7 @@ func TestFilterWeigherPipelineController_ProcessNewDecisionFromAPI(t *testing.T) createHistory: true, expectError: true, expectResult: false, - expectHistoryCreated: true, + expectHistoryCreated: false, expectUpdatedStatus: false, errorContains: "pipeline not found or not ready", }, @@ -928,3 +928,74 @@ func TestFilterWeigherPipelineController_IgnorePreselection(t *testing.T) { // Error variable for testing var errGathererFailed = errors.New("gatherer failed") + +func TestFilterWeigherPipelineController_PeekReadOnly(t *testing.T) { + makeRaw := func(readOnly bool) []byte { + r := api.ExternalSchedulerRequest{ + Spec: api.NovaObject[api.NovaSpec]{Data: api.NovaSpec{NumInstances: 1}}, + Options: lib.Options{ReadOnly: readOnly}, + } + raw, err := json.Marshal(r) + if err != nil { + panic(err) + } + return raw + } + + c := &FilterWeigherPipelineController{} + + tests := []struct { + name string + decision *v1alpha1.Decision + want bool + }{ + { + name: "nil NovaRaw defaults to exclusive lock", + decision: &v1alpha1.Decision{ + Spec: v1alpha1.DecisionSpec{ + PipelineRef: corev1.ObjectReference{Name: "test-pipeline"}, + }, + }, + want: false, + }, + { + name: "invalid JSON defaults to exclusive lock", + decision: &v1alpha1.Decision{ + Spec: v1alpha1.DecisionSpec{ + PipelineRef: corev1.ObjectReference{Name: "test-pipeline"}, + NovaRaw: &runtime.RawExtension{Raw: []byte("not-json")}, + }, + }, + want: false, + }, + { + name: "ReadOnly=false uses exclusive lock", + decision: &v1alpha1.Decision{ + Spec: v1alpha1.DecisionSpec{ + PipelineRef: corev1.ObjectReference{Name: "test-pipeline"}, + NovaRaw: &runtime.RawExtension{Raw: makeRaw(false)}, + }, + }, + want: false, + }, + { + name: "ReadOnly=true uses read lock", + decision: &v1alpha1.Decision{ + Spec: v1alpha1.DecisionSpec{ + PipelineRef: corev1.ObjectReference{Name: "test-pipeline"}, + NovaRaw: &runtime.RawExtension{Raw: makeRaw(true)}, + }, + }, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := c.peekReadOnly(tt.decision) + if got != tt.want { + t.Errorf("expected peekReadOnly = %v, got %v", tt.want, got) + } + }) + } +} diff --git a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go index 88e2f07d5..b01976a93 100644 --- a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go +++ b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go @@ -58,6 +58,7 @@ type FilterHasEnoughCapacity struct { // // Please also note that disk space is currently not considered by this filter. func (s *FilterHasEnoughCapacity) Run(traceLog *slog.Logger, request api.ExternalSchedulerRequest) (*lib.FilterWeigherPipelineStepResult, error) { + opts := request.GetOptions() result := s.IncludeAllHostsFromRequest(request) // This map holds the free resources per host. @@ -106,7 +107,8 @@ func (s *FilterHasEnoughCapacity) Run(traceLog *slog.Logger, request api.Externa } // Check if this reservation type should be ignored - if slices.Contains(s.Options.IgnoredReservationTypes, reservation.Spec.Type) { + if slices.Contains(s.Options.IgnoredReservationTypes, reservation.Spec.Type) || + slices.Contains(opts.IgnoredReservationTypes, reservation.Spec.Type) { traceLog.Debug("ignoring reservation type", "type", reservation.Spec.Type, "reservation", reservation.Name) continue } @@ -122,18 +124,14 @@ func (s *FilterHasEnoughCapacity) Run(traceLog *slog.Logger, request api.Externa // Check if this is a CR reservation scheduling request. // If so, we should NOT unlock any CR reservations to prevent overbooking. // CR capacity should only be unlocked for actual VM scheduling. - intent, err := request.GetIntent() switch { - case err == nil && intent == api.ReserveForCommittedResourceIntent: - traceLog.Debug("keeping CR reservation locked for CR reservation scheduling", + case opts.LockReservations || s.Options.LockReserved: + traceLog.Debug("keeping CR reservation locked", "reservation", reservation.Name, - "intent", intent) + "lockReservations", opts.LockReservations, + "lockReserved", s.Options.LockReserved) // Don't continue - fall through to block the resources - case !s.Options.LockReserved && - // For committed resource reservations: unlock resources only if: - // 1. Project ID matches - // 2. ResourceGroup matches the flavor's hw_version - reservation.Spec.CommittedResourceReservation.ProjectID == request.Spec.Data.ProjectID && + case reservation.Spec.CommittedResourceReservation.ProjectID == request.Spec.Data.ProjectID && reservation.Spec.CommittedResourceReservation.ResourceGroup == request.Spec.Data.Flavor.Data.ExtraSpecs["hw_version"]: traceLog.Info("unlocking resources reserved by matching committed resource reservation with allocation", "reservation", reservation.Name, diff --git a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity_test.go b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity_test.go index 5b026408f..f6f3689b9 100644 --- a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity_test.go +++ b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity_test.go @@ -4,6 +4,7 @@ package filters import ( + "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" "log/slog" "testing" @@ -807,6 +808,44 @@ func TestFilterHasEnoughCapacity_IgnoredReservationTypes(t *testing.T) { } } +func TestFilterHasEnoughCapacity_IgnoredReservationTypes_CallTime(t *testing.T) { + scheme := buildTestScheme(t) + + // Same two-host setup as the YAML-path test: CR on host1, Failover on host2. + // Each blocks 4 CPU, leaving 4 free; request needs 8 CPU so both hosts fail without ignoring. + hypervisors := []*hv1.Hypervisor{ + newHypervisor("host1", "16", "8", "32Gi", "16Gi"), + newHypervisor("host2", "16", "8", "32Gi", "16Gi"), + } + reservations := []*v1alpha1.Reservation{ + newCommittedReservation("cr-res", "host1", "project-X", "m1.large", "gp-1", "4", "8Gi", nil, nil), + newFailoverReservation("failover-res", "host2", "4", "8Gi", map[string]string{"other-vm": "host3"}), + } + request := newNovaRequest("instance-123", "project-A", "m1.large", "gp-1", 8, "16Gi", false, []string{"host1", "host2"}) + + objects := make([]client.Object, 0, len(hypervisors)+len(reservations)) + for _, h := range hypervisors { + objects = append(objects, h.DeepCopy()) + } + for _, r := range reservations { + objects = append(objects, r.DeepCopy()) + } + + step := &FilterHasEnoughCapacity{} + step.Client = fake.NewClientBuilder().WithScheme(scheme).WithObjects(objects...).Build() + step.Options = FilterHasEnoughCapacityOpts{LockReserved: true} // no YAML-level ignores + + // Call-time: ignore CR reservations → host1 passes, host2 still blocked by failover. + request.Options = lib.Options{ + IgnoredReservationTypes: []v1alpha1.ReservationType{v1alpha1.ReservationTypeCommittedResource}, + } + result, err := step.Run(slog.Default(), request) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + assertActivations(t, result.Activations, []string{"host1"}, []string{"host2"}) +} + func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T) { scheme := buildTestScheme(t) @@ -819,6 +858,7 @@ func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T) reservations []*v1alpha1.Reservation request api.ExternalSchedulerRequest opts FilterHasEnoughCapacityOpts + pipelineOpts lib.Options expectedHosts []string filteredHosts []string }{ @@ -834,8 +874,9 @@ func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T) }, // Request with reserve_for_committed_resource intent (scheduling a new CR reservation) request: newNovaRequestWithIntent("new-reservation-uuid", "project-A", "m1.large", "gp-1", 4, "8Gi", "reserve_for_committed_resource", false, []string{"host1", "host2"}), - opts: FilterHasEnoughCapacityOpts{LockReserved: false}, // Note: LockReserved is false, but intent overrides - expectedHosts: []string{"host2"}, // host1 blocked because existing-cr stays locked + opts: FilterHasEnoughCapacityOpts{LockReserved: false}, + pipelineOpts: lib.Options{LockReservations: true}, + expectedHosts: []string{"host2"}, // host1 blocked because existing-cr stays locked filteredHosts: []string{"host1"}, }, { @@ -867,6 +908,7 @@ func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T) // Request with reserve_for_committed_resource intent request: newNovaRequestWithIntent("new-reservation-uuid", "project-A", "m1.large", "gp-1", 4, "8Gi", "reserve_for_committed_resource", false, []string{"host1", "host2"}), opts: FilterHasEnoughCapacityOpts{LockReserved: false}, + pipelineOpts: lib.Options{LockReservations: true}, expectedHosts: []string{"host2"}, filteredHosts: []string{"host1"}, // host1 blocked by other project's reservation (would be blocked anyway) }, @@ -885,6 +927,7 @@ func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T) // After blocking all 3 reservations (24 CPU), only 8 CPU free -> should fail request: newNovaRequestWithIntent("new-reservation-uuid", "project-A", "m1.large", "gp-1", 10, "20Gi", "reserve_for_committed_resource", false, []string{"host1"}), opts: FilterHasEnoughCapacityOpts{LockReserved: false}, + pipelineOpts: lib.Options{LockReservations: true}, expectedHosts: []string{}, filteredHosts: []string{"host1"}, // All reservations stay locked, not enough capacity }, @@ -916,13 +959,14 @@ func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T) newCommittedReservation("existing-cr", "host1", "project-A", "m1.large", "gp-1", "8", "16Gi", nil, nil), }, // Request with reserve_for_committed_resource intent - // IgnoredReservationTypes is a safety flag that overrides everything, including intent + // IgnoredReservationTypes is a safety flag that overrides everything, including LockReservations request: newNovaRequestWithIntent("new-reservation-uuid", "project-A", "m1.large", "gp-1", 4, "8Gi", "reserve_for_committed_resource", false, []string{"host1"}), opts: FilterHasEnoughCapacityOpts{ LockReserved: false, // IgnoredReservationTypes is a safety override - ignores CR even for CR scheduling IgnoredReservationTypes: []v1alpha1.ReservationType{v1alpha1.ReservationTypeCommittedResource}, }, + pipelineOpts: lib.Options{LockReservations: true}, expectedHosts: []string{"host1"}, // CR reservation is ignored via IgnoredReservationTypes (safety override) filteredHosts: []string{}, }, @@ -960,6 +1004,7 @@ func TestFilterHasEnoughCapacity_ReserveForCommittedResourceIntent(t *testing.T) step := &FilterHasEnoughCapacity{} step.Client = fake.NewClientBuilder().WithScheme(scheme).WithObjects(objects...).Build() step.Options = tt.opts + tt.request.Options = tt.pipelineOpts result, err := step.Run(slog.Default(), tt.request) if err != nil { diff --git a/internal/scheduling/reservations/commitments/reservation_controller.go b/internal/scheduling/reservations/commitments/reservation_controller.go index b65842c60..5d9ab7483 100644 --- a/internal/scheduling/reservations/commitments/reservation_controller.go +++ b/internal/scheduling/reservations/commitments/reservation_controller.go @@ -24,6 +24,7 @@ import ( schedulerdelegationapi "github.com/cobaltcore-dev/cortex/api/external/nova" "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" "github.com/cobaltcore-dev/cortex/pkg/multicluster" hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" @@ -286,8 +287,17 @@ func (r *CommitmentReservationController) Reconcile(ctx context.Context, req ctr "_nova_check_type": string(schedulerdelegationapi.ReserveForCommittedResourceIntent), }, } + scheduleOpts := lib.Options{ + ReadOnly: false, // mutates state (reservation placement) + LockReservations: true, // don't unlock CR reservations; finding a slot, not placing a VM + AssumeEmptyHosts: false, + IgnoredReservationTypes: nil, + MaxCandidates: 1, + RecordHistory: false, + CreateInflight: false, // not a VM placement; no pessimistic blocking needed + } - scheduleResp, err := r.SchedulerClient.ScheduleReservation(ctx, scheduleReq) + scheduleResp, err := r.SchedulerClient.ScheduleReservation(ctx, scheduleReq, scheduleOpts) if err != nil { logger.Error(err, "failed to schedule reservation") return ctrl.Result{}, err diff --git a/internal/scheduling/reservations/failover/reservation_scheduling.go b/internal/scheduling/reservations/failover/reservation_scheduling.go index f482f3393..5f8c93767 100644 --- a/internal/scheduling/reservations/failover/reservation_scheduling.go +++ b/internal/scheduling/reservations/failover/reservation_scheduling.go @@ -11,6 +11,7 @@ import ( api "github.com/cobaltcore-dev/cortex/api/external/nova" "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" ) @@ -91,7 +92,7 @@ func (c *FailoverReservationController) queryHypervisorsFromScheduler(ctx contex "eligibleHypervisors", len(eligibleHypervisors), "ignoreHypervisors", ignoreHypervisors) - scheduleResp, err := c.SchedulerClient.ScheduleReservation(ctx, scheduleReq) + scheduleResp, err := c.SchedulerClient.ScheduleReservation(ctx, scheduleReq, lib.Options{LockReservations: true}) if err != nil { logger.Error(err, "failed to schedule failover reservation", "vmUUID", vm.UUID, "pipeline", pipeline) return nil, fmt.Errorf("failed to schedule failover reservation: %w", err) @@ -222,7 +223,7 @@ func (c *FailoverReservationController) validateVMViaSchedulerEvacuation( "vmCurrentHost", vm.CurrentHypervisor, "pipeline", PipelineAcknowledgeFailoverReservation) - resp, err := c.SchedulerClient.ScheduleReservation(ctx, scheduleReq) + resp, err := c.SchedulerClient.ScheduleReservation(ctx, scheduleReq, lib.Options{ReadOnly: true}) if err != nil { logger.Error(err, "failed to validate VM for reservation host", "vmUUID", vm.UUID, "reservationHost", reservationHost) return false, fmt.Errorf("failed to validate VM for reservation host: %w", err) diff --git a/internal/scheduling/reservations/scheduler_client.go b/internal/scheduling/reservations/scheduler_client.go index a42172ce2..f10ad21d0 100644 --- a/internal/scheduling/reservations/scheduler_client.go +++ b/internal/scheduling/reservations/scheduler_client.go @@ -12,6 +12,7 @@ import ( "time" api "github.com/cobaltcore-dev/cortex/api/external/nova" + "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" "github.com/go-logr/logr" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -89,7 +90,7 @@ type ScheduleReservationResponse struct { // ScheduleReservation calls the external scheduler API to find a host for a reservation. // The context should contain GlobalRequestID and RequestID for logging (use WithGlobalRequestID/WithRequestID). -func (c *SchedulerClient) ScheduleReservation(ctx context.Context, req ScheduleReservationRequest) (*ScheduleReservationResponse, error) { +func (c *SchedulerClient) ScheduleReservation(ctx context.Context, req ScheduleReservationRequest, opts lib.Options) (*ScheduleReservationResponse, error) { logger := loggerFromContext(ctx) // Build weights map (all zero for reservations) @@ -115,6 +116,7 @@ func (c *SchedulerClient) ScheduleReservation(ctx context.Context, req ScheduleR Pipeline: req.Pipeline, Hosts: req.EligibleHosts, Weights: weights, + Options: opts, Context: api.NovaRequestContext{ RequestID: RequestIDFromContext(ctx), GlobalRequestID: globalReqID,