聊聊dubbo-go的failbackCluster

本文主要研究一下dubbo-go的failbackClusterapp

failbackCluster

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster.go異步

type failbackCluster struct{}

const failback = "failback"

func init() {
    extension.SetCluster(failback, NewFailbackCluster)
}

// NewFailbackCluster ...
func NewFailbackCluster() cluster.Cluster {
    return &failbackCluster{}
}

func (cluster *failbackCluster) Join(directory cluster.Directory) protocol.Invoker {
    return newFailbackClusterInvoker(directory)
}
  • failbackCluster的join方法執行newFailbackClusterInvoker

newFailbackClusterInvoker

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.gourl

type failbackClusterInvoker struct {
    baseClusterInvoker

    once          sync.Once
    ticker        *time.Ticker
    maxRetries    int64
    failbackTasks int64
    taskList      *queue.Queue
}

func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
    invoker := &failbackClusterInvoker{
        baseClusterInvoker: newBaseClusterInvoker(directory),
    }
    retriesConfig := invoker.GetUrl().GetParam(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)
    retries, err := strconv.Atoi(retriesConfig)
    if err != nil || retries < 0 {
        logger.Error("Your retries config is invalid,pls do a check. And will use the default fail back times configuration instead.")
        retries = constant.DEFAULT_FAILBACK_TIMES_INT
    }

    failbackTasksConfig := invoker.GetUrl().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS)
    if failbackTasksConfig <= 0 {
        failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS
    }
    invoker.maxRetries = int64(retries)
    invoker.failbackTasks = failbackTasksConfig
    return invoker
}
  • newFailbackClusterInvoker方法建立failbackClusterInvoker,並設置其maxRetries、failbackTasks屬性

Invoke

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go日誌

func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
    invokers := invoker.directory.List(invocation)
    err := invoker.checkInvokers(invokers, invocation)
    if err != nil {
        logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
            invocation.MethodName(), invoker.GetUrl().Service(), err)
        return &protocol.RPCResult{}
    }
    url := invokers[0].GetUrl()
    methodName := invocation.MethodName()
    //Get the service loadbalance config
    lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)

    //Get the service method loadbalance config if have
    if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" {
        lb = v
    }
    loadbalance := extension.GetLoadbalance(lb)

    invoked := make([]protocol.Invoker, 0, len(invokers))
    var result protocol.Result

    ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
    //DO INVOKE
    result = ivk.Invoke(ctx, invocation)
    if result.Error() != nil {
        invoker.once.Do(func() {
            invoker.taskList = queue.New(invoker.failbackTasks)
            go invoker.process(ctx)
        })

        taskLen := invoker.taskList.Len()
        if taskLen >= invoker.failbackTasks {
            logger.Warnf("tasklist is too full > %d.\n", taskLen)
            return &protocol.RPCResult{}
        }

        timerTask := newRetryTimerTask(loadbalance, invocation, invokers, ivk)
        invoker.taskList.Put(timerTask)

        logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
            methodName, url.Service(), result.Error().Error())
        // ignore
        return &protocol.RPCResult{}
    }

    return result
}
  • Invoke方法先經過invoker.directory.List(invocation)獲取invokers,以後經過extension.GetLoadbalance(lb)獲取loadbalance,而後經過invoker.doSelect(loadbalance, invocation, invokers, invoked)選擇invoker,以後執行其Invoke方法,若是出現異常則設置invoker.taskList,異步執行invoker.process(ctx),以後經過newRetryTimerTask建立timerTask,添加到invoker.taskList

Destroy

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.gocode

func (invoker *failbackClusterInvoker) Destroy() {
    invoker.baseClusterInvoker.Destroy()

    // stop ticker
    if invoker.ticker != nil {
        invoker.ticker.Stop()
    }

    _ = invoker.taskList.Dispose()
}
  • Destroy方法執行invoker.baseClusterInvoker.Destroy()、invoker.ticker.Stop()、invoker.taskList.Dispose()

process

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go隊列

func (invoker *failbackClusterInvoker) process(ctx context.Context) {
    invoker.ticker = time.NewTicker(time.Second * 1)
    for range invoker.ticker.C {
        // check each timeout task and re-run
        for {
            value, err := invoker.taskList.Peek()
            if err == queue.ErrDisposed {
                return
            }
            if err == queue.ErrEmptyQueue {
                break
            }

            retryTask := value.(*retryTimerTask)
            if time.Since(retryTask.lastT).Seconds() < 5 {
                break
            }

            // ignore return. the get must success.
            _, err = invoker.taskList.Get(1)
            if err != nil {
                logger.Warnf("get task found err: %v\n", err)
                break
            }

            go func(retryTask *retryTimerTask) {
                invoked := make([]protocol.Invoker, 0)
                invoked = append(invoked, retryTask.lastInvoker)

                retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
                var result protocol.Result
                result = retryInvoker.Invoke(ctx, retryTask.invocation)
                if result.Error() != nil {
                    retryTask.lastInvoker = retryInvoker
                    invoker.checkRetry(retryTask, result.Error())
                }
            }(retryTask)

        }
    }
}
  • process方法經過time.NewTicker(time.Second * 1)建立invoker.ticker,以後從invoker.taskList.Peek()獲取retryTask(以後Get方法進行poll),而後異步執行invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)選取retryInvoker,而後執行retryInvoker.Invoke(ctx, retryTask.invocation);若是執行出現異常,則經過invoker.checkRetry(retryTask, result.Error())進行check

checkRetry

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.goget

func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err error) {
    logger.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.\n",
        retryTask.invocation.MethodName(), invoker.GetUrl().Service(), err.Error())
    retryTask.retries++
    retryTask.lastT = time.Now()
    if retryTask.retries > invoker.maxRetries {
        logger.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v.\n",
            retryTask.retries, retryTask.invocation)
    } else {
        invoker.taskList.Put(retryTask)
    }
}
  • checkRetry方法會遞增retryTask.retries,而後判斷是否超過invoker.maxRetries,超過則記錄error日誌,不超過則再次將retryTask添加到invoker.taskList

小結

newFailbackClusterInvoker方法建立failbackClusterInvoker,並設置其maxRetries、failbackTasks屬性;其Invoke方法先經過invoker.directory.List(invocation)獲取invokers,以後經過extension.GetLoadbalance(lb)獲取loadbalance,而後經過invoker.doSelect(loadbalance, invocation, invokers, invoked)選擇invoker,以後執行其Invoke方法,若是出現異常則設置invoker.taskList,異步執行invoker.process(ctx),以後經過newRetryTimerTask建立timerTask,添加到invoker.taskListit

failbackCluster忽略result,針對失敗的會加入隊列重試maxRetries次,適合fireAndForget的通訊模式

doc

相關文章
相關標籤/搜索