etcd 是一個高可用強一致性的鍵值倉庫在不少分佈式系統架構中獲得了普遍的應用,本教程結合一些簡單的例子介紹golang版本的etcd/clientv3
中提供的主要功能及其使用方法。git
若是還不熟悉etcd推薦先閱讀:github
看圖輕鬆瞭解etcdgolang
etcd經常使用操做介紹json
Let's get started now!bash
咱們使用v3版本的etcd client, 首先經過go get
下載並編譯安裝etcd clinet v3
。架構
go get github.com/coreos/etcd/clientv3
複製代碼
該命令會將包下載到$GOPATH/src/github.com/coreos/etcd/clientv3
中,全部相關依賴包會自動下載編譯,包括protobuf
、grpc
等。mvc
官方文檔地址:godoc.org/github.com/…app
文檔中列出了Go官方實現的etcd client中支持的全部方法,方法仍是不少的,咱們主要梳理一下使用etcd時常常用到的主要API並進行演示。分佈式
用程序訪問etcd首先要建立client,它須要傳入一個Config配置,這裏傳了2個選項:函數
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
// Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"}
DialTimeout: 5 * time.Second,
})
複製代碼
返回的client
,它的類型具體以下:
type Client struct {
Cluster
KV
Lease
Watcher
Auth
Maintenance
// Username is a user name for authentication.
Username string
// Password is a password for authentication.
Password string
// contains filtered or unexported fields
}
複製代碼
類型中的成員是etcd客戶端幾何核心功能模塊的具體實現,它們分別用於:
咱們須要使用什麼功能,就去client裏獲取對應的成員便可。
Client.KV是一個
interface`,提供了關於K-V操做的全部方法:
type KV interface {
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
// Delete deletes a key, or optionally using WithRange(end), [key, end).
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
// Compact compacts etcd KV history before the given rev.
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
Do(ctx context.Context, op Op) (OpResponse, error)
// Txn creates a transaction.
Txn(ctx context.Context) Txn
}
複製代碼
咱們經過方法clientv3.NewKV()
來得到KV接口的實現(實現中內置了錯誤重試機制):
kv := clientv3.NewKV(cli)
複製代碼
接下來,咱們將經過kv
操做etcd中的數據。
putResp, err := kv.Put(context.TODO(),"/test/key1", "Hello etcd!")
複製代碼
第一個參數是goroutine
的上下文Context
。後面兩個參數分別是key和value,對於etcd來講,key=/test/key1只是一個字符串而已,可是對咱們而言卻能夠模擬出目錄層級關係。
Put函數的聲明以下:
// Put puts a key-value pair into etcd.
// Note that key,value can be plain bytes array and string is
// an immutable representation of that bytes array.
// To get a string of bytes, do string([]byte{0x10, 0x20}).
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
複製代碼
除了上面例子中的三個的參數,還支持一個變長參數,能夠傳遞一些控制項來影響Put的行爲,例如能夠攜帶一個lease ID來支持key過時。
Put操做返回的是PutResponse,不一樣的KV操做對應不一樣的response結構,全部KV操做返回的response結構以下:
type (
CompactResponse pb.CompactionResponse
PutResponse pb.PutResponse
GetResponse pb.RangeResponse
DeleteResponse pb.DeleteRangeResponse
TxnResponse pb.TxnResponse
)
複製代碼
程序代碼裏導入clientv3
後在GoLand中能夠很快定位到PutResponse
的定義文件中,PutResponse只是pb.PutResponse的類型別名,經過Goland跳轉過去後能夠看到PutResponse的詳細定義。
type PutResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// if prev_kv is set in the request, the previous key-value pair will be returned.
PrevKv *mvccpb.KeyValue `protobuf:"bytes,2,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}
複製代碼
Header裏保存的主要是本次更新的revision信息,而PrevKv能夠返回Put覆蓋以前的value是什麼(目前是nil,後面會說緣由),把返回的PutResponse
打印出來看一下:
fmt.Printf("PutResponse: %v, err: %v", putResp, err)
// output
// PutResponse: &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:3 raft_term:7 <nil>}, err: <nil>%
複製代碼
咱們須要判斷err來肯定操做是否成功。
咱們再Put其餘2個key,用於後續演示:
kv.Put(context.TODO(),"/test/key2", "Hello World!")
// 再寫一個同前綴的干擾項
kv.Put(context.TODO(), "/testspam", "spam")
複製代碼
如今/test目錄下有兩個鍵: key1和key2, 而/testspam並不歸屬於/test目錄
使用KV的Get
方法來讀取給定鍵的值:
getResp, err := kv.Get(context.TODO(), "/test/key1")
複製代碼
其函數聲明以下:
// Get retrieves keys.
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
複製代碼
和Put相似,函數註釋裏提示咱們能夠傳遞一些控制參數來影響Get的行爲,好比:WithFromKey表示讀取從參數key開始遞增的全部key,而不是讀取單個key。
在上面的例子中,我沒有傳遞opOption,因此就是獲取key=/test/key1的最新版本數據。
這裏err並不能反饋出key是否存在(只能反饋出本次操做由於各類緣由異常了),咱們須要經過GetResponse(其實是pb.RangeResponse)判斷key是否存在:
type RangeResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// kvs is the list of key-value pairs matched by the range request.
// kvs is empty when count is requested.
Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"`
// more indicates if there are more keys to return in the requested range.
More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"`
// count is set to the number of keys within the range when requested.
Count int64 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"`
}
複製代碼
Kvs字段,保存了本次Get查詢到的全部k-v對,由於上述例子只Get了一個單key,因此只須要判斷一下len(Kvs)是否等於1便可知道key是否存在。
RangeResponse.More
和Count
,當咱們使用withLimit()
等選項進行Get
時會發揮做用,至關於翻頁查詢。
接下來,咱們經過給Get查詢增長WithPrefix選項,獲取/test目錄下的全部子元素:
rangeResp, err := kv.Get(context.TODO(), "/test/", clientv3.WithPrefix())
複製代碼
WithPrefix()
是指查找以/test/
爲前綴的全部key,所以能夠模擬出查找子目錄的效果。
etcd
是一個有序的k-v存儲,所以/test/爲前綴的key老是順序排列在一塊兒。
withPrefix()
實際上會轉化爲範圍查詢,它根據前綴/test/
生成了一個前閉後開的key range:[「/test/」, 「/test0」)
,爲何呢?由於比/
大的字符是0
,因此以/test0
做爲範圍的末尾,就能夠掃描到全部以/test/
爲前綴的key了。
在以前,咱們Put了一個/testspam
鍵值,由於不符合/test/
前綴(注意末尾的/),因此就不會被此次Get
獲取到。可是,若是查詢的前綴是/test
,那麼/testspam
就會被返回,使用時必定要特別注意。
打印rangeResp.Kvs能夠看到得到了兩個鍵值:
[key:"/test/key1" create_revision:2 mod_revision:13 version:6 value:"Hello etcd!" key:"/test/key2" create_revision:5 mod_revision:14 version:4 value:"Hello World!" ]
複製代碼
etcd客戶端的Lease對象能夠經過如下的代碼獲取到
lease := clientv3.NewLease(cli)
複製代碼
lease對象是Lease接口的實現,Lease接口的聲明以下:
type Lease interface {
// Grant 建立一個新租約
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
// Revoke 銷燬給定租約ID的租約
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
// TimeToLive retrieves the lease information of the given lease ID.
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
// Leases retrieves all leases.
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
// KeepAlive keeps the given lease alive forever.
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
// KeepAliveOnce renews the lease once. In most of the cases, KeepAlive
// should be used instead of KeepAliveOnce.
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
// Close releases all resources Lease keeps for efficient communication
// with the etcd server.
Close() error
}
複製代碼
Lease提供瞭如下功能:
要想實現key自動過時,首先得建立一個租約,下面的代碼建立一個TTL爲10秒的租約:
grantResp, err := lease.Grant(context.TODO(), 10)
複製代碼
返回的grantResponse的結構體聲明以下:
// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {
*pb.ResponseHeader
ID LeaseID
TTL int64
Error string
}
複製代碼
在應用程序代碼中主要使用到的是租約ID。
接下來咱們用這個Lease往etcd中存儲一個10秒過時的key:
kv.Put(context.TODO(), "/test/vanish", "vanish in 10s", clientv3.WithLease(grantResp.ID))
複製代碼
這裏特別須要注意,有一種狀況是在Put以前Lease已通過期了,那麼這個Put操做會返回error,此時你須要從新分配Lease。
當咱們實現服務註冊時,須要主動給Lease進行續約,一般是以小於TTL的間隔循環調用Lease的KeepAliveOnce()方法對租約進行續期,一旦某個服務節點出錯沒法完成租約的續期,等key過時後客戶端即沒法在查詢服務時得到對應節點的服務,這樣就經過租約到期實現了服務的錯誤隔離。
keepResp, err := lease.KeepAliveOnce(context.TODO(), grantResp.ID)
複製代碼
或者使用KeepAlive()
方法,其會返回<-chan *LeaseKeepAliveResponse
只讀通道,每次自動續租成功後會向通道中發送信號。通常都用KeepAlive()
方法
KeepAlive和Put同樣,若是在執行以前Lease就已通過期了,那麼須要從新分配Lease。etcd並無提供API來實現原子的Put with Lease,須要咱們本身判斷err從新分配Lease。
Op字面意思就是」操做」,Get和Put都屬於Op,只是爲了簡化用戶開發而開放的特殊API。
KV對象有一個Do方法接受一個Op:
// Do applies a single Op on KV without a transaction.
// Do is useful when creating arbitrary operations to be issued at a
// later time; the user can range over the operations, calling Do to
// execute them. Get/Put/Delete, on the other hand, are best suited
// for when the operation should be issued at the time of declaration.
Do(ctx context.Context, op Op) (OpResponse, error)
複製代碼
其參數Op是一個抽象的操做,能夠是Put/Get/Delete…;而OpResponse是一個抽象的結果,能夠是PutResponse/GetResponse…
能夠經過Client中定義的一些方法來建立Op:
其實和直接調用KV.Put,KV.GET沒什麼區別。
下面是一個例子:
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
ops := []clientv3.Op{
clientv3.OpPut("put-key", "123"),
clientv3.OpGet("put-key"),
clientv3.OpPut("put-key", "456")}
for _, op := range ops {
if _, err := cli.Do(context.TODO(), op); err != nil {
log.Fatal(err)
}
}
複製代碼
把Op交給Do方法執行,返回的opResp結構以下:
type OpResponse struct {
put *PutResponse
get *GetResponse
del *DeleteResponse
txn *TxnResponse
}
複製代碼
你的操做是什麼類型,你就用哪一個指針來訪問對應的結果。
etcd中事務是原子執行的,只支持if … then … else …這種表達。首先來看一下Txn中定義的方法:
type Txn interface {
// If takes a list of comparison. If all comparisons passed in succeed,
// the operations passed into Then() will be executed. Or the operations
// passed into Else() will be executed.
If(cs ...Cmp) Txn
// Then takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() succeed.
Then(ops ...Op) Txn
// Else takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() fail.
Else(ops ...Op) Txn
// Commit tries to commit the transaction.
Commit() (*TxnResponse, error)
}
複製代碼
Txn必須是這樣使用的:If(知足條件) Then(執行若干Op) Else(執行若干Op)。
If中支持傳入多個Cmp比較條件,若是全部條件知足,則執行Then中的Op(上一節介紹過Op),不然執行Else中的Op。
首先,咱們須要開啓一個事務,這是經過KV對象的方法實現的:
txn := kv.Txn(context.TODO())
複製代碼
下面的測試程序,判斷若是k1的值大於v1而且k1的版本號是2,則Put 鍵值k2和k3,不然Put鍵值k4和k5。
kv.Txn(context.TODO()).If(
clientv3.Compare(clientv3.Value(k1), ">", v1),
clientv3.Compare(clientv3.Version(k1), "=", 2)
).Then(
clientv3.OpPut(k2,v2), clentv3.OpPut(k3,v3)
).Else(
clientv3.OpPut(k4,v4), clientv3.OpPut(k5,v5)
).Commit()
複製代碼
相似於clientv3.Value()\用於指定key屬性的,有這麼幾個方法:
Watch用於監聽某個鍵的變化, Watch
調用後返回一個WatchChan
,它的類型聲明以下:
type WatchChan <-chan WatchResponse
type WatchResponse struct {
Header pb.ResponseHeader
Events []*Event
CompactRevision int64
Canceled bool
Created bool
}
複製代碼
當監聽的key有變化後會向WatchChan
發送WatchResponse
。Watch的典型應用場景是應用於系統配置的熱加載,咱們能夠在系統讀取到存儲在etcd key中的配置後,用Watch監聽key的變化。在單獨的goroutine中接收WatchChan發送過來的數據,並將更新應用到系統設置的配置變量中,好比像下面這樣在goroutine中更新變量appConfig,這樣系統就實現了配置變量的熱加載。
type AppConfig struct {
config1 string
config2 string
}
var appConfig Appconfig
func watchConfig(clt *clientv3.Client, key string, ss interface{}) {
watchCh := clt.Watch(context.TODO(), key)
go func() {
for res := range watchCh {
value := res.Events[0].Kv.Value
if err := json.Unmarshal(value, ss); err != nil {
fmt.Println("now", time.Now(), "watchConfig err", err)
continue
}
fmt.Println("now", time.Now(), "watchConfig", ss)
}
}()
}
watchConfig(client, "config_key", &appConfig)
複製代碼
golang etcd clientv3的主要功能就是這些,但願能幫你們梳理出學習脈絡,這樣工做中應用到etcd時再看官方文檔就會容易不少。