(接上文《架構設計:系統間通訊(11)——RPC實例Apache Thrift 上篇》)java
在《架構設計:系統間通訊(10)——RPC的基本概念》一文中,我專門介紹了一款RPC規範的具體實現中哪些要素和性能息息相關。包括了RPC通信採用的數據封裝格式、RPC通信採用的網絡IO模型和RPC所採用的請求處理方式。這個小節咱們對Apache Thrift中的這三個要素,這樣讀者就能夠知曉爲何Apache Thrift的性能如此高效了。apache
Apache Thrift支持多種消息格式封裝。這些消息格式是若是進行編碼和解碼的是不須要使用者關心的,只須要根據本身的須要制定不一樣的消息封裝格式便可。Apache Thrift全部消息格式封裝的實現,都繼承與TProtocol這個抽象類,以下圖所示:數組
二進制流的編碼格式。因爲須要支持跨語言,因此Apache Thrift支持有限的幾種通用類型,包括基本類型(Float、Double、Integer、Long、String、Short)、集合類型(Map、Set、List)還有Pojo類型(實際上就是前二者若干類型的組合形式)。服務器
那麼這個類所生成的二進制流和傳統的java序列化後生成的二進制流有什麼樣的區別(或者是優點)呢?咱們能夠經過閱讀TBinaryProtocol的源代碼進行研究。網絡
咱們以TBinaryProtocol中,對Integer的序列化過程進行詳細的解釋,來對比java提供的其餘幾種序列化的方式找到不一樣。首先java中,若是要將一個Integer對象經過網絡發送出去,要作的第一件事情就是序列化,那麼咱們經常使用的序列化方式有兩種,以下所示:架構
java中序列化Integer對象的第一種方法:框架
Integer integerObject = 10066329; integerObject.toString().getBytes(); 12
java中序列化Integer對象的第二種方法:異步
ByteArrayOutputStream aStream = new ByteArrayOutputStream(); ObjectOutputStream oStream = new ObjectOutputStream(aStream); oStream.writeObject(integerObject); aStream.toByteArray();1234
第一種方式是將Integer對象中的值序列化;第二種方式,是將Integer整個對象序列化。這兩種方式雖然都產生byte[],實際上性質是徹底不同的。咱們來看一下這兩種方式產生的byte[]的內容:async
序列化Integer的值:ide
[49, 48, 48, 54, 54, 51, 50, 57]
序列化整個Integer對象:
[-84, -19, 0, 5, 115, 114, 0, 17, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 18, -30, -96, -92, -9, -127, -121, 56, 2, 0, 1, 73, 0, 5, 118, 97, 108, 117, 101, 120, 114, 0, 16, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 78, 117, 109, 98, 101, 114, -122, -84, -107, 29, 11, -108, -32, -117, 2, 0, 0, 120, 112, 0, -103, -103, -103]
第一種方式序列化後,byte數組有8個byte元素(由於是首先轉換成字符串的,因此實際上這個大小會隨着Integer值的大小增長而增長);第二中方式序列化後,byte數組一共有 > 20 個byte元素,其中除了記錄Integer的值之外,還包括描述這個類型的其餘屬性。
那麼咱們再來看看TBinaryProtocol中,是如何序列化Integer類型的。首先咱們來看一下TBinaryProtocol進行Integer序列化的這部分源代碼,以下圖所示:
private byte[] i32out = new byte[4]; public void writeI32(int i32) throws TException { i32out[0] = (byte)(0xff & (i32 >> 24)); i32out[1] = (byte)(0xff & (i32 >> 16)); i32out[2] = (byte)(0xff & (i32 >> 8)); i32out[3] = (byte)(0xff & (i32)); trans_.write(i32out, 0, 4); }12345678
計算過程能夠經過下圖來表示:
經過4次位計算,獲得了一個長度爲4個byte數組,而且這個數組的大小並不會隨着整數大小的增長而變化。而且位運算的速度是全部計算中速度最快的一種計算。反序列化的過程類似,對這個大小爲4的byte[]數組從新進行位計算便可:
((buf[off] & 0xff) << 24) | ((buf[off+1] & 0xff) << 16) | ((buf[off+2] & 0xff) << 8) | ((buf[off+3] & 0xff));1234
因爲本文的篇幅和寫做目的所限,不能一一介紹TBinaryProtocol的各類序列化方式,可是經過對TBinaryProtocol中Integer的序列化過程,咱們能夠找到TBinaryProtocol處理過程的優點,包括速度和大小的優點。因此,若是您的使用環境對序列化過程沒有特別的要求(例如後面要提到的大量的負數狀況),那麼直接使用TBinaryProtocol進行數據格式的封裝的就能夠了。
byte是一個8位二進制描述(一個字節),在java中,一個int須要4個byte進行表示,而。「0x」的前綴表示16進制數字,那麼0xff的二進制表示就是 1111 1111;「&」是「與」運算符,這個運算符用於二進制計算,1 & 1 = 1,其他狀況都 = 0;「<<」 表示左移運算,0011 << 2 = 1100;」>>」表示右移運算,1100 >> 2 = 0011;
使用zigzag編碼方式緊湊傳輸協議。zigzag編碼的優點在於記錄數字類型(整數、單精度浮點和雙精度浮點),最特別的是zigzag編碼對負數的記錄。在計算機中,都會使用很大的數字表示負數,爲了保證節約傳輸量,zigzag編碼採用正數與負數交錯的方式,把負數轉換爲一個正數進行記錄。下面咱們具體來分析一下TCompactProtocol中對32位整數的序列化方式,如下是TCompactProtocol中對32爲整數的處理代碼:
/** * Write an i32 as a zigzag varint. */ public void writeI32(int i32) throws TException { writeVarint32(intToZigZag(i32)); } /** * Convert n into a zigzag int. This allows negative numbers to be * represented compactly as a varint. */ private int intToZigZag(int n) { return (n << 1) ^ (n >> 31); } /** * Write an i32 as a varint. Results in 1-5 bytes on the wire. * TODO: make a permanent buffer like writeVarint64? */ byte[] i32buf = new byte[5]; private void writeVarint32(int n) throws TException { int idx = 0; while (true) { if ((n & ~0x7F) == 0) { i32buf[idx++] = (byte)n; // writeByteDirect((byte)n); break; } else { i32buf[idx++] = (byte)((n & 0x7F) | 0x80); // writeByteDirect((byte)((n & 0x7F) | 0x80)); n >>>= 7; } } trans_.write(i32buf, 0, idx); }1234567891011121314151617181920212223242526272829303132333435
以上代碼片斷一共有一個對外的調用方法,和兩個分別名爲intToZigZag和writeVarint32的私有方法。從字面上的意義咱們能夠知道:當對一個32位整數進行編碼時,首先將這個32位整數轉成ZigZag編碼格式,而後在序列化爲「變長的32位整數」。那麼這個處理的具體過程是什麼樣的呢?咱們以一個較大的32位整數(161061273,二進制計數爲:1001100110011001100110011001)爲例,進行講解:
首先將整個這個整數作成ZigZag編碼格式:
而後進行「變長」處理:
能夠看到,上面的「變長」計算一共進行了5次,比TBinaryProtocol中的32位整數序列化還要多出一個byte。這是爲何呢?由於這個數字比較長。
但實際處理中,咱們通常使用的數據都是比較小的。這也是爲何首先要使用ZigZag編碼把某個負數的符號位從高位移動到低位的緣由。實際上,在實際過程當中,變長計算通常只會進行二至三次就完成。這樣,在大多數狀況下,完成一個32位整數的序列化,TCompactProtocol作使用的空間就比TBinaryProtocol要小。
那麼通過分析,對於TCompactProtocol和TBinaryProtocol的選擇的經驗是:若是傳輸的信息中,基本都是字符串,那麼使用TCompactProtocol仍是使用TBinaryProtocol基本上都是差很少的;若是須要傳輸的信息中,會有較多的「低位數字」,那麼建議使用TCompactProtocol。
固然Apache Thrift還提供其餘的傳輸格式封裝。不一樣的需求場景下,您可使用根據須要選用這些信息傳輸格式:
Apache Thrift支持阻塞式同步IO通信模型和非阻塞式異步IO通訊模型。這裏說明一下,我在這個系列的文章中,已經詳細講述了各類IO模型的特色和工做原理(請參見我另外幾篇文章《架構設計:系統間通訊(3)——IO通訊模型和JAVA實踐 上篇》、《架構設計:系統間通訊(4)——IO通訊模型和JAVA實踐 中篇》、《架構設計:系統間通訊(5)——IO通訊模型和JAVA實踐 下篇》)。因此讀者您若是度過本人的拙做,那麼您必定清楚,要發揮Apache Thrift性能上的優點,那麼必定要在正式生產環境中採用Apache Thrift對非阻塞式異步IO通訊模型的支持。下面的代碼咱們將向您展現Apache Thrift的這種特性:
在給出示例代碼以前必定要再強調一次,Apache Thrift的服務器端和客戶端必定要採用相同的通訊模型。這就是說若是Apache Thrift的服務器端採用的是非阻塞異步通訊模型,那麼Apache Thrift客戶端也必定要採用非阻塞異步通訊模型,不然就沒法通訊。
服務器的非阻塞異步通訊代碼:
package testThrift.man; import java.nio.channels.Selector; import java.util.concurrent.Executors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.THsHaServer; import org.apache.thrift.transport.TNonblockingServerSocket; import testThrift.iface.HelloWorldService; import testThrift.iface.HelloWorldService.Iface; import testThrift.impl.HelloWorldServiceImpl; public class HelloNonServerDemo { static { BasicConfigurator.configure(); } /** * 日誌 */ private static Log LOGGER = LogFactory.getLog(HelloNonServerDemo.class); public static final int SERVER_PORT = 8090; public void startServer() { try { // log4j日誌,若是您工程裏面沒有加入log4j的支持,請待用system.out HelloNonServerDemo.LOGGER.info("HelloWorld TSimpleServer start ...."); // 服務執行控制器(告訴apache thrift,實現了HelloWorldService.Iface接口的是具體的哪個類) // HelloWorldServiceImpl類的代碼,就不在贅述了,不管採用哪一種通訊模型,它的代碼都不會變化 TProcessor tprocessor = new HelloWorldService.Processor<Iface>(new HelloWorldServiceImpl()); // 非阻塞異步通信模型(服務器端) TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(HelloNonServerDemo.SERVER_PORT); // Selector這個類,是否是很熟悉。 serverTransport.registerSelector(Selector.open()); THsHaServer.Args tArgs = new THsHaServer.Args(serverTransport); tArgs.processor(tprocessor); // 指定消息的封裝格式(採用二進制流封裝) tArgs.protocolFactory(new TBinaryProtocol.Factory()); // 指定處理器的所使用的線程池。 tArgs.executorService(Executors.newFixedThreadPool(100)); // 啓動服務 THsHaServer server = new THsHaServer(tArgs); server.serve(); } catch (Exception e) { HelloNonServerDemo.LOGGER.error(e); } } /** * @param args */ public static void main(String[] args) { HelloNonServerDemo server = new HelloNonServerDemo(); server.startServer(); } }12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
客戶端的非阻塞異步通訊代碼:
package testThrift.client; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; import org.apache.thrift.TException; import org.apache.thrift.async.AsyncMethodCallback; import org.apache.thrift.async.TAsyncClientManager; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TNonblockingSocket; import testThrift.iface.HelloWorldService; import testThrift.iface.Reponse; import testThrift.iface.HelloWorldService.AsyncClient; import testThrift.iface.HelloWorldService.AsyncClient.send_call; import testThrift.iface.Request; public class HelloNonClient { static { BasicConfigurator.configure(); } /** * 日誌 */ private static final Log LOGGER = LogFactory.getLog(HelloNonClient.class); private static Object WAITOBJECT = new Object(); public static final void main(String[] args) throws Exception { TNonblockingSocket transport = new TNonblockingSocket("127.0.0.1", 8090); TAsyncClientManager clientManager = new TAsyncClientManager(); // 準備調用參數(這個testThrift.iface.Request,是咱們經過IDL定義,而且生成的) Request request = new Request("{\"param\":\"field1\"}", "\\mySerivce\\queryService"); // 這是客戶端對非阻塞異步網絡通訊方式的支持。 // 注意使用的消息封裝格式,必定要和服務器端使用的一致 HelloWorldService.AsyncClient asyncClient = new HelloWorldService.AsyncClient.Factory(clientManager, new TBinaryProtocol.Factory()).getAsyncClient(transport); // 既然是非阻塞異步模式,因此客戶端必定是經過「事件回調」方式,接收到服務器的響應通知的 asyncClient.send(request,new AsyncMethodCallback<AsyncClient.send_call>() { /** * 當服務器正確響應了客戶端的請求後,這個事件被觸發 */ @Override public void onComplete(send_call call) { Reponse response = null; try { response = call.getResult(); } catch (TException e) { HelloNonClient.LOGGER.error(e); return; } HelloNonClient.LOGGER.info("response = " + response); } /** * 當服務器沒有正確響應了客戶端的請求,或者其中過程當中出現了不可控制的狀況。 * 那麼這個事件會被觸發 */ @Override public void onError(Exception exception) { HelloNonClient.LOGGER.info("exception = " + exception); } }); //這段代碼保證客戶端在獲得服務器回覆前,應用程序自己不會終止 synchronized (HelloNonClient.WAITOBJECT) { HelloNonClient.WAITOBJECT.wait(); } } }1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
以上代碼是能夠直接工做的。讀者能夠直接在本身的工程中執行。運行的結果和Apache Thrift上一節中Apache Thrift阻塞模式下的運行結果是一致的,只是運行過程不同。目前各類主流的RPC框架基本都支持非阻塞式異步IO網絡通訊,若是您有興趣進行這些RPC框架的性能比較,必定要在相同的IO通訊模型下進行。
在以前的文章(《架構設計:系統間通訊(10)——RPC的基本概念》),咱們已經提到影響一款RPC框架性能的主要指標。除了RPC框架實現的數據封裝格式、RPC框架支持的網絡通訊模型外,還有一個重要的指標就是它如何執行客戶端的請求。
在Apache Thrift中,它使用線程池技術運行具體的接口實現,響應客戶端請求(不管Apahce Thrift使用哪一種數據封裝格式、使用哪一種網絡通訊模型)。
org.apache.thrift.server.THsHaServer.Args.executorService(ExecutorService executorService)1
能夠看到,實際上Apache Thrift中設置線程池的方法,所要求的參數類型是java.util.concurrent.ExecutorService接口,也就是說只要實現了ExecutorService接口的類均可以被傳入。通常咱們常使用的是java.util.concurrent.ThreadPoolExecutor這個類。
在本篇文章中,咱們詳細描述了Apache Thrift中和性能息息相關的三個要素:數據封裝格式的實現、網絡IO模型的支持 和 處理客戶端請求的方式。正式有這些實現的細節,才使Apache Thrift成爲一款主流的RPC框架。那麼咱們在正式生產環境中,應該如何使用RPC框架才科學呢?在下文中,咱們將結合RPC的特色和我本身的工做經歷,向各位讀者進行介紹。