在第一篇概述文章(Go Micro 整體設計)中已經提到了 Broker 在 Micro 中的作用。我們也知道 Micro 是一個可插拔的分佈式框架,我們可以使用 kafka、rabbitmq、cache、redis、nats 等各種實現,具體可以在 GitHub 上的插件庫 go-plugins 中找到。
我們再來看一下接口:
// Broker is the interface every message broker implementation satisfies.
type Broker interface {
	// Init presumably applies the given options to the broker — confirm
	// against the concrete implementations.
	Init(...Option) error
	// Options presumably returns the broker's current options — confirm.
	Options() Options
	// Address returns the broker's address; the HTTP implementation parses
	// it as "host:port".
	Address() string
	// Connect starts the broker.
	Connect() error
	// Disconnect shuts the broker down.
	Disconnect() error
	// Publish publishes message m on the given topic.
	Publish(topic string, m *Message, opts ...PublishOption) error
	// Subscribe registers handler h as a listener for the given topic.
	Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
	// String presumably returns the implementation's name — confirm.
	String() string
}
Connect 開啓broker
Subscribe 註冊對某個topic的監聽
Publish 發佈某個topic的消息
Disconnect 關閉broker
本篇文章以默認的broker實現httpbroker爲例進行分析,這是一個比較簡單和容易理解的方式,代碼位於 github.com/micro/go-micro/broker。
PubTopic函數爲發佈函數
SubTopic函數爲訂閱函數
下面的例子其實不需要很多註釋,就是簡單地使用一個發佈函數和一個訂閱函數來實現。
package main import ( "fmt" "github.com/micro/go-micro/broker" "log" "time" ) func main() { if err := broker.Connect(); err != nil { fmt.Println("Broker Connect error: %v", err) } go PubTopic("SubServerName") go SubTopic("SubServerName") time.Sleep(time.Second * 10) } func PubTopic(topic string) { tick := time.NewTicker(time.Second) i := 0 for _ = range tick.C { msg := &broker.Message{ Header: map[string]string{ "id": fmt.Sprintf("%d", i), }, Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())), } if err := broker.Publish(topic, msg); err != nil { log.Printf("[pub] failed: %v", err) } else { fmt.Println("[pub] pubbed message:", string(msg.Body)) } i++ } } func SubTopic(topic string) { _, err := broker.Subscribe(topic, func(p broker.Event) error { fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header) return nil }) if err != nil { fmt.Println(err) } }
[pub] pubbed message: 0: 2019-07-15 15:48:00.377247298 +0800 CST m=+1.005764860 [sub] received message: 0: 2019-07-15 15:48:00.377247298 +0800 CST m=+1.005764860 header map[id:0] [pub] pubbed message: 1: 2019-07-15 15:48:01.376383294 +0800 CST m=+2.004891632 [sub] received message: 1: 2019-07-15 15:48:01.376383294 +0800 CST m=+2.004891632 header map[id:1] [pub] pubbed message: 2: 2019-07-15 15:48:02.377595797 +0800 CST m=+3.006094892 [sub] received message: 2: 2019-07-15 15:48:02.377595797 +0800 CST m=+3.006094892 header map[id:2] [pub] pubbed message: 3: 2019-07-15 15:48:03.376685455 +0800 CST m=+4.005175327 [sub] received message: 3: 2019-07-15 15:48:03.376685455 +0800 CST m=+4.005175327 header map[id:3] [pub] pubbed message: 4: 2019-07-15 15:48:04.377715895 +0800 CST m=+5.006196526 [sub] received message: 4: 2019-07-15 15:48:04.377715895 +0800 CST m=+5.006196526 header map[id:4]
Connect
- 開啓tcp監聽
- 啓動一個goroutine,按registerInterval間隔對subscriber進行註冊,類似心跳
- 設置服務發現註冊服務
- 設置緩存對象
- 設置running = true
func (h *httpBroker) Connect() error { h.RLock() if h.running { h.RUnlock() return nil } h.RUnlock() h.Lock() defer h.Unlock() var l net.Listener var err error // 建立監聽函數 判斷是否有配置 if h.opts.Secure || h.opts.TLSConfig != nil { config := h.opts.TLSConfig fn := func(addr string) (net.Listener, error) { if config == nil { hosts := []string{addr} // check if its a valid host:port if host, _, err := net.SplitHostPort(addr); err == nil { if len(host) == 0 { hosts = maddr.IPs() } else { hosts = []string{host} } } // generate a certificate cert, err := mls.Certificate(hosts...) if err != nil { return nil, err } config = &tls.Config{Certificates: []tls.Certificate{cert}} } return tls.Listen("tcp", addr, config) } l, err = mnet.Listen(h.address, fn) } else { fn := func(addr string) (net.Listener, error) { return net.Listen("tcp", addr) } l, err = mnet.Listen(h.address, fn) } if err != nil { return err } addr := h.address h.address = l.Addr().String() go http.Serve(l, h.mux) go func() { // 根據設置的registerInterval 心跳時間,檢測服務是否存活 h.run(l) h.Lock() h.opts.Addrs = []string{addr} h.address = addr h.Unlock() }() // get registry reg, ok := h.opts.Context.Value(registryKey).(registry.Registry) if !ok { reg = registry.DefaultRegistry } // set cache h.r = cache.New(reg) // set running h.running = true return nil }
Subscribe
- 解析address
- 創建唯一id
- 拼裝服務信息(最後的服務信息如下圖)
- 調用Register(默認的是mdns)註冊服務
- 把service放到subscribers map[string][]*httpSubscriber中
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { options := NewSubscribeOptions(opts...) // parse address for host, port parts := strings.Split(h.Address(), ":") host := strings.Join(parts[:len(parts)-1], ":") port, _ := strconv.Atoi(parts[len(parts)-1]) addr, err := maddr.Extract(host) if err != nil { return nil, err } // create unique id id := h.id + "." + uuid.New().String() var secure bool if h.opts.Secure || h.opts.TLSConfig != nil { secure = true } // register service node := ®istry.Node{ Id: id, Address: fmt.Sprintf("%s:%d", addr, port), Metadata: map[string]string{ "secure": fmt.Sprintf("%t", secure), }, } // check for queue group or broadcast queue version := options.Queue if len(version) == 0 { version = broadcastVersion } service := ®istry.Service{ Name: "topic:" + topic, Version: version, Nodes: []*registry.Node{node}, } // 組裝sub subscriber := &httpSubscriber{ opts: options, hb: h, id: id, topic: topic, fn: handler, svc: service, } // 調用subscribe函數 內部調用register 註冊服務 if err := h.subscribe(subscriber); err != nil { return nil, err } // return the subscriber return subscriber, nil }
Publish
- 從registry獲取topic的消費者節點
- 對消息進行編碼
- 依次把編碼後的消息異步publish到節點(http post)
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { // create the message first m := &Message{ Header: make(map[string]string), Body: msg.Body, } for k, v := range msg.Header { m.Header[k] = v } m.Header[":topic"] = topic // encode the message b, err := h.opts.Codec.Marshal(m) if err != nil { return err } // save the message h.saveMessage(topic, b) // now attempt to get the service h.RLock() s, err := h.r.GetService("topic:" + topic) if err != nil { h.RUnlock() // ignore error return nil } h.RUnlock() pub := func(node *registry.Node, t string, b []byte) error { scheme := "http" // check if secure is added in metadata if node.Metadata["secure"] == "true" { scheme = "https" } vals := url.Values{} vals.Add("id", node.Id) uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultSubPath, vals.Encode()) r, err := h.c.Post(uri, "application/json", bytes.NewReader(b)) if err != nil { return err } // discard response body io.Copy(ioutil.Discard, r.Body) r.Body.Close() return nil } srv := func(s []*registry.Service, b []byte) { for _, service := range s { // only process if we have nodes if len(service.Nodes) == 0 { continue } switch service.Version { // broadcast version means broadcast to all nodes case broadcastVersion: var success bool // publish to all nodes for _, node := range service.Nodes { // publish async if err := pub(node, topic, b); err == nil { success = true } } // save if it failed to publish at least once if !success { h.saveMessage(topic, b) } default: // select node to publish to node := service.Nodes[rand.Int()%len(service.Nodes)] // publish async to one node if err := pub(node, topic, b); err != nil { // if failed save it h.saveMessage(topic, b) } } } } // do the rest async go func() { // get a third of the backlog messages := h.getMessage(topic, 8) delay := (len(messages) > 1) // publish all the messages for _, msg := range messages { // serialize here srv(s, msg) // sending a backlog of messages if delay { 
time.Sleep(time.Millisecond * 100) } } }() return nil }
默認的broker實現是用http,不過一般情況下不會使用http來實現發佈和訂閱。我們可以使用kafka、redis等來實現發佈和訂閱。