ActiveMQ消息隊列的使用及應用

 

 

這裏就不說怎麼安裝了,直接解壓出來就好了。html

 

謝絕轉載,做者保留全部權力java

 

 

目錄: 

複製代碼
一:JMQ的兩種消息模式
    1.1:點對點的消息模式
    1.2:訂閱模式
二:點對點的實現代碼
    2.1:點對點的發送端
    2.2:點對點的接收端
三:訂閱/發佈模式的實現代碼
    3.1:訂閱模式的發送端
    3.2:訂閱模式的接收端
四:發送消息的數據類型
    4.1:傳遞javabean對象
    4.2:發送文件
五:ActiveMQ的應用
    5.1:保證消息的成功處理
    5.2:避免消息隊列的併發
        5.2.1:主動接收隊列消息
        5.2.2:使用多個接收端
    5.3:消息有效期的管理
    5.4:過時消息,處理失敗的消息如何處理

六:ActiveMQ的安全配置web

  6.1:管理後臺的密碼設置redis

   6.2:生產消費者的鏈接密碼數據庫

 

複製代碼

 

 

 

一:JMQ的兩種消息模式

消息列隊有兩種消息模式,一種是點對點的消息模式,還有一種就是訂閱的模式.express

 

1.1:點對點的消息模式

 

點對點的模式主要創建在一個隊列上面,當鏈接一個列隊的時候,發送端不須要知道接收端是否正在接收,能夠直接向ActiveMQ發送消息,發送的消息,將會先進入隊列中,若是有接收端在監聽,則會發向接收端,若是沒有接收端接收,則會保存在activemq服務器,直到接收端接收消息,點對點的消息模式能夠有多個發送端,多個接收端,可是一條消息,只會被一個接收端給接收到,哪一個接收端先連上ActiveMQ,則會先接收到,然後來的接收端則接收不到那條消息apache

 

1.2:訂閱模式

 

訂閱/發佈模式,一樣能夠有着多個發送端與多個接收端,可是接收端與發送端存在時間上的依賴,就是若是發送端發送消息的時候,接收端並無監聽消息,那麼ActiveMQ將不會保存消息,將會認爲消息已經發送,換一種說法,就是發送端發送消息的時候,接收端不在線,是接收不到消息的,哪怕之後監聽消息,一樣也是接收不到的。這個模式還有一個特色,那就是,發送端發送的消息,將會被全部的接收端給接收到,不相似點對點,一條消息只會被一個接收端給接收到。緩存

 

 

 

 

二:點對點的實現代碼

這裏使用java來實現一下ActiveMQ的點對點模式。安全

ActiveMQ版本爲 5.13.3服務器

項目使用MAVEN來構建

複製代碼
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>
    </dependencies>
複製代碼

都是當前最新的版本

 

2.1:點對點的發送端

按 Ctrl+C 複製代碼
按 Ctrl+C 複製代碼

 

 

2.2:點對點的接收端

 

按 Ctrl+C 複製代碼
按 Ctrl+C 複製代碼

 

 

 

 

 

三:訂閱/發佈模式的實現代碼

3.1:訂閱模式的發送端

