多任務的使用模式

僅執行單次的任務

全局資源的加載(初始化), 多任務中公共資源銷燬html

// 初始化工做
// 1. 經過init 方法,僅執行一次,可是不能重複調用
// 模擬全局配置
type AppConfig struct {
	Name string
	IP string
	Version string
}
 var appconfig *AppConfig

func init()  {
	appconfig=new(AppConfig)	
	appconfig.Name = "myapp"
	appconfig.IP = "192.168.66.88"
	appconfig.Version="v1.0"
}

// 2. 使用sync.once方法,保證咱們代碼執行一次
// 銷燬全局資源
var once sync.Once
func DelAPP()  {
	once.Do(func(){
		appconfig = nil
		log.Println("delete app!")
	})
}

編寫異步方法

假如任務耗時比較長能夠考慮把方法編寫成異步的(相似前端知識中的ajax請求),例如任務中有須要下載的任務,這時候下載任務能夠考慮編寫成異步,例如數據存儲任務等前端

編寫異步任務方法要點: 任務內部啓動goroutine處理任務,並當即返回>web

// 編寫一個數據庫異步處理任務(模擬)
func StorageData(data string){
    // 預處理數據
    log.Println("入庫前預處理: ",data)
    go func(data){
        time.Sleep(time.Sencond*3)
        log.Println("數據庫存儲完成")
    }
}

編寫異步帶回調方法

異步方法的基礎上,若是想處理異步的結果,須要傳遞一個回調方法ajax

注意: 編寫異步的基礎上,加上回調方法(處理結果)數據庫

// 編寫一個異步爬取web的方法
package main

import (
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"time"
)

func CrawlUrl(url string,fn func(response *http.Response))  {
	go func() {
		response,err:=http.Get(url)
		if err!=nil{
			log.Println(err)
			return
		}
		fn(response)
	}()
}

func main() {

	CrawlUrl("http://www.baidu.com", func(response *http.Response) {
		data,err:=ioutil.ReadAll(response.Body)
		if err!=nil{
			return
		}
		log.Println("Content length: ",len(data))
	})
	log.Println("main func do other thing")

	CrawlUrl("http://i2.hdslb.com/bfs/archive/bc8adff1dafe7494c6b2155ec82725af0034c31b.png", func(response *http.Response) {
		data,err:=ioutil.ReadAll(response.Body)
		if err!=nil{
			log.Println(err)
			return
		}
		f,_:=os.Create("1.png")
		defer f.Close()
		_, _ = f.Write(data)
	})

	time.Sleep(time.Second*5)
}

 

等待全部任務結束

啓動多個子任務,等待子任務結束緩存

不須要結果網絡

須要結果的app

// 1. 須要返回結果,僅須要完成任務便可
func CallTaskNoRsult(){
    wg := sync.WaitGroup{}
    wg.Add(5)
    for i:=0;i<5;i++{
        go func(num int) {
            defer wg.Done()
            log.Println("任務",num,"完成")
        }(i)
    }
    wg.Wait()
}


// 2. 須要完成任務還須要返回結果,可能須要作後續處理
func CallTaskResults()  {
    // 建立一個channel 用於接收任務處理結果
    results:=make(chan string,10)

    defer close(results)
    // 隨機種子
    rand.Seed(time.Now().UnixNano())

    for i:=0;i<10;i++{
        go func(num int) {
            // 隨機生成一個結果,並把結果添加到結果隊列
            results<-fmt.Sprintf("Task %d# result: %d",num,rand.Intn(20)+10)
        }(i)
    }

    //等待輸出結果
    for i:=0;i<10;i++{
        log.Println(<-results)
    }
}

 

等待任意一個任務完成

執行等待多個子任務,只要任意一個任務完成便可,例以下載文件,假若有多個下載源,不肯定哪一個源網絡狀況好,能夠考慮使用這個方式異步

// 模擬多源下載,任意一個任務完成就結束
func Download(){
	// 定義一個channel,用於接收結果
	result:=make(chan string)

	defer close(result)

	// 種子
	rand.Seed(time.Now().UnixNano())

	// 異步獲取子任務下載 數據
	for i:=0;i<10;i++{
		go func(num int) {
			// 隨機休眠一段時間
			time.Sleep(time.Second*time.Duration(rand.Intn(5)+1))
			result<-fmt.Sprintf("Task %d# download ok!",num)
		}(i)
	}

	// 等待子任務完成
	log.Println(<-result)

}

 

協做等待其餘任務結果

稍微複雜的一些業務,其中的一個子任務須要另外一個子任務的結果url

 

 

 

// 異步協做,任務B須要任務的計算結果
func BWaitA()  {
	// 建立channel 用於任務通信
	ch:=make(chan string,1)  // 緩存爲1
	defer close(ch)
	// 建立wait group
	wg:= sync.WaitGroup{}
	wg.Add(2)
	// 啓動任務A
	go func() {
		defer wg.Done()
		log.Println("A do working...")
		time.Sleep(time.Second*3)
		log.Println("A end calc...., send result to B")
		ch<-"data for B"
		log.Println("A do send result to B, do other thing!")
	}()

	// 啓動任務B
	go func() {
		defer wg.Done()
		log.Println("B do working...")
		time.Sleep(time.Second*2)
		log.Println("B wait A...")
		log.Println(<-ch)
		log.Println("B user A result do other thing!")
	}()
	wg.Wait()
}

任務取消

某些特殊的情景下,咱們可能須要取消子任務的執行,例如主任務由於用戶的緣由,須要提早結束,通知全部的子任務結束

單層任務取消

func CancleTask()  {
	// 建立一個cancele 的channel 用於通知子任務結束
	canncle:=make(chan struct{})

	// 封裝一個方法用於檢查 任務是否被取消了
	iscancle:= func() bool {
		select {
		case <-canncle:
			return true
		default:
			return false
		}
	}
	//模擬啓動子任務
	for i:=0;i<5;i++{
		go func(num int) {
			for{
        // 監放任務是否被取消
				if iscancle(){
					// 若是被取消則 退出任務
					log.Printf("Task #%d is canceled!\n",num)
					return
				}
				log.Printf("Task #%d is working...\n",num)
                time.Sleep(time.Millisecond *50)
			}
		}(i)
	}
	// 模擬任務運行一段時間
	log.Println("main working for an while...")
	time.Sleep(time.Millisecond *100)

	// 取消任務,利用關閉channel的廣播特性
	close(canncle)

	time.Sleep(time.Second)
}

 

多層任務取消

當任務比較複雜的時候,更多狀況多是關聯任務,例以下圖這樣的多層任務

Context 初識

  • 根context節點,context.BackGroud() 建立一個空的context(根節點),沒有任何做用,爲了給子contentx繼承的

  • ctx,canncel:=context.WithCancel(ctx),返回一個ctx子節點,返回一個cancleFuc

  • 調用取消方法,給全部的子節點發送取消信號

  • 子任務中監聽取消信號並退出任務

使用context取消任務

func CancelWithCtx()  {
	// 建立空ctx,根節點
	root:=context.Background()

	// 定義isCancel方法
	isCanncel:= func(ctx context.Context) bool {
		select {
		case <-ctx.Done():
			return true
		default:
			return false
		}
	}

	Task:=func (ctx context.Context, num int){
		// 啓動子任務
		go func() {
			for{
				if isCanncel(ctx){
					// 若是被取消則 退出任務
					log.Printf("Task #%d sub is canceled!\n",num)
					return
				}
			}
		}()
		for{
			if isCanncel(ctx){
				// 若是被取消則 退出任務
				log.Printf("Task #%d is canceled!\n",num)
				return
			}
			log.Printf("Task #%d is working...\n",num)
			time.Sleep(time.Millisecond*200)
		}
	}
    // 建立子ctx,用於取消
	ctxOne,cancelOne:=context.WithCancel(root)
	go Task(ctxOne,1)
    
    // 建立子ctx,用於取消
	ctxTwo,_:=context.WithCancel(root)
	go Task(ctxTwo,2)

	// 模擬保證全部的goroutine都運行起來
	time.Sleep(time.Second)
    // 取消 第一個任務
	cancelOne()
	time.Sleep(time.Second)
}

output:

2019/06/27 11:55:45 Task #1 is working...
2019/06/27 11:55:45 Task #2 is working...
2019/06/27 11:55:46 Task #1 is working...
2019/06/27 11:55:46 Task #2 is working...
2019/06/27 11:55:46 Task #1 is working...
2019/06/27 11:55:46 Task #2 is working...
2019/06/27 11:55:46 Task #1 is working...
2019/06/27 11:55:46 Task #2 is working...
2019/06/27 11:55:46 Task #1 is working...
2019/06/27 11:55:46 Task #2 is working...
2019/06/27 11:55:46 Task #1 sub is canceled!
2019/06/27 11:55:46 Task #1 is canceled!
2019/06/27 11:55:46 Task #2 is working...
2019/06/27 11:55:47 Task #2 is working...
2019/06/27 11:55:47 Task #2 is working...
2019/06/27 11:55:47 Task #2 is working...
2019/06/27 11:55:47 Task #2 is working...

 

context在net/http包中的演示

package main

import (
	"fmt"
	"net/http"
	"time"
)

func CtxTest(w http.ResponseWriter, r *http.Request){
	// 獲取 ctx
	ctx := r.Context()

	fmt.Println("processing request")

	select {
	case <-time.After(5 * time.Second):
		// 模擬請求處理完
		w.Write([]byte("request processed"))
	case <-ctx.Done(): // 網頁加載完畢 Done 信號發出
		// 用戶取消的時候,獲取取消信號s
		fmt.Println( "request cancelled")
	}
}

func main() {
	// 綁定路由處理方法
	http.HandleFunc("/",CtxTest)
	// 啓動服務
	http.ListenAndServe(":8000", nil)
}
相關文章
相關標籤/搜索