目錄html
你們好!「深度解密 Go 語言」系列很久未見,咱們今天講 channel,預祝閱讀愉快!在開始正文以前,咱們先說些題外話。git
上一篇關於 Go 語言的文章講 Go 程序的整個編碼、編譯、運行、退出的全過程。文章發出後,反響強烈,在各大平臺的閱讀量都不錯。例如博客園登上了 48 小時閱讀排行榜,而且受到了編輯推薦,佔據首頁頭條位置整整一天;在開發者頭條首頁精選的位置霸榜一週時間……程序員
熟悉碼農桃花源的朋友們都知道,這裏每篇文章都很長,要花很長時間才能讀完。但長並非目的,把每一個問題都講深、講透纔是最重要的。首先我本身得徹底理解才行,因此寫每篇文章時我都會看不少參考資料,看源碼,請教大牛,本身還要去寫樣例代碼跑結果……從建立文稿到真正完成寫做須要很長時間。github
作這些事情,無非是想力求我寫出來的文字,都是我目前所能理解的最深層次。若是我暫時理解不了,我會說出來,或者不寫進文章裏面去,留到之後有能力的時候再來寫。golang
我本身平時有這種體會:看微信公衆號的文章都是想快速地看完,快速地拉到最後,目的快點開始看下一篇,新鮮感才能不斷刺激大腦。有時候碰到長文很花時間,可能就沒耐心看下去了,裏面說的東西也以爲很難理解,可能直接就放棄了。可是,若是我知道一篇文章價值很高,就會選一個精力比較充沛的時間段,花整塊時間看完,這時候反倒很容易看進去。這種狀況下,潛意識裏就會知道我今天是必定要讀完這篇文章的,而且要把裏面有價值的東西都吸取進來。shell
因此,對於碼農桃花源的文章,我建議你收藏以後,找個空閒時間再好好看。編程
上週,我把 GitHub 項目 Go-Question 的內容整合成了開源電子書,閱讀體驗提高 N 倍,建議關注項目,如今已經 400 star 了,年末目標是 1k star。項目地址列在了參考資料裏。segmentfault
另外,公衆號的文章也能夠使用微信讀書看,體驗也很是贊,而且能夠放到書架上,每一個公衆號就是一本書,簡直酷炫。數組
閒話最後,一直「吹」了好久的曹大,新書《Go 語言高級編程》出版了!書的另外一位做者是柴樹杉老師,這是給 Go 語言提交 pull 的人,他在 Go 語言上面的研究不用我多說了吧。我第一時間下了單,而且到曹大工位要了簽名。安全
這本書的推薦人有不少大佬,像許世偉,郝林,雨痕等,評價很是高。重點給你們看下雨痕老師對這本書的評價(上圖第二排左側圖):
本書闡明瞭官方文檔某些語焉不詳的部分,有助於 Gopher 瞭解更多內在實現,以及平常工做中須要用到的 RPC、Web、分佈式應用等內容。我認識本書做者之一曹春暉,對他的學習態度和能力頗爲欽佩,所以推薦你們閱讀本書。
你們可能不知道,出書一點都不賺錢,但投入的精力卻很大。可是像曹大在給讀者的書籤名時所說的:書籍是時代的生命。多少知識都是經過書本一代代傳承!
搬過幾回家就知道,紙質書太多,過程會比較痛苦。因此,我如今買紙書都會考慮再三。可是,此次我仍是在第一時間下單了《Go 語言高級編程》。我也強烈推薦你買一本,支持原創者。
柴老師在武漢,我接觸很少。但和曹大倒是常常能見面(在同一個公司工做)。他本人常常活躍在各類微信羣,社區,也很是樂於解答各類疑難雜症。上週還和曹大一塊兒吃了個飯,請教了不少問題,我總結了一些對家都有用的東西,放在個人朋友圈:
若是你想圍觀個人朋友圈,想和我交流,能夠長按下面的二維碼加我好友,備註下來自公衆號。
好了,下面開始咱們的正文。
你們都知道著名的摩爾定律。1965 年,時任仙童公司的 Gordon Moore 發表文章,預測在將來十年,半導體芯片上的晶體管和電阻數量將每一年增長一倍;1975 年,Moore 再次發表論文,將「每一年」修改成「每兩年」。這個預測在 2012 年左右基本是正確的。
但隨着晶體管電路逐漸接近性能極限,摩爾定律終將走到盡頭。靠增長晶體管數量來提升計算機的性能不靈了。因而,人們開始轉換思路,用其餘方法來提高計算機的性能,這就是多核計算機產生的緣由。
這一招看起來還不錯,可是人們又遇到了一個另外一個定律的限制,那就是 Amdahl's Law,它提出了一個模型用來衡量在並行模式下程序運行效率的提高。這個定律是說,一個程序能從並行上得到性能提高的上限取決於有多少代碼必須寫成串行的。
舉個例子,對於一個和用戶打交道的界面程序,它必須和用戶打交道。用戶點一個按鈕,而後才能繼續運行下一步,這必須是串行執行的。這種程序的運行效率就取決於和用戶交互的速度,你有多少核都白瞎。用戶就是不按下一步,你怎麼辦?
2000 年左右雲計算興起,人們能夠方便地獲取計算雲上的資源,方便地水平擴展本身的服務,能夠垂手可得地就調動多臺機器資源甚至將計算任務分發到分佈在全球範圍的機器。可是也所以帶來了不少問題和挑戰。例如怎樣在機器間進行通訊、聚合結果等。最難的一個挑戰是如何找到一個模型能用來描述 concurrent。
咱們都知道,要想一段併發的代碼沒有任何 bug,是很是困難的。有些併發 bug 是在系統上線數年後才發現的,緣由經常是很詭異的,好比用戶數增長到了某個界限。
併發問題通常有下面這幾種:
數據競爭。簡單來講就是兩個或多個線程同時讀寫某個變量,形成了預料以外的結果。
原子性。在一個定義好的上下文裏,原子性操做不可分割。上下文的定義很是重要。有些代碼,你在程序裏看起來是原子的,如最簡單的 i++,但在機器層面看來,這條語句一般須要幾條指令來完成(Load,Incr,Store),不是不可分割的,也就不是原子性的。原子性可讓咱們放心地構造併發安全的程序。
內存訪問同步。代碼中須要控制同時只有一個線程訪問的區域稱爲臨界區。Go 語言中通常使用 sync 包裏的 Mutex 來完成同步訪問控制。鎖通常會帶來比較大的性能開銷,所以通常要考慮加鎖的區域是否會頻繁進入、鎖的粒度如何控制等問題。
死鎖。在一個死鎖的程序裏,每一個線程都在等待其餘線程,造成了一個首尾相連的尷尬局面,程序沒法繼續運行下去。
活鎖。想象一下,你走在一條小路上,一我的迎面走來。你往左邊走,想避開他;他作了相反的事情,他往右邊走,結果兩個都過不了。以後,兩我的又都想從原來本身相反的方向走,仍是一樣的結果。這就是活鎖,看起來都像在工做,但工做進度就是沒法前進。
飢餓。併發的線程不能獲取它所須要的資源以進行下一步的工做。一般是有一個很是貪婪的線程,長時間佔據資源不釋放,致使其餘線程沒法得到資源。
關於併發和並行的區別,引用一個經典的描述:
併發是同一時間應對(dealing with)多件事情的能力。
並行是同一時間動手(doing)作多件事情的能力。
雨痕老師《Go 語言學習筆記》上的解釋:
併發是指邏輯上具有同時處理多個任務的能力;並行則是物理上同時執行多個任務。
而根據《Concurrency in Go》這本書,計算機的概念都是抽象的結果,併發和並行也不例外。它這樣描述併發和並行的區別:
Concurrency is a property of the code; parallelism is a property of the running program.
併發是代碼的特性,並行是正在運行的程序的特性。先忽略我拙劣的翻譯。很新奇,不是嗎?我也是第一次見到這樣的說法,細想一下,仍是頗有道理的。
咱們一直說寫的代碼是併發的或者是並行的,可是咱們能提供什麼保證嗎?若是在只有一個核的機器上跑並行的代碼,它還能並行嗎?你就是再天才,也沒法寫出並行的程序。充其量也就是代碼上看起來「併發」的,如此而已。
固然,表面上看起來仍是並行的,但那不過 CPU 的障眼法,多個線程在分時共享 CPU 的資源,在一個粗糙的時間隔裏看起來就是「並行」。
因此,咱們實際上只能編寫「併發」的代碼,而不能編寫「並行」的代碼,並且只是但願併發的代碼可以並行地執行。併發的代碼可否並行,取決於抽象的層級:代碼裏的併發原語、runtime,操做系統(虛擬機、容器)。層級愈來愈底層,要求也愈來愈高。所以,咱們談併發或並行實際上要指定上下文,也就是抽象的層級。
《Concurrency in Go》書裏舉了一個例子:假如兩我的同時打開電腦上的計算器程序,這兩個程序確定不會影響彼此,這就是並行。在這個例子中,上下文就是兩我的的機器,而兩個計算器進程就是並行的元素。
隨着抽象層次的下降,併發模型實際上變得更難也更重要,而越低層次的併發模型對咱們也越重要。要想併發程序正確地執行,就要深刻研究併發模型。
在 Go 語言發佈前,咱們寫併發代碼時,考慮到的最底層抽象是:系統線程。Go 發佈以後,在這條抽象鏈上,又加一個 goroutine。並且 Go 從著名的計算機科學家 Tony Hoare 那借來一個概念:channel。Tony Hoare 就是那篇著名文章《Communicating Sequential Processes》的做者。
看起來事情變得更加複雜,由於 Go 又引入了一個更底層的抽象,但事實並非這樣。由於 goroutine 並非看起來的那樣又抽象了一層,它實際上是替代了系統線程。Gopher 在寫代碼的時候,並不會去關心繫統線程,大部分時候只須要考慮到 goroutine 和 channel。固然有時候會用到一些共享內存的概念,通常就是指 sync 包裏的東西,好比 sync.Mutex。
CSP 常常被認爲是 Go 在併發編程上成功的關鍵因素。CSP 全稱是 「Communicating Sequential Processes」,這也是 Tony Hoare 在 1978 年發表在 ACM 的一篇論文。論文裏指出一門編程語言應該重視 input 和 output 的原語,尤爲是併發編程的代碼。
在那篇文章發表的時代,人們正在研究模塊化編程的思想,該不應用 goto 語句在當時是最激烈的議題。彼時,面向對象編程的思想正在崛起,幾乎沒什麼人關心併發編程。
在文章中,CSP 也是一門自定義的編程語言,做者定義了輸入輸出語句,用於 processes 間的通訊(communicatiton)。processes 被認爲是須要輸入驅動,而且產生輸出,供其餘 processes 消費,processes 能夠是進程、線程、甚至是代碼塊。輸入命令是:!,用來向 processes 寫入;輸出是:?,用來從 processes 讀出。這篇文章要講的 channel 正是借鑑了這一設計。
Hoare 還提出了一個 -> 命令,若是 -> 左邊的語句返回 false,那它右邊的語句就不會執行。
經過這些輸入輸出命令,Hoare 證實了若是一門編程語言中把 processes 間的通訊看得第一等重要,那麼併發編程的問題就會變得簡單。
Go 是第一個將 CSP 的這些思想引入,而且發揚光大的語言。僅管內存同步訪問控制(原文是 memory access synchronization)在某些狀況下大有用處,Go 裏也有相應的 sync 包支持,可是這在大型程序很容易出錯。
Go 一開始就把 CSP 的思想融入到語言的核內心,因此併發編程成爲 Go 的一個獨特的優點,並且很容易理解。
大多數的編程語言的併發編程模型是基於線程和內存同步訪問控制,Go 的併發編程的模型則用 goroutine 和 channel 來替代。Goroutine 和線程相似,channel 和 mutex (用於內存同步訪問控制)相似。
Goroutine 解放了程序員,讓咱們更能貼近業務去思考問題。而不用考慮各類像線程庫、線程開銷、線程調度等等這些繁瑣的底層問題,goroutine 天生替你解決好了。
Channel 則天生就能夠和其餘 channel 組合。咱們能夠把收集各類子系統結果的 channel 輸入到同一個 channel。Channel 還能夠和 select, cancel, timeout 結合起來。而 mutex 就沒有這些功能。
Go 的併發原則很是優秀,目標就是簡單:儘可能使用 channel;把 goroutine 看成免費的資源,隨便用。
說明一下,前面這兩部分的內容來自英文開源書《Concurrency In Go》,強烈推薦閱讀。
引入結束,咱們正式開始今天的主角:channel。
Goroutine 和 channel 是 Go 語言併發編程的 兩大基石。Goroutine 用於執行併發任務,channel 用於 goroutine 之間的同步、通訊。
Channel 在 gouroutine 間架起了一條管道,在管道里傳輸數據,實現 gouroutine 間的通訊;因爲它是線程安全的,因此用起來很是方便;channel 還提供「先進先出」的特性;它還能影響 goroutine 的阻塞和喚醒。
相信你們必定見過一句話:
Do not communicate by sharing memory; instead, share memory by communicating.
不要經過共享內存來通訊,而要經過通訊來實現內存共享。
這就是 Go 的併發哲學,它依賴 CSP 模型,基於 channel 實現。
簡直是一頭霧水,這兩句話難道不是同一個意思?
經過前面兩節的內容,我我的這樣理解這句話:前面半句說的是經過 sync 包裏的一些組件進行併發編程;然後面半句則是說 Go 推薦使用 channel 進行併發編程。二者其實都是必要且有效的。實際上看完本文後面對 channel 的源碼分析,你會發現,channel 的底層就是經過 mutex 來控制併發的。只是 channel 是更高一層次的併發編程原語,封裝了更多的功能。
關因而選擇 sync 包裏的底層併發編程原語仍是 channel,《Concurrency In Go》這本書的第 2 章 「Go's Philosophy on Concurrency」 裏有一張決策樹和詳細的論述,再次推薦你去閱讀。我把圖貼出來:
Channel 是 Go 語言中一個很是重要的類型,是 Go 裏的第一對象。經過 channel,Go 實現了經過通訊來實現內存共享。Channel 是在多個 goroutine 之間傳遞數據和同步的重要手段。
使用原子函數、讀寫鎖能夠保證資源的共享訪問安全,但使用 channel 更優雅。
channel 字面意義是「通道」,相似於 Linux 中的管道。聲明 channel 的語法以下:
chan T // 聲明一個雙向通道 chan<- T // 聲明一個只能用於發送的通道 <-chan T // 聲明一個只能用於接收的通道
單向通道的聲明,用 <-
來表示,它指明通道的方向。你只要明白,代碼的書寫順序是從左到右就立刻能掌握通道的方向是怎樣的。
由於 channel 是一個引用類型,因此在它被初始化以前,它的值是 nil,channel 使用 make 函數進行初始化。能夠向它傳遞一個 int 值,表明 channel 緩衝區的大小(容量),構造出來的是一個緩衝型的 channel;不傳或傳 0 的,構造的就是一個非緩衝型的 channel。
二者有一些差異:非緩衝型 channel 沒法緩衝元素,對它的操做必定順序是「發送-> 接收 -> 發送 -> 接收 -> ……」,若是想連續向一個非緩衝 chan 發送 2 個元素,而且沒有接收的話,第一次必定會被阻塞;對於緩衝型 channel 的操做,則要「寬鬆」一些,畢竟是帶了「緩衝」光環。
Go 經過 channel 實現 CSP 通訊模型,主要用於 goroutine 之間的消息傳遞和事件通知。
有了 channel 和 goroutine 以後,Go 的併發編程變得異常容易和安全,得以讓程序員把注意力留到業務上去,實現開發效率的提高。
要知道,技術並非最重要的,它只是實現業務的工具。一門高效的開發語言讓你把節省下來的時間,留着去作更有意義的事情,好比寫寫文章。
對 chan 的發送和接收操做都會在編譯期間轉換成爲底層的發送接收函數。
Channel 分爲兩種:帶緩衝、不帶緩衝。對不帶緩衝的 channel 進行的操做實際上能夠看做「同步模式」,帶緩衝的則稱爲「異步模式」。
同步模式下,發送方和接收方要同步就緒,只有在二者都 ready 的狀況下,數據才能在二者間傳輸(後面會看到,實際上就是內存拷貝)。不然,任意一方先行進行發送或接收操做,都會被掛起,等待另外一方的出現才能被喚醒。
異步模式下,在緩衝槽可用的狀況下(有剩餘容量),發送和接收操做均可以順利進行。不然,操做的一方(如寫入)一樣會被掛起,直到出現相反操做(如接收)纔會被喚醒。
小結一下:同步模式下,必需要使發送方和接收方配對,操做纔會成功,不然會被阻塞;異步模式下,緩衝槽要有剩餘容量,操做纔會成功,不然也會被阻塞。
直接上源碼(版本是 1.9.2):
type hchan struct { // chan 裏元素數量 qcount uint // chan 底層循環數組的長度 dataqsiz uint // 指向底層循環數組的指針 // 只針對有緩衝的 channel buf unsafe.Pointer // chan 中元素大小 elemsize uint16 // chan 是否被關閉的標誌 closed uint32 // chan 中元素類型 elemtype *_type // element type // 已發送元素在循環數組中的索引 sendx uint // send index // 已接收元素在循環數組中的索引 recvx uint // receive index // 等待接收的 goroutine 隊列 recvq waitq // list of recv waiters // 等待發送的 goroutine 隊列 sendq waitq // list of send waiters // 保護 hchan 中全部字段 lock mutex }
關於字段的含義都寫在註釋裏了,再來重點說幾個字段:
buf
指向底層循環數組,只有緩衝型的 channel 纔有。
sendx
,recvx
均指向底層循環數組,表示當前能夠發送和接收的元素位置索引值(相對於底層數組)。
sendq
,recvq
分別表示被阻塞的 goroutine,這些 goroutine 因爲嘗試讀取 channel 或向 channel 發送數據而被阻塞。
waitq
是 sudog
的一個雙向鏈表,而 sudog
其實是對 goroutine 的一個封裝:
type waitq struct { first *sudog last *sudog }
lock
用來保證每一個讀 channel 或寫 channel 的操做都是原子的。
例如,建立一個容量爲 6 的,元素爲 int 型的 channel 數據結構以下 :
咱們知道,通道有兩個方向,發送和接收。理論上來講,咱們能夠建立一個只發送或只接收的通道,可是這種通道建立出來後,怎麼使用呢?一個只能發的通道,怎麼接收呢?一樣,一個只能收的通道,如何向其發送數據呢?
通常而言,使用 make
建立一個能收能發的通道:
// 無緩衝通道 ch1 := make(chan int) // 有緩衝通道 ch2 := make(chan int, 10)
經過彙編分析,咱們知道,最終建立 chan 的函數是 makechan
:
func makechan(t *chantype, size int64) *hchan
從函數原型來看,建立的 chan 是一個指針。因此咱們能在函數間直接傳遞 channel,而不用傳遞 channel 的指針。
具體來看下代碼:
const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) func makechan(t *chantype, size int64) *hchan { elem := t.elem // 省略了檢查 channel size,align 的代碼 // …… var c *hchan // 若是元素類型不含指針 或者 size 大小爲 0(無緩衝類型) // 只進行一次內存分配 if elem.kind&kindNoPointers != 0 || size == 0 { // 若是 hchan 結構體中不含指針,GC 就不會掃描 chan 中的元素 // 只分配 "hchan 結構體大小 + 元素大小*個數" 的內存 c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true)) // 若是是緩衝型 channel 且元素大小不等於 0(大小等於 0的元素類型:struct{}) if size > 0 && elem.size != 0 { c.buf = add(unsafe.Pointer(c), hchanSize) } else { // race detector uses this location for synchronization // Also prevents us from pointing beyond the allocation (see issue 9401). // 1. 非緩衝型的,buf 沒用,直接指向 chan 起始地址處 // 2. 緩衝型的,能進入到這裏,說明元素無指針且元素類型爲 struct{},也無影響 // 由於只會用到接收和發送遊標,不會真正拷貝東西到 c.buf 處(這會覆蓋 chan的內容) c.buf = unsafe.Pointer(c) } } else { // 進行兩次內存分配操做 c = new(hchan) c.buf = newarray(elem, int(size)) } c.elemsize = uint16(elem.size) c.elemtype = elem // 循環數組長度 c.dataqsiz = uint(size) // 返回 hchan 指針 return c }
新建一個 chan 後,內存在堆上分配,大概長這樣:
說明一下,這張圖來源於 Gopher Con 上的一份 PPT,地址見參考資料。這份材料很是清晰易懂,推薦你去讀。
接下來,咱們用一個來自參考資料【深刻 channel 底層】的例子來理解建立、發送、接收的整個過程。
func goroutineA(a <-chan int) { val := <- a fmt.Println("G1 received data: ", val) return } func goroutineB(b <-chan int) { val := <- b fmt.Println("G2 received data: ", val) return } func main() { ch := make(chan int) go goroutineA(ch) go goroutineB(ch) ch <- 3 time.Sleep(time.Second) }
首先建立了一個無緩衝的 channel,接着啓動兩個 goroutine,並將前面建立的 channel 傳遞進去。而後,向這個 channel 中發送數據 3,最後 sleep 1 秒後程序退出。
程序第 14 行建立了一個非緩衝型的 channel,咱們只看 chan 結構體中的一些重要字段,來從總體層面看一下 chan 的狀態,一開始什麼都沒有:
在繼續分析前面小節的例子前,咱們先來看一下接收相關的源碼。在清楚了接收的具體過程以後,也就能輕鬆理解具體的例子了。
接收操做有兩種寫法,一種帶 "ok",反應 channel 是否關閉;一種不帶 "ok",這種寫法,當接收到相應類型的零值時沒法知道是真實的發送者發送過來的值,仍是 channel 被關閉後,返回給接收者的默認類型的零值。兩種寫法,都有各自的應用場景。
通過編譯器的處理後,這兩種寫法最後對應源碼裏的這兩個函數:
// entry points for <- c from compiled code func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) } func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return }
chanrecv1
函數處理不帶 "ok" 的情形,chanrecv2
則經過返回 "received" 這個字段來反應 channel 是否被關閉。接收值則比較特殊,會「放到」參數 elem
所指向的地址了,這很像 C/C++ 裏的寫法。若是代碼裏忽略了接收值,這裏的 elem 爲 nil。
不管如何,最終轉向了 chanrecv
函數:
// 位於 src/runtime/chan.go // chanrecv 函數接收 channel c 的元素並將其寫入 ep 所指向的內存地址。 // 若是 ep 是 nil,說明忽略了接收值。 // 若是 block == false,即非阻塞型接收,在沒有數據可接收的狀況下,返回 (false, false) // 不然,若是 c 處於關閉狀態,將 ep 指向的地址清零,返回 (true, false) // 不然,用返回值填充 ep 指向的內存地址。返回 (true, true) // 若是 ep 非空,則應該指向堆或者函數調用者的棧 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // 省略 debug 內容 ………… // 若是是一個 nil 的 channel if c == nil { // 若是不阻塞,直接返回 (false, false) if !block { return } // 不然,接收一個 nil 的 channel,goroutine 掛起 gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2) // 不會執行到這裏 throw("unreachable") } // 在非阻塞模式下,快速檢測到失敗,不用獲取鎖,快速返回 // 當咱們觀察到 channel 沒準備好接收: // 1. 非緩衝型,等待發送列隊 sendq 裏沒有 goroutine 在等待 // 2. 緩衝型,但 buf 裏沒有元素 // 以後,又觀察到 closed == 0,即 channel 未關閉。 // 由於 channel 不可能被重複打開,因此前一個觀測的時候 channel 也是未關閉的, // 所以在這種狀況下能夠直接宣佈接收失敗,返回 (false, false) if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 { return } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } // 加鎖 lock(&c.lock) // channel 已關閉,而且循環數組 buf 裏沒有元素 // 這裏能夠處理非緩衝型關閉 和 緩衝型關閉但 buf 無元素的狀況 // 也就是說即便是關閉狀態,但在緩衝型的 channel, // buf 裏有元素的狀況下還能接收到元素 if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(unsafe.Pointer(c)) } // 解鎖 unlock(&c.lock) if ep != nil { // 從一個已關閉的 channel 執行接收操做,且未忽略返回值 // 那麼接收的值將是一個該類型的零值 // typedmemclr 根據類型清理相應地址的內存 typedmemclr(c.elemtype, ep) } // 從一個已關閉的 channel 接收,selected 會返回true return true, false } // 等待發送隊列裏有 goroutine 存在,說明 buf 是滿的 // 這有多是: // 1. 非緩衝型的 channel // 2. 緩衝型的 channel,但 buf 滿了 // 針對 1,直接進行內存拷貝(從 sender goroutine -> receiver goroutine) // 針對 2,接收到循環數組頭部的元素,並將發送者的元素放到循環數組尾部 if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } // 緩衝型,buf 裏有元素,能夠正常接收 if c.qcount > 0 { // 直接從循環數組裏找到要接收的元素 qp := chanbuf(c, c.recvx) // ………… // 代碼裏,沒有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 清理掉循環數組裏相應位置的值 typedmemclr(c.elemtype, qp) // 接收遊標向前移動 c.recvx++ // 接收遊標歸零 if c.recvx == c.dataqsiz { c.recvx = 0 } // buf 數組裏的元素個數減 1 c.qcount-- // 解鎖 unlock(&c.lock) return true, true } if !block { // 非阻塞接收,解鎖。selected 返回 false,由於沒有接收到值 unlock(&c.lock) return false, false } // 接下來就是要被阻塞的狀況了 // 構造一個 sudog gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // 待接收數據的地址保存下來 mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.selectdone = nil mysg.c = c gp.param = nil // 進入channel 的等待接收隊列 c.recvq.enqueue(mysg) // 將當前 goroutine 掛起 goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3) // 被喚醒了,接着從這裏繼續執行一些掃尾工做 if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } closed := gp.param == nil gp.param = nil mysg.c = nil releaseSudog(mysg) return true, !closed }
上面的代碼註釋地比較詳細了,你能夠對着源碼一行行地去看,咱們再來詳細看一下。
若是 channel 是一個空值(nil),在非阻塞模式下,會直接返回。在阻塞模式下,會調用 gopark 函數掛起 goroutine,這個會一直阻塞下去。由於在 channel 是 nil 的狀況下,要想不阻塞,只有關閉它,但關閉一個 nil 的 channel 又會發生 panic,因此沒有機會被喚醒了。更詳細地能夠在 closechan 函數的時候再看。
和發送函數同樣,接下來搞了一個在非阻塞模式下,不用獲取鎖,快速檢測到失敗而且返回的操做。順帶插一句,咱們平時在寫代碼的時候,找到一些邊界條件,快速返回,能讓代碼邏輯更清晰,由於接下來的正常狀況就比較少,更聚焦了,看代碼的人也更能專一地看核心代碼邏輯了。
// 在非阻塞模式下,快速檢測到失敗,不用獲取鎖,快速返回 (false, false) if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 { return }
當咱們觀察到 channel 沒準備好接收:
以後,又觀察到 closed == 0,即 channel 未關閉。
由於 channel 不可能被重複打開,因此前一個觀測的時候, channel 也是未關閉的,所以在這種狀況下能夠直接宣佈接收失敗,快速返回。由於沒被選中,也沒接收到數據,因此返回值爲 (false, false)。
接下來的操做,首先會上一把鎖,粒度比較大。若是 channel 已關閉,而且循環數組 buf 裏沒有元素。對應非緩衝型關閉和緩衝型關閉但 buf 無元素的狀況,返回對應類型的零值,但 received 標識是 false,告訴調用者此 channel 已關閉,你取出來的值並非正常由發送者發送過來的數據。可是若是處於 select 語境下,這種狀況是被選中了的。不少將 channel 用做通知信號的場景就是命中了這裏。
接下來,若是有等待發送的隊列,說明 channel 已經滿了,要麼是非緩衝型的 channel,要麼是緩衝型的 channel,但 buf 滿了。這兩種狀況下均可以正常接收數據。
因而,調用 recv 函數:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 若是是非緩衝型的 channel if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) } // 未忽略接收的數據 if ep != nil { // 直接拷貝數據,從 sender goroutine -> receiver goroutine recvDirect(c.elemtype, sg, ep) } } else { // 緩衝型的 channel,但 buf 已滿。 // 將循環數組 buf 隊首的元素拷貝到接收數據的地址 // 將發送者的數據入隊。實際上這時 revx 和 sendx 值相等 // 找到接收遊標 qp := chanbuf(c, c.recvx) // ………… // 將接收遊標處的數據拷貝給接收者 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 將發送者數據拷貝到 buf typedmemmove(c.elemtype, qp, sg.elem) // 更新遊標值 c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx } sg.elem = nil gp := sg.g // 解鎖 unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } // 喚醒發送的 goroutine。須要等到調度器的光臨 goready(gp, skip+1) }
若是是非緩衝型的,就直接從發送者的棧拷貝到接收者的棧。
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) { // dst is on our stack or the heap, src is on another stack. src := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) memmove(dst, src, t.size) }
不然,就是緩衝型 channel,而 buf 又滿了的情形。說明發送遊標和接收遊標重合了,所以須要先找到接收遊標:
// chanbuf(c, i) is pointer to the i'th slot in the buffer. func chanbuf(c *hchan, i uint) unsafe.Pointer { return add(c.buf, uintptr(i)*uintptr(c.elemsize)) }
將該處的元素拷貝到接收地址。而後將發送者待發送的數據拷貝到接收遊標處。這樣就完成了接收數據和發送數據的操做。接着,分別將發送遊標和接收遊標向前進一,若是發生「環繞」,再從 0 開始。
最後,取出 sudog 裏的 goroutine,調用 goready 將其狀態改爲 「runnable」,待發送者被喚醒,等待調度器的調度。
而後,若是 channel 的 buf 裏還有數據,說明能夠比較正常地接收。注意,這裏,即便是在 channel 已經關閉的狀況下,也是能夠走到這裏的。這一步比較簡單,正常地將 buf 裏接收遊標處的數據拷貝到接收數據的地址。
到了最後一步,走到這裏來的情形是要阻塞的。固然,若是 block 傳進來的值是 false,那就不阻塞,直接返回就行了。
先構造一個 sudog,接着就是保存各類值了。注意,這裏會將接收數據的地址存儲到了 elem
字段,當被喚醒時,接收到的數據就會保存到這個字段指向的地址。而後將 sudog 添加到 channel 的 recvq 隊列裏。調用 goparkunlock 函數將 goroutine 掛起。
接下來的代碼就是 goroutine 被喚醒後的各類收尾工做了。
咱們繼續以前的例子。前面說到第 14 行,建立了一個非緩衝型的 channel,接着,第 1五、16 行分別建立了一個 goroutine,各自執行了一個接收操做。經過前面的源碼分析,咱們知道,這兩個 goroutine (後面稱爲 G1 和 G2 好了)都會被阻塞在接收操做。G1 和 G2 會掛在 channel 的 recq 隊列中,造成一個雙向循環鏈表。
在程序的 17 行以前,chan 的總體數據結構以下:
buf
指向一個長度爲 0 的數組,qcount 爲 0,表示 channel 中沒有元素。重點關注 recvq
和 sendq
,它們是 waitq 結構體,而 waitq 實際上就是一個雙向鏈表,鏈表的元素是 sudog,裏面包含 g
字段,g
表示一個 goroutine,因此 sudog 能夠當作一個 goroutine。recvq 存儲那些嘗試讀取 channel 但被阻塞的 goroutine,sendq 則存儲那些嘗試寫入 channel,但被阻塞的 goroutine。
此時,咱們能夠看到,recvq 裏掛了兩個 goroutine,也就是前面啓動的 G1 和 G2。由於沒有 goroutine 接收,而 channel 又是無緩衝類型,因此 G1 和 G2 被阻塞。sendq 沒有被阻塞的 goroutine。
recvq
的數據結構以下。這裏直接引用文章中的一幅圖,用了三維元素,畫得很好:
再從總體上來看一下 chan 此時的狀態:
G1 和 G2 被掛起了,狀態是 WAITING
。關於 goroutine 調度器這塊不是今天的重點,固然後面確定會寫相關的文章。這裏先簡單說下,goroutine 是用戶態的協程,由 Go runtime 進行管理,做爲對比,內核線程由 OS 進行管理。Goroutine 更輕量,所以咱們能夠輕鬆建立數萬 goroutine。
一個內核線程能夠管理多個 goroutine,當其中一個 goroutine 阻塞時,內核線程能夠調度其餘的 goroutine 來運行,內核線程自己不會阻塞。這就是一般咱們說的 M:N
模型:
M:N
模型一般由三部分構成:M、P、G。M 是內核線程,負責運行 goroutine;P 是 context,保存 goroutine 運行所須要的上下文,它還維護了可運行(runnable)的 goroutine 列表;G 則是待運行的 goroutine。M 和 P 是 G 運行的基礎。
繼續回到例子。假設咱們只有一個 M,當 G1(go goroutineA(ch)
) 運行到 val := <- a
時,它由原本的 running 狀態變成了 waiting 狀態(調用了 gopark 以後的結果):
G1 脫離與 M 的關係,但調度器可不會讓 M 閒着,因此會接着調度另外一個 goroutine 來運行:
G2 也是一樣的遭遇。如今 G1 和 G2 都被掛起了,等待着一個 sender 往 channel 裏發送數據,才能獲得解救。
接着上面的例子,G1 和 G2 如今都在 recvq 隊列裏了。
ch <- 3
第 17 行向 channel 發送了一個元素 3。
發送操做最終轉化爲 chansend
函數,直接上源碼,一樣大部分都註釋了,能夠看懂主流程:
// 位於 src/runtime/chan.go func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 若是 channel 是 nil if c == nil { // 不能阻塞,直接返回 false,表示未發送成功 if !block { return false } // 當前 goroutine 被掛起 gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2) throw("unreachable") } // 省略 debug 相關…… // 對於不阻塞的 send,快速檢測失敗場景 // // 若是 channel 未關閉且 channel 沒有多餘的緩衝空間。這多是: // 1. channel 是非緩衝型的,且等待接收隊列裏沒有 goroutine // 2. channel 是緩衝型的,但循環數組已經裝滿了元素 if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } // 鎖住 channel,併發安全 lock(&c.lock) // 若是 channel 關閉了 if c.closed != 0 { // 解鎖 unlock(&c.lock) // 直接 panic panic(plainError("send on closed channel")) } // 若是接收隊列裏有 goroutine,直接將要發送的數據拷貝到接收 goroutine if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // 對於緩衝型的 channel,若是還有緩衝空間 if c.qcount < c.dataqsiz { // qp 指向 buf 的 sendx 位置 qp := chanbuf(c, c.sendx) // …… // 將數據從 ep 處拷貝到 qp typedmemmove(c.elemtype, qp, ep) // 發送遊標值加 1 c.sendx++ // 若是發送遊標值等於容量值,遊標值歸 0 if c.sendx == c.dataqsiz { c.sendx = 0 } // 緩衝區的元素數量加一 c.qcount++ // 解鎖 unlock(&c.lock) return true } // 若是不須要阻塞,則直接返回錯誤 if !block { unlock(&c.lock) return false } // channel 滿了,發送方會被阻塞。接下來會構造一個 sudog // 獲取當前 goroutine 的指針 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.selectdone = nil mysg.c = c gp.waiting = mysg gp.param = nil // 當前 goroutine 進入發送等待隊列 c.sendq.enqueue(mysg) // 當前 goroutine 被掛起 goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3) // 從這裏開始被喚醒了(channel 有機會能夠發送了) if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil if gp.param == nil { if c.closed == 0 { throw("chansend: spurious wakeup") } // 被喚醒後,channel 關閉了。坑爹啊,panic panic(plainError("send on closed channel")) } gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } // 去掉 mysg 上綁定的 channel mysg.c = nil releaseSudog(mysg) return true }
上面的代碼註釋地比較詳細了,咱們來詳細看看。
若是檢測到 channel 是空的,當前 goroutine 會被掛起。
對於不阻塞的發送操做,若是 channel 未關閉而且沒有多餘的緩衝空間(說明:a. channel 是非緩衝型的,且等待接收隊列裏沒有 goroutine;b. channel 是緩衝型的,但循環數組已經裝滿了元素)
對於這一點,runtime 源碼裏註釋了不少。這一條判斷語句是爲了在不阻塞發送的場景下快速檢測到發送失敗,好快速返回。
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false }
註釋裏主要講爲何這一塊能夠不加鎖,我詳細解釋一下。if
條件裏先讀了兩個變量:block 和 c.closed。block 是函數的參數,不會變;c.closed 可能被其餘 goroutine 改變,由於沒加鎖嘛,這是「與」條件前面兩個表達式。
最後一項,涉及到三個變量:c.dataqsiz,c.recvq.first,c.qcount。c.dataqsiz == 0 && c.recvq.first == nil
指的是非緩衝型的 channel,而且 recvq 裏沒有等待接收的 goroutine;c.dataqsiz > 0 && c.qcount == c.dataqsiz
指的是緩衝型的 channel,但循環數組已經滿了。這裏 c.dataqsiz
實際上也是不會被修改的,在建立的時候就已經肯定了。不加鎖真正影響地是 c.qcount
和 c.recvq.first
。
這一部分的條件就是兩個 word-sized read
,就是讀兩個 word 操做:c.closed
和 c.recvq.first
(非緩衝型) 或者 c.qcount
(緩衝型)。
當咱們發現 c.closed == 0
爲真,也就是 channel 未被關閉,再去檢測第三部分的條件時,觀測到 c.recvq.first == nil
或者 c.qcount == c.dataqsiz
時(這裏忽略 c.dataqsiz
),就判定要將此次發送操做做失敗處理,快速返回 false。
這裏涉及到兩個觀測項:channel 未關閉、channel not ready for sending。這兩項都會由於沒加鎖而出現觀測先後不一致的狀況。例如我先觀測到 channel 未被關閉,再觀察到 channel not ready for sending,這時我覺得能知足這個 if 條件了,可是若是這時 c.closed 變成 1,這時其實就不知足條件了,誰讓你不加鎖呢!
可是,由於一個 closed channel 不能將 channel 狀態從 'ready for sending' 變成 'not ready for sending',因此當我觀測到 'not ready for sending' 時,channel 不是 closed。即便 c.closed == 1
,即 channel 是在這兩個觀測中間被關閉的,那也說明在這兩個觀測中間,channel 知足兩個條件:not closed
和 not ready for sending
,這時,我直接返回 false 也是沒有問題的。
這部分解釋地比較繞,其實這樣作的目的就是少獲取一次鎖,提高性能。
若是檢測到 channel 已經關閉,直接 panic。
若是能從等待接收隊列 recvq 裏出隊一個 sudog(表明一個 goroutine),說明此時 channel 是空的,沒有元素,因此纔會有等待接收者。這時會調用 send 函數將元素直接從發送者的棧拷貝到接收者的棧,關鍵操做由 sendDirect
函數完成。
// send 函數處理向一個空的 channel 發送操做 // ep 指向被髮送的元素,會被直接拷貝到接收的 goroutine // 以後,接收的 goroutine 會被喚醒 // c 必須是空的(由於等待隊列裏有 goroutine,確定是空的) // c 必須被上鎖,發送操做執行完後,會使用 unlockf 函數解鎖 // sg 必須已經從等待隊列裏取出來了 // ep 必須是非空,而且它指向堆或調用者的棧 func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 省略一些用不到的 // …… // sg.elem 指向接收到的值存放的位置,如 val <- ch,指的就是 &val if sg.elem != nil { // 直接拷貝內存(從發送者到接收者) sendDirect(c.elemtype, sg, ep) sg.elem = nil } // sudog 上綁定的 goroutine gp := sg.g // 解鎖 unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } // 喚醒接收的 goroutine. skip 和打印棧相關,暫時不理會 goready(gp, skip+1) }
繼續看 sendDirect
函數:
// 向一個非緩衝型的 channel 發送數據、從一個無元素的(非緩衝型或緩衝型但空)的 channel // 接收數據,都會致使一個 goroutine 直接操做另外一個 goroutine 的棧 // 因爲 GC 假設對棧的寫操做只能發生在 goroutine 正在運行中而且由當前 goroutine 來寫 // 因此這裏實際上違反了這個假設。可能會形成一些問題,因此須要用到寫屏障來規避 func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { // src 在當前 goroutine 的棧上,dst 是另外一個 goroutine 的棧 // 直接進行內存"搬遷" // 若是目標地址的棧發生了棧收縮,當咱們讀出了 sg.elem 後 // 就不能修改真正的 dst 位置的值了 // 所以須要在讀和寫以前加上一個屏障 dst := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) memmove(dst, src, t.size) }
這裏涉及到一個 goroutine 直接寫另外一個 goroutine 棧的操做,通常而言,不一樣 goroutine 的棧是各自獨有的。而這也違反了 GC 的一些假設。爲了避免出問題,寫的過程當中增長了寫屏障,保證正確地完成寫操做。這樣作的好處是減小了一次內存 copy:不用先拷貝到 channel 的 buf,直接由發送者到接收者,沒有中間商賺差價,效率得以提升,完美。
而後,解鎖、喚醒接收者,等待調度器的光臨,接收者也得以重見天日,能夠繼續執行接收操做以後的代碼了。
c.qcount < c.dataqsiz
,說明緩衝區可用(確定是緩衝型的 channel)。先經過函數取出待發送元素應該去到的位置:qp := chanbuf(c, c.sendx) // 返回循環隊列裏第 i 個元素的地址處 func chanbuf(c *hchan, i uint) unsafe.Pointer { return add(c.buf, uintptr(i)*uintptr(c.elemsize)) }
c.sendx
指向下一個待發送元素在循環數組中的位置,而後調用 typedmemmove
函數將其拷貝到循環數組中。以後 c.sendx
加 1,元素總量加 1 :c.qcount++
,最後,解鎖並返回。
若是沒有命中以上條件的,說明 channel 已經滿了。無論這個 channel 是緩衝型的仍是非緩衝型的,都要將這個 sender 「關起來」(goroutine 被阻塞)。若是 block 爲 false,直接解鎖,返回 false。
最後就是真的須要被阻塞的狀況。先構造一個 sudog,將其入隊(channel 的 sendq 字段)。而後調用 goparkunlock
將當前 goroutine 掛起,並解鎖,等待合適的時機再喚醒。
喚醒以後,從 goparkunlock
下一行代碼開始繼續往下執行。
這裏有一些綁定操做,sudog 經過 g 字段綁定 goroutine,而 goroutine 經過 waiting 綁定 sudog,sudog 還經過 elem
字段綁定待發送元素的地址,以及 c
字段綁定被「坑」在此處的 channel。
因此,待發送的元素地址實際上是存儲在 sudog 結構體裏,也就是當前 goroutine 裏。
好了,看完源碼。咱們接着來分析例子,相信你們已經把例子忘得差很少了,我再貼一下代碼:
func goroutineA(a <-chan int) { val := <- a fmt.Println("goroutine A received data: ", val) return } func goroutineB(b <-chan int) { val := <- b fmt.Println("goroutine B received data: ", val) return } func main() { ch := make(chan int) go goroutineA(ch) go goroutineB(ch) ch <- 3 time.Sleep(time.Second) ch1 := make(chan struct{}) }
在發送小節裏咱們說到 G1 和 G2 如今被掛起來了,等待 sender 的解救。在第 17 行,主協程向 ch 發送了一個元素 3,來看下接下來會發生什麼。
根據前面源碼分析的結果,咱們知道,sender 發現 ch 的 recvq 裏有 receiver 在等待着接收,就會出隊一個 sudog,把 recvq 裏 first 指針的 sudo 「推舉」出來了,並將其加入到 P 的可運行 goroutine 隊列中。
而後,sender 把發送元素拷貝到 sudog 的 elem 地址處,最後會調用 goready 將 G1 喚醒,狀態變爲 runnable。
當調度器光顧 G1 時,將 G1 變成 running 狀態,執行 goroutineA 接下來的代碼。G 表示其餘可能有的 goroutine。
這裏其實涉及到一個協程寫另外一個協程棧的操做。有兩個 receiver 在 channel 的一邊虎視眈眈地等着,這時 channel 另外一邊來了一個 sender 準備向 channel 發送數據,爲了高效,用不着經過 channel 的 buf 「中轉」一次,直接從源地址把數據 copy 到目的地址就能夠了,效率高啊!
上圖是一個示意圖,3
會被拷貝到 G1 棧上的某個位置,也就是 val 的地址處,保存在 elem 字段。
關閉某個 channel,會執行函數 closechan
:
func closechan(c *hchan) { // 關閉一個 nil channel,panic if c == nil { panic(plainError("close of nil channel")) } // 上鎖 lock(&c.lock) // 若是 channel 已經關閉 if c.closed != 0 { unlock(&c.lock) // panic panic(plainError("close of closed channel")) } // ………… // 修改關閉狀態 c.closed = 1 var glist *g // 將 channel 全部等待接收隊列的裏 sudog 釋放 for { // 從接收隊列裏出隊一個 sudog sg := c.recvq.dequeue() // 出隊完畢,跳出循環 if sg == nil { break } // 若是 elem 不爲空,說明此 receiver 未忽略接收數據 // 給它賦一個相應類型的零值 if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } // 取出 goroutine gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, unsafe.Pointer(c)) } // 相連,造成鏈表 gp.schedlink.set(glist) glist = gp } // 將 channel 等待發送隊列裏的 sudog 釋放 // 若是存在,這些 goroutine 將會 panic for { // 從發送隊列裏出隊一個 sudog sg := c.sendq.dequeue() if sg == nil { break } // 發送者會 panic sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, unsafe.Pointer(c)) } // 造成鏈表 gp.schedlink.set(glist) glist = gp } // 解鎖 unlock(&c.lock) // Ready all Gs now that we've dropped the channel lock. // 遍歷鏈表 for glist != nil { // 取最後一個 gp := glist // 向前走一步,下一個喚醒的 g glist = glist.schedlink.ptr() gp.schedlink = 0 // 喚醒相應 goroutine goready(gp, 3) } }
close 邏輯比較簡單,對於一個 channel,recvq 和 sendq 中分別保存了阻塞的發送者和接收者。關閉 channel 後,對於等待接收者而言,會收到一個相應類型的零值。對於等待發送者,會直接 panic。因此,在不瞭解 channel 還有沒有接收者的狀況下,不能貿然關閉 channel。
close 函數先上一把大鎖,接着把全部掛在這個 channel 上的 sender 和 receiver 全都連成一個 sudog 鏈表,再解鎖。最後,再將全部的 sudog 全都喚醒。
喚醒以後,該幹嗎幹嗎。sender 會繼續執行 chansend 函數裏 goparkunlock 函數以後的代碼,很不幸,檢測到 channel 已經關閉了,panic。receiver 則比較幸運,進行一些掃尾工做後,返回。這裏,selected 返回 true,而返回值 received 則要根據 channel 是否關閉,返回不一樣的值。若是 channel 關閉,received 爲 false,不然爲 true。這咱們分析的這種狀況下,received 返回 false。
總結一下操做 channel 的結果:
操做 | nil channel | closed channel | not nil, not closed channel |
---|---|---|---|
close | panic | panic | 正常關閉 |
讀 <- ch | 阻塞 | 讀到對應類型的零值 | 阻塞或正常讀取數據。緩衝型 channel 爲空或非緩衝型 channel 沒有等待發送者時會阻塞 |
寫 ch <- | 阻塞 | panic | 阻塞或正常寫入數據。非緩衝型 channel 沒有等待接收者或緩衝型 channel buf 滿時會被阻塞 |
總結一下,發生 panic 的狀況有三種:向一個關閉的 channel 進行寫操做;關閉一個 nil 的 channel;重複關閉一個 channel。
讀、寫一個 nil channel 都會被阻塞。
Channel 發送和接收元素的本質是什麼?參考資料【深刻 channel 底層】裏是這樣回答的:
Remember all transfer of value on the go channels happens with the copy of value.
就是說 channel 的發送和接收操做本質上都是 「值的拷貝」,不管是從 sender goroutine 的棧到 chan buf,仍是從 chan buf 到 receiver goroutine,或者是直接從 sender goroutine 到 receiver goroutine。
這裏再引用文中的一個例子,我會加上更加詳細地解釋。順帶說一下,這是一篇英文的博客,寫得很好,沒有像咱們這篇文章那樣大段的源碼分析,它是將代碼裏狀況拆開來各自描述的,各有利弊吧。推薦去讀下原文,閱讀體驗比較好。
type user struct { name string age int8 } var u = user{name: "Ankur", age: 25} var g = &u func modifyUser(pu *user) { fmt.Println("modifyUser Received Vaule", pu) pu.name = "Anand" } func printUser(u <-chan *user) { time.Sleep(2 * time.Second) fmt.Println("printUser goRoutine called", <-u) } func main() { c := make(chan *user, 5) c <- g fmt.Println(g) // modify g g = &user{name: "Ankur Anand", age: 100} go printUser(c) go modifyUser(g) time.Sleep(5 * time.Second) fmt.Println(g) }
運行結果:
&{Ankur 25} modifyUser Received Value &{Ankur Anand 100} printUser goRoutine called &{Ankur 25} &{Anand 100}
這裏就是一個很好的 share memory by communicating
的例子。
一開始構造一個結構體 u,地址是 0x56420,圖中地址上方就是它的內容。接着把 &u
賦值給指針 g
,g 的地址是 0x565bb0,它的內容就是一個地址,指向 u。
main 程序裏,先把 g 發送到 c,根據 copy value
的本質,進入到 chan buf 裏的就是 0x56420
,它是指針 g 的值(不是它指向的內容),因此打印從 channel 接收到的元素時,它就是 &{Ankur 25}
。所以,這裏並非將指針 g 「發送」 到了 channel 裏,只是拷貝它的值而已。
再強調一次:
Remember all transfer of value on the go channels happens with the copy of value.
Channel 可能會引起 goroutine 泄漏。
泄漏的緣由是 goroutine 操做 channel 後,處於發送或接收阻塞狀態,而 channel 處於滿或空的狀態,一直得不到改變。同時,垃圾回收器也不會回收此類資源,進而致使 gouroutine 會一直處於等待隊列中,不見天日。
雨痕老師的《Go 語言學習筆記》第 8 章通道的「資源泄露」一節舉了個例子,你們能夠本身去看。
維基百科上給的定義:
In computer science, the happened-before relation (denoted: ->) is a relation between the result of two events, such that if one event should happen before another event, the result must reflect that, even if those events are in reality executed out of order (usually to optimize program flow).
簡單來講就是若是事件 a 和事件 b 存在 happened-before 關係,即 a -> b,那麼 a,b 完成後的結果必定要體現這種關係。因爲現代編譯器、CPU 會作各類優化,包括編譯器重排、內存重排等等,在併發代碼裏,happened-before 限制就很是重要了。
根據晃嶽攀老師在 Gopher China 2019 上的併發編程分享,關於 channel 的發送(send)、發送完成(send finished)、接收(receive)、接收完成(receive finished)的 happened-before 關係以下:
send
必定 happened before
第 n 個 receive finished
,不管是緩衝型仍是非緩衝型的 channel。receive
必定 happened before
第 n+m 個 send finished
。receive
必定 happened before
第 n 個 send finished
。happened before
receiver 獲得通知。咱們來逐條解釋一下。
第一條,咱們從源碼的角度看也是對的,send 不必定是 happened before
receive,由於有時候是先 receive,而後 goroutine 被掛起,以後被 sender 喚醒,send happened after receive。但無論怎樣,要想完成接收,必定是要先有發送。
第二條,緩衝型的 channel,當第 n+m 個 send 發生後,有下面兩種狀況:
若第 n 個 receive 沒發生。這時,channel 被填滿了,send 就會被阻塞。那當第 n 個 receive 發生時,sender goroutine 會被喚醒,以後再繼續發送過程。這樣,第 n 個 receive
必定 happened before
第 n+m 個 send finished
。
若第 n 個 receive 已經發生過了,這直接就符合了要求。
第三條,也是比較好理解的。第 n 個 send 若是被阻塞,sender goroutine 掛起,第 n 個 receive 這時到來,先於第 n 個 send finished。若是第 n 個 send 未被阻塞,說明第 n 個 receive 早就在那等着了,它不只 happened before send finished,它還 happened before send。
第四條,回憶一下源碼,先設置完 closed = 1,再喚醒等待的 receiver,並將零值拷貝給 receiver。
參考資料【鳥窩 併發編程分享】這篇博文的評論區有 PPT 的下載連接,這是晁老師在 Gopher 2019 大會上的演講。
關於 happened before,這裏再介紹一個柴大和曹大的新書《Go 語言高級編程》裏面提到的一個例子。
書中 1.5 節先講了順序一致性的內存模型,這是併發編程的基礎。
咱們直接來看例子:
var done = make(chan bool) var msg string func aGoroutine() { msg = "hello, world" done <- true } func main() { go aGoroutine() <-done println(msg) }
先定義了一個 done channel 和一個待打印的字符串。在 main 函數裏,啓動一個 goroutine,等待從 done 裏接收到一個值後,執行打印 msg 的操做。若是 main 函數中沒有 <-done
這行代碼,打印出來的 msg 爲空,由於 aGoroutine 來不及被調度,還來不及給 msg 賦值,主程序就會退出。而在 Go 語言裏,主協程退出時不會等待其餘協程。
加了 <-done
這行代碼後,就會阻塞在此。等 aGoroutine 裏向 done 發送了一個值以後,纔會被喚醒,繼續執行打印 msg 的操做。而這在以前,msg 已經被賦值過了,因此會打印出 hello, world
。
這裏依賴的 happened before 就是前面講的第一條。第一個 send 必定 happened before 第一個 receive finished,即 done <- true
先於 <-done
發生,這意味着 main 函數裏執行完 <-done
後接着執行 println(msg)
這一行代碼時,msg 已經被賦過值了,因此會打印出想要的結果。
書中,又進一步利用前面提到的第 3 條 happened before 規則,修改了一下代碼:
var done = make(chan bool) var msg string func aGoroutine() { msg = "hello, world" <-done } func main() { go aGoroutine() done <- true println(msg) }
一樣能夠獲得相同的結果,爲何?根據第三條規則,對於非緩衝型的 channel,第一個 receive 必定 happened before 第一個 send finished。也就是說,
在 done <- true
完成以前,<-done
就已經發生了,也就意味着 msg 已經被賦上值了,最終也會打印出 hello, world
。
這部份內容主要來自 Go 101 上的一篇英文文章,參考資料【如何優雅地關閉 channel】能夠直達原文。
文章先「吐槽」了下 Go channel 在設計上的一些問題,接着給出了幾種不一樣狀況下如何優雅地關閉 channel 的例子。按照慣例,我會在原做者內容的基礎上給出本身的解讀,看完這一節你能夠再回頭看一下英文原文,會以爲頗有意思。
關於 channel 的使用,有幾點不方便的地方:
文中還真的就給出了一個檢查 channel 是否關閉的函數:
func IsClosed(ch <-chan T) bool { select { case <-ch: return true default: } return false } func main() { c := make(chan T) fmt.Println(IsClosed(c)) // false close(c) fmt.Println(IsClosed(c)) // true }
看一下代碼,其實存在不少問題。首先,IsClosed 函數是一個有反作用的函數。每調用一次,都會讀出 channel 裏的一個元素,改變了 channel 的狀態。這不是一個好的函數,幹活就幹活,還順手牽羊!
其次,IsClosed 函數返回的結果僅表明調用那個瞬間,並不能保證調用以後會不會有其餘 goroutine 對它進行了一些操做,改變了它的這種狀態。例如,IsClosed 函數返回 true,但這時有另外一個 goroutine 關閉了 channel,而你還拿着這個過期的 「channel 未關閉」的信息,向其發送數據,就會致使 panic 的發生。固然,一個 channel 不會被重複關閉兩次,若是 IsClosed 函數返回的結果是 true,說明 channel 是真的關閉了。
有一條普遍流傳的關閉 channel 的原則:
don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders.
不要從一個 receiver 側關閉 channel,也不要在有多個 sender 時,關閉 channel。
比較好理解,向 channel 發送元素的就是 sender,所以 sender 能夠決定什麼時候不發送數據,而且關閉 channel。可是若是有多個 sender,某個 sender 一樣無法肯定其餘 sender 的狀況,這時也不能貿然關閉 channel。
可是上面所說的並非最本質的,最本質的原則就只有一條:
don't close (or send values to) closed channels.
有兩個不那麼優雅地關閉 channel 的方法:
使用 defer-recover 機制,放心大膽地關閉 channel 或者向 channel 發送數據。即便發生了 panic,有 defer-recover 在兜底。
使用 sync.Once 來保證只關閉一次。
代碼我就不貼上來了,直接去看原文。
這一節的重頭戲來了,那應該如何優雅地關閉 channel?
根據 sender 和 receiver 的個數,分下面幾種狀況:
對於 1,2,只有一個 sender 的狀況就不用說了,直接從 sender 端關閉就行了,沒有問題。重點關注第 3,4 種狀況。
第 3 種情形下,優雅關閉 channel 的方法是:the only receiver says "please stop sending more" by closing an additional signal channel。
解決方案就是增長一個傳遞關閉信號的 channel,receiver 經過信號 channel 下達關閉數據 channel 指令。senders 監聽到關閉信號後,中止發送數據。我把代碼修改地更簡潔了:
func main() { rand.Seed(time.Now().UnixNano()) const Max = 100000 const NumSenders = 1000 dataCh := make(chan int, 100) stopCh := make(chan struct{}) // senders for i := 0; i < NumSenders; i++ { go func() { for { select { case <- stopCh: return case dataCh <- rand.Intn(Max): } } }() } // the receiver go func() { for value := range dataCh { if value == Max-1 { fmt.Println("send stop signal to senders.") close(stopCh) return } fmt.Println(value) } }() select { case <- time.After(time.Hour): } }
這裏的 stopCh 就是信號 channel,它自己只有一個 sender,所以能夠直接關閉它。senders 收到了關閉信號後,select 分支 「case <- stopCh」 被選中,退出函數,再也不發送數據。
須要說明的是,上面的代碼並無明確關閉 dataCh。在 Go 語言中,對於一個 channel,若是最終沒有任何 goroutine 引用它,無論 channel 有沒有被關閉,最終都會被 gc 回收。因此,在這種情形下,所謂的優雅地關閉 channel 就是不關閉 channel,讓 gc 代勞。
最後一種狀況,優雅關閉 channel 的方法是:any one of them says "let's end the game" by notifying a moderator to close an additional signal channel。
和第 3 種狀況不一樣,這裏有 M 個 receiver,若是直接仍是採起第 3 種解決方案,由 receiver 直接關閉 stopCh 的話,就會重複關閉一個 channel,致使 panic。所以須要增長一箇中間人,M 個 receiver 都向它發送關閉 dataCh 的「請求」,中間人收到第一個請求後,就會直接下達關閉 dataCh 的指令(經過關閉 stopCh,這時就不會發生重複關閉的狀況,由於 stopCh 的發送方只有中間人一個)。另外,這裏的 N 個 sender 也能夠向中間人發送關閉 dataCh 的請求。
func main() { rand.Seed(time.Now().UnixNano()) const Max = 100000 const NumReceivers = 10 const NumSenders = 1000 dataCh := make(chan int, 100) stopCh := make(chan struct{}) // It must be a buffered channel. toStop := make(chan string, 1) var stoppedBy string // moderator go func() { stoppedBy = <-toStop close(stopCh) }() // senders for i := 0; i < NumSenders; i++ { go func(id string) { for { value := rand.Intn(Max) if value == 0 { select { case toStop <- "sender#" + id: default: } return } select { case <- stopCh: return case dataCh <- value: } } }(strconv.Itoa(i)) } // receivers for i := 0; i < NumReceivers; i++ { go func(id string) { for { select { case <- stopCh: return case value := <-dataCh: if value == Max-1 { select { case toStop <- "receiver#" + id: default: } return } fmt.Println(value) } } }(strconv.Itoa(i)) } select { case <- time.After(time.Hour): } }
代碼裏 toStop 就是中間人的角色,使用它來接收 senders 和 receivers 發送過來的關閉 dataCh 請求。
這裏將 toStop 聲明成了一個 緩衝型的 channel。假設 toStop 聲明的是一個非緩衝型的 channel,那麼第一個發送的關閉 dataCh 請求可能會丟失。由於不管是 sender 仍是 receiver 都是經過 select 語句來發送請求,若是中間人所在的 goroutine 沒有準備好,那 select 語句就不會選中,直接走 default 選項,什麼也不作。這樣,第一個關閉 dataCh 的請求就會丟失。
若是,咱們把 toStop 的容量聲明成 Num(senders) + Num(receivers),那發送 dataCh 請求的部分能夠改爲更簡潔的形式:
... toStop := make(chan string, NumReceivers + NumSenders) ... value := rand.Intn(Max) if value == 0 { toStop <- "sender#" + id return } ... if value == Max-1 { toStop <- "receiver#" + id return } ...
直接向 toStop 發送請求,由於 toStop 容量足夠大,因此不用擔憂阻塞,天然也就不用 select 語句再加一個 default case 來避免阻塞。
能夠看到,這裏一樣沒有真正關閉 dataCh,原樣同第 3 種狀況。
以上,就是最基本的一些情形,但已經能覆蓋幾乎全部的狀況及其變種了。只要記住:
don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders.
以及更本質的原則:
don't close (or send values to) closed channels.
從一個有緩衝的 channel 裏讀數據,當 channel 被關閉,依然能讀出有效值。只有當返回的 ok 爲 false 時,讀出的數據纔是無效的。
func main() { ch := make(chan int, 5) ch <- 18 close(ch) x, ok := <-ch if ok { fmt.Println("received: ", x) } x, ok = <-ch if !ok { fmt.Println("channel closed, data invalid.") } }
運行結果:
received: 18 channel closed, data invalid.
先建立了一個有緩衝的 channel,向其發送一個元素,而後關閉此 channel。以後兩次嘗試從 channel 中讀取數據,第一次仍然能正常讀出值。第二次返回的 ok 爲 false,說明 channel 已關閉,且通道里沒有數據。
Channel 和 goroutine 的結合是 Go 併發編程的大殺器。而 Channel 的實際應用也常常讓人眼前一亮,經過與 select,cancel,timer 等結合,它能實現各類各樣的功能。接下來,咱們就要梳理一下 channel 的應用。
前面一節如何優雅關閉 channel 那一節已經講得不少了,這塊就略過了。
channel 用於中止信號的場景仍是挺多的,常常是關閉某個 channel 或者向 channel 發送一個元素,使得接收 channel 的那一方獲知道此信息,進而作一些其餘的操做。
與 timer 結合,通常有兩種玩法:實現超時控制,實現按期執行某個任務。
有時候,須要執行某項操做,但又不想它耗費太長時間,上一個定時器就能夠搞定:
select { case <-time.After(100 * time.Millisecond): case <-s.stopc: return false }
等待 100 ms 後,若是 s.stopc 尚未讀出數據或者被關閉,就直接結束。這是來自 etcd 源碼裏的一個例子,這樣的寫法隨處可見。
定時執行某個任務,也比較簡單:
func worker() { ticker := time.Tick(1 * time.Second) for { select { case <- ticker: // 執行定時任務 fmt.Println("執行 1s 定時任務") } } }
每隔 1 秒種,執行一次定時任務。
服務啓動時,啓動 n 個 worker,做爲工做協程池,這些協程工做在一個 for {}
無限循環裏,從某個 channel 消費工做任務並執行:
func main() { taskCh := make(chan int, 100) go worker(taskCh) // 塞任務 for i := 0; i < 10; i++ { taskCh <- i } // 等待 1 小時 select { case <-time.After(time.Hour): } } func worker(taskCh <-chan int) { const N = 5 // 啓動 5 個工做協程 for i := 0; i < N; i++ { go func(id int) { for { task := <- taskCh fmt.Printf("finish task: %d by worker %d\n", task, id) time.Sleep(time.Second) } }(i) } }
5 個工做協程在不斷地從工做隊列裏取任務,生產方只管往 channel 發送任務便可,解耦生產方和消費方。
程序輸出:
finish task: 1 by worker 4 finish task: 2 by worker 2 finish task: 4 by worker 3 finish task: 3 by worker 1 finish task: 0 by worker 0 finish task: 6 by worker 0 finish task: 8 by worker 3 finish task: 9 by worker 1 finish task: 7 by worker 4 finish task: 5 by worker 2
有時須要定時執行幾百個任務,例如天天定時按城市來執行一些離線計算的任務。可是併發數又不能過高,由於任務執行過程依賴第三方的一些資源,對請求的速率有限制。這時就能夠經過 channel 來控制併發數。
下面的例子來自《Go 語言高級編程》:
var limit = make(chan int, 3) func main() { // ………… for _, w := range work { go func() { limit <- 1 w() <-limit }() } // ………… }
構建一個緩衝型的 channel,容量爲 3。接着遍歷任務列表,每一個任務啓動一個 goroutine 去完成。真正執行任務,訪問第三方的動做在 w() 中完成,在執行 w() 以前,先要從 limit 中拿「許可證」,拿到許可證以後,才能執行 w(),而且在執行完任務,要將「許可證」歸還。這樣就能夠控制同時運行的 goroutine 數。
這裏,limit <- 1
放在 func 內部而不是外部,書籍做者柴大在讀者羣裏的解釋是:
若是在外層,就是控制系統 goroutine 的數量,可能會阻塞 for 循環,影響業務邏輯。
limit 其實和邏輯無關,只是性能調優,放在內層和外層的語義不太同樣。
還有一點要注意的是,若是 w() 發生 panic,那「許可證」可能就還不回去了,所以須要使用 defer 來保證。
終於寫完了,你也終於看完了,恭喜!
回顧一下,這篇文章先從併發和並行講起,又講到了 CSP,Go 語言用 channel 實現 CSP。接着講了什麼是 channel,爲何須要 channel,而後詳細分析了 channel 的實現原理,這也是全文最重要的部分。以後,又講了幾個進階的例子,最後,列舉了幾個 channel 應用的場景。
但願你們能借助本文去讀一下 Go 源碼,這部分源碼也不長,和 context 包同樣,短小精悍,值得一讀。
我在參考資料裏列舉了不少文章、書籍,不少都值得去細看,我在文中也有說起。
當你理解這 channel 的底層原理後,再去看這些英文文章,會以爲頗有意思。之前對他有一種「畏難」心理,理解了以後再讀,就會以爲頗有意思,由於你確實都能看懂。
最後,閱讀愉快!
【Concurrency In Go】https://github.com/arpitjindal97/technology_books/blob/master/Golang/Concurrency-in-Go:Tools-and-Techniques-for-Developers.pdf
【Go 語言高級編程開源書】https://chai2010.cn/advanced-go-programming-book/ch1-basic/ch1-05-mem.html
【簡潔清晰明瞭】http://litang.me/post/golang-channel/
【柴大 && 曹大 《Go語言高級編程》】https://chai2010.cn/advanced-go-programming-book/ch1-basic/ch1-05-mem.html
【Go 併發編程實戰】https://book.douban.com/subject/26244729/
【曹大 golang notes】https://github.com/cch123/golang-notes/blob/master/channel.md
【互聯網技術窩 圖解 channel 實現 動畫】https://mp.weixin.qq.com/s/40uxAPdubIk0lU321LmfRg
【一塊兒學 Golang,推薦的資料很是有用】http://www.javashuo.com/article/p-rxijdptq-ex.html
【如何優雅地關閉 channel】https://go101.org/article/channel-closing.html
【深刻 channel 底層】https://codeburst.io/diving-deep-into-the-golang-channels-549fd4ed21a8
【Kavya在Gopher Con 上關於 channel 的設計,很是好】https://speakerd.s3.amazonaws.com/presentations/10ac0b1d76a6463aa98ad6a9dec917a7/GopherCon_v10.0.pdf
【channel 應用】https://www.s0nnet.com/archives/go-channels-practice
【應用舉例】https://zhuyasen.com/post/go_queue.html
【應用】https://tonybai.com/2014/09/29/a-channel-compendium-for-golang/
【鳥窩 併發編程分享】https://colobu.com/2019/04/28/gopher-2019-concurrent-in-action/
【Go-Questions,碼農桃花源項目】https://github.com/qcrao/Go-Questions
【GitBook 碼農桃花源開源書】https://qcrao91.gitbook.io/go/