介紹
RabbitMQ是做爲一個消息代理中間件,其設計的目的很簡單:收集消息而後轉發消息。你能夠把它當成一個郵局:當你發送郵件到郵箱時你確定相信郵遞員確定會把這封郵件送到收件人手上。在這個比喻中,RabbitMQ實際充當了郵箱、郵局以及郵遞員的角色。
與郵局最大的不一樣是,RabbitMQ它不處理紙張文件,而是負責接收,儲存以及轉發二進制數據message。
在詳細介紹以前,咱們首先了解一些RabbitMQ的術語:html
消息隊列:消息隊列存在RabbitMQ server端中;雖然消息流經你的應用程序和RabbitMQ server,但他們只能夠儲存在RabbitMQ server隊列中。隊列是不受任何限制的,它能夠儲存你發送的全部消息,由於本質上它就是一個無限緩衝區。能夠多個生產者同時發送消息給一個隊列,同時 也能夠多個消費者從一個隊列接收數據。如下圖來表示消息隊列:數組
消費者:咱們稱大部分時間都在等待接收消息的程序端爲消費者;如下圖來表示消費者:」Hello world「
在本節咱們將用java編寫兩個程序:一個生產者用來發送消息以及一個消費者用來接收消息而且打印出來。在實現過程當中咱們主要關注如何實現功能而對於java API的細節一筆帶過。
在下面這個圖中,」P」表明生產者,」C」表明消費者,在中間的盒子表示隊列(一個RabbitMQ用來保存消息的緩衝區)。
基於java的客戶端:
RabbitMQ遵循AMQP協議(高級消息隊列協議)–這是一個開放的、通用的消息協議。如今網上已存在不少基於的不一樣語言實現的AMQP客戶端。在本例中咱們使用java語言的客戶端。maven引入以下:緩存
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>rabbitmq-client</artifactId>
<version>1.3.0</version>
</dependency>
生產者
在這個例子中,咱們分別以Send和Recv來表示生產者、消費者。Send主要用來鏈接RabbitMQ server及發送一個消息,以後退出。
下面是Send.class實現:服務器
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException { //--------------1-------------- ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //--------------2----------------- channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //---------------3---------------- channel.close(); connection.close(); } }
代碼的第一部分主要建立一個與RabbitMQ服務器的鏈接,該連接抽象了socket操做,負責判斷協議版本和身份認證等等。另外由於咱們把消息 代理broker搭建在本地,因此鏈接的地址是localhost。若是你想鏈接到其餘機器的代理broker,你能夠指定它的域名或者IP地址。
在第二部分,咱們建立了一個通道;通道爲咱們提供了不少API用於完成消息的發送。好比,咱們能夠聲明一個隊列用來發送消息以及把消息發佈到聲明的隊列。
值得注意的是,聲明一個隊列是冪等的,只有在隊列不存在的時候才建立。另外因爲消息的內容是二進制的字節數組,因此收到數據後你能夠編碼成任意你須要的數據。
最後,也就是第三部分,咱們須要關掉通道channel以及鏈接。
發送失敗
若是你是第一次使用RabbitMQ,那麼當你沒有看到發送的消息的時候,你可能會撓頭到底想問題出在了哪呢?沒關係,有多是broker(代理,之後 統稱broker)沒有足夠的磁盤空間(默認狀況下它至少須要1Gb的剩餘空間)所以拒絕接收任何消息。你能夠查看broker的日誌文件來確認而且減小 一下限制。地址http://www.rabbitmq.com/configure.html#config-items裏說明了如何設置 disk_free_limit.
異步
消費者
下面實現咱們的消費者,它主要用來持續監聽RabbitMQ推送消息而且打印。
下面是Recv.class的實現:socket
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { //-------------1------------------- ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //--------------2------------------- QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
QueueingConsumer類主要用來緩存RabbitMQ服務器推送的消息。
創建Recv的過程跟send相似;首先建立一個到服務器的鏈接和一個通道channel(後面統一叫channel),以及定義一個咱們將要消費的隊列。注意隊列必須與send的匹配,也就是名字要同樣。
在第一部分,咱們聲明瞭一個隊列;另外因爲咱們可能在sender運行前啓動了Recv,所以咱們要確保在咱們消費消息前隊列已經存在。
在第二部分中咱們告訴服務器要把QUEUE_NAME隊列中的消息推送給咱們。由於服務器是異步推送消息給咱們,所以咱們須要提供一個回調對象QueueingConsumer來緩存消息直到被程序消費掉。
QueueingConsumer.nextDelivery()會堵塞直到RabbitMQ服務器推送消息過來。maven