Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions internal/arq/arq.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,15 @@ type queuedDataNackRemover interface {
type Logger interface {
Debugf(format string, args ...any)
Infof(format string, args ...any)
Warnf(format string, args ...any)
Errorf(format string, args ...any)
}

type dummyLogger struct{}

func (d *dummyLogger) Debugf(f string, a ...any) {}
func (d *dummyLogger) Infof(f string, a ...any) {}
func (d *dummyLogger) Warnf(f string, a ...any) {}
func (d *dummyLogger) Errorf(f string, a ...any) {}

type arqDataItem struct {
Expand Down Expand Up @@ -1899,6 +1901,7 @@ func (a *ARQ) checkRetransmits() {
}
} else if now.Sub(info.CreatedAt) >= a.dataPacketTTL && info.Retries >= a.maxDataRetries {
a.mu.Unlock()
a.logger.Debugf("⚠️ <yellow>ARQ max retransmissions <magenta>|</magenta> Stream: <cyan>%d</cyan> <magenta>|</magenta> Session: <cyan>%d</cyan> <magenta>|</magenta> SN: <cyan>%d</cyan></yellow>", a.streamID, a.sessionID, sn)
a.Close("Max retransmissions exceeded", CloseOptions{SendRST: true})
return
}
Expand Down Expand Up @@ -2103,7 +2106,9 @@ func (a *ARQ) handleTerminalRetransmitState(now time.Time) bool {
return false
}

idleDur := now.Sub(a.lastActivity)
a.mu.Unlock()
a.logger.Debugf("⏰ <yellow>ARQ inactivity timeout <magenta>|</magenta> Stream: <cyan>%d</cyan> <magenta>|</magenta> Session: <cyan>%d</cyan> <magenta>|</magenta> Idle: <cyan>%s</cyan></yellow>", a.streamID, a.sessionID, idleDur.Truncate(time.Second))
a.Close("Stream Inactivity Timeout (Dead)", CloseOptions{SendRST: true})
return true
}
Expand Down Expand Up @@ -2146,6 +2151,7 @@ func (a *ARQ) checkControlRetransmits(now time.Time) {
if exceededRetries {
reason = "Control packet max retransmissions exceeded"
}
a.logger.Debugf("⚠️ <yellow>ARQ control expired <magenta>|</magenta> Stream: <cyan>%d</cyan> <magenta>|</magenta> Type: <cyan>0x%02x</cyan> <magenta>|</magenta> Reason: <cyan>%s</cyan></yellow>", a.streamID, info.PacketType, reason)
a.mu.Unlock()
a.handleTrackedPacketTTLExpiry(info.PacketType, reason)
a.mu.Lock()
Expand All @@ -2166,6 +2172,7 @@ func (a *ARQ) checkControlRetransmits(now time.Time) {
continue
}

a.logger.Debugf("🔄 <yellow>ARQ control retransmit <magenta>|</magenta> Stream: <cyan>%d</cyan> <magenta>|</magenta> Type: <cyan>0x%02x</cyan> <magenta>|</magenta> Retry: <cyan>%d</cyan></yellow>", a.streamID, info.PacketType, info.Retries+1)
info.LastSentAt = now
info.Dispatched = false
info.Retries++
Expand Down
1 change: 1 addition & 0 deletions internal/arq/arq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type testLogger struct {

func (l *testLogger) Debugf(format string, args ...any) { l.t.Logf("[DEBUG] "+format, args...) }
func (l *testLogger) Infof(format string, args ...any) { l.t.Logf("[INFO] "+format, args...) }
func (l *testLogger) Warnf(format string, args ...any) { l.t.Logf("[WARN] "+format, args...) }
func (l *testLogger) Errorf(format string, args ...any) { l.t.Logf("[ERROR] "+format, args...) }

type eofAfterDataConn struct {
Expand Down
73 changes: 72 additions & 1 deletion internal/client/async_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"masterdnsvpn-go/internal/client/handlers"
DnsParser "masterdnsvpn-go/internal/dnsparser"
fragmentStore "masterdnsvpn-go/internal/fragmentstore"
"masterdnsvpn-go/internal/logger"
)

const clientRXDropLogInterval = 2 * time.Second
Expand Down Expand Up @@ -296,7 +297,11 @@ func (c *Client) StartAsyncRuntime(parentCtx context.Context) error {
}()
}

// 10. Lifecycle cleanup.
// 10. Periodic stats logger.
c.asyncWG.Add(1)
go c.asyncStatsLogger(runtimeCtx)

// 11. Lifecycle cleanup.
c.asyncWG.Add(1)
go func() {
defer c.asyncWG.Done()
Expand Down Expand Up @@ -379,6 +384,72 @@ func (c *Client) asyncStreamCleanupWorker(ctx context.Context) {
}
}

// asyncStatsLogger periodically logs connection and stream statistics.
func (c *Client) asyncStatsLogger(ctx context.Context) {
defer c.asyncWG.Done()

ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
c.logPeriodicStats()
}
}
}

