GO的併發之道-Goroutine調度原理&Channel詳解

併發(並行),一直以來都是一個編程語言裏的核心主題之一,也是被開發者關注最多的話題;Go語言做爲一個出道以來就自帶 『高併發』光環的富二代編程語言,它的併發(並行)編程確定是值得開發者去探究的,而Go語言中的併發(並行)編程是經由goroutine實現的,goroutine是golang最重要的特性之一,具備使用成本低、消耗資源低、能效高等特色,官方宣稱原生goroutine併發成千上萬不成問題,因而它也成爲Gopher們常用的特性。

1、goroutine簡介

Golang被極度讚賞的是它的異步機制,也就是goroutine。goroutine使用方式很是的簡單,只需使用go關鍵字便可啓動一個協程, 而且它是處於異步方式運行,你不須要等它運行完成之後再執行之後的代碼。html

go func()//經過go關鍵字啓動一個協程來運行函數

除去語法上的簡潔,goroutine是一個協程,也就是比線程更節省資源,一個線程中能夠有多個協程,並且goroutine被分配到多個CPU上運行,是真正意義上的併發。golang

go func()//經過go關鍵字啓動一個協程來運行函數

2、goroutine內部原理

在介紹goroutine原理以前,先對一些關鍵概念進行介紹:編程

關鍵概念

併發緩存

一個cpu上能同時執行多項任務,在很短期內,cpu來回切換任務執行(在某段很短期內執行程序a,而後又迅速得切換到程序b去執行),有時間上的重疊(宏觀上是同時的,微觀還是順序執行),這樣看起來多個任務像是同時執行,這就是併發。安全

並行併發

當系統有多個CPU時,每一個CPU同一時刻都運行任務,互不搶佔本身所在的CPU資源,同時進行,稱爲並行。異步

簡單理解編程語言

你吃飯吃到一半,電話來了,你一直到吃完了之後纔去接,這就說明你不支持併發也不支持並行。
你吃飯吃到一半,電話來了,你停了下來接了電話,接完後繼續吃飯,這說明你支持併發。
你吃飯吃到一半,電話來了,你一邊打電話一邊吃飯,這說明你支持並行。ide

併發的關鍵是你有處理多個任務的能力,不必定要同時。
並行的關鍵是你有同時處理多個任務的能力。函數

在計算機中就是:

因此我認爲它們最關鍵的點就是:是不是『同時』。

  

進程

cpu在切換程序的時候,若是不保存上一個程序的狀態(也就是咱們常說的context--上下文),直接切換下一個程序,就會丟失上一個程序的一系列狀態,因而引入了進程這個概念,用以劃分好程序運行時所須要的資源。

所以進程就是一個程序運行時候的所須要的基本資源單位(也能夠說是程序運行的一個實體)。

線程

cpu切換多個進程的時候,會花費很多的時間,由於切換進程須要切換到內核態,而每次調度須要內核態都須要讀取用戶態的數據,進程一旦多起來,cpu調度會消耗一大堆資源,所以引入了線程的概念,線程自己幾乎不佔有資源,他們共享進程裏的資源,內核調度起來不會那麼像進程切換那麼耗費資源。

線程是進程的一個執行實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位。

NOTE:線程包括三大類,並且goroutine也並不是真正地協程。(請查看:《線程那些事兒》)

有時候爲了方便理解能夠簡單把goroutine類比成協程,但內心必定要有個清晰的認知 — goroutine並不等同於協程。

協程

協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以,協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。線程和進程的操做是由程序觸發系統接口,最後的執行者是系統;協程的操做執行者則是用戶自身程序,goroutine也是協程。

 

 

G-P-M調度模型簡介

groutine能擁有強大的併發實現是經過GPM調度模型實現,下面就來解釋下goroutine的調度模型。

 

Go的調度器內部的三個重要的結構:M,P,G
M:M表明內核級線程,一個M就是一個線程,goroutine就是跑在M之上的;M是一個很大的結構,裏面維護小對象內存cache(mcache)、當前執行的goroutine、隨機數發生器等等很是多的信息
G:表明一個goroutine,它有本身的棧,instruction pointer和其餘信息(正在等待的channel等等),用於調度。
P:P全稱是Processor,處理器,它的主要用途就是用來執行goroutine的,因此它也維護了一個goroutine隊列,裏面存儲了全部須要它來執行的goroutine

NOTE:G-P-M模型詳解,請查看該篇博文。

 

調度實現

從上圖中看,有2個物理線程M,每個M都擁有一個處理器P,每個也都有一個正在運行的goroutine。
P的數量能夠經過GOMAXPROCS()來設置,它其實也就表明了真正的併發度,即有多少個goroutine能夠同時運行。
圖中灰色的那些goroutine並無運行,而是出於ready的就緒態,正在等待被調度。P維護着這個隊列(稱之爲runqueue),
Go語言裏,啓動一個goroutine很容易:go function 就行,因此每有一個go語句被執行,runqueue隊列就在其末尾加入一個
goroutine,在下一個調度點,就從runqueue中取出(如何決定取哪一個goroutine?)一個goroutine執行。

 

當一個OS線程M0陷入阻塞時(以下圖),P轉而在運行M1,圖中的M1多是正被建立,或者從線程緩存中取出。

 


當MO返回時,它必須嘗試取得一個P來運行goroutine,通常狀況下,它會從其餘的OS線程那裏拿一個P過來,
若是沒有拿到的話,它就把goroutine放在一個global runqueue裏,而後本身睡眠(放入線程緩存裏)。全部的P也會週期性的檢查global runqueue並運行其中的goroutine,不然global runqueue上的goroutine永遠沒法執行。
 
另外一種狀況是P所分配的任務G很快就執行完了(分配不均),這就致使了這個處理器P很忙,可是其餘的P還有任務,此時若是global runqueue沒有任務G了,那麼P不得不從其餘的P裏拿一些G來執行。通常來講,若是P從其餘的P那裏要拿任務的話,通常就拿run queue的一半,這就確保了每一個OS線程都能充分的使用,以下圖:
參考地址:http://morsmachine.dk/go-scheduler
 

3、使用goroutine

基本使用

設置goroutine運行的CPU數量,最新版本的go已經默認已經設置了。

num := runtime.NumCPU()    //獲取主機的邏輯CPU個數
runtime.GOMAXPROCS(num)    //設置可同時執行的最大CPU數

使用示例

package main

import (
    "fmt"
    "time"
)

func cal(a int , b int )  {
    c := a+b
    fmt.Printf("%d + %d = %d\n",a,b,c)
}

func main() {
  
    for i :=0 ; i<10 ;i++{
        go cal(i,i+1)  //啓動10個goroutine 來計算
    }
    time.Sleep(time.Second * 2) // sleep做用是爲了等待全部任務完成
} 
//結果
//8 + 9 = 17
//9 + 10 = 19
//4 + 5 = 9
//5 + 6 = 11
//0 + 1 = 1
//1 + 2 = 3
//2 + 3 = 5
//3 + 4 = 7
//7 + 8 = 15
//6 + 7 = 13
View Code

 

goroutine異常捕捉

當啓動多個goroutine時,若是其中一個goroutine異常了,而且咱們並無對進行異常處理,那麼整個程序都會終止,因此咱們在編寫程序時候最好每一個goroutine所運行的函數都作異常處理,異常處理採用recover

package main

import (
    "fmt"
    "time"
)

func addele(a []int ,i int)  {
    defer func() {    //匿名函數捕獲錯誤
        err := recover()
        if err != nil {
            fmt.Println("add ele fail")
        }
    }()
   a[i]=i
   fmt.Println(a)
}

