diff --git a/internal/controller/connector_controller.go b/internal/controller/connector_controller.go index 683e99f..ec37382 100644 --- a/internal/controller/connector_controller.go +++ b/internal/controller/connector_controller.go @@ -14,6 +14,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/cluster" @@ -27,12 +28,33 @@ import ( networkingv1alpha1 "go.datum.net/network-services-operator/api/v1alpha1" "go.datum.net/network-services-operator/internal/config" + nsosource "go.datum.net/network-services-operator/internal/controller/source" ) // ConnectorReconciler reconciles a Connector object type ConnectorReconciler struct { mgr mcmanager.Manager Config config.NetworkServicesOperator + + // LeaseClient builds a typed Lease client for a given project cluster. + // Production leaves this nil; the reconciler falls back to + // coordinationv1client.NewForConfig(cl.GetConfig()). The typed client + // encodes its own group/version, so it does not depend on the cluster's + // REST mapper — that's the whole point of routing Lease access through + // it rather than the controller-runtime client, which would fail with + // `no matches for kind "Lease"` against project control planes that + // omit coordination.k8s.io from discovery (see + // network-services-operator#160). Tests inject a fake clientset's + // CoordinationV1() so they don't need a real REST config. + LeaseClient func(cluster.Cluster) (coordinationv1client.LeasesGetter, error) +} + +// leases returns the typed Lease client for the supplied cluster. 
+func (r *ConnectorReconciler) leases(cl cluster.Cluster) (coordinationv1client.LeasesGetter, error) { + if r.LeaseClient != nil { + return r.LeaseClient(cl) + } + return coordinationv1client.NewForConfig(cl.GetConfig()) } // +kubebuilder:rbac:groups=networking.datumapis.com,resources=connectors,verbs=get;list;watch;create;update;patch;delete @@ -119,31 +141,21 @@ func (r *ConnectorReconciler) Reconcile(ctx context.Context, req mcreconcile.Req return ctrl.Result{}, nil } + leases, err := r.leases(cl) + if err != nil { + return ctrl.Result{}, fmt.Errorf("build lease client: %w", err) + } + leaseDurationSeconds := r.connectorLeaseDurationSeconds() if connector.Status.LeaseRef == nil || connector.Status.LeaseRef.Name == "" { - lease := &coordinationv1.Lease{ - ObjectMeta: metav1.ObjectMeta{ - Name: connector.Name, - Namespace: connector.Namespace, - }, - } - - if _, err := controllerutil.CreateOrUpdate(ctx, cl.GetClient(), lease, func() error { - if err := controllerutil.SetControllerReference(&connector, lease, cl.GetScheme()); err != nil { - return err - } - if lease.Spec.LeaseDurationSeconds == nil || *lease.Spec.LeaseDurationSeconds == 0 { - lease.Spec.LeaseDurationSeconds = &leaseDurationSeconds - } - return nil - }); err != nil { + lease, err := r.ensureConnectorLease(ctx, cl, leases, &connector, leaseDurationSeconds) + if err != nil { return ctrl.Result{}, err } - connector.Status.LeaseRef = &corev1.LocalObjectReference{Name: lease.Name} } - leaseStatus, err := r.connectorLeaseReady(ctx, cl.GetClient(), &connector) + leaseStatus, err := r.connectorLeaseReady(ctx, leases, &connector) if err != nil { return ctrl.Result{}, err } @@ -184,27 +196,92 @@ type connectorLeaseStatus struct { message string } -func (r *ConnectorReconciler) connectorLeaseReady(ctx context.Context, cl client.Client, connector *networkingv1alpha1.Connector) (connectorLeaseStatus, error) { +// ensureConnectorLease creates the per-Connector Lease if absent, or refreshes +// the owner reference 
/ lease duration on an existing one. Goes through the +// typed Lease client to avoid the controller-runtime client's REST mapper, +// which has no mapping for coordination.k8s.io on project control planes that +// hide the group from discovery. +func (r *ConnectorReconciler) ensureConnectorLease( + ctx context.Context, + cl cluster.Cluster, + leases coordinationv1client.LeasesGetter, + connector *networkingv1alpha1.Connector, + leaseDurationSeconds int32, +) (*coordinationv1.Lease, error) { + existing, err := leases.Leases(connector.Namespace).Get(ctx, connector.Name, metav1.GetOptions{}) + switch { + case apierrors.IsNotFound(err): + lease := &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: connector.Name, + Namespace: connector.Namespace, + }, + Spec: coordinationv1.LeaseSpec{ + LeaseDurationSeconds: &leaseDurationSeconds, + }, + } + if err := controllerutil.SetControllerReference(connector, lease, cl.GetScheme()); err != nil { + return nil, fmt.Errorf("set owner ref on connector lease: %w", err) + } + created, err := leases.Leases(connector.Namespace).Create(ctx, lease, metav1.CreateOptions{}) + if apierrors.IsAlreadyExists(err) { + // A racing reconcile (or the agent) created it; re-fetch. 
+ return leases.Leases(connector.Namespace).Get(ctx, connector.Name, metav1.GetOptions{}) + } + if err != nil { + return nil, fmt.Errorf("create connector lease: %w", err) + } + return created, nil + case err != nil: + return nil, fmt.Errorf("load connector lease: %w", err) + } + + mutated := existing.DeepCopy() + if err := controllerutil.SetControllerReference(connector, mutated, cl.GetScheme()); err != nil { + return nil, fmt.Errorf("set owner ref on connector lease: %w", err) + } + if mutated.Spec.LeaseDurationSeconds == nil || *mutated.Spec.LeaseDurationSeconds == 0 { + mutated.Spec.LeaseDurationSeconds = &leaseDurationSeconds + } + if equality.Semantic.DeepEqual(existing, mutated) { + return existing, nil + } + updated, err := leases.Leases(connector.Namespace).Update(ctx, mutated, metav1.UpdateOptions{}) + if err != nil { + return nil, fmt.Errorf("update connector lease: %w", err) + } + return updated, nil +} + +func (r *ConnectorReconciler) connectorLeaseReady(ctx context.Context, leases coordinationv1client.LeasesGetter, connector *networkingv1alpha1.Connector) (connectorLeaseStatus, error) { + // Fallback poll interval for the not-ready paths. Used when discovery on + // this project control plane is missing coordination.k8s.io, so the Lease + // watch is no-op'd by nsosource.OptionalKind and only this RequeueAfter + // keeps the Connector converging. Matches the per-Connector lease cadence + // so a freshly-renewed Lease shows up Ready within one duration. 
+ leaseDuration := time.Duration(r.connectorLeaseDurationSeconds()) * time.Second + pollAfter := leaseDuration + leaseJitter(leaseDuration) + if connector.Status.LeaseRef == nil || connector.Status.LeaseRef.Name == "" { - return connectorLeaseStatus{message: "Connector lease has not been created yet."}, nil + return connectorLeaseStatus{message: "Connector lease has not been created yet.", requeueAfter: &pollAfter}, nil } - var lease coordinationv1.Lease - if err := cl.Get(ctx, client.ObjectKey{Namespace: connector.Namespace, Name: connector.Status.LeaseRef.Name}, &lease); err != nil { + lease, err := leases.Leases(connector.Namespace).Get(ctx, connector.Status.LeaseRef.Name, metav1.GetOptions{}) + if err != nil { if apierrors.IsNotFound(err) { - return connectorLeaseStatus{message: "Connector lease not found. Agent may be offline."}, nil + return connectorLeaseStatus{message: "Connector lease not found. Agent may be offline.", requeueAfter: &pollAfter}, nil } return connectorLeaseStatus{}, fmt.Errorf("failed to load connector lease: %w", err) } if lease.Spec.RenewTime == nil || lease.Spec.LeaseDurationSeconds == nil { - return connectorLeaseStatus{message: "Connector lease has not been renewed yet."}, nil + return connectorLeaseStatus{message: "Connector lease has not been renewed yet.", requeueAfter: &pollAfter}, nil } expiryDuration := time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second expiresAt := lease.Spec.RenewTime.Add(expiryDuration) if time.Now().After(expiresAt) { - return connectorLeaseStatus{message: "Connector lease has expired. Agent may be offline."}, nil + return connectorLeaseStatus{message: "Connector lease has expired. 
Agent may be offline.", requeueAfter: &pollAfter}, nil } requeueAfter := time.Until(expiresAt) + leaseJitter(expiryDuration) @@ -226,7 +303,7 @@ func leaseJitter(base time.Duration) time.Duration { func (r *ConnectorReconciler) SetupWithManager(mgr mcmanager.Manager) error { r.mgr = mgr - return mcbuilder.ControllerManagedBy(mgr). + c, err := mcbuilder.ControllerManagedBy(mgr). For(&networkingv1alpha1.Connector{}). Watches( &networkingv1alpha1.ConnectorClass{}, @@ -263,10 +340,24 @@ func (r *ConnectorReconciler) SetupWithManager(mgr mcmanager.Manager) error { }) }, ). - Watches( + Named("connector"). + Build(r) + if err != nil { + return err + } + + // Lease watch is best-effort: some project control planes omit + // coordination.k8s.io from API discovery even though Lease itself is + // served (see network-services-operator#160). nsosource.OptionalKind + // keeps the per-cluster engagement alive when the GVK is missing from + // discovery; reconcile latency on those clusters falls back to the + // connectorLeaseReady RequeueAfter cadence above. The builder's + // .Watches() always wires mcsource.Kind, so we bypass it for Lease and + // call MultiClusterWatch directly with the optional wrapper. + return c.MultiClusterWatch( + nsosource.OptionalKind( &coordinationv1.Lease{}, mchandler.EnqueueRequestForOwner(&networkingv1alpha1.Connector{}, handler.OnlyControllerOwner()), - ). - Named("connector"). 
- Complete(r) + ), + ) } diff --git a/internal/controller/connector_controllers_test.go b/internal/controller/connector_controllers_test.go index 7e2f2eb..93271bb 100644 --- a/internal/controller/connector_controllers_test.go +++ b/internal/controller/connector_controllers_test.go @@ -10,10 +10,13 @@ import ( apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + kubefake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" + coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -123,13 +126,25 @@ func TestConnectorReconcile(t *testing.T) { if tt.connectorClass != nil { builder = builder.WithObjects(tt.connectorClass) } - if tt.lease != nil { - builder = builder.WithObjects(tt.lease) - } builder = builder.WithStatusSubresource(tt.connector) cl := builder.Build() - reconciler := &ConnectorReconciler{mgr: &fakeMockManager{cl: cl}} + // Lease access goes through the typed coordinationv1 client to + // bypass the controller-runtime client's REST mapper (which on + // some project control planes has no mapping for Lease). Mirror + // that in tests with a typed fake clientset. + var leaseSeed []runtime.Object + if tt.lease != nil { + leaseSeed = append(leaseSeed, tt.lease) + } + leaseCS := kubefake.NewSimpleClientset(leaseSeed...) 
+ + reconciler := &ConnectorReconciler{ + mgr: &fakeMockManager{cl: cl}, + LeaseClient: func(cluster.Cluster) (coordinationv1client.LeasesGetter, error) { + return leaseCS.CoordinationV1(), nil + }, + } req := mcreconcile.Request{ Request: reconcile.Request{ NamespacedName: client.ObjectKeyFromObject(tt.connector), diff --git a/internal/controller/iroh_dns_controller.go b/internal/controller/iroh_dns_controller.go index feb11ee..31bdeeb 100644 --- a/internal/controller/iroh_dns_controller.go +++ b/internal/controller/iroh_dns_controller.go @@ -10,6 +10,7 @@ import ( "slices" "strconv" "strings" + "time" coordinationv1 "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" @@ -33,6 +34,7 @@ import ( networkingv1alpha1 "go.datum.net/network-services-operator/api/v1alpha1" "go.datum.net/network-services-operator/internal/config" + nsosource "go.datum.net/network-services-operator/internal/controller/source" "go.datum.net/network-services-operator/internal/iroh" ) @@ -123,20 +125,39 @@ func (r *IrohDNSReconciler) Reconcile(ctx context.Context, req mcreconcile.Reque return ctrl.Result{}, nil } + // Periodic requeue as the safety net for sibling handover on project + // control planes where Lease discovery is missing and nsosource.OptionalKind + // has disengaged the Lease watch. Owners get redundant polls; siblings get + // their only convergence signal. Bound on handover ≈ one lease duration. + result := ctrl.Result{RequeueAfter: r.requeueInterval()} + desired, ok, err := r.buildDesiredRecordSet(req.ClusterName, &connector) if err != nil { - return ctrl.Result{}, err + return result, err } if !ok { // Status not yet populated by the agent. If we previously claimed // this endpoint id, release so a sibling can take over. 
if err := r.releaseIfOwner(ctx, &connector); err != nil { - return ctrl.Result{}, err + return result, err } - return ctrl.Result{}, r.setPublishedCondition(ctx, cl, &connector, metav1.ConditionFalse, connectorReasonIrohPending, "Connector status does not yet carry connection details.") + return result, r.setPublishedCondition(ctx, cl, &connector, metav1.ConditionFalse, connectorReasonIrohPending, "Connector status does not yet carry connection details.") } - return ctrl.Result{}, r.applyClaim(ctx, cl, &connector, desired) + return result, r.applyClaim(ctx, cl, &connector, desired) +} + +// requeueInterval is the periodic re-reconcile cadence for connectors whose +// ConnectorClass routes to iroh. Matches the Connector lease duration so +// sibling handover converges within ≈ one duration on clusters where the +// Lease watch is unavailable. +func (r *IrohDNSReconciler) requeueInterval() time.Duration { + d := int32(30) + if r.Config.Connector.LeaseDurationSeconds > 0 { + d = r.Config.Connector.LeaseDurationSeconds + } + base := time.Duration(d) * time.Second + return base + leaseJitter(base) } // applyClaim implements the claim-then-write loop: @@ -387,13 +408,20 @@ func (r *IrohDNSReconciler) setPublishedCondition(ctx context.Context, cl cluste // - Connector (For) and ConnectorClass (Watches) — the primary multicluster // event sources. // -// - Lease (Watches with EnqueueRequestForOwner) — agent heartbeats renew the -// Connector's Lease on every interval; that update fires our reconcile -// even when the Connector itself hasn't changed. This is the load-bearing -// trigger for sibling handover: when an owner Connector is deleted and +// - Lease (WatchesRawSource via nsosource.OptionalKind) — agent heartbeats +// renew the Connector's Lease on every interval; that update fires our +// reconcile even when the Connector itself hasn't changed. 
When the +// project control plane advertises coordination.k8s.io in discovery this +// watch drives sibling handover: when an owner Connector is deleted and // its DNSRecordSet is GC'd, every sibling's next lease renewal drives its // reconcile, and one of them wins the Create race for the now-empty z32. -// Bound on handover ≈ leaseDurationSeconds. +// +// The watch is best-effort — see network-services-operator#160. Some +// PCPs serve Lease at the raw URL but omit it from discovery, in which +// case OptionalKind disengages this source so the rest of the controller +// still reconciles. On those clusters sibling handover is driven instead +// by the requeueInterval() RequeueAfter that Reconcile returns. +// Either way, bound on handover ≈ leaseDurationSeconds. // // - DNSRecordSet on the downstream cluster — drift detection. Mapper // enqueues the *current owner* Connector identified by the labels on the @@ -401,7 +429,8 @@ func (r *IrohDNSReconciler) setPublishedCondition(ctx context.Context, cl cluste // record. Sibling handover does NOT flow through this watch: // multicluster-runtime's manager exposes GetCluster(name) but no // enumeration, so a downstream event can't fan out to siblings across -// project clusters. That's the Lease watch's job. +// project clusters. That's the Lease watch's (and now the periodic +// requeue's) job. func (r *IrohDNSReconciler) SetupWithManager(mgr mcmanager.Manager) error { r.mgr = mgr @@ -424,7 +453,7 @@ func (r *IrohDNSReconciler) SetupWithManager(mgr mcmanager.Manager) error { ) downstreamClusterSource, _, _ := downstreamSource.ForCluster("", r.Downstream) - return mcbuilder.ControllerManagedBy(mgr). + c, err := mcbuilder.ControllerManagedBy(mgr). For(&networkingv1alpha1.Connector{}). Watches( &networkingv1alpha1.ConnectorClass{}, @@ -455,11 +484,20 @@ func (r *IrohDNSReconciler) SetupWithManager(mgr mcmanager.Manager) error { }) }, ). 
- Watches( - &coordinationv1.Lease{}, - mchandler.EnqueueRequestForOwner(&networkingv1alpha1.Connector{}, handler.OnlyControllerOwner()), - ). WatchesRawSource(downstreamClusterSource). Named("iroh-dns"). - Complete(r) + Build(r) + if err != nil { + return err + } + + // Best-effort multi-cluster Lease watch — bypasses the builder so it can + // degrade gracefully on PCPs missing coordination.k8s.io in discovery. + // See connector_controller.go SetupWithManager for the rationale. + return c.MultiClusterWatch( + nsosource.OptionalKind( + &coordinationv1.Lease{}, + mchandler.EnqueueRequestForOwner(&networkingv1alpha1.Connector{}, handler.OnlyControllerOwner()), + ), + ) } diff --git a/internal/controller/source/optional.go b/internal/controller/source/optional.go new file mode 100644 index 0000000..d07190e --- /dev/null +++ b/internal/controller/source/optional.go @@ -0,0 +1,106 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +// Package source provides controller-runtime [source.Source] wrappers tailored +// for multi-cluster reconcilers running against API servers with non-uniform +// discovery. +package source + +import ( + "fmt" + + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + + crsource "sigs.k8s.io/controller-runtime/pkg/source" + + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + mchandler "sigs.k8s.io/multicluster-runtime/pkg/handler" + mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" + mcsource "sigs.k8s.io/multicluster-runtime/pkg/source" +) + +// OptionalKind wraps [mcsource.Kind] so that a per-cluster engagement does not +// fail when the watched GVK is missing from that cluster's discovery. 
The +// underlying multicluster controller engages each source against each cluster +// at runtime; if any source returns an error (or a cache informer for a +// missing kind fails to start), the whole controller/cluster pair is rejected +// and never reconciles. +// +// In practice this happens against Datum project control planes whose API +// servers serve coordination.k8s.io/v1.Lease at the raw URL but omit the group +// from discovery. controller-runtime's REST mapper then returns +// [apimeta.NoKindMatchError], which aborts the engagement and leaves both the +// Lease watch and every other source on that cluster offline. +// +// OptionalKind defers to [mcsource.Kind] for clusters where the GVK is +// discoverable. When [apimeta.IsNoMatchError] is true at engagement time, it +// returns shouldEngage=false so the multicluster controller skips that source +// for that cluster and proceeds with the rest of the engagement. Reconcilers +// that previously relied on this watch must compensate with a periodic +// [ctrl.Result.RequeueAfter] so the object's state still converges. +func OptionalKind( + obj client.Object, + handler mchandler.EventHandlerFunc, + predicates ...predicate.Predicate, +) mcsource.Source { + return &optionalKind{ + obj: obj, + inner: mcsource.Kind(obj, handler, predicates...), + predicates: predicates, + } +} + +type optionalKind struct { + obj client.Object + inner mcsource.SyncingSource[client.Object] + predicates []predicate.Predicate +} + +// ForCluster probes the cluster's REST mapper for the wrapped GVK. If the +// mapper has no match, the source disengages from this cluster (engagement +// succeeds, no events) and logs once at V(1). Any other mapper error is +// propagated so the manager retries — only the no-match case is treated as +// "stay offline." 
+func (o *optionalKind) ForCluster(name string, cl cluster.Cluster) (crsource.TypedSource[mcreconcile.Request], bool, error) { + gvk, err := gvkForObject(o.obj, cl) + if err != nil { + return nil, false, fmt.Errorf("determine GVK for optional kind: %w", err) + } + + if _, err := cl.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil { + if apimeta.IsNoMatchError(err) { + log.Log.WithName("optional-kind").V(1).Info( + "optional watch unavailable on cluster, degrading to no-op", + "cluster", name, + "gvk", gvk.String(), + ) + return nil, false, nil + } + return nil, false, fmt.Errorf("rest mapping for %s on cluster %q: %w", gvk, name, err) + } + + return o.inner.ForCluster(name, cl) +} + +// gvkForObject resolves the object's GVK from the cluster's scheme. The cluster +// scheme is preferred over the manager's so this works correctly for cluster +// adapters that augment the scheme on a per-cluster basis. +func gvkForObject(obj client.Object, cl cluster.Cluster) (schema.GroupVersionKind, error) { + // Honor an explicit GVK if the caller already set one (e.g. via + // PartialObjectMetadata). Otherwise resolve via the scheme. 
+ if gvk := obj.GetObjectKind().GroupVersionKind(); gvk.Kind != "" { + return gvk, nil + } + gvks, _, err := cl.GetScheme().ObjectKinds(obj) + if err != nil { + return schema.GroupVersionKind{}, err + } + if len(gvks) == 0 { + return schema.GroupVersionKind{}, fmt.Errorf("no GVK registered for %T", obj) + } + return gvks[0], nil +} diff --git a/internal/controller/source/optional_test.go b/internal/controller/source/optional_test.go new file mode 100644 index 0000000..404d884 --- /dev/null +++ b/internal/controller/source/optional_test.go @@ -0,0 +1,153 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package source + +import ( + "context" + "errors" + "net/http" + "testing" + + coordinationv1 "k8s.io/api/coordination/v1" + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/api/meta/testrestmapper" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + + crcache "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" + + mchandler "sigs.k8s.io/multicluster-runtime/pkg/handler" +) + +// noopHandler is the simplest valid mchandler.EventHandlerFunc — it never +// actually wires events, which is fine because none of these tests reach the +// Start path. +func noopHandler() mchandler.EventHandlerFunc { + return mchandler.EnqueueRequestForOwner(&coordinationv1.Lease{}) +} + +// fakeCluster is a hand-written stub satisfying cluster.Cluster. It returns +// the configured scheme + REST mapper and panics on anything OptionalKind +// shouldn't touch — guarding against accidental scope creep in the wrapper. 
+type fakeCluster struct { + scheme *runtime.Scheme + mapper apimeta.RESTMapper +} + +func (f *fakeCluster) GetHTTPClient() *http.Client { panic("not used") } +func (f *fakeCluster) GetConfig() *rest.Config { panic("not used") } +func (f *fakeCluster) GetCache() crcache.Cache { panic("not used") } +func (f *fakeCluster) GetScheme() *runtime.Scheme { return f.scheme } +func (f *fakeCluster) GetClient() client.Client { panic("not used") } +func (f *fakeCluster) GetFieldIndexer() client.FieldIndexer { panic("not used") } +func (f *fakeCluster) GetEventRecorderFor(name string) record.EventRecorder { panic("not used") } +func (f *fakeCluster) GetRESTMapper() apimeta.RESTMapper { return f.mapper } +func (f *fakeCluster) GetAPIReader() client.Reader { panic("not used") } +func (f *fakeCluster) Start(_ context.Context) error { panic("not used") } + +var _ cluster.Cluster = (*fakeCluster)(nil) + +// noMatchMapper wraps a real mapper and rewrites RESTMapping on coordinationv1.Lease +// to return apimeta.NoKindMatchError — simulating a PCP that doesn't advertise +// coordination.k8s.io in discovery. +type noMatchMapper struct { + apimeta.RESTMapper + hideGroup string +} + +func (m *noMatchMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*apimeta.RESTMapping, error) { + if gk.Group == m.hideGroup { + return nil, &apimeta.NoKindMatchError{GroupKind: gk, SearchedVersions: versions} + } + return m.RESTMapper.RESTMapping(gk, versions...) 
+} + +func newScheme(t *testing.T) *runtime.Scheme { + t.Helper() + s := runtime.NewScheme() + if err := scheme.AddToScheme(s); err != nil { + t.Fatalf("add to scheme: %v", err) + } + return s +} + +func TestOptionalKind_NoMatchErrorDegradesGracefully(t *testing.T) { + s := newScheme(t) + mapper := &noMatchMapper{ + RESTMapper: testrestmapper.TestOnlyStaticRESTMapper(s), + hideGroup: coordinationv1.SchemeGroupVersion.Group, + } + cl := &fakeCluster{scheme: s, mapper: mapper} + + src := OptionalKind(&coordinationv1.Lease{}, noopHandler()) + + got, shouldEngage, err := src.ForCluster("cluster-a", cl) + if err != nil { + t.Fatalf("ForCluster returned error: %v", err) + } + if shouldEngage { + t.Errorf("shouldEngage=true, want false when REST mapper has no match") + } + if got != nil { + t.Errorf("source = %v, want nil when REST mapper has no match", got) + } +} + +func TestOptionalKind_HealthyMappingDelegates(t *testing.T) { + s := newScheme(t) + cl := &fakeCluster{ + scheme: s, + mapper: testrestmapper.TestOnlyStaticRESTMapper(s), + } + + src := OptionalKind(&coordinationv1.Lease{}, noopHandler()) + + got, shouldEngage, err := src.ForCluster("cluster-a", cl) + if err != nil { + t.Fatalf("ForCluster returned error: %v", err) + } + if !shouldEngage { + t.Errorf("shouldEngage=false, want true when REST mapper is healthy") + } + if got == nil { + t.Errorf("source = nil, want non-nil delegate when REST mapper is healthy") + } +} + +// otherErrorMapper returns its configured err (one that is NOT NoKindMatchError) from every RESTMapping call, used +// to verify OptionalKind propagates it instead of silently no-op'ing. 
+type otherErrorMapper struct { + apimeta.RESTMapper + err error +} + +func (m *otherErrorMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*apimeta.RESTMapping, error) { + return nil, m.err +} + +func TestOptionalKind_NonMatchErrorsPropagate(t *testing.T) { + s := newScheme(t) + boom := errors.New("transient discovery failure") + cl := &fakeCluster{ + scheme: s, + mapper: &otherErrorMapper{ + RESTMapper: testrestmapper.TestOnlyStaticRESTMapper(s), + err: boom, + }, + } + + src := OptionalKind(&coordinationv1.Lease{}, noopHandler()) + + _, _, err := src.ForCluster("cluster-a", cl) + if err == nil { + t.Fatalf("expected error to propagate, got nil") + } + if !errors.Is(err, boom) { + t.Errorf("error %v does not wrap %v", err, boom) + } +}