RPC-Thrift(三)

  TProtocol

    TProtocol定義了消息如何進行序列化和反序列化。

    TProtocol的類結構圖如下:

    

    TBinaryProtocol:二進制編碼格式;

    TCompactProtocol:高效率、密集的二進制編碼格式,使用了zigzag壓縮算法,以及類似於ProtocolBuffer的Variable-Length Quantity (VLQ) 編碼方式;

    TTupleProtocol:繼承於TCompactProtocol,C++中不支持,Java中支持;

    TJSONProtocol:JSON格式;

    TProtocolFactory接口:只有getProtocol一個方法,用於通過一個TTransport對象創建一個TProtocol對象;TProtocolFactory接口的實現類都在TProtocol子類中。

    TProtocol

      TProtocol是所有消息協議的父類,定義了公用的抽象方法。

/**
 * Parent class of the message protocols: holds the transport and declares the
 * abstract write/read primitives that each concrete protocol implements.
 */
public abstract class TProtocol {
  // The no-arg constructor is private, so subclasses must use TProtocol(TTransport).
  @SuppressWarnings("unused")
  private TProtocol() {}
  protected TTransport trans_;// transport this protocol is bound to
  /** Binds this protocol to the given transport. */
  protected TProtocol(TTransport trans) {
    trans_ = trans;
  }
  /** Returns the transport passed to the constructor. */
  public TTransport getTransport() {
    return trans_;
  }
  /**
   * Writing methods.
   */
  // TMessage represents a message (e.g. a request); called when sending one
  public abstract void writeMessageBegin(TMessage message) throws TException;
  public abstract void writeMessageEnd() throws TException;
  // TStruct represents an object; called when writing an object
  public abstract void writeStructBegin(TStruct struct) throws TException;
  public abstract void writeStructEnd() throws TException;
  // TField represents a field; called when writing a field
  public abstract void writeFieldBegin(TField field) throws TException;
  public abstract void writeFieldEnd() throws TException;
  // Called to write the end-of-fields marker
  public abstract void writeFieldStop() throws TException;
  // Called when writing a TMap
  public abstract void writeMapBegin(TMap map) throws TException;
  public abstract void writeMapEnd() throws TException;
  // Called when writing a TList
  public abstract void writeListBegin(TList list) throws TException;
  public abstract void writeListEnd() throws TException;
  // Called when writing a TSet
  public abstract void writeSetBegin(TSet set) throws TException;
  public abstract void writeSetEnd() throws TException;
  // Writers for String and the primitive data types
  public abstract void writeBool(boolean b) throws TException;
  public abstract void writeByte(byte b) throws TException;
  public abstract void writeI16(short i16) throws TException;
  public abstract void writeI32(int i32) throws TException;
  public abstract void writeI64(long i64) throws TException;
  public abstract void writeDouble(double dub) throws TException;
  public abstract void writeString(String str) throws TException;
  // Writes out the data held in buf
  public abstract void writeBinary(ByteBuffer buf) throws TException;

  /**
   * Reading methods. Each one is the counterpart of the write method with the
   * matching name above.
   */
  public abstract TMessage readMessageBegin() throws TException;
  public abstract void readMessageEnd() throws TException;
  public abstract TStruct readStructBegin() throws TException;
  public abstract void readStructEnd() throws TException;
  public abstract TField readFieldBegin() throws TException;
  public abstract void readFieldEnd() throws TException;
  public abstract TMap readMapBegin() throws TException;
  public abstract void readMapEnd() throws TException;
  public abstract TList readListBegin() throws TException;
  public abstract void readListEnd() throws TException;
  public abstract TSet readSetBegin() throws TException;
  public abstract void readSetEnd() throws TException;
  public abstract boolean readBool() throws TException;
  public abstract byte readByte() throws TException;
  public abstract short readI16() throws TException;
  public abstract int readI32() throws TException;
  public abstract long readI64() throws TException;
  public abstract double readDouble() throws TException;
  public abstract String readString() throws TException;
  public abstract ByteBuffer readBinary() throws TException;