複製代碼
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TOPSend {
    //鏈接帳號
        private String userName = "";
        //鏈接密碼
        private String password = "";
        //鏈接地址
        private String brokerURL = "tcp://192.168.0.130:61616";
        //connection的工廠
        private ConnectionFactory factory;
        //鏈接對象
        private Connection connection;
        //一個操做會話
        private Session session;
        //目的地,其實就是鏈接到哪一個隊列,若是是點對點,那麼它的實現是Queue,若是是訂閱模式,那它的實現是Topic
        private Destination destination;
        //生產者,就是產生數據的對象
        private MessageProducer producer;
        
        public static void main(String[] args) {
            TOPSend send = new TOPSend();
            send.start();
        }
        
        public void start(){
            try {
                //根據用戶名,密碼,url建立一個鏈接工廠
                factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
                //從工廠中獲取一個鏈接
                connection = factory.createConnection();
                //測試過這個步驟不寫也是能夠的,可是網上的各個文檔都寫了
                connection.start();
                //建立一個session
                //第一個參數:是否支持事務,若是爲true,則會忽略第二個參數,被jms服務器設置爲SESSION_TRANSACTED
                //第二個參數爲false時,paramB的值可爲Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。
                //Session.AUTO_ACKNOWLEDGE爲自動確認,客戶端發送和接收消息不須要作額外的工做。哪怕是接收端發生異常,也會被看成正常發送成功。
                //Session.CLIENT_ACKNOWLEDGE爲客戶端確認。客戶端接收到消息後,必須調用javax.jms.Message的acknowledge方法。jms服務器纔會看成發送成功,並刪除消息。
                //DUPS_OK_ACKNOWLEDGE容許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;並且容許重複確認。
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //建立一個到達的目的地,其實想一下就知道了,activemq不可能同時只能跑一個隊列吧,這裏就是鏈接了一個名爲"text-msg"的隊列,這個會話將會到這個隊列,固然,若是這個隊列不存在,將會被建立
                
                
                
                //=======================================================
                //點對點與訂閱模式惟一不一樣的地方,就是這一行代碼,點對點建立的是Queue,而訂閱模式建立的是Topic
                destination = session.createTopic("topic-text");
                //=======================================================
                
                
                
                
                //從session中,獲取一個消息生產者
                producer = session.createProducer(destination);
                //設置生產者的模式,有兩種可選
                //DeliveryMode.PERSISTENT 當activemq關閉的時候,隊列數據將會被保存 保存在緩存中,生產環境中能夠存入數據庫,redis避免丟失
                //DeliveryMode.NON_PERSISTENT 當activemq關閉的時候,隊列裏面的數據將會被清空
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                
                //建立一條消息,固然,消息的類型有不少,如文字,字節,對象等,能夠經過session.create..方法來建立出來
                TextMessage textMsg = session.createTextMessage("哈哈");
                long s = System.currentTimeMillis();
                for(int i = 0 ; i < 100 ; i ++){
                    //發送一條消息
                    producer.send(textMsg);
                }
                long e = System.currentTimeMillis();
                System.out.println("發送消息成功");
                System.out.println(e - s);
                //即使生產者的對象關閉了,程序還在運行哦
                producer.close();
                
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
}
複製代碼

 

 

3.2:訂閱模式的接收端

複製代碼
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TOPSend {
    //鏈接帳號
        private String userName = "";
        //鏈接密碼
        private String password = "";
        //鏈接地址
        private String brokerURL = "tcp://192.168.0.130:61616";
        //connection的工廠
        private ConnectionFactory factory;
        //鏈接對象
        private Connection connection;
        //一個操做會話
        private Session session;
        //目的地,其實就是鏈接到哪一個隊列,若是是點對點,那麼它的實現是Queue,若是是訂閱模式,那它的實現是Topic
        private Destination destination;
        //生產者,就是產生數據的對象
        private MessageProducer producer;
        
        public static void main(String[] args) {
            TOPSend send = new TOPSend();
            send.start();
        }
        
        public void start(){
            try {
                //根據用戶名,密碼,url建立一個鏈接工廠
                factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
                //從工廠中獲取一個鏈接
                connection = factory.createConnection();
                //測試過這個步驟不寫也是能夠的,可是網上的各個文檔都寫了
                connection.start();
                //建立一個session
                //第一個參數:是否支持事務,若是爲true,則會忽略第二個參數,被jms服務器設置爲SESSION_TRANSACTED
                //第二個參數爲false時,paramB的值可爲Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。
                //Session.AUTO_ACKNOWLEDGE爲自動確認,客戶端發送和接收消息不須要作額外的工做。哪怕是接收端發生異常,也會被看成正常發送成功。
                //Session.CLIENT_ACKNOWLEDGE爲客戶端確認。客戶端接收到消息後,必須調用javax.jms.Message的acknowledge方法。jms服務器纔會看成發送成功,並刪除消息。
                //DUPS_OK_ACKNOWLEDGE容許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;並且容許重複確認。
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //建立一個到達的目的地,其實想一下就知道了,activemq不可能同時只能跑一個隊列吧,這裏就是鏈接了一個名爲"text-msg"的隊列,這個會話將會到這個隊列,固然,若是這個隊列不存在,將會被建立
                
                
                
                //=======================================================
                //點對點與訂閱模式惟一不一樣的地方,就是這一行代碼,點對點建立的是Queue,而訂閱模式建立的是Topic
                destination = session.createTopic("topic-text");
                //=======================================================
                
                
                
                
                //從session中,獲取一個消息生產者
                producer = session.createProducer(destination);
                //設置生產者的模式,有兩種可選
                //DeliveryMode.PERSISTENT 當activemq關閉的時候,隊列數據將會被保存
                //DeliveryMode.NON_PERSISTENT 當activemq關閉的時候,隊列裏面的數據將會被清空
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                
                //建立一條消息,固然,消息的類型有不少,如文字,字節,對象等,能夠經過session.create..方法來建立出來
                TextMessage textMsg = session.createTextMessage("哈哈");
                long s = System.currentTimeMillis();
                for(int i = 0 ; i < 100 ; i ++){
                    //發送一條消息
                    textMsg.setText("哈哈" + i);
                    producer.send(textMsg);
                }
                long e = System.currentTimeMillis();
                System.out.println("發送消息成功");
                System.out.println(e - s);
                //即使生產者的對象關閉了,程序還在運行哦
                producer.close();
                
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
}
複製代碼

 

四:發送消息的數據類型

上面的代碼演示,所有都是發送字符串,可是ActiveMQ支持哪些數據呢?

你們能夠看一下  javax.jms.Message 這個接口,只要是這個接口的數據,均可以被髮送。

 

或者這樣看起來有點麻煩,那麼看到上面的代碼,建立消息,是經過session這個對象來建立的,那咱們來看一下這裏有哪些能夠被建立的呢?

複製代碼
            //純字符串的數據
            session.createTextMessage();
            //序列化的對象
            session.createObjectMessage();
            //流,能夠用來傳遞文件等
            session.createStreamMessage();
            //用來傳遞字節
            session.createBytesMessage();
            //這個方法建立出來的就是一個map,能夠把它看成map來用,當你看了它的一些方法,你就懂了
            session.createMapMessage();
            //這個方法,拿到的是javax.jms.Message,是全部message的接口
            session.createMessage();
複製代碼

 

 

4.1:傳遞javabean對象

傳遞一個java對象,多是最多的使用方式了,並且這種數據接收與使用都方便,那麼,下面的代碼就來演示下如何發送一個java對象

固然了,這個對象必須序列化,也就是實現Serializable接口

 

複製代碼
            //經過這個方法,能夠把一個對象發送出去,固然,這個對象須要序列化,由於一切在網絡在傳輸的,都是字節
            ObjectMessage obj = session.createObjectMessage();
            for(int i = 0 ; i < 100 ; i ++){
                Person p = new Person(i,"名字");
                obj.setObject(p);
                producer.send(obj);
            }
複製代碼

 

那麼在接收端要怎麼接收這個對象呢?

 

複製代碼
            //實現一個消息的監聽器
            //實現這個監聽器後,之後只要有消息,就會經過這個監聽器接收到
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        //一樣的,強轉爲ObjectMessage,而後拿到對象,強轉爲Person
                        Person p = (Person) ((ObjectMessage)message).getObject();
                        System.out.println(p);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    
                }
            });
複製代碼

 

 

 

 

4.2:發送文件

 

發送文件,這裏用BytesMessage

            BytesMessage bb = session.createBytesMessage();
            bb.writeBytes(new byte[]{2});

至於這裏的new Byte[]{2},確定不是這樣寫的,從文件裏面拿流出來便可

 

 

接收的話

複製代碼
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    
                    BytesMessage bm = (BytesMessage)message;
                    FileOutputStream out = null;
                    try {
                        out = new FileOutputStream("d:/1.ext");
                    } catch (FileNotFoundException e2) {
                        e2.printStackTrace();
                    }
                    byte[] by = new byte[1024];
                    int len = 0 ;
                    try {
                        while((len = bm.readBytes(by))!= -1){
                            out.write(by,0,len);
                        }
                    } catch (JMSException | IOException e1) {
                        e1.printStackTrace();
                    }
                    
                }
            });
