go - 更爲安全的使用 sync.Map 組件

go 內置了協程安全的 sync 包來方便咱們同步各協程之間的執行狀態,使用起來也很是方便。安全

最近在排查解決一個線下服務的數據同步問題,review 核心代碼後,發現這麼一段流程控制代碼。併發

錯誤示例ui

package main

import (
    "log"
    "runtime"
    "sync"
)

func main() {
    // 可並行也是重點,生產場景沒幾個單核的吧?? 
    runtime.GOMAXPROCS(runtime.NumCPU())
    waitGrp := &sync.WaitGroup{}
    waitGrp.Add(1)

    syncTaskProcessMap := &sync.Map{}
    for i := 0; i < 100; i++ {
        syncTaskProcessMap.Store(i, i)
    }

    for j := 0; j < 100; j++ {
        go func(j int) {
            // 協程可能並行搶佔一輪開始
            syncTaskProcessMap.Delete(j)
            // 協程可能並行搶佔一輪結束
            // 在當前協程 Delete 後 Range 前 又被其餘協程 Delete 操做了
            
            syncTaskProcessCount := 0
            syncTaskProcessMap.Range(func(key, value interface{}) bool {
                syncTaskProcessCount++
                return true
            })
            
            if syncTaskProcessCount == 0 {
                log.Println(GetGoroutineID(), "syncTaskProcessMap empty, start syncOnline", syncTaskProcessCount)
            }
        }(j)
    }
    
    waitGrp.Wait()
}

func GetGoroutineID() uint64 {
    b := make([]byte, 64)
    runtime.Stack(b, false)
    b = bytes.TrimPrefix(b, []byte("goroutine "))
    b = b[:bytes.IndexByte(b, ' ')]
    n, _ := strconv.ParseUint(string(b), 10, 64)
    return n
}

代碼的本意,是在 i 個協程併發的執行完成後,啓動一次 nextProcess 任務,代碼使用了 sync.Map 來維護和同步 i 個協程的執行進度,防止多協程併發形成的 map 不安全讀寫。當最後一個協程執行完畢,sync.Map 爲空,啓動一次 nextProcess。但能讀到狀態值 syncTaskProcessCount0 的協程,只會是 最後一個 執行完成的協程嗎?日誌

sync.Map::Store\Load\Delete\Range 都是協程安全的操做,在調用期間只會被當前 協程 搶佔訪問,但它們的組合操做並非 獨佔 的,上面的代碼認爲,Delete && Range 兩項操做期間 不會 夾帶其餘協程對 sync.Map 讀寫操做,致使能讀到 syncTaskProcessCount0 的協程可能不止最後一個執行完畢的。code

多執行幾回,可能獲得一下輸出:協程

sqrtcat:demo$ go run test.go 
2021/04/20 14:30:27 114 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt
sqrtcat:demo$ go run test.go 
2021/04/20 14:30:30 117 syncTaskProcessMap empty, start syncOnline 0
2021/04/20 14:30:30 116 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt
sqrtcat:demo$ go run test.go 
2021/04/20 14:30:33 117 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt
sqrtcat:demo$ go run test.go 
2021/04/20 14:30:35 117 syncTaskProcessMap empty, start syncOnline 0
2021/04/20 14:30:35 118 syncTaskProcessMap empty, start syncOnline 0
2021/04/20 14:30:35 115 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt
sqrtcat:demo$ go run test.go 
2021/04/20 14:30:38 131 syncTaskProcessMap empty, start syncOnline 0
2021/04/20 14:30:38 132 syncTaskProcessMap empty, start syncOnline 0
^Csignal: interrupt

能夠看到,syncTaskProcessMap empty 的狀態被多個協程讀到了。
G117,G118,G115 在多核場景下肯能 並行 執行。隊列

  1. SyncMapG117 搶佔,Delete 後 2,SyncMap 被釋放。
  2. SyncMapG118 搶佔,Delete 後 1,SyncMap 被釋放。
  3. SyncMapG115 搶佔,Delete 後 0,SyncMap 被釋放。
  4. 這時的 syncMap 已然爲空,G11七、G11八、G115 繼續 Range 獲得的 syncTaskProcessCount 都爲 0,這樣就致使了代碼執行與指望不一樣了。

因此,雖然 sync.Map 的單一操做是自動加鎖的排他操做,但組合在一塊兒就不是了,咱們要自行在 code section 上加鎖。同步

正確示例string

package main

import (
    "log"
    "runtime"
    "sync"
)

// 錯誤代碼示例
func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    
    syncMutex := &sync.Mutex{}
    
    waitGrp := &sync.WaitGroup{}
    waitGrp.Add(1)

    syncTaskProcessMap := &sync.Map{}
    for i := 0; i < 100; i++ {
        syncTaskProcessMap.Store(i, i)
    }

    for j := 0; j < 100; j++ {
        go func(j int) {
            // 保證協程對 syncMap 的組合操做也是獨佔的
            // 將可能的並行操做順序化
            syncMutex.Lock()
            defer syncMutex.Unlock()
            
            syncTaskProcessMap.Delete(j)
            
            syncTaskProcessCount := 0
            syncTaskProcessMap.Range(func(key, value interface{}) bool {
                syncTaskProcessCount++
                return true
            })
            
            if syncTaskProcessCount == 0 {
                log.Println(GetGoroutineID(), "syncTaskProcessMap empty, start syncOnline", syncTaskProcessCount)
            }
        }(j)
    }
    
    waitGrp.Wait()
}

func GetGoroutineID() uint64 {
    b := make([]byte, 64)
    runtime.Stack(b, false)
    b = bytes.TrimPrefix(b, []byte("goroutine "))
    b = b[:bytes.IndexByte(b, ' ')]
    n, _ := strconv.ParseUint(string(b), 10, 64)
    return n
}

協程並行it

多核 的平臺上,分配在不一樣 時間片隊列 上的協程是能夠 並行 執行的,相同 時間片隊列 上的協程是 併發 執行的

func main() {
    // 這行代碼將會影響子協程裏的日誌輸出量
    runtime.GOMAXPROCS(runtime.NumCPU())
    waitChan := make(chan int)

    go func() {
        defer func() {
            log.Println(GetGoroutineID(), "sub defer")
        }()
        log.Println(GetGoroutineID(), "sub start")
        waitChan <- 1
        log.Println(GetGoroutineID(), "sub finish")
    }()

    log.Println(GetGoroutineID(), "main start")
    log.Println(<-waitChan)
    log.Println(GetGoroutineID(), "main finish")
}
  1. 若是 mainsub 分配在了同一個 cpu 上 或只有一個 cpumain startwaitChan 讀阻塞了 mainsub 開始執行,sub start,寫入 waitChan,後續也沒有觸發協程切換的代碼段,繼續執行 sub finish sub defer 退出,交出 時間片main 繼續執行 main finish
  2. 若是 mainsub 分配在了不一樣 cpu 上,當 waitChan 阻塞了 cpu1 上的 main,而 subcpu2 執行了 寫入waitChan 後,main 可能會被 cpu1 當即繼續執行,主協程 main 退出,sub 也會被終止執行,後面的日誌打印可能就執行不到了。
sqrtcat:demo$ go run test.go 
2021/04/20 15:26:42 5 sub start
2021/04/20 15:26:42 1 main start
2021/04/20 15:26:42 1
2021/04/20 15:26:42 1 main finish
2021/04/20 15:26:42 5 sub finish
相關文章
相關標籤/搜索