慕課網_《Java消息中間件》學習總結

時間:2017年07月22日星期六
說明:本文部份內容均來自慕課網。@慕課網:http://www.imooc.com
教學源碼:無
學習源碼:https://github.com/zccodere/s...java

第一章:課程介紹

1-1 課程安排

Java消息中間件(入門篇)git

爲何須要使用消息中間件
消息中間件概述
JMS規範
JMS代碼演練

Java消息中間件(拓展篇)github

ActiveMQ集羣配置
消息中間件在大型系統中的最佳實踐
使用其它消息中間件

1-2 使用消息中間件緣由

經過服務調用讓其它系統感知事件發生web

系統之間高耦合
程序執行效率低

clipboard.png

經過消息中間件解耦服務調用spring

clipboard.png

生活中的案例apache

微信公衆號
老師在黑板上寫字
電視機
等等

消息中間件帶來的好處vim

解耦:系統解耦
異步:異步執行
橫向擴展 
安全可靠
順序保證

橫向擴展解釋安全

當登陸系統,須要不少用戶登陸。這些消息所有須要告知積分系統,去增長積分,而增長積分這個處理過程可能比較麻煩、比較耗時。這個時候,能夠啓動多臺積分系統,來同時消費這個消息中間件裏面的登陸消息,達到橫向擴展的做用。

第二章:概述

2-1 消息中間件概述

什麼是中間件服務器

非底層操做系統軟件,非業務應用軟件,不是直接給最終用戶使用的,不能直接給客戶帶來價值的軟件統稱爲中間件

什麼是消息中間件微信

關注於數據的發送和接受,利用高效可靠的異步消息傳遞機制集成分佈式系統

示意圖

clipboard.png

什麼是JMS

Java消息服務(Java Message Service)應用程序接口,是一個Java平臺中關於面向消息中間件的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。

什麼是AMQP

AMQP(Advanced Message Queuing Protocol)是一個提供統一消息服務的應用層標準高級消息隊列協議,基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不一樣產品,不一樣的開發語言等條件的限制。

JMS和AMQP對比

clipboard.png

ActiveMQ

ActiveMQ是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ是一個徹底支持JMS1.1和J2EE1.4規範的JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今J2EE應用中間件仍然扮演者特殊的地位。

ActiveMQ特性

多種語言和協議編寫客戶端。
    語言:Java、C、C++、C#、Ruby、Perl、Python、PHP
    應用協議:OpenWire、Stomp、REST、WS Notification、XMPP、AMQP
徹底支持JMS1.1和J2EE1.4規範(持久化、XA消息、事務)
虛擬主題、組合目的、鏡像隊列

RabbitMQ

RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

RabbitMQ特性

支持多種客戶端
    如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript等
AMQP的完整實現(vhost、Exchange、Binding、Routing Key等)
事務支持/發佈確認
消息持久化

Kafka

Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,是一個分佈式、分區的、可高的分佈式日誌存儲服務。它經過一種獨一無二的設計提供了一個消息系統的功能。

Kafka特性

經過O(1)的磁盤數據結構提供消息的持久化,
    這種結構對於即便數以TB的消息存儲也可以保持長時間的穩定性能
高吞吐量:即便是很是普通的硬件Kafka也能夠支持每秒數百萬的消息
Partition、Consumer Group

綜合評價

clipboard.png

第三章:JMS規範

3-1 JMS規範

Java消息服務定義

Java消息服務(Java Message Service)應用程序接口,是一個Java平臺中關於面向消息中間件的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。

JMS相關概念

提供者:實現JMS規範的消息中間件服務器
客戶端:發送或接收消息的應用程序
生產者/發佈者:建立併發送消息的客戶端
消費者/訂閱者:接收並處理消息的客戶端
消息:應用程序之間傳遞的數據內容
消息模式:在客戶端之間傳遞消息的模式,JMS中定義了主題和隊列兩種模式

JMS消息模式:隊列模式

客戶端包括生產者和消費者
隊列中的消息只能被一個消費者消費
消費者能夠隨時消費隊列中的消息

隊列模型示意圖

clipboard.png

JMS消息模式:主題模型

客戶端包括髮布者和訂閱者
主題中的消息被全部訂閱者消費
消費者不能消費訂閱以前就發送到主題中的消息

主題模型示意圖

clipboard.png

JMS編碼接口

