題目內容:給定陌陌一段時間的Nginx AccessLog(多個文件,估計66G左右),以最快的方式找到訪問次數最多的5個IP。提交腳本或是可執行程序,約定以命令行參數的形式傳入文件所在路徑。按照次數降序輸出5個IP,每一個IP一行。 node
已知說明:
1. Linux Centos7服務器,配置限制在內存2G,4核CPU
2. Nginx access log 放置在指定目錄下, 文件內容格式
'$remote\_addr\\t-\\t$remote_user\t$time_local\t'
'$http\_x\_forwarded\_for\\t$tcpinfo_rtt\t$tcpinfo\_rttvar\\t$tcpinfo_snd_cwnd\t$tcpinfo_rcv_space\t'
'$request\_method\\t$host\t$request\_uri\\t$server_protocol\t$request\_length\\t$request_time\t'
'$status\\t$body_bytes_sent\t$bytes_sent\t'
'$http\_referer\\t$http_user_agent\t'
'$connection\\t$connection_requests\t'
'$upstream\_addr\\t$upstream_status\t$upstream\_response\_length\\t$upstream_response_time\t'
'$scheme\\t$ssl_session_reused';
10.0.0.1 - - 22/Oct/2019:00:00:05 +0800 - 45250 5000 20 14600 POST api.immomo.com /v1/welcome/logs?fr=123456789 HTTP/1.1 567 0.029 200 96 651 - MomoChat/8.20.2 ios/1878 (iPhone 7 Plus; iOS 11.0.3; zh_CN; iPhone9,2; S1) 93983365152 15 10.0.0.1:9000 200 101 0.029 https .
3. 不限制實現語言,可是不能依賴任何開源的第三方依賴或者服務
3. 題目輸入參數只有一個就是: Accesslog的文件夾路徑
4. 題目輸出須要在程序運行路徑下建立result的文件,文件內容的格式是:按照訪問量倒排的5個IP和對應的訪問次數。
好比:
10.12.12.1 10000
102.12.12.2 9999
...
評判規則:
統計準確且耗時最短者勝出
2核4G 機械硬盤ios
本文下方解題代碼是使用思路1 思路1: 2.1 直接將IP變成十進制 hash算次數。2.2 mod N 進行堆排序 2.3 進行N個堆TOP10 排序聚合 | 2.4 輸出聚合後的堆TOP10 思路2: 是否能夠組合咱們的超大數字- 組合方式 出現次數+十進制數字、堆排序、直接就能獲得結果集-避免我自建結構體
注意本題目給的機器配置是2核4G、 對測試數據(5GB)進行 以下算法。發現堆排序佔用耗時近300ms 左右、 processLine 和CalculateIp 耗時幾秒,可優化點不多。 ReadLine 佔比耗時90%、那麼本文重點討論的就是ReadLine 讀取文件IO 的性能! 咱們若是進行多線程讀取會不會更快那?繼續往下看~
1. 磁盤IO 單線程順序讀取是最快的?why ? 若是多線程讀取,磁盤的磁頭要不斷從新尋址,致使讀取速度慢於單線程 2. Linux會對順序讀取 進行預讀! 3. 隨機讀取多線程大概會比單線程快N倍。(取決於線程數量) 4. 多線程IO,咱們讀取的仍是同一文件,就算咱們使用seek+w/r 方式讀取的話,須要加鎖。 5. 咱們每一個線程打開一套文件描述符(file 對象),可否提升IO?咱們在覈心中有N個file對象,可是隻有一個inode 對象,文件讀寫最終是落到inode 完成。因此不會提升IO
結論:在咱們處理大文件讀取的時候,單線程要優於多線程的~算法
package main import ( "bufio" "container/heap" "fmt" "io" "os" "runtime" "strconv" "strings" "time" ) const N = 256 //構建N個堆 var GlobalIp map[int64]*IpQueue //而後N個堆 獲取TOP10 var GlobalNum map[int64]int64 //次數 func ReadLine(filePth string, hookfn func([]byte)) error { f, err := os.Open(filePth) if err != nil { return err } defer f.Close() bfRd := bufio.NewReader(f) for { line, err := bfRd.ReadBytes('\n') hookfn(line) if err != nil { if err == io.EOF { return nil } return err } } } //初始化全局變量 func initHeap() { GlobalNum = make(map[int64]int64) GlobalIp = make(map[int64]*IpQueue) for i := 0; i <= N; i++ { q := make(IpQueue, 1) q[0] = &Item{ip: "0.0.0.0", num: -1} heap.Init(&q) GlobalIp[int64(i)] = &q //堆給到全局Global } } //2.1 直接將IP變成十進制 hash算次數 func processLine(line []byte) { var result int for i := 7; i <= 15; i++ { if line[i] == '\t' || line[i] == '-' { result = i break } } str := string(line[0:result]) ipv4 := CalculateIp(string(str)) GlobalNum[int64(ipv4)]++ } //2.2 mod N 進行堆排序 func handleHash() { //堆耗時開始 timestamp := time.Now().UnixNano() / 1000000 for k, v := range GlobalNum { heap.Push(GlobalIp[k%N], &Item{ip: RevIp(k), num: int64(v)}) } edgiest := time.Now().UnixNano() / 1000000 fmt.Println("堆耗時總時間ms:", edgiest-timestamp) } //2.3 進行N個堆TOP10 排序聚合 func polyHeap() { //聚合N 個 小堆的top10 for i := 0; i < N; i++ { iterator := 10 if iterator > GlobalIp[int64(i)].Len() { iterator = GlobalIp[int64(i)].Len() } for j := 0; j < iterator; j++ { //寫入到堆棧N item := heap.Pop(GlobalIp[int64(i)]).(*Item) heap.Push(GlobalIp[N], item) } } } //2.4 輸出聚合後的堆TOP10 func printResult() { result := 0 for result < 10 { item := heap.Pop(GlobalIp[N]).(*Item) fmt.Printf("出現的次數:%d|IP:%s \n", item.num, item.ip) result++ } } //string 轉IP func CalculateIp(str string) int64 { x := strings.Split(str, ".") b0, _ := strconv.ParseInt(x[0], 10, 0) b1, _ := strconv.ParseInt(x[1], 10, 0) b2, _ := strconv.ParseInt(x[2], 10, 0) b3, _ := strconv.ParseInt(x[3], 10, 0) number0 := b0 * 16777216 //256*256*256 number1 := b1 * 65536 //256*256 number2 := b2 * 256 //256 number3 := b3 * 1 //1 sum := number0 + number1 + number2 + number3 return sum } //ip 轉string func RevIp(ip int64) string { ip0 := ip / 16777216 //高一位 ip1 := (ip - ip0*16777216) / 65536 ip2 := (ip - ip0*16777216 - ip1*65536) / 256 ip3 := ip - ip0*16777216 - ip1*65536 - ip2*256 return fmt.Sprintf("%d.%d.%d.%d", ip0, ip1, ip2, ip3) } type Item struct { ip string num int64 } type IpQueue []*Item func (pq IpQueue) Len() int { return len(pq) } func (pq IpQueue) Less(i, j int) bool { return pq[i].num > pq[j].num } func (pq IpQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] } func (pq *IpQueue) Push(x interface{}) { item := x.(*Item) *pq = append(*pq, item) } func (pq *IpQueue) Pop() interface{} { old := *pq n := len(old) item := old[n-1] *pq = old[0 : n-1] return item } func main() { runtime.GOMAXPROCS(2) timestamp := time.Now().UnixNano() / 1000000 //初始化 initHeap() //串行 讀取文件 寫入到hash map _ = ReadLine("/Users/admin/Downloads/api.immomo.com-access_10-01.log", processLine) //多個小堆 handleHash() //聚合堆 polyHeap() //打印結果 printResult() fmt.Println(time.Now().UnixNano()/1000000 - timestamp) }
感謝 信惠敏(陌陌)、李耕勇 的熱心支持與討論、api