golang微服務框架go-micro 入門筆記2.3 micro工具之消息訂閱和發佈

本章節闡述micro消息訂閱和發佈相關內容git

閱讀本文前你可能須要進行以下知識儲備

broker代理

微服務之間須要經過broker來傳遞消息,go-micro支持http/nats/memory三種broker,其中http是默認的broker。github

同時,go-micro以強大的插件形式,提供以下幾種常見的broker。golang

 $ls

gocloud/  googlepubsub/  grpc/  kafka/  mqtt/  nats/  nsq/  proxy/  rabbitmq/  redis/  snssqs/  sqs/  stan/  stomp/

複製代碼

http

HTTP Broker 是基於HTTP的異步broker,源代碼在github.com\micro\go-micro@v1.9.1\broker\broker.go中,默認DefaultBroker爲httpweb

var (
	DefaultBroker Broker = newHttpBroker()
)

複製代碼

httpbroker實際上就是一個結構體redis

type httpBroker struct {

	id      string  //微服務ID
	address string //主機地址
	opts    Options //一些配置
	mux *http.ServeMux  //經過這個監聽其餘端發送的http請求
	c *http.Client  //經過這個發送請求到其餘端
	r registry.Registry 
	sync.RWMutex
	subscribers map[string][]*httpSubscriber  //訂閱
	running     bool
	exit        chan chan error
	// offline message inbox
	mtx   sync.RWMutex 
	inbox map[string][][]byte   //數據緩存

}

複製代碼

經過http.Client發送請求,經過http.ServeMux實現請求監聽,經過inbox存儲數據shell

redis

redis初始化代碼以下json

//main.go

//初始化URL格式redis://密碼@主機:端口/

b := redis.NewBroker(
		broker.Addrs("redis://user:secret@localhost:6379/"),
		)

//初始化

b.Init()

//鏈接

b.Connect()

// 新建service

service := grpc.NewService(
		micro.Name("go.micro.web.config"),
		micro.Version("latest"),
		micro.Broker(b),

	)

//初始化service
service.Init()
//啓動,運行,監聽
service.Run()

複製代碼

啓動應用程序須要指定broker爲redisapi

go run main.go --broker=redis

複製代碼

grpc

初始化過程以下緩存

//main.go

import (
	"github.com/micro/go-plugins/broker/grpc"
)



// 創建鏈接

b := grpc.NewBroker()
b.Init()
b.Connect()

// 訂閱事件
sub, _ := b.Subscribe("events")
defer sub.Unsubscribe()

// 發佈事件
b.Publish("events", &broker.Message{

	Headers: map[string]string{"type": "event"},

	Body: []byte(`an event`),

})

複製代碼

啓動應用程序須要指定broker爲grpcbash

go run main.go --broker=grpc

複製代碼

rabbitmq

初始化過程以下

//main.go

import (

	"github.com/micro/go-plugins/broker/grpc"

)

b := rabbitmq.NewBroker(

		broker.Addrs("amqp://用戶名:密碼@主機host:端口port"),

	)



	b.Init()

	b.Connect()

複製代碼

啓動應用程序須要指定broker爲rabbitmq

go run main.go plugin.go --broker=rabbitmq

複製代碼

mqtt

初始化過程以下

//main.go

import (

	"github.com/micro/go-micro"

	"github.com/micro/go-plugins/broker/mqtt"

)

func main() {

	service := micro.NewService(

		micro.Name("my.service"),

		micro.Broker(mqtt.NewBroker()),

	)

	//...

}

複製代碼

啓動應用程序須要指定broker爲mqtt

go run main.go plugin.go --broker=mqtt

複製代碼

其餘

其餘能夠閱讀代碼

$GOPATH/src/github.com/micro/go-plugins/broker

複製代碼

消息訂閱和發佈

經過micro.RegisterSubscriber實現消息訂閱

消息訂閱主要API接口以下,第一個參數標識消息主題,第二個參數表示服務實例。

// Register Struct as Subscriber

micro.RegisterSubscriber("go.micro.srv.testsrv", service.Server(), new(subscriber.Testsrv))

// Register Function as Subscriber

micro.RegisterSubscriber("go.micro.srv.testsrv", service.Server(), subscriber.Handler)

複製代碼

重點注意第三個參數,第三個參數是處理函數,能夠是函數,也能夠是實現了

func Handler(ctx context.Context, msg *testsrv.Message) error方法的結構體,micro內部會根據參數類型自動適配。結構體中能夠實現多個func Handler(ctx context.Context, msg *testsrv.Message) error類型方法

經過broker.Subscribe實現訂閱

Broker提供以下接口

type Broker interface {

	Init(...Option) error

	Options() Options

	Address() string

	Connect() error

	Disconnect() error

	Publish(topic string, m *Message, opts ...PublishOption) error

	Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)

	String() string

}

複製代碼
  • Subscribe 訂閱事件,topic表明主題,h事件處理函數

  • Publish 發佈事件

消息處理函數Handler 定義

在上述涉及處處理函數handler,具體含義以下

type Handler func(Event) error // Event is given to a subscription handler for processing type Event interface {

	Topic() string

	Message() *Message

	Ack() error

}



type Message struct {

	Header map[string]string

	Body   []byte

}

複製代碼

經過broker.Publish實現發佈

舉例以下

// 創建鏈接

b := grpc.NewBroker()

b.Init()

b.Connect()
// 訂閱事件

sub, _ := b.Subscribe("events")

defer sub.Unsubscribe()

// 發佈事件

b.Publish("events", &broker.Message{

	Headers: map[string]string{"type": "event"},

	Body: []byte(`an event`),

})

複製代碼

經過micro publish實現發佈

舉例以下

micro publish "go.micro.web.config" "hello"

複製代碼

實戰和代碼

效果

下載代碼broker.zip 解壓到techidea8.com/microapp/broker下運行,效果圖忑

效果

  • 發佈消息須要注意json格式字符串
micro publish go.micro.srv.broker "{\"say\":\"這是測試消息\"}"
複製代碼

得到代碼

關注公衆號betaidea回覆micro-broker便可得到

公衆號betaidea

推薦閱讀

掃微信二維碼實現網站登錄提供體驗地址和源代碼

開源項目golang go語言後臺管理框架restgo-admin

支持手勢觸摸,可左右滑動的日曆插件

你必須知道的18個互聯網業務模型

相關文章
相關標籤/搜索