警戒看不見的重試機制:爲何使用RPC必須考慮冪等性

歡迎你們關注公衆號「JAVA前線」查看更多精彩分享文章,主要包括源碼分析、實際應用、架構思惟、職場分享、產品思考等等,同時歡迎你們加我我的微信「java_front」一塊兒交流學習java

0 文章概述

在RPC場景中由於重試或者沒有實現冪等機制而致使的重複數據問題,必須引發你們重視,有可能會形成例如一次購買建立多筆訂單,一條通知信息被髮送屢次等問題,這是技術人員必須面對和解決的問題。sql

有人可能會說:當調用失敗時程序並無顯示重試,爲何還會產生重複數據問題呢?這是由於即便沒有顯示重試,RPC框架在集羣容錯機制中自動進行了重試,這個問題必須引發關注。數據庫

本文咱們以DUBBO框架爲例分析爲何重試,怎麼作重試,怎麼作冪等三個問題。安全


看不見的重試機制.jpeg


1 爲何重試

若是簡單對一個RPC交互過程進行分類,咱們能夠分爲三類:響應成功、響應失敗、沒有響應。微信


RPC3.jpg


對於響應成功和響應失敗這兩種狀況,消費者很好處理。由於響應信息明確,因此只要根據響應信息,繼續處理成功或者失敗邏輯便可。可是沒有響應這種場景比較難處理,這是由於沒有響應可能包含如下狀況:markdown

(1) 生產者根本沒有接收到請求
(2) 生產者接收到請求而且已處理成功,可是消費者沒有接收到響應
(3) 生產者接收到請求而且已處理失敗,可是消費者沒有接收到響應
複製代碼

假設你是一名RPC框架設計者,到底是選擇重試仍是放棄調用呢?其實最終如何選擇取決於業務特性,有的業務自己就具備冪等性,可是有的業務不能容許重試不然會形成重複數據。架構

那麼誰對業務特性最熟悉呢?答案是消費者,由於消費者做爲調用方確定最熟悉自身業務,因此RPC框架只要提供一些策略供消費者選擇便可。併發


2 怎麼作重試

2.1 集羣容錯策略

DUBBO做爲一款優秀RPC框架,提供了以下集羣容錯策略供消費者選擇:負載均衡

Failover: 故障轉移
Failfast: 快速失敗
Failsafe: 安全失敗
Failback: 異步重試
Forking:  並行調用
Broadcast:廣播調用
複製代碼

(1) Failover

故障轉移策略。做爲默認策略當消費發生異常時經過負載均衡策略再選擇一個生產者節點進行調用,直到達到重試次數框架

(2) Failfast

快速失敗策略。消費者只消費一次服務,當發生異常時則直接拋出

(3) Failsafe

安全失敗策略。消費者只消費一次服務,若是消費失敗則包裝一個空結果,不拋出異常

(4) Failback

異步重試策略。當消費發生異常時返回一個空結果,失敗請求將會進行異步重試。若是重試超過最大重試次數還不成功,放棄重試並不拋出異常

(5) Forking

並行調用策略。消費者經過線程池併發調用多個生產者,只要有一個成功就算成功

(6) Broadcast

廣播調用策略。消費者遍歷調用全部生產者節點,任何一個出現異常則拋出異常


2.2 源碼分析

2.2.1 Failover

Failover故障轉移策略做爲默認策略,當消費發生異常時經過負載均衡策略再選擇一個生產者節點進行調用,直到達到重試次數。即便業務代碼沒有顯示重試,也有可能屢次執行消費邏輯從而形成重複數據:

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

        // 全部生產者Invokers
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);

        // 獲取重試次數
        int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        RpcException le = null;

        // 已經調用過的生產者
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());
        Set<String> providers = new HashSet<String>(len);

        // 重試直到達到最大次數
        for (int i = 0; i < len; i++) {
            if (i > 0) {

                // 若是當前實例被銷燬則拋出異常
                checkWhetherDestroyed();

                // 根據路由策略選出可用生產者Invokers
                copyInvokers = list(invocation);

                // 從新檢查
                checkInvokers(copyInvokers, invocation);
            }

            // 負載均衡選擇一個生產者Invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 服務消費發起遠程調用
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le);
                }
                // 有結果則返回
                return result;
            } catch (RpcException e) {
                // 業務異常直接拋出
                if (e.isBiz()) {
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                // RpcException不拋出繼續重試
                le = new RpcException(e.getMessage(), e);
            } finally {
                // 保存已經訪問過的生產者
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }
}
複製代碼

