Go語言基礎(五)—— 併發編程

前言:
本專題用於記錄本身(647)在Go語言方向的學習和積累。 系列內容比較偏基礎,推薦給想要入門Go語言開發者們閱讀。

目錄以下:
Go語言基礎(一)—— 簡介、環境配置、HelloWorld
Go語言基礎(二)—— 基本經常使用語法
Go語言基礎(三)—— 面向對象編程
Go語言基礎(四)—— 優質的容錯處理
Go語言基礎(五)—— 併發編程
Go語言基礎(六)—— 測試、反射、Unsafe
Go語言基礎(七)—— 架構 & 常見任務
Go語言基礎(八)—— 性能調優golang


本篇將介紹以下內容:
1.協程機制(Groutine
2.共享內存併發機制(協程安全)
3.CSP併發機制(channel
4.多路選擇和超時控制(select
5.channel的關閉和廣播(channel
6.任務的取消
7.Context與關聯任務取消
8.常見併發任務(實戰)編程

1、協程機制

相信你們確定都知道 「線程」「進程」 的概念。vim

而在Go語言中,「協程」能夠理解爲更輕量級的線程。 經過調度「協程」就能夠把系統Kernel的效率發揮到極致。緩存

經過一張表格,咱們來對比一下協程與線程的區別。安全

  • Thread vs. Groutine:
\ 默認棧大小(建立時) KSE對應關係(Kernel Space Entity)
線程 Thread 1M 1 : 1
協程 Groutine 2K M : N

協程vs.線程的優點在於:網絡

  • 線程之間的切換會牽扯到內核中系統線程(kernel entity)的切換,這會形成較大的成本。
  • 而多個協程在同一個系統線程(kernel entity)下切換,就能下降切換系統線程(kernel entity)的成本。(如上圖所示)

協程的使用:

語法:go + func架構

func TestGroutine(t *testing.T) {
	for i := 0; i < 10; i++ {
		go func(i int) {
			fmt.Println(i) // 正確案例,值傳遞。各個協程無競爭關係。
		}(i)

		// go func() {
		// fmt.Println(i) // 錯誤案例,共享變量。各個協程有競爭關係
		// }()
	}
	time.Sleep(time.Millisecond * 50)
}
複製代碼

2、共享內存併發機制(協程安全)

說到協程安全,咱們第一個會想到的就是加鎖(lock)。 經過加鎖來保證協程安全。併發

在Go語言中也是如此,咱們來看個例子。異步

  • 協程併發,致使的協程不安全:
// 協程不安全demo
func TestCounterThreadUnsafe(t *testing.T) {
	counter := 0
	for i := 0; i < 5000; i++ {
		go func() {
			counter++
		}()
	}
	time.Sleep(1 * time.Second)
	t.Logf("counter = %d", counter)
}
複製代碼

結果以下:函數

=== RUN   TestCounterThreadUnsafe
--- PASS: TestCounterThreadUnsafe (1.00s)
    share_mem_test.go:18: counter = 4765
複製代碼

這時就會發現,計算錯誤,由於併發致使了漏值。

  • 解決方式一: 普通加鎖,並加延遲等待協程執行完畢(不推薦)
// 協程等待demo(停1秒,不推薦)
func TestCounterThreadSafe(t *testing.T) {
	var mut sync.Mutex
	counter := 0
	for i := 0; i < 5000; i++ {
		go func() {
			defer func() {
				mut.Unlock() //函數調用完成後:解鎖,保證協程安全
			}()
			mut.Lock() // 函數將要調用前:加鎖,保證協程安全
			counter++
		}()
	}
	time.Sleep(1 * time.Second) // 等待一秒,等協程所有執行完
	t.Logf("counter = %d", counter)
}
複製代碼

結果以下:

=== RUN   TestCounterThreadSafe
--- PASS: TestCounterThreadSafe (1.01s)
    share_mem_test.go:35: counter = 5000
複製代碼

結果正確,可是有一個問題。由於這裏有個1秒的延遲等待,保證協程運行完畢再調用結果。所以,有沒有更好的處理方式呢?接下來咱們再優化一下。

  • 解決方式二: 推薦! 使用同步等待隊列(WaitGroup)保證順序執行。
// 協程安全Demo
func TestCounterWaitGroup(t *testing.T) {
	var mut sync.Mutex    // 互斥鎖
	var wg sync.WaitGroup // 等待隊列
	counter := 0
	for i := 0; i < 5000; i++ {
		wg.Add(1) // 加個任務
		go func() {
			defer func() {
				mut.Unlock() //函數調用完成後:解鎖,保證協程安全
			}()
			mut.Lock() // 函數將要調用前:加鎖,保證協程安全
			counter++
			wg.Done() // 作完任務
		}()
	}
	wg.Wait() //等待全部任務執行完畢
	t.Logf("counter = %d", counter)
}
複製代碼

運行結果以下:

=== RUN   TestCounterWaitGroup
--- PASS: TestCounterWaitGroup (0.00s)
    share_mem_test.go:55: counter = 5000
複製代碼

這樣的話,能夠看出:互斥鎖Mutex和等待隊列WaitGroup不只保證了協程的安全,還避免了提早打印結果。(✔️)


3、CSP併發機制

1. CSP

CSP(Communicating sequential processes):通訊順序進程(管道通訊)。 簡單來講,CSP是經過Channel(管道)來通訊的。

Go 中的Channel(管道)有容量限制而且獨立於處理Groutine(協程)。

2. Channel

Go中常見的Channel有兩種,分別對應爲ChannelBuffer Channel

  • 第一種:Channel(無緩衝)

首先,發送者與接受者必須同時站在Channel的兩端才進行交互。 若是一方不在,另外一方就會阻塞在一端,直到兩端都在才進行交互。

建立語法:make(chan [type])

retChannel := make(chan string) // 建立無緩衝channel,並指明channel中的數據爲string,雙端等待
複製代碼

輸入語法:channel <-

channel <- object // channel輸入
複製代碼

獲取語法:<- channel

object <- channel // channel輸出
複製代碼
  • 第二種:Buffer Channel(有緩衝)

這是一種稍微高級一點的Channel方式,(更加鬆耦合)。

首先,給Channel設置一個容量大小,而且不要求發送者與接受者同時站在兩端。 而後,發送者會以Buffer的形式,不斷往Channel裏發送消息。 直到Channel的容量滿了才阻塞。 這時,只要接受方接收了消息(即Channel有剩餘容量了),發送者就會繼續發送消息。

建立語法:make(chan [type], Int)

retChannel := make(chan string, 1) // 建立有緩衝channel,並指明channel中的數據爲string
複製代碼

輸入語法:channel <-

channel <- object // channel輸入
複製代碼

獲取語法:<- channel

object <- channel // channel輸出
複製代碼

Demo:模擬了一個網絡請求的方法調用過程,經過Channel來控制當前協程在網絡請求的等待過程當中,去執行別的任務。

// 模擬網絡請求
func serviceTask() string {
	fmt.Println("- start working on service task.")
	time.Sleep(time.Millisecond * 50)
	return "- service task is Done."
}

// 別的任務
func otherTask() {
	fmt.Println("start working on something else.")
	time.Sleep(time.Millisecond * 100)
	fmt.Println("other task is Done.")
}

// csp異步管道
func AsyncService() chan string {
	retChannel := make(chan string) // 無緩衝channel,建立並指明channel中的數據爲string,雙端等待
	// retChannel := make(chan string, 1) // 有緩衝channel,建立並指明channel中的數據爲string
	go func() {
		ret := serviceTask()
		fmt.Println("returned result.")
		retChannel <- ret // channel輸入
		fmt.Println("service exited.")
	}()
	return retChannel
}

func TestAsyncService(t *testing.T) {
	retCh := AsyncService()
	otherTask()
	fmt.Println(<-retCh) // channel輸出
	time.Sleep(time.Second * 1)
}
複製代碼

4、多路選擇和超時控制

使用select關鍵字,完成「多路選擇」與「超時控制」。

  • 多路選擇: 當返回的channel可能有多個時,可使用select來處理多路的響應事件。

注意:這裏與switch有點像,可是要注意的是,它並非順序判斷的。也就是若是channel1channel2同時知足時,可能走的是channel1、也多是channel2,並不像switch同樣作順序的判斷。

Demo:

select {
	case ret := <-channel1: 
		t.Log(ret)
	case ret:= <- channel2:
		t.Log(ret)
	case default:
		t.Error("No one returned.")
	}
複製代碼
  • 超時控制:

同時,咱們也能夠設置一個超時等待的一個分路,當channel超時還未返回時,能夠執行相應的代碼。

Demo:

select {
	case ret := <-AsyncService(): //正常返回
		t.Log(ret)
	case <-time.After(time.Millisecond * 100): // 超時等待
		t.Error("time out")
	}
複製代碼

5、channel的關閉和廣播

要點以下:

  1. 向已經closechannel發消息,會致使程序panic
  2. v, ok <- channel。 其中,okbool值, 若ok==true時,表示channel處於open狀態。 若ok==false時,表示channel處於close狀態。
  3. 全部channel接收者在channel關閉時,都會馬上從阻塞等待中返回,且ok值爲false。(PS:廣播機制,一般被利用向多個訂閱者同時發送信號。如,退出信號。)

Demo:

// 消息生產者
func dataProducer(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for i := 0; i < 10; i++ {
			ch <- i
		}

		fmt.Println("channel close.")
		close(ch) // 關閉channel

		wg.Done()
	}()
}

// 消息接收者
func dataReceiver(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for {
			if data, ok := <-ch; ok { // 有消息就打印,直到channel被close。
				fmt.Println(data)
			} else {
				fmt.Println("Receiver close.")
				break // channel被close
			}
		}
		wg.Done()
	}()
}

func TestCloseChannel(t *testing.T) {
	var wg sync.WaitGroup
	ch := make(chan int)
	wg.Add(1)
	dataProducer(ch, &wg) // 開啓生產者
	wg.Add(1)
	dataReceiver(ch, &wg) // 開啓消費者
	wg.Wait()
}
複製代碼

6、任務的取消

經過上面的close channel(廣播機制),咱們能夠延伸一下,經過close channel通知全部channel取消當前的任務。

Demo以下:

func isCancelled(cancelChan chan struct{}) bool {
	select {
	case <-cancelChan:
		return true
	default:
		return false
	}
}

// 只能取消單個channel
func cancel_1(cancelChan chan struct{}) {
	cancelChan <- struct{}{}
}

// 全部channel所有取消
func cancel_2(cancelChan chan struct{}) {
	close(cancelChan)
}

func TestCancel(t *testing.T) {
	cancelChan := make(chan struct{}, 0) // 建立了一個channal,經過它來控制事件取消
	for i := 0; i < 5; i++ {             // 開啓5個協程
		go func(i int, chanclCh chan struct{}) { // 每一個協程裏面都有一個死循環,去等待取消消息
			for {
				if isCancelled(cancelChan) {
					break
				}
				time.Sleep(time.Millisecond * 5) // 模擬延遲5毫秒
			}
			fmt.Println(i, "Cancelled") // 說明退出了死循環,打印日誌
		}(i, cancelChan)
	}
	cancel_2(cancelChan) // 通知全部channel關閉。
	time.Sleep(time.Second * 1)
}
複製代碼

7、Context與關聯任務取消

剛纔咱們經過close channel來取消任務,但會有些問題。 好比,當一個任務被取消後,它所關聯的子任務也應該被當即取消。

爲了解決這個問題,go 1.9.0以後,golang加入了context,來保證關聯任務的取消。

1. Context

context就是用於管理相關任務的上下文,包含了共享值的傳遞,超時,取消通知。

結構體以下:

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}
複製代碼
  1. Deadline會返回一個超時時間,Goroutine得到了超時時間後,例如能夠對某些io操做設定超時時間。
  2. Done方法返回一個信道(channel),當Context被撤銷或過時時,該信道是關閉的,即它是一個表示Context是否已關閉的信號。
  3. Done信道關閉後,Err方法代表Context被撤的緣由。
  4. Value可讓Goroutine共享一些數據,固然得到數據是協程安全的。但使用這些數據的時候要注意同步,好比返回了一個map,而這個map的讀寫則要加鎖。

要點:

  • 根Context:經過context.Background()建立。
  • 子Context:經過context.WithCancel(parentContext)建立。
  • 當前Context被取消時,基於它的子context都會被取消。
  • 接收取消通知: <-ctx.Done

2. 關聯任務取消

咱們把剛纔的例子稍加調整,經過context來取消全部關聯的任務。

  • 首先,建立一個context
ctx, cancel := context.WithCancel(context.Background()) // 建立一個子context
複製代碼
  • 編寫一個取消方法,把context做爲參數。
func isCancelled(ctx context.Context) bool {
    select {
    case <-ctx.Done():
        return true
    default:
        return false
    }
}
複製代碼
  • 開五個協程死循環,每一個協程裏面都有一個死循環,等待取消任務消息。再調用cancel方法。
for i := 0; i < 5; i++ {                                // 開啓5個協程
        go func(i int, ctx context.Context) { // 每一個協程裏面都有一個死循環,去等待取消消息
            for {
                if isCancelled(ctx) {
                    break
                }
                time.Sleep(time.Millisecond * 5) // 模擬延遲5毫秒
            }
            fmt.Println(i, "Cancelled") // 說明退出了死循環,打印日誌
        }(i, ctx)
    }
    cancel() // 取消ctx
複製代碼

完整示例代碼以下:

func isCancelled(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

func TestCancel(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background()) // 建立一個子context
	for i := 0; i < 5; i++ {                                // 開啓5個協程
		go func(i int, ctx context.Context) { // 每一個協程裏面都有一個死循環,去等待取消消息
			for {
				if isCancelled(ctx) {
					break
				}
				time.Sleep(time.Millisecond * 5) // 模擬延遲5毫秒
			}
			fmt.Println(i, "Cancelled") // 說明退出了死循環,打印日誌
		}(i, ctx)
	}
	cancel() // 取消ctx
	time.Sleep(time.Second * 1)
}
複製代碼

8、常見併發任務(實戰)

1. 只執行一次(單例模式)

場景:在多協程的狀況下,保證某段代碼只執行一次。

type Singleton struct {
	data string
}

var singleInstance *Singleton
var once sync.Once

func GetSingletonObj() *Singleton {
	once.Do(func() {
		fmt.Println("Create Obj")
		singleInstance = new(Singleton)
	})
	return singleInstance
}

func TestGetSingletonObj(t *testing.T) {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			obj := GetSingletonObj()
			fmt.Printf("%p\n", obj)
			wg.Done()
		}()
	}
	wg.Wait()
}
複製代碼

2. 僅需任意任務完成

利用channel管道通訊的機制,咱們能夠再任何一個協程完成任務時,就給對象發消息。

func runTask(id int) string {
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("The result is from %d", id)
}

func firstResponse() string {
	numOfRunner := 10
	ch := make(chan string, numOfRunner) // 建立bufferChannel。(若是用channel會致使協程泄漏,剩下9個channel會一直阻塞在系統中。)
	for i := 0; i < numOfRunner; i++ { // 開了10個協程
		go func(i int) {
			ret := runTask(i) // 每一個協程去執行任務
			ch <- ret
		}(i)
	}
	return <-ch // 返回channel裏的第一個Response。(由於channel是一個先進先出的管道)
}

func TestFirstResponse(t *testing.T) {
	t.Log(firstResponse()) // 發現每次運行返回的都不同,會根據協程完成任務的一個順序返回。
}
複製代碼

3. 全部任務完成

剛纔,咱們介紹了first response,接下來咱們看一下all response該怎麼作。思路是同樣的,只要接收到全部channel返回的數據,再返回便可。

func runTask(id int) string {
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("The result is from %d", id)
}

func allResponse() string {
	numOfRunner := 10
	ch := make(chan string, numOfRunner) // 建立bufferChannel。
	for i := 0; i < numOfRunner; i++ {   // 開了10個協程
		go func(i int) {
			ret := runTask(i) // 每一個協程去執行任務
			ch <- ret
		}(i)
	}
	finalRet := ""
	for j := 0; j < numOfRunner; j++ {
		finalRet += <-ch + "\n"
	}
	return finalRet // 返回channel裏的全部的Response。(由於channel是一個先進先出的管道)
}

func TestAllResponse(t *testing.T) {
	t.Log("Before:", runtime.NumGoroutine()) // 打印一下當前的協程數量
	t.Log(allResponse())                     // 發現每次運行返回的都不同,會根據協程完成任務的一個順序返回。
	t.Log("After:", runtime.NumGoroutine()) // 再打印一下當前的協程數量
}
複製代碼

4. 對象池

咱們能夠用buffer channel的管道特性來作一個對象池。

Demo:

type ReusableObj struct {
}

type ObjPool struct {
	bufChan chan *ReusableObj // 用於緩衝可重用對象
}

// 生產指定數量對象的對象池
func NewObjPool(numOfObj int) *ObjPool {
	ObjPool := ObjPool{}
	ObjPool.bufChan = make(chan *ReusableObj, numOfObj)
	for i := 0; i < numOfObj; i++ {
		ObjPool.bufChan <- &ReusableObj{}
	}
	return &ObjPool
}

// 從對象池中得到對象
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
	select {
	case ret := <-p.bufChan:
		return ret, nil
	case <-time.After(timeout): // 超時控制
		return nil, errors.New("time out")
	}
}

// 釋放對象池裏的對象
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
	select {
	case p.bufChan <- obj:
		return nil
	default:
		return errors.New("overflow")
	}
}

