日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
深入理解AP架構(gòu)Nacos注冊(cè)原理

1、Nacos簡(jiǎn)介

Nacos是一款阿里巴巴開(kāi)源用于管理分布式微服務(wù)的中間件,能夠幫助開(kāi)發(fā)人員快速實(shí)現(xiàn)動(dòng)態(tài)服務(wù)發(fā)現(xiàn)、服務(wù)配置、服務(wù)元數(shù)據(jù)及流量管理等。這篇文章主要剖析一下Nacos作為注冊(cè)中心時(shí)其服務(wù)注冊(cè)與發(fā)現(xiàn)原理。

2、為什么會(huì)需要Nacos

Nacos作為注冊(cè)中心是為了更好更方便的管理應(yīng)用中的每一個(gè)服務(wù),是各個(gè)分布式節(jié)點(diǎn)之間的紐帶。其作為注冊(cè)中心主要提供以下核心功能:

  • 服務(wù)注冊(cè)與發(fā)現(xiàn):動(dòng)態(tài)的增減服務(wù)節(jié)點(diǎn),服務(wù)節(jié)點(diǎn)增減后動(dòng)態(tài)的通知服務(wù)消費(fèi)者,不需要由消費(fèi)者來(lái)更新配置。
  • 服務(wù)配置:動(dòng)態(tài)修改服務(wù)配置,并將其推送到服務(wù)提供者和服務(wù)消費(fèi)者而不需要重啟服務(wù)。
  • 健康檢查和服務(wù)摘除:主動(dòng)的檢查服務(wù)健康情況,對(duì)于宕機(jī)的服務(wù)將其摘除服務(wù)列表。

3、分布式架構(gòu)CAP理論

CAP定理是分布式系統(tǒng)中最基礎(chǔ)的原則,所以理解和掌握了CAP對(duì)系統(tǒng)架構(gòu)的設(shè)計(jì)至關(guān)重要。分布式架構(gòu)下所有系統(tǒng)不可能同時(shí)滿(mǎn)足以下三點(diǎn):Consisteny(一致性)、Availability(可用性)、Partition tolerance(分區(qū)容錯(cuò)性),CAP指明了任何分布式系統(tǒng)只能同時(shí)滿(mǎn)足這三項(xiàng)中的兩項(xiàng)。

分布式系統(tǒng)肯定都要保證其容錯(cuò)性 ,那么可用性和一致性就只能選一個(gè)了。簡(jiǎn)單來(lái)說(shuō)分布式系統(tǒng)的CAP理論就像你想買(mǎi)個(gè)新手機(jī),這個(gè)手機(jī)不可能功能強(qiáng)大、便宜、又好看的,它最多只能滿(mǎn)足兩點(diǎn)的,要么功能強(qiáng)大便宜、要么功能強(qiáng)大好看、要么便宜好看,不可能同時(shí)滿(mǎn)足三點(diǎn)。

4、幾種注冊(cè)中心的區(qū)別

注冊(cè)中心在分布式應(yīng)用中是經(jīng)常用到的,也是必不可少的,那注冊(cè)中心,又分為以下幾種:Eureka、Zookeeper、Nacos等。這些注冊(cè)中心最大的區(qū)別就是其基于AP架構(gòu)還是CP架構(gòu),簡(jiǎn)單介紹一下:

  • Zookeeper:用過(guò)或者了解過(guò)zk做注冊(cè)中心的同學(xué)都知道,Zookeeper集群下一旦leader節(jié)點(diǎn)宕機(jī)了,在短時(shí)間內(nèi)服務(wù)都不可通訊,因?yàn)樗鼈冊(cè)谝欢〞r(shí)間內(nèi)follower進(jìn)行選舉來(lái)推出新的leader,因?yàn)樵谶@段時(shí)間內(nèi),所有的服務(wù)通信將受到影響,而且leader選取時(shí)間比較長(zhǎng),需要花費(fèi)幾十秒甚至上百秒的時(shí)間,因此:可以理解為 Zookeeper是實(shí)現(xiàn)的CP,也就是將失去A(可用性)。
  • Eureka:Eureka集群下每個(gè)節(jié)點(diǎn)之間都會(huì)定時(shí)發(fā)送心跳,定時(shí)同步數(shù)據(jù),沒(méi)有master/slave之分,是一個(gè)完全去中心化的架構(gòu)。因此每個(gè)注冊(cè)到Eureka下的實(shí)例都會(huì)定時(shí)同步ip,服務(wù)之間的調(diào)用也是根據(jù)Eureka拿到的緩存服務(wù)數(shù)據(jù)進(jìn)行調(diào)用。若一臺(tái)Eureka服務(wù)宕機(jī),其他Eureka在一定時(shí)間內(nèi)未感知到這臺(tái)Eureka服務(wù)宕機(jī),各個(gè)服務(wù)之間還是可以正常調(diào)用。Eureka的集群中,只要有一臺(tái)Eureka還在,就能保證注冊(cè)服務(wù)可用(保證可用性),只不過(guò)查到的信息可能不是最新的(不保證強(qiáng)一致性)。當(dāng)數(shù)據(jù)出現(xiàn)不一致時(shí),雖然A, B上的注冊(cè)信息不完全相同,但每個(gè)Eureka節(jié)點(diǎn)依然能夠正常對(duì)外提供服務(wù),這會(huì)出現(xiàn)查詢(xún)服務(wù)信息時(shí)如果請(qǐng)求A查不到,但請(qǐng)求B就能查到。如此保證了可用性但犧牲了一致性。
  • Nacos:同時(shí)支持CP和AP架構(gòu),根據(jù)根據(jù)服務(wù)注冊(cè)選擇臨時(shí)和永久來(lái)決定走AP模式還是CP模式。如果注冊(cè)Nacos的client節(jié)點(diǎn)注冊(cè)時(shí)ephemeral=true,那么Nacos集群對(duì)這個(gè)client節(jié)點(diǎn)的效果就是AP,采用distro協(xié)議實(shí)現(xiàn);而注冊(cè)Nacos的client節(jié)點(diǎn)注冊(cè)時(shí)ephemeral=false,那么Nacos集群對(duì)這個(gè)節(jié)點(diǎn)的效果就是CP的,采用raft協(xié)議實(shí)現(xiàn)。

