diff --git a/Tiltfile b/Tiltfile index 87d8d026d..ef1ee3b02 100644 --- a/Tiltfile +++ b/Tiltfile @@ -83,7 +83,7 @@ local('kubectl wait --namespace cert-manager --for=condition=available deploymen ########### Dependency CRDs # Make sure the local cluster is running if you are running into startup issues here. -url = 'https://raw.githubusercontent.com/cobaltcore-dev/openstack-hypervisor-operator/refs/heads/main/charts/openstack-hypervisor-operator/crds/kvm.cloud.sap_hypervisors.yaml' +url = 'https://raw.githubusercontent.com/cobaltcore-dev/openstack-hypervisor-operator/d35f2bc2c5d4fd634b17e7a8dd77ff3025758fbb/charts/openstack-hypervisor-operator/crds/kvm.cloud.sap_hypervisors.yaml' local('curl -L ' + url + ' | kubectl apply -f -') ########### Cortex Manager & CRDs diff --git a/go.mod b/go.mod index a23aa4ff8..0c3b9c736 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/cobaltcore-dev/cortex go 1.26.0 require ( - github.com/cobaltcore-dev/openstack-hypervisor-operator v1.0.2-0.20260423190401-f34871697a61 + github.com/cobaltcore-dev/openstack-hypervisor-operator v1.0.2-0.20260429064011-d35f2bc2c5d4 github.com/go-gorp/gorp v2.2.0+incompatible github.com/gophercloud/gophercloud/v2 v2.12.0 github.com/ironcore-dev/ironcore v0.3.0 diff --git a/go.sum b/go.sum index f26c7a92b..66a44dd4d 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1x github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cobaltcore-dev/openstack-hypervisor-operator v1.0.2-0.20260423190401-f34871697a61 h1:I0qmFydo/Bibw0JLRypLmLnlZOx5fl4NNPaOiLKUfmU= -github.com/cobaltcore-dev/openstack-hypervisor-operator v1.0.2-0.20260423190401-f34871697a61/go.mod h1:fTJ5LAHj8NJ0AuQtsEX16Z1LXtCKqJfg+UhGfEnwImA= +github.com/cobaltcore-dev/openstack-hypervisor-operator v1.0.2-0.20260429064011-d35f2bc2c5d4 h1:Umm6n7LMDnqqZ6QIMIFxzJmuBX/Bke4uvstm+KFKcaQ= +github.com/cobaltcore-dev/openstack-hypervisor-operator v1.0.2-0.20260429064011-d35f2bc2c5d4/go.mod h1:fTJ5LAHj8NJ0AuQtsEX16Z1LXtCKqJfg+UhGfEnwImA= github.com/containerd/continuity v0.4.5 h1:ZRoN1sXq9u7V6QoHMcVWGhOwDFqZ4B9i5H6un1Wh0x4= github.com/containerd/continuity v0.4.5/go.mod h1:/lNJvtJKUQStBzpVQ1+rasXO1LAWtUQssk28EZvJ3nE= github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI= diff --git a/internal/shim/placement/handle_resource_provider_aggregates.go b/internal/shim/placement/handle_resource_provider_aggregates.go index 3d7205193..8d509f732 100644 --- a/internal/shim/placement/handle_resource_provider_aggregates.go +++ b/internal/shim/placement/handle_resource_provider_aggregates.go @@ -4,42 +4,271 @@ package placement import ( + "encoding/json" "net/http" + + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" ) +// resourceProviderAggregatesResponse is the JSON body returned by +// GET /resource_providers/{uuid}/aggregates and +// PUT /resource_providers/{uuid}/aggregates (microversion 1.19+). +// +// https://docs.openstack.org/api-ref/placement/#resource-provider-aggregates +type resourceProviderAggregatesResponse struct { + Aggregates []string `json:"aggregates"` + ResourceProviderGeneration int64 `json:"resource_provider_generation"` +} + +// resourceProviderAggregatesRequest is the JSON body expected by +// PUT /resource_providers/{uuid}/aggregates (microversion 1.19+). +type resourceProviderAggregatesRequest struct { + Aggregates []string `json:"aggregates"` + ResourceProviderGeneration int64 `json:"resource_provider_generation"` +} + // HandleListResourceProviderAggregates handles // GET /resource_providers/{uuid}/aggregates requests. // // Returns the list of aggregate UUIDs associated with the resource provider. // Aggregates model relationships among providers such as shared storage, // affinity/anti-affinity groups, and availability zones. Returns an empty -// list if the provider has no aggregate associations. Available since -// microversion 1.1. +// list if the provider has no aggregate associations. +// +// Routing: the uuid is used to determine if the resource provider is a KVM +// hypervisor or vmware/ironic hypervisor. Passthrough mode forwards all +// requests to upstream placement. Hybrid mode uses the hypervisor CRD for +// KVM hypervisors and forwards for anything else. CRD-only mode rejects +// any non-KVM calls with 404. // -// The response format changed at microversion 1.19: earlier versions return -// only a flat array of UUIDs, while 1.19+ returns an object that also -// includes the resource_provider_generation for concurrency tracking. Returns -// 404 if the provider does not exist. +// https://docs.openstack.org/api-ref/placement/#list-resource-provider-aggregates func (s *Shim) HandleListResourceProviderAggregates(w http.ResponseWriter, r *http.Request) { - if _, ok := requiredUUIDPathParam(w, r, "uuid"); !ok { + uuid, ok := requiredUUIDPathParam(w, r, "uuid") + if !ok { return } - s.dispatchPassthroughOnly(w, r, s.config.Features.Aggregates) + switch s.featureModeFromConfOrHeader(r, s.config.Features.Aggregates) { + case FeatureModePassthrough: + s.forward(w, r) + case FeatureModeHybrid: + s.listResourceProviderAggregatesHybrid(w, r, uuid) + case FeatureModeCRD: + s.listResourceProviderAggregatesCRD(w, r, uuid) + default: + http.Error(w, "unknown feature mode", http.StatusInternalServerError) + } +} + +// listResourceProviderAggregatesHybrid serves from the CRD if the provider is +// a KVM hypervisor, otherwise forwards to upstream placement. +func (s *Shim) listResourceProviderAggregatesHybrid(w http.ResponseWriter, r *http.Request, uuid string) { + ctx := r.Context() + log := logf.FromContext(ctx) + + var hvs hv1.HypervisorList + err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) + if err != nil || len(hvs.Items) != 1 { + log.Info("resource provider not resolved from kubernetes, forwarding to upstream placement", "uuid", uuid) + s.forward(w, r) + return + } + log.Info("resolved resource provider from CRD, serving aggregates", "uuid", uuid, "hypervisor", hvs.Items[0].Name) + s.writeAggregatesFromCRD(w, &hvs.Items[0]) +} + +// listResourceProviderAggregatesCRD serves exclusively from the CRD, returning +// 404 if the provider is not a known KVM hypervisor. +func (s *Shim) listResourceProviderAggregatesCRD(w http.ResponseWriter, r *http.Request, uuid string) { + ctx := r.Context() + log := logf.FromContext(ctx) + + var hvs hv1.HypervisorList + err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) + if apierrors.IsNotFound(err) || len(hvs.Items) == 0 { + log.Info("resource provider not found in kubernetes (crd mode)", "uuid", uuid) + http.Error(w, "resource provider not found", http.StatusNotFound) + return + } + if err != nil { + log.Error(err, "failed to list hypervisors with OpenStack ID index") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + if len(hvs.Items) > 1 { + log.Error(nil, "multiple hypervisors found with the same OpenStack ID", "uuid", uuid) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + log.Info("serving aggregates from CRD", "uuid", uuid, "hypervisor", hvs.Items[0].Name) + s.writeAggregatesFromCRD(w, &hvs.Items[0]) +} + +func (s *Shim) writeAggregatesFromCRD(w http.ResponseWriter, hv *hv1.Hypervisor) { + aggGroups := hv1.GetAggregates(hv.Spec.Groups) + aggregates := make([]string, 0, len(aggGroups)) + for _, ag := range aggGroups { + aggregates = append(aggregates, ag.UUID) + } + s.writeJSON(w, http.StatusOK, resourceProviderAggregatesResponse{ + Aggregates: aggregates, + ResourceProviderGeneration: hv.Generation, + }) } // HandleUpdateResourceProviderAggregates handles // PUT /resource_providers/{uuid}/aggregates requests. // // Replaces the complete set of aggregate associations for a resource provider. -// Any aggregate UUIDs that do not yet exist are created automatically. The -// request format changed at microversion 1.19: earlier versions accept a -// plain array of UUIDs, while 1.19+ expects an object containing an -// aggregates array and a resource_provider_generation for optimistic -// concurrency control. Returns 409 Conflict if the generation does not match -// (1.19+). Returns 200 with the updated aggregate list on success. +// The request body must include an aggregates array and a +// resource_provider_generation for optimistic concurrency control. Returns +// 409 Conflict if the generation does not match. Returns 200 with the +// updated aggregate list on success. +// +// Routing: same selective per-provider dispatch as GET. +// +// https://docs.openstack.org/api-ref/placement/#update-resource-provider-aggregates func (s *Shim) HandleUpdateResourceProviderAggregates(w http.ResponseWriter, r *http.Request) { - if _, ok := requiredUUIDPathParam(w, r, "uuid"); !ok { + uuid, ok := requiredUUIDPathParam(w, r, "uuid") + if !ok { + return + } + switch s.featureModeFromConfOrHeader(r, s.config.Features.Aggregates) { + case FeatureModePassthrough: + s.forward(w, r) + case FeatureModeHybrid: + s.updateResourceProviderAggregatesHybrid(w, r, uuid) + case FeatureModeCRD: + s.updateResourceProviderAggregatesCRD(w, r, uuid) + default: + http.Error(w, "unknown feature mode", http.StatusInternalServerError) + } +} + +// updateResourceProviderAggregatesHybrid updates aggregates via the CRD if the +// provider is a KVM hypervisor, otherwise forwards to upstream placement. +func (s *Shim) updateResourceProviderAggregatesHybrid(w http.ResponseWriter, r *http.Request, uuid string) { + ctx := r.Context() + log := logf.FromContext(ctx) + + var hvs hv1.HypervisorList + err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) + if err != nil || len(hvs.Items) != 1 { + log.Info("resource provider not resolved from kubernetes, forwarding to upstream placement", "uuid", uuid) + s.forward(w, r) return } - s.dispatchPassthroughOnly(w, r, s.config.Features.Aggregates) + hv := &hvs.Items[0] + log.Info("resolved resource provider from CRD, updating aggregates", "uuid", uuid, "hypervisor", hv.Name) + + var req resourceProviderAggregatesRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "malformed request body", http.StatusBadRequest) + return + } + if req.ResourceProviderGeneration != hv.Generation { + log.Info("generation mismatch on aggregate update", + "expected", req.ResourceProviderGeneration, "actual", hv.Generation) + http.Error(w, "resource provider generation conflict", http.StatusConflict) + return + } + + var newGroups []hv1.Group + for i := range hv.Spec.Groups { + if hv.Spec.Groups[i].Aggregate == nil { + newGroups = append(newGroups, hv.Spec.Groups[i]) + } + } + for _, aggUUID := range req.Aggregates { + newGroups = append(newGroups, hv1.Group{ + Aggregate: &hv1.AggregateGroup{Name: aggUUID, UUID: aggUUID}, + }) + } + hv.Spec.Groups = newGroups + + if err := s.Update(ctx, hv); err != nil { + if apierrors.IsConflict(err) { + http.Error(w, "resource provider generation conflict", http.StatusConflict) + return + } + log.Error(err, "failed to update hypervisor aggregates") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + log.Info("successfully updated aggregates via CRD", "uuid", uuid, "aggregateCount", len(req.Aggregates)) + s.writeJSON(w, http.StatusOK, resourceProviderAggregatesResponse{ + Aggregates: req.Aggregates, + ResourceProviderGeneration: hv.Generation, + }) +} + +// updateResourceProviderAggregatesCRD updates aggregates exclusively via the +// CRD, returning 404 if the provider is not a known KVM hypervisor. +func (s *Shim) updateResourceProviderAggregatesCRD(w http.ResponseWriter, r *http.Request, uuid string) { + ctx := r.Context() + log := logf.FromContext(ctx) + + var hvs hv1.HypervisorList + err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) + if apierrors.IsNotFound(err) || len(hvs.Items) == 0 { + log.Info("resource provider not found in kubernetes (crd mode)", "uuid", uuid) + http.Error(w, "resource provider not found", http.StatusNotFound) + return + } + if err != nil { + log.Error(err, "failed to list hypervisors with OpenStack ID index") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + if len(hvs.Items) > 1 { + log.Error(nil, "multiple hypervisors found with the same OpenStack ID", "uuid", uuid) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + hv := &hvs.Items[0] + log.Info("updating aggregates via CRD", "uuid", uuid, "hypervisor", hv.Name) + + var req resourceProviderAggregatesRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "malformed request body", http.StatusBadRequest) + return + } + if req.ResourceProviderGeneration != hv.Generation { + log.Info("generation mismatch on aggregate update", + "expected", req.ResourceProviderGeneration, "actual", hv.Generation) + http.Error(w, "resource provider generation conflict", http.StatusConflict) + return + } + + var newGroups []hv1.Group + for i := range hv.Spec.Groups { + if hv.Spec.Groups[i].Aggregate == nil { + newGroups = append(newGroups, hv.Spec.Groups[i]) + } + } + for _, aggUUID := range req.Aggregates { + newGroups = append(newGroups, hv1.Group{ + Aggregate: &hv1.AggregateGroup{Name: aggUUID, UUID: aggUUID}, + }) + } + hv.Spec.Groups = newGroups + + if err := s.Update(ctx, hv); err != nil { + if apierrors.IsConflict(err) { + http.Error(w, "resource provider generation conflict", http.StatusConflict) + return + } + log.Error(err, "failed to update hypervisor aggregates") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + log.Info("successfully updated aggregates via CRD", "uuid", uuid, "aggregateCount", len(req.Aggregates)) + s.writeJSON(w, http.StatusOK, resourceProviderAggregatesResponse{ + Aggregates: req.Aggregates, + ResourceProviderGeneration: hv.Generation, + }) } diff --git a/internal/shim/placement/handle_resource_provider_aggregates_e2e.go b/internal/shim/placement/handle_resource_provider_aggregates_e2e.go index 7eb6ba089..3f7f55424 100644 --- a/internal/shim/placement/handle_resource_provider_aggregates_e2e.go +++ b/internal/shim/placement/handle_resource_provider_aggregates_e2e.go @@ -10,8 +10,12 @@ import ( "fmt" "net/http" "slices" + "time" "github.com/cobaltcore-dev/cortex/pkg/conf" + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + "github.com/gophercloud/gophercloud/v2" + apierrors "k8s.io/apimachinery/pkg/api/errors" "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -19,15 +23,11 @@ import ( // e2eTestResourceProviderAggregates tests the // /resource_providers/{uuid}/aggregates endpoints. // -// 1. Pre-cleanup: DELETE any leftover test RP (ignore 404). -// 2. POST /resource_providers — create a test RP. -// 3. GET /{uuid}/aggregates — verify aggregates are empty, store generation. -// 4. PUT /{uuid}/aggregates — associate two aggregate UUIDs with the RP. -// 5. GET /{uuid}/aggregates — verify both aggregate UUIDs are present. -// 6. PUT /{uuid}/aggregates — clear aggregates by sending an empty list. -// 7. GET /{uuid}/aggregates — verify aggregates are empty after clear. -// 8. Cleanup: DELETE the test RP (also runs via deferred cleanup on failure). -func e2eTestResourceProviderAggregates(ctx context.Context, _ client.Client) error { +// In passthrough mode: exercises the upstream placement path with a +// dynamically created resource provider. +// In hybrid/crd mode: exercises the spec.groups-backed CRD path using a +// real KVM hypervisor discovered from the cluster. +func e2eTestResourceProviderAggregates(ctx context.Context, cl client.Client) error { log := logf.FromContext(ctx) log.Info("Running resource provider aggregates endpoint e2e test") config, err := conf.GetConfig[e2eRootConfig]() @@ -43,21 +43,26 @@ func e2eTestResourceProviderAggregates(ctx context.Context, _ client.Client) err } log.Info("Successfully created openstack client for resource provider aggregates e2e test") + mode := e2eCurrentMode(ctx) + switch mode { + case FeatureModePassthrough: + return e2ePassthroughResourceProviderAggregates(ctx, sc) + case FeatureModeHybrid, FeatureModeCRD: + return e2eCRDResourceProviderAggregates(ctx, sc, cl) + default: + return fmt.Errorf("unexpected mode %q", mode) + } +} + +func e2ePassthroughResourceProviderAggregates(ctx context.Context, sc *gophercloud.ServiceClient) error { + log := logf.FromContext(ctx) + const testRPUUID = "e2e10000-0000-0000-0000-000000000004" const testRPName = "cortex-e2e-test-rp-agg" const testAggUUID1 = "e2e30000-0000-0000-0000-000000000001" const testAggUUID2 = "e2e30000-0000-0000-0000-000000000002" - // Probe: for non-passthrough modes, verify endpoint returns 501. - unimplemented, err := e2eProbeUnimplemented(ctx, sc, sc.Endpoint+"/resource_providers/"+testRPUUID+"/aggregates") - if err != nil { - return fmt.Errorf("probe: %w", err) - } - if unimplemented { - return nil - } - - // Pre-cleanup: delete any leftover test resource provider from a prior run. + // Pre-cleanup: delete leftover test RP. log.Info("Pre-cleanup: deleting leftover test resource provider", "uuid", testRPUUID) req, err := http.NewRequestWithContext(ctx, http.MethodDelete, sc.Endpoint+"/resource_providers/"+testRPUUID, http.NoBody) @@ -116,8 +121,7 @@ func e2eTestResourceProviderAggregates(ctx context.Context, _ client.Client) err log.Info("Successfully created test resource provider for aggregates test", "uuid", testRPUUID) - // Deferred cleanup: always delete the test RP on exit so a failed - // assertion doesn't leave the fixed UUID behind. + // Deferred cleanup. defer func() { log.Info("Deferred cleanup: deleting test resource provider", "uuid", testRPUUID) dReq, dErr := http.NewRequestWithContext(ctx, @@ -137,13 +141,11 @@ func e2eTestResourceProviderAggregates(ctx context.Context, _ client.Client) err log.Info("Deferred cleanup completed", "status", dResp.StatusCode) }() - // Test GET /resource_providers/{uuid}/aggregates (empty). - log.Info("Testing GET /resource_providers/{uuid}/aggregates (empty)", - "uuid", testRPUUID) + // Test GET (empty). + log.Info("Testing GET /resource_providers/{uuid}/aggregates (empty)", "uuid", testRPUUID) req, err = http.NewRequestWithContext(ctx, http.MethodGet, sc.Endpoint+"/resource_providers/"+testRPUUID+"/aggregates", http.NoBody) if err != nil { - log.Error(err, "failed to create GET request for RP aggregates", "uuid", testRPUUID) return err } req.Header.Set("X-Auth-Token", sc.TokenID) @@ -151,44 +153,36 @@ func e2eTestResourceProviderAggregates(ctx context.Context, _ client.Client) err req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send GET request for RP aggregates", "uuid", testRPUUID) return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "GET RP aggregates returned an error", "uuid", testRPUUID) - return err + return fmt.Errorf("GET RP aggregates: unexpected status %d", resp.StatusCode) } var aggResp struct { Aggregates []string `json:"aggregates"` ResourceProviderGeneration int `json:"resource_provider_generation"` } - err = json.NewDecoder(resp.Body).Decode(&aggResp) - if err != nil { - log.Error(err, "failed to decode RP aggregates response", "uuid", testRPUUID) + if err := json.NewDecoder(resp.Body).Decode(&aggResp); err != nil { return err } - log.Info("Successfully retrieved empty aggregates for test resource provider", - "uuid", testRPUUID, "aggregates", len(aggResp.Aggregates), - "generation", aggResp.ResourceProviderGeneration) + if len(aggResp.Aggregates) != 0 { + return fmt.Errorf("expected 0 initial aggregates, got %d", len(aggResp.Aggregates)) + } + log.Info("Verified empty aggregates", "generation", aggResp.ResourceProviderGeneration) - // Test PUT /resource_providers/{uuid}/aggregates (set two aggregates). - log.Info("Testing PUT /resource_providers/{uuid}/aggregates to set aggregates", - "uuid", testRPUUID, "agg1", testAggUUID1, "agg2", testAggUUID2) + // Test PUT (associate aggregates). putBody, err := json.Marshal(map[string]any{ "resource_provider_generation": aggResp.ResourceProviderGeneration, "aggregates": []string{testAggUUID1, testAggUUID2}, }) if err != nil { - log.Error(err, "failed to marshal request body") return err } req, err = http.NewRequestWithContext(ctx, http.MethodPut, sc.Endpoint+"/resource_providers/"+testRPUUID+"/aggregates", bytes.NewReader(putBody)) if err != nil { - log.Error(err, "failed to create PUT request for RP aggregates", "uuid", testRPUUID) return err } req.Header.Set("X-Auth-Token", sc.TokenID) @@ -197,35 +191,18 @@ func e2eTestResourceProviderAggregates(ctx context.Context, _ client.Client) err req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send PUT request for RP aggregates", "uuid", testRPUUID) return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "PUT RP aggregates returned an error", "uuid", testRPUUID) - return err - } - var putAggResp struct { - Aggregates []string `json:"aggregates"` - ResourceProviderGeneration int `json:"resource_provider_generation"` - } - err = json.NewDecoder(resp.Body).Decode(&putAggResp) - if err != nil { - log.Error(err, "failed to decode PUT RP aggregates response", "uuid", testRPUUID) - return err + return fmt.Errorf("PUT RP aggregates: unexpected status %d", resp.StatusCode) } - log.Info("Successfully set aggregates on test resource provider", - "uuid", testRPUUID, "aggregates", len(putAggResp.Aggregates), - "generation", putAggResp.ResourceProviderGeneration) + log.Info("Successfully associated aggregates") - // Test GET /resource_providers/{uuid}/aggregates (after PUT). - log.Info("Testing GET /resource_providers/{uuid}/aggregates (after PUT)", - "uuid", testRPUUID) + // Test GET (after PUT). req, err = http.NewRequestWithContext(ctx, http.MethodGet, sc.Endpoint+"/resource_providers/"+testRPUUID+"/aggregates", http.NoBody) if err != nil { - log.Error(err, "failed to create GET request for RP aggregates", "uuid", testRPUUID) return err } req.Header.Set("X-Auth-Token", sc.TokenID) @@ -233,47 +210,29 @@ func e2eTestResourceProviderAggregates(ctx context.Context, _ client.Client) err req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send GET request for RP aggregates", "uuid", testRPUUID) return err } defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "GET RP aggregates returned an error", "uuid", testRPUUID) - return err - } - err = json.NewDecoder(resp.Body).Decode(&aggResp) - if err != nil { - log.Error(err, "failed to decode RP aggregates response", "uuid", testRPUUID) + if err := json.NewDecoder(resp.Body).Decode(&aggResp); err != nil { return err } - if len(aggResp.Aggregates) != 2 || - !slices.Contains(aggResp.Aggregates, testAggUUID1) || - !slices.Contains(aggResp.Aggregates, testAggUUID2) { - err := fmt.Errorf("expected aggregates %v, got %v", - []string{testAggUUID1, testAggUUID2}, aggResp.Aggregates) - log.Error(err, "aggregate mismatch", "uuid", testRPUUID) - return err + if !slices.Contains(aggResp.Aggregates, testAggUUID1) || !slices.Contains(aggResp.Aggregates, testAggUUID2) { + return fmt.Errorf("expected aggregates %v and %v, got %v", testAggUUID1, testAggUUID2, aggResp.Aggregates) } - log.Info("Successfully verified aggregates on test resource provider", - "uuid", testRPUUID, "aggregates", aggResp.Aggregates) + log.Info("Verified aggregates present after PUT") // Clear aggregates by PUT with empty list. - log.Info("Testing PUT /resource_providers/{uuid}/aggregates to clear aggregates", - "uuid", testRPUUID) putBody, err = json.Marshal(map[string]any{ "resource_provider_generation": aggResp.ResourceProviderGeneration, "aggregates": []string{}, }) if err != nil { - log.Error(err, "failed to marshal request body") return err } req, err = http.NewRequestWithContext(ctx, http.MethodPut, sc.Endpoint+"/resource_providers/"+testRPUUID+"/aggregates", bytes.NewReader(putBody)) if err != nil { - log.Error(err, "failed to create PUT request to clear RP aggregates", "uuid", testRPUUID) return err } req.Header.Set("X-Auth-Token", sc.TokenID) @@ -282,24 +241,18 @@ func e2eTestResourceProviderAggregates(ctx context.Context, _ client.Client) err req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send PUT request to clear RP aggregates", "uuid", testRPUUID) return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "PUT to clear RP aggregates returned an error", "uuid", testRPUUID) - return err + return fmt.Errorf("PUT RP aggregates (clear): unexpected status %d", resp.StatusCode) } - log.Info("Successfully cleared aggregates on test resource provider", - "uuid", testRPUUID) + log.Info("Successfully cleared aggregates") - // Verify aggregates are empty after clear. - log.Info("Verifying aggregates are empty after clear", "uuid", testRPUUID) + // Verify empty after clear. req, err = http.NewRequestWithContext(ctx, http.MethodGet, sc.Endpoint+"/resource_providers/"+testRPUUID+"/aggregates", http.NoBody) if err != nil { - log.Error(err, "failed to create GET request for RP aggregates", "uuid", testRPUUID) return err } req.Header.Set("X-Auth-Token", sc.TokenID) @@ -307,49 +260,221 @@ func e2eTestResourceProviderAggregates(ctx context.Context, _ client.Client) err req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send GET request for RP aggregates", "uuid", testRPUUID) return err } defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "GET RP aggregates returned an error", "uuid", testRPUUID) + if err := json.NewDecoder(resp.Body).Decode(&aggResp); err != nil { return err } - err = json.NewDecoder(resp.Body).Decode(&aggResp) + if len(aggResp.Aggregates) != 0 { + return fmt.Errorf("expected 0 aggregates after clear, got %d", len(aggResp.Aggregates)) + } + log.Info("Verified aggregates empty after clear") + + // Cleanup. + req, err = http.NewRequestWithContext(ctx, + http.MethodDelete, sc.Endpoint+"/resource_providers/"+testRPUUID, http.NoBody) if err != nil { - log.Error(err, "failed to decode RP aggregates response", "uuid", testRPUUID) return err } - if len(aggResp.Aggregates) != 0 { - err := fmt.Errorf("expected 0 aggregates after clear, got %d", len(aggResp.Aggregates)) - log.Error(err, "aggregates not empty after clear", "uuid", testRPUUID) + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", "placement 1.19") + resp, err = sc.HTTPClient.Do(req) + if err != nil { + return err + } + resp.Body.Close() + + return nil +} + +// e2eCRDResourceProviderAggregates tests the CRD/hybrid path by discovering a +// real KVM hypervisor in the cluster, seeding spec.groups, and exercising +// GET/PUT through the shim. +func e2eCRDResourceProviderAggregates(ctx context.Context, sc *gophercloud.ServiceClient, cl client.Client) error { + log := logf.FromContext(ctx) + + // Discover a KVM hypervisor with a non-empty OpenStack ID. + var hvs hv1.HypervisorList + if err := cl.List(ctx, &hvs); err != nil { + log.Error(err, "failed to list hypervisors for CRD aggregates path") + return err + } + var kvmHV *hv1.Hypervisor + for i := range hvs.Items { + if hvs.Items[i].Status.HypervisorID != "" { + kvmHV = &hvs.Items[i] + break + } + } + if kvmHV == nil { + log.Info("No KVM hypervisors with OpenStack ID found, skipping CRD aggregates tests") + return nil + } + kvmUUID := kvmHV.Status.HypervisorID + log.Info("Using KVM hypervisor for CRD aggregates e2e tests", "uuid", kvmUUID, "name", kvmHV.Name) + + // Save original groups for restoration. + originalGroups := kvmHV.Spec.Groups + + // Seed spec.groups with test aggregates (preserve non-aggregate groups). + const testAgg1UUID = "e2e40000-0000-0000-0000-000000000001" + const testAgg2UUID = "e2e40000-0000-0000-0000-000000000002" + var nonAggGroups []hv1.Group + for i := range kvmHV.Spec.Groups { + if kvmHV.Spec.Groups[i].Aggregate == nil { + nonAggGroups = append(nonAggGroups, kvmHV.Spec.Groups[i]) + } + } + nonAggGroups = append(nonAggGroups, + hv1.Group{Aggregate: &hv1.AggregateGroup{Name: testAgg1UUID, UUID: testAgg1UUID}}, + hv1.Group{Aggregate: &hv1.AggregateGroup{Name: testAgg2UUID, UUID: testAgg2UUID}}, + ) + kvmHV.Spec.Groups = nonAggGroups + if err := cl.Update(ctx, kvmHV); err != nil { + return fmt.Errorf("failed to seed spec.groups with test aggregates: %w", err) + } + log.Info("Seeded spec.groups with test aggregates", "uuid", kvmUUID) + + // Always restore original groups on exit (retry on conflict). + defer func() { + log.Info("Restoring original spec.groups", "uuid", kvmUUID) + for range 5 { + if err := cl.Get(ctx, client.ObjectKeyFromObject(kvmHV), kvmHV); err != nil { + log.Error(err, "failed to refetch hypervisor for restoration") + return + } + kvmHV.Spec.Groups = originalGroups + if err := cl.Update(ctx, kvmHV); err != nil { + if apierrors.IsConflict(err) { + continue + } + log.Error(err, "failed to restore original spec.groups") + return + } + return + } + log.Error(nil, "exhausted retries restoring original spec.groups") + }() + + // Refetch to get updated generation. + if err := cl.Get(ctx, client.ObjectKeyFromObject(kvmHV), kvmHV); err != nil { + return fmt.Errorf("failed to refetch hypervisor after seed: %w", err) + } + + // Test GET — should return the seeded aggregates. + // Poll because the shim's informer cache may take a moment to observe the update. + log.Info("Testing GET /resource_providers/{uuid}/aggregates (CRD)", "uuid", kvmUUID) + var aggResp struct { + Aggregates []string `json:"aggregates"` + ResourceProviderGeneration int64 `json:"resource_provider_generation"` + } + if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) { + req, err := http.NewRequestWithContext(ctx, + http.MethodGet, sc.Endpoint+"/resource_providers/"+kvmUUID+"/aggregates", http.NoBody) + if err != nil { + return false, err + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", "placement 1.19") + req.Header.Set("Accept", "application/json") + resp, err := sc.HTTPClient.Do(req) + if err != nil { + return false, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return false, fmt.Errorf("GET CRD aggregates: expected 200, got %d", resp.StatusCode) + } + if err := json.NewDecoder(resp.Body).Decode(&aggResp); err != nil { + return false, fmt.Errorf("failed to decode CRD aggregates response: %w", err) + } + return slices.Contains(aggResp.Aggregates, testAgg1UUID) && + slices.Contains(aggResp.Aggregates, testAgg2UUID), nil + }); err != nil { + return fmt.Errorf("waiting for seeded aggregates: %w (got %v)", err, aggResp.Aggregates) + } + log.Info("Verified GET returns seeded aggregates from CRD", + "aggregates", aggResp.Aggregates, "generation", aggResp.ResourceProviderGeneration) + + // Test PUT — replace aggregates. + const replacementAggUUID = "e2e40000-0000-0000-0000-000000000099" + putBody, err := json.Marshal(map[string]any{ + "resource_provider_generation": aggResp.ResourceProviderGeneration, + "aggregates": []string{replacementAggUUID}, + }) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, + http.MethodPut, sc.Endpoint+"/resource_providers/"+kvmUUID+"/aggregates", + bytes.NewReader(putBody)) + if err != nil { + return err + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", "placement 1.19") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + resp, err := sc.HTTPClient.Do(req) + if err != nil { return err } - log.Info("Verified aggregates are empty after clear", "uuid", testRPUUID) + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("PUT CRD aggregates: expected 200, got %d", resp.StatusCode) + } + log.Info("Successfully replaced aggregates via PUT (CRD)") - // Cleanup: delete the test resource provider. - log.Info("Cleaning up test resource provider", "uuid", testRPUUID) + // Test PUT with stale generation — should return 409. + putBody, err = json.Marshal(map[string]any{ + "resource_provider_generation": aggResp.ResourceProviderGeneration, + "aggregates": []string{"stale-uuid"}, + }) + if err != nil { + return err + } req, err = http.NewRequestWithContext(ctx, - http.MethodDelete, sc.Endpoint+"/resource_providers/"+testRPUUID, http.NoBody) + http.MethodPut, sc.Endpoint+"/resource_providers/"+kvmUUID+"/aggregates", + bytes.NewReader(putBody)) if err != nil { - log.Error(err, "failed to create DELETE request for resource provider", "uuid", testRPUUID) return err } req.Header.Set("X-Auth-Token", sc.TokenID) req.Header.Set("OpenStack-API-Version", "placement 1.19") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send DELETE request for resource provider", "uuid", testRPUUID) return err } defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "DELETE /resource_providers/{uuid} returned an error", "uuid", testRPUUID) + if resp.StatusCode != http.StatusConflict { + return fmt.Errorf("PUT CRD aggregates (stale gen): expected 409, got %d", resp.StatusCode) + } + log.Info("Verified generation conflict returns 409") + + // Test GET — verify replacement persisted. + req, err = http.NewRequestWithContext(ctx, + http.MethodGet, sc.Endpoint+"/resource_providers/"+kvmUUID+"/aggregates", http.NoBody) + if err != nil { + return err + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", "placement 1.19") + req.Header.Set("Accept", "application/json") + resp, err = sc.HTTPClient.Do(req) + if err != nil { return err } - log.Info("Successfully deleted test resource provider", "uuid", testRPUUID) + defer resp.Body.Close() + if err := json.NewDecoder(resp.Body).Decode(&aggResp); err != nil { + return err + } + if len(aggResp.Aggregates) != 1 || aggResp.Aggregates[0] != replacementAggUUID { + return fmt.Errorf("expected [%s], got %v", replacementAggUUID, aggResp.Aggregates) + } + log.Info("Verified replacement aggregate persisted") return nil } diff --git a/internal/shim/placement/handle_resource_provider_aggregates_test.go b/internal/shim/placement/handle_resource_provider_aggregates_test.go index eb35d665c..c4e1b1b27 100644 --- a/internal/shim/placement/handle_resource_provider_aggregates_test.go +++ b/internal/shim/placement/handle_resource_provider_aggregates_test.go @@ -4,8 +4,12 @@ package placement import ( + "encoding/json" "net/http" "testing" + + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + "sigs.k8s.io/controller-runtime/pkg/client" ) func TestHandleListResourceProviderAggregates(t *testing.T) { @@ -51,59 +55,207 @@ func TestHandleUpdateResourceProviderAggregates(t *testing.T) { } func TestHandleResourceProviderAggregates_HybridMode(t *testing.T) { - down, up := newTestTimers() - s := &Shim{ - config: config{ - PlacementURL: "http://should-not-be-called:1234", - Features: featuresConfig{Aggregates: FeatureModeHybrid}, - }, - maxBodyLogSize: 4096, - downstreamRequestTimer: down, - upstreamRequestTimer: up, - } - t.Run("GET returns 501", func(t *testing.T) { + s := newTestShimWithHypervisors(t, http.StatusOK, `{"aggregates":["uuid-1"],"resource_provider_generation":1}`) + s.config.Features.Aggregates = FeatureModeHybrid + t.Run("GET forwards to upstream when provider not in CRD", func(t *testing.T) { w := serveHandler(t, "GET", "/resource_providers/{uuid}/aggregates", s.HandleListResourceProviderAggregates, "/resource_providers/"+validUUID+"/aggregates") - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", w.Code, http.StatusOK) } }) - t.Run("PUT returns 501", func(t *testing.T) { + t.Run("PUT forwards to upstream when provider not in CRD", func(t *testing.T) { w := serveHandler(t, "PUT", "/resource_providers/{uuid}/aggregates", s.HandleUpdateResourceProviderAggregates, "/resource_providers/"+validUUID+"/aggregates") - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", w.Code, http.StatusOK) + } + }) + + t.Run("GET serves from CRD when provider is KVM", func(t *testing.T) { + hv := testHypervisorWithGroups("kvm-hybrid-agg", validUUID, []hv1.Group{ + {Aggregate: &hv1.AggregateGroup{Name: "az-west", UUID: "agg-uuid-1"}}, + }) + sKVM := newTestShimWithHypervisors(t, http.StatusOK, "{}", hv) + sKVM.config.Features.Aggregates = FeatureModeHybrid + w := serveHandler(t, "GET", "/resource_providers/{uuid}/aggregates", + sKVM.HandleListResourceProviderAggregates, + "/resource_providers/"+validUUID+"/aggregates") + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", w.Code, http.StatusOK) + } + var resp resourceProviderAggregatesResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if len(resp.Aggregates) != 1 || resp.Aggregates[0] != "agg-uuid-1" { + t.Fatalf("expected [agg-uuid-1], got %v", resp.Aggregates) } }) } func TestHandleResourceProviderAggregates_CRDMode(t *testing.T) { - down, up := newTestTimers() - s := &Shim{ - config: config{ - PlacementURL: "http://should-not-be-called:1234", - Features: featuresConfig{Aggregates: FeatureModeCRD}, - }, - maxBodyLogSize: 4096, - downstreamRequestTimer: down, - upstreamRequestTimer: up, + groups := []hv1.Group{ + {Trait: &hv1.TraitGroup{Name: "HW_CPU_X86_AVX2"}}, + {Aggregate: &hv1.AggregateGroup{Name: "fast-storage", UUID: "agg-uuid-1"}}, + {Aggregate: &hv1.AggregateGroup{Name: "az-west", UUID: "agg-uuid-2"}}, } - t.Run("GET returns 501", func(t *testing.T) { + hv := testHypervisorWithGroups("kvm-host-1", validUUID, groups) + s := newTestShimWithHypervisors(t, http.StatusOK, "{}", hv) + s.config.Features.Aggregates = FeatureModeCRD + + t.Run("GET returns aggregate UUIDs from spec.groups", func(t *testing.T) { w := serveHandler(t, "GET", "/resource_providers/{uuid}/aggregates", s.HandleListResourceProviderAggregates, "/resource_providers/"+validUUID+"/aggregates") - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", w.Code, http.StatusOK) + } + var resp resourceProviderAggregatesResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if len(resp.Aggregates) != 2 { + t.Fatalf("aggregates count = %d, want 2", len(resp.Aggregates)) + } + if resp.Aggregates[0] != "agg-uuid-1" { + t.Errorf("aggregates[0] = %q, want agg-uuid-1", resp.Aggregates[0]) + } + if resp.Aggregates[1] != "agg-uuid-2" { + t.Errorf("aggregates[1] = %q, want agg-uuid-2", resp.Aggregates[1]) } }) - t.Run("PUT returns 501", func(t *testing.T) { - w := serveHandler(t, "PUT", "/resource_providers/{uuid}/aggregates", + + t.Run("GET returns empty aggregates when spec.groups has no aggregates", func(t *testing.T) { + hvNoAggs := testHypervisorWithGroups("kvm-no-aggs", "b1b2b3b4-c5c6-d7d8-e9e0-f1f2f3f4f5f6", []hv1.Group{ + {Trait: &hv1.TraitGroup{Name: "CUSTOM_T"}}, + }) + s2 := newTestShimWithHypervisors(t, http.StatusOK, "{}", hvNoAggs) + s2.config.Features.Aggregates = FeatureModeCRD + w := serveHandler(t, "GET", "/resource_providers/{uuid}/aggregates", + s2.HandleListResourceProviderAggregates, + "/resource_providers/b1b2b3b4-c5c6-d7d8-e9e0-f1f2f3f4f5f6/aggregates") + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", w.Code, http.StatusOK) + } + var resp resourceProviderAggregatesResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if len(resp.Aggregates) != 0 { + t.Fatalf("aggregates count = %d, want 0", len(resp.Aggregates)) + } + }) + + t.Run("GET returns 404 for non-existent provider", func(t *testing.T) { + nonExistUUID := "a1b2c3d4-e5f6-7890-abcd-ef1234567890" + w := serveHandler(t, "GET", "/resource_providers/{uuid}/aggregates", + s.HandleListResourceProviderAggregates, + "/resource_providers/"+nonExistUUID+"/aggregates") + if w.Code != http.StatusNotFound { + t.Fatalf("status = %d, want %d", w.Code, http.StatusNotFound) + } + }) + + t.Run("PUT replaces aggregates in spec.groups preserving traits", func(t *testing.T) { + hvPut := testHypervisorWithGroups("kvm-put-aggs", "c1c2c3c4-d5d6-e7e8-f9f0-a1a2a3a4a5a6", []hv1.Group{ + {Aggregate: &hv1.AggregateGroup{Name: "old-agg", UUID: "old-uuid"}}, + {Trait: &hv1.TraitGroup{Name: "KEEP_TRAIT"}}, + }) + sPut := newTestShimWithHypervisors(t, http.StatusOK, "{}", hvPut) + sPut.config.Features.Aggregates = FeatureModeCRD + + body := `{"aggregates":["new-uuid-1","new-uuid-2"],"resource_provider_generation":0}` + w := serveHandlerWithBody(t, "PUT", "/resource_providers/{uuid}/aggregates", + sPut.HandleUpdateResourceProviderAggregates, + "/resource_providers/c1c2c3c4-d5d6-e7e8-f9f0-a1a2a3a4a5a6/aggregates", body) + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusOK, w.Body.String()) + } + var resp resourceProviderAggregatesResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if len(resp.Aggregates) != 2 { + t.Fatalf("aggregates count = %d, want 2", len(resp.Aggregates)) + } + + // Verify traits were preserved. + var updated hv1.Hypervisor + if err := sPut.Get(t.Context(), client.ObjectKeyFromObject(hvPut), &updated); err != nil { + t.Fatalf("failed to get updated hypervisor: %v", err) + } + traits := hv1.GetTraits(updated.Spec.Groups) + if len(traits) != 1 || traits[0].Name != "KEEP_TRAIT" { + t.Fatalf("traits were not preserved: got %+v", traits) + } + }) + + t.Run("PUT returns 409 on generation mismatch", func(t *testing.T) { + hvConflict := testHypervisorWithGroups("kvm-agg-conflict", "d1d2d3d4-e5e6-f7f8-a9a0-b1b2b3b4b5b6", nil) + sConflict := newTestShimWithHypervisors(t, http.StatusOK, "{}", hvConflict) + sConflict.config.Features.Aggregates = FeatureModeCRD + + body := `{"aggregates":["u1"],"resource_provider_generation":999}` + w := serveHandlerWithBody(t, "PUT", "/resource_providers/{uuid}/aggregates", + sConflict.HandleUpdateResourceProviderAggregates, + "/resource_providers/d1d2d3d4-e5e6-f7f8-a9a0-b1b2b3b4b5b6/aggregates", body) + if w.Code != http.StatusConflict { + t.Fatalf("status = %d, want %d", w.Code, http.StatusConflict) + } + }) + + t.Run("PUT returns 404 for non-existent provider", func(t *testing.T) { + body := `{"aggregates":["u1"],"resource_provider_generation":0}` + w := serveHandlerWithBody(t, "PUT", "/resource_providers/{uuid}/aggregates", s.HandleUpdateResourceProviderAggregates, - "/resource_providers/"+validUUID+"/aggregates") - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) + "/resource_providers/e1e2e3e4-f5f6-a7a8-b9b0-c1c2c3c4c5c6/aggregates", body) + if w.Code != http.StatusNotFound { + t.Fatalf("status = %d, want %d", w.Code, http.StatusNotFound) + } + }) + + t.Run("PUT with empty list removes all aggregates", func(t *testing.T) { + hvClear := testHypervisorWithGroups("kvm-clear-aggs", "e1e2e3e4-f5f6-a7a8-b9b0-c1c2c3c4c5c6", []hv1.Group{ + {Aggregate: &hv1.AggregateGroup{Name: "remove-me", UUID: "remove-uuid"}}, + {Trait: &hv1.TraitGroup{Name: "KEEP_TRAIT"}}, + }) + sClear := newTestShimWithHypervisors(t, http.StatusOK, "{}", hvClear) + sClear.config.Features.Aggregates = FeatureModeCRD + + body := `{"aggregates":[],"resource_provider_generation":0}` + w := serveHandlerWithBody(t, "PUT", "/resource_providers/{uuid}/aggregates", + sClear.HandleUpdateResourceProviderAggregates, + "/resource_providers/e1e2e3e4-f5f6-a7a8-b9b0-c1c2c3c4c5c6/aggregates", body) + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusOK, w.Body.String()) + } + var resp resourceProviderAggregatesResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if len(resp.Aggregates) != 0 { + t.Fatalf("expected 0 aggregates, got %d", len(resp.Aggregates)) + } + + var updated hv1.Hypervisor + if err := sClear.Get(t.Context(), client.ObjectKeyFromObject(hvClear), &updated); err != nil { + t.Fatalf("failed to get updated hypervisor: %v", err) + } + traits := hv1.GetTraits(updated.Spec.Groups) + if len(traits) != 1 || traits[0].Name != "KEEP_TRAIT" { + t.Fatalf("traits were not preserved: got %+v", traits) + } + }) + + t.Run("PUT returns 400 for malformed body", func(t *testing.T) { + w := serveHandlerWithBody(t, "PUT", "/resource_providers/{uuid}/aggregates", + s.HandleUpdateResourceProviderAggregates, + "/resource_providers/"+validUUID+"/aggregates", "not json") + if w.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want %d", w.Code, http.StatusBadRequest) } }) } diff --git a/internal/shim/placement/handle_resource_provider_traits.go b/internal/shim/placement/handle_resource_provider_traits.go index b23ac8e59..463edfed7 100644 --- a/internal/shim/placement/handle_resource_provider_traits.go +++ b/internal/shim/placement/handle_resource_provider_traits.go @@ -4,6 +4,7 @@ package placement import ( + "encoding/json" "net/http" hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" @@ -15,11 +16,20 @@ import ( // resourceProviderTraitsResponse is the JSON body returned by // GET /resource_providers/{uuid}/traits and // PUT /resource_providers/{uuid}/traits. +// +// https://docs.openstack.org/api-ref/placement/#resource-provider-traits type resourceProviderTraitsResponse struct { Traits []string `json:"traits"` ResourceProviderGeneration int64 `json:"resource_provider_generation"` } +// resourceProviderTraitsRequest is the JSON body expected by +// PUT /resource_providers/{uuid}/traits. +type resourceProviderTraitsRequest struct { + Traits []string `json:"traits"` + ResourceProviderGeneration int64 `json:"resource_provider_generation"` +} + // HandleListResourceProviderTraits handles // GET /resource_providers/{uuid}/traits requests. // @@ -27,6 +37,8 @@ type resourceProviderTraitsResponse struct { // by {uuid}. The response includes an array of trait name strings and the // resource_provider_generation for concurrency tracking. Returns 404 if the // provider does not exist. +// +// https://docs.openstack.org/api-ref/placement/#list-resource-provider-traits func (s *Shim) HandleListResourceProviderTraits(w http.ResponseWriter, r *http.Request) { uuid, ok := requiredUUIDPathParam(w, r, "uuid") if !ok { @@ -36,7 +48,7 @@ func (s *Shim) HandleListResourceProviderTraits(w http.ResponseWriter, r *http.R case FeatureModePassthrough: s.forward(w, r) case FeatureModeHybrid: - s.forward(w, r) + s.listResourceProviderTraitsHybrid(w, r, uuid) case FeatureModeCRD: s.listResourceProviderTraitsCRD(w, r, uuid) default: @@ -44,6 +56,25 @@ func (s *Shim) HandleListResourceProviderTraits(w http.ResponseWriter, r *http.R } } +// listResourceProviderTraitsHybrid serves from the CRD if the provider is a +// KVM hypervisor, otherwise forwards to upstream placement. +func (s *Shim) listResourceProviderTraitsHybrid(w http.ResponseWriter, r *http.Request, uuid string) { + ctx := r.Context() + log := logf.FromContext(ctx) + + var hvs hv1.HypervisorList + err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) + if err != nil || len(hvs.Items) != 1 { + log.Info("resource provider not resolved from kubernetes, forwarding to upstream placement", "uuid", uuid) + s.forward(w, r) + return + } + log.Info("resolved resource provider from CRD, serving traits", "uuid", uuid, "hypervisor", hvs.Items[0].Name) + s.writeTraitsFromCRD(w, &hvs.Items[0]) +} + +// listResourceProviderTraitsCRD serves exclusively from the CRD, returning 404 +// if the provider is not a known KVM hypervisor. func (s *Shim) listResourceProviderTraitsCRD(w http.ResponseWriter, r *http.Request, uuid string) { ctx := r.Context() log := logf.FromContext(ctx) @@ -65,11 +96,15 @@ func (s *Shim) listResourceProviderTraitsCRD(w http.ResponseWriter, r *http.Requ http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } + log.Info("serving traits from CRD", "uuid", uuid, "hypervisor", hvs.Items[0].Name) + s.writeTraitsFromCRD(w, &hvs.Items[0]) +} - hv := hvs.Items[0] - traits := hv.Status.Traits - if traits == nil { - traits = []string{} +func (s *Shim) writeTraitsFromCRD(w http.ResponseWriter, hv *hv1.Hypervisor) { + traitGroups := hv1.GetTraits(hv.Spec.Groups) + traits := make([]string, 0, len(traitGroups)) + for _, tg := range traitGroups { + traits = append(traits, tg.Name) } s.writeJSON(w, http.StatusOK, resourceProviderTraitsResponse{ Traits: traits, @@ -84,25 +119,152 @@ func (s *Shim) listResourceProviderTraitsCRD(w http.ResponseWriter, r *http.Requ // The request body must include a traits array and the // resource_provider_generation for optimistic concurrency control. All // previously associated traits are removed and replaced by the specified set. -// Returns 400 Bad Request if any of the specified traits are invalid (i.e. -// not returned by GET /traits). Returns 409 Conflict if the generation does -// not match. +// Returns 409 Conflict if the generation does not match. +// +// https://docs.openstack.org/api-ref/placement/#update-resource-provider-traits func (s *Shim) HandleUpdateResourceProviderTraits(w http.ResponseWriter, r *http.Request) { - if _, ok := requiredUUIDPathParam(w, r, "uuid"); !ok { + uuid, ok := requiredUUIDPathParam(w, r, "uuid") + if !ok { return } switch s.featureModeFromConfOrHeader(r, s.config.Features.ResourceProviderTraits) { case FeatureModePassthrough: s.forward(w, r) case FeatureModeHybrid: - s.forward(w, r) + s.updateResourceProviderTraitsHybrid(w, r, uuid) case FeatureModeCRD: - http.Error(w, "crd mode is not yet implemented for resource provider trait writes", http.StatusNotImplemented) + s.updateResourceProviderTraitsCRD(w, r, uuid) default: http.Error(w, "unknown feature mode", http.StatusInternalServerError) } } +// updateResourceProviderTraitsHybrid updates traits via the CRD if the +// provider is a KVM hypervisor, otherwise forwards to upstream placement. +func (s *Shim) updateResourceProviderTraitsHybrid(w http.ResponseWriter, r *http.Request, uuid string) { + ctx := r.Context() + log := logf.FromContext(ctx) + + var hvs hv1.HypervisorList + err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) + if err != nil || len(hvs.Items) != 1 { + log.Info("resource provider not resolved from kubernetes, forwarding to upstream placement", "uuid", uuid) + s.forward(w, r) + return + } + hv := &hvs.Items[0] + log.Info("resolved resource provider from CRD, updating traits", "uuid", uuid, "hypervisor", hv.Name) + + var req resourceProviderTraitsRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "malformed request body", http.StatusBadRequest) + return + } + if req.ResourceProviderGeneration != hv.Generation { + log.Info("generation mismatch on trait update", + "expected", req.ResourceProviderGeneration, "actual", hv.Generation) + http.Error(w, "resource provider generation conflict", http.StatusConflict) + return + } + + var newGroups []hv1.Group + for i := range hv.Spec.Groups { + if hv.Spec.Groups[i].Trait == nil { + newGroups = append(newGroups, hv.Spec.Groups[i]) + } + } + for _, name := range req.Traits { + newGroups = append(newGroups, hv1.Group{ + Trait: &hv1.TraitGroup{Name: name}, + }) + } + hv.Spec.Groups = newGroups + + if err := s.Update(ctx, hv); err != nil { + if apierrors.IsConflict(err) { + http.Error(w, "resource provider generation conflict", http.StatusConflict) + return + } + log.Error(err, "failed to update hypervisor traits") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + log.Info("successfully updated traits via CRD", "uuid", uuid, "traitCount", len(req.Traits)) + s.writeJSON(w, http.StatusOK, resourceProviderTraitsResponse{ + Traits: req.Traits, + ResourceProviderGeneration: hv.Generation, + }) +} + +// updateResourceProviderTraitsCRD updates traits exclusively via the CRD, +// returning 404 if the provider is not a known KVM hypervisor. +func (s *Shim) updateResourceProviderTraitsCRD(w http.ResponseWriter, r *http.Request, uuid string) { + ctx := r.Context() + log := logf.FromContext(ctx) + + var hvs hv1.HypervisorList + err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) + if apierrors.IsNotFound(err) || len(hvs.Items) == 0 { + log.Info("resource provider not found in kubernetes (crd mode)", "uuid", uuid) + http.Error(w, "resource provider not found", http.StatusNotFound) + return + } + if err != nil { + log.Error(err, "failed to list hypervisors with OpenStack ID index") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + if len(hvs.Items) > 1 { + log.Error(nil, "multiple hypervisors found with the same OpenStack ID", "uuid", uuid) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + hv := &hvs.Items[0] + log.Info("updating traits via CRD", "uuid", uuid, "hypervisor", hv.Name) + + var req resourceProviderTraitsRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "malformed request body", http.StatusBadRequest) + return + } + if req.ResourceProviderGeneration != hv.Generation { + log.Info("generation mismatch on trait update", + "expected", req.ResourceProviderGeneration, "actual", hv.Generation) + http.Error(w, "resource provider generation conflict", http.StatusConflict) + return + } + + var newGroups []hv1.Group + for i := range hv.Spec.Groups { + if hv.Spec.Groups[i].Trait == nil { + newGroups = append(newGroups, hv.Spec.Groups[i]) + } + } + for _, name := range req.Traits { + newGroups = append(newGroups, hv1.Group{ + Trait: &hv1.TraitGroup{Name: name}, + }) + } + hv.Spec.Groups = newGroups + + if err := s.Update(ctx, hv); err != nil { + if apierrors.IsConflict(err) { + http.Error(w, "resource provider generation conflict", http.StatusConflict) + return + } + log.Error(err, "failed to update hypervisor traits") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + log.Info("successfully updated traits via CRD", "uuid", uuid, "traitCount", len(req.Traits)) + s.writeJSON(w, http.StatusOK, resourceProviderTraitsResponse{ + Traits: req.Traits, + ResourceProviderGeneration: hv.Generation, + }) +} + // HandleDeleteResourceProviderTraits handles // DELETE /resource_providers/{uuid}/traits requests. // @@ -112,18 +274,107 @@ func (s *Shim) HandleUpdateResourceProviderTraits(w http.ResponseWriter, r *http // for the same provider, prefer PUT with an empty traits list instead. // Returns 404 if the provider does not exist. Returns 409 Conflict on // concurrent modification. Returns 204 No Content on success. +// +// https://docs.openstack.org/api-ref/placement/#delete-resource-provider-traits func (s *Shim) HandleDeleteResourceProviderTraits(w http.ResponseWriter, r *http.Request) { - if _, ok := requiredUUIDPathParam(w, r, "uuid"); !ok { + uuid, ok := requiredUUIDPathParam(w, r, "uuid") + if !ok { return } switch s.featureModeFromConfOrHeader(r, s.config.Features.ResourceProviderTraits) { case FeatureModePassthrough: s.forward(w, r) case FeatureModeHybrid: - s.forward(w, r) + s.deleteResourceProviderTraitsHybrid(w, r, uuid) case FeatureModeCRD: - http.Error(w, "crd mode is not yet implemented for resource provider trait writes", http.StatusNotImplemented) + s.deleteResourceProviderTraitsCRD(w, r, uuid) default: http.Error(w, "unknown feature mode", http.StatusInternalServerError) } } + +// deleteResourceProviderTraitsHybrid removes all traits via the CRD if the +// provider is a KVM hypervisor, otherwise forwards to upstream placement. +func (s *Shim) deleteResourceProviderTraitsHybrid(w http.ResponseWriter, r *http.Request, uuid string) { + ctx := r.Context() + log := logf.FromContext(ctx) + + var hvs hv1.HypervisorList + err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) + if err != nil || len(hvs.Items) != 1 { + log.Info("resource provider not resolved from kubernetes, forwarding to upstream placement", "uuid", uuid) + s.forward(w, r) + return + } + hv := &hvs.Items[0] + log.Info("resolved resource provider from CRD, deleting traits", "uuid", uuid, "hypervisor", hv.Name) + + var newGroups []hv1.Group + for i := range hv.Spec.Groups { + if hv.Spec.Groups[i].Trait == nil { + newGroups = append(newGroups, hv.Spec.Groups[i]) + } + } + hv.Spec.Groups = newGroups + + if err := s.Update(ctx, hv); err != nil { + if apierrors.IsConflict(err) { + http.Error(w, "resource provider generation conflict", http.StatusConflict) + return + } + log.Error(err, "failed to delete hypervisor traits") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + log.Info("successfully deleted all traits via CRD", "uuid", uuid) + w.WriteHeader(http.StatusNoContent) +} + +// deleteResourceProviderTraitsCRD removes all traits exclusively via the CRD, +// returning 404 if the provider is not a known KVM hypervisor. +func (s *Shim) deleteResourceProviderTraitsCRD(w http.ResponseWriter, r *http.Request, uuid string) { + ctx := r.Context() + log := logf.FromContext(ctx) + + var hvs hv1.HypervisorList + err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid}) + if apierrors.IsNotFound(err) || len(hvs.Items) == 0 { + log.Info("resource provider not found in kubernetes (crd mode)", "uuid", uuid) + http.Error(w, "resource provider not found", http.StatusNotFound) + return + } + if err != nil { + log.Error(err, "failed to list hypervisors with OpenStack ID index") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + if len(hvs.Items) > 1 { + log.Error(nil, "multiple hypervisors found with the same OpenStack ID", "uuid", uuid) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + hv := &hvs.Items[0] + log.Info("deleting all traits via CRD", "uuid", uuid, "hypervisor", hv.Name) + + var newGroups []hv1.Group + for i := range hv.Spec.Groups { + if hv.Spec.Groups[i].Trait == nil { + newGroups = append(newGroups, hv.Spec.Groups[i]) + } + } + hv.Spec.Groups = newGroups + + if err := s.Update(ctx, hv); err != nil { + if apierrors.IsConflict(err) { + http.Error(w, "resource provider generation conflict", http.StatusConflict) + return + } + log.Error(err, "failed to delete hypervisor traits") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + log.Info("successfully deleted all traits via CRD", "uuid", uuid) + w.WriteHeader(http.StatusNoContent) +} diff --git a/internal/shim/placement/handle_resource_provider_traits_e2e.go b/internal/shim/placement/handle_resource_provider_traits_e2e.go index 4acd665b0..7dc50b016 100644 --- a/internal/shim/placement/handle_resource_provider_traits_e2e.go +++ b/internal/shim/placement/handle_resource_provider_traits_e2e.go @@ -9,8 +9,13 @@ import ( "encoding/json" "fmt" "net/http" + "slices" + "time" "github.com/cobaltcore-dev/cortex/pkg/conf" + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + "github.com/gophercloud/gophercloud/v2" + apierrors "k8s.io/apimachinery/pkg/api/errors" "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -18,15 +23,11 @@ import ( // e2eTestResourceProviderTraits tests the // /resource_providers/{uuid}/traits endpoints. // -// 1. Pre-cleanup: DELETE leftover RP traits, RP, and custom trait (ignore 404). -// 2. Create fixtures: PUT a custom trait, POST a test RP. -// 3. GET /{uuid}/traits — verify the trait list is empty, store generation. -// 4. PUT /{uuid}/traits — associate the custom trait with the RP. -// 5. GET /{uuid}/traits — verify the custom trait is now present. -// 6. DELETE /{uuid}/traits — disassociate all traits from the RP. -// 7. GET /{uuid}/traits — verify the trait list is empty again. -// 8. Cleanup: DELETE the test RP and custom trait. -func e2eTestResourceProviderTraits(ctx context.Context, _ client.Client) error { +// In passthrough mode: exercises the upstream placement path with a +// dynamically created resource provider. +// In hybrid/crd mode: exercises the spec.groups-backed CRD path using a +// real KVM hypervisor discovered from the cluster. +func e2eTestResourceProviderTraits(ctx context.Context, cl client.Client) error { log := logf.FromContext(ctx) log.Info("Running resource provider traits endpoint e2e test") config, err := conf.GetConfig[e2eRootConfig]() @@ -42,14 +43,19 @@ func e2eTestResourceProviderTraits(ctx context.Context, _ client.Client) error { } log.Info("Successfully created openstack client for resource provider traits e2e test") - // Resource provider trait writes (PUT/DELETE) are not yet implemented in - // crd mode, and the test RP created via POST won't exist as a Hypervisor - // CRD either, so skip the entire test in crd mode. - rpTraitsMode := e2eCurrentMode(ctx) - if rpTraitsMode == FeatureModeCRD { - log.Info("Skipping resource provider traits e2e test because mode is crd (writes not implemented)") - return nil + mode := e2eCurrentMode(ctx) + switch mode { + case FeatureModePassthrough: + return e2ePassthroughResourceProviderTraits(ctx, sc) + case FeatureModeHybrid, FeatureModeCRD: + return e2eCRDResourceProviderTraits(ctx, sc, cl) + default: + return fmt.Errorf("unexpected mode %q", mode) } +} + +func e2ePassthroughResourceProviderTraits(ctx context.Context, sc *gophercloud.ServiceClient) error { + log := logf.FromContext(ctx) const testRPUUID = "e2e10000-0000-0000-0000-000000000003" const testRPName = "cortex-e2e-test-rp-traits" @@ -145,8 +151,7 @@ func e2eTestResourceProviderTraits(ctx context.Context, _ client.Client) error { log.Info("Successfully created test resource provider for RP traits test", "uuid", testRPUUID) - // Deferred cleanup: always delete test fixtures on exit so a failed - // assertion doesn't leave the fixed UUID/trait behind. + // Deferred cleanup. defer func() { log.Info("Deferred cleanup: deleting test resources") for _, c := range []struct { @@ -174,12 +179,11 @@ func e2eTestResourceProviderTraits(ctx context.Context, _ client.Client) error { } }() - // Test GET /resource_providers/{uuid}/traits (empty). + // Test GET (empty). log.Info("Testing GET /resource_providers/{uuid}/traits (empty)", "uuid", testRPUUID) req, err = http.NewRequestWithContext(ctx, http.MethodGet, sc.Endpoint+"/resource_providers/"+testRPUUID+"/traits", http.NoBody) if err != nil { - log.Error(err, "failed to create GET request for RP traits", "uuid", testRPUUID) return err } req.Header.Set("X-Auth-Token", sc.TokenID) @@ -187,49 +191,36 @@ func e2eTestResourceProviderTraits(ctx context.Context, _ client.Client) error { req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send GET request for RP traits", "uuid", testRPUUID) return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "GET RP traits returned an error", "uuid", testRPUUID) - return err + return fmt.Errorf("GET RP traits: unexpected status %d", resp.StatusCode) } var traitsResp struct { Traits []string `json:"traits"` ResourceProviderGeneration int `json:"resource_provider_generation"` } - err = json.NewDecoder(resp.Body).Decode(&traitsResp) - if err != nil { - log.Error(err, "failed to decode RP traits response", "uuid", testRPUUID) + if err := json.NewDecoder(resp.Body).Decode(&traitsResp); err != nil { return err } if len(traitsResp.Traits) != 0 { - err := fmt.Errorf("expected 0 initial traits, got %d", len(traitsResp.Traits)) - log.Error(err, "initial traits not empty", "uuid", testRPUUID) - return err + return fmt.Errorf("expected 0 initial traits, got %d", len(traitsResp.Traits)) } - log.Info("Successfully retrieved empty traits for test resource provider", - "uuid", testRPUUID, "traits", len(traitsResp.Traits), - "generation", traitsResp.ResourceProviderGeneration) + log.Info("Verified empty traits", "generation", traitsResp.ResourceProviderGeneration) - // Test PUT /resource_providers/{uuid}/traits (associate trait). - log.Info("Testing PUT /resource_providers/{uuid}/traits to associate trait", - "uuid", testRPUUID, "trait", testTrait) + // Test PUT (associate trait). putBody, err := json.Marshal(map[string]any{ "resource_provider_generation": traitsResp.ResourceProviderGeneration, "traits": []string{testTrait}, }) if err != nil { - log.Error(err, "failed to marshal request body") return err } req, err = http.NewRequestWithContext(ctx, http.MethodPut, sc.Endpoint+"/resource_providers/"+testRPUUID+"/traits", bytes.NewReader(putBody)) if err != nil { - log.Error(err, "failed to create PUT request for RP traits", "uuid", testRPUUID) return err } req.Header.Set("X-Auth-Token", sc.TokenID) @@ -238,25 +229,18 @@ func e2eTestResourceProviderTraits(ctx context.Context, _ client.Client) error { req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send PUT request for RP traits", "uuid", testRPUUID) return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "PUT RP traits returned an error", "uuid", testRPUUID) - return err + return fmt.Errorf("PUT RP traits: unexpected status %d", resp.StatusCode) } - log.Info("Successfully associated trait with test resource provider", - "uuid", testRPUUID, "trait", testTrait) + log.Info("Successfully associated trait") - // Test GET /resource_providers/{uuid}/traits (after PUT). - log.Info("Testing GET /resource_providers/{uuid}/traits (after PUT)", - "uuid", testRPUUID) + // Test GET (after PUT). req, err = http.NewRequestWithContext(ctx, http.MethodGet, sc.Endpoint+"/resource_providers/"+testRPUUID+"/traits", http.NoBody) if err != nil { - log.Error(err, "failed to create GET request for RP traits", "uuid", testRPUUID) return err } req.Header.Set("X-Auth-Token", sc.TokenID) @@ -264,128 +248,270 @@ func e2eTestResourceProviderTraits(ctx context.Context, _ client.Client) error { req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send GET request for RP traits", "uuid", testRPUUID) return err } defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "GET RP traits returned an error", "uuid", testRPUUID) - return err - } - err = json.NewDecoder(resp.Body).Decode(&traitsResp) - if err != nil { - log.Error(err, "failed to decode RP traits response", "uuid", testRPUUID) + if err := json.NewDecoder(resp.Body).Decode(&traitsResp); err != nil { return err } - if len(traitsResp.Traits) != 1 || traitsResp.Traits[0] != testTrait { - err := fmt.Errorf("expected trait %s, got %v", testTrait, traitsResp.Traits) - log.Error(err, "trait mismatch", "uuid", testRPUUID) - return err + if !slices.Contains(traitsResp.Traits, testTrait) { + return fmt.Errorf("expected trait %s, got %v", testTrait, traitsResp.Traits) } - log.Info("Successfully verified trait on test resource provider", - "uuid", testRPUUID, "traits", traitsResp.Traits) + log.Info("Verified trait present after PUT") - // Test DELETE /resource_providers/{uuid}/traits. - log.Info("Testing DELETE /resource_providers/{uuid}/traits", "uuid", testRPUUID) + // Test DELETE. req, err = http.NewRequestWithContext(ctx, http.MethodDelete, sc.Endpoint+"/resource_providers/"+testRPUUID+"/traits", http.NoBody) if err != nil { - log.Error(err, "failed to create DELETE request for RP traits", "uuid", testRPUUID) return err } req.Header.Set("X-Auth-Token", sc.TokenID) req.Header.Set("OpenStack-API-Version", "placement 1.6") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send DELETE request for RP traits", "uuid", testRPUUID) return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "DELETE RP traits returned an error", "uuid", testRPUUID) + return fmt.Errorf("DELETE RP traits: unexpected status %d", resp.StatusCode) + } + log.Info("Successfully deleted traits") + + // Cleanup. + req, err = http.NewRequestWithContext(ctx, + http.MethodDelete, sc.Endpoint+"/resource_providers/"+testRPUUID, http.NoBody) + if err != nil { + return err + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", "placement 1.6") + resp, err = sc.HTTPClient.Do(req) + if err != nil { return err } - log.Info("Successfully deleted traits from test resource provider", "uuid", testRPUUID) + resp.Body.Close() - // Verify traits cleared. - log.Info("Verifying traits cleared on test resource provider", "uuid", testRPUUID) req, err = http.NewRequestWithContext(ctx, - http.MethodGet, sc.Endpoint+"/resource_providers/"+testRPUUID+"/traits", http.NoBody) + http.MethodDelete, sc.Endpoint+"/traits/"+testTrait, http.NoBody) if err != nil { - log.Error(err, "failed to create GET request for RP traits", "uuid", testRPUUID) return err } req.Header.Set("X-Auth-Token", sc.TokenID) req.Header.Set("OpenStack-API-Version", "placement 1.6") - req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send GET request for RP traits", "uuid", testRPUUID) + return err + } + resp.Body.Close() + + return nil +} + +// e2eCRDResourceProviderTraits tests the CRD/hybrid path by discovering a +// real KVM hypervisor in the cluster, seeding spec.groups, and exercising +// GET/PUT/DELETE through the shim. +func e2eCRDResourceProviderTraits(ctx context.Context, sc *gophercloud.ServiceClient, cl client.Client) error { + log := logf.FromContext(ctx) + + // Discover a KVM hypervisor with a non-empty OpenStack ID. + var hvs hv1.HypervisorList + if err := cl.List(ctx, &hvs); err != nil { + log.Error(err, "failed to list hypervisors for CRD traits path") + return err + } + var kvmHV *hv1.Hypervisor + for i := range hvs.Items { + if hvs.Items[i].Status.HypervisorID != "" { + kvmHV = &hvs.Items[i] + break + } + } + if kvmHV == nil { + log.Info("No KVM hypervisors with OpenStack ID found, skipping CRD traits tests") + return nil + } + kvmUUID := kvmHV.Status.HypervisorID + log.Info("Using KVM hypervisor for CRD traits e2e tests", "uuid", kvmUUID, "name", kvmHV.Name) + + // Save original groups for restoration. + originalGroups := kvmHV.Spec.Groups + + // Seed spec.groups with test traits (preserve non-trait groups). + const testTrait1 = "CUSTOM_E2E_CRD_TRAIT_1" + const testTrait2 = "CUSTOM_E2E_CRD_TRAIT_2" + var nonTraitGroups []hv1.Group + for i := range kvmHV.Spec.Groups { + if kvmHV.Spec.Groups[i].Trait == nil { + nonTraitGroups = append(nonTraitGroups, kvmHV.Spec.Groups[i]) + } + } + nonTraitGroups = append(nonTraitGroups, + hv1.Group{Trait: &hv1.TraitGroup{Name: testTrait1}}, + hv1.Group{Trait: &hv1.TraitGroup{Name: testTrait2}}, + ) + kvmHV.Spec.Groups = nonTraitGroups + if err := cl.Update(ctx, kvmHV); err != nil { + return fmt.Errorf("failed to seed spec.groups with test traits: %w", err) + } + log.Info("Seeded spec.groups with test traits", "uuid", kvmUUID) + + // Always restore original groups on exit (retry on conflict). + defer func() { + log.Info("Restoring original spec.groups", "uuid", kvmUUID) + for range 5 { + if err := cl.Get(ctx, client.ObjectKeyFromObject(kvmHV), kvmHV); err != nil { + log.Error(err, "failed to refetch hypervisor for restoration") + return + } + kvmHV.Spec.Groups = originalGroups + if err := cl.Update(ctx, kvmHV); err != nil { + if apierrors.IsConflict(err) { + continue + } + log.Error(err, "failed to restore original spec.groups") + return + } + return + } + log.Error(nil, "exhausted retries restoring original spec.groups") + }() + + // Refetch to get updated generation. + if err := cl.Get(ctx, client.ObjectKeyFromObject(kvmHV), kvmHV); err != nil { + return fmt.Errorf("failed to refetch hypervisor after seed: %w", err) + } + + // Test GET — should return the seeded traits. + // Poll because the shim's informer cache may take a moment to observe the update. + log.Info("Testing GET /resource_providers/{uuid}/traits (CRD)", "uuid", kvmUUID) + var traitsResp struct { + Traits []string `json:"traits"` + ResourceProviderGeneration int64 `json:"resource_provider_generation"` + } + if err := e2ePollUntil(ctx, 10*time.Second, func() (bool, error) { + req, err := http.NewRequestWithContext(ctx, + http.MethodGet, sc.Endpoint+"/resource_providers/"+kvmUUID+"/traits", http.NoBody) + if err != nil { + return false, err + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", "placement 1.6") + req.Header.Set("Accept", "application/json") + resp, err := sc.HTTPClient.Do(req) + if err != nil { + return false, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return false, fmt.Errorf("GET CRD traits: expected 200, got %d", resp.StatusCode) + } + if err := json.NewDecoder(resp.Body).Decode(&traitsResp); err != nil { + return false, fmt.Errorf("failed to decode CRD traits response: %w", err) + } + return slices.Contains(traitsResp.Traits, testTrait1) && + slices.Contains(traitsResp.Traits, testTrait2), nil + }); err != nil { + return fmt.Errorf("waiting for seeded traits: %w (got %v)", err, traitsResp.Traits) + } + log.Info("Verified GET returns seeded traits from CRD", + "traits", traitsResp.Traits, "generation", traitsResp.ResourceProviderGeneration) + + // Test PUT — replace traits. + const replacementTrait = "CUSTOM_E2E_CRD_REPLACED" + putBody, err := json.Marshal(map[string]any{ + "resource_provider_generation": traitsResp.ResourceProviderGeneration, + "traits": []string{replacementTrait}, + }) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, + http.MethodPut, sc.Endpoint+"/resource_providers/"+kvmUUID+"/traits", + bytes.NewReader(putBody)) + if err != nil { + return err + } + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", "placement 1.6") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + resp, err := sc.HTTPClient.Do(req) + if err != nil { return err } defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "GET RP traits returned an error", "uuid", testRPUUID) + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("PUT CRD traits: expected 200, got %d", resp.StatusCode) + } + log.Info("Successfully replaced traits via PUT (CRD)") + + // Test PUT with stale generation — should return 409. + putBody, err = json.Marshal(map[string]any{ + "resource_provider_generation": traitsResp.ResourceProviderGeneration, + "traits": []string{"STALE"}, + }) + if err != nil { return err } - err = json.NewDecoder(resp.Body).Decode(&traitsResp) + req, err = http.NewRequestWithContext(ctx, + http.MethodPut, sc.Endpoint+"/resource_providers/"+kvmUUID+"/traits", + bytes.NewReader(putBody)) if err != nil { - log.Error(err, "failed to decode RP traits response", "uuid", testRPUUID) return err } - if len(traitsResp.Traits) != 0 { - err := fmt.Errorf("expected 0 traits, got %d", len(traitsResp.Traits)) - log.Error(err, "traits not cleared", "uuid", testRPUUID) + req.Header.Set("X-Auth-Token", sc.TokenID) + req.Header.Set("OpenStack-API-Version", "placement 1.6") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + resp, err = sc.HTTPClient.Do(req) + if err != nil { return err } - log.Info("Verified traits cleared on test resource provider", "uuid", testRPUUID) + defer resp.Body.Close() + if resp.StatusCode != http.StatusConflict { + return fmt.Errorf("PUT CRD traits (stale gen): expected 409, got %d", resp.StatusCode) + } + log.Info("Verified generation conflict returns 409") - // Cleanup: delete the test resource provider and custom trait. - log.Info("Cleaning up test resources") + // Test GET — verify replacement persisted. req, err = http.NewRequestWithContext(ctx, - http.MethodDelete, sc.Endpoint+"/resource_providers/"+testRPUUID, http.NoBody) + http.MethodGet, sc.Endpoint+"/resource_providers/"+kvmUUID+"/traits", http.NoBody) if err != nil { - log.Error(err, "failed to create DELETE request for resource provider", "uuid", testRPUUID) return err } req.Header.Set("X-Auth-Token", sc.TokenID) req.Header.Set("OpenStack-API-Version", "placement 1.6") + req.Header.Set("Accept", "application/json") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send DELETE request for resource provider", "uuid", testRPUUID) return err } defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "DELETE resource provider returned an error", "uuid", testRPUUID) + if err := json.NewDecoder(resp.Body).Decode(&traitsResp); err != nil { return err } - log.Info("Successfully deleted test resource provider", "uuid", testRPUUID) + if len(traitsResp.Traits) != 1 || traitsResp.Traits[0] != replacementTrait { + return fmt.Errorf("expected [%s], got %v", replacementTrait, traitsResp.Traits) + } + log.Info("Verified replacement trait persisted") + // Test DELETE — remove all traits. req, err = http.NewRequestWithContext(ctx, - http.MethodDelete, sc.Endpoint+"/traits/"+testTrait, http.NoBody) + http.MethodDelete, sc.Endpoint+"/resource_providers/"+kvmUUID+"/traits", http.NoBody) if err != nil { - log.Error(err, "failed to create DELETE request for trait", "trait", testTrait) return err } req.Header.Set("X-Auth-Token", sc.TokenID) req.Header.Set("OpenStack-API-Version", "placement 1.6") resp, err = sc.HTTPClient.Do(req) if err != nil { - log.Error(err, "failed to send DELETE request for trait", "trait", testTrait) return err } defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - err := fmt.Errorf("unexpected status code: %d", resp.StatusCode) - log.Error(err, "DELETE trait returned an error", "trait", testTrait) - return err + if resp.StatusCode != http.StatusNoContent { + return fmt.Errorf("DELETE CRD traits: expected 204, got %d", resp.StatusCode) } - log.Info("Successfully deleted custom trait", "trait", testTrait) + log.Info("Verified DELETE returns 204") return nil } diff --git a/internal/shim/placement/handle_resource_provider_traits_test.go b/internal/shim/placement/handle_resource_provider_traits_test.go index 69ac8cd8c..d7e94ae0e 100644 --- a/internal/shim/placement/handle_resource_provider_traits_test.go +++ b/internal/shim/placement/handle_resource_provider_traits_test.go @@ -6,9 +6,38 @@ package placement import ( "encoding/json" "net/http" + "net/http/httptest" + "strings" "testing" + + hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" ) +func testHypervisorWithGroups(name, openstackID string, groups []hv1.Group) *hv1.Hypervisor { + return &hv1.Hypervisor{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Spec: hv1.HypervisorSpec{Groups: groups}, + Status: hv1.HypervisorStatus{HypervisorID: openstackID}, + } +} + +func serveHandlerWithBody(t *testing.T, method, pattern string, handler http.HandlerFunc, reqPath, body string) *httptest.ResponseRecorder { //nolint:unparam + t.Helper() + mux := http.NewServeMux() + mux.HandleFunc(method+" "+pattern, handler) + var req *http.Request + if body != "" { + req = httptest.NewRequest(method, reqPath, strings.NewReader(body)) + } else { + req = httptest.NewRequest(method, reqPath, http.NoBody) + } + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + return w +} + func TestHandleListResourceProviderTraits(t *testing.T) { t.Run("valid uuid", func(t *testing.T) { s := newTestShim(t, http.StatusOK, "{}", nil) @@ -73,9 +102,9 @@ func TestHandleDeleteResourceProviderTraits(t *testing.T) { } func TestHandleResourceProviderTraits_HybridMode(t *testing.T) { - s := newTestShim(t, http.StatusOK, `{"traits":["CUSTOM_HW_FPGA"],"resource_provider_generation":1}`, nil) + s := newTestShimWithHypervisors(t, http.StatusOK, `{"traits":["CUSTOM_HW_FPGA"],"resource_provider_generation":1}`) s.config.Features.ResourceProviderTraits = FeatureModeHybrid - t.Run("GET forwards to upstream", func(t *testing.T) { + t.Run("GET forwards to upstream when provider not in CRD", func(t *testing.T) { w := serveHandler(t, "GET", "/resource_providers/{uuid}/traits", s.HandleListResourceProviderTraits, "/resource_providers/"+validUUID+"/traits") @@ -83,7 +112,7 @@ func TestHandleResourceProviderTraits_HybridMode(t *testing.T) { t.Fatalf("status = %d, want %d", w.Code, http.StatusOK) } }) - t.Run("PUT forwards to upstream", func(t *testing.T) { + t.Run("PUT forwards to upstream when provider not in CRD", func(t *testing.T) { w := serveHandler(t, "PUT", "/resource_providers/{uuid}/traits", s.HandleUpdateResourceProviderTraits, "/resource_providers/"+validUUID+"/traits") @@ -92,9 +121,9 @@ func TestHandleResourceProviderTraits_HybridMode(t *testing.T) { } }) - sDel := newTestShim(t, http.StatusNoContent, "", nil) + sDel := newTestShimWithHypervisors(t, http.StatusNoContent, "") sDel.config.Features.ResourceProviderTraits = FeatureModeHybrid - t.Run("DELETE forwards to upstream", func(t *testing.T) { + t.Run("DELETE forwards to upstream when provider not in CRD", func(t *testing.T) { w := serveHandler(t, "DELETE", "/resource_providers/{uuid}/traits", sDel.HandleDeleteResourceProviderTraits, "/resource_providers/"+validUUID+"/traits") @@ -102,15 +131,41 @@ func TestHandleResourceProviderTraits_HybridMode(t *testing.T) { t.Fatalf("status = %d, want %d", w.Code, http.StatusNoContent) } }) + + t.Run("GET serves from CRD when provider is KVM", func(t *testing.T) { + hv := testHypervisorWithGroups("kvm-hybrid", validUUID, []hv1.Group{ + {Trait: &hv1.TraitGroup{Name: "CUSTOM_KVM_TRAIT"}}, + }) + sKVM := newTestShimWithHypervisors(t, http.StatusOK, "{}", hv) + sKVM.config.Features.ResourceProviderTraits = FeatureModeHybrid + w := serveHandler(t, "GET", "/resource_providers/{uuid}/traits", + sKVM.HandleListResourceProviderTraits, + "/resource_providers/"+validUUID+"/traits") + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", w.Code, http.StatusOK) + } + var resp resourceProviderTraitsResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if len(resp.Traits) != 1 || resp.Traits[0] != "CUSTOM_KVM_TRAIT" { + t.Fatalf("expected [CUSTOM_KVM_TRAIT], got %v", resp.Traits) + } + }) } func TestHandleResourceProviderTraits_CRDMode(t *testing.T) { - hv := testHypervisorFull("kvm-host-1", validUUID, nil, []string{"CUSTOM_HW_FPGA", "HW_CPU_X86_SSE42"}, nil) - s := newTestShimWithHypervisors(t, http.StatusOK, "{}", &hv) + groups := []hv1.Group{ + {Trait: &hv1.TraitGroup{Name: "CUSTOM_HW_FPGA"}}, + {Trait: &hv1.TraitGroup{Name: "HW_CPU_X86_SSE42"}}, + {Aggregate: &hv1.AggregateGroup{Name: "az1", UUID: "agg-uuid-1"}}, + } + hv := testHypervisorWithGroups("kvm-host-1", validUUID, groups) + s := newTestShimWithHypervisors(t, http.StatusOK, "{}", hv) s.config.Features.ResourceProviderTraits = FeatureModeCRD s.config.Features.ResourceProviders = FeatureModeCRD - t.Run("GET returns traits from CRD for KVM provider", func(t *testing.T) { + t.Run("GET returns traits from spec.groups", func(t *testing.T) { w := serveHandler(t, "GET", "/resource_providers/{uuid}/traits", s.HandleListResourceProviderTraits, "/resource_providers/"+validUUID+"/traits") @@ -124,8 +179,36 @@ func TestHandleResourceProviderTraits_CRDMode(t *testing.T) { if len(resp.Traits) != 2 { t.Fatalf("traits count = %d, want 2", len(resp.Traits)) } + if resp.Traits[0] != "CUSTOM_HW_FPGA" { + t.Errorf("traits[0] = %q, want CUSTOM_HW_FPGA", resp.Traits[0]) + } + if resp.Traits[1] != "HW_CPU_X86_SSE42" { + t.Errorf("traits[1] = %q, want HW_CPU_X86_SSE42", resp.Traits[1]) + } + }) + + t.Run("GET returns empty traits when spec.groups has no traits", func(t *testing.T) { + hvNoTraits := testHypervisorWithGroups("kvm-no-traits", "b1b2b3b4-c5c6-d7d8-e9e0-f1f2f3f4f5f6", []hv1.Group{ + {Aggregate: &hv1.AggregateGroup{Name: "az1", UUID: "agg-1"}}, + }) + s2 := newTestShimWithHypervisors(t, http.StatusOK, "{}", hvNoTraits) + s2.config.Features.ResourceProviderTraits = FeatureModeCRD + w := serveHandler(t, "GET", "/resource_providers/{uuid}/traits", + s2.HandleListResourceProviderTraits, + "/resource_providers/b1b2b3b4-c5c6-d7d8-e9e0-f1f2f3f4f5f6/traits") + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", w.Code, http.StatusOK) + } + var resp resourceProviderTraitsResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if len(resp.Traits) != 0 { + t.Fatalf("traits count = %d, want 0", len(resp.Traits)) + } }) - t.Run("GET returns 404 for non-KVM provider", func(t *testing.T) { + + t.Run("GET returns 404 for non-existent provider", func(t *testing.T) { nonKVMUUID := "a1b2c3d4-e5f6-7890-abcd-ef1234567890" w := serveHandler(t, "GET", "/resource_providers/{uuid}/traits", s.HandleListResourceProviderTraits, @@ -134,20 +217,100 @@ func TestHandleResourceProviderTraits_CRDMode(t *testing.T) { t.Fatalf("status = %d, want %d", w.Code, http.StatusNotFound) } }) - t.Run("PUT returns 501", func(t *testing.T) { - w := serveHandler(t, "PUT", "/resource_providers/{uuid}/traits", + + t.Run("PUT replaces traits in spec.groups preserving aggregates", func(t *testing.T) { + hvPut := testHypervisorWithGroups("kvm-put-traits", "c1c2c3c4-d5d6-e7e8-f9f0-a1a2a3a4a5a6", []hv1.Group{ + {Trait: &hv1.TraitGroup{Name: "OLD_TRAIT"}}, + {Aggregate: &hv1.AggregateGroup{Name: "keep-me", UUID: "keep-uuid"}}, + }) + sPut := newTestShimWithHypervisors(t, http.StatusOK, "{}", hvPut) + sPut.config.Features.ResourceProviderTraits = FeatureModeCRD + + body := `{"traits":["NEW_TRAIT_1","NEW_TRAIT_2"],"resource_provider_generation":0}` + w := serveHandlerWithBody(t, "PUT", "/resource_providers/{uuid}/traits", + sPut.HandleUpdateResourceProviderTraits, + "/resource_providers/c1c2c3c4-d5d6-e7e8-f9f0-a1a2a3a4a5a6/traits", body) + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want %d; body: %s", w.Code, http.StatusOK, w.Body.String()) + } + var resp resourceProviderTraitsResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if len(resp.Traits) != 2 { + t.Fatalf("traits count = %d, want 2", len(resp.Traits)) + } + + // Verify aggregates were preserved by fetching the updated object. + var updated hv1.Hypervisor + if err := sPut.Get(t.Context(), client.ObjectKeyFromObject(hvPut), &updated); err != nil { + t.Fatalf("failed to get updated hypervisor: %v", err) + } + aggs := hv1.GetAggregates(updated.Spec.Groups) + if len(aggs) != 1 || aggs[0].UUID != "keep-uuid" { + t.Fatalf("aggregates were not preserved: got %+v", aggs) + } + }) + + t.Run("PUT returns 409 on generation mismatch", func(t *testing.T) { + hvConflict := testHypervisorWithGroups("kvm-conflict", "d1d2d3d4-e5e6-f7f8-a9a0-b1b2b3b4b5b6", nil) + sConflict := newTestShimWithHypervisors(t, http.StatusOK, "{}", hvConflict) + sConflict.config.Features.ResourceProviderTraits = FeatureModeCRD + + body := `{"traits":["T1"],"resource_provider_generation":999}` + w := serveHandlerWithBody(t, "PUT", "/resource_providers/{uuid}/traits", + sConflict.HandleUpdateResourceProviderTraits, + "/resource_providers/d1d2d3d4-e5e6-f7f8-a9a0-b1b2b3b4b5b6/traits", body) + if w.Code != http.StatusConflict { + t.Fatalf("status = %d, want %d", w.Code, http.StatusConflict) + } + }) + + t.Run("PUT returns 404 for non-existent provider", func(t *testing.T) { + body := `{"traits":["T1"],"resource_provider_generation":0}` + w := serveHandlerWithBody(t, "PUT", "/resource_providers/{uuid}/traits", s.HandleUpdateResourceProviderTraits, - "/resource_providers/"+validUUID+"/traits") - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) + "/resource_providers/e1e2e3e4-f5f6-a7a8-b9b0-c1c2c3c4c5c6/traits", body) + if w.Code != http.StatusNotFound { + t.Fatalf("status = %d, want %d", w.Code, http.StatusNotFound) } }) - t.Run("DELETE returns 501", func(t *testing.T) { + + t.Run("PUT returns 400 for malformed body", func(t *testing.T) { + w := serveHandlerWithBody(t, "PUT", "/resource_providers/{uuid}/traits", + s.HandleUpdateResourceProviderTraits, + "/resource_providers/"+validUUID+"/traits", "not json") + if w.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want %d", w.Code, http.StatusBadRequest) + } + }) + + t.Run("DELETE removes all traits preserving aggregates", func(t *testing.T) { + hvDel := testHypervisorWithGroups("kvm-del-traits", "f1f2f3f4-a5a6-b7b8-c9c0-d1d2d3d4d5d6", []hv1.Group{ + {Trait: &hv1.TraitGroup{Name: "REMOVE_ME"}}, + {Aggregate: &hv1.AggregateGroup{Name: "stay", UUID: "stay-uuid"}}, + }) + sDel := newTestShimWithHypervisors(t, http.StatusOK, "{}", hvDel) + sDel.config.Features.ResourceProviderTraits = FeatureModeCRD + w := serveHandler(t, "DELETE", "/resource_providers/{uuid}/traits", - s.HandleDeleteResourceProviderTraits, - "/resource_providers/"+validUUID+"/traits") - if w.Code != http.StatusNotImplemented { - t.Fatalf("status = %d, want %d", w.Code, http.StatusNotImplemented) + sDel.HandleDeleteResourceProviderTraits, + "/resource_providers/f1f2f3f4-a5a6-b7b8-c9c0-d1d2d3d4d5d6/traits") + if w.Code != http.StatusNoContent { + t.Fatalf("status = %d, want %d", w.Code, http.StatusNoContent) + } + + var updated hv1.Hypervisor + if err := sDel.Get(t.Context(), client.ObjectKeyFromObject(hvDel), &updated); err != nil { + t.Fatalf("failed to get updated hypervisor: %v", err) + } + traits := hv1.GetTraits(updated.Spec.Groups) + if len(traits) != 0 { + t.Fatalf("expected no traits, got %+v", traits) + } + aggs := hv1.GetAggregates(updated.Spec.Groups) + if len(aggs) != 1 || aggs[0].UUID != "stay-uuid" { + t.Fatalf("aggregates were not preserved: got %+v", aggs) } }) } diff --git a/internal/shim/placement/handle_resource_providers_e2e.go b/internal/shim/placement/handle_resource_providers_e2e.go index 2faca2fc2..963f44533 100644 --- a/internal/shim/placement/handle_resource_providers_e2e.go +++ b/internal/shim/placement/handle_resource_providers_e2e.go @@ -61,9 +61,6 @@ func e2eTestResourceProviders(ctx context.Context, cl client.Client) error { // The VMware path creates synthetic test RPs against upstream placement. // In crd mode there is no upstream, so skip it. mode := e2eCurrentMode(ctx) - if mode == "" { - mode = config.Features.ResourceProviders.orDefault() - } if mode != FeatureModeCRD { log.Info("=== VMware path: passthrough resource provider tests ===") if err := e2eVMwareResourceProviders(ctx, sc); err != nil { diff --git a/internal/shim/placement/handle_traits_e2e.go b/internal/shim/placement/handle_traits_e2e.go index 4a5831f72..893ec3771 100644 --- a/internal/shim/placement/handle_traits_e2e.go +++ b/internal/shim/placement/handle_traits_e2e.go @@ -84,9 +84,6 @@ func e2eTestTraits(ctx context.Context, _ client.Client) error { // be empty. Only require at least one trait when forwarding to upstream // placement, which always has standard traits. traitsMode := e2eCurrentMode(ctx) - if traitsMode == "" { - traitsMode = config.Features.Traits.orDefault() - } if traitsMode == FeatureModePassthrough && len(listResp.Traits) == 0 { return errors.New("GET /traits: expected at least one trait, got 0") } diff --git a/internal/shim/placement/shim_e2e.go b/internal/shim/placement/shim_e2e.go index d839751a5..b678fa6e5 100644 --- a/internal/shim/placement/shim_e2e.go +++ b/internal/shim/placement/shim_e2e.go @@ -212,3 +212,27 @@ func RunE2E(ctx context.Context, cl client.Client) error { "took_ms", time.Since(totalStart).Milliseconds()) return nil } + +// e2ePollUntil retries check at short intervals until it returns true or the +// timeout expires. Used to wait for the informer cache to pick up a CRD +// update before asserting via the HTTP API. +func e2ePollUntil(ctx context.Context, timeout time.Duration, check func() (bool, error)) error { + deadline := time.Now().Add(timeout) + for { + ok, err := check() + if err != nil { + return err + } + if ok { + return nil + } + if time.Now().After(deadline) { + return fmt.Errorf("timed out after %s waiting for condition", timeout) + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(500 * time.Millisecond): + } + } +}