PS:近期在南寧出差,工做比較忙,因此更新會比較慢。web
說到消息通訊,可能咱們首先會想到的是郵箱,QQ,微信,短信等等這些通訊方式,這些通訊方式都有發送者,接收者,還有一箇中間存儲離線消息的容器。可是這些通訊方式和咱們要講的 RabbitMQ 的通訊模型是不同的,好比和郵件的通訊方式相比,郵件服務器基於 POP3/SMTP 協議,通訊雙方須要明確指定,而且發送的郵件內容有固定的結構。而 RabbitMQ 服務器基於 AMQP 協議,這個協議是不須要明確指定發送方和接收方的,並且發送的消息也沒有固定的結構,甚至能夠直接存儲二進制數據,而且和郵件服務器同樣,也能存儲離線消息,最關鍵的是 RabbitMQ 既可以以一對一的方式進行路由,還可以以一對多的方式進行廣播。算法
下面這張圖是大體展現了 RabbitMQ 消息通訊的過程:apache
ps:看不懂不要緊,後面會經過具體的例子進行講解。數組
在 RabbitMQ 的通訊過程當中,有兩個主要的角色:生產者和消費者。類比於郵件通訊的發送方和接收方。安全
這裏首先咱們要明確 RabbtiMQ 服務器是不可以產生數據的,正如同其名字——消息中間件,是一個用來傳遞消息的中間商。生產者產生建立消息,而後發佈到代理服務器(RabbitMQ),而消費者則從代理服務器獲取消息(不是直接找生產者要消息),並且在實際應用中,生產者和消費者也是能夠角色互相轉換的,因此當咱們應用程序鏈接到 RabbitMQ 服務器時,必需要明確我是生產者呢仍是消費者。服務器
生產者建立消息,而後發佈到 RabbitMQ 服務器中,那麼什麼是消息?微信
這裏的消息分爲兩部分:有效內容和內容標籤。app
①、有效內容:能夠是任何內容,一個數組,一個集合,甚至二進制數據均可以。RabbitMQ 不會在乎你發什麼數據,儘管發就好了。負載均衡
②、內容標籤:描述有效內容,是 RabbitMQ 用來決定誰將得到消息。前面說的郵件通訊,必須明確指定發送方地址和收件方地址,而基於 AMQP 協議的 RabbitMQ 則是經過生產者發送消息附帶的內容標籤將消息發送個感興趣的消費者。maven
後面咱們會詳細解析標籤是什麼,這裏只須要知道生產者會建立消息並設置標籤。注意最上面的大圖,通常來講生產者建立消息會設置標籤,可是傳輸到消費者那裏就沒有標籤了,除非你在有效內容中說明誰是生產者,通常消費者是不知道誰產生的消息的。
生產者產生了消息,而後發佈到 RabbitMQ 服務器,發佈以前確定要先鏈接上服務器,也就是要在應用程序和rabbitmq 服務器之間創建一條 TCP 鏈接,一旦鏈接創建,應用程序就能夠建立一條 AMQP 信道。
信道是創建在「真實的」TCP 鏈接內的虛擬鏈接,AMQP 命令都是經過信道發送出去的,每條信道都會被指派一個惟一的ID(AMQP庫會幫你記住ID的),不管是發佈消息、訂閱隊列或者接收消息,這些動做都是經過信道來完成的。
可能有人會問,爲何不直接經過 TCP 鏈接來發送AMQP命令呢?
這裏緣由是效率問題,由於對於操做系統來講,每次創建和銷燬 TCP 會話是很是昂貴的開銷,而實際系統中,好比電商雙十一,每秒鐘高峯期成千上萬條鏈接,通常來講操做系統創建TCP鏈接是有數量限制的,那麼這就會遇到瓶頸。
引入信道的概念,咱們能夠在一條 TCP 鏈接上建立 N多個信道,這樣既能發送命令,也可以保證每條信道的私密性,咱們能夠將其想象爲光纖電纜。
截取上面的一部分圖:
交換器和隊列都是 RabbitMQ 服務器的一部分,咱們知道生產者會將消息發送到 RabbitMQ 服務器,而進入該服務器後,首先進入交換機部分,而後由交換器根據消息附帶的內容標籤,將消息綁定到相應的隊列。咱們首先來看什麼是隊列:
①、容納消息的場所,生產者發送到RabbitMQ服務器的消息會在隊列中等待消費者消費。
②、隊列是 RabbitMQ 服務器中最後的終點(除非消息進入了黑洞,黑洞的概念下面會介紹)。
③、隊列能夠實現負載均衡,咱們能夠增長一堆消費者,而後讓 RabbitMQ 以循環的方式來均勻的分配消息。
搞清楚了隊列是什麼了,那麼消息是如何到達隊列的呢?沒錯,就是經過交換器。
消息進入RabbitMQ 服務器時,會首先將消息發送到交換器,而後交換器會根據特定的路由算法以及消息的內容標籤將消息綁定到相應的隊列。在 AMQP 協議中有四種交換器:direct、fanout、topic和 headers,每種交換器都實現了不一樣的路由算法,這也對應 RabbitMQ 工做的幾種不一樣方式,這是重點,後面博客會進行詳細介紹。
最上面那張大圖,我畫了虛擬主機A以及虛擬主機B,說明在 RabbitMQ 服務器中存在着多個虛擬主機,那麼虛擬主機究竟是什麼?
首先咱們拋出這樣一個問題,一個 RabbitMQ 確定不是隻服務一個應用程序,那麼多個應用程序同時使用 RabbitMQ 服務器,如何保證彼此之間不會衝突?
答案就是使用虛擬主機,虛擬主機其實就是一個迷你版的RabbitMQ 服務器,它擁有本身的交換器和隊列,更重要的是虛擬主機擁有本身的權限機制,一個服務器可以建立多個虛擬主機。那麼咱們在使用RabbitMQ服務器的時候,只須要將一個應用程序對應一個虛擬主機,這種各個實例間邏輯上的分離就可以保證不一樣的應用程序安全的傳遞消息。
默認的虛擬主機是「/」。
介紹完RabbitMQ 消息通訊過程當中的一些基本概念後,下面咱們經過一個代碼實例來實際感覺一下。
這是一個Maven工程,首先咱們看 pom.xml 文件:導入 amqp-client 依賴便可
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.ys.rabbitmq</groupId> <artifactId>RabbitMQTest</artifactId> <version>1.0-SNAPSHOT</version> <packaging>war</packaging> <name>RabbitMQTest Maven Webapp</name> <!-- FIXME change it to the project's website --> <url>http://www.example.com</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.target>1.7</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version> </dependency> </dependencies> </project>
生產者:
1 package com.ys.simple; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.ys.utils.ConnectionUtil; 6 7 /** 8 * Create by hadoop 9 */ 10 public class Send { 11 private final static String QUEUE_NAME = "hello"; 12 13 public static void main(String[] args) throws Exception{ 14 //一、獲取鏈接 15 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); 16 //二、聲明通道 17 Channel channel = connection.createChannel(); 18 //三、聲明(建立)隊列 19 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 20 //四、定義消息內容 21 String message = "hello rabbitmq "; 22 //五、發佈消息 23 channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); 24 System.out.println("[x] Sent'"+message+"'"); 25 //六、關閉通道和鏈接 26 channel.close(); 27 connection.close(); 28 } 29 }
消費者:
1 package com.ys.simple; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.QueueingConsumer; 6 import com.ys.utils.ConnectionUtil; 7 8 9 /** 10 * Create by hadoop 11 */ 12 public class Recv { 13 14 private final static String QUEUE_NAME = "hello"; 15 16 public static void main(String[] args) throws Exception{ 17 //一、獲取鏈接 18 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); 19 //二、聲明通道 20 Channel channel = connection.createChannel(); 21 //三、聲明隊列 22 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 23 //四、定義隊列的消費者 24 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); 25 //五、監聽隊列 26 channel.basicConsume(QUEUE_NAME,true,queueingConsumer); 27 //六、獲取消息 28 while (true){ 29 QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); 30 String message = new String(delivery.getBody()); 31 System.out.println(" [x] Received '" + message + "'"); 32 } 33 } 34 35 }
工具類:ConnectionUtil
1 package com.ys.utils; 2 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5 6 /** 7 * Create by hadoop 8 */ 9 public class ConnectionUtil { 10 11 public static Connection getConnection(String host,int port,String vHost,String userName,String passWord) throws Exception{ 12 //一、定義鏈接工廠 13 ConnectionFactory factory = new ConnectionFactory(); 14 //二、設置服務器地址 15 factory.setHost(host); 16 //三、設置端口 17 factory.setPort(port); 18 //四、設置虛擬主機、用戶名、密碼 19 factory.setVirtualHost(vHost); 20 factory.setUsername(userName); 21 factory.setPassword(passWord); 22 //五、經過鏈接工廠獲取鏈接 23 Connection connection = factory.newConnection(); 24 return connection; 25 } 26 }