micro.newService()中newOptionsnode
func newOptions(opts ...Option) Options { opt := Options{ Auth: auth.DefaultAuth, Broker: broker.DefaultBroker, Cmd: cmd.DefaultCmd, Config: config.DefaultConfig, Client: client.DefaultClient, Server: server.DefaultServer, Store: store.DefaultStore, Registry: registry.DefaultRegistry, Router: router.DefaultRouter, Runtime: runtime.DefaultRuntime, Transport: transport.DefaultTransport, Context: context.Background(), Signal: true, } for _, o := range opts { o(&opt) } return opt }
初始化了一堆基礎設置,先來看看Broker broker.DefaultBroker,
在broker/broker.go中 DefaultBroker Broker = NewBroker()
web
// NewBroker returns a new http broker func NewBroker(opts ...Option) Broker { return newHttpBroker(opts...) } func newHttpBroker(opts ...Option) Broker { options := Options{ Codec: json.Marshaler{}, Context: context.TODO(), Registry: registry.DefaultRegistry, } for _, o := range opts { o(&options) } // set address addr := DefaultAddress if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 { addr = options.Addrs[0] } h := &httpBroker{ id: uuid.New().String(), address: addr, opts: options, r: options.Registry, c: &http.Client{Transport: newTransport(options.TLSConfig)}, subscribers: make(map[string][]*httpSubscriber), exit: make(chan chan error), mux: http.NewServeMux(), inbox: make(map[string][][]byte), } // specify the message handler h.mux.Handle(DefaultPath, h) // get optional handlers if h.opts.Context != nil { handlers, ok := h.opts.Context.Value("http_handlers").(map[string]http.Handler) if ok { for pattern, handler := range handlers { h.mux.Handle(pattern, handler) } } } return h }
這裏作了幾件事json
h.mux.Handle(DefaultPath, h)
h就是httpBroker,在httpBroker中實現了ServeHTTP(),則全部請求都經過他來處理,即全部訂閱的消息處理都是經過httpBroker.ServeHTTP()來處理的segmentfault
下面看看example/broker, 一個broker的示例數組
var ( topic = "go.micro.topic.foo" ) func pub() { tick := time.NewTicker(time.Second) i := 0 for _ = range tick.C { msg := &broker.Message{ Header: map[string]string{ "id": fmt.Sprintf("%d", i), }, Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())), } if err := broker.Publish(topic, msg); err != nil { log.Printf("[pub] failed: %v", err) } else { fmt.Println("[pub] pubbed message:", string(msg.Body)) } i++ } } func sub() { _, err := broker.Subscribe(topic, func(p broker.Event) error { fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header) return nil }) if err != nil { fmt.Println(err) } } func main() { cmd.Init() if err := broker.Init(); err != nil { log.Fatalf("Broker Init error: %v", err) } if err := broker.Connect(); err != nil { log.Fatalf("Broker Connect error: %v", err) } go pub() go sub() <-time.After(time.Second * 10) }
cmd.init()請見micro cmd篇,有詳細介紹
cmd.opts.Broker默認使用的是上文分析的httpBroker
先看broker.Init()
app
func (h *httpBroker) Init(opts ...Option) error { h.RLock() if h.running { h.RUnlock() return errors.New("cannot init while connected") } h.RUnlock() h.Lock() defer h.Unlock() for _, o := range opts { o(&h.opts) } if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 { h.address = h.opts.Addrs[0] } if len(h.id) == 0 { h.id = "go.micro.http.broker-" + uuid.New().String() } // get registry reg := h.opts.Registry if reg == nil { reg = registry.DefaultRegistry } // get cache if rc, ok := h.r.(cache.Cache); ok { rc.Stop() } // set registry h.r = cache.New(reg) // reconfigure tls config if c := h.opts.TLSConfig; c != nil { h.c = &http.Client{ Transport: newTransport(c), } } return nil }
這裏作了如下幾件事情異步
上讀寫鎖,在進行後面操做async
再看broker.Connect()
tcp
func (h *httpBroker) Connect() error { h.RLock() if h.running { h.RUnlock() return nil } h.RUnlock() h.Lock() defer h.Unlock() var l net.Listener var err error if h.opts.Secure || h.opts.TLSConfig != nil { config := h.opts.TLSConfig fn := func(addr string) (net.Listener, error) { if config == nil { hosts := []string{addr} // check if its a valid host:port if host, _, err := net.SplitHostPort(addr); err == nil { if len(host) == 0 { hosts = maddr.IPs() } else { hosts = []string{host} } } // generate a certificate cert, err := mls.Certificate(hosts...) if err != nil { return nil, err } config = &tls.Config{Certificates: []tls.Certificate{cert}} } return tls.Listen("tcp", addr, config) } l, err = mnet.Listen(h.address, fn) } else { fn := func(addr string) (net.Listener, error) { return net.Listen("tcp", addr) } l, err = mnet.Listen(h.address, fn) } if err != nil { return err } addr := h.address h.address = l.Addr().String() go http.Serve(l, h.mux) go func() { h.run(l) h.Lock() h.opts.Addrs = []string{addr} h.address = addr h.Unlock() }() // get registry reg := h.opts.Registry if reg == nil { reg = registry.DefaultRegistry } // set cache h.r = cache.New(reg) // set running h.running = true return nil }
這裏作了如下幾件事情函數
上讀寫鎖,在進行後面操做
newHttpBroker()
中指定的handle函數ServeHTTP()
( 標準庫http提供了Handler接口,用於開發者實現本身的handler。只要實現接口的ServeHTTP方法便可。)看看剛纔的run()
作了什麼
func (h *httpBroker) run(l net.Listener) { t := time.NewTicker(registerInterval) defer t.Stop() for { select { // heartbeat for each subscriber case <-t.C: h.RLock() for _, subs := range h.subscribers { for _, sub := range subs { _ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL)) } } h.RUnlock() // received exit signal case ch := <-h.exit: ch <- l.Close() h.RLock() for _, subs := range h.subscribers { for _, sub := range subs { _ = h.r.Deregister(sub.svc) } } h.RUnlock() return } } }
這裏作了如下幾件事情
Deregister()
註銷subscribers中的訂閱接下來看看怎麼訂閱消息broker.Subscribe()
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { var err error var host, port string options := NewSubscribeOptions(opts...) // parse address for host, port host, port, err = net.SplitHostPort(h.Address()) if err != nil { return nil, err } addr, err := maddr.Extract(host) if err != nil { return nil, err } var secure bool if h.opts.Secure || h.opts.TLSConfig != nil { secure = true } // register service node := ®istry.Node{ Id: topic + "-" + h.id, Address: mnet.HostPort(addr, port), Metadata: map[string]string{ "secure": fmt.Sprintf("%t", secure), "broker": "http", "topic": topic, }, } // check for queue group or broadcast queue version := options.Queue if len(version) == 0 { version = broadcastVersion } service := ®istry.Service{ Name: serviceName, Version: version, Nodes: []*registry.Node{node}, } // generate subscriber subscriber := &httpSubscriber{ opts: options, hb: h, id: node.Id, topic: topic, fn: handler, svc: service, } // subscribe now if err := h.subscribe(subscriber); err != nil { return nil, err } // return the subscriber return subscriber, nil } func (h *httpBroker) subscribe(s *httpSubscriber) error { h.Lock() defer h.Unlock() if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil { return err } h.subscribers[s.topic] = append(h.subscribers[s.topic], s) return nil }
這裏作了如下幾件事情
調用subscribe(),返回subscriber
獲取消息經過調用Subscribe時傳遞的處理函數,示例以下
broker.Subscribe(topic, func(p broker.Event) error { fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header) return nil })
Event及httpEvent定義
// Event is given to a subscription handler for processing type Event interface { Topic() string Message() *Message Ack() error Error() error } type httpEvent struct { m *Message t string err error } func (h *httpEvent) Ack() error { return nil } func (h *httpEvent) Error() error { return h.err } func (h *httpEvent) Message() *Message { return h.m } func (h *httpEvent) Topic() string { return h.t }
能夠看到httpEvent實現了Event,這樣p.Message()就能夠獲得消息了
獲取消息在ServeHTTP()中,收到消息,調用傳入的fn處理便可
下面再看發佈消息,只須要定義broker.Message
,再調用broker.Publish()
便可,示例以下
msg := &broker.Message{ Header: map[string]string{ "id": fmt.Sprintf("%d", i), }, Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())), } if err := broker.Publish(topic, msg); err != nil { log.Printf("[pub] failed: %v", err) } else { fmt.Println("[pub] pubbed message:", string(msg.Body)) }
看看broker.Publish()
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { // create the message first m := &Message{ Header: make(map[string]string), Body: msg.Body, } for k, v := range msg.Header { m.Header[k] = v } m.Header["Micro-Topic"] = topic // encode the message b, err := h.opts.Codec.Marshal(m) if err != nil { return err } // save the message h.saveMessage(topic, b) // now attempt to get the service h.RLock() s, err := h.r.GetService(serviceName) if err != nil { h.RUnlock() return err } h.RUnlock() pub := func(node *registry.Node, t string, b []byte) error { scheme := "http" // check if secure is added in metadata if node.Metadata["secure"] == "true" { scheme = "https" } vals := url.Values{} vals.Add("id", node.Id) uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultPath, vals.Encode()) r, err := h.c.Post(uri, "application/json", bytes.NewReader(b)) if err != nil { return err } // discard response body io.Copy(ioutil.Discard, r.Body) r.Body.Close() return nil } srv := func(s []*registry.Service, b []byte) { for _, service := range s { var nodes []*registry.Node for _, node := range service.Nodes { // only use nodes tagged with broker http if node.Metadata["broker"] != "http" { continue } // look for nodes for the topic if node.Metadata["topic"] != topic { continue } nodes = append(nodes, node) } // only process if we have nodes if len(nodes) == 0 { continue } switch service.Version { // broadcast version means broadcast to all nodes case broadcastVersion: var success bool // publish to all nodes for _, node := range nodes { // publish async if err := pub(node, topic, b); err == nil { success = true } } // save if it failed to publish at least once if !success { h.saveMessage(topic, b) } default: // select node to publish to node := nodes[rand.Int()%len(nodes)] // publish async to one node if err := pub(node, topic, b); err != nil { // if failed save it h.saveMessage(topic, b) } } } } // do the rest async go func() { // get a third of the backlog messages := h.getMessage(topic, 8) delay := (len(messages) > 1) // publish all the messages for _, msg := range messages { // serialize here srv(s, msg) // sending a backlog of messages if delay { time.Sleep(time.Millisecond * 100) } } }() return nil }
這裏作了如下幾件事情
Micro-Topic
,值是topic調用saveMessage()保存消息
數組大於64條則只取前64條重賦值給inbox,【`震驚,後面直接丟了!!!`】
httpBroker.r.GetService(serviceName)
獲取service,serviceName默認"micro.http.broker"建立處理函數pub()
建立處理函數srv()
pub()
發送,失敗則從新調用saveMessage()
建立協程
調用getMessage()
srv()
處理每條消息,若是剛纔取出的消息大於1條,每次發送間隔Millisecond*100至此,整個broker的流程比較清晰了。
總結:
寫到這裏又要推薦你們看micro in action了
Micro In Action(四):Pub/Sub
Micro In Action(五):Message Broker
go micro 分析系列文章
go micro server 啓動分析
go micro client
go micro broker
go micro cmd
go micro config
go micro store
go micro registry
go micro router
go micro runtime
go micro transport
go micro web
go micro registry 插件consul
go micro plugin
go micro jwt 網關鑑權
go micro 鏈路追蹤
go micro 熔斷與限流
go micro wrapper 中間件
go micro metrics 接入Prometheus、Grafana