go micro broker

micro.newService()中newOptionsnode

func newOptions(opts ...Option) Options {
    opt := Options{
        Auth:      auth.DefaultAuth,
        Broker:    broker.DefaultBroker,
        Cmd:       cmd.DefaultCmd,
        Config:    config.DefaultConfig,
        Client:    client.DefaultClient,
        Server:    server.DefaultServer,
        Store:     store.DefaultStore,
        Registry:  registry.DefaultRegistry,
        Router:    router.DefaultRouter,
        Runtime:   runtime.DefaultRuntime,
        Transport: transport.DefaultTransport,
        Context:   context.Background(),
        Signal:    true,
    }

    for _, o := range opts {
        o(&opt)
    }

    return opt
}

初始化了一堆基礎設置,先來看看Broker broker.DefaultBroker,
在broker/broker.go中 DefaultBroker Broker = NewBroker()web

// NewBroker returns a new http broker
func NewBroker(opts ...Option) Broker {
    return newHttpBroker(opts...)
}

func newHttpBroker(opts ...Option) Broker {
    options := Options{
        Codec:    json.Marshaler{},
        Context:  context.TODO(),
        Registry: registry.DefaultRegistry,
    }

    for _, o := range opts {
        o(&options)
    }

    // set address
    addr := DefaultAddress

    if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 {
        addr = options.Addrs[0]
    }

    h := &httpBroker{
        id:          uuid.New().String(),
        address:     addr,
        opts:        options,
        r:           options.Registry,
        c:           &http.Client{Transport: newTransport(options.TLSConfig)},
        subscribers: make(map[string][]*httpSubscriber),
        exit:        make(chan chan error),
        mux:         http.NewServeMux(),
        inbox:       make(map[string][][]byte),
    }

    // specify the message handler
    h.mux.Handle(DefaultPath, h)

    // get optional handlers
    if h.opts.Context != nil {
        handlers, ok := h.opts.Context.Value("http_handlers").(map[string]http.Handler)
        if ok {
            for pattern, handler := range handlers {
                h.mux.Handle(pattern, handler)
            }
        }
    }

    return h
}

這裏作了幾件事json

  1. 初始化options,設置Codec爲json,設置ctx,Registry
  2. 初始化httpBroker,設置http.Client時調用newTransport()設置代理,同時啓用http2,最後指定message handler

h.mux.Handle(DefaultPath, h)h就是httpBroker,在httpBroker中實現了ServeHTTP(),則全部請求都經過他來處理,即全部訂閱的消息處理都是經過httpBroker.ServeHTTP()來處理的segmentfault

  1. 若是ctx不爲空,就取http_handlers數組,依次註冊http handle

下面看看example/broker, 一個broker的示例數組

var (
    topic = "go.micro.topic.foo"
)

func pub() {
    tick := time.NewTicker(time.Second)
    i := 0
    for _ = range tick.C {
        msg := &broker.Message{
            Header: map[string]string{
                "id": fmt.Sprintf("%d", i),
            },
            Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
        }
        if err := broker.Publish(topic, msg); err != nil {
            log.Printf("[pub] failed: %v", err)
        } else {
            fmt.Println("[pub] pubbed message:", string(msg.Body))
        }
        i++
    }
}

func sub() {
    _, err := broker.Subscribe(topic, func(p broker.Event) error {
        fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
        return nil
    })
    if err != nil {
        fmt.Println(err)
    }
}

func main() {
    cmd.Init()

    if err := broker.Init(); err != nil {
        log.Fatalf("Broker Init error: %v", err)
    }
    if err := broker.Connect(); err != nil {
        log.Fatalf("Broker Connect error: %v", err)
    }

    go pub()
    go sub()

    <-time.After(time.Second * 10)
}

cmd.init()請見micro cmd篇,有詳細介紹
cmd.opts.Broker默認使用的是上文分析的httpBroker
先看broker.Init()app

func (h *httpBroker) Init(opts ...Option) error {
    h.RLock()
    if h.running {
        h.RUnlock()
        return errors.New("cannot init while connected")
    }
    h.RUnlock()

    h.Lock()
    defer h.Unlock()

    for _, o := range opts {
        o(&h.opts)
    }

    if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {
        h.address = h.opts.Addrs[0]
    }

    if len(h.id) == 0 {
        h.id = "go.micro.http.broker-" + uuid.New().String()
    }

    // get registry
    reg := h.opts.Registry
    if reg == nil {
        reg = registry.DefaultRegistry
    }

    // get cache
    if rc, ok := h.r.(cache.Cache); ok {
        rc.Stop()
    }

    // set registry
    h.r = cache.New(reg)

    // reconfigure tls config
    if c := h.opts.TLSConfig; c != nil {
        h.c = &http.Client{
            Transport: newTransport(c),
        }
    }

    return nil
}

這裏作了如下幾件事情異步

  1. 上讀鎖,檢查是否正在運行
  2. 上讀寫鎖,在進行後面操做async

    1. 設置opt
    2. 設置address,id
    3. 獲取Registry,cache,設置registry
    4. 設置http.Client中Transport的tls

再看broker.Connect()tcp

func (h *httpBroker) Connect() error {
    h.RLock()
    if h.running {
        h.RUnlock()
        return nil
    }
    h.RUnlock()

    h.Lock()
    defer h.Unlock()

    var l net.Listener
    var err error

    if h.opts.Secure || h.opts.TLSConfig != nil {
        config := h.opts.TLSConfig

        fn := func(addr string) (net.Listener, error) {
            if config == nil {
                hosts := []string{addr}

                // check if its a valid host:port
                if host, _, err := net.SplitHostPort(addr); err == nil {
                    if len(host) == 0 {
                        hosts = maddr.IPs()
                    } else {
                        hosts = []string{host}
                    }
                }

                // generate a certificate
                cert, err := mls.Certificate(hosts...)
                if err != nil {
                    return nil, err
                }
                config = &tls.Config{Certificates: []tls.Certificate{cert}}
            }
            return tls.Listen("tcp", addr, config)
        }

        l, err = mnet.Listen(h.address, fn)
    } else {
        fn := func(addr string) (net.Listener, error) {
            return net.Listen("tcp", addr)
        }

        l, err = mnet.Listen(h.address, fn)
    }

    if err != nil {
        return err
    }

    addr := h.address
    h.address = l.Addr().String()

    go http.Serve(l, h.mux)
    go func() {
        h.run(l)
        h.Lock()
        h.opts.Addrs = []string{addr}
        h.address = addr
        h.Unlock()
    }()

    // get registry
    reg := h.opts.Registry
    if reg == nil {
        reg = registry.DefaultRegistry
    }
    // set cache
    h.r = cache.New(reg)

    // set running
    h.running = true
    return nil
}

這裏作了如下幾件事情函數

  1. 上讀鎖,檢查是否正在運行
  2. 上讀寫鎖,在進行後面操做

    1. 若是有Secure和TLSConfig,作一些tls的設置,沒有則直接返回默認net.Listener
    2. 開一個協程運行http.Serve,處理請求是newHttpBroker()中指定的handle函數ServeHTTP()( 標準庫http提供了Handler接口,用於開發者實現本身的handler。只要實現接口的ServeHTTP方法便可。)
    3. 開一個協程運行run(),後面看,設置地址
    4. 獲取Registry,設置cache
    5. 標記httpBroker正在運行

看看剛纔的run()作了什麼

func (h *httpBroker) run(l net.Listener) {
    t := time.NewTicker(registerInterval)
    defer t.Stop()

    for {
        select {
        // heartbeat for each subscriber
        case <-t.C:
            h.RLock()
            for _, subs := range h.subscribers {
                for _, sub := range subs {
                    _ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
                }
            }
            h.RUnlock()
        // received exit signal
        case ch := <-h.exit:
            ch <- l.Close()
            h.RLock()
            for _, subs := range h.subscribers {
                for _, sub := range subs {
                    _ = h.r.Deregister(sub.svc)
                }
            }
            h.RUnlock()
            return
        }
    }
}

這裏作了如下幾件事情

  1. 設置默認30秒一次的時間觸發器,每一個週期都在服務發現中心依次註冊subscribers中的訂閱
  2. 接受退出消息,依次調用Deregister()註銷subscribers中的訂閱

