RocketMQ從部署到應用(Golang)

RocketMQ

消息隊列做爲高併發系統的組件之一,可以幫助業務系統解構提升開發效率和系統穩定性。java

優點:git

  • 削峯填谷:解決瞬時寫壓力致使的消息丟失、系統崩潰等問題
  • 系統解耦:處理不一樣重要程度和不一樣能力級別系統之間的消息
  • 提高性能:當存在一對多調用是,能夠發一條消息給消息系統,讓消息系統通知相關係統
  • 蓄流壓測:能夠堆積必定的消息量來壓測

安裝RocketMQ

官方地址github

# git clone https://github.com/apache/rocketmq-docker.git
# cd rocketmq-docker/
# ls
CONTRIBUTING.md  image-build  LICENSE  NOTICE  product  README.md  stage.sh  templates
# cd image-build/
# ls
build-image.sh  Dockerfile-alpine  Dockerfile-centos  scripts  update.sh

建立RocketMQ鏡像

sh build-image.sh RMQ-VERSION BASE-IMAGEgolang

RMQ-VERSIONdocker

BASE-IMAGE支持centos,alpine兩種方式apache

咱們使用centos

sh build-image.sh 4.7.1 alpinebash

構建時間有點長,須要耐心等待。併發

當構建完成以後會提示app

Successfully built 128108c2e50d
Successfully tagged apacherocketmq/rocketmq:4.7.1-alpine

那麼咱們就能查詢到鏡像

# docker images |grep mq
apacherocketmq/rocketmq  4.7.1-alpine   128108c2e50d     4   9 seconds ago      145MB

生成配置

# cd ..
# ls
CONTRIBUTING.md  image-build  LICENSE  NOTICE  product  README.md  stage.sh  templates
# sh stage.sh 4.7.1 (這裏的4.7.1對應以前的鏡像版本)
Stage version = 4.7.1
mkdir /root/rocketmq/rocketmq-docker/stages/4.7.1
staged templates into folder /root/rocketmq/rocketmq-docker/stages/4.7.1
# ls
CONTRIBUTING.md  image-build  LICENSE  NOTICE  product  README.md  stages  stage.sh  templates

生成了stages目錄,裏面存放配置模板文件

# cd stages/
# ls
4.7.1
# cd 4.7.1/
# ls
templates
# cd templates/
# ls
data            kubernetes        play-docker-compose.sh  play-docker.sh      play-kubernetes.sh  ssl
docker-compose  play-consumer.sh  play-docker-dledger.sh  play-docker-tls.sh  play-producer.sh

一、單機

./play-docker.sh alpine

二、docker-compose

./play-docker-compose.sh

三、kubernetes集羣

./play-kubernetes.sh

四、Cluster of Dledger storage(RocketMQ須要4.4.0版本以上)

./play-docker-dledger.sh

五、TLS

./play-docker-tls.sh
./play-producer.sh
./play-consumer.sh

我這裏選擇的是單機部署,能夠看到生成了兩個容器

# docker ps |grep mq
5b557ea1e6be        apacherocketmq/rocketmq:4.7.1-alpine                         "sh mqbroker"            25 seconds ago                                                                                      Up 24 seconds       0.0.0.0:10909->10909/tcp, 9876/tcp, 0.0.0.0:10911-10912->10911-10912/tcp   rmqbroker
8b1318aee5d6        apacherocketmq/rocketmq:4.7.1-alpine                         "sh mqnamesrv"           26 seconds ago                                                                                      Up 25 seconds       10909/tcp, 0.0.0.0:9876->9876/tcp, 10911-10912/tcp                         rmqnamesrv

驗證RocketMQ啓動成功

一、使用命令 docker ps|grep rmqbroker 找到RocketMQ broker的容器id

二、使用命令 docker exec -it 5b557ea1e6be ./mqadmin clusterList -n {nameserver_ip}:9876 驗證RocketMQ broker工做正常

# docker exec -it 5b557ea1e6be ./mqadmin clusterList -n {nameserver_ip}:9876
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name     #Broker Name            #BID  #Addr                  #Version                #InTPS(LOAD)       #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
DefaultCluster    5b557ea1e6be            0     172.17.0.8:10911       V4_7_1                   0.00(0,0ms)         0.00(0,0ms)          0 447225.46 -1.0000

