activemq

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 1.Destination  目的地
 *  Queue 隊列
 *    隊列中的消息,默認只能由惟一的一個消費者處理,一旦處理消息後消息就被刪除
 *
 * 支持存在多個消費者,多個生產者,因此消費者不可能消費到已經被消費的消息
 * 當消費者不存在時,消息會一直保存,直到有消費者消費
 *
 *  Topic 目的地
 *    消息會發送給全部的消費者同時處理,只有在消息能夠重複處理的業務場景中可以使用
 *
 *
 */
public class TestProducer {


    public void sendTextMessage(String mess){

        ConnectionFactory connectionFactory = null;

        Connection connection = null;

        Destination destination = null;

        Session session = null;

        MessageProducer messageProducer =null;

        Message message = null;

        try {

            connectionFactory = new ActiveMQConnectionFactory();

            connection = connectionFactory.createConnection();
            //發送者默認是啓動的,消費者默認是不啓動的,全部客戶端時必須啓動
            //若是有特殊配置,配置後再啓動
            connection.start();
            //1 第一個參數  是否支持事務,不推薦事務,批量時建議使用  若是true  第二個參數無效(只是提供者)
            // 客戶端不支持事務
            //2  消息確認機制
            /**
             * 1 auto_acknowledge  自動確認消息,消息的消費者處理消息後,自動確認,經常使用,商業開發不推薦
             * 2 client_acknowledge 客戶端手動確認,消息的消費者處理後,必須手工確認
             * 3 dups_ok_acknowledge 有副本的客戶端手動確認 一個消息能夠屢次處理,能夠下降session的消耗,在
             *
             * 能夠容忍重複消息時使用(不推薦使用)
//設置該消息的超時時間
            //messageProducer.setTimeToLive(1000);
            /**
             * 過時的、處理失敗的消息,將會被ActiveMQ置入「ActiveMQ.DLQ」這個隊列中。這個隊列是由ActiveMQ自動建立的。
             *
             * 若是須要查看這些未被處理的消息,能夠進入這個隊列中查看
               Destination destination = session.createQueue("ActiveMQ.DLQ");
             */ *
*/ session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); destination = session.createQueue("first-mq"); messageProducer = session.createProducer(destination); message = session.createTextMessage(mess); messageProducer.send(message);
//session.commit(); //啓用事務時記得提交事務,否則消費端接收不到消息 System.out.println(
"消息已發送"); }catch (Exception e){ e.printStackTrace(); }finally { if (messageProducer!=null){ try { messageProducer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session!=null){ try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection!=null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } public static void main(String[] args) { TestProducer t = new TestProducer(); t.sendTextMessage("發送一條消息"); } }

 

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created by Administrator on 2019/6/27.
 */
public class TestConsumer {


    public void rTextMessage() {

        ConnectionFactory connectionFactory = null;

        Connection connection = null;

        Destination destination = null;

        Session session = null;


        MessageConsumer messageConsumer = null;

        Message message = null;

        try {

            connectionFactory = new ActiveMQConnectionFactory();

            connection = connectionFactory.createConnection();
            //發送者默認是啓動的,消費者默認是不啓動的,全部客戶端時必須啓動
            //若是有特殊配置,配置後再啓動
            connection.start();
            //1 第一個參數  是否支持事務,不推薦事務,批量時建議使用  若是true  第二個參數無效(只是提供者)
            // 客戶端不支持事務
            //2  消息確認機制
            /**
             * 1 auto_acknowledge  自動確認消息,消息的消費者處理消息後,自動確認,經常使用,商業開發不推薦
             * 2 client_acknowledge 客戶端手動確認,消息的消費者處理後,必須手工確認
             * 3 dups_ok_acknowledge 有副本的客戶端手動確認 一個消息能夠屢次處理,能夠下降session的消耗,在
             *
             * 能夠容忍重複消息時使用(不推薦使用)
             *
             */
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            destination = session.createQueue("first-mq");
            //messageProducer = session.createProducer(destination);
            messageConsumer = session.createConsumer(destination);

            /**
             * 主動活動消息,執行一次,拉去一個消息,開發少用
             * 多個消費者要用監聽
             */

            message =messageConsumer.receive();

            String mess = ((TextMessage)message).getText();

            System.out.println("接收到的消息----------"+mess);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (messageConsumer != null) {
                try {
                    messageConsumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    public static void main(String[] args) {
        TestConsumer t = new TestConsumer();
        t.rTextMessage();
    }


}

 

/**
     * 監聽隊列消息
     */
    public void receiveMessageListener() {
        ConnectionFactory connectionFactory = null;
        Connection connection = null;
        Session session = null;
        Destination destination = null;
        MessageConsumer consumer = null;
        try {
            connectionFactory = new ActiveMQConnectionFactory(brokerURL);
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("first-quere");
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接收消息:"+textMessage.getText());
                        //textMessage.acknowledge();消息確認機制
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
        }
    }

 

1java

相關文章
相關標籤/搜索