  /** No-op in this base class; subclasses may override. */
  public void reset() {}
  /** Returns the scheme class for this protocol; the base class returns StandardScheme. */
  public Class<? extends IScheme> getScheme() {
    return StandardScheme.class;
  }
}

    TBinaryProtocol

      TBinaryProtocol是二進制編碼。

/**
 * Binary protocol implementation: every value is written as fixed-width
 * big-endian bytes, and strings/binaries are written as a 4-byte length
 * followed by the payload.
 *
 * <p>Instances reuse per-instance scratch arrays for reads and writes, so a
 * single instance must not be used by several threads at once.
 */
public class TBinaryProtocol extends TProtocol {
  private static final TStruct ANONYMOUS_STRUCT = new TStruct();
  // Version header constants
  protected static final int VERSION_MASK = 0xffff0000;// mask selecting the version bits
  protected static final int VERSION_1 = 0x80010000;// version number

  protected boolean strictRead_ = false;// whether incoming messages must carry a version header
  protected boolean strictWrite_ = true;// whether outgoing messages carry a version header

  // Remaining read budget; only enforced once setReadLength() has been called.
  protected int readLength_;
  protected boolean checkReadLength_ = false;

  /** Factory producing TBinaryProtocol instances with a fixed configuration. */
  public static class Factory implements TProtocolFactory {
    protected boolean strictRead_ = false;
    protected boolean strictWrite_ = true;
    protected int readLength_;
    public Factory() {
      this(false, true);
    }
    public Factory(boolean strictRead, boolean strictWrite) {
      this(strictRead, strictWrite, 0);
    }
    public Factory(boolean strictRead, boolean strictWrite, int readLength) {
      strictRead_ = strictRead;
      strictWrite_ = strictWrite;
      readLength_ = readLength;
    }
    /** Builds a TBinaryProtocol on top of the given transport. */
    public TProtocol getProtocol(TTransport trans) {
      TBinaryProtocol proto = new TBinaryProtocol(trans, strictRead_, strictWrite_);
      // A read length of 0 means "no limit configured".
      if (readLength_ != 0) {
        proto.setReadLength(readLength_);
      }
      return proto;
    }
  }
  public TBinaryProtocol(TTransport trans) {
    this(trans, false, true);
  }
  public TBinaryProtocol(TTransport trans, boolean strictRead, boolean strictWrite) {
    super(trans);
    strictRead_ = strictRead;
    strictWrite_ = strictWrite;
  }
  /**
   * Writing methods.
   */
  /** Writes a message header; everything is reduced to writes of primitive values. */
  public void writeMessageBegin(TMessage message) throws TException {
    if (strictWrite_) {
      int version = VERSION_1 | message.type;// OR the version with the message type
      writeI32(version);// version + type
      writeString(message.name);// method name
      writeI32(message.seqid);// sequence id
    } else {
      // Unversioned layout: name, type byte, sequence id.
      writeString(message.name);
      writeByte(message.type);
      writeI32(message.seqid);
    }
  }
  public void writeMessageEnd() {}
  public void writeStructBegin(TStruct struct) {}
  public void writeStructEnd() {}
  /** Writes a field header: one type byte followed by the 16-bit field id. */
  public void writeFieldBegin(TField field) throws TException {
    writeByte(field.type);
    writeI16(field.id);
  }
  public void writeFieldEnd() {}
  /** Writes the end-of-fields marker (a single TType.STOP byte). */
  public void writeFieldStop() throws TException {
    writeByte(TType.STOP);
  }
  /** Writes a map header: key type, value type, entry count. */
  public void writeMapBegin(TMap map) throws TException {
    writeByte(map.keyType);
    writeByte(map.valueType);
    writeI32(map.size);
  }
  public void writeMapEnd() {}
  /** Writes a list header: element type, element count. */
  public void writeListBegin(TList list) throws TException {
    writeByte(list.elemType);
    writeI32(list.size);
  }
  public void writeListEnd() {}
  /** Writes a set header: element type, element count. */
  public void writeSetBegin(TSet set) throws TException {
    writeByte(set.elemType);
    writeI32(set.size);
  }
  public void writeSetEnd() {}
  /** Writes a boolean as a single byte (1 for true, 0 for false). */
  public void writeBool(boolean b) throws TException {
    writeByte(b ? (byte)1 : (byte)0);
  }
  private byte [] bout = new byte[1];
  public void writeByte(byte b) throws TException {
    bout[0] = b;
    trans_.write(bout, 0, 1);
  }
  private byte[] i16out = new byte[2];
  /** Writes a 16-bit value, most significant byte first. */
  public void writeI16(short i16) throws TException {
    i16out[0] = (byte)(0xff & (i16 >> 8));
    i16out[1] = (byte)(0xff & (i16));
    trans_.write(i16out, 0, 2);
  }
  private byte[] i32out = new byte[4];
  /** Writes a 32-bit value, most significant byte first. */
  public void writeI32(int i32) throws TException {
    i32out[0] = (byte)(0xff & (i32 >> 24));
    i32out[1] = (byte)(0xff & (i32 >> 16));
    i32out[2] = (byte)(0xff & (i32 >> 8));
    i32out[3] = (byte)(0xff & (i32));
    trans_.write(i32out, 0, 4);
  }
  private byte[] i64out = new byte[8];
  /** Writes a 64-bit value, most significant byte first. */
  public void writeI64(long i64) throws TException {
    i64out[0] = (byte)(0xff & (i64 >> 56));
    i64out[1] = (byte)(0xff & (i64 >> 48));
    i64out[2] = (byte)(0xff & (i64 >> 40));
    i64out[3] = (byte)(0xff & (i64 >> 32));
    i64out[4] = (byte)(0xff & (i64 >> 24));
    i64out[5] = (byte)(0xff & (i64 >> 16));
    i64out[6] = (byte)(0xff & (i64 >> 8));
    i64out[7] = (byte)(0xff & (i64));
    trans_.write(i64out, 0, 8);
  }
  /** Writes a double as the 64-bit value returned by Double.doubleToLongBits. */
  public void writeDouble(double dub) throws TException {
    writeI64(Double.doubleToLongBits(dub));
  }
  /** Writes a string as its UTF-8 byte length followed by the UTF-8 bytes. */
  public void writeString(String str) throws TException {
    try {
      byte[] dat = str.getBytes("UTF-8");
      writeI32(dat.length);
      trans_.write(dat, 0, dat.length);
    } catch (UnsupportedEncodingException uex) {
      throw new TException("JVM DOES NOT SUPPORT UTF-8");
    }
  }
  /**
   * Writes the remaining bytes of {@code bin} (position to limit) as a 4-byte
   * length followed by the data. The buffer's position is left untouched.
   */
  public void writeBinary(ByteBuffer bin) throws TException {
    int length = bin.limit() - bin.position();
    writeI32(length);
    if (bin.hasArray()) {
      trans_.write(bin.array(), bin.position() + bin.arrayOffset(), length);
    } else {
      // Direct and read-only buffers expose no backing array (array() would
      // throw), so copy the bytes out through a duplicate to keep the caller's
      // position unchanged.
      byte[] copy = new byte[length];
      bin.duplicate().get(copy);
      trans_.write(copy, 0, length);
    }
  }