消費者調用生產者節點A發生RpcException異常時(例如超時異常),在未達到最大重試次數以前,消費者會經過負載均衡策略再次選擇其它生產者節點消費。試想若是生產者節點A其實已經處理成功了,可是沒有及時將成功結果返回給消費者,那麼再次重試可能就會形成重複數據問題。


2.2.2 Failfast

快速失敗策略。消費者只消費一次服務,當發生異常時則直接拋出,不會進行重試:

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public FailfastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

        // 檢查生產者Invokers是否合法
        checkInvokers(invokers, invocation);

        // 負載均衡選擇一個生產者Invoker
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            // 服務消費發起遠程調用
            return invoker.invoke(invocation);
        } catch (Throwable e) {

            // 服務消費失敗不重試直接拋出異常
            if (e instanceof RpcException && ((RpcException) e).isBiz()) {
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                                   "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                                   + " select from all providers " + invokers + " for service " + getInterface().getName()
                                   + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                                   + " use dubbo version " + Version.getVersion()
                                   + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                                   e.getCause() != null ? e.getCause() : e);
        }
    }
}

複製代碼

2.2.3 Failsafe

安全失敗策略。消費者只消費一次服務,若是消費失敗則包裝一個空結果,不拋出異常,不會進行重試:

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);

    public FailsafeClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {

            // 檢查生產者Invokers是否合法
            checkInvokers(invokers, invocation);

            // 負載均衡選擇一個生產者Invoker
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);

            // 服務消費發起遠程調用
            return invoker.invoke(invocation);

        } catch (Throwable e) {
            // 消費失敗包裝爲一個空結果對象
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult();
        }
    }
}
複製代碼

2.2.4 Failback

異步重試策略。當消費發生異常時返回一個空結果,失敗請求將會進行異步重試。若是重試超過最大重試次數還不成功,放棄重試並不拋出異常:

public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);

    private static final long RETRY_FAILED_PERIOD = 5;

    private final int retries;

    private final int failbackTasks;

    private volatile Timer failTimer;

    public FailbackClusterInvoker(Directory<T> directory) {
        super(directory);

        int retriesConfig = getUrl().getParameter(Constants.RETRIES_KEY, Constants.DEFAULT_FAILBACK_TIMES);
        if (retriesConfig <= 0) {
            retriesConfig = Constants.DEFAULT_FAILBACK_TIMES;
        }
        int failbackTasksConfig = getUrl().getParameter(Constants.FAIL_BACK_TASKS_KEY, Constants.DEFAULT_FAILBACK_TASKS);
        if (failbackTasksConfig <= 0) {
            failbackTasksConfig = Constants.DEFAULT_FAILBACK_TASKS;
        }
        retries = retriesConfig;
        failbackTasks = failbackTasksConfig;
    }

    private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
        if (failTimer == null) {
            synchronized (this) {
                if (failTimer == null) {
                    // 建立定時器
                    failTimer = new HashedWheelTimer(new NamedThreadFactory("failback-cluster-timer", true), 1, TimeUnit.SECONDS, 32, failbackTasks);
                }
            }
        }
        // 構造定時任務
        RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
        try {
            // 定時任務放入定時器等待執行
            failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
        } catch (Throwable e) {
            logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
        }
    }

    @Override
    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        Invoker<T> invoker = null;
        try {

            // 檢查生產者Invokers是否合法
            checkInvokers(invokers, invocation);

            // 負責均衡選擇一個生產者Invoker
            invoker = select(loadbalance, invocation, invokers, null);

            // 消費服務發起遠程調用
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e);

            // 若是服務消費失敗則記錄失敗請求
            addFailed(loadbalance, invocation, invokers, invoker);

            // 返回空結果
            return new RpcResult();
        }
    }

    @Override
    public void destroy() {
        super.destroy();
        if (failTimer != null) {
            failTimer.stop();
        }
    }

    /** * RetryTimerTask */
    private class RetryTimerTask implements TimerTask {
        private final Invocation invocation;
        private final LoadBalance loadbalance;
        private final List<Invoker<T>> invokers;
        private final int retries;
        private final long tick;
        private Invoker<T> lastInvoker;
        private int retryTimes = 0;

        RetryTimerTask(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, int retries, long tick) {
            this.loadbalance = loadbalance;
            this.invocation = invocation;
            this.invokers = invokers;
            this.retries = retries;
            this.tick = tick;
            this.lastInvoker = lastInvoker;
        }

        @Override
        public void run(Timeout timeout) {
            try {
                // 負載均衡選擇一個生產者Invoker
                Invoker<T> retryInvoker = select(loadbalance, invocation, invokers, Collections.singletonList(lastInvoker));
                lastInvoker = retryInvoker;

                // 服務消費發起遠程調用
                retryInvoker.invoke(invocation);
            } catch (Throwable e) {
                logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);

                // 超出最大重試次數記錄日誌不拋出異常
                if ((++retryTimes) >= retries) {
                    logger.error("Failed retry times exceed threshold (" + retries + "), We have to abandon, invocation->" + invocation);
                } else {
                    // 未超出最大重試次數從新放入定時器
                    rePut(timeout);
                }
            }
        }

        private void rePut(Timeout timeout) {
            if (timeout == null) {
                return;
            }

            Timer timer = timeout.timer();
            if (timer.isStop() || timeout.isCancelled()) {
                return;
            }

            timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
        }
    }
}
複製代碼

2.2.5 Forking

並行調用策略。消費者經過線程池併發調用多個生產者,只要有一個成功就算成功:

public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private final ExecutorService executor = Executors.newCachedThreadPool(new NamedInternalThreadFactory("forking-cluster-timer", true));

    public ForkingClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            final List<Invoker<T>> selected;

            // 獲取配置參數
            final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
            final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            // 獲取並行執行的Invoker列表
            if (forks <= 0 || forks >= invokers.size()) {
                selected = invokers;
            } else {
                selected = new ArrayList<>();
                for (int i = 0; i < forks; i++) {
                    // 選擇生產者
                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                    // 防止重複增長Invoker
                    if (!selected.contains(invoker)) {
                        selected.add(invoker);
                    }
                }
            }
            RpcContext.getContext().setInvokers((List) selected);
            final AtomicInteger count = new AtomicInteger();
            final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
            for (final Invoker<T> invoker : selected) {

                // 在線程池中併發執行
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // 執行消費邏輯
                            Result result = invoker.invoke(invocation);
                            // 存儲消費結果
                            ref.offer(result);
                        } catch (Throwable e) {
                            // 若是異常次數大於等於forks參數值說明所有調用失敗,則把異常放入隊列
                            int value = count.incrementAndGet();
                            if (value >= selected.size()) {
                                ref.offer(e);
                            }
                        }
                    }
                });
            }
            try {
                // 從隊列獲取結果
                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                // 若是異常類型表示所有調用失敗則拋出異常
                if (ret instanceof Throwable) {
                    Throwable e = (Throwable) ret;
                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                }
                return (Result) ret;
            } catch (InterruptedException e) {
                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
            }
        } finally {
            RpcContext.getContext().clearAttachments();
        }
    }
}
複製代碼

2.2.6 Broadcast

廣播調用策略。消費者遍歷調用全部生產者節點,任何一個出現異常則拋出異常:

public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);

    public BroadcastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        RpcContext.getContext().setInvokers((List) invokers);
        RpcException exception = null;
        Result result = null;

        // 遍歷調用全部生產者節點
        for (Invoker<T> invoker : invokers) {
            try {
                // 執行消費邏輯
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                exception = e;
                logger.warn(e.getMessage(), e);
            } catch (Throwable e) {
                exception = new RpcException(e.getMessage(), e);
                logger.warn(e.getMessage(), e);
            }
        }
        // 任何一個出現異常則拋出異常
        if (exception != null) {
            throw exception;
        }
        return result;
    }
}
複製代碼

3 怎麼作冪等

通過上述分析咱們知道,RPC框架自帶的重試機制可能會形成數據重複問題,那麼在使用中必須考慮冪等性。冪等性是指一次操做與屢次操做產生結果相同,並不會由於屢次操做而產生不一致性。常見冪等方案有取消重試、冪等表、數據庫鎖、狀態機。


3.1 取消重試

取消重試有兩種方法,第一是設置重試次數爲零,第二是選擇不重試的集羣容錯策略。

<!-- 設置重試次數爲零 -->
<dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" retries="0" />

<!-- 選擇集羣容錯方案 -->
<dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" cluster="failfast" />
複製代碼

3.2 冪等表

假設用戶支付成功後,支付系統將支付成功消息,發送至消息隊列。物流系統訂閱到這個消息,準備爲這筆訂單建立物流單。

可是消息隊列可能會重複推送,物流系統有可能接收到屢次這條消息。咱們但願達到效果是:不管接收到多少條重複消息,只能建立一筆物流單。

解決方案是冪等表方案。新建一張冪等表,該表就是用來作冪等,無其它業務意義,有一個字段名爲key建有惟一索引,這個字段是冪等標準。

物流系統訂閱到消息後,首先嚐試插入冪等表,訂單編號做爲key字段。若是成功則繼續建立物流單,若是訂單編號已經存在則違反惟一性原則,沒法插入成功,說明已經進行過業務處理,丟棄消息。

這張表數據量會比較大,咱們能夠經過定時任務對數據進行歸檔,例如只保留7天數據,其它數據存入歸檔表。

還有一種廣義冪等表就是咱們能夠用Redis替代數據庫,在建立物流單以前,咱們能夠檢查Redis是否存在該訂單編號數據,同時能夠爲這類數據設置7天過時時間。


3.3 狀態機

物流單建立成功後會發送消息,訂單系統訂閱到消息後更新狀態爲完成,假設變動是將訂單狀態0更新至狀態1。訂單系統也可能收到多條消息,可能在狀態已經被更新至狀態1以後,依然收到物流單建立成功消息。

解決方案是狀態機方案。首先繪製狀態機圖,分析狀態流轉形態。例如通過分析狀態1已是最終態,那麼即便接收到物流單建立成功消息也再也不處理,丟棄消息。


3.4 數據庫鎖

數據庫鎖又能夠分爲悲觀鎖和樂觀鎖兩種類型,悲觀鎖是在獲取數據時加鎖:

select * from table where col='xxx' for update 
複製代碼

樂觀鎖是在更新時加鎖,第一步首先查出數據,數據包含version字段。第二步進行更新操做,若是此時記錄已經被修改則version字段已經發生變化,沒法更新成功:

update table set xxx,
version = #{version} + 1 
where id = #{id} 
and version = #{version}
複製代碼

4 文章總結

本文首先分析了爲何重試這個問題,由於對於RPC交互無響應場景,重試是一種重要選擇。而後分析了DUBBO提供的六種集羣容錯策略,Failover做爲默認策略提供了重試機制,在業務代碼沒有顯示重試狀況下,仍有可能發起屢次調用,這必須引發重視。最後咱們分析了幾種經常使用冪等方案,但願本文對你們有所幫助。

歡迎你們關注公衆號「JAVA前線」查看更多精彩分享文章,主要包括源碼分析、實際應用、架構思惟、職場分享、產品思考等等,同時歡迎你們加我我的微信「java_front」一塊兒交流學習

相關文章
相關標籤/搜索