Running multiple RocketMQ instances on a single machine

Official deployment method:
Preparation:
1. Prepare the required environment
64-bit OS: Linux/Unix/Mac
64-bit JDK 1.7+
Maven 3.2.x+
Git
Make sure the hostname resolves through an entry in /etc/hosts


yum install -y git
yum install -y jdk*
cat > /etc/profile.d/java.sh << EOF
export JAVA_HOME=/usr/java/latest
export PATH=\${JAVA_HOME}/bin:\$PATH
EOF
tar xf apache-maven-*.tar.gz -C /usr/local/
ln -sv /usr/local/apache-maven-* /usr/local/maven
cat > /etc/profile.d/maven.sh << EOF
export M2_HOME=/usr/local/maven
export PATH=\${M2_HOME}/bin:\${PATH}
EOF
source /etc/profile.d/java.sh
source /etc/profile.d/maven.sh
Make sure the value of hostname has a matching entry in /etc/hosts
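
The hostname requirement can be checked before building. The following is a minimal sketch, not part of the official procedure: host_in_hosts_file is a hypothetical helper name, and it only looks for an exact match among the names and aliases in the hosts file.

```shell
# Succeeds (exit 0) if NAME appears as a hostname or alias in a hosts file.
# Arguments: NAME, then an optional hosts file (defaults to /etc/hosts).
# host_in_hosts_file is a hypothetical helper, not a RocketMQ tool.
host_in_hosts_file() {
    name="$1"
    file="${2:-/etc/hosts}"
    awk -v n="$name" '
        { sub(/#.*/, "") }                                    # drop comments
        { for (i = 2; i <= NF; i++) if ($i == n) found = 1 }  # skip the IP column
        END { exit found ? 0 : 1 }
    ' "$file"
}

# Usage: warn when the current hostname has no entry.
# host_in_hosts_file "$(hostname)" || echo "add $(hostname) to /etc/hosts"
```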

2. Clone and build
git clone https://github.com/apache/incubator-rocketmq.git
cd incubator-rocketmq

Make two copies of the source tree, one for each instance
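
The original gives no command for the copy step. One way to do it is sketched below; copy_instances is a hypothetical helper, and the directory names rocketmq-a and rocketmq-b in the usage line are placeholders, not names from the original.

```shell
# Copy one source tree to several destinations, one per instance.
# Arguments: SRC, then one or more destination directories.
# copy_instances and the destination names are illustrative assumptions.
copy_instances() {
    src="$1"
    shift
    for dest in "$@"; do
        if [ -e "$dest" ]; then
            echo "refusing to overwrite existing $dest" >&2
            return 1
        fi
        cp -a "$src" "$dest"    # -a keeps permissions and timestamps
    done
}

# Usage, run from the directory that contains the clone:
# copy_instances incubator-rocketmq rocketmq-a rocketmq-b
```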

When RocketMQ starts, it opens several ports, as shown below:

tcp        0      0 0.0.0.0:10909               0.0.0.0:*                   LISTEN      2042/java          
tcp        0      0 0.0.0.0:10911               0.0.0.0:*                   LISTEN      2042/java          
tcp        0      0 0.0.0.0:10912               0.0.0.0:*                   LISTEN      2042/java          
tcp        0      0 0.0.0.0:9876                0.0.0.0:*                   LISTEN      2011/java

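Plan ahead and define a separate set of ports for the other RocketMQ instance.
The other instance's ports, next to the default ports they replace:
New port         Default port
9870             9876
10906            10909
10915            10911
10916            10912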
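
Before settling on the plan, it helps to confirm that none of the new ports is already in use. This is a minimal sketch that reads `netstat -lnt` style output like the listing above; port_in_listing is a hypothetical helper name.

```shell
# Reads `netstat -lnt` style output on stdin and succeeds (exit 0)
# if PORT appears as the local port of a LISTEN row.
# port_in_listing is a hypothetical helper, not a RocketMQ tool.
port_in_listing() {
    awk -v port="$1" '
        /LISTEN/ {
            n = split($4, parts, ":")      # local address is the 4th column
            if (parts[n] == port) hit = 1  # last piece after ":" is the port
        }
        END { exit hit ? 0 : 1 }
    '
}

# Usage: report any planned port that is already taken.
# for p in 9870 10906 10915 10916; do
#     netstat -lnt | port_in_listing "$p" && echo "port $p is already in use"
# done
```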

Search the source code for each of these ports and change it to the intended new port:

Fixing the port conflicts
[root@test1 incubator-rocketmq]# grep --color -nR "10911" ./
./filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvConfig.java:32:    private String connectWhichBroker = "127.0.0.1:10911";
./common/src/test/java/org/apache/rocketmq/common/MixAllTest.java:41:        assertThat(MixAll.brokerVIPChannel(true, "127.0.0.1:10911")).isEqualTo("127.0.0.1:10909");
./namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java:47:        routeInfoManager.unregisterBroker("default-cluster", "127.0.0.1:10911", "default-broker", 1234);
./namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java:76:        RegisterBrokerResult registerBrokerResult = routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911", "default-broker", 1234, "127.0.0.1:1001",
./namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java:85:        brokerAddrs.put(1234l, "127.0.0.1:10911");
./namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java:272:        RegisterBrokerResult registerBrokerResult = routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911", "default-broker", 1234, "127.0.0.1:1001",
./broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java:98:            nettyServerConfig.setListenPort(10911);
./tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java:125:        brokerAddrs.put(1234l, "127.0.0.1:10911");
./tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java:238:        Properties result = defaultMQAdminExt.getBrokerConfig("127.0.0.1:10911");
./tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java:253:        KVTable brokerStats = defaultMQAdminExt.fetchBrokerRuntimeStats("127.0.0.1:10911");
./tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java:349:        boolean clean = defaultMQAdminExt.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911");
./tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java:376:        Map<String, Map<MessageQueue, Long>> result = defaultMQAdminExt.getConsumeStatus("unit-test", "default-broker-group", "127.0.0.1:10911");
./tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java:396:        result.setBrokerAddr("127.0.0.1:10911");
./tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java:397:        when(mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000)).thenReturn(result);
./tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java:398:        ConsumeStatsList consumeStatsList = defaultMQAdminExt.fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000);
./tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java:399:        assertThat(consumeStatsList.getBrokerAddr()).isEqualTo("127.0.0.1:10911");
./tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java:404:        SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getAllSubscriptionGroup("127.0.0.1:10911", 10000);
./tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java:121:        brokerAddrs.put(1234l, "127.0.0.1:10911");
./tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java:147:        connection.setClientAddr("127.0.0.1:109111");
./tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java:71:        brokerAddrs.put(1234l, "127.0.0.1:10911");
./tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java:95:        assertThat(result.get(null).get(0)).isEqualTo("127.0.0.1:10911");
./tools/src/test/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommandTest.java:32:        String[] subargs = new String[] {"-t unit-test", "-i 127.0.0.1:10911"};
./tools/src/test/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommandTest.java:36:        assertThat(commandLine.getOptionValue("i").trim()).isEqualTo("127.0.0.1:10911");
./tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommandTest.java:32:        String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster", "-t unit-test", "-p 6"};
./tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommandTest.java:35:        assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911");
./tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java:33:            "-b 127.0.0.1:10911",
./tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java:44:        assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911");
./tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java:82:        String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
./tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java:84:        String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
./tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java:78:        String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
./tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java:74:        String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster", "-k topicname", "-v unit_test"};
./tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommandTest.java:73:        String[] subargs = new String[] {"-b 127.0.0.1:10911", "-s 1024 -c 10"};
./tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java:72:        consumeStatsList.setBrokerAddr("127.0l.0.1:10911");
./tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommadTest.java:86:        String[] subargs = new String[] {"-b 127.0.0.1:10911", "-t 3000", "-l 5", "-o true"};
./tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java:78:        String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
./tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java:76:        brokerAddrs.put(1234l, "127.0.0.1:10911");
./tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java:88:        brokerAddrs.put(1234l, "127.0.0.1:10911");
./tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java:76:        brokerAddrs.put(1234l, "127.0.0.1:10911");
./client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java:144:        doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
./client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumerTest.java:78:        when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false));
./client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java:202:        brokerAddrs.put(0L, "127.0.0.1:10911");
./client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java:56:        brokerAddrs.put(0L, "127.0.0.1:10911");
./bin/README.md:26:    sh mqadmin updateTopic -b 127.0.0.1:10911 -t TopicA
./bin/README.md:29:    sh mqadmin updateSubGroup -b 127.0.0.1:10911 -g SubGroupA

Every port the instance opens has to be changed; only one of them is shown here as an example
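
Of the matches in the grep output, the one in BrokerStartup.java is what sets the broker's listen port; most of the others are in test code. A minimal sketch of the edit with sed follows, assuming the new port 10915 from the plan; change_listen_port is a hypothetical helper name.

```shell
# Replace a hard-coded setListenPort(OLD) call with setListenPort(NEW).
# Arguments: FILE, OLD port, NEW port. A backup is kept as FILE.bak.
# change_listen_port is a hypothetical helper, not a RocketMQ tool.
change_listen_port() {
    file="$1"
    old="$2"
    new="$3"
    sed -i.bak "s/setListenPort(${old})/setListenPort(${new})/" "$file"
}

# Usage, run from the root of the second copy of the source tree:
# change_listen_port broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java 10911 10915
```

The pattern matches only the setListenPort call, so address strings such as "127.0.0.1:10911" elsewhere in the same file are left alone.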

 

mvn -Prelease-all -DskipTests clean install -U  # must be run for every instance

3. Adjust the JVM parameters; the memory sizes may need to be changed to suit your own machine
vim bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"
vim bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"


4. Set the environment variables (note that the path must be changed to match your setup)
cat > /etc/profile.d/rocketmq.sh << EOF
export ROCKETMQ_HOME=/opt/incubator-rocketmq/target/apache-rocketmq-broker/apache-rocketmq
export PATH=\${ROCKETMQ_HOME}/bin:\${PATH}
EOF
source /etc/profile.d/rocketmq.sh
echo "export NAMESRV_ADDR='IP:PORT;....'" >> /etc/profile
source /etc/profile
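
NAMESRV_ADDR takes a semicolon-separated list of name server addresses. The sketch below builds such a value from separate arguments; join_namesrv_addrs is a hypothetical helper, and the addresses in the usage line are example values using the default port 9876 and the planned port 9870.

```shell
# Join name server addresses into one semicolon-separated string.
# Arguments: one or more IP:PORT pairs.
# join_namesrv_addrs is a hypothetical helper, not a RocketMQ tool.
join_namesrv_addrs() {
    # "$*" joins the arguments with the first character of IFS;
    # the subshell keeps the IFS change local.
    ( IFS=';'; printf '%s\n' "$*" )
}

# Usage (example addresses, adjust to your own hosts and ports):
# export NAMESRV_ADDR="$(join_namesrv_addrs 127.0.0.1:9876 127.0.0.1:9870)"
```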

5. Start the services
chmod +x bin/{mqadmin,mqnamesrv,mqshutdown,mqbroker,tools.sh}
nohup mqnamesrv &
nohup mqbroker -c conf/2m-noslave/broker-a.properties > /dev/null 2>&1 &

6. Test sending and receiving messages
tools.sh org.apache.rocketmq.example.quickstart.Producer
tools.sh org.apache.rocketmq.example.quickstart.Consumer

7. Stop the services
mqshutdown broker
mqshutdown namesrv

 

The same steps apply to the other instance.
