Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions cluster/router/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,7 @@ type RouterChain struct {

// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
func (c *RouterChain) Route(url *common.URL, invocation base.Invocation) []base.Invoker {
c.mutex.RLock()
invokers := c.invokers
c.mutex.RUnlock()

invokers := c.snapshotInvokers()
finalInvokers := make([]base.Invoker, 0, len(invokers))
// multiple invoker may include different methods, find correct invoker otherwise
// will return the invoker without methods
Expand Down Expand Up @@ -111,6 +108,15 @@ func (c *RouterChain) copyRouters() []router.PriorityRouter {
return ret
}

// snapshotInvokers copies the chain's current invokers into a freshly
// allocated slice while holding the read lock, so the caller can iterate the
// result without holding c.mutex.
func (c *RouterChain) snapshotInvokers() []base.Invoker {
	c.mutex.RLock()
	snapshot := append(make([]base.Invoker, 0, len(c.invokers)), c.invokers...)
	c.mutex.RUnlock()
	return snapshot
}

// injectStaticRouters injects static router configurations into the router chain.
// Called after all routers are created to ensure they exist.
// The injected static configs act as bootstrap state only during initialization. For the shared
Expand Down
140 changes: 105 additions & 35 deletions registry/directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@ type RegistryDirectory struct {
consumerURL *common.URL
cacheOriginUrl *common.URL
configurators []config_center.Configurator
configuratorsLock sync.RWMutex
consumerConfigurationListener *consumerConfigurationListener
referenceConfigurationListener *referenceConfigurationListener
registerLock sync.Mutex // this lock is for register
subscribedUrlLock sync.RWMutex
SubscribedUrl *common.URL
RegisteredUrl *common.URL
closingTombstones *sync.Map // map[string]closingTombstone
Expand Down Expand Up @@ -182,14 +184,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (director
// subscribe from registry
func (dir *RegistryDirectory) Subscribe(url *common.URL) error {
logger.Infof("Start subscribing for service :%s with a new go routine.", url.Key())

go func() {
dir.SubscribedUrl = url
if err := dir.registry.Subscribe(url, dir); err != nil {
logger.Error("registry.Subscribe(url:%v, dir:%v) = error:%v", url, dir, err)
}

}()
dir.setSubscribedURL(url)

// Get the timeout time from the registration center configuration (default time 5s)
registerUrl := dir.registry.GetURL()
Expand All @@ -208,26 +203,58 @@ func (dir *RegistryDirectory) Subscribe(url *common.URL) error {
timeout, _ = time.ParseDuration(constant.DefaultRegTimeout)
}

done := make(chan struct{})

serviceKey := url.Key()
go func() {
urlToReg := getConsumerUrlToRegistry(url)
err := dir.registry.Register(urlToReg)
if err != nil {
logger.Errorf("consumer service %v register registry %v error, error message is %v",
url.String(), dir.registry.GetURL().String(), err)
if err := dir.registry.Subscribe(url, dir); err != nil {
logger.Error("registry.Subscribe(url:%v, dir:%v) = error:%v", url, dir, err)
}
}()

// Registration is bounded by registry timeout (default 5s), but subscription
// stays decoupled from registration so discovery can continue even on register error.
if err := dir.registerConsumerWithTimeout(url, timeout, serviceKey); err != nil {
return err
}

close(done)
logger.Infof("register completed successfully for service: %s", serviceKey)
return nil
}

func (dir *RegistryDirectory) registerConsumerWithTimeout(url *common.URL, timeout time.Duration, serviceKey string) error {
registerErrCh := make(chan error, 1)
urlToReg := getConsumerUrlToRegistry(url.Clone())
go func() {
registerErrCh <- dir.registry.Register(urlToReg)
}()

timer := time.NewTimer(timeout)
defer timer.Stop()

select {
case <-done:
logger.Infof("register completed successfully for service: %s", url.Key())
case err := <-registerErrCh:
if err != nil {
registryURL := dir.registry.GetURL()
registryURLString := ""
if registryURL != nil {
registryURLString = registryURL.String()
}
logger.Errorf("consumer service %v register registry %v error, error message is %v",
url.String(), registryURLString, err)
return err
}
return nil
case <-time.After(timeout):
logger.Errorf("register timed out for service: %s", url.Key())
return fmt.Errorf("register timed out for service: %s", url.Key())
case <-timer.C:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的超时只是让调用方提前返回,后台 Register goroutine 还会继续跑。如果它在超时之后成功,consumer 实际上已经注册了,但 Subscribe 不会启动,后续也没有补偿 UnRegister,状态会变成“返回失败但部分初始化成功”。建议把取消能力下推到 registry.Register,或者在超时后处理 late success 并做回滚。

Copy link
Copy Markdown
Contributor Author

@xxs588 xxs588 Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感谢,已修改:registerConsumerWithTimeout() 超时返回后,如果 Register 迟到成功,则执行 UnRegister 回滚。
逻辑详见图片
图片

图片 `Subscribe` 因超时返回错误后,后台的 Register goroutine 可能还没有结束,所以再起一个 goroutine 等待它的结果:如果 `Register` 失败,则无需补偿,直接 return;如果成功,说明出现了“调用方收到超时失败,但实际上已经注册成功”的状态,此时执行补偿,回滚并注销这次迟到成功的注册;如果回滚失败,则补上日志方便定位。

logger.Errorf("register timed out for service: %s", serviceKey)
go func() {
err := <-registerErrCh
if err != nil {
return
}
if unRegErr := dir.registry.UnRegister(urlToReg.Clone()); unRegErr != nil {
logger.Warnf("register timed out for service %s, but late unregister failed: %v", serviceKey, unRegErr)
}
}()
return fmt.Errorf("register timed out for service: %s", serviceKey)
}
}

Expand Down Expand Up @@ -397,7 +424,7 @@ func (dir *RegistryDirectory) convertUrl(res *registry.ServiceEvent) *common.URL
ret := res.Service
if ret.Protocol == constant.OverrideProtocol || // 1.for override url in 2.6.x
ret.GetParam(constant.CategoryKey, constant.DefaultCategory) == constant.ConfiguratorsCategory {
dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(ret))
dir.appendConfigurator(extension.GetDefaultConfigurator(ret))
ret = nil
} else if ret.Protocol == constant.RouterProtocol || // 2.for router
ret.GetParam(constant.CategoryKey, constant.DefaultCategory) == constant.RouterCategory {
Expand Down Expand Up @@ -630,9 +657,7 @@ func (dir *RegistryDirectory) List(invocation protocolbase.Invocation) []protoco
routerChain := dir.RouterChain()

if routerChain == nil {
dir.invokersLock.RLock()
defer dir.invokersLock.RUnlock()
return dir.cacheInvokers
return dir.snapshotCacheInvokers()
}
return routerChain.Route(dir.consumerURL, invocation)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里虽然把 routerChain == nil 的分支换成了快照,但 routerChain.Route() 仍然会无锁读 RouterChain.invokers,同时 setNewInvokers() 又会在另一个 goroutine 里通过 SetInvokers() 写这份切片。当前 HEAD 直接跑 go test -race ./registry/directory ./registry/nacos 仍会在 Test_List 报 race。建议在 RouterChain 内部先做带锁 snapshot 再路由,只修 cacheInvokers 读取还不够。

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感谢建议,已修复,Route() 不再直接读 c.invokers,改为先走 snapshotInvokers() 拿快照再路由,对cluster/router全包进行了测试,通过
这是修复的地方
图片

}
Expand All @@ -643,7 +668,7 @@ func (dir *RegistryDirectory) IsAvailable() bool {
return false
}

for _, ivk := range dir.cacheInvokers {
for _, ivk := range dir.snapshotCacheInvokers() {
if ivk.IsAvailable() {
return true
}
Expand All @@ -657,22 +682,23 @@ func (dir *RegistryDirectory) Destroy() {
// TODO:unregister & unsubscribe
dir.DoDestroy(func() {
graceful_shutdown.DefaultClosingDirectoryRegistry().Unregister(dir.closingServiceKey(), dir)
if dir.RegisteredUrl != nil {
err := dir.registry.UnRegister(dir.RegisteredUrl)
registeredURL, subscribedURL := dir.snapshotRegistryURLs()

if registeredURL != nil {
err := dir.registry.UnRegister(registeredURL)
if err != nil {
logger.Warnf("Unregister consumer url failed, %s, error: %v", dir.RegisteredUrl.String(), err)
logger.Warnf("Unregister consumer url failed, %s, error: %v", registeredURL.String(), err)
}
}

if dir.SubscribedUrl != nil {
err := dir.registry.UnSubscribe(dir.SubscribedUrl, dir)
if subscribedURL != nil {
err := dir.registry.UnSubscribe(subscribedURL, dir)
if err != nil {
logger.Warnf("Unsubscribe consumer url failed, %s, error: %v", dir.RegisteredUrl.String(), err)
logger.Warnf("Unsubscribe consumer url failed, %s, error: %v", subscribedURL.String(), err)
}
}

invokers := dir.cacheInvokers
dir.cacheInvokers = []protocolbase.Invoker{}
invokers := dir.swapCacheInvokers()
for _, ivk := range invokers {
ivk.Destroy()
}
Expand All @@ -692,11 +718,55 @@ func (dir *RegistryDirectory) closingServiceKey() string {
}

func (dir *RegistryDirectory) overrideUrl(targetUrl *common.URL) {
doOverrideUrl(dir.configurators, targetUrl)
// Use a read-only snapshot to avoid sharing mutable configurator slice during overrides.
doOverrideUrl(dir.snapshotConfigurators(), targetUrl)
doOverrideUrl(dir.consumerConfigurationListener.Configurators(), targetUrl)
doOverrideUrl(dir.referenceConfigurationListener.Configurators(), targetUrl)
}

// snapshotCacheInvokers returns a newly allocated copy of dir.cacheInvokers.
// The copy is taken while holding the read side of invokersLock; the returned
// slice does not share its backing array with the cached one.
func (dir *RegistryDirectory) snapshotCacheInvokers() []protocolbase.Invoker {
	dir.invokersLock.RLock()
	snapshot := make([]protocolbase.Invoker, len(dir.cacheInvokers))
	copy(snapshot, dir.cacheInvokers)
	dir.invokersLock.RUnlock()
	return snapshot
}

// swapCacheInvokers copies the currently cached invokers into a new slice and
// resets dir.cacheInvokers to an empty slice. Both steps run while holding the
// write side of invokersLock. The copy is returned to the caller.
func (dir *RegistryDirectory) swapCacheInvokers() []protocolbase.Invoker {
	dir.invokersLock.Lock()
	defer dir.invokersLock.Unlock()

	drained := append(make([]protocolbase.Invoker, 0, len(dir.cacheInvokers)), dir.cacheInvokers...)
	dir.cacheInvokers = []protocolbase.Invoker{}
	return drained
}

// appendConfigurator appends configurator to dir.configurators while holding
// the write side of configuratorsLock.
func (dir *RegistryDirectory) appendConfigurator(configurator config_center.Configurator) {
	dir.configuratorsLock.Lock()
	dir.configurators = append(dir.configurators, configurator)
	dir.configuratorsLock.Unlock()
}

// snapshotConfigurators returns a newly allocated copy of dir.configurators,
// taken while holding the read side of configuratorsLock, so callers never
// share the directory's own slice.
func (dir *RegistryDirectory) snapshotConfigurators() []config_center.Configurator {
	dir.configuratorsLock.RLock()
	snapshot := append(make([]config_center.Configurator, 0, len(dir.configurators)), dir.configurators...)
	dir.configuratorsLock.RUnlock()
	return snapshot
}

// setSubscribedURL stores url in dir.SubscribedUrl while holding the write
// side of subscribedUrlLock.
func (dir *RegistryDirectory) setSubscribedURL(url *common.URL) {
	dir.subscribedUrlLock.Lock()
	dir.SubscribedUrl = url
	dir.subscribedUrlLock.Unlock()
}

// snapshotRegistryURLs returns the current values of dir.RegisteredUrl and
// dir.SubscribedUrl, both read while holding the read side of
// subscribedUrlLock. The pointers themselves are returned; the URLs are not
// cloned.
//
// NOTE(review): RegisteredUrl is read under subscribedUrlLock here; confirm
// that whoever writes RegisteredUrl takes the same lock.
func (dir *RegistryDirectory) snapshotRegistryURLs() (registeredURL *common.URL, subscribedURL *common.URL) {
	dir.subscribedUrlLock.RLock()
	registeredURL, subscribedURL = dir.RegisteredUrl, dir.SubscribedUrl
	dir.subscribedUrlLock.RUnlock()
	return registeredURL, subscribedURL
}

func (dir *RegistryDirectory) getConsumerUrl(c *common.URL) *common.URL {
processID := fmt.Sprintf("%d", os.Getpid())
localIP := common.GetLocalIp()
Expand Down
Loading
Loading