RabbitMQ消息隊列:任務分發機制

        咱們解決了從發送端(Producer)向接收端(Consumer)發送「Hello World」的問題。在實際的應用場景中,這是遠遠不夠的。從本篇文章開始,咱們將結合更加實際的應用場景來說解更多的高級用法。java

        有時Consumer須要大量的運算時,RabbitMQ Server須要必定的分發機制來balance每一個Consumer的load。試想一下,對於web application來講,在一個不少的HTTP request裏是沒有時間來處理複雜的運算的,只能經過後臺的多個工做線程來完成,隊列中的任務將會被工做線程共享執行,這樣的概念在web應用這很是有用。接下來咱們分佈講解。 
web

   應用場景就是RabbitMQ Server會將queue的Message分發給不一樣的Consumer以處理計算密集型的任務:
安全

 

1. 準備

       實際應用Consumer可能作的是計算密集型的工做,那就不能簡單的字符串了。在現實應用中,Consumer有可能作的是一個圖片的resize,或者是pdf文件的渲染或者內容提取。可是做爲Demo,仍是用字符串模擬吧:經過字符串中的.的數量來決定計算的複雜度,每一個.都會消耗1s,即sleep(1)。併發

發送端:app

package com.zhy.rabbitMq._02_workqueue;學習

import java.io.IOException;測試

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;fetch

public class NewTask{
     //隊列名稱
     private final static String QUEUE_NAME = "queue2";spa

     public static void main(String[] args) throws IOException{
          //建立鏈接和頻道
          ConnectionFactory factory = new ConnectionFactory();
          factory.setHost("localhost");
          Connection connection = factory.newConnection();
          Channel channel = connection.createChannel();
          //聲明隊列
          channel.queueDeclare(QUEUE_NAME, false, false, false, null);
          //發送10條消息,依次在消息後面附加1-10個點
          for (int i = 0; i < 10; i++){
               String dots = "";
               for (int j = 0; j <= i; j++){
                    dots += ".";
           }
           String message = "helloworld" + dots+dots.length();
           channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
           System.out.println(" [x] Sent '" + message + "'");
      }
      //關閉頻道和資源
      channel.close();
      connection.close();.net

     }
}

接收端:

package com.zhy.rabbitMq._02_workqueue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Work{
     //隊列名稱
     private final static String QUEUE_NAME = "workqueue";

     public static void main(String[] argv) throws java.io.IOException,
       java.lang.InterruptedException{
      //區分不一樣工做進程的輸出
      int hashCode = Work.class.hashCode();
      //建立鏈接和頻道
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      //聲明隊列
      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      System.out.println(hashCode
        + " [*] Waiting for messages. To exit press CTRL+C");
 
      QueueingConsumer consumer = new QueueingConsumer(channel);
      // 指定消費隊列
      channel.basicConsume(QUEUE_NAME, true, consumer);
      while (true){
       QueueingConsumer.Delivery delivery = consumer.nextDelivery();
       String message = new String(delivery.getBody());

       System.out.println(hashCode + " [x] Received '" + message + "'");
       doWork(message);
       System.out.println(hashCode + " [x] Done");

      }

 }

 /**
  * 每一個點耗時1s
  * @param task
  * @throws InterruptedException
  */
 private static void doWork(String task) throws InterruptedException{
      for (char ch : task.toCharArray()){
           if (ch == '.')
            Thread.sleep(1000);
      }
     }
}

2. Round-robin dispatching 循環分發

        RabbitMQ的分發機制很是適合擴展,並且它是專門爲併發程序設計的。若是如今load加劇,那麼只須要建立更多的Consumer來進行任務處理便可。固然了,對於負載還要加大怎麼辦?我沒有遇到過這種狀況,那就能夠建立多個virtual Host,細化不一樣的通訊類別了。

     首先開啓個Consumer,即運行兩個工做者。

[x] Sent 'helloworld.1'
[x] Sent 'helloworld..2'
[x] Sent 'helloworld...3'
[x] Sent 'helloworld....4'

