本文主要研究一下dubbo-go的kubernetesRegistry。
dubbo-go-v1.4.2/registry/kubernetes/registry.go
var ( processID = "" localIP = "" ) const ( Name = "kubernetes" ConnDelay = 3 MaxFailTimes = 15 ) func init() { processID = fmt.Sprintf("%d", os.Getpid()) localIP, _ = gxnet.GetLocalIP() extension.SetRegistry(Name, newKubernetesRegistry) } type kubernetesRegistry struct { registry.BaseRegistry cltLock sync.RWMutex client *kubernetes.Client listenerLock sync.Mutex listener *kubernetes.EventListener dataListener *dataListener configListener *configurationListener }
dubbo-go-v1.4.2/registry/kubernetes/registry.go
func newKubernetesRegistry(url *common.URL) (registry.Registry, error) { // actually, kubernetes use in-cluster config, r := &kubernetesRegistry{} r.InitBaseRegistry(url, r) if err := kubernetes.ValidateClient(r); err != nil { return nil, perrors.WithStack(err) } r.WaitGroup().Add(1) go r.HandleClientRestart() r.InitListeners() logger.Debugf("the kubernetes registry started") return r, nil }
dubbo-go-v1.4.2/registry/kubernetes/registry.go
func (r *kubernetesRegistry) InitListeners() { r.listener = kubernetes.NewEventListener(r.client) r.configListener = NewConfigurationListener(r) r.dataListener = NewRegistryDataListener(r.configListener) }
dubbo-go-v1.4.2/registry/kubernetes/registry.go
// DoRegister creates an entry through the Kubernetes client whose key is
// root joined with node and whose value is the empty string. It returns
// whatever error the client's Create call reports.
func (r *kubernetesRegistry) DoRegister(root string, node string) error {
	key := path.Join(root, node)
	return r.client.Create(key, "")
}
dubbo-go-v1.4.2/registry/kubernetes/registry.go
func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) { var ( configListener *configurationListener ) r.listenerLock.Lock() configListener = r.configListener r.listenerLock.Unlock() if r.listener == nil { r.cltLock.Lock() client := r.client r.cltLock.Unlock() if client == nil { return nil, perrors.New("kubernetes client broken") } r.listenerLock.Lock() if r.listener == nil { // double check r.listener = kubernetes.NewEventListener(r.client) } r.listenerLock.Unlock() } //register the svc to dataListener r.dataListener.AddInterestedURL(svc) for _, v := range strings.Split(svc.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), ",") { go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+v, svc.Service()), r.dataListener) } return configListener, nil }
kubernetesRegistry定義了cltLock、client、listenerLock、listener、dataListener、configListener屬性;InitListeners方法執行kubernetes.NewEventListener、NewConfigurationListener、NewRegistryDataListener。