func TestObjPool(t *testing.T) {
	pool := NewObjPool(10) // 生產一個10容量大小的對象池
	for i := 0; i < 10; i++ {
		if v, err := pool.GetObj(time.Second * 1); err != nil { // 獲取obj
			t.Error(err)
		} else {
			fmt.Printf("%T\n", v)                      // 獲取成功,答應日誌。
			if err := pool.ReleaseObj(v); err != nil { // 釋放obj
				t.Error(err)
			}
		}
	}
	fmt.Println("Done.")
}
複製代碼

5. sync.pool對象緩存

咱們能夠經過sync.pool作對象緩存(建立、獲取、緩存的策略)。

對象獲取策略:
  1. 首先,嘗試從私有對象獲取。

  2. 其次,若是私有對象不存在,就嘗試從當前Process的共享池獲取。

  3. 若是當前Process的共享池是空的,就嘗試從其餘Process的共享池獲取。

  4. 若是全部Process的共享池都是空的,就從sync.pool指定的New方法中「New」一個新的對象返回。

sync.pool緩存對象的生命週期:
  • 每一次GC(垃圾回收)都會清除sync.pool的緩存對象。

  • 所以,對象緩存的有效期爲下一次GC以前。

基本使用:

func TestSyncPool(t *testing.T) {
	pool := &sync.Pool{
		New: func() interface{} { // 建立一個新的對象
			fmt.Println("Create a new object.")
			return 100
		},
	}

	v := pool.Get().(int) // 獲取對象
	fmt.Println(v)
	pool.Put(3) // 放回對象
	// runtime.GC() // 觸發GC,會清除sync.pool中緩存的對象
	v1, _ := pool.Get().(int)
	fmt.Println(v1)
}
複製代碼

多協程下的使用:

func TestSyncPoolInMultiGroutine(t *testing.T) {
	pool := &sync.Pool{
		New: func() interface{} {
			fmt.Println("Create a new object.")
			return 10
		},
	}

	pool.Put(100)
	pool.Put(100)
	pool.Put(100)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {// 建立10個協程
		wg.Add(1) 
		go func(id int) {
			fmt.Println(pool.Get()) // 獲取對象
			wg.Done() 
		}(i)
	}
	wg.Wait()
}
複製代碼
sync.pool的優勢與問題:
  • 優勢:經過sync.pool下降複雜對象的建立和GC代價。

  • 問題:sync.pool會被GC回收,而且在併發使用中須要考慮加鎖。所以,在程序中要作好取捨。(考慮是建立一個對象的代價大?仍是用sync.pool加鎖緩存複用的代價大?)


最後,本系列我是在蔡超老師的技術分享下總結、實戰完成的, 感謝蔡超老師的技術分享

PS:另附上,分享連接:《Go語言從入門到實戰》 祝你們學有所成,工做順利。謝謝!

相關文章
相關標籤/搜索