工做隊列模式爲一個生產者對應多個消費者,可是隻有一個消費者得到消息,即一個隊列被多個消費者監聽,但一條消息只能被其中的一個消費者獲取java
代碼以下:git
生產者代碼:github
public class WorkSend { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //獲取鏈接 Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest"); //聲明通道 Channel channel = connection.createChannel(); //建立隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicPublish("", "hello", null, "First.".getBytes()); channel.basicPublish("", "hello", null, "Secode..".getBytes()); channel.basicPublish("", "hello", null, "Third....".getBytes()); channel.basicPublish("", "hello", null, "Fourth....".getBytes()); channel.basicPublish("", "hello", null, "Fifth.....".getBytes()); //六、關閉通道和鏈接 channel.close(); connection.close(); } }
消費者代碼spring
public class WorkRecv { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //獲取鏈接 Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest"); //聲明通道 Channel channel = connection.createChannel(); //聲明隊列隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(" [x] Done"); //channel.basicAck(); } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); // DeliverCallback deliverCallback = new DeliverCallback(){ // @Override // public void handle(String consumerTag, Delivery delivery) throws IOException { // String message = new String(delivery.getBody(), "UTF-8"); // System.out.println(" [x] Received '" + message + "'"); // } // }; // // channel.basicConsume(QUEUE_NAME, true, deliverCallback, new CancelCallback(){ // @Override // public void handle(String consumerTag) throws IOException { // // } // }); } private static void doWork(String task) throws InterruptedException { for (char ch : task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } }
1 生產者將消息交個交換機 2 交換機交給綁定的隊列 3 隊列由多個消費者同時監聽,只有其中一個可以獲取這一條消息,造成了資源的爭搶,誰的資源空閒大,爭搶到的可能越大;app
Round-robin dispatching(輪詢分發)ide
使用任務隊列的優勢之一是可以輕鬆並行工做。若是咱們這裏積壓了不少的消息,咱們能夠增長work的並行度,這樣就能夠輕鬆擴展。fetch
默認狀況下,RabbitMQ將每一個消息依次發送給下一個消費者。平均而言,每一個消費者都會收到相同數量的消息。這種分發消息的方式稱爲輪詢。ui
Message acknowledgment(消息確認)this
使用咱們當前的代碼,RabbitMQ一旦向消費者發送了一條消息,便當即將其標記爲刪除。在這種狀況下,若是咱們kill掉一個worker,咱們將丟失正在處理的消息。而且還將丟失全部發送給該特定worker但還沒有處理的消息。 爲了確保消息永不丟失,RabbitMQ支持 消息確認。消費者發送回一個確認(告知),告知RabbitMQ特定的消息已被接收並處理,而且RabbitMQ能夠自由刪除它。 若是消費者死了(其通道已關閉,鏈接已關閉或TCP鏈接丟失)而沒有發送確認,RabbitMQ將瞭解消息未徹底處理,並將從新排隊。若是同時有其餘消費者在線,它將很快將其從新分發給另外一個消費者。這樣,您能夠確保即便worker偶爾死亡也不會丟失任何消息。 沒有任何消息超時;消費者死亡時,RabbitMQ將從新傳遞消息。即便處理一條消息花費很是很是長的時間也不要緊。 默認狀況下,手動消息確認處於打開狀態。咱們經過autoAck = false 標誌顯式關閉了它們。在消息完成投遞的時候,手動確認消息投遞成功。spa
消費者代碼修改以下:
public class ManWorkRecv { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //獲取鏈接 Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest"); //聲明通道 Channel channel = connection.createChannel(); //聲明隊列隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(" [x] Done"); //手動確認消息已經成功投遞 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; //設置消息手動確認 channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } private static void doWork(String task) throws InterruptedException { for (char ch : task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } }
上述代碼能夠確保咱們在kill掉一個消費者的狀況下,消息不會丟失。
Message durability(消息持久化)
前面已經講了即便一個消費者退出,消息不會丟失,可是若是RabbitMQ服務退出時,隊列和消息仍然會丟失,這是由於 默認隊列和消息都是放在內存中的 。爲保證消息和隊列不丟失,須要把隊列和消息設置爲持久化 確保RabbitMQ永遠不會丟失咱們的隊列。聲明持久化代碼以下:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
因爲RabbitMQ已經存在一個hello隊列,而且RabbitMQ不支持對已經存在的隊列進行參數修改,因此須要咱們刪除以前建立的隊列或者從新建立一個隊列。
其次,咱們須要保證消息的持久化,消息持久化設置以下:
channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "Fifth.....".getBytes());
這樣,咱們就能夠保證RabbitMQ服務宕機的狀況下,消息和隊列都不會丟失。
Fair dispatching(公平分發)
RabbitMQ在進行消息分發的時候,能夠設置一次分發給某一個消費者多少條消息,如在消費者端設置prefetchCount=1;以下代碼
int prefetchCount = 1 ; channel.basicQos(prefetchCount);
該設置表示,RabbitMQ一次分發給消費者一條消息,在消費者處理並確認上一條消息以前,不會再給這個消費者發送一條新消息,而會將其分發給其餘的消費者
SpringBoot實現:
@SpringBootApplication @EnableScheduling public class RabbitAmqpTutorialsApplication { public static void main(String[] args) throws Exception { SpringApplication.run(RabbitAmqpTutorialsApplication.class, args); } }
@Configuration public class Tut2Config { @Bean public Queue hello() { return new Queue("hello"); } /** * 這裏是啓動兩個消費者 */ private static class ReceiverConfig{ @Bean public Tut2Receiver receiver1(){ return new Tut2Receiver(1); } @Bean public Tut2Receiver receiver2(){ return new Tut2Receiver(2); } } @Bean public Tut2Sender sender() { return new Tut2Sender(); } }
//監聽隊列 @RabbitListener(queues = "hello") public class Tut2Receiver { private final int instance; public Tut2Receiver(int i) { this.instance = i; } @RabbitHandler public void receive(String in) throws InterruptedException { StopWatch watch = new StopWatch(); watch.start(); System.out.println("instance " + this.instance + " [x] Received '" + in + "'"); doWork(in); watch.stop(); System.out.println("instance " + this.instance + " [x] Done in " + watch.getTotalTimeSeconds() + "s"); } private void doWork(String in) throws InterruptedException { for (char ch : in.toCharArray()) { if (ch == '.') { Thread.sleep(1000); } } } }
public class Tut2Sender { @Autowired private RabbitTemplate template; //隊列 @Autowired private Queue queue; AtomicInteger dots = new AtomicInteger(0); AtomicInteger count = new AtomicInteger(0); /** * 定時向隊列hello發送消息 */ @Scheduled(fixedDelay = 1000, initialDelay = 500) public void send() { StringBuilder builder = new StringBuilder("Hello"); if (dots.incrementAndGet() == 3) { dots.set(1); } for (int i = 0; i < dots.get(); i++) { builder.append('.'); } builder.append(count.incrementAndGet()); String message = builder.toString(); //向隊列中發送消息 template.convertAndSend(queue.getName(), message); System.out.println(" [x] Sent '" + message + "'"); } }