diff --git a/xray/xray.go b/xray/xray.go index 42599a5..ae4c558 100644 --- a/xray/xray.go +++ b/xray/xray.go @@ -2,16 +2,14 @@ package xray import ( "sync" + "time" core "github.com/xtls/xray-core/core" - // The following are necessary as they register handlers in their init functions. - // Mandatory features. Can't remove unless there are replacements. _ "github.com/xtls/xray-core/app/dispatcher" _ "github.com/xtls/xray-core/app/proxyman/inbound" _ "github.com/xtls/xray-core/app/proxyman/outbound" - // Other optional features. _ "github.com/xtls/xray-core/app/dns" // _ "github.com/xtls/xray-core/app/dns/fakedns" @@ -21,13 +19,10 @@ import ( // _ "github.com/xtls/xray-core/app/reverse" _ "github.com/xtls/xray-core/app/router" // _ "github.com/xtls/xray-core/app/stats" - // Fix dependency cycle caused by core import in internet package _ "github.com/xtls/xray-core/transport/internet/tagged/taggedimpl" - // Developer preview features // _ "github.com/xtls/xray-core/app/observatory" - // Inbound and outbound proxies. _ "github.com/xtls/xray-core/proxy/blackhole" _ "github.com/xtls/xray-core/proxy/dns" @@ -43,7 +38,6 @@ import ( _ "github.com/xtls/xray-core/proxy/vmess/inbound" _ "github.com/xtls/xray-core/proxy/vmess/outbound" _ "github.com/xtls/xray-core/proxy/wireguard" - // Transports _ "github.com/xtls/xray-core/transport/internet/grpc" _ "github.com/xtls/xray-core/transport/internet/httpupgrade" @@ -54,7 +48,6 @@ import ( _ "github.com/xtls/xray-core/transport/internet/tls" _ "github.com/xtls/xray-core/transport/internet/udp" _ "github.com/xtls/xray-core/transport/internet/websocket" - // Transport headers _ "github.com/xtls/xray-core/transport/internet/headers/http" _ "github.com/xtls/xray-core/transport/internet/headers/noop" @@ -63,19 +56,46 @@ import ( _ "github.com/xtls/xray-core/transport/internet/headers/utp" _ "github.com/xtls/xray-core/transport/internet/headers/wechat" _ "github.com/xtls/xray-core/transport/internet/headers/wireguard" - // JSON & TOML & YAML _ "github.com/xtls/xray-core/main/json" ) +// drainTimeout is how long an instance stays in the servers map after Close() is called. +// This gives any in-flight operations a chance to finish before the instance is actually closed, +// which helps xray-core's goroutines clean up properly. +const drainTimeout = 30 * time.Second + +// sweepInterval how often the background sweeper runs. +const sweepInterval = 10 * time.Second + +type Server struct { + Instance *core.Instance + SocksPort int + DrainedAt time.Time // zero = active; non-zero = draining since this time +} + var ( mu sync.Mutex servers = make(map[string]*Server) ) -type Server struct { - Instance *core.Instance - SocksPort int +func init() { + go sweeper() +} + +func sweeper() { + for { + time.Sleep(sweepInterval) + mu.Lock() + now := time.Now() + for url, srv := range servers { + if !srv.DrainedAt.IsZero() && now.Sub(srv.DrainedAt) > drainTimeout { + srv.Instance.Close() //nolint: errcheck + delete(servers, url) + } + } + mu.Unlock() + } } func getServer(proxyURL string) *Server { @@ -83,6 +103,10 @@ func getServer(proxyURL string) *Server { defer mu.Unlock() if proxy, ok := servers[proxyURL]; ok { + // If draining, revive it. + if !proxy.DrainedAt.IsZero() { + proxy.DrainedAt = time.Time{} + } return proxy } return nil @@ -93,28 +117,35 @@ func setServer(proxyURL string, instance *core.Instance, port int) { defer mu.Unlock() servers[proxyURL] = &Server{ - Instance: instance, + Instance: instance, SocksPort: port, + DrainedAt: time.Time{}, } } +// Close marks the server as draining. The sweeper goroutine will actually close +// the xray instance after drainTimeout elapses, giving in-flight operations a +// chance to finish cleanly and preventing premature close from leaking goroutines. func Close(proxyURL string) { mu.Lock() defer mu.Unlock() i, ok := servers[proxyURL] - if ok { - i.Instance.Close() //nolint: errcheck - delete(servers, proxyURL) + if ok && i.DrainedAt.IsZero() { + i.DrainedAt = time.Now() } } +// CloseAll marks all servers as draining immediately. The sweeper will close +// each one after drainTimeout. func CloseAll() { mu.Lock() defer mu.Unlock() - for url, server := range servers { - server.Instance.Close() //nolint: errcheck - delete(servers, url) + now := time.Now() + for _, srv := range servers { + if srv.DrainedAt.IsZero() { + srv.DrainedAt = now + } } }