本章咱們來一次快速入門RabbitMQ——生產者與消費者。須要構建一個生產端與消費端的模型。什麼意思呢?咱們的生產者發送一條消息,投遞到RabbitMQ集羣也就是Broker。 咱們的消費端進行監聽RabbitMQ,當發現隊列中有消息後,就進行消費。html
本次整合主要採用SpringBoot框架,須要對SpringBoot的使用有必定了解。mysql
咱們來看下大概步驟:git
這個鏈接工廠須要配置一些相應的信息,例如: RabbitMQ節點的地址,端口號,VirtualHost等等。 Channel是咱們RabbitMQ全部消息進行交互的關鍵。github
/**
*
* @ClassName: ConnectionUtils
* @Description: 鏈接工具類
* @author Coder編程
* @date 2019年6月21日 上午22:28:22
*
*/
public class ConnectionUtils {
public static Connection getConnection() throws IOException, TimeoutException {
//定義鏈接工廠
ConnectionFactory factory = new ConnectionFactory();
//設置服務地址
factory.setHost("127.0.0.1");
//端口
factory.setPort(5672);//amqp協議 端口 相似與mysql的3306
//設置帳號信息,用戶名、密碼、vhost
factory.setVirtualHost("/vhost_cp");
factory.setUsername("user_cp");
factory.setPassword("123456");
// 經過工程獲取鏈接
Connection connection = factory.newConnection();
return connection;
}
}
複製代碼
/**
*
* @ClassName: Producer
* @Description: 生產者
* @author Coder編程
* @date 2019年7月30日 上午21:04:43
*
*/
public class Producer {
public static void main(String[] args) throws Exception {
System.out.println("Producer start...");
//1 建立ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
//2 經過connection建立一個Channel
Channel channel = connection.createChannel();
//3 經過Channel發送數據
for(int i=0; i < 5; i++){
String msg = "Hello RabbitMQ!";
//1 exchange 2 routingKey
channel.basicPublish("", "test001", null, msg.getBytes());
}
//4 記得要關閉相關的鏈接
channel.close();
connection.close();
}
}
複製代碼
/**
*
* @ClassName: Consumer
* @Description: 消費端
* @author Coder編程
* @date 2019年7月30日 上午21:08:12
*
*/
public class Consumer {
public static void main(String[] args) throws Exception {
System.out.println("Consumer start...");
//1 建立ConnectionFactory
Connection connection = ConnectionUtils.getConnection();
//2經過connection建立一個Channel
Channel channel = connection.createChannel();
//3聲明(建立)一個隊列
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
//4建立消費者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//5設置Channel
channel.basicConsume(queueName, true, queueingConsumer);
while(true){
//6 獲取消息
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消費端: " + msg);
//Envelope envelope = delivery.getEnvelope();
}
}
}
複製代碼
channel.queueDeclare(queueName, true, false, false, null);
複製代碼
第一個參數:queuename:隊列的名稱 第二個參數:durable 是否持久化。true消息會持久化到本地,保證重啓服務後消息不會丟失 第三個參數:exclusive :表示獨佔方式,設置爲true 在某些情景下有必要,例如:順序消費。表示只有一個channel能夠去監聽,其餘channel都不可以監聽。目的就是爲了保證順序消費。 第四個參數:autoDelete:隊列若是與Exchange未綁定,則自動刪除 第五個參數:arguments:擴展參數面試
channel.basicConsume(QUEUE_NAME, true, consumer);
複製代碼
第二個參數 autoAck:自動簽收消息sql
(1)啓動消費端編程
(2)查看管控臺bash
(3)運行生產端框架
(4) 問題
注意:
這裏面可能有一個問題:爲何要先啓動消費端呢?
由於在消費端建立的隊列,咱們必需要有隊列,纔可以發送消息。
另外一個問題:在生產端代碼中:
channel.basicPublish("", "test001", null, msg.getBytes());
複製代碼
並無設置exchange,只設置了隊列名稱,消費端卻依然可以消費到消息,這是爲何呢?
答:發消息的必定要指定Exchange,若是不指定Exchange或者Exchange爲空的話,它會默認走第一個
歡迎關注我的微信公衆號:Coder編程 獲取最新原創技術文章和免費學習資料,更有大量精品思惟導圖、面試資料、PMP備考資料等你來領,方便你隨時隨地學習技術知識! 新建了一個qq羣:315211365,歡迎你們進羣交流一塊兒學習。謝謝了!也能夠介紹給身邊有須要的朋友。
文章收錄至 Github: github.com/CoderMerlin… Gitee: gitee.com/573059382/c… 歡迎關注並star~
![]()
參考文章:
《RabbitMQ消息中間件精講》
推薦文章:
消息中間件——RabbitMQ(二)各大主流消息中間件綜合對比介紹!