這裏就不說怎麼安裝了,直接解壓出來就好了。html
謝絕轉載,做者保留全部權力java
一:JMQ的兩種消息模式 1.1:點對點的消息模式 1.2:訂閱模式 二:點對點的實現代碼 2.1:點對點的發送端 2.2:點對點的接收端 三:訂閱/發佈模式的實現代碼 3.1:訂閱模式的發送端 3.2:訂閱模式的接收端 四:發送消息的數據類型 4.1:傳遞javabean對象 4.2:發送文件 五:ActiveMQ的應用 5.1:保證消息的成功處理 5.2:避免消息隊列的併發 5.2.1:主動接收隊列消息 5.2.2:使用多個接收端 5.3:消息有效期的管理 5.4:過時消息,處理失敗的消息如何處理
六:ActiveMQ的安全配置web
6.1:管理後臺的密碼設置redis
6.2:生產消費者的鏈接密碼數據庫
消息列隊有兩種消息模式,一種是點對點的消息模式,還有一種就是訂閱的模式.express
點對點的模式主要創建在一個隊列上面,當鏈接一個列隊的時候,發送端不須要知道接收端是否正在接收,能夠直接向ActiveMQ發送消息,發送的消息,將會先進入隊列中,若是有接收端在監聽,則會發向接收端,若是沒有接收端接收,則會保存在activemq服務器,直到接收端接收消息,點對點的消息模式能夠有多個發送端,多個接收端,可是一條消息,只會被一個接收端給接收到,哪一個接收端先連上ActiveMQ,則會先接收到,然後來的接收端則接收不到那條消息apache
訂閱/發佈模式,一樣能夠有着多個發送端與多個接收端,可是接收端與發送端存在時間上的依賴,就是若是發送端發送消息的時候,接收端並無監聽消息,那麼ActiveMQ將不會保存消息,將會認爲消息已經發送,換一種說法,就是發送端發送消息的時候,接收端不在線,是接收不到消息的,哪怕之後監聽消息,一樣也是接收不到的。這個模式還有一個特色,那就是,發送端發送的消息,將會被全部的接收端給接收到,不相似點對點,一條消息只會被一個接收端給接收到。緩存
這裏使用java來實現一下ActiveMQ的點對點模式。安全
ActiveMQ版本爲 5.13.3服務器
項目使用MAVEN來構建
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency> </dependencies>
都是當前最新的版本
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class TOPSend { //鏈接帳號 private String userName = ""; //鏈接密碼 private String password = ""; //鏈接地址 private String brokerURL = "tcp://192.168.0.130:61616"; //connection的工廠 private ConnectionFactory factory; //鏈接對象 private Connection connection; //一個操做會話 private Session session; //目的地,其實就是鏈接到哪一個隊列,若是是點對點,那麼它的實現是Queue,若是是訂閱模式,那它的實現是Topic private Destination destination; //生產者,就是產生數據的對象 private MessageProducer producer; public static void main(String[] args) { TOPSend send = new TOPSend(); send.start(); } public void start(){ try { //根據用戶名,密碼,url建立一個鏈接工廠 factory = new ActiveMQConnectionFactory(userName, password, brokerURL); //從工廠中獲取一個鏈接 connection = factory.createConnection(); //測試過這個步驟不寫也是能夠的,可是網上的各個文檔都寫了 connection.start(); //建立一個session //第一個參數:是否支持事務,若是爲true,則會忽略第二個參數,被jms服務器設置爲SESSION_TRANSACTED //第二個參數爲false時,paramB的值可爲Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。 //Session.AUTO_ACKNOWLEDGE爲自動確認,客戶端發送和接收消息不須要作額外的工做。哪怕是接收端發生異常,也會被看成正常發送成功。 //Session.CLIENT_ACKNOWLEDGE爲客戶端確認。客戶端接收到消息後,必須調用javax.jms.Message的acknowledge方法。jms服務器纔會看成發送成功,並刪除消息。 //DUPS_OK_ACKNOWLEDGE容許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;並且容許重複確認。 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一個到達的目的地,其實想一下就知道了,activemq不可能同時只能跑一個隊列吧,這裏就是鏈接了一個名爲"text-msg"的隊列,這個會話將會到這個隊列,固然,若是這個隊列不存在,將會被建立 //======================================================= //點對點與訂閱模式惟一不一樣的地方,就是這一行代碼,點對點建立的是Queue,而訂閱模式建立的是Topic destination = session.createTopic("topic-text"); //======================================================= //從session中,獲取一個消息生產者 producer = session.createProducer(destination); //設置生產者的模式,有兩種可選 //DeliveryMode.PERSISTENT 當activemq關閉的時候,隊列數據將會被保存 保存在緩存中,生產環境中能夠存入數據庫,redis避免丟失 //DeliveryMode.NON_PERSISTENT 當activemq關閉的時候,隊列裏面的數據將會被清空 producer.setDeliveryMode(DeliveryMode.PERSISTENT); //建立一條消息,固然,消息的類型有不少,如文字,字節,對象等,能夠經過session.create..方法來建立出來 TextMessage textMsg = session.createTextMessage("哈哈"); long s = System.currentTimeMillis(); for(int i = 0 ; i < 100 ; i ++){ //發送一條消息 producer.send(textMsg); } long e = System.currentTimeMillis(); System.out.println("發送消息成功"); System.out.println(e - s); //即使生產者的對象關閉了,程序還在運行哦 producer.close(); } catch (JMSException e) { e.printStackTrace(); } } }
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class TOPSend { //鏈接帳號 private String userName = ""; //鏈接密碼 private String password = ""; //鏈接地址 private String brokerURL = "tcp://192.168.0.130:61616"; //connection的工廠 private ConnectionFactory factory; //鏈接對象 private Connection connection; //一個操做會話 private Session session; //目的地,其實就是鏈接到哪一個隊列,若是是點對點,那麼它的實現是Queue,若是是訂閱模式,那它的實現是Topic private Destination destination; //生產者,就是產生數據的對象 private MessageProducer producer; public static void main(String[] args) { TOPSend send = new TOPSend(); send.start(); } public void start(){ try { //根據用戶名,密碼,url建立一個鏈接工廠 factory = new ActiveMQConnectionFactory(userName, password, brokerURL); //從工廠中獲取一個鏈接 connection = factory.createConnection(); //測試過這個步驟不寫也是能夠的,可是網上的各個文檔都寫了 connection.start(); //建立一個session //第一個參數:是否支持事務,若是爲true,則會忽略第二個參數,被jms服務器設置爲SESSION_TRANSACTED //第二個參數爲false時,paramB的值可爲Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。 //Session.AUTO_ACKNOWLEDGE爲自動確認,客戶端發送和接收消息不須要作額外的工做。哪怕是接收端發生異常,也會被看成正常發送成功。 //Session.CLIENT_ACKNOWLEDGE爲客戶端確認。客戶端接收到消息後,必須調用javax.jms.Message的acknowledge方法。jms服務器纔會看成發送成功,並刪除消息。 //DUPS_OK_ACKNOWLEDGE容許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;並且容許重複確認。 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一個到達的目的地,其實想一下就知道了,activemq不可能同時只能跑一個隊列吧,這裏就是鏈接了一個名爲"text-msg"的隊列,這個會話將會到這個隊列,固然,若是這個隊列不存在,將會被建立 //======================================================= //點對點與訂閱模式惟一不一樣的地方,就是這一行代碼,點對點建立的是Queue,而訂閱模式建立的是Topic destination = session.createTopic("topic-text"); //======================================================= //從session中,獲取一個消息生產者 producer = session.createProducer(destination); //設置生產者的模式,有兩種可選 //DeliveryMode.PERSISTENT 當activemq關閉的時候,隊列數據將會被保存 //DeliveryMode.NON_PERSISTENT 當activemq關閉的時候,隊列裏面的數據將會被清空 producer.setDeliveryMode(DeliveryMode.PERSISTENT); //建立一條消息,固然,消息的類型有不少,如文字,字節,對象等,能夠經過session.create..方法來建立出來 TextMessage textMsg = session.createTextMessage("哈哈"); long s = System.currentTimeMillis(); for(int i = 0 ; i < 100 ; i ++){ //發送一條消息 textMsg.setText("哈哈" + i); producer.send(textMsg); } long e = System.currentTimeMillis(); System.out.println("發送消息成功"); System.out.println(e - s); //即使生產者的對象關閉了,程序還在運行哦 producer.close(); } catch (JMSException e) { e.printStackTrace(); } } }
上面的代碼演示,所有都是發送字符串,可是ActiveMQ支持哪些數據呢?
你們能夠看一下 javax.jms.Message 這個接口,只要是這個接口的數據,均可以被髮送。
或者這樣看起來有點麻煩,那麼看到上面的代碼,建立消息,是經過session這個對象來建立的,那咱們來看一下這裏有哪些能夠被建立的呢?
//純字符串的數據 session.createTextMessage(); //序列化的對象 session.createObjectMessage(); //流,能夠用來傳遞文件等 session.createStreamMessage(); //用來傳遞字節 session.createBytesMessage(); //這個方法建立出來的就是一個map,能夠把它看成map來用,當你看了它的一些方法,你就懂了 session.createMapMessage(); //這個方法,拿到的是javax.jms.Message,是全部message的接口 session.createMessage();
傳遞一個java對象,多是最多的使用方式了,並且這種數據接收與使用都方便,那麼,下面的代碼就來演示下如何發送一個java對象
固然了,這個對象必須序列化,也就是實現Serializable接口
//經過這個方法,能夠把一個對象發送出去,固然,這個對象須要序列化,由於一切在網絡在傳輸的,都是字節 ObjectMessage obj = session.createObjectMessage(); for(int i = 0 ; i < 100 ; i ++){ Person p = new Person(i,"名字"); obj.setObject(p); producer.send(obj); }
那麼在接收端要怎麼接收這個對象呢?
//實現一個消息的監聽器 //實現這個監聽器後,之後只要有消息,就會經過這個監聽器接收到 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { //一樣的,強轉爲ObjectMessage,而後拿到對象,強轉爲Person Person p = (Person) ((ObjectMessage)message).getObject(); System.out.println(p); } catch (JMSException e) { e.printStackTrace(); } } });
發送文件,這裏用BytesMessage
BytesMessage bb = session.createBytesMessage(); bb.writeBytes(new byte[]{2});
至於這裏的new Byte[]{2},確定不是這樣寫的,從文件裏面拿流出來便可
接收的話
consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { BytesMessage bm = (BytesMessage)message; FileOutputStream out = null; try { out = new FileOutputStream("d:/1.ext"); } catch (FileNotFoundException e2) { e2.printStackTrace(); } byte[] by = new byte[1024]; int len = 0 ; try { while((len = bm.readBytes(by))!= -1){ out.write(by,0,len); } } catch (JMSException | IOException e1) { e1.printStackTrace(); } } });
消息發送成功後,接收端接收到了消息。而後進行處理,可是可能因爲某種緣由,高併發也好,IO阻塞也好,反正這條消息在接收端處理失敗了。而點對點的特性是一條消息,只會被一個接收端給接收,只要接收端A接收成功了,接收端B,就不可能接收到這條消息,若是是一些普通的消息還好,可是若是是一些很重要的消息,好比說用戶的支付訂單,用戶的退款,這些與金錢相關的,是必須保證成功的,那麼這個時候要怎麼處理呢?
咱們可使用 CLIENT_ACKNOWLEDGE 模式
以前其實就有提到當建立一個session的時候,須要指定其事務,及消息的處理模式,當時使用的是
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
AUTO_ACKNOWLEDGE
這一個代碼的是,當消息發送給接收端以後,就自動確認成功了,而無論接收端有沒有處理成功,而一旦確認成功後,就會把隊列裏面的消息給清除掉,避免下一個接收端接收到一樣的消息。
那麼,它還有另一個模式,那就是 CLIENT_ACKNOWLEDGE
這行要寫在接收端裏面,不是寫在發送端的
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
這行代碼之後,若是接收端不確認消息,那麼activemq將會把這條消息一直保留,直到有一個接收端肯定了消息。
那麼要怎麼確認消息呢?
在接收端接收到消息的時候,調用javax.jms.Message的acknowledge方法
@Override public void onMessage(Message message) { try { //獲取到接收的數據 String text = ((TextMessage)message).getText(); System.out.println(text); //確認接收,併成功處理了消息 message.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } }
這樣,當消息處理成功以後,確認消息,若是不肯定,activemq將會發給下一個接收端處理
注意:只在點對點中有效,訂閱模式,即便不確認,也不會保存消息
JMQ設計出來的緣由,就是用來避免併發的,和溝通兩個系統之間的交互。
先看一下以前的代碼:
//實現一個消息的監聽器 //實現這個監聽器後,之後只要有消息,就會經過這個監聽器接收到 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { //獲取到接收的數據 String text = ((TextMessage)message).getText(); System.out.println(text); //確認接收,併成功處理了消息 message.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } } });
以前的代碼裏面,實現了一個監聽器,監聽消息的傳遞,這樣只要每有一個消息,都會即時的傳遞到程序中。
可是,這樣的處理,在高併發的時候,由於它是被動接收,並無考慮到程序的處理能力,可能會壓跨系統,那要怎麼辦呢?
答案就是把被動變爲主動,當程序有着處理消息的能力時,主動去接收一條消息進行處理
實現的代碼以下:
if(當程序有能力處理){//當程序有能力處理時接收 Message receive = consumer.receive();
//這個能夠設置超時時間,超過則不等待消息
recieve.receive(10000); //其實receive是一個阻塞式方法,必定會拿到值的 if(null != receive){ String text = ((TextMessage)receive).getText(); receive.acknowledge(); System.out.println(text); }else{ //沒有值嘛 // } }
經過上面的代碼,就可讓程序自已判斷,本身是否有能力接收這條消息,若是不能接收,那就給別的接收端接收,或者等本身有能力處理的時候接收
ActiveMQ是支持多個接收端的,若是當程序沒法處理這麼多數據的時候,能夠考慮多個線程,或者增長服務器來處理。
這樣的場景也是有的,一條消息的有效時間,當發送一條消息的時候,可能但願這條消息在指定的時間被處理,若是超過了指定的時間,那麼這條消息就失效了,就不須要進行處理了,那麼咱們可使用ActiveMQ的設置有效期來實現
代碼以下:
TextMessage msg = session.createTextMessage("哈哈"); for(int i = 0 ; i < 100 ; i ++){ //設置該消息的超時時間 producer.setTimeToLive(i * 1000); producer.send(msg); }
這裏每一條消息的有效期都是不一樣的,打開ip:8161/admin/就能夠查看到,裏面的消息愈來愈少了。
過時的消息是不會被接收到的。
過時的消息會從隊列中清除,並存儲到ActiveMQ.DLQ這個隊列裏面,這個稍後會解釋。
過時的、處理失敗的消息,將會被ActiveMQ置入「ActiveMQ.DLQ」這個隊列中。
這個隊列是ActiveMQ自動建立的。
若是須要查看這些未被處理的消息,能夠進入這個隊列中查看
//指定一個目的地,也就是一個隊列的位置 destination = session.createQueue("ActiveMQ.DLQ");
這樣就能夠進入隊列中,而後實現接口,或者經過receive()方法,就能夠拿到未被處理的消息,從而保證正確的處理
咱們都知道,打開ip:8161/admin/ 就是activemq的管理控制檯,它的默認帳號和密碼都是admin,在生產環境確定須要更改密碼的,這要怎麼作呢?
在activemq/conf/jetty.xml中找到
<pre name="code" class="html"> <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint"> <property name="name" value="BASIC" /> <property name="roles" value="admin" /> <!-- 把這個改成true,固然,高版本的已經改成了true --> <property name="authenticate" value="true" /> </bean>
高版本的已經默認成爲了true。因此咱們直接進行下一步便可
在activemq/conf/jetty-realm.properties文件中配置,打開以下
## --------------------------------------------------------------------------- ## Licensed to the Apache Software Foundation (ASF) under one or more ## contributor license agreements. See the NOTICE file distributed with ## this work for additional information regarding copyright ownership. ## The ASF licenses this file to You under the Apache License, Version 2.0 ## (the "License"); you may not use this file except in compliance with ## the License. You may obtain a copy of the License at ## ## http://www.apache.org/licenses/LICENSE-2.0 ## ## Unless required by applicable law or agreed to in writing, software ## distributed under the License is distributed on an "AS IS" BASIS, ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ## See the License for the specific language governing permissions and ## limitations under the License. ## --------------------------------------------------------------------------- # Defines users that can access the web (console, demo, etc.) # username: password [,rolename ...] #用戶名,密碼,角色 admin: admin, admin user: user, user
注意:你們重點看倒數第二行,那裏三個分別是用戶名,密碼,角色,其中admin角色是固定的
注意:activemq默認是不須要密碼,生產消費者就能夠鏈接的
咱們須要通過配置,才能設置密碼,這一步在生產環境中必定要配置
找到activemq/conf/activemq.xml,並打開
在<broker>節點中,在<systemUsage>節點上面,增長以下的一個插件
<plugins> <simpleAuthenticationPlugin> <users> <authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/> </users> </simpleAuthenticationPlugin> </plugins>
這樣就開啓了密碼認證
而後帳號密碼的配置在activemq/conf/credentials.properties文件中
打開這個文件以下
## --------------------------------------------------------------------------- ## Licensed to the Apache Software Foundation (ASF) under one or more ## contributor license agreements. See the NOTICE file distributed with ## this work for additional information regarding copyright ownership. ## The ASF licenses this file to You under the Apache License, Version 2.0 ## (the "License"); you may not use this file except in compliance with ## the License. You may obtain a copy of the License at ## ## http://www.apache.org/licenses/LICENSE-2.0 ## ## Unless required by applicable law or agreed to in writing, software ## distributed under the License is distributed on an "AS IS" BASIS, ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ## See the License for the specific language governing permissions and ## limitations under the License. ## --------------------------------------------------------------------------- # Defines credentials that will be used by components (like web console) to access the broker #帳號 activemq.username=admin #密碼 activemq.password=123456 guest.password=password
這樣就配置完畢了。