我是在解決分佈式事務的一致性問題時瞭解到RabbitMQ的,當時主要是要基於RabbitMQ來實現咱們分佈式系統之間對有事務可靠性要求的系統間通訊的。關於分佈式事務一致性問題及其常見的解決方案,能夠看我另外一篇博客。提到RabbitMQ,不難想到的幾個關鍵字:消息中間件、消息隊列。而消息隊列不禁讓我想到,當時在大學學習操做系統這門課,消息隊列不難想到生產者消費者模式。(PS:操做系統這門課程真的很好也很重要,其中的一些思想在我工做的很長一段一時間內給了我很大幫助和啓發,給我提供了許多解決問題的思路。強烈建議每個程序員都去學一學操做系統!)html
分佈式系統消息中間件——RabbitMQ的使用進階篇docker
消息中間件也能夠稱消息隊列,是指用高效可靠的消息傳遞機制進行與平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息隊列模型,能夠在分佈式環境下擴展進程的通訊。當下主流的消息中間件有RabbitMQ、Kafka、ActiveMQ、RocketMQ等。其能在不一樣平臺之間進行通訊,經常使用來屏蔽各類平臺協議之間的特性,實現應用程序之間的協同。其優勢在於可以在客戶端和服務器之間進行同步和異步的鏈接,而且在任什麼時候刻均可以將消息進行傳送和轉發。是分佈式系統中很是重要的組件,主要用來解決應用耦合、異步通訊、流量削峯等問題。json
消息中間件幾大主要做用以下:c#
P2P模式包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。每一個消息都被髮送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留着消息,直到他們被消費或超時。數組
P2P的特色:安全
Pub/Sub模式包含三個角色主題(Topic),發佈者(Publisher),訂閱者(Subscriber) 。多個發佈者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。服務器
Pub/Sub的特色網絡
Kafka是LinkedIn開源的分佈式發佈-訂閱消息系統,目前歸屬於Apache定級項目。Kafka主要特色是基於Pull的模式來處理消息消費,追求高吞吐量,一開始的目的就是用於日誌收集和傳輸。0.8版本開始支持複製,不支持事務,對消息的重複、丟失、錯誤沒有嚴格要求,適合產生大量數據的互聯網服務的數據收集業務。
RabbitMQ是使用Erlang語言開發的開源消息隊列系統,基於AMQP協議來實現。AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。AMQP協議更多用在企業系統內,對數據一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。
RocketMQ是阿里開源的消息中間件,它是純Java開發,具備高吞吐量、高可用性、適合大規模分佈式系統應用的特色。RocketMQ思路起源於Kafka,但並非Kafka的一個Copy,它對消息的可靠傳輸及事務性作了優化,目前在阿里集團被普遍應用於交易、充值、流計算、消息推送、日誌流式處理、binglog分發等場景。
RabbitMQ比Kafka可靠,kafka更適合IO高吞吐的處理,通常應用在大數據日誌處理或對實時性(少許延遲),可靠性(少許丟數據)要求稍低的場景使用,好比ELK日誌收集。
RabbitMQ是流行的開源消息隊列系統。RabbitMQ是AMQP(高級消息隊列協議)的標準實現。支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。是使用Erlang編寫的一個開源的消息隊列,自己支持不少的協議:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它變的很是重量級,更適合於企業級的開發。同時實現了一個Broker構架,這意味着消息在發送給客戶端時先在中心隊列排隊。對路由(Routing),負載均衡(Load balance)或者數據持久化都有很好的支持。其主要特色以下:
RabbitMQ從總體上來看是一個典型的生產者消費者模型,主要負責接收、存儲和轉發消息。其總體模型架構以下圖所示:
咱們先來看一個RabbitMQ的運轉流程,稍後會對這個流程中所涉及到的一些概念進行詳細的解釋。
生產者:
(1)生產者鏈接到RabbitMQ Broker,創建一個鏈接( Connection)開啓一個信道(Channel)
(2)生產者聲明一個交換器,並設置相關屬性,好比交換機類型、是否持久化等
(3)生產者聲明一個隊列井設置相關屬性,好比是否排他、是否持久化、是否自動刪除等
(4)生產者經過路由鍵將交換器和隊列綁定起來
(5)生產者發送消息至RabbitMQ Broker,其中包含路由鍵、交換器等信息。
(6)相應的交換器根據接收到的路由鍵查找相匹配的隊列。
(7)若是找到,則將從生產者發送過來的消息存入相應的隊列中。
(8)若是沒有找到,則根據生產者配置的屬性選擇丟棄仍是回退給生產者
(9)關閉信道。
(10)關閉鏈接。'
消費者:
(1)消費者鏈接到RabbitMQ Broker ,創建一個鏈接(Connection),開啓一個信道(Channel) 。
(2)消費者向RabbitMQ Broker 請求消費相應隊列中的消息,可能會設置相應的回調函數,
(3)等待RabbitMQ Broker 迴應並投遞相應隊列中的消息,消費者接收消息。
(4)消費者確認(ack) 接收到的消息。
(5)RabbitMQ 從隊列中刪除相應己經被確認的消息。
(6)關閉信道。
(7)關閉鏈接。
這裏咱們主要討論兩個問題:
爲什麼要有信道?
主要緣由仍是在於TCP鏈接的"昂貴"性。不管是生產者仍是消費者,都須要和RabbitMQ Broker 創建鏈接,這個鏈接就是一條TCP 鏈接。而操做系統對於TCP鏈接的建立與銷燬是很是昂貴的開銷。假設消費者要消費消息,並根據服務需求合理調度線程,若只進行TCP鏈接,那麼當高併發的時候,每秒可能都有成千上萬的TCP鏈接,不只僅是對TCP鏈接的浪費,也很快會超過操做系統每秒所能創建鏈接的數量。若是能在一條TCP鏈接上操做,又能保證各個線程之間的私密性就完美了,因而信道的概念出現了。
信道爲什麼?
信道是創建在Connection 之上的虛擬鏈接。當應用程序與Rabbit Broker創建TCP鏈接的時候,客戶端緊接着能夠建立一個AMQP 信道(Channel) ,每一個信道都會被指派一個惟一的ID。RabbitMQ 處理的每條AMQP 指令都是經過信道完成的。信道就像電纜裏的光纖束。一條電纜內含有許多光纖束,容許全部的鏈接經過多條光線束進行傳輸和接收。
關於生產者消費者咱們須要瞭解幾個概念:
從RabbitMQ的運轉流程咱們能夠知道生產者的消息是發佈到交換器上的。而消費者則是從隊列上獲取消息的。那麼消息究竟是如何從交換器到隊列的呢?咱們先具體瞭解一下這幾個概念。
Queue:隊列,是RabbitMQ的內部對象,用於存儲消息。RabbitMQ中消息只能存儲在隊列中。生產者投遞消息到隊列,消費者從隊列中獲取消息並消費。多個消費者能夠訂閱同一個隊列,這時隊列中的消息會被平均分攤(輪詢)給多個消費者進行消費,而不是每一個消費者都收到全部的消息進行消費。(注意:RabbitMQ不支持隊列層面的廣播消費,若是須要廣播消費,能夠採用一個交換器經過路由Key綁定多個隊列,由多個消費者來訂閱這些隊列的方式。)
Exchange:交換器。在RabbitMQ中,生產者並不是直接將消息投遞到隊列中。真實狀況是,生產者將消息發送到Exchange(交換器),由交換器將消息路由到一個或多個隊列中。若是路由不到,或返回給生產者,或直接丟棄,或作其它處理。
RoutingKey:路由Key。生產者將消息發送給交換器的時候,通常會指定一個RoutingKey,用來指定這個消息的路由規則。這個路由Key須要與交換器類型和綁定鍵(BindingKey)聯合使用才能最終生效。在交換器類型和綁定鍵固定的狀況下,生產者能夠在發送消息給交換器時經過指定RoutingKey來決定消息流向哪裏。
Binding:RabbitMQ經過綁定將交換器和隊列關聯起來,在綁定的時候通常會指定一個綁定鍵,這樣RabbitMQ就能夠指定如何正確的路由到隊列了。
從這裏咱們能夠看到在RabbitMQ中交換器和隊列實際上能夠是一對多,也能夠是多對多關係。交換器和隊列就像咱們關係數據庫中的兩張表。他們同歸BindingKey作關聯(多對多關係表)。在咱們投遞消息時,能夠經過Exchange和RoutingKey(對應BindingKey)就能夠找到相對應的隊列。
RabbitMQ主要有四種類型的交換器:
fanout:扇形交換器,它會把發送到該交換器的消息路由到全部與該交換器綁定的隊列中。若是使用扇形交換器,則不會匹配路由Key。
direct:direct交換器,會把消息路由到RoutingKey與BindingKey徹底匹配的隊列中。
topic:徹底匹配BindingKey和RoutingKey的direct交換器 有些時候並不能知足實際業務的需求。topic 類型的交換器在匹配規則上進行了擴展,它與direct 類型的交換器類似,也是將消息路由到BindingKey 和RoutingKey 相匹配的隊
列中,但這裏的匹配規則有些不一樣,它約定:
如圖:
· 路由鍵爲" apple.rabbit.client" 的消息會同時路由到Queuel 和Queue2;
· 路由鍵爲" orange.mq.client" 的消息只會路由到Queue2 中:
· 路由鍵爲" apple.mq.demo" 的消息只會路由到Queue2 中:
· 路由鍵爲" banana.rabbit.demo" 的消息只會路由到Queuel 中:
· 路由鍵爲" orange.apple.banana" 的消息將會被丟棄或者返回給生產者由於它沒有匹配任何路由鍵。
瞭解了上面的概念,咱們再來思考消息是如何從交換器到隊列的。首先Rabbit在接收到消息時,會解析消息的標籤從而獲得消息的交換器與路由key信息。而後根據交換器的類型、路由key以及該交換器和隊列的綁定關係來決定消息最終投遞到哪一個隊列裏面。
這裏咱們基於docker來安裝。
docker pull rabbitmq:management
docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
這裏咱們以dotnet平臺下RabbitMQ.Client3.6.9(能夠從nuget中下載)爲示例,簡單介紹dotnet平臺下對RabbitMQ的簡單操做。更詳細的內容能夠從nuget中下載源碼和文檔進行查看。
ConnectionFactory factory = new ConnectionFactory(); factory.UserName = "admin";//用戶名 factory.Password = "admin";//密碼 factory.HostName = "192.168.17.205";//主機名 factory.VirtualHost = "";//虛擬主機(這個暫時不須要,稍後的文章裏會介紹虛擬主機的概念) factory.Port = 15672;//端口 IConnection conn = factory.CreateConnection();//建立鏈接
IModel channel = conn.CreateModel();
說明:Connection 能夠用來建立多個Channel 實例,可是Channel 實例不能在線程間共享,應用程序應該爲每個線程開闢一個Channel 。某些狀況下Channel 的操做能夠併發運行,可是在其餘狀況下會致使在網絡上出現錯誤的通訊幀交錯,同時也會影響友送方確認( publisherconfrrm)機制的運行,因此多線程問共享Channel實例是非線程安全的。
channel.ExchangeDeclare("exchangeName", "direct", true); String queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queueName, "exchangeName", "routingKey");
如上建立了一個持久化的、非自動刪除的、綁定類型爲direct 的交換器,同時也建立了一個非持久化的、排他的、自動刪除的隊列(此隊列的名稱由RabbitMQ 自動生成)。這裏的交換器和隊列也都沒有設置特殊的參數。
上面的代碼也展現瞭如何使用路由鍵將隊列和交換器綁定起來。上面聲明的隊列具有以下特性: 只對當前應用中同一個Connection 層面可用,同一個Connection 的不一樣Channel可共用,而且也會在應用鏈接斷開時自動刪除。
上述方法根據參數不一樣,能夠有不一樣的重載形式,根據自身的須要進行調用。
ExchangeDeclare方法詳解:
ExchangeDeclare有多個重載方法,這些重載方法都是由下面這個方法中缺省的某些參數構成的。
void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments);
QueueDeclare方法詳解:
QueueDeclare只有兩個重載。
QueueDeclareOk QueueDeclare(); QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);
不帶任何參數的queueDeclare 方法默認建立一個由RabbitMQ 命名的(相似這種amq.gen-LhQzlgv3GhDOv8PIDabOXA 名稱,這種隊列也稱之爲匿名隊列〉、排他的、自動刪除的、非持久化的隊列。
注意:生產者和消費者都可以使用queueDeclare 來聲明一個隊列,可是若是消費者在同一個信道上訂閱了另外一個隊列,就沒法再聲明隊列了。必須先取消訂閱,而後將信道直爲"傳輸"模式,以後才能聲明隊列。
QueueBind 方法詳解:
將隊列和交換器綁定的方法以下:
void QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);
將隊列與交換器解綁的方法以下:
QueueUnbind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);
其參數與綁定意義相同。
注:除隊列能夠綁定交換器外,交換器一樣能夠綁定隊列。即:ExchangeBind方法,其使用方式與隊列綁定類似。
發送消息可使用BasicPublish方法。
void BasicPublish(string exchange, string routingKey, bool mandatory,IBasicProperties basicProperties, byte[] body);
RabbitMQ 的消費模式分兩種: 推(Push)模式和拉(Pull)模式。推模式採用BasicConsume
進行消費,而拉模式則是調用BasicGet進行消費。
推模式:
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);//定義消費者對象 consumer.Received += (model, ea) => { //do someting; channel.BasicAck(ea.DeliveryTag, multiple: false);//確認 }; channel.BasicConsume(queue: "queueName", noAck: false, consumer: consumer);//訂閱消息
string BasicConsume(string queue, bool noAck, string consumerTag, bool noLocal, bool exclusive, IDictionary<string, object> arguments, IBasicConsumer consumer);
拉模式
BasicGetResult result = channel.BasicGet("queueName", noAck: false);//獲取消息 channel.BasicAck(result.DeliveryTag, multiple: false);//確認
在應用程序使用完以後,須要關閉鏈接,釋放資源:
channel.close(); conn.close() ;
顯式地關閉Channel 是個好習慣,但這不是必須的,在Connection 關閉的時候,Channel 也會自動關閉。
以上簡單介紹了分佈式系統中消息中間件的概念與做用,以及RabbitMQ的一些基本概念與簡單使用。下一篇文章將繼續針對RabbitMQ進行總結。主要內容包括什麼時候建立隊列、RabbitMQ的確認機制、過時時間的使用、死信隊列、以及利用RabbitMQ實現延遲隊列......
《RabbitMQ實戰指南》
《RabbitMQ實戰 高效部署分佈式消息隊列》