TProtocol定義了消息如何進行序列化和反序列化。
TProtocol的類結構圖如下:
TBinaryProtocol:二進制編碼格式;
TCompactProtocol:高效率,密集的二進制編碼格式,使用了zigzag壓縮算法,使用了類似於ProtocolBuffer的Variable-Length Quantity (VLQ) 編碼方式;
TTupleProtocol:繼承於TCompactProtocol,C++中不支持,Java中支持;
TJSONProtocol:JSON格式;
TProtocolFactory接口:只有getProtocol一個方法,用於通過一個TTransport對象創建一個TProtocol對象;TProtocolFactory接口的實現類都在TProtocol子類中。
TProtocol是所有消息協議的父類,定義了公用的抽象方法。
/**
 * Abstract base class for message protocols. It holds the transport and declares the
 * write/read primitives that concrete protocols implement.
 */
public abstract class TProtocol {

  /** Not usable: a protocol must be created with a transport. */
  @SuppressWarnings("unused")
  private TProtocol() {}

  /** Transport this protocol operates on. */
  protected TTransport trans_;

  /**
   * @param trans transport to store in {@link #trans_}
   */
  protected TProtocol(TTransport trans) {
    trans_ = trans;
  }

  /**
   * @return the transport passed to the constructor
   */
  public TTransport getTransport() {
    return trans_;
  }

  /*
   * Writing methods.
   */

  /** Called when sending a request; {@code TMessage} represents one request. */
  public abstract void writeMessageBegin(TMessage message) throws TException;

  public abstract void writeMessageEnd() throws TException;

  /** Called when writing an object; {@code TStruct} represents one object. */
  public abstract void writeStructBegin(TStruct struct) throws TException;

  public abstract void writeStructEnd() throws TException;

  /** Called when writing a field; {@code TField} represents one field. */
  public abstract void writeFieldBegin(TField field) throws TException;

  public abstract void writeFieldEnd() throws TException;

  /** Writes the marker that ends a sequence of fields. */
  public abstract void writeFieldStop() throws TException;

  /** Called when writing a {@code TMap}. */
  public abstract void writeMapBegin(TMap map) throws TException;

  public abstract void writeMapEnd() throws TException;

  /** Called when writing a {@code TList}. */
  public abstract void writeListBegin(TList list) throws TException;

  public abstract void writeListEnd() throws TException;

  /** Called when writing a {@code TSet}. */
  public abstract void writeSetBegin(TSet set) throws TException;

  public abstract void writeSetEnd() throws TException;

  // Writers for String and the primitive types.

  public abstract void writeBool(boolean b) throws TException;

  public abstract void writeByte(byte b) throws TException;

  public abstract void writeI16(short i16) throws TException;

  public abstract void writeI32(int i32) throws TException;

  public abstract void writeI64(long i64) throws TException;

  public abstract void writeDouble(double dub) throws TException;

  public abstract void writeString(String str) throws TException;

  /** Writes out the data held in {@code buf}. */
  public abstract void writeBinary(ByteBuffer buf) throws TException;

  /*
   * Reading methods.
   */

  public abstract TMessage readMessageBegin() throws TException;

  public abstract void readMessageEnd() throws TException;

  public abstract TStruct readStructBegin() throws TException;

  public abstract void readStructEnd() throws TException;

  public abstract TField readFieldBegin() throws TException;

  public abstract void readFieldEnd() throws TException;

  public abstract TMap readMapBegin() throws TException;

  public abstract void readMapEnd() throws TException;

  public abstract TList readListBegin() throws TException;

  public abstract void readListEnd() throws TException;

  public abstract TSet readSetBegin() throws TException;

  public abstract void readSetEnd() throws TException;

  public abstract boolean readBool() throws TException;

  public abstract byte readByte() throws TException;

  public abstract short readI16() throws TException;

  public abstract int readI32() throws TException;

  public abstract long readI64() throws TException;

  public abstract double readDouble() throws TException;

  public abstract String readString() throws TException;

  public abstract ByteBuffer readBinary() throws TException;

  /** Does nothing in this base class; subclasses may override. */
  public void reset() {}

