ActiveMQ入門練習

  ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。java

  JMS即Java消息服務(Java Message Service)應用程序接口,是一個Java平臺中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。git

  JMS是一種與廠商無關的 API,用來訪問消息收發系統消息,它相似於JDBC(Java Database Connectivity)。這裏,JDBC 是能夠用來訪問許多不一樣關係數據庫的 API,而 JMS 則提供一樣與廠商無關的訪問方法,以訪問消息收發服務。許多廠商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ。 JMS 使您可以經過消息收發服務(有時稱爲消息中介程序或路由器)從一個 JMS 客戶機向另外一個 JMS客戶機發送消息。
  本篇我將和你們一塊兒分享一下Apache JMS的使用,ApacheMQ: http://activemq.apache.org/,下載後解壓,這裏須要說明,使用以前請先配置Java環境變量。啓動後打開瀏覽器輸入:http://localhost:8161/admin,這裏須要經過密碼登陸,默認用戶名:admin 密碼:admin(用戶名密碼是在conf/users.properties中配置的)
  這樣咱們的環境就配置好了。更詳細的解說: http://blog.csdn.net/clj198606061111/article/details/38145597
  下面就是咱們如何進行消息的發送和接收,這裏有兩種方式:點對點收發;發佈訂閱。下面咱們就一一進行探討。
  首先搭建一下測試環境,這裏我經過Maven進行項目的建立,咱們須要引入activemq的核心activemq-all-5.15.2.jar,以及單元測試
  <dependencies>
      <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>${active-mq-version}</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/junit/junit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
  </dependencies>

  下面咱們就開始消息的收發,先來探討一下點對點的消息收發:github

  一、消息的發送數據庫

  原理上咱們經過ConnectionFactory會話工廠 ---> 建立一個會話鏈接 ---> 經過會話鏈接建立一個會話線程 ---> 經過會話線程建立一個消息隊列 ---> 經過會話線程和消息隊列建立一個消息生產者(MessageProducer) ---> 最後由消息生產者進行消息的發送。apache

private static ConnectionFactory factory;//會話鏈接工廠
private static Connection connection;//會話鏈接
private static Session session;//會話接收或發送消息線程
private static Destination destination;//消息隊列
private static MessageProducer messageProducer;//消息發送者

  經過ActiveMQConnectionFactory建立會話工廠瀏覽器

factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);//建立會話鏈接工廠

  設置會話鏈接額度地址和用戶名密碼在(這裏我使用的是默認的地址和用戶名密碼)服務器

