//sevice.gogit
package discovery import ( "context" "errors" "sync" "time" "github.com/coreos/etcd/clientv3" l4g "github.com/alecthomas/log4go" ) type Service struct { closeChan chan struct{} //關閉通道 client *clientv3.Client //etcd v3 client leaseID clientv3.LeaseID //etcd 租約id key string //鍵 val string //值 wg sync.WaitGroup } // NewService 構造一個註冊服務 func NewService(etcdEndpoints []string, key string, val string) (*Service, error) { cli, err := clientv3.New(clientv3.Config{ Endpoints: etcdEndpoints, DialTimeout: 2 * time.Second, }) if nil != err { return nil, err } s := &Service{ client: cli, closeChan: make(chan struct{}), key: key, val: val, } return s, nil } // Start 開啓註冊 // @param - ttlSecond 租期(秒) func (s *Service) Start(ttlSecond int64) error { // minimum lease TTL is 5-second resp, err := s.client.Grant(context.TODO(), ttlSecond) if err != nil { panic(err) } s.leaseID = resp.ID _, err = s.client.Put(context.TODO(), s.key, s.val, clientv3.WithLease(s.leaseID)) if err != nil { panic(err) } ch, err1 := s.client.KeepAlive(context.TODO(), s.leaseID) if nil != err1 { panic(err) } l4g.Info("[discovery] Service Start leaseID:[%d] key:[%s], value:[%s]", s.leaseID, s.key, s.val) s.wg.Add(1) defer s.wg.Done() for { select { case <-s.closeChan: return s.revoke() case <-s.client.Ctx().Done(): return errors.New("server closed") case ka, ok := <-ch: if !ok { l4g.Warn("[discovery] Service Start keep alive channel closed") return s.revoke() } else { l4g.Fine("[discovery] Service Start recv reply from Service: %s, ttl:%d", s.key, ka.TTL) } } } return nil } // Stop 中止 func (s *Service) Stop() { close(s.closeChan) s.wg.Wait() s.client.Close() } func (s *Service) revoke() error { _, err := s.client.Revoke(context.TODO(), s.leaseID) if err != nil { l4g.Error("[discovery] Service revoke key:[%s] error:[%s]", s.key, err.Error()) } else { l4g.Info("[discovery] Service revoke successfully key:[%s]", s.key) } return err }
//watch.gogithub
package discovery import ( "context" "os" "time" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/mvcc/mvccpb" l4g "github.com/alecthomas/log4go" "google.golang.org/grpc/grpclog" ) type GroupManager struct { wg sync.WaitGroup ctx context.Context cancel context.CancelFunc once sync.Once } func NewGroupManager() *GroupManager { ret := new(GroupManager) ret.ctx, ret.cancel = context.WithCancel(context.Background()) return ret } func (this *GroupManager) Close() { this.once.Do(this.cancel) } func (this *GroupManager) Wait() { this.wg.Wait() } func (this *GroupManager) Add(delta int) { this.wg.Add(delta) } func (this *GroupManager) Done() { this.wg.Done() } func (this *GroupManager) Chan() <-chan struct{} { return this.ctx.Done() } type Target interface { Set(string, string) Create(string, string) Modify(string, string) Delete(string) } type Config struct { Servers []string DailTimeout int64 RequestTimeout int64 Prefix bool Target string } func Watch(gm *GroupManager, cfg *Config, target Target) { defer gm.Done() cli, err := clientv3.New(clientv3.Config{ Endpoints: cfg.Servers, DialTimeout: time.Duration(cfg.DailTimeout) * time.Second, }) if err != nil { panic(err.Error()) return } defer cli.Close() // make sure to close the client l4g.Info("[discovery] start watch %s", cfg.Target) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.RequestTimeout)*time.Second) var resp *clientv3.GetResponse if cfg.Prefix { resp, err = cli.Get(ctx, cfg.Target, clientv3.WithPrefix()) } else { resp, err = cli.Get(ctx, cfg.Target) } cancel() if err != nil { panic(err.Error()) } for _, ev := range resp.Kvs { target.Set(string(string(ev.Key)), string(ev.Value)) } var rch clientv3.WatchChan if cfg.Prefix { rch = cli.Watch(context.Background(), cfg.Target, clientv3.WithPrefix(), clientv3.WithRev(resp.Header.Revision+1)) } else { rch = cli.Watch(context.Background(), cfg.Target, clientv3.WithRev(resp.Header.Revision+1)) } for { select { case <-gm.Chan(): l4g.Info("[discovery] watch %s close", cfg.Target) return case wresp := <-rch: err := wresp.Err() if err != nil { l4g.Info("[discovery] watch %s response error: %s ", cfg.Target, err.Error()) gm.Close() return } l4g.Debug("[discovery] watch %s response %+v", cfg.Target, wresp) for _, ev := range wresp.Events { if ev.IsCreate() { target.Create(string(ev.Kv.Key), string(ev.Kv.Value)) } else if ev.IsModify() { target.Modify(string(ev.Kv.Key), string(ev.Kv.Value)) } else if ev.Type == mvccpb.DELETE { target.Delete(string(ev.Kv.Key)) } else { l4g.Error("[discovery] no found watch type: %s %q", ev.Type, ev.Kv.Key) } } } } }