<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.10.0</version> </dependency>
在上圖的模型中,有如下概念:java
- 生產者,也就是要發送消息的程序
- 消費者:消息的接受者,會一直等待消息到來。
- 消息隊列:圖中紅色部分。相似一個郵箱,能夠緩存消息;生產者向其中投遞消息,消費者從其中取出消息。
全部的中間件技術都是基於
tcp/ip
協議基礎之上構建新型的協議規範,只不過rabbitmq
遵循的是amqp
緩存
實現步驟:安全
- 建立鏈接工程
- 建立鏈接 connection
- 經過鏈接獲取通道 Channel
- 經過通道建立交換機,聲明隊列,綁定關係,路由key,發送消息,和接收消息
- 準備消息內容
- 發送消息給隊列 queue
- 關閉鏈接
- 關閉通道
public class Producer { public static void main(String[] args) { //1. 建立鏈接工程 ConnectionFactory connectionFactory = new ConnectionFactory(); //這裏要使用本身的IP地址 connectionFactory.setHost("192.168.57.129"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //2. 建立鏈接 connection connection = connectionFactory.newConnection("生產者"); //3. 經過鏈接獲取通道 Channel channel = connection.createChannel(); //4. 經過通道建立交換機,聲明隊列,綁定關係,路由key,發送消息 String quequeName = "queuel"; /** * @params1 隊列的名稱 * @params2 是否要持久化 durable-false * @params3 排他性,是不是獨佔獨立 * @params4 是否自動刪除,隨着最後一個消費者消息完畢之後是否把隊列自動刪除 * @params5 攜帶的附屬參數 */ channel.queueDeclare(quequeName,false,false,false,null); //5. 準備消息內容 String message = "Hello,Consumer"; //6. 發送消息給隊列 queue channel.basicPublish("",quequeName,null,message.getBytes()); System.out.println("消息發送成功"); } catch (Exception e) { e.printStackTrace(); }finally { //7. 關閉鏈接 if (channel != null && channel.isOpen()){ try { channel.close(); } catch (Exception e) { e.printStackTrace(); } } //8. 關閉通道 if (connection != null && connection.isOpen()){ try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
public class Consumer { public static void main(String[] args) { //1. 建立鏈接工程 ConnectionFactory connectionFactory = new ConnectionFactory(); //這裏要使用本身的IP地址 connectionFactory.setHost("192.168.57.129"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //2. 建立鏈接 connection connection = connectionFactory.newConnection("消費者"); //3. 經過鏈接獲取通道 Channel channel = connection.createChannel(); //4. 經過通道建立交換機,聲明隊列,綁定關係,路由key,發送消息,和接收消息 String quequeName = "queue1"; channel.queueDeclare(quequeName,false,false,false,null); //5.監聽消息 DefaultConsumer consumer = new DefaultConsumer(channel){ /* consumerTag:消息者標籤,channel.basicConsume能夠指定 envelope:消息包內容,可從中獲取消息id,消息routing key,交換機,消息和重裝標記(收到消息失敗後是否須要從新發送) properties:消息屬性 body;消息 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由key System.out.println("路由key爲:"+ envelope.getRoutingKey()); //交換機 System.out.println("交換機爲:"+ envelope.getExchange()); //消息id System.out.println("消息id爲:"+ envelope.getDeliveryTag()); //收到的消息 System.out.println("接收到的消息:"+ new String(body,"UTF-8")); System.out.println(""); System.out.println("======================================================"); System.out.println(""); } }; channel.basicConsume("queue1", true, consumer); } catch (Exception e) { e.printStackTrace(); }finally { //6. 不關閉資源,一直監聽 } } }
AMQP 一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。服務器
AMQP是一個二進制協議,擁有一些現代化特色:
多信道
、協商式
,異步
,安全
,擴平臺
,中立
,高效
。網絡RabbitMQ 是 AMQP協議 的 Erlang的實現。異步
概念 | 說明 |
---|---|
鏈接 Connection | 一個網絡鏈接,例如:TCP/IP套接字鏈接。 |
會話 Session | 端點之間的命名對話。在一個會話上下文中,保證「剛好傳遞一次」。 |
信道 Channel | 多路複用鏈接中的一條獨立的雙向數據流通道。爲會話提供物理傳輸介質。 |
客戶端 Client | AMQP鏈接或者會話的發起者。AMQP是非對稱的,客戶端生產和消費消息,服務器存儲和路由這些消息。 |
服務節點Broker | 消息中間件的服務節點。通常狀況下能夠將一個RabbitMQ Broker看做一臺RabbitMQ 服務器。 |
端點 | AMQP對話的任意一方。一個AMQP鏈接包括兩個端點(一個是客戶端,一個是服務器)。 |
消費者 Consumer | 一個從消息隊列裏請求消息的客戶端程序。 |
生產者 Producer | 一個向交換機發布消息的客戶端應用程序。 |
以 入門案例 爲例maven
newConnection()
方法 , 會進一步封裝 Protocol Header 0-9-1
的報文頭髮送給 Broker
,以此通知Broker
本次交互採用的是 AMQP 0-9-1
協議,緊接着 Broker
返回 Connection.Start
來創建鏈接,在鏈接的過程當中涉及 Connection.Start/.Start-OK
、Connection.Tune/.Tune-Ok
,Connection.Open/ .Open-Ok
這6 個命令的交互。connection.createChannel
方法。此方法開啓信道,其包裝的 channel.open
命令發送給 Broker
, 等待 channel.basicPublish
方法,對應的AMQP命令爲 Basic.Publish
, 這個命令包含了content Header
和content Body()
。content Header 包含了消息體的屬性,例如:投遞模式,優先級等,content Body 包含了消息體自己。Channel.Close和Channl.Close-Ok
與Connetion.Close和Connection.Close-Ok
的命令交互。newConnection()
方法,這個方法會進一步封裝 Protocol Header 0-9-1
的報文頭髮送給Broker ,以此通知Broker 本次交互採用的是 AMQP 0-9-1
協議,緊接着Broker 返回Connection.Start
來創建鏈接,在鏈接的過程當中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok
這6 個命令的交互。connection.createChannel
方法。和生產者客戶端同樣,協議涉及Channel . Open/Open-Ok
命令。Basic.Consume
命令(即調用channel.basicConsume
方法〉將Channel 置爲接收模式,以後Broker 回執 Basic . Consume - Ok
以告訴消費者客戶端準備好消費消息。Basic.Deliver
命令,這個命令和 Basic.Publish
命令同樣會攜帶 Content Header 和Content Body。
Basic.Ack
命令。Channel.Close和Channl.Close-Ok 與Connetion.Close和Connection.Close-Ok
的命令交互。我的博客爲:
MoYu's HomePagetcp