RabbitMQ-從基礎到實戰(1)— Hello RabbitMQhtml
RabbitMQ-從基礎到實戰(2)— 防止消息丟失java
RabbitMQ-從基礎到實戰(4)— 消息的交換(中)app
RabbitMQ-從基礎到實戰(5)— 消息的交換(下)ide
RabbitMQ-從基礎到實戰(6)— 與Spring集成函數
在前面的例子中,每一個消息都只對應一個消費者,即便有多個消費者在線,也只會有一個消費者接收並處理一條消息,這是消息中間件的一種經常使用方式。性能
另一種方式,生產者生產一條消息,廣播給一個或多個隊列,全部訂閱了這個隊列的消費者,均可以消費這條消息,這就是消息訂閱。this
官方教程列舉了這樣一個場景,生產者發出一條記錄日誌的消息,消費者1接收到後寫日誌到硬盤,消費者2接收到後打印日誌到屏幕。工做中還有不少這樣的場景有待發掘,適當的使用消息訂閱後能夠成倍的增長效率。idea
在前兩章的例子中,咱們涉及到了三個概念spa
這不由讓咱們覺得,生產者生產消息後直接到發送到隊列,消費者從隊列中獲取消息,再消費掉。debug
其實這是錯誤的,在RabbitMQ中,生產者不會直接把消息發送給隊列,實際上,生產者甚至不知道一條消息會不會被髮送到隊列上。
正確的概念是,生產者會把消息發送給RabbitMQ的交換中心(Exchange),Exchange的一側是生產者,另外一側則是一個或多個隊列,由Exchange決定一條消息的生命週期--發送給某些隊列,或者直接丟棄掉。
這個概念在官方文檔中被稱做RabbitMQ消息模型的核心思想(core idea)
以下圖,其中X表明的是Exchange。
RabbitMQ中,有4種類型的Exchange
更詳細的介紹,請看官方文檔
能夠對一個隊列命名是十分重要的,在消費者消費消息時,要指明消費哪一個隊列的消息(下面的queue),這樣就可讓多個消費者同時分享一個隊列
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
上述記錄日誌的場景中,有如下幾個特色
本身聲明隊列是比較麻煩的,所以,RabbitMQ提供了簡便的獲取臨時隊列的方法,該隊列會在鏈接斷開後銷燬
String queueName = channel.queueDeclare().getQueue();
這行代碼會獲取一個名字相似於「amq.gen-JzTY20BRgKO-HjmUJj0wLg」的臨時隊列
再次注意,在RabbitMQ中,消息是發送到Exchange的,不是直接發送的Queue。所以,須要把Queue和Exchange進行綁定,告訴RabbitMQ把指定的Exchange上的消息發送的這個隊列上來
綁定隊列使用此方法
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
其中,queue是隊列名,exchange是要綁定的交換中心,routingKey就是這個queue的routingKey
下面來實現上述場景,生產者發送日誌消息,消費者1記錄日誌,消費者2打印日誌
下面的代碼中,把鏈接工廠等方法放到了構造函數中,也就是說,每new一個對象,都會建立一個鏈接,在生產環境這樣作是很浪費性能的,每次建立一個connection都會創建一次TCP鏈接,生產環境應使用鏈接池。而Channel又不同,多個Channel是共用一個TCP鏈接的,所以能夠放心的獲取Channel(本結論出自官方文檔對Channel的定義)
AMQP 0-9-1 connections are multiplexed with channels that can be thought of as "lightweight connections that share a single TCP connection".
For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them.
日誌消息發送類 LogSender
1 import java.io.IOException; 2 import java.util.concurrent.TimeoutException; 3 4 import org.slf4j.Logger; 5 import org.slf4j.LoggerFactory; 6 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.Connection; 9 import com.rabbitmq.client.ConnectionFactory; 10 11 public class LogSender { 12 13 private Logger logger = LoggerFactory.getLogger(LogSender.class); 14 private ConnectionFactory factory; 15 private Connection connection; 16 private Channel channel; 17 18 /** 19 * 在構造函數中獲取鏈接 20 */ 21 public LogSender(){ 22 super(); 23 try { 24 factory = new ConnectionFactory(); 25 factory.setHost("127.0.0.1"); 26 connection = factory.newConnection(); 27 channel = connection.createChannel(); 28 } catch (Exception e) { 29 logger.error(" [X] INIT ERROR!",e); 30 } 31 } 32 /** 33 * 提供個關閉方法,如今並無什麼卵用 34 * @return 35 */ 36 public boolean closeAll(){ 37 try { 38 this.channel.close(); 39 this.connection.close(); 40 } catch (IOException | TimeoutException e) { 41 logger.error(" [X] CLOSE ERROR!",e); 42 return false; 43 } 44 return true; 45 } 46 47 /** 48 * 咱們更加關心的業務方法 49 * @param message 50 */ 51 public void sendMessage(String message) { 52 try { 53 //聲明一個exchange,命名爲logs,類型爲fanout 54 channel.exchangeDeclare("logs", "fanout"); 55 //exchange是logs,表示發送到此Exchange上 56 //fanout類型的exchange,忽略routingKey,因此第二個參數爲空 57 channel.basicPublish("logs", "", null, message.getBytes()); 58 logger.debug(" [D] message sent:"+message); 59 } catch (IOException e) { 60 e.printStackTrace(); 61 } 62 } 63 }
在LogSender中,和以前的例子不同的地方是,咱們沒有直接聲明一個Queue,取而代之的是聲明瞭一個exchange
發佈消息時,第一個參數填了咱們聲明的exchange名字,routingKey留空,由於fanout類型忽略它。
在前面的例子中,咱們routingKey填的是隊列名,由於默認的exchange(exchange位空字符串時使用默認交換中心)會把隊列的routingKey設置爲queueName(聲明隊列的時候設置的,不是發送消息的時候),又是direct類型,因此能夠經過queueName當作routingKey找到隊列。
消費類 LogConsumer
1 package com.liyang.ticktock.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 9 import com.rabbitmq.client.AMQP; 10 import com.rabbitmq.client.Channel; 11 import com.rabbitmq.client.Connection; 12 import com.rabbitmq.client.ConnectionFactory; 13 import com.rabbitmq.client.Consumer; 14 import com.rabbitmq.client.DefaultConsumer; 15 import com.rabbitmq.client.Envelope; 16 17 public class LogConsumer { 18 19 private Logger logger = LoggerFactory.getLogger(LogConsumer.class); 20 private ConnectionFactory factory; 21 private Connection connection; 22 private Channel channel; 23 24 /** 25 * 在構造函數中獲取鏈接 26 */ 27 public LogConsumer() { 28 super(); 29 try { 30 factory = new ConnectionFactory(); 31 factory.setHost("127.0.0.1"); 32 connection = factory.newConnection(); 33 channel = connection.createChannel(); 34 // 聲明exchange,防止生產者沒啓動,exchange不存在 35 channel.exchangeDeclare("logs","fanout"); 36 } catch (Exception e) { 37 logger.error(" [X] INIT ERROR!", e); 38 } 39 } 40 41 /** 42 * 提供個關閉方法,如今並無什麼卵用 43 * 44 * @return 45 */ 46 public boolean closeAll() { 47 try { 48 this.channel.close(); 49 this.connection.close(); 50 } catch (IOException | TimeoutException e) { 51 logger.error(" [X] CLOSE ERROR!", e); 52 return false; 53 } 54 return true; 55 } 56 57 /** 58 * 咱們更加關心的業務方法 59 */ 60 public void consume() { 61 try { 62 // 獲取一個臨時隊列 63 String queueName = channel.queueDeclare().getQueue(); 64 // 把剛剛獲取的隊列綁定到logs這個交換中心上,fanout類型忽略routingKey,因此第三個參數爲空 65 channel.queueBind(queueName, "logs", ""); 66 //定義一個Consumer,消費Log消息 67 Consumer consumer = new DefaultConsumer(channel) { 68 @Override 69 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 70 byte[] body) throws IOException { 71 String message = new String(body, "UTF-8"); 72 logger.debug(" [D] 我是來打印日誌的:"+message); 73 } 74 }; 75 //這裏自動確認爲true,接收到消息後該消息就銷燬了 76 channel.basicConsume(queueName, true, consumer); 77 } catch (IOException e) { 78 e.printStackTrace(); 79 } 80 } 81 }
複製一個項目,把72行改成以下代碼,表明兩個作不一樣工做的消費者
1 logger.debug(" [D] 我已經把消息寫到硬盤了:"+message);
消費者App
1 public class App 2 { 3 public static void main( String[] args ) 4 { 5 LogConsumer consumer = new LogConsumer(); 6 consumer.consume(); 7 } 8 }
生產者App
1 public class App { 2 public static void main( String[] args ) throws InterruptedException{ 3 LogSender sender = new LogSender(); 4 while(true){ 5 sender.sendMessage(System.nanoTime()+""); 6 Thread.sleep(1000); 7 } 8 } 9 }
把消費者打包成兩個可執行的jar包,方便觀察控制檯
用java -jar 命令執行,結果以下
本章介紹了RabbitMQ中消息的交換,再次強調,RabbitMQ中,消息是經過交換中心轉發到隊列的,不要被默認的exchange混淆,默認的exchange會自動把queue的名字設置爲它的routingKey,因此消息發佈時,才能經過queueName找到該隊列,其實此時queueName扮演的角色就是routingKey。
本教程是參考官方文檔寫出來的,後續章節會介紹更多RabbitMQ的相關知識以及項目中的實戰技巧