分佈式系統消息中間件——RibbitMQ的使用基礎篇

前言

我是在解決分佈式事務的一致性問題時瞭解到RabbitMQ的,當時主要是要基於RabbitMQ來實現咱們分佈式系統之間對有事務可靠性要求的系統間通訊的。關於分佈式事務一致性問題及其常見的解決方案,能夠看我另外一篇博客。提到RabbitMQ,不難想到的幾個關鍵字:消息中間件、消息隊列。而消息隊列不禁讓我想到,當時在大學學習操做系統這門課,消息隊列不難想到生產者消費者模式。(PS:操做系統這門課程真的很好也很重要,其中的一些思想在我工做的很長一段一時間內給了我很大幫助和啓發,給我提供了許多解決問題的思路。強烈建議每個程序員都去學一學操做系統!)html

clipboard.png

一 消息中間件

1.1 簡介程序員

消息中間件也能夠稱消息隊列,是指用高效可靠的消息傳遞機制進行與平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息隊列模型,能夠在分佈式環境下擴展進程的通訊。當下主流的消息中間件有RabbitMQ、Kafka、ActiveMQ、RocketMQ等。其能在不一樣平臺之間進行通訊,經常使用來屏蔽各類平臺協議之間的特性,實現應用程序之間的協同。其優勢在於可以在客戶端和服務器之間進行同步和異步的鏈接,而且在任什麼時候刻均可以將消息進行傳送和轉發。是分佈式系統中很是重要的組件,主要用來解決應用耦合、異步通訊、流量削峯等問題。面試

1.2 做用docker

消息中間件幾大主要做用以下:數據庫

  • 解耦
  • 冗餘(存儲)
  • 擴展性
  • 削峯
  • 可恢復性
  • 順序保證
  • 緩衝
  • 異步通訊

1.3 消息中間件的兩種模式json

1.3.1 P2P模式數組

P2P模式包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。每一個消息都被髮送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留着消息,直到他們被消費或超時。安全

P2P的特色:性能優化

  • 每一個消息只有一個消費者(Consumer)(即一旦被消費,消息就再也不在消息隊列中)
  • 發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息以後,無論接收者有沒有正在運行它不會影響到消息被髮送到隊列
  • 接收者在成功接收消息以後需向隊列應答成功
  • 若是但願發送的每一個消息都會被成功處理的話,那麼須要P2P模式

1.3.2 Pub/Sub模式服務器

Pub/Sub模式包含三個角色主題(Topic),發佈者(Publisher),訂閱者(Subscriber) 。多個發佈者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。

Pub/Sub的特色

  • 每一個消息能夠有多個消費者
  • 發佈者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者以後,才能消費發佈者的消息。
  • 爲了消費消息,訂閱者必須保持運行的狀態。
  • 若是但願發送的消息能夠不被作任何處理、或者只被一個消息者處理、或者能夠被多個消費者處理的話,那麼能夠採用Pub/Sub模型。

1.4 經常使用中間件介紹與對比

  • Kafka是LinkedIn開源的分佈式發佈-訂閱消息系統,目前歸屬於Apache定級項目。Kafka主要特色是基於Pull的模式來處理消息消費,追求高吞吐量,一開始的目的就是用於日誌收集和傳輸。0.8版本開始支持複製,不支持事務,對消息的重複、丟失、錯誤沒有嚴格要求,適合產生大量數據的互聯網服務的數據收集業務。
  • RabbitMQ是使用Erlang語言開發的開源消息隊列系統,基於AMQP協議來實現。AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。AMQP協議更多用在企業系統內,對數據一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。
  • RocketMQ是阿里開源的消息中間件,它是純Java開發,具備高吞吐量、高可用性、適合大規模分佈式系統應用的特色。RocketMQ思路起源於Kafka,但並非Kafka的一個Copy,它對消息的可靠傳輸及事務性作了優化,目前在阿里集團被普遍應用於交易、充值、流計算、消息推送、日誌流式處理、binglog分發等場景。

