理解channel 工做原理以及源碼

Go 的併發特性  
  • goroutines : 獨立執行每一個任務,並 可能並行 執行
  • channels : 用於 goroutines 之間的通信、同步
  • 一個簡單的事務處理的例子  
    對於下面這樣的非併發的程序:
            
            
            
            
    func main() {
    tasks := getTasks()
    // 處理每一個任務
    for _, task := range tasks {
    process(task)
    }
    }
    將其轉換爲 Go 的併發模式很容易,使用典型的 Task Queue 的模式:
            
            
            
            
    func main() {
    // 建立帶緩衝的 channel
    ch := make ( chan Task, 3 )
    // 運行固定數量的 workers
    for i := 0 ; i < numWorkers; i++ {
    go worker(ch)
    }
    // 發送任務到 workers
    hellaTasks := getTasks()
    for _, task := range hellaTasks {
    ch <- task
    }
    ...
    }
    func worker(ch chan Task) {
    for {
    // 接收任務
    task := <-ch
    process(task)
    }
    }
    channels 的特性  
    • goroutine-safe,多個 goroutine 能夠同時訪問一個 channel 而不會出現競爭問題
    • 能夠用於在 goroutine 之間存儲傳遞
    • 其語義是先入先出(FIFO)
    • 能夠致使 goroutine 的 block 和 unblock
    • 解析  
      構造 channel  
                     
                     
                     
                     
      // 帶緩衝的 channel
      ch := make ( chan Task, 3 )
      // 無緩衝的 channel
      ch := make ( chan Tass)

      回顧前面提到的 channel 的特性,特別是前兩個。若是忽略內置的 channel,讓你設計一個具備 goroutines-safe 而且能夠用來存儲、傳遞值的東西,你會怎麼作?不少人可能以爲或許能夠用一個帶鎖的隊列來作。沒錯,事實上,channel 內部就是一個帶鎖的隊列。html

      https://golang.org/src/runtime/chan.gogolang

                     
                     
                     
                     
      type hchan struct {
      ...
      buf unsafe.Pointer // 指向一個環形隊列
      ...
      sendx uint // 發送 index
      recvx uint // 接收 index
      ...
      lock mutex // 互斥量
      }

      buf 的具體實現很簡單,就是一個環形隊列的實現。sendx  recvx 分別用來記錄發送、接收的位置。而後用一個 lock 互斥鎖來確保無競爭冒險。安全

      對於每個 ch := make(chan Task, 3) 這類操做,都會在中,分配一個空間,創建並初始化一個 hchan 結構變量,而 ch 則是指向這個 hchan 結構的指針併發

      由於 ch 自己就是個指針,因此咱們才能夠在 goroutine 函數調用的時候直接將 ch 傳遞過去,而不用再 &ch 取指針了,因此全部使用同一個 ch 的 goroutine 都指向了同一個實際的內存空間。ide

      發送、接收  

      爲了方便描述,咱們用 G1 表示 main() 函數的 goroutine,而 G2 表示 worker 的 goroutine。函數

                     
                     
                     
                     
      // G1
      func main() {
      ...
      for _, task := range tasks {
      ch <- task
      }
      ...
      }
                     
                     
                     
                     
      // G2
      func worker(ch chan Task) {
      for {
      task :=<-ch
      process(task)
      }
      }
      簡單的發送、接收  

      那麼 G1 中的 ch <- task0 具體是怎麼作的呢?post

      1. 獲取鎖
      2. enqueue(task0)(這裏是內存複製 task0)
      3. 釋放鎖

      這一步很簡單,接下來看 G2  t := <- ch 是如何讀取數據的。性能

      1. 獲取鎖
      2. t = dequeue()(一樣,這裏也是內存複製)
      3. 釋放鎖

      這一步也很是簡單。可是咱們從這個操做中能夠看到,全部 goroutine 中共享的部分只有這個 hchan 的結構體,而全部通信的數據都是內存複製。這遵循了 Go 併發設計中很核心的一個理念:優化

      「Do not communicate by sharing memory;instead, share memory by communicating.」ui

      阻塞和恢復  
      發送方被阻塞  

      假設 G2 須要很長時間的處理,在此期間,G1 不斷的發送任務:

      1. ch <- task1
      2. ch <- task2
      3. ch <- task3

      可是當再一次 ch <- task4 的時候,因爲 ch 的緩衝只有 3 個,因此沒有地方放了,因而 G1 被 block 了,當有人從隊列中取走一個 Task 的時候,G1 纔會被恢復。這是咱們都知道的,不過咱們今天關心的不是發生了什麼,而是如何作到的?

      goroutine 的運行時調度  

      首先,goroutine 不是操做系統線程,而是用戶空間線程。所以 goroutine 是由 Go runtime 來建立並管理的,而不是 OS,因此要比操做系統線程輕量級。

      固然,goroutine 最終仍是要運行於某個線程中的,控制 goroutine 如何運行於線程中的是 Go runtime 中的 scheduler (調度器)。

      Go 的運行時調度器是 M:N 調度模型,既 N 個 goroutine,會運行於 M 個 OS 線程中。換句話說,一個 OS 線程中,可能會運行多個 goroutine。

      Go 的 M:N 調度中使用了3個結構:

      • M: OS 線程
      • G: goroutine
      • P: 調度上下文
        • P 擁有一個運行隊列,裏面是全部能夠運行的 goroutine 及其上下文

      要想運行一個 goroutine - G,那麼一個線程 M,就必須持有一個該 goroutine 的上下文 P

      goroutine 被阻塞的具體過程  

      那麼當 ch <- task4 執行的時候,channel 中已經滿了,須要pause G1。這個時候,:

      1. G1 會調用運行時的 gopark
      2. 而後 Go 的運行時調度器就會接管
      3.  G1 的狀態設置爲 waiting
      4. 斷開 G1  M 之間的關係(switch out),所以 G1 脫離 M,換句話說,M 空閒了,能夠安排別的任務了。
      5.  P 的運行隊列中,取得一個可運行的 goroutine G
      6. 創建新的 G  M 的關係(Switch in),所以 G 就準備好運行了。
      7. 當調度器返回的時候,新的 G 就開始運行了,而 G1 則不會運行,也就是 block 了。

      從上面的流程中能夠看到,對於 goroutine 來講,G1 被阻塞了,新的 G 開始運行了;而對於操做系統線程 M 來講,則根本沒有被阻塞。

      咱們知道 OS 線程要比 goroutine 要沉重的多,所以這裏儘可能避免 OS 線程阻塞,能夠提升性能。

      goroutine 恢復執行的具體過程  

      前面理解了阻塞,那麼接下來理解一下如何恢復運行。不過,在繼續瞭解如何恢復以前,咱們須要先進一步理解 hchan 這個結構。由於,當 channel 不在滿的時候,調度器是如何知道該讓哪一個 goroutine 繼續運行呢?並且 goroutine 又是如何知道該從哪取數據呢?

       hchan 中,除了以前提到的內容外,還定義有 sendq  recvq 兩個隊列,分別表示等待發送、接收的 goroutine,及其相關信息。

                            
                            
                            
                            
      type hchan struct {
      ...
      buf unsafe.Pointer // 指向一個環形隊列
      ...
      sendq waitq // 等待發送的隊列
      recvq waitq // 等待接收的隊列
      ...
      lock mutex // 互斥量
      }

      其中 waitq 是一個鏈表結構的隊列,每一個元素是一個 sudog 的結構,其定義大體爲:

                            
                            
                            
                            
      type sudog struct {
      g *g // 正在等候的 goroutine
      elem unsafe.Pointer // 指向須要接收、發送的元素
      ...
      }

      https://golang.org/src/runtime/runtime2.go?h=sudog#L270

      因此在以前的阻塞 G1 的過程當中,實際上:

      1. G1 給本身建立一個 sudog 的變量
      2. 而後追加到 sendq 的等候隊列中,方便未來的 receiver 來使用這些信息恢復 G1

      這些都是發生在調用調度器以前

      那麼如今開始看一下如何恢復。

       G2 調用 t := <- ch 的時候,channel 的狀態是,緩衝是滿的,並且還有一個 G1 在等候發送隊列裏,而後 G2 執行下面的操做:

      1. G2 先執行 dequeue() 從緩衝隊列中取得 task1  t
      2. G2  sendq 中彈出一個等候發送的 sudog
      3. 將彈出的 sudog 中的 elem 的值 enqueue()  buf 
      4. 將彈出的 sudog 中的 goroutine,也就是 G1,狀態從 waiting 改成 runnable
        1. 而後,G2 須要通知調度器 G1 已經能夠進行調度了,所以調用 goready(G1)
        2. 調度器將 G1 的狀態改成 runnable
        3. 調度器將 G1 壓入 P 的運行隊列,所以在未來的某個時刻調度的時候,G1 就會開始恢復運行。
        4. 返回到 G2

      注意,這裏是由 G2 來負責將 G1  elem 壓入 buf 的,這是一個優化。這樣未來 G1 恢復運行後,就沒必要再次獲取鎖、enqueue()、釋放鎖了。這樣就避免了屢次鎖的開銷。

      若是接收方先阻塞呢?  

      更酷的地方是接收方先阻塞的流程。

      若是 G2 先執行了 t := <- ch,此時 buf 是空的,所以 G2 會被阻塞,他的流程是這樣:

      1. G2 給本身建立一個 sudog 結構變量。其中 g 是本身,也就是 G2,而 elem 則指向 t
      2. 將這個 sudog 變量壓入 recvq 等候接收隊列
      3. G2 須要告訴 goroutine,本身須要 pause 了,因而調用 gopark(G2)
        1. 和以前同樣,調度器將其 G2 的狀態改成 waiting
        2. 斷開 G2  M 的關係
        3.  P 的運行隊列中取出一個 goroutine
        4. 創建新的 goroutine 和 M 的關係
        5. 返回,開始繼續運行新的 goroutine

      這些應該已經不陌生了,那麼當 G1 開始發送數據的時候,流程是什麼樣子的呢?

      G1 能夠將 enqueue(task),而後調用 goready(G2)。不過,咱們能夠更聰明一些。

      咱們根據 hchan 結構的狀態,已經知道 task 進入 buf 後,G2 恢復運行後,會讀取其值,複製到 t 中。那麼 G1 能夠根本不走 bufG1 能夠直接把數據給 G2

      Goroutine 一般都有本身的棧,互相之間不會訪問對方的棧內數據,除了 channel。這裏,因爲咱們已經知道了 t 的地址(經過 elem指針),並且因爲 G2 不在運行,因此咱們能夠很安全的直接賦值。當 G2 恢復運行的時候,既不須要再次獲取鎖,也不須要對 buf 進行操做。從而節約了內存複製、以及鎖操做的開銷。

      總結  
      • goroutine-safe

        • hchan 中的 lock mutex
      • 存儲、傳遞值,FIFO

        • 經過 hchan 中的環形緩衝區來實現
      • 致使 goroutine 的阻塞和恢復

        • hchan 中的 sendqrecvq,也就是 sudog 結構的鏈表隊列
        • 調用運行時調度器 (gopark(), goready())
        • 其它 channel 的操做  
          無緩衝 channel  

          無緩衝的 channel 行爲就和前面說的直接發送的例子同樣:

          • 接收方阻塞 → 發送方直接寫入接收方的棧
          • 發送方阻塞 → 接受法直接從發送方的 sudog 中讀取
          • select  

            https://golang.org/src/runtime/select.go

            1. 先把全部須要操做的 channel 上鎖
            2. 給本身建立一個 sudog,而後添加到全部 channel 的 sendqrecvq(取決因而發送仍是接收)
            3. 把全部的 channel 解鎖,而後 pause 當前調用 select 的 goroutine(gopark()
            4. 而後當有任意一個 channel 可用時,select 的這個 goroutine 就會被調度執行。
            5. resuming mirrors the pause sequence
            6. 爲何 Go 會這樣設計?  
              Simplicity  

              更傾向於帶鎖的隊列,而不是無鎖的實現。

              「性能提高不是憑空而來的,是隨着複雜度增長而增長的。」 - dvyokov

              後者雖然性能可能會更好,可是這個優點,並不必定可以打敗隨之而來的實現代碼的複雜度所帶來的劣勢。

              Performance  
              • 調用 Go 運行時調度器,這樣能夠保持 OS 線程不被阻塞

              跨 goroutine 的棧讀、寫。

              • 可讓 goroutine 醒來後沒必要獲取鎖
              • 能夠避免一些內存複製

              固然,任何優點都會有其代價。這裏的代價是實現的複雜度,因此這裏有更復雜的內存管理機制、垃圾回收以及棧收縮機制。

              在這裏性能的提升優點,要比複雜度的提升帶來的劣勢要大。

              因此在 channel 實現的各類代碼中,咱們均可以見到這種 simplicity vs performance 的權衡後的結果。

相關文章
相關標籤/搜索