複製代碼

 

 

 

 

 

 

五:ActiveMQ的應用

5.1:保證消息的成功處理

消息發送成功後,接收端接收到了消息。而後進行處理,可是可能因爲某種緣由,高併發也好,IO阻塞也好,反正這條消息在接收端處理失敗了。而點對點的特性是一條消息,只會被一個接收端給接收,只要接收端A接收成功了,接收端B,就不可能接收到這條消息,若是是一些普通的消息還好,可是若是是一些很重要的消息,好比說用戶的支付訂單,用戶的退款,這些與金錢相關的,是必須保證成功的,那麼這個時候要怎麼處理呢?

 

咱們可使用  CLIENT_ACKNOWLEDGE  模式

 

以前其實就有提到當建立一個session的時候,須要指定其事務,及消息的處理模式,當時使用的是

 

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

 

AUTO_ACKNOWLEDGE 

這一個代碼的是,當消息發送給接收端以後,就自動確認成功了,而無論接收端有沒有處理成功,而一旦確認成功後,就會把隊列裏面的消息給清除掉,避免下一個接收端接收到一樣的消息。

那麼,它還有另一個模式,那就是 CLIENT_ACKNOWLEDGE

這行要寫在接收端裏面,不是寫在發送端的

 

session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

 

這行代碼之後,若是接收端不確認消息,那麼activemq將會把這條消息一直保留,直到有一個接收端肯定了消息。

