go micro registry

micro.newService()中的newOptions

// newOptions assembles the service Options. Every component starts out as the
// package-level default, Context is a fresh background context and Signal is
// enabled; the supplied opts are then applied in order, so a later option
// overrides both the defaults and any earlier option.
func newOptions(opts ...Option) Options {
    options := Options{
        Auth:      auth.DefaultAuth,
        Broker:    broker.DefaultBroker,
        Cmd:       cmd.DefaultCmd,
        Config:    config.DefaultConfig,
        Client:    client.DefaultClient,
        Server:    server.DefaultServer,
        Store:     store.DefaultStore,
        Registry:  registry.DefaultRegistry,
        Router:    router.DefaultRouter,
        Runtime:   runtime.DefaultRuntime,
        Transport: transport.DefaultTransport,
        Context:   context.Background(),
        Signal:    true,
    }

    // let the caller-supplied options mutate the defaults in place
    for i := range opts {
        opts[i](&options)
    }

    return options
}

初始化了一堆基礎設置,來看看Registry
registry.DefaultRegistry,
在registry/registry.go中的
DefaultRegistry = NewRegistry()

// NewRegistry returns a new default registry, which is the mdns
// implementation. It is a thin exported wrapper that forwards opts unchanged
// to newRegistry.
func NewRegistry(opts ...Option) Registry {
    return newRegistry(opts...)
}

// newRegistry builds the mdns-backed Registry.
//
// Options start with a background Context and a 100ms Timeout and are then
// mutated by each supplied opt in order. The default domain is DefaultDomain
// unless the resulting Context carries a string value under the key
// "mdns.domain", in which case that value is used instead.
func newRegistry(opts ...Option) Registry {
    cfg := Options{
        Context: context.Background(),
        Timeout: 100 * time.Millisecond,
    }
    for i := range opts {
        opts[i](&cfg)
    }

    // the context may override the default domain
    domain := DefaultDomain
    if override, ok := cfg.Context.Value("mdns.domain").(string); ok {
        domain = override
    }

    reg := &mdnsRegistry{
        opts:          cfg,
        defaultDomain: domain,
        globalDomain:  globalDomain,
        domains:       make(map[string]services),
        watchers:      make(map[string]*mdnsWatcher),
    }
    return reg
}

這裏做了以下事情:

  1. 初始化並設置Options
  2. 設置defaultDomain,默認micro,若是options.Context中定義了mdns.domain,則使用這裏定義的
  3. 返回mdnsRegistry{}實例

在micro server篇中介紹了service的啓動過程
service.Run()中調用了s.Start()
s.Start()中調用了s.opts.Server.Start(),這裏的s.opts.Server就是micro/defaults.go中定義的server.DefaultServer = gsrv.NewServer()

那我們去看server/grpc/grpc.go中的Start()

