併發 - Go 語言學習筆記

概述

併發 (concurrency) 是指同時管理不少事情,這些事情可能只作了一半就被暫停去作別的事情了。安全

Go 語言裏的併發指的是能讓某個函數獨立於其它函數運行的能力。當一個函數建立爲 goroutine 時,Go 會將其視爲一個獨立的工做單元。這個單元會被調度到可用的邏輯處理器上執行。bash

Go 語言只需經過 go 關鍵字來開啓 goroutine 便可實現併發。服務器

Goroutine

每個併發的執行單元稱爲一個goroutine,goroutine 能夠看做是輕量級線程,由於它比線程更小,十幾個 goroutine 可能體如今底層就是五六個線程。網絡

普通函數建立 goroutine

語法格式:併發

go 函數名( 參數列表 )
複製代碼
  • 函數名:要調用的函數名。
  • 參數列表:調用函數須要傳入的參數。

Go 容許使用 go 語句開啓一個新的運行期線程, 即 goroutine,以一個不一樣的、新建立的 goroutine 來執行一個函數。 同一個程序中的全部 goroutine 共享同一個地址空間。
例如:tcp

package main

import (
    "fmt"
    "time"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    go say("world")
    say("hello")
}
複製代碼

執行以上代碼,你會看到輸出的 hello 和 world 是沒有固定前後順序。由於它們是兩個 goroutine 在執行:函數

world
hello
world
hello
world
hello
world
hello
world
hello
複製代碼

匿名函數建立 goroutine

語法格式:ui

go func( 參數列表 ) {
    函數體
}( 調用參數列表 )
複製代碼
  • 參數列表:函數體內的參數變量列表。
  • 函數體:匿名函數的代碼。
  • 調用參數列表:啓動 goroutine 時,須要向匿名函數傳遞的調用參數。

併發的使用

1. 能夠同時處理多個客戶端請求的網絡時鐘

服務端程序 NetClock.go:atom

package main

import (
	"io"
	"log"
	"net"
	"time"
)

/*
實戰案例:可同時處理多個客戶端請求的網絡時鐘

TCP 編寫基於TCP的服務端和客戶端
*/

func main()  {
	listener, err := net.Listen("tcp", "localhost: 8000")
	if err != nil {
		log.Fatal(err)
		return
	}
	for {
		// 等待客戶端請求
		conn, err := listener.Accept()
		if err != nil {
			log.Println(err)
			continue
		}

		// 向客戶端發送服務端的時間
		go handleConn(conn)
	}
}

func handleConn(c net.Conn) {
	defer c.Close()
	for {
		// 將服務端的時間發送給客戶端
		_, err := io.WriteString(c,time.Now().Format("15:04:05\n"))
		if err != nil {
			return
		}
		time.Sleep(1 * time.Second)
	}
}
複製代碼

客戶端程序 NetClockClient.go:spa

package main

import (
	"io"
	"log"
	"net"
	"os"
)

// 網絡時鐘客戶端
func main()  {
	conn, err := net.Dial("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
		return
	}
	defer conn.Close()
	copy(os.Stdout, conn)

}

func copy(dst io.Writer, src io.Reader)  {
	if _,err := io.Copy(dst,src);err != nil {
		log.Fatal(err)
	}
}
複製代碼

執行結果:

17:32:25
17:32:26
17:32:27
17:32:28
17:32:29
...

複製代碼

2. 同時響應多長請求的Echo服務器

服務端程序 MultiEcho.go

package main

import (
	"bufio"
	"fmt"
	"log"
	"net"
	"strings"
	"time"
)

func main()  {
	listener, err := net.Listen("tcp", "localhost:8888")
	if err != nil {
		log.Fatal(err)
		return
	}
	for {
		// 監聽客戶端請求
		conn, err := listener.Accept()
		if err != nil {
			log.Print(err)
			continue
		}
		go handleConn(conn)
	}
}


func handleConn(c net.Conn) {
	input := bufio.NewScanner(c)
	for input.Scan() {
		// 併發處理同一個客戶端屢次請求
		go echo(c, input.Text(), 2*time.Second)
	}
	c.Close()
}

func echo(c net.Conn, shout string, delay time.Duration)  {
	// 將客戶端發過來的數據轉換成大寫
	fmt.Fprintln(c, "\t", strings.ToUpper(shout))
	time.Sleep(delay)
	fmt.Fprintf(c, "\t", shout)
	time.Sleep(delay)
	fmt.Fprintln(c, "\t", strings.ToLower(shout))
}

