戰旗直播基於consul服務註冊與發現的GRPC服務實戰與感想

前言

鄙人關注consul也有一段時間了,從2017年開始瞭解它的一些特性,它能幫助解決哪些問題,而後怎麼應用到微服務中去。隨着時間的推移,微服務的發展也是很是的迅速,能夠說突飛猛進,天天都在變化。consul工具所提供的功能也在不斷地新增和完善。OK,有些扯遠了,我們仍是回到主題上來吧。
在微服務領域有個重要的概念——服務註冊與發現。google或baidu一下,會發現有大量的關於服務註冊發現的文章、博客等,有基於consul的、也有基於etcd和zoomkeeper的,每一個工具都有本身的特色和優點,也有必定的類似性,好比它們均可以實現服務註冊與發現,也均可以實現kv存儲等。可是它們也有必定的區別,好比consul重點是服務註冊與發現,其次是kv存儲;像etcd重點是處理kv存儲的功能,在kv存儲上來實現服務的註冊與發現。側重點不同,感興趣的朋友能夠去google或baidu一下,這裏就不在敘述了。因爲側重點不一樣,依據戰旗直播業務的實際狀況,選擇了consul來實現戰旗後端的服務註冊與發現。html

consul能解決的問題

  1. 服務註冊(註銷)與發現
  2. 節點/服務監控
  3. KV存儲——業務服務配置統一管理
  4. consul-template
  5. ACL
  6. DNS
  7. 其餘

consul集羣部署

依據consul的文檔,consul集羣中須要部署兩種類型的節點:server節點和client節點。server節點推薦部署奇數個節點,有利於leader的選舉過程快速的結束(偶數個節點可能須要多個選舉過程才能選舉出leader)。這裏有幾個概念:集羣,leader選舉,對這些原理感興趣同窗能夠去google下.
戰旗部署了5個server節點,其餘節點都是client,也就是說每臺服務上都有一個node。而後,戰旗的服務經過localhost:8500地址向consul集羣註冊本身。部署結構1以下圖所示:
Consul dep.png前端

思考:這樣的部署結構存在必定的缺陷,若是某個節點的consul掛了,會直接影響該節點上的全部應用服務,應爲它們都是經過localhost來跟consul進行通信的。那怎麼預防這樣的狀況呢?
這時備選方案plan B出臺了.node

  1. 經過coreDNS獲取consul集羣情況,來自動配置一個內部DNS路由條目。當訪問consul服務時,自動路由到consul集羣中的某個節點。
  2. 也能夠部署兩個nginx(主備),由nginx路由到consul集羣中的某個節點。
  3. 若是您採用的是k8s環境,那恭喜您,能夠省略不少工做來,直接在k8s中部署一個service來指向consul集羣。

部署結構2以下圖所示:
consul dep2.jpgnginx

小結:爲了提升consul集羣的高可靠性,保證服務的正常運行。在結構1的基礎上,同時又部署了DNS方案。當本節點的consul不可達時,經過DNS來與consul集羣通信。這裏可能有些同窗會問,爲何不直接採用結構2呢?這兩個結構各有千秋,結構1通迅效率高,應用服務的健康檢測都在當前節點完成,檢測壓力小。結構2雖無單節點風險,但存在consul負載不均衡等缺點。git

具體consul怎麼下載,怎麼安裝,怎麼啓動server,怎麼加入集羣就不講了,你們能夠參考官方文檔:https://www.consul.io/docs/index.htmlgithub

consul與grpc實戰

服務註冊與註銷

當應用服務啓動時,將本身註冊到consul中,以便其餘服務能及時發現該服務。後端

1.導入sdk,api

import (
    consul "github.com/hashicorp/consul/api"
)

2.獲取consul client對象restful

// @param uriStr string: consul通迅地址,默認http://localhost:8500; 當localhost不可達時,須要經過服務名來訪問其餘consul節點,如:http://consul:8500
// @param token string: 訪問控制用
func MakeClient(uriStr string, token string) (*consul.Client, error) {

    uri, err := url.Parse(uriStr)
    if err != nil {
        logs.Error("url parse error: ", err)
        return nil, err
    }

    config := consulapi.DefaultConfig()
    config.Address = uriStr

    if len(token) > 0 {
        config.Token = token
    } else {
        config.Token = defaultToken
    }

    client, err := consulapi.NewClient(config)
    if err != nil {
        logs.Error("consul: ", uri.Scheme)
        return nil, err
    }

    return client, nil
}

3.準備工做框架

// @param svrName string: 要註冊當服務名
// @param useType string: 對應consul中的tag, 可用於過濾
// @param svrPort int: 服務對應的端口號
// @param healthPort int: http檢測時須要對應端口號,tcp檢測默認當前端口
// @param healthType string: http或tcp,
// @param localIp string: 當前節點的內網IP,即其餘服務能訪問到的IP
func Prepare(svrName string, useType string, svrPort int, healthPort int, healthType string, localIp string) *consulapi.AgentServiceRegistration {
    ip := localIp 
    // 註冊配置信息
    reg := &consul.AgentServiceRegistration{
        ID:      strings.ToLower(fmt.Sprintf("%s_%d", svrName, libs.Ip2Long(ip))), // 生成一個惟一當服務ID
        Name:    strings.ToLower(fmt.Sprintf("%s", svrName)), // 註冊服務名
        Tags:    []string{strings.ToLower(useType)},// 標籤
        Port:    svrPort, // 端口號
        Address: ip, // 所在節點ip地址
    }
    // 健康檢測配置信息
    reg.Check = &consulapi.AgentServiceCheck{
        TCP:                            fmt.Sprintf("%s:%d", ip, svrPort),
        Timeout:                        "1s",
        Interval:                       "15s",
        DeregisterCriticalServiceAfter: "30s",// 30秒服務不可達時,註銷服務
        Status:                         "passing",// 服務啓動時,默認正常
    }

    if healthType == "http" {
        reg.Check.HTTP = fmt.Sprintf("http://%s:%d%s", reg.Address, healthPort, "/health") // http檢測默認/health路徑
        reg.Check.TCP = ""
    }
    // 啓動http健康檢測響應
    if len(reg.Check.HTTP) > 0 {
        RunHealthCheck(reg.Check.HTTP)
    }
    p.curRegistration = reg
    return reg
}

