grpc-lb採用客戶端進程內負載均衡方式,支持隨機、輪詢、一致性哈希三種負載均衡策略,並支持服務端權重。可採用etcd或consul作爲註冊中心。
項目地址:
https://github.com/liyue201/grpc-lb.git
基本架構如圖,服務提供者起來後向註冊中心註冊自己的信息(ip、端口、權重等),並保持心跳。客戶端監聽註冊中心,獲取服務器列表,一旦服務器發生變化,客戶端立刻更新本地的服務器列表。客戶端的每個請求都通過負載均衡策略選擇一個合適的服務器去訪問。
隨機負載均衡客戶端例子:
package main import ( etcd "github.com/coreos/etcd/client" grpclb "github.com/liyue201/grpc-lb" "github.com/liyue201/grpc-lb/examples/proto" registry "github.com/liyue201/grpc-lb/registry/etcd" "golang.org/x/net/context" "google.golang.org/grpc" "log" ) func main() { etcdConfg := etcd.Config{ Endpoints: []string{"http://120.24.44.201:4001"}, } r := registry.NewResolver("/grpc-lb", "test", etcdConfg) b := grpclb.NewBalancer(r, grpclb.NewRandomSelector()) c, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(b)) if err != nil { log.Printf("grpc dial: %s", err) return } defer c.Close() client := proto.NewTestClient(c) resp, err := client.Say(context.Background(), &proto.SayReq{Content: "random"}) if err != nil { log.Println(err) return } log.Printf(resp.Content) }
輪詢負載均衡,只需把NewRandomSelector改爲NewRoundRobinSelector即可。
package main import ( etcd "github.com/coreos/etcd/client" grpclb "github.com/liyue201/grpc-lb" "github.com/liyue201/grpc-lb/examples/proto" registry "github.com/liyue201/grpc-lb/registry/etcd" "golang.org/x/net/context" "google.golang.org/grpc" "log" ) func main() { etcdConfg := etcd.Config{ Endpoints: []string{"http://120.24.44.201:4001"}, } r := registry.NewResolver("/grpc-lb", "test", etcdConfg) b := grpclb.NewBalancer(r, grpclb.NewRoundRobinSelector()) c, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(b)) if err != nil { log.Printf("grpc dial: %s", err) return } defer c.Close() client := proto.NewTestClient(c) resp, err := client.Say(context.Background(), &proto.SayReq{Content: "round robin"}) if err != nil { log.Println(err) return } log.Printf(resp.Content) }
一致性哈希負載均衡,需要給每個請求傳一個哈希的參數,這個根據應用場景而定,就是下面這個例子中的hashData。
package main import ( "fmt" etcd "github.com/coreos/etcd/client" grpclb "github.com/liyue201/grpc-lb" "github.com/liyue201/grpc-lb/examples/proto" registry "github.com/liyue201/grpc-lb/registry/etcd" "golang.org/x/net/context" "google.golang.org/grpc" "log" "time" ) func main() { etcdConfg := etcd.Config{ Endpoints: []string{"http://120.24.44.201:4001"}, } r := registry.NewResolver("/grpc-lb", "test", etcdConfg) b := grpclb.NewBalancer(r, grpclb.NewKetamaSelector(grpclb.DefaultKetamaKey)) c, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(b), grpc.WithTimeout(time.Second)) if err != nil { log.Printf("grpc dial: %s", err) return } client := proto.NewTestClient(c) for i := 0; i < 10; i++ { ctx := context.Background() hashData := fmt.Sprintf("aaaa %d", i) resp, err := client.Say(context.WithValue(ctx, grpclb.DefaultKetamaKey, hashData), &proto.SayReq{Content: "ketama"}) if err != nil { log.Println(err) time.Sleep(time.Second) continue } log.Printf(resp.Content) time.Sleep(time.Second) } }
服務端的代碼如下,使用以下命令運行3個服務進程,再啓動客戶端。
go run main.go -node node1 -port 28544
go run main.go -node node2 -port 18562
go run main.go -node node3 -port 27772
package main import ( "flag" "fmt" etcd "github.com/coreos/etcd/client" "github.com/liyue201/grpc-lb/examples/proto" registry "github.com/liyue201/grpc-lb/registry/etcd" "golang.org/x/net/context" "google.golang.org/grpc" "log" "net" "sync" "time" ) var nodeID = flag.String("node", "node1", "node ID") var port = flag.Int("port", 8080, "listening port") type RpcServer struct { addr string s *grpc.Server } func NewRpcServer(addr string) *RpcServer { s := grpc.NewServer() rs := &RpcServer{ addr: addr, s: s, } return rs } func (s *RpcServer) Run() { listener, err := net.Listen("tcp", s.addr) if err != nil { log.Printf("failed to listen: %v", err) return } log.Printf("rpc listening on:%s", s.addr) proto.RegisterTestServer(s.s, s) s.s.Serve(listener) } func (s *RpcServer) Stop() { s.s.GracefulStop() } func (s *RpcServer) Say(ctx context.Context, req *proto.SayReq) (*proto.SayResp, error) { text := "Hello " + req.Content + ", I am " + *nodeID log.Println(text) return &proto.SayResp{Content: text}, nil } func StartService() { etcdConfg := etcd.Config{ Endpoints: []string{"http://120.24.44.201:4001"}, } registry, err := registry.NewRegistry( registry.Option{ EtcdConfig: etcdConfg, RegistryDir: "/grpc-lb", ServiceName: "test", NodeID: *nodeID, NData: registry.NodeData{ Addr: fmt.Sprintf("127.0.0.1:%d", *port), //Metadata: map[string]string{"weight": "1"}, //這裏配置權重,不配置默認是1 }, Ttl: 10 * time.Second, }) if err != nil { log.Panic(err) return } server := NewRpcServer(fmt.Sprintf("0.0.0.0:%d", *port)) wg := sync.WaitGroup{} wg.Add(1) go func() { server.Run() wg.Done() }() wg.Add(1) go func() { registry.Register() wg.Done() }() //stop the server after one minute //go func() { // time.Sleep(time.Minute) // server.Stop() // registry.Deregister() //}() wg.Wait() } //go run main.go -node node1 -port 28544 //go run main.go -node node2 -port 18562 //go run main.go -node node3 -port 27772 func main() { flag.Parse() StartService() }