微言Netty:分佈式服務框架

1. 前言html

幾年前,我就一直想着要設計一款本身的實時通信框架,因而出來了TinySocket,她是基於微軟的SocketAsyncEventArgs來實現的,因爲此類提供的功能很簡潔,因此當時本身實現了緩衝區處理,粘包拆包等,彼時的.net平臺尚未一款成熟的即時通信框架出來,因此當這款框架出來的時候,將當時公司的商業項目的核心競爭力提高至行業前三。可是後來隨着.net平臺上愈來愈多的即時通信框架出來,TinySocket也是英雄暮年,通過了諸多版本迭代和諸多團隊經手,她不只變得臃腫,並且也不符合潮流。總體的重構勢在必行了。可是我還在等,在等一款真正的即時通信底層庫出來。java

都說念念不忘,必有迴響。經過不停的摸索後,我發現了netty這套底層通信庫(對號入座,.net下對應的是dotnetty),憑藉着以前的經驗,第一感受這就是我要找尋的東西。後來寫了一些demo完全印證了個人猜測,簡直是欣喜若狂,想着若是早點發現這個框架,也許就不會那麼被動的踩坑了。就這樣,我算是開啓了本身的netty之旅。linux

微言netty系列,就是個人netty之旅的一些產出,它結合了我過往的經驗來產出一些對你們有用的東西,但願不會讓你們失望。redis

注:本文原理講解並不是以某一種語言爲主,可是對於具體場景分析,用的是Java,讀者能夠類推到其餘語言。同時本文並不提供源碼級別的原理性講解,如讀者有興趣,能夠自行查找實踐。算法

2. 總體架構模型apache

言歸正傳,咱們繼續netty之旅吧。bootstrap

分佈式服務框架,特色在於分佈式,功能在於服務提供,目標在於即時通信框架整合。因爲其可以讓服務端和客戶端進行解耦,讓調用方和被調用方處於網絡的兩端可是通信毫無障礙,從而可以擴充總體的業務規模。對於一些業務場景稍微大一些的公司,通常都會採用分佈式服務框架。包括目前興起的微服務設計,更是讓分佈式服務框架煊赫一時。那麼咱們今天的目標,就是來打造一款手寫的分佈式服務框架TinyWhale,中文名巨小鯨(手寫做品,本文講解專用, 暫無更多精力打形成開源^_^),接下來讓咱們開始吧。api

說道目前比較流行的分佈式服務框架,朗朗上口的有Dubbo,gRpc,Spring cloud等。這些框架無一例外都有着以下圖所示的總體架構模型:promise

3cd145d7-1bad-4740-ab97-5615b04e03c5

總體流程解釋以下:緩存

1. 啓動註冊,指服務端開始啓動並將服務註冊到註冊中心中。註冊中心通常用zookeeper或者consul實現。

2. 啓動並監聽,指客戶端啓動並監聽註冊中心的服務列表。

3. 有變動則通知,指客戶端訂閱的服務列表發生改變,則將更新客戶端緩存。

4. 接口調用,指客戶端進行接口調用,此調用將首先會向服務端發起鏈接操做,而後進行鑑權,以後發起接口調用操做。

5. 客戶端數據監控,指監控端會監控客戶端的行爲和數據並作記錄。

6. 服務端數據監控,指監控端會監控服務端的行爲和數據並作記錄。

7. 數據分析並衍生出其餘業務策略,指監控端會根據服務端和客戶端調用數據,來衍生出新的業務策略,好比熔斷,防刷,異地多活等。

固然,上面的流程是比較標準的分佈式服務框架所涉及的環節。在實際設計過程當中,能夠根據具體的使用方式進行調整,好比監控端只監控服務端數據,由於客戶端我不用關心。或者客戶端不設置服務地址列表緩存,每次調用前都從註冊中心從新獲取最新的服務地址列表等。

TinyWhale,因爲設計的初衷是簡單,可靠,高性能,因此這裏咱們去除了監控端,因此流程5,流程6,流程7都會拿掉,若是有須要使用到監控端的,能夠自行根據提供的接口來實現一套,這裏將再也不對監控端作過多的贅述。

3. 即時通信框架設計涉及要素

編解碼設計

編解碼設計任何通信類框架,編解碼處理是沒法繞過的一個話題。由於網絡上只能流淌字節流,因此這種特性催生了不少的框架。因爲這塊的工具很是多,諸如ProtoBuf,Marshalling,Msgpack等,因此喜歡用哪一個,全憑喜愛。這裏我用使用ProtoStuff來做爲咱們的編解碼工具,緣由有二:其一是易用性,無需編寫描述文件;其二是高性能,性能屬於T0級別梯隊。下面來具體看看吧:

首先看看咱們的編解碼類:

a1c7ca52-390c-497a-964e-143e980963f7

其中serialize方法,用於將類對象編碼成字節數據,而後經過本機發送出去。而deserialize方法,則用於將緩衝區中的字節數據還原爲類對象。考慮到設計的簡潔性,我這裏並未抽象出一個公共的codecInterface和codecFactory來適配不一樣的編解碼工具,你們能夠自行來進行設計和適配。

有了編解碼的輔助類了,如何集成到Netty中呢?

在Netty中,將對象編碼成字節數據的操做,咱們可使用已有的MessageToByteEncoder類來進行操做,繼承自此類,而後override encode方法,便可利用本身實現的protostuff工具類來進行編碼操做。

40907776-cbc0-4888-9c94-6f4e28fbdec2

一樣的,將字節數據解碼成對象的操做,咱們可使用已有的ByteToMessageDecoder類來進行操做,繼承自此類,而後override decode方法,便可利用本身實現的protostuff工具類來進行解碼操做。

粘包拆包設計

以前章節已經講過,咱們直接拿來展現下。

粘包拆包,顧名思義,粘包,就是指數據包黏在一塊了;拆包,則是指完整的數據包被拆開了。因爲TCP通信過程當中,會將數據包進行合併後再發出去,因此會有這兩種狀況發生,可是UDP通信則不會。下面咱們以兩個數據包A,B來說解具體的粘包拆包過程:

bb0a0099-2e12-4191-be8a-1d4c031552be

第一種狀況,A數據包和B數據包被分別接收且都是整包狀態,無粘包拆包狀況發生,此種狀況最佳。

f9b580f9-bd49-4dd0-b9e7-742e63f6276f

第二種狀況,A數據包和B數據包在一起且一塊兒被接收,此種狀況,即發生了粘包現象,須要進行數據包拆分處理。

a1f0cd86-ac4d-45a7-b79f-20c8dba93fed

第三種狀況,A數據包和B數據包的一部分先被接收,而後收到B數據包的剩餘部分,此種狀況,即發生了拆包現象,即B數據包被拆分。

a1cfac00-7f70-4183-a57e-becbb95ac9e1

第四種狀況,A數據包的一部分先被接收,而後收到A數據包的剩餘部分和B數據包的完整部分,此種狀況,即發生了拆包現象,即A數據包被拆分。

fd5757c1-63dd-4c8b-a28d-24c9c4b88fe9

第五種狀況,也是最複雜的一種,先收到A數據包的部分,而後收到A數據包剩餘部分和B數據包的一部分,最後收到B數據包的剩餘部分,此種狀況也發生了拆包現象。

至於爲何會發生這種問題,根本緣由在於緩衝區中的數據,Server端不大可能一次性的所有發出去,Client端也不大可能一次性正好把數據所有接收完畢。因此針對這些發生了粘包或者拆包的數據,咱們須要找到合適的手段來讓其造成整包,以便於進行業務處理。好消息是,Netty已經爲咱們準備了多種處理工具,咱們只須要簡單的動動代碼,就能夠了,他們分別是:LineBasedFrameDecoder,StringDecoder,LengthFieldBasedFrameDecoder,DelimiterBasedFrameDecoder,FixedLengthFrameDecoder。

因爲上節中,咱們講解了其大概用法,因此這裏咱們以LengthFieldBasedFrameDecoder來着重講解其使用方式。

LengthFieldBasedFrameDecoder:顧名思義,固定長度的粘包拆包器,此解碼器主要是經過消息頭部附帶的消息體的長度來進行粘包拆包操做的。因爲其配置參數過多(maxFrameLength,lengthFieldOffset,lengthFieldLength,lengthAdjustment,initialBytesToStrip等),因此能夠最大程度的保證能用消息體長度字段來進行消息的解碼操做。這些不一樣的配置參數能夠組合出不一樣的粘包拆包處理效果,在此Rpc框架的設計過程當中,個人使用方式以下:

be24597e-daed-4c17-bad8-7c78bd29bc63

是否是代碼很簡單?

翻閱LengthFieldBasedFrameDecoder源碼,實現原理盡收眼底,因爲網上講解足夠多,並且源碼中的講解也足夠詳細,因此這裏再也不作過多闡釋。具體的原理解釋能夠看這裏:LengthFieldBasedFrameDecoder

自定義協議設計

在進行網絡通信的時候,數據包從一端傳輸到另外一端,而後被解析,被消化。這裏就涉及到一個知識點,數據包是怎樣定義的,才能讓另外一方識別出數據包所表明的業務含義。這就涉及到自定義傳輸協議的設計,咱們來看看具體怎麼設計。

首先,咱們須要明確本身定義的協議須要承載哪些業務數據,通常說來包含以下的業務要點:

1. 自定義協議須要讓雙端識別出哪些包是心跳包

2. 自定義協議須要讓雙端識別出哪些包是鑑權包

3. 自定義協議須要讓雙端識別出哪些包是具體的業務包

4. 自定義協議須要讓雙端識別出哪些包是上下線包等等(本條規則適用於IM系統)

不一樣的系統在設計的時候,自定義協議的設計是不同的,好比分佈式服務框架,其業務包則須要包含客戶端調用了哪一個方法,入參中傳入了哪些參數等。物聯網採集框架,其業務包則須要包含底層採集硬件上傳的數據中,哪些數值表明空氣溫度,哪些數值表明光照強度等。一樣的,IM系統則須要知道當前的聊天是誰發出的,想發給誰等等。正是因爲不一樣系統承載的業務不一樣,因此致使自定義協議種類繁多,不一而足。性能表現也是錯落有致。複雜程度更是簡繁並舉。

那麼針對要講解的分佈式服務框架,咱們來詳細看一下設計方式。

首先定義一個NettyMessage泛型類,此泛型類是一個基礎類,包含了會話ID,消息類型,消息體三個字段。這三個字段是服務端和客戶端進行數據交換過程當中,必傳的三個字段,因此總體抽取出來,放到了這裏。

98e12d4a-8932-4bd6-a2d0-420484b271e4

而後,針對客戶端,定義一個NettyRequest類,包含基本的請求ID,調用的類名稱,方法名稱,入參類型,入參值。

e7c0174d-2858-40e7-a103-462879edc247

最後,客戶端的請求傳送到服務端,服務端須要反射調用方法並將結果返回,服務端的NettyResponse類,則包含了請求ID,用於識別請求來自於哪一個客戶端,error錯誤,result結果三個字段:

78f8dee2-8f31-4a91-b1e1-99b1bd3d4950

當服務端調用完畢,就會把結果封裝到此類中,而後將結果返回給客戶端,客戶端還原此類,便可拿出本身想要的數據來。

那麼這個稍顯冗雜的自定義協議就設計完畢了,有人會問,心跳包用這個協議如何識別呢?其實直接實例化NettyMessage類,而後在其type字段中塞入心跳標記值便可,相似以下:

1b082d53-92e8-4fea-8fa3-420d80d3a5c0

而上下線包和鑑權包則也是相似的構造,不通點在於,鑑權包 可能須要往body屬性裏面放一些鑑權用的用戶token等。

鑑權設計

顧名思義,就是進行客戶端登陸的認證操做。因爲客戶端不是隨意就能鏈接上來的,因此須要對客戶端鏈接的合法性進行過濾操做,不然很容易形成各類業務或者非業務類的問題,好比數據被盜竊,服務器被壓垮等等。那麼通常說來,如何進行鑑權設計呢?

e7876f6b-6221-4a28-9e57-2582a8d8e07e

能夠看到,上面的鑑權模塊裏面有三個屬性,一個是已登陸的用戶列表clientList,一個是用戶白名單whiteIP,一個是用戶黑名單blackIP,在進行用戶認證的時候,會經過用戶token,白名單,黑名單作驗證。因爲不一樣業務的認證方式不同,因此這裏的設計方式也是五花八門。通常說來,分佈式服務框架的認證方式依賴於token,也就是服務端的provider啓動的時候,會給當前服務分配一個token,客戶端進行請求的時候,須要附帶上這個token纔可以請求成功。因爲我這裏只是作演示效果,並未利用token進行驗證,實際設計的時候,能夠附帶上token驗證便可。

心跳包設計

傳統的心跳包設計,基本上是服務端和客戶端同時維護Scheduler,而後雙方互相接收心跳包信息,而後重置雙方的上下線狀態表。此種心跳方式的設計,能夠選擇在主線程上進行,也能夠選擇在心跳線程中進行,因爲在進行業務調用過程當中,此種心跳包是沒有必要進行發送的,因此在必定程度上會形成資源浪費。嚴重的甚至會影響業務線程的操做。可是在netty中,心跳包的設計並不是按照如上的方式來進行。而是經過檢測鏈路的空閒與否在進行的。鏈路分爲讀操做空閒檢測,寫操做空閒檢測,讀寫操做空閒檢測。若是一段時間沒有收到客戶端的信息,則會觸發服務端發送心跳包到客戶端,稱爲讀操做空閒檢測;若是一段時間沒有向客戶端發送任何消息,則稱爲寫操做空閒檢測;若是一段時間服務端和客戶端沒有任何的交互行爲,則稱爲讀寫操做空閒檢測。因爲空閒檢測自己只有在通道空閒的時候才進行檢測,而不是固定頻率的進行心跳包通信,因此能夠節省網絡帶寬,同時對業務的影響也很小。

那麼就讓咱們看看在netty中,怎麼實現高效的心跳檢測吧。

在netty中,進行讀寫操做空閒檢測,須要引入IdleStateHandler類,而後須要咱們實現本身的心跳處理Handler,具體設計方式以下:

首先,引入IdleStateHandler和服務端心跳處理Handler

fb267adc-dec0-4f36-9d35-055d0d960f0e

其中讀空閒檢測爲45秒,寫空閒檢測爲45秒,讀寫空閒檢測爲120秒,也就是說,若是服務器45秒沒有收到客戶端發來的消息,就會觸發一個回調事件,另外兩個同理。具體觸發什麼事件了呢?咱們來看看服務端心跳處理Handler:HeartBeatResponseHandler

fe66f758-317f-456a-8ba0-1d83fcdc188d

能夠看到,檢測到讀空閒,會調用processReadIdle方法來處理,咱們進來看看具體處理方式:

62e323ee-003a-4aa2-a995-28649b91a5c8

能夠看到,服務端發現一段時間沒收到客戶端消息後,就會主動給客戶端發一次心跳,確認客戶端是否存活。若是在第90秒內尚未收到客戶端的回覆心跳,則會嘗試再發一條,同時在客戶端上下線狀態表中,將當前客戶端的未響應次數加一;若是在第135後認爲收到客戶端的回覆心跳,則會嘗試重發一條,同時未響應次數再加一,當次數累積到三次的時候,則認爲此客戶端掉線,此時將會踢掉此客戶端。若是是IM系統的話,此時服務端就能夠將此客戶端的信息告知其餘在線用戶掉線,這樣其餘用戶就能夠在本身的客戶端列表中刪掉掉線用戶。

至於processWriteIdle和processAllIdle方法,均是如上相似原理,至於須要處理,怎麼去處理,均是業務本身定製,至關靈活。

很遺憾,在翻閱不少基於Netty的源碼中,並未發現此樣的實現方式,這也是至關惋惜的。

斷線重連設計

在實際網絡通信過程當中,客戶端可能因爲網絡緣由未能及時的響應服務端的心跳請求,從而被服務端踢下線。之全部有這種機制,一方面是爲了節省服務端資源,剔除死連接;另外一方面則是出於業務要求,好比IM系統中,用戶掉線了,可是服務端沒有及時剔除,會致使其餘用戶認爲此用戶在線,從而可能形成誤解等。

那麼就須要有一種機制來保證客戶端網絡掉線後,可以及時的感知並進行重連,從而保證服務的可用性。以前咱們介紹了心跳包,它是專門用來保持服務端和客戶端的通道鏈接保持的。假設當客戶端由於網絡緣由,被服務端踢下線後,客戶端是無感知的,並不知道本身已經被服務端踢下線,因此這時候若是客戶端依舊向服務端發送數據,將會失敗。此時這就是斷線重連應該工做的地方了。具體設計以下:

75ddab25-f90a-45c7-bf11-d456fc6f1489

能夠看到,咱們依舊用了netty原生的IdleState類來檢測空閒通道。當客戶端一段時間沒收到服務端的消息,將會首先嚐試給服務端發送一次心跳,因爲此時客戶端已經被服務端踢掉了,因此三次心跳均未得到迴應,此時,客戶端忽然想明白了:「哦,我想我已經掉線了」。因而客戶端將會利用ctx對象進行服務端重連操做。

此種方式簡單易行,雖然不具備實時性,可是效果很好,能夠有效地避免由於網絡抖動等未知緣由致使的掉線問題。

以上幾種特性,是設計通訊框架過程當中,基本上都繞不開。雖然不一樣的通訊框架因爲承載的業務不一樣而形成設計上的差別,可是正是由於這些特性的存在,才能保證整個通訊過程當中的穩定性和可靠性。

接下來咱們將焦點轉移到服務端和客戶端的設計上來。

先說說服務端和客戶端,基本上的通信模型爲,服務端bind本地端口,而後進行listen監聽。客戶端connect服務端套接字,而後進行通信。用netty打造的雙端,也繞不開這種通信模型。其實若是讀者有過通訊框架的設計經驗的話,將會對此十分熟悉。不過就通信方式來講,也是很統一的,通常都是一端發送數據,另外一端接收處理,而後看具體業務再決定需不須要返回數據回去。那麼這裏就涉及到一個要點,由於數據的返回有同步和異步之分,通常說來同步等待數據返回的性能要比異步獲取數據的性能要差一些,可是具體能差別多少,徹底由設計者本身把握。

同步等待數據返回這塊,我就無需多說了,基本上就是以下示例代碼:

3c1d4c35-e7cd-4897-925e-760c2eed0521

異步獲取返回數據這塊,則設計上要複雜一些,由於設計方式是多種多樣的。有用雙Queue來作異步化(任務quque和應答queue), 有用Future來作異步化,固然也有用多線程來作異步化等。TinyWhale的異步化處理,採用的是後者,在客戶端講解那塊,將會作詳細的解釋。

再說說netty框架,因爲其純異步化模型,因此獲取的各類結果對象基本上是各類Future,若是以前對這種模型接觸比較少的話,將會不太習慣netty的這種設計思惟。具體的使用方式,將會在接下來的設計中進行詳細講解。

服務端設計

首先說道服務端,是指提供服務的一方,通常用來處理客戶端請求。因爲netty這塊,已經將底層封裝的特別好,因此這裏無需多餘設計,只須要了解netty的異步模型便可。那麼何爲netty的異步模型呢?

既然說到了同步異步,那麼難免就會提起阻塞非阻塞,我就說下我的的理解吧。同步異步的區別,我的認爲,只要不是一個時間只能作一件事兒的,都可稱爲異步。實現異步有多種方式,而多線程只是異步的一種實現方式而已。好比咱們用兩個queue模擬生產消費行爲,也能夠稱之爲異步。阻塞非阻塞的區別,我的認爲,主要體如今對資源的爭搶等待上面,發生了資源爭搶等待,則被阻塞,反之爲非阻塞。好比http請求遠程結果,阻塞等待等。我的意見,若有謬誤,還請指教。接下來讓咱們進入正題。

首先要從同步阻塞模型提及。

同步阻塞 

相信你們都據說過這個模型,客戶端請求到服務端,服務端裏面有個Acceptor接收請求,而後針對每一個請求都建立一個Thread來處理,典型的一對一通訊處理方式。看下具體的模型示意圖:

32c59e26-a4ef-4c46-b192-8985c8fa5142

首先,客戶端請求達到Acceptor,Acceptor接收並處理,而後Acceptor爲每一個請求建立一個線程來處理。這樣後續的請求處理工做就在各自的線程上進行處理了。此種方式最簡便,代碼也很是好寫,可是帶來的問題就是一個請求對應一個線程,沒法作到高性能,並且因爲線程開銷較大,對服務器的穩定運行也有必定的影響,隨時都有可能出現內存耗盡,建立線程失敗等,最終的結果就是由於宕機等緣故形成生產問題。

因爲上述問題,後來產生了僞異步處理模型,其實就是講Acceptor裏面爲每一個請求分配一個線程,改爲了線程池這種池化方式來處理,整體上性能比以前要好不少,並且機器運行也穩定不少,相對以前的模型,有了不小的提高。可是從本質上來將,此種方式和以前方式相比,並未有質的改變,之因此稱爲僞異步,原因在此吧。

非阻塞

同步阻塞模型因爲性能很差,可靠性低,因此催生了非阻塞模型的產生。目前非阻塞模型有兩種,一種是NIO,另外一種是AIO,然而AIO雖能夠稱得上爲真正的異步非阻塞IO模型,代碼也很簡便,可是並未大規模的應用,料想應該有它自身的短板,因此咱們着重來說解NIO模型。首先來看看NIO模型示意圖:

938a5b17-bc55-45cb-be25-87a3b550e824

上面這幅圖是網上流傳比較廣的一幅圖,由於被你們所熟知,因此這裏我就直接拿來用了,這幅圖的出處在這裏。具體來看一下。

首先,從圖中能夠看出,client爲客戶端請求,mainReactor主要接收客戶端請求,而後調用acceptor進行處理,acceptor查到已經就緒的鏈接,則交由subReactor進行處理。subReactor這裏會負責已鏈接客戶端的讀寫網絡操做,也就是若是有讀寫操做,會反映到subReactor中來,至於業務處理部分,則直接扔給ThreadPool進行業務處理。通常說來,subReactor的個數大概和CPU的核數是一致的。從這裏還能夠看出mainReactor和subReactor都有派發器的意味。

因爲此NIO模型使用了事件驅動,並且以linux底層做爲通信支持,徹底使用了epoll高性能的特色,因此總體表現堪稱完美。這裏我要推薦一座金礦,大名鼎鼎的C10k問題,諸位看官若是有興趣,能夠探索一番。

而後來具體說下服務端設計吧:

public class NettyServer {

    /**
     * 服務端帶參構造
     * @param serverAddress
     * @param serviceRegistry
     * @param serverBeans
     */
    public NettyServer(String serverAddress, ServerRegistry serviceRegistry, Map<String, Object> serverBeans) {
        this.serverAddress = serverAddress;
        this.serviceRegistry = serviceRegistry;
        this.serverBeans = serverBeans;
    }

    /**
     * 日誌記錄
     */
    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    /**
     * 服務端綁定地址
     */
    private String serverAddress;

    /**
     * 服務註冊
     */
    private ServerRegistry serviceRegistry;

    /**
     * 服務端加載的bean列表
     */
    private Map<String, Object> serverBeans;

    /**
     * 主事件池
     */
    private EventLoopGroup bossGroup = new NioEventLoopGroup();

    /**
     * 副事件池
     */
    private EventLoopGroup workerGroup = new NioEventLoopGroup();

    /**
     * 服務端通道
     */
    private Channel serverChannel;

    /**
     * 綁定本機監聽
     *
     * @throws Exception
     */
    public void bind() throws Exception {

        //啓動器
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //爲Acceptor設置事件池,爲客戶端接收設置事件池
        serverBootstrap.group(bossGroup, workerGroup)
                //工廠模式,建立NioServerSocketChannel類對象
                .channel(NioServerSocketChannel.class)
                //等待隊列大小
                .option(ChannelOption.SO_BACKLOG, 100)
                //地址複用
                .option(ChannelOption.SO_REUSEADDR, true)
                //開啓Nagle算法,
                //網絡好的時候:對響應要求比較高的業務,不建議開啓,好比玩遊戲,鍵盤數據,鼠標響應等,須要實時呈現;
                //            對響應比較低的業務,建議開啓,能夠有效減小小數據包傳輸。
                //網絡差的時候:不建議開啓,不然會致使總體效果更差。
                .option(ChannelOption.TCP_NODELAY, true)
                //日誌記錄組件的level
                .handler(new LoggingHandler(LogLevel.INFO))
                //各類業務處理handler
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        //空閒檢測handler,用於檢測通道空閒狀態
                        channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(45, 45, 120));
                        //編碼器
                        channel.pipeline().addLast("nettyMessageDecoder", new NettyMessageDecoder(1024, 4, 4));
                        //解碼器
                        channel.pipeline().addLast("nettyMessageEncoder", new NettyMessageEncoder());
                        //心跳包業務處理,通常須要配置idleStateHandler一塊兒使用
                        channel.pipeline().addLast("heartBeatHandler", new HeartBeatResponseHandler());
                        //服務端先進行鑑權,而後處理業務
                        channel.pipeline().addLast("loginAuthResponseHandler", new LoginAuthResponseHandler());
                        //業務處理handler
                        channel.pipeline().addLast("nettyHandler", new ServerHandler(serverBeans));
                    }
                });

        //獲取ip和端口
        String[] array = serverAddress.split(":");
        String host = array[0];
        int port = Integer.parseInt(array[1]);

        //綁定端口,同步等待成功
        ChannelFuture future = serverBootstrap.bind(host, port).sync();

        //註冊鏈接事件監聽器
        future.addListener(cfl -> {
            if (cfl.isSuccess()) {
                logger.info("服務端[" + host + ":" + port + "]已上線...");
                serverChannel = future.channel();
            }
        });

        //註冊關閉事件監聽器
        future.channel().closeFuture().addListener(cfl -> {
            //關閉服務端
            close();
            logger.info("服務端[" + host + ":" + port + "]已下線...");
        });

        //註冊服務地址
        if (serviceRegistry != null) {
            serviceRegistry.register(serverBeans.keySet(), host, port);
        }
    }

    /**
     * 關閉server
     */
    public void close() {
        //關閉套接字
        if(serverChannel!=null){
            serverChannel.close();
        }
        //關閉主線程組
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        //關閉副線程組
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }
}
服務端源碼

因爲代碼作了具體的註釋,我這裏就不針對性的進行解釋了。須要注意的是,當服務啓動以後,會註冊兩個監聽器,一個綁定時間監聽,一個關閉事件監聽,當事件被觸發的時候,會回調兩個事件內部的邏輯。最後服務端正常啓動,會被註冊到註冊中心中,以便於客戶端調用。須要注意的是,通常狀況下,業務Handler最好和心跳包Handler等非業務性的Handler處理分開,避免業務高峯時期,由於心跳包等Handler的處理來耗費捉襟見肘的內存資源或者CPU資源等,形成服務器性能降低。來看一下ServerHandler的具體設計:

5ca2ea97-bcac-45bb-8c22-49ca6599a2c3

從這裏能夠看出,咱們用了一個線程池來將業務處理進行池化,這樣作就不會受到心跳包等其餘非業務處理Handler的影響,最大限度的保證系統的穩定性。

更多關於同步異步,阻塞非阻塞的設計,請參見Doug Lea:Scalable IO in Java

客戶端設計

再來講說客戶端,是指消費服務的一方,通常用來實現特定的消費業務。一樣的,netty這塊已經將底層封裝的很好,因此直接編寫業務便可。和編寫服務端不一樣的是,這裏不須要分BossGroup和WorkerGroup,由於對於客戶端來講,只須要鏈接服務端,而後發送數據並監聽便可,不存在影響性能的問題。具體的寫法看看吧:

public class NettyClient {

    /**
     * 日誌記錄
     */
    private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);

    /**
     * 客戶端請求Future列表
     */
    private Map<String, TinyWhaleFuture> clientFutures = new ConcurrentHashMap<>();


    /**
     * 客戶端業務處理handler
     */
    private ClientHandler clientHandler = new ClientHandler(clientFutures);

    /**
     * 事件池
     */
    private EventLoopGroup group = new NioEventLoopGroup();

    /**
     * 啓動器
     */
    private Bootstrap bootstrap = new Bootstrap();

    /**
     * 客戶端通道
     */
    private Channel clientChannel;

    /**
     * 客戶端鏈接
     * @param host
     * @param port
     * @throws InterruptedException
     */
    public NettyClient(String host, int port) throws InterruptedException {
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        //通道空閒檢測
                        channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(45, 45, 120));
                        //解碼器
                        channel.pipeline().addLast("nettyMessageDecoder", new NettyMessageDecoder(1024 * 1024, 4, 4));
                        //編碼器
                        channel.pipeline().addLast("nettyMessageEncoder", new NettyMessageEncoder());
                        //心跳處理
                        channel.pipeline().addLast("heartBeatHandler", new HeartBeatRequestHandler());
                        //業務處理
                        channel.pipeline().addLast("clientHandler", clientHandler);
                        //鑑權處理
                        channel.pipeline().addLast("loginAuthHandler", new LoginAuthRequestHandler());
                    }
                });

        //發起同步鏈接操做
        ChannelFuture channelFuture = bootstrap.connect(host, port);

        //註冊鏈接事件
        channelFuture.addListener((ChannelFutureListener)future -> {
            //若是鏈接成功
            if (future.isSuccess()) {
                logger.info("客戶端[" + channelFuture.channel().localAddress().toString() + "]已鏈接...");
                clientChannel = channelFuture.channel();
            }
            //若是鏈接失敗,嘗試從新鏈接
            else{
                logger.info("客戶端[" + channelFuture.channel().localAddress().toString() + "]鏈接失敗,從新鏈接中...");
                future.channel().close();
                bootstrap.connect(host, port);
            }
        });

        //註冊關閉事件
        channelFuture.channel().closeFuture().addListener(cfl -> {
            close();
            logger.info("客戶端[" + channelFuture.channel().localAddress().toString() + "]已斷開...");
        });
    }

    /**
     * 客戶端關閉
     */
    private void close() {
        //關閉客戶端套接字
        if(clientChannel!=null){
            clientChannel.close();
        }
        //關閉客戶端線程組
        if (group != null) {
            group.shutdownGracefully();
        }
    }

    /**
     * 客戶端發送消息,將獲取的Future句柄保存到clientFutures列表
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    public TinyWhaleFuture send(NettyMessage<NettyRequest> request) {
        TinyWhaleFuture rpcFuture = new TinyWhaleFuture(request);
        rpcFuture.addCallback(new TinyWhaleAsyncCallback() {
            @Override
            public void success(Object result) {
            }

            @Override
            public void fail(Exception e) {
                logger.error("發送失敗", e);
            }
        });
        clientFutures.put(request.getBody().getRequestId(), rpcFuture);
        clientHandler.sendMessage(request);
        return rpcFuture;
    }
}
客戶端源碼

因爲代碼中,我也作了諸多的註釋,因此這裏再也不一一解釋。須要注意的是,和編寫服務端相似,我這裏添加了兩個監聽事件,監聽鏈接成功事件,監聽關閉事件,響應的業務場景若是觸發了這兩個事件,將會執行事件內部的邏輯。

這裏須要提一下消息發送的場景。通常說來,客戶端向服務端發送數據,而後服務端處理功能後返回給客戶端,客戶端接收到消息後再進行後續處理。這個流程通常有兩種實現方式,一種是同步的實現方式,另外一種是異步的實現方式,具體來呈現如下:

首先是同步實現方式,顧名思義,客戶端發送數據給服務端,服務端在處理完畢並返回數據以前,客戶端一直處於阻塞等待狀態,send方法的代碼設計以下:

e0387579-7276-4d74-bd83-31c1d974ede2

來看看clientHandler裏面的sendMessage方法:

4b2d29fd-55fa-4159-873c-0a9ea9cd553a

在開始發送以前,咱們先拿到當前ctx的promise句柄,而後將數據寫入到緩衝區,最後將此句柄返回給send方法,send方法接收到此句柄後,將會等待promise執行完畢,如何判斷promise執行完畢呢?當客戶端接收到服務端返回,就能夠將promise置爲完成狀態:

677e5ed7-63f4-4e75-a7bd-763304e1dcd4

能夠看到,經過重置promise的setSuccess方法,便可將promise置爲完成態,這樣操做以後,send方法裏面就能夠正常的拿到數據並返回了。不然將會一直處於阻塞狀態。

能夠看到,在netty中實現阻塞的方式來接收服務端返回,處理起來仍是挺麻煩的,根本緣由在於netty徹底異步化的模型,因此只能用如上的方式來進行同步化處理。

再來講說異步化處理吧, 這也是netty很推崇的方式。

首先來看看send方法:

9d017228-d265-4b58-b735-9618856758a5

從上面代碼中能夠看到,當咱們將消息發送出去後,會當即得到一個TinyWhaleFuture的句柄,不會再有阻塞等待的場景。咱們看看clientHandler.sendMessage的具體實現:

2974eedd-4e1c-46c0-96a7-1735a4ad1916

能夠看到,只是單純的將數據推送到緩衝區而已。

還記得咱們的TinyWhaleFuture句柄嗎?既然返回給咱們了這個句柄,那麼咱們確定是能夠今後句柄中取出咱們想要獲取的數據的,咱們看看客戶端若是收到服務端的返回結果,該如何處理呢?

50149181-14be-4cce-9fdc-1393e0cd70b3

能夠看到,這裏利用了一個Map來保存用戶每一個發送請求,一旦當服務端返回數據後,就會將請求置爲完成態,同時從Map中將已完成的操做刪掉。這樣,客戶端拿到TinyWhaleFuture句柄後,經過提供的get方法便可在想獲取結果的地方來獲取返回結果。這樣作,是不會阻塞其餘業務執行的。

其實不只僅是netty中,在設計其餘框架的時候,也能夠利用此思想來實現真正意義上的異步執行邏輯。固然,可以實現這種執行邏輯的方式有不少種,至於更好的實現方式,還請君細細斟酌吧,這裏只起到拋磚引玉的做用。

4. 動態調用設計

服務註冊和服務發現

先來上個大體的類設計圖,ServerCache接口提供基礎的本地緩存操做;ServerBase提供基礎的鏈接註冊中心,關閉註冊中心鏈接操做;ServerRegistry爲服務註冊類;ServerDiscovery爲服務發現類,下面是類UML圖,咱們來具體的說一說:

33f8e329-fe57-4223-8fb9-8e61865ce163

首先是註冊中心,這個就沒必要說了,通常都是使用zookeeper或者consul等框架來實現,這裏咱們使用zookeeper。可是咱們這裏並非用原生的zookeeper sdk來操做,而是使用curator來操做,curator是什麼呢?在其介紹頁面有句很經典的話:Guava is to Java what Curator is to Zookeeper,至關的簡潔明瞭吧。來看下具體的使用方式吧。

首先定義用於加載註冊中心服務套接字的共享緩存,客戶端啓動的時候,此共享緩存會從註冊中心拉取服務器列表到本地保存:

1bbffe83-acca-4c28-962b-20c9ea286b3a

而後,定義服務治理的公共操做類:

8074c8cc-fb0c-44ca-9941-95ba65d32c19

能夠看到,此基類中,open方法和close方法,用於鏈接zk服務器,關閉和zk服務器的鏈接。以後即是對接口中操做本地緩存的實現。

因爲服務治理這塊包含了服務註冊和服務發現功能,因此這裏,咱們分別定義ServerRegistry類和ServerDiscovery類來進行處理。

ServerRegistry類,顧名思義,表示服務註冊,也就是當咱們的服務端啓動以後,綁定了本機端口以後,會將承載的服務註冊到zk中。

44220ee6-9d3b-44af-b06f-546ea2771065

ServerDiscovery類,顧名思義,服務發現,那麼此類中的discovery方法則就是根據用戶傳入的接口名稱來找到對應的服務器,而後將結果返回。須要注意的是,服務發現的過程,須要涉及到負載均衡,之因此涉及到這個,主要是爲了讓每臺服務器收到的請求均勻一些,以達到均衡的目的,這樣就不會由於請求打的不均勻致使有些服務器負載太大,有些服務器負載幾乎沒有的狀況。負載均衡,我將在後面的章節講解,先繼續看看服務發現這塊:

43f13e2c-9f5e-4bd5-89a9-8998f9f92923

能夠看到,我用了一個watchNode方法來檢測節點的改動,此方法內部設置了一個Listener,只要有節點的改動,都會推送到此Listener中,而後我就能夠根據改動的類型來決定是否對本地緩存進行更新操做。

更具體的服務註冊和服務發現使用方式,能夠參考curator官網:Service register and Service discovery

負載均衡

前面說到了服務治理這塊,因爲裏面涉及到負載均衡這塊,這裏就詳細說一下。

通常說來,有三種負載均衡模型是繞不開的,分別是一致性哈希,此模型可讓帶有業務標記的請求每次請求都會導向到指定的服務器上。輪詢,此模型主要是對服務器列表進行順序訪問。隨機,此模型主要是隨機獲取服務器並返回。其餘的模型還有不少,能夠根據具體的業務進行衍生,這裏不作一一的展現。

首先來看看負載均衡基類:

6db1acf2-c315-48a9-b11a-599193ee965b

而後看看三種模型的實現:

一致性哈希實現,直接對服務端的size進行取餘操做:

92cd52ab-0d2d-44ec-960a-0cbd383eb652

輪詢實現,對訪問過的服務器進行計數累加,而後把此計數做爲下標並獲取元素返回:

f3915660-1dbe-4346-9fb8-e2592e58be9c

隨機實現,對服務器進行隨機選取:

9fc58dcf-bcd2-4458-96b3-080c7ad5a66a

你也許會問,爲何你設計的負載均衡裏面沒有權重操做呢?其實若是願意,也是能夠加上權重操做的,這樣就會衍生出來其餘的負載均衡模型,好比服務訪問不一樣,權重-10,服務能訪問通,權重+1,這樣就能夠經過權重,選取一些權重較高的服務器優先返回,而對那些權重較低的服務器,能夠少分一些請求,讓其慢慢恢復到正常狀態以後,再多分配一些請求過來等等。

總之,你能夠在此基礎上進行本身的設計,可是大致思想就是讓服務器得到的負載越均衡越好。

容災處理

此處整合Hystrix進行的設計,能夠對請求作FailFast處理,RetryOnece處理,RetryTwice處理等, 具體細節能夠翻看Hystrix設計便可。這裏就不詳解(哈哈哈,實際上是由於寫着寫着,寫的懶了,這塊就不想講了,畢竟基本上都是Hystrix那套)。

反射調用

最後要說的部分就是反射調用這塊了。咱們知道,當客戶端發送待調用的方法發送給服務端,服務端接收後,須要經過反射調用方法,而後將結果返回給客戶端。首先來看看服務端業務處理Handler:

c04f7664-506d-470b-b2cd-56d1d934ae6a

能夠看到,此業務處理handler會讀取客戶端的請求,而後分析數據包內容,最後利用反射來調用相應的方法獲取結果並壓入緩衝區中,以後發送給客戶端。

再來看看handle方法是如何進行反射調用並獲得結果的:

a3e6bb50-4421-43c6-be3b-413cec9424f8

能夠看到,很經典的反射調用場景,這裏就不細說了。

從這裏,咱們能夠看出,服務端的處理方式如上,很是的簡單。可是客戶端是怎麼發送請求消息給服務端,又是如何接收服務端的返回數據的呢?

3acc13f4-28cc-4de4-965d-336074b6a6f5

從上面能夠看出,咱們用了javassist組件的反射(java自身的反射也是相似的使用方式)來構建完整的類對象,而後利用callback回調來發送請求給服務端獲取數據,而後獲取服務端返回的數據,最後將返回的數據拆解後,返回給客戶端。若是用java自帶的反射來實現,編碼也是差很少的:

00ff2b8e-0261-4cc4-896e-ab7ea42ee7a1

這裏須要注意的是,此處用了動態反射的功能來實現,性能並非特別好,若是能用上字節碼技術,性能會再提高一個臺階。具體的字節碼實現方式,能夠參見我後續的文章。

5. 跑起來吧!!

好了,咱們終於把一切都準備好了,那麼就讓咱們運行起來吧。

在服務端,首先能夠看到以下的註冊中心上線日誌:

068cefb2-70c4-44ac-8176-17e7d7b48963

而後能夠看到客戶端登陸日誌:

57981f83-d3a9-429b-a956-75c7ab4a8861

在客戶端,咱們能夠看到以下的日誌:

2f6f67b2-4a52-4d78-b235-c9381fe23cdf

能夠看到,客戶端鏈接上來後,先發送鑑權請求,鑑權成功後,將會發送服務調用請求,調用完畢後,獲得返回結果,整個過程耗費18ms,最後客戶端退出。

當咱們在客戶端調用的時候,加上Thread.Sleep來觸發心跳探活,能夠看到以下的檢測結果:

b2eca78b-c48b-4a06-b5f3-a5b7b3b81da7

能夠看到,每隔5秒鐘,咱們都能收到客戶端的心跳,而後咱們模擬網絡差,客戶端掉線,看看服務端如何檢測:

ee8a82c9-902e-4d54-9b8f-2ee0727ceabf

能夠看到客戶端被踢掉了,此時咱們再去看看客戶端日誌,能夠看出來,客戶端確實被服務端踢掉線了:

d7498e9b-8f60-4d72-b52b-7596b126c8f6

最後,東西作完後,補一個benchmark吧,因爲個人機器性能比較差,並且測試是直接開啓IDEA這個IDE來測試的,因此性能並不見得很好:

57366a4e-c94a-41b7-a8dc-9224f7c71654

而後來看看benchmark結果吧:

0f003fc7-3e98-4488-8037-6bf5dc50c02a

性能並非特別好,關鍵有如下幾個地方是耗時大戶:編解碼,反射,同步等待服務端返回

編解碼這個只能找性能比較好的組件來解決

反射能夠經過字節碼來實現,性能會再提高一個檔次,可是難度也會提高很多。

同步等待服務返回,能夠經過徹底異步化實現來解決,那麼剛剛展現的

a5b71565-ff20-4f5f-bafb-d4271f48a0c0

調用方式,會被改變成:

71bc7cea-5640-411b-a8a9-a75ac57ce63f

雖然這樣速度會快不少,可是用戶可否接受這種調用方式,則是另外一個頭疼的問題。性能和易用,自己就具備相悖性,因此只能在進退之間作平衡了。

寫到這裏,總體介紹差很少了,可是還有不少東西沒有接入,譬如說kafka,mq,redis等。若是能把這些東西接入,則會讓其總體顯得更加豐滿,同時功能也更豐富,應用場景也會更廣闊一些。

6.總結

寫到這裏,利用netty打造分佈式服務框架的要點就基本上完結了。通篇看來,知識點不少,可是都是咱們耳熟能詳的東西,能把它們串在一塊兒,組成一個能夠用的框架,則須要必定的思考。

文中全部內容基本上爲原創,如需轉載,請標明 轉載自博客園程序詩人 字樣,算是對本家付出的辛苦的一點尊重吧。

相關文章
相關標籤/搜索