Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 86 additions & 29 deletions xray/xray.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ import (
_ "github.com/xtls/xray-core/main/json"
)

// drainTimeout is how long an instance stays in the servers map after Close() is called.
// This gives any in-flight operations a chance to finish before the instance is actually closed,
// which helps xray-core's goroutines clean up properly.
const drainTimeout = 30 * time.Second
// DrainTimeout is how long an instance stays in the servers map after Close()
// is called. This gives in-flight operations a chance to finish before the
// instance is actually closed, preventing goroutine leaks in xray-core.
var DrainTimeout = 30 * time.Second

// sweepInterval how often the background sweeper runs.
const sweepInterval = 10 * time.Second
// SweepInterval is how often the background sweeper runs.
var SweepInterval = 10 * time.Second

type Server struct {
Instance *core.Instance
Expand All @@ -75,52 +75,76 @@ type Server struct {
}

var (
mu sync.Mutex
servers = make(map[string]*Server)
mu sync.Mutex
servers = make(map[string]*Server)
sweeperOnce sync.Once
stopCh chan struct{}
sweeperWG sync.WaitGroup
)

func init() {
go sweeper()
// StartSweeper launches the background sweeper goroutine if not already running.
// It is called automatically by the public API; you do not need to call it.
func startSweeper() {
sweeperOnce.Do(func() {
ch := make(chan struct{})
stopCh = ch
sweeperWG.Add(1)
go func(stop <-chan struct{}) {
sweeper(stop)
sweeperWG.Done()
}(ch) // pass as parameter so the goroutine uses its own copy
})
}

func sweeper() {
// StopSweeper stops the running sweeper goroutine (if any) and waits for it
// to exit, then resets its Once gate so a new sweeper can be started.
// Intended for use in tests only.
func StopSweeper() {
if stopCh != nil {
close(stopCh)
}
sweeperWG.Wait()
// Reset state so a fresh sweeper can be started in the next test.
stopCh = nil
sweeperOnce = sync.Once{}
}

func sweeper(stop <-chan struct{}) {
for {
time.Sleep(sweepInterval)
select {
case <-stop:
return
case <-time.After(SweepInterval):
}

// Collect expired URLs under lock, then release lock before closing
// to avoid blocking all map operations while Instance.Close() runs.
var expired []struct {
url string
srv *Server
url string
srv *Server
}
mu.Lock()
now := time.Now()
for url, srv := range servers {
if !srv.DrainedAt.IsZero() && now.Sub(srv.DrainedAt) > drainTimeout {
if !srv.DrainedAt.IsZero() && now.Sub(srv.DrainedAt) > DrainTimeout {
expired = append(expired, struct {
url string
srv *Server
url string
srv *Server
}{url, srv})
}
}
mu.Unlock()

// Close instances outside the critical section.
for _, e := range expired {
e.srv.Instance.Close() //nolint: errcheck
mu.Lock()
delete(servers, e.url)
mu.Unlock()
tryCloseAndDelete(e.url, e.srv)
}
}
}

func getServer(proxyURL string) *Server {
startSweeper()
mu.Lock()
defer mu.Unlock()

if proxy, ok := servers[proxyURL]; ok {
// If draining, revive it.
if !proxy.DrainedAt.IsZero() {
proxy.DrainedAt = time.Time{}
}
Expand All @@ -130,20 +154,40 @@ func getServer(proxyURL string) *Server {
}

func setServer(proxyURL string, instance *core.Instance, port int) {
startSweeper()
mu.Lock()
defer mu.Unlock()

servers[proxyURL] = &Server{
Instance: instance,
Instance: instance,
SocksPort: port,
DrainedAt: time.Time{},
DrainedAt: time.Time{},
}
}

// tryCloseAndDelete checks the entry under lock, closes it if still draining,
// then removes it from the map. The lock pattern ensures:
// - The entry hasn't been revived (DrainedAt reset to zero) since collection.
// - The entry hasn't been replaced by a newer server for the same URL.
func tryCloseAndDelete(url string, srv *Server) {
mu.Lock()
defer mu.Unlock()
if srv == nil || servers[url] != srv || srv.DrainedAt.IsZero() {
return
}
if srv.Instance != nil {
srv.Instance.Close() //nolint: errcheck
}
if servers[url] == srv {
delete(servers, url)
}
}

// Close marks the server as draining. The sweeper goroutine will actually close
// the xray instance after drainTimeout elapses, giving in-flight operations a
// the xray instance after DrainTimeout elapses, giving in-flight operations a
// chance to finish cleanly and preventing premature close from leaking goroutines.
func Close(proxyURL string) {
startSweeper()
mu.Lock()
defer mu.Unlock()

Expand All @@ -154,8 +198,9 @@ func Close(proxyURL string) {
}

// CloseAll marks all servers as draining immediately. The sweeper will close
// each one after drainTimeout.
// each one after DrainTimeout.
func CloseAll() {
startSweeper()
mu.Lock()
defer mu.Unlock()

Expand All @@ -166,3 +211,15 @@ func CloseAll() {
}
}
}

// ResetForTest clears all entries from the servers map and resets the sweeper,
// so tests get a clean state without reassigning the map variable (which would
// race with any goroutines still iterating over the old map). Safe to call from tests.
func ResetForTest() {
mu.Lock()
for url := range servers {
delete(servers, url)
}
mu.Unlock()
StopSweeper()
}
Loading
Loading