func RunHealthCheck(addr string) error {
    uri, err := url.Parse(addr)
    if err != nil {
        return err
    }

    http.HandleFunc(uri.Path, func(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte("success"))
    })
    go http.ListenAndServe(uri.Host, nil)
    return nil
}

4.服務註冊

var (
    // 獲取consul.Client, 若是localhost不可達須要自動切換地址
    client := MakeClient("http:localhost:8500", "")
    // 配置要註冊的服務信息
    registration = Prepare("ImSvr", "grpc", "10000", 30000, "http", "192.168.1.100")
)

// 
func Register() error {
    err := client.Agent().ServiceRegister(registration)
    if err != nil {
        return err
    }

    return nil
}

5.服務註銷

//
func Deregister() error {
    svrId := registration.ID
    return client.Agent().ServiceDeregister(svrId)
}

小結:當這些服務註冊/註銷方法封裝好後,在應用服務啓動的時候,調用Register()方法進行註冊;當應用服務退出的時候,調用Deregister()方法進行註銷。當服務異常退出當時候,並不會調用Deregister()方法,那怎麼辦呢?放心,前面已經有說到健康檢測的DeregisterCriticalServiceAfter字段,當服務不可達時,會自動註銷服務。OK,到這裏服務的自動化註冊已經完成了。

服務發現

對於服務與服務之間的通迅有不少方式,有人直接用tcp/http;有人會考慮restfullapi,讓接口處理起來更容易;有人用dubbo,抱緊ali大腿;我們用grpc, 抱緊google大腿。通迅框架之間各有千秋,總之適合本身的纔是最好的。感興趣的盆友本身去google/百度。

google grpc框架裏並無實現如何基於consul進行服務發現(或許後期會加上),不過有DNS的服務發現。看了它的實現方式,大體懂了實現原理。OK,我們就寫個基於consul的服務發現。鄙人github中可直接用consulresolver(歡迎你們關注,若是有問題請提交issue會第一時間修改),目前只有roundrobin策略,後續會逐漸加入其餘策略(如隨機,權重隨機,負載策略等).同時該項目
github.com/generalzgd/grpc-svr-frame
還對grpc做了一些簡單的封裝,有利於快速搭建grpc服務。
這裏就不貼代碼了。

OK,基於consul的服務發現有了,那怎麼使用呢?
1.導入模塊,會自動執行包中的Init方法

import (
    grpclb_consul `github.com/generalzgd/grpc-svr-frame/grpc-consul`
    ctrl `github.com/generalzgd/grpc-svr-frame/grpc-ctrl`
)

2.組合grpc的方法封裝

type MyServer {
    ctrl.GrpcController
}

var (
    mySvr = MyServer{
        GrpcController:ctrl.MakeGrpcController(),
    }
)

func (p *MyServer) callSample(){
    // 設置5秒超時
    ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
    // 注意address字段,由「consul:///」字符串開頭,表示要用consulresolver來解析,後面寫上服務名,會自動解析到對應的服務地址。能夠細看下對應的文件,而後理解是怎麼運行的。
    cfg = yaml.EndpointConfig{
        Name: "ImSvr",
        Address: "consul:///imsvr", 
    }
    // 裏面還封裝了connection pool,以提升通迅效率
    clientConn,err := p.GetGrpcConnWithLB(cfg, ctx)
    if err != nil {
        return
    }
    // 如下是grpc框架的通用僞代碼
    client := NewImSvrClient(clientConn.ClientConn)
    resp, err := client.Login(ctx, &LoginReq{})
    if err != nil {
        return
    }
    logs.Info("Got:", resp)
}

總結

至此基於consul的服務註冊與發現已經整合到grpc通迅框架中,而且封裝了grpc服務的一些經常使用方法,可用於快速的開發微服務。這個整合過程歷經了不少辛酸,不只分析了官方文檔源代碼,也吸取了其餘在consul/grpc方面的貢獻者經驗,前先後後琢磨嘗試了許許多多的失敗,也走了不少的彎路。固然了,整個結構仍是個雛形,還須要完善。例如,均衡負載策略,目前只實現了roundrobin, 後續還有隨機、權重、負載策略等。

遇到的坑

我們在部署consul的時候選擇了一個老版本,覺得老版本(1.4.4)會相對穩定些。在新加節點的時候,忽然已有的services每隔幾分鐘會消失,而後又重現,而後又消失不停的重複。媽呀,出大問題了,跟運維老哥一塊兒折騰了個吧小時,推測可能join server節點的時候,數據同步可能存在問題。期間也沒作其餘操做,就一個節點一個節點leave回退,發現leave最後加入的server節點後consul穩定了。穩定後再仔細分析緣由,發現這個版本有bug,https://github.com/hashicorp/consul/issues/5518, 之後生產環境儘可能用次新版本。這是血的教訓!!!!!

相關文章
相關標籤/搜索