Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/controller/alert_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"context"

corev1 "k8s.io/api/core/v1"
kuberecorder "k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -37,7 +37,7 @@ import (
// AlertReconciler reconciles an Alert object to migrate it to static Alert.
type AlertReconciler struct {
client.Client
kuberecorder.EventRecorder
events.EventRecorder

ControllerName string
}
Expand Down Expand Up @@ -88,7 +88,7 @@ func (r *AlertReconciler) Reconcile(ctx context.Context, req ctrl.Request) (resu
controllerutil.RemoveFinalizer(obj, apiv1.NotificationFinalizer)

log.Info("removed finalizer from Alert to migrate to static Alert")
r.Event(obj, corev1.EventTypeNormal, "Migration", "removed finalizer from Alert to migrate to static Alert")
r.Eventf(obj, nil, corev1.EventTypeNormal, "Migration", "", "removed finalizer from Alert to migrate to static Alert")

return
}
4 changes: 2 additions & 2 deletions internal/controller/provider_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package controller
import (
"context"

kuberecorder "k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -42,7 +42,7 @@ import (
// Provider.
type ProviderReconciler struct {
client.Client
kuberecorder.EventRecorder
events.EventRecorder

TokenCache *cache.TokenCache
}
Expand Down
10 changes: 5 additions & 5 deletions internal/controller/receiver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
kuberecorder "k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand All @@ -50,7 +50,7 @@ import (
type ReceiverReconciler struct {
client.Client
helper.Metrics
kuberecorder.EventRecorder
events.EventRecorder

ControllerName string
}
Expand Down Expand Up @@ -166,14 +166,14 @@ func (r *ReceiverReconciler) Reconcile(ctx context.Context, req ctrl.Request) (r

// Emit warning event if the reconciliation failed.
if retErr != nil {
r.Event(obj, corev1.EventTypeWarning, meta.FailedReason, retErr.Error())
r.Eventf(obj, nil, corev1.EventTypeWarning, meta.FailedReason, "", retErr.Error())
}

// Log and emit success event.
if retErr == nil && conditions.IsReady(obj) {
msg := fmt.Sprintf("Reconciliation finished, next run in %s", obj.GetInterval().String())
log.Info(msg)
r.Event(obj, corev1.EventTypeNormal, meta.SucceededReason, msg)
r.Eventf(obj, nil, corev1.EventTypeNormal, meta.SucceededReason, "", msg)
}
}()

Expand Down Expand Up @@ -215,7 +215,7 @@ func (r *ReceiverReconciler) reconcile(ctx context.Context, obj *apiv1.Receiver)
conditions.MarkStalled(obj, meta.InvalidCELExpressionReason, "%s", errMsg)
obj.Status.ObservedGeneration = obj.Generation
log.Error(err, msg)
r.Event(obj, corev1.EventTypeWarning, meta.InvalidCELExpressionReason, errMsg)
r.Eventf(obj, nil, corev1.EventTypeWarning, meta.InvalidCELExpressionReason, "", errMsg)
return ctrl.Result{}, nil
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/receiver_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -75,7 +75,7 @@ func TestReceiverReconciler_deleteBeforeFinalizer(t *testing.T) {

r := &ReceiverReconciler{
Client: k8sClient,
EventRecorder: record.NewFakeRecorder(32),
EventRecorder: events.NewFakeRecorder(32),
}
// NOTE: Only a real API server responds with an error in this scenario.
_, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: client.ObjectKeyFromObject(receiver)})
Expand Down
6 changes: 3 additions & 3 deletions internal/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ func TestMain(m *testing.M) {
if err := (&AlertReconciler{
Client: testEnv,
ControllerName: controllerName,
EventRecorder: testEnv.GetEventRecorderFor(controllerName),
EventRecorder: testEnv.GetEventRecorder(controllerName),
}).SetupWithManager(testEnv); err != nil {
panic(fmt.Sprintf("Failed to start AlertReconciler: %v", err))
}

if err := (&ProviderReconciler{
Client: testEnv,
EventRecorder: testEnv.GetEventRecorderFor(controllerName),
EventRecorder: testEnv.GetEventRecorder(controllerName),
}).SetupWithManager(testEnv); err != nil {
panic(fmt.Sprintf("Failed to start ProviderReconciler: %v", err))
}
Expand All @@ -93,7 +93,7 @@ func TestMain(m *testing.M) {
Client: testEnv,
Metrics: testMetricsH,
ControllerName: controllerName,
EventRecorder: testEnv.GetEventRecorderFor(controllerName),
EventRecorder: testEnv.GetEventRecorder(controllerName),
}).SetupWithManager(testEnv, ReceiverReconcilerOptions{
RateLimiter: controller.GetDefaultRateLimiter(),
WatchConfigsPredicate: predicate.Not(predicate.Funcs{}),
Expand Down
16 changes: 8 additions & 8 deletions internal/server/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
dropped, err := s.dispatchNotification(ctx, event, alert)
if err != nil {
alertLogger.Error(err, "failed to dispatch notification")
s.Eventf(alert, corev1.EventTypeWarning, "NotificationDispatchFailed",
s.Eventf(alert, nil, corev1.EventTypeWarning, "NotificationDispatchFailed", "",
"failed to dispatch notification for %s: %s", involvedObjectString(event.InvolvedObject), err)
continue
}
Expand Down Expand Up @@ -208,8 +208,8 @@ func (s *EventServer) messageIsIncluded(ctx context.Context, msg string, alert *
}
} else {
log.FromContext(ctx).Error(err, fmt.Sprintf("failed to compile inclusion regex: %s", exp))
s.Eventf(alert, corev1.EventTypeWarning,
"InvalidConfig", "failed to compile inclusion regex: %s", exp)
s.Eventf(alert, nil, corev1.EventTypeWarning,
"InvalidConfig", "", "failed to compile inclusion regex: %s", exp)
}
}
return false
Expand All @@ -229,7 +229,7 @@ func (s *EventServer) messageIsExcluded(ctx context.Context, msg string, alert *
}
} else {
log.FromContext(ctx).Error(err, fmt.Sprintf("failed to compile exclusion regex: %s", exp))
s.Eventf(alert, corev1.EventTypeWarning, "InvalidConfig",
s.Eventf(alert, nil, corev1.EventTypeWarning, "InvalidConfig", "",
"failed to compile exclusion regex: %s", exp)
}
}
Expand Down Expand Up @@ -263,7 +263,7 @@ func (s *EventServer) dispatchNotification(ctx context.Context,
err = errors.New(maskedErrStr)
}
log.FromContext(ctx).Error(err, "failed to send notification")
s.Eventf(alert, corev1.EventTypeWarning, "NotificationDispatchFailed",
s.Eventf(alert, nil, corev1.EventTypeWarning, "NotificationDispatchFailed", "",
"failed to send notification for %s: %s", involvedObjectString(e.InvolvedObject), err)
}
}(params.sender, *params.event)
Expand Down Expand Up @@ -607,7 +607,7 @@ func (s *EventServer) eventMatchesAlertSource(ctx context.Context, event *eventv
Name: event.InvolvedObject.Name,
}, &obj); err != nil {
logger.Error(err, "error getting the involved object")
s.Eventf(alert, corev1.EventTypeWarning, "SourceFetchFailed",
s.Eventf(alert, nil, corev1.EventTypeWarning, "SourceFetchFailed", "",
"error getting source object %s", involvedObjectString(event.InvolvedObject))
return false
}
Expand All @@ -617,7 +617,7 @@ func (s *EventServer) eventMatchesAlertSource(ctx context.Context, event *eventv
})
if err != nil {
logger.Error(err, fmt.Sprintf("error using matchLabels from event source %s", crossNSObjectRefString(source)))
s.Eventf(alert, corev1.EventTypeWarning, "InvalidConfig",
s.Eventf(alert, nil, corev1.EventTypeWarning, "InvalidConfig", "",
"error using matchLabels from event source %s", crossNSObjectRefString(source))
return false
}
Expand Down Expand Up @@ -704,7 +704,7 @@ func (s *EventServer) combineEventMetadata(ctx context.Context, event *eventv1.E
const msg = "metadata key conflicts detected (please refer to the Alert API docs and Flux RFC 0008 for more information)"
slices.SortFunc(conflictingKeys, func(a, b *keyConflict) int { return strings.Compare(a.Key, b.Key) })
l.Info("warning: "+msg, "conflictingKeys", conflictingKeys)
s.AnnotatedEventf(alert, conflictEventAnnotations, corev1.EventTypeWarning, "MetadataAppendFailed", "%s", msg)
s.Eventf(alert, nil, corev1.EventTypeWarning, "MetadataAppendFailed", "", "%s", msg)
}

if len(metadata) > 0 {
Expand Down
20 changes: 10 additions & 10 deletions internal/server/event_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/log"

Expand Down Expand Up @@ -291,7 +291,7 @@ func TestFilterAlertsForEvent(t *testing.T) {
eventServer := EventServer{
kubeClient: builder.Build(),
logger: log.Log,
EventRecorder: record.NewFakeRecorder(32),
EventRecorder: events.NewFakeRecorder(32),
}

result := eventServer.filterAlertsForEvent(context.TODO(), alerts, testEvent)
Expand Down Expand Up @@ -375,7 +375,7 @@ func TestDispatchNotification(t *testing.T) {
eventServer := EventServer{
kubeClient: builder.Build(),
logger: log.Log,
EventRecorder: record.NewFakeRecorder(32),
EventRecorder: events.NewFakeRecorder(32),
}

_, err := eventServer.dispatchNotification(context.TODO(), testEvent, alert)
Expand Down Expand Up @@ -557,7 +557,7 @@ func TestGetNotificationParams(t *testing.T) {
kubeClient: builder.Build(),
logger: log.Log,
noCrossNamespaceRefs: tt.noCrossNSRefs,
EventRecorder: record.NewFakeRecorder(32),
EventRecorder: events.NewFakeRecorder(32),
}

params, dropped, err := eventServer.getNotificationParams(context.TODO(), event, alert)
Expand Down Expand Up @@ -1343,7 +1343,7 @@ func TestEventMatchesAlert(t *testing.T) {
eventServer := EventServer{
kubeClient: builder.Build(),
logger: log.Log,
EventRecorder: record.NewFakeRecorder(32),
EventRecorder: events.NewFakeRecorder(32),
}
alert := &apiv1beta3.Alert{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -1415,7 +1415,7 @@ func TestCombineEventMetadata(t *testing.T) {
expectedMetadata: map[string]string{
"summary": "alertSummary",
},
conflictEvent: "Warning MetadataAppendFailed metadata key conflicts detected (please refer to the Alert API docs and Flux RFC 0008 for more information) map[summary:involved object annotations, Alert object .spec.summary]",
conflictEvent: "Warning MetadataAppendFailed metadata key conflicts detected (please refer to the Alert API docs and Flux RFC 0008 for more information)",
},
"alert event metadata is overriden by summary": {
event: eventv1.Event{},
Expand All @@ -1430,7 +1430,7 @@ func TestCombineEventMetadata(t *testing.T) {
expectedMetadata: map[string]string{
"summary": "alertSummary",
},
conflictEvent: "Warning MetadataAppendFailed metadata key conflicts detected (please refer to the Alert API docs and Flux RFC 0008 for more information) map[summary:Alert object .spec.eventMetadata, Alert object .spec.summary]",
conflictEvent: "Warning MetadataAppendFailed metadata key conflicts detected (please refer to the Alert API docs and Flux RFC 0008 for more information)",
},
"summary is overriden by controller metadata": {
event: eventv1.Event{
Expand All @@ -1446,7 +1446,7 @@ func TestCombineEventMetadata(t *testing.T) {
expectedMetadata: map[string]string{
"summary": "controllerSummary",
},
conflictEvent: "Warning MetadataAppendFailed metadata key conflicts detected (please refer to the Alert API docs and Flux RFC 0008 for more information) map[summary:Alert object .spec.summary, involved object controller metadata]",
conflictEvent: "Warning MetadataAppendFailed metadata key conflicts detected (please refer to the Alert API docs and Flux RFC 0008 for more information)",
},
"precedence order in RFC 0008 is honered": {
event: eventv1.Event{
Expand Down Expand Up @@ -1476,13 +1476,13 @@ func TestCombineEventMetadata(t *testing.T) {
"alertMetadataOverridenByController": "controllerMetadataValue2",
"controllerMetadata": "controllerMetadataValue3",
},
conflictEvent: "Warning MetadataAppendFailed metadata key conflicts detected (please refer to the Alert API docs and Flux RFC 0008 for more information) map[alertMetadataOverridenByController:Alert object .spec.eventMetadata, involved object controller metadata objectMetadataOverridenByAlert:involved object annotations, Alert object .spec.eventMetadata objectMetadataOverridenByController:involved object annotations, involved object controller metadata]",
conflictEvent: "Warning MetadataAppendFailed metadata key conflicts detected (please refer to the Alert API docs and Flux RFC 0008 for more information)",
},
} {
t.Run(name, func(t *testing.T) {
g := NewGomegaWithT(t)

eventRecorder := record.NewFakeRecorder(1)
eventRecorder := events.NewFakeRecorder(1)
s := &EventServer{
logger: log.Log,
EventRecorder: eventRecorder,
Expand Down
6 changes: 3 additions & 3 deletions internal/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/sethvargo/go-limiter/httplimit"
"github.com/slok/go-http-metrics/middleware"
"github.com/slok/go-http-metrics/middleware/std"
kuberecorder "k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

Expand All @@ -55,12 +55,12 @@ type EventServer struct {
noCrossNamespaceRefs bool
exportHTTPPathMetrics bool
tokenCache *cache.TokenCache
kuberecorder.EventRecorder
events.EventRecorder
}

// NewEventServer returns an HTTP server that handles events
func NewEventServer(port string, logger logr.Logger, kubeClient client.Client,
eventRecorder kuberecorder.EventRecorder, noCrossNamespaceRefs bool,
eventRecorder events.EventRecorder, noCrossNamespaceRefs bool,
exportHTTPPathMetrics bool, tokenCache *cache.TokenCache) *EventServer {
return &EventServer{
port: port,
Expand Down
4 changes: 2 additions & 2 deletions internal/server/event_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
yamlutil "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
log "sigs.k8s.io/controller-runtime/pkg/log"

Expand Down Expand Up @@ -137,7 +137,7 @@ func TestEventServer(t *testing.T) {
t.Fatalf("failed to create memory storage")
}
eventServer := NewEventServer("127.0.0.1:"+eventServerPort,
log.Log, kclient, record.NewFakeRecorder(32), true, true, nil)
log.Log, kclient, events.NewFakeRecorder(32), true, true, nil)
stopCh := make(chan struct{})
go eventServer.ListenAndServe(stopCh, eventMdlw, store)
defer close(stopCh)
Expand Down
8 changes: 4 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func main() {

if err = (&controller.ProviderReconciler{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(controllerName),
EventRecorder: mgr.GetEventRecorder(controllerName),
TokenCache: tokenCache,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Provider")
Expand All @@ -245,7 +245,7 @@ func main() {
if err = (&controller.AlertReconciler{
Client: mgr.GetClient(),
ControllerName: controllerName,
EventRecorder: mgr.GetEventRecorderFor(controllerName),
EventRecorder: mgr.GetEventRecorder(controllerName),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Alert")
os.Exit(1)
Expand All @@ -255,7 +255,7 @@ func main() {
Client: mgr.GetClient(),
ControllerName: controllerName,
Metrics: metricsH,
EventRecorder: mgr.GetEventRecorderFor(controllerName),
EventRecorder: mgr.GetEventRecorder(controllerName),
}).SetupWithManager(mgr, controller.ReceiverReconcilerOptions{
RateLimiter: runtimeCtrl.GetRateLimiter(rateLimiterOptions),
WatchConfigs: watchConfigs,
Expand All @@ -282,7 +282,7 @@ func main() {
Registry: ctrlmetrics.Registry,
}),
})
eventServer := server.NewEventServer(eventsAddr, ctrl.Log, mgr.GetClient(), mgr.GetEventRecorderFor(controllerName), aclOptions.NoCrossNamespaceRefs, exportHTTPPathMetrics, tokenCache)
eventServer := server.NewEventServer(eventsAddr, ctrl.Log, mgr.GetClient(), mgr.GetEventRecorder(controllerName), aclOptions.NoCrossNamespaceRefs, exportHTTPPathMetrics, tokenCache)
go eventServer.ListenAndServe(ctx.Done(), eventMdlw, store)

setupLog.Info("starting webhook receiver server", "addr", receiverAddr)
Expand Down