手把手教你使用Go基於zookeeper編寫服務發現「原創」

zookeeper是一個強一致【不嚴格】的分佈式數據庫,由多個節點共同組成一個分佈式集羣,掛掉任意一個節點,數據庫仍然能夠正常工做,客戶端無感知故障切換。客戶端向任意一個節點寫入數據,其它節點能夠當即看到最新的數據。node

zookeeper的內部是一個key/value存儲引擎,key是以樹狀的形式構成了一個多級的層次結構,每個節點既能夠存儲數據,又能夠做爲一個目錄存放下一級子節點。mysql

zookeeper提供了建立/修改/刪除節點的api,若是父節點沒有建立,字節點會建立失敗。若是父節點還有子節點,父節點不能夠被刪除。git

zookeeper和客戶端之間以socket形式進行雙向通信,客戶端能夠主動調用服務器提供的api,服務器能夠主動向客戶端推送事件。有多種事件能夠watch,好比節點的增刪改,子節點的增刪改,會話狀態變動等。github

zookeeper的事件有傳遞機制,字節點的增刪改觸發的事件會向上層依次傳播,全部的父節點均可以收到字節點的數據變動事件,因此層次太深/子節點太多會給服務器的事件系統帶來壓力,節點分配要作好周密的規劃。sql

zookeeper知足了CAP定理的分區容忍性P和強一致性C,犧牲了高性能A【可用性蘊含性能】。zookeeper的存儲能力是有限的,當節點層次太深/子節點太多/節點數據太大,都會影響數據庫的穩定性。因此zookeeper不是一個用來作高併發高性能的數據庫,zookeeper通常只用來存儲配置信息。數據庫

zookeeper的讀性能隨着節點數量的提高能不斷增長,可是寫性能會隨着節點數量的增長而下降,因此節點的數量不宜太多,通常配置成3個或者5個就能夠了。json

圖中能夠看出當服務器節點增多時,複雜度會隨之提高。由於每一個節點和其它節點之間要進行p2p的鏈接。3個節點能夠容忍掛掉1個節點,5個節點能夠容忍掛掉2個節點。api

客戶端鏈接zookeeper時會選擇任意一個節點保持長連接,後續通訊都是經過這個節點進行讀寫的。若是該節點掛了,客戶端會嘗試去鏈接其它節點。bash

服務器會爲每一個客戶端鏈接維持一個會話對象,會話的ID會保存在客戶端。會話對象也是分佈式的,意味着當一個節點掛掉了,客戶端使用原有的會話ID去鏈接其它節點,服務器維持的會話對象還繼續存在,並不須要從新建立一個新的會話。服務器

若是客戶端主動發送會話關閉消息,服務器的會話對象會當即刪除。若是客戶端不當心奔潰了,沒有發送關閉消息,服務器的會話對象還會繼續存在一段時間。這個時間是會話的過時時間,在建立會話的時候客戶端會提供這個參數,通常是10到30秒。

也許你會問鏈接斷開了,服務器是能夠感知到的,爲何須要客戶端主動發送關閉消息呢?

由於服務器要考慮網絡抖動的狀況,鏈接可能只是臨時斷開了。爲了不這種狀況下反覆建立和銷燬複雜的會話對象以及建立會話後要進行的一系列事件初始化操做,服務器會盡可能延長會話的生存時間。

zookeeper的節點能夠是持久化(Persistent)的,也能夠是臨時(Ephermeral)的。所謂臨時的節點就是會話關閉後,會話期間建立的全部臨時節點會當即消失。通常用於服務發現系統,將服務進程的生命期和zookeeper子節點的生命期綁定在一塊兒,起到了實時監控服務進程的存活的效果。

zookeeper還提供了順序節點。相似於mysql裏面的auto_increment屬性。服務器會在順序節點名稱後自動增長自增的惟一後綴,保持節點名稱的惟一性和順序性。

還有一種節點叫着保護(Protected)節點。這個節點很是特殊,可是也很是經常使用。在應用服務發現的場合時,客戶端建立了一個臨時節點後,服務器節點掛了,鏈接斷開了,而後客戶端去重連到其它的節點。由於會話沒有關閉,以前建立的臨時節點還存在,可是這個時候客戶端卻沒法識別去這個臨時節點是否是本身建立的,由於節點內部並不存儲會話ID字段。因此客戶端會在節點名稱上加上一個GUID前綴,這個前綴會保存在客戶端,這樣它就能夠在重連後識別出哪一個臨時節點是本身以前建立的了。

接下來咱們使用Go語言實現一下服務發現的註冊和發現功能。

如圖所示,咱們要提供api.user這樣的服務,這個服務有3個節點,每一個節點有不同的服務地址,這3個節點各自將本身的服務註冊進zk,而後消費者進行讀取zk獲得api.user的服務地址,任選一個節點地址進行服務調用。爲了簡單化,這裏就沒有提供權重參數了。在一個正式的服務發現裏通常都有權重參數,用於調整服務節點之間的流量分配。

go get github.com/samuel/go-zookeeper/zk
複製代碼

首先咱們定義一個ServiceNode結構,這個結構數據會存儲在節點的data中,表示服務發現的地址信息。

type ServiceNode struct {
	Name string `json:"name"` // 服務名稱,這裏是user
	Host string `json:"host"`
	Port int    `json:"port"`
}
在定義一個服務發現的客戶端結構體SdClient。

type SdClient struct {
	zkServers []string // 多個節點地址
	zkRoot    string // 服務根節點,這裏是/api
	conn      *zk.Conn // zk的客戶端鏈接
}
編寫構造器,建立根節點


func NewClient(zkServers []string, zkRoot string, timeout int) (*SdClient, error) {
	client := new(SdClient)
	client.zkServers = zkServers
	client.zkRoot = zkRoot
	// 鏈接服務器
	conn, _, err := zk.Connect(zkServers, time.Duration(timeout)*time.Second)
	if err != nil {
		return nil, err
	}
	client.conn = conn
	// 建立服務根節點
	if err := client.ensureRoot(); err != nil {
		client.Close()
		return nil, err
	}
	return client, nil}// 關閉鏈接,釋放臨時節點func (s *SdClient) Close() {
	s.conn.Close()
}

func (s *SdClient) ensureRoot() error {
	exists, _, err := s.conn.Exists(s.zkRoot)
	if err != nil {
		return err
	}
	if !exists {
		_, err := s.conn.Create(s.zkRoot, []byte(""), 0, zk.WorldACL(zk.PermAll))
		if err != nil && err != zk.ErrNodeExists {
			return err
		}
	}
	return nil
}
值得注意的是代碼中的Create調用可能會返回節點已存在錯誤,這是正常現象,由於會存在多進程同時建立節點的可能。若是建立根節點出錯,還須要及時關閉鏈接。咱們不關心節點的權限控制,因此使用zk.WorldACL(zk.PermAll)表示該節點沒有權限限制。Create參數中的flag=0表示這是一個持久化的普通節點。

接下來咱們編寫服務註冊方法

func (s *SdClient) Register(node *ServiceNode) error {
	if err := s.ensureName(node.Name); err != nil {
		return err
	}
	path := s.zkRoot + "/" + node.Name + "/n"
	data, err := json.Marshal(node)
	if err != nil {
		return err
	}
	_, err = s.conn.CreateProtectedEphemeralSequential(path, data, zk.WorldACL(zk.PermAll))
	if err != nil {
		return err
	}
	return nil}func (s *SdClient) ensureName(name string) error {
	path := s.zkRoot + "/" + name
	exists, _, err := s.conn.Exists(path)
	if err != nil {
		return err
	}
	if !exists {
		_, err := s.conn.Create(path, []byte(""), 0, zk.WorldACL(zk.PermAll))
		if err != nil && err != zk.ErrNodeExists {
			return err
		}
	}
	return nil
}
先要建立/api/user節點做爲服務列表的父節點。而後建立一個保護順序臨時(ProtectedEphemeralSequential)子節點,同時將地址信息存儲在節點中。什麼叫保護順序臨時節點,首先它是一個臨時節點,會話關閉後節點自動消失。其它它是個順序節點,zookeeper自動在名稱後面增長自增後綴,確保節點名稱的惟一性。同時仍是個保護性節點,節點前綴增長了GUID字段,確保斷開重連後臨時節點能夠和客戶端狀態對接上。

接下來咱們實現消費者獲取服務列表方法

func (s *SdClient) GetNodes(name string) ([]*ServiceNode, error) {
	path := s.zkRoot + "/" + name
	// 獲取字節點名稱
	childs, _, err := s.conn.Children(path)
	if err != nil {
		if err == zk.ErrNoNode {
			return []*ServiceNode{}, nil
		}
		return nil, err
	}
	nodes := []*ServiceNode{}
	for _, child := range childs {
		fullPath := path + "/" + child
		data, _, err := s.conn.Get(fullPath)
		if err != nil {
			if err == zk.ErrNoNode {
				continue
			}
			return nil, err
		}
		node := new(ServiceNode)
		err = json.Unmarshal(data, node)
		if err != nil {
			return nil, err
		}
		nodes = append(nodes, node)
	}
	return nodes, nil
}
複製代碼

獲取服務節點列表時,咱們先獲取字節點的名稱列表,而後依次讀取內容拿到服務地址。由於獲取字節點名稱和獲取字節點內容不是一個原子操做,因此在調用Get獲取內容時可能會出現節點不存在錯誤,這是正常現象。

將以上代碼湊在一塊兒,一個簡單的服務發現包裝就實現了。

最後咱們看看若是使用以上代碼,爲了方便起見,咱們將多個服務提供者和消費者寫在一個main方法裏。

func main() {
        // 服務器地址列表
	servers := []string{"192.168.0.101:2118", "192.168.0.102:2118", "192.168.0.103:2118"}
	client, err := NewClient(servers, "/api", 10)
	if err != nil {
		panic(err)
	}
	defer client.Close()
	node1 := &ServiceNode{"user", "127.0.0.1", 4000}
	node2 := &ServiceNode{"user", "127.0.0.1", 4001}
	node3 := &ServiceNode{"user", "127.0.0.1", 4002}
	if err := client.Register(node1); err != nil {
		panic(err)
	}
	if err := client.Register(node2); err != nil {
		panic(err)
	}
	if err := client.Register(node3); err != nil {
		panic(err)
	}
	nodes, err := client.GetNodes("user")
	if err != nil {
		panic(err)
	}
	for _, node := range nodes {
		fmt.Println(node.Host, node.Port)
	}
}
複製代碼

值得注意的是使用時必定要在進程退出前調用Close方法,不然zookeeper的會話不會當即關閉,服務器建立的臨時節點也就不會當即消失,而是要等到timeout以後服務器纔會清理。

閱讀相關文章,關注公衆號【碼洞】

相關文章
相關標籤/搜索