ActiveMQ的安裝與入門例子

1、架構和技術介紹

一、簡介

ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現html

二、activemq的特性

1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQPjava

2. 徹底支持JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)linux

3. 對Spring的支持,ActiveMQ能夠很容易內嵌到使用Spring的系統裏面去,並且也支持Spring2.0的特性web

4. 經過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中經過JCA 1.5 resourceadaptors的配置,可讓ActiveMQ能夠自動的部署到任何兼容J2EE1.4商業服務器上spring

5. 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTAapache

6. 支持經過JDBC和journal提供高速的消息持久化windows

7. 從設計上保證了高性能的集羣,客戶端-服務器,點對點服務器

8. 支持Ajaxsession

9. 支持與Axis的整合架構

10. 能夠很容易得調用內嵌JMS provider,進行測試

三、下載和安裝ActiveMQ

一、下載

ActiveMQ的最新版本是5.10.0,但因爲咱們內網下載存在問題,因此目前經過內網只能下載到5.9.0,下載地址:   http://activemq.apache.org/activemq-590-release.html

二、安裝

若是是在windows系統中運行,能夠直接解壓apache-activemq-5.9.0-bin.zip,並運行bin目錄下的activemq.bat文件,此時使用的是默認的服務端口:61616和默認的console端口:8161。

若是是在linux或unix下運行,在bin目錄下執行命令:./activemq setup

三、修改ActiveMQ的服務端口和console端口

A、修改服務端口:打開conf/activemq.xml文件,修改如下紅色字體部分

<transportConnectors>

<transportConnector name="openwire" uri="tcp://10.42.220.72:61618"discoveryUri="multicast://default"/>

</transportConnectors>

B、修改console的地址和端口:打開conf/jetty.xml文件,修改如下紅色字體部分

<bean id="jettyPort"class="org.apache.activemq.web.WebConsolePort"init-method="start">

<property name="port" value="8162"/>

</bean>

四、經過客戶端代碼試用ActiveMQ

配置maven文件,導入依賴的jar包

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.jd.activemq</groupId>
  <artifactId>ActiveMq</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>ActiveMq</name>
  <properties>
      <spring-framework.version>3.2.3.RELEASE</spring-framework.version>
  </properties>
  <dependencies>
      <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-client</artifactId>
      <version>5.8.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-pool</artifactId>
      <version>5.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring-framework.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>${spring-framework.version}</version>
    </dependency>
    <dependency>  
        <groupId>org.springframework</groupId>  
        <artifactId>spring-aspects</artifactId>  
        <version>${spring-framework.version}</version>  
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jdbc</artifactId>
      <version>${spring-framework.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-tx</artifactId>
        <version>${spring-framework.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.6.1</version>
        <optional>true</optional>
    </dependency>
  </dependencies>
 </project>

發送端代碼:

package com.jd.mq.queue;

import javax.jms.DeliveryMode;
import javax.jms.MapMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class QueueSender {
    // 發送次數
    public static final int SEND_NUM = 5;
    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目標,在ActiveMQ管理員控制檯建立 http://localhost:8161/admin/queues.jsp
    public static final String DESTINATION = "jd.mq.queue";

    /**

     * <b>function:</b> 發送消息

     * @author yuhailang

     * @param session

     * @param sender

     * @throws Exception

     */    

    public static void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {
            String message = "queue發送消息第" + (i + 1) + "條";
            MapMessage map = session.createMapMessage();
            map.setString("text", message);
            map.setLong("time", System.currentTimeMillis());
            System.out.println(map);
            sender.send(map);
        }
    }

    public static void run() throws Exception {

        QueueConnection connection = null;

        QueueSession session = null;
        try {
            // 建立連接工廠
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 經過工廠建立一個鏈接
            connection = factory.createQueueConnection();
            // 啓動鏈接
            connection.start();
            // 建立一個session會話
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 建立一個消息隊列
            Queue queue = session.createQueue(DESTINATION);
            // 建立消息發送者
            javax.jms.QueueSender sender = session.createSender(queue);
            // 設置持久化模式
            sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, sender);
            // 提交會話
            session.commit();
        } catch (Exception e) {
            throw e;
        } finally {
            // 關閉釋放資源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        QueueSender.run();
    }
}

接收端代碼:

package com.jd.mq.queue;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class QueueReceiver {
    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目標,在ActiveMQ管理員控制檯建立 http://localhost:8161/admin/queues.jsp
    public static final String TARGET = "jd.mq.queue";

    public static void run() throws Exception {
        QueueConnection connection = null;
        QueueSession session = null;
        try {
            // 建立連接工廠
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 經過工廠建立一個鏈接
            connection = factory.createQueueConnection();
            // 啓動鏈接
            connection.start();
            // 建立一個session會話
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 建立一個消息隊列
            Queue queue = session.createQueue(TARGET);
            // 建立消息製做者
            javax.jms.QueueReceiver receiver = session.createReceiver(queue);
            receiver.setMessageListener(new MessageListener() { 
                public void onMessage(Message msg) { 
                    if (msg != null) {
                        MapMessage map = (MapMessage) msg;
                        try {
                            System.out.println(map.getLong("time") + "接收#" + map.getString("text"));
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                } 
            }); 
            // 休眠100ms再關閉
            Thread.sleep(1000 * 100); 
            // 提交會話
            session.commit();
        } catch (Exception e) {
            throw e;
        } finally {
            // 關閉釋放資源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        QueueReceiver.run();
    }

}

經過監控查看消息堆棧的記錄:http://localhost:8161/admin/queues.jsp

相關文章
相關標籤/搜索