如何將golang的併發編程運用到實際開發

前言:這幾天在寫一個工具腳本分析線上的大量的日誌文件,原本應該是索然無味的一個工做,可是本着作到極致的原則,激發了我不斷思考如何優化。本文將從開發過程當中的最開始版本,一點點講解優化的過程,最終用golang實現了一個相似java的worker線程池,收穫滿滿。java

一,無腦開goroutine階段

1,任務背景 這個工具的做用簡單介紹以下:首先是線上的日誌量是很是龐大的,而後要去讀取日誌文件的內容,而後一條條日誌項分析,匹配出想要的日誌項寫入文件,再提取該文件中的數據計算。日誌項格式模擬數據以下:golang

{
 "id":xx,"time":"2017-11-19","key1":"value1","key2":"value2".......  
}
複製代碼

2,無腦開gorountine 先給下代碼(只放核心代碼,省略文件操做和異常錯誤處理),再來講說這個階段的思路bash

var wirteChan = make(chan []byte) //用於寫入文件
var waitgroup sync.WaitGroup //用於控制同步
func main(){
    //省略寫入文件的打開操做,畢竟咱們主要講併發這塊
    
    //初始化寫入的channel
	InitWriter(outLogFileWriter)
	//接下來去遍歷每一個日誌文件,每讀出一個日誌項就開一個gorountine去處理
	for _,file := range logDir {
	    if file.IsDir() {
			continue
		} else {
			HandlerFile(arg.dir + "/" + file.Name()) //處理每一個文件
		}
	}
}
/**
 * 初始化Writer的channel
 */
func InitWriter(outLogFileWriter *bufio.Writer) {
	go func() {
		for data := range wirteChan {
			nn, err := outLogFileWriter.Write(data)
		}
	}()
}
//處理每一個文件,而後開G去處理每一個日誌項
func HandlerFile(fileName string) {
	file, err := os.Open(fileName)
	defer file.Close()
	br := bufio.NewReader(file)
	for {
		data, err := br.ReadBytes('\n')
		if err == io.EOF {
			break
		} else {
			go Handler(data) //每次開一個G去處理,處理完寫入writeChannel
		}
	}
}
複製代碼

解析:寫博客很不喜歡放大篇幅代碼,因此上面給的只是重要的代碼,上面代碼註釋有說到的也不重複說了。好了,咱們來想一想上面的代碼有什麼問題?咱們如今就假設咱們就只有一個很是大的文件,文件中每一個記錄項都無腦開一個G去執行。那麼,運行一下,咱們會發現,好慢呀~。問題出在哪裏呢?首先咱們沒法控制G的數量,其第二天志文件很是大,這樣運行下來,G的數量是很是龐大的,多個G要往一個channel中寫數據,那麼也會發生嚴重的阻塞。種種緣由,致使了這個方法是不適用的併發

二,加入帶緩衝的任務隊列

1,任務隊列 在上面咱們說到,咱們沒法控制任務的數量,那麼,我在這裏就加入了一個任務隊列,來對任務進行排隊,同時能夠控制任務的數量。上代碼:函數

/**
 * Job結構體,包含要處理的數據和處理函數(這個可根據須要修改)
 */
type Job struct {
	Data []byte
	Proc func([]byte)
}
//Job隊列,存儲要作的Job,將每一個任務打包成Job發送到這裏
var JobQueue chan Job = make(chan Job, arg.maxqueue)
//啓動處理函數處理
func Handler(Data []byte) {
    for range job := <-Queue {
        job.Proc(Data)
    }
}
複製代碼

解析:在這個時候,抽象出來了任務模型Job,因爲函數調用其實就是函數地址加函數參數,因此咱們能夠將處理函數也放進Job中。而後讓處理函數去處理就好了。想到這裏,稍微有點佩服本身了,接着興致勃勃的運行一下。嗯,好像沒快多少(其實這個取決了你的處理函數,就是Job中的Proc)。What?冷靜下來分析一下,真以爲本身真可愛。我僅僅是對任務進行了包裝,而後用了一個帶緩衝的任務隊列,因爲建立的Job遠遠大於單個M的處理能力,帶緩衝只是稍微把問題拖後了一點。工具

