本文以roundRobin爲例介紹gRPC負載均衡實現。git
https://github.com/messixukej...
在liangzhiyang/annotate-grpc-go基礎上補充了部分註釋github
負載均衡器: type Balancer interface{ //啓動負載均衡器,dialing的時候調用。 Start(target string, config BalancerConfig) error //通知負載均衡器由新地址鏈接ok Up(addr Address) (down func(error)) //獲取下一個有效的負載均衡地址 Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) //地址更新通知channel,這裏返回的是全量地址 Notify() <-chan[]Address Close() error } //地址解析器接口 type Resolver interface{ //解析器Resolver 用於建立 服務地址watcher。 Resolve(target string) (Watcher, error) } //服務地址發現器接口 //watcher用於觀察服務地址的變化 type Watcher interface{ //阻塞接口,知道有服務地址變化或者錯誤發生 Next() ([]*Update, error) Close() }
type testWatcher struct{ //用於接收地址更新 update chan*naming.Update //用於表示多少個更新發生 side chan int //用於通知地址注入者更新讀已結束 readDone chanint }
一、注入負載均衡規則數據結構
使用Dial時,使用WithBalance注入負載均衡規則 func WithBalancer(b Balancer) DialOption { return func(o *dialOptions) { o.balancer = b } } 例如,這裏注入了RoundRobin 輪尋規則。 cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
二、 啓動負載均衡器負載均衡
roundRobin.Start -> testNameResolver.Resolve 生成服務地址發現器 -> 創建服務地址發現任務 watchAddrUpdates(新的goroutine循環監控地址)
三、服務地址發現器(開啓負載均衡後,無論是初始化仍是過程當中的變化,都是經過服務地址發現器來獲取服務端地址)ide
roundRobin.watchAddrUpdates ->阻塞在watcher.Next獲取地址變動動做 -> 更新roundRobin.addrs -> 寫入roundRobin.addrCh
3.一、開工過程,根據balancer.Notify獲取服務端地址 -> 根據地址列表建立鏈接resetAddrConn
地址注入方式:Resolve中將服務端初始地址注入code
3.二、動態變動地址 ClientConn.lbWatcher,根據balancer.Notify獲取服務端地址 -> 新增地址resetAddrConn,刪除地址tearDown。
地址注入方式:testWatcher.inject接口
3.3 建立鏈接細化resetAddrConn->resetTransport->roundRobin.Uprpc
3.4 刪除鏈接細化tearDown->roundRobin.downget
四、提交記錄string
Invoke->getTransport->balancer.Get獲取有效地址(策略:根據connected狀態,循環找到有效地址。若是找不到則阻塞在waitCh) waitCh阻塞解除由3.3觸發