消息中間件企業級應用

消息中間件企業級應用

衆所周知,消息中間件是大型分佈式系統中不可或缺的重要組件。它使用簡單,卻解決了很多難題,好比異步處理,系統藕合,流量削鋒,分佈式事務管理等。實現了一個高性能,高可用,高擴展的系統。本章經過介紹消息中間件的應用場景消息中間件的傳輸模式ActiveMQ快速入門 三個方面來對消息中間件進行入門介紹。還在等什麼,趕快來學習吧!html

說明:消息中間件很是強大,值得咱們認真去學習和使用。完整代碼請異步github。
技術:消息中間件的應用場景,通訊模式,ActiveMQ。
源碼:https://github.com/ITDragonBl...
文章目錄結構:
文章目錄結構前端

消息中間件應用場景

異步處理

異步處理:調用者發起請求後,調用者不會馬上獲得結果,也無需等待結果,繼續執行其餘業務邏輯。提升了效率但存在異步請求失敗的隱患,適用於非核心業務邏輯處理。
同步處理:調用者發起請求後,調用者必須等待直到返回結果,再根據返回的結果執行其餘業務邏輯。效率雖然沒有異步處理高,但能保證業務邏輯可控性,適用於核心業務邏輯處理。java

舉一個比較常見的應用場景:爲了確保註冊用戶的真實性,通常在註冊成功後會發送驗證郵件或者驗證碼短信,只有認證成功的用戶才能正常使用平臺功能。
以下圖所示:同步處理和異步處理的比較。git

異步處理

用消息中間件實現異步處理的好處:
1、在傳統的系統架構,用戶從註冊到跳轉成功頁面,中間須要等待郵件發送的業務邏輯耗時。這不只影響系統響應時間,下降了CPU吞吐量,同時還影響了用戶的體驗。
2、經過消息中間件將郵件發送的業務邏輯異步處理,用戶註冊成功後發送數據到消息中間件,再跳轉成功頁面,郵件發送的邏輯再由訂閱該消息中間件的其餘系統負責處理,
3、消息中間件的讀寫速度很是的快,其中的耗時能夠忽略不計。經過消息中間件能夠處理更多的請求。github

小結:正確使用消息中間件將非核心業務邏輯功能異步處理,能夠提升系統的響應效率,提升了CPU的吞吐量,改善用戶的體驗。spring

系統藕合和事務的最終一致性

分佈式系統是若干個獨立的計算機(系統)集合。每一個計算機負責本身的模塊,實現系統的解耦,也避免單點故障對整個系統的影響。每一個系統還能夠作一個集羣,進一步下降故障的發生機率。
在這樣的分佈式系統中,消息中間件又扮演着什麼樣的角色呢?數據庫

舉一個比較常見的應用場景:訂單系統下單成功後,須要調用倉庫系統接口,選擇最優的發貨倉庫和更新商品庫存。若由於某種緣由在調用倉庫系統接口失敗,會直接影響到下單流程。
以下圖所示:感覺一下消息中間件扮演的重要角色。express

系統解耦

用消息中間件實現系統藕合的好處:
1、消息中間件可讓各系統之間耦合性下降,不會由於其餘系統的異常影響到自身業務邏輯。各盡其職,訂單系統只需負責將訂單數據持久化到數據庫中,倉庫系統只需負責更新庫存,不會由於倉庫系統的緣由從而影響到下單的流程。
2、各位看官是否發現了一個問題,下單和庫存減小本應該是一個事務。由於分佈式的緣由很難保證事務的強一致性。這裏經過消息中間件實現事務的最終一致性效果(後續會詳細介紹)。apache

小結:事務的一致性當然重要,沒有庫存會致使下單失敗是一個理論上很正常的邏輯。但實際業務中並不是如此,咱們徹底能夠利用發貨期經過採購或者借庫的方式來增長庫存。這樣無疑能夠增長銷量,仍是能夠保證事務的最終一致性。瀏覽器

流量削鋒

流量削鋒也稱限流。在秒殺,搶購的活動中,爲了避免影響整個系統的正常使用,通常會經過消息中間件作限流,避免流量突增壓垮系統,前端頁面能夠提示"排隊等待",即使用戶體驗不好,也不能讓系統垮掉。

流量削鋒

小結:限流能夠在流量突增的狀況下保障系統的穩定。系統宕機會被同行抓住笑柄。

消息中間件的傳輸模式

消息中間件除了支持對點對和發佈訂閱兩種模式外,在實際開發中還有一種雙向應答模式被普遍使用。

點對點(p2p)模式

點對點(p2p)模式有三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。發送者將消息發送到一個特定的隊列中,等待接收者從隊列中獲取消息消耗。
P2P的三個特色:
1、每一個消息只能被一個接收者消費,且消息被消費後默認從隊列中刪掉(也能夠經過其餘簽收機制重複消費)。
2、發送者和接收者之間沒有依賴性,生產者發送消息和消費者接收消息並不要求同時運行。
3、接收者在成功接收消息以後需向隊列發送接收成功的確認消息。

