《Go in action》讀後記錄:Go的併發與並行

本文的主要內容是:算法

  • 瞭解goroutine,使用它來運行程序
  • 瞭解Go是如何檢測並修正競爭狀態的(解決資源互斥訪問的方式)
  • 瞭解並使用通道chan來同步goroutine

1、使用goroutine來運行程序

1.Go的併發與並行

Go的併發能力,是指讓某個函數獨立於其餘函數運行的能力。當爲一個函數建立goroutine時,該函數將做爲一個獨立的工做單元,被 調度器 調度到可用的邏輯處理器上執行。Go的運行時調度器是個複雜的軟件,它作的工做大體是:緩存

  • 管理被建立的全部goroutine,爲其分配執行時間
  • 將操做系統線程與語言運行時的邏輯處理器綁定

參考The Go scheduler ,這裏較淺顯地說一下Go的運行時調度器。操做系統會在物理處理器上調度操做系統線程來運行,而Go語言的運行時會在邏輯處理器上調度goroutine來運行,每一個邏輯處理器都分別綁定到單個操做系統線程上。這裏涉及到三個角色:安全

  • M:操做系統線程,這是真正的內核OS線程
  • P:邏輯處理器,表明着調度的上下文,它使goroutine在一個M上跑
  • G:goroutine,擁有本身的棧,指令指針等信息,被P調度

每一個P會維護一個全局運行隊列(稱爲runqueue),處於ready就緒狀態的goroutine(灰色G)被放在這個隊列中等待被調度。在編寫程序時,每當go func啓動一個goroutine時,runqueue便在尾部加入一個goroutine。在下一個調度點上,P就從runqueue中取出一個goroutine出來執行(藍色G)。併發

當某個操做系統線程M阻塞的時候(好比goroutine執行了阻塞的系統調用),P能夠綁定到另一個操做系統線程M上,讓運行隊列中的其餘goroutine繼續執行:函數

上圖中G0執行了阻塞操做,M0被阻塞,P將在新的系統線程M1上繼續調度G執行。M1有多是被新建立的,或者是從線程緩存中取出。Go調度器保證有足夠的線程來運行全部的P,語言運行時默認限制每一個程序最多建立10000個線程,這個如今能夠經過調用runtime/debug包的SetMaxThreads方法來更改。工具

Go能夠在在一個邏輯處理器P上實現併發,若是須要並行,必須使用多於1個的邏輯處理器。Go調度器會把goroutine平等分配到每一個邏輯處理器上,此時goroutine將在不一樣的線程上運行,不過前提是要求機器擁有多個物理處理器。ui

2.建立goroutine

使用關鍵字go來建立一個goroutine,並讓全部的goroutine都獲得執行:atom

//example1.go
package main

import (
   "runtime"
   "sync"
   "fmt"
)

var (
   wg sync.WaitGroup
)

func main() {
   //分配一個邏輯處理器P給調度器使用
   runtime.GOMAXPROCS(1)
   //在這裏,wg用於等待程序完成,計數器加2,表示要等待兩個goroutine
   wg.Add(2)
   //聲明1個匿名函數,並建立一個goroutine
   fmt.Printf("Begin Coroutines\n")
   go func() {
      //在函數退出時,wg計數器減1
      defer wg.Done()
      //打印3次小寫字母表
      for count := 0; count < 3; count++ {
         for char := 'a'; char < 'a'+26; char++ {
            fmt.Printf("%c ", char)
         }
      }
   }()
   //聲明1個匿名函數,並建立一個goroutine
   go func() {
      defer wg.Done()
      //打印大寫字母表3次
      for count := 0; count < 3; count++ {
         for char := 'A'; char < 'A'+26; char++ {
            fmt.Printf("%c ", char)
         }
      }
   }()

   fmt.Printf("Waiting To Finish\n")
   //等待2個goroutine執行完畢
   wg.Wait()

}

這個程序使用runtime.GOMAXPROCS(1)來分配一個邏輯處理器給調度器使用,兩個goroutine將被該邏輯處理器調度併發執行。程序輸出:操作系統

Begin Coroutines
Waiting To Finish
A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z

從輸出來看,是先執行完一個goroutine,再接着執行第二個goroutine的,大寫字母所有打印完後,再打印所有的小寫字母。那麼,有沒有辦法讓兩個goroutine並行執行呢?爲程序指定兩個邏輯處理器便可:線程

//修改成2個邏輯處理器
runtime.GOMAXPROCS(2)

此時執行程序,輸出爲:

Begin Coroutines
Waiting To Finish
A B C D E a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c F G H I J K L M N O P Q R S T U V W X d e f g h i j k l m n o p q r s Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z t u v w x y z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z

那若是隻有1個邏輯處理器,如何讓兩個goroutine交替被調度?實際上,若是goroutine須要很長的時間才能運行完,調度器的內部算法會將當前運行的goroutine讓出,防止某個goroutine長時間佔用邏輯處理器。因爲示例程序中兩個goroutine的執行時間都很短,在爲引發調度器調度以前已經執行完。不過,程序也可使用runtime.Gosched()來將當前在邏輯處理器上運行的goruntine讓出,讓另外一個goruntine獲得執行:

//example2.go
package main

import (
   "runtime"
   "sync"
   "fmt"
)

var (
   wg sync.WaitGroup
)

func main() {
   //分配一個邏輯處理器P給調度器使用
   runtime.GOMAXPROCS(1)
   //在這裏,wg用於等待程序完成,計數器加2,表示要等待兩個goroutine
   wg.Add(2)
   //聲明1個匿名函數,並建立一個goroutine
   fmt.Printf("Begin Coroutines\n")
   go func() {
      //在函數退出時,wg計數器減1
      defer wg.Done()
      //打印3次小寫字母表
      for count := 0; count < 3; count++ {
         for char := 'a'; char < 'a'+26; char++ {
            if char=='k'{
               runtime.Gosched()
            }
            fmt.Printf("%c ", char)
         }
      }
   }()
   //聲明1個匿名函數,並建立一個goroutine
   go func() {
      defer wg.Done()
      //打印大寫字母表3次
      for count := 0; count < 3; count++ {
         for char := 'A'; char < 'A'+26; char++ {
            if char == 'K'{
               runtime.Gosched()
            }
            fmt.Printf("%c ", char)
         }
      }
   }()

   fmt.Printf("Waiting To Finish\n")
   //等待2個goroutine執行完畢
   wg.Wait()

}

兩個goroutine在循環的字符爲k/K的時候會讓出邏輯處理器,程序的輸出結果爲:

Begin Coroutines
Waiting To Finish
A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J a b c d e f g h i j K L M N O P Q R S T U V W X Y Z A B C D E F G H I J k l m n o p q r s t u v w x y z a b c d e f g h i j K L M N O P Q R S T U V W X Y Z k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z

這裏大小寫字母果真是交替着輸出了。不過從輸出能夠看到,第一次輸出大寫字母時遇到K沒有讓出邏輯處理器,這是什麼緣由還不是很清楚,調度器的調度機制?

2、處理競爭狀態

併發程序避免不了的一個問題是對資源的同步訪問。若是多個goroutine在沒有互相同步的狀況下去訪問同一個資源,並進行讀寫操做,這時goroutine就處於競爭狀態下:

//example3.go
package main

import (
   "sync"
   "runtime"
   "fmt"
)

var (
   //counter爲訪問的資源
   counter int64
   wg      sync.WaitGroup
)

func addCount() {
   defer wg.Done()
   for count := 0; count < 2; count++ {
      value := counter
      //當前goroutine從線程退出
      runtime.Gosched()
      value++
      counter=value
   }
}

func main() {
   wg.Add(2)
   go addCount()
   go addCount()
   wg.Wait()
   fmt.Printf("counter: %d\n",counter)
}
//output:
counter: 4 或者counter: 2

這段程序中,goroutinecounter的讀寫操做沒有進行同步,goroutine 1對counter的寫結果可能被goroutine 2所覆蓋。Go可經過以下方式來解決這個問題:

  • 使用原子函數操做
  • 使用互斥鎖鎖住臨界區
  • 使用通道chan

1. 檢測競爭狀態

有時候競爭狀態並不能一眼就看出來。Go 提供了一個很是有用的工具,用於檢測競爭狀態。使用方式是:

go build -race example4.go//用競爭檢測器標誌來編譯程序
./example4 //運行程序

工具檢測出了程序存在一處競爭狀態,並指出發生競爭狀態的幾行代碼是:

22        counter=value
18        value := counter
28        go addCount()
29        go addCount()

2. 使用原子函數

對整形變量或指針的同步訪問,可使用原子函數來進行。這裏使用原子函數來修復example4.go中的競爭狀態問題:

//example5.go
package main

import (
    "sync"
    "runtime"
    "fmt"
    "sync/atomic"
)

var (
    //counter爲訪問的資源
    counter int64
    wg      sync.WaitGroup
)

func addCount() {
    defer wg.Done()
    for count := 0; count < 2; count++ {
        //使用原子操做來進行
        atomic.AddInt64(&counter,1)
        //當前goroutine從線程退出
        runtime.Gosched()
    }
}

func main() {
    wg.Add(2)
    go addCount()
    go addCount()
    wg.Wait()
    fmt.Printf("counter: %d\n",counter)
}
//output:
counter: 4

這裏使用atomic.AddInt64函數來對一個整形數據進行加操做,另一些有用的原子操做還有:

atomic.StoreInt64() //寫
atomic.LoadInt64()  //讀

更多的原子操做函數請看atomic包中的聲明。

3. 使用互斥鎖

對臨界區的訪問,可使用互斥鎖來進行。對於example4.go的競爭狀態,可使用互斥鎖來解決:

//example5.go
package main

import (
   "sync"
   "runtime"
   "fmt"
)

var (
   //counter爲訪問的資源
   counter int
   wg      sync.WaitGroup
   mutex   sync.Mutex
)

func addCount() {
   defer wg.Done()

   for count := 0; count < 2; count++ {
      //加上鎖,進入臨界區域
      mutex.Lock()
      {
         value := counter
         //當前goroutine從線程退出
         runtime.Gosched()
         value++
         counter = value
      }
      //離開臨界區,釋放互斥鎖
      mutex.Unlock()
   }
}
func main() {
   wg.Add(2)
   go addCount()
   go addCount()
   wg.Wait()
   fmt.Printf("counter: %d\n", counter)
}

//output:
counter: 4

使用Lock()Unlock()函數調用來定義臨界區,在同一個時刻內,只有一個goroutine可以進入臨界區,直到調用Unlock()函數後,其餘的goroutine纔可以進入臨界區。

在Go中解決共享資源安全訪問,更經常使用的使用通道chan。

3、利用通道共享數據

Go語言採用CSP消息傳遞模型。經過在goroutine之間傳遞數據來傳遞消息,而不是對數據進行加鎖來實現同步訪問。這裏就須要用到通道chan這種特殊的數據類型。當一個資源須要在goroutine中共享時,chan在goroutine中間架起了一個通道。通道使用make來建立:

unbuffered := make(char int) //建立無緩存通道,用於int類型數據共享
buffered := make(chan string,10)//建立有緩存通道,用於string類型數據共享
buffered<- "hello world" //向通道中寫入數據
value:= <-buffered //從通道buffered中接受數據

通道用於放置某一種類型的數據。建立通道時指定通道的大小,將建立有緩存的通道。無緩存通道是一種同步通訊機制,它要求發送goroutine和接收goroutine都應該準備好,不然會進入阻塞。

1. 無緩存的通道

無緩存通道是同步的——一個goroutine向channel寫入消息的操做會一直阻塞,直到另外一個goroutine從通道中讀取消息。反過來也是,一個goroutine從channel讀取消息的操做會一直阻塞,直到另外一個goroutine向通道中寫入消息。《Go in action》中關於無緩存通道的解釋有一個很是棒的例子:網球比賽。在網球比賽中,兩位選手老是處在如下兩種狀態之一:要麼在等待接球,要麼在把球打向對方。球的傳遞可看爲通道中數據傳遞。下面這段代碼使用通道模擬了這個過程:

//example6.go
package main

import (
    "sync"
    "fmt"
    "math/rand"
    "time"
)

var wg sync.WaitGroup

func player(name string, court chan int) {
    defer wg.Done()
    for {
        //若是通道關閉,那麼選手勝利
        ball, ok := <-court
        if !ok {
            fmt.Printf("Player %s Won\n", name)
            return
        }
        n := rand.Intn(100)

        //隨機機率使某個選手Miss
        if n%13 == 0 {
            fmt.Printf("Player %s Missed\n", name)
            //關閉通道
            close(court)
            return
        }
        fmt.Printf("Player %s Hit %d\n", name, ball)
        ball++
        //不然選手進行擊球
        court <- ball
    }
}


func main() {
    rand.Seed(time.Now().Unix())
    court := make(chan int)
    //等待兩個goroutine都執行完
    wg.Add(2)
    //選手1等待接球
    go player("candy", court)
    //選手2等待接球
    go player("luffic", court)
    //球進入球場(能夠開始比賽了)
    court <- 1
    wg.Wait()
}
//output:
Player luffic Hit 1
Player candy Hit 2
Player luffic Hit 3
Player candy Hit 4
Player luffic Hit 5
Player candy Missed
Player luffic Won

2. 有緩存的通道

有緩存的通道是一種在被接收前能存儲一個或者多個值的通道,它與無緩存通道的區別在於:無緩存的通道保證進行發送和接收的goroutine會在同一時間進行數據交換,有緩存的通道沒有這種保證。有緩存通道讓goroutine阻塞的條件爲:通道中沒有數據可讀的時候,接收動做會被阻塞;通道中沒有區域容納更多數據時,發送動做阻塞。向已經關閉的通道中發送數據,會引起panic,可是goroutine依舊能從通道中接收數據,可是不能再向通道里發送數據。因此,發送端應該負責把通道關閉,而不是由接收端來關閉通道。

小結

  • goroutine被邏輯處理器執行,邏輯處理器擁有獨立的系統線程與運行隊列
  • 多個goroutine在一個邏輯處理器上能夠併發執行,當機器有多個物理核心時,可經過多個邏輯處理器來並行執行。
  • 使用關鍵字 go 來建立goroutine。
  • 在Go中,競爭狀態出如今多個goroutine試圖同時去訪問一個資源時。
  • 可使用互斥鎖或者原子函數,去防止競爭狀態的出現。
  • 在go中,更好的解決競爭狀態的方法是使用通道來共享數據。
  • 無緩衝通道是同步的,而有緩衝通道不是。

(完)

相關文章
相關標籤/搜索