RocketMq 完整部署

RocketMq 部署

環境

64bit OS, Linux/Unix/Mac is recommended;
64bit JDK 1.8+;
Maven 3.2.x;
Git;
4g+ free disk for Broker server

物理機部署

  • 下載最新的二進制文件 https://rocketmq.apache.org/docs/quick-start/
  • 解壓
unzip rocketmq-all-4.4.0-bin-release.zip 
cd rocketmq-all-4.4.0-bin-release/

自定義日誌目錄

# 在如下文件中替換默認的日誌路徑 ${user.home}/logs/rocketmqlogs/*
conf/logback_broker.xml
conf/logback_namesrv.xml
conf/logback_tools.xml

自定義參數和數據存放位置

conf 文件夾裏有三種配置git

  • 2m-noslave 兩主,無從的配置
  • 2m-2s-async 兩主,兩從,同步複製數據的配置
  • 2m-2s-sync 兩主,兩從,異步複製數據的配置
##以修改 conf/2m-noslave/broker-a.properties 爲例

#所屬集羣名字  
brokerClusterName=DefaultCluster
#broker名字,注意此處不一樣的配置文件填寫的不同
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分號分割
namesrvAddr=127.0.0.1:9876

#改爲服務器的本機ip,重要!
brokerIP1=127.0.0.1

#在發送消息時,自動建立服務器不存在的topic,默認建立的隊列數
defaultTopicQueueNums=4
#是否容許 Broker 自動建立Topic,建議線下開啓,線上關閉
autoCreateTopicEnable=true
#是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=48
#commitLog每一個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每一個文件默認存30W條,根據業務狀況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88

#自定義數據存儲路徑
#存儲路徑
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存儲路徑
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存儲路徑
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存儲路徑
abortFile=/usr/local/rocketmq/store/abort


#限制的消息大小  
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 異步複製Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH

#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128

服務啓動

啓動name server

> nohup sh bin/mqnamesrv &
  > tail -f logs/rocketmqlogs/namesrv.log
  The Name Server boot success...

啓動broker

> nohup sh bin/mqbroker -n localhost:9876 -c conf/2m-noslave/broker-a.propertie &
 > tail -f logs/rocketmqlogs/broker.log 
  The broker[%s, 172.30.30.233:10911] boot success...

關停服務

> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

嘗試發送消息

> export NAMESRV_ADDR=localhost:9876
 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId= ...

 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...

常見報錯

發消息報錯github

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:588)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1223)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1173)
    at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:214)
    at com.flying.demo.Producer.main(Producer.java:25)

緣由:沒有配置brokerIPdocker

部署 rockermq console

建議docker 部署
非docker部署參見官方文檔 https://github.com/apache/rocketmq-externals/tree/master/rocketmq-consoleapache

docker pull styletang/rocketmq-console-ng
docker run --name rocketmq-console -dit -e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false -server -Xms512m -Xmx512m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m" -p 8080:8080 styletang/rocketmq-console-ng bash
# 進入docker查看
docker exec -it rocketmq-console /bin/bash

部署完成,到對應的端口查看便可。能夠查看服務監控,歷史消息詳情,手動建立topic等。
bash

docker 部署

官方鏡像 rocketmqinc/rocketmq
部署步驟參見 http://www.justdojava.com/2019/08/26/rocketmq-creator/服務器

Java 示例

# 引入依賴
implementation 'org.apache.rocketmq:rocketmq-client:4.3.0'
package mq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {

    public static void main(String[] args) throws Exception {
        //建立一個消息生產者,並設置一個消息生產者組
        DefaultMQProducer producer = new DefaultMQProducer("producer_test");

        //指定 NameServer 地址
        producer.setNamesrvAddr("127.0.0.1:9876");

        //初始化 Producer,整個應用生命週期內只須要初始化一次
        producer.start();

        for (int i = 0; i < 5; i++) {
            //建立一條消息對象,指定其主題、標籤和消息內容,若是服務不支持自動建立topic的話,須要先手動建立topic,也能夠在rocketmq console裏操做
            Message msg = new Message(
                    "topic_test" /* 消息主題名 */,
                    "TagA" /* 消息標籤 */,
                    ("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息內容 */
            );

            //發送消息並返回結果
            SendResult sendResult = producer.send(msg);

            System.out.printf("%s%n", sendResult);
        }

        // 一旦生產者實例再也不被使用則將其關閉,包括清理資源,關閉網絡鏈接等
        producer.shutdown();
    }

}
package mq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.List;

public class Consumer {

    public static void main(String[] args) throws Exception {
        //建立一個消息消費者,並設置一個消息消費者組
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_test");
        //指定 NameServer 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //設置 Consumer 第一次啓動時從隊列頭部開始消費仍是隊列尾部開始消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //訂閱指定 Topic 下的全部消息
        consumer.subscribe("topic_test", "*");

        //註冊消息監聽器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                //默認 list 裏只有一條消息,能夠經過設置參數來批量接收消息
                if (list != null) {
                    for (MessageExt ext : list) {
                        try {
                            System.out.println(new Date() + new String(ext.getBody(), "UTF-8"));
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 消費者對象在使用以前必需要調用 start 初始化
        consumer.start();
        System.out.println("消息消費者已啓動");
    }
}
相關文章
相關標籤/搜索