diff --git a/controllers/disruption_controller.go b/controllers/disruption_controller.go index a0c90d54d..9781fc01f 100644 --- a/controllers/disruption_controller.go +++ b/controllers/disruption_controller.go @@ -1212,10 +1212,24 @@ func (r *DisruptionReconciler) recordEventOnTarget(ctx context.Context, instance // SetupWithManager setups the current reconciler with the given manager func (r *DisruptionReconciler) SetupWithManager(mgr ctrl.Manager, kubeInformerFactory kubeinformers.SharedInformerFactory) (controller.Controller, error) { - podToDisruption := func(ctx context.Context, d *corev1.Pod) []reconcile.Request { + if kubeInformerFactory == nil { + return nil, fmt.Errorf("kubernetes informer factory cannot be nil") + } + + podToDisruption := func(ctx context.Context, o client.Object) []reconcile.Request { + d, ok := o.(*corev1.Pod) + if !ok { + return nil + } + // podtoDisruption is a function that maps pods to disruptions. it is meant to be used as an event handler on a pod informer // this function should safely return an empty list of requests to reconcile if the object we receive is not actually a chaos pod // which we determine by checking the object labels for the name and namespace labels that we add to all injector pods + requests := mapPodToDisruptionRequests(d) + if len(requests) == 0 { + return nil + } + if r.BaseLog != nil { r.BaseLog.Debugw("watching event from pod", tagutil.ChaosPodNameKey, d.GetName(), tagutil.ChaosPodNamespaceKey, d.GetNamespace()) } @@ -1225,21 +1239,40 @@ func (r *DisruptionReconciler) SetupWithManager(mgr ctrl.Manager, kubeInformerFa tagutil.FormatTag(tagutil.PodNamespaceKey, d.GetNamespace()), })) - podLabels := d.GetLabels() - name := podLabels[chaostypes.DisruptionNameLabel] - namespace := podLabels[chaostypes.DisruptionNamespaceLabel] + return requests + } - return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: name, Namespace: namespace}}} + podInformerSource := &source.Informer{ + Informer: kubeInformerFactory.Core().V1().Pods().Informer(), + Handler: handler.EnqueueRequestsFromMapFunc(podToDisruption), + Predicates: []predicate.Predicate{ + chaosEventsPredicate(), + }, } return ctrl.NewControllerManagedBy(mgr). For(&chaosv1beta1.Disruption{}). WithOptions(controller.Options{RateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](time.Second, time.Hour)}). - WatchesRawSource(source.Kind(mgr.GetCache(), &corev1.Pod{}, handler.TypedEnqueueRequestsFromMapFunc(podToDisruption))). - WithEventFilter(chaosEventsPredicate()). + WatchesRawSource(podInformerSource). Build(r) } +func mapPodToDisruptionRequests(pod *corev1.Pod) []reconcile.Request { + if pod == nil { + return nil + } + + podLabels := pod.GetLabels() + name := podLabels[chaostypes.DisruptionNameLabel] + namespace := podLabels[chaostypes.DisruptionNamespaceLabel] + + if name == "" || namespace == "" { + return nil + } + + return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: name, Namespace: namespace}}} +} + // chaosEventsPredicate determines if given event is a chaos related one or not func chaosEventsPredicate() predicate.Predicate { return predicate.Funcs{ diff --git a/controllers/disruption_controller_pod_mapping_test.go b/controllers/disruption_controller_pod_mapping_test.go new file mode 100644 index 000000000..88c5550ba --- /dev/null +++ b/controllers/disruption_controller_pod_mapping_test.go @@ -0,0 +1,129 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026 Datadog, Inc. + +package controllers + +import ( + "testing" + + chaosv1beta1 "github.com/DataDog/chaos-controller/api/v1beta1" + chaostypes "github.com/DataDog/chaos-controller/types" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +func TestMapPodToDisruptionRequests(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + pod *corev1.Pod + expected []reconcile.Request + }{ + { + name: "returns empty list for nil pod", + pod: nil, + expected: nil, + }, + { + name: "returns empty list for pod without disruption labels", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "target", + Namespace: "target-namespace", + }, + }, + expected: nil, + }, + { + name: "returns empty list for pod missing disruption namespace label", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "chaos-pod", + Namespace: "chaos-engineering", + Labels: map[string]string{ + chaostypes.DisruptionNameLabel: "disruption-a", + }, + }, + }, + expected: nil, + }, + { + name: "maps pod with disruption labels to reconcile request", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "chaos-pod", + Namespace: "chaos-engineering", + Labels: map[string]string{ + chaostypes.DisruptionNameLabel: "disruption-a", + chaostypes.DisruptionNamespaceLabel: "ns-a", + }, + }, + }, + expected: []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Name: "disruption-a", + Namespace: "ns-a", + }, + }, + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + t.Parallel() + + result := mapPodToDisruptionRequests(testCase.pod) + + if len(result) != len(testCase.expected) { + t.Fatalf("unexpected number of requests: got %d, expected %d", len(result), len(testCase.expected)) + } + + for i := range testCase.expected { + if result[i] != testCase.expected[i] { + t.Fatalf("unexpected request at index %d: got %#v, expected %#v", i, result[i], testCase.expected[i]) + } + } + }) + } +} + +func TestShouldTriggerReconcile(t *testing.T) { + t.Parallel() + + t.Run("returns true for disruption objects", func(t *testing.T) { + t.Parallel() + if !shouldTriggerReconcile(&chaosv1beta1.Disruption{}) { + t.Fatal("expected disruption object to trigger reconcile") + } + }) + + t.Run("returns false for pod without disruption labels", func(t *testing.T) { + t.Parallel() + if shouldTriggerReconcile(&corev1.Pod{}) { + t.Fatal("expected pod without disruption labels to not trigger reconcile") + } + }) + + t.Run("returns true for pod with disruption labels", func(t *testing.T) { + t.Parallel() + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + chaostypes.DisruptionNameLabel: "disruption-a", + chaostypes.DisruptionNamespaceLabel: "ns-a", + }, + }, + } + + if !shouldTriggerReconcile(pod) { + t.Fatal("expected pod with disruption labels to trigger reconcile") + } + }) +}