ActivityMq的使用(小例子)

1、ActivityMq的介紹:html

  1.什麼是消息中間件?與傳統的傳輸通信有什麼區別?java

    異步,無需等待,消息存放在隊列裏面。spring

  2.爲何要使用消息中間件?apache

    消息中間件能夠解決高併發。瀏覽器

    兩種通信方式:01.點對點通信。(Queue)緩存

           02.發佈訂閱。(Topic)服務器

    生產者:發送消息,提供接口的,主要向隊列中發送消息session

    隊列:存放消息地址併發

    消息:發送的報文信息異步

    消費者:調用接口的,主要從隊列中獲取消息

  3.步驟:

    0一、生產者向隊列進行發送消息,若是消費者不在,隊列就會將消息緩存

    0二、消費者向隊列中獲取到消息以後,消費成功以後, 該消息隊列直接被清除掉。(不清除就會產生重複消費問題)

  4.生產者向隊列中生產高併發流量,消費者會不會掛掉?

    不會,由於隊列會緩存消息。

  5.爲何MQ可以解決高併發問題?

    不會立馬處理那麼多的消息,隊列會進行緩存,進行排隊。

  6.JMS:java發送消息,客戶端與服務器進行通信的方式,能夠理解成java的消息 中間件

    消息模型:

        0一、點對點通信方式:

        流程:生產者 隊列 消費者。

        特色:一對一 異步通信,生產者生產的消息 只容許有一個消費者進行消費。

        0二、發佈訂閱:

        流程:生產者 主題 消費者

        特色:發佈訂閱 一個生產者 能夠多個消費者 一對多。

2、ActivityMq安裝:

  1.下載ActivityMQ

  官方網站:http://activemq.apache.org/download.html 

  2.運行ActivityMQ

   解壓apache-activemq-5.15.9-bin.zip,進入該文件夾的bin目錄。

     有兩種方式啓動ActivityMQ服務

     0一、在bin目錄下用cmd命令activemq start 啓動服務,關掉黑窗口服務即中止

     0二、進入bin目錄下對應電腦位數的文件夾,64位進入win64,雙擊InstallService.bat批處理文件安裝ActiveMQ服務,而後打開任務管理器啓動ActiveMQ服務

        啓動服務後打開瀏覽器輸入:http://localhost:8161/admin/  輸入默認設置的帳戶:admin密碼admin   

        點擊隊列(Queues),輸入隊列名稱(Queue Name)FirstQueue,而後點建立(Craete)

  3.建立maven項目

     添加一個activemq-all-5.15.3.jar便可,在pom.xml加入

   

<dependencies>
 	<dependency>
   		<groupId>org.apache.activemq</groupId>
   		<artifactId>activemq-all</artifactId>
   		<version>5.15.3</version>
	</dependency>
  </dependencies>

3、建立producer.java和consumer.java

  producer.java

package main;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {

    // 默認鏈接用戶名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    // 默認鏈接密碼
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    // 默認鏈接地址
    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        // 鏈接工廠
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);

        try {
            // 鏈接
            Connection connection = connectionFactory.createConnection();
            // 啓動鏈接
            connection.start();
            // 建立session
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // 消息目的地
            Destination destination = session.createQueue("FirstQueue");
            // 消息生產者
            MessageProducer producer = session.createProducer(null);
            // 設置不持久化,此處學習,實際根據項目決定
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            // 發送消息
            for (int i = 0; i < 10; i++) {
                // 建立一條文本消息
                TextMessage message = session.createTextMessage("ActiveMQ: 這是第 " + i + " 條消息");
                // Destination destination = session.createTopic("FirstQueueTopic");
                // 生產者發送消息
                producer.send(destination, message);
            }
            session.commit();
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

  consumer.java

package main;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

    // 默認鏈接用戶名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    // 默認鏈接密碼
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    // 默認鏈接地址
    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        // 鏈接工廠
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            // 鏈接
            Connection connection = connectionFactory.createConnection();
            // 啓動鏈接
            connection.start();
            // 建立session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 消息目的地
            Destination destination = session.createQueue("FirstQueue");
            // Destination destination = session.createTopic("FirstQueueTopic");
            // 消息消費者
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                TextMessage message = (TextMessage) consumer.receive();
                if (message != null) {
                    System.out.println("接收到消息: " + message.getText());
                } else {
                    break;
                }
            }
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

4、測試:

  先運行producer.java,再運行consumer.java。

5、項目中使用:

  spring-activemq.xml配置文件:

  生產者

<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 訪問 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd"> 
 
    <amq:connectionFactory id="amqConnectionFactory" 
    	brokerURL="${activemq.brokerURL}" userName="${activemq.username}" password="${activemq.password}" />    	
 
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory" />
        <property name="sessionCacheSize" value="100" />
    </bean>
     
    <!-- 定義JmsTemplate的Queue類型 -->
	<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
		<constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(發佈/訂閱),即隊列模式 -->
		<property name="pubSubDomain" value="false" />
	</bean>
     
    <!-- 定義JmsTemplate的Topic類型 -->
	<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
		<constructor-arg ref="connectionFactory" />
        <!-- pub/sub模型(發佈/訂閱) -->
		<property name="pubSubDomain" value="true" />
	</bean>
     
</beans>

  

config.properties配置用戶名,密碼,url等信息。

activemq.brokerURL=failover:(tcp://10.135.100.59:9203,tcp://10.135.100.60:9203)
#activemq.brokerURL=tcp://10.135.100.59:9203
activemq.username=admin
activemq.password=admin

代碼中的使用:

    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;// 經過@Qualifier修飾符來注入對應的bean

    /** 
     * @Title: send 發送一條消息到指定的隊列(目標)
     * @Description: 
     * @param queueName 隊列名稱
     * @param message 消息內容
     * @date 2016年10月12日 上午10:15:56
     */
    public void send(String queueName, final String message) {
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
相關文章
相關標籤/搜索