MIT 6.824學習筆記2 RPC/Thread

本節內容:Lect 2 RPC and Threads


 

線程:Threads allow one program to (logically) execute many things at once.
The threads share memory. However, each thread includes some per-thread state: program counter, registers, stack.


下面以Go語言寫的一個爬蟲作爲例子來介紹線程:

Go example: crawler.go

package main
import (
	"fmt"
	"sync"
)
// Several solutions to the crawler exercise from the Go tutorial: https://tour.golang.org/concurrency/10


type (
	// fakeResult is the canned content of one fake page: its body text and
	// the URLs that page links to.
	fakeResult struct {
		body string
		urls []string
	}

	// fakeFetcher is a Fetcher backed by an in-memory map from URL to
	// canned page content; it never touches the network.
	fakeFetcher map[string]*fakeResult
)

// fetcher is the fake web used by the crawler demos: each key is a page URL
// and each value holds that page's body and outgoing links.
// "http://golang.org/cmd/" is linked to but has no entry, so fetching it
// takes the not-found path.
var fetcher = fakeFetcher{
	"http://golang.org/": {
		body: "Title: The Go Programming Language",
		urls: []string{
			"http://golang.org/pkg/",
			"http://golang.org/cmd/",
		},
	},
	"http://golang.org/pkg/": {
		body: "Title: Packages",
		urls: []string{
			"http://golang.org/",
			"http://golang.org/cmd/",
			"http://golang.org/pkg/fmt/",
			"http://golang.org/pkg/os/",
		},
	},
	"http://golang.org/pkg/fmt/": {
		body: "Title: Package fmt",
		urls: []string{
			"http://golang.org/",
			"http://golang.org/pkg/",
		},
	},
	"http://golang.org/pkg/os/": {
		body: "Title: Package os",
		urls: []string{
			"http://golang.org/",
			"http://golang.org/pkg/",
		},
	},
}

// Fetcher abstracts page retrieval for the crawlers. Fetch returns the URLs
// found on the page at urlstring, or a non-nil error if the page cannot be
// fetched.
type Fetcher interface {
	Fetch(urlstring string) (urllist []string, err error)
}

// Fetch returns the canned links for urlstring, printing "found:" or
// "missing:" to stdout. When urlstring has no entry it returns a nil slice
// and a "not found" error.
func (f fakeFetcher) Fetch(urlstring string) ([]string, error) {
	// The if-with-initializer form keeps res and ok scoped to the lookup;
	// see https://tour.golang.org/flowcontrol/6.
	if res, ok := f[urlstring]; ok {
		fmt.Printf("found:   %s\n", urlstring)
		return res.urls, nil
	}
	notFound := fmt.Errorf("not found: %s", urlstring)
	fmt.Printf("missing: %s\n", urlstring)
	return nil, notFound
}


// ###### Serial crawler ######

// Serial crawls the link graph reachable from url one page at a time,
// visiting pages in depth-first pre-order. fetched records every URL that
// has been claimed. It is updated in place and must be non-nil, because it
// is written to. A page whose Fetch fails is still marked fetched but
// contributes no further links.
//
// An explicit stack replaces recursion. Children are pushed in reverse so
// they are popped, and therefore fetched, in the order the recursive
// formulation would visit them.
func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
	pending := []string{url}
	for len(pending) > 0 {
		top := len(pending) - 1
		cur := pending[top]
		pending = pending[:top]

		if fetched[cur] {
			continue
		}
		fetched[cur] = true

		links, err := fetcher.Fetch(cur)
		if err != nil {
			continue
		}
		for i := len(links) - 1; i >= 0; i-- {
			pending = append(pending, links[i])
		}
	}
}

// ###### Concurrent crawler with shared state and Mutex ######

// fetchState is the crawl state shared by every ConcurrentMutex goroutine:
// fetched records which URLs have been claimed, and mu guards it.
type fetchState struct {
	mu      sync.Mutex
	fetched map[string]bool
}

// makeState returns a fresh fetchState whose fetched map is initialized and
// empty, ready to pass to ConcurrentMutex.
func makeState() *fetchState {
	return &fetchState{fetched: map[string]bool{}}
}

// ConcurrentMutex crawls the link graph reachable from url, handing each
// linked page to its own goroutine. All goroutines share f. The
// test-and-set on f.fetched happens under f.mu, so exactly one goroutine
// claims each URL. It returns only after every child crawl it started has
// returned.
func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) {
	f.mu.Lock()
	alreadyFetched := f.fetched[url]
	f.fetched[url] = true
	f.mu.Unlock()
	if alreadyFetched {
		return
	}

	urls, err := fetcher.Fetch(url)
	if err != nil {
		return
	}

	var done sync.WaitGroup
	done.Add(len(urls))
	for _, u := range urls {
		// Pass the URL as an argument so each goroutine owns its copy
		// (needed before Go 1.22, when the loop variable was shared).
		go func(child string) {
			defer done.Done()
			ConcurrentMutex(child, fetcher, f)
		}(u)
	}
	done.Wait()
}

// ###### Concurrent crawler with channels ######

// worker fetches url and sends exactly one value on ch. On success the value
// is the page's links; on error it is a non-nil empty slice. master counts
// one receive per worker, so the send happens even when Fetch fails.
func worker(url string, ch chan []string, fetcher Fetcher) {
	urls, err := fetcher.Fetch(url)
	if err != nil {
		// Discard anything Fetch may have returned alongside the error.
		urls = []string{}
	}
	ch <- urls
}

// master is the coordinator of the channel-based crawler. It receives link
// batches from ch and starts one worker per URL it has not seen before.
// Only master touches seen, so no lock is needed. outstanding counts the
// batches still expected on ch: it starts at 1 for the seed batch the caller
// sends, gains one per worker started, and loses one per batch received.
// When it reaches zero, no sender remains and master returns.
func master(ch chan []string, fetcher Fetcher) {
	outstanding := 1
	seen := make(map[string]bool)
	for batch := range ch {
		for _, u := range batch {
			if seen[u] {
				continue
			}
			seen[u] = true
			outstanding++
			go worker(u, ch, fetcher)
		}
		outstanding--
		if outstanding == 0 {
			return
		}
	}
}

// ConcurrentChannel crawls from url using goroutines that communicate over a
// channel instead of sharing a locked map. It returns when master returns.
func ConcurrentChannel(url string, fetcher Fetcher) {
	ch := make(chan []string)
	// ch is unbuffered, so the seed batch must be sent from another
	// goroutine; sending it here before master starts receiving would block.
	seed := []string{url}
	go func() { ch <- seed }()
	master(ch, fetcher)
}

// ###### main ######

// main runs the three crawler variants, one after another, over the same
// fake web, starting from the same root URL.
func main() {
	const root = "http://golang.org/"

	fmt.Printf("=== Serial===\n")
	Serial(root, fetcher, make(map[string]bool)) // serial version of the crawler

	fmt.Printf("=== ConcurrentMutex ===\n")
	ConcurrentMutex(root, fetcher, makeState())

	fmt.Printf("=== ConcurrentChannel ===\n")
	ConcurrentChannel(root, fetcher)
}

 

爲了簡便起見,這其實只是一個假的爬蟲......並沒有涉及網絡訪問。它的作用就是在fetcher中創建一個string->*fakeResult類型的hash table,表示每個網頁上的鏈接列表,並通過爬蟲函數讀取它們。爲了演示go語言的併發,代碼中實現了三種函數:Serial、ConcurrentMutex、ConcurrentChannel。

在這段代碼中,首先定義了一個接口Fetcher(go中接口的概念和java類似,不過go的類型不需要顯式聲明implements,只要實現了接口的全部方法就自動滿足該接口),其中有一個方法Fetch,用於在fetcher中返回urlstring所對應的鏈接列表。和java不同,go語言中方法和函數不是一個概念:方法是面向對象中的概念。go中方法和函數最大的區別就是方法帶有一個接收器(Fetch()中的f fakeFetcher參數),表示調用f對象的Fetch()方法(用法即some_obj_f.Fetch(url)),這樣就可以自動適配不同對象的同名方法;而函數是面向過程中的概念,函數只有輸入參數和輸出參數,和對象無關。

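Fetch()方法中的 if res, ok := f[urlstring]; ok {...} 有個神奇的用法:if可以在條件前面帶一個初始化語句(這裏是map查找,ok表示key是否存在),其中聲明的變量res、ok只在這個if(及其else)內可見。參考 https://tour.golang.org/flowcontrol/6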

接下來我們先來看Serial版本。它的輸入參數包括起始url、fetcher(實現了Fetcher接口的fakeFetcher,即前面提到的hash table),以及一個map[string]bool類型的fetched(用來記錄哪些網頁已經被訪問過)。注意main()中調用Serial時傳入的make(map[string]bool):map必須先用make()初始化才能寫入,向nil map寫入會panic,參考https://www.jianshu.com/p/f01841004810 。Serial函數本身比較簡單,就不贅述了,基本思路就是從起始url開始,遞歸抓取每個頁面上的鏈接(存在fakeResult的urls裏)。

 

第二個版本是ConcurrentMutex

  • 注意它的輸入參數fetchState,裏面除了fetched map以外還多了一個互斥鎖mu。它的作用是給共享變量fetched加鎖,保證在多線程爬蟲時,每次只有一個線程能訪問fetched變量。當mu已經被鎖上時,任何試圖調用mu.Lock()的線程都會阻塞在那裏,直到mu被釋放(mu.Unlock())後才能往下進行(可以理解爲二元信號量的wait操作)。而對於每個頁面上的鏈接,會再啓動一個goroutine運行ConcurrentMutex來抓取,而不是單純的遞歸,這樣就實現了多線程。  
  • 另外ConcurrentMutex中有一個var done sync.WaitGroup,這個是用來確定何時結束爬蟲的。WaitGroup對象內部有一個計數器,初始值爲0。它有三個方法Add()、Done()、Wait(),用來控制計數器的數量:Add(n)把計數器加上n(n也可以是負數,並不是把計數器設置爲n);Done()每次把計數器減1(等價於Add(-1));Wait()會阻塞代碼的運行,直到計數器的值減爲0(可以理解爲counting semaphore)[參考https://studygolang.com/articles/12972?fr=sidebar]。  
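  • 注意for循環中有一個匿名函數go func(...){...}(...),這個是go中的協程(goroutine),作用有點像C裏面的pthread_create(),可以理解爲新開了一個線程。注意它不像fork():fork()創建的是新進程,不和父進程共享內存,而goroutine之間是共享內存的。函數裏面的語句會在一個新建的goroutine裏執行,這樣就實現了併發訪問多個url。循環變量是作爲參數傳給匿名函數的,這樣每個goroutine都拿到自己的一份拷貝。Go 1.22之前,循環變量在各次迭代間是共享的,直接在閉包裏引用可能讀到被後續迭代改掉的值。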
  • 匿名函數裏有個defer done.Done()。defer關鍵字的含義是:defer後面的函數調用會在defer語句所在的函數返回時才執行。這裏也就是匿名函數運行結束後(即裏面遞歸調用的ConcurrentMutex返回之後)把計數器減1。另外需要注意:執行defer語句時,後面緊跟的函數值和函數參數會立即被求值,但函數體要等到外層函數返回時才會執行。本例中Done()沒有參數,還看不出來這一點,可以參考http://www.javashuo.com/article/p-tlszkeag-ek.html
  • 這個版本雖然實現了一定程度上的併發,但是對fetched的訪問仍然是串行(serial)的:所有goroutine都要搶同一把鎖mu。如果鎖競爭(contention)很激烈,那麼整體速度就會被拖慢。

 

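第三個版本是ConcurrentChannel,這個例子中用了Go channel。worker只負責抓取頁面,並把得到的鏈接列表發送到channel。master是唯一讀寫fetched的地方,所以不需要加鎖。master用計數器n記錄還有多少次發送尚未收到,n減到0時說明所有worker都已完成。這部分可以參考http://www.javashuo.com/article/p-rucmpbsr-gp.html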

 

When to use sharing and locks, versus channels?

  •   Most problems can be solved in either style
  •   What makes the most sense depends on how the programmer thinks
    •   state -- sharing and locks
    •   communication -- channels
    •   waiting for events -- channels
  •   Use Go's race detector:
    •   https://golang.org/doc/articles/race_detector.html
    •   go test -race

 


RPC

基本概念5105都學過了.....這裏來看看用go語言如何實現吧。

在5105課上講過Reliable RPC的概念,講的是如果server和client之間的通信出了故障該怎麼辦。

17_reliable_comm
1. Reliable RPC: client-server
1.1 Server failure(client 不知道 server 是什麼時候掛的:是在 operation 執行前還是執行後)
  Sol: 分三種 operation semantics:
  - Exactly once(保證操作恰好執行一次): impossible to achieve
  - At least once(至少執行過一次): retry
  - At most once(執行過 0 次或 1 次): send request only once
1.2 Client failure(client 已經掛了,server 沒必要再執行了,浪費資源)
  Sol:
  - Extermination: log at client stub and explicitly kill orphans
  - Reincarnation: Divide time into epochs between failures and delete computations from old epochs.
  - Expiration: give each RPC a fixed quantum T. Explicitly request extensions.

At least once適用於以下場景:If it's OK to repeat operations (e.g. read-only op), or if application has its own plan for coping w/ duplicates (which you will need for Lab 1)

at most once的思路是,server RPC code could detect duplicate requests, and returns previous reply instead of re-running the handler(RPC function).  在Lab2中就會用到這個方法。

  Q: how to detect a duplicate request?
  A: client includes unique ID (XID) when sending each request, and uses the same XID for re-send
      server:
      if seen[xid]:
          r = old[xid]
      else
          r = handler()
          old[xid] = r
          seen[xid] = true
但是at most once也有個問題:如果server掛了,導致內存中的seen[]丟失了,那麼server就不知道哪些xid曾經接收過了。

exactly once需要在at most once的基礎上增加容錯協議。這個會在Lab3中用到。

 

Go RPC is "at-most-once"
  STEP1  open TCP connection
  STEP2  write request to TCP connection
  STEP3  TCP may retransmit, but server's TCP will filter out duplicates


There is no retry in Go code (i.e. will NOT create 2nd TCP connection)
    Go RPC code returns an error if it doesn't get a reply, when
          perhaps after a timeout (from TCP)
          perhaps server didn't see request
          perhaps server processed request but server/net failed before reply came back

 

下面以go語言寫的簡易key-value storage爲例:

Go example: kv.go

package main
import (
	"fmt"
	"log"
	"net"
	"net/rpc"
	"sync"
)

// RPC request/reply definitions

// Err is the status string carried in every RPC reply.
type Err string

// Status values a reply's Err field can hold. They are left untyped so they
// remain assignable both to Err and to plain string.
const (
	OK       = "OK"
	ErrNoKey = "ErrNoKey"
)

type (
	// PutArgs is the request for KV.Put: store Value under Key.
	PutArgs struct {
		Key   string
		Value string
	}

	// PutReply is the response to KV.Put.
	PutReply struct {
		Err Err
	}

	// GetArgs is the request for KV.Get: look up Key.
	GetArgs struct {
		Key string
	}

	// GetReply is the response to KV.Get: Err reports the lookup status and
	// Value holds the stored value.
	GetReply struct {
		Err   Err
		Value string
	}
)

// Client -------------------------------------------------------

// connect dials the KV server at 127.0.0.1:1234 over TCP and returns the
// RPC client. A dial failure is fatal: log.Fatal logs it and exits.
func connect() *rpc.Client {
	const addr = "127.0.0.1:1234"
	c, err := rpc.Dial("tcp", addr)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	return c
}

// get looks up key on the KV server via the KV.Get RPC and returns the value
// from the reply. reply.Err is not inspected, so a missing key yields
// whatever Value the server sent. An RPC failure is fatal (log.Fatal exits).
// Each call opens its own connection and closes it before returning.
func get(key string) string {
	client := connect()
	defer client.Close()

	// Send the caller's key; the original hard-coded "subject" here.
	args := GetArgs{Key: key}
	reply := GetReply{}
	if err := client.Call("KV.Get", &args, &reply); err != nil {
		log.Fatal("error:", err)
	}
	return reply.Value
}

// put stores val under key on the KV server via the KV.Put RPC. An RPC
// failure is fatal (log.Fatal exits). Each call opens its own connection and
// closes it before returning.
func put(key string, val string) {
	client := connect()
	defer client.Close()

	// Send the caller's key and value; the original hard-coded
	// "subject"/"6.824" regardless of the arguments.
	args := PutArgs{Key: key, Value: val}
	reply := PutReply{}
	if err := client.Call("KV.Put", &args, &reply); err != nil {
		log.Fatal("error:", err)
	}
}

// Server -------------------------------------------------------

// KV is the RPC service object whose exported Get and Put methods are
// registered with net/rpc by server(). Each accepted connection is served in
// its own goroutine, so handlers may run concurrently; mu guards data.
type KV struct {
	mu   sync.Mutex        // guards data
	data map[string]string // key -> value store
}

// server starts the KV RPC service. It registers a fresh, empty KV with a
// new net/rpc server and listens on TCP port 1234 on all interfaces. It
// returns as soon as the listener is open. A background goroutine accepts
// connections and serves each one in its own goroutine; it stops, closing
// the listener, when Accept fails. Registration and listen errors are fatal.
func server() {
	kv := new(KV)
	kv.data = map[string]string{}

	rpcs := rpc.NewServer()
	// Register reports an error when the receiver has no suitable exported
	// methods. Check it rather than dropping it, otherwise a misregistration
	// would only show up later as failing client calls.
	if err := rpcs.Register(kv); err != nil {
		log.Fatal("register error:", err)
	}

	l, e := net.Listen("tcp", ":1234")
	if e != nil {
		log.Fatal("listen error:", e)
	}

	go func() {
		defer l.Close()
		for {
			conn, err := l.Accept()
			if err != nil {
				return
			}
			go rpcs.ServeConn(conn)
		}
	}()
}

// Get is the KV.Get RPC handler. If args.Key is present it replies OK with
// the stored value; otherwise it replies ErrNoKey with an empty Value. It
// always returns a nil error, so lookup status travels in reply.Err.
func (kv *KV) Get(args *GetArgs, reply *GetReply) error {
	kv.mu.Lock()
	defer kv.mu.Unlock()

	reply.Err, reply.Value = ErrNoKey, ""
	if stored, found := kv.data[args.Key]; found {
		reply.Err, reply.Value = OK, stored
	}
	return nil
}

// Put is the KV.Put RPC handler. It stores args.Value under args.Key,
// overwriting any previous value, replies OK, and returns a nil error.
func (kv *KV) Put(args *PutArgs, reply *PutReply) error {
	kv.mu.Lock()
	defer kv.mu.Unlock()

	key, value := args.Key, args.Value
	kv.data[key] = value
	reply.Err = OK
	return nil
}

// main -------------------------------------------------------

// main starts the KV server, then acts as a client: it stores one key and
// reads it back, printing each step.
func main() {
	// server() has already called net.Listen when it returns, so the client
	// calls below can connect immediately.
	server()

	put("subject", "6.824")
	fmt.Printf("Put(subject, 6.824) done\n")

	value := get("subject")
	fmt.Printf("get(subject) -> %s\n", value)
}

邏輯還是比較簡單的……比Java Thrift簡潔多了。

 

 


 

 

Ref:

https://golang.org/doc/effective_go.html

https://golang.org/pkg/net/rpc/

https://tour.golang.org/concurrency/10

https://www.cnblogs.com/pdev/p/10936485.html

相關文章
相關標籤/搜索