Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ local('kubectl wait --namespace cert-manager --for=condition=available deploymen

########### Dependency CRDs
# Make sure the local cluster is running if you are running into startup issues here.
url = 'https://raw.githubusercontent.com/cobaltcore-dev/openstack-hypervisor-operator/refs/heads/main/charts/openstack-hypervisor-operator/crds/kvm.cloud.sap_hypervisors.yaml'
url = 'https://raw.githubusercontent.com/cobaltcore-dev/openstack-hypervisor-operator/d35f2bc2c5d4fd634b17e7a8dd77ff3025758fbb/charts/openstack-hypervisor-operator/crds/kvm.cloud.sap_hypervisors.yaml'
local('curl -L ' + url + ' | kubectl apply -f -')

########### Cortex Manager & CRDs
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/cobaltcore-dev/cortex
go 1.26.0

require (
github.com/cobaltcore-dev/openstack-hypervisor-operator v1.0.2-0.20260423190401-f34871697a61
github.com/cobaltcore-dev/openstack-hypervisor-operator v1.0.2-0.20260429064011-d35f2bc2c5d4
github.com/go-gorp/gorp v2.2.0+incompatible
github.com/gophercloud/gophercloud/v2 v2.12.0
github.com/ironcore-dev/ironcore v0.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1x
github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cobaltcore-dev/openstack-hypervisor-operator v1.0.2-0.20260423190401-f34871697a61 h1:I0qmFydo/Bibw0JLRypLmLnlZOx5fl4NNPaOiLKUfmU=
github.com/cobaltcore-dev/openstack-hypervisor-operator v1.0.2-0.20260423190401-f34871697a61/go.mod h1:fTJ5LAHj8NJ0AuQtsEX16Z1LXtCKqJfg+UhGfEnwImA=
github.com/cobaltcore-dev/openstack-hypervisor-operator v1.0.2-0.20260429064011-d35f2bc2c5d4 h1:Umm6n7LMDnqqZ6QIMIFxzJmuBX/Bke4uvstm+KFKcaQ=
github.com/cobaltcore-dev/openstack-hypervisor-operator v1.0.2-0.20260429064011-d35f2bc2c5d4/go.mod h1:fTJ5LAHj8NJ0AuQtsEX16Z1LXtCKqJfg+UhGfEnwImA=
github.com/containerd/continuity v0.4.5 h1:ZRoN1sXq9u7V6QoHMcVWGhOwDFqZ4B9i5H6un1Wh0x4=
github.com/containerd/continuity v0.4.5/go.mod h1:/lNJvtJKUQStBzpVQ1+rasXO1LAWtUQssk28EZvJ3nE=
github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI=
Expand Down
261 changes: 245 additions & 16 deletions internal/shim/placement/handle_resource_provider_aggregates.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,271 @@
package placement

import (
"encoding/json"
"net/http"

hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

// resourceProviderAggregatesResponse is the JSON body returned by
// GET /resource_providers/{uuid}/aggregates and
// PUT /resource_providers/{uuid}/aggregates (microversion 1.19+).
//
// https://docs.openstack.org/api-ref/placement/#resource-provider-aggregates
type resourceProviderAggregatesResponse struct {
Aggregates []string `json:"aggregates"`
ResourceProviderGeneration int64 `json:"resource_provider_generation"`
}

// resourceProviderAggregatesRequest is the JSON body expected by
// PUT /resource_providers/{uuid}/aggregates (microversion 1.19+).
type resourceProviderAggregatesRequest struct {
Aggregates []string `json:"aggregates"`
ResourceProviderGeneration int64 `json:"resource_provider_generation"`
}

// HandleListResourceProviderAggregates handles
// GET /resource_providers/{uuid}/aggregates requests.
//
// Returns the list of aggregate UUIDs associated with the resource provider.
// Aggregates model relationships among providers such as shared storage,
// affinity/anti-affinity groups, and availability zones. Returns an empty
// list if the provider has no aggregate associations. Available since
// microversion 1.1.
// list if the provider has no aggregate associations.
//
// Routing: the uuid is used to determine if the resource provider is a KVM
// hypervisor or vmware/ironic hypervisor. Passthrough mode forwards all
// requests to upstream placement. Hybrid mode uses the hypervisor CRD for
// KVM hypervisors and forwards for anything else. CRD-only mode rejects
// any non-KVM calls with 404.
//
// The response format changed at microversion 1.19: earlier versions return
// only a flat array of UUIDs, while 1.19+ returns an object that also
// includes the resource_provider_generation for concurrency tracking. Returns
// 404 if the provider does not exist.
// https://docs.openstack.org/api-ref/placement/#list-resource-provider-aggregates
func (s *Shim) HandleListResourceProviderAggregates(w http.ResponseWriter, r *http.Request) {
if _, ok := requiredUUIDPathParam(w, r, "uuid"); !ok {
uuid, ok := requiredUUIDPathParam(w, r, "uuid")
if !ok {
return
}
s.dispatchPassthroughOnly(w, r, s.config.Features.Aggregates)
switch s.featureModeFromConfOrHeader(r, s.config.Features.Aggregates) {
case FeatureModePassthrough:
s.forward(w, r)
case FeatureModeHybrid:
s.listResourceProviderAggregatesHybrid(w, r, uuid)
case FeatureModeCRD:
s.listResourceProviderAggregatesCRD(w, r, uuid)
default:
http.Error(w, "unknown feature mode", http.StatusInternalServerError)
}
}

// listResourceProviderAggregatesHybrid serves from the CRD if the provider is
// a KVM hypervisor, otherwise forwards to upstream placement.
func (s *Shim) listResourceProviderAggregatesHybrid(w http.ResponseWriter, r *http.Request, uuid string) {
ctx := r.Context()
log := logf.FromContext(ctx)

var hvs hv1.HypervisorList
err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid})
if err != nil || len(hvs.Items) != 1 {
log.Info("resource provider not resolved from kubernetes, forwarding to upstream placement", "uuid", uuid)
s.forward(w, r)
return
}
log.Info("resolved resource provider from CRD, serving aggregates", "uuid", uuid, "hypervisor", hvs.Items[0].Name)
s.writeAggregatesFromCRD(w, &hvs.Items[0])
}

// listResourceProviderAggregatesCRD serves exclusively from the CRD, returning
// 404 if the provider is not a known KVM hypervisor.
func (s *Shim) listResourceProviderAggregatesCRD(w http.ResponseWriter, r *http.Request, uuid string) {
ctx := r.Context()
log := logf.FromContext(ctx)

var hvs hv1.HypervisorList
err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid})
if apierrors.IsNotFound(err) || len(hvs.Items) == 0 {
log.Info("resource provider not found in kubernetes (crd mode)", "uuid", uuid)
http.Error(w, "resource provider not found", http.StatusNotFound)
return
}
if err != nil {
log.Error(err, "failed to list hypervisors with OpenStack ID index")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
if len(hvs.Items) > 1 {
log.Error(nil, "multiple hypervisors found with the same OpenStack ID", "uuid", uuid)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
log.Info("serving aggregates from CRD", "uuid", uuid, "hypervisor", hvs.Items[0].Name)
s.writeAggregatesFromCRD(w, &hvs.Items[0])
}

func (s *Shim) writeAggregatesFromCRD(w http.ResponseWriter, hv *hv1.Hypervisor) {
aggGroups := hv1.GetAggregates(hv.Spec.Groups)
aggregates := make([]string, 0, len(aggGroups))
for _, ag := range aggGroups {
aggregates = append(aggregates, ag.UUID)
}
s.writeJSON(w, http.StatusOK, resourceProviderAggregatesResponse{
Aggregates: aggregates,
ResourceProviderGeneration: hv.Generation,
})
}

// HandleUpdateResourceProviderAggregates handles
// PUT /resource_providers/{uuid}/aggregates requests.
//
// Replaces the complete set of aggregate associations for a resource provider.
// Any aggregate UUIDs that do not yet exist are created automatically. The
// request format changed at microversion 1.19: earlier versions accept a
// plain array of UUIDs, while 1.19+ expects an object containing an
// aggregates array and a resource_provider_generation for optimistic
// concurrency control. Returns 409 Conflict if the generation does not match
// (1.19+). Returns 200 with the updated aggregate list on success.
// The request body must include an aggregates array and a
// resource_provider_generation for optimistic concurrency control. Returns
// 409 Conflict if the generation does not match. Returns 200 with the
// updated aggregate list on success.
//
// Routing: same selective per-provider dispatch as GET.
//
// https://docs.openstack.org/api-ref/placement/#update-resource-provider-aggregates
func (s *Shim) HandleUpdateResourceProviderAggregates(w http.ResponseWriter, r *http.Request) {
if _, ok := requiredUUIDPathParam(w, r, "uuid"); !ok {
uuid, ok := requiredUUIDPathParam(w, r, "uuid")
if !ok {
return
}
switch s.featureModeFromConfOrHeader(r, s.config.Features.Aggregates) {
case FeatureModePassthrough:
s.forward(w, r)
case FeatureModeHybrid:
s.updateResourceProviderAggregatesHybrid(w, r, uuid)
case FeatureModeCRD:
s.updateResourceProviderAggregatesCRD(w, r, uuid)
default:
http.Error(w, "unknown feature mode", http.StatusInternalServerError)
}
}

// updateResourceProviderAggregatesHybrid updates aggregates via the CRD if the
// provider is a KVM hypervisor, otherwise forwards to upstream placement.
func (s *Shim) updateResourceProviderAggregatesHybrid(w http.ResponseWriter, r *http.Request, uuid string) {
ctx := r.Context()
log := logf.FromContext(ctx)

var hvs hv1.HypervisorList
err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid})
if err != nil || len(hvs.Items) != 1 {
log.Info("resource provider not resolved from kubernetes, forwarding to upstream placement", "uuid", uuid)
s.forward(w, r)
return
}
s.dispatchPassthroughOnly(w, r, s.config.Features.Aggregates)
hv := &hvs.Items[0]
log.Info("resolved resource provider from CRD, updating aggregates", "uuid", uuid, "hypervisor", hv.Name)

var req resourceProviderAggregatesRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "malformed request body", http.StatusBadRequest)
return
}
if req.ResourceProviderGeneration != hv.Generation {
log.Info("generation mismatch on aggregate update",
"expected", req.ResourceProviderGeneration, "actual", hv.Generation)
http.Error(w, "resource provider generation conflict", http.StatusConflict)
return
}

var newGroups []hv1.Group
for i := range hv.Spec.Groups {
if hv.Spec.Groups[i].Aggregate == nil {
newGroups = append(newGroups, hv.Spec.Groups[i])
}
}
for _, aggUUID := range req.Aggregates {
newGroups = append(newGroups, hv1.Group{
Aggregate: &hv1.AggregateGroup{Name: aggUUID, UUID: aggUUID},
})
}
hv.Spec.Groups = newGroups

if err := s.Update(ctx, hv); err != nil {
if apierrors.IsConflict(err) {
http.Error(w, "resource provider generation conflict", http.StatusConflict)
return
}
log.Error(err, "failed to update hypervisor aggregates")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}

log.Info("successfully updated aggregates via CRD", "uuid", uuid, "aggregateCount", len(req.Aggregates))
s.writeJSON(w, http.StatusOK, resourceProviderAggregatesResponse{
Aggregates: req.Aggregates,
ResourceProviderGeneration: hv.Generation,
})
}

// updateResourceProviderAggregatesCRD updates aggregates exclusively via the
// CRD, returning 404 if the provider is not a known KVM hypervisor.
func (s *Shim) updateResourceProviderAggregatesCRD(w http.ResponseWriter, r *http.Request, uuid string) {
ctx := r.Context()
log := logf.FromContext(ctx)

var hvs hv1.HypervisorList
err := s.List(ctx, &hvs, client.MatchingFields{idxHypervisorOpenStackId: uuid})
if apierrors.IsNotFound(err) || len(hvs.Items) == 0 {
log.Info("resource provider not found in kubernetes (crd mode)", "uuid", uuid)
http.Error(w, "resource provider not found", http.StatusNotFound)
return
}
if err != nil {
log.Error(err, "failed to list hypervisors with OpenStack ID index")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
if len(hvs.Items) > 1 {
log.Error(nil, "multiple hypervisors found with the same OpenStack ID", "uuid", uuid)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
hv := &hvs.Items[0]
log.Info("updating aggregates via CRD", "uuid", uuid, "hypervisor", hv.Name)

var req resourceProviderAggregatesRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "malformed request body", http.StatusBadRequest)
return
}
if req.ResourceProviderGeneration != hv.Generation {
log.Info("generation mismatch on aggregate update",
"expected", req.ResourceProviderGeneration, "actual", hv.Generation)
http.Error(w, "resource provider generation conflict", http.StatusConflict)
return
}

var newGroups []hv1.Group
for i := range hv.Spec.Groups {
if hv.Spec.Groups[i].Aggregate == nil {
newGroups = append(newGroups, hv.Spec.Groups[i])
}
}
for _, aggUUID := range req.Aggregates {
newGroups = append(newGroups, hv1.Group{
Aggregate: &hv1.AggregateGroup{Name: aggUUID, UUID: aggUUID},
})
}
hv.Spec.Groups = newGroups

if err := s.Update(ctx, hv); err != nil {
if apierrors.IsConflict(err) {
http.Error(w, "resource provider generation conflict", http.StatusConflict)
return
}
log.Error(err, "failed to update hypervisor aggregates")
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}

log.Info("successfully updated aggregates via CRD", "uuid", uuid, "aggregateCount", len(req.Aggregates))
s.writeJSON(w, http.StatusOK, resourceProviderAggregatesResponse{
Aggregates: req.Aggregates,
ResourceProviderGeneration: hv.Generation,
})
}
Loading
Loading