還記得剛畢業入職到新公司的時候, 個人上級領導與前端同窗解釋後端技術棧龐雜. 大概記得舉了一個例子是 「如何多臺機器提供數據緩存存儲服務?」 , 扭頭問了我一下, 當時直接說使用 hash取模 的方式分攤數據。前端
接着我確定被追問一臺機器掛了怎麼辦, 怎麼減小節點掛掉的影響, 結果是被鄙視了, 從那之後也就記住了 一致性hash 這個詞.node
雖然工做時間也不短了, 可是如今再問我 一致性hash算法 到底是啥, 我大概也只能回答出 一個圓環, 環裏有不少虛擬節點, key hash後定位到對應的虛擬節點, 確歷來沒有本身動手寫過一行代碼.算法
老驥伏櫪、志在千里. 咱們開始吧~後端
一致性哈希算法在1997年由麻省理工學院的Karger等人在解決分佈式Cache中提出的,設計目標是爲了解決因特網中的熱點(Hot spot)問題,初衷和CARP十分相似。一致性哈希修正了CARP使用的簡單哈希算法帶來的問題,使得DHT能夠在P2P環境中真正獲得應用.緩存
一致性hash在數據存儲領域中有普遍的應用, 目的主要是減小數據傾斜問題, 在節點失效、節點增長時, 只需影響少許數據.markdown
咱們看上圖, 爲一個環, 在環中咱們根據hash計算放入4個node節點併發
咱們又根據鍵值計算結果放入到對應離他最近的下一個節點.app
當咱們新增node5節點時計算hash值放入環中, 僅將 node4 中部分數據(hash值小於node5) 從新定位到 node5 便可.分佈式
當移除節點node4時, 也僅將 node4 數據移入下一個節點 node3 便可.測試
思考: node4失效後, node4數據壓力所有給到node3, node3的壓力增大, 會不會發生鏈條反應, 致使全部節點崩潰?
這時咱們須要增長虛擬節點來分擔 node3 壓力, 將實體節點經過 hash計算 分散更多的分佈到環上, 相對來講數據 hash key 更能隨機到不一樣的節點上, 理想狀態下當其中一個節點失效時, 多個節點分攤數據壓力
MyServiceNode節點操做
package main
import (
"fmt"
"strconv"
)
//service node 結構體定義
type ServiceNode struct {
Ip string
Port string
}
// 返回service node實例
func NewServiceNode(ip string, port string) *ServiceNode {
return &ServiceNode{
Ip: ip,
Port: port,
}
}
func main() {
//實例化三個實體節點
node1 := NewServiceNode("127.0.0.1", "3305")
node2 := NewServiceNode("127.0.0.1", "3306")
node3 := NewServiceNode("127.0.0.1", "3307")
//添加對應的虛擬化節點數一、一、3
virtualNodeService.addVirtualNode(node1, 3)
virtualNodeService.addVirtualNode(node2, 3)
virtualNodeService.addVirtualNode(node3, 3)
//打印節點列表
PrintRouteList()
//循環測試
for i := 0; i < 20; i++ {
//移除node2節點
virtualNodeService.removeVirtualNode(node2, 3)
//獲取對應節點地址
serviceNode := virtualNodeService.getVirtualNodel("get_cc" + strconv.Itoa(i))
fmt.Println(serviceNode.Ip + ":" + serviceNode.Port)
}
}
//打印節點列表
func PrintRouteList() {
for key, val := range virtualNodeService.VirtualNodes {
fmt.Println("key=" + strconv.Itoa(int(key)) + ", host=" + val.Ip + ":" + val.Port)
}
}
複製代碼
virtualNodeService 虛擬化節點
package main
import (
"crypto/md5"
"encoding/hex"
"sort"
"strconv"
"sync"
)
var virtualNodeService = NewVirtualNode()
type NodeType []uint32
//Len()
func (s NodeType) Len() int {
return len(s)
}
//Less():成績將有低到高排序
func (s NodeType) Less(i, j int) bool {
return s[i] < s[j]
}
//Swap()
func (s NodeType) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
//虛擬節點結構定義
type VirtualNode struct {
VirtualNodes map[uint32]*ServiceNode
NodeKeys NodeType
sync.RWMutex
}
//實例化虛擬節點對象
func NewVirtualNode() *VirtualNode {
return &VirtualNode{
VirtualNodes: map[uint32]*ServiceNode{},
}
}
//添加虛擬節點
func (v *VirtualNode) addVirtualNode(serviceNode *ServiceNode, virtualNum uint) {
//併發讀寫map-加鎖
v.Lock()
defer v.Unlock()
for i := uint(0); i < virtualNum; i++ {
hashStr := serviceNode.Ip + ":" + serviceNode.Port + ":" + strconv.Itoa(int(i))
v.VirtualNodes[v.getHashCode(hashStr)] = serviceNode
}
//虛擬節點hash值排序
v.sortHash()
}
//移除虛擬節點
func (v *VirtualNode) removeVirtualNode(serviceNode *ServiceNode, virtualNum uint) {
//併發讀寫map-加鎖
v.Lock()
defer v.Unlock()
for i := uint(0); i < virtualNum; i++ {
hashStr := serviceNode.Ip + ":" + serviceNode.Port + ":" + strconv.Itoa(int(i))
delete(v.VirtualNodes, v.getHashCode(hashStr))
}
v.sortHash()
}
//hash數值排序
func (v *VirtualNode) sortHash() {
v.NodeKeys = nil
for k := range v.VirtualNodes {
v.NodeKeys = append(v.NodeKeys, k)
}
sort.Sort(v.NodeKeys)
}
//獲取虛擬節點(二分查找)
func (v *VirtualNode) getVirtualNodel(routeKey string) *ServiceNode {
//併發讀寫map-加讀鎖,可併發讀不可同時寫
v.RLock()
defer v.RUnlock()
index := 0
hashCode := v.getHashCode(routeKey)
i := sort.Search(len(v.NodeKeys), func(i int) bool { return v.NodeKeys[i] > hashCode })
//當i大於下標最大值時,證實沒找到, 給到第0個虛擬節點, 當i小於node節點數時, index爲當前節點
if i < len(v.NodeKeys) {
index = i
} else {
index = 0
}
//返回具體節點
return v.VirtualNodes[v.NodeKeys[index]]
}
//獲取hash code(採用md5字符串後計算)
func (v *VirtualNode) getHashCode(nodeHash string) uint32 {
//crc32方式hash code
//return crc32.ChecksumIEEE([]byte(nodeHash))
md5 := md5.New()
md5.Write([]byte(nodeHash))
md5Str := hex.EncodeToString(md5.Sum(nil))
h := 0
byteHash := []byte(md5Str)
for i := 0; i < 32; i++ {
h <<= 8
h |= int(byteHash[i]) & 0xFF
}
return uint32(h)
}
複製代碼