Go語言同步和異步執行多個任務封裝

同步適合多個連續執行的,每一步的執行依賴於上一步操做,異步執行則和任務執行順序無關(如從10個站點抓取數據)閉包

同步執行類RunnerAsync

支持返回超時檢測,系統中斷檢測

錯誤常量定義,task/err.go併發

package task

import "errors"


//超時錯誤
var ErrTimeout = errors.New("received timeout")

//操做系統系統中斷錯誤
var ErrInterrupt = errors.New("received interrupt")

實現代碼以下,task/runner_async.goapp

package task

import (
    "os"
    "os/signal"
    "time"
)

//同步執行任務
type RunnerAsync struct {
    //操做系統的信號檢測
    interrupt chan os.Signal

    //記錄執行完成的狀態
    complete chan error

    //超時檢測
    timeout <-chan time.Time

    //保存全部要執行的任務,順序執行
    tasks []func(id int)
}

//new一個RunnerAsync對象
func NewRunnerAsync(d time.Duration) *RunnerAsync {
    return &RunnerAsync{
        interrupt: make(chan os.Signal, 1),
        complete:  make(chan error),
        timeout:   time.After(d),
    }
}

//添加一個任務
func (this *RunnerAsync) Add(tasks ...func(id int)) {
    this.tasks = append(this.tasks, tasks...)
}

//啓動RunnerAsync,監聽錯誤信息
func (this *RunnerAsync) Start() error {

    //接收操做系統信號
    signal.Notify(this.interrupt, os.Interrupt)

    //執行任務
    go func() {
        this.complete <- this.Run()
    }()

    select {
    //返回執行結果
    case err := <-this.complete:
        return err

        //超時返回
    case <-this.timeout:
        return ErrTimeout
    }
}

//順序執行全部的任務
func (this *RunnerAsync) Run() error {
    for id, task := range this.tasks {
        if this.gotInterrupt() {
            return ErrInterrupt
        }
        //執行任務
        task(id)
    }
    return nil
}

//判斷是否接收到操做系統中斷信號
func (this *RunnerAsync) gotInterrupt() bool {
    select {
    case <-this.interrupt:
        //中止接收別的信號
        signal.Stop(this.interrupt)
        return true
        //正常執行
    default:
        return false
    }
}

使用方法    異步

Add添加一個任務,任務爲接收int類型的一個閉包async

Start開始執行傷,返回一個error類型,nil爲執行完畢, ErrTimeout表明執行超時,ErrInterrupt表明執行被中斷(相似Ctrl + C操做)測試

 

測試代碼

task/runner_async_test.gothis

package task

import (
	"fmt"
	"os"
	"runtime"
	"testing"
	"time"
)

func TestRunnerAsync_Start(t *testing.T) {

	//開啓多核
	runtime.GOMAXPROCS(runtime.NumCPU())

	//建立runner對象,設置超時時間
	runner := NewRunnerAsync(8 * time.Second)
	//添加運行的任務
	runner.Add(
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
	)

	fmt.Println("同步執行任務")

	//開始執行任務
	if err := runner.Start(); err != nil {
		switch err {
		case ErrTimeout:
			fmt.Println("執行超時")
			os.Exit(1)
		case ErrInterrupt:
			fmt.Println("任務被中斷")
			os.Exit(2)
		}
	}

	t.Log("執行結束")

}

//建立要執行的任務
func createTaskAsync() func(id int) {
	return func(id int) {
		fmt.Printf("正在執行%v個任務\n", id)
		//模擬任務執行,sleep兩秒
		//time.Sleep(1 * time.Second)
	}
}

執行結果spa

同步執行任務
正在執行0個任務
正在執行1個任務
正在執行2個任務
正在執行3個任務
正在執行4個任務
正在執行5個任務
正在執行6個任務
正在執行7個任務
正在執行8個任務
正在執行9個任務
正在執行10個任務
正在執行11個任務
正在執行12個任務

  

異步執行類Runner

支持返回超時檢測,系統中斷檢測

