本篇文章主要介紹如何在nodeJs中使用nsq,其餘實現將在後續文章輸出。前端
前段時間作了一個網頁生成pdf的node服務。因爲puppteer和canvas生成過程當中對內存的消耗比較大,內容量大的網頁生成時間過長,對於第三方組件有時候會生成出問題等緣由。node
引入了nsq,使項目實現負載均衡,消除單點故障。git
可是網上查找以後發現介紹node中加入nsq的方案不多,通過軟膜硬泡終於把nsq引入了node中,但願能把本身的收穫和你們聊一下吧。github
NSQ是一個基於Go語言的分佈式實時消息平臺, 它具備分佈式、去中心化的拓撲結構,支持無限水平擴展。無單點故障、故障容錯、高可用性以及可以保證消息的可靠傳遞的特徵。另外,NSQ很是容易配置和部署, 且支持衆多的消息協議。支持多種客戶端,協議簡單。sql
nsq設計很簡單,須要瞭解如下幾個核心概念。npm
一、 nsqd:一個負責接收、排隊、轉發消息到客戶端的守護進程
二、 nsqlookupd:管理拓撲信息, 用於收集nsqd上報的topic和channel,並提供最終一致性的發現服務的守護進程。
三、 Topic:一個topic就是程序發佈消息的一個邏輯鍵,當程序第一次發佈消息時就會建立topic。
四、 Channels:channel組與消費者相關,是消費者之間的負載均衡,channel在某種意義上來講是一個「隊列」。每當一個發佈者發送一條消息到一個topic,消息會被複制到全部消費者鏈接的channel上,消費者經過這個特殊的channel讀取消息,實際上,在消費者第一次訂閱時就會建立channel。
Topic只能有一個channel能夠有多個,不一樣的channel用於分發不一樣的任務
下面附上一個金典nsq示意圖:
canvas
在使用過程當中發現gcc版本太低致使報錯。
由於node.js 4升級了v8引擎,須要gcc版本在4.8以上。後端
本項目是在eggjs基礎上創建的。首先須要在項目中安裝nsqjs
服務器
$ npm install nsqjs --save
在根文件app.js對nsq進行控制,nsq的配置很是簡潔,nsq分爲寫和讀兩個獨立的過程。網絡
const nsq = require('nsqjs') module.exports = app => { app.beforeStart(async () => { // 實例化nsq的寫操做 // 在config中配置nsq的host和port,這裏是你配置的nsq地址 const writerNsq = new nsq.Writer(app.config.nsq.nsqHostWriter, app.config.nsq.writePort) // 鏈接nsq的寫功能 writerNsq.connect() // 當寫操做鏈接成功後,把其賦值到全局的app上以便寫入信息 writerNsq.on('ready', () => { app.writerNsq = writerNsq })
當nsq寫功能實現後,咱們能夠經過publish方法向nsq隊列寫入信息
ctx.app.writerNsq.publish(config.nsq.topic, { // 你所須要傳遞的參數 })
當服務器有空閒的時候nsq會隨機分配到空閒線程上實現讀操做,咱們的核心業務是在讀功能啓用以後實現的。
讀和寫的初始化過程是在項目啓動時候就要執行的。
項目運行過程當中,咱們只是不停的在進行讀寫操做而已。
// 實例化nsq的讀操做 // 參數是對應須要讀的topic和channel和應的nsq讀的地址 const client = new nsq.Reader(app.config.nsq.topic, app.config.nsq.channel, { lookupdHTTPAddresses: app.config.nsq.nsqHostReader, maxInFlight: 1 }) // 鏈接nsq的讀功能 client.connect() // 每當有消息隊列進來的時候就會調用client.on方法 // message是我門在寫的過程當中傳入的信息 client.on方法('message', async msg => { // 對寫入的信息格式化 let data = JSON.parse(msg.body.toString()) try { // 爲了保持鏈接狀態,處理超時狀況 const touch = () => { if (!msg.hasResponded) { msg.touch() // Touch the message again a second before the next timeout. setTimeout(touch, msg.timeUntilTimeout() - 1000) } } let timeTouch = setTimeout(touch, msg.timeUntilTimeout() - 1000) let timeFinish = setTimeout(msg.finish.bind(msg), msg.timeUntilTimeout() * 3 + 1000) // 這裏是項目的核心處理部分,具體內容後面文章在說明,這裏返回的url即是生成pdf的網絡地址 let url = await ctx.service.pdf.index.generate(data) clearTimeout(timeTouch) clearTimeout(timeFinish) // 這裏表示這個隊列結束告訴nsq能夠放下個兄弟進來了 msg.finish() } catch (error) { // 萬一出現錯誤也不要阻塞,nsq在失敗後會從新入隊 msg.finish() // 這裏能夠加入網絡日誌 console.log(error) } }); client.on('error', function(err) { // 這裏監聽讀操做時候發生錯誤狀況處理 // 這裏能夠作一些錯誤處理,加錯誤日誌 console.log(err) }); }); };
剛開始說到用nsq仍是有點慌的,畢竟做爲一個前端工程師一臉懵逼,可是通過仔細學習,和後端大佬的請教,發現nsq其實就是一個高效隊列,簡易好用,對於上手來講仍是比較簡單的。
後續文章還會對puppteer生成pdf服務的核心業務作詳細介紹,也就是上文ctx.service.pdf.index.generate(data)具體實現過程。
項目地址:https://github.com/XIEJUNXIRU...
以上只是本人的學習總結,若有問題,請大神不吝賜教。