【學習】025 RocketMQ

RocketMQ概述

RocketMQ 是一款分佈式、隊列模型的消息中間件,具備如下特色: 可以保證嚴格的消息順序 提供豐富的消息拉取模式 高效的訂閱者水平擴展能力 實時的消息訂閱機制 億級消息堆積能力java

RocketMQ包含的組件

NameServer:單點,供Producer和Consumer獲取Broker地址web

Producer:產生併發送消息apache

Consumer:接受並消費消息vim

Broker:消息暫存,消息轉發tomcat

Name Server

Name Server是RocketMQ的尋址服務。用於把Broker的路由信息作聚合。客戶端依靠Name Server決定去獲取對應topic的路由信息,從而決定對哪些Broker作鏈接。服務器

Name Server是一個幾乎無狀態的結點,Name Server之間採起share-nothing的設計,互不通訊。併發

對於一個Name Server集羣列表,客戶端鏈接Name Server的時候,只會選擇隨機鏈接一個結點,以作到負載均衡。app

Name Server全部狀態都從Broker上報而來,自己不存儲任何狀態,全部數據均在內存。負載均衡

若是中途全部Name Server全都掛了,影響到路由信息的更新,不會影響和Broker的通訊。webapp

Broker

Broker是處理消息存儲,轉發等處理的服務器。

Broker以group分開,每一個group只容許一個master,若干個slave。

只有master才能進行寫入操做,slave不容許。

slave從master中同步數據。同步策略取決於master的配置,能夠採用同步雙寫,異步複製兩種。

客戶端消費能夠從master和slave消費。在默認狀況下,消費者都從master消費,在master掛後,客戶端因爲從Name Server中感知到Broker掛機,就會從slave消費。

Broker向全部的NameServer結點創建長鏈接,註冊Topic信息。

RocketMQ優勢

1.強調集羣無單點,可擴展

2.任意一點高可用,水平可擴展

3.海量消息堆積能力,消息堆積後,寫入低延遲。

4.支持上萬個隊列

5.消息失敗重試機制

6.消息可查詢

7.開源社區活躍

8.成熟度(通過雙十一考驗)

RocketMQ環境安裝

服務器配置

192.168.110.187 nameServer1,brokerServer1

192.168.110.188 nameServer2,brokerServer2

添加Host文件

vi /etc/hosts

192.168.110.187 rocketmq-nameserver1

192.168.110.187 rocketmq-master1

192.168.110.188 rocketmq-nameserver2

192.168.110.188 rocketmq-master2

service network restart

注意: Error:No suitable device found: no device found for connection "System eth0"

解決辦法:

(1)ifconfig -a 查看物理 MAC HWADDR 的值

(2)vim 編輯文件 /etc/sysconfig/network-scripts/ifcfg-eth0中修改ifconfig中查出的MAC HWADDR值;

上傳安裝包

# 上傳alibaba-rocketmq-3.2.6.tar.gz文件至/usr/localtar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/localmv alibaba-rocketmq alibaba-rocketmq-3.2.6ln -s alibaba-rocketmq-3.2.6 rocketmq

建立存儲路徑【兩臺機器】

mkdir /usr/local/rocketmq/store

mkdir /usr/local/rocketmq/store/commitlog

mkdir /usr/local/rocketmq/store/consumequeue

mkdir /usr/local/rocketmq/store/index

RocketMQ配置文件【兩臺機器】

vim /usr/local/rocketmq/conf/2m-noslave/broker-a.properties

vim /usr/local/rocketmq/conf/2m-noslave/broker-b.properties

修改日誌配置文件【兩臺機器】

mkdir -p /usr/local/rocketmq/logs

cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

修改啓動NameServer【兩臺機器】

vim /usr/local/rocketmq/bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -
XX:PermSize=128m -XX:MaxPermSize=320m"
vim /usr/local/rocketmq/bin/runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -

XX:PermSize=128m -XX:MaxPermSize=320m"

啓動NameServer【兩臺機器】

cd /usr/local/rocketmq/binnohup sh mqnamesrv & 

啓動BrokerServer A

cd /usr/local/rocketmq/bin
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &netstat -ntlpjpstail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.logtail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log

啓動BrokerServer B

cd /usr/local/rocketmq/bin
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-b.properties >/dev/null 2>&1 &netstat -ntlpjpstail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.logtail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log

RocketMQ Console

rocketmq-web-console 部署到webapps目錄中。

/usr/local/apache-tomcat-7.0.65/webapps/rocketmq-web-console/WEB-INF/classes/

修改config.properties

rocketmq.namesrv.addr=192.168.110.195:9876;192.168.110.199:9876

運行效果

安裝jdk環境

vi /etc/profile

export JAVA_HOME=/usr/local/jdk1.7.0_80

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export PATH=$JAVA_HOME/bin:$PATH

source /etc/profile

Java操做RocketMQ

pom文件依賴

    <dependencies>
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>3.0.10</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-all</artifactId>
            <version>3.0.10</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

生產者

package com.hongmoshui;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

public class Producer
{
    public static void main(String[] args) throws MQClientException
    {
        DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
        producer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
        producer.setInstanceName("producer");
        producer.start();
        try
        {
            for (int i = 0; i < 10; i++)
            {
                // 每秒發送一次MQ
                Thread.sleep(1000);
                // topic:主題名稱,tag:臨時值,body:內容
                Message msg = new Message("hongmoshui-topic", "TagA", ("hongmoshui-" + i).getBytes());
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult.toString());
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        producer.shutdown();
    }
}

消費者

package com.hongmoshui;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

public class Consumer
{
    public static void main(String[] args) throws MQClientException
    {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");

        consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
        consumer.setInstanceName("consumer");
        consumer.subscribe("hongmoshui-topic", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently()
        {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
            {
                for (MessageExt msg : msgs)
                {
                    System.out.println(msg.getMsgId() + "---" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

RocketMQ重試機制

MQ 消費者的消費邏輯失敗時,能夠經過設置返回狀態達到消息重試的結果。

MQ 消息重試只針對集羣消費方式生效;廣播方式不提供失敗重試特性,即消費失敗後,失敗消息再也不重試,繼續消費新的消息。

package com.hongmoshui.test2;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

public class Consumer
{
    public static void main(String[] args) throws MQClientException
    {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");

        consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
        consumer.setInstanceName("consumer");
        consumer.subscribe("hongmoshui-topic", "TagA");
        consumer.registerMessageListener(new MessageListenerConcurrently()
        {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
            {
                for (MessageExt msg : msgs)
                {
                    System.out.println(msg.getMsgId() + "---" + new String(msg.getBody()));
                }
                try
                {
                    int i = 1 / 0;
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                    // 須要重試
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                // 不須要重試
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

注意:每次重試後,消息ID都不一致,因此不能使用消息ID判斷冪等。

RocketMQ如何解決消息冪等

注意:每次重試後,消息ID都不一致,因此不能使用消息ID判斷冪等。

解決辦法:使用自定義全局ID判斷冪等,例如流水ID、訂單號

使用msg.setKeys 進行區分

生產者:

package com.hongmoshui.test3;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

public class Producer
{
    public static void main(String[] args) throws MQClientException
    {
        DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
        producer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
        producer.setInstanceName("producer");
        producer.start();
        try
        {
            for (int i = 0; i < 1; i++)
            {
                // 每秒發送一次MQ
                Thread.sleep(1000);
                // topic:主題名稱,tag:臨時值,body內容
                Message msg = new Message("hongmoshui-topic", "TagA", ("hongmoshui-" + i).getBytes());
                msg.setKeys(System.currentTimeMillis() + "");
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult.toString());
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        producer.shutdown();
    }

}

消費者:

package com.hongmoshui.test3;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

public class Consumer
{
    static private Map<String, String> logMap = new HashMap<String, String>();

    public static void main(String[] args) throws MQClientException
    {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");

        consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");
        consumer.setInstanceName("consumer");
        consumer.subscribe("hongmoshui-topic", "TagA");
        consumer.registerMessageListener(new MessageListenerConcurrently()
        {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
            {
                String key = null;
                String msgId = null;
                try
                {
                    for (MessageExt msg : msgs)
                    {
                        key = msg.getKeys();
                        if (logMap.containsKey(key))
                        {
                            // 無需繼續重試。
                            System.out.println("key:" + key + ",無需重試...");
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                        msgId = msg.getMsgId();
                        System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody()));
                        int i = 1 / 0;
                    }
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                finally
                {
                    logMap.put(key, msgId);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }

}
相關文章
相關標籤/搜索