簡介
etcd是CoreOS團隊於2013年6月發起的開源項目,它的目標是構建一個高可用的分佈式鍵值(key-value)數據庫。etcd內部採用raft
協議作爲一致性算法,etcd基於Go語言實現。
etcd作爲服務發現系統,有如下的特點:
- 簡單:安裝配置簡單,並且提供了HTTP API進行交互,使用也很簡單
- 安全:支持SSL證書驗證
- 快速:根據官方提供的benchmark數據,單實例支持每秒2k+讀操作
- 可靠:採用raft算法,實現分佈式系統數據的可用性和一致性
etcd項目地址:https://github.com/coreos/etcd/
容器化的etcd服務搭建
version: '2.2'
services:
  etcd:
    image: quay.io/coreos/etcd:v3.3.12
    container_name: etcd
    ports:
      - 23791:2379
      - 2380
    environment:
      ETCDCTL_API: 3
    volumes:
      - ./data/etcd/etcd-data:/etcd-data
    command:
      - "/usr/local/bin/etcd"
      - "--name"
      - "s1"
      - "--data-dir"
      - "/etcd-data"
      - "--advertise-client-urls"
      - "http://0.0.0.0:2379"
      - --listen-client-urls
      - "http://0.0.0.0:2379"
      - "--initial-advertise-peer-urls"
      - "http://0.0.0.0:2380"
      - "--listen-peer-urls"
      - "http://0.0.0.0:2380"
      - "--initial-cluster-token"
      - "tkn"
      - "--initial-cluster"
      - "s1=http://0.0.0.0:2380"
      - "--initial-cluster-state"
      - "new"
etcd命令行的使用
- 首先進入到etcd容器中
docker exec -it etcd bash
- 執行如下命令進行練習
## 獲取版本信息
etcdctl version
## 獲取所有鍵值對
etcdctl get --prefix ""
## 添加鍵值對
etcdctl put zhangsan hello
## 刪除鍵值對
etcdctl del zhangsan
## 添加一個過期時間爲20s的租約
etcdctl lease grant 20
## 獲取所有租約
etcdctl lease list
## 添加鍵值對,並爲該鍵指定租約
etcdctl put lisi world --lease="3f3574057fe0e61c"
## 續租(持續爲租約保活)
etcdctl lease keep-alive 3f3574057fe0e61c
## 查看某個租約的剩餘存活時間及其關聯的鍵
etcdctl lease timetolive 3f3574057fe0e61c --keys
## 回收租約
etcdctl lease revoke 3f3574057fe0e61c
etcd的api接口說明
## 獲取版本信息
curl -L http://127.0.0.1:2379/version
## 獲取健康狀態
curl -L http://127.0.0.1:2379/health
## 添加鍵值對
curl http://127.0.0.1:2379/v2/keys/zhangsan -XPUT -d value="hello"
## 查看鍵值對
curl http://127.0.0.1:2379/v2/keys/zhangsan
go操作etcd
場景描述
咱們試圖使用go開發etcd sdk,啓動兩個簡單的server端,向etcd分別註冊自己的address,再啓動一個client端,從etcd中發現服務,隨機抽取一個進行訪問請求。
目錄結構
踩坑指南:go mod管理的時候,導入etcd相關sdk包會出現版本衝突問題,踩了不少坑以後,總結了一下,需要在go.mod文件中,添加以下三行replace
replace (
	github.com/coreos/bbolt v1.3.4 => go.etcd.io/bbolt v1.3.4
	github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.0.0
	google.golang.org/grpc => google.golang.org/grpc v1.26.0
)
- one_server端的編寫
package main import ( "context" "fmt" "github.com/gin-gonic/gin" "go.etcd.io/etcd/clientv3" "net/http" "sync" "time" ) const ( EtcdPrefix = "/test/server/" ServerSerial = "1" Address = "http://127.0.0.1:18081/" ) var ( EtcdAddress = []string{"http://127.0.0.1:23791"} leaseTTL = 5 ) type HealthProvider struct { etcdClient *EtcdClient } var ( healthProvider *HealthProvider healthProviderOnce sync.Once ) func GetHealthProvider() *HealthProvider { healthProviderOnce.Do(func() { healthProvider = &HealthProvider{ etcdClient: NewEtcdClient(), } }) return healthProvider } type EtcdClient struct { address []string username string password string kv clientv3.KV client *clientv3.Client ctx context.Context lease clientv3.Lease leaseID clientv3.LeaseID leaseTTL int64 } func NewEtcdClient() *EtcdClient { var client = &EtcdClient{ ctx: context.Background(), address: EtcdAddress, leaseTTL: int64(leaseTTL), } err := client.connect() if err != nil { panic(err) } return client } func (etcdClient *EtcdClient) connect() (err error) { etcdClient.client, err = clientv3.New(clientv3.Config{ Endpoints: etcdClient.address, DialTimeout: 5 * time.Second, TLS: nil, Username: etcdClient.username, Password: etcdClient.password, }) if err != nil { return } etcdClient.kv = clientv3.NewKV(etcdClient.client) etcdClient.ctx = context.Background() return } func (etcdClient *EtcdClient) Close() (err error) { return etcdClient.client.Close() } func (etcdClient *EtcdClient) register(address string) (*clientv3.PutResponse, error) { etcdClient.lease = clientv3.NewLease(etcdClient.client) leaseResp, err := etcdClient.lease.Grant(etcdClient.ctx, etcdClient.leaseTTL) if err != nil { return nil, err } etcdClient.leaseID = leaseResp.ID return etcdClient.kv.Put(etcdClient.ctx, EtcdPrefix+ServerSerial, address, clientv3.WithLease(leaseResp.ID)) } func (etcdClient *EtcdClient) LeaseKeepAlive() error { if etcdClient.lease == nil { _, err := etcdClient.register(Address) if err != nil { return err } } _, err := 
etcdClient.lease.KeepAlive(etcdClient.ctx, etcdClient.leaseID) if err != nil { return err } return nil } func healthCheck(provider *HealthProvider) { var tick = time.NewTicker(time.Second) for { select { case <-tick.C: err := provider.etcdClient.LeaseKeepAlive() if err != nil { fmt.Println(err.Error()) return } } } } func main() { provider := GetHealthProvider() go healthCheck(provider) defer provider.etcdClient.Close() engine := gin.Default() engine.GET("/ping", func(c *gin.Context) { c.JSON(http.StatusOK, "one") }) engine.Run(":18081") }
- two_server端的編寫
package main import ( "context" "fmt" "github.com/gin-gonic/gin" "go.etcd.io/etcd/clientv3" "net/http" "sync" "time" ) const ( EtcdPrefix = "/test/server/" ServerSerial = "2" Address = "http://127.0.0.1:18082/" ) var ( EtcdAddress = []string{"http://127.0.0.1:23791"} leaseTTL = 5 ) type HealthProvider struct { etcdClient *EtcdClient } var ( healthProvider *HealthProvider healthProviderOnce sync.Once ) func GetHealthProvider() *HealthProvider { healthProviderOnce.Do(func() { healthProvider = &HealthProvider{ etcdClient: NewEtcdClient(), } }) return healthProvider } type EtcdClient struct { address []string username string password string kv clientv3.KV client *clientv3.Client ctx context.Context lease clientv3.Lease leaseID clientv3.LeaseID leaseTTL int64 } func NewEtcdClient() *EtcdClient { var client = &EtcdClient{ ctx: context.Background(), address: EtcdAddress, leaseTTL: int64(leaseTTL), } err := client.connect() if err != nil { panic(err) } return client } func (etcdClient *EtcdClient) connect() (err error) { etcdClient.client, err = clientv3.New(clientv3.Config{ Endpoints: etcdClient.address, DialTimeout: 5 * time.Second, TLS: nil, Username: etcdClient.username, Password: etcdClient.password, }) if err != nil { return } etcdClient.kv = clientv3.NewKV(etcdClient.client) etcdClient.ctx = context.Background() return } func (etcdClient *EtcdClient) Close() (err error) { return etcdClient.client.Close() } func (etcdClient *EtcdClient) register(address string) (*clientv3.PutResponse, error) { etcdClient.lease = clientv3.NewLease(etcdClient.client) leaseResp, err := etcdClient.lease.Grant(etcdClient.ctx, etcdClient.leaseTTL) if err != nil { return nil, err } etcdClient.leaseID = leaseResp.ID return etcdClient.kv.Put(etcdClient.ctx, EtcdPrefix+ServerSerial, address, clientv3.WithLease(leaseResp.ID)) } func (etcdClient *EtcdClient) LeaseKeepAlive() error { if etcdClient.lease == nil { _, err := etcdClient.register(Address) if err != nil { return err } } _, err := 
etcdClient.lease.KeepAlive(etcdClient.ctx, etcdClient.leaseID) if err != nil { return err } return nil } func healthCheck(provider *HealthProvider) { var tick = time.NewTicker(time.Second) for { select { case <-tick.C: err := provider.etcdClient.LeaseKeepAlive() if err != nil { fmt.Println(err.Error()) return } } } } func main() { provider := GetHealthProvider() go healthCheck(provider) defer provider.etcdClient.Close() engine := gin.Default() engine.GET("/ping", func(c *gin.Context) { c.JSON(http.StatusOK, "two") }) engine.Run(":18082") }
- client端的編寫
package main import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "io/ioutil" "math/rand" "net/http" "time" ) var ( EtcdAddress = []string{"http://127.0.0.1:23791"} ServerPrefix = "/test/server/" ) type EtcdClient struct { address []string username string password string kv clientv3.KV client *clientv3.Client ctx context.Context lease clientv3.Lease leaseID clientv3.LeaseID } func newEtcdClient() *EtcdClient { var client = &EtcdClient{ ctx: context.Background(), address: EtcdAddress, } err := client.connect() if err != nil { panic(err) } return client } func (etcdClient *EtcdClient) connect() (err error) { etcdClient.client, err = clientv3.New(clientv3.Config{ Endpoints: etcdClient.address, DialTimeout: 5 * time.Second, TLS: nil, Username: etcdClient.username, Password: etcdClient.password, }) if err != nil { return } etcdClient.kv = clientv3.NewKV(etcdClient.client) etcdClient.ctx = context.Background() return } func (etcdClient *EtcdClient) list(prefix string) ([]string, error) { resp, err := etcdClient.kv.Get(etcdClient.ctx, prefix, clientv3.WithPrefix()) if err != nil { return nil, err } servers := make([]string, 0) for _, value := range resp.Kvs { if value != nil { servers = append(servers, string(value.Value)) } } return servers, nil } func (etcdClient *EtcdClient) close() (err error) { return etcdClient.client.Close() } func genRand(num int) int { return int(rand.Int31n(int32(num))) } func getServer(client *EtcdClient) (string, error) { servers, err := client.list(ServerPrefix) if err != nil { return "", err } return servers[genRand(len(servers))], nil } func Get(url string) ([]byte, error) { client := &http.Client{} req, err := http.NewRequest("GET", url, nil) if err != nil { return nil, err } res, err := client.Do(req) if err != nil { return nil, err } defer res.Body.Close() body, err := ioutil.ReadAll(res.Body) if err != nil { return nil, err } return body, nil } func main() { client := newEtcdClient() err := client.connect() if err != nil { 
panic(err) } defer client.close() for i := 0; i < 10; i++ { address, err := getServer(client) if err != nil { fmt.Println(err.Error()) return } data, err := Get(address + "ping") if err != nil { fmt.Println(err.Error()) return } fmt.Println(string(data)) time.Sleep(2 * time.Second) } }
測試
- 分別啓動one_server和two_server服務,向etcd註冊;
- 啓動client服務,循環請求10次,查看結果;
結果
會發現請求one_server和two_server的頻率慢慢趨於平均