diff --git a/arthas/internal/k8s/portforward.go b/arthas/internal/k8s/portforward.go index be3e576..4a5aee1 100644 --- a/arthas/internal/k8s/portforward.go +++ b/arthas/internal/k8s/portforward.go @@ -7,6 +7,7 @@ import ( "net/http" "net/url" "strconv" + "sync" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -22,8 +23,11 @@ type PortMapping struct { // Forwarder owns a running port-forward session. type Forwarder struct { - stop chan struct{} - done chan error + stop chan struct{} + stopOnce sync.Once + done chan struct{} + mu sync.RWMutex + err error } // Ready blocks until the forwarder is accepting connections on all local ports @@ -32,6 +36,11 @@ func (f *Forwarder) Ready(ctx context.Context, ready <-chan struct{}) error { select { case <-ready: return nil + case <-f.done: + if err := f.Err(); err != nil { + return err + } + return fmt.Errorf("port-forward exited before becoming ready") case <-ctx.Done(): return ctx.Err() } @@ -40,13 +49,22 @@ func (f *Forwarder) Ready(ctx context.Context, ready <-chan struct{}) error { // Close stops the forwarder. Returns the final error from the port-forward // goroutine (usually nil). func (f *Forwarder) Close() error { - select { - case <-f.stop: - // already closed - default: - close(f.stop) - } - return <-f.done + f.stopOnce.Do(func() { close(f.stop) }) + <-f.done + return f.Err() +} + +func (f *Forwarder) Err() error { + f.mu.RLock() + defer f.mu.RUnlock() + return f.err +} + +func (f *Forwarder) finish(err error) { + f.mu.Lock() + f.err = err + f.mu.Unlock() + close(f.done) } // StartPortForward opens a port-forward to the given pod. Returns the forwarder @@ -87,12 +105,12 @@ func StartPortForward(restCfg *rest.Config, namespace, pod string, ports []PortM return nil, nil, fmt.Errorf("create port-forward: %w", err) } - done := make(chan error, 1) + fwd := &Forwarder{stop: stop, done: make(chan struct{})} go func() { - done <- pf.ForwardPorts() + fwd.finish(pf.ForwardPorts()) }() - return &Forwarder{stop: stop, done: done}, ready, nil + return fwd, ready, nil } func mustURL(raw string) *url.URL { diff --git a/golang/internal/k8s/portforward.go b/golang/internal/k8s/portforward.go index fc3ceb6..0816d5b 100644 --- a/golang/internal/k8s/portforward.go +++ b/golang/internal/k8s/portforward.go @@ -7,6 +7,7 @@ import ( "net/http" "net/url" "strconv" + "sync" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -20,26 +21,44 @@ type PortMapping struct { } type Forwarder struct { - stop chan struct{} - done chan error + stop chan struct{} + stopOnce sync.Once + done chan struct{} + mu sync.RWMutex + err error } func (f *Forwarder) Ready(ctx context.Context, ready <-chan struct{}) error { select { case <-ready: return nil + case <-f.done: + if err := f.Err(); err != nil { + return err + } + return fmt.Errorf("port-forward exited before becoming ready") case <-ctx.Done(): return ctx.Err() } } func (f *Forwarder) Close() error { - select { - case <-f.stop: - default: - close(f.stop) - } - return <-f.done + f.stopOnce.Do(func() { close(f.stop) }) + <-f.done + return f.Err() +} + +func (f *Forwarder) Err() error { + f.mu.RLock() + defer f.mu.RUnlock() + return f.err +} + +func (f *Forwarder) finish(err error) { + f.mu.Lock() + f.err = err + f.mu.Unlock() + close(f.done) } func StartPortForward(restCfg *rest.Config, namespace, pod string, ports []PortMapping, errOut, infoOut io.Writer) (*Forwarder, <-chan struct{}, error) { @@ -76,11 +95,11 @@ func StartPortForward(restCfg *rest.Config, namespace, pod string, ports []PortM return nil, nil, fmt.Errorf("create port-forward: %w", err) } - done := make(chan error, 1) + fwd := &Forwarder{stop: stop, done: make(chan struct{})} go func() { - done <- pf.ForwardPorts() + fwd.finish(pf.ForwardPorts()) }() - return &Forwarder{stop: stop, done: done}, ready, nil + return fwd, ready, nil } func mustURL(raw string) *url.URL {