那麼要怎麼確認消息呢?

在接收端接收到消息的時候,調用javax.jms.Message的acknowledge方法

複製代碼
@Override
                public void onMessage(Message message) {
                    try {
                        //獲取到接收的數據
                        String text = ((TextMessage)message).getText();
                        System.out.println(text);
                        //確認接收,併成功處理了消息
                        message.acknowledge();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
複製代碼

這樣,當消息處理成功以後,確認消息,若是不肯定,activemq將會發給下一個接收端處理

 注意:只在點對點中有效,訂閱模式,即便不確認,也不會保存消息

 

 

5.2:避免消息隊列的併發

JMQ設計出來的緣由,就是用來避免併發的,和溝通兩個系統之間的交互。

 

5.2.1:主動接收隊列消息

 

先看一下以前的代碼:

複製代碼
            //實現一個消息的監聽器
            //實現這個監聽器後,之後只要有消息,就會經過這個監聽器接收到
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        //獲取到接收的數據
                        String text = ((TextMessage)message).getText();
                        System.out.println(text);
                        //確認接收,併成功處理了消息
                        message.acknowledge();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
複製代碼

以前的代碼裏面,實現了一個監聽器,監聽消息的傳遞,這樣只要每有一個消息,都會即時的傳遞到程序中。

可是,這樣的處理,在高併發的時候,由於它是被動接收,並無考慮到程序的處理能力,可能會壓跨系統,那要怎麼辦呢?

 

答案就是把被動變爲主動,當程序有着處理消息的能力時,主動去接收一條消息進行處理

 

實現的代碼以下:

複製代碼
      if(當程序有能力處理){//當程序有能力處理時接收
                    Message receive = consumer.receive();
           //這個能夠設置超時時間,超過則不等待消息 
            recieve.receive(10000); //其實receive是一個阻塞式方法,必定會拿到值的 if(null != receive){ String text = ((TextMessage)receive).getText(); receive.acknowledge(); System.out.println(text); }else{ //沒有值嘛 // } }
複製代碼

 

經過上面的代碼,就可讓程序自已判斷,本身是否有能力接收這條消息,若是不能接收,那就給別的接收端接收,或者等本身有能力處理的時候接收

 

 

5.2.2:使用多個接收端

ActiveMQ是支持多個接收端的,若是當程序沒法處理這麼多數據的時候,能夠考慮多個線程,或者增長服務器來處理。

 

 

 

5.3:消息有效期的管理

這樣的場景也是有的,一條消息的有效時間,當發送一條消息的時候,可能但願這條消息在指定的時間被處理,若是超過了指定的時間,那麼這條消息就失效了,就不須要進行處理了,那麼咱們可使用ActiveMQ的設置有效期來實現

 

代碼以下:

複製代碼
            TextMessage msg = session.createTextMessage("哈哈");
            for(int i = 0 ; i < 100 ; i ++){
                //設置該消息的超時時間
                producer.setTimeToLive(i * 1000);
                producer.send(msg);
            }
複製代碼

 

這裏每一條消息的有效期都是不一樣的,打開ip:8161/admin/就能夠查看到,裏面的消息愈來愈少了。

 

過時的消息是不會被接收到的。

 

過時的消息會從隊列中清除,並存儲到ActiveMQ.DLQ這個隊列裏面,這個稍後會解釋。

 

 

5.4:過時消息,處理失敗的消息如何處理

 過時的、處理失敗的消息,將會被ActiveMQ置入「ActiveMQ.DLQ」這個隊列中。

這個隊列是ActiveMQ自動建立的。

若是須要查看這些未被處理的消息,能夠進入這個隊列中查看

//指定一個目的地,也就是一個隊列的位置
destination = session.createQueue("ActiveMQ.DLQ");

這樣就能夠進入隊列中,而後實現接口,或者經過receive()方法,就能夠拿到未被處理的消息,從而保證正確的處理

 

 

六:ActiveMQ的安全配置

6.1:管理後臺的密碼設置

咱們都知道,打開ip:8161/admin/ 就是activemq的管理控制檯,它的默認帳號和密碼都是admin,在生產環境確定須要更改密碼的,這要怎麼作呢?

在activemq/conf/jetty.xml中找到

複製代碼
   <pre name="code" class="html"> <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
        <property name="name" value="BASIC" />
        <property name="roles" value="admin" />
         <!-- 把這個改成true,固然,高版本的已經改成了true -->
        <property name="authenticate" value="true" />
  </bean>
複製代碼

高版本的已經默認成爲了true。因此咱們直接進行下一步便可

在activemq/conf/jetty-realm.properties文件中配置,打開以下

複製代碼
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements.  See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License.  You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------

# Defines users that can access the web (console, demo, etc.)
# username: password [,rolename ...]
#用戶名,密碼,角色
admin: admin, admin
user: user, user
複製代碼

注意:你們重點看倒數第二行,那裏三個分別是用戶名,密碼,角色,其中admin角色是固定的

 

 

6.2:生產消費者的鏈接密碼

注意:activemq默認是不須要密碼,生產消費者就能夠鏈接的

咱們須要通過配置,才能設置密碼,這一步在生產環境中必定要配置

找到activemq/conf/activemq.xml,並打開
在<broker>節點中,在<systemUsage>節點上面,增長以下的一個插件

複製代碼
<plugins>
             <simpleAuthenticationPlugin>
                 <users>
                     <authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>
                 </users>
             </simpleAuthenticationPlugin>
         </plugins>
複製代碼

這樣就開啓了密碼認證
而後帳號密碼的配置在activemq/conf/credentials.properties文件中

打開這個文件以下

複製代碼
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements.  See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License.  You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------

# Defines credentials that will be used by components (like web console) to access the broker

#帳號
activemq.username=admin
#密碼
activemq.password=123456
guest.password=password
複製代碼

 

 

這樣就配置完畢了。

相關文章
相關標籤/搜索