原文地址:http://www.rabbitmq.com/getstarted.html 翻譯得很差,歡迎指出。html
RabbitMQ是一個消息代理(或者說消息隊列),它的主要意圖很明顯,就是接收和轉發消息。你能夠把它想象成一個郵局:當你把一封郵件放入郵箱,郵遞員會幫你把郵件送到收件人的手上。在這裏,RabbitMQ就比如一個郵箱、郵局或者郵遞員。java
RabbitMQ和郵局的主要區別在於,RabbitMQ不是處理郵件,而是接收、存儲和將消息以二進制的方式轉發出去。shell
在這裏,咱們先說明一些RabbitMQ中涉及到的術語。編程
生產者(Producer)。生產表示只負責發送的意思,一個只負責發送消息的程序稱爲一個生產者,咱們經過一個P來表示一個生產者,以下圖:
windows
隊列(Queue)。隊列就比如一個郵箱,它在RabbitMQ的內部。雖然消息在RabbitMQ和程序之間傳遞,可是它們是存儲在隊列中的。一個隊列沒有大小的限制,你想存儲多少條消息就存儲多少條,它的本質是一個無限大的緩衝區。任何生產者均可以往一個隊列裏發送消息,一樣的,任何消費者也能夠從一個隊列那接收到消息。咱們用下圖來表示一個隊列,隊列上面的文字表示這個隊列的名字:
數組
消費者(Consumer)。接收和發送的過程很相似,一個消費者程序一般是等待別人發送消息給它。咱們經過一個C來表示一個消費者,以下圖:
緩存
注意一點,消費者、生產者和消息隊列能夠不用運行在同一臺機器上。實際上,在大多數的應用程序中,它們並非在同一臺機器上運行的。安全
在這一小節中,咱們將編寫兩個Java程序,一個做爲生產者,發送一條簡單的消息;另外一個做爲消費者,接收並打印出接收到的消息。在這裏,咱們先不討論Java API的具體細節,而是先編寫出一個可運行的「Hello World」程序。服務器
在下圖中,「P」表示一個生產者,"C"表示一個消費者,中間的矩形表示一個隊列。併發
RabbitMQ會涉及許多協議,其中AMQP協議是一個開放式的、通用的消息協議。RabbitMQ支持不少編程語言,咱們這裏經過RabbitMQ提供的Java客戶端來進行演示。請自行下載、安裝RabbitMQ和相關jar包。
在下面的代碼中,咱們用Send來表示一個發送端,用Recv來表示一個接收端。發送端會先鏈接RabbitMQ,併發送一個簡單的消息後關閉。
對於Send而言,咱們須要導入一些相關的類:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
首先,先創建一個類,併爲隊列起一個名:
public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ... } }
而後咱們建立一個connection:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
connection是一個抽象的sockect鏈接,在鏈接時,咱們須要注意一下登陸認證信息。在這裏咱們鏈接到本地機器,因此填寫上「localhost」。若是咱們想要鏈接到其餘機器上,只須要填寫上其餘機器的IP地址便可。
接下來,咱們建立一個channel,大多數的API均可以經過這個channel來獲取。
若是要發送消息,咱們必須先聲明一個隊列,而後咱們才能夠把消息發送到這個隊列裏去:
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
聲明一個隊列後,僅當隊列不存在時,隊列纔會被建立。另外,消息的內容是一個字節數組,因此咱們須要先對消息進行編碼。最後,咱們須要關閉鏈接。
channel.close(); connection.close();
以上就是Send.java類的全部代碼
發送端運行不起來
若是你是第一次使用RabbitMQ,並且沒有看到打印的「Sent」信息,你可能會奇怪是哪裏出問題了。這裏有多是RabbitMQ在啓動時沒有足夠的磁盤空間(默認狀況下最少須要1Gb的空間),因此它會拒絕接收任何消息。經過查看日誌文件能夠肯定是不是由這個緣由所形成的。若是有必要,咱們也能夠經過修改配置文件中的disk_free_limit來下降大小的限制。
上面的代碼是發送端的。咱們的接收端是從RabbitMQ那獲取消息,因此並非像發送端那樣發送一個簡單的消息,而是須要一直監聽獲取消息,並打印輸出。
對於接收端而言,它須要導入以下的類:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;
DefaultConsumer是一個實現了Consumer接口的類,咱們可使用它來獲取發送過來的消息。
跟Send同樣,咱們先建立一個connection和channel,並聲明一個接收消息的隊列。注意一點,這裏的隊列名要和Send的相匹配。
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ... } }
在這裏咱們一樣聲明瞭一個隊列,由於咱們可能在啓動發送端以前就先啓動了接收端,在咱們開始接收消息以前,咱們要先確認隊列是否存在。
咱們告訴RabbitMQ經過這個隊列給咱們發送消息,所以RabbitMQ會異步的給咱們推送消息,咱們須要提供一個回調對象用來緩存消息,直到咱們準備使用它。這就是DefaultConsumer所作的事情。
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer);
以上就是Recv.java類的全部代碼
編譯並運行以上程序,你會看到接收端將會收到並打印出來自發送端的「Hello World!」消息。
你能夠在類路徑下編譯這些文件:
$ javac -cp rabbitmq-client.jar Send.java Recv.java
爲了運行它們,你須要rabbitma-client.jar和它的依賴文件。在一個終端運行發送者:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send
而後運行接收者:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv
在windows環境中,咱們使用分號代替冒號來分隔classpath上的選項。
Hint
你能夠設置一個環境變量到classpath中:
$ export CP=.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar $ java -cp $CP Send在windows上:> set CP=.;commons-io-1.2.jar;commons-cli-1.1.jar;rabbitmq-client.jar > java -cp %CP% Send
在實際使用中,RabbitMQ並非簡單的經過指定一個IP就能夠進行鏈接的,它還須要指定端口號、用戶名和密碼。就像是這樣:
factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest");
當沒有明確指明登陸認證信息的時候,就會使用默認值來進行登陸,以上都是默認的認證信息。
另外,咱們也能夠經過設置URI來進行鏈接,URI的格式以下:
amqp://username:password@host:port factory.setUri("amqp://guest:guest@localhost:5672");
在第一部分中,咱們的程序結構很是簡單。在接下來,咱們將會建立一個工做隊列,向多個消費者分發任務。
在以前的程序中,咱們發送了一個包含「Hello World!」的消息,如今咱們發送一些字符串用來表示複雜的任務。在這裏,由於咱們沒有真正意義上的任務,好比調整圖片的大小或者渲染pdf文件,因此咱們經過Thread.sleep()函數來模擬程序的處理時間。咱們用字符串中的句號來表明它的複雜度,每個句號表示要花費一秒的工做時間。例如,一個「Hello...」的字符串就表示它須要3秒鐘的處理時間。
咱們會稍微修改咱們以前程序中的Send.java代碼,使其容許發送包含任意內容的消息。這個程序將會定時向隊列中發送任務,咱們把它命名爲NewTask.java:
String message = getMessage(argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
一些幫助咱們從命令行的參數中獲取消息的函數:
private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }
咱們的Recv.java程序也須要做出一些修改:它須要從隊列中獲取消息,並統計消息中有多少個「.」,而後sleep相應的時間。咱們把它取名爲Worker.java:
final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); } } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }
編譯以上程序:
$ javac -cp rabbitmq-client.jar NewTask.java Worker.java
使用任務隊列的優點之一是任務的並行處理。若是如今積壓了一大堆任務,咱們僅須要添加更多的消費者便可,這是很容易擴展的。
首先,咱們嘗試同時運行兩個消費者實例,他們都會從隊列裏去獲取消息,以下。
你須要打開三個控制檯,其中兩個用來運行消費者程序,分別稱爲C1和C2
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C
第三個控制檯用來運行生產者程序。一旦你開啓了消費者程序,就能夠啓動生產者了:
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask First message. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Second message.. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Third message... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fourth message.... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fifth message.....
讓咱們看看消費者獲得了什麼消息:
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
默認狀況下,RabbitMQ會一直把消息發送給下一個消費者,平均狀況下,每一個消費者得到的消息是同樣多的。這種分發方式叫作輪詢,你能夠嘗試運行更多的消費者。
處理一個任務可能須要花費數秒時間,你可能會好奇若是一個消費者執行了一個長任務,而且在完成處理前就掛了的狀況下會發送什麼事。就拿咱們當前的代碼來講,一旦RabbitMQ將消息傳遞給消費者,消息就會從內存中刪除。在這種狀況下,若是你強行關閉正在運行的消費者,那麼它正在處理的消息就會丟失。那些發送給這個消費者但尚未開始處理的消息也會一併丟失。
可是,咱們並不想丟失任何消息。實際上,若是一個消費者掛了,咱們更但願將消息傳遞給其餘的消費者消費。
爲了保證消息不會丟失,RabbitMQ支持消息確認(acknowledgments)機制。Ack是由消費者發送的,用來告訴RabbitMQ這個消息已經接收到並處理完成,能夠從內存中刪除它了。
若是一個消費者沒有發送ack就掛了,RabbitMQ會認爲這個消息沒有處理完成並將消息從新入隊。若是這時有其餘消費者在運行,它將會把這個消息發送給另外一個消費者。經過這種方式,即便消費者掛了,也能夠確保消息不會丟失。
在這裏不存在超時的概念,只有在消費者掛了的狀況下,RabbitMQ纔會重發消息,不然就會一直等待消息的處理,即便須要花費很長的時間來處理。
消息確認機制默認狀況下是開啓的。如今咱們要關閉它,改成手動提交確認信號。當處理完一個任務後,咱們將手動提交。
channel.basicQos(1); // accept only one unack-ed message at a time (see below) final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
經過這段代碼,咱們能夠保證即便你經過Ctrl+C來關閉一個消費者,也不會丟失任何消息。那些沒有發送確認信號的消息將會很快被重發。
忘記發送確認信號
忘記寫basicAck是一個很廣泛的錯誤,可是這會產生嚴重的後果。當你的客戶端退出後,全部的消息將會被從新發送,RabbitMQ會愈來愈佔內存,由於它不會刪除那些沒有發送確認信號的消息。
想要調試這種類型的錯誤,你可使用rabbitmqctl打印出messages_unacknowledged屬性:
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done.
咱們已經學習瞭如何保證在消費者掛了的狀況下,消息也不會丟失。可是若是RabbitMQ掛了,咱們的消息仍然會丟失。
當RabbitMQ退出或崩潰時,它將會丟失全部隊列和消息,除非你讓它不要這麼作。經過兩個方面能夠保證消息不會丟失:對消息和隊列進行持久化處理。
首先,咱們須要保證RabbitMQ不會丟失咱們的隊列。爲此,咱們須要將一個隊列聲明爲一個持久化的隊列:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
雖然這個代碼是正確的,可是它卻不會執行,由於咱們已經定義了一個非持久化的hello隊列。RabbitMQ不容許使用不一樣的參數去從新定義一個已經存在的隊列,若是你強行這樣作,它將會返回一個錯誤。有一個快速的解決方案,就是從新聲明一個不一樣名字的隊列,好比task_queue:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
須要在生產者和消費者的代碼中對queuqDeclare進行修改。
至此,咱們能夠保證即便RabbitMQ重啓,task_queue隊列也不會丟失。如今,咱們須要對消息進行持久化,經過設置MessageProperties(實現了BasicProperties)的值爲PERSISTENT_TEXT_PLAIN便可。
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
注意:
消息的持久化並不能徹底保證消息不會丟失,雖然它會告訴RabbitMQ把消息保存在硬盤上,可是從接收消息到保存消息之間,仍是須要必定的時間的。一樣,RabbitMQ沒有對每一個消息作fsync(2)——消息僅僅存在於緩存中,並無真正的寫入硬盤。因此這個持久化並非健壯的,可是對於簡單的工做隊列來講已經徹底足夠了。若是你須要更強大的持久化的話,你能夠考慮使用publisher confirms機制。
你可能會注意到,分發的過程並非咱們所但願的那樣。例如在某一狀況下有兩個消費者,RabbitMQ默認將序號爲奇數的消息發送給第一個消費者,序號爲偶數的消息發送給另外一個消費者,當序號爲奇數的消息都是一些很耗時的任務,而序號爲偶數的消息都是一些小任務,那麼就會形成第一個消費者一直處於忙碌狀態,而另外一個消費者處理完畢後會處於空等待的狀態。RabbitMQ並不會在乎這些事情,它只會一直均勻的分發消息。
這種狀況的發生是由於RabbitMQ只負責分發消息,它並不關心一個消費者返回了多少個確認信號(即處理了多少條消息),它只是盲目的將第n條消息往第n個消費者發送罷了。
爲了解決這個問題,咱們可使用basicQos方法,設置prefetchCount = 1。它將會告訴RabbitMQ不要同時給一個消費者超過一個任務,換句話說,就是在一個消費者發送確認信息以前,不要再給它發送新消息了。取而代之的是將消息發送給下一個空閒的消費者。
int prefetchCount = 1; channel.basicQos(prefetchCount);
注意隊列的大小
若是全部的消費者都處於繁忙中,你的隊列很快就會被佔滿。你須要注意這件事,而且添加更多的消費者或者經過其餘策略來解決他。
最終的NewTask.java以下:
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... }
Worker.java的代碼:
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(TASK_QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done" ); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } //... }
在以前的示例中,咱們建立了一個工做隊列,在這以前,咱們都假設每個消息都準確的發送到一個消費者那裏。在接下來,咱們將作一些徹底不一樣的事情——將一個消息發送到多個消費者,這種模式被稱爲發佈和訂閱模式。
爲了說明這個模式,咱們將會建立一個簡單的日誌系統,它由兩部分程序組成,一個是發送日誌消息,另外一個是接收並打印日誌消息。
在咱們的日誌系統中,每個運行的接收程序都會獲取一個消息的拷貝副本。經過這種方式,咱們可讓一個消費者把日誌記錄到硬盤中,同時可讓另外一個消費者把日誌輸出到屏幕上。
在本質上,發送日誌消息至關於廣播到全部接收者。
在以前,咱們都是直接從一個隊列中發送或獲取消息。如今是時候介紹RabbitMQ中的full messaging模型了。
讓咱們快速複習下在先前部分中咱們所學的知識:
在RabbitMQ的消息模型中,核心思想是生產者不直接將消息發送給隊列。實際上,生產者甚至徹底不知道消息會被髮送到哪一個隊列中。
相反,生產者只能將消息發送到交換機上。交換機是一個很是簡單的東西,它一邊從生產者那裏接收消息,一邊向隊列推送消息。交換機必須確切的知道它想要把消息發送給哪些接收者。例如是否發送到一個特定的隊列中?仍是發送給不少個隊列?或者是把消息丟棄等等。這些東西都經過交換機的類型來規定。
交換機的類型包括:direct, topic, headers和fanout。咱們先關注fanout。讓咱們先建立一個這種類型的交換機,並稱呼它爲logs:
channel.exchangeDeclare("logs", "fanout");
fanout類型的交換機很是簡單,經過它的名字,你可能已經猜出它的用處了。它將會以廣播的方式把接收到的消息發送到它所知道的隊列中去,這個正是咱們所須要的。
交換機列表
經過使用rabbitmqctl命令,咱們能夠列出服務器中的全部交換機:
$ sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic logs fanout ...done.在這個列表裏,有一些以amq.*開頭的交換機和默認(沒有名字)的交換機。這些都是默認建立的,並且你不太可能會使用到它們。
匿名交換機
在以前的部分中,咱們對交換機毫無概念,但仍然能將消息發送到隊列中,那是由於咱們使用了默認的交換機,經過使用空串("")來標識它。
回想一下以前是如何發送消息的:
channel.basicPublish("", "hello", null, message.getBytes());
這裏的第一個參數就是交換機的名字。空串表示它是默認交換機或者是匿名交換機:若是routingKey存在的話,消息將經過routingKey路由到特定的隊列中去。
如今,咱們定義本身命名的交換機:
channel.basicPublish( "logs", "", null, message.getBytes());
你可能會想起咱們以前使用的隊列是有特定的名稱的(hello和task_queue)。對於咱們來講,爲一個隊列命名是很是有必要的,咱們須要指定多個消費者到同一個隊列獲取消息。當你想在多個生產者和消費者之間共用一個隊列,那麼爲隊列命名就很是重要了。
可是對於咱們目前來講還不須要。咱們想要監聽全部日誌消息,而不是它的一個子集。一樣,咱們只會對最新的消息感興趣,而不是舊消息。爲了解決這個問題,咱們須要作兩件事。
第一,不管何時鏈接RabbitMQ,咱們都須要一個新的空隊列。爲了這樣作,咱們會建立一個隨機的名字,或者直接讓服務器給咱們一個隨機的名字。
第二,一旦咱們與消費者斷開,隊列應該被自動刪除。
在Java中,當咱們使用無參的queueDeclare(),它將會爲咱們建立一個非持久化的、專用的、自動刪除、帶隨機名字的隊列:
String queueName = channel.queueDeclare().getQueue();
在這裏,隊列名queueName的值是一個隨機產生的字符串,例如amq.gen-JzTY20BRgKO-HjmUJj0wLg。
咱們已經建立一個fanout類型的交換機和一個隊列。如今咱們須要告訴交換機發送消息到咱們的隊列裏。在這裏,把交換機和隊列之間的關係稱爲綁定。
channel.queueBind(queueName, "logs", "");
從如今開始,logs交換機將會把消息發送到咱們的隊列中去。
綁定列表
你能夠經過使用rabbitmqctl list_bindings來列出全部綁定。
生產者程序負責發送日誌消息,看起來跟之前的代碼沒有什麼區別。最重要的改變是,如今咱們把消息發送到咱們的logs交換機中,而不是匿名交換機。在發送時咱們須要指定一個routingKey,可是在使用fanout類型的交換機時,routingKey的值會被忽略。這是咱們的EmitLog.java程序:
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... }
正如你所看到的,在創建一個connection以後,咱們聲明瞭一個交換機。這個步驟是必須的,由於RabbitMQ禁止把消息發送到一個不存在的交換機。
若是交換機上沒有綁定任何隊列的話,消息將會被丟棄。可是這個對咱們來講是能夠接受的,若是沒有消費者監聽,咱們能夠安全的丟棄消息。
ReceiveLogs.java的代碼以下:
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
像以前那樣編譯程序,我已經編譯完成了。
$ javac -cp .:rabbitmq-client.jar EmitLog.java ReceiveLogs.java
若是你想要把日誌保存到文件中,你只須要打開控制檯並輸入:
$ java -cp .:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log
若是你但願在屏幕中看到日誌,新建一個終端並運行:
$ java -cp .:rabbitmq-client.jar ReceiveLogs
固然,爲了發送日誌,輸入:
$ java -cp .:rabbitmq-client.jar EmitLog
使用rabbitmqctl list_bindings,你能夠驗證這代碼確實建立和綁定了咱們想要的隊列。隨着兩個ReceiveLogs.java程序的運行,你能夠看到以下的信息:
$ sudo rabbitmqctl list_bindings Listing bindings ... logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done.
這個結果簡單明瞭:數據從logs交換機發送到服務器安排的兩個隊列中去,那正是咱們所指望的。
在上一節,咱們創建了一個簡單的日誌系統,咱們能夠將日誌信息廣播到多個接收者那裏去。
接下來,咱們將要實現只訂閱部分消息。例如,咱們只將error類型的日誌信息保存到硬盤中去,固然依舊在控制檯中打印出全部的日誌信息。
在以前的例子中,咱們已經創建了綁定,你能夠回顧一下代碼:
channel.queueBind(queueName, EXCHANGE_NAME, "");
一個交換機跟一個隊列之間的綁定,能夠理解爲:就是這個隊列只對這個交換機發送過來的消息感興趣。
綁定還有另外的一個routingKey參數,爲了不跟basic_publish的參數搞混,咱們把它稱爲binding key。如下是咱們經過一個key建立的一個綁定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
這個binding key的意義取決於交換機的類型,咱們以前使用的fanout類型的交換機是會忽略它的值的。
咱們以前的日誌系統是將全部消息廣播到全部消費者那裏去。咱們想要擴展它,讓它能根據消息的嚴重性來進行過濾。例如,咱們只把error類型的錯誤日誌記錄到硬盤中去,而不是將硬盤空間浪費在warning或info類型的日誌上。
咱們使用的fanout類型的交換機沒有太多的靈活性,它只能無腦的進行廣播。
因此咱們將會使用direct類型的交換機來替換它。direct類型交換機的路由規則很簡單——它只將消息發送到那些binding key和routing key徹底匹配的隊列中去。
爲了說明清楚,咱們看一下下面的結構圖:
在這裏,咱們能夠看到direct類型的交換機X綁定了兩個隊列。第一個隊列經過一個orange關鍵字來綁定,第二個隊列綁定了兩個關鍵字,分別爲black和green。
經過這樣的綁定,當交換器接收到routing key爲orange的消息的時候,就會發送到隊列Q1中去;當接收到 routing key爲black或green的消息的時候,就會發送到隊列Q2中去;其餘類型的消息則會被丟棄。
用一個binding key綁定多個隊列是合法的,在上面的例子中,咱們還可使用black將X和Q1綁定起來。在這裏,direct類型的交換機就跟fanout類型的交換機同樣,會把消息發送到全部的匹配的隊列中去。一個routing key爲black的消息將會被髮送到Q1和Q2中去。
咱們將會在咱們的日誌系統中使用這種模型,使用direct類型的交換機來替換fanout類型的交換機。咱們能夠把routing key做爲日誌消息的嚴重級別,經過這種方式,接收程序就能夠選擇對應的級別進行接收。總之,讓咱們先看一下如何發送日誌:
首先,咱們須要建立一個交換機:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
而後咱們準備發送一條消息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
在這裏爲了簡化,咱們先約定日誌的級別分爲info、warning和error三種。
接收消息的部分跟以前的程序沒有什麼不一樣。惟一的區別就是咱們要爲隊列建立它感興趣的binding key。
String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }
EmitLogDirect.java的代碼:
public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); channel.close(); connection.close(); } //.. }
ReceiveLogsDirect.java的代碼
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
像以前那樣編譯它們。爲了方便,當咱們運行實例時,咱們使用一個$CP的環境變量來表示類的路徑。
若是你只想保存warning和error類型的日誌到文件裏去,你能夠打開控制檯,並輸入:
$ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
若是你想要在屏幕中看到全部的日誌信息,你能夠打開一個新的終端並作以下操做:
$ java -cp $CP ReceiveLogsDirect info warning error [*] Waiting for logs. To exit press CTRL+C
另外,例如想要發送error類型的日誌信息,能夠輸入:
$ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode." [x] Sent 'error':'Run. Run. Or it will explode.'
在上一部分中,咱們改進了咱們的日誌系統,使用direct類型的交換機代替fanout類型的交換機,獲得了一個具備選擇性的日誌接收系統。
雖然使用了direct類型的交換機改進了咱們的系統,可是它依然不完善,它不能根據多個條件進行路由。
在咱們的日誌系統中,咱們可能不只想要獲得各類級別的日誌,還想要獲得日誌的發送源。你可能從syslog unix tool瞭解過這個概念,它基於severity(info/warn/crit...) 和facility (auth/cron/kern...)來路由日誌信息。
那將會給咱們帶來更多的靈活性,由於咱們可能只須要監聽那些來自cron的critical日誌,而不是全部的kern日誌。
爲了在咱們的日誌系統中實現此功能,咱們須要使用更加複雜的類型——topic類型的交換機
發送到topic類型的交換機的消息不能是隨意的routing key,它必須是一個經過點分隔的字符串列表。字符串的內容什麼均可以,可是咱們通常會給它一個有意義的名字。例如一些合法的routing key就像這樣:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。Routing key中你想要寫多少個單詞均可以,它的長度上線是255個字節。
Binding key也是同樣。Topic類型的交換機的邏輯跟direct類型的很是類似,它也是把消息發送到與routing key匹配的隊列中去。然而,它的binding key卻有兩種很是特殊的用法:
下面這個例子很容易理解:
在這個例子中,咱們發送的消息都是用來描述動物的。每一個消息的routing key都包含三個單詞,第一個單詞用來描述動物的速度,第二個用來表示顏色,第三個用來表示物種:"
咱們爲交換機和隊列建立三個綁定關係,隊列Q1經過".orange."來綁定,隊列Q2經過 "..rabbit"和"lazy.#"來綁定。
這些綁定關係能夠歸納爲:
一個routing key爲"quick.orange.rabbit"的消息將會被髮送到全部隊列。帶有"lazy.orange.elephant"的消息也會被髮往全部隊列中去。而對於"quick.orange.fox"來講,它只會被髮送到Q1隊列中,"lazy.brown.fox"只會被髮送到Q2隊列中。而"lazy.pink.rabbit"只會被髮送到Q2隊列中一次,儘管它匹配了Q2隊列的兩個routing key。對於"quick.brown.fox"來講,沒有routing key和它匹配,因此它會被丟棄。
若是咱們不遵照以前的約定,發送一條只帶一個單詞或四個單詞的消息,例如"orange"或"quick.orange.male.rabbit",會發生什麼事呢?好吧,這些消息都會由於沒有找到匹配routing key而被丟棄。
另外一方面,對於"lazy.orange.male.rabbit"來講,雖然它包含四個單詞,可是它卻和最後一個routing key("lazy.#")匹配,因此它會被髮送到Q2隊列中去。
Topic類型的交換機
Topic類型的交換機的功能十分強大,能夠媲美其餘類型的交換機。
當一個隊列的binding key爲一個「#」,它將接收全部消息,就像fanout類型的交換機同樣。
當binding key中沒有包含「*」或「#」字符的話,topic類型的交換機就至關於direct類型的交換機。
咱們將在咱們的日誌系統中使用topic類型的交換機。咱們假設日誌消息的routing key經過兩個單詞組成,就像"
代碼幾乎跟先前的同樣。
EmitLogTopic.java的代碼以下:
public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); connection.close(); } //... }
ReceiveLogsTopic.java的代碼以下:
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
接收全部日誌類型:
$ java -cp $CP ReceiveLogsTopic "#"
從"kern"那接收日誌信息:
$ java -cp $CP ReceiveLogsTopic "kern.*"
或者只接收「critical」類型的日誌:
$ java -cp $CP ReceiveLogsTopic "*.critical"
你還能夠創建多個綁定關係:
$ java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"
或者直接發送routing key爲"kern.critical"類型的日誌:
$ java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"
<p><b>一些有趣的問題:</b></p> - "*"會匹配到routing key爲空的消息嗎? - "#.*"會匹配到". ."嗎?或者只匹配一個單詞嗎? - "a.*.#」和"a.#"有什麼不一樣?