本文記錄本身在閱讀和學習nsq源碼的時候的一些學習筆記,主要目的是我的總結和方便後期查閱。html
author:lihaiping1603@aliyun.com網絡
date:2020/01/13函數
對於topic下的channel中的消息會有router()函數進行路由,它將消息流轉到內存的chan中,即channel中的incomingMsgChan轉到memoryMsgChan,若是memoryMsgChan消息滿了,堵住的話,就會將消息寫入到backend中。oop
對於memoryMsgChan中消息,會在channel的messagePump()函數中進行再一次的流轉,他會將消息從memoryMsgChan通道中接收,而後再次轉發到clientMsgChan中去。學習
對於每一個client網絡連接,NSQ都會對此client網絡連接新建一個IOLoop()的goroutine來處理一切和client的消息。當client發送SUB訂閱命令以後,client會根據它訂閱的topic和channel,再啓動一個goroutine來推送channel中的msg到client,這個函數就是messagePump()函數。而在client的messagePump中,主要的任務就是發送心跳和和接收來自channel中clientMsgChan消息,而後將消息打包發送給client。固然消息發送給client可能會失敗,因此NSQ在這裏作一個很好的容錯失敗的策略,當咱們將消息推送給client的時候,既然消息可能會失敗,因此咱們就須要將消息存起來,因而client會將這個消息在它所在的channel中,啓動一個超時策略,若是超時的話,消息會被再次以跟寵topic流轉到chanel一樣的流程進入到channel的消息流轉流程。spa
而client在發送網絡消息以前,會經過調用client.Channel.StartInFlightTimeout(msg, client)函數,來將消息msg和client一塊兒生成一個另外的消息對象inFlightMessage,而後再將這個消息對象增長超時時間,進一步封裝成pqueue.Item,而後分別存儲到 channel的inFlightMessages和inFlightPQ中,其中inFlightMessages是一個map,他以msg的id爲key進行存儲,方便等會當客戶端收到消息應答之後,進行刪除操做。同時在inFlightPQ中,這個主要是作超時用的,由於channel中會啓動一個goroutine函數inFlightWorker來專門處理inFlightPQ中超時消息。router
在inFlightWorker()中,主要根據超時時間來不斷的從pq隊列中取消息,若是超時時間到了,消息尚未從inFlightPQ隊列中刪除掉,說明這個消息可能丟失或者出現什麼問題了,咱們就須要從新流轉這個消息。htm