// Start brings the gRPC server up and returns once the background goroutines
// have been launched. It is a no-op returning nil when the server is already
// marked as started.
//
// The steps are:
//  1. obtain a listener: the one returned by getListener if any, otherwise a
//     TLS or plain TCP listener on the configured address, optionally wrapped
//     in a connection limiter when the context carries a positive maxConnKey;
//  2. record the actual listen address in the server options;
//  3. connect the broker, but only when there are subscribers;
//  4. register with the registry (a failure here is logged, not returned);
//  5. serve gRPC on the listener in one goroutine, and in a second goroutine
//     re-register on every RegisterInterval tick until a value arrives on
//     g.exit, then deregister, wait for the wait group, stop the gRPC server
//     (gracefully, falling back to a hard stop after one second), reply on the
//     received channel and disconnect the broker.
//
// It returns an error only when the listener cannot be created or the broker
// connection fails.
func (g *grpcServer) Start() error {
    g.RLock()
    if g.started {
        g.RUnlock()
        return nil
    }
    g.RUnlock()

    config := g.Options()

    // micro: config.Transport.Listen(config.Address)
    var ts net.Listener

    if l := g.getListener(); l != nil {
        ts = l
    } else {
        var err error

        // check the tls config for secure connect
        if tc := config.TLSConfig; tc != nil {
            ts, err = tls.Listen("tcp", config.Address, tc)
            // otherwise just plain tcp listener
        } else {
            ts, err = net.Listen("tcp", config.Address)
        }
        if err != nil {
            return err
        }
    }

    // cap the number of simultaneous connections when configured
    if g.opts.Context != nil {
        if c, ok := g.opts.Context.Value(maxConnKey{}).(int); ok && c > 0 {
            ts = netutil.LimitListener(ts, c)
        }
    }

    if logger.V(logger.InfoLevel, logger.DefaultLogger) {
        logger.Infof("Server [grpc] Listening on %s", ts.Addr().String())
    }
    // store the address actually bound by the listener
    g.Lock()
    g.opts.Address = ts.Addr().String()
    g.Unlock()

    // only connect if we're subscribed
    if len(g.subscribers) > 0 {
        // connect to the broker
        if err := config.Broker.Connect(); err != nil {
            if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
                logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err)
            }
            return err
        }

        if logger.V(logger.InfoLevel, logger.DefaultLogger) {
            logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
        }
    }

    // announce self to the world; a registration failure is only logged so
    // the server still starts serving
    if err := g.Register(); err != nil {
        if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
            logger.Errorf("Server register error: %v", err)
        }
    }

    // micro: go ts.Accept(s.accept)
    go func() {
        if err := g.srv.Serve(ts); err != nil {
            if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
                logger.Errorf("gRPC Server start error: %v", err)
            }
        }
    }()

    go func() {
        // tick stays nil when no register interval is configured: receiving
        // from a nil channel blocks forever, so that select case never fires.
        var (
            ticker *time.Ticker
            tick   <-chan time.Time
        )

        // only process if it exists
        if interval := g.opts.RegisterInterval; interval > time.Duration(0) {
            ticker = time.NewTicker(interval)
            tick = ticker.C
        }

        // return error chan
        var ch chan error

    Loop:
        for {
            select {
            // register self on interval
            case <-tick:
                if err := g.Register(); err != nil {
                    if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
                        logger.Error("Server register error: ", err)
                    }
                }
            // wait for exit
            case ch = <-g.exit:
                break Loop
            }
        }

        // periodic re-registration is over; release the ticker
        if ticker != nil {
            ticker.Stop()
        }

        // deregister self
        if err := g.Deregister(); err != nil {
            if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
                logger.Error("Server deregister error: ", err)
            }
        }

        // wait for waitgroup
        if g.wg != nil {
            g.wg.Wait()
        }

        // stop the grpc server
        exit := make(chan bool)

        go func() {
            g.srv.GracefulStop()
            close(exit)
        }()

        // give the graceful stop one second before forcing the server down
        select {
        case <-exit:
        case <-time.After(time.Second):
            g.srv.Stop()
        }

        // close transport
        ch <- nil

        if logger.V(logger.InfoLevel, logger.DefaultLogger) {
            logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
        }
        // disconnect broker
        if err := config.Broker.Disconnect(); err != nil {
            if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
                logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err)
            }
        }
    }()

    // mark the server as started
    g.Lock()
    g.started = true
    g.Unlock()

    return nil
}

這個過程在micro server篇中有介紹,現在只看registry部分,註冊後開一個協程定時註冊

g.Register()註冊到服務發現

