構建高可用、高性能的通訊服務,一般採用服務註冊與發現、負載均衡和容錯處理等機制實現。根據負載均衡實現所在的位置不一樣,一般可分爲如下三種解決方案:java
在服務消費者和服務提供者之間有一個獨立的LB,一般是專門的硬件設備如 F5,或者基於軟件如 LVS,HAproxy等實現。LB上有全部服務的地址映射表,一般由運維配置註冊,當服務消費方調用某個目標服務時,它向LB發起請求,由LB以某種策略,好比輪詢(Round-Robin)作負載均衡後將請求轉發到目標服務。LB通常具有健康檢查能力,能自動摘除不健康的服務實例。 該方案主要問題:git
單點問題,全部服務調用流量都通過LB,當服務數量和調用量大的時候,LB容易成爲瓶頸,且一旦LB發生故障影響整個系統;github
服務消費方、提供方之間增長了一級,有必定性能開銷。golang
針對第一個方案的不足,此方案將LB的功能集成到服務消費方進程裏,也被稱爲軟負載或者客戶端負載方案。服務提供方啓動時,首先將服務地址註冊到服務註冊表,同時按期報心跳到服務註冊表以代表服務的存活狀態,至關於健康檢查,服務消費方要訪問某個服務時,它經過內置的LB組件向服務註冊表查詢,同時緩存並按期刷新目標服務地址列表,而後以某種負載均衡策略選擇一個目標服務地址,最後向目標服務發起請求。LB和服務發現能力被分散到每個服務消費者的進程內部,同時服務消費方和服務提供方之間是直接調用,沒有額外開銷,性能比較好。該方案主要問題:api
開發成本,該方案將服務調用方集成到客戶端的進程裏頭,若是有多種不一樣的語言棧,就要配合開發多種不一樣的客戶端,有必定的研發和維護成本;緩存
另外生產環境中,後續若是要對客戶庫進行升級,勢必要求服務調用方修改代碼並從新發布,升級較複雜。服務器
該方案是針對第二種方案的不足而提出的一種折中方案,原理和第二種方案基本相似。
不一樣之處是將LB和服務發現功能從進程內移出來,變成主機上的一個獨立進程。主機上的一個或者多個服務要訪問目標服務時,他們都經過同一主機上的獨立LB進程作服務發現和負載均衡。該方案也是一種分佈式方案沒有單點問題,一個LB進程掛了隻影響該主機上的服務調用方,服務調用方和LB之間是進程內調用性能好,同時該方案還簡化了服務調用方,不須要爲不一樣語言開發客戶庫,LB的升級不須要服務調用方改代碼。
該方案主要問題:部署較複雜,環節多,出錯調試排查問題不方便。mvc
gRPC開源組件官方並未直接提供服務註冊與發現的功能實現,但其設計文檔已提供實現的思路,並在不一樣語言的gRPC代碼API中已提供了命名解析和負載均衡接口供擴展。app
其基本實現原理:負載均衡
服務啓動後gRPC客戶端向命名服務器發出名稱解析請求,名稱將解析爲一個或多個IP地址,每一個IP地址標示它是服務器地址仍是負載均衡器地址,以及標示要使用那個客戶端負載均衡策略或服務配置。
客戶端實例化負載均衡策略,若是解析返回的地址是負載均衡器地址,則客戶端將使用grpclb策略,不然客戶端使用服務配置請求的負載均衡策略。
負載均衡策略爲每一個服務器地址建立一個子通道(channel)。
當有rpc請求時,負載均衡策略決定那個子通道即grpc服務器將接收請求,當可用服務器爲空時客戶端的請求將被阻塞。
根據gRPC官方提供的設計思路,基於進程內LB方案(即第2個案,阿里開源的服務框架 Dubbo 也是採用相似機制),結合分佈式一致的組件(如Zookeeper、Consul、Etcd),可找到gRPC服務發現和負載均衡的可行解決方案。接下來以GO語言爲例,簡單介紹下基於Etcd3的關鍵代碼實現:
1)命名解析實現:resolver.go
package etcdv3 import ( "errors" "fmt" "strings" etcd3 "github.com/coreos/etcd/clientv3" "google.golang.org/grpc/naming" ) // resolver is the implementaion of grpc.naming.Resolver type resolver struct { serviceName string // service name to resolve } // NewResolver return resolver with service name func NewResolver(serviceName string) *resolver { return &resolver{serviceName: serviceName} } // Resolve to resolve the service from etcd, target is the dial address of etcd // target example: "http://127.0.0.1:2379,http://127.0.0.1:12379,http://127.0.0.1:22379" func (re *resolver) Resolve(target string) (naming.Watcher, error) { if re.serviceName == "" { return nil, errors.New("grpclb: no service name provided") } // generate etcd client client, err := etcd3.New(etcd3.Config{ Endpoints: strings.Split(target, ","), }) if err != nil { return nil, fmt.Errorf("grpclb: creat etcd3 client failed: %s", err.Error()) } // Return watcher return &watcher{re: re, client: *client}, nil }
2)服務發現實現:watcher.go
package etcdv3 import ( "fmt" etcd3 "github.com/coreos/etcd/clientv3" "golang.org/x/net/context" "google.golang.org/grpc/naming" "github.com/coreos/etcd/mvcc/mvccpb" ) // watcher is the implementaion of grpc.naming.Watcher type watcher struct { re *resolver // re: Etcd Resolver client etcd3.Client isInitialized bool } // Close do nothing func (w *watcher) Close() { } // Next to return the updates func (w *watcher) Next() ([]*naming.Update, error) { // prefix is the etcd prefix/value to watch prefix := fmt.Sprintf("/%s/%s/", Prefix, w.re.serviceName) // check if is initialized if !w.isInitialized { // query addresses from etcd resp, err := w.client.Get(context.Background(), prefix, etcd3.WithPrefix()) w.isInitialized = true if err == nil { addrs := extractAddrs(resp) //if not empty, return the updates or watcher new dir if l := len(addrs); l != 0 { updates := make([]*naming.Update, l) for i := range addrs { updates[i] = &naming.Update{Op: naming.Add, Addr: addrs[i]} } return updates, nil } } } // generate etcd Watcher rch := w.client.Watch(context.Background(), prefix, etcd3.WithPrefix()) for wresp := range rch { for _, ev := range wresp.Events { switch ev.Type { case mvccpb.PUT: return []*naming.Update{{Op: naming.Add, Addr: string(ev.Kv.Value)}}, nil case mvccpb.DELETE: return []*naming.Update{{Op: naming.Delete, Addr: string(ev.Kv.Value)}}, nil } } } return nil, nil } func extractAddrs(resp *etcd3.GetResponse) []string { addrs := []string{} if resp == nil || resp.Kvs == nil { return addrs } for i := range resp.Kvs { if v := resp.Kvs[i].Value; v != nil { addrs = append(addrs, string(v)) } } return addrs }
3)服務註冊實現:register.go
package etcdv3 import ( "fmt" "log" "strings" "time" etcd3 "github.com/coreos/etcd/clientv3" "golang.org/x/net/context" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" ) // Prefix should start and end with no slash var Prefix = "etcd3_naming" var client etcd3.Client var serviceKey string var stopSignal = make(chan bool, 1) // Register func Register(name string, host string, port int, target string, interval time.Duration, ttl int) error { serviceValue := fmt.Sprintf("%s:%d", host, port) serviceKey = fmt.Sprintf("/%s/%s/%s", Prefix, name, serviceValue) // get endpoints for register dial address var err error client, err := etcd3.New(etcd3.Config{ Endpoints: strings.Split(target, ","), }) if err != nil { return fmt.Errorf("grpclb: create etcd3 client failed: %v", err) } go func() { // invoke self-register with ticker ticker := time.NewTicker(interval) for { // minimum lease TTL is ttl-second resp, _ := client.Grant(context.TODO(), int64(ttl)) // should get first, if not exist, set it _, err := client.Get(context.Background(), serviceKey) if err != nil { if err == rpctypes.ErrKeyNotFound { if _, err := client.Put(context.TODO(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil { log.Printf("grpclb: set service '%s' with ttl to etcd3 failed: %s", name, err.Error()) } } else { log.Printf("grpclb: service '%s' connect to etcd3 failed: %s", name, err.Error()) } } else { // refresh set to true for not notifying the watcher if _, err := client.Put(context.Background(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil { log.Printf("grpclb: refresh service '%s' with ttl to etcd3 failed: %s", name, err.Error()) } } select { case <-stopSignal: return case <-ticker.C: } } }() return nil } // UnRegister delete registered service from etcd func UnRegister() error { stopSignal <- true stopSignal = make(chan bool, 1) // just a hack to avoid multi UnRegister deadlock var err error; if _, err := client.Delete(context.Background(), serviceKey); err != nil { log.Printf("grpclb: deregister '%s' failed: %s", serviceKey, err.Error()) } else { log.Printf("grpclb: deregister '%s' ok.", serviceKey) } return err }
4)接口描述文件:helloworld.proto
syntax = "proto3"; option java_multiple_files = true; option java_package = "com.midea.jr.test.grpc"; option java_outer_classname = "HelloWorldProto"; option objc_class_prefix = "HLW"; package helloworld; // The greeting service definition. service Greeter { // Sends a greeting rpc SayHello (HelloRequest) returns (HelloReply) { } } // The request message containing the user's name. message HelloRequest { string name = 1; } // The response message containing the greetings message HelloReply { string message = 1; }
5)實現服務端接口:helloworldserver.go
package main import ( "flag" "fmt" "log" "net" "os" "os/signal" "syscall" "time" "golang.org/x/net/context" "google.golang.org/grpc" grpclb "com.midea/jr/grpclb/naming/etcd/v3" "com.midea/jr/grpclb/example/pb" ) var ( serv = flag.String("service", "hello_service", "service name") port = flag.Int("port", 50001, "listening port") reg = flag.String("reg", "http://127.0.0.1:2379", "register etcd address") ) func main() { flag.Parse() lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *port)) if err != nil { panic(err) } err = grpclb.Register(*serv, "127.0.0.1", *port, *reg, time.Second*10, 15) if err != nil { panic(err) } ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT) go func() { s := <-ch log.Printf("receive signal '%v'", s) grpclb.UnRegister() os.Exit(1) }() log.Printf("starting hello service at %d", *port) s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) s.Serve(lis) } // server is used to implement helloworld.GreeterServer. type server struct{} // SayHello implements helloworld.GreeterServer func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { fmt.Printf("%v: Receive is %s\n", time.Now(), in.Name) return &pb.HelloReply{Message: "Hello " + in.Name}, nil }
6)實現客戶端接口:helloworldclient.go
package main import ( "flag" "fmt" "time" grpclb "com.midea/jr/grpclb/naming/etcd/v3" "com.midea/jr/grpclb/example/pb" "golang.org/x/net/context" "google.golang.org/grpc" "strconv" ) var ( serv = flag.String("service", "hello_service", "service name") reg = flag.String("reg", "http://127.0.0.1:2379", "register etcd address") ) func main() { flag.Parse() r := grpclb.NewResolver(*serv) b := grpc.RoundRobin(r) ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) conn, err := grpc.DialContext(ctx, *reg, grpc.WithInsecure(), grpc.WithBalancer(b)) if err != nil { panic(err) } ticker := time.NewTicker(1 * time.Second) for t := range ticker.C { client := pb.NewGreeterClient(conn) resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "world " + strconv.Itoa(t.Second())}) if err == nil { fmt.Printf("%v: Reply is %s\n", t, resp.Message) } } }
7)運行測試
運行3個服務端S一、S二、S3,1個客戶端C,觀察各服務端接收的請求數是否相等?
關閉1個服務端S1,觀察請求是否會轉移到另外2個服務端?
從新啓動S1服務端,觀察另外2個服務端請求是否會平均分配到S1?
關閉Etcd3服務器,觀察客戶端與服務端通訊是否正常?
關閉通訊仍然正常,但新服務端不會註冊進來,服務端掉線了也沒法摘除掉。
從新啓動Etcd3服務器,服務端上下線可自動恢復正常。
關閉全部服務端,客戶端請求將被阻塞。
參考:
http://www.grpc.io/docs/ https://github.com/grpc/grpc/blob/master/doc/load-balancing.md