併發系統實踐1

  最近項目裏面遇到了一個較爲複雜的併發系統設計,雖然最終仍是實現了並投入運行,可是耗時仍是挺久的,想一想能夠總結下,但願能夠幫助之後的設計和實踐,須要注意的是,我專一於介紹遇到的問題以及相應的解決思路,可能不夠系統性,歡迎指正。
  談到併發,不少童鞋聽到的都是優勢,我想潑下冷水,雖然多線程能夠極大的提升效率,吞吐量等,可是併發不是銀彈,引入的同時會提升系統的複雜性,給設計和調試帶了難度(還不包括引入的系統開銷例如線程切換等),因此須要辯證的來看待併發,這點不少基礎併發書籍都會在最開始介紹。
 
背景介紹
  咱們所作的是一個監控系統,用於監控咱們開發的線上服務。咱們的線上服務大體包括兩部分,
  首先定義下咱們的監控目標:延遲。也就是說,A的流入數據到B的產出數據時間在95%的percentile超過5分鐘即表示有較大延遲須要報警。
初步設計
  根據目標,咱們設計的初步架構是在內存中保存全部的數據結構,並根據咱們的閾值打各類perfCounter,再經過Counter內置的統計計算和報警系統進行報警。這樣設計的目的是:簡單並能夠快速原型化,目前的需求不須要歷史記錄,因此落盤也就沒有必要了。
首先第一個遇到的問題是,如何監控咱們高度依賴的服務A?
  咱們但願下降監控系統對被監控系統的耦合,因此很天然的仍是想到了引入一箇中間件來解決,就是kafka queue。這樣,咱們打算給A和B定義以下幾組kafka消息,咱們須要監控目標就變成了最初的消息和最後的消息的時間差,結構大體以下:
  消息結構:
  1.   時間戳;
  2.   步驟名稱;
  3.   各個步驟的額外數據;
  4.   id;
  5.   是否成功;
  6.   錯誤消息;
  整體思路能夠描繪以下:
  總共有兩個線程,一個是主線程,負責從kafka中取消息消費,並根據消息類型路由到各個handler處理;另外一個線程是超時記錄線程,用於不斷的從內存隊列中取消息並計時,若是超時且沒有沒有收到消息E(結束)即認爲超時並打上報警的counter。
  因此,一次處理超時的定義爲 time(E) - time(A) > threshold
  這裏須要考慮的問題是怎麼設計數據結構來存儲消息以及如何加鎖來控制併發呢?
  因爲A,B,C,D,E的消息獲取記錄都須要,由於系統的此次處理可能中斷在任何一個消息的處理上,因此這些收到的消息都須要給到超時記錄線程用於報警。因此,很天然的想到了ConcurrentMap,key是消息id,value則是另外一個map,包括了各個消息步驟的數據,key是步驟名,value是消息。
  經過wiki咱們知道,
  A Map providing additional atomic putIfAbsent, remove, and replace methods.Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a ConcurrentMap as a key or value happen-before actions subsequent to the access or removal of that object from the ConcurrentMap in another thread.
  這個併發map,保證了刪除和放入的原子性,以及可見性,這樣這個數據結構就至關於自帶鎖了,它會同時被消費線程和超時記錄線程訪問。
  (這裏有個小技巧須要注意,操做value的時候每每須要判斷是否存在某個key,由於這個key隨時可能被其餘線程刪除,因此通常來講是把value的引用記下來而不是判斷一次後,每次都去讀,由於有被異步刪除的風險。)
  乍一看,邏輯彷佛很清晰,各個消息的handler在處理消息的時候,判斷有沒有失敗,失敗就直接報警(看成處理不成功),成功的話直接放入超時線程計時。對於E的handler,多加上判斷是否結束,若是結束,把當前的時間差記錄下來並直接返回(刪掉map中的key)。
 
版本迭代
2.0版本:
  細心的同窗應該發現,這裏還有併發問題:
  1.   對於一個消息,在整個流程中,其實應該只被打一次的(成功或者失敗),可是咱們以前方案並無考慮,可能一個消息在E處剛打了成功,與此同時,超時記錄線程正好記錄到超時(沒來得及檢查map的key是否被刪除),這個時候消息就被打了兩次時間,形成數據不真實;
  這個併發問題不容易發現是由於,共享變量不是那麼好察覺,不像那個map,這個打counter其實也是共享變量,因此審視一個併發系統須要從總體和局部同時考慮,局部上咱們發現須要對map加鎖,總體上,監控系統的輸入是各類kafka消息,輸出是counter計時,這樣考慮咱們可能就比較容易發現這個隱藏的共享變量了。
  那麼怎麼解決呢?對每一個消息的counter加鎖便可。因爲不想在原來的map加數據了,因此新加了一個concurrentMap來記錄每個消息id->AtomicBoolean,這樣,在每次打成功或者失敗的counter的時候,用CAS判斷便可,自己AtomicBoolean是保證可見性的,這點就不須要擔憂了。
  但是,不久又有新需求來了,你們須要知道每個消息A,B,C,D,E的處理時間差,這樣方便定位是哪一步比較慢。
3.0版本:
  針對這個問題,我看到是同一個id的每一步都須要打一次的鎖,因此,又建了一個每個id+step的ConcurrentMap來記錄每個id+step -> AtomicBoolean來保證每個step都只被打一次。
  改完以後,上線發現C,D和E的時間比E的時間還長,思前想後,忽然發現沒有考慮到A,B,C,D, E的順序是不保證了,因此,不能假設C比D早來,或者E比D晚來,這樣就形成了,某些id的處理E先來了,打了一個結束時間的counter,C和D還沒來,等它們來了,因爲已經結束了就不會計時了,這樣形成整體C,D的時間少了。解決方法是,在E來了,結束時,檢查C,D是否來,沒來按當前時間打便可。
 
總結
  直到這個版本上線,才使得整體的時間指標看起來比較正常,這3個版本走走停停改了將近1個月感慨良多,深入的理解了多線程帶來的系統複雜性和調試複雜性,以爲有下來如下幾點值得總結:
  1.   在設計的時候,儘可能多花時間考慮清楚可能形成併發問題的點;
  2.   能夠從總體和局部,輸入和輸出等多方面來考慮併發點,再設計相應的併發工具解決;
  3.   在引入多線程的時候,要考慮到即將形成的複雜性提高,須要有權衡的思考;
相關文章
相關標籤/搜索