From e7ad5a17d2adcfdf10f81c290a3e7a334b9aa035 Mon Sep 17 00:00:00 2001
From: Philipp Matthes
Date: Mon, 4 May 2026 07:41:03 +0200
Subject: [PATCH 1/3] Initial implementation of allocations
---
internal/shim/placement/field_index.go | 25 +
internal/shim/placement/field_index_test.go | 59 ++
internal/shim/placement/handle_allocations.go | 684 +++++++++++++++++-
.../shim/placement/handle_allocations_e2e.go | 551 ++++++++++----
.../shim/placement/handle_allocations_test.go | 396 ++++++++--
.../handle_resource_providers_test.go | 15 +
6 files changed, 1504 insertions(+), 226 deletions(-)
diff --git a/internal/shim/placement/field_index.go b/internal/shim/placement/field_index.go
index 23bf1470c..2d20d1e89 100644
--- a/internal/shim/placement/field_index.go
+++ b/internal/shim/placement/field_index.go
@@ -26,6 +26,10 @@ const (
// by their metadata.name field, which represents the name of the hypervisor
// in Kubernetes.
idxHypervisorName = "metadata.name"
+ // idxBookingConsumerUUID is the name of the index for looking up
+ // hypervisors by the consumer UUIDs in their spec.bookings entries.
+ // A single hypervisor may index multiple consumer UUIDs.
+ idxBookingConsumerUUID = "spec.bookings.consumer.uuid"
)
// IndexFields indexes all fields that are needed by the shim to quickly
@@ -77,5 +81,26 @@ func IndexFields(ctx context.Context, mcl *multicluster.Client) error {
}
log.Info("Successfully set up index for hypervisor name")
+ if err := mcl.IndexField(ctx, h, hl, idxBookingConsumerUUID, func(obj client.Object) []string {
+ hv, ok := obj.(*hv1.Hypervisor)
+ if !ok {
+ log.Error(errors.New("unexpected type"), "object", obj)
+ return nil
+ }
+ consumers := hv1.GetConsumers(hv.Spec.Bookings)
+ if len(consumers) == 0 {
+ return nil
+ }
+ uuids := make([]string, 0, len(consumers))
+ for _, c := range consumers {
+ uuids = append(uuids, c.UUID)
+ }
+ return uuids
+ }); err != nil {
+ log.Error(err, "failed to set up index for booking consumer UUID")
+ return err
+ }
+ log.Info("Successfully set up index for booking consumer UUID")
+
return nil
}
diff --git a/internal/shim/placement/field_index_test.go b/internal/shim/placement/field_index_test.go
index 8db1f2169..5987fe8d2 100644
--- a/internal/shim/placement/field_index_test.go
+++ b/internal/shim/placement/field_index_test.go
@@ -118,6 +118,7 @@ func TestIndexFields_RegistersAllIndexes(t *testing.T) {
idxHypervisorOpenStackId,
idxHypervisorKubernetesId,
idxHypervisorName,
+ idxBookingConsumerUUID,
}
if len(cc.calls) != len(wantFields) {
t.Fatalf("got %d IndexField calls, want %d", len(cc.calls), len(wantFields))
@@ -284,3 +285,61 @@ func strSliceEqual(a, b []string) bool {
}
return true
}
+
+func TestExtractor_BookingConsumerUUID(t *testing.T) {
+ cc := &captureCache{}
+ mcl := buildClient(t, cc)
+ if err := IndexFields(context.Background(), mcl); err != nil {
+ t.Fatalf("IndexFields: %v", err)
+ }
+ fn := extractorByField(t, cc.calls, idxBookingConsumerUUID)
+
+ tests := []struct {
+ name string
+ obj client.Object
+ want []string
+ }{
+ {
+ name: "hypervisor with consumer bookings",
+ obj: &hv1.Hypervisor{
+ Spec: hv1.HypervisorSpec{
+ Bookings: []hv1.Booking{
+ {Consumer: &hv1.ConsumerBooking{UUID: "consumer-aaa"}},
+ {Consumer: &hv1.ConsumerBooking{UUID: "consumer-bbb"}},
+ {Reservation: &hv1.ReservationBooking{Name: "nova-reserved"}},
+ },
+ },
+ },
+ want: []string{"consumer-aaa", "consumer-bbb"},
+ },
+ {
+ name: "hypervisor with no bookings",
+ obj: &hv1.Hypervisor{},
+ want: nil,
+ },
+ {
+ name: "hypervisor with only reservation bookings",
+ obj: &hv1.Hypervisor{
+ Spec: hv1.HypervisorSpec{
+ Bookings: []hv1.Booking{
+ {Reservation: &hv1.ReservationBooking{Name: "nova-reserved"}},
+ },
+ },
+ },
+ want: nil,
+ },
+ {
+ name: "wrong type",
+ obj: &corev1.ConfigMap{},
+ want: nil,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got := fn(tt.obj)
+ if !strSliceEqual(got, tt.want) {
+ t.Errorf("got %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
diff --git a/internal/shim/placement/handle_allocations.go b/internal/shim/placement/handle_allocations.go
index f909cc5a7..a5bf35c37 100644
--- a/internal/shim/placement/handle_allocations.go
+++ b/internal/shim/placement/handle_allocations.go
@@ -4,9 +4,83 @@
package placement
import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
"net/http"
+
+ hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
+ "k8s.io/apimachinery/pkg/api/resource"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ logf "sigs.k8s.io/controller-runtime/pkg/log"
)
+// allocationsRequest is the JSON body for PUT /allocations/{consumer_uuid}
+// (microversion 1.28+).
+type allocationsRequest struct {
+ Allocations map[string]allocationEntry `json:"allocations"`
+ ConsumerGeneration *int64 `json:"consumer_generation"`
+ ProjectID string `json:"project_id"`
+ UserID string `json:"user_id"`
+ ConsumerType string `json:"consumer_type,omitempty"`
+}
+
+// allocationEntry represents a single resource provider's allocation within
+// a consumer's allocation set.
+type allocationEntry struct {
+ Resources map[string]int64 `json:"resources"`
+}
+
+// allocationsResponse is the JSON body returned by
+// GET /allocations/{consumer_uuid} (microversion 1.28+).
+//
+// https://docs.openstack.org/api-ref/placement/#list-allocations
+type allocationsResponse struct {
+ Allocations map[string]allocationEntry `json:"allocations"`
+ ConsumerGeneration *int64 `json:"consumer_generation,omitempty"`
+ ProjectID string `json:"project_id,omitempty"`
+ UserID string `json:"user_id,omitempty"`
+}
+
+// placementToHVResources converts Placement resource class amounts to
+// Hypervisor CRD resource quantities.
+// VCPU → cpu (1:1), MEMORY_MB → memory (MB→bytes); all other resource
+// classes (including DISK_GB) pass through 1:1 under their original name.
+func placementToHVResources(resources map[string]int64) map[hv1.ResourceName]resource.Quantity {
+ out := make(map[hv1.ResourceName]resource.Quantity, len(resources))
+ for name, amount := range resources {
+ switch name {
+ case "VCPU":
+ out[hv1.ResourceCPU] = *resource.NewQuantity(amount, resource.DecimalSI)
+ case "MEMORY_MB":
+ out[hv1.ResourceMemory] = *resource.NewQuantity(amount*1024*1024, resource.BinarySI)
+ default:
+ out[hv1.ResourceName(name)] = *resource.NewQuantity(amount, resource.DecimalSI)
+ }
+ }
+ return out
+}
+
+// hvToPlacementResources converts Hypervisor CRD resource quantities back to
+// Placement resource class amounts.
+func hvToPlacementResources(resources map[hv1.ResourceName]resource.Quantity) map[string]int64 {
+ out := make(map[string]int64, len(resources))
+ for name, qty := range resources {
+ switch name {
+ case hv1.ResourceCPU:
+ out["VCPU"] = qty.Value()
+ case hv1.ResourceMemory:
+ out["MEMORY_MB"] = qty.Value() / (1024 * 1024)
+ default:
+ out[string(name)] = qty.Value()
+ }
+ }
+ return out
+}
+
// HandleManageAllocations handles POST /allocations requests.
//
// Atomically creates, updates, or deletes allocations for multiple consumers
@@ -22,8 +96,19 @@ import (
// (e.g. INSTANCE, MIGRATION) is supported. Returns 204 No Content on
// success, or 409 Conflict if inventory is insufficient or a concurrent
// update is detected (error code: placement.concurrent_update).
+//
+// https://docs.openstack.org/api-ref/placement/#manage-allocations
func (s *Shim) HandleManageAllocations(w http.ResponseWriter, r *http.Request) {
- s.dispatchPassthroughOnly(w, r, s.config.Features.Allocations)
+ switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) {
+ case FeatureModePassthrough:
+ s.forward(w, r)
+ case FeatureModeHybrid:
+ s.manageAllocationsHybrid(w, r)
+ case FeatureModeCRD:
+ s.manageAllocationsCRD(w, r)
+ default:
+ http.Error(w, "unknown feature mode", http.StatusInternalServerError)
+ }
}
// HandleListAllocations handles GET /allocations/{consumer_uuid} requests.
@@ -37,11 +122,23 @@ func (s *Shim) HandleManageAllocations(w http.ResponseWriter, r *http.Request) {
// added at 1.12, consumer_generation at 1.28, and consumer_type at 1.38.
// The consumer_generation and consumer_type fields are absent when the
// consumer has no allocations.
+//
+// https://docs.openstack.org/api-ref/placement/#list-allocations
func (s *Shim) HandleListAllocations(w http.ResponseWriter, r *http.Request) {
- if _, ok := requiredUUIDPathParam(w, r, "consumer_uuid"); !ok {
+ consumerUUID, ok := requiredUUIDPathParam(w, r, "consumer_uuid")
+ if !ok {
return
}
- s.dispatchPassthroughOnly(w, r, s.config.Features.Allocations)
+ switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) {
+ case FeatureModePassthrough:
+ s.forward(w, r)
+ case FeatureModeHybrid:
+ s.listAllocationsHybrid(w, r, consumerUUID)
+ case FeatureModeCRD:
+ s.listAllocationsCRD(w, r, consumerUUID)
+ default:
+ http.Error(w, "unknown feature mode", http.StatusInternalServerError)
+ }
}
// HandleUpdateAllocations handles PUT /allocations/{consumer_uuid} requests.
@@ -55,11 +152,23 @@ func (s *Shim) HandleListAllocations(w http.ResponseWriter, r *http.Request) {
//
// Returns 204 No Content on success. Returns 409 Conflict if there is
// insufficient inventory or if a concurrent update was detected.
+//
+// https://docs.openstack.org/api-ref/placement/#update-allocations
func (s *Shim) HandleUpdateAllocations(w http.ResponseWriter, r *http.Request) {
- if _, ok := requiredUUIDPathParam(w, r, "consumer_uuid"); !ok {
+ consumerUUID, ok := requiredUUIDPathParam(w, r, "consumer_uuid")
+ if !ok {
return
}
- s.dispatchPassthroughOnly(w, r, s.config.Features.Allocations)
+ switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) {
+ case FeatureModePassthrough:
+ s.forward(w, r)
+ case FeatureModeHybrid:
+ s.updateAllocationsHybrid(w, r, consumerUUID)
+ case FeatureModeCRD:
+ s.updateAllocationsCRD(w, r, consumerUUID)
+ default:
+ http.Error(w, "unknown feature mode", http.StatusInternalServerError)
+ }
}
// HandleDeleteAllocations handles DELETE /allocations/{consumer_uuid} requests.
@@ -67,9 +176,570 @@ func (s *Shim) HandleUpdateAllocations(w http.ResponseWriter, r *http.Request) {
// Removes all allocation records for the consumer across all resource
// providers. Returns 204 No Content on success, or 404 Not Found if the
// consumer has no existing allocations.
+//
+// https://docs.openstack.org/api-ref/placement/#delete-allocations
func (s *Shim) HandleDeleteAllocations(w http.ResponseWriter, r *http.Request) {
- if _, ok := requiredUUIDPathParam(w, r, "consumer_uuid"); !ok {
+ consumerUUID, ok := requiredUUIDPathParam(w, r, "consumer_uuid")
+ if !ok {
+ return
+ }
+ switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) {
+ case FeatureModePassthrough:
+ s.forward(w, r)
+ case FeatureModeHybrid:
+ s.deleteAllocationsHybrid(w, r, consumerUUID)
+ case FeatureModeCRD:
+ s.deleteAllocationsCRD(w, r, consumerUUID)
+ default:
+ http.Error(w, "unknown feature mode", http.StatusInternalServerError)
+ }
+}
+
+// ---------------------------------------------------------------------------
+// PUT /allocations/{consumer_uuid} — hybrid and crd
+// ---------------------------------------------------------------------------
+
+func (s *Shim) updateAllocationsHybrid(w http.ResponseWriter, r *http.Request, consumerUUID string) {
+ ctx := r.Context()
+ log := logf.FromContext(ctx)
+
+ bodyBytes, err := io.ReadAll(r.Body)
+ if err != nil {
+ log.Error(err, "failed to read request body")
+ http.Error(w, "failed to read request body", http.StatusBadRequest)
+ return
+ }
+
+ var req allocationsRequest
+ if err := json.Unmarshal(bodyBytes, &req); err != nil {
+ http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ kvmAllocs := make(map[string]allocationEntry)
+ nonKvmAllocs := make(map[string]allocationEntry)
+ kvmHypervisors := make(map[string]*hv1.Hypervisor)
+
+ for rpUUID, entry := range req.Allocations {
+ var hvs hv1.HypervisorList
+ if err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: rpUUID}); err == nil && len(hvs.Items) == 1 {
+ kvmAllocs[rpUUID] = entry
+ hv := hvs.Items[0]
+ kvmHypervisors[rpUUID] = &hv
+ } else {
+ nonKvmAllocs[rpUUID] = entry
+ }
+ }
+
+ // Forward non-KVM allocations to upstream first.
+ if len(nonKvmAllocs) > 0 {
+ upstreamReq := allocationsRequest{
+ Allocations: nonKvmAllocs,
+ ConsumerGeneration: req.ConsumerGeneration,
+ ProjectID: req.ProjectID,
+ UserID: req.UserID,
+ ConsumerType: req.ConsumerType,
+ }
+ upstreamBody, err := json.Marshal(upstreamReq)
+ if err != nil {
+ log.Error(err, "failed to marshal upstream request")
+ http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+ return
+ }
+ r.Body = io.NopCloser(bytes.NewReader(upstreamBody))
+ rec := &statusRecorder{ResponseWriter: w, header: make(http.Header)}
+ s.forward(rec, r)
+ if rec.statusCode >= 300 {
+ // Upstream failed — copy its response to the client.
+ for k, vs := range rec.header {
+ for _, v := range vs {
+ w.Header().Add(k, v)
+ }
+ }
+ w.WriteHeader(rec.statusCode)
+ w.Write(rec.body.Bytes()) //nolint:errcheck
+ return
+ }
+ }
+
+ // Write KVM allocations to CRD.
+ if err := s.writeKVMBookings(ctx, consumerUUID, &req, kvmAllocs, kvmHypervisors); err != nil {
+ log.Error(err, "failed to write KVM bookings")
+ if apierrors.IsConflict(err) {
+ http.Error(w, "consumer generation conflict", http.StatusConflict)
+ return
+ }
+ http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+ return
+ }
+
+ w.WriteHeader(http.StatusNoContent)
+}
+
+func (s *Shim) updateAllocationsCRD(w http.ResponseWriter, r *http.Request, consumerUUID string) {
+ ctx := r.Context()
+ log := logf.FromContext(ctx)
+
+ var req allocationsRequest
+ if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+ http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ kvmAllocs := make(map[string]allocationEntry)
+ kvmHypervisors := make(map[string]*hv1.Hypervisor)
+
+ for rpUUID, entry := range req.Allocations {
+ var hvs hv1.HypervisorList
+ if err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: rpUUID}); err != nil || len(hvs.Items) != 1 {
+ log.Info("resource provider not found in CRD (crd mode)", "rpUUID", rpUUID)
+ http.Error(w, fmt.Sprintf("resource provider %s not found", rpUUID), http.StatusBadRequest)
+ return
+ }
+ kvmAllocs[rpUUID] = entry
+ hv := hvs.Items[0]
+ kvmHypervisors[rpUUID] = &hv
+ }
+
+ if err := s.writeKVMBookings(ctx, consumerUUID, &req, kvmAllocs, kvmHypervisors); err != nil {
+ log.Error(err, "failed to write KVM bookings")
+ if apierrors.IsConflict(err) {
+ http.Error(w, "consumer generation conflict", http.StatusConflict)
+ return
+ }
+ http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+ return
+ }
+
+ w.WriteHeader(http.StatusNoContent)
+}
+
+// writeKVMBookings writes consumer bookings to the respective Hypervisor CRs.
+// It performs consumer generation checks before writing.
+func (s *Shim) writeKVMBookings(
+ ctx context.Context,
+ consumerUUID string,
+ req *allocationsRequest,
+ kvmAllocs map[string]allocationEntry,
+ kvmHypervisors map[string]*hv1.Hypervisor,
+) error {
+ log := logf.FromContext(ctx)
+
+ hvGR := schema.GroupResource{Group: "kvm.cloud.sap", Resource: "hypervisors"}
+ for rpUUID, entry := range kvmAllocs {
+ hv := kvmHypervisors[rpUUID]
+ existing := hv1.GetConsumer(hv.Spec.Bookings, consumerUUID)
+
+ // Consumer generation check.
+ if existing == nil && req.ConsumerGeneration != nil {
+ return apierrors.NewConflict(hvGR, hv.Name,
+ fmt.Errorf("consumer %s does not exist but generation is non-null", consumerUUID))
+ }
+ if existing != nil && req.ConsumerGeneration == nil {
+ return apierrors.NewConflict(hvGR, hv.Name,
+ fmt.Errorf("consumer %s exists but generation is null", consumerUUID))
+ }
+ if existing != nil && req.ConsumerGeneration != nil && existing.ConsumerGeneration != nil {
+ if *req.ConsumerGeneration != *existing.ConsumerGeneration {
+ return apierrors.NewConflict(hvGR, hv.Name,
+ fmt.Errorf("consumer %s generation mismatch: got %d, want %d", consumerUUID, *req.ConsumerGeneration, *existing.ConsumerGeneration))
+ }
+ }
+
+ // Compute new generation.
+ var newGen int64
+ if existing != nil && existing.ConsumerGeneration != nil {
+ newGen = *existing.ConsumerGeneration + 1
+ } else {
+ newGen = 1
+ }
+
+ newBooking := hv1.Booking{
+ Consumer: &hv1.ConsumerBooking{
+ UUID: consumerUUID,
+ Resources: placementToHVResources(entry.Resources),
+ ConsumerGeneration: &newGen,
+ ConsumerType: req.ConsumerType,
+ ProjectID: req.ProjectID,
+ UserID: req.UserID,
+ },
+ }
+
+ // Replace or append.
+ var newBookings []hv1.Booking
+ replaced := false
+ for _, b := range hv.Spec.Bookings {
+ if b.Consumer != nil && b.Consumer.UUID == consumerUUID {
+ newBookings = append(newBookings, newBooking)
+ replaced = true
+ } else {
+ newBookings = append(newBookings, b)
+ }
+ }
+ if !replaced {
+ newBookings = append(newBookings, newBooking)
+ }
+ hv.Spec.Bookings = newBookings
+
+ if err := s.Update(ctx, hv); err != nil {
+ log.Error(err, "failed to update hypervisor bookings", "hypervisor", hv.Name, "consumer", consumerUUID)
+ return err
+ }
+ log.Info("wrote consumer booking to hypervisor", "hypervisor", hv.Name, "consumer", consumerUUID, "rpUUID", rpUUID)
+ }
+ return nil
+}
+
+// ---------------------------------------------------------------------------
+// GET /allocations/{consumer_uuid} — hybrid and crd
+// ---------------------------------------------------------------------------
+
+func (s *Shim) listAllocationsHybrid(w http.ResponseWriter, r *http.Request, consumerUUID string) {
+ ctx := r.Context()
+ log := logf.FromContext(ctx)
+
+ s.forwardWithHook(w, r, func(w http.ResponseWriter, resp *http.Response) {
+ var upstreamResp allocationsResponse
+ if resp.StatusCode == http.StatusOK {
+ if err := json.NewDecoder(resp.Body).Decode(&upstreamResp); err != nil {
+ log.Error(err, "failed to decode upstream allocations response")
+ http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+ return
+ }
+ } else {
+ // If upstream returned non-200, initialize empty.
+ upstreamResp.Allocations = make(map[string]allocationEntry)
+ }
+ if upstreamResp.Allocations == nil {
+ upstreamResp.Allocations = make(map[string]allocationEntry)
+ }
+
+ // Look up consumer in CRD.
+ var hvs hv1.HypervisorList
+ if err := s.List(ctx, &hvs, client.MatchingFields{idxBookingConsumerUUID: consumerUUID}); err != nil && !apierrors.IsNotFound(err) {
+ log.Error(err, "failed to look up consumer in CRD")
+ http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+ return
+ }
+
+ // Merge CRD bookings into the response. CRD takes precedence on collision.
+ for _, hv := range hvs.Items {
+ consumer := hv1.GetConsumer(hv.Spec.Bookings, consumerUUID)
+ if consumer == nil {
+ continue
+ }
+ rpUUID := hv.Status.HypervisorID
+ upstreamResp.Allocations[rpUUID] = allocationEntry{
+ Resources: hvToPlacementResources(consumer.Resources),
+ }
+ if upstreamResp.ConsumerGeneration == nil {
+ upstreamResp.ConsumerGeneration = consumer.ConsumerGeneration
+ }
+ if upstreamResp.ProjectID == "" {
+ upstreamResp.ProjectID = consumer.ProjectID
+ }
+ if upstreamResp.UserID == "" {
+ upstreamResp.UserID = consumer.UserID
+ }
+ }
+
+ s.writeJSON(w, http.StatusOK, upstreamResp)
+ })
+}
+
+func (s *Shim) listAllocationsCRD(w http.ResponseWriter, r *http.Request, consumerUUID string) {
+ ctx := r.Context()
+ log := logf.FromContext(ctx)
+
+ var hvs hv1.HypervisorList
+ if err := s.List(ctx, &hvs, client.MatchingFields{idxBookingConsumerUUID: consumerUUID}); err != nil && !apierrors.IsNotFound(err) {
+ log.Error(err, "failed to look up consumer in CRD")
+ http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
- s.dispatchPassthroughOnly(w, r, s.config.Features.Allocations)
+
+ resp := allocationsResponse{
+ Allocations: make(map[string]allocationEntry),
+ }
+
+ for _, hv := range hvs.Items {
+ consumer := hv1.GetConsumer(hv.Spec.Bookings, consumerUUID)
+ if consumer == nil {
+ continue
+ }
+ rpUUID := hv.Status.HypervisorID
+ resp.Allocations[rpUUID] = allocationEntry{
+ Resources: hvToPlacementResources(consumer.Resources),
+ }
+ resp.ConsumerGeneration = consumer.ConsumerGeneration
+ resp.ProjectID = consumer.ProjectID
+ resp.UserID = consumer.UserID
+ }
+
+ s.writeJSON(w, http.StatusOK, resp)
+}
+
+// ---------------------------------------------------------------------------
+// DELETE /allocations/{consumer_uuid} — hybrid and crd
+// ---------------------------------------------------------------------------
+
+func (s *Shim) deleteAllocationsHybrid(w http.ResponseWriter, r *http.Request, consumerUUID string) {
+ ctx := r.Context()
+ log := logf.FromContext(ctx)
+
+ // Forward to upstream first.
+ rec := &statusRecorder{ResponseWriter: w, header: make(http.Header)}
+ s.forward(rec, r)
+ // Upstream returning 404 is acceptable — the consumer may only exist in CRD.
+ if rec.statusCode >= 300 && rec.statusCode != http.StatusNotFound {
+ for k, vs := range rec.header {
+ for _, v := range vs {
+ w.Header().Add(k, v)
+ }
+ }
+ w.WriteHeader(rec.statusCode)
+ w.Write(rec.body.Bytes()) //nolint:errcheck
+ return
+ }
+
+ // Remove from CRD.
+ if err := s.removeConsumerFromCRD(ctx, consumerUUID); err != nil {
+ log.Error(err, "failed to remove consumer from CRD")
+ http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+ return
+ }
+
+ w.WriteHeader(http.StatusNoContent)
+}
+
+func (s *Shim) deleteAllocationsCRD(w http.ResponseWriter, r *http.Request, consumerUUID string) {
+ ctx := r.Context()
+ log := logf.FromContext(ctx)
+
+ var hvs hv1.HypervisorList
+ if err := s.List(ctx, &hvs, client.MatchingFields{idxBookingConsumerUUID: consumerUUID}); err != nil && !apierrors.IsNotFound(err) {
+ log.Error(err, "failed to look up consumer in CRD")
+ http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+ return
+ }
+
+ if len(hvs.Items) == 0 {
+ http.Error(w, "consumer not found", http.StatusNotFound)
+ return
+ }
+
+ if err := s.removeConsumerFromCRD(ctx, consumerUUID); err != nil {
+ log.Error(err, "failed to remove consumer from CRD")
+ http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+ return
+ }
+
+ w.WriteHeader(http.StatusNoContent)
+}
+
+// removeConsumerFromCRD removes all booking entries for a consumer from all
+// Hypervisor CRs that hold it.
+func (s *Shim) removeConsumerFromCRD(ctx context.Context, consumerUUID string) error {
+ log := logf.FromContext(ctx)
+
+ var hvs hv1.HypervisorList
+ if err := s.List(ctx, &hvs, client.MatchingFields{idxBookingConsumerUUID: consumerUUID}); err != nil {
+ if apierrors.IsNotFound(err) {
+ return nil
+ }
+ return err
+ }
+
+ for i := range hvs.Items {
+ hv := &hvs.Items[i]
+ var newBookings []hv1.Booking
+ for _, b := range hv.Spec.Bookings {
+ if b.Consumer != nil && b.Consumer.UUID == consumerUUID {
+ continue
+ }
+ newBookings = append(newBookings, b)
+ }
+ hv.Spec.Bookings = newBookings
+ if err := s.Update(ctx, hv); err != nil {
+ log.Error(err, "failed to update hypervisor after removing consumer", "hypervisor", hv.Name, "consumer", consumerUUID)
+ return err
+ }
+ log.Info("removed consumer booking from hypervisor", "hypervisor", hv.Name, "consumer", consumerUUID)
+ }
+ return nil
+}
+
+// ---------------------------------------------------------------------------
+// POST /allocations — hybrid and crd
+// ---------------------------------------------------------------------------
+
+// manageAllocationsRequest represents the batch body for POST /allocations.
+// It is keyed by consumer UUID.
+type manageAllocationsRequest map[string]allocationsRequest
+
+func (s *Shim) manageAllocationsHybrid(w http.ResponseWriter, r *http.Request) {
+ ctx := r.Context()
+ log := logf.FromContext(ctx)
+
+ bodyBytes, err := io.ReadAll(r.Body)
+ if err != nil {
+ log.Error(err, "failed to read request body")
+ http.Error(w, "failed to read request body", http.StatusBadRequest)
+ return
+ }
+
+ var batch manageAllocationsRequest
+ if err := json.Unmarshal(bodyBytes, &batch); err != nil {
+ http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ // Separate KVM from non-KVM across all consumers.
+ type kvmWork struct {
+ consumerUUID string
+ req *allocationsRequest
+ kvmAllocs map[string]allocationEntry
+ kvmHypervisors map[string]*hv1.Hypervisor
+ }
+ var kvmWorkItems []kvmWork
+ nonKvmBatch := make(manageAllocationsRequest)
+
+ for consumerUUID, consumerReq := range batch {
+ kvmA := make(map[string]allocationEntry)
+ nonKvmA := make(map[string]allocationEntry)
+ kvmHVs := make(map[string]*hv1.Hypervisor)
+
+ for rpUUID, entry := range consumerReq.Allocations {
+ var hvs hv1.HypervisorList
+ if err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: rpUUID}); err == nil && len(hvs.Items) == 1 {
+ kvmA[rpUUID] = entry
+ hv := hvs.Items[0]
+ kvmHVs[rpUUID] = &hv
+ } else {
+ nonKvmA[rpUUID] = entry
+ }
+ }
+
+ if len(nonKvmA) > 0 {
+ nonKvmBatch[consumerUUID] = allocationsRequest{
+ Allocations: nonKvmA,
+ ConsumerGeneration: consumerReq.ConsumerGeneration,
+ ProjectID: consumerReq.ProjectID,
+ UserID: consumerReq.UserID,
+ ConsumerType: consumerReq.ConsumerType,
+ }
+ }
+ if len(kvmA) > 0 {
+ cr := consumerReq
+ kvmWorkItems = append(kvmWorkItems, kvmWork{
+ consumerUUID: consumerUUID,
+ req: &cr,
+ kvmAllocs: kvmA,
+ kvmHypervisors: kvmHVs,
+ })
+ }
+ }
+
+ // Forward non-KVM portion to upstream first.
+ if len(nonKvmBatch) > 0 {
+ upstreamBody, err := json.Marshal(nonKvmBatch)
+ if err != nil {
+ log.Error(err, "failed to marshal upstream batch request")
+ http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+ return
+ }
+ r.Body = io.NopCloser(bytes.NewReader(upstreamBody))
+ rec := &statusRecorder{ResponseWriter: w, header: make(http.Header)}
+ s.forward(rec, r)
+ if rec.statusCode >= 300 {
+ for k, vs := range rec.header {
+ for _, v := range vs {
+ w.Header().Add(k, v)
+ }
+ }
+ w.WriteHeader(rec.statusCode)
+ w.Write(rec.body.Bytes()) //nolint:errcheck
+ return
+ }
+ }
+
+ // Write KVM bookings.
+ for _, work := range kvmWorkItems {
+ if err := s.writeKVMBookings(ctx, work.consumerUUID, work.req, work.kvmAllocs, work.kvmHypervisors); err != nil {
+ log.Error(err, "failed to write KVM bookings in batch", "consumer", work.consumerUUID)
+ if apierrors.IsConflict(err) {
+ http.Error(w, "consumer generation conflict", http.StatusConflict)
+ return
+ }
+ http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+ return
+ }
+ }
+
+ w.WriteHeader(http.StatusNoContent)
+}
+
+func (s *Shim) manageAllocationsCRD(w http.ResponseWriter, r *http.Request) {
+ ctx := r.Context()
+ log := logf.FromContext(ctx)
+
+ var batch manageAllocationsRequest
+ if err := json.NewDecoder(r.Body).Decode(&batch); err != nil {
+ http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ for consumerUUID, consumerReq := range batch {
+ kvmAllocs := make(map[string]allocationEntry)
+ kvmHypervisors := make(map[string]*hv1.Hypervisor)
+
+ for rpUUID, entry := range consumerReq.Allocations {
+ var hvs hv1.HypervisorList
+ if err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: rpUUID}); err != nil || len(hvs.Items) != 1 {
+ log.Info("resource provider not found in CRD (crd mode)", "rpUUID", rpUUID)
+ http.Error(w, fmt.Sprintf("resource provider %s not found", rpUUID), http.StatusBadRequest)
+ return
+ }
+ kvmAllocs[rpUUID] = entry
+ hv := hvs.Items[0]
+ kvmHypervisors[rpUUID] = &hv
+ }
+
+ cr := consumerReq
+ if err := s.writeKVMBookings(ctx, consumerUUID, &cr, kvmAllocs, kvmHypervisors); err != nil {
+ log.Error(err, "failed to write KVM bookings in batch", "consumer", consumerUUID)
+ if apierrors.IsConflict(err) {
+ http.Error(w, "consumer generation conflict", http.StatusConflict)
+ return
+ }
+ http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+ return
+ }
+ }
+
+ w.WriteHeader(http.StatusNoContent)
+}
+
+// ---------------------------------------------------------------------------
+// statusRecorder captures the upstream response so we can inspect status before
+// committing to write the real response.
+// ---------------------------------------------------------------------------
+
+type statusRecorder struct {
+ http.ResponseWriter
+ statusCode int
+ header http.Header
+ body bytes.Buffer
+}
+
+func (r *statusRecorder) Header() http.Header {
+ return r.header
+}
+
+func (r *statusRecorder) WriteHeader(code int) {
+ r.statusCode = code
+}
+
+func (r *statusRecorder) Write(b []byte) (int, error) {
+ return r.body.Write(b)
}
diff --git a/internal/shim/placement/handle_allocations_e2e.go b/internal/shim/placement/handle_allocations_e2e.go
index 27887ca7f..cb8cf3244 100644
--- a/internal/shim/placement/handle_allocations_e2e.go
+++ b/internal/shim/placement/handle_allocations_e2e.go
@@ -9,8 +9,12 @@ import (
"encoding/json"
"fmt"
"net/http"
+ "time"
"github.com/cobaltcore-dev/cortex/pkg/conf"
+ hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
+ "github.com/gophercloud/gophercloud/v2"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)
@@ -18,20 +22,11 @@ import (
// e2eTestAllocations tests the /allocations/{consumer_uuid} and
// POST /allocations (batch) endpoints.
//
-// 1. Pre-cleanup: DELETE leftover consumer allocations, RP, and custom RC.
-// 2. Create fixtures: PUT a custom resource class, POST a test RP, PUT
-// inventory on the RP (total=100).
-// 3. GET /allocations/{consumer1} — verify allocations are empty.
-// 4. PUT /allocations/{consumer1} — create an allocation of 10 units against
-// the test RP using fake project/user IDs.
-// 5. GET /allocations/{consumer1} — verify the allocation exists and points
-// to the test RP.
-// 6. POST /allocations — batch-create a second consumer's allocation of
-// 5 units against the same RP.
-// 7. GET /allocations/{consumer2} — verify the second allocation exists.
-// 8. DELETE /allocations/{consumer} — remove allocations for both consumers.
-// 9. Cleanup: DELETE the test RP and custom resource class.
-func e2eTestAllocations(ctx context.Context, _ client.Client) error {
+// In passthrough mode: exercises the upstream placement path with a
+// dynamically created resource provider and custom resource class.
+// In hybrid/crd mode: exercises the CRD-backed booking path using a
+// real KVM hypervisor discovered from the cluster.
+func e2eTestAllocations(ctx context.Context, cl client.Client) error {
log := logf.FromContext(ctx)
log.Info("Running allocations endpoint e2e test")
config, err := conf.GetConfig[e2eRootConfig]()
@@ -47,6 +42,20 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error {
}
log.Info("Successfully created openstack client for allocations e2e test")
+ mode := e2eCurrentMode(ctx)
+ switch mode {
+ case FeatureModePassthrough:
+ return e2ePassthroughAllocations(ctx, sc)
+ case FeatureModeHybrid, FeatureModeCRD:
+ return e2eCRDAllocations(ctx, sc, cl)
+ default:
+ return fmt.Errorf("unexpected mode %q", mode)
+ }
+}
+
+func e2ePassthroughAllocations(ctx context.Context, sc *gophercloud.ServiceClient) error {
+ log := logf.FromContext(ctx)
+
const testRPUUID = "e2e10000-0000-0000-0000-000000000007"
const testRPName = "cortex-e2e-test-rp-alloc"
const testRC = "CUSTOM_CORTEX_E2E_ALLOC_RC"
@@ -56,15 +65,6 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error {
const userID = "e2e50000-0000-0000-0000-000000000001"
const apiVersion = "placement 1.28"
- // Probe: for non-passthrough modes, verify endpoint returns 501.
- unimplemented, err := e2eProbeUnimplemented(ctx, sc, sc.Endpoint+"/allocations/"+consumerUUID1)
- if err != nil {
- return fmt.Errorf("probe: %w", err)
- }
- if unimplemented {
- return nil
- }
-
// Pre-cleanup: delete allocations, resource provider, and resource class.
log.Info("Pre-cleanup: deleting leftover test resources")
for _, cleanup := range []struct {
@@ -97,38 +97,28 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error {
req, err := http.NewRequestWithContext(ctx,
http.MethodPut, sc.Endpoint+"/resource_classes/"+testRC, http.NoBody)
if err != nil {
- log.Error(err, "failed to create PUT request for resource class", "class", testRC)
return err
}
req.Header.Set("X-Auth-Token", sc.TokenID)
req.Header.Set("OpenStack-API-Version", apiVersion)
resp, err := sc.HTTPClient.Do(req)
if err != nil {
- log.Error(err, "failed to send PUT request for resource class", "class", testRC)
return err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
- err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
- log.Error(err, "PUT /resource_classes returned an error", "class", testRC)
- return err
+ return fmt.Errorf("PUT /resource_classes: unexpected status %d", resp.StatusCode)
}
log.Info("Successfully created custom resource class", "class", testRC)
- log.Info("Creating test resource provider for allocations test",
- "uuid", testRPUUID, "name", testRPName)
- body, err := json.Marshal(map[string]string{
- "name": testRPName,
- "uuid": testRPUUID,
- })
+ log.Info("Creating test resource provider", "uuid", testRPUUID, "name", testRPName)
+ body, err := json.Marshal(map[string]string{"name": testRPName, "uuid": testRPUUID})
if err != nil {
- log.Error(err, "failed to marshal request body")
return err
}
req, err = http.NewRequestWithContext(ctx,
http.MethodPost, sc.Endpoint+"/resource_providers", bytes.NewReader(body))
if err != nil {
- log.Error(err, "failed to create POST request for resource_providers")
return err
}
req.Header.Set("X-Auth-Token", sc.TokenID)
@@ -137,23 +127,18 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error {
req.Header.Set("Accept", "application/json")
resp, err = sc.HTTPClient.Do(req)
if err != nil {
- log.Error(err, "failed to send POST request to /resource_providers")
return err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
- err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
- log.Error(err, "POST /resource_providers returned an error")
- return err
+ return fmt.Errorf("POST /resource_providers: unexpected status %d", resp.StatusCode)
}
log.Info("Successfully created test resource provider", "uuid", testRPUUID)
// Get the generation for the resource provider.
- log.Info("Getting resource provider generation", "uuid", testRPUUID)
req, err = http.NewRequestWithContext(ctx,
http.MethodGet, sc.Endpoint+"/resource_providers/"+testRPUUID+"/inventories", http.NoBody)
if err != nil {
- log.Error(err, "failed to create GET request for RP inventories")
return err
}
req.Header.Set("X-Auth-Token", sc.TokenID)
@@ -161,28 +146,22 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error {
req.Header.Set("Accept", "application/json")
resp, err = sc.HTTPClient.Do(req)
if err != nil {
- log.Error(err, "failed to send GET request for RP inventories")
return err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
- err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
- log.Error(err, "GET RP inventories returned an error")
- return err
+ return fmt.Errorf("GET RP inventories: unexpected status %d", resp.StatusCode)
}
var invResp struct {
ResourceProviderGeneration int `json:"resource_provider_generation"`
}
- err = json.NewDecoder(resp.Body).Decode(&invResp)
- if err != nil {
- log.Error(err, "failed to decode RP inventories response")
+ if err := json.NewDecoder(resp.Body).Decode(&invResp); err != nil {
return err
}
generation := invResp.ResourceProviderGeneration
// Set inventory on the resource provider.
- log.Info("Setting inventory on test resource provider",
- "uuid", testRPUUID, "class", testRC, "total", 100)
+ log.Info("Setting inventory on test resource provider", "total", 100)
putBody, err := json.Marshal(map[string]any{
"resource_provider_generation": generation,
"inventories": map[string]any{
@@ -190,14 +169,12 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error {
},
})
if err != nil {
- log.Error(err, "failed to marshal request body")
return err
}
req, err = http.NewRequestWithContext(ctx,
http.MethodPut, sc.Endpoint+"/resource_providers/"+testRPUUID+"/inventories",
bytes.NewReader(putBody))
if err != nil {
- log.Error(err, "failed to create PUT request for RP inventories")
return err
}
req.Header.Set("X-Auth-Token", sc.TokenID)
@@ -206,23 +183,19 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error {
req.Header.Set("Accept", "application/json")
resp, err = sc.HTTPClient.Do(req)
if err != nil {
- log.Error(err, "failed to send PUT request for RP inventories")
return err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
- err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
- log.Error(err, "PUT RP inventories returned an error")
- return err
+ return fmt.Errorf("PUT RP inventories: unexpected status %d", resp.StatusCode)
}
- log.Info("Successfully set inventory on test resource provider", "uuid", testRPUUID)
+ log.Info("Successfully set inventory on test resource provider")
// Test GET /allocations/{consumer_uuid} (empty).
- log.Info("Testing GET /allocations/{consumer_uuid} (empty)", "consumer", consumerUUID1)
+ log.Info("Testing GET /allocations (empty)", "consumer", consumerUUID1)
req, err = http.NewRequestWithContext(ctx,
http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID1, http.NoBody)
if err != nil {
- log.Error(err, "failed to create GET request for allocations", "consumer", consumerUUID1)
return err
}
req.Header.Set("X-Auth-Token", sc.TokenID)
@@ -230,50 +203,37 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error {
req.Header.Set("Accept", "application/json")
resp, err = sc.HTTPClient.Do(req)
if err != nil {
- log.Error(err, "failed to send GET request for allocations", "consumer", consumerUUID1)
return err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
- err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
- log.Error(err, "GET /allocations returned an error", "consumer", consumerUUID1)
- return err
+ return fmt.Errorf("GET /allocations (empty): unexpected status %d", resp.StatusCode)
}
var allocResp struct {
Allocations map[string]json.RawMessage `json:"allocations"`
}
- err = json.NewDecoder(resp.Body).Decode(&allocResp)
- if err != nil {
- log.Error(err, "failed to decode allocations response", "consumer", consumerUUID1)
+ if err := json.NewDecoder(resp.Body).Decode(&allocResp); err != nil {
return err
}
- log.Info("Successfully retrieved empty allocations for consumer",
- "consumer", consumerUUID1, "allocationCount", len(allocResp.Allocations))
+ log.Info("Successfully retrieved empty allocations", "count", len(allocResp.Allocations))
// Test PUT /allocations/{consumer_uuid} (create allocation).
- log.Info("Testing PUT /allocations/{consumer_uuid} to create allocation",
- "consumer", consumerUUID1, "rp", testRPUUID, "amount", 10)
+ log.Info("Testing PUT /allocations (create)", "consumer", consumerUUID1)
allocBody, err := json.Marshal(map[string]any{
"allocations": map[string]any{
- testRPUUID: map[string]any{
- "resources": map[string]int{
- testRC: 10,
- },
- },
+ testRPUUID: map[string]any{"resources": map[string]int{testRC: 10}},
},
"project_id": projectID,
"user_id": userID,
"consumer_generation": nil,
})
if err != nil {
- log.Error(err, "failed to marshal request body")
return err
}
req, err = http.NewRequestWithContext(ctx,
http.MethodPut, sc.Endpoint+"/allocations/"+consumerUUID1,
bytes.NewReader(allocBody))
if err != nil {
- log.Error(err, "failed to create PUT request for allocations", "consumer", consumerUUID1)
return err
}
req.Header.Set("X-Auth-Token", sc.TokenID)
@@ -282,25 +242,19 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error {
req.Header.Set("Accept", "application/json")
resp, err = sc.HTTPClient.Do(req)
if err != nil {
- log.Error(err, "failed to send PUT request for allocations", "consumer", consumerUUID1)
return err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
- err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
- log.Error(err, "PUT /allocations returned an error", "consumer", consumerUUID1)
- return err
+ return fmt.Errorf("PUT /allocations (create): unexpected status %d", resp.StatusCode)
}
- log.Info("Successfully created allocation for consumer",
- "consumer", consumerUUID1, "rp", testRPUUID)
+ log.Info("Successfully created allocation", "consumer", consumerUUID1)
// Test GET /allocations/{consumer_uuid} (after PUT).
- log.Info("Testing GET /allocations/{consumer_uuid} (after PUT)",
- "consumer", consumerUUID1)
+ log.Info("Testing GET /allocations (after PUT)", "consumer", consumerUUID1)
req, err = http.NewRequestWithContext(ctx,
http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID1, http.NoBody)
if err != nil {
- log.Error(err, "failed to create GET request for allocations", "consumer", consumerUUID1)
return err
}
req.Header.Set("X-Auth-Token", sc.TokenID)
@@ -308,39 +262,26 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error {
req.Header.Set("Accept", "application/json")
resp, err = sc.HTTPClient.Do(req)
if err != nil {
- log.Error(err, "failed to send GET request for allocations", "consumer", consumerUUID1)
return err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
- err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
- log.Error(err, "GET /allocations returned an error", "consumer", consumerUUID1)
- return err
+ return fmt.Errorf("GET /allocations (after PUT): unexpected status %d", resp.StatusCode)
}
- err = json.NewDecoder(resp.Body).Decode(&allocResp)
- if err != nil {
- log.Error(err, "failed to decode allocations response", "consumer", consumerUUID1)
+ if err := json.NewDecoder(resp.Body).Decode(&allocResp); err != nil {
return err
}
if _, ok := allocResp.Allocations[testRPUUID]; !ok {
- err := fmt.Errorf("expected allocation against RP %s", testRPUUID)
- log.Error(err, "allocation not found", "consumer", consumerUUID1)
- return err
+ return fmt.Errorf("expected allocation against RP %s, got keys %v", testRPUUID, allocResp.Allocations)
}
- log.Info("Successfully verified allocation for consumer",
- "consumer", consumerUUID1, "allocationCount", len(allocResp.Allocations))
+ log.Info("Verified allocation exists after PUT")
// Test POST /allocations (batch manage) — create a second consumer.
- log.Info("Testing POST /allocations (batch) to create second consumer allocation",
- "consumer", consumerUUID2, "rp", testRPUUID, "amount", 5)
+ log.Info("Testing POST /allocations (batch)", "consumer", consumerUUID2)
batchBody, err := json.Marshal(map[string]any{
consumerUUID2: map[string]any{
"allocations": map[string]any{
- testRPUUID: map[string]any{
- "resources": map[string]int{
- testRC: 5,
- },
- },
+ testRPUUID: map[string]any{"resources": map[string]int{testRC: 5}},
},
"project_id": projectID,
"user_id": userID,
@@ -348,14 +289,12 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error {
},
})
if err != nil {
- log.Error(err, "failed to marshal request body")
return err
}
req, err = http.NewRequestWithContext(ctx,
http.MethodPost, sc.Endpoint+"/allocations",
bytes.NewReader(batchBody))
if err != nil {
- log.Error(err, "failed to create POST request for batch allocations")
return err
}
req.Header.Set("X-Auth-Token", sc.TokenID)
@@ -364,24 +303,18 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error {
req.Header.Set("Accept", "application/json")
resp, err = sc.HTTPClient.Do(req)
if err != nil {
- log.Error(err, "failed to send POST request for batch allocations")
return err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
- err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
- log.Error(err, "POST /allocations returned an error")
- return err
+ return fmt.Errorf("POST /allocations (batch): unexpected status %d", resp.StatusCode)
}
- log.Info("Successfully created batch allocation for second consumer",
- "consumer", consumerUUID2)
+ log.Info("Successfully created batch allocation", "consumer", consumerUUID2)
// Verify the second consumer's allocation.
- log.Info("Verifying second consumer's allocation", "consumer", consumerUUID2)
req, err = http.NewRequestWithContext(ctx,
http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID2, http.NoBody)
if err != nil {
- log.Error(err, "failed to create GET request for allocations", "consumer", consumerUUID2)
return err
}
req.Header.Set("X-Auth-Token", sc.TokenID)
@@ -389,52 +322,39 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error {
req.Header.Set("Accept", "application/json")
resp, err = sc.HTTPClient.Do(req)
if err != nil {
- log.Error(err, "failed to send GET request for allocations", "consumer", consumerUUID2)
return err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
- err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
- log.Error(err, "GET /allocations returned an error", "consumer", consumerUUID2)
- return err
+ return fmt.Errorf("GET /allocations (consumer2): unexpected status %d", resp.StatusCode)
}
- err = json.NewDecoder(resp.Body).Decode(&allocResp)
- if err != nil {
- log.Error(err, "failed to decode allocations response", "consumer", consumerUUID2)
+ if err := json.NewDecoder(resp.Body).Decode(&allocResp); err != nil {
return err
}
if _, ok := allocResp.Allocations[testRPUUID]; !ok {
- err := fmt.Errorf("expected allocation against RP %s", testRPUUID)
- log.Error(err, "allocation not found for second consumer", "consumer", consumerUUID2)
- return err
+ return fmt.Errorf("expected allocation for consumer2 against RP %s", testRPUUID)
}
- log.Info("Successfully verified second consumer's allocation",
- "consumer", consumerUUID2)
+ log.Info("Verified second consumer's allocation")
// Test DELETE /allocations/{consumer_uuid} for both consumers.
for _, consumer := range []string{consumerUUID1, consumerUUID2} {
- log.Info("Testing DELETE /allocations/{consumer_uuid}", "consumer", consumer)
+ log.Info("Testing DELETE /allocations", "consumer", consumer)
req, err = http.NewRequestWithContext(ctx,
http.MethodDelete, sc.Endpoint+"/allocations/"+consumer, http.NoBody)
if err != nil {
- log.Error(err, "failed to create DELETE request for allocations", "consumer", consumer)
return err
}
req.Header.Set("X-Auth-Token", sc.TokenID)
req.Header.Set("OpenStack-API-Version", apiVersion)
resp, err = sc.HTTPClient.Do(req)
if err != nil {
- log.Error(err, "failed to send DELETE request for allocations", "consumer", consumer)
return err
}
+ resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
- resp.Body.Close()
- err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
- log.Error(err, "DELETE /allocations returned an error", "consumer", consumer)
- return err
+ return fmt.Errorf("DELETE /allocations: unexpected status %d for consumer %s", resp.StatusCode, consumer)
}
- resp.Body.Close()
- log.Info("Successfully deleted allocation for consumer", "consumer", consumer)
+ log.Info("Successfully deleted allocation", "consumer", consumer)
}
// Cleanup: delete the resource provider and custom resource class.
@@ -442,48 +362,379 @@ func e2eTestAllocations(ctx context.Context, _ client.Client) error {
req, err = http.NewRequestWithContext(ctx,
http.MethodDelete, sc.Endpoint+"/resource_providers/"+testRPUUID, http.NoBody)
if err != nil {
- log.Error(err, "failed to create DELETE request for resource provider", "uuid", testRPUUID)
return err
}
req.Header.Set("X-Auth-Token", sc.TokenID)
req.Header.Set("OpenStack-API-Version", apiVersion)
resp, err = sc.HTTPClient.Do(req)
if err != nil {
- log.Error(err, "failed to send DELETE request for resource provider", "uuid", testRPUUID)
+ return err
+ }
+ resp.Body.Close()
+
+ req, err = http.NewRequestWithContext(ctx,
+ http.MethodDelete, sc.Endpoint+"/resource_classes/"+testRC, http.NoBody)
+ if err != nil {
+ return err
+ }
+ req.Header.Set("X-Auth-Token", sc.TokenID)
+ req.Header.Set("OpenStack-API-Version", apiVersion)
+ resp, err = sc.HTTPClient.Do(req)
+ if err != nil {
+ return err
+ }
+ resp.Body.Close()
+ log.Info("Cleanup complete")
+
+ return nil
+}
+
+// e2eCRDAllocations tests the CRD/hybrid path by discovering a real KVM
+// hypervisor in the cluster, writing a booking to it, and then exercising
+// GET/PUT/DELETE/POST through the shim's allocation handlers.
+func e2eCRDAllocations(ctx context.Context, sc *gophercloud.ServiceClient, cl client.Client) error {
+ log := logf.FromContext(ctx)
+
+ const consumerUUID = "e2e20000-0000-0000-0000-000000000010"
+ const consumerUUID2 = "e2e20000-0000-0000-0000-000000000011"
+ const projectID = "e2e40000-0000-0000-0000-000000000001"
+ const userID = "e2e50000-0000-0000-0000-000000000001"
+ const apiVersion = "placement 1.28"
+
+ // Discover a KVM hypervisor with a non-empty OpenStack ID.
+ var hvs hv1.HypervisorList
+ if err := cl.List(ctx, &hvs); err != nil {
+ return fmt.Errorf("failed to list hypervisors: %w", err)
+ }
+ var kvmHV *hv1.Hypervisor
+ for i := range hvs.Items {
+ if hvs.Items[i].Status.HypervisorID != "" {
+ kvmHV = &hvs.Items[i]
+ break
+ }
+ }
+ if kvmHV == nil {
+ log.Info("No KVM hypervisors with OpenStack ID found, skipping CRD allocations tests")
+ return nil
+ }
+ kvmUUID := kvmHV.Status.HypervisorID
+ log.Info("Using KVM hypervisor for CRD allocations e2e", "uuid", kvmUUID, "name", kvmHV.Name)
+
+ // Save original bookings for restoration.
+ originalBookings := kvmHV.Spec.Bookings
+
+ // Always restore original bookings on exit.
+ defer func() {
+ log.Info("Restoring original bookings", "name", kvmHV.Name)
+ for range 5 {
+ if err := cl.Get(ctx, client.ObjectKeyFromObject(kvmHV), kvmHV); err != nil {
+ log.Error(err, "failed to refetch hypervisor for restoration")
+ return
+ }
+ kvmHV.Spec.Bookings = originalBookings
+ if err := cl.Update(ctx, kvmHV); err != nil {
+ if apierrors.IsConflict(err) {
+ continue
+ }
+ log.Error(err, "failed to restore original bookings")
+ return
+ }
+ return
+ }
+ log.Error(nil, "exhausted retries restoring original bookings")
+ }()
+
+ // Pre-cleanup: remove any leftover test bookings from prior runs.
+ kvmHV.Spec.Bookings = removeTestBookings(kvmHV.Spec.Bookings, consumerUUID, consumerUUID2)
+ if err := cl.Update(ctx, kvmHV); err != nil {
+ return fmt.Errorf("pre-cleanup: failed to remove leftover bookings: %w", err)
+ }
+ if err := cl.Get(ctx, client.ObjectKeyFromObject(kvmHV), kvmHV); err != nil {
+ return fmt.Errorf("failed to refetch hypervisor after pre-cleanup: %w", err)
+ }
+
+ // 1. Test GET /allocations/{consumer_uuid} — empty (consumer not booked).
+ log.Info("Testing GET /allocations (empty, CRD)", "consumer", consumerUUID)
+ if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) {
+ req, err := http.NewRequestWithContext(ctx,
+ http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID, http.NoBody)
+ if err != nil {
+ return false, err
+ }
+ req.Header.Set("X-Auth-Token", sc.TokenID)
+ req.Header.Set("OpenStack-API-Version", apiVersion)
+ req.Header.Set("Accept", "application/json")
+ resp, err := sc.HTTPClient.Do(req)
+ if err != nil {
+ return false, err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ return false, fmt.Errorf("GET /allocations (empty): expected 200, got %d", resp.StatusCode)
+ }
+ var r allocationsResponse
+ if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
+ return false, err
+ }
+ return len(r.Allocations) == 0, nil
+ }); err != nil {
+ return fmt.Errorf("GET empty allocations: %w", err)
+ }
+ log.Info("Verified empty allocations for unbooked consumer")
+
+ // 2. Test PUT /allocations/{consumer_uuid} — create allocation (new consumer).
+ log.Info("Testing PUT /allocations (create, CRD)", "consumer", consumerUUID, "rp", kvmUUID)
+ allocBody, err := json.Marshal(map[string]any{
+ "allocations": map[string]any{
+ kvmUUID: map[string]any{"resources": map[string]int64{"VCPU": 2, "MEMORY_MB": 4096}},
+ },
+ "project_id": projectID,
+ "user_id": userID,
+ "consumer_generation": nil,
+ })
+ if err != nil {
+ return err
+ }
+ req, err := http.NewRequestWithContext(ctx,
+ http.MethodPut, sc.Endpoint+"/allocations/"+consumerUUID,
+ bytes.NewReader(allocBody))
+ if err != nil {
+ return err
+ }
+ req.Header.Set("X-Auth-Token", sc.TokenID)
+ req.Header.Set("OpenStack-API-Version", apiVersion)
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Set("Accept", "application/json")
+ resp, err := sc.HTTPClient.Do(req)
+ if err != nil {
return err
}
defer resp.Body.Close()
- if resp.StatusCode < 200 || resp.StatusCode >= 300 {
- err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
- log.Error(err, "DELETE resource provider returned an error", "uuid", testRPUUID)
+ if resp.StatusCode != http.StatusNoContent {
+ return fmt.Errorf("PUT /allocations (create): expected 204, got %d", resp.StatusCode)
+ }
+ log.Info("Successfully created allocation via PUT (CRD)")
+
+ // 3. Test GET /allocations/{consumer_uuid} — verify booking present.
+ log.Info("Testing GET /allocations (after PUT, CRD)", "consumer", consumerUUID)
+ var getResp allocationsResponse
+ if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) {
+ req, err := http.NewRequestWithContext(ctx,
+ http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID, http.NoBody)
+ if err != nil {
+ return false, err
+ }
+ req.Header.Set("X-Auth-Token", sc.TokenID)
+ req.Header.Set("OpenStack-API-Version", apiVersion)
+ req.Header.Set("Accept", "application/json")
+ resp, err := sc.HTTPClient.Do(req)
+ if err != nil {
+ return false, err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ return false, fmt.Errorf("GET /allocations (after PUT): expected 200, got %d", resp.StatusCode)
+ }
+ if err := json.NewDecoder(resp.Body).Decode(&getResp); err != nil {
+ return false, err
+ }
+ _, ok := getResp.Allocations[kvmUUID]
+ return ok, nil
+ }); err != nil {
+ return fmt.Errorf("GET allocations after PUT: %w (resp: %+v)", err, getResp)
+ }
+ if getResp.Allocations[kvmUUID].Resources["VCPU"] != 2 {
+ return fmt.Errorf("VCPU = %d, want 2", getResp.Allocations[kvmUUID].Resources["VCPU"])
+ }
+ if getResp.Allocations[kvmUUID].Resources["MEMORY_MB"] != 4096 {
+ return fmt.Errorf("MEMORY_MB = %d, want 4096", getResp.Allocations[kvmUUID].Resources["MEMORY_MB"])
+ }
+ log.Info("Verified allocation present after PUT (CRD)")
+
+ // 4. Test PUT with wrong consumer_generation — should 409.
+ log.Info("Testing PUT /allocations (stale generation, CRD)")
+ staleBody, err := json.Marshal(map[string]any{
+ "allocations": map[string]any{
+ kvmUUID: map[string]any{"resources": map[string]int64{"VCPU": 8}},
+ },
+ "project_id": projectID,
+ "user_id": userID,
+ "consumer_generation": 999,
+ })
+ if err != nil {
return err
}
- log.Info("Successfully deleted test resource provider", "uuid", testRPUUID)
+ req, err = http.NewRequestWithContext(ctx,
+ http.MethodPut, sc.Endpoint+"/allocations/"+consumerUUID,
+ bytes.NewReader(staleBody))
+ if err != nil {
+ return err
+ }
+ req.Header.Set("X-Auth-Token", sc.TokenID)
+ req.Header.Set("OpenStack-API-Version", apiVersion)
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Set("Accept", "application/json")
+ resp, err = sc.HTTPClient.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusConflict {
+ return fmt.Errorf("PUT /allocations (stale gen): expected 409, got %d", resp.StatusCode)
+ }
+ log.Info("Verified generation conflict returns 409 (CRD)")
+ // 5. Test POST /allocations (batch) — create a second consumer.
+ log.Info("Testing POST /allocations (batch, CRD)", "consumer", consumerUUID2)
+ batchBody, err := json.Marshal(map[string]any{
+ consumerUUID2: map[string]any{
+ "allocations": map[string]any{
+ kvmUUID: map[string]any{"resources": map[string]int64{"VCPU": 1, "MEMORY_MB": 2048}},
+ },
+ "project_id": projectID,
+ "user_id": userID,
+ "consumer_generation": nil,
+ },
+ })
+ if err != nil {
+ return err
+ }
req, err = http.NewRequestWithContext(ctx,
- http.MethodDelete, sc.Endpoint+"/resource_classes/"+testRC, http.NoBody)
+ http.MethodPost, sc.Endpoint+"/allocations",
+ bytes.NewReader(batchBody))
if err != nil {
- log.Error(err, "failed to create DELETE request for resource class", "class", testRC)
return err
}
req.Header.Set("X-Auth-Token", sc.TokenID)
req.Header.Set("OpenStack-API-Version", apiVersion)
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Set("Accept", "application/json")
resp, err = sc.HTTPClient.Do(req)
if err != nil {
- log.Error(err, "failed to send DELETE request for resource class", "class", testRC)
return err
}
defer resp.Body.Close()
- if resp.StatusCode < 200 || resp.StatusCode >= 300 {
- err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
- log.Error(err, "DELETE resource class returned an error", "class", testRC)
+ if resp.StatusCode != http.StatusNoContent {
+ return fmt.Errorf("POST /allocations (batch): expected 204, got %d", resp.StatusCode)
+ }
+ log.Info("Successfully created batch allocation (CRD)")
+
+ // Verify second consumer's allocation.
+ var getResp2 allocationsResponse
+ if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) {
+ req, err := http.NewRequestWithContext(ctx,
+ http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID2, http.NoBody)
+ if err != nil {
+ return false, err
+ }
+ req.Header.Set("X-Auth-Token", sc.TokenID)
+ req.Header.Set("OpenStack-API-Version", apiVersion)
+ req.Header.Set("Accept", "application/json")
+ resp, err := sc.HTTPClient.Do(req)
+ if err != nil {
+ return false, err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ return false, fmt.Errorf("GET /allocations (consumer2): expected 200, got %d", resp.StatusCode)
+ }
+ if err := json.NewDecoder(resp.Body).Decode(&getResp2); err != nil {
+ return false, err
+ }
+ _, ok := getResp2.Allocations[kvmUUID]
+ return ok, nil
+ }); err != nil {
+ return fmt.Errorf("GET allocations (consumer2): %w", err)
+ }
+ log.Info("Verified second consumer's allocation (CRD)")
+
+ // 6. Test DELETE /allocations/{consumer_uuid}.
+ for _, consumer := range []string{consumerUUID, consumerUUID2} {
+ log.Info("Testing DELETE /allocations (CRD)", "consumer", consumer)
+ req, err = http.NewRequestWithContext(ctx,
+ http.MethodDelete, sc.Endpoint+"/allocations/"+consumer, http.NoBody)
+ if err != nil {
+ return err
+ }
+ req.Header.Set("X-Auth-Token", sc.TokenID)
+ req.Header.Set("OpenStack-API-Version", apiVersion)
+ resp, err = sc.HTTPClient.Do(req)
+ if err != nil {
+ return err
+ }
+ resp.Body.Close()
+ if resp.StatusCode != http.StatusNoContent {
+ return fmt.Errorf("DELETE /allocations: expected 204, got %d for consumer %s", resp.StatusCode, consumer)
+ }
+ log.Info("Successfully deleted allocation (CRD)", "consumer", consumer)
+ }
+
+ // 7. Verify GET after DELETE returns empty.
+ if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) {
+ req, err := http.NewRequestWithContext(ctx,
+ http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID, http.NoBody)
+ if err != nil {
+ return false, err
+ }
+ req.Header.Set("X-Auth-Token", sc.TokenID)
+ req.Header.Set("OpenStack-API-Version", apiVersion)
+ req.Header.Set("Accept", "application/json")
+ resp, err := sc.HTTPClient.Do(req)
+ if err != nil {
+ return false, err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ return false, fmt.Errorf("GET /allocations (post-delete): expected 200, got %d", resp.StatusCode)
+ }
+ var r allocationsResponse
+ if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
+ return false, err
+ }
+ return len(r.Allocations) == 0, nil
+ }); err != nil {
+ return fmt.Errorf("GET allocations after DELETE: %w", err)
+ }
+ log.Info("Verified allocations empty after DELETE (CRD)")
+
+ // 8. Test DELETE /allocations for unknown consumer — should 404.
+ unknownConsumer := "e2e20000-0000-0000-0000-ffffffffffff"
+ req, err = http.NewRequestWithContext(ctx,
+ http.MethodDelete, sc.Endpoint+"/allocations/"+unknownConsumer, http.NoBody)
+ if err != nil {
+ return err
+ }
+ req.Header.Set("X-Auth-Token", sc.TokenID)
+ req.Header.Set("OpenStack-API-Version", apiVersion)
+ resp, err = sc.HTTPClient.Do(req)
+ if err != nil {
return err
}
- log.Info("Successfully deleted custom resource class", "class", testRC)
+ resp.Body.Close()
+ if resp.StatusCode != http.StatusNotFound {
+ return fmt.Errorf("DELETE /allocations (unknown): expected 404, got %d", resp.StatusCode)
+ }
+ log.Info("Verified DELETE for unknown consumer returns 404 (CRD)")
return nil
}
+// removeTestBookings returns bookings with every consumer booking whose UUID
+func removeTestBookings(bookings []hv1.Booking, uuids ...string) []hv1.Booking {
+	drop := make(map[string]bool, len(uuids))
+	for _, u := range uuids {
+		drop[u] = true
+	}
+	// Keep only bookings that are not consumer bookings for a dropped UUID.
+	var result []hv1.Booking
+	for _, b := range bookings {
+		if b.Consumer != nil && drop[b.Consumer.UUID] {
+			continue
+		}
+		result = append(result, b)
+	}
+	return result
+}
+
func init() {
e2eTests = append(e2eTests, e2eTest{name: "allocations", run: e2eWrapWithModes(e2eTestAllocations)})
}
diff --git a/internal/shim/placement/handle_allocations_test.go b/internal/shim/placement/handle_allocations_test.go
index e39d5c43a..e32be5747 100644
--- a/internal/shim/placement/handle_allocations_test.go
+++ b/internal/shim/placement/handle_allocations_test.go
@@ -4,10 +4,22 @@
package placement
import (
+ "encoding/json"
"net/http"
+ "net/http/httptest"
+ "strings"
"testing"
+
+ hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
+ "k8s.io/apimachinery/pkg/api/resource"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "sigs.k8s.io/controller-runtime/pkg/client"
)
+// ---------------------------------------------------------------------------
+// Passthrough mode tests (unchanged behavior)
+// ---------------------------------------------------------------------------
+
func TestHandleManageAllocations(t *testing.T) {
var gotPath string
s := newTestShim(t, http.StatusNoContent, "", &gotPath)
@@ -77,84 +89,330 @@ func TestHandleDeleteAllocations(t *testing.T) {
})
}
-func TestHandleAllocations_HybridMode(t *testing.T) {
- down, up := newTestTimers()
- s := &Shim{
- config: config{
- PlacementURL: "http://should-not-be-called:1234",
- Features: featuresConfig{Allocations: FeatureModeHybrid},
+// ---------------------------------------------------------------------------
+// Helper to create a test shim with allocations feature mode set.
+// ---------------------------------------------------------------------------
+
+const (
+	// testConsumerUUID identifies the allocation consumer used across the
+	// CRD and hybrid allocation tests below.
+	testConsumerUUID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
+	// testRPUUID and testRPUUID2 are resource-provider (hypervisor OpenStack
+	// ID) values used to key allocations in request bodies and fixtures.
+	testRPUUID  = "11111111-2222-3333-4444-555555555555"
+	testRPUUID2 = "66666666-7777-8888-9999-aaaaaaaaaaaa"
+)
+
+func testHypervisorWithBooking(name, openstackID, consumerUUID string, vcpu int64, memMB int64) *hv1.Hypervisor {
+ gen := int64(1)
+ return &hv1.Hypervisor{
+ ObjectMeta: metav1.ObjectMeta{Name: name},
+ Status: hv1.HypervisorStatus{HypervisorID: openstackID},
+ Spec: hv1.HypervisorSpec{
+ Bookings: []hv1.Booking{
+ {Consumer: &hv1.ConsumerBooking{
+ UUID: consumerUUID,
+ Resources: map[hv1.ResourceName]resource.Quantity{hv1.ResourceCPU: *resource.NewQuantity(vcpu, resource.DecimalSI), hv1.ResourceMemory: *resource.NewQuantity(memMB*1024*1024, resource.BinarySI)},
+ ConsumerGeneration: &gen,
+ ProjectID: "proj-1",
+ UserID: "user-1",
+ }},
+ },
},
- maxBodyLogSize: 4096,
- downstreamRequestTimer: down,
- upstreamRequestTimer: up,
}
- t.Run("POST returns 501", func(t *testing.T) {
- w := serveHandler(t, "POST", "/allocations",
- s.HandleManageAllocations, "/allocations")
- if w.Code != http.StatusNotImplemented {
- t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented)
- }
- })
- t.Run("GET returns 501", func(t *testing.T) {
- w := serveHandler(t, "GET", "/allocations/{consumer_uuid}",
- s.HandleListAllocations, "/allocations/"+validUUID)
- if w.Code != http.StatusNotImplemented {
- t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented)
- }
- })
- t.Run("PUT returns 501", func(t *testing.T) {
- w := serveHandler(t, "PUT", "/allocations/{consumer_uuid}",
- s.HandleUpdateAllocations, "/allocations/"+validUUID)
- if w.Code != http.StatusNotImplemented {
- t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented)
- }
- })
- t.Run("DELETE returns 501", func(t *testing.T) {
- w := serveHandler(t, "DELETE", "/allocations/{consumer_uuid}",
- s.HandleDeleteAllocations, "/allocations/"+validUUID)
- if w.Code != http.StatusNotImplemented {
- t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented)
- }
- })
}
-func TestHandleAllocations_CRDMode(t *testing.T) {
+func newAllocationsTestShim(t *testing.T, mode FeatureMode, upstreamStatus int, upstreamBody string, hvs ...client.Object) *Shim {
+ t.Helper()
+ upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(upstreamStatus)
+ w.Write([]byte(upstreamBody)) //nolint:errcheck
+ }))
+ t.Cleanup(upstream.Close)
down, up := newTestTimers()
- s := &Shim{
+ return &Shim{
+ Client: newFakeClient(t, hvs...),
config: config{
- PlacementURL: "http://should-not-be-called:1234",
- Features: featuresConfig{Allocations: FeatureModeCRD},
+ PlacementURL: upstream.URL,
+ Features: featuresConfig{Allocations: mode},
},
+ httpClient: upstream.Client(),
maxBodyLogSize: 4096,
downstreamRequestTimer: down,
upstreamRequestTimer: up,
}
- t.Run("POST returns 501", func(t *testing.T) {
- w := serveHandler(t, "POST", "/allocations",
- s.HandleManageAllocations, "/allocations")
- if w.Code != http.StatusNotImplemented {
- t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented)
- }
- })
- t.Run("GET returns 501", func(t *testing.T) {
- w := serveHandler(t, "GET", "/allocations/{consumer_uuid}",
- s.HandleListAllocations, "/allocations/"+validUUID)
- if w.Code != http.StatusNotImplemented {
- t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented)
- }
- })
- t.Run("PUT returns 501", func(t *testing.T) {
- w := serveHandler(t, "PUT", "/allocations/{consumer_uuid}",
- s.HandleUpdateAllocations, "/allocations/"+validUUID)
- if w.Code != http.StatusNotImplemented {
- t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented)
- }
- })
- t.Run("DELETE returns 501", func(t *testing.T) {
- w := serveHandler(t, "DELETE", "/allocations/{consumer_uuid}",
- s.HandleDeleteAllocations, "/allocations/"+validUUID)
- if w.Code != http.StatusNotImplemented {
- t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented)
- }
- })
+}
+
+// ---------------------------------------------------------------------------
+// CRD mode: PUT
+// ---------------------------------------------------------------------------
+
+// TestUpdateAllocations_CRD_NewConsumer verifies that a PUT for a consumer
+// without an existing booking succeeds with 204 in CRD mode.
+func TestUpdateAllocations_CRD_NewConsumer(t *testing.T) {
+	hypervisor := &hv1.Hypervisor{
+		ObjectMeta: metav1.ObjectMeta{Name: "hv-node-1"},
+		Status:     hv1.HypervisorStatus{HypervisorID: testRPUUID},
+	}
+	shim := newAllocationsTestShim(t, FeatureModeCRD, 0, "", hypervisor)
+
+	payload := `{"allocations":{"` + testRPUUID + `":{"resources":{"VCPU":4,"MEMORY_MB":8192}}},"consumer_generation":null,"project_id":"proj-1","user_id":"user-1"}`
+	rec := serveHandlerWithBody(t, "PUT", "/allocations/{consumer_uuid}",
+		shim.HandleUpdateAllocations, "/allocations/"+testConsumerUUID, strings.NewReader(payload))
+	if got, want := rec.Code, http.StatusNoContent; got != want {
+		t.Fatalf("status = %d, want %d; body: %s", got, want, rec.Body.String())
+	}
+}
+
+// TestUpdateAllocations_CRD_GenerationMismatch verifies that a PUT carrying a
+// consumer_generation that differs from the stored booking's generation is
+// rejected with 409 Conflict in CRD mode.
+func TestUpdateAllocations_CRD_GenerationMismatch(t *testing.T) {
+	hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 2, 4096)
+	s := newAllocationsTestShim(t, FeatureModeCRD, 0, "", hv)
+
+	// The fixture booking is stored with generation 1; send generation 99.
+	body := `{"allocations":{"` + testRPUUID + `":{"resources":{"VCPU":4,"MEMORY_MB":8192}}},"consumer_generation":99,"project_id":"proj-1","user_id":"user-1"}`
+	w := serveHandlerWithBody(t, "PUT", "/allocations/{consumer_uuid}",
+		s.HandleUpdateAllocations, "/allocations/"+testConsumerUUID, strings.NewReader(body))
+	if w.Code != http.StatusConflict {
+		t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusConflict, w.Body.String())
+	}
+}
+
+// TestUpdateAllocations_CRD_UnknownRP verifies that a PUT referencing a
+// resource provider with no matching Hypervisor CR is rejected with 400.
+func TestUpdateAllocations_CRD_UnknownRP(t *testing.T) {
+	shim := newAllocationsTestShim(t, FeatureModeCRD, 0, "")
+
+	payload := `{"allocations":{"` + testRPUUID + `":{"resources":{"VCPU":4}}},"consumer_generation":null,"project_id":"proj-1","user_id":"user-1"}`
+	rec := serveHandlerWithBody(t, "PUT", "/allocations/{consumer_uuid}",
+		shim.HandleUpdateAllocations, "/allocations/"+testConsumerUUID, strings.NewReader(payload))
+	if got, want := rec.Code, http.StatusBadRequest; got != want {
+		t.Fatalf("status = %d, want %d; body: %s", got, want, rec.Body.String())
+	}
+}
+
+// ---------------------------------------------------------------------------
+// CRD mode: GET
+// ---------------------------------------------------------------------------
+
+// TestListAllocations_CRD_Found verifies that GET returns the booking stored
+// on a Hypervisor CR, translated into placement resource classes.
+func TestListAllocations_CRD_Found(t *testing.T) {
+	shim := newAllocationsTestShim(t, FeatureModeCRD, 0, "",
+		testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192))
+
+	rec := serveHandler(t, "GET", "/allocations/{consumer_uuid}",
+		shim.HandleListAllocations, "/allocations/"+testConsumerUUID)
+	if rec.Code != http.StatusOK {
+		t.Fatalf("status = %d, want %d; body: %s", rec.Code, http.StatusOK, rec.Body.String())
+	}
+
+	var decoded allocationsResponse
+	if err := json.Unmarshal(rec.Body.Bytes(), &decoded); err != nil {
+		t.Fatalf("failed to decode response: %v", err)
+	}
+	entry, ok := decoded.Allocations[testRPUUID]
+	if !ok {
+		t.Fatalf("expected allocation for RP %s, got %v", testRPUUID, decoded.Allocations)
+	}
+	if got := entry.Resources["VCPU"]; got != 4 {
+		t.Fatalf("VCPU = %d, want 4", got)
+	}
+	if got := entry.Resources["MEMORY_MB"]; got != 8192 {
+		t.Fatalf("MEMORY_MB = %d, want 8192", got)
+	}
+}
+
+// TestListAllocations_CRD_NotFound verifies that GET for a consumer with no
+// bookings responds 200 with an empty allocations map.
+func TestListAllocations_CRD_NotFound(t *testing.T) {
+	shim := newAllocationsTestShim(t, FeatureModeCRD, 0, "")
+
+	rec := serveHandler(t, "GET", "/allocations/{consumer_uuid}",
+		shim.HandleListAllocations, "/allocations/"+testConsumerUUID)
+	if rec.Code != http.StatusOK {
+		t.Fatalf("status = %d, want %d; body: %s", rec.Code, http.StatusOK, rec.Body.String())
+	}
+
+	var decoded allocationsResponse
+	if err := json.Unmarshal(rec.Body.Bytes(), &decoded); err != nil {
+		t.Fatalf("failed to decode response: %v", err)
+	}
+	if len(decoded.Allocations) != 0 {
+		t.Fatalf("expected empty allocations, got %v", decoded.Allocations)
+	}
+}
+
+// TestListAllocations_CRD_MultiCR verifies that bookings for one consumer
+// spread across multiple Hypervisor CRs are all reported.
+func TestListAllocations_CRD_MultiCR(t *testing.T) {
+	first := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192)
+	second := testHypervisorWithBooking("hv-node-2", testRPUUID2, testConsumerUUID, 2, 4096)
+	shim := newAllocationsTestShim(t, FeatureModeCRD, 0, "", first, second)
+
+	rec := serveHandler(t, "GET", "/allocations/{consumer_uuid}",
+		shim.HandleListAllocations, "/allocations/"+testConsumerUUID)
+	if rec.Code != http.StatusOK {
+		t.Fatalf("status = %d, want %d; body: %s", rec.Code, http.StatusOK, rec.Body.String())
+	}
+
+	var decoded allocationsResponse
+	if err := json.Unmarshal(rec.Body.Bytes(), &decoded); err != nil {
+		t.Fatalf("failed to decode response: %v", err)
+	}
+	if got := len(decoded.Allocations); got != 2 {
+		t.Fatalf("expected 2 allocations, got %d: %v", got, decoded.Allocations)
+	}
+}
+
+// ---------------------------------------------------------------------------
+// CRD mode: DELETE
+// ---------------------------------------------------------------------------
+
+// TestDeleteAllocations_CRD_Found verifies DELETE responds 204 when the
+// consumer has an existing booking on a Hypervisor CR.
+func TestDeleteAllocations_CRD_Found(t *testing.T) {
+	shim := newAllocationsTestShim(t, FeatureModeCRD, 0, "",
+		testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192))
+
+	rec := serveHandler(t, "DELETE", "/allocations/{consumer_uuid}",
+		shim.HandleDeleteAllocations, "/allocations/"+testConsumerUUID)
+	if got, want := rec.Code, http.StatusNoContent; got != want {
+		t.Fatalf("status = %d, want %d; body: %s", got, want, rec.Body.String())
+	}
+}
+
+// TestDeleteAllocations_CRD_NotFound verifies DELETE responds 404 when the
+// consumer has no booking on any Hypervisor CR.
+func TestDeleteAllocations_CRD_NotFound(t *testing.T) {
+	shim := newAllocationsTestShim(t, FeatureModeCRD, 0, "")
+
+	rec := serveHandler(t, "DELETE", "/allocations/{consumer_uuid}",
+		shim.HandleDeleteAllocations, "/allocations/"+testConsumerUUID)
+	if got, want := rec.Code, http.StatusNotFound; got != want {
+		t.Fatalf("status = %d, want %d; body: %s", got, want, rec.Body.String())
+	}
+}
+
+// ---------------------------------------------------------------------------
+// Hybrid mode: PUT (KVM-only allocation, skips upstream)
+// ---------------------------------------------------------------------------
+
+// TestUpdateAllocations_Hybrid_KVMOnly verifies that a PUT touching only
+// KVM-managed resource providers never reaches upstream: the test upstream
+// would answer 500, yet the request still succeeds with 204.
+func TestUpdateAllocations_Hybrid_KVMOnly(t *testing.T) {
+	node := &hv1.Hypervisor{
+		ObjectMeta: metav1.ObjectMeta{Name: "hv-node-1"},
+		Status:     hv1.HypervisorStatus{HypervisorID: testRPUUID},
+	}
+	shim := newAllocationsTestShim(t, FeatureModeHybrid, http.StatusInternalServerError, "should not be called", node)
+
+	payload := `{"allocations":{"` + testRPUUID + `":{"resources":{"VCPU":4,"MEMORY_MB":8192}}},"consumer_generation":null,"project_id":"proj-1","user_id":"user-1"}`
+	rec := serveHandlerWithBody(t, "PUT", "/allocations/{consumer_uuid}",
+		shim.HandleUpdateAllocations, "/allocations/"+testConsumerUUID, strings.NewReader(payload))
+	if got, want := rec.Code, http.StatusNoContent; got != want {
+		t.Fatalf("status = %d, want %d; body: %s", got, want, rec.Body.String())
+	}
+}
+
+// ---------------------------------------------------------------------------
+// Hybrid mode: PUT (non-KVM allocation, forwards to upstream)
+// ---------------------------------------------------------------------------
+
+func TestUpdateAllocations_Hybrid_NonKVMOnly(t *testing.T) {
+ s := newAllocationsTestShim(t, FeatureModeHybrid, http.StatusNoContent, "")
+
+ unknownRP := "99999999-9999-9999-9999-999999999999"
+ body := `{"allocations":{"` + unknownRP + `":{"resources":{"VCPU":4}}},"consumer_generation":null,"project_id":"proj-1","user_id":"user-1"}`
+ w := serveHandlerWithBody(t, "PUT", "/allocations/{consumer_uuid}",
+ s.HandleUpdateAllocations, "/allocations/"+testConsumerUUID, strings.NewReader(body))
+ if w.Code != http.StatusNoContent {
+ t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusNoContent, w.Body.String())
+ }
+}
+
+// ---------------------------------------------------------------------------
+// Hybrid mode: GET (merges upstream + CRD)
+// ---------------------------------------------------------------------------
+
+// TestListAllocations_Hybrid_Merge verifies that GET merges upstream
+// allocations with CRD bookings into a single response.
+func TestListAllocations_Hybrid_Merge(t *testing.T) {
+	node := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192)
+	fromUpstream := `{"allocations":{"upstream-rp-uuid":{"resources":{"VCPU":2}}},"consumer_generation":1,"project_id":"proj-1","user_id":"user-1"}`
+	shim := newAllocationsTestShim(t, FeatureModeHybrid, http.StatusOK, fromUpstream, node)
+
+	rec := serveHandler(t, "GET", "/allocations/{consumer_uuid}",
+		shim.HandleListAllocations, "/allocations/"+testConsumerUUID)
+	if rec.Code != http.StatusOK {
+		t.Fatalf("status = %d, want %d; body: %s", rec.Code, http.StatusOK, rec.Body.String())
+	}
+
+	var decoded allocationsResponse
+	if err := json.Unmarshal(rec.Body.Bytes(), &decoded); err != nil {
+		t.Fatalf("failed to decode response: %v", err)
+	}
+	if got := len(decoded.Allocations); got != 2 {
+		t.Fatalf("expected 2 allocations (upstream + CRD), got %d: %v", got, decoded.Allocations)
+	}
+	for _, rp := range []string{"upstream-rp-uuid", testRPUUID} {
+		if _, ok := decoded.Allocations[rp]; !ok {
+			t.Fatalf("expected allocation for %s in merged response", rp)
+		}
+	}
+}
+
+// ---------------------------------------------------------------------------
+// Hybrid mode: DELETE (upstream first, then CRD)
+// ---------------------------------------------------------------------------
+
+// TestDeleteAllocations_Hybrid verifies DELETE responds 204 when the upstream
+// accepts the removal and the consumer also has a CRD booking.
+func TestDeleteAllocations_Hybrid(t *testing.T) {
+	shim := newAllocationsTestShim(t, FeatureModeHybrid, http.StatusNoContent, "",
+		testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192))
+
+	rec := serveHandler(t, "DELETE", "/allocations/{consumer_uuid}",
+		shim.HandleDeleteAllocations, "/allocations/"+testConsumerUUID)
+	if got, want := rec.Code, http.StatusNoContent; got != want {
+		t.Fatalf("status = %d, want %d; body: %s", got, want, rec.Body.String())
+	}
+}
+
+// TestDeleteAllocations_Hybrid_UpstreamFails verifies that an upstream
+// failure during DELETE is surfaced to the caller as 500.
+func TestDeleteAllocations_Hybrid_UpstreamFails(t *testing.T) {
+	shim := newAllocationsTestShim(t, FeatureModeHybrid, http.StatusInternalServerError, "upstream error",
+		testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192))
+
+	rec := serveHandler(t, "DELETE", "/allocations/{consumer_uuid}",
+		shim.HandleDeleteAllocations, "/allocations/"+testConsumerUUID)
+	if got, want := rec.Code, http.StatusInternalServerError; got != want {
+		t.Fatalf("status = %d, want %d; body: %s", got, want, rec.Body.String())
+	}
+}
+
+// ---------------------------------------------------------------------------
+// Resource translation unit tests
+// ---------------------------------------------------------------------------
+
+// TestPlacementToHVResources checks the placement->CRD resource translation
+// (VCPU count passed through, MEMORY_MB converted to bytes).
+func TestPlacementToHVResources(t *testing.T) {
+	in := map[string]int64{"VCPU": 4, "MEMORY_MB": 8192}
+	out := placementToHVResources(in)
+	// Compare via Cmp rather than struct equality: resource.Quantity caches
+	// its string representation internally, so == can report semantically
+	// equal quantities as different (the memory check below already does this).
+	gotCPU := out[hv1.ResourceCPU]
+	if gotCPU.Cmp(*resource.NewQuantity(4, resource.DecimalSI)) != 0 {
+		t.Fatalf("cpu = %v, want 4", gotCPU)
+	}
+	expectedMem := resource.NewQuantity(8192*1024*1024, resource.BinarySI)
+	gotMem := out[hv1.ResourceMemory]
+	if gotMem.Cmp(*expectedMem) != 0 {
+		t.Fatalf("memory = %v, want %v", gotMem, expectedMem)
+	}
+}
+
+// TestHVToPlacementResources checks the CRD->placement resource translation
+// (CPU quantity to VCPU count, memory bytes to MEMORY_MB).
+func TestHVToPlacementResources(t *testing.T) {
+	input := map[hv1.ResourceName]resource.Quantity{
+		hv1.ResourceCPU:    *resource.NewQuantity(4, resource.DecimalSI),
+		hv1.ResourceMemory: *resource.NewQuantity(8192*1024*1024, resource.BinarySI),
+	}
+	got := hvToPlacementResources(input)
+	if got["VCPU"] != 4 {
+		t.Fatalf("VCPU = %d, want 4", got["VCPU"])
+	}
+	if got["MEMORY_MB"] != 8192 {
+		t.Fatalf("MEMORY_MB = %d, want 8192", got["MEMORY_MB"])
+	}
+}
+
+// ---------------------------------------------------------------------------
+// CRD mode: POST (batch)
+// ---------------------------------------------------------------------------
+
+// TestManageAllocations_CRD verifies the batch POST succeeds with 204 when
+// every referenced resource provider resolves to a Hypervisor CR.
+func TestManageAllocations_CRD(t *testing.T) {
+	node := &hv1.Hypervisor{
+		ObjectMeta: metav1.ObjectMeta{Name: "hv-node-1"},
+		Status:     hv1.HypervisorStatus{HypervisorID: testRPUUID},
+	}
+	shim := newAllocationsTestShim(t, FeatureModeCRD, 0, "", node)
+
+	payload := `{"` + testConsumerUUID + `":{"allocations":{"` + testRPUUID + `":{"resources":{"VCPU":2}}},"consumer_generation":null,"project_id":"proj-1","user_id":"user-1"}}`
+	rec := serveHandlerWithBody(t, "POST", "/allocations",
+		shim.HandleManageAllocations, "/allocations", strings.NewReader(payload))
+	if got, want := rec.Code, http.StatusNoContent; got != want {
+		t.Fatalf("status = %d, want %d; body: %s", got, want, rec.Body.String())
+	}
}
diff --git a/internal/shim/placement/handle_resource_providers_test.go b/internal/shim/placement/handle_resource_providers_test.go
index 8c73db39e..37cde7e09 100644
--- a/internal/shim/placement/handle_resource_providers_test.go
+++ b/internal/shim/placement/handle_resource_providers_test.go
@@ -81,6 +81,21 @@ func newFakeClient(t *testing.T, objs ...client.Object) client.Client {
}
return []string{hv.Name}
})
+ builder = builder.WithIndex(&hv1.Hypervisor{}, idxBookingConsumerUUID, func(obj client.Object) []string {
+ hv, ok := obj.(*hv1.Hypervisor)
+ if !ok {
+ return nil
+ }
+ consumers := hv1.GetConsumers(hv.Spec.Bookings)
+ if len(consumers) == 0 {
+ return nil
+ }
+ uuids := make([]string, 0, len(consumers))
+ for _, c := range consumers {
+ uuids = append(uuids, c.UUID)
+ }
+ return uuids
+ })
return builder.Build()
}
From 4e42d6f25d9a06a512b305d8a6b722012138c738 Mon Sep 17 00:00:00 2001
From: Philipp Matthes
Date: Mon, 4 May 2026 13:43:12 +0200
Subject: [PATCH 2/3] Improve allocations handler code organization and
robustness
- Reorganize file so lowercase handlers sit directly below their
corresponding exported Handle* functions
- Remove section divider comments between function groups
- Add doc comments to all unexported handler functions
- Fix listAllocationsHybrid to bubble up non-OK upstream responses
instead of falling through to CRD queries
- Fix deleteAllocationsHybrid to check CRD existence first and use
statusRecorder consistently with other hybrid handlers
- Replace //nolint:errcheck with proper error handling
- Use hv1.GroupVersion for GroupResource instead of hardcoded literal
- Fix lint issues: variable shadowing in e2e closures, unused imports,
constant parameters (e2ePollUntil, testHypervisorWithBooking)
- Add info logging at key decision points in hybrid/crd handlers
---
internal/shim/placement/handle_allocations.go | 595 +++++++++---------
.../shim/placement/handle_allocations_e2e.go | 63 +-
.../shim/placement/handle_allocations_test.go | 20 +-
...handle_resource_provider_aggregates_e2e.go | 3 +-
.../handle_resource_provider_traits_e2e.go | 3 +-
internal/shim/placement/shim_e2e.go | 9 +-
6 files changed, 363 insertions(+), 330 deletions(-)
diff --git a/internal/shim/placement/handle_allocations.go b/internal/shim/placement/handle_allocations.go
index a5bf35c37..e4b7393da 100644
--- a/internal/shim/placement/handle_allocations.go
+++ b/internal/shim/placement/handle_allocations.go
@@ -12,9 +12,8 @@ import (
"net/http"
hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
- "k8s.io/apimachinery/pkg/api/resource"
apierrors "k8s.io/apimachinery/pkg/api/errors"
- "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/apimachinery/pkg/api/resource"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)
@@ -111,6 +110,165 @@ func (s *Shim) HandleManageAllocations(w http.ResponseWriter, r *http.Request) {
}
}
+// manageAllocationsRequest represents the batch body for POST /allocations.
+// It is keyed by consumer UUID.
+type manageAllocationsRequest map[string]allocationsRequest
+
+// manageAllocationsHybrid handles the batch POST by splitting each consumer's
+// allocations into KVM and non-KVM sets. Non-KVM allocations are forwarded to
+// upstream as a batch; KVM allocations are written to Hypervisor CRDs.
+func (s *Shim) manageAllocationsHybrid(w http.ResponseWriter, r *http.Request) {
+	ctx := r.Context()
+	log := logf.FromContext(ctx)
+
+	bodyBytes, err := io.ReadAll(r.Body)
+	if err != nil {
+		log.Error(err, "failed to read request body")
+		http.Error(w, "failed to read request body", http.StatusBadRequest)
+		return
+	}
+
+	var batch manageAllocationsRequest
+	if err := json.Unmarshal(bodyBytes, &batch); err != nil {
+		http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	log.Info("managing batch allocations (hybrid)", "consumers", len(batch))
+
+	// Separate KVM from non-KVM across all consumers.
+	type kvmWork struct {
+		consumerUUID   string
+		req            *allocationsRequest
+		kvmAllocs      map[string]allocationEntry
+		kvmHypervisors map[string]*hv1.Hypervisor
+	}
+	var kvmWorkItems []kvmWork
+	nonKvmBatch := make(manageAllocationsRequest)
+
+	for consumerUUID, consumerReq := range batch {
+		kvmA := make(map[string]allocationEntry)
+		nonKvmA := make(map[string]allocationEntry)
+		kvmHVs := make(map[string]*hv1.Hypervisor)
+
+		for rpUUID, entry := range consumerReq.Allocations {
+			var hvs hv1.HypervisorList
+			// A resource provider counts as KVM only when exactly one Hypervisor
+			// CR matches its OpenStack ID.
+			// NOTE(review): a List error also routes the RP to the non-KVM set,
+			// so a transient lookup failure degrades to upstream forwarding —
+			// confirm this is intended.
+			if err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: rpUUID}); err == nil && len(hvs.Items) == 1 {
+				kvmA[rpUUID] = entry
+				hv := hvs.Items[0]
+				kvmHVs[rpUUID] = &hv
+			} else {
+				nonKvmA[rpUUID] = entry
+			}
+		}
+
+		if len(nonKvmA) > 0 {
+			nonKvmBatch[consumerUUID] = allocationsRequest{
+				Allocations:        nonKvmA,
+				ConsumerGeneration: consumerReq.ConsumerGeneration,
+				ProjectID:          consumerReq.ProjectID,
+				UserID:             consumerReq.UserID,
+				ConsumerType:       consumerReq.ConsumerType,
+			}
+		}
+		if len(kvmA) > 0 {
+			// Copy the loop variable before taking its address (pre-Go 1.22
+			// loop-variable semantics).
+			cr := consumerReq
+			kvmWorkItems = append(kvmWorkItems, kvmWork{
+				consumerUUID:   consumerUUID,
+				req:            &cr,
+				kvmAllocs:      kvmA,
+				kvmHypervisors: kvmHVs,
+			})
+		}
+	}
+
+	// Forward non-KVM portion to upstream first.
+	if len(nonKvmBatch) > 0 {
+		log.Info("forwarding non-KVM allocations to upstream (hybrid batch)", "consumers", len(nonKvmBatch))
+		upstreamBody, err := json.Marshal(nonKvmBatch)
+		if err != nil {
+			log.Error(err, "failed to marshal upstream batch request")
+			http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+			return
+		}
+		// Replace the downstream body with only the non-KVM portion.
+		// NOTE(review): r.ContentLength still reflects the original body —
+		// confirm forward() recomputes or ignores it.
+		r.Body = io.NopCloser(bytes.NewReader(upstreamBody))
+		// Record the upstream response so a failure can be replayed verbatim
+		// to the caller before any CRD writes happen.
+		rec := &statusRecorder{ResponseWriter: w, header: make(http.Header)}
+		s.forward(rec, r)
+		if rec.statusCode >= 300 {
+			for k, vs := range rec.header {
+				for _, v := range vs {
+					w.Header().Add(k, v)
+				}
+			}
+			w.WriteHeader(rec.statusCode)
+			if _, err := w.Write(rec.body.Bytes()); err != nil {
+				log.Error(err, "failed to write response body")
+			}
+			return
+		}
+	}
+
+	// Write KVM bookings.
+	for _, work := range kvmWorkItems {
+		if err := s.writeKVMBookings(ctx, work.consumerUUID, work.req, work.kvmAllocs, work.kvmHypervisors); err != nil {
+			log.Error(err, "failed to write KVM bookings in batch", "consumer", work.consumerUUID)
+			if apierrors.IsConflict(err) {
+				http.Error(w, "consumer generation conflict", http.StatusConflict)
+				return
+			}
+			http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+			return
+		}
+	}
+
+	w.WriteHeader(http.StatusNoContent)
+}
+
+// manageAllocationsCRD handles the batch POST exclusively via CRDs. All
+// resource providers across all consumers must resolve to known Hypervisor
+// CRs; an unknown provider rejects the whole batch with 400, while a failed
+// lookup is reported as 500.
+func (s *Shim) manageAllocationsCRD(w http.ResponseWriter, r *http.Request) {
+	ctx := r.Context()
+	log := logf.FromContext(ctx)
+
+	var batch manageAllocationsRequest
+	if err := json.NewDecoder(r.Body).Decode(&batch); err != nil {
+		http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	log.Info("managing batch allocations (crd)", "consumers", len(batch))
+
+	for consumerUUID, consumerReq := range batch {
+		kvmAllocs := make(map[string]allocationEntry)
+		kvmHypervisors := make(map[string]*hv1.Hypervisor)
+
+		for rpUUID, entry := range consumerReq.Allocations {
+			var hvs hv1.HypervisorList
+			// Distinguish lookup failures (server-side, 500) from genuinely
+			// unknown resource providers (client error, 400) instead of
+			// reporting both as "not found".
+			if err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: rpUUID}); err != nil {
+				log.Error(err, "failed to look up resource provider (crd mode)", "rpUUID", rpUUID)
+				http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+				return
+			}
+			if len(hvs.Items) != 1 {
+				log.Info("resource provider not found in CRD (crd mode)", "rpUUID", rpUUID)
+				http.Error(w, fmt.Sprintf("resource provider %s not found", rpUUID), http.StatusBadRequest)
+				return
+			}
+			kvmAllocs[rpUUID] = entry
+			hv := hvs.Items[0]
+			kvmHypervisors[rpUUID] = &hv
+		}
+
+		// Copy the loop variable before taking its address (pre-Go 1.22
+		// loop-variable semantics).
+		cr := consumerReq
+		if err := s.writeKVMBookings(ctx, consumerUUID, &cr, kvmAllocs, kvmHypervisors); err != nil {
+			log.Error(err, "failed to write KVM bookings in batch", "consumer", consumerUUID)
+			if apierrors.IsConflict(err) {
+				http.Error(w, "consumer generation conflict", http.StatusConflict)
+				return
+			}
+			http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+			return
+		}
+	}
+
+	w.WriteHeader(http.StatusNoContent)
+}
+
// HandleListAllocations handles GET /allocations/{consumer_uuid} requests.
//
// Returns all allocation records for the consumer identified by
@@ -141,6 +299,110 @@ func (s *Shim) HandleListAllocations(w http.ResponseWriter, r *http.Request) {
}
}
+// listAllocationsHybrid merges allocations from upstream Placement with
+// bookings stored in Hypervisor CRDs. CRD data takes precedence when the same
+// resource provider appears in both sources.
+func (s *Shim) listAllocationsHybrid(w http.ResponseWriter, r *http.Request, consumerUUID string) {
+	ctx := r.Context()
+	log := logf.FromContext(ctx)
+
+	s.forwardWithHook(w, r, func(w http.ResponseWriter, resp *http.Response) {
+		// Bubble up non-OK upstream responses verbatim (headers, status, body)
+		// instead of attempting a merge with CRD data.
+		if resp.StatusCode != http.StatusOK {
+			body, err := io.ReadAll(resp.Body)
+			if err != nil {
+				log.Error(err, "failed to read upstream response body")
+				http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+				return
+			}
+			for k, vs := range resp.Header {
+				for _, v := range vs {
+					w.Header().Add(k, v)
+				}
+			}
+			w.WriteHeader(resp.StatusCode)
+			if _, err := w.Write(body); err != nil {
+				log.Error(err, "failed to write response body")
+			}
+			return
+		}
+
+		var upstreamResp allocationsResponse
+		if err := json.NewDecoder(resp.Body).Decode(&upstreamResp); err != nil {
+			log.Error(err, "failed to decode upstream allocations response")
+			http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+			return
+		}
+		// Normalize a null/missing allocations map so the merge below can
+		// insert entries without a nil-map panic.
+		if upstreamResp.Allocations == nil {
+			upstreamResp.Allocations = make(map[string]allocationEntry)
+		}
+
+		// Look up consumer in CRD.
+		var hvs hv1.HypervisorList
+		if err := s.List(ctx, &hvs, client.MatchingFields{idxBookingConsumerUUID: consumerUUID}); err != nil && !apierrors.IsNotFound(err) {
+			log.Error(err, "failed to look up consumer in CRD")
+			http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+			return
+		}
+
+		// Merge CRD bookings into the response. CRD takes precedence on collision.
+		for _, hv := range hvs.Items {
+			consumer := hv1.GetConsumer(hv.Spec.Bookings, consumerUUID)
+			if consumer == nil {
+				continue
+			}
+			rpUUID := hv.Status.HypervisorID
+			upstreamResp.Allocations[rpUUID] = allocationEntry{
+				Resources: hvToPlacementResources(consumer.Resources),
+			}
+			// Consumer metadata is only filled from the CRD when upstream left
+			// it empty, keeping upstream authoritative for these fields.
+			if upstreamResp.ConsumerGeneration == nil {
+				upstreamResp.ConsumerGeneration = consumer.ConsumerGeneration
+			}
+			if upstreamResp.ProjectID == "" {
+				upstreamResp.ProjectID = consumer.ProjectID
+			}
+			if upstreamResp.UserID == "" {
+				upstreamResp.UserID = consumer.UserID
+			}
+		}
+
+		s.writeJSON(w, http.StatusOK, upstreamResp)
+	})
+}
+
+// listAllocationsCRD retrieves allocations exclusively from Hypervisor CRDs,
+// returning an empty allocations map if the consumer has no bookings.
+func (s *Shim) listAllocationsCRD(w http.ResponseWriter, r *http.Request, consumerUUID string) {
+	ctx := r.Context()
+	log := logf.FromContext(ctx)
+
+	var hyperList hv1.HypervisorList
+	err := s.List(ctx, &hyperList, client.MatchingFields{idxBookingConsumerUUID: consumerUUID})
+	if err != nil && !apierrors.IsNotFound(err) {
+		log.Error(err, "failed to look up consumer in CRD")
+		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+		return
+	}
+
+	out := allocationsResponse{Allocations: map[string]allocationEntry{}}
+	for i := range hyperList.Items {
+		booked := hv1.GetConsumer(hyperList.Items[i].Spec.Bookings, consumerUUID)
+		if booked == nil {
+			continue
+		}
+		out.Allocations[hyperList.Items[i].Status.HypervisorID] = allocationEntry{
+			Resources: hvToPlacementResources(booked.Resources),
+		}
+		out.ConsumerGeneration = booked.ConsumerGeneration
+		out.ProjectID = booked.ProjectID
+		out.UserID = booked.UserID
+	}
+
+	s.writeJSON(w, http.StatusOK, out)
+}
+
// HandleUpdateAllocations handles PUT /allocations/{consumer_uuid} requests.
//
// Creates or replaces all allocation records for a single consumer. If
@@ -154,31 +416,7 @@ func (s *Shim) HandleListAllocations(w http.ResponseWriter, r *http.Request) {
// insufficient inventory or if a concurrent update was detected.
//
// https://docs.openstack.org/api-ref/placement/#update-allocations
-func (s *Shim) HandleUpdateAllocations(w http.ResponseWriter, r *http.Request) {
- consumerUUID, ok := requiredUUIDPathParam(w, r, "consumer_uuid")
- if !ok {
- return
- }
- switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) {
- case FeatureModePassthrough:
- s.forward(w, r)
- case FeatureModeHybrid:
- s.updateAllocationsHybrid(w, r, consumerUUID)
- case FeatureModeCRD:
- s.updateAllocationsCRD(w, r, consumerUUID)
- default:
- http.Error(w, "unknown feature mode", http.StatusInternalServerError)
- }
-}
-
-// HandleDeleteAllocations handles DELETE /allocations/{consumer_uuid} requests.
-//
-// Removes all allocation records for the consumer across all resource
-// providers. Returns 204 No Content on success, or 404 Not Found if the
-// consumer has no existing allocations.
-//
-// https://docs.openstack.org/api-ref/placement/#delete-allocations
-func (s *Shim) HandleDeleteAllocations(w http.ResponseWriter, r *http.Request) {
+func (s *Shim) HandleUpdateAllocations(w http.ResponseWriter, r *http.Request) {
consumerUUID, ok := requiredUUIDPathParam(w, r, "consumer_uuid")
if !ok {
return
@@ -187,18 +425,18 @@ func (s *Shim) HandleDeleteAllocations(w http.ResponseWriter, r *http.Request) {
case FeatureModePassthrough:
s.forward(w, r)
case FeatureModeHybrid:
- s.deleteAllocationsHybrid(w, r, consumerUUID)
+ s.updateAllocationsHybrid(w, r, consumerUUID)
case FeatureModeCRD:
- s.deleteAllocationsCRD(w, r, consumerUUID)
+ s.updateAllocationsCRD(w, r, consumerUUID)
default:
http.Error(w, "unknown feature mode", http.StatusInternalServerError)
}
}
-// ---------------------------------------------------------------------------
-// PUT /allocations/{consumer_uuid} — hybrid and crd
-// ---------------------------------------------------------------------------
-
+// updateAllocationsHybrid splits the allocation set into KVM-managed resource
+// providers (written to Hypervisor CRDs) and non-KVM providers (forwarded to
+// upstream Placement). Non-KVM allocations are forwarded first; only if
+// upstream succeeds are KVM bookings persisted.
func (s *Shim) updateAllocationsHybrid(w http.ResponseWriter, r *http.Request, consumerUUID string) {
ctx := r.Context()
log := logf.FromContext(ctx)
@@ -257,7 +495,9 @@ func (s *Shim) updateAllocationsHybrid(w http.ResponseWriter, r *http.Request, c
}
}
w.WriteHeader(rec.statusCode)
- w.Write(rec.body.Bytes()) //nolint:errcheck
+ if _, err := w.Write(rec.body.Bytes()); err != nil {
+ log.Error(err, "failed to write response body")
+ }
return
}
}
@@ -276,6 +516,9 @@ func (s *Shim) updateAllocationsHybrid(w http.ResponseWriter, r *http.Request, c
w.WriteHeader(http.StatusNoContent)
}
+// updateAllocationsCRD handles PUT /allocations/{consumer_uuid} exclusively
+// via CRDs. All resource providers in the request must resolve to a known
+// Hypervisor CR; otherwise the request is rejected with 400.
func (s *Shim) updateAllocationsCRD(w http.ResponseWriter, r *http.Request, consumerUUID string) {
ctx := r.Context()
log := logf.FromContext(ctx)
@@ -323,9 +566,10 @@ func (s *Shim) writeKVMBookings(
kvmAllocs map[string]allocationEntry,
kvmHypervisors map[string]*hv1.Hypervisor,
) error {
+
log := logf.FromContext(ctx)
- hvGR := schema.GroupResource{Group: "kvm.cloud.sap", Resource: "hypervisors"}
+ hvGR := hv1.GroupVersion.WithResource("hypervisors").GroupResource()
for rpUUID, entry := range kvmAllocs {
hv := kvmHypervisors[rpUUID]
existing := hv1.GetConsumer(hv.Spec.Bookings, consumerUUID)
@@ -390,64 +634,35 @@ func (s *Shim) writeKVMBookings(
return nil
}
-// ---------------------------------------------------------------------------
-// GET /allocations/{consumer_uuid} — hybrid and crd
-// ---------------------------------------------------------------------------
-
-func (s *Shim) listAllocationsHybrid(w http.ResponseWriter, r *http.Request, consumerUUID string) {
- ctx := r.Context()
- log := logf.FromContext(ctx)
-
- s.forwardWithHook(w, r, func(w http.ResponseWriter, resp *http.Response) {
- var upstreamResp allocationsResponse
- if resp.StatusCode == http.StatusOK {
- if err := json.NewDecoder(resp.Body).Decode(&upstreamResp); err != nil {
- log.Error(err, "failed to decode upstream allocations response")
- http.Error(w, "Internal Server Error", http.StatusInternalServerError)
- return
- }
- } else {
- // If upstream returned non-200, initialize empty.
- upstreamResp.Allocations = make(map[string]allocationEntry)
- }
- if upstreamResp.Allocations == nil {
- upstreamResp.Allocations = make(map[string]allocationEntry)
- }
-
- // Look up consumer in CRD.
- var hvs hv1.HypervisorList
- if err := s.List(ctx, &hvs, client.MatchingFields{idxBookingConsumerUUID: consumerUUID}); err != nil && !apierrors.IsNotFound(err) {
- log.Error(err, "failed to look up consumer in CRD")
- http.Error(w, "Internal Server Error", http.StatusInternalServerError)
- return
- }
-
- // Merge CRD bookings into the response. CRD takes precedence on collision.
- for _, hv := range hvs.Items {
- consumer := hv1.GetConsumer(hv.Spec.Bookings, consumerUUID)
- if consumer == nil {
- continue
- }
- rpUUID := hv.Status.HypervisorID
- upstreamResp.Allocations[rpUUID] = allocationEntry{
- Resources: hvToPlacementResources(consumer.Resources),
- }
- if upstreamResp.ConsumerGeneration == nil {
- upstreamResp.ConsumerGeneration = consumer.ConsumerGeneration
- }
- if upstreamResp.ProjectID == "" {
- upstreamResp.ProjectID = consumer.ProjectID
- }
- if upstreamResp.UserID == "" {
- upstreamResp.UserID = consumer.UserID
- }
- }
-
- s.writeJSON(w, http.StatusOK, upstreamResp)
- })
+// HandleDeleteAllocations handles DELETE /allocations/{consumer_uuid} requests.
+//
+// Removes all allocation records for the consumer across all resource
+// providers. Returns 204 No Content on success, or 404 Not Found if the
+// consumer has no existing allocations.
+//
+// https://docs.openstack.org/api-ref/placement/#delete-allocations
+func (s *Shim) HandleDeleteAllocations(w http.ResponseWriter, r *http.Request) {
+ consumerUUID, ok := requiredUUIDPathParam(w, r, "consumer_uuid")
+ if !ok {
+ return
+ }
+ switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) {
+ case FeatureModePassthrough:
+ s.forward(w, r)
+ case FeatureModeHybrid:
+ s.deleteAllocationsHybrid(w, r, consumerUUID)
+ case FeatureModeCRD:
+ s.deleteAllocationsCRD(w, r, consumerUUID)
+ default:
+ http.Error(w, "unknown feature mode", http.StatusInternalServerError)
+ }
}
-func (s *Shim) listAllocationsCRD(w http.ResponseWriter, r *http.Request, consumerUUID string) {
+// deleteAllocationsHybrid always forwards the delete to upstream first. If
+// the consumer also has bookings in Hypervisor CRDs, those are removed and a
+// 404 from upstream is tolerated (the consumer may exist only in CRDs). If
+// there are no CRD bookings, the upstream response is relayed to the client.
+func (s *Shim) deleteAllocationsHybrid(w http.ResponseWriter, r *http.Request, consumerUUID string) {
ctx := r.Context()
log := logf.FromContext(ctx)
@@ -458,47 +673,19 @@ func (s *Shim) listAllocationsCRD(w http.ResponseWriter, r *http.Request, consum
return
}
- resp := allocationsResponse{
- Allocations: make(map[string]allocationEntry),
- }
-
- for _, hv := range hvs.Items {
- consumer := hv1.GetConsumer(hv.Spec.Bookings, consumerUUID)
- if consumer == nil {
- continue
- }
- rpUUID := hv.Status.HypervisorID
- resp.Allocations[rpUUID] = allocationEntry{
- Resources: hvToPlacementResources(consumer.Resources),
- }
- resp.ConsumerGeneration = consumer.ConsumerGeneration
- resp.ProjectID = consumer.ProjectID
- resp.UserID = consumer.UserID
- }
-
- s.writeJSON(w, http.StatusOK, resp)
-}
-
-// ---------------------------------------------------------------------------
-// DELETE /allocations/{consumer_uuid} — hybrid and crd
-// ---------------------------------------------------------------------------
-
-func (s *Shim) deleteAllocationsHybrid(w http.ResponseWriter, r *http.Request, consumerUUID string) {
- ctx := r.Context()
- log := logf.FromContext(ctx)
-
- // Forward to upstream first.
rec := &statusRecorder{ResponseWriter: w, header: make(http.Header)}
s.forward(rec, r)
- // Upstream returning 404 is acceptable — the consumer may only exist in CRD.
- if rec.statusCode >= 300 && rec.statusCode != http.StatusNotFound {
+
+ if len(hvs.Items) == 0 || (rec.statusCode >= 300 && rec.statusCode != http.StatusNotFound) {
for k, vs := range rec.header {
for _, v := range vs {
w.Header().Add(k, v)
}
}
w.WriteHeader(rec.statusCode)
- w.Write(rec.body.Bytes()) //nolint:errcheck
+ if _, err := w.Write(rec.body.Bytes()); err != nil {
+ log.Error(err, "failed to write response body")
+ }
return
}
@@ -512,6 +699,8 @@ func (s *Shim) deleteAllocationsHybrid(w http.ResponseWriter, r *http.Request, c
w.WriteHeader(http.StatusNoContent)
}
+// deleteAllocationsCRD removes all bookings for the consumer exclusively from
+// Hypervisor CRDs. Returns 404 if the consumer has no existing bookings.
func (s *Shim) deleteAllocationsCRD(w http.ResponseWriter, r *http.Request, consumerUUID string) {
ctx := r.Context()
log := logf.FromContext(ctx)
@@ -569,162 +758,8 @@ func (s *Shim) removeConsumerFromCRD(ctx context.Context, consumerUUID string) e
return nil
}
-// ---------------------------------------------------------------------------
-// POST /allocations — hybrid and crd
-// ---------------------------------------------------------------------------
-
-// manageAllocationsRequest represents the batch body for POST /allocations.
-// It is keyed by consumer UUID.
-type manageAllocationsRequest map[string]allocationsRequest
-
-func (s *Shim) manageAllocationsHybrid(w http.ResponseWriter, r *http.Request) {
- ctx := r.Context()
- log := logf.FromContext(ctx)
-
- bodyBytes, err := io.ReadAll(r.Body)
- if err != nil {
- log.Error(err, "failed to read request body")
- http.Error(w, "failed to read request body", http.StatusBadRequest)
- return
- }
-
- var batch manageAllocationsRequest
- if err := json.Unmarshal(bodyBytes, &batch); err != nil {
- http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest)
- return
- }
-
- // Separate KVM from non-KVM across all consumers.
- type kvmWork struct {
- consumerUUID string
- req *allocationsRequest
- kvmAllocs map[string]allocationEntry
- kvmHypervisors map[string]*hv1.Hypervisor
- }
- var kvmWorkItems []kvmWork
- nonKvmBatch := make(manageAllocationsRequest)
-
- for consumerUUID, consumerReq := range batch {
- kvmA := make(map[string]allocationEntry)
- nonKvmA := make(map[string]allocationEntry)
- kvmHVs := make(map[string]*hv1.Hypervisor)
-
- for rpUUID, entry := range consumerReq.Allocations {
- var hvs hv1.HypervisorList
- if err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: rpUUID}); err == nil && len(hvs.Items) == 1 {
- kvmA[rpUUID] = entry
- hv := hvs.Items[0]
- kvmHVs[rpUUID] = &hv
- } else {
- nonKvmA[rpUUID] = entry
- }
- }
-
- if len(nonKvmA) > 0 {
- nonKvmBatch[consumerUUID] = allocationsRequest{
- Allocations: nonKvmA,
- ConsumerGeneration: consumerReq.ConsumerGeneration,
- ProjectID: consumerReq.ProjectID,
- UserID: consumerReq.UserID,
- ConsumerType: consumerReq.ConsumerType,
- }
- }
- if len(kvmA) > 0 {
- cr := consumerReq
- kvmWorkItems = append(kvmWorkItems, kvmWork{
- consumerUUID: consumerUUID,
- req: &cr,
- kvmAllocs: kvmA,
- kvmHypervisors: kvmHVs,
- })
- }
- }
-
- // Forward non-KVM portion to upstream first.
- if len(nonKvmBatch) > 0 {
- upstreamBody, err := json.Marshal(nonKvmBatch)
- if err != nil {
- log.Error(err, "failed to marshal upstream batch request")
- http.Error(w, "Internal Server Error", http.StatusInternalServerError)
- return
- }
- r.Body = io.NopCloser(bytes.NewReader(upstreamBody))
- rec := &statusRecorder{ResponseWriter: w, header: make(http.Header)}
- s.forward(rec, r)
- if rec.statusCode >= 300 {
- for k, vs := range rec.header {
- for _, v := range vs {
- w.Header().Add(k, v)
- }
- }
- w.WriteHeader(rec.statusCode)
- w.Write(rec.body.Bytes()) //nolint:errcheck
- return
- }
- }
-
- // Write KVM bookings.
- for _, work := range kvmWorkItems {
- if err := s.writeKVMBookings(ctx, work.consumerUUID, work.req, work.kvmAllocs, work.kvmHypervisors); err != nil {
- log.Error(err, "failed to write KVM bookings in batch", "consumer", work.consumerUUID)
- if apierrors.IsConflict(err) {
- http.Error(w, "consumer generation conflict", http.StatusConflict)
- return
- }
- http.Error(w, "Internal Server Error", http.StatusInternalServerError)
- return
- }
- }
-
- w.WriteHeader(http.StatusNoContent)
-}
-
-func (s *Shim) manageAllocationsCRD(w http.ResponseWriter, r *http.Request) {
- ctx := r.Context()
- log := logf.FromContext(ctx)
-
- var batch manageAllocationsRequest
- if err := json.NewDecoder(r.Body).Decode(&batch); err != nil {
- http.Error(w, "invalid JSON body: "+err.Error(), http.StatusBadRequest)
- return
- }
-
- for consumerUUID, consumerReq := range batch {
- kvmAllocs := make(map[string]allocationEntry)
- kvmHypervisors := make(map[string]*hv1.Hypervisor)
-
- for rpUUID, entry := range consumerReq.Allocations {
- var hvs hv1.HypervisorList
- if err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: rpUUID}); err != nil || len(hvs.Items) != 1 {
- log.Info("resource provider not found in CRD (crd mode)", "rpUUID", rpUUID)
- http.Error(w, fmt.Sprintf("resource provider %s not found", rpUUID), http.StatusBadRequest)
- return
- }
- kvmAllocs[rpUUID] = entry
- hv := hvs.Items[0]
- kvmHypervisors[rpUUID] = &hv
- }
-
- cr := consumerReq
- if err := s.writeKVMBookings(ctx, consumerUUID, &cr, kvmAllocs, kvmHypervisors); err != nil {
- log.Error(err, "failed to write KVM bookings in batch", "consumer", consumerUUID)
- if apierrors.IsConflict(err) {
- http.Error(w, "consumer generation conflict", http.StatusConflict)
- return
- }
- http.Error(w, "Internal Server Error", http.StatusInternalServerError)
- return
- }
- }
-
- w.WriteHeader(http.StatusNoContent)
-}
-
-// ---------------------------------------------------------------------------
-// statusRecorder captures the upstream response so we can inspect status before
-// committing to write the real response.
-// ---------------------------------------------------------------------------
-
+// statusRecorder captures the upstream response so we can inspect the status
+// code before committing to write the real client response.
type statusRecorder struct {
http.ResponseWriter
statusCode int
diff --git a/internal/shim/placement/handle_allocations_e2e.go b/internal/shim/placement/handle_allocations_e2e.go
index cb8cf3244..6d0f45634 100644
--- a/internal/shim/placement/handle_allocations_e2e.go
+++ b/internal/shim/placement/handle_allocations_e2e.go
@@ -9,7 +9,6 @@ import (
"encoding/json"
"fmt"
"net/http"
- "time"
"github.com/cobaltcore-dev/cortex/pkg/conf"
hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
@@ -455,7 +454,7 @@ func e2eCRDAllocations(ctx context.Context, sc *gophercloud.ServiceClient, cl cl
// 1. Test GET /allocations/{consumer_uuid} — empty (consumer not booked).
log.Info("Testing GET /allocations (empty, CRD)", "consumer", consumerUUID)
- if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) {
+ if err := e2ePollUntil(ctx, func() (bool, error) {
req, err := http.NewRequestWithContext(ctx,
http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID, http.NoBody)
if err != nil {
@@ -518,24 +517,24 @@ func e2eCRDAllocations(ctx context.Context, sc *gophercloud.ServiceClient, cl cl
// 3. Test GET /allocations/{consumer_uuid} — verify booking present.
log.Info("Testing GET /allocations (after PUT, CRD)", "consumer", consumerUUID)
var getResp allocationsResponse
- if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) {
- req, err := http.NewRequestWithContext(ctx,
+ if err := e2ePollUntil(ctx, func() (bool, error) {
+ pollReq, err := http.NewRequestWithContext(ctx,
http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID, http.NoBody)
if err != nil {
return false, err
}
- req.Header.Set("X-Auth-Token", sc.TokenID)
- req.Header.Set("OpenStack-API-Version", apiVersion)
- req.Header.Set("Accept", "application/json")
- resp, err := sc.HTTPClient.Do(req)
+ pollReq.Header.Set("X-Auth-Token", sc.TokenID)
+ pollReq.Header.Set("OpenStack-API-Version", apiVersion)
+ pollReq.Header.Set("Accept", "application/json")
+ pollResp, err := sc.HTTPClient.Do(pollReq)
if err != nil {
return false, err
}
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- return false, fmt.Errorf("GET /allocations (after PUT): expected 200, got %d", resp.StatusCode)
+ defer pollResp.Body.Close()
+ if pollResp.StatusCode != http.StatusOK {
+ return false, fmt.Errorf("GET /allocations (after PUT): expected 200, got %d", pollResp.StatusCode)
}
- if err := json.NewDecoder(resp.Body).Decode(&getResp); err != nil {
+ if err := json.NewDecoder(pollResp.Body).Decode(&getResp); err != nil {
return false, err
}
_, ok := getResp.Allocations[kvmUUID]
@@ -621,24 +620,24 @@ func e2eCRDAllocations(ctx context.Context, sc *gophercloud.ServiceClient, cl cl
// Verify second consumer's allocation.
var getResp2 allocationsResponse
- if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) {
- req, err := http.NewRequestWithContext(ctx,
+ if err := e2ePollUntil(ctx, func() (bool, error) {
+ pollReq, err := http.NewRequestWithContext(ctx,
http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID2, http.NoBody)
if err != nil {
return false, err
}
- req.Header.Set("X-Auth-Token", sc.TokenID)
- req.Header.Set("OpenStack-API-Version", apiVersion)
- req.Header.Set("Accept", "application/json")
- resp, err := sc.HTTPClient.Do(req)
+ pollReq.Header.Set("X-Auth-Token", sc.TokenID)
+ pollReq.Header.Set("OpenStack-API-Version", apiVersion)
+ pollReq.Header.Set("Accept", "application/json")
+ pollResp, err := sc.HTTPClient.Do(pollReq)
if err != nil {
return false, err
}
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- return false, fmt.Errorf("GET /allocations (consumer2): expected 200, got %d", resp.StatusCode)
+ defer pollResp.Body.Close()
+ if pollResp.StatusCode != http.StatusOK {
+ return false, fmt.Errorf("GET /allocations (consumer2): expected 200, got %d", pollResp.StatusCode)
}
- if err := json.NewDecoder(resp.Body).Decode(&getResp2); err != nil {
+ if err := json.NewDecoder(pollResp.Body).Decode(&getResp2); err != nil {
return false, err
}
_, ok := getResp2.Allocations[kvmUUID]
@@ -670,25 +669,25 @@ func e2eCRDAllocations(ctx context.Context, sc *gophercloud.ServiceClient, cl cl
}
// 7. Verify GET after DELETE returns empty.
- if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) {
- req, err := http.NewRequestWithContext(ctx,
+ if err := e2ePollUntil(ctx, func() (bool, error) {
+ pollReq, err := http.NewRequestWithContext(ctx,
http.MethodGet, sc.Endpoint+"/allocations/"+consumerUUID, http.NoBody)
if err != nil {
return false, err
}
- req.Header.Set("X-Auth-Token", sc.TokenID)
- req.Header.Set("OpenStack-API-Version", apiVersion)
- req.Header.Set("Accept", "application/json")
- resp, err := sc.HTTPClient.Do(req)
+ pollReq.Header.Set("X-Auth-Token", sc.TokenID)
+ pollReq.Header.Set("OpenStack-API-Version", apiVersion)
+ pollReq.Header.Set("Accept", "application/json")
+ pollResp, err := sc.HTTPClient.Do(pollReq)
if err != nil {
return false, err
}
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- return false, fmt.Errorf("GET /allocations (post-delete): expected 200, got %d", resp.StatusCode)
+ defer pollResp.Body.Close()
+ if pollResp.StatusCode != http.StatusOK {
+ return false, fmt.Errorf("GET /allocations (post-delete): expected 200, got %d", pollResp.StatusCode)
}
var r allocationsResponse
- if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
+ if err := json.NewDecoder(pollResp.Body).Decode(&r); err != nil {
return false, err
}
return len(r.Allocations) == 0, nil
diff --git a/internal/shim/placement/handle_allocations_test.go b/internal/shim/placement/handle_allocations_test.go
index e32be5747..55e4bb300 100644
--- a/internal/shim/placement/handle_allocations_test.go
+++ b/internal/shim/placement/handle_allocations_test.go
@@ -99,7 +99,7 @@ const (
testRPUUID2 = "66666666-7777-8888-9999-aaaaaaaaaaaa"
)
-func testHypervisorWithBooking(name, openstackID, consumerUUID string, vcpu int64, memMB int64) *hv1.Hypervisor {
+func testHypervisorWithBooking(name, openstackID string, vcpu, memMB int64) *hv1.Hypervisor {
gen := int64(1)
return &hv1.Hypervisor{
ObjectMeta: metav1.ObjectMeta{Name: name},
@@ -107,7 +107,7 @@ func testHypervisorWithBooking(name, openstackID, consumerUUID string, vcpu int6
Spec: hv1.HypervisorSpec{
Bookings: []hv1.Booking{
{Consumer: &hv1.ConsumerBooking{
- UUID: consumerUUID,
+ UUID: testConsumerUUID,
Resources: map[hv1.ResourceName]resource.Quantity{hv1.ResourceCPU: *resource.NewQuantity(vcpu, resource.DecimalSI), hv1.ResourceMemory: *resource.NewQuantity(memMB*1024*1024, resource.BinarySI)},
ConsumerGeneration: &gen,
ProjectID: "proj-1",
@@ -160,7 +160,7 @@ func TestUpdateAllocations_CRD_NewConsumer(t *testing.T) {
}
func TestUpdateAllocations_CRD_GenerationMismatch(t *testing.T) {
- hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 2, 4096)
+ hv := testHypervisorWithBooking("hv-node-1", testRPUUID, 2, 4096)
s := newAllocationsTestShim(t, FeatureModeCRD, 0, "", hv)
wrongGen := int64(99)
@@ -189,7 +189,7 @@ func TestUpdateAllocations_CRD_UnknownRP(t *testing.T) {
// ---------------------------------------------------------------------------
func TestListAllocations_CRD_Found(t *testing.T) {
- hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192)
+ hv := testHypervisorWithBooking("hv-node-1", testRPUUID, 4, 8192)
s := newAllocationsTestShim(t, FeatureModeCRD, 0, "", hv)
w := serveHandler(t, "GET", "/allocations/{consumer_uuid}",
@@ -232,8 +232,8 @@ func TestListAllocations_CRD_NotFound(t *testing.T) {
}
func TestListAllocations_CRD_MultiCR(t *testing.T) {
- hv1obj := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192)
- hv2obj := testHypervisorWithBooking("hv-node-2", testRPUUID2, testConsumerUUID, 2, 4096)
+ hv1obj := testHypervisorWithBooking("hv-node-1", testRPUUID, 4, 8192)
+ hv2obj := testHypervisorWithBooking("hv-node-2", testRPUUID2, 2, 4096)
s := newAllocationsTestShim(t, FeatureModeCRD, 0, "", hv1obj, hv2obj)
w := serveHandler(t, "GET", "/allocations/{consumer_uuid}",
@@ -256,7 +256,7 @@ func TestListAllocations_CRD_MultiCR(t *testing.T) {
// ---------------------------------------------------------------------------
func TestDeleteAllocations_CRD_Found(t *testing.T) {
- hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192)
+ hv := testHypervisorWithBooking("hv-node-1", testRPUUID, 4, 8192)
s := newAllocationsTestShim(t, FeatureModeCRD, 0, "", hv)
w := serveHandler(t, "DELETE", "/allocations/{consumer_uuid}",
@@ -316,7 +316,7 @@ func TestUpdateAllocations_Hybrid_NonKVMOnly(t *testing.T) {
// ---------------------------------------------------------------------------
func TestListAllocations_Hybrid_Merge(t *testing.T) {
- hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192)
+ hv := testHypervisorWithBooking("hv-node-1", testRPUUID, 4, 8192)
upstreamBody := `{"allocations":{"upstream-rp-uuid":{"resources":{"VCPU":2}}},"consumer_generation":1,"project_id":"proj-1","user_id":"user-1"}`
s := newAllocationsTestShim(t, FeatureModeHybrid, http.StatusOK, upstreamBody, hv)
@@ -346,7 +346,7 @@ func TestListAllocations_Hybrid_Merge(t *testing.T) {
// ---------------------------------------------------------------------------
func TestDeleteAllocations_Hybrid(t *testing.T) {
- hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192)
+ hv := testHypervisorWithBooking("hv-node-1", testRPUUID, 4, 8192)
s := newAllocationsTestShim(t, FeatureModeHybrid, http.StatusNoContent, "", hv)
w := serveHandler(t, "DELETE", "/allocations/{consumer_uuid}",
@@ -357,7 +357,7 @@ func TestDeleteAllocations_Hybrid(t *testing.T) {
}
func TestDeleteAllocations_Hybrid_UpstreamFails(t *testing.T) {
- hv := testHypervisorWithBooking("hv-node-1", testRPUUID, testConsumerUUID, 4, 8192)
+ hv := testHypervisorWithBooking("hv-node-1", testRPUUID, 4, 8192)
s := newAllocationsTestShim(t, FeatureModeHybrid, http.StatusInternalServerError, "upstream error", hv)
w := serveHandler(t, "DELETE", "/allocations/{consumer_uuid}",
diff --git a/internal/shim/placement/handle_resource_provider_aggregates_e2e.go b/internal/shim/placement/handle_resource_provider_aggregates_e2e.go
index 3f7f55424..5382d96b7 100644
--- a/internal/shim/placement/handle_resource_provider_aggregates_e2e.go
+++ b/internal/shim/placement/handle_resource_provider_aggregates_e2e.go
@@ -10,7 +10,6 @@ import (
"fmt"
"net/http"
"slices"
- "time"
"github.com/cobaltcore-dev/cortex/pkg/conf"
hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
@@ -369,7 +368,7 @@ func e2eCRDResourceProviderAggregates(ctx context.Context, sc *gophercloud.Servi
Aggregates []string `json:"aggregates"`
ResourceProviderGeneration int64 `json:"resource_provider_generation"`
}
- if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) {
+ if err := e2ePollUntil(ctx, func() (bool, error) {
req, err := http.NewRequestWithContext(ctx,
http.MethodGet, sc.Endpoint+"/resource_providers/"+kvmUUID+"/aggregates", http.NoBody)
if err != nil {
diff --git a/internal/shim/placement/handle_resource_provider_traits_e2e.go b/internal/shim/placement/handle_resource_provider_traits_e2e.go
index 7dc50b016..b55ac6e08 100644
--- a/internal/shim/placement/handle_resource_provider_traits_e2e.go
+++ b/internal/shim/placement/handle_resource_provider_traits_e2e.go
@@ -10,7 +10,6 @@ import (
"fmt"
"net/http"
"slices"
- "time"
"github.com/cobaltcore-dev/cortex/pkg/conf"
hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
@@ -388,7 +387,7 @@ func e2eCRDResourceProviderTraits(ctx context.Context, sc *gophercloud.ServiceCl
Traits []string `json:"traits"`
ResourceProviderGeneration int64 `json:"resource_provider_generation"`
}
- if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) {
+ if err := e2ePollUntil(ctx, func() (bool, error) {
req, err := http.NewRequestWithContext(ctx,
http.MethodGet, sc.Endpoint+"/resource_providers/"+kvmUUID+"/traits", http.NoBody)
if err != nil {
diff --git a/internal/shim/placement/shim_e2e.go b/internal/shim/placement/shim_e2e.go
index b678fa6e5..6aa3cc069 100644
--- a/internal/shim/placement/shim_e2e.go
+++ b/internal/shim/placement/shim_e2e.go
@@ -213,10 +213,11 @@ func RunE2E(ctx context.Context, cl client.Client) error {
return nil
}
-// e2ePollUntil retries check at short intervals until it returns true or the
-// timeout expires. Used to wait for the informer cache to pick up a CRD
-// update before asserting via the HTTP API.
-func e2ePollUntil(ctx context.Context, timeout time.Duration, check func() (bool, error)) error {
+// e2ePollUntil retries check at short intervals until it returns true or
+// 10 seconds have elapsed. Used to wait for the informer cache to pick up a
+// CRD update before asserting via the HTTP API.
+func e2ePollUntil(ctx context.Context, check func() (bool, error)) error {
+ const timeout = 10 * time.Second
deadline := time.Now().Add(timeout)
for {
ok, err := check()
From d692723d4585e6132270bb020ed02cfcdbff73cc Mon Sep 17 00:00:00 2001
From: Philipp Matthes
Date: Mon, 4 May 2026 15:09:28 +0200
Subject: [PATCH 3/3] Fix build after rebase: restore operator pre-release and
adapt to new API
The rebase onto main upgraded the operator dependency to v1.2.0, which
does not yet include spec.bookings (PR #296 is still open). Restore
the pre-release pseudo-version. Also add the third `hasBackingConfig`
argument to featureModeFromConfOrHeader calls introduced on main.
---
go.mod | 2 +-
go.sum | 4 ++--
internal/shim/placement/handle_allocations.go | 8 ++++----
3 files changed, 7 insertions(+), 7 deletions(-)
diff --git a/go.mod b/go.mod
index 52da7fef1..48196a3cb 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module github.com/cobaltcore-dev/cortex
go 1.26.0
require (
- github.com/cobaltcore-dev/openstack-hypervisor-operator v1.2.0
+ github.com/cobaltcore-dev/openstack-hypervisor-operator v1.1.1-0.20260430095528-58fbe3ff4c3e
github.com/go-gorp/gorp v2.2.0+incompatible
github.com/gophercloud/gophercloud/v2 v2.12.0
github.com/ironcore-dev/ironcore v0.3.0
diff --git a/go.sum b/go.sum
index 638b047bd..96ccff9c8 100644
--- a/go.sum
+++ b/go.sum
@@ -20,8 +20,8 @@ github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1x
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
-github.com/cobaltcore-dev/openstack-hypervisor-operator v1.2.0 h1:XYVIKTC19dj4jck2uinYzTNXcoED5HNTvv+BJ75M2E0=
-github.com/cobaltcore-dev/openstack-hypervisor-operator v1.2.0/go.mod h1:iuhqhW6ozxfYWbGlEeh9rW9xyTb/EgelkDJqzJXBclk=
+github.com/cobaltcore-dev/openstack-hypervisor-operator v1.1.1-0.20260430095528-58fbe3ff4c3e h1:gegFF2HeBNzpOqQTqPAEkllLDdIHXCiGUxlKT4i5D/o=
+github.com/cobaltcore-dev/openstack-hypervisor-operator v1.1.1-0.20260430095528-58fbe3ff4c3e/go.mod h1:iuhqhW6ozxfYWbGlEeh9rW9xyTb/EgelkDJqzJXBclk=
github.com/containerd/continuity v0.4.5 h1:ZRoN1sXq9u7V6QoHMcVWGhOwDFqZ4B9i5H6un1Wh0x4=
github.com/containerd/continuity v0.4.5/go.mod h1:/lNJvtJKUQStBzpVQ1+rasXO1LAWtUQssk28EZvJ3nE=
github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI=
diff --git a/internal/shim/placement/handle_allocations.go b/internal/shim/placement/handle_allocations.go
index e4b7393da..30f227d80 100644
--- a/internal/shim/placement/handle_allocations.go
+++ b/internal/shim/placement/handle_allocations.go
@@ -98,7 +98,7 @@ func hvToPlacementResources(resources map[hv1.ResourceName]resource.Quantity) ma
//
// https://docs.openstack.org/api-ref/placement/#manage-allocations
func (s *Shim) HandleManageAllocations(w http.ResponseWriter, r *http.Request) {
- switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) {
+ switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations, true) {
case FeatureModePassthrough:
s.forward(w, r)
case FeatureModeHybrid:
@@ -287,7 +287,7 @@ func (s *Shim) HandleListAllocations(w http.ResponseWriter, r *http.Request) {
if !ok {
return
}
- switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) {
+ switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations, true) {
case FeatureModePassthrough:
s.forward(w, r)
case FeatureModeHybrid:
@@ -421,7 +421,7 @@ func (s *Shim) HandleUpdateAllocations(w http.ResponseWriter, r *http.Request) {
if !ok {
return
}
- switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) {
+ switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations, true) {
case FeatureModePassthrough:
s.forward(w, r)
case FeatureModeHybrid:
@@ -646,7 +646,7 @@ func (s *Shim) HandleDeleteAllocations(w http.ResponseWriter, r *http.Request) {
if !ok {
return
}
- switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations) {
+ switch s.featureModeFromConfOrHeader(r, s.config.Features.Allocations, true) {
case FeatureModePassthrough:
s.forward(w, r)
case FeatureModeHybrid: