RabbitMQ的Exchange(交換器)分爲四類:java
其中headers交換器容許你匹配AMQP消息的header而非路由鍵,除此以外headers交換器和direct交換器徹底一致,但性能卻不好,幾乎用不到,因此咱們本文也不作講解。緩存
注意:fanout、topic交換器是沒有歷史數據的,也就是說對於中途建立的隊列,獲取不到以前的消息。ide
direct爲默認的交換器類型,也很是的簡單,若是路由鍵匹配的話,消息就投遞到相應的隊列,如圖:性能
使用代碼:channel.basicPublish("", QueueName, null, message)推送direct交換器消息到對於的隊列,空字符爲默認的direct交換器,用隊列名稱當作路由鍵。this
direct交換器代碼示例spa
發送端:線程
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
// 聲明隊列【參數說明:參數一:隊列名稱,參數二:是否持久化;參數三:是否獨佔模式;參數四:消費者斷開鏈接時是否刪除隊列;參數五:消息其餘參數】 channel.queueDeclare(config.QueueName, false, false, false, null); String message = String.format("當前時間:%s", new Date().getTime()); // 推送內容【參數說明:參數一:交換機名稱;參數二:隊列名稱,參數三:消息的其餘屬性-路由的headers信息;參數四:消息主體】 channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
接收端,持續接收消息:日誌
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
// 聲明隊列【參數說明:參數一:隊列名稱,參數二:是否持久化;參數三:是否獨佔模式;參數四:消費者斷開鏈接時是否刪除隊列;參數五:消息其餘參數】 channel.queueDeclare(config.QueueName, false, false, false, null); Consumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "utf-8"); // 消息正文 System.out.println("收到消息 => " + message); channel.basicAck(envelope.getDeliveryTag(), false); // 手動確認消息【參數說明:參數一:該消息的index;參數二:是否批量應答,true批量確認小於當前id的消息】 } }; channel.basicConsume(config.QueueName, false, "", defaultConsumer);
接收端,獲取單條消息code
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null); GetResponse resp = channel.basicGet(config.QueueName, false); String message = new String(resp.getBody(), "UTF-8"); channel.basicAck(resp.getEnvelope().getDeliveryTag(), false); // 消息確認
持續消息獲取使用:basic.consume;單個消息獲取使用:basic.get。orm
注意:不能使用for循環單個消息消費來替代持續消息消費,由於這樣性能很低;
公平調度
當接收端訂閱者有多個的時候,direct會輪詢公平的分發給每一個訂閱者(訂閱者消息確認正常),如圖:
消息的發後既忘特性
發後既忘模式是指接受者不知道消息的來源,若是想要指定消息的發送者,須要包含在發送內容裏面,這點就像咱們在信件裏面註明本身的姓名同樣,只有這樣才能知道發送者是誰。
看了上面的代碼咱們能夠知道,消息接收到以後必須使用channel.basicAck()方法手動確認(非自動確認刪除模式下),那麼問題來了。
消息收到未確認會怎麼樣?
若是應用程序接收了消息,由於bug忘記確認接收的話,消息在隊列的狀態會從「Ready」變爲「Unacked」,如圖:
若是消息收到卻未確認,Rabbit將不會再給這個應用程序發送更多的消息了,這是由於Rabbit認爲你沒有準備好接收下一條消息。
此條消息會一直保持Unacked的狀態,直到你確認了消息,或者斷開與Rabbit的鏈接,Rabbit會自動把消息改完Ready狀態,分發給其餘訂閱者。
固然你能夠利用這一點,讓你的程序延遲確認該消息,直到你的程序處理完相應的業務邏輯,這樣能夠有效的防治Rabbit給你過多的消息,致使程序崩潰。
消息確認Demo:
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null); GetResponse resp = channel.basicGet(config.QueueName, false); String message = new String(resp.getBody(), "UTF-8"); channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
channel.basicAck(long deliveryTag, boolean multiple)爲消息確認,參數1:消息的id;參數2:是否批量應答,true批量確認小於次id的消息。
總結:消費者消費的每條消息都必須確認。
消息在確認以前,能夠有兩個選擇:
選擇1:斷開與Rabbit的鏈接,這樣Rabbit會從新把消息分派給另外一個消費者;
選擇2:拒絕Rabbit發送的消息使用channel.basicReject(long deliveryTag, boolean requeue),參數1:消息的id;參數2:處理消息的方式,若是是true,Rabbib會從新分配這個消息給其餘訂閱者,若是設置成false的話,Rabbit會把消息發送到一個特殊的「死信」隊列,用來存放被拒絕而不從新放入隊列的消息。
消息拒絕Demo:
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null); GetResponse resp = channel.basicGet(config.QueueName, false); String message = new String(resp.getBody(), "UTF-8"); channel.basicReject(resp.getEnvelope().getDeliveryTag(), true); //消息拒絕
fanout有別於direct交換器,fanout是一種發佈/訂閱模式的交換器,當你發送一條消息的時候,交換器會把消息廣播到全部附加到這個交換器的隊列上。
好比用戶上傳了本身的頭像,這個時候圖片須要清除緩存,同時用戶應該獲得積分獎勵,你能夠把這兩個隊列綁定到圖片上傳的交換器上,這樣當有第三個、第四個上傳完圖片須要處理的需求的時候,原來的代碼能夠不變,只須要添加一個訂閱消息便可,這樣發送方和消費者的代碼徹底解耦,並能夠垂手可得的添加新功能了。
和direct交換器不一樣,咱們在發送消息的時候新增channel.exchangeDeclare(ExchangeName, "fanout"),這行代碼聲明fanout交換器。
發送端:
final String ExchangeName = "fanoutec"; // 交換器名稱 Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(ExchangeName, "fanout"); // 聲明fanout交換器 String message = "時間:" + new Date().getTime(); channel.basicPublish(ExchangeName, "", null, message.getBytes("UTF-8"));
接受消息不一樣於direct,咱們須要聲明fanout路由器,並使用默認的隊列綁定到fanout交換器上。
接收端:
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "fanout"); // 聲明fanout交換器 String queueName = channel.queueDeclare().getQueue(); // 聲明隊列 channel.queueBind(queueName, ExchangeName, ""); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); } }; channel.basicConsume(queueName, true, consumer);
fanout和direct的區別最多的在接收端,fanout須要綁定隊列到對應的交換器用於訂閱消息。
其中channel.queueDeclare().getQueue()爲隨機隊列,Rabbit會隨機生成隊列名稱,一旦消費者斷開鏈接,該隊列會自動刪除。
注意:對於fanout交換器來講routingKey(路由鍵)是無效的,這個參數是被忽略的。
最後介紹的是topic交換器,topic交換器運行和fanout相似,可是能夠更靈活的匹配本身想要訂閱的信息,這個時候routingKey路由鍵就排上用場了,使用路由鍵進行消息(規則)匹配。
假設咱們如今有一個日誌系統,會把全部日誌級別的日誌發送到交換器,warning、log、error、fatal,但咱們只想處理error以上的日誌,要怎麼處理?這就須要使用topic路由器了。
topic路由器的關鍵在於定義路由鍵,定義routingKey名稱不能超過255字節,使用「.」做爲分隔符,例如:com.mq.rabbit.error。
消費消息的時候routingKey可使用下面字符匹配消息:
例如發佈了一個「com.mq.rabbit.error」的消息:
能匹配上的路由鍵:
不能匹配上的路由鍵:
因此若是想要訂閱全部消息,可使用「#」匹配。
注意:fanout、topic交換器是沒有歷史數據的,也就是說對於中途建立的隊列,獲取不到以前的消息。
發佈端:
String routingKey = "com.mq.rabbit.error"; Connection conn = connectionFactoryUtil.GetRabbitConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(ExchangeName, "topic"); // 聲明topic交換器 String message = "時間:" + new Date().getTime(); channel.basicPublish(ExchangeName, routingKey, null, message.getBytes("UTF-8"));
接收端:
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "topic"); // 聲明topic交換器 String queueName = channel.queueDeclare().getQueue(); // 聲明隊列 String routingKey = "#.error"; channel.queueBind(queueName, ExchangeName, routingKey); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(routingKey + "|接收消息 => " + message); } }; channel.basicConsume(queueName, true, consumer);
若是須要更大的控制鏈接,用戶可本身設置線程池,代碼以下:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; ExecutorService es = Executors.newFixedThreadPool(20); Connection conn = factory.newConnection(es);
其實看過源碼的同窗可能知道,factory.newConnection自己默認也有線程池的機制,ConnectionFactory.class部分源碼以下:
private ExecutorService sharedExecutor; public Connection newConnection() throws IOException, TimeoutException { return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort()))); } public void setSharedExecutor(ExecutorService executor) { this.sharedExecutor = executor; }
其中this.sharedExecutor就是默認的線程池,能夠經過setSharedExecutor()方法設置ConnectionFactory的線程池,若是不設置則爲null。
用戶若是本身設置了線程池,像本小節第一段代碼寫的那樣,那麼當鏈接關閉的時候,不會自動關閉用戶自定義的線程池,因此用戶必須本身手動關閉,經過調用shutdown()方法,不然可能會阻止JVM的終止。
官方的建議是隻有在程序出現嚴重性能瓶頸的時候,才應該考慮使用此功能。