Java消息中間件入門筆記 - ActiveMQ篇

入門

消息中間件帶來的好處:css

1)解耦:系統解耦
2)異步:異步執行
3)橫向擴展 
4)安全可靠
5)順序保證

栗子:
經過服務調用讓其它系統感知事件發生java

系統之間高耦合
程序執行效率低

經過消息中間件解耦服務調用
linux

1.Linux安裝消息中間件ActiveMQ

1.下載安裝包
wget http://120.221.32.78:6510/mirrors.shu.edu.cn/apache//activemq/5.15.3/apache-activemq-5.15.3-bin.tar.gz
解壓
tar -zxvf apache-activemq-5.15.3-bin.tar.gzweb

2.啓動與關閉spring

進入到bin目錄,使用命令./activemq start啓動服務
使用命令ps -ef |grep activemq查看進程是否存在
使用命令./activemq stop關閉服務

3.安裝驗證sql

訪問地址:http://Linux主機IP:8161/
默認用戶:admin
默認密碼:admin

4.Maven依賴apache

<dependency>
     <groupId>org.apache.activemq</groupId>
     <artifactId>activemq-all</artifactId>
     <version>5.15.3</version>
</dependency>

2.隊列模式的消息演示

隊列模式:
1)客戶端包括生產者和消費者
2)隊列中的消息只能被一個消費者消費
3)消費者能夠隨時消費vim

1.編寫AppProducer類segmentfault

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/** * App 生產者-隊列模式 * @author * */
public class AppProducer {
    /** 指定ActiveMQ服務的地址 */
    private static final String URL = "tcp://127.0.0.1:61616";
    /** 指定隊列的名稱 */
    private static final String QUEUE_NAME = "queue-test";

    public static void main(String[] args) throws JMSException {

        // 1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();

        // 3.啓動鏈接
        connection.start();

        // 4.建立會話(第一個參數:是否在事務中處理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5. 建立一個目標
        Destination destination = session.createQueue(QUEUE_NAME);

        // 6.建立一個生產者
        MessageProducer producer = session.createProducer(destination);

        for (int i = 0; i < 100; i++) {

            // 7.建立消息
            TextMessage textMessage = session.createTextMessage("test" + i);

            // 8.發佈消息
            producer.send(textMessage);

            System.out.println("消息發送:" + textMessage.getText());
        }

        // 9.關閉鏈接
        connection.close();
    }

}

2.編寫AppConsumer類安全

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/** * App 消費者-隊列模式 * @author * */
public class AppConsumer {
    /** 指定ActiveMQ服務的地址 */
    private static final String URL = "tcp://127.0.0.1:61616";
    /** 指定隊列的名稱 */
    private static final String QUEUE_NAME = "queue-test";

    public static void main(String[] args) throws JMSException {

        // 1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();

        // 3.啓動鏈接
        connection.start();

        // 4.建立會話(第一個參數:是否在事務中處理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.建立一個目標
        Destination destination = session.createQueue(QUEUE_NAME);

        // 6.建立一個消費者
        MessageConsumer consumer = session.createConsumer(destination);

        // 7.建立一個監聽器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接收消息:" + textMessage.getText());
                } catch (JMSException e) {
                    System.out.println("接收消息異常:");
                    e.printStackTrace();
                }
            }
        });

        // 8.關閉鏈接
        //connection.close();
    }

}

3.主題模式的消息演示

主題模式
1)客戶端包括髮布者和訂閱者
2)主體中的消息被全部訂閱者消費
3)消費者不能消費訂閱以前就發送到主題中的消息

1.編寫AppProducer類

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/** * App 生產者-主題模式 * @author * */
public class AppProducer {
    /** 指定ActiveMQ服務的地址 */
    private static final String URL = "tcp://127.0.0.1:61616";
    /** 指定主題的名稱 */
    private static final String TOPIC_NAME = "topic-test";

    public static void main(String[] args) throws JMSException {

        // 1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();

        // 3.啓動鏈接
        connection.start();

        // 4.建立會話(第一個參數:是否在事務中處理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5. 建立一個目標
        Destination destination = session.createTopic(TOPIC_NAME);

        // 6.建立一個生產者
        MessageProducer producer = session.createProducer(destination);

        for (int i = 0; i < 100; i++) {

            // 7.建立消息
            TextMessage textMessage = session.createTextMessage("test" + i);

            // 8.發佈消息
            producer.send(textMessage);

            System.out.println("消息發送:" + textMessage.getText());
        }

        // 9.關閉鏈接
        connection.close();
    }

}

2.編寫AppConsumer類

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/** * App 消費者-主題模式 * @author * */
public class AppConsumer {
    /** 指定ActiveMQ服務的地址 */
    private static final String URL = "tcp://127.0.0.1:61616";
    /** 指定主題的名稱 */
    private static final String TOPIC_NAME = "topic-test";