本篇文章主要是深入研究一下Nacos基于AP架構(gòu)微服務(wù)注冊(cè)原理,由于篇幅有限基于CP架構(gòu)的Nacos微服務(wù)注冊(cè)下次再跟你們分析。

5、Nacos服務(wù)注冊(cè)與發(fā)現(xiàn)的原理

1.微服務(wù)在啟動(dòng)將自己的服務(wù)注冊(cè)到Nacos注冊(cè)中心,同時(shí)發(fā)布http接口供其他系統(tǒng)調(diào)用,一般都是基于SpringMVC。

2.服務(wù)消費(fèi)者基于Feign調(diào)用服務(wù)提供者對(duì)外發(fā)布的接口,先對(duì)調(diào)用的本地接口加上注解@FeignClient,F(xiàn)eign會(huì)針對(duì)加了該注解的接口生成動(dòng)態(tài)代理,服務(wù)消費(fèi)者針對(duì)Feign生成的動(dòng)態(tài)代理去調(diào)用方法時(shí),會(huì)在底層生成Http協(xié)議格式的請(qǐng)求,類(lèi)似 /stock/deduct? productId=100。

3.Feign最終會(huì)調(diào)用Ribbon從本地的Nacos注冊(cè)表的緩存里根據(jù)服務(wù)名取出服務(wù)提供在機(jī)器的列表,然后進(jìn)行負(fù)載均衡并選擇一臺(tái)機(jī)器出來(lái),對(duì)選出來(lái)的機(jī)器IP和端口拼接之前生成的url請(qǐng)求,生成調(diào)用的Http接口地址。

6、Nacos核心功能點(diǎn)

服務(wù)注冊(cè):Nacos Client會(huì)通過(guò)發(fā)送REST請(qǐng)求的方式向Nacos Server注冊(cè)自己的服務(wù),提供自身的元數(shù)據(jù),比如ip地址、端口等信息。Nacos Server接收到注冊(cè)請(qǐng)求后,就會(huì)把這些元數(shù)據(jù)信息存儲(chǔ)在一個(gè)雙層的內(nèi)存Map中。

服務(wù)心跳:在服務(wù)注冊(cè)后,Nacos Client會(huì)維護(hù)一個(gè)定時(shí)心跳來(lái)持續(xù)通知Nacos Server,說(shuō)明服務(wù)一直處于可用狀態(tài),防止被剔除。默認(rèn)5s發(fā)送一次心跳。

服務(wù)健康檢查:Nacos Server會(huì)開(kāi)啟一個(gè)定時(shí)任務(wù)用來(lái)檢查注冊(cè)服務(wù)實(shí)例的健康情況,對(duì)于超過(guò)15s沒(méi)有收到客戶(hù)端心跳的實(shí)例會(huì)將它 的healthy屬性置為false(客戶(hù)端服務(wù)發(fā)現(xiàn)時(shí)不會(huì)發(fā)現(xiàn)),如果某個(gè)實(shí)例超過(guò)30秒沒(méi)有收到心跳,直接剔除該實(shí)例(被剔除的實(shí)例如果恢復(fù) 發(fā)送心跳則會(huì)重新注冊(cè))

服務(wù)發(fā)現(xiàn):服務(wù)消費(fèi)者(Nacos Client)在調(diào)用服務(wù)提供者的服務(wù)時(shí),會(huì)發(fā)送一個(gè)REST請(qǐng)求給Nacos Server,獲取上面注冊(cè)的服務(wù)清 單,并且緩存在Nacos Client本地,同時(shí)會(huì)在Nacos Client本地開(kāi)啟一個(gè)定時(shí)任務(wù)定時(shí)拉取服務(wù)端最新的注冊(cè)表信息更新到本地緩存

服務(wù)同步:Nacos Server集群之間會(huì)互相同步服務(wù)實(shí)例,用來(lái)保證服務(wù)信息的一致性。

7、Nacos源碼分析

看Nacos源碼的不難發(fā)現(xiàn),Nacos實(shí)際上就是一個(gè)基于Spring Boot的web應(yīng)用,不管是服務(wù)注冊(cè)還是發(fā)送心跳都是通過(guò)給Nacos服務(wù)端發(fā)送http請(qǐng)求實(shí)現(xiàn)的。下載并編譯Nacos源碼就不過(guò)多贅述了,首先需要搭建一個(gè)微服務(wù)作為Nacos的客戶(hù)端。

7.1 Nacos客戶(hù)端注冊(cè)

Nacos客戶(hù)端也是個(gè)Spring Boot項(xiàng)目,當(dāng)客戶(hù)端服務(wù)啟動(dòng)時(shí)Spring Boot項(xiàng)目啟動(dòng)時(shí)自動(dòng)加載spring-cloud-starter-alibaba-nacos-discovery包的META-INF/spring.factories中包含自動(dòng)裝配的配置信息,并將文件中的類(lèi)加載成bean放入Spring容器中,我們可以先看一下spring.factories文件:

org.springframework.boot.autoconfigure.EnableAutoCnotallow=\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\
com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\
com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\
com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\
com.alibaba.cloud.nacos.NacosServiceAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapCnotallow=\
com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration

找到Nacos注冊(cè)中心的自動(dòng)配置類(lèi):NacosServiceRegistryAutoConfiguration。

NacosServiceRegistryAutoConfiguration這個(gè)類(lèi)是Nacos客戶(hù)端啟動(dòng)時(shí)的一個(gè)入口類(lèi),代碼如下:

@Configuration(
proxyBeanMethods = false
)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(
value = {"spring.cloud.service-registry.auto-registration.enabled"},
matchIfMissing = true
)
@AutoConfigureAfter({AutoServiceRegistrationConfiguration.class,
AutoServiceRegistrationAutoConfiguration.class,
NacosDiscoveryAutoConfiguration.class})
public class NacosServiceRegistryAutoConfiguration {
public NacosServiceRegistryAutoConfiguration() {
}

@Bean
public NacosServiceRegistry nacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}

@Bean
@ConditionalOnBean({AutoServiceRegistrationProperties.class})
public NacosRegistration nacosRegistration(ObjectProvider> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {
return new NacosRegistration((List)registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context);
}

@Bean
@ConditionalOnBean({AutoServiceRegistrationProperties.class})
public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
}
}

看NacosServiceRegistryAutoConfiguration配置類(lèi)有3個(gè)@Bean注解。

  • nacosServiceRegistry()方法: 定義了NacosServiceRegistry的bean,并且為其屬性nacosDiscoveryProperties賦值,即將從配置文件中讀取到的配置信息賦值進(jìn)去待用;
  • nacosRegistration()方法主要就是定義了NacosRegistration的bean,后面會(huì)用到這個(gè)bean;
  • nacosAutoServiceRegistration:該方法比較核心它的參數(shù)中有2個(gè)就是前面定義的兩個(gè)bean,其實(shí)就是為了這個(gè)方法服務(wù)的,由NacosAutoServiceRegistration類(lèi)的構(gòu)造器傳入NacosAutoServiceRegistration類(lèi)中:NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration),后面的流程都是以這句代碼作為入口。

利用IDEA查看類(lèi)結(jié)構(gòu),如上圖所示,NacosAutoServiceRegistration繼承AbstractAutoServiceRegistration類(lèi),而AbstractAutoServiceRegistration類(lèi)又實(shí)現(xiàn)了AutoServiceRegistration和ApplicationListener接口。

ApplicationListener接口是Spring提供的事件監(jiān)聽(tīng)接口,Spring會(huì)在所有bean都初始化完成之后發(fā)布一個(gè)事件,ApplicationListener會(huì)監(jiān)聽(tīng)所發(fā)布的事件,這里的事件是Spring Boot自定義的WebServerInitializedEvent事件,主要是項(xiàng)目啟動(dòng)時(shí)就會(huì)發(fā)布WebServerInitializedEvent事件,然后被AbstractAutoServiceRegistration監(jiān)聽(tīng)到,從而就會(huì)執(zhí)行onApplicationEvent方法,在這個(gè)方法里就會(huì)進(jìn)行服務(wù)注冊(cè)。

這里AbstractAutoServiceRegistration類(lèi)實(shí)現(xiàn)了Spring監(jiān)聽(tīng)器接口ApplicationListener,并重寫(xiě)了該接口的onApplicationEvent方法。

public void onApplicationEvent(WebServerInitializedEvent event) {
this.bind(event);
}

繼續(xù)點(diǎn)下去看bind方法。

public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (!(context instanceof ConfigurableWebServerApplicationContext) || !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) {
this.port.compareAndSet(0, event.getWebServer().getPort());
//start方法
this.start();
}
}

看到這里發(fā)現(xiàn)了bind方法里有個(gè)非常重要的start()方法,繼續(xù)看該方法的register()就是真正的客戶(hù)端注冊(cè)方法。

public void start() {
if (!this.isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}

} else {
if (!this.running.get()) {
this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
//真正的客戶(hù)端注冊(cè)方法
this.register();
if (this.shouldRegisterManagement()) {
this.registerManagement();
}
this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
this.running.compareAndSet(false, true);
}

}
}

跳過(guò)一些中間非關(guān)鍵性的代碼,可以直接看該注冊(cè)方法。

protected void register() {
this.serviceRegistry.register(getRegistration());
}

這里的serviceRegistry就是NacosServiceRegistryAutoConfiguration類(lèi)中第一個(gè)@Bean定義的bean,第一個(gè)@Bean就是這里的serviceRegistry對(duì)象的實(shí)現(xiàn);其中g(shù)etRegistration()獲取的就是第二個(gè)@Bean定義的NacosRegistration的實(shí)例,這兩個(gè)bean實(shí)例都是通過(guò)第3個(gè)@Bean傳進(jìn)來(lái)的,所以這里就可以把NacosServiceRegistryAutoConfiguration類(lèi)中那3個(gè)@Bean給串起來(lái)了。

protected void register() {
this.serviceRegistry.register(getRegistration());
}

不得不說(shuō),阿里巴巴開(kāi)發(fā)的中間件,其底層源碼的命名還是很規(guī)范的,register()方法從命名上來(lái)看就可以知道這是注冊(cè)的方法,事實(shí)也確實(shí)是注冊(cè)的方法,這個(gè)方法中會(huì)通過(guò)nacos-client包來(lái)調(diào)用nacos-server的服務(wù)注冊(cè)接口來(lái)實(shí)現(xiàn)服務(wù)的注冊(cè)功能。下面我看一下調(diào)用Nacos注冊(cè)接口方法:

public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
} else {
NamingService namingService = this.namingService();
String serviceId = registration.getServiceId();
String group = this.nacosDiscoveryProperties.getGroup();
//構(gòu)建客戶(hù)端參數(shù)ip,端口號(hào)等
Instance instance = this.getNacosInstanceFromRegistration(registration);

try {
//調(diào)用注冊(cè)方法
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
} catch (Exception var7) {
log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var7});
ReflectionUtils.rethrowRuntimeException(var7);
}

}
}

//構(gòu)建客戶(hù)端注冊(cè)參數(shù)
private Instance getNacosInstanceFromRegistration(Registration registration) {
Instance instance = new Instance();
instance.setIp(registration.getHost());
instance.setPort(registration.getPort());
instance.setWeight((double)this.nacosDiscoveryProperties.getWeight());
instance.setClusterName(this.nacosDiscoveryProperties.getClusterName());
instance.setEnabled(this.nacosDiscoveryProperties.isInstanceEnabled());
instance.setMetadata(registration.getMetadata());
instance.setEphemeral(this.nacosDiscoveryProperties.isEphemeral());
return instance;
}

根據(jù)源碼可以知道beatReactor.addBeatInfo()方法作用在于創(chuàng)建心跳信息實(shí)現(xiàn)健康檢測(cè),Nacos 服務(wù)端必須要確保注冊(cè)的服務(wù)實(shí)例是健康的,而心跳檢測(cè)就是服務(wù)健康檢測(cè)的手段。而serverProxy.registerService()實(shí)現(xiàn)服務(wù)注冊(cè),綜上可以分析出Nacos客戶(hù)端注冊(cè)流程:

到此為止還沒(méi)有真正的實(shí)現(xiàn)服務(wù)的注冊(cè),但是至少已經(jīng)知道了Nacos客戶(hù)端的自動(dòng)注冊(cè)原理是借助了Spring Boot的自動(dòng)配置功能,在項(xiàng)目啟動(dòng)時(shí)通過(guò)自動(dòng)配置類(lèi)。NacosServiceRegistryAutoConfiguration將NacosServiceRegistry注入進(jìn)來(lái),通過(guò)Spring的事件監(jiān)聽(tīng)機(jī)制,調(diào)用該類(lèi)的注冊(cè)方法register(registration)實(shí)現(xiàn)服務(wù)的自動(dòng)注冊(cè)。

7.2 Nacos服務(wù)發(fā)現(xiàn)

7.2.1 Nacos客戶(hù)端客戶(hù)端服務(wù)發(fā)現(xiàn)

當(dāng)Nacos服務(wù)端啟動(dòng)后,會(huì)先從本地緩存的serviceInfoMap中獲取服務(wù)實(shí)例信息,獲取不到則通過(guò)NamingProxy調(diào)用Nacos服務(wù)端獲取服務(wù)實(shí)例信息,最后開(kāi)啟定時(shí)任務(wù)每秒請(qǐng)求服務(wù)端獲取實(shí)例信息列表進(jìn)而更新本地緩存serviceInfoMap,服務(wù)發(fā)現(xiàn)拉取實(shí)例信息流程圖如下:

廢話不多說(shuō),直接上服務(wù)發(fā)現(xiàn)源碼:

/**
* 客戶(hù)端服務(wù)發(fā)現(xiàn)
*
* @param serviceName name of service
* @param groupName group of service
* @param clusters list of cluster
* @param subscribe if subscribe the service
* @return
* @throws NacosException
*/
@Override
public List getAllInstances(String serviceName, String groupName, List clusters,
boolean subscribe) throws NacosException {

ServiceInfo serviceInfo;
if (subscribe) {
// 如果本地緩存不存在服務(wù)信息,則進(jìn)行訂閱
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
} else {
// 如果非訂閱模式就直接拉取服務(wù)端的注冊(cè)表
serviceInfo = hostReactor
.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
}
List list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList();
}
return list;
}
/**
* 客戶(hù)端從注冊(cè)中心拉取注冊(cè)列表
*
* @param serviceName
* @param clusters
* @return
*/
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}

//客戶(hù)端從本地緩存中拉群注冊(cè)表信息,第一次根據(jù)服務(wù)名從注冊(cè)表map中獲取,服務(wù)表信息肯定是為null
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

//如果拿到緩存map中的服務(wù)列表為null,如果是第一次根據(jù)服務(wù)名拉取注冊(cè)表信息,肯定為null
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);

serviceInfoMap.put(serviceObj.getKey(), serviceObj);

updatingMap.put(serviceName, new Object());
//第一次拉取注冊(cè)表信息為null后,然后調(diào)用Nacos服務(wù)端接口更新本地注冊(cè)表
updateServiceNow(serviceName, clusters);

updatingMap.remove(serviceName);

} else if (updatingMap.containsKey(serviceName)) {

if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}

/**
* 定時(shí)任務(wù)拉取,每隔幾秒鐘就去拉取一次,去拉取nacos注冊(cè)表,更新客戶(hù)端本地注冊(cè)列表的map
*
* 為啥這里要定時(shí)任務(wù)拉取呢?因?yàn)樯厦娴阶?cè)表map是緩存在客戶(hù)端本地的,假如有新的服務(wù)注冊(cè)到nacos
* 時(shí),這時(shí)就要更新客戶(hù)端注冊(cè)表信息,所以這里會(huì)執(zhí)行一個(gè)訂單拉取的任務(wù)
*/
scheduleUpdateIfAbsent(serviceName, clusters);

return serviceInfoMap.get(serviceObj.getKey());
}

//異步拉取任務(wù)
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}

synchronized (futureMap) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
//執(zhí)行一個(gè)定時(shí)拉取任務(wù)
ScheduledFuture future = addTask(new UpdateTask(serviceName, clusters));
futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
}
}

//定時(shí)拉取注冊(cè)表任務(wù)
public class UpdateTask implements Runnable {

long lastRefTime = Long.MAX_VALUE;

private final String clusters;

private final String serviceName;

/**
* the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty
*/
private int failCount = 0;

public UpdateTask(String serviceName, String clusters) {
this.serviceName = serviceName;
this.clusters = clusters;
}

private void incFailCount() {
int limit = 6;
if (failCount == limit) {
return;
}
failCount++;
}

private void resetFailCount() {
failCount = 0;
}

@Override
public void run() {
long delayTime = DEFAULT_DELAY;

try {
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

if (serviceObj == null) {
//又在繼續(xù)調(diào)用拉取nacos注冊(cè)列表方法
updateService(serviceName, clusters);
return;
}

if (serviceObj.getLastRefTime() <= lastRefTime) {
//又在繼續(xù)調(diào)用拉取nacos注冊(cè)列表方法
updateService(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(serviceName, clusters);
}

lastRefTime = serviceObj.getLastRefTime();

if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
// abort the update task
NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
return;
}
if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
incFailCount();
return;
}
delayTime = serviceObj.getCacheMillis();
resetFailCount();
} catch (Throwable e) {
incFailCount();
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
} finally {
//最后繼續(xù)嵌套調(diào)用當(dāng)前這個(gè)任務(wù),實(shí)現(xiàn)定時(shí)拉取
executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
}
}

這里值得注意的是,Nacos客戶(hù)端拉取注冊(cè)列表方法的最后又是一個(gè)定時(shí)任務(wù)任務(wù),每隔10秒鐘就會(huì)拉取一次服務(wù)端Nacos的注冊(cè)表。為啥這里要定時(shí)任務(wù)拉取呢?因?yàn)樯厦娴阶?cè)表map是緩存在客戶(hù)端本地的,假如有新的服務(wù)注冊(cè)到Nacos時(shí),這時(shí)就要更新客戶(hù)端注冊(cè)表信息,所以這里會(huì)執(zhí)行一個(gè)拉取的任務(wù)。

private void updateServiceNow(String serviceName, String clusters) {
try {
//拉群nacos列表,更新到本地緩存map中的注冊(cè)列表
updateService(serviceName, clusters);
} catch (NacosException e) {
NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
}
}

/**
* Update service now.
* 拉取注冊(cè)列表
*
* @param serviceName service name
* @param clusters clusters
*/
public void updateService(String serviceName, String clusters) throws NacosException {
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
//調(diào)用拉群列表接口
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);

if (StringUtils.isNotEmpty(result)) {
//解析返回值服務(wù)表json
processServiceJson(result);
}
} finally {
if (oldService != null) {
synchronized (oldService) {
oldService.notifyAll();
}
}
}

/**
* Nacos客戶(hù)端查詢(xún)服務(wù)端注冊(cè)表數(shù)
*
* @param serviceName service name
* @param clusters clusters
* @param udpPort udp port
* @param healthyOnly healthy only
* @return instance list
* @throws NacosException nacos exception
*/
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
throws NacosException {

final Map params = new HashMap(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));

//調(diào)用拉取注冊(cè)列表接口
return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}

7.2.2 服務(wù)端服務(wù)發(fā)現(xiàn)查詢(xún)注冊(cè)表api

上面分析了當(dāng)客戶(hù)端在其本地緩存中沒(méi)有找到注冊(cè)表信息,就會(huì)調(diào)用Nacos服務(wù)端api拉取注冊(cè)表信息,不難發(fā)現(xiàn)服務(wù)端查詢(xún)注冊(cè)表api為"/instance/list"。

/**
* Get all instance of input service.
* 客戶(hù)端獲取nacos所有注冊(cè)實(shí)例方法
*
* @param request http request
* @return list of instance
* @throws Exception any error during list
*/
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {

String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);

String agent = WebUtils.getUserAgent(request);
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));

String app = WebUtils.optional(request, "app", StringUtils.EMPTY);

String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);

boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));

return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
healthyOnly);
}

這里通過(guò)doSrvIpxt()方法獲取服務(wù)列表,根據(jù)namespaceId、serviceName獲取service實(shí)例,service實(shí)例中srvIPs獲取所有服務(wù)提供者的實(shí)例信息,遍歷組裝成json字符串并返回。

public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {

ClientInfo clientInfo = new ClientInfo(agent);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
Service service = serviceManager.getService(namespaceId, serviceName);
long cacheMillis = switchDomain.getDefaultCacheMillis();

// now try to enable the push
try {
if (udpPort > 0 && pushService.canEnablePush(agent)) {

pushService
.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
pushDataSource, tid, app);
cacheMillis = switchDomain.getPushCacheMillis(serviceName);
}
} catch (Exception e) {
Loggers.SRV_LOG
.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
cacheMillis = switchDomain.getDefaultCacheMillis();
}

if (service == null) {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
}
result.put("name", serviceName);
result.put("clusters", clusters);
result.put("cacheMillis", cacheMillis);
result.replace("hosts", JacksonUtils.createEmptyArrayNode());
return result;
}

checkIfDisabled(service);

List srvedIPs;

//獲取所有實(shí)例
srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));

// filter ips using selector:
if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
srvedIPs = service.getSelector().select(clientIP, srvedIPs);
}

if (CollectionUtils.isEmpty(srvedIPs)) {

if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
}

if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
}

result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.set("hosts", JacksonUtils.createEmptyArrayNode());
result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
return result;
}

Map> ipMap = new HashMap<>(2);
ipMap.put(Boolean.TRUE, new ArrayList<>());
ipMap.put(Boolean.FALSE, new ArrayList<>());

for (Instance ip : srvedIPs) {
ipMap.get(ip.isHealthy()).add(ip);
}

if (isCheck) {
result.put("reachProtectThreshold", false);
}

double threshold = service.getProtectThreshold();

if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {

Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
if (isCheck) {
result.put("reachProtectThreshold", true);
}

ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
ipMap.get(Boolean.FALSE).clear();
}

if (isCheck) {
result.put("protectThreshold", service.getProtectThreshold());
result.put("reachLocalSiteCallThreshold", false);

return JacksonUtils.createEmptyJsonNode();
}

ArrayNode hosts = JacksonUtils.createEmptyArrayNode();

for (Map.Entry> entry : ipMap.entrySet()) {
List ips = entry.getValue();

if (healthyOnly && !entry.getKey()) {
continue;
}

for (Instance instance : ips) {

// remove disabled instance:
if (!instance.isEnabled()) {
continue;
}

ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();

ipObj.put("ip", instance.getIp());
ipObj.put("port", instance.getPort());
// deprecated since nacos 1.0.0:
ipObj.put("valid", entry.getKey());
ipObj.put("healthy", entry.getKey());
ipObj.put("marked", instance.isMarked());
ipObj.put("instanceId", instance.getInstanceId());
ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
ipObj.put("enabled", instance.isEnabled());
ipObj.put("weight", instance.getWeight());
ipObj.put("clusterName", instance.getClusterName());
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
ipObj.put("serviceName", instance.getServiceName());
} else {
ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
}

ipObj.put("ephemeral", instance.isEphemeral());
hosts.add(ipObj);

}
}

result.replace("hosts", hosts);
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
}
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
return result;
}

最后看一下獲取服務(wù)端實(shí)例方法,最后就是將臨時(shí)實(shí)例或者持久實(shí)例放在一個(gè)集合中返回給客戶(hù)端。

public List srvIPs(List clusters) {
if (CollectionUtils.isEmpty(clusters)) {
clusters = new ArrayList<>();
clusters.addAll(clusterMap.keySet());
}
return allIPs(clusters);
}

public List allIPs(List clusters) {
List result = new ArrayList<>();
for (String cluster : clusters) {
Cluster clusterObj = clusterMap.get(cluster);
if (clusterObj == null) {
continue;
}

result.addAll(clusterObj.allIPs());
}
return result;
}

public List allIPs() {
List allInstances = new ArrayList<>();
//將nacos內(nèi)存中注冊(cè)表信息返回
allInstances.addAll(persistentInstances);
allInstances.addAll(ephemeralInstances);
return allInstances;
}

總結(jié)一下Nacos客戶(hù)端服務(wù)發(fā)現(xiàn)的核心流程:

如果沒(méi)有開(kāi)啟訂閱模式,則直接通過(guò)調(diào)用/instance/list接口獲取服務(wù)實(shí)例列表信息;

如果開(kāi)啟訂閱模式,則先會(huì)從本地緩存中獲取實(shí)例信息,如果不存在,則進(jìn)行訂閱獲并獲取實(shí)例信息;在獲得最新的實(shí)例信息之后,也會(huì)執(zhí)行processServiceJson(result)方法來(lái)更新內(nèi)存和本地實(shí)例緩存,并發(fā)布變更時(shí)間。

開(kāi)啟訂閱時(shí),會(huì)開(kāi)啟定時(shí)任務(wù),定時(shí)執(zhí)行UpdateTask獲取服務(wù)器實(shí)例信息、更新本地緩存、發(fā)布事件等;

7.3 Nacos服務(wù)端注冊(cè)

服務(wù)端的注冊(cè)源碼邏輯相對(duì)客戶(hù)端的還是要復(fù)雜很多,所以這里我們先看一下Nacos服務(wù)端注冊(cè)的完整流程圖,避免一上來(lái)就看源碼被繞暈。

接下來(lái)我們就著重分析一下AP架構(gòu)Nacos服務(wù)注冊(cè)的源碼。

7.3.1 Nacos服務(wù)端注冊(cè)

Nacos服務(wù)端注冊(cè)當(dāng)然是本文的核心,那么首先我們來(lái)看一下Nacos服務(wù)端注冊(cè)源碼。從Nacos的客戶(hù)端注冊(cè)原理不難發(fā)現(xiàn),客戶(hù)端通過(guò)調(diào)用Nacos服務(wù)端提供的http接口實(shí)現(xiàn)注冊(cè),對(duì)外提供的服務(wù)接口請(qǐng)求地址為nacos/v1/ns/instance,實(shí)現(xiàn)代碼咋nacos-naming模塊下的InstanceController類(lèi)中:

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {

//從請(qǐng)求參數(shù)匯總獲得namespaceId(命名空間Id)
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
//從請(qǐng)求參數(shù)匯總獲得serviceName(服務(wù)名)
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);

final Instance instance = parseInstance(request);
//registerInstance注冊(cè)實(shí)例
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}

客戶(hù)端就是通過(guò)調(diào)用該api實(shí)現(xiàn)Nacos的注冊(cè)的,下面可以看一下Nacos的這個(gè)注冊(cè)api是怎么實(shí)現(xiàn)的。

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

createEmptyService(namespaceId, serviceName, instance.isEphemeral());

//前面構(gòu)建過(guò)了,這里調(diào)取肯定部不為null,從serviceMap中根據(jù)namespaceId和serviceName得到一個(gè)服務(wù)對(duì)象
Service service = getService(namespaceId, serviceName);

if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}

//調(diào)用addInstance添加服務(wù)實(shí)例
//總體流程:把需要注冊(cè)的實(shí)例放到內(nèi)存阻塞隊(duì)列中,另外會(huì)起另一個(gè)線程從內(nèi)存中取出intance實(shí)例放到Service中,即注冊(cè)成功了
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

registerInstance()干了兩件事兒,第一就是createEmptyService()方法從請(qǐng)求參數(shù)匯總獲得serviceName(服務(wù)名)和namespaceId(命名空間Id),第二就是調(diào)用registerInstance注冊(cè)實(shí)例。先看一下createEmptyService方法。

7.3.2 服務(wù)端構(gòu)建注冊(cè)表

Nacos的注冊(cè)表是多級(jí)存儲(chǔ)結(jié)構(gòu),最外層是通過(guò)namespace來(lái)實(shí)現(xiàn)環(huán)境隔離,然后是group分組,分組下就是服務(wù),一個(gè)服務(wù)有可以分為不同的集群,集群中包含多個(gè)實(shí)例。因此其注冊(cè)表結(jié)構(gòu)為一個(gè)Map,類(lèi)型是:Map>外層key是namespace_id,內(nèi)層key是group  + serviceName,Service內(nèi)部維護(hù)一個(gè)Map,結(jié)構(gòu)是:Map的key是clusterName,其值是集群信息;Cluster內(nèi)部維護(hù)一個(gè)Set集合Set ephemeralInstances和Set persistentInstances,元素是Instance類(lèi)型,代表集群中的多個(gè)實(shí)例。

createEmptyService()方法就是服務(wù)端構(gòu)建注冊(cè)表的方法,基于AP架構(gòu)的Nacos實(shí)際就是將注冊(cè)實(shí)例信息保存在內(nèi)存中。

/**
* 1、創(chuàng)建一個(gè)Serivice對(duì)象,內(nèi)部包含了一個(gè)clusterMap。
* 2、將service對(duì)象放入到SeriviceMap中,結(jié)構(gòu)為:Map>。
* 3、開(kāi)啟一個(gè)定時(shí)任務(wù)用來(lái)檢測(cè)實(shí)例的心跳是否超時(shí),每5秒執(zhí)行一次。
*
* @param namespaceId
* @param serviceName
* @param local
* @throws NacosException
*/
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
createServiceIfAbsent(namespaceId, serviceName, local, null);
}

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
Service service = getService(namespaceId, serviceName);
//第一次注冊(cè)進(jìn)來(lái),從注冊(cè)表里獲取命名空間,肯定是為null,所以需要構(gòu)建一個(gè)命名空間,
//設(shè)置nameSpace等信息,如果Service實(shí)例為空,則創(chuàng)建并保存到緩存中
if (service == null) {

Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();

//注冊(cè)和初始化,通過(guò)putService()方法將服務(wù)緩存到內(nèi)存
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}

createEmptyService()方法主要作用如下:

  • 創(chuàng)建一個(gè)Serivice對(duì)象,內(nèi)部包含了一個(gè)clusterMap;
  • 將service對(duì)象放入到SeriviceMap中,結(jié)構(gòu)為:Map>;
  • 開(kāi)啟一個(gè)定時(shí)任務(wù)用來(lái)檢測(cè)實(shí)例的心跳是否超時(shí),每5秒執(zhí)行一次。

createServiceIfAbsent()方法主要作用在于第一次注冊(cè)進(jìn)來(lái),從注冊(cè)表里獲取命名空間,肯定是為null,所以需要構(gòu)建一個(gè)命名空間,設(shè)置nameSpace等信息并保存到緩存中。這個(gè)方法里值得注意的是putServiceAndInit()方法,可以點(diǎn)進(jìn)來(lái)看一下這個(gè)方法:

private void putServiceAndInit(Service service) throws NacosException {
//構(gòu)建注冊(cè)表雙層map,初始化serviceMap --> Map> serviceMap
putService(service);
//初始化service,開(kāi)啟心跳檢測(cè)的線程
service.init();
//實(shí)現(xiàn)數(shù)據(jù)一致性監(jiān)聽(tīng)
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

這里我著重putService(service)方法,這里實(shí)際是將注冊(cè)的實(shí)例緩存到內(nèi)存的注冊(cè)表中

/**
* 通過(guò)putService()方法將服務(wù)緩存到內(nèi)存
*
* @param service service
*/
public void putService(Service service) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
//雙檢索防止并發(fā),為了防止同一個(gè)服務(wù)多個(gè)地方同時(shí)注冊(cè)
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
//構(gòu)建NamespaceId,Serivce對(duì)象放到了ServiceMap里面了,也就是說(shuō)下次我們?cè)僬{(diào)用getService(namespaceId)的時(shí)候就可以獲取到一個(gè)Service對(duì)象了
serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
}
}
}
//構(gòu)建 service name
serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}

7.3.3 Nacos服務(wù)端心跳機(jī)制

接下來(lái)我們看一下 putServiceAndInit(Service service)方法中的,init()初始化方法是怎么保持心跳連接的。

/**
* service.init()建立心跳機(jī)制
*/
public void init() {
//客戶(hù)端心跳檢查任務(wù),每隔5s執(zhí)行一次,clientBeatCheckTask是一個(gè)線程的方法
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}

/**
* Schedule client beat check task with a delay.
*
* @param task client beat check task
*/
public static void scheduleCheck(ClientBeatCheckTask task) {
//客戶(hù)端的心跳任務(wù),這里并沒(méi)有嵌套調(diào)用,而是開(kāi)啟延遲5s的任務(wù),然后每隔5秒鐘執(zhí)行一次
futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}

public class ClientBeatCheckTask implements Runnable {

private Service service;

public ClientBeatCheckTask(Service service) {
this.service = service;
}

@JsonIgnore
public PushService getPushService() {
return ApplicationUtils.getBean(PushService.class);
}

@JsonIgnore
public DistroMapper getDistroMapper() {
return ApplicationUtils.getBean(DistroMapper.class);
}

public GlobalConfig getGlobalConfig() {
return ApplicationUtils.getBean(GlobalConfig.class);
}

public SwitchDomain getSwitchDomain() {
return ApplicationUtils.getBean(SwitchDomain.class);
}

public String taskKey() {
return KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName());
}

