Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions arthas/internal/k8s/portforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"net/url"
"strconv"
"sync"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -22,8 +23,11 @@ type PortMapping struct {

// Forwarder owns a running port-forward session.
type Forwarder struct {
stop chan struct{}
done chan error
stop chan struct{}
stopOnce sync.Once
done chan struct{}
mu sync.RWMutex
err error
}

// Ready blocks until the forwarder is accepting connections on all local ports
Expand All @@ -32,6 +36,11 @@ func (f *Forwarder) Ready(ctx context.Context, ready <-chan struct{}) error {
select {
case <-ready:
return nil
case <-f.done:
if err := f.Err(); err != nil {
return err
}
return fmt.Errorf("port-forward exited before becoming ready")
case <-ctx.Done():
return ctx.Err()
}
Expand All @@ -40,13 +49,22 @@ func (f *Forwarder) Ready(ctx context.Context, ready <-chan struct{}) error {
// Close stops the forwarder. Returns the final error from the port-forward
// goroutine (usually nil).
func (f *Forwarder) Close() error {
select {
case <-f.stop:
// already closed
default:
close(f.stop)
}
return <-f.done
f.stopOnce.Do(func() { close(f.stop) })
<-f.done
return f.Err()
}

func (f *Forwarder) Err() error {
f.mu.RLock()
defer f.mu.RUnlock()
return f.err
}

func (f *Forwarder) finish(err error) {
f.mu.Lock()
f.err = err
f.mu.Unlock()
close(f.done)
}

// StartPortForward opens a port-forward to the given pod. Returns the forwarder
Expand Down Expand Up @@ -87,12 +105,12 @@ func StartPortForward(restCfg *rest.Config, namespace, pod string, ports []PortM
return nil, nil, fmt.Errorf("create port-forward: %w", err)
}

done := make(chan error, 1)
fwd := &Forwarder{stop: stop, done: make(chan struct{})}
go func() {
done <- pf.ForwardPorts()
fwd.finish(pf.ForwardPorts())
}()

return &Forwarder{stop: stop, done: done}, ready, nil
return fwd, ready, nil
}

func mustURL(raw string) *url.URL {
Expand Down
41 changes: 30 additions & 11 deletions golang/internal/k8s/portforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"net/url"
"strconv"
"sync"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -20,26 +21,44 @@ type PortMapping struct {
}

type Forwarder struct {
stop chan struct{}
done chan error
stop chan struct{}
stopOnce sync.Once
done chan struct{}
mu sync.RWMutex
err error
}

func (f *Forwarder) Ready(ctx context.Context, ready <-chan struct{}) error {
select {
case <-ready:
return nil
case <-f.done:
if err := f.Err(); err != nil {
return err
}
return fmt.Errorf("port-forward exited before becoming ready")
case <-ctx.Done():
return ctx.Err()
}
}

func (f *Forwarder) Close() error {
select {
case <-f.stop:
default:
close(f.stop)
}
return <-f.done
f.stopOnce.Do(func() { close(f.stop) })
<-f.done
return f.Err()
}

func (f *Forwarder) Err() error {
f.mu.RLock()
defer f.mu.RUnlock()
return f.err
}

func (f *Forwarder) finish(err error) {
f.mu.Lock()
f.err = err
f.mu.Unlock()
close(f.done)
}

func StartPortForward(restCfg *rest.Config, namespace, pod string, ports []PortMapping, errOut, infoOut io.Writer) (*Forwarder, <-chan struct{}, error) {
Expand Down Expand Up @@ -76,11 +95,11 @@ func StartPortForward(restCfg *rest.Config, namespace, pod string, ports []PortM
return nil, nil, fmt.Errorf("create port-forward: %w", err)
}

done := make(chan error, 1)
fwd := &Forwarder{stop: stop, done: make(chan struct{})}
go func() {
done <- pf.ForwardPorts()
fwd.finish(pf.ForwardPorts())
}()
return &Forwarder{stop: stop, done: done}, ready, nil
return fwd, ready, nil
}

func mustURL(raw string) *url.URL {
Expand Down
Loading