  /**
   * @return {@code StandardScheme.class} in this base class
   */
  public Class<? extends IScheme> getScheme() {
    return StandardScheme.class;
  }
}
TBinaryProtocol是二進制編碼。
public class TBinaryProtocol extends TProtocol { private static final TStruct ANONYMOUS_STRUCT = new TStruct(); //版本號 protected static final int VERSION_MASK = 0xffff0000;//掩碼 protected static final int VERSION_1 = 0x80010000;//版本號 protected boolean strictRead_ = false;//是否嚴格讀 protected boolean strictWrite_ = true;//是否嚴格寫 protected int readLength_; protected boolean checkReadLength_ = false; //工廠類 public static class Factory implements TProtocolFactory { protected boolean strictRead_ = false; protected boolean strictWrite_ = true; protected int readLength_; public Factory() { this(false, true); } public Factory(boolean strictRead, boolean strictWrite) { this(strictRead, strictWrite, 0); } public Factory(boolean strictRead, boolean strictWrite, int readLength) { strictRead_ = strictRead; strictWrite_ = strictWrite; readLength_ = readLength; } //經過TTransport實例獲取一個TBinaryProtocol實例 public TProtocol getProtocol(TTransport trans) { TBinaryProtocol proto = new TBinaryProtocol(trans, strictRead_, strictWrite_); if (readLength_ != 0) { proto.setReadLength(readLength_); } return proto; } } public TBinaryProtocol(TTransport trans) { this(trans, false, true); } public TBinaryProtocol(TTransport trans, boolean strictRead, boolean strictWrite) { super(trans); strictRead_ = strictRead; strictWrite_ = strictWrite; } /** * 寫方法 */ //發送一個請求,最終轉換爲對基本數據類型的寫 public void writeMessageBegin(TMessage message) throws TException { if (strictWrite_) {// int version = VERSION_1 | message.type;//版本號和消息類型與運算 writeI32(version);//調用writeI32寫版本號 writeString(message.name);//調用writeString寫方法名 writeI32(message.seqid);//調用writeI32寫序列號 } else { writeString(message.name); writeByte(message.type); writeI32(message.seqid); } } public void writeMessageEnd() {} public void writeStructBegin(TStruct struct) {} public void writeStructEnd() {} //寫字段方法,最終轉換爲對基本數據類型的寫 public void writeFieldBegin(TField field) throws TException { writeByte(field.type); writeI16(field.id); } public void writeFieldEnd() {} public 
void writeFieldStop() throws TException { writeByte(TType.STOP); } //寫Map public void writeMapBegin(TMap map) throws TException { writeByte(map.keyType);//寫Key類型 writeByte(map.valueType);//寫value類型 writeI32(map.size);//寫map大小 } public void writeMapEnd() {} //寫List public void writeListBegin(TList list) throws TException { writeByte(list.elemType);//寫元素類型 writeI32(list.size);//寫list大小 } public void writeListEnd() {} //寫Set public void writeSetBegin(TSet set) throws TException { writeByte(set.elemType);//寫元素類型 writeI32(set.size);//寫Set大小 } public void writeSetEnd() {} //寫bool轉換爲寫writeByte public void writeBool(boolean b) throws TException { writeByte(b ? (byte)1 : (byte)0); } private byte [] bout = new byte[1]; public void writeByte(byte b) throws TException { bout[0] = b; trans_.write(bout, 0, 1); } private byte[] i16out = new byte[2]; public void writeI16(short i16) throws TException { i16out[0] = (byte)(0xff & (i16 >> 8)); i16out[1] = (byte)(0xff & (i16)); trans_.write(i16out, 0, 2); } private byte[] i32out = new byte[4]; public void writeI32(int i32) throws TException { i32out[0] = (byte)(0xff & (i32 >> 24)); i32out[1] = (byte)(0xff & (i32 >> 16)); i32out[2] = (byte)(0xff & (i32 >> 8)); i32out[3] = (byte)(0xff & (i32)); trans_.write(i32out, 0, 4); } private byte[] i64out = new byte[8]; public void writeI64(long i64) throws TException { i64out[0] = (byte)(0xff & (i64 >> 56)); i64out[1] = (byte)(0xff & (i64 >> 48)); i64out[2] = (byte)(0xff & (i64 >> 40)); i64out[3] = (byte)(0xff & (i64 >> 32)); i64out[4] = (byte)(0xff & (i64 >> 24)); i64out[5] = (byte)(0xff & (i64 >> 16)); i64out[6] = (byte)(0xff & (i64 >> 8)); i64out[7] = (byte)(0xff & (i64)); trans_.write(i64out, 0, 8); } //寫Double轉換爲writeI64 public void writeDouble(double dub) throws TException { writeI64(Double.doubleToLongBits(dub)); } //寫String public void writeString(String str) throws TException { try { byte[] dat = str.getBytes("UTF-8");//轉換爲字節數組 writeI32(dat.length);//寫數組長度 trans_.write(dat, 0, 
dat.length);//寫數據 } catch (UnsupportedEncodingException uex) { throw new TException("JVM DOES NOT SUPPORT UTF-8"); } } //寫ByteBuffer public void writeBinary(ByteBuffer bin) throws TException { int length = bin.limit() - bin.position(); writeI32(length); trans_.write(bin.array(), bin.position() + bin.arrayOffset(), length); } /** * Reading methods.讀方法 */ //讀一個請求,與寫請求對應 public TMessage readMessageBegin() throws TException { int size = readI32();//讀取消息的頭部(4字節),多是版本號和消息類型的組合,也可能直接是消息方法名 if (size < 0) { //若是小於0,就是二進制爲第一位以1開頭,說明是帶有版本號的 //校驗版本號是否正確 int version = size & VERSION_MASK; if (version != VERSION_1) { throw new TProtocolException(TProtocolException.BAD_VERSION, "Bad version in readMessageBegin"); } //三個參數依次爲方法名、消息類型、消息序列號 return new TMessage(readString(), (byte)(size & 0x000000ff), readI32()); } else { if (strictRead_) { throw new TProtocolException(TProtocolException.BAD_VERSION, "Missing version in readMessageBegin, old client?"); } //readStringBody(size)爲方法名,readByte()爲消息類型,readI32()爲消息序列號 return new TMessage(readStringBody(size), readByte(), readI32()); } } public void readMessageEnd() {} public TStruct readStructBegin() { return ANONYMOUS_STRUCT; } public void readStructEnd() {} public TField readFieldBegin() throws TException { byte type = readByte(); short id = type == TType.STOP ? 
0 : readI16(); return new TField("", type, id); } public void readFieldEnd() {} public TMap readMapBegin() throws TException { return new TMap(readByte(), readByte(), readI32()); } public void readMapEnd() {} public TList readListBegin() throws TException { return new TList(readByte(), readI32()); } public void readListEnd() {} public TSet readSetBegin() throws TException { return new TSet(readByte(), readI32()); } public void readSetEnd() {} public boolean readBool() throws TException { return (readByte() == 1); } private byte[] bin = new byte[1]; public byte readByte() throws TException { if (trans_.getBytesRemainingInBuffer() >= 1) { byte b = trans_.getBuffer()[trans_.getBufferPosition()]; trans_.consumeBuffer(1); return b; } readAll(bin, 0, 1); return bin[0]; } private byte[] i16rd = new byte[2]; public short readI16() throws TException { byte[] buf = i16rd; int off = 0; if (trans_.getBytesRemainingInBuffer() >= 2) { buf = trans_.getBuffer(); off = trans_.getBufferPosition(); trans_.consumeBuffer(2); } else { readAll(i16rd, 0, 2); } return (short) (((buf[off] & 0xff) << 8) | ((buf[off+1] & 0xff))); } private byte[] i32rd = new byte[4]; public int readI32() throws TException { byte[] buf = i32rd; int off = 0; if (trans_.getBytesRemainingInBuffer() >= 4) { buf = trans_.getBuffer(); off = trans_.getBufferPosition(); trans_.consumeBuffer(4); } else { readAll(i32rd, 0, 4); } return ((buf[off] & 0xff) << 24) | ((buf[off+1] & 0xff) << 16) | ((buf[off+2] & 0xff) << 8) | ((buf[off+3] & 0xff)); } private byte[] i64rd = new byte[8]; public long readI64() throws TException { byte[] buf = i64rd; int off = 0; if (trans_.getBytesRemainingInBuffer() >= 8) { buf = trans_.getBuffer(); off = trans_.getBufferPosition(); trans_.consumeBuffer(8); } else { readAll(i64rd, 0, 8); } return ((long)(buf[off] & 0xff) << 56) | ((long)(buf[off+1] & 0xff) << 48) | ((long)(buf[off+2] & 0xff) << 40) | ((long)(buf[off+3] & 0xff) << 32) | ((long)(buf[off+4] & 0xff) << 24) | ((long)(buf[off+5] & 
0xff) << 16) | ((long)(buf[off+6] & 0xff) << 8) | ((long)(buf[off+7] & 0xff)); } public double readDouble() throws TException { return Double.longBitsToDouble(readI64()); } public String readString() throws TException { int size = readI32(); if (trans_.getBytesRemainingInBuffer() >= size) { try { String s = new String(trans_.getBuffer(), trans_.getBufferPosition(), size, "UTF-8"); trans_.consumeBuffer(size); return s; } catch (UnsupportedEncodingException e) { throw new TException("JVM DOES NOT SUPPORT UTF-8"); } } return readStringBody(size); } public String readStringBody(int size) throws TException { try { checkReadLength(size); byte[] buf = new byte[size]; trans_.readAll(buf, 0, size); return new String(buf, "UTF-8"); } catch (UnsupportedEncodingException uex) { throw new TException("JVM DOES NOT SUPPORT UTF-8"); } } public ByteBuffer readBinary() throws TException { int size = readI32(); checkReadLength(size); if (trans_.getBytesRemainingInBuffer() >= size) { ByteBuffer bb = ByteBuffer.wrap(trans_.getBuffer(), trans_.getBufferPosition(), size); trans_.consumeBuffer(size); return bb; } byte[] buf = new byte[size]; trans_.readAll(buf, 0, size); return ByteBuffer.wrap(buf); } private int readAll(byte[] buf, int off, int len) throws TException { checkReadLength(len); return trans_.readAll(buf, off, len); } public void setReadLength(int readLength) { readLength_ = readLength; checkReadLength_ = true; } protected void checkReadLength(int length) throws TException { if (length < 0) { throw new TException("Negative length: " + length); } if (checkReadLength_) { readLength_ -= length; if (readLength_ < 0) { throw new TException("Message length exceeded: " + length); } } } }
其中TMessage表示一個請求,看一下TMessage的結構。
public final class TMessage { public TMessage() { this("", TType.STOP, 0); } public TMessage(String n, byte t, int s) { name = n; type = t; seqid = s; } public final String name;//方法名 public final byte type;//消息類型 public final int seqid;//消息序列號 @Override public String toString() { return "<TMessage name:'" + name + "' type: " + type + " seqid:" + seqid + ">"; } @Override public boolean equals(Object other) { if (other instanceof TMessage) { return equals((TMessage) other); } return false; } public boolean equals(TMessage other) { return name.equals(other.name) && type == other.type && seqid == other.seqid; } }
消息類型定義如下:
/**
 * Message type constants.
 */
public final class TMessageType {
  /** Client request. */
  public static final byte CALL = 1;
  /** Server response. */
  public static final byte REPLY = 2;
  /** Server returned an exception. */
  public static final byte EXCEPTION = 3;
  /** One-way RPC: the client request does not require a server response. */
  public static final byte ONEWAY = 4;
}
TField表示一個字段,TField結構如下:
public class TField { public TField() { this("", TType.STOP, (short)0); } public TField(String n, byte t, short i) { name = n; type = t; id = i; } public final String name;//字段名 public final byte type;//字段類型 public final short id;//該字段在對象中的序號,與Thrift文件中的序號一致 public String toString() { return "<TField name:'" + name + "' type:" + type + " field-id:" + id + ">"; } public boolean equals(TField otherField) { return type == otherField.type && id == otherField.id; } }
Thrift定義的數據類型
/**
 * Type codes for the data types defined by Thrift.
 */
public final class TType {
  // Markers.
  public static final byte STOP = 0;
  public static final byte VOID = 1;

