來源:https://blog.csdn.net/wantnrun/article/details/80401641spring
場景數據庫
開發中常常須要用到定時任務,對於商城來講,定時任務尤爲多,好比優惠券定時過時、訂單定時關閉、微信支付2小時未支付關閉訂單等等,都須要用到定時任務,可是定時任務自己有一個問題,通常來講咱們都是經過定時輪詢查詢數據庫來判斷是否有任務須要執行,也就是說無論怎麼樣,咱們須要先查詢數據庫,並且有些任務對時間準確要求比較高的,須要每秒查詢一次,對於系統小卻是無所謂,若是系統自己就大並且數據也多的狀況下,這就不大現實了,因此須要其餘方式的,固然實現的方式有多種多樣的,好比Redis實現定時隊列、基於優先級隊列的JDK延遲隊列、時間輪等。由於咱們項目中自己就使用到了Rabbitmq,因此基於方便開發和維護的原則,咱們使用了Rabbitmq延遲隊列來實現定時任務,不知道rabbitmq是什麼的和不知道springboot怎麼集成Rabbitmq的能夠查看我以前的文章Spring boot集成RabbitMQspringboot
Rabbitmq延遲隊列Rabbitmq自己是沒有延遲隊列的,只能經過Rabbitmq自己隊列的特性來實現,想要Rabbitmq實現延遲隊列,須要使用Rabbitmq的死信交換機(Exchange)和消息的存活時間TTL(Time To Live)微信
一個消息在知足以下條件下,會進死信交換機,記住這裏是交換機而不是隊列,一個交換機能夠對應不少隊列。dom
一個消息被Consumer拒收了,而且reject方法的參數裏requeue是false。也就是說不會被再次放在隊列裏,被其餘消費者使用。ide
上面的消息的TTL到了,消息過時了。微信支付
隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。ui
死信交換機就是普通的交換機,只是由於咱們把過時的消息扔進去,因此叫死信交換機,並非說死信交換機是某種特定的交換機.net
消息的TTL就是消息的存活時間。RabbitMQ能夠對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連着的保留時間,也能夠對每個單獨的消息作單獨的設置。超過了這個時間,咱們認爲這個消息就死了,稱之爲死信。若是隊列設置了,消息也設置了,那麼會取小的。因此一個消息若是被路由到不一樣的隊列中,這個消息死亡的時間有可能不同(不一樣的隊列設置)。這裏單講單個消息的TTL,由於它纔是實現延遲任務的關鍵。code
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);
能夠經過設置消息的expiration字段或者x-message-ttl屬性來設置時間,二者是同樣的效果。只是expiration字段是字符串參數,因此要寫個int類型的字符串:當上面的消息扔到隊列中後,過了60秒,若是沒有被消費,它就死了。不會被消費者消費到。這個消息後面的,沒有「死掉」的消息對頂上來,被消費者消費。死信在隊列中並不會被刪除和釋放,它會被統計到隊列的消息數中去
如圖所示,就是建立一個普通的交換機,這裏爲了方便區分,把交換機的名字取爲:delay
這個隊列的主要做用是讓消息定時過時的,好比咱們須要2小時候關閉訂單,咱們就須要把消息放進這個隊列裏面,把消息過時時間設置爲2小時
建立一個一個名爲delay_queue1的自動過時的隊列,固然圖片上面的參數並不會讓消息自動過時,由於咱們並無設置x-message-ttl參數,若是整個隊列的消息有消息都是相同的,能夠設置,這裏爲了靈活,因此並無設置,另外兩個參數x-dead-letter-exchange表明消息過時後,消息要進入的交換機,這裏配置的是delay,也就是死信交換機,x-dead-letter-routing-key是配置消息過時後,進入死信交換機的routing-key,跟發送消息的routing-key一個道理,根據這個key將消息放入不一樣的隊列
這個隊列纔是真正處理消息的隊列,全部進入這個隊列的消息都會被處理
消息隊列的名字爲delay_queue2
消息隊列綁定到交換機進入交換機詳情頁面,將建立的2個隊列(delayqueue1和delayqueue2)綁定到交換機上面
自動過時消息隊列的routing key 設置爲delay
綁定delayqueue2
delayqueue2 的key要設置爲建立自動過時的隊列的x-dead-letter-routing-key參數,這樣當消息過時的時候就能夠自動把消息放入delay_queue2這個隊列中了
綁定後的管理頁面以下圖:
固然這個綁定也可使用代碼來實現,只是爲了直觀表現,因此本文使用的管理平臺來操做
發送消息String msg = "hello word"; MessageProperties messageProperties = newMessageProperties(); messageProperties.setExpiration("6000"); messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes()); Message message = newMessage(msg.getBytes(), messageProperties); rabbitTemplate.convertAndSend("delay", "delay",message);
主要的代碼就是
messageProperties.setExpiration("6000");
設置了讓消息6秒後過時
注意:由於要讓消息自動過時,因此必定不能設置delay_queue1的監聽,不能讓這個隊列裏面的消息被接受到,不然消息一旦被消費,就不存在過時了
接收消息接收消息配置好delay_queue2的監聽就行了
package wang.raye.rabbitmq.demo1;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DelayQueue {
/** 消息交換機的名字*/
public static final String EXCHANGE = "delay";
/** 隊列key1*/
public static final String ROUTINGKEY1 = "delay";
/** 隊列key2*/
public static final String ROUTINGKEY2 = "delay_key";
/**
* 配置連接信息
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672);
connectionFactory.setUsername("kberp");
connectionFactory.setPassword("kberp");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true); // 必需要設置
return connectionFactory;
}
/**
* 配置消息交換機
* 針對消費者配置
FanoutExchange: 將消息分發到全部的綁定隊列,無routingkey的概念
HeadersExchange :經過添加屬性key-value匹配
DirectExchange:按照routingkey分發到指定隊列
TopicExchange:多關鍵字匹配
*/
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE, true, false);
}
/**
* 配置消息隊列2
* 針對消費者配置
* @return
*/
@Bean
public Queue queue() {
return new Queue("delay_queue2", true); //隊列持久
}
/**
* 將消息隊列2與交換機綁定
* 針對消費者配置
* @return
*/
@Bean
@Autowired
public Binding binding() {
return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);
}
/**
* 接受消息的監聽,這個監聽會接受消息隊列1的消息
* 針對消費者配置
* @return
*/
@Bean
@Autowired
public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("delay_queue2 收到消息 : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費
}
});
return container;
}
}
在消息監聽中處理須要定時處理的任務就行了,由於Rabbitmq能發送消息,因此能夠把任務特徵碼發過來,好比關閉訂單就把訂單id發過來,這樣就避免了須要查詢一下那些訂單須要關閉而加劇MySQL負擔了,畢竟一旦訂單量大的話,查詢自己也是一件很費IO的事情
總結基於Rabbitmq實現定時任務,就是將消息設置一個過時時間,放入一個沒有讀取的隊列中,讓消息過時後自動轉入另一個隊列中,監控這個隊列消息的監聽處來處理定時任務具體的操做
https://mp.weixin.qq.com/s/XtjPANZhbgvDYz06Q41CgQ