日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Flume架構(gòu)與源碼分析-核心組件分析-2

4、整體流程

公司主營業(yè)務(wù):網(wǎng)站制作、成都網(wǎng)站建設(shè)、移動網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競爭能力。創(chuàng)新互聯(lián)建站是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對我們的高要求,感謝他們從不同領(lǐng)域給我們帶來的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會用頭腦與智慧不斷的給客戶帶來驚喜。創(chuàng)新互聯(lián)建站推出沁陽免費(fèi)做網(wǎng)站回饋大家。

從以上部分我們可以看出,不管是Source還是Sink都依賴Channel,那么啟動時(shí)應(yīng)該先啟動Channel然后再啟動Source或Sink即可。

Flume有兩種啟動方式:使用EmbeddedAgent內(nèi)嵌在Java應(yīng)用中或使用Application單獨(dú)啟動一個(gè)進(jìn)程,此處我們已Application分析為主。

首先進(jìn)入org.apache.flume.node.Application的main方法啟動:

Java代碼

 
 
  1. //1、設(shè)置默認(rèn)值啟動參數(shù)、參數(shù)是否必須的    
  2. Options options = new Options();    
  3. Option option = new Option("n", "name", true, "the name of this agent");    
  4. option.setRequired(true);    
  5. options.addOption(option);    
  6.     
  7. option = new Option("f", "conf-file", true,    
  8. "specify a config file (required if -z missing)");    
  9. option.setRequired(false);    
  10. options.addOption(option);    
  11.     
  12. //2、接著解析命令行參數(shù)    
  13. CommandLineParser parser = new GnuParser();    
  14. CommandLine commandLine = parser.parse(options, args);    
  15.     
  16. String agentName = commandLine.getOptionValue('n');    
  17. boolean reload = !commandLine.hasOption("no-reload-conf");    
  18.     
  19. if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {    
  20.   isZkConfigured = true;    
  21. }    
  22.     
  23. if (isZkConfigured) {    
  24.     //3、如果是通過ZooKeeper配置,則使用ZooKeeper參數(shù)啟動,此處忽略,我們以配置文件講解    
  25. } else {    
  26.   //4、打開配置文件,如果不存在則快速失敗    
  27.   File configurationFile = new File(commandLine.getOptionValue('f'));    
  28.   if (!configurationFile.exists()) {    
  29.          throw new ParseException(    
  30.         "The specified configuration file does not exist: " + path);    
  31.   }    
  32.   List components = Lists.newArrayList();    
  33.     
  34.   if (reload) { //5、如果需要定期reload配置文件,則走如下方式    
  35.     //5.1、此處使用Guava提供的事件總線    
  36.     EventBus eventBus = new EventBus(agentName + "-event-bus");    
  37.     //5.2、讀取配置文件,使用定期輪訓(xùn)拉起策略,默認(rèn)30s拉取一次    
  38.     PollingPropertiesFileConfigurationProvider configurationProvider =    
  39.         new PollingPropertiesFileConfigurationProvider(    
  40.           agentName, configurationFile, eventBus, 30);    
  41.     components.add(configurationProvider);    
  42.     application = new Application(components); //5.3、向Application注冊組件    
  43.     //5.4、向事件總線注冊本應(yīng)用,EventBus會自動注冊Application中使用@Subscribe聲明的方法    
  44.     eventBus.register(application);    
  45.     
  46.   } else { //5、配置文件不支持定期reload    
  47.     PropertiesFileConfigurationProvider configurationProvider =    
  48.         new PropertiesFileConfigurationProvider(    
  49.           agentName, configurationFile);    
  50.     application = new Application();    
  51.     //6.2、直接使用配置文件初始化Flume組件    
  52.     application.handleConfigurationEvent(configurationProvider    
  53.       .getConfiguration());    
  54.   }    
  55. }    
  56. //7、啟動Flume應(yīng)用    
  57. application.start();    
  58.     
  59. //8、注冊虛擬機(jī)關(guān)閉鉤子,當(dāng)虛擬機(jī)關(guān)閉時(shí)調(diào)用Application的stop方法進(jìn)行終止    
  60. final Application appReference = application;    
  61. Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {    
  62.   @Override    
  63.   public void run() {    
  64.     appReference.stop();    
  65.   }    
  66. });    

