日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
五張圖帶你理解 RocketMQ 延時(shí)消息機(jī)制

延時(shí)消息是指發(fā)送到 RocketMQ 后不會(huì)馬上被消費(fèi)者拉取到,而是等待固定的時(shí)間,才能被消費(fèi)者拉取到。

成都創(chuàng)新互聯(lián)公司企業(yè)建站,10年網(wǎng)站建設(shè)經(jīng)驗(yàn),專注于網(wǎng)站建設(shè)技術(shù),精于網(wǎng)頁設(shè)計(jì),有多年建站和網(wǎng)站代運(yùn)營(yíng)經(jīng)驗(yàn),設(shè)計(jì)師為客戶打造網(wǎng)絡(luò)企業(yè)風(fēng)格,提供周到的建站售前咨詢和貼心的售后服務(wù)。對(duì)于成都網(wǎng)站制作、網(wǎng)站設(shè)計(jì)中不同領(lǐng)域進(jìn)行深入了解和探索,創(chuàng)新互聯(lián)在網(wǎng)站建設(shè)中充分了解客戶行業(yè)的需求,以靈動(dòng)的思維在網(wǎng)頁中充分展現(xiàn),通過對(duì)客戶行業(yè)精準(zhǔn)市場(chǎng)調(diào)研,為客戶提供的解決方案。

延時(shí)消息的使用場(chǎng)景很多,比如電商場(chǎng)景下關(guān)閉超時(shí)未支付的訂單,某些場(chǎng)景下需要在固定時(shí)間后發(fā)送提示消息。

1.生產(chǎn)者

首先看一個(gè)生產(chǎn)者發(fā)送延時(shí)消息的官方示例代碼:

public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}

// Shutdown producer after use.
producer.shutdown();
}

從上面的代碼可以看到,跟普通消息不一樣的是,消息設(shè)置 setDelayTimeLevel 屬性值,這里設(shè)置為 3,這里最終將 3 這個(gè)延時(shí)級(jí)別復(fù)制給了 DELAY 屬性。

關(guān)于延時(shí)級(jí)別,可以看下面這個(gè)定義:

//MessageStoreConfig類
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

這里延時(shí)級(jí)別有 18 個(gè),上面的示例代碼中延遲級(jí)別是 3,消息會(huì)延遲 10s 后消費(fèi)者才能拉取。

2.Broker 處理

2.1 寫入消息

Broker 收到消息后,會(huì)將消息寫入 CommitLog。在寫入時(shí),會(huì)判斷消息 DELAY 屬性是否大于 0。代碼如下:

//CommitLog 類
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}

topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

msg.setTopic(topic);
msg.setQueueId(queueId);
}

從上面的代碼可以看到,CommitLog 寫入時(shí)并沒有直接寫入,而是把 Topic 改為 SCHEDULE_TOPIC_XXXX,把 queueId 改為延時(shí)級(jí)別減 1。因?yàn)檠訒r(shí)級(jí)別有 18 個(gè),所以這里有 18 個(gè)隊(duì)列。如下圖:

2.2 調(diào)度消息

延時(shí)消息寫入后,會(huì)有一個(gè)調(diào)度任務(wù)不停地拉取這些延時(shí)消息,這個(gè)邏輯在類 ScheduleMessageService。這個(gè)類的初始化代碼如下:

public void start() {
if (started.compareAndSet(false, true)) {
this.load();
this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
//省略部分邏輯
for (Map.Entry entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}

if (timeDelay != null) {
//省略部分邏輯
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}
//省略持久化的邏輯
}
}

上面的 load() 方法會(huì)加載一個(gè) delayLevelTable(ConcurrentHashMap類型),key 保存延時(shí)級(jí)別(從 1 開始),value 保存延時(shí)時(shí)間(單位是 ms)。

load() 方法結(jié)束后,創(chuàng)建了一個(gè)有 18 個(gè)核心線程的定時(shí)線程池,然后遍歷 delayLevelTable,創(chuàng)建 18 個(gè)任務(wù)(DeliverDelayedMessageTimerTask)進(jìn)行每個(gè)延時(shí)級(jí)別的任務(wù)調(diào)度。任務(wù)調(diào)度的代碼邏輯如下:

public void executeOnTimeup() {
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));

if (cq == null) {
this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
return;
}

SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ == null) {
//省略部分邏輯
this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
return;
}

long nextOffset = this.offset;
try {
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
//省略部分邏輯
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

long countdown = deliverTimestamp - now;
if (countdown > 0) {
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}

MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
if (msgExt == null) {
continue;
}

MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
//事務(wù)消息判斷省略
boolean deliverSuc;
//只保留同步
deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);

if (!deliverSuc) {
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}
}

nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
} catch (Exception e) {
log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
} finally {
bufferCQ.release();
}
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}

這段代碼可以參考下面的流程圖來進(jìn)行理解:

上面有一個(gè)修正投遞時(shí)間的函數(shù),這個(gè)函數(shù)的意義是如果已經(jīng)過了投遞時(shí)間,那么立即投遞。代碼如下:

private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
long result = deliverTimestamp;

long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
if (deliverTimestamp > maxTimestamp) {
result = now;
}
return result;
}

注意:消息從 CommitLog 轉(zhuǎn)發(fā)到 ConsumeQueue 時(shí),會(huì)判斷是否是延時(shí)消息(Topic = SCHEDULE_TOPIC_XXXX 并且延時(shí)級(jí)別大于 0),如果是延時(shí)消息,就會(huì)修改 tagsCode 值為消息投遞的時(shí)間戳,而 tagsCode 原值是 tag 的 HashCode。代碼如下:

//CommitLog類checkMessageAndReturnSize方法
if (delayLevel > 0) {
tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
storeTimestamp);
}

如下圖:

而 ScheduleMessageService 調(diào)度線程將消息從 ConsumeQueue 重新投遞到原始隊(duì)列中時(shí),會(huì)把 tagsCode 再次修改為 tag 的 HashCode,代碼如下:

//類MessageExtBrokerInner,這個(gè)方法被 messageTimeup 方法調(diào)用。
public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {
if (null == tags || tags.length() == 0) { return 0; }

return tags.hashCode();
}

如下圖:

2.3 一個(gè)問題

如果有一個(gè)業(yè)務(wù)場(chǎng)景,要求延時(shí)消息 3 小時(shí)才能消費(fèi),而 RocketMQ 的延時(shí)消息最大延時(shí)級(jí)別只支持延時(shí) 2 小時(shí),怎么處理?

這里提供兩個(gè)思路供大家參考:

在 Broker 上修改 messageDelayLevel 的默認(rèn)配置;

在客戶端緩存 msgId,先設(shè)置延時(shí)級(jí)別是 18(2h),當(dāng)客戶端拉取到消息后首先判斷有沒有緩存,如果有緩存則再次發(fā)送延時(shí)消息,這次延時(shí)級(jí)別是 17(1h),如果沒有緩存則進(jìn)行消費(fèi)。

3 總結(jié)

經(jīng)過上面的講解,延時(shí)消息的處理流程如下:

最后,延時(shí)消息的延時(shí)時(shí)間并不精確,這個(gè)時(shí)間是 Broker 調(diào)度線程把消息重新投遞到原始的 MessageQueue 的時(shí)間,如果發(fā)生消息積壓或者 RocketMQ 客戶端發(fā)生流量管控,客戶端拉取到消息后進(jìn)行處理的時(shí)間可能會(huì)超出預(yù)設(shè)的延時(shí)時(shí)間。?


當(dāng)前題目:五張圖帶你理解 RocketMQ 延時(shí)消息機(jī)制
本文路徑:http://www.dlmjj.cn/article/dpgshod.html