本章節闡述micro消息訂閱和發佈相關內容git
微服務之間須要經過broker來傳遞消息,go-micro支持http/nats/memory三種broker,其中http是默認的broker。github
同時,go-micro以強大的插件形式,提供以下幾種常見的broker。golang
$ls
gocloud/ googlepubsub/ grpc/ kafka/ mqtt/ nats/ nsq/ proxy/ rabbitmq/ redis/ snssqs/ sqs/ stan/ stomp/
複製代碼
HTTP Broker 是基於HTTP的異步broker,源代碼在github.com\micro\go-micro@v1.9.1\broker\broker.go
中,默認DefaultBroker爲httpweb
var (
DefaultBroker Broker = newHttpBroker()
)
複製代碼
httpbroker實際上就是一個結構體redis
type httpBroker struct {
id string //微服務ID
address string //主機地址
opts Options //一些配置
mux *http.ServeMux //經過這個監聽其餘端發送的http請求
c *http.Client //經過這個發送請求到其餘端
r registry.Registry
sync.RWMutex
subscribers map[string][]*httpSubscriber //訂閱
running bool
exit chan chan error
// offline message inbox
mtx sync.RWMutex
inbox map[string][][]byte //數據緩存
}
複製代碼
經過http.Client
發送請求,經過http.ServeMux
實現請求監聽,經過inbox
存儲數據shell
redis初始化代碼以下json
//main.go
//初始化URL格式redis://密碼@主機:端口/
b := redis.NewBroker(
broker.Addrs("redis://user:secret@localhost:6379/"),
)
//初始化
b.Init()
//鏈接
b.Connect()
// 新建service
service := grpc.NewService(
micro.Name("go.micro.web.config"),
micro.Version("latest"),
micro.Broker(b),
)
//初始化service
service.Init()
//啓動,運行,監聽
service.Run()
複製代碼
啓動應用程序須要指定broker爲redisapi
go run main.go --broker=redis
複製代碼
初始化過程以下緩存
//main.go
import (
"github.com/micro/go-plugins/broker/grpc"
)
// 創建鏈接
b := grpc.NewBroker()
b.Init()
b.Connect()
// 訂閱事件
sub, _ := b.Subscribe("events")
defer sub.Unsubscribe()
// 發佈事件
b.Publish("events", &broker.Message{
Headers: map[string]string{"type": "event"},
Body: []byte(`an event`),
})
複製代碼
啓動應用程序須要指定broker爲grpcbash
go run main.go --broker=grpc
複製代碼
初始化過程以下
//main.go
import (
"github.com/micro/go-plugins/broker/grpc"
)
b := rabbitmq.NewBroker(
broker.Addrs("amqp://用戶名:密碼@主機host:端口port"),
)
b.Init()
b.Connect()
複製代碼
啓動應用程序須要指定broker爲rabbitmq
go run main.go plugin.go --broker=rabbitmq
複製代碼
初始化過程以下
//main.go
import (
"github.com/micro/go-micro"
"github.com/micro/go-plugins/broker/mqtt"
)
func main() {
service := micro.NewService(
micro.Name("my.service"),
micro.Broker(mqtt.NewBroker()),
)
//...
}
複製代碼
啓動應用程序須要指定broker爲mqtt
go run main.go plugin.go --broker=mqtt
複製代碼
其餘能夠閱讀代碼
$GOPATH/src/github.com/micro/go-plugins/broker
複製代碼
消息訂閱主要API接口以下,第一個參數標識消息主題,第二個參數表示服務實例。
// Register Struct as Subscriber
micro.RegisterSubscriber("go.micro.srv.testsrv", service.Server(), new(subscriber.Testsrv))
// Register Function as Subscriber
micro.RegisterSubscriber("go.micro.srv.testsrv", service.Server(), subscriber.Handler)
複製代碼
重點注意第三個參數,第三個參數是處理函數,能夠是函數,也能夠是實現了
func Handler(ctx context.Context, msg *testsrv.Message) error
方法的結構體,micro內部會根據參數類型自動適配。結構體中能夠實現多個func Handler(ctx context.Context, msg *testsrv.Message) error
類型方法
Broker提供以下接口
type Broker interface {
Init(...Option) error
Options() Options
Address() string
Connect() error
Disconnect() error
Publish(topic string, m *Message, opts ...PublishOption) error
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
String() string
}
複製代碼
Subscribe 訂閱事件,topic表明主題,h事件處理函數
Publish 發佈事件
在上述涉及處處理函數handler,具體含義以下
type Handler func(Event) error // Event is given to a subscription handler for processing type Event interface {
Topic() string
Message() *Message
Ack() error
}
type Message struct {
Header map[string]string
Body []byte
}
複製代碼
舉例以下
// 創建鏈接
b := grpc.NewBroker()
b.Init()
b.Connect()
// 訂閱事件
sub, _ := b.Subscribe("events")
defer sub.Unsubscribe()
// 發佈事件
b.Publish("events", &broker.Message{
Headers: map[string]string{"type": "event"},
Body: []byte(`an event`),
})
複製代碼
舉例以下
micro publish "go.micro.web.config" "hello"
複製代碼
下載代碼broker.zip
解壓到techidea8.com/microapp/broker
下運行,效果圖忑
micro publish go.micro.srv.broker "{\"say\":\"這是測試消息\"}"
複製代碼
關注公衆號betaidea
回覆micro-broker
便可得到