生產者,發送消息的一方,圖中左側的client。java
消費者,接收消息的一方,圖中後側的client。spring
消息中間件的服務節點,通常一個RabbitMQ Broker當作一臺RabbitMQ服務器。服務器
消息包含兩部分:消息體和標籤。消息體(payload)是一個帶有業務邏輯結構的數據,好比一個 JSON 字符串。消息的標籤用來表述這條消息 , 好比一個交換器的名稱和一個路由鍵。 生產者把消息交由 RabbitMQ, RabbitMQ 以後會根據標籤把消息發送給感興趣的消費者。app
鏈接實際上是一條TCP鏈接,若是是生產者仍是消費者都須要和Broker創建鏈接。ide
信道是創建在 Connection 之上的虛擬鏈接, RabbitMQ 處理的每條 AMQP 指令都是經過信道完成的。函數
爲何還要引入信道呢?試想這 樣一個場景, 一個應用程序中有不少個線程須要從 RabbitMQ 中消費消息,或者生產消息,那 麼必然須要創建不少個 Connection,也就是許多個 TCP 鏈接。然而對於操做系統而言,創建和 銷燬 TCP 鏈接是很是昂貴的開銷,若是遇到使用高峯,性能瓶頸也隨之顯現。 RabbitMQ 採用 相似 NIO' (Non-blocking 1/0) 的作法,選擇 TCP 鏈接複用,不只能夠減小性能開銷,同時也便於管理。性能
隊列是 RabbitMQ 的內部對象,用於存儲消息,當多個消費者能夠訂閱同一個隊列,這時隊列中的消息會被平均分攤(Round-Robin,即輪詢) 給多個消費者進行處理,而不是每一個消費者都收到全部的消息井處理。fetch
生產者將消息發送到 Exchange (交換器,一般也 能夠用大寫的 "X" 來表示),由交換器將消息路由到一個或者多個隊列中。若是路由不到,或 許會返回給生產者,或許直接丟棄。操作系統
生產者將消息發給交換器的時候, 通常會指定一個 RoutingKey,用 來指定這個消息的路由規則,而這個 RoutingKey 須要與交換器類型和綁定鍵 (BindingKey) 聯合使用才能最終生效。線程
RabbitMQ 中經過綁定將交換器與隊列關聯起來,在綁定的時候通常會指定一個綁定鍵 (BindingKey), 這樣 RabbitMQ 就知道如何正確地將消息路由到隊列了。
生產者將消息發送給交換器時, 須要一個 RoutingKey, 當 BindingKey 和 RoutingKey 相匹配時, 消息會被路由到對應的隊列中。在綁定多個隊列到同一個交換器的時候, 這些綁定容許使用相同的 BindingKey。 BindingKey 並非在全部的狀況下都生效,它依賴於交換器類型, 比 如 fanout 類型的交換器就會無視 BindingKey, 而是將消息路由到全部綁定到該交換器的隊列中 。
每一個Vhost本質上是一個mini版的RabbitMQ服務器,擁有本身的隊列,交換機和綁定,它擁有本身的權限機制。
RabbitMQ 經常使用的交換器類型有 fanout、 direct、 topic、 headers 這四種。
它會把全部發送到該交換器的消息路由到全部與該交換器綁定的隊列中,至關於廣播模式。
把消息路由到那些 BindingKey 和 RoutingKey徹底匹配的隊列中。
將消息路由到 BindingKey 和 RoutingKey 相匹配的隊列中。
headers 類型的交換器不依賴於路由鍵的匹配規則來路由消息,而是根據發送的消息內容中的 headers 屬性進行匹配。
package com.spring.hello.demo.mq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class RabbitProducer { private static final String EXCHANGE_NAME = "exchange_demo"; private static final String ROUTING_KEY = " routingkey_demo"; private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "192.168.93.131"; private static final int PORT = 5672;// RabbitMQ public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection();// 建立鏈接 Channel channel = connection.createChannel(); // 建立信道 //建立一個 type="direct"、持久化的、非自動刪除的交換器 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null); // 建立一個持久化、非排他的、非自動刪除的隊列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 將交換器與隊列經過路由鍵綁定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // 發送一條持久化的消息: Hello World! String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); // 關閉資源 channel.close(); connection.close(); } } package com.spring.hello.demo.mq; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Address; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class RabbitConsumer { private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "192.168.93.131"; private static final int PORT = 5672;// RabbitMQ public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Address[] addresses = new Address[] { new Address(IP_ADDRESS, PORT) }; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); // 這裏的鏈接方式與生產者的 demo 略有不一樣, 注意辨別區別 Connection connection = factory.newConnection(addresses); // 建立鏈接 final Channel channel = connection.createChannel(); // 建立信道 channel.basicQos(64); // 設置客戶端最多接收未被 ack 的消息的個數 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println("recv message: " + new String(body)); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } //channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, consumer); // 等待回調函數執行完畢以後, 關閉資源 TimeUnit.SECONDS.sleep(15); channel.close(); connection.close(); } }
exchange: 交換器的名稱。
type: 交換器的類型,常見的如 fanout、 direct、 topic
durable: 設置是否持久化。 durable 設置爲 true 表示持久化, 反之是非持久化。持 久化能夠將交換器存盤,在服務器重啓 的時候不會丟失相關信息。
autoDelete: 設置是否自動刪除。 autoDelete 設置爲 true 則表示自動刪除。自動 刪除的前提是至少有一個隊列或者交換器與這個交換器綁定, 以後全部與這個交換器綁
定的隊列或者交換器都與此解綁。注意不能錯誤地把這個參數理解爲: "當與此交換器 鏈接的客戶端都斷開時, RabbitMQ 會自動刪除本交換器"。
internal : 設置是不是內置的。若是設置爲 true,則表示是內置的交換器,客戶端程 序沒法直接發送消息到這個交換器中,只能經過交換器路由到交換器這種方式。
argument: 其餘一些結構化參數,好比 alternate-exchange (備份交換機)
Queue. DeclareOk queueDeclare (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<Str工ng , Object> arguments) throws IOException;
queue : 隊列的名稱。
durable: 設置是否持久化。爲 true 則設置隊列爲持久化。持久化的隊列會存盤,在 服務器重啓的時候能夠保證不丟失相關信息。
exclusive: 設置是否排他。爲 true 則設置隊列爲排他的。若是一個隊列被聲明爲排 他隊列,該隊列僅對首次聲明它的鏈接可見,並在鏈接斷開時自動刪除。這裏須要注意三點:排他隊列是基於鏈接(Connection) 可見的,同一個鏈接的不一樣信道 (Channel) 是能夠同時訪問同一鏈接建立的排他隊列; "首次"是指若是一個鏈接己經聲明瞭一個排他隊列,其餘鏈接是不容許創建同名的排他隊列的,這個與普通隊列不一樣:即便該隊列是持久化的,一旦鏈接關閉或者客戶端退出,該排他隊列都會被自動刪除,這種隊列 適用於一個客戶端同時發送和讀取消息的應用場景。
autoDelete: 設置是否自動刪除。爲 true 則設置隊列爲自動刪除。自動刪除的前提是: 至少有一個消費者鏈接到這個隊列,以後全部與這個隊列鏈接的消費者都斷開時,纔會自動刪除。不能把這個參數錯誤地理解爲: "當鏈接到此隊列的全部客戶端斷開時,這 個隊列自動刪除",由於生產者客戶端建立這個隊列,或者沒有消費者客戶端與這個隊列鏈接時,都不會自動刪除這個隊列。
argurnents: 設置隊列的其餘一些參數,如 x-rnessage-ttl 、 x-expires 、x-rnax-length、x-rnax-length-bytes 、x-dead-letter-exchange、x-dead-letter-routing-key, x-rnax-priority 等。
將隊列和交換器綁定起來
Queue . BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
queue: 隊列名稱:
exchange: 交換器的名稱:
routingKey: 用來綁定隊列和交換器的路由鍵;
argument: 定義綁定的一些參數。
將交換器與交換器綁定,綁定以後, 消息從 source 交 換器轉發到 destination 交換器,某種程度上來講 destination 交換器能夠看做一個隊列。
Exchange . BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
channel . exchangeDeclare( "source" , "direct", false, true, null) ; channel . exchangeDeclare("destination", "fanout" , false, true, null); channel.exchangeBind("destination" , "source" , "exKey"); channel . queueDeclare( "queue", false, false, true, null); channel . queueBind("queue" , " dest工nation " , "") ; channel.basicPublish( "source" , "exKey" , nul l , "exToExDemo". getBytes ()) ;
void basicPublish(String exchange, String routingKey, boo1ean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
exchange: 交換器的名稱,指明消息須要發送到哪一個交換器中 。 若是設置爲空字符串, 則消息會被髮送到 RabbitMQ 默認的交換器中。
routingKey: 路由鍵,交換器根據路由鍵將消息存儲到相應的隊列之中 。
props : 消息的基本屬性集,其包含 14 個屬性成員,分別有 contentType 、contentEncoding、 headers (Map<String , Object>) 、 deliveryMode、 priority、correlationld、 replyTo、 expiration、 messageld、 timestamp、 type、 userld、appld、 clusterld。
byte [] body: 消息體 (payload),真正須要發送的消息。
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String ,Object> arguments, Consumer callback) throws IOException;
queue: 隊列的名稱:
autoAck: 設置是否自動確認。建議設成 false,即不自動確認:
consumerTag: 消費者標籤,用來區分多個消費者:
noLocal : 設置爲 true 則表示不能將同一個 Connection中生產者發送的消息傳送給 這個 Connection 中的消費者:
exclusive : 設置是否排他:
arguments : 設置消費者的其餘參數:
callback: 設置消費者的回調函數。用來處理 RabbitMQ 推送過來的消息,好比 DefaultConsumer, 使用時須要客戶端重寫 (override) 其中的方法。
對於消費者客戶端來講重寫 handleDelivery 方法是十分方便的。更復雜的消費者客戶端會重寫更多的方法, 具體以下:
void handleConsumeOk(String consumerTag) ;
void handleCancelOk(String consumerTag);
void handleCancel(String consumerTag) throws IOException;
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) ;
void handleRecoverOk(String consumerTag);
容許限制信道上的消費者所能保持的最大 未確認消息的數量。
void basicQos(int prefetchSize, int prefetchCount, boo1ean global)
prefetchSize 這個參數表示消費者所能接收未確認消息的整體大小的上限,單位爲 B
prefetchSize 以確保發送的消息都沒有超過所限定的 prefetchCount 的值
例子:
channel .basicQos(3, false); // Per consumer limit
channel.basicQos(5, true); // Per channel limit
channel .basicConsume("queuel", false, consumerl) ;
channel.basicConsume("queue2", false, consumer2) ;
那麼這裏每一個消費者最多隻能收到 3 個未確認的消息,兩個消費者能收到的未確認的消息個數之和的上限爲 5。在未確認消息的狀況下,若是 consumerl 接收到了消息 1 、 2 和 3,那麼 consumer2至多隻能收到 11 和 12。若是像這樣同時使用兩種 global 的模式,則會增長 RabbitMQ 的負載,由於 RabbitMQ 須要更多的資源來協調完成這些限制。如無特殊須要,最好只使用 global 爲 false 的設置,這也是默認的設置。
採用消息確認機制後,只要設置 autoAck 參數爲 false,消費者就有足夠的時間處理消息 (任務),不用擔憂處理消息過程當中消費者進程掛掉後消息丟失的問題, 由於 RabbitMQ 會一直 等待持有消息直到消費者顯式調用 Basic.Ack 命令爲止。若是 RabbitMQ 一直沒有收到消費者的確認信號,而且消費此消息的消費者己經 斷開鏈接,則 RabbitMQ 會安排該消息從新進入隊列,等待投遞給下一個消費者,固然也有可 能仍是原來的那個消費者。當 autoAck 等於 true 時, RabbitMQ 會自動把發送出去的 消息置爲確認,而後從內存(或者磁盤)中刪除,而無論消費者是否真正地消費到了這些消息。
消息拒絕:
void basicReject(long deliveryTag, boolean requeue) throws IOException;
其中 deliveryTag 能夠看做消息的編號 ,它是一個 64 位的長整型值,最大值是 9223372036854775807。若是 requeue 參數設置爲 true,則 RabbitMQ 會從新將這條消息存入 隊列,以即可以發送給下一個訂閱的消費者;若是 requeue 參數設置爲 false,則 RabbitMQ 當即會把消息從隊列中移除,而不會把它發送給新的消費者。
Basic.Reject 命令一次只能拒絕一條消息 ,若是想要批量拒絕消息 ,則可使用 Basic.Nack 這個命令。
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
其中 deliveryTag 和 requeue 的含義能夠參考 basicReject 方法。 multiple 參數設置爲 false 則表示拒絕編號爲 deliveryT坷的這一條消息,這時候 basicNack 和 basicReject 方法同樣; multiple 參數設置爲 true 則表示拒絕 deliveryTag 編號以前所 有未被當前消費者確認的消息。
消息恢復:
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
channel.basicRecover 方法用來請求 RabbitMQ 從新發送還未被確認的消息。 若是 requeue 參數設置爲 true,則未被確認的消息會被從新加入到隊列中,這樣對於同一條消息 來講,可能會被分配給與以前不一樣的消費者。若是 requeue 參數設置爲 false,那麼同一條消 息會被分配給與以前相同的消費者。默認狀況下,若是不設置 requeue 這個參數,至關於channel.basicRecover(true) ,即 requeue 默認爲 true