RocketMQ入門手冊

前言

繼我上一篇博客後 分佈式消息隊列RocketMQ學習教程① 上一篇博客最主要介紹了幾種經常使用的MQ,因此本博客再簡單介紹一下RocketMQ的原理和簡單的例子,基於Java實現,但願能夠幫助學習者html

RoketMQ搭建Linux版

「工於利其事,必先利其器」,因此咱們首先須要搭建好RocketMQ, 考慮到學習者不必定有Linux系統的服務器,因此本博客介紹一下Linux和Window系統的兩種安裝方法,以補充上一篇博客java

由於阿里已經將RocketMQ捐給Apache了,因此如今咱們須要去Apache官網下載 RocketMQ官網linux

注意RocketMQ是基於Java開發的,因此安裝前必須安裝JDK,Linux JDK安裝的能夠看分佈式消息隊列RocketMQ學習教程① 下載文件解壓後,能夠看到conf文件夾裏有2m-noslave、2m-2s-async、2m-2s-sync文件夾git

2m-noslave 兩主,無從的配置github

2m-2s-async 兩主,兩從,同步複製數據的配置redis

2m-2s-sync 兩主,兩從,異步複製數據的配置apache

咱們找到2m-noslave的broker-a.properties文件,修改完善配置 broker-a.properties編程

#所屬集羣名字  
brokerClusterName=DefaultCluster
#broker名字,注意此處不一樣的配置文件填寫的不同
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分號分割
namesrvAddr=127.0.0.1:9876

#關鍵
brokerIP1=127.0.0.1

#在發送消息時,自動建立服務器不存在的topic,默認建立的隊列數
defaultTopicQueueNums=4
#是否容許 Broker 自動建立Topic,建議線下開啓,線上關閉
autoCreateTopicEnable=true
#是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認凌晨 4點
deleteWhen=04
#文件保留時間,默認 48 小時
fileReservedTime=48
#commitLog每一個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每一個文件默認存30W條,根據業務狀況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88

#這裏是個人 日誌配置
#存儲路徑
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存儲路徑
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消費隊列存儲路徑存儲路徑
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存儲路徑
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存儲路徑
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存儲路徑
abortFile=/usr/local/rocketmq/store/abort


#限制的消息大小  
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 異步複製Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盤方式
#- ASYNC_FLUSH 異步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH

#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128

先介紹一下linux系統的 通常將壓縮文件解壓到/usr/localvim

cd /usr/local

tar -xzf apache-rocketmq.tar.gz

mv apache-rocketmq rocketmq

mkdir /usr/rocketmq/logs

環境變量配置api

vim /etc/profile

修改以下配置

export JAVA_HOME=/usr/java/jdk1.8.0_102
export ROCKETMQ_HOME=/usr/local/rocketmq
export PATH=$PATH:$JAVA_HOME/bin:/usr/local/src/redis-3.2.8/bin:$ROCKETMQ_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib

啓動mqnamesrv

cd /usr/local/rocketmq/bin
nohup sh /usr/local/rocketmq/bin/mqnamesrv >/usr/local/rocketmq/logs/mqnamesrv.log 2>&1 &

啓動Broker

nohup sh /usr/local/rocketmq/bin/mqbroker  -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties > /usr/local/rocketmq/logs/mqbroker.log  2>&1 &

要設置自動建立Topic,須要加上 autoCreateTopicEnable=true

關閉Broker服務 sh mqshutdown broker

啓動成功能夠用jps查看 這裏寫圖片描述

RocketMQ搭建Window版

一、下載RocketMQ後,解壓到D:\alibaba-rocketmq

二、在D:\alibaba-rocketmq,Ctrl+Shift,右鍵,打開dom界面,輸入以下命令行 start /b bin/mqnamesrv.exe >D:\alibaba-rocketmq\logs\mqnamesrv.log 查看nameserver是否啓動 jps -v

三、啓動Broker

start /b bin/mqbroker.exe -n "127.0.0.1:9876" autoCreateTopicEnable=true >D:\alibaba-rocketmq\logs\mqbroker.log
Caused by: com.alibaba.rocketmq.client.exception.MQClientException: No route info of this topic, huang_1
See http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&topic_not_exist for further details.
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:525) ~[rocketmq-client-3.5.3.jar:na]
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1011) ~[rocketmq-client-3.5.3.jar:na]
	at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:970) ~[rocketmq-client-3.5.3.jar:na]
	at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:90) ~[rocketmq-client-3.5.3.jar:na]
	at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.send(ProducerImpl.java:107) ~[ons-client-1.2.3.jar:na]

出現以上異常啓動時添加autoCreateTopicEnable=true

四、查看topic命令:mqadmin topicList -n "127.0.0.1:9876"

cd 到bin目錄,執行下面命令 mqadmin updateTopic -t test_1 -b "127.0.0.1:10911" -n "127.0.0.1:9876" 添加以下參數到eclipse啓動工程的VM參數裏 -Drocketmq.namesrv.addr=127.0.0.1:9876

RocketMq監控平臺搭建

須要去github下載,下載連接 rocketmq-console

下載後在rocketmq-console文件夾裏,ctrl+shift,右鍵,在此處打開命令窗口,打開cmd窗口,主要要先搭建好maven環境

mvn clean package -Dmaven.test.skip=true

打包完成以後,咱們去target文件夾找到rocketmq-console-ng-1.0.0.jar 而後

mkdir rocketmq-console
cd /usr/local/rocketmq-console

使用xftp上傳rocketmq-console-ng-1.0.0.jar到/usr/local/rocketmq-console

nohup java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=127.0.0.1:9876 >/usr/local/rocketmq-console/run.log 2>&1 &

端口檢查

netstat -anp|grep 12581

部署成功,打開http://服務器IP:12581

這裏寫圖片描述

編程實現MQ實例

maven加入配置

<dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>3.0.10</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-all</artifactId>
            <version>3.0.10</version>
            <type>pom</type>
        </dependency>

消息隊列消費者消費消息實例

package com.mq.test;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

public class MQConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
                "mq-group");

        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setInstanceName("consumer");
        consumer.subscribe("TopicA-test", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("RocketMQ Consumer Started...");
    }
}

消息隊列生產者產生消息實例

package com.mq.test;

import java.util.Date;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

public class MQProducer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("mq-group");
//        producer.setNamesrvAddr("123.207.63.192:9876");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setInstanceName("producer");
        producer.start();
        try {
            for (int i = 0; i < 10; i++) {
                Thread.sleep(1000);  //MQ每隔一秒發送一條消息
                Message msg = new Message("TopicA-test",// topic
                        "TagA",// tag
                        ("RocketMQ message"+i)
                                .getBytes()// body
                );
                SendResult sendResult = producer.send(msg);//發送消息
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.shutdown();//關閉消息生產者
    }
}

下面是來自github wiki的學習例子

Filter網絡架構,以CPU資源換取寶貴的網卡流量資源

screenshot

啓動Broker時,增長如下配置,能夠自動加載Filter Server進程

filterServerNums=1

Filter樣本(Consumer僅負責將代碼上傳到Filter Server,由Filter Server編譯後執行)

package com.alibaba.rocketmq.example.filter;

import com.alibaba.rocketmq.common.filter.MessageFilter;
import com.alibaba.rocketmq.common.message.MessageExt;


public class MessageFilterImpl implements MessageFilter {

    @Override
    public boolean match(MessageExt msg) {
        String property = msg.getUserProperty("SequenceId");
        if (property != null) {
            int id = Integer.parseInt(property);
            if ((id % 3) == 0 && (id > 10)) {
                return true;
            }
        }

        return false;
    }
}

Consumer例子

public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
        
        String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java");
        consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl",
            filterCode);

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        <br>
        consumer.start();

        System.out.println("Consumer Started.");
    }

附錄

RocketMQ原理與安裝教程

RocketMQ實例

阿里RocketMQ Quick Start

RocketMQ集羣安裝

rocketMq監控平臺rocketmq-console搭建

相關文章
相關標籤/搜索