新聞中心
什么是延遲任務(wù)?
顧名思議,我們把需要延遲執(zhí)行的任務(wù)叫做延遲任務(wù)。

創(chuàng)新互聯(lián)基于成都重慶香港及美國等地區(qū)分布式IDC機房數(shù)據(jù)中心構(gòu)建的電信大帶寬,聯(lián)通大帶寬,移動大帶寬,多線BGP大帶寬租用,是為眾多客戶提供專業(yè)服務(wù)器托管報價,主機托管價格性價比高,為金融證券行業(yè)服務(wù)器托管,ai人工智能服務(wù)器托管提供bgp線路100M獨享,G口帶寬及機柜租用的專業(yè)成都idc公司。
延遲任務(wù)的使用場景有以下這些:
- 紅包 24 小時未被查收,需要延遲執(zhí)退還業(yè)務(wù);
- 每個月賬單日,需要給用戶發(fā)送當(dāng)月的對賬單;
- 訂單下單之后 30 分鐘后,用戶如果沒有付錢,系統(tǒng)需要自動取消訂單。
等事件都需要使用延遲任務(wù)。
延遲任務(wù)實現(xiàn)思路分析
延遲任務(wù)實現(xiàn)的關(guān)鍵是在某個時間節(jié)點執(zhí)行某個任務(wù)?;谶@個信息我們可以想到實現(xiàn)延遲任務(wù)的手段有以下兩個:
- 自己手寫一個“死循環(huán)”一直判斷當(dāng)前時間節(jié)點有沒有要執(zhí)行的任務(wù);
- 借助 JDK 或者第三方提供的工具類來實現(xiàn)延遲任務(wù)。
而通過 JDK 實現(xiàn)延遲任務(wù)我們能想到的關(guān)鍵詞是:DelayQueue、ScheduledExecutorService,而第三方提供的延遲任務(wù)執(zhí)行方法就有很多了,例如:Redis、Netty、MQ 等手段。
延遲任務(wù)實現(xiàn)
下面我們將結(jié)合代碼來講解每種延遲任務(wù)的具體實現(xiàn)。
1.無限循環(huán)實現(xiàn)延遲任務(wù)
此方式我們需要開啟一個無限循環(huán)一直掃描任務(wù),然后使用一個 Map 集合用來存儲任務(wù)和延遲執(zhí)行的時間,實現(xiàn)代碼如下:
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
* 延遲任務(wù)執(zhí)行方法匯總
*/
public class DelayTaskExample {
// 存放定時任務(wù)
private static Map_TaskMap = new HashMap<>();
public static void main(String[] args) {
System.out.println("程序啟動時間:" + LocalDateTime.now());
// 添加定時任務(wù)
_TaskMap.put("task-1", Instant.now().plusSeconds(3).toEpochMilli()); // 延遲 3s
// 調(diào)用無限循環(huán)實現(xiàn)延遲任務(wù)
loopTask();
}
/**
* 無限循環(huán)實現(xiàn)延遲任務(wù)
*/
public static void loopTask() {
Long itemLong = 0L;
while (true) {
Iterator it = _TaskMap.entrySet().iterator();
while (it.hasNext()) {
Map.Entry entry = (Map.Entry) it.next();
itemLong = (Long) entry.getValue();
// 有任務(wù)需要執(zhí)行
if (Instant.now().toEpochMilli() >= itemLong) {
// 延遲任務(wù),業(yè)務(wù)邏輯執(zhí)行
System.out.println("執(zhí)行任務(wù):" + entry.getKey() +
" ,執(zhí)行時間:" + LocalDateTime.now());
// 刪除任務(wù)
_TaskMap.remove(entry.getKey());
}
}
}
}
}
以上程序執(zhí)行的結(jié)果為:
程序啟動時間:2020-04-12T18:51:28.188
執(zhí)行任務(wù):task-1 ,執(zhí)行時間:2020-04-12T18:51:31.189
可以看出任務(wù)延遲了 3s 鐘執(zhí)行了,符合我們的預(yù)期。
2.Java API 實現(xiàn)延遲任務(wù)
Java API 提供了兩種實現(xiàn)延遲任務(wù)的方法:DelayQueue 和 ScheduledExecutorService。
① ScheduledExecutorService 實現(xiàn)延遲任務(wù)
我們可以使用 ScheduledExecutorService 來以固定的頻率一直執(zhí)行任務(wù),實現(xiàn)代碼如下:
public class DelayTaskExample {
public static void main(String[] args) {
System.out.println("程序啟動時間:" + LocalDateTime.now());
scheduledExecutorServiceTask();
}
/**
* ScheduledExecutorService 實現(xiàn)固定頻率一直循環(huán)執(zhí)行任務(wù)
*/
public static void scheduledExecutorServiceTask() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
// 執(zhí)行任務(wù)的業(yè)務(wù)代碼
System.out.println("執(zhí)行任務(wù)" +
" ,執(zhí)行時間:" + LocalDateTime.now());
}
},
2, // 初次執(zhí)行間隔
2, // 2s 執(zhí)行一次
TimeUnit.SECONDS);
}
}以上程序執(zhí)行的結(jié)果為:
程序啟動時間:2020-04-12T21:28:10.416
執(zhí)行任務(wù) ,執(zhí)行時間:2020-04-12T21:28:12.421
執(zhí)行任務(wù) ,執(zhí)行時間:2020-04-12T21:28:14.422
......
可以看出使用 ScheduledExecutorService#scheduleWithFixedDelay(...) 方法之后,會以某個頻率一直循環(huán)執(zhí)行延遲任務(wù)。
② DelayQueue 實現(xiàn)延遲任務(wù)
DelayQueue 是一個支持延時獲取元素的無界阻塞隊列,隊列中的元素必須實現(xiàn) Delayed 接口,并重寫 getDelay(TimeUnit) 和 compareTo(Delayed) 方法,DelayQueue 實現(xiàn)延遲隊列的完整代碼如下:
public class DelayTest {
public static void main(String[] args) throws InterruptedException {
DelayQueue delayQueue = new DelayQueue();
// 添加延遲任務(wù)
delayQueue.put(new DelayElement(1000));
delayQueue.put(new DelayElement(3000));
delayQueue.put(new DelayElement(5000));
System.out.println("開始時間:" + DateFormat.getDateTimeInstance().format(new Date()));
while (!delayQueue.isEmpty()){
// 執(zhí)行延遲任務(wù)
System.out.println(delayQueue.take());
}
System.out.println("結(jié)束時間:" + DateFormat.getDateTimeInstance().format(new Date()));
}
static class DelayElement implements Delayed {
// 延遲截止時間(單面:毫秒)
long delayTime = System.currentTimeMillis();
public DelayElement(long delayTime) {
this.delayTime = (this.delayTime + delayTime);
}
@Override
// 獲取剩余時間
public long getDelay(TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
// 隊列里元素的排序依據(jù)
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
} else {
return 0;
}
}
@Override
public String toString() {
return DateFormat.getDateTimeInstance().format(new Date(delayTime));
}
}
}開始時間:2020-4-12 20:40:38
2020-4-12 20:40:39
2020-4-12 20:40:41
2020-4-12 20:40:43
結(jié)束時間:2020-4-12 20:40:43
3.Redis 實現(xiàn)延遲任務(wù)
使用 Redis 實現(xiàn)延遲任務(wù)的方法大體可分為兩類:通過 zset 數(shù)據(jù)判斷的方式,和通過鍵空間通知的方式。
① 通過數(shù)據(jù)判斷的方式
我們借助 zset 數(shù)據(jù)類型,把延遲任務(wù)存儲在此數(shù)據(jù)集合中,然后在開啟一個無線循環(huán)查詢當(dāng)前時間的所有任務(wù)進行消費,實現(xiàn)代碼如下(需要借助 Jedis 框架):
import redis.clients.jedis.Jedis;
import utils.JedisUtils;
import java.time.Instant;
import java.util.Set;
public class DelayQueueExample {
// zset key
private static final String _KEY = "myDelayQueue";
public static void main(String[] args) throws InterruptedException {
Jedis jedis = JedisUtils.getJedis();
// 延遲 30s 執(zhí)行(30s 后的時間)
long delayTime = Instant.now().plusSeconds(30).getEpochSecond();
jedis.zadd(_KEY, delayTime, "order_1");
// 繼續(xù)添加測試數(shù)據(jù)
jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");
jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");
jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");
jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");
// 開啟延遲隊列
doDelayQueue(jedis);
}
/**
* 延遲隊列消費
* @param jedis Redis 客戶端
*/
public static void doDelayQueue(Jedis jedis) throws InterruptedException {
while (true) {
// 當(dāng)前時間
Instant nowInstant = Instant.now();
long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // 上一秒時間
long nowSecond = nowInstant.getEpochSecond();
// 查詢當(dāng)前時間的所有任務(wù)
Setdata = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);
for (String item : data) {
// 消費任務(wù)
System.out.println("消費:" + item);
}
// 刪除已經(jīng)執(zhí)行的任務(wù)
jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);
Thread.sleep(1000); // 每秒輪詢一次
}
}
}
② 通過鍵空間通知
默認(rèn)情況下 Redis 服務(wù)器端是不開啟鍵空間通知的,需要我們通過 config set notify-keyspace-events Ex 的命令手動開啟,開啟鍵空間通知后,我們就可以拿到每個鍵值過期的事件,我們利用這個機制實現(xiàn)了給每個人開啟一個定時任務(wù)的功能,實現(xiàn)代碼如下:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import utils.JedisUtils;
public class TaskExample {
public static final String _TOPIC = "__keyevent@0__:expired"; // 訂閱頻道名稱
public static void main(String[] args) {
Jedis jedis = JedisUtils.getJedis();
// 執(zhí)行定時任務(wù)
doTask(jedis);
}
/**
* 訂閱過期消息,執(zhí)行定時任務(wù)
* @param jedis Redis 客戶端
*/
public static void doTask(Jedis jedis) {
// 訂閱過期消息
jedis.psubscribe(new JedisPubSub() {
@Override
public void onPMessage(String pattern, String channel, String message) {
// 接收到消息,執(zhí)行定時任務(wù)
System.out.println("收到消息:" + message);
}
}, _TOPIC);
}
}
4.Netty 實現(xiàn)延遲任務(wù)
Netty 是由 JBOSS 提供的一個 Java 開源框架,它是一個基于 NIO 的客戶、服務(wù)器端的編程框架,使用 Netty 可以確保你快速和簡單的開發(fā)出一個網(wǎng)絡(luò)應(yīng)用,例如實現(xiàn)了某種協(xié)議的客戶、服務(wù)端應(yīng)用。Netty 相當(dāng)于簡化和流線化了網(wǎng)絡(luò)應(yīng)用的編程開發(fā)過程,例如:基于 TCP 和 UDP 的 socket 服務(wù)開發(fā)。
可以使用 Netty 提供的工具類 HashedWheelTimer 來實現(xiàn)延遲任務(wù),實現(xiàn)代碼如下。
首先在項目中添加 Netty 引用,配置如下:
io.netty
netty-common
4.1.48.Final
Netty 實現(xiàn)的完整代碼如下:
public class DelayTaskExample {
public static void main(String[] args) {
System.out.println("程序啟動時間:" + LocalDateTime.now());
NettyTask();
}
/**
* 基于 Netty 的延遲任務(wù)
*/
private static void NettyTask() {
// 創(chuàng)建延遲任務(wù)實例
HashedWheelTimer timer = new HashedWheelTimer(3, // 時間間隔
TimeUnit.SECONDS,
100); // 時間輪中的槽數(shù)
// 創(chuàng)建一個任務(wù)
TimerTask task = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("執(zhí)行任務(wù)" +
" ,執(zhí)行時間:" + LocalDateTime.now());
}
};
// 將任務(wù)添加到延遲隊列中
timer.newTimeout(task, 0, TimeUnit.SECONDS);
}
}以上程序執(zhí)行的結(jié)果為:
程序啟動時間:2020-04-13T10:16:23.033
執(zhí)行任務(wù) ,執(zhí)行時間:2020-04-13T10:16:26.118
HashedWheelTimer 是使用定時輪實現(xiàn)的,定時輪其實就是一種環(huán)型的數(shù)據(jù)結(jié)構(gòu),可以把它想象成一個時鐘,分成了許多格子,每個格子代表一定的時間,在這個格子上用一個鏈表來保存要執(zhí)行的超時任務(wù),同時有一個指針一格一格的走,走到那個格子時就執(zhí)行格子對應(yīng)的延遲任務(wù),如下圖所示:
以上的圖片可以理解為,時間輪大小為 8,某個時間轉(zhuǎn)一格(例如 1s),每格指向一個鏈表,保存著待執(zhí)行的任務(wù)。
5.MQ 實現(xiàn)延遲任務(wù)
如果專門開啟一個 MQ 中間件來執(zhí)行延遲任務(wù),就有點殺雞用宰牛刀般的奢侈了,不過已經(jīng)有了 MQ 環(huán)境的話,用它來實現(xiàn)延遲任務(wù)的話,還是可取的。
幾乎所有的 MQ 中間件都可以實現(xiàn)延遲任務(wù),在這里更準(zhǔn)確的叫法應(yīng)該叫延隊列。本文就使用 RabbitMQ 為例,來看它是如何實現(xiàn)延遲任務(wù)的。
RabbitMQ 實現(xiàn)延遲隊列的方式有兩種:
- 通過消息過期后進入死信交換器,再由交換器轉(zhuǎn)發(fā)到延遲消費隊列,實現(xiàn)延遲功能;
- 使用 rabbitmq-delayed-message-exchange 插件實現(xiàn)延遲功能。
注意:延遲插件 rabbitmq-delayed-message-exchange 是在 RabbitMQ 3.5.7 及以上的版本才支持的,依賴 Erlang/OPT 18.0 及以上運行環(huán)境。
由于使用死信交換器比較麻煩,所以推薦使用第二種實現(xiàn)方式 rabbitmq-delayed-message-exchange 插件的方式實現(xiàn)延遲隊列的功能。
首先,我們需要下載并安裝 rabbitmq-delayed-message-exchange 插件,下載地址:http://www.rabbitmq.com/community-plugins.html
選擇相應(yīng)的對應(yīng)的版本進行下載,然后拷貝到 RabbitMQ 服務(wù)器目錄,使用命令 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 開啟插件,在使用命令 rabbitmq-plugins list 查詢安裝的所有插件,安裝成功如下圖所示:
最后重啟 RabbitMQ 服務(wù),使插件生效。
首先,我們先要配置消息隊列,實現(xiàn)代碼如下:
import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedConfig {
final static String QUEUE_NAME = "delayed.goods.order";
final static String EXCHANGE_NAME = "delayedec";
@Bean
public Queue queue() {
return new Queue(DelayedConfig.QUEUE_NAME);
}
// 配置默認(rèn)的交換機
@Bean
CustomExchange customExchange() {
Mapargs = new HashMap<>();
args.put("x-delayed-type", "direct");
//參數(shù)二為類型:必須是x-delayed-message
return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
// 綁定隊列到交換器
@Bean
Binding binding(Queue queue, CustomExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
}
}
然后添加增加消息的代碼,具體實現(xiàn)如下:
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class DelayedSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String msg) {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("發(fā)送時間:" + sf.format(new Date()));
rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("x-delay", 3000);
return message;
}
});
}
}
再添加消費消息的代碼:
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {
@RabbitHandler
public void process(String msg) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("接收時間:" + sdf.format(new Date()));
System.out.println("消息內(nèi)容:" + msg);
}
}
最后,我們使用代碼測試
當(dāng)前名稱:實戰(zhàn):十種實現(xiàn)延遲任務(wù)的方法,附代碼!
瀏覽地址:http://www.dlmjj.cn/article/dpipges.html


咨詢
建站咨詢
