做者 | 李志信java
導讀:有了上一篇文章 《Dubbo-go 源碼筆記(一)Server 端開啓服務過程》的鋪墊,能夠類比客戶端啓動於服務端的啓動過程。其中最大的區別是服務端經過 zk 註冊服務,發佈本身的ivkURL並訂閱事件開啓監聽;而客戶應該是經過zk註冊組件,拿到須要調用的serviceURL,更新invoker並重寫用戶的RPCService,從而實現對遠程過程調用細節的封裝。
helloworld 提供的 demo:profiles/client.yaml。設計模式
registries : "demoZk": protocol: "zookeeper" timeout : "3s" address: "127.0.0.1:2181" username: "" password: "" references: "UserProvider": # 能夠指定多個registry,使用逗號隔開;不指定默認向全部註冊中心註冊 registry: "demoZk" protocol : "dubbo" interface : "com.ikurento.user.UserProvider" cluster: "failover" methods : - name: "GetUser" retries: 3
可看到配置文件與以前討論過的 Server 端很是相似,其 refrences 部分字段就是對當前服務要主調的服務的配置,其中詳細說明了調用協議、註冊協議、接口 id、調用方法、集羣策略等,這些配置都會在以後與註冊組件交互、重寫 ivk、調用的過程當中使用到。數組
user.go:緩存
func init() { config.SetConsumerService(userProvider) hessian.RegisterPOJO(&User{}) }
main.go:app
func main() { hessian.RegisterPOJO(&User{}) config.Load() time.Sleep(3e9) println("\n\n\nstart to test dubbo") user := &User{} err := userProvider.GetUser(context.TODO(), []interface{}{"A001"}, user) if err != nil { panic(err) } println("response result: %v\n", user) initSignal() }
在官網提供的 helloworld demo 的源碼中,可看到與服務端相似,在 user.go 內註冊了 rpc-service,以及須要 rpc 傳輸的結構體 user。負載均衡
在 main 函數中,一樣調用了 config.Load() 函數,以後就能夠經過實現好的 rpc-service:userProvider 直接調用對應的功能函數,便可實現 rpc 調用。框架
能夠猜到,從 hessian 註冊結構、SetConsumerService,到調用函數 .GetUser() 期間,用戶定義的 rpc-service 也就是 userProvider 對應的函數被重寫,重寫後的 GetUser 函數已經包含實現了遠程調用邏輯的 invoker。less
接下來,就要經過閱讀源碼,看看 dubbo-go 是如何作到的。異步
// file: config/config_loader.go :Load() // Load Dubbo Init func Load() { // init router initRouter() // init the global event dispatcher extension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType) // start the metadata report if config set if err := startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil { logger.Errorf("Provider starts metadata report error, and the error is {%#v}", err) return } // reference config loadConsumerConfig()
在 main 函數中調用了 config.Load() 函數,進而調用了 loadConsumerConfig,相似於以前講到的 server 端配置讀入函數。ide
在 loadConsumerConfig 函數中,進行了三步操做:
// config/config_loader.go func loadConsumerConfig() { // 1 init other consumer config conConfigType := consumerConfig.ConfigType for key, value := range extension.GetDefaultConfigReader() {} checkApplicationName(consumerConfig.ApplicationConfig) configCenterRefreshConsumer() checkRegistries(consumerConfig.Registries, consumerConfig.Registry) // 2 refer-implement-reference for key, ref := range consumerConfig.References { if ref.Generic { genericService := NewGenericService(key) SetConsumerService(genericService) } rpcService := GetConsumerService(key) ref.id = key ref.Refer(rpcService) ref.Implement(rpcService) } // 3 wait for invoker is available, if wait over default 3s, then panic for {} }
其中重要的就是 for 循環裏面的引用和實例化,兩步操做,會在接下來展開討論。
至此,配置已經被寫入了框架。
上述的 ref.Refer 完成的就是這部分的操做。
圖(一)
和 server 端相似,存在註冊 url 和服務 url,dubbo 習慣將服務 url 做爲註冊 url 的 sub。
// file: config/reference_config.go: Refer() func (c *ReferenceConfig) Refer(_ interface{}) { //(一)配置url參數(serviceUrl),將會做爲sub cfgURL := common.NewURLWithOptions( common.WithPath(c.id), common.WithProtocol(c.Protocol), common.WithParams(c.getUrlMap()), common.WithParamsValue(constant.BEAN_NAME_KEY, c.id), ) ... // (二)註冊地址能夠經過url格式給定,也能夠經過配置格式給定 // 這一步的意義就是配置->提取信息生成URL if c.Url != "" {// 用戶給定url信息,能夠是點對點的地址,也能夠是註冊中心的地址 // 1. user specified URL, could be peer-to-peer address, or register center's address. urlStrings := gxstrings.RegSplit(c.Url, "\\s*[;]+\\s*") for _, urlStr := range urlStrings { serviceUrl, err := common.NewURL(urlStr) ... } } else {// 配置讀入註冊中心的信息 // assemble SubURL from register center's configuration mode // 這是註冊url,protocol = registry,包含了zk的用戶名、密碼、ip等等 c.urls = loadRegistries(c.Registry, consumerConfig.Registries, common.CONSUMER) ... // set url to regUrls for _, regUrl := range c.urls { regUrl.SubURL = cfgURL// regUrl的subURl存當前配置url } } //至此,不管經過什麼形式,已經拿到了所有的regURL // (三)獲取registryProtocol實例,調用其Refer方法,傳入新構建好的regURL if len(c.urls) == 1 { // 這一步訪問到registry/protocol/protocol.go registryProtocol.Refer // 這裏是registry c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0]) } else { // 若是有多個註冊中心,即有多個invoker,則採起集羣策略 invokers := make([]protocol.Invoker, 0, len(c.urls)) ... }
這個函數中,已經處理完從 Register 配置到 RegisterURL 的轉換,即圖(一)中部分:
接下來,已經拿到的 url 將被傳遞給 RegistryProtocol,進一步 refer。
// file: registry/protocol/protocol.go: Refer // Refer provider service from registry center // 拿到的是配置文件registries的url,他可以生成一個invoker = 指向目的addr,以供客戶端直接調用。 func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { var registryUrl = url // 這裏拿到的是referenceConfig,serviceUrl裏面包含了Reference的全部信息,包含interfaceName、method等等 var serviceUrl = registryUrl.SubURL if registryUrl.Protocol == constant.REGISTRY_PROTOCOL {// registryUrl.Proto = "registry" protocol := registryUrl.GetParam(constant.REGISTRY_KEY, "") registryUrl.Protocol = protocol//替換成了具體的值,好比"zookeeper" } // 接口對象 var reg registry.Registry // (一)實例化接口對象,緩存策略 if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded { // 緩存中不存在當前registry,新建一個reg reg = getRegistry(®istryUrl) // 緩存起來 proto.registries.Store(registryUrl.Key(), reg) } else { reg = regI.(registry.Registry) } // 到這裏,獲取到了reg實例 zookeeper的registry //(二)根據Register的實例zkRegistry和傳入的regURL新建一個directory // 這一步存在複雜的異步邏輯,從註冊中心拿到了目的service的真實addr,獲取了invoker並放入directory, // 這一步將在下面詳細給出步驟 // new registry directory for store service url from registry directory, err := extension.GetDefaultRegistryDirectory(®istryUrl, reg) if err != nil { logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!", serviceUrl.String(), err.Error()) return nil } // (三)DoRegister 在zk上註冊當前client service err = reg.Register(*serviceUrl) if err != nil { logger.Errorf("consumer service %v register registry %v error, error message is %s", serviceUrl.String(), registryUrl.String(), err.Error()) } // (四)new cluster invoker,將directory寫入集羣,得到具備集羣策略的invoker cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER)) invoker := cluster.Join(directory) // invoker保存 proto.invokers = append(proto.invokers, invoker) return invoker }
可詳細閱讀上述註釋,這個函數完成了從 url 到 invoker 的所有過程:
(一)首先得到 Registry 對象,默認是以前實例化的 zkRegistry,和以前 server 獲取 Registry 的處理很相似。
(二)經過構造一個新的 directory,異步拿到以前在 zk 上註冊的 server 端信息,生成 invoker。
(三)在 zk 上註冊當前 service。
(四)集羣策略,得到最終 invoker。
這一步完成了圖(一)中全部餘下的絕大多數操做,接下來就須要詳細地查看 directory 的構造過程。
圖(二)
上述的 extension.GetDefaultRegistryDirectory(®istryUrl, reg)
函數,本質上調用了已經註冊好的 NewRegistryDirectory
函數:
// file: registry/directory/directory.go: NewRegistryDirectory() // NewRegistryDirectory will create a new RegistryDirectory // 這個函數做爲default註冊在extension上面 // url爲註冊url,reg爲zookeeper registry func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) { if url.SubURL == nil { return nil, perrors.Errorf("url is invalid, suburl can not be nil") } dir := &RegistryDirectory{ BaseDirectory: directory.NewBaseDirectory(url), cacheInvokers: []protocol.Invoker{}, cacheInvokersMap: &sync.Map{}, serviceType: url.SubURL.Service(), registry: registry, } dir.consumerConfigurationListener = newConsumerConfigurationListener(dir) go dir.subscribe(url.SubURL) return dir, nil }
首先構造了一個註冊 directory,開啓協程調用其 subscribe 函數,傳入 serviceURL。
這個 directory 目前包含了對應的 zkRegistry,以及傳入的 URL,它的 cacheInvokers 部分是空的。
進入 dir.subscribe(url.SubURL) 這個異步函數:
/ file: registry/directory/directory.go: subscribe() // subscribe from registry func (dir *RegistryDirectory) subscribe(url *common.URL) { // 增長兩個監聽, dir.consumerConfigurationListener.addNotifyListener(dir) dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url) // subscribe調用 dir.registry.Subscribe(url, dir) }
重點來了,它調用了 zkRegistry 的 Subscribe 方法,與此同時將本身做爲 ConfigListener 傳入。
我認爲這種傳入 listener 的設計模式很是值得學習,並且頗有 java 的味道。
針對等待 zk 返回訂閱信息這樣的異步操做,須要傳入一個 Listener,這個 Listener 須要實現 Notify 方法,進而在做爲參數傳入內部以後,能夠被異步地調用 Notify,將內部觸發的異步事件「傳遞出來」,再進一步處理加工。
層層的 Listener 事件鏈,能將傳入的原始 serviceURL 經過 zkConn 發送給 zk 服務,獲取到服務端註冊好的 url 對應的二進制信息。
而 Notify 回調鏈,則將這串 byte[] 一步一步解析、加工;以事件的形式向外傳遞,最終落到 directory 上的時候,已是成型的 newInvokers 了。
具體細節再也不以源碼形式展現,可參照上圖查閱源碼。
至此已經拿到了 server 端註冊好的真實 invoker。
完成了圖(一)中的部分:
通過上述操做,已經拿到了 server 端 Invokers,放入了 directory 的 cacheinvokers 數組裏面緩存。
後續的操做對應本文從 url 到 invoker 的過程的最後一步,由 directory 生成帶有特性集羣策略的 invoker。
// (四)new cluster invoker,將directory寫入集羣,得到具備集羣策略的invoker cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER)) invoker := cluster.Join(directory) 123
Join 函數的實現就是以下函數:
// file: cluster/cluster_impl/failover_cluster_invokers.go: newFailoverClusterInvoker() func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker { return &failoverClusterInvoker{ baseClusterInvoker: newBaseClusterInvoker(directory), } } 12345
dubbo-go 框架默認選擇 failover 策略,既然返回了一個 invoker,咱們查看一下 failoverClusterInvoker 的 Invoker 方法,看它是如何將集羣策略封裝到 Invoker 函數內部的:
// file: cluster/cluster_impl/failover_cluster_invokers.go: Invoker() // Invoker 函數 func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { ... //調用List方法拿到directory緩存的全部invokers invokers := invoker.directory.List(invocation) if err := invoker.checkInvokers(invokers, invocation); err != nil {// 檢查是否能夠實現調用 return &protocol.RPCResult{Err: err} } // 獲取來自用戶方向傳入的 methodName := invocation.MethodName() retries := getRetries(invokers, methodName) loadBalance := getLoadBalance(invokers[0], invocation) for i := 0; i <= retries; i++ { // 重要!這裏是集羣策略的體現,失敗後重試! //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. if i > 0 { if err := invoker.checkWhetherDestroyed(); err != nil { return &protocol.RPCResult{Err: err} } invokers = invoker.directory.List(invocation) if err := invoker.checkInvokers(invokers, invocation); err != nil { return &protocol.RPCResult{Err: err} } } // 這裏是負載均衡策略的體現!選擇特定ivk進行調用。 ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked) if ivk == nil { continue } invoked = append(invoked, ivk) //DO INVOKE result = ivk.Invoke(ctx, invocation) if result.Error() != nil { providers = append(providers, ivk.GetUrl().Key()) continue } return result } ... }
看了不少 Invoke 函數的實現,全部相似的 Invoker 函數都包含兩個方向:一個是用戶方向的 invcation;一個是函數方向的底層 invokers。
而集羣策略的 invoke 函數自己做爲接線員,把 invocation 一步步解析,根據調用需求和集羣策略,選擇特定的 invoker 來執行。
proxy 函數也是這樣,一個是用戶方向的 ins[] reflect.Type, 一個是函數方向的 invoker。
proxy 函數負責將 ins 轉換爲 invocation,調用對應 invoker 的 invoker 函數,實現連通。
而出於這樣的設計,能夠在一步步 Invoker 封裝的過程當中,每一個 Invoker 只關心本身負責操做的部分,從而使整個調用棧解耦。
妙啊!!!
至此,咱們理解了 failoverClusterInvoker 的 Invoke 函數實現,也正是和這個集羣策略 Invoker 被返回,接受來自上方的調用。
已完成圖(一)中的:
拿到 invokers 後,能夠回到這個函數了:
// file: config/refrence_config.go: Refer() if len(c.urls) == 1 { // 這一步訪問到registry/protocol/protocol.go registryProtocol.Refer c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0]) // (一)拿到了真實的invokers } else { // 若是有多個註冊中心,即有多個invoker,則採起集羣策略 invokers := make([]protocol.Invoker, 0, len(c.urls)) ... cluster := extension.GetCluster(hitClu) // If 'zone-aware' policy select, the invoker wrap sequence would be: // ZoneAwareClusterInvoker(StaticDirectory) -> // FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } // (二)create proxy,爲函數配置代理 if c.Async { callback := GetCallback(c.id) c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(c.invoker, callback, cfgURL) } else { // 這裏c.invoker已是目的addr了 c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(c.invoker, cfgURL) }
咱們有了能夠打通的 invokers,但還不能直接調用,由於 invoker 的入參是 invocation,而調用函數使用的是具體的參數列表,須要經過一層 proxy 來規範入參和出參。
接下來新建一個默認 proxy,放置在 c.proxy 內,以供後續使用。
至此,完成了圖(一)中最後的操做:
上面完成了 config.Refer 操做,回到:
config/config_loader.go: loadConsumerConfig()
下一個重要的函數是 Implement,它的操做較爲簡單:旨在使用上面生成的 c.proxy 代理,連接用戶本身定義的 rpcService 到 clusterInvoker 的信息傳輸。
函數較長,只選取了重要的部分:
// file: common/proxy/proxy.go: Implement() // Implement // proxy implement // In consumer, RPCService like: // type XxxProvider struct { // Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error // } // Implement 實現的過程,就是proxy根據函數名和返回值,經過調用invoker 構造出擁有遠程調用邏輯的代理函數 // 將當前rpc全部可供調用的函數註冊到proxy.rpc內 func (p *Proxy) Implement(v common.RPCService) { // makeDubboCallProxy 這是一個構造代理函數,這個函數的返回值是func(in []reflect.Value) []reflect.Value 這樣一個函數 // 這個被返回的函數是請求實現的載體,由他來發起調用獲取結果 makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value { return func(in []reflect.Value) []reflect.Value { // 根據methodName和outs的類型,構造這樣一個函數,這個函數能將in 輸入的value轉換爲輸出的value // 這個函數具體的實現以下: ... // 目前拿到了 methodName、全部入參的interface和value,出參數reply // (一)根據這些生成一個 rpcinvocation inv = invocation_impl.NewRPCInvocationWithOptions( invocation_impl.WithMethodName(methodName), invocation_impl.WithArguments(inIArr), invocation_impl.WithReply(reply.Interface()), invocation_impl.WithCallBack(p.callBack), invocation_impl.WithParameterValues(inVArr)) for k, value := range p.attachments { inv.SetAttachments(k, value) } // add user setAttachment atm := invCtx.Value(constant.AttachmentKey) // 若是傳入的ctx裏面有attachment,也要寫入inv if m, ok := atm.(map[string]string); ok { for k, value := range m { inv.SetAttachments(k, value) } } // 至此構造inv完畢 // (二)觸發Invoker 以前已經將cluster_invoker放入proxy,使用Invoke方法,經過getty遠程過程調用 result := p.invoke.Invoke(invCtx, inv) // 若是有attachment,則加入 if len(result.Attachments()) > 0 { invCtx = context.WithValue(invCtx, constant.AttachmentKey, result.Attachments()) } ... } } numField := valueOfElem.NumField() for i := 0; i < numField; i++ { t := typeOf.Field(i) methodName := t.Tag.Get("dubbo") if methodName == "" { methodName = t.Name } f := valueOfElem.Field(i) if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() { // 針對於每一個函數 outNum := t.Type.NumOut() // 規定函數輸出只能有1/2個 if outNum != 1 && outNum != 2 { logger.Warnf("method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2", t.Name, t.Type.String(), outNum) continue } // The latest return type of the method must be error. // 規定最後一個返回值必定是error if returnType := t.Type.Out(outNum - 1); returnType != typError { logger.Warnf("the latest return type %s of method %q is not error", returnType, t.Name) continue } // 獲取到全部的出參類型,放到數組裏 var funcOuts = make([]reflect.Type, outNum) for i := 0; i < outNum; i++ { funcOuts[i] = t.Type.Out(i) } // do method proxy here: // (三)調用make函數,傳入函數名和返回值,得到能調用遠程的proxy,將這個proxy替換掉原來的函數位置 f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts))) logger.Debugf("set method [%s]", methodName) } } ... }
正如以前所說,proxy 的做用是將用戶定義的函數參數列表,轉化爲抽象的 invocation 傳入 Invoker,進行調用。
其中已標明有三處較爲重要的地方:
至此,也就解決了一開始的問題:
// file: client.go: main() config.Load() user := &User{} err := userProvider.GetUser(context.TODO(), []interface{}{"A001"}, user)
這裏直接調用用戶定義的 rpcService 的函數 GetUser,此處實際調用的是通過重寫入的函數代理,因此就能實現遠程調用了。
在閱讀 dubbo-go 源碼的過程當中,咱們可以發現一條清晰的 invoker-proxy 嵌套鏈,但願可以經過圖的形式來展示:
若是你有任何疑問,歡迎釘釘掃碼加入釘釘交流羣:釘釘羣號 23331795。
李志信 (GitHubID LaurenceLiZhixin),中山大學軟件工程專業在校學生,擅長使用 Java/Go 語言,專一於雲原生和微服務等技術方向。
「 阿里巴巴雲原生關注微服務、Serverless、容器、Service Mesh 等技術領域、聚焦雲原生流行技術趨勢、雲原生大規模的落地實踐,作最懂雲原生開發者的公衆號。」