日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
NacosClient服務(wù)訂閱之事件機(jī)制剖析

本文轉(zhuǎn)載自微信公眾號(hào)「程序新視」,作者二師兄。轉(zhuǎn)載本文請(qǐng)聯(lián)系程序新視公眾號(hào)。

學(xué)習(xí)不用那么功利,二師兄帶你從更高維度輕松閱讀源碼~

上篇文章,我們分析了Nacos客戶端訂閱的核心流程:Nacos客戶端通過一個(gè)定時(shí)任務(wù),每6秒從注冊(cè)中心獲取實(shí)例列表,當(dāng)發(fā)現(xiàn)實(shí)例發(fā)生變化時(shí),發(fā)布變更事件,訂閱者進(jìn)行業(yè)務(wù)處理,然后更新內(nèi)存中和本地的緩存中的實(shí)例。

這篇文章為服務(wù)訂閱的第二篇,我們重點(diǎn)來分析,定時(shí)任務(wù)獲取到最新實(shí)例列表之后,整個(gè)事件機(jī)制是如何處理的。

回顧整個(gè)流程

先回顧一下客戶端服務(wù)訂閱的基本流程:

在第一步調(diào)用subscribe方法時(shí),會(huì)訂閱一個(gè)EventListener事件。而在定時(shí)任務(wù)UpdateTask定時(shí)獲取實(shí)例列表之后,會(huì)調(diào)用ServiceInfoHolder#processServiceInfo方法對(duì)ServiceInfo進(jìn)行本地處理,這其中就包括和事件處理。

監(jiān)聽事件的注冊(cè)

在subscribe方法中,通過如下方式進(jìn)行了監(jiān)聽事件的注冊(cè):

 
 
 
 
  1. @Override
  2. public void subscribe(String serviceName, String groupName, List clusters, EventListener listener)
  3.         throws NacosException {
  4.     if (null == listener) {
  5.         return;
  6.     }
  7.     String clusterString = StringUtils.join(clusters, ",");
  8.     changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
  9.     clientProxy.subscribe(serviceName, groupName, clusterString);
  10. }

這里的changeNotifier.registerListener便是進(jìn)行具體的事件注冊(cè)邏輯。追進(jìn)去看一下實(shí)現(xiàn)源碼:

 
 
 
 
  1. // InstancesChangeNotifier
  2. public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
  3.     String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
  4.     ConcurrentHashSet eventListeners = listenerMap.get(key);
  5.     if (eventListeners == null) {
  6.         synchronized (lock) {
  7.             eventListeners = listenerMap.get(key);
  8.             if (eventListeners == null) {
  9.                 eventListeners = new ConcurrentHashSet();
  10.                 // 將EventListener緩存到listenerMap
  11.                 listenerMap.put(key, eventListeners);
  12.             }
  13.         }
  14.     }
  15.     eventListeners.add(listener);
  16. }

可以看出,事件的注冊(cè)便是將EventListener存儲(chǔ)在InstancesChangeNotifier的listenerMap屬性當(dāng)中了。

這里的數(shù)據(jù)結(jié)構(gòu)為Map,key為服務(wù)實(shí)例信息的拼接,value為監(jiān)聽事件的集合。

事件注冊(cè)流程就這么簡單。這里有一個(gè)雙重檢查鎖的實(shí)踐案例,不知道你留意到?jīng)]?可以學(xué)習(xí)一下。

ServiceInfo的處理

上面完成了事件的注冊(cè),現(xiàn)在就追溯一下觸發(fā)事件的來源。UpdateTask中獲取到最新實(shí)例會(huì)進(jìn)行本地化處理,部分代碼如下:

 
 
 
 
  1. // 獲取緩存的service信息
  2. ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
  3. if (serviceObj == null) {
  4.     // 根據(jù)serviceName從注冊(cè)中心服務(wù)端獲取Service信息
  5.     serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
  6.     serviceInfoHolder.processServiceInfo(serviceObj);
  7.     lastRefTime = serviceObj.getLastRefTime();
  8.     return;
  9. }

這部分邏輯在上篇文章中已經(jīng)分析過了,這里重點(diǎn)看serviceInfoHolder#processServiceInfo中的業(yè)務(wù)邏輯處理。先看流程圖,然后看代碼。

上述邏輯簡單說就是:判斷一下新的ServiceInfo數(shù)據(jù)是否正確,是否發(fā)生了變化。如果數(shù)據(jù)格式正確,且發(fā)生的變化,那就發(fā)布一個(gè)InstancesChangeEvent事件,同時(shí)將ServiceInfo寫入本地緩存。

下面看一下代碼實(shí)現(xiàn):

 
 
 
 
  1. public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
  2.     String serviceKey = serviceInfo.getKey();
  3.     if (serviceKey == null) {
  4.         return null;
  5.     }
  6.     ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
  7.     if (isEmptyOrErrorPush(serviceInfo)) {
  8.         //empty or error push, just ignore
  9.         return oldService;
  10.     }
  11.     // 緩存服務(wù)信息
  12.     serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
  13.     // 判斷注冊(cè)的實(shí)例信息是否已變更
  14.     boolean changed = isChangedServiceInfo(oldService, serviceInfo);
  15.     if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
  16.         serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
  17.     }
  18.     // 通過prometheus-simpleclient監(jiān)控服務(wù)緩存Map的大小
  19.     MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
  20.     // 服務(wù)實(shí)例已變更
  21.     if (changed) {
  22.         NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
  23.                 + JacksonUtils.toJson(serviceInfo.getHosts()));
  24.         // 添加實(shí)例變更事件,會(huì)被推動(dòng)到訂閱者執(zhí)行
  25.         NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
  26.                 serviceInfo.getClusters(), serviceInfo.getHosts()));
  27.         // 記錄Service本地文件
  28.         DiskCache.write(serviceInfo, cacheDir);
  29.     }
  30.     return serviceInfo;
  31. }

可以對(duì)照流程圖和代碼中的注釋部分進(jìn)行理解這個(gè)過程。

我們要講的重點(diǎn)是服務(wù)信息變更之后,發(fā)布的InstancesChangeEvent,也就是流程圖中標(biāo)紅的部分。

事件追蹤

上面的事件是通過NotifyCenter進(jìn)行發(fā)布的,NotifyCenter中的核心流程如下:

NotifyCenter中進(jìn)行事件發(fā)布,發(fā)布的核心邏輯是:

  • 根據(jù)InstancesChangeEvent事件類型,獲得對(duì)應(yīng)的CanonicalName;
  • 將CanonicalName作為Key,從NotifyCenter#publisherMap中獲取對(duì)應(yīng)的事件發(fā)布者(EventPublisher);
  • EventPublisher將InstancesChangeEvent事件進(jìn)行發(fā)布。

NotifyCenter中的核心代碼實(shí)現(xiàn)如下:

 
 
 
 
  1. private static boolean publishEvent(final Class eventType, final Event event) {
  2.     if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
  3.         return INSTANCE.sharePublisher.publish(event);
  4.     }
  5.     // 根據(jù)InstancesChangeEvent事件類型,獲得對(duì)應(yīng)的CanonicalName;
  6.     final String topic = ClassUtils.getCanonicalName(eventType);
  7.     // 將CanonicalName作為Key,從NotifyCenter#publisherMap中獲取對(duì)應(yīng)的事件發(fā)布者(EventPublisher);
  8.     EventPublisher publisher = INSTANCE.publisherMap.get(topic);
  9.     if (publisher != null) {
  10.         // EventPublisher將InstancesChangeEvent事件進(jìn)行發(fā)布。
  11.         return publisher.publish(event);
  12.     }
  13.     LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
  14.     return false;
  15. }

上述代碼中的INSTANCE為NotifyCenter的單例模式實(shí)現(xiàn)。那么,這里的publisherMap中key(CanonicalName)和value(EventPublisher)之間的關(guān)系是什么時(shí)候建立的呢?

這個(gè)是在NacosNamingService實(shí)例化時(shí)調(diào)用init方法中進(jìn)行綁定的:

 
 
 
 
  1. // Publisher的注冊(cè)過程在于建立InstancesChangeEvent.class與EventPublisher的關(guān)系。
  2. NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);

registerToPublisher方法默認(rèn)采用了DEFAULT_PUBLISHER_FACTORY來進(jìn)行構(gòu)建。

 
 
 
 
  1. public static EventPublisher registerToPublisher(final Class eventType, final int queueMaxSize) {
  2.     return registerToPublisher(eventType, DEFAULT_PUBLISHER_FACTORY, queueMaxSize);
  3. }

如果查看NotifyCenter中靜態(tài)代碼塊,會(huì)發(fā)現(xiàn)DEFAULT_PUBLISHER_FACTORY默認(rèn)構(gòu)建的EventPublisher為DefaultPublisher。

至此,我們得知,在NotifyCenter中它維護(hù)了事件名稱和事件發(fā)布者的關(guān)系,而默認(rèn)的事件發(fā)布者為DefaultPublisher。

DefaultPublisher的事件發(fā)布

查看DefaultPublisher的源碼,會(huì)發(fā)現(xiàn)它繼承自Thread,也就是說它是一個(gè)線程類。同時(shí),它又實(shí)現(xiàn)了EventPublisher,也就是我們前面提到的發(fā)布者。

 
 
 
 
  1. public class DefaultPublisher extends Thread implements EventPublisher {}

在DefaultPublisher的init方法實(shí)現(xiàn)如下:

 
 
 
 
  1. @Override
  2. public void init(Class type, int bufferSize) {
  3.     // 守護(hù)線程
  4.     setDaemon(true);
  5.     // 設(shè)置線程名字
  6.     setName("nacos.publisher-" + type.getName());
  7.     this.eventType = type;
  8.     this.queueMaxSize = bufferSize;
  9.     // 阻塞隊(duì)列初始化
  10.     this.queue = new ArrayBlockingQueue<>(bufferSize);
  11.     start();
  12. }

也就是說,當(dāng)DefaultPublisher被初始化時(shí),是以守護(hù)線程的方式運(yùn)作的,其中還初始化了一個(gè)阻塞隊(duì)列,隊(duì)列的默認(rèn)大小為16384。

最后調(diào)用了start方法:

 
 
 
 
  1. @Override
  2. public synchronized void start() {
  3.     if (!initialized) {
  4.         // start just called once
  5.         super.start();
  6.         if (queueMaxSize == -1) {
  7.             queueMaxSize = ringBufferSize;
  8.         }
  9.         initialized = true;
  10.     }
  11. }

start方法中調(diào)用了super.start,此時(shí)等于啟動(dòng)了線程,會(huì)執(zhí)行對(duì)應(yīng)的run方法。

run方法中只調(diào)用了如下方法:

 
 
 
 
  1. void openEventHandler() {
  2.     try {
  3.         // This variable is defined to resolve the problem which message overstock in the queue.
  4.         int waitTimes = 60;
  5.         // for死循環(huán)不斷的從隊(duì)列中取出Event,并通知訂閱者Subscriber執(zhí)行Event
  6.         // To ensure that messages are not lost, enable EventHandler when
  7.         // waiting for the first Subscriber to register
  8.         for (; ; ) {
  9.             if (shutdown || hasSubscriber() || waitTimes <= 0) {
  10.                 break;
  11.             }
  12.             ThreadUtils.sleep(1000L);
  13.             waitTimes--;
  14.         }
  15.         for (; ; ) {
  16.             if (shutdown) {
  17.                 break;
  18.             }
  19.             // // 從隊(duì)列取出Event
  20.             final Event event = queue.take();
  21.             receiveEvent(event);
  22.             UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
  23.         }
  24.     } catch (Throwable ex) {
  25.         LOGGER.error("Event listener exception : ", ex);
  26.     }
  27. }

這里寫了兩個(gè)死循環(huán),第一個(gè)死循環(huán)可以理解為延時(shí)效果,也就是說線程啟動(dòng)時(shí)最大延時(shí)60秒,在這60秒中每隔1秒判斷一下當(dāng)前線程是否關(guān)閉,是否有訂閱者,是否超過60秒。如果滿足一個(gè)條件,就可以提前跳出死循環(huán)。

而第二個(gè)死循環(huán)才是真正的業(yè)務(wù)邏輯處理,會(huì)從阻塞隊(duì)列中取出一個(gè)事件,然后通過receiveEvent方法進(jìn)行執(zhí)行。

那么,隊(duì)列中的事件哪兒來的呢?此時(shí),你可能已經(jīng)想到剛才DefaultPublisher的發(fā)布事件方法被調(diào)用了。來看看它的publish方法實(shí)現(xiàn):

 
 
 
 
  1. @Override
  2. public boolean publish(Event event) {
  3.     checkIsStart();
  4.     boolean success = this.queue.offer(event);
  5.     if (!success) {
  6.         LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
  7.         receiveEvent(event);
  8.         return true;
  9.     }
  10.     return true;
  11. }

可以看到,DefaultPublisher的publish方法的確就是往阻塞隊(duì)列中存入事件。這里有個(gè)分支邏輯,如果存入失敗,會(huì)直接調(diào)用receiveEvent,和從隊(duì)列中取出事件執(zhí)行的方法一樣??梢岳斫鉃?,如果向隊(duì)列中存入失敗,則立即執(zhí)行,不走隊(duì)列了。

最后,再來看看receiveEvent方法的實(shí)現(xiàn):

 
 
 
 
  1. void receiveEvent(Event event) {
  2.     final long currentEventSequence = event.sequence();
  3.     if (!hasSubscriber()) {
  4.         LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.");
  5.         return;
  6.     }
  7.     // 通知訂閱者執(zhí)行Event
  8.     // Notification single event listener
  9.     for (Subscriber subscriber : subscribers) {
  10.         // Whether to ignore expiration events
  11.         if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
  12.             LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
  13.                     event.getClass());
  14.             continue;
  15.         }
  16.         // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
  17.         // Remove original judge part of codes.
  18.         notifySubscriber(subscriber, event);
  19.     }
  20. }

這里最主要的邏輯就是遍歷DefaultPublisher的subscribers(訂閱者集合),然后執(zhí)行通知訂閱者的方法。

那么有朋友要問了這subscribers中的訂閱者哪里來的呢?這個(gè)還要回到NacosNamingService的init方法中:

 
 
 
 
  1. // 將Subscribe注冊(cè)到Publisher
  2. NotifyCenter.registerSubscriber(changeNotifier);

該方法最終會(huì)調(diào)用NotifyCenter的addSubscriber方法:

 
 
 
 
  1. private static void addSubscriber(final Subscriber consumer, Class subscribeType,
  2.         EventPublisherFactory factory) {
  3.     final String topic = ClassUtils.getCanonicalName(subscribeType);
  4.     synchronized (NotifyCenter.class) {
  5.         // MapUtils.computeIfAbsent is a unsafe method.
  6.         MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
  7.     }
  8.     // 獲取時(shí)間對(duì)應(yīng)的Publisher
  9.     EventPublisher publisher = INSTANCE.publisherMap.get(topic);
  10.     if (publisher instanceof ShardedEventPublisher) {
  11.         ((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
  12.     } else {
  13.         // 添加到subscribers集合
  14.         publisher.addSubscriber(consumer);
  15.     }
  16. }

其中核心邏輯就是將訂閱事件、發(fā)布者、訂閱者三者進(jìn)行綁定。而發(fā)布者與事件通過Map進(jìn)行維護(hù)、發(fā)布者與訂閱者通過關(guān)聯(lián)關(guān)系進(jìn)行維護(hù)。

發(fā)布者找到了,事件也有了,最后看一下notifySubscriber方法:

 
 
 
 
  1. @Override
  2. public void notifySubscriber(final Subscriber subscriber, final Event event) {
  3.     LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
  4.     // 執(zhí)行訂閱者Event
  5.     final Runnable job = () -> subscriber.onEvent(event);
  6.     final Executor executor = subscriber.executor();
  7.     if (executor != null) {
  8.         executor.execute(job);
  9.     } else {
  10.         try {
  11.             job.run();
  12.         } catch (Throwable e) {
  13.             LOGGER.error("Event callback exception: ", e);
  14.         }
  15.     }
  16. }

邏輯比較簡單,如果訂閱者定義了Executor,那么使用它定義的Executor進(jìn)行事件的執(zhí)行,如果沒有,那就創(chuàng)建一個(gè)線程進(jìn)行執(zhí)行。

至此,整個(gè)服務(wù)訂閱的事件機(jī)制完成。

小結(jié)

整體來看,整個(gè)服務(wù)訂閱的事件機(jī)制還是比較復(fù)雜的,因?yàn)橛玫搅耸录男问?,邏輯就比較繞,而且這期間還摻雜了守護(hù)線程,死循環(huán),阻塞隊(duì)列等。需要重點(diǎn)理解NotifyCenter對(duì)事件發(fā)布者、事件訂閱者和事件之間關(guān)系的維護(hù),而這一關(guān)系的維護(hù)的入口就位于NacosNamingService的init方法當(dāng)中。

下面再梳理一下幾個(gè)核心流程:

ServiceInfoHolder中通過NotifyCenter發(fā)布了InstancesChangeEvent事件;

NotifyCenter中進(jìn)行事件發(fā)布,發(fā)布的核心邏輯是:

  • 根據(jù)InstancesChangeEvent事件類型,獲得對(duì)應(yīng)的CanonicalName;
  • 將CanonicalName作為Key,從NotifyCenter#publisherMap中獲取對(duì)應(yīng)的事件發(fā)布者(EventPublisher);
  • EventPublisher將InstancesChangeEvent事件進(jìn)行發(fā)布。
  • InstancesChangeEvent事件發(fā)布:

通過EventPublisher的實(shí)現(xiàn)類DefaultPublisher進(jìn)行InstancesChangeEvent事件發(fā)布;

  • DefaultPublisher本身以守護(hù)線程的方式運(yùn)作,在執(zhí)行業(yè)務(wù)邏輯前,先判斷該線程是否啟動(dòng);
  • 如果啟動(dòng),則將事件添加到BlockingQueue中,隊(duì)列默認(rèn)大小為16384;
  • 添加到BlockingQueue成功,則整個(gè)發(fā)布過程完成;
  • 如果添加失敗,則直接調(diào)用DefaultPublisher#receiveEvent方法,接收事件并通知訂閱者;
  • 通知訂閱者時(shí)創(chuàng)建一個(gè)Runnable對(duì)象,執(zhí)行訂閱者的Event。
  • Event事件便是執(zhí)行訂閱時(shí)傳入的事件;

如果添加到BlockingQueue成功,則走另外一個(gè)業(yè)務(wù)邏輯:

  • DefaultPublisher初始化時(shí)會(huì)創(chuàng)建一個(gè)阻塞(BlockingQueue)隊(duì)列,并標(biāo)記線程啟動(dòng);
  • DefaultPublisher本身是一個(gè)Thread,當(dāng)執(zhí)行super.start方法時(shí),會(huì)調(diào)用它的run方法;
  • run方法的核心業(yè)務(wù)邏輯是通過openEventHandler方法處理的;
  • openEventHandler方法通過兩個(gè)for循環(huán),從阻塞隊(duì)列中獲取時(shí)間信息;
  • 第一個(gè)for循環(huán)用于讓線程啟動(dòng)時(shí)在60s內(nèi)檢查執(zhí)行條件;
  • 第二個(gè)for循環(huán)為死循環(huán),從阻塞隊(duì)列中獲取Event,并調(diào)用DefaultPublisher#receiveEvent方法,接收事件并通知訂閱者;
  • Event事件便是執(zhí)行訂閱時(shí)傳入的事件;

關(guān)于Nacos Client服務(wù)定義的事件機(jī)制就將這么多,下篇我們來講講故障轉(zhuǎn)移和緩存的實(shí)現(xiàn)。


分享名稱:NacosClient服務(wù)訂閱之事件機(jī)制剖析
本文網(wǎng)址:http://www.dlmjj.cn/article/cdephsi.html