Golang之chan/goroutine(轉)

原文地址:http://tchen.me/posts/2014-01-27-golang-chatroom.html?utm_source=tuicool&utm_medium=referral 看了一上午寫得很好,能夠拿來試試刀javascript

最近在team內部培訓golang,目標是看看golang可否被C工程師快速掌握。我定了個一個月,共計20小時的培訓計劃,首先花10個小時(兩週,天天1小時)讓你們掌握golang的基本要素,能寫一些入門級的程序,以後再花兩週時間作一個1000行代碼規模的Proof of concept項目。爲了能在培訓的slides上直接運行go code,我作了個簡單的 coderunnerd,能夠接受websocket傳過來的code,編譯運行再把stdout返回給websocket,爲了更清晰地說明goroutine和chan的使用,以及golang的一些best practice,我分階段寫了個 chatroom。本文介紹一下如何使用goroutine和chan來作一個簡單的聊天室。css

需求

聊天室的需求很簡單:html

  • 服務器監聽某個端口,客戶端可鏈接並開始聊天。
  • 任何客戶端的發言都會被廣播給全部客戶端。
  • 客戶端能夠爲本身設定名字或者執行一些聊天命令。

設計與實現

基本想法

服務器(Server):java

  • Server accept下來的connection被存在一個數據結構Client中,並以connection爲key,Client爲value,存在map裏。
  • 每一個Client都有本身的goroutine去接受和發送消息。Client和Server之間經過channel來傳遞消息。

客戶端(Client):git

  • 發送和接收都有各自的goroutine,經過channel和stdin/stdout交互

實現

全部chat相關的邏輯都被封裝在 chat package裏,client和server的cli只負責將ui和chat粘合起來。github

首先,是核心的數據結構:golang

type Message chan string type Client struct { conn net.Conn incoming Message outgoing Message reader *bufio.Reader writer *bufio.Writer quiting chan net.Conn name string }

Client 是一個服務器和客戶端都共享的數據結構。conn是創建的鏈接,reader/writer是conn上的bufio。Client與外界的接口是incoming/outgoing兩個channel,即:Server 會把要發送的內容 push 到 outgoing channel 裏,供writer去寫;而從reader讀入的數據會 push 到 incoming channel 裏,供 Server 讀。web

每一個 Client 有本身的名字,服務器端代碼會使用這個名字(客戶端代碼不會使用)。正則表達式

type Token chan int type ClientTable map[net.Conn]*Client type Server struct { listener net.Listener clients ClientTable tokens Token pending chan net.Conn quiting chan net.Conn incoming Message outgoing Message }

Server 保存一張 ClientTable。每一個 accept 到的 conn 會 push 進 pending channel,等待建立client。Server有 incoming / outgoing 兩個 channel,分別和 client 的 incoming / outgoing 關聯。ruby

Server 有一組 tokens,決定了一個Server最多能裝多少Client(避免Server overloading)。

下面看 Server 的建立流程:

const ( MAXCLIENTS = 50 ) func CreateServer() *Server { server := &Server{ clients: make(ClientTable, MAXCLIENTS), tokens: make(Token, MAXCLIENTS), pending: make(chan net.Conn), quiting: make(chan net.Conn), incoming: make(Message), outgoing: make(Message), } server.listen() return server }

很簡單,無須多說。server.Listen() 實現以下:

func (self *Server) listen() { go func() { for { select { case message := <-self.incoming: self.broadcast(message) case conn := <-self.pending: self.join(conn) case conn := <-self.quiting: self.leave(conn) } } }() }

這是一個 goroutine,作三件事:

  • 若是 self.incoming 收到東西,將其 broadcast 出去。
  • 若是有新的鏈接,則將其接入到聊天室。
  • 若是一個 Client 退出,則進行一些清理和通知。

咱們先看一個新鏈接如何加入到聊天室:

func (self *Server) join(conn net.Conn) { client := CreateClient(conn) name := getUniqName() client.SetName(name) self.clients[conn] = client log.Printf("Auto assigned name for conn %p: %s\n", conn, name) go func() { for { msg := <-client.incoming log.Printf("Got message: %s from client %s\n", msg, client.GetName()) if strings.HasPrefix(msg, ":") { if cmd, err := parseCommand(msg); err == nil { if err = self.executeCommand(client, cmd); err == nil { continue } else { log.Println(err.Error()) } } else { log.Println(err.Error()) } } // fallthrough to normal message if it is not parsable or executable self.incoming <- fmt.Sprintf("%s says: %s", client.GetName(), msg) } }() go func() { for { conn := <-client.quiting log.Printf("Client %s is quiting\n", client.GetName()) self.quiting <- conn } }() }

這裏先經過鏈接創建 Client 數據,爲其自動分配一個惟一的名字,而後將其加入到 ClientTable 中。注意在這個函數裏每一個 Client 會運行兩個 goroutine,咱們先記住這一點。

第一個 goroutine 從 Client 的 incoming channel 中拿出 message,若是是命令的話就執行之,不然將其放入 Server 的 incoming channel,等待被 broadcast 出去。以前 Listen() 方法裏有對應的處理:

case message := <-self.incoming: self.broadcast(message)

順手看一下 broadcast 怎麼作的:

func (self *Server) broadcast(message string) { log.Printf("Broadcasting message: %s\n", message) for _, client := range self.clients { client.outgoing <- message } }

第二個 goroutine 從 Client 的 quiting channel 中拿出 conn,放入 Server 的 quiting channel 中,等待處理某個 Client 的退出。一樣在 Listen() 中有處理:

case conn := <-self.quiting: self.leave(conn)

順手也看看 Leave 作些什麼:

func (self *Server) leave(conn net.Conn) { if conn != nil { conn.Close() delete(self.clients, conn) } self.generateToken() }

Leave 裏有兩個坑,一個是從 map 裏刪除一個 key 是否須要 synchronize,咱們放在下面的『併發與同步』裏詳細再表;另外一個坑是 generateToken(),立刻就會講到。

看了這麼多代碼了,還沒看到服務器建連的代碼,有點說不過去。接下來咱們看 Start

func (self *Server) Start(connString string) { self.listener, _ = net.Listen("tcp", connString) log.Printf("Server %p starts\n", self) // filling the tokens for i := 0; i < MAXCLIENTS; i++ { self.generateToken() } for { conn, err := self.listener.Accept() if err != nil { log.Println(err) return } log.Printf("A new connection %v kicks\n", conn) self.takeToken() self.pending <- conn } }

這裏 generateToken 及 takeToken 與 Leave 裏的 generateToken 呼應。這些代碼對應一個隱式需求:服務器不可過載。因此咱們有 MAXCLIENTS 來限制一個服務器的 client 上限。可是,怎麼比較漂亮地處理這個上限問題?由於在一個真實的聊天場景下,聊天室裏的人是能夠進進出出的。

咱們採用 token。系統生成有限的 token,被拿光後,當且僅當有人歸還 token,等待者才能得到 token,進入聊天室。在 golang 中,goroutine 和 chan 簡直是爲此需求量身定製的。咱們看運做機制:

  • 首先生成 MAXCLIENTS 個 token。
  • 第 1 - MAXCLIENTS 個 client:
    • 從 tokens 裏拿走一個 token
    • 把本身的 conn 放入 pending channel(若是以前的 pending conn 還被取走,則這個 goroutine就會被掛起,等待以前的 pending conn 被取走。不然,繼續執行。
  • 第 (MAXCLIENTS + 1) 個 client:
    • 從 tokens 裏拿不到 token 了,當前的 goroutine 在這一點上掛起,等待 token。
  • 有人離開:
    • 歸還一個 token,這樣以前被掛起等待 token 的 goroutine 被喚醒,繼續執行。

沒有使用任何同步機制,代碼乾淨清晰漂亮,咱們就完成了一個排隊系統。Ura for go!


喘一口氣,接下來看 join 的時候調用的 CreateClient 的代碼:

func CreateClient(conn net.Conn) *Client { reader := bufio.NewReader(conn) writer := bufio.NewWriter(conn) client := &Client{ conn: conn, incoming: make(Message), outgoing: make(Message), quiting: make(chan net.Conn), reader: reader, writer: writer, } client.Listen() return client }

client.Listen 極其細節:

func (self *Client) Listen() { go self.Read() go self.Write() } func (self *Client) Read() { for { if line, _, err := self.reader.ReadLine(); err == nil { self.incoming <- string(line) } else { log.Printf("Read error: %s\n", err) self.quit() return } } } func (self *Client) Write() { for data := range self.outgoing { if _, err := self.writer.WriteString(data + "\n"); err != nil { self.quit() return } if err := self.writer.Flush(); err != nil { log.Printf("Write error: %s\n", err) self.quit() return } } }

client.Listen 裏咱們也生成了兩個 goroutine,加上以前的兩個,每一個 client 有四個 goroutine(因此運行中的Server的 gorutine 的數量接近於 client num * 4)。雖然咱們能夠作一些優化,但這並沒關係,一個 go 進程裏運行成千上萬個 goroutine沒有太大問題,由於 goroutine 運行在 userspace,其 memory footprint很小(幾k),切換代價很是低(沒有 syscall)。

這兩個 goroutine 正如一開始設計時提到的,一讀一寫,經過 channel 和外界交互。

這就是整個聊天室的主體代碼。接下來的命令行就很簡單了。

先看 Server 代碼:

package main import ( . "chatroom/chat" "fmt" "os" ) func main() { if len(os.Args) != 2 { fmt.Printf("Usage: %s <port>\n", os.Args[0]) os.Exit(-1) } server := CreateServer() fmt.Printf("Running on %s\n", os.Args[1]) server.Start(os.Args[1]) }

接下來是 Client 代碼:

package main import ( "bufio" . "chatroom/chat" "fmt" "log" "net" "os" ) func main() { if len(os.Args) != 2 { fmt.Printf("Usage: %s <port>\n", os.Args[0]) os.Exit(-1) } conn, err := net.Dial("tcp", os.Args[1]) if err != nil { log.Fatal(err) } defer conn.Close() in := bufio.NewReader(os.Stdin) out := bufio.NewWriter(os.Stdout) client := CreateClient(conn) go func() { for { out.WriteString(client.GetIncoming() + "\n") out.Flush() } }() for { line, _, _ := in.ReadLine() client.PutOutgoing(string(line)) } }

運行一下(起了兩個client):

➜  chatroom git:(master) ./bin/chatserver :5555 ➜ chatroom git:(master) ./bin/chatserver :5555 Running on :5555 2014/01/30 09:05:24 Server 0xc2000723c0 starts 2014/01/30 09:05:34 A new connection &{{0xc20008f090}} kicks 2014/01/30 09:05:34 Auto assigned name for conn 0xc200000100: User 0 2014/01/30 09:05:48 A new connection &{{0xc20008f120}} kicks 2014/01/30 09:05:48 Auto assigned name for conn 0xc200000148: User 1 2014/01/30 09:06:39 Got message: Hello from client User 0 2014/01/30 09:06:39 Broadcasting message: User 0 says: Hello 2014/01/30 09:06:48 Got message: :name Tyr from client User 1 2014/01/30 09:06:48 Broadcasting message: Notification: User 1 changed its name to Tyr 2014/01/30 09:06:57 Got message: Hello world! from client User 0 2014/01/30 09:06:57 Broadcasting message: User 0 says: Hello world! 2014/01/30 09:07:01 Got message: Hello from client Tyr 2014/01/30 09:07:01 Broadcasting message: Tyr says: Hello 2014/01/30 09:08:19 Read error: EOF 2014/01/30 09:08:19 Client User 0 is quiting 2014/01/30 09:08:19 Broadcasting message: Notification: User 0 quit the chat room.

其中一個 client:

➜  chatroom git:(master) ./bin/chatclient :5555 User 0 says: Hello :name Tyr Notification: User 1 changed its name to Tyr User 0 says: Hello world! Hello Tyr says: Hello Notification: User 0 quit the chat room.

完整代碼請見 github repo

以上代碼能正確運行,不過還有很多問題,好比 server stop 時 goroutine 並未正確 cleanup。但對於理解 goroutine 和 chan 來講,不失爲一個很好的例子。

Lessons learnt

使用go test

我如今寫代碼已經離不開很是方便的 go test 了。golang 的開發者們很是聰明,他們知道把一個 test framework / utility 放在覈心的安裝包中是多麼重要。這個 chatroom 是迭代開發的,你能夠 checkout v0.1/v0.2/v0.3 分別看不一樣時期的代碼。每次添加新功能,或者重構代碼時,go test ./chat 就是我信心的保證。代碼和test case同步開發,新的 feature 有新的 case 去 cover,這樣一點點作上去。拿柳總的話說,就是:『壘一層土,夯實,再壘一層』。

例子:

➜  chatroom git:(master) go test ./chat ok chatroom/chat  0.246s

併發與同步

golang 在設計時作了不少取捨。其中,對map的操做是否原子就有不少 debate。最終,爲了 performance,map 的操做不具有原子性,亦即不是 multithread safe。因此,正確的作法是在從 map 中刪除一個 conn 時和使用 range 中讀取時作讀寫同步。因爲本例運行在單線程環境下(是的,若是你不指定,golang process 默認單線程),且以教學爲目的,實在不忍用難看的同步操做下降代碼的美感。

另一種作法是在讀寫兩個須要同步的地方使用 channel 進行同步(還記得剛剛講的 token)吧?

若是你對 map 的 thread-safe 感興趣,能夠讀讀 stackoverflow上的這個問題

經過close來向全部goroutine傳遞終止訊息

在個人代碼裏,close 作得比較 ugly,不知你是否感覺到了。更好的作法是使用 close 一個 channel 來完成關閉 goroutine 的動做。當 close 發生時,全部接收這個 channel 的 goroutine 都會收到通知。下面是個簡單的例子:

package main import ( "fmt" "strconv" "time" ) const ( N = 10 ) func main() { quit := make(chan bool) for i := 0; i < N; i++ { go func(name string) { for { select { case <-quit: fmt.Printf("clean up %s\n", name) return } } }(strconv.Itoa(i)) } close(quit) for { time.Sleep(1 * time.Second) } }

我生成了 N 個 goroutine,但只需使用一個 close 就能夠將其所有關閉。在 chatroom 代碼中,關閉 server 時,也能夠採用相同的方法,關閉全部的 client 上的 goroutine。

下面是上述代碼執行的結果:

➜  terminate  go run terminate.go clean up 0 clean up 1 clean up 2 clean up 3 clean up 4 clean up 5 clean up 6 clean up 7 clean up 8 clean up 9

儘量把任務分佈在goroutine中

若是你沒有看過 Rob Pike 的 Concurrency is not parallelism,建議必定要看,無論你有沒有 golang 的 background。Concurrency 是你寫軟件的一種追求,和是否並行無關,但和模塊化,簡單,優雅有關。

goroutine不可作無阻塞的infinite loop

goroutine,至少在 golang 1.2 及以前的版本,都運行在一個 cooperative multitasking 的 scheduler 上。因此你要保證你的任何一個 infinite loop 都要有可能被 block 住,不管是 block 在 IO, chan, 仍是主動 block 在 timer 上,總之,infinite loop 要有退出機制。剛纔的例子咱們稍微改改:

package main import ( "fmt" "strconv" //"time" ) const ( N = 10 ) func main() { quit := make(chan bool) for i := 0; i < N; i++ { go func(name string) { for { select { case <-quit: fmt.Printf("clean up %s\n", name) return } } }(strconv.Itoa(i)) } close(quit) for { //time.Sleep(1 * time.Second) } }

乍一看,這個例子中的 gorountine應該能收到 close 而自我關閉。在 main 執行的過程當中,頭十個新建立出來的 goroutine 還未獲得調度。雖然在 main 裏咱們 close 了 quit,但因爲接下來的 dead loop 一直不釋放 CPU,因此其餘 goroutine 一直得不到調度。運行的話沒有任何輸出:

➜  terminate  go run terminate.go ^Cexit status 2

咱們稍稍改改這個程序:

package main import ( "fmt" "runtime" "strconv" //"time" ) const ( N = 10 ) func main() { runtime.GOMAXPROCS(2) quit := make(chan bool) for i := 0; i < N; i++ { go func(name string) { for { select { case <-quit: fmt.Printf("clean up %s\n", name) return } } }(strconv.Itoa(i)) } close(quit) for { //time.Sleep(1 * time.Second) } }

如今容許這個程序運行在兩個 thread 上。這樣就能正常運行了。但切記,沒有阻塞機制的 infinite loop 不是一個好的設計。

➜  terminate  go run terminate1.go clean up 0 clean up 1 clean up 2 clean up 3 clean up 4 clean up 5 clean up 6 clean up 7 clean up 8 clean up 9 ^Cexit status 2

DRY (Don't Repeat Yourself)

寫 chatroom 時,我不斷重構代碼,其目的就是能讓代碼乾淨,漂亮。比方個人一次 commit:git diff 39690d9 6851177,就是在作 test case refactor。

DRY 的前提是有完善的 test case,前文也提到。這是項目內部的 DRY。

另一種 DRY 的方式是(從我途客圈的前同事 @chenchiyuan 那裏學到的):若是兩個或以上的項目中都用到相似結構的代碼,則考慮將其重構到一個第三方的 lib 裏。在 chatroom 中,有兩處這樣的重構,重構在個人 goutil 項目中。

第一處是生成惟一數:

package uniq var ( num = make(chan int) ) func init() { go func() { for i := 0; ; i++ { num <- i } }() } func GetUniq() int { return <-num }

第二處是正則表達式匹配,將匹配的結果放入一個 map 的 slice 裏:

package regex import ( "regexp" ) const ( KVPAIR_CAP = 16 ) type KVPair map[string]string func MatchAll(r *regexp.Regexp, data string) (captures []KVPair, ok bool) { captures = make([]KVPair, 0, KVPAIR_CAP) names := r.SubexpNames() length := len(names) matches := r.FindAllStringSubmatch(data, -1) for _, match := range matches { cmap := make(KVPair, length) for pos, val := range match { name := names[pos] if name != "" { cmap[name] = val } } captures = append(captures, cmap) } if len(captures) > 0 { ok = true } return }

總結一條鐵律:project 級的 DRY 是函數化,package化;cross project的 DRY 是 repo 化。

相關文章
相關標籤/搜索