精通併發與 Netty (二)經常使用的 rpc 框架

Google Protobuf 使用方式分析

對於 RPC 協議來講,最重要的就是對象的發送與接收,這就要用到序列化與反序列化,也稱爲編碼和解碼,序列化與反序列化和網絡傳輸通常都在對應的 RPC 框架中完成。java

序列化與反序列化的流程以下:git

JavaBean-> stub(client) <->skeleton(server)->JavaBean,簡單點說就是編碼和解碼。apache

相比於 RMI 遠程方法調用,不少 RPC 遠程過程調用的跨語言的,這就須要序列化於反序列化協議也支持跨語言。Google Procobuf 就是這樣一種跨語言的序列化於反序列化協議,效率很是高(怎麼作到比其餘協議效率高那?比其餘協議壓縮生成的對象小)。網絡

Netty 對於 ProtoBuf 提供了很好的支持。多線程

先看如何單獨使用 Google ProtoBuf架構

  1. 新建 .proto 結構描述文件框架

    syntax = "proto2";
    
    package com.paul.protobuf;
    
    //加快解析速度
    option optimize_for = SPEED;
    option java_package = "com.paul.protobuf";
    option java_outer_classname = "DataInfo";
    
    message Student{
      reuqired string name = 1;
      option int32 = 2;
      option string address = 3;
    }
  2. 使用對應的編譯文件生成對應的 Java 類socket

    Proton —java_out src/main/java src/protobuf/Student.protoasync

  3. 這時在咱們代碼的 src/main/java 文件夾下生成了一個新的 pkg com.paul.protobuf,裏面生成了 DataInfo 類。對象會有對應的 builder 方法讓咱們來構建。maven

  4. 測試序列化方法

    // 構建對象->字節->對象
    public class ProtoBufTest{
      public static void main(String[] args) throws Exception{
        DataInfo.Student student = DataInfo.Student.newBuilder().setName("張三").setAge(20).setAddress("abc").build();
        byte[] student2ByteArray = student.toByteArray();
        DataInfo.Student student2 = DataInfo.Student.parseFrom(student2ByteArray);
        System.out.println(studdent2);
      }
    
    }

在來看 Netty 對 Google ProtoBuf 的支持

仍是隻給出不同的部分(服務單和客戶端的這部分是同樣的):

@Override
protected void initChannel(SocketChannel ch) throws Exception{
  ChannelPipeline pipeline = ch.pipeline();
  pipeline.addLast(new ProtobufVarint32FrameDecoder());
  //解碼器
  pipeline.addLast(new ProtobufDecoder(DataInfo.Student.getDefaultInstance()));
  pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
  //編碼器
  pipeline.addLast(new ProtobufEncoder());
  pipeline.addLast(new MyServerHandler());
}

測試方法就是在客戶端組裝一個 DataInfo.Student 而後發送給服務端,這裏就不演示了。

你們可能會發現上面的代碼存在一個問題,就是上面的程序只能對 DataInfo.Student 進行編解碼,若是傳遞消息的類型有多種怎麼辦那?

解決方案一:定義義協議,須要本身實現解碼器,經過前兩位來標識具體的 JavaBean 類型。

解決方案二:定義一個最外層的類,經過枚舉的方式來肯定傳遞的 JavaBean 類型。

好比咱們有兩個 JavaBean

message MyMessage{
    enum DataType{
     PersonType = 1;
     DogType = 2;
     CatType = 3;
    }
  required Datatype data_type = 1;
  //oneof 在同一時刻只有一個字段會被設置,字段之間會共享內存,後面設置會自動清空前面的。
  oneof dataBody{
        Person person = 2;
    Dog dog = 3;
    Cat cat = 4;
  }
}

message Person{
    option string name = 1;
  option int32 age = 2;
  option string address = 3;
}

message Dog{
    option string name = 1;
  option int32 age = 2;
}

message Cat{
    option string name = 1;
  option int32 city = 2;
}

Pipeline 的改動(客戶端和服務端):

pipeline.addLast(new ProtobufDecoder(DataInfo.MyMessage.getDefaultInstance()));

咱們本身的 handler 的改動:

@Overrode
public void channelActive(ChannelHandlerContext ctx) throws Exception{
  MyDataInfo.MyMessage myMessage = MyDataInfo.MyMessage.newBuilder().
       setDataType(DataType.PersonType.PersonType).
       setPerson(MyDataInfo.Person.newBuilder().
                setName("張三").setAge(20).
                setAddress("111").build()).
       build();
       ctx.channel().writeAndFlush(myMessage);
        
}

服務端 handler 根據 enum 的類型分別進行解析。

在實際的應用環境中,咱們客戶端和服務端大機率是兩個分開的應用程序,此時咱們使用 Google ProtoBuf 時 .proto 文件和對應的 class 文件是否是須要在兩邊都保存一份,若是有修改會很是麻煩。下面咱們介紹一種最佳實踐。

最佳實踐是使用 git 做爲版本控制系統爲前提的:

不那麼好的方案:git submodule,就至關於 maven 的子模塊,客戶端和服務端都依賴這個模塊。

比較好的方案:git subtree,將公共的代碼合併到 server 和 client 端,至關於向 server 和 client 提交代碼。

Apache Thrift 使用方式與文件編寫方式分析

Apache Thrift 和 Google ProtoBuf 總體很是類似,適用於可伸縮的跨語言的服務開發。Thrift 至關於 Netty + Google ProtoBuf,是一個高性能 RPC 框架。Thrift 底層是 socket + RPC 的模式。

Thrift 是一個典型的 CS 結構,客戶端和服務端可使用不一樣的語言開發,既然客戶端和服務端能使用不一樣的語言開發,那麼必定有一種中間語言來關聯服務端和客戶端,這就是 IDL(Interface Description Language)。

Thrift 如何實現多語言之間的通訊?

數據傳輸使用 socket (多種語言均支持),數據再以特定的格式(String 等)發送,接收方語言進行解析。

如何使用?

定義 thrift 的文件,由 thrift 文件(IDL) 生成雙方語言的接口,model,在生成的 model 以及接口中會有解碼編碼的代碼。

Thrift 中的服務

Thrift 定義服務至關於 Java 中建立 Interface 同樣,建立的 service 通過代碼生成命令以後就會生成客戶端和服務端的框架代碼,定義形式以下:

service HelloWorldService{
  //service 中定義的函數,至關於 java interface 中定義的方法
  string doAction(1:string name, 2:i32 age);
}

.thrift 文件的定義

// java 中的包名
namespace jave thrift.generate
// 定義別名
typedef i16 short
typedef i32 int
typedef i64 long
typedef bool boolean
typedef string String

struct Person{
  1: optional String username,
  2: optional int age,
  3: optional boolean married
}

exception DataException{
  1: optional String message,
  2: optional String callStack,
  3: optional String date
}

service PersonService{
  Person getPersonByName(1: required String username) throws (1: DataException dataException),
  void savePerson(1:requried Person person) throws (1:DataException dataException)
}

編譯 thrift 文件

thrift --gen java src/thrift/data.thrift

生成的文件

Person.java 裏面包含了編解碼的方法,PersonService 裏面包含了 getPersonByName 和 savePerson 的方法。

測試方法:

服務端服務的具體實現方法

public class PersonServiceImpl implements PersonService.Iface{
  @Override
  public Person getPersonByName(String username) throws DataException,TException{
    Person p = new Person();
    p.setUserName("paul");
    p.setAge(25);
    p.setMarried(true);
    return p;
  }
  
  @Override
  public void savePerson(Person person) throws DataException,TException{
    System.out.println(person.getUserName());
  }
  
}

Thrift 的服務端:

public class ThriftServer{
  public static void main(String[] args){
    //非阻塞的 socket server
    TNonblockingServerSocket socket = new TNonblockingServerSocket(8899);
    // 高可用的 server
    THsHaServer.Args arg = new THsHaServer.Args(socket).minWorkerThreads(2).maxWorkerThreads(4);
    PersonService.Processor<PersonServiceImpl> processor = new PersonService.Processor<>(new PersonServiceImpl());
    
    arg.protocolFactory(new TCompactPrococol.Factory());
    arg.transportFactory(new TFramedTransport.Facotry());
    arg.processorFactory(new TProcessorFactory(processor));
    
    TServer server = new THsHaServer(arg);
    System.out.println("Thrift Server Started");
    //死循環
    server.serve();
  }
}

Thrift 的客戶端:

public class ThriftClient{
  publiuc static void main(String[] args){
    TTransport transport = new TFramedTransport(new TSocket
("localhost",8899),600);
    TProcotol procotol = new TComapctProcotol(transport);
    PersonService.Client client = new PersonService.Client(procotol);
    
    try{
      //打開 socket
      transport.open();
      //好像調用本地方法同樣
      Person person = client.getPersonByName("paul");
      System.out.println(person.getAge());
    }catch(Exception ex){
      throw ex;
    }finally{
      transport.close();
    }
  }
}

Thrift 的架構:

Thrift 的傳輸格式,協議:

TBinaryProtocol-二進制格式

TCompactProtocol-壓縮格式

TJSONProtocol-JSON 格式

TSimpleJSONProtocol-提供 JSON 只寫協議,生成的文件很容易經過腳本語言解析。不多使用,缺乏元數據信息,接收方不能讀取出來。

TDebugProtocol-使用易懂的可讀的文本格式,以便於 debug。

Thrift 數據傳輸方式,transport:

TSocket-阻塞式 socket。

TFramedTransport-以 frame 爲單位進行傳輸,非阻塞式服務中使用。

TFileTransport-以文件形式進行傳輸。

TMemoryTransport-將內存用於 I/O,Java 實現時內部實際使用了簡單的 ByteArrayOutputStream。

支持的服務模型,server:

TSimpleServer-簡單的單線程服務模型,經常使用於測試。

TThreadPoolServer-多線程服務模型,標準的阻塞式 IO。

TNonboockingServer-多線程服務模型,使用非阻塞式 IO(須要使用 TFramedTransport 數據傳輸方式)。

THsHaServer-THsHa 引入了線程池去處理,其模型把讀寫任務放到線程池處理。Half-sync/Half-async 的處理模式,Half-sync 是在處理 IO 事件上,Half-async 用於 handler 對 rpc 的同步處理。

相關文章
相關標籤/搜索