消息中間件NSQ深刻與實踐

1. 介紹

最近在研究一些消息中間件,經常使用的MQ如RabbitMQ,ActiveMQ,Kafka等。NSQ是一個基於Go語言的分佈式實時消息平臺,它基於MIT開源協議發佈,由bitly公司開源出來的一款簡單易用的消息中間件。 官方和第三方還爲NSQ開發了衆多客戶端功能庫,如官方提供的基於HTTP的nsqd、Go客戶端go-nsq、Python客戶端pynsq、基於Node.js的JavaScript客戶端nsqjs、異步C客戶端libnsq、Java客戶端nsq-java以及基於各類語言的衆多第三方客戶端功能庫。java

1.1 Features

1). DistributedNSQ提供了分佈式的,去中心化,且沒有單點故障的拓撲結構,穩定的消息傳輸發佈保障,可以具備高容錯和HA(高可用)特性。2). Scalable易於擴展NSQ支持水平擴展,沒有中心化的brokers。內置的發現服務簡化了在集羣中增長節點。同時支持pub-sub和load-balanced 的消息分發。3). Ops FriendlyNSQ很是容易配置和部署,生來就綁定了一個管理界面。二進制包沒有運行時依賴。官方有Docker image。4.Integrated高度集成官方的 Go 和 Python庫都有提供。並且爲大多數語言提供了庫。node

1.2 組件

  • Topic :一個topic就是程序發佈消息的一個邏輯鍵,當程序第一次發佈消息時就會建立topic。 sql

  • Channels :channel與消費者相關,是消費者之間的負載均衡,channel在某種意義上來講是一個「隊列」。每當一個發佈者發送一條消息到一個topic,消息會被複制到全部消費者鏈接的channel上,消費者經過這個特殊的channel讀取消息,實際上,在消費者第一次訂閱時就會建立channel。Channel會將消息進行排列,若是沒有消費者讀取消息,消息首先會在內存中排隊,當量太大時就會被保存到磁盤中。 服務器

  • Messages:消息構成了咱們數據流的中堅力量,消費者能夠選擇結束消息,代表它們正在被正常處理,或者從新將他們排隊待到後面再進行處理。每一個消息包含傳遞嘗試的次數,當消息傳遞超過必定的閥值次數時,咱們應該放棄這些消息,或者做爲額外消息進行處理。 markdown

  • nsqd:nsqd 是一個守護進程,負責接收,排隊,投遞消息給客戶端。它能夠獨立運行,不過一般它是由 nsqlookupd 實例所在集羣配置的(它在這能聲明 topics 和 channels,以便你們能找到)。 網絡

  • nsqlookupd:nsqlookupd 是守護進程負責管理拓撲信息。客戶端經過查詢 nsqlookupd 來發現指定話題(topic)的生產者,而且 nsqd 節點廣播話題(topic)和通道(channel)信息。有兩個接口:TCP 接口,nsqd 用它來廣播。HTTP 接口,客戶端用它來發現和管理。 架構

  • nsqadmin:nsqadmin 是一套 WEB UI,用來聚集集羣的實時統計,並執行不一樣的管理任務。 經常使用工具類: 負載均衡

  • nsq_to _file:消費指定的話題(topic)/通道(channel),並寫到文件中,有選擇的滾動和/或壓縮文件。 curl

  • nsq_to _http:消費指定的話題(topic)/通道(channel)和執行 HTTP requests (GET/POST) 到指定的端點。 異步

  • nsq_to _nsq:消費者指定的話題/通道和重發布消息到目的地 nsqd 經過 TCP。

1.3 拓撲結構

NSQ推薦經過他們相應的nsqd實例使用協同定位發佈者,這意味着即便面對網絡分區,消息也會被保存在本地,直到它們被一個消費者讀取。更重要的是,發佈者沒必要去發現其餘的nsqd節點,他們老是能夠向本地實例發佈消息。

首先,一個發佈者向它的本地nsqd發送消息,要作到這點,首先要先打開一個鏈接,而後發送一個包含topic和消息主體的發佈命令,在這種狀況下,咱們將消息發佈到事件topic上以分散到咱們不一樣的worker中。 事件topic會複製這些消息而且在每個鏈接topic的channel上進行排隊,在咱們的案例中,有三個channel,它們其中之一做爲檔案channel。消費者會獲取這些消息而且上傳到S3。

每一個channel的消息都會進行排隊,直到一個worker把他們消費,若是此隊列超出了內存限制,消息將會被寫入到磁盤中。Nsqd節點首先會向nsqlookup廣播他們的位置信息,一旦它們註冊成功,worker將會從nsqlookup服務器節點上發現全部包含事件topic的nsqd節點。

而後每一個worker向每一個nsqd主機進行訂閱操做,用於代表worker已經準備好接受消息了。這裏咱們不須要一個完整的連通圖,但咱們必需要保證每一個單獨的nsqd實例擁有足夠的消費者去消費它們的消息,不然channel會被隊列堆着。

2. Internals

2.1 消息傳遞擔保

NSQ 保證消息將交付至少一次,雖然消息多是重複的。消費者應該關注到這一點,刪除重複數據或執行idempotent等操做。 這個擔保是做爲協議和工做流的一部分,工做原理以下(假設客戶端成功鏈接並訂閱一個話題): 1)客戶表示已經準備好接收消息 2)NSQ 發送一條消息,並暫時將數據存儲在本地(在 re-queue 或 timeout) 3)客戶端回覆 FIN(結束)或 REQ(從新排隊)分別指示成功或失敗。若是客戶端沒有回覆, NSQ 會在設定的時間超時,自動從新排隊消息 這確保了消息丟失惟一可能的狀況是不正常結束 nsqd 進程。在這種狀況下,這是在內存中的任何信息(或任何緩衝未刷新到磁盤)都將丟失。 如何防止消息丟失是最重要的,即便是這個意外狀況能夠獲得緩解。一種解決方案是構成冗餘 nsqd對(在不一樣的主機上)接收消息的相同部分的副本。由於你實現的消費者是冪等的,以兩倍時間處理這些消息不會對下游形成影響,並使得系統可以承受任何單一節點故障而不會丟失信息。

2.2 簡化配置和管理

單個 nsqd 實例被設計成能夠同時處理多個數據流。流被稱爲「話題」和話題有 1 個或多個「通道」。每一個通道都接收到一個話題中全部消息的拷貝。在實踐中,一個通道映射到下行服務消費一個話題。 話題和通道都沒有預先配置。話題由第一次發佈消息到命名的話題或第一次經過訂閱一個命名話題來建立。通道被第一次訂閱到指定的通道建立。話題和通道的全部緩衝的數據相互獨立,防止緩慢消費者形成對其餘通道的積壓(一樣適用於話題級別)。 一個通道通常會有多個客戶端鏈接。假設全部已鏈接的客戶端處於準備接收消息的狀態,每一個消息將被傳遞到一個隨機的客戶端。nsqlookupd,它提供了一個目錄服務,消費者能夠查找到提供他們感興趣訂閱話題的 nsqd 地址 。在配置方面,把消費者與生產者解耦開(它們都分別只須要知道哪裏去鏈接 nsqlookupd 的共同實例,而不是對方),下降複雜性和維護。 在更底的層面,每一個 nsqd 有一個與 nsqlookupd 的長期 TCP 鏈接,按期推進其狀態。這個數據被 nsqlookupd 用於給消費者通知 nsqd 地址。對於消費者來講,一個暴露的 HTTP /lookup 接口用於輪詢。爲話題引入一個新的消費者,只需啓動一個配置了 nsqlookup 實例地址的 NSQ 客戶端。無需爲添加任何新的消費者或生產者更改配置,大大下降了開銷和複雜性。

2.3 消除單點故障

NSQ被設計以分佈的方式被使用。nsqd 客戶端(經過 TCP )鏈接到指定話題的全部生產者實例。沒有中間人,沒有消息代理,也沒有單點故障。 這種拓撲結構消除單鏈,聚合,反饋。相反,你的消費者直接訪問全部生產者。從技術上講,哪一個客戶端鏈接到哪一個 NSQ 不重要,只要有足夠的消費者鏈接到全部生產者,以知足大量的消息,保證全部東西最終將被處理。對於 nsqlookupd,高可用性是經過運行多個實例來實現。他們不直接相互通訊和數據被認爲是最終一致。消費者輪詢全部的配置的 nsqlookupd 實例和合並 response。失敗的,沒法訪問的,或以其餘方式故障的節點不會讓系統陷於停頓。

2.4 效率

對於數據的協議,經過推送數據到客戶端最大限度地提升性能和吞吐量的,而不是等待客戶端拉數據。這個概念,稱之爲 RDY 狀態,基本上是客戶端流量控制的一種形式。 當客戶端鏈接到 nsqd 和並訂閱到一個通道時,它被放置在一個 RDY 爲 0 狀態。這意味着,尚未信息被髮送到客戶端。當客戶端已準備好接收消息發送,更新它的命令 RDY 狀態到它準備處理的數量,好比 100。無需任何額外的指令,當 100 條消息可用時,將被傳遞到客戶端(服務器端爲那個客戶端每次遞減 RDY 計數)。客戶端庫的被設計成在 RDY 數達到配置 max-in-flight 的 25% 發送一個命令來更新 RDY 計數(並適當考慮鏈接到多個 nsqd 狀況下,適當地分配)。

2.5 心跳和超時

NSQ 的 TCP 協議是面向 push 的。在創建鏈接,握手,和訂閱後,消費者被放置在一個爲 0 的 RDY 狀態。當消費者準備好接收消息,它更新的 RDY 狀態到準備接收消息的數量。NSQ 客戶端庫不斷在幕後管理,消息控制流的結果。每隔一段時間,nsqd 將發送一個心跳線鏈接。客戶端能夠配置心跳之間的間隔,但 nsqd 會期待一個迴應在它發送下一個心掉以前。 組合應用級別的心跳和 RDY 狀態,避免頭阻塞現象,也可能使心跳無用(即,若是消費者是在後面的處理消息流的接收緩衝區中,操做系統將被填滿,堵心跳)爲了保證進度,全部的網絡 IO 時間上限勢必與配置的心跳間隔相關聯。這意味着,你能夠從字面上拔掉之間的網絡鏈接 nsqd 和消費者,它會檢測並正確處理錯誤。當檢測到一個致命錯誤,客戶端鏈接被強制關閉。在傳輸中的消息會超時而從新排隊等待傳遞到另外一個消費者。最後,錯誤會被記錄並累計到各類內部指標。

2.6 分佈式

由於NSQ沒有在守護程序之間共享信息,因此它從一開始就是爲了分佈式操做而生。個別的機器能夠隨便宕機隨便啓動而不會影響到系統的其他部分,消息發佈者能夠在本地發佈,即便面對網絡分區。 這種「分佈式優先」的設計理念意味着NSQ基本上能夠永遠不斷地擴展,須要更高的吞吐量?那就添加更多的nsqd吧。惟一的共享狀態就是保存在lookup節點上,甚至它們不須要全局視圖,配置某些nsqd註冊到某些lookup節點上這是很簡單的配置,惟一關鍵的地方就是消費者能夠經過lookup節點獲取全部完整的節點集。清晰的故障事件——NSQ在組件內創建了一套明確關於可能致使故障的的故障權衡機制,這對消息傳遞和恢復都有意義。雖然它們可能不像Kafka系統那樣提供嚴格的保證級別,但NSQ簡單的操做使故障狀況很是明顯。

2.7 no replication

不像其餘的隊列組件,NSQ並無提供任何形式的複製和集羣,也正是這點讓它可以如此簡單地運行,但它確實對於一些高保證性高可靠性的消息發佈沒有足夠的保證。咱們能夠經過下降文件同步的時間來部分避免,只需經過一個標誌配置,經過EBS支持咱們的隊列。可是這樣仍然存在一個消息被髮布後立刻死亡,丟失了有效的寫入的狀況。

2.8 沒有嚴格的順序

雖然Kafka由一個有序的日誌構成,但NSQ不是。消息能夠在任什麼時候間以任何順序進入隊列。在咱們使用的案例中,這一般沒有關係,由於全部的數據都被加上了時間戳,但它並不適合須要嚴格順序的狀況。

2.9 無數據重複刪除功能

NSQ對於超時系統,它使用了心跳檢測機制去測試消費者是否存活仍是死亡。不少緣由會致使咱們的consumer沒法完成心跳檢測,因此在consumer中必須有一個單獨的步驟確保冪等性。

3. 實踐安裝過程

本文將nsq集羣具體的安裝過程略去,你們能夠自行參考官網,比較簡單。這部分介紹下筆者實驗的拓撲,以及nsqadmin的相關信息。

3.1 拓撲結構

實驗採用3臺NSQD服務,2臺LOOKUPD服務。採用官方推薦的拓撲,消息發佈的服務和NSQD在一臺主機。一共5臺機器。NSQ基本沒有配置文件,配置經過命令行指定參數。主要命令以下:LOOKUPD命令

   
  1. bin /nsqlookupd

複製代碼bin /nsqlookupd

NSQD命令

   
  1. bin/nsqd --lookupd-tcp-address=172.16.30.254:4160 -broadcast-address=172.16.30.254

複製代碼bin/nsqd --lookupd-tcp-address=172.16.30.254:4160 -broadcast-address=172.16.30.254
   
  1. bin/nsqadmin --lookupd-http-address=172.16.30.254:4161

複製代碼bin/nsqadmin --lookupd-http-address=172.16.30.254:4161

工具類,消費後存儲到本地文件。

   
  1. bin/nsq_to_file --topic=newtest --channel=test --output-dir=/tmp --lookupd-http-address=172.16.30.254:4161

複製代碼bin/nsq_to_file --topic=newtest --channel=test --output-dir=/tmp --lookupd-http-address=172.16.30.254:4161

發佈一條消息

   
  1. curl -d 'hello world 5' 'http://172.16.30.254:4151/put?topic=test'

複製代碼curl -d 'hello world 5' 'http://172.16.30.254:4151/put?topic=test'

3.2 nsqadmin

對Streams的詳細信息進行查看,包括NSQD節點,具體的channel,隊列中的消息數,鏈接數等信息。

列出全部的NSQD節點:

消息的統計:

lookup主機的列表:

4. 總結

NSQ基本核心就是簡單性,是一個簡單的隊列,這意味着它很容易進行故障推理和很容易發現bug。消費者能夠自行處理故障事件而不會影響系統剩下的其他部分。

事實上,簡單性是咱們決定使用NSQ的首要因素,這方便與咱們的許多其餘軟件一塊兒維護,經過引入隊列使咱們獲得了堪稱完美的表現,經過隊列甚至讓咱們增長了幾個數量級的吞吐量。愈來愈多的consumer須要一套嚴格可靠性和順序性保障,這已經超過了NSQ提供的簡單功能。

結合咱們的業務系統來看,對於咱們所須要傳輸的發票消息,相對比較敏感,沒法容忍某個nsqd宕機,或者磁盤沒法使用的狀況,該節點堆積的消息沒法找回。這是咱們沒有選擇該消息中間件的主要緣由。簡單性和可靠性彷佛並不能徹底知足。相比Kafka,ops肩負起更多負責的運營。另外一方面,它擁有一個可複製的、有序的日誌能夠提供給咱們更好的服務。但對於其餘適合NSQ的consumer,它爲咱們服務的至關好,咱們期待着繼續鞏固它的堅實的基礎。

歡迎關注個人公衆號


ps: 本文首發於筆者的csdn博客,此處將其加入我的博客。

參考

  1. NSQ:分佈式的實時消息平臺

  2. NSQ - NYC Golang Meetup

  3. NSQ Docs

相關文章
相關標籤/搜索