RabbitMQ(三)

   這節咱們主要講RabbitMQ的分發,由生產者發佈一個任務,多個接受者去獲取任務來進行加工處理。java

下面介紹任務分發

  

  一個隊列的優勢就是很容易處理並行化的工做能力,可是若是咱們積累了大量的工做,咱們就須要更多的工做者來處理,這裏就要採用分佈機制了。數據庫

  咱們建立一個新的生產者NewTask服務器

package com.mq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {
    private static final String TASK_QUEUE_NAME = "task_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        
        //分發消息
        for(int i=0; i<10; i++){
            String message = "Hello RabbitMQ" + i;
            //MessageProperties.PERSISTENT_TEXT_PLAIN設置持久化
            channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println("NewTask send :" + message);
        }
        channel.close();
        connection.close();
    }
}

  而後咱們建立一個Work1和Work2去接收任務,其中Work1和Work2代碼同樣。負載均衡

package com.mq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP;

public class Work1 {
    private static final String TASK_QUEUE_NAME = "task_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        System.out.println("Worker1  Waiting for messages");
        
        //每次從隊列獲取的數量
        channel.basicQos(1);
        
        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException{
                  String message = new String(body, "UTF-8");
                  System.out.println("Worker1 Received:" + message);
                  try{
                      Thread.sleep(1000); // 暫停1秒鐘
                  }catch(Exception e){
                      //此操做中的全部異常將被丟棄
                      channel.abort();
                  }finally{
                      System.out.println("Worker1 Done");
                      //消息處理完成後手工確認,即下面的basicConsume第二個參數要爲false。
                      channel.basicAck(envelope.getDeliveryTag(), false);
                  }
            }
        };
        boolean autoAck = false;
        //消息完成確認
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
    }
}
package com.mq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Work2 {
    private static final String TASK_QUEUE_NAME = "task_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        System.out.println("Worker2  Waiting for messages");
        
        //每次從隊列獲取的數量
        channel.basicQos(1);
        
        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException{
                  String message = new String(body, "UTF-8");
                  System.out.println("Worker2 Received:" + message);
                  try{
                      Thread.sleep(1000); // 暫停1秒鐘
                  }catch(Exception e){
                      //此操做中的全部異常將被丟棄
                      channel.abort();
                  }finally{
                      System.out.println("Worker2 Done");
                      //消息處理完成後手工確認,即下面的basicConsume第二個參數要爲false。
                      channel.basicAck(envelope.getDeliveryTag(), false);
                  }
            }
        };
        boolean autoAck = false;
        //消息完成確認
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
    }
}

運行結果:ide

  NewTask:spa

  

  Work1:3d

  

  Work2:code

  

*注1(重要):

  channel.basicQos(1);保證一次只分發一個,默認狀況下,RabbitMQ將隊列消息隨機分配給每一個消費者,這時可能出現消息調度不均衡的問題。例若有兩臺消費者服務器,一個服務器可能很是繁忙,消息不斷,另一個卻很清閒,沒有什麼負載。RabbitMQ不會主動介入這些狀況,仍是會隨機調度消息到每臺服務器。這是由於RabbitMQ此時只負責調度消息,不會根據ACK的反饋機制來分析那臺服務器返回反饋慢,是否是處理不過來啊?blog

  爲了解決這個問題,咱們可使用channel.basicQos(1)這個設置。這個設置告訴RabbitMQ,不要一次將多個消息發送給一個消費者。這樣作的好處是隻有當消費者處理完成當前消息並反饋後,纔會收到另一條消息或任務。這樣就避免了負載不均衡的事情了。rabbitmq

*注2(重要):

  autoAck是否自動回覆,若是爲true的話,每次生產者只要發送信息就會從內存中刪除,那麼若是消費者程序異常退出,那麼就沒法獲取數據,咱們固然是不但願出現這樣的狀況,因此纔去手動回覆,每當消費者收到並處理信息而後在通知生成者。最後從隊列中刪除這條信息。若是消費者異常退出,若是還有其餘消費者,那麼就會把隊列中的消息發送給其餘消費者,若是沒有,等消費者啓動時候再次發送。所以一旦將autoAck關閉以後,必定要記得處理完消息以後,向服務器確認消息。不然服務器將會一直轉發該消息。若是忘記了向服務器確認處理完消息的話,隊列中的信息會一直存在。好比將NewTask運行一次,Work1中註釋掉channel.basicAck(envelope.getDeliveryTag(), false); 而且一次獲取10條信息,那麼每運行一次Work1都會收到隊列task_queue的消息。

  所以忘記確認 忘記經過basicAck返回確認信息是常見的錯誤。這個錯誤很是嚴重,將致使消費者客戶端退出或者關閉後,消息會被退回RabbitMQ服務器,這會使RabbitMQ服務器內存爆滿,並且RabbitMQ也不會主動刪除這些被退回的消息。 

*注3(重要):

  可是除了設置ack手動回覆之外,仍是不夠的,若是RabbitMQ-Server忽然掛掉了,那麼尚未被讀取的消息仍是會丟失 ,因此咱們可讓消息持久化。 只須要在定義Queue時,設置持久化消息就能夠了,方法以下:

boolean durable = true;
channel.queueDeclare(channelName, durable, false, false, null);

  這樣設置以後,服務器收到消息後就會馬上將消息寫入到硬盤,就能夠防止忽然服務器掛掉,而引發的數據丟失了。可是服務器若是剛收到消息,還沒來得及寫入到硬盤,就掛掉了,這樣仍是沒法避免消息的丟失。
由於RabbitMQ不作實時當即的磁盤同步(fsync)。這種狀況下,對於持久化要求不是特別高的簡單任務隊列來講,仍是能夠知足的。若是須要更強大的保證,那麼你能夠考慮使用生產者確認反饋機制。 
注意,服務器重啓後這條隊列頗有可能會報錯,由於已經定義的隊列,再次定義是無效的,這就是冪次原理。RabbitMQ不容許從新定義一個已有的隊列信息,也就是說不容許修改已經存在的隊列的參數。若是你非要這樣作,只會返回異常。
所以一個快速有效的方法就是從新聲明另外一個名稱的隊列,不過這須要修改生產者和消費者的代碼,因此,在開發時,最好是將隊列名稱放到配置文件中。這時,即便RabbitMQ服務器重啓,新隊列中的消息也不會丟失。

總結:

  1:不要一次將多個消息給一個消費者,採用負載均衡。

  2:channel.basicConsume()裏的ack參數。當從隊列當中取出一個消息的時候,RabbitMQ須要應用顯式地回饋說已經獲取到了該消息。若是一段時間內不回饋,RabbitMQ會將該消息從新分配給另一個綁定在該隊列上的消費者。另外一種狀況是消費者斷開鏈接,可是獲取到的消息沒有回饋,則RabbitMQ一樣從新分配。若是將該參數設置爲true,則RabbimtMQ會爲下一個AMQP請求添加一個ack屬性,告訴AMQP服務器須要等待回饋。否者,不要等待回饋。大多數時候,你也許想要本身手工發送回饋,例如,須要在回饋以前將消息存入數據庫。回饋一般是經過調用 channel.basicAck(deliveryTag, multiple)方法。

  3:持久化隊列。並最好將隊列名稱寫在配置文件中。

相關文章
相關標籤/搜索