RPC框架pigeon源碼分析

Pigeon是一個分佈式服務通訊框架(RPC),是美團點評最基礎的底層框架之一。已開源,連接:https://github.com/dianping/pigeongit

從接下來三個方面來分析pigeon的源碼。github

一. 基礎框架
1.1 rpc的基礎架構
rpc最基礎的架構圖算法

 

1.2 rpc的基本流程
客戶端在調用某一個服務時,這個服務其實是經過動態代理生成的一個代理類的對象。所以在執行方法的時候,實際上執行的是InvocationHandler的invoke方法(pigeon的InvocationHandler是ServiceInvocationProxy)。而後調用的信息去zk註冊中心去拿服務提供方的集羣信息,經過負載均衡發現一臺實際的服務提供方的服務器地址。將請求信息序列化爲二進制數據,而後經過netty的client將請求發送給服務提供方,同時wait服務的響應。spring

在服務方,啓動應用後,rpc將須要發佈的服務註冊到zk上,開啓netty server監聽器。服務方收到客戶端的數據後,將數據反序列化爲請求對象,而後解析請求,進行一系列的過濾操做。最後根據請求的信息定位到服務方惟一的一個服務,執行服務的方法。將執行的結果反序列化爲二進制數據,回寫到調用服務的客戶端。編程

客戶端接收到服務方的響應後,將響應結果反序列化爲響應對象,最後返回給用戶線程,完成了rpc的調用過程。設計模式

1.3 pigeon的主要組件
1.3.1 客戶端的組件
ReferenceBean/ProxyBeanFactory:獲取服務代理對象的類,實現了FactoryBean,在init方法中初始化了代理對象的建立, 一系列初始化的操做。api

InvokerBootStrap:客戶端一系列的初始化操做,如初始化ServiceInvocationRepository(令牌桶算法,慢慢放入流量),初始化客戶端服務調度器工廠,初始化序列化工廠,初始化負載均衡管理器,初始化路由策略管理器,初始化監控器monitor,初始化response處理器工廠等。安全

ServiceInvocationProxy:pigeon的動態代理handler,實現了jdk的InvocationHandler接口,每個客戶端動態生成service都會執行ServiceInvocationProxy的invoke方法,在這個方法中service去請求真正的遠程服務,這是rpc實現的基礎。服務器

ServiceInvocationHandler:服務真正調度的handler,由InvokerProcessHandlerFactory生成,與下面的ServiceInvocationFilter共同組成責任鏈模式,在handle方法中實際上執行是ServiceInvocationFilter的invoke方法,同時又將下一個handler和上下文InvocationContext傳入filter中,這樣實現一層一層的調用。網絡

ServiceInvocationFilter:pigeon的rpc調用的過濾器,客戶端有RemoteCallMonitorInvokeFilter,TraceFilter,DegradationFilter,FaultInjectionFilter,ClusterInvokeFilter,GatewayInvokeFilter,ContextPrepareInvokeFilter,SecurityFilter,RemoteCallInvokeFilter這些Filter,實現了monitor監控、調用跟蹤、服務降級、集羣重試、網關、上下文初始化/解析、安全控制、網絡調用等過程,實際上rpc的主要功能都是經過這些ServiceInvocationFilter實現的。

NettyClient:pigeon用netty實現的網絡客戶端,實現了Client接口(還有HttpInvokerClient實現了Client接口,負責http通訊),負責初始化netty的ClientBootstrap,維護ChannelPool,以及最重要的給服務提供方write請求數據。

NettyClientHandler:pigeon綁定了ClientBootstrap的客戶端ChannelHandler,客戶端的衆多ChannelHandler之一,綁定在ClientBootstrap的ChannelPipeline上,當服務提供方回覆請求結果給客戶端時,NettyClientHandler會接收到數據,經過ResponseProcessor將響應數據放到CallbackFuture(盛放結果的類)中,當客戶端去取response時,若是已經收到服務提供方響應的數據,則直接獲取;若是沒有,則await,期間若是收到響應數據,則notify獲取響應數據的線程去取,若是超時則報TimeoutException。

1.3.2 服務方的組件
ServiceBean/ServiceRegistry:跟客戶端的ReferenceBean相似,ServiceBean是服務方的入口,實現了ApplicationListener了(爲了檢測服務方的服務是否發佈完成),在init方法中完成了provider的初始化和服務註冊操做。

ProviderBootStrap:服務端的一系列初始化的操做,如初始化服務方處理器工廠(初始化服務方的handler),初始化序列化工廠,初始化註冊管理器,初始化JettyHttpServer(就是咱們常常用的那個4080端口的server)等。而後,在ProviderBootStrap的startup方法中初始化NettyServer,執行NettyServer的doStart方法,綁定ServerBootstrap。並初始化RequestThreadPoolProcessor的全部線程。

ServicePublisher:服務方註冊服務節點信息到zk註冊中心的組件,經過RegistryManager管理器註冊,最後其實是使用CuratorRegistry註冊器(用CuratorFramework實現的)實現的。

NettyServer:pigeon用netty實現的網絡服務端,實現了Server接口(Server還有JettyHttpServer子類,負責http通訊,監聽了4080端口)。負責初始化服務端的ServerBootstrap,監聽tcp端口,綁定ChannelHandler,維護tcp的通訊。

NettyServerHandler:pigeon綁定了ServerBootstrap的服務端ChannelHandler,服務端的衆多ChannelHandler之一,當客戶端發起數據請求時,NettyServerHandler會接收到數據,若是是心跳檢查,直接處理回寫數據,若是是真實的請求,則經過RequestThreadPoolProcessor去處理,找到服務調用而後回寫數據。

RequestThreadPoolProcessor:根據不一樣的messageType選擇不一樣的ServiceInvocationHandler,提供了四種類型的handler,分別是業務處理handler、心跳處理handler、健康檢查handler、scanner心跳handler。內部維護了一組線程池,提供了服務隔離功能。經過ServiceInvocationHandler和ServiceInvocationFilter去處理請求信息,在BusinessProcessFilter經過反射執行真正的服務方法,在WriteResponseProcessFilter回寫處理完成的response數據給客戶端,完成整個服務端處理請求的鏈路。

 

二. 客戶端如何調用pigeon的服務
客戶端調用pigeon服務主要分爲兩步:

第一步:初始化及啓動invoker,獲取動態代理類對象;

第二步:經過代理類對象動態的調用遠程服務,並將響應結果返回給客戶端。

2.1 invoker初始化的處理鏈路

客戶端建立代理服務對象有兩種方式:spring注入和api編碼的方式。若是是spring依賴注入的方式,是經過ProxyBeanFactory或ProxyBeanFactory獲取動態代理的bean,其實是經過ServiceFactory的getService()方法調用的;若是是api編程的方式,則是直接經過ServiceFactory的getService()獲取的。

在ServiceFactory中,調用了AbstractServiceProxy的getProxy方法,invoker初始化大部分工做都是在這個類中完成的。首先調用invoker初始化器InvokerBootStrap初始化一系列的管理器與工廠(調度倉庫、調度器、序列化工廠、負載均衡管理器、路由策略管理器、監控器、response處理器等等),這是客戶端調用rpc服務必須的組件。

而後會調用AbstractSerializer的proxyRequest方法經過Proxy.newProxyInstance建立一個代理類對象,設置InvocationHandler爲ServiceInvocationProxy,返回建立好的proxy。

接下來進行一系列的註冊,前後註冊路由策略和服務配置,在ClientManager中根據invokerConfig去zk註冊中心上拉取遠程服務器的ip和端口號。在CuratorRegistry中經過CuratorClient(就是CuratorFramework實現的)從zk上拉取address,同時構建NettyClient也是在這裏實現的。

