前言:
本專題用於記錄本身(647)在Go語言方向的學習和積累。 系列內容比較偏基礎,推薦給想要入門Go語言開發者們閱讀。
目錄以下:
Go語言基礎(一)—— 簡介、環境配置、HelloWorld
Go語言基礎(二)—— 基本經常使用語法
Go語言基礎(三)—— 面向對象編程
Go語言基礎(四)—— 優質的容錯處理
Go語言基礎(五)—— 併發編程
Go語言基礎(六)—— 測試、反射、Unsafe
Go語言基礎(七)—— 架構 & 常見任務
Go語言基礎(八)—— 性能調優golang
本篇將介紹以下內容:
1.協程機制(Groutine
)
2.共享內存併發機制(協程安全)
3.CSP併發機制(channel
)
4.多路選擇和超時控制(select
)
5.channel的關閉和廣播(channel
)
6.任務的取消
7.Context與關聯任務取消
8.常見併發任務(實戰)編程
相信你們確定都知道 「線程」 與 「進程」 的概念。vim
而在Go語言中,「協程」能夠理解爲更輕量級的線程。 經過調度「協程」就能夠把系統Kernel的效率發揮到極致。緩存
經過一張表格,咱們來對比一下協程與線程的區別。安全
\ | 默認棧大小(建立時) | KSE對應關係(Kernel Space Entity) |
---|---|---|
線程 Thread | 1M | 1 : 1 |
協程 Groutine | 2K | M : N |
協程vs.線程的優點在於:網絡
kernel entity
)的切換,這會形成較大的成本。kernel entity
)下切換,就能下降切換系統線程(kernel entity
)的成本。(如上圖所示)語法:go + func
架構
func TestGroutine(t *testing.T) {
for i := 0; i < 10; i++ {
go func(i int) {
fmt.Println(i) // 正確案例,值傳遞。各個協程無競爭關係。
}(i)
// go func() {
// fmt.Println(i) // 錯誤案例,共享變量。各個協程有競爭關係
// }()
}
time.Sleep(time.Millisecond * 50)
}
複製代碼
說到協程安全,咱們第一個會想到的就是加鎖(lock)。 經過加鎖來保證協程安全。併發
在Go語言中也是如此,咱們來看個例子。異步
// 協程不安全demo
func TestCounterThreadUnsafe(t *testing.T) {
counter := 0
for i := 0; i < 5000; i++ {
go func() {
counter++
}()
}
time.Sleep(1 * time.Second)
t.Logf("counter = %d", counter)
}
複製代碼
結果以下:函數
=== RUN TestCounterThreadUnsafe
--- PASS: TestCounterThreadUnsafe (1.00s)
share_mem_test.go:18: counter = 4765
複製代碼
這時就會發現,計算錯誤,由於併發致使了漏值。
// 協程等待demo(停1秒,不推薦)
func TestCounterThreadSafe(t *testing.T) {
var mut sync.Mutex
counter := 0
for i := 0; i < 5000; i++ {
go func() {
defer func() {
mut.Unlock() //函數調用完成後:解鎖,保證協程安全
}()
mut.Lock() // 函數將要調用前:加鎖,保證協程安全
counter++
}()
}
time.Sleep(1 * time.Second) // 等待一秒,等協程所有執行完
t.Logf("counter = %d", counter)
}
複製代碼
結果以下:
=== RUN TestCounterThreadSafe
--- PASS: TestCounterThreadSafe (1.01s)
share_mem_test.go:35: counter = 5000
複製代碼
結果正確,可是有一個問題。由於這裏有個1秒的延遲等待,保證協程運行完畢再調用結果。所以,有沒有更好的處理方式呢?接下來咱們再優化一下。
WaitGroup
)保證順序執行。// 協程安全Demo
func TestCounterWaitGroup(t *testing.T) {
var mut sync.Mutex // 互斥鎖
var wg sync.WaitGroup // 等待隊列
counter := 0
for i := 0; i < 5000; i++ {
wg.Add(1) // 加個任務
go func() {
defer func() {
mut.Unlock() //函數調用完成後:解鎖,保證協程安全
}()
mut.Lock() // 函數將要調用前:加鎖,保證協程安全
counter++
wg.Done() // 作完任務
}()
}
wg.Wait() //等待全部任務執行完畢
t.Logf("counter = %d", counter)
}
複製代碼
運行結果以下:
=== RUN TestCounterWaitGroup
--- PASS: TestCounterWaitGroup (0.00s)
share_mem_test.go:55: counter = 5000
複製代碼
這樣的話,能夠看出:互斥鎖Mutex
和等待隊列WaitGroup
不只保證了協程的安全,還避免了提早打印結果。(✔️)
CSP(Communicating sequential processes
):通訊順序進程(管道通訊)。 簡單來講,CSP是經過Channel
(管道)來通訊的。
Go 中的
Channel
(管道)有容量限制而且獨立於處理Groutine
(協程)。
Go中常見的Channel
有兩種,分別對應爲Channel
、Buffer Channel
。
首先,發送者與接受者必須同時站在Channel
的兩端才進行交互。 若是一方不在,另外一方就會阻塞在一端,直到兩端都在才進行交互。
建立語法:make(chan [type])
retChannel := make(chan string) // 建立無緩衝channel,並指明channel中的數據爲string,雙端等待
複製代碼
輸入語法:channel <-
channel <- object // channel輸入
複製代碼
獲取語法:<- channel
object <- channel // channel輸出
複製代碼
這是一種稍微高級一點的Channel
方式,(更加鬆耦合)。
首先,給
Channel
設置一個容量大小,而且不要求發送者與接受者同時站在兩端。 而後,發送者會以Buffer
的形式,不斷往Channel
裏發送消息。 直到Channel
的容量滿了才阻塞。 這時,只要接受方接收了消息(即Channel
有剩餘容量了),發送者就會繼續發送消息。
建立語法:make(chan [type], Int)
retChannel := make(chan string, 1) // 建立有緩衝channel,並指明channel中的數據爲string
複製代碼
輸入語法:channel <-
channel <- object // channel輸入
複製代碼
獲取語法:<- channel
object <- channel // channel輸出
複製代碼
Demo:模擬了一個網絡請求的方法調用過程,經過Channel
來控制當前協程在網絡請求的等待過程當中,去執行別的任務。
// 模擬網絡請求
func serviceTask() string {
fmt.Println("- start working on service task.")
time.Sleep(time.Millisecond * 50)
return "- service task is Done."
}
// 別的任務
func otherTask() {
fmt.Println("start working on something else.")
time.Sleep(time.Millisecond * 100)
fmt.Println("other task is Done.")
}
// csp異步管道
func AsyncService() chan string {
retChannel := make(chan string) // 無緩衝channel,建立並指明channel中的數據爲string,雙端等待
// retChannel := make(chan string, 1) // 有緩衝channel,建立並指明channel中的數據爲string
go func() {
ret := serviceTask()
fmt.Println("returned result.")
retChannel <- ret // channel輸入
fmt.Println("service exited.")
}()
return retChannel
}
func TestAsyncService(t *testing.T) {
retCh := AsyncService()
otherTask()
fmt.Println(<-retCh) // channel輸出
time.Sleep(time.Second * 1)
}
複製代碼
使用select
關鍵字,完成「多路選擇」與「超時控制」。
channel
可能有多個時,可使用select來處理多路的響應事件。注意:這裏與
switch
有點像,可是要注意的是,它並非順序判斷的。也就是若是channel1
與channel2
同時知足時,可能走的是channel1
、也多是channel2
,並不像switch
同樣作順序的判斷。
Demo:
select {
case ret := <-channel1:
t.Log(ret)
case ret:= <- channel2:
t.Log(ret)
case default:
t.Error("No one returned.")
}
複製代碼
同時,咱們也能夠設置一個超時等待的一個分路,當channel
超時還未返回時,能夠執行相應的代碼。
Demo:
select {
case ret := <-AsyncService(): //正常返回
t.Log(ret)
case <-time.After(time.Millisecond * 100): // 超時等待
t.Error("time out")
}
複製代碼
要點以下:
close
的channel
發消息,會致使程序panic
。v, ok <- channel
。 其中,ok
爲bool
值, 若ok==true
時,表示channel
處於open
狀態。 若ok==false
時,表示channel
處於close
狀態。channel
接收者在channel
關閉時,都會馬上從阻塞等待中返回,且ok
值爲false
。(PS:廣播機制,一般被利用向多個訂閱者同時發送信號。如,退出信號。)Demo:
// 消息生產者
func dataProducer(ch chan int, wg *sync.WaitGroup) {
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
fmt.Println("channel close.")
close(ch) // 關閉channel
wg.Done()
}()
}
// 消息接收者
func dataReceiver(ch chan int, wg *sync.WaitGroup) {
go func() {
for {
if data, ok := <-ch; ok { // 有消息就打印,直到channel被close。
fmt.Println(data)
} else {
fmt.Println("Receiver close.")
break // channel被close
}
}
wg.Done()
}()
}
func TestCloseChannel(t *testing.T) {
var wg sync.WaitGroup
ch := make(chan int)
wg.Add(1)
dataProducer(ch, &wg) // 開啓生產者
wg.Add(1)
dataReceiver(ch, &wg) // 開啓消費者
wg.Wait()
}
複製代碼
經過上面的close channel
(廣播機制),咱們能夠延伸一下,經過close channel
通知全部channel
取消當前的任務。
Demo以下:
func isCancelled(cancelChan chan struct{}) bool {
select {
case <-cancelChan:
return true
default:
return false
}
}
// 只能取消單個channel
func cancel_1(cancelChan chan struct{}) {
cancelChan <- struct{}{}
}
// 全部channel所有取消
func cancel_2(cancelChan chan struct{}) {
close(cancelChan)
}
func TestCancel(t *testing.T) {
cancelChan := make(chan struct{}, 0) // 建立了一個channal,經過它來控制事件取消
for i := 0; i < 5; i++ { // 開啓5個協程
go func(i int, chanclCh chan struct{}) { // 每一個協程裏面都有一個死循環,去等待取消消息
for {
if isCancelled(cancelChan) {
break
}
time.Sleep(time.Millisecond * 5) // 模擬延遲5毫秒
}
fmt.Println(i, "Cancelled") // 說明退出了死循環,打印日誌
}(i, cancelChan)
}
cancel_2(cancelChan) // 通知全部channel關閉。
time.Sleep(time.Second * 1)
}
複製代碼
剛纔咱們經過close channel
來取消任務,但會有些問題。 好比,當一個任務被取消後,它所關聯的子任務也應該被當即取消。
爲了解決這個問題,go 1.9.0
以後,golang
加入了context
,來保證關聯任務的取消。
context
就是用於管理相關任務的上下文,包含了共享值的傳遞,超時,取消通知。
結構體以下:
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
複製代碼
Deadline
會返回一個超時時間,Goroutine
得到了超時時間後,例如能夠對某些io操做設定超時時間。Done
方法返回一個信道(channel
),當Context
被撤銷或過時時,該信道是關閉的,即它是一個表示Context
是否已關閉的信號。Done
信道關閉後,Err方法代表Context被撤的緣由。Value
可讓Goroutine
共享一些數據,固然得到數據是協程安全的。但使用這些數據的時候要注意同步,好比返回了一個map
,而這個map
的讀寫則要加鎖。要點:
context.Background()
建立。context.WithCancel(parentContext)
建立。<-ctx.Done
咱們把剛纔的例子稍加調整,經過context來取消全部關聯的任務。
context
:ctx, cancel := context.WithCancel(context.Background()) // 建立一個子context
複製代碼
context
做爲參數。func isCancelled(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}
複製代碼
cancel
方法。for i := 0; i < 5; i++ { // 開啓5個協程
go func(i int, ctx context.Context) { // 每一個協程裏面都有一個死循環,去等待取消消息
for {
if isCancelled(ctx) {
break
}
time.Sleep(time.Millisecond * 5) // 模擬延遲5毫秒
}
fmt.Println(i, "Cancelled") // 說明退出了死循環,打印日誌
}(i, ctx)
}
cancel() // 取消ctx
複製代碼
完整示例代碼以下:
func isCancelled(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}
func TestCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) // 建立一個子context
for i := 0; i < 5; i++ { // 開啓5個協程
go func(i int, ctx context.Context) { // 每一個協程裏面都有一個死循環,去等待取消消息
for {
if isCancelled(ctx) {
break
}
time.Sleep(time.Millisecond * 5) // 模擬延遲5毫秒
}
fmt.Println(i, "Cancelled") // 說明退出了死循環,打印日誌
}(i, ctx)
}
cancel() // 取消ctx
time.Sleep(time.Second * 1)
}
複製代碼
場景:在多協程的狀況下,保證某段代碼只執行一次。
type Singleton struct {
data string
}
var singleInstance *Singleton
var once sync.Once
func GetSingletonObj() *Singleton {
once.Do(func() {
fmt.Println("Create Obj")
singleInstance = new(Singleton)
})
return singleInstance
}
func TestGetSingletonObj(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
obj := GetSingletonObj()
fmt.Printf("%p\n", obj)
wg.Done()
}()
}
wg.Wait()
}
複製代碼
利用channel管道通訊的機制,咱們能夠再任何一個協程完成任務時,就給對象發消息。
func runTask(id int) string {
time.Sleep(10 * time.Millisecond)
return fmt.Sprintf("The result is from %d", id)
}
func firstResponse() string {
numOfRunner := 10
ch := make(chan string, numOfRunner) // 建立bufferChannel。(若是用channel會致使協程泄漏,剩下9個channel會一直阻塞在系統中。)
for i := 0; i < numOfRunner; i++ { // 開了10個協程
go func(i int) {
ret := runTask(i) // 每一個協程去執行任務
ch <- ret
}(i)
}
return <-ch // 返回channel裏的第一個Response。(由於channel是一個先進先出的管道)
}
func TestFirstResponse(t *testing.T) {
t.Log(firstResponse()) // 發現每次運行返回的都不同,會根據協程完成任務的一個順序返回。
}
複製代碼
剛纔,咱們介紹了first response,接下來咱們看一下all response該怎麼作。思路是同樣的,只要接收到全部channel
返回的數據,再返回便可。
func runTask(id int) string {
time.Sleep(10 * time.Millisecond)
return fmt.Sprintf("The result is from %d", id)
}
func allResponse() string {
numOfRunner := 10
ch := make(chan string, numOfRunner) // 建立bufferChannel。
for i := 0; i < numOfRunner; i++ { // 開了10個協程
go func(i int) {
ret := runTask(i) // 每一個協程去執行任務
ch <- ret
}(i)
}
finalRet := ""
for j := 0; j < numOfRunner; j++ {
finalRet += <-ch + "\n"
}
return finalRet // 返回channel裏的全部的Response。(由於channel是一個先進先出的管道)
}
func TestAllResponse(t *testing.T) {
t.Log("Before:", runtime.NumGoroutine()) // 打印一下當前的協程數量
t.Log(allResponse()) // 發現每次運行返回的都不同,會根據協程完成任務的一個順序返回。
t.Log("After:", runtime.NumGoroutine()) // 再打印一下當前的協程數量
}
複製代碼
咱們能夠用buffer channel的管道特性來作一個對象池。
Demo:
type ReusableObj struct {
}
type ObjPool struct {
bufChan chan *ReusableObj // 用於緩衝可重用對象
}
// 生產指定數量對象的對象池
func NewObjPool(numOfObj int) *ObjPool {
ObjPool := ObjPool{}
ObjPool.bufChan = make(chan *ReusableObj, numOfObj)
for i := 0; i < numOfObj; i++ {
ObjPool.bufChan <- &ReusableObj{}
}
return &ObjPool
}
// 從對象池中得到對象
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
select {
case ret := <-p.bufChan:
return ret, nil
case <-time.After(timeout): // 超時控制
return nil, errors.New("time out")
}
}
// 釋放對象池裏的對象
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
select {
case p.bufChan <- obj:
return nil
default:
return errors.New("overflow")
}
}
func TestObjPool(t *testing.T) {
pool := NewObjPool(10) // 生產一個10容量大小的對象池
for i := 0; i < 10; i++ {
if v, err := pool.GetObj(time.Second * 1); err != nil { // 獲取obj
t.Error(err)
} else {
fmt.Printf("%T\n", v) // 獲取成功,答應日誌。
if err := pool.ReleaseObj(v); err != nil { // 釋放obj
t.Error(err)
}
}
}
fmt.Println("Done.")
}
複製代碼
咱們能夠經過sync.pool作對象緩存(建立、獲取、緩存的策略)。
首先,嘗試從私有對象獲取。
其次,若是私有對象不存在,就嘗試從當前Process
的共享池獲取。
若是當前Process
的共享池是空的,就嘗試從其餘Process
的共享池獲取。
若是全部Process
的共享池都是空的,就從sync.pool
指定的New
方法中「New」
一個新的對象返回。
每一次GC
(垃圾回收)都會清除sync.pool的緩存對象。
所以,對象緩存的有效期爲下一次GC
以前。
基本使用:
func TestSyncPool(t *testing.T) {
pool := &sync.Pool{
New: func() interface{} { // 建立一個新的對象
fmt.Println("Create a new object.")
return 100
},
}
v := pool.Get().(int) // 獲取對象
fmt.Println(v)
pool.Put(3) // 放回對象
// runtime.GC() // 觸發GC,會清除sync.pool中緩存的對象
v1, _ := pool.Get().(int)
fmt.Println(v1)
}
複製代碼
多協程下的使用:
func TestSyncPoolInMultiGroutine(t *testing.T) {
pool := &sync.Pool{
New: func() interface{} {
fmt.Println("Create a new object.")
return 10
},
}
pool.Put(100)
pool.Put(100)
pool.Put(100)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {// 建立10個協程
wg.Add(1)
go func(id int) {
fmt.Println(pool.Get()) // 獲取對象
wg.Done()
}(i)
}
wg.Wait()
}
複製代碼
優勢:經過sync.pool
下降複雜對象的建立和GC代價。
問題:sync.pool
會被GC回收,而且在併發使用中須要考慮加鎖。所以,在程序中要作好取捨。(考慮是建立一個對象的代價大?仍是用sync.pool加鎖緩存複用的代價大?)
PS:另附上,分享連接:《Go語言從入門到實戰》 祝你們學有所成,工做順利。謝謝!