go基於grpc構建微服務框架-服務註冊與發現

概述

圖片描述

grpc 是谷歌開源的rpc框架,基於http2實現,並支持跨語言,目前基本涵蓋了主流語言.跨語言的實現主要得益於protobuf,經過編寫proto文件,經過protobuf工具生成對應語言的類庫進行使用.git

對於go這樣一門新生語言來講,生態鏈還處於發展階段,微服務框架也是如此,下面將基於grpc-go版本搭建一個微服務通信框架.github

1.服務註冊與發佈的機制

1.1 解決的問題

服務註冊與發佈主要解決的服務依賴問題,一般意義上,若是A服務調用B服務時,最直接的作法是配置IP地址和端口.但隨着服務依賴變多時,配置將會是否龐雜,且當服務發生遷移時,那麼全部相關服務的配置均須要修改,這將十分難以維護以及容易出現問題.
所以爲了解決這種服務依賴關係,服務註冊與發佈應運而生.json

1.2 機制

圖片描述

服務註冊與發現主要分爲如下幾點.app

  • 服務信息發佈

這裏主要是服務的服務名,IP信息,以及一些附件元數據.經過註冊接口註冊到服務註冊發佈中心.負載均衡

  • 存活檢測

當服務意外中止時,客戶端須要感知到服務中止,並將服務的IP地址踢出可用的IP地址列表,這裏可使用定時心跳去實現.框架

  • 客戶端負載均衡

經過服務註冊與發佈,能夠實現一個服務部署多臺實例,客戶端實如今實例直接的負載均衡,從而實現服務的橫向擴展.微服務

所以,服務註冊與發佈能夠概況爲,服務將信息上報,客戶端拉取服務信息,經過服務名進行調用,當服務宕機時客戶端踢掉故障服務,服務新上線時客戶端自動添加到調用列表.工具

2.實現

grpc-go的整個實現大量使用go的接口特性,所以經過擴展接口,能夠很容易的實現服務的註冊與發現,這裏服務註冊中心考慮到可用性以及一致性,通常採用etcd或zookeeper來實現,這裏實現etcd的版本.
完整代碼以及使用示例見:https://github.com/g4zhuj/grp...spa

2.1 客戶端

具體須要實現幾個接口,針對客戶端,最簡單的實現方式只須要實現兩個接口方法Resolve(),以及Next(),而後使用輪詢的負載均衡方式.
主要經過etcd的Get接口以及Watch接口實現.code

  • Resolve()接口
//用於生成Watcher,監聽註冊中心中的服務信息變化
func (er *etcdRegistry) Resolve(target string) (naming.Watcher, error) {
    ctx, cancel := context.WithTimeout(context.TODO(), resolverTimeOut)
    w := &etcdWatcher{
        cli:    er.cli,
        target: target + "/",
        ctx:    ctx,
        cancel: cancel,
    }
    return w, nil
}
  • Next() 接口
//Next接口主要用於獲取註冊的服務信息,經過channel以及watch,當服務信息發生
//變化時,Next接口會將變化返回給grpc框架從而實現服務信息變動.
func (ew *etcdWatcher) Next() ([]*naming.Update, error) {
    var updates []*naming.Update
    //初次獲取時,建立監聽channel,並返回獲取到的服務信息
    if ew.watchChan == nil {
        //create new chan
        resp, err := ew.cli.Get(ew.ctx, ew.target, etcd.WithPrefix(), etcd.WithSerializable())
        if err != nil {
            return nil, err
        }
        for _, kv := range resp.Kvs {
            var upt naming.Update
            if err := json.Unmarshal(kv.Value, &upt); err != nil {
                continue
            }
            updates = append(updates, &upt)
        }
        //建立etcd的watcher監聽target(服務名)的信息.
        opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
        ew.watchChan = ew.cli.Watch(context.TODO(), ew.target, opts...)
        return updates, nil
    }

    //阻塞監聽,服務發生變化時才返回給上層
    wrsp, ok := <-ew.watchChan
    if !ok {
        err := status.Error(codes.Unavailable, "etcd watch closed")
        return nil, err
    }
    if wrsp.Err() != nil {
        return nil, wrsp.Err()
    }
    for _, e := range wrsp.Events {
        var upt naming.Update
        var err error
        switch e.Type {
        case etcd.EventTypePut:
            err = json.Unmarshal(e.Kv.Value, &upt)
            upt.Op = naming.Add
        case etcd.EventTypeDelete:
            err = json.Unmarshal(e.PrevKv.Value, &upt)
            upt.Op = naming.Delete
        }

        if err != nil {
            continue
        }
        updates = append(updates, &upt)
    }
    return updates, nil
}

2.2 服務端

服務端只須要上報服務信息,並定時保持心跳,這裏經過etcd的Put接口以及KeepAlive接口實現.
具體以下:

func (er *etcdRegistry) Register(ctx context.Context, target string, update naming.Update, opts ...wrapper.RegistryOptions) (err error) {
    //將服務信息序列化成json格式
    var upBytes []byte
    if upBytes, err = json.Marshal(update); err != nil {
        return status.Error(codes.InvalidArgument, err.Error())
    }

    ctx, cancel := context.WithTimeout(context.TODO(), resolverTimeOut)
    er.cancal = cancel
    rgOpt := wrapper.RegistryOption{TTL: wrapper.DefaultRegInfTTL}
    for _, opt := range opts {
        opt(&rgOpt)
    }

    switch update.Op {
    case naming.Add:
        lsRsp, err := er.lsCli.Grant(ctx, int64(rgOpt.TTL/time.Second))
        if err != nil {
            return err
        }

        //Put服務信息到etcd,並設置key的值TTL,經過後面的KeepAlive接口
        //對TTL進行續期,超過TTL的時間未收到續期請求,則說明服務可能掛了,從而清除服務信息
        etcdOpts := []etcd.OpOption{etcd.WithLease(lsRsp.ID)}
        key := target + "/" + update.Addr
        _, err = er.cli.KV.Put(ctx, key, string(upBytes), etcdOpts...)
        if err != nil {
            return err
        }

        //保持心跳
        lsRspChan, err := er.lsCli.KeepAlive(context.TODO(), lsRsp.ID)
        if err != nil {
            return err
        }
        go func() {
            for {
                _, ok := <-lsRspChan
                if !ok {
                    grpclog.Fatalf("%v keepalive channel is closing", key)
                    break
                }
            }
        }()
    case naming.Delete:
        _, err = er.cli.Delete(ctx, target+"/"+update.Addr)
    default:
        return status.Error(codes.InvalidArgument, "unsupported op")
    }
    return nil
}

3. 參考

https://grpc.io/
https://coreos.com/etcd/
https://github.com/g4zhuj/grp...

相關文章
相關標籤/搜索