diff --git a/api/v1alpha1/committed_resource_types.go b/api/v1alpha1/committed_resource_types.go index 31365887f..60b4a833d 100644 --- a/api/v1alpha1/committed_resource_types.go +++ b/api/v1alpha1/committed_resource_types.go @@ -111,6 +111,18 @@ type CommittedResourceStatus struct { // +kubebuilder:validation:Optional AcceptedAmount *resource.Quantity `json:"acceptedAmount,omitempty"` + // AcceptedSpec is a snapshot of Spec from the last successful reconcile. + // Used by rollbackToAccepted to restore the exact previously-accepted placement (AZ, amount, + // project, domain, flavor group) even when the current spec has already been mutated to a new value. + // +kubebuilder:validation:Optional + AcceptedSpec *CommittedResourceSpec `json:"acceptedSpec,omitempty"` + + // ConsecutiveFailures counts reconcile cycles that ended in a placement failure (applyErr or anyFailed). + // Reset to 0 on successful acceptance. Used to compute exponential backoff for the retry interval + // and to suppress Reservation watch re-enqueues while backing off. + // +kubebuilder:validation:Optional + ConsecutiveFailures int32 `json:"consecutiveFailures,omitempty"` + // AcceptedAt is when the controller last successfully reconciled the spec into Reservation slots. // +kubebuilder:validation:Optional AcceptedAt *metav1.Time `json:"acceptedAt,omitempty"` @@ -124,20 +136,27 @@ type CommittedResourceStatus struct { // +kubebuilder:validation:Optional LastReconcileAt *metav1.Time `json:"lastReconcileAt,omitempty"` - // AssignedVMs holds the UUIDs of VMs deterministically assigned to this committed resource. - // Populated by the usage reconciler; used to compute UsedAmount and drive the quota controller. + // AssignedInstances holds the UUIDs of VM instances deterministically assigned to this committed resource. + // Populated by the usage reconciler; used to compute UsedResources and drive the quota controller. // +kubebuilder:validation:Optional - AssignedVMs []string `json:"assignedVMs,omitempty"` + AssignedInstances []string `json:"assignedInstances,omitempty"` - // UsedAmount is the sum of assigned VM resources expressed in the same units as Spec.Amount. - // Populated by the usage reconciler. + // UsedResources is the total resource consumption of assigned VM instances, keyed by resource type + // (e.g. "memory" in MiB binary SI, "cpu" as core count). Populated by the usage reconciler. // +kubebuilder:validation:Optional - UsedAmount *resource.Quantity `json:"usedAmount,omitempty"` + UsedResources map[string]resource.Quantity `json:"usedResources,omitempty"` - // LastUsageReconcileAt is when the usage reconciler last updated AssignedVMs and UsedAmount. + // LastUsageReconcileAt is when the usage reconciler last updated AssignedInstances and UsedResources. // +kubebuilder:validation:Optional LastUsageReconcileAt *metav1.Time `json:"lastUsageReconcileAt,omitempty"` + // UsageObservedGeneration is the CR generation that the usage reconciler last processed. + // Follows the Kubernetes observedGeneration pattern: when this differs from + // metadata.generation the cooldown is bypassed so spec changes (e.g. shrink) are reflected + // immediately rather than waiting for the next cooldown interval. + // +kubebuilder:validation:Optional + UsageObservedGeneration *int64 `json:"usageObservedGeneration,omitempty"` + // Conditions holds the current status conditions. 
// +kubebuilder:validation:Optional Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"` @@ -164,7 +183,8 @@ const ( // +kubebuilder:printcolumn:name="AZ",type="string",JSONPath=".spec.availabilityZone" // +kubebuilder:printcolumn:name="Amount",type="string",JSONPath=".spec.amount" // +kubebuilder:printcolumn:name="AcceptedAmount",type="string",JSONPath=".status.acceptedAmount" -// +kubebuilder:printcolumn:name="UsedAmount",type="string",JSONPath=".status.usedAmount" +// +kubebuilder:printcolumn:name="UsedMemory",type="string",JSONPath=".status.usedResources.memory",priority=1 +// +kubebuilder:printcolumn:name="UsedCPU",type="string",JSONPath=".status.usedResources.cpu",priority=1 // +kubebuilder:printcolumn:name="State",type="string",JSONPath=".spec.state" // +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type=='Ready')].status" // +kubebuilder:printcolumn:name="StartTime",type="date",JSONPath=".spec.startTime",priority=1 diff --git a/api/v1alpha1/datasource_types.go b/api/v1alpha1/datasource_types.go index fff321c48..f9963a35c 100644 --- a/api/v1alpha1/datasource_types.go +++ b/api/v1alpha1/datasource_types.go @@ -52,6 +52,7 @@ const ( NovaDatasourceTypeFlavors NovaDatasourceType = "flavors" NovaDatasourceTypeMigrations NovaDatasourceType = "migrations" NovaDatasourceTypeAggregates NovaDatasourceType = "aggregates" + NovaDatasourceTypeImages NovaDatasourceType = "images" ) type NovaDatasource struct { diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index d9daa7aab..7d44d1df9 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -192,6 +192,11 @@ func (in *CommittedResourceStatus) DeepCopyInto(out *CommittedResourceStatus) { x := (*in).DeepCopy() *out = &x } + if in.AcceptedSpec != nil { + in, out := &in.AcceptedSpec, &out.AcceptedSpec + *out = new(CommittedResourceSpec) + (*in).DeepCopyInto(*out) + } if in.AcceptedAt != nil { in, out := &in.AcceptedAt, &out.AcceptedAt *out = (*in).DeepCopy() @@ -204,20 +209,27 @@ func (in *CommittedResourceStatus) DeepCopyInto(out *CommittedResourceStatus) { in, out := &in.LastReconcileAt, &out.LastReconcileAt *out = (*in).DeepCopy() } - if in.AssignedVMs != nil { - in, out := &in.AssignedVMs, &out.AssignedVMs + if in.AssignedInstances != nil { + in, out := &in.AssignedInstances, &out.AssignedInstances *out = make([]string, len(*in)) copy(*out, *in) } - if in.UsedAmount != nil { - in, out := &in.UsedAmount, &out.UsedAmount - x := (*in).DeepCopy() - *out = &x + if in.UsedResources != nil { + in, out := &in.UsedResources, &out.UsedResources + *out = make(map[string]resource.Quantity, len(*in)) + for key, val := range *in { + (*out)[key] = val.DeepCopy() + } } if in.LastUsageReconcileAt != nil { in, out := &in.LastUsageReconcileAt, &out.LastUsageReconcileAt *out = (*in).DeepCopy() } + if in.UsageObservedGeneration != nil { + in, out := &in.UsageObservedGeneration, &out.UsageObservedGeneration + *out = new(int64) + **out = **in + } if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions *out = make([]v1.Condition, len(*in)) diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 4a09323a4..4168d9409 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -548,14 +548,34 @@ func main() { os.Exit(1) } + crControllerConf := commitmentsConfig.CommittedResourceController + crControllerConf.ApplyDefaults() if err := (&commitments.CommittedResourceController{ Client: 
multiclusterClient, Scheme: mgr.GetScheme(), - Conf: commitmentsConfig.CommittedResourceController, + Conf: crControllerConf, }).SetupWithManager(mgr, multiclusterClient); err != nil { setupLog.Error(err, "unable to create controller", "controller", "CommittedResource") os.Exit(1) } + + usageReconcilerMonitor := commitments.NewUsageReconcilerMonitor() + metrics.Registry.MustRegister(&usageReconcilerMonitor) + if commitmentsUsageDB == nil { + setupLog.Error(nil, "UsageReconciler requires a datasource but commitments.datasourceName is not configured — skipping") + } else { + usageReconcilerConf := commitmentsConfig.UsageReconciler + usageReconcilerConf.ApplyDefaults() + if err := (&commitments.UsageReconciler{ + Client: multiclusterClient, + Conf: usageReconcilerConf, + UsageDB: commitmentsUsageDB, + Monitor: usageReconcilerMonitor, + }).SetupWithManager(mgr, multiclusterClient); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "CommittedResourceUsage") + os.Exit(1) + } + } } if slices.Contains(mainConfig.EnabledControllers, "datasource-controllers") { setupLog.Info("enabling controller", "controller", "datasource-controllers") diff --git a/helm/bundles/cortex-nova/templates/datasources.yaml b/helm/bundles/cortex-nova/templates/datasources.yaml index f9160602f..582effac2 100644 --- a/helm/bundles/cortex-nova/templates/datasources.yaml +++ b/helm/bundles/cortex-nova/templates/datasources.yaml @@ -337,6 +337,30 @@ spec: --- apiVersion: cortex.cloud/v1alpha1 kind: Datasource +metadata: + name: nova-images +spec: + schedulingDomain: nova + databaseSecretRef: + name: cortex-nova-postgres + namespace: {{ .Release.Namespace }} + {{- if .Values.openstack.sso.enabled }} + ssoSecretRef: + name: cortex-nova-openstack-sso + namespace: {{ .Release.Namespace }} + {{- end }} + type: openstack + openstack: + syncInterval: 3600s + secretRef: + name: cortex-nova-openstack-keystone + namespace: {{ .Release.Namespace }} + type: nova + nova: + type: images +--- +apiVersion: cortex.cloud/v1alpha1 +kind: Datasource metadata: name: limes-project-commitments spec: diff --git a/helm/bundles/cortex-nova/values.yaml b/helm/bundles/cortex-nova/values.yaml index 91ce26ea9..32170eb28 100644 --- a/helm/bundles/cortex-nova/values.yaml +++ b/helm/bundles/cortex-nova/values.yaml @@ -158,8 +158,10 @@ cortex-scheduling-controllers: # URL of the nova external scheduler API for placement decisions schedulerURL: "http://localhost:8080/scheduler/nova/external" committedResourceController: - # Back-off interval while CommittedResource placement is pending or failed + # Back-off interval while CommittedResource placement is pending or failed (base for exponential backoff) requeueIntervalRetry: "1m" + # Maximum back-off interval cap for the exponential retry delay + maxRequeueInterval: "30m" committedResourceAPI: # Timeout for watching CommittedResource CRDs before rolling back watchTimeout: "10s" @@ -187,6 +189,11 @@ cortex-scheduling-controllers: handlesCommitments: false hasCapacity: true hasQuota: false + committedResourceUsageReconciler: + # Minimum time between usage reconcile runs for the same CommittedResource. + # Also acts as the periodic fallback interval: a successful reconcile schedules + # the next run after this duration, so this is also the maximum status staleness. + cooldownInterval: "5m" # OvercommitMappings is a list of mappings that map hypervisor traits to # overcommit ratios. 
Note that this list is applied in order, so if there # are multiple mappings applying to the same hypervisors, the last mapping diff --git a/helm/library/cortex/files/crds/cortex.cloud_committedresources.yaml b/helm/library/cortex/files/crds/cortex.cloud_committedresources.yaml index 092827edd..22d21c882 100644 --- a/helm/library/cortex/files/crds/cortex.cloud_committedresources.yaml +++ b/helm/library/cortex/files/crds/cortex.cloud_committedresources.yaml @@ -33,8 +33,13 @@ spec: - jsonPath: .status.acceptedAmount name: AcceptedAmount type: string - - jsonPath: .status.usedAmount - name: UsedAmount + - jsonPath: .status.usedResources.memory + name: UsedMemory + priority: 1 + type: string + - jsonPath: .status.usedResources.cpu + name: UsedCPU + priority: 1 type: string - jsonPath: .spec.state name: State @@ -180,10 +185,105 @@ spec: the spec into Reservation slots. format: date-time type: string - assignedVMs: + acceptedSpec: description: |- - AssignedVMs holds the UUIDs of VMs deterministically assigned to this committed resource. - Populated by the usage reconciler; used to compute UsedAmount and drive the quota controller. + AcceptedSpec is a snapshot of Spec from the last successful reconcile. + Used by rollbackToAccepted to restore the exact previously-accepted placement (AZ, amount, + project, domain, flavor group) even when the current spec has already been mutated to a new value. + properties: + allowRejection: + description: |- + AllowRejection controls what the CommittedResource controller does when placement fails + for a guaranteed or confirmed commitment. + true — controller may reject: on failure, child Reservations are rolled back and the CR + is marked Rejected. Use this when the caller is making a first-time placement + decision and a "no" answer is acceptable (e.g. the change-commitments API). + false — controller must retry: on failure, existing child Reservations are kept and the + CR is set to Reserving so the controller retries later. Use this when the caller + is restoring already-committed state that Cortex must honour (e.g. the syncer). + Only meaningful for state=guaranteed or state=confirmed; ignored for all other states. + type: boolean + amount: + anyOf: + - type: integer + - type: string + description: |- + Amount is the total committed quantity. + memory: MiB expressed in K8s binary SI notation (e.g. "1280Gi", "640Mi"). + cores: integer core count (e.g. "40"). + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + availabilityZone: + description: AvailabilityZone specifies the availability zone + for this commitment. + type: string + commitmentUUID: + description: UUID of the commitment this resource corresponds + to. + type: string + confirmedAt: + description: ConfirmedAt is when the commitment was confirmed. + format: date-time + type: string + domainID: + description: DomainID of the OpenStack domain this commitment + belongs to. + type: string + endTime: + description: EndTime is when Reservation slots expire. Nil for + unbounded commitments with no expiry. + format: date-time + type: string + flavorGroupName: + description: FlavorGroupName identifies the flavor group this + commitment targets, e.g. "kvm_v2_hana_s". + type: string + projectID: + description: ProjectID of the OpenStack project this commitment + belongs to. 
+ type: string + resourceType: + description: 'ResourceType identifies the kind of resource committed: + memory drives Reservation slots; cores uses an arithmetic check + only.' + enum: + - memory + - cores + type: string + schedulingDomain: + description: SchedulingDomain specifies the scheduling domain + for this committed resource (e.g., "nova", "ironcore"). + type: string + startTime: + description: |- + StartTime is the activation time for Reservation slots. + Nil for guaranteed commitments (slots are active from creation); set to ConfirmedAt for confirmed ones. + format: date-time + type: string + state: + description: State is the lifecycle state of the commitment. + enum: + - planned + - pending + - guaranteed + - confirmed + - superseded + - expired + type: string + required: + - amount + - availabilityZone + - commitmentUUID + - domainID + - flavorGroupName + - projectID + - resourceType + - state + type: object + assignedInstances: + description: |- + AssignedInstances holds the UUIDs of VM instances deterministically assigned to this committed resource. + Populated by the usage reconciler; used to compute UsedResources and drive the quota controller. items: type: string type: array @@ -244,6 +344,13 @@ spec: - type type: object type: array + consecutiveFailures: + description: |- + ConsecutiveFailures counts reconcile cycles that ended in a placement failure (applyErr or anyFailed). + Reset to 0 on successful acceptance. Used to compute exponential backoff for the retry interval + and to suppress Reservation watch re-enqueues while backing off. + format: int32 + type: integer lastChanged: description: |- LastChanged is when the spec was last written by the syncer. @@ -257,18 +364,28 @@ spec: type: string lastUsageReconcileAt: description: LastUsageReconcileAt is when the usage reconciler last - updated AssignedVMs and UsedAmount. + updated AssignedInstances and UsedResources. format: date-time type: string - usedAmount: - anyOf: - - type: integer - - type: string + usageObservedGeneration: description: |- - UsedAmount is the sum of assigned VM resources expressed in the same units as Spec.Amount. - Populated by the usage reconciler. - pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ - x-kubernetes-int-or-string: true + UsageObservedGeneration is the CR generation that the usage reconciler last processed. + Follows the Kubernetes observedGeneration pattern: when this differs from + metadata.generation the cooldown is bypassed so spec changes (e.g. shrink) are reflected + immediately rather than waiting for the next cooldown interval. + format: int64 + type: integer + usedResources: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + UsedResources is the total resource consumption of assigned VM instances, keyed by resource type + (e.g. "memory" in MiB binary SI, "cpu" as core count). Populated by the usage reconciler. 
+ type: object type: object required: - spec diff --git a/internal/knowledge/datasources/plugins/openstack/controller_test.go b/internal/knowledge/datasources/plugins/openstack/controller_test.go index 899e83237..586238d54 100644 --- a/internal/knowledge/datasources/plugins/openstack/controller_test.go +++ b/internal/knowledge/datasources/plugins/openstack/controller_test.go @@ -104,6 +104,7 @@ func TestNovaDatasourceTypeConstants(t *testing.T) { {v1alpha1.NovaDatasourceTypeFlavors, "flavors"}, {v1alpha1.NovaDatasourceTypeMigrations, "migrations"}, {v1alpha1.NovaDatasourceTypeAggregates, "aggregates"}, + {v1alpha1.NovaDatasourceTypeImages, "images"}, } for _, test := range tests { diff --git a/internal/knowledge/datasources/plugins/openstack/nova/nova_api.go b/internal/knowledge/datasources/plugins/openstack/nova/nova_api.go index ca25868f2..03298e9db 100644 --- a/internal/knowledge/datasources/plugins/openstack/nova/nova_api.go +++ b/internal/knowledge/datasources/plugins/openstack/nova/nova_api.go @@ -10,14 +10,17 @@ import ( "log/slog" "net/http" "net/url" + "strings" "time" "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/cobaltcore-dev/cortex/internal/knowledge/datasources" "github.com/cobaltcore-dev/cortex/pkg/keystone" "github.com/gophercloud/gophercloud/v2" + "github.com/gophercloud/gophercloud/v2/openstack" "github.com/gophercloud/gophercloud/v2/openstack/compute/v2/aggregates" "github.com/gophercloud/gophercloud/v2/openstack/compute/v2/flavors" + glanceimages "github.com/gophercloud/gophercloud/v2/openstack/image/v2/images" "github.com/gophercloud/gophercloud/v2/pagination" "github.com/prometheus/client_golang/prometheus" ) @@ -37,6 +40,8 @@ type NovaAPI interface { GetAllMigrations(ctx context.Context) ([]Migration, error) // Get all aggregates. GetAllAggregates(ctx context.Context) ([]Aggregate, error) + // Get all Glance images with pre-computed os_type. + GetAllImages(ctx context.Context) ([]Image, error) } // API for OpenStack Nova. @@ -47,8 +52,10 @@ type novaAPI struct { keystoneClient keystone.KeystoneClient // Nova configuration. conf v1alpha1.NovaDatasource - // Authenticated OpenStack service client to fetch the data. + // Authenticated OpenStack compute service client. sc *gophercloud.ServiceClient + // Authenticated Glance image service client (only used for NovaDatasourceTypeImages). + glance *gophercloud.ServiceClient } func NewNovaAPI(mon datasources.Monitor, k keystone.KeystoneClient, conf v1alpha1.NovaDatasource) NovaAPI { @@ -78,6 +85,16 @@ func (api *novaAPI) Init(ctx context.Context) error { // Since 2.61, the extra_specs are returned in the flavor details. Microversion: "2.61", } + // Initialize the Glance client only when this datasource is used for images. + if api.conf.Type == v1alpha1.NovaDatasourceTypeImages { + glanceClient, err := openstack.NewImageV2(provider, gophercloud.EndpointOpts{ + Availability: gophercloud.Availability(sameAsKeystone), + }) + if err != nil { + return fmt.Errorf("failed to create Glance client: %w", err) + } + api.glance = glanceClient + } return nil } @@ -436,3 +453,72 @@ func (api *novaAPI) GetAllAggregates(ctx context.Context) ([]Aggregate, error) { } return aggregates, nil } + +// GetAllImages fetches all Glance images and returns them with pre-computed os_type. +// See deriveOSType for the derivation logic. 
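+// Results are read page by page from the Glance list endpoint (1000 images per page); only the
+// image ID and the derived os_type are kept in memory and returned.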
+func (api *novaAPI) GetAllImages(ctx context.Context) ([]Image, error) { + if api.glance == nil { + return nil, fmt.Errorf("glance client not initialized: datasource type must be %q", v1alpha1.NovaDatasourceTypeImages) + } + + label := Image{}.TableName() + slog.Info("fetching nova data", "label", label) + if api.mon.RequestTimer != nil { + hist := api.mon.RequestTimer.WithLabelValues(label) + timer := prometheus.NewTimer(hist) + defer timer.ObserveDuration() + } + + var result []Image + opts := glanceimages.ListOpts{Limit: 1000} + err := glanceimages.List(api.glance, opts).EachPage(ctx, func(_ context.Context, page pagination.Page) (bool, error) { + imgs, err := glanceimages.ExtractImages(page) + if err != nil { + return false, err + } + for _, img := range imgs { + result = append(result, Image{ + ID: img.ID, + OSType: deriveOSType(img.Properties, img.Tags), + }) + } + return true, nil + }) + if err != nil { + return nil, fmt.Errorf("failed to list Glance images: %w", err) + } + slog.Info("fetched", "label", label, "count", len(result)) + return result, nil +} + +// deriveOSType computes os_type from image properties and tags. +// Mirrors the logic of OSTypeProber.findFromImage in github.com/sapcc/go-bits/liquidapi, +// with two intentional simplifications: +// 1. No regex validation on vmware_ostype — Nova validates that field at VM boot time, +// so any value stored in Glance is already valid. +// 2. Volume-booted VMs are not yet supported — os_type will be "unknown" for them. +// Supporting them would require per-VM Cinder calls (volume_image_metadata.vmware_ostype) +// either at server sync time or via a dedicated datasource. +func deriveOSType(properties map[string]any, tags []string) string { + if v, ok := properties["vmware_ostype"]; ok { + if s, ok := v.(string); ok && s != "" { + return s + } + } + var osType string + for _, tag := range tags { + if after, ok := strings.CutPrefix(tag, "ostype:"); ok { + if osType == "" { + osType = after + } else { + // multiple ostype: tags → ambiguous, fall through to unknown + osType = "" + break + } + } + } + if osType != "" { + return osType + } + return "unknown" +} diff --git a/internal/knowledge/datasources/plugins/openstack/nova/nova_sync.go b/internal/knowledge/datasources/plugins/openstack/nova/nova_sync.go index b8b0bc35d..a2c466c42 100644 --- a/internal/knowledge/datasources/plugins/openstack/nova/nova_sync.go +++ b/internal/knowledge/datasources/plugins/openstack/nova/nova_sync.go @@ -45,6 +45,8 @@ func (s *NovaSyncer) Init(ctx context.Context) error { tables = append(tables, s.DB.AddTable(Migration{})) case v1alpha1.NovaDatasourceTypeAggregates: tables = append(tables, s.DB.AddTable(Aggregate{})) + case v1alpha1.NovaDatasourceTypeImages: + tables = append(tables, s.DB.AddTable(Image{})) } return s.DB.CreateTable(tables...) } @@ -67,6 +69,8 @@ func (s *NovaSyncer) Sync(ctx context.Context) (int64, error) { nResults, err = s.SyncAllMigrations(ctx) case v1alpha1.NovaDatasourceTypeAggregates: nResults, err = s.SyncAllAggregates(ctx) + case v1alpha1.NovaDatasourceTypeImages: + nResults, err = s.SyncAllImages(ctx) } return nResults, err } @@ -192,6 +196,26 @@ func (s *NovaSyncer) SyncAllMigrations(ctx context.Context) (int64, error) { return int64(len(allMigrations)), nil } +// Sync all Glance images into the database with pre-computed os_type. 
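+// The fetched set replaces the existing rows via db.ReplaceAll; the objects gauge and processed
+// counter are only updated after the write succeeds.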
+func (s *NovaSyncer) SyncAllImages(ctx context.Context) (int64, error) { + allImages, err := s.API.GetAllImages(ctx) + if err != nil { + return 0, err + } + err = db.ReplaceAll(s.DB, allImages...) + if err != nil { + return 0, err + } + label := Image{}.TableName() + if s.Mon.ObjectsGauge != nil { + s.Mon.ObjectsGauge.WithLabelValues(label).Set(float64(len(allImages))) + } + if s.Mon.RequestProcessedCounter != nil { + s.Mon.RequestProcessedCounter.WithLabelValues(label).Inc() + } + return int64(len(allImages)), nil +} + // Sync the OpenStack aggregates into the database. func (s *NovaSyncer) SyncAllAggregates(ctx context.Context) (int64, error) { allAggregates, err := s.API.GetAllAggregates(ctx) diff --git a/internal/knowledge/datasources/plugins/openstack/nova/nova_sync_test.go b/internal/knowledge/datasources/plugins/openstack/nova/nova_sync_test.go index b88d25aa6..3f03ff2bf 100644 --- a/internal/knowledge/datasources/plugins/openstack/nova/nova_sync_test.go +++ b/internal/knowledge/datasources/plugins/openstack/nova/nova_sync_test.go @@ -56,6 +56,10 @@ func (m *mockNovaAPI) GetAllAggregates(ctx context.Context) ([]Aggregate, error) return []Aggregate{{Name: "aggregate1"}}, nil } +func (m *mockNovaAPI) GetAllImages(ctx context.Context) ([]Image, error) { + return []Image{{ID: "img-1", OSType: "windows8Server64Guest"}}, nil +} + func TestNovaSyncer_Init(t *testing.T) { dbEnv := testlibDB.SetupDBEnv(t) testDB := db.DB{DbMap: dbEnv.DbMap} @@ -268,3 +272,35 @@ func TestNovaSyncer_SyncAggregates(t *testing.T) { t.Fatalf("expected 1 aggregate, got %d", n) } } + +func TestNovaSyncer_SyncImages(t *testing.T) { + dbEnv := testlibDB.SetupDBEnv(t) + testDB := db.DB{DbMap: dbEnv.DbMap} + defer dbEnv.Close() + mon := datasources.Monitor{} + syncer := &NovaSyncer{ + DB: testDB, + Mon: mon, + Conf: v1alpha1.NovaDatasource{Type: v1alpha1.NovaDatasourceTypeImages}, + API: &mockNovaAPI{}, + } + + ctx := t.Context() + if err := syncer.Init(ctx); err != nil { + t.Fatalf("failed to init images syncer: %v", err) + } + n, err := syncer.SyncAllImages(ctx) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + if n != 1 { + t.Fatalf("expected 1 image, got %d", n) + } + var images []Image + if _, err := testDB.Select(&images, "SELECT * FROM "+Image{}.TableName()); err != nil { + t.Fatalf("select images: %v", err) + } + if len(images) != 1 || images[0].ID != "img-1" || images[0].OSType != "windows8Server64Guest" { + t.Errorf("unexpected images in DB: %+v", images) + } +} diff --git a/internal/knowledge/datasources/plugins/openstack/nova/nova_types.go b/internal/knowledge/datasources/plugins/openstack/nova/nova_types.go index 1be2b7a29..2f5d36a33 100644 --- a/internal/knowledge/datasources/plugins/openstack/nova/nova_types.go +++ b/internal/knowledge/datasources/plugins/openstack/nova/nova_types.go @@ -111,6 +111,10 @@ type Server struct { // From nested server.flavor JSON FlavorName string `json:"-" db:"flavor_name"` + // ImageRef is the Glance image UUID the server was booted from. + // Empty string for volume-booted servers. + ImageRef string `json:"-" db:"image_ref"` + // From nested server.fault JSON // The error response code. @@ -136,6 +140,8 @@ func (s *Server) UnmarshalJSON(data []byte) error { aux := &struct { Flavor json.RawMessage `json:"flavor"` Fault *json.RawMessage `json:"fault,omitempty"` + // Nova returns image as a map {"id": "..."} for image-booted or "" for volume-booted. 
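+		// Kept as raw JSON so the parsing code below can distinguish the object form from the empty string.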
+ Image json.RawMessage `json:"image"` *Alias }{ Alias: (*Alias)(s), @@ -151,6 +157,15 @@ func (s *Server) UnmarshalJSON(data []byte) error { return err } s.FlavorName = flavor.Name + // Parse image ref: map → extract id; empty string → leave blank (volume-booted). + if len(aux.Image) > 0 && aux.Image[0] == '{' { + var imageMap struct { + ID string `json:"id"` + } + if err := json.Unmarshal(aux.Image, &imageMap); err == nil { + s.ImageRef = imageMap.ID + } + } var fault struct { Code uint `json:"code"` Created string `json:"created"` @@ -194,20 +209,29 @@ func (s *Server) MarshalJSON() ([]byte, error) { Details: s.FaultDetails, } } + // Represent image as {"id": ""} for image-booted or "" for volume-booted. + var imageVal any + if s.ImageRef != "" { + imageVal = map[string]string{"id": s.ImageRef} + } else { + imageVal = "" + } aux := &struct { Flavor flavor `json:"flavor"` Fault *fault `json:"fault,omitempty"` + Image any `json:"image"` *Alias }{ Alias: (*Alias)(s), Flavor: flavorVal, Fault: faultVal, + Image: imageVal, } return json.Marshal(aux) } // Table in which the openstack model is stored. -func (Server) TableName() string { return "openstack_servers_v2" } +func (Server) TableName() string { return "openstack_servers_v3" } // Index for the openstack model. func (Server) Indexes() map[string][]string { return nil } @@ -481,3 +505,17 @@ func (Aggregate) TableName() string { return "openstack_aggregates_v2" } // Index for the openstack model. func (Aggregate) Indexes() map[string][]string { return nil } + +// Image stores pre-computed os_type for a Glance image UUID. +// Populated by the NovaDatasourceTypeImages syncer from the Glance API. +// Used by the CR usage API to include os_type in VM subresources without live API calls. +type Image struct { + ID string `json:"id" db:"id,primarykey"` + OSType string `json:"os_type" db:"os_type"` +} + +// Table in which the openstack model is stored. +func (Image) TableName() string { return "openstack_images" } + +// Index for the openstack model. 
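+// No secondary indexes are defined; lookups are expected to go through the primary key (the image ID).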
+func (Image) Indexes() map[string][]string { return nil } diff --git a/internal/knowledge/extractor/plugins/compute/libvirt_domain_cpu_steal_pct.sql b/internal/knowledge/extractor/plugins/compute/libvirt_domain_cpu_steal_pct.sql index ab3c7b8a7..56b20a980 100644 --- a/internal/knowledge/extractor/plugins/compute/libvirt_domain_cpu_steal_pct.sql +++ b/internal/knowledge/extractor/plugins/compute/libvirt_domain_cpu_steal_pct.sql @@ -3,6 +3,6 @@ SELECT os.os_ext_srv_attr_host AS host, MAX(value) AS max_steal_time_pct FROM kvm_libvirt_domain_metrics kvm -JOIN openstack_servers_v2 os ON os.os_ext_srv_attr_instance_name = kvm.domain +JOIN openstack_servers_v3 os ON os.os_ext_srv_attr_instance_name = kvm.domain WHERE kvm.name = 'kvm_libvirt_domain_steal_pct' AND os.id IS NOT NULL GROUP BY os.os_ext_srv_attr_host, os.id; \ No newline at end of file diff --git a/internal/knowledge/extractor/plugins/compute/vm_host_residency.sql b/internal/knowledge/extractor/plugins/compute/vm_host_residency.sql index c2b4b8846..190f2da19 100644 --- a/internal/knowledge/extractor/plugins/compute/vm_host_residency.sql +++ b/internal/knowledge/extractor/plugins/compute/vm_host_residency.sql @@ -21,7 +21,7 @@ WITH durations AS ( )) AS BIGINT) ) AS duration FROM openstack_migrations AS migrations - LEFT JOIN openstack_servers_v2 AS servers ON servers.id = migrations.instance_uuid + LEFT JOIN openstack_servers_v3 AS servers ON servers.id = migrations.instance_uuid LEFT JOIN openstack_flavors_v2 AS flavors ON flavors.name = servers.flavor_name ) SELECT diff --git a/internal/knowledge/extractor/plugins/compute/vm_life_span.sql b/internal/knowledge/extractor/plugins/compute/vm_life_span.sql index 1fad31536..38b8762ba 100644 --- a/internal/knowledge/extractor/plugins/compute/vm_life_span.sql +++ b/internal/knowledge/extractor/plugins/compute/vm_life_span.sql @@ -13,7 +13,7 @@ running_servers AS ( EXTRACT(EPOCH FROM (NOW()::timestamp - servers.created::timestamp))::BIGINT AS duration, COALESCE(flavors.name, 'unknown')::TEXT AS flavor_name, false::BOOLEAN AS deleted - FROM openstack_servers_v2 servers + FROM openstack_servers_v3 servers LEFT JOIN openstack_flavors_v2 flavors ON flavors.name = servers.flavor_name WHERE servers.created IS NOT NULL ) diff --git a/internal/knowledge/extractor/plugins/compute/vrops_hostsystem_resolver.sql b/internal/knowledge/extractor/plugins/compute/vrops_hostsystem_resolver.sql index 8ab0a2c70..21f3104fd 100644 --- a/internal/knowledge/extractor/plugins/compute/vrops_hostsystem_resolver.sql +++ b/internal/knowledge/extractor/plugins/compute/vrops_hostsystem_resolver.sql @@ -3,5 +3,5 @@ SELECT DISTINCT m.hostsystem AS vrops_hostsystem, s.os_ext_srv_attr_host AS nova_compute_host FROM vrops_vm_metrics m -LEFT JOIN openstack_servers_v2 s ON m.instance_uuid = s.id +LEFT JOIN openstack_servers_v3 s ON m.instance_uuid = s.id WHERE s.os_ext_srv_attr_host IS NOT NULL; diff --git a/internal/knowledge/extractor/plugins/compute/vrops_project_noisiness.sql b/internal/knowledge/extractor/plugins/compute/vrops_project_noisiness.sql index 0b0067790..850cbbca1 100644 --- a/internal/knowledge/extractor/plugins/compute/vrops_project_noisiness.sql +++ b/internal/knowledge/extractor/plugins/compute/vrops_project_noisiness.sql @@ -19,7 +19,7 @@ host_cpu_usage AS ( s.tenant_id, h.service_host, AVG(p.avg_cpu) AS avg_cpu_of_project - FROM openstack_servers_v2 s + FROM openstack_servers_v3 s JOIN vrops_vm_metrics m ON s.id = m.instance_uuid JOIN projects_avg_cpu p ON s.tenant_id = p.tenant_id JOIN 
openstack_hypervisors h ON s.os_ext_srv_attr_hypervisor_hostname = h.hostname diff --git a/internal/scheduling/reservations/commitments/api/change_commitments.go b/internal/scheduling/reservations/commitments/api/change_commitments.go index b7783b599..fb9f17544 100644 --- a/internal/scheduling/reservations/commitments/api/change_commitments.go +++ b/internal/scheduling/reservations/commitments/api/change_commitments.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) @@ -40,6 +41,13 @@ func sortedKeys[K ~string, V any](m map[K]V) []K { return keys } +// crWatch pairs a CRD name with the generation written by the API so the polling loop +// can skip cache reads that have not yet reflected the write (stale-cache guard). +type crWatch struct { + name string + generation int64 +} + // crSnapshot captures a CommittedResource CRD's prior state for batch rollback. // prevSpec is nil when the CRD was newly created (i.e. did not exist before the batch). // wasDeleted is true when the batch operation deleted the CRD; rollback must re-create it. @@ -156,7 +164,7 @@ func (api *HTTPAPI) processCommitmentChanges(ctx context.Context, w http.Respons allowRejection := req.RequiresConfirmation() var ( - toWatch []string // CRD names to poll for terminal conditions (upserts only) + toWatch []crWatch // CRD names + expected generations to poll for terminal conditions (upserts only) snapshots []crSnapshot // ordered list for deterministic rollback failedReason string rollback bool @@ -199,7 +207,7 @@ ProcessLoop: isDelete := commitment.NewStatus.IsNone() crName := "commitment-" + string(commitment.UUID) - logger.V(1).Info("processing commitment", + logger.Info("processing commitment", "commitmentUUID", commitment.UUID, "oldStatus", commitment.OldStatus.UnwrapOr("none"), "newStatus", commitment.NewStatus.UnwrapOr("none"), @@ -223,15 +231,20 @@ ProcessLoop: if isDelete { // Limes is removing this commitment; delete the CRD if it exists. 
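+				// Record the snapshot before any deletion so a rollback triggered later in the batch
+				// can re-create the CRD from prevSpec.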
snap.wasDeleted = true + snapshots = append(snapshots, snap) if snap.prevSpec != nil { if err := api.client.Delete(ctx, existing); err != nil && !apierrors.IsNotFound(err) { failedReason = fmt.Sprintf("commitment %s: failed to delete CommittedResource CRD: %v", commitment.UUID, err) rollback = true break ProcessLoop } + if err := commitments.DeleteChildReservations(ctx, api.client, existing); err != nil { + failedReason = fmt.Sprintf("commitment %s: failed to delete child reservations: %v", commitment.UUID, err) + rollback = true + break ProcessLoop + } logger.V(1).Info("deleted CommittedResource CRD", "name", crName) } - snapshots = append(snapshots, snap) continue } @@ -258,7 +271,7 @@ ProcessLoop: break ProcessLoop } - toWatch = append(toWatch, crName) + toWatch = append(toWatch, crWatch{name: crName, generation: cr.Generation}) snapshots = append(snapshots, snap) logger.V(1).Info("upserted CommittedResource CRD", "name", crName) } @@ -289,9 +302,9 @@ ProcessLoop: case len(rejected) > 0: var b strings.Builder fmt.Fprintf(&b, "%d commitment(s) failed to apply:", len(rejected)) - for _, crName := range toWatch { // iterate toWatch for deterministic order - if reason, ok := rejected[crName]; ok { - fmt.Fprintf(&b, "\n- commitment %s: %s", strings.TrimPrefix(crName, "commitment-"), reason) + for _, w := range toWatch { // iterate toWatch for deterministic order + if reason, ok := rejected[w.name]; ok { + fmt.Fprintf(&b, "\n- commitment %s: %s", strings.TrimPrefix(w.name, "commitment-"), reason) } } failedReason = b.String() @@ -326,26 +339,32 @@ ProcessLoop: // - Ready=False, Reason=Planned — success; controller reserves capacity at activation time // - Ready=False, Reason=Rejected — failure; reason reported to caller // +// Each entry in watches carries the generation written by the API. The polling loop skips any +// cache read whose generation is older than that value, preventing a stale Ready=True (or +// Ready=False/Rejected) condition from a prior reconcile cycle from being mistaken for the +// outcome of the current write. +// // Returns a map of crName → rejection reason for failed CRDs, and any polling errors (e.g. timeout). func watchCRsUntilReady( ctx context.Context, logger logr.Logger, k8sClient client.Client, - crNames []string, + watches []crWatch, timeout time.Duration, pollInterval time.Duration, ) (rejected map[string]string, errs []error) { - if len(crNames) == 0 { + if len(watches) == 0 { return nil, nil } rejected = make(map[string]string) deadline := time.Now().Add(timeout) - pending := make(map[string]struct{}, len(crNames)) - for _, name := range crNames { - pending[name] = struct{}{} + // pending maps CR name → the minimum generation the cache must show before we trust conditions. + pending := make(map[string]int64, len(watches)) + for _, w := range watches { + pending[w.name] = w.generation } for { @@ -354,23 +373,49 @@ func watchCRsUntilReady( return rejected, errs } - for name := range pending { + for name, expectedGen := range pending { cr := &v1alpha1.CommittedResource{} if err := k8sClient.Get(ctx, types.NamespacedName{Name: name}, cr); err != nil { continue // transient; keep waiting } + // The informer cache may not have caught up with the spec write yet. Until the + // cache reflects at least the generation we wrote, any condition we read belongs + // to an older spec version and must not be treated as terminal. 
+ if cr.Generation < expectedGen { + logger.V(1).Info("cache not yet reflecting write, skipping", + "name", name, + "cacheGeneration", cr.Generation, + "expectedGeneration", expectedGen, + ) + continue + } + cond := meta.FindStatusCondition(cr.Status.Conditions, v1alpha1.CommittedResourceConditionReady) if cond == nil { continue // controller hasn't reconciled yet } + // Skip conditions stamped by a prior reconcile: ObservedGeneration < Generation means + // the condition reflects an older spec version and must not be treated as terminal. + if cond.ObservedGeneration < cr.Generation { + logger.V(1).Info("skipping stale condition on CommittedResource", + "name", name, + "generation", cr.Generation, + "conditionObservedGeneration", cond.ObservedGeneration, + "reason", cond.Reason, + ) + continue + } switch { case cond.Status == metav1.ConditionTrue: + logger.Info("CommittedResource accepted", "name", name) delete(pending, name) case cond.Status == metav1.ConditionFalse && cond.Reason == v1alpha1.CommittedResourceReasonPlanned: + logger.Info("CommittedResource planned (will reserve at activation)", "name", name) delete(pending, name) // planned = accepted; controller will reserve at activation case cond.Status == metav1.ConditionFalse && cond.Reason == v1alpha1.CommittedResourceReasonRejected: + logger.Info("CommittedResource rejected", "name", name, "reason", cond.Message) delete(pending, name) rejected[name] = cond.Message // Reason=Reserving: controller is placing slots; keep waiting. @@ -418,15 +463,21 @@ func rollbackCR(ctx context.Context, logger logr.Logger, k8sClient client.Client return } - cr := &v1alpha1.CommittedResource{} - if err := k8sClient.Get(ctx, types.NamespacedName{Name: snap.crName}, cr); err != nil { - logger.Error(err, "failed to fetch CommittedResource CRD for rollback", "name", snap.crName) - return - } - cr.Spec = *snap.prevSpec - if err := k8sClient.Update(ctx, cr); err != nil { + // The controller may write status (bumping resourceVersion) between our Get and Update. + // RetryOnConflict retries with exponential backoff when that race occurs. + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + cr := &v1alpha1.CommittedResource{} + if err := k8sClient.Get(ctx, types.NamespacedName{Name: snap.crName}, cr); err != nil { + return err + } + cr.Spec = *snap.prevSpec + return k8sClient.Update(ctx, cr) + }) + if err != nil { logger.Error(err, "failed to restore CommittedResource CRD spec during rollback", "name", snap.crName) + return } + logger.V(1).Info("restored CommittedResource CRD spec during rollback", "name", snap.crName) } // applyCRSpec writes CommitmentState fields into a CommittedResource CRD spec. diff --git a/internal/scheduling/reservations/commitments/api/change_commitments_e2e_test.go b/internal/scheduling/reservations/commitments/api/change_commitments_e2e_test.go index 3f19b4857..c0be4ad7a 100644 --- a/internal/scheduling/reservations/commitments/api/change_commitments_e2e_test.go +++ b/internal/scheduling/reservations/commitments/api/change_commitments_e2e_test.go @@ -174,7 +174,7 @@ func (e *e2eEnv) driveReconciles(ctx context.Context) { } // reconcileAll drives one round of reconciles: -// 1. CR pass 1 — adds finalizer and creates Reservation CRDs. +// 1. CR pass 1 — creates Reservation CRDs based on current state. // 2. Reservation pass — calls the scheduler, sets TargetHost (first reconcile) then Ready=True (second). // 3. CR pass 2 — re-fetches each CR and picks up Reservation outcomes (placed or rejected). 
// @@ -225,11 +225,9 @@ func (e *e2eEnv) reconcileAll(ctx context.Context) { // e2eIsTerminalCR returns true for states the API polling loop treats as final: // Accepted (Ready=True), Rejected, or Planned. -// CRs with DeletionTimestamp are never terminal here: they need one more reconcile to remove -// their finalizer (set by the controller on first reconcile) so the fake client can delete them. func e2eIsTerminalCR(cr v1alpha1.CommittedResource) bool { if !cr.DeletionTimestamp.IsZero() { - return false + return true } cond := apimeta.FindStatusCondition(cr.Status.Conditions, v1alpha1.CommittedResourceConditionReady) if cond == nil { @@ -243,7 +241,6 @@ func e2eIsTerminalCR(cr v1alpha1.CommittedResource) bool { } // waitForCRAbsent polls until the named CommittedResource no longer exists or the 1s deadline passes. -// Used after rollback calls because the finalizer removal happens asynchronously in the background reconcile loop. func (e *e2eEnv) waitForCRAbsent(t *testing.T, crName string) { t.Helper() deadline := time.Now().Add(1 * time.Second) diff --git a/internal/scheduling/reservations/commitments/api/change_commitments_test.go b/internal/scheduling/reservations/commitments/api/change_commitments_test.go index a98e840aa..f7fc3d025 100644 --- a/internal/scheduling/reservations/commitments/api/change_commitments_test.go +++ b/internal/scheduling/reservations/commitments/api/change_commitments_test.go @@ -130,6 +130,39 @@ func TestHandleChangeCommitments(t *testing.T) { ExpectedAPIResponse: newAPIResponse("uuid-b: not sufficient capacity"), ExpectedDeletedCRs: []string{"commitment-uuid-a", "commitment-uuid-b"}, }, + // --- Stale condition bug --- + { + // Regression: when an already-accepted CR is updated (e.g. AZ change), the old + // Ready=True condition from the previous reconcile must not cause the polling loop + // to return 200 OK before the controller has processed the new spec. + // NoCondition simulates the real async case where the controller hasn't reconciled yet. + Name: "AZ update on already-accepted CR: stale Ready=True must not cause premature 200 OK", + Flavors: []*TestFlavor{m1Small}, + ExistingCRs: []*TestCR{ + {CommitmentUUID: "uuid-az-stale", State: v1alpha1.CommitmentStatusConfirmed, + AmountMiB: 1024, ProjectID: "project-A", AZ: "az-old", ReadyCondition: true}, + }, + // Fake controller is suppressed: simulates the controller not yet having reconciled + // the updated spec. The only condition on the CR is the stale one from generation 1. + NoCondition: []string{"commitment-uuid-az-stale"}, + CommitmentRequest: newCommitmentRequest("az-new", false, 1234, + createCommitment("hw_version_hana_1_ram", "project-A", "uuid-az-stale", "confirmed", 2)), + CustomConfig: func() *commitments.APIConfig { + cfg := commitments.DefaultAPIConfig() + // Short but non-zero: long enough for the polling loop to read the stale + // condition, short enough that the test doesn't take long. + cfg.WatchTimeout = metav1.Duration{Duration: 50 * time.Millisecond} + cfg.WatchPollInterval = metav1.Duration{Duration: 10 * time.Millisecond} + cfg.FlavorGroupResourceConfig = map[string]commitments.FlavorGroupResourcesConfig{ + "*": {RAM: commitments.ResourceTypeConfig{HandlesCommitments: true, HasCapacity: true}}, + } + return &cfg + }(), + // The polling loop must not trust the stale condition and must time out. + ExpectedAPIResponse: newAPIResponse("timeout reached"), + // CR spec is rolled back on timeout. 
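+			// 1024 MiB (the pre-update AmountMiB of the existing CR) expressed in bytes.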
+ ExpectedCRSpecs: map[string]int64{"commitment-uuid-az-stale": 1024 * 1024 * 1024}, + }, // --- Timeout --- { Name: "Timeout: no condition set → rollback and timeout error", @@ -457,6 +490,10 @@ type TestCR struct { AmountMiB int64 ProjectID string AZ string + // ReadyCondition pre-sets Ready=True (Generation=1, ObservedGeneration=1) on the CR to simulate + // a CR that was previously accepted. Use together with NoCondition to test that the polling loop + // does not treat this stale condition as a valid outcome for a subsequent spec update. + ReadyCondition bool } type CommitmentChangeRequest struct { @@ -591,6 +628,9 @@ type fakeControllerClient struct { } func (c *fakeControllerClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + if cr, ok := obj.(*v1alpha1.CommittedResource); ok { + cr.Generation = 1 // k8s sets generation=1 on first creation + } if err := c.Client.Create(ctx, obj, opts...); err != nil { return err } @@ -601,6 +641,14 @@ func (c *fakeControllerClient) Create(ctx context.Context, obj client.Object, op } func (c *fakeControllerClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + if cr, ok := obj.(*v1alpha1.CommittedResource); ok { + // k8s increments generation on each spec change; simulate that here so the + // polling loop can detect stale conditions from a prior generation. + existing := &v1alpha1.CommittedResource{} + if err := c.Get(ctx, client.ObjectKeyFromObject(cr), existing); err == nil { + cr.Generation = existing.Generation + 1 + } + } if err := c.Client.Update(ctx, obj, opts...); err != nil { return err } @@ -620,36 +668,40 @@ func (c *fakeControllerClient) setConditionFor(ctx context.Context, crName strin return } + cr := &v1alpha1.CommittedResource{} + if err := c.Get(ctx, client.ObjectKey{Name: crName}, cr); err != nil { + return + } + var cond metav1.Condition switch { case !hasOutcome || outcome == "": // Default: controller accepts. 
cond = metav1.Condition{ - Type: v1alpha1.CommittedResourceConditionReady, - Status: metav1.ConditionTrue, - Reason: v1alpha1.CommittedResourceReasonAccepted, - Message: "accepted", + Type: v1alpha1.CommittedResourceConditionReady, + Status: metav1.ConditionTrue, + Reason: v1alpha1.CommittedResourceReasonAccepted, + Message: "accepted", + ObservedGeneration: cr.Generation, } case outcome == v1alpha1.CommittedResourceReasonPlanned: cond = metav1.Condition{ - Type: v1alpha1.CommittedResourceConditionReady, - Status: metav1.ConditionFalse, - Reason: v1alpha1.CommittedResourceReasonPlanned, - Message: "commitment is not yet active", + Type: v1alpha1.CommittedResourceConditionReady, + Status: metav1.ConditionFalse, + Reason: v1alpha1.CommittedResourceReasonPlanned, + Message: "commitment is not yet active", + ObservedGeneration: cr.Generation, } default: cond = metav1.Condition{ - Type: v1alpha1.CommittedResourceConditionReady, - Status: metav1.ConditionFalse, - Reason: v1alpha1.CommittedResourceReasonRejected, - Message: outcome, + Type: v1alpha1.CommittedResourceConditionReady, + Status: metav1.ConditionFalse, + Reason: v1alpha1.CommittedResourceReasonRejected, + Message: outcome, + ObservedGeneration: cr.Generation, } } - cr := &v1alpha1.CommittedResource{} - if err := c.Get(ctx, client.ObjectKey{Name: crName}, cr); err != nil { - return - } meta.SetStatusCondition(&cr.Status.Conditions, cond) if err := c.Client.Status().Update(ctx, cr); err != nil { return // best-effort: if the update races with another write, the polling loop retries @@ -695,6 +747,13 @@ func newCRTestEnv(t *testing.T, tc CommitmentChangeTestCase) *CRTestEnv { WithScheme(scheme). WithObjects(objects...). WithStatusSubresource(&v1alpha1.CommittedResource{}, &v1alpha1.Knowledge{}). + WithIndex(&v1alpha1.Reservation{}, "spec.committedResourceReservation.commitmentUUID", func(obj client.Object) []string { + res, ok := obj.(*v1alpha1.Reservation) + if !ok || res.Spec.CommittedResourceReservation == nil { + return nil + } + return []string{res.Spec.CommittedResourceReservation.CommitmentUUID} + }). 
Build() noCondition := make(map[string]struct{}) @@ -830,7 +889,7 @@ func (env *CRTestEnv) VerifyCRAmountBytes(crName string, wantBytes int64) { func (tc *TestCR) toCommittedResource() *v1alpha1.CommittedResource { amount := resource.NewQuantity(tc.AmountMiB*1024*1024, resource.BinarySI) - return &v1alpha1.CommittedResource{ + cr := &v1alpha1.CommittedResource{ ObjectMeta: metav1.ObjectMeta{ Name: "commitment-" + tc.CommitmentUUID, }, @@ -844,6 +903,18 @@ func (tc *TestCR) toCommittedResource() *v1alpha1.CommittedResource { State: tc.State, }, } + if tc.ReadyCondition { + cr.Generation = 1 + meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{ + Type: v1alpha1.CommittedResourceConditionReady, + Status: metav1.ConditionTrue, + Reason: v1alpha1.CommittedResourceReasonAccepted, + Message: "accepted", + LastTransitionTime: metav1.Now(), + ObservedGeneration: 1, + }) + } + return cr } // ============================================================================ diff --git a/internal/scheduling/reservations/commitments/api/report_usage_test.go b/internal/scheduling/reservations/commitments/api/report_usage_test.go index 719a7bbb1..09bdd59a1 100644 --- a/internal/scheduling/reservations/commitments/api/report_usage_test.go +++ b/internal/scheduling/reservations/commitments/api/report_usage_test.go @@ -25,6 +25,8 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/log" @@ -427,6 +429,7 @@ type TestVMUsage struct { AZ string Host string CreatedAt time.Time + OSType string // pre-computed os_type, e.g. "windows8Server64Guest" or "unknown" } func newTestVMUsage(uuid string, flavor *TestFlavor, projectID, az, host string, createdAt time.Time) *TestVMUsage { @@ -465,6 +468,7 @@ type ExpectedVMUsage struct { CommitmentID string // Empty string = PAYG MemoryMB uint64 // For verification VideoRAMMiB *uint64 // nil = expect field absent + OSType string // Empty string = skip check } // ============================================================================ @@ -497,6 +501,10 @@ func (m *mockUsageDBClient) addVM(vm *TestVMUsage) { extraSpecs["hw_video:ram_max_mb"] = strconv.FormatUint(*vm.Flavor.VideoRAMMiB, 10) } extrasJSON, _ := json.Marshal(extraSpecs) //nolint:errcheck // test helper, always valid + osType := vm.OSType + if osType == "" { + osType = "unknown" + } row := commitments.VMRow{ ID: vm.UUID, Name: vm.UUID, @@ -509,6 +517,7 @@ func (m *mockUsageDBClient) addVM(vm *TestVMUsage) { FlavorVCPUs: uint64(vm.Flavor.VCPUs), //nolint:gosec FlavorDisk: vm.Flavor.DiskGB, FlavorExtras: string(extrasJSON), + OSType: osType, } m.rows[vm.ProjectID] = append(m.rows[vm.ProjectID], row) } @@ -562,15 +571,36 @@ func newUsageTestEnv( knowledgeCRD := createKnowledgeCRD(flavorGroups) k8sReservations = append(k8sReservations, knowledgeCRD) + // Create CommittedResource CRDs (one per unique commitment). + // The usage reconciler writes assignment results into these; CalculateUsage reads them back. + seenCommitments := make(map[string]bool) + var crObjects []client.Object + for _, tr := range reservations { + if seenCommitments[tr.CommitmentID] { + continue + } + seenCommitments[tr.CommitmentID] = true + crObjects = append(crObjects, tr.toCommittedResourceCRD()) + } + + k8sReservations = append(k8sReservations, crObjects...) 
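+
+	// The spec.commitmentUUID index registered below lets code under test list CommittedResources
+	// by commitment UUID; field-selector lookups against the fake client require an explicit index.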
k8sClient := fake.NewClientBuilder(). WithScheme(scheme). WithObjects(k8sReservations...). WithStatusSubresource(&v1alpha1.Reservation{}). WithStatusSubresource(&v1alpha1.Knowledge{}). + WithStatusSubresource(&v1alpha1.CommittedResource{}). WithIndex(&v1alpha1.Reservation{}, "spec.type", func(obj client.Object) []string { res := obj.(*v1alpha1.Reservation) return []string{string(res.Spec.Type)} }). + WithIndex(&v1alpha1.CommittedResource{}, "spec.commitmentUUID", func(obj client.Object) []string { + cr, ok := obj.(*v1alpha1.CommittedResource) + if !ok { + return nil + } + return []string{cr.Spec.CommitmentUUID} + }). Build() // Create mock DB client with VMs @@ -579,6 +609,25 @@ func newUsageTestEnv( dbClient.addVM(vm) } + // Run usage reconciler to populate CommittedResource.Status with VM assignments. + // CalculateUsage reads from this status, so the API returns the correct commitment assignments. + if len(crObjects) > 0 { + rec := &commitments.UsageReconciler{ + Client: k8sClient, + Conf: commitments.UsageReconcilerConfig{CooldownInterval: metav1.Duration{Duration: 0}}, + UsageDB: dbClient, + Monitor: commitments.NewUsageReconcilerMonitor(), + } + ctx := context.Background() + for _, obj := range crObjects { + cr := obj.(*v1alpha1.CommittedResource) + req := ctrl.Request{NamespacedName: types.NamespacedName{Name: cr.Name}} + if _, err := rec.Reconcile(ctx, req); err != nil { + t.Fatalf("usage reconciler failed for %s: %v", cr.Name, err) + } + } + } + // Create API with mock DB client api := NewAPIWithConfig(k8sClient, commitments.DefaultAPIConfig(), dbClient) mux := http.NewServeMux() @@ -806,6 +855,12 @@ func verifyUsageReport(t *testing.T, tc UsageReportTestCase, actual liquid.Servi } } + // Verify os_type when specified + if expectedVM.OSType != "" && actualVM.OSType != expectedVM.OSType { + t.Errorf("Resource %s AZ %s VM %s: expected os_type %q, got %q", + instancesResourceName, azName, expectedVM.UUID, expectedVM.OSType, actualVM.OSType) + } + // Assert HWVersion is absent from the serialized output (must not appear per LIQUID schema) if rawFlavor, ok := actualRawVMs[expectedVM.UUID]; ok { if flavorRaw, ok := rawFlavor["flavor"]; ok { @@ -848,6 +903,44 @@ type vmFlavorAttrs struct { // Helper Functions // ============================================================================ +// toCommittedResourceCRD creates a minimal CommittedResource CRD for this commitment. +// Used by the test setup to pre-populate the CR objects that the usage reconciler writes status into. +func (tr *UsageTestReservation) toCommittedResourceCRD() *v1alpha1.CommittedResource { + amount := resource.MustParse(strconv.FormatInt(tr.Flavor.MemoryMB*int64(tr.Count), 10) + "Mi") + spec := v1alpha1.CommittedResourceSpec{ + CommitmentUUID: tr.CommitmentID, + ProjectID: tr.ProjectID, + DomainID: "test-domain", + AvailabilityZone: tr.AZ, + FlavorGroupName: tr.Flavor.Group, + ResourceType: v1alpha1.CommittedResourceTypeMemory, + State: v1alpha1.CommitmentStatusConfirmed, + Amount: amount, + } + if !tr.StartTime.IsZero() { + spec.StartTime = &metav1.Time{Time: tr.StartTime} + } + return &v1alpha1.CommittedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "cr-" + tr.CommitmentID}, + Spec: spec, + Status: v1alpha1.CommittedResourceStatus{ + AcceptedAmount: &amount, + AcceptedSpec: &spec, + // Simulate the CR controller having accepted the current generation (0 for fake client). + // Without this, the usage reconciler's readiness gate blocks usage calculation. 
+ Conditions: []metav1.Condition{ + { + Type: v1alpha1.CommittedResourceConditionReady, + Status: metav1.ConditionTrue, + Reason: v1alpha1.CommittedResourceReasonAccepted, + ObservedGeneration: 0, + LastTransitionTime: metav1.Now(), + }, + }, + }, + } +} + // toK8sReservation converts a UsageTestReservation to a K8s Reservation. func (tr *UsageTestReservation) toK8sReservation(number int) *v1alpha1.Reservation { name := fmt.Sprintf("commitment-%s-%d", tr.CommitmentID, number) diff --git a/internal/scheduling/reservations/commitments/api/usage_test.go b/internal/scheduling/reservations/commitments/api/usage_test.go index 71fed90bb..492048b9d 100644 --- a/internal/scheduling/reservations/commitments/api/usage_test.go +++ b/internal/scheduling/reservations/commitments/api/usage_test.go @@ -19,6 +19,8 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/log" @@ -301,6 +303,14 @@ func TestUsageCalculator_ExpiredAndFutureCommitments(t *testing.T) { k8sClient := fake.NewClientBuilder(). WithScheme(scheme). WithObjects(objects...). + WithStatusSubresource(&v1alpha1.CommittedResource{}). + WithIndex(&v1alpha1.CommittedResource{}, "spec.commitmentUUID", func(obj client.Object) []string { + cr, ok := obj.(*v1alpha1.CommittedResource) + if !ok { + return nil + } + return []string{cr.Spec.CommitmentUUID} + }). Build() dbClient := &mockUsageDBClient{ @@ -309,6 +319,66 @@ func TestUsageCalculator_ExpiredAndFutureCommitments(t *testing.T) { }, } + // Create CommittedResource CRDs and run the usage reconciler so that + // CalculateUsage can read pre-computed assignments from CRD status. 
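+	// One CommittedResource CRD is created per unique commitment UUID; reservations sharing a
+	// commitment are deduplicated via the seen map below.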
+ seen := make(map[string]bool) + for _, r := range tt.reservations { + if r.Spec.CommittedResourceReservation == nil { + continue + } + uuid := r.Spec.CommittedResourceReservation.CommitmentUUID + if seen[uuid] { + continue + } + seen[uuid] = true + amount := resource.MustParse("4Gi") + spec := v1alpha1.CommittedResourceSpec{ + CommitmentUUID: uuid, + ProjectID: r.Spec.CommittedResourceReservation.ProjectID, + DomainID: "test-domain", + AvailabilityZone: r.Spec.AvailabilityZone, + FlavorGroupName: r.Spec.CommittedResourceReservation.ResourceGroup, + ResourceType: v1alpha1.CommittedResourceTypeMemory, + State: v1alpha1.CommitmentStatusConfirmed, + Amount: amount, + StartTime: r.Spec.StartTime, + EndTime: r.Spec.EndTime, + } + cr := &v1alpha1.CommittedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "cr-" + uuid}, + Spec: spec, + } + if err := k8sClient.Create(ctx, cr); err != nil { + t.Fatalf("failed to create CommittedResource %s: %v", uuid, err) + } + cr.Status = v1alpha1.CommittedResourceStatus{ + AcceptedAmount: &amount, + AcceptedSpec: &spec, + Conditions: []metav1.Condition{ + { + Type: v1alpha1.CommittedResourceConditionReady, + Status: metav1.ConditionTrue, + Reason: v1alpha1.CommittedResourceReasonAccepted, + ObservedGeneration: 0, + LastTransitionTime: metav1.Now(), + }, + }, + } + if err := k8sClient.Status().Update(ctx, cr); err != nil { + t.Fatalf("failed to update CommittedResource status %s: %v", uuid, err) + } + rec := &commitments.UsageReconciler{ + Client: k8sClient, + Conf: commitments.UsageReconcilerConfig{CooldownInterval: metav1.Duration{Duration: 0}}, + UsageDB: dbClient, + Monitor: commitments.NewUsageReconcilerMonitor(), + } + req := ctrl.Request{NamespacedName: types.NamespacedName{Name: cr.Name}} + if _, err := rec.Reconcile(ctx, req); err != nil { + t.Fatalf("usage reconciler failed for %s: %v", uuid, err) + } + } + calc := commitments.NewUsageCalculator(k8sClient, dbClient) logger := log.FromContext(ctx) report, err := calc.CalculateUsage(ctx, logger, tt.projectID, tt.allAZs) diff --git a/internal/scheduling/reservations/commitments/committed_resource_controller.go b/internal/scheduling/reservations/commitments/committed_resource_controller.go index 2389440e3..e24e892a5 100644 --- a/internal/scheduling/reservations/commitments/committed_resource_controller.go +++ b/internal/scheduling/reservations/commitments/committed_resource_controller.go @@ -6,6 +6,7 @@ package commitments import ( "context" "fmt" + "time" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/meta" @@ -15,7 +16,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "github.com/cobaltcore-dev/cortex/api/v1alpha1" @@ -23,7 +23,13 @@ import ( "github.com/cobaltcore-dev/cortex/pkg/multicluster" ) -const crFinalizer = "committed-resource.reservations.cortex.cloud/cleanup" +const ( + // maxConsecutiveFailuresForSlowdown is the ConsecutiveFailures threshold above which the + // Reservation watch stops re-enqueuing this CR. Without this guard, a broken rollback creates + // reservation churn that bypasses RequeueAfter and keeps the controller in a tight retry loop. + // The CR is still re-enqueued by RequeueAfter (with exponential backoff) and by spec changes. + maxConsecutiveFailuresForSlowdown = 10 +) // CommittedResourceController reconciles CommittedResource CRDs and owns all child Reservation CRUD. 
type CommittedResourceController struct { @@ -49,14 +55,6 @@ func (r *CommittedResourceController) Reconcile(ctx context.Context, req ctrl.Re ) if !cr.DeletionTimestamp.IsZero() { - return r.reconcileDeletion(ctx, logger, &cr) - } - - if !controllerutil.ContainsFinalizer(&cr, crFinalizer) { - controllerutil.AddFinalizer(&cr, crFinalizer) - if err := r.Update(ctx, &cr); err != nil { - return ctrl.Result{}, fmt.Errorf("failed to add finalizer: %w", err) - } return ctrl.Result{}, nil } @@ -78,8 +76,21 @@ func (r *CommittedResourceController) Reconcile(ctx context.Context, req ctrl.Re // reconcilePending handles a confirmation attempt (Limes state: pending). // If AllowRejection=true (API path), placement failure marks the CR Rejected so the HTTP API // can report the outcome back to Limes. If AllowRejection=false (syncer path), the controller -// retries indefinitely — Limes does not require confirmation for these transitions. +// retries with exponential backoff — Limes does not require confirmation for these transitions. func (r *CommittedResourceController) reconcilePending(ctx context.Context, logger logr.Logger, cr *v1alpha1.CommittedResource) (ctrl.Result, error) { + logger.Info("reconciling pending resource", + "generation", cr.Generation, + "az", cr.Spec.AvailabilityZone, + "amount", cr.Spec.Amount.String(), + "allowRejection", cr.Spec.AllowRejection, + ) + // If this spec generation was already rejected, don't re-apply. + // Without this guard the controller oscillates: apply bad spec → delete reservations → + // Reservation watch re-enqueues → apply bad spec again → loop. + if isRejectedForGeneration(cr) { + logger.V(1).Info("spec already rejected for current generation", "generation", cr.Generation) + return ctrl.Result{}, nil + } result, applyErr := r.applyReservationState(ctx, logger, cr) if applyErr != nil { if cr.Spec.AllowRejection { @@ -89,8 +100,9 @@ func (r *CommittedResourceController) reconcilePending(ctx context.Context, logg } return ctrl.Result{}, r.setNotReady(ctx, cr, v1alpha1.CommittedResourceReasonRejected, applyErr.Error()) } - logger.Error(applyErr, "pending commitment placement failed, will retry", "requeueAfter", r.Conf.RequeueIntervalRetry.Duration) - return ctrl.Result{RequeueAfter: r.Conf.RequeueIntervalRetry.Duration}, r.setNotReady(ctx, cr, v1alpha1.CommittedResourceReasonReserving, applyErr.Error()) + delay := r.retryDelay(cr) + logger.Error(applyErr, "pending commitment placement failed, will retry", "requeueAfter", delay) + return ctrl.Result{RequeueAfter: delay}, r.setNotReadyRetry(ctx, cr, applyErr.Error()) } allReady, anyFailed, failReason, err := r.checkChildReservationStatus(ctx, cr, result.TotalSlots) if err != nil { @@ -104,17 +116,32 @@ func (r *CommittedResourceController) reconcilePending(ctx context.Context, logg } return ctrl.Result{}, r.setNotReady(ctx, cr, v1alpha1.CommittedResourceReasonRejected, failReason) } - logger.Info("pending commitment placement failed, will retry", "reason", failReason, "requeueAfter", r.Conf.RequeueIntervalRetry.Duration) - return ctrl.Result{RequeueAfter: r.Conf.RequeueIntervalRetry.Duration}, r.setNotReady(ctx, cr, v1alpha1.CommittedResourceReasonReserving, failReason) + delay := r.retryDelay(cr) + logger.Info("pending commitment placement failed, will retry", "reason", failReason, "requeueAfter", delay) + return ctrl.Result{RequeueAfter: delay}, r.setNotReadyRetry(ctx, cr, failReason) } if !allReady { // Reservation controller hasn't processed all slots yet; Reservation watch will re-enqueue. 
return ctrl.Result{}, r.setNotReady(ctx, cr, v1alpha1.CommittedResourceReasonReserving, "waiting for reservation placement") } + logger.Info("committed resource accepted", "generation", cr.Generation, "amount", cr.Spec.Amount.String()) return ctrl.Result{}, r.setAccepted(ctx, cr) } func (r *CommittedResourceController) reconcileCommitted(ctx context.Context, logger logr.Logger, cr *v1alpha1.CommittedResource) (ctrl.Result, error) { + logger.Info("reconciling committed resource", + "generation", cr.Generation, + "az", cr.Spec.AvailabilityZone, + "amount", cr.Spec.Amount.String(), + "allowRejection", cr.Spec.AllowRejection, + ) + // If this spec generation was already rejected, maintain rollback state without re-applying. + // Without this guard the controller oscillates: apply bad spec → rollback → + // Reservation watch re-enqueues → apply bad spec again → loop. + if isRejectedForGeneration(cr) { + logger.V(1).Info("spec already rejected for current generation, maintaining rollback state", "generation", cr.Generation) + return ctrl.Result{}, r.rollbackToAccepted(ctx, logger, cr) + } // Spec errors are permanent regardless of AllowRejection — a bad spec won't fix itself. if _, err := FromCommittedResource(*cr); err != nil { logger.Error(err, "invalid commitment spec, rejecting") @@ -123,14 +150,15 @@ func (r *CommittedResourceController) reconcileCommitted(ctx context.Context, lo result, applyErr := r.applyReservationState(ctx, logger, cr) if applyErr != nil { if cr.Spec.AllowRejection { - logger.Error(applyErr, "committed placement failed, rolling back to accepted amount") + logger.Error(applyErr, "committed placement failed, rolling back to accepted spec") if rollbackErr := r.rollbackToAccepted(ctx, logger, cr); rollbackErr != nil { return ctrl.Result{}, rollbackErr } return ctrl.Result{}, r.setNotReady(ctx, cr, v1alpha1.CommittedResourceReasonRejected, applyErr.Error()) } - logger.Error(applyErr, "committed placement incomplete, will retry", "requeueAfter", r.Conf.RequeueIntervalRetry.Duration) - return ctrl.Result{RequeueAfter: r.Conf.RequeueIntervalRetry.Duration}, r.setNotReady(ctx, cr, v1alpha1.CommittedResourceReasonReserving, applyErr.Error()) + delay := r.retryDelay(cr) + logger.Error(applyErr, "committed placement incomplete, will retry", "requeueAfter", delay) + return ctrl.Result{RequeueAfter: delay}, r.setNotReadyRetry(ctx, cr, applyErr.Error()) } allReady, anyFailed, failReason, err := r.checkChildReservationStatus(ctx, cr, result.TotalSlots) if err != nil { @@ -138,19 +166,21 @@ func (r *CommittedResourceController) reconcileCommitted(ctx context.Context, lo } if anyFailed { if cr.Spec.AllowRejection { - logger.Info("committed placement failed, rolling back to accepted amount", "reason", failReason) + logger.Info("committed placement failed, rolling back to accepted spec", "reason", failReason) if rollbackErr := r.rollbackToAccepted(ctx, logger, cr); rollbackErr != nil { return ctrl.Result{}, rollbackErr } return ctrl.Result{}, r.setNotReady(ctx, cr, v1alpha1.CommittedResourceReasonRejected, failReason) } - logger.Info("committed placement failed, will retry", "reason", failReason, "requeueAfter", r.Conf.RequeueIntervalRetry.Duration) - return ctrl.Result{RequeueAfter: r.Conf.RequeueIntervalRetry.Duration}, r.setNotReady(ctx, cr, v1alpha1.CommittedResourceReasonReserving, failReason) + delay := r.retryDelay(cr) + logger.Info("committed placement failed, will retry", "reason", failReason, "requeueAfter", delay) + return ctrl.Result{RequeueAfter: delay}, 
r.setNotReadyRetry(ctx, cr, failReason) } if !allReady { // Reservation controller hasn't processed all slots yet; Reservation watch will re-enqueue. return ctrl.Result{}, r.setNotReady(ctx, cr, v1alpha1.CommittedResourceReasonReserving, "waiting for reservation placement") } + logger.Info("committed resource accepted", "generation", cr.Generation, "amount", cr.Spec.Amount.String()) return ctrl.Result{}, r.setAccepted(ctx, cr) } @@ -231,7 +261,10 @@ func (r *CommittedResourceController) setAccepted(ctx context.Context, cr *v1alp now := metav1.Now() old := cr.DeepCopy() acceptedAmount := cr.Spec.Amount.DeepCopy() + specCopy := cr.Spec.DeepCopy() cr.Status.AcceptedAmount = &acceptedAmount + cr.Status.AcceptedSpec = specCopy + cr.Status.ConsecutiveFailures = 0 cr.Status.AcceptedAt = &now meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{ Type: v1alpha1.CommittedResourceConditionReady, @@ -239,6 +272,7 @@ func (r *CommittedResourceController) setAccepted(ctx context.Context, cr *v1alp Reason: v1alpha1.CommittedResourceReasonAccepted, Message: "commitment successfully reserved", LastTransitionTime: now, + ObservedGeneration: cr.Generation, }) if err := r.Status().Patch(ctx, cr, client.MergeFrom(old)); err != nil { return client.IgnoreNotFound(err) @@ -254,23 +288,17 @@ func (r *CommittedResourceController) reconcileInactive(ctx context.Context, log return ctrl.Result{}, r.setNotReady(ctx, cr, string(cr.Spec.State), "commitment is no longer active") } -func (r *CommittedResourceController) reconcileDeletion(ctx context.Context, logger logr.Logger, cr *v1alpha1.CommittedResource) (ctrl.Result, error) { - if err := r.deleteChildReservations(ctx, cr); err != nil { - return ctrl.Result{}, err - } - controllerutil.RemoveFinalizer(cr, crFinalizer) - if err := r.Update(ctx, cr); err != nil { - return ctrl.Result{}, client.IgnoreNotFound(err) - } - logger.Info("committed resource deleted, child reservations cleaned up") - return ctrl.Result{}, nil -} - // deleteChildReservations deletes all Reservation CRDs owned by this CommittedResource, // identified by matching CommitmentUUID in the reservation spec. func (r *CommittedResourceController) deleteChildReservations(ctx context.Context, cr *v1alpha1.CommittedResource) error { + return DeleteChildReservations(ctx, r.Client, cr) +} + +// DeleteChildReservations deletes all Reservation CRDs belonging to cr, matched by CommitmentUUID. +// Called both by the controller on inactive/rollback transitions and by the API handler on CR deletion. +func DeleteChildReservations(ctx context.Context, k8sClient client.Client, cr *v1alpha1.CommittedResource) error { var list v1alpha1.ReservationList - if err := r.List(ctx, &list, + if err := k8sClient.List(ctx, &list, client.MatchingLabels{v1alpha1.LabelReservationType: v1alpha1.ReservationTypeLabelCommittedResource}, client.MatchingFields{idxReservationByCommitmentUUID: cr.Spec.CommitmentUUID}, ); err != nil { @@ -278,33 +306,48 @@ func (r *CommittedResourceController) deleteChildReservations(ctx context.Contex } for i := range list.Items { res := &list.Items[i] - if err := r.Delete(ctx, res); client.IgnoreNotFound(err) != nil { + if err := k8sClient.Delete(ctx, res); client.IgnoreNotFound(err) != nil { return fmt.Errorf("failed to delete reservation %s: %w", res.Name, err) } } return nil } -// rollbackToAccepted restores child Reservations to match Status.AcceptedAmount. -// If AcceptedAmount is nil (new CR that was never accepted), all child Reservations are deleted. 
+// rollbackToAccepted restores child Reservations to match Status.AcceptedSpec.
+// AcceptedSpec is a full snapshot of the spec at the last successful reconcile, so rollback always
+// targets the correct AZ, amount, project, domain — even when the current spec has been mutated.
+// If AcceptedSpec is not set (the CR was never accepted, or it was accepted before the field existed),
+// all child Reservations are deleted; the controller then repairs state from the current spec on a later reconcile.
 func (r *CommittedResourceController) rollbackToAccepted(ctx context.Context, logger logr.Logger, cr *v1alpha1.CommittedResource) error {
- if cr.Status.AcceptedAmount == nil {
+ if cr.Status.AcceptedSpec == nil && cr.Status.AcceptedAmount == nil {
 return r.deleteChildReservations(ctx, cr)
 }
 knowledge := &reservations.FlavorGroupKnowledgeClient{Client: r.Client}
 flavorGroups, err := knowledge.GetAllFlavorGroups(ctx, nil)
 if err != nil {
 // Can't compute the rollback target — fall back to full delete rather than leaving
- // a partial state that's inconsistent with the unknown AcceptedAmount.
+ // a partial state that's inconsistent with the unknown accepted state.
 logger.Error(err, "flavor knowledge unavailable during rollback, deleting all child reservations")
 return r.deleteChildReservations(ctx, cr)
 }
- state, err := FromCommittedResource(*cr)
+
+ var state *CommitmentState
+ if cr.Status.AcceptedSpec != nil {
+ // Use the full accepted spec snapshot: ensures rollback targets the exact previously-accepted
+ // placement (AZ, amount, project, domain) even if the current spec has been mutated.
+ tempCR := v1alpha1.CommittedResource{Spec: *cr.Status.AcceptedSpec}
+ state, err = FromCommittedResource(tempCR)
+ } else {
+ // AcceptedSpec not yet populated (CR accepted before this field existed).
+ // We cannot determine the previous placement, so delete all child reservations.
+ // The controller will recreate them on the next reconcile using the current spec.
+ logger.Info("AcceptedSpec missing during rollback, deleting child reservations for controller repair")
+ return r.deleteChildReservations(ctx, cr)
+ }
 if err != nil {
 logger.Error(err, "invalid spec during rollback, deleting all child reservations")
 return r.deleteChildReservations(ctx, cr)
 }
- state.TotalMemoryBytes = cr.Status.AcceptedAmount.Value()
 state.NamePrefix = cr.Name + "-"
 state.CreatorRequestID = reservations.GlobalRequestIDFromContext(ctx)
 state.ParentGeneration = cr.Generation
@@ -314,15 +357,57 @@ func (r *CommittedResourceController) rollbackToAccepted(ctx context.Context, lo
 return nil
 }
+// isRejectedForGeneration returns true when the CR's Ready condition is already Rejected
+// for the current spec generation. Used to short-circuit re-applying a spec that was
+// already tried and rejected in a previous reconcile cycle.
+func isRejectedForGeneration(cr *v1alpha1.CommittedResource) bool {
+ cond := meta.FindStatusCondition(cr.Status.Conditions, v1alpha1.CommittedResourceConditionReady)
+ return cond != nil &&
+ cond.Status == metav1.ConditionFalse &&
+ cond.Reason == v1alpha1.CommittedResourceReasonRejected &&
+ cond.ObservedGeneration == cr.Generation
+}
+
+// retryDelay computes an exponential backoff interval for the AllowRejection=false retry paths.
+// Uses the pre-increment ConsecutiveFailures value: on the first failure (failures=0) the delay is
+// base * 2^0 = base; setNotReadyRetry increments to 1 afterwards, so the second failure yields 2*base.
+// The delay is capped at MaxRequeueInterval.
+func (r *CommittedResourceController) retryDelay(cr *v1alpha1.CommittedResource) time.Duration {
+ base := r.Conf.RequeueIntervalRetry.Duration
+ exp := cr.Status.ConsecutiveFailures
+ if exp > 6 {
+ exp = 6 // overflow guard: 2^6 = 64 fits safely in uint; MaxRequeueInterval caps the actual duration
+ }
+ delay := base * time.Duration(uint64(1)<<exp)
+ maxDelay := r.Conf.MaxRequeueInterval.Duration
+ if maxDelay > 0 && delay > maxDelay {
+ return maxDelay
+ }
+ return delay
+}
+
 // setNotReady patches Ready=False on CommittedResource status.
 func (r *CommittedResourceController) setNotReady(ctx context.Context, cr *v1alpha1.CommittedResource, reason, message string) error {
+ return r.patchNotReady(ctx, cr, reason, message, false)
+}
+
+// setNotReadyRetry increments ConsecutiveFailures and patches Ready=False/Reserving.
+// Use this in the AllowRejection=false retry paths so the failure counter drives backoff.
+func (r *CommittedResourceController) setNotReadyRetry(ctx context.Context, cr *v1alpha1.CommittedResource, message string) error {
+ return r.patchNotReady(ctx, cr, v1alpha1.CommittedResourceReasonReserving, message, true)
+}
+
+func (r *CommittedResourceController) patchNotReady(ctx context.Context, cr *v1alpha1.CommittedResource, reason, message string, countFailure bool) error {
 old := cr.DeepCopy()
+ if countFailure {
+ cr.Status.ConsecutiveFailures++
+ }
 meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
 Type: v1alpha1.CommittedResourceConditionReady,
 Status: metav1.ConditionFalse,
 Reason: reason,
 Message: message,
 LastTransitionTime: metav1.Now(),
+ ObservedGeneration: cr.Generation,
 })
 if err := r.Status().Patch(ctx, cr, client.MergeFrom(old)); err != nil {
 return client.IgnoreNotFound(err)
@@ -333,8 +418,11 @@ func (r *CommittedResourceController) setNotReady(ctx context.Context, cr *v1alp
 // SetupWithManager sets up the controller with the Manager.
 func (r *CommittedResourceController) SetupWithManager(mgr ctrl.Manager, mcl *multicluster.Client) error {
 ctx := context.Background()
- if err := IndexFields(ctx, mcl); err != nil {
- return fmt.Errorf("failed to set up field indexes: %w", err)
+ if err := indexReservationByCommitmentUUID(ctx, mcl); err != nil {
+ return fmt.Errorf("failed to set up reservation field index: %w", err)
+ }
+ if err := indexCommittedResourceByUUID(ctx, mcl); err != nil {
+ return fmt.Errorf("failed to set up committed resource field index: %w", err)
 }
 bldr := multicluster.BuildController(mcl, mgr)
@@ -347,6 +435,9 @@
 return err
 }
 // Re-enqueue the parent CommittedResource when a child Reservation changes (e.g. external deletion).
+ // Suppressed when ConsecutiveFailures is high: a broken rollback creates reservation churn that
+ // would bypass RequeueAfter and keep the controller in a tight loop. The Reservation watch is the
+ // fast path for normal "waiting for placement" transitions; the RequeueAfter backoff handles retry.
 bldr, err = bldr.WatchesMulticluster(
 &v1alpha1.Reservation{},
 handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []ctrl.Request {
@@ -363,7 +454,16 @@
 if len(crList.Items) == 0 {
 return nil
 }
- return []ctrl.Request{{NamespacedName: types.NamespacedName{Name: crList.Items[0].Name}}}
+ cr := &crList.Items[0]
+ // Suppress fast-path re-enqueues only when the reservation belongs to the current
+ // generation AND failures are high. A new spec (higher generation) gets a fresh start.
+ if cr.Status.ConsecutiveFailures >= maxConsecutiveFailuresForSlowdown && + res.Spec.CommittedResourceReservation.ParentGeneration == cr.Generation { + LoggerFromContext(ctx).V(1).Info("placement failures exceeded threshold, watch re-enqueues suppressed — retrying via backoff timer only", + "name", cr.Name, "consecutiveFailures", cr.Status.ConsecutiveFailures, "threshold", maxConsecutiveFailuresForSlowdown) + return nil + } + return []ctrl.Request{{NamespacedName: types.NamespacedName{Name: cr.Name}}} }), ) if err != nil { diff --git a/internal/scheduling/reservations/commitments/committed_resource_controller_test.go b/internal/scheduling/reservations/commitments/committed_resource_controller_test.go index 1029ec997..cf919d267 100644 --- a/internal/scheduling/reservations/commitments/committed_resource_controller_test.go +++ b/internal/scheduling/reservations/commitments/committed_resource_controller_test.go @@ -28,13 +28,10 @@ import ( // ============================================================================ // newTestCommittedResource returns a CommittedResource with sensible defaults. -// The finalizer is pre-populated so tests can call Reconcile once without a -// separate finalizer-add round-trip. func newTestCommittedResource(name string, state v1alpha1.CommitmentStatus) *v1alpha1.CommittedResource { return &v1alpha1.CommittedResource{ ObjectMeta: metav1.ObjectMeta{ - Name: name, - Finalizers: []string{crFinalizer}, + Name: name, }, Spec: v1alpha1.CommittedResourceSpec{ CommitmentUUID: "test-uuid-1234", @@ -286,6 +283,12 @@ func TestCommittedResourceController_Reconcile(t *testing.T) { if updated.Status.AcceptedAmount == nil { t.Errorf("expected AcceptedAmount to be set on acceptance") } + if updated.Status.AcceptedSpec == nil { + t.Errorf("expected AcceptedSpec to be set on acceptance") + } else if updated.Status.AcceptedSpec.AvailabilityZone != cr.Spec.AvailabilityZone { + t.Errorf("AcceptedSpec.AvailabilityZone: want %q, got %q", + cr.Spec.AvailabilityZone, updated.Status.AcceptedSpec.AvailabilityZone) + } } }) } @@ -410,11 +413,13 @@ func TestCommittedResourceController_PlacementFailure(t *testing.T) { func TestCommittedResourceController_Rollback(t *testing.T) { scheme := newCRTestScheme(t) - // CR at generation 2; AcceptedAmount reflects what was accepted at generation 1. + // CR at generation 2; AcceptedSpec and AcceptedAmount reflect what was accepted at generation 1. cr := newTestCommittedResource("test-cr", v1alpha1.CommitmentStatusConfirmed) cr.Generation = 2 accepted := resource.MustParse("4Gi") cr.Status.AcceptedAmount = &accepted + acceptedSpec := cr.Spec + cr.Status.AcceptedSpec = &acceptedSpec // Existing reservation with stale ParentGeneration from the previous generation. existing := &v1alpha1.Reservation{ @@ -458,12 +463,290 @@ func TestCommittedResourceController_Rollback(t *testing.T) { } } +// TestCommittedResourceController_RollbackUsesAcceptedSpecAZ verifies that rollbackToAccepted +// targets the AZ from AcceptedSpec, not from the current (mutated) Spec. This is the core fix +// for the oscillation bug where a failed AZ change left the CR stuck placing reservations in +// the wrong AZ on every retry. +func TestCommittedResourceController_RollbackUsesAcceptedSpecAZ(t *testing.T) { + scheme := newCRTestScheme(t) + + // Spec has been mutated to a new AZ that failed placement. 
+ cr := newTestCommittedResource("test-cr", v1alpha1.CommitmentStatusConfirmed) + cr.Spec.AvailabilityZone = "new-az" // the failed AZ + cr.Generation = 2 + + accepted := resource.MustParse("4Gi") + acceptedSpec := cr.Spec + acceptedSpec.AvailabilityZone = "accepted-az" // last successfully placed AZ + cr.Status.AcceptedAmount = &accepted + cr.Status.AcceptedSpec = &acceptedSpec + + // Existing reservation was placed in the wrong AZ by the failed rollback. + existing := &v1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cr-0", + Labels: map[string]string{ + v1alpha1.LabelReservationType: v1alpha1.ReservationTypeLabelCommittedResource, + }, + }, + Spec: v1alpha1.ReservationSpec{ + Type: v1alpha1.ReservationTypeCommittedResource, + SchedulingDomain: v1alpha1.SchedulingDomainNova, + AvailabilityZone: "new-az", // wrong AZ + Resources: map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceMemory: resource.MustParse("4Gi"), + hv1.ResourceCPU: resource.MustParse("2"), + }, + CommittedResourceReservation: &v1alpha1.CommittedResourceReservationSpec{ + CommitmentUUID: "test-uuid-1234", + ProjectID: "test-project", + DomainID: "test-domain", + ResourceGroup: "test-group", + ParentGeneration: 2, + }, + }, + } + + k8sClient := newCRTestClient(scheme, cr, existing, newTestFlavorKnowledge()) + controller := &CommittedResourceController{Client: k8sClient, Scheme: scheme, Conf: CommittedResourceControllerConfig{}} + + if err := controller.rollbackToAccepted(context.Background(), logr.Discard(), cr); err != nil { + t.Fatalf("rollbackToAccepted: %v", err) + } + + // The reservation manager deletes the wrong-AZ reservation and creates a new one + // with the accepted AZ from AcceptedSpec. + var list v1alpha1.ReservationList + if err := k8sClient.List(context.Background(), &list, client.MatchingLabels{ + v1alpha1.LabelReservationType: v1alpha1.ReservationTypeLabelCommittedResource, + }); err != nil { + t.Fatalf("list reservations: %v", err) + } + if len(list.Items) != 1 { + t.Fatalf("expected 1 reservation after rollback, got %d", len(list.Items)) + } + if got := list.Items[0].Spec.AvailabilityZone; got != "accepted-az" { + t.Errorf("rollback: reservation AZ: want %q (from AcceptedSpec), got %q (wrong: from current Spec)", "accepted-az", got) + } +} + +// TestCommittedResourceController_RollbackNilAcceptedSpec verifies that when AcceptedSpec is +// absent (pre-dates the field), rollbackToAccepted deletes child reservations rather than +// attempting a rollback with stale/wrong placement data. The controller repairs state on +// the next reconcile via ApplyCommitmentState. +func TestCommittedResourceController_RollbackNilAcceptedSpec(t *testing.T) { + scheme := newCRTestScheme(t) + + cr := newTestCommittedResource("test-cr", v1alpha1.CommitmentStatusConfirmed) + cr.Generation = 2 + accepted := resource.MustParse("4Gi") + cr.Status.AcceptedAmount = &accepted + // AcceptedSpec intentionally nil — simulates a CR accepted before the field existed. 
+
+ existing := &v1alpha1.Reservation{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-cr-0",
+ Labels: map[string]string{
+ v1alpha1.LabelReservationType: v1alpha1.ReservationTypeLabelCommittedResource,
+ },
+ },
+ Spec: v1alpha1.ReservationSpec{
+ Type: v1alpha1.ReservationTypeCommittedResource,
+ CommittedResourceReservation: &v1alpha1.CommittedResourceReservationSpec{
+ CommitmentUUID: "test-uuid-1234",
+ ProjectID: "test-project",
+ },
+ },
+ }
+
+ k8sClient := newCRTestClient(scheme, cr, existing, newTestFlavorKnowledge())
+ controller := &CommittedResourceController{Client: k8sClient, Scheme: scheme, Conf: CommittedResourceControllerConfig{}}
+
+ if err := controller.rollbackToAccepted(context.Background(), logr.Discard(), cr); err != nil {
+ t.Fatalf("rollbackToAccepted: %v", err)
+ }
+
+ var list v1alpha1.ReservationList
+ if err := k8sClient.List(context.Background(), &list, client.MatchingLabels{
+ v1alpha1.LabelReservationType: v1alpha1.ReservationTypeLabelCommittedResource,
+ }); err != nil {
+ t.Fatalf("list reservations: %v", err)
+ }
+ if len(list.Items) != 0 {
+ t.Errorf("expected all reservations deleted when AcceptedSpec is nil, got %d", len(list.Items))
+ }
+}
+
+// TestCommittedResourceController_RejectedStaysRejected verifies that a CR rejected on one
+// reconcile cycle stays rejected on subsequent cycles triggered by Reservation watch events,
+// without re-applying the bad spec. This is the oscillation regression test: without the
+// isRejectedForGeneration guard the controller would re-apply the bad spec on every
+// Reservation watch re-enqueue, undoing the rollback each time.
+func TestCommittedResourceController_RejectedStaysRejected(t *testing.T) {
+ tests := []struct {
+ name string
+ state v1alpha1.CommitmentStatus
+ }{
+ {name: "confirmed", state: v1alpha1.CommitmentStatusConfirmed},
+ {name: "pending", state: v1alpha1.CommitmentStatusPending},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ scheme := newCRTestScheme(t)
+
+ // CR was previously accepted at AZ "accepted-az".
+ cr := newTestCommittedResource("test-cr", tt.state)
+ cr.Spec.AllowRejection = true
+ cr.Spec.AvailabilityZone = "bad-az" // spec was mutated to a failing AZ
+ cr.Generation = 2
+ accepted := resource.MustParse("4Gi")
+ acceptedSpec := cr.Spec.DeepCopy()
+ acceptedSpec.AvailabilityZone = "accepted-az"
+ cr.Status.AcceptedAmount = &accepted
+ cr.Status.AcceptedSpec = acceptedSpec
+
+ // No Knowledge → placement always fails.
+ k8sClient := newCRTestClient(scheme, cr)
+ controller := &CommittedResourceController{Client: k8sClient, Scheme: scheme, Conf: CommittedResourceControllerConfig{}}
+
+ // Reconcile 1: applies bad spec → fails → rollback + Rejected.
+ if _, err := controller.Reconcile(context.Background(), reconcileReq(cr.Name)); err != nil {
+ t.Fatalf("reconcile 1: %v", err)
+ }
+ assertCondition(t, k8sClient, cr.Name, metav1.ConditionFalse, v1alpha1.CommittedResourceReasonRejected)
+
+ // Reconcile 2: simulates Reservation watch re-enqueue after rollback.
+ // Must stay Rejected without re-applying the bad spec.
+ if _, err := controller.Reconcile(context.Background(), reconcileReq(cr.Name)); err != nil {
+ t.Fatalf("reconcile 2: %v", err)
+ }
+ assertCondition(t, k8sClient, cr.Name, metav1.ConditionFalse, v1alpha1.CommittedResourceReasonRejected)
+
+ // Reconcile 3: another watch event — still stable.
+ if _, err := controller.Reconcile(context.Background(), reconcileReq(cr.Name)); err != nil {
+ t.Fatalf("reconcile 3: %v", err)
+ }
+ assertCondition(t, k8sClient, cr.Name, metav1.ConditionFalse, v1alpha1.CommittedResourceReasonRejected)
+
+ // For committed state: rollback reservations should be in accepted-az, not bad-az.
+ if tt.state == v1alpha1.CommitmentStatusConfirmed {
+ var list v1alpha1.ReservationList
+ if err := k8sClient.List(context.Background(), &list, client.MatchingLabels{
+ v1alpha1.LabelReservationType: v1alpha1.ReservationTypeLabelCommittedResource,
+ }); err != nil {
+ t.Fatalf("list reservations: %v", err)
+ }
+ for _, res := range list.Items {
+ if res.Spec.AvailabilityZone == "bad-az" {
+ t.Errorf("rollback reservation still points to bad-az after %d reconciles — oscillation not fixed", 3)
+ }
+ }
+ }
+ })
+ }
+}
+
+// TestCommittedResourceController_ConsecutiveFailures verifies that the ConsecutiveFailures counter
+// increments on each placement failure (AllowRejection=false) and resets to 0 on acceptance.
+// It also checks that the retry delay grows with each failure.
+func TestCommittedResourceController_ConsecutiveFailures(t *testing.T) {
+ scheme := newCRTestScheme(t)
+ cr := newTestCommittedResource("test-cr", v1alpha1.CommitmentStatusConfirmed)
+ cr.Spec.AllowRejection = false
+ base := 30 * time.Second
+ k8sClient := newCRTestClient(scheme, cr) // no Knowledge → placement fails
+ controller := &CommittedResourceController{
+ Client: k8sClient,
+ Scheme: scheme,
+ Conf: CommittedResourceControllerConfig{RequeueIntervalRetry: metav1.Duration{Duration: base}},
+ }
+
+ getCR := func() v1alpha1.CommittedResource {
+ t.Helper()
+ var updated v1alpha1.CommittedResource
+ if err := k8sClient.Get(context.Background(), types.NamespacedName{Name: cr.Name}, &updated); err != nil {
+ t.Fatalf("get CR: %v", err)
+ }
+ return updated
+ }
+
+ // First failure: ConsecutiveFailures 0→1, delay = base * 2^0.
+ result1, err := controller.Reconcile(context.Background(), reconcileReq(cr.Name))
+ if err != nil {
+ t.Fatalf("reconcile 1: %v", err)
+ }
+ if got := getCR().Status.ConsecutiveFailures; got != 1 {
+ t.Errorf("after failure 1: ConsecutiveFailures want 1, got %d", got)
+ }
+ if result1.RequeueAfter != base {
+ t.Errorf("after failure 1: RequeueAfter want %v, got %v", base, result1.RequeueAfter)
+ }
+
+ // Second failure: ConsecutiveFailures 1→2, delay = base * 2^1.
+ result2, err := controller.Reconcile(context.Background(), reconcileReq(cr.Name))
+ if err != nil {
+ t.Fatalf("reconcile 2: %v", err)
+ }
+ if got := getCR().Status.ConsecutiveFailures; got != 2 {
+ t.Errorf("after failure 2: ConsecutiveFailures want 2, got %d", got)
+ }
+ if result2.RequeueAfter != 2*base {
+ t.Errorf("after failure 2: RequeueAfter want %v, got %v", 2*base, result2.RequeueAfter)
+ }
+
+ // Add Knowledge so placement succeeds; simulate reservation controller marking ready.
+ if err := k8sClient.Create(context.Background(), newTestFlavorKnowledge()); err != nil {
+ t.Fatalf("create knowledge: %v", err)
+ }
+ if _, err := controller.Reconcile(context.Background(), reconcileReq(cr.Name)); err != nil {
+ t.Fatalf("reconcile 3 (apply): %v", err)
+ }
+ setChildReservationsReady(t, k8sClient, cr.Spec.CommitmentUUID)
+ if _, err := controller.Reconcile(context.Background(), reconcileReq(cr.Name)); err != nil {
+ t.Fatalf("reconcile 4 (accept): %v", err)
+ }
+
+ // Acceptance resets ConsecutiveFailures to 0.
+ if got := getCR().Status.ConsecutiveFailures; got != 0 { + t.Errorf("after acceptance: ConsecutiveFailures want 0, got %d", got) + } +} + +func TestRetryDelay(t *testing.T) { + base := 30 * time.Second + maxDelay := 30 * time.Minute + controller := &CommittedResourceController{ + Conf: CommittedResourceControllerConfig{ + RequeueIntervalRetry: metav1.Duration{Duration: base}, + MaxRequeueInterval: metav1.Duration{Duration: maxDelay}, + }, + } + tests := []struct { + failures int32 + want time.Duration + }{ + {0, 30 * time.Second}, // base * 2^0 + {1, 60 * time.Second}, // base * 2^1 + {2, 2 * time.Minute}, // base * 2^2 + {5, 16 * time.Minute}, // base * 2^5 = 960s + {6, 30 * time.Minute}, // base * 2^6 = 1920s → capped at 30min + {10, 30 * time.Minute}, // exp capped at 6, still 30min + } + for _, tt := range tests { + cr := &v1alpha1.CommittedResource{ + Status: v1alpha1.CommittedResourceStatus{ConsecutiveFailures: tt.failures}, + } + if got := controller.retryDelay(cr); got != tt.want { + t.Errorf("failures=%d: want %v, got %v", tt.failures, tt.want, got) + } + } +} + func TestCommittedResourceController_BadSpec(t *testing.T) { scheme := newCRTestScheme(t) cr := &v1alpha1.CommittedResource{ ObjectMeta: metav1.ObjectMeta{ - Name: "test-cr", - Finalizers: []string{crFinalizer}, + Name: "test-cr", }, Spec: v1alpha1.CommittedResourceSpec{ CommitmentUUID: "x", // too short, fails commitmentUUIDPattern @@ -593,39 +876,3 @@ func TestCheckChildReservationStatus_GenerationGuard(t *testing.T) { }) } } - -func TestCommittedResourceController_Deletion(t *testing.T) { - scheme := newCRTestScheme(t) - cr := newTestCommittedResource("test-cr", v1alpha1.CommitmentStatusConfirmed) - child := &v1alpha1.Reservation{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-cr-0", - Labels: map[string]string{ - v1alpha1.LabelReservationType: v1alpha1.ReservationTypeLabelCommittedResource, - }, - }, - Spec: v1alpha1.ReservationSpec{ - Type: v1alpha1.ReservationTypeCommittedResource, - CommittedResourceReservation: &v1alpha1.CommittedResourceReservationSpec{ - CommitmentUUID: "test-uuid-1234", - }, - }, - } - k8sClient := newCRTestClient(scheme, cr, child) - controller := &CommittedResourceController{Client: k8sClient, Scheme: scheme, Conf: CommittedResourceControllerConfig{}} - - if err := k8sClient.Delete(context.Background(), cr); err != nil { - t.Fatalf("delete CR: %v", err) - } - if _, err := controller.Reconcile(context.Background(), reconcileReq(cr.Name)); err != nil { - t.Fatalf("reconcile: %v", err) - } - - if got := countChildReservations(t, k8sClient, cr.Spec.CommitmentUUID); got != 0 { - t.Errorf("expected 0 child reservations after deletion, got %d", got) - } - var deleted v1alpha1.CommittedResource - if err := k8sClient.Get(context.Background(), types.NamespacedName{Name: cr.Name}, &deleted); err == nil { - t.Errorf("expected CR to be gone after deletion, but it still exists with finalizers=%v", deleted.Finalizers) - } -} diff --git a/internal/scheduling/reservations/commitments/config.go b/internal/scheduling/reservations/commitments/config.go index c30c87953..5524d0a65 100644 --- a/internal/scheduling/reservations/commitments/config.go +++ b/internal/scheduling/reservations/commitments/config.go @@ -15,13 +15,37 @@ import ( type Config struct { ReservationController ReservationControllerConfig `json:"committedResourceReservationController"` CommittedResourceController CommittedResourceControllerConfig `json:"committedResourceController"` + UsageReconciler UsageReconcilerConfig 
`json:"committedResourceUsageReconciler"` API APIConfig `json:"committedResourceAPI"` // DatasourceName is the name of the Datasource CRD that provides database - // connection info. Used to construct the UsageDBClient for report-usage. + // connection info. Used to construct the UsageDBClient for report-usage and usage reconciler. DatasourceName string `json:"datasourceName,omitempty"` } +// UsageReconcilerConfig holds tuning knobs for the usage reconciler. +type UsageReconcilerConfig struct { + // CooldownInterval is the minimum time between usage reconcile runs for the same CommittedResource. + // If a reconcile ran within this window, the next trigger is deferred until the window expires. + // This interval also acts as the periodic fallback: every successful reconcile schedules the + // next run after this duration so that changes not caught by watches are still picked up. + CooldownInterval metav1.Duration `json:"cooldownInterval"` +} + +func DefaultUsageReconcilerConfig() UsageReconcilerConfig { + return UsageReconcilerConfig{ + CooldownInterval: metav1.Duration{Duration: 5 * time.Minute}, + } +} + +// ApplyDefaults fills in zero-value fields from the defaults, leaving explicitly configured values intact. +func (c *UsageReconcilerConfig) ApplyDefaults() { + d := DefaultUsageReconcilerConfig() + if c.CooldownInterval.Duration == 0 { + c.CooldownInterval = d.CooldownInterval + } +} + // ReservationControllerConfig holds tuning knobs for the Reservation CRD controller. type ReservationControllerConfig struct { // RequeueIntervalActive is how often to re-verify a healthy Reservation CRD. @@ -45,8 +69,32 @@ type ReservationControllerConfig struct { // CommittedResourceControllerConfig holds tuning knobs for the CommittedResource CRD controller. type CommittedResourceControllerConfig struct { - // RequeueIntervalRetry is the back-off interval when placement is pending or failed. + // RequeueIntervalRetry is the base back-off interval when placement fails (AllowRejection=false path). + // The actual delay doubles with each consecutive failure: base * 2^min(failures, 6), capped at MaxRequeueInterval. + // If zero (unconfigured), backoff is disabled and the controller retries immediately on every failure. RequeueIntervalRetry metav1.Duration `json:"requeueIntervalRetry"` + + // MaxRequeueInterval caps the exponential backoff delay. + // Once this ceiling is reached, every subsequent retry fires after exactly this interval. + MaxRequeueInterval metav1.Duration `json:"maxRequeueInterval"` +} + +func DefaultCommittedResourceControllerConfig() CommittedResourceControllerConfig { + return CommittedResourceControllerConfig{ + RequeueIntervalRetry: metav1.Duration{Duration: 30 * time.Second}, + MaxRequeueInterval: metav1.Duration{Duration: 30 * time.Minute}, + } +} + +// ApplyDefaults fills in zero-value fields from the defaults, leaving explicitly configured values intact. +func (c *CommittedResourceControllerConfig) ApplyDefaults() { + d := DefaultCommittedResourceControllerConfig() + if c.RequeueIntervalRetry.Duration == 0 { + c.RequeueIntervalRetry = d.RequeueIntervalRetry + } + if c.MaxRequeueInterval.Duration == 0 { + c.MaxRequeueInterval = d.MaxRequeueInterval + } } // ResourceTypeConfig holds per-resource flags for a single resource type within a flavor group. 
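For orientation, the exponential backoff described by the two controller knobs above works out to the schedule below when the defaults apply (30s base, 30m cap). The following is an illustrative, self-contained sketch rather than code from this change set; it only mirrors the documented formula base * 2^min(failures, 6) and the MaxRequeueInterval cap.

package main

import (
	"fmt"
	"time"
)

func main() {
	base := 30 * time.Second     // default RequeueIntervalRetry
	maxDelay := 30 * time.Minute // default MaxRequeueInterval
	for failures := 0; failures <= 7; failures++ {
		exp := failures
		if exp > 6 {
			exp = 6 // same overflow guard as the controller's retryDelay
		}
		delay := base * time.Duration(1<<exp)
		if delay > maxDelay {
			delay = maxDelay
		}
		fmt.Printf("consecutive failures %d -> requeue after %v\n", failures, delay)
	}
	// Prints: 30s, 1m0s, 2m0s, 4m0s, 8m0s, 16m0s, 30m0s (32m capped), 30m0s
}
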
diff --git a/internal/scheduling/reservations/commitments/field_index.go b/internal/scheduling/reservations/commitments/field_index.go index 40760655d..3371e36a8 100644 --- a/internal/scheduling/reservations/commitments/field_index.go +++ b/internal/scheduling/reservations/commitments/field_index.go @@ -16,11 +16,11 @@ import ( const idxCommittedResourceByUUID = "spec.commitmentUUID" const idxReservationByCommitmentUUID = "spec.committedResourceReservation.commitmentUUID" -// IndexFields registers field indexes required by the CommittedResource controller. -func IndexFields(ctx context.Context, mcl *multicluster.Client) error { +// indexCommittedResourceByUUID registers the index used by UsageReconciler to look up +// CommittedResources by their CommitmentUUID. +func indexCommittedResourceByUUID(ctx context.Context, mcl *multicluster.Client) error { log := logf.FromContext(ctx) - log.Info("Setting up field indexes for the CommittedResource controller") - if err := mcl.IndexField(ctx, + return mcl.IndexField(ctx, &v1alpha1.CommittedResource{}, &v1alpha1.CommittedResourceList{}, idxCommittedResourceByUUID, @@ -35,11 +35,14 @@ func IndexFields(ctx context.Context, mcl *multicluster.Client) error { } return []string{cr.Spec.CommitmentUUID} }, - ); err != nil { - log.Error(err, "failed to set up index for commitmentUUID") - return err - } - if err := mcl.IndexField(ctx, + ) +} + +// indexReservationByCommitmentUUID registers the index used by CommittedResourceController to +// look up child Reservations by their CommitmentUUID. +func indexReservationByCommitmentUUID(ctx context.Context, mcl *multicluster.Client) error { + log := logf.FromContext(ctx) + return mcl.IndexField(ctx, &v1alpha1.Reservation{}, &v1alpha1.ReservationList{}, idxReservationByCommitmentUUID, @@ -54,10 +57,5 @@ func IndexFields(ctx context.Context, mcl *multicluster.Client) error { } return []string{res.Spec.CommittedResourceReservation.CommitmentUUID} }, - ); err != nil { - log.Error(err, "failed to set up index for reservation commitmentUUID") - return err - } - log.Info("Successfully set up field indexes") - return nil + ) } diff --git a/internal/scheduling/reservations/commitments/integration_test.go b/internal/scheduling/reservations/commitments/integration_test.go index e89e2adb1..49e57a0c8 100644 --- a/internal/scheduling/reservations/commitments/integration_test.go +++ b/internal/scheduling/reservations/commitments/integration_test.go @@ -454,7 +454,7 @@ func (e *intgEnv) reconcileChildReservations(t *testing.T, crName string) { // condition or the 5 s deadline is reached. // // One pass: -// 1. CR controller (adds finalizer / creates Reservation CRDs / handles inactive states) +// 1. CR controller (creates Reservation CRDs / handles inactive states) // 2. Reservation controller ×2 per slot (first call sets TargetHost, second sets Ready=True) // 3. 
CR controller again (picks up placement outcomes: Accepted or Rejected) func intgDriveToTerminal(t *testing.T, env *intgEnv, crNames []string) { @@ -528,9 +528,6 @@ func intgDriveToTerminal(t *testing.T, env *intgEnv, crNames []string) { } func intgIsTerminalCR(cr v1alpha1.CommittedResource) bool { - if !cr.DeletionTimestamp.IsZero() { - return false // needs one more reconcile to remove its finalizer - } cond := meta.FindStatusCondition(cr.Status.Conditions, v1alpha1.CommittedResourceConditionReady) if cond == nil { return false @@ -696,7 +693,7 @@ func TestCRLifecycle(t *testing.T) { t.Fatalf("create CR: %v", err) } - // Reconcile as planned: finalizer added, no Reservations. + // Reconcile as planned: no Reservations created. env.reconcileCR(t, cr.Name) env.reconcileCR(t, cr.Name) if got := env.listChildReservations(t, cr.Name); len(got) != 0 { @@ -738,8 +735,8 @@ func TestCRLifecycle(t *testing.T) { } // Bring to confirmed+Ready=True. - env.reconcileCR(t, cr.Name) // adds finalizer env.reconcileCR(t, cr.Name) // creates Reservations + env.reconcileCR(t, cr.Name) // picks up Reservation outcomes env.reconcileChildReservations(t, cr.Name) // places slots → Ready=True if got := env.listChildReservations(t, cr.Name); len(got) != 1 { @@ -810,58 +807,6 @@ func TestCRLifecycle(t *testing.T) { } }) - t.Run("deletion: finalizer removed, child Reservations cleaned up", func(t *testing.T) { - env := newDefaultIntgEnv(t) - defer env.close() - - cr := newTestCommittedResource("my-cr", v1alpha1.CommitmentStatusConfirmed) - if err := env.k8sClient.Create(context.Background(), cr); err != nil { - t.Fatalf("create CR: %v", err) - } - - // Pre-create a child Reservation to verify it gets cleaned up on deletion. - // newTestCommittedResource pre-populates the finalizer, so Delete() immediately sets DeletionTimestamp. 
- child := &v1alpha1.Reservation{ - ObjectMeta: metav1.ObjectMeta{ - Name: "my-cr-0", - Labels: map[string]string{ - v1alpha1.LabelReservationType: v1alpha1.ReservationTypeLabelCommittedResource, - }, - }, - Spec: v1alpha1.ReservationSpec{ - Type: v1alpha1.ReservationTypeCommittedResource, - CommittedResourceReservation: &v1alpha1.CommittedResourceReservationSpec{ - CommitmentUUID: "test-uuid-1234", - }, - }, - } - if err := env.k8sClient.Create(context.Background(), child); err != nil { - t.Fatalf("create child reservation: %v", err) - } - - crState := env.getCR(t, cr.Name) - if err := env.k8sClient.Delete(context.Background(), &crState); err != nil { - t.Fatalf("delete CR: %v", err) - } - env.reconcileCR(t, cr.Name) - - if got := env.listChildReservations(t, cr.Name); len(got) != 0 { - t.Errorf("post-deletion: expected 0 reservations, got %d", len(got)) - } - var final v1alpha1.CommittedResource - err := env.k8sClient.Get(context.Background(), types.NamespacedName{Name: cr.Name}, &final) - if client.IgnoreNotFound(err) != nil { - t.Fatalf("unexpected error after deletion: %v", err) - } - if err == nil { - for _, f := range final.Finalizers { - if f == crFinalizer { - t.Errorf("finalizer not removed after deletion reconcile") - } - } - } - }) - t.Run("confirmed→superseded: child Reservations deleted, CR marked inactive", func(t *testing.T) { env := newDefaultIntgEnv(t) defer env.close() diff --git a/internal/scheduling/reservations/commitments/reservation_manager.go b/internal/scheduling/reservations/commitments/reservation_manager.go index 6d70bcd20..3e0d7dbc6 100644 --- a/internal/scheduling/reservations/commitments/reservation_manager.go +++ b/internal/scheduling/reservations/commitments/reservation_manager.go @@ -109,19 +109,24 @@ func (m *ReservationManager) ApplyCommitmentState( nextSlotIndex := GetNextSlotIndex(existing) - // Phase 3 (DELETE): Delete inconsistent reservations (wrong flavor group/project) + // Phase 3 (DELETE): Delete inconsistent reservations (wrong flavor group, project, or AZ). + // AZ is included because the reservation is pinned to a specific host; changing AZ requires + // re-placement on a host in the new AZ — patching the spec field in place is not sufficient. // They will be recreated with correct metadata in subsequent phases. 
var validReservations []v1alpha1.Reservation for _, res := range existing { if res.Spec.CommittedResourceReservation.ResourceGroup != desiredState.FlavorGroupName || - res.Spec.CommittedResourceReservation.ProjectID != desiredState.ProjectID { - log.Info("Found a reservation with wrong flavor group or project, delete and recreate afterward", + res.Spec.CommittedResourceReservation.ProjectID != desiredState.ProjectID || + res.Spec.AvailabilityZone != desiredState.AvailabilityZone { + log.Info("Found a reservation with wrong flavor group, project, or AZ, delete and recreate afterward", "commitmentUUID", desiredState.CommitmentUUID, "name", res.Name, "expectedFlavorGroup", desiredState.FlavorGroupName, "actualFlavorGroup", res.Spec.CommittedResourceReservation.ResourceGroup, "expectedProjectID", desiredState.ProjectID, - "actualProjectID", res.Spec.CommittedResourceReservation.ProjectID) + "actualProjectID", res.Spec.CommittedResourceReservation.ProjectID, + "expectedAZ", desiredState.AvailabilityZone, + "actualAZ", res.Spec.AvailabilityZone) result.Repaired++ result.RemovedReservations = append(result.RemovedReservations, res) memValue := res.Spec.Resources[hv1.ResourceMemory] @@ -227,9 +232,11 @@ func (m *ReservationManager) syncReservationMetadata( state *CommitmentState, ) (*v1alpha1.Reservation, error) { - // if any of CommitmentUUID, AZ, StarTime, EndTime differ from desired state, need to patch + // if any of CommitmentUUID, DomainID, StartTime, EndTime, ParentGeneration differ from desired state, need to patch. + // AvailabilityZone is intentionally excluded: an AZ mismatch is handled in Phase 3 (delete + recreate) + // because the reservation is pinned to a host and cannot simply be patched to a different AZ. if (state.CommitmentUUID != "" && reservation.Spec.CommittedResourceReservation.CommitmentUUID != state.CommitmentUUID) || - (state.AvailabilityZone != "" && reservation.Spec.AvailabilityZone != state.AvailabilityZone) || + (state.DomainID != "" && reservation.Spec.CommittedResourceReservation.DomainID != state.DomainID) || (state.StartTime != nil && (reservation.Spec.StartTime == nil || !reservation.Spec.StartTime.Time.Equal(*state.StartTime))) || (state.EndTime != nil && (reservation.Spec.EndTime == nil || !reservation.Spec.EndTime.Time.Equal(*state.EndTime))) || (state.ParentGeneration != 0 && reservation.Spec.CommittedResourceReservation.ParentGeneration != state.ParentGeneration) { @@ -243,13 +250,12 @@ func (m *ReservationManager) syncReservationMetadata( if state.CommitmentUUID != "" { reservation.Spec.CommittedResourceReservation.CommitmentUUID = state.CommitmentUUID } + if state.DomainID != "" { + reservation.Spec.CommittedResourceReservation.DomainID = state.DomainID + } if state.ParentGeneration != 0 { reservation.Spec.CommittedResourceReservation.ParentGeneration = state.ParentGeneration } - - if state.AvailabilityZone != "" { - reservation.Spec.AvailabilityZone = state.AvailabilityZone - } if state.StartTime != nil { reservation.Spec.StartTime = &metav1.Time{Time: *state.StartTime} } diff --git a/internal/scheduling/reservations/commitments/reservation_manager_test.go b/internal/scheduling/reservations/commitments/reservation_manager_test.go index bb2fdaf52..e2ce2f26c 100644 --- a/internal/scheduling/reservations/commitments/reservation_manager_test.go +++ b/internal/scheduling/reservations/commitments/reservation_manager_test.go @@ -45,6 +45,20 @@ func newTestCRSlot(name string, memGiB int64, targetHost, resourceGroup string, } } +// withAZ returns a copy of the 
reservation with the given availability zone set. +func withAZ(res v1alpha1.Reservation, az string) v1alpha1.Reservation { + res.Spec.AvailabilityZone = az + return res +} + +// withDomainID returns a copy of the reservation with the given domain ID set. +func withDomainID(res v1alpha1.Reservation, domainID string) v1alpha1.Reservation { + spec := *res.Spec.CommittedResourceReservation + spec.DomainID = domainID + res.Spec.CommittedResourceReservation = &spec + return res +} + // testFlavorGroups returns the default flavor groups map used across tests. func testFlavorGroups() map[string]compute.FlavorGroupFeature { return map[string]compute.FlavorGroupFeature{"test-group": testFlavorGroup()} @@ -59,6 +73,8 @@ func TestApplyCommitmentState(t *testing.T) { name string existingSlots []v1alpha1.Reservation desiredMemoryGiB int64 + desiredAZ string + desiredDomainID string flavorGroupOverride map[string]compute.FlavorGroupFeature // nil = testFlavorGroups() wantError bool wantRemovedCount int // exact count; -1 = at least one @@ -210,6 +226,97 @@ func TestApplyCommitmentState(t *testing.T) { } }, }, + // ---------------------------------------------------------------- + // AZ change must delete+recreate (re-placement required) + // ---------------------------------------------------------------- + { + // Bug: before the fix, AZ change was handled by syncReservationMetadata which + // patched the spec in place, leaving the reservation pinned to a host in the wrong AZ. + name: "AZ change on placed reservation triggers delete and recreate", + existingSlots: []v1alpha1.Reservation{ + withAZ(newTestCRSlot("commitment-abc123-0", 8, "host-1", "test-group", nil), "az-old"), + }, + desiredMemoryGiB: 8, + desiredAZ: "az-new", + wantRemovedCount: 1, + validateRemoved: func(t *testing.T, removed []v1alpha1.Reservation) { + if got := removed[0].Spec.AvailabilityZone; got != "az-old" { + t.Errorf("expected removed slot to have AZ az-old, got %q", got) + } + }, + validateRemaining: func(t *testing.T, remaining []v1alpha1.Reservation) { + if len(remaining) != 1 { + t.Fatalf("expected 1 remaining slot, got %d", len(remaining)) + } + r := remaining[0] + if got := r.Spec.AvailabilityZone; got != "az-new" { + t.Errorf("expected recreated slot to have AZ az-new, got %q", got) + } + if r.Spec.TargetHost != "" { + t.Errorf("expected recreated slot to have no TargetHost (pending re-placement), got %q", r.Spec.TargetHost) + } + }, + }, + { + name: "AZ change on unplaced reservation also triggers delete and recreate", + existingSlots: []v1alpha1.Reservation{ + withAZ(newTestCRSlot("commitment-abc123-0", 8, "", "test-group", nil), "az-old"), + }, + desiredMemoryGiB: 8, + desiredAZ: "az-new", + wantRemovedCount: 1, + validateRemaining: func(t *testing.T, remaining []v1alpha1.Reservation) { + if len(remaining) != 1 { + t.Fatalf("expected 1 remaining slot, got %d", len(remaining)) + } + if got := remaining[0].Spec.AvailabilityZone; got != "az-new" { + t.Errorf("expected recreated slot to have AZ az-new, got %q", got) + } + }, + }, + { + name: "matching AZ does not trigger delete", + existingSlots: []v1alpha1.Reservation{ + withAZ(newTestCRSlot("commitment-abc123-0", 8, "host-1", "test-group", nil), "az-1"), + }, + desiredMemoryGiB: 8, + desiredAZ: "az-1", + wantRemovedCount: 0, + validateRemaining: func(t *testing.T, remaining []v1alpha1.Reservation) { + if len(remaining) != 1 { + t.Fatalf("expected 1 remaining, got %d", len(remaining)) + } + if remaining[0].Spec.TargetHost != "host-1" { + t.Errorf("expected host-1 to be 
preserved, got %q", remaining[0].Spec.TargetHost) + } + }, + }, + // ---------------------------------------------------------------- + // DomainID change must be synced in place (no re-placement) + // ---------------------------------------------------------------- + { + // Bug: before the fix, DomainID was never synced — existing reservations silently + // kept stale domain metadata if Limes updated project information. + name: "DomainID change is synced in place without re-placement", + existingSlots: []v1alpha1.Reservation{ + withDomainID(newTestCRSlot("commitment-abc123-0", 8, "host-1", "test-group", nil), "domain-old"), + }, + desiredMemoryGiB: 8, + desiredDomainID: "domain-new", + wantRemovedCount: 0, + validateRemaining: func(t *testing.T, remaining []v1alpha1.Reservation) { + if len(remaining) != 1 { + t.Fatalf("expected 1 remaining, got %d", len(remaining)) + } + r := remaining[0] + if got := r.Spec.CommittedResourceReservation.DomainID; got != "domain-new" { + t.Errorf("expected DomainID domain-new, got %q", got) + } + if r.Spec.TargetHost != "host-1" { + t.Errorf("expected host-1 to be preserved (no re-placement), got %q", r.Spec.TargetHost) + } + }, + }, } scheme := newCRTestScheme(t) @@ -232,6 +339,8 @@ func TestApplyCommitmentState(t *testing.T) { ProjectID: "project-1", FlavorGroupName: "test-group", TotalMemoryBytes: tt.desiredMemoryGiB * 1024 * 1024 * 1024, + AvailabilityZone: tt.desiredAZ, + DomainID: tt.desiredDomainID, } applyResult, err := manager.ApplyCommitmentState( diff --git a/internal/scheduling/reservations/commitments/syncer.go b/internal/scheduling/reservations/commitments/syncer.go index 8d3a43adf..d7c6ca705 100644 --- a/internal/scheduling/reservations/commitments/syncer.go +++ b/internal/scheduling/reservations/commitments/syncer.go @@ -291,8 +291,8 @@ func (s *Syncer) SyncReservations(ctx context.Context) error { // Count CommittedResource CRDs present locally but absent from Limes (do not delete — Limes // responses may be transient and deleting active CRDs would drop Reservation slots). - // Also GC CRDs whose EndTime has passed: the commitment is over, the controller's finalizer - // will clean up child Reservations on deletion. + // Also GC CRDs whose EndTime has passed: the commitment is over, child Reservations will be + // cleaned up by the syncer's orphan GC on the next sync cycle. var existingCRs v1alpha1.CommittedResourceList if err := s.List(ctx, &existingCRs); err != nil { logger.Error(err, "failed to list existing committed resource CRDs") @@ -392,7 +392,8 @@ func (s *Syncer) applyCommittedResourceSpec(cr *v1alpha1.CommittedResource, stat cr.Spec.ProjectID = state.ProjectID cr.Spec.DomainID = state.DomainID cr.Spec.State = state.State - cr.Spec.AllowRejection = false + // AllowRejection is not set here: the API path (applyCRSpec) sets it explicitly, + // and callers that go through CreateOrUpdate preserve the existing value. if state.StartTime != nil { t := metav1.NewTime(*state.StartTime) @@ -413,7 +414,12 @@ func (s *Syncer) upsertCommittedResource(ctx context.Context, logger logr.Logger cr.Name = "commitment-" + state.CommitmentUUID op, err := controllerutil.CreateOrUpdate(ctx, s.Client, cr, func() error { + // AllowRejection is an API execution flag, not a Limes commitment property. + // Preserve the existing value so a syncer write never clobbers an in-flight + // change-commitments request. For new CRDs the zero value (false) is correct. 
+ allowRejection := cr.Spec.AllowRejection s.applyCommittedResourceSpec(cr, state) + cr.Spec.AllowRejection = allowRejection return nil }) if err != nil { diff --git a/internal/scheduling/reservations/commitments/usage.go b/internal/scheduling/reservations/commitments/usage.go index d634fc2a0..e2682d26d 100644 --- a/internal/scheduling/reservations/commitments/usage.go +++ b/internal/scheduling/reservations/commitments/usage.go @@ -30,7 +30,7 @@ type UsageDBClient interface { ListProjectVMs(ctx context.Context, projectID string) ([]VMRow, error) } -// VMRow is the result of a joined server+flavor query from Postgres. +// VMRow is the result of a joined server+flavor+image query from Postgres. type VMRow struct { ID string Name string @@ -43,6 +43,7 @@ type VMRow struct { FlavorVCPUs uint64 FlavorDisk uint64 FlavorExtras string // JSON string of flavor extra_specs + OSType string // pre-computed from Glance image properties; "unknown" when not found } // CommitmentStateWithUsage extends CommitmentState with usage tracking for billing calculations. @@ -51,8 +52,10 @@ type CommitmentStateWithUsage struct { CommitmentState // RemainingMemoryBytes is the uncommitted capacity left for VM assignment RemainingMemoryBytes int64 - // AssignedVMs tracks which VMs have been assigned to this commitment - AssignedVMs []string + // AssignedInstances tracks which VM instances have been assigned to this commitment + AssignedInstances []string + // UsedVCPUs is the total vCPU count of assigned VM instances + UsedVCPUs int64 } // NewCommitmentStateWithUsage creates a CommitmentStateWithUsage from a CommitmentState. @@ -60,16 +63,17 @@ func NewCommitmentStateWithUsage(state *CommitmentState) *CommitmentStateWithUsa return &CommitmentStateWithUsage{ CommitmentState: *state, RemainingMemoryBytes: state.TotalMemoryBytes, - AssignedVMs: []string{}, + AssignedInstances: []string{}, } } // AssignVM attempts to assign a VM to this commitment if there's enough capacity. // Returns true if the VM was assigned, false if not enough capacity. -func (c *CommitmentStateWithUsage) AssignVM(vmUUID string, vmMemoryBytes int64) bool { +func (c *CommitmentStateWithUsage) AssignVM(vmUUID string, vmMemoryBytes, vCPUs int64) bool { if c.RemainingMemoryBytes >= vmMemoryBytes { c.RemainingMemoryBytes -= vmMemoryBytes - c.AssignedVMs = append(c.AssignedVMs, vmUUID) + c.UsedVCPUs += vCPUs + c.AssignedInstances = append(c.AssignedInstances, vmUUID) return true } return false @@ -92,6 +96,7 @@ type VMUsageInfo struct { VCPUs uint64 DiskGB uint64 VideoRAMMiB *uint64 // optional, from flavor extra_specs hw_video:ram_max_mb + OSType string // pre-computed from Glance image; "unknown" for volume-booted or unmapped images AZ string Hypervisor string CreatedAt time.Time @@ -113,96 +118,116 @@ func NewUsageCalculator(client client.Client, usageDB UsageDBClient) *UsageCalcu } // CalculateUsage computes the usage report for a specific project. +// VM-to-commitment assignment is read from CommittedResource CRD status (pre-computed by the +// UsageReconciler). If a CR has no usage status yet, its VMs appear as PAYG until the first +// reconcile completes (within one CooldownInterval). 
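Since AssignVM now tracks vCPUs alongside memory, the bookkeeping behaves as below (an illustrative example function, not part of the diff; the quantities are invented):

```go
package commitments

import "fmt"

// Illustrative only: capacity-gated assignment with memory and vCPU tracking.
func ExampleCommitmentStateWithUsage_AssignVM() {
	// A 16 GiB commitment with no VMs assigned yet.
	state := NewCommitmentStateWithUsage(&CommitmentState{TotalMemoryBytes: 16 << 30})

	fmt.Println(state.AssignVM("vm-1", 12<<30, 4)) // fits: 4 GiB remain, UsedVCPUs becomes 4
	fmt.Println(state.AssignVM("vm-2", 8<<30, 2))  // rejected: only 4 GiB of capacity left
	fmt.Println(state.AssignedInstances)
	// Output:
	// true
	// false
	// [vm-1]
}
```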
func (c *UsageCalculator) CalculateUsage( ctx context.Context, log logr.Logger, projectID string, allAZs []liquid.AvailabilityZone, ) (liquid.ServiceUsageReport, error) { - // Step 1: Get flavor groups from knowledge + knowledge := &reservations.FlavorGroupKnowledgeClient{Client: c.client} flavorGroups, err := knowledge.GetAllFlavorGroups(ctx, nil) if err != nil { return liquid.ServiceUsageReport{}, fmt.Errorf("failed to get flavor groups: %w", err) } - // Get info version from Knowledge CRD (used by Limes to detect metadata changes) var infoVersion int64 = -1 if knowledgeCRD, err := knowledge.Get(ctx); err == nil && knowledgeCRD != nil && !knowledgeCRD.Status.LastContentChange.IsZero() { infoVersion = knowledgeCRD.Status.LastContentChange.Unix() } - // Step 2: Build commitment capacity map from K8s Reservation CRDs - commitmentsByAZFlavorGroup, err := c.buildCommitmentCapacityMap(ctx, log, projectID) + vmAssignments, err := c.BuildVMAssignmentsFromStatus(ctx, projectID) if err != nil { - return liquid.ServiceUsageReport{}, fmt.Errorf("failed to build commitment capacity map: %w", err) + return liquid.ServiceUsageReport{}, fmt.Errorf("failed to read VM assignments from CRD status: %w", err) } - // Step 3: Get and sort VMs for the project vms, err := c.getProjectVMs(ctx, log, projectID, flavorGroups, allAZs) if err != nil { return liquid.ServiceUsageReport{}, fmt.Errorf("failed to get project VMs: %w", err) } - sortVMsForUsageCalculation(vms) - - // Step 4: Assign VMs to commitments - vmAssignments, assignedToCommitments := c.assignVMsToCommitments(vms, commitmentsByAZFlavorGroup) - // Step 5: Build the response report := c.buildUsageResponse(vms, vmAssignments, flavorGroups, allAZs, infoVersion) + assignedToCommitments := 0 + for _, commitmentUUID := range vmAssignments { + if commitmentUUID != "" { + assignedToCommitments++ + } + } log.Info("completed usage report", "projectID", projectID, "vmCount", len(vms), "assignedToCommitments", assignedToCommitments, "payg", len(vms)-assignedToCommitments, - "commitments", countCommitmentStates(commitmentsByAZFlavorGroup), "resources", len(report.Resources)) return report, nil } +// BuildVMAssignmentsFromStatus reads pre-computed VM-to-commitment assignments from +// CommittedResource CRD status. Returns a map of vmUUID → commitmentUUID (empty string = PAYG). +// This is the read path that replaces the inline assignment algorithm in the usage API. +func (c *UsageCalculator) BuildVMAssignmentsFromStatus(ctx context.Context, projectID string) (map[string]string, error) { + var crList v1alpha1.CommittedResourceList + if err := c.client.List(ctx, &crList); err != nil { + return nil, fmt.Errorf("failed to list CommittedResources: %w", err) + } + assignments := make(map[string]string) + for _, cr := range crList.Items { + if cr.Spec.ProjectID != projectID { + continue + } + for _, vmUUID := range cr.Status.AssignedInstances { + assignments[vmUUID] = cr.Spec.CommitmentUUID + } + } + return assignments, nil +} + // azFlavorGroupKey creates a deterministic key for az:flavorGroup lookups. func azFlavorGroupKey(az, flavorGroup string) string { return az + ":" + flavorGroup } -// buildCommitmentCapacityMap retrieves all CR reservations for a project and builds -// a map of az:flavorGroup -> list of CommitmentStateWithUsage, sorted for deterministic assignment. +// buildCommitmentCapacityMap builds a map of az:flavorGroup -> list of CommitmentStateWithUsage +// from CommittedResource CRD status (AcceptedAmount + AcceptedSpec). 
+// Using AcceptedAmount gives the billing-perspective capacity — what was confirmed — rather than +// the sum of internally-placed Reservation slots, which can lag behind spec changes. func (c *UsageCalculator) buildCommitmentCapacityMap( ctx context.Context, log logr.Logger, projectID string, ) (map[string][]*CommitmentStateWithUsage, error) { - // List all committed resource reservations - var allReservations v1alpha1.ReservationList - if err := c.client.List(ctx, &allReservations, client.MatchingLabels{ - v1alpha1.LabelReservationType: v1alpha1.ReservationTypeLabelCommittedResource, - }); err != nil { - return nil, fmt.Errorf("failed to list reservations: %w", err) + + var allCRs v1alpha1.CommittedResourceList + if err := c.client.List(ctx, &allCRs); err != nil { + return nil, fmt.Errorf("failed to list CommittedResources: %w", err) } - // Group reservations by commitment UUID, filtering by project - reservationsByCommitment := make(map[string][]v1alpha1.Reservation) - for _, res := range allReservations.Items { - if res.Spec.CommittedResourceReservation == nil { + now := time.Now() + result := make(map[string][]*CommitmentStateWithUsage) + for _, cr := range allCRs.Items { + if cr.Spec.ProjectID != projectID { continue } - if res.Spec.CommittedResourceReservation.ProjectID != projectID { + if cr.Spec.State != v1alpha1.CommitmentStatusConfirmed && cr.Spec.State != v1alpha1.CommitmentStatusGuaranteed { + continue + } + if cr.Status.AcceptedSpec == nil || cr.Status.AcceptedAmount == nil { + log.V(1).Info("skipping CR with no accepted spec/amount", "cr", cr.Name) continue } - commitmentUUID := res.Spec.CommittedResourceReservation.CommitmentUUID - reservationsByCommitment[commitmentUUID] = append(reservationsByCommitment[commitmentUUID], res) - } - // Build CommitmentState for each commitment and group by az:flavorGroup - // Only include commitments that are currently active (started and not expired) - now := time.Now() - result := make(map[string][]*CommitmentStateWithUsage) - for _, reservations := range reservationsByCommitment { - state, err := FromReservations(reservations) + // Build state from the accepted spec snapshot so capacity always reflects + // what was confirmed, not the potentially-mutated current spec. 
+ tempCR := v1alpha1.CommittedResource{Spec: *cr.Status.AcceptedSpec} + tempCR.Spec.Amount = *cr.Status.AcceptedAmount + state, err := FromCommittedResource(tempCR) if err != nil { - log.Error(err, "failed to build commitment state from reservations") + log.Error(err, "skipping CR with invalid accepted spec", "cr", cr.Name) continue } @@ -313,6 +338,7 @@ func (c *UsageCalculator) getProjectVMs( VCPUs: row.FlavorVCPUs, DiskGB: row.FlavorDisk, VideoRAMMiB: videoRAMMiB, + OSType: row.OSType, AZ: string(normalizedAZ), Hypervisor: row.Hypervisor, CreatedAt: createdAt, @@ -390,7 +416,7 @@ func (c *UsageCalculator) assignVMsToCommitments( // Try to assign to first commitment with remaining capacity for _, commitment := range commitments { - if commitment.AssignVM(vm.UUID, vmMemoryBytes) { + if commitment.AssignVM(vm.UUID, vmMemoryBytes, int64(vm.VCPUs)) { //nolint:gosec // VCPUs from Nova, realistically bounded vmAssignments[vm.UUID] = commitment.CommitmentUUID assigned = true assignedCount++ @@ -457,6 +483,7 @@ func (c *UsageCalculator) buildUsageResponse( subresource, err := liquid.SubresourceBuilder[map[string]any]{ ID: vm.UUID, + Name: vm.Name, Attributes: attributes, }.Finalize() if err != nil { @@ -564,8 +591,9 @@ func buildVMAttributes(vm VMUsageInfo, commitmentID string) map[string]any { } result := map[string]any{ - "status": vm.Status, - "flavor": flavor, + "status": vm.Status, + "flavor": flavor, + "os_type": vm.OSType, } // Add commitment_id - nil for PAYG, string for committed @@ -614,7 +642,7 @@ func (c *dbUsageClient) getReader(ctx context.Context) (*external.PostgresReader return reader, nil } -// vmQueryRow is the scan target for the server+flavor JOIN query. +// vmQueryRow is the scan target for the server+flavor+image JOIN query. type vmQueryRow struct { ID string `db:"id"` Name string `db:"name"` @@ -627,6 +655,7 @@ type vmQueryRow struct { FlavorVCPUs uint64 `db:"flavor_vcpus"` FlavorDisk uint64 `db:"flavor_disk"` FlavorExtras string `db:"flavor_extras"` + OSType string `db:"os_type"` } // ListProjectVMs returns all VMs for a project joined with their flavor data from Postgres. 
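To make the snapshot reasoning above concrete, here is the grow scenario it guards against, as a hypothetical helper (not part of this change; the 8Gi/16Gi values are invented, and the construction mirrors buildCommitmentCapacityMap above):

```go
package commitments

import (
	"k8s.io/apimachinery/pkg/api/resource"

	"github.com/cobaltcore-dev/cortex/api/v1alpha1"
)

// acceptedCapacityDemo: a commitment confirmed at 8Gi whose spec was later bumped to 16Gi
// still contributes only 8Gi of assignable capacity until the CR controller accepts the new
// generation and refreshes AcceptedSpec/AcceptedAmount.
func acceptedCapacityDemo(cr v1alpha1.CommittedResource) (resource.Quantity, error) {
	// cr.Spec.Amount           == 16Gi (desired, not yet placed)
	// cr.Status.AcceptedAmount == 8Gi  (last successful reconcile)
	// cr.Status.AcceptedSpec   == snapshot of the 8Gi placement (AZ, project, flavor group)
	snapshot := v1alpha1.CommittedResource{Spec: *cr.Status.AcceptedSpec}
	snapshot.Spec.Amount = *cr.Status.AcceptedAmount
	state, err := FromCommittedResource(snapshot)
	if err != nil {
		return resource.Quantity{}, err
	}
	_ = state // built from the accepted snapshot, as in buildCommitmentCapacityMap above
	return snapshot.Spec.Amount, nil // 8Gi, not 16Gi
}
```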
@@ -645,9 +674,11 @@ func (c *dbUsageClient) ListProjectVMs(ctx context.Context, projectID string) ([ COALESCE(f.ram, 0) AS flavor_ram, COALESCE(f.vcpus, 0) AS flavor_vcpus, COALESCE(f.disk, 0) AS flavor_disk, - COALESCE(f.extra_specs, '') AS flavor_extras + COALESCE(f.extra_specs, '') AS flavor_extras, + COALESCE(i.os_type, 'unknown') AS os_type FROM ` + nova.Server{}.TableName() + ` s LEFT JOIN ` + nova.Flavor{}.TableName() + ` f ON f.name = s.flavor_name + LEFT JOIN ` + nova.Image{}.TableName() + ` i ON i.id = s.image_ref WHERE s.tenant_id = $1` var rows []vmQueryRow diff --git a/internal/scheduling/reservations/commitments/usage_internals_test.go b/internal/scheduling/reservations/commitments/usage_internals_test.go index d9f6c474f..1c354275f 100644 --- a/internal/scheduling/reservations/commitments/usage_internals_test.go +++ b/internal/scheduling/reservations/commitments/usage_internals_test.go @@ -177,6 +177,7 @@ func TestBuildVMAttributes(t *testing.T) { MemoryMB: 4096, VCPUs: 16, DiskGB: 100, + OSType: "windows8Server64Guest", } t.Run("with commitment", func(t *testing.T) { @@ -186,7 +187,11 @@ func TestBuildVMAttributes(t *testing.T) { t.Errorf("status = %v, expected ACTIVE", attrs["status"]) } - for _, absent := range []string{"metadata", "tags", "os_type"} { + if attrs["os_type"] != "windows8Server64Guest" { + t.Errorf("os_type = %v, expected windows8Server64Guest", attrs["os_type"]) + } + + for _, absent := range []string{"metadata", "tags"} { if _, present := attrs[absent]; present { t.Errorf("%s must not appear in output (not available from Postgres cache)", absent) } diff --git a/internal/scheduling/reservations/commitments/usage_reconciler.go b/internal/scheduling/reservations/commitments/usage_reconciler.go new file mode 100644 index 000000000..fa4a72e6a --- /dev/null +++ b/internal/scheduling/reservations/commitments/usage_reconciler.go @@ -0,0 +1,303 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package commitments + +import ( + "context" + "time" + + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + "github.com/sapcc/go-api-declarations/liquid" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations" + "github.com/cobaltcore-dev/cortex/pkg/multicluster" +) + +// UsageReconciler reconciles CommittedResource.Status usage fields (AssignedInstances, UsedResources, +// LastUsageReconcileAt) by running the deterministic VM-to-CR assignment periodically and on +// relevant change events. +type UsageReconciler struct { + client.Client + Conf UsageReconcilerConfig + UsageDB UsageDBClient + Monitor UsageReconcilerMonitor +} + +func (r *UsageReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + start := time.Now() + + var cr v1alpha1.CommittedResource + if err := r.Get(ctx, req.NamespacedName, &cr); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + // Only active commitments have assigned VMs. Clear stale usage status if present. 
+ if cr.Spec.State != v1alpha1.CommitmentStatusConfirmed && cr.Spec.State != v1alpha1.CommitmentStatusGuaranteed { + if len(cr.Status.AssignedInstances) > 0 || len(cr.Status.UsedResources) > 0 { + old := cr.DeepCopy() + cr.Status.AssignedInstances = nil + cr.Status.UsedResources = nil + cr.Status.LastUsageReconcileAt = nil + cr.Status.UsageObservedGeneration = nil + if err := r.Status().Patch(ctx, &cr, client.MergeFrom(old)); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + } + return ctrl.Result{}, nil + } + + cooldown := r.Conf.CooldownInterval.Duration + + // Gate: wait until the CR controller has accepted the current generation. + // The CR controller writes the Ready condition (with ObservedGeneration) only after + // updating the AcceptedSpec. Running before that would read stale capacity. + // We don't requeue here — the acceptedGenerationPredicate watch fires when the + // condition is written, triggering a fresh reconcile at that point. + readyCond := meta.FindStatusCondition(cr.Status.Conditions, v1alpha1.CommittedResourceConditionReady) + if readyCond == nil || readyCond.ObservedGeneration != cr.Generation || readyCond.Status != metav1.ConditionTrue { + return ctrl.Result{}, nil + } + + // Bypass cooldown when the spec generation has advanced since the last usage reconcile. + // This ensures spec changes (e.g. shrink) are reflected immediately rather than waiting + // for the next cooldown interval — follows the Kubernetes observedGeneration pattern. + generationAdvanced := cr.Status.UsageObservedGeneration == nil || + *cr.Status.UsageObservedGeneration != cr.Generation + if !generationAdvanced && cr.Status.LastUsageReconcileAt != nil { + if elapsed := time.Since(cr.Status.LastUsageReconcileAt.Time); elapsed < cooldown { + return ctrl.Result{RequeueAfter: cooldown - elapsed}, nil + } + } + + logger := ctrl.LoggerFrom(ctx).WithValues( + "component", "usage-reconciler", + "committedResource", req.Name, + "projectID", cr.Spec.ProjectID, + ) + + calc := &UsageCalculator{client: r.Client, usageDB: r.UsageDB} + + knowledge := &reservations.FlavorGroupKnowledgeClient{Client: r.Client} + flavorGroups, err := knowledge.GetAllFlavorGroups(ctx, nil) + if err != nil { + r.Monitor.reconcileDuration.WithLabelValues("error").Observe(time.Since(start).Seconds()) + return ctrl.Result{}, err + } + + commitmentsByAZFG, err := calc.buildCommitmentCapacityMap(ctx, logger, cr.Spec.ProjectID) + if err != nil { + r.Monitor.reconcileDuration.WithLabelValues("error").Observe(time.Since(start).Seconds()) + return ctrl.Result{}, err + } + if len(commitmentsByAZFG) == 0 { + return ctrl.Result{RequeueAfter: cooldown}, nil + } + + // Derive the known AZs from the commitment map so that NormalizeAZ maps VM AZ strings + // to the same values used as commitment keys. VMs in unrecognised AZs get "unknown" and + // are treated as PAYG, which is the correct fallback. 
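The acceptance gate and the cooldown bypass above combine into a single decision. Distilled as a sketch (the helper name `shouldRunNow` is hypothetical; the logic mirrors the Reconcile code in this file):

```go
package commitments

import (
	"time"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/cobaltcore-dev/cortex/api/v1alpha1"
)

// shouldRunNow is a hypothetical distillation of the gating above; it is not part of the diff.
func shouldRunNow(cr *v1alpha1.CommittedResource, cooldown time.Duration) (run bool, retryAfter time.Duration) {
	// 1. Acceptance gate: only run once the CR controller has accepted the current generation.
	//    No requeue is needed; the acceptedGenerationPredicate watch re-triggers on acceptance.
	ready := meta.FindStatusCondition(cr.Status.Conditions, v1alpha1.CommittedResourceConditionReady)
	if ready == nil || ready.ObservedGeneration != cr.Generation || ready.Status != metav1.ConditionTrue {
		return false, 0
	}
	// 2. Cooldown bypass: a new spec generation is reflected immediately (observedGeneration pattern).
	generationAdvanced := cr.Status.UsageObservedGeneration == nil ||
		*cr.Status.UsageObservedGeneration != cr.Generation
	if generationAdvanced || cr.Status.LastUsageReconcileAt == nil {
		return true, 0
	}
	// 3. Otherwise respect the cooldown.
	if elapsed := time.Since(cr.Status.LastUsageReconcileAt.Time); elapsed < cooldown {
		return false, cooldown - elapsed
	}
	return true, 0
}
```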
+ allAZs := make([]liquid.AvailabilityZone, 0, len(commitmentsByAZFG)) + seenAZs := make(map[liquid.AvailabilityZone]struct{}) + for _, states := range commitmentsByAZFG { + for _, state := range states { + az := liquid.AvailabilityZone(state.AvailabilityZone) + if _, ok := seenAZs[az]; !ok { + seenAZs[az] = struct{}{} + allAZs = append(allAZs, az) + } + } + } + + vms, err := calc.getProjectVMs(ctx, logger, cr.Spec.ProjectID, flavorGroups, allAZs) + if err != nil { + r.Monitor.reconcileDuration.WithLabelValues("error").Observe(time.Since(start).Seconds()) + return ctrl.Result{}, err + } + sortVMsForUsageCalculation(vms) + calc.assignVMsToCommitments(vms, commitmentsByAZFG) + + now := metav1.Now() + written := 0 + totalAssigned := 0 + var writeErr error + for _, group := range commitmentsByAZFG { + for _, state := range group { + if err := r.writeUsageStatus(ctx, state, now); err != nil { + logger.Error(err, "failed to write usage status", "commitmentUUID", state.CommitmentUUID) + writeErr = err + } else { + written++ + totalAssigned += len(state.AssignedInstances) + // Observe status age: how long ago was it last reconciled before this run. + if cr.Status.LastUsageReconcileAt != nil { + r.Monitor.statusAge.Observe(now.Time.Sub(cr.Status.LastUsageReconcileAt.Time).Seconds()) + } + } + } + } + if writeErr != nil { + return ctrl.Result{}, writeErr + } + + r.Monitor.reconcileDuration.WithLabelValues("success").Observe(time.Since(start).Seconds()) + r.Monitor.assignedInstances.Set(float64(totalAssigned)) + + logger.Info("usage reconcile complete", + "commitments", written, + "vms", len(vms), + "assignedInstances", totalAssigned, + ) + + // Successful reconcile schedules the next run after the cooldown — acts as the periodic fallback. + return ctrl.Result{RequeueAfter: cooldown}, nil +} + +// writeUsageStatus patches AssignedInstances, UsedResources, and LastUsageReconcileAt on the CommittedResource +// identified by state.CommitmentUUID. +func (r *UsageReconciler) writeUsageStatus(ctx context.Context, state *CommitmentStateWithUsage, now metav1.Time) error { + var crList v1alpha1.CommittedResourceList + if err := r.List(ctx, &crList, client.MatchingFields{idxCommittedResourceByUUID: state.CommitmentUUID}); err != nil { + return err + } + if len(crList.Items) == 0 { + return nil + } + target := &crList.Items[0] + old := target.DeepCopy() + + usedBytes := state.TotalMemoryBytes - state.RemainingMemoryBytes + usedQty := resource.NewQuantity(usedBytes, resource.BinarySI) + usedCores := resource.NewQuantity(state.UsedVCPUs, resource.DecimalSI) + + target.Status.AssignedInstances = state.AssignedInstances + target.Status.UsedResources = map[string]resource.Quantity{ + "memory": *usedQty, + "cpu": *usedCores, + } + target.Status.LastUsageReconcileAt = &now + target.Status.UsageObservedGeneration = &target.Generation + + return r.Status().Patch(ctx, target, client.MergeFrom(old)) +} + +// hypervisorToCommittedResources maps a Hypervisor change to the CommittedResources of affected projects. +// When a hypervisor's VM list changes, all CommittedResources for projects that have reservations +// on that host need their usage re-evaluated. 
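For reference, the quantities written by writeUsageStatus render like this when read back, e.g. by kubectl printcolumns or the visualize tool (standalone sketch, numbers invented):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Built exactly as in writeUsageStatus: memory in bytes (BinarySI), cpu as a core count (DecimalSI).
	// Invented scenario: two assigned VMs of 8 GiB / 4 vCPUs each.
	used := map[string]resource.Quantity{
		"memory": *resource.NewQuantity(16<<30, resource.BinarySI),
		"cpu":    *resource.NewQuantity(8, resource.DecimalSI),
	}
	mem := used["memory"]
	cpu := used["cpu"]
	fmt.Printf("used=%s usedCPU=%s\n", mem.String(), cpu.String()) // used=16Gi usedCPU=8
}
```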
+func (r *UsageReconciler) hypervisorToCommittedResources(ctx context.Context, obj client.Object) []reconcile.Request {
+	hvName := obj.GetName()
+	log := ctrl.LoggerFrom(ctx)
+
+	var reservationList v1alpha1.ReservationList
+	if err := r.List(ctx, &reservationList, client.MatchingLabels{
+		v1alpha1.LabelReservationType: v1alpha1.ReservationTypeLabelCommittedResource,
+	}); err != nil {
+		log.Error(err, "failed to list reservations for hypervisor event", "hypervisor", hvName)
+		return nil
+	}
+
+	projectIDs := make(map[string]struct{})
+	for _, res := range reservationList.Items {
+		if res.Status.Host == hvName && res.Spec.CommittedResourceReservation != nil {
+			projectIDs[res.Spec.CommittedResourceReservation.ProjectID] = struct{}{}
+		}
+	}
+	if len(projectIDs) == 0 {
+		return nil
+	}
+
+	var allCRs v1alpha1.CommittedResourceList
+	if err := r.List(ctx, &allCRs); err != nil {
+		log.Error(err, "failed to list CommittedResources for hypervisor event", "hypervisor", hvName)
+		return nil
+	}
+
+	var requests []reconcile.Request
+	for _, cr := range allCRs.Items {
+		if _, affected := projectIDs[cr.Spec.ProjectID]; affected {
+			requests = append(requests, reconcile.Request{
+				NamespacedName: types.NamespacedName{Name: cr.Name},
+			})
+		}
+	}
+	return requests
+}
+
+// SetupWithManager registers the usage reconciler with the controller manager.
+func (r *UsageReconciler) SetupWithManager(mgr ctrl.Manager, mcl *multicluster.Client) error {
+	bldr := multicluster.BuildController(mcl, mgr)
+
+	// Watch CommittedResource status updates where the CR controller has just accepted the
+	// current generation. Fires when the Ready condition's ObservedGeneration advances to match
+	// metadata.generation. We intentionally do NOT watch spec changes (GenerationChangedPredicate):
+	// capacity is read from AcceptedSpec in status, which is only valid after the CR controller
+	// has finished — so triggering on spec changes would always hit the readiness gate and do nothing.
+	var err error
+	bldr, err = bldr.WatchesMulticluster(
+		&v1alpha1.CommittedResource{},
+		&handler.EnqueueRequestForObject{},
+		acceptedGenerationPredicate{},
+	)
+	if err != nil {
+		return err
+	}
+
+	// Watch Hypervisor CRDs: when VM instances on a host change, re-evaluate usage for
+	// projects that have reservations on that host.
+	bldr, err = bldr.WatchesMulticluster(
+		&hv1.Hypervisor{},
+		handler.EnqueueRequestsFromMapFunc(r.hypervisorToCommittedResources),
+	)
+	if err != nil {
+		return err
+	}
+
+	// MaxConcurrentReconciles=1: the per-project assignment is globally consistent only when
+	// run serially — concurrent runs for the same project could produce conflicting writes.
+	return bldr.Named("committed-resource-usage").
+		WithOptions(controller.Options{
+			MaxConcurrentReconciles: 1,
+		}).
+		Complete(r)
+}
+
+// acceptedGenerationPredicate fires on status-only updates where the CR controller has
+// just accepted the current spec generation — i.e. the Ready condition's ObservedGeneration
+// advanced to match metadata.generation. This drives the usage reconciler immediately after
+// the CR controller finishes, without any polling timeout.
+type acceptedGenerationPredicate struct{ predicate.Funcs }
+
+func (acceptedGenerationPredicate) Update(e event.UpdateEvent) bool {
+	oldCR, ok1 := e.ObjectOld.(*v1alpha1.CommittedResource)
+	newCR, ok2 := e.ObjectNew.(*v1alpha1.CommittedResource)
+	if !ok1 || !ok2 {
+		return false
+	}
+	// Only react to status-only updates; spec (generation) changes are deliberately not watched by this reconciler (see SetupWithManager).
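A hypothetical test sketch for the predicate (not included in this PR) makes the firing condition explicit: it fires only on the transition where the Ready condition's ObservedGeneration catches up to the spec generation:

```go
package commitments

import (
	"testing"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/event"

	"github.com/cobaltcore-dev/cortex/api/v1alpha1"
)

// Hypothetical sketch; not part of this change.
func TestAcceptedGenerationPredicate(t *testing.T) {
	mk := func(gen, observedGen int64) *v1alpha1.CommittedResource {
		cr := &v1alpha1.CommittedResource{}
		cr.Generation = gen
		meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
			Type:               v1alpha1.CommittedResourceConditionReady,
			Status:             metav1.ConditionTrue,
			Reason:             "Accepted",
			ObservedGeneration: observedGen,
		})
		return cr
	}
	p := acceptedGenerationPredicate{}

	// Ready.ObservedGeneration advances from 1 to 2 and matches the spec generation: fire.
	if !p.Update(event.UpdateEvent{ObjectOld: mk(2, 1), ObjectNew: mk(2, 2)}) {
		t.Error("expected predicate to fire when acceptance catches up to the current generation")
	}
	// No change in acceptance: stay quiet (unrelated status updates are ignored).
	if p.Update(event.UpdateEvent{ObjectOld: mk(2, 2), ObjectNew: mk(2, 2)}) {
		t.Error("expected predicate not to fire without an acceptance transition")
	}
}
```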
+ if oldCR.Generation != newCR.Generation { + return false + } + oldCond := meta.FindStatusCondition(oldCR.Status.Conditions, v1alpha1.CommittedResourceConditionReady) + newCond := meta.FindStatusCondition(newCR.Status.Conditions, v1alpha1.CommittedResourceConditionReady) + if newCond == nil { + return false + } + var oldObservedGen int64 + if oldCond != nil { + oldObservedGen = oldCond.ObservedGeneration + } + // Fire only when ObservedGeneration advances to match the current spec generation. + return oldObservedGen != newCond.ObservedGeneration && newCond.ObservedGeneration == newCR.Generation +} diff --git a/internal/scheduling/reservations/commitments/usage_reconciler_monitor.go b/internal/scheduling/reservations/commitments/usage_reconciler_monitor.go new file mode 100644 index 000000000..748739f80 --- /dev/null +++ b/internal/scheduling/reservations/commitments/usage_reconciler_monitor.go @@ -0,0 +1,55 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package commitments + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +// UsageReconcilerMonitor provides metrics for the usage reconciler. +type UsageReconcilerMonitor struct { + reconcileDuration *prometheus.HistogramVec + statusAge prometheus.Histogram + assignedInstances prometheus.Gauge +} + +// NewUsageReconcilerMonitor creates a new monitor with Prometheus metrics. +func NewUsageReconcilerMonitor() UsageReconcilerMonitor { + m := UsageReconcilerMonitor{ + reconcileDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "cortex_cr_usage_reconcile_duration_seconds", + Help: "Duration of committed resource usage reconcile runs in seconds.", + Buckets: []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30}, + }, []string{"result"}), + statusAge: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "cortex_cr_usage_status_age_seconds", + Help: "Age of CommittedResource usage status at reconcile time, in seconds. Distribution across all active commitments shows freshness spread.", + Buckets: []float64{30, 60, 120, 300, 600, 900, 1800}, + }), + assignedInstances: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "cortex_cr_usage_assigned_vms_total", + Help: "Total number of VMs currently assigned to committed resources across all active commitments.", + }), + } + + // Pre-initialize result labels so metrics appear before the first reconcile. + m.reconcileDuration.WithLabelValues("success") + m.reconcileDuration.WithLabelValues("error") + + return m +} + +// Describe implements prometheus.Collector. +func (m *UsageReconcilerMonitor) Describe(ch chan<- *prometheus.Desc) { + m.reconcileDuration.Describe(ch) + m.statusAge.Describe(ch) + m.assignedInstances.Describe(ch) +} + +// Collect implements prometheus.Collector. +func (m *UsageReconcilerMonitor) Collect(ch chan<- prometheus.Metric) { + m.reconcileDuration.Collect(ch) + m.statusAge.Collect(ch) + m.assignedInstances.Collect(ch) +} diff --git a/tools/visualize-committed-resources/main.go b/tools/visualize-committed-resources/main.go index afa16a372..7db428d32 100644 --- a/tools/visualize-committed-resources/main.go +++ b/tools/visualize-committed-resources/main.go @@ -318,8 +318,9 @@ func printCommitments(crs []v1alpha1.CommittedResource) { }(), ) - if cr.Status.UsedAmount != nil { - fmt.Printf(" used=%-12s\n", cr.Status.UsedAmount.String()) + if mem, ok := cr.Status.UsedResources["memory"]; ok { + cpu := cr.Status.UsedResources["cpu"] + fmt.Printf(" used=%-12s usedCPU=%s\n", mem.String(), cpu.String()) } endStr := gray("no expiry")
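The monitor implements prometheus.Collector, so one plausible wiring in the manager setup looks like the fragment below. This is not shown in the diff: `usageConf`, `usageDB`, `mcl`, and `setupLog` are placeholders for whatever the existing manager bootstrap provides, and the metrics registry is assumed to be controller-runtime's `sigs.k8s.io/controller-runtime/pkg/metrics`.

```go
// Sketch only; identifiers other than the types from this change are placeholders.
usageMonitor := commitments.NewUsageReconcilerMonitor()
metrics.Registry.MustRegister(&usageMonitor)

if err := (&commitments.UsageReconciler{
	Client:  mgr.GetClient(),
	Conf:    usageConf, // assumed config carrying CooldownInterval
	UsageDB: usageDB,   // assumed Postgres-backed UsageDBClient
	Monitor: usageMonitor,
}).SetupWithManager(mgr, mcl); err != nil {
	setupLog.Error(err, "unable to set up controller", "controller", "committed-resource-usage")
	os.Exit(1)
}
```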