這篇文章的誕生要感謝一位讀者,是他讓這篇優秀的文章有了和你們見面的機會,重點是優秀文章,哈哈。html
事情的通過是這樣的...java
不用謝我,送人玫瑰,手有餘香。相信接下來的內容必定不會讓你失望,由於它將是目前市面上最好的關於「延遲任務」的文章,這也一直是我寫做追求的目標,讓個人每一篇文章都比市面上的好那麼一點點。redis
好了,話很少說,直接進入今天的主題,本文的主要內容以下圖所示: spring
顧明思議,咱們把須要延遲執行的任務叫作延遲任務。編程
延遲任務的使用場景有如下這些:服務器
等事件都須要使用延遲任務。微信
延遲任務實現的關鍵是在某個時間節點執行某個任務。基於這個信息咱們能夠想到實現延遲任務的手段有如下兩個:網絡
而經過 JDK 實現延遲任務咱們能想到的關鍵詞是:DelayQueue、ScheduledExecutorService,而第三方提供的延遲任務執行方法就有不少了,例如:Redis、Netty、MQ 等手段。數據結構
下面咱們將結合代碼來說解每種延遲任務的具體實現。框架
此方式咱們須要開啓一個無限循環一直掃描任務,而後使用一個 Map 集合用來存儲任務和延遲執行的時間,實現代碼以下:
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/** * 延遲任務執行方法彙總 */
public class DelayTaskExample {
// 存放定時任務
private static Map<String, Long> _TaskMap = new HashMap<>();
public static void main(String[] args) {
System.out.println("程序啓動時間:" + LocalDateTime.now());
// 添加定時任務
_TaskMap.put("task-1", Instant.now().plusSeconds(3).toEpochMilli()); // 延遲 3s
// 調用無限循環實現延遲任務
loopTask();
}
/** * 無限循環實現延遲任務 */
public static void loopTask() {
Long itemLong = 0L;
while (true) {
Iterator it = _TaskMap.entrySet().iterator();
while (it.hasNext()) {
Map.Entry entry = (Map.Entry) it.next();
itemLong = (Long) entry.getValue();
// 有任務須要執行
if (Instant.now().toEpochMilli() >= itemLong) {
// 延遲任務,業務邏輯執行
System.out.println("執行任務:" + entry.getKey() +
" ,執行時間:" + LocalDateTime.now());
// 刪除任務
_TaskMap.remove(entry.getKey());
}
}
}
}
}
複製代碼
以上程序執行的結果爲:
程序啓動時間:2020-04-12T18:51:28.188
執行任務:task-1 ,執行時間:2020-04-12T18:51:31.189
能夠看出任務延遲了 3s 鍾執行了,符合咱們的預期。
Java API 提供了兩種實現延遲任務的方法:DelayQueue 和 ScheduledExecutorService。
咱們可使用 ScheduledExecutorService 來以固定的頻率一直執行任務,實現代碼以下:
public class DelayTaskExample {
public static void main(String[] args) {
System.out.println("程序啓動時間:" + LocalDateTime.now());
scheduledExecutorServiceTask();
}
/** * ScheduledExecutorService 實現固定頻率一直循環執行任務 */
public static void scheduledExecutorServiceTask() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
// 執行任務的業務代碼
System.out.println("執行任務" +
" ,執行時間:" + LocalDateTime.now());
}
},
2, // 初次執行間隔
2, // 2s 執行一次
TimeUnit.SECONDS);
}
}
複製代碼
以上程序執行的結果爲:
程序啓動時間:2020-04-12T21:28:10.416
執行任務 ,執行時間:2020-04-12T21:28:12.421
執行任務 ,執行時間:2020-04-12T21:28:14.422
......
能夠看出使用 ScheduledExecutorService#scheduleWithFixedDelay(...) 方法以後,會以某個頻率一直循環執行延遲任務。
DelayQueue 是一個支持延時獲取元素的無界阻塞隊列,隊列中的元素必須實現 Delayed 接口,並重寫 getDelay(TimeUnit) 和 compareTo(Delayed) 方法,DelayQueue 實現延遲隊列的完整代碼以下:
public class DelayTest {
public static void main(String[] args) throws InterruptedException {
DelayQueue delayQueue = new DelayQueue();
// 添加延遲任務
delayQueue.put(new DelayElement(1000));
delayQueue.put(new DelayElement(3000));
delayQueue.put(new DelayElement(5000));
System.out.println("開始時間:" + DateFormat.getDateTimeInstance().format(new Date()));
while (!delayQueue.isEmpty()){
// 執行延遲任務
System.out.println(delayQueue.take());
}
System.out.println("結束時間:" + DateFormat.getDateTimeInstance().format(new Date()));
}
static class DelayElement implements Delayed {
// 延遲截止時間(單面:毫秒)
long delayTime = System.currentTimeMillis();
public DelayElement(long delayTime) {
this.delayTime = (this.delayTime + delayTime);
}
@Override
// 獲取剩餘時間
public long getDelay(TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
// 隊列裏元素的排序依據
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
} else {
return 0;
}
}
@Override
public String toString() {
return DateFormat.getDateTimeInstance().format(new Date(delayTime));
}
}
}
複製代碼
以上程序執行的結果爲:
開始時間:2020-4-12 20:40:38
2020-4-12 20:40:39
2020-4-12 20:40:41
2020-4-12 20:40:43
結束時間:2020-4-12 20:40:43
使用 Redis 實現延遲任務的方法大致可分爲兩類:經過 zset 數據判斷的方式,和經過鍵空間通知的方式。
咱們藉助 zset 數據類型,把延遲任務存儲在此數據集合中,而後在開啓一個無線循環查詢當前時間的全部任務進行消費,實現代碼以下(須要藉助 Jedis 框架):
import redis.clients.jedis.Jedis;
import utils.JedisUtils;
import java.time.Instant;
import java.util.Set;
public class DelayQueueExample {
// zset key
private static final String _KEY = "myDelayQueue";
public static void main(String[] args) throws InterruptedException {
Jedis jedis = JedisUtils.getJedis();
// 延遲 30s 執行(30s 後的時間)
long delayTime = Instant.now().plusSeconds(30).getEpochSecond();
jedis.zadd(_KEY, delayTime, "order_1");
// 繼續添加測試數據
jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");
jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");
jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");
jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");
// 開啓延遲隊列
doDelayQueue(jedis);
}
/** * 延遲隊列消費 * @param jedis Redis 客戶端 */
public static void doDelayQueue(Jedis jedis) throws InterruptedException {
while (true) {
// 當前時間
Instant nowInstant = Instant.now();
long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // 上一秒時間
long nowSecond = nowInstant.getEpochSecond();
// 查詢當前時間的全部任務
Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);
for (String item : data) {
// 消費任務
System.out.println("消費:" + item);
}
// 刪除已經執行的任務
jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);
Thread.sleep(1000); // 每秒輪詢一次
}
}
}
複製代碼
默認狀況下 Redis 服務器端是不開啓鍵空間通知的,須要咱們經過 config set notify-keyspace-events Ex
的命令手動開啓,開啓鍵空間通知後,咱們就能夠拿到每一個鍵值過時的事件,咱們利用這個機制實現了給每一個人開啓一個定時任務的功能,實現代碼以下:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import utils.JedisUtils;
public class TaskExample {
public static final String _TOPIC = "__keyevent@0__:expired"; // 訂閱頻道名稱
public static void main(String[] args) {
Jedis jedis = JedisUtils.getJedis();
// 執行定時任務
doTask(jedis);
}
/** * 訂閱過時消息,執行定時任務 * @param jedis Redis 客戶端 */
public static void doTask(Jedis jedis) {
// 訂閱過時消息
jedis.psubscribe(new JedisPubSub() {
@Override
public void onPMessage(String pattern, String channel, String message) {
// 接收到消息,執行定時任務
System.out.println("收到消息:" + message);
}
}, _TOPIC);
}
}
複製代碼
Netty 是由 JBOSS 提供的一個 Java 開源框架,它是一個基於 NIO 的客戶、服務器端的編程框架,使用 Netty 能夠確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶、服務端應用。Netty 至關於簡化和流線化了網絡應用的編程開發過程,例如:基於 TCP 和 UDP 的 socket 服務開發。
可使用 Netty 提供的工具類 HashedWheelTimer 來實現延遲任務,實現代碼以下。
首先在項目中添加 Netty 引用,配置以下:
<!-- https://mvnrepository.com/artifact/io.netty/netty-common -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.48.Final</version>
</dependency>
複製代碼
Netty 實現的完整代碼以下:
public class DelayTaskExample {
public static void main(String[] args) {
System.out.println("程序啓動時間:" + LocalDateTime.now());
NettyTask();
}
/** * 基於 Netty 的延遲任務 */
private static void NettyTask() {
// 建立延遲任務實例
HashedWheelTimer timer = new HashedWheelTimer(3, // 時間間隔
TimeUnit.SECONDS,
100); // 時間輪中的槽數
// 建立一個任務
TimerTask task = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("執行任務" +
" ,執行時間:" + LocalDateTime.now());
}
};
// 將任務添加到延遲隊列中
timer.newTimeout(task, 0, TimeUnit.SECONDS);
}
}
複製代碼
以上程序執行的結果爲:
程序啓動時間:2020-04-13T10:16:23.033
執行任務 ,執行時間:2020-04-13T10:16:26.118
HashedWheelTimer 是使用定時輪實現的,定時輪其實就是一種環型的數據結構,能夠把它想象成一個時鐘,分紅了許多格子,每一個格子表明必定的時間,在這個格子上用一個鏈表來保存要執行的超時任務,同時有一個指針一格一格的走,走到那個格子時就執行格子對應的延遲任務,以下圖所示:
以上的圖片能夠理解爲,時間輪大小爲 8,某個時間轉一格(例如 1s),每格指向一個鏈表,保存着待執行的任務。
若是專門開啓一個 MQ 中間件來執行延遲任務,就有點殺雞用宰牛刀般的奢侈了,不過已經有了 MQ 環境的話,用它來實現延遲任務的話,仍是可取的。
幾乎全部的 MQ 中間件均可以實現延遲任務,在這裏更準確的叫法應該叫延隊列。本文就使用 RabbitMQ 爲例,來看它是如何實現延遲任務的。
RabbitMQ 實現延遲隊列的方式有兩種:
注意: 延遲插件 rabbitmq-delayed-message-exchange 是在 RabbitMQ 3.5.7 及以上的版本才支持的,依賴 Erlang/OPT 18.0 及以上運行環境。
因爲使用死信交換器比較麻煩,因此推薦使用第二種實現方式 rabbitmq-delayed-message-exchange 插件的方式實現延遲隊列的功能。
首先,咱們須要下載並安裝 rabbitmq-delayed-message-exchange 插件,下載地址:www.rabbitmq.com/community-p…
選擇相應的對應的版本進行下載,而後拷貝到 RabbitMQ 服務器目錄,使用命令 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
開啓插件,在使用命令 rabbitmq-plugins list
查詢安裝的全部插件,安裝成功以下圖所示:
最後重啓 RabbitMQ 服務,使插件生效。
首先,咱們先要配置消息隊列,實現代碼以下:
import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedConfig {
final static String QUEUE_NAME = "delayed.goods.order";
final static String EXCHANGE_NAME = "delayedec";
@Bean
public Queue queue() {
return new Queue(DelayedConfig.QUEUE_NAME);
}
// 配置默認的交換機
@Bean
CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
//參數二爲類型:必須是x-delayed-message
return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
// 綁定隊列到交換器
@Bean
Binding binding(Queue queue, CustomExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
}
}
複製代碼
而後添加增長消息的代碼,具體實現以下:
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class DelayedSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String msg) {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("發送時間:" + sf.format(new Date()));
rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("x-delay", 3000);
return message;
}
});
}
}
複製代碼
再添加消費消息的代碼:
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {
@RabbitHandler
public void process(String msg) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("接收時間:" + sdf.format(new Date()));
System.out.println("消息內容:" + msg);
}
}
複製代碼
最後,咱們使用代碼測試一下:
import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.text.SimpleDateFormat;
import java.util.Date;
@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {
@Autowired
private DelayedSender sender;
@Test
public void Test() throws InterruptedException {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
sender.send("Hi Admin.");
Thread.sleep(5 * 1000); //等待接收程序執行以後,再退出測試
}
}
複製代碼
以上程序的執行結果以下:
發送時間:2020-04-13 20:47:51
接收時間:2020-04-13 20:47:54
消息內容:Hi Admin.
從結果能夠看出,以上程序執行符合延遲任務的實現預期。
若是你使用的是 Spring 或 SpringBoot 的項目的話,可使用藉助 Scheduled 來實現,本文將使用 SpringBoot 項目來演示 Scheduled 的實現,實現咱們須要聲明開啓 Scheduled,實現代碼以下:
@SpringBootApplication
@EnableScheduling
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
複製代碼
而後添加延遲任務,實現代碼以下:
@Component
public class ScheduleJobs {
@Scheduled(fixedDelay = 2 * 1000)
public void fixedDelayJob() throws InterruptedException {
System.out.println("任務執行,時間:" + LocalDateTime.now());
}
}
複製代碼
此時當咱們啓動項目以後就能夠看到任務以延遲了 2s 的形式一直循環執行,結果以下:
任務執行,時間:2020-04-13T14:07:53.349
任務執行,時間:2020-04-13T14:07:55.350
任務執行,時間:2020-04-13T14:07:57.351
...
咱們也可使用 Corn 表達式來定義任務執行的頻率,例如使用 @Scheduled(cron = "0/4 * * * * ?")
。
Quartz 是一款功能強大的任務調度器,能夠實現較爲複雜的調度功能,它還支持分佈式的任務調度。
咱們使用 Quartz 來實現一個延遲任務,首先定義一個執行任務代碼以下:
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.time.LocalDateTime;
public class SampleJob extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println("任務執行,時間:" + LocalDateTime.now());
}
}
複製代碼
在定義一個 JobDetail 和 Trigger 實現代碼以下:
import org.quartz.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SampleScheduler {
@Bean
public JobDetail sampleJobDetail() {
return JobBuilder.newJob(SampleJob.class).withIdentity("sampleJob")
.storeDurably().build();
}
@Bean
public Trigger sampleJobTrigger() {
// 3s 後執行
SimpleScheduleBuilder scheduleBuilder =
SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).withRepeatCount(1);
return TriggerBuilder.newTrigger().forJob(sampleJobDetail()).withIdentity("sampleTrigger")
.withSchedule(scheduleBuilder).build();
}
}
複製代碼
最後在 SpringBoot 項目啓動以後開啓延遲任務,實現代碼以下:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
/** * SpringBoot 項目啓動後執行 */
public class MyStartupRunner implements CommandLineRunner {
@Autowired
private SchedulerFactoryBean schedulerFactoryBean;
@Autowired
private SampleScheduler sampleScheduler;
@Override
public void run(String... args) throws Exception {
// 啓動定時任務
schedulerFactoryBean.getScheduler().scheduleJob(
sampleScheduler.sampleJobTrigger());
}
}
複製代碼
以上程序的執行結果以下:
2020-04-13 19:02:12.331 INFO 17768 --- [ restartedMain] com.example.demo.DemoApplication : Started DemoApplication in 1.815 seconds (JVM running for 3.088)
任務執行,時間:2020-04-13T19:02:15.019
從結果能夠看出在項目啓動 3s 以後執行了延遲任務。
本文講了延遲任務的使用場景,以及延遲任務的 10 種實現方式:
俗話說:臺上一分鐘,臺下十年功。本文的全部內容皆爲做者多年工做積累的結晶,以及熬夜嘔心瀝血的整理,若是以爲本文有幫助到你,請點個贊再走(您的支持是我不斷前進的動力),謝謝你。
更多精彩內容,請關注微信公衆號「Java中文社羣」