  /**
   * Reading methods.
   */
  /** Reads a message header; the counterpart of writeMessageBegin. */
  public TMessage readMessageBegin() throws TException {
    // First 4 bytes: either version|type (versioned layout) or the length of
    // the method name (unversioned layout).
    int size = readI32();
    if (size < 0) {
      // Negative means the top bit is set, i.e. a version header is present;
      // verify that the version matches.
      int version = size & VERSION_MASK;
      if (version != VERSION_1) {
        throw new TProtocolException(TProtocolException.BAD_VERSION, "Bad version in readMessageBegin");
      }
      // Arguments in order: method name, message type, sequence id
      return new TMessage(readString(), (byte)(size & 0x000000ff), readI32());
    } else {
      if (strictRead_) {
        throw new TProtocolException(TProtocolException.BAD_VERSION, "Missing version in readMessageBegin, old client?");
      }
      // readStringBody(size) is the method name, readByte() the message type,
      // readI32() the sequence id
      return new TMessage(readStringBody(size), readByte(), readI32());
    }
  }
  public void readMessageEnd() {}
  public TStruct readStructBegin() {
    return ANONYMOUS_STRUCT;
  }
  public void readStructEnd() {}
  /** Reads a field header; the id is only present when the type is not STOP. */
  public TField readFieldBegin() throws TException {
    byte type = readByte();
    short id = type == TType.STOP ? 0 : readI16();
    return new TField("", type, id);
  }
  public void readFieldEnd() {}
  public TMap readMapBegin() throws TException {
    return new TMap(readByte(), readByte(), readI32());
  }
  public void readMapEnd() {}
  public TList readListBegin() throws TException {
    return new TList(readByte(), readI32());
  }
  public void readListEnd() {}
  public TSet readSetBegin() throws TException {
    return new TSet(readByte(), readI32());
  }
  public void readSetEnd() {}
  public boolean readBool() throws TException {
    return (readByte() == 1);
  }
  private byte[] bin = new byte[1];
  public byte readByte() throws TException {
    // Fast path: take the byte straight out of the transport's own buffer.
    if (trans_.getBytesRemainingInBuffer() >= 1) {
      byte b = trans_.getBuffer()[trans_.getBufferPosition()];
      trans_.consumeBuffer(1);
      return b;
    }
    readAll(bin, 0, 1);
    return bin[0];
  }
  private byte[] i16rd = new byte[2];
  public short readI16() throws TException {
    byte[] buf = i16rd;
    int off = 0;
    // Decode in place from the transport buffer when it already holds enough
    // bytes; otherwise read into the scratch array.
    if (trans_.getBytesRemainingInBuffer() >= 2) {
      buf = trans_.getBuffer();
      off = trans_.getBufferPosition();
      trans_.consumeBuffer(2);
    } else {
      readAll(i16rd, 0, 2);
    }
    return
      (short)
      (((buf[off] & 0xff) << 8) |
       ((buf[off+1] & 0xff)));
  }
  private byte[] i32rd = new byte[4];
  public int readI32() throws TException {
    byte[] buf = i32rd;
    int off = 0;
    if (trans_.getBytesRemainingInBuffer() >= 4) {
      buf = trans_.getBuffer();
      off = trans_.getBufferPosition();
      trans_.consumeBuffer(4);
    } else {
      readAll(i32rd, 0, 4);
    }
    return
      ((buf[off] & 0xff) << 24) |
      ((buf[off+1] & 0xff) << 16) |
      ((buf[off+2] & 0xff) <<  8) |
      ((buf[off+3] & 0xff));
  }
  private byte[] i64rd = new byte[8];
  public long readI64() throws TException {
    byte[] buf = i64rd;
    int off = 0;
    if (trans_.getBytesRemainingInBuffer() >= 8) {
      buf = trans_.getBuffer();
      off = trans_.getBufferPosition();
      trans_.consumeBuffer(8);
    } else {
      readAll(i64rd, 0, 8);
    }
    return
      ((long)(buf[off]   & 0xff) << 56) |
      ((long)(buf[off+1] & 0xff) << 48) |
      ((long)(buf[off+2] & 0xff) << 40) |
      ((long)(buf[off+3] & 0xff) << 32) |
      ((long)(buf[off+4] & 0xff) << 24) |
      ((long)(buf[off+5] & 0xff) << 16) |
      ((long)(buf[off+6] & 0xff) <<  8) |
      ((long)(buf[off+7] & 0xff));
  }
  public double readDouble() throws TException {
    return Double.longBitsToDouble(readI64());
  }
  /**
   * Reads a length-prefixed UTF-8 string.
   *
   * @throws TException if the length read from the wire is negative or exceeds
   *         the limit configured through setReadLength
   */
  public String readString() throws TException {
    int size = readI32();
    if (trans_.getBytesRemainingInBuffer() >= size) {
      // Validate the wire-supplied length before indexing into the transport
      // buffer: a negative size always satisfies the comparison above and would
      // otherwise surface as an unchecked exception from the String constructor.
      // The slow path below is validated inside readStringBody, so the length is
      // charged against the read limit exactly once on either path.
      checkReadLength(size);
      try {
        String s = new String(trans_.getBuffer(), trans_.getBufferPosition(), size, "UTF-8");
        trans_.consumeBuffer(size);
        return s;
      } catch (UnsupportedEncodingException e) {
        throw new TException("JVM DOES NOT SUPPORT UTF-8");
      }
    }
    return readStringBody(size);
  }
  /** Reads {@code size} bytes from the transport and decodes them as UTF-8. */
  public String readStringBody(int size) throws TException {
    try {
      checkReadLength(size);
      byte[] buf = new byte[size];
      trans_.readAll(buf, 0, size);
      return new String(buf, "UTF-8");
    } catch (UnsupportedEncodingException uex) {
      throw new TException("JVM DOES NOT SUPPORT UTF-8");
    }
  }
  /**
   * Reads a length-prefixed binary value. On the fast path the returned buffer
   * wraps the transport's own byte array rather than a copy.
   */
  public ByteBuffer readBinary() throws TException {
    int size = readI32();
    checkReadLength(size);
    if (trans_.getBytesRemainingInBuffer() >= size) {
      ByteBuffer bb = ByteBuffer.wrap(trans_.getBuffer(), trans_.getBufferPosition(), size);
      trans_.consumeBuffer(size);
      return bb;
    }
    byte[] buf = new byte[size];
    trans_.readAll(buf, 0, size);
    return ByteBuffer.wrap(buf);
  }
  // Length-checked wrapper around the transport's readAll.
  private int readAll(byte[] buf, int off, int len) throws TException {
    checkReadLength(len);
    return trans_.readAll(buf, off, len);
  }
  /** Sets the read budget and turns on enforcement of it. */
  public void setReadLength(int readLength) {
    readLength_ = readLength;
    checkReadLength_ = true;
  }
  /**
   * Rejects negative lengths and, when a limit is active, subtracts
   * {@code length} from the remaining budget.
   *
   * @throws TException if {@code length} is negative or the budget is exhausted
   */
  protected void checkReadLength(int length) throws TException {
    if (length < 0) {
      throw new TException("Negative length: " + length);
    }
    if (checkReadLength_) {
      readLength_ -= length;
      if (readLength_ < 0) {
        throw new TException("Message length exceeded: " + length);
      }
    }
  }
}

 

      其中TMessage表示一條消息(請求或響應),看一下TMessage的結構。

