https://golang.org/doc/effective_go.html#concurrency
https://talks.golang.org/2012/concurrency.slide#34
https://speakerdeck.com/kavya719/understanding-channelshtml
Do not communicate by sharing memory; instead, share memory by communicating.golang
什麼是Goroutine?shell
A goroutine has a simple model: it is a function executing concurrently with other goroutines in the same address space. It is lightweight, costing little more than the allocation of stack space. And the stacks start small, so they are cheap, and grow by allocating (and freeing) heap storage as required.數據結構
Goroutine被分配到OS的多個線程中。因此一個Goroutine 阻塞了,其餘的任何繼續運行。Goroutine的抽象隱藏了線程建立和管理的複雜性。閉包
在一個函數和方法調用的前面加上go 關鍵字,就是在一個新的goroutine中調用這個函數或方法。當調用完成的時候,那麼goroutine將會安靜退出。就像在Unixshell中使用&ide
go list.Sort() // run list.Sort concurrently; don't wait for it.
函數字面值一樣能夠在goroutine中直接調用函數
func Announce(message string, delay time.Duration) { go func() { time.Sleep(delay) fmt.Println(message) }() // Note the parentheses - must call the function. }
在Go中,函數字面值是閉包。上面的例子沒有價值, 由於沒有任何方式來通知goroutine已經完成。ui
和map同樣,channels 也是使用make來分配的,make的結果就是對一個底層數據結構的引用。若是額外的參數提供了,那麼就是提供給緩衝區的大小。默認大小爲0,給一個沒有buffer或同步的bufferspa
ci := make(chan int) // unbuffered channel of integers cj := make(chan int, 0) // unbuffered channel of integers cs := make(chan *os.File, 100) // buffered channel of pointers to Files
沒有緩衝的channel將同步與通訊結合在一塊兒了。無緩衝區的channel主要用來同步,同時也能夠用來簡單的傳輸數據。對channel有不少的慣用法,這裏給出一個。線程
c := make(chan int) // Start the sort in a goroutine; when it completes, signal on the channel. go func() { list.Sort() c <- 1 // Send a signal, value 不重要 } doSomethingForAWhile() <-c // Wait for sort to finish; discard sent value.
channel的接收這將會永遠阻塞 ,直到channel中有數據過來。若是channel是unbuffered,那麼發送數據一端將會阻塞,直到接收者可以接收。若是channel有buffer,那麼發送數據的一端只會阻塞到數據拷貝到buffer中(這個極短的時間會阻塞)。若是buffer已經滿了,那麼一直阻塞到接受者可以接收一個數據。
一個帶緩衝的channel能夠用做信號量,用來限制流量,在下面的例子中,到來的請求被傳遞給handle方法,這個方法傳遞一個數據到channel,處理這個請求,最後從這個管道中接受這個數據。channel 緩衝區的容量限制了同時調用process的數量。
var sem = make(chan int, MaxOutstanding) func handle(r *Request) { sem <- 1 // Wait for active queue to drain. process(r) // May take a long time. <-sem // Done; enable next request to run. } func Serve(queue chan *Request) { for { req := <-queue go handle(req) // Don't wait for handle to finish. } }
一旦MaxOutstanding的處理process,那麼更多的數據向channel插入,就會阻塞。直到有process處理完畢,從channel中釋放一個value。這樣的設計仍然會有問題,Sever對於每個request都會建立一個goroutine,即便是隻能有MaxOutstanding個goroutine來處理。結果是,這個程序在短期,若是有大量的請求進來,那麼將有大量資源將會被消費。咱們經過給goroutine建立閘門來限制goroutine的建立。
func Serve(queue chan *Request) { for req := range queue { sem <- 1 go func() { process(req) // Buggy; see explanation below. <-sem }() } }
這裏的bug,對於for循環中req, req是對在每次迭代中從新利用的,因此req對每一個goroutine都會共享,這不是咱們想要的,咱們想要的是,對於每個goroutine,req都是獨一無二的。解決辦法能夠以下:經過將req的值做爲形參來傳遞。
func Serve(queue chan *Request) { for req := range queue { sem <- 1 go func(req *Request) { process(req) <-sem }(req) } }
上面的代碼等價於:
func Serve(queue chan *Request) { for req := range queue { req := req // Create new instance of req for the goroutine. sem <- 1 go func() { process(req) <-sem }() } }
對於goroutine,一般咱們但願啓動固定數量的handle goroutine。
func handle(queue chan *Request) { for r := range queue { process(r) } } func Serve(clientRequests chan *Request, quit chan bool) { // Start handlers for i := 0; i < MaxOutstanding; i++ { go handle(clientRequests) } <-quit // Wait to be told to exit. }
package main import "fmt" func main(){ go func() { fmt.Println("hello world") }() }
若是運行上面程序,結果是什麼都沒有,程序就結束了,緣由是main goroutine退出了。
package main import "fmt" import "time" import "math/rand" func main() { res := make(chan int) go func() { joe := boring("Joe") ann := boring("Ann") for i := 0; i < 5; i++ { fmt.Println(<-joe) fmt.Println(<-ann) } fmt.Println("You're both boring; I'm leaving.") res <- 1 }() <- res // 即便在main goroutine中等待很長時間,這個程序也沒有錯 } func boring(msg string) <-chan string { // Returns receive-only channel of strings. c := make(chan string) go func() { // We launch the goroutine from inside the function. for i := 0; ; i++ { // 一直在不停的給channel 寫數據。 c <- fmt.Sprintf("%s %d", msg, i) time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond) } }() return c // Return the channel to the caller. }
這個例子能夠見上面的代碼,其返回了一個channel做爲數據的生成器
當一個goroutine中,你要讀或寫多個channel,golang怎麼處理呢?使用select語句,其像switch語句,可是每個case都是用來通訊的。特色以下:
select { case v1 := <-c1: fmt.Printf("received %v from c1\n", v1) case v2 := <-c2: fmt.Printf("received %v from c2\n", v1) case c3 <- 23: fmt.Printf("sent %v to c3\n", 23) default: fmt.Printf("no one was ready to communicate\n") }
注意這裏select的語句只能執行一次,沒有一直執行的意思。
所謂生成器,就是數據不停的生成,這裏利用的channel是第一類值:
package main import "fmt" import time func boring(msg string) <- chan string { c := make(chan string) go func() { // We launch the goroutine from inside the function. for i := 0; ; i++ { c <- fmt.Sprintf("%s %d", msg, i) time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond) } }() return c // Return the channel to the caller. } func main() { c := boring("boring!") // Function returning a channel. for i := 0; i < 5; i++ { fmt.Printf("You say: %q\n", <-c) } fmt.Println("You're boring; I'm leaving.") }
利用了channel做爲返回值,不停的讀其中的數據,注意到goroutine能夠在boring函數調用存在,其調用後,仍然存在。
所謂Fan-In,至關於多路選擇器,有多個輸入channel,選擇其中的一個channel的輸入,做爲輸出,具體的模式爲
這裏的代碼利用select對channel的語義:
func fanIn(input1, input2 <-chan string) <-chan string { c := make(chan string) go func() { for { select { case s := <-input1: c <- s case s := <-input2: c <- s } } }() return c }
func main() { ch := make(chain Task, 3) for i := 0 ; i < numWorkers; i++ { go workder(ch) } // Send tasks to workers helloTasks := getTasks() for _, task := range helloTasks { taskCh <- task; } } func worker(ch) { for { task := <-taskCh process(task) } }