用go和zk實現一個簡單的分佈式server

golang的zk客戶端

最近打算寫個簡單的配置中心,考慮到實現便捷性,語言選擇了go,因爲其中計劃用到zk,就調研了下golang的zk客戶端,並實現了個簡單的分佈式server。最終找到了兩個,地址以下:node

  • gozk:https://wiki.ubuntu.com/gozk
  • go-zookeeper:https://github.com/samuel/go-zookeeper

因爲gozk的文檔不如後者,且代碼沒在gihub上,因此就直接選擇了後者。go-zookeeper文檔仍是比較全面的:文檔git

基本操做測試

這裏默認你們已經瞭解zk的用處和基本用法了,若是還不瞭解能夠參看:官方文檔中文文檔
下邊咱們先來寫個簡單的例子來測試下基本的操做:github

package main

/**
客戶端doc地址:github.com/samuel/go-zookeeper/zk
**/
import (
    "fmt"
    zk "github.com/samuel/go-zookeeper/zk"
    "time"
)

/**
 * 獲取一個zk鏈接
 * @return {[type]}
 */
func getConnect(zkList []string) (conn *zk.Conn) {
    conn, _, err := zk.Connect(zkList, 10*time.Second)
    if err != nil {
        fmt.Println(err)
    }
    return
}

/**
 * 測試鏈接
 * @return
 */
func test1() {
    zkList := []string{"localhost:2183"}
    conn := getConnect(zkList)

    defer conn.Close()
    conn.Create("/go_servers", nil, 0, zk.WorldACL(zk.PermAll))

    time.Sleep(20 * time.Second)
}

/**
 * 測試臨時節點
 * @return {[type]}
 */
func test2() {
    zkList := []string{"localhost:2183"}
    conn := getConnect(zkList)

    defer conn.Close()
    conn.Create("/testadaadsasdsaw", nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))

    time.Sleep(20 * time.Second)
}

/**
 * 獲取全部節點
 */
func test3() {
    zkList := []string{"localhost:2183"}
    conn := getConnect(zkList)

    defer conn.Close()

    children, _, err := conn.Children("/go_servers")
    if err != nil {
        fmt.Println(err)
    }
    fmt.Printf("%v \n", children)
}

func main() {
    test3()
}

上邊的代碼裏邊咱們測試了golang對zk的鏈接、建立節點、或取節點操做,在下邊的分佈式server中將用到這幾個操做。golang

簡單的分佈式server

目前分佈式系統已經很流行了,一些開源框架也被普遍應用,如dubbo、Motan等。對於一個分佈式服務,最基本的一項功能就是服務的註冊和發現,而利用zk的EPHEMERAL節點則能夠很方便的實現該功能。EPHEMERAL節點正如其名,是臨時性的,其生命週期是和客戶端會話綁定的,當會話鏈接斷開時,節點也會被刪除。下邊咱們就來實現一個簡單的分佈式server:
server:apache

  1. 服務啓動時,建立zk鏈接,並在go_servers節點下建立一個新節點,節點名爲"ip:port",完成服務註冊
  2. 服務結束時,因爲鏈接斷開,建立的節點會被刪除,這樣client就不會連到該節點

client:ubuntu

  1. 先從zk獲取go_servers節點下全部子節點,這樣就拿到了全部註冊的server
  2. 從server列表中選中一個節點(這裏只是隨機選取,實際服務通常會提供多種策略),建立鏈接進行通訊

這裏爲了演示,咱們每次client鏈接server,獲取server發送的時間後就斷開。主要代碼以下:
zkutil:用來處理zk操做框架

func GetConnect() (conn *zk.Conn, err error) {
    conn, _, err = zk.Connect(hosts, timeOut*time.Second)
    if err != nil {
        fmt.Println(err)
    }
    return
}

func RegistServer(conn *zk.Conn, host string) (err error) {
    _, err = conn.Create("/go_servers/"+host, nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
    return
}

func GetServerList(conn *zk.Conn) (list []string, err error) {
    list, _, err = conn.Children("/go_servers")
    return
}

server: server端操做tcp

func main() {
    go starServer("127.0.0.1:8897")
    go starServer("127.0.0.1:8898")
    go starServer("127.0.0.1:8899")

    a := make(chan bool, 1)
    <-a
}

func starServer(port string) {
    tcpAddr, err := net.ResolveTCPAddr("tcp4", port)
    fmt.Println(tcpAddr)
    checkError(err)

    listener, err := net.ListenTCP("tcp", tcpAddr)
    checkError(err)

    //註冊zk節點q
    conn, err := example.GetConnect()
    if err != nil {
        fmt.Printf(" connect zk error: %s ", err)
    }
    defer conn.Close()
    err = example.RegistServer(conn, port)
    if err != nil {
        fmt.Printf(" regist node error: %s ", err)
    }

    for {
        conn, err := listener.Accept()
        if err != nil {
            fmt.Fprintf(os.Stderr, "Error: %s", err)
            continue
        }
        go handleCient(conn, port)
    }

    fmt.Println("aaaaaa")
}

func handleCient(conn net.Conn, port string) {
    defer conn.Close()

    daytime := time.Now().String()
    conn.Write([]byte(port + ": " + daytime))
}

client: 客戶端操做分佈式

func main() {
    for i := 0; i < 100; i++ {
        startClient()

        time.Sleep(1 * time.Second)
    }
}

func startClient() {
    // service := "127.0.0.1:8899"
    //獲取地址
    serverHost, err := getServerHost()
    if err != nil {
        fmt.Printf("get server host fail: %s \n", err)
        return
    }

    fmt.Println("connect host: " + serverHost)
    tcpAddr, err := net.ResolveTCPAddr("tcp4", serverHost)
    checkError(err)
    conn, err := net.DialTCP("tcp", nil, tcpAddr)
    checkError(err)
    defer conn.Close()

    _, err = conn.Write([]byte("timestamp"))
    checkError(err)

    result, err := ioutil.ReadAll(conn)
    checkError(err)
    fmt.Println(string(result))

    return
}

func getServerHost() (host string, err error) {
    conn, err := example.GetConnect()
    if err != nil {
        fmt.Printf(" connect zk error: %s \n ", err)
        return
    }
    defer conn.Close()
    serverList, err := example.GetServerList(conn)
    if err != nil {
        fmt.Printf(" get server list error: %s \n", err)
        return
    }

    count := len(serverList)
    if count == 0 {
        err = errors.New("server list is empty \n")
        return
    }

    //隨機選中一個返回
    r := rand.New(rand.NewSource(time.Now().UnixNano()))
    host = serverList[r.Intn(3)]
    return
}

咱們先啓動server,能夠看到有三個節點註冊到zk:

再啓動client,能夠看到每次client都會隨機鏈接到一個節點進行通訊:

至此,咱們的分佈式server就實現了,夠簡單吧。固然,實際項目中這樣的實現是遠遠不夠的,有興趣的能夠研究下一些開源的框架。
相關代碼:github測試

相關文章
相關標籤/搜索