三,Job/Worker模型

其實寫到這裏,內心對如何優化已經有點B數了。我想起了java中的線程池的概念,我能夠創建一個線程池,而後池中包含多個worker(數量能夠指定),每一個worker去隊列中取任務處理,處理完則繼續取任務。同時爲了提升通用性,參數類型都改成了interface{}。好了,接下來看看代碼,這裏的代碼都很關鍵,因此就所有放上來了學習

type Job struct {
	Data interface{}
	Proc func(interface{})
}
//Job隊列,存儲要作的Job
var JobQueue chan Job = make(chan Job, arg.maxqueue)
//Woker,用來從Job隊列中取出Job執行
type Worker struct {
	WokerPool  chan chan Job //表示屬於哪一個Worker池,同時接收JobChannel註冊
	JobChannel chan Job      //任務管道,經過這個管道獲取任務執行
	Quit       chan bool     //用來中止Worker
}
//新建一個Worker,須要傳入Worker池參數
func NewWorker(wokerPool chan chan Job) Worker {
	return Worker{
		WokerPool:  wokerPool,
		JobChannel: make(chan Job),
		Quit:       make(chan bool),
	}
}
//Worker的啓動:包含:(1) 把該worker的JobChannel註冊到WorkerPool中去  (2) 監聽JobChannel上有沒有新的任務到來 (3) 監聽是否受到關閉的請求
func (worker Worker) Start() {
	go func() {
		for {
			worker.WokerPool <- worker.JobChannel //每次作完任務後就從新註冊上去通知本worker又處於可用狀態了
			select {
			case job := <-worker.JobChannel:
				job.Proc(job.Data)
			case quit := <-worker.Quit: //接收到關閉信息,直接退出便可
				if quit {
					return
				}
			}
		}
	}()
}
//Worker的關閉:只要發送一個關閉信號便可
func (worker Worker) Stop() {
	go func() {
		worker.Quit <- true
	}()
}
//管理Worker的調度器,包含最大worker數量和workerpool
type Dispatcher struct {
	MaxWorker  int
	WorkerPool chan chan Job
}

//啓動一個調度器
func (dispatcher *Dispatcher) Run() {
	//啓動maxworker個worker
	for i := 0; i < dispatcher.MaxWorker; i++ {
		worker := NewWorker(dispatcher.WorkerPool)
		worker.Start()
	}
	//接下來啓動調度服務
	go dispatcher.dispatch()
}

func (dispatcher *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			go func(job Job) {
				jobChannel := <-dispatcher.WorkerPool //獲取一個可用的worker
				jobChannel <- job                     //將該job發送給該worker
			}(job)
		}
	}
}

//新建一個調度器
func NewDispatcher(maxWorker int) *Dispatcher {
	workerPool := make(chan chan Job, maxWorker)
	return &Dispatcher{
		WorkerPool: workerPool,
		MaxWorker:  maxWorker,
	}
}
複製代碼

解析:代碼中每句都註釋得很是清楚了,就不重複了。咱們能夠經過這樣來開啓這個模型:dispatcher := NewDispatcher(MaxWorker) dispatcher.Run()。有一點須要強調的是,處理函數這塊須要根據本身的業務去寫,而後和數據打包成Job再發給JobQueue就好了。接着我運行了個人腳本,幾十G的文件通過三輪的處理函數(就是說我須要三輪處理,每輪處理都根據上輪的結果)耗時在三分鐘到四分鐘之間,並且CPU佔用率等也不高。對於耗時高的,可使用pprof工具分析一下到底慢在了哪裏優化

四,總結

由於以前剛學了golang的併發原理,而後恰好有這個任務,因而本身就開始了從零一點點的摸索和優化,整個工具寫完,本身對golang的併發的理解又更加的深刻了,並且對鎖,文件操做等也熟悉了起來。收穫不少東西,因此我鼓勵學習一個新東西,不能只懂原理,還要本身多動手一下,這樣才牢固。其實這個模型仍是存在一些不足之處,後續會繼續優化。 期間也參考了一些很不錯的博客,在這裏也表示感謝。ui

相關文章
相關標籤/搜索