Message Queue,消息隊列,FIFO 結構。java
例如電商平臺,在用戶支付訂單後執行對應的操做;git
優勢:github
缺點spring
Java Message Service,Java消息服務,相似 JDBC 提供了訪問數據庫的標準,JMS 也制定了一套系統間消息通訊的規範;數據庫
區別於 JDBC,JDK 原生包中並未定義 JMS 相關接口。
協做方式圖示爲;apache
ActiveMQ | RabbitMQ | RocketMQ | kafka | |
---|---|---|---|---|
單機吞吐量 | 萬級 | 萬級 | 10 萬級 | 10 萬級 |
可用性 | 高 | 高 | 很是高 | 很是高 |
可靠性 | 較低機率丟失消息 | 基本不丟 | 能夠作到 0 丟失 | 能夠作到 0 丟失 |
功能支持 | 較爲完善 | 基於 erlang,併發強,性能好,延時低 | 分佈式,拓展性好,支持分佈式事務 | 較爲簡單,主要應用與大數據實時計算,日誌採集等 |
社區活躍度 | 低 | 中 | 高 | 高 |
做爲 Apache 下的開源項目,徹底支持 JMS 規範。而且 Spring Boot 內置了 ActiveMQ 的自動化配置,做爲入門再適合不過。segmentfault
添加依賴;服務器
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency>
消息發送;session
// 1. 建立鏈接工廠 ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 2. 工廠建立鏈接 Connection connection = factory.createConnection(); // 3. 啓動鏈接 connection.start(); // 4. 建立鏈接會話session,第一個參數爲是否在事務中處理,第二個參數爲應答模式 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5. 根據session建立消息隊列目的地 Destination queue = session.createQueue("test-queue"); // 6. 根據session和目的地queue建立生產者 MessageProducer producer = session.createProducer(queue); // 7. 根據session建立消息實體 Message message = session.createTextMessage("hello world!"); // 8. 經過生產者producer發送消息實體 producer.send(message); // 9. 關閉鏈接 connection.close();
自動注入參考:org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryConfiguration.SimpleConnectionFactoryConfiguration
添加依賴;多線程
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
添加 yaml 配置;
spring: activemq: broker-url: tcp://localhost:61616 jms: #消息模式 true:廣播(Topic),false:隊列(Queue),默認時false pub-sub-domain: true
收發消息;
@Autowired private JmsTemplate jmsTemplate; // 接收消息 @JmsListener(destination = "test") public void receiveMsg(String msg) { System.out.println(msg); } // 發送消息 public void sendMsg(String destination, String msg) { jmsTemplate.convertAndSend(destination, msg); }
基於 zookeeper 實現主從架構,修改 activemq.xml 節點 persistenceAdapter 配置;
<persistenceAdapter> <replicatedLevelDB directory="${activemq.data}/levelDB" replicas="3" bind="tcp://0.0.0.0:0" zkAddress="172.17.0.4:2181,172.17.0.4:2182,172.17.0.4:2183" zkPath="/activemq/leveldb-stores" hostname="localhost" /> </persistenceAdapter>
broker 地址爲:failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false
在高可用集羣節點 activemq.xml 添加節點 networkConnectors;
<networkConnectors> <networkConnector uri="static:(tcp://192.168.0.103:61616,tcp://192.168.0.103:61617,tcp://192.168.0.103:61618)" duplex="false"/> </networkConnectors>
更多詳細信息可參考: https://blog.csdn.net/haoyuya...
因爲發佈訂閱模式,全部訂閱者都會接收到消息,在生產環境,消費者集羣會產生消息重複消費問題。
ActiveMQ 提供 VirtualTopic 功能,解決多消費端接收同一條消息的問題。於生產者而言,VirtualTopic 就是一個 topic,對消費而言則是 queue。
在 activemq.xml 添加節點 destinationInterceptors;
<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <virtualTopic name="testTopic" prefix="consumer.*." selectorAware="false"/> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors>
生產者正常往 testTopic 中發送消息,訂閱者可修改訂閱主題爲相似 consumer.A.testTopic 這樣來消費。
更多詳細信息可參考: https://blog.csdn.net/java_co...
是一個隊列模型的消息中間件,具備高性能、高可靠、高實時、分佈式特色。
Name Server
名稱服務器,相似於 Zookeeper 註冊中心,提供 Broker 發現;
Broker
RocketMQ 的核心組件,絕大部分工做都在 Broker 中完成,接收請求,處理消費,消息持久化等;
Producer
消息生產方;
Consumer
消息消費方;
安裝後,依次啓動 nameserver 和 broker,能夠用 mqadmin 管理主題、集羣和 broker 等信息;
添加依賴;
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.2</version> </dependency>
消息發送;
DefaultMQProducer producer = new DefaultMQProducer("producer-group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("producer"); producer.start(); Message msg = new Message( "producer-topic", "msg", "hello world".getBytes() ); //msg.setDelayTimeLevel(1); SendResult sendResult = producer.send(msg); System.out.println(sendResult.toString()); producer.shutdown();
delayLevel 從 1 開始默認依次是:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
參考 org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel。
消息接收;
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setInstanceName("consumer"); consumer.subscribe("producer-topic", "msg"); consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> { for (MessageExt msg : list) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start();
.\mqadmin.cmd sendMessage -t producer-topic -c msg -p "hello rocketmq" -n localhost:9876
添加依賴;
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency>
添加 yaml 配置;
rocketmq: name-server: 127.0.0.1:9876 producer: group: producer
發送消息;
@Autowired private RocketMQTemplate mqTemplate; public void sendMessage(String topic, String tag, String message) { SendResult result = mqTemplate.syncSend(topic + ":" + tag, message); System.out.println(JSON.toJSONString(result)); }
接收消息;
@Component @RocketMQMessageListener(consumerGroup = "consumer", topic = "topic-test", selectorExpression = "tag-test") public class MsgListener implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println(message); } }
RocketMQ 拓展包提供了管理控制檯;
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
產生緣由:
怎麼解決重複消費的問題,換句話怎麼保證消息消費的冪等性。
一般基於本地消息表的方案實現,消息處理過便再也不處理。
消息錯亂的緣由:
要保證消息的順序消費,有三個關鍵點:
參考 RocketMq 中的 MessageQueueSelector 和 MessageListenerOrderly。
在分佈式系統中,一個事務由多個本地事務組成。這裏介紹一個基於 MQ 的分佈式事務解決方案。
經過 broker 的 HA 高可用,和定時回查 prepare 消息的狀態,來保證最終一致性。