p2p模式

發佈訂閱(Pub/Sub)模式

發佈訂閱(Pub/Sub)模式也有三個角色:主題(Topic),發佈者(Publisher),訂閱者(Subscriber)。發佈者將消息發送到主題隊列中,系統再將這些消息傳遞給訂閱者。
Pub/Sub的特色:
1、每一個消息能夠被多個訂閱者消費。
2、發佈者和訂閱者之間存在依賴性。訂閱者必須先訂閱主題後才能接收到信息,在訂閱前發佈的消息,訂閱者是接收不到的。
3、非持久化訂閱:若是訂閱者不在線,此時發佈的消息訂閱者是也接收不到,即使訂閱者從新上線也接收不到。
4、持久化訂閱:訂閱者訂閱主題後,即使訂閱者不在線,此時發佈的消息能夠在訂閱者從新上線後接收到的。

Pub/Sub模式

雙向應答模式

雙向應答模式並非消息中間件提供的一種通訊模式,它是因爲實際生成環境的須要,在原有的基礎上作了改良。即消息的發送者也是消息的接收者。消息的接收者也是消息的發送者。以下圖所示

雙向應當模式

ActiveMQ 入門

ActiveMQ是Apache出品,簡單好用,能力強大,能夠處理大部分的業務的開源消息總線。同時也支持JMS規範。

JMS(JAVA Message Service,java消息服務)API是一個消息服務的標準或者說是規範,容許應用程序組件基於JavaEE平臺建立、發送、接收和讀取消息。它使分佈式通訊耦合度更低,消息服務更加可靠以及異步性。

ActiveMQ 安裝

ActiveMQ 的安裝很簡單,三個步驟:
第一步:下載,官網下載地址:http://activemq.apache.org/do...
第二步:運行,壓縮包解壓後,在bin目錄下根據電腦系統位數找到對應的wrapper.exe程序,雙擊運行。
第三步:訪問,瀏覽器訪問http://localhost:8161/admin,帳號密碼都是admin。

ActiveMQ 工做流程

咱們經過簡單的P2P模式來了解ActiveMQ的工做流程。
生產者發送消息給MQ主要步驟:
第一步:建立鏈接工廠實例
第二步:建立鏈接並啓動
第三步:獲取操做消息的接口
第四步:建立隊列,即Queue或者Topic
第五步:建立消息發送者
第六步:發送消息,關閉資源

import java.util.Random;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 消息隊列生產者
 * @author itdragon
 */
public class ITDragonProducer {
    
    private static final String QUEUE_NAME = "ITDragon.Queue";  
      
    public static void main(String[] args) {  
        // ConnectionFactory: 鏈接工廠,JMS 用它建立鏈接  
        ConnectionFactory connectionFactory = null;  
        // Connection: 客戶端和JMS系統之間創建的連接
        Connection connection = null;  
        // Session: 一個發送或接收消息的線程 ,操做消息的接口
        Session session = null;  
        // Destination: 消息的目的地,消息發送給誰  
        Destination destination = null;  
        // MessageProducer: 消息生產者  
        MessageProducer producer = null;  
        try {  
            // step1 構造ConnectionFactory實例對象,須要填入 用戶名, 密碼 以及要鏈接的地址,默認端口爲"tcp://localhost:61616"  
            connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,  
                    ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);   
            // step2 鏈接工廠建立鏈接對象  
            connection = connectionFactory.createConnection();  
            // step3 啓動  
            connection.start();  
            // step4 獲取操做鏈接  
            /** 
             * 第一個參數:是否設置事務 true or false。 若是設置了true,第二個參數忽略,而且須要commit()才執行 
             * 第二個參數:acknowledge模式 
             * AUTO_ACKNOWLEDGE:自動確認,客戶端發送和接收消息不須要作額外的工做。無論消息是否被正常處理。 默認
             * CLIENT_ACKNOWLEDGE:客戶端確認。客戶端接收到消息後,必須手動調用acknowledge方法,jms服務器纔會刪除消息。
             * DUPS_OK_ACKNOWLEDGE:容許重複的確認模式。
             */  
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
            // step5 建立一個隊列到目的地  
            destination = session.createQueue(QUEUE_NAME);  
            // step6 在目的地建立一個生產者  
            producer = session.createProducer(destination);  
            // step7 生產者設置不持久化,若要設置持久化則使用 PERSISTENT
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
            // step8 生產者發送信息,具體的業務邏輯  
            sendMessage(session, producer);  
            session.commit();  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                if (null != connection) {  
                    connection.close();  
                }  
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
        }  
    }  
  
    public static void sendMessage(Session session, MessageProducer producer) throws Exception {  
        for(int i = 0; i < 5; i++) {
            String []operators = {"+","-","*","/"};
            Random random = new Random(System.currentTimeMillis());  
            String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
            TextMessage message = session.createTextMessage(expression);  
            // 發送消息到目的地方  
            producer.send(message);  
            System.out.println("Queue Sender ---------> " + expression);
        }
    }  

}

