From e7ad5a17d2adcfdf10f81c290a3e7a334b9aa035 Mon Sep 17 00:00:00 2001 From: Philipp Matthes Date: Mon, 4 May 2026 07:41:03 +0200 Subject: [PATCH 1/3] Initial implementation of allocations --- internal/shim/placement/field_index.go | 25 + internal/shim/placement/field_index_test.go | 59 ++ internal/shim/placement/handle_allocations.go | 684 +++++++++++++++++- .../shim/placement/handle_allocations_e2e.go | 551 ++++++++++---- .../shim/placement/handle_allocations_test.go | 396 ++++++++-- .../handle_resource_providers_test.go | 15 + 6 files changed, 1504 insertions(+), 226 deletions(-) diff --git a/internal/shim/placement/field_index.go b/internal/shim/placement/field_index.go index 23bf1470c..2d20d1e89 100644 --- a/internal/shim/placement/field_index.go +++ b/internal/shim/placement/field_index.go @@ -26,6 +26,10 @@ const ( // by their metadata.name field, which represents the name of the hypervisor // in Kubernetes. idxHypervisorName = "metadata.name" + // idxBookingConsumerUUID is the name of the index for looking up + // hypervisors by the consumer UUIDs in their spec.bookings entries. + // A single hypervisor may index multiple consumer UUIDs. + idxBookingConsumerUUID = "spec.bookings.consumer.uuid" ) // IndexFields indexes all fields that are needed by the shim to quickly @@ -77,5 +81,26 @@ func IndexFields(ctx context.Context, mcl *multicluster.Client) error { } log.Info("Successfully set up index for hypervisor name") + if err := mcl.IndexField(ctx, h, hl, idxBookingConsumerUUID, func(obj client.Object) []string { + hv, ok := obj.(*hv1.Hypervisor) + if !ok { + log.Error(errors.New("unexpected type"), "object", obj) + return nil + } + consumers := hv1.GetConsumers(hv.Spec.Bookings) + if len(consumers) == 0 { + return nil + } + uuids := make([]string, 0, len(consumers)) + for _, c := range consumers { + uuids = append(uuids, c.UUID) + } + return uuids + }); err != nil { + log.Error(err, "failed to set up index for booking consumer UUID") + return err + } + log.Info("Successfully set up index for booking consumer UUID") + return nil } diff --git a/internal/shim/placement/field_index_test.go b/internal/shim/placement/field_index_test.go index 8db1f2169..5987fe8d2 100644 --- a/internal/shim/placement/field_index_test.go +++ b/internal/shim/placement/field_index_test.go @@ -118,6 +118,7 @@ func TestIndexFields_RegistersAllIndexes(t *testing.T) { idxHypervisorOpenStackId, idxHypervisorKubernetesId, idxHypervisorName, + idxBookingConsumerUUID, } if len(cc.calls) != len(wantFields) { t.Fatalf("got %d IndexField calls, want %d", len(cc.calls), len(wantFields)) @@ -284,3 +285,61 @@ func strSliceEqual(a, b []string) bool { } return true } + +func TestExtractor_BookingConsumerUUID(t *testing.T) { + cc := &captureCache{} + mcl := buildClient(t, cc) + if err := IndexFields(context.Background(), mcl); err != nil { + t.Fatalf("IndexFields: %v", err) + } + fn := extractorByField(t, cc.calls, idxBookingConsumerUUID) + + tests := []struct { + name string + obj client.Object + want []string + }{ + { + name: "hypervisor with consumer bookings", + obj: &hv1.Hypervisor{ + Spec: hv1.HypervisorSpec{ + Bookings: []hv1.Booking{ + {Consumer: &hv1.ConsumerBooking{UUID: "consumer-aaa"}}, + {Consumer: &hv1.ConsumerBooking{UUID: "consumer-bbb"}}, + {Reservation: &hv1.ReservationBooking{Name: "nova-reserved"}}, + }, + }, + }, + want: []string{"consumer-aaa", "consumer-bbb"}, + }, + { + name: "hypervisor with no bookings", + obj: &hv1.Hypervisor{}, + want: nil, + }, + { + name: "hypervisor with only reservation bookings", + obj: &hv1.Hypervisor{ + Spec: hv1.HypervisorSpec{ + Bookings: []hv1.Booking{ + {Reservation: &hv1.ReservationBooking{Name: "nova-reserved"}}, + }, + }, + }, + want: nil, + }, + { + name: "wrong type", + obj: &corev1.ConfigMap{}, + want: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := fn(tt.obj) + if !strSliceEqual(got, tt.want) { + t.Errorf("got %v, want %v", got, tt.want) + } + }) + } +} diff --git a/internal/shim/placement/handle_allocations.go b/internal/shim/placement/handle_allocations.go index f909cc5a7..a5bf35c37 100644 --- a/internal/shim/placement/handle_allocations.go +++ b/internal/shim/placement/handle_allocations.go @@ -4,9 +4,83 @@ package placement import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" "net/http" + + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + "k8s.io/apimachinery/pkg/api/resource" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" ) +// allocationsRequest is the JSON body for PUT /allocations/{consumer_uuid} +// (microversion 1.28+). +type allocationsRequest struct { + Allocations map[string]allocationEntry `json:"allocations"` + ConsumerGeneration *int64 `json:"consumer_generation"` + ProjectID string `json:"project_id"` + UserID string `json:"user_id"` + ConsumerType string `json:"consumer_type,omitempty"` +} + +// allocationEntry represents a single resource provider's allocation within +// a consumer's allocation set. +type allocationEntry struct { + Resources map[string]int64 `json:"resources"` +} + +// allocationsResponse is the JSON body returned by +// GET /allocations/{consumer_uuid} (microversion 1.28+). +// +// https://docs.openstack.org/api-ref/placement/#list-allocations +type allocationsResponse struct { + Allocations map[string]allocationEntry `json:"allocations"` + ConsumerGeneration *int64 `json:"consumer_generation,omitempty"` + ProjectID string `json:"project_id,omitempty"` + UserID string `json:"user_id,omitempty"` +} + +// placementToHVResources converts Placement resource class amounts to +// Hypervisor CRD resource quantities. +// VCPU → cpu (1:1), MEMORY_MB → memory (MB→bytes), DISK_GB → disk (GB→bytes). +func placementToHVResources(resources map[string]int64) map[hv1.ResourceName]resource.Quantity { + out := make(map[hv1.ResourceName]resource.Quantity, len(resources)) + for name, amount := range resources { + switch name { + case "VCPU": + out[hv1.ResourceCPU] = *resource.NewQuantity(amount, resource.DecimalSI) + case "MEMORY_MB": + out[hv1.ResourceMemory] = *resource.NewQuantity(amount*1024*1024, resource.BinarySI) + default: + out[hv1.ResourceName(name)] = *resource.NewQuantity(amount, resource.DecimalSI) + } + } + return out +} + +// hvToPlacementResources converts Hypervisor CRD resource quantities back to +// Placement resource class amounts. +func hvToPlacementResources(resources map[hv1.ResourceName]resource.Quantity) map[string]int64 { + out := make(map[string]int64, len(resources)) + for name, qty := range resources { + switch name { + case hv1.ResourceCPU: + out["VCPU"] = qty.Value() + case hv1.ResourceMemory: + out["MEMORY_MB"] = qty.Value() / (1024 * 1024) + default: + out[string(name)] = qty.Value() + } + } + return out +} + // HandleManageAllocations handles POST /allocations requests. // // Atomically creates, updates, or deletes allocations for multiple consumers @@ -22,8 +96,19 @@ import ( // (e.g. INSTANCE, MIGRATION) is supported. Returns 204 No Content on // success, or 409 Conflict if inventory is insufficient or a concurrent // update is detected (error code: placement.concurrent_update). +// +// https://docs.openstack.org/api-ref/placement/#manage-allocations func (s *Shim) HandleManageAllocations(w http.ResponseWriter, r *http.Request) { - s.dispatchPassthroughOnly(w, r, s.config.Features.Allocations) + switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) { + case FeatureModePassthrough: + s.forward(w, r) + case FeatureModeHybrid: + s.manageAllocationsHybrid(w, r) + case FeatureModeCRD: + s.manageAllocationsCRD(w, r) + default: + http.Error(w, "unknown feature mode", http.StatusInternalServerError) + } } // HandleListAllocations handles GET /allocations/{consumer_uuid} requests. @@ -37,11 +122,23 @@ func (s *Shim) HandleManageAllocations(w http.ResponseWriter, r *http.Request) { // added at 1.12, consumer_generation at 1.28, and consumer_type at 1.38. // The consumer_generation and consumer_type fields are absent when the // consumer has no allocations. +// +// https://docs.openstack.org/api-ref/placement/#list-allocations func (s *Shim) HandleListAllocations(w http.ResponseWriter, r *http.Request) { - if _, ok := requiredUUIDPathParam(w, r, "consumer_uuid"); !ok { + consumerUUID, ok := requiredUUIDPathParam(w, r, "consumer_uuid") + if !ok { return } - s.dispatchPassthroughOnly(w, r, s.config.Features.Allocations) + switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) { + case FeatureModePassthrough: + s.forward(w, r) + case FeatureModeHybrid: + s.listAllocationsHybrid(w, r, consumerUUID) + case FeatureModeCRD: + s.listAllocationsCRD(w, r, consumerUUID) + default: + http.Error(w, "unknown feature mode", http.StatusInternalServerError) + } } // HandleUpdateAllocations handles PUT /allocations/{consumer_uuid} requests. @@ -55,11 +152,23 @@ func (s *Shim) HandleListAllocations(w http.ResponseWriter, r *http.Request) { // // Returns 204 No Content on success. Returns 409 Conflict if there is // insufficient inventory or if a concurrent update was detected. +// +// https://docs.openstack.org/api-ref/placement/#update-allocations func (s *Shim) HandleUpdateAllocations(w http.ResponseWriter, r *http.Request) { - if _, ok := requiredUUIDPathParam(w, r, "consumer_uuid"); !ok { + consumerUUID, ok := requiredUUIDPathParam(w, r, "consumer_uuid") + if !ok { return } - s.dispatchPassthroughOnly(w, r, s.config.Features.Allocations) + switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) { + case FeatureModePassthrough: + s.forward(w, r) + case FeatureModeHybrid: + s.updateAllocationsHybrid(w, r, consumerUUID) + case FeatureModeCRD: + s.updateAllocationsCRD(w, r, consumerUUID) + default: + http.Error(w, "unknown feature mode", http.StatusInternalServerError) + } } // HandleDeleteAllocations handles DELETE /allocations/{consumer_uuid} requests. @@ -67,9 +176,570 @@ func (s *Shim) HandleUpdateAllocations(w http.ResponseWriter, r *http.Request) { // Removes all allocation records for the consumer across all resource // providers. Returns 204 No Content on success, or 404 Not Found if the // consumer has no existing allocations. +// +// https://docs.openstack.org/api-ref/placement/#delete-allocations func (s *Shim) HandleDeleteAllocations(w http.ResponseWriter, r *http.Request) { - if _, ok := requiredUUIDPathParam(w, r, "consumer_uuid"); !ok { + consumerUUID, ok := requiredUUIDPathParam(w, r, "consumer_uuid") + if !ok { + return + } + switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) { + case FeatureModePassthrough: + s.forward(w, r) + case FeatureModeHybrid: + s.deleteAllocationsHybrid(w, r, consumerUUID) + case FeatureModeCRD: + s.deleteAllocationsCRD(w, r, consumerUUID) + default: + http.Error(w, "unknown feature mode", http.StatusInternalServerError) + } +} + +// --------------------------------------------------------------------------- +// PUT /allocations/{consumer_uuid} — hybrid and crd +// --------------------------------------------------------------------------- + +func (s *Shim) updateAllocationsHybrid(w http.ResponseWriter, r *http.Request, consumerUUID string) { + ctx := r.Context() + log := logf.FromContext(ctx) + + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + log.Error(err, "failed to read request body") + http.Error(w, "failed to read request body", http.StatusBadRequest) + return + } + + var req allocationsRequest + if err := json.Unmarshal(bodyBytes, &req); err != nil { + http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest) + return + } + + kvmAllocs := make(map[string]allocationEntry) + nonKvmAllocs := make(map[string]allocationEntry) + kvmHypervisors := make(map[string]*hv1.Hypervisor) + + for rpUUID, entry := range req.Allocations { + var hvs hv1.HypervisorList + if err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: rpUUID}); err == nil && len(hvs.Items) == 1 { + kvmAllocs[rpUUID] = entry + hv := hvs.Items[0] + kvmHypervisors[rpUUID] = &hv + } else { + nonKvmAllocs[rpUUID] = entry + } + } + + // Forward non-KVM allocations to upstream first. + if len(nonKvmAllocs) > 0 { + upstreamReq := allocationsRequest{ + Allocations: nonKvmAllocs, + ConsumerGeneration: req.ConsumerGeneration, + ProjectID: req.ProjectID, + UserID: req.UserID, + ConsumerType: req.ConsumerType, + } + upstreamBody, err := json.Marshal(upstreamReq) + if err != nil { + log.Error(err, "failed to marshal upstream request") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + r.Body = io.NopCloser(bytes.NewReader(upstreamBody)) + rec := &statusRecorder{ResponseWriter: w, header: make(http.Header)} + s.forward(rec, r) + if rec.statusCode >= 300 { + // Upstream failed — copy its response to the client. + for k, vs := range rec.header { + for _, v := range vs { + w.Header().Add(k, v) + } + } + w.WriteHeader(rec.statusCode) + w.Write(rec.body.Bytes()) //nolint:errcheck + return + } + } + + // Write KVM allocations to CRD. + if err := s.writeKVMBookings(ctx, consumerUUID, &req, kvmAllocs, kvmHypervisors); err != nil { + log.Error(err, "failed to write KVM bookings") + if apierrors.IsConflict(err) { + http.Error(w, "consumer generation conflict", http.StatusConflict) + return + } + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +func (s *Shim) updateAllocationsCRD(w http.ResponseWriter, r *http.Request, consumerUUID string) { + ctx := r.Context() + log := logf.FromContext(ctx) + + var req allocationsRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest) + return + } + + kvmAllocs := make(map[string]allocationEntry) + kvmHypervisors := make(map[string]*hv1.Hypervisor) + + for rpUUID, entry := range req.Allocations { + var hvs hv1.HypervisorList + if err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: rpUUID}); err != nil || len(hvs.Items) != 1 { + log.Info("resource provider not found in CRD (crd mode)", "rpUUID", rpUUID) + http.Error(w, fmt.Sprintf("resource provider %s not found", rpUUID), http.StatusBadRequest) + return + } + kvmAllocs[rpUUID] = entry + hv := hvs.Items[0] + kvmHypervisors[rpUUID] = &hv + } + + if err := s.writeKVMBookings(ctx, consumerUUID, &req, kvmAllocs, kvmHypervisors); err != nil { + log.Error(err, "failed to write KVM bookings") + if apierrors.IsConflict(err) { + http.Error(w, "consumer generation conflict", http.StatusConflict) + return + } + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +// writeKVMBookings writes consumer bookings to the respective Hypervisor CRs. +// It performs consumer generation checks before writing. +func (s *Shim) writeKVMBookings( + ctx context.Context, + consumerUUID string, + req *allocationsRequest, + kvmAllocs map[string]allocationEntry, + kvmHypervisors map[string]*hv1.Hypervisor, +) error { + log := logf.FromContext(ctx) + + hvGR := schema.GroupResource{Group: "kvm.cloud.sap", Resource: "hypervisors"} + for rpUUID, entry := range kvmAllocs { + hv := kvmHypervisors[rpUUID] + existing := hv1.GetConsumer(hv.Spec.Bookings, consumerUUID) + + // Consumer generation check. + if existing == nil && req.ConsumerGeneration != nil { + return apierrors.NewConflict(hvGR, hv.Name, + fmt.Errorf("consumer %s does not exist but generation is non-null", consumerUUID)) + } + if existing != nil && req.ConsumerGeneration == nil { + return apierrors.NewConflict(hvGR, hv.Name, + fmt.Errorf("consumer %s exists but generation is null", consumerUUID)) + } + if existing != nil && req.ConsumerGeneration != nil && existing.ConsumerGeneration != nil { + if *req.ConsumerGeneration != *existing.ConsumerGeneration { + return apierrors.NewConflict(hvGR, hv.Name, + fmt.Errorf("consumer %s generation mismatch: got %d, want %d", consumerUUID, *req.ConsumerGeneration, *existing.ConsumerGeneration)) + } + } + + // Compute new generation. + var newGen int64 + if existing != nil && existing.ConsumerGeneration != nil { + newGen = *existing.ConsumerGeneration + 1 + } else { + newGen = 1 + } + + newBooking := hv1.Booking{ + Consumer: &hv1.ConsumerBooking{ + UUID: consumerUUID, + Resources: placementToHVResources(entry.Resources), + ConsumerGeneration: &newGen, + ConsumerType: req.ConsumerType, + ProjectID: req.ProjectID, + UserID: req.UserID, + }, + } + + // Replace or append. + var newBookings []hv1.Booking + replaced := false + for _, b := range hv.Spec.Bookings { + if b.Consumer != nil && b.Consumer.UUID == consumerUUID { + newBookings = append(newBookings, newBooking) + replaced = true + } else { + newBookings = append(newBookings, b) + } + } + if !replaced { + newBookings = append(newBookings, newBooking) + } + hv.Spec.Bookings = newBookings + + if err := s.Update(ctx, hv); err != nil { + log.Error(err, "failed to update hypervisor bookings", "hypervisor", hv.Name, "consumer", consumerUUID) + return err + } + log.Info("wrote consumer booking to hypervisor", "hypervisor", hv.Name, "consumer", consumerUUID, "rpUUID", rpUUID) + } + return nil +} + +// --------------------------------------------------------------------------- +// GET /allocations/{consumer_uuid} — hybrid and crd +// --------------------------------------------------------------------------- + +func (s *Shim) listAllocationsHybrid(w http.ResponseWriter, r *http.Request, consumerUUID string) { + ctx := r.Context() + log := logf.FromContext(ctx) + + s.forwardWithHook(w, r, func(w http.ResponseWriter, resp *http.Response) { + var upstreamResp allocationsResponse + if resp.StatusCode == http.StatusOK { + if err := json.NewDecoder(resp.Body).Decode(&upstreamResp); err != nil { + log.Error(err, "failed to decode upstream allocations response") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + } else { + // If upstream returned non-200, initialize empty. + upstreamResp.Allocations = make(map[string]allocationEntry) + } + if upstreamResp.Allocations == nil { + upstreamResp.Allocations = make(map[string]allocationEntry) + } + + // Look up consumer in CRD. + var hvs hv1.HypervisorList + if err := s.List(ctx, &hvs, client.MatchingFields{idxBookingConsumerUUID: consumerUUID}); err != nil && !apierrors.IsNotFound(err) { + log.Error(err, "failed to look up consumer in CRD") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + // Merge CRD bookings into the response. CRD takes precedence on collision. + for _, hv := range hvs.Items { + consumer := hv1.GetConsumer(hv.Spec.Bookings, consumerUUID) + if consumer == nil { + continue + } + rpUUID := hv.Status.HypervisorID + upstreamResp.Allocations[rpUUID] = allocationEntry{ + Resources: hvToPlacementResources(consumer.Resources), + } + if upstreamResp.ConsumerGeneration == nil { + upstreamResp.ConsumerGeneration = consumer.ConsumerGeneration + } + if upstreamResp.ProjectID == "" { + upstreamResp.ProjectID = consumer.ProjectID + } + if upstreamResp.UserID == "" { + upstreamResp.UserID = consumer.UserID + } + } + + s.writeJSON(w, http.StatusOK, upstreamResp) + }) +} + +func (s *Shim) listAllocationsCRD(w http.ResponseWriter, r *http.Request, consumerUUID string) { + ctx := r.Context() + log := logf.FromContext(ctx) + + var hvs hv1.HypervisorList + if err := s.List(ctx, &hvs, client.MatchingFields{idxBookingConsumerUUID: consumerUUID}); err != nil && !apierrors.IsNotFound(err) { + log.Error(err, "failed to look up consumer in CRD") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } - s.dispatchPassthroughOnly(w, r, s.config.Features.Allocations) + + resp := allocationsResponse{ + Allocations: make(map[string]allocationEntry), + } + + for _, hv := range hvs.Items { + consumer := hv1.GetConsumer(hv.Spec.Bookings, consumerUUID) + if consumer == nil { + continue + } + rpUUID := hv.Status.HypervisorID + resp.Allocations[rpUUID] = allocationEntry{ + Resources: hvToPlacementResources(consumer.Resources), + } + resp.ConsumerGeneration = consumer.ConsumerGeneration + resp.ProjectID = consumer.ProjectID + resp.UserID = consumer.UserID + } + + s.writeJSON(w, http.StatusOK, resp) +} + +// --------------------------------------------------------------------------- +// DELETE /allocations/{consumer_uuid} — hybrid and crd +// --------------------------------------------------------------------------- + +func (s *Shim) deleteAllocationsHybrid(w http.ResponseWriter, r *http.Request, consumerUUID string) { + ctx := r.Context() + log := logf.FromContext(ctx) + + // Forward to upstream first. + rec := &statusRecorder{ResponseWriter: w, header: make(http.Header)} + s.forward(rec, r) + // Upstream returning 404 is acceptable — the consumer may only exist in CRD. + if rec.statusCode >= 300 && rec.statusCode != http.StatusNotFound { + for k, vs := range rec.header { + for _, v := range vs { + w.Header().Add(k, v) + } + } + w.WriteHeader(rec.statusCode) + w.Write(rec.body.Bytes()) //nolint:errcheck + return + } + + // Remove from CRD. + if err := s.removeConsumerFromCRD(ctx, consumerUUID); err != nil { + log.Error(err, "failed to remove consumer from CRD") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +func (s *Shim) deleteAllocationsCRD(w http.ResponseWriter, r *http.Request, consumerUUID string) { + ctx := r.Context() + log := logf.FromContext(ctx) + + var hvs hv1.HypervisorList + if err := s.List(ctx, &hvs, client.MatchingFields{idxBookingConsumerUUID: consumerUUID}); err != nil && !apierrors.IsNotFound(err) { + log.Error(err, "failed to look up consumer in CRD") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + if len(hvs.Items) == 0 { + http.Error(w, "consumer not found", http.StatusNotFound) + return + } + + if err := s.removeConsumerFromCRD(ctx, consumerUUID); err != nil { + log.Error(err, "failed to remove consumer from CRD") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +// removeConsumerFromCRD removes all booking entries for a consumer from all +// Hypervisor CRs that hold it. +func (s *Shim) removeConsumerFromCRD(ctx context.Context, consumerUUID string) error { + log := logf.FromContext(ctx) + + var hvs hv1.HypervisorList + if err := s.List(ctx, &hvs, client.MatchingFields{idxBookingConsumerUUID: consumerUUID}); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return err + } + + for i := range hvs.Items { + hv := &hvs.Items[i] + var newBookings []hv1.Booking + for _, b := range hv.Spec.Bookings { + if b.Consumer != nil && b.Consumer.UUID == consumerUUID { + continue + } + newBookings = append(newBookings, b) + } + hv.Spec.Bookings = newBookings + if err := s.Update(ctx, hv); err != nil { + log.Error(err, "failed to update hypervisor after removing consumer", "hypervisor", hv.Name, "consumer", consumerUUID) + return err + } + log.Info("removed consumer booking from hypervisor", "hypervisor", hv.Name, "consumer", consumerUUID) + } + return nil +} + +// --------------------------------------------------------------------------- +// POST /allocations — hybrid and crd +// --------------------------------------------------------------------------- + +// manageAllocationsRequest represents the batch body for POST /allocations. +// It is keyed by consumer UUID. +type manageAllocationsRequest map[string]allocationsRequest + +func (s *Shim) manageAllocationsHybrid(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + log := logf.FromContext(ctx) + + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + log.Error(err, "failed to read request body") + http.Error(w, "failed to read request body", http.StatusBadRequest) + return + } + + var batch manageAllocationsRequest + if err := json.Unmarshal(bodyBytes, &batch); err != nil { + http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest) + return + } + + // Separate KVM from non-KVM across all consumers. + type kvmWork struct { + consumerUUID string + req *allocationsRequest + kvmAllocs map[string]allocationEntry + kvmHypervisors map[string]*hv1.Hypervisor + } + var kvmWorkItems []kvmWork + nonKvmBatch := make(manageAllocationsRequest) + + for consumerUUID, consumerReq := range batch { + kvmA := make(map[string]allocationEntry) + nonKvmA := make(map[string]allocationEntry) + kvmHVs := make(map[string]*hv1.Hypervisor) + + for rpUUID, entry := range consumerReq.Allocations { + var hvs hv1.HypervisorList + if err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: rpUUID}); err == nil && len(hvs.Items) == 1 { + kvmA[rpUUID] = entry + hv := hvs.Items[0] + kvmHVs[rpUUID] = &hv + } else { + nonKvmA[rpUUID] = entry + } + } + + if len(nonKvmA) > 0 { + nonKvmBatch[consumerUUID] = allocationsRequest{ + Allocations: nonKvmA, + ConsumerGeneration: consumerReq.ConsumerGeneration, + ProjectID: consumerReq.ProjectID, + UserID: consumerReq.UserID, + ConsumerType: consumerReq.ConsumerType, + } + } + if len(kvmA) > 0 { + cr := consumerReq + kvmWorkItems = append(kvmWorkItems, kvmWork{ + consumerUUID: consumerUUID, + req: &cr, + kvmAllocs: kvmA, + kvmHypervisors: kvmHVs, + }) + } + } + + // Forward non-KVM portion to upstream first. + if len(nonKvmBatch) > 0 { + upstreamBody, err := json.Marshal(nonKvmBatch) + if err != nil { + log.Error(err, "failed to marshal upstream batch request") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + r.Body = io.NopCloser(bytes.NewReader(upstreamBody)) + rec := &statusRecorder{ResponseWriter: w, header: make(http.Header)} + s.forward(rec, r) + if rec.statusCode >= 300 { + for k, vs := range rec.header { + for _, v := range vs { + w.Header().Add(k, v) + } + } + w.WriteHeader(rec.statusCode) + w.Write(rec.body.Bytes()) //nolint:errcheck + return + } + } + + // Write KVM bookings. + for _, work := range kvmWorkItems { + if err := s.writeKVMBookings(ctx, work.consumerUUID, work.req, work.kvmAllocs, work.kvmHypervisors); err != nil { + log.Error(err, "failed to write KVM bookings in batch", "consumer", work.consumerUUID) + if apierrors.IsConflict(err) { + http.Error(w, "consumer generation conflict", http.StatusConflict) + return + } + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + } + + w.WriteHeader(http.StatusNoContent) +} + +func (s *Shim) manageAllocationsCRD(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + log := logf.FromContext(ctx) + + var batch manageAllocationsRequest + if err := json.NewDecoder(r.Body).Decode(&batch); err != nil { + http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest) + return + } + + for consumerUUID, consumerReq := range batch { + kvmAllocs := make(map[string]allocationEntry) + kvmHypervisors := make(map[string]*hv1.Hypervisor) + + for rpUUID, entry := range consumerReq.Allocations { + var hvs hv1.HypervisorList + if err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: rpUUID}); err != nil || len(hvs.Items) != 1 { + log.Info("resource provider not found in CRD (crd mode)", "rpUUID", rpUUID) + http.Error(w, fmt.Sprintf("resource provider %s not found", rpUUID), http.StatusBadRequest) + return + } + kvmAllocs[rpUUID] = entry + hv := hvs.Items[0] + kvmHypervisors[rpUUID] = &hv + } + + cr := consumerReq + if err := s.writeKVMBookings(ctx, consumerUUID, &cr, kvmAllocs, kvmHypervisors); err != nil { + log.Error(err, "failed to write KVM bookings in batch", "consumer", consumerUUID) + if apierrors.IsConflict(err) { + http.Error(w, "consumer generation conflict", http.StatusConflict) + return + } + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + } + + w.WriteHeader(http.StatusNoContent) +} + +// --------------------------------------------------------------------------- +// statusRecorder captures the upstream response so we can inspect status before +// committing to write the real response. +// --------------------------------------------------------------------------- + +type statusRecorder struct { + http.ResponseWriter + statusCode int + header http.Header + body bytes.Buffer +} + +func (r *statusRecorder) Header() http.Header { + return r.header +} + +func (r *statusRecorder) WriteHeader(code int) { + r.statusCode = code +} + +func (r *statusRecorder) Write(b []byte) (int, error) { + return r.body.Write(b) } diff --git a/internal/shim/placement/handle_allocations_e2e.go b/internal/shim/placement/handle_allocations_e2e.go index 27887ca7f..cb8cf3244 100644 --- a/internal/shim/placement/handle_allocations_e2e.go +++ b/internal/shim/placement/handle_allocations_e2e.go @@ -9,8 +9,12 @@ import ( "encoding/json" "fmt" "net/http" + "time" "github.com/cobaltcore-dev/cortex/pkg/conf" + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + "github.com/gophercloud/gophercloud/v2" + apierrors "k8s.io/apimachinery/pkg/api/errors" "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -18,20 +22,11 @@ import ( // e2eTestAllocations tests the /allocations/{consumer_uuid} and // POST /allocations (batch) endpoints. // -// 1. Pre-cleanup: DELETE leftover consumer allocations, RP, and custom RC. -// 2. Create fixtures: PUT a custom resource class, POST a test RP, PUT -// inventory on the RP (total=100). -// 3. GET /allocations/{consumer1} — verify allocations are empty. -// 4. PUT /allocations/{consumer1} — create an allocation of 10 units against -// the test RP using fake project/user IDs. -// 5. GET /allocations/{consumer1} — verify the allocation exists and points -// to the test RP. -// 6. POST /allocations — batch-create a second consumer's allocation of -// 5 units against the same RP. -// 7. GET /allocations/{consumer2} — verify the second allocation exists. -// 8. DELETE /allocations/{consumer} — remove allocations for both consumers. -// 9. Cleanup: DELETE the test RP and custom resource class. -func e2eTestAllocations(ctx context.Context, _ client.Client) error { +// In passthrough mode: exercises the upstream placement path with a +// dynamically created resource provider and custom resource class. +// In hybrid/crd mode: exercises the CRD-backed booking path using a +// real KVM hypervisor discovered from the cluster. +func e2eTestAllocations(ctx context.Context, cl client.Client) error { log := logf.FromContext(ctx) log.Info("Running allocations endpoint e2e test") config, err := conf.GetConfig[e2eRootConfig]() @@ -47,6 +42,20 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error { } log.Info("Successfully created openstack client for allocations e2e test") + mode := e2eCurrentMode(ctx) + switch mode { + case FeatureModePassthrough: + return e2ePassthroughAllocations(ctx, sc) + case FeatureModeHybrid, FeatureModeCRD: + return e2eCRDAllocations(ctx, sc, cl) + default: + return fmt.Errorf("unexpected mode %q", mode) + } +} + +func e2ePassthroughAllocations(ctx context.Context, sc *gophercloud.ServiceClient) error { + log := logf.FromContext(ctx) + const testRPUUID = "e2e10000-0000-0000-0000-000000000007" const testRPName = "cortex-e2e-test-rp-alloc" const testRC = "CUSTOM_CORTEX_E2E_ALLOC_RC" @@ -56,15 +65,6 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error { const userID = "e2e50000-0000-0000-0000-000000000001" const apiVersion = "placement 1.28" - // Probe: for non-passthrough modes, verify endpoint returns 501. - unimplemented, err := e2eProbeUnimplemented(ctx, sc, sc.Endpoint+"/allocations/"+consumerUUID1) - if err != nil { - return fmt.Errorf("probe: %w", err) - } - if unimplemented { - return nil - } - // Pre-cleanup: delete allocations, resource provider, and resource class. log.Info("Pre-cleanup: deleting leftover test resources") for _, cleanup := range []struct { @@ -97,38 +97,28 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error { req, err := http.NewRequestWithContext(ctx, http.MethodPut, sc.Endpoint+"/resource_classes/"+testRC, http.NoBody) if err != nil { - log.Error(err, "failed to create PUT request for resource class", "class", testRC) return err } req.Header.Set("X-Auth-Token", sc.TokenID) req.Header.Set("OpenStack-API-Version", apiVersion) resp, err := sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send PUT request for resource class", "class", testRC) return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "PUT /resource_classes returned an error", "class", testRC) - return err + return fmt.Errorf("PUT /resource_classes: unexpected status %d", resp.StatusCode) } log.Info("Successfully created custom resource class", "class", testRC) - log.Info("Creating test resource provider for allocations test", - "uuid", testRPUUID, "name", testRPName) - body, err := json.Marshal(map[string]string{ - "name": testRPName, - "uuid": testRPUUID, - }) + log.Info("Creating test resource provider", "uuid", testRPUUID, "name", testRPName) + body, err := json.Marshal(map[string]string{"name": testRPName, "uuid": testRPUUID}) if err != nil { - log.Error(err, "failed to marshal request body") return err } req, err = http.NewRequestWithContext(ctx, http.MethodPost, sc.Endpoint+"/resource_providers", bytes.NewReader(body)) if err != nil { - log.Error(err, "failed to create POST request for resource_providers") return err } req.Header.Set("X-Auth-Token", sc.TokenID) @@ -137,23 +127,18 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error { req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send POST request to /resource_providers") return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "POST /resource_providers returned an error") - return err + return fmt.Errorf("POST /resource_providers: unexpected status %d", resp.StatusCode) } log.Info("Successfully created test resource provider", "uuid", testRPUUID) // Get the generation for the resource provider. - log.Info("Getting resource provider generation", "uuid", testRPUUID) req, err = http.NewRequestWithContext(ctx, http.MethodGet, sc.Endpoint+"/resource_providers/"+testRPUUID+"/inventories", http.NoBody) if err != nil { - log.Error(err, "failed to create GET request for RP inventories") return err } req.Header.Set("X-Auth-Token", sc.TokenID) @@ -161,28 +146,22 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error { req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send GET request for RP inventories") return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "GET RP inventories returned an error") - return err + return fmt.Errorf("GET RP inventories: unexpected status %d", resp.StatusCode) } var invResp struct { ResourceProviderGeneration int `json:"resource_provider_generation"` } - err = json.NewDecoder(resp.Body).Decode(&invResp) - if err != nil { - log.Error(err, "failed to decode RP inventories response") + if err := json.NewDecoder(resp.Body).Decode(&invResp); err != nil { return err } generation := invResp.ResourceProviderGeneration // Set inventory on the resource provider. - log.Info("Setting inventory on test resource provider", - "uuid", testRPUUID, "class", testRC, "total", 100) + log.Info("Setting inventory on test resource provider", "total", 100) putBody, err := json.Marshal(map[string]any{ "resource_provider_generation": generation, "inventories": map[string]any{ @@ -190,14 +169,12 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error { }, }) if err != nil { - log.Error(err, "failed to marshal request body") return err } req, err = http.NewRequestWithContext(ctx, http.MethodPut, sc.Endpoint+"/resource_providers/"+testRPUUID+"/inventories", bytes.NewReader(putBody)) if err != nil { - log.Error(err, "failed to create PUT request for RP inventories") return err } req.Header.Set("X-Auth-Token", sc.TokenID) @@ -206,23 +183,19 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error { req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send PUT request for RP inventories") return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "PUT RP inventories returned an error") - return err + return fmt.Errorf("PUT RP inventories: unexpected status %d", resp.StatusCode) } - log.Info("Successfully set inventory on test resource provider", "uuid", testRPUUID) + log.Info("Successfully set inventory on test resource provider") // Test GET /allocations/{consumer_uuid} (empty). - log.Info("Testing GET /allocations/{consumer_uuid} (empty)", "consumer", consumerUUID1) + log.Info("Testing GET /allocations (empty)", "consumer", consumerUUID1) req, err = http.NewRequestWithContext(ctx, http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID1, http.NoBody) if err != nil { - log.Error(err, "failed to create GET request for allocations", "consumer", consumerUUID1) return err } req.Header.Set("X-Auth-Token", sc.TokenID) @@ -230,50 +203,37 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error { req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send GET request for allocations", "consumer", consumerUUID1) return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "GET /allocations returned an error", "consumer", consumerUUID1) - return err + return fmt.Errorf("GET /allocations (empty): unexpected status %d", resp.StatusCode) } var allocResp struct { Allocations map[string]json.RawMessage `json:"allocations"` } - err = json.NewDecoder(resp.Body).Decode(&allocResp) - if err != nil { - log.Error(err, "failed to decode allocations response", "consumer", consumerUUID1) + if err := json.NewDecoder(resp.Body).Decode(&allocResp); err != nil { return err } - log.Info("Successfully retrieved empty allocations for consumer", - "consumer", consumerUUID1, "allocationCount", len(allocResp.Allocations)) + log.Info("Successfully retrieved empty allocations", "count", len(allocResp.Allocations)) // Test PUT /allocations/{consumer_uuid} (create allocation). - log.Info("Testing PUT /allocations/{consumer_uuid} to create allocation", - "consumer", consumerUUID1, "rp", testRPUUID, "amount", 10) + log.Info("Testing PUT /allocations (create)", "consumer", consumerUUID1) allocBody, err := json.Marshal(map[string]any{ "allocations": map[string]any{ - testRPUUID: map[string]any{ - "resources": map[string]int{ - testRC: 10, - }, - }, + testRPUUID: map[string]any{"resources": map[string]int{testRC: 10}}, }, "project_id": projectID, "user_id": userID, "consumer_generation": nil, }) if err != nil { - log.Error(err, "failed to marshal request body") return err } req, err = http.NewRequestWithContext(ctx, http.MethodPut, sc.Endpoint+"/allocations/"+consumerUUID1, bytes.NewReader(allocBody)) if err != nil { - log.Error(err, "failed to create PUT request for allocations", "consumer", consumerUUID1) return err } req.Header.Set("X-Auth-Token", sc.TokenID) @@ -282,25 +242,19 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error { req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send PUT request for allocations", "consumer", consumerUUID1) return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "PUT /allocations returned an error", "consumer", consumerUUID1) - return err + return fmt.Errorf("PUT /allocations (create): unexpected status %d", resp.StatusCode) } - log.Info("Successfully created allocation for consumer", - "consumer", consumerUUID1, "rp", testRPUUID) + log.Info("Successfully created allocation", "consumer", consumerUUID1) // Test GET /allocations/{consumer_uuid} (after PUT). - log.Info("Testing GET /allocations/{consumer_uuid} (after PUT)", - "consumer", consumerUUID1) + log.Info("Testing GET /allocations (after PUT)", "consumer", consumerUUID1) req, err = http.NewRequestWithContext(ctx, http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID1, http.NoBody) if err != nil { - log.Error(err, "failed to create GET request for allocations", "consumer", consumerUUID1) return err } req.Header.Set("X-Auth-Token", sc.TokenID) @@ -308,39 +262,26 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error { req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send GET request for allocations", "consumer", consumerUUID1) return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "GET /allocations returned an error", "consumer", consumerUUID1) - return err + return fmt.Errorf("GET /allocations (after PUT): unexpected status %d", resp.StatusCode) } - err = json.NewDecoder(resp.Body).Decode(&allocResp) - if err != nil { - log.Error(err, "failed to decode allocations response", "consumer", consumerUUID1) + if err := json.NewDecoder(resp.Body).Decode(&allocResp); err != nil { return err } if _, ok := allocResp.Allocations[testRPUUID]; !ok { - err := fmt.Errorf("expected allocation against RP %s", testRPUUID) - log.Error(err, "allocation not found", "consumer", consumerUUID1) - return err + return fmt.Errorf("expected allocation against RP %s, got keys %v", testRPUUID, allocResp.Allocations) } - log.Info("Successfully verified allocation for consumer", - "consumer", consumerUUID1, "allocationCount", len(allocResp.Allocations)) + log.Info("Verified allocation exists after PUT") // Test POST /allocations (batch manage) — create a second consumer. - log.Info("Testing POST /allocations (batch) to create second consumer allocation", - "consumer", consumerUUID2, "rp", testRPUUID, "amount", 5) + log.Info("Testing POST /allocations (batch)", "consumer", consumerUUID2) batchBody, err := json.Marshal(map[string]any{ consumerUUID2: map[string]any{ "allocations": map[string]any{ - testRPUUID: map[string]any{ - "resources": map[string]int{ - testRC: 5, - }, - }, + testRPUUID: map[string]any{"resources": map[string]int{testRC: 5}}, }, "project_id": projectID, "user_id": userID, @@ -348,14 +289,12 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error { }, }) if err != nil { - log.Error(err, "failed to marshal request body") return err } req, err = http.NewRequestWithContext(ctx, http.MethodPost, sc.Endpoint+"/allocations", bytes.NewReader(batchBody)) if err != nil { - log.Error(err, "failed to create POST request for batch allocations") return err } req.Header.Set("X-Auth-Token", sc.TokenID) @@ -364,24 +303,18 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error { req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send POST request for batch allocations") return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "POST /allocations returned an error") - return err + return fmt.Errorf("POST /allocations (batch): unexpected status %d", resp.StatusCode) } - log.Info("Successfully created batch allocation for second consumer", - "consumer", consumerUUID2) + log.Info("Successfully created batch allocation", "consumer", consumerUUID2) // Verify the second consumer's allocation. - log.Info("Verifying second consumer's allocation", "consumer", consumerUUID2) req, err = http.NewRequestWithContext(ctx, http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID2, http.NoBody) if err != nil { - log.Error(err, "failed to create GET request for allocations", "consumer", consumerUUID2) return err } req.Header.Set("X-Auth-Token", sc.TokenID) @@ -389,52 +322,39 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error { req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send GET request for allocations", "consumer", consumerUUID2) return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "GET /allocations returned an error", "consumer", consumerUUID2) - return err + return fmt.Errorf("GET /allocations (consumer2): unexpected status %d", resp.StatusCode) } - err = json.NewDecoder(resp.Body).Decode(&allocResp) - if err != nil { - log.Error(err, "failed to decode allocations response", "consumer", consumerUUID2) + if err := json.NewDecoder(resp.Body).Decode(&allocResp); err != nil { return err } if _, ok := allocResp.Allocations[testRPUUID]; !ok { - err := fmt.Errorf("expected allocation against RP %s", testRPUUID) - log.Error(err, "allocation not found for second consumer", "consumer", consumerUUID2) - return err + return fmt.Errorf("expected allocation for consumer2 against RP %s", testRPUUID) } - log.Info("Successfully verified second consumer's allocation", - "consumer", consumerUUID2) + log.Info("Verified second consumer's allocation") // Test DELETE /allocations/{consumer_uuid} for both consumers. for _, consumer := range []string{consumerUUID1, consumerUUID2} { - log.Info("Testing DELETE /allocations/{consumer_uuid}", "consumer", consumer) + log.Info("Testing DELETE /allocations", "consumer", consumer) req, err = http.NewRequestWithContext(ctx, http.MethodDelete, sc.Endpoint+"/allocations/"+consumer, http.NoBody) if err != nil { - log.Error(err, "failed to create DELETE request for allocations", "consumer", consumer) return err } req.Header.Set("X-Auth-Token", sc.TokenID) req.Header.Set("OpenStack-API-Version", apiVersion) resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send DELETE request for allocations", "consumer", consumer) return err } + resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { - resp.Body.Close() - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "DELETE /allocations returned an error", "consumer", consumer) - return err + return fmt.Errorf("DELETE /allocations: unexpected status %d for consumer %s", resp.StatusCode, consumer) } - resp.Body.Close() - log.Info("Successfully deleted allocation for consumer", "consumer", consumer) + log.Info("Successfully deleted allocation", "consumer", consumer) } // Cleanup: delete the resource provider and custom resource class. @@ -442,48 +362,379 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error { req, err = http.NewRequestWithContext(ctx, http.MethodDelete, sc.Endpoint+"/resource_providers/"+testRPUUID, http.NoBody) if err != nil { - log.Error(err, "failed to create DELETE request for resource provider", "uuid", testRPUUID) return err } req.Header.Set("X-Auth-Token", sc.TokenID) req.Header.Set("OpenStack-API-Version", apiVersion) resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send DELETE request for resource provider", "uuid", testRPUUID) + return err + } + resp.Body.Close() + + req, err = http.NewRequestWithContext(ctx, + http.MethodDelete, sc.Endpoint+"/resource_classes/"+testRC, http.NoBody) + if err != nil { + return err + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", apiVersion) + resp, err = sc.HTTPClient.Do(req) + if err != nil { + return err + } + resp.Body.Close() + log.Info("Cleanup complete") + + return nil +} + +// e2eCRDAllocations tests the CRD/hybrid path by discovering a real KVM +// hypervisor in the cluster, writing a booking to it, and then exercising +// GET/PUT/DELETE/POST through the shim's allocation handlers. +func e2eCRDAllocations(ctx context.Context, sc *gophercloud.ServiceClient, cl client.Client) error { + log := logf.FromContext(ctx) + + const consumerUUID = "e2e20000-0000-0000-0000-000000000010" + const consumerUUID2 = "e2e20000-0000-0000-0000-000000000011" + const projectID = "e2e40000-0000-0000-0000-000000000001" + const userID = "e2e50000-0000-0000-0000-000000000001" + const apiVersion = "placement 1.28" + + // Discover a KVM hypervisor with a non-empty OpenStack ID. + var hvs hv1.HypervisorList + if err := cl.List(ctx, &hvs); err != nil { + return fmt.Errorf("failed to list hypervisors: %w", err) + } + var kvmHV *hv1.Hypervisor + for i := range hvs.Items { + if hvs.Items[i].Status.HypervisorID != "" { + kvmHV = &hvs.Items[i] + break + } + } + if kvmHV == nil { + log.Info("No KVM hypervisors with OpenStack ID found, skipping CRD allocations tests") + return nil + } + kvmUUID := kvmHV.Status.HypervisorID + log.Info("Using KVM hypervisor for CRD allocations e2e", "uuid", kvmUUID, "name", kvmHV.Name) + + // Save original bookings for restoration. + originalBookings := kvmHV.Spec.Bookings + + // Always restore original bookings on exit. + defer func() { + log.Info("Restoring original bookings", "name", kvmHV.Name) + for range 5 { + if err := cl.Get(ctx, client.ObjectKeyFromObject(kvmHV), kvmHV); err != nil { + log.Error(err, "failed to refetch hypervisor for restoration") + return + } + kvmHV.Spec.Bookings = originalBookings + if err := cl.Update(ctx, kvmHV); err != nil { + if apierrors.IsConflict(err) { + continue + } + log.Error(err, "failed to restore original bookings") + return + } + return + } + log.Error(nil, "exhausted retries restoring original bookings") + }() + + // Pre-cleanup: remove any leftover test bookings from prior runs. + kvmHV.Spec.Bookings = removeTestBookings(kvmHV.Spec.Bookings, consumerUUID, consumerUUID2) + if err := cl.Update(ctx, kvmHV); err != nil { + return fmt.Errorf("pre-cleanup: failed to remove leftover bookings: %w", err) + } + if err := cl.Get(ctx, client.ObjectKeyFromObject(kvmHV), kvmHV); err != nil { + return fmt.Errorf("failed to refetch hypervisor after pre-cleanup: %w", err) + } + + // 1. Test GET /allocations/{consumer_uuid} — empty (consumer not booked). + log.Info("Testing GET /allocations (empty, CRD)", "consumer", consumerUUID) + if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) { + req, err := http.NewRequestWithContext(ctx, + http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID, http.NoBody) + if err != nil { + return false, err + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", apiVersion) + req.Header.Set("Accept", "application/json") + resp, err := sc.HTTPClient.Do(req) + if err != nil { + return false, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return false, fmt.Errorf("GET /allocations (empty): expected 200, got %d", resp.StatusCode) + } + var r allocationsResponse + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + return false, err + } + return len(r.Allocations) == 0, nil + }); err != nil { + return fmt.Errorf("GET empty allocations: %w", err) + } + log.Info("Verified empty allocations for unbooked consumer") + + // 2. Test PUT /allocations/{consumer_uuid} — create allocation (new consumer). + log.Info("Testing PUT /allocations (create, CRD)", "consumer", consumerUUID, "rp", kvmUUID) + allocBody, err := json.Marshal(map[string]any{ + "allocations": map[string]any{ + kvmUUID: map[string]any{"resources": map[string]int64{"VCPU": 2, "MEMORY_MB": 4096}}, + }, + "project_id": projectID, + "user_id": userID, + "consumer_generation": nil, + }) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, + http.MethodPut, sc.Endpoint+"/allocations/"+consumerUUID, + bytes.NewReader(allocBody)) + if err != nil { + return err + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", apiVersion) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + resp, err := sc.HTTPClient.Do(req) + if err != nil { return err } defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "DELETE resource provider returned an error", "uuid", testRPUUID) + if resp.StatusCode != http.StatusNoContent { + return fmt.Errorf("PUT /allocations (create): expected 204, got %d", resp.StatusCode) + } + log.Info("Successfully created allocation via PUT (CRD)") + + // 3. Test GET /allocations/{consumer_uuid} — verify booking present. + log.Info("Testing GET /allocations (after PUT, CRD)", "consumer", consumerUUID) + var getResp allocationsResponse + if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) { + req, err := http.NewRequestWithContext(ctx, + http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID, http.NoBody) + if err != nil { + return false, err + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", apiVersion) + req.Header.Set("Accept", "application/json") + resp, err := sc.HTTPClient.Do(req) + if err != nil { + return false, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return false, fmt.Errorf("GET /allocations (after PUT): expected 200, got %d", resp.StatusCode) + } + if err := json.NewDecoder(resp.Body).Decode(&getResp); err != nil { + return false, err + } + _, ok := getResp.Allocations[kvmUUID] + return ok, nil + }); err != nil { + return fmt.Errorf("GET allocations after PUT: %w (resp: %+v)", err, getResp) + } + if getResp.Allocations[kvmUUID].Resources["VCPU"] != 2 { + return fmt.Errorf("VCPU = %d, want 2", getResp.Allocations[kvmUUID].Resources["VCPU"]) + } + if getResp.Allocations[kvmUUID].Resources["MEMORY_MB"] != 4096 { + return fmt.Errorf("MEMORY_MB = %d, want 4096", getResp.Allocations[kvmUUID].Resources["MEMORY_MB"]) + } + log.Info("Verified allocation present after PUT (CRD)") + + // 4. Test PUT with wrong consumer_generation — should 409. + log.Info("Testing PUT /allocations (stale generation, CRD)") + staleBody, err := json.Marshal(map[string]any{ + "allocations": map[string]any{ + kvmUUID: map[string]any{"resources": map[string]int64{"VCPU": 8}}, + }, + "project_id": projectID, + "user_id": userID, + "consumer_generation": 999, + }) + if err != nil { return err } - log.Info("Successfully deleted test resource provider", "uuid", testRPUUID) + req, err = http.NewRequestWithContext(ctx, + http.MethodPut, sc.Endpoint+"/allocations/"+consumerUUID, + bytes.NewReader(staleBody)) + if err != nil { + return err + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", apiVersion) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + resp, err = sc.HTTPClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusConflict { + return fmt.Errorf("PUT /allocations (stale gen): expected 409, got %d", resp.StatusCode) + } + log.Info("Verified generation conflict returns 409 (CRD)") + // 5. Test POST /allocations (batch) — create a second consumer. + log.Info("Testing POST /allocations (batch, CRD)", "consumer", consumerUUID2) + batchBody, err := json.Marshal(map[string]any{ + consumerUUID2: map[string]any{ + "allocations": map[string]any{ + kvmUUID: map[string]any{"resources": map[string]int64{"VCPU": 1, "MEMORY_MB": 2048}}, + }, + "project_id": projectID, + "user_id": userID, + "consumer_generation": nil, + }, + }) + if err != nil { + return err + } req, err = http.NewRequestWithContext(ctx, - http.MethodDelete, sc.Endpoint+"/resource_classes/"+testRC, http.NoBody) + http.MethodPost, sc.Endpoint+"/allocations", + bytes.NewReader(batchBody)) if err != nil { - log.Error(err, "failed to create DELETE request for resource class", "class", testRC) return err } req.Header.Set("X-Auth-Token", sc.TokenID) req.Header.Set("OpenStack-API-Version", apiVersion) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send DELETE request for resource class", "class", testRC) return err } defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "DELETE resource class returned an error", "class", testRC) + if resp.StatusCode != http.StatusNoContent { + return fmt.Errorf("POST /allocations (batch): expected 204, got %d", resp.StatusCode) + } + log.Info("Successfully created batch allocation (CRD)") + + // Verify second consumer's allocation. + var getResp2 allocationsResponse + if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) { + req, err := http.NewRequestWithContext(ctx, + http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID2, http.NoBody) + if err != nil { + return false, err + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", apiVersion) + req.Header.Set("Accept", "application/json") + resp, err := sc.HTTPClient.Do(req) + if err != nil { + return false, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return false, fmt.Errorf("GET /allocations (consumer2): expected 200, got %d", resp.StatusCode) + } + if err := json.NewDecoder(resp.Body).Decode(&getResp2); err != nil { + return false, err + } + _, ok := getResp2.Allocations[kvmUUID] + return ok, nil + }); err != nil { + return fmt.Errorf("GET allocations (consumer2): %w", err) + } + log.Info("Verified second consumer's allocation (CRD)") + + // 6. Test DELETE /allocations/{consumer_uuid}. + for _, consumer := range []string{consumerUUID, consumerUUID2} { + log.Info("Testing DELETE /allocations (CRD)", "consumer", consumer) + req, err = http.NewRequestWithContext(ctx, + http.MethodDelete, sc.Endpoint+"/allocations/"+consumer, http.NoBody) + if err != nil { + return err + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", apiVersion) + resp, err = sc.HTTPClient.Do(req) + if err != nil { + return err + } + resp.Body.Close() + if resp.StatusCode != http.StatusNoContent { + return fmt.Errorf("DELETE /allocations: expected 204, got %d for consumer %s", resp.StatusCode, consumer) + } + log.Info("Successfully deleted allocation (CRD)", "consumer", consumer) + } + + // 7. Verify GET after DELETE returns empty. + if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) { + req, err := http.NewRequestWithContext(ctx, + http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID, http.NoBody) + if err != nil { + return false, err + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", apiVersion) + req.Header.Set("Accept", "application/json") + resp, err := sc.HTTPClient.Do(req) + if err != nil { + return false, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return false, fmt.Errorf("GET /allocations (post-delete): expected 200, got %d", resp.StatusCode) + } + var r allocationsResponse + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + return false, err + } + return len(r.Allocations) == 0, nil + }); err != nil { + return fmt.Errorf("GET allocations after DELETE: %w", err) + } + log.Info("Verified allocations empty after DELETE (CRD)") + + // 8. Test DELETE /allocations for unknown consumer — should 404. + unknownConsumer := "e2e20000-0000-0000-0000-ffffffffffff" + req, err = http.NewRequestWithContext(ctx, + http.MethodDelete, sc.Endpoint+"/allocations/"+unknownConsumer, http.NoBody) + if err != nil { + return err + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", apiVersion) + resp, err = sc.HTTPClient.Do(req) + if err != nil { return err } - log.Info("Successfully deleted custom resource class", "class", testRC) + resp.Body.Close() + if resp.StatusCode != http.StatusNotFound { + return fmt.Errorf("DELETE /allocations (unknown): expected 404, got %d", resp.StatusCode) + } + log.Info("Verified DELETE for unknown consumer returns 404 (CRD)") return nil } +// removeTestBookings removes consumer bookings matching any of the given UUIDs. +func removeTestBookings(bookings []hv1.Booking, uuids ...string) []hv1.Booking { + uuidSet := make(map[string]bool, len(uuids)) + for _, u := range uuids { + uuidSet[u] = true + } + var kept []hv1.Booking + for i := range bookings { + if bookings[i].Consumer != nil && uuidSet[bookings[i].Consumer.UUID] { + continue + } + kept = append(kept, bookings[i]) + } + return kept +} + func init() { e2eTests = append(e2eTests, e2eTest{name: "allocations", run: e2eWrapWithModes(e2eTestAllocations)}) } diff --git a/internal/shim/placement/handle_allocations_test.go b/internal/shim/placement/handle_allocations_test.go index e39d5c43a..e32be5747 100644 --- a/internal/shim/placement/handle_allocations_test.go +++ b/internal/shim/placement/handle_allocations_test.go @@ -4,10 +4,22 @@ package placement import ( + "encoding/json" "net/http" + "net/http/httptest" + "strings" "testing" + + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" ) +// --------------------------------------------------------------------------- +// Passthrough mode tests (unchanged behavior) +// --------------------------------------------------------------------------- + func TestHandleManageAllocations(t *testing.T) { var gotPath string s := newTestShim(t, http.StatusNoContent, "", &gotPath) @@ -77,84 +89,330 @@ func TestHandleDeleteAllocations(t *testing.T) { }) } -func TestHandleAllocations_HybridMode(t *testing.T) { - down, up := newTestTimers() - s := &Shim{ - config: config{ - PlacementURL: "http://should-not-be-called:1234", - Features: featuresConfig{Allocations: FeatureModeHybrid}, +// --------------------------------------------------------------------------- +// Helper to create a test shim with allocations feature mode set. +// --------------------------------------------------------------------------- + +const ( + testConsumerUUID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + testRPUUID = "11111111-2222-3333-4444-555555555555" + testRPUUID2 = "66666666-7777-8888-9999-aaaaaaaaaaaa" +) + +func testHypervisorWithBooking(name, openstackID, consumerUUID string, vcpu int64, memMB int64) *hv1.Hypervisor { + gen := int64(1) + return &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Status: hv1.HypervisorStatus{HypervisorID: openstackID}, + Spec: hv1.HypervisorSpec{ + Bookings: []hv1.Booking{ + {Consumer: &hv1.ConsumerBooking{ + UUID: consumerUUID, + Resources: map[hv1.ResourceName]resource.Quantity{hv1.ResourceCPU: *resource.NewQuantity(vcpu, resource.DecimalSI), hv1.ResourceMemory: *resource.NewQuantity(memMB*1024*1024, resource.BinarySI)}, + ConsumerGeneration: &gen, + ProjectID: "proj-1", + UserID: "user-1", + }}, + }, }, - maxBodyLogSize: 4096, - downstreamRequestTimer: down, - upstreamRequestTimer: up, } - t.Run("POST returns 501", func(t *testing.T) { - w := serveHandler(t, "POST", "/allocations", - s.HandleManageAllocations, "/allocations") - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) - } - }) - t.Run("GET returns 501", func(t *testing.T) { - w := serveHandler(t, "GET", "/allocations/{consumer_uuid}", - s.HandleListAllocations, "/allocations/"+validUUID) - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) - } - }) - t.Run("PUT returns 501", func(t *testing.T) { - w := serveHandler(t, "PUT", "/allocations/{consumer_uuid}", - s.HandleUpdateAllocations, "/allocations/"+validUUID) - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) - } - }) - t.Run("DELETE returns 501", func(t *testing.T) { - w := serveHandler(t, "DELETE", "/allocations/{consumer_uuid}", - s.HandleDeleteAllocations, "/allocations/"+validUUID) - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) - } - }) } -func TestHandleAllocations_CRDMode(t *testing.T) { +func newAllocationsTestShim(t *testing.T, mode FeatureMode, upstreamStatus int, upstreamBody string, hvs ...client.Object) *Shim { + t.Helper() + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(upstreamStatus) + w.Write([]byte(upstreamBody)) //nolint:errcheck + })) + t.Cleanup(upstream.Close) down, up := newTestTimers() - s := &Shim{ + return &Shim{ + Client: newFakeClient(t, hvs...), config: config{ - PlacementURL: "http://should-not-be-called:1234", - Features: featuresConfig{Allocations: FeatureModeCRD}, + PlacementURL: upstream.URL, + Features: featuresConfig{Allocations: mode}, }, + httpClient: upstream.Client(), maxBodyLogSize: 4096, downstreamRequestTimer: down, upstreamRequestTimer: up, } - t.Run("POST returns 501", func(t *testing.T) { - w := serveHandler(t, "POST", "/allocations", - s.HandleManageAllocations, "/allocations") - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) - } - }) - t.Run("GET returns 501", func(t *testing.T) { - w := serveHandler(t, "GET", "/allocations/{consumer_uuid}", - s.HandleListAllocations, "/allocations/"+validUUID) - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) - } - }) - t.Run("PUT returns 501", func(t *testing.T) { - w := serveHandler(t, "PUT", "/allocations/{consumer_uuid}", - s.HandleUpdateAllocations, "/allocations/"+validUUID) - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) - } - }) - t.Run("DELETE returns 501", func(t *testing.T) { - w := serveHandler(t, "DELETE", "/allocations/{consumer_uuid}", - s.HandleDeleteAllocations, "/allocations/"+validUUID) - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) - } - }) +} + +// --------------------------------------------------------------------------- +// CRD mode: PUT +// --------------------------------------------------------------------------- + +func TestUpdateAllocations_CRD_NewConsumer(t *testing.T) { + hv := &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{Name: "hv-node-1"}, + Status: hv1.HypervisorStatus{HypervisorID: testRPUUID}, + } + s := newAllocationsTestShim(t, FeatureModeCRD, 0, "", hv) + + body := `{"allocations":{"` + testRPUUID + `":{"resources":{"VCPU":4,"MEMORY_MB":8192}}},"consumer_generation":null,"project_id":"proj-1","user_id":"user-1"}` + w := serveHandlerWithBody(t, "PUT", "/allocations/{consumer_uuid}", + s.HandleUpdateAllocations, "/allocations/"+testConsumerUUID, strings.NewReader(body)) + if w.Code != http.StatusNoContent { + t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusNoContent, w.Body.String()) + } +} + +func TestUpdateAllocations_CRD_GenerationMismatch(t *testing.T) { + hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 2, 4096) + s := newAllocationsTestShim(t, FeatureModeCRD, 0, "", hv) + + wrongGen := int64(99) + _ = wrongGen + body := `{"allocations":{"` + testRPUUID + `":{"resources":{"VCPU":4,"MEMORY_MB":8192}}},"consumer_generation":99,"project_id":"proj-1","user_id":"user-1"}` + w := serveHandlerWithBody(t, "PUT", "/allocations/{consumer_uuid}", + s.HandleUpdateAllocations, "/allocations/"+testConsumerUUID, strings.NewReader(body)) + if w.Code != http.StatusConflict { + t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusConflict, w.Body.String()) + } +} + +func TestUpdateAllocations_CRD_UnknownRP(t *testing.T) { + s := newAllocationsTestShim(t, FeatureModeCRD, 0, "") + + body := `{"allocations":{"` + testRPUUID + `":{"resources":{"VCPU":4}}},"consumer_generation":null,"project_id":"proj-1","user_id":"user-1"}` + w := serveHandlerWithBody(t, "PUT", "/allocations/{consumer_uuid}", + s.HandleUpdateAllocations, "/allocations/"+testConsumerUUID, strings.NewReader(body)) + if w.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusBadRequest, w.Body.String()) + } +} + +// --------------------------------------------------------------------------- +// CRD mode: GET +// --------------------------------------------------------------------------- + +func TestListAllocations_CRD_Found(t *testing.T) { + hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192) + s := newAllocationsTestShim(t, FeatureModeCRD, 0, "", hv) + + w := serveHandler(t, "GET", "/allocations/{consumer_uuid}", + s.HandleListAllocations, "/allocations/"+testConsumerUUID) + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusOK, w.Body.String()) + } + + var resp allocationsResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if _, ok := resp.Allocations[testRPUUID]; !ok { + t.Fatalf("expected allocation for RP %s, got %v", testRPUUID, resp.Allocations) + } + if resp.Allocations[testRPUUID].Resources["VCPU"] != 4 { + t.Fatalf("VCPU = %d, want 4", resp.Allocations[testRPUUID].Resources["VCPU"]) + } + if resp.Allocations[testRPUUID].Resources["MEMORY_MB"] != 8192 { + t.Fatalf("MEMORY_MB = %d, want 8192", resp.Allocations[testRPUUID].Resources["MEMORY_MB"]) + } +} + +func TestListAllocations_CRD_NotFound(t *testing.T) { + s := newAllocationsTestShim(t, FeatureModeCRD, 0, "") + + w := serveHandler(t, "GET", "/allocations/{consumer_uuid}", + s.HandleListAllocations, "/allocations/"+testConsumerUUID) + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusOK, w.Body.String()) + } + + var resp allocationsResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if len(resp.Allocations) != 0 { + t.Fatalf("expected empty allocations, got %v", resp.Allocations) + } +} + +func TestListAllocations_CRD_MultiCR(t *testing.T) { + hv1obj := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192) + hv2obj := testHypervisorWithBooking("hv-node-2", testRPUUID2, testConsumerUUID, 2, 4096) + s := newAllocationsTestShim(t, FeatureModeCRD, 0, "", hv1obj, hv2obj) + + w := serveHandler(t, "GET", "/allocations/{consumer_uuid}", + s.HandleListAllocations, "/allocations/"+testConsumerUUID) + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusOK, w.Body.String()) + } + + var resp allocationsResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if len(resp.Allocations) != 2 { + t.Fatalf("expected 2 allocations, got %d: %v", len(resp.Allocations), resp.Allocations) + } +} + +// --------------------------------------------------------------------------- +// CRD mode: DELETE +// --------------------------------------------------------------------------- + +func TestDeleteAllocations_CRD_Found(t *testing.T) { + hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192) + s := newAllocationsTestShim(t, FeatureModeCRD, 0, "", hv) + + w := serveHandler(t, "DELETE", "/allocations/{consumer_uuid}", + s.HandleDeleteAllocations, "/allocations/"+testConsumerUUID) + if w.Code != http.StatusNoContent { + t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusNoContent, w.Body.String()) + } +} + +func TestDeleteAllocations_CRD_NotFound(t *testing.T) { + s := newAllocationsTestShim(t, FeatureModeCRD, 0, "") + + w := serveHandler(t, "DELETE", "/allocations/{consumer_uuid}", + s.HandleDeleteAllocations, "/allocations/"+testConsumerUUID) + if w.Code != http.StatusNotFound { + t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusNotFound, w.Body.String()) + } +} + +// --------------------------------------------------------------------------- +// Hybrid mode: PUT (KVM-only allocation, skips upstream) +// --------------------------------------------------------------------------- + +func TestUpdateAllocations_Hybrid_KVMOnly(t *testing.T) { + hv := &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{Name: "hv-node-1"}, + Status: hv1.HypervisorStatus{HypervisorID: testRPUUID}, + } + s := newAllocationsTestShim(t, FeatureModeHybrid, http.StatusInternalServerError, "should not be called", hv) + + body := `{"allocations":{"` + testRPUUID + `":{"resources":{"VCPU":4,"MEMORY_MB":8192}}},"consumer_generation":null,"project_id":"proj-1","user_id":"user-1"}` + w := serveHandlerWithBody(t, "PUT", "/allocations/{consumer_uuid}", + s.HandleUpdateAllocations, "/allocations/"+testConsumerUUID, strings.NewReader(body)) + if w.Code != http.StatusNoContent { + t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusNoContent, w.Body.String()) + } +} + +// --------------------------------------------------------------------------- +// Hybrid mode: PUT (non-KVM allocation, forwards to upstream) +// --------------------------------------------------------------------------- + +func TestUpdateAllocations_Hybrid_NonKVMOnly(t *testing.T) { + s := newAllocationsTestShim(t, FeatureModeHybrid, http.StatusNoContent, "") + + unknownRP := "99999999-9999-9999-9999-999999999999" + body := `{"allocations":{"` + unknownRP + `":{"resources":{"VCPU":4}}},"consumer_generation":null,"project_id":"proj-1","user_id":"user-1"}` + w := serveHandlerWithBody(t, "PUT", "/allocations/{consumer_uuid}", + s.HandleUpdateAllocations, "/allocations/"+testConsumerUUID, strings.NewReader(body)) + if w.Code != http.StatusNoContent { + t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusNoContent, w.Body.String()) + } +} + +// --------------------------------------------------------------------------- +// Hybrid mode: GET (merges upstream + CRD) +// --------------------------------------------------------------------------- + +func TestListAllocations_Hybrid_Merge(t *testing.T) { + hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192) + upstreamBody := `{"allocations":{"upstream-rp-uuid":{"resources":{"VCPU":2}}},"consumer_generation":1,"project_id":"proj-1","user_id":"user-1"}` + s := newAllocationsTestShim(t, FeatureModeHybrid, http.StatusOK, upstreamBody, hv) + + w := serveHandler(t, "GET", "/allocations/{consumer_uuid}", + s.HandleListAllocations, "/allocations/"+testConsumerUUID) + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusOK, w.Body.String()) + } + + var resp allocationsResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if len(resp.Allocations) != 2 { + t.Fatalf("expected 2 allocations (upstream + CRD), got %d: %v", len(resp.Allocations), resp.Allocations) + } + if _, ok := resp.Allocations["upstream-rp-uuid"]; !ok { + t.Fatal("expected upstream allocation in merged response") + } + if _, ok := resp.Allocations[testRPUUID]; !ok { + t.Fatal("expected CRD allocation in merged response") + } +} + +// --------------------------------------------------------------------------- +// Hybrid mode: DELETE (upstream first, then CRD) +// --------------------------------------------------------------------------- + +func TestDeleteAllocations_Hybrid(t *testing.T) { + hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192) + s := newAllocationsTestShim(t, FeatureModeHybrid, http.StatusNoContent, "", hv) + + w := serveHandler(t, "DELETE", "/allocations/{consumer_uuid}", + s.HandleDeleteAllocations, "/allocations/"+testConsumerUUID) + if w.Code != http.StatusNoContent { + t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusNoContent, w.Body.String()) + } +} + +func TestDeleteAllocations_Hybrid_UpstreamFails(t *testing.T) { + hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192) + s := newAllocationsTestShim(t, FeatureModeHybrid, http.StatusInternalServerError, "upstream error", hv) + + w := serveHandler(t, "DELETE", "/allocations/{consumer_uuid}", + s.HandleDeleteAllocations, "/allocations/"+testConsumerUUID) + if w.Code != http.StatusInternalServerError { + t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusInternalServerError, w.Body.String()) + } +} + +// --------------------------------------------------------------------------- +// Resource translation unit tests +// --------------------------------------------------------------------------- + +func TestPlacementToHVResources(t *testing.T) { + in := map[string]int64{"VCPU": 4, "MEMORY_MB": 8192} + out := placementToHVResources(in) + if out[hv1.ResourceCPU] != *resource.NewQuantity(4, resource.DecimalSI) { + t.Fatalf("cpu = %v, want 4", out[hv1.ResourceCPU]) + } + expectedMem := resource.NewQuantity(8192*1024*1024, resource.BinarySI) + gotMem := out[hv1.ResourceMemory] + if gotMem.Cmp(*expectedMem) != 0 { + t.Fatalf("memory = %v, want %v", gotMem, expectedMem) + } +} + +func TestHVToPlacementResources(t *testing.T) { + in := map[hv1.ResourceName]resource.Quantity{ + hv1.ResourceCPU: *resource.NewQuantity(4, resource.DecimalSI), + hv1.ResourceMemory: *resource.NewQuantity(8192*1024*1024, resource.BinarySI), + } + out := hvToPlacementResources(in) + if out["VCPU"] != 4 { + t.Fatalf("VCPU = %d, want 4", out["VCPU"]) + } + if out["MEMORY_MB"] != 8192 { + t.Fatalf("MEMORY_MB = %d, want 8192", out["MEMORY_MB"]) + } +} + +// --------------------------------------------------------------------------- +// CRD mode: POST (batch) +// --------------------------------------------------------------------------- + +func TestManageAllocations_CRD(t *testing.T) { + hv := &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{Name: "hv-node-1"}, + Status: hv1.HypervisorStatus{HypervisorID: testRPUUID}, + } + s := newAllocationsTestShim(t, FeatureModeCRD, 0, "", hv) + + body := `{"` + testConsumerUUID + `":{"allocations":{"` + testRPUUID + `":{"resources":{"VCPU":2}}},"consumer_generation":null,"project_id":"proj-1","user_id":"user-1"}}` + w := serveHandlerWithBody(t, "POST", "/allocations", + s.HandleManageAllocations, "/allocations", strings.NewReader(body)) + if w.Code != http.StatusNoContent { + t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusNoContent, w.Body.String()) + } } diff --git a/internal/shim/placement/handle_resource_providers_test.go b/internal/shim/placement/handle_resource_providers_test.go index 8c73db39e..37cde7e09 100644 --- a/internal/shim/placement/handle_resource_providers_test.go +++ b/internal/shim/placement/handle_resource_providers_test.go @@ -81,6 +81,21 @@ func newFakeClient(t *testing.T, objs ...client.Object) client.Client { } return []string{hv.Name} }) + builder = builder.WithIndex(&hv1.Hypervisor{}, idxBookingConsumerUUID, func(obj client.Object) []string { + hv, ok := obj.(*hv1.Hypervisor) + if !ok { + return nil + } + consumers := hv1.GetConsumers(hv.Spec.Bookings) + if len(consumers) == 0 { + return nil + } + uuids := make([]string, 0, len(consumers)) + for _, c := range consumers { + uuids = append(uuids, c.UUID) + } + return uuids + }) return builder.Build() } From 4e42d6f25d9a06a512b305d8a6b722012138c738 Mon Sep 17 00:00:00 2001 From: Philipp Matthes Date: Mon, 4 May 2026 13:43:12 +0200 Subject: [PATCH 2/3] Improve allocations handler code organization and robustness - Reorganize file so lowercase handlers sit directly below their corresponding exported Handle* functions - Remove section divider comments between function groups - Add doc comments to all unexported handler functions - Fix listAllocationsHybrid to bubble up non-OK upstream responses instead of falling through to CRD queries - Fix deleteAllocationsHybrid to check CRD existence first and use statusRecorder consistently with other hybrid handlers - Replace //nolint:errcheck with proper error handling - Use hv1.GroupVersion for GroupResource instead of hardcoded literal - Fix lint issues: variable shadowing in e2e closures, unused imports, constant parameters (e2ePollUntil, testHypervisorWithBooking) - Add info logging at key decision points in hybrid/crd handlers --- internal/shim/placement/handle_allocations.go | 595 +++++++++--------- .../shim/placement/handle_allocations_e2e.go | 63 +- .../shim/placement/handle_allocations_test.go | 20 +- ...handle_resource_provider_aggregates_e2e.go | 3 +- .../handle_resource_provider_traits_e2e.go | 3 +- internal/shim/placement/shim_e2e.go | 9 +- 6 files changed, 363 insertions(+), 330 deletions(-) diff --git a/internal/shim/placement/handle_allocations.go b/internal/shim/placement/handle_allocations.go index a5bf35c37..e4b7393da 100644 --- a/internal/shim/placement/handle_allocations.go +++ b/internal/shim/placement/handle_allocations.go @@ -12,9 +12,8 @@ import ( "net/http" hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" - "k8s.io/apimachinery/pkg/api/resource" apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/api/resource" "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -111,6 +110,165 @@ func (s *Shim) HandleManageAllocations(w http.ResponseWriter, r *http.Request) { } } +// manageAllocationsRequest represents the batch body for POST /allocations. +// It is keyed by consumer UUID. +type manageAllocationsRequest map[string]allocationsRequest + +// manageAllocationsHybrid handles the batch POST by splitting each consumer's +// allocations into KVM and non-KVM sets. Non-KVM allocations are forwarded to +// upstream as a batch; KVM allocations are written to Hypervisor CRDs. +func (s *Shim) manageAllocationsHybrid(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + log := logf.FromContext(ctx) + + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + log.Error(err, "failed to read request body") + http.Error(w, "failed to read request body", http.StatusBadRequest) + return + } + + var batch manageAllocationsRequest + if err := json.Unmarshal(bodyBytes, &batch); err != nil { + http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest) + return + } + + log.Info("managing batch allocations (hybrid)", "consumers", len(batch)) + + // Separate KVM from non-KVM across all consumers. + type kvmWork struct { + consumerUUID string + req *allocationsRequest + kvmAllocs map[string]allocationEntry + kvmHypervisors map[string]*hv1.Hypervisor + } + var kvmWorkItems []kvmWork + nonKvmBatch := make(manageAllocationsRequest) + + for consumerUUID, consumerReq := range batch { + kvmA := make(map[string]allocationEntry) + nonKvmA := make(map[string]allocationEntry) + kvmHVs := make(map[string]*hv1.Hypervisor) + + for rpUUID, entry := range consumerReq.Allocations { + var hvs hv1.HypervisorList + if err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: rpUUID}); err == nil && len(hvs.Items) == 1 { + kvmA[rpUUID] = entry + hv := hvs.Items[0] + kvmHVs[rpUUID] = &hv + } else { + nonKvmA[rpUUID] = entry + } + } + + if len(nonKvmA) > 0 { + nonKvmBatch[consumerUUID] = allocationsRequest{ + Allocations: nonKvmA, + ConsumerGeneration: consumerReq.ConsumerGeneration, + ProjectID: consumerReq.ProjectID, + UserID: consumerReq.UserID, + ConsumerType: consumerReq.ConsumerType, + } + } + if len(kvmA) > 0 { + cr := consumerReq + kvmWorkItems = append(kvmWorkItems, kvmWork{ + consumerUUID: consumerUUID, + req: &cr, + kvmAllocs: kvmA, + kvmHypervisors: kvmHVs, + }) + } + } + + // Forward non-KVM portion to upstream first. + if len(nonKvmBatch) > 0 { + log.Info("forwarding non-KVM allocations to upstream (hybrid batch)", "consumers", len(nonKvmBatch)) + upstreamBody, err := json.Marshal(nonKvmBatch) + if err != nil { + log.Error(err, "failed to marshal upstream batch request") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + r.Body = io.NopCloser(bytes.NewReader(upstreamBody)) + rec := &statusRecorder{ResponseWriter: w, header: make(http.Header)} + s.forward(rec, r) + if rec.statusCode >= 300 { + for k, vs := range rec.header { + for _, v := range vs { + w.Header().Add(k, v) + } + } + w.WriteHeader(rec.statusCode) + if _, err := w.Write(rec.body.Bytes()); err != nil { + log.Error(err, "failed to write response body") + } + return + } + } + + // Write KVM bookings. + for _, work := range kvmWorkItems { + if err := s.writeKVMBookings(ctx, work.consumerUUID, work.req, work.kvmAllocs, work.kvmHypervisors); err != nil { + log.Error(err, "failed to write KVM bookings in batch", "consumer", work.consumerUUID) + if apierrors.IsConflict(err) { + http.Error(w, "consumer generation conflict", http.StatusConflict) + return + } + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + } + + w.WriteHeader(http.StatusNoContent) +} + +// manageAllocationsCRD handles the batch POST exclusively via CRDs. All +// resource providers across all consumers must resolve to known Hypervisor CRs. +func (s *Shim) manageAllocationsCRD(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + log := logf.FromContext(ctx) + + var batch manageAllocationsRequest + if err := json.NewDecoder(r.Body).Decode(&batch); err != nil { + http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest) + return + } + + log.Info("managing batch allocations (crd)", "consumers", len(batch)) + + for consumerUUID, consumerReq := range batch { + kvmAllocs := make(map[string]allocationEntry) + kvmHypervisors := make(map[string]*hv1.Hypervisor) + + for rpUUID, entry := range consumerReq.Allocations { + var hvs hv1.HypervisorList + if err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: rpUUID}); err != nil || len(hvs.Items) != 1 { + log.Info("resource provider not found in CRD (crd mode)", "rpUUID", rpUUID) + http.Error(w, fmt.Sprintf("resource provider %s not found", rpUUID), http.StatusBadRequest) + return + } + kvmAllocs[rpUUID] = entry + hv := hvs.Items[0] + kvmHypervisors[rpUUID] = &hv + } + + cr := consumerReq + if err := s.writeKVMBookings(ctx, consumerUUID, &cr, kvmAllocs, kvmHypervisors); err != nil { + log.Error(err, "failed to write KVM bookings in batch", "consumer", consumerUUID) + if apierrors.IsConflict(err) { + http.Error(w, "consumer generation conflict", http.StatusConflict) + return + } + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + } + + w.WriteHeader(http.StatusNoContent) +} + // HandleListAllocations handles GET /allocations/{consumer_uuid} requests. // // Returns all allocation records for the consumer identified by @@ -141,6 +299,110 @@ func (s *Shim) HandleListAllocations(w http.ResponseWriter, r *http.Request) { } } +// listAllocationsHybrid merges allocations from upstream Placement with +// bookings stored in Hypervisor CRDs. CRD data takes precedence when the same +// resource provider appears in both sources. +func (s *Shim) listAllocationsHybrid(w http.ResponseWriter, r *http.Request, consumerUUID string) { + ctx := r.Context() + log := logf.FromContext(ctx) + + s.forwardWithHook(w, r, func(w http.ResponseWriter, resp *http.Response) { + if resp.StatusCode != http.StatusOK { + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Error(err, "failed to read upstream response body") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + for k, vs := range resp.Header { + for _, v := range vs { + w.Header().Add(k, v) + } + } + w.WriteHeader(resp.StatusCode) + if _, err := w.Write(body); err != nil { + log.Error(err, "failed to write response body") + } + return + } + + var upstreamResp allocationsResponse + if err := json.NewDecoder(resp.Body).Decode(&upstreamResp); err != nil { + log.Error(err, "failed to decode upstream allocations response") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + if upstreamResp.Allocations == nil { + upstreamResp.Allocations = make(map[string]allocationEntry) + } + + // Look up consumer in CRD. + var hvs hv1.HypervisorList + if err := s.List(ctx, &hvs, client.MatchingFields{idxBookingConsumerUUID: consumerUUID}); err != nil && !apierrors.IsNotFound(err) { + log.Error(err, "failed to look up consumer in CRD") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + // Merge CRD bookings into the response. CRD takes precedence on collision. + for _, hv := range hvs.Items { + consumer := hv1.GetConsumer(hv.Spec.Bookings, consumerUUID) + if consumer == nil { + continue + } + rpUUID := hv.Status.HypervisorID + upstreamResp.Allocations[rpUUID] = allocationEntry{ + Resources: hvToPlacementResources(consumer.Resources), + } + if upstreamResp.ConsumerGeneration == nil { + upstreamResp.ConsumerGeneration = consumer.ConsumerGeneration + } + if upstreamResp.ProjectID == "" { + upstreamResp.ProjectID = consumer.ProjectID + } + if upstreamResp.UserID == "" { + upstreamResp.UserID = consumer.UserID + } + } + + s.writeJSON(w, http.StatusOK, upstreamResp) + }) +} + +// listAllocationsCRD retrieves allocations exclusively from Hypervisor CRDs, +// returning an empty allocations map if the consumer has no bookings. +func (s *Shim) listAllocationsCRD(w http.ResponseWriter, r *http.Request, consumerUUID string) { + ctx := r.Context() + log := logf.FromContext(ctx) + + var hvs hv1.HypervisorList + if err := s.List(ctx, &hvs, client.MatchingFields{idxBookingConsumerUUID: consumerUUID}); err != nil && !apierrors.IsNotFound(err) { + log.Error(err, "failed to look up consumer in CRD") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + resp := allocationsResponse{ + Allocations: make(map[string]allocationEntry), + } + + for _, hv := range hvs.Items { + consumer := hv1.GetConsumer(hv.Spec.Bookings, consumerUUID) + if consumer == nil { + continue + } + rpUUID := hv.Status.HypervisorID + resp.Allocations[rpUUID] = allocationEntry{ + Resources: hvToPlacementResources(consumer.Resources), + } + resp.ConsumerGeneration = consumer.ConsumerGeneration + resp.ProjectID = consumer.ProjectID + resp.UserID = consumer.UserID + } + + s.writeJSON(w, http.StatusOK, resp) +} + // HandleUpdateAllocations handles PUT /allocations/{consumer_uuid} requests. // // Creates or replaces all allocation records for a single consumer. If @@ -154,31 +416,7 @@ func (s *Shim) HandleListAllocations(w http.ResponseWriter, r *http.Request) { // insufficient inventory or if a concurrent update was detected. // // https://docs.openstack.org/api-ref/placement/#update-allocations -func (s *Shim) HandleUpdateAllocations(w http.ResponseWriter, r *http.Request) { - consumerUUID, ok := requiredUUIDPathParam(w, r, "consumer_uuid") - if !ok { - return - } - switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) { - case FeatureModePassthrough: - s.forward(w, r) - case FeatureModeHybrid: - s.updateAllocationsHybrid(w, r, consumerUUID) - case FeatureModeCRD: - s.updateAllocationsCRD(w, r, consumerUUID) - default: - http.Error(w, "unknown feature mode", http.StatusInternalServerError) - } -} - -// HandleDeleteAllocations handles DELETE /allocations/{consumer_uuid} requests. -// -// Removes all allocation records for the consumer across all resource -// providers. Returns 204 No Content on success, or 404 Not Found if the -// consumer has no existing allocations. -// -// https://docs.openstack.org/api-ref/placement/#delete-allocations -func (s *Shim) HandleDeleteAllocations(w http.ResponseWriter, r *http.Request) { +func (s *Shim) HandleUpdateAllocations(w http.ResponseWriter, r *http.Request) { consumerUUID, ok := requiredUUIDPathParam(w, r, "consumer_uuid") if !ok { return @@ -187,18 +425,18 @@ func (s *Shim) HandleDeleteAllocations(w http.ResponseWriter, r *http.Request) { case FeatureModePassthrough: s.forward(w, r) case FeatureModeHybrid: - s.deleteAllocationsHybrid(w, r, consumerUUID) + s.updateAllocationsHybrid(w, r, consumerUUID) case FeatureModeCRD: - s.deleteAllocationsCRD(w, r, consumerUUID) + s.updateAllocationsCRD(w, r, consumerUUID) default: http.Error(w, "unknown feature mode", http.StatusInternalServerError) } } -// --------------------------------------------------------------------------- -// PUT /allocations/{consumer_uuid} — hybrid and crd -// --------------------------------------------------------------------------- - +// updateAllocationsHybrid splits the allocation set into KVM-managed resource +// providers (written to Hypervisor CRDs) and non-KVM providers (forwarded to +// upstream Placement). Non-KVM allocations are forwarded first; only if +// upstream succeeds are KVM bookings persisted. func (s *Shim) updateAllocationsHybrid(w http.ResponseWriter, r *http.Request, consumerUUID string) { ctx := r.Context() log := logf.FromContext(ctx) @@ -257,7 +495,9 @@ func (s *Shim) updateAllocationsHybrid(w http.ResponseWriter, r *http.Request, c } } w.WriteHeader(rec.statusCode) - w.Write(rec.body.Bytes()) //nolint:errcheck + if _, err := w.Write(rec.body.Bytes()); err != nil { + log.Error(err, "failed to write response body") + } return } } @@ -276,6 +516,9 @@ func (s *Shim) updateAllocationsHybrid(w http.ResponseWriter, r *http.Request, c w.WriteHeader(http.StatusNoContent) } +// updateAllocationsCRD handles PUT /allocations/{consumer_uuid} exclusively +// via CRDs. All resource providers in the request must resolve to a known +// Hypervisor CR; otherwise the request is rejected with 400. func (s *Shim) updateAllocationsCRD(w http.ResponseWriter, r *http.Request, consumerUUID string) { ctx := r.Context() log := logf.FromContext(ctx) @@ -323,9 +566,10 @@ func (s *Shim) writeKVMBookings( kvmAllocs map[string]allocationEntry, kvmHypervisors map[string]*hv1.Hypervisor, ) error { + log := logf.FromContext(ctx) - hvGR := schema.GroupResource{Group: "kvm.cloud.sap", Resource: "hypervisors"} + hvGR := hv1.GroupVersion.WithResource("hypervisors").GroupResource() for rpUUID, entry := range kvmAllocs { hv := kvmHypervisors[rpUUID] existing := hv1.GetConsumer(hv.Spec.Bookings, consumerUUID) @@ -390,64 +634,35 @@ func (s *Shim) writeKVMBookings( return nil } -// --------------------------------------------------------------------------- -// GET /allocations/{consumer_uuid} — hybrid and crd -// --------------------------------------------------------------------------- - -func (s *Shim) listAllocationsHybrid(w http.ResponseWriter, r *http.Request, consumerUUID string) { - ctx := r.Context() - log := logf.FromContext(ctx) - - s.forwardWithHook(w, r, func(w http.ResponseWriter, resp *http.Response) { - var upstreamResp allocationsResponse - if resp.StatusCode == http.StatusOK { - if err := json.NewDecoder(resp.Body).Decode(&upstreamResp); err != nil { - log.Error(err, "failed to decode upstream allocations response") - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return - } - } else { - // If upstream returned non-200, initialize empty. - upstreamResp.Allocations = make(map[string]allocationEntry) - } - if upstreamResp.Allocations == nil { - upstreamResp.Allocations = make(map[string]allocationEntry) - } - - // Look up consumer in CRD. - var hvs hv1.HypervisorList - if err := s.List(ctx, &hvs, client.MatchingFields{idxBookingConsumerUUID: consumerUUID}); err != nil && !apierrors.IsNotFound(err) { - log.Error(err, "failed to look up consumer in CRD") - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return - } - - // Merge CRD bookings into the response. CRD takes precedence on collision. - for _, hv := range hvs.Items { - consumer := hv1.GetConsumer(hv.Spec.Bookings, consumerUUID) - if consumer == nil { - continue - } - rpUUID := hv.Status.HypervisorID - upstreamResp.Allocations[rpUUID] = allocationEntry{ - Resources: hvToPlacementResources(consumer.Resources), - } - if upstreamResp.ConsumerGeneration == nil { - upstreamResp.ConsumerGeneration = consumer.ConsumerGeneration - } - if upstreamResp.ProjectID == "" { - upstreamResp.ProjectID = consumer.ProjectID - } - if upstreamResp.UserID == "" { - upstreamResp.UserID = consumer.UserID - } - } - - s.writeJSON(w, http.StatusOK, upstreamResp) - }) +// HandleDeleteAllocations handles DELETE /allocations/{consumer_uuid} requests. +// +// Removes all allocation records for the consumer across all resource +// providers. Returns 204 No Content on success, or 404 Not Found if the +// consumer has no existing allocations. +// +// https://docs.openstack.org/api-ref/placement/#delete-allocations +func (s *Shim) HandleDeleteAllocations(w http.ResponseWriter, r *http.Request) { + consumerUUID, ok := requiredUUIDPathParam(w, r, "consumer_uuid") + if !ok { + return + } + switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) { + case FeatureModePassthrough: + s.forward(w, r) + case FeatureModeHybrid: + s.deleteAllocationsHybrid(w, r, consumerUUID) + case FeatureModeCRD: + s.deleteAllocationsCRD(w, r, consumerUUID) + default: + http.Error(w, "unknown feature mode", http.StatusInternalServerError) + } } -func (s *Shim) listAllocationsCRD(w http.ResponseWriter, r *http.Request, consumerUUID string) { +// deleteAllocationsHybrid checks whether the consumer has bookings in +// Hypervisor CRDs. If it does, those bookings are removed and the request is +// also forwarded to upstream (tolerating a 404 from upstream). If the consumer +// has no CRD bookings, the request is forwarded to upstream as-is. +func (s *Shim) deleteAllocationsHybrid(w http.ResponseWriter, r *http.Request, consumerUUID string) { ctx := r.Context() log := logf.FromContext(ctx) @@ -458,47 +673,19 @@ func (s *Shim) listAllocationsCRD(w http.ResponseWriter, r *http.Request, consum return } - resp := allocationsResponse{ - Allocations: make(map[string]allocationEntry), - } - - for _, hv := range hvs.Items { - consumer := hv1.GetConsumer(hv.Spec.Bookings, consumerUUID) - if consumer == nil { - continue - } - rpUUID := hv.Status.HypervisorID - resp.Allocations[rpUUID] = allocationEntry{ - Resources: hvToPlacementResources(consumer.Resources), - } - resp.ConsumerGeneration = consumer.ConsumerGeneration - resp.ProjectID = consumer.ProjectID - resp.UserID = consumer.UserID - } - - s.writeJSON(w, http.StatusOK, resp) -} - -// --------------------------------------------------------------------------- -// DELETE /allocations/{consumer_uuid} — hybrid and crd -// --------------------------------------------------------------------------- - -func (s *Shim) deleteAllocationsHybrid(w http.ResponseWriter, r *http.Request, consumerUUID string) { - ctx := r.Context() - log := logf.FromContext(ctx) - - // Forward to upstream first. rec := &statusRecorder{ResponseWriter: w, header: make(http.Header)} s.forward(rec, r) - // Upstream returning 404 is acceptable — the consumer may only exist in CRD. - if rec.statusCode >= 300 && rec.statusCode != http.StatusNotFound { + + if len(hvs.Items) == 0 || (rec.statusCode >= 300 && rec.statusCode != http.StatusNotFound) { for k, vs := range rec.header { for _, v := range vs { w.Header().Add(k, v) } } w.WriteHeader(rec.statusCode) - w.Write(rec.body.Bytes()) //nolint:errcheck + if _, err := w.Write(rec.body.Bytes()); err != nil { + log.Error(err, "failed to write response body") + } return } @@ -512,6 +699,8 @@ func (s *Shim) deleteAllocationsHybrid(w http.ResponseWriter, r *http.Request, c w.WriteHeader(http.StatusNoContent) } +// deleteAllocationsCRD removes all bookings for the consumer exclusively from +// Hypervisor CRDs. Returns 404 if the consumer has no existing bookings. func (s *Shim) deleteAllocationsCRD(w http.ResponseWriter, r *http.Request, consumerUUID string) { ctx := r.Context() log := logf.FromContext(ctx) @@ -569,162 +758,8 @@ func (s *Shim) removeConsumerFromCRD(ctx context.Context, consumerUUID string) e return nil } -// --------------------------------------------------------------------------- -// POST /allocations — hybrid and crd -// --------------------------------------------------------------------------- - -// manageAllocationsRequest represents the batch body for POST /allocations. -// It is keyed by consumer UUID. -type manageAllocationsRequest map[string]allocationsRequest - -func (s *Shim) manageAllocationsHybrid(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - log := logf.FromContext(ctx) - - bodyBytes, err := io.ReadAll(r.Body) - if err != nil { - log.Error(err, "failed to read request body") - http.Error(w, "failed to read request body", http.StatusBadRequest) - return - } - - var batch manageAllocationsRequest - if err := json.Unmarshal(bodyBytes, &batch); err != nil { - http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest) - return - } - - // Separate KVM from non-KVM across all consumers. - type kvmWork struct { - consumerUUID string - req *allocationsRequest - kvmAllocs map[string]allocationEntry - kvmHypervisors map[string]*hv1.Hypervisor - } - var kvmWorkItems []kvmWork - nonKvmBatch := make(manageAllocationsRequest) - - for consumerUUID, consumerReq := range batch { - kvmA := make(map[string]allocationEntry) - nonKvmA := make(map[string]allocationEntry) - kvmHVs := make(map[string]*hv1.Hypervisor) - - for rpUUID, entry := range consumerReq.Allocations { - var hvs hv1.HypervisorList - if err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: rpUUID}); err == nil && len(hvs.Items) == 1 { - kvmA[rpUUID] = entry - hv := hvs.Items[0] - kvmHVs[rpUUID] = &hv - } else { - nonKvmA[rpUUID] = entry - } - } - - if len(nonKvmA) > 0 { - nonKvmBatch[consumerUUID] = allocationsRequest{ - Allocations: nonKvmA, - ConsumerGeneration: consumerReq.ConsumerGeneration, - ProjectID: consumerReq.ProjectID, - UserID: consumerReq.UserID, - ConsumerType: consumerReq.ConsumerType, - } - } - if len(kvmA) > 0 { - cr := consumerReq - kvmWorkItems = append(kvmWorkItems, kvmWork{ - consumerUUID: consumerUUID, - req: &cr, - kvmAllocs: kvmA, - kvmHypervisors: kvmHVs, - }) - } - } - - // Forward non-KVM portion to upstream first. - if len(nonKvmBatch) > 0 { - upstreamBody, err := json.Marshal(nonKvmBatch) - if err != nil { - log.Error(err, "failed to marshal upstream batch request") - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return - } - r.Body = io.NopCloser(bytes.NewReader(upstreamBody)) - rec := &statusRecorder{ResponseWriter: w, header: make(http.Header)} - s.forward(rec, r) - if rec.statusCode >= 300 { - for k, vs := range rec.header { - for _, v := range vs { - w.Header().Add(k, v) - } - } - w.WriteHeader(rec.statusCode) - w.Write(rec.body.Bytes()) //nolint:errcheck - return - } - } - - // Write KVM bookings. - for _, work := range kvmWorkItems { - if err := s.writeKVMBookings(ctx, work.consumerUUID, work.req, work.kvmAllocs, work.kvmHypervisors); err != nil { - log.Error(err, "failed to write KVM bookings in batch", "consumer", work.consumerUUID) - if apierrors.IsConflict(err) { - http.Error(w, "consumer generation conflict", http.StatusConflict) - return - } - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return - } - } - - w.WriteHeader(http.StatusNoContent) -} - -func (s *Shim) manageAllocationsCRD(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - log := logf.FromContext(ctx) - - var batch manageAllocationsRequest - if err := json.NewDecoder(r.Body).Decode(&batch); err != nil { - http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest) - return - } - - for consumerUUID, consumerReq := range batch { - kvmAllocs := make(map[string]allocationEntry) - kvmHypervisors := make(map[string]*hv1.Hypervisor) - - for rpUUID, entry := range consumerReq.Allocations { - var hvs hv1.HypervisorList - if err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: rpUUID}); err != nil || len(hvs.Items) != 1 { - log.Info("resource provider not found in CRD (crd mode)", "rpUUID", rpUUID) - http.Error(w, fmt.Sprintf("resource provider %s not found", rpUUID), http.StatusBadRequest) - return - } - kvmAllocs[rpUUID] = entry - hv := hvs.Items[0] - kvmHypervisors[rpUUID] = &hv - } - - cr := consumerReq - if err := s.writeKVMBookings(ctx, consumerUUID, &cr, kvmAllocs, kvmHypervisors); err != nil { - log.Error(err, "failed to write KVM bookings in batch", "consumer", consumerUUID) - if apierrors.IsConflict(err) { - http.Error(w, "consumer generation conflict", http.StatusConflict) - return - } - http.Error(w, "Internal Server Error", http.StatusInternalServerError) - return - } - } - - w.WriteHeader(http.StatusNoContent) -} - -// --------------------------------------------------------------------------- -// statusRecorder captures the upstream response so we can inspect status before -// committing to write the real response. -// --------------------------------------------------------------------------- - +// statusRecorder captures the upstream response so we can inspect the status +// code before committing to write the real client response. type statusRecorder struct { http.ResponseWriter statusCode int diff --git a/internal/shim/placement/handle_allocations_e2e.go b/internal/shim/placement/handle_allocations_e2e.go index cb8cf3244..6d0f45634 100644 --- a/internal/shim/placement/handle_allocations_e2e.go +++ b/internal/shim/placement/handle_allocations_e2e.go @@ -9,7 +9,6 @@ import ( "encoding/json" "fmt" "net/http" - "time" "github.com/cobaltcore-dev/cortex/pkg/conf" hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" @@ -455,7 +454,7 @@ func e2eCRDAllocations(ctx context.Context, sc *gophercloud.ServiceClient, cl cl // 1. Test GET /allocations/{consumer_uuid} — empty (consumer not booked). log.Info("Testing GET /allocations (empty, CRD)", "consumer", consumerUUID) - if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) { + if err := e2ePollUntil(ctx, func() (bool, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID, http.NoBody) if err != nil { @@ -518,24 +517,24 @@ func e2eCRDAllocations(ctx context.Context, sc *gophercloud.ServiceClient, cl cl // 3. Test GET /allocations/{consumer_uuid} — verify booking present. log.Info("Testing GET /allocations (after PUT, CRD)", "consumer", consumerUUID) var getResp allocationsResponse - if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) { - req, err := http.NewRequestWithContext(ctx, + if err := e2ePollUntil(ctx, func() (bool, error) { + pollReq, err := http.NewRequestWithContext(ctx, http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID, http.NoBody) if err != nil { return false, err } - req.Header.Set("X-Auth-Token", sc.TokenID) - req.Header.Set("OpenStack-API-Version", apiVersion) - req.Header.Set("Accept", "application/json") - resp, err := sc.HTTPClient.Do(req) + pollReq.Header.Set("X-Auth-Token", sc.TokenID) + pollReq.Header.Set("OpenStack-API-Version", apiVersion) + pollReq.Header.Set("Accept", "application/json") + pollResp, err := sc.HTTPClient.Do(pollReq) if err != nil { return false, err } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return false, fmt.Errorf("GET /allocations (after PUT): expected 200, got %d", resp.StatusCode) + defer pollResp.Body.Close() + if pollResp.StatusCode != http.StatusOK { + return false, fmt.Errorf("GET /allocations (after PUT): expected 200, got %d", pollResp.StatusCode) } - if err := json.NewDecoder(resp.Body).Decode(&getResp); err != nil { + if err := json.NewDecoder(pollResp.Body).Decode(&getResp); err != nil { return false, err } _, ok := getResp.Allocations[kvmUUID] @@ -621,24 +620,24 @@ func e2eCRDAllocations(ctx context.Context, sc *gophercloud.ServiceClient, cl cl // Verify second consumer's allocation. var getResp2 allocationsResponse - if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) { - req, err := http.NewRequestWithContext(ctx, + if err := e2ePollUntil(ctx, func() (bool, error) { + pollReq, err := http.NewRequestWithContext(ctx, http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID2, http.NoBody) if err != nil { return false, err } - req.Header.Set("X-Auth-Token", sc.TokenID) - req.Header.Set("OpenStack-API-Version", apiVersion) - req.Header.Set("Accept", "application/json") - resp, err := sc.HTTPClient.Do(req) + pollReq.Header.Set("X-Auth-Token", sc.TokenID) + pollReq.Header.Set("OpenStack-API-Version", apiVersion) + pollReq.Header.Set("Accept", "application/json") + pollResp, err := sc.HTTPClient.Do(pollReq) if err != nil { return false, err } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return false, fmt.Errorf("GET /allocations (consumer2): expected 200, got %d", resp.StatusCode) + defer pollResp.Body.Close() + if pollResp.StatusCode != http.StatusOK { + return false, fmt.Errorf("GET /allocations (consumer2): expected 200, got %d", pollResp.StatusCode) } - if err := json.NewDecoder(resp.Body).Decode(&getResp2); err != nil { + if err := json.NewDecoder(pollResp.Body).Decode(&getResp2); err != nil { return false, err } _, ok := getResp2.Allocations[kvmUUID] @@ -670,25 +669,25 @@ func e2eCRDAllocations(ctx context.Context, sc *gophercloud.ServiceClient, cl cl } // 7. Verify GET after DELETE returns empty. - if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) { - req, err := http.NewRequestWithContext(ctx, + if err := e2ePollUntil(ctx, func() (bool, error) { + pollReq, err := http.NewRequestWithContext(ctx, http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID, http.NoBody) if err != nil { return false, err } - req.Header.Set("X-Auth-Token", sc.TokenID) - req.Header.Set("OpenStack-API-Version", apiVersion) - req.Header.Set("Accept", "application/json") - resp, err := sc.HTTPClient.Do(req) + pollReq.Header.Set("X-Auth-Token", sc.TokenID) + pollReq.Header.Set("OpenStack-API-Version", apiVersion) + pollReq.Header.Set("Accept", "application/json") + pollResp, err := sc.HTTPClient.Do(pollReq) if err != nil { return false, err } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return false, fmt.Errorf("GET /allocations (post-delete): expected 200, got %d", resp.StatusCode) + defer pollResp.Body.Close() + if pollResp.StatusCode != http.StatusOK { + return false, fmt.Errorf("GET /allocations (post-delete): expected 200, got %d", pollResp.StatusCode) } var r allocationsResponse - if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + if err := json.NewDecoder(pollResp.Body).Decode(&r); err != nil { return false, err } return len(r.Allocations) == 0, nil diff --git a/internal/shim/placement/handle_allocations_test.go b/internal/shim/placement/handle_allocations_test.go index e32be5747..55e4bb300 100644 --- a/internal/shim/placement/handle_allocations_test.go +++ b/internal/shim/placement/handle_allocations_test.go @@ -99,7 +99,7 @@ const ( testRPUUID2 = "66666666-7777-8888-9999-aaaaaaaaaaaa" ) -func testHypervisorWithBooking(name, openstackID, consumerUUID string, vcpu int64, memMB int64) *hv1.Hypervisor { +func testHypervisorWithBooking(name, openstackID string, vcpu, memMB int64) *hv1.Hypervisor { gen := int64(1) return &hv1.Hypervisor{ ObjectMeta: metav1.ObjectMeta{Name: name}, @@ -107,7 +107,7 @@ func testHypervisorWithBooking(name, openstackID, consumerUUID string, vcpu int6 Spec: hv1.HypervisorSpec{ Bookings: []hv1.Booking{ {Consumer: &hv1.ConsumerBooking{ - UUID: consumerUUID, + UUID: testConsumerUUID, Resources: map[hv1.ResourceName]resource.Quantity{hv1.ResourceCPU: *resource.NewQuantity(vcpu, resource.DecimalSI), hv1.ResourceMemory: *resource.NewQuantity(memMB*1024*1024, resource.BinarySI)}, ConsumerGeneration: &gen, ProjectID: "proj-1", @@ -160,7 +160,7 @@ func TestUpdateAllocations_CRD_NewConsumer(t *testing.T) { } func TestUpdateAllocations_CRD_GenerationMismatch(t *testing.T) { - hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 2, 4096) + hv := testHypervisorWithBooking("hv-node-1", testRPUUID, 2, 4096) s := newAllocationsTestShim(t, FeatureModeCRD, 0, "", hv) wrongGen := int64(99) @@ -189,7 +189,7 @@ func TestUpdateAllocations_CRD_UnknownRP(t *testing.T) { // --------------------------------------------------------------------------- func TestListAllocations_CRD_Found(t *testing.T) { - hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192) + hv := testHypervisorWithBooking("hv-node-1", testRPUUID, 4, 8192) s := newAllocationsTestShim(t, FeatureModeCRD, 0, "", hv) w := serveHandler(t, "GET", "/allocations/{consumer_uuid}", @@ -232,8 +232,8 @@ func TestListAllocations_CRD_NotFound(t *testing.T) { } func TestListAllocations_CRD_MultiCR(t *testing.T) { - hv1obj := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192) - hv2obj := testHypervisorWithBooking("hv-node-2", testRPUUID2, testConsumerUUID, 2, 4096) + hv1obj := testHypervisorWithBooking("hv-node-1", testRPUUID, 4, 8192) + hv2obj := testHypervisorWithBooking("hv-node-2", testRPUUID2, 2, 4096) s := newAllocationsTestShim(t, FeatureModeCRD, 0, "", hv1obj, hv2obj) w := serveHandler(t, "GET", "/allocations/{consumer_uuid}", @@ -256,7 +256,7 @@ func TestListAllocations_CRD_MultiCR(t *testing.T) { // --------------------------------------------------------------------------- func TestDeleteAllocations_CRD_Found(t *testing.T) { - hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192) + hv := testHypervisorWithBooking("hv-node-1", testRPUUID, 4, 8192) s := newAllocationsTestShim(t, FeatureModeCRD, 0, "", hv) w := serveHandler(t, "DELETE", "/allocations/{consumer_uuid}", @@ -316,7 +316,7 @@ func TestUpdateAllocations_Hybrid_NonKVMOnly(t *testing.T) { // --------------------------------------------------------------------------- func TestListAllocations_Hybrid_Merge(t *testing.T) { - hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192) + hv := testHypervisorWithBooking("hv-node-1", testRPUUID, 4, 8192) upstreamBody := `{"allocations":{"upstream-rp-uuid":{"resources":{"VCPU":2}}},"consumer_generation":1,"project_id":"proj-1","user_id":"user-1"}` s := newAllocationsTestShim(t, FeatureModeHybrid, http.StatusOK, upstreamBody, hv) @@ -346,7 +346,7 @@ func TestListAllocations_Hybrid_Merge(t *testing.T) { // --------------------------------------------------------------------------- func TestDeleteAllocations_Hybrid(t *testing.T) { - hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192) + hv := testHypervisorWithBooking("hv-node-1", testRPUUID, 4, 8192) s := newAllocationsTestShim(t, FeatureModeHybrid, http.StatusNoContent, "", hv) w := serveHandler(t, "DELETE", "/allocations/{consumer_uuid}", @@ -357,7 +357,7 @@ func TestDeleteAllocations_Hybrid(t *testing.T) { } func TestDeleteAllocations_Hybrid_UpstreamFails(t *testing.T) { - hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192) + hv := testHypervisorWithBooking("hv-node-1", testRPUUID, 4, 8192) s := newAllocationsTestShim(t, FeatureModeHybrid, http.StatusInternalServerError, "upstream error", hv) w := serveHandler(t, "DELETE", "/allocations/{consumer_uuid}", diff --git a/internal/shim/placement/handle_resource_provider_aggregates_e2e.go b/internal/shim/placement/handle_resource_provider_aggregates_e2e.go index 3f7f55424..5382d96b7 100644 --- a/internal/shim/placement/handle_resource_provider_aggregates_e2e.go +++ b/internal/shim/placement/handle_resource_provider_aggregates_e2e.go @@ -10,7 +10,6 @@ import ( "fmt" "net/http" "slices" - "time" "github.com/cobaltcore-dev/cortex/pkg/conf" hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" @@ -369,7 +368,7 @@ func e2eCRDResourceProviderAggregates(ctx context.Context, sc *gophercloud.Servi Aggregates []string `json:"aggregates"` ResourceProviderGeneration int64 `json:"resource_provider_generation"` } - if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) { + if err := e2ePollUntil(ctx, func() (bool, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, sc.Endpoint+"/resource_providers/"+kvmUUID+"/aggregates", http.NoBody) if err != nil { diff --git a/internal/shim/placement/handle_resource_provider_traits_e2e.go b/internal/shim/placement/handle_resource_provider_traits_e2e.go index 7dc50b016..b55ac6e08 100644 --- a/internal/shim/placement/handle_resource_provider_traits_e2e.go +++ b/internal/shim/placement/handle_resource_provider_traits_e2e.go @@ -10,7 +10,6 @@ import ( "fmt" "net/http" "slices" - "time" "github.com/cobaltcore-dev/cortex/pkg/conf" hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" @@ -388,7 +387,7 @@ func e2eCRDResourceProviderTraits(ctx context.Context, sc *gophercloud.ServiceCl Traits []string `json:"traits"` ResourceProviderGeneration int64 `json:"resource_provider_generation"` } - if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) { + if err := e2ePollUntil(ctx, func() (bool, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, sc.Endpoint+"/resource_providers/"+kvmUUID+"/traits", http.NoBody) if err != nil { diff --git a/internal/shim/placement/shim_e2e.go b/internal/shim/placement/shim_e2e.go index b678fa6e5..6aa3cc069 100644 --- a/internal/shim/placement/shim_e2e.go +++ b/internal/shim/placement/shim_e2e.go @@ -213,10 +213,11 @@ func RunE2E(ctx context.Context, cl client.Client) error { return nil } -// e2ePollUntil retries check at short intervals until it returns true or the -// timeout expires. Used to wait for the informer cache to pick up a CRD -// update before asserting via the HTTP API. -func e2ePollUntil(ctx context.Context, timeout time.Duration, check func() (bool, error)) error { +// e2ePollUntil retries check at short intervals until it returns true or +// 10 seconds have elapsed. Used to wait for the informer cache to pick up a +// CRD update before asserting via the HTTP API. +func e2ePollUntil(ctx context.Context, check func() (bool, error)) error { + const timeout = 10 * time.Second deadline := time.Now().Add(timeout) for { ok, err := check() From d692723d4585e6132270bb020ed02cfcdbff73cc Mon Sep 17 00:00:00 2001 From: Philipp Matthes Date: Mon, 4 May 2026 15:09:28 +0200 Subject: [PATCH 3/3] Fix build after rebase: restore operator pre-release and adapt to new API The rebase onto main upgraded the operator dependency to v1.2.0, which does not yet include spec.bookings (PR #296 is still open). Restore the pre-release pseudo-version. Also add the third `hasBackingConfig` argument to featureModeFromConfOrHeader calls introduced on main. --- go.mod | 2 +- go.sum | 4 ++-- internal/shim/placement/handle_allocations.go | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 52da7fef1..48196a3cb 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/cobaltcore-dev/cortex go 1.26.0 require ( - github.com/cobaltcore-dev/openstack-hypervisor-operator v1.2.0 + github.com/cobaltcore-dev/openstack-hypervisor-operator v1.1.1-0.20260430095528-58fbe3ff4c3e github.com/go-gorp/gorp v2.2.0+incompatible github.com/gophercloud/gophercloud/v2 v2.12.0 github.com/ironcore-dev/ironcore v0.3.0 diff --git a/go.sum b/go.sum index 638b047bd..96ccff9c8 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1x github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cobaltcore-dev/openstack-hypervisor-operator v1.2.0 h1:XYVIKTC19dj4jck2uinYzTNXcoED5HNTvv+BJ75M2E0= -github.com/cobaltcore-dev/openstack-hypervisor-operator v1.2.0/go.mod h1:iuhqhW6ozxfYWbGlEeh9rW9xyTb/EgelkDJqzJXBclk= +github.com/cobaltcore-dev/openstack-hypervisor-operator v1.1.1-0.20260430095528-58fbe3ff4c3e h1:gegFF2HeBNzpOqQTqPAEkllLDdIHXCiGUxlKT4i5D/o= +github.com/cobaltcore-dev/openstack-hypervisor-operator v1.1.1-0.20260430095528-58fbe3ff4c3e/go.mod h1:iuhqhW6ozxfYWbGlEeh9rW9xyTb/EgelkDJqzJXBclk= github.com/containerd/continuity v0.4.5 h1:ZRoN1sXq9u7V6QoHMcVWGhOwDFqZ4B9i5H6un1Wh0x4= github.com/containerd/continuity v0.4.5/go.mod h1:/lNJvtJKUQStBzpVQ1+rasXO1LAWtUQssk28EZvJ3nE= github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI= diff --git a/internal/shim/placement/handle_allocations.go b/internal/shim/placement/handle_allocations.go index e4b7393da..30f227d80 100644 --- a/internal/shim/placement/handle_allocations.go +++ b/internal/shim/placement/handle_allocations.go @@ -98,7 +98,7 @@ func hvToPlacementResources(resources map[hv1.ResourceName]resource.Quantity) ma // // https://docs.openstack.org/api-ref/placement/#manage-allocations func (s *Shim) HandleManageAllocations(w http.ResponseWriter, r *http.Request) { - switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) { + switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations, true) { case FeatureModePassthrough: s.forward(w, r) case FeatureModeHybrid: @@ -287,7 +287,7 @@ func (s *Shim) HandleListAllocations(w http.ResponseWriter, r *http.Request) { if !ok { return } - switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) { + switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations, true) { case FeatureModePassthrough: s.forward(w, r) case FeatureModeHybrid: @@ -421,7 +421,7 @@ func (s *Shim) HandleUpdateAllocations(w http.ResponseWriter, r *http.Request) { if !ok { return } - switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) { + switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations, true) { case FeatureModePassthrough: s.forward(w, r) case FeatureModeHybrid: @@ -646,7 +646,7 @@ func (s *Shim) HandleDeleteAllocations(w http.ResponseWriter, r *http.Request) { if !ok { return } - switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) { + switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations, true) { case FeatureModePassthrough: s.forward(w, r) case FeatureModeHybrid: