2,ActiveMQ-入門

ActiveMQ是Apache出品的,很是流行的消息中間件,能夠說要掌握消息中間件,須要從ActiveMQ開始。首先去官網下載:ActiveMQ官網 html

一,ActiveMQ目錄配置文件

1.1,ActiveMQ目錄 java

binweb

存放的是ActiveMQ的啓動腳本activemq.bat。apache

conf瀏覽器

裏面是配置文件,重點關注的是activemq.xml、jetty.xml、jetty-realm.properties。在登陸ActiveMQ Web控制檯須要用戶名、密碼信息、在JMS CLIENT和ActiveMQ進行何種協議的鏈接端口等這些信息都在上面的配置文件中能夠體現。session

data併發

目錄下是ActiveMQ進行消息持久化存放的地方,默認採用的是kahadb,固然咱們能夠採用leveldb,或者採用JDBC存儲到MySQL,或者乾脆不使用持久化機制。app

webappswebapp

注意ActiveMQ自帶Jetty提供Web管控臺。ide

lib

中ActiveMQ爲咱們提供了分功能的JAR包。

1.2,ActiveMQ啓動

CMD進入ActiveMQ所在的bin目錄,輸入ActiveMQ start便可啓動ActiveMQ,這裏必須已經安裝JDK,須要注意JDK版本號。

瀏覽器輸入localhost:8161便可訪問ActiveMQ web後臺管理

生產的消息查看方式:

Number Of Pending Messages

還有多少條消息沒有被消費,其實是表示消息的積壓程度,就是P-C

Number Of Consumers

在該隊列上還有多少消費者在等待接受消息

Messages Dequeued

消費了多少條消息,記作C

Messages Enqueued

生產了多少條消息,記作P

ActiveMQ 各個版本所依賴的JDK版本:

能夠經過查看文件 activemq-all-*.jar包裏面的\META-INF\MANIFEST.MF 屬性值 Build-Jdk

1.3,ActiveMQ配置文件

jetty.xml

Jetty web容器的配置文件,裏面能夠修改ActiveMQ web後臺的端口號。

jetty-realm.properties

這裏保存了ActiveMQ web控制檯的用戶名、密碼。

activemq.xml

內容太多,單獨說。

二,ActiveMQ入門例

pom.xml中添加ActiveMQ依賴:

<dependency>

  <groupId>org.apache.activemq</groupId>

  <artifactId>activemq-all</artifactId>

  <version>5.14.5</version>

</dependency>

ActiveMQ接受/發送消息流程圖:

2.1,ActiveMQLinkUtil幫助類:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class ActiveMQLinkUtil {
    private static final Logger logger = LoggerFactory.getLogger(ActiveMQLinkUtil.class);
    //ActiveMQ 的默認用戶名
    private static String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //ActiveMQ 的默認登陸密碼
    private static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //ActiveMQ 的連接地址
    private static String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    //連接工廠對象
    private static ConnectionFactory connectionFactory = null;
    //連接對象
    private static Connection connection = null;
    //會話對象
    private static Session session = null;
    /**
     * 
     * @方法名: initConnection
     * @描述: 初始化Connection,並返回Session對象,默認開啓事務和自動簽收
     * @return Session對象
     */
    public static Session initConnection(){
        try {
            String parameters = String.format("鏈接參數:userName=%s, pwd=%s, url=%s", USERNAME, PASSWORD, BROKEN_URL);
            logger.info(parameters);
            //第一步:建立一個連接工廠
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            //第二步:從工廠中建立一個連接
            connection  = connectionFactory.createConnection();
            //開啓連接(默認是關閉的)
            connection.start();
            //第三步:建立session
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            logger.info("Init Connection success");
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return session;
    }
    
    /**
     * 
     * @方法名: initConnection
     * @描述:  初始化Connection
     * @param transactionState 是否開啓事務
     * @param model 簽收模式
     * @return Session對象
     */
    public static Session initConnection(Boolean transactionState, int model){
        try {
            String parameters = String.format("鏈接參數:userName=%s, pwd=%s, url=%s", USERNAME, PASSWORD, BROKEN_URL);
            logger.info(parameters);
            
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            connection  = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(transactionState, model);
            logger.info("Init Connection success");
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return session;
    }
    
    /**
     * 
     * @方法名: updateLinkParameter
     * @描述: 修改默認ActiveMQ的鏈接屬性
     * @param userName
     * @param pwd
     * @param url
     */
    public static void updateLinkParameter(String userName, String pwd , String url){
        if(!StringUtils.isBlank(userName)){
            USERNAME = userName;
        }
        if(!StringUtils.isBlank(pwd)){
            PASSWORD = pwd;
        }
        if(!StringUtils.isBlank(url)){
            BROKEN_URL = url;
        }
    }
    
    /**
     * 
     * @方法名: closeConnection
     * @描述: 關閉鏈接
     */
    public static void closeConnection(){
        if(connection != null){
            try {
                connection.close();
                logger.info("close Connection success");
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

2.2,建立生產者

package com.zender.activemq;
 
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
 
/**
 * 
 * @類名稱:ActiveMQProducter
 * @類描述:生產者
 */
public class ActiveMQProducter {
    //會話對象
    private static Session session = null;
    
    public static void sendMessages(String name){
        try {
            session = ActiveMQLinkUtil.initConnection();
            if(session != null){
                //第四步:建立一個消息目標,在PTP模式下是Queue,pub/sub模式下是topic
                Queue queue = session.createQueue(name);
                
                //第五步:建立一個消息生產者
                MessageProducer messageProducer = session.createProducer(queue);
                
                //第六步:設置持久化方式,默認設置爲PERSISTENT(持久性)
                messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                
                for (int i = 0; i < 5; i++) {
                    Thread.sleep(1000);
                    //第七步:建立文本消息,併發送
                    TextMessage textMessage = session.createTextMessage("消息內容" + (i + 1 ));
                    System.out.println("消息內容:" + textMessage.getText());
                    //使用生產者,發送消息
                    messageProducer.send(textMessage);
                    //提交事務
                    session.commit();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //第八步:釋放鏈接
            ActiveMQLinkUtil.closeConnection();
        }
    }
    
    public static void main(String[] args) {
        ActiveMQProducter.sendMessages("Zender-MQ");
    }
}

2.3,建立消費者

import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
 
/**
 * 
 * @類名稱:ActiveMQComsumer
 * @類描述:消費者
 */
public class ActiveMQComsumer {
    //會話對象
    private static Session session = null;
    
    public static void getMessages(String name){
        try {
            session = ActiveMQLinkUtil.initConnection();
            if(session != null){
                //建立一個消息隊列
                Queue queue = session.createQueue(name);
                //建立一個消息消費者
                MessageConsumer  messageConsumer = session.createConsumer(queue);
                for (int i = 0; i < 5; i++) {
                    Thread.sleep(1000);
                    //獲取消息
                    TextMessage msg = (TextMessage) messageConsumer.receive();
                    //確認接到消息,這條代碼能夠不寫,上面設置了自動消息確認
                    //msg.acknowledge();
                    System.out.println("消息內容:" + msg.getText());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ActiveMQLinkUtil.closeConnection();
        }
    }
    
    public static void main(String[] args) {
        ActiveMQComsumer.getMessages("Zender-MQ");
    }
}

2.4,運行,查看結果

先運行生產者,再運行消費者。

生產者日誌:

消費者日誌:

三,Session中的事務和簽收模式(消息確認機制)

在經過Connection建立Session的時候,須要設置2個參數,一個是否支持事務,另外一個是簽收的模式。重點說一下簽收模式,ActiveMQ支持一下三種模式:

AUTO_ACKNOWLEDGE

表示在消費者receive消息的時候自動的簽收

CLIENT_ACKNOWLEDGE

表示消費者receive消息後必須手動的調用acknowledge()方法進行簽收

DUPS_OK_ACKNOWLEDGE

籤不簽收無所謂了,只要消費者可以容忍重複的消息接受,固然這樣會下降Session的開銷

在實際中,咱們應該採用CLIENT_ACKNOWLEDGE這種簽收模式,採用手動的方式較自動的方式可能更好些,由於接收到了消息,並不意味着成功的處理了消息,假設咱們採用手動簽收的方式,只有在消息成功處理的前提下才進行簽收,那麼只要消息處理失敗,那麼消息還有效,仍然會繼續消費,直至成功處理。

四,消息選擇器Selector

JMS Selectors,即消息選擇器,前面介紹過消息的組成部分,其中談到消息對象有消息屬性,用於消息選擇器。咱們來看一段代碼:

生產者:

package com.zender.activemq.selectors;
 
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import com.zender.activemq.ActiveMQLinkUtil;
 
/**
 * 
 * @類名稱:ActiveMQSelectorsProducter
 * @類描述:生產者-JSM選擇器-消息選擇器
 */
public class ActiveMQSelectorsProducter {
    //會話對象
    private static Session session = null;
    
    public static void sendMessages(String name){
        try {
            session = ActiveMQLinkUtil.initConnection(false, Session.CLIENT_ACKNOWLEDGE);
            if(session != null){
                //建立一個消息目標
                Queue queue = session.createQueue(name);
                //建立一個消息生產者
                MessageProducer messageProducer = session.createProducer(queue);
                for (int i = 0; i < 10; i++) {
                    //建立消息,併發送
                    TextMessage textMessage = session.createTextMessage("消息A-"+i);
                    textMessage.setStringProperty("JMSXtestId", "a");
                    //使用生產者,發送消息
                    messageProducer.send(textMessage);
                    
                    TextMessage textMessage1 = session.createTextMessage("消息B-"+i);
                    textMessage1.setStringProperty("JMSXtestId", "b");
                    messageProducer.send(textMessage1);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        ActiveMQSelectorsProducter.sendMessages("SelectorsDemo");
    }
}

消費者:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import com.zender.activemq.ActiveMQLinkUtil;
 
/**
 * 
 * @類名稱:ActiveMQSelectorsComsumer
 * @類描述:消費者-JSM選擇器-消息選擇器
 */
public class ActiveMQSelectorsComsumer {
    //會話對象
    private static Session session = null;
    
    @SuppressWarnings("unchecked")
    public static void getMessages(String name){
        try {
            session = ActiveMQLinkUtil.initConnection(false, Session.CLIENT_ACKNOWLEDGE);
            if(session != null){
                //建立一個消息隊列
                Queue queue = session.createQueue(name);
                //建立一個消息消費者
                MessageConsumer  messageConsumer = session.createConsumer(queue, "JMSXtestId='a'");
                //獲取消息
                messageConsumer.setMessageListener(new MessageListener(){
                    @Override
                    public void onMessage(Message message) {
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println(textMessage.getText());
                            //簽收
                            textMessage.acknowledge();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        ActiveMQSelectorsComsumer.getMessages("SelectorsDemo");
    }
}

結果:

一個生產了20個消息,這邊只消費了消息A的消息,消息B的並無被消費。

相關文章
相關標籤/搜索