diff --git a/pkg/knative/deployer.go b/pkg/knative/deployer.go index 1bd011d142..4cb2f931d5 100644 --- a/pkg/knative/deployer.go +++ b/pkg/knative/deployer.go @@ -9,6 +9,7 @@ import ( "io" "net/http" "os" + "strconv" "strings" "time" @@ -410,6 +411,46 @@ func createTriggers(ctx context.Context, f fn.Function, client clientservingv1.K return err } } + + // Clean up stale triggers that are no longer in the desired subscription set. + triggerPrefix := fmt.Sprintf("%s-function-trigger-", ksvc.GetName()) + existingTriggers, err := eventingClient.ListTriggers(ctx) + if err != nil { + if errors.IsNotFound(err) || strings.HasPrefix(err.Error(), "no or newer Knative Eventing API found on the backend") { + return nil + } + return fmt.Errorf("knative deployer failed to list triggers for cleanup: %v", err) + } + triggerNames := make([]string, len(existingTriggers.Items)) + for i, t := range existingTriggers.Items { + triggerNames[i] = t.Name + } + return deleteStaleTriggers(triggerNames, triggerPrefix, len(f.Deploy.Subscriptions), func(name string) error { + return eventingClient.DeleteTrigger(ctx, name) + }) +} + +// deleteStaleTriggers removes triggers whose numeric index suffix is >= desiredCount. +// The knative deployer names triggers as . If the subscription count +// decreases, triggers with higher indices become orphaned and must be cleaned up. +func deleteStaleTriggers(triggerNames []string, triggerPrefix string, desiredCount int, deleteFn func(name string) error) error { + for _, name := range triggerNames { + if !strings.HasPrefix(name, triggerPrefix) { + continue + } + idxStr := strings.TrimPrefix(name, triggerPrefix) + idx, parseErr := strconv.Atoi(idxStr) + if parseErr != nil { + continue + } + if idx >= desiredCount { + fmt.Fprintf(os.Stderr, "Deleting stale trigger: %s\n", name) + delErr := deleteFn(name) + if delErr != nil && !errors.IsNotFound(delErr) { + return fmt.Errorf("knative deployer failed to delete stale trigger %s: %v", name, delErr) + } + } + } return nil } diff --git a/pkg/knative/deployer_test.go b/pkg/knative/deployer_test.go index e024ffec6e..7966142fdb 100644 --- a/pkg/knative/deployer_test.go +++ b/pkg/knative/deployer_test.go @@ -461,3 +461,87 @@ func assertAuth(uname, pwd string, w http.ResponseWriter, r *http.Request) bool _, _ = fmt.Fprintln(w, "Unauthorised.") return false } + +// TestDeleteStaleTriggers_NoStale ensures no triggers are deleted when all are within range. +func TestDeleteStaleTriggers_NoStale(t *testing.T) { + triggers := []string{"my-func-function-trigger-0", "my-func-function-trigger-1"} + prefix := "my-func-function-trigger-" + var deleted []string + deleteFn := func(name string) error { + deleted = append(deleted, name) + return nil + } + err := deleteStaleTriggers(triggers, prefix, 2, deleteFn) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(deleted) != 0 { + t.Fatalf("expected no deletions, got %v", deleted) + } +} + +// TestDeleteStaleTriggers_DeletesStale ensures triggers beyond desired count are deleted. +func TestDeleteStaleTriggers_DeletesStale(t *testing.T) { + triggers := []string{ + "my-func-function-trigger-0", + "my-func-function-trigger-1", + "my-func-function-trigger-2", + "my-func-function-trigger-3", + } + prefix := "my-func-function-trigger-" + var deleted []string + deleteFn := func(name string) error { + deleted = append(deleted, name) + return nil + } + // desiredCount=2 means indices 2 and 3 are stale + err := deleteStaleTriggers(triggers, prefix, 2, deleteFn) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(deleted) != 2 { + t.Fatalf("expected 2 deletions, got %v", deleted) + } + if deleted[0] != "my-func-function-trigger-2" || deleted[1] != "my-func-function-trigger-3" { + t.Fatalf("unexpected deleted triggers: %v", deleted) + } +} + +// TestDeleteStaleTriggers_SkipsNonMatching ensures triggers that don't match the prefix or +// have unparseable suffixes are skipped. +func TestDeleteStaleTriggers_SkipsNonMatching(t *testing.T) { + triggers := []string{ + "other-trigger-0", // wrong prefix + "my-func-function-trigger-abc", // unparseable index + "my-func-function-trigger-2", // stale, should be deleted + } + prefix := "my-func-function-trigger-" + var deleted []string + deleteFn := func(name string) error { + deleted = append(deleted, name) + return nil + } + err := deleteStaleTriggers(triggers, prefix, 1, deleteFn) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(deleted) != 1 || deleted[0] != "my-func-function-trigger-2" { + t.Fatalf("expected only trigger-2 deleted, got %v", deleted) + } +} + +// TestDeleteStaleTriggers_DeleteError ensures errors from the delete callback are returned. +func TestDeleteStaleTriggers_DeleteError(t *testing.T) { + triggers := []string{"my-func-function-trigger-5"} + prefix := "my-func-function-trigger-" + deleteFn := func(name string) error { + return fmt.Errorf("connection refused") + } + err := deleteStaleTriggers(triggers, prefix, 1, deleteFn) + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "connection refused") { + t.Fatalf("expected 'connection refused' in error, got: %v", err) + } +}