Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 50 additions & 19 deletions xray/xray.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@ package xray

import (
"sync"
"time"

core "github.com/xtls/xray-core/core"

// The following are necessary as they register handlers in their init functions.

// Mandatory features. Can't remove unless there are replacements.
_ "github.com/xtls/xray-core/app/dispatcher"
_ "github.com/xtls/xray-core/app/proxyman/inbound"
_ "github.com/xtls/xray-core/app/proxyman/outbound"

// Other optional features.
_ "github.com/xtls/xray-core/app/dns"
// _ "github.com/xtls/xray-core/app/dns/fakedns"
Expand All @@ -21,13 +19,10 @@ import (
// _ "github.com/xtls/xray-core/app/reverse"
_ "github.com/xtls/xray-core/app/router"
// _ "github.com/xtls/xray-core/app/stats"

// Fix dependency cycle caused by core import in internet package
_ "github.com/xtls/xray-core/transport/internet/tagged/taggedimpl"

// Developer preview features
// _ "github.com/xtls/xray-core/app/observatory"

// Inbound and outbound proxies.
_ "github.com/xtls/xray-core/proxy/blackhole"
_ "github.com/xtls/xray-core/proxy/dns"
Expand All @@ -43,7 +38,6 @@ import (
_ "github.com/xtls/xray-core/proxy/vmess/inbound"
_ "github.com/xtls/xray-core/proxy/vmess/outbound"
_ "github.com/xtls/xray-core/proxy/wireguard"

// Transports
_ "github.com/xtls/xray-core/transport/internet/grpc"
_ "github.com/xtls/xray-core/transport/internet/httpupgrade"
Expand All @@ -54,7 +48,6 @@ import (
_ "github.com/xtls/xray-core/transport/internet/tls"
_ "github.com/xtls/xray-core/transport/internet/udp"
_ "github.com/xtls/xray-core/transport/internet/websocket"

// Transport headers
_ "github.com/xtls/xray-core/transport/internet/headers/http"
_ "github.com/xtls/xray-core/transport/internet/headers/noop"
Expand All @@ -63,26 +56,57 @@ import (
_ "github.com/xtls/xray-core/transport/internet/headers/utp"
_ "github.com/xtls/xray-core/transport/internet/headers/wechat"
_ "github.com/xtls/xray-core/transport/internet/headers/wireguard"

// JSON & TOML & YAML
_ "github.com/xtls/xray-core/main/json"
)

// drainTimeout is how long an instance stays in the servers map after Close() is called.
// This gives any in-flight operations a chance to finish before the instance is actually closed,
// which helps xray-core's goroutines clean up properly.
const drainTimeout = 30 * time.Second

// sweepInterval how often the background sweeper runs.
const sweepInterval = 10 * time.Second

type Server struct {
Instance *core.Instance
SocksPort int
DrainedAt time.Time // zero = active; non-zero = draining since this time
}

var (
mu sync.Mutex
servers = make(map[string]*Server)
)

type Server struct {
Instance *core.Instance
SocksPort int
func init() {
go sweeper()
}

func sweeper() {
for {
time.Sleep(sweepInterval)
mu.Lock()
now := time.Now()
for url, srv := range servers {
if !srv.DrainedAt.IsZero() && now.Sub(srv.DrainedAt) > drainTimeout {
srv.Instance.Close() //nolint: errcheck
delete(servers, url)
}
}
mu.Unlock()
}
}

func getServer(proxyURL string) *Server {
mu.Lock()
defer mu.Unlock()

if proxy, ok := servers[proxyURL]; ok {
// If draining, revive it.
if !proxy.DrainedAt.IsZero() {
proxy.DrainedAt = time.Time{}
}
return proxy
}
return nil
Expand All @@ -93,28 +117,35 @@ func setServer(proxyURL string, instance *core.Instance, port int) {
defer mu.Unlock()

servers[proxyURL] = &Server{
Instance: instance,
Instance: instance,
SocksPort: port,
DrainedAt: time.Time{},
Comment on lines 119 to +122
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Overwriting an existing draining server entry can leak the old instance because the sweeper no longer sees it.

With the new draining design, setServer now unconditionally overwrites servers[proxyURL]. If a server has been marked draining by Close/CloseAll but the sweeper hasn’t run yet, a new setServer for the same URL will remove the draining *Server from the map, so the sweeper never sees or closes it. To avoid this leak, ensure any existing entry is explicitly Close()d before overwriting, or always close the previous instance when replacing it, regardless of draining state.

}
}

// Close marks the server as draining. The sweeper goroutine will actually close
// the xray instance after drainTimeout elapses, giving in-flight operations a
// chance to finish cleanly and preventing premature close from leaking goroutines.
func Close(proxyURL string) {
mu.Lock()
defer mu.Unlock()

i, ok := servers[proxyURL]
if ok {
i.Instance.Close() //nolint: errcheck
delete(servers, proxyURL)
if ok && i.DrainedAt.IsZero() {
i.DrainedAt = time.Now()
}
}

// CloseAll marks all servers as draining immediately. The sweeper will close
// each one after drainTimeout.
func CloseAll() {
mu.Lock()
defer mu.Unlock()

for url, server := range servers {
server.Instance.Close() //nolint: errcheck
delete(servers, url)
now := time.Now()
for _, srv := range servers {
if srv.DrainedAt.IsZero() {
Comment on lines 141 to +147
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question (bug_risk): CloseAll no longer guarantees synchronous shutdown of instances, which may surprise callers depending on previous semantics.

Previously this called Instance.Close() and removed each server entry before returning, so callers could rely on all instances being fully shut down. Now it only sets DrainedAt and returns, leaving Close() and deletion to the sweeper after drainTimeout, which can introduce races or resource conflicts for callers expecting immediate shutdown (e.g., port reuse, goroutine termination). If that synchronous guarantee is required, consider either keeping CloseAll blocking for shutdown (with an internal helper for async draining) or introducing a separate async API and preserving the original semantics here.

srv.DrainedAt = now
}
}
}
Loading