對於 RPC 協議來講,最重要的就是對象的發送與接收,這就要用到序列化與反序列化,也稱爲編碼和解碼,序列化與反序列化和網絡傳輸通常都在對應的 RPC 框架中完成。java
序列化與反序列化的流程以下:git
JavaBean-> stub(client) <->skeleton(server)->JavaBean,簡單點說就是編碼和解碼。apache
相比於 RMI 遠程方法調用,不少 RPC 遠程過程調用的跨語言的,這就須要序列化於反序列化協議也支持跨語言。Google Procobuf 就是這樣一種跨語言的序列化於反序列化協議,效率很是高(怎麼作到比其餘協議效率高那?比其餘協議壓縮生成的對象小)。網絡
Netty 對於 ProtoBuf 提供了很好的支持。多線程
先看如何單獨使用 Google ProtoBuf架構
新建 .proto 結構描述文件框架
syntax = "proto2"; package com.paul.protobuf; //加快解析速度 option optimize_for = SPEED; option java_package = "com.paul.protobuf"; option java_outer_classname = "DataInfo"; message Student{ reuqired string name = 1; option int32 = 2; option string address = 3; }
使用對應的編譯文件生成對應的 Java 類socket
Proton —java_out src/main/java src/protobuf/Student.protoasync
這時在咱們代碼的 src/main/java 文件夾下生成了一個新的 pkg com.paul.protobuf,裏面生成了 DataInfo 類。對象會有對應的 builder 方法讓咱們來構建。maven
測試序列化方法
// 構建對象->字節->對象 public class ProtoBufTest{ public static void main(String[] args) throws Exception{ DataInfo.Student student = DataInfo.Student.newBuilder().setName("張三").setAge(20).setAddress("abc").build(); byte[] student2ByteArray = student.toByteArray(); DataInfo.Student student2 = DataInfo.Student.parseFrom(student2ByteArray); System.out.println(studdent2); } }
在來看 Netty 對 Google ProtoBuf 的支持
仍是隻給出不同的部分(服務單和客戶端的這部分是同樣的):
@Override protected void initChannel(SocketChannel ch) throws Exception{ ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ProtobufVarint32FrameDecoder()); //解碼器 pipeline.addLast(new ProtobufDecoder(DataInfo.Student.getDefaultInstance())); pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); //編碼器 pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new MyServerHandler()); }
測試方法就是在客戶端組裝一個 DataInfo.Student 而後發送給服務端,這裏就不演示了。
你們可能會發現上面的代碼存在一個問題,就是上面的程序只能對 DataInfo.Student 進行編解碼,若是傳遞消息的類型有多種怎麼辦那?
解決方案一:定義義協議,須要本身實現解碼器,經過前兩位來標識具體的 JavaBean 類型。
解決方案二:定義一個最外層的類,經過枚舉的方式來肯定傳遞的 JavaBean 類型。
好比咱們有兩個 JavaBean
message MyMessage{ enum DataType{ PersonType = 1; DogType = 2; CatType = 3; } required Datatype data_type = 1; //oneof 在同一時刻只有一個字段會被設置,字段之間會共享內存,後面設置會自動清空前面的。 oneof dataBody{ Person person = 2; Dog dog = 3; Cat cat = 4; } } message Person{ option string name = 1; option int32 age = 2; option string address = 3; } message Dog{ option string name = 1; option int32 age = 2; } message Cat{ option string name = 1; option int32 city = 2; }
Pipeline 的改動(客戶端和服務端):
pipeline.addLast(new ProtobufDecoder(DataInfo.MyMessage.getDefaultInstance()));
咱們本身的 handler 的改動:
@Overrode public void channelActive(ChannelHandlerContext ctx) throws Exception{ MyDataInfo.MyMessage myMessage = MyDataInfo.MyMessage.newBuilder(). setDataType(DataType.PersonType.PersonType). setPerson(MyDataInfo.Person.newBuilder(). setName("張三").setAge(20). setAddress("111").build()). build(); ctx.channel().writeAndFlush(myMessage); }
服務端 handler 根據 enum 的類型分別進行解析。
在實際的應用環境中,咱們客戶端和服務端大機率是兩個分開的應用程序,此時咱們使用 Google ProtoBuf 時 .proto 文件和對應的 class 文件是否是須要在兩邊都保存一份,若是有修改會很是麻煩。下面咱們介紹一種最佳實踐。
最佳實踐是使用 git 做爲版本控制系統爲前提的:
不那麼好的方案:git submodule,就至關於 maven 的子模塊,客戶端和服務端都依賴這個模塊。
比較好的方案:git subtree,將公共的代碼合併到 server 和 client 端,至關於向 server 和 client 提交代碼。
Apache Thrift 和 Google ProtoBuf 總體很是類似,適用於可伸縮的跨語言的服務開發。Thrift 至關於 Netty + Google ProtoBuf,是一個高性能 RPC 框架。Thrift 底層是 socket + RPC 的模式。
Thrift 是一個典型的 CS 結構,客戶端和服務端可使用不一樣的語言開發,既然客戶端和服務端能使用不一樣的語言開發,那麼必定有一種中間語言來關聯服務端和客戶端,這就是 IDL(Interface Description Language)。
Thrift 如何實現多語言之間的通訊?
數據傳輸使用 socket (多種語言均支持),數據再以特定的格式(String 等)發送,接收方語言進行解析。
如何使用?
定義 thrift 的文件,由 thrift 文件(IDL) 生成雙方語言的接口,model,在生成的 model 以及接口中會有解碼編碼的代碼。
Thrift 中的服務
Thrift 定義服務至關於 Java 中建立 Interface 同樣,建立的 service 通過代碼生成命令以後就會生成客戶端和服務端的框架代碼,定義形式以下:
service HelloWorldService{ //service 中定義的函數,至關於 java interface 中定義的方法 string doAction(1:string name, 2:i32 age); }
.thrift 文件的定義
// java 中的包名 namespace jave thrift.generate // 定義別名 typedef i16 short typedef i32 int typedef i64 long typedef bool boolean typedef string String struct Person{ 1: optional String username, 2: optional int age, 3: optional boolean married } exception DataException{ 1: optional String message, 2: optional String callStack, 3: optional String date } service PersonService{ Person getPersonByName(1: required String username) throws (1: DataException dataException), void savePerson(1:requried Person person) throws (1:DataException dataException) }
編譯 thrift 文件
thrift --gen java src/thrift/data.thrift
生成的文件
Person.java 裏面包含了編解碼的方法,PersonService 裏面包含了 getPersonByName 和 savePerson 的方法。
測試方法:
服務端服務的具體實現方法
public class PersonServiceImpl implements PersonService.Iface{ @Override public Person getPersonByName(String username) throws DataException,TException{ Person p = new Person(); p.setUserName("paul"); p.setAge(25); p.setMarried(true); return p; } @Override public void savePerson(Person person) throws DataException,TException{ System.out.println(person.getUserName()); } }
Thrift 的服務端:
public class ThriftServer{ public static void main(String[] args){ //非阻塞的 socket server TNonblockingServerSocket socket = new TNonblockingServerSocket(8899); // 高可用的 server THsHaServer.Args arg = new THsHaServer.Args(socket).minWorkerThreads(2).maxWorkerThreads(4); PersonService.Processor<PersonServiceImpl> processor = new PersonService.Processor<>(new PersonServiceImpl()); arg.protocolFactory(new TCompactPrococol.Factory()); arg.transportFactory(new TFramedTransport.Facotry()); arg.processorFactory(new TProcessorFactory(processor)); TServer server = new THsHaServer(arg); System.out.println("Thrift Server Started"); //死循環 server.serve(); } }
Thrift 的客戶端:
public class ThriftClient{ publiuc static void main(String[] args){ TTransport transport = new TFramedTransport(new TSocket ("localhost",8899),600); TProcotol procotol = new TComapctProcotol(transport); PersonService.Client client = new PersonService.Client(procotol); try{ //打開 socket transport.open(); //好像調用本地方法同樣 Person person = client.getPersonByName("paul"); System.out.println(person.getAge()); }catch(Exception ex){ throw ex; }finally{ transport.close(); } } }
Thrift 的架構:
Thrift 的傳輸格式,協議:
TBinaryProtocol-二進制格式
TCompactProtocol-壓縮格式
TJSONProtocol-JSON 格式
TSimpleJSONProtocol-提供 JSON 只寫協議,生成的文件很容易經過腳本語言解析。不多使用,缺乏元數據信息,接收方不能讀取出來。
TDebugProtocol-使用易懂的可讀的文本格式,以便於 debug。
Thrift 數據傳輸方式,transport:
TSocket-阻塞式 socket。
TFramedTransport-以 frame 爲單位進行傳輸,非阻塞式服務中使用。
TFileTransport-以文件形式進行傳輸。
TMemoryTransport-將內存用於 I/O,Java 實現時內部實際使用了簡單的 ByteArrayOutputStream。
支持的服務模型,server:
TSimpleServer-簡單的單線程服務模型,經常使用於測試。
TThreadPoolServer-多線程服務模型,標準的阻塞式 IO。
TNonboockingServer-多線程服務模型,使用非阻塞式 IO(須要使用 TFramedTransport 數據傳輸方式)。
THsHaServer-THsHa 引入了線程池去處理,其模型把讀寫任務放到線程池處理。Half-sync/Half-async 的處理模式,Half-sync 是在處理 IO 事件上,Half-async 用於 handler 對 rpc 的同步處理。