以上流程只提取了核心代碼中的一部分,比如ZK的實(shí)現(xiàn)直接忽略了,而Flume啟動大體流程如下:

1、讀取命令行參數(shù);

2、讀取配置文件;

3、根據(jù)是否需要reload使用不同的策略初始化Flume;如果需要reload,則使用Guava的事件總線實(shí)現(xiàn),Application的handleConfigurationEvent是事件訂閱者,PollingPropertiesFileConfigurationProvider是事件發(fā)布者,其會定期輪訓(xùn)檢查文件是否變更,如果變更則重新讀取配置文件,發(fā)布配置文件事件變更,而handleConfigurationEvent會收到該配置變更重新進(jìn)行初始化;

4、啟動Application,并注冊虛擬機(jī)關(guān)閉鉤子。

handleConfigurationEvent方法比較簡單,首先調(diào)用了stopAllComponents停止所有組件,接著調(diào)用startAllComponents使用配置文件初始化所有組件:

Java代碼

 
 
  1. @Subscribe    
  2. public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {    
  3.   stopAllComponents();    
  4.   startAllComponents(conf);    
  5. }     

MaterializedConfiguration存儲Flume運(yùn)行時(shí)需要的組件:Source、Channel、Sink、SourceRunner、SinkRunner等,其是通過ConfigurationProvider進(jìn)行初始化獲取,比如PollingPropertiesFileConfigurationProvider會讀取配置文件然后進(jìn)行組件的初始化。

對于startAllComponents實(shí)現(xiàn)大體如下:

Java代碼

 
 
  1. //1、首先啟動Channel    
  2. supervisor.supervise(Channels,    
  3.       new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);    
  4. //2、確保所有Channel是否都已啟動    
  5. for(Channel ch: materializedConfiguration.getChannels().values()){    
  6.   while(ch.getLifecycleState() != LifecycleState.START    
  7.       && !supervisor.isComponentInErrorState(ch)){    
  8.     try {    
  9.       Thread.sleep(500);    
  10.     } catch (InterruptedException e) {    
  11.         Throwables.propagate(e);    
  12.     }    
  13.   }    
  14. }    
  15. //3、啟動SinkRunner    
  16. supervisor.supervise(SinkRunners,      
  17. new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);    
  18. //4、啟動SourceRunner    
  19. supervisor.supervise(SourceRunner,    
  20. new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);    
  21. //5、初始化監(jiān)控服務(wù)    
  22. this.loadMonitoring();    

從如下代碼中可以看到,首先要準(zhǔn)備好Channel,因?yàn)镾ource和Sink會操作它,對于Channel如果初始化失敗則整個(gè)流程是失敗的;然后啟動SinkRunner,先準(zhǔn)備好消費(fèi)者;接著啟動SourceRunner開始進(jìn)行采集日志。此處我們發(fā)現(xiàn)有兩個(gè)單獨(dú)的組件LifecycleSupervisor和MonitorService,一個(gè)是組件守護(hù)哨兵,一個(gè)是監(jiān)控服務(wù)。守護(hù)哨兵對這些組件進(jìn)行守護(hù),假設(shè)出問題了默認(rèn)策略是自動重啟這些組件。

對于stopAllComponents實(shí)現(xiàn)大體如下:

Java代碼

 
 
  1. //1、首先停止SourceRunner    
  2. supervisor.unsupervise(SourceRunners);    
  3. //2、接著停止SinkRunner    
  4. supervisor.unsupervise(SinkRunners);    
  5. //3、然后停止Channel    
  6. supervisor.unsupervise(Channels);    
  7. //4、***停止MonitorService    
  8. monitorServer.stop();     

此處可以看出,停止的順序是Source、Sink、Channel,即先停止生產(chǎn),再停止消費(fèi),***停止管道。

Application中的start方法代碼實(shí)現(xiàn)如下:

