From ee1737b5e2a7d70b3844bc89b0da5748d0359154 Mon Sep 17 00:00:00 2001 From: "elvandlie@gmail.com" Date: Thu, 21 May 2026 02:55:14 +0700 Subject: [PATCH] fix(knative): fix goroutine leak in Deploy The Deploy function (new-service creation path) starts a background goroutine that polls isImageInPrivateRegistry every 5 seconds and sends results on an unbuffered chprivate channel. If WaitForService completes first (via cherr), the main function returns and the poller goroutine blocks forever on the channel send with no receiver, leaking the goroutine and captured context. The reverse case leaks as well: if the private-registry check fires first, the WaitForService goroutine later blocks forever sending on the unbuffered cherr channel. Fixed by adding a done channel that is closed once the main select loop exits. The poller goroutine now selects on done alongside its timer and its channel send, so it exits as soon as done is closed (after any in-flight registry check returns). cherr is now buffered with capacity 1, so the WaitForService goroutine can always deliver its result and exit. The wait logic is extracted into waitForReadyOrPrivateRegistry, with the private-registry error exposed as the errPrivateRegistry sentinel, so that it can be unit-tested. Fixes #3797 --- pkg/knative/deployer.go | 105 ++++++++++++++++++++++++----------- pkg/knative/deployer_test.go | 62 +++++++++++++++++++++ 2 files changed, 135 insertions(+), 32 deletions(-) diff --git a/pkg/knative/deployer.go b/pkg/knative/deployer.go index 1bd011d142..6ea894790d 100644 --- a/pkg/knative/deployer.go +++ b/pkg/knative/deployer.go @@ -45,6 +45,8 @@ const ( KnativeDeployerName = "knative" ) +var errPrivateRegistry = stdErrors.New("your function image is unreachable. It is possible that your docker registry is private. 
If so, make sure you have set up pull secrets https://knative.dev/docs/developer/serving/deploying-from-private-registry") + type DeployerOpt func(*Deployer) type Deployer struct { @@ -232,44 +234,21 @@ consider using the --image-pull-secret flag, or setting up pull secrets manually if d.verbose { fmt.Println("Waiting for Knative Service to become ready") } - chprivate := make(chan bool) - cherr := make(chan error) - go func() { - private := false - for !private { - time.Sleep(5 * time.Second) - private = d.isImageInPrivateRegistry(ctx, client, f) - chprivate <- private - } - close(chprivate) - }() - go func() { + waitForService := func() error { err, _ := client.WaitForService(ctx, f.Name, clientservingv1.WaitConfig{Timeout: k8s.DefaultWaitingTimeout, ErrorWindow: k8s.DefaultErrorWindowTimeout}, wait.NoopMessageCallback()) - cherr <- err - close(cherr) - }() - - presumePrivate := false - main: - // Wait for either a timeout or a container condition signaling the image is unreachable - for { - select { - case private := <-chprivate: - if private { - presumePrivate = true - break main - } - case err = <-cherr: - break main - } + return err } - if presumePrivate { - err := fmt.Errorf("your function image is unreachable. It is possible that your docker registry is private. 
If so, make sure you have set up pull secrets https://knative.dev/docs/developer/serving/deploying-from-private-registry") - return fn.DeploymentResult{}, err + isPrivateRegistry := func() bool { + return d.isImageInPrivateRegistry(ctx, client, f) } + + err = waitForReadyOrPrivateRegistry(waitForService, isPrivateRegistry, 5*time.Second) if err != nil { + if stdErrors.Is(err, errPrivateRegistry) { + return fn.DeploymentResult{}, err + } err = fmt.Errorf("knative deployer failed to wait for the Knative Service to become ready: %v", err) if !d.verbose { fmt.Fprintln(os.Stderr, "\nService output:") @@ -857,3 +836,65 @@ func wrapK8sConnectionError(err error) error { return nil } + +// waitForReadyOrPrivateRegistry handles polling for a private registry image pull error +// or successfully waiting for the service to become ready. +func waitForReadyOrPrivateRegistry( + waitForService func() error, + isPrivateRegistry func() bool, + pollInterval time.Duration, +) error { + chprivate := make(chan bool) + cherr := make(chan error, 1) + done := make(chan struct{}) + + go func() { + defer close(chprivate) + for { + select { + case <-time.After(pollInterval): + case <-done: + return + } + private := isPrivateRegistry() + select { + case chprivate <- private: + if private { + return + } + case <-done: + return + } + } + }() + + go func() { + cherr <- waitForService() + close(cherr) + }() + + presumePrivate := false +main: + for { + select { + case private := <-chprivate: + if private { + presumePrivate = true + break main + } + case err := <-cherr: + if err != nil { + close(done) + return err + } + break main + } + } + close(done) + + if presumePrivate { + return errPrivateRegistry + } + + return nil +} diff --git a/pkg/knative/deployer_test.go b/pkg/knative/deployer_test.go index e024ffec6e..64367ff863 100644 --- a/pkg/knative/deployer_test.go +++ b/pkg/knative/deployer_test.go @@ -13,6 +13,7 @@ import ( "strings" "sync/atomic" "testing" + "time" 
"github.com/docker/cli/cli/config/configfile" "github.com/docker/cli/cli/config/types" @@ -461,3 +462,64 @@ func assertAuth(uname, pwd string, w http.ResponseWriter, r *http.Request) bool _, _ = fmt.Fprintln(w, "Unauthorised.") return false } + +func TestWaitForReadyOrPrivateRegistry_Success(t *testing.T) { + waitForService := func() error { + return nil // returns immediately with success + } + isPrivateRegistry := func() bool { + return false + } + err := waitForReadyOrPrivateRegistry(waitForService, isPrivateRegistry, 10*time.Millisecond) + if err != nil { + t.Fatalf("expected nil error, got %v", err) + } +} + +func TestWaitForReadyOrPrivateRegistry_WaitError(t *testing.T) { + expectedErr := errors.New("timeout waiting for service") + waitForService := func() error { + return expectedErr // returns immediately with error + } + isPrivateRegistry := func() bool { + return false + } + err := waitForReadyOrPrivateRegistry(waitForService, isPrivateRegistry, 10*time.Millisecond) + if !errors.Is(err, expectedErr) { + t.Fatalf("expected error %v, got %v", expectedErr, err) + } +} + +func TestWaitForReadyOrPrivateRegistry_PrivateRegistry(t *testing.T) { + waitForService := func() error { + // block forever (or at least long enough for private registry check to fire) + time.Sleep(1 * time.Second) + return nil + } + isPrivate := false + isPrivateRegistry := func() bool { + isPrivate = true + return true // instantly return true + } + err := waitForReadyOrPrivateRegistry(waitForService, isPrivateRegistry, 10*time.Millisecond) + if !errors.Is(err, errPrivateRegistry) { + t.Fatalf("expected errPrivateRegistry, got %v", err) + } + if !isPrivate { + t.Fatalf("expected isPrivateRegistry to be called") + } +} + +func TestWaitForReadyOrPrivateRegistry_DelayedSuccess(t *testing.T) { + waitForService := func() error { + time.Sleep(50 * time.Millisecond) + return nil + } + isPrivateRegistry := func() bool { + return false // never private + } + err := 
waitForReadyOrPrivateRegistry(waitForService, isPrivateRegistry, 10*time.Millisecond) + if err != nil { + t.Fatalf("expected nil error, got %v", err) + } +}