本文來自:Rabbitmq的五種模式和案例spring
消息生產者p將消息放入隊列後端
消費者監聽隊列,若是隊列中有消息,就消費掉,消息被拿走後,自動從隊列刪除
(隱患,消息可能沒有被消費者正確處理,已經消失了,沒法恢復)數組
應用場景:聊天室 服務器
案例:spring-boot
1>.首先準備依賴工具
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2>.寫一個test類spa
public class SimpleTest { //模擬生產者將消息放入隊列 @Test public void send() throws Exception{ /*1 建立鏈接工廠 * 2 配置共創config * 3 獲取鏈接 * 4獲取信道 * 5 從信道聲明queue * 6 發送消息 * 7 釋放資源 */ ConnectionFactory factory=new ConnectionFactory(); factory.setHost("106.23.34.56"); factory.setPort(5672); factory.setVirtualHost("/tb"); factory.setUsername("admin"); factory.setPassword("123456"); //從工廠獲取鏈接 Connection conn=factory.newConnection(); //從鏈接獲取信道 Channel chan=conn.createChannel(); //利用channel聲明第一個隊列 chan.queueDeclare("simple", false, false, false, null); //queue String類型,表示聲明的queue對列的名字 //durable Boolean類型,表示是否持久化 //exclusive Boolean類型:當前聲明的queue是否專一;true當前鏈接建立的 //任何channle均可以鏈接這個queue,false,新的channel不可以使用 //autoDelete Boolean類型:在最後鏈接使用完成後,是否刪除隊列,false //arguments Map類型,其餘聲明參數 //發送消息 String msg="helloworld,nihaoa"; chan.basicPublish("", "simple", null, msg.getBytes()); //exchange String類型,交換機名稱,簡單模式使用默認交換"" //routingkey String類型,當前的消息綁定的routingkey,簡單模式下,與隊列同名便可 //props BasicProperties類型,消息的屬性字段對象,例如BasicProperties //能夠設置一個deliveryMode的值0 持久化,1 表示不持久化,durable配合使用 //body byte[] :消息字符串的byte數組 } //模擬消費端 @Test public void receive() throws Exception{
ConnectionFactory factory=new ConnectionFactory(); factory.setHost("106.23.34.56"); factory.setPort(5672); factory.setVirtualHost("/tb"); factory.setUsername("admin"); factory.setPassword("123456"); //從工廠獲取鏈接
Connection conn=factory.newConnection();//從鏈接獲取信道Channel chan=conn.createChannel();chan.queueDeclare("simple", false, false, false, null);//建立一個消費者QueueingConsumer consumer= new QueueingConsumer(chan);chan.basicConsume("simple", consumer);//監聽隊列while(true){//獲取下一個delivery,delivery從隊列獲取消息Delivery delivery = consumer.nextDelivery();String msg=new String(delivery.getBody());System.out.println(msg);}}}.net
生產者將消息放入隊列
多個消費者同時監聽同一個隊列,消息如何被消費?
C1,C2共同爭搶當前消息隊列的內容,誰先拿到消息,誰來負責消費
應用場景:紅包;大型項目中的資源調度過程(直接由最空閒的系統爭搶到資源處理任務) code
案例:xml
1>首先寫一個工具類
public class ConnectionUtil { public static Connection getConn(){ try{ ConnectionFactory factory=new ConnectionFactory(); factory.setHost("106.33.44.179"); factory.setPort(5672); factory.setVirtualHost("/tb"); factory.setUsername("admin"); factory.setPassword("123456"); //從工廠獲取鏈接 Connection conn=factory.newConnection(); return conn; }catch(Exception e){ System.out.println(e.getMessage()); return null; } } }
2>寫test類
public class WorkTest { @Test public void send() throws Exception{ //獲取鏈接 Connection conn = ConnectionUtil.getConn(); Channel chan = conn.createChannel(); //聲明隊列 chan.queueDeclare("work", false, false, false, null); for(int i=0;i<100;i++){ String msg="1712,hello:"+i+"message"; chan.basicPublish("", "work", null, msg.getBytes()); System.out.println("第"+i+"條信息已經發送"); } chan.close(); conn.close(); } @Test public void receive1() throws Exception{ //獲取鏈接,獲取信道 Connection conn = ConnectionUtil.getConn(); Channel chan = conn.createChannel(); chan.queueDeclare("work", false, false, false, null); //同一時刻服務器只發送一條消息給同一消費者,消費者空閒,才發送一條 chan.basicQos(1); //定義消費者 QueueingConsumer consumer=new QueueingConsumer(chan); //綁定隊列和消費者的關係 //queue //autoAck:消息被消費後,是否自動確認回執,若是false,不自動須要手動在 //完成消息消費後進行回執確認,channel.ack,channel.nack //callback //chan.basicConsume(queue, autoAck, callback) chan.basicConsume("work", false, consumer); //監聽 while(true){ Delivery delivery=consumer.nextDelivery(); byte[] result = delivery.getBody(); String msg=new String(result); System.out.println("接受到:"+msg); Thread.sleep(50); //返回服務器,回執 chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } @Test public void receive2() throws Exception{ //獲取鏈接,獲取信道 Connection conn = ConnectionUtil.getConn(); Channel chan = conn.createChannel(); chan.queueDeclare("work", false, false, false, null); //同一時刻服務器只發送一條消息給同一消費者,消費者空閒,才發送一條 chan.basicQos(1); //定義消費者 QueueingConsumer consumer=new QueueingConsumer(chan); //綁定隊列和消費者的關係 //queue //autoAck:消息被消費後,是否自動確認回執,若是false,不自動須要手動在 //完成消息消費後進行回執確認,channel.ack,channel.nack //callback //chan.basicConsume(queue, autoAck, callback) chan.basicConsume("work", false, consumer); //監聽 while(true){ Delivery delivery=consumer.nextDelivery(); byte[] result = delivery.getBody(); String msg=new String(result); System.out.println("接受到:"+msg); Thread.sleep(150); //返回服務器,回執 chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
生產者將消息交給交換機
有交換機根據發佈訂閱的模式設定將消息同步到全部的綁定隊列中;
後端的消費者都能拿到消息
應用場景:郵件羣發,羣聊天,廣告
案例:
public class FanoutTest { //交換機,有類型,發佈訂閱:fanout //路由模式:direct //主題模式:topic @Test public void send() throws Exception { //獲取鏈接 Connection conn = ConnectionUtil.getConn(); Channel chan = conn.createChannel(); //聲明交換機 //參數意義,1 交換機名稱,2 類型:fanout,direct,topic chan.exchangeDeclare("fanoutEx", "fanout"); //發送消息 for(int i=0;i<100;i++){ String msg="1712 hello:"+i+"msg"; chan.basicPublish("fanoutEx", "", null, msg.getBytes()); System.out.println("第"+i+"條信息已經發送"); } } @Test public void receiv01() throws Exception{ //獲取鏈接 Connection conn = ConnectionUtil.getConn(); Channel chan = conn.createChannel(); //生命隊列 chan.queueDeclare("fanout01", false, false, false, null); //聲明交換機 chan.exchangeDeclare("fanoutEx", "fanout"); //綁定隊列到交換機 //參數 1 隊列名稱,2 交換機名稱 3 路由key chan.queueBind("fanout01", "fanoutEx", ""); chan.basicQos(1); //定義消費者 QueueingConsumer consumer=new QueueingConsumer(chan); //消費者與隊列綁定 chan.basicConsume("fanout01",false, consumer); while(true){ Delivery delivery= consumer.nextDelivery(); System.out.println("一號消費者接收到"+ new String(delivery.getBody())); chan.basicAck(delivery.getEnvelope(). getDeliveryTag(), false); } } @Test public void receiv02() throws Exception{ //獲取鏈接 Connection conn = ConnectionUtil.getConn(); Channel chan = conn.createChannel(); //生命隊列 chan.queueDeclare("fanout02", false, false, false, null); //聲明交換機 chan.exchangeDeclare("fanoutEx", "fanout"); //綁定隊列到交換機 //參數 1 隊列名稱,2 交換機名稱 3 路由key chan.queueBind("fanout02", "fanoutEx", ""); chan.basicQos(1); //定義消費者 QueueingConsumer consumer=new QueueingConsumer(chan); //消費者與隊列綁定 chan.basicConsume("fanout02",false, consumer); while(true){ Delivery delivery= consumer.nextDelivery(); System.out.println("二號消費者接收到"+new String(delivery.getBody())); chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
生產者發送消息到交換機,同時綁定一個路由Key,交換機根據路由key對下游綁定的隊列進行路
由key的判斷,知足路由key的隊列纔會接收到消息,消費者消費消息
應用場景: 項目中的error報錯
案例:
public class RoutingTopicTest { @Test public void routingSend() throws Exception{ //獲取鏈接 Connection conn = ConnectionUtil.getConn(); Channel chan = conn.createChannel(); //聲明交換機 //參數意義,1 交換機名稱,2 類型:fanout,direct,topic chan.exchangeDeclare("directEx", "direct"); //發送消息 String msg="路由模式的消息"; chan.basicPublish("directEx", "jt1713", null, msg.getBytes()); } @Test public void routingRec01() throws Exception{ System.out.println("一號消費者等待接收消息"); //獲取鏈接 Connection conn = ConnectionUtil.getConn(); Channel chan = conn.createChannel(); //聲明隊列 chan.queueDeclare("direct01", false, false, false, null); //聲明交換機 chan.exchangeDeclare("directEx", "direct"); //綁定隊列到交換機 //參數 1 隊列名稱,2 交換機名稱 3 路由key chan.queueBind("direct01", "directEx", "jt1712"); chan.basicQos(1); //定義消費者 QueueingConsumer consumer=new QueueingConsumer(chan); //消費者與隊列綁定 chan.basicConsume("direct01",false, consumer); while(true){ Delivery delivery= consumer.nextDelivery(); System.out.println("一號消費者接收到"+ new String(delivery.getBody())); chan.basicAck(delivery.getEnvelope(). getDeliveryTag(), false); } } @Test public void routingRec02() throws Exception{ System.out.println("二號消費者等待接收消息"); //獲取鏈接 Connection conn = ConnectionUtil.getConn(); Channel chan = conn.createChannel(); //聲明隊列 chan.queueDeclare("direct02", false, false, false, null); //聲明交換機 chan.exchangeDeclare("directEx", "direct"); //綁定隊列到交換機 //參數 1 隊列名稱,2 交換機名稱 3 路由key chan.queueBind("direct02", "directEx", "jt1711"); chan.basicQos(1); //定義消費者 QueueingConsumer consumer=new QueueingConsumer(chan); //消費者與隊列綁定 chan.basicConsume("direct02",false, consumer); while(true){ Delivery delivery= consumer.nextDelivery(); System.out.println("二號消費者接收到"+ new String(delivery.getBody())); chan.basicAck(delivery.getEnvelope(). getDeliveryTag(), false); } } }
*號表明單個詞語
#表明多個詞語
其餘的內容與routing路由模式一致
案例:
public class RoutingTopicTest { @Test public void routingRec02() throws Exception{ System.out.println("二號消費者等待接收消息"); //獲取鏈接 Connection conn = ConnectionUtil.getConn(); Channel chan = conn.createChannel(); //聲明隊列 chan.queueDeclare("direct02", false, false, false, null); //聲明交換機 chan.exchangeDeclare("directEx", "direct"); //綁定隊列到交換機 //參數 1 隊列名稱,2 交換機名稱 3 路由key chan.queueBind("direct02", "directEx", "jt1711"); chan.basicQos(1); //定義消費者 QueueingConsumer consumer=new QueueingConsumer(chan); //消費者與隊列綁定 chan.basicConsume("direct02",false, consumer); while(true){ Delivery delivery= consumer.nextDelivery(); System.out.println("二號消費者接收到"+ new String(delivery.getBody())); chan.basicAck(delivery.getEnvelope(). getDeliveryTag(), false); } } @Test public void topicSend() throws Exception{ //獲取鏈接 Connection conn = ConnectionUtil.getConn(); Channel chan = conn.createChannel(); //聲明交換機 //參數意義,1 交換機名稱,2 類型:fanout,direct,topic chan.exchangeDeclare("topicEx", "topic"); //發送消息 String msg="主題模式的消息"; chan.basicPublish("topicEx", "jt1712.add.update", null, msg.getBytes()); } @Test public void topicRec01() throws Exception{ System.out.println("一號消費者等待接收消息"); //獲取鏈接 Connection conn = ConnectionUtil.getConn(); Channel chan = conn.createChannel(); //聲明隊列 chan.queueDeclare("topic01", false, false, false, null); //聲明交換機 chan.exchangeDeclare("topicEx", "topic"); //綁定隊列到交換機 //參數 1 隊列名稱,2 交換機名稱 3 路由key chan.queueBind("topic01", "topicEx", "jt1712"); chan.basicQos(1); //定義消費者 QueueingConsumer consumer=new QueueingConsumer(chan); //消費者與隊列綁定 chan.basicConsume("topic01",false, consumer); while(true){ Delivery delivery= consumer.nextDelivery(); System.out.println("一號消費者接收到"+ new String(delivery.getBody())); chan.basicAck(delivery.getEnvelope(). getDeliveryTag(), false); } } @Test public void topicRec02() throws Exception{ System.out.println("二號消費者等待接收消息"); //獲取鏈接 Connection conn = ConnectionUtil.getConn(); Channel chan = conn.createChannel(); //聲明隊列 chan.queueDeclare("topic02", false, false, false, null); //聲明交換機 chan.exchangeDeclare("topicEx", "topic"); //綁定隊列到交換機 //參數 1 隊列名稱,2 交換機名稱 3 路由key chan.queueBind("topic02", "topicEx", "jt1712.#"); chan.basicQos(1); //定義消費者 QueueingConsumer consumer=new QueueingConsumer(chan); //消費者與隊列綁定 chan.basicConsume("topic02",false, consumer); while(true){ Delivery delivery= consumer.nextDelivery(); System.out.println("二號消費者接收到"+ new String(delivery.getBody())); chan.basicAck(delivery.getEnvelope(). getDeliveryTag(), false); } } }