原文地址-石匠的Blog: http://www.bugclosed.com/post/5html
在分佈式架構中,服務治理是一個重要的問題。在沒有服務治理的分佈式集羣中,各個服務之間經過手工或者配置的方式進行服務關係管理,遇到服務關係變化或者增長服務的時候,人肉配置極其麻煩且容易出錯。java
以前在一個C/C++項目中,採用ZooKeeper進行服務治理,能夠很好的維護服務之間的關係,可是使用起來較爲麻煩。如今愈來愈多新的項目採用consul進行服務治理,各方面的評價都優於ZooKeeper,通過幾天的研究,這裏作一個總結。node
linux系統中,下載consul可執行程序後直接拷貝到/usr/local/bin就能夠使用了,無需其餘額外配置。linux
服務節點啓動方式:git
consul agent -server -bootstrap-expect 1 -data-dir /tmp/consul -node=service-center -bind=192.168.0.2 -client 0.0.0.0 -ui -config-dir /etc/consul.d/
參數說明:github
爲了測試consul服務治理方式,設定以下場景:golang
一個manager類型的服務,須要根據負載來管理若干worker類型的服務並進行業務通訊;而worker服務也須要知道manager提供的內部服務接口地址作業務交互。即manger和worker都須要互相知道對方的通訊地址。
作以下規則設定準備:json
package main import ( "encoding/json" "flag" "fmt" "github.com/hashicorp/consul/api" "log" "math/rand" "net/http" "os" "os/signal" "strconv" "strings" "sync" "time" ) type ServiceInfo struct { ServiceID string IP string Port int Load int Timestamp int //load updated ts } type ServiceList []ServiceInfo type KVData struct { Load int `json:"load"` Timestamp int `json:"ts"` } var ( servics_map = make(map[string]ServiceList) service_locker = new(sync.Mutex) consul_client *api.Client my_service_id string my_service_name string my_kv_key string ) func CheckErr(err error) { if err != nil { log.Printf("error: %v", err) os.Exit(1) } } func StatusHandler(w http.ResponseWriter, r *http.Request) { fmt.Println("check status.") fmt.Fprint(w, "status ok!") } func StartService(addr string) { http.HandleFunc("/status", StatusHandler) fmt.Println("start listen...") err := http.ListenAndServe(addr, nil) CheckErr(err) } func main() { var status_monitor_addr, service_name, service_ip, consul_addr, found_service string var service_port int flag.StringVar(&consul_addr, "consul_addr", "localhost:8500", "host:port of the service stuats monitor interface") flag.StringVar(&status_monitor_addr, "monitor_addr", "127.0.0.1:54321", "host:port of the service stuats monitor interface") flag.StringVar(&service_name, "service_name", "worker", "name of the service") flag.StringVar(&service_ip, "ip", "127.0.0.1", "service serve ip") flag.StringVar(&found_service, "found_service", "worker", "found the target service") flag.IntVar(&service_port, "port", 4300, "service serve port") flag.Parse() my_service_name = service_name DoRegistService(consul_addr, status_monitor_addr, service_name, service_ip, service_port) go DoDiscover(consul_addr, found_service) go StartService(status_monitor_addr) go WaitToUnRegistService() go DoUpdateKeyValue(consul_addr, service_name, service_ip, service_port) select {} } func DoRegistService(consul_addr string, monitor_addr string, service_name string, ip string, port int) { my_service_id = service_name + "-" + ip var tags []string service := &api.AgentServiceRegistration{ ID: my_service_id, Name: service_name, Port: port, Address: ip, Tags: tags, Check: &api.AgentServiceCheck{ HTTP: "http://" + monitor_addr + "/status", Interval: "5s", Timeout: "1s", }, } client, err := api.NewClient(api.DefaultConfig()) if err != nil { log.Fatal(err) } consul_client = client if err := consul_client.Agent().ServiceRegister(service); err != nil { log.Fatal(err) } log.Printf("Registered service %q in consul with tags %q", service_name, strings.Join(tags, ",")) } func WaitToUnRegistService() { quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt, os.Kill) <-quit if consul_client == nil { return } if err := consul_client.Agent().ServiceDeregister(my_service_id); err != nil { log.Fatal(err) } } func DoDiscover(consul_addr string, found_service string) { t := time.NewTicker(time.Second * 5) for { select { case <-t.C: DiscoverServices(consul_addr, true, found_service) } } } func DiscoverServices(addr string, healthyOnly bool, service_name string) { consulConf := api.DefaultConfig() consulConf.Address = addr client, err := api.NewClient(consulConf) CheckErr(err) services, _, err := client.Catalog().Services(&api.QueryOptions{}) CheckErr(err) fmt.Println("--do discover ---:", addr) var sers ServiceList for name := range services { servicesData, _, err := client.Health().Service(name, "", healthyOnly, &api.QueryOptions{}) CheckErr(err) for _, entry := range servicesData { if service_name != entry.Service.Service { continue } for _, health := range entry.Checks { if health.ServiceName != service_name { continue } fmt.Println(" health nodeid:", health.Node, " service_name:", health.ServiceName, " service_id:", health.ServiceID, " status:", health.Status, " ip:", entry.Service.Address, " port:", entry.Service.Port) var node ServiceInfo node.IP = entry.Service.Address node.Port = entry.Service.Port node.ServiceID = health.ServiceID //get data from kv store s := GetKeyValue(service_name, node.IP, node.Port) if len(s) > 0 { var data KVData err = json.Unmarshal([]byte(s), &data) if err == nil { node.Load = data.Load node.Timestamp = data.Timestamp } } fmt.Println("service node updated ip:", node.IP, " port:", node.Port, " serviceid:", node.ServiceID, " load:", node.Load, " ts:", node.Timestamp) sers = append(sers, node) } } } service_locker.Lock() servics_map[service_name] = sers service_locker.Unlock() } func DoUpdateKeyValue(consul_addr string, service_name string, ip string, port int) { t := time.NewTicker(time.Second * 10) for { select { case <-t.C: StoreKeyValue(consul_addr, service_name, ip, port) } } } func StoreKeyValue(consul_addr string, service_name string, ip string, port int) { my_kv_key = my_service_name + "/" + ip + ":" + strconv.Itoa(port) var data KVData data.Load = rand.Intn(100) data.Timestamp = int(time.Now().Unix()) bys, _ := json.Marshal(&data) kv := &api.KVPair{ Key: my_kv_key, Flags: 0, Value: bys, } _, err := consul_client.KV().Put(kv, nil) CheckErr(err) fmt.Println(" store data key:", kv.Key, " value:", string(bys)) } func GetKeyValue(service_name string, ip string, port int) string { key := service_name + "/" + ip + ":" + strconv.Itoa(port) kv, _, err := consul_client.KV().Get(key, nil) if kv == nil { return "" } CheckErr(err) return string(kv.Value) }
程序經過參數控制本身啓動的服務角色類型和須要發現的服務類型。傳入的consul_addr是本機consul client agent的地址,通常是loacalhost:8500 。 因爲consul集成了服務健康檢查,因此服務須要啓動一個檢查接口,這裏啓動一個http服務來作響應。bootstrap
啓動3個consul server :api
consul agent -server -bootstrap-expect 3 -data-dir /tmp/consul -node=server001 -bind=10.2.1.54 consul agent -server -data-dir /tmp/consul -node=server002 -bind=10.2.1.83 -join 10.2.1.54 consul agent -server -data-dir /tmp/consul -node=server003 -bind=10.2.1.80 -join 10.2.1.54
server001-003構成了一個3個server node的consul集羣。先啓動server001,並指定須要3個server node構成集羣,server002和server003啓動的時候指定加入(-join)server001.
啓動一個manger:
consul agent -data-dir /tmp/consul -node=mangaer -bind=10.2.1.92 -join 10.2.1.54 ./service -consul_addr=127.0.0.1:8500 -monitor_addr=127.0.0.1:54321 -service_name=manager -ip=10.2.1.92 -port=4300 -found_service=worker
啓動2個worker:
consul agent -data-dir /tmp/consul -node=worker001 -bind=10.2.1.93 -join 10.2.1.54 ./service -consul_addr=127.0.0.1:8500 -monitor_addr=127.0.0.1:54321 -service_name=worker -ip=10.2.1.93 -port=4300 -found_service=manager consul agent -data-dir /tmp/consul -node=worker002 -bind=10.2.1.94 -join 10.2.1.54 ./service -consul_addr=127.0.0.1:8500 -monitor_addr=127.0.0.1:54321 -service_name=worker -ip=10.2.1.94 -port=4300 -found_service=manager
service程序是前面部分代碼編譯後的測試程序。
這樣就構建了3個server node的consul集羣,以及1各manager和2個worker的分佈式服務程序,他們能夠互相發現對方,而且manager能夠獲取到worker的負載狀況,實現了互通。
經過使用consul的服務註冊發現機制和key-value存儲機制,實現了服務發現以及manager獲取worker服務負載數據的機制。因爲consul的發現機制不能進行更多的數據交互,因此只能使用key-value機制配合進行數據共享(zookeeper中數據能夠存儲在節點上)。若是業務有進一步需求,能夠方便的擴展存儲的數據結構來實現。
以上的測試程序既有服務註冊,存儲數據更新,也有服務發現和數據獲取,可是代碼量比zookeeper機制少不少,由於zookeeper須要本身創建和維護目錄樹,註冊和處理zookeeper event事件,監控zookeeper的連接並處理重連和信息重建等健康管理工做。
總的來講,consul比zookeeper使用簡單易用不少。能夠在新項目中嘗試使用,特別是golang項目,技術棧也比較統一。