併發(並行),一直以來都是一個編程語言裏的核心主題之一,也是被開發者關注最多的話題;Go語言做爲一個出道以來就自帶 『高併發』光環的富二代編程語言,它的併發(並行)編程確定是值得開發者去探究的,而Go語言中的併發(並行)編程是經由goroutine實現的,goroutine是golang最重要的特性之一,具備使用成本低、消耗資源低、能效高等特色,官方宣稱原生goroutine併發成千上萬不成問題,因而它也成爲Gopher們常用的特性。php
Goroutine是優秀的,但不是完美的,在極大規模的高併發場景下,也可能會暴露出問題,什麼問題呢?又有什麼可選的解決方案?本文將經過runtime對goroutine的調度分析,幫助你們理解它的機理和發現一些內存和調度的原理和問題,而且基於此提出一種我的的解決方案 — 一個高性能的Goroutine Pool(協程池)。css
Goroutine & Scheduler
Goroutine,Go語言基於併發(並行)編程給出的自家的解決方案。goroutine是什麼?一般goroutine會被當作coroutine(協程)的 golang實現,從比較粗淺的層面來看,這種認知也算是合理,但實際上,goroutine並不是傳統意義上的協程,如今主流的線程模型分三種:內核級線程模型、用戶級線程模型和兩級線程模型(也稱混合型線程模型),傳統的協程庫屬於用戶級線程模型,而goroutine和它的Go Scheduler
在底層實現上實際上是屬於兩級線程模型,所以,有時候爲了方便理解能夠簡單把goroutine類比成協程,但內心必定要有個清晰的認知 — goroutine並不等同於協程。java
線程那些事兒
互聯網時代以降,因爲在線用戶數量的爆炸,單臺服務器處理的鏈接也水漲船高,迫使編程模式由從前的串行模式升級到併發模型,而幾十年來,併發模型也是一代代地升級,有IO多路複用、多進程以及多線程,這幾種模型都各有長短,現代複雜的高併發架構大可能是幾種模型協同使用,不一樣場景應用不一樣模型,揚長避短,發揮服務器的最大性能,而多線程,由於其輕量和易用,成爲併發編程中使用頻率最高的併發模型,然後衍生的協程等其餘子產品,也都基於它,而咱們今天要分析的 goroutine 也是基於線程,所以,咱們先來聊聊線程的三大模型:python
線程的實現模型主要有3種:內核級線程模型、用戶級線程模型和兩級線程模型(也稱混合型線程模型),它們之間最大的差別就在於用戶線程與內核調度實體(KSE,Kernel Scheduling Entity)之間的對應關係上。而所謂的內核調度實體 KSE 就是指能夠被操做系統內核調度器調度的對象實體(這說的啥玩意兒,敢不敢通俗易懂一點?)。簡單來講 KSE 就是內核級線程,是操做系統內核的最小調度單元,也就是咱們寫代碼的時候通俗理解上的線程了(這麼說不就懂了嘛!裝什麼13)。git
用戶級線程模型
用戶線程與內核線程KSE是多對一(N : 1)的映射模型,多個用戶線程的通常從屬於單個進程而且多線程的調度是由用戶本身的線程庫來完成,線程的建立、銷燬以及多線程之間的協調等操做都是由用戶本身的線程庫來負責而無須藉助系統調用來實現。一個進程中全部建立的線程都只和同一個KSE在運行時動態綁定,也就是說,操做系統只知道用戶進程而對其中的線程是無感知的,內核的全部調度都是基於用戶進程。許多語言實現的 協程庫 基本上都屬於這種方式(好比python的gevent)。因爲線程調度是在用戶層面完成的,也就是相較於內核調度不須要讓CPU在用戶態和內核態之間切換,這種實現方式相比內核級線程能夠作的很輕量級,對系統資源的消耗會小不少,所以能夠建立的線程數量與上下文切換所花費的代價也會小得多。但該模型有個原罪:並不能作到真正意義上的併發,假設在某個用戶進程上的某個用戶線程由於一個阻塞調用(好比I/O阻塞)而被CPU給中斷(搶佔式調度)了,那麼該進程內的全部線程都被阻塞(由於單個用戶進程內的線程自調度是沒有CPU時鐘中斷的,從而沒有輪轉調度),整個進程被掛起。即使是多CPU的機器,也無濟於事,由於在用戶級線程模型下,一個CPU關聯運行的是整個用戶進程,進程內的子線程綁定到CPU執行是由用戶進程調度的,內部線程對CPU是不可見的,此時能夠理解爲CPU的調度單位是用戶進程。因此不少的協程庫會把本身一些阻塞的操做從新封裝爲徹底的非阻塞形式,而後在之前要阻塞的點上,主動讓出本身,並經過某種方式通知或喚醒其餘待執行的用戶線程在該KSE上運行,從而避免了內核調度器因爲KSE阻塞而作上下文切換,這樣整個進程也不會被阻塞了。github
內核級線程模型
用戶線程與內核線程KSE是一對一(1 : 1)的映射模型,也就是每個用戶線程綁定一個實際的內核線程,而線程的調度則徹底交付給操做系統內核去作,應用程序對線程的建立、終止以及同步都基於內核提供的系統調用來完成,大部分編程語言的線程庫(好比Java的java.lang.Thread、C++11的std::thread等等)都是對操做系統的線程(內核級線程)的一層封裝,建立出來的每一個線程與一個獨立的KSE靜態綁定,所以其調度徹底由操做系統內核調度器去作。這種模型的優點和劣勢一樣明顯:優點是實現簡單,直接藉助操做系統內核的線程以及調度器,因此CPU能夠快速切換調度線程,因而多個線程能夠同時運行,所以相較於用戶級線程模型它真正作到了並行處理;但它的劣勢是,因爲直接藉助了操做系統內核來建立、銷燬和以及多個線程之間的上下文切換和調度,所以資源成本大幅上漲,且對性能影響很大。golang
兩級線程模型
兩級線程模型是博採衆長以後的產物,充分吸取前兩種線程模型的優勢且儘可能規避它們的缺點。在此模型下,用戶線程與內核KSE是多對多(N : M)的映射模型:首先,區別於用戶級線程模型,兩級線程模型中的一個進程能夠與多個內核線程KSE關聯,因而進程內的多個線程能夠綁定不一樣的KSE,這點和內核級線程模型類似;其次,又區別於內核級線程模型,它的進程裏的全部線程並不與KSE一一綁定,而是能夠動態綁定同一個KSE, 當某個KSE由於其綁定的線程的阻塞操做被內核調度出CPU時,其關聯的進程中其他用戶線程能夠從新與其餘KSE綁定運行。因此,兩級線程模型既不是用戶級線程模型那種徹底靠本身調度的也不是內核級線程模型徹底靠操做系統調度的,而是中間態(自身調度與系統調度協同工做),也就是 — 『薛定諤的模型』(誤),由於這種模型的高度複雜性,操做系統內核開發者通常不會使用,因此更多時候是做爲第三方庫的形式出現,而Go語言中的runtime調度器就是採用的這種實現方案,實現了Goroutine與KSE之間的動態關聯,不過Go語言的實現更加高級和優雅;該模型爲什麼被稱爲兩級?即用戶調度器實現用戶線程到KSE的『調度』,內核調度器實現KSE到CPU上的『調度』。web
G-P-M 模型概述
每個OS線程都有一個固定大小的內存塊(通常會是2MB)來作棧,這個棧會用來存儲當前正在被調用或掛起(指在調用其它函數時)的函數的內部變量。這個固定大小的棧同時很大又很小。由於2MB的棧對於一個小小的goroutine來講是很大的內存浪費,而對於一些複雜的任務(如深度嵌套的遞歸)來講又顯得過小。所以,Go語言作了它本身的『線程』。算法
在Go語言中,每個goroutine是一個獨立的執行單元,相較於每一個OS線程固定分配2M內存的模式,goroutine的棧採起了動態擴容方式, 初始時僅爲2KB,隨着任務執行按需增加,最大可達1GB(64位機器最大是1G,32位機器最大是256M),且徹底由golang本身的調度器 Go Scheduler 來調度。此外,GC還會週期性地將再也不使用的內存回收,收縮棧空間。 所以,Go程序能夠同時併發成千上萬個goroutine是得益於它強勁的調度器和高效的內存模型。Go的創造者大概對goroutine的定位就是屠龍刀,由於他們不只讓goroutine做爲golang併發編程的最核心組件(開發者的程序都是基於goroutine運行的)並且golang中的許多標準庫的實現也處處能見到goroutine的身影,好比net/http這個包,甚至語言自己的組件runtime運行時和GC垃圾回收器都是運行在goroutine上的,做者對goroutine的厚望可見一斑。編程
任何用戶線程最終確定都是要交由OS線程來執行的,goroutine(稱爲G)也不例外,可是G並不直接綁定OS線程運行,而是由Goroutine Scheduler中的 P - Logical Processor (邏輯處理器)來做爲二者的『中介』,P能夠看做是一個抽象的資源或者一個上下文,一個P綁定一個OS線程,在golang的實現裏把OS線程抽象成一個數據結構:M,G其實是由M經過P來進行調度運行的,可是在G的層面來看,P提供了G運行所需的一切資源和環境,所以在G看來P就是運行它的 「CPU」,由 G、P、M 這三種由Go抽象出來的實現,最終造成了Go調度器的基本結構:
- G: 表示Goroutine,每一個Goroutine對應一個G結構體,G存儲Goroutine的運行堆棧、狀態以及任務函數,可重用。G並不是執行體,每一個G須要綁定到P才能被調度執行。
- P: Processor,表示邏輯處理器, 對G來講,P至關於CPU核,G只有綁定到P(在P的local runq中)才能被調度。對M來講,P提供了相關的執行環境(Context),如內存分配狀態(mcache),任務隊列(G)等,P的數量決定了系統內最大可並行的G的數量(前提:物理CPU核數 >= P的數量),P的數量由用戶設置的GOMAXPROCS決定,可是不論GOMAXPROCS設置爲多大,P的數量最大爲256。
- M: Machine,OS線程抽象,表明着真正執行計算的資源,在綁定有效的P後,進入schedule循環;而schedule循環的機制大體是從Global隊列、P的Local隊列以及wait隊列中獲取G,切換到G的執行棧上並執行G的函數,調用goexit作清理工做並回到M,如此反覆。M並不保留G狀態,這是G能夠跨M調度的基礎,M的數量是不定的,由Go Runtime調整,爲了防止建立過多OS線程致使系統調度不過來,目前默認最大限制爲10000個。
關於P,咱們須要再絮叨幾句,在Go 1.0發佈的時候,它的調度器其實G-M模型,也就是沒有P的,調度過程全由G和M完成,這個模型暴露出一些問題:
- 單一全局互斥鎖(Sched.Lock)和集中狀態存儲的存在致使全部goroutine相關操做,好比:建立、從新調度等都要上鎖;
- goroutine傳遞問題:M常常在M之間傳遞『可運行』的goroutine,這致使調度延遲增大以及額外的性能損耗;
- 每一個M作內存緩存,致使內存佔用太高,數據局部性較差;
- 因爲syscall調用而造成的劇烈的worker thread阻塞和解除阻塞,致使額外的性能損耗。
這些問題實在太扎眼了,致使Go1.0雖然號稱原生支持併發,卻在併發性能上一直飽受詬病,而後,Go語言委員會中一個核心開發大佬看不下了,親自下場從新設計和實現了Go調度器(在原有的G-M模型中引入了P)而且實現了一個叫作 work-stealing 的調度算法:
- 每一個P維護一個G的本地隊列;
- 當一個G被建立出來,或者變爲可執行狀態時,就把他放到P的可執行隊列中;
- 當一個G在M裏執行結束後,P會從隊列中把該G取出;若是此時P的隊列爲空,即沒有其餘G能夠執行, M就隨機選擇另一個P,從其可執行的G隊列中取走一半。
該算法避免了在goroutine調度時使用全局鎖。
至此,Go調度器的基本模型確立:
G-P-M 模型調度
Go調度器工做時會維護兩種用來保存G的任務隊列:一種是一個Global任務隊列,一種是每一個P維護的Local任務隊列。
當經過go
關鍵字建立一個新的goroutine的時候,它會優先被放入P的本地隊列。爲了運行goroutine,M須要持有(綁定)一個P,接着M會啓動一個OS線程,循環從P的本地隊列裏取出一個goroutine並執行。固然還有上文說起的 work-stealing
調度算法:當M執行完了當前P的Local隊列裏的全部G後,P也不會就這麼在那躺屍啥都不幹,它會先嚐試從Global隊列尋找G來執行,若是Global隊列爲空,它會隨機挑選另一個P,從它的隊列裏中拿走一半的G到本身的隊列中執行。
若是一切正常,調度器會以上述的那種方式順暢地運行,但這個世界沒這麼美好,總有意外發生,如下分析goroutine在兩種例外狀況下的行爲。
Go runtime會在下面的goroutine被阻塞的狀況下運行另一個goroutine:
- blocking syscall (for example opening a file)
- network input
- channel operations
- primitives in the sync package
這四種場景又可歸類爲兩種類型:
用戶態阻塞/喚醒
當goroutine由於channel操做或者network I/O而阻塞時(實際上golang已經用netpoller實現了goroutine網絡I/O阻塞不會致使M被阻塞,僅阻塞G,這裏僅僅是舉個栗子),對應的G會被放置到某個wait隊列(如channel的waitq),該G的狀態由_Gruning
變爲_Gwaitting
,而M會跳過該G嘗試獲取並執行下一個G,若是此時沒有runnable的G供M運行,那麼M將解綁P,並進入sleep狀態;當阻塞的G被另外一端的G2喚醒時(好比channel的可讀/寫通知),G被標記爲runnable,嘗試加入G2所在P的runnext,而後再是P的Local隊列和Global隊列。
系統調用阻塞
當G被阻塞在某個系統調用上時,此時G會阻塞在_Gsyscall
狀態,M也處於 block on syscall 狀態,此時的M可被搶佔調度:執行該G的M會與P解綁,而P則嘗試與其它idle的M綁定,繼續執行其它G。若是沒有其它idle的M,但P的Local隊列中仍然有G須要執行,則建立一個新的M;當系統調用完成後,G會從新嘗試獲取一個idle的P進入它的Local隊列恢復執行,若是沒有idle的P,G會被標記爲runnable加入到Global隊列。
以上就是從宏觀的角度對Goroutine和它的調度器進行的一些概要性的介紹,固然,Go的調度中更復雜的搶佔式調度、阻塞調度的更多細節,你們能夠自行去找相關資料深刻理解,本文只講到Go調度器的基本調度過程,爲後面本身實現一個Goroutine Pool提供理論基礎,這裏便再也不繼續深刻上述說的那幾個調度了,事實上若是要徹底講清楚Go調度器,一篇文章的篇幅也實在是捉襟見肘,因此想了解更多細節的同窗能夠去看看Go調度器 G-P-M 模型的設計者 Dmitry Vyukov 寫的該模型的設計文檔《Go Preemptive Scheduler Design》以及直接去看源碼,G-P-M模型的定義放在src/runtime/runtime2.go
裏面,而調度過程則放在了src/runtime/proc.go
裏。
大規模Goroutine的瓶頸
既然Go調度器已經這麼~~牛逼~~優秀了,咱們爲何還要本身去實現一個golang的 Goroutine Pool 呢?事實上,優秀不表明完美,任何不考慮具體應用場景的編程模式都是耍流氓!有基於G-P-M的Go調度器背書,go程序的併發編程中,能夠任性地起大規模的goroutine來執行任務,官方也宣稱用golang寫併發程序的時候隨便起個成千上萬的goroutine毫無壓力。
然而,你起1000個goroutine沒有問題,10000也沒有問題,10w個可能也沒問題;那,100w個呢?1000w個呢?(這裏只是舉個極端的例子,實際編程起這麼大規模的goroutine的例子極少)這裏就會出問題,什麼問題呢?
- 首先,即使每一個goroutine只分配2KB的內存,但若是是恐怖如斯的數量,聚少成多,內存暴漲,就會對GC形成極大的負擔,寫過java的同窗應該知道jvm GC那萬惡的STW(Stop The World)機制,也就是GC的時候會掛起用戶程序直到垃圾回收完,雖然Go1.8以後的GC已經去掉了STW以及優化成了並行GC,性能上有了不小的提高,可是,若是太過於頻繁地進行GC,依然會有性能瓶頸;
- 其次,還記得前面咱們說的runtime和GC也都是goroutine嗎?是的,若是goroutine規模太大,內存吃緊,runtime調度和垃圾回收一樣會出問題,雖然G-P-M模型足夠優秀,韓信點兵,多多益善,但你不能不給士兵發口糧(內存)吧?巧婦難爲無米之炊,沒有內存,Go調度器就會阻塞goroutine,結果就是P的Local隊列積壓,又致使內存溢出,這就是個死循環...,甚至極有可能程序直接Crash掉,原本是想享受golang併發帶來的~~快感~~效益,結果卻得不償失。
一個http標準庫引起的血案
我想,做爲golang擁躉的Gopher們必定都使用過它的net/http標準庫,不少人都說用golang寫web server徹底能夠不用藉助第三方的web framework,僅用net/http標準庫就能寫一個高性能的web server,的確,我也用過它寫過web server,簡潔高效,性能表現也至關不錯,除非有比較特殊的需求不然通常的確不用藉助第三方web framework,可是天下沒有白吃的午飯,net/http爲啥這麼快?要搞清這個問題,從源碼入手是最好的途徑。孔子曾經曰過:源碼面前,如同裸奔。因此,~~高清~~無碼是阻礙程序猿發展大大滴絆腳石啊,源碼纔是咱們進步階梯,切記切記!
接下來咱們就來先看看net/http內部是怎麼實現的。
net/http接收請求且開始處理的源碼放在src/net/http/server.go
裏,先從入口函數ListenAndServe
進去:
func (srv *Server) ListenAndServe() error { addr := srv.Addr if addr == "" { addr = ":http" } ln, err := net.Listen("tcp", addr) if err != nil { return err } return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)}) }
看到最後那個srv.Serve調用了嗎?沒錯,這個Serve
方法裏面就是實際處理http請求的邏輯,咱們再進入這個方法內部:
func (srv *Server) Serve(l net.Listener) error { defer l.Close() ... // 不斷循環取出TCP鏈接 for { // 看我看我!!! rw, e := l.Accept() ... // 再看我再看我!!! go c.serve(ctx) } }
首先,這個方法的參數(l net.Listener)
,是一個TCP監聽的封裝,負責監聽網絡端口,rw, e := l.Accept()
則是一個阻塞操做,從網絡端口取出一個新的TCP鏈接進行處理,最後go c.serve(ctx)
就是最後真正去處理這個http請求的邏輯了,看到前面的go關鍵字了嗎?沒錯,這裏啓動了一個新的goroutine去執行處理邏輯,並且這是在一個無限循環體裏面,因此意味着,每來一個請求它就會開一個goroutine去處理,至關任性粗暴啊…,不過有Go調度器背書,通常來講也沒啥壓力,然而,若是,我是說若是哈,忽然一大波請求涌進來了(比方說黑客搞了成千上萬的肉雞DDOS你,沒錯!就這麼倒黴!),這時候,就很成問題了,他來10w個請求你就要開給他10w個goroutine,來100w個你就要老老實實開給他100w個,線程調度壓力陡升,內存爆滿,再而後,你就跪了…
釜底抽薪
有問題,就必定有解決的辦法,那麼,有什麼方案能夠減緩大規模goroutine對系統的調度和內存壓力?要想解決問題,最重要的是找到形成問題的根源,這個問題根源是什麼?goroutine的數量過多致使資源侵佔,那要解決這個問題就要限制運行的goroutine數量,合理複用,節省資源,具體就是 — goroutine池化。
超大規模併發的場景下,不加限制的大規模的goroutine可能形成內存暴漲,給機器帶來極大的壓力,吞吐量降低和處理速度變慢仍是其次,更危險的是可能使得程序crash。因此,goroutine池化是有其現實意義的。
首先,100w個任務,是否是真的須要100w個goroutine來處理?未必!用1w個goroutine也同樣能夠處理,讓一個goroutine多處理幾個任務就是了嘛,池化的核心優點就在於對goroutine的複用。此舉首先極大減輕了runtime調度goroutine的壓力,其次,即是下降了對內存的消耗。
有一個商場,來了1000個顧客買東西,那麼該如何安排導購員服務這1000人呢?有兩種方案:
第一,我僱1000個導購員實行一對一服務,這種固然是最高效的,可是太浪費資源了,僱1000我的的成本極高且管理困難,這些能夠先按下不表,可是每一個顧客到商場買東西也不是一進來就立刻買,通常都得逛一逛,選一選,也就是得花時間挑,1000個導購員一對一盯着,效率極低;這就引出第二種方案:我只僱10個導購員,就在商場裏待命,有顧客須要諮詢的時候招呼導購員過去進行處理,導購員處理完以後就回來,等下一個顧客須要諮詢的時候再去,如此往返反覆...
第二種方案有沒有以爲很眼熟?沒錯,其基本思路就是模擬一個I/O多路複用,經過一種機制,能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做。關於多路複用,不在本文的討論範圍以內,便再也不贅述,詳細原理能夠參考 I/O多路複用。
第一種方案就是net/http標準庫採用的:來一個請求開一個goroutine處理;第二種方案就是Goroutine Pool(I/O多路複用)。
實現一個 Goroutine Pool
由於上述陳列的一些因爲goroutine規模過大而可能引起的問題,須要有方案來解決這些問題,上文已經分析過,把goroutine池化是一種行之有效的方案,基於此,能夠實現一個Goroutine Pool,複用goroutine,減輕runtime的調度壓力以及緩解內存壓力,依託這些優化,在大規模goroutine併發的場景下能夠極大地提升併發性能。
哎瑪!前面絮絮不休了這麼多,終於進入正題了,接下來就開始講解如何實現一個高性能的Goroutine Pool,秒殺原生併發的goroutine,在執行速度和佔用內存上提升併發程序的性能。好了,話很少說,開始~~裝逼~~分析。
設計思路
Goroutine Pool 的實現思路大體以下:
啓動服務之時先初始化一個 Goroutine Pool 池,這個Pool維護了一個相似棧的LIFO隊列 ,裏面存放負責處理任務的Worker,而後在client端提交task到Pool中以後,在Pool內部,接收task以後的核心操做是:
- 檢查當前Worker隊列中是否有空閒的Worker,若是有,取出執行當前的task;
- 沒有空閒Worker,判斷當前在運行的Worker是否已超過該Pool的容量,是 — 阻塞等待直至有Worker被放回Pool;否 — 新開一個Worker(goroutine)處理;
- 每一個Worker執行完任務以後,放回Pool的隊列中等待。
調度過程以下:
按照這個設計思路,我實現了一個高性能的Goroutine Pool,較好地解決了上述的大規模調度和資源佔用的問題,在執行速度和內存佔用方面相較於原生goroutine併發佔有明顯的優點,尤爲是內存佔用,由於複用,因此規避了無腦啓動大規模goroutine的弊端,能夠節省大量的內存。
完整的項目代碼能夠在個人github上獲取:傳送門,也歡迎提意見和交流。
實現細節
Goroutine Pool的設計原理前面已經講過了,整個調度過程相信你們應該能夠理解了,可是有一句老話說得好,空談誤國,實幹興邦,設計思路有了,具體實現的時候確定會有不少細節、難點,接下來咱們經過分析這個Goroutine Pool的幾個核心實現以及它們的聯動來引導你們過一遍Goroutine Pool的原理。
首先是Pool struct
:
type sig struct{} type f func() error // Pool accept the tasks from client,it limits the total // of goroutines to a given number by recycling goroutines. type Pool struct { // capacity of the pool. capacity int32 // running is the number of the currently running goroutines. running int32 // freeSignal is used to notice pool there are available // workers which can be sent to work. freeSignal chan sig // workers is a slice that store the available workers. workers []*Worker // release is used to notice the pool to closed itself. release chan sig // lock for synchronous operation lock sync.Mutex once sync.Once }
Pool
是一個通用的協程池,支持不一樣類型的任務,亦即每個任務綁定一個函數提交到池中,批量執行不一樣類型任務,是一種廣義的協程池;本項目中還實現了另外一種協程池 — 批量執行同類任務的協程池PoolWithFunc
,每個PoolWithFunc
只會綁定一個任務函數pf
,這種Pool適用於大批量相同任務的場景,由於每一個Pool只綁定一個任務函數,所以PoolWithFunc
相較於Pool
會更加節省內存,但通用性就不如前者了,爲了讓你們更好地理解協程池的原理,這裏咱們用通用的Pool
來分析。
capacity
是該Pool的容量,也就是開啓worker數量的上限,每個worker綁定一個goroutine;running
是當前正在執行任務的worker數量;freeSignal
是一個信號,由於Pool開啓的worker數量有上限,所以當所有worker都在執行任務的時候,新進來的請求就須要阻塞等待,那當執行完任務的worker被放回Pool之時,如何通知阻塞的請求綁定一個空閒的worker運行呢?freeSignal
就是來作這個事情的;workers
是一個slice,用來存放空閒worker,請求進入Pool以後會首先檢查workers
中是否有空閒worker,如有則取出綁定任務執行,不然判斷當前運行的worker是否已經達到容量上限,是—阻塞等待,否—新開一個worker執行任務;release
是當關閉該Pool支持通知全部worker退出運行以防goroutine泄露;lock
是一個鎖,用以支持Pool的同步操做;once
用在確保Pool關閉操做只會執行一次。
提交任務到Pool
p.Submit(task f)
以下:
// Submit submit a task to pool func (p *Pool) Submit(task f) error { if len(p.release) > 0 { return ErrPoolClosed } w := p.getWorker() w.sendTask(task) return nil }
第一個if判斷當前Pool是否已被關閉,如果則再也不接受新任務,不然獲取一個Pool中可用的worker,綁定該task
執行。
獲取可用worker(核心)
p.getWorker()
源碼:
// getWorker returns a available worker to run the tasks. func (p *Pool) getWorker() *Worker { var w *Worker // 標誌,表示當前運行的worker數量是否已達容量上限 waiting := false // 涉及從workers隊列取可用worker,須要加鎖 p.lock.Lock() workers := p.workers n := len(workers) - 1 // 當前worker隊列爲空(無空閒worker) if n < 0 { // 運行worker數目已達到該Pool的容量上限,置等待標誌 if p.running >= p.capacity { waiting = true // 不然,運行數目加1 } else { p.running++ } // 有空閒worker,從隊列尾部取出一個使用 } else { <-p.freeSignal w = workers[n] workers[n] = nil p.workers = workers[:n] } // 判斷是否有worker可用結束,解鎖 p.lock.Unlock() if waiting { // 阻塞等待直到有空閒worker <-p.freeSignal p.lock.Lock() workers = p.workers l := len(workers) - 1 w = workers[l] workers[l] = nil p.workers = workers[:l] p.lock.Unlock() // 當前無空閒worker可是Pool尚未滿, // 則能夠直接新開一個worker執行任務 } else if w == nil { w = &Worker{ pool: p, task: make(chan f), } w.run() } return w }
上面的源碼中加了較爲詳細的註釋,結合前面的設計思路,相信你們應該能理解獲取可用worker綁定任務執行這個協程池的核心操做,主要就是實現一個LIFO隊列用來存取可用worker達到資源複用的效果,這裏主要關注一個地方:達到Pool容量限制以後,額外的任務請求須要阻塞等待idle worker,這裏是爲了防止無節制地建立goroutine,事實上Go調度器有一個複用機制,每次使用go
關鍵字的時候它會檢查當前結構體M中的P中,是否有可用的結構體G。若是有,則直接從中取一個,不然,須要分配一個新的結構體G。若是分配了新的G,須要將它掛到runtime的相關隊列中,可是調度器卻沒有限制goroutine的數量,這在瞬時性goroutine爆發的場景下就可能來不及複用G而依然建立了大量的goroutine,因此ants
除了複用還作了限制goroutine數量。
其餘部分能夠依照註釋理解,這裏再也不贅述。
任務執行
// Worker is the actual executor who runs the tasks, // it starts a goroutine that accepts tasks and // performs function calls. type Worker struct { // pool who owns this worker. pool *Pool // task is a job should be done. task chan f } // run starts a goroutine to repeat the process // that performs the function calls. func (w *Worker) run() { //atomic.AddInt32(&w.pool.running, 1) go func() { //監放任務列表,一旦有任務立馬取出運行 for f := range w.task { if f == nil { atomic.AddInt32(&w.pool.running, -1) return } f() //回收複用 w.pool.putWorker(w) } }() } // stop this worker. func (w *Worker) stop() { w.sendTask(nil) } // sendTask sends a task to this worker. func (w *Worker) sendTask(task f) { w.task <- task }
Worker回收(goroutine複用)
// putWorker puts a worker back into free pool, recycling the goroutines. func (p *Pool) putWorker(worker *Worker) { p.lock.Lock() p.workers = append(p.workers, worker) p.lock.Unlock() p.freeSignal <- sig{} }
結合前面的p.Submit(task f)
和p.getWorker()
,提交任務到Pool以後,獲取一個可用worker,每新建一個worker實例之時都須要調用w.run()
啓動一個goroutine監聽worker的任務列表task
,一有任務提交進來就執行;因此,當調用worker的sendTask(task f)
方法提交任務到worker的任務隊列以後,立刻就能夠被接收並執行,當任務執行完以後,會調用w.pool.putWorker(w *Worker)
方法將這個已經執行完任務的worker從當前任務解綁放回Pool中,以供下個任務可使用,至此,一個任務從提交到完成的過程就此結束,Pool調度將進入下一個循環。
動態擴容或者縮小池容量
// ReSize change the capacity of this pool func (p *Pool) ReSize(size int) { if size < p.Cap() { diff := p.Cap() - size for i := 0; i < diff; i++ { p.getWorker().stop() } } else if size == p.Cap() { return } atomic.StoreInt32(&p.capacity, int32(size)) }
歸納起來,ants
Goroutine Pool的調度過程圖示以下:
彩蛋
還記得前面我說除了通用的Pool struct
以外,本項目還實現了一個PoolWithFunc struct
—一個執行批量同類任務的協程池,PoolWithFunc
相較於Pool
,由於一個池只綁定一個任務函數,省去了每一次task都須要傳送一個任務函數的代價,所以其性能優點比起Pool
更明顯,這裏咱們稍微講一下一個協程池只綁定一個任務函數的細節:
上碼!
type pf func(interface{}) error // PoolWithFunc accept the tasks from client,it limits the total // of goroutines to a given number by recycling goroutines. type PoolWithFunc struct { // capacity of the pool. capacity int32 // running is the number of the currently running goroutines. running int32 // freeSignal is used to notice pool there are available // workers which can be sent to work. freeSignal chan sig // workers is a slice that store the available workers. workers []*WorkerWithFunc // release is used to notice the pool to closed itself. release chan sig // lock for synchronous operation lock sync.Mutex // pf is the function for processing tasks poolFunc pf once sync.Once }
PoolWithFunc struct
中的大部分字段和Pool struct
基本一致,重點關注poolFunc pf
,這是一個函數類型,也就是該Pool綁定的指定任務函數,而client提交到這種類型的Pool的數據就再也不是一個任務函數task f
了,而是poolFunc pf
任務函數的形參,而後交由WorkerWithFunc
處理:
// WorkerWithFunc is the actual executor who runs the tasks, // it starts a goroutine that accepts tasks and // performs function calls. type WorkerWithFunc struct { // pool who owns this worker. pool *PoolWithFunc // args is a job should be done. args chan interface{} } // run starts a goroutine to repeat the process // that performs the function calls. func (w *WorkerWithFunc) run() { go func() { for args := range w.args { if args == nil || len(w.pool.release) > 0 { atomic.AddInt32(&w.pool.running, -1) return } w.pool.poolFunc(args) w.pool.putWorker(w) } }() }
上面的源碼能夠看到WorkerWithFunc
是一個相似Worker
的結構,只不過監聽的是函數的參數隊列,每接收到一個參數包,就直接調用PoolWithFunc
綁定好的任務函數poolFunc pf
任務函數執行任務,接下來的流程就和Worker
是一致的了,執行完任務後就把worker放回協程池,等待下次使用。
至於其餘邏輯如提交task
、獲取Worker
綁定任務等基本複用自Pool struct
,具體細節有細微差異,但原理一致,萬變不離其宗,有興趣的同窗能夠看我在github上的源碼:Goroutine Pool協程池 ants 。
Benchmarks
吹了這麼久的Goroutine Pool,那都是虛的,理論上池化能夠複用goroutine,提高性能節省內存,沒有benchmark數據以前,好像也不能服衆哈!因此,本章就來進行一次實測,驗證一下再大規模goroutine併發的場景下,Goroutine Pool的表現是否是真的比原生Goroutine併發更好!
測試機器參數:
OS : macOS High Sierra Processor : 2.7 GHz Intel Core i5 Memory : 8 GB 1867 MHz DDR3
Pool測試
測試結果:這裏爲了模擬大規模goroutine的場景,兩次測試的併發次數分別是100w和1000w,前兩個測試分別是執行100w個併發任務不使用Pool和使用了
ants
的Goroutine Pool的性能,後兩個則是1000w個任務下的表現,能夠直觀的看出在執行速度和內存使用上,ants
的Pool都佔有明顯的優點。100w的任務量,使用ants
,執行速度與原生goroutine至關甚至略快,但只實際使用了不到5w個goroutine完成了所有任務,且內存消耗僅爲原生併發的40%;而當任務量達到1000w,優點則更加明顯了:用了70w左右的goroutine完成所有任務,執行速度比原生goroutine提升了100%,且內存消耗依舊保持在不使用Pool的40%左右。
PoolWithFunc測試
測試結果:
- Benchmarkxxx-4 格式爲
基準測試函數名-GOMAXPROCS
,後面的-4表明測試函數運行時對應的CPU核數 - 1 表示執行的次數
- xx ns/op 表示每次的執行時間
- xx B/op 表示每次執行分配的總字節數(內存消耗)
- xx allocs/op 表示每次執行發生了多少次內存分配
由於PoolWithFunc
這個Pool只綁定一個任務函數,也即全部任務都是運行同一個函數,因此相較於Pool
對原生goroutine在執行速度和內存消耗的優點更大,上面的結果能夠看出,執行速度能夠達到原生goroutine的300%,而內存消耗的優點已經達到了兩位數的差距,原生goroutine的內存消耗達到了ants
的35倍且原生goroutine的每次執行的內存分配次數也達到了ants
45倍,1000w的任務量,ants
的初始分配容量是5w,所以它完成了全部的任務依舊只使用了5w個goroutine!事實上,ants
的Goroutine Pool的容量是能夠自定義的,也就是說使用者能夠根據不一樣場景對這個參數進行調優直至達到最高性能。
吞吐量測試
上面的benchmarks出來之後,我當時的心裏是這樣的:
可是太順利反而讓我疑惑,由於結合我過去這20幾年的坎坷人生來看,事情應該不會這麼美好纔對,果不其然,細細一想,雖然ants
Groutine Pool能在大規模併發下執行速度和內存消耗都對原生goroutine佔有明顯優點,但前面的測試demo相信你們注意到了,裏面使用了WaitGroup,也就是用來對goroutine同步的工具,因此上面的benchmarks中主進程會等待全部子goroutine完成任務後纔算完成一次性能測試,然而又有多少場景是單臺機器須要扛100w甚至1000w同步任務的?基本沒有啊!結果就是造出了屠龍刀,但是世界上沒有龍啊!也是無情...
彼時,我心裏變成了這樣:幸虧,
ants
在同步批量任務方面有點曲高和寡,可是若是是異步批量任務的場景下,就有用武之地了,也就是說,在大批量的任務無須同步等待完成的狀況下,能夠再測一下ants
和原生goroutine併發的性能對比,這個時候的性能對比也便是吞吐量對比了,就是在相同大規模數量的請求涌進來的時候,ants
和原生goroutine誰能用更快的速度、更少的內存『吞』完這些請求。
測試結果:
10w 吞吐量
100w 吞吐量
1000W 吞吐量
由於在個人電腦上測試1000w吞吐量的時候原生goroutine已經到了極限,所以程序直接把電腦拖垮了,沒法正常測試了,因此1000w吞吐的測試數據只有ants
Pool的。
從該demo測試吞吐性能對比能夠看出,使用ants
的吞吐性能相較於原生goroutine能夠保持在2~6倍的性能壓制,而內存消耗則能夠達到10~20倍的節省優點。
總結
至此,一個高性能的 Goroutine Pool 開發就完成了,事實上,原理不難理解,總結起來就是一個『複用』,具體落實到代碼細節就是鎖同步、原子操做、channel通訊等這些技巧的使用,ant
這整個項目沒有藉助任何第三方的庫,用golang的標準庫就完成了全部功能,由於自己golang的語言原生庫已經足夠優秀,不少時候開發golang項目的時候是能夠保持輕量且高性能的,未必事事須要藉助第三方庫。
關於ants
的價值,其實前文也說起過了,ants
在大規模的異步&同步批量任務處理都有着明顯的性能優點(特別是異步批量任務),而單機上百萬上千萬的同步批量任務處理現實意義不大,可是在異步批量任務處理方面有很大的應用價值,因此我我的以爲,Goroutine Pool真正的價值仍是在:
- 限制併發的goroutine數量;
- 複用goroutine,減輕runtime調度壓力,提高程序性能;
- 規避過多的goroutine侵佔系統資源(CPU&內存)。
後記
Go語言的三位最初的締造者 — Rob Pike、Robert Griesemer 和 Ken Thompson 中,Robert Griesemer 參與設計了Java的HotSpot虛擬機和Chrome瀏覽器的JavaScript V8引擎,Rob Pike 在大名鼎鼎的bell lab侵淫多年,參與了Plan9操做系統、C編譯器以及多種語言編譯器的設計和實現,Ken Thompson 更是圖靈獎得主、Unix之父、C語言之父。這三人在計算機史上但是元老級別的人物,特別是 Ken Thompson ,是一手締造了Unix和C語言計算機領域的上古大神,因此Go語言的設計哲學有着深深的Unix烙印:簡單、模塊化、正交、組合、pipe、功能短小且聚焦等;而令許多開發者青睞於Go的簡潔、高效編程模式的緣由,也正在於此。
本文從三大線程模型到Go併發調度器再到自定製的 Goroutine Pool,算是較爲完整的窺探了整個Go語言併發模型的前世此生,咱們也能夠看到,Go的設計固然不完美,好比一直被詬病的error處理模式、不支持泛型、差強人意的包管理以及面向對象模式的過分抽象化等等,實際上沒有任何一門編程語言敢說本身是完美的,仍是那句話,任何不考慮應用場景和語言定位的爭執都毫無心義,而Go的定位從出道開始就是系統編程語言&雲計算編程語言(這個有點模糊),而Go的做者們也一直堅持的是用最簡單抽象的工程化設計完成最複雜的功能,因此若是從這個層面去看Go的併發模型,就能夠看出其實除了G-P-M模型中引入的 P ,並無太多革新的原創理論,兩級線程模型是早已成熟的理論,搶佔式調度更不是什麼新鮮的調度模式,Go的偉大之處是在於它誕生之初就是依照Go在谷歌:以軟件工程爲目的的語言設計而設計的,Go其實就是將這些經典的理論和技術以一種優雅高效的工程化方式組合了起來,並用簡單抽象的API或語法糖開放給使用者,Go一直致力於找尋一個高性能&開發效率的共贏點,目前爲止,它作得遠不夠完美,但足夠優秀。另外Go經過引入channel與goroutine協同工做,將一種區別於鎖&原子操做的併發編程模式 — CSP 帶入了Go語言,對開發人員在併發編程模式上的思考有很大的啓發。
從本文中對Go調度器的分析以及ants
Goroutine Pool 的設計與實現過程,對Go的併發模型作了一次解構和優化思考,在ants
中的代碼實現對鎖同步、原子操做、channel通訊的使用也作了一次較爲全面的實踐,但願對Gopher們在Go語言併發模型與併發編程的理解上能有所裨益。
感謝閱讀。