最後將proxy服務 put到AbstractServiceProxy的services(ConcurrentHashMap類型)中,而後返回代理對象,完成整個過程。

2.2 調用遠程服務的處理鏈路

當客戶端經過proxyService調用遠程服務時,因爲proxyService是代理類,所以實際上會去調用ServiceInvocationProxy的invoke方法,在invoke方法中,會執行ServiceInvocationHandler的handle方法。

ServiceInvocationHandler與ServiceInvocationFilter共同組成責任鏈模式,ServiceInvocationHandler的handle方法實際上執行的是ServiceInvocationFilter的invoke方法。InvokerProcessHandlerFactory構建了一個filterList,遍歷filterList,對每個filter都建立了一個ServiceInvocationHandler,同時將上一個建立的ServiceInvocationHandler傳入filter的invoke方法中。返回最後建立的handler。這樣若是在filter中須要調用下一個filter,執行傳入的handler.handle()便可。

filterList的代碼以下:

public static void init() {
   if (!isInitialized) {
      if (Constants.MONITOR_ENABLE) {
         registerBizProcessFilter(new RemoteCallMonitorInvokeFilter());
      }
      registerBizProcessFilter(new TraceFilter());
      registerBizProcessFilter(new DegradationFilter());
      registerBizProcessFilter(new FaultInjectionFilter());
      registerBizProcessFilter(new ClusterInvokeFilter());
      registerBizProcessFilter(new GatewayInvokeFilter());
      registerBizProcessFilter(new ContextPrepareInvokeFilter());
      registerBizProcessFilter(new SecurityFilter());
      registerBizProcessFilter(new RemoteCallInvokeFilter());
    //最後返回的是執行RemoteCallMonitorInvokeFilter的handler,執行鏈到RemoteCallInvokeFilter爲止
      bizInvocationHandler = createInvocationHandler(bizProcessFilters);
      isInitialized = true;
   }
}

最後執行的ServiceInvocationFilter是RemoteCallInvokeFilter,實現發送請求的地方,經過NettyClient將請求發送到遠程服務的機器上去。RemoteCallInvokeFilter提供了四種調用方式:SYNC,CALLBACK,FUTURE,ONEWAY。這就是咱們在spring配置中常常須要設置的參數

四種調用方式都是經過InvokerUtils的sendRequest方法中調用NettyClient的write方法,將請求數據發送到目的服務器去。

SYNC:同步的調用方式,發送完數據後,經過getResponse去取結果,若是遠程服務已經返回結果了就直接取,尚未返回結果就等待,等待超時了就報TimeoutException異常。
CALLBACK:回調的調用方式,把回調函數傳入到方法中,當遠程服務返回告終果會調用回調函數,異步的調用。
FUTURE:future的調用方式,發送完請求後,建立一個future對象放入當前線程的threadLocal中,客戶端須要的話能夠經過threadLocal去取結果。對pigeon來講是異步的調用,若是客戶端須要取結果則是非阻塞同步的方式。
ONEWAY:單向調用的方式,發送完請求直接返回,不關心處理結果,異步的調用。
調用方法以下:

switch (callMethod) {
    case SYNC:
        CallbackFuture future = new CallbackFuture();
        //在InvokerUtils的sendRequest方法中調用NettyClient的write方法,將請求數據發送到目的服務器去
        response = InvokerUtils.sendRequest(client, invocationContext.getRequest(), future);
        invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
        if (response == null) {
            //若是是同步的,就經過getResponse去取結果,遠程服務尚未返回結果就等待,有就直接取
            response = future.getResponse(request.getTimeout());
        }
        break;
    case CALLBACK:
        InvocationCallback callback = invokerConfig.getCallback();
        InvocationCallback tlCallback = InvokerHelper.getCallback();
        if (tlCallback != null) {
            callback = tlCallback;
            InvokerHelper.clearCallback();
        }
        //回調的方式則將你的回調函數傳入,遠程服務返回結果後會調用回調函數
        InvokerUtils.sendRequest(client, invocationContext.getRequest(), new ServiceCallbackWrapper(
                invocationContext, callback));
        response = NO_RETURN_RESPONSE;
        invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
        break;
    case FUTURE:
        ServiceFutureImpl futureImpl = new ServiceFutureImpl(invocationContext, request.getTimeout());
        InvokerUtils.sendRequest(client, invocationContext.getRequest(), futureImpl);
        //future方式,將future對象放入當前線程的threadLocal中
        FutureFactory.setFuture(futureImpl);
        //返回的是一個future的response,並無真是的結果
        response = InvokerUtils.createFutureResponse(futureImpl);
        invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
        break;
    case ONEWAY:
        //oneway不處理返回結果
        InvokerUtils.sendRequest(client, invocationContext.getRequest(), null);
        response = NO_RETURN_RESPONSE;
        invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
        break;
    default:
        throw new BadRequestException("Call type[" + callMethod.getName() + "] is not supported!");
 
}

將請求數據發送給遠程服務的機器後,客戶端須要接受服務器響應的結果。這個是經過NettyClientHandler等ChannelHandler實現。在建立NettyClient的時候,ClientBootstrap會setPipelineFactory將一組ChannelHandler關聯到Client上去。當服務端給客戶端發送數據時,會經過這一組handler來響應處理。handlers解決了tcp粘包、序列化、反序列化等問題。

代碼以下:

public ChannelPipeline getPipeline() throws Exception {
    ChannelPipeline pipeline = pipeline();
    pipeline.addLast("framePrepender", new FramePrepender());
    pipeline.addLast("frameDecoder", new FrameDecoder());
    pipeline.addLast("crc32Handler", new Crc32Handler(codecConfig));
    pipeline.addLast("compressHandler", new CompressHandler(codecConfig));
    pipeline.addLast("invokerDecoder", new InvokerDecoder());
    pipeline.addLast("invokerEncoder", new InvokerEncoder());
    pipeline.addLast("clientHandler", new NettyClientHandler(this.client));
    return pipeline;
}

當收到遠程服務的響應數據時,NettyClientHandler會調用ResponseThreadPoolProcessor使用多線程去處理response,在線程中,使用ServiceInvocationRepository的單例對象處理response,把response放到RemoteCallInvokeFilter建立的CallbackFuture中,用condition.signal()喚醒RemoteCallInvokeFilter等待的線程(若是是同步的調用),完成整個調用工做。

pigeon的調用結構圖

 

三. 服務端如何處理rpc的請求
服務提供方提供和處理pigeon服務主要分爲兩步:

第一步:初始化及啓動provider,監聽tcp端口,註冊服務;

第二步:監聽客戶端發送的請求消息,處理結果並返回給客戶端。

3.1 provider初始化的處理鏈路

服務端初始化provider和註冊服務也有兩種方式:spring注入和api編碼的方式。若是是spring注入的方式,經過ServiceBean的init方法,將coder設置在services中的服務經過ServiceFactory.addServices()方法初始化和註冊;若是是api編碼的方式,則是直接調用ServiceFactory.addService()的方法進行初始化和注入,過程都是相同的。

在ServiceFactory中,ServiceFactory的靜態代碼塊會執行ProviderBootStrap.init()方法,ProviderBootStrap是服務方的啓動器,完成服務方各類初始化工廠和管理器(服務方處理器工廠、序列化工廠、註冊管理器),還會啓動JettyHttpServer監聽4080端口。跟客戶端的InvokerBootStrap相似,是服務方提供rpc服務必須的組件。

接下來調用PublishPolicy的doAddService()方法,PublishPolicy首先去startup ProviderBootStrap,在這裏才初始化了NettyServer,執行NettyServer的doStart方法,綁定ServerBootstrap,監聽tcp端口(並非固定端口號)。並初始化RequestThreadPoolProcessor的全部線程。而後經過ServicePublisher發佈註冊服務。

