上一篇文章裏咱們實現了基本的RPC客戶端和服務端,此次咱們開始着手實現更上層的功能。篇幅所限,具體的代碼實現參見:代碼地址git
client實現github
server實現緩存
首先讓咱們來從新定義Client和Server:SGClient和SGServer。SGClient
封裝了上一節定義的RPCClient的操做,提供服務治理的相關特性;SGServer
則由上一節定義的RPCServer升級而來,支持服務治理的相關特性。這裏的SG(service governance)表示服務治理。 這裏直接貼上相關的定義:服務器
type SGClient interface {
Go(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}, done chan *Call) (*Call, error)
Call(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}) error
}
type sgClient struct {
shutdown bool
option SGOption
clients sync.Map //map[string]RPCClient
serversMu sync.RWMutex
servers []registry.Provider
}
type RPCServer interface {
Register(rcvr interface{}, metaData map[string]string) error
Serve(network string, addr string) error
Services() []ServiceInfo
Close() error
}
type SGServer struct { //原來的RPCServer
codec codec.Codec
serviceMap sync.Map
tr transport.ServerTransport
mutex sync.Mutex
shutdown bool
Option Option
}
複製代碼
在以前的文章提到過,咱們須要提供過濾器同樣的使用方式,來達到對擴展開放對修改關閉的目標。咱們這裏採用高階函數的方式來定義方切面和法攔截器,首先定義幾個切面:網絡
//客戶端切面
type CallFunc func(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}) error type GoFunc func(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}, done chan *Call) *Call //服務端切面 type ServeFunc func(network string, addr string) error type ServeTransportFunc func(tr transport.Transport) type HandleRequestFunc func(ctx context.Context, request *protocol.Message, response *protocol.Message, tr transport.Transport) 複製代碼
以上幾個是RPC調用在客戶端和服務端會通過的幾個函數,咱們將其定義爲切面,而後再定義對應的攔截器:app
//客戶端攔截器
packege client
type Wrapper interface {
WrapCall(option *SGOption, callFunc CallFunc) CallFunc
WrapGo(option *SGOption, goFunc GoFunc) GoFunc
}
//f服務端攔截器
package server
type Wrapper interface {
WrapServe(s *SGServer, serveFunc ServeFunc) ServeFunc
WrapServeTransport(s *SGServer, transportFunc ServeTransportFunc) ServeTransportFunc
WrapHandleRequest(s *SGServer, requestFunc HandleRequestFunc) HandleRequestFunc
}
複製代碼
這樣一來,用戶能夠經過實現Wapper
接口來對客戶端或者服務端的行爲進行加強,好比將請求參數和結果記錄到日誌裏,動態的修改參數或者響應等等。咱們的框架自身 的相關功能也能夠經過Wrapper
實現。目前客戶端實現了用於封裝元數據的MetaDataWrapper
和記錄請求和響應的LogWrapper
;服務端目前在DefaultWrapper
實現了用於服務註冊、監聽退出信號以及請求計數的邏輯。負載均衡
由於go並不提供抽象類的方式,因此對於某些實現類可能並不須要攔截全部切面(好比只攔截Call不想攔截Go),這種狀況直接返回參數裏的函數對象就能夠了。框架
客戶端攔截器實現分佈式
服務端攔截器實現ide
在這以前,咱們的RPC服務調用都是經過在客戶端指定服務端的ip和端口來調用的,這種方式十分簡單但也場景十分有限,估計只能在測試或者demo中使用。因此咱們須要提供服務註冊和發現相關的功能,讓客戶端的配置再也不與實際的IP綁定,而是經過獨立的註冊中心獲取服務端的列表,而且可以在服務端節點變動時得到實時更新。
首先定義相關的接口(代碼地址):
//Registry包含兩部分功能:服務註冊(用於服務端)和服務發現(用於客戶端)
type Registry interface {
Register(option RegisterOption, provider ...Provider) //註冊
Unregister(option RegisterOption, provider ...Provider) //註銷
GetServiceList() []Provider //獲取服務列表
Watch() Watcher //監聽服務列表的變化
Unwatch(watcher Watcher) //取消監聽
}
type RegisterOption struct {
AppKey string //AppKey用於惟一標識某個應用
}
type Watcher interface {
Next() (*Event, error) //獲取下一次服務列表的更新
Close()
}
type EventAction byte
const (
Create EventAction = iota
Update
Delete
)
type Event struct { //Event表示一次更新
Action EventAction
AppKey string
Providers []Provider //具體變化的服務提供者(增量而不是全量)
}
type Provider struct { //某個具體的服務提供者
ProviderKey string // Network+"@"+Addr
Network string
Addr string
Meta map[string]string
}
複製代碼
AppKey
咱們使用AppKey這樣一個概念來標識某個服務,好比com.meituan.demo.rpc.server
。服務端在啓動時將自身的相關信息(包括AppKey、ip、port、方法列表等)註冊到註冊中心;客戶端在須要調用時只須要根據服務端的AppKey到註冊中心查找便可。
目前暫時只實現了直連(peer2peer)和基於內存(InMemory)的服務註冊,後續再接入其餘獨立的組件如etcd或者zookeeper等等。
有了服務註冊與發現以後,一個客戶端所面對的可能就不僅有一個服務端了,客戶端在發起調用前須要從多個服務端中選擇一個出來進行實際的通訊,具體的選擇策略有不少,好比隨機選擇、輪詢、基於權重選擇、基於服務端負載或者自定義規則等等。
這裏先給出接口定義:
//Filter用於自定義規則過濾某個節點
type Filter func(provider registry.Provider, ctx context.Context, ServiceMethod string, arg interface{}) bool type SelectOption struct {
Filters []Filter
}
type Selector interface {
Next(providers []registry.Provider, ctx context.Context, ServiceMethod string, arg interface{}, opt SelectOption) (registry.Provider, error)
}
複製代碼
目前暫時只實現了隨機負載均衡,後續再實現其餘策略好比輪詢或者一致性哈希等等,用戶也能夠選擇實現本身的負載均衡策略。
爲了減小頻繁建立和斷開網絡鏈接的開銷,咱們維持了客戶端到服務端的長鏈接,並把建立好的鏈接(RPCClient對象)用map緩存起來,key就是對應的服務端的標識。客戶端在調用前根據負載均衡的結果檢索到緩存好的RPCClient而後發起調用。當咱們檢索不到對應的客戶端或者發現緩存的客戶端已經失效時,須要從新創建鏈接(從新建立RPCClient對象)。
func (c *sgClient) selectClient(ctx context.Context, ServiceMethod string, arg interface{}) (provider registry.Provider, client RPCClient, err error) {
//根據負載均衡決定要調用的服務端
provider, err = c.option.Selector.Next(c.providers(), ctx, ServiceMethod, arg, c.option.SelectOption)
if err != nil {
return
}
client, err = c.getClient(provider)
return
}
func (c *sgClient) getClient(provider registry.Provider) (client RPCClient, err error) {
key := provider.ProviderKey
rc, ok := c.clients.Load(key)
if ok {
client := rc.(RPCClient)
if client.IsShutDown() {
//若是已經失效則清除掉
c.clients.Delete(key)
}
}
//再次檢索
rc, ok = c.clients.Load(key)
if ok {
//已經有緩存了,返回緩存的RPCClient
client = rc.(RPCClient)
} else {
//沒有緩存,新建一個而後更新到緩存後返回
client, err = NewRPCClient(provider.Network, provider.Addr, c.option.Option)
if err != nil {
return
}
c.clients.Store(key, client)
}
return
}
複製代碼
目前的實現當中,每一個服務提供者只有一個對應的RPCClient,後續能夠考慮相似鏈接池的實現,即每一個服務提供者對應多個RPCClient,每次調用前從鏈接池中取出一個RPCClient。
在分佈式系統中,異常是不可避免的,當發生調用失敗時,咱們能夠選擇要採起的處理方式,這裏列舉了常見的幾種:
type FailMode byte
const (
FailFast FailMode = iota //快速失敗
FailOver //重試其餘服務器
FailRetry //重試同一個服務器
FailSafe //忽略失敗,直接返回
)
複製代碼
具體實現比較簡單,就是根據配置的容錯選項和重試次數決定是否重試;其餘包括FailBack(延時一段時間後重發)、Fork以及Broadcast等等暫時沒有實現。
在收到程序退出信號時,server端會嘗試優先處理完當前還未結束的請求,等請求處理完畢以後再退出,當超出了指定的時間(默認12s)仍未處理完畢時,server端會直接退出。
func (s *SGServer) Close() error {
s.mutex.Lock()
defer s.mutex.Unlock()
s.shutdown = true
//等待當前請求處理完或者直到指定的時間
ticker := time.NewTicker(s.Option.ShutDownWait)
defer ticker.Stop()
for {
if s.requestInProcess <= 0 { //requestInProcess表示當前正在處理的請求數,在wrapper裏計數
break
}
select {
case <-ticker.C:
break
}
}
return s.tr.Close()
}
複製代碼
到這裏就是此次的所有內容了,總的來講是在以前的基礎上作了封裝,預留了後續的擴展點,而後實現了簡單的服務治理相關的功能。總結一下,此次咱們在上一篇文章的基礎上作了如下改動: