工做隊列:一個生產者對應多個消費者,生產者直接將消息發送到rabbitmq的隊列之中。java
消息分配模式:公平分配ide
隊列會先給每一個消費者輪流發送一條信息,消費者接收到信息並對之處理。若是不反饋處理結果,隊列就不會再發送信息給該消費者,而是發送給其餘已經處理完一條信息並反饋的消費者。code
簡而言之,哪一個消費者的處理信息效率更高,則收到的信息也越多。blog
生產者:rabbitmq
package com.example.demo.queue.workfair; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.example.demo.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String QUEUE_NAME = "work_fair_queue"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { connection = ConnectionUtil.getConnection(); channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 限定:發送一條信息給消費者A,消費者A未反饋處理結果以前,不會再次發送信息給消費者A channel.basicQos(1); String msg = "msg from producer:"; for(int i=0;i<10;i++) { msg = "msg from producer :" + i; System.out.println("send msg : "+msg); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } } catch (Exception e) { e.printStackTrace(); } finally { // 關閉通道 try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } // 關閉鏈接 try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
消費者1:隊列
package com.example.demo.queue.workfair; import java.io.IOException; import com.example.demo.utils.ConnectionUtil; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Consumer01 { // 隊列名稱 private static final String QUEUE_NAME="work_fair_queue"; public static void main(String[] args) { try { // 獲取鏈接 Connection connection = ConnectionUtil.getConnection(); // 建立通道 final Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 限定:發送一條信息給消費者A,消費者A未反饋處理結果以前,不會再次發送信息給消費者A channel.basicQos(1); // 定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("[1]:receive msg:"+msg); try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("[1]:deal msg successful."); } // 反饋消息處理完畢 channel.basicAck(envelope.getDeliveryTag(), false); } }; boolean autoAck = false;// 取消自動反饋 // 接收信息 channel.basicConsume(QUEUE_NAME, autoAck, consumer); } catch (Exception e) { e.printStackTrace(); } finally { // 關閉通道 // try { // channel.close(); // } catch (IOException e) { // e.printStackTrace(); // } catch (TimeoutException e) { // e.printStackTrace(); // } // 關閉鏈接 // try { // connection.close(); // } catch (IOException e) { // e.printStackTrace(); // } } } }
消費者2:get
package com.example.demo.queue.workfair; import java.io.IOException; import com.example.demo.utils.ConnectionUtil; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Consumer02 { // 隊列名稱 private static final String QUEUE_NAME="work_fair_queue"; public static void main(String[] args) { try { // 獲取鏈接 Connection connection = ConnectionUtil.getConnection(); // 建立通道 final Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 限定:發送一條信息給消費者A,消費者A未反饋處理結果以前,不會再次發送信息給消費者A channel.basicQos(1); // 定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("[2]:receive msg:"+msg); try { Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("[2]:deal msg successful."); } // 反饋消息處理完畢 channel.basicAck(envelope.getDeliveryTag(), false); } }; boolean autoAck = false;// 取消自動反饋 // 接收信息 channel.basicConsume(QUEUE_NAME, autoAck, consumer); } catch (Exception e) { e.printStackTrace(); } finally { // 關閉通道 // try { // channel.close(); // } catch (IOException e) { // e.printStackTrace(); // } catch (TimeoutException e) { // e.printStackTrace(); // } // 關閉鏈接 // try { // connection.close(); // } catch (IOException e) { // e.printStackTrace(); // } } } }
執行順序:it
先執行兩個消費者(Consumer01,Consumer02)類的main方法,再執行生產者(Producer)的main方法便可io
順序反過來亦可,可是可能會影響到公平分配的效果。class
生產者終端:
消費者1終端:
消費者2終端:
實現公平分配關鍵點:
// 限定:發送一條信息給消費者A,消費者A未反饋處理結果以前,不會再次發送信息給消費者A channel.basicQos(1);
boolean autoAck = false;// 取消自動反饋 // 接收信息 channel.basicConsume(QUEUE_NAME, autoAck, consumer);
// 反饋消息處理完畢 channel.basicAck(envelope.getDeliveryTag(), false);