Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 121 additions & 30 deletions internal/controller/connector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
Expand All @@ -27,12 +28,33 @@ import (

networkingv1alpha1 "go.datum.net/network-services-operator/api/v1alpha1"
"go.datum.net/network-services-operator/internal/config"
nsosource "go.datum.net/network-services-operator/internal/controller/source"
)

// ConnectorReconciler reconciles a Connector object
type ConnectorReconciler struct {
// mgr is the multicluster manager; it is assigned by SetupWithManager.
mgr mcmanager.Manager
// Config is the operator configuration.
// NOTE(review): presumably the source of the connector lease duration —
// confirm against connectorLeaseDurationSeconds.
Config config.NetworkServicesOperator

// LeaseClient builds a typed Lease client for a given project cluster.
// Production leaves this nil; the reconciler falls back to
// coordinationv1client.NewForConfig(cl.GetConfig()). The typed client
// encodes its own group/version, so it does not depend on the cluster's
// REST mapper — that's the whole point of routing Lease access through
// it rather than the controller-runtime client, which would fail with
// `no matches for kind "Lease"` against project control planes that
// omit coordination.k8s.io from discovery (see
// network-services-operator#160). Tests inject a fake clientset's
// CoordinationV1() so they don't need a real REST config.
LeaseClient func(cluster.Cluster) (coordinationv1client.LeasesGetter, error)
}

// leases returns the typed Lease client for the supplied cluster.
//
// When a LeaseClient factory has been injected (tests), it is used as-is.
// Otherwise a typed coordination/v1 client is built from the cluster's REST
// config. On failure the returned getter is a literal nil so callers never
// receive a non-nil interface wrapping a nil client pointer.
func (r *ConnectorReconciler) leases(cl cluster.Cluster) (coordinationv1client.LeasesGetter, error) {
	if r.LeaseClient != nil {
		return r.LeaseClient(cl)
	}

	typed, err := coordinationv1client.NewForConfig(cl.GetConfig())
	if err != nil {
		// Returning NewForConfig's results directly would box the nil
		// *CoordinationV1Client into the interface, making it compare
		// non-nil even though it is unusable.
		return nil, err
	}
	return typed, nil
}

// +kubebuilder:rbac:groups=networking.datumapis.com,resources=connectors,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -119,31 +141,21 @@ func (r *ConnectorReconciler) Reconcile(ctx context.Context, req mcreconcile.Req
return ctrl.Result{}, nil
}

leases, err := r.leases(cl)
if err != nil {
return ctrl.Result{}, fmt.Errorf("build lease client: %w", err)
}

leaseDurationSeconds := r.connectorLeaseDurationSeconds()
if connector.Status.LeaseRef == nil || connector.Status.LeaseRef.Name == "" {
lease := &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: connector.Name,
Namespace: connector.Namespace,
},
}

if _, err := controllerutil.CreateOrUpdate(ctx, cl.GetClient(), lease, func() error {
if err := controllerutil.SetControllerReference(&connector, lease, cl.GetScheme()); err != nil {
return err
}
if lease.Spec.LeaseDurationSeconds == nil || *lease.Spec.LeaseDurationSeconds == 0 {
lease.Spec.LeaseDurationSeconds = &leaseDurationSeconds
}
return nil
}); err != nil {
lease, err := r.ensureConnectorLease(ctx, cl, leases, &connector, leaseDurationSeconds)
if err != nil {
return ctrl.Result{}, err
}

connector.Status.LeaseRef = &corev1.LocalObjectReference{Name: lease.Name}
}

leaseStatus, err := r.connectorLeaseReady(ctx, cl.GetClient(), &connector)
leaseStatus, err := r.connectorLeaseReady(ctx, leases, &connector)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -184,27 +196,92 @@ type connectorLeaseStatus struct {
message string
}

