目標:介紹dubbo中集羣的Mock,介紹dubbo-cluster下關於服務降級和本地假裝的源碼。
本文講解兩塊內容,分別是本地假裝和服務降級,本地假裝一般用於服務降級,好比某驗權服務,當服務提供方所有掛掉後,客戶端不拋出異常,而是經過 Mock 數據返回受權失敗。而服務降級則是臨時屏蔽某個出錯的非關鍵服務,並定義降級後的返回策略。java
public class MockClusterWrapper implements Cluster { private Cluster cluster; public MockClusterWrapper(Cluster cluster) { this.cluster = cluster; } @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { // 建立MockClusterInvoker return new MockClusterInvoker<T>(directory, this.cluster.join(directory)); } }
該類是服務降級的裝飾器類,對Cluster進行了功能加強,加強了服務降級的功能。git
該類是服務降級中定義降級後的返回策略的實現。github
private static final Logger logger = LoggerFactory.getLogger(MockClusterInvoker.class); /** * 目錄 */ private final Directory<T> directory; /** * invoker對象 */ private final Invoker<T> invoker;
@Override public Result invoke(Invocation invocation) throws RpcException { Result result = null; // 得到 "mock" 配置項,有多種配置方式 String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); // 若是沒有mock if (value.length() == 0 || value.equalsIgnoreCase("false")) { //no mock // 直接調用 result = this.invoker.invoke(invocation); // 若是強制服務降級 } else if (value.startsWith("force")) { if (logger.isWarnEnabled()) { logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl()); } //force:direct mock // 直接調用 Mock Invoker ,執行本地 Mock 邏輯 result = doMockInvoke(invocation, null); } else { //fail-mock // 失敗服務降級 try { // 不然正常調用 result = this.invoker.invoke(invocation); } catch (RpcException e) { if (e.isBiz()) { throw e; } else { if (logger.isWarnEnabled()) { logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e); } // 若是調用失敗,則服務降級 result = doMockInvoke(invocation, e); } } } return result; }
該方法是定義降級後的返回策略的實現,根據配置的不一樣來決定不用降級仍是強制服務降級仍是失敗後再服務降級。數組
@SuppressWarnings({"unchecked", "rawtypes"}) private Result doMockInvoke(Invocation invocation, RpcException e) { Result result = null; Invoker<T> minvoker; // 路由匹配 Mock Invoker 集合 List<Invoker<T>> mockInvokers = selectMockInvoker(invocation); // 若是mockInvokers爲空,則建立一個MockInvoker if (mockInvokers == null || mockInvokers.isEmpty()) { // 建立一個MockInvoker minvoker = (Invoker<T>) new MockInvoker(directory.getUrl()); } else { // 取出第一個 minvoker = mockInvokers.get(0); } try { // 調用invoke result = minvoker.invoke(invocation); } catch (RpcException me) { // 若是拋出異常,則返回異常結果 if (me.isBiz()) { result = new RpcResult(me.getCause()); } else { throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause()); } } catch (Throwable me) { throw new RpcException(getMockExceptionMessage(e, me), me.getCause()); } return result; }
該方法是執行本地Mock,服務降級。緩存
private List<Invoker<T>> selectMockInvoker(Invocation invocation) { List<Invoker<T>> invokers = null; //TODO generic invoker? if (invocation instanceof RpcInvocation) { //Note the implicit contract (although the description is added to the interface declaration, but extensibility is a problem. The practice placed in the attachement needs to be improved) // 注意隱式契約(儘管描述被添加到接口聲明中,可是可擴展性是一個問題。附件中的作法須要改進) ((RpcInvocation) invocation).setAttachment(Constants.INVOCATION_NEED_MOCK, Boolean.TRUE.toString()); //directory will return a list of normal invokers if Constants.INVOCATION_NEED_MOCK is present in invocation, otherwise, a list of mock invokers will return. // 若是調用中存在Constants.INVOCATION_NEED_MOCK,則目錄將返回正常調用者列表,不然,將返回模擬調用者列表。 try { invokers = directory.list(invocation); } catch (RpcException e) { if (logger.isInfoEnabled()) { logger.info("Exception when try to invoke mock. Get mock invokers error for service:" + directory.getUrl().getServiceInterface() + ", method:" + invocation.getMethodName() + ", will contruct a new mock with 'new MockInvoker()'.", e); } } } return invokers; }
該方法是路由匹配 Mock Invoker 集合。app
該類是路由選擇器實現類。ide
@Override public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers, URL url, final Invocation invocation) throws RpcException { // 若是附加值爲空,則直接 if (invocation.getAttachments() == null) { // 得到普通的invoker集合 return getNormalInvokers(invokers); } else { // 得到是否須要降級的值 String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK); // 若是爲空,則得到普通的Invoker集合 if (value == null) return getNormalInvokers(invokers); else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) { // 得到MockedInvoker集合 return getMockedInvokers(invokers); } } return invokers; }
該方法是根據配置來決定選擇普通的invoker集合仍是mockInvoker集合。源碼分析
private <T> List<Invoker<T>> getMockedInvokers(final List<Invoker<T>> invokers) { // 若是沒有MockedInvoker,則返回null if (!hasMockProviders(invokers)) { return null; } // 找到MockedInvoker,往sInvokers中加入,而且返回 List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(1); for (Invoker<T> invoker : invokers) { if (invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) { sInvokers.add(invoker); } } return sInvokers; }
該方法是得到MockedInvoker集合。this
private <T> List<Invoker<T>> getNormalInvokers(final List<Invoker<T>> invokers) { // 若是沒有MockedInvoker,則返回普通的Invoker 集合 if (!hasMockProviders(invokers)) { return invokers; } else { // 不然 去除MockedInvoker,把普通的Invoker 集合返回 List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(invokers.size()); for (Invoker<T> invoker : invokers) { // 把不是MockedInvoker的invoker加入sInvokers,返回 if (!invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) { sInvokers.add(invoker); } } return sInvokers; } }
該方法是得到普通的Invoker集合,不包含mock的。url
private <T> boolean hasMockProviders(final List<Invoker<T>> invokers) { boolean hasMockProvider = false; for (Invoker<T> invoker : invokers) { // 若是有一個是MockInvoker,則返回true if (invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) { hasMockProvider = true; break; } } return hasMockProvider; }
該方法是判斷是否有MockInvoker。
以上三個類是對服務降級功能的實現,下面兩個類是對本地假裝的實現。
該類實現了AbstractProtocol接口,是服務
final public class MockProtocol extends AbstractProtocol { @Override public int getDefaultPort() { return 0; } @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { throw new UnsupportedOperationException(); } @Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { // 建立MockInvoker return new MockInvoker<T>(url); } }
本地假裝的invoker實現類。
/** * 代理工廠 */ private final static ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); /** * mock 與 Invoker 對象的映射緩存 */ private final static Map<String, Invoker<?>> mocks = new ConcurrentHashMap<String, Invoker<?>>(); /** * 異常集合 */ private final static Map<String, Throwable> throwables = new ConcurrentHashMap<String, Throwable>(); /** * url對象 */ private final URL url;
public static Object parseMockValue(String mock, Type[] returnTypes) throws Exception { Object value = null; // 若是mock爲empty,則 if ("empty".equals(mock)) { // 得到空的對象 value = ReflectUtils.getEmptyObject(returnTypes != null && returnTypes.length > 0 ? (Class<?>) returnTypes[0] : null); } else if ("null".equals(mock)) { // 若是爲null,則返回null value = null; } else if ("true".equals(mock)) { // 若是爲true,則返回true value = true; } else if ("false".equals(mock)) { // 若是爲false,則返回false value = false; } else if (mock.length() >= 2 && (mock.startsWith("\"") && mock.endsWith("\"") || mock.startsWith("\'") && mock.endsWith("\'"))) { // 使用 '' 或 "" 的字符串,截取掉頭尾 value = mock.subSequence(1, mock.length() - 1); } else if (returnTypes != null && returnTypes.length > 0 && returnTypes[0] == String.class) { // 字符串 value = mock; } else if (StringUtils.isNumeric(mock)) { // 是數字 value = JSON.parse(mock); } else if (mock.startsWith("{")) { // 是map類型的 value = JSON.parseObject(mock, Map.class); } else if (mock.startsWith("[")) { // 是數組類型 value = JSON.parseObject(mock, List.class); } else { value = mock; } if (returnTypes != null && returnTypes.length > 0) { value = PojoUtils.realize(value, (Class<?>) returnTypes[0], returnTypes.length > 1 ? returnTypes[1] : null); } return value; }
該方法是解析mock值
@Override public Result invoke(Invocation invocation) throws RpcException { // 得到 `"mock"` 配置項,方法級 > 類級 String mock = getUrl().getParameter(invocation.getMethodName() + "." + Constants.MOCK_KEY); if (invocation instanceof RpcInvocation) { ((RpcInvocation) invocation).setInvoker(this); } // 若是mock爲空 if (StringUtils.isBlank(mock)) { // 得到mock值 mock = getUrl().getParameter(Constants.MOCK_KEY); } // 若是仍是爲空。則拋出異常 if (StringUtils.isBlank(mock)) { throw new RpcException(new IllegalAccessException("mock can not be null. url :" + url)); } // 標準化 `"mock"` 配置項 mock = normalizeMock(URL.decode(mock)); // 等於 "return " ,返回值爲空的 RpcResult 對象 if (mock.startsWith(Constants.RETURN_PREFIX)) { // 分割 mock = mock.substring(Constants.RETURN_PREFIX.length()).trim(); try { // 得到返回類型 Type[] returnTypes = RpcUtils.getReturnTypes(invocation); // 解析mock值 Object value = parseMockValue(mock, returnTypes); return new RpcResult(value); } catch (Exception ew) { throw new RpcException("mock return invoke error. method :" + invocation.getMethodName() + ", mock:" + mock + ", url: " + url, ew); } // 若是是throw } else if (mock.startsWith(Constants.THROW_PREFIX)) { // 根據throw分割 mock = mock.substring(Constants.THROW_PREFIX.length()).trim(); // 若是爲空,則拋出異常 if (StringUtils.isBlank(mock)) { throw new RpcException("mocked exception for service degradation."); } else { // user customized class // 建立自定義異常 Throwable t = getThrowable(mock); // 拋出業務類型的 RpcException 異常 throw new RpcException(RpcException.BIZ_EXCEPTION, t); } } else { //impl mock try { // 不然直接得到invoker Invoker<T> invoker = getInvoker(mock); // 調用 return invoker.invoke(invocation); } catch (Throwable t) { throw new RpcException("Failed to create mock implementation class " + mock, t); } } }
該方法是本地假裝的核心實現,mock分三種,分別是return、throw、自定義的mock類。
public static String normalizeMock(String mock) { // 若爲空,直接返回 if (mock == null) { return mock; } mock = mock.trim(); if (mock.length() == 0) { return mock; } if (Constants.RETURN_KEY.equalsIgnoreCase(mock)) { return Constants.RETURN_PREFIX + "null"; } // 若果爲 "true" "default" "fail" "force" 四種字符串,返回default if (ConfigUtils.isDefault(mock) || "fail".equalsIgnoreCase(mock) || "force".equalsIgnoreCase(mock)) { return "default"; } // fail:throw/return foo => throw/return if (mock.startsWith(Constants.FAIL_PREFIX)) { mock = mock.substring(Constants.FAIL_PREFIX.length()).trim(); } // force:throw/return foo => throw/return if (mock.startsWith(Constants.FORCE_PREFIX)) { mock = mock.substring(Constants.FORCE_PREFIX.length()).trim(); } // 若是是return或者throw,替換`爲" if (mock.startsWith(Constants.RETURN_PREFIX) || mock.startsWith(Constants.THROW_PREFIX)) { mock = mock.replace('`', '"'); } return mock; }
該方法是規範化mock值。
public static Throwable getThrowable(String throwstr) { // 從異常集合中取出異常 Throwable throwable = throwables.get(throwstr); // 若是不爲空,則拋出異常 if (throwable != null) { return throwable; } try { Throwable t; // 得到異常類 Class<?> bizException = ReflectUtils.forName(throwstr); Constructor<?> constructor; // 得到構造方法 constructor = ReflectUtils.findConstructor(bizException, String.class); // 建立 Throwable 對象 t = (Throwable) constructor.newInstance(new Object[]{"mocked exception for service degradation."}); // 添加到緩存中 if (throwables.size() < 1000) { throwables.put(throwstr, t); } return t; } catch (Exception e) { throw new RpcException("mock throw error :" + throwstr + " argument error.", e); } }
該方法是得到異常。
private Invoker<T> getInvoker(String mockService) { // 從緩存中,得到 Invoker 對象,若是有,直接緩存。 Invoker<T> invoker = (Invoker<T>) mocks.get(mockService); if (invoker != null) { return invoker; } // 得到服務類型 Class<T> serviceType = (Class<T>) ReflectUtils.forName(url.getServiceInterface()); // 得到MockObject T mockObject = (T) getMockObject(mockService, serviceType); // 建立invoker invoker = proxyFactory.getInvoker(mockObject, serviceType, url); if (mocks.size() < 10000) { // 加入集合 mocks.put(mockService, invoker); } return invoker; }
該方法是得到invoker。
public static Object getMockObject(String mockService, Class serviceType) { if (ConfigUtils.isDefault(mockService)) { mockService = serviceType.getName() + "Mock"; } // 得到類型 Class<?> mockClass = ReflectUtils.forName(mockService); if (!serviceType.isAssignableFrom(mockClass)) { throw new IllegalStateException("The mock class " + mockClass.getName() + " not implement interface " + serviceType.getName()); } try { // 初始化 return mockClass.newInstance(); } catch (InstantiationException e) { throw new IllegalStateException("No default constructor from mock class " + mockClass.getName(), e); } catch (IllegalAccessException e) { throw new IllegalStateException(e); } }
該方法是得到mock對象。
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了集羣中關於mock實現的部分,到這裏爲止,集羣部分就所有講完了,這是2.6.x版本的集羣,那在2.7中對於路由和配置規則都有相應的大改動,我會在以後2.7版本的講解中講到。接下來我將開始對序列化模塊進行講解。