本章咱們來一次快速入門RabbitMQ——生產者與消費者。須要構建一個生產端與消費端的模型。什麼意思呢?咱們的生產者發送一條消息,投遞到RabbitMQ集羣也就是Broker。
咱們的消費端進行監聽RabbitMQ,當發現隊列中有消息後,就進行消費。html
本次整合主要採用SpringBoot框架,須要對SpringBoot的使用有必定了解。java
咱們來看下大概步驟:mysql
這個鏈接工廠須要配置一些相應的信息,例如: RabbitMQ節點的地址,端口號,VirtualHost等等。
Channel是咱們RabbitMQ全部消息進行交互的關鍵。git
/** * * @ClassName: ConnectionUtils * @Description: 鏈接工具類 * @author Coder編程 * @date 2019年6月21日 上午22:28:22 * */ public class ConnectionUtils { public static Connection getConnection() throws IOException, TimeoutException { //定義鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務地址 factory.setHost("127.0.0.1"); //端口 factory.setPort(5672);//amqp協議 端口 相似與mysql的3306 //設置帳號信息,用戶名、密碼、vhost factory.setVirtualHost("/vhost_cp"); factory.setUsername("user_cp"); factory.setPassword("123456"); // 經過工程獲取鏈接 Connection connection = factory.newConnection(); return connection; } }
/** * * @ClassName: Producer * @Description: 生產者 * @author Coder編程 * @date 2019年7月30日 上午21:04:43 * */ public class Producer { public static void main(String[] args) throws Exception { System.out.println("Producer start..."); //1 建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); //2 經過connection建立一個Channel Channel channel = connection.createChannel(); //3 經過Channel發送數據 for(int i=0; i < 5; i++){ String msg = "Hello RabbitMQ!"; //1 exchange 2 routingKey channel.basicPublish("", "test001", null, msg.getBytes()); } //4 記得要關閉相關的鏈接 channel.close(); connection.close(); } }
/** * * @ClassName: Consumer * @Description: 消費端 * @author Coder編程 * @date 2019年7月30日 上午21:08:12 * */ public class Consumer { public static void main(String[] args) throws Exception { System.out.println("Consumer start..."); //1 建立ConnectionFactory Connection connection = ConnectionUtils.getConnection(); //2經過connection建立一個Channel Channel channel = connection.createChannel(); //3聲明(建立)一個隊列 String queueName = "test001"; channel.queueDeclare(queueName, true, false, false, null); //4建立消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //5設置Channel channel.basicConsume(queueName, true, queueingConsumer); while(true){ //6 獲取消息 Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消費端: " + msg); //Envelope envelope = delivery.getEnvelope(); } } }
channel.queueDeclare(queueName, true, false, false, null);
第一個參數:queuename:隊列的名稱
第二個參數:durable 是否持久化。true消息會持久化到本地,保證重啓服務後消息不會丟失
第三個參數:exclusive :表示獨佔方式,設置爲true 在某些情景下有必要,例如:順序消費。表示只有一個channel能夠去監聽,其餘channel都不可以監聽。目的就是爲了保證順序消費。
第四個參數:autoDelete:隊列若是與Exchange未綁定,則自動刪除
第五個參數:arguments:擴展參數github
channel.basicConsume(QUEUE_NAME, true, consumer);
第二個參數 autoAck:自動簽收消息面試
(1)啓動消費端
sql
(2)查看管控臺
編程
能夠看到已經有一個鏈接,一個信道,一個消費者等信息了。微信
能夠看到信道目前的狀態是空閒狀態。框架
隊列中多了test001隊列。
關於管控臺的介紹能夠看這篇文章:消息中間件——RabbitMQ(四)命令行與管控臺的基本操做!
(3)運行生產端
能夠看到生產端發送完消息以後停下了,消費端迅速接收到了消息。也能夠繼續經過管控臺觀察消費的狀況。
(4) 問題
注意:
這裏面可能有一個問題:爲何要先啓動消費端呢?
由於在消費端建立的隊列,咱們必需要有隊列,纔可以發送消息。
另外一個問題:在生產端代碼中:
channel.basicPublish("", "test001", null, msg.getBytes());
並無設置exchange,只設置了隊列名稱,消費端卻依然可以消費到消息,這是爲何呢?
答:發消息的必定要指定Exchange,若是不指定Exchange或者Exchange爲空的話,它會默認走第一個
它的路由規則:將相同命名的隊列Queue的消息路由過去,若是路由不過去,將會把消息刪除。
歡迎關注我的微信公衆號:Coder編程
獲取最新原創技術文章和免費學習資料,更有大量精品思惟導圖、面試資料、PMP備考資料等你來領,方便你隨時隨地學習技術知識!
新建了一個qq羣:315211365,歡迎你們進羣交流一塊兒學習。謝謝了!也能夠介紹給身邊有須要的朋友。
文章收錄至
Github: https://github.com/CoderMerlin/coder-programming
Gitee: https://gitee.com/573059382/coder-programming
歡迎關注並star~
參考文章:
https://www.cnblogs.com/myJavaEE/p/6665166.html
《RabbitMQ消息中間件精講》
推薦文章:
消息中間件——RabbitMQ(二)各大主流消息中間件綜合對比介紹!