fix: prevent xray goroutine leak via background sweeper (#79)
Conversation
Reviewer's Guide

Introduces a draining lifecycle for cached Xray servers with a background sweeper goroutine, so Close/CloseAll mark instances for deferred shutdown instead of closing them immediately, preventing goroutine leaks while allowing in-flight work to finish.

Sequence diagram for deferred Close with background sweeper

sequenceDiagram
participant Caller
participant Functions
participant GlobalState
participant Server
participant Sweeper
participant CoreInstance as core.Instance
Caller->>Functions: Close(proxyURL)
Functions->>GlobalState: Lock mu
Functions->>GlobalState: lookup servers[proxyURL]
alt server exists and DrainedAt is zero
Functions->>Server: set DrainedAt = time.Now()
else server missing or already draining
Note over Functions,Server: No change
end
Functions->>GlobalState: Unlock mu
loop every sweepInterval
Sweeper->>GlobalState: Sleep(sweepInterval)
Sweeper->>GlobalState: Lock mu
Sweeper->>GlobalState: iterate servers
alt server DrainedAt != zero and now - DrainedAt > drainTimeout
Sweeper->>CoreInstance: Instance.Close()
Sweeper->>GlobalState: delete servers[proxyURL]
else not ready to close
Note over Sweeper,Server: Keep server in draining or active state
end
Sweeper->>GlobalState: Unlock mu
end
Sequence diagram for getServer reviving a draining instance

sequenceDiagram
participant Caller
participant Functions
participant GlobalState
participant Server
Caller->>Functions: getServer(proxyURL)
Functions->>GlobalState: Lock mu
Functions->>GlobalState: lookup servers[proxyURL]
alt server exists
alt Server.DrainedAt is non_zero
Functions->>Server: set DrainedAt = zero_time
else Server.DrainedAt is zero
Note over Functions,Server: Already active
end
Functions-->>Caller: *Server
else server missing
Functions-->>Caller: nil
end
Functions->>GlobalState: Unlock mu
Class diagram for updated Xray server lifecycle management

classDiagram
class Server {
core.Instance Instance
int SocksPort
time.Time DrainedAt
}
class GlobalState {
sync.Mutex mu
map~string,*Server~ servers
bool sweeping
const time.Duration drainTimeout
const time.Duration sweepInterval
}
class Functions {
+getServer(proxyURL string) *Server
+setServer(proxyURL string, instance *core.Instance, port int)
+Close(proxyURL string)
+CloseAll()
+sweeper()
+init()
}
GlobalState --> Server : manages
Functions --> Server : creates_and_updates
Functions --> GlobalState : reads_writes
State diagram for Server draining lifecycle

stateDiagram-v2
[*] --> Active
Active: DrainedAt = zero
Draining: DrainedAt = non_zero
Closed: removed from servers map
Active --> Draining: Close or CloseAll
Draining --> Active: getServer called
Draining --> Closed: sweeper after drainTimeout
Closed --> [*]
Hey - I've found 2 issues, and left some high level feedback:

- Holding `mu` while calling `srv.Instance.Close()` in `sweeper` risks blocking all map operations if close is slow; consider first collecting the instances to close under the lock, then releasing the lock and closing them outside the critical section.
- The `sweeping` package-level variable is declared but never used; it can be removed or repurposed (e.g., to guard multiple sweeper startups) to avoid confusion.
## Individual Comments
### Comment 1
<location path="xray/xray.go" line_range="120-123" />
<code_context>
defer mu.Unlock()
servers[proxyURL] = &Server{
- Instance: instance,
+ Instance: instance,
SocksPort: port,
+ DrainedAt: time.Time{},
}
}
</code_context>
<issue_to_address>
**issue (bug_risk):** Overwriting an existing draining server entry can leak the old instance because the sweeper no longer sees it.
With the new draining design, `setServer` now unconditionally overwrites `servers[proxyURL]`. If a server has been marked draining by `Close`/`CloseAll` but the sweeper hasn’t run yet, a new `setServer` for the same URL will remove the draining `*Server` from the map, so the sweeper never sees or closes it. To avoid this leak, ensure any existing entry is explicitly `Close()`d before overwriting, or always close the previous instance when replacing it, regardless of draining state.
</issue_to_address>
### Comment 2
<location path="xray/xray.go" line_range="142-148" />
<code_context>
+// CloseAll marks all servers as draining immediately. The sweeper will close
+// each one after drainTimeout.
func CloseAll() {
mu.Lock()
defer mu.Unlock()
- for url, server := range servers {
- server.Instance.Close() //nolint: errcheck
- delete(servers, url)
+ now := time.Now()
+ for _, srv := range servers {
+ if srv.DrainedAt.IsZero() {
+ srv.DrainedAt = now
+ }
</code_context>
<issue_to_address>
**question (bug_risk):** CloseAll no longer guarantees synchronous shutdown of instances, which may surprise callers depending on previous semantics.
Previously this called `Instance.Close()` and removed each server entry before returning, so callers could rely on all instances being fully shut down. Now it only sets `DrainedAt` and returns, leaving `Close()` and deletion to the sweeper after `drainTimeout`, which can introduce races or resource conflicts for callers expecting immediate shutdown (e.g., port reuse, goroutine termination). If that synchronous guarantee is required, consider either keeping `CloseAll` blocking for shutdown (with an internal helper for async draining) or introducing a separate async API and preserving the original semantics here.
</issue_to_address>
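If the synchronous guarantee is required, the reviewer's first option can be sketched like this: keep `CloseAll` blocking, closing and removing every entry before returning, and leave async draining to a separate helper. This is an illustrative sketch (with `instance` as a hypothetical stand-in for `core.Instance`), not the PR's code.

```go
package main

import (
	"fmt"
	"sync"
)

// instance is a hypothetical stand-in for core.Instance.
type instance struct{ closed bool }

func (i *instance) Close() error { i.closed = true; return nil }

type Server struct{ Instance *instance }

var (
	mu      sync.Mutex
	servers = map[string]*Server{}
)

// CloseAll keeps the original blocking semantics: every instance is closed
// and removed before the call returns, so callers can rely on ports being
// free and goroutines being asked to stop. Deferred draining would live in
// a separate helper instead of changing this function's contract.
func CloseAll() {
	mu.Lock()
	defer mu.Unlock()
	for url, srv := range servers {
		srv.Instance.Close() //nolint:errcheck
		delete(servers, url)
	}
}

func main() {
	a, b := &instance{}, &instance{}
	servers["a"] = &Server{Instance: a}
	servers["b"] = &Server{Instance: b}
	CloseAll()
	fmt.Println(a.closed && b.closed, len(servers)) // prints: true 0
}
```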
Codecov Report

❌ Patch coverage is
Additional details and impacted files

@@ Coverage Diff @@
## main #79 +/- ##
========================================
+ Coverage 8.42% 8.79% +0.37%
========================================
Files 27 27
Lines 1508 1523 +15
========================================
+ Hits 127 134 +7
- Misses 1367 1374 +7
- Partials 14 15 +1
Flags with carried forward coverage won't be shown.
- Add DrainedAt field to Server struct for graceful draining
- Add background sweeper goroutine that closes drained xray instances after 30s grace period (gives in-flight ops time to finish)
- sweeper runs every 10s to close expired draining instances
- Close() marks instance as draining instead of immediate close
- CloseAll() marks all instances as draining for sweeper to clean up
- This prevents goroutine accumulation when xray-core's Close() does not fully terminate all internal goroutines on short-lived proxy tests
Force-pushed from fead215 to 6fbbbe0
Background
The `xray.go` servers map caches Xray instances keyed by proxy URL. When `Close()` is called, it immediately calls `Instance.Close()` and removes the entry. However, xray-core's goroutines may not fully terminate on short-lived proxy tests, causing goroutine accumulation over time. After 18 days of sustained proxy testing the process holds 1.5GB RSS with 4GB swap full.
Changes
- Add `DrainedAt` field to `Server` struct (zero = active, non-zero = draining since)
- Add `sweeper()` goroutine that runs every 10s and closes instances that have been draining for more than 30s
- `Close()` now marks the instance as draining instead of closing immediately
- `CloseAll()` marks all instances as draining for the sweeper to clean up
- `getServer()` revives a draining instance if accessed again within the grace period

This gives in-flight operations a graceful period to finish while ensuring no goroutine leaks accumulate over time.
Acceptance Criteria
- `Close()` and `CloseAll()` still work correctly

Summary by Sourcery
Introduce a delayed-drain mechanism for Xray server instances to prevent goroutine leaks and allow graceful shutdown of in-flight operations.
Bug Fixes:
Enhancements: