主页 > 游戏开发  > 

Nacos源码解读04——服务发现

Nacos源码解读04——服务发现
Nacos服务发现的方式

1.客户端获取 1.1:先由故障转移机制判断是否从本地文件中读取信息,读到则返回 1.2:再去本地服务列表读取信息(本地缓存),没读到则创建一个空的服务,然后立刻去nacos中读取更新 1.3:读到了就返回,同时开启定时更新,定时向服务端同步信息(正常情况下默认约6s一次,失败时间隔逐步拉长,最长60s一次,见下文UpdateTask) 2.服务端通过GRPC推送:建立长连接,当服务发生变更的时候往订阅了服务的客户端推送事件

SpringBoot自动注入

项目启动的时候会通过自动注入的机制将NacosDiscoveryClientConfiguration注入。当注入NacosDiscoveryClientConfiguration的时候,会将NacosDiscoveryClient一起注入为Bean。NacosDiscoveryClient实现了SpringCloud的DiscoveryClient接口,重点是getInstances和getServices方法,而且二者都是委托给NacosServiceDiscovery实现的。

获取实例信息

NacosDiscoveryClient

private NacosServiceDiscovery serviceDiscovery; public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) { this.serviceDiscovery = nacosServiceDiscovery; } @Override public List<ServiceInstance> getInstances(String serviceId) { try { return serviceDiscovery.getInstances(serviceId); } catch (Exception e) { throw new RuntimeException( "Can not get hosts from nacos server. serviceId: " + serviceId, e); } } public List<ServiceInstance> getInstances(String serviceId) throws NacosException { String group = discoveryProperties.getGroup(); List<Instance> instances = namingService().selectInstances(serviceId, group, true); return hostToServiceInstanceList(instances, serviceId); }

NacosServiceDiscovery