RabbitMQ比Kafka可靠,kafka更適合IO高吞吐的處理,通常應用在大數據日誌處理或對實時性(少許延遲),可靠性(少許丟數據)要求稍低的場景使用,好比ELK日誌收集。

二 RabbitMQ瞭解

2.1 簡介

RabbitMQ是流行的開源消息隊列系統。RabbitMQ是AMQP(高級消息隊列協議)的標準實現。支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。是使用Erlang編寫的一個開源的消息隊列,自己支持不少的協議:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它變的很是重量級,更適合於企業級的開發。同時實現了一個Broker構架,這意味着消息在發送給客戶端時先在中心隊列排隊。對路由(Routing),負載均衡(Load balance)或者數據持久化都有很好的支持。其主要特色以下:

  • 可靠性
  • 靈活的路由
  • 擴展性
  • 高可用性
  • 多種協議
  • 多語言客戶端
  • 管理界面
  • 插件機制

2.2 概念

RabbitMQ從總體上來看是一個典型的生產者消費者模型,主要負責接收、存儲和轉發消息。其總體模型架構以下圖所示:

clipboard.png

咱們先來看一個RabbitMQ的運轉流程,稍後會對這個流程中所涉及到的一些概念進行詳細的解釋。

生產者:

(1)生產者鏈接到RabbitMQ Broker,創建一個鏈接( Connection)開啓一個信道(Channel)
(2)生產者聲明一個交換器,並設置相關屬性,好比交換機類型、是否持久化等
(3)生產者聲明一個隊列井設置相關屬性,好比是否排他、是否持久化、是否自動刪除等
(4)生產者經過路由鍵將交換器和隊列綁定起來
(5)生產者發送消息至RabbitMQ Broker,其中包含路由鍵、交換器等信息。
(6)相應的交換器根據接收到的路由鍵查找相匹配的隊列。
(7)若是找到,則將從生產者發送過來的消息存入相應的隊列中。
(8)若是沒有找到,則根據生產者配置的屬性選擇丟棄仍是回退給生產者
(9)關閉信道。
(10)關閉鏈接。

消費者:

(1)消費者鏈接到RabbitMQ Broker ,創建一個鏈接(Connection),開啓一個信道(Channel) 。
(2)消費者向RabbitMQ Broker 請求消費相應隊列中的消息,可能會設置相應的回調函數,
(3)等待RabbitMQ Broker 迴應並投遞相應隊列中的消息,消費者接收消息。
(4)消費者確認(ack) 接收到的消息。
(5)RabbitMQ 從隊列中刪除相應己經被確認的消息。
(6)關閉信道。
(7)關閉鏈接。

2.2.1 信道

這裏咱們主要討論兩個問題:

爲什麼要有信道?

主要緣由仍是在於TCP鏈接的"昂貴"性。不管是生產者仍是消費者,都須要和RabbitMQ Broker 創建鏈接,這個鏈接就是一條TCP 鏈接。而操做系統對於TCP鏈接的建立於銷燬是很是昂貴的開銷。假設消費者要消費消息,並根據服務需求合理調度線程,若只進行TCP鏈接,那麼當高併發的時候,每秒可能都有成千上萬的TCP鏈接,不只僅是對TCP鏈接的浪費,也很快會超過操做系統每秒所能創建鏈接的數量。若是能在一條TCP鏈接上操做,又能保證各個線程之間的私密性就完美了,因而信道的概念出現了。

信道爲什麼?

信道是創建在Connection 之上的虛擬鏈接。當應用程序與Rabbit Broker創建TCP鏈接的時候,客戶端緊接着能夠建立一個AMQP 信道(Channel) ,每一個信道都會被指派一個惟一的D。RabbitMQ 處理的每條AMQP 指令都是經過信道完成的。信道就像電纜裏的光纖束。一條電纜內含有許多光纖束,容許全部的鏈接經過多條光線束進行傳輸和接收。

