Go-簡潔的併發

多核處理器愈來愈普及。有沒有一種簡單的辦法,可以讓咱們寫的軟件釋放多核的威力?是有的。隨着Golang, Erlang, Scala等爲併發設計的程序語言的興起,新的併發模式逐漸清晰。正如過程式編程和麪向對象同樣,一個好的編程模式有一個極其簡潔的內核,還有在此之上豐 富的外延。能夠解決現實世界中各類各樣的問題。本文以GO語言爲例,解釋其中內核、外延。程序員

併發模式以內核

這種併發模式的內核只須要協程通道就夠了。協程負責執行代碼,通道負責在協程之間傳遞事件。sql

Go-簡潔的併發

不久前,併發編程是個很是困難的事。要想編寫一個良好的併發程序,咱們不得不瞭解線程,鎖,semaphore,barrier甚至CPU更新高速緩存的 方式,並且他們個個都有怪脾氣,到處是陷阱。筆者除非萬不得以,決不會本身操做這些底層併發元素。一個簡潔的併發模式不須要這些複雜的底層元素,協程和通 道就夠了。數據庫

協程是輕量級的線程。在過程式編程中,當調用一個過程的時候,須要等待其執行完才返回。而調用一個協程的時候,不須要等待其執行完,會當即返回。協程十分 輕量,Go語言能夠在一個進程中執行有數以十萬計的協程,依舊保持高性能。而對於普通的平臺,一個進程有數千個線程,其CPU會忙於上下文切換,性能急劇 降低。隨意建立線程可不是一個好主意,可是咱們能夠大量使用的協程。編程

通道是協程之間的數據傳輸通道。通道能夠在衆多的協程之間傳遞數據,具體能夠值也能夠是個引用。通道有兩種使用方式。數組

  • 協程能夠試圖向通道放入數據,若是通道滿了,會掛起協程,直到通道能夠爲他放入數據爲止。
  • 協程能夠試圖向通道索取數據,若是通道沒有數據,會掛起協程,直到通道返回數據爲止。

如此,通道就能夠在傳遞數據的同時,控制協程的運行。有點像事件驅動,也有點像阻塞隊列。緩存

這兩個概念很是的簡單,各個語言平臺都會有相應的實現。在Java和C上也各有庫能夠實現二者。安全

  Golang Erlang Scala(Actor)
協程 goroutines process actor
消息隊列 channel mailbox channel

只要有協程和通道,就能夠優雅的解決併發的問題。沒必要使用其餘和併發有關的概念。那如何用這兩把利刃解決各式各樣的實際問題呢?網絡

併發模式以外延

協程相較於線程,能夠大量建立。打開這扇門,咱們拓展出新的用法,能夠作生成器,可讓函數返回「服務」,可讓循環併發執行,還能共享變量。可是出現新 的用法的同時,也帶來了新的棘手問題,協程也會泄漏,不恰當的使用會影響性能。下面會逐一介紹各類用法和問題。演示的代碼用GO語言寫成,由於其簡潔明 了,並且支持所有功能。數據結構

生成器

有的時候,咱們須要有一個函數能不斷生成數據。比方說這個函數能夠讀文件,讀網絡,生成自增加序列,生成隨機數。這些行爲的特色就是,函數的已知一些變量,如文件路徑。而後不斷調用,返回新的數據。多線程

Go-簡潔的併發

下面生成隨機數爲例, 以讓咱們作一個會併發執行的隨機數生成器。

非併發的作法是這樣的:

// 函數 rand_generator_1 ,返回 int
func rand_generator_1() int {
	return rand.Int()
}

上面是一個函數,返回一個int。假如rand.Int()這個函數調用須要很長時間等待,那該函數的調用者也會所以而掛起。因此咱們能夠建立一個協程,專門執行rand.Int()。

// 函數 rand_generator_2,返回 通道(Channel)
func rand_generator_2() chan int {
	// 建立通道
	out := make(chan int)
	// 建立協程
	go func() {
		for {
			//向通道內寫入數據,若是無人讀取會等待
			out <- rand.Int()
		}
	}()
	return out
}

func main() {
	// 生成隨機數做爲一個服務
	rand_service_handler := rand_generator_2()
	// 從服務中讀取隨機數並打印
	fmt.Printf("%d
", <-rand_service_handler)
}

上面的這段函數就能夠併發執行了rand.Int()。有一點值得注意到函數的返回能夠理解爲一個「服務」。但咱們須要獲取隨機數據 時候,能夠隨時向這個服務取用,他已經爲咱們準備好了相應的數據,無需等待,隨要隨到。若是咱們調用這個服務不是很頻繁,一個協程足夠知足咱們的需求了。 但若是咱們須要大量訪問,怎麼辦?咱們能夠用下面介紹的多路複用技術,啓動若干生成器,再將其整合成一個大的服務。

調用生成器,能夠返回一個「服務」。能夠用在持續獲取數據的場合。用途很普遍,讀取數據,生成ID,甚至定時器。這是一種很是簡潔的思路,將程序併發化。

多路複用

多路複用是讓一次處理多個隊列的技術。Apache使用處理每一個鏈接都須要一個進程,因此其併發性能不是很好。而Nighx使用多路複用的技術,讓一個進 程處理多個鏈接,因此併發性能比較好。一樣,在協程的場合,多路複用也是須要的,但又有所不一樣。多路複用能夠將若干個類似的小服務整合成一個大服務。

Go-簡潔的併發

那麼讓咱們用多路複用技術作一個更高併發的隨機數生成器吧。

// 函數 rand_generator_3 ,返回通道(Channel)
func rand_generator_3() chan int {
	// 建立兩個隨機數生成器服務
	rand_generator_1 := rand_generator_2()
	rand_generator_2 := rand_generator_2()

	//建立通道
	out := make(chan int)

	//建立協程
	go func() {
		for {
			//讀取生成器1中的數據,整合
			out <- <-rand_generator_1
		}
	}()
	go func() {
		for {
			//讀取生成器2中的數據,整合
			out <- <-rand_generator_2
		}
	}()
	return out
}

上面是使用了多路複用技術的高併發版的隨機數生成器。經過整合兩個隨機數生成器,這個版本的能力是剛纔的兩倍。雖然協程能夠大量建立,可是衆多協程仍是會 爭搶輸出的通道。Go語言提供了Select關鍵字來解決,各家也有各家竅門。加大輸出通道的緩衝大小是個通用的解決方法。

多路複用技術能夠用來整合多個通道。提高性能和操做的便捷。配合其餘的模式使用有很大的威力。

Furture技術

Furture是一個頗有用的技術,咱們經常使用Furture來操做線程。咱們能夠在使用線程的時候,能夠建立一個線程,返回Furture,以後能夠經過它等待結果。 可是在協程環境下的Furtue能夠更加完全,輸入參數一樣能夠是Furture的。

Go-簡潔的併發

調用一個函數的時候,每每是參數已經準備好了。調用協程的時候也一樣如此。可是若是咱們將傳入的參數設爲通道,這樣咱們就能夠在不許備好參數的狀況下調用 函數。這樣的設計能夠提供很大的自由度和併發度。函數調用和函數參數準備這兩個過程能夠徹底解耦。下面舉一個用該技術訪問數據庫的例子。

//一個查詢結構體
type query struct {
	//參數Channel
	sql chan string
	//結果Channel
	result chan string
}

//執行Query
func execQuery(q query) {
	//啓動協程
	go func() {
		//獲取輸入
		sql := <-q.sql
		//訪問數據庫,輸出結果通道
		q.result <- "get " + sql
	}()

}

func main() {
	//初始化Query
	q :=
		query{make(chan string, 1), make(chan string, 1)}
	//執行Query,注意執行的時候無需準備參數
	execQuery(q)

	//準備參數
	q.sql <- "select * from table"
	//獲取結果
	fmt.Println(<-q.result)
}

上面利用Furture技術,不單讓結果在Furture得到,參數也是在Furture獲取。準備好參數後,自動執行。Furture和生成器的區別在 於,Furture返回一個結果,而生成器能夠重複調用。還有一個值得注意的地方,就是將參數Channel和結果Channel定義在一個結構體裏面做 爲參數,而不是返回結果Channel。這樣作能夠增長聚合度,好處就是能夠和多路複用技術結合起來使用。

Furture技術能夠和各個其餘技術組合起來用。能夠經過多路複用技術,監聽多個結果Channel,當有結果後,自動返回。也能夠和生成器組合使用, 生成器不斷生產數據,Furture技術逐個處理數據。Furture技術自身還能夠首尾相連,造成一個併發的pipe filter。這個pipe filter能夠用於讀寫數據流,操做數據流。

Future是一個很是強大的技術手段。能夠在調用的時候不關心數據是否準備好,返回值是否計算好的問題。讓程序中的組件在準備好數據的時候自動跑起來。

併發循環

循環每每是性能上的熱點。若是性能瓶頸出如今CPU上的話,那麼九成可能性熱點是在一個循環體內部。因此若是能讓循環體併發執行,那麼性能就會提升不少。

Go-簡潔的併發

要併發循環很簡單,只有在每一個循環體內部啓動協程。協程做爲循環體能夠併發執行。調用啓動前設置一個計數器,每個循環體執行完畢就在計數器上加一個元素,調用完成後經過監聽計數器等待循環協程所有完成。

//創建計數器
sem := make(chan int, N); 
//FOR循環體
for i,xi := range data {
	//創建協程
    go func (i int, xi float) {
        doSomething(i,xi);
		//計數
        sem <- 0;
    } (i, xi);
}
// 等待循環結束
for i := 0; i < N; ++i { <-sem }

上面是一個併發循環例子。經過計數器來等待循環所有完成。若是結合上面提到的Future技術的話,則沒必要等待。能夠等到真正須要的結果的地方,再去檢查數據是否完成。

經過併發循環能夠提供性能,利用多核,解決CPU熱點。正由於協程能夠大量建立,才能在循環體中如此使用,若是是使用線程的話,就須要引入線程池之類的東西,防止建立過多線程,而協程則簡單的多。

Chain Filter技術

前面提到了Future技術首尾相連,能夠造成一個併發的pipe filter。這種方式能夠作不少事情,若是每一個Filter都由同一個函數組成,還能夠有一種簡單的辦法把他們連起來。

Go-簡潔的併發

因爲每一個Filter協程均可以併發運行,這樣的結構很是有利於多核環境。下面是一個例子,用這種模式來產生素數。

// A concurrent prime sieve

package main

// Send the sequence 2, 3, 4, ... to channel 'ch'.
func Generate(ch chan<- int) {
	for i := 2; ; i++ {
		ch <- i // Send 'i' to channel 'ch'.
	}
}

// Copy the values from channel 'in' to channel 'out',
// removing those divisible by 'prime'.
func Filter(in <-chan int, out chan<- int, prime int) {
	for {
		i := <-in // Receive value from 'in'.
		if i%prime != 0 {
			out <- i // Send 'i' to 'out'.
		}
	}
}

// The prime sieve: Daisy-chain Filter processes.
func main() {
	ch := make(chan int) // Create a new channel.
	go Generate(ch)      // Launch Generate goroutine.
	for i := 0; i < 10; i++ {
		prime := <-ch
		print(prime, "
")
		ch1 := make(chan int)
		go Filter(ch, ch1, prime)
		ch = ch1
	}
}

上面的程序建立了10個Filter,每一個分別過濾一個素數,因此能夠輸出前10個素數。

Chain-Filter經過簡單的代碼建立併發的過濾器鏈。這種辦法還有一個好處,就是每一個通道只有兩個協程會訪問,就不會有激烈的競爭,性能會比較好。

共享變量

 

 

協程之間的通訊只可以經過通道。可是咱們習慣於共享變量,並且不少時候使用共享變量能讓代碼更簡潔。好比一個Server有兩個狀態開和關。其餘僅僅但願獲取或改變其狀態,那又該如何作呢。能夠將這個變量至於0通道中,並使用一個協程來維護。

Go-簡潔的併發

下面的例子描述如何用這個方式,實現一個共享變量。

//共享變量有一個讀通道和一個寫通道組成
type sharded_var struct {
	reader chan int
	writer chan int
}

//共享變量維護協程
func sharded_var_whachdog(v sharded_var) {
	go func() {
		//初始值
		var value int = 0
		for {
			//監聽讀寫通道,完成服務
			select {
			case value = <-v.writer:
			case v.reader <- value:
			}
		}
	}()
}

func main() {
	//初始化,並開始維護協程
	v := sharded_var{make(chan int), make(chan int)}
	sharded_var_whachdog(v)

	//讀取初始值
	fmt.Println(<-v.reader)
	//寫入一個值
	v.writer <- 1
	//讀取新寫入的值
	fmt.Println(<-v.reader)
}

這樣,就能夠在協程和通道的基礎上實現一個協程安全的共享變量了。定義一個寫通道,須要更新變量的時候,往裏寫新的值。再定義一個讀通道,須要讀的時候,從裏面讀。經過一個單獨的協程來維護這兩個通道。保證數據的一致性。

通常來講,協程之間不推薦使用共享變量來交互,可是按照這個辦法,在一些場合,使用共享變量也是可取的。不少平臺上有較爲原生的共享變量支持,到底用那種實現比較好,就見仁見智了。另外利用協程和通道,能夠還實現各類常見的併發數據結構,如鎖等等,就不一一贅述。

協程泄漏

協程和內存同樣,是系統的資源。對於內存,有自動垃圾回收。可是對於協程,沒有相應的回收機制。會不會若干年後,協程普及了,協程泄漏和內存泄漏同樣成爲 程序員永遠的痛呢?通常而言,協程執行結束後就會銷燬。協程也會佔用內存,若是發生協程泄漏,影響和內存泄漏同樣嚴重。輕則拖慢程序,重則壓垮機器。

C和C++都是沒有自動內存回收的程序設計語言,但只要有良好的編程習慣,就能解決規避問題。對於協程是同樣的,只要有好習慣就能夠了。

只有兩種狀況會致使協程沒法結束。一種狀況是協程想從一個通道讀數據,但無人往這個通道寫入數據,或許這個通道已經被遺忘了。還有一種狀況是程想往一個通道寫數據,但是因爲無人監聽這個通道,該協程將永遠沒法向下執行。下面分別討論如何避免這兩種狀況。

對於協程想從一個通道讀數據,但無人往這個通道寫入數據這種狀況。解決的辦法很簡單,加入超時機制。對於有不肯定會不會返回的狀況,必須加入超時,避免出 現永久等待。另外不必定要使用定時器才能終止協程。也能夠對外暴露一個退出提醒通道。任何其餘協程均可以經過該通道來提醒這個協程終止。

Go-簡潔的併發

對於協程想往一個通道寫數據,但通道阻塞沒法寫入這種狀況。解決的辦法也很簡單,就是給通道加緩衝。但前提是這個通道只會接收到固定數目的寫入。比方說, 已知一個通道最多隻會接收N次數據,那麼就將這個通道的緩衝設置爲N。那麼該通道將永遠不會堵塞,協程天然也不會泄漏。也能夠將其緩衝設置爲無限,不過這 樣就要承擔內存泄漏的風險了。等協程執行完畢後,這部分通道內存將會失去引用,會被自動垃圾回收掉。

func never_leak(ch chan int) {
	//初始化timeout,緩衝爲1
	timeout := make(chan bool, 1)
	//啓動timeout協程,因爲緩存爲1,不可能泄露
	go func() {
		time.Sleep(1 * time.Second)
		timeout <- true
	}()
	//監聽通道,因爲設有超時,不可能泄露
	select {
	case <-ch:
		// a read from ch has occurred
	case <-timeout:
		// the read from ch has timed out
	}
}

上面是個避免泄漏例子。使用超時避免讀堵塞,使用緩衝避免寫堵塞。

和內存裏面的對象同樣,對於長期存在的協程,咱們不用擔憂泄漏問題。一是長期存在,二是數量較少。要警戒的只有那些被臨時建立的協程,這些協程數量大且生 命週期短,每每是在循環中建立的,要應用前面提到的辦法,避免泄漏發生。協程也是把雙刃劍,若是出問題,不但沒能提升程序性能,反而會讓程序崩潰。但就像 內存同樣,一樣有泄漏的風險,但越用越溜了。

併發模式之實現

在併發編程大行其道的今天,對協程和通道的支持成爲各個平臺比不可少的一部分。雖然各家有各家的叫法,但都能知足協程的基本要求—併發執行和可大量建立。筆者對他們的實現方式總結了一下。

下面列舉一些已經支持協程的常見的語言和平臺。

語言/平臺 實現時間 協程名稱 備註
GoLang 原生支持 goroutines  
Erlang 原生支持 process 函數式語言
Scala 原生支持 actor 函數式編程
Python 2.5版本後 coroutine 官方Python不徹底實現
Stackless Python支持
Perl 6.0版本後 coroutine  
Ruby 1.9 版本後 fiber  
Lua 原生支持 coroutine  
C# .net 2.0版本後 fiber  

GoLang 和Scala做爲最新的語言,一出生就有完善的基於協程併發功能。Erlang最爲老資格的併發編程語言,返老還童。其餘二線語言則幾乎所有在新的版本中加入了協程。

使人驚奇的是C/C++和Java這三個世界上最主流的平臺沒有在對協程提供語言級別的原生支持。他們都揹負着厚重的歷史,沒法改變,也無需改變。但他們還有其餘的辦法使用協程。

Java平臺有不少方法實現協程:

  • 修改虛擬機:對JVM打補丁來實現協程,這樣的實現效果好,可是失去了跨平臺的好處
  • 修改字節碼:在編譯完成後加強字節碼,或者使用新的JVM語言。稍稍增長了編譯的難度。
  • 使用JNI:在Jar包中使用JNI,這樣易於使用,可是不能跨平臺。
  • 使用線程模擬協程:使協程重量級,徹底依賴JVM的線程實現。

其中修改字節碼的方式比較常見。由於這樣的實現辦法,能夠平衡性能和移植性。最具表明性的JVM語言Scala就能很好的支持協程併發。流行的Java Actor模型類庫akka也是用修改字節碼的方式實現的協程。

對於C語言,協程和線程同樣。可使用各類各樣的系統調用來實現。協程做爲一個比較高級的概念,實現方式實在太多,就不討論了。比較主流的實現有libpcl, coro,lthread等等。

對於C++,有Boost實現,還有一些其餘開源庫。還有一門名爲μC++語言,在C++基礎上提供了併發擴展。

可見這種編程模型在衆多的語言平臺中已經獲得了普遍的支持,再也不小衆。若是想使用的話,隨時能夠加到本身的工具箱中。

結語

本文探討了一個極其簡潔的併發模型。在只有協程和通道這兩個基本元件的狀況下。能夠提供豐富的功能,解決形形色色實際問題。並且這個模型已經被普遍的實 現,成爲潮流。相信這種併發模型的功能遠遠不及此,必定也會有更多更簡潔的用法出現。或許將來CPU核心數目將和人腦神經元數目同樣多,到那個時候,咱們 又要從新思考併發模型了。 

相關文章
相關標籤/搜索