複製代碼

3. 競爭狀態

若是兩個或者多個 goroutine 在沒有互相同步的狀況下,訪問某個共享的資源,並試圖同時讀和寫這個資源,就處於相互競爭的狀態,這種狀況被稱做競爭狀態(race condition)。競爭狀態的存在是讓併發程序變得複雜的地方,十分容易引發潛在問題。對一個共享資源的讀和寫操做必須是原子化的,換句話說,同一時刻只能有一個 goroutine 對共享資源進行讀和寫操做。

如下是包含競爭狀態的示例程序:

// 這個示例程序展現如何在程序裏形成競爭狀態
// 實際上不但願出現這種狀況
package main

import (
	"fmt"
	"runtime"
	"sync"
)

var (
	// counter 是全部 goroutine 都要增長其值的變量
	counter int

	// wg 用來等待程序結束
	wg sync.WaitGroup
)

// main 是全部 Go 程序的入口
func main() {
	// 計數加2,表示要等待兩個 goroutine
	wg.Add(2)

	// 建立兩個 goroutine
	go incCounter(1)
	go incCounter(2)

	// 等待 goroutine 結束
	wg.Wait()
	fmt.Println("Final counter:", counter)
}

// incCounter 增長包裏 counter 變量的值
func incCounter(id int) {
	// 在函數退出時調用 Done 來通知 main 函數工做已經完成
	defer wg.Done()

	for count := 0; count < 2; count++ {
		// 捕獲 counter 的值
		value := counter

		// 當前 goroutine 從線程退出,並放回到隊列
		runtime.Gosched()

		// 增長本地 value 變量的值
		value ++

		// 將該值保存回 counter
		counter = value
	}
}
複製代碼

輸出:

Final Counter: 2
複製代碼

變量 counter 會進行4次讀和寫操做,每一個 goroutine 執行兩次。可是,程序終止時,counter 變量的值卻爲2。這是由於每一個 goroutine 都會覆蓋另外一個 goroutine的工做。這種覆蓋發生在 goroutine 切換的時候。每一個 goroutine 創造了一個 counter 變量的副本,以後就切換到另外一個 goroutine。當這個goroutine 再次運行的時候,counter 變量的值已經改變了,可是 goroutine 並無更新本身的那個副本的值,而是繼續使用這個副本的值,用這個值遞增,並存回 counter 變量,結果覆蓋了另外一個 gouroutine 完成的工做。

一種修正代碼,消除競爭狀態的辦法是,使用 Go 語言提供的鎖機制,來鎖住共享資源,從而保證 goroutine 的同步狀態。

4. 鎖住共享資源

Go 語言提供了傳統的同步 goroutine 的機制,就是對共享資源加鎖。若是須要順序訪問一個整型變量或者一段代碼,atomic 和 sync 包裏的函數提供了很好的解決方案。

原子函數可以以很底層的枷鎖機制來同步訪問整型變量和指針。

package main
// 這個示例程序展現如何在程序裏形成競爭狀態
// 實際上不但願出現這種狀況
import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

var (
	// counter 是全部 goroutine 都要增長其值的變量
	counter int64

	// wg 用來等待程序結束
	wg sync.WaitGroup
)

// main 是全部 Go 程序的入口
func main() {
	// 計數加2,表示要等待兩個 goroutine
	wg.Add(2)

	// 建立兩個 goroutine
	go incCounter(1)
	go incCounter(2)

	// 等待 goroutine 結束
	wg.Wait()
	fmt.Println("Final counter:", counter)
}

// incCounter 增長包裏 counter 變量的值
func incCounter(id int) {
	// 在函數退出時調用 Done 來通知 main 函數工做已經完成
	defer wg.Done()

	for count := 0; count < 2; count++ {
		// 安全地對 counter 加1
		atomic.AddInt64(&counter, 1)

		// 當前 goroutine 從線程退出,並放回到隊列
		runtime.Gosched()
	}
}
複製代碼

輸出:

Final Counter: 4
複製代碼

如今獲得了正確的值4。
以上程序使用了 atmoic 包的 AddInt64 函數,這個函數會同步整型值的加法,方法是強制同一時刻只能有一個 goroutine 運行並完成這個加法操做。當 goroutine 試圖去調用任何原子函數時,這些 goroutine 都會自動根據所引用的變量作同步處理。

