RabbitMQ-工做原理

使用場景

在咱們秒殺搶購商品的時候,系統會提醒咱們稍等排隊中,而不是像幾年前同樣頁面卡死或報錯給用戶。css

像這種排隊結算就用到了消息隊列機制,放入通道里面一個一個結算處理,而不是某個時間斷忽然涌入大批量的查詢新增把數據庫給搞宕機,因此RabbitMQ本質上起到的做用就是削峯填谷,爲業務保駕護航。html

爲何選擇RabbitMQ

如今的市面上有不少MQ能夠選擇,好比ActiveMQ、ZeroMQ、Appche Qpid,那問題來了爲何要選擇RabbitMQ?java

  1. 除了Qpid,RabbitMQ是惟一一個實現了AMQP標準的消息服務器;
  2. 可靠性,RabbitMQ的持久化支持,保證了消息的穩定性;
  3. 高併發,RabbitMQ使用了Erlang開發語言,Erlang是爲電話交換機開發的語言,天生自帶高併發光環,和高可用特性;
  4. 集羣部署簡單,正是應爲Erlang使得RabbitMQ集羣部署變的超級簡單;
  5. 社區活躍度高,根據網上資料來看,RabbitMQ也是首選;

工做機制

生產者、消費者和代理算法

在瞭解消息通信以前首先要了解3個概念:生產者、消費者和代理。數據庫

生產者:消息的建立者,負責建立和推送數據到消息服務器;安全

消費者:消息的接收方,用於處理數據和確認消息;服務器

代理:就是RabbitMQ自己,用於扮演「快遞」的角色,自己不生產消息,只是扮演「快遞」的角色。網絡

消息發送原理併發

首先你必須鏈接到Rabbit才能發佈和消費消息,那怎麼鏈接和發送消息的呢?app

你的應用程序和Rabbit Server之間會建立一個TCP鏈接,一旦TCP打開,並經過了認證,認證就是你試圖鏈接Rabbit以前發送的Rabbit服務器鏈接信息和用戶名和密碼,有點像程序鏈接數據庫,使用Java有兩種鏈接認證的方式,後面代碼會詳細介紹,一旦認證經過你的應用程序和Rabbit就建立了一條AMQP信道(Channel)。

信道是建立在「真實」TCP上的虛擬鏈接,AMQP命令都是經過信道發送出去的,每一個信道都會有一個惟一的ID,不管是發佈消息,訂閱隊列或者介紹消息都是經過信道完成的。

爲何不經過TCP直接發送命令?

對於操做系統來講建立和銷燬TCP會話是很是昂貴的開銷,假設高峯期每秒有成千上萬條鏈接,每一個鏈接都要建立一條TCP會話,這就形成了TCP鏈接的巨大浪費,並且操做系統每秒能建立的TCP也是有限的,所以很快就會遇到系統瓶頸。

若是咱們每一個請求都使用一條TCP鏈接,既知足了性能的須要,又能確保每一個鏈接的私密性,這就是引入信道概念的緣由。

你必須知道的Rabbit

想要真正的瞭解Rabbit有些名詞是你必須知道的。

包括:ConnectionFactory(鏈接管理器)、Channel(信道)、Exchange(交換器)、Queue(隊列)、RoutingKey(路由鍵)、BindingKey(綁定鍵)。

ConnectionFactory(鏈接管理器):應用程序與Rabbit之間創建鏈接的管理器,程序代碼中使用;

Channel(信道):消息推送使用的通道;

Exchange(交換器):用於接受、分配消息;

Queue(隊列):用於存儲生產者的消息;

RoutingKey(路由鍵):用於把生成者的數據分配到交換器上;

BindingKey(綁定鍵):用於把交換器的消息綁定到隊列上;

看到上面的解釋,最難理解的路由鍵和綁定鍵了,那麼他們具體怎麼發揮做用的,請看下圖:

關於更多交換器的信息,咱們在後面再講。

消息持久化

Rabbit隊列和交換器有一個不可告人的祕密,就是默認狀況下重啓服務器會致使消息丟失,那麼怎麼保證Rabbit在重啓的時候不丟失呢?答案就是消息持久化。

當你把消息發送到Rabbit服務器的時候,你須要選擇你是否要進行持久化,但這並不能保證Rabbit能從崩潰中恢復,想要Rabbit消息能恢復必須知足3個條件:

  1. 投遞消息的時候durable設置爲true,消息持久化,代碼:channel.queueDeclare(x, true, false, false, null),參數2設置爲true持久化;
  2. 設置投遞模式deliveryMode設置爲2(持久),代碼:channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x),參數3設置爲存儲純文本到磁盤;
  3. 消息已經到達持久化交換器上;
  4. 消息已經到達持久化的隊列;

持久化工做原理

Rabbit會將你的持久化消息寫入磁盤上的持久化日誌文件,等消息被消費以後,Rabbit會把這條消息標識爲等待垃圾回收。

持久化的缺點

消息持久化的優勢顯而易見,但缺點也很明顯,那就是性能,由於要寫入硬盤要比寫入內存性能較低不少,從而下降了服務器的吞吐量,儘管使用SSD硬盤可使事情獲得緩解,但他仍然吸乾了Rabbit的性能,當消息成千上萬條要寫入磁盤的時候,性能是很低的。

因此使用者要根據本身的狀況,選擇適合本身的方式。

虛擬主機

每一個Rabbit都能建立不少vhost,咱們稱之爲虛擬主機,每一個虛擬主機其實都是mini版的RabbitMQ,擁有本身的隊列,交換器和綁定,擁有本身的權限機制。

vhost特性

  1. RabbitMQ默認的vhost是「/」開箱即用;

  2. 多個vhost是隔離的,多個vhost沒法通信,而且不用擔憂命名衝突(隊列和交換器和綁定),實現了多層分離;

  3. 建立用戶的時候必須指定vhost;

vhost操做

能夠經過rabbitmqctl工具命令建立:

rabbitmqctl add_vhost[vhost_name]

刪除vhost:

rabbitmqctl delete_vhost[vhost_name]

查看全部的vhost:

rabbitmqctl list_vhosts

vhosts(broker)
一個RabbitMQ的實體上能夠有多個vhosts,用戶與權限設置就是依附於vhosts。
在rabbitmq server上能夠建立多個虛擬的message broker,又叫作virtual hosts (vhosts)。每個vhost本質上是一個mini-rabbitmq server,分別管理各自的exchange,和bindings。vhost至關於物理的server,能夠爲不一樣app提供邊界隔離,使得應用安全的運行在不一樣的vhost實例上,相互之間不會干擾。producer和consumer鏈接rabbit server須要指定一個vhost。

connection 與 channel(鏈接與信道)
connection是指物理的鏈接,一個client與一個server之間有一個鏈接;一個鏈接上能夠創建多個channel,能夠理解爲邏輯上的鏈接。通常應用的狀況下,有一個channel就夠用了,不須要建立更多的channel。

exchange 與  routingkey(交換機與路由鍵)
Exchange相似於數據通訊網絡中的交換機,提供消息路由策略。rabbitmq中,producer不是經過信道直接將消息發送給queue,而是先發送給Exchange。一個Exchange能夠和多個Queue進行綁定,producer在傳遞消息的時候,會傳遞一個ROUTING_KEY,Exchange會根據這個ROUTING_KEY按照特定的路由算法,將消息路由給指定的queue。和Queue同樣,Exchange也可設置爲持久化,臨時或者自動刪除。

環境搭建

前文咱們已經介紹了Ubuntu搭建RabbitMQ的步驟:RabbitMQ在Ubuntu上的環境搭建

若是你是在Windows10上去安裝那就更簡單了,先放下載地址:

Erlang/Rabbit Server百度網盤連接:https://pan.baidu.com/s/1TnKDV-ZuXLiIgyK8c8f9dg 密碼:wct9

固然也可去Erlang和Rabbit官網去下,就是速度比較慢。個人百度雲Rabbit最新版本:3.7.6,Erlang版本:20.2,注意:不要下載最新的Erlang,在Windows10上打開擴展插件有問題,打不開。

  1. 安裝Erlang;

  2. 安裝Rabbit Server;

  3. 進入安裝目錄\sbin下,使用命令「rabbitmq-plugins enable rabbitmq_management」啓動網頁管理插件;

  4. 重啓Rabbit服務;

使用:http://localhost:15672進行測試,默認的登錄帳號爲:guest,密碼爲:guest

重複安裝Rabbit Server的坑

若是不是第一次在Windows上安裝Rabbit Server必定要把Rabbit和Erlang卸載乾淨以後,找到註冊表:HKEY_LOCAL_MACHINE\SOFTWARE\Ericsson\Erlang\ErlSrv 刪除其下的全部項。

否則會出現Rabbit安裝以後啓動不了的狀況,理論上卸載的順序也是先Rabbit在Erlang。

代碼實現

java版實現,使用maven項目,建立能夠查看:MyEclipse2017破解設置與maven項目搭建

項目建立成功以後,添加Rabbit Client jar包,只須要在pom.xml裏面配置,以下信息:

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.2.0</version> </dependency>

java實現代碼分爲兩個類,第一個是建立Rabbit鏈接,第二是應用類使用最簡單的方式發佈和消費消息。

Rabbit的鏈接,兩種方式:

方式一:

public static Connection GetRabbitConnection() { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(Config.UserName); factory.setPassword(Config.Password); factory.setVirtualHost(Config.VHost); factory.setHost(Config.Host); factory.setPort(Config.Port); Connection conn = null; try { conn = factory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return conn; }

方式二:

public static Connection GetRabbitConnection2() { ConnectionFactory factory = new ConnectionFactory(); // 鏈接格式:amqp://userName:password@hostName:portNumber/virtualHost String uri = String.format("amqp://%s:%s@%s:%d%s", Config.UserName, Config.Password, Config.Host, Config.Port, Config.VHost); Connection conn = null; try { factory.setUri(uri); factory.setVirtualHost(Config.VHost); conn = factory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return conn; }

第二部分:應用類,使用最簡單的方式發佈和消費消息

public static void main(String[] args) { Publisher(); // 推送消息 Consumer(); // 消費消息 } /** * 推送消息 */ public static void Publisher() { // 建立一個鏈接 Connection conn = ConnectionFactoryUtil.GetRabbitConnection(); if (conn != null) { try { // 建立通道 Channel channel = conn.createChannel(); // 聲明隊列【參數說明:參數一:隊列名稱,參數二:是否持久化;參數三:是否獨佔模式;參數四:消費者斷開鏈接時是否刪除隊列;參數五:消息其餘參數】 channel.queueDeclare(Config.QueueName, false, false, false, null); String content = String.format("當前時間:%s", new Date().getTime()); // 發送內容【參數說明:參數一:交換機名稱;參數二:隊列名稱,參數三:消息的其餘屬性-routing headers,此屬性爲MessageProperties.PERSISTENT_TEXT_PLAIN用於設置純文本消息存儲到硬盤;參數四:消息主體】 channel.basicPublish("", Config.QueueName, null, content.getBytes("UTF-8")); System.out.println("已發送消息:" + content); // 關閉鏈接 channel.close(); conn.close(); } catch (Exception e) { e.printStackTrace(); } } } /** * 消費消息 */ public static void Consumer() { // 建立一個鏈接 Connection conn = ConnectionFactoryUtil.GetRabbitConnection(); if (conn != null) { try { // 建立通道 Channel channel = conn.createChannel(); // 聲明隊列【參數說明:參數一:隊列名稱,參數二:是否持久化;參數三:是否獨佔模式;參數四:消費者斷開鏈接時是否刪除隊列;參數五:消息其餘參數】 channel.queueDeclare(Config.QueueName, false, false, false, null); // 建立訂閱器,並接受消息 channel.basicConsume(Config.QueueName, false, "", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); // 隊列名稱 String contentType = properties.getContentType(); // 內容類型 String content = new String(body, "utf-8"); // 消息正文 System.out.println("消息正文:" + content); channel.basicAck(envelope.getDeliveryTag(), false); // 手動確認消息【參數說明:參數一:該消息的index;參數二:是否批量應答,true批量確認小於index的消息】 } }); } catch (Exception e) { e.printStackTrace(); } } }

代碼裏面已經寫了很詳細的註釋,在這裏也不過多的介紹了。

執行效果,如圖:

相關文章
相關標籤/搜索