消費者從MQ中獲取數據消費步驟和上面相似,只是將發送消息改爲了接收消息。

import javax.jms.Connection;  
import javax.jms.ConnectionFactory;  
import javax.jms.Destination;  
import javax.jms.MessageConsumer;  
import javax.jms.Session;  
import javax.jms.TextMessage;  
import org.apache.activemq.ActiveMQConnection;  
import org.apache.activemq.ActiveMQConnectionFactory;
import com.itdragon.utils.ITDragonUtil;

/**
 * 消息隊列消費者
 * @author itdragon
 */
public class ITDragonConsumer {
    
    private static final String QUEUE_NAME = "ITDragon.Queue"; // 要和Sender一致  
    
    public static void main(String[] args) {  
        ConnectionFactory connectionFactory = null;  
        Connection connection = null;  
        Session session = null;  
        Destination destination = null;  
        // MessageConsumer: 信息消費者  
        MessageConsumer consumer = null;  
        try {  
            connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,  
                    ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);  
            connection = connectionFactory.createConnection();  
            connection.start();  
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
            destination = session.createQueue(QUEUE_NAME);  
            consumer = session.createConsumer(destination);  
            // 不斷地接收信息,直到沒有爲止  
            while (true) {  
                TextMessage message = (TextMessage) consumer.receive();  
                if (null != message) {  
                    System.out.print(ITDragonUtil.cal(message.getText()));  
                } else {  
                    break;  
                }  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                if (null != connection) {  
                    connection.close();  
                }  
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
        }  
    }  
}

SpringBoot 整合ActiveMQ使用

SpringBoot能夠幫助咱們快速搭建項目,減小Spring整合第三方配置的精力。SpringBoot整合ActiveMQ也是很是簡單,只須要簡單的兩個步驟:
第一步,在pom.xml文件中添加依賴使其支持ActiveMQ
第二步,在application.properties文件中配置鏈接ActiveMQ參數

pom.xml是Maven項目的核心配置文件

<dependency>  <!-- 支持ActiveMQ依賴 -->
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-activemq</artifactId>  
</dependency> 
<dependency>  <!-- 支持使用mq鏈接池 -->
    <groupId>org.apache.activemq</groupId>  
    <artifactId>activemq-pool</artifactId>  
</dependency>

application.properties是SpringBoot項目的核心參數配置文件

spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.in-memory=true
spring.activemq.pool.enabled=true

spring.activemq.in-memory 默認值爲true,表示無需安裝ActiveMQ的服務器,直接使用內存。
spring.activemq.pool.enabled 表示經過鏈接池的方式鏈接。

springboot-activemq-producer

springboot-activemq-producer 項目模擬生產者所在的系統,支持同時發送點對點模式和發佈訂閱模式。
兩個核心文件:一個是消息發送類,一個是隊列Bean管理配置類。
三種主要模式:一個是對點對模式,隊列名爲"queue.name";一個是發佈訂閱模式,主題名爲"topic.name";最後一個是雙向應答模式,隊列名爲"response.name" 。

import java.util.Random;
import javax.jms.Queue;
import javax.jms.Topic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
/**
 * 消息隊列生產者
 * @author itdragon
 */
@Service
@EnableScheduling
public class ITDragonProducer {
    
    @Autowired
    private JmsMessagingTemplate jmsTemplate;  
    @Autowired
    private Queue queue;
    @Autowired
    private Topic topic;
    @Autowired
    private Queue responseQueue;
    
    /**
     * 點對點(p2p)模式測試
     * 包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。
     * 發送者將消息發送到一個特定的隊列,隊列保留着消息,直到接收者從隊列中獲取消息。
     */
    @Scheduled(fixedDelay = 5000)
    public void testP2PMQ(){
        for(int i = 0; i < 5; i++) {
            String []operators = {"+","-","*","/"};
            Random random = new Random(System.currentTimeMillis());  
            String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
            jmsTemplate.convertAndSend(this.queue, expression);
            System.out.println("Queue Sender ---------> " + expression);
        }
    }
    
    /**
     * 訂閱/發佈(Pub/Sub)模擬測試
     * 包含三個角色:主題(Topic),發佈者(Publisher),訂閱者(Subscriber) 。
     * 多個發佈者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。
     */
    @Scheduled(fixedDelay = 5000)
    public void testPubSubMQ() {
        for (int i = 0; i < 5; i++) {
            String []operators = {"+","-","*","/"};
            Random random = new Random(System.currentTimeMillis());
            String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
            jmsTemplate.convertAndSend(this.topic, expression);
            System.out.println("Topic Sender ---------> " + expression);
        }
    }
    
