吐血總結吧~由於太經常使用,由於是平常,由於即便是平常也並無多少人懂的不少,由於想掌握一個消息中間件,就是如此吧,很少說,我一篇,我主要講講基本的使用與方法調用的入參及其意義,其實,RabbitMQ就光是基礎的api調用,也並無多少人熟悉~java
想來想去,不知道如何開篇,我大概就把我學習到的一些比較重要的概念羅列出來吧,就算是一個記錄吧。下面是一些我想寫入的點面試
基本的安裝我不在我文章裏面介紹,這種東西一搜一大把,並且每一個平臺安裝模式也不同,例如個人mac,直接一條命令搞定。我看Linux下面還要先安裝erlang的虛擬機。這些並不是RabbitMQ的核心技術,想作運維,出門左轉。設計模式
RabbitMQ主要實現了AMQP這個很牛逼的協議。其實這種就相似於Redis,任何的Java這個層面的接口API調用,都是對應着相關協議的命令輸出的。這東西細想也簡單,無非就是咱們給RabbitMQ的server端下命令,讓他作什麼事兒。例如我讓他建立信道,我就發一個Channel.open的字符串過去,對方server端起了個socket監聽器,發現有數據,立刻處理,而後匹配到具體的命令處理器,相似於策略設計模式,使用一個Handler處理邏輯。本節我經過生產者與消費者的最簡單代碼,來窺探一下,到底RabbitMQ的流轉過程是怎麼樣的,讓你們有個初步的印象:哦原來,RabbitMQ大概是這樣的。api
先上基礎的最簡單的生產者代碼:安全
// 建立鏈接 Connection connection = connectionFactory.newConnection(); // 建立信道 Channel channel = connection.createChannel(); String msg = "21123123"; channel.basicPublish("ExchargeName", "RoutingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); // 關閉資源 channel.close(); connection.close();
接下倆咱們來看看流轉過程的用例圖:併發
其中,Product表明生產者節點,Consumer表明消費者節點,Broker表明RabbitMQ Server端的一個機器節點。以後再也不說明運維
一樣,咱們先來一個最簡單的消費者的代碼模板:異步
// 建立鏈接 Connection connection = connectionFactory.newConnection(); // 建立信道 Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){} channel.basicQos(64); channel.basicConsume("queueName",consumer); // 關閉資源 channel.close(); connection.close();
下面是消費者的流轉圖:socket
請細看這兩個時序圖,與圖中的標記,其中對應了具體哪一個JavaAPI調用,對應了哪一個AMQP的消息協議命令。不能,就是稍微多了點步驟。接下來,我會在遇到相關AMQP的命令時候,再繼續介紹,就不集中介紹所有的AMQP協議命令了。ide
我本身感受,集中所有介紹沒用,由於是我的根本無法立刻記住,即便我給你十天記住了,而後1天不用就忘了。說白了,仍是要緊扣實戰的編碼,單獨研究協議,枯燥且沒意義,即便面試,人家也不會專門問你命令。
發現,AMQP的協議中,我大體看了兩個實現(RabbitMQ和Qpid),對這幾個基礎的構件,都是提早談起的,可見,AMQP就是根據這幾個構件來組織的,我就不經過大量文字解說這幾個構件了,儘可能經過圖片來展現,好懂一些。
主要的構件包括:
爲了說明隊列,先上一個虛擬的圖(由於真實狀況,消息不會直接到隊列的):
其實,咱們使用生產者,pulish一條消息,首先發送到的,就是交換機這個實體裏面,並不是隊列,而後經過各類綁定關係再路由到具體的隊列中,下一小節,咱們就講這個路由的問題。首先來看看交換機的示例圖:
交換機類型:
具體的RoutingKey和BindingKey如何區分下面咱們來講,即便咱們沒有指定具體的交換機發送給一個queue,RabbitMQ服務端也會隨機生成一個交換機,後面會介紹到
下面是RoutingKey和BindingKey的相關圖例:
直接上具體的區分方法吧:
這一部分讓咱們開始寫代碼,看看如何用RabbitMQ來發送與接收消息,而且接口參數都是什麼意思,畢竟發送與接收代碼中的參數仍是挺多的,代碼不算整潔,咱們來看看爲啥不整潔,爲啥要用這些個參數。
兩種方式來建立:
ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("root"); connectionFactory.setPassword("root123"); connectionFactory.setVirtualHost("/root"); // 建立鏈接 Connection connection = connectionFactory.newConnection(); // 建立信道 Channel channel = connection.createChannel();
ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUri("amqp://root:root123@127.0.0.1:5672/root"); // 建立鏈接 Connection connection = connectionFactory.newConnection(); // 建立信道 Channel channel = connection.createChannel();
使用注意事項:
對應的方式是:exchangeDeclare。先來看看Java中API客戶端的源碼,發現註釋已經介紹的很清楚了:
/** * 激活一個不自動刪除, 不持久化,且沒有拓展參數的交換機 * @param exchange 交換機名稱 * @param type 交換機類型 * @return 確認交換機聲明是否成功 * @throws java.io.IOException io異常 */ Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException; /** * 激活一個不自動刪除, 不持久化,且沒有拓展參數的交換機 * @param exchange 交換機名稱 * @param type 交換機類型(封裝了個enum給你用) * @return 確認交換機聲明是否成功 * @throws java.io.IOException io異常 */ Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException; /** * 激活一個不自動刪除, 且沒有拓展參數的交換機 * @param exchange 交換機名稱 * @param type 交換機類型 * @param durable true表示要持久化這個交換機 (在服務端重啓的時候還會存在這個交換機) */ Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException; /** * 激活一個不自動刪除, 且沒有拓展參數的交換機 * @param exchange 交換機名稱 * @param type 交換機類型(封裝了個enum給你用) * @param durable true表示要持久化這個交換機 (在服務端重啓的時候還會存在這個交換機) */ Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException; /** * 聲明一個交換機 * @param exchange 交換機名稱 * @param type 交換機類型 * @param durable ttrue表示要持久化這個交換機 (在服務端重啓的時候還會存在這個交換機) * @param autoDelete true表示若是沒有被使用,這個交換機會自動刪除 * @param arguments 其餘的一個些屬性 */ Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException; /** * 聲明一個交換機 * @param exchange 交換機名稱 * @param type 交換機類型(封裝了個enum給你用) * @param durable ttrue表示要持久化這個交換機 (在服務端重啓的時候還會存在這個交換機) * @param autoDelete true表示若是沒有被使用,這個交換機會自動刪除 * @param arguments 其餘的一個些屬性 */ Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException; /** * @param internal true表示這個交換機是一個內部的, 不能直接使用客戶端的api進行pubulish操做 */ Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; /** * 和前面的方式差很少,可是沒有返回值,且無論服務端是否有response */ void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; void exchangeDeclareNoWait(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; /** * 被動的聲明一個交換機; 判斷這個名字的交換機是否存在 * @throws IOException 若是交換機不存在,會報404的異常 */ Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException; /** * 刪除一個交換機 * @param exchange 交換機名稱 * @param ifUnused true表示肯定這個交換機是否被使用,只有徹底不被使用的狀況下,才能刪除 * @return a deletion-confirm method to indicate the exchange was successfully deleted * @throws java.io.IOException if an error is encountered */ Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException; /** * 一樣也是刪除一個交換機,可是沒有返回值,不等待服務端的返回確認 */ void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
其實直接看源碼,一切已經很明瞭了,下面我對極個別一些注意點說說:
使用的方法是queueDeclare,對於隊列的聲明,重載方法少不少,一樣的咱們直接來看看源碼與源碼中的註釋:
/** * 聲明一個服務端命名的, 本鏈接獨佔的, 自動刪除的, 不持久化的隊列 * 建立以後隊列的名稱,能夠從返回對象Queue.DeclareOk中的"queue"字段獲取. */ Queue.DeclareOk queueDeclare() throws IOException; /** * 聲明一個隊列 * @param queue 隊列的名稱 * @param durable true表示這個隊列要作持久化(服務端重啓以後,這個隊列同樣存在) * @param exclusive true表示這個隊列是一個獨佔的(被限制只能當前connection使用) * @param autoDelete true表示這個隊列是自動刪除的 (服務端會在這個隊列不被使用的時候將其刪除) * @param arguments 隊列建立時候的其餘參數 */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; /** * 相似於queueDeclare方法,可是沒有返回值,而且無論服務端是否有回覆,聲明是否成功 */ void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; /** * 聲明一個隊列,若是隊列不存在,將會拋出一個異常,而且關閉Channel */ Queue.DeclareOk queueDeclarePassive(String queue) throws IOException; /** * 刪除一個隊列, 無論是否這個隊列被使用或者裏面有消息 * @param queue 隊列名稱 */ Queue.DeleteOk queueDelete(String queue) throws IOException; /** * 刪除一個隊列 * @param queue 隊列名稱 * @param ifUnused true表示這個隊列只有不被使用時候才刪除 * @param ifEmpty true表示只有這個隊列爲空的時候才刪除 */ Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException; /** * 和queueDelete(String, boolean, boolean)相似,可是沒有返回值,且不等服務端返回 */ void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
一樣的,介紹幾個重點問題:
這一部分,主要是將咱們聲明的交換機與隊列創建關聯的一個動做,涉及到的方法有:
下面咱們來看看源碼與註釋:
/** * 將一個隊列綁定到交換機上面, 不帶任何拓展參數 * @param queue 隊列名稱 * @param exchange 交換機名稱 * @param routingKey 這裏其實就是綁定的鍵 */ Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException; /** * 將一個隊列綁定到交換機上面 * @param queue 隊列名稱 * @param routingKey 綁定鍵 * @param arguments 其餘的拓展參數 */ Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException; /** * 無論是否綁定成功就返回的綁定方法 */ void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException; /** * 將隊列與一個交換機經過綁定鍵綁定在一塊兒的關係解除 * @param queue 隊列名稱 * @param exchange 交換機名稱 * @param routingKey 綁定鍵 */ Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException; Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException; /** * 將一個交換機綁定到另一個交換機上面 * @param destination 這個是消息要到達的交換機名稱 * @param source 這個是消息從哪裏來的交換機名稱 * @param routingKey 綁定鍵 */ Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException; Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException; void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
主要使用的參數,經過註釋已經很明瞭了,其餘的注意點暫時沒有。
發送消息這裏對應的方法,主要就是basicPublish這個方法,先來看看他的API接口:
/** * 發佈一個消息到服務端。 * * 若是發佈的消息到交換機不存在,會拋出一個channel-level protocol的異常,而且會關閉這個channel * * @param exchange 消息要發佈到的交換機 * @param routingKey 這個是路由鍵(會匹配具體的綁定鍵) * @param 一個消息其餘的屬性 - 例如headers * @param body 消息體 */ void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; /** * 發佈一個消息到服務端。 * * @param exchange 消息要發佈到的交換機 * @param routingKey 這個是路由鍵 * @param mandatory 這個後面文章會介紹 * @param 一個消息其餘的屬性 - 例如headers * @param 消息體 */ void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException; /** * 發佈一個消息到服務端。 * * @param mandatory 後面文章介紹 * @param immediate 後面文章介紹 */ void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
這裏能夠看到的是,mandatory、immediate這兩個入參很關鍵,後面會專門講,由於這兩個入參關係到了消息的可靠性和穩定性。下面是一些能夠添加屬性的代碼示例:
channel.basicPublish("ExchargeName", "RoutingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); channel.basicPublish("ExchargeName", "RoutingKey", new AMQP.BasicProperties.Builder() .contentType("test/plain") .deliveryMode(2) .priority(1) .userId("hiden") .build(), msg.getBytes());
下面介紹下經過Builder模式構建發消息時候的屬性,有助於瞭解消息的可靠性:
消費消息來講,相比較來講要比單純的接口調用要複雜點,由於涉及到方法的註冊等,還存在兩種消費模式:
在推模式中,能夠經過持續訂閱方式來消費消息,主要經過兩個類:
設計這兩個消費對象的類的方法是:
/** * 啓動一個非本地, 非排他, 要手動ack的, 服務端自定義的消費者標籤名稱的消費者 */ String basicConsume(String queue, Consumer callback) throws IOException; /** * 啓動一個非本地, 非排他, 服務端自定義的消費者標籤名稱的消費者 */ String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException; /** * 啓動一個非本地, 非排他, 服務端自定義的消費者標籤名稱的消費者(固然帶了個參數map) */ String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException; /** * 啓動一個非本地, 非排他的消費者(固然帶了個參數map) * @param queue the name of the queue * @param autoAck true if the server should consider messages * acknowledged once delivered; false if the server should expect * explicit acknowledgements * @param consumerTag a client-generated consumer tag to establish context * @param callback an interface to the consumer object * @return the consumerTag associated with the new consumer * @throws java.io.IOException if an error is encountered * @see com.rabbitmq.client.AMQP.Basic.Consume * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) */ String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException; /** * 啓動一個消費者 * @param queue 隊列名稱 * @param autoAck 消費者是否自動確認 * @param consumerTag 消費者標籤 * @param noLocal true表示不能把同一個connection中生產者發送的消息傳送給這個connection中的消費者 * @param exclusive 是不是拍他的消費者 * @param arguments 設置消費者的其餘參數 * @param callback 設置消費者的回調函數,用來處理RabbitMQ推送過來的消息 */ String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
下面是一段消費者的真實代碼:
channel.basicQos(64); channel.basicConsume("queueName", false, "tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); long deliveryTag = envelope.getDeliveryTag(); String msg = new String(body, "UTF-8"); // 這裏進行消息處理 channel.basicAck(deliveryTag,false); } });
具體的DefaultConsumer類裏面還有其餘的幾個方法,對應不一樣的消費動做回調的函數:
若是手動調用了Channel#basicCancel,觸發方法的序列是:
另外還要注意的是:這些個被觸發的對象都被分配到了不一樣的線程池上,因此要注意線程安全性的問題
拉模式,有別於推模式的被動獲取消息,咱們能夠直接在業務邏輯中,主動獲取RabbitMQ服務端中的消息,下面是總體的時序圖
對應的代碼是:
GetResponse response = channel.basicGet("QueueName", false); System.out.println(response.getBody()); channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
比較下,推模式和拉模式:
這裏我自我感受很關鍵,由於涉及到消息消費的可靠性。首先讓咱們來看看ack機制:
總體上,RabbitMQ的服務端存在兩種隊列:
另外,RabbitMQ不會主動刪除或者過時未確認的消息,判斷是否從新投遞給消費者的惟一依據是消費該消息的消費者鏈接是否已經斷開。若是長時間沒回復ack,直到鏈接斷開,此消息會從新進入隊列,等待投遞給下一個消費者,固然也多是當前消費者。
另一個點,是咱們能夠直接拒絕一個消息,使用的方法是:
void basicReject(long deliveryTag, boolean requeue) throws IOException;
第一個參數好理解,重點說下第二個,requeue:
還有個方法,能夠請求RabbitMQ從新發送還未被確認的消息,有下面兩個方法
Basic.RecoverOk basicRecover() throws IOException; Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
再次涉及到requeue參數:
關閉涉及到的有兩個對象的關閉:
關閉的方法有:
void close()
void close(int closeCode, String closeMessage)
整個AMQP協議中connection與channel聲明週期以下:
固然咱們能夠註冊關閉以後觸發的監聽器:
connection.addShutdownListener(new ShutdownListener() { @Override public void shutdownCompleted(ShutdownSignalException cause) { // 業務邏輯 Method reason = cause.getReason(); } });
接下倆大概會有兩篇,一篇是功能性的拔高,一篇是RabbitMQ的原理相關,若是可能我會說說集羣構建與鏡像