/**
 * Immutable message header: method name, message type and sequence id.
 */
public final class TMessage {
  /** Creates an empty header: empty name, type TType.STOP, sequence id 0. */
  public TMessage() {
    this("", TType.STOP, 0);
  }
  /**
   * @param n method name
   * @param t message type
   * @param s message sequence id
   */
  public TMessage(String n, byte t, int s) {
    name = n;
    type = t;
    seqid = s;
  }
  public final String name;// method name
  public final byte type;// message type
  public final int seqid;// message sequence id
  @Override
  public String toString() {
    return "<TMessage name:'" + name + "' type: " + type + " seqid:" + seqid + ">";
  }
  @Override
  public boolean equals(Object other) {
    if (other instanceof TMessage) {
      return equals((TMessage) other);
    }
    return false;
  }
  /**
   * Field-by-field comparison of name, type and sequence id.
   *
   * @return false when {@code other} is null
   */
  public boolean equals(TMessage other) {
    if (other == null) {
      return false;
    }
    boolean sameName = (name == null) ? other.name == null : name.equals(other.name);
    return sameName && type == other.type && seqid == other.seqid;
  }
  /**
   * Hash over the same three fields that equals compares, so equal headers
   * hash alike and can be used in hash-based collections.
   */
  @Override
  public int hashCode() {
    int hash = (name == null) ? 0 : name.hashCode();
    hash = 31 * hash + type;
    hash = 31 * hash + seqid;
    return hash;
  }
}

      消息類型定義如下:

