一致性hash算法(golang)

回憶

還記得剛畢業入職到新公司的時候, 個人上級領導與前端同窗解釋後端技術棧龐雜. 大概記得舉了一個例子是 「如何多臺機器提供數據緩存存儲服務?」 , 扭頭問了我一下, 當時直接說使用 hash取模 的方式分攤數據。前端

接着我確定被追問一臺機器掛了怎麼辦, 怎麼減小節點掛掉的影響, 結果是被鄙視了, 從那之後也就記住了 一致性hash 這個詞.node

雖然工做時間也不短了, 可是如今再問我 一致性hash算法 到底是啥, 我大概也只能回答出 一個圓環, 環裏有不少虛擬節點, key hash後定位到對應的虛擬節點, 確歷來沒有本身動手寫過一行代碼.算法

老驥伏櫪、志在千里. 咱們開始吧~後端

一致性hash算法

一致性哈希算法在1997年由麻省理工學院的Karger等人在解決分佈式Cache中提出的,設計目標是爲了解決因特網中的熱點(Hot spot)問題,初衷和CARP十分相似。一致性哈希修正了CARP使用的簡單哈希算法帶來的問題,使得DHT能夠在P2P環境中真正獲得應用.緩存

一致性hash在數據存儲領域中有普遍的應用, 目的主要是減小數據傾斜問題, 在節點失效、節點增長時, 只需影響少許數據.markdown

咱們看上圖, 爲一個環, 在環中咱們根據hash計算放入4個node節點併發

咱們又根據鍵值計算結果放入到對應離他最近的下一個節點.app

當咱們新增node5節點時計算hash值放入環中, 僅將 node4 中部分數據(hash值小於node5) 從新定位到 node5 便可.分佈式

當移除節點node4時, 也僅將 node4 數據移入下一個節點 node3 便可.測試

思考: node4失效後, node4數據壓力所有給到node3, node3的壓力增大, 會不會發生鏈條反應, 致使全部節點崩潰?

這時咱們須要增長虛擬節點來分擔 node3 壓力, 將實體節點經過 hash計算 分散更多的分佈到環上, 相對來講數據 hash key 更能隨機到不一樣的節點上, 理想狀態下當其中一個節點失效時, 多個節點分攤數據壓力

邏輯實現

MyServiceNode節點操做

package main

import (
	"fmt"
	"strconv"
)

//service node 結構體定義
type ServiceNode struct {
	Ip   string
	Port string
}
// 返回service node實例
func NewServiceNode(ip string, port string) *ServiceNode {
	return &ServiceNode{
		Ip:   ip,
		Port: port,
	}
}

func main() {

	//實例化三個實體節點
	node1 := NewServiceNode("127.0.0.1", "3305")
	node2 := NewServiceNode("127.0.0.1", "3306")
	node3 := NewServiceNode("127.0.0.1", "3307")

	//添加對應的虛擬化節點數一、一、3
	virtualNodeService.addVirtualNode(node1, 3)
	virtualNodeService.addVirtualNode(node2, 3)
	virtualNodeService.addVirtualNode(node3, 3)

	//打印節點列表
	PrintRouteList()

	//循環測試
	for i := 0; i < 20; i++ {
		//移除node2節點
		virtualNodeService.removeVirtualNode(node2, 3)
		//獲取對應節點地址
		serviceNode := virtualNodeService.getVirtualNodel("get_cc" + strconv.Itoa(i))
		fmt.Println(serviceNode.Ip + ":" + serviceNode.Port)
	}
}

//打印節點列表
func PrintRouteList() {
	for key, val := range virtualNodeService.VirtualNodes {
		fmt.Println("key=" + strconv.Itoa(int(key)) + ", host=" + val.Ip + ":" + val.Port)
	}
}

複製代碼

virtualNodeService 虛擬化節點

package main

import (
	"crypto/md5"
	"encoding/hex"
	"sort"
	"strconv"
	"sync"
)

var virtualNodeService = NewVirtualNode()

type NodeType []uint32

//Len()
func (s NodeType) Len() int {
	return len(s)
}

//Less():成績將有低到高排序
func (s NodeType) Less(i, j int) bool {
	return s[i] < s[j]
}

//Swap()
func (s NodeType) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

//虛擬節點結構定義
type VirtualNode struct {
	VirtualNodes map[uint32]*ServiceNode
	NodeKeys     NodeType
	sync.RWMutex
}

//實例化虛擬節點對象
func NewVirtualNode() *VirtualNode {
	return &VirtualNode{
		VirtualNodes: map[uint32]*ServiceNode{},
	}
}

//添加虛擬節點
func (v *VirtualNode) addVirtualNode(serviceNode *ServiceNode, virtualNum uint) {

	//併發讀寫map-加鎖
	v.Lock()
	defer v.Unlock()

	for i := uint(0); i < virtualNum; i++ {
		hashStr := serviceNode.Ip + ":" + serviceNode.Port + ":" + strconv.Itoa(int(i))
		v.VirtualNodes[v.getHashCode(hashStr)] = serviceNode
	}

	//虛擬節點hash值排序
	v.sortHash()
}

//移除虛擬節點
func (v *VirtualNode) removeVirtualNode(serviceNode *ServiceNode, virtualNum uint) {
	//併發讀寫map-加鎖
	v.Lock()
	defer v.Unlock()

	for i := uint(0); i < virtualNum; i++ {
		hashStr := serviceNode.Ip + ":" + serviceNode.Port + ":" + strconv.Itoa(int(i))
		delete(v.VirtualNodes, v.getHashCode(hashStr))
	}

	v.sortHash()
}

//hash數值排序
func (v *VirtualNode) sortHash() {
	v.NodeKeys = nil
	for k := range v.VirtualNodes {
		v.NodeKeys = append(v.NodeKeys, k)
	}
	sort.Sort(v.NodeKeys)
}

//獲取虛擬節點(二分查找)
func (v *VirtualNode) getVirtualNodel(routeKey string) *ServiceNode {
	//併發讀寫map-加讀鎖,可併發讀不可同時寫
	v.RLock()
	defer v.RUnlock()

	index := 0
	hashCode := v.getHashCode(routeKey)
	i := sort.Search(len(v.NodeKeys), func(i int) bool { return v.NodeKeys[i] > hashCode })
	//當i大於下標最大值時,證實沒找到, 給到第0個虛擬節點, 當i小於node節點數時, index爲當前節點
	if i < len(v.NodeKeys) {
		index = i
	} else {
		index = 0
	}

	//返回具體節點
	return v.VirtualNodes[v.NodeKeys[index]]
}

//獲取hash code(採用md5字符串後計算)
func (v *VirtualNode) getHashCode(nodeHash string) uint32 {
	//crc32方式hash code
	//return crc32.ChecksumIEEE([]byte(nodeHash))
	md5 := md5.New()
	md5.Write([]byte(nodeHash))
	md5Str := hex.EncodeToString(md5.Sum(nil))

	h := 0
	byteHash := []byte(md5Str)
	for i := 0; i < 32; i++ {
		h <<= 8
		h |= int(byteHash[i]) & 0xFF
	}
	return uint32(h)
}

複製代碼
相關文章
相關標籤/搜索