2.2.2 生產者消費者

關於生產者消費者咱們須要瞭解幾個概念:

  • Producer:生產者,即消息投遞者一方。
  • 消息:消息通常分兩個部分:消息體(payload)和標籤。標籤用來描述這條消息,如:一個交換器的名稱或者一個路由Key,Rabbit經過解析標籤來肯定消息的去向,payload是消息內容可使一個json,數組等等。
  • Consumer:消費者,就是接收消息的一方。消費者訂閱RabbitMQ的隊列,當消費者消費一條消息時,只是消費消息的消息體。在消息路由的過程當中,會丟棄標籤,存入到隊列中的只有消息體。
  • Broker:消息中間件的服務節點。

2.2.3 隊列、交換器、路由key、綁定

從RabbitMQ的運轉流程咱們能夠知道生產者的消息是發佈到交換器上的。而消費者則是從隊列上獲取消息的。那麼消息究竟是如何從交換器到隊列的呢?咱們先具體瞭解一下這幾個概念。

Queue:隊列,是RabbitMQ的內部對象,用於存儲消息。RabbitMQ中消息只能存儲在隊列中。生產者投遞消息到隊列,消費者從隊列中獲取消息並消費。多個消費者能夠訂閱同一個隊列,這時隊列中的消息會被平均分攤(輪詢)給多個消費者進行消費,而不是每一個消費者都收到全部的消息進行消費。(注意:RabbitMQ不支持隊列層面的廣播消費,若是須要廣播消費,能夠採用一個交換器經過路由Key綁定多個隊列,由多個消費者來訂閱這些隊列的方式。)

Exchange:交換器。在RabbitMQ中,生產者並不是直接將消息投遞到隊列中。真實狀況是,生產者將消息發送到Exchange(交換器),由交換器將消息路由到一個或多個隊列中。若是路由不到,或返回給生產者,或直接丟棄,或作其它處理。

RoutingKey:路由Key。生產者將消息發送給交換器的時候,通常會指定一個RoutingKey,用來指定這個消息的路由規則。這個路由Key須要與交換器類型和綁定鍵(BindingKey)聯合使用才能最終生效。在交換器類型和綁定鍵固定的狀況下,生產者能夠在發送消息給交換器時經過指定RoutingKey來決定消息流向哪裏。

Binding:RabbitMQ經過綁定將交換器和隊列關聯起來,在綁定的時候通常會指定一個綁定鍵,這樣RabbitMQ就能夠指定如何正確的路由到隊列了。

從這裏咱們能夠看到在RabbitMQ中交換器和隊列實際上能夠是一對多,也能夠是多對多關係。交換器和隊列就像咱們關係數據庫中的兩張表。他們同歸BindingKey作關聯(多對多關係表)。在咱們投遞消息時,能夠經過Exchange和RoutingKey(對應BindingKey)就能夠找到相對應的隊列。

RabbitMQ主要有四種類型的交換器:

  • fanout:扇形交換器,它會把發送到該交換器的消息路由到全部與該交換器綁定的隊列中。若是使用扇形交換器,則不會匹配路由Key。

clipboard.png

  • direct:direct交換器,會把消息路由到RoutingKey與BindingKey徹底匹配的隊列中。

clipboard.png

  • topic:徹底匹配BindingKey和RoutingKey的direct交換器 有些時候並不能知足實際業務的需求。topic
    類型的交換器在匹配規則上進行了擴展,它與direct 類型的交換器類似,也是將消息路由到BindingKey 和RoutingKey相匹配的隊列中,但這裏的匹配規則有些不一樣,它約定:

1:RoutingKey 爲一個點號"."分隔的字符串(被點號"."分隔開的每一段獨立的字符串稱爲一個單詞)λ,如"hs.rabbitmq.client","com.rabbit.client"等。
2:BindingKey 和RoutingKey 同樣也是點號"."分隔的字符串;
3:BindingKey 中能夠存在兩種特殊字符串""和"#",用於作模糊匹配,其中""用於匹配一個單詞,"#"用於匹配多規格單詞(能夠是零個)。

clipboard.png

如圖:

​ · 路由鍵爲" apple.rabbit.client" 的消息會同時路由到Queuel 和Queue2;
​ · 路由鍵爲" orange.mq.client" 的消息只會路由到Queue2 中:
​ · 路由鍵爲" apple.mq.demo" 的消息只會路由到Queue2 中:
​ · 路由鍵爲" banana.rabbit.demo" 的消息只會路由到Queuel 中:
​ · 路由鍵爲" orange.apple.banana" 的消息將會被丟棄或者返回給生產者由於它沒有匹配任何路由鍵。

  • header:headers 類型的交換器不依賴於路由鍵的匹配規則來路由消息,而是根據發送的消息內容中 的headers屬性進行匹配。在綁定隊列和交換器時制定一組鍵值對, 當發送消息到交換器時, RabbitMQ 會獲取到該消息的headers(也是一個鍵值對的形式) ,對比其中的鍵值對是否徹底 匹配隊列和交換器綁定時指定的鍵值對,若是徹底匹配則消息會路由到該隊列,不然不會路由到該隊列。(注:該交換器類型性能較差且不實用,所以通常不會用到)。

瞭解了上面的概念,咱們再來思考消息是如何從交換器到隊列的。首先Rabbit在接收到消息時,會解析消息的標籤從而獲得消息的交換器與路由key信息。而後根據交換器的類型、路由key以及該交換器和隊列的綁定關係來決定消息最終投遞到哪一個隊列裏面。

三 RabbitMQ使用

3.1 RabbitMQ安裝

這裏咱們基於docker來安裝。

3.1.1 拉取鏡像

docker pull rabbitmq:management

3.1.2 啓動容器

docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
3.2 RabbitMQ 客戶端開發使用

這裏咱們以dotnet平臺下RabbitMQ.Client3.6.9(能夠從nuget中下載)爲示例,簡單介紹dotnet平臺下對RabbitMQ的簡單操做。更詳細的內容能夠從nuget中下載源碼和文檔進行查看。

3.2.1 鏈接Rabbit

ConnectionFactory factory = new ConnectionFactory();
        factory.UserName = "admin";//用戶名
        factory.Password = "admin";//密碼      
        factory.HostName = "192.168.17.205";//主機名
        factory.VirtualHost = "";//虛擬主機(這個暫時不須要,稍後的文章裏會介紹虛擬主機    的概念)
        factory.Port = 15672;//端口
        IConnection conn = factory.CreateConnection();//建立鏈接

3.2.2 建立信道

IModel channel = conn.CreateModel();
說明:Connection 能夠用來建立多個Channel 實例,可是Channel 實例不能在線程問共享,應用程序應該爲每個線程開闢一個Channel 。某些狀況下Channel 的操做能夠併發運行,可是在其餘狀況下會致使在網絡上出現錯誤的通訊幀交錯,同時也會影響友送方確認( publisherconfrrm)機制的運行,因此多線程問共享Channel實例是非線程安全的。

3.2.3 交換器、隊列和綁定

channel.ExchangeDeclare("exchangeName", "direct", true);
  String queueName = channel.QueueDeclare().QueueName;
  channel.QueueBind(queueName, "exchangeName", "routingKey");

如上建立了一個持久化的、非自動刪除的、綁定類型爲direct 的交換器,同時也建立了一個非持久化的、排他的、自動刪除的隊列(此隊列的名稱由RabbitMQ 自動生成)。這裏的交換器和隊列也都沒有設置特殊的參數。

上面的代碼也展現瞭如何使用路由鍵將隊列和交換器綁定起來。上面聲明的隊列具有以下特性: 只對當前應用中同一個Connection 層面可用,同一個Connection 的不一樣Channel可共用,而且也會在應用鏈接斷開時自動刪除。

上述方法根據參數不一樣,能夠有不一樣的重載形式,根據自身的須要進行調用。

ExchangeDeclare方法詳解:

ExchangeDeclare有多個重載方法,這些重載方法都是由下面這個方法中缺省的某些參數構成的。

void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments);
  • exchange : 交換器的名稱。
  • type : 交換器的類型,常見的如fanout、direct 、topic
  • durable: 設置是否持久化。durab l e 設置爲true 表示持久化,
    反之是非持久化。持久化能夠將交換器存盤,在服務器重啓的時候不會丟失相關信息。
  • autoDelete : 設置是否自動刪除。autoDelete 設置爲true
    則表示自動刪除。自動刪除的前提是至少有一個隊列或者交換器與這個交換器綁定,以後全部與這個交換器綁定的隊列或者交換器都與此解綁。注意不能錯誤地把這個參數理解爲:"當與此交換器鏈接的客戶端都斷開時,RabbitMQ 會自動刪除本交換器"。
  • internal : 設置是不是內置的。若是設置爲true,則表示是內置的交換器,客戶端程序沒法直接發送消息到這個交換器中,只能經過交換器路由到交換器這種方式。
  • argument : 其餘一些結構化參數,好比alternate - exchange。

QueueDeclare方法詳解:

QueueDeclare只有兩個重載。

QueueDeclareOk QueueDeclare();

QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);

不帶任何參數的queueDeclare 方法默認建立一個由RabbitMQ 命名的(相似這種amq.gen-LhQzlgv3GhDOv8PIDabOXA 名稱,這種隊列也稱之爲匿名隊列〉、排他的、自動刪除的、非持久化的隊列。

  • queue : 隊列的名稱。
  • durable: 設置是否持久化。爲true 則設置隊列爲持久化。持久化的隊列會存盤,在服務器重啓的時候能夠保證不丟失相關信息。
  • exclusive : 設置是否排他。爲true
    則設置隊列爲排他的。若是一個隊列被聲明爲排他隊列,該隊列僅對首次聲明它的鏈接可見,並在鏈接斷開時自動刪除。這裏須要注意三點:排他隊列是基於鏈接(Connection) 可見的,同一個鏈接的不一樣信道(Channel)是能夠同時訪問同一鏈接建立的排他隊列;"首次"是指若是一個鏈接己經聲明瞭一個排他隊列,其餘鏈接是不容許創建同名的排他隊列的,這個與普通隊列不一樣:即便該隊列是持久化的,一旦鏈接關閉或者客戶端退出,該排他隊列都會被自動刪除,這種隊列適用於一個客戶端同時發送和讀取消息的應用場景。
  • autoDelete: 設置是否自動刪除。爲true則設置隊列爲自動刪除。自動刪除的前提是:至少有一個消費者鏈接到這個隊列,以後全部與這個隊列鏈接的消費者都斷開時,纔會自動刪除。不能把這個參數錯誤地理解爲:當鏈接到此隊列的全部客戶端斷開時,這個隊列自動刪除",由於生產者客戶端建立這個隊列,或者沒有消費者客戶端與這個隊列鏈接時,都不會自動刪除這個隊列。
  • argurnents:設置隊列的其餘一些參數,如x-rnessage-ttl、x-expires、x-rnax-length、x-rnax-length-bytes、x-dead-letter-exchange、x-deadletter-routing-key,x-rnax-priority等。

注意:生產者和消費者都可以使用queueDeclare來聲明一個隊列,可是若是消費者在同一個信道上訂閱了另外一個隊列,就沒法再聲明隊列了。必須先取消訂閱,而後將信道直爲"傳輸"模式,以後才能聲明隊列。

在此我向你們推薦一個架構學習交流羣。交流學習羣號:478030634  裏面會分享一些資深架構師錄製的視頻錄像:有Spring,MyBatis,Netty源碼分析,高併發、高性能、分佈式、微服務架構的原理,JVM性能優化、分佈式架構等這些成爲架構師必備的知識體系。還能領取免費的學習資源,目前受益良多

QueueBind 方法詳解:

將隊列和交換器綁定的方法以下:

void QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);
  • queue: 隊列名稱:
  • exchange: 交換器的名稱:
  • routingKey: 用來綁定隊列和交換器的路由鍵;
  • argument: 定義綁定的一些參數。