Java代碼

 
 
  1. public synchronized void start() {    
  2.   for(LifecycleAware component : components) {    
  3.     supervisor.supervise(component,    
  4.         new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);    
  5.   }    
  6. }     

其循環(huán)Application注冊的組件,然后守護(hù)哨兵對它進(jìn)行守護(hù),默認(rèn)策略是出現(xiàn)問題會自動重啟組件,假設(shè)我們支持reload配置文件,則之前啟動Application時(shí)注冊過PollingPropertiesFileConfigurationProvider組件,即該組件會被守護(hù)哨兵守護(hù)著,出現(xiàn)問題默認(rèn)策略自動重啟。

而Application關(guān)閉執(zhí)行了如下動作:

Java代碼

 
 
  1. public synchronized void stop() {    
  2.   supervisor.stop();    
  3.   if(monitorServer != null) {    
  4.     monitorServer.stop();    
  5.   }    
  6. }     
  7.   

即關(guān)閉守護(hù)哨兵和監(jiān)控服務(wù)。

到此基本的Application分析結(jié)束了,我們還有很多疑問,守護(hù)哨兵怎么實(shí)現(xiàn)的。

整體流程可以總結(jié)為:

1、首先初始化命令行配置;

2、接著讀取配置文件;

3、根據(jù)是否需要reload初始化配置文件中的組件;如果需要reload會使用Guava事件總線進(jìn)行發(fā)布訂閱變化;

4、接著創(chuàng)建Application,創(chuàng)建守護(hù)哨兵,并先停止所有組件,接著啟動所有組件;啟動順序:Channel、SinkRunner、SourceRunner,并把這些組件注冊給守護(hù)哨兵、初始化監(jiān)控服務(wù);停止順序:SourceRunner、SinkRunner、Channel;

5、如果配置文件需要定期reload,則需要注冊Polling***ConfigurationProvider到守護(hù)哨兵;

6、***注冊虛擬機(jī)關(guān)閉鉤子,停止守護(hù)哨兵和監(jiān)控服務(wù)。

輪訓(xùn)實(shí)現(xiàn)的SourceRunner 和SinkRunner會創(chuàng)建一個(gè)線程進(jìn)行工作,之前已經(jīng)介紹了其工作方式。接下來我們看下守護(hù)哨兵的實(shí)現(xiàn)。

首先創(chuàng)建LifecycleSupervisor:

Java代碼

 
 
  1. //1、用于存放被守護(hù)的組件    
  2. supervisedProcesses = new HashMap();    
  3. //2、用于存放正在被監(jiān)控的組件    
  4. monitorFutures = new HashMap>();    
  5. //3、創(chuàng)建監(jiān)控服務(wù)線程池    
  6. monitorService = new ScheduledThreadPoolExecutor(10,    
  7.     new ThreadFactoryBuilder().setNameFormat(    
  8.         "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")    
  9.         .build());    
  10. monitorService.setMaximumPoolSize(20);    
  11. monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);    
  12. //4、定期清理被取消的組件    
  13. purger = new Purger();    
  14. //4.1、默認(rèn)不進(jìn)行清理    
  15. needToPurge = false;     

LifecycleSupervisor啟動時(shí)會進(jìn)行如下操作:

Java代碼

 
 
  1. public synchronized void start() {    
  2.   monitorService.scheduleWithFixedDelay(purger, 2, 2, TimeUnit.HOURS);    
  3.   lifecycleState = LifecycleState.START;    
  4. }     

首先每隔兩個(gè)小時(shí)執(zhí)行清理組件,然后改變狀態(tài)為啟動。而LifecycleSupervisor停止時(shí)直接停止了監(jiān)控服務(wù),然后更新守護(hù)組件狀態(tài)為STOP:

Java代碼

 
 
  1. //1、首先停止守護(hù)監(jiān)控服務(wù)    
  2. if (monitorService != null) {    
  3.   monitorService.shutdown();    
  4.   try {    
  5.     monitorService.awaitTermination(10, TimeUnit.SECONDS);    
  6.   } catch (InterruptedException e) {    
  7.     logger.error("Interrupted while waiting for monitor service to stop");    
  8.   }    
  9. }    
  10. //2、更新所有守護(hù)組件狀態(tài)為STOP,并調(diào)用組件的stop方法進(jìn)行停止    
  11. for (final Entry entry : supervisedProcesses.entrySet()) {    
  12.   if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {    
  13.     entry.getValue().status.desiredState = LifecycleState.STOP;    
  14.     entry.getKey().stop();    
  15.   }    
  16. }    
  17.  
  18. //3、更新本組件狀態(tài)    
  19. if (lifecycleState.equals(LifecycleState.START)) {    
  20.   lifecycleState = LifecycleState.STOP;    
  21. }     
  22. //4、***的清理    
  23.  
  24. supervisedProcesses.clear();     
  25. monitorFutures.clear();     

接下來就是調(diào)用supervise進(jìn)行組件守護(hù)了:

Java代碼

 
 
  1.  if(this.monitorService.isShutdown() || this.monitorService.isTerminated()    
  2.   || this.monitorService.isTerminating()){    
  3.     //1、如果哨兵已停止則拋出異常,不再接收任何組件進(jìn)行守護(hù)    
  4.   }    
  5.   //2、初始化守護(hù)組件    
  6.   Supervisoree process = new Supervisoree();    
  7.   process.status = new Status();    
  8.   //2.1、默認(rèn)策略是失敗重啟    
  9.   process.policy = policy;    
  10.   //2.2、初始化組件默認(rèn)狀態(tài),大多數(shù)組件默認(rèn)為START    
  11.   process.status.desiredState = desiredState;    
  12.   process.status.error = false;    
  13.   //3、組件監(jiān)控器,用于定時(shí)獲取組件的***狀態(tài),或者重新啟動組件    
  14.   MonitorRunnable monitorRunnable = new MonitorRunnable();    
  15.   monitorRunnable.lifecycleAware = lifecycleAware;    
  16.   monitorRunnable.supervisoree = process;    
  17.   monitorRunnable.monitorService = monitorService;    
  18.     
  19.   supervisedProcesses.put(lifecycleAware, process);    
  20.   //4、定期的去執(zhí)行組件監(jiān)控器,獲取組件***狀態(tài),或者重新啟動組件    
  21.   ScheduledFuture future = monitorService.scheduleWithFixedDelay(    
  22.       monitorRunnable, 0, 3, TimeUnit.SECONDS);    
  23.   monitorFutures.put(lifecycleAware, future);    
  24. }    

如果不需要守護(hù)了,則需要調(diào)用unsupervise:

Java代碼

 
 
  1. public synchronized void unsupervise(LifecycleAware lifecycleAware) {    
  2.   synchronized (lifecycleAware) {    
  3.     Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);    
  4.     //1.1、設(shè)置守護(hù)組件的狀態(tài)為被丟棄    
  5.     supervisoree.status.discard = true;    
  6.     //1.2、設(shè)置組件盼望的***生命周期狀態(tài)為STOP    
  7.     this.setDesiredState(lifecycleAware, LifecycleState.STOP);    
  8.     //1.3、停止組件    
  9.     lifecycleAware.stop();    
  10.   }    
  11.   //2、從守護(hù)組件中移除    
  12.   supervisedProcesses.remove(lifecycleAware);    
  13.   //3、取消定時(shí)監(jiān)控組件服務(wù)    
  14.   monitorFutures.get(lifecycleAware).cancel(false);    
  15.   //3.1、通知Purger需要進(jìn)行清理,Purger會定期的移除cancel的組件    
  16.   needToPurge = true;    
  17.   monitorFutures.remove(lifecycleAware);    
  18. }    

接下來我們再看下MonitorRunnable的實(shí)現(xiàn),其負(fù)責(zé)進(jìn)行組件狀態(tài)遷移或組件故障恢復(fù):

