關於Goroutine與Channel

關於Goroutine的原理

原理上的內容比較多,好比goroutine啓動的時候要執行哪些相關的操做,一點一點的補充一下。web

channel的基本原理

channel是go語言中的特殊的機制,既能夠同步兩個被併發執行的函數,又可讓這兩個函數經過相互傳遞特定類型的值來進行通訊。事實上這也是channel的兩個主要功能。算法

按照通道在初始化時是否有緩衝值,又能夠分爲緩衝通道非緩衝通道。通道初始化的時候也仍是須要使用make進行,好比make(chan int,10)聲明一個緩衝空間爲10個int的通道,直接make(chan int)就是聲明一個非緩衝通道編程

直接採用內建函數close(strChan)就能夠關閉通道。應該保證在安全的狀況下進行關閉通道的操做。基本的原則:內建函數 len(strChan)能夠查看通道中當前有的元素的數量 cap(strChan)能夠查看通道的總的容量,總容量一旦初始化以後就不會再發生改變了。json

關於select語句的使用,在go語言中,執行select語句的時候,會自動地自上而下地判斷每一個case中的發送或者接受的操做能否被當即執行,便是說當前的Goroutine不會所以操做而被阻塞。select語句在執行的時候,會先對各個case中的表達式進行判斷求值,並且直到全部的求值操做都完成以後纔會考慮選其中的某個case去執行。這要依據當時通道的特性來判斷,當發現第一個知足選擇條件的case的時候,這個case中的語句就會被執行,其餘的語句就會被忽略,當有多個case都知足狀況的話,系統會根據一個僞隨機算法決定哪一個case會被執行。default是一個特殊的case,若是沒有合適的case的話,default中的語句就會被執行,若是select語句中沒有加上default語句,那麼若是此時沒有case符合條件的話,當前goroutine就會一直阻塞在當前的這一條select語句上。所以default:對於select而言是必要的。安全

一般select還會和for語句結合在一塊兒來使用,由於單獨的select操做只會被選擇一次,要想持續不斷地使用select從通道中讀出信息,仍是要和for結合在一塊兒使用。因而跳出多層循環的時候,特別是添加了超時控制的案例,能夠參考使用場景(2)中介紹的兩種方法.websocket

  • 不管怎樣都不該該在接收端關閉通道,由於沒法判斷髮送端是否還有數據要發送,通道有一個很好的特性,就是發送端關閉通道後,接收端仍然能夠正常接受已經存在通道中的數據。誰啓的通道,誰最後負責關,是這個道理。
  • 注意element , ok := <-chann 的這種語法, 若是通道被關閉則ok的值會變爲false,element的值會變爲該通道類型的零值,一般用ok這種語法來判斷是否退出某個循環。好比下面這段代碼,同時也能夠看下goroutine的相關使用模式:
package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int, 1)
    sign := make(chan byte, 2)

    go func() {
        for i := 0; i < 5; i++ {
            ch <- i

            time.Sleep(1 * time.Second)
        }

        close(ch)
        fmt.Println("The channel is closed.")
        sign <- 0
    }()

    go func() {
        //這個循環會一直嘗試從ch中讀取信息出來 即便ch已經被髮送端關閉
        //但仍是能夠讀信息出來 最後當ok 爲false的時候 說明已經沒有數據從ch中讀出
        //跳出循環 注意這種判斷方式
        for {
            fmt.Printf("before extract channel len: %v ,", len(ch))
            e, ok := <-ch
            fmt.Printf("channel value: %d if extract ok :(%v) after extraction channel len : %v channel cap : %v \n", e, ok, len(ch), cap(ch))
            if !ok {
                break
            }

            time.Sleep(2 * time.Second)
        }
        fmt.Println("Done.")
        sign <- 1
    }()
    //要是不添加兩次取值的操做的話 主進程就會立刻結束 這裏至關因而實現了一個
    //同步的操做 等待兩個go func都結束以後 再結束主進程 注意這種技巧
    <-sign
    <-sign
}

/*output:
before extract channel len: 1 ,channel value: 0 if extract ok :(true) after extraction channel len : 0 channel cap : 1
before extract channel len: 1 ,channel value: 1 if extract ok :(true) after extraction channel len : 0 channel cap : 1
before extract channel len: 1 ,channel value: 2 if extract ok :(true) after extraction channel len : 0 channel cap : 1
before extract channel len: 1 ,channel value: 3 if extract ok :(true) after extraction channel len : 0 channel cap : 1
The channel is closed.
before extract channel len: 1 ,channel value: 4 if extract ok :(true) after extraction channel len : 0 channel cap : 1
before extract channel len: 0 ,channel value: 0 if extract ok :(false) after extraction channel len : 0 channel cap : 1
Done.
*/
  • 一樣的從通道中迭代取出元素的操做還可使用 for range 來進行操做,當通道已經被關閉或者沒有值能夠再接收的話,for循環會當即被結束,好比使用場景(3)中的Batch函數,能夠修改爲以下的方式,更加簡潔:
func (handler PersonHandlerImpl) Batch(origs <-chan Person) <-chan Person {
    dests := make(chan Person, 100)
    go func() {
        for p :=range origs{
            handler.Handle(&p)
            log.Printf("old value : %v\n", p)
            //time.Sleep(time.Second)
            dests <- p
        }
        fmt.Println("alll the info has been handled")
        close(dests)
    }()
    return dests
}

關於通道的基本原則

  • 通道緩衝已經滿了的時候,再向通道中發送數據,會形成Goroutine的阻塞,通道沒有初始化,即值爲nil的時候,向其中發送數據會形成通道永久阻塞。
  • 關閉通道的操做應該由發送端來進行,通道關閉後,若是還有數據,接收端仍能夠正常接受數據。
  • 向通道中發送值,進行的是值傳遞

channel使用場景分析

使用場景(1)

注意 app.go文件夾中的 346 行左右開始地方的一個坑 注意time.After的返回值 因爲放在了for循環中 所以 每次會新new 一個 channel出來 還有注意跳出多層循環的方式
主要參考的是《Go併發編程實戰的相關內容》併發

代碼以下:app

package main

import (
    "fmt"
    "runtime"
)

func main() {
    names := []string{"E", "H", "R", "J", "M"}
    for _, name := range names {
        go func() {
            fmt.Printf("Hello , %s \n", name)
        }()
    }
    //要是不添加runtime的話 就不會有內容輸出
    //由於for循環執行速度太快了 直接循環結束跳出了最後的循環
    //以後 for循環中生成的5個go func 會被分別進行調度
    runtime.Gosched()
}

/* output
Hello , M 
Hello , M 
Hello , M 
Hello , M 
Hello , M
*/

根據代碼能夠看出,具體循環的時候for循環中的go func 的調度並非按照想象的那樣,一次循環一個go func ,不要對go func的執行時機作任何假設。異步

優化方案

一種思路是把runtime.Gosched()函數放在每次for循環結束的時候,這樣每次for循環以後,都會被從新調度一次,可能會出現正確的結果,並非每次都準確,好比go func的程序須要運行一段時間,在這段運行的時間以內,可能就已經循環了幾個元素過去了socket

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    names := []string{"E", "H", "R", "J", "M", "N", "O", "P"}
    for _, name := range names {
        go func() {
            time.Sleep(1000 * time.Nanosecond)
            fmt.Printf("Hello , %s \n", name)
        }()

        runtime.Gosched()
    }

}

/* output:
Hello , E
Hello , J
Hello , J
Hello , P
Hello , P
Hello , P
*/

還有一種思路是採用傳遞參數的方式,就是給goroutine帶上了參數,雖然goroutine已經脫離了main函數的控制,可是它已經帶上了main函數給的烙印,至關因而某種解耦的感受,for循環屢次就不會影響打印的結果了,好比下面代碼:

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    names := []string{"E", "H", "R", "J", "M", "N", "O", "P"}
    for _, name := range names {
        go func(who string) {
            time.Sleep(1000 * time.Nanosecond)
            fmt.Printf("Hello , %s \n", who)
        }(name)

    }
    runtime.Gosched()

}

/* output:
Hello , E
Hello , H
Hello , R
Hello , J
Hello , M
*/

可是這個方法仍然頗有問題,只能保證在函數執行時間很短的時候結果正常,並且不輸出重複的內容,若是程序執行時間比較長的話,頗有可能main函數會被提早結束,按順序生成的多個goroutine在cpu那裏會不會仍然按照順序被調度執行?這個仍然不肯定?有幾個goroutine會不能被正常調度到而且執行,好比像上面的代碼的輸出樣子,並且每次輸出的結果也都是不肯定的。

使用場景(2)

編碼的時候遇到這樣一個場景,服務建立成功以後,須要等待ip被分配,ip被分配好以後,服務才正式部署成功,最後將全部的信息返回給前臺,因而打算這樣實現,在服務建立成功以後就不斷循環,查詢ip若是分配成功了就返回,若是超過了時間也返回失敗,最後這部分的代碼像下面這樣,
第一個例子中退出的方式採用的是標記的思路形式,每次循環結束的時候會檢查一下標記看看是否退出,第二個採用的是特殊的語法,直接跳出最外層的循環,注意這種時間控制的實現,仍是弄成一個defalt一個case比較好,因爲case的調度可能有隨機性,所以正常執行的內容放在default的部分,時間控制的那個channel放在某一個case當中。

package main

import (
    "fmt"
    "time"
)

func main() {
    sign := make(chan int)
    chtemp := make(chan int, 5)
    go func() {
        for i := 0; i < 5; i++ {
            time.Sleep(time.Millisecond * 300)
            chtemp <- i
        }
        close(chtemp)

    }()
    var e int
    ok := true

    //new 一個新的channel返回 注意這裏要提早聲明好
    t := time.After(time.Second)
    go func() {
        for {
            select {
            case <-t:
                fmt.Println("time out")
                ok = false
                break
                //注意這裏是使用 = 而不是 :=

            default:
                e, ok = <-chtemp
                fmt.Printf("value : %v \n", e)
                if !ok {
                    break
                }
            }
            if !ok {
                sign <- 1
                break
            }
        }
    }()
    <-sign
}
//一個時間控制的channel
//注意這個要在循環以外單獨聲明 不然每次都會分配一個新的 time.After的channel返回過來
t := time.After(time.Second * 10)

//注意這種跳出多層循環的操做方式 要是單層使用break的話 僅僅跳出的是 select 那一層的循環

A:
    for {
        select {
        //若是時間到了 就返回錯誤信息
        case <-t:
            log.Println("time out to allocate ip")
            //delete the se which deploy failed
            a.Ctx.ResponseWriter.Header().Set("Content-Type", "application/json")
            http.Error(a.Ctx.ResponseWriter, `{"errorMessage":"`+"deploy error : time out"+`"}`, 406)
            break A
        //若是時間沒到 就是 t 尚未發回信息 select語句就默認跳轉到default塊中
        //執行查找ip是否分配的操做
        default:
            //log.Println("logout:", <-timeout)
            sename := service.ObjectMeta.Labels["name"]
            podslist, err := a.Podip(sename)
            if err != nil {
                log.Println(err.Error())
                a.Ctx.ResponseWriter.Header().Set("Content-Type", "application/json")
                http.Error(a.Ctx.ResponseWriter, `{"errorMessage":"`+err.Error()+`"}`, 406)
                break A
            }
            if len(podslist) == 0 {
                continue
            } else {
                log.Println("allocation ok ......")
                a.Data["json"] = detail
                a.ServeJson()
                break A
            }

        }

    }

使用場景(3)

經常有這樣一種場景,把某些信息從舊的資源池中取出來,通過一些加工處理,再放入新的資源池中,這個過程若是按傳統的方式就是採用徹底串行的方式效率會很低,粒度太粗了,具體的粒度能夠細化以每次所取的單位資源爲粒度。
好比以書上p339爲例,有一個資源池存儲這person的信息,將每一個person從中取出來,以後進行一些處理,再存到新的資源池中,這裏用oldarray以及newarray來模擬舊的和新的資源池:

具體的代碼以下:

package main

//參考go 併發編程實戰 p337
import (
    "log"
    "strconv"
    "time"
)

type Person struct {
    name string
    age  int
    addr string
}

var oldpersonarray = [5]Person{}
var newpersonarray = [5]Person{}

type PersonHandler interface {
    Batch(origs <-chan Person) <-chan Person
    Handle(orig *Person)
}

//struct 實現了personhandler 接口
type PersonHandlerImpl struct{}

//從origs接收信息 處理以後再返回給新的channel
func (handler PersonHandlerImpl) Batch(origs <-chan Person) <-chan Person {
    dests := make(chan Person, 100)
    go func() {
        for {
            p, ok := <-origs
            if !ok {
                close(dests)
                break
            }
            handler.Handle(&p)
            log.Printf("old value : %v\n", p)
            //time.Sleep(time.Second)
            dests <- p
        }
    }()
    return dests
}

//這裏要使用引用傳遞
func (handler PersonHandlerImpl) Handle(orig *Person) {
    orig.addr = "new address"
}

func getPersonHandler() PersonHandler {
    return &PersonHandlerImpl{}

}

//print the oldpersonarray into the chan<-Person
func fetchPerson(origs chan<- Person) {
    for _, v := range oldpersonarray {
        //fmt.Printf("get the value : %v %v \n", k, v)
        time.Sleep(time.Second)
        origs <- v
    }
    close(origs)

}

//fetch the value from the channel and store it into the newpersonarray
func savePerson(dest <-chan Person) <-chan int {
    intChann := make(chan int)
    go func() {
        index := 0
        for {
            p, ok := <-dest
            if !ok {
                break
            }
            //time.Sleep(time.Second)
            log.Printf("new value transfer %v \n", p)

            newpersonarray[index] = p
            index++

        }

        intChann <- 1
    }()
    return intChann
}

func init() {
    //使用range的話是值傳遞 這裏要給oldpersonarray賦值進來
    tmplen := len(oldpersonarray)
    for i := 0; i < tmplen; i++ {
        oldpersonarray[i].addr = "old address"
        oldpersonarray[i].age = i
        oldpersonarray[i].name = strconv.Itoa(i)

    }

    log.Printf("first print init value : %v\n", oldpersonarray)

}
func main() {

    handeler := getPersonHandler()
    origs := make(chan Person, 100)
    dests := handeler.Batch(origs)
    //go func() { fetchPerson(origs) }()
    // 不加go func的話 要等這句執行完 才能執行下一句
    // 則orgis信息都輸出 徹底關閉掉 這個時候 從dest接收信息的語句纔開始執行
    // 因此不會動態輸出 這句加上go func的話 就會沒隔 1s 動態輸出
    // 若是將fetchPerson 再往前面放一句 則old value也不會動態輸出
    fetchPerson(origs)
    sign := savePerson(dests)
    <-sign
    log.Printf("last print new value : %v \n", newpersonarray)

}

總體的結構圖以下:

代碼結構圖

代碼基本分析:

  • 首先聲明一個 PersonHandler 的接口,以後聲明一個struct PersonHandlerImpl 將接口中的兩個方法都實現了,init函數用於進行oldarray的初始化工做。注意爲了減小出錯,內部的函數在方聲明的時候都是單向的channel。
  • 1,2 fetchperson從oldarray中區數據,並把數據存到origs channel中,注意最後取完數據到通道以後,要由發送方將channel關閉,不然可能形成deadlock。注意在main函數中,若是fech操做沒有放到一個goroutine中來執行,就仍然是串行的,至關因而把數據都放入到channel中,另外一端纔開始取,沒發揮出併發的優點。
  • 3,4 Batch函數將person信息從origs中取出來,進行處理後,同時傳到dests中,最後將dests返回,注意這裏不是所有傳入以後纔將dests返回,而是新啓動一個goroutine執行傳入操做,同時將dests返回,注意要主動關閉channel。
  • 5 savePerson操做接收一個<-chann 以後從中接受person信息,將值寫入到新的資源池中,最後所有寫入結束以後,傳一個sign channel給主進程,結束。
  • 總結,在須要動態輸出信息的時候,goroutine每每是和channel結合在一塊兒使用。最多見的用法是,一個goroutine負責向channel中寫入數據,以後將channel返回,由其餘進程取出信息。好比以前寫過的一些websocket從前臺接受信息,後臺處理信息以後再動態返回給前臺打出結果的模型,就和這個差很少,總之具體的異步執行流程要理清楚,都有哪些channel,負責傳遞的信息分別是什麼。
相關文章
相關標籤/搜索