gRPC負載均衡-Golang

一. 負載均衡三種解決方案

構建高可用、高性能的通訊服務,一般採用服務註冊與發現、負載均衡和容錯處理等機制實現。根據負載均衡實現所在的位置不一樣,一般可分爲如下三種解決方案:html

  • 一、集中式LB(Proxy Model)
  • 二、進程內LB(Balancing-aware Client)
  • 三、獨立 LB 進程(External Load Balancing Service)

出處在這裏,寫的很詳細: 連接地址node

二. gRPC的準備

gRPC 默認使用 protocol buffers,這是 Google 開源的一套成熟的結構數據序列化機制(固然也可使用其餘數據格式如 JSON)。其客戶端提供Objective-C、Java接口,服務器側則有Java、Golang、C++等接口,從而爲移動端(iOS/Androi)到服務器端通信提供了一種解決方案。連接地址git

1.安裝brew,這個本身百度谷歌.github

2.終端 :golang

brew install autoconf automake libtool
複製代碼

3.安裝golang protobuf算法

go get -u github.com/golang/protobuf/proto // golang protobuf 庫
go get -u github.com/golang/protobuf/protoc-gen-go //protoc --go_out 工具
複製代碼

三. 簡單的protobuf

../proto/hello.protosegmentfault

syntax = "proto3";

package proto;

message SayReq {
    string content = 1;
}

message SayResp {
    string content = 1;
}

service Test{
    rpc Say(SayReq) returns (SayResp) {}
}
複製代碼

在proto下 , 輸入終端命令 protoc --go_out=plugins=grpc:. hello.proto服務器

生成 hello.pb.go 文件,經過protoc就能生成不一樣語言須要的.pb.go文件架構


這裏是介紹四種protoc,若是不用到流,能夠不看:

1:簡單 RPC負載均衡

2:服務器端流式 RPC

3:客戶端流式 RPC

4:雙向流式 RPC

4種方式與如下定義四種服務對應: test.proto文件: service Test{

rpc LZX1(SayReq) returns (SayResp) {}
rpc LZX2(SayReq) returns (stream SayResp) {}
rpc LZX3(stream SayReq) returns (SayResp) {}
rpc LZX4(stream SayReq) returns (stream SayResp) {}
複製代碼

}

生成的test.pb.go文件:

type TestClient interface {

LZX1(ctx context.Context, in *SayReq, opts ...grpc.CallOption) (*SayResp, error)
LZX2(ctx context.Context, in *SayReq, opts ...grpc.CallOption) (Test_LZX2Client, error)
LZX3(ctx context.Context, opts ...grpc.CallOption) (Test_LZX3Client, error)
LZX4(ctx context.Context, opts ...grpc.CallOption) (Test_LZX4Client, error)
複製代碼

}

因此無流的函數,也就是第一種簡單的RPC,都只是一一對應.只要有流,返回的類型都是 服務結構名_函數名Client;只要客戶端是流式,傳參將不包含其餘參數.


四.6種負載均衡算法

一、輪詢法

二、隨機法

三、源地址哈希法

四、加權輪詢法

五、加權隨機法

六、最小鏈接數法

算法的描述看這裏:連接地址

五. gRPC的例子

例子出處連接:連接地址

架構:

如下用的是隨機負載均衡:

客戶端 etcd/client/random/main.go

package main

import (
	etcd "github.com/coreos/etcd/client"
	grpclb "github.com/liyue201/grpc-lb"
	"github.com/liyue201/grpc-lb/examples/proto"
	registry "github.com/liyue201/grpc-lb/registry/etcd"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"log"
	"strconv"
	"time"
)

