-
Notifications
You must be signed in to change notification settings - Fork 1k
fix: close #3247 nacos/directory lifecycle race risks #3270
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9cd31ff
99fc172
86e191d
bd9badc
beda059
2a7102a
860efc6
a041ff6
3314048
a121054
1a9c618
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -70,9 +70,11 @@ type RegistryDirectory struct { | |
| consumerURL *common.URL | ||
| cacheOriginUrl *common.URL | ||
| configurators []config_center.Configurator | ||
| configuratorsLock sync.RWMutex | ||
| consumerConfigurationListener *consumerConfigurationListener | ||
| referenceConfigurationListener *referenceConfigurationListener | ||
| registerLock sync.Mutex // this lock if for register | ||
| subscribedUrlLock sync.RWMutex | ||
| SubscribedUrl *common.URL | ||
| RegisteredUrl *common.URL | ||
| closingTombstones *sync.Map // map[string]closingTombstone | ||
|
|
@@ -182,14 +184,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (director | |
| // subscribe from registry | ||
| func (dir *RegistryDirectory) Subscribe(url *common.URL) error { | ||
| logger.Infof("Start subscribing for service :%s with a new go routine.", url.Key()) | ||
|
|
||
| go func() { | ||
| dir.SubscribedUrl = url | ||
| if err := dir.registry.Subscribe(url, dir); err != nil { | ||
| logger.Error("registry.Subscribe(url:%v, dir:%v) = error:%v", url, dir, err) | ||
| } | ||
|
|
||
| }() | ||
| dir.setSubscribedURL(url) | ||
|
|
||
| // Get the timeout time from the registration center configuration (default time 5s) | ||
| registerUrl := dir.registry.GetURL() | ||
|
|
@@ -208,26 +203,58 @@ func (dir *RegistryDirectory) Subscribe(url *common.URL) error { | |
| timeout, _ = time.ParseDuration(constant.DefaultRegTimeout) | ||
| } | ||
|
|
||
| done := make(chan struct{}) | ||
|
|
||
| serviceKey := url.Key() | ||
| go func() { | ||
| urlToReg := getConsumerUrlToRegistry(url) | ||
| err := dir.registry.Register(urlToReg) | ||
| if err != nil { | ||
| logger.Errorf("consumer service %v register registry %v error, error message is %v", | ||
| url.String(), dir.registry.GetURL().String(), err) | ||
| if err := dir.registry.Subscribe(url, dir); err != nil { | ||
| logger.Error("registry.Subscribe(url:%v, dir:%v) = error:%v", url, dir, err) | ||
| } | ||
| }() | ||
|
|
||
| // Registration is bounded by registry timeout (default 5s), but subscription | ||
| // stays decoupled from registration so discovery can continue even on register error. | ||
| if err := dir.registerConsumerWithTimeout(url, timeout, serviceKey); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| close(done) | ||
| logger.Infof("register completed successfully for service: %s", serviceKey) | ||
| return nil | ||
| } | ||
|
|
||
| func (dir *RegistryDirectory) registerConsumerWithTimeout(url *common.URL, timeout time.Duration, serviceKey string) error { | ||
| registerErrCh := make(chan error, 1) | ||
| urlToReg := getConsumerUrlToRegistry(url.Clone()) | ||
| go func() { | ||
| registerErrCh <- dir.registry.Register(urlToReg) | ||
| }() | ||
|
|
||
| timer := time.NewTimer(timeout) | ||
| defer timer.Stop() | ||
|
|
||
| select { | ||
| case <-done: | ||
| logger.Infof("register completed successfully for service: %s", url.Key()) | ||
| case err := <-registerErrCh: | ||
| if err != nil { | ||
| registryURL := dir.registry.GetURL() | ||
| registryURLString := "" | ||
| if registryURL != nil { | ||
| registryURLString = registryURL.String() | ||
| } | ||
| logger.Errorf("consumer service %v register registry %v error, error message is %v", | ||
| url.String(), registryURLString, err) | ||
| return err | ||
| } | ||
| return nil | ||
| case <-time.After(timeout): | ||
| logger.Errorf("register timed out for service: %s", url.Key()) | ||
| return fmt.Errorf("register timed out for service: %s", url.Key()) | ||
| case <-timer.C: | ||
| logger.Errorf("register timed out for service: %s", serviceKey) | ||
| go func() { | ||
| err := <-registerErrCh | ||
| if err != nil { | ||
| return | ||
| } | ||
| if unRegErr := dir.registry.UnRegister(urlToReg.Clone()); unRegErr != nil { | ||
| logger.Warnf("register timed out for service %s, but late unregister failed: %v", serviceKey, unRegErr) | ||
| } | ||
| }() | ||
| return fmt.Errorf("register timed out for service: %s", serviceKey) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -397,7 +424,7 @@ func (dir *RegistryDirectory) convertUrl(res *registry.ServiceEvent) *common.URL | |
| ret := res.Service | ||
| if ret.Protocol == constant.OverrideProtocol || // 1.for override url in 2.6.x | ||
| ret.GetParam(constant.CategoryKey, constant.DefaultCategory) == constant.ConfiguratorsCategory { | ||
| dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(ret)) | ||
| dir.appendConfigurator(extension.GetDefaultConfigurator(ret)) | ||
| ret = nil | ||
| } else if ret.Protocol == constant.RouterProtocol || // 2.for router | ||
| ret.GetParam(constant.CategoryKey, constant.DefaultCategory) == constant.RouterCategory { | ||
|
|
@@ -630,9 +657,7 @@ func (dir *RegistryDirectory) List(invocation protocolbase.Invocation) []protoco | |
| routerChain := dir.RouterChain() | ||
|
|
||
| if routerChain == nil { | ||
| dir.invokersLock.RLock() | ||
| defer dir.invokersLock.RUnlock() | ||
| return dir.cacheInvokers | ||
| return dir.snapshotCacheInvokers() | ||
| } | ||
| return routerChain.Route(dir.consumerURL, invocation) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这里虽然把
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| } | ||
|
|
@@ -643,7 +668,7 @@ func (dir *RegistryDirectory) IsAvailable() bool { | |
| return false | ||
| } | ||
|
|
||
| for _, ivk := range dir.cacheInvokers { | ||
| for _, ivk := range dir.snapshotCacheInvokers() { | ||
| if ivk.IsAvailable() { | ||
| return true | ||
| } | ||
|
|
@@ -657,22 +682,23 @@ func (dir *RegistryDirectory) Destroy() { | |
| // TODO:unregister & unsubscribe | ||
| dir.DoDestroy(func() { | ||
| graceful_shutdown.DefaultClosingDirectoryRegistry().Unregister(dir.closingServiceKey(), dir) | ||
| if dir.RegisteredUrl != nil { | ||
| err := dir.registry.UnRegister(dir.RegisteredUrl) | ||
| registeredURL, subscribedURL := dir.snapshotRegistryURLs() | ||
|
|
||
| if registeredURL != nil { | ||
| err := dir.registry.UnRegister(registeredURL) | ||
| if err != nil { | ||
| logger.Warnf("Unregister consumer url failed, %s, error: %v", dir.RegisteredUrl.String(), err) | ||
| logger.Warnf("Unregister consumer url failed, %s, error: %v", registeredURL.String(), err) | ||
| } | ||
| } | ||
|
|
||
| if dir.SubscribedUrl != nil { | ||
| err := dir.registry.UnSubscribe(dir.SubscribedUrl, dir) | ||
| if subscribedURL != nil { | ||
| err := dir.registry.UnSubscribe(subscribedURL, dir) | ||
| if err != nil { | ||
| logger.Warnf("Unsubscribe consumer url failed, %s, error: %v", dir.RegisteredUrl.String(), err) | ||
| logger.Warnf("Unsubscribe consumer url failed, %s, error: %v", subscribedURL.String(), err) | ||
| } | ||
| } | ||
|
|
||
| invokers := dir.cacheInvokers | ||
| dir.cacheInvokers = []protocolbase.Invoker{} | ||
| invokers := dir.swapCacheInvokers() | ||
| for _, ivk := range invokers { | ||
| ivk.Destroy() | ||
| } | ||
|
|
@@ -692,11 +718,55 @@ func (dir *RegistryDirectory) closingServiceKey() string { | |
| } | ||
|
|
||
| func (dir *RegistryDirectory) overrideUrl(targetUrl *common.URL) { | ||
| doOverrideUrl(dir.configurators, targetUrl) | ||
| // Use a read-only snapshot to avoid sharing mutable configurator slice during overrides. | ||
| doOverrideUrl(dir.snapshotConfigurators(), targetUrl) | ||
| doOverrideUrl(dir.consumerConfigurationListener.Configurators(), targetUrl) | ||
| doOverrideUrl(dir.referenceConfigurationListener.Configurators(), targetUrl) | ||
| } | ||
|
|
||
| func (dir *RegistryDirectory) snapshotCacheInvokers() []protocolbase.Invoker { | ||
| dir.invokersLock.RLock() | ||
| defer dir.invokersLock.RUnlock() | ||
| invokers := make([]protocolbase.Invoker, len(dir.cacheInvokers)) | ||
| copy(invokers, dir.cacheInvokers) | ||
| return invokers | ||
| } | ||
|
|
||
| func (dir *RegistryDirectory) swapCacheInvokers() []protocolbase.Invoker { | ||
| dir.invokersLock.Lock() | ||
| defer dir.invokersLock.Unlock() | ||
| invokers := make([]protocolbase.Invoker, len(dir.cacheInvokers)) | ||
| copy(invokers, dir.cacheInvokers) | ||
| dir.cacheInvokers = []protocolbase.Invoker{} | ||
| return invokers | ||
| } | ||
|
|
||
| func (dir *RegistryDirectory) appendConfigurator(configurator config_center.Configurator) { | ||
| dir.configuratorsLock.Lock() | ||
| defer dir.configuratorsLock.Unlock() | ||
| dir.configurators = append(dir.configurators, configurator) | ||
| } | ||
|
|
||
| func (dir *RegistryDirectory) snapshotConfigurators() []config_center.Configurator { | ||
| dir.configuratorsLock.RLock() | ||
| defer dir.configuratorsLock.RUnlock() | ||
| configurators := make([]config_center.Configurator, len(dir.configurators)) | ||
| copy(configurators, dir.configurators) | ||
| return configurators | ||
| } | ||
|
|
||
| func (dir *RegistryDirectory) setSubscribedURL(url *common.URL) { | ||
| dir.subscribedUrlLock.Lock() | ||
| defer dir.subscribedUrlLock.Unlock() | ||
| dir.SubscribedUrl = url | ||
| } | ||
|
|
||
| func (dir *RegistryDirectory) snapshotRegistryURLs() (registeredURL *common.URL, subscribedURL *common.URL) { | ||
| dir.subscribedUrlLock.RLock() | ||
| defer dir.subscribedUrlLock.RUnlock() | ||
| return dir.RegisteredUrl, dir.SubscribedUrl | ||
| } | ||
|
|
||
| func (dir *RegistryDirectory) getConsumerUrl(c *common.URL) *common.URL { | ||
| processID := fmt.Sprintf("%d", os.Getpid()) | ||
| localIP := common.GetLocalIp() | ||
|
|
||

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里的超时只是让调用方提前返回,后台
Registergoroutine 还会继续跑。如果它在超时之后成功,consumer 实际上已经注册了,但Subscribe不会启动,后续也没有补偿UnRegister,状态会变成“返回失败但部分初始化成功”。建议把取消能力下推到registry.Register,或者在超时后处理 late success 并做回滚。Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
感谢 ,已修改成 registerConsumerWithTimeout() 超时返回后,如果 Register 迟到成功,则执行 UnRegister 回滚

逻辑详见图片