Go 併發編程中的經驗教訓

經過學習如何定位併發處理的陷阱來避免將來處理這些問題時的困境。html

在複雜的分佈式系統進行任務處理時,你一般會須要進行併發的操做。在 Mode.net 公司,咱們天天都要和實時、快速和靈活的軟件打交道。而沒有一個高度併發的系統,就不可能構建一個毫秒級的動態地路由數據包的全球專用網絡。這個動態路由是基於網絡狀態的,儘管這個過程須要考慮衆多因素,但咱們的重點是鏈路指標。在咱們的環境中,鏈路指標能夠是任何跟網絡連接的狀態和當前屬性(如連接延遲)有關的任何內容。linux

併發探測連接監控

咱們的動態路由算法 H.A.L.O.逐跳自適應鏈路狀態最佳路由Hop-by-Hop Adaptive Link-State Optimal Routing)部分依賴於鏈路指標來計算路由表。這些指標由位於每一個 PoP(存活節點Point of Presence)上的獨立組件收集。PoP 是表示咱們的網絡中單個路由實體的機器,經過鏈路鏈接並分佈在咱們的網絡拓撲中的各個位置。某個組件使用網絡數據包探測周圍的機器,周圍的機器回覆數據包給前者。從接收到的探測包中能夠得到鏈路延遲。因爲每一個 PoP 都有不止一個臨近節點,因此這種探測任務實質上是併發的:咱們須要實時測量每一個臨近鏈接點的延遲。咱們不能串行地處理;爲了計算這個指標,必須儘快處理每一個探測。git

latency computation graph
latency computation graph

序列號和重置:一個從新排列場景

咱們的探測組件互相發送和接收數據包,並依靠序列號進行數據包處理。這旨在避免處理重複的包或順序被打亂的包。咱們的第一個實現依靠特殊的序列號 0 來重置序列號。這個數字僅在組件初始化時使用。主要的問題是咱們考慮了遞增的序列號老是從 0 開始。在該組件重啓後,包的順序可能會從新排列,某個包的序列號可能會輕易地被替換成重置以前使用過的值。這意味着,後繼的包都會被忽略掉,直到排到重置以前用到的序列值。github

UDP 握手和有限狀態機

這裏的問題是該組件重啓先後的序列號是否一致。有幾種方法能夠解決這個問題,通過討論,咱們選擇了實現一個帶有清晰狀態定義的三步握手協議。這個握手過程在初始化時經過連接創建會話。這樣能夠確保節點經過同一個會話進行通訊且使用了適當的序列號。golang

爲了正確實現這個過程,咱們必須定義一個有清晰狀態和過渡的有限狀態機。這樣咱們就能夠正確管理握手過程當中的全部極端狀況。算法

finite state machine diagram
finite state machine diagram

會話 ID 由握手的初始化程序生成。一個完整的交換順序以下:緩存

  1. 發送者發送一個 SYN(ID) 數據包。
  2. 接收者存儲接收到的 ID 併發送一個 SYN-ACK(ID)
  3. 發送者接收到 SYN-ACK(ID) 併發送一個 ACK(ID)。它還發送一個從序列號 0 開始的數據包。
  4. 接收者檢查最後接收到的 ID,若是 ID 匹配,則接受 ACK(ID)。它還開始接受序列號爲 0 的數據包。

處理狀態超時

基本上,每種狀態下你都須要處理最多三種類型的事件:連接事件、數據包事件和超時事件。這些事件會併發地出現,所以你必須正確處理併發。ruby

  • 連接事件包括網絡鏈接或網絡斷開的變化,相應的初始化一個連接會話或斷開一個已創建的會話。
  • 數據包事件是控制數據包(SYN/SYN-ACK/ACK)或只是探測響應。
  • 超時事件在當前會話狀態的預約超時時間到期後觸發。

這裏面臨的最主要的問題是如何處理併發的超時到期和其餘事件。這裏很容易陷入死鎖和資源競爭的陷阱。bash

第一種方法

本項目使用的語言是 Golang。它確實提供了原生的同步機制,如自帶的通道和鎖,而且可以使用輕量級線程來進行併發處理。網絡

gophers hacking together
gophers hacking together

gopher 們聚衆狂歡

首先,你能夠設計兩個分別表示咱們的會話和超時處理程序的結構體。

type Session struct {  
  State SessionState  
  Id SessionId  
  RemoteIp string  
}

type TimeoutHandler struct {  
  callback func(Session)     session Session     duration int     timer *timer.Timer   } 複製代碼

Session 標識鏈接會話,內有表示會話 ID、臨近的鏈接點的 IP 和當前會話狀態的字段。

TimeoutHandler 包含回調函數、對應的會話、持續時間和指向調度計時器的指針。

每個臨近鏈接點的會話都包含一個保存調度 TimeoutHandler 的全局映射。

SessionTimeout map[Session]*TimeoutHandler
複製代碼

下面方法註冊和取消超時:

// schedules the timeout callback function.  
func (timeout* TimeoutHandler) Register() {  
  timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time.Second, func() {  
    timeout.callback(timeout.session)  
  })  
}

func (timeout* TimeoutHandler) Cancel() {  
  if timeout.timer == nil {  
    return  
  }  
  timeout.timer.Stop()  
}
複製代碼

你可使用相似下面的方法來建立和存儲超時:

func CreateTimeoutHandler(callback func(Session), session Session, duration int) *TimeoutHandler {  
  if sessionTimeout[session] == nil {  
    sessionTimeout[session] := new(TimeoutHandler)  
  }  
   
  timeout = sessionTimeout[session]  
  timeout.session = session  
  timeout.callback = callback  
  timeout.duration = duration  
  return timeout  
}
複製代碼

超時處理程序建立後,會在通過了設置的 duration 時間(秒)後執行回調函數。然而,有些事件會使你從新調度一個超時處理程序(與 SYN 狀態時的處理同樣,每 3 秒一次)。

爲此,你可讓回調函數從新調度一次超時:

func synCallback(session Session) {  
  sendSynPacket(session)

  // reschedules the same callback.  
  newTimeout := NewTimeoutHandler(synCallback, session, SYN_TIMEOUT_DURATION)  
  newTimeout.Register()

  sessionTimeout[state] = newTimeout  
}
複製代碼

此次回調在新的超時處理程序中從新調度本身,並更新全局映射 sessionTimeout

數據競爭和引用

你的解決方案已經有了。能夠經過檢查計時器到期後超時回調是否執行來進行一個簡單的測試。爲此,註冊一個超時,休眠 duration 秒,而後檢查是否執行了回調的處理。執行這個測試後,最好取消預約的超時時間(由於它會從新調度),這樣纔不會在下次測試時產生反作用。

使人驚訝的是,這個簡單的測試發現了這個解決方案中的一個問題。使用 cancel 方法來取消超時並無正確處理。如下順序的事件會致使數據資源競爭:

  1. 你有一個已調度的超時處理程序。
  2. 線程 1:
    1. 你接收到一個控制數據包,如今你要取消已註冊的超時並切換到下一個會話狀態(如發送 SYN 後接收到一個 SYN-ACK
    2. 你調用了 timeout.Cancel(),這個函數調用了 timer.Stop()。(請注意,Golang 計時器的中止不會終止一個已過時的計時器。)
  3. 線程 2:
    1. 在取消調用以前,計時器已過時,回調即將執行。
    2. 執行回調,它調度一次新的超時並更新全局映射。
  4. 線程 1:
    1. 切換到新的會話狀態並註冊新的超時,更新全局映射。

兩個線程併發地更新超時映射。最終結果是你沒法取消註冊的超時,而後你也會丟失對線程 2 從新調度的超時的引用。這致使處理程序在一段時間內持續執行和從新調度,出現非預期行爲。

鎖也解決不了問題

使用鎖也不能徹底解決問題。若是你在處理全部事件和執行回調以前加鎖,它仍然不能阻止一個過時的回調運行:

func (timeout* TimeoutHandler) Register() {  
  timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time._Second_, func() {  
    stateLock.Lock()  
    defer stateLock.Unlock()

    timeout.callback(timeout.session)  
  })  
}
複製代碼

如今的區別就是全局映射的更新是同步的,可是這仍是不能阻止在你調用 timeout.Cancel() 後回調的執行 —— 這種狀況出如今調度計時器過時了可是尚未拿到鎖的時候。你仍是會丟失一個已註冊的超時的引用。

使用取消通道

你可使用取消通道,而沒必要依賴不能阻止到期的計時器執行的 golang 函數 timer.Stop()

這是一個略有不一樣的方法。如今你能夠不用再經過回調進行遞歸地從新調度;而是註冊一個死循環,這個循環接收到取消信號或超時事件時終止。

新的 Register() 產生一個新的 go 線程,這個線程在超時後執行你的回調,並在前一個超時執行後調度新的超時。返回給調用方一個取消通道,用來控制循環的終止。

func (timeout *TimeoutHandler) Register() chan struct{} {  
  cancelChan := make(chan struct{})  
   
  go func () {  
    select {  
    case _ = <- cancelChan:  
      return  
    case _ = <- time.AfterFunc(time.Duration(timeout.duration) * time.Second):  
      func () {  
        stateLock.Lock()  
        defer stateLock.Unlock()

        timeout.callback(timeout.session)  
      } ()  
    }  
  } ()

  return cancelChan  
}

func (timeout* TimeoutHandler) Cancel() {  
  if timeout.cancelChan == nil {  
    return  
  }  
  timeout.cancelChan <- struct{}{}  
}
複製代碼

這個方法給你註冊的全部超時提供了取消通道。一個取消調用向通道發送一個空結構體並觸發取消操做。然而,這並不能解決前面的問題;可能在你經過通道取消以前以及超時線程拿到鎖以前,超時時間就已經到了。

這裏的解決方案是,在拿到鎖以後,檢查一下超時範圍內的取消通道。

  case _ = <- time.AfterFunc(time.Duration(timeout.duration) * time.Second):  
    func () {  
      stateLock.Lock()  
      defer stateLock.Unlock()  
     
      select {  
      case _ = <- handler.cancelChan:  
        return  
      default:  
        timeout.callback(timeout.session)  
      }  
    } ()  
  }
複製代碼

最終,這能夠確保在拿到鎖以後執行回調,不會觸發取消操做。

當心死鎖

這個解決方案看起來有效;可是仍是有個隱患:死鎖

請閱讀上面的代碼,試着本身找到它。考慮下描述的全部函數的併發調用。

這裏的問題在取消通道自己。咱們建立的是無緩衝通道,即發送的是阻塞調用。當你在一個超時處理程序中調用取消函數時,只有在該處理程序被取消後才能繼續處理。問題出如今,當你有多個調用請求到同一個取消通道時,這時一個取消請求只被處理一次。當多個事件同時取消同一個超時處理程序時,如鏈接斷開或控制包事件,很容易出現這種狀況。這會致使死鎖,可能會使應用程序停機。

gophers on a wire, talking
gophers on a wire, talking

有人在聽嗎?

(已得到 Trevor Forrey 受權。)

這裏的解決方案是建立通道時指定緩存大小至少爲 1,這樣向通道發送數據就不會阻塞,也顯式地使發送變成非阻塞的,避免了併發調用。這樣能夠確保取消操做只發送一次,而且不會阻塞後續的取消調用。

func (timeout* TimeoutHandler) Cancel() {  
  if timeout.cancelChan == nil {  
    return  
  }  
   
  select {  
  case timeout.cancelChan <- struct{}{}:  
  default:  
    // can’t send on the channel, someone has already requested the cancellation.  
  }  
}
複製代碼

總結

在實踐中你學到了併發操做時出現的常見錯誤。因爲其不肯定性,即便進行大量的測試,也不容易發現這些問題。下面是咱們在最初的實現中遇到的三個主要問題:

在非同步的狀況下更新共享數據

這彷佛是個很明顯的問題,但若是併發更新發生在不一樣的位置,就很難發現。結果就是數據競爭,因爲一個更新會覆蓋另外一個,所以對同一數據的屢次更新中會有某些更新丟失。在咱們的案例中,咱們是在同時更新同一個共享映射裏的調度超時引用。(有趣的是,若是 Go 檢測到在同一個映射對象上的併發讀寫,會拋出致命錯誤 — 你能夠嘗試下運行 Go 的數據競爭檢測器)。這最終會致使丟失超時引用,且沒法取消給定的超時。當有必要時,永遠不要忘記使用鎖。

gopher assembly line
gopher assembly line

不要忘記同步 gopher 們的工做

缺乏條件檢查

在不能僅依賴鎖的獨佔性的狀況下,就須要進行條件檢查。咱們遇到的場景稍微有點不同,可是核心思想跟條件變量是同樣的。假設有個一個生產者和多個消費者使用一個共享隊列的經典場景,生產者能夠將一個元素添加到隊列並喚醒全部消費者。這個喚醒調用意味着隊列中的數據是可訪問的,而且因爲隊列是共享的,消費者必須經過鎖來進行同步訪問。每一個消費者均可能拿到鎖;然而,你仍然須要檢查隊列中是否有元素。由於在你拿到鎖的瞬間並不知道隊列的狀態,因此仍是須要進行條件檢查。

在咱們的例子中,超時處理程序收到了計時器到期時發出的「喚醒」調用,可是它仍須要檢查是否已向其發送了取消信號,而後才能繼續執行回調。

gopher boot camp
gopher boot camp

若是你要喚醒多個 gopher,可能就須要進行條件檢查

死鎖

當一個線程被卡住,無限期地等待一個喚醒信號,可是這個信號永遠不會到達時,就會發生這種狀況。死鎖能夠經過讓你的整個程序停機來完全殺死你的應用。

在咱們的案例中,這種狀況的發生是因爲屢次發送請求到一個非緩衝且阻塞的通道。這意味着向通道發送數據只有在從這個通道接收完數據後才能返回。咱們的超時線程循環迅速從取消通道接收信號;然而,在接收到第一個信號後,它將跳出循環,而且不再會從這個通道讀取數據。其餘的調用會一直被卡住。爲避免這種狀況,你須要仔細檢查代碼,謹慎處理阻塞調用,並確保不會發生線程飢餓。咱們例子中的解決方法是使取消調用成爲非阻塞調用 — 咱們不須要阻塞調用。


via: opensource.com/article/19/…

做者:Eduardo Ferreira 選題:lujun9972 譯者:lxbwolf 校對:wxy

本文由 LCTT 原創編譯,Linux中國 榮譽推出

相關文章
相關標籤/搜索