Pigeon是一個分佈式服務通訊框架(RPC),是美團點評最基礎的底層框架之一。已開源,連接:https://github.com/dianping/pigeongit
從接下來三個方面來分析pigeon的源碼。github
一. 基礎框架
1.1 rpc的基礎架構
rpc最基礎的架構圖算法
1.2 rpc的基本流程
客戶端在調用某一個服務時,這個服務其實是經過動態代理生成的一個代理類的對象。所以在執行方法的時候,實際上執行的是InvocationHandler的invoke方法(pigeon的InvocationHandler是ServiceInvocationProxy)。而後調用的信息去zk註冊中心去拿服務提供方的集羣信息,經過負載均衡發現一臺實際的服務提供方的服務器地址。將請求信息序列化爲二進制數據,而後經過netty的client將請求發送給服務提供方,同時wait服務的響應。spring
在服務方,啓動應用後,rpc將須要發佈的服務註冊到zk上,開啓netty server監聽器。服務方收到客戶端的數據後,將數據反序列化爲請求對象,而後解析請求,進行一系列的過濾操做。最後根據請求的信息定位到服務方惟一的一個服務,執行服務的方法。將執行的結果反序列化爲二進制數據,回寫到調用服務的客戶端。編程
客戶端接收到服務方的響應後,將響應結果反序列化爲響應對象,最後返回給用戶線程,完成了rpc的調用過程。設計模式
1.3 pigeon的主要組件
1.3.1 客戶端的組件
ReferenceBean/ProxyBeanFactory:獲取服務代理對象的類,實現了FactoryBean,在init方法中初始化了代理對象的建立, 一系列初始化的操做。api
InvokerBootStrap:客戶端一系列的初始化操做,如初始化ServiceInvocationRepository(令牌桶算法,慢慢放入流量),初始化客戶端服務調度器工廠,初始化序列化工廠,初始化負載均衡管理器,初始化路由策略管理器,初始化監控器monitor,初始化response處理器工廠等。安全
ServiceInvocationProxy:pigeon的動態代理handler,實現了jdk的InvocationHandler接口,每個客戶端動態生成service都會執行ServiceInvocationProxy的invoke方法,在這個方法中service去請求真正的遠程服務,這是rpc實現的基礎。服務器
ServiceInvocationHandler:服務真正調度的handler,由InvokerProcessHandlerFactory生成,與下面的ServiceInvocationFilter共同組成責任鏈模式,在handle方法中實際上執行是ServiceInvocationFilter的invoke方法,同時又將下一個handler和上下文InvocationContext傳入filter中,這樣實現一層一層的調用。網絡
ServiceInvocationFilter:pigeon的rpc調用的過濾器,客戶端有RemoteCallMonitorInvokeFilter,TraceFilter,DegradationFilter,FaultInjectionFilter,ClusterInvokeFilter,GatewayInvokeFilter,ContextPrepareInvokeFilter,SecurityFilter,RemoteCallInvokeFilter這些Filter,實現了monitor監控、調用跟蹤、服務降級、集羣重試、網關、上下文初始化/解析、安全控制、網絡調用等過程,實際上rpc的主要功能都是經過這些ServiceInvocationFilter實現的。
NettyClient:pigeon用netty實現的網絡客戶端,實現了Client接口(還有HttpInvokerClient實現了Client接口,負責http通訊),負責初始化netty的ClientBootstrap,維護ChannelPool,以及最重要的給服務提供方write請求數據。
NettyClientHandler:pigeon綁定了ClientBootstrap的客戶端ChannelHandler,客戶端的衆多ChannelHandler之一,綁定在ClientBootstrap的ChannelPipeline上,當服務提供方回覆請求結果給客戶端時,NettyClientHandler會接收到數據,經過ResponseProcessor將響應數據放到CallbackFuture(盛放結果的類)中,當客戶端去取response時,若是已經收到服務提供方響應的數據,則直接獲取;若是沒有,則await,期間若是收到響應數據,則notify獲取響應數據的線程去取,若是超時則報TimeoutException。
1.3.2 服務方的組件
ServiceBean/ServiceRegistry:跟客戶端的ReferenceBean相似,ServiceBean是服務方的入口,實現了ApplicationListener了(爲了檢測服務方的服務是否發佈完成),在init方法中完成了provider的初始化和服務註冊操做。
ProviderBootStrap:服務端的一系列初始化的操做,如初始化服務方處理器工廠(初始化服務方的handler),初始化序列化工廠,初始化註冊管理器,初始化JettyHttpServer(就是咱們常常用的那個4080端口的server)等。而後,在ProviderBootStrap的startup方法中初始化NettyServer,執行NettyServer的doStart方法,綁定ServerBootstrap。並初始化RequestThreadPoolProcessor的全部線程。
ServicePublisher:服務方註冊服務節點信息到zk註冊中心的組件,經過RegistryManager管理器註冊,最後其實是使用CuratorRegistry註冊器(用CuratorFramework實現的)實現的。
NettyServer:pigeon用netty實現的網絡服務端,實現了Server接口(Server還有JettyHttpServer子類,負責http通訊,監聽了4080端口)。負責初始化服務端的ServerBootstrap,監聽tcp端口,綁定ChannelHandler,維護tcp的通訊。
NettyServerHandler:pigeon綁定了ServerBootstrap的服務端ChannelHandler,服務端的衆多ChannelHandler之一,當客戶端發起數據請求時,NettyServerHandler會接收到數據,若是是心跳檢查,直接處理回寫數據,若是是真實的請求,則經過RequestThreadPoolProcessor去處理,找到服務調用而後回寫數據。
RequestThreadPoolProcessor:根據不一樣的messageType選擇不一樣的ServiceInvocationHandler,提供了四種類型的handler,分別是業務處理handler、心跳處理handler、健康檢查handler、scanner心跳handler。內部維護了一組線程池,提供了服務隔離功能。經過ServiceInvocationHandler和ServiceInvocationFilter去處理請求信息,在BusinessProcessFilter經過反射執行真正的服務方法,在WriteResponseProcessFilter回寫處理完成的response數據給客戶端,完成整個服務端處理請求的鏈路。
二. 客戶端如何調用pigeon的服務
客戶端調用pigeon服務主要分爲兩步:
第一步:初始化及啓動invoker,獲取動態代理類對象;
第二步:經過代理類對象動態的調用遠程服務,並將響應結果返回給客戶端。
2.1 invoker初始化的處理鏈路
客戶端建立代理服務對象有兩種方式:spring注入和api編碼的方式。若是是spring依賴注入的方式,是經過ProxyBeanFactory或ProxyBeanFactory獲取動態代理的bean,其實是經過ServiceFactory的getService()方法調用的;若是是api編程的方式,則是直接經過ServiceFactory的getService()獲取的。
在ServiceFactory中,調用了AbstractServiceProxy的getProxy方法,invoker初始化大部分工做都是在這個類中完成的。首先調用invoker初始化器InvokerBootStrap初始化一系列的管理器與工廠(調度倉庫、調度器、序列化工廠、負載均衡管理器、路由策略管理器、監控器、response處理器等等),這是客戶端調用rpc服務必須的組件。
而後會調用AbstractSerializer的proxyRequest方法經過Proxy.newProxyInstance建立一個代理類對象,設置InvocationHandler爲ServiceInvocationProxy,返回建立好的proxy。
接下來進行一系列的註冊,前後註冊路由策略和服務配置,在ClientManager中根據invokerConfig去zk註冊中心上拉取遠程服務器的ip和端口號。在CuratorRegistry中經過CuratorClient(就是CuratorFramework實現的)從zk上拉取address,同時構建NettyClient也是在這裏實現的。
最後將proxy服務 put到AbstractServiceProxy的services(ConcurrentHashMap類型)中,而後返回代理對象,完成整個過程。
2.2 調用遠程服務的處理鏈路
當客戶端經過proxyService調用遠程服務時,因爲proxyService是代理類,所以實際上會去調用ServiceInvocationProxy的invoke方法,在invoke方法中,會執行ServiceInvocationHandler的handle方法。
ServiceInvocationHandler與ServiceInvocationFilter共同組成責任鏈模式,ServiceInvocationHandler的handle方法實際上執行的是ServiceInvocationFilter的invoke方法。InvokerProcessHandlerFactory構建了一個filterList,遍歷filterList,對每個filter都建立了一個ServiceInvocationHandler,同時將上一個建立的ServiceInvocationHandler傳入filter的invoke方法中。返回最後建立的handler。這樣若是在filter中須要調用下一個filter,執行傳入的handler.handle()便可。
filterList的代碼以下:
public static void init() { if (!isInitialized) { if (Constants.MONITOR_ENABLE) { registerBizProcessFilter(new RemoteCallMonitorInvokeFilter()); } registerBizProcessFilter(new TraceFilter()); registerBizProcessFilter(new DegradationFilter()); registerBizProcessFilter(new FaultInjectionFilter()); registerBizProcessFilter(new ClusterInvokeFilter()); registerBizProcessFilter(new GatewayInvokeFilter()); registerBizProcessFilter(new ContextPrepareInvokeFilter()); registerBizProcessFilter(new SecurityFilter()); registerBizProcessFilter(new RemoteCallInvokeFilter()); //最後返回的是執行RemoteCallMonitorInvokeFilter的handler,執行鏈到RemoteCallInvokeFilter爲止 bizInvocationHandler = createInvocationHandler(bizProcessFilters); isInitialized = true; } }
最後執行的ServiceInvocationFilter是RemoteCallInvokeFilter,實現發送請求的地方,經過NettyClient將請求發送到遠程服務的機器上去。RemoteCallInvokeFilter提供了四種調用方式:SYNC,CALLBACK,FUTURE,ONEWAY。這就是咱們在spring配置中常常須要設置的參數
四種調用方式都是經過InvokerUtils的sendRequest方法中調用NettyClient的write方法,將請求數據發送到目的服務器去。
SYNC:同步的調用方式,發送完數據後,經過getResponse去取結果,若是遠程服務已經返回結果了就直接取,尚未返回結果就等待,等待超時了就報TimeoutException異常。
CALLBACK:回調的調用方式,把回調函數傳入到方法中,當遠程服務返回告終果會調用回調函數,異步的調用。
FUTURE:future的調用方式,發送完請求後,建立一個future對象放入當前線程的threadLocal中,客戶端須要的話能夠經過threadLocal去取結果。對pigeon來講是異步的調用,若是客戶端須要取結果則是非阻塞同步的方式。
ONEWAY:單向調用的方式,發送完請求直接返回,不關心處理結果,異步的調用。
調用方法以下:
switch (callMethod) { case SYNC: CallbackFuture future = new CallbackFuture(); //在InvokerUtils的sendRequest方法中調用NettyClient的write方法,將請求數據發送到目的服務器去 response = InvokerUtils.sendRequest(client, invocationContext.getRequest(), future); invocationContext.getTimeline().add(new TimePoint(TimePhase.Q)); if (response == null) { //若是是同步的,就經過getResponse去取結果,遠程服務尚未返回結果就等待,有就直接取 response = future.getResponse(request.getTimeout()); } break; case CALLBACK: InvocationCallback callback = invokerConfig.getCallback(); InvocationCallback tlCallback = InvokerHelper.getCallback(); if (tlCallback != null) { callback = tlCallback; InvokerHelper.clearCallback(); } //回調的方式則將你的回調函數傳入,遠程服務返回結果後會調用回調函數 InvokerUtils.sendRequest(client, invocationContext.getRequest(), new ServiceCallbackWrapper( invocationContext, callback)); response = NO_RETURN_RESPONSE; invocationContext.getTimeline().add(new TimePoint(TimePhase.Q)); break; case FUTURE: ServiceFutureImpl futureImpl = new ServiceFutureImpl(invocationContext, request.getTimeout()); InvokerUtils.sendRequest(client, invocationContext.getRequest(), futureImpl); //future方式,將future對象放入當前線程的threadLocal中 FutureFactory.setFuture(futureImpl); //返回的是一個future的response,並無真是的結果 response = InvokerUtils.createFutureResponse(futureImpl); invocationContext.getTimeline().add(new TimePoint(TimePhase.Q)); break; case ONEWAY: //oneway不處理返回結果 InvokerUtils.sendRequest(client, invocationContext.getRequest(), null); response = NO_RETURN_RESPONSE; invocationContext.getTimeline().add(new TimePoint(TimePhase.Q)); break; default: throw new BadRequestException("Call type[" + callMethod.getName() + "] is not supported!"); }
將請求數據發送給遠程服務的機器後,客戶端須要接受服務器響應的結果。這個是經過NettyClientHandler等ChannelHandler實現。在建立NettyClient的時候,ClientBootstrap會setPipelineFactory將一組ChannelHandler關聯到Client上去。當服務端給客戶端發送數據時,會經過這一組handler來響應處理。handlers解決了tcp粘包、序列化、反序列化等問題。
代碼以下:
public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = pipeline(); pipeline.addLast("framePrepender", new FramePrepender()); pipeline.addLast("frameDecoder", new FrameDecoder()); pipeline.addLast("crc32Handler", new Crc32Handler(codecConfig)); pipeline.addLast("compressHandler", new CompressHandler(codecConfig)); pipeline.addLast("invokerDecoder", new InvokerDecoder()); pipeline.addLast("invokerEncoder", new InvokerEncoder()); pipeline.addLast("clientHandler", new NettyClientHandler(this.client)); return pipeline; }
當收到遠程服務的響應數據時,NettyClientHandler會調用ResponseThreadPoolProcessor使用多線程去處理response,在線程中,使用ServiceInvocationRepository的單例對象處理response,把response放到RemoteCallInvokeFilter建立的CallbackFuture中,用condition.signal()喚醒RemoteCallInvokeFilter等待的線程(若是是同步的調用),完成整個調用工做。
pigeon的調用結構圖
三. 服務端如何處理rpc的請求
服務提供方提供和處理pigeon服務主要分爲兩步:
第一步:初始化及啓動provider,監聽tcp端口,註冊服務;
第二步:監聽客戶端發送的請求消息,處理結果並返回給客戶端。
3.1 provider初始化的處理鏈路
服務端初始化provider和註冊服務也有兩種方式:spring注入和api編碼的方式。若是是spring注入的方式,經過ServiceBean的init方法,將coder設置在services中的服務經過ServiceFactory.addServices()方法初始化和註冊;若是是api編碼的方式,則是直接調用ServiceFactory.addService()的方法進行初始化和注入,過程都是相同的。
在ServiceFactory中,ServiceFactory的靜態代碼塊會執行ProviderBootStrap.init()方法,ProviderBootStrap是服務方的啓動器,完成服務方各類初始化工廠和管理器(服務方處理器工廠、序列化工廠、註冊管理器),還會啓動JettyHttpServer監聽4080端口。跟客戶端的InvokerBootStrap相似,是服務方提供rpc服務必須的組件。
接下來調用PublishPolicy的doAddService()方法,PublishPolicy首先去startup ProviderBootStrap,在這裏才初始化了NettyServer,執行NettyServer的doStart方法,綁定ServerBootstrap,監聽tcp端口(並非固定端口號)。並初始化RequestThreadPoolProcessor的全部線程。而後經過ServicePublisher發佈註冊服務。
ServicePublisher經過publishService發佈服務,其實是調用preparePublishTask()方法,經過自身的靜態內部類PublishTask在線程中異步發佈註冊服務,在PublishTask經過RegistryManager的registerService()方法在CuratorRegistry註冊了服務,而在CuratorRegistry中經過CuratorClient(CuratorFramework實現的)實現了zk註冊服務,基本邏輯是:若是當前serviceName在zk註冊中心的節點上存在,則在這個節點添加一個address,若是不存在,則建立一個節點存放address。
3.2 響應請求消息的處理鏈路
上面的啓動provider的過程當中,NettyServer將一組ChannelHandler經過setPipelineFactory關聯到ServerBootstrap上去。客戶端發起遠程服務請求後,會經過這一組handler來響應處理。與客戶端的handlers相似,服務端的handlers一樣解決了tcp粘包、序列化、反序列化等問題。
public ChannelPipeline getPipeline() { ChannelPipeline pipeline = pipeline(); pipeline.addLast("framePrepender", new FramePrepender()); pipeline.addLast("frameDecoder", new FrameDecoder()); pipeline.addLast("crc32Handler", new Crc32Handler(codecConfig)); pipeline.addLast("compressHandler", new CompressHandler(codecConfig)); pipeline.addLast("providerDecoder", new ProviderDecoder()); pipeline.addLast("providerEncoder", new ProviderEncoder()); pipeline.addLast("serverHandler", new NettyServerHandler(server)); return pipeline; }
NettyServerHandler收到客戶端的請求數據後,首先判斷是否是scanner心跳的請求, 若是是,則直接回寫數據回去便可;若是是客戶端的正常請求,則經過AbstractServer的processRequest()方法去處理。
AbstractServer直接經過RequestThreadPoolProcessor的doProcessRequest來處理請求數據。RequestThreadPoolProcessor內部維護了一組線程池,提供了服務隔離功能。RequestThreadPoolProcessor首先根據請求去獲取對應的線程池(默認提供slowRequestProcessThreadPool、sharedRequestProcessThreadPool,另外還提供了可定製針對單個服務和方法的methodThreadPools和serviceThreadPools),而後檢測請求是否可行。若是一切ok,則建立一個Callable對象,用線程池去處理這個請求。
在Callable中,首先經過ProviderProcessHandlerFactory的selectInvocationHandler()方法建立一個handler,與客戶端相同的是,ServiceInvocationHandler與ServiceInvocationFilter一樣構成責任鏈模式,provider提供四種handler鏈,分別是業務處理handler、心跳處理handler、健康檢查handler、scanner心跳handler,根據不一樣的請求處理不一樣的handler鏈。
責任鏈模式使得ServiceInvocationFilter一層一層的調用,直到BusinessProcessFilter爲止開始一層一層的返回。BusinessProcessFilter是FilterList最核心的ServiceInvocationFilter,在這個Filter中,provider經過請求的數據獲取到真正對應的bizService和method,而後經過反射的方式調用這個方法,返回調用結果,把結果封裝到response中。
InvocationResponse response = null; ServiceMethod method = invocationContext.getServiceMethod(); if (method == null) { method = ServiceMethodFactory.getMethod(request); } ProviderHelper.setContext(invocationContext); invocationContext.getTimeline().add(new TimePoint(TimePhase.M, System.currentTimeMillis())); Object returnObj = null; try { returnObj = method.invoke(request.getParameters()); } finally { ProviderHelper.clearContext(); if (Constants.REPLY_MANUAL || invocationContext.isAsync()) { if (request.getCallType() == Constants.CALLTYPE_REPLY) { request.setCallType(Constants.CALLTYPE_MANUAL); } } } invocationContext.getTimeline().add(new TimePoint(TimePhase.M, System.currentTimeMillis())); if (request.getCallType() == Constants.CALLTYPE_REPLY) { response = ProviderUtils.createSuccessResponse(request, returnObj); } return response;
FilterList中還有一個WriteResponseProcessFilter,WriteResponseProcessFilter會經過netty的channel將處理結果response發送給客戶端,這樣就完成了服務端處理請求的過程。
四. pigeon中的設計模式
pigeon是一個設計很優雅的rpc框架,裏面用到不少很好的設計模式
4.1 代理模式
客戶端建立代理服務對象的時候就用到了代理模式,建立的對象並非經過new的方式來建立,而是Proxy.newProxyInstance()方法。在調用代理類的方法時,會去調用傳入newProxyInstance的參數ServiceInvocationProxy對象的invoke方法,這樣就實現了經過網絡通訊去調用遠程服務的功能了。
pigeon在AbstractSerializer類中經過代理模式建立了代理服務對象。
4.2 工廠模式
工廠模式是將建立對象的操做放在工程類中,須要使用的時候去工廠類中取。pigeon使用的工廠模式不少,如ProviderProcessHandlerFactory,在這個工廠類中建立了四種ServiceInvocationHandler,須要用的時候根據不一樣的type去取。以下:
public static ServiceInvocationHandler selectInvocationHandler(int messageType) { if (Constants.MESSAGE_TYPE_HEART == messageType) { return heartBeatInvocationHandler; } else if (Constants.MESSAGE_TYPE_HEALTHCHECK == messageType) { return healthCheckInvocationHandler; } else if (Constants.MESSAGE_TYPE_SCANNER_HEART == messageType) { return scannerHeartBeatInvocationHandler; } else { return bizInvocationHandler; } }
4.3 策略模式
策略模式是定義了一系列的策略,並將每個策略封裝起來,並且使他們能夠相互替換,在運行時動態選擇具體要執行的行爲。pigeon也使用了不少策略模式。如在ClusterInvokeFilter中,定義了一組集羣重試策略,在運行時能夠動態的選擇具體須要執行的集羣重試行爲。
public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext) throws Throwable { InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig(); Cluster cluster = ClusterFactory.selectCluster(invokerConfig.getCluster()); if (cluster == null) { throw new IllegalArgumentException("Unsupported cluster type:" + cluster); } return cluster.invoke(handler, invocationContext); }
4.4 責任鏈模式
責任鏈模式是使多個對象都有機會處理請求,從而避免請求的發送者和接受者之間的耦合關係, 將這個對象連成一條鏈,並沿着這條鏈傳遞該請求,直到有一個對象處理他爲止。在pigeon中,ServiceInvocationHandler與ServiceInvocationFilter組成責任鏈模式,InvokerProcessHandlerFactory構建了一個filterList,ServiceInvocationHandler的handle方法實際上執行的是ServiceInvocationFilter的invoke方法。ServiceInvocationFilter一層一層的調用,直到最後一個filter而後開始一層一層的返回。
for (int i = filterList.size() - 1; i >= 0; i--) { final V filter = filterList.get(i); final ServiceInvocationHandler next = last; last = new ServiceInvocationHandler() { @SuppressWarnings("unchecked") @Override public InvocationResponse handle(InvocationContext invocationContext) throws Throwable { InvocationResponse resp = filter.invoke(next, invocationContext); return resp; } }; }
4.5 單例模式
單例模式是最簡單的設計模式,也是pigeon中用得最多的設計模式。單例模式能夠保證系統中,應用該模式的類一個類只有一個實例。pigeon的資源管理器大部分都設計爲單例模式,如ServiceInvocationRepository、RoutePolicyManager、ServiceConfigManager、ClientManager等等。主要是爲了管理公共資源,保持惟一訪問路徑。
原文連接:https://blog.csdn.net/ningdunquan/article/details/79910367