// Register announces this server to the configured registry and, on the first
// successful registration, subscribes every registered subscriber to the
// broker.
//
// When a previously built service is cached in g.rsvc it is re-registered
// as-is. Otherwise the service description is assembled from the advertise
// address (falling back to the listen address), a copy of the configured
// metadata, and the endpoints of all non-internal handlers and subscribers.
// The assembled service is cached for later calls only when the host part of
// the address is a literal IP.
//
// It returns the last registry error after three failed attempts, an error
// from parsing/extracting the address, or an error from a broker subscription.
func (g *grpcServer) Register() error {
    // snapshot the shared state under the read lock
    g.RLock()
    rsvc := g.rsvc
    config := g.opts
    g.RUnlock()

    // regFunc tries to register the service up to three times, sleeping for a
    // backoff period after every failed attempt.
    regFunc := func(service *registry.Service) error {
        var regErr error

        for i := 0; i < 3; i++ {
            // set the ttl and namespace; use the snapshot taken under the
            // lock rather than reading g.opts unguarded
            rOpts := []registry.RegisterOption{
                registry.RegisterTTL(config.RegisterTTL),
                registry.RegisterDomain(config.Namespace),
            }

            // attempt to register
            if err := config.Registry.Register(service, rOpts...); err != nil {
                // set the error
                regErr = err
                // backoff then retry
                time.Sleep(backoff.Do(i + 1))
                continue
            }
            // success so nil error
            regErr = nil
            break
        }

        return regErr
    }

    // if service already filled, reuse it and return early
    if rsvc != nil {
        return regFunc(rsvc)
    }

    var err error
    var advt, host, port string
    var cacheService bool

    // check the advertise address first
    // if it exists then use it, otherwise
    // use the address
    if len(config.Advertise) > 0 {
        advt = config.Advertise
    } else {
        advt = config.Address
    }

    if cnt := strings.Count(advt, ":"); cnt >= 1 {
        // ipv6 address in format [host]:port or ipv4 host:port
        host, port, err = net.SplitHostPort(advt)
        if err != nil {
            return err
        }
    } else {
        host = advt
    }

    // the built service is only cached when the host is a literal IP
    if ip := net.ParseIP(host); ip != nil {
        cacheService = true
    }

    // named so the local does not shadow the addr package
    extracted, err := addr.Extract(host)
    if err != nil {
        return err
    }

    // make copy of metadata
    md := meta.Copy(config.Metadata)

    // register service
    node := &registry.Node{
        Id:       config.Name + "-" + config.Id,
        Address:  mnet.HostPort(extracted, port),
        Metadata: md,
    }

    node.Metadata["broker"] = config.Broker.String()
    node.Metadata["registry"] = config.Registry.String()
    node.Metadata["server"] = g.String()
    node.Metadata["transport"] = g.String()
    node.Metadata["protocol"] = "grpc"

    g.RLock()
    // Maps are ordered randomly, sort the keys for consistency
    var handlerList []string
    for n, e := range g.handlers {
        // Only advertise non internal handlers
        if !e.Options().Internal {
            handlerList = append(handlerList, n)
        }
    }
    sort.Strings(handlerList)

    var subscriberList []*subscriber
    for e := range g.subscribers {
        // Only advertise non internal subscribers
        if !e.Options().Internal {
            subscriberList = append(subscriberList, e)
        }
    }
    // subscribers are ordered by topic, descending
    sort.Slice(subscriberList, func(i, j int) bool {
        return subscriberList[i].topic > subscriberList[j].topic
    })

    endpoints := make([]*registry.Endpoint, 0, len(handlerList)+len(subscriberList))
    for _, n := range handlerList {
        endpoints = append(endpoints, g.handlers[n].Endpoints()...)
    }
    for _, e := range subscriberList {
        endpoints = append(endpoints, e.Endpoints()...)
    }
    g.RUnlock()

    service := &registry.Service{
        Name:      config.Name,
        Version:   config.Version,
        Nodes:     []*registry.Node{node},
        Endpoints: endpoints,
    }

    g.RLock()
    registered := g.registered
    g.RUnlock()

    // only log the registration the first time around
    if !registered {
        if logger.V(logger.InfoLevel, logger.DefaultLogger) {
            logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
        }
    }

    // register the service
    if err := regFunc(service); err != nil {
        return err
    }

    // already registered? don't need to register subscribers
    if registered {
        return nil
    }

    g.Lock()
    defer g.Unlock()

    // first registration: subscribe every subscriber to its broker topic
    for sb := range g.subscribers {
        handler := g.createSubHandler(sb, g.opts)
        var opts []broker.SubscribeOption
        if queue := sb.Options().Queue; len(queue) > 0 {
            opts = append(opts, broker.Queue(queue))
        }

        if cx := sb.Options().Context; cx != nil {
            opts = append(opts, broker.SubscribeContext(cx))
        }

        if !sb.Options().AutoAck {
            opts = append(opts, broker.DisableAutoAck())
        }

        if logger.V(logger.InfoLevel, logger.DefaultLogger) {
            logger.Infof("Subscribing to topic: %s", sb.Topic())
        }
        sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
        if err != nil {
            return err
        }
        g.subscribers[sb] = []broker.Subscriber{sub}
    }

    g.registered = true
    if cacheService {
        g.rsvc = service
    }

    return nil
}

這裏做了以下事情:

  1. rsvc := g.rsvc,定義regFunc()

    1. 定義[]registry.RegisterOption{}
    2. 調用config.Registry.Register()註冊,失敗會退避後重試,最多嘗試3次
  2. rsvc不爲空就調用regFunc()註冊了並返回了,空就往下走,繼續註冊
  3. 驗證host,port,複製metadata,定義registry.Node{},在metadata中增長broker,registry,server,transport,protocol
  4. g.handlers放到handlerList中(非內部handle),排個序,保持一致性。g.subscribers也放到subscriberList,按topic排序。最後都放入endpoints
  5. 定義registry.Service{},調用regFunc()註冊,若是沒有錯誤,也沒有訂閱須要處理就返回
  6. 處理訂閱

到registry/mdns_registry.go中看看Register()