Java代碼

 
 
  1. public synchronized void unsupervise(LifecycleAware lifecycleAware) {    
  2.   synchronized (lifecycleAware) {    
  3.     Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);    
  4.     //1.1、設(shè)置守護(hù)組件的狀態(tài)為被丟棄    
  5.     supervisoree.status.discard = true;    
  6.     //1.2、設(shè)置組件盼望的***生命周期狀態(tài)為STOP    
  7.     this.setDesiredState(lifecycleAware, LifecycleState.STOP);    
  8.     //1.3、停止組件    
  9.     lifecycleAware.stop();    
  10.   }    
  11.   //2、從守護(hù)組件中移除    
  12.   supervisedProcesses.remove(lifecycleAware);    
  13.   //3、取消定時(shí)監(jiān)控組件服務(wù)    
  14.   monitorFutures.get(lifecycleAware).cancel(false);    
  15.   //3.1、通知Purger需要進(jìn)行清理,Purger會定期的移除cancel的組件    
  16.   needToPurge = true;    
  17.   monitorFutures.remove(lifecycleAware);    
  18. }    
  19. 接下來我們再看下MonitorRunnable的實(shí)現(xiàn),其負(fù)責(zé)進(jìn)行組件狀態(tài)遷移或組件故障恢復(fù):  
  20. Java代碼    
  21. public void run() {    
  22.   long now = System.currentTimeMillis();    
  23.   try {    
  24.     if (supervisoree.status.firstSeen == null) {    
  25.         supervisoree.status.firstSeen = now; //1、記錄***次狀態(tài)查看時(shí)間    
  26.     }    
  27.     supervisoree.status.lastSeen = now; //2、記錄***一次狀態(tài)查看時(shí)間    
  28.     synchronized (lifecycleAware) {    
  29.         //3、如果守護(hù)組件被丟棄或出錯(cuò)了,則直接返回    
  30.         if (supervisoree.status.discard || supervisoree.status.error) {    
  31.           return;    
  32.         }    
  33.         //4、更新***一次查看到的狀態(tài)    
  34.         supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();    
  35.         //5、如果組件的狀態(tài)和守護(hù)組件看到的狀態(tài)不一致,則以守護(hù)組件的狀態(tài)為準(zhǔn),然后進(jìn)行初始化    
  36.         if (!lifecycleAware.getLifecycleState().equals(    
  37.             supervisoree.status.desiredState)) {    
  38.           switch (supervisoree.status.desiredState) {     
  39.             case START: //6、如果是啟動狀態(tài),則啟動組件    
  40.              try {    
  41.                 lifecycleAware.start();    
  42.               } catch (Throwable e) {    
  43.                 if (e instanceof Error) {    
  44.                   supervisoree.status.desiredState = LifecycleState.STOP;    
  45.                   try {    
  46.                     lifecycleAware.stop();    
  47.                   } catch (Throwable e1) {    
  48.                     supervisoree.status.error = true;    
  49.                     if (e1 instanceof Error) {    
  50.                       throw (Error) e1;    
  51.                     }    
  52.                   }    
  53.                 }    
  54.                 supervisoree.status.failures++;    
  55.               }    
  56.               break;    
  57.             case STOP: //7、如果是停止?fàn)顟B(tài),則停止組件    
  58.               try {    
  59.                 lifecycleAware.stop();    
  60.               } catch (Throwable e) {    
  61.                 if (e instanceof Error) {    
  62.                   throw (Error) e;    
  63.                 }    
  64.                 supervisoree.status.failures++;    
  65.               }    
  66.               break;    
  67.             default:    
  68.           }    
  69.     } catch(Throwable t) {    
  70.     }    
  71.   }    
  72. }    

如上代碼進(jìn)行了一些簡化,整體邏輯即定時(shí)去采集組件的狀態(tài),如果發(fā)現(xiàn)守護(hù)組件和組件的狀態(tài)不一致,則可能需要進(jìn)行啟動或停止。即守護(hù)監(jiān)視器可以用來保證組件如能失敗后自動啟動。默認(rèn)策略是總是失敗后重啟,還有一種策略是只啟動一次。

【本文是專欄作者張開濤的原創(chuàng)文章,作者微信公眾號:開濤的博客,id:kaitao-1234567】


新聞名稱:Flume架構(gòu)與源碼分析-核心組件分析-2
文章路徑:http://www.dlmjj.cn/article/dpojgho.html