併發Concurrency緩存
不少人都是衝着 Go 大肆宣揚的高併發而忍不住躍躍欲試,但其實從源碼的解析來看,goroutine 只是由官方實現的超級「線程池」而已。不過話說回來,每一個實例 4~5KB的棧內存佔用和因爲實現機制而大幅減小的建立和銷燬開銷,是製造 Go 號稱的高併發的根本緣由。另外,goroutine 的簡單易用,也在語言層面上給予了開發者巨大的遍歷。安全
高併發當中必定要注意:併發可不是並行。多線程
併發主要由切換時間片來實現「同時」運行,而並行則是直接利用多核實現多線程的運行,但 Go 能夠設置使用核數,以發揮多核計算機的處理能力。併發
goroutine 奉行經過通訊來共享內存,而不是共享內存來通訊。Go 語言主要是經過 Channe 技術通訊來實現內存的共享的,由於 channel 是一個通道,Go 是經過通道來通訊進行內存數據的共享。異步
對於初學者,goroutine直接理解成爲線程就能夠了。當對一個函數調用go,啓動一個goroutine的時候,就至關於起來一個線程,執行這個函數。函數
實際上,一個goroutine並不至關於一個線程,goroutine的出現正是爲了替代原來的線程概念成爲最小的調度單位。一旦運行goroutine時,先去當先線程查找,若是線程阻塞了,則被分配到空閒的線程,若是沒有空閒的線程,那麼就會新建一個線程。注意的是,當goroutine執行完畢後,線程不會回收推出,而是成爲了空閒的線程。高併發
讓咱們先來看一個最簡單的 goroutine 案例:spa
package main import ( "fmt" "time" ) func main() { //啓用一個goroutine go GoRun() //這裏加一個休眠是由於主線程已啓動就執行完畢消亡來,子線程還來不及執行 time.Sleep(2 * time.Second) } func GoRun() { fmt.Println("Go Go Go!!!") }
運行結果:線程
1
|
Go Go Go!!!
|
Channelcode
1. Channel 是 goroutine 溝通的橋樑,大都是阻塞同步的
2. 它是經過 make 建立,close 關閉
3. Channel 是引用類型
4. 可使用 for range 來迭代,不斷操做 channel
5. 能夠設置單向 或 雙向通道
6. 能夠設置緩存大小,在未被填滿前不會發生阻塞,即它是異步的
那麼針對上溯代碼咱們不使用休眠,而使用 Channel 來實現咱們想要的效果:
channel的意思用白話能夠這麼理解:主線程告訴你們你開goroutine能夠,可是我在個人主線程開了一個管道,你作完了你要作的事情以後,往管道里面塞個東西告訴我你已經完成了。
package main import ( "fmt" ) func main() { //聲明建立一個通道,存儲類型爲bool型 c := make(chan bool) //啓用一個goroutine,使用的是匿名方法方式 go func() { fmt.Println("Go Go Go!!!") c <- true //向 channel 中存入一個值 }() //當程序執行完畢以後再從通道中取出剛纔賦的值 <- c /** 主線程啓動了一個匿名子線程後就執行到了:<-c , 到達這裏主線程就被阻塞了。只有當子線程向通道放入值後主線程阻塞纔會被釋放 其實這個就是完成了消息的發送 */ }
上溯代碼能夠修改成使用 for range 來進行消息的發送:
package main import ( "fmt" ) func main() { //聲明建立一個通道,存儲類型爲bool型,這裏設置的channel就是雙向通道,既能夠存也能夠取 c := make(chan bool) //啓用一個goroutine,使用的是匿名方法方式 go func() { fmt.Println("Go Go Go!!!") c <- true //向 channel 中存入一個值 close(c) //切記若是使用for range來進行取值的時候須要在某個地方進行關閉,不然會發生死鎖 }() //從通道中循環取出剛纔賦的值 for v := range c { fmt.Println(v) } }
從以上代碼能夠看出,通常使用的 Channel 都是雙向通道的,即:既能夠取又能夠存。那單向通道通常用於什麼場景下呢?
單向通道又分爲兩種,一種是隻能讀取,一種是隻能存放,通常用於參數類型傳遞使用。例若有個方法返回一個Channel類型,通常要求操做只能從這裏取,那麼此時它的用途就是隻能存放類型,若是此時你不當心存數據,此時會發生panic 致使程序奔潰發生異常。那麼讀取類型的Channel同理。這樣作其實也是爲了程序的安全性與健壯性,防止一些誤操做。
這裏還有一個知識點,就是有緩存的channel 和 無緩存的channel的區別?
make(chan bool, 1) 表示帶有一個緩存大小的緩存channel
make(chan bool) 或 make(chan bool, 0) 表示一個無緩存的channel
無緩存channel是阻塞的即同步的,而有緩存channel是異步的。怎麼說?好比
c1:=make(chan int) 無緩衝
c2:=make(chan int,1) 有緩衝
c1 <- 1 //往無緩存通道放入數據 1
無緩衝的 不只僅是 向 c1 通道放 1 並且必定要有別的線程 <- c1 接手了這個參數,那麼 c1 <- 1 纔會繼續下去,要否則就一直阻塞着
而 c2 <- 1 則不會阻塞,由於緩衝大小是1 只有當放第二個值的時候第一個還沒被人拿走,這時候纔會阻塞。
打個比喻
無緩衝的 就是一個送信人去你家門口送信 ,你不在家 他不走,你必定要接下信,他纔會走。
無緩衝保證信能到你手上
有緩衝的 就是一個送信人去你家仍到你家的信箱 轉身就走 ,除非你的信箱滿了 他必須等信箱空下來。
有緩衝的 保證 信能進你家的郵箱
那若是在多線程環境下,多個線程併發搶佔會使得打印不是按照順序來,那麼咱們如何確保子線程所有結束完以後主線程再中止呢?主要有兩種方式:
第一種:使用阻塞channel
package main import ( "fmt" "runtime" ) func main() { fmt.Println("當前系統核數:", runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU()) //設置當前程序執行使用的併發數 //定義一個阻塞channel c := make(chan bool) //這裏啓動10個線程運行 for i :=0; i < 10; i++ { go goRun(c, i) } //咱們知道一共有10次循環,那麼在這裏就取10次,那麼子線程goRun只有都執行完了主線程取才能完畢,由於這裏也循環取10次,不夠的話會被阻塞 for i := 0; i < 10; i++ { <- c } } func goRun(c chan bool, index int) { a := 1 //循環疊加1千萬次並返回最終結果 for i := 0; i < 10000000; i++ { a += i } fmt.Println("線程序號:", index, a) //往阻塞隊列插入內容 c <- true }
打印結果:
1
2
3
4
5
6
7
8
9
10
11
|
當前系統核數: 4
線程序號: 9 49999995000001
線程序號: 5 49999995000001
線程序號: 2 49999995000001
線程序號: 0 49999995000001
線程序號: 6 49999995000001
線程序號: 1 49999995000001
線程序號: 3 49999995000001
線程序號: 7 49999995000001
線程序號: 8 49999995000001
線程序號: 4 49999995000001
|
從打印結果能夠看出,多線程環境下運行代碼打印和順序沒有關係,由 CPU 調度本身決定,多運行幾回打印結果必定不會同樣,就是這個道理。
第二種:使用同步機制
package main import ( "fmt" "runtime" "sync" ) func main() { fmt.Println("當前系統核數:", runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU()) //設置當前程序執行使用的併發數 /** waitGroup即任務組,它的最要做用就是用來添加須要工做的任務,沒完成一次任務就標記一次Done,這樣任務組的待完成量會隨之減1 那麼主線程就是來判斷任務組內是否還有未完成任務,當沒有未完成當任務以後主線程就能夠結束運行,從而實現了與阻塞隊列相似的同步功能 這裏建立了一個空的waitGroup(任務組) */ wg := sync.WaitGroup{} wg.Add(10) //添加10個任務到任務組中 //這裏啓動10個線程運行 for i :=0; i < 10; i++ { go goRun(&wg, i) } wg.Wait() } /** 這裏須要傳入引用類型不能傳入值拷貝,由於在子線程中是須要執行Done操做,相似與咱們修改結構體中的int變量主詞遞減,若是是隻拷貝的話是不會影響原類型內的數據 這樣就會發生死循環致使死鎖程序奔潰,報錯異常爲:fatal error: all goroutines are asleep - deadlock! */ func goRun(wg *sync.WaitGroup, index int) { a := 1 //循環疊加1千萬次並返回最終結果 for i := 0; i < 10000000; i++ { a += i } fmt.Println("線程序號:", index, a) wg.Done() }
打印結果:
1
2
3
4
5
6
7
8
9
10
11
|
當前系統核數: 4
線程序號: 1 49999995000001
線程序號: 5 49999995000001
線程序號: 0 49999995000001
線程序號: 9 49999995000001
線程序號: 4 49999995000001
線程序號: 3 49999995000001
線程序號: 2 49999995000001
線程序號: 6 49999995000001
線程序號: 8 49999995000001
線程序號: 7 49999995000001
|
以上全部講解到的都是基於一個 channel 來講的,那麼當咱們有多個 channel 的時候又該怎麼處理呢?
Go 語言爲咱們提供了一種結構名爲:Select,它和 switch 是很是類似的,switch 主要用於普通類型作判斷的,而 select 主要是針對多個 channel 來進行判斷的。
Select
1. 可處理一個或多個 channel 的發送與接收
2. 同時有多個可用的 channel 時,能夠按隨機順序處理
3. 可使用空的 select 來阻塞 main 函數
4. 它還能夠設置超時時間
案例一:用多個 channel 來接收數據:
package main import ( "fmt" ) /** 數據接收處理 */ func main() { //批量初始化channel c1, c2 := make(chan int), make(chan string) //建立一個啓動goroutine的匿名函數 go func() { /** 建立一個無限循環語句,使用select進行處理 咱們通常都是使用這種方式來處理不斷的消息發送和處理 */ for { select { case v, ok := <- c1: if !ok { break } fmt.Println("c1:", v) case v, ok := <- c2: if !ok { break } fmt.Println("c2:", v) } } }() c1 <- 1 c2 <- "liang" c1 <- 2 c2 <- "xuli" //關閉channel close(c1) close(c2) }
打印結果:
1
2
3
4
|
c1: 1
c2: liang
c1: 2
c2: xuli
|
案例二:用多個 channel 來發送數據:
package main import ( "fmt" ) /** 數據接收處理,這裏實現隨機接收0、1 數字並打印 */ func main() { c := make(chan int) num := 0 //建立一個啓動goroutine的匿名函數 go func() { for v := range c { num++ if num & 15 == 0 { fmt.Println() } fmt.Print(v, " ") } }() for { select { case c <- 0: case c <- 1: } } }
打印結果:(只是粘貼了其中一部分)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
1 1 0 1 1 0 0 0 0 1 0 1 0 0 1 0
0 1 0 1 1 0 1 1 0 0 1 1 1 0 0 1
1 1 1 1 0 0 1 1 0 0 0 0 0 1 0 1
0 1 1 0 0 0 1 1 1 0 0 0 1 1 0 0
1 1 1 0 0 0 0 0 1 0 1 1 1 1 1 1
0 0 1 0 0 0 0 1 0 1 1 0 1 1 1 0
1 1 1 0 0 0 1 1 1 0 0 0 1 0 0 1
1 0 1 1 1 1 0 0 1 0 0 1 1 1 1 1
1 1 0 0 0 0 0 1 1 1 0 1 1 0 1 1
1 1 0 0 0 0 1 0 0 1 0 1 0 0 1 1
0 0 0 1 1 1 1 1 0 0 0 1 0 0 0 1
0 1 1 0 1 0 1 0 1 0 0 1 1 0 0 0
0 1 0 0 0 1 0 0 0 1 1 0 0 0 1 1
1 1 0 1 1 1 1 0 0 0 1 0 0 0 1 1
0 1 1 0 0 1 1 0 1 0 1 0 0 0 0 1
0 1 1 0 0 0 1 1 0 1 0 1 0 0 0 0
0 0 0 1 0 0 0 1 1 1 1 1 1 1 1 0
|
案例三:用 channel 設置超時時間:
package main import ( "fmt" "time" ) /** select的超時應用 */ func main() { c := make(chan bool) select { case v := <- c : fmt.Println(v) case <- time.After(3 * time.Second): fmt.Println("TimeOut!!!") } }
打印結果:
1
|
TimeOut!!!
|