從零開始實現一個RPC框架(三)

前言

到目前爲止咱們的框架已經有了一部分服務治理的功能,此次咱們在以前的基礎上實現一些其餘功能。篇幅所限這裏只列舉部分實現,完整代碼參考:githubgit

zookeeper註冊中心

實現咱們以前的註冊中心的接口便可,這裏使用了docker的libkv而不是直接用zk客戶端(從rpcx那學的),libkv封裝了對於幾種存儲服務的操做,包括Consul、Etcd、Zookeeper和BoltDB,後續若是要支持其餘類型的存儲就得本身寫客戶端了。基於zk的註冊中心的定義以下:github

type ZookeeperRegistry struct {
	AppKey         string //一個ZookeeperRegistry實例和一個appkey關聯
	ServicePath    string //數據存儲的基本路徑位置,好比/service/providers
	UpdateInterval time.Duration //定時拉取數據的時間間隔
	kv store.Store //封裝過的zk客戶端
	providersMu sync.RWMutex
	providers   []registry.Provider //本地緩存的列表
	watchersMu sync.Mutex
	watchers   []*Watcher //watcher列表
}
複製代碼

初始化部分邏輯以下:docker

func NewZookeeperRegistry(AppKey string, ServicePath string, zkAddrs []string, updateInterval time.Duration, cfg *store.Config) registry.Registry {
	zk := new(ZookeeperRegistry)
	zk.AppKey = AppKey
	zk.ServicePath = ServicePath
	zk.UpdateInterval = updateInterval
	kv, err := libkv.NewStore(store.ZK, zkAddrs, cfg)
	if err != nil {
		log.Fatalf("cannot create zk registry: %v", err)
	}
	zk.kv = kv
	basePath := zk.ServicePath
	if basePath[0] == '/' { //路徑不能以"/"開頭
		basePath = basePath[1:]
		zk.ServicePath = basePath
	}
	//先建立基本路徑
	err = zk.kv.Put(basePath, []byte("base path"), &store.WriteOptions{IsDir: true})
	if err != nil {
		log.Fatalf("cannot create zk path %s: %v", zk.ServicePath, err)
	}
	//顯式拉取第一次數據
	zk.doGetServiceList()
	go func() {
		t := time.NewTicker(updateInterval)
		for range t.C {
			//定時拉取數據
			zk.doGetServiceList()
		}
	}()
	go func() {
		//後臺watch數據
		zk.watch()
	}()
	return zk
}
複製代碼

咱們在初始化註冊中心時執行兩個後臺任務:定時拉取和監聽數據,至關於推拉結合的方式。同時監聽得到的數據是全量數據,由於實現起來簡單一些,後續若是服務列表愈來愈大時,可能須要加上基於版本號的機制或者只傳輸增量數據。這裏額外指出幾個要點:緩存

  1. 後臺定時拉取數據並緩存起來
  2. 查詢時直接返回緩存
  3. 註冊時在zk添加節點,註銷時在zk刪除節點
  4. 監聽時並不監聽每一個服務提供者,而是監聽其父級目錄,有變動時再統一拉取服務提供者列表,這樣能夠減小watcher的數目,邏輯也更簡單一些
  5. 由於第4點,因此註冊和註銷時須要更改父級目錄的內容(lastUpdate)來觸發監聽

具體的註冊註銷邏輯這裏再也不列舉,參考:githubapp

客戶端心跳

若是咱們使用zk做爲註冊中心,更簡單的作法多是直接將服務提供者做爲臨時節點添加到zk上,這樣就能夠利用臨時節點的特性實現動態的服務發現。可是咱們使用的libkv庫並不支持臨時節點的功能,並且除了zk其餘存儲服務好比etcd等可能也不支持臨時節點的特性,因此咱們註冊到註冊中心的都是持久節點。在這種狀況下,可能某些因爲特殊狀況沒法訪問的服務提供者並無及時地將自身從註冊中心註銷掉,因此客戶端須要額外的能力來判斷一個服務提供者是否可用,而不是徹底依賴註冊中心。框架

因此咱們須要增長客戶端心跳的支持,客戶端能夠定時向服務端發送心跳請求,服務端收到心跳請求時能夠直接返回,只要通知客戶端自身仍然可用就行。客戶端能夠根據設置的閾值,對心跳失敗的服務提供者進行降級處理,直到心跳恢復或者服務提供者被註銷掉。客戶端發送心跳邏輯以下:ide

