Golang實現ForkJoin小文

如何用Golang實現一個簡單的ForkJoin框架

扔上個人項目地址

go-fork-joingit

簡易原理

  • 什麼是ForkJoingithub

    接觸到ForkJoin框架是由於學習Java中的Stream中的並行流,並行流的底層就是藉助ForkJoin框架golang

    ForkJoin框架更適合如今CPU多核的機器,通常用於處理能夠將一個大任務分解成數個互相沒有依賴性的小任務,利用分治的策略,將任務不斷變小,將這些小任務分發到CPU的核中,將子任務並行運行,大大加快任務處理速度算法

    具體的不少博客上說的都很不錯,這裏也不細說了,給幾個我當時學習的博客地址吧併發

  • 任務偷竊app

    任務偷竊算法其實就是Worker能夠從本身對應的工做隊列頭部或者其餘Worker的工做隊列尾部獲取元素。框架

    每次在輪詢任務隊列時,先從每一個Worker對應的任務隊列中去獲取任務,若是發現任務隊列此時沒有待處理的任務,那麼這個時候就會採用隨機選取策略,隨機選擇一個Worker對應的工做隊列,去竊取它的任務異步

  • Join子任務結果post

    在Java中須要去不斷的獲取任務的執行狀況,若是任務執行完就返回任務處理的結果;而在Golang中,因爲chan的存在,使得Java的Future模式很是容易實現,只須要任務Join的時候去讀取通道就能夠,由於當咱們把chan的cap設置爲1時,若是通道中沒有數據,讀取一方是會被阻塞等待的性能

func (f *ForkJoinTask) Join() (bool, interface{}) {
	for {
		select {
		case data, ok := <-f.result:
			if ok {
				return true, data
			}
		case <-f.ctx.Done():
			panic(f.taskPool.err)
		}
	}
}
複製代碼

核心代碼

任務隊列

對任務隊列進行遍歷操做。任務隊列不止一個,而是存在多個任務隊列,每次都會從這些任務隊列中獲取一個任務出來,若是任務存在則將任務包裝成一個結構體;在獲取到任務後,就是獲取一個任務的執行者worker了,隨後將包裝好的任務送入Worker的chan通道中異步發送任務

func (fp *ForkJoinPool) run(ctx context.Context) {
	go func() {
		wId := int32(0)
		for {
			select {
			case <-ctx.Done():
				fmt.Printf("here is err")
				fp.err = fp.wp.err
				return
			default:
				hasTask, job, ft := fp.taskQueue.dequeueByTali(wId)
				if hasTask {
					fp.wp.Submit(ctx, &struct {
						T Task
						F *ForkJoinTask
						C context.Context
					}{T: job, F: ft, C: ctx})
				}
				wId = (wId + 1) % fp.cap
			}
		}
	}()
}
複製代碼

獲取一個Worker

ForkJoin初始化的時候,根據CPU核數對Worker池進行初始化操做

func newPool(ctx context.Context, cancel context.CancelFunc) *Pool {
	...
	wCnt := runtime.NumCPU()
	for i := 0; i < wCnt; i ++ {
		w := newWorker(p)
		w.run(ctx)
		p.workers = append(p.workers, w)
	}
	...
}
複製代碼

隨後,處理任務確定須要一個對應的worker去執行的,所以每次在獲取worker時,會先去worker池中判斷是否還存在空閒的worker,若是存在就直接獲取一個worker,不然直接建立一個worker進行接受任務

func (p *Pool) retrieveWorker(ctx context.Context) *Worker {

	var w *Worker

	idleWorker := p.workers

	if len(idleWorker) >= 1 {
		p.lock.Lock()
		n := len(idleWorker) - 1
		w = idleWorker[n]
		p.workers = idleWorker[:n]
		p.lock.Unlock()
	} else {
		if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
			w = cacheWorker.(*Worker)
		} else {
			w = &Worker{
				pool: p,
				job: make(chan *struct {
					T Task
					F *ForkJoinTask
					C context.Context
				}, 1),
			}
		}
		w.run(ctx)
	}
	return w
}
複製代碼

Worker

真正執行任務的對象,每一個worker綁定一個goruntine,而且有一個chan通道,用於異步接收任務以及在goruntine中異步將任務取出並執行;當任務執行完後,將worker返回到worker池中

func (w *Worker) run(ctx context.Context) {
	go func() {

		var tmpTask *ForkJoinTask

		defer func() {
			if p := recover(); p != nil {
				w.pool.panicHandler(p)
				if tmpTask != nil {
					w.pool.err = p
					close(tmpTask.result)
				}
			}
		}()

		for {
			select {
			case <-ctx.Done():
				fmt.Println("An exception occurred and the task has stopped")
				return
			default:
				for job := range w.job {
					if job == nil {
						w.pool.workerCache.Put(w)
						return
					}
					tmpTask = job.F
					job.F.result <- job.T.Compute()
					w.pool.releaseWorker(w)
				}
			}
		}
	}()
}
複製代碼

成果

benchtest

正在改進的地方

  • 任務偷竊算法

    目前v0.1的任務偷竊算法並不能說像Java的ForkJoin那樣,支持兩個worker同時從一個隊列中獲取任務,而是在獲取任務的時候鎖住整個隊列,所以併發性能不太好,目前正在採用CAS去替換悲觀鎖,實現兩個Worker可同時讀取一個隊列中的數據,若是兩Worker同時向一個長度只有1的任務隊列獲取元素,則樂觀鎖上升爲悲觀鎖進行控制

  • Worker數量控制

    目前的Worker數量會隨着任務的不斷分解而不斷建立,若是任務分解過深可能會致使建立大量的Worker,所以還須要繼續理解ForkJoin的關於線程資源的調度

相關文章
相關標籤/搜索