前言:這幾天在寫一個工具腳本分析線上的大量的日誌文件,原本應該是索然無味的一個工做,可是本着作到極致的原則,激發了我不斷思考如何優化。本文將從開發過程當中的最開始版本,一點點講解優化的過程,最終用golang實現了一個相似java的worker線程池,收穫滿滿。java
1,任務背景 這個工具的做用簡單介紹以下:首先是線上的日誌量是很是龐大的,而後要去讀取日誌文件的內容,而後一條條日誌項分析,匹配出想要的日誌項寫入文件,再提取該文件中的數據計算。日誌項格式模擬數據以下:golang
{
"id":xx,"time":"2017-11-19","key1":"value1","key2":"value2".......
}
複製代碼
2,無腦開gorountine 先給下代碼(只放核心代碼,省略文件操做和異常錯誤處理),再來講說這個階段的思路bash
var wirteChan = make(chan []byte) //用於寫入文件
var waitgroup sync.WaitGroup //用於控制同步
func main(){
//省略寫入文件的打開操做,畢竟咱們主要講併發這塊
//初始化寫入的channel
InitWriter(outLogFileWriter)
//接下來去遍歷每一個日誌文件,每讀出一個日誌項就開一個gorountine去處理
for _,file := range logDir {
if file.IsDir() {
continue
} else {
HandlerFile(arg.dir + "/" + file.Name()) //處理每一個文件
}
}
}
/**
* 初始化Writer的channel
*/
func InitWriter(outLogFileWriter *bufio.Writer) {
go func() {
for data := range wirteChan {
nn, err := outLogFileWriter.Write(data)
}
}()
}
//處理每一個文件,而後開G去處理每一個日誌項
func HandlerFile(fileName string) {
file, err := os.Open(fileName)
defer file.Close()
br := bufio.NewReader(file)
for {
data, err := br.ReadBytes('\n')
if err == io.EOF {
break
} else {
go Handler(data) //每次開一個G去處理,處理完寫入writeChannel
}
}
}
複製代碼
解析:寫博客很不喜歡放大篇幅代碼,因此上面給的只是重要的代碼,上面代碼註釋有說到的也不重複說了。好了,咱們來想一想上面的代碼有什麼問題?咱們如今就假設咱們就只有一個很是大的文件,文件中每一個記錄項都無腦開一個G去執行。那麼,運行一下,咱們會發現,好慢呀~。問題出在哪裏呢?首先咱們沒法控制G的數量,其第二天志文件很是大,這樣運行下來,G的數量是很是龐大的,多個G要往一個channel中寫數據,那麼也會發生嚴重的阻塞。種種緣由,致使了這個方法是不適用的併發
1,任務隊列 在上面咱們說到,咱們沒法控制任務的數量,那麼,我在這裏就加入了一個任務隊列,來對任務進行排隊,同時能夠控制任務的數量。上代碼:函數
/**
* Job結構體,包含要處理的數據和處理函數(這個可根據須要修改)
*/
type Job struct {
Data []byte
Proc func([]byte)
}
//Job隊列,存儲要作的Job,將每一個任務打包成Job發送到這裏
var JobQueue chan Job = make(chan Job, arg.maxqueue)
//啓動處理函數處理
func Handler(Data []byte) {
for range job := <-Queue {
job.Proc(Data)
}
}
複製代碼
解析:在這個時候,抽象出來了任務模型Job,因爲函數調用其實就是函數地址加函數參數,因此咱們能夠將處理函數也放進Job中。而後讓處理函數去處理就好了。想到這裏,稍微有點佩服本身了,接着興致勃勃的運行一下。嗯,好像沒快多少(其實這個取決了你的處理函數,就是Job中的Proc)。What?冷靜下來分析一下,真以爲本身真可愛。我僅僅是對任務進行了包裝,而後用了一個帶緩衝的任務隊列,因爲建立的Job遠遠大於單個M的處理能力,帶緩衝只是稍微把問題拖後了一點。工具
其實寫到這裏,內心對如何優化已經有點B數了。我想起了java中的線程池的概念,我能夠創建一個線程池,而後池中包含多個worker(數量能夠指定),每一個worker去隊列中取任務處理,處理完則繼續取任務。同時爲了提升通用性,參數類型都改成了interface{}。好了,接下來看看代碼,這裏的代碼都很關鍵,因此就所有放上來了學習
type Job struct {
Data interface{}
Proc func(interface{})
}
//Job隊列,存儲要作的Job
var JobQueue chan Job = make(chan Job, arg.maxqueue)
//Woker,用來從Job隊列中取出Job執行
type Worker struct {
WokerPool chan chan Job //表示屬於哪一個Worker池,同時接收JobChannel註冊
JobChannel chan Job //任務管道,經過這個管道獲取任務執行
Quit chan bool //用來中止Worker
}
//新建一個Worker,須要傳入Worker池參數
func NewWorker(wokerPool chan chan Job) Worker {
return Worker{
WokerPool: wokerPool,
JobChannel: make(chan Job),
Quit: make(chan bool),
}
}
//Worker的啓動:包含:(1) 把該worker的JobChannel註冊到WorkerPool中去 (2) 監聽JobChannel上有沒有新的任務到來 (3) 監聽是否受到關閉的請求
func (worker Worker) Start() {
go func() {
for {
worker.WokerPool <- worker.JobChannel //每次作完任務後就從新註冊上去通知本worker又處於可用狀態了
select {
case job := <-worker.JobChannel:
job.Proc(job.Data)
case quit := <-worker.Quit: //接收到關閉信息,直接退出便可
if quit {
return
}
}
}
}()
}
//Worker的關閉:只要發送一個關閉信號便可
func (worker Worker) Stop() {
go func() {
worker.Quit <- true
}()
}
//管理Worker的調度器,包含最大worker數量和workerpool
type Dispatcher struct {
MaxWorker int
WorkerPool chan chan Job
}
//啓動一個調度器
func (dispatcher *Dispatcher) Run() {
//啓動maxworker個worker
for i := 0; i < dispatcher.MaxWorker; i++ {
worker := NewWorker(dispatcher.WorkerPool)
worker.Start()
}
//接下來啓動調度服務
go dispatcher.dispatch()
}
func (dispatcher *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
go func(job Job) {
jobChannel := <-dispatcher.WorkerPool //獲取一個可用的worker
jobChannel <- job //將該job發送給該worker
}(job)
}
}
}
//新建一個調度器
func NewDispatcher(maxWorker int) *Dispatcher {
workerPool := make(chan chan Job, maxWorker)
return &Dispatcher{
WorkerPool: workerPool,
MaxWorker: maxWorker,
}
}
複製代碼
解析:代碼中每句都註釋得很是清楚了,就不重複了。咱們能夠經過這樣來開啓這個模型:dispatcher := NewDispatcher(MaxWorker) dispatcher.Run()
。有一點須要強調的是,處理函數這塊須要根據本身的業務去寫,而後和數據打包成Job再發給JobQueue就好了。接着我運行了個人腳本,幾十G的文件通過三輪的處理函數(就是說我須要三輪處理,每輪處理都根據上輪的結果)耗時在三分鐘到四分鐘之間,並且CPU佔用率等也不高。對於耗時高的,可使用pprof工具分析一下到底慢在了哪裏優化
由於以前剛學了golang的併發原理,而後恰好有這個任務,因而本身就開始了從零一點點的摸索和優化,整個工具寫完,本身對golang的併發的理解又更加的深刻了,並且對鎖,文件操做等也熟悉了起來。收穫不少東西,因此我鼓勵學習一個新東西,不能只懂原理,還要本身多動手一下,這樣才牢固。其實這個模型仍是存在一些不足之處,後續會繼續優化。 期間也參考了一些很不錯的博客,在這裏也表示感謝。ui