ConnectionFactory:用於建立鏈接到消息中間件的鏈接工廠
Connection:表明了應用程序和消息服務器之間的通訊鏈路
Destination:指消息發佈和接收的地點,包括隊列和主題
Session:表示一個單線程的上下文,用於發送和接收消息
MessageConsumer:由會話建立,用戶接收發送到目標的消息
MessageProducer:由會話建立,用於發送消息到目標
Message:是在消費者和生產者之間傳送的對象,消息頭,一組消息屬性,一個消息體

JMS編碼接口之間的關係

clipboard.png

第四章:使用ActiveMQ

4-1 Windows安裝ActiveMQ

在Windows安裝ActiveMQ

下載安裝包
直接啓動
使用服務啓動

安裝驗證

訪問地址:http://127.0.0.1:8161/
默認用戶:admin
默認密碼:admin

4-2 Linux安裝ActiveMQ

在Linux安裝ActiveMQ

下載並解壓安裝包
啓動

啓動驗證

進入到bin目錄,使用命令./activemq start啓動服務
使用命令ps -ef |grep activemq查看進程是否存在
使用命令./activemq stop關閉服務

安裝驗證

訪問地址:http://Linux主機IP:8161/
默認用戶:admin
默認密碼:admin

4-3 隊列模式的消息演示

使用JMS接口規範鏈接ActiveMQ

建立生產者
建立消費者
建立發佈者
建立訂閱者

回顧JMS編碼接口之間的關係

clipboard.png

代碼演示

1.編寫AppProducer類

package com.myimooc.jms.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * App 生產者-隊列模式
 * @author ZhangCheng on 2017-07-22
 *
 */
public class AppProducer {
    /** 指定ActiveMQ服務的地址 */
    private static final String URL = "tcp://127.0.0.1:61616";
    /** 指定隊列的名稱 */
    private static final String QUEUE_NAME = "queue-test";
    
    public static void main(String[] args) throws JMSException {
        
        // 1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();
        
        // 3.啓動鏈接
        connection.start();
        
        // 4.建立會話(第一個參數:是否在事務中處理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 5. 建立一個目標
        Destination destination = session.createQueue(QUEUE_NAME);
        
        // 6.建立一個生產者
        MessageProducer producer = session.createProducer(destination);
        
        for (int i = 0; i < 100; i++) {
            
            // 7.建立消息
            TextMessage textMessage = session.createTextMessage("test" + i);
            
            // 8.發佈消息
            producer.send(textMessage);
            
            System.out.println("消息發送:" + textMessage.getText());
        }
        
        // 9.關閉鏈接
        connection.close();
    }
    
}

2.編寫AppConsumer類

package com.myimooc.jms.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * App 消費者-隊列模式
 * @author ZhangCheng on 2017-07-22
 *
 */
public class AppConsumer {
    /** 指定ActiveMQ服務的地址 */
    private static final String URL = "tcp://127.0.0.1:61616";
    /** 指定隊列的名稱 */
    private static final String QUEUE_NAME = "queue-test";
    
    public static void main(String[] args) throws JMSException {
        
        // 1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();
        
        // 3.啓動鏈接
        connection.start();
        
        // 4.建立會話(第一個參數:是否在事務中處理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 5.建立一個目標
        Destination destination = session.createQueue(QUEUE_NAME);
        
        // 6.建立一個消費者
        MessageConsumer consumer = session.createConsumer(destination);
        
        // 7.建立一個監聽器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接收消息:" + textMessage.getText());
                } catch (JMSException e) {
                    System.out.println("接收消息異常:");
                    e.printStackTrace();
                }
            }
        });
        
        // 8.關閉鏈接
        //connection.close();
    }
    
}

4-4 主題模式的消息演示

代碼演示

1.編寫AppProducer類

package com.myimooc.jms.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * App 生產者-主題模式
 * @author ZhangCheng on 2017-07-22
 *
 */
public class AppProducer {
    /** 指定ActiveMQ服務的地址 */
    private static final String URL = "tcp://127.0.0.1:61616";
    /** 指定主題的名稱 */
    private static final String TOPIC_NAME = "topic-test";
    
    public static void main(String[] args) throws JMSException {
        
        // 1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();
        
        // 3.啓動鏈接
        connection.start();
        
        // 4.建立會話(第一個參數:是否在事務中處理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 5. 建立一個目標
        Destination destination = session.createTopic(TOPIC_NAME);
        
        // 6.建立一個生產者
        MessageProducer producer = session.createProducer(destination);
        
        for (int i = 0; i < 100; i++) {
            
            // 7.建立消息
            TextMessage textMessage = session.createTextMessage("test" + i);
            
            // 8.發佈消息
            producer.send(textMessage);
            
            System.out.println("消息發送:" + textMessage.getText());
        }
        
        // 9.關閉鏈接
        connection.close();
    }
    
}

2.編寫AppConsumer類

package com.myimooc.jms.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * App 消費者-主題模式
 * @author ZhangCheng on 2017-07-22
 *
 */
public class AppConsumer {
    /** 指定ActiveMQ服務的地址 */
    private static final String URL = "tcp://127.0.0.1:61616";
    /** 指定主題的名稱 */
    private static final String TOPIC_NAME = "topic-test";
    
    public static void main(String[] args) throws JMSException {
        
        // 1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();
        
        // 3.啓動鏈接
        connection.start();
        
        // 4.建立會話(第一個參數:是否在事務中處理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 5.建立一個目標
        Destination destination = session.createTopic(TOPIC_NAME);
        
        // 6.建立一個消費者
        MessageConsumer consumer = session.createConsumer(destination);
        
        // 7.建立一個監聽器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接收消息:" + textMessage.getText());
                } catch (JMSException e) {
                    System.out.println("接收消息異常:");
                    e.printStackTrace();
                }
            }
        });
        
        // 8.關閉鏈接
        //connection.close();
    }
    
}

4-5 Spring jms理論

使用Spring集成JMS鏈接ActiveMQ

ConnectionFactory:用於管理鏈接的鏈接工廠
JmsTemplate:用於發送和接收消息的模版類
MessageListener:消息監聽器

ConnectionFactory

一個Spring爲咱們提供的鏈接池
JmsTemplate每次發消息都會從新建立鏈接,會話和productor
Spring中提供了SingleConnectFactory和CachingConnectionFactory

JmsTemplate

是Spring提供的,只需向Spring容器內註冊這個類就可使用JmsTemplate方便的操做jms
JmsTemplate類是線程安全的,能夠在整個應用範圍使用

MessageListener

實現一個onMessage方法,該方法只接收一個Message參數

4-6 Spring jms演示

代碼演示

1.建立名爲jmsspring的maven項目POM文件以下

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.myimooc</groupId>
    <artifactId>jmsspring</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>jmsspring</name>
    <url>http://maven.apache.org</url>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.1.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.9.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2.完成後的目錄結構以下

clipboard.png

源碼請到個人github地址查看

3.測試

使用Postman向ProducerController發起請求,將消息發送出去

clipboard.png

對應的ConsumerTopicMessageListener 和 ConsumerMessageListener接收到消息

clipboard.png

第五章:大型系統最佳實踐

5-1 ActiceMQ集羣

爲何要對消息中間件集羣

實現高可用,以排除單點故障引發的服務中斷
實現負載均衡,以提高效率爲更多客戶提供服務

集羣方式

客戶端集羣:讓多個消費者消費同一個隊列
Broker cluster:多個Broker之間同步消息
Master Slave:實現高可用

ActiveMQ失效轉移(failover)-客戶端配置

容許當其中一臺消息服務器宕機時,客戶端在傳輸層上從新鏈接到其它消息服務器
語法:failover:(uri1,…,uriN)?transportOptions

transportOptions參數說明

randomize默認爲true,表示在URI列表中選擇URI鏈接時是否採用隨機策略
initialReconnectDelay默認爲10,單位毫秒,表示第一次嘗試重連之間等待的時間
maxReconnectDelay默認爲30000,單位毫秒,最長重連的時間間隔

Broker cluster集羣配置-原理

clipboard.png

NetworkConnector(網絡鏈接器)

網絡鏈接器主要用於配置ActiveMQ服務器與服務器之間的網絡通信方式,用於服務器透傳消息
網絡鏈接器分爲靜態鏈接器和動態鏈接器

靜態鏈接器

clipboard.png

動態鏈接器

clipboard.png

5-2 ActiveMQ集羣理論

ActiveMQ Master Slace集羣方案

Share nothing storage master/slave(已過期,5.8+後移除)
Shared storage master/slave 共享存儲
Replicated LevelDB Store基於負責的LevelDB Store

共享存儲集羣的原理

clipboard.png

clipboard.png

基於複製的LevelDB Store的原理

clipboard.png

兩種集羣方式對比

clipboard.png

三臺服務器的完美集羣方案

clipboard.png

5-3 ActiveMQ集羣實踐

ActiveMQ集羣配置方案

clipboard.png

配置過程

1.節點準備

mkdir activemq建立目錄
cp -rf apache-activemq-5.15.0 activemq/activemq-a
cp -rf apache-activemq-5.15.0 activemq/activemq-b
cp -rf apache-activemq-5.15.0 activemq/activemq-c
cd activemq
mkdir kahadb

2.配置a節點

cd activemq-a/
cd conf/
vim activemq.xml
    <networkConnectors>
              <networkConnector name="local_network" uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)" />
    </networkConnectors>
vim jetty.xml:配置管理端口號,a節點使用默認端口,無須配置

3.配置b節點

vim activemq.xml
配置網絡鏈接器
        <networkConnectors>
      <networkConnector name="network_a" uri="static:(tcp://127.0.0.1:61616)" />
    </networkConnectors>
配置持久化存儲路徑
    <persistenceAdapter>
        <kahaDB directory="/studio/activemq/kahadb"/>
    </persistenceAdapter>
配置服務端口
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
vim jetty.xml
配置管理端口號
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
         <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8162"/>
</bean>

4.配置c節點

vim activemq.xml
配置網絡鏈接器
        <networkConnectors>
      <networkConnector name="network_a" uri="static:(tcp://127.0.0.1:61616)" />
    </networkConnectors>
配置持久化存儲路徑
    <persistenceAdapter>
        <kahaDB directory="/studio/activemq/kahadb"/>
    </persistenceAdapter>
配置服務端口
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
vim jetty.xml
配置管理端口號
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
         <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8163"/>
</bean>

5.啓動服務
回到activemq目錄,分別啓動a,b,c三個節點

./activemq-a/bin/activemq start
./activemq-b/bin/activemq start
./activemq-c /bin/activemq start

檢查是否都啓動成功

ps -ef |grep activemq

檢查是否對外提供服務,即端口是否被監聽(佔用)

netstat -anp |grep 61616
netstat -anp |grep 61617
netstat -anp |grep 61618

檢查發現61618即c節點沒有提供服務,可是c節點的進程是啓動成功了的。由於b節點和c點擊是master/slave配置,如今b節點獲取到了共享文件夾的全部權,因此c節點正在等待得到資源,而且提供服務。即c節點在未得到資源以前,是不提供服務的。

測試,把b節點殺掉,看c節點能不能提供61618的服務

./activemq-b/bin/activemq stop
netstat -anp |grep 61618
./activemq-b/bin/activemq start
netstat -anp |grep 61617

檢查發現,從新啓動b節點後,b節點61617端口並無提供服務,是由於如今b節點成爲了slave節點,而c節點成爲了master節點。因此,如今b節點啓動了,可是它並不對外提供服務。只有當c節點出現問題後,b節點纔對外提供服務。

6.經過代碼測試集羣配置是否生效

生產者

package com.myimooc.jms.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * App 生產者-隊列模式-集羣配置測試
 * @author ZhangCheng on 2017-07-25
 *
 */
public class AppProducerTest {
    /** failover 爲狀態轉移的存在部分
     * 因a節點只做爲消費者使用,因此這裏不配置61616節點了。
     * */
    private static final String URL = "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
    /** 指定隊列的名稱 */
    private static final String QUEUE_NAME = "queue-test";
    
    public static void main(String[] args) throws JMSException {
        
        // 1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();
        
        // 3.啓動鏈接
        connection.start();
        
        // 4.建立會話(第一個參數:是否在事務中處理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 5. 建立一個目標
        Destination destination = session.createQueue(QUEUE_NAME);
        
        // 6.建立一個生產者
        MessageProducer producer = session.createProducer(destination);
        
        for (int i = 0; i < 100; i++) {
            
            // 7.建立消息
            TextMessage textMessage = session.createTextMessage("test" + i);
            
            // 8.發佈消息
            producer.send(textMessage);
            
            System.out.println("消息發送:" + textMessage.getText());
        }
        
        // 9.關閉鏈接
        connection.close();
    }
    
}

消費者

package com.myimooc.jms.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * App 消費者-隊列模式-集羣配置測試
 * @author ZhangCheng on 2017-07-22
 *
 */
public class AppConsumerTest {
    /** failover 爲狀態轉移的存在部分
     * */
    private static final String URL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
    /** 指定隊列的名稱 */
    private static final String QUEUE_NAME = "queue-test";
    
    public static void main(String[] args) throws JMSException {
        
        // 1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();
        
        // 3.啓動鏈接
        connection.start();
        
        // 4.建立會話(第一個參數:是否在事務中處理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 5.建立一個目標
        Destination destination = session.createQueue(QUEUE_NAME);
        
        // 6.建立一個消費者
        MessageConsumer consumer = session.createConsumer(destination);
        
        // 7.建立一個監聽器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接收消息:" + textMessage.getText());
                } catch (JMSException e) {
                    System.out.println("接收消息異常:");
                    e.printStackTrace();
                }
            }
        });
        
        // 8.關閉鏈接
        //connection.close();
    }
    
}

運行生產者,而後到管理界面查看消息發送到了那裏

http://127.0.0.1:8161
http://127.0.0.1:8162
http://127.0.0.1:8163

查看發現,8162沒法訪問,是由於b節點是slave節點,不提供服務,消息都發送到了c節點

把8163即c節點宕掉後,運行消費者,查看消息是否可以使用

./activemq-c/bin/activemq stop

5-4 企業級系統中的最佳實踐

實際業務場景分析

clipboard.png

實際業務場景特色

子業務系統都有集羣的可能性
同一個消息會廣播給關注該類消息的全部子業務系統
同一類消息在集羣中被負載消費
業務的發生和消息的發佈最終一致性

須要解決的問題

不一樣業務系統分別處理同一個消息,同一業務系統負載處理同類消息
解決消息發送時的一致性問題
解決消息處理的冪等性問題
基於消息機制創建事件總線

集羣系統處理消息方案-使用JMS級聯的解決方案

clipboard.png

集羣系統處理消息方案-使用ActiveMQ的虛擬主題解決方案

發佈者:將消息發佈到一個主題中,主題名以VirtualTopic開頭,如VirtualTopic.TEST
消費者:從隊列中獲取消息,在隊列名中表名本身身份,如Consumer.A.VirtualTopic.TEST

解決消息發送時的一致性問題-使用JMS中XA系列接口保證強一致性

引入分佈式事務
要求業務操做必須支持XA協議

解決消息發送時的一致性問題-使用消息表的本地事務解決方案

clipboard.png

解決消息發送時的一致性問題-使用內存日誌的解決方案

clipboard.png

解決消息處理的冪等性問題

所謂冪等性問題,是指屢次執行所產生的影響(結果)與一次執行所產生的影響(結果)同樣。好比:支付成功後,支付寶會發起屢次通知給業務系統,要求業務系統可以處理這些重複的消息,可是又不重複處理訂單。若是在消息處理系統中保證冪等性,會增長系統複雜度,咱們能夠統一處理冪等性後,再將消息發送給消息處理系統。

解決消息處理的冪等性問題-使用消息表的本地事務解決方案

clipboard.png

解決消息處理的冪等性問題-使用內存日誌的解決方案

clipboard.png

基於消息機制的事件總線-什麼是事件驅動架構

事件驅動架構(Event Driven Architecture,EDA)定義了一個設計和實現一個應用系統的方法學,在這個系統裏事件可傳輸於鬆散耦合的組件和服務之間。特色:有事我叫你,沒事別煩我

事件驅動架構模型

clipboard.png

該教師正在開發該事件總線的框架,github地址https://github.com/jovezhao/nest

第六章:使用其它消息中間件

6-1 使用其它消息中間件

分析須要作的事

解決各業務系統集羣處理同一條消息
實現本身的消息提供者

經常使用消息中間件

ActiveMQ
RabbitMQ
Kafka

集成RabbitMQ

RabbitMQ:使用交換器綁定到隊列

示意圖

clipboard.png

RabbitMQ消息提供者源碼解析

建立ConnectionFactory
建立Connection
建立Channel
定義Exchange
定義Queue而且綁定隊列

集成Kafka

Kafka使用group.id分組消費者

配置消息者參數group.id相對時對消息進行負載處理
配置服務器partitions參數,控制同一個group.id下的consumer數量小於partitions
Kafka只保證同一個partition下的消息是有序的

第七章:課程總結

7-1 課程總結

clipboard.png

相關文章
相關標籤/搜索