這些教程介紹了使用RabbitMQ建立消息傳遞應用程序的基礎知識。您須要安裝RabbitMQ服務器才能完成教程html
RabbitMQ是一個消息代理:它接受和轉發消息。你能夠把它想象成一個郵局:當你把你想要發佈的郵件放在郵箱裏時,你能夠肯定先生或女士郵遞員最終將郵件發送給你的收件人。java
在這個比喻中,RabbitMQ是郵政信箱,郵局和郵遞員。git
RabbitMQ和郵局的主要區別在於它不處理紙張,而是接受,存儲和轉發二進制數據塊 - 消息。github
英文原文地址:https://www.rabbitmq.com/tutorials/tutorial-one-java.htmlspring
在本教程的這一部分中,咱們將用Java編寫兩個程序; 一個發送單個消息的生產者,以及接收消息並將其打印出來的消費者。api
咱們將詳細介紹Java API中的一些細節,專一於這個很是簡單的事情,以便開始使用。 這是一個消息傳遞的「Hello World」。數組
在下圖中,「P」是咱們的生產者,「C」是咱們的消費者。 中間的盒子是一個隊列 - RabbitMQ表明消費者保存的消息緩衝區。緩存
RabbitMQ 有許多不一樣語言的RabbitMQ客戶端。 這裏咱們將使用RabbitMQ提供的Java客戶端。服務器
下載RabbitMQ提供的Java客戶端以及它的依賴(SLF4J API and SLF4J Simple)異步
將這些文件複製到工做目錄中,並跟着教程複製Java文件。
Tips:
RabbitMQ Java客戶端也位於中央Maven存儲庫中,其中包含groupId com.rabbitmq和artifactId amqp-client。
請注意SLF4J Simple對於教程來講已經足夠了,可是您應該在生產環境中使用像Logback 這樣的完整日誌庫。
關於Maven 等其餘下載方式請移步
Java Client
- On Maven Central: RabbitMQ Java client.
- Quick download: Binary | Source
- Javadoc
- All Java client downloads
- Older versions
咱們擁有Java客戶端及其依賴關係,那麼咱們接下來開始寫代碼。
因爲是學習使用RabbitMQ,咱們這裏使用 STS 來寫這個項目。
1. 打開STS,新建一個名字叫作 RabbitMQ_HelloWorld_Sample 的 Java Project。
2. 新建一個叫作libs的文件夾,咱們將上面三個jar 複製到咱們的項目中,而後並添加依賴
完成後項目結構如圖所示
3. 編寫生產者類
咱們會打電話給咱們的消息發佈者(發送者)發送和咱們的消息使用者(接收者)Recv。 發佈者將鏈接到RabbitMQ,發送一條消息,而後退出
建立Send.java 文件,關於代碼的講解在註釋裏:
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { //設置消息隊列的名稱 private final static String QUEUE_NAME="hello"; public static void main(String[] args) throws java.io.IOException, TimeoutException{ /** * 建立一個到RabbitMQ Server 的鏈接 * 鏈接抽象出套接字鏈接,併爲咱們處理協議版本協商和身份驗證等。 * 在這裏,咱們鏈接到本地機器上的代理 - 所以是本地主機。 * 若是咱們想鏈接到另外一臺機器上的代理,咱們只需在此指定其名稱或IP地址。 * */ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * 接下來咱們建立一個Channel 對象,這是大部分用於完成任務的API駐留的地方。 * 要想發送出去,咱們必須聲明一個隊列來執行發送,那麼咱們能夠將消息發佈到隊列中: * */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; //聲明一個隊列是冪等的 - 只有當它不存在時纔會被建立。 //消息內容是一個字節數組,因此你能夠編碼任何你喜歡的地方。 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //最後咱們關閉這些鏈接對象 channel.close(); connection.close(); } }
執行成功後你將看到這樣的內容:
Tips: ,
若是這是您第一次使用RabbitMQ,而且您沒有看到「已發送」消息,那麼您可能會抓住您的頭腦,想知道會出現什麼問題?
也許代理啓動時沒有足夠的可用磁盤空間(默認狀況下它至少須要200 MB空閒空間),所以拒絕接受消息。
檢查代理日誌文件以確認並在必要時減小限制。 配置文件文檔將告訴你如何設置disk_free_limit。
還有一種多是你的RabbitMQ 沒有啓動,執行下面命令再次嘗試便可。
rabbitmq-service.bat start
4. 編寫消費者
這就是咱們的出版商。 咱們的消費者推送來自RabbitMQ的消息,所以與發佈單個消息的發佈者不一樣,咱們將繼續運行以收聽消息並將其打印出來。
建立一個Recv.java 文件,代碼講解在註釋裏面
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * 消費者 * 額外的DefaultConsumer是一個實現Consumer接口的類,咱們將使用它來緩存由服務器推送給咱們的消息。 * */ public class Recv { private final static String QUEUE_NAME="hello"; public static void main(String[] args) throws IOException, TimeoutException { /** * 設置與發佈者相同; * 咱們打開一個鏈接和一個通道,並聲明咱們將要使用的隊列。 * 請注意,這與發送發佈到的隊列相匹配 * */ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * 請注意,咱們也在這裏聲明隊列。 * 由於咱們可能會在發佈者以前啓動消費者,因此咱們但願在咱們嘗試使用消息以前確保隊列已存在。 * */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); /** * 咱們即將告訴服務器將隊列中的消息傳遞給咱們。 * 因爲它會異步推送消息,所以咱們以對象的形式提供回調,該消息將緩衝消息,直到咱們準備好使用它們。 * 這是一個DefaultConsumer子類的做用。 * */ Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
執行成功後
咱們能夠看到咱們的消費者收到了消息隊列生產者剛發佈的消息。
本篇完~