在Go Micro框架中,Server是對Broker、Register、Codec、Transort等服務的一個封裝,從下圖中就能夠看到。
再看一下Server定義的接口segmentfault
type Server interface { Options() Options Init(...Option) error Handle(Handler) error NewHandler(interface{}, ...HandlerOption) Handler NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber Subscribe(Subscriber) error Start() error Stop() error String() string
Init
server的初始化方法,初始化options以後 利用once函數初始化了Cmd框架
func (s *service) Init(opts ...Option) { // process options for _, o := range opts { o(&s.opts) } s.once.Do(func() { // Initialise the command flags, overriding new service _ = s.opts.Cmd.Init( cmd.Broker(&s.opts.Broker), cmd.Registry(&s.opts.Registry), cmd.Transport(&s.opts.Transport), cmd.Client(&s.opts.Client), cmd.Server(&s.opts.Server), ) }) }
Handle
調用 router的handler方法把方法註冊到server中函數
func (s *rpcServer) Handle(h Handler) error { s.Lock() defer s.Unlock() if err := s.router.Handle(h); err != nil { return err } s.handlers[h.Name()] = h return nil }
NewSubscriber
訂閱某個主題,調用subscriber中的newSubscriber
newSubscriber函數咱們在Go Micro Broker源碼分析文章中已經分析過這裏就不展開了oop
func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber { return newSubscriber(topic, sb, opts...) }
Subscribe
Subscribe函數接受一個Subscriber的接口,下面能夠看到接口的定義
Subscribe函數接受到接口以後保存到rpcServer中的subscribers map中
在Deregister、Register、Subscribe方法中調用註冊的Subscriber() Unsubscribe()函數源碼分析
type Subscriber interface { Topic() string Subscriber() interface{} Endpoints() []*registry.Endpoint Options() SubscriberOptions } // broker.go type Subscriber interface { Options() SubscribeOptions Topic() string Unsubscribe() error } func (s *rpcServer) Subscribe(sb Subscriber) error { sub, ok := sb.(*subscriber) if !ok { return fmt.Errorf("invalid subscriber: expected *subscriber") } if len(sub.handlers) == 0 { return fmt.Errorf("invalid subscriber: no handler functions") } if err := validateSubscriber(sb); err != nil { return err } s.Lock() defer s.Unlock() _, ok = s.subscribers[sub] if ok { return fmt.Errorf("subscriber %v already exists", s) } s.subscribers[sub] = nil return nil }
Start
start函數是server中最重要的函數,在service run的時候回調用server的start方法來正式開啓這個服務。
代碼以下
在Go Micro Register 源碼分析這篇文章中已經分析過Start函數的中對於服務發現註冊的項管代碼這邊就不重複了。主要來看一下的是監聽端口、處理清酒、和初始化broker
,整體來講就是實現了監聽端口/處理請求/初始化Broker。測試
func (s *rpcServer) Start() error { registerDebugHandler(s) config := s.Options() // 開始監聽transport 處理客戶端接收到的請求 ts, err := config.Transport.Listen(config.Address) if err != nil { return err } log.Logf("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr()) // swap address s.Lock() addr := s.opts.Address s.opts.Address = ts.Addr() s.Unlock() // 鏈接broker 用於訂閱服務 if err := config.Broker.Connect(); err != nil { return err } bname := config.Broker.String() log.Logf("Broker [%s] Connected to %s", bname, config.Broker.Address()) // 調用RegisterCheck檢查註冊服務是否可用 可從外部注入函數 if err = s.opts.RegisterCheck(s.opts.Context); err != nil { log.Logf("Server %s-%s register check error: %s", config.Name, config.Id, err) } else { // 註冊服務 if err = s.Register(); err != nil { log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err) } } exit := make(chan bool) // 處理上面監聽的Transport.Listen listerer go func() { for { // 監聽到請求處理請求代碼 在Transport會深刻詳細說明 err := ts.Accept(s.ServeConn) select { // check if we're supposed to exit case <-exit: return // check the error and backoff default: if err != nil { log.Logf("Accept error: %v", err) time.Sleep(time.Second) continue } } // no error just exit return } }() // 開啓goroutine 檢查服務是否可用,間隔時間爲設置的RegisterInterval go func() { t := new(time.Ticker) // only process if it exists if s.opts.RegisterInterval > time.Duration(0) { // new ticker t = time.NewTicker(s.opts.RegisterInterval) } // return error chan var ch chan error Loop: for { select { // register self on interval case <-t.C: s.RLock() registered := s.registered s.RUnlock() // 調用Register組件中的RegisterCheck方法 測試服務是否正常 if err = s.opts.RegisterCheck(s.opts.Context); err != nil && registered { log.Logf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, err) // deregister self in case of error if err := s.Deregister(); err != nil { log.Logf("Server %s-%s deregister error: %s", config.Name, config.Id, err) } } else { if err := s.Register(); err != nil { log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err) } } // wait for exit case ch = <-s.exit: t.Stop() close(exit) break Loop } } // deregister self if err := s.Deregister(); err != nil { log.Logf("Server %s-%s deregister error: %s", config.Name, config.Id, err) } // wait for requests to finish if s.wg != nil { s.wg.Wait() } // close transport listener ch <- ts.Close() // disconnect the broker config.Broker.Disconnect() // swap back address s.Lock() s.opts.Address = addr s.Unlock() }() return nil }
server是對於底層一些方法的封裝,好比實現服務的開啓、關閉、節點的註冊和訂閱的註冊。一樣級別的封裝在最上面圖能夠看到還有CLient,然而在Client和Server上層還有service,這些會在另外的地方再分析。spa