This section covers: Lect 2 RPC and Threads
Threads: threads allow one program to (logically) execute many things at once.
The threads share memory. However, each thread includes some per-thread state: program counter, registers, stack.
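The following is a minimal Go sketch of this idea. It assumes goroutines play the role of threads. `counter` lives in memory shared by every goroutine, so it is guarded by a `sync.Mutex`. Each goroutine's `local` variable is its own private state. The function name `countWithGoroutines` is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// countWithGoroutines starts n goroutines. Each one counts to `per` in a
// private variable and then adds its result to a shared counter.
func countWithGoroutines(n, per int) int {
	var mu sync.Mutex
	var wg sync.WaitGroup
	counter := 0 // shared memory: visible to all goroutines

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			local := 0 // per-goroutine state, private to this goroutine
			for j := 0; j < per; j++ {
				local++
			}
			mu.Lock()
			counter += local // touching shared state requires the lock
			mu.Unlock()
		}()
	}
	wg.Wait() // wait for every goroutine to finish
	return counter
}

func main() {
	fmt.Println(countWithGoroutines(4, 1000)) // 4000
}
```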
Below, a crawler written in Go is used as an example to introduce threads:
Go example: crawler.go
```go
package main

import (
	"fmt"
	"sync"
)

// Several solutions to the crawler exercise from the Go tutorial:
// https://tour.golang.org/concurrency/10

type fakeResult struct {
	body string
	urls []string
}

// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult

// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
	"http://golang.org/": &fakeResult{
		"Title: The Go Programming Language",
		[]string{
			"http://golang.org/pkg/",
			"http://golang.org/cmd/",
		},
	},
	"http://golang.org/pkg/": &fakeResult{
		"Title: Packages",
		[]string{
			"http://golang.org/",
			"http://golang.org/cmd/",
			"http://golang.org/pkg/fmt/",
			"http://golang.org/pkg/os/",
		},
	},
	"http://golang.org/pkg/fmt/": &fakeResult{
		"Title: Package fmt",
		[]string{
			"http://golang.org/",
			"http://golang.org/pkg/",
		},
	},
	"http://golang.org/pkg/os/": &fakeResult{
		"Title: Package os",
		[]string{
			"http://golang.org/",
			"http://golang.org/pkg/",
		},
	},
}

type Fetcher interface {
	// Fetch(urlstring) method returns a slice of URLs found on the page.
	Fetch(urlstring string) (urllist []string, err error)
}

func (f fakeFetcher) Fetch(urlstring string) ([]string, error) {
	if res, ok := f[urlstring]; ok { // https://tour.golang.org/flowcontrol/6
		fmt.Printf("found: %s\n", urlstring)
		return res.urls, nil
	}
	fmt.Printf("missing: %s\n", urlstring)
	return nil, fmt.Errorf("not found: %s", urlstring)
}

// ###### Serial crawler ######

func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
	if fetched[url] {
		return
	}
	fetched[url] = true
	urls, err := fetcher.Fetch(url)
	if err != nil {
		return
	}
	for _, u := range urls {
		Serial(u, fetcher, fetched)
	}
	return
}

// ###### Concurrent crawler with shared state and Mutex ######

func makeState() *fetchState {
	f := &fetchState{}
	f.fetched = make(map[string]bool)
	return f
}

type fetchState struct {
	mu      sync.Mutex
	fetched map[string]bool
}

func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) {
	f.mu.Lock()
	if f.fetched[url] {
		f.mu.Unlock()
		return
	}
	f.fetched[url] = true
	f.mu.Unlock()

	urls, err := fetcher.Fetch(url)
	if err != nil {
		return
	}
	var done sync.WaitGroup
	for _, u := range urls {
		done.Add(1)
		go func(u string) {
			defer done.Done()
			ConcurrentMutex(u, fetcher, f)
		}(u)
	}
	done.Wait()
	return
}

// ###### Concurrent crawler with channels ######

func worker(url string, ch chan []string, fetcher Fetcher) {
	urls, err := fetcher.Fetch(url)
	if err != nil {
		ch <- []string{}
	} else {
		ch <- urls
	}
}

func master(ch chan []string, fetcher Fetcher) {
	n := 1
	fetched := make(map[string]bool)
	for urls := range ch {
		for _, u := range urls {
			if !fetched[u] {
				fetched[u] = true
				n += 1
				go worker(u, ch, fetcher)
			}
		}
		n -= 1
		if n == 0 {
			break
		}
	}
}

func ConcurrentChannel(url string, fetcher Fetcher) {
	ch := make(chan []string)
	go func() {
		ch <- []string{url}
	}()
	master(ch, fetcher)
}

// ###### main ######

func main() {
	fmt.Printf("=== Serial===\n")
	Serial("http://golang.org/", fetcher, make(map[string]bool)) // Serial version of crawler

	fmt.Printf("=== ConcurrentMutex ===\n")
	ConcurrentMutex("http://golang.org/", fetcher, makeState())

	fmt.Printf("=== ConcurrentChannel ===\n")
	ConcurrentChannel("http://golang.org/", fetcher)
}
```
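For simplicity this is only a fake crawler and it does no network access. `fetcher` is a hash table of type string -> *fakeResult that stores the list of links on each page, and the crawler functions read from it. To demonstrate Go concurrency, the code implements three crawlers: Serial, ConcurrentMutex and ConcurrentChannel.
The code first defines an interface Fetcher (Go's notion of an interface is similar to Java's). It has a single method, Fetch, which looks up urlstring in fetcher and returns that page's list of links. Unlike Java, Go treats methods and functions as different concepts; the method is the object-oriented one. The key difference is that a method has a receiver (the `f fakeFetcher` in `Fetch()`). This means Fetch() is called on the object f, as in `some_obj_f.Fetch(url)`, so methods with the same name on different types are each dispatched to the right type. A type satisfies an interface simply by having the required methods; there is no `implements` keyword. A function is a procedural concept: it has only input and output parameters and is not tied to any object.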
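The following sketch illustrates the method/function distinction and implicit interface satisfaction. The types `mapFetcher` and `emptyFetcher` and the function `countLinks` are hypothetical. Only the `Fetcher` method signature mirrors crawler.go.

```go
package main

import (
	"errors"
	"fmt"
)

// Fetcher mirrors the interface in crawler.go.
type Fetcher interface {
	Fetch(url string) ([]string, error)
}

// Hypothetical types used only for illustration.
type mapFetcher map[string][]string
type emptyFetcher struct{}

// Fetch is a method: (m mapFetcher) is its receiver.
func (m mapFetcher) Fetch(url string) ([]string, error) {
	if urls, ok := m[url]; ok {
		return urls, nil
	}
	return nil, errors.New("not found: " + url)
}

// emptyFetcher also has a Fetch method with the same signature, so it
// satisfies Fetcher too; no explicit "implements" is needed.
func (emptyFetcher) Fetch(url string) ([]string, error) {
	return nil, nil
}

// countLinks is a plain function: no receiver, only parameters and results.
// It accepts any value whose type satisfies Fetcher.
func countLinks(f Fetcher, url string) int {
	urls, err := f.Fetch(url)
	if err != nil {
		return -1
	}
	return len(urls)
}

func main() {
	m := mapFetcher{"a": {"b", "c"}}
	fmt.Println(countLinks(m, "a"))              // 2
	fmt.Println(countLinks(m, "x"))              // -1
	fmt.Println(countLinks(emptyFetcher{}, "a")) // 0
}
```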
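The `if` in `Fetch` (`if res, ok := f[urlstring]; ok {`) uses a neat idiom. An `if` statement may start with a short statement that runs before the condition, and here that statement is the "comma ok" map lookup. See https://tour.golang.org/flowcontrol/6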
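The following is a small standalone sketch of the same idiom. The function name `lookup` is hypothetical. `v` and `ok` are declared in the `if` header and are scoped to that statement only. `ok` distinguishes a missing key from a key that is present with the zero value.

```go
package main

import "fmt"

// lookup uses "if with a short statement" plus the comma-ok map lookup.
func lookup(m map[string]int, key string) string {
	if v, ok := m[key]; ok {
		return fmt.Sprintf("found %s=%d", key, v)
	}
	// Key absent: ok was false (and v held the zero value, 0).
	return "missing " + key
}

func main() {
	m := map[string]int{"a": 1, "z": 0}
	fmt.Println(lookup(m, "a")) // found a=1
	fmt.Println(lookup(m, "z")) // found z=0
	fmt.Println(lookup(m, "b")) // missing b
}
```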
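Let's look at the serial version first. Its inputs are the root URL url, fetcher (the hash table described above), and a bool map fetched that records which sites have already been visited. Note the `make()` call in main, `make(map[string]bool)`; see https://www.jianshu.com/p/f01841004810. Serial itself is simple, so I won't go into detail. It marks url as fetched, looks up url's links in fetcher (the urls field of its fakeResult), and recursively calls itself on each link, skipping any URL already in fetched.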
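The following sketch shows why `make()` is needed and why `if fetched[url]` works. The function names `markFetched` and `writeToNilMap` are hypothetical. `make(map[string]bool)` returns an empty, writable map. Reading a missing key yields `false`, but writing to a nil map (declared without `make`) panics.

```go
package main

import "fmt"

// markFetched records url and reports whether it had been seen before.
// A missing key reads as the zero value, false for bool.
func markFetched(fetched map[string]bool, url string) bool {
	already := fetched[url]
	fetched[url] = true
	return already
}

// writeToNilMap reports whether writing to a nil map panicked.
func writeToNilMap() (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	var m map[string]bool // nil map: reads are fine, writes panic
	m["x"] = true
	return false
}

func main() {
	fetched := make(map[string]bool) // empty, writable map
	fmt.Println(markFetched(fetched, "http://golang.org/")) // false
	fmt.Println(markFetched(fetched, "http://golang.org/")) // true
	fmt.Println(writeToNilMap())                            // true
}
```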
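The second version is ConcurrentMutex. It starts one goroutine per link, and all of them share a single fetched map stored in fetchState and guarded by a sync.Mutex. The check of `f.fetched[url]` and the update happen under the same lock, so two goroutines cannot both claim the same URL. A sync.WaitGroup (done) makes each call wait until all of its child goroutines have finished. The loop variable u is passed to each goroutine as an argument, so every goroutine gets its own copy.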
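The following sketch isolates the check-and-mark step from ConcurrentMutex. The names `visitedSet`, `tryVisit` and `race` are hypothetical. Many goroutines try to claim the same URL at once. Because the check and the set happen in one critical section, exactly one of them wins.

```go
package main

import (
	"fmt"
	"sync"
)

// visitedSet plays the role of fetchState in crawler.go.
type visitedSet struct {
	mu      sync.Mutex
	fetched map[string]bool
}

// tryVisit checks and marks url while holding the lock, and reports
// whether the caller is the first to claim it.
func (s *visitedSet) tryVisit(url string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.fetched[url] {
		return false
	}
	s.fetched[url] = true
	return true
}

// race starts n goroutines that all try to claim the same URL and
// returns how many of them succeeded.
func race(n int) int {
	s := &visitedSet{fetched: make(map[string]bool)}
	var mu sync.Mutex
	var wg sync.WaitGroup
	winners := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if s.tryVisit("http://golang.org/") {
				mu.Lock()
				winners++
				mu.Unlock()
			}
		}()
	}
	wg.Wait() // like done.Wait() in ConcurrentMutex
	return winners
}

func main() {
	fmt.Println(race(100)) // 1
}
```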
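The third version is ConcurrentChannel, which uses a Go channel. Only master touches its fetched map, so no lock is needed. Each worker sends the links it found (or an empty slice on error) back over ch. master uses n to count the messages it is still waiting for and stops when n reaches zero. For more on this version, see http://www.javashuo.com/article/p-rucmpbsr-gp.html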
When to use sharing and locks, versus channels?
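One way to compare the two approaches is to implement the same counter both ways. This is only a sketch, and the names `lockedCount` and `channelCount` are hypothetical. `lockedCount` keeps the state in shared memory behind a `sync.Mutex`. `channelCount` gives the state to one owner (the receiving loop), which collects increments over a channel, the way master in ConcurrentChannel owns its fetched map. Both are correct, and which reads better depends on the problem.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedCount: n goroutines increment shared state protected by a mutex.
func lockedCount(n int) int {
	var mu sync.Mutex
	var wg sync.WaitGroup
	count := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			count++
			mu.Unlock()
		}()
	}
	wg.Wait()
	return count
}

// channelCount: n goroutines send increments, and only the receiving loop
// (the owner) ever touches count, so no lock is required.
func channelCount(n int) int {
	ch := make(chan int)
	for i := 0; i < n; i++ {
		go func() { ch <- 1 }()
	}
	count := 0
	for i := 0; i < n; i++ {
		count += <-ch
	}
	return count
}

func main() {
	fmt.Println(lockedCount(50), channelCount(50)) // 50 50
}
```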
RPC
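The basic concepts were all covered in 5105, so here let's look at how RPC is done in Go.
The 5105 course covered Reliable RPC: what to do when communication between the server and the client fails.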
17_reliable_comm
1. Reliable RPC: client-server
1.1 Server failure (the client does not know when the server crashed: before or after the operation was executed)
Sol: three operation semantics:
- Exactly once (the operation is guaranteed to execute exactly once): impossible to achieve
- At least once (executed one or more times): retry
- At most once (executed zero or one time): send the request only once
1.2 Client failure (the client has crashed, so there is no point in the server continuing; it only wastes resources)
Sol:
- Extermination: log at client stub and explicitly kill orphans
- Reincarnation: divide time into epochs between failures and delete computations from old epochs
- Expiration: give each RPC a fixed quantum T; explicitly request extensions
At least once suits these cases: it's OK to repeat operations (e.g. a read-only op), or the application has its own plan for coping with duplicates (which you will need for Lab 1).
The idea behind at most once: the server RPC code detects duplicate requests and returns the previous reply instead of re-running the handler (the RPC function). This approach is used in Lab 2.
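At least once boils down to a client loop that keeps retrying until a reply arrives. In this sketch, `callWithRetry`, `errLost` and the simulated `flakyGet` are hypothetical stand-ins for a real RPC whose request or reply can be lost. A lost reply may mean the server already ran the request, which is why this is only safe for repeatable operations such as a read.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errLost = errors.New("request or reply lost") // simulated network failure

// callWithRetry retries call until it succeeds or maxTries is reached.
// This gives at-least-once semantics: the server may execute it repeatedly.
func callWithRetry(call func() (string, error), maxTries int, backoff time.Duration) (string, int, error) {
	var err error
	for try := 1; try <= maxTries; try++ {
		v, callErr := call()
		if callErr == nil {
			return v, try, nil
		}
		err = callErr
		if try < maxTries {
			time.Sleep(backoff)
		}
	}
	return "", maxTries, err
}

func main() {
	executions := 0
	// flakyGet simulates a read-only RPC whose first two replies are lost,
	// even though the "server" executed the request each time.
	flakyGet := func() (string, error) {
		executions++
		if executions < 3 {
			return "", errLost
		}
		return "6.824", nil
	}
	v, tries, err := callWithRetry(flakyGet, 5, time.Millisecond)
	fmt.Println(v, tries, err, executions) // 6.824 3 <nil> 3
}
```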
Q: how to detect a duplicate request?
A: client includes unique ID (XID) when sending each request, and uses the same XID for re-send
server:
    if seen[xid]:
        r = old[xid]
    else:
        r = handler()
        old[xid] = r
        seen[xid] = true
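The following is a Go rendering of the server pseudocode above, offered as a sketch. `dedupServer`, `Request` and its `XID` field are hypothetical names. The table lives only in memory and is never trimmed.

```go
package main

import (
	"fmt"
	"sync"
)

// Request carries the client-chosen unique ID; a re-send reuses the same XID.
type Request struct {
	XID int64
	Arg string
}

// dedupServer runs the handler at most once per XID and answers
// duplicates with the saved reply.
type dedupServer struct {
	mu      sync.Mutex
	seen    map[int64]bool
	old     map[int64]string
	handler func(string) string
}

func newDedupServer(h func(string) string) *dedupServer {
	return &dedupServer{seen: map[int64]bool{}, old: map[int64]string{}, handler: h}
}

func (s *dedupServer) Handle(req Request) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.seen[req.XID] {
		return s.old[req.XID] // duplicate: return the previous reply
	}
	r := s.handler(req.Arg)
	s.old[req.XID] = r
	s.seen[req.XID] = true
	return r
}

func main() {
	calls := 0
	s := newDedupServer(func(arg string) string {
		calls++
		return fmt.Sprintf("reply %d to %s", calls, arg)
	})
	fmt.Println(s.Handle(Request{XID: 1, Arg: "put"})) // reply 1 to put
	fmt.Println(s.Handle(Request{XID: 1, Arg: "put"})) // reply 1 to put (re-send, not re-run)
	fmt.Println(s.Handle(Request{XID: 2, Arg: "put"})) // reply 2 to put
	fmt.Println(calls)                                 // 2
}
```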
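But at most once has a problem: if the server crashes and loses seen[], it no longer knows which xids it has already received.
Exactly once needs a fault-tolerance protocol on top of at most once. This is used in Lab 3.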
Go RPC is "at-most-once"
STEP1 open TCP connection
STEP2 write request to TCP connection
STEP3 TCP may retransmit, but server's TCP will filter out duplicates
There is no retry in Go code (i.e. will NOT create 2nd TCP connection)
Go RPC code returns an error if it doesn't get a reply (perhaps only after a timeout from TCP). From that error alone, the client cannot tell whether:
- the server never saw the request, or
- the server processed the request, but the server or network failed before the reply came back
The example below is a simple key-value store written in Go:
Go example: kv.go
```go
package main

import (
	"fmt"
	"log"
	"net"
	"net/rpc"
	"sync"
)

// RPC request/reply definitions

const (
	OK       = "OK"
	ErrNoKey = "ErrNoKey"
)

type Err string

type PutArgs struct {
	Key   string
	Value string
}

type PutReply struct {
	Err Err
}

type GetArgs struct {
	Key string
}

type GetReply struct {
	Err   Err
	Value string
}

// Client -------------------------------------------------------

func connect() *rpc.Client { // establish a connection to the server
	client, err := rpc.Dial("tcp", "127.0.0.1:1234")
	if err != nil {
		log.Fatal("dialing:", err)
	}
	return client
}

func get(key string) string {
	client := connect()
	args := GetArgs{key} // use the caller's key
	reply := GetReply{}
	err := client.Call("KV.Get", &args, &reply) // invoke KV.Get on the server via RPC
	if err != nil {
		log.Fatal("error:", err)
	}
	client.Close() // close the connection
	return reply.Value
}

func put(key string, val string) {
	client := connect()
	args := PutArgs{key, val} // use the caller's key and value
	reply := PutReply{}
	err := client.Call("KV.Put", &args, &reply)
	if err != nil {
		log.Fatal("error:", err)
	}
	client.Close()
}

// Server -------------------------------------------------------

type KV struct {
	mu   sync.Mutex // protects data; RPC handlers may run concurrently
	data map[string]string
}

func server() { // start the server
	kv := new(KV)
	kv.data = map[string]string{}
	rpcs := rpc.NewServer()
	rpcs.Register(kv)
	l, e := net.Listen("tcp", ":1234")
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go func() {
		for {
			conn, err := l.Accept()
			if err == nil {
				go rpcs.ServeConn(conn)
			} else {
				break
			}
		}
		l.Close()
	}()
}

func (kv *KV) Get(args *GetArgs, reply *GetReply) error {
	kv.mu.Lock()
	defer kv.mu.Unlock()

	val, ok := kv.data[args.Key]
	if ok {
		reply.Err = OK
		reply.Value = val
	} else {
		reply.Err = ErrNoKey
		reply.Value = ""
	}
	return nil
}

func (kv *KV) Put(args *PutArgs, reply *PutReply) error {
	kv.mu.Lock()
	defer kv.mu.Unlock()

	kv.data[args.Key] = args.Value
	reply.Err = OK
	return nil
}

// main -------------------------------------------------------

func main() {
	server()

	put("subject", "6.824")
	fmt.Printf("Put(subject, 6.824) done\n")
	fmt.Printf("get(subject) -> %s\n", get("subject"))
}
```
The logic is fairly simple, and much more concise than Java Thrift.
Ref:
https://golang.org/doc/effective_go.html
https://golang.org/pkg/net/rpc/
https://tour.golang.org/concurrency/10
https://www.cnblogs.com/pdev/p/10936485.html