Skip to content

Commit 91d2a8b

Browse files
committed
Update.
1 parent 516f8e9 commit 91d2a8b

4 files changed

Lines changed: 60 additions & 26 deletions

File tree

cmd/dashboard/rpc/rpc.go

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,18 @@ func isGRPCTransportError(err error) bool {
4444
func ServeRPC(port uint) {
4545
// 配置 gRPC 服务器选项,防止 goroutine 泄漏和连接问题
4646
opts := []grpc.ServerOption{
47-
// 优化 keepalive 参数,减少broken pipe错误
47+
// 优化 keepalive 参数:使用长连接策略,减少重连
4848
grpc.KeepaliveParams(keepalive.ServerParameters{
49-
MaxConnectionIdle: 3 * time.Minute, // 减少到3分钟,更快检测断开连接
50-
MaxConnectionAge: 15 * time.Minute, // 增加到15分钟,减少频繁重连
51-
MaxConnectionAgeGrace: 60 * time.Second, // 增加优雅关闭时间到60秒
52-
Time: 20 * time.Second, // 减少到20秒,更频繁的心跳检测
53-
Timeout: 10 * time.Second, // 增加超时到10秒,避免网络抖动
49+
MaxConnectionIdle: 30 * time.Minute, // 空闲30分钟后断开(防止僵尸连接)
50+
MaxConnectionAge: 2 * time.Hour, // 连接最长2小时(定期刷新,防止资源泄漏)
51+
MaxConnectionAgeGrace: 30 * time.Second, // 优雅关闭时间30秒
52+
Time: 30 * time.Second, // 每30秒发送心跳
53+
Timeout: 10 * time.Second, // 心跳超时10秒
5454
}),
5555
// 优化 keepalive 强制策略
5656
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
57-
MinTime: 5 * time.Second, // 减少到5秒,允许更频繁的keepalive
58-
PermitWithoutStream: true, // 允许没有活跃流时发送keepalive
57+
MinTime: 10 * time.Second, // 最小心跳间隔10秒
58+
PermitWithoutStream: true, // 允许没有活跃流时发送keepalive
5959
}),
6060
// 设置最大接收消息大小
6161
grpc.MaxRecvMsgSize(4 * 1024 * 1024), // 4MB
@@ -129,8 +129,19 @@ func DispatchTask(serviceSentinelDispatchBus <-chan model.Monitor) {
129129
workedServerIndex++
130130
continue
131131
}
132+
133+
// 安全发送任务的辅助函数,防止竞态条件下的nil pointer dereference
134+
sendTask := func() error {
135+
// 再次检查TaskStream是否为nil(防止竞态条件)
136+
stream := currentServer.TaskStream
137+
if stream == nil {
138+
return nil // 连接已断开,静默跳过
139+
}
140+
return stream.Send(task.PB())
141+
}
142+
132143
if task.Cover == model.MonitorCoverIgnoreAll && skipServers[currentServer.ID] {
133-
if err := currentServer.TaskStream.Send(task.PB()); err != nil {
144+
if err := sendTask(); err != nil {
134145
// 清理失效的连接
135146
currentServer.TaskStream = nil
136147
// 只在非正常网络错误时记录日志
@@ -142,7 +153,7 @@ func DispatchTask(serviceSentinelDispatchBus <-chan model.Monitor) {
142153
continue
143154
}
144155
if task.Cover == model.MonitorCoverAll && !skipServers[currentServer.ID] {
145-
if err := currentServer.TaskStream.Send(task.PB()); err != nil {
156+
if err := sendTask(); err != nil {
146157
// 清理失效的连接
147158
currentServer.TaskStream = nil
148159
// 只在非正常网络错误时记录日志
@@ -154,7 +165,7 @@ func DispatchTask(serviceSentinelDispatchBus <-chan model.Monitor) {
154165
continue
155166
}
156167
// 找到合适机器执行任务,跳出循环
157-
if err := currentServer.TaskStream.Send(task.PB()); err != nil {
168+
if err := sendTask(); err != nil {
158169
// 清理失效的连接
159170
currentServer.TaskStream = nil
160171
// 只在非正常网络错误时记录日志
@@ -174,11 +185,16 @@ func DispatchKeepalive() {
174185
singleton.SortedServerLock.RLock()
175186
defer singleton.SortedServerLock.RUnlock()
176187
for i := 0; i < len(singleton.SortedServerList); i++ {
177-
if singleton.SortedServerList[i] == nil || singleton.SortedServerList[i].TaskStream == nil {
188+
server := singleton.SortedServerList[i]
189+
if server == nil {
178190
continue
179191
}
180-
181-
singleton.SortedServerList[i].TaskStream.Send(&pb.Task{Type: model.TaskTypeKeepalive})
192+
// 防止竞态条件下的nil pointer dereference
193+
stream := server.TaskStream
194+
if stream == nil {
195+
continue
196+
}
197+
stream.Send(&pb.Task{Type: model.TaskTypeKeepalive})
182198
}
183199
})
184200
}

model/alertrule.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,18 @@ func (r *AlertRule) Check(points [][]interface{}) (int, bool) {
126126
count++
127127
}
128128
}
129+
} else if r.Rules[i].Type == "offline" {
130+
// 离线规则特殊处理:Duration 表示离线秒数阈值,不是采样点数
131+
// 只需要检查最新的采样点即可
132+
if maxNum < 1 {
133+
maxNum = 1
134+
}
135+
if len(points) > 0 && i < len(points[len(points)-1]) {
136+
// 检查最新采样点,如果不为 nil 说明已触发离线检测
137+
if points[len(points)-1][i] != nil {
138+
count++
139+
}
140+
}
129141
} else {
130142
// 常规报警
131143
total := 0.0

model/rule.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,13 @@ func (u *Rule) Snapshot(cycleTransferStats *CycleTransferStats, server *Server,
297297

298298
if u.Type == "offline" {
299299
// 修复离线检测逻辑:只有曾经上线过且当前离线的服务器才能触发离线通知
300-
if !server.LastActive.IsZero() && !server.IsOnline && float64(time.Now().Unix())-src > 6 {
300+
// 使用用户配置的 Duration(秒)作为离线阈值,如果未设置则默认 60 秒
301+
offlineThreshold := float64(u.Duration)
302+
if offlineThreshold <= 0 {
303+
offlineThreshold = 60 // 默认 60 秒
304+
}
305+
offlineSeconds := float64(time.Now().Unix()) - src
306+
if !server.LastActive.IsZero() && !server.IsOnline && offlineSeconds > offlineThreshold {
301307
return struct{}{}
302308
}
303309
// 从未上线的服务器或当前在线的服务器不触发离线通知

service/rpc/server.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"runtime"
1010
"strings"
1111
"sync"
12+
"sync/atomic"
1213
"time"
1314

1415
"github.com/nicksnyder/go-i18n/v2/i18n"
@@ -50,7 +51,7 @@ func isConnectionError(err error) bool {
5051

5152
// GetGoroutineStats 获取 goroutine 统计信息
5253
func GetGoroutineStats() (total int, requestTasks int64) {
53-
return runtime.NumGoroutine(), activeRequestTaskGoroutines
54+
return runtime.NumGoroutine(), atomic.LoadInt64(&activeRequestTaskGoroutines)
5455
}
5556

5657
// ForceCleanupStaleConnections 强制清理僵尸连接(紧急情况使用)
@@ -224,7 +225,7 @@ func (s *ServerHandler) RequestTask(h *pb.Host, stream pb.ServerService_RequestT
224225
// 检查 goroutine 数量限制,防止 goroutine 泄漏导致程序崩溃
225226
currentGoroutines := func() int64 {
226227
// 使用原子操作获取当前活跃的 RequestTask goroutine 数量
227-
current := activeRequestTaskGoroutines
228+
current := atomic.LoadInt64(&activeRequestTaskGoroutines)
228229
total := int64(runtime.NumGoroutine())
229230

230231
// 修复根本问题后,使用更合理的阈值
@@ -270,11 +271,11 @@ func (s *ServerHandler) RequestTask(h *pb.Host, stream pb.ServerService_RequestT
270271
return fmt.Errorf("服务器负载过高,请稍后重试")
271272
}
272273

273-
// 增加活跃 goroutine 计数
274-
activeRequestTaskGoroutines++
274+
// 增加活跃 goroutine 计数(使用原子操作)
275+
atomic.AddInt64(&activeRequestTaskGoroutines, 1)
275276
defer func() {
276-
// 确保在函数退出时减少计数
277-
activeRequestTaskGoroutines--
277+
// 确保在函数退出时减少计数(使用原子操作)
278+
atomic.AddInt64(&activeRequestTaskGoroutines, -1)
278279
}()
279280

280281
// 使用带缓冲的通道避免阻塞
@@ -301,10 +302,9 @@ func (s *ServerHandler) RequestTask(h *pb.Host, stream pb.ServerService_RequestT
301302
singleton.ServerList[clientID].TaskCloseLock.Unlock()
302303
singleton.ServerLock.RUnlock()
303304

304-
// 创建一个带超时的上下文,确保所有goroutine都能正确退出
305-
// 修复:使用更合理的超时时间,避免正常连接被误杀
306-
ctx, cancel := context.WithTimeout(stream.Context(), 5*time.Minute)
307-
defer cancel()
305+
// 使用stream自带的context,不设置额外超时
306+
// 让gRPC的keepalive机制和客户端自己管理连接生命周期
307+
ctx := stream.Context()
308308

309309
// 根本修复:不再创建额外的监控goroutine
310310
// 所有逻辑都在主goroutine中处理,避免goroutine泄漏
@@ -1112,7 +1112,7 @@ func checkAndResetCycleTraffic(clientID uint64) {
11121112

11131113
// GetConnectionStats 获取连接统计信息
11141114
func GetConnectionStats() (int, int, error) {
1115-
activeConns := int(activeRequestTaskGoroutines)
1115+
activeConns := int(atomic.LoadInt64(&activeRequestTaskGoroutines))
11161116
maxConns := int(maxRequestTaskGoroutines)
11171117
return activeConns, maxConns, nil
11181118
}

0 commit comments

Comments
 (0)