func (r *ConnectorReconciler) connectorLeaseReady(ctx context.Context, cl client.Client, connector *networkingv1alpha1.Connector) (connectorLeaseStatus, error) {
// ensureConnectorLease creates the per-Connector Lease if absent, or refreshes
// the owner reference / lease duration on an existing one. Goes through the
// typed Lease client to avoid the controller-runtime client's REST mapper,
// which has no mapping for coordination.k8s.io on project control planes that
// hide the group from discovery.
//
// The Lease shares the Connector's name and namespace. It returns the Lease as
// stored by the API server (created, updated, or unchanged), or a wrapped
// error describing which step failed.
//
// If the Create loses a race (AlreadyExists), the winner's Lease is re-fetched
// and then put through the same owner-reference / duration refresh as any
// other pre-existing Lease, so a Lease created first by another writer still
// ends up controlled by the Connector.
func (r *ConnectorReconciler) ensureConnectorLease(
	ctx context.Context,
	cl cluster.Cluster,
	leases coordinationv1client.LeasesGetter,
	connector *networkingv1alpha1.Connector,
	leaseDurationSeconds int32,
) (*coordinationv1.Lease, error) {
	leaseAPI := leases.Leases(connector.Namespace)

	existing, err := leaseAPI.Get(ctx, connector.Name, metav1.GetOptions{})
	if apierrors.IsNotFound(err) {
		lease := &coordinationv1.Lease{
			ObjectMeta: metav1.ObjectMeta{
				Name:      connector.Name,
				Namespace: connector.Namespace,
			},
			Spec: coordinationv1.LeaseSpec{
				LeaseDurationSeconds: &leaseDurationSeconds,
			},
		}
		if err := controllerutil.SetControllerReference(connector, lease, cl.GetScheme()); err != nil {
			return nil, fmt.Errorf("set owner ref on connector lease: %w", err)
		}

		created, createErr := leaseAPI.Create(ctx, lease, metav1.CreateOptions{})
		if createErr == nil {
			return created, nil
		}
		if !apierrors.IsAlreadyExists(createErr) {
			return nil, fmt.Errorf("create connector lease: %w", createErr)
		}

		// A racing reconcile (or the agent) created it; re-fetch and fall
		// through so the owner reference and duration are still enforced.
		existing, err = leaseAPI.Get(ctx, connector.Name, metav1.GetOptions{})
	}
	if err != nil {
		return nil, fmt.Errorf("load connector lease: %w", err)
	}

	// Work on a copy so the no-op case can be detected by comparison.
	mutated := existing.DeepCopy()
	if err := controllerutil.SetControllerReference(connector, mutated, cl.GetScheme()); err != nil {
		return nil, fmt.Errorf("set owner ref on connector lease: %w", err)
	}
	// Only fill in the duration when it is unset; a non-zero value already on
	// the Lease is left untouched.
	if mutated.Spec.LeaseDurationSeconds == nil || *mutated.Spec.LeaseDurationSeconds == 0 {
		mutated.Spec.LeaseDurationSeconds = &leaseDurationSeconds
	}
	if equality.Semantic.DeepEqual(existing, mutated) {
		return existing, nil
	}

	updated, err := leaseAPI.Update(ctx, mutated, metav1.UpdateOptions{})
	if err != nil {
		return nil, fmt.Errorf("update connector lease: %w", err)
	}
	return updated, nil
}

func (r *ConnectorReconciler) connectorLeaseReady(ctx context.Context, leases coordinationv1client.LeasesGetter, connector *networkingv1alpha1.Connector) (connectorLeaseStatus, error) {
// Fallback poll interval for the not-ready paths. Used when discovery on
// this project control plane is missing coordination.k8s.io, so the Lease
// watch is no-op'd by nsosource.OptionalKind and only this RequeueAfter
// keeps the Connector converging. Matches the per-Connector lease cadence
// so a freshly-renewed Lease shows up Ready within one duration.
leaseDuration := time.Duration(r.connectorLeaseDurationSeconds()) * time.Second
pollAfter := leaseDuration + leaseJitter(leaseDuration)

if connector.Status.LeaseRef == nil || connector.Status.LeaseRef.Name == "" {
return connectorLeaseStatus{message: "Connector lease has not been created yet."}, nil
return connectorLeaseStatus{message: "Connector lease has not been created yet.", requeueAfter: &pollAfter}, nil
}

var lease coordinationv1.Lease
if err := cl.Get(ctx, client.ObjectKey{Namespace: connector.Namespace, Name: connector.Status.LeaseRef.Name}, &lease); err != nil {
lease, err := leases.Leases(connector.Namespace).Get(ctx, connector.Status.LeaseRef.Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return connectorLeaseStatus{message: "Connector lease not found. Agent may be offline."}, nil
return connectorLeaseStatus{message: "Connector lease not found. Agent may be offline.", requeueAfter: &pollAfter}, nil
}
return connectorLeaseStatus{}, fmt.Errorf("failed to load connector lease: %w", err)
}

if lease.Spec.RenewTime == nil || lease.Spec.LeaseDurationSeconds == nil {
return connectorLeaseStatus{message: "Connector lease has not been renewed yet."}, nil
return connectorLeaseStatus{message: "Connector lease has not been renewed yet.", requeueAfter: &pollAfter}, nil
}

expiryDuration := time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second
expiresAt := lease.Spec.RenewTime.Add(expiryDuration)
if time.Now().After(expiresAt) {
return connectorLeaseStatus{message: "Connector lease has expired. Agent may be offline."}, nil
return connectorLeaseStatus{message: "Connector lease has expired. Agent may be offline.", requeueAfter: &pollAfter}, nil
}

requeueAfter := time.Until(expiresAt) + leaseJitter(expiryDuration)
Expand All @@ -226,7 +303,7 @@ func leaseJitter(base time.Duration) time.Duration {
func (r *ConnectorReconciler) SetupWithManager(mgr mcmanager.Manager) error {
r.mgr = mgr

return mcbuilder.ControllerManagedBy(mgr).
c, err := mcbuilder.ControllerManagedBy(mgr).
For(&networkingv1alpha1.Connector{}).
Watches(
&networkingv1alpha1.ConnectorClass{},
Expand Down Expand Up @@ -263,10 +340,24 @@ func (r *ConnectorReconciler) SetupWithManager(mgr mcmanager.Manager) error {
})
},
).
Watches(
Named("connector").
Build(r)
if err != nil {
return err
}

// Lease watch is best-effort: some project control planes omit
// coordination.k8s.io from API discovery even though Lease itself is
// served (see network-services-operator#160). nsosource.OptionalKind
// keeps the per-cluster engagement alive when the GVK is missing from
// discovery; reconcile latency on those clusters falls back to the
// connectorLeaseReady RequeueAfter cadence above. The builder's
// .Watches() always wires mcsource.Kind, so we bypass it for Lease and
// call MultiClusterWatch directly with the optional wrapper.
return c.MultiClusterWatch(
nsosource.OptionalKind(
&coordinationv1.Lease{},
mchandler.EnqueueRequestForOwner(&networkingv1alpha1.Connector{}, handler.OnlyControllerOwner()),
).
Named("connector").
Complete(r)
),
)
}
23 changes: 19 additions & 4 deletions internal/controller/connector_controllers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kubefake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -123,13 +126,25 @@ func TestConnectorReconcile(t *testing.T) {
if tt.connectorClass != nil {
builder = builder.WithObjects(tt.connectorClass)
}
if tt.lease != nil {
builder = builder.WithObjects(tt.lease)
}
builder = builder.WithStatusSubresource(tt.connector)
cl := builder.Build()

reconciler := &ConnectorReconciler{mgr: &fakeMockManager{cl: cl}}
// Lease access goes through the typed coordinationv1 client to
// bypass the controller-runtime client's REST mapper (which on
// some project control planes has no mapping for Lease). Mirror
// that in tests with a typed fake clientset.
var leaseSeed []runtime.Object
if tt.lease != nil {
leaseSeed = append(leaseSeed, tt.lease)
}
leaseCS := kubefake.NewSimpleClientset(leaseSeed...)

reconciler := &ConnectorReconciler{
mgr: &fakeMockManager{cl: cl},
LeaseClient: func(cluster.Cluster) (coordinationv1client.LeasesGetter, error) {
return leaseCS.CoordinationV1(), nil
},
}
req := mcreconcile.Request{
Request: reconcile.Request{
NamespacedName: client.ObjectKeyFromObject(tt.connector),
Expand Down
70 changes: 54 additions & 16 deletions internal/controller/iroh_dns_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"slices"
"strconv"
"strings"
"time"

coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -33,6 +34,7 @@ import (

networkingv1alpha1 "go.datum.net/network-services-operator/api/v1alpha1"
"go.datum.net/network-services-operator/internal/config"
nsosource "go.datum.net/network-services-operator/internal/controller/source"
"go.datum.net/network-services-operator/internal/iroh"
)

Expand Down Expand Up @@ -123,20 +125,39 @@ func (r *IrohDNSReconciler) Reconcile(ctx context.Context, req mcreconcile.Reque
return ctrl.Result{}, nil
}

// Periodic requeue as the safety net for sibling handover on project
// control planes where Lease discovery is missing and nsosource.OptionalKind
// has disengaged the Lease watch. Owners get redundant polls; siblings get
// their only convergence signal. Bound on handover ≈ one lease duration.
result := ctrl.Result{RequeueAfter: r.requeueInterval()}

desired, ok, err := r.buildDesiredRecordSet(req.ClusterName, &connector)
if err != nil {
return ctrl.Result{}, err
return result, err
}
if !ok {
// Status not yet populated by the agent. If we previously claimed
// this endpoint id, release so a sibling can take over.
if err := r.releaseIfOwner(ctx, &connector); err != nil {
return ctrl.Result{}, err
return result, err
}
return ctrl.Result{}, r.setPublishedCondition(ctx, cl, &connector, metav1.ConditionFalse, connectorReasonIrohPending, "Connector status does not yet carry connection details.")
return result, r.setPublishedCondition(ctx, cl, &connector, metav1.ConditionFalse, connectorReasonIrohPending, "Connector status does not yet carry connection details.")
}

return ctrl.Result{}, r.applyClaim(ctx, cl, &connector, desired)
return result, r.applyClaim(ctx, cl, &connector, desired)
}

// requeueInterval reports how long to wait before re-reconciling a connector
// whose ConnectorClass routes to iroh. The base is the configured Connector
// lease duration (30 seconds when the configured value is not positive), plus
// jitter from leaseJitter, so sibling handover converges within ≈ one lease
// duration on clusters where the Lease watch is unavailable.
func (r *IrohDNSReconciler) requeueInterval() time.Duration {
	seconds := r.Config.Connector.LeaseDurationSeconds
	if seconds <= 0 {
		seconds = 30
	}
	interval := time.Duration(seconds) * time.Second
	return interval + leaseJitter(interval)
}

// applyClaim implements the claim-then-write loop:
Expand Down Expand Up @@ -387,21 +408,29 @@ func (r *IrohDNSReconciler) setPublishedCondition(ctx context.Context, cl cluste
// - Connector (For) and ConnectorClass (Watches) — the primary multicluster
// event sources.
//
// - Lease (Watches with EnqueueRequestForOwner) — agent heartbeats renew the
// Connector's Lease on every interval; that update fires our reconcile
// even when the Connector itself hasn't changed. This is the load-bearing
// trigger for sibling handover: when an owner Connector is deleted and
// - Lease (WatchesRawSource via nsosource.OptionalKind) — agent heartbeats
// renew the Connector's Lease on every interval; that update fires our
// reconcile even when the Connector itself hasn't changed. When the
// project control plane advertises coordination.k8s.io in discovery this
// watch drives sibling handover: when an owner Connector is deleted and
// its DNSRecordSet is GC'd, every sibling's next lease renewal drives its
// reconcile, and one of them wins the Create race for the now-empty z32.
// Bound on handover ≈ leaseDurationSeconds.
//
// The watch is best-effort — see network-services-operator#160. Some
// PCPs serve Lease at the raw URL but omit it from discovery, in which
// case OptionalKind disengages this source so the rest of the controller
// still reconciles. On those clusters sibling handover is driven instead
// by the periodic RequeueAfter (see requeueInterval) set in Reconcile.
// Either way, bound on handover ≈ leaseDurationSeconds.
//
// - DNSRecordSet on the downstream cluster — drift detection. Mapper
// enqueues the *current owner* Connector identified by the labels on the
// DNSRecordSet, catching cases like a manual external delete of the
// record. Sibling handover does NOT flow through this watch:
// multicluster-runtime's manager exposes GetCluster(name) but no
// enumeration, so a downstream event can't fan out to siblings across
// project clusters. That's the Lease watch's job.
// project clusters. That's the Lease watch's (and now the periodic
// requeue's) job.
func (r *IrohDNSReconciler) SetupWithManager(mgr mcmanager.Manager) error {
r.mgr = mgr

Expand All @@ -424,7 +453,7 @@ func (r *IrohDNSReconciler) SetupWithManager(mgr mcmanager.Manager) error {
)
downstreamClusterSource, _, _ := downstreamSource.ForCluster("", r.Downstream)

return mcbuilder.ControllerManagedBy(mgr).
c, err := mcbuilder.ControllerManagedBy(mgr).
For(&networkingv1alpha1.Connector{}).
Watches(
&networkingv1alpha1.ConnectorClass{},
Expand Down Expand Up @@ -455,11 +484,20 @@ func (r *IrohDNSReconciler) SetupWithManager(mgr mcmanager.Manager) error {
})
},
).
Watches(
&coordinationv1.Lease{},
mchandler.EnqueueRequestForOwner(&networkingv1alpha1.Connector{}, handler.OnlyControllerOwner()),
).
WatchesRawSource(downstreamClusterSource).
Named("iroh-dns").
Complete(r)
Build(r)
if err != nil {
return err
}

// Best-effort multi-cluster Lease watch — bypasses the builder so it can
// degrade gracefully on PCPs missing coordination.k8s.io in discovery.
// See connector_controller.go SetupWithManager for the rationale.
return c.MultiClusterWatch(
nsosource.OptionalKind(
&coordinationv1.Lease{},
mchandler.EnqueueRequestForOwner(&networkingv1alpha1.Connector{}, handler.OnlyControllerOwner()),
),
)
}
Loading
Loading