消息隊列助你成爲高薪的 Node.js 工程師

爲何寫這篇文章

  • 如今的面試要求愈來愈高了,打開看了看幾個BOSS招聘 Node.js 全棧開發的,其中都有一條「瞭解消息隊列,並在項目中應用過」,嗚嗚嗚
  • 後端開發者應該都知道消息隊列,可是一些前端開發者可能知道的並很少,可是大家可能好奇搶票,商品秒殺等功能是如何實現的,其實沒有多麼高大上,看了消息隊列就知道了。

文章導圖(你能學到)

做者簡介:koala,專一完整的 Node.js 技術棧分享,從 JavaScript 到 Node.js,再到後端數據庫,祝您成爲優秀的高級 Node.js 工程師。【程序員成長指北】做者,Github 博客開源項目 github.com/koala-codin…javascript

什麼是消息隊列

「消息隊列」是在消息的傳輸過程當中保存消息的容器。html

我的理解:我把它分紅兩個詞消息隊列。當一大批客戶端同時產生大量的網絡請求(消息)時候,服務器的承受能力確定是有一個限制的。這時候要是有個容器,先讓這些消息排隊就行了,還好有個叫隊列的數據結構,經過有隊列屬性的容器排隊(先進先出),把消息再傳到咱們的服務器,壓力減少了好多,這個很棒的容器就是消息隊列前端

這段理解中還包含這個兩個概念: 客戶端->生產者 服務器->消費者 當有消息隊列出現,生產者消費者是必不可少的兩個概念,上面的理解是多個生產者對應一個消費者,固然現實開發中還有許多消費者的狀況哦。接下來的文章也會屢次提到生產-消費模型java

消息隊列優點

  • 應用解耦node

    消息隊列可使消費者和生產者直接互不干涉,互不影響,只須要把消息發送到隊列便可,並且可獨立的擴展或修改兩邊的處理過程,只要能確保它們遵照一樣的接口約定,能夠生產者用Node.js實現,消費者用phython實現。git

  • 靈活性和峯值處理能力程序員

    當客戶端訪問量忽然劇增,對服務器的訪問已經超過服務所能處理的最大峯值,甚至致使服務器超時負載崩潰,使用消息隊列能夠解決這個問題,能夠經過控制消費者的處理速度生產者可進入消息隊列的數量等來避免峯值問題github

  • 排序保證面試

    消息隊列能夠控制數據處理的順序,由於消息隊列自己使用的是隊列這個數據結構,FIFO(先進選出),在一些場景數據處理的順序很重要,好比商品下單順序等。redis

  • 異步通訊

    消息隊列中的有些消息,並不須要當即處理,消息隊列提供了異步處理機制,能夠把消息放在隊列中並不當即處理,須要的時候處理,或者異步慢慢處理,一些不重要的發送短信和郵箱功能可使用。

  • 可擴展性

    前面提到了消息隊列能夠作到解耦,若是咱們想加強消息入隊和出隊的處理頻率,很簡單,並不須要改變代碼中任何內容,能夠直接對消息隊列修改一些配置便可,好比咱們想限制每次發送給消費者的消息條數等。

有優點定有它現實的應用場景,文章後面會針對優點講它們對應的應用場景。

消息隊列的類型介紹

介紹幾款目前市場上主流的消息隊列(課外知識,可忽略)

  • Kafka:是由 Apache 軟件基金會開發的一個開源流處理平臺,由 Scala 和 Java 編寫,是一種高吞吐量的分佈式發佈訂閱消息系統,支持單機每秒百萬併發。另外,Kafka 的定位主要在日誌等方面, 由於Kafka 設計的初衷就是處理日誌的,能夠看作是一個日誌(消息)系統一個重要組件,針對性很強。0.8 版本開始支持複製,不支持事物,所以對消息的重複、丟失、錯誤沒有嚴格的要求。
  • RocketMQ:阿里開源的消息中間件,是一款低延遲、高可靠、可伸縮、易於使用的消息中間件,思路起源於 Kafka。最大的問題商業版收費,有些功能不開放。
  • RabbitMQ:由 Erlang(有着和原生 Socket 同樣低的延遲)語言開發基於 AMQP 協議的開源消息隊列系統。能保證消息的可靠性、穩定性、安全性。高併發 的特性,毋庸置疑,RabbitMQ 最高,緣由是它的實現語言是天生具有高併發高可用的erlang 語言,天生的分佈式優點。

說明本文主要以RabbitMQ講解,較爲常見。 我的認爲這幾種消息隊列中間件能實現的功能,經過 redis 也都能實現,思想。

初識消息隊列(消息隊列在node.js中的簡單應用)

Rabbitmq基本安裝

Mac版安裝

直接經過 HomeBrew 安裝,執行如下命令

brew install rabbitmq
複製代碼

啓動 rabbitmq

進入安裝目錄
$ /usr/local/Cellar/rabbitmq/3.7.8
啓動
$ sbin/rabbitmq-server
複製代碼

瀏覽器輸入 http://localhost:15672/#/ 默認用戶名密碼 guest

安裝後的基本示意圖

可視化界面可模塊功能介紹:


其餘系統安裝請自行網上搜索

幾個端口區別說明

5672:通訊默認端口號 15672:管理控制檯默認端口號 25672:集羣通訊端口號 注意: 阿里雲 ECS 服務器若是出現 RabbitMQ 安裝成功,外網不能訪問是由於安全組的問題沒有開放端口 解決方案

Rabbitmq安裝後的基本命令

如下列舉一些在終端經常使用的操做命令

  • whereis rabbitmq:查看 rabbitmq 安裝位置
  • rabbitmqctl start_app:啓動應用
  • whereis erlang:查看erlang安裝位置
  • rabbitmqctl start_app:啓動應用
  • rabbitmqctl stop_app:關閉應用
  • rabbitmqctl status:節點狀態
  • rabbitmqctl add_user username password:添加用戶
  • rabbitmqctl list_users:列出全部用戶
  • rabbitmqctl delete_user username:刪除用戶
  • rabbitmqctl add_vhost vhostpath:建立虛擬主機
  • rabbitmqctl list_vhosts:列出全部虛擬主機
  • rabbitmqctl list_queues:查看全部隊列
  • rabbitmqctl -p vhostpath purge_queue blue:清除隊列裏消息

注意:以上終端全部命令,須要進入到rabbitmqctl的sbin目錄下執行rabbitmqctl命令纔有用,不然會報錯:

Node.js實現一個簡單的 HelloWorld 消息隊列

畫一張基本的圖,HelloWorld 消息隊列的圖片,把下面幾個概念都畫進去。


看這段代碼前先說幾個概念

  • 生產者 :生產消息的
  • 消費者 :接收消息的
  • 通道 channel:創建鏈接後,會獲取一個 channel 通道
  • exchange :交換機,消息須要先發送到 exchange 交換機,也能夠說是第一步存儲消息的地方(交換機會有不少類型,後面會詳細說)。
  • 消息隊列 : 到達消費者前一刻存儲消息的地方,exchange 交換機會把消息傳遞到此
  • ack回執:收到消息後確認消息已經消費的應答

amqplib模塊

推薦一個 npm 模塊amqplib

Github: github.com/squaremo/am…

$ npm install amqplib
複製代碼

生產者代碼 product.js

const amqp =require('amqplib');

async function product(params) {
    // 1.建立連接對象
    const connect =await amqp.connect('amqp://localhost:5672');
     // 1. 建立連接對象
     const connection = await amqp.connect('amqp://localhost:5672');

     // 2. 獲取通道
     const channel = await connection.createChannel();
 
     // 3. 聲明參數
     const routingKey = 'helloKoalaQueue';
     const msg = 'hello koala';
 
     for (let i=0; i<10000; i++) {
         // 4. 發送消息
         await channel.publish('', routingKey, Buffer.from(`${msg}${i}條消息`));
     }
 
     // 5. 關閉通道
     await channel.close();
     // 6. 關閉鏈接
     await connect.close();
}
product();
複製代碼

生產者代碼解釋與運行結果

執行 node product.js
複製代碼

代碼註釋中已經把基本的流程講解了,可是我剛開始看的時候還有疑問,我想不少小夥伴也會有疑問,說明下:

  • 疑問1

    前面提到過交換機這個名詞,生產者發消息的時候必需要指定一個 exchange,若不指定 exchange(爲空)會默認指向 AMQP default 交換機,AMQP default 路由規則是根據 routingKey 和 mq 上有沒有相同名字的隊列進行匹配路由。上面這段代碼就是默認指定的交換機。不一樣類型交換機詳細講解請往下看。

  • 疑問2

    生產者發送消息後,消息是發送到交換機exchange,可是這時候會建立隊列嗎?

    答案:代碼中咱們聲明的是路由是routingKey,可是它並無建立helloKoalaQueue 消息隊列,消息只會發送到交exchange交換機。 運行代碼後看隊列截圖能夠證實這一點:

  • 說明1

    生產者發送消息後,注意關閉通道和鏈接,只要消息發送成功後,鏈接就能夠關閉了,消費者用任何語言去獲取消息均可以,這也證實了消息隊列優秀解耦的特性

  • 說明2

    能夠屢次執行node product.js生產者代碼,消息會堆積到交換機exchange中,並不會覆蓋,若是已執行過消費者而且確認了對應的消息隊列,消息會從exchange交換機發送到消息隊列,並存入到消息隊列,等待消費者消費

消費者代碼 consumer.js

// 構建消費者
const amqp = require('amqplib');

async function consumer() {
    // 1. 建立連接對象
    const connection = await amqp.connect('amqp://localhost:5672');

    // 2. 獲取通道
    const channel = await connection.createChannel();

    // 3. 聲明參數
    const queueName = 'helloKoalaQueue';
  
    // 4. 聲明隊列,交換機默認爲 AMQP default
    await channel.assertQueue(queueName);

    // 5. 消費
    await channel.consume(queueName, msg => {
        console.log('Consumer:', msg.content.toString());
        channel.ack(msg);
    });
}
consumer();
複製代碼

生產者代碼解釋與運行結果

執行 node consumer.js
複製代碼
  • 運行後的執行結果

  • 說明1

    這時候我改變代碼中的隊列名稱爲helloKoalaQueueHaHa,這時候去看Rabbitmq可視化界面中,隊列模塊,建立了這個隊列

    看到這裏再次證實了消息隊列優秀的解耦特性消費者和生產者模型之間沒有任何聯繫,再次建立這個helloKoalaQueueHaHa路由名稱的生產者,消費者也會正常消費,而且會打印消息,你們能夠實際操做試一下。

  • 說明2

    這時候我改變代碼中的隊列名稱爲helloKoalaQueueHaHa,這時候去看Rabbitmq可視化界面中,隊列模塊,建立了這個隊列

    看到這裏又再次證實了消息隊列優秀的解耦特性消費者和生產者模型之間沒有任何聯繫,再次建立這個helloKoalaQueueHaHa路由名稱的生產者,消費者也會正常消費,而且會打印消息,你們能夠實際操做試一下。

如何釋放掉消息隊列

可視化界面中直接刪除掉消息隊列

  1. 訪問http://{rabbitmq安裝IP}:15672,登陸。
  2. 點擊queues,這裏能夠看到你建立的全部的Queue,
  3. 選中某一個Queue,而後會進入一個列表界面,下方有個Delete按鈕,確認 Queue刪除隊列/Purge Message清除消息便可。

弊端: 這樣只能一個隊列一個隊列的刪除,若是隊列中的消息過多就會特別慢。

經過代碼實現消息隊列釋放(刪除)

消息隊列交換機講解

先記住一句話

生產者發消息的時候必須指定一個 exchange,不然消息沒法直接到達消息隊列,Exchange將消息路由到一個或多個Queue中(或者丟棄)

而後開始本章節交換機的講解

若不指定 exchange(爲空)會默認指向 AMQP default 交換機,AMQP default 路由規則是根據 routingKey 和 mq 上有沒有相同名字的隊列進行匹配路由。

交換機的種類

經常使用的四種類型

  • fanout

  • direct

  • topic

  • headers

無論是哪種類型的交換機,都有一個綁定binding的操做,只不過根據不一樣的交換機類型有不一樣的路由綁定策略。不一樣類型作的下圖紅色框框中的事。

fanout(中文翻譯 廣播)

fanout類型的Exchange路由規則很是簡單,它會把全部發送到該Exchange的消息路由到全部與它綁定的Queue中,不須要設置路由鍵。

上圖中,上圖中,生產者(Producter)發送到Exchange(X)的全部消息都會路由到圖中的兩個Queue,並最終被兩個消費者(consumer1與consumer2)消費。

說明:全部消息都會路由到兩個Queue中,是兩個消費者均可以收到所有的徹底相同的消息嗎? 答案是的,兩個消費者收到的隊列消息正常應該是徹底相同的。這種類型經常使用於廣播類型的需求,或者也能夠消費者1記錄日誌 ,消費者2打印日誌

對應代碼實現

生產者:

const amqp = require('amqplib');

async function producer() {
    // 建立連接對象
    const connection = await amqp.connect('amqp://localhost:5672');

    // 獲取通道
    const channel = await connection.createChannel();

    // 聲明參數
    const exchangeName = 'fanout_koala_exchange';
    const routingKey = '';
    const msg = 'hello koala';

    // 交換機
    await channel.assertExchange(exchangeName, 'fanout', {
        durable: true,
    });

    // 發送消息
    await channel.publish(exchangeName, routingKey, Buffer.from(msg));

    // 關閉連接
    await channel.close();
    await connection.close();
}
producer();
複製代碼

消費者:

const amqp = require('amqplib');

async function consumer() {
    // 建立連接對象
    const connection = await amqp.connect('amqp://localhost:5672');

    // 獲取通道
    const channel = await connection.createChannel();

    // 聲明參數
    const exchangeName = 'fanout_koala_exchange';
    const queueName = 'fanout_kaola_queue';
    const routingKey = '';

    // 聲明一個交換機
    await channel.assertExchange(exchangeName, 'fanout', { durable: true });

    // 聲明一個隊列
    await channel.assertQueue(queueName);

    // 綁定關係(隊列、交換機、路由鍵)
    await channel.bindQueue(queueName, exchangeName, routingKey);

    // 消費
    await channel.consume(queueName, msg => {
        console.log('Consumer:', msg.content.toString());
        channel.ack(msg);
    });

    console.log('消費端啓動成功!');
}
consumer();
複製代碼

注意:其餘類型代碼已經放到 github,地址:github.com/koala-codin… 歡迎 star 交流。

direct

direct 把消息路由到那些 binding key與 routing key 徹底匹配的 Queue中。

以上圖的配置爲例,咱們以 routingKey=」error」 發送消息到Exchange,則消息會路由到 amq1 和 amq2;若是咱們以 routingKey=」info」 或 routingKey=」warning」 來發送消息,則消息只會路由到 Queue2。若是咱們以其餘 routingKey 發送消息,則消息不會路由到這兩個 Queue 中。

topic

生產者指定 RoutingKey 消息根據消費端指定的隊列經過模糊匹配的方式進行相應轉發,兩種通配符模式: #:可匹配一個或多個關鍵字 *:只能匹配一個關鍵字

headers

header exchange(頭交換機)和主題交換機有點類似,可是不一樣於主題交換機的路由是基於路由鍵,頭交換機的路由值基於消息的 header 數據。 主題交換機路由鍵只有是字符串,而頭交換機能夠是整型和哈希值 header Exchange 類型用的比較少,能夠自行 google 瞭解。

消息隊列的思考與深刻探索

消息隊列實現rpc

(本小段內容來源網上,參考文章說明)

RPC 遠程調用服務端的方法,使用 MQ 能夠實現 RPC 的異步調用,基於 Direct 交換機實現

  1. 客戶端便是生產者又是消費者,向 RPC 請求隊列發送 RPC 調用消息,同時監聽 RPC 響應隊列
  2. 服務端監聽RPC請求隊列,收到消息後執行服務端的方法
  3. 服務端將方法執行後的結果發送到RPC響應隊列

(注意,這裏只是提一下 RPC 這個知識,由於單單一個RPC一篇文章都不必定說說完,有興趣的能夠用隊列嘗試一下RPC)

是否有消息持久化的必要?

消息隊列是存在內存中的,若是出現問題掛掉,消息隊列中的消息會丟失。因此對於一些需求很是有持久化的必要!RabbitMQ 能夠開啓持久化。不一樣開發語言均可以設置持久化參數。

這裏以Node.js爲例子,其餘語言能夠自行搜索

await channel.assertExchange(exchangeName, 'direct', { durable: true });
    // 注意其中的{ durable: true },這事對交換機持久化,還有其餘的幾種持久化方式
複製代碼

同時推薦一篇不錯的寫持久化的文章: juejin.im/post/5d6f6b…

消費者完成後是否有消息應答的必要?

消息應答簡單的解釋就是消費者完成了消費後,通知一下消息隊列。

我以爲這個配置是有必要打開的,消費者完成消息隊列中的任務,消費者可能中途失敗或者掛掉,一旦 RabbitMQ 發送一個消息給消費者而後便迅速將該消息從消息隊列內存中移除,這種狀況下,消費者對應工做進程失敗或者掛掉後,那該進程正在處理的消息也將丟失。並且,也將丟失全部發送給該進程的未被處理的消息。

爲了確保消息永不丟失,RabbitMQ 支持消息應答機制。當消息被接受,處理以後一條應答便會從消費者回傳至發送方,而後RabbitMQ將其刪除。

若是某個消費者掛掉(信道、連接關閉或者 tcp 連接丟失)且沒有發送 ack 應答,RabbitMQ 會認爲該消息沒有被處理徹底而後會將其從新放置到隊列中。經過這種方式你就能夠確保消息永不丟失,甚至某個工做進程偶然掛掉的狀況。

默認狀況下消息應答是關閉的。是時候使用 false(auto-ack配置項)參數將其開啓了

這裏以 Node.js 爲例子,其餘語言能夠自行搜索

// 消費者消費時候的代碼
await channel.consume(queueName, msg => {
    console.log('koala:', msg.content.toString());
    //... 這裏能夠放業務邏輯處理的代碼,消費者完成後發送回執應答
    channel.ack(msg);// 消息應答
}, { noAck: false });
複製代碼

如何實現公平調度?

能夠將prefetch count項的值配置爲1,這將會指示 RabbitMQ 在同一時間不要發送超過一條消息給每一個消費者。換句話說,直到消息被處理和應答以前都不會發送給該消費者任何消息。取而代之的是,它將會發送消息至下一個比較閒的消費者或工做進程。

這裏以 Node.js 爲例子,amqplib 庫對於限流實現提供的接口方法 prefetch。

prefetch 參數說明

  • count:每次推送給消費端 N 條消息數目,若是這 N 條消息沒有被ack,生產端將不會再次推送直到這 N 條消息被消費。
  • global:在哪一個級別上作限制,ture 爲 channel 上作限制,false 爲消費端上作限制,默認爲 false。
// 建立消費者的時候 限流參數設置
await channel.prefetch(1, false);
複製代碼

如何實現一個交換機給多個消費者依次發送消息,選擇那種交換機?

若是一個生產者,兩個消費者,發放消息,我想要的隊列先給消費者1發,發完消費者1發消費者2,這樣有順序的交互發送,應該如今哪種交換機呢?注意是交互,看完以後想一下?還有消費者完成後有沒有手動回調消息隊列完成的必要?消息持久化有必要沒,持久化有什麼好處?

(看完消息隊列的消息傳遞,你會有疑問管道中的消息(生產者)是怎麼被消費者消費的 放入隊列,而後從隊列被取出)

消息隊列應用場景

  1. 雙十一商品秒殺/搶票功能實現

    咱們在雙11的時候,當咱們凌晨大量的秒殺和搶購商品,而後去結算的時候,就會發現,界面會提醒咱們,讓咱們稍等,以及一些友好的圖片文字提醒。而不是像前幾年的時代,動不動就頁面卡死,報錯等來呈現給用戶。

    用一張圖來解釋消息隊列在秒殺搶票等場景的使用: (說明:往下看以前,若是你作過電商類秒殺,能夠想一想你是怎麼實現的,咱們能夠一塊兒討論哦。這裏只是想說下消息隊列的做用,並非最終優化的結果,好比用redis控制總緩存等)

    這裏在生成訂單時候,不須要直接操做數據庫 IO ,預扣庫存。先扣除了庫存,保證不超賣,而後異步生成用戶訂單,這裏用到一次即時消費隊列,這樣響應給用戶的速度就會快不少;並且還要保證很多賣,用戶拿到了訂單,不支付怎麼辦?咱們都知道如今訂單都有有效期,再使用一個消息隊列,用於判斷訂單支付超時,好比說用戶五分鐘內不支付,訂單就失效了,訂單一旦失效,就會加入新的庫存。這也是如今不少網上零售企業保證商品很多賣採用的方案。訂單量比較少的狀況下,生成訂單很是快,用戶幾乎不用排隊。

  2. 積分兌換(積分可用於多平臺)

    積分兌換模塊,有一個公司多個部門都要用到這個模塊,這時候就能夠經過消息隊列解耦這個特性來實現。 各部門系統作各部門的事,可是他們均可以用這個積分系統進行商品的兌換等。其餘模塊與積分模塊徹底解耦。

  3. 發送郵件,用戶大數據分析等 同步變異步功能實現

    這個功能要說的比較多,從一個平臺的用戶註冊開始。

    • 用戶註冊
    • 用戶註冊選擇幾個興趣標籤,這時候須要根據用戶的屬性,用戶分析,計算出推薦內容
    • 註冊後可能須要發送郵件給用戶
    • 發送給用戶一個包含操做指南的系統通知
    • 等等

    正常狀況註冊,不出現高併發

    對於用戶來講,他就是想註冊用一下這個軟件,只要服務端將他的帳戶信息存到數據庫中他即可以登陸上去作他想作的事情了。用戶並不care這些事,服務端就能夠把其餘的操做放入對應的消息隊列中而後立刻返回用戶結果,由消息隊列異步的進行這些操做。

    假若有大量的用戶註冊,發生了高併發

    郵件接口承受不住,或是分析信息時的大量計算使 cpu 滿載,這將會出現雖然用戶數據記錄很快的添加到數據庫中了,可是卻卡在發郵件或分析信息時的狀況,致使請求的響應時間大幅增加,甚至出現超時,這就有點不划算了。面對這種狀況通常也是將這些操做放入消息隊列(生產者消費者模型),消息隊列慢慢的進行處理,同時能夠很快的完成註冊請求,不會影響用戶使用其餘功能。

  4. 基於RabbitMQ的Node.js與Phython或其餘語言實現通訊

    這裏也是利用了 RabbitMQ 的解耦特性,不只僅能夠與 Phython,還能夠與其餘不少語言通訊,就不具體說了。

總結

親,別隻看,你試試呀!直接開啓服務,裝個 RabbitMQ,挺有意思的,就算一個 HelloWorld 也能嘗試出不少內容。並且本文說的不少內容均可以用 redis 來實現,也能夠去看下個人 redis 文章。順便說一句設計模式和數據結構是兩個好東西,愈來愈能感受到。

參考文章

www.cnblogs.com/baidawei/p/… www.sojson.com/blog/48.htm… www.zhihu.com/question/34… bbs.csdn.net/topics/3921… www.imooc.com/article/293… mp.weixin.qq.com/s/wTkwJXlNr…

Node系列原創文章

深刻理解Node.js 中的進程與線程

想學Node.js,stream先有必要搞清楚

require時,exports和module.exports的區別你真的懂嗎

源碼解讀一文完全搞懂Events模塊

Node.js 高級進階之 fs 文件模塊學習

關注我

  • 歡迎加我微信(coder_qi),拉你進技術羣,長期交流學習...
  • 歡迎關注「程序員成長指北」,一個用心幫助你成長的公衆號...
相關文章
相關標籤/搜索