    public static void main(String[] args) throws JMSException {

        // 1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();

        // 3.啓動鏈接
        connection.start();

        // 4.建立會話(第一個參數:是否在事務中處理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.建立一個目標
        Destination destination = session.createTopic(TOPIC_NAME);

        // 6.建立一個消費者
        MessageConsumer consumer = session.createConsumer(destination);

        // 7.建立一個監聽器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接收消息:" + textMessage.getText());
                } catch (JMSException e) {
                    System.out.println("接收消息異常:");
                    e.printStackTrace();
                }
            }
        });

        // 8.關閉鏈接
        //connection.close();
    }

}

4.Spring集成ActiveMQ

1.Maven依賴

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.3</version>
        </dependency>

        <!--spring依賴 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>

2.spring配置文件

1)common.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd">

    <!-- 開啓註解:可使用@Autowired等 -->
    <context:annotation-config />

    <!-- ActiveMQ爲咱們提供的ConnectionFactory -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://118.89.177.110:61616"></property>
    </bean>

    <!-- Spring jms爲咱們提供的鏈接池 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"></property>
    </bean>

    <!-- 隊列模式 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue"></constructor-arg>
    </bean>

    <!-- 主題模式-->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic"></constructor-arg>
    </bean>

</beans>

2)consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd">

    <!-- 引入配置 -->
    <import resource="common.xml"/>

    <!-- 配置消息監聽器 -->
    <bean id="consumerMessageListener" class="cn.zyzpp.spring.consumer.ConsumerMessageListener"/>

    <!-- 配置消息容器 -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <!-- 隊列模式 -->
        <!-- <property name="destination" ref="queueDestination" /> -->
        <!-- 主題模式 -->
        <property name="destination" ref="topicDestination" />
        <property name="messageListener" ref="consumerMessageListener" />
    </bean>

</beans>

3)producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd">

    <!-- 引入配置 -->
    <import resource="common.xml"/>

    <!-- 配置JmsTemplate發送消息 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"></property>
    </bean>

    <!-- 注入實體Bean而不是接口 -->
    <bean class="cn.zyzpp.spring.producer.ProducerServiceImpl"></bean>

</beans>

3)生產者消費者編碼

ProducerService.java

package cn.zyzpp.spring.producer;
/** * Created by yster@foxmail.com */
public interface ProducerService {
    void sendMessage(String message);
}

ProducerServiceImpl.java

package cn.zyzpp.spring.producer;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

/** * Created by yster@foxmail.com 2018年4月24日 下午7:13:27 */
public class ProducerServiceImpl implements ProducerService {

    @Autowired
    private JmsTemplate jmsTemplate;

// @Resource(name = "queueDestination") //隊列模式
    @Resource(name = "topicDestination") //主題模式
    private Destination destination;

    @Override
    public void sendMessage(String message) {
        //使用jmsTemplate發送消息
        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                //建立一個消息
                TextMessage textMessage = session.createTextMessage(message);
                return textMessage;
            }
        });
        System.out.println("發送消息:" + message);
    }

}

AppProducer.java

package cn.zyzpp.spring.producer;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/** * Created by yster@foxmail.com * 2018年4月24日 下午7:29:30 */
public class AppProducer {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:producer.xml");
        ProducerService service = context.getBean(ProducerService.class);
        for(int i=0; i<100; i++){
            service.sendMessage("消息發送:"+i);
        }
        context.close();
    }

}

ConsumerMessageListener.java

package cn.zyzpp.spring.consumer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/** * Created by yster@foxmail.com * 2018年4月24日 下午8:48:39 */
public class ConsumerMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("接受消息:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

AppConsumer.java

package cn.zyzpp.spring.consumer;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/** * Created by yster@foxmail.com * 2018年4月24日 下午7:29:30 */
public class AppConsumer {

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:consumer.xml");

    }

}

5.ActiveMQ集羣

爲何要對消息中間件集羣

實現高可用,以排除單點故障引發的服務中斷
實現負載均衡,以提高效率爲更多客戶提供服務

集羣方式

客戶端集羣:讓多個消費者消費同一個隊列
Broker cluster:多個Broker之間同步消息
Master Slave:實現高可用

ActiveMQ失效轉移(failover)-客戶端配置

容許當其中一臺消息服務器宕機時,客戶端在傳輸層上從新鏈接到其它消息服務器
語法:failover:(uri1,…,uriN)?transportOptions

transportOptions參數說明

randomize默認爲true,表示在URI列表中選擇URI鏈接時是否採用隨機策略
initialReconnectDelay默認爲10,單位毫秒,表示第一次嘗試重連之間等待的時間
maxReconnectDelay默認爲30000,單位毫秒,最長重連的時間間隔

Broker cluster集羣配置-原理

NetworkConnector(網絡鏈接器)

網絡鏈接器主要用於配置ActiveMQ服務器與服務器之間的網絡通信方式,
用於服務器透傳消息。
網絡鏈接器分爲靜態鏈接器和動態鏈接器

靜態鏈接器

動態鏈接器

5-2 ActiveMQ集羣理論
ActiveMQ Master Slace集羣方案

Share nothing storage master/slave(已過期,5.8+後移除)
Shared storage master/slave 共享存儲
Replicated LevelDB Store基於負責的LevelDB Store

共享存儲集羣的原理

基於複製的LevelDB Store的原理

兩種集羣方式對比

