消息隊列中間件是指利用高效可靠地消息傳遞機制傳遞消息。有兩種傳遞模式:點對點模式、發佈/訂閱模式。流行的消息中間件有RabblitMQ、Kafka、RockerMQ。它們都提供了基於存儲和轉發的應用程序之間的異步數據發送,即應用程序彼此不直接通訊,而是與做爲中介的消息中間件通訊。數組
RabbitMQ的總體模型架構如圖。RabbitMQ的組成由 生產者、交換器、綁定、隊列、消費者組成。服務器
接受客戶端鏈接,實現AMQP消息隊列和路由功能的進程。架構
實際上是一個虛擬概念,相似於權限控制組,一個Virtual Host裏面能夠有若干個Exchange和Queue,可是權限控制的最小粒度是Virtual Host異步
生產者和消費者都須要和RabbitMQ Broker創建鏈接,鏈接是TCP鏈接。一旦TCP鏈接創建起來,客戶端緊接着建立一個AMQP信道(Channel)。信道是創建在TCP Connection之上的虛擬鏈接,RabbitMQ處理每條AMQP指令都是經過信道完成的。由於創建和銷燬TCP鏈接開銷大,因此選擇TCP鏈接複用,減小開銷。ui
生產者:創造消息,發佈到RabbitMQ中。消息包含兩個部分:標籤和消息體。標籤是爲了描述這條消息,生產者把消息交由RabbitMQ以後會根據標籤把消息發送給感興趣的消費者。在消息路由的過程當中,消息的標籤會丟棄,存入到隊列中的消息只有消息體,spa
消費者:接受消息。消費者鏈接到RabbitMQ 服務器,並訂閱到隊列上。消費者只能獲得消息體,也就不知道消息的生產者是誰。code
路由鍵:生產者將消息發送給交換器的時候,指定RoutingKey.中間件
綁定鍵:經過綁定鍵將交換器和隊列聯繫起來。以下圖blog
生產者將消息發送到交換器,由交換器將消息路由到一個或多個對列中。若是路由不到,直接丟棄消息或者返回給生產者。隊列是生產者和消費者傳遞消息的一箇中介,全部消息都必須經過交換器將消息放入隊列中,不能直接將消息放到隊列中。隊列
交換器類型:
Fanout:會將全部發送到交換器的消息路由到全部與改交換器綁定的隊列中。這種狀況BingKey和RoutingKey至關於不起做用。
Direct:會把消息路由到BindingKey和RoutingKey徹底匹配的隊列中。以下圖 消息只會進入隊列一中。
Topic:會按照必定規則將BindingKey和RoutingKey相匹配的隊列中。BindingKey能夠存在兩種特殊字符串"*"和"#"。「#」號用於匹配一個單詞,「*」匹配多個單詞。
Headers:Headers類型的交換器不依賴於路由鍵的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
路由鍵、交換器類型、綁定鍵三者共同決定了消息進入哪些隊列中。
用於存儲消息。RabbitMQ中消息只能存儲在隊列後中。多個消費者能夠訂閱同一個隊列,隊列中的消息會被平均分攤給多個消費者。RabbitMQ 不支持隊列層面的廣播消費。
附上一張完整的結構圖:
在生產者發送消息到消費者消費消息的流程中,有兩個地方須要消費確認:
爲了保證,生產者的消息到達RabbitMQ,能夠經過事務機制和發送方確認機制實現。
Channel.TxSelect 將當前信道設置成事務模式
Channel.TxCommit 提交事務
Channel.TxRollback 事務回滾
生產者將信道設置成confirm(確認)模式,一旦信道進入confirm模式,全部在該信道上面發佈的消息都會被指派一個惟一的ID(從l開始),一旦消息被投遞到全部匹配的隊列以後,RabbitMQ 就會發送一個確認(Basic.Ack) 給生產者(包含消息的惟一ID) ,這就使得生產者知曉消息已經正確到達了目的地了(如上圖的流程1)。若是消息和隊列是可持久化的,那麼確認消息會在消息寫入磁盤以後發出。
生產者調用channel.ConfirmSelect將信道設置爲confirm模式,事務機制和Publisher confirm機制確保的是消息可以正確地發送至RabbitMQ,這裏的「發送至RabbitMQ」的含義指消息被正確地發送到交換器。
事務機制在一條消息發送以後會使發送端阻塞,以等待RabbitMQ 的迴應,以後才能繼續發送下一條消息。相比之下, 發送方確認機制最大的好處在於它是異步的,一旦發佈一條消息,生產者應用程序就能夠在等信道返回確認的同時繼續發送下一條消息,當消息最終獲得確認以後,生產者應用程序即可以經過回調方法來處理該確認消息。
爲了保證,隊列中發出的消息被消費者消費,RabbitMQ提供了消息確認機制。
消費者訂閱隊列時,能夠指定autoAck參數,autoAck等於false,RabbitMQ會等待消費者顯示地回覆確認信號才能從隊列後中刪除(如上圖的流程2)。autoAck等於true,會在消息發送去後刪除,無論消費者是否真正消費到這條消息。當autoAck 參數置爲false ,對於RabbitMQ 服務端而言,隊列中的消息分紅了兩個部分:一部分是等待投遞給消費者的消息、一部分是己經投遞給消費者,可是尚未收到消費者確認信號的消息。
若是RabbitMQ 一直沒有收到消費者的確認信號,而且消費此消息的消費者己經斷開鏈接,則RabbitMQ 會安排該消息從新進入隊列,等待投遞給下一個消費者,固然也有可能仍是原來的那個消費者。RabbitMQ 不會爲未確認的消息設置過時時間,它判斷此消息是否須要從新投遞給消費者的惟一依據是消費該消息的消費者鏈接是否己經斷開。
ConnectionFactory factory = new ConnectionFactory { UserName = "admin", Password = "admin", HostName = "118.21.96.213" }; var connection = factory.CreateConnection();
var channel = connection.CreateModel();
void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments); //聲明交換器後,不須要等待交換器返回。但若是服務器未完成建立,而客戶端使用了這個交換器,會發生異常。 void ExchangeDeclareNoWait(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments);
void ExchangeDelete(string exchange, bool ifUnused); void ExchangeDeleteNoWait(string exchange, bool ifUnused);
QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);
void QueueDeclareNoWait(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);
若是隊列爲排他隊列,該隊列僅對首次聲明它的鏈接可見,並在鏈接斷開時自動刪除。排他隊列是基於鏈接(Connection)可見的,同一個鏈接的不一樣信道(Channel)是能夠同事訪問同一鏈接建立的排他隊列。 「首次」是指若是一個鏈接已經聲明瞭一個排他隊列,其餘鏈接是不容許創建同名的排他隊列的。即便該隊列是持久化的,一旦鏈接關閉或者客戶端退出,改排他隊列都會被自動刪除。
自動刪除的前提是:至少有一個消費者鏈接到這個隊列,以後全部與這個隊列鏈接的消費者都斷開時,纔會自動刪除。
//返回隊列刪除期間清除的消息數
uint QueueDelete(string queue, bool ifUnused, bool ifEmpty); void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty);
void QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments); void QueueBindNoWait(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);
void ExchangeBind(string destination, string source, string routingKey, IDictionary<string, object> arguments); void ExchangeBindNoWait(string destination, string source, string routingKey, IDictionary<string, object> arguments);
void ExchangeUnbind(string destination, string source, string routingKey, IDictionary<string, object> arguments);
void ExchangeUnbindNoWait(string destination, string source, string routingKey, IDictionary<string, object> arguments);
消息從source交換器發送到destination交換器中。
void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, byte[] body);