時間:2017年07月22日星期六
說明:本文部份內容均來自慕課網。@慕課網:http://www.imooc.com
教學源碼:無
學習源碼:https://github.com/zccodere/s...java
Java消息中間件(入門篇)git
爲何須要使用消息中間件 消息中間件概述 JMS規範 JMS代碼演練
Java消息中間件(拓展篇)github
ActiveMQ集羣配置 消息中間件在大型系統中的最佳實踐 使用其它消息中間件
經過服務調用讓其它系統感知事件發生web
系統之間高耦合 程序執行效率低
經過消息中間件解耦服務調用spring
生活中的案例apache
微信公衆號 老師在黑板上寫字 電視機 等等
消息中間件帶來的好處vim
解耦:系統解耦 異步:異步執行 橫向擴展 安全可靠 順序保證
橫向擴展解釋安全
當登陸系統,須要不少用戶登陸。這些消息所有須要告知積分系統,去增長積分,而增長積分這個處理過程可能比較麻煩、比較耗時。這個時候,能夠啓動多臺積分系統,來同時消費這個消息中間件裏面的登陸消息,達到橫向擴展的做用。
什麼是中間件服務器
非底層操做系統軟件,非業務應用軟件,不是直接給最終用戶使用的,不能直接給客戶帶來價值的軟件統稱爲中間件
什麼是消息中間件微信
關注於數據的發送和接受,利用高效可靠的異步消息傳遞機制集成分佈式系統
示意圖
什麼是JMS
Java消息服務(Java Message Service)應用程序接口,是一個Java平臺中關於面向消息中間件的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。
什麼是AMQP
AMQP(Advanced Message Queuing Protocol)是一個提供統一消息服務的應用層標準高級消息隊列協議,基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不一樣產品,不一樣的開發語言等條件的限制。
JMS和AMQP對比
ActiveMQ
ActiveMQ是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ是一個徹底支持JMS1.1和J2EE1.4規範的JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今J2EE應用中間件仍然扮演者特殊的地位。
ActiveMQ特性
多種語言和協議編寫客戶端。 語言:Java、C、C++、C#、Ruby、Perl、Python、PHP 應用協議:OpenWire、Stomp、REST、WS Notification、XMPP、AMQP 徹底支持JMS1.1和J2EE1.4規範(持久化、XA消息、事務) 虛擬主題、組合目的、鏡像隊列
RabbitMQ
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
RabbitMQ特性
支持多種客戶端 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript等 AMQP的完整實現(vhost、Exchange、Binding、Routing Key等) 事務支持/發佈確認 消息持久化
Kafka
Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,是一個分佈式、分區的、可高的分佈式日誌存儲服務。它經過一種獨一無二的設計提供了一個消息系統的功能。
Kafka特性
經過O(1)的磁盤數據結構提供消息的持久化, 這種結構對於即便數以TB的消息存儲也可以保持長時間的穩定性能 高吞吐量:即便是很是普通的硬件Kafka也能夠支持每秒數百萬的消息 Partition、Consumer Group
綜合評價
Java消息服務定義
Java消息服務(Java Message Service)應用程序接口,是一個Java平臺中關於面向消息中間件的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。
JMS相關概念
提供者:實現JMS規範的消息中間件服務器 客戶端:發送或接收消息的應用程序 生產者/發佈者:建立併發送消息的客戶端 消費者/訂閱者:接收並處理消息的客戶端 消息:應用程序之間傳遞的數據內容 消息模式:在客戶端之間傳遞消息的模式,JMS中定義了主題和隊列兩種模式
JMS消息模式:隊列模式
客戶端包括生產者和消費者 隊列中的消息只能被一個消費者消費 消費者能夠隨時消費隊列中的消息
隊列模型示意圖
JMS消息模式:主題模型
客戶端包括髮布者和訂閱者 主題中的消息被全部訂閱者消費 消費者不能消費訂閱以前就發送到主題中的消息
主題模型示意圖
JMS編碼接口
ConnectionFactory:用於建立鏈接到消息中間件的鏈接工廠 Connection:表明了應用程序和消息服務器之間的通訊鏈路 Destination:指消息發佈和接收的地點,包括隊列和主題 Session:表示一個單線程的上下文,用於發送和接收消息 MessageConsumer:由會話建立,用戶接收發送到目標的消息 MessageProducer:由會話建立,用於發送消息到目標 Message:是在消費者和生產者之間傳送的對象,消息頭,一組消息屬性,一個消息體
JMS編碼接口之間的關係
在Windows安裝ActiveMQ
下載安裝包 直接啓動 使用服務啓動
安裝驗證
訪問地址:http://127.0.0.1:8161/ 默認用戶:admin 默認密碼:admin
在Linux安裝ActiveMQ
下載並解壓安裝包 啓動
啓動驗證
進入到bin目錄,使用命令./activemq start啓動服務 使用命令ps -ef |grep activemq查看進程是否存在 使用命令./activemq stop關閉服務
安裝驗證
訪問地址:http://Linux主機IP:8161/ 默認用戶:admin 默認密碼:admin
使用JMS接口規範鏈接ActiveMQ
建立生產者 建立消費者 建立發佈者 建立訂閱者
回顧JMS編碼接口之間的關係
代碼演示
1.編寫AppProducer類
package com.myimooc.jms.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * App 生產者-隊列模式 * @author ZhangCheng on 2017-07-22 * */ public class AppProducer { /** 指定ActiveMQ服務的地址 */ private static final String URL = "tcp://127.0.0.1:61616"; /** 指定隊列的名稱 */ private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws JMSException { // 1.建立ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.建立Connection Connection connection = connectionFactory.createConnection(); // 3.啓動鏈接 connection.start(); // 4.建立會話(第一個參數:是否在事務中處理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5. 建立一個目標 Destination destination = session.createQueue(QUEUE_NAME); // 6.建立一個生產者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 100; i++) { // 7.建立消息 TextMessage textMessage = session.createTextMessage("test" + i); // 8.發佈消息 producer.send(textMessage); System.out.println("消息發送:" + textMessage.getText()); } // 9.關閉鏈接 connection.close(); } }
2.編寫AppConsumer類
package com.myimooc.jms.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * App 消費者-隊列模式 * @author ZhangCheng on 2017-07-22 * */ public class AppConsumer { /** 指定ActiveMQ服務的地址 */ private static final String URL = "tcp://127.0.0.1:61616"; /** 指定隊列的名稱 */ private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws JMSException { // 1.建立ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.建立Connection Connection connection = connectionFactory.createConnection(); // 3.啓動鏈接 connection.start(); // 4.建立會話(第一個參數:是否在事務中處理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.建立一個目標 Destination destination = session.createQueue(QUEUE_NAME); // 6.建立一個消費者 MessageConsumer consumer = session.createConsumer(destination); // 7.建立一個監聽器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("接收消息:" + textMessage.getText()); } catch (JMSException e) { System.out.println("接收消息異常:"); e.printStackTrace(); } } }); // 8.關閉鏈接 //connection.close(); } }
代碼演示
1.編寫AppProducer類
package com.myimooc.jms.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * App 生產者-主題模式 * @author ZhangCheng on 2017-07-22 * */ public class AppProducer { /** 指定ActiveMQ服務的地址 */ private static final String URL = "tcp://127.0.0.1:61616"; /** 指定主題的名稱 */ private static final String TOPIC_NAME = "topic-test"; public static void main(String[] args) throws JMSException { // 1.建立ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.建立Connection Connection connection = connectionFactory.createConnection(); // 3.啓動鏈接 connection.start(); // 4.建立會話(第一個參數:是否在事務中處理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5. 建立一個目標 Destination destination = session.createTopic(TOPIC_NAME); // 6.建立一個生產者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 100; i++) { // 7.建立消息 TextMessage textMessage = session.createTextMessage("test" + i); // 8.發佈消息 producer.send(textMessage); System.out.println("消息發送:" + textMessage.getText()); } // 9.關閉鏈接 connection.close(); } }
2.編寫AppConsumer類
package com.myimooc.jms.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * App 消費者-主題模式 * @author ZhangCheng on 2017-07-22 * */ public class AppConsumer { /** 指定ActiveMQ服務的地址 */ private static final String URL = "tcp://127.0.0.1:61616"; /** 指定主題的名稱 */ private static final String TOPIC_NAME = "topic-test"; public static void main(String[] args) throws JMSException { // 1.建立ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.建立Connection Connection connection = connectionFactory.createConnection(); // 3.啓動鏈接 connection.start(); // 4.建立會話(第一個參數:是否在事務中處理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.建立一個目標 Destination destination = session.createTopic(TOPIC_NAME); // 6.建立一個消費者 MessageConsumer consumer = session.createConsumer(destination); // 7.建立一個監聽器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("接收消息:" + textMessage.getText()); } catch (JMSException e) { System.out.println("接收消息異常:"); e.printStackTrace(); } } }); // 8.關閉鏈接 //connection.close(); } }
使用Spring集成JMS鏈接ActiveMQ
ConnectionFactory:用於管理鏈接的鏈接工廠 JmsTemplate:用於發送和接收消息的模版類 MessageListener:消息監聽器
ConnectionFactory
一個Spring爲咱們提供的鏈接池 JmsTemplate每次發消息都會從新建立鏈接,會話和productor Spring中提供了SingleConnectFactory和CachingConnectionFactory
JmsTemplate
是Spring提供的,只需向Spring容器內註冊這個類就可使用JmsTemplate方便的操做jms JmsTemplate類是線程安全的,能夠在整個應用範圍使用
MessageListener
實現一個onMessage方法,該方法只接收一個Message參數
代碼演示
1.建立名爲jmsspring的maven項目POM文件以下
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.myimooc</groupId> <artifactId>jmsspring</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>jmsspring</name> <url>http://maven.apache.org</url> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.1.RELEASE</version> <relativePath /> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
2.完成後的目錄結構以下
源碼請到個人github地址查看
3.測試
使用Postman向ProducerController發起請求,將消息發送出去
對應的ConsumerTopicMessageListener 和 ConsumerMessageListener接收到消息
爲何要對消息中間件集羣
實現高可用,以排除單點故障引發的服務中斷 實現負載均衡,以提高效率爲更多客戶提供服務
集羣方式
客戶端集羣:讓多個消費者消費同一個隊列 Broker cluster:多個Broker之間同步消息 Master Slave:實現高可用
ActiveMQ失效轉移(failover)-客戶端配置
容許當其中一臺消息服務器宕機時,客戶端在傳輸層上從新鏈接到其它消息服務器 語法:failover:(uri1,…,uriN)?transportOptions
transportOptions參數說明
randomize默認爲true,表示在URI列表中選擇URI鏈接時是否採用隨機策略 initialReconnectDelay默認爲10,單位毫秒,表示第一次嘗試重連之間等待的時間 maxReconnectDelay默認爲30000,單位毫秒,最長重連的時間間隔
Broker cluster集羣配置-原理
NetworkConnector(網絡鏈接器)
網絡鏈接器主要用於配置ActiveMQ服務器與服務器之間的網絡通信方式,用於服務器透傳消息 網絡鏈接器分爲靜態鏈接器和動態鏈接器
靜態鏈接器
動態鏈接器
ActiveMQ Master Slace集羣方案
Share nothing storage master/slave(已過期,5.8+後移除) Shared storage master/slave 共享存儲 Replicated LevelDB Store基於負責的LevelDB Store
共享存儲集羣的原理
基於複製的LevelDB Store的原理
兩種集羣方式對比
三臺服務器的完美集羣方案
ActiveMQ集羣配置方案
配置過程
1.節點準備
mkdir activemq建立目錄 cp -rf apache-activemq-5.15.0 activemq/activemq-a cp -rf apache-activemq-5.15.0 activemq/activemq-b cp -rf apache-activemq-5.15.0 activemq/activemq-c cd activemq mkdir kahadb
2.配置a節點
cd activemq-a/ cd conf/ vim activemq.xml <networkConnectors> <networkConnector name="local_network" uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)" /> </networkConnectors> vim jetty.xml:配置管理端口號,a節點使用默認端口,無須配置
3.配置b節點
vim activemq.xml 配置網絡鏈接器 <networkConnectors> <networkConnector name="network_a" uri="static:(tcp://127.0.0.1:61616)" /> </networkConnectors> 配置持久化存儲路徑 <persistenceAdapter> <kahaDB directory="/studio/activemq/kahadb"/> </persistenceAdapter> 配置服務端口 <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> vim jetty.xml 配置管理端口號 <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <property name="port" value="8162"/> </bean>
4.配置c節點
vim activemq.xml 配置網絡鏈接器 <networkConnectors> <networkConnector name="network_a" uri="static:(tcp://127.0.0.1:61616)" /> </networkConnectors> 配置持久化存儲路徑 <persistenceAdapter> <kahaDB directory="/studio/activemq/kahadb"/> </persistenceAdapter> 配置服務端口 <transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> vim jetty.xml 配置管理端口號 <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <property name="port" value="8163"/> </bean>
5.啓動服務
回到activemq目錄,分別啓動a,b,c三個節點
./activemq-a/bin/activemq start ./activemq-b/bin/activemq start ./activemq-c /bin/activemq start
檢查是否都啓動成功
ps -ef |grep activemq
檢查是否對外提供服務,即端口是否被監聽(佔用)
netstat -anp |grep 61616 netstat -anp |grep 61617 netstat -anp |grep 61618
檢查發現61618即c節點沒有提供服務,可是c節點的進程是啓動成功了的。由於b節點和c點擊是master/slave配置,如今b節點獲取到了共享文件夾的全部權,因此c節點正在等待得到資源,而且提供服務。即c節點在未得到資源以前,是不提供服務的。
測試,把b節點殺掉,看c節點能不能提供61618的服務
./activemq-b/bin/activemq stop netstat -anp |grep 61618 ./activemq-b/bin/activemq start netstat -anp |grep 61617
檢查發現,從新啓動b節點後,b節點61617端口並無提供服務,是由於如今b節點成爲了slave節點,而c節點成爲了master節點。因此,如今b節點啓動了,可是它並不對外提供服務。只有當c節點出現問題後,b節點纔對外提供服務。
6.經過代碼測試集羣配置是否生效
生產者
package com.myimooc.jms.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * App 生產者-隊列模式-集羣配置測試 * @author ZhangCheng on 2017-07-25 * */ public class AppProducerTest { /** failover 爲狀態轉移的存在部分 * 因a節點只做爲消費者使用,因此這裏不配置61616節點了。 * */ private static final String URL = "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true"; /** 指定隊列的名稱 */ private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws JMSException { // 1.建立ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.建立Connection Connection connection = connectionFactory.createConnection(); // 3.啓動鏈接 connection.start(); // 4.建立會話(第一個參數:是否在事務中處理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5. 建立一個目標 Destination destination = session.createQueue(QUEUE_NAME); // 6.建立一個生產者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 100; i++) { // 7.建立消息 TextMessage textMessage = session.createTextMessage("test" + i); // 8.發佈消息 producer.send(textMessage); System.out.println("消息發送:" + textMessage.getText()); } // 9.關閉鏈接 connection.close(); } }
消費者
package com.myimooc.jms.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * App 消費者-隊列模式-集羣配置測試 * @author ZhangCheng on 2017-07-22 * */ public class AppConsumerTest { /** failover 爲狀態轉移的存在部分 * */ private static final String URL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true"; /** 指定隊列的名稱 */ private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws JMSException { // 1.建立ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.建立Connection Connection connection = connectionFactory.createConnection(); // 3.啓動鏈接 connection.start(); // 4.建立會話(第一個參數:是否在事務中處理) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.建立一個目標 Destination destination = session.createQueue(QUEUE_NAME); // 6.建立一個消費者 MessageConsumer consumer = session.createConsumer(destination); // 7.建立一個監聽器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("接收消息:" + textMessage.getText()); } catch (JMSException e) { System.out.println("接收消息異常:"); e.printStackTrace(); } } }); // 8.關閉鏈接 //connection.close(); } }
運行生產者,而後到管理界面查看消息發送到了那裏
http://127.0.0.1:8161 http://127.0.0.1:8162 http://127.0.0.1:8163
查看發現,8162沒法訪問,是由於b節點是slave節點,不提供服務,消息都發送到了c節點
把8163即c節點宕掉後,運行消費者,查看消息是否可以使用
./activemq-c/bin/activemq stop
實際業務場景分析
實際業務場景特色
子業務系統都有集羣的可能性 同一個消息會廣播給關注該類消息的全部子業務系統 同一類消息在集羣中被負載消費 業務的發生和消息的發佈最終一致性
須要解決的問題
不一樣業務系統分別處理同一個消息,同一業務系統負載處理同類消息 解決消息發送時的一致性問題 解決消息處理的冪等性問題 基於消息機制創建事件總線
集羣系統處理消息方案-使用JMS級聯的解決方案
集羣系統處理消息方案-使用ActiveMQ的虛擬主題解決方案
發佈者:將消息發佈到一個主題中,主題名以VirtualTopic開頭,如VirtualTopic.TEST 消費者:從隊列中獲取消息,在隊列名中表名本身身份,如Consumer.A.VirtualTopic.TEST
解決消息發送時的一致性問題-使用JMS中XA系列接口保證強一致性
引入分佈式事務 要求業務操做必須支持XA協議
解決消息發送時的一致性問題-使用消息表的本地事務解決方案
解決消息發送時的一致性問題-使用內存日誌的解決方案
解決消息處理的冪等性問題
所謂冪等性問題,是指屢次執行所產生的影響(結果)與一次執行所產生的影響(結果)同樣。好比:支付成功後,支付寶會發起屢次通知給業務系統,要求業務系統可以處理這些重複的消息,可是又不重複處理訂單。若是在消息處理系統中保證冪等性,會增長系統複雜度,咱們能夠統一處理冪等性後,再將消息發送給消息處理系統。
解決消息處理的冪等性問題-使用消息表的本地事務解決方案
解決消息處理的冪等性問題-使用內存日誌的解決方案
基於消息機制的事件總線-什麼是事件驅動架構
事件驅動架構(Event Driven Architecture,EDA)定義了一個設計和實現一個應用系統的方法學,在這個系統裏事件可傳輸於鬆散耦合的組件和服務之間。特色:有事我叫你,沒事別煩我
事件驅動架構模型
該教師正在開發該事件總線的框架,github地址https://github.com/jovezhao/nest。
分析須要作的事
解決各業務系統集羣處理同一條消息 實現本身的消息提供者
經常使用消息中間件
ActiveMQ RabbitMQ Kafka
集成RabbitMQ
RabbitMQ:使用交換器綁定到隊列
示意圖
RabbitMQ消息提供者源碼解析
建立ConnectionFactory 建立Connection 建立Channel 定義Exchange 定義Queue而且綁定隊列
集成Kafka
Kafka使用group.id分組消費者
配置消息者參數group.id相對時對消息進行負載處理 配置服務器partitions參數,控制同一個group.id下的consumer數量小於partitions Kafka只保證同一個partition下的消息是有序的