go-fork-joingit
什麼是ForkJoingithub
接觸到ForkJoin
框架是由於學習Java
中的Stream
中的並行流,並行流的底層就是藉助ForkJoin
框架golang
ForkJoin
框架更適合如今CPU多核的機器,通常用於處理能夠將一個大任務分解成數個互相沒有依賴性的小任務,利用分治的策略,將任務不斷變小,將這些小任務分發到CPU的核中,將子任務並行運行,大大加快任務處理速度算法
具體的不少博客上說的都很不錯,這裏也不細說了,給幾個我當時學習的博客地址吧併發
任務偷竊app
任務偷竊算法其實就是Worker
能夠從本身對應的工做隊列頭部或者其餘Worker
的工做隊列尾部獲取元素。框架
每次在輪詢任務隊列時,先從每一個Worker
對應的任務隊列中去獲取任務,若是發現任務隊列此時沒有待處理的任務,那麼這個時候就會採用隨機選取策略,隨機選擇一個Worker
對應的工做隊列,去竊取它的任務異步
Join子任務結果post
在Java中須要去不斷的獲取任務的執行狀況,若是任務執行完就返回任務處理的結果;而在Golang中,因爲chan的存在,使得Java的Future模式很是容易實現,只須要任務Join的時候去讀取通道就能夠,由於當咱們把chan的cap設置爲1時,若是通道中沒有數據,讀取一方是會被阻塞等待的性能
func (f *ForkJoinTask) Join() (bool, interface{}) {
for {
select {
case data, ok := <-f.result:
if ok {
return true, data
}
case <-f.ctx.Done():
panic(f.taskPool.err)
}
}
}
複製代碼
任務隊列
對任務隊列進行遍歷操做。任務隊列不止一個,而是存在多個任務隊列,每次都會從這些任務隊列中獲取一個任務出來,若是任務存在則將任務包裝成一個結構體;在獲取到任務後,就是獲取一個任務的執行者worker
了,隨後將包裝好的任務送入Worker
的chan通道中異步發送任務
func (fp *ForkJoinPool) run(ctx context.Context) {
go func() {
wId := int32(0)
for {
select {
case <-ctx.Done():
fmt.Printf("here is err")
fp.err = fp.wp.err
return
default:
hasTask, job, ft := fp.taskQueue.dequeueByTali(wId)
if hasTask {
fp.wp.Submit(ctx, &struct {
T Task
F *ForkJoinTask
C context.Context
}{T: job, F: ft, C: ctx})
}
wId = (wId + 1) % fp.cap
}
}
}()
}
複製代碼
獲取一個Worker
在ForkJoin
初始化的時候,根據CPU核數對Worker
池進行初始化操做
func newPool(ctx context.Context, cancel context.CancelFunc) *Pool {
...
wCnt := runtime.NumCPU()
for i := 0; i < wCnt; i ++ {
w := newWorker(p)
w.run(ctx)
p.workers = append(p.workers, w)
}
...
}
複製代碼
隨後,處理任務確定須要一個對應的worker去執行的,所以每次在獲取worker
時,會先去worker
池中判斷是否還存在空閒的worker
,若是存在就直接獲取一個worker
,不然直接建立一個worker
進行接受任務
func (p *Pool) retrieveWorker(ctx context.Context) *Worker {
var w *Worker
idleWorker := p.workers
if len(idleWorker) >= 1 {
p.lock.Lock()
n := len(idleWorker) - 1
w = idleWorker[n]
p.workers = idleWorker[:n]
p.lock.Unlock()
} else {
if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
w = cacheWorker.(*Worker)
} else {
w = &Worker{
pool: p,
job: make(chan *struct {
T Task
F *ForkJoinTask
C context.Context
}, 1),
}
}
w.run(ctx)
}
return w
}
複製代碼
Worker
真正執行任務的對象,每一個worker
綁定一個goruntine
,而且有一個chan
通道,用於異步接收任務以及在goruntine
中異步將任務取出並執行;當任務執行完後,將worker
返回到worker
池中
func (w *Worker) run(ctx context.Context) {
go func() {
var tmpTask *ForkJoinTask
defer func() {
if p := recover(); p != nil {
w.pool.panicHandler(p)
if tmpTask != nil {
w.pool.err = p
close(tmpTask.result)
}
}
}()
for {
select {
case <-ctx.Done():
fmt.Println("An exception occurred and the task has stopped")
return
default:
for job := range w.job {
if job == nil {
w.pool.workerCache.Put(w)
return
}
tmpTask = job.F
job.F.result <- job.T.Compute()
w.pool.releaseWorker(w)
}
}
}
}()
}
複製代碼
任務偷竊算法
目前v0.1的任務偷竊算法並不能說像Java的ForkJoin那樣,支持兩個worker同時從一個隊列中獲取任務,而是在獲取任務的時候鎖住整個隊列,所以併發性能不太好,目前正在採用CAS去替換悲觀鎖,實現兩個Worker可同時讀取一個隊列中的數據,若是兩Worker
同時向一個長度只有1的任務隊列獲取元素,則樂觀鎖上升爲悲觀鎖進行控制
Worker數量控制
目前的Worker數量會隨着任務的不斷分解而不斷建立,若是任務分解過深可能會致使建立大量的Worker
,所以還須要繼續理解ForkJoin
的關於線程資源的調度