func main() {
	etcdConfg := etcd.Config{
		Endpoints: []string{"http://120.24.44.201:2379"},
	}
	r := registry.NewResolver("/grpc-lb", "test", etcdConfg) // 加載registry
	b := grpclb.NewBalancer(r, grpclb.NewRandomSelector())   //加載grpclbs
	c, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(b))
	if err != nil {
		log.Printf("grpc dial: %s", err)
		return
	}
	defer c.Close()

	client := proto.NewTestClient(c)
	var num int
	for i := 0; i < 1000; i++ {
		resp, err := client.Say(context.Background(), &proto.SayReq{Content: "random"})
		if err != nil {
			log.Println(err)
			time.Sleep(time.Second)
			continue
		}
		time.Sleep(time.Second)
		num++
		log.Printf(resp.Content + ",  clientOfnum: " + strconv.Itoa(num))
	}
}
複製代碼

服務端 etcd/server/main.go

package main

import (
	"flag"
	"fmt"
	etcd "github.com/coreos/etcd/client"
	"github.com/liyue201/grpc-lb/examples/proto"
	registry "github.com/liyue201/grpc-lb/registry/etcd"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"log"
	"net"
	"sync"
	"time"
)

var nodeID = flag.String("node", "node1", "node ID")
var port = flag.Int("port", 8080, "listening port")

type RpcServer struct {
	addr string
	s    *grpc.Server
}

func NewRpcServer(addr string) *RpcServer {
	s := grpc.NewServer()
	rs := &RpcServer{
		addr: addr,
		s:    s,
	}
	return rs
}

func (s *RpcServer) Run() {
	listener, err := net.Listen("tcp", s.addr)
	if err != nil {
		log.Printf("failed to listen: %v", err)
		return
	}
	log.Printf("rpc listening on:%s", s.addr)

	proto.RegisterTestServer(s.s, s)
	s.s.Serve(listener)
}

func (s *RpcServer) Stop() {
	s.s.GracefulStop()
}

var num int

func (s *RpcServer) Say(ctx context.Context, req *proto.SayReq) (*proto.SayResp, error) {
	num++
	text := "Hello " + req.Content + ", I am " + *nodeID + ", serverOfnum: " + strconv.Itoa(num)
	log.Println(text)

	return &proto.SayResp{Content: text}, nil
}

func StartService() {
	etcdConfg := etcd.Config{
		Endpoints: []string{"http://120.24.44.201:2379"},
	}

	registry, err := registry.NewRegistry(
		registry.Option{
			EtcdConfig:  etcdConfg,
			RegistryDir: "/grpc-lb",
			ServiceName: "test",
			NodeID:      *nodeID,
			NData: registry.NodeData{
				Addr: fmt.Sprintf("127.0.0.1:%d", *port),
				//Metadata: map[string]string{"weight": "1"},
			},
			Ttl: 10 * time.Second,
		})
	if err != nil {
		log.Panic(err)
		return
	}
	server := NewRpcServer(fmt.Sprintf("0.0.0.0:%d", *port))
	wg := sync.WaitGroup{}

	wg.Add(1)
	go func() {
		server.Run()
		wg.Done()
	}()

	wg.Add(1)
	go func() {
		registry.Register()
		wg.Done()
	}()

	//stop the server after one minute
	//go func() {
	//	time.Sleep(time.Minute)
	//	server.Stop()
	//	registry.Deregister()
	//}()

	wg.Wait()
}

func main() {
	flag.Parse()
	StartService()
}
複製代碼

服務端的代碼以下, 使用如下命令運行3個服務進程,再啓動客戶端。

go run main.go -node node1 -port 28544

go run main.go -node node2 -port 18562

go run main.go -node node3 -port 27772

六.最終效果

(客戶端不停的隨機訪問三個服務端)

客戶端:


服務端node1:


服務端node2:


服務端node3:


斷開服務端node1,node3,client照樣能跑,並只鏈接到node2服務端:

客戶端:

服務器node2:


再次啓動node1,node3服務端,client自動鏈接上,並繼續隨機訪問3臺服務端:

客戶端:

服務端node1:

服務端node3:


最後,關閉全部服務端,客戶端處於阻塞狀態.

相關文章
相關標籤/搜索