Go | Go 使用 consul 作服務發現

系列文章目錄


@TOCgit


前言

前面一章講了微服務的一些優勢和缺點,那如何作到github

1、目標

2、使用步驟

1. 安裝 consul

咱們能夠直接使用官方提供的二進制文件來進行安裝部署,其官網地址爲 https://www.consul.io/downloads
在這裏插入圖片描述
下載後爲可執行文件,在咱們開發試驗過程當中,能夠直接使用 consul agent -dev 命令來啓動一個單節點的 consul編程

在啓動的打印日誌中能夠看到 agent: Started HTTP server on 127.0.0.1:8500 (tcp), 咱們能夠在瀏覽器直接訪問 127.0.0.1:8500 便可看到以下
在這裏插入圖片描述
這裏咱們的 consul 就啓動成功了api

2. 服務註冊

在網絡編程中,通常會提供項目的 IP、PORT、PROTOCOL,在服務治理中,咱們還須要知道對應的服務名、實例名以及一些自定義的擴展信息瀏覽器

在這裏使用 ServiceInstance 接口來規定註冊服務時必須的一些信息,同時用 DefaultServiceInstance 實現網絡

type ServiceInstance interface {

    // return The unique instance ID as registered.
    GetInstanceId() string

    // return The service ID as registered.
    GetServiceId() string

    // return The hostname of the registered service instance.
    GetHost() string

    // return The port of the registered service instance.
    GetPort() int

    // return Whether the port of the registered service instance uses HTTPS.
    IsSecure() bool

    // return The key / value pair metadata associated with the service instance.
    GetMetadata() map[string]string
}

type DefaultServiceInstance struct {
    InstanceId string
    ServiceId  string
    Host       string
    Port       int
    Secure     bool
    Metadata   map[string]string
}

func NewDefaultServiceInstance(serviceId string, host string, port int, secure bool,
    metadata map[string]string, instanceId string) (*DefaultServiceInstance, error) {

    // 若是沒有傳入 IP 則獲取一下,這個方法在多網卡的狀況下,並很差用
    if len(host) == 0 {
        localIP, err := util.GetLocalIP()
        if err != nil {
            return nil, err
        }
        host = localIP
    }

    if len(instanceId) == 0 {
        instanceId = serviceId + "-" + strconv.FormatInt(time.Now().Unix(), 10) + "-" + strconv.Itoa(rand.Intn(9000)+1000)
    }

    return &DefaultServiceInstance{InstanceId: instanceId, ServiceId: serviceId, Host: host, Port: port, Secure: secure, Metadata: metadata}, nil
}

func (serviceInstance DefaultServiceInstance) GetInstanceId() string {
    return serviceInstance.InstanceId
}

func (serviceInstance DefaultServiceInstance) GetServiceId() string {
    return serviceInstance.ServiceId
}

func (serviceInstance DefaultServiceInstance) GetHost() string {
    return serviceInstance.Host
}

func (serviceInstance DefaultServiceInstance) GetPort() int {
    return serviceInstance.Port
}

func (serviceInstance DefaultServiceInstance) IsSecure() bool {
    return serviceInstance.Secure
}

func (serviceInstance DefaultServiceInstance) GetMetadata() map[string]string {
    return serviceInstance.Metadata
}

定義接口

在上面規定了須要註冊的服務的必要信息,下面定義下服務註冊和剔除的方法app

type ServiceRegistry interface {
    Register(serviceInstance cloud.ServiceInstance) bool

    Deregister()
}

具體實現

由於 consul 提供了 http 接口來對consul 進行操做,咱們也可使用 http 請求方式進行註冊和剔除操做,具體 http 接口文檔見 https://www.consul.io/api-docs, consul 默認提供了go 語言的實現,這裏直接使用 github.com/hashicorp/consul/apitcp

import (
    "errors"
    "fmt"
    "github.com/hashicorp/consul/api"
    "strconv"
    "unsafe"
)

type consulServiceRegistry struct {
    serviceInstances     map[string]map[string]cloud.ServiceInstance
    client               api.Client
    localServiceInstance cloud.ServiceInstance
}

func (c consulServiceRegistry) Register(serviceInstance cloud.ServiceInstance) bool {
    // 建立註冊到consul的服務到
    registration := new(api.AgentServiceRegistration)
    registration.ID = serviceInstance.GetInstanceId()
    registration.Name = serviceInstance.GetServiceId()
    registration.Port = serviceInstance.GetPort()
    var tags []string
    if serviceInstance.IsSecure() {
        tags = append(tags, "secure=true")
    } else {
        tags = append(tags, "secure=false")
    }
    if serviceInstance.GetMetadata() != nil {
        var tags []string
        for key, value := range serviceInstance.GetMetadata() {
            tags = append(tags, key+"="+value)
        }
        registration.Tags = tags
    }
    registration.Tags = tags

    registration.Address = serviceInstance.GetHost()

    // 增長consul健康檢查回調函數
    check := new(api.AgentServiceCheck)

    schema := "http"
    if serviceInstance.IsSecure() {
        schema = "https"
    }
    check.HTTP = fmt.Sprintf("%s://%s:%d/actuator/health", schema, registration.Address, registration.Port)
    check.Timeout = "5s"
    check.Interval = "5s"
    check.DeregisterCriticalServiceAfter = "20s" // 故障檢查失敗30s後 consul自動將註冊服務刪除
    registration.Check = check

    // 註冊服務到consul
    err := c.client.Agent().ServiceRegister(registration)
    if err != nil {
        fmt.Println(err)
        return false
    }

    if c.serviceInstances == nil {
        c.serviceInstances = map[string]map[string]cloud.ServiceInstance{}
    }

    services := c.serviceInstances[serviceInstance.GetServiceId()]

    if services == nil {
        services = map[string]cloud.ServiceInstance{}
    }

    services[serviceInstance.GetInstanceId()] = serviceInstance

    c.serviceInstances[serviceInstance.GetServiceId()] = services

    c.localServiceInstance = serviceInstance

    return true
}

