在使用dubbo時,一般會遇到timeout這個屬性,timeout屬性的做用是:給某個服務調用設置超時時間,若是服務在設置的時間內未返回結果,則會拋出調用超時異常:TimeoutException,在使用的過程當中,咱們有時會對provider和consumer兩個配置都會設置timeout值,那麼服務調用過程當中會以哪一個爲準?橘子同窗今天主要針對這個問題進行分析和擴展。多線程
以provider配置爲例:less
#### 方法級別 <dubbo:service interface="orangecsong.test.service.TestService" ref="testServiceImpl"> <dubbo:method name="test" timeout="10000"/> </dubbo:service>
#### 接口級別 <dubbo:service interface="orangecsong.test.service.TestService" ref="testServiceImpl" timeout="10000"/>
#### 全局級別 <dubbo:service ="10000"/>
在dubbo中若是provider和consumer都配置了相同的一個屬性,好比本文分析的timeout,其實它們是有優先級的,consumer方法配置 > provider方法配置 > consumer接口配置 > provider接口配置 > consumer全局配置 > provider全局配置。因此對於小橘開始的提出的問題就有告終果,會以消費者配置的爲準,接下結合源碼來進行解析,其實源碼很簡單,在RegistryDirectory類中將服務列表轉換爲DubboInvlker方法中進行了處理:異步
private Map<String, Invoker<T>> toInvokers(List<URL> urls) { Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>(); if (urls == null || urls.isEmpty()) { return newUrlInvokerMap; } Set<String> keys = new HashSet<String>(); String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY); for (URL providerUrl : urls) { // If protocol is configured at the reference side, only the matching protocol is selected if (queryProtocols != null && queryProtocols.length() > 0) { boolean accept = false; String[] acceptProtocols = queryProtocols.split(","); for (String acceptProtocol : acceptProtocols) { if (providerUrl.getProtocol().equals(acceptProtocol)) { accept = true; break; } } if (!accept) { continue; } } if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) { continue; } if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) { logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions())); continue; } // 重點就是下面這個方法 URL url = mergeUrl(providerUrl); String key = url.toFullString(); // The parameter urls are sorted if (keys.contains(key)) { // Repeated url continue; } keys.add(key); // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); if (invoker == null) { // Not in the cache, refer again try { boolean enabled = true; if (url.hasParameter(Constants.DISABLED_KEY)) { enabled = !url.getParameter(Constants.DISABLED_KEY, false); } else { enabled = url.getParameter(Constants.ENABLED_KEY, true); } if (enabled) { invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl); } } catch (Throwable t) { logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t); } if (invoker != null) { // Put new invoker in cache newUrlInvokerMap.put(key, invoker); } } else { newUrlInvokerMap.put(key, invoker); } } keys.clear(); return newUrlInvokerMap; }
重點就是上面mergeUrl方法,將provider和comsumer的url參數進行了整合,在mergeUrl方法有會調用ClusterUtils.mergeUrl方法進行整合,由於這個方法比較簡單,就是對一些參數進行了整合了,會用consumer參數進行覆蓋,這裏就不分析了,若是感興趣的同窗能夠去研究一下。async
在配置設置了超時timeout,那麼代碼中是如何處理的,這裏我們在進行一下擴展,分析一下dubbo中是如何處理超時的,在調用服務方法,最後都會調用DubboInvoker.doInvoke方法,我們就從這個方法開始分析:ide
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout); // For compatibility FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future); RpcContext.getContext().setFuture(futureAdapter); Result result; // 異步處理 if (isAsyncFuture) { // register resultCallback, sometimes we need the async result being processed by the filter chain. result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } else { result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } return result; } else { // 同步處理 RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
在這個方法中,就以同步模式進行分析,看request方法,request()方法會返回一個DefaultFuture類,在去調用DefaultFuture.get()方法,這裏其實涉及到一個在異步中實現同步的技巧,我們這裏不作分析,因此重點就在get()方法裏:函數
@Override public Object get() throws RemotingException { return get(timeout); } @Override public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (!isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } return returnFromResponse(); }
在調用get()方法時,會去調用get(timeout)這個方法,在這個方法中會傳一個timeout字段,在和timeout就是我們配置的那個參數,在這個方法中我們要關注下面一個代碼塊:工具
if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (!isDone()) { // 線程阻塞 done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } // 在超時時間裏,尚未結果,則拋出超時異常 if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } }
重點看await()方法,會進行阻塞timeout時間,若是阻塞時間到了,則會喚醒往下執行,超時跳出while循環中,判斷是否有結果返回,若是沒有(這個地方要注意:只有有結果返回,或超時才跳出循環中),則拋出超時異常。講到這裏,超時原理基本上其實差很少了,DefaultFuture這個類還有個地方須要注意,在初始化DefaultFuture對象時,會去建立一個超時的延遲任務,延遲時間就是timeout值,在這個延遲任務中也會調用signal()方法喚醒阻塞。ui
不過在調用rpc遠程接口,若是對方的接口不能一次承載返回請求結果能力,咱們通常作法是分批調用,將調用一次分紅調用屢次,而後對每次結果進行匯聚,固然也能夠作用利用多線程的能力去執行。後面文章小橘將會介紹這種模式,敬請關注哦!this
/** * Description:通用 分批調用工具類 * 場景: * <pre> * 好比List參數的size可能爲 幾十個甚至上百個 * 若是invoke接口比較慢,傳入50個以上會超時,那麼能夠每次傳入20個,分批執行。 * </pre> * Author: OrangeCsong */ public class ParallelInvokeUtil { private ParallelInvokeUtil() {} /** * @param sourceList 源數據 * @param size 分批大小 * @param buildParam 構建函數 * @param processFunction 處理函數 * @param <R> 返回值 * @param <T> 入參\ * @param <P> 構建參數 * @return */ public static <R, T, P> List<R> partitionInvokeWithRes(List<T> sourceList, Integer size, Function<List<T>, P> buildParam, Function<P, List<R>> processFunction) { if (CollectionUtils.isEmpty(sourceList)) { return new ArrayList<>(0); } Preconditions.checkArgument(size > 0, "size大小必須大於0"); return Lists.partition(sourceList, size).stream() .map(buildParam) .map(processFunction) .filter(Objects::nonNull) .reduce(new ArrayList<>(), (resultList1, resultList2) -> { resultList1.addAll(resultList2); return resultList1; }); } }