實現代碼以下,task/runner.go操作系統

package task

import (
    "os"
    "time"
    "os/signal"
    "sync"
)

//異步執行任務
type Runner struct {
    //操做系統的信號檢測
    interrupt chan os.Signal

    //記錄執行完成的狀態
    complete chan error

    //超時檢測
    timeout <-chan time.Time

    //保存全部要執行的任務,順序執行
    tasks []func(id int) error

    waitGroup sync.WaitGroup

    lock sync.Mutex

    errs []error
}

//new一個Runner對象
func NewRunner(d time.Duration) *Runner {
    return &Runner{
        interrupt: make(chan os.Signal, 1),
        complete:  make(chan error),
        timeout:   time.After(d),
        waitGroup: sync.WaitGroup{},
        lock:      sync.Mutex{},
    }
}

//添加一個任務
func (this *Runner) Add(tasks ...func(id int) error) {
    this.tasks = append(this.tasks, tasks...)
}

//啓動Runner,監聽錯誤信息
func (this *Runner) Start() error {

    //接收操做系統信號
    signal.Notify(this.interrupt, os.Interrupt)

    //併發執行任務
    go func() {
        this.complete <- this.Run()
    }()

    select {
    //返回執行結果
    case err := <-this.complete:
        return err
        //超時返回
    case <-this.timeout:
        return ErrTimeout
    }
}

//異步執行全部的任務
func (this *Runner) Run() error {
    for id, task := range this.tasks {
        if this.gotInterrupt() {
            return ErrInterrupt
        }

        this.waitGroup.Add(1)
        go func(id int) {
            this.lock.Lock()

            //執行任務
            err := task(id)
            //加鎖保存到結果集中
            this.errs = append(this.errs, err)

            this.lock.Unlock()
            this.waitGroup.Done()
        }(id)
    }
    this.waitGroup.Wait()

    return nil
}

//判斷是否接收到操做系統中斷信號
func (this *Runner) gotInterrupt() bool {
    select {
    case <-this.interrupt:
        //中止接收別的信號
        signal.Stop(this.interrupt)
        return true
        //正常執行
    default:
        return false
    }
}

//獲取執行完的error
func (this *Runner) GetErrs() []error {
    return this.errs
}

使用方法    code

Add添加一個任務,任務爲接收int類型,返回類型error的一個閉包

Start開始執行傷,返回一個error類型,nil爲執行完畢, ErrTimeout表明執行超時,ErrInterrupt表明執行被中斷(相似Ctrl + C操做)

getErrs獲取全部的任務執行結果

 

測試示例代碼

task/runner_test.go

package task

import (
    "testing"
    "time"
    "fmt"
    "os"
    "runtime"
)

func TestRunner_Start(t *testing.T) {
    //開啓多核心
    runtime.GOMAXPROCS(runtime.NumCPU())

    //建立runner對象,設置超時時間
    runner := NewRunner(18 * time.Second)
    //添加運行的任務
    runner.Add(
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
    )

    fmt.Println("異步執行任務")

    //開始執行任務
    if err := runner.Start(); err != nil {
        switch err {
        case ErrTimeout:
            fmt.Println("執行超時")
            os.Exit(1)
        case ErrInterrupt:
            fmt.Println("任務被中斷")
            os.Exit(2)
        }
    }

    t.Log("執行結束")

    t.Log(runner.GetErrs())

}

//建立要執行的任務
func createTask() func(id int) error {
    return func(id int) error {
        fmt.Printf("正在執行%v個任務\n", id)
        //模擬任務執行,sleep
        //time.Sleep(1 * time.Second)
        return nil
    }
}

執行結果

異步執行任務
正在執行2個任務
正在執行1個任務
正在執行4個任務
正在執行3個任務
正在執行6個任務
正在執行5個任務
正在執行9個任務
正在執行7個任務
正在執行10個任務
正在執行13個任務
正在執行8個任務
正在執行11個任務
正在執行12個任務
正在執行0個任務 
相關文章
相關標籤/搜索