From 3578c30db1f15b2839557bb00601a3999a973c02 Mon Sep 17 00:00:00 2001 From: Matt Jenkinson <75292329+mattdjenkinson@users.noreply.github.com> Date: Thu, 14 May 2026 15:45:58 +0100 Subject: [PATCH 1/2] fix: tolerate missing Lease discovery on project control planes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The connector and iroh-dns reconcilers both registered a watch on coordination.k8s.io/v1.Lease. multicluster-runtime engages every source against every cluster at runtime; if any source's REST mapper returns no-match the whole controller/cluster pair is rejected and never reconciles. In staging that's the state of 221 project control planes — they serve Lease at the raw URL but omit the group from discovery, so connector status freezes and the operator burns a 5s retry storm per affected cluster. Add an OptionalKind wrapper that probes the REST mapper at engage time and disengages the watch (shouldEngage=false) when the GVK has no match, letting the rest of the controller carry on. Other mapper errors still propagate so transient discovery failures retry. The Lease watch is wired via the builder's Watches API which always constructs an mcsource.Kind; there's no seam to inject a custom source. Build the controller, then call MultiClusterWatch directly with the optional wrapper. Without the Lease watch, reconciles still need to converge. Extend connectorLeaseReady to set RequeueAfter on every not-ready path so freshness is re-evaluated at the lease cadence, and have the iroh-dns reconcile always requeue at the lease duration for iroh-routed connectors so sibling handover still happens within ≈ one duration when the watch is degraded. 
Fixes #160 --- internal/controller/connector_controller.go | 41 ++++-- internal/controller/iroh_dns_controller.go | 70 +++++++-- internal/controller/source/optional.go | 106 ++++++++++++++ internal/controller/source/optional_test.go | 153 ++++++++++++++++++++ 4 files changed, 345 insertions(+), 25 deletions(-) create mode 100644 internal/controller/source/optional.go create mode 100644 internal/controller/source/optional_test.go diff --git a/internal/controller/connector_controller.go b/internal/controller/connector_controller.go index 683e99f..692f397 100644 --- a/internal/controller/connector_controller.go +++ b/internal/controller/connector_controller.go @@ -27,6 +27,7 @@ import ( networkingv1alpha1 "go.datum.net/network-services-operator/api/v1alpha1" "go.datum.net/network-services-operator/internal/config" + nsosource "go.datum.net/network-services-operator/internal/controller/source" ) // ConnectorReconciler reconciles a Connector object @@ -185,26 +186,34 @@ type connectorLeaseStatus struct { } func (r *ConnectorReconciler) connectorLeaseReady(ctx context.Context, cl client.Client, connector *networkingv1alpha1.Connector) (connectorLeaseStatus, error) { + // Fallback poll interval for the not-ready paths. Used when discovery on + // this project control plane is missing coordination.k8s.io, so the Lease + // watch is no-op'd by nsosource.OptionalKind and only this RequeueAfter + // keeps the Connector converging. Matches the per-Connector lease cadence + // so a freshly-renewed Lease shows up Ready within one duration. 
+ leaseDuration := time.Duration(r.connectorLeaseDurationSeconds()) * time.Second + pollAfter := leaseDuration + leaseJitter(leaseDuration) + if connector.Status.LeaseRef == nil || connector.Status.LeaseRef.Name == "" { - return connectorLeaseStatus{message: "Connector lease has not been created yet."}, nil + return connectorLeaseStatus{message: "Connector lease has not been created yet.", requeueAfter: &pollAfter}, nil } var lease coordinationv1.Lease if err := cl.Get(ctx, client.ObjectKey{Namespace: connector.Namespace, Name: connector.Status.LeaseRef.Name}, &lease); err != nil { if apierrors.IsNotFound(err) { - return connectorLeaseStatus{message: "Connector lease not found. Agent may be offline."}, nil + return connectorLeaseStatus{message: "Connector lease not found. Agent may be offline.", requeueAfter: &pollAfter}, nil } return connectorLeaseStatus{}, fmt.Errorf("failed to load connector lease: %w", err) } if lease.Spec.RenewTime == nil || lease.Spec.LeaseDurationSeconds == nil { - return connectorLeaseStatus{message: "Connector lease has not been renewed yet."}, nil + return connectorLeaseStatus{message: "Connector lease has not been renewed yet.", requeueAfter: &pollAfter}, nil } expiryDuration := time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second expiresAt := lease.Spec.RenewTime.Add(expiryDuration) if time.Now().After(expiresAt) { - return connectorLeaseStatus{message: "Connector lease has expired. Agent may be offline."}, nil + return connectorLeaseStatus{message: "Connector lease has expired. Agent may be offline.", requeueAfter: &pollAfter}, nil } requeueAfter := time.Until(expiresAt) + leaseJitter(expiryDuration) @@ -226,7 +235,7 @@ func leaseJitter(base time.Duration) time.Duration { func (r *ConnectorReconciler) SetupWithManager(mgr mcmanager.Manager) error { r.mgr = mgr - return mcbuilder.ControllerManagedBy(mgr). + c, err := mcbuilder.ControllerManagedBy(mgr). For(&networkingv1alpha1.Connector{}). 
Watches( &networkingv1alpha1.ConnectorClass{}, @@ -263,10 +272,24 @@ func (r *ConnectorReconciler) SetupWithManager(mgr mcmanager.Manager) error { }) }, ). - Watches( + Named("connector"). + Build(r) + if err != nil { + return err + } + + // Lease watch is best-effort: some project control planes omit + // coordination.k8s.io from API discovery even though Lease itself is + // served (see network-services-operator#160). nsosource.OptionalKind + // keeps the per-cluster engagement alive when the GVK is missing from + // discovery; reconcile latency on those clusters falls back to the + // connectorLeaseReady RequeueAfter cadence above. The builder's + // .Watches() always wires mcsource.Kind, so we bypass it for Lease and + // call MultiClusterWatch directly with the optional wrapper. + return c.MultiClusterWatch( + nsosource.OptionalKind( &coordinationv1.Lease{}, mchandler.EnqueueRequestForOwner(&networkingv1alpha1.Connector{}, handler.OnlyControllerOwner()), - ). - Named("connector"). - Complete(r) + ), + ) } diff --git a/internal/controller/iroh_dns_controller.go b/internal/controller/iroh_dns_controller.go index feb11ee..31bdeeb 100644 --- a/internal/controller/iroh_dns_controller.go +++ b/internal/controller/iroh_dns_controller.go @@ -10,6 +10,7 @@ import ( "slices" "strconv" "strings" + "time" coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" @@ -33,6 +34,7 @@ import ( networkingv1alpha1 "go.datum.net/network-services-operator/api/v1alpha1" "go.datum.net/network-services-operator/internal/config" + nsosource "go.datum.net/network-services-operator/internal/controller/source" "go.datum.net/network-services-operator/internal/iroh" ) @@ -123,20 +125,39 @@ func (r *IrohDNSReconciler) Reconcile(ctx context.Context, req mcreconcile.Reque return ctrl.Result{}, nil } + // Periodic requeue as the safety net for sibling handover on project + // control planes where Lease discovery is missing and nsosource.OptionalKind + // has disengaged the Lease 
watch. Owners get redundant polls; siblings get + // their only convergence signal. Bound on handover ≈ one lease duration. + result := ctrl.Result{RequeueAfter: r.requeueInterval()} + desired, ok, err := r.buildDesiredRecordSet(req.ClusterName, &connector) if err != nil { - return ctrl.Result{}, err + return result, err } if !ok { // Status not yet populated by the agent. If we previously claimed // this endpoint id, release so a sibling can take over. if err := r.releaseIfOwner(ctx, &connector); err != nil { - return ctrl.Result{}, err + return result, err } - return ctrl.Result{}, r.setPublishedCondition(ctx, cl, &connector, metav1.ConditionFalse, connectorReasonIrohPending, "Connector status does not yet carry connection details.") + return result, r.setPublishedCondition(ctx, cl, &connector, metav1.ConditionFalse, connectorReasonIrohPending, "Connector status does not yet carry connection details.") } - return ctrl.Result{}, r.applyClaim(ctx, cl, &connector, desired) + return result, r.applyClaim(ctx, cl, &connector, desired) +} + +// requeueInterval is the periodic re-reconcile cadence for connectors whose +// ConnectorClass routes to iroh. Matches the Connector lease duration so +// sibling handover converges within ≈ one duration on clusters where the +// Lease watch is unavailable. +func (r *IrohDNSReconciler) requeueInterval() time.Duration { + d := int32(30) + if r.Config.Connector.LeaseDurationSeconds > 0 { + d = r.Config.Connector.LeaseDurationSeconds + } + base := time.Duration(d) * time.Second + return base + leaseJitter(base) } // applyClaim implements the claim-then-write loop: @@ -387,13 +408,20 @@ func (r *IrohDNSReconciler) setPublishedCondition(ctx context.Context, cl cluste // - Connector (For) and ConnectorClass (Watches) — the primary multicluster // event sources. 
// -// - Lease (Watches with EnqueueRequestForOwner) — agent heartbeats renew the -// Connector's Lease on every interval; that update fires our reconcile -// even when the Connector itself hasn't changed. This is the load-bearing -// trigger for sibling handover: when an owner Connector is deleted and +// - Lease (MultiClusterWatch via nsosource.OptionalKind) — agent heartbeats +// renew the Connector's Lease on every interval; that update fires our +// reconcile even when the Connector itself hasn't changed. When the +// project control plane advertises coordination.k8s.io in discovery this +// watch drives sibling handover: when an owner Connector is deleted and // its DNSRecordSet is GC'd, every sibling's next lease renewal drives its // reconcile, and one of them wins the Create race for the now-empty z32. -// Bound on handover ≈ leaseDurationSeconds. +// +// The watch is best-effort — see network-services-operator#160. Some +// project control planes (PCPs) serve Lease at the raw URL but omit it from discovery, in which +// case OptionalKind disengages this source so the rest of the controller +// still reconciles. On those clusters sibling handover is driven instead +// by the periodic requeueInterval RequeueAfter that Reconcile returns. +// Either way, bound on handover ≈ leaseDurationSeconds. // // - DNSRecordSet on the downstream cluster — drift detection. Mapper // enqueues the *current owner* Connector identified by the labels on the @@ -401,7 +429,8 @@ func (r *IrohDNSReconciler) setPublishedCondition(ctx context.Context, cl cluste // record. Sibling handover does NOT flow through this watch: // multicluster-runtime's manager exposes GetCluster(name) but no // enumeration, so a downstream event can't fan out to siblings across -// project clusters. That's the Lease watch's job. +// project clusters. That's the Lease watch's (and now the periodic +// requeue's) job. 
func (r *IrohDNSReconciler) SetupWithManager(mgr mcmanager.Manager) error { r.mgr = mgr @@ -424,7 +453,7 @@ func (r *IrohDNSReconciler) SetupWithManager(mgr mcmanager.Manager) error { ) downstreamClusterSource, _, _ := downstreamSource.ForCluster("", r.Downstream) - return mcbuilder.ControllerManagedBy(mgr). + c, err := mcbuilder.ControllerManagedBy(mgr). For(&networkingv1alpha1.Connector{}). Watches( &networkingv1alpha1.ConnectorClass{}, @@ -455,11 +484,20 @@ func (r *IrohDNSReconciler) SetupWithManager(mgr mcmanager.Manager) error { }) }, ). - Watches( - &coordinationv1.Lease{}, - mchandler.EnqueueRequestForOwner(&networkingv1alpha1.Connector{}, handler.OnlyControllerOwner()), - ). WatchesRawSource(downstreamClusterSource). Named("iroh-dns"). - Complete(r) + Build(r) + if err != nil { + return err + } + + // Best-effort multi-cluster Lease watch — bypasses the builder so it can + // degrade gracefully on PCPs missing coordination.k8s.io in discovery. + // See connector_controller.go SetupWithManager for the rationale. + return c.MultiClusterWatch( + nsosource.OptionalKind( + &coordinationv1.Lease{}, + mchandler.EnqueueRequestForOwner(&networkingv1alpha1.Connector{}, handler.OnlyControllerOwner()), + ), + ) } diff --git a/internal/controller/source/optional.go b/internal/controller/source/optional.go new file mode 100644 index 0000000..d07190e --- /dev/null +++ b/internal/controller/source/optional.go @@ -0,0 +1,106 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +// Package source provides controller-runtime [source.Source] wrappers tailored +// for multi-cluster reconcilers running against API servers with non-uniform +// discovery. 
+package source + +import ( + "fmt" + + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + + crsource "sigs.k8s.io/controller-runtime/pkg/source" + + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + mchandler "sigs.k8s.io/multicluster-runtime/pkg/handler" + mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" + mcsource "sigs.k8s.io/multicluster-runtime/pkg/source" +) + +// OptionalKind wraps [mcsource.Kind] so that a per-cluster engagement does not +// fail when the watched GVK is missing from that cluster's discovery. The +// underlying multicluster controller engages each source against each cluster +// at runtime; if any source returns an error (or a cache informer for a +// missing kind fails to start), the whole controller/cluster pair is rejected +// and never reconciles. +// +// In practice this happens against Datum project control planes whose API +// servers serve coordination.k8s.io/v1.Lease at the raw URL but omit the group +// from discovery. controller-runtime's REST mapper then returns +// [apimeta.NoKindMatchError], which aborts the engagement and leaves both the +// Lease watch and every other source on that cluster offline. +// +// OptionalKind defers to [mcsource.Kind] for clusters where the GVK is +// discoverable. When [apimeta.IsNoMatchError] is true at engagement time, it +// returns shouldEngage=false so the multicluster controller skips that source +// for that cluster and proceeds with the rest of the engagement. Reconcilers +// that previously relied on this watch must compensate with a periodic +// [ctrl.Result.RequeueAfter] so the object's state still converges. 
+func OptionalKind( + obj client.Object, + handler mchandler.EventHandlerFunc, + predicates ...predicate.Predicate, +) mcsource.Source { + return &optionalKind{ + obj: obj, + inner: mcsource.Kind(obj, handler, predicates...), + predicates: predicates, + } +} + +type optionalKind struct { + obj client.Object + inner mcsource.SyncingSource[client.Object] + predicates []predicate.Predicate +} + +// ForCluster probes the cluster's REST mapper for the wrapped GVK. If the +// mapper has no match, the source disengages from this cluster (engagement +// succeeds, no events) and logs once at V(1). Any other mapper error is +// propagated so the manager retries — only the no-match case is treated as +// "stay offline." +func (o *optionalKind) ForCluster(name string, cl cluster.Cluster) (crsource.TypedSource[mcreconcile.Request], bool, error) { + gvk, err := gvkForObject(o.obj, cl) + if err != nil { + return nil, false, fmt.Errorf("determine GVK for optional kind: %w", err) + } + + if _, err := cl.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil { + if apimeta.IsNoMatchError(err) { + log.Log.WithName("optional-kind").V(1).Info( + "optional watch unavailable on cluster, degrading to no-op", + "cluster", name, + "gvk", gvk.String(), + ) + return nil, false, nil + } + return nil, false, fmt.Errorf("rest mapping for %s on cluster %q: %w", gvk, name, err) + } + + return o.inner.ForCluster(name, cl) +} + +// gvkForObject resolves the object's GVK from the cluster's scheme. The cluster +// scheme is preferred over the manager's so this works correctly for cluster +// adapters that augment the scheme on a per-cluster basis. +func gvkForObject(obj client.Object, cl cluster.Cluster) (schema.GroupVersionKind, error) { + // Honor an explicit GVK if the caller already set one (e.g. via + // PartialObjectMetadata). Otherwise resolve via the scheme. 
+ if gvk := obj.GetObjectKind().GroupVersionKind(); gvk.Kind != "" { + return gvk, nil + } + gvks, _, err := cl.GetScheme().ObjectKinds(obj) + if err != nil { + return schema.GroupVersionKind{}, err + } + if len(gvks) == 0 { + return schema.GroupVersionKind{}, fmt.Errorf("no GVK registered for %T", obj) + } + return gvks[0], nil +} diff --git a/internal/controller/source/optional_test.go b/internal/controller/source/optional_test.go new file mode 100644 index 0000000..404d884 --- /dev/null +++ b/internal/controller/source/optional_test.go @@ -0,0 +1,153 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package source + +import ( + "context" + "errors" + "net/http" + "testing" + + coordinationv1 "k8s.io/api/coordination/v1" + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/api/meta/testrestmapper" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + + crcache "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" + + mchandler "sigs.k8s.io/multicluster-runtime/pkg/handler" +) + +// noopHandler is the simplest valid mchandler.EventHandlerFunc — it never +// actually wires events, which is fine because none of these tests reach the +// Start path. +func noopHandler() mchandler.EventHandlerFunc { + return mchandler.EnqueueRequestForOwner(&coordinationv1.Lease{}) +} + +// fakeCluster is a hand-written stub satisfying cluster.Cluster. It returns +// the configured scheme + REST mapper and panics on anything OptionalKind +// shouldn't touch — guarding against accidental scope creep in the wrapper. 
+type fakeCluster struct { + scheme *runtime.Scheme + mapper apimeta.RESTMapper +} + +func (f *fakeCluster) GetHTTPClient() *http.Client { panic("not used") } +func (f *fakeCluster) GetConfig() *rest.Config { panic("not used") } +func (f *fakeCluster) GetCache() crcache.Cache { panic("not used") } +func (f *fakeCluster) GetScheme() *runtime.Scheme { return f.scheme } +func (f *fakeCluster) GetClient() client.Client { panic("not used") } +func (f *fakeCluster) GetFieldIndexer() client.FieldIndexer { panic("not used") } +func (f *fakeCluster) GetEventRecorderFor(name string) record.EventRecorder { panic("not used") } +func (f *fakeCluster) GetRESTMapper() apimeta.RESTMapper { return f.mapper } +func (f *fakeCluster) GetAPIReader() client.Reader { panic("not used") } +func (f *fakeCluster) Start(_ context.Context) error { panic("not used") } + +var _ cluster.Cluster = (*fakeCluster)(nil) + +// noMatchMapper wraps a real mapper and rewrites RESTMapping on coordinationv1.Lease +// to return apimeta.NoKindMatchError — simulating a PCP that doesn't advertise +// coordination.k8s.io in discovery. +type noMatchMapper struct { + apimeta.RESTMapper + hideGroup string +} + +func (m *noMatchMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*apimeta.RESTMapping, error) { + if gk.Group == m.hideGroup { + return nil, &apimeta.NoKindMatchError{GroupKind: gk, SearchedVersions: versions} + } + return m.RESTMapper.RESTMapping(gk, versions...) 
+} + +func newScheme(t *testing.T) *runtime.Scheme { + t.Helper() + s := runtime.NewScheme() + if err := scheme.AddToScheme(s); err != nil { + t.Fatalf("add to scheme: %v", err) + } + return s +} + +func TestOptionalKind_NoMatchErrorDegradesGracefully(t *testing.T) { + s := newScheme(t) + mapper := &noMatchMapper{ + RESTMapper: testrestmapper.TestOnlyStaticRESTMapper(s), + hideGroup: coordinationv1.SchemeGroupVersion.Group, + } + cl := &fakeCluster{scheme: s, mapper: mapper} + + src := OptionalKind(&coordinationv1.Lease{}, noopHandler()) + + got, shouldEngage, err := src.ForCluster("cluster-a", cl) + if err != nil { + t.Fatalf("ForCluster returned error: %v", err) + } + if shouldEngage { + t.Errorf("shouldEngage=true, want false when REST mapper has no match") + } + if got != nil { + t.Errorf("source = %v, want nil when REST mapper has no match", got) + } +} + +func TestOptionalKind_HealthyMappingDelegates(t *testing.T) { + s := newScheme(t) + cl := &fakeCluster{ + scheme: s, + mapper: testrestmapper.TestOnlyStaticRESTMapper(s), + } + + src := OptionalKind(&coordinationv1.Lease{}, noopHandler()) + + got, shouldEngage, err := src.ForCluster("cluster-a", cl) + if err != nil { + t.Fatalf("ForCluster returned error: %v", err) + } + if !shouldEngage { + t.Errorf("shouldEngage=false, want true when REST mapper is healthy") + } + if got == nil { + t.Errorf("source = nil, want non-nil delegate when REST mapper is healthy") + } +} + +// otherErrorMapper is a mapper whose RESTMapping always returns a generic error that is NOT NoKindMatchError, used +// to verify OptionalKind propagates it instead of silently no-op'ing. 
+type otherErrorMapper struct { + apimeta.RESTMapper + err error +} + +func (m *otherErrorMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*apimeta.RESTMapping, error) { + return nil, m.err +} + +func TestOptionalKind_NonMatchErrorsPropagate(t *testing.T) { + s := newScheme(t) + boom := errors.New("transient discovery failure") + cl := &fakeCluster{ + scheme: s, + mapper: &otherErrorMapper{ + RESTMapper: testrestmapper.TestOnlyStaticRESTMapper(s), + err: boom, + }, + } + + src := OptionalKind(&coordinationv1.Lease{}, noopHandler()) + + _, _, err := src.ForCluster("cluster-a", cl) + if err == nil { + t.Fatalf("expected error to propagate, got nil") + } + if !errors.Is(err, boom) { + t.Errorf("error %v does not wrap %v", err, boom) + } +} From 2bedb7af8871742498e34e442867abbfb89ef9d8 Mon Sep 17 00:00:00 2001 From: Matt Jenkinson <75292329+mattdjenkinson@users.noreply.github.com> Date: Thu, 14 May 2026 16:41:50 +0100 Subject: [PATCH 2/2] fix: bypass REST mapper for connector Lease access MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The optional Lease watch in 3578c30 stops engagement failing on project control planes that omit coordination.k8s.io from API discovery, but the connector reconciler body still hit the same RESTMapper through cl.Get(&coordinationv1.Lease{}, …) and controllerutil.CreateOrUpdate. On every reconcile against an affected cluster, both paths returned no matches for kind "Lease" in version "coordination.k8s.io/v1" which surfaced as Reconciler errors and prevented Connector.Ready from ever transitioning to True even though the agent was renewing the lease at the raw URL. Route Lease access through k8s.io/client-go/kubernetes/typed/coordination/v1. The typed client encodes its own group/version so it does not depend on the cluster's REST mapper. Production lazy-builds one from the cluster's REST config; tests inject the typed fake clientset via a new LeaseClient hook on the reconciler. 
ensureConnectorLease replaces controllerutil.CreateOrUpdate for the same reason — preserves the no-op-when-unchanged behaviour via DeepEqual before issuing an Update so the reconcile cadence doesn't turn into a write loop. --- internal/controller/connector_controller.go | 110 ++++++++++++++---- .../controller/connector_controllers_test.go | 23 +++- 2 files changed, 108 insertions(+), 25 deletions(-) diff --git a/internal/controller/connector_controller.go b/internal/controller/connector_controller.go index 692f397..ec37382 100644 --- a/internal/controller/connector_controller.go +++ b/internal/controller/connector_controller.go @@ -14,6 +14,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/cluster" @@ -34,6 +35,26 @@ import ( type ConnectorReconciler struct { mgr mcmanager.Manager Config config.NetworkServicesOperator + + // LeaseClient builds a typed Lease client for a given project cluster. + // Production leaves this nil; the reconciler falls back to + // coordinationv1client.NewForConfig(cl.GetConfig()). The typed client + // encodes its own group/version, so it does not depend on the cluster's + // REST mapper — that's the whole point of routing Lease access through + // it rather than the controller-runtime client, which would fail with + // `no matches for kind "Lease"` against project control planes that + // omit coordination.k8s.io from discovery (see + // network-services-operator#160). Tests inject a fake clientset's + // CoordinationV1() so they don't need a real REST config. + LeaseClient func(cluster.Cluster) (coordinationv1client.LeasesGetter, error) +} + +// leases returns the typed Lease client for the supplied cluster. 
+func (r *ConnectorReconciler) leases(cl cluster.Cluster) (coordinationv1client.LeasesGetter, error) { + if r.LeaseClient != nil { + return r.LeaseClient(cl) + } + return coordinationv1client.NewForConfig(cl.GetConfig()) } // +kubebuilder:rbac:groups=networking.datumapis.com,resources=connectors,verbs=get;list;watch;create;update;patch;delete @@ -120,31 +141,21 @@ func (r *ConnectorReconciler) Reconcile(ctx context.Context, req mcreconcile.Req return ctrl.Result{}, nil } + leases, err := r.leases(cl) + if err != nil { + return ctrl.Result{}, fmt.Errorf("build lease client: %w", err) + } + leaseDurationSeconds := r.connectorLeaseDurationSeconds() if connector.Status.LeaseRef == nil || connector.Status.LeaseRef.Name == "" { - lease := &coordinationv1.Lease{ - ObjectMeta: metav1.ObjectMeta{ - Name: connector.Name, - Namespace: connector.Namespace, - }, - } - - if _, err := controllerutil.CreateOrUpdate(ctx, cl.GetClient(), lease, func() error { - if err := controllerutil.SetControllerReference(&connector, lease, cl.GetScheme()); err != nil { - return err - } - if lease.Spec.LeaseDurationSeconds == nil || *lease.Spec.LeaseDurationSeconds == 0 { - lease.Spec.LeaseDurationSeconds = &leaseDurationSeconds - } - return nil - }); err != nil { + lease, err := r.ensureConnectorLease(ctx, cl, leases, &connector, leaseDurationSeconds) + if err != nil { return ctrl.Result{}, err } - connector.Status.LeaseRef = &corev1.LocalObjectReference{Name: lease.Name} } - leaseStatus, err := r.connectorLeaseReady(ctx, cl.GetClient(), &connector) + leaseStatus, err := r.connectorLeaseReady(ctx, leases, &connector) if err != nil { return ctrl.Result{}, err } @@ -185,7 +196,64 @@ type connectorLeaseStatus struct { message string } -func (r *ConnectorReconciler) connectorLeaseReady(ctx context.Context, cl client.Client, connector *networkingv1alpha1.Connector) (connectorLeaseStatus, error) { +// ensureConnectorLease creates the per-Connector Lease if absent, or refreshes +// the owner reference 
/ lease duration on an existing one. Goes through the +// typed Lease client to avoid the controller-runtime client's REST mapper, +// which has no mapping for coordination.k8s.io on project control planes that +// hide the group from discovery. +func (r *ConnectorReconciler) ensureConnectorLease( + ctx context.Context, + cl cluster.Cluster, + leases coordinationv1client.LeasesGetter, + connector *networkingv1alpha1.Connector, + leaseDurationSeconds int32, +) (*coordinationv1.Lease, error) { + existing, err := leases.Leases(connector.Namespace).Get(ctx, connector.Name, metav1.GetOptions{}) + switch { + case apierrors.IsNotFound(err): + lease := &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: connector.Name, + Namespace: connector.Namespace, + }, + Spec: coordinationv1.LeaseSpec{ + LeaseDurationSeconds: &leaseDurationSeconds, + }, + } + if err := controllerutil.SetControllerReference(connector, lease, cl.GetScheme()); err != nil { + return nil, fmt.Errorf("set owner ref on connector lease: %w", err) + } + created, err := leases.Leases(connector.Namespace).Create(ctx, lease, metav1.CreateOptions{}) + if apierrors.IsAlreadyExists(err) { + // A racing reconcile (or the agent) created it; re-fetch. 
+ return leases.Leases(connector.Namespace).Get(ctx, connector.Name, metav1.GetOptions{}) + } + if err != nil { + return nil, fmt.Errorf("create connector lease: %w", err) + } + return created, nil + case err != nil: + return nil, fmt.Errorf("load connector lease: %w", err) + } + + mutated := existing.DeepCopy() + if err := controllerutil.SetControllerReference(connector, mutated, cl.GetScheme()); err != nil { + return nil, fmt.Errorf("set owner ref on connector lease: %w", err) + } + if mutated.Spec.LeaseDurationSeconds == nil || *mutated.Spec.LeaseDurationSeconds == 0 { + mutated.Spec.LeaseDurationSeconds = &leaseDurationSeconds + } + if equality.Semantic.DeepEqual(existing, mutated) { + return existing, nil + } + updated, err := leases.Leases(connector.Namespace).Update(ctx, mutated, metav1.UpdateOptions{}) + if err != nil { + return nil, fmt.Errorf("update connector lease: %w", err) + } + return updated, nil +} + +func (r *ConnectorReconciler) connectorLeaseReady(ctx context.Context, leases coordinationv1client.LeasesGetter, connector *networkingv1alpha1.Connector) (connectorLeaseStatus, error) { // Fallback poll interval for the not-ready paths. Used when discovery on // this project control plane is missing coordination.k8s.io, so the Lease // watch is no-op'd by nsosource.OptionalKind and only this RequeueAfter @@ -198,8 +266,8 @@ func (r *ConnectorReconciler) connectorLeaseReady(ctx context.Context, cl client return connectorLeaseStatus{message: "Connector lease has not been created yet.", requeueAfter: &pollAfter}, nil } - var lease coordinationv1.Lease - if err := cl.Get(ctx, client.ObjectKey{Namespace: connector.Namespace, Name: connector.Status.LeaseRef.Name}, &lease); err != nil { + lease, err := leases.Leases(connector.Namespace).Get(ctx, connector.Status.LeaseRef.Name, metav1.GetOptions{}) + if err != nil { if apierrors.IsNotFound(err) { return connectorLeaseStatus{message: "Connector lease not found. 
Agent may be offline.", requeueAfter: &pollAfter}, nil } diff --git a/internal/controller/connector_controllers_test.go b/internal/controller/connector_controllers_test.go index 7e2f2eb..93271bb 100644 --- a/internal/controller/connector_controllers_test.go +++ b/internal/controller/connector_controllers_test.go @@ -10,10 +10,13 @@ import ( apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + kubefake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" + coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -123,13 +126,25 @@ func TestConnectorReconcile(t *testing.T) { if tt.connectorClass != nil { builder = builder.WithObjects(tt.connectorClass) } - if tt.lease != nil { - builder = builder.WithObjects(tt.lease) - } builder = builder.WithStatusSubresource(tt.connector) cl := builder.Build() - reconciler := &ConnectorReconciler{mgr: &fakeMockManager{cl: cl}} + // Lease access goes through the typed coordinationv1 client to + // bypass the controller-runtime client's REST mapper (which on + // some project control planes has no mapping for Lease). Mirror + // that in tests with a typed fake clientset. + var leaseSeed []runtime.Object + if tt.lease != nil { + leaseSeed = append(leaseSeed, tt.lease) + } + leaseCS := kubefake.NewSimpleClientset(leaseSeed...) + + reconciler := &ConnectorReconciler{ + mgr: &fakeMockManager{cl: cl}, + LeaseClient: func(cluster.Cluster) (coordinationv1client.LeasesGetter, error) { + return leaseCS.CoordinationV1(), nil + }, + } req := mcreconcile.Request{ Request: reconcile.Request{ NamespacedName: client.ObjectKeyFromObject(tt.connector),