Skip to content
Merged
69 changes: 50 additions & 19 deletions config_center/file/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,44 @@ type CacheListener struct {
rootPath string
}

type listenerSet struct {
mu sync.RWMutex
listeners map[config_center.ConfigurationListener]struct{}
}

func newListenerSet(listener config_center.ConfigurationListener) *listenerSet {
return &listenerSet{
listeners: map[config_center.ConfigurationListener]struct{}{
listener: {},
},
}
}

func (ls *listenerSet) Add(listener config_center.ConfigurationListener) {
ls.mu.Lock()
defer ls.mu.Unlock()
ls.listeners[listener] = struct{}{}
}

func (ls *listenerSet) Remove(listener config_center.ConfigurationListener) (empty bool) {
ls.mu.Lock()
defer ls.mu.Unlock()
delete(ls.listeners, listener)
return len(ls.listeners) == 0
}

// Snapshot returns a read-only listener snapshot for safe iteration.
// Callers must treat both the returned slice and its listeners as immutable.
func (ls *listenerSet) Snapshot() []config_center.ConfigurationListener {
ls.mu.RLock()
defer ls.mu.RUnlock()
listeners := make([]config_center.ConfigurationListener, 0, len(ls.listeners))
for l := range ls.listeners {
listeners = append(listeners, l)
}
return listeners
}

// NewCacheListener creates a new CacheListener
func NewCacheListener(rootPath string) *CacheListener {
cl := &CacheListener{rootPath: rootPath}
Expand All @@ -65,7 +103,7 @@ func NewCacheListener(rootPath string) *CacheListener {
if event.Op&fsnotify.Remove == fsnotify.Remove {
cl.contentCache.Delete(key)
if l, ok := cl.keyListeners.Load(key); ok {
removeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, remoting.EventTypeDel)
removeCallback(l.(*listenerSet).Snapshot(), key, remoting.EventTypeDel)
}
}
if event.Op&fsnotify.Write == fsnotify.Write {
Expand All @@ -77,7 +115,7 @@ func NewCacheListener(rootPath string) *CacheListener {
}
cl.contentCache.Store(key, content)
if l, ok := cl.keyListeners.Load(key); ok {
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, content,
dataChangeCallback(l.(*listenerSet).Snapshot(), key, content,
remoting.EventTypeUpdate)
}
}
Expand All @@ -90,7 +128,7 @@ func NewCacheListener(rootPath string) *CacheListener {
}
cl.contentCache.Store(key, content)
if l, ok := cl.keyListeners.Load(key); ok {
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, content,
dataChangeCallback(l.(*listenerSet).Snapshot(), key, content,
remoting.EventTypeAdd)
}
}
Expand All @@ -113,22 +151,22 @@ func NewCacheListener(rootPath string) *CacheListener {
return cl
}

