那麼今日份就來了解一下怎麼用RabbitMQ
搞定分佈式下的超時訂單html
沒錯,做爲一名有追求的程序員,咱們接着上一篇文章實戰|我仍是很建議你用DelayQueue搞定超時訂單的-(1) Go on。java
傲嬌的RabbitMQ官網赫然寫着:git
RabbitMQ is the most widely deployed open source message broker.
複製代碼
因而可知,RabbitMQ是一個消息中間件,生產者生成消息,消費者消費消息,它遵循AMQP(高級消息隊列協議),是最普遍部署的開源消息代理。 因此,今天我用RabbitMQ爲你們搗鼓一下延遲隊列。程序員
使用RabbitMQ來實現延遲任務必須先了解RabbitMQ的兩個概念:消息的TTL和死信Exchange,經過這二者的組合來實現上述需求。github
消息的TTL
就是消息的存活時間。RabbitMQ 能夠對隊列和消息分別設置TTL
。對隊列設置就是隊列沒有消費者連着的保留時間,也能夠對每個單獨的消息作單獨的設置。超過了這個時間,咱們認爲這個消息就死了,稱之爲死信。若是隊列設置了,消息也設置了,那麼會取小的(誰小誰尷尬
)。因此一個消息若是被路由到不一樣的隊列中,這個消息死亡的時間有可能不同(不一樣的隊列設置)。這裏單講單個消息的TTL,由於它纔是實現延遲任務的關鍵。windows
那麼,如何設置這個TTL值呢?有兩種方式,第一種是在建立隊列的時候設置隊列的"x-message-ttl"
屬性,以下:api
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
複製代碼
這樣全部被投遞到該隊列的消息都最多不會存活超過6s。bash
另外一種方式即是針對每條消息設置TTL,代碼以下:app
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
複製代碼
這樣這條消息的過時時間也被設置成了6s。異步
但這兩種方式是有區別的,若是設置了隊列的TTL屬性,那麼一旦消息過時,就會被隊列丟棄,而第二種方式,消息即便過時,也不必定會被立刻丟棄,由於消息是否過時是在即將投遞到消費者以前斷定的,若是當前隊列有嚴重的消息積壓狀況,則已過時的消息也許還能存活較長時間。 另外,還須要注意的一點是,若是不設置TTL,表示消息永遠不會過時,若是將TTL設置爲0,則表示除非此時能夠直接投遞該消息到消費者,不然該消息將會被丟棄。
單靠死信還不能實現延遲任務,還要靠Dead Letter Exchange
。
Exchage
的概念在這裏就不在贅述。一個消息在知足以下條件下,會進死信路由,記住這裏是路由而不是隊列,一個路由能夠對應不少隊列。
Consumer
拒收了,而且reject
方法的參數裏requeue
是false
。也就是說不會被再次放在隊列裏,被其餘消費者使用。TTL
到了,消息就過時了。Dead Letter Exchange
其實就是一種普通的exchange
,和建立其餘exchange
沒有兩樣。只是在某一個設置Dead Letter Exchange
的隊列中有消息過時了,會自動觸發消息的轉發,發送到Dead Letter Exchange
中去。
延遲任務經過消息的TTL
和Dead Letter Exchange
來實現。咱們須要創建2個隊列,一個用於發送消息,一個用於消息過時後的轉發目標隊列。
生產者生產一條延時消息,根據須要延時時間的不一樣,利用不一樣的routingkey
將消息路由到不一樣的延時隊列,每一個隊列都設置了不一樣的TTL
屬性,並綁定在同一個死信交換機中,消息過時後,根據routingkey
的不一樣,又會被路由到不一樣的死信隊列中,消費者只須要監聽對應的死信隊列進行處理便可。
rabbitmq-plugins enable rabbitmq_management
複製代碼
開啓Web管理插件,而後啓動rabbitmq-server
訪問http://localhost:15672/#/
,輸入密令後你能看到就能夠啦.
在 RabbitMQ 3.6.x
以前咱們通常採用死信隊列(DLX)+TTL過時時間
來實現延遲隊列,咱們這裏不作過多介紹,能夠參考其餘道友的:TTL+DLX
實現方式。
在 RabbitMQ 3.6.x
開始(如今都3.8.+
了),RabbitMQ 官方提供了延遲隊列的插件,能夠下載放置到 RabbitMQ 根目錄下的 plugins 下。延遲隊列插件下載地址:
3.7.x
的,可是3.8.0
是向下兼容3.7.x
的,而後我又在Bintray
找到了3.7.x
,你們信不過就找對應的版本插件哈....plugins
的目錄中,運行以下命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
複製代碼
#集成 rabbitmq
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
connection-timeout: 150000
publisher-confirms: true #開啓確認機制 採用消息確認模式,
publisher-returns: true #開啓return確認機制
template: #消息發出去後,異步等待響應
mandatory: true #設置爲 true 後,消費者在消息沒有被路由到合適隊列狀況下會被return監聽,而不會自動刪除
複製代碼
@Configuration
public class MQConfig {
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
public static final String DELAY_EXCHANGE = "Ex.DelayExchange";
public static final String DELAY_QUEUE = "MQ.DelayQueue";
public static final String DELAY_KEY = "delay.#";
/** * 延時交換機 * * @return TopicExchange */
@Bean
public TopicExchange delayExchange() {
Map<String, Object> pros = new HashMap<>(3);
//設置交換機支持延遲消息推送
pros.put("x-delayed-message", "topic");
TopicExchange exchange = new TopicExchange(DELAY_EXCHANGE, true, false, pros);
//咱們在也能夠在 Exchange 的聲明中能夠設置exchange.setDelayed(true)來開啓延遲隊列
exchange.setDelayed(true);
return exchange;
}
/** * 延時隊列 * * @return Queue */
@Bean
public Queue delayQueue() {
return new Queue(DELAY_QUEUE, true);
}
/** * 綁定隊列和交換機,以及設定路由規則key * * @return Binding */
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_KEY);
}
}
複製代碼
/** * @author LiJing * @ClassName: MQSender * @Description: MQ發送 生產者 * @date 2019/10/9 11:50 */
@Component
public class MQSender {
@Autowired
private RabbitTemplate rabbitTemplate;
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("correlationData: " + correlationData);
System.out.println("ack: " + ack);
if (!ack) {
System.out.println("異常處理....");
}
}
};
final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange , String routingKey) {
System.out.println("return exchange: " + exchange + ", routingKey: "
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
}
};
public void sendDelay(Object message, int delayTime) {
//採用消息確認模式,消息發出去後,異步等待響應
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id + 時間戳 全局惟一
CorrelationData correlationData = new CorrelationData("delay" + System.nanoTime());
//發送消息時指定 header 延遲時間
rabbitTemplate.convertAndSend(MQConfig.DELAY_EXCHANGE, "delay.boot", message,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//設置消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 兩種方式 都可
//message.getMessageProperties().setHeader("x-delay", "6000");
message.getMessageProperties().setDelay(delayTime);
return message;
}
}, correlationData);
}
}
複製代碼
/** * @author LiJing * @ClassName: MQReceiver * @Description: 消費者 * @date 2019/10/9 11:51 */
@Component
@Slf4j
public class MQReceiver {
@RabbitListener(queues = MQConfig.DELAY_QUEUE)
@RabbitHandler
public void onDelayMessage(Message msg, Channel channel) throws IOException {
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, true);
System.out.println("延遲隊列在" + LocalDateTime.now()+"時間," + "延遲後收到消息:" + new String(msg.getBody()));
}
}
複製代碼
5.建立一個mq的測試控制器
@RestController
@RequestMapping("/mq")
public class MqController extends AbstractController {
@Autowired
private MQSender mqSender;
@GetMapping(value = "/send/delay")
public void sendDelay(int delayTime) {
String msg = "hello delay";
System.out.println("發送開始時間:" + LocalDateTime.now() + "測試發送delay消息====>" + msg);
mqSender.sendDelay(msg, delayTime);
}
}
複製代碼
http://localhost:8080/api/mq/send/delay?delayTime=6000
http://localhost:8080/api/mq/send/delay?delayTime=10000
複製代碼
延時隊列在須要延時處理的場景下很是有用,使用RabbitMQ來實現延時隊列,能夠很好的利用RabbitMQ的特性,如:消息可靠發送、消息可靠投遞、死信隊列來保障消息至少被消費一次以及未被正確處理的消息不會被丟棄。
另外,經過RabbitMQ集羣的特性,能夠很好的解決單點故障問題,不會由於單個節點掛掉致使延時隊列不可用或者消息丟失。
固然,延時隊列還有不少其它選擇,好比利用Redis的zset,Quartz或者利用kafka的時間輪,這些方式各有特色,但就像爐石傳說通常,這些知識就比如手裏的卡牌,知道的越多,能夠用的卡牌也就越多,遇到問題便能遊刃有餘,因此須要大量的知識儲備和經驗積累才能打造出更出色的卡牌組合,讓本身解決問題的能力獲得更好的提高。
肥朝告訴我說:聞道有前後,術業有專攻,達者爲師。
那今日份的講解就到此結束,具體的代碼請移步個人gitHub的mybot項目888分支查閱,fork體驗一把,或者評論區留言探討,寫的很差,請多多指教~~