在兩天前第一次遇到本身的程序出現死鎖, 我一直很是的當心使用鎖,瞭解死鎖致使的各類可能性,
此次的經歷讓我將來會更加當心,下面來回顧一下死鎖發生的過程與代碼演進的過程吧。html
個人程序中有一塊緩存,數據會組織好放到內存中,會根據數據源(MySQL)更新而刷新緩存,是讀多寫少的應用場景。
內存中有一個很大數據列表,緩存模塊會按數據維度進行分組,每次訪問根據維度查找到這個列表裏面的全部數據。
業務模塊拿到數據後會根據業務須要再作一次篩選,選出N個符合條件的數據(具體多少個由業務模塊的規則決定)。緩存
如下是簡化的代碼:安全
package cache import "sync" type Cache struct { lock sync.RWMutex data []int // 實際數據比這個複雜不少有不少維度 } func (c *Cache) Get() []int { c.lock.RLock() defer c.lock.RUnlock() var res []int // 篩選數據, 簡單寫一個篩選過程 for i := range c.data { if c.data[i] > 10 { res = append(res, c.data[i]) } } return res }
這個方法返回的數據會不少,可實際業務須要的數據只有幾個而已,那作一個優化吧,利用 go
的 chan
實現一個迭代生成器,每次只返回一個數據,業務端找到須要的數據後當即終止。服務器
<!-- more -->網絡
調整後的方法大體像下面這樣:app
package cache import "sync" type Cache struct { lock sync.RWMutex data []int // 實際數據比這個複雜不少有不少維度 } func (c *Cache) Get(next chan struct{}) chan int { ch := make(chan int, 1) go func() { c.lock.RLock() defer c.lock.RUnlock() defer close(ch) // 篩選數據, 簡單寫一個篩選過程 for i := range c.data { if c.data[i] > 10 { ch <- i if _, ok := <-next; !ok { return } } } }() return ch }
調用端的代碼相似下面這樣:tcp
data := make([]int, 0, 10) c := Cache{} next := make(chan struct{}) for i := range c.Get(next) { data = append(data, i) if len(data) >= 10 { close(next) break } next <- struct{}{} }
這樣調整後查看程序的內存分配顯著下降,並且平安無事在生產環境運行了半個月^_^,固然截止當前還不會出現死鎖的狀況。
有一天業務調整了,在 cache
模塊有另一個方法,公用這個鎖(實際我緩存模塊爲了統一,都使用一個鎖,方便管理),下面的代碼也寫到這個 cache
組件裏面。ide
如下代碼只增長了改變的部分,....
保持原來的代碼不變。大數據
package cache import "sync" type Cache struct { .... x int } func (c *Cache) XX(i int) int{ c.lock.RLock() defer c.lock.RUnlock() if i >c.x { return i } return 0 } ....
添加一個方法怎麼就致使死鎖了呢,主要是調用端的業務代碼也發生變化了,更改以下:優化
data := make([]int, 0, 10) c := Cache{} next := make(chan struct{}) for i := range c.Get(next) { data = append(data, i) if c.XX(i) != i { // 在這裏調用了緩存模塊的另外一個方法 close(next) break } next <- struct{}{} }
修改後的代碼上線存活了5天就掛了,實際是當時業務訂單需求不多,只是有不少流量請求,並無頻繁訪問這個方法,否者會在極短的時間致使死鎖,
經過這塊簡化的代碼,也很難分析出會致使死鎖,真實的業務代碼不少,並且調用關係比較複雜,咱們經過代碼審覈並無發現任何問題。
上線5天后忽然接到服務沒法響應的報警,事故發生當即查看了 grafana
的監控數據,發如今極段時間內服務器資源消耗極速增加,而後就當即沒有響應了
經過業務監控發現服務在極端的時間打開近10萬個 goroutine
以後持續了很長一段時間,cpu
佔用和 gc
都很正常, 內存方面能夠看出短期內分配了不少內存,可是沒有被釋放,gc
無法回收說明一直被佔用,
看到這裏我內心在想多是有個 goroutine
由於什麼緣由致使沒法結束形成的事故吧,
而後我再往下看(實際頁面是在須要滾動屏幕,第一屏只顯示了上面6個模塊),發現 open files 和 goroutine
的狀況一致,而且以後的數據忽然中斷,
中斷是由於服務沒法影響,也就沒法採集服務的信息了。
goroutine
並不會佔用 open files,一個http服務致使這種狀況大概只能是網絡鏈接過多,咱們遭受攻擊了嗎……
顯然是沒有的否則cpu不能很正常,那就是有可能請求沒法響應,什麼緣由致使呢?
使用 lsof -n | grep dsp | wc -l
命令去服務器查找服務打開文件數,確實在六萬五千多,
經過 cat /proc/30717/limits
發現 Max open files 65535 65535 files
,
配置的最大打開文件數只有 65535,使用 lsof -n | grep dsp |grep TCP | wc -l
發現數據和以前接近,只小了幾個,那是日誌文件佔用的。
查看日誌發現大量 http: Accept error: accept tcp 172.17.191.231:8090: accept4: too many open files; retrying in 1s
錯誤。
這些數據幫助我快速定位確實是有請求發送到服務器,服務器沒法響應致使短期內佔用不少文件打開數,致使系統限制沒法創建新的鏈接。
這裏要說一下,即便客戶端斷開鏈接了,服務器鏈接仍是沒有辦法關閉,由於 goroutine
沒有辦法關閉, 除非本身退出。
找到緣由了,服務無法響應,無法經過現場查找問題了,先從新啓動一下服務,恢復業務在查找代碼問題。
接下來就是查找代碼問題了,期間又出現了一次故障,當即重啓服務,恢復業務。
經過幾個小時分析代碼邏輯,終於有了進展,發現上面的示例代碼邏輯塊致使讀鎖重入,存在死鎖風險,這種死鎖的碰撞機率很是低,
以前說過咱們的緩存是讀多寫少的場景,若是隻是讀取數據,上面的代碼不會有任何問題,咱們一天刷新緩存的次數也不過百餘次而已。
看一下究竟發生了什麼致使的死鎖吧:
cache.Get
獲取一個 chan
, 在 cache.Get
裏面有一個 goroutine
讀取數據只有加了讀寫鎖,只有 goroutine
關閉纔會釋放for i := range c.Get(next) {
遍歷 chan
時 goroutine
不會結束,也就說讀鎖沒有被釋放c.XX(i)
方法,在該方面裏面也加了讀鎖, 造成了讀鎖重入的場景,可是該放執行週期很短,執行完就會立刻釋放好吧,這樣的流程並無造成死鎖,什麼狀況下致使的死鎖呢,接着看一下一個場景:
cache.Get
獲取一個 chan
, 在 cache.Get
裏面有一個 goroutine
讀取數據只有加了讀寫鎖,只有 goroutine
關閉纔會釋放for i := range c.Get(next) {
遍歷 chan
時 goroutine
不會結束,也就說讀鎖沒有被釋放c.XX(i)
方法,該方法申請讀鎖,由於寫鎖在等待,因此任何讀鎖都將等待寫鎖釋放後才能添加成功cache.Get
裏面的 goroutine
沒法退出,沒法釋放讀鎖c.XX(i)
等待寫鎖釋放重點看第三步,這裏是關鍵,由於在兩個嵌套的讀鎖中間申請寫鎖,致使死鎖發生,找到緣由修復起來很簡單的,
調整 cache.Get
加鎖的方法,把 c.data
賦值給一個臨時變量 data
, 在這段代碼先後加鎖和釋放鎖,鎖的代碼塊更小,時間更短
c.data
單獨拷貝是安全的,那怕是指針數據,由於每次刷新緩存都會給 c.data
從新賦值,分配新的內存空間。
package cache import "sync" type Cache struct { lock sync.RWMutex data []int // 實際數據比這個複雜不少有不少維度 x int } func (c *Cache) XX(i int) int{ c.lock.RLock() defer c.lock.RUnlock() if i >c.x { return i } return 0 } func (c *Cache) Get(next chan struct{}) chan int { ch := make(chan int, 1) go func() { defer close(ch) c.lock.RLock() data := c.data c.lock.RUnlock() // 篩選數據, 簡單寫一個篩選過程 for i := range data { if data[i] > 10 { ch <- i if _, ok := <-next; !ok { return } } } }() return ch }
修復以後的業務狀態:
用程序復現一下上面的場景能夠嗎,好像有點難,我寫了一個簡單的復現代碼,以下:
package main import ( "fmt" "runtime" "sync" ) var l = sync.RWMutex{} func main() { var wg sync.WaitGroup wg.Add(2) c := make(chan int) go func() { l.RLock() // 讀鎖1 defer l.RUnlock() fmt.Println(1) c <- 1 fmt.Println(2) runtime.Gosched() fmt.Println(3) b() fmt.Println(4) wg.Done() }() go func() { fmt.Println(5) <-c fmt.Println(6) l.Lock() fmt.Println(7) fmt.Println(8) defer l.Unlock() fmt.Println(9) wg.Done() }() go func() { i := 1 for { i++ } }() wg.Wait() } func b() { fmt.Println(10) l.RLock() // 讀鎖2 fmt.Println(11) defer l.RUnlock() fmt.Println(12) }
這段程序的輸出(受 goroutine
運行時影響在輸出數字3以前會有些許差別):
1 5 6 2 3 10
分析一下這個運行流程吧:
fmt.Println(1)
以前, 狀態加讀鎖1goroutine
啓動,fmt.Println(5)
, 狀態加讀鎖1c <- 1
, 狀態加讀鎖1<-c
fmt.Println(6)
, 狀態加讀鎖1fmt.Println(2)
, 狀態加讀鎖1goroutine
runtime.Gosched()
, 狀態加讀鎖1l.Lock()
, 等待讀鎖1釋放, 狀態加讀鎖一、寫鎖等待goroutine
執行 fmt.Println(3)
與 b()
, 狀態加讀鎖一、寫鎖等待fmt.Println(10)
, 申請讀鎖2,等待寫鎖釋放, 狀態加讀鎖一、寫鎖等待、讀鎖2等待func (rw *RWMutex) RLock() { if race.Enabled { _ = rw.w.state race.Disable() } if atomic.AddInt32(&rw.readerCount, 1) < 0 { // A writer is pending, wait for it. runtime_SemacquireMutex(&rw.readerSem, false) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) } }
申請寫鎖時會在 rw.readerCount
讀數量變量上自增長 1,若是結果小於 0,當前讀鎖進入修改等待讀鎖喚醒信號,
單獨看着一個方法會比較懵,爲啥讀的數量會小於0呢,接着看寫鎖。
func (rw *RWMutex) Lock() { if race.Enabled { _ = rw.w.state race.Disable() } // First, resolve competition with other writers. rw.w.Lock() // Announce to readers there is a pending writer. r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) race.Acquire(unsafe.Pointer(&rw.writerSem)) } }
申請寫鎖時會先加上互斥鎖,也就是有其它寫的客戶端的話會等待寫鎖釋放才能加上,具體實現看互斥鎖的代碼,
而後在 rw.readerCount
上自增一個極大的負數 1 << 30
, 讀寫鎖這裏也就限制了咱們的同時讀的進程不能超過這個值。
而後在結果上加上 rwmutexMaxReaders
也就是 atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
獲得實際讀客戶端的數量
若是讀的客戶端不等於0,就在 rw.readerWait
自增讀客戶端的數量,以後陷入睡眠,等待 rw.writerSem
喚醒。
分析了這兩段代碼咱們就能明白,寫鎖等待或者添加時,讀鎖無法添加上
func (rw *RWMutex) RUnlock() { if race.Enabled { _ = rw.w.state race.ReleaseMerge(unsafe.Pointer(&rw.writerSem)) race.Disable() } if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { if r+1 == 0 || r+1 == -rwmutexMaxReaders { race.Enable() throw("sync: RUnlock of unlocked RWMutex") } // A writer is pending. if atomic.AddInt32(&rw.readerWait, -1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&rw.writerSem, false) } } if race.Enabled { race.Enable() } }
釋放讀鎖,先在 rw.readerCount
減 1,而後檢查讀客戶端是否小於0,若是小於0說明有寫鎖在等待,
在 rw.readerWait
上減1,這個變量記錄的是寫等待讀客戶端的數量,若是沒有須要等待的讀客戶端了,就通知 rw.writerSem
喚醒寫鎖
func (rw *RWMutex) Unlock() { if race.Enabled { _ = rw.w.state race.Release(unsafe.Pointer(&rw.readerSem)) race.Disable() } // Announce to readers there is no active writer. r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { race.Enable() throw("sync: Unlock of unlocked RWMutex") } // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false) } // Allow other writers to proceed. rw.w.Unlock() if race.Enabled { race.Enable() } }
寫鎖在釋放時會給 rw.readerCount
自增 rwmutexMaxReaders
還原真實讀客戶端數量。for i := 0; i < int(r); i++ {
用來喚醒全部的讀客戶端,由於在寫鎖的時候,申請讀鎖的客戶端會被計數,可是都會陷入睡眠狀態。
之前特別強調過讀鎖重入致使死鎖的問題,並且這個問題很是難在業務代碼裏面復現,觸發概率很低,
編譯和運行時都沒法檢測這種狀況,因此千萬不能陷入讀鎖重入的嵌套使用的狀況,否者問題很是難以排查。
關於加鎖的幾個小經驗:
defer
釋放鎖。