接下來看看怎麼訂閱消息broker.Subscribe()

func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
    var err error
    var host, port string
    options := NewSubscribeOptions(opts...)

    // parse address for host, port
    host, port, err = net.SplitHostPort(h.Address())
    if err != nil {
        return nil, err
    }

    addr, err := maddr.Extract(host)
    if err != nil {
        return nil, err
    }

    var secure bool

    if h.opts.Secure || h.opts.TLSConfig != nil {
        secure = true
    }

    // register service
    node := &registry.Node{
        Id:      topic + "-" + h.id,
        Address: mnet.HostPort(addr, port),
        Metadata: map[string]string{
            "secure": fmt.Sprintf("%t", secure),
            "broker": "http",
            "topic":  topic,
        },
    }

    // check for queue group or broadcast queue
    version := options.Queue
    if len(version) == 0 {
        version = broadcastVersion
    }

    service := &registry.Service{
        Name:    serviceName,
        Version: version,
        Nodes:   []*registry.Node{node},
    }

    // generate subscriber
    subscriber := &httpSubscriber{
        opts:  options,
        hb:    h,
        id:    node.Id,
        topic: topic,
        fn:    handler,
        svc:   service,
    }

    // subscribe now
    if err := h.subscribe(subscriber); err != nil {
        return nil, err
    }

    // return the subscriber
    return subscriber, nil
}

func (h *httpBroker) subscribe(s *httpSubscriber) error {
    h.Lock()
    defer h.Unlock()

    if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {
        return err
    }

    h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
    return nil
}

這裏作了如下幾件事情

  1. 初始化SubscribeOptions
  2. 解析Address中host, port,並驗證ip
  3. 初始化registry.Node{}
  4. 檢查options.Queue,設置registry.Service{},
  5. 生成訂閱信息結構體httpSubscriber{}
  6. 調用subscribe(),返回subscriber

    1. 開讀寫鎖,把訂閱服務(Connect()中開了http.Serve())註冊到服務發現(默認mdns),發消息的時候經過服務發現找node,就是往這些註冊的服務中發了
    2. 訂閱頻道記錄到h.subscribers中

獲取消息經過調用Subscribe時傳遞的處理函數,示例以下

broker.Subscribe(topic, func(p broker.Event) error {
        fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
        return nil
    })

Event及httpEvent定義

// Event is given to a subscription handler for processing
type Event interface {
    Topic() string
    Message() *Message
    Ack() error
    Error() error
}

type httpEvent struct {
    m   *Message
    t   string
    err error
}

func (h *httpEvent) Ack() error {
    return nil
}

func (h *httpEvent) Error() error {
    return h.err
}

func (h *httpEvent) Message() *Message {
    return h.m
}

func (h *httpEvent) Topic() string {
    return h.t
}

能夠看到httpEvent實現了Event,這樣p.Message()就能夠獲得消息了
獲取消息在ServeHTTP()中,收到消息,調用傳入的fn處理便可

下面再看發佈消息,只須要定義broker.Message,再調用broker.Publish()便可,示例以下

msg := &broker.Message{
            Header: map[string]string{
                "id": fmt.Sprintf("%d", i),
            },
            Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
        }
        if err := broker.Publish(topic, msg); err != nil {
            log.Printf("[pub] failed: %v", err)
        } else {
            fmt.Println("[pub] pubbed message:", string(msg.Body))
        }

看看broker.Publish()

func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
    // create the message first
    m := &Message{
        Header: make(map[string]string),
        Body:   msg.Body,
    }

    for k, v := range msg.Header {
        m.Header[k] = v
    }

    m.Header["Micro-Topic"] = topic

    // encode the message
    b, err := h.opts.Codec.Marshal(m)
    if err != nil {
        return err
    }

    // save the message
    h.saveMessage(topic, b)

    // now attempt to get the service
    h.RLock()
    s, err := h.r.GetService(serviceName)
    if err != nil {
        h.RUnlock()
        return err
    }
    h.RUnlock()

    pub := func(node *registry.Node, t string, b []byte) error {
        scheme := "http"

        // check if secure is added in metadata
        if node.Metadata["secure"] == "true" {
            scheme = "https"
        }

        vals := url.Values{}
        vals.Add("id", node.Id)

        uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultPath, vals.Encode())
        r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
        if err != nil {
            return err
        }

        // discard response body
        io.Copy(ioutil.Discard, r.Body)
        r.Body.Close()
        return nil
    }

    srv := func(s []*registry.Service, b []byte) {
        for _, service := range s {
            var nodes []*registry.Node

            for _, node := range service.Nodes {
                // only use nodes tagged with broker http
                if node.Metadata["broker"] != "http" {
                    continue
                }

                // look for nodes for the topic
                if node.Metadata["topic"] != topic {
                    continue
                }

                nodes = append(nodes, node)
            }

            // only process if we have nodes
            if len(nodes) == 0 {
                continue
            }

            switch service.Version {
            // broadcast version means broadcast to all nodes
            case broadcastVersion:
                var success bool

                // publish to all nodes
                for _, node := range nodes {
                    // publish async
                    if err := pub(node, topic, b); err == nil {
                        success = true
                    }
                }

                // save if it failed to publish at least once
                if !success {
                    h.saveMessage(topic, b)
                }
            default:
                // select node to publish to
                node := nodes[rand.Int()%len(nodes)]

                // publish async to one node
                if err := pub(node, topic, b); err != nil {
                    // if failed save it
                    h.saveMessage(topic, b)
                }
            }
        }
    }

    // do the rest async
    go func() {
        // get a third of the backlog
        messages := h.getMessage(topic, 8)
        delay := (len(messages) > 1)

        // publish all the messages
        for _, msg := range messages {
            // serialize here
            srv(s, msg)

            // sending a backlog of messages
            if delay {
                time.Sleep(time.Millisecond * 100)
            }
        }
    }()

    return nil
}

這裏作了如下幾件事情

  1. 建立消息,header中添加一個Micro-Topic,值是topic
  2. 編碼消息,默認json
  3. 調用saveMessage()保存消息

    1. httpBroker.mtx加鎖
    2. 在httpBroker.inbox中添加這條消息,若是inbox
數組大於64條則只取前64條重賦值給inbox,【`震驚,後面直接丟了!!!`】
  1. 調用httpBroker.r.GetService(serviceName)獲取service,serviceName默認"micro.http.broker"
  2. 建立處理函數pub()

    1. 肯定scheme[http或https]
    2. url參數中增長id,值爲node.id
    3. 拼接uri,scheme://(node.Address)(DefaultPath)(vals.Encode())
    4. post發起請求,返回結果放入ioutil.Discard(就是丟掉了),關閉返回body
  3. 建立處理函數srv()

    1. 收集全部可用的registry.Node
    2. 判斷消息廣播仍是指定了Queue(指定了Queue隨機選一個node),根據狀況異步調用pub()發送,失敗則從新調用saveMessage()
  4. 建立協程

    1. 調用getMessage()

      1. 開讀寫鎖h.mtx.Lock()
      2. 從inbox取出指定條數的消息
    2. 依次用第6步的srv()處理每條消息,若是剛纔取出的消息大於1條,每次發送間隔Millisecond*100

至此,整個broker的流程比較清晰了。

總結:

  1. 默認的http broker,訂閱就是開了一個http服務手消息,發佈就是從服務發現拿到節點信息,往節點post數據。
  2. 實際使用中一般能夠指定etcd,consul這樣的服務發現。
  3. 消息inbox最多隻放64條,儘可能避免消息堆積,消息最好寫日誌

寫到這裏又要推薦你們看micro in action了
Micro In Action(四):Pub/Sub
Micro In Action(五):Message Broker

go micro 分析系列文章
go micro server 啓動分析
go micro client
go micro broker
go micro cmd
go micro config
go micro store
go micro registry
go micro router
go micro runtime
go micro transport
go micro web
go micro registry 插件consul
go micro plugin
go micro jwt 網關鑑權
go micro 鏈路追蹤
go micro 熔斷與限流
go micro wrapper 中間件
go micro metrics 接入Prometheus、Grafana

相關文章
相關標籤/搜索