另外兩個有用的原子函數是 LoadInt64AddInt64函數。這兩個函數提供了一種安全地讀和寫一個整型值的方式。

// 這個示例程序展現如何使用 atomic 包裏的 Store 和 Load 類函數來提供對數值類型的安全訪問

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var (
	// shutdown 是通知正在執行的 goroutine 中止工做的標誌
	shutdown int64

	// wg 用來等待程序結束
	wg sync.WaitGroup
)

// main 是全部 Go 程序的入口
func main()  {
	// 計數加2,表示要等待兩個 goroutine
	wg.Add(2)

	//建立兩個 goroutine
	go doWork("A")
	go doWork("B")

	// 給定 goroutine 執行的時間
	time.Sleep(1 * time.Second)

	// 該中止工做了, 安全地設置 shutdown 標誌
	fmt.Println("Shutdown now")
	atomic.StoreInt64(&shutdown, 1)

	// 等待 goroutine 結束
	wg.Wait()
}

// doWork 用來模擬執行工做的 goroutine
// 檢測以前的 shutdown 標誌來決定是否提早終止
func doWork(name string)  {
	// 在函數退出是調用 Done 來通知 main 函數工做已經完成
	defer wg.Done()

	for {
		fmt.Printf("Doing %s Work \n", name)
		time.Sleep(250 * time.Millisecond)

		// 要中止工做了嗎?
		if atomic.LoadInt64(&shutdown) == 1 {
			fmt.Printf("Shutting %s Down\n", name)
			break
		}
	}
}
複製代碼

輸出:

Doing A Work 
Doing B Work 
Doing B Work 
Doing A Work 
Doing B Work 
Doing A Work 
Doing B Work 
Doing A Work 
Shutdown now
Shutting B Down
Shutting A Down
複製代碼

以上程序啓動了兩個goroutine, 在各自循環的每次迭代以後,會使用 LoadInt64 來檢查 shutdown 變量的值,這個函數會安全地返回 shutdown 變量的一個副本,若是這個副本的值爲1,goroutine 就會跳出循環並終止。

main 函數使用 StoreInt64 函數來安全地修改 shutdown 變量的值。若是哪一個 doWork goroutine 試圖在 main 函數調用 StoreInt64 的同時調用 LoadInt64函數,那麼原子函數會將這些調用互相同步,保證這些操做都是安全的,不會進入競爭狀態。

5. 互斥鎖

另外一種同步訪問共享資源的方式是使用互斥鎖(mutex)。互斥鎖用於在代碼上建立一個臨界區,保證同一時間只有一個 goroutine 能夠執行這個臨界區代碼。

// 這個示例程序展現如何使用互斥鎖來定義一段須要同步訪問的代碼臨界區資源的同步訪問
package main

import (
	"fmt"
	"runtime"
	"sync"
)

var (
	// counter 是全部 goroutine 都要增長其值的變量
	counter int

	// wg 用來等待程序結束
	wg sync.WaitGroup

	// mutex 用來定義一段代碼臨界區
	mutex sync.Mutex
)

// main 是全部 Go 程序的入口
func main() {
	// 計數加2,表示要等待兩個 goroutine
	wg.Add(2)

	// 建立兩個 goroutine
	go incCounter(1)
	go incCounter(2)

	// 等待 goroutine 結束
	wg.Wait()
	fmt.Printf("Final counter: %d\n", counter)
}

// incCounter 使用互斥鎖來同步並保證安全訪問
// 增長包裏 counter 變量的值
func incCounter(id int) {
	// 在函數退出時調用 Done 來通知 main 函數工做已經完成
	defer wg.Done()

	for count := 0; count < 2; count++ {
		// 同一時刻只容許一個 goroutine 進入這個臨界區
		mutex.Lock()
		{
			// 捕獲 counter 的值
			value := counter

			// 當前 goroutine 從線程退出,並放回到隊列
			runtime.Gosched()

			// 增長本地 value 變量的值
			value ++

			// 將該值保存回 counter
			counter = value
		}
		mutex.Unlock()
		// 釋放鎖,容許其它正在等待的 goroutine 進入臨界區
	}
}
複製代碼

輸出:

Final counter: 4
複製代碼

在對 counter 變量操做的先後,分別用 Lock() 和 Unlock() 函數調用定義的臨界區裏將其保護起來,同一時刻只有一個 goroutine 能夠進入臨界區,以後,直到調用 Unlock() 函數以後,其它 goroutine 才能進入臨界區。

相關文章
相關標籤/搜索