Go Micro Server 源碼分析

概述

在Go Micro框架中,Server是對Broker、Register、Codec、Transort等服務的一個封裝,從下圖中就能夠看到。
圖片描述
再看一下Server定義的接口segmentfault

  • Init:初始化
  • Handler:註冊rpchandler
  • NewHandler:封裝rpchandler
  • NewSubscriber:封裝Subscriber給Subscribe方法
  • Subscribe://注入訂閱事件
  • Start:啓動服務、監聽端口、處理請求
  • Stop:中止服務,broker等資源關閉
  • String
type Server interface {
    Options() Options
    Init(...Option) error 
    Handle(Handler) error
    NewHandler(interface{}, ...HandlerOption) Handler
    NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
    Subscribe(Subscriber) error
    Start() error
    Stop() error
    String() string

主要方法 源碼 以rpcServer爲例

Init

server的初始化方法,初始化options以後 利用once函數初始化了Cmd框架

func (s *service) Init(opts ...Option) {
    // process options
    for _, o := range opts {
        o(&s.opts)
    }

    s.once.Do(func() {
        // Initialise the command flags, overriding new service
        _ = s.opts.Cmd.Init(
            cmd.Broker(&s.opts.Broker),
            cmd.Registry(&s.opts.Registry),
            cmd.Transport(&s.opts.Transport),
            cmd.Client(&s.opts.Client),
            cmd.Server(&s.opts.Server),
        )
    })
}

Handle

調用 router的handler方法把方法註冊到server中函數

func (s *rpcServer) Handle(h Handler) error {
    s.Lock()
    defer s.Unlock()

    if err := s.router.Handle(h); err != nil {
        return err
    }

    s.handlers[h.Name()] = h

    return nil
}

NewSubscriber

訂閱某個主題,調用subscriber中的newSubscriber
newSubscriber函數咱們在Go Micro Broker源碼分析文章中已經分析過這裏就不展開了oop

func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {
    return newSubscriber(topic, sb, opts...)
}

Subscribe

Subscribe函數接受一個Subscriber的接口,下面能夠看到接口的定義
Subscribe函數接受到接口以後保存到rpcServer中的subscribers map中
在Deregister、Register、Subscribe方法中調用註冊的Subscriber() Unsubscribe()函數源碼分析

type Subscriber interface {
    Topic() string
    Subscriber() interface{}
    Endpoints() []*registry.Endpoint
    Options() SubscriberOptions
}
// broker.go 
type Subscriber interface {
    Options() SubscribeOptions
    Topic() string
    Unsubscribe() error
}

func (s *rpcServer) Subscribe(sb Subscriber) error {
    sub, ok := sb.(*subscriber)
    if !ok {
        return fmt.Errorf("invalid subscriber: expected *subscriber")
    }
    if len(sub.handlers) == 0 {
        return fmt.Errorf("invalid subscriber: no handler functions")
    }

    if err := validateSubscriber(sb); err != nil {
        return err
    }

    s.Lock()
    defer s.Unlock()
    _, ok = s.subscribers[sub]
    if ok {
        return fmt.Errorf("subscriber %v already exists", s)
    }
    s.subscribers[sub] = nil
    return nil
}

Start

start函數是server中最重要的函數,在service run的時候回調用server的start方法來正式開啓這個服務。
代碼以下
Go Micro Register 源碼分析這篇文章中已經分析過Start函數的中對於服務發現註冊的項管代碼這邊就不重複了。主要來看一下的是監聽端口、處理清酒、和初始化broker,整體來講就是實現了監聽端口/處理請求/初始化Broker。測試

func (s *rpcServer) Start() error {
    registerDebugHandler(s)
    config := s.Options()

    // 開始監聽transport 處理客戶端接收到的請求
    ts, err := config.Transport.Listen(config.Address)
    if err != nil {
        return err
    }

    log.Logf("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr())

    // swap address
    s.Lock()
    addr := s.opts.Address
    s.opts.Address = ts.Addr()
    s.Unlock()

    // 鏈接broker 用於訂閱服務
    if err := config.Broker.Connect(); err != nil {
        return err
    }

    bname := config.Broker.String()

    log.Logf("Broker [%s] Connected to %s", bname, config.Broker.Address())

    // 調用RegisterCheck檢查註冊服務是否可用 可從外部注入函數
    if err = s.opts.RegisterCheck(s.opts.Context); err != nil {
        log.Logf("Server %s-%s register check error: %s", config.Name, config.Id, err)
    } else {
        // 註冊服務
        if err = s.Register(); err != nil {
            log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err)
        }
    }

    exit := make(chan bool)
    // 處理上面監聽的Transport.Listen listerer
    go func() {
        for {
            // 監聽到請求處理請求代碼 在Transport會深刻詳細說明
            err := ts.Accept(s.ServeConn)

            select {
            // check if we're supposed to exit
            case <-exit:
                return
            // check the error and backoff
            default:
                if err != nil {
                    log.Logf("Accept error: %v", err)
                    time.Sleep(time.Second)
                    continue
                }
            }

            // no error just exit
            return
        }
    }()
    // 開啓goroutine 檢查服務是否可用,間隔時間爲設置的RegisterInterval
    go func() {
        t := new(time.Ticker)

        // only process if it exists
        if s.opts.RegisterInterval > time.Duration(0) {
            // new ticker
            t = time.NewTicker(s.opts.RegisterInterval)
        }

        // return error chan
        var ch chan error

    Loop:
        for {
            select {
            // register self on interval
            case <-t.C:
                s.RLock()
                registered := s.registered
                s.RUnlock()
                // 調用Register組件中的RegisterCheck方法 測試服務是否正常
                if err = s.opts.RegisterCheck(s.opts.Context); err != nil && registered {
                    log.Logf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, err)
                    // deregister self in case of error
                    if err := s.Deregister(); err != nil {
                        log.Logf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
                    }
                } else {
                    if err := s.Register(); err != nil {
                        log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err)
                    }
                }
            // wait for exit
            case ch = <-s.exit:
                t.Stop()
                close(exit)
                break Loop
            }
        }

        // deregister self
        if err := s.Deregister(); err != nil {
            log.Logf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
        }

        // wait for requests to finish
        if s.wg != nil {
            s.wg.Wait()
        }

        // close transport listener
        ch <- ts.Close()

        // disconnect the broker
        config.Broker.Disconnect()

        // swap back address
        s.Lock()
        s.opts.Address = addr
        s.Unlock()
    }()

    return nil
}

總結

server是對於底層一些方法的封裝,好比實現服務的開啓、關閉、節點的註冊和訂閱的註冊。一樣級別的封裝在最上面圖能夠看到還有CLient,然而在Client和Server上層還有service,這些會在另外的地方再分析。spa

相關文章
相關標籤/搜索