func (c *Client) logPeriodicStats() {
if c.log == nil {
return
}

// Stream stats
c.streamsMu.RLock()
total := len(c.active_streams)
active, draining, pending := 0, 0, 0
for _, s := range c.active_streams {
if s == nil {
continue
}
switch s.StatusValue() {
case streamStatusActive:
active++
case streamStatusDraining, streamStatusClosing, streamStatusTimeWait:
draining++
case streamStatusPending, streamStatusSocksConnecting:
pending++
}
}
c.streamsMu.RUnlock()

// Balancer stats
bs := c.balancer.Stats()

c.log.Debugf(
"📊 <green>Stats <magenta>|</magenta> Streams: <cyan>%d</cyan> active, <cyan>%d</cyan> pending, <cyan>%d</cyan> draining (<cyan>%d</cyan> total) <magenta>|</magenta> Resolvers: <cyan>%d</cyan>/<cyan>%d</cyan> valid</green>",
active, pending, draining, total, bs.Valid, bs.Total,
)

if c.log.Enabled(logger.LevelDebug) {
for _, e := range bs.Entries {
var lossStr string
if e.Sent > 0 {
lossPct := float64(e.Sent-e.Acked) * 100 / float64(e.Sent)
lossStr = fmt.Sprintf("%.1f%%", lossPct)
} else {
lossStr = "N/A"
}
c.log.Debugf(
"📊 <cyan>Resolver %s <magenta>|</magenta> Valid: <cyan>%t</cyan> <magenta>|</magenta> Sent: <cyan>%d</cyan> <magenta>|</magenta> Acked: <cyan>%d</cyan> <magenta>|</magenta> Loss: <cyan>%s</cyan> <magenta>|</magenta> AvgRTT: <cyan>%dms</cyan></cyan>",
e.Key, e.Valid, e.Sent, e.Acked, lossStr, e.AvgRTTMicro/1000,
)
}
}
}

// drainQueues removes any stale packets from TX and RX channels.
// Buffers from the RX channel are returned to the pool to prevent leaks.
func (c *Client) drainQueues() {
Expand Down
51 changes: 51 additions & 0 deletions internal/client/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,3 +596,54 @@ func xorshift64(v uint64) uint64 {
v ^= v << 17
return v
}

// ConnectionStatEntry holds per-resolver stats for periodic reporting.
type ConnectionStatEntry struct {
Key string
Valid bool
Sent uint64
Acked uint64
AvgRTTMicro uint64
}

// BalancerStats holds aggregated snapshot stats.
type BalancerStats struct {
Total int
Valid int
Entries []ConnectionStatEntry
}

// Stats returns a snapshot of per-connection statistics.
func (b *Balancer) Stats() BalancerStats {
snap := b.snapshot.Load()
if snap == nil {
return BalancerStats{}
}

entries := make([]ConnectionStatEntry, 0, len(snap.connections))
for idx, conn := range snap.connections {
if conn == nil {
continue
}
entry := ConnectionStatEntry{
Key: conn.Key,
Valid: conn.IsValid,
}
if idx < len(snap.stats) && snap.stats[idx] != nil {
s := snap.stats[idx]
entry.Sent = s.sent.Load()
entry.Acked = s.acked.Load()
cnt := s.rttCount.Load()
if cnt > 0 {
entry.AvgRTTMicro = s.rttMicrosSum.Load() / cnt
}
}
entries = append(entries, entry)
}

return BalancerStats{
Total: len(snap.connections),
Valid: len(snap.valid),
Entries: entries,
}
}
26 changes: 25 additions & 1 deletion internal/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ func (c *Client) InitializeSession(maxAttempts int) error {
}
}

