Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 40 additions & 7 deletions controllers/disruption_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,10 +1212,24 @@ func (r *DisruptionReconciler) recordEventOnTarget(ctx context.Context, instance

// SetupWithManager setups the current reconciler with the given manager
func (r *DisruptionReconciler) SetupWithManager(mgr ctrl.Manager, kubeInformerFactory kubeinformers.SharedInformerFactory) (controller.Controller, error) {
podToDisruption := func(ctx context.Context, d *corev1.Pod) []reconcile.Request {
if kubeInformerFactory == nil {
return nil, fmt.Errorf("kubernetes informer factory cannot be nil")
}

podToDisruption := func(ctx context.Context, o client.Object) []reconcile.Request {
d, ok := o.(*corev1.Pod)
if !ok {
return nil
}

// podtoDisruption is a function that maps pods to disruptions. it is meant to be used as an event handler on a pod informer
// this function should safely return an empty list of requests to reconcile if the object we receive is not actually a chaos pod
// which we determine by checking the object labels for the name and namespace labels that we add to all injector pods
requests := mapPodToDisruptionRequests(d)
if len(requests) == 0 {
return nil
}

if r.BaseLog != nil {
r.BaseLog.Debugw("watching event from pod", tagutil.ChaosPodNameKey, d.GetName(), tagutil.ChaosPodNamespaceKey, d.GetNamespace())
}
Expand All @@ -1225,21 +1239,40 @@ func (r *DisruptionReconciler) SetupWithManager(mgr ctrl.Manager, kubeInformerFa
tagutil.FormatTag(tagutil.PodNamespaceKey, d.GetNamespace()),
}))

podLabels := d.GetLabels()
name := podLabels[chaostypes.DisruptionNameLabel]
namespace := podLabels[chaostypes.DisruptionNamespaceLabel]
return requests
}

return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: name, Namespace: namespace}}}
podInformerSource := &source.Informer{
Informer: kubeInformerFactory.Core().V1().Pods().Informer(),
Handler: handler.EnqueueRequestsFromMapFunc(podToDisruption),
Predicates: []predicate.Predicate{
chaosEventsPredicate(),
},
}

return ctrl.NewControllerManagedBy(mgr).
For(&chaosv1beta1.Disruption{}).
WithOptions(controller.Options{RateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](time.Second, time.Hour)}).
WatchesRawSource(source.Kind(mgr.GetCache(), &corev1.Pod{}, handler.TypedEnqueueRequestsFromMapFunc(podToDisruption))).
WithEventFilter(chaosEventsPredicate()).
WatchesRawSource(podInformerSource).
Build(r)
}

func mapPodToDisruptionRequests(pod *corev1.Pod) []reconcile.Request {
if pod == nil {
return nil
}

podLabels := pod.GetLabels()
name := podLabels[chaostypes.DisruptionNameLabel]
namespace := podLabels[chaostypes.DisruptionNamespaceLabel]

if name == "" || namespace == "" {
return nil
}

return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: name, Namespace: namespace}}}
}

// chaosEventsPredicate determines if given event is a chaos related one or not
func chaosEventsPredicate() predicate.Predicate {
return predicate.Funcs{
Expand Down
129 changes: 129 additions & 0 deletions controllers/disruption_controller_pod_mapping_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026 Datadog, Inc.

package controllers

import (
"testing"

chaosv1beta1 "github.com/DataDog/chaos-controller/api/v1beta1"
chaostypes "github.com/DataDog/chaos-controller/types"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func TestMapPodToDisruptionRequests(t *testing.T) {
t.Parallel()

testCases := []struct {
name string
pod *corev1.Pod
expected []reconcile.Request
}{
{
name: "returns empty list for nil pod",
pod: nil,
expected: nil,
},
{
name: "returns empty list for pod without disruption labels",
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "target",
Namespace: "target-namespace",
},
},
expected: nil,
},
{
name: "returns empty list for pod missing disruption namespace label",
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "chaos-pod",
Namespace: "chaos-engineering",
Labels: map[string]string{
chaostypes.DisruptionNameLabel: "disruption-a",
},
},
},
expected: nil,
},
{
name: "maps pod with disruption labels to reconcile request",
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "chaos-pod",
Namespace: "chaos-engineering",
Labels: map[string]string{
chaostypes.DisruptionNameLabel: "disruption-a",
chaostypes.DisruptionNamespaceLabel: "ns-a",
},
},
},
expected: []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Name: "disruption-a",
Namespace: "ns-a",
},
},
},
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
t.Parallel()

result := mapPodToDisruptionRequests(testCase.pod)

if len(result) != len(testCase.expected) {
t.Fatalf("unexpected number of requests: got %d, expected %d", len(result), len(testCase.expected))
}

for i := range testCase.expected {
if result[i] != testCase.expected[i] {
t.Fatalf("unexpected request at index %d: got %#v, expected %#v", i, result[i], testCase.expected[i])
}
}
})
}
}

func TestShouldTriggerReconcile(t *testing.T) {
t.Parallel()

t.Run("returns true for disruption objects", func(t *testing.T) {
t.Parallel()
if !shouldTriggerReconcile(&chaosv1beta1.Disruption{}) {
t.Fatal("expected disruption object to trigger reconcile")
}
})

t.Run("returns false for pod without disruption labels", func(t *testing.T) {
t.Parallel()
if shouldTriggerReconcile(&corev1.Pod{}) {
t.Fatal("expected pod without disruption labels to not trigger reconcile")
}
})

t.Run("returns true for pod with disruption labels", func(t *testing.T) {
t.Parallel()
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
chaostypes.DisruptionNameLabel: "disruption-a",
chaostypes.DisruptionNamespaceLabel: "ns-a",
},
},
}

if !shouldTriggerReconcile(pod) {
t.Fatal("expected pod with disruption labels to trigger reconcile")
}
})
}