這節咱們主要講RabbitMQ的分發,由生產者發佈一個任務,多個接受者去獲取任務來進行加工處理。java
一個隊列的優勢就是很容易處理並行化的工做能力,可是若是咱們積累了大量的工做,咱們就須要更多的工做者來處理,這裏就要採用分佈機制了。數據庫
咱們建立一個新的生產者NewTask服務器
package com.mq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] args) throws IOException, TimeoutException{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); //分發消息 for(int i=0; i<10; i++){ String message = "Hello RabbitMQ" + i; //MessageProperties.PERSISTENT_TEXT_PLAIN設置持久化 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println("NewTask send :" + message); } channel.close(); connection.close(); } }
而後咱們建立一個Work1和Work2去接收任務,其中Work1和Work2代碼同樣。負載均衡
package com.mq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP; public class Work1 { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null); System.out.println("Worker1 Waiting for messages"); //每次從隊列獲取的數量 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{ String message = new String(body, "UTF-8"); System.out.println("Worker1 Received:" + message); try{ Thread.sleep(1000); // 暫停1秒鐘 }catch(Exception e){ //此操做中的全部異常將被丟棄 channel.abort(); }finally{ System.out.println("Worker1 Done"); //消息處理完成後手工確認,即下面的basicConsume第二個參數要爲false。 channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; //消息完成確認 channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); } }
package com.mq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Work2 { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null); System.out.println("Worker2 Waiting for messages"); //每次從隊列獲取的數量 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{ String message = new String(body, "UTF-8"); System.out.println("Worker2 Received:" + message); try{ Thread.sleep(1000); // 暫停1秒鐘 }catch(Exception e){ //此操做中的全部異常將被丟棄 channel.abort(); }finally{ System.out.println("Worker2 Done"); //消息處理完成後手工確認,即下面的basicConsume第二個參數要爲false。 channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; //消息完成確認 channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); } }
運行結果:ide
NewTask:spa
Work1:3d
Work2:code
channel.basicQos(1);保證一次只分發一個,默認狀況下,RabbitMQ將隊列消息隨機分配給每一個消費者,這時可能出現消息調度不均衡的問題。例若有兩臺消費者服務器,一個服務器可能很是繁忙,消息不斷,另一個卻很清閒,沒有什麼負載。RabbitMQ不會主動介入這些狀況,仍是會隨機調度消息到每臺服務器。這是由於RabbitMQ此時只負責調度消息,不會根據ACK的反饋機制來分析那臺服務器返回反饋慢,是否是處理不過來啊?blog
爲了解決這個問題,咱們可使用channel.basicQos(1)這個設置。這個設置告訴RabbitMQ,不要一次將多個消息發送給一個消費者。這樣作的好處是隻有當消費者處理完成當前消息並反饋後,纔會收到另一條消息或任務。這樣就避免了負載不均衡的事情了。rabbitmq
autoAck是否自動回覆,若是爲true的話,每次生產者只要發送信息就會從內存中刪除,那麼若是消費者程序異常退出,那麼就沒法獲取數據,咱們固然是不但願出現這樣的狀況,因此纔去手動回覆,每當消費者收到並處理信息而後在通知生成者。最後從隊列中刪除這條信息。若是消費者異常退出,若是還有其餘消費者,那麼就會把隊列中的消息發送給其餘消費者,若是沒有,等消費者啓動時候再次發送。所以一旦將autoAck關閉以後,必定要記得處理完消息以後,向服務器確認消息。不然服務器將會一直轉發該消息。若是忘記了向服務器確認處理完消息的話,隊列中的信息會一直存在。好比將NewTask運行一次,Work1中註釋掉channel.basicAck(envelope.getDeliveryTag(), false); 而且一次獲取10條信息,那麼每運行一次Work1都會收到隊列task_queue的消息。
所以忘記確認 忘記經過basicAck返回確認信息是常見的錯誤。這個錯誤很是嚴重,將致使消費者客戶端退出或者關閉後,消息會被退回RabbitMQ服務器,這會使RabbitMQ服務器內存爆滿,並且RabbitMQ也不會主動刪除這些被退回的消息。
可是除了設置ack手動回覆之外,仍是不夠的,若是RabbitMQ-Server忽然掛掉了,那麼尚未被讀取的消息仍是會丟失 ,因此咱們可讓消息持久化。 只須要在定義Queue時,設置持久化消息就能夠了,方法以下:
boolean durable = true; channel.queueDeclare(channelName, durable, false, false, null);
這樣設置以後,服務器收到消息後就會馬上將消息寫入到硬盤,就能夠防止忽然服務器掛掉,而引發的數據丟失了。可是服務器若是剛收到消息,還沒來得及寫入到硬盤,就掛掉了,這樣仍是沒法避免消息的丟失。
由於RabbitMQ不作實時當即的磁盤同步(fsync)。這種狀況下,對於持久化要求不是特別高的簡單任務隊列來講,仍是能夠知足的。若是須要更強大的保證,那麼你能夠考慮使用生產者確認反饋機制。
注意,服務器重啓後這條隊列頗有可能會報錯,由於已經定義的隊列,再次定義是無效的,這就是冪次原理。RabbitMQ不容許從新定義一個已有的隊列信息,也就是說不容許修改已經存在的隊列的參數。若是你非要這樣作,只會返回異常。
所以一個快速有效的方法就是從新聲明另外一個名稱的隊列,不過這須要修改生產者和消費者的代碼,因此,在開發時,最好是將隊列名稱放到配置文件中。這時,即便RabbitMQ服務器重啓,新隊列中的消息也不會丟失。
1:不要一次將多個消息給一個消費者,採用負載均衡。
2:channel.basicConsume()裏的ack參數。當從隊列當中取出一個消息的時候,RabbitMQ須要應用顯式地回饋說已經獲取到了該消息。若是一段時間內不回饋,RabbitMQ會將該消息從新分配給另一個綁定在該隊列上的消費者。另外一種狀況是消費者斷開鏈接,可是獲取到的消息沒有回饋,則RabbitMQ一樣從新分配。若是將該參數設置爲true,則RabbimtMQ會爲下一個AMQP請求添加一個ack屬性,告訴AMQP服務器須要等待回饋。否者,不要等待回饋。大多數時候,你也許想要本身手工發送回饋,例如,須要在回饋以前將消息存入數據庫。回饋一般是經過調用 channel.basicAck(deliveryTag, multiple)方法。
3:持久化隊列。並最好將隊列名稱寫在配置文件中。