在上篇介紹瞭如何簡單的發送一個消息隊列以後,咱們本篇來看下RabbitMQ的另一種模式,工做隊列。ide
咱們上篇文章說的是,一個生產者生產了消息被一個消費者消費了,以下圖學習
上面這種簡單的消息隊列確實能夠處理咱們的任務,可是當咱們隊列中的任務過多,處理每條任務有須要很長的耗時,那麼使用一個消費者處理消息顯然不不夠的,因此咱們能夠增長消費者,來共享消息隊列中的消息,進行任務處理。spa
也就是以下圖code
雖然上圖我只花了一個生產者A,那麼同理,能有多個消費者,那也能多個生產者。隊列
public class Send { public static final String QUEUE_NAME = "test_word_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { // 獲取鏈接 Connection connection = MQConnectUtil.getConnection(); // 建立通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 模擬發送20條消息 for (int i = 0; i < 20; i++) { String msg = "消息:" + i; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); Thread.sleep(i * 20); System.out.println(msg); } channel.close(); connection.close(); } }
public class Consumer1 { public static final String QUEUE_NAME = "test_word_queue"; public static void main(String[] args) throws Exception { // 獲取鏈接 Connection connection = MQConnectUtil.getConnection(); // 建立頻道 Channel channel = connection.createChannel(); // 隊列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { @SneakyThrows @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { String msg = new String(body, StandardCharsets.UTF_8); System.out.println("消費者[A]-內容:" + msg); Thread.sleep(2 * 1000); } }; // 監聽隊列 channel.basicConsume(QUEUE_NAME, true, consumer); } }
public class Consumer2 { public static final String QUEUE_NAME = "test_word_queue"; public static void main(String[] args) throws Exception { // 獲取鏈接 Connection connection = MQConnectUtil.getConnection(); // 建立頻道 Channel channel = connection.createChannel(); // 隊列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { @SneakyThrows @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { String msg = new String(body, StandardCharsets.UTF_8); System.out.println("消費者[B]-內容:" + msg); Thread.sleep(1000); } }; // 監聽隊列 channel.basicConsume(QUEUE_NAME, true, consumer); } }
咱們來看下消費者A和消費者B的消費狀況rem
有沒有發現什麼問題,我總過模擬發送了20條消息,細心的同窗能夠發現,消費者A和消費者B消費了一樣多的消息,都消費了10天,可是我在消費者A和消費者B中,什麼sleep不通的時長,按道理說消費者B要比消費者A處理消息的速度塊,處理的消息更多,那麼爲何會產生這樣的緣由?get
默認狀況下,RabbitMQ會將每一個消息依次發送給下一個消費者,每一個消費者收到的消息數量實際上是同樣的,咱們把這種分發消息的方式稱爲輪訓分發模式。消息隊列
本篇咱們就簡單介紹這麼多內容,有心學習的童鞋必定要敲敲代碼,看不必定能看會,只有本身敲一遍,纔能有所理解。it
更多內容請關注:io