func main() {
    Arry := make([]int,4)
    for i :=0 ; i<10 ;i++{
        go addele(Arry,i)
    }
    time.Sleep(time.Second * 2)
}
//結果
add ele fail
[0 0 0 0]
[0 1 0 0]
[0 1 2 0]
[0 1 2 3]
add ele fail
add ele fail
add ele fail
add ele fail
add ele fail
View Code

 

同步的goroutine

因爲goroutine是異步執行的,那頗有可能出現主程序退出時還有goroutine沒有執行完,此時goroutine也會跟着退出。此時若是想等到全部goroutine任務執行完畢才退出,go提供了sync包和channel來解決同步問題,固然若是你能預測每一個goroutine執行的時間,你還能夠經過time.Sleep方式等待全部的groutine執行完成之後在退出程序(如上面的列子)。

示例一:使用sync包同步goroutine
sync大體實現方式
WaitGroup 等待一組goroutinue執行完畢. 主程序調用 Add 添加等待的goroutinue數量. 每一個goroutinue在執行結束時調用 Done ,此時等待隊列數量減1.,主程序經過Wait阻塞,直到等待隊列爲0.
 
package main

import (
    "fmt"
    "sync"
)

func cal(a int , b int ,n *sync.WaitGroup)  {
    c := a+b
    fmt.Printf("%d + %d = %d\n",a,b,c)
    defer n.Done() //goroutinue完成後, WaitGroup的計數-1

}

func main() {
    var go_sync sync.WaitGroup //聲明一個WaitGroup變量
    for i :=0 ; i<10 ;i++{
        go_sync.Add(1) // WaitGroup的計數加1
        go cal(i,i+1,&go_sync)  
    }
    go_sync.Wait()  //等待全部goroutine執行完畢
}
//結果
9 + 10 = 19
2 + 3 = 5
3 + 4 = 7
4 + 5 = 9
5 + 6 = 11
1 + 2 = 3
6 + 7 = 13
7 + 8 = 15
0 + 1 = 1
8 + 9 = 17
View Code

 

示例二:經過channel實現goroutine之間的同步。

實現方式:經過channel能在多個groutine之間通信,當一個goroutine完成時候向channel發送退出信號,等全部goroutine退出時候,利用for循環channe去channel中的信號,若取不到數據會阻塞原理,等待全部goroutine執行完畢,使用該方法有個前提是你已經知道了你啓動了多少個goroutine。

package main

import (
    "fmt"
    "time"
)

func cal(a int , b int ,Exitchan chan bool)  {
    c := a+b
    fmt.Printf("%d + %d = %d\n",a,b,c)
    time.Sleep(time.Second*2)
    Exitchan <- true
}

func main() {

    Exitchan := make(chan bool,10)  //聲明並分配管道內存
    for i :=0 ; i<10 ;i++{
        go cal(i,i+1,Exitchan)
    }
    for j :=0; j<10; j++{   
         <- Exitchan  //取信號數據,若是取不到則會阻塞
    }
    close(Exitchan) // 關閉管道
}
View Code

goroutine之間的通信

goroutine本質上是協程,能夠理解爲不受內核調度,而受go調度器管理的線程。goroutine之間能夠經過channel進行通訊或者說是數據共享,固然你也可使用全局變量來進行數據共享。

示例:使用channel模擬消費者和生產者模式

package main

import (
    "fmt"
    "sync"
)

func Productor(mychan chan int,data int,wait *sync.WaitGroup)  {
    mychan <- data
    fmt.Println("product data:",data)
    wait.Done()
}
func Consumer(mychan chan int,wait *sync.WaitGroup)  {
     a := <- mychan
    fmt.Println("consumer data:",a)
     wait.Done()
}
func main() {

    datachan := make(chan int, 100)   //通信數據管道
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        go Productor(datachan, i,&wg) //生產數據
        wg.Add(1)
    }
    for j := 0; j < 10; j++ {
        go Consumer(datachan,&wg)  //消費數據
        wg.Add(1)
    }
    wg.Wait()
}
//結果
consumer data: 4
product data: 5
product data: 6
product data: 7
product data: 8
product data: 9
consumer data: 1
consumer data: 5
consumer data: 6
consumer data: 7
consumer data: 8
consumer data: 9
product data: 2
consumer data: 2
product data: 3
consumer data: 3
product data: 4
consumer data: 0
product data: 0
product data: 1
View Code

4、channel

不一樣goroutine之間是如何進行通信的呢?

  • 方法一:全局變量和鎖同步
  • 方法二:Channel

這裏咱們主要注重講解下go中特有的channel,其相似於UNIX中的管道(piple)。

channel概念

channel俗稱管道,用於數據傳遞或數據共享,其本質是一個先進先出的隊列,使用goroutine+channel進行數據通信簡單高效,同時也線程安全多個goroutine可同時修改一個channel,不須要加鎖

channel操做

定義和聲明:

1 var 變量名 chan 類型    //channel是有類型的,一個整數的channel只能存放整數
2 
3 var test chan int 
4 
5 var test chan map[string]string
6 
7 var test chan *stu

channel可分爲三種:

只讀channel:只能讀channel裏面數據,不可寫入

只寫channel:只能寫數據,不可讀

通常channel:可讀可寫

複製代碼
var readOnlyChan <-chan int            // 只讀chan
var writeOnlyChan chan<- int           // 只寫chan
var mychan  chan int                   //讀寫channel
mychannel = make(chan int,10)

//或者
read_only := make (<-chan int,10)//定義只讀的channel
write_only := make (chan<- int,10)//定義只寫的channel
read_write := make (chan int,10)//可同時讀寫
複製代碼
定義完成之後須要make來分配內存空間,否則會deadlock!
//定義一個結構體類型的channel

package main

type student struct{
    name string
}

func main() {
    var stuChan chan student
    stuChan = make(chan student, 10)

    stu := student{name:"syu01"}

    stuChan <- stu  
}
struct類型channel

 

讀寫數據

ch <- "wd"  //寫數據
a := <- ch //讀取數據
a, ok := <-ch  //推薦的讀取數據方法

注意:

  • 管道若是未關閉,在讀取超時會則會引起deadlock異常
  • 管道若是關閉進行寫入數據會pannic
  • 當管道中沒有數據時候再行讀取或讀取到默認值,如int類型默認值是0

遍歷管道

  • 使用for range遍歷管道,若是管道未關閉會引起deadlock錯誤。
  • 若是採用for死循環已經關閉的管道,當管道沒有數據時候,讀取的數據會是管道的默認值,而且循環不會退出。
package main

import (
    "fmt"
    "time"
)


func main() {
    mychannel := make(chan int,10)
    for i := 0;i < 10;i++{
        mychannel <- i
    }
    close(mychannel)  //關閉管道
    fmt.Println("data lenght: ",len(mychannel))
    for  v := range mychannel {  //遍歷管道
        fmt.Println(v)
    }
    fmt.Printf("data lenght:  %d",len(mychannel))
}
View Code

帶緩衝區channe和不帶緩衝區channel

帶緩衝區channel:定義聲明時候制定了緩衝區大小(長度),能夠保存多個數據。

不帶緩衝區channel:只能存一個數據,而且只有當該數據被取出時候才能存下一個數據。

ch := make(chan int) //不帶緩衝區
ch := make(chan int ,10) //帶緩衝區

不帶緩衝區示例:

package main

import "fmt"

func test(c chan int) {
    for i := 0; i < 10; i++ {
        fmt.Println("send ", i)
        c <- i
    }
}
func main() {
    ch := make(chan int)
    go test(ch)
    for j := 0; j < 10; j++ {
        fmt.Println("get ", <-ch)
    }
}


