Thrift客戶端有兩種:同步客戶端和異步客戶端。html
同步客戶端比較簡單,以RPC-Thrift(一)中的的例子爲基礎進行研究源碼,先看一下類圖。apache
TServiceClient:用於以同步方式與TService進行通訊;數組
Iface接口和Client類都是經過Thrift文件自動生成的代碼。緩存
TServiceClient定義了基礎的向Server發送請求和從Server接收響應的方法。安全
public abstract class TServiceClient { public TServiceClient(TProtocol prot) { this(prot, prot); } public TServiceClient(TProtocol iprot, TProtocol oprot) { iprot_ = iprot; oprot_ = oprot; } protected TProtocol iprot_;//輸入TProtocol protected TProtocol oprot_;//輸出TProtocol protected int seqid_;//序列號 public TProtocol getInputProtocol() { return this.iprot_; } public TProtocol getOutputProtocol() { return this.oprot_; } //向Server發送請求 protected void sendBase(String methodName, TBase args) throws TException { //寫消息頭,seqid_只是簡單的++,非線程安全,接收響應時要進行seqid_的校驗 oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_)); args.write(oprot_);//寫參數 oprot_.writeMessageEnd(); oprot_.getTransport().flush();//發送 } //從Server接收響應 protected void receiveBase(TBase result, String methodName) throws TException { TMessage msg = iprot_.readMessageBegin();//讀消息頭,若沒有數據一直等待,詳見TTransport的實現 if (msg.type == TMessageType.EXCEPTION) { //異常消息經過TApplicationException讀取 TApplicationException x = TApplicationException.read(iprot_); iprot_.readMessageEnd(); throw x; } if (msg.seqid != seqid_) { //序列號不一致報異常 throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response"); } result.read(iprot_);//讀數據,由其result子類實現 iprot_.readMessageEnd(); } }
public interface Iface { //thrift中定義的方法 public ResultCommon sayHello(String paramJson) throws org.apache.thrift.TException; }
public static class Client extends org.apache.thrift.TServiceClient implements Iface { //Client工廠類 public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> { public Factory() {} public Client getClient(org.apache.thrift.protocol.TProtocol prot) { return new Client(prot); } public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) { return new Client(iprot, oprot); } } public Client(org.apache.thrift.protocol.TProtocol prot) { super(prot, prot); } public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) { super(iprot, oprot); } //sayHello方法調用入口 public ResultCommon sayHello(String paramJson) throws org.apache.thrift.TException { send_sayHello(paramJson);//發送請求 return recv_sayHello();//接收響應 } //發送請求 public void send_sayHello(String paramJson) throws org.apache.thrift.TException { sayHello_args args = new sayHello_args();//組裝參數 args.setParamJson(paramJson); sendBase("sayHello", args);//調用父類的sendBase方法發送請求 } //接收響應 public ResultCommon recv_sayHello() throws org.apache.thrift.TException { sayHello_result result = new sayHello_result(); receiveBase(result, "sayHello");//調用父類的receiveBase方法發送請求 if (result.isSetSuccess()) { return result.success; } throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "sayHello failed: unknown result"); } }
異步客戶端實現比較複雜,經過回調實現,先看一個異步客戶端的例子。異步客戶端須要使用TNonblockingSocket,經過AsyncMethodCallback接收服務端的回調。併發
1 String paramJson = "{\"wewe\":\"111\"}"; 2 TNonblockingSocket tNonblockingSocket = new TNonblockingSocket("127.0.0.1", 8090);//使用非阻塞TNonblockingSocket 3 TAsyncClientManager tAsyncClientManager = new TAsyncClientManager(); 4 HelloService.AsyncClient asyncClient = new HelloService.AsyncClient.Factory(tAsyncClientManager, new TBinaryProtocol.Factory()).getAsyncClient(tNonblockingSocket); 5 asyncClient.sayHello(paramJson, new AsyncMethodCallback<HelloService.AsyncClient.sayHello_call>() { 6 @Override 7 public void onError(Exception exception) { 8 //... 9 } 10 @Override 11 public void onComplete(sayHello_call response) { 12 ResultCommon resultCommon = response.getResult(); 13 System.out.println(resultCommon.getDesc()); 14 } 15 });
涉及到的類結構圖以下:異步
TAsyncClient:異步客戶端抽象類,經過Thrift文件生成的AsyncClient需繼承該類;async
TAsyncClientManager:異步客戶端管理類,包含一個selector線程,用於轉換方法調用對象;ide
TAsyncMethodCall:封裝了異步方法調用,Thrift文件定義的全部方法都會在AsyncClient中生成對應的繼承於TAsyncMethodCall的內部類(如sayHello_call);源碼分析
AsyncMethodCallback:接收服務端回調的接口,用戶須要定義實現該接口的類。
TAsyncClient爲異步客戶端提供了公共的屬性和方法。
public abstract class TAsyncClient { protected final TProtocolFactory ___protocolFactory; protected final TNonblockingTransport ___transport; protected final TAsyncClientManager ___manager;//異步客戶端管理類 protected TAsyncMethodCall ___currentMethod;//異步方法調用 private Exception ___error; private long ___timeout; public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport) { this(protocolFactory, manager, transport, 0); } public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport, long timeout) { this.___protocolFactory = protocolFactory; this.___manager = manager; this.___transport = transport; this.___timeout = timeout; } public TProtocolFactory getProtocolFactory() { return ___protocolFactory; } public long getTimeout() { return ___timeout; } public boolean hasTimeout() { return ___timeout > 0; } public void setTimeout(long timeout) { this.___timeout = timeout; } //客戶端是否處於異常狀態 public boolean hasError() { return ___error != null; } public Exception getError() { return ___error; } //檢查是否準備就緒,若是當前Cilent正在執行一個方法或處於error狀態則報異常 protected void checkReady() { if (___currentMethod != null) { throw new IllegalStateException("Client is currently executing another method: " + ___currentMethod.getClass().getName()); } if (___error != null) { throw new IllegalStateException("Client has an error!", ___error); } } //執行完成時delegate方法會調用該方法,將___currentMethod置爲null protected void onComplete() { ___currentMethod = null; } //執行出現異常時delegate方法會調用該方法, protected void onError(Exception exception) { ___transport.close();//關閉鏈接 ___currentMethod = null;//將___currentMethod置爲null ___error = exception;//異常信息 } }
AsyncClient類是經過Thrift文件自動生成的,在該類中含有每一個方法的調用入口,而且爲每一個方法生成了一個方法調用類方法名_call,如sayHello_call。sayHello_call實現了父類TAsyncMethodCall的連個抽象方法:write_args和getResult,由於每一個方法的參數和返回值不一樣,因此這兩個方法須要具體子類實現。
public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface { //AsyncClient工廠類 public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> { private org.apache.thrift.async.TAsyncClientManager clientManager; private org.apache.thrift.protocol.TProtocolFactory protocolFactory; public Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) { this.clientManager = clientManager; this.protocolFactory = protocolFactory; } public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) { return new AsyncClient(protocolFactory, clientManager, transport); } } public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) { super(protocolFactory, clientManager, transport); } //sayHello方法調用入口 public void sayHello(String paramJson, org.apache.thrift.async.AsyncMethodCallback<sayHello_call> resultHandler) throws org.apache.thrift.TException { checkReady();//檢查當前Client是否可用 //建立方法調用實例 sayHello_call method_call = new sayHello_call(paramJson, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; //調用TAsyncClientManager的call方法 ___manager.call(method_call); } public static class sayHello_call extends org.apache.thrift.async.TAsyncMethodCall { private String paramJson; public sayHello_call(String paramJson, org.apache.thrift.async.AsyncMethodCallback<sayHello_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { super(client, protocolFactory, transport, resultHandler, false); this.paramJson = paramJson; } //發送請求 public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("sayHello", org.apache.thrift.protocol.TMessageType.CALL, 0)); sayHello_args args = new sayHello_args(); args.setParamJson(paramJson); args.write(prot); prot.writeMessageEnd(); } //獲取返回結果 public ResultCommon getResult() throws org.apache.thrift.TException { if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { throw new IllegalStateException("Method call not finished!"); } org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array()); org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); return (new Client(prot)).recv_sayHello(); } } }
TAsyncClientManager是異步客戶端管理類,它爲維護了一個待處理的方法調用隊列pendingCalls,並經過SelectThread線程監聽selector事件,當有就緒事件時進行方法調用的處理。
public class TAsyncClientManager { private static final Logger LOGGER = LoggerFactory.getLogger(TAsyncClientManager.class.getName()); private final SelectThread selectThread; //TAsyncMethodCall待處理隊列 private final ConcurrentLinkedQueue<TAsyncMethodCall> pendingCalls = new ConcurrentLinkedQueue<TAsyncMethodCall>(); //初始化TAsyncClientManager,新建selectThread線程並啓動 public TAsyncClientManager() throws IOException { this.selectThread = new SelectThread(); selectThread.start(); } //方法調用 public void call(TAsyncMethodCall method) throws TException { if (!isRunning()) { throw new TException("SelectThread is not running"); } method.prepareMethodCall();//作方法調用前的準備 pendingCalls.add(method);//加入待處理隊列 selectThread.getSelector().wakeup();//喚醒selector,很重要,由於首次執行方法調用時select Thread還阻塞在selector.select()上 } public void stop() { selectThread.finish(); } public boolean isRunning() { return selectThread.isAlive(); } //SelectThread線程類,處理方法調用的核心 private class SelectThread extends Thread { private final Selector selector; private volatile boolean running; private final TreeSet<TAsyncMethodCall> timeoutWatchSet = new TreeSet<TAsyncMethodCall>(new TAsyncMethodCallTimeoutComparator()); public SelectThread() throws IOException { this.selector = SelectorProvider.provider().openSelector(); this.running = true; this.setName("TAsyncClientManager#SelectorThread " + this.getId()); setDaemon(true);//非守護線程 } public Selector getSelector() { return selector; } public void finish() { running = false; selector.wakeup(); } public void run() { while (running) { try { try { if (timeoutWatchSet.size() == 0) { //若是超時TAsyncMethodCall監控集合爲空,直接無限期阻塞監聽select()事件。TAsyncClientManager剛初始化時是空的 selector.select(); } else { //若是超時TAsyncMethodCall監控集合不爲空,則計算Set中第一個元素的超時時間戳是否到期 long nextTimeout = timeoutWatchSet.first().getTimeoutTimestamp(); long selectTime = nextTimeout - System.currentTimeMillis(); if (selectTime > 0) { //尚未到期,超時監聽select()事件,超過selectTime自動喚醒selector selector.select(selectTime); } else { //已經到期,馬上監聽select()事件,不會阻塞selector selector.selectNow(); } } } catch (IOException e) { LOGGER.error("Caught IOException in TAsyncClientManager!", e); } //監聽到就緒事件或者selector被喚醒會執行到此處 transitionMethods();//處理就緒keys timeoutMethods();//超時方法調用處理 startPendingMethods();//處理pending的方法調用 } catch (Exception exception) { LOGGER.error("Ignoring uncaught exception in SelectThread", exception); } } } //監聽到就緒事件或者selector被喚醒,若是有就緒的SelectionKey就調用methodCall.transition(key); private void transitionMethods() { try { Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while (keys.hasNext()) { SelectionKey key = keys.next(); keys.remove(); if (!key.isValid()) { //跳過無效key,方法調用出現異常或key被取消等會致使無效key continue; } TAsyncMethodCall methodCall = (TAsyncMethodCall)key.attachment(); //調用methodCall的transition方法,執行相關的動做並將methodCall的狀態轉換爲下一個狀態 methodCall.transition(key); //若是完成或發生錯誤,從timeoutWatchSet刪除該methodCall if (methodCall.isFinished() || methodCall.getClient().hasError()) { timeoutWatchSet.remove(methodCall); } } } catch (ClosedSelectorException e) { LOGGER.error("Caught ClosedSelectorException in TAsyncClientManager!", e); } } //超時方法調用處理 private void timeoutMethods() { Iterator<TAsyncMethodCall> iterator = timeoutWatchSet.iterator(); long currentTime = System.currentTimeMillis(); while (iterator.hasNext()) { TAsyncMethodCall methodCall = iterator.next(); if (currentTime >= methodCall.getTimeoutTimestamp()) { //若是超時,從timeoutWatchSet中刪除並調用onError()方法 iterator.remove(); methodCall.onError(new TimeoutException("Operation " + methodCall.getClass() + " timed out after " + (currentTime - methodCall.getStartTime()) + " ms.")); } else { //若是沒有超時,說明以後的TAsyncMethodCall也不會超時,跳出循環,由於越早進入timeoutWatchSet的TAsyncMethodCall越先超時。 break; } } } //開始等待的方法調用,循環處理pendingCalls中的methodCall private void startPendingMethods() { TAsyncMethodCall methodCall; while ((methodCall = pendingCalls.poll()) != null) { // Catch registration errors. method will catch transition errors and cleanup. try { //向selector註冊並設置初次狀態 methodCall.start(selector); //若是客戶端指定了超時時間且transition成功,將methodCall加入到timeoutWatchSet TAsyncClient client = methodCall.getClient(); if (client.hasTimeout() && !client.hasError()) { timeoutWatchSet.add(methodCall); } } catch (Exception exception) { //異常處理 LOGGER.warn("Caught exception in TAsyncClientManager!", exception); methodCall.onError(exception); } } } } //TreeSet用的比較器,判斷是不是同一個TAsyncMethodCall實例 private static class TAsyncMethodCallTimeoutComparator implements Comparator<TAsyncMethodCall> { public int compare(TAsyncMethodCall left, TAsyncMethodCall right) { if (left.getTimeoutTimestamp() == right.getTimeoutTimestamp()) { return (int)(left.getSequenceId() - right.getSequenceId()); } else { return (int)(left.getTimeoutTimestamp() - right.getTimeoutTimestamp()); } } } }
TAsyncMethodCall實現了對方法調用的封裝。一次方法調用過程就是一個TAsyncMethodCall實例的生命週期。TAsyncMethodCall實例在整個生命週期內有如下狀態,正常狀況下的狀態狀態過程爲:CONNECTING -> WRITING_REQUEST_SIZE -> WRITING_REQUEST_BODY -> READING_RESPONSE_SIZE -> READING_RESPONSE_BODY -> RESPONSE_READ,若是任何一個過程當中發生了異常則直接轉換爲ERROR狀態。
public static enum State { CONNECTING,//鏈接狀態 WRITING_REQUEST_SIZE,//寫請求size WRITING_REQUEST_BODY,//寫請求體 READING_RESPONSE_SIZE,//讀響應size READING_RESPONSE_BODY,//讀響應體 RESPONSE_READ,//讀響應完成 ERROR;//異常狀態 }
TAsyncMethodCall的源碼分析以下:
public abstract class TAsyncMethodCall<T> { private static final int INITIAL_MEMORY_BUFFER_SIZE = 128; private static AtomicLong sequenceIdCounter = new AtomicLong(0);//序列號計數器private State state = null;//狀態在start()方法中初始化 protected final TNonblockingTransport transport; private final TProtocolFactory protocolFactory; protected final TAsyncClient client; private final AsyncMethodCallback<T> callback;//回調實例 private final boolean isOneway; private long sequenceId;//序列號 private ByteBuffer sizeBuffer;//Java NIO概念,frameSize buffer private final byte[] sizeBufferArray = new byte[4];//4字節的消息Size字節數組 private ByteBuffer frameBuffer;//Java NIO概念,frame buffer private long startTime = System.currentTimeMillis(); protected TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback<T> callback, boolean isOneway) { this.transport = transport; this.callback = callback; this.protocolFactory = protocolFactory; this.client = client; this.isOneway = isOneway; this.sequenceId = TAsyncMethodCall.sequenceIdCounter.getAndIncrement(); } protected State getState() { return state; } protected boolean isFinished() { return state == State.RESPONSE_READ; } protected long getStartTime() { return startTime; } protected long getSequenceId() { return sequenceId; } public TAsyncClient getClient() { return client; } public boolean hasTimeout() { return client.hasTimeout(); } public long getTimeoutTimestamp() { return client.getTimeout() + startTime; } //將請求寫入protocol,由子類實現 protected abstract void write_args(TProtocol protocol) throws TException; //方法調用前的準備處理,初始化frameBuffer和sizeBuffer protected void prepareMethodCall() throws TException { //TMemoryBuffer內存緩存傳輸類,繼承了TTransport TMemoryBuffer memoryBuffer = new TMemoryBuffer(INITIAL_MEMORY_BUFFER_SIZE); TProtocol protocol = protocolFactory.getProtocol(memoryBuffer); write_args(protocol);//將請求寫入protocol int length = memoryBuffer.length(); frameBuffer = ByteBuffer.wrap(memoryBuffer.getArray(), 0, length); TFramedTransport.encodeFrameSize(length, sizeBufferArray); sizeBuffer = ByteBuffer.wrap(sizeBufferArray); } //向selector註冊並設置開始狀態,多是鏈接狀態或寫狀態 void start(Selector sel) throws IOException { SelectionKey key; if (transport.isOpen()) { state = State.WRITING_REQUEST_SIZE; key = transport.registerSelector(sel, SelectionKey.OP_WRITE); } else { state = State.CONNECTING; key = transport.registerSelector(sel, SelectionKey.OP_CONNECT); //若是是非阻塞鏈接初始化會當即成功,轉換爲寫狀態並修改感興趣事件 if (transport.startConnect()) { registerForFirstWrite(key); } } key.attach(this);//將本methodCall附加在key上 } void registerForFirstWrite(SelectionKey key) throws IOException { state = State.WRITING_REQUEST_SIZE; key.interestOps(SelectionKey.OP_WRITE); } protected ByteBuffer getFrameBuffer() { return frameBuffer; } //轉換爲下一個狀態,根據不一樣的狀態作不一樣的處理。該方法只會在selector thread中被調用,不用擔憂併發 protected void transition(SelectionKey key) { // 確保key是有效的 if (!key.isValid()) { key.cancel(); Exception e = new TTransportException("Selection key not valid!"); onError(e); return; } try { switch (state) { case CONNECTING: doConnecting(key);//建鏈接 break; case WRITING_REQUEST_SIZE: doWritingRequestSize();//寫請求size break; case WRITING_REQUEST_BODY: doWritingRequestBody(key);//寫請求體 break; case READING_RESPONSE_SIZE: doReadingResponseSize();//讀響應size break; case READING_RESPONSE_BODY: doReadingResponseBody(key);//讀響應體 break; default: // RESPONSE_READ, ERROR, or bug throw new IllegalStateException("Method call in state " + state + " but selector called transition method. Seems like a bug..."); } } catch (Exception e) { key.cancel(); key.attach(null); onError(e); } } //出現異常時的處理 protected void onError(Exception e) { client.onError(e);//置Client異常信息 callback.onError(e);//回調異常方法 state = State.ERROR;//置當前對象爲ERROR狀態 } //讀響應消息體 private void doReadingResponseBody(SelectionKey key) throws IOException { if (transport.read(frameBuffer) < 0) { throw new IOException("Read call frame failed"); } if (frameBuffer.remaining() == 0) { cleanUpAndFireCallback(key); } } //方法調用完成的處理 private void cleanUpAndFireCallback(SelectionKey key) { state = State.RESPONSE_READ;//狀態轉換爲讀取response完成 key.interestOps(0);//清空感興趣事件 key.attach(null);//清理key的附加信息 client.onComplete();//將client的___currentMethod置爲null callback.onComplete((T)this);//回調onComplete方法 } //讀響應size,一樣可能須要多屢次直到把sizeBuffer讀滿 private void doReadingResponseSize() throws IOException { if (transport.read(sizeBuffer) < 0) { throw new IOException("Read call frame size failed"); } if (sizeBuffer.remaining() == 0) { state = State.READING_RESPONSE_BODY; //讀取FrameSize完成,爲frameBuffer分配FrameSize大小的空間用於讀取響應體 frameBuffer = ByteBuffer.allocate(TFramedTransport.decodeFrameSize(sizeBufferArray)); } } //寫請求體 private void doWritingRequestBody(SelectionKey key) throws IOException { if (transport.write(frameBuffer) < 0) { throw new IOException("Write call frame failed"); } if (frameBuffer.remaining() == 0) { if (isOneway) { //若是是單向RPC,此時方法調用已經結束,清理key並進行回調 cleanUpAndFireCallback(key); } else { //非單向RPC,狀態轉換爲READING_RESPONSE_SIZE state = State.READING_RESPONSE_SIZE; //重置sizeBuffer,準備讀取frame size sizeBuffer.rewind(); key.interestOps(SelectionKey.OP_READ);//修改感興趣事件 } } } //寫請求size到transport,可能會寫屢次直到sizeBuffer.remaining() == 0才轉換狀態 private void doWritingRequestSize() throws IOException { if (transport.write(sizeBuffer) < 0) { throw new IOException("Write call frame size failed"); } if (sizeBuffer.remaining() == 0) { state = State.WRITING_REQUEST_BODY; } } //創建鏈接 private void doConnecting(SelectionKey key) throws IOException { if (!key.isConnectable() || !transport.finishConnect()) { throw new IOException("not connectable or finishConnect returned false after we got an OP_CONNECT"); } registerForFirstWrite(key); } }
最後總結一下異步客戶端的處理流程,以下圖所示。
須要注意的是,一個AsyncClient實例只能同時處理一個方法調用,必須等待前一個方法調用完成後才能使用該AsyncClient實例調用其餘方法,疑問:和同步客戶端相比有什麼優點?不用等返回結果,能夠幹其餘的活?又能幹什麼活呢?若是客戶端使用了鏈接池(也是AsyncClient實例池,一個AsyncClient實例對應一個鏈接),該線程不用等待前一個鏈接進行方法調用的返回結果,就能夠去線程池獲取一個可用的鏈接,使用新的鏈接進行方法調用,而原來的鏈接在收到返回結果後,狀態變爲可用,返回給鏈接池。這樣相對於同步客戶端單個線程串行發送請求的狀況,異步客戶端單個線程進行發送請求的效率會大大提升,須要的線程數變小,可是可能須要的鏈接數會增大,單個請求的響應時間會變長。在線程數是性能瓶頸,或對請求的響應時間要求不高的狀況下,使用異步客戶端比較合適。