本章節闡述micro消息訂閱和發佈相關內容git
微服務之間須要經過broker來傳遞消息,go-micro支持http/nats/memory三種broker,其中http是默認的broker。github
同時,go-micro以強大的插件形式,提供以下幾種常見的broker。golang
$ls gocloud/ googlepubsub/ grpc/ kafka/ mqtt/ nats/ nsq/ proxy/ rabbitmq/ redis/ snssqs/ sqs/ stan/ stomp/
HTTP Broker 是基於HTTP的異步broker,源代碼在github.com\micro\go-micro@v1.9.1\broker\broker.go
中,默認DefaultBroker爲httpweb
var ( DefaultBroker Broker = newHttpBroker() )
httpbroker實際上就是一個結構體redis
type httpBroker struct { id string //微服務ID address string //主機地址 opts Options //一些配置 mux *http.ServeMux //經過這個監聽其餘端發送的http請求 c *http.Client //經過這個發送請求到其餘端 r registry.Registry sync.RWMutex subscribers map[string][]*httpSubscriber //訂閱 running bool exit chan chan error // offline message inbox mtx sync.RWMutex inbox map[string][][]byte //數據緩存 }
經過http.Client
發送請求,經過http.ServeMux
實現請求監聽,經過inbox
存儲數據shell
redis初始化代碼以下json
//main.go //初始化URL格式redis://密碼@主機:端口/ b := redis.NewBroker( broker.Addrs("redis://user:secret@localhost:6379/"), ) //初始化 b.Init() //鏈接 b.Connect() // 新建service service := grpc.NewService( micro.Name("go.micro.web.config"), micro.Version("latest"), micro.Broker(b), ) //初始化service service.Init() //啓動,運行,監聽 service.Run()
啓動應用程序須要指定broker爲redissegmentfault
go run main.go --broker=redis
初始化過程以下api
//main.go import ( "github.com/micro/go-plugins/broker/grpc" ) // 創建鏈接 b := grpc.NewBroker() b.Init() b.Connect() // 訂閱事件 sub, _ := b.Subscribe("events") defer sub.Unsubscribe() // 發佈事件 b.Publish("events", &broker.Message{ Headers: map[string]string{"type": "event"}, Body: []byte(`an event`), })
啓動應用程序須要指定broker爲grpc緩存
go run main.go --broker=grpc
初始化過程以下
//main.go import ( "github.com/micro/go-plugins/broker/grpc" ) b := rabbitmq.NewBroker( broker.Addrs("amqp://用戶名:密碼@主機host:端口port"), ) b.Init() b.Connect()
啓動應用程序須要指定broker爲rabbitmq
go run main.go plugin.go --broker=rabbitmq
初始化過程以下
//main.go import ( "github.com/micro/go-micro" "github.com/micro/go-plugins/broker/mqtt" ) func main() { service := micro.NewService( micro.Name("my.service"), micro.Broker(mqtt.NewBroker()), ) //... }
啓動應用程序須要指定broker爲mqtt
go run main.go plugin.go --broker=mqtt
其餘能夠閱讀代碼
$GOPATH/src/github.com/micro/go-plugins/broker
消息訂閱主要API接口以下,第一個參數標識消息主題,第二個參數表示服務實例。
// Register Struct as Subscriber micro.RegisterSubscriber("go.micro.srv.testsrv", service.Server(), new(subscriber.Testsrv)) // Register Function as Subscriber micro.RegisterSubscriber("go.micro.srv.testsrv", service.Server(), subscriber.Handler)
重點注意第三個參數,第三個參數是處理函數,能夠是函數,也能夠是實現了
func Handler(ctx context.Context, msg *testsrv.Message) error
方法的結構體,micro內部會根據參數類型自動適配。結構體中能夠實現多個func Handler(ctx context.Context, msg *testsrv.Message) error
類型方法
Broker提供以下接口
type Broker interface { Init(...Option) error Options() Options Address() string Connect() error Disconnect() error Publish(topic string, m *Message, opts ...PublishOption) error Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) String() string }
在上述涉及處處理函數handler,具體含義以下
type Handler func(Event) error // Event is given to a subscription handler for processing type Event interface { Topic() string Message() *Message Ack() error } type Message struct { Header map[string]string Body []byte }
舉例以下
// 創建鏈接 b := grpc.NewBroker() b.Init() b.Connect() // 訂閱事件 sub, _ := b.Subscribe("events") defer sub.Unsubscribe() // 發佈事件 b.Publish("events", &broker.Message{ Headers: map[string]string{"type": "event"}, Body: []byte(`an event`), })
舉例以下
micro publish "go.micro.web.config" "hello"
下載代碼broker.zip
解壓到techidea8.com/microapp/broker
下運行,效果圖忑
micro publish go.micro.srv.broker "{\"say\":\"這是測試消息\"}"
關注公衆號回覆micro-broker
便可得到
推薦閱讀
掃微信二維碼實現網站登錄提供體驗地址和源代碼
開源項目golang go語言後臺管理框架restgo-admin
支持手勢觸摸,可左右滑動的日曆插件
你必須知道的18個互聯網業務模型