MQ 入門實踐

MQ

Message Queue,消息隊列,FIFO 結構。java

例如電商平臺,在用戶支付訂單後執行對應的操做;git

優勢:github

  • 異步
  • 削峯
  • 解耦

缺點spring

  • 增長系統複雜性
  • 數據一致性
  • 可用性

JMS

Java Message Service,Java消息服務,相似 JDBC 提供了訪問數據庫的標準,JMS 也制定了一套系統間消息通訊的規範;數據庫

區別於 JDBC,JDK 原生包中並未定義 JMS 相關接口。
  1. ConnectionFactory
  2. Connection
  3. Destination
  4. Session
  5. MessageConsumer
  6. MessageProducer
  7. Message

協做方式圖示爲;apache

業界產品

ActiveMQ RabbitMQ RocketMQ kafka
單機吞吐量 萬級 萬級 10 萬級 10 萬級
可用性 很是高 很是高
可靠性 較低機率丟失消息 基本不丟 能夠作到 0 丟失 能夠作到 0 丟失
功能支持 較爲完善 基於 erlang,併發強,性能好,延時低 分佈式,拓展性好,支持分佈式事務 較爲簡單,主要應用與大數據實時計算,日誌採集等
社區活躍度

ActiveMQ

做爲 Apache 下的開源項目,徹底支持 JMS 規範。而且 Spring Boot 內置了 ActiveMQ 的自動化配置,做爲入門再適合不過。segmentfault

快速開始

添加依賴;服務器

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>

消息發送;session

// 1. 建立鏈接工廠
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 2. 工廠建立鏈接
Connection connection = factory.createConnection();
// 3. 啓動鏈接
connection.start();
// 4. 建立鏈接會話session,第一個參數爲是否在事務中處理,第二個參數爲應答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. 根據session建立消息隊列目的地
Destination queue = session.createQueue("test-queue");
// 6. 根據session和目的地queue建立生產者
MessageProducer producer = session.createProducer(queue);
// 7. 根據session建立消息實體
Message message = session.createTextMessage("hello world!");
// 8. 經過生產者producer發送消息實體
producer.send(message);
// 9. 關閉鏈接
connection.close();

Spring Boot 集成

自動注入參考:org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryConfiguration.SimpleConnectionFactoryConfiguration

添加依賴;多線程

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

添加 yaml 配置;

spring:
  activemq:
    broker-url: tcp://localhost:61616
  jms:
    #消息模式 true:廣播(Topic),false:隊列(Queue),默認時false
    pub-sub-domain: true

收發消息;

@Autowired
private JmsTemplate jmsTemplate;

// 接收消息
@JmsListener(destination = "test")
public void receiveMsg(String msg) {
    System.out.println(msg);
}

// 發送消息
public void sendMsg(String destination, String msg) {
    jmsTemplate.convertAndSend(destination, msg);
}

高可用

基於 zookeeper 實現主從架構,修改 activemq.xml 節點 persistenceAdapter 配置;

<persistenceAdapter>
    <replicatedLevelDB
        directory="${activemq.data}/levelDB"
        replicas="3"
        bind="tcp://0.0.0.0:0"
        zkAddress="172.17.0.4:2181,172.17.0.4:2182,172.17.0.4:2183"
        zkPath="/activemq/leveldb-stores"
        hostname="localhost"
    />
</persistenceAdapter>

broker 地址爲:failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false

負載均衡

在高可用集羣節點 activemq.xml 添加節點 networkConnectors;

<networkConnectors>
    <networkConnector uri="static:(tcp://192.168.0.103:61616,tcp://192.168.0.103:61617,tcp://192.168.0.103:61618)" duplex="false"/>
</networkConnectors>
更多詳細信息可參考: https://blog.csdn.net/haoyuya...

集羣消費

因爲發佈訂閱模式,全部訂閱者都會接收到消息,在生產環境,消費者集羣會產生消息重複消費問題。

ActiveMQ 提供 VirtualTopic 功能,解決多消費端接收同一條消息的問題。於生產者而言,VirtualTopic 就是一個 topic,對消費而言則是 queue。

在 activemq.xml 添加節點 destinationInterceptors;

<destinationInterceptors> 
    <virtualDestinationInterceptor> 
        <virtualDestinations> 
            <virtualTopic name="testTopic" prefix="consumer.*." selectorAware="false"/>    
        </virtualDestinations>
    </virtualDestinationInterceptor> 
</destinationInterceptors>

生產者正常往 testTopic 中發送消息,訂閱者可修改訂閱主題爲相似 consumer.A.testTopic 這樣來消費。

更多詳細信息可參考: https://blog.csdn.net/java_co...

RocketMQ

是一個隊列模型的消息中間件,具備高性能、高可靠、高實時、分佈式特色。

架構圖示

  1. Name Server

    名稱服務器,相似於 Zookeeper 註冊中心,提供 Broker 發現;

  2. Broker

    RocketMQ 的核心組件,絕大部分工做都在 Broker 中完成,接收請求,處理消費,消息持久化等;

  3. Producer

    消息生產方;

  4. Consumer

    消息消費方;

快速開始

安裝後,依次啓動 nameserver 和 broker,能夠用 mqadmin 管理主題、集羣和 broker 等信息;

https://segmentfault.com/a/11...

添加依賴;

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.2</version>
</dependency>

消息發送;

DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setInstanceName("producer");
producer.start();
Message msg = new Message(
    "producer-topic",
    "msg",
    "hello world".getBytes()
);
//msg.setDelayTimeLevel(1);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult.toString());
producer.shutdown();

delayLevel 從 1 開始默認依次是:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

參考 org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel。

消息接收;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setInstanceName("consumer");
consumer.subscribe("producer-topic", "msg");
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
    for (MessageExt msg : list) {
        System.out.println(new String(msg.getBody()));
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
.\mqadmin.cmd sendMessage -t producer-topic -c msg -p "hello rocketmq" -n localhost:9876

Spring Boot 集成

添加依賴;

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

添加 yaml 配置;

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: producer

發送消息;

@Autowired
private RocketMQTemplate mqTemplate;

public void sendMessage(String topic, String tag, String message) {
    SendResult result = mqTemplate.syncSend(topic + ":" + tag, message);
    System.out.println(JSON.toJSONString(result));
}

接收消息;

@Component
@RocketMQMessageListener(consumerGroup = "consumer", topic = "topic-test", selectorExpression = "tag-test")
public class MsgListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println(message);
    }
}

Console 控制檯

RocketMQ 拓展包提供了管理控制檯;

https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

重複消費

產生緣由:

  1. 生產者重複投遞;
  2. 消息隊列異常;
  3. 消費者異常消費;

怎麼解決重複消費的問題,換句話怎麼保證消息消費的冪等性

一般基於本地消息表的方案實現,消息處理過便再也不處理。

順序消息

消息錯亂的緣由:

  1. 一個消息隊列 queue,多個 consumer 消費;
  2. 一個 queue 對應一個 consumer,可是 consumer 多線程消費;

要保證消息的順序消費,有三個關鍵點:

  1. 消息順序發送
  2. 消息順序存儲
  3. 消息順序消費

參考 RocketMq 中的 MessageQueueSelector 和 MessageListenerOrderly。

分佈式事務

在分佈式系統中,一個事務由多個本地事務組成。這裏介紹一個基於 MQ 的分佈式事務解決方案。

經過 broker 的 HA 高可用,和定時回查 prepare 消息的狀態,來保證最終一致性。

相關文章
相關標籤/搜索