目錄html
本節看看kratos的學習負載均衡策略的使用。node
kratos 的負載均衡和服務發現同樣也是基於grpc官方api實現的。git
grpc官方的負載均衡自帶了一個round-robin
輪詢策略、即像一個for循環同樣挨個服的發請求、但這顯然不能知足咱們的需求、因而kratos自帶了兩種負載均衡策略:github
WRR (Weighted Round Robin)
該算法在加權輪詢法基礎上增長了動態調節權重值,用戶能夠在爲每個節點先配置一個初始的權重分,以後算法會根據節點cpu、延遲、服務端錯誤率、客戶端錯誤率動態打分,在將打分乘用戶自定義的初始權重分獲得最後的權重值。golang
P2C (Pick of two choices)
本算法經過隨機選擇兩個node選擇優勝者來避免羊羣效應,並經過ewma儘可能獲取服務端的實時狀態。
服務端: 服務端獲取最近500ms內的CPU使用率(須要將cgroup設置的限制考慮進去,併除於CPU核心數),並將CPU使用率乘與1000後塞入每次grpc請求中的的Trailer中夾帶返回: cpu_usage uint64 encoded with string cpu_usage : 1000
客戶端: 主要參數:
server_cpu:經過每次請求中服務端塞在trailer中的cpu_usage拿到服務端最近500ms內的cpu使用率
inflight:當前客戶端正在發送並等待response的請求數(pending request)
latency: 加權移動平均算法計算出的接口延遲
client_success:加權移動平均算法計算出的請求成功率(只記錄grpc內部錯誤,好比context deadline)
目前客戶端,已經默認使用p2c負載均衡算法redis
// NewClient returns a new blank Client instance with a default client interceptor. // opt can be used to add grpc dial options. func NewClient(conf *ClientConfig, opt ...grpc.DialOption) *Client { c := new(Client) if err := c.SetConfig(conf); err != nil { panic(err) } c.UseOpt(grpc.WithBalancerName(p2c.Name)) c.UseOpt(opt...) return c }
本節使用在筆記四kratos warden-direct方式client調用 使用的direct服務發現方式、和相關代碼。算法
demo操做
一、分別在兩個docker中啓動一個grpc demo服務。
二、啓動一個client demo服務採用默認p2c負載均衡方式調用grpc SayHello()方法sql
一、先啓動demo服務 (其實就是一個kratos工具new出來的demo服務、代碼可參考筆記4、或者在最後的github地址裏面獲取整個demo完整代碼):
docker
package dao import ( "context" "github.com/bilibili/kratos/pkg/net/rpc/warden" "google.golang.org/grpc" "fmt" demoapi "call-server/api" "google.golang.org/grpc/balancer/roundrobin" ) // target server addrs. const target = "direct://default/10.0.75.2:30001,10.0.75.2:30002" // NOTE: example // NewClient new member grpc client func NewClient(cfg *warden.ClientConfig, opts ...grpc.DialOption) (demoapi.DemoClient, error) { client := warden.NewClient(cfg, opts...) conn, err := client.Dial(context.Background(), target) if err != nil { return nil, err } // 注意替換這裏: // NewDemoClient方法是在"api"目錄下代碼生成的 // 對應proto文件內自定義的service名字,請使用正確方法名替換 return demoapi.NewDemoClient(conn), nil } // NewClient new member grpc client func NewGrpcConn(cfg *warden.ClientConfig, opts ...grpc.DialOption) (*grpc.ClientConn, error) { fmt.Println("-----tag: NewGrpcConn...") //opts = append(opts, grpc.WithBalancerName(roundrobin.Name)) client := warden.NewClient(cfg, opts...) conn, err := client.Dial(context.Background(), target) if err != nil { return nil, err } return conn, nil }
target 填上兩個服務ipapi
其中我多加了一個NewGrpcConn() 函數 、主要用來提取grpc鏈接。這裏我用了kratos自帶的pool類型來作鏈接池。
關於這個池、它在 kratos pkg/container/pool
有兩種實現方式 Slice
和List
方式。
package pool import ( "context" "errors" "io" "time" xtime "github.com/bilibili/kratos/pkg/time" ) var ( // ErrPoolExhausted connections are exhausted. ErrPoolExhausted = errors.New("container/pool exhausted") // ErrPoolClosed connection pool is closed. ErrPoolClosed = errors.New("container/pool closed") // nowFunc returns the current time; it's overridden in tests. nowFunc = time.Now ) // Config is the pool configuration struct. type Config struct { // Active number of items allocated by the pool at a given time. // When zero, there is no limit on the number of items in the pool. Active int // Idle number of idle items in the pool. Idle int // Close items after remaining item for this duration. If the value // is zero, then item items are not closed. Applications should set // the timeout to a value less than the server's timeout. IdleTimeout xtime.Duration // If WaitTimeout is set and the pool is at the Active limit, then Get() waits WatiTimeout // until a item to be returned to the pool before returning. WaitTimeout xtime.Duration // If WaitTimeout is not set, then Wait effects. // if Wait is set true, then wait until ctx timeout, or default flase and return directly. Wait bool } type item struct { createdAt time.Time c io.Closer } func (i *item) expired(timeout time.Duration) bool { if timeout <= 0 { return false } return i.createdAt.Add(timeout).Before(nowFunc()) } func (i *item) close() error { return i.c.Close() } // Pool interface. type Pool interface { Get(ctx context.Context) (io.Closer, error) Put(ctx context.Context, c io.Closer, forceClose bool) error Close() error }
dao中添加一個鏈接池。
package dao import ( "context" "time" demoapi "call-server/api" "call-server/internal/model" "github.com/bilibili/kratos/pkg/cache/memcache" "github.com/bilibili/kratos/pkg/cache/redis" "github.com/bilibili/kratos/pkg/conf/paladin" "github.com/bilibili/kratos/pkg/database/sql" "github.com/bilibili/kratos/pkg/net/rpc/warden" "github.com/bilibili/kratos/pkg/sync/pipeline/fanout" xtime "github.com/bilibili/kratos/pkg/time" //grpcempty "github.com/golang/protobuf/ptypes/empty" //"github.com/pkg/errors" "github.com/google/wire" "github.com/bilibili/kratos/pkg/container/pool" "io" "reflect" "google.golang.org/grpc" ) var Provider = wire.NewSet(New, NewDB, NewRedis, NewMC) //go:generate kratos tool genbts // Dao dao interface type Dao interface { Close() Ping(ctx context.Context) (err error) // bts: -nullcache=&model.Article{ID:-1} -check_null_code=$!=nil&&$.ID==-1 Article(c context.Context, id int64) (*model.Article, error) //SayHello(c context.Context, req *demoapi.HelloReq) (resp *grpcempty.Empty, err error) //get an demo grpcConn/grpcClient/ from rpc pool GrpcConnPut(ctx context.Context, cc *grpc.ClientConn) (err error) GrpcConn(ctx context.Context) (gcc *grpc.ClientConn, err error) GrpcClient(ctx context.Context) (cli demoapi.DemoClient, err error) } // dao dao. type dao struct { db *sql.DB redis *redis.Redis mc *memcache.Memcache cache *fanout.Fanout demoExpire int32 rpcPool pool.Pool } // New new a dao and return. func New(r *redis.Redis, mc *memcache.Memcache, db *sql.DB) (d Dao, cf func(), err error) { return newDao(r, mc, db) } func newDao(r *redis.Redis, mc *memcache.Memcache, db *sql.DB) (d *dao, cf func(), err error) { var cfg struct { DemoExpire xtime.Duration } if err = paladin.Get("application.toml").UnmarshalTOML(&cfg); err != nil { return } // new pool pool_config := &pool.Config{ Active: 0, Idle: 0, IdleTimeout: xtime.Duration(0 * time.Second), WaitTimeout: xtime.Duration(30 * time.Millisecond), } rpcPool := pool.NewSlice(pool_config) rpcPool.New = func(ctx context.Context) (cli io.Closer, err error) { wcfg := &warden.ClientConfig{} paladin.Get("grpc.toml").UnmarshalTOML(wcfg) if cli, err = NewGrpcConn(wcfg); err != nil { return } return } d = &dao{ db: db, redis: r, mc: mc, cache: fanout.New("cache"), demoExpire: int32(time.Duration(cfg.DemoExpire) / time.Second), rpcPool: rpcPool, } cf = d.Close return } // Close close the resource. func (d *dao) Close() { d.cache.Close() } // Ping ping the resource. func (d *dao) Ping(ctx context.Context) (err error) { return nil } func (d *dao) GrpcClient(ctx context.Context) (cli demoapi.DemoClient, err error) { var cc io.Closer if cc, err = d.rpcPool.Get(ctx); err != nil { return } cli = demoapi.NewDemoClient(reflect.ValueOf(cc).Interface().(*grpc.ClientConn)) return } func (d *dao) GrpcConnPut(ctx context.Context, cc *grpc.ClientConn) (err error) { err = d.rpcPool.Put(ctx, cc, false) return } func (d *dao) GrpcConn(ctx context.Context) (gcc *grpc.ClientConn, err error) { var cc io.Closer if cc, err = d.rpcPool.Get(ctx); err != nil { return } gcc = reflect.ValueOf(cc).Interface().(*grpc.ClientConn) return }
// SayHello grpc demo func. func (s *Service) SayHello(ctx context.Context, req *pb.HelloReq) (reply *empty.Empty, err error) { reply = new(empty.Empty) var cc demoapi.DemoClient var gcc *grpc.ClientConn if gcc, err = s.dao.GrpcConn(ctx); err != nil { return } defer s.dao.GrpcConnPut(ctx, gcc) cc = demoapi.NewDemoClient(gcc) //if cc, err = s.dao.GrpcClient(ctx); err != nil { // return //} cc.SayHello(ctx, req) fmt.Printf("hello %s", req.Name) return }
好了如今測試 、 佈局以下 :
輪詢方式只須要在NewGrpcConn()裏面加語一句配置項便可,它會覆蓋掉p2c的配置項。
opts = append(opts, grpc.WithBalancerName(roundrobin.Name))
咱們目前也只是使用了Api、最後來瞧瞧官方grpc的工做流程 :
gRPC開源組件官方並未直接提供服務註冊與發現的功能實現,但其設計文檔已提供實現的思路,並在不一樣語言的gRPC代碼API中已提供了命名解析和負載均衡接口供擴展。
服務啓動後,gPRC客戶端經過resolve發起一個名稱解析請求。名稱會被解析爲一個或更多的IP地址,每一個地址指明它是一個服務器地址仍是一個負載均衡器地址,而且包含一個Opt指明哪個客戶端的負載均衡策略應該被使用(例如: 輪詢調度或grpclb)。
客戶端實現一個負載均衡策略。
注意:若是任何一個被解析器返回的地址是均衡器地址,那麼這個客戶端會使用grpclb策略,而無論請求的Opt配置的是哪一種負載均衡策略。不然,客戶端會使用一個Opt項配置負載均衡策略。若是沒有負載均衡策略,那麼客戶端會使用默認的取第一個可用服務器地址的策略。
負載均衡策略對每個服務器地址建立一個子通道。
當調用rpc請求時,負載均衡策略會決定應該發送到哪一個子通道(例如: 哪一個服務器)。
grpclb策略下,客戶端按負載均衡器返回的順序發送請求到服務器。若是服務器列表爲空,調用將會阻塞直到收到一個非空的列表。
本節測試代碼 : https://github.com/ailumiyana/kratos-note/tree/master/warden/balancer