日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
消息隊(duì)列線程池模型如何保證重啟時(shí)消息不丟

本文轉(zhuǎn)載自微信公眾號(hào)「咖啡拿鐵」,作者咖啡拿鐵 。轉(zhuǎn)載本文請(qǐng)聯(lián)系咖啡拿鐵公眾號(hào)。

創(chuàng)新互聯(lián)專注于興隆臺(tái)網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠(chéng)為您提供興隆臺(tái)營(yíng)銷型網(wǎng)站建設(shè),興隆臺(tái)網(wǎng)站制作、興隆臺(tái)網(wǎng)頁(yè)設(shè)計(jì)、興隆臺(tái)網(wǎng)站官網(wǎng)定制、微信小程序開發(fā)服務(wù),打造興隆臺(tái)網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供興隆臺(tái)網(wǎng)站排名全網(wǎng)營(yíng)銷落地服務(wù)。

背景

今天在脈脈上面看到了一個(gè)帖子,比較有意思:

這個(gè)帖子的意思是:在使用Kafka的時(shí)候,我們已經(jīng)設(shè)置了多個(gè)分區(qū),如何去提升消費(fèi)能力?如果使用線程池的方式去提升如何保證重啟時(shí)消息不丟。

這個(gè)題其實(shí)問(wèn)了兩個(gè)點(diǎn),第一個(gè)是如何提升消費(fèi)能力,第二個(gè)是如果選擇線程池,我們?nèi)绾巫龅较⒉粊G。

這里先解釋一下這兩個(gè)問(wèn)題到底是怎么回事,在很多消息隊(duì)列中都有一個(gè)概念叫partion,代表著分區(qū),分區(qū)是我們提高消息隊(duì)列消費(fèi)的關(guān)鍵,我們的消費(fèi)者消費(fèi)的渠道就是從每個(gè)分區(qū)中來(lái)的,一個(gè)分區(qū)只能被一個(gè)消費(fèi)者持有,如下圖所示:

有點(diǎn)類似銀行排隊(duì),隊(duì)列的個(gè)數(shù)越多,排隊(duì)的時(shí)間相對(duì)來(lái)說(shuō)就會(huì)越少,當(dāng)然也可以通過(guò)異步的方式去處理,比如線程池,把所有的消息都扔到線程池中去執(zhí)行,這就引出了作者說(shuō)的第二個(gè)問(wèn)題,首先我們來(lái)看看同步消費(fèi)為什么不會(huì)丟消息呢?

如果我們使用的是同步模型,當(dāng)我們消費(fèi)了之后會(huì)將offset ack回去,如果我們出現(xiàn)了重啟,沒(méi)有成功offset,那么這部分?jǐn)?shù)據(jù)將會(huì)再次消費(fèi),如果是用線程池進(jìn)行消費(fèi),那么我們?nèi)绾芜M(jìn)行ack呢,比如我們用線程池消費(fèi)了 10,11,12 三條消息如果12先消費(fèi)完,那么我們ack 13嗎?如果這樣做的話,這個(gè)時(shí)候重啟,kafka就會(huì)認(rèn)為你已經(jīng)處理了10,11的消息,這個(gè)時(shí)候消息就會(huì)出現(xiàn)丟失,而發(fā)這個(gè)帖子的同學(xué)就是對(duì)于這一塊是比較疑惑。

網(wǎng)友的回答

我們來(lái)看看網(wǎng)友的一些回答:

網(wǎng)友A:

這名網(wǎng)友的回答本質(zhì)還是使用線程池,作者也回復(fù)了,并沒(méi)有解決線程池的問(wèn)題。

網(wǎng)友B:

這個(gè)方法類似銀行排隊(duì),只要隊(duì)列多,那么處理速度就會(huì)加快,的確是第一個(gè)問(wèn)題的解決辦法之一。

網(wǎng)友C:

這一類主要解決了第二個(gè)問(wèn)題,通過(guò)外部維護(hù)offset,比如通過(guò)offset入庫(kù)的方式,我們就能找到正確的應(yīng)該消費(fèi)的offset,這個(gè)相對(duì)來(lái)說(shuō)比較復(fù)雜,使用一個(gè)MQ還得配套一個(gè)數(shù)據(jù)庫(kù),萬(wàn)一我使用MQ的服務(wù)根本都沒(méi)有數(shù)據(jù)庫(kù),還得單獨(dú)去申請(qǐng)。

網(wǎng)友D:

還有另外一種觀點(diǎn)就是,代碼寫好一點(diǎn),讓消費(fèi)的速度提高,那消費(fèi)能力自然就上去了,這個(gè)的確是一個(gè)很重要的點(diǎn),通常被其他人給忽略,有時(shí)候消費(fèi)比較慢,很多人可能一上來(lái)就是考慮中間件應(yīng)該怎么設(shè)置,往往會(huì)忽略自己的代碼。

看了這么多帖子的一個(gè)回復(fù),感覺(jué)沒(méi)有真正能讓我滿意的答案,下面來(lái)說(shuō)說(shuō)我心中的一些思路。

我的想法

對(duì)于第一個(gè)問(wèn)題的話,如何提升消費(fèi)能力?這個(gè)問(wèn)題其實(shí)可以總結(jié)為三個(gè)辦法:

  1. 如果每臺(tái)消費(fèi)者機(jī)器消費(fèi)線程是固定的,那么我們可以擴(kuò)容消費(fèi)機(jī)器和partion,類似銀行排隊(duì)增加排隊(duì)窗口一樣。
  2. 如果機(jī)器和partion是固定的,增加消費(fèi)線程就是一個(gè)比較好的辦法,但是如果是順序消費(fèi),就不能通過(guò)增加線程數(shù)的方式來(lái)提升消費(fèi)能力,因?yàn)轫樞蛳M(fèi)每個(gè)partion都是一個(gè)單獨(dú)的線程,只能通過(guò)第一種方式去解決。
  3. 增加自身代碼的消費(fèi)能力,你想想如果銀行辦事,如果柜員的辦事效率能提升的非常高,那么整個(gè)排隊(duì)速度肯定也是很快的。

對(duì)于第二個(gè)問(wèn)題,如果我們使用線程池模型,如何去解決消息丟失問(wèn)題,這里我比較推薦的是RocketMQ中的做法,我們之前說(shuō)了用數(shù)據(jù)庫(kù)去保存offset比較復(fù)雜,性能還比較差,在RocketMQ中使用了一個(gè)TreeMap的結(jié)構(gòu)做了我們上面提到的數(shù)據(jù)庫(kù)的事:

 
 
 
 
  1. private final TreeMap msgTreeMap = new TreeMap(); 

這個(gè)TreeMap的key是每個(gè)message的offset,value就是這條消息的一些信息,TreeMap的底層是使用紅黑樹去實(shí)現(xiàn)的,我們可以很快獲取其中的最小值和最大值,當(dāng)我們每次處理完某一條消息的時(shí)候我們會(huì)將這條消息從msgTreeMap中移除,

 
 
 
 
  1. public long removeMessage(final List msgs) { 
  2.         long result = -1; 
  3.         final long now = System.currentTimeMillis(); 
  4.         try { 
  5.             this.lockTreeMap.writeLock().lockInterruptibly(); 
  6.             this.lastConsumeTimestamp = now; 
  7.             try { 
  8.                 if (!msgTreeMap.isEmpty()) { 
  9.                     result = this.queueOffsetMax + 1; 
  10.                     int removedCnt = 0; 
  11.                     for (MessageExt msg : msgs) { 
  12.                         MessageExt prev = msgTreeMap.remove(msg.getQueueOffset()); 
  13.                         if (prev != null) { 
  14.                             removedCnt--; 
  15.                             msgSize.addAndGet(0 - msg.getBody().length); 
  16.                         } 
  17.                     } 
  18.                     msgCount.addAndGet(removedCnt); 
  19.  
  20.                     if (!msgTreeMap.isEmpty()) { 
  21.                         result = msgTreeMap.firstKey(); 
  22.                     } 
  23.                 } 
  24.             } finally { 
  25.                 this.lockTreeMap.writeLock().unlock(); 
  26.             } 
  27.         } catch (Throwable t) { 
  28.             log.error("removeMessage exception", t); 
  29.         } 
  30.         return result; 
  31.     } 

removeMessage這個(gè)方法就是移除已經(jīng)消費(fèi)過(guò)的消息,并且返回當(dāng)前最新的消費(fèi)offset,這里返回的結(jié)果就是msgTreeMap.firstKey(),我們ack給消息隊(duì)列server的值其實(shí)也是這個(gè),回到我們這個(gè)問(wèn)題上,如果我們發(fā)生重啟,那么其實(shí)也不需要擔(dān)心我們會(huì)出現(xiàn)消息丟失。


網(wǎng)頁(yè)題目:消息隊(duì)列線程池模型如何保證重啟時(shí)消息不丟
標(biāo)題網(wǎng)址:http://www.dlmjj.cn/article/djsoecp.html