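Goal: introduce the design and implementation of the thrift protocol in Dubbo, and walk through the source code of dubbo-rpc-thrift.
Dubbo's thrift protocol integration is built on Thrift. Thrift is a lightweight, language-independent software stack with an associated code generation mechanism for point-to-point RPC. It provides clean abstractions for data transport, data serialization, and application-level processing. The code generation system takes a simple definition language as input and generates code across programming languages that uses the abstracted stack to build interoperable RPC clients and servers.
This class (MultiServiceProcessor in the source) reads from an input protocol and writes to an output protocol. It implements the TProcessor interface, and its key method is process.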
    @Override
    public boolean process(TProtocol in, TProtocol out) throws TException {

        // Read the 16-bit magic number
        short magic = in.readI16();

        // If it is not the expected magic number, log an error and return false
        if (magic != ThriftCodec.MAGIC) {
            logger.error("Unsupported magic " + magic);
            return false;
        }

        // Read the 32-bit message length (the value is discarded)
        in.readI32();
        // Read the 16-bit header length (the value is discarded)
        in.readI16();
        // Read the version
        byte version = in.readByte();
        // Read the service name
        String serviceName = in.readString();
        // Read the request id
        long id = in.readI64();

        ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
        // Create a TIOStreamTransport over the body buffer
        TIOStreamTransport transport = new TIOStreamTransport(bos);
        // Get the protocol
        TProtocol protocol = protocolFactory.getProtocol(transport);
        // Look up the processor registered for this service
        TProcessor processor = processorMap.get(serviceName);
        // If there is no processor, log an error and return false
        if (processor == null) {
            logger.error("Could not find processor for service " + serviceName);
            return false;
        }

        // todo if exception
        // Run the processor; the response body is written into bos
        boolean result = processor.process(in, protocol);

        ByteArrayOutputStream header = new ByteArrayOutputStream(512);
        // Transport for the message header
        TIOStreamTransport headerTransport = new TIOStreamTransport(header);
        TProtocol headerProtocol = protocolFactory.getProtocol(headerTransport);

        // Write the 16-bit magic number
        headerProtocol.writeI16(magic);
        // Write Integer.MAX_VALUE as the 32-bit message length placeholder
        headerProtocol.writeI32(Integer.MAX_VALUE);
        // Write Short.MAX_VALUE as the 16-bit header length placeholder
        headerProtocol.writeI16(Short.MAX_VALUE);
        // Write the version
        headerProtocol.writeByte(version);
        // Write the service name
        headerProtocol.writeString(serviceName);
        // Write the request id
        headerProtocol.writeI64(id);
        // Flush
        headerProtocol.getTransport().flush();

        // Write the real header, now that both sizes are known, followed by the body
        out.writeI16(magic);
        out.writeI32(bos.size() + header.size());
        out.writeI16((short) (0xffff & header.size()));
        out.writeByte(version);
        out.writeString(serviceName);
        out.writeI64(id);

        out.getTransport().write(bos.toByteArray());
        out.getTransport().flush();

        return result;
    }
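This class (RandomAccessByteArrayOutputStream) is a byte-array output stream whose write position can be moved. It is fairly simple, so I won't go into detail. Interested readers can read the source directly, and skipping it does no harm.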
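To make the idea concrete, here is a minimal sketch of a byte stream with a movable write position. `PatchableByteStream` and all of its members are hypothetical names for illustration. It is not the Dubbo class, and its `size()` bookkeeping is an assumption that may differ from the original.

```java
import java.util.Arrays;

/** Hypothetical stand-in for a byte stream whose write position can be moved. */
final class PatchableByteStream {
    private byte[] buf;
    private int writeIndex; // where the next byte will be written
    private int size;       // number of valid bytes in the stream

    PatchableByteStream(int capacity) {
        buf = new byte[Math.max(capacity, 16)];
    }

    /** Writes the low 8 bits of b at the current write index. */
    void write(int b) {
        if (writeIndex == buf.length) {
            buf = Arrays.copyOf(buf, buf.length * 2);
        }
        buf[writeIndex++] = (byte) b;
        size = Math.max(size, writeIndex);
    }

    /** Writes a big-endian 32-bit integer at the current write index. */
    void writeInt(int v) {
        write(v >>> 24);
        write(v >>> 16);
        write(v >>> 8);
        write(v);
    }

    /** Moves the write position so that earlier bytes can be overwritten. */
    void setWriteIndex(int index) {
        if (index < 0 || index > size) {
            throw new IndexOutOfBoundsException("index: " + index);
        }
        writeIndex = index;
    }

    int size() {
        return size;
    }

    byte[] toByteArray() {
        return Arrays.copyOf(buf, size);
    }
}
```

A caller can write a placeholder, append the payload, jump back to overwrite the placeholder with the real length, and then restore the write position. This is the pattern the codec relies on when it fills in the message and header lengths.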
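    @SPI(DubboClassNameGenerator.NAME)
    public interface ClassNameGenerator {

        /**
         * Generate the class name of the method arguments
         */
        public String generateArgsClassName(String serviceName, String methodName);

        /**
         * Generate the class name of the method result
         * @param serviceName
         * @param methodName
         * @return
         */
        public String generateResultClassName(String serviceName, String methodName);

    }
This is an extensible (SPI) interface that defines two methods. It has two implementations, described next.
DubboClassNameGenerator implements the ClassNameGenerator interface and is the Dubbo-flavored class name generator.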
    public class DubboClassNameGenerator implements ClassNameGenerator {

        public static final String NAME = "dubbo";

        @Override
        public String generateArgsClassName(String serviceName, String methodName) {
            return ThriftUtils.generateMethodArgsClassName(serviceName, methodName);
        }

        @Override
        public String generateResultClassName(String serviceName, String methodName) {
            return ThriftUtils.generateMethodResultClassName(serviceName, methodName);
        }

    }
ThriftClassNameGenerator implements the ClassNameGenerator interface and is the Thrift-flavored class name generator.
    public class ThriftClassNameGenerator implements ClassNameGenerator {

        public static final String NAME = "thrift";

        @Override
        public String generateArgsClassName(String serviceName, String methodName) {
            return ThriftUtils.generateMethodArgsClassNameThrift(serviceName, methodName);
        }

        @Override
        public String generateResultClassName(String serviceName, String methodName) {
            return ThriftUtils.generateMethodResultClassNameThrift(serviceName, methodName);
        }

    }
Both implementations delegate to methods in ThriftUtils.
The methods in ThriftUtils are simple, mostly string concatenation. Interested readers can follow the link to my annotated source at the end of this article.
ThriftCodec is the codec implemented on top of Thrift. Note the class-level comment, which describes the wire format of the protocol:
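As an illustration of the kind of string concatenation involved, the sketch below derives the names of the nested `<method>_args` and `<method>_result` classes that the Thrift compiler generates for Java services. `ThriftNaming` and its methods are hypothetical names, and the assumption that the service name looks like `pkg.Service$Iface` is mine. The actual ThriftUtils logic may differ.

```java
/** Hypothetical helper; not the ThriftUtils implementation. */
final class ThriftNaming {
    private ThriftNaming() {
    }

    /** Returns the outer class prefix including '$', or null if the name has no '$'. */
    private static String outerPrefix(String ifaceName) {
        int i = ifaceName.indexOf('$');
        return i > 0 ? ifaceName.substring(0, i + 1) : null;
    }

    /** e.g. demo.Greeter$Iface + hello -> demo.Greeter$hello_args */
    static String argsClassName(String ifaceName, String method) {
        String prefix = outerPrefix(ifaceName);
        return prefix == null ? null : prefix + method + "_args";
    }

    /** e.g. demo.Greeter$Iface + hello -> demo.Greeter$hello_result */
    static String resultClassName(String ifaceName, String method) {
        String prefix = outerPrefix(ifaceName);
        return prefix == null ? null : prefix + method + "_result";
    }
}
```

Returning null when the name cannot be derived matches how the codec treats an empty generated name as a serialization error.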
     * |<-                                  message header                                  ->|<- message body ->|
     * +----------------+----------------------+------------------+---------------------------+------------------+
     * | magic (2 bytes)|message size (4 bytes)|head size(2 bytes)| version (1 byte) | header |   message body   |
     * +----------------+----------------------+------------------+---------------------------+------------------+
     * |<-
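The layout can be sketched with a plain `ByteBuffer`. This assumes the header fields are big-endian and that the service name is written as a 4-byte length followed by UTF-8 bytes, which is how Thrift's binary protocol encodes integers and strings. `HeaderLayoutSketch` and its `header` method are hypothetical names, not part of Dubbo.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of the message header layout described above. */
final class HeaderLayoutSketch {
    static final short MAGIC = (short) 0xdabc;
    static final byte VERSION = 1;

    private HeaderLayoutSketch() {
    }

    static byte[] header(String serviceName, long requestId, int messageSize) {
        byte[] name = serviceName.getBytes(StandardCharsets.UTF_8);
        // magic(2) + message size(4) + header size(2) + version(1)
        // + name length(4) + name bytes + request id(8)
        int headerSize = 2 + 4 + 2 + 1 + 4 + name.length + 8;
        ByteBuffer buf = ByteBuffer.allocate(headerSize); // big-endian by default
        buf.putShort(MAGIC);
        buf.putInt(messageSize);
        buf.putShort((short) (0xffff & headerSize));
        buf.put(VERSION);
        buf.putInt(name.length);
        buf.put(name);
        buf.putLong(requestId);
        return buf.array();
    }
}
```

With this layout the message size sits at byte offset 2 and the header size at offset 6, and the fixed-width prefix occupies 9 bytes.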
    /**
     * Index of the message length
     */
    public static final int MESSAGE_LENGTH_INDEX = 2;

    /**
     * Index of the message header length
     */
    public static final int MESSAGE_HEADER_LENGTH_INDEX = 6;

    /**
     * Shortest possible message length
     */
    public static final int MESSAGE_SHORTEST_LENGTH = 10;

    public static final String NAME = "thrift";

    /**
     * Parameter key for the class name generator
     */
    public static final String PARAMETER_CLASS_NAME_GENERATOR = "class.name.generator";

    /**
     * Version
     */
    public static final byte VERSION = (byte) 1;

    /**
     * Magic number
     */
    public static final short MAGIC = (short) 0xdabc;

    /**
     * Cached request data, keyed by request id
     */
    static final ConcurrentMap<Long, RequestData> cachedRequest =
            new ConcurrentHashMap<Long, RequestData>();

    /**
     * Thrift sequence id
     */
    private static final AtomicInteger THRIFT_SEQ_ID = new AtomicInteger(0);

    /**
     * Class cache
     */
    private static final ConcurrentMap<String, Class<?>> cachedClass =
            new ConcurrentHashMap<String, Class<?>>();

    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object message)
            throws IOException {

        if (message instanceof Request) {
            // Encode a Request message
            encodeRequest(channel, buffer, (Request) message);
        } else if (message instanceof Response) {
            // Encode a Response message
            encodeResponse(channel, buffer, (Response) message);
        } else {
            throw new UnsupportedOperationException("Thrift codec only support encode "
                    + Request.class.getName() + " and " + Response.class.getName());
        }

    }
This method is the encoding entry point. It dispatches to a different method depending on whether the message is a Request or a Response.
    private void encodeRequest(Channel channel, ChannelBuffer buffer, Request request)
            throws IOException {

        // Get the invocation
        RpcInvocation inv = (RpcInvocation) request.getData();

        // Get the next sequence id
        int seqId = nextSeqId();

        // Get the service name
        String serviceName = inv.getAttachment(Constants.INTERFACE_KEY);

        // If it is empty, throw an exception
        if (StringUtils.isEmpty(serviceName)) {
            throw new IllegalArgumentException("Could not find service name in attachment with key "
                    + Constants.INTERFACE_KEY);
        }

        // Create the TMessage
        TMessage message = new TMessage(
                inv.getMethodName(),
                TMessageType.CALL,
                seqId);

        // Get the class name of the method arguments
        String methodArgs = ExtensionLoader.getExtensionLoader(ClassNameGenerator.class)
                .getExtension(channel.getUrl().getParameter(ThriftConstants.CLASS_NAME_GENERATOR_KEY, ThriftClassNameGenerator.NAME))
                .generateArgsClassName(serviceName, inv.getMethodName());

        // If it is empty, throw an exception
        if (StringUtils.isEmpty(methodArgs)) {
            throw new RpcException(RpcException.SERIALIZATION_EXCEPTION,
                    "Could not encode request, the specified interface may be incorrect.");
        }

        // Look up the class in the cache
        Class<?> clazz = cachedClass.get(methodArgs);

        if (clazz == null) {

            try {

                // Load the class
                clazz = ClassHelper.forNameWithThreadContextClassLoader(methodArgs);

                // Add it to the cache
                cachedClass.putIfAbsent(methodArgs, clazz);

            } catch (ClassNotFoundException e) {
                throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
            }

        }

        // TBase is the common base interface of generated Thrift objects
        TBase args;

        try {
            args = (TBase) clazz.newInstance();
        } catch (InstantiationException e) {
            throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
        } catch (IllegalAccessException e) {
            throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
        }

        // Iterate over the arguments
        for (int i = 0; i < inv.getArguments().length; i++) {

            Object obj = inv.getArguments()[i];

            if (obj == null) {
                continue;
            }

            TFieldIdEnum field = args.fieldForId(i + 1);

            // Build the setter name
            String setMethodName = ThriftUtils.generateSetMethodName(field.getFieldName());

            Method method;

            try {
                // Look up the setter
                method = clazz.getMethod(setMethodName, inv.getParameterTypes()[i]);
            } catch (NoSuchMethodException e) {
                throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
            }

            try {
                // Invoke the setter reflectively to store the argument
                method.invoke(args, obj);
            } catch (IllegalAccessException e) {
                throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
            } catch (InvocationTargetException e) {
                throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
            }

        }

        // Create a random-access byte array output stream
        RandomAccessByteArrayOutputStream bos = new RandomAccessByteArrayOutputStream(1024);

        // Create the transport
        TIOStreamTransport transport = new TIOStreamTransport(bos);

        // Create the protocol
        TBinaryProtocol protocol = new TBinaryProtocol(transport);

        int headerLength, messageLength;

        byte[] bytes = new byte[4];
        try {
            // Start encoding
            // magic
            protocol.writeI16(MAGIC);
            // message length placeholder
            protocol.writeI32(Integer.MAX_VALUE);
            // message header length placeholder
            protocol.writeI16(Short.MAX_VALUE);
            // version
            protocol.writeByte(VERSION);
            // service name
            protocol.writeString(serviceName);
            // dubbo request id
            protocol.writeI64(request.getId());
            protocol.getTransport().flush();
            // header size
            headerLength = bos.size();

            // Encode the body
            // message body
            protocol.writeMessageBegin(message);
            args.write(protocol);
            protocol.writeMessageEnd();
            protocol.getTransport().flush();
            int oldIndex = messageLength = bos.size();

            // fill in message length and header length
            try {
                TFramedTransport.encodeFrameSize(messageLength, bytes);
                bos.setWriteIndex(MESSAGE_LENGTH_INDEX);
                protocol.writeI32(messageLength);
                bos.setWriteIndex(MESSAGE_HEADER_LENGTH_INDEX);
                protocol.writeI16((short) (0xffff & headerLength));
            } finally {
                bos.setWriteIndex(oldIndex);
            }

        } catch (TException e) {
            throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
        }

        buffer.writeBytes(bytes);
        buffer.writeBytes(bos.toByteArray());

    }
This method encodes Request messages. It populates the generated args object through reflective setter calls, writes the header with placeholder lengths, writes the body, and then goes back to fill in the real message length and header length.
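The reflective step in the request encoder, deriving a setter name from a field name and invoking it, can be sketched in isolation. `SetterSketch`, `HelloArgs`, `setterName`, and `populate` are hypothetical names. The "set" plus capitalized field name convention is an assumption about what ThriftUtils.generateSetMethodName produces.

```java
import java.lang.reflect.Method;

/** Hypothetical sketch of populating a bean through a derived setter name. */
final class SetterSketch {
    private SetterSketch() {
    }

    /** Stand-in for a Thrift-generated *_args class. */
    public static final class HelloArgs {
        private String name;

        public void setName(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

    /** Assumed naming convention: "name" -> "setName". */
    static String setterName(String field) {
        return "set" + Character.toUpperCase(field.charAt(0)) + field.substring(1);
    }

    /** Looks up the public setter for the field and invokes it with the value. */
    static void populate(Object target, String field, Class<?> type, Object value) {
        try {
            Method setter = target.getClass().getMethod(setterName(field), type);
            setter.invoke(target, value);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not set field " + field, e);
        }
    }
}
```

The sketch collapses the separate catch blocks of the original into one `ReflectiveOperationException` handler, which is a simplification for brevity.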
    private void encodeResponse(Channel channel, ChannelBuffer buffer, Response response)
            throws IOException {

        // Get the result
        RpcResult result = (RpcResult) response.getResult();

        // Get the cached data of the matching request
        RequestData rd = cachedRequest.get(response.getId());

        // Get the class name of the result
        String resultClassName = ExtensionLoader.getExtensionLoader(ClassNameGenerator.class).getExtension(
                channel.getUrl().getParameter(ThriftConstants.CLASS_NAME_GENERATOR_KEY, ThriftClassNameGenerator.NAME))
                .generateResultClassName(rd.serviceName, rd.methodName);

        // If it is empty, serialization fails
        if (StringUtils.isEmpty(resultClassName)) {
            throw new RpcException(RpcException.SERIALIZATION_EXCEPTION,
                    "Could not encode response, the specified interface may be incorrect.");
        }

        // Look up the class in the cache
        Class clazz = cachedClass.get(resultClassName);

        // If it is not cached, load it
        if (clazz == null) {

            try {
                clazz = ClassHelper.forNameWithThreadContextClassLoader(resultClassName);
                cachedClass.putIfAbsent(resultClassName, clazz);
            } catch (ClassNotFoundException e) {
                throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
            }

        }

        TBase resultObj;

        try {
            // Instantiate the result class
            resultObj = (TBase) clazz.newInstance();
        } catch (InstantiationException e) {
            throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
        } catch (IllegalAccessException e) {
            throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
        }

        TApplicationException applicationException = null;
        TMessage message;

        // If the result carries an exception
        if (result.hasException()) {
            Throwable throwable = result.getException();
            int index = 1;
            boolean found = false;
            while (true) {
                TFieldIdEnum fieldIdEnum = resultObj.fieldForId(index++);
                if (fieldIdEnum == null) {
                    break;
                }
                String fieldName = fieldIdEnum.getFieldName();
                String getMethodName = ThriftUtils.generateGetMethodName(fieldName);
                String setMethodName = ThriftUtils.generateSetMethodName(fieldName);
                Method getMethod;
                Method setMethod;
                try {
                    // Look up the getter
                    getMethod = clazz.getMethod(getMethodName);
                    // If the getter's return type matches the exception type,
                    // look up the setter and invoke it to store the exception
                    if (getMethod.getReturnType().equals(throwable.getClass())) {
                        found = true;
                        setMethod = clazz.getMethod(setMethodName, throwable.getClass());
                        setMethod.invoke(resultObj, throwable);
                    }
                } catch (NoSuchMethodException e) {
                    throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
                } catch (InvocationTargetException e) {
                    throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
                } catch (IllegalAccessException e) {
                    throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
                }
            }

            if (!found) {
                // No declared exception field matched, so create a TApplicationException
                applicationException = new TApplicationException(throwable.getMessage());
            }

        } else {
            // Get the actual result
            Object realResult = result.getResult();
            // result field id is 0
            String fieldName = resultObj.fieldForId(0).getFieldName();
            String setMethodName = ThriftUtils.generateSetMethodName(fieldName);
            String getMethodName = ThriftUtils.generateGetMethodName(fieldName);
            Method getMethod;
            Method setMethod;
            try {
                // Look up the getter and setter, then store the result
                getMethod = clazz.getMethod(getMethodName);
                setMethod = clazz.getMethod(setMethodName, getMethod.getReturnType());
                setMethod.invoke(resultObj, realResult);
            } catch (NoSuchMethodException e) {
                throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
            } catch (InvocationTargetException e) {
                throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
            } catch (IllegalAccessException e) {
                throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
            }

        }

        if (applicationException != null) {
            message = new TMessage(rd.methodName, TMessageType.EXCEPTION, rd.id);
        } else {
            message = new TMessage(rd.methodName, TMessageType.REPLY, rd.id);
        }

        RandomAccessByteArrayOutputStream bos = new RandomAccessByteArrayOutputStream(1024);

        TIOStreamTransport transport = new TIOStreamTransport(bos);

        TBinaryProtocol protocol = new TBinaryProtocol(transport);

        int messageLength;
        int headerLength;

        // Encode
        byte[] bytes = new byte[4];
        try {
            // magic
            protocol.writeI16(MAGIC);
            // message length
            protocol.writeI32(Integer.MAX_VALUE);
            // message header length
            protocol.writeI16(Short.MAX_VALUE);
            // version
            protocol.writeByte(VERSION);
            // service name
            protocol.writeString(rd.serviceName);
            // id
            protocol.writeI64(response.getId());
            protocol.getTransport().flush();
            headerLength = bos.size();

            // message
            protocol.writeMessageBegin(message);
            switch (message.type) {
                case TMessageType.EXCEPTION:
                    applicationException.write(protocol);
                    break;
                case TMessageType.REPLY:
                    resultObj.write(protocol);
                    break;
            }
            protocol.writeMessageEnd();
            protocol.getTransport().flush();
            int oldIndex = messageLength = bos.size();

            try {
                TFramedTransport.encodeFrameSize(messageLength, bytes);
                bos.setWriteIndex(MESSAGE_LENGTH_INDEX);
                protocol.writeI32(messageLength);
                bos.setWriteIndex(MESSAGE_HEADER_LENGTH_INDEX);
                protocol.writeI16((short) (0xffff & headerLength));
            } finally {
                bos.setWriteIndex(oldIndex);
            }

        } catch (TException e) {
            throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
        }

        buffer.writeBytes(bytes);
        buffer.writeBytes(bos.toByteArray());

    }
This method encodes Response messages.
    @Override
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {

        int available = buffer.readableBytes();

        // If fewer bytes than the shortest message are available, more input is needed
        if (available < MESSAGE_SHORTEST_LENGTH) {

            return DecodeResult.NEED_MORE_INPUT;

        } else {

            TIOStreamTransport transport = new TIOStreamTransport(new ChannelBufferInputStream(buffer));

            TBinaryProtocol protocol = new TBinaryProtocol(transport);

            short magic;
            int messageLength;

            // Check the magic number in the header
            try {
    //            protocol.readI32(); // skip the first message length
                byte[] bytes = new byte[4];
                transport.read(bytes, 0, 4);
                magic = protocol.readI16();
                messageLength = protocol.readI32();

            } catch (TException e) {
                throw new IOException(e.getMessage(), e);
            }

            if (MAGIC != magic) {
                throw new IOException("Unknown magic code " + magic);
            }

            if (available < messageLength) {
                return DecodeResult.NEED_MORE_INPUT;
            }

            return decode(protocol);

        }

    }

    /**
     * Decode
     * @param protocol
     * @return
     * @throws IOException
     */
    private Object decode(TProtocol protocol)
            throws IOException {

        // version
        String serviceName;
        long id;

        TMessage message;

        try {
            // Read the rest of the header
            protocol.readI16();
            protocol.readByte();
            serviceName = protocol.readString();
            id = protocol.readI64();
            message = protocol.readMessageBegin();
        } catch (TException e) {
            throw new IOException(e.getMessage(), e);
        }

        // If the message is a call (a request)
        if (message.type == TMessageType.CALL) {

            RpcInvocation result = new RpcInvocation();
            // Set the service name and method name
            result.setAttachment(Constants.INTERFACE_KEY, serviceName);
            result.setMethodName(message.name);

            String argsClassName = ExtensionLoader.getExtensionLoader(ClassNameGenerator.class)
                    .getExtension(ThriftClassNameGenerator.NAME).generateArgsClassName(serviceName, message.name);

            if (StringUtils.isEmpty(argsClassName)) {
                throw new RpcException(RpcException.SERIALIZATION_EXCEPTION,
                        "The specified interface name incorrect.");
            }

            // Look up the class in the cache
            Class clazz = cachedClass.get(argsClassName);

            if (clazz == null) {
                try {

                    // Load the class
                    clazz = ClassHelper.forNameWithThreadContextClassLoader(argsClassName);

                    // Add it to the cache
                    cachedClass.putIfAbsent(argsClassName, clazz);

                } catch (ClassNotFoundException e) {
                    throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
                }
            }

            TBase args;

            try {
                args = (TBase) clazz.newInstance();
            } catch (InstantiationException e) {
                throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
            } catch (IllegalAccessException e) {
                throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
            }

            try {
                args.read(protocol);
                protocol.readMessageEnd();
            } catch (TException e) {
                throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
            }

            // Argument values
            List<Object> parameters = new ArrayList<Object>();
            // Argument types
            List<Class<?>> parameterTypes = new ArrayList<Class<?>>();
            int index = 1;

            while (true) {

                TFieldIdEnum fieldIdEnum = args.fieldForId(index++);

                if (fieldIdEnum == null) {
                    break;
                }

                String fieldName = fieldIdEnum.getFieldName();

                // Build the getter name
                String getMethodName = ThriftUtils.generateGetMethodName(fieldName);

                Method getMethod;

                try {
                    getMethod = clazz.getMethod(getMethodName);
                } catch (NoSuchMethodException e) {
                    throw new RpcException(
                            RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
                }

                // Record the argument type
                parameterTypes.add(getMethod.getReturnType());
                try {
                    parameters.add(getMethod.invoke(args));
                } catch (IllegalAccessException e) {
                    throw new RpcException(
                            RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
                } catch (InvocationTargetException e) {
                    throw new RpcException(
                            RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
                }

            }

            // Set the arguments
            result.setArguments(parameters.toArray());
            // Set the argument types
            result.setParameterTypes(parameterTypes.toArray(new Class[parameterTypes.size()]));

            // Create a new request
            Request request = new Request(id);
            // Put the invocation into the request
            request.setData(result);

            // Cache the request data so the response encoder can find it
            cachedRequest.putIfAbsent(id,
                    RequestData.create(message.seqid, serviceName, message.name));

            return request;

            // If the message is an exception
        } else if (message.type == TMessageType.EXCEPTION) {

            TApplicationException exception;

            try {
                // Read the exception
                exception = TApplicationException.read(protocol);
                protocol.readMessageEnd();
            } catch (TException e) {
                throw new IOException(e.getMessage(), e);
            }

            // Create the result
            RpcResult result = new RpcResult();

            // Set the exception
            result.setException(new RpcException(exception.getMessage()));

            // Create the Response
            Response response = new Response();

            // Put the result into the response
            response.setResult(result);

            // Set the request id
            response.setId(id);

            return response;

            // If the message is a reply
        } else if (message.type == TMessageType.REPLY) {

            // Get the class name of the result
            String resultClassName = ExtensionLoader.getExtensionLoader(ClassNameGenerator.class)
                    .getExtension(ThriftClassNameGenerator.NAME).generateResultClassName(serviceName, message.name);

            if (StringUtils.isEmpty(resultClassName)) {
                throw new IllegalArgumentException("Could not infer service result class name from service name "
                        + serviceName + ", the service name you specified may not generated by thrift idl compiler");
            }

            // Look up the class in the cache
            Class<?> clazz = cachedClass.get(resultClassName);

            // If it is not cached, load it
            if (clazz == null) {

                try {

                    clazz = ClassHelper.forNameWithThreadContextClassLoader(resultClassName);

                    cachedClass.putIfAbsent(resultClassName, clazz);

                } catch (ClassNotFoundException e) {
                    throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
                }

            }

            TBase<?, ? extends TFieldIdEnum> result;
            try {
                result = (TBase<?, ?>) clazz.newInstance();
            } catch (InstantiationException e) {
                throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
            } catch (IllegalAccessException e) {
                throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
            }

            try {
                result.read(protocol);
                protocol.readMessageEnd();
            } catch (TException e) {
                throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
            }

            Object realResult = null;

            int index = 0;

            while (true) {

                TFieldIdEnum fieldIdEnum = result.fieldForId(index++);

                if (fieldIdEnum == null) {
                    break;
                }

                Field field;

                try {
                    field = clazz.getDeclaredField(fieldIdEnum.getFieldName());
                    field.setAccessible(true);
                } catch (NoSuchFieldException e) {
                    throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
                }

                try {
                    // Read the actual result; the first non-null field wins
                    realResult = field.get(result);
                } catch (IllegalAccessException e) {
                    throw new RpcException(RpcException.SERIALIZATION_EXCEPTION, e.getMessage(), e);
                }

                if (realResult != null) {
                    break;
                }

            }

            // Create the response
            Response response = new Response();

            // Set the request id
            response.setId(id);

            // Create the result
            RpcResult rpcResult = new RpcResult();

            // Wrap the value in the RpcResult, as an exception or as a value
            if (realResult instanceof Throwable) {
                rpcResult.setException((Throwable) realResult);
            } else {
                rpcResult.setValue(realResult);
            }

            // Set the result
            response.setResult(rpcResult);

            return response;

        } else {
            // Impossible
            throw new IOException();
        }

    }
These methods implement decoding. The first checks that enough bytes have arrived and validates the magic number. The second decodes the message according to three cases: CALL, EXCEPTION, and REPLY.
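The "do we have a whole message yet?" check at the start of decoding can be sketched with a plain `ByteBuffer`. `FrameCheckSketch`, `Status`, and `check` are hypothetical names. The sketch peeks through a duplicate buffer instead of consuming input, which is a design choice of this example and not necessarily what the Dubbo codec does.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of the length and magic checks done before decoding. */
final class FrameCheckSketch {
    static final short MAGIC = (short) 0xdabc;
    static final int MESSAGE_SHORTEST_LENGTH = 10;

    enum Status { NEED_MORE_INPUT, READY }

    private FrameCheckSketch() {
    }

    static Status check(ByteBuffer in) {
        int available = in.remaining();
        if (available < MESSAGE_SHORTEST_LENGTH) {
            return Status.NEED_MORE_INPUT;
        }
        ByteBuffer view = in.duplicate(); // peek without moving the caller's position
        view.getInt();                    // skip the leading 4-byte frame size
        short magic = view.getShort();
        int messageLength = view.getInt();
        if (magic != MAGIC) {
            throw new IllegalArgumentException("Unknown magic code " + magic);
        }
        // Same comparison as the walkthrough: total readable bytes vs. message length
        return available < messageLength ? Status.NEED_MORE_INPUT : Status.READY;
    }
}
```

Ten bytes is exactly what is needed to read the frame size, the magic number, and the message length, which is why nothing is parsed below that threshold.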
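    static class RequestData {
        /**
         * Request id (the Thrift sequence id of the call)
         */
        int id;
        /**
         * Service name
         */
        String serviceName;
        /**
         * Method name
         */
        String methodName;

        static RequestData create(int id, String sn, String mn) {
            RequestData result = new RequestData();
            result.id = id;
            result.serviceName = sn;
            result.methodName = mn;
            return result;
        }

    }
This inner class holds the per-request data (sequence id, service name, method name) that is cached when a request is decoded and looked up again when the response is encoded.
ThriftInvoker is the Invoker implementation for the thrift protocol.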
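The interplay between decoding a request and encoding its response can be sketched as a small cache keyed by request id. `RequestCacheSketch`, `Entry`, and the two method names are hypothetical. The excerpt only shows `putIfAbsent` and `get` on the cache, so this sketch mirrors those two calls and makes no claim about eviction.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch of caching request metadata for the response encoder. */
final class RequestCacheSketch {

    /** Immutable stand-in for the cached request data. */
    static final class Entry {
        final int seqId;
        final String serviceName;
        final String methodName;

        Entry(int seqId, String serviceName, String methodName) {
            this.seqId = seqId;
            this.serviceName = serviceName;
            this.methodName = methodName;
        }
    }

    private final ConcurrentMap<Long, Entry> cache = new ConcurrentHashMap<>();

    /** Called after a request is decoded; the first entry for an id wins. */
    void rememberRequest(long requestId, int seqId, String serviceName, String methodName) {
        cache.putIfAbsent(requestId, new Entry(seqId, serviceName, methodName));
    }

    /** Called when the matching response is encoded; returns null if unknown. */
    Entry lookup(long requestId) {
        return cache.get(requestId);
    }
}
```

The response encoder needs this lookup because a Response object carries only the request id, while the Thrift reply message must repeat the method name and sequence id of the original call.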
    /**
     * Clients
     */
    private final ExchangeClient[] clients;

    /**
     * Index used to pick the next client
     */
    private final AtomicPositiveInteger index = new AtomicPositiveInteger();

    /**
     * Lock used when destroying
     */
    private final ReentrantLock destroyLock = new ReentrantLock();

    /**
     * Invokers
     */
    private final Set<Invoker<?>> invokers;

    @Override
    protected Result doInvoke(Invocation invocation) throws Throwable {

        RpcInvocation inv = (RpcInvocation) invocation;

        final String methodName;

        // Get the method name
        methodName = invocation.getMethodName();

        // Set the path attachment
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());

        // for thrift codec
        inv.setAttachment(ThriftCodec.PARAMETER_CLASS_NAME_GENERATOR, getUrl().getParameter(
                ThriftCodec.PARAMETER_CLASS_NAME_GENERATOR, DubboClassNameGenerator.NAME));

        ExchangeClient currentClient;

        // If there is only one client, use it directly
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            // Otherwise pick the next client, cycling through the array
            currentClient = clients[index.getAndIncrement() % clients.length];
        }

        try {
            // Get the timeout
            int timeout = getUrl().getMethodParameter(
                    methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            RpcContext.getContext().setFuture(null);

            // Send the request and wait for the result
            return (Result) currentClient.request(inv, timeout).get();

        } catch (TimeoutException e) {
            // Rethrow as a timeout exception
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, e.getMessage(), e);
        } catch (RemotingException e) {
            // Rethrow as a network exception
            throw new RpcException(RpcException.NETWORK_EXCEPTION, e.getMessage(), e);
        }

    }
This method implements invocation for the thrift protocol: it picks a client in round-robin order, sends the request, and waits for the result.
ThriftProtocol holds the main logic of the thrift protocol. It implements service export and service reference.
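The client selection step is a plain round-robin over a fixed set. A minimal sketch, assuming a hypothetical `RoundRobinSketch` class and using `AtomicInteger` with a sign mask in place of Dubbo's AtomicPositiveInteger:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of round-robin selection over a fixed list of clients. */
final class RoundRobinSketch<T> {
    private final List<T> clients;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinSketch(List<T> clients) {
        if (clients.isEmpty()) {
            throw new IllegalArgumentException("at least one client is required");
        }
        this.clients = new ArrayList<>(clients);
    }

    T next() {
        if (clients.size() == 1) {
            return clients.get(0); // nothing to rotate
        }
        // Mask the sign bit so the index stays non-negative after the counter overflows
        int i = index.getAndIncrement() & Integer.MAX_VALUE;
        return clients.get(i % clients.size());
    }
}
```

Keeping the counter non-negative matters because a negative value modulo the array length would produce a negative index.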
    /**
     * Default port
     */
    public static final int DEFAULT_PORT = 40880;

    /**
     * Extension name
     */
    public static final String NAME = "thrift";

    // ip:port -> ExchangeServer
    /**
     * Servers, keyed by ip:port
     */
    private final ConcurrentMap<String, ExchangeServer> serverMap =
            new ConcurrentHashMap<String, ExchangeServer>();

    private ExchangeHandler handler = new ExchangeHandlerAdapter() {

        @Override
        public Object reply(ExchangeChannel channel, Object msg) throws RemotingException {

            // If the message is an Invocation
            if (msg instanceof Invocation) {
                Invocation inv = (Invocation) msg;
                // Get the service name
                String serviceName = inv.getAttachments().get(Constants.INTERFACE_KEY);
                // Build the service key
                String serviceKey = serviceKey(channel.getLocalAddress().getPort(),
                        serviceName, null, null);
                // Look up the exporter
                DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
                // If there is no exporter, throw an exception
                if (exporter == null) {
                    throw new RemotingException(channel,
                            "Not found exported service: "
                                    + serviceKey
                                    + " in "
                                    + exporterMap.keySet()
                                    + ", may be version or group mismatch "
                                    + ", channel: consumer: "
                                    + channel.getRemoteAddress()
                                    + " --> provider: "
                                    + channel.getLocalAddress()
                                    + ", message:" + msg);
                }

                // Set the remote address
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return exporter.getInvoker().invoke(inv);

            }

            // Otherwise the request type is not supported
            throw new RemotingException(channel,
                    "Unsupported request: "
                            + (msg.getClass().getName() + ": " + msg)
                            + ", channel: consumer: "
                            + channel.getRemoteAddress()
                            + " --> provider: "
                            + channel.getLocalAddress());
        }

        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            // If the message is an Invocation, call reply; otherwise delegate to the parent
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

    };

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {

        // can use thrift codec only
        URL url = invoker.getUrl().addParameter(Constants.CODEC_KEY, ThriftCodec.NAME);
        // find server.
        String key = url.getAddress();
        // client can expose a service for server to invoke only.
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer && !serverMap.containsKey(key)) {
            // Start a server for this address and remember it
            serverMap.put(key, getServer(url));
        }
        // export service.
        // Build the service key
        key = serviceKey(url);
        // Create the exporter
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        // Register the exporter
        exporterMap.put(key, exporter);

        return exporter;

    }
This method implements service export: it starts one server per address if needed, then registers an exporter under the service key.
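The two maps play different roles: one server per address, many exporters per server. The sketch below shows that relationship with stand-in types. `ExportSketch` and its members are hypothetical, and it uses `computeIfAbsent` where the walkthrough uses a `containsKey` check followed by `put`, which is a substitution made here for brevity and atomicity.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: one server per address, one exporter per service key. */
final class ExportSketch {
    // address (ip:port) -> fake server handle
    private final ConcurrentMap<String, Integer> serverMap = new ConcurrentHashMap<>();
    // service key -> fake exporter
    private final ConcurrentMap<String, String> exporterMap = new ConcurrentHashMap<>();
    private final AtomicInteger serversStarted = new AtomicInteger();

    void export(String address, String serviceKey) {
        // Start a server only for the first service exported on this address
        serverMap.computeIfAbsent(address, a -> serversStarted.incrementAndGet());
        // Every service gets its own exporter entry
        exporterMap.put(serviceKey, "exporter for " + serviceKey);
    }

    int serversStarted() {
        return serversStarted.get();
    }

    int exportedServices() {
        return exporterMap.size();
    }
}
```

Exporting two services on the same address therefore starts one server and registers two exporters, which is what lets the reply handler route each invocation by its service key.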
    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {

        // Create the ThriftInvoker
        ThriftInvoker<T> invoker = new ThriftInvoker<T>(type, url, getClients(url), invokers);

        // Register the invoker
        invokers.add(invoker);

        return invoker;

    }
This method implements service reference.
    private ExchangeClient[] getClients(URL url) {

        // Get the number of connections
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 1);

        // Create the client array
        ExchangeClient[] clients = new ExchangeClient[connections];

        // Create each client
        for (int i = 0; i < clients.length; i++) {
            clients[i] = initClient(url);
        }
        return clients;
    }
This method creates the array of clients, one per configured connection.
    private ExchangeClient initClient(URL url) {

        ExchangeClient client;

        // Force the thrift codec
        url = url.addParameter(Constants.CODEC_KEY, ThriftCodec.NAME);

        try {
            // Create the client
            client = Exchangers.connect(url);
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url
                    + "): " + e.getMessage(), e);
        }

        return client;

    }
This method creates a single client.
    private ExchangeServer getServer(URL url) {
        // enable sending readonly event when server closes by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // Get the server transporter type
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        // If the transporter type is not supported by dubbo, throw an exception
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        ExchangeServer server;
        try {
            // Bind the server
            server = Exchangers.bind(url, handler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        // Get the client transporter type
        str = url.getParameter(Constants.CLIENT_KEY);
        // If the client transporter type is not supported by dubbo, throw an exception
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
This method creates the server for a given URL.
The annotated source for this part is at: https://github.com/CrazyHZM/i...
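This article covered the thrift protocol implementation in the remote invocation module. Following it requires some familiarity with Thrift itself. Next I will cover the webservice protocol part of the rpc module.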