升級

cd image-build
./update.sh

安裝GUI

# docker pull apacherocketmq/rocketmq-console:2.0.0
# docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.150.70:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 6881:8080 -t apacherocketmq/rocketmq-console:2.0.0

golang client使用問題

因爲使用的docker服務啓動,broker的地址是內網地址,須要將地址修改成外網地址

# docker ps |grep mq
8abb966542a3        apacherocketmq/rocketmq-console:2.0.0                        "sh -c 'java $JAVA..."   17 hours ago        Up 17 hours         0.0.0.0:6881->8080/tcp                                                     dazzling_tesla
5b557ea1e6be        apacherocketmq/rocketmq:4.7.1-alpine                         "sh mqbroker"            18 hours ago        Up 18 hours         0.0.0.0:10909->10909/tcp, 9876/tcp, 0.0.0.0:10911-10912->10911-10912/tcp   rmqbroker
8b1318aee5d6        apacherocketmq/rocketmq:4.7.1-alpine                         "sh mqnamesrv"           18 hours ago        Up 18 hours         10909/tcp, 0.0.0.0:9876->9876/tcp, 10911-10912/tcp                         rmqnamesrv
# docker exec -it 5b557ea1e6be bash // 進入到容器內部修改配置
# vi ../confbroker.conf

在文件中添加 brokerIP1=xxx.xxx.xxx.xxx

而後重啓broker, docker restart 5b557ea1e6be

這裏須要去修改啓動腳本 ./play-docker.sh 裏的start_namesrv_broker() 函數中的docker啓動命令,在mybroker後面添加-c ../conf/broker.conf

# Start Broker
    docker run -d -v `pwd`/data/broker/logs:/home/rocketmq/logs -v `pwd`/data/broker/store:/home/rocketmq/store --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -p 10909:10909 -p 10911:10911 -p 10912:10912 apacherocketmq/rocketmq:4.7.1${TAG_SUFFIX} sh mqbroker -c ../conf/broker.conf

這樣查看cluster會發現Address變成外網地址。

client-go Topic

package main

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2/admin"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	topic := "Develop"
	nameSrvAddr := []string{"192.168.150.70:9876"}
	brokerAddr := "192.168.150.70:10911"

	testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver(nameSrvAddr)))
	if err != nil {
		panic(err)
	}

	// 建立topic
	err = testAdmin.CreateTopic(
		context.Background(),
		admin.WithTopicCreate(topic),
		admin.WithBrokerAddrCreate(brokerAddr))
	if err != nil {
		fmt.Println("Create topic error:", err)
	}

	// 刪除topic
	err = testAdmin.DeleteTopic(
		context.Background(),
		admin.WithTopicDelete(topic),
		//admin.WithBrokerAddrDelete(brokerAddr), 
		//admin.WithNameSrvAddr(nameSrvAddr),
	)

	err = testAdmin.Close()
	if err != nil {
		fmt.Println("Shutdown admin error:", err)
	}
}

client-go 生產者

package main

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
	"strconv"
)

