Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions pkg/knative/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"net/http"
"os"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -410,6 +411,46 @@ func createTriggers(ctx context.Context, f fn.Function, client clientservingv1.K
return err
}
}

// Clean up stale triggers that are no longer in the desired subscription set.
triggerPrefix := fmt.Sprintf("%s-function-trigger-", ksvc.GetName())
existingTriggers, err := eventingClient.ListTriggers(ctx)
if err != nil {
if errors.IsNotFound(err) || strings.HasPrefix(err.Error(), "no or newer Knative Eventing API found on the backend") {
return nil
}
return fmt.Errorf("knative deployer failed to list triggers for cleanup: %v", err)
}
triggerNames := make([]string, len(existingTriggers.Items))
for i, t := range existingTriggers.Items {
triggerNames[i] = t.Name
}
return deleteStaleTriggers(triggerNames, triggerPrefix, len(f.Deploy.Subscriptions), func(name string) error {
return eventingClient.DeleteTrigger(ctx, name)
})
}

// deleteStaleTriggers removes triggers whose numeric index suffix is >= desiredCount.
// The knative deployer names triggers as <prefix><index>. If the subscription count
// decreases, triggers with higher indices become orphaned and must be cleaned up.
func deleteStaleTriggers(triggerNames []string, triggerPrefix string, desiredCount int, deleteFn func(name string) error) error {
for _, name := range triggerNames {
if !strings.HasPrefix(name, triggerPrefix) {
continue
}
idxStr := strings.TrimPrefix(name, triggerPrefix)
idx, parseErr := strconv.Atoi(idxStr)
if parseErr != nil {
continue
}
if idx >= desiredCount {
fmt.Fprintf(os.Stderr, "Deleting stale trigger: %s\n", name)
delErr := deleteFn(name)
if delErr != nil && !errors.IsNotFound(delErr) {
return fmt.Errorf("knative deployer failed to delete stale trigger %s: %v", name, delErr)
}
}
}
return nil
}

Expand Down
84 changes: 84 additions & 0 deletions pkg/knative/deployer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,3 +461,87 @@ func assertAuth(uname, pwd string, w http.ResponseWriter, r *http.Request) bool
_, _ = fmt.Fprintln(w, "Unauthorised.")
return false
}

// TestDeleteStaleTriggers_NoStale ensures no triggers are deleted when all are within range.
func TestDeleteStaleTriggers_NoStale(t *testing.T) {
triggers := []string{"my-func-function-trigger-0", "my-func-function-trigger-1"}
prefix := "my-func-function-trigger-"
var deleted []string
deleteFn := func(name string) error {
deleted = append(deleted, name)
return nil
}
err := deleteStaleTriggers(triggers, prefix, 2, deleteFn)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(deleted) != 0 {
t.Fatalf("expected no deletions, got %v", deleted)
}
}

// TestDeleteStaleTriggers_DeletesStale ensures triggers beyond desired count are deleted.
func TestDeleteStaleTriggers_DeletesStale(t *testing.T) {
triggers := []string{
"my-func-function-trigger-0",
"my-func-function-trigger-1",
"my-func-function-trigger-2",
"my-func-function-trigger-3",
}
prefix := "my-func-function-trigger-"
var deleted []string
deleteFn := func(name string) error {
deleted = append(deleted, name)
return nil
}
// desiredCount=2 means indices 2 and 3 are stale
err := deleteStaleTriggers(triggers, prefix, 2, deleteFn)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(deleted) != 2 {
t.Fatalf("expected 2 deletions, got %v", deleted)
}
if deleted[0] != "my-func-function-trigger-2" || deleted[1] != "my-func-function-trigger-3" {
t.Fatalf("unexpected deleted triggers: %v", deleted)
}
}

// TestDeleteStaleTriggers_SkipsNonMatching ensures triggers that don't match the prefix or
// have unparseable suffixes are skipped.
func TestDeleteStaleTriggers_SkipsNonMatching(t *testing.T) {
triggers := []string{
"other-trigger-0", // wrong prefix
"my-func-function-trigger-abc", // unparseable index
"my-func-function-trigger-2", // stale, should be deleted
}
prefix := "my-func-function-trigger-"
var deleted []string
deleteFn := func(name string) error {
deleted = append(deleted, name)
return nil
}
err := deleteStaleTriggers(triggers, prefix, 1, deleteFn)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(deleted) != 1 || deleted[0] != "my-func-function-trigger-2" {
t.Fatalf("expected only trigger-2 deleted, got %v", deleted)
}
}

// TestDeleteStaleTriggers_DeleteError ensures errors from the delete callback are returned.
func TestDeleteStaleTriggers_DeleteError(t *testing.T) {
triggers := []string{"my-func-function-trigger-5"}
prefix := "my-func-function-trigger-"
deleteFn := func(name string) error {
return fmt.Errorf("connection refused")
}
err := deleteStaleTriggers(triggers, prefix, 1, deleteFn)
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "connection refused") {
t.Fatalf("expected 'connection refused' in error, got: %v", err)
}
}
Loading