轉載博客:rabbitmq

原文出處:http://www.cnblogs.com/sam-uncle/p/9202933.htmlhtml

假設有這一些比較耗時的任務,按照上一次的那種方式,咱們要一直等前面的耗時任務完成了以後才能接着處理後面耗時的任務,那要等多久才能處理完?別擔憂,咱們今天的主角--工做隊列就能夠解決該問題。咱們將圍繞下面這個索引展開:java

  1. 什麼是工做隊列
  2. 代碼準備
  3. 循環分發
  4. 消息確認
  5. 公平分發
  6. 消息持久化

廢話少說,直接展開。ide

1、什麼是工做隊列fetch

工做隊列--用來將耗時的任務分發給多個消費者(工做者),主要解決這樣的問題:處理資源密集型任務,而且還要等他完成。有了工做隊列,咱們就能夠將具體的工做放到後面去作,將工做封裝爲一個消息,發送到隊列中,一個工做進程就能夠取出消息並完成工做。若是啓動了多個工做進程,那麼工做就能夠在多個進程間共享。spa

2、代碼準備3d

  1. 生產者類:NewTask.java
    複製代碼
    public class NewTask {
        //隊列名稱
        public static final String QUEUE_NAME = "TASK_QUEUE";
        //隊列是否須要持久化
        public static final boolean DURABLE = false;
        
        //須要發送的消息列表
        public static final String[] msgs = {"task 1", "task 2", "task 3", "task 4", "task 5", "task 6"};
        
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
    
            try {
                // 1.connection & channel
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.queue
                channel.queueDeclare(QUEUE_NAME, DURABLE, false, false, null);
    
                // 3.publish msg
                for (int i = 0; i < msgs.length; i++) {
                    channel.basicPublish("", QUEUE_NAME, null, msgs[i].getBytes());
                    System.out.println("** new task ****:" + msgs[i]);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
    
            }
    
        }
    }
    複製代碼

     

  2. 消費者類:Work.java
    複製代碼
    public class Work {
    
        public static void main(String[] args) {
            System.out.println("*** Work ***");
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            try {
                //1.connection & channel
                final Channel channel = factory.newConnection().createChannel();
                
                //2.queue
                channel.queueDeclare(NewTask.QUEUE_NAME, NewTask.DURABLE, false, false, null);
    
                //3. consumer instance
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
                        String msg = new String(body, "UTF-8");
                        //deal task
                        doWork(msg);
    
                    }
                };
                
                //4.do consumer
                boolean autoAck = true;
                channel.basicConsume(NewTask.QUEUE_NAME, autoAck, consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    
        private static void doWork(String msg) {
            try {
                System.out.println("**** deal task begin :" + msg);
                
                //僞裝task比較耗時,經過sleep()來模擬須要消耗的時間
                if ("sleep".equals(msg)) {
                    Thread.sleep(1000 * 60);
                } else {
                    Thread.sleep(1000);
                }
    
                System.out.println("**** deal task finish :" + msg);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    複製代碼

     

  3. 再來一個消費者類:Work2.java,代碼同Work.java如出一轍。

 

3、循環分發code

咱們先啓動Work和Work2,而後啓動NewTask,運行結果以下:htm

NewTask運行結果:blog

Work運行結果:索引

Work2運行結果:

 

咱們發現,消息生產者發送了6條消息,消費者work和work2分別分到了3個消息,並且是循環輪流分發到的,這種分發的方式就是循環分發。

4、消息確認

假如咱們在發送的消息裏面添加「sleep"

//須要發送的消息列表
    public static final String[] msgs = {"sleep", "task 1", "task 2", "task 3", "task 4", "task 5", "task 6"};

根據代碼中的實現,這個sleep要耗時1分鐘,萬一在這1分鐘以內,工做進程崩潰了或者被kill了,會發生什麼狀況呢?根據上面的代碼:

//4.do consumer
            boolean autoAck = true;
            channel.basicConsume(NewTask.QUEUE_NAME, autoAck, consumer);

自動確認爲true,每次RabbitMQ向消費者發送消息以後,會自動發確認消息(我工做你放心,不會有問題),這個時候消息會當即從內存中刪除。若是工做者掛了,那將會丟失它正在處理和未處理的全部工做,並且這些工做還不能再交由其餘工做者處理,這種丟失屬於客戶端丟失。

咱們來驗證下,和剛纔的步驟同樣執行程序:

複製代碼
1.NewTask的控制檯打印結果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制檯打印結果:
**** deal task begin :sleep

3.Work2的控制檯打印結果:
**** deal task begin :task 1
**** deal task finish :task 1
**** deal task begin :task 3
**** deal task finish :task 3
**** deal task begin :task 5
**** deal task finish :task 5
複製代碼

根據上面的內容,消息生產者發送了7條消息, work2消費了一、三、5 三條,那剩下的sleep、二、四、6 這四條消息確定是work來處理,只是sleep耗時一分鐘 ,時間差後面的還沒來得及處理,這個時候咱們kill掉work,去看下RabbitMQ 管理頁面,沒有未處理的消息,消息隨着work被kill也跟着丟失了。

是否是很可怕?

爲了應對這種狀況,RabbitMQ支持消息確認。消費者處理完消息以後,會發送一個確認消息告訴RabbitMQ,消息處理完了,你能夠刪掉它了。

代碼修改(Work.java和Work2.java同步修改):1.將自動確認改成false,2.消息處理以後再經過channel.basicAck進行消息確認

 修改完後,執行程序:

複製代碼
1.NewTask的控制檯打印結果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制檯打印結果:
**** deal task begin :sleep

3.Work2的控制檯打印結果:
**** deal task begin :task 1
**** deal task finish :task 1
**** deal task begin :task 3
**** deal task finish :task 3
**** deal task begin :task 5
**** deal task finish :task 5
複製代碼

而後kill掉work,去看RabbitMQ管理頁面,會發現有4條未確認:

再去看下work2的控制檯,work2將work未處理完和將來得及處理的消息都給處理了:

等work2處理完後,你再去看RabbitMQ管理頁面,會發現頁面的消息數值也都變成0 了。

 

5、公平分發

按照上面那種循環分發的方式,每一個消費者會分到相同數量的任務,這樣會有一個問題:假若有一些task很是耗時,以前的任務尚未完成,後面又來了那麼多任務,來不及處理,那咋辦? 有的消費者忙的不可開交,有的消費者卻很快處理完事情而後無所事事浪費資源,那咋整?答案就是:公平分發。 怎麼實現呢?

 發生上述問題的緣由就是RabbitMQ收到消息後就當即分發出去,而沒有確認各個工做者未返回確認的消息數量。所以咱們可使用basicQos方法,並將參數prefetchCount設爲1,告訴RabbitMQ 我每次值處理一條消息,你要等我處理完了再分給我下一個。這樣RabbitMQ就不會輪流分發了,而是尋找空閒的工做者進行分發。

代碼修改(work和Work2同步修改):

執行代碼:

複製代碼
1.NewTask的控制檯打印結果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制檯打印結果:
**** deal task begin :sleep
**** deal task finish :sleep

3.Work2的控制檯打印結果:
**** deal task begin :task 1
**** deal task finish :task 1
**** deal task begin :task 2
**** deal task finish :task 2
**** deal task begin :task 3
**** deal task finish :task 3
**** deal task begin :task 4
**** deal task finish :task 4
**** deal task begin :task 5
**** deal task finish :task 5
**** deal task begin :task 6
**** deal task finish :task 6
複製代碼

Work只處理了sleep,Work2處理了一、二、三、四、五、6 這個六條消息。

6、消息持久化

上面說到消息確認的時候,提到了工做者被kill的狀況。那若是RabbitMQ被stop掉了呢?咱們來看下:

此次只啓動Work和NewTask,不啓動Work2,全部消息都交給Work來處理,控制檯打印信息:

複製代碼
1.NewTask的控制檯打印結果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制檯打印結果:
**** deal task begin :sleep
複製代碼

在work處理sleep的過程當中,咱們停掉RabbitMQ服務

而後從新start服務並執行rabbitmq-plugins enable rabbitmq_management命令,而後查看管理頁面:

你會發現,全部消息都將被清空了。這種丟失屬於服務端丟失

所以須要將消息進行持久化來應對這種狀況。

持久化須要作兩件事情:

  1. 隊列持久化,在聲明隊列的時候,將第二個參數設爲true

          

     另外,因爲RabbitMQ不容許從新定義已經存在的隊列,不然就會報錯(上一篇博客中已經提到過了),所以咱們將此次的隊列名改下:

     

  2. 消息持久化,在發送消息的時候,將第三個參數設爲2

而後運行代碼,在work處理sleep的時候將服務停掉,並從新啓動且執行rabbitmq-plugins enable rabbitmq_management命令,而後查看管理頁面:

 

一共7條消息,未確認的1條(sleep)和ready的6條(一、二、三、四、五、6)。消息被保存了下來。

 從新啓動Work,全部消息被消費:

相關文章
相關標籤/搜索