Golang常見的併發模式

Go語言最吸引人的地方是它內建的併發支持。Go語言併發體系的理論是C.A.R Hoare在1978年提出的CSP(Communicating Sequential Process,通信順序進程)。CSP有着精確的數學模型,並實際應用在了Hoare參與設計的T9000通用計算機上。從NewSqueak、Alef、Limbo到如今的Go語言,對於對CSP有着20多年實戰經驗的Rob Pike來講,他更關注的是將CSP應用在通用編程語言上產生的潛力。做爲Go併發編程核心的CSP理論的核心概念只有一個:同步通訊。關於同步通訊的話題咱們在前面一節已經講過,本節咱們將簡單介紹下Go語言中常見的併發模式。程序員

首先要明確一個概念:併發不是並行。併發更關注的是程序的設計層面,併發的程序徹底是能夠順序執行的,只有在真正的多核CPU上纔可能真正地同時運行。並行更關注的是程序的運行層面,並行通常是簡單的大量重複,例如GPU中對圖像處理都會有大量的並行運算。爲更好的編寫併發程序,從設計之初Go語言就注重如何在編程語言層級上設計一個簡潔安全高效的抽象模型,讓程序員專一於分解問題和組合方案,並且不用被線程管理和信號互斥這些繁瑣的操做分散精力。golang

在併發編程中,對共享資源的正確訪問須要精確的控制,在目前的絕大多數語言中,都是經過加鎖等線程同步方案來解決這一困難問題,而Go語言卻另闢蹊徑,它將共享的值經過Channel傳遞(實際上多個獨立執行的線程不多主動共享資源)。在任意給定的時刻,最好只有一個Goroutine可以擁有該資源。數據競爭從設計層面上就被杜絕了。爲了提倡這種思考方式,Go語言將其併發編程哲學化爲一句口號:算法

Do not communicate by sharing memory; instead, share memory by communicating.編程

不要經過共享內存來通訊,而應經過通訊來共享內存。緩存

這是更高層次的併發編程哲學(經過管道來傳值是Go語言推薦的作法)。雖然像引用計數這類簡單的併發問題經過原子操做或互斥鎖就能很好地實現,可是經過Channel來控制訪問可以讓你寫出更簡潔正確的程序。安全

併發版本的Hello world

咱們先以在一個新的Goroutine中輸出「Hello world」,main等待後臺線程輸出工做完成以後退出,這樣一個簡單的併發程序做爲熱身。網絡

併發編程的核心概念是同步通訊,可是同步的方式卻有多種。咱們先以你們熟悉的互斥量sync.Mutex來實現同步通訊。根據文檔,咱們不能直接對一個未加鎖狀態的sync.Mutex進行解鎖,這會致使運行時異常。下面這種方式並不能保證正常工做:多線程

func main() {
	var mu sync.Mutex

	go func(){
		fmt.Println("你好, 世界")
		mu.Lock()
	}()

	mu.Unlock()
}
複製代碼

由於mu.Lock()mu.Unlock()並不在同一個Goroutine中,因此也就不知足順序一致性內存模型。同時它們也沒有其它的同步事件能夠參考,這兩個事件不可排序也就是能夠併發的。由於多是併發的事件,因此main函數中的mu.Unlock()頗有可能先發生,而這個時刻mu互斥對象還處於未加鎖的狀態,從而會致使運行時異常。併發

下面是修復後的代碼:異步

func main() {
	var mu sync.Mutex

	mu.Lock()
	go func(){
		fmt.Println("你好, 世界")
		mu.Unlock()
	}()

	mu.Lock()
}
複製代碼

修復的方式是在main函數所在線程中執行兩次mu.Lock(),當第二次加鎖時會由於鎖已經被佔用(不是遞歸鎖)而阻塞,main函數的阻塞狀態驅動後臺線程繼續向前執行。當後臺線程執行到mu.Unlock()時解鎖,此時打印工做已經完成了,解鎖會致使main函數中的第二個mu.Lock()阻塞狀態取消,此時後臺線程和主線程再沒有其它的同步事件參考,它們退出的事件將是併發的:在main函數退出致使程序退出時,後臺線程可能已經退出了,也可能沒有退出。雖然沒法肯定兩個線程退出的時間,可是打印工做是能夠正確完成的。

使用sync.Mutex互斥鎖同步是比較低級的作法。咱們如今改用無緩存的管道來實現同步:

func main() {
	done := make(chan int)

	go func(){
		fmt.Println("你好, 世界")
		<-done
	}()

	done <- 1
}
複製代碼

根據Go語言內存模型規範,對於從無緩衝Channel進行的接收,發生在對該Channel進行的發送完成以前。所以,後臺線程<-done接收操做完成以後,main線程的done <- 1發送操做纔可能完成(從而退出main、退出程序),而此時打印工做已經完成了。

上面的代碼雖然能夠正確同步,可是對管道的緩存大小太敏感:若是管道有緩存的話,就沒法保證main退出以前後臺線程能正常打印了。更好的作法是將管道的發送和接收方向調換一下,這樣能夠避免同步事件受管道緩存大小的影響:

func main() {
	done := make(chan int, 1) // 帶緩存的管道

	go func(){
		fmt.Println("你好, 世界")
		done <- 1
	}()

	<-done
}
複製代碼

對於帶緩衝的Channel,對於Channel的第K個接收完成操做發生在第K+C個發送操做完成以前,其中C是Channel的緩存大小。雖然管道是帶緩存的,main線程接收完成是在後臺線程發送開始但還未完成的時刻,此時打印工做也是已經完成的。

基於帶緩存的管道,咱們能夠很容易將打印線程擴展到N個。下面的例子是開啓10個後臺線程分別打印:

func main() {
	done := make(chan int, 10) // 帶 10 個緩存

	// 開N個後臺打印線程
	for i := 0; i < cap(done); i++ {
		go func(){
			fmt.Println("你好, 世界")
			done <- 1
		}()
	}

	// 等待N個後臺線程完成
	for i := 0; i < cap(done); i++ {
		<-done
	}
}
複製代碼

對於這種要等待N個線程完成後再進行下一步的同步操做有一個簡單的作法,就是使用sync.WaitGroup來等待一組事件:

func main() {
	var wg sync.WaitGroup

	// 開N個後臺打印線程
	for i := 0; i < 10; i++ {
		wg.Add(1)

		go func() {
			fmt.Println("你好, 世界")
			wg.Done()
		}()
	}

	// 等待N個後臺線程完成
	wg.Wait()
}
複製代碼

其中wg.Add(1)用於增長等待事件的個數,必須確保在後臺線程啓動以前執行(若是放到後臺線程之中執行則不能保證被正常執行到)。當後臺線程完成打印工做以後,調用wg.Done()表示完成一個事件。main函數的wg.Wait()是等待所有的事件完成。

生產者消費者模型

併發編程中最多見的例子就是生產者消費者模式,該模式主要經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。簡單地說,就是生產者生產一些數據,而後放到成果隊列中,同時消費者從成果隊列中來取這些數據。這樣就讓生產消費變成了異步的兩個過程。當成果隊列中沒有數據時,消費者就進入飢餓的等待中;而當成果隊列中數據已滿時,生產者則面臨因產品擠壓致使CPU被剝奪的下崗問題。

Go語言實現生產者消費者併發很簡單:

// 生產者: 生成 factor 整數倍的序列
func Producer(factor int, out chan<- int) {
	for i := 0; ; i++ {
		out <- i*factor
	}
}

// 消費者
func Consumer(in <-chan int) {
	for v := range in {
		fmt.Println(v)
	}
}
func main() {
	ch := make(chan int, 64) // 成果隊列

	go Producer(3, ch) // 生成 3 的倍數的序列
	go Producer(5, ch) // 生成 5 的倍數的序列
	go Consumer(ch)    // 消費 生成的隊列

	// 運行必定時間後退出
	time.Sleep(5 * time.Second)
}
複製代碼

咱們開啓了2個Producer生產流水線,分別用於生成3和5的倍數的序列。而後開啓1個Consumer消費者線程,打印獲取的結果。咱們經過在main函數休眠必定的時間來讓生產者和消費者工做必定時間。正如前面一節說的,這種靠休眠方式是沒法保證穩定的輸出結果的。

咱們可讓main函數保存阻塞狀態不退出,只有當用戶輸入Ctrl-C時才真正退出程序:

func main() {
	ch := make(chan int, 64) // 成果隊列

	go Producer(3, ch) // 生成 3 的倍數的序列
	go Producer(5, ch) // 生成 5 的倍數的序列
	go Consumer(ch)    // 消費 生成的隊列

	// Ctrl+C 退出
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	fmt.Printf("quit (%v)\n", <-sig)
}
複製代碼

咱們這個例子中有2個生產者,而且2個生產者之間並沒有同步事件可參考,它們是併發的。所以,消費者輸出的結果序列的順序是不肯定的,這並無問題,生產者和消費者依然能夠相互配合工做。

發佈訂閱模型

發佈訂閱(publish-and-subscribe)模型一般被簡寫爲pub/sub模型。在這個模型中,消息生產者成爲發佈者(publisher),而消息消費者則成爲訂閱者(subscriber),生產者和消費者是M:N的關係。在傳統生產者和消費者模型中,是將消息發送到一個隊列中,而發佈訂閱模型則是將消息發佈給一個主題。

爲此,咱們構建了一個名爲pubsub的發佈訂閱模型支持包:

// Package pubsub implements a simple multi-topic pub-sub library.
package pubsub

import (
	"sync"
	"time"
)

type (
	subscriber chan interface{}         // 訂閱者爲一個管道
	topicFunc  func(v interface{}) bool // 主題爲一個過濾器 ) // 發佈者對象 type Publisher struct {
	m           sync.RWMutex             // 讀寫鎖
	buffer      int                      // 訂閱隊列的緩存大小
	timeout     time.Duration            // 發佈超時時間
	subscribers map[subscriber]topicFunc // 訂閱者信息
}

// 構建一個發佈者對象, 能夠設置發佈超時時間和緩存隊列的長度
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
	return &Publisher{
		buffer:      buffer,
		timeout:     publishTimeout,
		subscribers: make(map[subscriber]topicFunc),
	}
}

// 添加一個新的訂閱者,訂閱所有主題
func (p *Publisher) Subscribe() chan interface{} {
	return p.SubscribeTopic(nil)
}

// 添加一個新的訂閱者,訂閱過濾器篩選後的主題
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
	ch := make(chan interface{}, p.buffer)
	p.m.Lock()
	p.subscribers[ch] = topic
	p.m.Unlock()
	return ch
}

// 退出訂閱
func (p *Publisher) Evict(sub chan interface{}) {
	p.m.Lock()
	defer p.m.Unlock()

	delete(p.subscribers, sub)
	close(sub)
}

// 發佈一個主題
func (p *Publisher) Publish(v interface{}) {
	p.m.RLock()
	defer p.m.RUnlock()

	var wg sync.WaitGroup
	for sub, topic := range p.subscribers {
		wg.Add(1)
		go p.sendTopic(sub, topic, v, &wg)
	}
	wg.Wait()
}

// 關閉發佈者對象,同時關閉全部的訂閱者管道。
func (p *Publisher) Close() {
	p.m.Lock()
	defer p.m.Unlock()

	for sub := range p.subscribers {
		delete(p.subscribers, sub)
		close(sub)
	}
}

// 發送主題,能夠容忍必定的超時
func (p *Publisher) sendTopic( sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup, ) {
	defer wg.Done()
	if topic != nil && !topic(v) {
		return
	}

	select {
	case sub <- v:
	case <-time.After(p.timeout):
	}
}
複製代碼

下面的例子中,有兩個訂閱者分別訂閱了所有主題和含有"golang"的主題:

import "path/to/pubsub"

func main() {
	p := pubsub.NewPublisher(100*time.Millisecond, 10)
	defer p.Close()

	all := p.Subscribe()
	golang := p.SubscribeTopic(func(v interface{}) bool {
		if s, ok := v.(string); ok {
			return strings.Contains(s, "golang")
		}
		return false
	})

	p.Publish("hello, world!")
	p.Publish("hello, golang!")

	go func() {
		for  msg := range all {
			fmt.Println("all:", msg)
		}
	} ()

	go func() {
		for  msg := range golang {
			fmt.Println("golang:", msg)
		}
	} ()

	// 運行必定時間後退出
	time.Sleep(3 * time.Second)
}
複製代碼

在發佈訂閱模型中,每條消息都會傳送給多個訂閱者。發佈者一般不會知道、也不關心哪個訂閱者正在接收主題消息。訂閱者和發佈者能夠在運行時動態添加,是一種鬆散的耦合關係,這使得系統的複雜性能夠隨時間的推移而增加。在現實生活中,像天氣預報之類的應用就能夠應用這個併發模式。

控制併發數

不少用戶在適應了Go語言強大的併發特性以後,都傾向於編寫最大併發的程序,由於這樣彷佛能夠提供最大的性能。在現實中咱們行色匆匆,但有時卻須要咱們放慢腳步享受生活,併發的程序也是同樣:有時候咱們須要適當地控制併發的程度,由於這樣不只僅可給其它的應用/任務讓出/預留必定的CPU資源,也能夠適當下降功耗緩解電池的壓力。

在Go語言自帶的godoc程序實現中有一個vfs的包對應虛擬的文件系統,在vfs包下面有一個gatefs的子包,gatefs子包的目的就是爲了控制訪問該虛擬文件系統的最大併發數。gatefs包的應用很簡單:

import (
	"golang.org/x/tools/godoc/vfs"
	"golang.org/x/tools/godoc/vfs/gatefs"
)

func main() {
	fs := gatefs.New(vfs.OS("/path"), make(chan bool, 8))
	// ...
}
複製代碼

其中vfs.OS("/path")基於本地文件系統構造一個虛擬的文件系統,而後gatefs.New基於現有的虛擬文件系統構造一個併發受控的虛擬文件系統。併發數控制的原理在前面一節已經講過,就是經過帶緩存管道的發送和接收規則來實現最大併發阻塞:

var limit = make(chan int, 3)

func main() {
	for _, w := range work {
		go func() {
			limit <- 1
			w()
			<-limit
		}()
	}
	select{}
}
複製代碼

不過gatefs對此作一個抽象類型gate,增長了enterleave方法分別對應併發代碼的進入和離開。當超出併發數目限制的時候,enter方法會阻塞直到併發數降下來爲止。

type gate chan bool

func (g gate) enter() { g <- true }
func (g gate) leave() { <-g }
複製代碼

gatefs包裝的新的虛擬文件系統就是將須要控制併發的方法增長了enterleave調用而已:

type gatefs struct {
	fs vfs.FileSystem
	gate
}

func (fs gatefs) Lstat(p string) (os.FileInfo, error) {
	fs.enter()
	defer fs.leave()
	return fs.fs.Lstat(p)
}
複製代碼

咱們不只能夠控制最大的併發數目,並且能夠經過帶緩存Channel的使用量和最大容量比例來判斷程序運行的併發率。當管道爲空的時候能夠認爲是空閒狀態,當管道滿了時任務是繁忙狀態,這對於後臺一些低級任務的運行是有參考價值的。

贏者爲王

採用併發編程的動機有不少:併發編程能夠簡化問題,好比一類問題對應一個處理線程會更簡單;併發編程還能夠提高性能,在一個多核CPU上開2個線程通常會比開1個線程快一些。其實對於提高性能而言,程序並非簡單地運行速度快就表示用戶體驗好的;不少時候程序能快速響應用戶請求才是最重要的,當沒有用戶請求須要處理的時候才合適處理一些低優先級的後臺任務。

假設咱們想快速地搜索「golang」相關的主題,咱們可能會同時打開Bing、Google或百度等多個檢索引擎。當某個搜索最早返回結果後,就能夠關閉其它搜索頁面了。由於受網絡環境和搜索引擎算法的影響,某些搜索引擎可能很快返回搜索結果,某些搜索引擎也可能等到他們公司倒閉也沒有完成搜索。咱們能夠採用相似的策略來編寫這個程序:

func main() {
	ch := make(chan string, 32)

	go func() {
		ch <- searchByBing("golang")
	}()
	go func() {
		ch <- searchByGoogle("golang")
	}()
	go func() {
		ch <- searchByBaidu("golang")
	}()

	fmt.Println(<-ch)
}
複製代碼

首先,咱們建立了一個帶緩存的管道,管道的緩存數目要足夠大,保證不會由於緩存的容量引發沒必要要的阻塞。而後咱們開啓了多個後臺線程,分別向不一樣的搜索引擎提交搜索請求。當任意一個搜索引擎最早有結果以後,都會立刻將結果發到管道中(由於管道帶了足夠的緩存,這個過程不會阻塞)。可是最終咱們只從管道取第一個結果,也就是最早返回的結果。

經過適當開啓一些冗餘的線程,嘗試用不一樣途徑去解決一樣的問題,最終以贏者爲王的方式提高了程序的相應性能。

素數篩

在「Hello world 的革命」一節中,咱們爲了演示Newsqueak的併發特性,文中給出了併發版本素數篩的實現。併發版本的素數篩是一個經典的併發例子,經過它咱們能夠更深入地理解Go語言的併發特性。「素數篩」的原理如圖:

[圖片上傳失敗...(image-b304d-1554179460547)]

圖 1-13 素數篩

咱們須要先生成最初的2, 3, 4, ...天然數序列(不包含開頭的0、1):

// 返回生成天然數序列的管道: 2, 3, 4, ...
func GenerateNatural() chan int {
	ch := make(chan int)
	go func() {
		for i := 2; ; i++ {
			ch <- i
		}
	}()
	return ch
}
複製代碼

GenerateNatural函數內部啓動一個Goroutine生產序列,返回對應的管道。

而後是爲每一個素數構造一個篩子:將輸入序列中是素數倍數的數提出,並返回新的序列,是一個新的管道。

// 管道過濾器: 刪除能被素數整除的數
func PrimeFilter(in <-chan int, prime int) chan int {
	out := make(chan int)
	go func() {
		for {
			if i := <-in; i%prime != 0 {
				out <- i
			}
		}
	}()
	return out
}
複製代碼

PrimeFilter函數也是內部啓動一個Goroutine生產序列,返回過濾後序列對應的管道。

如今咱們能夠在main函數中驅動這個併發的素數篩了:

func main() {
	ch := GenerateNatural() // 天然數序列: 2, 3, 4, ...
	for i := 0; i < 100; i++ {
		prime := <-ch // 新出現的素數
		fmt.Printf("%v: %v\n", i+1, prime)
		ch = PrimeFilter(ch, prime) // 基於新素數構造的過濾器
	}
}
複製代碼

咱們先是調用GenerateNatural()生成最原始的從2開始的天然數序列。而後開始一個100次迭代的循環,但願生成100個素數。在每次循環迭代開始的時候,管道中的第一個數一定是素數,咱們先讀取並打印這個素數。而後基於管道中剩餘的數列,並以當前取出的素數爲篩子過濾後面的素數。不一樣的素數篩子對應的管道是串聯在一塊兒的。

素數篩展現了一種優雅的併發程序結構。可是由於每一個併發體處理的任務粒度太細微,程序總體的性能並不理想。對於細粒度的併發程序,CSP模型中固有的消息傳遞的代價過高了(多線程併發模型一樣要面臨線程啓動的代價)。

併發的安全退出

有時候咱們須要通知goroutine中止它正在乾的事情,特別是當它工做在錯誤的方向上的時候。Go語言並無提供在一個直接終止Goroutine的方法,因爲這樣會致使goroutine之間的共享變量處在未定義的狀態上。可是若是咱們想要退出兩個或者任意多個Goroutine怎麼辦呢?

Go語言中不一樣Goroutine之間主要依靠管道進行通訊和同步。要同時處理多個管道的發送或接收操做,咱們須要使用select關鍵字(這個關鍵字和網絡編程中的select函數的行爲相似)。當select有多個分支時,會隨機選擇一個可用的管道分支,若是沒有可用的管道分支則選擇default分支,不然會一直保存阻塞狀態。

基於select實現的管道的超時判斷:

select {
case v := <-in:
	fmt.Println(v)
case <-time.After(time.Second):
	return // 超時
}
複製代碼

經過selectdefault分支實現非阻塞的管道發送或接收操做:

select {
case v := <-in:
	fmt.Println(v)
default:
	// 沒有數據
}
複製代碼

經過select來阻止main函數退出:

func main() {
	// do some thins
	select{}
}
複製代碼

當有多個管道都可操做時,select會隨機選擇一個管道。基於該特性咱們能夠用select實現一個生成隨機數序列的程序:

func main() {
	ch := make(chan int)
	go func() {
		for {
			select {
			case ch <- 0:
			case ch <- 1:
			}
		}
	}()

	for v := range ch {
		fmt.Println(v)
	}
}
複製代碼

咱們經過selectdefault分支能夠很容易實現一個Goroutine的退出控制:

func worker(cannel chan bool) {
	for {
		select {
		default:
			fmt.Println("hello")
			// 正常工做
		case <-cannel:
			// 退出
		}
	}
}

