rpc之thrift入門與TBinaryProtocol源碼追蹤

thrift是一個支持多語言進行RPC的軟件庫,開發者經過定義數據類型和服務接口,經由thrift的代碼生成引擎就能夠構建RPC客戶端和服務端所須要的代碼。它的核心組件以下:java

  • Types。thrift支持的數據類型,由thrift自身定義,定義的類型是全部編程語言中都會用到的關鍵類型git

  • Transport。作數據傳輸用,抽象了對網絡的一些操做,好比 read/writegithub

  • Protocol。數據經由client和server進行編碼或者解碼的格式,好比 JSON、binaryapache

  • Versioning。client和server對於接口變更的處理方式,好比server在舊接口上新增了一個字段,可是client仍是舊的編程

  • Processors。服務端收到RPC後的處理邏輯,負責將讀到的內容交由server處理,並將結果寫回輸出流bash

Type

  1. 支持的基本類型爲網絡

    • bool 取值爲true/falsesocket

    • byte 有符號的1個字節編程語言

    • i16 16比特的有符號integer函數

    • i32 32比特的有符號integer

    • i64 64比特的有符號integer

    • double 64比特的浮點數

    • string 文本字符串

  2. 複雜類型,好比java中的自定義對象,能夠經過 struct 來組織

  3. 集合目前支持 list/set/map,能夠對應成java的ArrayList/HashSet/HashMap

  4. 異常經過 exception 來標識,相似 struct

  5. 接口定義使用 service 來標識

  6. 枚舉使用 enum 來標識

定義實例請戳這裏

Thrift接口描述語言(IDL)

Thrift IDL文件是開發者自定義邏輯內容的地方,它會被Thrift 的代碼生成器處理成目標語言的代碼

如下以java來類比

  1. namespace java paxi.maokitty.verify.service namespace 至關於java中聲明瞭當前文件的包名,java則是表示namespace的適用語言

  2. include 'myException.thrift' include至關於import,引入別的包,以便使用它的內容

  3. typedef myException.myException myException 定義別名,方便寫

  4. 1:i32 code 定義字段 code,類型是i32,序號是1

  5. string say(1:string msg)throws(1:myException e) 定義方法say,返回值是string,參數是string,拋出自定義異常 myException(這裏是別名)

Transport

  • TTransport 提供了一些網絡方法的抽象 open/close/read/write/flush ;

  • TServerTransport 則是負責接受建立鏈接 open/listen/accept/close

  • TSocket 對TTransport的集成實現,提供sokcet

Protocol

protocol提供了一種將內存數據映射到一種特定結構的機制,thrift的設計是要支持流式協議

  • 數據整個被編碼成一系列的字段,每一個字段都有一個類型和一個惟一的識別碼(序號)

  • structs的結束則會自帶一個 STOP 的標記

這種設計方式使得thrift協議可以自我界定,而不須要關心具體的編碼方式

固然若是固定格式是一種優點或者流處理不是必須的,可使用TFrameTransport來支持

Versioning

Versioning在thrift中的實現是經過字段識別碼完成的,每一個聲明的字段前面都會有一個惟一的字段識別碼

若是沒有人爲的添加,thrift自身會從-1開始遞減,自動加上

當Server讀到數據以後,會根據字段識別碼來識別是否有根據定義文件存放了這個字段,若是沒有,就會跳過沒法識別字段的長度,而不用拋出異常。若是這個字段是有的,並且必需要的,則在真的使用這個字段前,先經過isset檢查是否真的存在

新老版本可能存在的狀況分析

  1. 添加了新的字段,老的客戶端,新的server。此時老的客戶端不會發送新的字段,新的server發現字段沒有set,就按照過時請求的默認行爲處理

  2. 刪除了字段,老的客戶端,新的server。老的客戶端會發送已經刪掉的字段,新的server會直接無視

  3. 添加了新的字段,新的客戶端,老的server。新客戶端會發送新的字段,老的server則是直接無視這個字段,按現有邏輯處理

  4. 刪了字段,新的客戶端,老的server。建議不要出現這種狀況

Processor

對每一個service,都會對應的生成一個Processor,它主要是負責從網絡獲得的數據中提取出內容,而後代理請求真正的方法,再把結果寫入輸出的Prococol

thrift命令使用

thrift -out ../src/main/java --gen  java:private-members ./myException.thrift

複製代碼

-out 指定輸出的目錄

--gen java:private-members ./myException.thrift 表示目標語言是java,字段使用private

thrift代碼

根據thrift的IDL語法,寫下本身要實現的函數功能

service DemoService{string say(1:string msg)throws(1:myException e),}

複製代碼

完整實例戳這裏

以使用TBinaryProtocol爲例,指定好端口、建立Transport、發起請求的客戶端

//1:網路請求相關設置
    transport=new TSocket("127.0.0.1",9000,1000);
    //2:傳輸數據的編碼方式
    TProtocol protocol=new TBinaryProtocol(transport);
    //3:創建鏈接
    transport.open();
    //4:建立客戶端
    DemoService.Client client=new DemoService.Client(protocol);
    //5:發起請求
    String say = client.say("i am client");
複製代碼

在服務端則對應設置接收請求的端口,而後等待鏈接的到來

//1:建立等待鏈接的serverSocket
    TServerSocket serverSocket=new TServerSocket(9000);
    //2:構建server所須要的參數
    TServer.Args serverArgs=new TServer.Args(serverSocket);
    //3:邏輯處理
    TProcessor processor=new DemoService.Processor<DemoService.Iface>(new DemoServiceImpl());
    //4:解析協議
    serverArgs.protocolFactory(new TBinaryProtocol.Factory());
    serverArgs.processor(processor);
    //5:組織組件完成功能
    TServer server=new TSimpleServer(serverArgs);
    LOG.info("main server start ... ");
    //6:等待鏈接到來
    server.serve();
複製代碼

可運行的客戶端和服務端案例請戳這裏

TBinaryProtocol源碼追蹤

服務端啓動後,等待鏈接的到來

@Trace(
        index = 9,
        originClassName = "org.apache.thrift.server.TSimpleServer",
        function = "public void serve() "
)
public void serve(){
   //...
    Code.SLICE.source("client = serverTransport_.accept();")
            .interpretation("底層就是ServerSocket的accept函數,它將返回的結果封裝成TSocket返回");
    //..
    Code.SLICE.source(" processor = processorFactory_.getProcessor(client);\n" +
            " inputTransport = inputTransportFactory_.getTransport(client);\n" +
            " outputTransport = outputTransportFactory_.getTransport(client);\n" +
            " inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);\n" +
            " outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);\n" +
            " while (processor.process(inputProtocol, outputProtocol)) {}")
            .interpretation("processor即thrift根據用戶寫的代碼實現類的processor,其他四個參數則是獲得的請求中獲取的協議處理器,用來讀取數據和返回數據,拿到後交由處理器處理");
    }
複製代碼

客戶端則是在主動發起請求的時候,按照TBinaryProtocol的協議格式寫入數據

Code.SLICE.source("oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));")
        .interpretation("opprot即初始化 DemoService.client 時傳入的 TBinaryProtocol ,seqid默認值爲0")
        .interpretation("Begin部分的寫入首先是按照字節數寫入版本、而後是方法名的長度,再是方法名,最後寫入序列號,按照特定的規則寫入數據");
Code.SLICE.source("args.write(oprot_);")
        .interpretation("負責將參數寫入Buffer,它會按照參數的順序寫入,每一個參數又是按照類型、序號、值的順序寫入");
//..
Code.SLICE.source("oprot_.getTransport().flush();")
        .interpretation("數據已經寫入了緩衝區,把沒有寫完的數據寫入對應的文件描述符");

複製代碼

當收到請求後服務端則交由服務端的實現具體的處理邏輯而後再回寫內容

Code.SLICE.source("T args = getEmptyArgsInstance();")
     .interpretation("拿到參數的類型,這裏就是 say_args");
//..
Code.SLICE.source("args.read(iprot);")
        .interpretation("從say_args的scheme(say_argsStandardScheme)中讀取參數");
//..
Code.SLICE.source("TBase result = getResult(iface, args);")
        .interpretation("調用實現類,去執行用戶本身寫的邏輯,並獲得對應的結果");
//...
Code.SLICE.source("oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.REPLY, seqid));" +
        " result.write(oprot);\n" +
        " oprot.writeMessageEnd();\n" +
        " oprot.getTransport().flush();")
        .interpretation("開始往返回Stream中寫入數據,代表這是對那個方法的返回值,而後寫入返回的結果,最後輸入socket");

複製代碼

源碼追蹤詳細過程請戳這裏

TBinaryProtocol源碼總結

client會按照字節的寫入規則嚴格的寫入和讀取。底層通訊實際上就是socket,服務端接收到請求後,交由對應用戶的實現接口來調用實現類,再將結果寫入輸出流, 客戶端等結果返回後再按照規則讀取結果,完成1次rpc的調用

附錄

Thrift: Scalable Cross-Language Services Implementation

thrift的類型

IDL語言語法

相關文章
相關標籤/搜索