將隊列與交換器解綁的方法以下:

QueueUnbind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);

其參數與綁定意義相同。

注:除隊列能夠綁定交換器外,交換器一樣能夠綁定隊列。即:ExchangeBind方法,其使用方式與隊列綁定類似。

3.2.4 發送消息

發送消息可使用BasicPublish方法。

void BasicPublish(string exchange, string routingKey, bool mandatory,IBasicProperties basicProperties, byte[] body);
  • exchange: 交換器的名稱,指明消息須要發送到哪一個交換器中。若是設置爲空字符串,則消息會被髮送到RabbitMQ 默認的交換器中。
  • routingKey : 路由鍵,交換器根據路由鍵將消息存儲到相應的隊列之中。
  • basicProperties: 消息的基本屬性集。
  • body : 消息體( pay1oad ),真正須要發送的消息。
  • mandatory: 是否將消息返回給生產者(會在後續的文章中介紹這個參數).

3.2.5 消費消息

RabbitMQ 的消費模式分兩種: 推(Push)模式和拉(Pull)模式。推模式採用BasicConsume

進行消費,而拉模式則是調用BasicGet進行消費。

推模式:

EventingBasicConsumer consumer = new EventingBasicConsumer(channel);//定義消費者 對象
consumer.Received += (model, ea) =>
   {
    //do someting;
    channel.BasicAck(ea.DeliveryTag, multiple: false);//確認
   };
    channel.BasicConsume(queue: "queueName",
                    noAck: false,
                    consumer: consumer);//訂閱消息

string BasicConsume(string queue, bool noAck, string consumerTag, bool noLocal, bool exclusive, IDictionary<string, object> arguments, IBasicConsumer consumer);
  • queue : 隊列的名稱:
  • noAck : 設置是否須要確認,false爲須要確認。
  • consumerTag: 消費者標籤,用來區分多個消費者:
  • noLocal : 設置爲true 則表示不能將同一個Connection中生產者發送的消息傳送給這個Connection中的消費者:
  • exclusive : 設置是否排他
  • arguments : 設置消費者的其餘參數
  • consumer: 指定處理消息的消費者對象。

拉模式

BasicGetResult result = channel.BasicGet("queueName", noAck: false);//獲取消息

channel.BasicAck(result.DeliveryTag, multiple: false);//確認

3.2.6 關閉鏈接

在應用程序使用完以後,須要關閉鏈接,釋放資源:

channel.close();
conn.close() ;

顯式地關閉Channel 是個好習慣,但這不是必須的,在Connection 關閉的時候,Channel 也會自動關閉。

結束語

以上簡單介紹了分佈式系統中消息中間件的概念與做用,以及RabbitMQ的一些基本概念與簡單使用。下一篇文章將繼續針對RabbitMQ進行總結。主要內容包括什麼時候建立隊列、RabbitMQ的確認機制、過時時間的使用、死信隊列、以及利用RabbitMQ實現延遲隊列......

你們以爲文章對你仍是有一點點幫助的,你們能夠點擊下方二維碼進行關注。 《Java爛豬皮》 公衆號聊的不只僅是Java技術知識,還有面試等乾貨,後期還有大量架構乾貨。你們一塊兒關注吧!關注爛豬皮,你會了解的更多.............. 

原文鏈接:https://www.cnblogs.com/hunte...

相關文章
相關標籤/搜索