工做者1:
605645 [*] Waiting for messages. To exit press CTRL+C
605645 [x] Received 'helloworld.1'
605645 [x] Done
605645 [x] Received 'helloworld....3'
605645 [x] Done

工做者2:
18019860 [*] Waiting for messages. To exit press CTRL+C
18019860 [x] Received 'helloworld..2'
18019860 [x] Done
18019860 [x] Received 'helloworld.....4'
18019860 [x] Done

能夠看到,默認的,RabbitMQ會一個一個的發送信息給下一個消費者(consumer),而無論每一個任務的時長等等,且是一次性分配,並不是一個一個分配。平均的每一個消費者將會得到相等數量的消息。這樣分發消息的方式叫作round-robin。中種分發還有問題,接着瞭解吧!

 

3. Message acknowledgment 消息確認

每一個Consumer可能須要一段時間才能處理完收到的數據。你可能擔憂一個工做者(Consumer)在這個過程當中出錯了,異常退出了,而數據尚未處理完成,那麼很是不幸這段數據就丟失了。由於咱們採用no-ack的方式進行確認,一旦RabbitMQ交付了一個消息給消費者,會立刻從內存中移除這條信息。也就是說,每次Consumer接到數據後,而不論是否處理完成,RabbitMQ Server會當即把這個Message標記爲完成,而後從queue中刪除了。

     上述問題是很是嚴重的,可是若是一個Consumer異常退出了,它處理的數據可以被另外的Consumer處理,這樣數據在這種狀況下就不會丟失了(注意是這種狀況下)。

      爲了保證數據不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。爲了保證數據能被正確處理而不只僅是被Consumer收到,那麼咱們不能採用no-ack。而應該是在處理完數據後發送ack。

    在處理數據後發送的ack,就是告訴RabbitMQ數據已經被接收,處理完成,RabbitMQ能夠去安全的刪除它了。

    若是Consumer退出了可是沒有發送ack,那麼RabbitMQ就會把這個Message發送到下一個Consumer。這樣就保證了在Consumer異常退出的狀況下數據也不會丟失。

    這裏並無用到超時機制。RabbitMQ僅僅經過Consumer的鏈接中斷來確認該Message並無被正確處理。也就是說,RabbitMQ給了Consumer足夠長的時間來作數據處理。

    默認狀況下,消息確認是打開的(enabled)。上面代碼中咱們經過autoAsk= True 關閉了ack。從新修改一下callback,以在消息處理完成後發送ack:

boolean ack = false ; //打開應答機制  

channel.basicConsume(QUEUE_NAME, ack, consumer);  

//另外須要在每次處理完成一個消息後,手動發送一次應答。  

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  

這樣即便你經過Ctr-C中斷了Consumer,那麼Message也不會丟失了,它會被分發到下一個Consumer。

      若是忘記了ack,那麼後果很嚴重。當Consumer退出時,Message會從新分發。而後RabbitMQ會佔用愈來愈多的內存,因爲RabbitMQ會長時間運行,所以這個「內存泄漏」是致命的。去調試這種錯誤,能夠經過一下命令打印

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

4. Message durability消息持久化

        咱們學習了消費者被殺死,Message也不會丟失。可是若是RabbitMQ Server退出呢?軟件都有bug,即便RabbitMQ Server是完美毫無bug的,它仍是有可能退出的:被其它軟件影響,或者系統重啓了,系統panic了。。。

    爲了保證在RabbitMQ退出或者crash了數據仍沒有丟失,須要將queue和Message都要持久化。

queue的持久化須要在聲明時指定durable=True:

第一, 咱們須要確認RabbitMQ永遠不會丟失咱們的隊列。爲了這樣,咱們須要聲明它爲持久化的。
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
注:RabbitMQ不容許使用不一樣的參數從新定義一個隊列,因此已經存在的隊列,咱們沒法修改其屬性。
第二, 咱們須要標識咱們的信息爲持久化的。經過設置MessageProperties(implements BasicProperties)值爲PERSISTENT_TEXT_PLAIN。
channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
如今你能夠執行一個發送消息的程序,而後關閉服務,再從新啓動服務,運行消費者程序作下實驗。

5. Fair dispatch 公平分發

    你可能也注意到了,分發機制不是那麼優雅。默認狀態下,RabbitMQ將第n個Message分發給第n個Consumer。固然n是取餘後的。它無論Consumer是否還有unacked Message,只是按照這個默認機制進行分發。

   那麼若是有個Consumer工做比較重,那麼就會致使有的Consumer基本沒事可作,有的Consumer倒是毫無休息的機會。那麼,RabbitMQ是如何處理這種問題呢?

  經過 basic.qos 方法設置prefetch_count=1 。這樣RabbitMQ就會使得每一個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。 設置方法以下:

int prefetchCount = 1;  

channel.basicQos(prefetchCount);  

測試:改變發送消息的代碼,將消息末尾點數改成3-2個,而後首先開啓兩個工做者,接着發送消息:
[x] Sent 'helloworld...3'
[x] Sent 'helloworld..2'

工做者1:
18019860 [*] Waiting for messages. To exit press CTRL+C
18019860 [x] Received 'helloworld...3'
18019860 [x] Done

工做者2:
31054905 [*] Waiting for messages. To exit press CTRL+C
31054905 [x] Received 'helloworld..2'
31054905 [x] Done

能夠看出此時並無按照以前的Round-robin機制進行轉發消息,而是當消費者不忙時進行轉發。且這種模式下支持動態增長消費者,由於消息並無發送出去,動態增長了消費者立刻投入工做。而默認的轉發機制會形成,即便動態增長了消費者,此時的消息已經分配完畢,沒法當即加入工做,即便有不少未完成的任務。

6. 最終版本

發送端:

package com.zhy.rabbitMq._02_workqueue;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask
{
 // 隊列名稱
 private final static String QUEUE_NAME = "workqueue_persistence";

 public static void main(String[] args) throws IOException
 {
  // 建立鏈接和頻道
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  // 聲明隊列
  boolean durable = true;// 一、設置隊列持久化
  channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
  // 發送10條消息,依次在消息後面附加1-10個點
  for (int i = 5; i > 0; i--)
  {
   String dots = "";
   for (int j = 0; j <= i; j++)
   {
    dots += ".";
   }
   String message = "helloworld" + dots + dots.length();
   // MessageProperties 二、設置消息持久化
   channel.basicPublish("", QUEUE_NAME,
     MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
   System.out.println(" [x] Sent '" + message + "'");
  }
  // 關閉頻道和資源
  channel.close();
  connection.close();

 }

}

接收端:

package com.zhy.rabbitMq._02_workqueue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Work
{
 // 隊列名稱
 private final static String QUEUE_NAME = "workqueue_persistence";

 public static void main(String[] argv) throws java.io.IOException,
   java.lang.InterruptedException
 {
  // 區分不一樣工做進程的輸出
  int hashCode = Work.class.hashCode();
  // 建立鏈接和頻道
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  // 聲明隊列
  boolean durable = true;
  channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
  System.out.println(hashCode
    + " [*] Waiting for messages. To exit press CTRL+C");
  //設置最大服務轉發消息數量
  int prefetchCount = 1;
  channel.basicQos(prefetchCount);
  QueueingConsumer consumer = new QueueingConsumer(channel);
  // 指定消費隊列
  boolean ack = false; // 打開應答機制
  channel.basicConsume(QUEUE_NAME, ack, consumer);
  while (true)
  {
   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
   String message = new String(delivery.getBody());

   System.out.println(hashCode + " [x] Received '" + message + "'");
   doWork(message);
   System.out.println(hashCode + " [x] Done");
   //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
   channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

  }

 }

 /**
  * 每一個點耗時1s
  *
  * @param task
  * @throws InterruptedException  */ private static void doWork(String task) throws InterruptedException {  for (char ch : task.toCharArray())  {   if (ch == '.')    Thread.sleep(1000);  } }}

相關文章
相關標籤/搜索