衆所周知,消息中間件是大型分佈式系統中不可或缺的重要組件。它使用簡單,卻解決了很多難題,好比異步處理,系統藕合,流量削鋒,分佈式事務管理等。實現了一個高性能,高可用,高擴展的系統。本章經過介紹消息中間件的應用場景,消息中間件的傳輸模式,ActiveMQ快速入門 三個方面來對消息中間件進行入門介紹。還在等什麼,趕快來學習吧!html
說明:消息中間件很是強大,值得咱們認真去學習和使用。完整代碼請異步github。
技術:消息中間件的應用場景,通訊模式,ActiveMQ。
源碼:https://github.com/ITDragonBl...
文章目錄結構:
前端
異步處理:調用者發起請求後,調用者不會馬上獲得結果,也無需等待結果,繼續執行其餘業務邏輯。提升了效率但存在異步請求失敗的隱患,適用於非核心業務邏輯處理。
同步處理:調用者發起請求後,調用者必須等待直到返回結果,再根據返回的結果執行其餘業務邏輯。效率雖然沒有異步處理高,但能保證業務邏輯可控性,適用於核心業務邏輯處理。java
舉一個比較常見的應用場景:爲了確保註冊用戶的真實性,通常在註冊成功後會發送驗證郵件或者驗證碼短信,只有認證成功的用戶才能正常使用平臺功能。
以下圖所示:同步處理和異步處理的比較。git
用消息中間件實現異步處理的好處:
1、在傳統的系統架構,用戶從註冊到跳轉成功頁面,中間須要等待郵件發送的業務邏輯耗時。這不只影響系統響應時間,下降了CPU吞吐量,同時還影響了用戶的體驗。
2、經過消息中間件將郵件發送的業務邏輯異步處理,用戶註冊成功後發送數據到消息中間件,再跳轉成功頁面,郵件發送的邏輯再由訂閱該消息中間件的其餘系統負責處理,
3、消息中間件的讀寫速度很是的快,其中的耗時能夠忽略不計。經過消息中間件能夠處理更多的請求。github
小結:正確使用消息中間件將非核心業務邏輯功能異步處理,能夠提升系統的響應效率,提升了CPU的吞吐量,改善用戶的體驗。spring
分佈式系統是若干個獨立的計算機(系統)集合。每一個計算機負責本身的模塊,實現系統的解耦,也避免單點故障對整個系統的影響。每一個系統還能夠作一個集羣,進一步下降故障的發生機率。
在這樣的分佈式系統中,消息中間件又扮演着什麼樣的角色呢?數據庫
舉一個比較常見的應用場景:訂單系統下單成功後,須要調用倉庫系統接口,選擇最優的發貨倉庫和更新商品庫存。若由於某種緣由在調用倉庫系統接口失敗,會直接影響到下單流程。
以下圖所示:感覺一下消息中間件扮演的重要角色。express
用消息中間件實現系統藕合的好處:
1、消息中間件可讓各系統之間耦合性下降,不會由於其餘系統的異常影響到自身業務邏輯。各盡其職,訂單系統只需負責將訂單數據持久化到數據庫中,倉庫系統只需負責更新庫存,不會由於倉庫系統的緣由從而影響到下單的流程。
2、各位看官是否發現了一個問題,下單和庫存減小本應該是一個事務。由於分佈式的緣由很難保證事務的強一致性。這裏經過消息中間件實現事務的最終一致性效果(後續會詳細介紹)。apache
小結:事務的一致性當然重要,沒有庫存會致使下單失敗是一個理論上很正常的邏輯。但實際業務中並不是如此,咱們徹底能夠利用發貨期經過採購或者借庫的方式來增長庫存。這樣無疑能夠增長銷量,仍是能夠保證事務的最終一致性。瀏覽器
流量削鋒也稱限流。在秒殺,搶購的活動中,爲了避免影響整個系統的正常使用,通常會經過消息中間件作限流,避免流量突增壓垮系統,前端頁面能夠提示"排隊等待",即使用戶體驗不好,也不能讓系統垮掉。
小結:限流能夠在流量突增的狀況下保障系統的穩定。系統宕機會被同行抓住笑柄。
消息中間件除了支持對點對和發佈訂閱兩種模式外,在實際開發中還有一種雙向應答模式被普遍使用。
點對點(p2p)模式有三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。發送者將消息發送到一個特定的隊列中,等待接收者從隊列中獲取消息消耗。
P2P的三個特色:
1、每一個消息只能被一個接收者消費,且消息被消費後默認從隊列中刪掉(也能夠經過其餘簽收機制重複消費)。
2、發送者和接收者之間沒有依賴性,生產者發送消息和消費者接收消息並不要求同時運行。
3、接收者在成功接收消息以後需向隊列發送接收成功的確認消息。
發佈訂閱(Pub/Sub)模式也有三個角色:主題(Topic),發佈者(Publisher),訂閱者(Subscriber)。發佈者將消息發送到主題隊列中,系統再將這些消息傳遞給訂閱者。
Pub/Sub的特色:
1、每一個消息能夠被多個訂閱者消費。
2、發佈者和訂閱者之間存在依賴性。訂閱者必須先訂閱主題後才能接收到信息,在訂閱前發佈的消息,訂閱者是接收不到的。
3、非持久化訂閱:若是訂閱者不在線,此時發佈的消息訂閱者是也接收不到,即使訂閱者從新上線也接收不到。
4、持久化訂閱:訂閱者訂閱主題後,即使訂閱者不在線,此時發佈的消息能夠在訂閱者從新上線後接收到的。
雙向應答模式並非消息中間件提供的一種通訊模式,它是因爲實際生成環境的須要,在原有的基礎上作了改良。即消息的發送者也是消息的接收者。消息的接收者也是消息的發送者。以下圖所示
ActiveMQ是Apache出品,簡單好用,能力強大,能夠處理大部分的業務的開源消息總線。同時也支持JMS規範。
JMS(JAVA Message Service,java消息服務)API是一個消息服務的標準或者說是規範,容許應用程序組件基於JavaEE平臺建立、發送、接收和讀取消息。它使分佈式通訊耦合度更低,消息服務更加可靠以及異步性。
ActiveMQ 的安裝很簡單,三個步驟:
第一步:下載,官網下載地址:http://activemq.apache.org/do...。
第二步:運行,壓縮包解壓後,在bin目錄下根據電腦系統位數找到對應的wrapper.exe程序,雙擊運行。
第三步:訪問,瀏覽器訪問http://localhost:8161/admin,帳號密碼都是admin。
咱們經過簡單的P2P模式來了解ActiveMQ的工做流程。
生產者發送消息給MQ主要步驟:
第一步:建立鏈接工廠實例
第二步:建立鏈接並啓動
第三步:獲取操做消息的接口
第四步:建立隊列,即Queue或者Topic
第五步:建立消息發送者
第六步:發送消息,關閉資源
import java.util.Random; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息隊列生產者 * @author itdragon */ public class ITDragonProducer { private static final String QUEUE_NAME = "ITDragon.Queue"; public static void main(String[] args) { // ConnectionFactory: 鏈接工廠,JMS 用它建立鏈接 ConnectionFactory connectionFactory = null; // Connection: 客戶端和JMS系統之間創建的連接 Connection connection = null; // Session: 一個發送或接收消息的線程 ,操做消息的接口 Session session = null; // Destination: 消息的目的地,消息發送給誰 Destination destination = null; // MessageProducer: 消息生產者 MessageProducer producer = null; try { // step1 構造ConnectionFactory實例對象,須要填入 用戶名, 密碼 以及要鏈接的地址,默認端口爲"tcp://localhost:61616" connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL); // step2 鏈接工廠建立鏈接對象 connection = connectionFactory.createConnection(); // step3 啓動 connection.start(); // step4 獲取操做鏈接 /** * 第一個參數:是否設置事務 true or false。 若是設置了true,第二個參數忽略,而且須要commit()才執行 * 第二個參數:acknowledge模式 * AUTO_ACKNOWLEDGE:自動確認,客戶端發送和接收消息不須要作額外的工做。無論消息是否被正常處理。 默認 * CLIENT_ACKNOWLEDGE:客戶端確認。客戶端接收到消息後,必須手動調用acknowledge方法,jms服務器纔會刪除消息。 * DUPS_OK_ACKNOWLEDGE:容許重複的確認模式。 */ session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // step5 建立一個隊列到目的地 destination = session.createQueue(QUEUE_NAME); // step6 在目的地建立一個生產者 producer = session.createProducer(destination); // step7 生產者設置不持久化,若要設置持久化則使用 PERSISTENT producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // step8 生產者發送信息,具體的業務邏輯 sendMessage(session, producer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } } public static void sendMessage(Session session, MessageProducer producer) throws Exception { for(int i = 0; i < 5; i++) { String []operators = {"+","-","*","/"}; Random random = new Random(System.currentTimeMillis()); String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1); TextMessage message = session.createTextMessage(expression); // 發送消息到目的地方 producer.send(message); System.out.println("Queue Sender ---------> " + expression); } } }
消費者從MQ中獲取數據消費步驟和上面相似,只是將發送消息改爲了接收消息。
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import com.itdragon.utils.ITDragonUtil; /** * 消息隊列消費者 * @author itdragon */ public class ITDragonConsumer { private static final String QUEUE_NAME = "ITDragon.Queue"; // 要和Sender一致 public static void main(String[] args) { ConnectionFactory connectionFactory = null; Connection connection = null; Session session = null; Destination destination = null; // MessageConsumer: 信息消費者 MessageConsumer consumer = null; try { connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(QUEUE_NAME); consumer = session.createConsumer(destination); // 不斷地接收信息,直到沒有爲止 while (true) { TextMessage message = (TextMessage) consumer.receive(); if (null != message) { System.out.print(ITDragonUtil.cal(message.getText())); } else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } } }
SpringBoot能夠幫助咱們快速搭建項目,減小Spring整合第三方配置的精力。SpringBoot整合ActiveMQ也是很是簡單,只須要簡單的兩個步驟:
第一步,在pom.xml文件中添加依賴使其支持ActiveMQ
第二步,在application.properties文件中配置鏈接ActiveMQ參數
pom.xml是Maven項目的核心配置文件
<dependency> <!-- 支持ActiveMQ依賴 --> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <!-- 支持使用mq鏈接池 --> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
application.properties是SpringBoot項目的核心參數配置文件
spring.activemq.user=admin spring.activemq.password=admin spring.activemq.broker-url=tcp://localhost:61616 spring.activemq.in-memory=true spring.activemq.pool.enabled=true
spring.activemq.in-memory
默認值爲true,表示無需安裝ActiveMQ的服務器,直接使用內存。spring.activemq.pool.enabled
表示經過鏈接池的方式鏈接。
springboot-activemq-producer 項目模擬生產者所在的系統,支持同時發送點對點模式和發佈訂閱模式。
兩個核心文件:一個是消息發送類,一個是隊列Bean管理配置類。
三種主要模式:一個是對點對模式,隊列名爲"queue.name";一個是發佈訂閱模式,主題名爲"topic.name";最後一個是雙向應答模式,隊列名爲"response.name" 。
import java.util.Random; import javax.jms.Queue; import javax.jms.Topic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.annotation.JmsListener; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; /** * 消息隊列生產者 * @author itdragon */ @Service @EnableScheduling public class ITDragonProducer { @Autowired private JmsMessagingTemplate jmsTemplate; @Autowired private Queue queue; @Autowired private Topic topic; @Autowired private Queue responseQueue; /** * 點對點(p2p)模式測試 * 包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。 * 發送者將消息發送到一個特定的隊列,隊列保留着消息,直到接收者從隊列中獲取消息。 */ @Scheduled(fixedDelay = 5000) public void testP2PMQ(){ for(int i = 0; i < 5; i++) { String []operators = {"+","-","*","/"}; Random random = new Random(System.currentTimeMillis()); String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1); jmsTemplate.convertAndSend(this.queue, expression); System.out.println("Queue Sender ---------> " + expression); } } /** * 訂閱/發佈(Pub/Sub)模擬測試 * 包含三個角色:主題(Topic),發佈者(Publisher),訂閱者(Subscriber) 。 * 多個發佈者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。 */ @Scheduled(fixedDelay = 5000) public void testPubSubMQ() { for (int i = 0; i < 5; i++) { String []operators = {"+","-","*","/"}; Random random = new Random(System.currentTimeMillis()); String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1); jmsTemplate.convertAndSend(this.topic, expression); System.out.println("Topic Sender ---------> " + expression); } } /** * 雙向應答模式測試 * P2P和Pub/Sub是MQ默認提供的兩種模式,而雙向應答模式則是在原有的基礎上作了改進。發送者既是接收者,接收者也是發送者。 */ @Scheduled(fixedDelay = 5000) public void testReceiveResponseMQ(){ for (int i = 0; i < 5; i++) { String []operators = {"+","-","*","/"}; Random random = new Random(System.currentTimeMillis()); String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1); jmsTemplate.convertAndSend(this.responseQueue, expression); } } // 接收P2P模式,消費者返回的數據 @JmsListener(destination = "out.queue") public void receiveResponse(String message) { System.out.println("Producer Response Receiver ---------> " + message); } }
import javax.jms.Queue; import javax.jms.Topic; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * bean配置管理類 * @author itdragon */ @Configuration public class ActiveMQBeansConfig { @Bean // 定義一個名字爲queue.name的點對點列隊 public Queue queue() { return new ActiveMQQueue("queue.name"); } @Bean // 定義一個名字爲topic.name的主題隊列 public Topic topic() { return new ActiveMQTopic("topic.name"); } @Bean // 定義一個名字爲response.name的雙向應答隊列 public Queue responseQueue() { return new ActiveMQQueue("response.name"); } }
springboot-activemq-consumer 模擬消費者所在的服務器,主要負責監聽隊列消息。
兩個核心文件:一個是消息接收類,一個是兼容點對點模式和發佈訂閱模式的連接工廠配置類。
import org.springframework.jms.annotation.JmsListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Service; import com.itdragon.utils.ITDragonUtil; /** * 消息隊列消費者 * @author itdragon */ @Service public class ITDragonConsumer { // 1. 監聽名字爲"queue.name"的點對點隊列 @JmsListener(destination = "queue.name", containerFactory="queueListenerFactory") public void receiveQueue(String text) { System.out.println("Queue Receiver : " + text + " \t 處理結果 : " + ITDragonUtil.cal(text)); } // 2. 監聽名字爲"topic.name"的發佈訂閱隊列 @JmsListener(destination = "topic.name", containerFactory="topicListenerFactory") public void receiveTopicOne(String text) { System.out.println(Thread.currentThread().getName() + " No.1 Topic Receiver : " + text + " \t 處理結果 : " + ITDragonUtil.cal(text)); } // 2. 監聽名字爲"topic.name"的發佈訂閱隊列 @JmsListener(destination = "topic.name", containerFactory="topicListenerFactory") public void receiveTopicTwo(String text) { System.out.println(Thread.currentThread().getName() +" No.2 Topic Receiver : " + text + " \t 處理結果 : " + ITDragonUtil.cal(text)); } // 3. 監聽名字爲"response.name"的接收應答(雙向)隊列 @JmsListener(destination = "response.name") @SendTo("out.queue") public String receiveResponse(String text) { System.out.println("Response Receiver : " + text + " \t 正在返回數據......"); return ITDragonUtil.cal(text).toString(); } }
import java.util.concurrent.Executors; import javax.jms.ConnectionFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; @Configuration @EnableJms public class JmsConfig { @Bean // 開啓pub/Sub模式 public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setPubSubDomain(true); factory.setConnectionFactory(connectionFactory); return factory; } @Bean // JMS默認開啓P2P模式 public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setPubSubDomain(false); factory.setConnectionFactory(connectionFactory); return factory; } }
1) 消息中間件能夠解決異步處理,系統解耦,流量削鋒,分佈式系統事務管理等問題。
2) 消息中間件默認支持點對點模式和發佈訂閱模式,實際工做中還可使用雙向應當模式。
3) ActiveMQ是Apache出品,簡單好用,功能強大,能夠處理大部分的業務的開源消息總線。
到這裏 消息中間件企業級應用使用 的文章就寫完了。若是文章對你有幫助,能夠點個"推薦",也能夠"關注"我,得到更多豐富的知識。後續博客計劃是:RocketMQ和Kafka的使用,Zookeeper和相關集羣的搭建。若文中有什麼不對或者不嚴謹的地方,請指正。