新聞中心
?1、問題現(xiàn)象
某一天,項目組一個同事向我反饋,他們使用公司的數(shù)據(jù)同步產(chǎn)品將MySQL數(shù)據(jù)同步到MQ集群,然后使用消費者將數(shù)據(jù)再同步到ES,反饋數(shù)據(jù)同步延遲嚴(yán)重,但對應(yīng)的消費組確沒有積壓,但最近最近幾分鐘的數(shù)據(jù)都沒有同步過來。

十余年的臨澧網(wǎng)站建設(shè)經(jīng)驗,針對設(shè)計、前端、開發(fā)、售后、文案、推廣等六對一服務(wù),響應(yīng)快,48小時及時工作處理。營銷型網(wǎng)站建設(shè)的優(yōu)勢是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動調(diào)整臨澧建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計,從而大程度地提升瀏覽體驗。創(chuàng)新互聯(lián)從事“臨澧網(wǎng)站設(shè)計”,“臨澧網(wǎng)站推廣”以來,每個客戶項目都認(rèn)真落實執(zhí)行。
那問題來了,消費端沒有消費積壓,而且通過查看數(shù)據(jù)同步平臺該通過任務(wù)的同步狀態(tài),同樣顯示沒有積壓,那是為什么呢?
遇到這個問題,我們應(yīng)該冷靜下來,分析一下其大概的數(shù)據(jù)流向圖,梳理后如下圖所示:
通過初步的診斷,從數(shù)據(jù)同步產(chǎn)品查看Binlog同步無延遲、MQ消費無積壓,那為什么最終Es集群中的數(shù)據(jù)與MySQL有高達(dá)幾分鐘的延遲呢?
2、問題排查
根據(jù)上圖幾個關(guān)鍵組件數(shù)據(jù)同步延遲的檢測,基本就排除了數(shù)據(jù)同步組件、MQ消費端本身消費的問題,問題的癥結(jié)應(yīng)該就是數(shù)據(jù)同步組件成功將數(shù)據(jù)寫入到MQ集群,并且MQ集群返回了寫入成功,但消費端并沒有及時感知這個消息,也就是說消息雖然寫入到MQ集群,但并沒有達(dá)到消費隊列。
因為如果數(shù)據(jù)同步組件如果沒有寫入成功,則MySQL Binlog日志就會出現(xiàn)延遲。但如果是MQ消費端的問題,則MQ平臺也會顯示消費組積壓。
那為什么消息服務(wù)器寫入成功,但消費組為什么感知不到呢?
首先為了驗證上述結(jié)論是否正確,我還特意去看了一下主題的詳細(xì)信息:
查看主題的統(tǒng)計信息時發(fā)現(xiàn)當(dāng)前系統(tǒng)的時間為19:01分, 但主題最新的寫入時間才是18:50,兩者之間相差將近10分鐘。
備注:上述界面是我們公司內(nèi)部的消息運營管理平臺,其實底層是調(diào)用了RocketMQ提供的topicStatus命令。
那這又是怎么造成的呢?
在這里我假設(shè)大家對RocketMQ底層的實現(xiàn)原理還不是特別熟悉,在這樣的情況下,我覺得我們應(yīng)該首先摸清楚topicStatus這個命令返回的minOffset、maxOffset以及l(fā)astUpdate這些是的具體獲取邏輯,只有了解了這些,我們才能尋根究底,最終找到解決辦法。
2.1 問題探究與原理剖析
在這個場景中,我們可以通過對topicStatus命令進(jìn)行解析,從而探究其背后的實現(xiàn)原理。
當(dāng)我們在命令行中輸入 sh ./mqadmin topicStatus命令時,最終是調(diào)用defaultMQAdminExtImpl的examineTopicStats方法,最終在服務(wù)端的處理邏輯定義在AdminBrokerProcessor的getTopicStatsInfo方法中,核心代碼如下:
這里的實現(xiàn)要點:
- 通過MessageStore的getMinOffsetInQueue獲取最小偏移量。
- 通過MessageStore的getMaxOffsetInQueue獲取最大偏移量。
- 最新更新時間為最大偏移量減去一(表示最新一條消息)的存儲時間
故要弄清隊列最大、最小偏移量,關(guān)鍵是要看懂getMaxOffsetInQueue或者getMinOffsetInQueue的計算邏輯。
我也注意到分析源碼雖然能直抵真相,但閱讀起來太粗糙,所以我接下來的文章會盡量避免通篇的源碼解讀,取而代之的是只點出源碼的入口處,其旁支細(xì)節(jié)將通過時序圖獲流程圖,方便感興趣的讀者朋友去探究,我重點進(jìn)行知識點的提煉,降低大家的學(xué)習(xí)成本。
如果大家想成體系的研究RocketMQ,想將消息中間件當(dāng)成自己職業(yè)的閃光點,強(qiáng)烈建議購買我的兩本關(guān)于RocketMQ的數(shù)據(jù):《RocketMQ技術(shù)內(nèi)幕》與《RocketMQ實戰(zhàn)》。
MessageStore的getMaxOffsetInQueue的時序圖如下所示:
從上述時序圖我們可以得知,調(diào)用DefaultMessageStore的getMaxOffsetInQueue方法,首先是根據(jù)主題、隊列ID獲取ConsumeQueue對象(在RocketMQ中一個主題的一個隊列會對應(yīng)一個ConsumeQueue,代表一個消費隊列),也就是這里獲取的偏移量指的是消費者隊列中的偏移量,而不是Commitlog文件的偏移量。
如果是找最大偏移量,就從該隊列中的找到最后一個文件,去獲取器最大的有效偏移量,也就是等于文件的起始偏移量(fileFromOffset)加上該文件當(dāng)前最大可讀的偏移量(readPosition),故引起這張時序圖一個非常關(guān)鍵的點,就是如何獲取消費隊列最大的可讀偏移量,代碼見MappedFile的getReadPosition:
public int getReadPosition(){
return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}由于ConsumeQueue并沒有 transientStorePoolEnable 機(jī)制,數(shù)據(jù)直接寫入到FlieChannel中,故這里的writeBuffer為空,取的是 wrotePosition的值,那ConsumeQueue文件的wrotePosition值在什么地方更新呢?
這個可以通過查看MappedFile中修改wrotePosition的方法appendMessage方法的調(diào)用,如下圖所示:
與ConsumeQueue對應(yīng)的入口主要有兩個:
- ReputMessageService#doReput Commitlog異步轉(zhuǎn)發(fā)線程,通過該線程異步構(gòu)建Consumequeue、Index等文件
- Commitlog#recoverAbnormally RocketMQ啟動時根據(jù)Commitlog文件自動恢復(fù)Consumequeue文件
今天的主角當(dāng)然不讓非ReputMessageService莫屬,這里先和大家普及一下一個最基本的知識:RocketMQ為了追求極致的順序?qū)?,會將所有主題的消息順序?qū)懭氲揭粋€文件(Commitlog文件),然后異步轉(zhuǎn)發(fā)到ConsumeQueue(消費隊列文件)、IndexFile(索引文件)。
其轉(zhuǎn)發(fā)服務(wù)就是通過ReputMessageService來實現(xiàn)的。
在深入介紹Commitlog文件的轉(zhuǎn)發(fā)機(jī)制之前,我在這里先問大家一個問題:消息是寫入到內(nèi)存就轉(zhuǎn)發(fā)給ConsumeQueue,亦或是刷寫到磁盤后再轉(zhuǎn)發(fā)呢?
為了方便大家對這個問題的探究,其代碼的核心入口如下圖所示:
這里的關(guān)鍵實現(xiàn)要點如下:
- 判斷是否轉(zhuǎn)發(fā)關(guān)鍵條件在于 isCommitlogAvailable()方法返回true
- 根據(jù)轉(zhuǎn)發(fā)位點reputFromOffset,從Commitlog文件中獲取消息的物理偏移量、消息大小,tags等信息轉(zhuǎn)發(fā)到消息消費隊列、索引文件。
那isCommitlogAvailable的核心如下所示:
故轉(zhuǎn)發(fā)的關(guān)鍵就在于Commitlog的maxOffset的獲取邏輯了,其實現(xiàn)時序圖如下所示:
這里核心重點是getReadPosition方法的實現(xiàn),在RocketMQ寫Commitlog文件,為了提升寫入性能,引入了內(nèi)存級讀寫分離機(jī)制,具體的實現(xiàn)原理如下圖所示:
具體在實現(xiàn)層面,就是如果transientStorePoolEnable=true,數(shù)據(jù)寫入到堆外內(nèi)存(writeBuffer)中,然后再提交到FileChannel,提交的位置(commitedPosition來表示)。
大家可以分別看一下改變wrotePosition與committedPposition的調(diào)用鏈。
其中wrotePosition的調(diào)用鏈如下所示:
可以得知:wrotePosition是消息寫入到內(nèi)存(pagecache或者堆外內(nèi)存)都會更新,但一旦開啟了堆外內(nèi)存機(jī)制,并不會取該值,所以我們可以理解為當(dāng)消息寫入到Pagecache中時,就可以被轉(zhuǎn)發(fā)到消息消費隊列。
緊接著我們再看一下committedPosition的調(diào)用鏈,如下所示:
原來在RocketMQ中,如果開啟了transientStorePoolEnable機(jī)制,消息先寫入到堆外內(nèi)存,然后就會向消息發(fā)送者返回發(fā)送成功,然后會有一個異步線程(CommitRealTimeService)定時將消息(默認(rèn)200ms一次循環(huán))提交到FileChannel,即更新committedPosition的值,消息就會轉(zhuǎn)發(fā)給消費隊列,從而消費者就可以進(jìn)行消費。
2.2 問題原因提煉
經(jīng)過上面的解析,問題應(yīng)該有所眉目了。
由于我們公司為了提高RocketMQ的資源利用率,提升RocketMQ的寫入性能,我們開啟了transientStorePoolEnable機(jī)制,消息發(fā)送端寫入到堆外內(nèi)存,就會返回寫入成功,這樣MySQL Binlog數(shù)據(jù)同步并不會產(chǎn)生延遲,那這里的問題,無非就2個:
- CommitRealTimeService 線程并沒有及時將堆外內(nèi)存中的數(shù)據(jù)提交到FileChannel
- ReputMessageService線程沒有及時將數(shù)據(jù)轉(zhuǎn)發(fā)到消費隊列
由于目前我暫時對底層存儲寫入的原理還認(rèn)識不夠深入,對相關(guān)系統(tǒng)采集指標(biāo)不夠敏感,當(dāng)時主要分析了一下線程棧,發(fā)現(xiàn)ReputMessageService線程一直在工作,推測可能是轉(zhuǎn)發(fā)不及時,這塊我還需要更加深入去研究,如果大家對這塊有其實理解,歡迎留言,我也會在后續(xù)工作中提升這塊的技能,更加深入去理解底層的原理。
也就是目前知道了問題的表象原因,雖然底層原理還未通透,但目前足以指導(dǎo)我們更好的處理問題:將集群內(nèi)消息寫入大的主題,遷移到其他負(fù)載較低的集群,從而降低該集群的寫入壓力,當(dāng)遷移了幾個主題后,果不其然,消息到達(dá)消費隊列接近實時,集群得以恢復(fù)。
當(dāng)前標(biāo)題:生產(chǎn)環(huán)境MQ集群一個非常詭異的消費延遲排查
網(wǎng)站URL:http://www.dlmjj.cn/article/djohddi.html


咨詢
建站咨詢