@Override
public void run() {
try {
/**
* nacos心跳在集群架構(gòu)下只允許在一臺(tái)機(jī)器上執(zhí)行健康檢查任務(wù)
*
* 集群中有多臺(tái)機(jī)器,本方法在于對(duì)服務(wù)名稱(chēng)做hash運(yùn)算再對(duì)機(jī)器數(shù)量取模后,那么
* 這里每次只有定位到一臺(tái)機(jī)器,其他機(jī)器都直接return了
*
* 疑問(wèn):如果一臺(tái)機(jī)器掛了會(huì)怎么辦?這里取模會(huì)不會(huì)亂掉?那這里會(huì)不會(huì)要做一致性hash?
* 在nacos集群中每臺(tái)機(jī)器之間也是存在狀態(tài)同步的,每臺(tái)機(jī)器之間都有集群節(jié)點(diǎn)同步任務(wù),詳見(jiàn)com.alibaba.nacos.naming.cluster.ServerListManager.ServerStatusReporter
*
*/
if (!getDistroMapper().responsible(service.getName())) {
return;
}

if (!getSwitchDomain().isHealthCheckEnabled()) {
return;
}

//獲取服務(wù)端所有實(shí)例
List instances = service.allIPs(true);

// first set health status of instances:
/**
* for循環(huán)對(duì)每個(gè)實(shí)例都做健康檢查
* 在這個(gè)方法里面主要是循環(huán)當(dāng)前service的每一個(gè)臨時(shí)實(shí)例 用當(dāng)前時(shí)間減去最后一次心跳時(shí)間 是否大于心跳超時(shí)時(shí)間來(lái)判斷心跳是否超時(shí),
* 如果大于這個(gè)時(shí)間會(huì)執(zhí)行instance.setHealthy(false)將實(shí)例的健康狀態(tài)改為false;但是這個(gè)定時(shí)任務(wù)不會(huì)立即執(zhí)行,會(huì)每5秒執(zhí)行一次:
*/
for (Instance instance : instances) {
//判斷心跳是否超時(shí):當(dāng)前時(shí)間 - 實(shí)例上次心跳時(shí)間 > 心跳的超時(shí)時(shí)間【默認(rèn)是15秒】?
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
//如果大于心跳默認(rèn)時(shí)間,把實(shí)例的 healthy 設(shè)置為false【服務(wù)列表一開(kāi)始不會(huì)刪掉,一開(kāi)始會(huì)變成false】
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}

if (!getGlobalConfig().isExpireInstance()) {
return;
}

// then remove obsolete instances:
for (Instance instance : instances) {

if (instance.isMarked()) {
continue;
}

//當(dāng)前時(shí)間 - 實(shí)例上一次心跳時(shí)間 > 實(shí)例的刪除時(shí)間【默認(rèn)30s】
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
//直接刪除實(shí)例
deleteIp(instance);
}
}

} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}

}

private void deleteIp(Instance instance) {

try {
NamingProxy.Request request = NamingProxy.Request.newRequest();
request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort()))
.appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName())
.appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId());

//調(diào)用本地服務(wù)
String url = "http://" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort() + EnvUtil.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl(); // /v/ns/instance

// delete instance asynchronously:
HttpClient.asyncHttpDelete(url, null, null, new Callback() {
@Override
public void onReceive(RestResult result) {
if (!result.ok()) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
instance.toJson(), result.getMessage(), result.getCode());
}
}

@Override
public void onError(Throwable throwable) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(),
throwable);
}

@Override
public void onCancel() {

}
});

} catch (Exception e) {
Loggers.SRV_LOG
.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
}
}
}

可以看出init方法是開(kāi)啟了一個(gè)異步線程ClientBeatCheckTask去做了個(gè)周期性發(fā)送心跳的機(jī)制,方法中客戶(hù)端心跳檢查任務(wù),開(kāi)啟延遲5s的任務(wù),然后每隔5秒鐘執(zhí)行一次。

service.init()方法主要通過(guò)定時(shí)任務(wù)不斷檢測(cè)當(dāng)前服務(wù)下所有實(shí)例最后發(fā)送心跳包的時(shí)間。在這個(gè)方法里面主要是循環(huán)當(dāng)前service的每一個(gè)臨時(shí)實(shí)例,用當(dāng)前時(shí)間減去最后一次心跳時(shí)間是否大于15s來(lái)判斷心跳是否超時(shí),如果大于這個(gè)時(shí)間會(huì)執(zhí)行instance.setHealthy(false)將實(shí)例的健康狀態(tài)改為false,但是這個(gè)定時(shí)任務(wù)不會(huì)立即執(zhí)行,會(huì)每5秒執(zhí)行一次;當(dāng)前時(shí)間 - 實(shí)例上一次心跳時(shí)間 > 實(shí)例的刪除時(shí)間【默認(rèn)30s】就會(huì)刪除實(shí)例。

那么服務(wù)實(shí)例的最后心跳包更新時(shí)間是誰(shuí)來(lái)觸發(fā)的呢?實(shí)際上前面在說(shuō)客戶(hù)端注冊(cè)時(shí)有說(shuō)到, Nacos客戶(hù)端注冊(cè)服務(wù)的同時(shí)也建立了心跳機(jī)制。

7.3.4 服務(wù)端實(shí)例注冊(cè)

上文中registerInstance注冊(cè)實(shí)例方法中還有一個(gè)最最重要的方法就是addInstance()方法,其本質(zhì)上就是把當(dāng)前注冊(cè)的服務(wù)實(shí)例保存到Service中。

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {

String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

Service service = getService(namespaceId, serviceName);

synchronized (service) {
//將需要注冊(cè)的實(shí)例全部放到Cluster,再將Cluster放在Service里
List instanceList = addIpAddresses(service, ephemeral, ips);

Instances instances = new Instances();
instances.setInstanceList(instanceList);

//看一下 consistencyService 對(duì)象初始化的地方就知道走的是哪個(gè)實(shí)現(xiàn)
consistencyService.put(key, instances);
}
}

public static String buildInstanceListKey(String namespaceId, String serviceName, boole
名稱(chēng)欄目:深入理解AP架構(gòu)Nacos注冊(cè)原理
分享網(wǎng)址:http://www.dlmjj.cn/article/coihoph.html