新聞中心
RocketMQ中怎么對DLedger進行整合,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
10年專注成都網(wǎng)站制作,企業(yè)網(wǎng)站制作,個人網(wǎng)站制作服務(wù),為大家分享網(wǎng)站制作知識、方案,網(wǎng)站設(shè)計流程、步驟,成功服務(wù)上千家企業(yè)。為您提供網(wǎng)站建設(shè),網(wǎng)站制作,網(wǎng)頁設(shè)計及定制高端網(wǎng)站建設(shè)服務(wù),專注于企業(yè)網(wǎng)站制作,高端網(wǎng)頁制作,對成都木托盤等多個領(lǐng)域,擁有豐富的網(wǎng)站推廣經(jīng)驗。
1、閱讀源碼之前的思考
RocketMQ 的消息存儲文件主要包括 commitlog 文件、consumequeue 文件與 Index 文件。commitlog 文件存儲全量的消息,consumequeue、index 文件都是基于 commitlog 文件構(gòu)建的。要使用 DLedger 來實現(xiàn)消息存儲的一致性,應該關(guān)鍵是要實現(xiàn) commitlog 文件的一致性,即 DLedger 要整合的對象應該是 commitlog 文件,即只需保證 raft 協(xié)議的復制組內(nèi)各個節(jié)點的 commitlog 文件一致即可。
我們知道使用文件存儲消息都會基于一定的存儲格式,rocketmq 的 commitlog 一個條目就包含魔數(shù)、消息長度,消息屬性、消息體等,而我們再來回顧一下 DLedger 日志的存儲格式:
DLedger 要整合 commitlog 文件,是不是可以把 rocketmq 消息,即一個個 commitlog 條目整體當成 DLedger 的 body 字段即可。
還等什么,跟我一起來看源碼吧?。?!別急,再拋一個問題,DLedger 整合 RocketMQ commitlog,能不能做到平滑升級?
帶著這些思考和問題,一起來探究 DLedger 是如何整合 RocketMQ 的。
2、從 Broker 啟動流程看 DLedger
> 溫馨提示:本文不會詳細介紹 Broker 端的啟動流程,只會點出在啟動過程中與 DLedger 相關(guān)的代碼,如想詳細了解 Broker 的啟動流程,建議關(guān)注筆者的《RocketMQ技術(shù)內(nèi)幕》一書。
Broker 涉及到 DLedger 相關(guān)關(guān)鍵點如下:
2.1 構(gòu)建 DefaultMessageStore
DefaultMessageStore 構(gòu)造方法
if(messageStoreConfig.isEnableDLegerCommitLog()) { // [@1](https://my.oschina.net/u/1198) this.commitLog = new DLedgerCommitLog(this); else { this.commitLog = new CommitLog(this); // @2 }
代碼@1:如果開啟 DLedger ,commitlog 的實現(xiàn)類為 DLedgerCommitLog,也是本文需要關(guān)注的關(guān)鍵所在。
代碼@2:如果未開啟 DLedger,則使用舊版的 Commitlog實現(xiàn)類。
2.2 增加節(jié)點狀態(tài)變更事件監(jiān)聽器
BrokerController#initialize
if (messageStoreConfig.isEnableDLegerCommitLog()) { DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore); ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); }
主要調(diào)用 LedgerLeaderElector 的 addRoleChanneHandler 方法增加 節(jié)點角色變更事件監(jiān)聽器,DLedgerRoleChangeHandler 是實現(xiàn)主從切換的另外一個關(guān)鍵點。
2.3 調(diào)用 DefaultMessageStore 的 load 方法
DefaultMessageStore#load
// load Commit Log result = result && this.commitLog.load(); // [@1](https://my.oschina.net/u/1198) // load Consume Queue result = result && this.loadConsumeQueue(); if (result) { this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); this.indexService.load(lastExitOK); this.recover(lastExitOK); // @2 log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset()); }
代碼@1、@2 最終都是委托 commitlog 對象來執(zhí)行,這里的關(guān)鍵又是如果開啟了 DLedger,則最終調(diào)用的是 DLedgerCommitLog。
經(jīng)過上面的鋪墊,主角 DLedgerCommitLog “閃亮登場“了。
3、DLedgerCommitLog 詳解
> 溫馨提示:由于 Commitlog 的絕大部分方法都已經(jīng)在《RocketMQ技術(shù)內(nèi)幕》一書中詳細介紹了,并且 DLedgerCommitLog 的實現(xiàn)原理與 Commitlog 文件的實現(xiàn)原理類同,本文會一筆帶過關(guān)于存儲部分的實現(xiàn)細節(jié)。
3.1 核心類圖
DLedgerCommitlog 繼承自 Commitlog。讓我們一一來看一下它的核心屬性。
DLedgerServer dLedgerServer 基于 raft 協(xié)議實現(xiàn)的集群內(nèi)的一個節(jié)點,用 DLedgerServer 實例表示。
DLedgerConfig dLedgerConfig DLedger 的配置信息。
DLedgerMmapFileStore dLedgerFileStore DLedger 基于文件映射的存儲實現(xiàn)。
MmapFileList dLedgerFileList DLedger 所管理的存儲文件集合,對比 RocketMQ 中的 MappedFileQueue。
int id 節(jié)點ID,0 表示主節(jié)點,非0表示從節(jié)點
MessageSerializer messageSerializer 消息序列器。
long beginTimeInDledgerLock = 0 用于記錄 消息追加的時耗(日志追加所持有鎖時間)。
long dividedCommitlogOffset = -1 記錄的舊 commitlog 文件中的最大偏移量,如果訪問的偏移量大于它,則訪問 dledger 管理的文件。
boolean isInrecoveringOldCommitlog = false 是否正在恢復舊的 commitlog 文件。
接下來我們將詳細介紹 DLedgerCommitlog 各個核心方法及其實現(xiàn)要點。
3.2 構(gòu)造方法
public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) { super(defaultMessageStore); // @1 dLedgerConfig = new DLedgerConfig(); dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable()); dLedgerConfig.setStoreType(DLedgerConfig.FILE); dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId()); dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup()); dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers()); dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()); dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog()); dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen()); dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1); id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1; // @2 dLedgerServer = new DLedgerServer(dLedgerConfig); // @3 dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore(); DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> { assert bodyOffset == DLedgerEntry.BODY_OFFSET; buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION); buffer.putLong(entry.getPos() + bodyOffset); }; dLedgerFileStore.addAppendHook(appendHook); // @4 dLedgerFileList = dLedgerFileStore.getDataFileList(); this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); // @5 }
代碼@1:調(diào)用父類 即 CommitLog 的構(gòu)造函數(shù),加載 ${ROCKETMQ_HOME}/store/ comitlog 下的 commitlog 文件,以便兼容升級 DLedger 的消息。我們稍微看一下 CommitLog 的構(gòu)造函數(shù):
代碼@2:構(gòu)建 DLedgerConfig 相關(guān)配置屬性,其主要屬性如下:
enableDiskForceClean 是否強制刪除文件,取自 broker 配置屬性 cleanFileForciblyEnable,默認為 true 。
storeType DLedger 存儲類型,固定為 基于文件的存儲模式。
dLegerSelfId leader 節(jié)點的 id 名稱,示例配置:n0,其配置要求第二個字符后必須是數(shù)字。
dLegerGroup DLeger group 的名稱,建議與 broker 配置屬性 brokerName 保持一致。
dLegerPeers DLeger Group 中所有的節(jié)點信息,其配置示例 n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913。多個節(jié)點使用分號隔開。
storeBaseDir 設(shè)置 DLedger 的日志文件的根目錄,取自 borker 配件文件中的 storePathRootDir ,即 RocketMQ 的數(shù)據(jù)存儲根路徑。
mappedFileSizeForEntryData 設(shè)置 DLedger 的單個日志文件的大小,取自 broker 配置文件中的 - mapedFileSizeCommitLog,即與 commitlog 文件的單個文件大小一致。
deleteWhen DLedger 日志文件的刪除時間,取自 broker 配置文件中的 deleteWhen,默認為凌晨 4點。
fileReservedHours DLedger 日志文件保留時長,取自 broker 配置文件中的 fileReservedHours,默認為 72h。
代碼@3:根據(jù) DLedger 配置信息創(chuàng)建 DLedgerServer,即創(chuàng)建 DLedger 集群節(jié)點,集群內(nèi)各個節(jié)點啟動后,就會觸發(fā)選主。
代碼@4:構(gòu)建 appendHook 追加鉤子函數(shù),這是兼容 Commitlog 文件很關(guān)鍵的一步,后面會詳細介紹其作用。
代碼@5:構(gòu)建消息序列化。
根據(jù)上述的流程圖,構(gòu)建好 DefaultMessageStore 實現(xiàn)后,就是調(diào)用其 load 方法,在啟用 DLedger 機制后,會依次調(diào)用 DLedgerCommitlog 的 load、recover 方法。
3.3 load
public boolean load() { boolean result = super.load(); if (!result) { return false; } return true; }
DLedgerCommitLog 的 laod 方法實現(xiàn)比較簡單,就是調(diào)用 其父類 Commitlog 的 load 方法,即這里也是為了啟用 DLedger 時能夠兼容以前的消息。
3.4 recover
在 Broker 啟動時會加載 commitlog、consumequeue等文件,需要恢復其相關(guān)是數(shù)據(jù)結(jié)構(gòu),特別是與寫入、刷盤、提交等指針,其具體調(diào)用 recover 方法。 DLedgerCommitLog#recover
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { // @1 recover(maxPhyOffsetOfConsumeQueue); }
首先會先恢復 consumequeue,得出 consumequeue 中記錄的最大有效物理偏移量,然后根據(jù)該物理偏移量進行恢復。 接下來看一下該方法的處理流程與關(guān)鍵點。
DLedgerCommitLog#recover
dLedgerFileStore.load();
Step1:加載 DLedger 相關(guān)的存儲文件,并一一構(gòu)建對應的 MmapFile,其初始化三個重要的指針 wrotePosition、flushedPosition、committedPosition 三個指針為文件的大小。
DLedgerCommitLog#recover
if (dLedgerFileList.getMappedFiles().size() > 0) { dLedgerFileStore.recover(); // @1 dividedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset(); // @2 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); if (mappedFile != null) { // @3 disableDeleteDledger(); } long maxPhyOffset = dLedgerFileList.getMaxWrotePosition(); // Clear ConsumeQueue redundant data if (maxPhyOffsetOfConsumeQueue >= maxPhyOffset) { // @4 log.warn("[TruncateCQ]maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, maxPhyOffset); this.defaultMessageStore.truncateDirtyLogicFiles(maxPhyOffset); } return; }
Step2:如果已存在 DLedger 的數(shù)據(jù)文件,則只需要恢復 DLedger 相關(guān)數(shù)據(jù)文建,因為在加載舊的 commitlog 文件時已經(jīng)將其重要的數(shù)據(jù)指針設(shè)置為最大值。其關(guān)鍵實現(xiàn)點如下:
首先調(diào)用 DLedger 文件存儲實現(xiàn)類 DLedgerFileStore 的 recover 方法,恢復管轄的 MMapFile 對象(一個文件對應一個MMapFile實例)的相關(guān)指針,其實現(xiàn)方法與 RocketMQ 的 DefaultMessageStore 的恢復過程類似。
設(shè)置 dividedCommitlogOffset 的值為 DLedger 中所有物理文件的最小偏移量。操作消息的物理偏移量小于該值,則從 commitlog 文件中查找;物理偏移量大于等于該值的話則從 DLedger 相關(guān)的文件中查找消息。
如果存在舊的 commitlog 文件,則禁止刪除 DLedger 文件,其具體做法就是禁止強制刪除文件,并將文件的有效存儲時間設(shè)置為 10 年。
如果 consumequeue 中存儲的最大物理偏移量大于 DLedger 中最大的物理偏移量,則刪除多余的 consumequeue 文件。
>溫馨提示:為什么當存在 commitlog 文件的情況下,不能刪除 DLedger 相關(guān)的日志文件呢?
因為在此種情況下,如果 DLedger 中的物理文件有刪除,則物理偏移量會斷層。
正常情況下, maxCommitlogPhyOffset 與 dividedCommitlogOffset 是連續(xù)的,這樣非常方便是訪問 commitlog 還是 訪問 DLedger ,但如果DLedger 部分文件刪除后,這兩個值就變的不連續(xù),就會造成中間的文件空洞,無法被連續(xù)訪問。
DLedgerCommitLog#recover
isInrecoveringOldCommitlog = true; super.recoverNormally(maxPhyOffsetOfConsumeQueue); isInrecoveringOldCommitlog = false;
Step3:如果啟用了 DLedger 并且是初次啟動(還未生成 DLedger 相關(guān)的日志文件),則需要恢復 舊的 commitlog 文件。
DLedgerCommitLog#recover
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); if (mappedFile == null) { // @1 return; } ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); byteBuffer.position(mappedFile.getWrotePosition()); boolean needWriteMagicCode = true; // 1 TOTAL SIZE byteBuffer.getInt(); //size int magicCode = byteBuffer.getInt(); if (magicCode == CommitLog.BLANK_MAGIC_CODE) { // @2 needWriteMagicCode = false; } else { log.info("Recover old commitlog found a illegal magic code={}", magicCode); } dLedgerConfig.setEnableDiskForceClean(false); dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize(); // @3 log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), dividedCommitlogOffset); if (needWriteMagicCode) { // @4 byteBuffer.position(mappedFile.getWrotePosition()); byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition()); byteBuffer.putInt(BLANK_MAGIC_CODE); mappedFile.flush(0); } mappedFile.setWrotePosition(mappedFile.getFileSize()); // @5 mappedFile.setCommittedPosition(mappedFile.getFileSize()); mappedFile.setFlushedPosition(mappedFile.getFileSize()); dLedgerFileList.getLastMappedFile(dividedCommitlogOffset); log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset); }
Step4:如果存在舊的 commitlog 文件,需要將最后的文件剩余部分全部填充,即不再接受新的數(shù)據(jù)寫入,新的數(shù)據(jù)全部寫入到 DLedger 的數(shù)據(jù)文件中。其關(guān)鍵實現(xiàn)點如下:
嘗試查找最后一個 commitlog 文件,如果未找到,則結(jié)束。
從最后一個文件的最后寫入點(原 commitlog 文件的 待寫入位點)嘗試去查找寫入的魔數(shù),如果存在魔數(shù)并等于 CommitLog.BLANK_MAGIC_CODE,則無需再寫入魔數(shù),在升級 DLedger 第一次啟動時,魔數(shù)為空,故需要寫入魔數(shù)。
初始化 dividedCommitlogOffset ,等于最后一個文件的起始偏移量加上文件的大小,即該指針指向最后一個文件的結(jié)束位置。
將最后一個 commitlog 未寫滿的數(shù)據(jù)全部寫入,其方法為 設(shè)置消息體的 size 與 魔數(shù)即可。
設(shè)置最后一個文件的 wrotePosition、flushedPosition、committedPosition 為文件的大小,同樣有意味者最后一個文件已經(jīng)寫滿,下一條消息將寫入 DLedger 中。
在啟用 DLedger 機制時 Broker 的啟動流程就介紹到這里了,相信大家已經(jīng)了解 DLedger 在整合 RocketMQ 上做的努力,接下來我們從消息追加、消息讀取兩個方面再來探討 DLedger 是如何無縫整合 RocketMQ 的,實現(xiàn)平滑升級的。
4、從消息追加看 DLedger 整合 RocketMQ 如何實現(xiàn)無縫兼容
> 溫馨提示:本節(jié)同樣也不會詳細介紹整個消息追加(存儲流程),只是要點出與 DLedger(多副本、主從切換)相關(guān)的核心關(guān)鍵點。如果想詳細了解消息追加的流程,可以閱讀筆者所著的《RocketMQ技術(shù)內(nèi)幕》一書。
DLedgerCommitLog#putMessage
AppendEntryRequest request = new AppendEntryRequest(); request.setGroup(dLedgerConfig.getGroup()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setBody(encodeResult.data); dledgerFuture = (AppendFuture) dLedgerServer.handleAppend(request); if (dledgerFuture.getPos() == -1) { return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); }
關(guān)鍵點一:消息追加時,則不再寫入到原先的 commitlog 文件中,而是調(diào)用 DLedgerServer 的 handleAppend 進行消息追加,該方法會有集群內(nèi)的 Leader 節(jié)點負責消息追加以及在消息復制,只有超過集群內(nèi)的半數(shù)節(jié)點成功寫入消息后,才會返回寫入成功。如果追加成功,將會返回本次追加成功后的起始偏移量,即 pos 屬性,即類似于 rocketmq 中 commitlog 的偏移量,即物理偏移量。
DLedgerCommitLog#putMessage
long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET; ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, eclipseTimeInLock);
關(guān)鍵點二:根據(jù) DLedger 的起始偏移量計算真正的消息的物理偏移量,從開頭部分得知,DLedger 自身有其存儲協(xié)議,其 body 字段存儲真實的消息,即 commitlog 條目的存儲結(jié)構(gòu),返回給客戶端的消息偏移量為 body 字段的開始偏移量,即通過 putMessage 返回的物理偏移量與不使用Dledger 方式返回的物理偏移量的含義是一樣的,即從開偏移量開始,可以正確讀取消息,這樣 DLedger 完美的兼容了 RocketMQ Commitlog。關(guān)于 pos 以及 wroteOffset 的圖解如下:
5、從消息讀取看 DLedger 整合 RocketMQ 如何實現(xiàn)無縫兼容
DLedgerCommitLog#getMessage
public SelectMappedBufferResult getMessage(final long offset, final int size) { if (offset < dividedCommitlogOffset) { // @1 return super.getMessage(offset, size); } int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData(); MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0); // @2 if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); return convertSbr(mappedFile.selectMappedBuffer(pos, size)); // @3 } return null; }
消息查找比較簡單,因為返回給客戶端消息,轉(zhuǎn)發(fā)給 consumequeue 的消息物理偏移量并不是 DLedger 條目的偏移量,而是真實消息的起始偏移量。其實現(xiàn)關(guān)鍵點如下:
如果查找的物理偏移量小于 dividedCommitlogOffset,則從原先的 commitlog 文件中查找。
然后根據(jù)物理偏移量按照二分方找到具體的物理文件。
對物理偏移量取模,得出在該物理文件中中的絕對偏移量,進行消息查找即可,因為只有知道其物理偏移量,從該處先將消息的長度讀取出來,然后即可讀出一條完整的消息。
5、總結(jié)
根據(jù)上面詳細的介紹,我想讀者朋友們應該不難得出如下結(jié)論:
DLedger 在整合時,使用 DLedger 條目包裹 RocketMQ 中的 commitlog 條目,即在 DLedger 條目的 body 字段來存儲整條 commitlog 條目。
引入 dividedCommitlogOffset 變量,表示物理偏移量小于該值的消息存在于舊的 commitlog 文件中,實現(xiàn) 升級 DLedger 集群后能訪問到舊的數(shù)據(jù)。
新 DLedger 集群啟動后,會將最后一個 commitlog 填充,即新的數(shù)據(jù)不會再寫入到 原先的 commitlog 文件。
消息追加到 DLedger 數(shù)據(jù)日志文件中,返回的偏移量不是 DLedger 條目的起始偏移量,而是DLedger 條目中 body 字段的起始偏移量,即真實消息的起始偏移量,保證消息物理偏移量的語義與 RocketMQ Commitlog一樣。
看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進一步的了解或閱讀更多相關(guān)文章,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對創(chuàng)新互聯(lián)的支持。
文章題目:RocketMQ中怎么對DLedger進行整合
轉(zhuǎn)載來源:http://www.dlmjj.cn/article/jcehjs.html