一:介紹java
1.模式ide
2.使用場景spa
一個生產者,多個消費者code
每個消費者都有本身的隊列blog
生產者沒有直接把消息發送給隊列,而是發送到了交換機rabbitmq
每個隊列都要綁定到交換機隊列
能夠實現一個消息被多個消費者消費。utf-8
二:程序get
1.生產者it
1 package com.mq.PubSubFanout; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 public class FanoutSend { 8 private static final String EXCHANGE_NAME="text_exchange_fanout"; 9 public static void main(String[] args) throws Exception { 10 //獲取一個鏈接 11 Connection connection= ConnectionUtil.getConnection(); 12 //從鏈接中獲取一個通道 13 Channel channel=connection.createChannel(); 14 //建立交換機 15 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); 16 17 //消息 18 String msg="hello pubsub"; 19 20 //發送 21 channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes()); 22 23 System.out.println("send msg:"+msg); 24 //關閉鏈接 25 channel.close(); 26 connection.close(); 27 } 28 }
2.消費者一
1 package com.mq.PubSubFanout; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class FanoutReceive1 { 9 private static final String EXCHANGE_NAME="text_exchange_fanout"; 10 private static final String QUENE_NAME="test_fanout_queue_email"; 11 public static void main(String[] args)throws Exception{ 12 //獲取一個鏈接 13 Connection connection = ConnectionUtil.getConnection(); 14 //建立通道 15 final Channel channel = connection.createChannel(); 16 //建立隊列聲明 17 channel.queueDeclare(QUENE_NAME,false,false,false,null); 18 19 //綁定交換機 20 channel.queueBind(QUENE_NAME,EXCHANGE_NAME,""); 21 22 //一次只能發送一個消息 23 channel.basicQos(1); 24 25 //建立消費者 26 DefaultConsumer consumer=new DefaultConsumer(channel){ 27 @Override 28 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 29 String msg=new String(body,"utf-8"); 30 System.out.println("[1]receive msg:"+msg); 31 try { 32 Thread.sleep(200); 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 }finally { 36 System.out.println("done"); 37 //手動應答 38 channel.basicAck(envelope.getDeliveryTag(),false); 39 } 40 } 41 }; 42 //監聽隊列,不是自動應答 43 boolean autoAck=false; 44 channel.basicConsume(QUENE_NAME,autoAck,consumer); 45 } 46 }
3.消費者二
1 package com.mq.PubSubFanout; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class FanoutReceive2 { 9 private static final String EXCHANGE_NAME="text_exchange_fanout"; 10 private static final String QUENE_NAME="test_fanout_queue_ems"; 11 public static void main(String[] args)throws Exception{ 12 //獲取一個鏈接 13 Connection connection = ConnectionUtil.getConnection(); 14 //建立通道 15 final Channel channel = connection.createChannel(); 16 //建立隊列聲明 17 channel.queueDeclare(QUENE_NAME,false,false,false,null); 18 19 //綁定交換機 20 channel.queueBind(QUENE_NAME,EXCHANGE_NAME,""); 21 22 //一次只能發送一個消息 23 channel.basicQos(1); 24 25 //建立消費者 26 DefaultConsumer consumer=new DefaultConsumer(channel){ 27 @Override 28 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 29 String msg=new String(body,"utf-8"); 30 System.out.println("[2]receive msg:"+msg); 31 try { 32 Thread.sleep(200); 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 }finally { 36 System.out.println("done"); 37 //手動應答 38 channel.basicAck(envelope.getDeliveryTag(),false); 39 } 40 } 41 }; 42 //監聽隊列,不是自動應答 43 boolean autoAck=false; 44 channel.basicConsume(QUENE_NAME,autoAck,consumer); 45 } 46 }
4.效果
send:
receive1:
receive2:
5.運行注意點
若是之間運行receive類,會發現報錯,由於沒有交換機。
因此,能夠先運行send類,雖然交換機不能存儲發送的消息,可是能夠建立交換機。
而後,就能夠按照原來的方式。
先啓動兩個消費者進行監聽,而後啓動生產者。
現象:就是消費者都獲取到了生產者生產的消息。