if c.log != nil {
c.log.Debugf("⚠️ <yellow>Session init failed after <cyan>%d</cyan> attempts</yellow>", maxAttempts)
}
return ErrSessionInitFailed
}

Expand All @@ -62,11 +65,17 @@ func (c *Client) initializeSessionOnce() error {

query, err := c.buildSessionQuery(conn.Domain, Enums.PACKET_SESSION_INIT, initPayload)
if err != nil {
if c.log != nil {
c.log.Debugf("⚠️ <yellow>Session init query build failed: <cyan>%v</cyan></yellow>", err)
}
return ErrSessionInitFailed
}

packet, err := c.exchangeDNSOverConnection(conn, query, c.mtuTestTimeout)
if err != nil {
if c.log != nil {
c.log.Debugf("🔄 <yellow>Session init DNS exchange failed: <cyan>%v</cyan></yellow>", err)
}
return ErrSessionInitFailed
}

Expand All @@ -75,7 +84,11 @@ func (c *Client) initializeSessionOnce() error {
if len(packet.Payload) < sessionBusyPayloadSize || !bytes.Equal(packet.Payload[:sessionBusyPayloadSize], verifyCode[:]) {
return ErrSessionInitFailed
}
c.setSessionInitBusyUntil(time.Now().Add(c.cfg.SessionInitBusyRetryInterval()))
busyRetry := c.cfg.SessionInitBusyRetryInterval()
c.setSessionInitBusyUntil(time.Now().Add(busyRetry))
if c.log != nil {
c.log.Debugf("⚠️ <yellow>Server busy, retry after <cyan>%s</cyan></yellow>", busyRetry)
}
return ErrSessionInitBusy
case Enums.PACKET_SESSION_ACCEPT:
if len(packet.Payload) < sessionAcceptPayloadSize || !bytes.Equal(packet.Payload[3:7], verifyCode[:]) {
Expand All @@ -91,6 +104,14 @@ func (c *Client) initializeSessionOnce() error {
c.clearSessionInitBusyUntil()
c.resetSessionInitState()
c.clearSessionResetPending()
if c.log != nil {
c.log.Debugf(
"✅ <green>Session Accepted <magenta>|</magenta> ID: <cyan>%d</cyan> <magenta>|</magenta> Cookie: <cyan>%d</cyan> <magenta>|</magenta> Upload: <cyan>%s</cyan> <magenta>|</magenta> Download: <cyan>%s</cyan></green>",
c.sessionID, c.sessionCookie,
compression.TypeName(c.uploadCompression),
compression.TypeName(c.downloadCompression),
)
}
return nil
default:
return ErrSessionInitFailed
Expand Down Expand Up @@ -319,6 +340,9 @@ func (c *Client) sendSessionCloseRound(targets []Connection, deadline time.Time)
PacketType: Enums.PACKET_SESSION_CLOSE,
})
if err != nil {
if c.log != nil {
c.log.Debugf("⚠️ <yellow>Session close query build failed: <cyan>%v</cyan></yellow>", err)
}
return
}
c.sendOneWayDNSQuery(conn, query, deadline)
Expand Down
9 changes: 8 additions & 1 deletion internal/client/socks_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ func (c *Client) supportsSOCKS4() bool {
func (c *Client) HandleSOCKS5(ctx context.Context, conn net.Conn) {
version := make([]byte, 1)
if _, err := io.ReadFull(conn, version); err != nil {
if c.log != nil {
c.log.Debugf("🔌 <yellow>SOCKS handshake read failed: <cyan>%v</cyan></yellow>", err)
}
_ = conn.Close()
return
}
Expand All @@ -83,6 +86,9 @@ func (c *Client) HandleSOCKS5(ctx context.Context, conn net.Conn) {
}
c.handleSOCKS4Request(ctx, conn)
default:
if c.log != nil {
c.log.Debugf("🔌 <yellow>SOCKS unknown version: <cyan>0x%02x</cyan></yellow>", version[0])
}
_ = conn.Close()
}
}
Expand Down Expand Up @@ -662,7 +668,7 @@ func (c *Client) HandleSocksConnected(packet VpnProto.Packet) error {
arqObj.SetIOReady(true)
}

c.log.Debugf("🔌 <green>Socks successfully connected for stream %d</green>", packet.StreamID)
c.log.Debugf("🔌 <green>SOCKS connected <magenta>|</magenta> Stream: <cyan>%d</cyan></green>", packet.StreamID)
return nil
}

Expand Down Expand Up @@ -706,6 +712,7 @@ func (c *Client) HandleSocksFailure(packet VpnProto.Packet) error {
return nil
}

c.log.Debugf("🔌 <yellow>SOCKS failure <magenta>|</magenta> Stream: <cyan>%d</cyan> <magenta>|</magenta> PacketType: <cyan>0x%02x</cyan></yellow>", packet.StreamID, packet.PacketType)
arqObj.Close("SOCKS failure received", arq.CloseOptions{Force: true})
return nil
}
Expand Down
18 changes: 18 additions & 0 deletions internal/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,13 @@ func (c *Client) new_stream(streamID uint16, conn net.Conn, targetPayload []byte
c.ensureStreamPreferredConnection(s)
}

