RabbitMQ是一個消息代理,主要的想法很簡單:它接收並轉發消息。你能夠把它當作一個郵局,當你發送郵件到郵筒,你相信郵差先生最終會將郵件投遞給收件人。RabbitMQ在這個比喻裏,是一個郵筒,郵局和一個郵遞員。
RabbitMQ和郵局最大的不一樣是,RabbitMQ不處理紙張,而是接收、存儲和轉發數據的二進制形式。
RabbitMQ和普通消息,使用的一些術語。
生產:意味着發送,發送消息的程序是生產者,生產者以下:
html
隊列:是一個郵箱的名稱。它在RabbitMQ裏面。雖然消息流經RabbitMQ和應用程序,可是他們只能存儲在隊列裏面。一個隊列不受任何限制的約束,只要你喜歡你能夠存儲儘量多的東西,它本質上是一個無限的緩衝區。不少生產者能夠發送消息到一個隊列,不少消費者能夠從一個隊列獲取數據。隊列以下:
java
消費者:也有相似的含義接收,一個消費者是一個程序,主要是等待接收消息,消費者以下:
git
注意:生產者、消費者和代理不必定在一臺機器上,事實上不少應用中也是如此。github
下面咱們將用Java編寫兩個程序,生產者發送一個消息,消費者接收消息並打印出來。數組
在下圖中,」P「是咱們的生產者,」C「是咱們的消費者,在中間的盒子是一個隊列——即RabbitMQ維持的一個消息緩衝區。
緩存
The Java client library
RabbitMQ支持不少種協議,本教程使用AMQP 0-9-1,這是一個開發的通用的消息協議,RabbitMQ的客戶端支持不少種語言,這裏咱們將使用RabbitMQ提供的Java客戶端。異步
下載客戶端庫包,並解壓到工做目錄:socket
$ unzip rabbitmq-java-client-bin-*.zip $ cp rabbitmq-java-client-bin-*/*.jar ./
固然,RabbitMQ也在maven中央倉庫中:maven
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.1</version> </dependency>
如今咱們有了Java客戶端和依賴,咱們能夠寫一點代碼了。ide
咱們稱呼咱們的消息發送者爲send,接受者爲recv,發送者將鏈接到RabbitMQ,發送一條消息,而後退出。
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
建立這個類,並命名隊列:
public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException { ... } }
而後咱們建立一個到服務的鏈接:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
這個鏈接虛擬化了socket鏈接,這裏咱們在本地鏈接上代理,若是咱們想鏈接其餘機器上的代理,咱們能夠改變name和IP地址便可。
而後咱們建立了一個channel——提供了不少API供咱們獲取東西的所在。
爲了發送消息,咱們定義了一個隊列,而後咱們能發佈消息到隊列上。
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
定義一個冪等的隊列——只能在未存在的狀況下建立。消息體是一個字節數組。
最後咱們關閉通道和鏈接:
channel.close(); connection.close();
以上就是Send.java的全部代碼。
tips:
send無論用!在發送消息以後,在RabbitMQ的後臺並無看到發送消息,你可能認爲是程序錯了,也許是代理沒有足夠的空間了(默認是須要1G的),因此拒絕接受消息。檢查代理的logfile確認緣由所在。而後去配置文件設置disk_free_limit便可。
咱們的接受者從RabbitMQ拉取消息,因此不像發送一條消息的發送者,咱們必須保持監聽,而後打印出來。
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;
DefaultConsumer實現了Consumer接口——咱們用來緩衝服務推給咱們的消息。
實現和sender差很少,咱們打開一個鏈接和通道,定義一個咱們即將消費的隊列,和send發佈的隊列同樣。
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ... } }
這裏咱們定義了一個隊列,由於咱們也許會在sender以前啓動receiver,咱們須要確認這個隊列在咱們消費以前就已經存在了。
咱們讓服務從隊列裏面給咱們傳遞消息,自從他將要異步的推送消息,咱們提供了一個緩存消息的回調方法,知道咱們去用他,這就是DefaultConsumer作的事情。
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer);
以上就是所有的Recv.java。
原文地址:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
代碼地址:https://github.com/aheizi/hi-mq
相關:
1.RabbitMQ之HelloWorld
2.RabbitMQ之任務隊列
3.RabbitMQ之發佈訂閱
4.RabbitMQ之路由(Routing)
5.RabbitMQ之主題(Topic)
6.RabbitMQ之遠程過程調用(RPC)