ServicePublisher經過publishService發佈服務,其實是調用preparePublishTask()方法,經過自身的靜態內部類PublishTask在線程中異步發佈註冊服務,在PublishTask經過RegistryManager的registerService()方法在CuratorRegistry註冊了服務,而在CuratorRegistry中經過CuratorClient(CuratorFramework實現的)實現了zk註冊服務,基本邏輯是:若是當前serviceName在zk註冊中心的節點上存在,則在這個節點添加一個address,若是不存在,則建立一個節點存放address。

3.2 響應請求消息的處理鏈路

上面的啓動provider的過程當中,NettyServer將一組ChannelHandler經過setPipelineFactory關聯到ServerBootstrap上去。客戶端發起遠程服務請求後,會經過這一組handler來響應處理。與客戶端的handlers相似,服務端的handlers一樣解決了tcp粘包、序列化、反序列化等問題。

public ChannelPipeline getPipeline() {
    ChannelPipeline pipeline = pipeline();
    pipeline.addLast("framePrepender", new FramePrepender());
    pipeline.addLast("frameDecoder", new FrameDecoder());
    pipeline.addLast("crc32Handler", new Crc32Handler(codecConfig));
    pipeline.addLast("compressHandler", new CompressHandler(codecConfig));
    pipeline.addLast("providerDecoder", new ProviderDecoder());
    pipeline.addLast("providerEncoder", new ProviderEncoder());
    pipeline.addLast("serverHandler", new NettyServerHandler(server));
    return pipeline;
}

NettyServerHandler收到客戶端的請求數據後,首先判斷是否是scanner心跳的請求, 若是是,則直接回寫數據回去便可;若是是客戶端的正常請求,則經過AbstractServer的processRequest()方法去處理。

AbstractServer直接經過RequestThreadPoolProcessor的doProcessRequest來處理請求數據。RequestThreadPoolProcessor內部維護了一組線程池,提供了服務隔離功能。RequestThreadPoolProcessor首先根據請求去獲取對應的線程池(默認提供slowRequestProcessThreadPool、sharedRequestProcessThreadPool,另外還提供了可定製針對單個服務和方法的methodThreadPools和serviceThreadPools),而後檢測請求是否可行。若是一切ok,則建立一個Callable對象,用線程池去處理這個請求。

在Callable中,首先經過ProviderProcessHandlerFactory的selectInvocationHandler()方法建立一個handler,與客戶端相同的是,ServiceInvocationHandler與ServiceInvocationFilter一樣構成責任鏈模式,provider提供四種handler鏈,分別是業務處理handler、心跳處理handler、健康檢查handler、scanner心跳handler,根據不一樣的請求處理不一樣的handler鏈。

責任鏈模式使得ServiceInvocationFilter一層一層的調用,直到BusinessProcessFilter爲止開始一層一層的返回。BusinessProcessFilter是FilterList最核心的ServiceInvocationFilter,在這個Filter中,provider經過請求的數據獲取到真正對應的bizService和method,而後經過反射的方式調用這個方法,返回調用結果,把結果封裝到response中。

InvocationResponse response = null;
ServiceMethod method = invocationContext.getServiceMethod();
if (method == null) {
    method = ServiceMethodFactory.getMethod(request);
}
 
ProviderHelper.setContext(invocationContext);
invocationContext.getTimeline().add(new TimePoint(TimePhase.M, System.currentTimeMillis()));
Object returnObj = null;
try {
    returnObj = method.invoke(request.getParameters());
} finally {
    ProviderHelper.clearContext();
    if (Constants.REPLY_MANUAL || invocationContext.isAsync()) {
        if (request.getCallType() == Constants.CALLTYPE_REPLY) {
            request.setCallType(Constants.CALLTYPE_MANUAL);
        }
    }
}
invocationContext.getTimeline().add(new TimePoint(TimePhase.M, System.currentTimeMillis()));
if (request.getCallType() == Constants.CALLTYPE_REPLY) {
    response = ProviderUtils.createSuccessResponse(request, returnObj);
}
return response;

