新聞中心
前幾天領(lǐng)導(dǎo)突然宣布幾年前停用的電商項目又重新啟動了,帶著復(fù)雜的心情仔細賞閱“兒時”的代碼,心中的酸楚只有自己能夠體會。

網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)公司!專注于網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、重慶小程序開發(fā)、集團企業(yè)網(wǎng)站建設(shè)等服務(wù)項目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了中方免費建站歡迎大家使用!
這不,昨天又被領(lǐng)導(dǎo)叫進了“小黑屋”,讓我把代碼重構(gòu)下進行升級??吹竭@么“可愛”的代碼,心中一萬只“xx馬”疾馳而過。
讓我最深惡痛覺的就是里邊竟然用定時任務(wù)實現(xiàn)了“關(guān)閉超時訂單”的功能,現(xiàn)在想來,哭笑不得。我們先分析一波為什么大家都在抵制用定時任務(wù)來實現(xiàn)該功能。
定時任務(wù)
關(guān)閉超時訂單是在創(chuàng)建訂單之后的一段時間內(nèi)未完成支付而關(guān)閉訂單的操作,該功能一般要求每筆訂單的超時時間是一致的。
如果我們使用定時任務(wù)來進行該操作,很難把握定時任務(wù)輪詢的時間間隔:
- 時間間隔足夠小,在誤差允許的范圍內(nèi)可以達到我們說的時間一致性問題,但是頻繁掃描數(shù)據(jù)庫,執(zhí)行定時任務(wù),會造成網(wǎng)絡(luò)IO和磁盤IO的消耗,對實時交易造成一定的沖擊;
- 時間間隔比較大,由于每個訂單創(chuàng)建的時間不一致,所以上邊的一致性要求很難達到,舉例如下:
假設(shè)30分鐘訂單超時自動關(guān)閉,定時任務(wù)的執(zhí)行間隔時間為30分鐘:
我們在第5分鐘進行下單操作;
- 當(dāng)時間來到第30分鐘時,定時任務(wù)執(zhí)行一次,但是我們的訂單未滿足條件,不執(zhí)行;
- 當(dāng)時間來到第35分鐘時,訂單達到關(guān)閉條件,但是定時任務(wù)未執(zhí)行,所以不執(zhí)行;
- 當(dāng)時間來到第60分鐘時,開始執(zhí)行我們的訂單關(guān)閉操作,而此時,誤差達到25分鐘;
- 經(jīng)此種種,我們需要舍棄該方式。
延時隊列
為了滿足領(lǐng)導(dǎo)的需求,我便將手伸向了消息隊列:RabbitMQ。盡管它本身并沒有提供延時隊列的功能,但是我們可以利用它的存活時間和死信交換機的特性來間接實現(xiàn)。
首先我們先來簡單介紹下什么是存活時間?什么是死信交換機?
存活時間
存活時間的全拼是Time To Live,簡稱 TTL。它既支持對消息本身進行設(shè)置(延遲隊列的關(guān)鍵),又支持對隊列進行設(shè)置(該隊列中所有消息存在相同的過期時間)。
- 對消息本身進行設(shè)置:即使消息過期,也不會馬上從隊列中抹去,因為每條消息是否過期是在即將投遞到消費者之前判定的;
- 對隊列進行設(shè)置:一旦消息過期,就會從隊列中抹去。
如果同時使用這兩種方法,那么以過期時間小的那個數(shù)值為準(zhǔn)。當(dāng)消息達到過期時間還沒有被消費,那么該消息就“死了”,我們把它稱為 死信 消息。
消息變?yōu)樗佬诺臈l件:
- 消息被拒絕(basic.reject/basic.nack),并且requeue=false;
- 消息的過期時間到期了;
- 隊列達到最大長度。
隊列設(shè)置注意事項
隊列中該屬性的設(shè)置要在第一次聲明隊列的時候設(shè)置才有效,如果隊列一開始已存在且沒有這個屬性,則要刪掉隊列再重新聲明才可以。
隊列的 ttl 只能被設(shè)置為某個固定的值,一旦設(shè)置后則不能更改,否則會拋出異常;
死信交換機
死信交換機全拼Dead-Letter-Exchange,簡稱DLX。
當(dāng)消息在一個隊列中變成死信之后,如果這個消息所在的隊列設(shè)置了x-dead-letter-exchange參數(shù),那么它會被發(fā)送到x-dead-letter-exchange對應(yīng)值的交換機上,這個交換機就稱之為死信交換機,與這個死信交換器綁定的隊列就是死信隊列。
- x-dead-letter-exchange:出現(xiàn)死信之后將死信重新發(fā)送到指定交換機;
- x-dead-letter-routing-key:出現(xiàn)死信之后將死信重新按照指定的routing-key發(fā)送,如果不設(shè)置默認使用消息本身的routing-key。
死信隊列與普通隊列的區(qū)別就是它的RoutingKey和Exchange需要作為參數(shù),綁定到正常的隊列上。
實戰(zhàn)教學(xué)
先來張圖感受下我們的整體思路:
- 生產(chǎn)者發(fā)送帶有 ttl 的消息放入交換機路由到延時隊列中;
- 在延時隊列中綁定死信交換機與死信轉(zhuǎn)發(fā)的routing-key;
- 等延時隊列中的消息達到延時時間之后變成死信轉(zhuǎn)發(fā)到死信交換機并路由到死信隊列中;
- 最后供消費者消費。
配置類
@Configuration
public class DelayQueueRabbitConfig {
public static final String DLX_QUEUE = "queue.dlx";//死信隊列
public static final String DLX_EXCHANGE = "exchange.dlx";//死信交換機
public static final String DLX_ROUTING_KEY = "routingkey.dlx";//死信隊列與死信交換機綁定的routing-key
public static final String ORDER_QUEUE = "queue.order";//訂單的延時隊列
public static final String ORDER_EXCHANGE = "exchange.order";//訂單交換機
public static final String ORDER_ROUTING_KEY = "routingkey.order";//延時隊列與訂單交換機綁定的routing-key
/**
* 定義死信隊列
**/
@Bean
public Queue dlxQueue(){
return new Queue(DLX_QUEUE,true);
}
/**
* 定義死信交換機
**/
@Bean
public DirectExchange dlxExchange(){
return new DirectExchange(DLX_EXCHANGE, true, false);
}
/**
* 死信隊列和死信交換機綁定
* 設(shè)置路由鍵:routingkey.dlx
**/
@Bean
Binding bindingDLX(){
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
}
/**
* 訂單延時隊列
* 設(shè)置隊列里的死信轉(zhuǎn)發(fā)到的DLX名稱
* 設(shè)置死信在轉(zhuǎn)發(fā)時攜帶的 routing-key 名稱
**/
@Bean
public Queue orderQueue() {
Mapparams = new HashMap<>();
params.put("x-dead-letter-exchange", DLX_EXCHANGE);
params.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
return new Queue(ORDER_QUEUE, true, false, false, params);
}
/**
* 訂單交換機
**/
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(ORDER_EXCHANGE, true, false);
}
/**
* 把訂單隊列和訂單交換機綁定在一起
**/
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
}
}
消費消息
@Component
@RabbitListener(queues = DelayQueueRabbitConfig.DLX_QUEUE)//監(jiān)聽隊列名稱
public class OrderMQReciever {
@RabbitHandler
public void process(String message){
System.out.println("OrderMQReciever接收到的消息是:"+ message);
}
}
測試
通過調(diào)用接口,發(fā)現(xiàn)10秒之后才會消費消息:
問題升級
由于開發(fā)環(huán)境和測試環(huán)境使用的是同一個交換機和隊列,所以發(fā)送的延時時間都是30分鐘。但是為了在測試環(huán)境讓測試同學(xué)方便測試,故手動將測試環(huán)境的時間改為了1分鐘。
問題復(fù)現(xiàn)
接著問題就來了:延時時間為1分鐘的消息并沒有立即被消費,而是等30分鐘的消息被消費完之后才被消費了。至于原因,我們下邊再分析,先用代碼來給大家復(fù)現(xiàn)下該問題。
@GetMapping("/sendManyMessage")
public String sendManyMessage(){
send("延遲消息睡10秒",10000+"");
send("延遲消息睡2秒",2000+"");
send("延遲消息睡5秒",5000+"");
return "ok";
}
private void send(String msg, String delayTime){
rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,
DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
msg,message->{
message.getMessageProperties().setExpiration(delayTime);
return message;
});
}執(zhí)行結(jié)果如下:
OrderMQReciever接收到的消息是:延遲消息睡10秒
OrderMQReciever接收到的消息是:延遲消息睡2秒
OrderMQReciever接收到的消息是:延遲消息睡5秒
原因就是延時隊列也滿足隊列先進先出的特征,當(dāng)10秒的消息未出隊列時,后邊的消息不能順利出隊,造成后邊的消息阻塞了,未能達到精準(zhǔn)延時。
問題解決
我們可以利用x-delay-message插件來解決該問題。
消息的延遲范圍是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被設(shè)置的范圍為 (2^32)-1 毫秒)。
- 生產(chǎn)者發(fā)送消息到交換機時,并不會立即進入,而是先將消息持久化到 Mnesia(一個分布式數(shù)據(jù)庫管理系統(tǒng));
- 插件將會嘗試確認消息是否過期;
- 如果消息過期,消息會通過 x-delayed-type 類型標(biāo)記的交換機投遞至目標(biāo)隊列,供消費者消費。
實踐
官網(wǎng)下載:https://www.rabbitmq.com/community-plugins.html
我這邊使用的是v3.8.0.ez,將文件下載下來放到服務(wù)器的/usr/local/soft/rabbitmq_server-3.7.14/plugins 路徑下,執(zhí)行rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令即可。
出現(xiàn)如圖所示,代表安裝成功。
配置類
@Configuration
public class XDelayedMessageConfig {
public static final String DIRECT_QUEUE = "queue.direct";//隊列
public static final String DELAYED_EXCHANGE = "exchange.delayed";//延遲交換機
public static final String ROUTING_KEY = "routingkey.bind";//綁定的routing-key
/**
* 定義隊列
**/
@Bean
public Queue directQueue(){
return new Queue(DIRECT_QUEUE,true);
}
/**
* 定義延遲交換機
* args:根據(jù)該參數(shù)進行靈活路由,設(shè)置為“direct”,意味著該插件具有與直連交換機具有相同的路由行為,
* 如果想要不同的路由行為,可以更換現(xiàn)有的交換類型如:“topic”
* 交換機類型為 x-delayed-message
**/
@Bean
public CustomExchange delayedExchange(){
Mapargs = new HashMap ();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
}
/**
* 隊列和延遲交換機綁定
**/
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
}
}
發(fā)送消息
@RestController
@RequestMapping("/delayed")
public class DelayedSendMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendManyMessage")
public String sendManyMessage(){
send("延遲消息睡10秒",10000);
send("延遲消息睡2秒",2000);
send("延遲消息睡5秒",5000);
return "ok";
}
private void send(String msg, Integer delayTime){
//將消息攜帶路由鍵值
rabbitTemplate.convertAndSend(
XDelayedMessageConfig.DELAYED_EXCHANGE,
XDelayedMessageConfig.ROUTING_KEY,
msg,
message->{
message.getMessageProperties().setDelay(delayTime);
return message;
});
}
}
消費消息
@Component
@RabbitListener(queues = XDelayedMessageConfig.DIRECT_QUEUE)//監(jiān)聽隊列名稱
public class DelayedMQReciever {
@RabbitHandler
public void process(String message){
System.out.println("DelayedMQReciever接收到的消息是:"+ message);
}
}
測試
DelayedMQReciever接收到的消息是:延遲消息睡2秒
DelayedMQReciever接收到的消息是:延遲消息睡5秒
DelayedMQReciever接收到的消息是:延遲消息睡10秒
這樣我們的問題就順利解決了。
局限性
延遲的消息存儲在一個Mnesia表中,當(dāng)前節(jié)點上只有一個磁盤副本,它們將在節(jié)點重啟后存活。
雖然觸發(fā)計劃交付的計時器不會持久化,但它將在節(jié)點啟動時的插件激活期間重新初始化。顯然,集群中只有一個預(yù)定消息的副本意味著丟失該節(jié)點或禁用其上的插件將丟失駐留在該節(jié)點上的消息。
該插件的當(dāng)前設(shè)計并不適合延遲消息數(shù)量較多的場景(如數(shù)萬條或數(shù)百萬條),另外該插件的一個可變性來源是依賴于 Erlang 計時器,在系統(tǒng)中使用了一定數(shù)量的長時間計時器之后,它們開始爭用調(diào)度程序資源,并且時間漂移不斷累積。
網(wǎng)站名稱:領(lǐng)導(dǎo)看了我寫的關(guān)閉超時訂單,讓我出門左轉(zhuǎn)!
新聞來源:http://www.dlmjj.cn/article/cdoocph.html


咨詢
建站咨詢
