ActiveMQ學習之Broker

1、概念

           至關於一個ActiveMQ服務實例java

                    Broker其實就是實現了用代碼的形式啓動了ActiveMQ將MQ嵌入到java代碼中,以便隨時用隨時啓動,在用的時候再去啓動這樣能節約資源,也保證了可靠性。spring

2、按照不一樣配置文件啓動ActiveMQ

           一、 先將ActiveMQ根目錄下conf文件夾中的activemq.xml複製一份並重命名爲activemq02.xmlapache

                命令(cp activemq.xml activemq02.xml)json

           二、啓動activemq02.xml,默認啓動的是activemq.xmlspringboot

                命令(./activemq start xbean:file:/usr/local/activeMQ/apache-activemq-5.15.11/conf/activemq02.xml)服務器

3、嵌入式Broker

           用ActiveMQ Broker做爲獨立的消息服務器來構建java應用。ActiveMQ也支持在虛擬機中通訊,基於嵌入式的broker,可以無縫的集成其餘java應用session

4、代碼

            一、pom.xml中引入包tcp

<!--activemq-->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.15.9</version>
    </dependency>
    <!--fastjson-->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.9.9</version>
    </dependency>

            二、broker代碼             url

import org.apache.activemq.broker.BrokerService;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/10 16:04
 * @Version: 1.0
 */
public class EmbedBroker {
    public static void main(String[] args) throws Exception {
        //ActiveMQ也支持在虛擬機中通訊,嵌入broker
        BrokerService brokerService=new BrokerService();
        //將activeMQ嵌入到java程序中
        brokerService.setUseJmx(true);
        //如今是將activeMQ嵌入到java程序中,因此使用本機
        brokerService.addConnector("tcp://127.0.0.1:61616");
        //啓動程序
        brokerService.start();
    }

}

            三、隊列生產者代碼    spa

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/2 17:04
 * @Version: 1.0
 */
public class ActiveMQTest {
    //url路徑
    private static final String ACTRIVE_URL="tcp://127.0.0.1:61616";
    //隊列名稱
    private static final String QUEUE_NAME="queue01";

    public static void main(String[] args) {
        //一、建立鏈接工廠
        //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);

        try {
            //二、經過鏈接工廠獲取鏈接
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //三、建立session會話
            //裏面會有兩個參數,第一個爲事物,第二個是簽收
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //四、建立目的地(具體是隊列仍是主題),這裏是建立隊列
            Queue queue=session.createQueue(QUEUE_NAME);
            //五、建立消息生產者,隊列模式
            MessageProducer messageProducer = session.createProducer(queue);
            //六、經過messageProducer生產三條消息發送到MQ消息隊列中
            for (int i=0;i<3;i++){
                //七、建立消息
                TextMessage textMessage = session.createTextMessage("msg----->" + i);//建立一個文本消息

                //八、經過messageProducer發送給mq
                messageProducer.send(textMessage);
                //九、數據非持久化
                messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
            }
            messageProducer.close();
            session.commit();
            session.close();
            connection.close();
            System.out.println("消息發送成功");
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

            三、隊列消費者代碼   

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/3 8:47
 * @Version: 1.0
 */
public class ActiveMQConsumer {
    //url路徑
    private static final String ACTRIVE_URL="tcp://127.0.0.1:61616";
    //隊列名稱
    private static final String QUEUE_NAME="queue01";

    public static void main(String[] args) {
        //一、建立鏈接工廠
        //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //若是帳號密碼修改的話
        //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //二、經過鏈接工廠獲取鏈接
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //三、建立session會話
            //裏面會有兩個參數,第一個爲事物,第二個是簽收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //四、這裏接受的queue的名稱要和發送者的一致
            Queue queue = session.createQueue(QUEUE_NAME);
            //五、建立消費者
            MessageConsumer consumer = session.createConsumer(queue);
            //六、經過監聽的方式消費消息
            while(true){
                //MessageConsumer 調用的receive方法爲同步調用,在消息到達以前一直阻塞線程
                //用什麼格式發送,這裏就用什麼格式接受
                //receive等待消息,不限制時間
                TextMessage message=(TextMessage)consumer.receive();

                //receive帶參數等待消息,限制時間,單位毫秒
                //TextMessage message=(TextMessage)consumer.receive(4000L);

                if(null != message){
                    System.out.println("接受的消息爲------>"+message.getText());
                }else{
                    break;
                }
            }
            //七、閉資源
            consumer.close();
            session.close();
            connection.close();

        }catch (Exception e){
            e.printStackTrace();
        }
    }

}
相關文章
相關標籤/搜索