func (c *sgClient) heartbeat() {
	if c.option.HeartbeatInterval <= 0 {
		return
	}
	//根據指定的時間間隔發送心跳
	t := time.NewTicker(c.option.HeartbeatInterval)
	for range t.C {
		if c.shutdown {
			t.Stop()
			return
		}
		//遍歷每一個RPCClient進行心跳檢查
		c.clients.Range(func(k, v interface{}) bool {
			err := v.(RPCClient).Call(context.Background(), "", "", nil)
			c.mu.Lock()
			if err != nil {
				//心跳失敗進行計數
				if fail, ok := c.clientsHeartbeatFail[k.(string)]; ok {
					fail++
					c.clientsHeartbeatFail[k.(string)] = fail
				} else {
					c.clientsHeartbeatFail[k.(string)] = 1
				}
			} else {
				//心跳成功則進行恢復
				c.clientsHeartbeatFail[k.(string)] = 0
				c.serversMu.Lock()
				for i, p := range c.servers {
					if p.ProviderKey == k {
						delete(c.servers[i].Meta, protocol.ProviderDegradeKey)
					}
				}
				c.serversMu.Unlock()
			}
			c.mu.Unlock()
			//心跳失敗次數超過閾值則進行降級
			if c.clientsHeartbeatFail[k.(string)] > c.option.HeartbeatDegradeThreshold {
				c.serversMu.Lock()
				for i, p := range c.servers {
					if p.ProviderKey == k {
						c.servers[i].Meta[protocol.ProviderDegradeKey] = true
					}
				}
				c.serversMu.Unlock()
			}
			return true
		})
	}
}
複製代碼

鑑權

鑑權的實現比較簡單,客戶端能夠在元數據中攜帶鑑權相關的信息,而服務端能夠經過指定的Wrapper進行鑑權。服務端Wrapper的代碼以下:post

type AuthFunc func(key string) bool type ServerAuthInterceptor struct {
	authFunc AuthFunc
}
func NewAuthInterceptor(authFunc AuthFunc) Wrapper {
	return &ServerAuthInterceptor{authFunc}
}
func (sai *ServerAuthInterceptor) WrapHandleRequest(s *SGServer, requestFunc HandleRequestFunc) HandleRequestFunc {
	return func(ctx context.Context, request *protocol.Message, response *protocol.Message, tr transport.Transport) {
		if auth, ok := ctx.Value(protocol.AuthKey).(string); ok {
			//鑑權經過則執行業務邏輯
			if sai.authFunc(auth) {
				requestFunc(ctx, response, response, tr)
				return
			}
		}
		//鑑權失敗則返回異常
		s.writeErrorResponse(response, tr, "auth failed")
	}
}
複製代碼

熔斷降級

暫時實現了簡單的基於時間窗口的熔斷器,實現以下:ui

type CircuitBreaker interface {
	AllowRequest() bool
	Success()
	Fail(err error)
}
type DefaultCircuitBreaker struct {
	lastFail  time.Time
	fails     uint64
	threshold uint64
	window    time.Duration
}
func NewDefaultCircuitBreaker(threshold uint64, window time.Duration) *DefaultCircuitBreaker {
	return &DefaultCircuitBreaker{
		threshold: threshold,
		window:    window,
	}
}
func (cb *DefaultCircuitBreaker) AllowRequest() bool {
	if time.Since(cb.lastFail) > cb.window {
		cb.reset()
		return true
	}

	failures := atomic.LoadUint64(&cb.fails)
	return failures < cb.threshold
}
func (cb *DefaultCircuitBreaker) Success() {
	cb.reset()
}
func (cb *DefaultCircuitBreaker) Fail() {
	atomic.AddUint64(&cb.fails, 1)
	cb.lastFail = time.Now()
}
func (cb *DefaultCircuitBreaker) reset() {
	atomic.StoreUint64(&cb.fails, 0)
	cb.lastFail = time.Now()
}
複製代碼

結語

此次的內容就到此爲止,有任何意見或者建議歡迎指正。atom

歷史連接

從零開始實現一個RPC框架(零)

從零開始實現一個RPC框架(一)

從零開始實現一個RPC框架(二)

相關文章
相關標籤/搜索