目標:介紹dubbo-rpc-api中的各類filter過濾器的實現邏輯。
本文會介紹在dubbo中的過濾器,先來看看下面的圖:java
能夠看到紅色圈圈不服,在服務發現和服務引用中都會進行一些過濾器過濾。具體有哪些過濾器,就看下面的介紹。git
該過濾器是對記錄日誌的過濾器,它所作的工做就是把引用服務或者暴露服務的調用鏈信息寫入到文件中。日誌消息先被放入日誌集合,而後加入到日誌隊列,而後被放入到寫入文件到任務中,最後進入文件。github
private static final Logger logger = LoggerFactory.getLogger(AccessLogFilter.class); /** * 日誌訪問名稱,默認的日誌訪問名稱 */ private static final String ACCESS_LOG_KEY = "dubbo.accesslog"; /** * 日期格式 */ private static final String FILE_DATE_FORMAT = "yyyyMMdd"; private static final String MESSAGE_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; /** * 日誌隊列大小 */ private static final int LOG_MAX_BUFFER = 5000; /** * 日誌輸出的頻率 */ private static final long LOG_OUTPUT_INTERVAL = 5000; /** * 日誌隊列 key爲訪問日誌的名稱,value爲該日誌名稱對應的日誌集合 */ private final ConcurrentMap<String, Set<String>> logQueue = new ConcurrentHashMap<String, Set<String>>(); /** * 日誌線程池 */ private final ScheduledExecutorService logScheduled = Executors.newScheduledThreadPool(2, new NamedThreadFactory("Dubbo-Access-Log", true)); /** * 日誌記錄任務 */ private volatile ScheduledFuture<?> logFuture = null;
按照我上面講到日誌流向,日誌先進入到是日誌隊列中的日誌集合,再進入logQueue,在進入logFuture,最後落地到文件。json
private void init() { // synchronized是一個重操做消耗性能,全部加上判空 if (logFuture == null) { synchronized (logScheduled) { // 爲了避免重複初始化 if (logFuture == null) { // 建立日誌記錄任務 logFuture = logScheduled.scheduleWithFixedDelay(new LogTask(), LOG_OUTPUT_INTERVAL, LOG_OUTPUT_INTERVAL, TimeUnit.MILLISECONDS); } } } }
該方法是初始化方法,就建立了日誌記錄任務。segmentfault
private void log(String accesslog, String logmessage) { init(); Set<String> logSet = logQueue.get(accesslog); if (logSet == null) { logQueue.putIfAbsent(accesslog, new ConcurrentHashSet<String>()); logSet = logQueue.get(accesslog); } if (logSet.size() < LOG_MAX_BUFFER) { logSet.add(logmessage); } }
該方法是增長日誌信息到日誌集合中。api
@Override public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException { try { // 得到日誌名稱 String accesslog = invoker.getUrl().getParameter(Constants.ACCESS_LOG_KEY); if (ConfigUtils.isNotEmpty(accesslog)) { // 得到rpc上下文 RpcContext context = RpcContext.getContext(); // 得到調用的接口名稱 String serviceName = invoker.getInterface().getName(); // 得到版本號 String version = invoker.getUrl().getParameter(Constants.VERSION_KEY); // 得到組,是消費者側仍是生產者側 String group = invoker.getUrl().getParameter(Constants.GROUP_KEY); StringBuilder sn = new StringBuilder(); sn.append("[").append(new SimpleDateFormat(MESSAGE_DATE_FORMAT).format(new Date())).append("] ").append(context.getRemoteHost()).append(":").append(context.getRemotePort()) .append(" -> ").append(context.getLocalHost()).append(":").append(context.getLocalPort()) .append(" - "); // 拼接組 if (null != group && group.length() > 0) { sn.append(group).append("/"); } // 拼接服務名稱 sn.append(serviceName); // 拼接版本號 if (null != version && version.length() > 0) { sn.append(":").append(version); } sn.append(" "); // 拼接方法名 sn.append(inv.getMethodName()); sn.append("("); // 拼接參數類型 Class<?>[] types = inv.getParameterTypes(); // 拼接參數類型 if (types != null && types.length > 0) { boolean first = true; for (Class<?> type : types) { if (first) { first = false; } else { sn.append(","); } sn.append(type.getName()); } } sn.append(") "); // 拼接參數 Object[] args = inv.getArguments(); if (args != null && args.length > 0) { sn.append(JSON.toJSONString(args)); } String msg = sn.toString(); // 若是用默認的日誌訪問名稱 if (ConfigUtils.isDefault(accesslog)) { LoggerFactory.getLogger(ACCESS_LOG_KEY + "." + invoker.getInterface().getName()).info(msg); } else { // 把日誌加入集合 log(accesslog, msg); } } } catch (Throwable t) { logger.warn("Exception in AcessLogFilter of service(" + invoker + " -> " + inv + ")", t); } // 調用下一個調用鏈 return invoker.invoke(inv); }
該方法是最重要的方法,從拼接了日誌信息,把日誌加入到集合,而且調用下一個調用鏈。數組
private class LogTask implements Runnable { @Override public void run() { try { if (logQueue != null && logQueue.size() > 0) { // 遍歷日誌隊列 for (Map.Entry<String, Set<String>> entry : logQueue.entrySet()) { try { // 得到日誌名稱 String accesslog = entry.getKey(); // 得到日誌集合 Set<String> logSet = entry.getValue(); // 若是文件不存在則建立文件 File file = new File(accesslog); File dir = file.getParentFile(); if (null != dir && !dir.exists()) { dir.mkdirs(); } if (logger.isDebugEnabled()) { logger.debug("Append log to " + accesslog); } if (file.exists()) { // 得到如今的時間 String now = new SimpleDateFormat(FILE_DATE_FORMAT).format(new Date()); // 得到文件最後一次修改的時間 String last = new SimpleDateFormat(FILE_DATE_FORMAT).format(new Date(file.lastModified())); // 若是文件最後一次修改的時間不等於如今的時間 if (!now.equals(last)) { // 得到從新生成文件名稱 File archive = new File(file.getAbsolutePath() + "." + last); // 由於都是file的絕對路徑,因此沒有進行移動文件,而是修改文件名 file.renameTo(archive); } } // 把日誌集合中的日誌寫入到文件 FileWriter writer = new FileWriter(file, true); try { for (Iterator<String> iterator = logSet.iterator(); iterator.hasNext(); iterator.remove()) { writer.write(iterator.next()); writer.write("\r\n"); } writer.flush(); } finally { writer.close(); } } catch (Exception e) { logger.error(e.getMessage(), e); } } } } catch (Exception e) { logger.error(e.getMessage(), e); } } }
該內部類實現了Runnable,是把日誌消息落地到文件到線程。服務器
該類時對於每一個服務的每一個方法的最大可並行調用數量限制的過濾器,它是在服務消費者側的過濾。數據結構
@Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 得到url對象 URL url = invoker.getUrl(); // 得到方法名稱 String methodName = invocation.getMethodName(); // 得到併發調用數(單個服務的單個方法),默認爲0 int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0); // 經過方法名來得到對應的狀態 RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); if (max > 0) { // 得到該方法調用的超時次數 long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0); // 得到系統時間 long start = System.currentTimeMillis(); long remain = timeout; // 得到該方法的調用數量 int active = count.getActive(); // 若是活躍數量大於等於最大的併發調用數量 if (active >= max) { synchronized (count) { // 當活躍數量大於等於最大的併發調用數量時一直循環 while ((active = count.getActive()) >= max) { try { // 等待超時時間 count.wait(remain); } catch (InterruptedException e) { } // 得到累計時間 long elapsed = System.currentTimeMillis() - start; remain = timeout - elapsed; // 若是累計時間大於超時時間,則拋出異常 if (remain <= 0) { throw new RpcException("Waiting concurrent invoke timeout in client-side for service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " + active + ". max concurrent invoke limit: " + max); } } } } } try { // 得到系統時間做爲開始時間 long begin = System.currentTimeMillis(); // 開始計數 RpcStatus.beginCount(url, methodName); try { // 調用後面的調用鏈,若是沒有拋出異常,則算成功 Result result = invoker.invoke(invocation); // 結束計數,記錄時間 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true); return result; } catch (RuntimeException t) { RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false); throw t; } } finally { if (max > 0) { synchronized (count) { // 喚醒count count.notify(); } } } }
該類只有這一個方法。該過濾器是用來限制調用數量,先進行調用數量的檢測,若是沒有到達最大的調用數量,則先調用後面的調用鏈,若是在後面的調用鏈失敗,則記錄相關時間,若是成功也記錄相關時間和調用次數。併發
該過濾器是作類加載器切換的。
@Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 得到當前的類加載器 ClassLoader ocl = Thread.currentThread().getContextClassLoader(); // 設置invoker攜帶的服務的類加載器 Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader()); try { // 調用下面的調用鏈 return invoker.invoke(invocation); } finally { // 最後切換回原來的類加載器 Thread.currentThread().setContextClassLoader(ocl); } }
能夠看到先切換成當前的線程鎖攜帶的類加載器,而後調用結束後,再切換回原先的類加載器。
該過濾器是作兼容性的過濾器。
@Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 調用下一個調用鏈 Result result = invoker.invoke(invocation); // 若是方法前面沒有$或者結果沒有異常 if (!invocation.getMethodName().startsWith("$") && !result.hasException()) { Object value = result.getValue(); if (value != null) { try { // 得到方法 Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes()); // 得到返回的數據類型 Class<?> type = method.getReturnType(); Object newValue; // 序列化方法 String serialization = invoker.getUrl().getParameter(Constants.SERIALIZATION_KEY); // 若是是json或者fastjson形式 if ("json".equals(serialization) || "fastjson".equals(serialization)) { // 得到方法的泛型返回值類型 Type gtype = method.getGenericReturnType(); // 把數據結果進行類型轉化 newValue = PojoUtils.realize(value, type, gtype); // 若是value不是type類型 } else if (!type.isInstance(value)) { // 若是是pojo,則,轉化爲type類型,若是不是,則進行兼容類型轉化。 newValue = PojoUtils.isPojo(type) ? PojoUtils.realize(value, type) : CompatibleTypeUtils.compatibleTypeConvert(value, type); } else { newValue = value; } // 從新設置RpcResult的result if (newValue != value) { result = new RpcResult(newValue); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } return result; }
能夠看到對於調用鏈的返回結果,若是返回值類型和返回值不同的時候,就須要作兼容類型的轉化。從新把結果放入RpcResult,返回。
該過濾器作的是在當前的RpcContext中記錄本地調用的一次狀態信息。
@Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 設置rpc上下文 RpcContext.getContext() .setInvoker(invoker) .setInvocation(invocation) .setLocalAddress(NetUtils.getLocalHost(), 0) .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort()); // 若是該會話域是rpc會話域 if (invocation instanceof RpcInvocation) { // 設置實體域 ((RpcInvocation) invocation).setInvoker(invoker); } try { // 調用下個調用鏈 RpcResult result = (RpcResult) invoker.invoke(invocation); // 設置附加值 RpcContext.getServerContext().setAttachments(result.getAttachments()); return result; } finally { // 狀況附加值 RpcContext.getContext().clearAttachments(); } }
能夠看到RpcContext記錄了一次調用狀態信息,而後先調用後面的調用鏈,再回來把附加值設置到RpcContext中。而後返回RpcContext,再清空,這樣是由於後面的調用鏈中的附加值對前面的調用鏈是不可見的。
該過濾器作的是初始化rpc上下文。
@Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 得到會話域的附加值 Map<String, String> attachments = invocation.getAttachments(); // 刪除異步屬性以免傳遞給如下調用鏈 if (attachments != null) { attachments = new HashMap<String, String>(attachments); attachments.remove(Constants.PATH_KEY); attachments.remove(Constants.GROUP_KEY); attachments.remove(Constants.VERSION_KEY); attachments.remove(Constants.DUBBO_VERSION_KEY); attachments.remove(Constants.TOKEN_KEY); attachments.remove(Constants.TIMEOUT_KEY); attachments.remove(Constants.ASYNC_KEY);// Remove async property to avoid being passed to the following invoke chain. } // 在rpc上下文添加上一個調用鏈的信息 RpcContext.getContext() .setInvoker(invoker) .setInvocation(invocation) // .setAttachments(attachments) // merged from dubbox .setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort()); // mreged from dubbox // we may already added some attachments into RpcContext before this filter (e.g. in rest protocol) if (attachments != null) { // 把會話域中的附加值所有加入RpcContext中 if (RpcContext.getContext().getAttachments() != null) { RpcContext.getContext().getAttachments().putAll(attachments); } else { RpcContext.getContext().setAttachments(attachments); } } // 若是會話域屬於rpc的會話域,則設置實體域 if (invocation instanceof RpcInvocation) { ((RpcInvocation) invocation).setInvoker(invoker); } try { // 調用下一個調用鏈 RpcResult result = (RpcResult) invoker.invoke(invocation); // pass attachments to result 把附加值加入到RpcResult result.addAttachments(RpcContext.getServerContext().getAttachments()); return result; } finally { // 移除本地的上下文 RpcContext.removeContext(); // 清空附加值 RpcContext.getServerContext().clearAttachments(); } }
在《 dubbo源碼解析(十九)遠程調用——開篇》中我已經介紹了RpcContext的做用,角色。該過濾器就是作了初始化RpcContext的做用。
該過濾器的做用是調用了廢棄的方法時打印錯誤日誌。
private static final Logger LOGGER = LoggerFactory.getLogger(DeprecatedFilter.class); /** * 日誌集合 */ private static final Set<String> logged = new ConcurrentHashSet<String>(); @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 得到key 服務+方法 String key = invoker.getInterface().getName() + "." + invocation.getMethodName(); // 若是集合中沒有該key if (!logged.contains(key)) { // 則加入集合 logged.add(key); // 若是該服務方法是廢棄的,則打印錯誤日誌 if (invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.DEPRECATED_KEY, false)) { LOGGER.error("The service method " + invoker.getInterface().getName() + "." + getMethodSignature(invocation) + " is DEPRECATED! Declare from " + invoker.getUrl()); } } // 調用下一個調用鏈 return invoker.invoke(invocation); } /** * 得到方法定義 * @param invocation * @return */ private String getMethodSignature(Invocation invocation) { // 方法名 StringBuilder buf = new StringBuilder(invocation.getMethodName()); buf.append("("); // 參數類型 Class<?>[] types = invocation.getParameterTypes(); // 拼接參數 if (types != null && types.length > 0) { boolean first = true; for (Class<?> type : types) { if (first) { first = false; } else { buf.append(", "); } buf.append(type.getSimpleName()); } } buf.append(")"); return buf.toString(); }
該過濾器比較簡單。
該過濾器是處理回聲測試的方法。
@Override public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException { // 若是調用的方法是回聲測試的方法 則直接返回結果,不然 調用下一個調用鏈 if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) return new RpcResult(inv.getArguments()[0]); return invoker.invoke(inv); }
若是調用的方法是回聲測試的方法 則直接返回結果,不然 調用下一個調用鏈。
該過濾器是做用是對異常的處理。
private final Logger logger; public ExceptionFilter() { this(LoggerFactory.getLogger(ExceptionFilter.class)); } public ExceptionFilter(Logger logger) { this.logger = logger; } @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { try { // 調用下一個調用鏈,返回結果 Result result = invoker.invoke(invocation); // 若是結果有異常,而且該服務不是一個泛化調用 if (result.hasException() && GenericService.class != invoker.getInterface()) { try { // 得到異常 Throwable exception = result.getException(); // directly throw if it's checked exception // 若是這是一個checked的異常,則直接返回異常,也就是接口上聲明的Unchecked的異常 if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) { return result; } // directly throw if the exception appears in the signature // 若是已經在接口方法上聲明瞭該異常,則直接返回 try { // 得到方法 Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes()); // 得到異常類型 Class<?>[] exceptionClassses = method.getExceptionTypes(); for (Class<?> exceptionClass : exceptionClassses) { if (exception.getClass().equals(exceptionClass)) { return result; } } } catch (NoSuchMethodException e) { return result; } // for the exception not found in method's signature, print ERROR message in server's log. // 打印錯誤 該異常沒有在方法上申明 logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception); // directly throw if exception class and interface class are in the same jar file. // 若是異常類和接口類在同一個jar包裏面,則拋出異常 String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface()); String exceptionFile = ReflectUtils.getCodeBase(exception.getClass()); if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) { return result; } // directly throw if it's JDK exception // 若是是jdk中定義的異常,則直接拋出 String className = exception.getClass().getName(); if (className.startsWith("java.") || className.startsWith("javax.")) { return result; } // directly throw if it's dubbo exception // 若是 是dubbo的異常,則直接拋出 if (exception instanceof RpcException) { return result; } // otherwise, wrap with RuntimeException and throw back to the client // 若是不是以上的異常,則包裝成爲RuntimeException而且拋出 return new RpcResult(new RuntimeException(StringUtils.toString(exception))); } catch (Throwable e) { logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e); return result; } } return result; } catch (RuntimeException e) { logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e); throw e; } }
能夠看到除了接口上聲明的Unchecked的異常和有定義的異常外,都會包裝成RuntimeException來返回,爲了防止客戶端反序列化失敗。
該過濾器是限制最大可並行執行請求數,該過濾器是服務提供者側,而上述講到的ActiveLimitFilter是在消費者側的限制。
@Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 得到url對象 URL url = invoker.getUrl(); // 方法名稱 String methodName = invocation.getMethodName(); Semaphore executesLimit = null; boolean acquireResult = false; int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0); // 若是該方法設置了executes而且值大於0 if (max > 0) { // 得到該方法對應的RpcStatus RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName()); // if (count.getActive() >= max) { /** * http://manzhizhen.iteye.com/blog/2386408 * use semaphore for concurrency control (to limit thread number) */ // 得到信號量 executesLimit = count.getSemaphore(max); // 若是不能得到許可,則拋出異常 if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) { throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited."); } } long begin = System.currentTimeMillis(); boolean isSuccess = true; // 計數加1 RpcStatus.beginCount(url, methodName); try { // 調用下一個調用鏈 Result result = invoker.invoke(invocation); return result; } catch (Throwable t) { isSuccess = false; if (t instanceof RuntimeException) { throw (RuntimeException) t; } else { throw new RpcException("unexpected exception when ExecuteLimitFilter", t); } } finally { // 計數減1 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess); if(acquireResult) { executesLimit.release(); } } }
爲何這裏須要用到信號量來控制,能夠看一下如下連接的介紹:http://manzhizhen.iteye.com/b...
該過濾器就是對於泛化調用的請求和結果進行反序列化和序列化的操做,它是服務提供者側的。
@Override public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException { // 若是是泛化調用 if (inv.getMethodName().equals(Constants.$INVOKE) && inv.getArguments() != null && inv.getArguments().length == 3 && !invoker.getInterface().equals(GenericService.class)) { // 得到請求名字 String name = ((String) inv.getArguments()[0]).trim(); // 得到請求參數類型 String[] types = (String[]) inv.getArguments()[1]; // 得到請求參數 Object[] args = (Object[]) inv.getArguments()[2]; try { // 得到方法 Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types); // 得到該方法的參數類型 Class<?>[] params = method.getParameterTypes(); if (args == null) { args = new Object[params.length]; } // 得到附加值 String generic = inv.getAttachment(Constants.GENERIC_KEY); // 若是附加值爲空,在用上下文攜帶的附加值 if (StringUtils.isBlank(generic)) { generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY); } // 若是附加值仍是爲空或者是默認的泛化序列化類型 if (StringUtils.isEmpty(generic) || ProtocolUtils.isDefaultGenericSerialization(generic)) { // 直接進行類型轉化 args = PojoUtils.realize(args, params, method.getGenericParameterTypes()); } else if (ProtocolUtils.isJavaGenericSerialization(generic)) { for (int i = 0; i < args.length; i++) { if (byte[].class == args[i].getClass()) { try { UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i]); // 使用nativejava方式反序列化 args[i] = ExtensionLoader.getExtensionLoader(Serialization.class) .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA) .deserialize(null, is).readObject(); } catch (Exception e) { throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e); } } else { throw new RpcException( "Generic serialization [" + Constants.GENERIC_SERIALIZATION_NATIVE_JAVA + "] only support message type " + byte[].class + " and your message type is " + args[i].getClass()); } } } else if (ProtocolUtils.isBeanGenericSerialization(generic)) { for (int i = 0; i < args.length; i++) { if (args[i] instanceof JavaBeanDescriptor) { // 用JavaBean方式反序列化 args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]); } else { throw new RpcException( "Generic serialization [" + Constants.GENERIC_SERIALIZATION_BEAN + "] only support message type " + JavaBeanDescriptor.class.getName() + " and your message type is " + args[i].getClass().getName()); } } } // 調用下一個調用鏈 Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments())); if (result.hasException() && !(result.getException() instanceof GenericException)) { return new RpcResult(new GenericException(result.getException())); } if (ProtocolUtils.isJavaGenericSerialization(generic)) { try { UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512); // 用nativejava方式序列化 ExtensionLoader.getExtensionLoader(Serialization.class) .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA) .serialize(null, os).writeObject(result.getValue()); return new RpcResult(os.toByteArray()); } catch (IOException e) { throw new RpcException("Serialize result failed.", e); } } else if (ProtocolUtils.isBeanGenericSerialization(generic)) { // 使用JavaBean方式序列化返回結果 return new RpcResult(JavaBeanSerializeUtil.serialize(result.getValue(), JavaBeanAccessor.METHOD)); } else { // 直接轉化爲pojo類型而後返回 return new RpcResult(PojoUtils.generalize(result.getValue())); } } catch (NoSuchMethodException e) { throw new RpcException(e.getMessage(), e); } catch (ClassNotFoundException e) { throw new RpcException(e.getMessage(), e); } } // 調用下一個調用鏈 return invoker.invoke(inv); }
該過濾器也是對於泛化調用的序列化檢查和處理,它是消費者側的過濾器。
private static final Logger logger = LoggerFactory.getLogger(GenericImplFilter.class); /** * 參數集合 */ private static final Class<?>[] GENERIC_PARAMETER_TYPES = new Class<?>[]{String.class, String[].class, Object[].class}; @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 得到泛化的值 String generic = invoker.getUrl().getParameter(Constants.GENERIC_KEY); // 若是該值是nativejava或者bean或者true,而且不是一個返回調用 if (ProtocolUtils.isGeneric(generic) && !Constants.$INVOKE.equals(invocation.getMethodName()) && invocation instanceof RpcInvocation) { RpcInvocation invocation2 = (RpcInvocation) invocation; // 得到方法名稱 String methodName = invocation2.getMethodName(); // 得到參數類型集合 Class<?>[] parameterTypes = invocation2.getParameterTypes(); // 得到參數集合 Object[] arguments = invocation2.getArguments(); // 把參數類型的名稱放入集合 String[] types = new String[parameterTypes.length]; for (int i = 0; i < parameterTypes.length; i++) { types[i] = ReflectUtils.getName(parameterTypes[i]); } Object[] args; // 對參數集合進行序列化 if (ProtocolUtils.isBeanGenericSerialization(generic)) { args = new Object[arguments.length]; for (int i = 0; i < arguments.length; i++) { args[i] = JavaBeanSerializeUtil.serialize(arguments[i], JavaBeanAccessor.METHOD); } } else { args = PojoUtils.generalize(arguments); } // 從新把序列化的參數放入 invocation2.setMethodName(Constants.$INVOKE); invocation2.setParameterTypes(GENERIC_PARAMETER_TYPES); invocation2.setArguments(new Object[]{methodName, types, args}); // 調用下一個調用鏈 Result result = invoker.invoke(invocation2); if (!result.hasException()) { Object value = result.getValue(); try { Method method = invoker.getInterface().getMethod(methodName, parameterTypes); if (ProtocolUtils.isBeanGenericSerialization(generic)) { if (value == null) { return new RpcResult(value); } else if (value instanceof JavaBeanDescriptor) { // 用javabean方式反序列化 return new RpcResult(JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) value)); } else { throw new RpcException( "The type of result value is " + value.getClass().getName() + " other than " + JavaBeanDescriptor.class.getName() + ", and the result is " + value); } } else { // 直接轉化爲pojo類型 return new RpcResult(PojoUtils.realize(value, method.getReturnType(), method.getGenericReturnType())); } } catch (NoSuchMethodException e) { throw new RpcException(e.getMessage(), e); } // 若是調用鏈中有異常拋出,而且是GenericException類型的異常 } else if (result.getException() instanceof GenericException) { GenericException exception = (GenericException) result.getException(); try { // 得到異常類名 String className = exception.getExceptionClass(); Class<?> clazz = ReflectUtils.forName(className); Throwable targetException = null; Throwable lastException = null; try { targetException = (Throwable) clazz.newInstance(); } catch (Throwable e) { lastException = e; for (Constructor<?> constructor : clazz.getConstructors()) { try { targetException = (Throwable) constructor.newInstance(new Object[constructor.getParameterTypes().length]); break; } catch (Throwable e1) { lastException = e1; } } } if (targetException != null) { try { Field field = Throwable.class.getDeclaredField("detailMessage"); if (!field.isAccessible()) { field.setAccessible(true); } field.set(targetException, exception.getExceptionMessage()); } catch (Throwable e) { logger.warn(e.getMessage(), e); } result = new RpcResult(targetException); } else if (lastException != null) { throw lastException; } } catch (Throwable e) { throw new RpcException("Can not deserialize exception " + exception.getExceptionClass() + ", message: " + exception.getExceptionMessage(), e); } } return result; } // 若是是泛化調用 if (invocation.getMethodName().equals(Constants.$INVOKE) && invocation.getArguments() != null && invocation.getArguments().length == 3 && ProtocolUtils.isGeneric(generic)) { Object[] args = (Object[]) invocation.getArguments()[2]; if (ProtocolUtils.isJavaGenericSerialization(generic)) { for (Object arg : args) { // 若是調用消息不是字節數組類型,則拋出異常 if (!(byte[].class == arg.getClass())) { error(generic, byte[].class.getName(), arg.getClass().getName()); } } } else if (ProtocolUtils.isBeanGenericSerialization(generic)) { for (Object arg : args) { if (!(arg instanceof JavaBeanDescriptor)) { error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName()); } } } // 設置附加值 ((RpcInvocation) invocation).setAttachment( Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY)); } return invoker.invoke(invocation); } /** * 拋出錯誤異常 * @param generic * @param expected * @param actual * @throws RpcException */ private void error(String generic, String expected, String actual) throws RpcException { throw new RpcException( "Generic serialization [" + generic + "] only support message type " + expected + " and your message type is " + actual); }
該過濾器是當服務調用超時的時候,記錄告警日誌。
@Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 得到開始時間 long start = System.currentTimeMillis(); // 調用下一個調用鏈 Result result = invoker.invoke(invocation); // 得到調用使用的時間 long elapsed = System.currentTimeMillis() - start; // 若是服務調用超時,則打印告警日誌 if (invoker.getUrl() != null && elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", Integer.MAX_VALUE)) { if (logger.isWarnEnabled()) { logger.warn("invoke time out. method: " + invocation.getMethodName() + " arguments: " + Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() + ", invoke elapsed " + elapsed + " ms."); } } return result; }
該過濾器提供了token的驗證功能,關於token的介紹能夠查看官方文檔。
@Override public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException { // 得到token值 String token = invoker.getUrl().getParameter(Constants.TOKEN_KEY); if (ConfigUtils.isNotEmpty(token)) { // 得到服務類型 Class<?> serviceType = invoker.getInterface(); // 得到附加值 Map<String, String> attachments = inv.getAttachments(); String remoteToken = attachments == null ? null : attachments.get(Constants.TOKEN_KEY); // 若是令牌不同,則拋出異常 if (!token.equals(remoteToken)) { throw new RpcException("Invalid token! Forbid invoke remote service " + serviceType + " method " + inv.getMethodName() + "() from consumer " + RpcContext.getContext().getRemoteHost() + " to provider " + RpcContext.getContext().getLocalHost()); } } // 調用下一個調用鏈 return invoker.invoke(inv); }
該過濾器的做用是對TPS限流。
/** * TPS 限制器對象 */ private final TPSLimiter tpsLimiter = new DefaultTPSLimiter(); @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 若是限流器不容許,則拋出異常 if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) { throw new RpcException( "Failed to invoke service " + invoker.getInterface().getName() + "." + invocation.getMethodName() + " because exceed max service tps."); } // 調用下一個調用鏈 return invoker.invoke(invocation); }
其中關鍵是TPS 限制器對象,請看下面的分析。
public interface TPSLimiter { /** * judge if the current invocation is allowed by TPS rule * 是否容許經過 * @param url url * @param invocation invocation * @return true allow the current invocation, otherwise, return false */ boolean isAllowable(URL url, Invocation invocation); }
該接口是tps限流器的接口,只定義了一個是否容許經過的方法。
該類是統計的數據結構。
class StatItem { /** * 服務名 */ private String name; /** * 最後一次重置的時間 */ private long lastResetTime; /** * 週期 */ private long interval; /** * 剩餘多少流量 */ private AtomicInteger token; /** * 限制大小 */ private int rate; StatItem(String name, int rate, long interval) { this.name = name; this.rate = rate; this.interval = interval; this.lastResetTime = System.currentTimeMillis(); this.token = new AtomicInteger(rate); } public boolean isAllowable() { long now = System.currentTimeMillis(); // 若是限制的時間大於最後一次時間加上週期,則重置 if (now > lastResetTime + interval) { token.set(rate); lastResetTime = now; } int value = token.get(); boolean flag = false; // 直到有流量 while (value > 0 && !flag) { flag = token.compareAndSet(value, value - 1); value = token.get(); } // 返回flag return flag; } long getLastResetTime() { return lastResetTime; } int getToken() { return token.get(); } @Override public String toString() { return new StringBuilder(32).append("StatItem ") .append("[name=").append(name).append(", ") .append("rate = ").append(rate).append(", ") .append("interval = ").append(interval).append("]") .toString(); } }
能夠看到該類中記錄了一些訪問的流量,而且設置了週期重置機制。
該類實現了TPSLimiter,是默認的tps限流器實現。
public class DefaultTPSLimiter implements TPSLimiter { /** * 統計項集合 */ private final ConcurrentMap<String, StatItem> stats = new ConcurrentHashMap<String, StatItem>(); @Override public boolean isAllowable(URL url, Invocation invocation) { // 得到tps限制大小,默認-1,不限制 int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1); // 得到限流週期 long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY, Constants.DEFAULT_TPS_LIMIT_INTERVAL); String serviceKey = url.getServiceKey(); // 若是限制 if (rate > 0) { // 從集合中得到統計項 StatItem statItem = stats.get(serviceKey); // 若是爲空,則新建 if (statItem == null) { stats.putIfAbsent(serviceKey, new StatItem(serviceKey, rate, interval)); statItem = stats.get(serviceKey); } // 返回是否容許 return statItem.isAllowable(); } else { StatItem statItem = stats.get(serviceKey); if (statItem != null) { // 移除該服務的統計項 stats.remove(serviceKey); } } return true; } }
是否容許的邏輯仍是調用了統計項中的isAllowable方法。
本文介紹了不少的過濾器,哪些過濾器是在服務引用的,哪些服務器是服務暴露的,能夠查看相應源碼過濾器的實現上的註解,
例如ActiveLimitFilter上:
@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
能夠看到group爲consumer組的,也就是服務消費者側的,則是服務引用過程當中的的過濾器。
例如ExecuteLimitFilter上:
@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)
能夠看到group爲provider組的,也就是服務提供者側的,則是服務暴露過程當中的的過濾器。
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了在服務引用和服務暴露中的各類filter過濾器。接下來我將開始對rpc模塊的監聽器進行講解。