Apache Thrift系列詳解(二) - 網絡服務模型

前言

Thrift提供的網絡服務模型單線程多線程事件驅動,從另外一個角度劃分爲:阻塞服務模型非阻塞服務模型java

  • 阻塞服務模型:TSimpleServerTThreadPoolServer算法

  • 非阻塞服務模型:TNonblockingServerTHsHaServerTThreadedSelectorServer數據庫

TServer類的層次關係:apache


正文

TServer

TServer定義了靜態內部類ArgsArgs繼承自抽象類AbstractServerArgsAbstractServerArgs採用了建造者模式,向TServer提供各類工廠:編程

工廠屬性 工廠類型 做用
ProcessorFactory TProcessorFactory 處理層工廠類,用於具體的TProcessor對象的建立
InputTransportFactory TTransportFactory 傳輸層輸入工廠類,用於具體的TTransport對象的建立
OutputTransportFactory TTransportFactory 傳輸層輸出工廠類,用於具體的TTransport對象的建立
InputProtocolFactory TProtocolFactory 協議層輸入工廠類,用於具體的TProtocol對象的建立
OutputProtocolFactory TProtocolFactory 協議層輸出工廠類,用於具體的TProtocol對象的建立

下面是TServer的部分核心代碼:後端

public abstract class TServer {
    public static class Args extends org.apache.thrift.server.TServer.AbstractServerArgs<org.apache.thrift.server.TServer.Args> {
        public Args(TServerTransport transport) {
            super(transport);
        }
    }

    public static abstract class AbstractServerArgs<T extends org.apache.thrift.server.TServer.AbstractServerArgs<T>> {
        final TServerTransport serverTransport;
        TProcessorFactory processorFactory;
        TTransportFactory inputTransportFactory = new TTransportFactory();
        TTransportFactory outputTransportFactory = new TTransportFactory();
        TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
        TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();

        public AbstractServerArgs(TServerTransport transport) {
            serverTransport = transport;
        }
    }

    protected TProcessorFactory processorFactory_;
    protected TServerTransport serverTransport_;
    protected TTransportFactory inputTransportFactory_;
    protected TTransportFactory outputTransportFactory_;
    protected TProtocolFactory inputProtocolFactory_;
    protected TProtocolFactory outputProtocolFactory_;
    private boolean isServing;

    protected TServer(org.apache.thrift.server.TServer.AbstractServerArgs args) {
        processorFactory_ = args.processorFactory;
        serverTransport_ = args.serverTransport;
        inputTransportFactory_ = args.inputTransportFactory;
        outputTransportFactory_ = args.outputTransportFactory;
        inputProtocolFactory_ = args.inputProtocolFactory;
        outputProtocolFactory_ = args.outputProtocolFactory;
    }

    public abstract void serve();
    public void stop() {}

    public boolean isServing() {
        return isServing;
    }

    protected void setServing(boolean serving) {
        isServing = serving;
    }
}
複製代碼

TServer的三個方法:serve()stop()isServing()serve()用於啓動服務,stop()用於關閉服務,isServing()用於檢測服務的起停狀態。緩存

TServer不一樣實現類的啓動方式不同,所以serve()定義爲抽象方法。不是全部的服務都須要優雅的退出, 所以stop()方法沒有被定義爲抽象。服務器


TSimpleServer

TSimpleServer工做模式採用最簡單的阻塞IO,實現方法簡潔明瞭,便於理解,可是一次只能接收和處理一個socket鏈接,效率比較低。它主要用於演示Thrift的工做過程,在實際開發過程當中不多用到它。網絡

(一) 工做流程

(二) 使用入門

服務端:多線程

ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
    TServerSocket serverTransport = new TServerSocket(serverSocket);
    HelloWorldService.Processor processor =
            new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();

    TSimpleServer.Args tArgs = new TSimpleServer.Args(serverTransport);
    tArgs.processor(processor);
    tArgs.protocolFactory(protocolFactory);
    // 簡單的單線程服務模型 通常用於測試
    TServer tServer = new TSimpleServer(tArgs);
    System.out.println("Running Simple Server");
    tServer.serve();
複製代碼

客戶端:

TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
    TProtocol protocol = new TBinaryProtocol(transport);
    HelloWorldService.Client client = new HelloWorldService.Client(protocol);
    transport.open();

    String result = client.say("Leo");
    System.out.println("Result =: " + result);
    transport.close();
複製代碼

(三) 源碼分析

查看上述流程的源代碼,即TSimpleServer.java中的serve()方法以下:

serve()方法的操做:

  1. 設置TServerSocketlisten()方法啓動鏈接監聽
  2. 阻塞的方式接受客戶端地鏈接請求,每進入一個鏈接即爲其建立一個通道TTransport對象。
  3. 爲客戶端建立處理器對象輸入傳輸通道對象輸出傳輸通道對象輸入協議對象輸出協議對象
  4. 經過TServerEventHandler對象處理具體的業務請求。

ThreadPoolServer

TThreadPoolServer模式採用阻塞socket方式工做,主線程負責阻塞式監聽是否有新socket到來,具體的業務處理交由一個線程池來處理。

(一) 工做流程

(二) 使用入門

服務端:

ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
    TServerSocket serverTransport = new TServerSocket(serverSocket);
    HelloWorldService.Processor<HelloWorldService.Iface> processor =
            new HelloWorldService.Processor<>(new HelloWorldServiceImpl());

    TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
    TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);
    ttpsArgs.processor(processor);
    ttpsArgs.protocolFactory(protocolFactory);

    // 線程池服務模型 使用標準的阻塞式IO 預先建立一組線程處理請求
    TServer ttpsServer = new TThreadPoolServer(ttpsArgs);
    System.out.println("Running ThreadPool Server");
    ttpsServer.serve();
複製代碼

客戶端:

TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
    TProtocol protocol = new TBinaryProtocol(transport);
    HelloWorldService.Client client = new HelloWorldService.Client(protocol);

    transport.open();
    String result = client.say("ThreadPoolClient");
    System.out.println("Result =: " + result);
    transport.close();
複製代碼

(三) 源碼分析

ThreadPoolServer解決了TSimpleServer不支持併發多鏈接的問題,引入了線程池。實現的模型是One Thread Per Connection。查看上述流程的源代碼,先查看線程池的代碼片斷:

TThreadPoolServer.java中的serve()方法以下:

serve()方法的操做:

  1. 設置TServerSocketlisten()方法啓動鏈接監聽
  2. 阻塞的方式接受客戶端鏈接請求,每進入一個鏈接,將通道對象封裝成一個WorkerProcess對象(WorkerProcess實現了Runnabel接口),並提交到線程池
  3. WorkerProcessrun()方法負責業務處理,爲客戶端建立了處理器對象輸入傳輸通道對象輸出傳輸通道對象輸入協議對象輸出協議對象
  4. 經過TServerEventHandler對象處理具體的業務請求。

WorkerProcessrun()方法:

(四) 優缺點

TThreadPoolServer模式的優勢

拆分了監聽線程(Accept Thread)和處理客戶端鏈接工做線程(Worker Thread),數據讀取業務處理都交給線程池處理。所以在併發量較大時新鏈接也可以被及時接受。

線程池模式比較適合服務器端能預知最多有多少個客戶端併發的狀況,這時每一個請求都能被業務線程池及時處理,性能也很是高。

TThreadPoolServer模式的缺點

線程池模式的處理能力受限於線程池的工做能力,當併發請求數大於線程池中的線程數時,新請求也只能排隊等待


TNonblockingServer

TNonblockingServer模式也是單線程工做,可是採用NIO的模式,藉助Channel/Selector機制, 採用IO事件模型來處理。

全部的socket都被註冊到selector中,在一個線程中經過seletor循環監控全部的socket

