micro.newService()中的newOptions
func newOptions(opts ...Option) Options { opt := Options{ Auth: auth.DefaultAuth, Broker: broker.DefaultBroker, Cmd: cmd.DefaultCmd, Config: config.DefaultConfig, Client: client.DefaultClient, Server: server.DefaultServer, Store: store.DefaultStore, Registry: registry.DefaultRegistry, Router: router.DefaultRouter, Runtime: runtime.DefaultRuntime, Transport: transport.DefaultTransport, Context: context.Background(), Signal: true, } for _, o := range opts { o(&opt) } return opt }
初始化了一堆基礎設置,來看看 Registry: registry.DefaultRegistry,
在registry/registry.go中的DefaultRegistry = NewRegistry()
// NewRegistry returns a new default registry which is mdns func NewRegistry(opts ...Option) Registry { return newRegistry(opts...) } func newRegistry(opts ...Option) Registry { options := Options{ Context: context.Background(), Timeout: time.Millisecond * 100, } for _, o := range opts { o(&options) } // set the domain defaultDomain := DefaultDomain d, ok := options.Context.Value("mdns.domain").(string) if ok { defaultDomain = d } return &mdnsRegistry{ defaultDomain: defaultDomain, globalDomain: globalDomain, opts: options, domains: make(map[string]services), watchers: make(map[string]*mdnsWatcher), } }
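這裏作了如下事情:
- 設置默認的Context和100毫秒的Timeout,再依次應用傳入的Option
- 若Context中帶有"mdns.domain",就用它覆蓋默認的DefaultDomain
- 返回mdnsRegistry實例,並初始化domains和watchers兩個map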
在micro server篇中介紹了service的啓動過程:service.Run()中調用了s.Start(),s.Start()中調用了s.opts.Server.Start(),這裏的s.opts.Server就是micro/defaults.go中定義的server.DefaultServer = gsrv.NewServer()
那咱們去看server/grpc/grpc.go中的Start()
func (g *grpcServer) Start() error { g.RLock() if g.started { g.RUnlock() return nil } g.RUnlock() config := g.Options() // micro: config.Transport.Listen(config.Address) var ts net.Listener if l := g.getListener(); l != nil { ts = l } else { var err error // check the tls config for secure connect if tc := config.TLSConfig; tc != nil { ts, err = tls.Listen("tcp", config.Address, tc) // otherwise just plain tcp listener } else { ts, err = net.Listen("tcp", config.Address) } if err != nil { return err } } if g.opts.Context != nil { if c, ok := g.opts.Context.Value(maxConnKey{}).(int); ok && c > 0 { ts = netutil.LimitListener(ts, c) } } if logger.V(logger.InfoLevel, logger.DefaultLogger) { logger.Infof("Server [grpc] Listening on %s", ts.Addr().String()) } g.Lock() g.opts.Address = ts.Addr().String() g.Unlock() // only connect if we're subscribed if len(g.subscribers) > 0 { // connect to the broker if err := config.Broker.Connect(); err != nil { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err) } return err } if logger.V(logger.InfoLevel, logger.DefaultLogger) { logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) } } // announce self to the world if err := g.Register(); err != nil { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { logger.Errorf("Server register error: %v", err) } } // micro: go ts.Accept(s.accept) go func() { if err := g.srv.Serve(ts); err != nil { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { logger.Errorf("gRPC Server start error: %v", err) } } }() go func() { t := new(time.Ticker) // only process if it exists if g.opts.RegisterInterval > time.Duration(0) { // new ticker t = time.NewTicker(g.opts.RegisterInterval) } // return error chan var ch chan error Loop: for { select { // register self on interval case <-t.C: if err := g.Register(); err != nil { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { 
logger.Error("Server register error: ", err) } } // wait for exit case ch = <-g.exit: break Loop } } // deregister self if err := g.Deregister(); err != nil { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { logger.Error("Server deregister error: ", err) } } // wait for waitgroup if g.wg != nil { g.wg.Wait() } // stop the grpc server exit := make(chan bool) go func() { g.srv.GracefulStop() close(exit) }() select { case <-exit: case <-time.After(time.Second): g.srv.Stop() } // close transport ch <- nil if logger.V(logger.InfoLevel, logger.DefaultLogger) { logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) } // disconnect broker if err := config.Broker.Disconnect(); err != nil { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err) } } }() // mark the server as started g.Lock() g.started = true g.Unlock() return nil }
這個過程在micro server篇中有介紹,如今只看registry部分:註冊後開一個協程定時註冊。
g.Register()註冊到服務發現
func (g *grpcServer) Register() error { g.RLock() rsvc := g.rsvc config := g.opts g.RUnlock() regFunc := func(service *registry.Service) error { var regErr error for i := 0; i < 3; i++ { // set the ttl and namespace rOpts := []registry.RegisterOption{ registry.RegisterTTL(config.RegisterTTL), registry.RegisterDomain(g.opts.Namespace), } // attempt to register if err := config.Registry.Register(service, rOpts...); err != nil { // set the error regErr = err // backoff then retry time.Sleep(backoff.Do(i + 1)) continue } // success so nil error regErr = nil break } return regErr } // if service already filled, reuse it and return early if rsvc != nil { if err := regFunc(rsvc); err != nil { return err } return nil } var err error var advt, host, port string var cacheService bool // check the advertise address first // if it exists then use it, otherwise // use the address if len(config.Advertise) > 0 { advt = config.Advertise } else { advt = config.Address } if cnt := strings.Count(advt, ":"); cnt >= 1 { // ipv6 address in format [host]:port or ipv4 host:port host, port, err = net.SplitHostPort(advt) if err != nil { return err } } else { host = advt } if ip := net.ParseIP(host); ip != nil { cacheService = true } addr, err := addr.Extract(host) if err != nil { return err } // make copy of metadata md := meta.Copy(config.Metadata) // register service node := ®istry.Node{ Id: config.Name + "-" + config.Id, Address: mnet.HostPort(addr, port), Metadata: md, } node.Metadata["broker"] = config.Broker.String() node.Metadata["registry"] = config.Registry.String() node.Metadata["server"] = g.String() node.Metadata["transport"] = g.String() node.Metadata["protocol"] = "grpc" g.RLock() // Maps are ordered randomly, sort the keys for consistency var handlerList []string for n, e := range g.handlers { // Only advertise non internal handlers if !e.Options().Internal { handlerList = append(handlerList, n) } } sort.Strings(handlerList) var subscriberList []*subscriber for e := range 
g.subscribers { // Only advertise non internal subscribers if !e.Options().Internal { subscriberList = append(subscriberList, e) } } sort.Slice(subscriberList, func(i, j int) bool { return subscriberList[i].topic > subscriberList[j].topic }) endpoints := make([]*registry.Endpoint, 0, len(handlerList)+len(subscriberList)) for _, n := range handlerList { endpoints = append(endpoints, g.handlers[n].Endpoints()...) } for _, e := range subscriberList { endpoints = append(endpoints, e.Endpoints()...) } g.RUnlock() service := ®istry.Service{ Name: config.Name, Version: config.Version, Nodes: []*registry.Node{node}, Endpoints: endpoints, } g.RLock() registered := g.registered g.RUnlock() if !registered { if logger.V(logger.InfoLevel, logger.DefaultLogger) { logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), node.Id) } } // register the service if err := regFunc(service); err != nil { return err } // already registered? don't need to register subscribers if registered { return nil } g.Lock() defer g.Unlock() for sb := range g.subscribers { handler := g.createSubHandler(sb, g.opts) var opts []broker.SubscribeOption if queue := sb.Options().Queue; len(queue) > 0 { opts = append(opts, broker.Queue(queue)) } if cx := sb.Options().Context; cx != nil { opts = append(opts, broker.SubscribeContext(cx)) } if !sb.Options().AutoAck { opts = append(opts, broker.DisableAutoAck()) } if logger.V(logger.InfoLevel, logger.DefaultLogger) { logger.Infof("Subscribing to topic: %s", sb.Topic()) } sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) if err != nil { return err } g.subscribers[sb] = []broker.Subscriber{sub} } g.registered = true if cacheService { g.rsvc = service } return nil }
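這裏作了如下事情:
- 加讀鎖取出rsvc := g.rsvc和config,定義regFunc()(最多嘗試3次,失敗後backoff再重試)
- 若rsvc已緩存,直接用它註冊並返回
- 否則根據Advertise或Address解析host和port,組裝node、endpoints和service
- 調用regFunc(service)註冊;首次註冊時還會訂閱broker的topic,並在host是IP時緩存service
到registry/mdns_registry.go中看看Register()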
func (m *mdnsRegistry) Register(service *Service, opts ...RegisterOption) error { m.Lock() // parse the options var options RegisterOptions for _, o := range opts { o(&options) } if len(options.Domain) == 0 { options.Domain = m.defaultDomain } // create the domain in the memory store if it doesn't yet exist if _, ok := m.domains[options.Domain]; !ok { m.domains[options.Domain] = make(services) } // create the wildcard entry used for list queries in this domain entries, ok := m.domains[options.Domain][service.Name] if !ok { entry, err := createServiceMDNSEntry(service.Name, options.Domain) if err != nil { m.Unlock() return err } entries = append(entries, entry) } var gerr error for _, node := range service.Nodes { var seen bool for _, entry := range entries { if node.Id == entry.id { seen = true break } } // this node has already been registered, continue if seen { continue } txt, err := encode(&mdnsTxt{ Service: service.Name, Version: service.Version, Endpoints: service.Endpoints, Metadata: node.Metadata, }) if err != nil { gerr = err continue } host, pt, err := net.SplitHostPort(node.Address) if err != nil { gerr = err continue } port, _ := strconv.Atoi(pt) if logger.V(logger.DebugLevel, logger.DefaultLogger) { logger.Debugf("[mdns] registry create new service with ip: %s for: %s", net.ParseIP(host).String(), host) } // we got here, new node s, err := mdns.NewMDNSService( node.Id, service.Name, options.Domain+".", "", port, []net.IP{net.ParseIP(host)}, txt, ) if err != nil { gerr = err continue } srv, err := mdns.NewServer(&mdns.Config{Zone: s, LocalhostChecking: true}) if err != nil { gerr = err continue } entries = append(entries, &mdnsEntry{id: node.Id, node: srv}) } // save the mdns entry m.domains[options.Domain][service.Name] = entries m.Unlock() // register in the global Domain so it can be queried as one if options.Domain != m.globalDomain { srv := *service srv.Nodes = nil for _, n := range service.Nodes { node := n // set the original domain in node 
metadata if node.Metadata == nil { node.Metadata = map[string]string{"domain": options.Domain} } else { node.Metadata["domain"] = options.Domain } srv.Nodes = append(srv.Nodes, node) } if err := m.Register(service, append(opts, RegisterDomain(m.globalDomain))...); err != nil { gerr = err } } return gerr }
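這裏作了如下事情:
- 解析options,未指定Domain時使用defaultDomain
- 該domain下尚無此服務時,先建立用於列表查詢的通配entry
- 遍歷service.Nodes,跳過已註冊的node,其他node經encode生成txt後用mdns.NewMDNSService和mdns.NewServer建立mdns服務
- 保存entries;若Domain不是globalDomain,再向globalDomain註冊一次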
這裏是默認的mdns實現,實際使用中能夠指定consul,etcd等,具體的流程請見各自的Register()
go micro 分析系列文章
go micro server 啓動分析
go micro client
go micro broker
go micro cmd
go micro config
go micro store
go micro registry
go micro router
go micro runtime
go micro transport
go micro web
go micro registry 插件consul
go micro plugin
go micro jwt 網關鑑權
go micro 鏈路追蹤
go micro 熔斷與限流
go micro wrapper 中間件
go micro metrics 接入Prometheus、Grafana