Thrift入門及Java實例演示【轉】

概述

Thrift是一個軟件框架,用來進行可擴展且跨語言的服務的開發。它結合了功能強大的軟件堆棧和代碼生成引擎,以構建在 C++、Java、Python、PHP、Ruby、Erlang、Perl、Haskell、C#、Cocoa、JavaScript、Node.js、Smalltalk、and OCaml 等等編程語言間無縫結合的、高效的服務。java

Thrift最初由facebook開發,07年四月開放源碼,08年5月進入Apache孵化器。Thrift容許你定義一個簡單的定義文件中的數據類型和服務接口。以做爲輸入文件,編譯器生成代碼用來方便地生成RPC客戶端和服務器通訊的無縫跨編程語言。apache

官網地址:thrift.apache.org編程

下載配置

Maven構建項目,在pom.xml 中添加以下內容:服務器

<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libthrift</artifactId>
    <version>0.11.0</version>
</dependency>

 

  • 基本概念

    數據類型

    • 基本類型:
      bool:布爾值,true 或 false,對應 Java 的 boolean
      byte:8 位有符號整數,對應 Java 的 byte
      i16:16 位有符號整數,對應 Java 的 short
      i32:32 位有符號整數,對應 Java 的 int
      i64:64 位有符號整數,對應 Java 的 long
      double:64 位浮點數,對應 Java 的 double
      string:utf-8編碼的字符串,對應 Java 的 String網絡

    • 結構體類型:
      struct:定義公共的對象,相似於 C 語言中的結構體定義,在 Java 中是一個 JavaBean併發

    • 容器類型:
      list:對應 Java 的 ArrayList
      set:對應 Java 的 HashSet
      map:對應 Java 的 HashMap框架

    • 異常類型:
      exception:對應 Java 的 Exception異步

    • 服務類型:
      service:對應服務的類async

    服務端編碼基本步驟

    1. 實現服務處理接口impl
    2. 建立TProcessor
    3. 建立TServerTransport
    4. 建立TProtocol
    5. 建立TServer
    6. 啓動Server

    客戶端編碼基本步驟

    1. 建立Transport
    2. 建立TProtocol
    3. 基於TTransport和TProtocol建立Client
    4. 調用Client的相應方法

    數據傳輸協議

    1. TBinaryProtocol 二進制格式
    2. TCompactProtocol 壓縮格式
    3. TJSONProtocol JSON格式
    4. TSimpleJSONProtocol 提供JSON只寫協議,生成的文件很容易經過腳本語言解析

    提示:客戶端和服務端的協議要一致編程語言

    實例演示

    Thrift生成代碼

    建立Thrift文件,好比D:\Tonny\Doc\Project\prj-oxygen\common\src\main\java\org\tonny\thrift\demo\config\news\NewsModel.thrift ,內容以下:

  • namespace java org.tonny.thrift.demo

    service HelloWorldService {
      string sayHello(1:string username)
    }

  • include類型的數據,文件引用,因此要用「文件名+類型」,即文件名.模塊名
  • 使用從官網提供下載的thrift-0.11.0.exe,運用這個工具生成相關代碼:

    thrift-0.9.3.exe -r -gen java ./HelloWorld.thrift

    將生成的HelloWorldService.java 文件複製到本身測試的工程中,個人工程是用Maven構建的,故在pom.xml中增長以下內容:

    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libthrift</artifactId>
        <version>0.9.3</version>
    </dependency>
    <dependency>
     <groupId>org.slf4j</groupId>
     <artifactId>slf4j-log4j12</artifactId>
     <version>1.7.5</version>
    </dependency>
  • 實現接口Iface

    Java代碼:HelloWorldImpl.java

    package com.thrift.demo;
    
    import org.apache.thrift.TException;
    
    public class HelloWorldImpl implements HelloWorldService.Iface {
    
     public HelloWorldImpl() {
     }
    
     @Override
     public String sayHello(String username) throws TException {
     return "Hi," + username + " welcome to thrift demo world";
     }
    
    }
    

    TSimpleServer服務端

    簡單的單線程服務模型,通常用於測試。
    編寫服務端server代碼:ThriftServer.java

    package com.thrift.demo.server;
    
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TBinaryProtocol.Factory;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.server.TSimpleServer;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TServerSocket;
    import org.apache.thrift.transport.TTransportException;
    import org.apache.thrift.transport.TTransportFactory;
    
    import com.thrift.demo.service.HelloWorldService;
    import com.thrift.demo.service.impl.HelloWorldServiceImpl;
    
    /**
     ************************************************************
     * @類名 ThriftServer
     * 
     * @AUTHOR Neo
     ************************************************************
     */
    public class ThriftServerDemo {
    
        public void startServer() {
            try {
                System.out.println("Starting Thrift Server......");
    
                TProcessor processor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    
                TServerSocket serverTransport = new TServerSocket(8191);
    
                TTransportFactory transportFactory = new TFramedTransport.Factory();
    
                Factory factory = new TBinaryProtocol.Factory();
    
                TServer.Args tArgs = new TServer.Args(serverTransport);
                tArgs.protocolFactory(factory);
                tArgs.transportFactory(transportFactory);
                tArgs.processor(processor);
    
                // 簡單的單線程服務模型,通常用於測試
                TServer server = new TSimpleServer(tArgs);
    
                server.serve();
    
            } catch (TTransportException e) {
                System.out.println("Starting Thrift Server......Error!!!");
                e.printStackTrace();
            }
    
        }
    
        public static void main(String[] args) {
            ThriftServerDemo server = new ThriftServerDemo();
            server.startServer();
        }
    
    }

    編寫客戶端Client代碼:ThriftClientDemo.java

    package com.thrift.demo.client;
    
    import org.apache.thrift.TException;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    import org.apache.thrift.transport.TTransportException;
    
    import com.thrift.demo.service.HelloWorldService;
    import com.thrift.demo.service.HelloWorldService.Client;
    
    /**
     ************************************************************
     * @類名 ThriftClient
     * 
     * @AUTHOR Neo
     ************************************************************
     */
    public class ThriftClientDemo {
    
        public static void main(String[] args) {
            try {
                TTransport transport = new TFramedTransport(new TSocket("127.0.0.1", 8191, 5000));
                // 協議要和服務端一致
                TProtocol protocol = new TBinaryProtocol(transport);
    
                Client client = new HelloWorldService.Client(protocol);
    
                transport.open();
    
                String string = client.sayHello("Neo");
    
                System.out.println(string);
    
                transport.close();
    
            } catch (TTransportException e) {
                e.printStackTrace();
            } catch (TException e) {
                e.printStackTrace();
            }
        }
    
    }

    先運行服務端程序,日誌以下:

    Starting Thrift Server......

    再運行客戶端調用程序,日誌以下:

    Hello World,Hello Thrift!!! Hi:Neo

    測試成功,和預期的返回信息一致。

    TThreadPoolServer 服務模型

    線程池服務模型,使用標準的阻塞式IO,預先建立一組線程處理請求。
    編寫服務端代碼:HelloServerDemo.java

    package com.thrift.demo.server;
    
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.server.TThreadPoolServer;
    import org.apache.thrift.transport.TServerSocket;
    
    import com.thrift.demo.service.HelloWorldService;
    import com.thrift.demo.service.impl.HelloWorldServiceImpl;
    
    /**
     ************************************************************
     * @類名 HelloServerDemo
     * 
     * @AUTHOR Neo
     ************************************************************
     */
    public class HelloServerDemo {
        public static final int SERVER_PORT = 8191;
    
        public void startServer() {
            try {
                System.out.println("HelloWorld TThreadPoolServer start ....");
    
                TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    
                TServerSocket serverTransport = new TServerSocket(SERVER_PORT);
                TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);
                ttpsArgs.processor(tprocessor);
                ttpsArgs.protocolFactory(new TBinaryProtocol.Factory());
    
                // 線程池服務模型,使用標準的阻塞式IO,預先建立一組線程處理請求。
                TServer server = new TThreadPoolServer(ttpsArgs);
                server.serve();
    
            } catch (Exception e) {
                System.out.println("Server start error!!!");
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            HelloServerDemo server = new HelloServerDemo();
            server.startServer();
        }
    
    }

    客戶端Client代碼和以前的同樣,只要數據傳輸的協議一致便可,客戶端測試成功,結果以下:

    Hello World,Hello Thrift!!! Hi:Neo

    TNonblockingServer 服務模型

    使用非阻塞式IO,服務端和客戶端須要指定 TFramedTransport 數據傳輸的方式。
    編寫服務端代碼:HelloServerDemo.java

    package com.thrift.demo.server;
    
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.protocol.TCompactProtocol;
    import org.apache.thrift.server.TNonblockingServer;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TNonblockingServerSocket;
    
    import com.thrift.demo.service.HelloWorldService;
    import com.thrift.demo.service.impl.HelloWorldServiceImpl;
    
    /**
     ************************************************************
     * @類名 HelloServerDemo
     * 
     * @AUTHOR Neo
     ************************************************************
     */
    public class HelloServerDemo {
        public static final int SERVER_PORT = 8191;
    
        public void startServer() {
            try {
                System.out.println("HelloWorld TNonblockingServer start ....");
    
                TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    
                TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(SERVER_PORT);
                TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
                tnbArgs.processor(tprocessor);
                tnbArgs.transportFactory(new TFramedTransport.Factory());
                tnbArgs.protocolFactory(new TCompactProtocol.Factory());
    
                // 使用非阻塞式IO,服務端和客戶端須要指定TFramedTransport數據傳輸的方式
                TServer server = new TNonblockingServer(tnbArgs);
                server.serve();
    
            } catch (Exception e) {
                System.out.println("Server start error!!!");
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            HelloServerDemo server = new HelloServerDemo();
            server.startServer();
        }
    }

    編寫客戶端代碼:HelloClientDemo.java

    package com.thrift.demo.client;
    
    import org.apache.thrift.TException;
    import org.apache.thrift.protocol.TCompactProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    import org.apache.thrift.transport.TTransportException;
    
    import com.thrift.demo.service.HelloWorldService;
    
    /**
     ************************************************************
     * @類名 HelloClientDemo
     * 
     * @AUTHOR Neo
     ************************************************************
     */
    public class HelloClientDemo {
    
        public static final String SERVER_IP = "127.0.0.1";
    
        public static final int SERVER_PORT = 8191;
    
        public static final int TIMEOUT = 30000;
    
        public void startClient(String userName) {
            TTransport transport = null;
            try {
                transport = new TFramedTransport(new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT));
                // 協議要和服務端一致
                TProtocol protocol = new TCompactProtocol(transport);
                HelloWorldService.Client client = new HelloWorldService.Client(protocol);
                transport.open();
                String result = client.sayHello(userName);
                System.out.println("Thrify client result =: " + result);
            } catch (TTransportException e) {
                e.printStackTrace();
            } catch (TException e) {
                e.printStackTrace();
            } finally {
                if (null != transport) {
                    transport.close();
                }
            }
        }
    
        public static void main(String[] args) {
            HelloClientDemo client = new HelloClientDemo();
            client.startClient("Neo");
    
        }
    }

    客戶端的測試成功,結果以下:

    Thrify client result =: Hello World,Hello Thrift!!! Hi:Neo

    THsHaServer服務模型

    半同步半異步的服務端模型,須要指定爲: TFramedTransport 數據傳輸的方式。
    編寫服務端代碼:HelloServerDemo.java

    package com.thrift.demo.server;
    
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.server.THsHaServer;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TNonblockingServerSocket;
    
    import com.thrift.demo.service.HelloWorldService;
    import com.thrift.demo.service.impl.HelloWorldServiceImpl;
    
    /**
     ************************************************************
     * @類名 HelloServerDemo
     * 
     * @AUTHOR Neo
     ************************************************************
     */
    public class HelloServerDemo {
    
        public static final int SERVER_PORT = 8191;
    
        public void startServer() {
            try {
                System.out.println("HelloWorld THsHaServer start ....");
    
                TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    
                TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(SERVER_PORT);
                THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);
                thhsArgs.processor(tprocessor);
                thhsArgs.transportFactory(new TFramedTransport.Factory());
                thhsArgs.protocolFactory(new TBinaryProtocol.Factory());
    
                // 半同步半異步的服務模型
                TServer server = new THsHaServer(thhsArgs);
                server.serve();
    
            } catch (Exception e) {
                System.out.println("Server start error!!!");
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            HelloServerDemo server = new HelloServerDemo();
            server.startServer();
        }
    }

    客戶端代碼和上一個服務模型的Client中的相似,只要注意傳輸協議一致以及指定傳輸方式爲TFramedTransport。

    異步客戶端

    編寫服務端代碼:HelloServerDemo.java

    package com.thrift.demo.client;
    
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.protocol.TCompactProtocol;
    import org.apache.thrift.server.TNonblockingServer;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TNonblockingServerSocket;
    
    import com.thrift.demo.service.HelloWorldService;
    import com.thrift.demo.service.impl.HelloWorldServiceImpl;
    
    /**
     ************************************************************
     * @類名 HelloServerDemo
     * 
     * @AUTHOR Neo
     ************************************************************
     */
    public class HelloServerDemo {
    
        public static final int SERVER_PORT = 8191;
    
        public void startServer() {
            try {
                System.out.println("HelloWorld TNonblockingServer start ....");
    
                TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    
                TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(SERVER_PORT);
                TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
                tnbArgs.processor(tprocessor);
                tnbArgs.transportFactory(new TFramedTransport.Factory());
                tnbArgs.protocolFactory(new TCompactProtocol.Factory());
    
                // 使用非阻塞式IO,服務端和客戶端須要指定TFramedTransport數據傳輸的方式
                TServer server = new TNonblockingServer(tnbArgs);
                server.serve();
    
            } catch (Exception e) {
                System.out.println("Server start error!!!");
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            HelloServerDemo server = new HelloServerDemo();
            server.startServer();
        }
    }

    編寫客戶端Client代碼:HelloAsynClientDemo.java

    package com.thrift.demo.client;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.thrift.TException;
    import org.apache.thrift.async.AsyncMethodCallback;
    import org.apache.thrift.async.TAsyncClientManager;
    import org.apache.thrift.protocol.TCompactProtocol;
    import org.apache.thrift.protocol.TProtocolFactory;
    import org.apache.thrift.transport.TNonblockingSocket;
    import org.apache.thrift.transport.TNonblockingTransport;
    
    import com.thrift.demo.service.HelloWorldService;
    import com.thrift.demo.service.HelloWorldService.AsyncClient.sayHello_call;
    
    /**
     ************************************************************
     * @類名 HelloAsynClientDemo
     * 
     * @AUTHOR Neo
     ************************************************************
     */
    public class HelloClientDemo {
    
        public static final String SERVER_IP = "127.0.0.1";
    
        public static final int SERVER_PORT = 8191;
    
        public static final int TIMEOUT = 30000;
    
        public void startClient(String userName) {
            try {
                TAsyncClientManager clientManager = new TAsyncClientManager();
                TNonblockingTransport transport = new TNonblockingSocket(SERVER_IP, SERVER_PORT, TIMEOUT);
    
                TProtocolFactory tprotocol = new TCompactProtocol.Factory();
                HelloWorldService.AsyncClient asyncClient = new HelloWorldService.AsyncClient(tprotocol, clientManager, transport);
                System.out.println("Client start .....");
    
                CountDownLatch latch = new CountDownLatch(1);
                AsynCallback callBack = new AsynCallback(latch);
                System.out.println("call method sayHello start ...");
                asyncClient.sayHello(userName, callBack);
                System.out.println("call method sayHello .... end");
                boolean wait = latch.await(30, TimeUnit.SECONDS);
                System.out.println("latch.await =:" + wait);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("startClient end.");
        }
    
        public class AsynCallback implements AsyncMethodCallback<sayHello_call> {
    
            private CountDownLatch latch;
    
            public AsynCallback(CountDownLatch latch) {
                this.latch = latch;
            }
    
            @Override
            public void onComplete(sayHello_call response) {
                System.out.println("onComplete");
                try {
                    // Thread.sleep(1000L * 1);
                    System.out.println("AsynCall result =:" + response.getResult().toString());
                } catch (TException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    latch.countDown();
                }
            }
    
            @Override
            public void onError(Exception exception) {
                System.out.println("onError :" + exception.getMessage());
                latch.countDown();
            }
        }
    
        public static void main(String[] args) {
            HelloAsynClientDemo client = new HelloAsynClientDemo();
            client.startClient("Neo");
        }
        
    }

    先運行服務程序,再運行客戶端程序,測試結果以下:

    Client start .....
    call method sayHello start ...
    call method sayHello .... end
    onComplete
    AsynCall result =:Hello World,Hello Thrift!!! Hi:Neo
    latch.await =:true
    startClient end.

    設計思路

    • Thrift的Server類型有TSimpleServer、TNonblockingServer、THsHaServer、TThreadedSelectorServer、TThreadPoolServer
    • TSimpleServer是單線程阻塞IO的方式,僅用於demo
    • TNonblockingServer是單線程非阻塞IO的方式,經過java.nio.channels.Selector的select()接收鏈接請求,可是處理消息仍然是單線程,吞吐量有限不可用於生產
    • THsHaServer使用一個單獨的線程處理IO,一個獨立的worker線程池處理消息, 能夠並行處理全部請求
    • TThreadPoolServer使用一個專用鏈接接收connection,一旦接收到請求就會放入ThreadPoolExecutor中的一個worker裏處理,當請求處理完畢該worker釋放並回到線程池中,能夠配置線程最大值,當達到線程最大值時請求會被阻塞。TThreadPoolServer性能表現優異,代價是併發高時會建立大量線程
    • TThreadedSelectorServer是thrift 0.8引入的實現,處理IO也使用了線程池,比THsHaServer有更高的吞吐量和更低的時延,與TThreadPoolServer比性能相近且能應對網絡IO較多的狀況
    • 對於客戶端較少的狀況,TThreadPoolServer也有優異的性能表現,可是考慮到將來SOA可能的高併發,決定採用TThreadedSelectorServer
相關文章
相關標籤/搜索