每次selector循環結束時,處理全部的處於就緒狀態socket,對於有數據到來的socket進行數據讀取操做,對於有數據發送的socket則進行數據發送操做,對於監聽socket則產生一個新業務socket並將其註冊selector上。

注意:TNonblockingServer要求底層的傳輸通道必須使用TFramedTransport。

(一) 工做流程

(二) 使用入門

服務端:

TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);

    TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
    tnbArgs.processor(tprocessor);
    tnbArgs.transportFactory(new TFramedTransport.Factory());
    tnbArgs.protocolFactory(new TCompactProtocol.Factory());

    // 使用非阻塞式IO服務端和客戶端須要指定TFramedTransport數據傳輸的方式
    TServer server = new TNonblockingServer(tnbArgs);
    System.out.println("Running Non-blocking Server");
    server.serve();
複製代碼

客戶端:

TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
    // 協議要和服務端一致
    TProtocol protocol = new TCompactProtocol(transport);
    HelloWorldService.Client client = new HelloWorldService.Client(protocol);
    transport.open();

    String result = client.say("NonBlockingClient");
    System.out.println("Result =: " + result);
    transport.close();
複製代碼

(三) 源碼分析

TNonblockingServer繼承於AbstractNonblockingServer,這裏咱們更關心基於NIOselector部分的關鍵代碼。

(四) 優缺點

TNonblockingServer模式優勢

相比於TSimpleServer效率提高主要體如今IO多路複用上TNonblockingServer採用非阻塞IO,對accept/read/writeIO事件進行監控處理,同時監控多個socket的狀態變化。

TNonblockingServer模式缺點

TNonblockingServer模式在業務處理上仍是採用單線程順序來完成。在業務處理比較複雜耗時的時候,例如某些接口函數須要讀取數據庫執行時間較長,會致使整個服務阻塞住,此時該模式效率也不高,由於多個調用請求任務依然是順序一個接一個執行。

THsHaServer

鑑於TNonblockingServer的缺點,THsHaServer繼承於TNonblockingServer,引入了線程池提升了任務處理的併發能力THsHaServer半同步半異步(Half-Sync/Half-Async)的處理模式,Half-Aysnc用於IO事件處理(Accept/Read/Write),Half-Sync用於業務handlerrpc同步處理上。

注意:THsHaServer和TNonblockingServer同樣,要求底層的傳輸通道必須使用TFramedTransport。

(一) 工做流程

(二) 使用入門

服務端:

TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
    TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    // 半同步半異步
    THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);
    thhsArgs.processor(tprocessor);
    thhsArgs.transportFactory(new TFramedTransport.Factory());
    thhsArgs.protocolFactory(new TBinaryProtocol.Factory());

    TServer server = new THsHaServer(thhsArgs);
    System.out.println("Running HsHa Server");
    server.serve();
複製代碼

客戶端:

TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
    // 協議要和服務端一致
    TProtocol protocol = new TBinaryProtocol(transport);
    HelloWorldService.Client client = new HelloWorldService.Client(protocol);
    transport.open();

    String result = client.say("HsHaClient");
    System.out.println("Result =: " + result);
    transport.close();
複製代碼

(三) 源碼分析

THsHaServer繼承於TNonblockingServer,新增了線程池併發處理工做任務的功能,查看線程池的相關代碼:

任務線程池的建立過程:

下文的TThreadedSelectorServer囊括了THsHaServer的大部分特性,源碼分析可參考TThreadedSelectorServer。

(四) 優缺點

THsHaServer的優勢

THsHaServerTNonblockingServer模式相比,THsHaServer在完成數據讀取以後,將業務處理過程交由一個線程池來完成,主線程直接返回進行下一次循環操做,效率大大提高。

THsHaServer的缺點

主線程仍然須要完成全部socket監聽接收數據讀取數據寫入操做。當併發請求數較大時,且發送數據量較多時,監聽socket新鏈接請求不能被及時接受。


TThreadedSelectorServer

