原文地址:https://www.rabbitmq.com/tutorials/tutorial-one-java.htmlhtml
RabbitMQ是一個接收並轉發消息的消息代理。你能夠把它當成是一座郵局,當你把想要發出的郵件放到信箱裏以後,郵遞員會把它運送到你指定的收件人那裏。 RabbitMQ就是郵局、信箱和郵遞員的集合。java
RabbitMQ和郵局的最大區別在於它處理的是消息——也就是二進制的數據,而非信件。數組
在RabbitMQ中使用了一些術語:緩存
注意生產者、消費者以及消息代理不須要部署在同一臺主機上,並且大多數狀況下它們也確實不在一塊兒。另外,一個程序也不能同時當生產者和消費者。併發
在本章節咱們將編寫兩段Java程序:一個發送消息的生產者和一個接收消息並打印的消費者。咱們將僅着眼於那些須要用到的簡單的東西而忽略一些細節。畢竟這只是一個入門的例子。異步
在下邊這張圖中,「P」是生產者,「C」是消費者。中間的方框是一個隊列—— RabbitMQ爲消費者保留的消息緩衝區。socket
咱們將消息發送者成爲==Send==,將消息接收者稱爲==Recv==。消息發送者將鏈接到RabbitMQ併發送一條消息,而後退出。編碼
在==Send.java==中,咱們須要導入一些類3d
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
設置類並命名隊列:代理
public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ... } }
而後咱們建立一個鏈接到Rabbitmq服務端:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { }
Connection類抽象了底層的socket鏈接,並幫咱們處理包括協議版本協商和受權的一系列事情。這裏咱們鏈接的是本地的RabbitMQ,因此使用了localhost。若是咱們想要鏈接到其餘機器的RabbitMQ,咱們須要將localhost改爲對應的域名或者IP。
接下來咱們建立一個channel,咱們大多數的工做均可以經過這個類提供的API完成。注意咱們可使用try-with-resources語法來建立,由於==Connection==和==Channel==都實現了==java.io.Closeable==接口。這樣咱們就不須要手動關閉它們了。
爲了發送消息,咱們還須要聲明一個隊列,全部的這些代碼都包裹在try-with-resources語句塊中
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
聲明隊列的代碼是冪等的——它只有在不存在的時候纔會建立。它能夠接收的消息爲字節數組,因此你能夠任意編碼你的消息。
下邊是Send.java的完整代碼
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.nio.charset.StandardCharsets; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } }
消息發送不成功!
若是這是你第一次使用RabbitMQ而且沒有看到"Sent"消息,你可能抓耳撓腮想要知道哪裏出了問題。或許是RabbitMQ啓動的時候磁盤空間不足(默認狀況下至少須要200MB的剩餘空間),因此它拒絕接收消息。檢查RabbitMQ的日誌文件查明緣由,並在須要的時候調低這個限制。這份配置文檔會告訴你怎麼設置disk_free_limit參數。
咱們的消費者不像生產者那樣只須要發送一條消息,它須要一直監聽來自RabbitMQ的消息,因此咱們須要保持它持續運行接收消息並打印。
==Recv.java==的代碼與Send.java的代碼很相近:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;
多出來的DefaultConsumer類實現了 Consumer接口,咱們將會用它來接收來自RabbitMQ的消息。
相應的配置與生產者相同:咱們開啓一個 connection和一個 channel,並聲明咱們想要消費的隊列。注意這裏隊列的名字必須與生產者中的隊列名字相同。
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); } }
注意這裏咱們也聲明瞭一個隊列。由於咱們的消費者可能比生產者先啓動,因此在咱們消費消息以前須要先確保隊列存在。
爲何咱們不使用try-with-resource語句來自動關閉channel和connection?由於若是這樣作的話,咱們的程序會自動關閉關閉這些資源並退出!而實際上咱們須要這個程序持續運行以異步監聽消息的到達。
以後咱們須要告訴RabbitMQ服務端將隊列中的消息發送給咱們。因爲RabbitMQ會將消息異步發送給咱們,因此咱們提供了一個回調方法來幫咱們緩存消息,直到咱們去處理這些消息。這就是DeliverCallback類所作的事情。
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
下邊是Recv.java的完整代碼
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }