五分鐘快速瞭解ActiveMQ,案例簡單且詳細!

最近得閒,探索了一下ActiveMQ。java

ActiveMQ消息隊列,信息收發的容器,做用有異步消息,流量削鋒,應用耦合。
同行還有 Kafka、RabbitMQ、RocketMQ、ZeroMQ、MetaMQ 。

安裝

下載地址:http://activemq.apache.org/co...mysql

window版本的解壓後雙擊/bin/activemq.bat 便可啓動。
或者以服務方式啓動:右鍵管理員運行InstallService.bat,而後在Windows系統的服務中啓動。spring

它有本身的可視化頁面:http://localhost:8161/admin/sql

clipboard.png

默認訪問密碼是:admin/admin
若是須要修改在:/conf/jetty-realm.properties 中修改數據庫

JmsTemplate

springboot上整合的,使用spring 的JmsTemplate來操做ActiveMQ
1、首先在pom文件中導入所需的jar包座標:apache

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
    </dependency>

2、新增一個ActiveMQ的配置文件spring-jms.xml安全

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

    <!-- 配置JMS鏈接工廠 -->
    <bean id="innerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${spring.activemq.broker-url}" />
    </bean>
    <!--配置鏈接池-->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
        destroy-method="stop">
        <property name="connectionFactory" ref="innerConnectionFactory" />
        <property name="maxConnections" value="100"></property>
    </bean>
    <!-- 配置JMS模板,Spring提供的JMS工具類 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
        <property name="defaultDestination" ref="JmsSenderDestination" />
        <property name="receiveTimeout" value="10000" />
    </bean>
</beans>

3、在啓動類上配置以生效springboot

@ImportResource(locations={"classpath:/config/spring-jms.xml"})

4、在application.properties中配置ActiveMQ 的鏈接地址session

spring.activemq.broker-url=tcp://localhost:61616
準備就緒;開始寫生產者和消費者,我這裏把生產者和消費者寫在一個項目裏面。在這以前須要明白兩個概念
隊列(Queue)和主題(Topic)

傳遞模型

隊列(Queue)和主題(Topic)是JMS支持的兩種消息傳遞模型:app

  1. 點對點(point-to-point,簡稱PTP)Queue消息傳遞模型:
    一個消息只能被一個消費者消費
  2. 發佈/訂閱(publish/subscribe,簡稱pub/sub)Topic消息傳遞模型:
    一個消息會被多個消費者消費

Queue

  1. 先在spring-jms.xml裏添加配置一個隊列名稱Queue_love

    <bean id="JmsSenderDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg>
        <value>Queue_love</value>
    </constructor-arg>
    </bean>
  2. 建立一個生產者來發送消息;@Qualifier("JmsSenderDestination")指定了發送到上面配置的Queue_love隊列

    @Component
    public class JmsSender {
    @Autowired
    private JmsTemplate jmsTemplate;
    
    @Qualifier("JmsSenderDestination")
    @Autowired
    protected Destination destination;
    
    public void sendMessage(final String msg) {
    
        logger.info("QUEUE destination :" + destination.toString() + ", 發送消息:" + msg);
        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(final Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }}
  3. 建立一個消費者來消費消息:

    @Component
    public class JmsTemplateListener implements MessageListener {
    
    @Override
    public void onMessage(Message message) {
        final TextMessage tm = (TextMessage) message;
        try {
            logger.info("QUEUE接收信息==="+tm.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }}
  4. 消費者須要在spring-jms.xml配置一下並指明該消費者須要消費哪一個隊列的消息

    <!-- 配置消息隊列監聽者 -->
    <bean id="JmsListener" class="com.mashu.activeMq.jmsTemplate.JmsTemplateListener" />
    
    <!-- 使用spring進行配置 監聽 -->
    <bean id="JmsTemplateListenerContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="pooledConnectionFactory"></property>
    <property name="destination" ref="JmsSenderDestination"></property>
    <property name="messageListener" ref="JmsListener"></property>
    <property name="sessionTransacted" value="false"></property>
    <property name="concurrentConsumers" value="6"></property>
    <property name="concurrency" value="2-4"></property>
    <property name="maxConcurrentConsumers" value="10"></property>
    </bean>

這樣一個簡單的消息隊列收發程序已經寫好了,調用生產者方法,看看結果

clipboard.png

發出的消息立馬就被消費了!
咱們能夠先把消費者註釋掉,只用生產者發送消息就能夠在可視化頁面上看到尚未被消費的消息內容。

clipboard.png

Topic

Topic的方式和Queue相似,只須要在定義隊列的時候calss=org.apache.activemq.command.ActiveMQTopic便可

<bean id="JmsSenderTDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg>
        <value>Topic_love</value>
    </constructor-arg>
</bean>

Message

除了使用createTextMessage()方法發送純字符串消息,還有

  1. 序列化對象的形式
    session.createObjectMessage();
  2. 流的形式,能夠用來傳遞文件
    session.createStreamMessage();
  3. 字節的形式
    session.createBytesMessage();
  4. map的形式
    session.createMapMessage();

安全配置

ActiveMQ在使用的時候和MySQL同樣,也能夠配置用戶名密碼,默認不沒有,咱們能夠打開:

  1. 在conf/activemq.xml添加如下信息(務必在<systemUsage>標籤上面)

    <plugins>
             <simpleAuthenticationPlugin>
                 <users>
                     <authenticationUser username="${activemq.username}"
                      password="${activemq.password}" groups="users,admins"/>
                 </users>
             </simpleAuthenticationPlugin>
         </plugins>
  2. 對應的用戶名密碼在/conf/credentials.properties中配置

    activemq.username=admin
    activemq.password=123456
    guest.password=password
  3. 那麼咱們在項目中的application.properties須要加上也要用戶名密碼:

    spring.activemq.broker-url=tcp://localhost:61616
    spring.activemq.user=admin
    spring.activemq.password=123456

消息持久化

ActiveMQ的持久化機制包含JDBC,KahaDB(默認)、LevelDB
默認保存的消息在\data\kahadb目錄下;下面方法修改成用MySQL(JDBC)保存,
生產者發送消息存儲到MySQL數據庫,消費者消費後消息從數據庫消失。

  1. 修改/conf/activemq.xml
    將:

    <persistenceAdapter>
                <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>

    修改成:

    <persistenceAdapter>
        <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>
    </persistenceAdapter>

    createTablesOnStartup 默認值是true,每次ActiveMQ啓動的時候都從新建立數據表,通常是首次啓動設置爲true,以後設置爲false。

  2. 添加:在/conf/activemq.xml 中加入MySQL鏈接信息

    <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost:3306/db_activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value="123456"/>
    <property name="poolPreparedStatements" value="true"/>
    </bean>
  3. 添加 mysql-connector-java.jar 到/bin目錄
  4. 新建數據庫db_activemq

重啓ActiveMQ後數據庫產生三個表activemq_acksactivemq_lockactivemq_msgs

相關文章
相關標籤/搜索