func removeCallback(lmap map[config_center.ConfigurationListener]struct{}, key string, event remoting.EventType) {
if len(lmap) == 0 {
func removeCallback(listeners []config_center.ConfigurationListener, key string, event remoting.EventType) {
if len(listeners) == 0 {
logger.Warnf("file watch callback but configuration listener is empty, key:%s, event:%v", key, event)
return
}
for l := range lmap {
for _, l := range listeners {
callback(l, key, "", event)
}
}

func dataChangeCallback(lmap map[config_center.ConfigurationListener]struct{}, key, content string, event remoting.EventType) {
if len(lmap) == 0 {
func dataChangeCallback(listeners []config_center.ConfigurationListener, key, content string, event remoting.EventType) {
if len(listeners) == 0 {
logger.Warnf("file watch callback but configuration listener is empty, key:%s, event:%v", key, event)
return
}
for l := range lmap {
for _, l := range listeners {
callback(l, key, content, event)
}
}
Expand All @@ -151,12 +189,9 @@ func (cl *CacheListener) Close() error {
func (cl *CacheListener) AddListener(key string, listener config_center.ConfigurationListener) {
Comment thread
AlexStocks marked this conversation as resolved.
// reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure
// make a map[your type]struct{} like set in java
listeners, loaded := cl.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{
listener: {},
})
listeners, loaded := cl.keyListeners.LoadOrStore(key, newListenerSet(listener))
if loaded {
listeners.(map[config_center.ConfigurationListener]struct{})[listener] = struct{}{}
cl.keyListeners.Store(key, listeners)
listeners.(*listenerSet).Add(listener)
return
}
if err := cl.watch.Add(key); err != nil {
Expand All @@ -170,17 +205,13 @@ func (cl *CacheListener) RemoveListener(key string, listener config_center.Confi
if !loaded {
return
}
lmap := listeners.(map[config_center.ConfigurationListener]struct{})
delete(lmap, listener)
if len(lmap) == 0 {
if listeners.(*listenerSet).Remove(listener) {
cl.keyListeners.Delete(key)
cl.contentCache.Delete(key)
if err := cl.watch.Remove(key); err != nil {
logger.Errorf("watcher remove path:%s err:%v", key, err)
}
return
}
cl.keyListeners.Store(key, lmap)
}

func getFileContent(path string) string {
Expand Down
69 changes: 65 additions & 4 deletions config_center/file/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func TestCacheListenerCallbacks(t *testing.T) {
t.Fatalf("write file error %v", err)
}
waitEvent(t, rec.ch, remoting.EventTypeUpdate)
drainEvents(rec.ch)

// remove listener then cleanup
cl.RemoveListener(filePath, rec)
Expand All @@ -66,14 +67,40 @@ func TestCacheListenerCallbacks(t *testing.T) {
if err := os.Remove(filePath); err != nil {
t.Fatalf("remove file error %v", err)
}
assertNoEvent(t, rec.ch, 200*time.Millisecond)
}

func TestDrainEvents(t *testing.T) {
ch := make(chan *config_center.ConfigChangeEvent, 4)
ch <- &config_center.ConfigChangeEvent{ConfigType: remoting.EventTypeAdd}
ch <- &config_center.ConfigChangeEvent{ConfigType: remoting.EventTypeUpdate}

drainEvents(ch)

select {
case <-rec.ch:
// should not receive after removal
t.Fatalf("unexpected event after remove")
case <-time.After(200 * time.Millisecond):
case ev := <-ch:
t.Fatalf("expected channel drained, got %v", ev.ConfigType)
default:
}
}

func TestAssertNoEvent(t *testing.T) {
t.Run("allow stale event before grace window", func(t *testing.T) {
ch := make(chan *config_center.ConfigChangeEvent, 2)
ch <- &config_center.ConfigChangeEvent{ConfigType: remoting.EventTypeUpdate}
assertNoEvent(t, ch, 120*time.Millisecond)
})

t.Run("return when no event arrives", func(t *testing.T) {
ch := make(chan *config_center.ConfigChangeEvent, 1)
start := time.Now()
assertNoEvent(t, ch, 120*time.Millisecond)
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("assertNoEvent returned too early, elapsed=%v", elapsed)
}
})
}

func waitEvent(t *testing.T, ch <-chan *config_center.ConfigChangeEvent, expect remoting.EventType) {
t.Helper()
select {
Expand All @@ -85,3 +112,37 @@ func waitEvent(t *testing.T, ch <-chan *config_center.ConfigChangeEvent, expect
t.Fatalf("timeout waiting for event %v", expect)
}
}

func drainEvents(ch <-chan *config_center.ConfigChangeEvent) {
for {
select {
case <-ch:
default:
return
}
}
}

func assertNoEvent(t *testing.T, ch <-chan *config_center.ConfigChangeEvent, duration time.Duration) {
t.Helper()

grace := time.NewTimer(50 * time.Millisecond)
defer grace.Stop()

timer := time.NewTimer(duration)
defer timer.Stop()
allowStale := true
for {
select {
case ev := <-ch:
if allowStale {
continue
}
t.Fatalf("unexpected event after remove: %v", ev.ConfigType)
case <-grace.C:
allowStale = false
case <-timer.C:
return
}
}
}
28 changes: 23 additions & 5 deletions metrics/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package metrics

import (
"sync"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
Expand All @@ -43,19 +47,26 @@ type ApplicationMetricLevel struct {
HostName string
}

var applicationName string
var applicationVersion string
var (
appInfoLock sync.RWMutex
applicationName string
applicationVersion string
)

Comment thread
AlexStocks marked this conversation as resolved.
// cannot import rootConfig,may cause cycle import,so be it
func InitAppInfo(appName string, appVersion string) {
appInfoLock.Lock()
defer appInfoLock.Unlock()
// Update name/version in one critical section to avoid mixed snapshots.
applicationName = appName
applicationVersion = appVersion
}

func GetApplicationLevel() *ApplicationMetricLevel {
appName, appVersion := getAppInfo()
return &ApplicationMetricLevel{
ApplicationName: applicationName,
Version: applicationVersion,
ApplicationName: appName,
Version: appVersion,
Ip: common.GetLocalIp(),
HostName: common.GetLocalHostName(),
GitCommitId: "",
Expand Down Expand Up @@ -113,8 +124,9 @@ type ConfigCenterLevel struct {
}

func NewConfigCenterLevel(key string, group string, configCenter string, changeType string) *ConfigCenterLevel {
appName, _ := getAppInfo()
return &ConfigCenterLevel{
ApplicationName: applicationName,
ApplicationName: appName,
Ip: common.GetLocalIp(),
HostName: common.GetLocalHostName(),
Key: key,
Expand All @@ -124,6 +136,12 @@ func NewConfigCenterLevel(key string, group string, configCenter string, changeT
}
}

func getAppInfo() (name string, version string) {
appInfoLock.RLock()
defer appInfoLock.RUnlock()
return applicationName, applicationVersion
}

func (l ConfigCenterLevel) Tags() map[string]string {
tags := make(map[string]string)
tags[constant.TagApplicationName] = l.ApplicationName
Expand Down
19 changes: 19 additions & 0 deletions metrics/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package metrics

import (
"os"
"sync"
"testing"
)

Expand Down Expand Up @@ -102,3 +103,21 @@ func TestConfigCenterLevelTags(t *testing.T) {
assert.NotEmpty(t, tags[constant.TagIp])
assert.NotEmpty(t, tags[constant.TagHostname])
}

func TestAppInfoConcurrentAccess(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 400; i++ {
wg.Add(2)
go func() {
defer wg.Done()
InitAppInfo("test-app", "1.0.0")
}()
go func() {
defer wg.Done()
name, version := getAppInfo()
_ = name
_ = version
}()
}
wg.Wait()
}
19 changes: 16 additions & 3 deletions registry/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ type overrideSubscribeListener struct {
url *common.URL
originInvoker base.Invoker
protocol *registryProtocol
configLock sync.RWMutex
configurator config_center.Configurator
}

Expand All @@ -349,7 +350,7 @@ func newOverrideSubscribeListener(overriderUrl *common.URL, invoker base.Invoker
// Notify will be triggered when a service change notification is received.
func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) {
if isMatched(event.Service, nl.url) && event.Action == remoting.EventTypeAdd {
nl.configurator = extension.GetDefaultConfigurator(event.Service)
nl.setConfigurator(extension.GetDefaultConfigurator(event.Service))
nl.doOverrideIfNecessary()
}
}
Expand All @@ -370,8 +371,8 @@ func (nl *overrideSubscribeListener) doOverrideIfNecessary() {
if exporter, ok := nl.protocol.bounds.Load(key); ok {
currentUrl := exporter.(base.Exporter).GetInvoker().GetURL()
// Compatible with the 2.6.x
if nl.configurator != nil {
nl.configurator.Configure(providerUrl)
if configurator := nl.getConfigurator(); configurator != nil {
configurator.Configure(providerUrl)
}
// provider application level management in 2.7.x
for _, v := range nl.protocol.providerConfigurationListener.Configurators() {
Expand All @@ -393,6 +394,18 @@ func (nl *overrideSubscribeListener) doOverrideIfNecessary() {
}
}

func (nl *overrideSubscribeListener) setConfigurator(configurator config_center.Configurator) {
nl.configLock.Lock()
defer nl.configLock.Unlock()
nl.configurator = configurator
}

func (nl *overrideSubscribeListener) getConfigurator() config_center.Configurator {
nl.configLock.RLock()
defer nl.configLock.RUnlock()
return nl.configurator
}

func isMatched(providerUrl *common.URL, consumerUrl *common.URL) bool {
// Compatible with the 2.6.x
if len(providerUrl.GetParam(constant.CategoryKey, "")) == 0 &&
Expand Down
Loading
Loading