go語言的redis客戶端

redis3.0以後提供了新的HA的解決方案,即Cluster模式,由多個節點組成的集羣模式。集羣master之間基於crc16算法,對key進行校驗,獲得的值對16384取餘,就是key的hash slot(槽)值,每一個節點各自存儲一部分的hash槽值,主從節點之間基於異步複製方式同步數據。node

基於redis集羣的基本原理,gedis須要提供一下方面的能力:git

一、統一的客戶端Cluster;github

二、集羣鏈接池的實現;redis

三、集羣節點的健康檢查(後續實現);算法

四、負載均衡機制實現;負載均衡

五、協議的封裝保證對上層透明。dom

模型基本設計以下:異步

 

基礎模型定義測試

/**
* 節點
* master:主節點ip+port
* slaves:從節點ip+port集合
*/
type Node struct {
Url string
Pwd string
InitActive int
}

type ClusterConfig struct {
Nodes []*Node
HeartBeatInterval int
}

/**
* 集羣客戶端
* heartBeatInterval 心跳檢測時間間隔,單位s
* clusterPool key:鏈接串 value:鏈接池
*/
type Cluster struct {
config *ClusterConfig
clusterPool map[string]*ConnPool
}
 Cluster初始化url

/**
* 初始化Cluster client
*/
func NewCluster(clusterConfig ClusterConfig) *Cluster {
nodes := clusterConfig.Nodes

var cluster Cluster
clusterPool := make(map[string]*ConnPool)

for _, node := range nodes {
var config = ConnConfig{node.Url, node.Pwd}
pool, _ := NewConnPool(node.InitActive, config)
clusterPool[node.Url] = pool
}
cluster.config = &clusterConfig
cluster.clusterPool = clusterPool
//初始化節點健康檢測線程
defer func() {
go cluster.heartBeat()
}()
if m==nil {
m = new(sync.RWMutex)
}
return &cluster
}
節點心跳檢測

cluster建立後,開啓異步線程定時輪詢各個節點,向節點發出ping請求,若未響應pong,則表示當前節點異常,而後將當前節點退出鏈接池,並將該節點加入失敗隊列,定時輪詢隊列,檢測是否恢復鏈接,若恢復,則從新建立鏈接池,從失敗隊列中退出當前節點。

/**
* 鏈接池心跳檢測,定時ping各個節點,ping失敗的,從鏈接池退出,並將節點加入失敗隊列
* 定時輪詢失敗節點隊列,檢測節點是否已恢復鏈接,若恢復,則從新建立鏈接池,並從失敗隊列中移除
*/
func (cluster *Cluster) heartBeat() {
clusterPool := cluster.GetClusterPool()
interval := cluster.config.HeartBeatInterval
if interval <= 0 {
interval = defaultHeartBeatInterval
}
var nodes = make(map[string]*Node)

for i := 0; i < len(cluster.GetClusterNodesInfo()); i++ {
node := cluster.GetClusterNodesInfo()[i]
nodes[node.Url] = node
}

var failNodes = make(map[string]*Node)
for {
for url, pool := range clusterPool {
result, err := executePing(pool)
if err != nil {
log.Printf("節點[%s] 健康檢查異常,緣由[%s], 節點將被移除\n", url, err)
//加鎖
m.Lock()
time.Sleep(time.Duration(5)*time.Second)
failNodes[url] = nodes[url]
delete(clusterPool, url)
m.Unlock()
} else {
log.Printf("節點[%s] 健康檢查結果[%s]\n", url, result)
}
}
//恢復檢測
recover(failNodes, clusterPool)

time.Sleep(time.Duration(interval) * time.Second)
}
}

/**
* 檢測fail節點是否已恢復正常
*/
func recover(failNodes map[string]*Node, clusterPool map[string]*ConnPool) {
for url,node:=range failNodes{
conn := Connect(url)
if conn != nil {
//節點重連,恢復鏈接
var config = ConnConfig{url, node.Pwd}
pool, _ := NewConnPool(node.InitActive, config)
//加鎖
m.Lock()
clusterPool[node.Url] = pool
delete(failNodes,url)
m.Unlock()
log.Printf("節點[%s] 已重連\n", url)
}
}
}
 測試結果:

 

 

loadbalance目前僅實現隨機模式,每次訪問前隨機選擇一個節點進行通訊

func (cluster *Cluster) RandomSelect() *ConnPool {
m.RLock()
defer m.RUnlock()
pools := cluster.GetClusterPool()
for _,pool:= range pools{
if pool !=nil{
return pool
}
}
fmt.Errorf("none pool can be used")
return nil
}
通訊模塊的大體流程以下:

一、cluster隨機選擇一個健康的節點,進行訪問;

二、若是節點返回業務數據則通訊結束;

三、若是節點返回的消息協議上知足「-MOVED」,例如 -MOVED 5678 127.0.0.1,則代表當前數據不在該節點;

四、重定向到redis指定的節點訪問;

func (cluster *Cluster) Set(key string, value string) (interface{}, error) {
result, err := executeSet(cluster.RandomSelect(), key, value)
if err.Error() != protocol.MOVED {
return result, err
}

//重定向到新的節點
return executeSet(cluster.SelectOne(result.(string)), key, value)
}

func executeSet(pool *ConnPool, key string, value string) (interface{}, error) {
conn, err := GetConn(pool)
if err != nil {
return nil, fmt.Errorf("get conn fail")
}
defer pool.PutConn(conn)
result := SendCommand(conn, protocol.SET, protocol.SafeEncode(key), protocol.SafeEncode(value))
return handler.HandleReply(result)
}
這樣,對於應用層來說,不管訪問的哪一個節點,都能獲得最終的結果,相對是透明的。

調用測試:

package main

import (
. "client"
"net"
"fmt"
)

func main() {
var node7000 = Node{"127.0.0.1:7000", "123456", 10}
var node7001 = Node{"127.0.0.1:7001", "123456", 10}
var node7002 = Node{"127.0.0.1:7002", "123456", 10}
var node7003 = Node{"127.0.0.1:7003", "123456", 10}
var node7004 = Node{"127.0.0.1:7004", "123456", 10}
var node7005 = Node{"127.0.0.1:7005", "123456", 10}

nodes := []*Node{&node7000, &node7001, &node7002, &node7003, &node7004, &node7005}
var clusterConfig = ClusterConfig{nodes,10}
cluster := NewCluster(clusterConfig)
value,err:=cluster.Get("name")
fmt.Println(value, err)
}
 響應結果:

 

心跳檢查和其餘loadbalance機制後續補充實現。

項目地址:

https://github.com/zhangxiaomin1993/gedis

相關文章
相關標籤/搜索