高性能RPC over MINA&google protobuf 代碼&實例 (一)

做者的其餘文章還不錯 ! java

原文:  服務器

http://maoyidao.iteye.com/blog/1636923 架構


最近團隊在開發基於移動互聯網的項目,又一次涉及到post service,即在服務器集羣之間投遞消息。是的,又是一個RPC服務。RPC實現方式從笨重的CORBA,SOAP over HTTP,XMPP over TCP,到輕量級的protobuf,scribe和Avro。這裏不想比較各自的應用場景(另外後面三種RPC方式極爲接近,都是經過提供Object <-> 二進制映射來提升高效的傳輸),本文的目的是給你們一點能夠實際操做的代碼:java如何用protobuf 實現rpc socket

 

和protobuf-socket-rpc的區別

protobuf-socket-rpc(code.google.com/p/protobuf-socket-rpc/)是googlecode爲rpc寫的簡單實現。本文介紹的代碼和googlecode不一樣之處在於: 工具

1,基於NIO post

2,增長了校驗碼 性能

高性能RPC over google protobuf

 

Google's protocol buffer library makes writing rpc services easy, but it does not contain a rpc implementation. The transport details are left up to the user to implement. ui

google把這問題留給了咱們,那麼看看應該怎麼實現。hellow world僞代碼應該是這樣的: google

 

Java代碼   收藏代碼
  1. MessageLite message = getMessage(); // get a proto message object by proto file  
  2.   
  3. OutputStream out = getOutputStream();  
  4. InputStream in = getInputStream();  
  5. message.writeDelimitedTo(out); //  Like writeTo(OutputStream), but writes the size of the message as a varint before writing the data  
  6. messageBuilder.mergeDelimitedFrom(in);  
 

好了,這樣就實現了序列化和反序列化。在真正的內容以前加入內容長度,這是一種最簡單的實現。爲了能可靠的進行傳輸,我在消息長度前加入了2個byte的驗證碼,下面就開始逐步構建個人rpc代碼。 spa

 

定義你的proto文件,爲傳輸多種消息,須要有「命令」字段:好比:Maoyidao.proto

List 1:

 

Java代碼   收藏代碼
  1. package com.maoyidao.rpc;  
  2.   
  3. message MaoyidaoPacket {  
  4.   required int32 cmd = 1;           
  5.   required int32 subcmd = 2;     
  6.   optional bytes content = 3;        
  7. }  

 

 

OK,compile it to Java class: protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/addressbook.proto

你會獲得一個MaoyidaoPacket 類,而後你須要這樣得到實例:

List 2:

 

Java代碼   收藏代碼
  1. Maoyidao.MaoyidaoPacket packet = Maoyidao.MaoyidaoPacket.newBuilder()  
  2.    .setCmd(mycmd)  
  3.    .setSubcmd(mysubcmd)  
  4.    .setContent(ByteString.copyFromUtf8("some message")).build();   

 

 

咱們先不討論怎麼基於MINA建立一個NIO,先假設咱們得到了一個OutputStream,看看怎麼把消息寫出去(其中的關鍵是用一些特殊的字符來區分你的消息,這是RPC over TCP的基本要求):

List 3:

 

Java代碼   收藏代碼
  1. private final void writeObject(OutputStream os, Maoyidao.MaoyidaoPacket packet)  {        
  2.         ByteArrayOutputStream baos = new ByteArrayOutputStream();  
  3.         com.google.protobuf.CodedOutputStream cos = com.google.protobuf.CodedOutputStream.newInstance(baos);      
  4.         cos.writeRawVarint32(3);  
  5.         cos.writeRawVarint32(7);  
  6.         cos.writeRawVarint32(packet.getSerializedSize());  
  7.         vpacket.writeTo(cos);  
  8.         cos.flush();          
  9.         os.write(baos.toByteArray());                 
  10.             baos.close();  
  11.     }  
  12. }  

 

注意我不只寫了分隔符,還寫了content長度。

 

讀進來的時候要用相同的方式解析,假設咱們獲得了一個Bytebuffer,熟悉NIO的同窗知道,你老是會從ByteBuffer中讀取數據。同時我須要用到com.google.protobuf.CodedInputStream:Reads and decodes protocol message fields. This class contains two kinds of methods: methods that read specific protocol message constructs and field types (e.g. readTag() and readInt32()) and methods that read low-level values (e.g. readRawVarint32() and readRawBytes(int)).)這樣我就能夠從inputstream中讀到校驗碼:

 

 

Java代碼   收藏代碼
  1. ByteBuffer in = getByteBuffer();  
  2. CodedInputStream cis = CodedInputStream.newInstance(in);  
  3.   
  4. int flag1 = cis.readRawVarint32();  
  5. int flag2 = cis.readRawVarint32();  
  6. if(flag1 != 3 || flag2 != 7){  
  7.     // find some error  
  8. }  
  9.   
  10. int contentLength = cis.readRawVarint32();  
  11. int contentLength0 = contentLength + CodedOutputStream.computeRawVarint32Size(contentLength);  
  12.   
  13. if(in.remaining() >= contentLength0){              
  14.     try {  
  15.         Maoyidao.MaoyidaoPacket.Builder builder = Maoyidao.MaoyidaoPacket.newBuilder();  
  16.         CodedInputStream.newInstance(getBytesFromIn(in,contentLength0)).readMessage(  
  17.             builder, ExtensionRegistry.getEmptyRegistry());               
  18.         out.write(builder.build());  
  19.         in.position(in.position() + protocolLength);  
  20.         return true;  
  21.     } catch (Exception e) {  
  22.         //   
  23.     }             
  24. }  
  25.   
  26. // ByteBuffer沒有足夠的數據,等待下一次  
  27. // do something  
 

截止目前,咱們完成了帶校驗碼的基於protobuf的消息序列化和反序列。在這個實現中,我更偏向把protobuf當作一個序列化工具來使用,總體仍是依賴MINA自己提供的架構,這部分將在本系列的下一篇中詳細闡述。

 

 

本文系maoyidao原創,轉載請引用原連接:

http://maoyidao.iteye.com/blog/1636923

相關文章
相關標籤/搜索