codis的Rebalance算法

Codis 由四部分組成:node

  • Codis Proxy (codis-proxy)
  • Codis Dashboard (codis-config)
  • Codis Redis (codis-server)
  • ZooKeeper/Etcd

 

codis-proxy 是客戶端鏈接的 Redis 代理服務, codis-proxy 自己實現了 Redis 協議, 表現得和一個原生的 Redis 沒什麼區別 (就像 Twemproxy), 對於一個業務來講, 能夠部署多個 codis-proxy, codis-proxy 自己是無狀態的.git

 

codis-config 是 Codis 的管理工具, 支持包括, 添加/刪除 Redis 節點, 添加/刪除 Proxy 節點, 發起數據遷移等操做. codis-config 自己還自帶了一個 http server, 會啓動一個 dashboard, 用戶能夠直接在瀏覽器上觀察 Codis 集羣的運行狀態.github

codis-server 是 Codis 項目維護的一個 Redis 分支, 基於 2.8.21 開發, 加入了 slot 的支持和原子的數據遷移指令. Codis 上層的 codis-proxy 和 codis-config 只能和這個版本的 Redis 交互才能正常運行.golang

Codis 依賴 ZooKeeper 來存放數據路由表和 codis-proxy 節點的元信息, codis-config 發起的命令都會經過 ZooKeeper 同步到各個存活的 codis-proxy.redis

 

上面這張圖的意思是,client能夠直接訪問codis-proxy,也能夠經過訪問HAProxy。算法

由於codis的proxy是無狀態的,能夠比較容易的搭多個proxy來實現高可用性並橫向擴容。 對Java用戶來講,可使用通過咱們修改過的Jedis,Jodis ,來實現proxy層的HA。它會經過監控zk上的註冊信息來實時得到當前可用的proxy列表,既能夠保證高可用性,也能夠經過輪流請求全部的proxy實現負載均衡。若是須要異步請求,可使用咱們基於Netty開發的Nedis瀏覽器

 

codis的Rebalance數據結構

codis的存儲層能夠實現擴容,下面說明一下codis增長group的策略:app

codis是用golang編寫,使用了c的一些庫,Rebalance的源碼以下,仔細看過源碼後咱們能清楚理解它的策略負載均衡

func Rebalance() error {

    targetQuota, err := getQuotaMap(safeZkConn)

    if err != nil {

        return errors.Trace(err)

    }

    livingNodes, err := getLivingNodeInfos(safeZkConn)

    if err != nil {

        return errors.Trace(err)

    }

    log.Infof("start rebalance")

    for _, node := range livingNodes {

        for len(node.CurSlots) > targetQuota[node.GroupId] {

            for _, dest := range livingNodes {

                if dest.GroupId != node.GroupId && len(dest.CurSlots) < targetQuota[dest.GroupId] && len(node.CurSlots) > targetQuota[node.GroupId] {

                    slot := node.CurSlots[len(node.CurSlots)-1]

                    // create a migration task

                    info := &MigrateTaskInfo{

                        Delay:      0,

                        SlotId:     slot,

                        NewGroupId: dest.GroupId,

                        Status:     MIGRATE_TASK_PENDING,

                        CreateAt:   strconv.FormatInt(time.Now().Unix(), 10),

                    }

                    globalMigrateManager.PostTask(info)



                    node.CurSlots = node.CurSlots[0 : len(node.CurSlots)-1]

                    dest.CurSlots = append(dest.CurSlots, slot)

                }

            }

        }

    }

    log.Infof("rebalance tasks submit finish")

    return nil

}

首先getQuotaMap(safeZkConn)獲取並計算出存活的group的slot配額,數據結構爲map<group, slot_num>。若是新增了group,那麼新的配額會在這個函數中計算完成。getLivingNodeInfos(safeZkConn),這個函數獲取存活的group列表。其實getQuotaMap也調用了這個方法,從ZK上獲取元數據。下面看一下getQuotaMap(safeZkConn):

