在第一個教程裏面,咱們寫了一個程序從一個有名字的隊列中發送和接收消息,在這裏咱們將要建立一個分發耗時任務給多個worker的任務隊列。
html
任務隊列核心思想就是避免執行一個資源密集型的任務,而程序要等待其執行完畢才能進行下一步的任務。相反地咱們讓任務延遲執行,咱們封裝一個task做爲消息,並把它發送至隊列,在後臺運行的工做進程將彈出的任務,並最終執行做業。當運行多個worker的時候,task將在他們之間共享。java
在前一節中咱們發送一個包含「HelloWorld!」的消息,如今咱們發送字符串表明一個複雜的任務,咱們沒有一個真實的任務,好比格式化圖片大小等等,因此咱們使用Thread.sleep()表明一個執行時間較長的任務,這裏咱們使用幾個點來表明任務的複雜度,每個點表明任務執行一秒的時間,好比hello...就表明執行了3秒。
咱們稍微改變一個上一節中的Send.java,容許從命令行發送任意的消息,程序將從咱們的工做隊列中執行任務,因此命名爲NewTask.java:git
String message = getMessage(argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
如下是幫助從命令行參數獲取消息體的代碼:github
private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }
上一節中的Rece.java也須要少量改變:須要僞造一個根據點來執行多少秒的任務。它將處理傳送過來的消息,而且執行任務,命名爲Worker.java:shell
final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); } } }; channel.basicConsume(TASK_QUEUE_NAME, true, consumer);
模擬執行時間的任務:緩存
private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }
編譯:服務器
$ javac -cp rabbitmq-client.jar NewTask.java Worker.java
使用任務隊列的優勢之一就是很容並行化一個work,若是咱們產生了工做積壓,咱們能夠很簡單的增長worker的數量,來解決問題。
首先,讓咱們嘗試在同一時間運行兩個工人實例。他們都將在隊列中獲得消息,但究竟如何?讓咱們來看看。
您須要三個控制檯打開。兩個將運行輔助程序。這些控制檯將是咱們的兩名消費者 - C1和C2。app
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C
在第三個,咱們將發佈新的任務。一旦你開始運行消費者就能夠發佈幾條消息:ide
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask First message. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Second message.. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Third message... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fourth message.... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fifth message.....
咱們來看看它是怎樣將任務非配給咱們的worker的:fetch
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
默認狀況下,RabbitMQ將發送每個在序列中的消息到下一個消費者,平均而言,每個消費者將得到相同數量的消息,發佈這種消息的方式叫循環調度。
作一個任務須要幾秒鐘,那麼當一個消費者執行任務到一半的時候掛了怎麼辦?在咱們的當前代碼裏面,一旦消息傳送給咱們的消費者,消息就從存儲中刪除了。在這種狀況下,若是Kill了一個worker,咱們不只僅失去了它正在執行的消息任務,並且咱們將失去全部分配給它,可是還沒執行的消任務。
可是咱們不想丟失任何消息,若是一個worker掛掉,咱們將分配這些任務給其餘的消費者。
爲了確保消息不會丟失,RabbitMQ支持消息確認。一個ACK(nowledgement)從消費者發送給RabbitMQ一個消息確認當前消息已被接收和處理,RabbitMQ可自由將其刪除。
若是消費者死亡(其信道被關閉,關閉鏈接,或TCP鏈接丟失),而不發送ACK,RabbitMQ知道消息並無被接收和執行徹底,將從新將它放入隊列。若是同一時間存在其餘在線的消費者,它將迅速從新傳遞消息給另外一個消費者。這樣,你能夠確定沒有消息丟失,即便偶爾的消費者死亡。
目前沒有任何消息超時,當消費者掛掉的時候,RabbitMQ將從新傳遞消息,即便處理一個消息須要很長很長的時間也不要緊。
消息確認默認狀況下開啓。在前面的例子中,咱們明確地經過AUTOACK = true標誌將它們關閉。如今是時候刪除此標誌,一旦咱們與任務完成,將從worker發送適當的確認。
channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } };
使用此代碼,咱們能夠確定,即便你使用CTRL + C,殺死一個worker,什麼都不會丟失。worker死亡後不久,全部未確認的消息會被從新傳遞。
被遺忘的確認
忘記baseACK是一個常見的錯誤,這是個簡單的錯誤,可是後果是很嚴重的。當你的客戶端退出的時候(可能看起來就像是隨機交還)消息將被從新傳遞,但RabbitMQ會消耗的愈來愈多的內存,它將沒法釋聽任何unacked的消息。
爲了調試這種錯誤,你可使用rabbitmqctl打印messages_unacknowledged字段。
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
...done.
咱們已經學會了如何確保即便消費者死亡,任務也不會丟失。可是若是RabbitMQ的服務器中止,咱們的任務仍然會丟失。
當RabbitMQ的退出或崩潰時,除非你告訴它不要忘記的隊列和消息。兩件事情都須要確保,消息纔不會丟失:咱們須要將隊列和消息持久化。
首先,咱們須要確保的RabbitMQ永遠不會失去咱們的隊列。爲了作到這一點,咱們須要把它聲明爲持久:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
雖然以上的代碼自己是對的,可是它不會對咱們的目前設置起做用。這是由於咱們已經定義了一個名爲hello的不持久的隊列,RabbitMQ不容許你使用不一樣的參數從新定義現有隊列,並會返回一個錯誤。可是有一個快速的解決
辦法 - 讓咱們與聲明不一樣名稱的隊列,例如task_queue:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
queueDeclare變化須要被施加到生產者和消費者代碼二者。
在這一點上咱們確保即便RabbitMQ的重啓task_queue隊列也不會被丟失。如今,咱們須要咱們的消息標記爲持久性 - 經過設置MessageProperties(實現BasicProperties)的值PERSISTENT_TEXT_PLAIN。
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
注意消息持久性
將消息標記爲持久性並不能徹底保證信息不會丟失。雖然RabbitMQ消息將保存到磁盤,可是有很短的時間內RabbitMQ已經接受了消息,可是還沒來得及保存它。此外,RabbitMQ沒有爲每條消息作FSYNC(2) - 它可能只是保存到緩存,並無真正寫入磁盤。持久性的保證不強,可是對於咱們簡單的任務隊列仍是綽綽有餘的。若是你須要一個更強有力的保證,那麼你可使用publisher confirms。
你可能已經注意到,調度仍然沒有徹底按照咱們真正想要的工做。舉個例子,好比有兩個消費者的狀況,當奇數的消息很是重,可是偶數的消息很是輕的時候,一個消費者將被累死,而另外一個卻閒着。RabbitMQ殊不知道,仍然在均勻的給每一個消費者發送消息。
這種狀況發生是由於RabbitMQ只負責分發進入到隊列的消息,它不看爲消費者未確認的消息的數量。它只是盲目分派每第n個消息給第n消費者。
爲了杜絕那種狀況,咱們可使用basicQos方法與prefetchCount = 1設置。它告訴RabbitMQ不要把多個消息在同一時間給一個消費者。或者,換句話說,只有消費者處理而且確認前一個消息以後纔會給它分配下一個消息,相反,消息將被非配給下一個不處於忙碌的消費者。
int prefetchCount = 1; channel.basicQos(prefetchCount);
注意隊列大小
若是全部的worker都在忙,你的隊列也填滿了。您將要留意的是,也許添加更多的worker,或者有一些其餘的策略。
NewTask.java
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... }
Worker.java
import com.rabbitmq.client.*; import java.io.IOException; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(TASK_QUEUE_NAME, false, consumer); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
原文地址:RabbitMQ之Work Queues
代碼地址:https://github.com/aheizi/hi-mq
相關:
1.RabbitMQ之HelloWorld
2.RabbitMQ之任務隊列
3.RabbitMQ之發佈訂閱
4.RabbitMQ之路由(Routing)
5.RabbitMQ之主題(Topic)
6.RabbitMQ之遠程過程調用(RPC)