Nacos 服务变更通知
Mr.Lee 2023-11-09 11:16:23 Spring Cloud AlibabaNacos
最近在做数据监控相关的东西...用到的东西对我来说, 都略显生疏...今天先来说说, 在Nacos中遇到的"生疏"
先说需求呀, 我需要, 实时获取在Nacos中注册的服务, 以及服务的实例和实例下的元数据
因为Issues中没描述, 元数据变更是否会通知, 也没有具体的类. 只能通过"全局搜索"的方式来查找, 文中的 subscribe
接口 .
已知: 要实现 com.alibaba.nacos.api.naming.listener.EventListener
接口, 注册到 com.alibaba.nacos.common.notify.listener.Subscriber<InstancesChangeEvent>
中.
# InstancesChangeNotifier
实例变更通知 registerListener
package com.alibaba.nacos.client.naming.event;
import com.alibaba.nacos.api.naming.listener.AbstractEventListener;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* A subscriber to notify eventListener callback.
*
* @author horizonzy
* @since 1.4.1
*/
public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {
private final String eventScope;
private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<>();
@JustForTest
public InstancesChangeNotifier() {
this.eventScope = UUID.randomUUID().toString();
}
public InstancesChangeNotifier(String eventScope) {
this.eventScope = eventScope;
}
/**
* register listener.
*
* @param groupName group name
* @param serviceName serviceName
* @param clusters clusters, concat by ','. such as 'xxx,yyy'
* @param listener custom listener
*/
// 实现了注册方法
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.computeIfAbsent(key, keyInner -> new ConcurrentHashSet<>());
eventListeners.add(listener);
}
/**
* deregister listener.
*
* @param groupName group name
* @param serviceName serviceName
* @param clusters clusters, concat by ','. such as 'xxx,yyy'
* @param listener custom listener
*/
public void deregisterListener(String groupName, String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (eventListeners == null) {
return;
}
eventListeners.remove(listener);
if (CollectionUtils.isEmpty(eventListeners)) {
listenerMap.remove(key);
}
}
/**
* check serviceName,clusters is subscribed.
*
* @param groupName group name
* @param serviceName serviceName
* @param clusters clusters, concat by ','. such as 'xxx,yyy'
* @return is serviceName,clusters subscribed
*/
public boolean isSubscribed(String groupName, String serviceName, String clusters) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
return CollectionUtils.isNotEmpty(eventListeners);
}
public List<ServiceInfo> getSubscribeServices() {
List<ServiceInfo> serviceInfos = new ArrayList<>();
for (String key : listenerMap.keySet()) {
serviceInfos.add(ServiceInfo.fromKey(key));
}
return serviceInfos;
}
@Override
public void onEvent(InstancesChangeEvent event) {
String key = ServiceInfo
.getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (CollectionUtils.isEmpty(eventListeners)) {
return;
}
for (final EventListener listener : eventListeners) {
final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent));
} else {
listener.onEvent(namingEvent);
}
}
}
private com.alibaba.nacos.api.naming.listener.Event transferToNamingEvent(
InstancesChangeEvent instancesChangeEvent) {
return new NamingEvent(instancesChangeEvent.getServiceName(), instancesChangeEvent.getGroupName(),
instancesChangeEvent.getClusters(), instancesChangeEvent.getHosts());
}
@Override
public Class<? extends Event> subscribeType() {
return InstancesChangeEvent.class;
}
@Override
public boolean scopeMatches(InstancesChangeEvent event) {
return this.eventScope.equals(event.scope());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# NacosNamingService
NacosNamingService 实现了注册listener的方法
package com.alibaba.nacos.client.naming;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ListView;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.api.selector.AbstractSelector;
import com.alibaba.nacos.client.env.NacosClientProperties;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.client.naming.core.Balancer;
import com.alibaba.nacos.client.naming.event.InstancesChangeEvent;
import com.alibaba.nacos.client.naming.event.InstancesChangeNotifier;
import com.alibaba.nacos.client.naming.remote.NamingClientProxy;
import com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.naming.utils.InitUtils;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import com.alibaba.nacos.client.utils.ValidatorUtils;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
/**
* Nacos Naming Service.
*
* @author nkorange
*/
@SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule")
public class NacosNamingService implements NamingService {
// ...
@Override
public void subscribe(String serviceName, EventListener listener) throws NacosException {
subscribe(serviceName, new ArrayList<>(), listener);
}
@Override
public void subscribe(String serviceName, String groupName, EventListener listener) throws NacosException {
subscribe(serviceName, groupName, new ArrayList<>(), listener);
}
@Override
public void subscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException {
subscribe(serviceName, Constants.DEFAULT_GROUP, clusters, listener);
}
/**
* 实现了注册listener的方法
*/
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
if (null == listener) {
return;
}
String clusterString = StringUtils.join(clusters, ",");
changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
clientProxy.subscribe(serviceName, groupName, clusterString);
}
@Override
public void unsubscribe(String serviceName, EventListener listener) throws NacosException {
unsubscribe(serviceName, new ArrayList<>(), listener);
}
@Override
public void unsubscribe(String serviceName, String groupName, EventListener listener) throws NacosException {
unsubscribe(serviceName, groupName, new ArrayList<>(), listener);
}
@Override
public void unsubscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException {
unsubscribe(serviceName, Constants.DEFAULT_GROUP, clusters, listener);
}
@Override
public void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
String clustersString = StringUtils.join(clusters, ",");
changeNotifier.deregisterListener(groupName, serviceName, clustersString, listener);
if (!changeNotifier.isSubscribed(groupName, serviceName, clustersString)) {
clientProxy.unsubscribe(serviceName, groupName, clustersString);
}
}
// ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
接下来, 就要看怎么实例化 NacosNamingService
# NacosWatch
package com.alibaba.cloud.nacos.discovery;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.NacosServiceManager;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.SmartLifecycle;
/**
* @author xiaojing
* @author yuhuangbin
* @author pengfei.lu
* @author ruansheng
*/
public class NacosWatch implements SmartLifecycle, DisposableBean {
private static final Logger log = LoggerFactory.getLogger(NacosWatch.class);
private final Map<String, EventListener> listenerMap = new ConcurrentHashMap<>(16);
private final AtomicBoolean running = new AtomicBoolean(false);
private final NacosServiceManager nacosServiceManager;
private final NacosDiscoveryProperties properties;
public NacosWatch(NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties properties) {
this.nacosServiceManager = nacosServiceManager;
this.properties = properties;
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable callback) {
this.stop();
callback.run();
}
@Override
public void start() {
if (this.running.compareAndSet(false, true)) {
EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
event -> new EventListener() {
@Override
public void onEvent(Event event) {
if (event instanceof NamingEvent namingEvent) {
List<Instance> instances = namingEvent.getInstances();
Optional<Instance> instanceOptional = selectCurrentInstance(
instances);
instanceOptional.ifPresent(currentInstance -> {
resetIfNeeded(currentInstance);
});
}
}
});
// 在这初始化的 NamingService
NamingService namingService = nacosServiceManager.getNamingService();
try {
namingService.subscribe(properties.getService(), properties.getGroup(),
Arrays.asList(properties.getClusterName()), eventListener);
}
catch (Exception e) {
log.error("namingService subscribe failed, properties:{}", properties, e);
}
}
}
private String buildKey() {
return String.join(":", properties.getService(), properties.getGroup());
}
private void resetIfNeeded(Instance instance) {
if (!properties.getMetadata().equals(instance.getMetadata())) {
properties.setMetadata(instance.getMetadata());
}
}
private Optional<Instance> selectCurrentInstance(List<Instance> instances) {
return instances.stream()
.filter(instance -> properties.getIp().equals(instance.getIp())
&& properties.getPort() == instance.getPort())
.findFirst();
}
@Override
public void stop() {
if (this.running.compareAndSet(true, false)) {
EventListener eventListener = listenerMap.get(buildKey());
try {
NamingService namingService = nacosServiceManager.getNamingService();
namingService.unsubscribe(properties.getService(), properties.getGroup(),
Arrays.asList(properties.getClusterName()), eventListener);
}
catch (Exception e) {
log.error("namingService unsubscribe failed, properties:{}", properties,
e);
}
}
}
@Override
public boolean isRunning() {
return this.running.get();
}
@Override
public int getPhase() {
return 0;
}
@Override
public void destroy() {
this.stop();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
在我的代码里, 可以拿到
NacosServiceManager
问题就可以解决了呗....NamingService namingService = nacosServiceManager.getNamingService(); try { namingService.subscribe(properties.getService(), properties.getGroup(), Arrays.asList(properties.getClusterName()), eventListener); } catch (Exception e) { log.error("namingService subscribe failed, properties:{}", properties, e); }
1
2
3
4
5
6
7
于是:
// todo: temp start
private final EventListener listener = event -> {
log.info("收到服务变更事件: {}", com.alibaba.fastjson2.JSON.toJSONString(event));
};
@GetMapping("/v1/xxx")
public ResultData xxx() {
/*
{
"password": "nacos",
"endpoint": "",
"secretKey": "",
"serverAddr": "127.0.0.1:8848",
"accessKey": "",
"clusterName": "DEFAULT",
"namespace": "745e1a4c-cccf-4245-a125-801e2a3d3988",
"com.alibaba.nacos.naming.log.filename": "",
"namingLoadCacheAtStart": "false",
"username": "nacos",
"group": "operate-monitor-middleware"
}
*/
// NacosServiceManager.class;
NacosServiceManager manager = SpringContextHolder.getBean(NacosServiceManager.class);
NamingService service = manager.getNamingService();
try {
service.subscribe("omm-gateway", "operate-monitor-middleware", new ArrayList<>(), listener);
} catch (NacosException e) {
e.printStackTrace();
}
System.out.println(service);
return ResultData.build().success();
}
// todo: temp end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
参考文章:
- https://github.com/alibaba/nacos/issues/3743