RabbitMQ是消息代理:它接受並轉發消息。能夠將它視爲郵局:當你投遞郵件到一個郵箱,郵遞員終究會將郵件遞交給你的收件人。以此類推,RabbitMQ能夠是一個郵箱,一個郵局和一個郵遞員。
RabbitMQ與郵局之間的主要區別在於,它不處理紙張,而是接收,存儲和轉發數據消息的二進制數據。java
2.1 生產者(producer)與消費者(consumer)
生產者與消費者是RabbitMQ通訊過程當中的兩個重要的角色,至關於郵件的發送方與接收方,而RabbitMQ充當的角色就是傳遞消息的第三方,也就是說它是不能產生數據的。在實際應用中,生產者和消費者也是能夠角色互相轉換的,因此當咱們應用程序鏈接到 RabbitMQ 服務器時,必需要明確我是生產者仍是消費者。緩存
2.2 鏈接(Connections)
生產者生產消息以後,發佈到RabbitMQ以前,須要先鏈接RabbitMQ服務器吧,RabbitMQ支持的全部協議都是基於TCP的,而且爲了提升效率而採用長期鏈接(每一個協議操做不會打開新鏈接)。一個客戶端庫鏈接使用單個TCP鏈接。服務器
2.3 通道(Channels)
創建完鏈接以後,客戶端會基於該TCP鏈接的基礎之上,開闢一條AMQP通道,客戶端執行的每一個協議操做都在通道上發生,通道存在於鏈接中,而不是單獨存在的。關閉鏈接後,其上的全部通道也將關閉。
咱們也能夠在一條鏈接中開闢一條或多條AMQP通道,每條通道都會有一個惟一的ID來確保通道之間互不干預
網絡
2.4 隊列(Queues)
隊列是RabbitMQ服務器中消息的終點,相似於一個郵箱,能夠緩存消息;生產者向其中投遞消息,消費者從其中取出消息。負載均衡
2.5 交換機(Exchanges)
交換機用於接收生產者發送的消息,而且能處理消息,例如將消息遞交給某個隊列、遞交給全部隊列、或者將消息丟棄,它主要應用的有三種類型:
a、Fanout:廣播(扇形),將消息交給全部綁定到交換機的隊列
b、Direct:定向(直連),把消息交給符合指定routing_key 的隊列
c、Topic:通配符(主題),把消息交給符合routing pattern(路由模式) 的隊列3d
2.1 simple簡單模式
生產者與消費者爲一對一的關係,一條消息只能被一個消費者消費,若是此時沒有消費者,那麼消息會被暫時緩存在queue中,直至被消費代理
2.2 work工做模式
在work模型下,生產者與消費者爲一對多的關係,多個消費者能夠綁定到一個隊列,共同消費隊列中的消息code
queue能夠實現負載均衡,生產者將消息放入queue中後,RabbitMQ會經過輪詢的方式,實現消息的分配blog
經過消息確認機制(Acknowlege)實現。當消費者獲取消息後,會向RabbitMQ發送回執ACK,告知消息已經被接收。不過這種回執ACK分兩種狀況:rabbitmq
手動ACK:消息接收後,不會發送ACK,須要手動調用
這就要取決於消息的重要性,若是消息不過重要,丟失也沒有影響,那麼自動ACK會比較方便,反之,若是消息很是重要,不容丟失。那麼最好在消費完成後手動ACK,不然接收消息後就自動ACK,RabbitMQ就會把消息從隊列中刪除,若是此時消費者宕機,那麼消息就丟失了。
2.3 訂閱模型-Fanout
X表示交換機;在廣播模式下,消息發送流程是這樣的:
2.4 訂閱模型-Direct
在Direct模型下:
routing_key
(路由key)routing_key
。routing_key
進行判斷,只有隊列的routing_key
與消息的 routing_key
徹底一致,纔會接收到消息 2.5 訂閱模型-Topic
Topic類型的Exchange與Direct相比,都是能夠根據routing_key把消息路由到不一樣的隊列。只不過Topic類型Exchange可讓隊列在綁定routing_key的時候使用通配符!通配符規則:
#
:能夠替代零個或多個單詞。
*
:能夠代替一個單詞
圖解:
*.orange.*
的routing_key,它能匹配到以orange爲中心,先後各一個單詞的路由,例如:item.orange.insert
,可是該item.orange.insert.one
路由就不能被匹配C2:消費者,其所在隊列Q2綁定了lazy.#
的routing_key,它能匹配到任何以lazy.#
開頭的路由
//獲取鏈接 Connection connection = ConnectionUtil.getConnection(); //創建通道 Channel channel = connection.createChannel(); //交換機持久化 ,第三個參數表明是否持久化 channel.exchangeDeclare(EXCHANGE_NAME,"EXCHANGE_TYPE",true); //隊列持久化 ,第二個參數表明是否持久化 channel.queueDeclare(QUEUE_NAME,true,false,false,null); //消息持久化 ,第三個參數表明是否持久化 channel.basicPublish(EXCHANGE_NAME,routing_key,MessageProperties.PERSISTENT_TEXT_PLAIN,MESSAGE_TEXT.getBYtes());