consulnginx
當server端是集羣部署時,client調用server就須要用到服務發現與負載均衡。一般有兩總方式:git
第一種方式常見的就是用nginx給http服務作負載均衡,client端不直接與server交互,而是把請求並給nginx,nginx再轉給後端的服務。
這種方式的優勢是:github
這種方式有幾個缺點:golang
第二種方式能夠參考dubbo的rpc方式,全部的服務都註冊在zookeeper上,client端從zookeeper訂閱server的列表,而後本身選擇把請求發送到哪一個server上。對於上面提到的兩個缺點,這種方式都很好的避免了:算法
這種方式的缺點就是實現起來比較複雜。apache
用第一種方式作grpc的負載均衡時能夠有如下的選擇:c#
用第二種方式時,能夠選擇的數據中心中間件有:segmentfault
他們都實現了raft算法,均可以用來作註冊中心,本篇文章選擇consul是由於consul的特色就是作服務發現,有現成的api能夠用。後端
grpc的Dial()和DialContent()方法中均可以添加Load-Balance的選項,Dial方法已經被廢棄了,本篇文章介紹使用DialContext的方法。api
grpc官方實現了[dns_resolver]()用來作dns的負載均衡。咱們經過例子看看grpc client端的代碼是怎麼寫的,而後再理解dns_resolver的源碼,最後參照dns_resolver來寫本身的consul_resovler。
dns的負載均衡的例子:
package main import ( "context" "log" "google.golang.org/grpc" "google.golang.org/grpc/balancer/roundrobin" pb "google.golang.org/grpc/examples/helloworld/helloworld" "google.golang.org/grpc/resolver" ) const ( address = "dns:///dns-record-name:443" defaultName = "world" ) func main() { // The secret sauce resolver.SetDefaultScheme("dns") // Set up a connection to the server. ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) conn, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewGreeterClient(conn) // Contact the servers in round-robin manner. for i := 0; i < 3; i++ { ctx := context.Background() r, err := c.SayHello(ctx, &pb.HelloRequest{Name: defaultName}) if err != nil { log.Fatalf("could not greet: %v", err) } log.Printf("Greeting: %s", r.Message) } }
DialContext的定義以下:
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error)
下面這行代碼指明瞭用dns_resolver,實際上也能夠不寫,grpc會根據DialContext的第二個參數target來判斷選用哪一個resolver,例子中傳給DialContext的target是 dns:///dns-record-name:443,grpc會自動選擇dns_resolver
resolver.SetDefaultScheme("dns")
下面的這個選項,指明瞭grpc用輪詢作爲負載均衡的策略
grpc.WithBalancerName(roundrobin.Name)
調用grpc.DialContext以後,grpc會找到對應的resovler,拿到服務的地址列表,而後在調用服務提供的接口時,根據指定的輪詢策略選擇一個服務。
gRPC Name Resolution裏面說了,能夠實現自定義的resolver做爲插件。
先看看resolver.go的源碼,源碼路徑是$GOPATH/src/google.golang.org/grpc/resolver/resolver.go
m = make(map[string]Builder) //scheme到Builder的map func Register(b Builder) { //用於resolver註冊的接口,dns_resolver.go的init方中調用了這個方法,實際就是更新了map m[b.Scheme()] = b } type Resolver interface { ResolveNow(ResolveNowOption) //當即resolve,從新查詢服務信息 Close() //關閉這個Resolver } type Target struct {//uri解析以後的對象, uri的格式詳見RFC3986 Scheme string Authority string Endpoint string } type Address struct {//描述一個服務的地址信息 Addr string //格式是 host:port Type AddressType ServerName string Metadata interface{} } type ClientConn interface {//定義了兩個callback函數,用於通知服務信息的更新 NewAddress(addresses []Address) NewServiceConfig(serviceConfig string) } type Builder interface { Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error) //返回一個Resolver Scheme() string //返回scheme如 "dns", "passthrough", "consul" } func Get(scheme string) Builder { //grpc.ClientConn會高用這個方法獲取指定的Builder接口的實例 if b, ok := m[scheme]; ok { return b } return nil }
即便加了註釋,估計也很難立刻理解這個其中的具體含意,博主也是結合dns_resolver.go,反覆讀了好幾遍才理解resolver.go。其大體的意思是,grpc.DialContext方法調用以後:
調用Builder.Build方法
瞭解了resolver源碼的意思以後,再看一下dns_resolver.go就比較清晰了
//註冊一個Builder到resolver的map裏面 //這個方法會被默認調用,瞭解go的init能夠自行百度 func init() { resolver.Register(NewBuilder()) } func NewBuilder() resolver.Builder {//建立一個resolver.Builder的實例 return &dnsBuilder{minFreq: defaultFreq} } func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) { //解析target拿到ip和端口 host, port, err := parseTarget(target.Endpoint, defaultPort) if err != nil { return nil, err } // IP address. if net.ParseIP(host) != nil { host, _ = formatIP(host) addr := []resolver.Address{{Addr: host + ":" + port}} i := &ipResolver{ cc: cc, ip: addr, rn: make(chan struct{}, 1), q: make(chan struct{}), } cc.NewAddress(addr) go i.watcher() return i, nil } // DNS address (non-IP). ctx, cancel := context.WithCancel(context.Background()) d := &dnsResolver{ freq: b.minFreq, backoff: backoff.Exponential{MaxDelay: b.minFreq}, host: host, port: port, ctx: ctx, cancel: cancel, cc: cc, t: time.NewTimer(0), rn: make(chan struct{}, 1), disableServiceConfig: opts.DisableServiceConfig, } if target.Authority == "" { d.resolver = defaultResolver } else { d.resolver, err = customAuthorityResolver(target.Authority) if err != nil { return nil, err } } d.wg.Add(1) go d.watcher()//起一個goroutine,由於watcher這個方法是個死循環,當定時器 return d, nil } func (d *dnsResolver) watcher() { defer d.wg.Done() for { //這個select沒有default,當沒有case知足時會一直阻塞 //結束阻塞的條件是定時器超時d.t.C,或者d.rn這個channel中有數據可讀 select { case <-d.ctx.Done(): return case <-d.t.C: case <-d.rn: } result, sc := d.lookup() // Next lookup should happen within an interval defined by d.freq. It may be // more often due to exponential retry on empty address list. if len(result) == 0 { d.retryCount++ d.t.Reset(d.backoff.Backoff(d.retryCount)) } else { d.retryCount = 0 d.t.Reset(d.freq) } //resolver.ClientConn的兩個callback的調用,實現服務信息傳入上層 d.cc.NewServiceConfig(sc) d.cc.NewAddress(result) } } //向channel中寫入,用於結束watcher中那個select的阻塞狀態,後面的代碼就是從新查詢服務信息的邏輯 func (i *ipResolver) ResolveNow(opt resolver.ResolveNowOption) { select { case i.rn <- struct{}{}: default: } }
上面咱們瞭解了grpc的resolver的機制,接下來實現consul_resolver, 咱們先把代碼的架子搭起來
init() //返回一個resolver.Builder的實例 //實現resolver.Builder的接口中的全部方法就是一個resolver.Builder type consulBuidler strcut { } func (cb *consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) { //TODO 解析target, 拿到consul的ip和端口 //TODO 用consul的go api鏈接consul,查詢服務結點信息,而且調用resolver.ClientConn的兩個callback } func (cb *consulBuilder) Scheme() string { return "consul" } //ResolverNow方法什麼也不作,由於和consul保持了發佈訂閱的關係 //不須要像dns_resolver那個定時的去刷新 func (cr *consulResolver) ResolveNow(opt resolver.ResolveNowOption) { } //暫時先什麼也不作吧 func (cr *consulResolver) Close() { }
如今來看,實現consul_resolver.go最大的問題就是怎麼用consul提供的go api了,參考這篇文章就能夠了,而後consul_resolver.go的代碼就出來了
package consul import ( "errors" "fmt" "github.com/hashicorp/consul/api" "google.golang.org/grpc/resolver" "regexp" "sync" ) const ( defaultPort = "8500" ) var ( errMissingAddr = errors.New("consul resolver: missing address") errAddrMisMatch = errors.New("consul resolver: invalied uri") errEndsWithColon = errors.New("consul resolver: missing port after port-separator colon") regexConsul, _ = regexp.Compile("^([A-z0-9.]+)(:[0-9]{1,5})?/([A-z_]+)$") ) func Init() { fmt.Printf("calling consul init\n") resolver.Register(NewBuilder()) } type consulBuilder struct { } type consulResolver struct { address string wg sync.WaitGroup cc resolver.ClientConn name string disableServiceConfig bool lastIndex uint64 } func NewBuilder() resolver.Builder { return &consulBuilder{} } func (cb *consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) { fmt.Printf("calling consul build\n") fmt.Printf("target: %v\n", target) host, port, name, err := parseTarget(fmt.Sprintf("%s/%s", target.Authority, target.Endpoint)) if err != nil { return nil, err } cr := &consulResolver{ address: fmt.Sprintf("%s%s", host, port), name: name, cc: cc, disableServiceConfig: opts.DisableServiceConfig, lastIndex: 0, } cr.wg.Add(1) go cr.watcher() return cr, nil } func (cr *consulResolver) watcher() { fmt.Printf("calling consul watcher\n") config := api.DefaultConfig() config.Address = cr.address client, err := api.NewClient(config) if err != nil { fmt.Printf("error create consul client: %v\n", err) return } for { services, metainfo, err := client.Health().Service(cr.name, cr.name, true, &api.QueryOptions{WaitIndex: cr.lastIndex}) if err != nil { fmt.Printf("error retrieving instances from Consul: %v", err) } cr.lastIndex = metainfo.LastIndex var newAddrs []resolver.Address for _, service := range services { addr := fmt.Sprintf("%v:%v", service.Service.Address, service.Service.Port) newAddrs = append(newAddrs, resolver.Address{Addr: addr}) } fmt.Printf("adding service addrs\n") fmt.Printf("newAddrs: %v\n", newAddrs) cr.cc.NewAddress(newAddrs) cr.cc.NewServiceConfig(cr.name) } } func (cb *consulBuilder) Scheme() string { return "consul" } func (cr *consulResolver) ResolveNow(opt resolver.ResolveNowOption) { } func (cr *consulResolver) Close() { } func parseTarget(target string) (host, port, name string, err error) { fmt.Printf("target uri: %v\n", target) if target == "" { return "", "", "", errMissingAddr } if !regexConsul.MatchString(target) { return "", "", "", errAddrMisMatch } groups := regexConsul.FindStringSubmatch(target) host = groups[1] port = groups[2] name = groups[3] if port == "" { port = defaultPort } return host, port, name, nil }
到此,grpc客戶端服務發現就搞定了。
服務註冊直接用consul的go api就能夠了,也是參考前一篇文章,簡單的封裝一下,consul_register.go的代碼以下:
package consul import ( "fmt" "github.com/hashicorp/consul/api" "time" ) type ConsulService struct { IP string Port int Tag []string Name string } func RegitserService(ca string, cs *ConsulService) { //register consul consulConfig := api.DefaultConfig() consulConfig.Address = ca client, err := api.NewClient(consulConfig) if err != nil { fmt.Printf("NewClient error\n%v", err) return } agent := client.Agent() interval := time.Duration(10) * time.Second deregister := time.Duration(1) * time.Minute reg := &api.AgentServiceRegistration{ ID: fmt.Sprintf("%v-%v-%v", cs.Name, cs.IP, cs.Port), // 服務節點的名稱 Name: cs.Name, // 服務名稱 Tags: cs.Tag, // tag,能夠爲空 Port: cs.Port, // 服務端口 Address: cs.IP, // 服務 IP Check: &api.AgentServiceCheck{ // 健康檢查 Interval: interval.String(), // 健康檢查間隔 GRPC: fmt.Sprintf("%v:%v/%v", cs.IP, cs.Port, cs.Name), // grpc 支持,執行健康檢查的地址,service 會傳到 Health.Check 函數中 DeregisterCriticalServiceAfter: deregister.String(), // 註銷時間,至關於過時時間 }, } fmt.Printf("registing to %v\n", ca) if err := agent.ServiceRegister(reg); err != nil { fmt.Printf("Service Register error\n%v", err) return } }
把grpc的helloworld的demo改一下,用consul來作服務註冊和發現。
server端代碼:
package main import ( "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" "log" "net" "server/internal/consul" pb "server/proto/helloworld" ) const ( port = ":50051" ) // server is used to implement helloworld.GreeterServer. type server struct{} // SayHello implements helloworld.GreeterServer func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { log.Printf("Received: %v", in.Name) return &pb.HelloReply{Message: "Hello " + in.Name}, nil } func RegisterToConsul() { consul.RegitserService("127.0.0.1:8500", &consul.ConsulService{ Name: "helloworld", Tag: []string{"helloworld"}, IP: "127.0.0.1", Port: 50051, }) } //health type HealthImpl struct{} // Check 實現健康檢查接口,這裏直接返回健康狀態,這裏也能夠有更復雜的健康檢查策略,好比根據服務器負載來返回 func (h *HealthImpl) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { fmt.Print("health checking\n") return &grpc_health_v1.HealthCheckResponse{ Status: grpc_health_v1.HealthCheckResponse_SERVING, }, nil } func (h *HealthImpl) Watch(req *grpc_health_v1.HealthCheckRequest, w grpc_health_v1.Health_WatchServer) error { return nil } func main() { lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) grpc_health_v1.RegisterHealthServer(s, &HealthImpl{}) RegisterToConsul() if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }
client端代碼:
package main import ( "client/internal/consul" pb "client/proto/helloworld" "context" "google.golang.org/grpc" "log" "os" "time" ) const ( target = "consul://127.0.0.1:8500/helloworld" defaultName = "world" ) func main() { consul.Init() // Set up a connection to the server. ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) conn, err := grpc.DialContext(ctx, target, grpc.WithBlock(), grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewGreeterClient(conn) // Contact the server and print out its response. name := defaultName if len(os.Args) > 1 { name = os.Args[1] } for { ctx, _ := context.WithTimeout(context.Background(), time.Second) r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name}) if err != nil { log.Fatalf("could not greet: %v", err) } log.Printf("Greeting: %s", r.Message) time.Sleep(time.Second * 2) } }
運行一把
啓動consul
consul agent -dev
啓動hello server
cd server go run cmd/main.go
啓動hello client
cd client go run cmd/main.go
運行結果:
//client 2019/03/07 17:22:04 Greeting: Hello world 2019/03/07 17:22:06 Greeting: Hello world //server 2019/03/07 17:22:04 Received: world 2019/03/07 17:22:06 Received: world
完整工程的git地址
工程使用方法:
cd server go mod tidy go run cmd/main.go cd client go mod tidy go run cmd/main.go
請自行解決防火牆的問題