背景:html
讀取一個500w行的大文件,將每一行的數據讀取出來作數據整合歸併以後,再按照必定的邏輯和算法進行處理後存入redis。golang
文件格式:redis
url地址 用戶32位標識 點擊次數算法
http://jingpin.pgc.panda.tv/hd/xiaopianpian.html aaaaaaaaaaaaaaaaaaaa 5併發
具體場景: 函數
本節先看一下大文件處理最簡單的狀況,即在讀文件的過程當中針對文件每一行都開啓一個協程來作數據合併,看看這種狀況下的定位以及優化的思路。高併發
問題現象:oop
若是將整個文件串行執行來作數據整合的話,只須要4 or 5s就能夠完成,可是每行併發處理卻須要幾十秒到幾十分鐘不等。優化
代碼以下:網站
simple.go
package main import ( "bufio" "fmt" "io" "io/ioutil" "log" "net/http" _ "net/http/pprof" "os" "singleflight" "strconv" "strings" "sync" "time" "utils" ) var ( wg sync.WaitGroup mu sync.RWMutex //全局鎖 single = &singleflight.Task{} ) func main() { defer func() { if err := recover(); err != nil { fmt.Println(err) } }() go func() { log.Println(http.ListenAndServe("localhost:8080", nil)) }() file := "/data/origin_data/part-r-00000" if fp, err := os.Open(file); err != nil { panic(err) } else { start := time.Now() defer fp.Close() defer func() { //時間消耗 fmt.Println("time cost:", time.Now().Sub(start)) }() //統計下每一個url的點擊用戶數 hostNums := hostsStat() buf := bufio.NewReader(fp) hostToFans := make(map[string]utils.MidList) //[url][]用戶id for { line, err := buf.ReadString('\n') if err != nil { if err == io.EOF { //遭遇行尾 fmt.Println("meet the end") break //跳出死循環 } panic(err) } //每一行單獨處理 wg.Add(1) go handleLine(line, hostToFans, hostNums) } wg.Wait() fmt.Println("*************************handle file data complete************************") } } //處理每一行的數據 func handleLine(line string, hostToFans map[string]utils.MidList, hostNums map[string]int) { defer wg.Done() line = strings.TrimSpace(line) components := strings.Split(line, "\t") //先判斷是不是合法網站的url schemes := strings.Split(components[0], "/") if utils.In_array(utils.ValidPlatforms, schemes[2]) == false { fmt.Println("invalid url: ", components[0]) return } mu.RLock() if _, ok := hostToFans[components[0]]; ok { mu.RUnlock() click_times, _ := strconv.Atoi(components[2]) mu.Lock() hostToFans[components[0]] = hostToFans[components[0]].Append(components[1], click_times) mu.Unlock() } else { //下一個url mu.RUnlock() startElement := false //用以標識是不是某個url統計的初始元素 //singleflight代碼 防止有多個相同url同時訪問時,該url對應的[]string尚未初始化,致使屢次make代碼的執行 single.Do(components[0], func() (interface{}, error) { mu.RLock() if _, ok := hostToFans[components[0]]; ok { //再判斷一遍, 防止高併發的情形下,多個相同url的寫map操做,都會進入從新分配空間的步驟 mu.RUnlock() return nil, nil } mu.RUnlock() mu.Lock() click_times, _ := strconv.Atoi(components[2]) hostToFans[components[0]] = utils.NewMidList(hostNums[components[0]]) hostToFans[components[0]] = hostToFans[components[0]].Append(components[1], click_times) mu.Unlock() startElement = true return nil, nil }) if !startElement { mu.Lock() click_times, _ := strconv.Atoi(components[2]) hostToFans[components[0]] = hostToFans[components[0]].Append(components[1], click_times) mu.Unlock() } } } //針對url:用戶的統計文件 該文件列出每一個url對應的獨立用戶個數 func hostsStat() map[string]int { hostStats := "./scripts/data/stat.txt" bytes, _ := ioutil.ReadFile(hostStats) //....some code.... return hostNums }
執行一下這個代碼以後,發現程序執行不久,內存佔用就噌噌噌的漲到90%+,過了一會cpu佔用降低到極低,可是load一直保持再過載的水平,看下圖。因此大膽猜想由於gc致使進程夯住了。以後用pprof和gctrace也印證了這個想法,若是對pprof和gctrace不太清楚的同窗能夠看筆者以前的文章 golang 如何排查和定位GC問題。
其實在筆者最開始的代碼中,已經有一些地方在注意下降內存的消耗了,好比說在初始化每一個url對應的用戶id集合時借鑑groupcache的singleflight,確保不會屢次重複申請空間;好比url對應的用戶id切片,先算好具體大小再make。雖然代碼很簡單, 可是上文中的代碼顯然仍是有一些問題。
經過pprof,我發現程序執行的過程當中,大部分時間都消耗在gc上,以下圖。劃紅線的都是和gc有關的函數。因此問題就變成排查爲何gc會這麼長時間。
大部分狀況下gc被致使的緣由是分配的內存達到某個閾值,很顯然,本例屬於這種狀況,前文提到內存佔用穩定在90+。那麼爲何這個進程會佔用這麼多的內存呢?筆者一直試圖用pprof的heap和profile來分析出這個問題,可是一直無果。直到有一次經過pprof查看goroutine的狀態時,發現當前正在工做的協程高達幾十萬,甚至有時能到達接近150w的量級, 以下圖。這樣就可以解釋一部分問題了,單個協程若是是3K大小,那麼當協程數量到達百萬時,就算協程裏什麼都沒有也會佔用4G的內存。而筆者在作實驗的機器只有8g的內存,因此確定會出現內存被吃滿頻繁gc致使進程夯住。
因此第一步,確定是要控制一下當前的協程數,不能無限的增加。在讀取文本內容的loop裏,加上對行數的計算,這樣每到一個閾值時,就能夠休息一下,暫緩下協程增加的速率。加上限制以後,進程不會再卡死,整個的執行時間穩定在20~30s之間。
iterator := 0 for { line, err := buf.ReadString('\n') if err != nil { if err == io.EOF { //遭遇行尾 fmt.Println("meet the end") break //跳出死循環 } panic(err) } //每一行單獨處理 這裏須要加邏輯防止併發過大致使大量佔用cpu和內存,使得整個進程由於gc夯住, //能夠每讀10w行左右就休息一會下降一下程序同時在線的協程 iterator++ if iterator <= 120000 { wg.Add(1) go handleLine(line, hostToFans, hostNums) } else { iterator = 1 <-time.After(130 * time.Millisecond) //暫停須要5s左右 wg.Add(1) go handleLine(line, hostToFans, hostNums) } } wg.Wait()
對比一下串行執行的結果,能夠發現雖然如今併發執行已經穩定,可是就算刨去休眠時間,和串行執行相比仍是慢不少,因此確定還有優化的空間。這個時候pprof的profile以及heap分析就有了施展拳腳的地方了。看下面兩張圖,分別是內存消耗和cpu消耗圖:
cpu耗時分析
內存佔用分析
上面只截取了一部分的圖,可是從中咱們已經可以找到須要優化的地方了。能夠看到strings.Split函數耗時和耗內存都很嚴重,主要是它會生成slice。分析一下前文的代碼能夠發現至少判斷url是不是合法網站這一塊的strings.Split是能夠不要的。這裏不光會有額外的運行時間還會生成slice佔用內存致使gc。因此對這塊功能進行改造:
//先判斷是不是合法平臺的主播 if is_valid_platform(utils.ValidPlatforms, components[0]) == false { fmt.Println("invalid url: ", components[0]) return nil, errors.New("invalid url") } func is_valid_platform(platforms []string, hostUrl string) bool { for _, platform := range platforms { if strings.Index(hostUrl, platform) != -1 { return true } } return false }
這樣的就能夠減小沒必要要的slice引發的分配空間。改完以後再執行,整個任務穩定在15s左右,減去休眠時間的話就是10s。到這裏其實已經算是優化的差很少了,可是其實還有一個地方能夠看一下。
上面的heap分析圖能夠看到其實singleflight.(*Task).Do函數佔用更多的內存,而且也佔用了不少的cpu時間,以下:
除了一些原生函數以外,就屬它最高了,並且該函數也只會在每次新的url出現的時候纔會執行。能夠看下singleflight的主要結構體,會發現使用了指針變量,而指針變量在gc的時候會致使二次遍歷,使得整個gc變慢。雖然筆者此處用的singleflight確定是不能修改, 可是若是有可能的話,儘可能仍是要少用指針。
這一節只能算是簡單講了下優化的思路和過程,但願下一節能把完整版的優化方案寫出來。