/**
 * Message type codes carried in the type field of a TMessage.
 */
public final class TMessageType {
  public static final byte CALL  = 1;// client request
  public static final byte REPLY = 2;// server response
  public static final byte EXCEPTION = 3;// server returned an exception
  public static final byte ONEWAY = 4;// one-way RPC: the client does not require a response from the server
}

 

       TField表示一個字段,TField結構如下:

/**
 * Immutable field header: field name, field type and field id.
 */
public class TField {
  /** Creates an empty header: empty name, type TType.STOP, id 0. */
  public TField() {
    this("", TType.STOP, (short)0);
  }
  /**
   * @param n field name
   * @param t field type
   * @param i field id
   */
  public TField(String n, byte t, short i) {
    name = n;
    type = t;
    id = i;
  }
  public final String name;// field name
  public final byte   type;// field type
  public final short  id;// position of the field within its object; matches the number in the Thrift file
  @Override
  public String toString() {
    return "<TField name:'" + name + "' type:" + type + " field-id:" + id + ">";
  }
  /**
   * Overrides Object.equals so that equality no longer depends on the static
   * type of the argument; delegates to {@link #equals(TField)}.
   */
  @Override
  public boolean equals(Object other) {
    return other instanceof TField && equals((TField) other);
  }
  /**
   * Two fields are equal when their type and id match; the name is not
   * compared.
   *
   * @return false when {@code otherField} is null
   */
  public boolean equals(TField otherField) {
    return otherField != null && type == otherField.type && id == otherField.id;
  }
  /** Hash over type and id, the two fields that equals compares. */
  @Override
  public int hashCode() {
    return 31 * type + id;
  }
}

      Thrift定義的數據類型

