基於consul構建golang系統分佈式服務發現機制

原文地址-石匠的Blog: http://www.bugclosed.com/post/5html


在分佈式架構中,服務治理是一個重要的問題。在沒有服務治理的分佈式集羣中,各個服務之間經過手工或者配置的方式進行服務關係管理,遇到服務關係變化或者增長服務的時候,人肉配置極其麻煩且容易出錯。java

以前在一個C/C++項目中,採用ZooKeeper進行服務治理,能夠很好的維護服務之間的關係,可是使用起來較爲麻煩。如今愈來愈多新的項目採用consul進行服務治理,各方面的評價都優於ZooKeeper,通過幾天的研究,這裏作一個總結。node

zookeeper和consul比較

  • 開發語言方面,zookeeper採用java開發,安裝的時候須要部署java環境;consul採用golang開發,全部依賴都編譯到了可執行程序中,即插即用。
  • 部署方面,zookeeper通常部署奇數個節點方便作簡單多數的選舉機制。consul部署的時候分server節點和client節點(經過不一樣的啓動參數區分),server節點作leader選舉和數據一致性維護,client節點部署在服務機器上,做爲服務程序訪問consul的接口。
  • 一致性協議方面,zookeeper使用自定義的zab協議,consul的一致性協議採用更流行的Raft。
  • zookeeper不支持多數據中心,consul能夠跨機房支持多數據中心部署,有效避免了單數據中心故障不能訪問的狀況。
  • 連接方式上,zookeeper client api和服務器保持長鏈接,須要服務程序自行管理和維護連接有效性,服務程序註冊回調函數處理zookeeper事件,並本身維護在zookeeper上創建的目錄結構有效性(如臨時節點維護);consul 採用DNS或者http獲取服務信息,沒有主動通知,須要本身輪訓獲取。
  • 工具方面,zookeeper自帶一個cli_mt工具,能夠經過命令行登陸zookeeper服務器,手動管理目錄結構。consul自帶一個Web UI管理系統, 能夠經過參數啓動並在瀏覽器中直接查看信息。

consul相關資源

linux系統中,下載consul可執行程序後直接拷貝到/usr/local/bin就能夠使用了,無需其餘額外配置。linux

服務節點啓動方式:git

consul agent -server -bootstrap-expect 1 -data-dir /tmp/consul -node=service-center -bind=192.168.0.2 -client 0.0.0.0 -ui -config-dir /etc/consul.d/

參數說明:github

  • -server 表示以server節點模式啓動consul
  • -bootstrap-expect 1 表示期待的server節點一共有幾個,如3個server集羣模式
  • -data-dir consul存儲數據的目錄
  • -node 節點的名字
  • -bind 綁定的服務ip
  • -client 0.0.0.0 -ui 啓動Web UI管理工具
  • -config-dir 指定服務配置文件的目錄(這個目錄下的全部.json文件,做爲服務配置文件讀取)

consul服務發現機制測試

爲了測試consul服務治理方式,設定以下場景:golang

一個manager類型的服務,須要根據負載來管理若干worker類型的服務並進行業務通訊;而worker服務也須要知道manager提供的內部服務接口地址作業務交互。即manger和worker都須要互相知道對方的通訊地址。

作以下規則設定準備:json

  • manager和worker都須要向consul註冊本身的服務,讓對方發現本身的服務地址(ip和端口)
  • 採用consul的key-value存儲機制,worker週期性更新本身的負載信息到相應的key;manger從worker的key中獲取負載信息,並同步更新到本地。
  • 服務類型規則: manager的服務類型用字符串"manager"表示,各個worker的服務類型採用字符串"worker"表示。
  • 服務註冊ID規則: 服務類型-服務IP,如 manager-192.168.0.2
  • key的構建規則: 服務類型/IP:Port, 如 worker/192.168.0.2:5400
  • 存儲的數據採用json格式:{"load":100,"ts":1482828232}

golang測試程序

package main

import (
    "encoding/json"
    "flag"
    "fmt"
    "github.com/hashicorp/consul/api"
    "log"
    "math/rand"
    "net/http"
    "os"
    "os/signal"
    "strconv"
    "strings"
    "sync"
    "time"
)

type ServiceInfo struct {
    ServiceID string
    IP        string
    Port      int
    Load      int
    Timestamp int //load updated ts
}
type ServiceList []ServiceInfo

type KVData struct {
    Load      int `json:"load"`
    Timestamp int `json:"ts"`
}