TThreadedSelectorServer是對THsHaServer的一種擴充,它將selector中的讀寫IO事件(read/write)從主線程中分離出來。同時引入worker工做線程池,它也是種Half-Sync/Half-Async的服務模型。

TThreadedSelectorServer模式是目前Thrift提供的最高級的線程服務模型,它內部有若是幾個部分構成:

  1. 一個AcceptThread線程對象,專門用於處理監聽socket上的新鏈接。
  2. 若干個SelectorThread對象專門用於處理業務socket網絡I/O讀寫操做,全部網絡數據的讀寫均是有這些線程來完成。
  3. 一個負載均衡器SelectorThreadLoadBalancer對象,主要用於AcceptThread線程接收到一個新socket鏈接請求時,決定將這個新鏈接請求分配給哪一個SelectorThread線程
  4. 一個ExecutorService類型的工做線程池,在SelectorThread線程中,監聽到有業務socket中有調用請求過來,則將請求數據讀取以後,交給ExecutorService線程池中的線程完成這次調用的具體執行。主要用於處理每一個rpc請求的handler回調處理(這部分是同步的)。

(一) 工做流程

(二) 使用入門

服務端:

TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
    TProcessor processor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    // 多線程半同步半異步
    TThreadedSelectorServer.Args ttssArgs = new TThreadedSelectorServer.Args(serverSocket);
    ttssArgs.processor(processor);
    ttssArgs.protocolFactory(new TBinaryProtocol.Factory());
    // 使用非阻塞式IO時 服務端和客戶端都須要指定數據傳輸方式爲TFramedTransport
    ttssArgs.transportFactory(new TFramedTransport.Factory());

    // 多線程半同步半異步的服務模型
    TThreadedSelectorServer server = new TThreadedSelectorServer(ttssArgs);
    System.out.println("Running ThreadedSelector Server");
    server.serve();
複製代碼

客戶端:

for (int i = 0; i < 10; i++) {
    new Thread("Thread " + i) {
        @Override
        public void run() {
            // 設置傳輸通道 對於非阻塞服務 須要使用TFramedTransport(用於將數據分塊發送)
            for (int j = 0; j < 10; j++) {
                TTransport transport = null;
                try {
                    transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
                    TProtocol protocol = new TBinaryProtocol(transport);
                    HelloWorldService.Client client = new HelloWorldService.Client(protocol);
                    transport.open();
                    String result = client.say("ThreadedSelector Client");
                    System.out.println("Result =: " + result);
                    transport.close();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // 關閉傳輸通道
                    transport.close();
                }
            }
        }
    }.start();
}
複製代碼

(三) 核心代碼

以上工做流程的三個組件AcceptThreadSelectorThreadExecutorService在源碼中的定義以下:

TThreadedSelectorServer模式中有一個專門的線程AcceptThread用於處理新鏈接請求,所以可以及時響應大量併發鏈接請求;另外它將網絡I/O操做分散到多個SelectorThread線程中來完成,所以可以快速對網絡I/O進行讀寫操做,可以很好地應對網絡I/O較多的狀況。

TThreadedSelectorServer默認參數定義以下:

  • 負責網絡IO讀寫的selector默認線程數(selectorThreads):2
  • 負責業務處理的默認工做線程數(workerThreads):5
  • 工做線程池單個線程的任務隊列大小(acceptQueueSizePerThread):4

建立、初始化並啓動AcceptThreadSelectorThreads,同時啓動selector線程的負載均衡器(selectorThreads)。

AcceptThread源碼

AcceptThread繼承於Thread,能夠看出包含三個重要的屬性:非阻塞式傳輸通道(TNonblockingServerTransport)、NIO選擇器(acceptSelector)和選擇器線程負載均衡器(threadChooser)。

查看AcceptThreadrun()方法,能夠看出accept線程一旦啓動,就會不停地調用select()方法:

查看select()方法,acceptSelector選擇器等待IO事件的到來,拿到SelectionKey即檢查是否是accept事件。若是是,經過handleAccept()方法接收一個新來的鏈接;不然,若是是IO讀寫事件AcceptThread不做任何處理,交由SelectorThread完成。

handleAccept()方法中,先經過doAccept()去拿鏈接通道,而後Selector線程負載均衡器選擇一個Selector線程,完成接下來的IO讀寫事件

接下來繼續查看doAddAccept()方法的實現,毫無懸念,它進一步調用了SelectorThreadaddAcceptedConnection()方法,把非阻塞傳輸通道對象傳遞給選擇器線程作進一步的IO讀寫操做

SelectorThreadLoadBalancer源碼

SelectorThreadLoadBalancer如何建立?

SelectorThreadLoadBalancer是一個基於輪詢算法Selector線程選擇器,經過線程迭代器爲新進來的鏈接順序分配SelectorThread

SelectorThread源碼

SelectorThreadAcceptThread同樣,是TThreadedSelectorServer的一個成員內部類,每一個SelectorThread線程對象內部都有一個阻塞式的隊列,用於存放該線程被接收鏈接通道

阻塞隊列的大小可由構造函數指定:

上面看到,在AcceptThreaddoAddAccept()方法中調用了SelectorThreadaddAcceptedConnection()方法。

這個方法作了兩件事:

  1. 將被此SelectorThread線程接收的鏈接通道放入阻塞隊列中。
  2. 經過wakeup()方法喚醒SelectorThread中的NIO選擇器selector

既然SelectorThread也是繼承於Thread,查看其run()方法的實現:

SelectorThread方法的select()監聽IO事件,僅僅用於處理數據讀取數據寫入。若是鏈接有數據可讀,讀取並以frame的方式緩存;若是須要向鏈接中寫入數據,緩存併發送客戶端的數據。且在數據讀寫處理完成後,須要向NIOselector清空註銷自身的SelectionKey

  • 數據寫操做完成之後,整個rpc調用過程也就結束了,handleWrite()方法以下:

  • 數據讀操做完成之後,Thrift會利用已讀數據執行目標方法handleRead()方法以下:

handleRead方法在執行read()方法,將數據讀取完成後,會調用requestInvoke()方法調用目標方法完成具體業務處理。requestInvoke()方法將請求數據封裝爲一個Runnable對象,提交到工做任務線程池(ExecutorService)進行處理。

select()方法完成後,線程繼續運行processAcceptedConnections()方法處理下一個鏈接IO事件。

這裏比較核心的幾個操做:

  1. 嘗試從SelectorThread阻塞隊列acceptedQueue中獲取一個鏈接的傳輸通道。若是獲取成功,調用registerAccepted()方法;不然,進入下一次循環。
  2. registerAccepted()方法將傳輸通道底層的鏈接註冊到NIO選擇器selector上面,獲取到一個SelectionKey
  3. 建立一個FrameBuffer對象,並綁定到獲取的SelectionKey上面,用於數據傳輸時的中間讀寫緩存

總結

本文對Thrift的各類線程服務模型進行了介紹,包括2種阻塞式服務模型TSimpleServerTThreadPoolServer,3種非阻塞式服務模型TNonblockingServerTHsHaServerTThreadedSelectorServer。對各類服務模型的具體用法工做流程原理和源碼實現進行了必定程度的分析。

鑑於篇幅較長,請各位看官請慢慢批閱!

相關連接

  1. Apache Thrift系列詳解(一) - 概述與入門

  2. Apache Thrift系列詳解(二) - 網絡服務模型

  3. Apache Thrift系列詳解(三) - 序列化機制


歡迎關注技術公衆號: 零壹技術棧

零壹技術棧

本賬號將持續分享後端技術乾貨,包括虛擬機基礎,多線程編程,高性能框架,異步、緩存和消息中間件,分佈式和微服務,架構學習和進階等學習資料和文章。

相關文章
相關標籤/搜索