// Register publishes the nodes of service over mdns in the domain selected by
// opts (m.defaultDomain when none is given) and records the resulting entries
// in m.domains.
//
// The first registration of a service name in a domain also creates the
// wildcard entry used for list queries. Nodes whose id is already present in
// the stored entries are skipped. A node that cannot be encoded, parsed or
// announced is skipped as well and its error is remembered; the last such
// error is returned after the remaining nodes have been processed.
//
// Unless the target domain already is m.globalDomain, a copy of the service
// whose nodes carry the original domain under the "domain" metadata key is
// registered in the global domain too. The caller's service, nodes and option
// slice are left untouched.
func (m *mdnsRegistry) Register(service *Service, opts ...RegisterOption) error {
    m.Lock()

    // parse the options
    var options RegisterOptions
    for _, o := range opts {
        o(&options)
    }
    if len(options.Domain) == 0 {
        options.Domain = m.defaultDomain
    }

    // create the domain in the memory store if it doesn't yet exist
    if _, ok := m.domains[options.Domain]; !ok {
        m.domains[options.Domain] = make(services)
    }

    // create the wildcard entry used for list queries in this domain
    entries, ok := m.domains[options.Domain][service.Name]
    if !ok {
        entry, err := createServiceMDNSEntry(service.Name, options.Domain)
        if err != nil {
            m.Unlock()
            return err
        }
        entries = append(entries, entry)
    }

    var gerr error
    for _, node := range service.Nodes {
        var seen bool

        for _, entry := range entries {
            if node.Id == entry.id {
                seen = true
                break
            }
        }

        // this node has already been registered, continue
        if seen {
            continue
        }

        txt, err := encode(&mdnsTxt{
            Service:   service.Name,
            Version:   service.Version,
            Endpoints: service.Endpoints,
            Metadata:  node.Metadata,
        })

        if err != nil {
            gerr = err
            continue
        }

        host, pt, err := net.SplitHostPort(node.Address)
        if err != nil {
            gerr = err
            continue
        }
        // report a malformed port instead of silently announcing port 0
        port, err := strconv.Atoi(pt)
        if err != nil {
            gerr = err
            continue
        }

        if logger.V(logger.DebugLevel, logger.DefaultLogger) {
            logger.Debugf("[mdns] registry create new service with ip: %s for: %s", net.ParseIP(host).String(), host)
        }
        // we got here, new node
        s, err := mdns.NewMDNSService(
            node.Id,
            service.Name,
            options.Domain+".",
            "",
            port,
            []net.IP{net.ParseIP(host)},
            txt,
        )
        if err != nil {
            gerr = err
            continue
        }

        srv, err := mdns.NewServer(&mdns.Config{Zone: s, LocalhostChecking: true})
        if err != nil {
            gerr = err
            continue
        }

        entries = append(entries, &mdnsEntry{id: node.Id, node: srv})
    }

    // save the mdns entry
    m.domains[options.Domain][service.Name] = entries
    // release the lock before recursing: the nested call locks again
    m.Unlock()

    // register in the global Domain so it can be queried as one
    if options.Domain != m.globalDomain {
        srv := *service
        srv.Nodes = nil

        for _, n := range service.Nodes {
            // copy the node and its metadata so the caller's data is not
            // modified when the original domain is recorded
            node := *n

            md := make(map[string]string, len(n.Metadata)+1)
            for k, v := range n.Metadata {
                md[k] = v
            }
            // set the original domain in node metadata
            md["domain"] = options.Domain
            node.Metadata = md

            srv.Nodes = append(srv.Nodes, &node)
        }

        // build a fresh option slice; appending to opts directly could write
        // into the caller's backing array
        globalOpts := make([]RegisterOption, 0, len(opts)+1)
        globalOpts = append(globalOpts, opts...)
        globalOpts = append(globalOpts, RegisterDomain(m.globalDomain))

        if err := m.Register(&srv, globalOpts...); err != nil {
            gerr = err
        }
    }

    return gerr
}

這裏作了如下事情:

  1. 設置options
  2. 建立m.domains[options.Domain],並賦值entries
  3. 循環每一個service.Nodes,entries看有沒有註冊過,有就跳過
  4. 編碼mdnsTxt{},調用mdns.NewMDNSService()獲得一個新node,再調用mdns.NewServer()獲得mdns.Server,包裝到mdnsEntry{},放入entries,再存入m.domains[options.Domain][service.Name]
  5. 若是options.Domain != m.globalDomain,設置service.node中的Metadata["domain"]爲options.Domain,註冊到global Domain中

這裏是默認的mdns實現,實際使用中能夠指定consul,etcd等,具體的流程請見各自的Register()

go micro 分析系列文章
go micro server 啓動分析
go micro client
go micro broker
go micro cmd
go micro config
go micro store
go micro registry
go micro router
go micro runtime
go micro transport
go micro web
go micro registry 插件consul
go micro plugin
go micro jwt 網關鑑權
go micro 鏈路追蹤
go micro 熔斷與限流
go micro wrapper 中間件
go micro metrics 接入Prometheus、Grafana

相關文章
相關標籤/搜索