//結果:
send  0
send  1
get  0
get  1
send  2
send  3
get  2
get  3
send  4
send  5
get  4
get  5
send  6
send  7
get  6
get  7
send  8
send  9
get  8
get  9
View Code

channel實現做業池

咱們建立三個channel,一個channel用於接受任務,一個channel用於保持結果,還有個channel用於決定程序退出的時候。

package main

import (
    "fmt"
)

func Task(taskch, resch chan int, exitch chan bool) {
    defer func() {   //異常處理
        err := recover()
        if err != nil {
            fmt.Println("do task error:", err)
            return
        }
    }()

    for t := range taskch { //  處理任務
        fmt.Println("do task :", t)
        resch <- t //
    }
    exitch <- true //處理完髮送退出信號
}

func main() {
    taskch := make(chan int, 20) //任務管道
    resch := make(chan int, 20)  //結果管道
    exitch := make(chan bool, 5) //退出管道
    go func() {
        for i := 0; i < 10; i++ {
            taskch <- i
        }
        close(taskch)
    }()


    for i := 0; i < 5; i++ {  //啓動5個goroutine作任務
        go Task(taskch, resch, exitch)
    }

    go func() { //等5個goroutine結束
        for i := 0; i < 5; i++ {
            <-exitch
        }
        close(resch)  //任務處理完成關閉結果管道,否則range報錯
        close(exitch)  //關閉退出管道
    }()

    for res := range resch{  //打印結果
        fmt.Println("task res:",res)
    }
}
View Code

只讀channel和只寫channel

通常定義只讀和只寫的管道意義不大,更多時候咱們能夠在參數傳遞時候指明管道可讀仍是可寫,即便當前管道是可讀寫的。

package main

import (
    "fmt"
    "time"
)

//只能向chan裏寫數據
func send(c chan<- int) {
    for i := 0; i < 10; i++ {
        c <- i
    }
}
//只能取channel中的數據
func get(c <-chan int) {
    for i := range c {
        fmt.Println(i)
    }
}
func main() {
    c := make(chan int)
    go send(c)
    go get(c)
    time.Sleep(time.Second*1)
}
//結果
0
1
2
3
4
5
6
7
8
9
View Code  

select-case實現非阻塞channel

原理經過select+case加入一組管道,當知足(這裏說的知足意思是有數據可讀或者可寫)select中的某個case時候,那麼該case返回,若都不知足case,則走default分支。

package main

import (
    "fmt"
)

func send(c chan int)  {
    for i :=1 ; i<10 ;i++  {
     c <-i
     fmt.Println("send data : ",i)
    }
}

func main() {
    resch := make(chan int,20)
    strch := make(chan string,10)
    go send(resch)
    strch <- "wd"
    select {
    case a := <-resch:
        fmt.Println("get data : ", a)
    case b := <-strch:
        fmt.Println("get data : ", b)
    default:
        fmt.Println("no channel actvie")

    }

}

//結果:get data :  wd
View Code

channel中定時器的使用

在對channel進行讀寫的時,能夠對讀寫進行頻率控制,經過time.Ticke實現

示例:

package main

import (
    "time"
    "fmt"
)

func main(){
    requests:= make(chan int ,5)
    for i:=1;i<5;i++{
        requests<-i
    }
    close(requests)
    limiter := time.Tick(time.Second*1)
    for req:=range requests{
        <-limiter
        fmt.Println("requets",req,time.Now()) //執行到這裏,須要隔1秒才繼續往下執行,time.Tick(timer)上面已定義
    }
}
//結果:
requets 1 2018-07-06 10:17:35.98056403 +0800 CST m=+1.004248763
requets 2 2018-07-06 10:17:36.978123472 +0800 CST m=+2.001798205
requets 3 2018-07-06 10:17:37.980869517 +0800 CST m=+3.004544250
requets 4 2018-07-06 10:17:38.976868836 +0800 CST m=+4.000533569
View Code
相關文章
相關標籤/搜索