/**
 * Type codes for the data types defined by Thrift. The values are not
 * contiguous: 5, 7 and 9 are not assigned here.
 */
public final class TType {
  public static final byte STOP   = 0;
  public static final byte VOID   = 1;
  public static final byte BOOL   = 2;
  public static final byte BYTE   = 3;
  public static final byte DOUBLE = 4;
  public static final byte I16    = 6;
  public static final byte I32    = 8;
  public static final byte I64    = 10;
  public static final byte STRING = 11;
  public static final byte STRUCT = 12;
  public static final byte MAP    = 13;
  public static final byte SET    = 14;
  public static final byte LIST   = 15;
  public static final byte ENUM   = 16;
}

 

  TProcessor

    TProcessor是服務端Thrift框架轉入用戶邏輯的關鍵。先看一下類結構圖。TProcessor對TServer中一次請求的InputProtocol和OutputProtocol進行操作,也就是從InputProtocol中讀出Client的請求數據,向OutputProtocol中寫入用戶邏輯的返回值。

    

 

    TProcessor接口:只有一個抽象方法process();

    TBaseProcessor類:實現了TProcessor接口,給出了process()方法的具體實現;

    Processor類:通過thrift文件自動生成的代碼,繼承了TBaseProcessor類,在Processor類內部爲每個方法生成了一個類,如sayHello類;

    TProcessorFactory類:一個工廠類,返回一個TProcessor單例;

    ProcessFunction類:處理函數抽象類。

    HelloService類:由thrift文件自動生成的類,包含內部類Processor類、參數類、返回結果類等等。

    

    TProcessor

/**
 * Server-side entry point for handling one request.
 */
public interface TProcessor {
  /**
   * Handles one request read from {@code in} and writes the response to
   * {@code out}.
   *
   * @param in protocol the request is read from
   * @param out protocol the response is written to
   * @return NOTE(review): meaning of the flag is not defined here; confirm against callers
   * @throws TException on protocol or transport failure
   */
  public boolean process(TProtocol in, TProtocol out)
    throws TException;
}

    TBaseProcessor

/**
 * Dispatches an incoming message to the ProcessFunction registered under the
 * message's method name.
 *
 * @param <I> the service interface implemented by the business logic
 */
