ActiveMQ--模式(隊列模式/主題模式)

兩種模式:隊列模式/主題模式apache

pom.xml

<dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.9</version>
</dependency>  

 

隊列模式,其實就是分食模式。服務器

   好比生產方發了 10條消息到 activeMQ 服務器, 而此時有多個 消費方,那麼這些消費方就會瓜分這些10條消息,一條消息只會被一個消費方獲得。
主題模式,就是訂閱模式。 session

  好比生產方發了10條消息,而此時有多個消費方,那麼多個消費方都能獲得這 10條消息,就如同訂閱公衆號那樣。dom


 

隊列模式:tcp

1. 首先運行兩次 TestConsumer 類,以啓動兩個不一樣的消費者
2. 運行一次 TestProducer, 以啓動 生產者url

  生產者生產100個,兩個消費者瓜分spa

  消費者:3d

public class TestConsumer {
    //服務地址,端口默認61616
    private static final String url="tcp://127.0.0.1:61616";
    //此次消費的消息名稱
    private static final String topicName="queue_style";

    //消費者有多是多個,爲了區分不一樣的消費者,爲其建立隨機名稱
    private static final String consumerName="consumer-" + RandomUtil.randomString(5);
    public static void main(String[] args) throws JMSException {
        //0. 先判斷端口是否啓動了 Active MQ 服務器
        ActiveMQUtil.checkServer();
        System.out.printf("%s 消費者啓動了。 %n", consumerName);

        //1.建立ConnectiongFactory,綁定地址
        ConnectionFactory factory=new ActiveMQConnectionFactory(url);
        //2.建立Connection
        Connection connection= factory.createConnection();
        //3.啓動鏈接
        connection.start();
        //4.建立會話
        Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.建立一個目標 (主題類型)
        Destination destination=session.createQueue(topicName);
        //6.建立一個消費者
        MessageConsumer consumer=session.createConsumer(destination);
        //7.建立一個監聽器
        consumer.setMessageListener(new MessageListener() {

            public void onMessage(Message arg0) {
                // TODO Auto-generated method stub
                TextMessage textMessage=(TextMessage)arg0;
                try {
                    System.out.println(consumerName +" 接收消息:"+textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        });
        
        //8. 由於不知道何時有,因此無法主動關閉,就不關閉了,一直處於監聽狀態
        //connection.close();
    }
}

生產者:code

public class TestProducer {

    //服務地址,端口默認61616
    private static final String url="tcp://127.0.0.1:61616";
    //此次發送的消息名稱
    private static final String topicName="queue_style";
    public static void main(String[] args) throws JMSException {
        //0. 先判斷端口是否啓動了  Active MQ 服務器
        ActiveMQUtil.checkServer();
        //1.建立ConnectiongFactory,綁定地址
        ConnectionFactory factory=new ActiveMQConnectionFactory(url);
        //2.建立Connection
        Connection connection= factory.createConnection();
        //3.啓動鏈接
        connection.start();
        //4.建立會話
        Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.建立一個目標 (隊列類型)
        Destination destination=session.createQueue(topicName);
        //6.建立一個生產者
        MessageProducer producer=session.createProducer(destination);


        for (int i = 0; i < 100; i++) {
            //7.建立消息
            TextMessage textMessage=session.createTextMessage("隊列消息-"+i);
            //8.發送消息
            producer.send(textMessage);
            System.out.println("發送:"+textMessage.getText());
        }
        //7. 關閉鏈接
        connection.close();
    }
}

2個consumer:xml

 

 

生產者生產:

 

 

 

 

 

 

 

 

 

 


 

主題模式:

 

  消費者,生產者

public class TestConsumer {
    //服務地址,端口默認61616
    private static final String url="tcp://127.0.0.1:61616";
    //此次消費的消息名稱
    private static final String topicName="topic_style";

    //消費者有多是多個,爲了區分不一樣的消費者,爲其建立隨機名稱
    private static final String consumerName="consumer-" + RandomUtil.randomString(5);
    public static void main(String[] args) throws JMSException {
        

        //0. 先判斷端口是否啓動了 Active MQ 服務器
        ActiveMQUtil.checkServer();
        System.out.printf("%s 消費者啓動了。 %n", consumerName);
        //1.建立ConnectiongFactory,綁定地址
        ConnectionFactory factory=new ActiveMQConnectionFactory(url);
        //2.建立Connection
        Connection connection= factory.createConnection();
        //3.啓動鏈接
        connection.start();
        //4.建立會話
        Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5.建立一個目標 (主題類型)
        Destination destination=session.createTopic(topicName);

        //6.建立一個消費者
        MessageConsumer consumer=session.createConsumer(destination);
        //7.建立一個監聽器
        consumer.setMessageListener(new MessageListener() {

            public void onMessage(Message arg0) {
                // TODO Auto-generated method stub
                TextMessage textMessage=(TextMessage)arg0;
                try {
                    System.out.println(consumerName +" 接收消息:"+textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        });
        
        //8. 由於不知道何時有,因此無法主動關閉,就不關閉了,一直處於監聽狀態
        //connection.close();
    }
}
public class TestProducer {

    //服務地址,端口默認61616
    private static final String url="tcp://127.0.0.1:61616";
    //此次發送的消息名稱
    private static final String topicName="topic_style";
    public static void main(String[] args) throws JMSException {
        //0. 先判斷端口是否啓動了  Active MQ 服務器
        ActiveMQUtil.checkServer();
        //1.建立ConnectiongFactory,綁定地址
        ConnectionFactory factory=new ActiveMQConnectionFactory(url);
        //2.建立Connection
        Connection connection= factory.createConnection();
        //3.啓動鏈接
        connection.start();
        //4.建立會話
        Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.建立一個目標 (主題類型)
        Destination destination=session.createTopic(topicName);
        //6.建立一個生產者
        MessageProducer producer=session.createProducer(destination);


        for (int i = 0; i < 100; i++) {
            //7.建立消息
            TextMessage textMessage=session.createTextMessage("主題消息-"+i);
            //8.發送消息
            producer.send(textMessage);
            System.out.println("發送:"+textMessage.getText());
        }
        //7. 關閉鏈接
        connection.close();
    }
}

 

生產者生產100個,兩個消費者都分別接受了100個

 

 

 

 

 

 

相關文章
相關標籤/搜索