到目前爲止咱們的框架已經有了一部分服務治理的功能,此次咱們在以前的基礎上實現一些其餘功能。篇幅所限這裏只列舉部分實現,完整代碼參考:githubgit
實現咱們以前的註冊中心的接口便可,這裏使用了docker的libkv而不是直接用zk客戶端(從rpcx那學的),libkv封裝了對於幾種存儲服務的操做,包括Consul、Etcd、Zookeeper和BoltDB,後續若是要支持其餘類型的存儲就得本身寫客戶端了。基於zk的註冊中心的定義以下:github
type ZookeeperRegistry struct {
AppKey string //一個ZookeeperRegistry實例和一個appkey關聯
ServicePath string //數據存儲的基本路徑位置,好比/service/providers
UpdateInterval time.Duration //定時拉取數據的時間間隔
kv store.Store //封裝過的zk客戶端
providersMu sync.RWMutex
providers []registry.Provider //本地緩存的列表
watchersMu sync.Mutex
watchers []*Watcher //watcher列表
}
複製代碼
初始化部分邏輯以下:docker
func NewZookeeperRegistry(AppKey string, ServicePath string, zkAddrs []string, updateInterval time.Duration, cfg *store.Config) registry.Registry {
zk := new(ZookeeperRegistry)
zk.AppKey = AppKey
zk.ServicePath = ServicePath
zk.UpdateInterval = updateInterval
kv, err := libkv.NewStore(store.ZK, zkAddrs, cfg)
if err != nil {
log.Fatalf("cannot create zk registry: %v", err)
}
zk.kv = kv
basePath := zk.ServicePath
if basePath[0] == '/' { //路徑不能以"/"開頭
basePath = basePath[1:]
zk.ServicePath = basePath
}
//先建立基本路徑
err = zk.kv.Put(basePath, []byte("base path"), &store.WriteOptions{IsDir: true})
if err != nil {
log.Fatalf("cannot create zk path %s: %v", zk.ServicePath, err)
}
//顯式拉取第一次數據
zk.doGetServiceList()
go func() {
t := time.NewTicker(updateInterval)
for range t.C {
//定時拉取數據
zk.doGetServiceList()
}
}()
go func() {
//後臺watch數據
zk.watch()
}()
return zk
}
複製代碼
咱們在初始化註冊中心時執行兩個後臺任務:定時拉取和監聽數據,至關於推拉結合的方式。同時監聽得到的數據是全量數據,由於實現起來簡單一些,後續若是服務列表愈來愈大時,可能須要加上基於版本號的機制或者只傳輸增量數據。這裏額外指出幾個要點:緩存
具體的註冊註銷邏輯這裏再也不列舉,參考:githubapp
若是咱們使用zk做爲註冊中心,更簡單的作法多是直接將服務提供者做爲臨時節點添加到zk上,這樣就能夠利用臨時節點的特性實現動態的服務發現。可是咱們使用的libkv庫並不支持臨時節點的功能,並且除了zk其餘存儲服務好比etcd等可能也不支持臨時節點的特性,因此咱們註冊到註冊中心的都是持久節點。在這種狀況下,可能某些因爲特殊狀況沒法訪問的服務提供者並無及時地將自身從註冊中心註銷掉,因此客戶端須要額外的能力來判斷一個服務提供者是否可用,而不是徹底依賴註冊中心。框架
因此咱們須要增長客戶端心跳的支持,客戶端能夠定時向服務端發送心跳請求,服務端收到心跳請求時能夠直接返回,只要通知客戶端自身仍然可用就行。客戶端能夠根據設置的閾值,對心跳失敗的服務提供者進行降級處理,直到心跳恢復或者服務提供者被註銷掉。客戶端發送心跳邏輯以下:ide
func (c *sgClient) heartbeat() {
if c.option.HeartbeatInterval <= 0 {
return
}
//根據指定的時間間隔發送心跳
t := time.NewTicker(c.option.HeartbeatInterval)
for range t.C {
if c.shutdown {
t.Stop()
return
}
//遍歷每一個RPCClient進行心跳檢查
c.clients.Range(func(k, v interface{}) bool {
err := v.(RPCClient).Call(context.Background(), "", "", nil)
c.mu.Lock()
if err != nil {
//心跳失敗進行計數
if fail, ok := c.clientsHeartbeatFail[k.(string)]; ok {
fail++
c.clientsHeartbeatFail[k.(string)] = fail
} else {
c.clientsHeartbeatFail[k.(string)] = 1
}
} else {
//心跳成功則進行恢復
c.clientsHeartbeatFail[k.(string)] = 0
c.serversMu.Lock()
for i, p := range c.servers {
if p.ProviderKey == k {
delete(c.servers[i].Meta, protocol.ProviderDegradeKey)
}
}
c.serversMu.Unlock()
}
c.mu.Unlock()
//心跳失敗次數超過閾值則進行降級
if c.clientsHeartbeatFail[k.(string)] > c.option.HeartbeatDegradeThreshold {
c.serversMu.Lock()
for i, p := range c.servers {
if p.ProviderKey == k {
c.servers[i].Meta[protocol.ProviderDegradeKey] = true
}
}
c.serversMu.Unlock()
}
return true
})
}
}
複製代碼
鑑權的實現比較簡單,客戶端能夠在元數據中攜帶鑑權相關的信息,而服務端能夠經過指定的Wrapper進行鑑權。服務端Wrapper的代碼以下:post
type AuthFunc func(key string) bool type ServerAuthInterceptor struct {
authFunc AuthFunc
}
func NewAuthInterceptor(authFunc AuthFunc) Wrapper {
return &ServerAuthInterceptor{authFunc}
}
func (sai *ServerAuthInterceptor) WrapHandleRequest(s *SGServer, requestFunc HandleRequestFunc) HandleRequestFunc {
return func(ctx context.Context, request *protocol.Message, response *protocol.Message, tr transport.Transport) {
if auth, ok := ctx.Value(protocol.AuthKey).(string); ok {
//鑑權經過則執行業務邏輯
if sai.authFunc(auth) {
requestFunc(ctx, response, response, tr)
return
}
}
//鑑權失敗則返回異常
s.writeErrorResponse(response, tr, "auth failed")
}
}
複製代碼
暫時實現了簡單的基於時間窗口的熔斷器,實現以下:ui
type CircuitBreaker interface {
AllowRequest() bool
Success()
Fail(err error)
}
type DefaultCircuitBreaker struct {
lastFail time.Time
fails uint64
threshold uint64
window time.Duration
}
func NewDefaultCircuitBreaker(threshold uint64, window time.Duration) *DefaultCircuitBreaker {
return &DefaultCircuitBreaker{
threshold: threshold,
window: window,
}
}
func (cb *DefaultCircuitBreaker) AllowRequest() bool {
if time.Since(cb.lastFail) > cb.window {
cb.reset()
return true
}
failures := atomic.LoadUint64(&cb.fails)
return failures < cb.threshold
}
func (cb *DefaultCircuitBreaker) Success() {
cb.reset()
}
func (cb *DefaultCircuitBreaker) Fail() {
atomic.AddUint64(&cb.fails, 1)
cb.lastFail = time.Now()
}
func (cb *DefaultCircuitBreaker) reset() {
atomic.StoreUint64(&cb.fails, 0)
cb.lastFail = time.Now()
}
複製代碼
此次的內容就到此爲止,有任何意見或者建議歡迎指正。atom