diff --git a/api/v1alpha1/flavor_group_capacity_types.go b/api/v1alpha1/flavor_group_capacity_types.go new file mode 100644 index 000000000..a7339dce2 --- /dev/null +++ b/api/v1alpha1/flavor_group_capacity_types.go @@ -0,0 +1,112 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + // FlavorGroupCapacityConditionReady indicates the status data is up-to-date. + FlavorGroupCapacityConditionReady = "Ready" +) + +// FlavorGroupCapacitySpec defines the desired state of FlavorGroupCapacity. +type FlavorGroupCapacitySpec struct { + // FlavorGroup is the name of the flavor group (e.g. "hana-v2"). + // +kubebuilder:validation:Required + FlavorGroup string `json:"flavorGroup"` + + // AvailabilityZone is the OpenStack AZ this capacity data covers (e.g. "qa-de-1a"). + // +kubebuilder:validation:Required + AvailabilityZone string `json:"availabilityZone"` +} + +// FlavorCapacityStatus holds per-flavor capacity numbers for one (flavor group × AZ) pair. +type FlavorCapacityStatus struct { + // FlavorName is the OpenStack flavor name (e.g. "hana-v2-small"). + FlavorName string `json:"flavorName"` + + // PlaceableHosts is the number of hosts that can still fit this flavor given current allocations. + // +kubebuilder:validation:Optional + PlaceableHosts int64 `json:"placeableHosts,omitempty"` + + // PlaceableVMs is the number of VM slots remaining for this flavor given current allocations. + // +kubebuilder:validation:Optional + PlaceableVMs int64 `json:"placeableVms,omitempty"` + + // TotalCapacityHosts is the number of eligible hosts in an empty-datacenter scenario. + // +kubebuilder:validation:Optional + TotalCapacityHosts int64 `json:"totalCapacityHosts,omitempty"` + + // TotalCapacityVMSlots is the maximum number of VM slots in an empty-datacenter scenario. + // +kubebuilder:validation:Optional + TotalCapacityVMSlots int64 `json:"totalCapacityVmSlots,omitempty"` +} + +// FlavorGroupCapacityStatus defines the observed state of FlavorGroupCapacity. +type FlavorGroupCapacityStatus struct { + // Flavors holds per-flavor capacity data for all flavors in the group. + // +kubebuilder:validation:Optional + Flavors []FlavorCapacityStatus `json:"flavors,omitempty"` + + // CommittedCapacity is the sum of AcceptedAmount across active CommittedResource CRDs, + // expressed in multiples of the smallest flavor's memory. + // +kubebuilder:validation:Optional + CommittedCapacity int64 `json:"committedCapacity,omitempty"` + + // TotalInstances is the total number of VM instances running on hypervisors in this AZ, + // derived from Hypervisor CRD Status.Instances (not filtered by flavor group). + // +kubebuilder:validation:Optional + TotalInstances int64 `json:"totalInstances,omitempty"` + + // LastReconcileAt is the timestamp of the last successful reconcile. + // +kubebuilder:validation:Optional + LastReconcileAt metav1.Time `json:"lastReconcileAt,omitempty"` + + // The current status conditions of the FlavorGroupCapacity. 
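+	// The Ready condition is set to True when all flavor probes in the last
+	// reconcile cycle succeeded, and to False when one or more probes failed.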
+ // +kubebuilder:validation:Optional + Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"` +} + +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status +// +kubebuilder:resource:scope=Cluster +// +kubebuilder:printcolumn:name="FlavorGroup",type="string",JSONPath=".spec.flavorGroup" +// +kubebuilder:printcolumn:name="AZ",type="string",JSONPath=".spec.availabilityZone" +// +kubebuilder:printcolumn:name="TotalInstances",type="integer",JSONPath=".status.totalInstances" +// +kubebuilder:printcolumn:name="LastReconcile",type="date",JSONPath=".status.lastReconcileAt" +// +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type=='Ready')].status" + +// FlavorGroupCapacity caches pre-computed capacity data for one flavor group in one AZ. +// One CRD exists per (flavor group × AZ) pair, updated by the capacity controller on a fixed interval. +// The capacity API reads these CRDs instead of probing the scheduler on each request. +type FlavorGroupCapacity struct { + metav1.TypeMeta `json:",inline"` + + // metadata is a standard object metadata + // +optional + metav1.ObjectMeta `json:"metadata,omitempty,omitzero"` + + // spec defines the desired state of FlavorGroupCapacity + // +required + Spec FlavorGroupCapacitySpec `json:"spec"` + + // status defines the observed state of FlavorGroupCapacity + // +optional + Status FlavorGroupCapacityStatus `json:"status,omitempty,omitzero"` +} + +// +kubebuilder:object:root=true + +// FlavorGroupCapacityList contains a list of FlavorGroupCapacity. +type FlavorGroupCapacityList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []FlavorGroupCapacity `json:"items"` +} + +func init() { + SchemeBuilder.Register(&FlavorGroupCapacity{}, &FlavorGroupCapacityList{}) +} diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index d9daa7aab..e75332b77 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -749,6 +749,123 @@ func (in *FilterSpec) DeepCopy() *FilterSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlavorCapacityStatus) DeepCopyInto(out *FlavorCapacityStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlavorCapacityStatus. +func (in *FlavorCapacityStatus) DeepCopy() *FlavorCapacityStatus { + if in == nil { + return nil + } + out := new(FlavorCapacityStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlavorGroupCapacity) DeepCopyInto(out *FlavorGroupCapacity) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlavorGroupCapacity. +func (in *FlavorGroupCapacity) DeepCopy() *FlavorGroupCapacity { + if in == nil { + return nil + } + out := new(FlavorGroupCapacity) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. 
+func (in *FlavorGroupCapacity) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlavorGroupCapacityList) DeepCopyInto(out *FlavorGroupCapacityList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]FlavorGroupCapacity, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlavorGroupCapacityList. +func (in *FlavorGroupCapacityList) DeepCopy() *FlavorGroupCapacityList { + if in == nil { + return nil + } + out := new(FlavorGroupCapacityList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *FlavorGroupCapacityList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlavorGroupCapacitySpec) DeepCopyInto(out *FlavorGroupCapacitySpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlavorGroupCapacitySpec. +func (in *FlavorGroupCapacitySpec) DeepCopy() *FlavorGroupCapacitySpec { + if in == nil { + return nil + } + out := new(FlavorGroupCapacitySpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlavorGroupCapacityStatus) DeepCopyInto(out *FlavorGroupCapacityStatus) { + *out = *in + if in.Flavors != nil { + in, out := &in.Flavors, &out.Flavors + *out = make([]FlavorCapacityStatus, len(*in)) + copy(*out, *in) + } + in.LastReconcileAt.DeepCopyInto(&out.LastReconcileAt) + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]v1.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlavorGroupCapacityStatus. +func (in *FlavorGroupCapacityStatus) DeepCopy() *FlavorGroupCapacityStatus { + if in == nil { + return nil + } + out := new(FlavorGroupCapacityStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *History) DeepCopyInto(out *History) { *out = *in diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 4a09323a4..6449d0e2d 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -56,6 +56,7 @@ import ( "github.com/cobaltcore-dev/cortex/internal/scheduling/nova" "github.com/cobaltcore-dev/cortex/internal/scheduling/pods" "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" + "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations/capacity" "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations/commitments" commitmentsapi "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations/commitments/api" "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations/failover" @@ -686,6 +687,29 @@ func main() { "maxVMsToProcess", failoverConfig.MaxVMsToProcess, "vmSelectionRotationInterval", failoverConfig.VMSelectionRotationInterval) } + if slices.Contains(mainConfig.EnabledControllers, "capacity-controller") { + setupLog.Info("enabling controller", "controller", "capacity-controller") + capacityConfig := conf.GetConfigOrDie[capacity.Config]() + capacityConfig.ApplyDefaults() + + capacityMonitor := capacity.NewMonitor(multiclusterClient) + if err := metrics.Registry.Register(&capacityMonitor); err != nil { + setupLog.Error(err, "failed to register capacity monitor metrics, continuing without metrics") + } + + capacityController := capacity.NewController(multiclusterClient, capacityConfig) + if err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { + return capacityController.Start(ctx) + })); err != nil { + setupLog.Error(err, "unable to add capacity controller to manager") + os.Exit(1) + } + setupLog.Info("capacity-controller registered", + "schedulerURL", capacityConfig.SchedulerURL, + "reconcileInterval", capacityConfig.ReconcileInterval, + "totalPipeline", capacityConfig.TotalPipeline, + "placeablePipeline", capacityConfig.PlaceablePipeline) + } // +kubebuilder:scaffold:builder diff --git a/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml b/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml index 561d9fc3c..8078c069b 100644 --- a/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml +++ b/helm/bundles/cortex-nova/templates/pipelines_kvm.yaml @@ -557,4 +557,44 @@ spec: VM is allocated get a higher weight, encouraging placement on pre-reserved failover capacity. For non-evacuation requests, this weigher has no effect. +--- +apiVersion: cortex.cloud/v1alpha1 +kind: Pipeline +metadata: + name: kvm-report-capacity +spec: + schedulingDomain: nova + description: | + This pipeline is used by the capacity controller to determine the + theoretical maximum capacity of each flavor group per availability zone, + as if all hosts were completely empty. It ignores current VM allocations + and all reservation blockings so that only raw hardware capacity is + considered. + type: filter-weigher + createDecisions: false + # Fetch all placement candidates, ignoring nova's preselection. + ignorePreselection: true + filters: + - name: filter_correct_az + description: | + Restricts host candidates to the requested availability zone. + - name: filter_has_enough_capacity + description: | + Filters hosts that cannot fit the flavor based on raw hardware capacity. + VM allocations and all reservation types are ignored to represent an + empty datacenter scenario. 
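+        Both behaviours are configured through the filter params below.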
+ params: + - {key: ignoreAllocations, boolValue: true} + - {key: ignoredReservationTypes, stringListValue: ["CommittedResourceReservation", "FailoverReservation"]} + - name: filter_has_requested_traits + description: | + Ensures hosts have the hardware traits required by the flavor. + - name: filter_capabilities + description: | + Ensures hosts meet the compute capabilities required by the flavor + extra specs (e.g., architecture, maxphysaddr bits). + - name: filter_status_conditions + description: | + Excludes hosts that are not ready or are disabled. + weighers: [] {{- end }} diff --git a/helm/bundles/cortex-nova/values.yaml b/helm/bundles/cortex-nova/values.yaml index 7158c51a4..63d9f778b 100644 --- a/helm/bundles/cortex-nova/values.yaml +++ b/helm/bundles/cortex-nova/values.yaml @@ -95,6 +95,8 @@ cortex: &cortex - cortex.cloud/v1alpha1/ReservationList - cortex.cloud/v1alpha1/CommittedResource - cortex.cloud/v1alpha1/CommittedResourceList + - cortex.cloud/v1alpha1/FlavorGroupCapacity + - cortex.cloud/v1alpha1/FlavorGroupCapacityList - kvm.cloud.sap/v1/Hypervisor - kvm.cloud.sap/v1/HypervisorList - v1/Secret @@ -130,6 +132,13 @@ cortex-scheduling-controllers: - hypervisor-overcommit-controller - committed-resource-reservations-controller - failover-reservations-controller + - capacity-controller + # Pipeline used for the empty-state capacity probe (ignores allocations and reservations). + capacityTotalPipeline: "kvm-report-capacity" + # Pipeline used for the current-state capacity probe (considers current VM allocations). + capacityPlaceablePipeline: "kvm-general-purpose-load-balancing" + # How often the capacity controller re-runs its scheduler probes. + capacityReconcileInterval: 5m enabledTasks: - nova-history-cleanup-task # If true, the external scheduler API will limit the list of hosts in its diff --git a/helm/library/cortex/files/crds/cortex.cloud_flavorgroupcapacities.yaml b/helm/library/cortex/files/crds/cortex.cloud_flavorgroupcapacities.yaml new file mode 100644 index 000000000..5f475689e --- /dev/null +++ b/helm/library/cortex/files/crds/cortex.cloud_flavorgroupcapacities.yaml @@ -0,0 +1,190 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.20.1 + name: flavorgroupcapacities.cortex.cloud +spec: + group: cortex.cloud + names: + kind: FlavorGroupCapacity + listKind: FlavorGroupCapacityList + plural: flavorgroupcapacities + singular: flavorgroupcapacity + scope: Cluster + versions: + - additionalPrinterColumns: + - jsonPath: .spec.flavorGroup + name: FlavorGroup + type: string + - jsonPath: .spec.availabilityZone + name: AZ + type: string + - jsonPath: .status.totalInstances + name: TotalInstances + type: integer + - jsonPath: .status.lastReconcileAt + name: LastReconcile + type: date + - jsonPath: .status.conditions[?(@.type=='Ready')].status + name: Ready + type: string + name: v1alpha1 + schema: + openAPIV3Schema: + description: |- + FlavorGroupCapacity caches pre-computed capacity data for one flavor group in one AZ. + One CRD exists per (flavor group × AZ) pair, updated by the capacity controller on a fixed interval. + The capacity API reads these CRDs instead of probing the scheduler on each request. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. 
+ More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: spec defines the desired state of FlavorGroupCapacity + properties: + availabilityZone: + description: AvailabilityZone is the OpenStack AZ this capacity data + covers (e.g. "qa-de-1a"). + type: string + flavorGroup: + description: FlavorGroup is the name of the flavor group (e.g. "hana-v2"). + type: string + required: + - availabilityZone + - flavorGroup + type: object + status: + description: status defines the observed state of FlavorGroupCapacity + properties: + committedCapacity: + description: |- + CommittedCapacity is the sum of AcceptedAmount across active CommittedResource CRDs, + expressed in multiples of the smallest flavor's memory. + format: int64 + type: integer + conditions: + description: The current status conditions of the FlavorGroupCapacity. + items: + description: Condition contains details for one aspect of the current + state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + flavors: + description: Flavors holds per-flavor capacity data for all flavors + in the group. + items: + description: FlavorCapacityStatus holds per-flavor capacity numbers + for one (flavor group × AZ) pair. + properties: + flavorName: + description: FlavorName is the OpenStack flavor name (e.g. "hana-v2-small"). 
+ type: string + placeableHosts: + description: PlaceableHosts is the number of hosts that can + still fit this flavor given current allocations. + format: int64 + type: integer + placeableVms: + description: PlaceableVMs is the number of VM slots remaining + for this flavor given current allocations. + format: int64 + type: integer + totalCapacityHosts: + description: TotalCapacityHosts is the number of eligible hosts + in an empty-datacenter scenario. + format: int64 + type: integer + totalCapacityVmSlots: + description: TotalCapacityVMSlots is the maximum number of VM + slots in an empty-datacenter scenario. + format: int64 + type: integer + required: + - flavorName + type: object + type: array + lastReconcileAt: + description: LastReconcileAt is the timestamp of the last successful + reconcile. + format: date-time + type: string + totalInstances: + description: |- + TotalInstances is the total number of VM instances running on hypervisors in this AZ, + derived from Hypervisor CRD Status.Instances (not filtered by flavor group). + format: int64 + type: integer + type: object + required: + - spec + type: object + served: true + storage: true + subresources: + status: {} diff --git a/helm/library/cortex/templates/rbac/role.yaml b/helm/library/cortex/templates/rbac/role.yaml index ea75c6897..6b89a2e33 100644 --- a/helm/library/cortex/templates/rbac/role.yaml +++ b/helm/library/cortex/templates/rbac/role.yaml @@ -14,6 +14,7 @@ rules: - datasources - reservations - committedresources + - flavorgroupcapacities - decisions - deschedulings - pipelines @@ -34,6 +35,7 @@ rules: - datasources/finalizers - reservations/finalizers - committedresources/finalizers + - flavorgroupcapacities/finalizers - decisions/finalizers - deschedulings/finalizers - pipelines/finalizers @@ -48,6 +50,7 @@ rules: - datasources/status - reservations/status - committedresources/status + - flavorgroupcapacities/status - decisions/status - deschedulings/status - pipelines/status diff --git a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go index 88e2f07d5..b97d3e0e5 100644 --- a/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go +++ b/internal/scheduling/nova/plugins/filters/filter_has_enough_capacity.go @@ -25,6 +25,10 @@ type FilterHasEnoughCapacityOpts struct { // When a reservation type is in this list, its capacity is not blocked. // Default: empty (all reservation types are considered) IgnoredReservationTypes []v1alpha1.ReservationType `json:"ignoredReservationTypes,omitempty"` + + // IgnoreAllocations skips subtracting current VM allocations from host capacity. + // When true, only raw hardware capacity is considered (empty datacenter scenario). + IgnoreAllocations bool `json:"ignoreAllocations,omitempty"` } func (FilterHasEnoughCapacityOpts) Validate() error { return nil } @@ -80,18 +84,20 @@ func (s *FilterHasEnoughCapacity) Run(traceLog *slog.Logger, request api.Externa freeResourcesByHost[hv.Name] = hv.Status.EffectiveCapacity } - // Subtract allocated resources. - for resourceName, allocated := range hv.Status.Allocation { - free, ok := freeResourcesByHost[hv.Name][resourceName] - if !ok { - traceLog.Error( - "hypervisor with allocation for unknown resource", - "host", hv.Name, "resource", resourceName, - ) - continue + // Subtract allocated resources (skip when ignoring allocations for empty-datacenter capacity queries). 
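+		// The empty-datacenter probe pipeline (kvm-report-capacity) enables this
+		// via the ignoreAllocations filter param.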
+ if !s.Options.IgnoreAllocations { + for resourceName, allocated := range hv.Status.Allocation { + free, ok := freeResourcesByHost[hv.Name][resourceName] + if !ok { + traceLog.Error( + "hypervisor with allocation for unknown resource", + "host", hv.Name, "resource", resourceName, + ) + continue + } + free.Sub(allocated) + freeResourcesByHost[hv.Name][resourceName] = free } - free.Sub(allocated) - freeResourcesByHost[hv.Name][resourceName] = free } } @@ -190,6 +196,10 @@ func (s *FilterHasEnoughCapacity) Run(traceLog *slog.Logger, request api.Externa // Oversize spec-only: if a pending VM is larger than the remaining slot, block its full size. var resourcesToBlock map[hv1.ResourceName]resource.Quantity if reservation.Spec.Type == v1alpha1.ReservationTypeCommittedResource && + // When ignoring allocations (empty-datacenter scenario) VM resources are not + // deducted, so the confirmed-VM adjustment would under-block: always use the + // full slot instead. + !s.Options.IgnoreAllocations && // if the reservation is not being migrated, block only unused resources reservation.Spec.TargetHost == reservation.Status.Host && reservation.Spec.CommittedResourceReservation != nil && diff --git a/internal/scheduling/reservations/capacity/config.go b/internal/scheduling/reservations/capacity/config.go new file mode 100644 index 000000000..dc134e887 --- /dev/null +++ b/internal/scheduling/reservations/capacity/config.go @@ -0,0 +1,53 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package capacity + +import ( + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// Config holds configuration for the capacity controller. +type Config struct { + // ReconcileInterval is how often the controller probes the scheduler and updates CRDs. + ReconcileInterval metav1.Duration `json:"capacityReconcileInterval"` + + // TotalPipeline is the scheduler pipeline used for the empty-state probe. + // This pipeline should ignore current VM allocations (e.g. kvm-report-capacity). + TotalPipeline string `json:"capacityTotalPipeline"` + + // PlaceablePipeline is the scheduler pipeline used for the current-state probe. + // This pipeline considers current VM allocations to determine remaining placement capacity. + PlaceablePipeline string `json:"capacityPlaceablePipeline"` + + // SchedulerURL is the endpoint of the nova external scheduler. + SchedulerURL string `json:"schedulerURL"` +} + +// ApplyDefaults fills in any unset values with defaults. 
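+// With an empty Config{} this falls back to a 5-minute reconcile interval,
+// the "kvm-report-capacity" and "kvm-general-purpose-load-balancing"
+// pipelines, and the local scheduler endpoint from DefaultConfig below.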
+func (c *Config) ApplyDefaults() { + defaults := DefaultConfig() + if c.ReconcileInterval.Duration == 0 { + c.ReconcileInterval = defaults.ReconcileInterval + } + if c.TotalPipeline == "" { + c.TotalPipeline = defaults.TotalPipeline + } + if c.PlaceablePipeline == "" { + c.PlaceablePipeline = defaults.PlaceablePipeline + } + if c.SchedulerURL == "" { + c.SchedulerURL = defaults.SchedulerURL + } +} + +func DefaultConfig() Config { + return Config{ + ReconcileInterval: metav1.Duration{Duration: 5 * time.Minute}, + TotalPipeline: "kvm-report-capacity", + PlaceablePipeline: "kvm-general-purpose-load-balancing", + SchedulerURL: "http://localhost:8080/scheduler/nova/external", + } +} diff --git a/internal/scheduling/reservations/capacity/controller.go b/internal/scheduling/reservations/capacity/controller.go new file mode 100644 index 000000000..7a013a0a0 --- /dev/null +++ b/internal/scheduling/reservations/capacity/controller.go @@ -0,0 +1,340 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package capacity + +import ( + "context" + "fmt" + "hash/fnv" + "sort" + "strings" + "time" + + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + "github.com/google/uuid" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + schedulerapi "github.com/cobaltcore-dev/cortex/api/external/nova" + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/cobaltcore-dev/cortex/internal/knowledge/extractor/plugins/compute" + "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" +) + +var log = ctrl.Log.WithName("capacity-controller").WithValues("module", "capacity") + +// Controller reconciles FlavorGroupCapacity CRDs on a fixed interval. +// For each (flavor group × AZ) pair it probes all flavors in the group and updates the CRD status. +type Controller struct { + client client.Client + schedulerClient *reservations.SchedulerClient + config Config +} + +func NewController(c client.Client, config Config) *Controller { + return &Controller{ + client: c, + schedulerClient: reservations.NewSchedulerClient(config.SchedulerURL), + config: config, + } +} + +// Start runs the periodic reconcile loop. Implements manager.Runnable. +func (c *Controller) Start(ctx context.Context) error { + timer := time.NewTimer(0) // fire immediately on start + defer timer.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case <-timer.C: + if err := c.reconcileAll(ctx); err != nil { + log.Error(err, "reconcile cycle failed") + } + timer.Reset(c.config.ReconcileInterval.Duration) + } + } +} + +// reconcileAll iterates all flavor groups × AZs and upserts FlavorGroupCapacity CRDs. 
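+// Failures of individual (group × AZ) pairs are logged and skipped so that one
+// failing pair does not abort the rest of the cycle.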
+func (c *Controller) reconcileAll(ctx context.Context) error { + knowledge := &reservations.FlavorGroupKnowledgeClient{Client: c.client} + flavorGroups, err := knowledge.GetAllFlavorGroups(ctx, nil) + if err != nil { + return fmt.Errorf("failed to get flavor groups: %w", err) + } + + var hvList hv1.HypervisorList + if err := c.client.List(ctx, &hvList); err != nil { + return fmt.Errorf("failed to list hypervisors: %w", err) + } + + hvByName := make(map[string]hv1.Hypervisor, len(hvList.Items)) + for _, hv := range hvList.Items { + hvByName[hv.Name] = hv + } + + azs := availabilityZones(hvList.Items) + + for groupName, groupData := range flavorGroups { + for _, az := range azs { + if err := c.reconcileOne(ctx, groupName, groupData, az, hvByName, hvList.Items); err != nil { + log.Error(err, "failed to reconcile flavor group capacity", + "flavorGroup", groupName, "az", az) + // Continue with other pairs rather than aborting the whole cycle. + } + } + } + return nil +} + +// reconcileOne updates the FlavorGroupCapacity CRD for one (group × AZ) pair. +func (c *Controller) reconcileOne( + ctx context.Context, + groupName string, + groupData compute.FlavorGroupFeature, + az string, + hvByName map[string]hv1.Hypervisor, + allHVs []hv1.Hypervisor, +) error { + + smallestFlavorBytes := int64(groupData.SmallestFlavor.MemoryMB) * 1024 * 1024 //nolint:gosec + if smallestFlavorBytes <= 0 { + return fmt.Errorf("smallest flavor %q has invalid memory %d MB", + groupData.SmallestFlavor.Name, groupData.SmallestFlavor.MemoryMB) + } + + crdName := crdNameFor(groupName, az) + + var existing v1alpha1.FlavorGroupCapacity + err := c.client.Get(ctx, types.NamespacedName{Name: crdName}, &existing) + if apierrors.IsNotFound(err) { + existing = v1alpha1.FlavorGroupCapacity{ + ObjectMeta: metav1.ObjectMeta{Name: crdName}, + Spec: v1alpha1.FlavorGroupCapacitySpec{ + FlavorGroup: groupName, + AvailabilityZone: az, + }, + } + if createErr := c.client.Create(ctx, &existing); createErr != nil { + return fmt.Errorf("failed to create FlavorGroupCapacity %s: %w", crdName, createErr) + } + } else if err != nil { + return fmt.Errorf("failed to get FlavorGroupCapacity %s: %w", crdName, err) + } + + // Build a lookup of existing per-flavor data so we can preserve stale values on probe failure. + existingByName := make(map[string]v1alpha1.FlavorCapacityStatus, len(existing.Status.Flavors)) + for _, f := range existing.Status.Flavors { + existingByName[f.FlavorName] = f + } + + // Probe all flavors in the group. Sort for stable CRD output. 
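+	// On a probe failure the previously stored values for that flavor are kept
+	// and the Ready condition is set to False further below.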
+ flavors := make([]compute.FlavorInGroup, len(groupData.Flavors)) + copy(flavors, groupData.Flavors) + sort.Slice(flavors, func(i, j int) bool { return flavors[i].Name < flavors[j].Name }) + + allFresh := true + newFlavors := make([]v1alpha1.FlavorCapacityStatus, 0, len(flavors)) + for _, flavor := range flavors { + cur := existingByName[flavor.Name] + cur.FlavorName = flavor.Name + + totalVMSlots, totalHosts, totalErr := c.probeScheduler(ctx, flavor, az, c.config.TotalPipeline, hvByName) + placeableVMs, placeableHosts, placeableErr := c.probeScheduler(ctx, flavor, az, c.config.PlaceablePipeline, hvByName) + + if totalErr != nil { + allFresh = false + } else { + cur.TotalCapacityVMSlots = totalVMSlots + cur.TotalCapacityHosts = totalHosts + } + if placeableErr != nil { + allFresh = false + } else { + cur.PlaceableVMs = placeableVMs + cur.PlaceableHosts = placeableHosts + } + newFlavors = append(newFlavors, cur) + } + + // Count total instances and committed capacity (always available regardless of probe results). + totalInstances := countInstancesInAZ(allHVs, az) + committedCapacity, committedErr := c.sumCommittedCapacity(ctx, groupName, az, smallestFlavorBytes) + if committedErr != nil { + log.Error(committedErr, "failed to sum committed capacity", "flavorGroup", groupName, "az", az) + committedCapacity = 0 + } + + patch := client.MergeFrom(existing.DeepCopy()) + existing.Status.Flavors = newFlavors + existing.Status.TotalInstances = totalInstances + existing.Status.CommittedCapacity = committedCapacity + existing.Status.LastReconcileAt = metav1.Now() + + freshCondition := metav1.Condition{ + Type: v1alpha1.FlavorGroupCapacityConditionReady, + ObservedGeneration: existing.Generation, + } + if allFresh { + freshCondition.Status = metav1.ConditionTrue + freshCondition.Reason = "ReconcileSucceeded" + freshCondition.Message = "capacity data is up-to-date" + } else { + freshCondition.Status = metav1.ConditionFalse + freshCondition.Reason = "ReconcileFailed" + freshCondition.Message = "one or more flavor probes failed" + } + meta.SetStatusCondition(&existing.Status.Conditions, freshCondition) + + if patchErr := c.client.Status().Patch(ctx, &existing, patch); patchErr != nil { + return fmt.Errorf("failed to patch FlavorGroupCapacity %s status: %w", crdName, patchErr) + } + return nil +} + +// probeScheduler calls the scheduler with the given pipeline and returns VM slots + host count. +// Capacity is computed as sum of floor(hostMemory / flavorMemory) across returned hosts. +func (c *Controller) probeScheduler( + ctx context.Context, + flavor compute.FlavorInGroup, + az, pipeline string, + hvByName map[string]hv1.Hypervisor, +) (capacity, hosts int64, err error) { + + flavorBytes := int64(flavor.MemoryMB) * 1024 * 1024 //nolint:gosec + if flavorBytes <= 0 { + return 0, 0, fmt.Errorf("flavor %q has invalid memory %d MB", flavor.Name, flavor.MemoryMB) + } + + // Build EligibleHosts from all known hypervisors so that novaLimitHostsToRequest + // (which filters the response to hosts present in the request) does not zero out + // the result. The AZ filter in the pipeline handles narrowing to the correct AZ. 
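+	// VM-slot capacity is then computed from the memory of the hosts the
+	// scheduler actually returns, not from this request list.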
+ eligibleHosts := make([]schedulerapi.ExternalSchedulerHost, 0, len(hvByName)) + for name := range hvByName { + eligibleHosts = append(eligibleHosts, schedulerapi.ExternalSchedulerHost{ComputeHost: name}) + } + + resp, err := c.schedulerClient.ScheduleReservation(ctx, reservations.ScheduleReservationRequest{ + InstanceUUID: uuid.New().String(), + ProjectID: "cortex-capacity-probe", + FlavorName: flavor.Name, + MemoryMB: flavor.MemoryMB, + VCPUs: flavor.VCPUs, + FlavorExtraSpecs: flavor.ExtraSpecs, + AvailabilityZone: az, + Pipeline: pipeline, + EligibleHosts: eligibleHosts, + }) + if err != nil { + return 0, 0, fmt.Errorf("scheduler call failed (pipeline=%s): %w", pipeline, err) + } + + hosts = int64(len(resp.Hosts)) + for _, hostName := range resp.Hosts { + hv, ok := hvByName[hostName] + if !ok { + continue + } + effectiveCap := hv.Status.EffectiveCapacity + if effectiveCap == nil { + effectiveCap = hv.Status.Capacity + } + if effectiveCap == nil { + continue + } + memCap, ok := effectiveCap[hv1.ResourceMemory] + if !ok { + continue + } + if capBytes := memCap.Value(); capBytes > 0 { + capacity += capBytes / flavorBytes + } + } + return capacity, hosts, nil +} + +// sumCommittedCapacity sums AcceptedAmount (or Spec.Amount as fallback) across all +// CommittedResource CRDs for the given (flavorGroup, az) pair with an active state +// (guaranteed or confirmed) and resource type memory. Returns the total in slots. +func (c *Controller) sumCommittedCapacity(ctx context.Context, groupName, az string, smallestFlavorBytes int64) (int64, error) { + var list v1alpha1.CommittedResourceList + if err := c.client.List(ctx, &list); err != nil { + return 0, fmt.Errorf("failed to list CommittedResources: %w", err) + } + + var total int64 + for _, cr := range list.Items { + if cr.Spec.FlavorGroupName != groupName { + continue + } + if cr.Spec.AvailabilityZone != az { + continue + } + if cr.Spec.ResourceType != v1alpha1.CommittedResourceTypeMemory { + continue + } + if cr.Spec.State != v1alpha1.CommitmentStatusGuaranteed && cr.Spec.State != v1alpha1.CommitmentStatusConfirmed { + continue + } + amount := cr.Spec.Amount + if cr.Status.AcceptedAmount != nil { + amount = *cr.Status.AcceptedAmount + } + if bytes := amount.Value(); bytes > 0 { + total += bytes / smallestFlavorBytes + } + } + return total, nil +} + +// availabilityZones returns a sorted, deduplicated list of AZs from Hypervisor CRD labels. +func availabilityZones(hvs []hv1.Hypervisor) []string { + azSet := make(map[string]struct{}) + for _, hv := range hvs { + if az, ok := hv.Labels["topology.kubernetes.io/zone"]; ok && az != "" { + azSet[az] = struct{}{} + } + } + azs := make([]string, 0, len(azSet)) + for az := range azSet { + azs = append(azs, az) + } + sort.Strings(azs) + return azs +} + +// countInstancesInAZ counts total VM instances across all hypervisors in the given AZ. +func countInstancesInAZ(hvs []hv1.Hypervisor, az string) int64 { + var total int64 + for _, hv := range hvs { + if hv.Labels["topology.kubernetes.io/zone"] != az { + continue + } + total += int64(len(hv.Status.Instances)) + } + return total +} + +// crdNameFor produces a collision-safe DNS label for a (flavorGroup, az) pair. +// A 6-hex-char FNV-1a hash of the raw inputs is appended so that pairs differing only +// by characters that sanitise identically (e.g. "." vs "-") still get unique names. 
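+// For example, ("hana-v2", "qa-de-1a") becomes "hana-v2-qa-de-1a-" followed by
+// six hex characters, which keeps the name within the 63-character DNS limit.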
+func crdNameFor(flavorGroup, az string) string { + h := fnv.New32a() + _, _ = h.Write([]byte(flavorGroup + "\x00" + az)) + suffix := fmt.Sprintf("%06x", h.Sum32()&0xFFFFFF) + + prefix := strings.ToLower(flavorGroup + "-" + az) + prefix = strings.ReplaceAll(prefix, "_", "-") + prefix = strings.ReplaceAll(prefix, ".", "-") + if len(prefix) > 56 { // 56 + "-" + 6 = 63 chars (DNS label limit) + prefix = prefix[:56] + } + return prefix + "-" + suffix +} diff --git a/internal/scheduling/reservations/capacity/controller_test.go b/internal/scheduling/reservations/capacity/controller_test.go new file mode 100644 index 000000000..2cb15f3e7 --- /dev/null +++ b/internal/scheduling/reservations/capacity/controller_test.go @@ -0,0 +1,605 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package capacity + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "regexp" + "sort" + "testing" + + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + schedulerapi "github.com/cobaltcore-dev/cortex/api/external/nova" + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/cobaltcore-dev/cortex/internal/knowledge/extractor/plugins/compute" + "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" +) + +// newTestScheme returns a runtime.Scheme with all required types registered. +func newTestScheme(t *testing.T) *runtime.Scheme { + t.Helper() + s := runtime.NewScheme() + if err := v1alpha1.AddToScheme(s); err != nil { + t.Fatalf("failed to add v1alpha1 scheme: %v", err) + } + if err := hv1.AddToScheme(s); err != nil { + t.Fatalf("failed to add hypervisor scheme: %v", err) + } + return s +} + +// newFlavorGroupKnowledge creates a ready Knowledge CRD with a single flavor group. +func newFlavorGroupKnowledge(t *testing.T, groupName string, smallestMemoryMB uint64) *v1alpha1.Knowledge { + t.Helper() + smallestFlavor := compute.FlavorInGroup{ + Name: groupName + "-small", + MemoryMB: smallestMemoryMB, + VCPUs: 2, + ExtraSpecs: map[string]string{"hw:cpu_policy": "dedicated"}, + } + features := []compute.FlavorGroupFeature{ + { + Name: groupName, + SmallestFlavor: smallestFlavor, + Flavors: []compute.FlavorInGroup{smallestFlavor}, + }, + } + raw, err := v1alpha1.BoxFeatureList(features) + if err != nil { + t.Fatalf("failed to box features: %v", err) + } + return &v1alpha1.Knowledge{ + ObjectMeta: metav1.ObjectMeta{Name: "flavor-groups"}, + Spec: v1alpha1.KnowledgeSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Extractor: v1alpha1.KnowledgeExtractorSpec{Name: "flavor_groups"}, + }, + Status: v1alpha1.KnowledgeStatus{ + Raw: raw, + Conditions: []metav1.Condition{ + { + Type: v1alpha1.KnowledgeConditionReady, + Status: metav1.ConditionTrue, + Reason: "ExtractorSucceeded", + }, + }, + }, + } +} + +// newHypervisor creates a Hypervisor CRD with a topology AZ label and effective capacity. 
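+// A zero memoryBytes leaves EffectiveCapacity unset so the helper can also
+// build label-only hypervisors for the AZ and instance-count tests.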
+func newHypervisor(name, az string, memoryBytes int64, instanceIDs ...string) *hv1.Hypervisor { + hv := &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"topology.kubernetes.io/zone": az}, + }, + } + if memoryBytes > 0 { + qty := resource.NewQuantity(memoryBytes, resource.BinarySI) + hv.Status.EffectiveCapacity = map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceMemory: *qty, + } + } + for _, id := range instanceIDs { + hv.Status.Instances = append(hv.Status.Instances, hv1.Instance{ID: id}) + } + return hv +} + +// newMockSchedulerServer creates an httptest server that always returns the given host list. +func newMockSchedulerServer(t *testing.T, hosts []string) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + resp := schedulerapi.ExternalSchedulerResponse{Hosts: hosts} + if err := json.NewEncoder(w).Encode(resp); err != nil { + t.Errorf("mock scheduler: failed to encode response: %v", err) + } + })) +} + +// --- unit tests for pure helper functions --- + +var ( + dnsLabelRE = regexp.MustCompile(`^[a-z0-9][a-z0-9-]{0,61}[a-z0-9]$`) + hashSuffixRE = regexp.MustCompile(`^[0-9a-f]{6}$`) +) + +func TestCrdNameFor(t *testing.T) { + tests := []struct { + group, az string + wantPrefix string + }{ + {"hana-v2", "qa-de-1a", "hana-v2-qa-de-1a-"}, + {"My_Group", "eu.west.1", "my-group-eu-west-1-"}, + {"G", "AZ_1", "g-az-1-"}, + } + for _, tt := range tests { + got := crdNameFor(tt.group, tt.az) + // Must be a valid DNS label (lowercase, hyphens, ≤63 chars). + if len(got) > 63 { + t.Errorf("crdNameFor(%q, %q) = %q (len=%d > 63)", tt.group, tt.az, got, len(got)) + } + if !dnsLabelRE.MatchString(got) { + t.Errorf("crdNameFor(%q, %q) = %q is not a valid DNS label", tt.group, tt.az, got) + } + // Must start with the expected sanitised prefix followed by a 6-hex-char hash suffix. + if len(got) < len(tt.wantPrefix)+6 || got[:len(tt.wantPrefix)] != tt.wantPrefix { + t.Errorf("crdNameFor(%q, %q) = %q, want prefix %q + 6 hex chars", tt.group, tt.az, got, tt.wantPrefix) + } + hashPart := got[len(tt.wantPrefix):] + if !hashSuffixRE.MatchString(hashPart) { + t.Errorf("crdNameFor(%q, %q) hash suffix %q is not 6 hex chars", tt.group, tt.az, hashPart) + } + } + + // Inputs that differ only by "." vs "-" must produce different CRD names. 
+ dotName := crdNameFor("hana.v2", "qa-de-1a") + dashName := crdNameFor("hana-v2", "qa-de-1a") + if dotName == dashName { + t.Errorf("crdNameFor collision: hana.v2 and hana-v2 both produced %q", dotName) + } +} + +func TestAvailabilityZones(t *testing.T) { + hvs := []hv1.Hypervisor{ + *newHypervisor("h1", "az-a", 0), + *newHypervisor("h2", "az-b", 0), + *newHypervisor("h3", "az-a", 0), // duplicate + {ObjectMeta: metav1.ObjectMeta{Name: "h4"}}, // no label + } + got := availabilityZones(hvs) + want := []string{"az-a", "az-b"} + if len(got) != len(want) { + t.Fatalf("availabilityZones() = %v, want %v", got, want) + } + sort.Strings(got) + for i := range want { + if got[i] != want[i] { + t.Errorf("availabilityZones()[%d] = %q, want %q", i, got[i], want[i]) + } + } +} + +func TestCountInstancesInAZ(t *testing.T) { + hvs := []hv1.Hypervisor{ + *newHypervisor("h1", "az-a", 0, "vm1", "vm2"), + *newHypervisor("h2", "az-a", 0, "vm3"), + *newHypervisor("h3", "az-b", 0, "vm4"), + } + if got := countInstancesInAZ(hvs, "az-a"); got != 3 { + t.Errorf("countInstancesInAZ(az-a) = %d, want 3", got) + } + if got := countInstancesInAZ(hvs, "az-b"); got != 1 { + t.Errorf("countInstancesInAZ(az-b) = %d, want 1", got) + } + if got := countInstancesInAZ(hvs, "az-c"); got != 0 { + t.Errorf("countInstancesInAZ(az-c) = %d, want 0", got) + } +} + +// --- integration-style tests for reconcileOne --- + +func TestReconcileOne_CreatesCRD(t *testing.T) { + const ( + groupName = "hana-v2" + az = "qa-de-1a" + memMB = 4096 // 4 GiB + memBytes = int64(memMB) * 1024 * 1024 + ) + + scheme := newTestScheme(t) + hv := newHypervisor("host-1", az, memBytes, "vm1") + knowledge := newFlavorGroupKnowledge(t, groupName, memMB) + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(knowledge, hv). + WithStatusSubresource(&v1alpha1.FlavorGroupCapacity{}, &v1alpha1.Knowledge{}). 
+ Build() + + // Both probes return host-1 so capacity = floor(4GiB/4GiB) = 1 + schedulerServer := newMockSchedulerServer(t, []string{"host-1"}) + defer schedulerServer.Close() + + ctrl := NewController(fakeClient, Config{ + SchedulerURL: schedulerServer.URL, + TotalPipeline: "kvm-report-capacity", + PlaceablePipeline: "kvm-general-purpose", + }) + + smallFlavor := compute.FlavorInGroup{Name: groupName + "-small", MemoryMB: memMB, VCPUs: 2} + groupData := compute.FlavorGroupFeature{ + SmallestFlavor: smallFlavor, + Flavors: []compute.FlavorInGroup{smallFlavor}, + } + hvByName := map[string]hv1.Hypervisor{"host-1": *hv} + + if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, hvByName, []hv1.Hypervisor{*hv}); err != nil { + t.Fatalf("reconcileOne failed: %v", err) + } + + var crd v1alpha1.FlavorGroupCapacity + if err := fakeClient.Get(context.Background(), types.NamespacedName{Name: crdNameFor(groupName, az)}, &crd); err != nil { + t.Fatalf("failed to get CRD: %v", err) + } + if len(crd.Status.Flavors) != 1 { + t.Fatalf("len(Status.Flavors) = %d, want 1", len(crd.Status.Flavors)) + } + f := crd.Status.Flavors[0] + if f.FlavorName != groupName+"-small" { + t.Errorf("FlavorName = %q, want %q", f.FlavorName, groupName+"-small") + } + if f.TotalCapacityVMSlots != 1 { + t.Errorf("TotalCapacityVMSlots = %d, want 1", f.TotalCapacityVMSlots) + } + if f.TotalCapacityHosts != 1 { + t.Errorf("TotalCapacityHosts = %d, want 1", f.TotalCapacityHosts) + } + if f.PlaceableVMs != 1 { + t.Errorf("PlaceableVMs = %d, want 1", f.PlaceableVMs) + } + if f.PlaceableHosts != 1 { + t.Errorf("PlaceableHosts = %d, want 1", f.PlaceableHosts) + } + if crd.Status.TotalInstances != 1 { + t.Errorf("TotalInstances = %d, want 1", crd.Status.TotalInstances) + } +} + +func TestReconcileOne_SetsReadyConditionFalseOnSchedulerError(t *testing.T) { + const ( + groupName = "hana-v2" + az = "qa-de-1a" + memMB = 2048 + ) + + scheme := newTestScheme(t) + knowledge := newFlavorGroupKnowledge(t, groupName, memMB) + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(knowledge). + WithStatusSubresource(&v1alpha1.FlavorGroupCapacity{}, &v1alpha1.Knowledge{}). 
+ Build() + + // Scheduler returns 500 to simulate error + failServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer failServer.Close() + + ctrl := NewController(fakeClient, Config{ + SchedulerURL: failServer.URL, + TotalPipeline: "kvm-report-capacity", + PlaceablePipeline: "kvm-general-purpose", + }) + + smallFlavor := compute.FlavorInGroup{Name: groupName + "-small", MemoryMB: memMB, VCPUs: 2} + groupData := compute.FlavorGroupFeature{ + SmallestFlavor: smallFlavor, + Flavors: []compute.FlavorInGroup{smallFlavor}, + } + + // reconcileOne returns no error itself (it continues on probe failure), but sets Ready=False + if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, map[string]hv1.Hypervisor{}, []hv1.Hypervisor{}); err != nil { + t.Fatalf("reconcileOne failed: %v", err) + } + + var crd v1alpha1.FlavorGroupCapacity + if err := fakeClient.Get(context.Background(), types.NamespacedName{Name: crdNameFor(groupName, az)}, &crd); err != nil { + t.Fatalf("failed to get CRD: %v", err) + } + + var freshStatus metav1.ConditionStatus + for _, c := range crd.Status.Conditions { + if c.Type == v1alpha1.FlavorGroupCapacityConditionReady { + freshStatus = c.Status + } + } + if freshStatus != metav1.ConditionFalse { + t.Errorf("Ready condition = %q, want %q", freshStatus, metav1.ConditionFalse) + } +} + +func TestReconcileOne_IdempotentUpdate(t *testing.T) { + const ( + groupName = "hana-v2" + az = "qa-de-1a" + memMB = 2048 + memBytes = int64(memMB) * 1024 * 1024 + ) + + scheme := newTestScheme(t) + hv := newHypervisor("host-1", az, memBytes) + knowledge := newFlavorGroupKnowledge(t, groupName, memMB) + crdName := crdNameFor(groupName, az) + + // Pre-create the CRD to test the update path (not create path) + existing := &v1alpha1.FlavorGroupCapacity{ + ObjectMeta: metav1.ObjectMeta{Name: crdName}, + Spec: v1alpha1.FlavorGroupCapacitySpec{ + FlavorGroup: groupName, + AvailabilityZone: az, + }, + } + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(knowledge, hv, existing). + WithStatusSubresource(&v1alpha1.FlavorGroupCapacity{}, &v1alpha1.Knowledge{}). 
+ Build() + + schedulerServer := newMockSchedulerServer(t, []string{"host-1"}) + defer schedulerServer.Close() + + ctrl := NewController(fakeClient, Config{ + SchedulerURL: schedulerServer.URL, + TotalPipeline: "kvm-report-capacity", + PlaceablePipeline: "kvm-general-purpose", + }) + + smallFlavor := compute.FlavorInGroup{Name: groupName + "-small", MemoryMB: memMB, VCPUs: 2} + groupData := compute.FlavorGroupFeature{ + SmallestFlavor: smallFlavor, + Flavors: []compute.FlavorInGroup{smallFlavor}, + } + hvByName := map[string]hv1.Hypervisor{"host-1": *hv} + + // First call + if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, hvByName, []hv1.Hypervisor{*hv}); err != nil { + t.Fatalf("first reconcileOne failed: %v", err) + } + // Second call — should not error on the already-existing CRD + if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, hvByName, []hv1.Hypervisor{*hv}); err != nil { + t.Fatalf("second reconcileOne failed: %v", err) + } + + var crd v1alpha1.FlavorGroupCapacity + if err := fakeClient.Get(context.Background(), types.NamespacedName{Name: crdName}, &crd); err != nil { + t.Fatalf("failed to get CRD: %v", err) + } + if len(crd.Status.Flavors) != 1 { + t.Fatalf("len(Status.Flavors) = %d, want 1", len(crd.Status.Flavors)) + } + if crd.Status.Flavors[0].TotalCapacityVMSlots != 1 { + t.Errorf("TotalCapacityVMSlots = %d, want 1", crd.Status.Flavors[0].TotalCapacityVMSlots) + } +} + +func TestReconcileAll_SkipsGroupsWithNoAZs(t *testing.T) { + scheme := newTestScheme(t) + knowledge := newFlavorGroupKnowledge(t, "hana-v2", 2048) + + // No hypervisors → no AZs → reconcileAll returns without error + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(knowledge). + WithStatusSubresource(&v1alpha1.FlavorGroupCapacity{}, &v1alpha1.Knowledge{}). 
+ Build() + + ctrl := NewController(fakeClient, Config{ + SchedulerURL: "http://localhost:9999", // unreachable; not called + TotalPipeline: "kvm-report-capacity", + PlaceablePipeline: "kvm-general-purpose", + }) + + if err := ctrl.reconcileAll(context.Background()); err != nil { + t.Errorf("reconcileAll with no hypervisors returned error: %v", err) + } + + var list v1alpha1.FlavorGroupCapacityList + if err := fakeClient.List(context.Background(), &list); err != nil { + t.Fatalf("failed to list CRDs: %v", err) + } + if len(list.Items) != 0 { + t.Errorf("expected 0 CRDs, got %d", len(list.Items)) + } +} + +func TestProbeScheduler_CapacityCalculation(t *testing.T) { + const memMB = 4096 + const memBytes = int64(memMB) * 1024 * 1024 + + scheme := newTestScheme(t) + hv1Obj := newHypervisor("host-1", "az-a", memBytes) + hv2Obj := newHypervisor("host-2", "az-a", memBytes*2) // 2x memory + + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + + // Scheduler returns both hosts + srv := newMockSchedulerServer(t, []string{"host-1", "host-2"}) + defer srv.Close() + + c := NewController(fakeClient, Config{SchedulerURL: srv.URL}) + hvByName := map[string]hv1.Hypervisor{ + "host-1": *hv1Obj, + "host-2": *hv2Obj, + } + flavor := compute.FlavorInGroup{Name: "test-flavor", MemoryMB: memMB} + + capacity, hosts, err := c.probeScheduler(context.Background(), flavor, "az-a", "test-pipeline", hvByName) + if err != nil { + t.Fatalf("probeScheduler failed: %v", err) + } + if hosts != 2 { + t.Errorf("hosts = %d, want 2", hosts) + } + // host-1 = 1 slot (4GiB/4GiB), host-2 = 2 slots (8GiB/4GiB) + if capacity != 3 { + t.Errorf("capacity = %d, want 3", capacity) + } +} + +func TestReconcileAll_MultipleGroupsAndAZs(t *testing.T) { + scheme := newTestScheme(t) + + const memMB = 2048 + const memBytes = int64(memMB) * 1024 * 1024 + + // Two AZs, two hypervisors + hv1Obj := newHypervisor("h1", "az-a", memBytes) + hv2Obj := newHypervisor("h2", "az-b", memBytes) + knowledge := newFlavorGroupKnowledge(t, "2152", memMB) + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(knowledge, hv1Obj, hv2Obj). + WithStatusSubresource(&v1alpha1.FlavorGroupCapacity{}, &v1alpha1.Knowledge{}). 
+ Build() + + srv := newMockSchedulerServer(t, []string{}) + defer srv.Close() + + c := NewController(fakeClient, Config{ + SchedulerURL: srv.URL, + TotalPipeline: "kvm-report-capacity", + PlaceablePipeline: "kvm-general-purpose", + }) + + if err := c.reconcileAll(context.Background()); err != nil { + t.Fatalf("reconcileAll failed: %v", err) + } + + // Expect one CRD per AZ for the single group + var list v1alpha1.FlavorGroupCapacityList + if err := fakeClient.List(context.Background(), &list); err != nil { + t.Fatalf("failed to list CRDs: %v", err) + } + if len(list.Items) != 2 { + names := make([]string, len(list.Items)) + for i, item := range list.Items { + names[i] = item.Name + } + t.Errorf("expected 2 CRDs (one per AZ), got %d: %v", len(list.Items), names) + } +} + +func TestReconcileAll_FlavorGroupsKnowledgeNotReady(t *testing.T) { + scheme := newTestScheme(t) + + // Knowledge CRD exists but is not Ready + knowledge := &v1alpha1.Knowledge{ + ObjectMeta: metav1.ObjectMeta{Name: "flavor-groups"}, + Spec: v1alpha1.KnowledgeSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Extractor: v1alpha1.KnowledgeExtractorSpec{Name: "flavor_groups"}, + }, + Status: v1alpha1.KnowledgeStatus{ + Conditions: []metav1.Condition{ + { + Type: v1alpha1.KnowledgeConditionReady, + Status: metav1.ConditionFalse, + Reason: "NotReady", + }, + }, + }, + } + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(knowledge). + WithStatusSubresource(&v1alpha1.Knowledge{}). + Build() + + c := NewController(fakeClient, Config{ + SchedulerURL: "http://localhost:9999", + TotalPipeline: "kvm-report-capacity", + PlaceablePipeline: "kvm-general-purpose", + }) + + // Should return an error when knowledge is not ready + if err := c.reconcileAll(context.Background()); err == nil { + t.Error("reconcileAll should fail when flavor groups knowledge is not ready") + } +} + +func TestReconcileOne_ZeroMemoryFlavorReturnsError(t *testing.T) { + scheme := newTestScheme(t) + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + c := NewController(fakeClient, Config{}) + + groupData := compute.FlavorGroupFeature{ + SmallestFlavor: compute.FlavorInGroup{Name: "bad-flavor", MemoryMB: 0}, + } + err := c.reconcileOne(context.Background(), "hana-v2", groupData, "az-a", nil, nil) + if err == nil { + t.Error("expected error for zero-memory flavor") + } +} + +// Verify that the module-level log variable from reservations package doesn't +// collide with the one in this package. 
+func TestPackageLogVar(t *testing.T) { + _ = reservations.NewSchedulerClient("http://localhost") +} + +func TestSumCommittedCapacity(t *testing.T) { + const ( + groupName = "hana-v2" + az = "qa-de-1a" + memMB = 4096 + memBytes = int64(memMB) * 1024 * 1024 + ) + + newCR := func(name, group, zone string, state v1alpha1.CommitmentStatus, resType v1alpha1.CommittedResourceType, amount string, acceptedAmount string) *v1alpha1.CommittedResource { + qty := resource.MustParse(amount) + cr := &v1alpha1.CommittedResource{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Spec: v1alpha1.CommittedResourceSpec{ + FlavorGroupName: group, + AvailabilityZone: zone, + State: state, + ResourceType: resType, + Amount: qty, + }, + } + if acceptedAmount != "" { + accepted := resource.MustParse(acceptedAmount) + cr.Status.AcceptedAmount = &accepted + } + return cr + } + + scheme := newTestScheme(t) + objects := []client.Object{ + // Should count: confirmed, memory, right group+AZ, AcceptedAmount set + newCR("cr1", groupName, az, v1alpha1.CommitmentStatusConfirmed, v1alpha1.CommittedResourceTypeMemory, "8Gi", "8Gi"), + // Should count: guaranteed, memory, right group+AZ, no AcceptedAmount → falls back to Spec.Amount + newCR("cr2", groupName, az, v1alpha1.CommitmentStatusGuaranteed, v1alpha1.CommittedResourceTypeMemory, "4Gi", ""), + // Should NOT count: wrong state + newCR("cr3", groupName, az, v1alpha1.CommitmentStatusPlanned, v1alpha1.CommittedResourceTypeMemory, "4Gi", ""), + // Should NOT count: wrong resource type + newCR("cr4", groupName, az, v1alpha1.CommitmentStatusConfirmed, v1alpha1.CommittedResourceTypeCores, "4Gi", ""), + // Should NOT count: wrong AZ + newCR("cr5", groupName, "other-az", v1alpha1.CommitmentStatusConfirmed, v1alpha1.CommittedResourceTypeMemory, "4Gi", ""), + // Should NOT count: wrong flavor group + newCR("cr6", "other-group", az, v1alpha1.CommitmentStatusConfirmed, v1alpha1.CommittedResourceTypeMemory, "4Gi", ""), + } + + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(objects...). + Build() + + c := NewController(fakeClient, Config{}) + // smallestFlavorBytes = 4GiB → cr1 = 8GiB/4GiB = 2 slots, cr2 = 4GiB/4GiB = 1 slot → total = 3 + got, err := c.sumCommittedCapacity(context.Background(), groupName, az, memBytes) + if err != nil { + t.Fatalf("sumCommittedCapacity failed: %v", err) + } + if got != 3 { + t.Errorf("sumCommittedCapacity = %d, want 3", got) + } +} diff --git a/internal/scheduling/reservations/capacity/metrics.go b/internal/scheduling/reservations/capacity/metrics.go new file mode 100644 index 000000000..bd13ca7ca --- /dev/null +++ b/internal/scheduling/reservations/capacity/metrics.go @@ -0,0 +1,118 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package capacity + +import ( + "context" + "time" + + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/prometheus/client_golang/prometheus" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var ( + capacityLabels = []string{"flavor_group", "az"} + capacityFlavorLabels = []string{"flavor_group", "az", "flavor_name"} +) + +// Monitor provides Prometheus metrics for FlavorGroupCapacity CRDs. +// It implements prometheus.Collector and reads CRD status on each Collect call. 
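+// Gauges are rebuilt from the listed CRDs on every scrape, so the controller
+// does not need to push metric updates during reconciliation.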
+type Monitor struct { + client client.Client + totalCapacityVMSlots *prometheus.GaugeVec + placeableVMs *prometheus.GaugeVec + totalCapacityHosts *prometheus.GaugeVec + placeableHosts *prometheus.GaugeVec + totalInstances *prometheus.GaugeVec + committedCapacity *prometheus.GaugeVec +} + +// NewMonitor creates a new Monitor that reads FlavorGroupCapacity CRDs. +func NewMonitor(c client.Client) Monitor { + return Monitor{ + client: c, + totalCapacityVMSlots: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_committed_resource_capacity_total", + Help: "Total schedulable slots in an empty-datacenter scenario per flavor.", + }, capacityFlavorLabels), + placeableVMs: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_committed_resource_capacity_placeable", + Help: "Schedulable slots remaining given current VM allocations per flavor.", + }, capacityFlavorLabels), + totalCapacityHosts: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_committed_resource_capacity_hosts_total", + Help: "Number of hosts eligible for this flavor in the empty-state probe.", + }, capacityFlavorLabels), + placeableHosts: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_committed_resource_capacity_hosts_placeable", + Help: "Number of hosts still able to accept a new VM of this flavor.", + }, capacityFlavorLabels), + totalInstances: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_committed_resource_capacity_instances", + Help: "Total VM instances running on hypervisors in this AZ (not filtered by flavor group).", + }, capacityLabels), + committedCapacity: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_committed_resource_capacity_committed", + Help: "Sum of AcceptedAmount across Ready CommittedResource CRDs for this flavor group and AZ.", + }, capacityLabels), + } +} + +// Describe implements prometheus.Collector. +func (m *Monitor) Describe(ch chan<- *prometheus.Desc) { + m.totalCapacityVMSlots.Describe(ch) + m.placeableVMs.Describe(ch) + m.totalCapacityHosts.Describe(ch) + m.placeableHosts.Describe(ch) + m.totalInstances.Describe(ch) + m.committedCapacity.Describe(ch) +} + +// Collect implements prometheus.Collector — lists all FlavorGroupCapacity CRDs and exports gauges. +func (m *Monitor) Collect(ch chan<- prometheus.Metric) { + var list v1alpha1.FlavorGroupCapacityList + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := m.client.List(ctx, &list); err != nil { + log.Error(err, "failed to list FlavorGroupCapacity CRDs for metrics") + return + } + + // Reset all gauges so deleted CRDs don't linger. 
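+	// Without the reset, series for (flavor group × AZ) pairs whose CRDs were
+	// deleted would keep reporting their last observed values.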
+ m.totalCapacityVMSlots.Reset() + m.placeableVMs.Reset() + m.totalCapacityHosts.Reset() + m.placeableHosts.Reset() + m.totalInstances.Reset() + m.committedCapacity.Reset() + + for _, crd := range list.Items { + groupAZLabels := prometheus.Labels{ + "flavor_group": crd.Spec.FlavorGroup, + "az": crd.Spec.AvailabilityZone, + } + m.totalInstances.With(groupAZLabels).Set(float64(crd.Status.TotalInstances)) + m.committedCapacity.With(groupAZLabels).Set(float64(crd.Status.CommittedCapacity)) + + for _, f := range crd.Status.Flavors { + flavorLabels := prometheus.Labels{ + "flavor_group": crd.Spec.FlavorGroup, + "az": crd.Spec.AvailabilityZone, + "flavor_name": f.FlavorName, + } + m.totalCapacityVMSlots.With(flavorLabels).Set(float64(f.TotalCapacityVMSlots)) + m.placeableVMs.With(flavorLabels).Set(float64(f.PlaceableVMs)) + m.totalCapacityHosts.With(flavorLabels).Set(float64(f.TotalCapacityHosts)) + m.placeableHosts.With(flavorLabels).Set(float64(f.PlaceableHosts)) + } + } + + m.totalCapacityVMSlots.Collect(ch) + m.placeableVMs.Collect(ch) + m.totalCapacityHosts.Collect(ch) + m.placeableHosts.Collect(ch) + m.totalInstances.Collect(ch) + m.committedCapacity.Collect(ch) +}