func getQuotaMap(zkConn zkhelper.Conn) (map[int]int, error) {

    nodes, err := getLivingNodeInfos(zkConn)

    if err != nil {

        return nil, errors.Trace(err)

    }

    ret := make(map[int]int)

    var totalMem int64

    totalQuota := 0

    for _, node := range nodes {

        totalMem += node.MaxMemory

    }

    for _, node := range nodes {

        quota := int(models.DEFAULT_SLOT_NUM * node.MaxMemory * 1.0 / totalMem)

        ret[node.GroupId] = quota

        totalQuota += quota

    }

    // round up

    if totalQuota < models.DEFAULT_SLOT_NUM {

        for k, _ := range ret {

            ret[k] += models.DEFAULT_SLOT_NUM - totalQuota

            break

        }

    }

    return ret, nil

}

先遍歷nodes,計算最大內存總和,再遍歷nodes,根據該節點佔總內存大小的比例,分配slot數量,存入返回結果中。若是最後totalQuota數量小於1024,把剩餘的solt分配給第一個group。下面是getLivingNodeInfos(safeZkConn):

func getLivingNodeInfos(zkConn zkhelper.Conn) ([]*NodeInfo, error) {

    groups, err := models.ServerGroups(zkConn, globalEnv.ProductName())

    if err != nil {

        return nil, errors.Trace(err)

    }

    slots, err := models.Slots(zkConn, globalEnv.ProductName())

    slotMap := make(map[int][]int)

    for _, slot := range slots {

        if slot.State.Status == models.SLOT_STATUS_ONLINE {

            slotMap[slot.GroupId] = append(slotMap[slot.GroupId], slot.Id)

        }

    }

    var ret []*NodeInfo

    for _, g := range groups {

        master, err := g.Master(zkConn)

        if err != nil {

            return nil, errors.Trace(err)

        }

        if master == nil {

            return nil, errors.Errorf("group %d has no master", g.Id)

        }

        out, err := utils.GetRedisConfig(master.Addr, globalEnv.Password(), "maxmemory")

        if err != nil {

            return nil, errors.Trace(err)

        }

        maxMem, err := strconv.ParseInt(out, 10, 64)

        if err != nil {

            return nil, errors.Trace(err)

        }

        if maxMem <= 0 {

            return nil, errors.Errorf("redis %s should set maxmemory", master.Addr)

        }

        node := &NodeInfo{

            GroupId:   g.Id,

            CurSlots:  slotMap[g.Id],

            MaxMemory: maxMem,

        }

        ret = append(ret, node)

    }

    cnt := 0

    for _, info := range ret {

        cnt += len(info.CurSlots)

    }

    if cnt != models.DEFAULT_SLOT_NUM {

        return nil, errors.Errorf("not all slots are online")

    }

    return ret, nil

}

這段代碼中先獲取group和slot信息列表,遍歷slots,要求slot是在線狀態,若是有任何一個不在線,在最後判斷slot數量的時候都會報錯。遍歷groups,要求任何一個group都有master、配置了maxMem。

繼續分析Rebalance。三層遍歷,遍歷存活節點,若是當前配額大於rebalance以後的配額的話,第三個for循環中標紅的判斷的結果必定是新增的group。下面的邏輯就是將node中多出來的slot(當前舊的group)遷移到dest(當前新的group)。起一個task,完成遷移。

綜上所述,codis的Rebalance算法簡單、清晰,符合常規思路。另外go語言的可讀性也很強,以前沒有涉及過go,但適應一下能夠看懂。

若是group中master宕機,須要注意,codis將其中一個slave升級爲master時,該組內其餘slave實例是不會自動改變狀態的,這些slave仍將試圖從舊的master上同步數據,於是會致使組內新的master和其餘slave之間的數據不一致。由於redis的slave of命令切換master時會丟棄slave上的所有數據,重新master完整同步,會消耗master資源。所以建議在知情的狀況下手動操做。使用 codis-config server add <group_id> <redis_addr> slave 命令刷新這些節點的狀態便可。codis-ha不會自動刷新其餘slave的狀態。

相關文章
相關標籤/搜索