摘要 在工做者之間分配任務java
RabbitMQ RabbitMQ入門python
目錄[-]git
在這第一指南部分,咱們寫了經過同一命名的隊列發送和接受消息。在這一部分,咱們將會建立一個工做隊列,在多個工做者之間使用分佈式時間任務。
工做隊列(亦稱:任務隊列)背後主要的思想是避免當即處理一個資源密集型任務而且不得不一直等待完成。相反咱們能夠計劃着讓任務後續執行。咱們將任務封裝成消息,發送到隊列中。一個工做者進程在後臺運行,獲取任務並最終執行任務。當你運行多個工做者,全部的任務將會被他們所共享。
在web應用程序中,這個理念是特別有用的,你沒法在一個短暫的http請求中處理一個複雜的任務。
在先前的指南中,咱們發送了一個包含"Hello World!「消息。如今咱們將要發送一些字符串,用來表明複雜的任務。咱們沒有一個真實的任務,好比圖片的調整大小或者pdf文件渲染,因此咱們經過Thread.sleep()
函數,假裝一個咱們是很忙景象。咱們將會把字符串中點的數量來表明它的複雜度;每個點將要花費一秒的工做。例如,一個使用Hello...
描述的假任務會發送三秒。
咱們將會輕量的修改咱們之前例子中Send.java
代碼,使其容許任意的消息能夠經過命令行發出。這個程序將要計劃安排任務到咱們的工做隊列中,因此咱們把它命名爲NewTask.java
:
String message = getMessage(argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
一些幫助從命令行中獲取消息參數:
private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }
咱們老的Recv.java
程序也要求作些改變:它須要將消息體中每一個點假裝成一秒。從隊列中獲取消息,運行任務,因此咱們將它稱之爲Worker.java
:
while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done"); }
咱們假裝的任務中冒充執行時間:
private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }
在第一部分指南中那樣編譯它們(jar 文件須要再工做路徑上):
$ javac -cp rabbitmq-client.jar NewTask.java Worker.java
使用任務隊列的優點之一是咱們是容易並行處理。若是咱們正在處理一些堆積的文件的話,咱們僅僅須要增長更多的工做者,經過這種方式咱們是容易擴展的。
首先,讓咱們試着在同一時間運行兩個工做者實例。他們都會從隊列中獲取消息,可是具體怎樣作呢?讓咱們一塊兒來看一看。
你須要三個打開的控制平臺,其中兩個用來運行工做者程序。他們將會是咱們的兩個消費者-C1和C2。
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C
在這第三個控制平臺咱們用來發布新的任務。一旦你啓動消費者,你就能夠發佈消息了:
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask First message. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Second message.. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Third message... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fourth message.... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fifth message.....
讓咱們看看什麼被投遞到咱們工做者那裏:
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
默認狀況想,RabbitMQ將會把每個消息發送給下一個消費者。平均下來每一個消費者獲取的消息數量是相同的。這種分佈式消息方式被稱爲輪詢。試試三個或更多的工做者。
處理一個任務可能花費數秒時間,你可能會好奇若是一個消費者開始一個長任務,而且在處理完成部分的狀況下就死掉了會發生什麼狀況。就咱們當前的代碼來講,一旦RabbitMQ將消息傳遞給消費者,它就會當即將消息從內存中刪除。在這種狀況下,若是你殺掉一個正在處理的工做者你會丟失它正在處理的消息。咱們也同時失去了已經分配給這個工做者而且沒有開始處理的消息。
可是咱們不想丟失任何任務,若是一個工做者死掉,咱們指望將任務傳遞給另外一個工做者。
爲了保證每個消息不會丟失,RabbitMQ支持消息確認機制。一個消息確認是由消費者發出,告訴RabbitMQ這個消息已經被接受,處理完成,RabbitMQ 能夠刪除它了。
若是一個消費者沒有發送確認信號,RabbitMQ將會認定這個消息沒有徹底處理成功,將會把它傳遞給另外一個消費者。經過這種方式,即便工做者有時會死掉,你依舊能夠保證沒有消息會被丟失。
這裏不存在消息超時;RabbitMQ只會在工做者鏈接死掉才從新傳遞這個消息。即便一個消息要被處理很長很長時間,也不是問題。
消息確認機制默認狀況下是開着的。在先前的例子中咱們是明確的將這個功能關閉no_ack=True
。是時候移除這個標識了,一旦咱們完成一個任務,工做者須要發送一個確認信號。
QueueingConsumer consumer = new QueueingConsumer(channel); boolean autoAck = false; channel.basicConsume("hello", autoAck, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //... channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }
使用這段代碼,咱們能夠保證即便你將一個正在處理消息的工做者經過CTRL+C
來終止它運行,依舊沒有消息會丟失。稍後,工做者死亡後沒有發送確認的消息會被從新傳遞。
忘掉確認
這是一個廣泛的錯誤,就是忘記確認。這是一個很簡單的錯誤,可是這後果是嚴重的。當你的客戶端退出,消息會從新傳遞(看上去是隨機傳遞的),RabbitMQ會愈來愈佔用內存,由於它不會釋放哪些沒有發送確認的消息。
爲了調試這種類型的錯誤,你可使用
rabbitmqctl
打印出messages_unacknowledged
屬性:$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done.
咱們已經學習瞭如何在肯定消費者是否已經死掉,而且保證任務不被丟失。可是若是RabbitMQ服務器中止,咱們的任務依舊會丟失。
當RabbitMQ退出或者崩潰,它將會忘記這隊列和消息,除非你告訴它不要這樣作。兩個事情須要作來保證消息不會丟失:咱們標記隊列和消息持久化。
首先,咱們須要確保RabbitMQ不會丟失咱們的隊列,爲了這樣作,咱們須要將它聲明爲持久化:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
雖然這命令是正確的,但它不會當即在咱們的程序裏運行。那是由於咱們已經定義了一個不持久化的hello
隊列。RabbitMQ不容許你使用不一樣的參數從新定義一個存在的隊列,若是你試着那樣作它會返回一個錯誤。有個快速的變通方案-讓咱們聲明一個不一樣名字的隊列,好比task_queue
:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
這個queuqDeclare
的改變須要應用在生產者和消費者的代碼中。
在這點上,咱們能夠保證即便RabbitMQ重啓,task_queue
隊列也不會丟失。如今咱們須要標記消息持久化 - 經過設置MessageProperties
(實現了BasicProperties
)的值爲PERSISTENT_TEXT_PLAIN
。
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
注意消息持久化
標記消息持久化不能徹底保證消息不會被丟失,雖然這樣會告訴RabbitMQ保存消息到硬盤上。可是對於RabbitMQ依舊有個短暫的時間窗口對於接收一個消息而且尚未完成保存。一樣,RabbitMQ不能讓每一個消息同步–它可能僅僅保存在緩存中,尚未真正的寫入到硬盤中。這持久化的保證不是健壯的,可是對咱們的簡單的任務隊列來講是足夠了。若是你須要更健壯的持久化保證,你可使用出版者確認。
你可能注意到了,分發過程並無如咱們想的那樣運做。例如,在一個狀況下有兩個工做者,當全部奇數消息是重的和全部偶數是輕的,一個工做者會一直忙碌下去,而另外一個則會幾乎不作什麼事情。好吧,RabbitMQ不會在乎那個事情,它會一直均勻的分發消息。
這種狀況發生由於RabbitMQ僅僅分發消息到隊列中。它不關心有多少消息沒有由發送者發送確認信號。它僅僅盲目的將N個消息發送到N個消費者。
爲了解決這個問題,咱們可使用basicQos
方法,設置prefetchCount=1
。這樣將會告知RabbitMQ不要同時給一個工做者超過一個任務,或者換句話說在一個工做者處理完成,發送確認以前不要給它分發一個新的消息。代替,把消息分發到下一個不繁忙的工做者。
int prefetchCount = 1; channel.basicQos(prefetchCount);
注意隊列大小
若是你的全部工做者是在忙碌,你的隊列就會被填滿。你將會想關注這件事,可能要添加更多的工做者,或者有些其餘策略。
咱們的NewTask.java
最終代碼:
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... }
(NewTask.java source)
咱們的Worker.java
代碼:
java.lang.InterruptedException { import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(TASK_QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done" ); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } //... }
(Worker.java source)
使用消息確認和預讀數量你能夠創建一個工做隊列。持久化選項使得RabbitMQ重啓以後任務依舊存在。
想要了解更多關於通道方法和消息屬性,你能夠瀏覽javadocs online
。
如今咱們能夠移到指南3了,學習怎麼樣將相同的消息傳遞給多個消費者
轉載:http://my.oschina.net/OpenSourceBO/blog/379735