if c.log != nil && streamID != 0 {
c.log.Debugf(
"🧦 <green>Stream Created <magenta>|</magenta> Stream: <cyan>%d</cyan> <magenta>|</magenta> Session: <cyan>%d</cyan> <magenta>|</magenta> Status: <cyan>%s</cyan></green>",
streamID, c.sessionID, s.StatusValue(),
)
}

return s
}

Expand Down Expand Up @@ -327,6 +334,13 @@ func (s *Stream_client) RemoveQueuedDataNack(sequenceNum uint16) bool {
}

func (s *Stream_client) cleanupResources() {
if s.client != nil && s.client.log != nil && s.StreamID != 0 {
s.client.log.Debugf(
"🧹 <yellow>Stream Cleanup <magenta>|</magenta> Stream: <cyan>%d</cyan> <magenta>|</magenta> Session: <cyan>%d</cyan></yellow>",
s.StreamID, s.client.sessionID,
)
}

if s.NetConn != nil {
_ = s.NetConn.Close()
}
Expand Down Expand Up @@ -612,6 +626,10 @@ func (c *Client) CloseAllStreams() {
c.streamsMu.Unlock()
c.bumpStreamSetVersion()

if c.log != nil {
c.log.Debugf("🧦 <yellow>Closing all streams <magenta>|</magenta> Count: <cyan>%d</cyan></yellow>", len(streams))
}

for _, s := range streams {
if s != nil {
s.Close()
Expand Down
34 changes: 34 additions & 0 deletions internal/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,40 @@ func (l *Logger) Enabled(level int) bool {
return l != nil && level >= l.level
}

// SubLogger wraps a Logger with a pre-rendered context prefix (e.g. "[Sess:3] [Str:42]").
// It satisfies the arq.Logger interface and can be chained via With().
type SubLogger struct {
parent *Logger
prefix string
}

// With creates a SubLogger with a single [key:value] context field.
func (l *Logger) With(key, value string) *SubLogger {
return &SubLogger{parent: l, prefix: "[" + key + ":" + value + "]"}
}

// With chains an additional [key:value] context field onto an existing SubLogger.
func (s *SubLogger) With(key, value string) *SubLogger {
return &SubLogger{parent: s.parent, prefix: s.prefix + " [" + key + ":" + value + "]"}
}

func (s *SubLogger) Debugf(format string, args ...any) {
s.parent.logf(levelDebug, s.prefix+" "+format, args...)
}
func (s *SubLogger) Infof(format string, args ...any) {
s.parent.logf(levelInfo, s.prefix+" "+format, args...)
}
func (s *SubLogger) Warnf(format string, args ...any) {
s.parent.logf(levelWarn, s.prefix+" "+format, args...)
}
func (s *SubLogger) Errorf(format string, args ...any) {
s.parent.logf(levelError, s.prefix+" "+format, args...)
}

func (s *SubLogger) Enabled(level int) bool {
return s.parent.Enabled(level)
}

func stripColorTags(text string) string {
start := strings.IndexByte(text, '<')
if start == -1 {
Expand Down
Loading