var (
    servics_map     = make(map[string]ServiceList)
    service_locker  = new(sync.Mutex)
    consul_client   *api.Client
    my_service_id   string
    my_service_name string
    my_kv_key       string
)

func CheckErr(err error) {
    if err != nil {
        log.Printf("error: %v", err)
        os.Exit(1)
    }
}
func StatusHandler(w http.ResponseWriter, r *http.Request) {
    fmt.Println("check status.")
    fmt.Fprint(w, "status ok!")
}

func StartService(addr string) {
    http.HandleFunc("/status", StatusHandler)
    fmt.Println("start listen...")
    err := http.ListenAndServe(addr, nil)
    CheckErr(err)
}

func main() {
    var status_monitor_addr, service_name, service_ip, consul_addr, found_service string
    var service_port int
    flag.StringVar(&consul_addr, "consul_addr", "localhost:8500", "host:port of the service stuats monitor interface")
    flag.StringVar(&status_monitor_addr, "monitor_addr", "127.0.0.1:54321", "host:port of the service stuats monitor interface")
    flag.StringVar(&service_name, "service_name", "worker", "name of the service")
    flag.StringVar(&service_ip, "ip", "127.0.0.1", "service serve ip")
    flag.StringVar(&found_service, "found_service", "worker", "found the target service")
    flag.IntVar(&service_port, "port", 4300, "service serve port")
    flag.Parse()

    my_service_name = service_name

    DoRegistService(consul_addr, status_monitor_addr, service_name, service_ip, service_port)

    go DoDiscover(consul_addr, found_service)

    go StartService(status_monitor_addr)

    go WaitToUnRegistService()

    go DoUpdateKeyValue(consul_addr, service_name, service_ip, service_port)

    select {}
}

func DoRegistService(consul_addr string, monitor_addr string, service_name string, ip string, port int) {
    my_service_id = service_name + "-" + ip
    var tags []string
    service := &api.AgentServiceRegistration{
        ID:      my_service_id,
        Name:    service_name,
        Port:    port,
        Address: ip,
        Tags:    tags,
        Check: &api.AgentServiceCheck{
            HTTP:     "http://" + monitor_addr + "/status",
            Interval: "5s",
            Timeout:  "1s",
        },
    }

    client, err := api.NewClient(api.DefaultConfig())
    if err != nil {
        log.Fatal(err)
    }
    consul_client = client
    if err := consul_client.Agent().ServiceRegister(service); err != nil {
        log.Fatal(err)
    }
    log.Printf("Registered service %q in consul with tags %q", service_name, strings.Join(tags, ","))
}

func WaitToUnRegistService() {
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, os.Interrupt, os.Kill)
    <-quit

    if consul_client == nil {
        return
    }
    if err := consul_client.Agent().ServiceDeregister(my_service_id); err != nil {
        log.Fatal(err)
    }
}

func DoDiscover(consul_addr string, found_service string) {
    t := time.NewTicker(time.Second * 5)
    for {
        select {
        case <-t.C:
            DiscoverServices(consul_addr, true, found_service)
        }
    }
}

func DiscoverServices(addr string, healthyOnly bool, service_name string) {
    consulConf := api.DefaultConfig()
    consulConf.Address = addr
    client, err := api.NewClient(consulConf)
    CheckErr(err)

    services, _, err := client.Catalog().Services(&api.QueryOptions{})
    CheckErr(err)

    fmt.Println("--do discover ---:", addr)

    var sers ServiceList
    for name := range services {
        servicesData, _, err := client.Health().Service(name, "", healthyOnly,
            &api.QueryOptions{})
        CheckErr(err)
        for _, entry := range servicesData {
            if service_name != entry.Service.Service {
                continue
            }
            for _, health := range entry.Checks {
                if health.ServiceName != service_name {
                    continue
                }
                fmt.Println("  health nodeid:", health.Node, " service_name:", health.ServiceName, " service_id:", health.ServiceID, " status:", health.Status, " ip:", entry.Service.Address, " port:", entry.Service.Port)

                var node ServiceInfo
                node.IP = entry.Service.Address
                node.Port = entry.Service.Port
                node.ServiceID = health.ServiceID

                //get data from kv store
                s := GetKeyValue(service_name, node.IP, node.Port)
                if len(s) > 0 {
                    var data KVData
                    err = json.Unmarshal([]byte(s), &data)
                    if err == nil {
                        node.Load = data.Load
                        node.Timestamp = data.Timestamp
                    }
                }
                fmt.Println("service node updated ip:", node.IP, " port:", node.Port, " serviceid:", node.ServiceID, " load:", node.Load, " ts:", node.Timestamp)
                sers = append(sers, node)
            }
        }
    }

    service_locker.Lock()
    servics_map[service_name] = sers
    service_locker.Unlock()
}