func main() {
	cannel := make(chan bool)
	go worker(cannel)

	time.Sleep(time.Second)
	cannel <- true
}
複製代碼

可是管道的發送操做和接收操做是一一對應的,若是要中止多個Goroutine那麼可能須要建立一樣數量的管道,這個代價太大了。其實咱們能夠經過close關閉一個管道來實現廣播的效果,全部從關閉管道接收的操做均會收到一個零值和一個可選的失敗標誌。

func worker(cannel chan bool) {
	for {
		select {
		default:
			fmt.Println("hello")
			// 正常工做
		case <-cannel:
			// 退出
		}
	}
}

func main() {
	cancel := make(chan bool)

	for i := 0; i < 10; i++ {
		go worker(cancel)
	}

	time.Sleep(time.Second)
	close(cancel)
}
複製代碼

咱們經過close來關閉cancel管道向多個Goroutine廣播退出的指令。不過這個程序依然不夠穩健:當每一個Goroutine收到退出指令退出時通常會進行必定的清理工做,可是退出的清理工做並不能保證被完成,由於main線程並無等待各個工做Goroutine退出工做完成的機制。咱們能夠結合sync.WaitGroup來改進:

func worker(wg *sync.WaitGroup, cannel chan bool) {
	defer wg.Done()

	for {
		select {
		default:
			fmt.Println("hello")
		case <-cannel:
			return
		}
	}
}

func main() {
	cancel := make(chan bool)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go worker(&wg, cancel)
	}

	time.Sleep(time.Second)
	close(cancel)
	wg.Wait()
}
複製代碼

如今每一個工做者併發體的建立、運行、暫停和退出都是在main函數的安全控制之下了。

context包

在Go1.7發佈時,標準庫增長了一個context包,用來簡化對於處理單個請求的多個Goroutine之間與請求域的數據、超時和退出等操做,官方有博文對此作了專門介紹。咱們能夠用context包來從新實現前面的線程安全退出或超時的控制:

func worker(ctx context.Context, wg *sync.WaitGroup) error {
	defer wg.Done()

	for {
		select {
		default:
			fmt.Println("hello")
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go worker(ctx, &wg)
	}

	time.Sleep(time.Second)
	cancel()

	wg.Wait()
}
複製代碼

當併發體超時或main主動中止工做者Goroutine時,每一個工做者均可以安全退出。

Go語言是帶內存自動回收特性的,所以內存通常不會泄漏。在前面素數篩的例子中,GenerateNaturalPrimeFilter函數內部都啓動了新的Goroutine,當main函數再也不使用管道時後臺Goroutine有泄漏的風險。咱們能夠經過context包來避免這個問題,下面是改進的素數篩實現:

// 返回生成天然數序列的管道: 2, 3, 4, ...
func GenerateNatural(ctx context.Context) chan int {
	ch := make(chan int)
	go func() {
		for i := 2; ; i++ {
			select {
			case <- ctx.Done():
				return
			case ch <- i:
			}
		}
	}()
	return ch
}

// 管道過濾器: 刪除能被素數整除的數
func PrimeFilter(ctx context.Context, in <-chan int, prime int) chan int {
	out := make(chan int)
	go func() {
		for {
			if i := <-in; i%prime != 0 {
				select {
				case <- ctx.Done():
					return
				case out <- i:
				}
			}
		}
	}()
	return out
}

func main() {
	// 經過 Context 控制後臺Goroutine狀態
	ctx, cancel := context.WithCancel(context.Background())

	ch := GenerateNatural(ctx) // 天然數序列: 2, 3, 4, ...
	for i := 0; i < 100; i++ {
		prime := <-ch // 新出現的素數
		fmt.Printf("%v: %v\n", i+1, prime)
		ch = PrimeFilter(ctx, ch, prime) // 基於新素數構造的過濾器
	}

	cancel()
}
複製代碼

當main函數完成工做前,經過調用cancel()來通知後臺Goroutine退出,這樣就避免了Goroutine的泄漏。

併發是一個很是大的主題,咱們這裏只是展現幾個很是基礎的併發編程的例子。官方文檔也有不少關於併發編程的討論,國內也有專門討論Go語言併發編程的書籍。讀者能夠根據本身的需求查閱相關的文獻。

相關文章
相關標籤/搜索