Dubbo Source Code Analysis: Cluster Fault Tolerance (Part 2)

FailbackClusterInvoker

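When an invocation fails, FailbackClusterInvoker returns an empty result to the service consumer and records the failed call so that a scheduled task can retry it in the background. This makes it suitable for operations such as message notification.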

public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);
    private static final long RETRY_FAILED_PERIOD = 5000L;
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedThreadFactory("failback-cluster-timer", true));
    private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap();
    private volatile ScheduledFuture<?> retryFuture;

    public FailbackClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
        if (this.retryFuture == null) {
            // Double-checked locking on this instance: the retry task is scheduled only once
            synchronized(this) {
                if (this.retryFuture == null) {
                    this.retryFuture = this.scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
                        public void run() {
                            try {
                                // Call this instance's retry method; entries are removed after a retry
                                FailbackClusterInvoker.this.retryFailed();
                            } catch (Throwable var2) {
                                FailbackClusterInvoker.logger.error("Unexpected error occur at collect statistic", var2);
                            }

                        }
                    }, 5000L, 5000L, TimeUnit.MILLISECONDS);
                }
            }
        }

        this.failed.put(invocation, router);
    }

    void retryFailed() {
        if (this.failed.size() != 0) {
            Iterator i$ = (new HashMap(this.failed)).entrySet().iterator();

            while(i$.hasNext()) {
                Entry<Invocation, AbstractClusterInvoker<?>> entry = (Entry)i$.next();
                Invocation invocation = (Invocation)entry.getKey();
                Invoker invoker = (Invoker)entry.getValue();

                try {
                    invoker.invoke(invocation);
                    this.failed.remove(invocation);
                } catch (Throwable var6) {
                    logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", var6);
                }
            }

        }
    }

    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            this.checkInvokers(invokers, invocation);
            Invoker<T> invoker = this.select(loadbalance, invocation, invokers, (List)null);
            return invoker.invoke(invocation);
        } catch (Throwable var5) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + var5.getMessage() + ", ", var5);
            // If the invocation fails, record it in the failed map for a later retry
            this.addFailed(invocation, this);
            return new RpcResult();
        }
    }
}
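
To make the failback idea easier to follow outside of Dubbo, here is a minimal sketch in plain Java. It is not Dubbo code: `FailbackSketch`, its methods, and the use of `Callable<String>` in place of `Invocation` and `Invoker` are assumptions made for illustration, and the empty string stands in for `new RpcResult()`. The sketch models the intent described above (keep retrying until a call succeeds) rather than reproducing the class line by line.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for FailbackClusterInvoker; none of these names are Dubbo APIs.
class FailbackSketch {
    // Failed calls keyed by an id, waiting for a background retry.
    private final ConcurrentMap<String, Callable<String>> failed = new ConcurrentHashMap<>();

    // Try the call once; on failure remember it and hand back an empty result.
    String invoke(String id, Callable<String> call) {
        try {
            return call.call();
        } catch (Exception e) {
            failed.put(id, call);
            return ""; // plays the role of new RpcResult()
        }
    }

    // Retry a snapshot of the failed calls and remove the ones that now succeed.
    void retryFailed() {
        Map<String, Callable<String>> snapshot = new HashMap<>(failed);
        for (Map.Entry<String, Callable<String>> entry : snapshot.entrySet()) {
            try {
                entry.getValue().call();
                failed.remove(entry.getKey());
            } catch (Exception e) {
                // Still failing: keep the entry for the next round.
            }
        }
    }

    int pending() {
        return failed.size();
    }

    // Run retryFailed() with a fixed delay on a daemon thread (the original uses 5000 ms).
    ScheduledExecutorService startRetryTimer(long periodMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "failback-sketch-timer");
            t.setDaemon(true);
            return t;
        });
        timer.scheduleWithFixedDelay(this::retryFailed, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return timer;
    }
}
```

Iterating over a copy of the map, as the original `retryFailed` also does, lets new failures be recorded while a retry round is in progress.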

FailsafeClusterInvoker

When an invocation fails, FailsafeClusterInvoker only logs the exception and returns an empty result directly.

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);

    public FailsafeClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            this.checkInvokers(invokers, invocation);
            Invoker<T> invoker = this.select(loadbalance, invocation, invokers, (List)null);
            return invoker.invoke(invocation);
        } catch (Throwable var5) {
            logger.error("Failsafe ignore exception: " + var5.getMessage(), var5);
            return new RpcResult();
        }
    }
}
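
The failsafe behaviour reduces to "catch, log, return empty". The following sketch shows that shape in plain Java; `FailsafeSketch` and the `Callable<String>` parameter are hypothetical names chosen for this example, and the empty string stands in for `new RpcResult()`.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for FailsafeClusterInvoker; not a Dubbo API.
class FailsafeSketch {
    // Run the call; on any exception log it and return an empty result instead of failing.
    static String invoke(Callable<String> call) {
        try {
            return call.call();
        } catch (Exception e) {
            System.err.println("Failsafe ignore exception: " + e.getMessage());
            return ""; // plays the role of new RpcResult()
        }
    }
}
```

Because the caller cannot tell an empty result from a failure, this style only fits calls whose outcome does not matter to the caller.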

FailfastClusterInvoker

FailfastClusterInvoker invokes only once and throws an exception immediately if that call fails.

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
    public FailfastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        this.checkInvokers(invokers, invocation);
        Invoker invoker = this.select(loadbalance, invocation, invokers, (List)null);

        try {
            return invoker.invoke(invocation);
        } catch (Throwable var6) {
            if (var6 instanceof RpcException && ((RpcException)var6).isBiz()) {
                throw (RpcException)var6;
            } else {
                throw new RpcException(var6 instanceof RpcException ? ((RpcException)var6).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + this.getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + var6.getMessage(), var6.getCause() != null ? var6.getCause() : var6);
            }
        }
    }
}
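
The catch block above makes two decisions: business exceptions are rethrown unchanged, and everything else is wrapped with extra context while keeping the original error code and root cause. A minimal sketch of that logic follows. `FailfastSketch` and `SketchRpcException` are hypothetical types invented for this example (the latter imitates only the `getCode()` and `isBiz()` aspects of `RpcException` used in the listing).

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for FailfastClusterInvoker; not a Dubbo API.
class FailfastSketch {
    // Simplified imitation of RpcException: an error code plus a business-error flag.
    static class SketchRpcException extends RuntimeException {
        final int code;
        final boolean biz;

        SketchRpcException(int code, boolean biz, String message, Throwable cause) {
            super(message, cause);
            this.code = code;
            this.biz = biz;
        }
    }

    // Call once; never retry. Business errors pass through, other errors are wrapped.
    static String invoke(String provider, Callable<String> call) {
        try {
            return call.call();
        } catch (Exception e) {
            if (e instanceof SketchRpcException && ((SketchRpcException) e).biz) {
                throw (SketchRpcException) e;
            }
            // Keep the original code when there is one, otherwise fall back to 0.
            int code = e instanceof SketchRpcException ? ((SketchRpcException) e).code : 0;
            // Prefer the root cause so the wrapper does not hide it.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new SketchRpcException(code, false,
                    "Failfast invoke provider " + provider + " failed. Last error is: " + e.getMessage(),
                    cause);
        }
    }
}
```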

ForkingClusterInvoker

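ForkingClusterInvoker uses a thread pool at runtime to call several service providers concurrently. As soon as one provider returns a result successfully, the doInvoke method finishes. It is intended for read operations with strict latency requirements (reads only, since parallel writes may be unsafe), at the cost of consuming more resources.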

public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true));

    public ForkingClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        this.checkInvokers(invokers, invocation);
        int forks = this.getUrl().getParameter("forks", 2);
        int timeout = this.getUrl().getParameter("timeout", 1000);
        final Object selected;
        if (forks > 0 && forks < invokers.size()) {
            selected = new ArrayList();
			
            for(int i = 0; i < forks; ++i) {
                Invoker<T> invoker = this.select(loadbalance, invocation, invokers, (List)selected);
                if (!((List)selected).contains(invoker)) {
                    ((List)selected).add(invoker);
                }
            }
        } else {
            selected = invokers;
        }

        RpcContext.getContext().setInvokers((List)selected);
        final AtomicInteger count = new AtomicInteger();
        final BlockingQueue<Object> ref = new LinkedBlockingQueue();
        Iterator i$ = ((List)selected).iterator();

        while(i$.hasNext()) {
            final Invoker<T> invoker = (Invoker)i$.next();
            // Submit every selected invoker to the pool so the calls run concurrently
            this.executor.execute(new Runnable() {
                public void run() {
                    try {
                        Result result = invoker.invoke(invocation);
                        ref.offer(result);
                    } catch (Throwable var3) {
                        int value = count.incrementAndGet();
                        if (value >= ((List)selected).size()) {
                            ref.offer(var3);
                        }
                    }

                }
            });
        }

        try {
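            // Wait up to `timeout` ms for the first queue entry: either a successful result,
            // or the exception offered once every fork has failed (poll returns null on timeout)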
            Object ret = ref.poll((long)timeout, TimeUnit.MILLISECONDS);
            if (ret instanceof Throwable) {
                Throwable e = (Throwable)ret;
                throw new RpcException(e instanceof RpcException ? ((RpcException)e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
            } else {
                return (Result)ret;
            }
        } catch (InterruptedException var11) {
            throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + var11.getMessage(), var11);
        }
    }
}
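
The core of the forking strategy is the combination of a blocking queue and a failure counter: every successful fork offers its result, while an exception is offered only when the last fork has failed. The sketch below reproduces that pattern in plain Java. `ForkingSketch` and the `Callable<String>` providers are hypothetical, and one deliberate difference is labelled in the code: on timeout this sketch throws, whereas the listing above would return the `null` that `poll` yields.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ForkingClusterInvoker; not a Dubbo API.
class ForkingSketch {
    private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "forking-sketch");
        t.setDaemon(true);
        return t;
    });

    // Call all providers in parallel and return the first successful result.
    static String invoke(List<Callable<String>> providers, long timeoutMillis) {
        AtomicInteger failures = new AtomicInteger();
        BlockingQueue<Object> results = new LinkedBlockingQueue<>();
        for (Callable<String> provider : providers) {
            EXECUTOR.execute(() -> {
                try {
                    results.offer(provider.call());
                } catch (Exception e) {
                    // Only the last failing fork reports, so one success can still win.
                    if (failures.incrementAndGet() >= providers.size()) {
                        results.offer(e);
                    }
                }
            });
        }
        Object first;
        try {
            first = results.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for forks", e);
        }
        if (first == null) {
            // Assumption of this sketch: treat a timeout as an error instead of returning null.
            throw new IllegalStateException("No provider answered within " + timeoutMillis + " ms");
        }
        if (first instanceof Throwable) {
            Throwable error = (Throwable) first;
            throw new IllegalStateException("All providers failed. Last error is: " + error.getMessage(), error);
        }
        return (String) first;
    }
}
```

Slower forks keep running after the first result arrives, which is where the extra resource cost mentioned above comes from.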

BroadcastClusterInvoker

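BroadcastClusterInvoker calls every service provider one by one. If any of them reports an error, it throws the last recorded exception after the loop has finished. This class is typically used to notify all providers to update local resources such as caches or logs.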

public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);

    public BroadcastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        this.checkInvokers(invokers, invocation);
        RpcContext.getContext().setInvokers(invokers);
        RpcException exception = null;
        Result result = null;
        Iterator i$ = invokers.iterator();

        while(i$.hasNext()) {
            Invoker invoker = (Invoker)i$.next();

            try {
                result = invoker.invoke(invocation);
            } catch (RpcException var9) {
                exception = var9;
                logger.warn(var9.getMessage(), var9);
            } catch (Throwable var10) {
                exception = new RpcException(var10.getMessage(), var10);
                logger.warn(var10.getMessage(), var10);
            }
        }

        if (exception != null) {
            throw exception;
        } else {
            return result;
        }
    }
}
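
The important detail in the loop above is that a failure does not stop the broadcast: every provider is still called, and the error surfaces only afterwards. A minimal plain-Java sketch of that control flow follows. `BroadcastSketch` and the `Callable<String>` providers are hypothetical names used for illustration, and `RuntimeException` stands in for `RpcException`.

```java
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical stand-in for BroadcastClusterInvoker; not a Dubbo API.
class BroadcastSketch {
    // Call every provider in order; throw the last recorded error once the loop is done.
    static String invoke(List<Callable<String>> providers) {
        RuntimeException lastError = null;
        String lastResult = null;
        for (Callable<String> provider : providers) {
            try {
                lastResult = provider.call();
            } catch (RuntimeException e) {
                lastError = e; // remember it, but keep notifying the remaining providers
            } catch (Exception e) {
                lastError = new RuntimeException(e.getMessage(), e);
            }
        }
        if (lastError != null) {
            throw lastError;
        }
        return lastResult;
    }
}
```

When several providers fail, only the last exception reaches the caller, so the earlier ones are visible only in the logs.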