From 5964f7bca0089fe81ca1c550052f2df09b24f251 Mon Sep 17 00:00:00 2001 From: Adrian Fernandez de la Torre Date: Sat, 7 Mar 2026 13:11:35 +0100 Subject: [PATCH] Migrate to new Kubernetes events API Replace deprecated record.EventRecorder with events.EventRecorder from k8s.io/client-go/tools/events. Update all controllers, event server, and tests to use the new API signature. Signed-off-by: Adrian Fernandez de la Torre --- internal/controller/alert_controller.go | 6 +++--- internal/controller/provider_controller.go | 4 ++-- internal/controller/receiver_controller.go | 10 +++++----- .../controller/receiver_controller_test.go | 4 ++-- internal/controller/suite_test.go | 6 +++--- internal/server/event_handlers.go | 16 +++++++-------- internal/server/event_handlers_test.go | 20 +++++++++---------- internal/server/event_server.go | 6 +++--- internal/server/event_server_test.go | 4 ++-- main.go | 8 ++++---- 10 files changed, 42 insertions(+), 42 deletions(-) diff --git a/internal/controller/alert_controller.go b/internal/controller/alert_controller.go index 360dd8923..be70a4a52 100644 --- a/internal/controller/alert_controller.go +++ b/internal/controller/alert_controller.go @@ -20,7 +20,7 @@ import ( "context" corev1 "k8s.io/api/core/v1" - kuberecorder "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -37,7 +37,7 @@ import ( // AlertReconciler reconciles an Alert object to migrate it to static Alert. type AlertReconciler struct { client.Client - kuberecorder.EventRecorder + events.EventRecorder ControllerName string } @@ -88,7 +88,7 @@ func (r *AlertReconciler) Reconcile(ctx context.Context, req ctrl.Request) (resu controllerutil.RemoveFinalizer(obj, apiv1.NotificationFinalizer) log.Info("removed finalizer from Alert to migrate to static Alert") - r.Event(obj, corev1.EventTypeNormal, "Migration", "removed finalizer from Alert to migrate to static Alert") + r.Eventf(obj, nil, corev1.EventTypeNormal, "Migration", "", "removed finalizer from Alert to migrate to static Alert") return } diff --git a/internal/controller/provider_controller.go b/internal/controller/provider_controller.go index 31a6b1682..46b48acca 100644 --- a/internal/controller/provider_controller.go +++ b/internal/controller/provider_controller.go @@ -19,7 +19,7 @@ package controller import ( "context" - kuberecorder "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -42,7 +42,7 @@ import ( // Provider. type ProviderReconciler struct { client.Client - kuberecorder.EventRecorder + events.EventRecorder TokenCache *cache.TokenCache } diff --git a/internal/controller/receiver_controller.go b/internal/controller/receiver_controller.go index 67a410f27..4d4133a88 100644 --- a/internal/controller/receiver_controller.go +++ b/internal/controller/receiver_controller.go @@ -25,7 +25,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" kerrors "k8s.io/apimachinery/pkg/util/errors" - kuberecorder "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -50,7 +50,7 @@ import ( type ReceiverReconciler struct { client.Client helper.Metrics - kuberecorder.EventRecorder + events.EventRecorder ControllerName string } @@ -166,14 +166,14 @@ func (r *ReceiverReconciler) Reconcile(ctx context.Context, req ctrl.Request) (r // Emit warning event if the reconciliation failed. if retErr != nil { - r.Event(obj, corev1.EventTypeWarning, meta.FailedReason, retErr.Error()) + r.Eventf(obj, nil, corev1.EventTypeWarning, meta.FailedReason, "", retErr.Error()) } // Log and emit success event. if retErr == nil && conditions.IsReady(obj) { msg := fmt.Sprintf("Reconciliation finished, next run in %s", obj.GetInterval().String()) log.Info(msg) - r.Event(obj, corev1.EventTypeNormal, meta.SucceededReason, msg) + r.Eventf(obj, nil, corev1.EventTypeNormal, meta.SucceededReason, "", msg) } }() @@ -215,7 +215,7 @@ func (r *ReceiverReconciler) reconcile(ctx context.Context, obj *apiv1.Receiver) conditions.MarkStalled(obj, meta.InvalidCELExpressionReason, "%s", errMsg) obj.Status.ObservedGeneration = obj.Generation log.Error(err, msg) - r.Event(obj, corev1.EventTypeWarning, meta.InvalidCELExpressionReason, errMsg) + r.Eventf(obj, nil, corev1.EventTypeWarning, meta.InvalidCELExpressionReason, "", errMsg) return ctrl.Result{}, nil } } diff --git a/internal/controller/receiver_controller_test.go b/internal/controller/receiver_controller_test.go index 8d12e8bde..a1faee06a 100644 --- a/internal/controller/receiver_controller_test.go +++ b/internal/controller/receiver_controller_test.go @@ -31,7 +31,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -75,7 +75,7 @@ func TestReceiverReconciler_deleteBeforeFinalizer(t *testing.T) { r := &ReceiverReconciler{ Client: k8sClient, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32), } // NOTE: Only a real API server responds with an error in this scenario. _, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: client.ObjectKeyFromObject(receiver)}) diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go index 45df74d07..f70d02eb8 100644 --- a/internal/controller/suite_test.go +++ b/internal/controller/suite_test.go @@ -77,14 +77,14 @@ func TestMain(m *testing.M) { if err := (&AlertReconciler{ Client: testEnv, ControllerName: controllerName, - EventRecorder: testEnv.GetEventRecorderFor(controllerName), + EventRecorder: testEnv.GetEventRecorder(controllerName), }).SetupWithManager(testEnv); err != nil { panic(fmt.Sprintf("Failed to start AlertReconciler: %v", err)) } if err := (&ProviderReconciler{ Client: testEnv, - EventRecorder: testEnv.GetEventRecorderFor(controllerName), + EventRecorder: testEnv.GetEventRecorder(controllerName), }).SetupWithManager(testEnv); err != nil { panic(fmt.Sprintf("Failed to start ProviderReconciler: %v", err)) } @@ -93,7 +93,7 @@ func TestMain(m *testing.M) { Client: testEnv, Metrics: testMetricsH, ControllerName: controllerName, - EventRecorder: testEnv.GetEventRecorderFor(controllerName), + EventRecorder: testEnv.GetEventRecorder(controllerName), }).SetupWithManager(testEnv, ReceiverReconcilerOptions{ RateLimiter: controller.GetDefaultRateLimiter(), WatchConfigsPredicate: predicate.Not(predicate.Funcs{}), diff --git a/internal/server/event_handlers.go b/internal/server/event_handlers.go index 4738db29a..c26df5e97 100644 --- a/internal/server/event_handlers.go +++ b/internal/server/event_handlers.go @@ -94,7 +94,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request) dropped, err := s.dispatchNotification(ctx, event, alert) if err != nil { alertLogger.Error(err, "failed to dispatch notification") - s.Eventf(alert, corev1.EventTypeWarning, "NotificationDispatchFailed", + s.Eventf(alert, nil, corev1.EventTypeWarning, "NotificationDispatchFailed", "", "failed to dispatch notification for %s: %s", involvedObjectString(event.InvolvedObject), err) continue } @@ -208,8 +208,8 @@ func (s *EventServer) messageIsIncluded(ctx context.Context, msg string, alert * } } else { log.FromContext(ctx).Error(err, fmt.Sprintf("failed to compile inclusion regex: %s", exp)) - s.Eventf(alert, corev1.EventTypeWarning, - "InvalidConfig", "failed to compile inclusion regex: %s", exp) + s.Eventf(alert, nil, corev1.EventTypeWarning, + "InvalidConfig", "", "failed to compile inclusion regex: %s", exp) } } return false @@ -229,7 +229,7 @@ func (s *EventServer) messageIsExcluded(ctx context.Context, msg string, alert * } } else { log.FromContext(ctx).Error(err, fmt.Sprintf("failed to compile exclusion regex: %s", exp)) - s.Eventf(alert, corev1.EventTypeWarning, "InvalidConfig", + s.Eventf(alert, nil, corev1.EventTypeWarning, "InvalidConfig", "", "failed to compile exclusion regex: %s", exp) } } @@ -263,7 +263,7 @@ func (s *EventServer) dispatchNotification(ctx context.Context, err = errors.New(maskedErrStr) } log.FromContext(ctx).Error(err, "failed to send notification") - s.Eventf(alert, corev1.EventTypeWarning, "NotificationDispatchFailed", + s.Eventf(alert, nil, corev1.EventTypeWarning, "NotificationDispatchFailed", "", "failed to send notification for %s: %s", involvedObjectString(e.InvolvedObject), err) } }(params.sender, *params.event) @@ -607,7 +607,7 @@ func (s *EventServer) eventMatchesAlertSource(ctx context.Context, event *eventv Name: event.InvolvedObject.Name, }, &obj); err != nil { logger.Error(err, "error getting the involved object") - s.Eventf(alert, corev1.EventTypeWarning, "SourceFetchFailed", + s.Eventf(alert, nil, corev1.EventTypeWarning, "SourceFetchFailed", "", "error getting source object %s", involvedObjectString(event.InvolvedObject)) return false } @@ -617,7 +617,7 @@ func (s *EventServer) eventMatchesAlertSource(ctx context.Context, event *eventv }) if err != nil { logger.Error(err, fmt.Sprintf("error using matchLabels from event source %s", crossNSObjectRefString(source))) - s.Eventf(alert, corev1.EventTypeWarning, "InvalidConfig", + s.Eventf(alert, nil, corev1.EventTypeWarning, "InvalidConfig", "", "error using matchLabels from event source %s", crossNSObjectRefString(source)) return false } @@ -704,7 +704,7 @@ func (s *EventServer) combineEventMetadata(ctx context.Context, event *eventv1.E const msg = "metadata key conflicts detected (please refer to the Alert API docs and Flux RFC 0008 for more information)" slices.SortFunc(conflictingKeys, func(a, b *keyConflict) int { return strings.Compare(a.Key, b.Key) }) l.Info("warning: "+msg, "conflictingKeys", conflictingKeys) - s.AnnotatedEventf(alert, conflictEventAnnotations, corev1.EventTypeWarning, "MetadataAppendFailed", "%s", msg) + s.Eventf(alert, nil, corev1.EventTypeWarning, "MetadataAppendFailed", "", "%s", msg) } if len(metadata) > 0 { diff --git a/internal/server/event_handlers_test.go b/internal/server/event_handlers_test.go index 1ee207b93..94f53079c 100644 --- a/internal/server/event_handlers_test.go +++ b/internal/server/event_handlers_test.go @@ -38,7 +38,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/log" @@ -291,7 +291,7 @@ func TestFilterAlertsForEvent(t *testing.T) { eventServer := EventServer{ kubeClient: builder.Build(), logger: log.Log, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32), } result := eventServer.filterAlertsForEvent(context.TODO(), alerts, testEvent) @@ -375,7 +375,7 @@ func TestDispatchNotification(t *testing.T) { eventServer := EventServer{ kubeClient: builder.Build(), logger: log.Log, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32), } _, err := eventServer.dispatchNotification(context.TODO(), testEvent, alert) @@ -557,7 +557,7 @@ func TestGetNotificationParams(t *testing.T) { kubeClient: builder.Build(), logger: log.Log, noCrossNamespaceRefs: tt.noCrossNSRefs, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32), } params, dropped, err := eventServer.getNotificationParams(context.TODO(), event, alert) @@ -1343,7 +1343,7 @@ func TestEventMatchesAlert(t *testing.T) { eventServer := EventServer{ kubeClient: builder.Build(), logger: log.Log, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: events.NewFakeRecorder(32), } alert := &apiv1beta3.Alert{ ObjectMeta: metav1.ObjectMeta{ @@ -1415,7 +1415,7 @@ func TestCombineEventMetadata(t *testing.T) { expectedMetadata: map[string]string{ "summary": "alertSummary", }, - conflictEvent: "Warning MetadataAppendFailed metadata key conflicts detected (please refer to the Alert API docs and Flux RFC 0008 for more information) map[summary:involved object annotations, Alert object .spec.summary]", + conflictEvent: "Warning MetadataAppendFailed metadata key conflicts detected (please refer to the Alert API docs and Flux RFC 0008 for more information)", }, "alert event metadata is overriden by summary": { event: eventv1.Event{}, @@ -1430,7 +1430,7 @@ func TestCombineEventMetadata(t *testing.T) { expectedMetadata: map[string]string{ "summary": "alertSummary", }, - conflictEvent: "Warning MetadataAppendFailed metadata key conflicts detected (please refer to the Alert API docs and Flux RFC 0008 for more information) map[summary:Alert object .spec.eventMetadata, Alert object .spec.summary]", + conflictEvent: "Warning MetadataAppendFailed metadata key conflicts detected (please refer to the Alert API docs and Flux RFC 0008 for more information)", }, "summary is overriden by controller metadata": { event: eventv1.Event{ @@ -1446,7 +1446,7 @@ func TestCombineEventMetadata(t *testing.T) { expectedMetadata: map[string]string{ "summary": "controllerSummary", }, - conflictEvent: "Warning MetadataAppendFailed metadata key conflicts detected (please refer to the Alert API docs and Flux RFC 0008 for more information) map[summary:Alert object .spec.summary, involved object controller metadata]", + conflictEvent: "Warning MetadataAppendFailed metadata key conflicts detected (please refer to the Alert API docs and Flux RFC 0008 for more information)", }, "precedence order in RFC 0008 is honered": { event: eventv1.Event{ @@ -1476,13 +1476,13 @@ func TestCombineEventMetadata(t *testing.T) { "alertMetadataOverridenByController": "controllerMetadataValue2", "controllerMetadata": "controllerMetadataValue3", }, - conflictEvent: "Warning MetadataAppendFailed metadata key conflicts detected (please refer to the Alert API docs and Flux RFC 0008 for more information) map[alertMetadataOverridenByController:Alert object .spec.eventMetadata, involved object controller metadata objectMetadataOverridenByAlert:involved object annotations, Alert object .spec.eventMetadata objectMetadataOverridenByController:involved object annotations, involved object controller metadata]", + conflictEvent: "Warning MetadataAppendFailed metadata key conflicts detected (please refer to the Alert API docs and Flux RFC 0008 for more information)", }, } { t.Run(name, func(t *testing.T) { g := NewGomegaWithT(t) - eventRecorder := record.NewFakeRecorder(1) + eventRecorder := events.NewFakeRecorder(1) s := &EventServer{ logger: log.Log, EventRecorder: eventRecorder, diff --git a/internal/server/event_server.go b/internal/server/event_server.go index 18a02b3b7..fc0d986fc 100644 --- a/internal/server/event_server.go +++ b/internal/server/event_server.go @@ -33,7 +33,7 @@ import ( "github.com/sethvargo/go-limiter/httplimit" "github.com/slok/go-http-metrics/middleware" "github.com/slok/go-http-metrics/middleware/std" - kuberecorder "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" @@ -55,12 +55,12 @@ type EventServer struct { noCrossNamespaceRefs bool exportHTTPPathMetrics bool tokenCache *cache.TokenCache - kuberecorder.EventRecorder + events.EventRecorder } // NewEventServer returns an HTTP server that handles events func NewEventServer(port string, logger logr.Logger, kubeClient client.Client, - eventRecorder kuberecorder.EventRecorder, noCrossNamespaceRefs bool, + eventRecorder events.EventRecorder, noCrossNamespaceRefs bool, exportHTTPPathMetrics bool, tokenCache *cache.TokenCache) *EventServer { return &EventServer{ port: port, diff --git a/internal/server/event_server_test.go b/internal/server/event_server_test.go index 25d1e060c..22ee09224 100644 --- a/internal/server/event_server_test.go +++ b/internal/server/event_server_test.go @@ -40,7 +40,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" yamlutil "k8s.io/apimachinery/pkg/util/yaml" - "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" log "sigs.k8s.io/controller-runtime/pkg/log" @@ -137,7 +137,7 @@ func TestEventServer(t *testing.T) { t.Fatalf("failed to create memory storage") } eventServer := NewEventServer("127.0.0.1:"+eventServerPort, - log.Log, kclient, record.NewFakeRecorder(32), true, true, nil) + log.Log, kclient, events.NewFakeRecorder(32), true, true, nil) stopCh := make(chan struct{}) go eventServer.ListenAndServe(stopCh, eventMdlw, store) defer close(stopCh) diff --git a/main.go b/main.go index a9a6e3176..2be9573cb 100644 --- a/main.go +++ b/main.go @@ -235,7 +235,7 @@ func main() { if err = (&controller.ProviderReconciler{ Client: mgr.GetClient(), - EventRecorder: mgr.GetEventRecorderFor(controllerName), + EventRecorder: mgr.GetEventRecorder(controllerName), TokenCache: tokenCache, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Provider") @@ -245,7 +245,7 @@ func main() { if err = (&controller.AlertReconciler{ Client: mgr.GetClient(), ControllerName: controllerName, - EventRecorder: mgr.GetEventRecorderFor(controllerName), + EventRecorder: mgr.GetEventRecorder(controllerName), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Alert") os.Exit(1) @@ -255,7 +255,7 @@ func main() { Client: mgr.GetClient(), ControllerName: controllerName, Metrics: metricsH, - EventRecorder: mgr.GetEventRecorderFor(controllerName), + EventRecorder: mgr.GetEventRecorder(controllerName), }).SetupWithManager(mgr, controller.ReceiverReconcilerOptions{ RateLimiter: runtimeCtrl.GetRateLimiter(rateLimiterOptions), WatchConfigs: watchConfigs, @@ -282,7 +282,7 @@ func main() { Registry: ctrlmetrics.Registry, }), }) - eventServer := server.NewEventServer(eventsAddr, ctrl.Log, mgr.GetClient(), mgr.GetEventRecorderFor(controllerName), aclOptions.NoCrossNamespaceRefs, exportHTTPPathMetrics, tokenCache) + eventServer := server.NewEventServer(eventsAddr, ctrl.Log, mgr.GetClient(), mgr.GetEventRecorder(controllerName), aclOptions.NoCrossNamespaceRefs, exportHTTPPathMetrics, tokenCache) go eventServer.ListenAndServe(ctx.Done(), eventMdlw, store) setupLog.Info("starting webhook receiver server", "addr", receiverAddr)