Centos7環境下消息隊列之ActiveMQ實戰

Activemq介紹

對於消息的傳遞有兩種類型:apache

一種是點對點的,即一個生產者和一個消費者一一對應;服務器

另外一種是發佈/訂閱模式,即一個生產者產生消息並進行發送後,能夠由多個消費者進行接收。session

JMS定義了五種不一樣的消息正文格式,以及調用的消息類型,容許你發送並接收以一些不一樣形式的數據,提供現有消息格式的一些級別的兼容性。tcp

  · StreamMessage -- Java原始值的數據流ide

  · MapMessage--一套名稱-值對spa

  · TextMessage--一個字符串對象3d

  · ObjectMessage--一個序列化的 Java對象code

  · BytesMessage--一個字節的數據流對象

Activemq安裝

第一步: ActiveMQ 的壓縮包上傳到Linux系統。blog

第二步:解壓縮。

第三步:啓動。

使用bin目錄下的activemq命令啓動:

[root@localhost bin]# ./activemq start

關閉:

[root@localhost bin]# ./activemq stop

查看狀態:

[root@localhost bin]# ./activemq status

進入管理後臺

http://192.168.176.130:8161/admin/

用戶名:admin

密碼:admin

JMQ的兩種消息模式

消息列隊有兩種消息模式,一種是點對點的消息模式,還有一種就是訂閱的模式.

點對點的消息模式

    點對點的模式主要創建在一個隊列上面,當鏈接一個列隊的時候,發送端不須要知道接收端是否正在接收,能夠直接向ActiveMQ發送消息,發送的消息,將會先進入隊列中,若是有接收端在監聽,則會發向接收端,若是沒有接收端接收,則會保存在activemq服務器,直到接收端接收消息,點對點的消息模式能夠有多個發送端,多個接收端,可是一條消息,只會被一個接收端給接收到,哪一個接收端先連上ActiveMQ,則會先接收到,然後來的接收端則接收不到那條消息. 

訂閱模式

    訂閱/發佈模式,一樣能夠有着多個發送端與多個接收端,可是接收端與發送端存在時間上的依賴,就是若是發送端發送消息的時候,接收端並無監聽消息,那麼ActiveMQ將不會保存消息,將會認爲消息已經發送,換一種說法,就是發送端發送消息的時候,接收端不在線,是接收不到消息的,哪怕之後監聽消息,一樣也是接收不到的。這個模式還有一個特色,那就是,發送端發送的消息,將會被全部的接收端給接收到,不相似點對點,一條消息只會被一個接收端給接收到。

jar包添加到工程中

使用5.11.2版本的jar

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
</dependency>

點對點的實現代碼

@Test
    public void testQueueProducer() throws Exception {
        //一、建立一個鏈接工廠對象,須要指定服務的ip及端口。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.176.130:61616");
        //二、使用工廠對象建立一個Connection對象。
        Connection connection = connectionFactory.createConnection();
        //三、開啓鏈接,調用Connection對象的start方法。
        connection.start();
        //四、建立一個Session對象。
        //第一個參數:是否開啓事務。若是true開啓事務,第二個參數無心義。通常不開啓事務false。
        //第二個參數:應答模式。自動應答或者手動應答。通常自動應答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //五、使用Session對象建立一個Destination對象。兩種形式queue、topic,如今應該使用queue
        Queue queue = session.createQueue("test-queue");
        //六、使用Session對象建立一個Producer對象。
        MessageProducer producer = session.createProducer(queue);
        //七、建立一個Message對象,可使用TextMessage。
        /*TextMessage textMessage = new ActiveMQTextMessage();
        textMessage.setText("hello Activemq");*/
        TextMessage textMessage = session.createTextMessage("hello activemq");
        //八、發送消息
        producer.send(textMessage);
        //九、關閉資源
        producer.close();
        session.close();
        connection.close();
    }
    
    @Test
    public void testQueueConsumer() throws Exception {
        //建立一個ConnectionFactory對象鏈接MQ服務器
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.176.130:61616");
        //建立一個鏈接對象
        Connection connection = connectionFactory.createConnection();
        //開啓鏈接
        connection.start();
        //使用Connection對象建立一個Session對象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立一個Destination對象。queue對象
        Queue queue = session.createQueue("test-queue");
        //使用Session對象建立一個消費者對象。
        MessageConsumer consumer = session.createConsumer(queue);
        //接收消息
        consumer.setMessageListener(new MessageListener() {
            
            @Override
            public void onMessage(Message message) {
                //打印結果
                TextMessage textMessage = (TextMessage) message;
                String text;
                try {
                    text = textMessage.getText();
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
                
            }
        });
        //等待接收消息
        System.in.read();
        //關閉資源
        consumer.close();
        session.close();
        connection.close();
    }
View Code

訂閱/發佈模式的實現代碼

    @Test
    public void testTopicProducer() throws Exception {
        //一、建立一個鏈接工廠對象,須要指定服務的ip及端口。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.176.130:61616");
        //二、使用工廠對象建立一個Connection對象。
        Connection connection = connectionFactory.createConnection();
        //三、開啓鏈接,調用Connection對象的start方法。
        connection.start();
        //四、建立一個Session對象。
        //第一個參數:是否開啓事務。若是true開啓事務,第二個參數無心義。通常不開啓事務false。
        //第二個參數:應答模式。自動應答或者手動應答。通常自動應答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //五、使用Session對象建立一個Destination對象。兩種形式queue、topic,如今應該使用topic
        Topic topic = session.createTopic("test-topic");
        //六、使用Session對象建立一個Producer對象。
        MessageProducer producer = session.createProducer(topic);
        //七、建立一個Message對象,可使用TextMessage。
        /*TextMessage textMessage = new ActiveMQTextMessage();
        textMessage.setText("hello Activemq");*/
        TextMessage textMessage = session.createTextMessage("topic message");
        //八、發送消息
        producer.send(textMessage);
        //九、關閉資源
        producer.close();
        session.close();
        connection.close();
    }
    
    @Test
    public void testTopicConsumer() throws Exception {
        //建立一個ConnectionFactory對象鏈接MQ服務器
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.176.130:61616");
        //建立一個鏈接對象
        Connection connection = connectionFactory.createConnection();
        //開啓鏈接
        connection.start();
        //使用Connection對象建立一個Session對象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立一個Destination對象。topic對象
        Topic topic = session.createTopic("test-topic");
        //使用Session對象建立一個消費者對象。
        MessageConsumer consumer = session.createConsumer(topic);
        //接收消息
        consumer.setMessageListener(new MessageListener() {
            
            @Override
            public void onMessage(Message message) {
                //打印結果
                TextMessage textMessage = (TextMessage) message;
                String text;
                try {
                    text = textMessage.getText();
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
                
            }
        });
        System.out.println("topic消費者3啓動。。。。");
        //等待接收消息
        System.in.read();
        //關閉資源
        consumer.close();
        session.close();
        connection.close();
    }
View Code
相關文章
相關標籤/搜索