FailbackClusterInvoker 會在調用失敗後,返回一個空結果給調用方(服務消費者),並通過定時任務對失敗的調用進行重試,適合執行消息通知等操作。
public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> { private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class); private static final long RETRY_FAILED_PERIOD = 5000L; private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedThreadFactory("failback-cluster-timer", true)); private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap(); private volatile ScheduledFuture<?> retryFuture; public FailbackClusterInvoker(Directory<T> directory) { super(directory); } private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) { if (this.retryFuture == null) { //使用對象鎖 初始化定時任務 synchronized(this) { if (this.retryFuture == null) { this.retryFuture = this.scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { public void run() { try { //調用當前實例的重試方法 重試後移除 FailbackClusterInvoker.this.retryFailed(); } catch (Throwable var2) { FailbackClusterInvoker.logger.error("Unexpected error occur at collect statistic", var2); } } }, 5000L, 5000L, TimeUnit.MILLISECONDS); } } } this.failed.put(invocation, router); } void retryFailed() { if (this.failed.size() != 0) { Iterator i$ = (new HashMap(this.failed)).entrySet().iterator(); while(i$.hasNext()) { Entry<Invocation, AbstractClusterInvoker<?>> entry = (Entry)i$.next(); Invocation invocation = (Invocation)entry.getKey(); Invoker invoker = (Invoker)entry.getValue(); try { invoker.invoke(invocation); this.failed.remove(invocation); } catch (Throwable var6) { logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", var6); } } } } protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { try { this.checkInvokers(invokers, invocation); Invoker<T> invoker = this.select(loadbalance, invocation, invokers, (List)null); return invoker.invoke(invocation); } catch (Throwable var5) { 
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + var5.getMessage() + ", ", var5); //若是調用失敗 放入失敗列表 this.addFailed(invocation, this); return new RpcResult(); } } }
FailsafeClusterInvoker 在調用失敗時只記錄錯誤日誌,並直接返回一個空的結果對象。
/**
 * Cluster invoker that never propagates a failure: anything thrown while
 * selecting or invoking a provider is logged and replaced by an empty
 * {@code RpcResult}.
 *
 * @param <T> service interface type
 */
public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);

    public FailsafeClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    /**
     * Invokes a single selected invoker, swallowing every failure.
     *
     * @param invocation  the call to perform
     * @param invokers    candidate invokers
     * @param loadbalance strategy passed to {@code select}
     * @return the invocation result, or an empty {@code RpcResult} if anything was thrown
     * @throws RpcException declared by the signature; every {@code Throwable} is caught here
     */
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        Result outcome;
        try {
            checkInvokers(invokers, invocation);
            Invoker<T> chosen = select(loadbalance, invocation, invokers, (List<Invoker<T>>) null);
            outcome = chosen.invoke(invocation);
        } catch (Throwable failure) {
            // Deliberate best effort: log the problem and hand back an empty result.
            logger.error("Failsafe ignore exception: " + failure.getMessage(), failure);
            outcome = new RpcResult();
        }
        return outcome;
    }
}
FailfastClusterInvoker 只會發起一次調用,失敗後立即拋出異常。
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> { public FailfastClusterInvoker(Directory<T> directory) { super(directory); } public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { this.checkInvokers(invokers, invocation); Invoker invoker = this.select(loadbalance, invocation, invokers, (List)null); try { return invoker.invoke(invocation); } catch (Throwable var6) { if (var6 instanceof RpcException && ((RpcException)var6).isBiz()) { throw (RpcException)var6; } else { throw new RpcException(var6 instanceof RpcException ? ((RpcException)var6).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + this.getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + var6.getMessage(), var6.getCause() != null ? var6.getCause() : var6); } } } }
ForkingClusterInvoker 會在運行時通過線程池創建多個線程,併發調用多個服務提供者。只要有一個服務提供者成功返回了結果,doInvoke 方法就會立即結束運行。ForkingClusterInvoker 的應用場景是一些對實時性要求比較高的讀操作(注意是讀操作,並行寫操作可能不安全),但這將會耗費更多的資源。
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> { private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true)); public ForkingClusterInvoker(Directory<T> directory) { super(directory); } public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { this.checkInvokers(invokers, invocation); int forks = this.getUrl().getParameter("forks", 2); int timeout = this.getUrl().getParameter("timeout", 1000); final Object selected; if (forks > 0 && forks < invokers.size()) { selected = new ArrayList(); for(int i = 0; i < forks; ++i) { Invoker<T> invoker = this.select(loadbalance, invocation, invokers, (List)selected); if (!((List)selected).contains(invoker)) { ((List)selected).add(invoker); } } } else { selected = invokers; } RpcContext.getContext().setInvokers((List)selected); final AtomicInteger count = new AtomicInteger(); final BlockingQueue<Object> ref = new LinkedBlockingQueue(); Iterator i$ = ((List)selected).iterator(); while(i$.hasNext()) { final Invoker<T> invoker = (Invoker)i$.next(); //同時執行多個調用任務 this.executor.execute(new Runnable() { public void run() { try { Result result = invoker.invoke(invocation); ref.offer(result); } catch (Throwable var3) { int value = count.incrementAndGet(); if (value >= ((List)selected).size()) { ref.offer(var3); } } } }); } try { //指定時間後 獲取隊列返回的結果 不管是否有異常 有一個結果即返回 Object ret = ref.poll((long)timeout, TimeUnit.MILLISECONDS); if (ret instanceof Throwable) { Throwable e = (Throwable)ret; throw new RpcException(e instanceof RpcException ? ((RpcException)e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? 
e.getCause() : e); } else { return (Result)ret; } } catch (InterruptedException var11) { throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + var11.getMessage(), var11); } } }
BroadcastClusterInvoker 會逐個調用每個服務提供者,如果其中一臺報錯,在循環調用結束後,BroadcastClusterInvoker 會拋出異常。該類通常用於通知所有提供者更新緩存或日誌等本地資源信息。
/**
 * Cluster invoker that calls every invoker in turn. A failing invoker does not
 * stop the loop; the most recent failure is thrown after all of them have been
 * called.
 *
 * @param <T> service interface type
 */
public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);

    public BroadcastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    /**
     * Invokes all invokers sequentially.
     *
     * @param invocation  the call to perform on each invoker
     * @param invokers    invokers to call, in iteration order
     * @param loadbalance not used by this implementation
     * @return the result of the last invocation that completed without throwing
     * @throws RpcException the last failure seen, if any invocation threw; non-RPC
     *                      failures are wrapped in a new {@code RpcException}
     */
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        // The raw cast has no runtime effect; it matches the setInvokers call in ForkingClusterInvoker.
        RpcContext.getContext().setInvokers((List) invokers);

        RpcException lastFailure = null;
        Result lastResult = null;
        for (Invoker<T> each : invokers) {
            try {
                lastResult = each.invoke(invocation);
            } catch (RpcException rpcFailure) {
                lastFailure = rpcFailure;
                logger.warn(rpcFailure.getMessage(), rpcFailure);
            } catch (Throwable other) {
                lastFailure = new RpcException(other.getMessage(), other);
                logger.warn(other.getMessage(), other);
            }
        }

        if (lastFailure == null) {
            return lastResult;
        }
        throw lastFailure;
    }
}