kubernetes中基於etcd實現集中的數據存儲,今天來學習下基於etcd如何實現數據讀取一致性、更新一致性、事務的具體實現數組
在k8s中有部分數據的存儲是須要通過處理以後才能存儲的,好比secret這種加密的數據,既然要存儲就至少包含兩個操做,加密存儲,解密讀取,transformer就是爲了完成該操做而實現的,其在進行etcd數據存儲的時候回對數據進行加密,而在讀取的時候,則會進行解密微信
在etcd中進行修改(增刪改)操做的時候,都會遞增revision,而在k8s中也經過該值來做爲k8s資源的ResourceVersion,該機制也是實現watch的關鍵機制,在操做etcd解碼從etcd獲取的數據的時候,會經過versioner組件來爲資源動態的修改該值併發
將數據從etcd中讀取後,數據自己就是一個字節數組,如何將對應的數據轉換成咱們真正的運行時對象呢?還記得咱們以前的scheme與codec麼,在這裏咱們知道對應的數據編碼格式,也知道資源對象的類型,則經過codec、字節數組、目標類型,咱們就能夠完成對應數據的反射ide
etcd中的數據寫入是基於leader單點寫入和集羣quorum機制實現的,並非一個強一致性的數據寫入,則若是若是咱們訪問的節點不存在quorum的半數節點內,則可能形成短暫的數據不一致,針對一些強一致的場景,咱們能夠經過其revision機制來進行數據的讀取, 保證咱們讀取到更新以後的數據oop
// 省略非核心代碼
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
// 獲取key
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
// 檢測當前版本,是否達到最小版本的
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}
// 執行數據轉換
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
// 解碼數據
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}
複製代碼
建立一個接口數據則會首先進行資源對象的檢查,避免重複建立對象,此時會先經過資源對象的version字段來進行初步檢查,而後在利用etcd的事務機制來保證資源建立的原子性操做源碼分析
// 省略非核心代碼
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion should not be set on objects to be created")
}
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
}
// 將數據編碼
data, err := runtime.Encode(s.codec, obj)
if err != nil {
return err
}
// 轉換數據
newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
startTime := time.Now()
// 事務操做
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(key), // 若是以前不存在 這裏是利用的etcd的ModRevision即修改版本爲0, 寓意着對應的key不存在
).Then(
clientv3.OpPut(key, string(newData), opts...), // put修改數據
).Commit()
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
if err != nil {
return err
}
if !txnResp.Succeeded {
return storage.NewKeyExistsError(key, 0)
}
if out != nil {
// 獲取對應的Revision
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
return nil
}
func notFound(key string) clientv3.Cmp {
return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
}複製代碼
刪除接口主要是經過CAS和事務機制來共同實現,確保在etcd不發生異常的狀況,即便併發對同個資源來進行刪除操做也能保證至少有一個節點成功學習
// 省略非核心代碼
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc) error {
startTime := time.Now()
// 獲取當前的key的數據
getResp, err := s.client.KV.Get(ctx, key)
for {
// 獲取當前的狀態
origState, err := s.getState(getResp, key, v, false)
if err != nil {
return err
}
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), // 若是修改版本等於當前狀態,就嘗試刪除
).Then(
clientv3.OpDelete(key), // 刪除
).Else(
clientv3.OpGet(key), // 獲取
).Commit()
if !txnResp.Succeeded {
// 獲取最新的數據重試事務操做
getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
continue
}
// 將最後一個版本的數據解碼到out裏面,而後返回
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
}
}複製代碼
更新接口實現上與刪除接口並沒有本質上的差異,可是若是多個節點同時進行更新,CAS併發操做必然會有一個節點成功,當發現已經有節點操做成功,則當前節點其實並不須要再作過多的操做,直接返回便可fetch
// 省略非核心代碼
func (s *store) GuaranteedUpdate(
ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
// 獲取當前key的最新數據
getCurrentState := func() (*objState, error) {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
if err != nil {
return nil, err
}
return s.getState(getResp, key, v, ignoreNotFound)
}
// 獲取當前數據
var origState *objState
var mustCheckData bool
if len(suggestion) == 1 && suggestion[0] != nil {
// 若是提供了建議的數據,則會使用,
origState, err = s.getStateFromObject(suggestion[0])
if err != nil {
return err
}
//可是須要檢測數據
mustCheckData = true
} else {
// 嘗試從新獲取數據
origState, err = getCurrentState()
if err != nil {
return err
}
}
transformContext := authenticatedDataString(key)
for {
// 檢查對象是否已經更新, 主要是經過檢測uuid/revision來實現
if err := preconditions.Check(key, origState.obj); err != nil {
// If our data is already up to date, return the error
if !mustCheckData {
return err
}
// 若是檢查數據一致性錯誤,則須要從新獲取
origState, err = getCurrentState()
if err != nil {
return err
}
mustCheckData = false
// Retry
continue
}
// 刪除當前的版本數據revision
ret, ttl, err := s.updateState(origState, tryUpdate)
if err != nil {
// If our data is already up to date, return the error
if !mustCheckData {
return err
}
// It's possible we were working with stale data
// Actually fetch
origState, err = getCurrentState()
if err != nil {
return err
}
mustCheckData = false
// Retry
continue
}
// 編碼數據
data, err := runtime.Encode(s.codec, ret)
if err != nil {
return err
}
if !origState.stale && bytes.Equal(data, origState.data) {
// 若是咱們發現咱們當前的數據與獲取到的數據一致,則會直接跳過
if mustCheckData {
origState, err = getCurrentState()
if err != nil {
return err
}
mustCheckData = false
if !bytes.Equal(data, origState.data) {
// original data changed, restart loop
continue
}
}
if !origState.stale {
// 直接返回數據
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
}
}
// 磚漢數據
newData, err := s.transformer.TransformToStorage(data, transformContext)
if err != nil {
return storage.NewInternalError(err.Error())
}
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
trace.Step("Transaction prepared")
startTime := time.Now()
// 事務更新數據
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
).Then(
clientv3.OpPut(key, string(newData), opts...),
).Else(
clientv3.OpGet(key),
).Commit()
metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime)
if err != nil {
return err
}
trace.Step("Transaction committed")
if !txnResp.Succeeded {
// 從新獲取數據
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
origState, err = s.getState(getResp, key, v, ignoreNotFound)
if err != nil {
return err
}
trace.Step("Retry value restored")
mustCheckData = false
continue
}
// 獲取put響應
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
}複製代碼
transformer的實現和註冊地方我並無找到,只看到了幾個覆蓋資源類型的地方,還有list/watch接口,後續再繼續學習,今天就先到這裏,下次再見ui
微信號:baxiaoshi2020
編碼
關注公告號閱讀更多源碼分析文章
更多文章關注 www.sreguide.com
本文由博客一文多發平臺 OpenWrite 發佈