在前面的 SOFA 源碼分析 —— 服務發佈過程 文章中,咱們分析了 SOFA 的服務發佈過程,一個完整的 RPC 除了發佈服務,固然還須要引用服務。 So,今天就一塊兒來看看 SOFA 是如何引用服務的。實際上,基礎邏輯和咱們以前用 Netty 寫的 RPC 小 demo 相似。有興趣能夠看看這個 demo—— 本身用 Netty 實現一個簡單的 RPC。java
ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName()) // 指定接口
.setProtocol("bolt") // 指定協議
.setDirectUrl("bolt://127.0.0.1:9696"); // 指定直連地址
HelloService helloService = consumerConfig.refer();
while (true) {
System.out.println(helloService.sayHello("world"));
try {
Thread.sleep(2000);
} catch (Exception e) {
}
}
複製代碼
一樣的,代碼例子來自 SOFA-RPC 源碼,位於com.alipay.sofa.rpc.quickstart.QuickStartClient
。網絡
很簡單,建立一個消費者配置類,而後使用這個配置引用一個代理對象,調用 代理對象的方法,實際上就是調用了遠程服務。app
咱們就經過上面這個簡單的例子,看看 SOFA 是如何進行服務引用的。注意:咱們今天的目的是看看主流程,有些細節可能暫時就一帶而過了,好比負載均衡,錯誤重試啥的,咱們之後再詳細分析,實際上,Client 相對有 Server,仍是要複雜一些的,由於它要考慮更多的狀況。負載均衡
好,開始吧!框架
首先看這個 ConsumerConfig 類,和前面的 ProviderConfig 相似,甚至於實現的接口和繼承的抽象類都是同樣的。異步
上面的例子設置了一些屬性,好比接口名稱,協議,直連地址。ide
關鍵點來了, refer 方法。源碼分析
這個方法就是返回了一個代理對象,代理對象中包含了以後遠程調用中須要的全部信息,好比過濾器,負載均衡等等。性能
而後,調用動態代理的方法,進行遠程調用,若是是 JDK 的動態代理的話,就是一個實現了 InvocationHandler 接口的類。這個類的 invoke 方法會攔截並進行遠程調用,天然就是使用 Netty 的客戶端對服務端發起調用並獲得數據啦。ui
先看看 refer 方法。
該方法類套路和 provider 的套路相似,都是使用了一個 BootStrap 引導。即單一職責。
public T refer() {
if (consumerBootstrap == null) {
consumerBootstrap = Bootstraps.from(this);
}
return consumerBootstrap.refer();
}
複製代碼
ConsumerBootstrap 是個抽象類,SOFA 基於他進行擴展,目前有 2 個擴展點,bolt 和 rest。默認是 bolt。而 bolt 的實現則是 BoltConsumerBootstrap,目前來看 bolt 和 rest 並無什麼區別,都是使用的一個父類 DefaultConsumerBootstrap。
因此,來看看 DefaultConsumerBootstrap 的 refer 方法。代碼我就不貼了,由於很長。基本上引用服務的邏輯所有在這裏了,相似 Spring 的 refresh 方法。邏輯以下:
其中,關鍵的對象是 Cluster。該對象須要重點關注。
Cluster 是個抽象類,也是個擴展點,實現了 Invoker, ProviderInfoListener, Initializable, Destroyable 等接口。而他目前的具體擴展點有 2 個: FailFastCluster(快速失敗), FailoverCluster(故障轉移和重試)。默認是後者。固然,還有一個抽象父類,AbstractCluster。
該類有個重要的方法, init 方法,初始化 Cluster,Cluster 能夠理解成客戶端,封裝了集羣模式、長鏈接管理、服務路由、負載均衡等抽象類。
init 方法代碼以下,很少。
public synchronized void init() {
if (initialized) { // 已初始化
return;
}
// 構造Router鏈
routerChain = RouterChain.buildConsumerChain(consumerBootstrap);
// 負載均衡策略 考慮是否可動態替換?
loadBalancer = LoadBalancerFactory.getLoadBalancer(consumerBootstrap);
// 地址管理器
addressHolder = AddressHolderFactory.getAddressHolder(consumerBootstrap);
// 鏈接管理器
connectionHolder = ConnectionHolderFactory.getConnectionHolder(consumerBootstrap);
// 構造Filter鏈,最底層是調用過濾器
this.filterChain = FilterChain.buildConsumerChain(this.consumerConfig,
new ConsumerInvoker(consumerBootstrap));
// 啓動重連線程
connectionHolder.init();
// 獲得服務端列表
List<ProviderGroup> all = consumerBootstrap.subscribe();
if (CommonUtils.isNotEmpty(all)) {
// 初始化服務端鏈接(創建長鏈接)
updateAllProviders(all);
}
// 啓動成功
initialized = true;
// 若是check=true表示強依賴
if (consumerConfig.isCheck() && !isAvailable()) {
}
}
複製代碼
能夠看到,乾的活不少。每一步都值得花不少時間去看,但看完全部不是咱們今天的任務,咱們今天關注一下調用,上面的代碼中,有一段構建 FilterChain 的代碼是值得咱們今天注意的。
建立了一個 ConsumerInvoker 對象,做爲最後一個過濾器調用,關於過濾器的設計,咱們以前已經研究過了,再也不贅述,詳情 SOFA 源碼分析 —— 過濾器設計。
咱們主要看看 ConsumerInvoker 類,該類是離 Netty 最近的過濾器。實際上,他也是擁有了一個 BootStrap,但,注意,擁有了 BootStrap ,至關於挾天子以令諸侯,啥都有了,在他的 invoke 方法中,會直接獲取 Boostrap 的 Cluster 向 Netty 發送數據。
代碼以下:
return consumerBootstrap.getCluster().sendMsg(providerInfo, sofaRequest);
複製代碼
厲害吧。
那麼,Cluster 是如何進行 sendMsg 的呢?若是是 bolt 類型的 Cluster 的話,就直接使用 bolt 的 RpcClient 進行調用了,而 RpcClient 則是使用的 Netty 的 Channel 的 writeAndFlush 方法發送數據。若是是同步調用的話,就阻塞等待數據。
總的流程就是這樣,具體細節留到之後慢慢分析。
下面看看拿到動態代理對象後,如何進行調用。
當咱們調用的時候,確定會被 JDKInvocationHandler 攔截。攔截方法則是 invoke 方法。方法很簡單,主要就是使用咱們以前注入的 proxyInvoker 的 invoke 方法。咱們以前說了,proxyInvoker 的做用其實就是一個鏈表的頭。而他主要了代理了真正的主角 Cluster,因此,你能夠想到,他的 invoke 方法確定是調用了 Cluster 的 invoke 方法。
Cluster 是真正的主角(注意:默認的 Cluster 是 FailoverCluster),那麼他的調用確定是一連串的過濾器。目前默認有兩個過濾器:ConsumerExceptionFilter, RpcReferenceContextFilter。最後一個過濾器則是咱們以前說的,離 Netty 最近的過濾器 —— ConsumerInvoker。
ConsumerInvoker 會調用 Cluster 的 sendMsg 方法,Cluster 內部包含一個 ClientTransport ,這個 ClientTransport 就是個膠水類,融合 bolt 的 RpcClient。因此,你能夠想到,當 ConsumerInvoker 調用 sendMsg 方法的時候,最後會調用 RpcClient 的 invokeXXX 方法,多是異步,也多是同步的,bolt 支持多種調用方式。
而 RpcClient 最後則是調用 Netty 的 Channel 的 writeAndFlush 方法向服務提供者發送數據。
取一段 RpcClietn 中同步(默認)執行的代碼看看:
protected RemotingCommand invokeSync(final Connection conn, final RemotingCommand request, final int timeoutMillis) throws RemotingException, InterruptedException {
final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
conn.addInvokeFuture(future);
conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (!f.isSuccess()) {
conn.removeInvokeFuture(request.getId());
future.putResponse(commandFactory.createSendFailedResponse(
conn.getRemoteAddress(), f.cause()));
}
}
});
// 阻塞等待
RemotingCommand response = future.waitResponse(timeoutMillis);
return response;
}
複製代碼
經過 Netty 的 Channel 的 writeAndFlush 方法發送數據,並添加一個監聽器,若是失敗了,就向 future 中注入一個失敗的對象。
在異步執行後,主線程開始等待,內部使用 countDownLatch 進行阻塞。而 countDownLatch 的初始化參數爲 1。何時喚醒 countDownLatch 呢?
在 putResponse 方法中,會喚醒 countDownLatch。
而 putResponse 方法則會被不少地方使用。好比在 bolt 的 RpcResponseProcessor 的 doProcess 方法中就會調用。而這個方法則是在 RpcHandler 的 channelRead 方法中間接調用。
因此,若是 writeAndFlush 失敗了,會 putResponse ,沒有失敗的話,正常執行,則會在 channelRead 方法後簡介調用 putResponse.
總結一下調用的邏輯吧,樓主畫了一張圖,你們能夠看看,畫的很差,還請見諒。
紅色線條是調用鏈,從 JDKInvocationHandler 開始,直到 Netty。綠色部分是 Cluster,和 Client 的核心。大紅色部分是 bolt 和 Netty。
好了,關於 SOFA 的服務引用主流程咱們今天就差很少介紹完了,固然,有不少精華尚未研究到。咱們之後會慢慢的去研究。
看完了 SOFA 的服務發佈和服務引用,相比較 SpringCloud 而言,真心以爲很輕量。上面的一幅圖基本就能展現大部分調用過程,這在 Spring 和 Tomcat 這種框架中,是不可想象的。而 bolt 的隔離也讓 RPC 框架有了更多的選擇,經過不一樣的擴展點實現,你可使用不一樣的網絡通訊框架。這時候,有必要上一張 SOFA 官方的圖了:
從上圖能夠看出,咱們今天比較熟悉的地方,好比 Cluster,包含了過濾器,負載均衡,路由,而後調用 remoting 的遠程模塊,也就是 bolt。 經過 sendMsg 方法。
而 Cluster 的外部模塊,咱們今天就沒有仔細看了,這個確定是要留到從此看的。好比地址管理,鏈接管理等等。
好啦,今天就到這裏。若有不對之處,還請指正。