func DoUpdateKeyValue(consul_addr string, service_name string, ip string, port int) {
    t := time.NewTicker(time.Second * 10)
    for {
        select {
        case <-t.C:
            StoreKeyValue(consul_addr, service_name, ip, port)
        }
    }
}

func StoreKeyValue(consul_addr string, service_name string, ip string, port int) {

    my_kv_key = my_service_name + "/" + ip + ":" + strconv.Itoa(port)

    var data KVData
    data.Load = rand.Intn(100)
    data.Timestamp = int(time.Now().Unix())
    bys, _ := json.Marshal(&data)

    kv := &api.KVPair{
        Key:   my_kv_key,
        Flags: 0,
        Value: bys,
    }

    _, err := consul_client.KV().Put(kv, nil)
    CheckErr(err)
    fmt.Println(" store data key:", kv.Key, " value:", string(bys))
}

func GetKeyValue(service_name string, ip string, port int) string {
    key := service_name + "/" + ip + ":" + strconv.Itoa(port)

    kv, _, err := consul_client.KV().Get(key, nil)
    if kv == nil {
        return ""
    }
    CheckErr(err)

    return string(kv.Value)
}

程序經過參數控制本身啓動的服務角色類型和須要發現的服務類型。傳入的consul_addr是本機consul client agent的地址,通常是loacalhost:8500 。 因爲consul集成了服務健康檢查,因此服務須要啓動一個檢查接口,這裏啓動一個http服務來作響應。bootstrap

consul集羣啓動

啓動3個consul server :api

consul agent -server -bootstrap-expect 3 -data-dir /tmp/consul -node=server001 -bind=10.2.1.54
consul agent -server  -data-dir /tmp/consul -node=server002 -bind=10.2.1.83 -join 10.2.1.54
consul agent -server  -data-dir /tmp/consul -node=server003 -bind=10.2.1.80 -join 10.2.1.54

server001-003構成了一個3個server node的consul集羣。先啓動server001,並指定須要3個server node構成集羣,server002和server003啓動的時候指定加入(-join)server001.

啓動一個manger:

consul agent -data-dir /tmp/consul -node=mangaer -bind=10.2.1.92  -join 10.2.1.54
./service -consul_addr=127.0.0.1:8500 -monitor_addr=127.0.0.1:54321 -service_name=manager -ip=10.2.1.92 -port=4300 -found_service=worker

啓動2個worker:

consul agent -data-dir /tmp/consul -node=worker001 -bind=10.2.1.93  -join 10.2.1.54
./service -consul_addr=127.0.0.1:8500 -monitor_addr=127.0.0.1:54321 -service_name=worker -ip=10.2.1.93 -port=4300 -found_service=manager
consul agent -data-dir /tmp/consul -node=worker002 -bind=10.2.1.94  -join 10.2.1.54
./service -consul_addr=127.0.0.1:8500 -monitor_addr=127.0.0.1:54321 -service_name=worker -ip=10.2.1.94 -port=4300 -found_service=manager

service程序是前面部分代碼編譯後的測試程序。

這樣就構建了3個server node的consul集羣,以及1各manager和2個worker的分佈式服務程序,他們能夠互相發現對方,而且manager能夠獲取到worker的負載狀況,實現了互通。

結束

經過使用consul的服務註冊發現機制和key-value存儲機制,實現了服務發現以及manager獲取worker服務負載數據的機制。因爲consul的發現機制不能進行更多的數據交互,因此只能使用key-value機制配合進行數據共享(zookeeper中數據能夠存儲在節點上)。若是業務有進一步需求,能夠方便的擴展存儲的數據結構來實現。

以上的測試程序既有服務註冊,存儲數據更新,也有服務發現和數據獲取,可是代碼量比zookeeper機制少不少,由於zookeeper須要本身創建和維護目錄樹,註冊和處理zookeeper event事件,監控zookeeper的連接並處理重連和信息重建等健康管理工做。

總的來講,consul比zookeeper使用簡單易用不少。能夠在新項目中嘗試使用,特別是golang項目,技術棧也比較統一。

相關文章
相關標籤/搜索