到目前爲止咱們已經支持了基本的RPC調用,也支持基於zk的服務註冊和發現,還支持鑑權和熔斷等等。雖然實現得都很是簡單,可是這些功能都是基於可替換的接口實現的,因此咱們後續能夠很方便的替換成更加完善成熟的實現。git
此次咱們繼續服務治理方面的功能,包括註冊中心優化、限流的支持、鏈路追蹤的支持,同時增長了一種路由策略。github
具體代碼參考:githubdocker
在上一篇文章裏咱們藉助libkv實現了基於zookeeper的服務註冊與發現,此次咱們更進一步,將咱們的ZookeeperRegistry改形成支持多種數據源的Registry。實際上的改造也比較簡單,最重要的註冊、發現以及通知等都已經完成了,咱們只須要將底層的數據源類型改造爲可配置的便可。代碼以下:api
//Registry的定義,就是從上一篇的ZookeeperRegistry改過來的
type KVRegistry struct {
AppKey string //KVRegistry
ServicePath string //數據存儲的基本路徑位置,好比/service/providers
UpdateInterval time.Duration //定時拉取數據的時間間隔
kv store.Store //store實例是一個封裝過的客戶端
providersMu sync.RWMutex
providers []registry.Provider //本地緩存的列表
watchersMu sync.Mutex
watchers []*Watcher //watcher列表
}
//初始化邏輯,根據backend參數的不一樣支持不一樣的底層數據源
func NewKVRegistry(backend store.Backend,addrs []string,AppKey string,cfg *store.Config,ServicePath string,updateInterval time.Duration) registry.Registry {
//libkv中須要顯式初始化數據源
switch backend {
case store.ZK:
zookeeper.Register()
case store.ETCD:
etcd.Register()
case store.CONSUL:
consul.Register()
case store.BOLTDB:
boltdb.Register()
}
r := new(KVRegistry)
r.AppKey = AppKey
r.ServicePath = ServicePath
r.UpdateInterval = updateInterval
//生成實際的數據源
kv, err := libkv.NewStore(backend, addrs, cfg)
if err != nil {
log.Fatalf("cannot create kv registry: %v", err)
}
r.kv = kv
//省略了後面的初始化邏輯,由於和以前沒有改動
return r
}
複製代碼
這裏其實是偷懶了,能夠看出來這裏徹底就是對libkv的包裝,因此可以支持的數據源也僅限libkv支持的幾種,包括:boltdb、etcd、consul、zookeeper。後續若是要支持其餘的註冊中西好比eureka或者narcos,就得本身寫接入代碼了。緩存
當前的限流是基於Ticker實現的,同時支持服務端和客戶端的限流,具體的邏輯參考了https://gobyexample.com/rate-limiting裏的實現。app
首先列舉限流器接口的定義:框架
type RateLimiter interface {
//獲取許可,會阻塞直到得到許可
Acquire()
//嘗試獲取許可,若是不成功會當即返回false,而不是一直阻塞
TryAcquire() bool
//獲取許可,會阻塞直到得到許可或者超時,超時時會返回一個超時異常,成功時返回nil
AcquireWithTimeout(duration time.Duration) error
}
複製代碼
客戶端的實現以下(基於Wrapper):分佈式
type RateLimitInterceptor struct {
//內嵌了defaultClientInterceptor,defaultClientInterceptor類實現了Wrapper的全部方法,咱們只須要覆蓋本身須要實現的方法便可
defaultClientInterceptor
Limit ratelimit.RateLimiter
}
var ErrRateLimited = errors.New("request limited")
func (r *RateLimitInterceptor) WrapCall(option *SGOption, callFunc CallFunc) CallFunc {
return func(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}) error {
if r.Limit != nil {
//進行嘗試獲取,獲取失敗時直接返回限流異常
if r.Limit.TryAcquire() {
return callFunc(ctx, ServiceMethod, arg, reply)
} else {
return ErrRateLimited
}
} else {//若限流器爲nil則不進行限流
return callFunc(ctx, ServiceMethod, arg, reply)
}
}
}
func (r *RateLimitInterceptor) WrapGo(option *SGOption, goFunc GoFunc) GoFunc {
return func(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}, done chan *Call) *Call {
if r.Limit != nil {
//進行嘗試獲取,獲取失敗時直接返回限流異常
if r.Limit.TryAcquire() {
return goFunc(ctx, ServiceMethod, arg, reply, done)
} else {
call := &Call{
ServiceMethod: ServiceMethod,
Args:arg,
Reply: nil,
Error: ErrRateLimited,
Done: done,
}
done <- call
return call
}
} else {//若限流器爲nil則不進行限流
return goFunc(ctx, ServiceMethod, arg, reply, done)
}
}
}
複製代碼
服務端的限流實現以下(基於Wrapper):ide
type RequestRateLimitInterceptor struct {
//這裏內嵌了defaultServerInterceptor,defaultServerInterceptor類實現了Wrapper的全部方法,咱們只須要覆蓋本身須要實現的方法便可
defaultServerInterceptor
Limiter ratelimit.RateLimiter
}
func (rl *RequestRateLimitInterceptor) WrapHandleRequest(s *SGServer, requestFunc HandleRequestFunc) HandleRequestFunc {
return func(ctx context.Context, request *protocol.Message, response *protocol.Message, tr transport.Transport) {
if rl.Limiter != nil {
//進行嘗試獲取,獲取失敗時直接返回限流異常
if rl.Limiter.TryAcquire() {
requestFunc(ctx, request, response, tr)
} else {
s.writeErrorResponse(response, tr, "request limited")
}
} else {//若是限流器爲nil則直接返回
requestFunc(ctx, request, response, tr)
}
}
}
複製代碼
能夠看到這裏的限流邏輯很是簡單,只支持全侷限流,沒有區分各個方法,但要支持也很簡單,在Wrapper裏維護一個方法到限流器的map,在限流時根據具體的方法名獲取不一樣的限流器進行限流判斷便可;同時這裏限流也是基於單機的,不支持集羣限流,要支持集羣級別的限流須要獨立的數據源進行次數統計等等,這裏暫時不涉及了。函數
鏈路追蹤在大型分佈式系統中能夠有效地幫助咱們進行故障排查、性能分析等等。鏈路追蹤一般包括三部分工做:數據埋點、數據收集和數據展現,而到RPC框架這裏實際上就只涉及數據埋點了。目前業界有許多鏈路追蹤的產品,而他們各自的api和實現都不同,要支持不一樣的產品須要作不少額外的兼容改造工做,因而就有了opentracing規範。opentracing旨在統一各個不一樣的追蹤產品的api,提供標準的接入層。而咱們這裏就直接集成opentracing,用戶能夠在使用時綁定到不一樣的opentracing實現,比較主流的opentracing實現有zipkin和jaeger。
客戶端鏈路追蹤的實現(一樣基於Wrapper):
//目前只作了同步調用支持
type OpenTracingInterceptor struct {
defaultClientInterceptor
}
func (*OpenTracingInterceptor) WrapCall(option *SGOption, callFunc CallFunc) CallFunc {
return func(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}) error {
var clientSpan opentracing.Span
if ServiceMethod != "" { //不是心跳的請求才進行追蹤
//先從當前context獲取已存在的追蹤信息
var parentCtx opentracing.SpanContext
if parent := opentracing.SpanFromContext(ctx); parent != nil {
parentCtx = parent.Context()
}
//開始埋點
clientSpan := opentracing.StartSpan(
ServiceMethod,
opentracing.ChildOf(parentCtx),
ext.SpanKindRPCClient)
defer clientSpan.Finish()
meta := metadata.FromContext(ctx)
writer := &trace.MetaDataCarrier{&meta}
//將追蹤信息注入到metadata中,經過rpc傳遞到下游
injectErr := opentracing.GlobalTracer().Inject(clientSpan.Context(), opentracing.TextMap, writer)
if injectErr != nil {
log.Printf("inject trace error: %v", injectErr)
}
ctx = metadata.WithMeta(ctx, meta)
}
err := callFunc(ctx, ServiceMethod, arg, reply)
if err != nil && clientSpan != nil {
clientSpan.LogFields(opentracingLog.String("error", err.Error()))
}
return err
}
}
複製代碼
服務端鏈路追蹤的實現(一樣基於Wrapper):
type OpenTracingInterceptor struct {
defaultServerInterceptor
}
func (*OpenTracingInterceptor) WrapHandleRequest(s *SGServer, requestFunc HandleRequestFunc) HandleRequestFunc {
return func(ctx context.Context, request *protocol.Message, response *protocol.Message, tr transport.Transport) {
if protocol.MessageTypeHeartbeat != request.MessageType {
meta := metadata.FromContext(ctx)
//從metadata中提取追蹤信息
spanContext, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, &trace.MetaDataCarrier{&meta})
if err != nil && err != opentracing.ErrSpanContextNotFound {
log.Printf("extract span from meta error: %v", err)
}
//開始服務端埋點
serverSpan := opentracing.StartSpan(
request.ServiceName + "." + request.MethodName,
ext.RPCServerOption(spanContext),
ext.SpanKindRPCServer)
defer serverSpan.Finish()
ctx = opentracing.ContextWithSpan(ctx, serverSpan)
}
requestFunc(ctx, request, response, tr)
}
}
複製代碼
能夠看到咱們實現鏈路追蹤的邏輯主要就是兩部分:
前面也提到了,RPC框架的工做也僅限於數據埋點而已,剩下的數據收集和數據展現部分須要依賴具體的產品。用戶須要在程序裏設置具體的實現,相似這樣:
//mocktracker是mock的追蹤,只限於測試目的使用
opentracing.SetGlobalTracer(mocktracer.New())
//或者使用jaeger
import (
"github.com/uber/jaeger-client-go/config"
"github.com/uber/jaeger-lib/metrics/prometheus"
)
metricsFactory := prometheus.New()
tracer, closer, err := config.Configuration{
ServiceName: "your-service-name",
}.NewTracer(
config.Metrics(metricsFactory),
)
//設置tracer
opentracing.SetGlobalTracer(tracer)
複製代碼
最後咱們來實現基於服務端元數據的規則路由,用戶在實際使用過程當中,確定有一些特殊的路由要求,好比「咱們的服務運行在不一樣的idc,我但願可以儘可能保證同idc相互調用」,或者「我但願可以在運行時切斷某個服務提供者的流量」,這些需求均可以抽象成基於標籤的路由。咱們給每一個服務提供者都打上不一樣的標籤,客戶端在調用時會根據本身的須要過濾出符合某些標籤的服務提供者。
而標籤的具體實現就是將標籤放到服務提供者的元數據裏,這些元數據會被註冊到註冊中心,也會被客戶端服務發現時獲取到,客戶端在調用前進行過濾便可。
代碼實現:
//服務端註冊時,將咱們設置的tags做爲元數據註冊到註冊中心
func (w *DefaultServerWrapper) WrapServe(s *SGServer, serveFunc ServeFunc) ServeFunc {
return func(network string, addr string, meta map[string]interface{}) error {
//省略註冊shutdownHook的邏輯
...
if meta == nil {
meta = make(map[string]interface{})
}
//注入tags
if len(s.Option.Tags) > 0 {
meta["tags"] = s.Option.Tags
}
meta["services"] = s.Services()
provider := registry.Provider{
ProviderKey: network + "@" + addr,
Network: network,
Addr: addr,
Meta: meta,
}
r := s.Option.Registry
rOpt := s.Option.RegisterOption
r.Register(rOpt, provider)
log.Printf("registered provider %v for app %s", provider, rOpt)
return serveFunc(network, addr, meta)
}
}
//客戶端實現,基於tags進行過濾
func TaggedProviderFilter(tags map[string]string) Filter {
return func(provider registry.Provider, ctx context.Context, ServiceMethod string, arg interface{}) bool {
if tags == nil {
return true
}
if provider.Meta == nil {
return false
}
providerTags, ok := provider.Meta["tags"].(map[string]string)
if !ok || len(providerTags) <= 0{
return false
}
for k, v := range tags {
if tag, ok := providerTags[k];ok {
if tag != v {
return false
}
} else {
return false
}
}
return true
}
}
複製代碼
這裏的實現當中,服務端和客戶端的標籤在註冊前就已經設置好了,只能知足比較簡單的策略,後續再考慮實現運行時修改標籤的支持了。
今天的內容就到此爲止了,實際上咱們的不少功能都是基於最開始定義的Wrapper實現的攔截器來完成的。這樣設計的好處就是能保證對擴展開放,對修改關閉,也就是開閉原則,咱們在擴充時能夠徹底不影響以前的邏輯。可是這種基於高階函數的實現有個不方便的地方就是debug時比較困難,不容易找到具體的實現邏輯,不知道有沒有更好的解決方式。