public List<ServiceInstance> getInstances(String serviceId) throws NacosException { //获取分组 String group = discoveryProperties.getGroup(); //查询服务下的实例 List<Instance> instances = namingService().selectInstances(serviceId, group, true); //填充返回的实例信息数据 return hostToServiceInstanceList(instances, serviceId); } @Override public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException { return selectInstances(serviceName, groupName, healthy, true); } @Override public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException { return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe); } @Override public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException { ServiceInfo serviceInfo; String clusterString = StringUtils.join(clusters, ","); // 默认是订阅的 if (subscribe) { //从缓存中获取实例信息 serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString); //如果获取不到则从服务端拉取 if (null == serviceInfo) { serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString); } } else { // 如果未订阅服务信息,则直接从服务器进行查询 serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false); } //获取服务中的实例信息 return selectInstances(serviceInfo, healthy); } 从缓存中拿数据 public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) { NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch()); String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); String key = ServiceInfo.getKey(groupedServiceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } return serviceInfoMap.get(key); } 获取服务的实例信息 private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) { List<Instance> 
list; if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) { return new ArrayList<Instance>(); } Iterator<Instance> iterator = list.iterator(); while (iterator.hasNext()) { Instance instance = iterator.next(); // 保留 健康、启用、权重大于0 的实例 if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) { iterator.remove(); } } return list; } GRPC请求拉取服务实例信息 @Override public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException { NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters); String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName); String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters); //定时同步服务端serviceInfo serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters); //获取ServiceInfo 信息 ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey); //如果没有则从服务端拿 if (null == result || !isSubscribed(serviceName, groupName, clusters)) { //GRPC请求 result = grpcClientProxy.subscribe(serviceName, groupName, clusters); } //填充进Map中 这里可以看服务注册最后那部分代码最后也是调用serviceInfoHolder保存的 serviceInfoHolder.processServiceInfo(result); return result; } 定时同步服务端ServiceInfo public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) { String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters); if (futureMap.get(serviceKey) != null) { return; } synchronized (futureMap) { if (futureMap.get(serviceKey) != null) { return; } //构建任务放到ScheduledFuture执行 ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters)); futureMap.put(serviceKey, future); } } 缓存订阅信息 public void cacheSubscriberForRedo(String serviceName, String groupName, String cluster) { //拿服务当key String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster); //构建需要缓存的订阅信息 
SubscriberRedoData redoData = SubscriberRedoData.build(serviceName, groupName, cluster); //缓存订阅信息 synchronized (subscribes) { subscribes.put(key, redoData); } } 执行订阅 public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException { //构建订阅请求 SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters, true); //执行订阅 SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class); //设置已订阅 redoService.subscriberRegistered(serviceName, groupName, clusters); return response.getServiceInfo(); } 设置订阅信息已订阅 public void subscriberRegistered(String serviceName, String groupName, String cluster) { String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster); synchronized (subscribes) { SubscriberRedoData redoData = subscribes.get(key); // 标记订阅数据已订阅 if (null != redoData) { redoData.setRegistered(true); } } } Nacos订阅机制

Nacos的订阅机制,如果用一句话来描述就是:Nacos客户端通过一个定时任务,每6秒从注册中心获取实例列表,当发现实例发生变化时,发布变更事件,订阅者进行业务处理。该更新实例的更新实例,该更新本地缓存的更新本地缓存。 UpdateTask

public class UpdateTask implements Runnable { public void run() { long delayTime = DEFAULT_DELAY; try { //校验订阅任务是否仍然有效 如果已无效就停止处理 if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey( serviceKey)) { NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters); isCancel = true; return; } //从缓存中拿 Service信息 ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey); //如果拿不到则去服务端拉取 if (serviceObj == null) { serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); //然后再填充进缓存 serviceInfoHolder.processServiceInfo(serviceObj); //更新下时间 lastRefTime = serviceObj.getLastRefTime(); return; } // 过期服务(服务的最新更新时间小于等于缓存刷新时间),从注册中心重新查询 if (serviceObj.getLastRefTime() <= lastRefTime) { //服务过期了重新查 serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); //再缓存进去 serviceInfoHolder.processServiceInfo(serviceObj); } // 刷新更新时间 lastRefTime = serviceObj.getLastRefTime(); if (CollectionUtils.isEmpty(serviceObj.getHosts())) { incFailCount(); return; } // 下次更新缓存时间设置,默认为6秒 // TODO multiple time can be configured. delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE; // 重置失败数量为0 resetFailCount(); } catch (Throwable e) { incFailCount(); NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e); } finally { // 下次调度刷新时间,下次执行的时间与failCount有关 // failCount=0,则下次调度时间为6秒,最长为1分钟 // 即当无异常情况下缓存实例的刷新时间是6秒 if (!isCancel) { executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS); } } } } 实例变更事件处理 监听事件的注册

在NacosNamingService的subscribe方法中,通过如下方式进行了监听事件的注册:

@Override public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException { if (null == listener) { return; } String clusterString = StringUtils.join(clusters, ","); changeNotifier.registerListener(groupName, serviceName, clusterString, listener); clientProxy.subscribe(serviceName, groupName, clusterString); }

这里的changeNotifier.registerListener便是进行具体的事件注册逻辑

public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) { String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters); ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key); //这里用到了双重检查锁机制 if (eventListeners == null) { synchronized (lock) { eventListeners = listenerMap.get(key); if (eventListeners == null) { eventListeners = new ConcurrentHashSet<EventListener>(); listenerMap.put(key, eventListeners); } } } eventListeners.add(listener); }

可以看出,事件的注册便是将EventListener存储在InstancesChangeNotifier的listenerMap属性当中了。

这里的数据结构为Map,key由分组名、服务名和集群名拼接而成(ServiceInfo.getKey),value为该服务上注册的监听器(EventListener)集合。

监听服务变更事件

UpdateTask中如果没有从缓存中拿到服务信息,就会通过grpc协议从服务端拉取,然后执行serviceInfoHolder.processServiceInfo方法缓存服务信息;当实例发生变化时,这个方法最终会发布一个InstancesChangeEvent事件,所以这里通过监听InstancesChangeEvent事件来进行处理。

InstancesChangeNotifier

public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> { private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<String, ConcurrentHashSet<EventListener>>(); @Override public void onEvent(InstancesChangeEvent event) { String key = ServiceInfo.getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters()); ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key); if (CollectionUtils.isEmpty(eventListeners)) { return; } for (final EventListener listener : eventListeners) { //[] final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event); final com.alibaba.nacos.api.naming.listener.Event namingEvent = new NamingEvent(instancesChangeEvent.getServiceName(), instancesChangeEvent.getGroupName(), instancesChangeEvent.getClusters(), instancesChangeEvent.getHosts()); // 最终调度执行listener.onEvent(namingEvent),只在NacosWatch#start找到了有效的EventListener,见下文 if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) { ((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent)); } else { listener.onEvent(namingEvent); } } } } } public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle, DisposableBean { private Map<String, EventListener> listenerMap = new ConcurrentHashMap(16); private final AtomicBoolean running = new AtomicBoolean(false); public void start() { if (this.running.compareAndSet(false, true)) { EventListener eventListener = (EventListener)this.listenerMap.computeIfAbsent(this.buildKey(), (event) -> { return new EventListener() { public void onEvent(Event event) { if (event instanceof NamingEvent) { List instances = ((NamingEvent)event).getInstances(); //[] Optional instanceOptional = NacosWatch.this.selectCurrentInstance(instances); // 按IP和端口选择第一个instance作为当前的instance Optional instanceOptional = 
instances.stream().filter((instance) -> { return this.properties.getIp().equals(instance.getIp()) && this.properties.getPort() == instance.getPort(); }).findFirst() instanceOptional.ifPresent((currentInstance) -> { //[] NacosWatch.this.resetIfNeeded(currentInstance); // 重新设置properties的metadata if (!this.properties.getMetadata().equals(instance.getMetadata())) { this.properties.setMetadata(instance.getMetadata()); } }); } } }; }); } } 获取服务信息

NacosDiscoveryClient

@Override public List<String> getServices() { try { return serviceDiscovery.getServices(); } catch (Exception e) { log.error("get service name from nacos server fail,", e); return Collections.emptyList(); } } public List<String> getServices() throws NacosException { //获取分组 String group = discoveryProperties.getGroup(); //获取服务信息 ListView<String> services = namingService().getServicesOfServer(1, Integer.MAX_VALUE, group); return services.getData(); } @Override public ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName) throws NacosException { return getServicesOfServer(pageNo, pageSize, groupName, null); } @Override public ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName, AbstractSelector selector) throws NacosException { return clientProxy.getServiceList(pageNo, pageSize, groupName, selector); }

GRPC拉取信息

@Override public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector) throws NacosException { //构建请求 ServiceListRequest request = new ServiceListRequest(namespaceId, groupName, pageNo, pageSize); if (selector != null) { if (SelectorType.valueOf(selector.getType()) == SelectorType.label) { request.setSelector(JacksonUtils.toJson(selector)); } } //采用GRPC协议拉取信息 ServiceListResponse response = requestToServer(request, ServiceListResponse.class); ListView<String> result = new ListView<String>(); result.setCount(response.getCount()); result.setData(response.getServiceNames()); return result; } 服务端处理GRPC请求 接收获取服务的请求

ServiceListRequestHandler

@Override @Secured(action = ActionTypes.READ) public ServiceListResponse handle(ServiceListRequest request, RequestMeta meta) throws NacosException { //根据命名空间获取这个命名空间下的所有服务信息 ServiceManager.getInstance().getSingletons这个方法在服务注册的流程里也出现过 Collection<Service> serviceSet = ServiceManager.getInstance().getSingletons(request.getNamespace()); //构建返回信息 ServiceListResponse result = ServiceListResponse.buildSuccessResponse(0, new LinkedList<>()); //服务信息不等于空填充返回信息 if (!serviceSet.isEmpty()) { Collection<String> serviceNameSet = selectServiceWithGroupName(serviceSet, request.getGroupName()); // TODO select service by selector List<String> serviceNameList = ServiceUtil .pageServiceName(request.getPageNo(), request.getPageSize(), serviceNameSet); result.setCount(serviceNameSet.size()); result.setServiceNames(serviceNameList); } return result; } 订阅服务请求

SubscribeServiceRequestHandler

@Override @Secured(action = ActionTypes.READ) public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException { //命名空间 String namespaceId = request.getNamespace(); //服务名称 String serviceName = request.getServiceName(); //分组名称 String groupName = request.getGroupName(); String app = request.getHeader("app", "unknown"); String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); //构建服务信息 Service service = Service.newService(namespaceId, groupName, serviceName, true); //组装订阅请求 Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(), namespaceId, groupedServiceName, 0, request.getClusters()); //获取健康的实例 ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service), //服务元数据信息 metadataManager.getServiceMetadata(service).orElse(null), subscriber); //是否订阅 if (request.isSubscribe()) { clientOperationService.subscribeService(service, subscriber, meta.getConnectionId()); } else { clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId()); } //构建返回数据 return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo); }

发送订阅事件 后续事件监听可参考服务事件处理的那篇文章

@Override public void subscribeService(Service service, Subscriber subscriber, String clientId) { Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service); Client client = clientManager.getClient(clientId); if (!clientIsLegal(client, clientId)) { return; } client.addServiceSubscriber(singleton, subscriber); client.setLastUpdatedTime(); NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId)); }

发送取消订阅事件 后续事件监听可参考服务事件处理的那篇文章

@Override public void unsubscribeService(Service service, Subscriber subscriber, String clientId) { Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service); Client client = clientManager.getClient(clientId); if (!clientIsLegal(client, clientId)) { return; } client.removeServiceSubscriber(singleton); client.setLastUpdatedTime(); NotifyCenter.publishEvent(new ClientOperationEvent.ClientUnsubscribeServiceEvent(singleton, clientId)); } 服务查询请求

ServiceQueryRequestHandler

@Override @Secured(action = ActionTypes.READ) public QueryServiceResponse handle(ServiceQueryRequest request, RequestMeta meta) throws NacosException { //获取命名空间 String namespaceId = request.getNamespace(); //分组名 String groupName = request.getGroupName(); //服务名 String serviceName = request.getServiceName(); //创建服务信息 Service service = Service.newService(namespaceId, groupName, serviceName); //集群 String cluster = null == request.getCluster() ? "" : request.getCluster(); boolean healthyOnly = request.isHealthyOnly(); //获取服务信息 ServiceInfo result = serviceStorage.getData(service); //获取服务元数据信息 ServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null); // 获取有保护机制的健康实例 result = ServiceUtil.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true, meta.getClientIp()); //构建返回信息 return QueryServiceResponse.buildSuccessResponse(result); } public ServiceInfo getData(Service service) { return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service); } public Optional<ServiceMetadata> getServiceMetadata(Service service) { return Optional.ofNullable(serviceMetadataMap.get(service)); }
标签:

Nacos源码解读04——服务发现由讯客互联游戏开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Nacos源码解读04——服务发现”。