三臺服務器的完美集羣方案

ActiveMQ集羣配置方案

配置過程

1.節點準備

mkdir activemq建立目錄
cp -rf apache-activemq-5.15.3 activemq/activemq-a
cp -rf apache-activemq-5.15.3 activemq/activemq-b
cp -rf apache-activemq-5.15.3 activemq/activemq-c
cd activemq
mkdir kahadb

2.配置a節點

cd activemq-a/
cd conf/
vim activemq.xml
    <networkConnectors>
              <networkConnector name="local_network" uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)" />
    </networkConnectors>
vim jetty.xml:配置管理端口號,a節點使用默認端口,無須配置

3.配置b節點

vim activemq.xml
配置網絡鏈接器
    <networkConnectors>
      <networkConnector name="network_a" uri="static:(tcp://127.0.0.1:61616)" />
    </networkConnectors>
配置持久化存儲路徑
    <persistenceAdapter>
        <kahaDB directory="/***/activemq/kahadb"/>
    </persistenceAdapter>
配置服務端口
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
vim jetty.xml
配置管理端口號
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
         <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8162"/>
</bean>

4.配置c節點

vim activemq.xml
配置網絡鏈接器
    <networkConnectors>
      <networkConnector name="network_a" uri="static:(tcp://127.0.0.1:61616)" />
    </networkConnectors>
配置持久化存儲路徑
    <persistenceAdapter>
        <kahaDB directory="/***/activemq/kahadb"/>
    </persistenceAdapter>
配置服務端口
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
vim jetty.xml
配置管理端口號
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
         <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8163"/>
</bean>

5.啓動服務

回到activemq目錄,分別啓動a,b,c三個節點

./activemq-a/bin/activemq start ./activemq-b/bin/activemq start ./activemq-c /bin/activemq start

檢查是否都啓動成功

ps -ef |grep activemq

檢查是否對外提供服務,即端口是否被監聽(佔用)

netstat -anp |grep 61616
netstat -anp |grep 61617
netstat -anp |grep 61618

檢查發現61618即c節點沒有提供服務,可是c節點的進程是啓動成功了的。由於b節點和c點擊是master/slave配置,如今b節點獲取到了共享文件夾的全部權,因此c節點正在等待得到資源,而且提供服務。即c節點在未得到資源以前,是不提供服務的。

測試,把b節點殺掉,看c節點能不能提供61618的服務

./activemq-b/bin/activemq stop
netstat -anp |grep 61618
./activemq-b/bin/activemq start
netstat -anp |grep 61617

檢查發現,從新啓動b節點後,b節點61617端口並無提供服務,是由於如今b節點成爲了slave節點,而c節點成爲了master節點。因此,如今b節點啓動了,可是它並不對外提供服務。只有當c節點出現問題後,b節點纔對外提供服務。

6.經過代碼測試集羣配置是否生效

生產者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/** * App 生產者-隊列模式-集羣配置測試 * @author * */
public class AppProducerTest {
    /** failover 爲狀態轉移的存在部分 * 因a節點只做爲消費者使用,因此這裏不配置61616節點了。 * */
    private static final String URL = "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
    /** 指定隊列的名稱 */
    private static final String QUEUE_NAME = "queue-test";

    public static void main(String[] args) throws JMSException {

        // 1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();

        // 3.啓動鏈接
        connection.start();

        // 4.建立會話(第一個參數:是否在事務中處理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5. 建立一個目標
        Destination destination = session.createQueue(QUEUE_NAME);

        // 6.建立一個生產者
        MessageProducer producer = session.createProducer(destination);

        for (int i = 0; i < 100; i++) {

            // 7.建立消息
            TextMessage textMessage = session.createTextMessage("test" + i);

            // 8.發佈消息
            producer.send(textMessage);

            System.out.println("消息發送:" + textMessage.getText());
        }

        // 9.關閉鏈接
        connection.close();
    }

}

消費者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/** * App 消費者-隊列模式-集羣配置測試 * @author * */
public class AppConsumerTest {
    /** failover 爲狀態轉移的存在部分 * */
    private static final String URL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
    /** 指定隊列的名稱 */
    private static final String QUEUE_NAME = "queue-test";

    public static void main(String[] args) throws JMSException {

        // 1.建立ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

        // 2.建立Connection
        Connection connection = connectionFactory.createConnection();

        // 3.啓動鏈接
        connection.start();

        // 4.建立會話(第一個參數:是否在事務中處理)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.建立一個目標
        Destination destination = session.createQueue(QUEUE_NAME);

        // 6.建立一個消費者
        MessageConsumer consumer = session.createConsumer(destination);

        // 7.建立一個監聽器
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接收消息:" + textMessage.getText());
                } catch (JMSException e) {
                    System.out.println("接收消息異常:");
                    e.printStackTrace();
                }
            }
        });

        // 8.關閉鏈接
        //connection.close();
    }

}

管理界面查看消息

http://127.0.0.1:8161
http://127.0.0.1:8162
http://127.0.0.1:8163

6.擴展

 

本文記錄簡要,推薦閱讀 慕課網-Java消息中間件學習總結

相關文章
相關標籤/搜索