RabbitMQ-Exchange交換器

交換器分類

RabbitMQ的Exchange(交換器)分爲四類:java

  • direct(默認)
  • headers
  • fanout
  • topic

其中headers交換器容許你匹配AMQP消息的header而非路由鍵,除此以外headers交換器和direct交換器徹底一致,但性能卻不好,幾乎用不到,因此咱們本文也不作講解。緩存

注意:fanout、topic交換器是沒有歷史數據的,也就是說對於中途建立的隊列,獲取不到以前的消息。ide

一、direct交換器

direct爲默認的交換器類型,也很是的簡單,若是路由鍵匹配的話,消息就投遞到相應的隊列,如圖:性能

使用代碼:channel.basicPublish("", QueueName, null, message)推送direct交換器消息到對於的隊列,空字符爲默認的direct交換器,用隊列名稱當作路由鍵。this

direct交換器代碼示例spa

發送端:線程

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
// 聲明隊列【參數說明:參數一:隊列名稱,參數二:是否持久化;參數三:是否獨佔模式;參數四:消費者斷開鏈接時是否刪除隊列;參數五:消息其餘參數】 channel.queueDeclare(config.QueueName, false, false, false, null); String message = String.format("當前時間:%s", new Date().getTime()); // 推送內容【參數說明:參數一:交換機名稱;參數二:隊列名稱,參數三:消息的其餘屬性-路由的headers信息;參數四:消息主體】 channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));

接收端,持續接收消息:日誌

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
// 聲明隊列【參數說明:參數一:隊列名稱,參數二:是否持久化;參數三:是否獨佔模式;參數四:消費者斷開鏈接時是否刪除隊列;參數五:消息其餘參數】 channel.queueDeclare(config.QueueName, false, false, false, null); Consumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "utf-8"); // 消息正文 System.out.println("收到消息 => " + message); channel.basicAck(envelope.getDeliveryTag(), false); // 手動確認消息【參數說明:參數一:該消息的index;參數二:是否批量應答,true批量確認小於當前id的消息】 } }; channel.basicConsume(config.QueueName, false, "", defaultConsumer);

接收端,獲取單條消息code

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null); GetResponse resp = channel.basicGet(config.QueueName, false); String message = new String(resp.getBody(), "UTF-8"); channel.basicAck(resp.getEnvelope().getDeliveryTag(), false); // 消息確認

持續消息獲取使用:basic.consume;單個消息獲取使用:basic.get。orm

注意:不能使用for循環單個消息消費來替代持續消息消費,由於這樣性能很低;

公平調度

當接收端訂閱者有多個的時候,direct會輪詢公平的分發給每一個訂閱者(訂閱者消息確認正常),如圖:

消息的發後既忘特性

發後既忘模式是指接受者不知道消息的來源,若是想要指定消息的發送者,須要包含在發送內容裏面,這點就像咱們在信件裏面註明本身的姓名同樣,只有這樣才能知道發送者是誰。

消息確認

看了上面的代碼咱們能夠知道,消息接收到以後必須使用channel.basicAck()方法手動確認(非自動確認刪除模式下),那麼問題來了。

消息收到未確認會怎麼樣?

若是應用程序接收了消息,由於bug忘記確認接收的話,消息在隊列的狀態會從「Ready」變爲「Unacked」,如圖:

若是消息收到卻未確認,Rabbit將不會再給這個應用程序發送更多的消息了,這是由於Rabbit認爲你沒有準備好接收下一條消息。

此條消息會一直保持Unacked的狀態,直到你確認了消息,或者斷開與Rabbit的鏈接,Rabbit會自動把消息改完Ready狀態,分發給其餘訂閱者。

固然你能夠利用這一點,讓你的程序延遲確認該消息,直到你的程序處理完相應的業務邏輯,這樣能夠有效的防治Rabbit給你過多的消息,致使程序崩潰。

消息確認Demo:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null); GetResponse resp = channel.basicGet(config.QueueName, false); String message = new String(resp.getBody(), "UTF-8"); channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);

channel.basicAck(long deliveryTag, boolean multiple)爲消息確認,參數1:消息的id;參數2:是否批量應答,true批量確認小於次id的消息。

總結:消費者消費的每條消息都必須確認。

消息拒絕

消息在確認以前,能夠有兩個選擇:

選擇1:斷開與Rabbit的鏈接,這樣Rabbit會從新把消息分派給另外一個消費者;

選擇2:拒絕Rabbit發送的消息使用channel.basicReject(long deliveryTag, boolean requeue),參數1:消息的id;參數2:處理消息的方式,若是是true,Rabbib會從新分配這個消息給其餘訂閱者,若是設置成false的話,Rabbit會把消息發送到一個特殊的「死信」隊列,用來存放被拒絕而不從新放入隊列的消息。

消息拒絕Demo:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null); GetResponse resp = channel.basicGet(config.QueueName, false); String message = new String(resp.getBody(), "UTF-8"); channel.basicReject(resp.getEnvelope().getDeliveryTag(), true); //消息拒絕

二、fanout交換器——發佈/訂閱模式

fanout有別於direct交換器,fanout是一種發佈/訂閱模式的交換器,當你發送一條消息的時候,交換器會把消息廣播到全部附加到這個交換器的隊列上。

好比用戶上傳了本身的頭像,這個時候圖片須要清除緩存,同時用戶應該獲得積分獎勵,你能夠把這兩個隊列綁定到圖片上傳的交換器上,這樣當有第三個、第四個上傳完圖片須要處理的需求的時候,原來的代碼能夠不變,只須要添加一個訂閱消息便可,這樣發送方和消費者的代碼徹底解耦,並能夠垂手可得的添加新功能了。

和direct交換器不一樣,咱們在發送消息的時候新增channel.exchangeDeclare(ExchangeName, "fanout"),這行代碼聲明fanout交換器。

發送端:

final String ExchangeName = "fanoutec"; // 交換器名稱 Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(ExchangeName, "fanout"); // 聲明fanout交換器 String message = "時間:" + new Date().getTime(); channel.basicPublish(ExchangeName, "", null, message.getBytes("UTF-8"));

接受消息不一樣於direct,咱們須要聲明fanout路由器,並使用默認的隊列綁定到fanout交換器上。

接收端:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "fanout"); // 聲明fanout交換器 String queueName = channel.queueDeclare().getQueue(); // 聲明隊列 channel.queueBind(queueName, ExchangeName, ""); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); } }; channel.basicConsume(queueName, true, consumer);

fanout和direct的區別最多的在接收端,fanout須要綁定隊列到對應的交換器用於訂閱消息。

其中channel.queueDeclare().getQueue()爲隨機隊列,Rabbit會隨機生成隊列名稱,一旦消費者斷開鏈接,該隊列會自動刪除。

注意:對於fanout交換器來講routingKey(路由鍵)是無效的,這個參數是被忽略的。

三、topic交換器——匹配訂閱模式

最後介紹的是topic交換器,topic交換器運行和fanout相似,可是能夠更靈活的匹配本身想要訂閱的信息,這個時候routingKey路由鍵就排上用場了,使用路由鍵進行消息(規則)匹配。

假設咱們如今有一個日誌系統,會把全部日誌級別的日誌發送到交換器,warning、log、error、fatal,但咱們只想處理error以上的日誌,要怎麼處理?這就須要使用topic路由器了。

topic路由器的關鍵在於定義路由鍵,定義routingKey名稱不能超過255字節,使用「.」做爲分隔符,例如:com.mq.rabbit.error。

消費消息的時候routingKey可使用下面字符匹配消息:

  • "*"匹配一個分段(用「.」分割)的內容;
  • "#"匹配0和多個字符;

例如發佈了一個「com.mq.rabbit.error」的消息:

能匹配上的路由鍵:

  • cn.mq.rabbit.*
  • cn.mq.rabbit.#
  • #.error
  • cn.mq.#
  • #

不能匹配上的路由鍵:

  • cn.mq.*
  • *.error
  • *

因此若是想要訂閱全部消息,可使用「#」匹配。

注意:fanout、topic交換器是沒有歷史數據的,也就是說對於中途建立的隊列,獲取不到以前的消息。

發佈端:

String routingKey = "com.mq.rabbit.error"; Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(ExchangeName, "topic"); // 聲明topic交換器 String message = "時間:" + new Date().getTime(); channel.basicPublish(ExchangeName, routingKey, null, message.getBytes("UTF-8"));

接收端:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "topic"); // 聲明topic交換器 String queueName = channel.queueDeclare().getQueue(); // 聲明隊列 String routingKey = "#.error"; channel.queueBind(queueName, ExchangeName, routingKey); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(routingKey + "|接收消息 => " + message); } }; channel.basicConsume(queueName, true, consumer);

擴展部分—自定義線程池

若是須要更大的控制鏈接,用戶可本身設置線程池,代碼以下:

import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; ExecutorService es = Executors.newFixedThreadPool(20); Connection conn = factory.newConnection(es);

其實看過源碼的同窗可能知道,factory.newConnection自己默認也有線程池的機制,ConnectionFactory.class部分源碼以下:

private ExecutorService sharedExecutor; public Connection newConnection() throws IOException, TimeoutException { return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort()))); } public void setSharedExecutor(ExecutorService executor) { this.sharedExecutor = executor; }

其中this.sharedExecutor就是默認的線程池,能夠經過setSharedExecutor()方法設置ConnectionFactory的線程池,若是不設置則爲null。

用戶若是本身設置了線程池,像本小節第一段代碼寫的那樣,那麼當鏈接關閉的時候,不會自動關閉用戶自定義的線程池,因此用戶必須本身手動關閉,經過調用shutdown()方法,不然可能會阻止JVM的終止。

官方的建議是隻有在程序出現嚴重性能瓶頸的時候,才應該考慮使用此功能。

相關文章
相關標籤/搜索