// deregister a service
func (c consulServiceRegistry) Deregister() {
    if c.serviceInstances == nil {
        return
    }

    services := c.serviceInstances[c.localServiceInstance.GetServiceId()]

    if services == nil {
        return
    }

    delete(services, c.localServiceInstance.GetInstanceId())

    if len(services) == 0 {
        delete(c.serviceInstances, c.localServiceInstance.GetServiceId())
    }

    _ = c.client.Agent().ServiceDeregister(c.localServiceInstance.GetInstanceId())

    c.localServiceInstance = nil
}

// new a consulServiceRegistry instance
// token is optional
func NewConsulServiceRegistry(host string, port int, token string) (*consulServiceRegistry, error) {
    if len(host) < 3 {
        return nil, errors.New("check host")
    }

    if port <= 0 || port > 65535 {
        return nil, errors.New("check port, port should between 1 and 65535")
    }

    config := api.DefaultConfig()
    config.Address = host + ":" + strconv.Itoa(port)
    config.Token = token
    client, err := api.NewClient(config)
    if err != nil {
        return nil, err
    }

    return &consulServiceRegistry{client: *client}, nil
}

測試用例

註冊服務的代碼基本完成,來測試一下函數

func TestConsulServiceRegistry(t *testing.T) {
    host := "127.0.0.1"
    port := 8500
    registryDiscoveryClient, _ := extension.NewConsulServiceRegistry(host, port, "")

    ip, err := util.GetLocalIP()
    if err != nil {
        t.Error(err)
    }
    
    serviceInstanceInfo, _ := cloud.NewDefaultServiceInstance("go-user-server", "", 8090,
        false, map[string]string{"user":"zyn"}, "")

    registryDiscoveryClient.Register(serviceInstanceInfo)

    r := gin.Default()
    // 健康檢測接口,其實只要是 200 就認爲成功了
    r.GET("/actuator/health", func(c *gin.Context) {
        c.JSON(200, gin.H{
            "message": "pong",
        })
    })
    err = r.Run(":8090")
    if err != nil{
        registryDiscoveryClient.Deregister()
    }
}

若是成功,則會在 consul 看到 go-user-server 這個服務微服務

3. 服務發現

在服務發現中,通常會須要兩個方法

  1. 獲取全部的服務列表
  2. 獲取指定的服務的全部實例信息

接口定義

type DiscoveryClient interface {
    
    /**
     * Gets all ServiceInstances associated with a particular serviceId.
     * @param serviceId The serviceId to query.
     * @return A List of ServiceInstance.
     */
    GetInstances(serviceId string) ([]cloud.ServiceInstance, error)

    /**
     * @return All known service IDs.
     */
    GetServices() ([]string, error)
}

具體實現

來實現一下

type consulServiceRegistry struct {
    serviceInstances     map[string]map[string]cloud.ServiceInstance
    client               api.Client
    localServiceInstance cloud.ServiceInstance
}

func (c consulServiceRegistry) GetInstances(serviceId string) ([]cloud.ServiceInstance, error) {
    catalogService, _, _ := c.client.Catalog().Service(serviceId, "", nil)
    if len(catalogService) > 0 {
        result := make([]cloud.ServiceInstance, len(catalogService))
        for index, sever := range catalogService {
            s := cloud.DefaultServiceInstance{
                InstanceId: sever.ServiceID,
                ServiceId:  sever.ServiceName,
                Host:       sever.Address,
                Port:       sever.ServicePort,
                Metadata:   sever.ServiceMeta,
            }
            result[index] = s
        }
        return result, nil
    }
    return nil, nil
}

func (c consulServiceRegistry) GetServices() ([]string, error) {
    services, _, _ := c.client.Catalog().Services(nil)
    result := make([]string, unsafe.Sizeof(services))
    index := 0
    for serviceName, _ := range services {
        result[index] = serviceName
        index++
    }
    return result, nil
}

// new a consulServiceRegistry instance
// token is optional
func NewConsulServiceRegistry(host string, port int, token string) (*consulServiceRegistry, error) {
    if len(host) < 3 {
        return nil, errors.New("check host")
    }

    if port <= 0 || port > 65535 {
        return nil, errors.New("check port, port should between 1 and 65535")
    }

    config := api.DefaultConfig()
    config.Address = host + ":" + strconv.Itoa(port)
    config.Token = token
    client, err := api.NewClient(config)
    if err != nil {
        return nil, err
    }

    return &consulServiceRegistry{client: *client}, nil
}

測試用例

func TestConsulServiceDiscovery(t *testing.T) {
    host := "127.0.0.1"
    port := 8500
    token := ""
    registryDiscoveryClient, err := extension.NewConsulServiceRegistry(host, port, token)
    if err != nil {
        panic(err)
    }

    t.Log(registryDiscoveryClient.GetServices())

    t.Log(registryDiscoveryClient.GetInstances("go-user-server"))
}

結果

consul_service_registry_test.go:57: [consul go-user-server      ] <nil>

consul_service_registry_test.go:59: [{go-user-server-1602590661-56179 go-user-server 127.0.0.1 8090 false map[user:zyn]}] <nil>

總結

經過使用 consul api 咱們能夠簡單的實現基於 consul 的服務發現,在經過結合 http rpc 就可簡單的實現服務的調用,下面一章來簡單講下 go 如何發起 http 請求,爲咱們作 rpc 作個鋪墊

具體代碼見 https://github.com/zhangyunan...

參考

相關文章
相關標籤/搜索