新聞中心
背景
成都創(chuàng)新互聯(lián)公司堅持“要么做到,要么別承諾”的工作理念,服務領(lǐng)域包括:網(wǎng)站設計、做網(wǎng)站、企業(yè)官網(wǎng)、英文網(wǎng)站、手機端網(wǎng)站、網(wǎng)站推廣等服務,滿足客戶于互聯(lián)網(wǎng)時代的??稻W(wǎng)站設計、移動媒體設計的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡建設合作伙伴!
在開發(fā)中,往往會遇到一些關(guān)于延時任務的需求。例如
•生成訂單30分鐘未支付,則自動取消
•生成訂單60秒后,給用戶發(fā)短信
對上述的任務,我們給一個專業(yè)的名字來形容,那就是延時任務。
最近需要做一個延時處理的功能,主要是從kafka中消費消息后根據(jù)消息中的某個延時字段來進行延時處理,在實際的實現(xiàn)過程中有一些需要注意的地方,記錄如下。
實現(xiàn)過程
說到java中的定時功能,首先想到的Timer和ScheduledThreadPoolExecutor,但是相比之下Timer可以排除,主要原因有以下幾點:
- Timer使用的是絕對時間,系統(tǒng)時間的改變會對Timer產(chǎn)生一定的影響;而ScheduledThreadPoolExecutor使用的是相對時間,所以不會有這個問題。
- Timer使用單線程來處理任務,長時間運行的任務會導致其他任務的延時處理,而ScheduledThreadPoolExecutor可以自定義線程數(shù)量。
- Timer沒有對運行時異常進行處理,一旦某個任務觸發(fā)運行時異常,會導致整個Timer崩潰,而ScheduledThreadPoolExecutor對運行時異常做了捕獲(可以在 afterExecute() 回調(diào)方法中進行處理),所以更加安全。
1、ScheduledThreadPoolExecutor決定了用ScheduledThreadPoolExecutor來進行實現(xiàn),接下來就是代碼編寫啦(大體流程代碼)。
主要的延時實現(xiàn)如下:
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new NamedThreadFactory("scheduleThreadPool"), new ThreadPoolExecutor.AbortPolicy()); //從消息中取出延遲時間及相關(guān)信息的代碼略 int delayTime = 0; executorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { //具體操作邏輯 }},0,delayTime, TimeUnit.SECONDS);
其中NamedThreadFactory是我自定義的一個線程工廠,主要給線程池定義名稱及相關(guān)日志打印便于后續(xù)的問題分析,這里就不多做介紹了。拒絕策略也是采用默認的拒絕策略。
然后測試了一下,滿足目標需求的功能,可以做到延遲指定時間后執(zhí)行,至此似乎功能就被完成了。
大家可能疑問,這也太簡單了有什么好說的,但是這種方式實現(xiàn)簡單是簡單但是存在一個潛在的問題,問題在哪呢,讓我們看一下ScheduledThreadPoolExecutor的源碼:
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);}
ScheduledThreadPoolExecutor由于它自身的延時和周期的特性,默認使用了DelayWorkQueue,而并不像我們平時使用的SingleThreadExecutor等構(gòu)造是可以使用自己定義的LinkedBlockingQueue并且設置隊列大小,問題就出在這里。
DelayWrokQueue是一個無界隊列,而我們的目標數(shù)據(jù)源是kafka,也就是一個高并發(fā)高吞吐的消息隊列,很大可能在某一時間段有大量的消息過來從而導致OOM,在使用多線程時我們是肯定要考慮到OOM的可能性的,因為OOM帶來的后果往往比較嚴重,系統(tǒng)OOM臨時的解決辦法一般只能是重啟,可能會導致用戶數(shù)據(jù)丟失等不可能挽回的問題,所以從編碼設計階段要采用盡可能穩(wěn)妥的手段來避免這些問題。
2、采用redis和線程結(jié)合
這一次換了思路,采用redis來幫助我們做緩沖,從而避免消息過多OOM的問題。
相關(guān)redis zset api:
//添加元素 ZADD key score member [[score member] [score member] …] //根據(jù)分值及限制數(shù)量查詢 ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count] //從zset中刪除指定成員 ZREM key member [member …]
我們采用redis基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)的zset結(jié)構(gòu),采用score來存儲我們目標發(fā)送時間的數(shù)值,整體處理流程如下:
- 第一步數(shù)據(jù)存儲:9:10分從kafka接收了一條a的訂單消息,要求30分鐘后進行發(fā)貨通知,那我們就將當前時間加上30分鐘然后轉(zhuǎn)為時間戳作為a的score,key為a的訂單號存入redis中。代碼如下:
public void onMessage(String topic, String message) { String orderId; int delayTime = 0; try { MapmsgMap = gson.fromJson(message, new TypeToken
- 第二步數(shù)據(jù)處理:另起一個線程具體調(diào)度時間根據(jù)業(yè)務需求來定,我這里3分鐘執(zhí)行一次,內(nèi)部邏輯:從redis中取出一定量的zset數(shù)據(jù),如何取呢,使用zset的zrangeByScore方法,根據(jù)數(shù)據(jù)的score進行排序,當然可以帶上時間段,這里從0到現(xiàn)在,來進行消費,需要注意的一點是,在取出數(shù)據(jù)后我們需要用zrem方法將取出的數(shù)據(jù)從zset中刪除,防止其他線程重復消費數(shù)據(jù)。在此之后進行接下來的發(fā)貨通知等相關(guān)邏輯。代碼如下:
public void run(){ //獲取批量大小 int orderNum = Integer.parseInt(PropertyUtil.get(Constant.ORDER_NUM,"100")); try { //批量獲取離發(fā)送時間最近的orderNum條數(shù)據(jù) Calendar calendar = Calendar.getInstance(); long now = calendar.getTimeInMillis(); //獲取無限早到現(xiàn)在的事件key(防止上次批量數(shù)量小于放入數(shù)量,存在歷史數(shù)據(jù)未消費情況) SetorderIds = RedisUtils.getInstance().zrangeByScore(Constant.DELAY, 0, now, 0, orderNum); LOGGER.info("task.getOrderFromRedis---size:{}---orderIds:{}", orderIds.size(), gson.toJson(orderIds)); if (CollectionUtils.isNotEmpty(orders)){ //刪除key 防止重復發(fā)送 for (String orderId : orderIds) { RedisUtils.getInstance().zrem(Constant.DELAY, orderId); } //接下來執(zhí)行發(fā)送等業(yè)務邏輯 } } catch (Exception e) { LOGGER.warn("task.run exception:{}", e); } }
至此完成了依賴redis和線程完成了延時發(fā)送的功能。
結(jié)語
那么對上面兩種不同的實現(xiàn)方式進行一下優(yōu)缺點比較:
- 第一種方式實現(xiàn)簡單,不依賴外部組件,能夠快速的實現(xiàn)目標功能,但缺點也很明顯,需要在特定的場景下使用,如果是我這種消息量大的情況下使用很可能是有問題,當然在數(shù)據(jù)源消息不多的情況下不失為好的選擇。
- 第二種方式實現(xiàn)稍微復雜一點,但是能夠適應消息量大的場景,采用redis的zset作為了“中間件”的效果,并且?guī)椭覀冞M行延時的功能實現(xiàn)能夠較好的適應高并發(fā)場景,缺點在于在編寫的過程中需要考慮實際的因素較多,例如線程的執(zhí)行周期時間,發(fā)送可能會有一定時間的延遲,批量數(shù)據(jù)大小的設置等等。
綜上是本人這次延時功能的實現(xiàn)過程的兩種實現(xiàn)方式的總結(jié),具體采用哪種方式還需大家根據(jù)實際情況選擇,希望能給大家?guī)韼椭s:由于本人的技術(shù)能力有限,文章中可能出現(xiàn)技術(shù)描述不準確或者錯誤的情況懇請各位大佬指出,我立馬進行改正,避免誤導大家,謝謝!
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對創(chuàng)新互聯(lián)的支持。
文章名稱:利用Redis實現(xiàn)延時處理的方法實例
本文URL:http://www.dlmjj.cn/article/ighohg.html