private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

  下面封裝了一下上面的過程:session

    /**
     * @Description 發送消息
     * @param queryName 消息隊列名稱
     * @param msg 消息內容
     * @return
     *
     * @author 高尚
     * @version 1.0
     * @date 建立時間:2018年1月31日 下午3:54:22
     */
    public static boolean producerSendQueryMessage(final String queryName, final String msg){
        boolean flag = true;
        try {
            connection = factory.createConnection();//建立會話鏈接
            connection.start();//啓動會話鏈接
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//建立會話線程
            destination = session.createQueue(queryName);//建立消息隊列
            messageProducer = session.createProducer(destination);//建立會話生成者
            Message message = session.createTextMessage(msg);//建立消息對象
            messageProducer.send(message);//發送消息
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
            flag = false;
        }finally {
            if(null != connection){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
        return flag;
    }    

  這裏單獨說一下connection.createSession生成會話線程:異步

Session session = connection.createSession(paramA,paramB);
paramA是設置事務,paramB是設置acknowledgment mode

paramA 取值有:
一、true:支持事務
爲true時:paramB的值忽略, acknowledgment mode被jms服務器設置爲SESSION_TRANSACTED 。  
二、false:不支持事務 
爲false時:paramB的值可爲Session.AUTO_ACKNOWLEDGE、Session.CLIENT_ACKNOWLEDGE、DUPS_OK_ACKNOWLEDGE其中一個。

paramB 取值有:
1、Session.AUTO_ACKNOWLEDGE:爲自動確認,客戶端發送和接收消息不須要作額外的工做。
2、Session.CLIENT_ACKNOWLEDGE:爲客戶端確認。客戶端接收到消息後,必須調用javax.jms.Message的acknowledge方法。jms服務器纔會刪除消息。 
3、DUPS_OK_ACKNOWLEDGE:容許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;並且容許重複確認。在須要考慮資源使用時,這種模式很是有效。
四、SESSION_TRANSACTED

  到這裏咱們的消息發送模塊就封裝完成,下面咱們看一下消息的接收:分佈式

  原理上咱們經過ConnectionFactory會話工廠 ---> 建立一個會話鏈接 ---> 經過會話鏈接建立一個會話線程 ---> 經過會話線程建立一個消息隊列 ---> 經過會話線程和消息隊列建立一個消息消費者(MessageConsumer) ---> 最後由消息消費者進行消息獲取

    /**
     * @Description 接收消息隊列中的消息
     * @param queryName 消息隊列名稱
     * @return
     *
     * @author 高尚
     * @version 1.0
     * @date 建立時間:2018年1月31日 下午4:24:14
     */
    public static void consumerGetQueryMessage(final String queryName){
        try {
            connection = factory.createConnection();//建立會話鏈接
            connection.start();//啓動會話鏈接
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(queryName);
            messageConsumer = session.createConsumer(destination);
            while(true){
                TextMessage message = (TextMessage) messageConsumer.receive();
                if(null != message){
                    System.out.println(queryName+"發送消息:"+message.getText());
                }
            }
        } catch (Exception e) {
            // TODO: handle exception
            System.out.println("消費消息異常");
        }finally {
            if(null != connection){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

  最後單獨說一下messageConsumer.receive方法:

messageConsumer.receive();//一直等到消息
messageConsumer.receive(1000);//等到消息1秒鐘
messageConsumer.receiveNoWait();//不等待消息

  固然這樣的接收消息方式是否是感受很low,固然咱們還能夠經過監聽器來實現消息接收的監聽(MessageListener),咱們實現MessageListener接口,自定義消息接收監聽器:

/**
 * @Description 消息監聽器
 * @author 高尚
 * @version 1.0
 * @date 建立時間:2018年1月31日 下午4:52:36
 */
public class MsgListener implements MessageListener {
    
    /**
     * 監聽的會話隊列
     */
    private static String queryName;

    @Override
    public void onMessage(Message msg) {
        TextMessage textMsg = (TextMessage) msg;
        try {
            if(null != textMsg){
                System.out.println("【" + queryName + "】發送的消息:" + textMsg.getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
            System.out.println("獲取會話消息異常");
        }
    }

    public MsgListener(String queryName) {
        super();
        // TODO Auto-generated constructor stub
        this.queryName = queryName;
    }
    
}

  而後咱們須要簡單修改一個消息接收策略:

    /**
     * @Description 經過Listener接收消息隊列中的消息
     * @param queryName 消息隊列名稱
     * @return
     *
     * @author 高尚
     * @version 1.0
     * @date 建立時間:2018年1月31日 下午4:24:14
     */
    public static void consumerGetQueryMessageListener(String queryName) {
        try {
            connection = factory.createConnection();//建立會話鏈接
            connection.start();//啓動會話鏈接
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(queryName);
            messageConsumer = session.createConsumer(destination);
            MsgListener msgListener = new MsgListener(queryName);
            messageConsumer.setMessageListener(msgListener);
            while(true){
                Thread.sleep(10000);
            }
        } catch (Exception e) {
            // TODO: handle exception
            System.out.println("消費消息異常");
        }finally {
            if(null != connection){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

  最後咱們一塊兒看一下消息的發佈和訂閱,首先消息的發佈和訂閱與點點的消息收發基本一致,不一樣點在於會話線程的建立:

destination = session.createTopic(queryName);//建立消息發佈線程

  其餘部分一致,沒有什麼難度,這裏再也不闡述,你們能夠自行測試。固然我也爲你們提供了測試參考代碼:https://github.com/hpugs/ActiveMQ

  到這裏關於ActiveMQ入門部分就和你們探討完畢,若有錯誤還有指教。謝謝

相關文章
相關標籤/搜索