老聽 clojure 社區的人提起 core.async ,說它如何好用,如何簡化了併發編程的模型,不禁得勾起了個人好奇心,想了解一番其思想的源頭:CSP 模型及受其啓發的 goroutine 和 channel 。html
CSP 描述這樣一種併發模型:多個Process 使用一個 Channel 進行通訊, 這個 Channel 連結的 Process 一般是匿名的,消息傳遞一般是同步的(有別於 Actor Model)。git
CSP 最先是由 Tony Hoare 在 1977 年提出,聽說老爺子至今仍在更新這個理論模型,有興趣的朋友能夠自行查閱電子版本:http://www.usingcsp.com/cspbook.pdf。github
嚴格來講,CSP 是一門形式語言(相似於 ℷ calculus),用於描述併發系統中的互動模式,也所以成爲一衆面向併發的編程語言的理論源頭,並衍生出了 Occam/Limbo/Golang…golang
而具體到編程語言,如 Golang,其實只用到了 CSP 的很小一部分,即理論中的 Process/Channel(對應到語言中的 goroutine/channel):這兩個併發原語之間沒有從屬關係, Process 能夠訂閱任意個 Channel,Channel 也並不關心是哪一個 Process 在利用它進行通訊;Process 圍繞 Channel 進行讀寫,造成一套有序阻塞和可預測的併發模型。編程
What is a goroutine? It’s an independently executing function, launched by a go statement.
It has its own call stack, which grows and shrinks as required.
It’s very cheap. It’s practical to have thousands, even hundreds of thousands of goroutines.
It’s not a thread.
There might be only one thread in a program with thousands of goroutines.
Instead, goroutines are multiplexed dynamically onto threads as needed to keep all the goroutines running.
But if you think of it as a very cheap thread, you won’t be far off.併發― Rob Pikeasync
以上是 Rob Pike 在 Google I/O 2012 上給出的描述,歸納下來其實就一句話:編程語言
goroutine 能夠視爲開銷很小的線程(既不是物理線程也不是協程,但它擁有本身的調用棧,而且這個棧的大小是可伸縮的 不是協程,它有本身的棧),很好用,須要併發的地方就用 go 起一個 func,goroutine走起
ide
在 Golang 中,任何代碼都是運行在 goroutine裏,即使沒有顯式的 go func()
,默認的 main 函數也是一個 goroutine。函數
但 goroutine 不等於操做系統的線程,它與系統線程的對應關係,牽涉到 Golang 運行時的調度器:
調度器由三方面實體構成:
三者對應關係:
上圖有2個 物理線程 M,每個 M 都擁有一個上下文(P),每個也都有一個正在運行的goroutine(G)。
P 的數量可由 runtime.GOMAXPROCS()
進行設置,它表明了真正的併發能力,便可有多少個 goroutine 同時運行。
調度器爲何要維護多個上下文P 呢?由於當一個物理線程 M 被阻塞時,P 能夠轉而投奔另外一個OS線程 M(即 P 帶着 G 連莖拔起,去另外一個 M 節點下運行)。這是 Golang調度器厲害的地方,也是高併發能力的保障。
channel 是 goroutine 之間通訊(讀寫)的通道。由於它的存在,顯得 Golang(或者說CSP)與傳統的共享內存型的併發模型大相徑庭,用 Effective Go 裏的話來講就是:
Do not communicate by sharing memory; instead, share memory by communicating.
在 Golang 的併發模型中,咱們並不關心是哪一個 goroutine(匿名性)在用 channel,只關心 channel 的性質:
好比我但願在程序裏併發的計算並傳遞一個整型值,我就會定義一個 int 型的 channel:
value := make(chan int)
無緩衝的 channel因爲 make 這個 channel 並未提供第二個參數capacity,所以這個 channel 是不帶緩衝區的,即同步阻塞的channel:
它有以下特色:
1. 不能夠在同一個 goroutine 中既讀又寫,不然將會死鎖,拋出如
fatal error: all goroutines are asleep - deadlock!
這樣的錯誤,如下代碼片段是這種典型:
func deadlock() { ch := make(chan int) ch <- 2 x := <-ch log.Println(x) }
2. 兩個goroutine中使用無緩衝的channel,則讀寫互爲阻塞,即雙方代碼的執行都會阻塞在 <-ch 和 ch <- 處,只到雙方讀寫完成在 ch 中的傳遞,各自繼續向下執行,此處借用CSP 圖例說明:
goroutine 在無緩衝 channel 上交互的代碼:
func nolock() { ch := make(chan int) go func() { ch <- 2 log.Println("after write") }() x := <-ch log.Println("after read:", x) }
有緩衝的 channel
在 make 時傳遞第二參 capacity,即爲有緩衝的 channel:
ch := make(chan int, 1)
這樣的 channel 不管是否在同一 goroutine 中,都可讀寫而不致死鎖,看看以下片段,你猜它會輸出什麼:
ch := make(chan int, 1) for i := 0; i < 10; i++ { select { case x := <-ch: fmt.Println(x) case ch <- i: } }
舉個粟子
網上看來的求素數的例子:使用若干個 goroutine (根據求解範圍 N 而定)作素數的篩法,即
從2開始每找到一個素數就標記全部能被該素數整除的全部數。直到沒有可標記的數,剩下的就都是素數。下面以找出10之內全部素數爲例,借用 CSP 方式解決這個問題。
代碼以下:
package main import "fmt" func Processor(seq <-chan int, wait chan struct{}, level int) { go func() { prime, ok := <-seq if !ok { close(wait) return } fmt.Printf("[%d]: %d\n", level, prime) out := make(chan int) Processor(out, wait, level+1) for num := range seq { if num%prime != 0 { out <- num } } close(out) }() } func main() { origin, wait := make(chan int), make(chan struct{}) Processor(origin, wait, 1) for num := 2; num < 10; num++ { origin <- num } close(origin) <-wait }
FAQ
v, ok :=
,要麼使用 v := range ch
形式接收