FilterList中還有一個WriteResponseProcessFilter,WriteResponseProcessFilter會經過netty的channel將處理結果response發送給客戶端,這樣就完成了服務端處理請求的過程。

四. pigeon中的設計模式
pigeon是一個設計很優雅的rpc框架,裏面用到不少很好的設計模式

4.1 代理模式
客戶端建立代理服務對象的時候就用到了代理模式,建立的對象並非經過new的方式來建立,而是Proxy.newProxyInstance()方法。在調用代理類的方法時,會去調用傳入newProxyInstance的參數ServiceInvocationProxy對象的invoke方法,這樣就實現了經過網絡通訊去調用遠程服務的功能了。

pigeon在AbstractSerializer類中經過代理模式建立了代理服務對象。

4.2 工廠模式
工廠模式是將建立對象的操做放在工程類中,須要使用的時候去工廠類中取。pigeon使用的工廠模式不少,如ProviderProcessHandlerFactory,在這個工廠類中建立了四種ServiceInvocationHandler,須要用的時候根據不一樣的type去取。以下:

public static ServiceInvocationHandler selectInvocationHandler(int messageType) {
   if (Constants.MESSAGE_TYPE_HEART == messageType) {
      return heartBeatInvocationHandler;
   } else if (Constants.MESSAGE_TYPE_HEALTHCHECK == messageType) {
      return healthCheckInvocationHandler;
   } else if (Constants.MESSAGE_TYPE_SCANNER_HEART == messageType) {
      return scannerHeartBeatInvocationHandler;
   } else {
      return bizInvocationHandler;
   }
}

4.3 策略模式
策略模式是定義了一系列的策略,並將每個策略封裝起來,並且使他們能夠相互替換,在運行時動態選擇具體要執行的行爲。pigeon也使用了不少策略模式。如在ClusterInvokeFilter中,定義了一組集羣重試策略,在運行時能夠動態的選擇具體須要執行的集羣重試行爲。

public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
      throws Throwable {
   InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
   Cluster cluster = ClusterFactory.selectCluster(invokerConfig.getCluster());
   if (cluster == null) {
      throw new IllegalArgumentException("Unsupported cluster type:" + cluster);
   }
   return cluster.invoke(handler, invocationContext);
}

4.4 責任鏈模式
責任鏈模式是使多個對象都有機會處理請求,從而避免請求的發送者和接受者之間的耦合關係, 將這個對象連成一條鏈,並沿着這條鏈傳遞該請求,直到有一個對象處理他爲止。在pigeon中,ServiceInvocationHandler與ServiceInvocationFilter組成責任鏈模式,InvokerProcessHandlerFactory構建了一個filterList,ServiceInvocationHandler的handle方法實際上執行的是ServiceInvocationFilter的invoke方法。ServiceInvocationFilter一層一層的調用,直到最後一個filter而後開始一層一層的返回。

for (int i = filterList.size() - 1; i >= 0; i--) {
   final V filter = filterList.get(i);
   final ServiceInvocationHandler next = last;
   last = new ServiceInvocationHandler() {
      @SuppressWarnings("unchecked")
      @Override
      public InvocationResponse handle(InvocationContext invocationContext) throws Throwable {
         InvocationResponse resp = filter.invoke(next, invocationContext);
         return resp;
      }
   };
}

4.5 單例模式
單例模式是最簡單的設計模式,也是pigeon中用得最多的設計模式。單例模式能夠保證系統中,應用該模式的類一個類只有一個實例。pigeon的資源管理器大部分都設計爲單例模式,如ServiceInvocationRepository、RoutePolicyManager、ServiceConfigManager、ClientManager等等。主要是爲了管理公共資源,保持惟一訪問路徑。

原文連接:https://blog.csdn.net/ningdunquan/article/details/79910367

相關文章
相關標籤/搜索