How we redesign the NSQ-NSQ重塑之客戶端

1、overview

有讚的自研版 NSQ 在高可用性以及負載均衡方面進行了改造,自研版的 nsqd 中引入了數據分區以及副本,副本保存在不一樣的 nsqd 上,達到容災目的。此外,自研版 NSQ 在原有 Protocol Spec 基礎上進行了拓展,支持基於分區的消息生產、消費,以及基於消息分區的有序消費,以及消息追蹤功能。html

爲了充分支持自研版 NSQ 新功能,在要構建 NSQ client 時,須要在兼容原版 NSQ 的基礎上,實現額外的設計。本文做爲《Building Client Libraries》的拓展,爲構建有贊自研版 NSQ client 提供指引。git

參考 NSQ 官方的構造 client 指南的結構,接下來的文章分爲以下部分:github

1 workflow 及配置 2 nsqd 發現服務 3 nsqd 建連 4 發送/接收消息 5 順序消費算法

本文根據有贊自研版 nsq 的新特性,對 nsq 文檔[1]中構建 nsq client 的專題進行補充。在閱讀《Building Client Libraries》的基礎上閱讀本文,更有助於理解。sql

2、workflow 及配置

經過一張圖,瀏覽一下 nsq client 的工做流程。bash

client 做爲消息 producer 或者 consumer 啓動後,負責 lookup 的功能經過 nsqlookupd 進行 nsqd 服務發現。對於服務發現返回的 nsqd 節點,client 進行建連操做以及異常處理。nsq 鏈接創建後,producer 進行消息發送,consumer 則監聽端口接收消息。同時,client 負責響應來自 nsqd 的心跳,以保持鏈接不被斷開。在和 nsqd 消息通訊過程當中,client 經過 lookup 發現,持續更新 nsq 集羣中 topic 以及節點最新信息,並對鏈接作相應更新操做。當消息發送/消費結束時,client 負責關閉相應 nsqd 鏈接。文章在接下來討論這一流程中的關鍵步驟,對相應功能的實現作更詳細的說明。服務器

2.1 配置 client

自研版 NSQ 改造自開源版 NSQ,繼承了開源版 NSQ 中的配置。[^1]中 Configuration 段落的內容適用於有贊自研版。惟一須要指出的地方是,開源版 nsq 將使用 nsqlookupd 做爲 nsqd 服務發現的一個可選項,基於配置靈活性的考量,開源版 NSQ 容許 client 經過 nsqd 的地址直接創建鏈接,自研版 NSQ 因爲支持動態負載,nsqd 之間的主從關係在集羣中發生切換的時候,須要依賴自研版的 nsqlookupd 將變動信息反饋給 nsq client。基於此,使用 nsqlookupd 進行服務發現,在自研版 NSQ 中是一個「標配」。咱們也將在下一節中對服務發現過程作詳細的說明。負載均衡

3、nsqd 發現服務

開源版中,提供 nsqd 發現服務做爲 nsqlookupd 的重要功能,用於向消息的 consumer/producer 提供可消費/生產的 nsqd 節點。上文提到,區別於開源版本,自研版的 nsqlookupd 將做爲服務發現的惟一入口。nsq client 負責調用 nsqlookupd 的 lookup 服務,並經過 poll,定時更新消息在 nsq 集羣上的讀寫分佈信息。根據 lookup 返回的結果,nsq client 對 nsqd 進行建連。異步

在自研版中訪問 lookup 服務的方式和開源版同樣簡單直接,向 nsqlookupd 的 http 端口 GET:/lookup?topic={topic_name} 便可完成。不一樣之處在於,自研版本 NSQ 的 lookup 服務中支持兩種新的查詢參數:socket

GET:/lookup?topic={topic_name}&access={r or w}&metainto=true

其中:access 用於區分 nsq client 的生產/消費請求。表明 producer 的 lookup 查詢中的參數爲 access=w,consumer 的 lookup 查詢爲 access=r。metainfo 參數用於提示 nsqlookup 是否返回查詢 topic 的分區元數據,包括查詢 topic 的分區總數,以及副本個數。順序消費時,prodcuer 經過返回的分區元數據來判斷 lookup 響應中返回的分區數是否完整,順序消費和生產的詳細部分咱們將在發送消息章節中討論。client 在訪問 lookup 服務時,根據 prodcuer&consumer 角色的差異可使用兩類查詢參數

  • Producer GET:/lookup?topic=test&access=w&metainfo=true

  • Consumer GET:/lookup?topic=test&access=r

在設計上,因爲 metainfo 提供的信息是爲 producer 在順序消費場景下的生產,爲了減小 nsqlookupd 服務壓力,表明 consumer 的 lookup 查詢無需攜帶 metainfo 參數。自研版 lookup 的響應和開原版本兼容:

{    "status_code":200,    "status_txt":"OK",    "data":{        "channels":[            "BaseConsumer"        ],        "meta":{            "partition_num":1,            "replica":1        },        "partitions":{            "0":{                "id":"XX.XX.XX.XX:33122",                "remote_address":"X.X.X.X:33122",                "hostname":"host-name",                "broadcast_address":"XX.XX.XX.XX",                "tcp_port":4150,                "http_port":4151,                "version":"0.3.7-HA.1.5.4.1",                "distributed_id":"XX.XX.XX.XX:4250:4150:338437"            }        },        "producers":[            {                "id":"XX.XX.XX.XX:33122",                "remote_address":"XX.XX.XX.XX:33122",                "hostname":"host-name",                "broadcast_address":"XX.XX.XX.XX",                "tcp_port":4150,                "http_port":4151,                "version":"0.3.7-HA.1.5.4.1",                "distributed_id":"XX.XX.XX.XX:4250:4150:338437"            }        ]    }}複製代碼

3.1 lookup 流程

client 的 lookup 流程以下:

自研版 nsqlookupd 添加了 listlookup 服務,用於發現接入集羣中的 nsqlookupd。nsq client 經過訪問一個已配置的 nsqlookupd 地址,發現全部 nsqlookupd。得到 nsqlookupd 後,nsq client 遍歷獲得的 nsqlookupd 地址,進行 lookup 節點發現。nsq client 合併遍歷得到的節點信息並返回。nsq client 在訪問 listlookup 以及 lookup 服務失敗的場景下(如,訪問超時),nsq client 能夠嘗試重試。lookup 超過最大重試次數後依然失敗的狀況,nsq client 能夠下降訪問該 nsqlookupd 的優先級。client 按期查詢 lookup,保證 client 更新鏈接到有效的 nsqd。

4、nsqd 建連

自研版 nsqd 在建連時遵守[^1]中描述的建連步驟,經過 lookup 返回結果中 partitions 字段中的{broadcastaddress}:{tcpport}創建 TCP 鏈接。自研版中,一個獨立的 TCP 鏈接對應一個 topic 的一個分區。consumer 在建連的時候須要創建與分區數量對應的 TCP 鏈接,以接收到全部的分區中的消息。client 的基本建連過程依然遵照[^1]中的 4 步:

  • client 發送 magic 標誌

  • client 發送 IDENTIFY command 並處理返回結果

  • client 發送 SUB command (指定目標的 topic 以及分區), 並處理返回結果

  • client 發送 RDY 命令

client 經過自研版 NSQ 中拓展的SUB命令,鏈接到指定 topic 的指定分區。

SUB <topic> <channel_name> <topic_partition> \ntopic_name -- 消費的 topic 名稱topic_partition -- topic 的合法分區名channel_name -- channel複製代碼

Response:

OK複製代碼

Error response:

E_INVALIDE_BAD_TOPICE_BAD_CHANNELE_SUB_ORDER_IS_MUST  當前 topic 只支持順序消費複製代碼

client 在建連過程當中,向 lookup 返回的每個 nsqd partition 發送該命令。SUB 命令的出錯響應中,自研版本 NSQ 中加入了最後一個錯誤代碼,當 client SUB 一個配置爲順序消費的 topic 時,client 會收到該錯誤。相應的攜帶分區號的 PUB 命令格式爲:

PUB <topic_name> <topic_partition>\n[ 4-byte size in bytes ][ N-byte binary data ]topic -- 合法 topic 名稱partitionId -- 合法分區名複製代碼

Response:

OK複製代碼

Error response:

E_INVALIDE_BAD_TOPICE_BAD_MESSAGEE_PUB_FAILEDE_FAILED_ON_NOT_LEADER    當前嘗試寫入的 nsqd 節點在副本中不是 leaderE_FAILED_ON_NOT_WRITABLE  當前的 nqd 節點禁止寫入消息E_TOPIC_NOT_EXIST         向當前鏈接中寫入不存在的 topic 消息複製代碼

自研版本 NSQ 中加入了最後三個錯誤代碼,分別用於提示當前嘗試寫入的 nsqd 節點在副本中不是 leader,以及當前的 nqd 節點禁止寫入。client 在接收到錯誤的時候,應該直接關閉 TCP 鏈接,等待 lookup 定時查詢更新 nsqd 節點信息,或者馬上發起 lookup 查詢。若是沒有傳入 partition id, 服務端會選擇默認的 partition. 客戶端能夠選擇 topic 的 partition 發送算法,能夠根據負載狀況選擇一個 partition 發送,也能夠固定的 Key 發送到固定的 partition。client 在消費時,能夠指定只消費一個 partition 仍是消費全部 partition。每一個 partition 會創建獨立的 socket 鏈接接收消息。client 須要處理多個 partition 的 channel 消費問題。

5、發送/接收消息

主要討論生產者和消費者對消息的處理。

5.1 生產者發送消息

client 的消息生產流程以下:

建連過程當中,對於消息生產者,client 在接收到對於 IDENTITY 的響應以後,使用 PUB 命令向鏈接發送消息。client 在 PUB 命令後須要解析收到的響應,出現 Error response 的狀況下,client 應當關閉當前鏈接,並向 lookup 服務從新獲取 nsqd 節點信息。client 能夠將 nsqd 鏈接經過池化,在生產時進行復用,鏈接池中指定 topic 的鏈接爲空時,client 將初始化該鏈接,因失敗而關閉的鏈接將不返回鏈接池。

5.2 消費者接收消息

client 的消費流程以下:

client 在和 nsqd 創建鏈接後,使用異步方式消費消息。nsqd 定時向鏈接中發送心跳相應,用於檢查 client 的活性。client 在收到心跳幀後,向 nsqd 迴應 NOP。當 nsqd 連續兩次發送心跳未能收到迴應後,nsqd 鏈接將在服務端關閉。參考[^1]中消息處理章節的相關內容,client 在消費消息的時候有以下情景:

  1. 消息被順利消費的狀況下,FIN 經過 nsqd 鏈接發送;

  2. 消息的消費失敗的狀況下,client 須要通知 nsqd 該條消息消費失敗。client 經過 REQ 命令攜帶延時參數,通知 nsqd 將消息重發。若是消費者沒有指定延時參數,client 能夠根據消息的 attempt 參數,計算延時參數。client 能夠容許消費者指定當某條消息的消費次數超過指定的次數,client 是否能夠將該條消息丟棄,或者從新發送至 nsqd 消息隊列尾部去後,再 FIN;

  3. 消息的消費須要更多的時間,client 發送 TOUCH 命令,重置 nsqd 的超時時間。TOUCH 命令能夠重複發送,直到消息超時,或者 client 發送 FIN,REQ 命令。是否發送 TOUCH 命令,決定權應當由消費者決定,client 自己不該當使用 TOUCH;

  4. 下發至 client 的消息超時,此時 nsqd 將重發該消息。此種狀況下,client 可能重複消費到消息。消費者在保證消息消費的冪等性的同時,對於重發消息 client 應當可以正常消費;

5.3 消息 ACK

自研版本 NSQ 中,對原有的消息 ID 進行了改造,自研版本中的消息 ID 長度依然爲 16 字節:

[8-byte internal id][8-byte trace id]複製代碼

高位開始的 16 字節,是自研版 NSQ 的內部 ID,後 16 字節是該條消息的 TraceID,用於 client 實現消息追蹤功能。

6、順序消費

基於 topic 分區的消息順序生產消費,是自研版 NSQ 中的新功能。自研版 NSQ 容許生產者經過 shardingId 映射將消息發送到固定 topic 分區。創建鏈接時,消費者在發送 IDENTIFY 後,經過新的 SubOrder 命令鏈接到順序消費 topic。順序消費時,nsqd 的分區在接收到來自 client 的 FIN 確認以前,將不會推送下一條消息。

在 nsqd 配置爲順序消費的 topic 須要 nsq client 經過 SubOrder 進行消費。向順序消費 topic 發送 Sub 命令將會收到錯誤信息,同時鏈接將被關閉。

E_SUB_ORDER_IS_MUST複製代碼


client 在啓動消費者前,能夠經過配置指導 client 在 SUB 以及 SUBORDER 命令之間切換,或者基於 topic 進行切換。SUBORDER 在 TCP 協議的格式以下:


SUB_ORDER  <topic_name> <channel_name> <topic_partition>\ntopic_name -- 進行順序消息的 topic 名稱channel_name -- channel 名稱topic_partition -- topic 分區名稱複製代碼

NSQ 新集羣中,消息的順序生產/消費基於 topic 分區。消息生產者經過指定 shardingID,向目標 partition 發送消息;消息消費者經過指定分區 ID,從指定分區接收消息。Client 進行順序消費時時,TCP 鏈接的 RDY 值至關於將被 NSQ 服務端指定爲 1,在當前消息 Finish 以前不會推送下一條。NSQ 服務器端 topic 進行強制消費配置,當消費場景中日誌出現錯誤消息時,說明該 topic 必須進行順序消費。

順序消費的場景由消息生產這個以及消息消費者兩方的操做完成:

  • 消息生產者經過 SUB_ORDER 命令,鏈接到 Topic 所在的全部 NSQd 分區;

  • 消息消費者經過設置 shardingID 映射,將消息發送到指定 NSQd 的指定 partition,進行生產;

6.1 順序消費場景下的消息生產

client 在進行消息生產時,將攜帶有相同 shardingID 的消息投遞到同一分區中,分區的消息則經過 lookup 服務發現。做爲生產者,client 在 lookup 請求中包含 metainfo 參數,用於得到 topic 的分區總數。client 負責將 shardingID 映射到 topic 分區,同時保證映射的一致性:具備相同的 shardindID 的消息始終被投遞到固定的分區鏈接中。當 shardingID 映射到的 topic 分區對於 client 不可達時,client 結束髮送,告知生產者返回錯誤信息,並當即更新 lookup。

6.2 順序消費場景下的消息消費

client 在進行消息消費時,經過 SUB_ORDER 命令鏈接到 topic 全部分區上。順序消費的場景中,當某條消費的消息超時或 REQUEUE 後,nsq 將會當即將該條消息下發。消息超時或者超過指定重試次數後的策略由消費者指定,client 能夠對於重複消費的消息打印日誌或者告警。

7、消息追蹤功能的實現

新版 NSQ 經過在消息 ID 中增長 TraceID,對消息在 NSQ 中的生命週期進行追蹤,client 經過 PUBTRACE 新命令將須要追蹤的消息發送到 NSQ,PUB_TRACE 命令將包含 traceID 和消息字節碼,格式以下:

PUB_TRACE <topic_name> <topic_partition>\n[ 4-byte size in bytes ][8-byte size trace id][ N-byte binary data ]複製代碼

相較於 PUB 命令,PUB_TRACE 在消息體中增長了 traceID 字段,client 在實現時,傳遞 64 位整數。NSQ 對 PUB_TRACE 的響應格式以下:

OK(2-bytes)+[8-byte internal id]+[8-byte trace id from client]+[8-byte internal disk queue offset]+[4 bytes internal disk queue data size]複製代碼

client 可經過配置或者動態開關,開啓或關閉消息追蹤功能,讓生產者在 PUB 和 PUB_TRACE 命令之間進行切換。爲了獲得完整的 Trace 信息,建議 client 在生產者端打印 PUB_TRACE 響應返回的信息,在消費者端打印收到消息的 TraceID 和時間。

8、總結

本文結合自研版 NSQ 新特性,討論了構建支持服務發現、順序消費、消息追蹤等新特性的 client 過程當中的一些實踐。

9、參考資料

  • [1] NSQ TCP Protocol Spec:http://nsq.io/clients/tcp_protocol_spec.html

  • [2] Building Client Libraries:http://nsq.io/clients/building_client_libraries.html

  • [3] NSQ-Client-Java in youzan:https://github.com/youzan/nsqJavaSDK

  • [4] NSQ in youzan:https://github.com/youzan/nsq

相關文章
相關標籤/搜索