本文主要研究一下dubbo-go的failbackCluster
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster.go
type failbackCluster struct{} const failback = "failback" func init() { extension.SetCluster(failback, NewFailbackCluster) } // NewFailbackCluster ... func NewFailbackCluster() cluster.Cluster { return &failbackCluster{} } func (cluster *failbackCluster) Join(directory cluster.Directory) protocol.Invoker { return newFailbackClusterInvoker(directory) }
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go
type failbackClusterInvoker struct { baseClusterInvoker once sync.Once ticker *time.Ticker maxRetries int64 failbackTasks int64 taskList *queue.Queue } func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker { invoker := &failbackClusterInvoker{ baseClusterInvoker: newBaseClusterInvoker(directory), } retriesConfig := invoker.GetUrl().GetParam(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES) retries, err := strconv.Atoi(retriesConfig) if err != nil || retries < 0 { logger.Error("Your retries config is invalid,pls do a check. And will use the default fail back times configuration instead.") retries = constant.DEFAULT_FAILBACK_TIMES_INT } failbackTasksConfig := invoker.GetUrl().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS) if failbackTasksConfig <= 0 { failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS } invoker.maxRetries = int64(retries) invoker.failbackTasks = failbackTasksConfig return invoker }
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go
func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { invokers := invoker.directory.List(invocation) err := invoker.checkInvokers(invokers, invocation) if err != nil { logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n", invocation.MethodName(), invoker.GetUrl().Service(), err) return &protocol.RPCResult{} } url := invokers[0].GetUrl() methodName := invocation.MethodName() //Get the service loadbalance config lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE) //Get the service method loadbalance config if have if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" { lb = v } loadbalance := extension.GetLoadbalance(lb) invoked := make([]protocol.Invoker, 0, len(invokers)) var result protocol.Result ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked) //DO INVOKE result = ivk.Invoke(ctx, invocation) if result.Error() != nil { invoker.once.Do(func() { invoker.taskList = queue.New(invoker.failbackTasks) go invoker.process(ctx) }) taskLen := invoker.taskList.Len() if taskLen >= invoker.failbackTasks { logger.Warnf("tasklist is too full > %d.\n", taskLen) return &protocol.RPCResult{} } timerTask := newRetryTimerTask(loadbalance, invocation, invokers, ivk) invoker.taskList.Put(timerTask) logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n", methodName, url.Service(), result.Error().Error()) // ignore return &protocol.RPCResult{} } return result }
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go
func (invoker *failbackClusterInvoker) Destroy() { invoker.baseClusterInvoker.Destroy() // stop ticker if invoker.ticker != nil { invoker.ticker.Stop() } _ = invoker.taskList.Dispose() }
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go
func (invoker *failbackClusterInvoker) process(ctx context.Context) { invoker.ticker = time.NewTicker(time.Second * 1) for range invoker.ticker.C { // check each timeout task and re-run for { value, err := invoker.taskList.Peek() if err == queue.ErrDisposed { return } if err == queue.ErrEmptyQueue { break } retryTask := value.(*retryTimerTask) if time.Since(retryTask.lastT).Seconds() < 5 { break } // ignore return. the get must success. _, err = invoker.taskList.Get(1) if err != nil { logger.Warnf("get task found err: %v\n", err) break } go func(retryTask *retryTimerTask) { invoked := make([]protocol.Invoker, 0) invoked = append(invoked, retryTask.lastInvoker) retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked) var result protocol.Result result = retryInvoker.Invoke(ctx, retryTask.invocation) if result.Error() != nil { retryTask.lastInvoker = retryInvoker invoker.checkRetry(retryTask, result.Error()) } }(retryTask) } } }
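process方法建立invoker.ticker,之後遍歷invoker.ticker.C,通過invoker.taskList.Peek()查看隊首的retryTask,如果time.Since(retryTask.lastT).Seconds()小於5則跳出內層循環;否則通過invoker.taskList.Get(1)取出該task(Peek方法只是查看而不移除,之後Get方法進行poll),然後異步執行invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)選取retryInvoker,再執行retryInvoker.Invoke(ctx, retryTask.invocation);如果執行出現異常,則通過invoker.checkRetry(retryTask, result.Error())進行check
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go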
func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err error) { logger.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.\n", retryTask.invocation.MethodName(), invoker.GetUrl().Service(), err.Error()) retryTask.retries++ retryTask.lastT = time.Now() if retryTask.retries > invoker.maxRetries { logger.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v.\n", retryTask.retries, retryTask.invocation) } else { invoker.taskList.Put(retryTask) } }
newFailbackClusterInvoker方法建立failbackClusterInvoker,並設置其maxRetries、failbackTasks屬性;其Invoke方法先通過invoker.directory.List(invocation)獲取invokers,之後通過extension.GetLoadbalance(lb)獲取loadbalance,然後通過invoker.doSelect(loadbalance, invocation, invokers, invoked)選擇invoker,之後執行其Invoke方法,如果出現異常則設置invoker.taskList,異步執行invoker.process(ctx),之後通過newRetryTimerTask建立timerTask,添加到invoker.taskList
failbackCluster忽略result,針對失敗的會加入隊列重試maxRetries次,適合fireAndForget的通訊模式