  // Scalar types.
  public static final byte BOOL   = 2;
  public static final byte BYTE   = 3;
  public static final byte DOUBLE = 4;
  public static final byte I16    = 6;
  public static final byte I32    = 8;
  public static final byte I64    = 10;
  public static final byte STRING = 11;

  // Structured and container types.
  public static final byte STRUCT = 12;
  public static final byte MAP    = 13;
  public static final byte SET    = 14;
  public static final byte LIST   = 15;

  public static final byte ENUM   = 16;
}
TProcessor是服務端Thrift框架轉入用戶邏輯的關鍵。先看一下類結構圖。TProcessor對TServer中一次請求的InputProtocol和OutputProtocol進行操作,也就是從InputProtocol中讀出Client的請求數據,向OutputProtocol中寫入用戶邏輯的返回值。
TProcessor接口:只有一個抽象方法process();
TBaseProcessor類:實現了TProcessor接口,給出了process()方法的具體實現;
Processor類:通過thrift文件自動生成的代碼,繼承了TBaseProcessor類,在Processor類內部爲每一個方法生成了一個類,如sayHello類;
TProcessorFactory類:一個工廠類,返回一個TProcessor單例;
ProcessFunction類:處理函數抽象類。
HelloService類:由thrift文件自動生成的類,包含內部類Processor類、參數類、返回結果類等等。
/**
 * Handles one request: input is taken from {@code in}, output goes to {@code out}.
 */
public interface TProcessor {