public abstract class TBaseProcessor<I> implements TProcessor {
  private final I iface;// business-logic implementation; its methods are the ones defined in the thrift file
  private final Map<String,ProcessFunction<I, ? extends TBase>> processMap;// method name -> handler for that method
  protected TBaseProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) {
    this.iface = iface;
    this.processMap = processFunctionMap;
  }
  /**
   * Reads the message header from {@code in}, looks up the handler by method
   * name and delegates to it. When no handler is registered, the request body
   * is skipped and an UNKNOWN_METHOD exception message is written to
   * {@code out}.
   *
   * @return always true
   */
  @Override
  public boolean process(TProtocol in, TProtocol out) throws TException {
    TMessage msg = in.readMessageBegin();// header of the client's request
    // Parameterized rather than raw, so the process(...) call below stays
    // type-checked against I.
    ProcessFunction<I, ? extends TBase> fn = processMap.get(msg.name);
    if (fn == null) {
      // Unknown method: consume the argument struct so the stream stays
      // aligned, then answer with an exception message.
      TProtocolUtil.skip(in, TType.STRUCT);
      in.readMessageEnd();
      TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
      out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
      x.write(out);
      out.writeMessageEnd();
      out.getTransport().flush();
      return true;
    }
    fn.process(msg.seqid, in, out, iface);// handle the request
    return true;
  }
}

    ProcessFunction

       Thrift將方法抽象爲ProcessFunction類,每個方法都會生成一個ProcessFunction類的子類。

public abstract class ProcessFunction<I, T extends TBase> {
  private final String methodName;//方法名
  public ProcessFunction(String methodName) {
    this.methodName = methodName;
  }
  //處理請求
  public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
    T args = getEmptyArgsInstance();//獲取一個參數實例(TBase子類的參數實例),由子類實現
    try {
      args.read(iprot);//從iprot中讀取參數,具體實現由thrift文件自動生成(見sayHello_args類)
    } catch (TProtocolException e) {
      //讀取參數異常,返回異常
      iprot.readMessageEnd();
      TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage());
      oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.EXCEPTION, seqid));
      x.write(oprot);
      oprot.writeMessageEnd();
      oprot.getTransport().flush();
      return;
    }
    iprot.readMessageEnd();
    TBase result = getResult(iface, args);//獲取處理結果,此時調用業務邏輯,具體實現由thrift文件自動生成(見sayHello類)
    oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.REPLY, seqid));
    result.write(oprot);//將result寫到oprot,具體實現由thrift文件自動生成(見sayHello_result類)
    oprot.writeMessageEnd();
    oprot.getTransport().flush();
  }
  //獲取處理結果,調用業務邏輯,由子類實現
  protected abstract TBase getResult(I iface, T args) throws TException;
  //獲取一個參數實例,由子類實現
  protected abstract T getEmptyArgsInstance();
  public String getMethodName() {
    return methodName;
  }
}

 

     Processor

  /**
   * Processor generated from the thrift file: registers one ProcessFunction
   * per service method (here only sayHello) with TBaseProcessor.
   */
  public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName());
    public Processor(I iface) {
      super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
    }
    protected Processor(I iface, Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
      super(iface, getProcessMap(processMap));
    }
    // Populates processMap; called from both constructors while the Processor is being initialised
    private static <I extends Iface> Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> getProcessMap(Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
      processMap.put("sayHello", new sayHello());
      return processMap;
    }
    // One class is generated per service method
    private static class sayHello<I extends Iface> extends org.apache.thrift.ProcessFunction<I, sayHello_args> {
      public sayHello() {
        super("sayHello");
      }
      // Returns an empty argument instance
      protected sayHello_args getEmptyArgsInstance() {
        return new sayHello_args();
      }
      // Produces the return value
      protected sayHello_result getResult(I iface, sayHello_args args) throws org.apache.thrift.TException {
        sayHello_result result = new sayHello_result();
        result.success = iface.sayHello(args.paramJson);// calls sayHello on the business-logic interface
        return result;
      }
    }
  }

   

  最後總結一下TProcessor的處理流程:

    1)TServer接收到請求後,調用TProcessor的process(TProtocol in, TProtocol out)方法進行處理;

    2)TProcessor通過in.readMessageBegin()獲取客戶端請求,並根據請求方法名找到對應的ProcessFunction實例;

    3)調用ProcessFunction的process方法;

      首先從輸入TProtocol(in)中讀取參數,

      然後通過調用getResult(iface, args)方法調用業務邏輯,獲取到返回結果,

      最後將返回結果寫入到輸出TProtocol(out)。

    

    

 

參考資料

  Apache Thrift設計概要

  Thrift源碼分析(二)-- 協議和編解碼

  Thrift RPC詳解

相關文章
相關標籤/搜索