Golang etcd服務註冊與發現

 

// service.go

package discovery

import (
    "context"
    "errors"
    "sync"
    "time"

    "github.com/coreos/etcd/clientv3"
    l4g "github.com/alecthomas/log4go"
)

// Service registers a single key/value pair in etcd under a lease and keeps
// that lease alive until it is stopped.
type Service struct {
    closeChan chan struct{}    // closed by Stop to tell Start to exit
    client    *clientv3.Client // etcd v3 client
    leaseID   clientv3.LeaseID // ID of the etcd lease; set by Start
    key       string           // etcd key written by Start
    val       string           // value stored under key
    wg        sync.WaitGroup   // lets Stop wait for Start to return
}

// NewService 構造一個註冊服務
// NewService connects to etcd and returns a Service that will register val
// under key once Start is called.
//
// etcdEndpoints lists the etcd servers to dial; the dial timeout is fixed at
// two seconds. The error from creating the etcd client is returned unchanged.
func NewService(etcdEndpoints []string, key string, val string) (*Service, error) {
    cfg := clientv3.Config{
        Endpoints:   etcdEndpoints,
        DialTimeout: 2 * time.Second,
    }

    client, err := clientv3.New(cfg)
    if err != nil {
        return nil, err
    }

    return &Service{
        closeChan: make(chan struct{}),
        client:    client,
        key:       key,
        val:       val,
    }, nil
}

// Start 開啓註冊
// @param - ttlSecond 租期(秒)
// Start registers the service in etcd and keeps the registration alive.
//
// It grants a lease of ttlSecond seconds, writes key/val under that lease and
// then blocks, consuming keep-alive responses, until Stop is called, the etcd
// client's context is done, or the keep-alive channel is closed. Because it
// blocks, callers normally run it in its own goroutine.
//
// Returns:
//   - the error of the failing etcd call if granting the lease, writing the
//     key or starting keep-alive fails;
//   - the result of revoking the lease when stopped or when the keep-alive
//     channel is closed;
//   - a "server closed" error when the client's context is done.
func (s *Service) Start(ttlSecond int64) error {
    // Register with the WaitGroup before the first blocking call so that Stop
    // waits for the whole of Start, including setup, before closing the client.
    s.wg.Add(1)
    defer s.wg.Done()

    // minimum lease TTL is 5-second
    resp, err := s.client.Grant(context.TODO(), ttlSecond)
    if err != nil {
        return err
    }
    s.leaseID = resp.ID

    // Attach the key to the lease so it disappears when the lease ends.
    if _, err = s.client.Put(context.TODO(), s.key, s.val, clientv3.WithLease(s.leaseID)); err != nil {
        return err
    }

    ch, err := s.client.KeepAlive(context.TODO(), s.leaseID)
    if err != nil {
        return err
    }

    l4g.Info("[discovery] Service Start leaseID:[%d] key:[%s], value:[%s]", s.leaseID, s.key, s.val)

    for {
        select {
        case <-s.closeChan:
            return s.revoke()
        case <-s.client.Ctx().Done():
            return errors.New("server closed")
        case ka, ok := <-ch:
            if !ok {
                l4g.Warn("[discovery] Service Start keep alive channel closed")
                return s.revoke()
            }
            l4g.Fine("[discovery] Service Start recv reply from Service: %s, ttl:%d", s.key, ka.TTL)
        }
    }
}

// Stop 中止
// Stop signals Start to revoke the lease and return, waits for Start to
// finish, and then closes the etcd client.
//
// Calling Stop again after it has already run is a no-op. The check is not
// atomic, so Stop must not be called from several goroutines at once.
func (s *Service) Stop() {
    // Nothing is ever sent on closeChan, so a successful receive means it has
    // already been closed; closing it a second time would panic.
    select {
    case <-s.closeChan:
        return
    default:
    }

    close(s.closeChan)
    s.wg.Wait()

    if err := s.client.Close(); err != nil {
        l4g.Error("[discovery] Service Stop close client key:[%s] error:[%s]", s.key, err.Error())
    }
}

// revoke revokes the service's lease in etcd and logs the outcome.
// It returns the error reported by the etcd client, or nil on success.
func (s *Service) revoke() error {
    if _, err := s.client.Revoke(context.TODO(), s.leaseID); err != nil {
        l4g.Error("[discovery] Service revoke key:[%s] error:[%s]", s.key, err.Error())
        return err
    }

    l4g.Info("[discovery] Service revoke successfully key:[%s]", s.key)
    return nil
}

 

// watch.go

package discovery

import (
    "context"
    "os"
    "sync"
    "time"

    l4g "github.com/alecthomas/log4go"
    "github.com/coreos/etcd/clientv3"
    "github.com/coreos/etcd/mvcc/mvccpb"
    "google.golang.org/grpc/grpclog"
)

// GroupManager bundles a WaitGroup with a cancellable context so that a group
// of goroutines can be signalled to stop and then waited for.
type GroupManager struct {
    wg     sync.WaitGroup     // counts outstanding workers (see Add, Done, Wait)
    ctx    context.Context    // its Done channel is exposed through Chan
    cancel context.CancelFunc // cancels ctx; invoked through once by Close
    once   sync.Once          // ensures cancel runs at most once
}

// NewGroupManager returns a GroupManager whose stop signal is backed by a
// fresh cancellable context derived from context.Background.
func NewGroupManager() *GroupManager {
    ctx, cancel := context.WithCancel(context.Background())
    return &GroupManager{
        ctx:    ctx,
        cancel: cancel,
    }
}

// Close cancels the manager's context, which closes the channel returned by
// Chan. Only the first call has any effect.
func (gm *GroupManager) Close() {
    gm.once.Do(gm.cancel)
}

// Wait blocks until the internal WaitGroup counter drops to zero.
func (gm *GroupManager) Wait() {
    gm.wg.Wait()
}

// Add adds delta to the internal WaitGroup counter.
func (gm *GroupManager) Add(delta int) {
    gm.wg.Add(delta)
}

// Done decrements the internal WaitGroup counter by one.
func (gm *GroupManager) Done() {
    gm.wg.Done()
}

// Chan returns the Done channel of the manager's context; it is closed once
// Close has been called.
func (gm *GroupManager) Chan() <-chan struct{} {
    return gm.ctx.Done()
}

// Target receives the key/value state observed by Watch.
type Target interface {
    // Set is called by Watch once for every key/value pair returned by the
    // initial read, before any change events are delivered.
    Set(string, string)
    // Create is called with the key and value of a watch event that reports
    // a newly created key.
    Create(string, string)
    // Modify is called with the key and new value of a watch event that
    // reports a change to an existing key.
    Modify(string, string)
    // Delete is called with the key of a watch event that reports a deletion.
    Delete(string)
}

// Config holds the settings Watch uses to connect to etcd and choose what to
// observe.
type Config struct {
    Servers        []string // etcd endpoints passed to the client
    DailTimeout    int64    // dial timeout in seconds (name is a misspelling of "Dial", kept for compatibility)
    RequestTimeout int64    // timeout in seconds for the initial read performed by Watch
    Prefix         bool     // when true, Target is treated as a key prefix rather than a single key
    Target         string   // key (or key prefix) to read and watch
}

// Watch mirrors the etcd key (or key prefix) named by cfg.Target into target.
//
// It first reads the current contents and reports every pair through
// target.Set, then watches from the revision following that read and forwards
// each event to target.Create, target.Modify or target.Delete. It blocks until
// gm is closed, the watch reports an error, or the watch channel is closed; in
// the latter two cases it also closes gm. gm.Done is called on return, so the
// caller is expected to have called gm.Add beforehand.
//
// Watch panics if the etcd client cannot be created or the initial read
// fails, because the signature offers no way to return an error.
func Watch(gm *GroupManager, cfg *Config, target Target) {
    defer gm.Done()

    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   cfg.Servers,
        DialTimeout: time.Duration(cfg.DailTimeout) * time.Second,
    })
    if err != nil {
        panic(err.Error())
    }
    defer cli.Close() // make sure to close the client

    l4g.Info("[discovery] start watch %s", cfg.Target)

    // The prefix option, when requested, applies to both the read and the watch.
    var opts []clientv3.OpOption
    if cfg.Prefix {
        opts = append(opts, clientv3.WithPrefix())
    }

    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.RequestTimeout)*time.Second)
    resp, err := cli.Get(ctx, cfg.Target, opts...)
    cancel()
    if err != nil {
        panic(err.Error())
    }
    for _, kv := range resp.Kvs {
        target.Set(string(kv.Key), string(kv.Value))
    }

    // Begin one revision after the read so that no change is missed or replayed.
    opts = append(opts, clientv3.WithRev(resp.Header.Revision+1))
    rch := cli.Watch(context.Background(), cfg.Target, opts...)

    for {
        select {
        case <-gm.Chan():
            l4g.Info("[discovery] watch %s close", cfg.Target)
            return
        case wresp, ok := <-rch:
            if !ok {
                // A closed channel yields zero-value responses forever; stop
                // instead of spinning on them.
                l4g.Warn("[discovery] watch %s channel closed", cfg.Target)
                gm.Close()
                return
            }
            if err := wresp.Err(); err != nil {
                l4g.Info("[discovery] watch %s response error: %s ", cfg.Target, err.Error())
                gm.Close()
                return
            }
            l4g.Debug("[discovery] watch %s response %+v", cfg.Target, wresp)
            for _, ev := range wresp.Events {
                switch {
                case ev.IsCreate():
                    target.Create(string(ev.Kv.Key), string(ev.Kv.Value))
                case ev.IsModify():
                    target.Modify(string(ev.Kv.Key), string(ev.Kv.Value))
                case ev.Type == mvccpb.DELETE:
                    target.Delete(string(ev.Kv.Key))
                default:
                    l4g.Error("[discovery] no found watch type: %s %q", ev.Type, ev.Kv.Key)
                }
            }
        }
    }
}
相關文章
相關標籤/搜索