    /**
     * 雙向應答模式測試
     * P2P和Pub/Sub是MQ默認提供的兩種模式,而雙向應答模式則是在原有的基礎上作了改進。發送者既是接收者,接收者也是發送者。
     */
    @Scheduled(fixedDelay = 5000)
    public void testReceiveResponseMQ(){
        for (int i = 0; i < 5; i++) {
            String []operators = {"+","-","*","/"};
            Random random = new Random(System.currentTimeMillis());  
            String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
            jmsTemplate.convertAndSend(this.responseQueue, expression);
        }
    }
    
    // 接收P2P模式,消費者返回的數據
    @JmsListener(destination = "out.queue")
    public void receiveResponse(String message) {  
        System.out.println("Producer Response Receiver  ---------> " + message);  
    }
}
import javax.jms.Queue;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * bean配置管理類 
 * @author itdragon
 */
@Configuration
public class ActiveMQBeansConfig {
    
    @Bean    // 定義一個名字爲queue.name的點對點列隊
    public Queue queue() {
        return new ActiveMQQueue("queue.name");
    }
    @Bean    // 定義一個名字爲topic.name的主題隊列
    public Topic topic() {
        return new ActiveMQTopic("topic.name");
    }
    @Bean    // 定義一個名字爲response.name的雙向應答隊列
    public Queue responseQueue() {
        return new ActiveMQQueue("response.name");
    }
}

springboot-activemq-consumer

springboot-activemq-consumer 模擬消費者所在的服務器,主要負責監聽隊列消息。
兩個核心文件:一個是消息接收類,一個是兼容點對點模式和發佈訂閱模式的連接工廠配置類。

import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Service;
import com.itdragon.utils.ITDragonUtil;
/**
 * 消息隊列消費者
 * @author itdragon
 */
@Service
public class ITDragonConsumer {
    
    // 1. 監聽名字爲"queue.name"的點對點隊列
    @JmsListener(destination = "queue.name", containerFactory="queueListenerFactory")
    public void receiveQueue(String text) {  
        System.out.println("Queue Receiver : " + text + " \t 處理結果 : " + ITDragonUtil.cal(text));  
    }  
    
    // 2. 監聽名字爲"topic.name"的發佈訂閱隊列
    @JmsListener(destination = "topic.name", containerFactory="topicListenerFactory")
    public void receiveTopicOne(String text) {  
        System.out.println(Thread.currentThread().getName() + " No.1 Topic Receiver : " + text + " \t 處理結果 : " + ITDragonUtil.cal(text));  
    }  
    
    // 2. 監聽名字爲"topic.name"的發佈訂閱隊列
    @JmsListener(destination = "topic.name", containerFactory="topicListenerFactory")
    public void receiveTopicTwo(String text) {  
        System.out.println(Thread.currentThread().getName() +" No.2 Topic Receiver : " + text + " \t 處理結果 : " + ITDragonUtil.cal(text));  
    }  
    
    // 3. 監聽名字爲"response.name"的接收應答(雙向)隊列
    @JmsListener(destination = "response.name")
    @SendTo("out.queue")
    public String receiveResponse(String text) {
        System.out.println("Response Receiver : " + text + " \t 正在返回數據......");  
        return ITDragonUtil.cal(text).toString();
    }

}
import java.util.concurrent.Executors;
import javax.jms.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

@Configuration  
@EnableJms  
public class JmsConfig {  
    
    @Bean  // 開啓pub/Sub模式
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {  
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();  
        factory.setPubSubDomain(true);  
        factory.setConnectionFactory(connectionFactory);  
        return factory;  
    }  
    @Bean  // JMS默認開啓P2P模式
    public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {  
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();  
        factory.setPubSubDomain(false);  
        factory.setConnectionFactory(connectionFactory);  
        return factory;  
    }  
}

總結

1) 消息中間件能夠解決異步處理,系統解耦,流量削鋒,分佈式系統事務管理等問題。

2) 消息中間件默認支持點對點模式和發佈訂閱模式,實際工做中還可使用雙向應當模式。

3) ActiveMQ是Apache出品,簡單好用,功能強大,能夠處理大部分的業務的開源消息總線。

到這裏 消息中間件企業級應用使用 的文章就寫完了。若是文章對你有幫助,能夠點個"推薦",也能夠"關注"我,得到更多豐富的知識。後續博客計劃是:RocketMQ和Kafka的使用,Zookeeper和相關集羣的搭建。若文中有什麼不對或者不嚴謹的地方,請指正。

相關文章
相關標籤/搜索