func main() {
	addr,err := primitive.NewNamesrvAddr("192.168.150.70:9876")
	if err != nil {
		panic(err)
	}
	topic := "Develop"
	p,err := rocketmq.NewProducer(
		producer.WithGroupName("my_service"),
		producer.WithNameServer(addr),
		producer.WithCreateTopicKey(topic),
		producer.WithRetry(1))
	if err != nil {
		panic(err)
	}

	err = p.Start()
	if err != nil {
		panic(err)
	}

	// 發送異步消息
	res,err := p.SendSync(context.Background(),primitive.NewMessage(topic,[]byte("send sync message")))
	if err != nil {
		fmt.Printf("send sync message error:%s\n",err)
	} else {
		fmt.Printf("send sync message success. result=%s\n",res.String())
	}

	// 發送消息後回調
	err = p.SendAsync(context.Background(), func(ctx context.Context, result *primitive.SendResult, err error) {
		if err != nil {
			fmt.Printf("receive message error:%v\n",err)
		} else {
			fmt.Printf("send message success. result=%s\n",result.String())
		}
	},primitive.NewMessage(topic,[]byte("send async message")))
	if err != nil {
		fmt.Printf("send async message error:%s\n",err)
	}

	// 批量發送消息
	var msgs []*primitive.Message
	for i := 0; i < 5; i++ {
		msgs = append(msgs, primitive.NewMessage(topic,[]byte("batch send message. num:"+strconv.Itoa(i))))
	}
	res,err = p.SendSync(context.Background(),msgs...)
	if err != nil {
		fmt.Printf("batch send sync message error:%s\n",err)
	} else {
		fmt.Printf("batch send sync message success. result=%s\n",res.String())
	}

	// 發送延遲消息
	msg := primitive.NewMessage(topic,[]byte("delay send message"))
	msg.WithDelayTimeLevel(3)
	res,err = p.SendSync(context.Background(),msg)
	if err != nil {
		fmt.Printf("delay send sync message error:%s\n",err)
	} else {
		fmt.Printf("delay send sync message success. result=%s\n",res.String())
	}

	// 發送帶有tag的消息
	msg1 := primitive.NewMessage(topic,[]byte("send tag message"))
	msg1.WithTag("tagA")
	res,err = p.SendSync(context.Background(),msg1)
	if err != nil {
		fmt.Printf("send tag sync message error:%s\n",err)
	} else {
		fmt.Printf("send tag sync message success. result=%s\n",res.String())
	}

	err = p.Shutdown()
	if err != nil {
		panic(err)
	}
}

client-go 消費者

// 在v2.1.0-rc5.0不支持,會在下一個版本中支持
func PullConsumer() {
	topic := "Develop"

	// 消費者主動拉取消息
	// not
	c1,err := rocketmq.NewPullConsumer(
		consumer.WithGroupName("my_service"),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.150.70:9876"})))
	if err != nil {
		panic(err)
	}
	err = c1.Start()
	if err != nil {
		fmt.Println(err)
		os.Exit(-1)
	}
	queue := primitive.MessageQueue{
		Topic:      topic,
		BrokerName: "broker-a", // 使用broker的名稱
		QueueId:    0,
	}

	err = c1.Shutdown()
	if err != nil {
		fmt.Println("Shutdown Pull Consumer error: ",err)
	}

	offset := int64(0)
	for  {
		resp,err := c1.PullFrom(context.Background(),queue,offset,10)
		if err != nil {
			if err == rocketmq.ErrRequestTimeout {
				fmt.Printf("timeout\n")
				time.Sleep(time.Second)
				continue
			}
			fmt.Printf("unexpected error: %v\n",err)
			return
		}
		if resp.Status == primitive.PullFound {
			fmt.Printf("pull message success. nextOffset: %d\n",resp.NextBeginOffset)
			for _, ext := range resp.GetMessageExts() {
				fmt.Printf("pull msg: %s\n",ext)
			}
		}
		offset = resp.NextBeginOffset
	}
}

func PushConsumer() {
	topic := "Develop"

	// 消息主動推送給消費者
	c2,err := rocketmq.NewPushConsumer(
		consumer.WithGroupName("my_service"),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.150.70:9876"})),
        consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset), // 選擇消費時間(首次/當前/根據時間)
        consumer.WithConsumerModel(consumer.BroadCasting)) // 消費模式(集羣消費:消費完其餘人不能再讀取/廣播消費:全部人都能讀)
	if err != nil {
		panic(err)
	}

	err = c2.Subscribe(
		topic,consumer.MessageSelector{
		Type: consumer.TAG,
		Expression: "*", // 能夠 TagA || TagB
		},
	func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		orderlyCtx,_ := primitive.GetOrderlyCtx(ctx)
		fmt.Printf("orderly context: %v\n",orderlyCtx)
		for i := range msgs {
			fmt.Printf("Subscribe callback: %v\n",msgs[i])
		}
		return consumer.ConsumeSuccess,nil
	})
	if err != nil {
		fmt.Printf("Subscribe error:%s\n",err)
	}

	err = c2.Start()
	if err != nil {
		fmt.Println(err)
		os.Exit(-1)
	}
	time.Sleep(time.Minute)
	err = c2.Shutdown()
	if err != nil {
		fmt.Println("Shutdown Consumer error: ",err)
	}
}
相關文章
相關標籤/搜索