  /**
   * @param in protocol to read the request from
   * @param out protocol to write the response to
   * @throws TException declared by implementations for processing failures
   */
  boolean process(TProtocol in, TProtocol out) throws TException;
}
public abstract class TBaseProcessor<I> implements TProcessor { private final I iface;//業務邏輯實現的接口,接口中的方法即thrift文件中定義的方法 private final Map<String,ProcessFunction<I, ? extends TBase>> processMap;//處理方法Map,key爲方法名,Value爲方法對象 protected TBaseProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) { this.iface = iface; this.processMap = processFunctionMap; } @Override public boolean process(TProtocol in, TProtocol out) throws TException { TMessage msg = in.readMessageBegin();//讀出客戶端發的請求 ProcessFunction fn = processMap.get(msg.name);//根據方法名從processMap中找處處理方法 if (fn == null) { //若是找不到該方法,返回異常 TProtocolUtil.skip(in, TType.STRUCT); in.readMessageEnd(); TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'"); out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid)); x.write(out); out.writeMessageEnd(); out.getTransport().flush(); return true; } fn.process(msg.seqid, in, out, iface);//處理請求 return true; } }
Thrift將方法抽象爲ProcessFunction類,每個方法都會生成一個ProcessFunction類的子類。
public abstract class ProcessFunction<I, T extends TBase> { private final String methodName;//方法名 public ProcessFunction(String methodName) { this.methodName = methodName; } //處理請求 public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException { T args = getEmptyArgsInstance();//獲取一個參數實例(TBase子類的參數實例),由子類實現 try { args.read(iprot);//從iprot中讀取參數,具體實現由thrift文件自動生成(見sayHello_args類) } catch (TProtocolException e) { //讀取參數異常,返回異常 iprot.readMessageEnd(); TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage()); oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.EXCEPTION, seqid)); x.write(oprot); oprot.writeMessageEnd(); oprot.getTransport().flush(); return; } iprot.readMessageEnd(); TBase result = getResult(iface, args);//獲取處理結果,此時調用業務邏輯,具體實現由thrift文件自動生成(見sayHello類) oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.REPLY, seqid)); result.write(oprot);//將result寫到oprot,具體實現由thrift文件自動生成(見sayHello_result類) oprot.writeMessageEnd(); oprot.getTransport().flush(); } //獲取處理結果,調用業務邏輯,由子類實現 protected abstract TBase getResult(I iface, T args) throws TException; //獲取一個參數實例,由子類實現 protected abstract T getEmptyArgsInstance(); public String getMethodName() { return methodName; } }
public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName()); public Processor(I iface) { super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>())); } protected Processor(I iface, Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) { super(iface, getProcessMap(processMap)); } //初始化processMap,在Processor初始化時會調用該方法 private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) { processMap.put("sayHello", new sayHello()); return processMap; } //每一個方法生成一個類 private static class sayHello<I extends Iface> extends org.apache.thrift.ProcessFunction<I, sayHello_args> { public sayHello() { super("sayHello"); } //獲取空參數實例 protected sayHello_args getEmptyArgsInstance() { return new sayHello_args(); } //獲取返回結果 protected sayHello_result getResult(I iface, sayHello_args args) throws org.apache.thrift.TException { sayHello_result result = new sayHello_result(); result.success = iface.sayHello(args.paramJson);//調用業務邏輯接口的sayHello方法, return result; } } }
1)TServer接收到請求後,調用TProcessor的process(TProtocol in, TProtocol out)方法進行處理;
2)TProcessor通過in.readMessageBegin()獲取客戶端請求,並根據請求方法名找到對應的ProcessFunction實例;
3)調用ProcessFunction的process方法;
首先從輸入TProtocol(in)中讀取參數,
然後通過調用getResult(iface, args)方法調用業務邏輯,獲取到返回結果,
最後將返回結果寫入到輸出TProtocol(out)。