本系列是「RabbitMQ實戰:高效部署分佈式消息隊列」書籍的總結筆記。java
前段時間總結完了「深刻淺出MyBatis」系列,對MyBatis有了更全面和深刻的瞭解,在掘金社區也收到了一些博友的喜歡,很高興。另外,短暫的陪產假就要結束了,小寶也二週了,下週二就要投入工做了,但願本身儘快調整過來,加油努力。bash
從本篇開始總結「RabbitMQ實戰」系列的閱讀筆記,RabbitMQ是一個開源的消息代理和隊列服務器,能夠經過基本協議在徹底不一樣的應用之間共享數據,能夠將做業排隊以便讓分佈式服務進行處理。服務器
本篇介紹下消息通訊,首先介紹基礎概念,將這些概念映射到AMQP協議,而後介紹消息持久化、發送方確認模式等消息可靠性保證。微信
經過本篇介紹,你會了解到:session
此部分的介紹,會牽涉到AMQP的元素,若是以前沒接觸過的,能夠結合下面的「AMQP元素」進行理解。併發
消息是傳輸的主體,消息包括兩部分:有效載荷(payload)和標籤(label);有效載荷是要傳輸的數據,能夠是任何內容,好比JSON串、二進制、自定義的數據協議等;標籤描述了有效載荷,而且Rabbit用它來決定誰將得到消息的投遞。分佈式
能夠與HTTP協議類比,HTTP消息頭部描述了消息體的類型、大小等,HTTP消息體是要傳輸的數據,HTTP服務端經過消息頭部決定如何處理請求和數據。工具
生產者建立消息,而後發送到代理服務器(RabbitMQ Server),AMQP只會用標籤表述這條消息(一個交換器名稱和可選的主題標記),Rabbit服務器會根據標籤把消息發送給訂閱的消費者。性能
消費者消費消息,它會訂閱到隊列(queue)上,每當有消息到達RabbitMQ服務器時,會發送給消費者,消費者收到消息時,會進行處理。spa
注意:消費者收到的消息只包括有效載荷,全部不會知道是從哪裏發來的。
要想發佈或消費消息,必須先與RabbitMQ Server創建一條TCP鏈接,創建TCP鏈接以後,要建立一條信道,信道是創建在真實TCP鏈接的虛擬鏈接。
AMQP命令都是經過信道發送出去的,每條信道會被指派一個惟一的ID,爲何不直接經過TCP鏈接發送AMQP命令呢? 由於操做系統創建和銷燬TCP會話是很昂貴的,並且建立的鏈接數也有限。 經過引入通道,能夠在鏈接上創建通道,並且通道是私密的,相互不受影響。
通道的概念仍是有點抽象,後面專門寫一篇文章進行分析介紹,這裏簡單理解下吧。
AMQP消息路由有三部分組成:隊列、交換器和綁定,隊列是存放消息的地方,交換器是決定不一樣的分發策略,綁定是隊列和交換器的橋樑,定義匹配規則。
生產者發送消息到交換器,交換器根據自身類型和綁定規則,將消息存放在對應隊列中,而後將消息發送到監聽隊列的消費者。
如上圖:P爲生產者,X爲交換器,交換器類型爲direct,根據不一樣的綁定規則(orange、black、green),分發給不一樣的隊列,C爲消費者,從不一樣的隊列介紹消息。
消費者經過兩種方式從特定的隊列接收消息:
basic.get會影響性能,推薦使用basic.consume來實現高吞吐量,由於其處理過程是先訂閱消息,獲取單條消息,再取取消訂閱。
若是隊列擁有多個消費者時,隊列收到的消息將以循環的方式發給消費者,即多個消費者平均消費這些消息。
另外,消費者接收到的每一條消息都要進行確認,必須經過basic.ack命令向rabbitmq服務端發送一個確認。 也能夠設置auto_ack爲true,只要消費者接收到消息,就自動視爲確認,不過不建議這樣,由於接收到不表明業務邏輯處理成功。 服務端接收到確認後,會從隊列中刪除對應消息。
還有一種場景,在接收到消息後,若是不想處理,能夠經過下面方式處理:
最後來介紹下如何建立隊列,首先明確下是生成者仍是消費者建立,關鍵點是:生產者可否承擔起丟失消息,由於發出去的消息若是路由到了不存在的隊列,Rabbit會忽略它們。因此,建議生成者和消費者都嘗試去建立隊列,能夠經過設置queue.declare的passive選項設置爲ture來判斷隊列是否存在,若是不存在會返回一個錯誤。
經過queue.declare命令來建立隊列,有一些選項說明下:
queueDeclare(String queue,
boolean durable,
boolean exclusive,
Map<String, Object> arguments);
複製代碼
交換器有四種類型:direct、fanout、topic、headers,其中headers匹配消息的header而非路由鍵,不太實用,就不詳細介紹了。
第一種:direct交換器
direct交換器比較簡單,若是和路由鍵 徹底匹配 的話,就會投遞到對應的隊列:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
複製代碼
服務器默認包含一個空白字符串名稱的默認路由器,當聲明一個隊列時,會自定綁定到默認交換器,並以隊列名稱做爲路由鍵。
第二種:fanout交換器
fanout交換器,不處理路由鍵,只須要簡單的將隊列綁定到交換機上,爲會每一個消費者自動生成一個隨機隊列,全部的消費者都會收到全部消息。
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
複製代碼
第三種:topic交換器
topic交換器,將路由鍵和某模式進行匹配,此時隊列須要綁定要一個模式上。
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
複製代碼
關於模式,符號#匹配一個或多個詞,符號*匹配一個詞,所以kfs.#可以匹配到kfs.session.message,可是audit.*只會匹配到audit.session。
每一個RabbitMQ服務器都能建立虛擬消息服務器,稱爲虛擬主機(vhost),每一個RabbitMQ本質上是一個mini版的RabbitMQ服務器,擁有本身的隊列、交換器、綁定,還有本身的權限機制。
鏈接時,必須制定vhost,rabbitmq包含了默認的vhost:"/"。當建立一個用戶時,會被指派給至少一個vhsot,而且相互隔離。
vhost不能經過AMQP協議建立,須要使用rabbitmqctl工具建立。
若是沒有持久化,重啓rabbitmq後,隊列、交換器都會消失,RabbitMQ提供了持久化的功能,須要知足如下三個條件:
當發佈一條持久化消息到持久化交換器上時,rabbit會在消息提交到日誌文件後纔會發送響應,全部會損失性能,因此,只對重要數據持久化便可。
考慮這種狀況:因爲發佈消息後,不返回任何信息給生產者,如何只對服務器已經持久化到硬盤了呢,可能在傳輸過程當中丟失,或者持久化前服務器宕機,致使消息丟失。
RabbitMQ經過「發送方確認模式」來解決上面的問題。首先,須要將信道設置成confirm模式,這樣全部在信道上發佈的消息都會被指派一個惟一的ID號,一旦消息被投遞到全部匹配的隊列或持久化到磁盤,會發送一個確認消息給生產者。
經過本篇的介紹,對Rabbit的消息模型有了總體瞭解,下一篇會寫個DEMO,並介紹下運行和管理RabbitMQ。
歡迎掃描下方二維碼,關注個人我的微信公衆號 ~