#情景引入
小白:起牀起牀起牀起牀。。。。快起牀~
我:怎麼了又,大驚小怪,嚇到我了。
小白:我有事有事想找你,十萬火急呢~~
我:你能有什麼事?反正我不信。。那你說說看~~
小白:就是我有兩個小表弟,叫大白和二白,他們如今天天睡覺以前都要分別和我聊天,讓我給他們講故事,若是不講他們就不睡覺。可是,若是一個個的跟他們輪流來講的話,我就須要天天說兩遍,並且我還要找準他們的時間點,這個有時候我有事情都沒法實現這個問題,他們就會很生氣。。。
我:這不是挺好的嘛,小孩子就是愛聽故事的呀。。。
小白:我也願意講,可是時間這個不是很好控制,有沒有相似,好比我能夠以前就描述好了,而後定點給他們兩個一塊兒發消息,而能夠拋開時間和其餘因素的影響呢?
我:這個嘛,很簡單呀,你可讓他們關注你的一個公衆號,這樣你再定時的推送給他們故事不就能夠了嘛。。或者,你能夠拉他們進你的一個羣這樣,就方便了呀~
小白:這樣是能夠,可是若是之後還有小表妹要聽我講,我就要如此反覆的作。。感謝好麻煩好麻煩。。。
我:emmm,我理解你的意思,你就想實現一種不少人都可以進行相似一種消息推送的方式嘛。。。
小白:對的對的。。就是這樣一種,,,我記得咱們在技術方面好像也有一種相似的技術,這個叫作什麼去了呢?
我:這就是消息中間件,一種生產者和消費者的關係。
小白:我也想學我也想學,,你快給我講講,給我講講唄。。
我:真拿你沒辦法,好吧。。。下面我就給你講一下這方面的知識。
#情景分析
其實,小白的這個問題,是一種比較廣泛的問題。既然咱們做爲技術人員,固然咱們就要從技術成分去分析如何解決了。這裏面其實就是包含着一種消息中間件的技術。它也是最近技術層面用得很是很是多的,這也是很是值得咱們進行學習。。這在現在的秒殺系統,推薦系統等等,都有普遍的應用。。因此,這章我就主要來跟你們說說這方面的知識。
#基本概念的引導
本模塊主要講解關於消息中間件的相關基礎知識,也是方便咱們後面的學習。
###什麼是中間件?
非操做系統軟件,非業務應用軟件,不是直接給最終用戶使用,不能直接給用戶帶來價值的軟件,咱們就能夠稱爲中間件(好比Dubbo,Tomcat,Jetty,Jboss都是屬於的)。
###什麼是消息中間件?
百度百科解釋:消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息排隊模型,它能夠在分佈式環境下擴展進程間的通訊。
關鍵點:關注於數據的發送和接受,利用高效可靠的異步消息機制傳遞機制集成分佈式系統。
先簡單的用下面這個圖說明:
###爲何要使用消息中間件
舉幾個例子,我想你就會明白了。(其實使用消息中間件主要就是爲了解耦合和異步兩個做用)
1:微博,都用過吧。那麼,當咱們新關注一個用戶,那麼系統會相應的推送消息給咱們,而且還作了不少關於咱們關注的處理。這就是消息中間件的異步。
2:秒殺系統。100件商品,幾十萬我的在搶,那這個怎麼弄呢?總不能就把服務器給宕機了吧。那麼就能夠把用戶的請求進行緩存,而後再異步處理。
3:系統A給系統B進行通訊,而系統B須要對A的消息進行相應處理以後才能給A反饋,這時候,總不能讓A就傻傻等着吧。那麼,這就是異步的功能。
###什麼是JMS?
Java消息服務(Java Message Service)應用程序接口是一個Java平臺中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。
總結起來講就是:Java對於應用程序之間進行信息交互的API(並且是異步)。
裏面有下面的概念須要理解,對後續有幫助:html
提供者:實現JMS的消息服務中間件服務器。java
客戶端:發送或接受消息的應用。web
生產者/發佈者:建立併發送消息的客戶端。spring
消費者/訂閱者:接受並處理消息的客戶端。sql
消息:應用程序之間傳遞的數據。apache
消息模式:在客戶端之間傳遞消息的模式,JMS主要是隊列模式和主體模式。windows
隊列模式特色:
(1)客戶端包括生產者和消費者。
(2)隊列中的一個消息只能被一個消費者使用。
(3)消費者能夠隨時取消息。瀏覽器
主體模式特色:
(1)客戶端包括髮布者和訂閱者。
(2)主題中的消息能夠被全部訂閱者消費。
(3)消費者不能消費訂閱以前發送的消息。
###什麼是AMQP?
AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不一樣產品,不一樣的開發語言等條件的限制。
簡單點說:就是對於消息中間件所接受的消息傳輸層的協議(不懂傳輸層,那麼就須要多看看計算機網絡相關知識了,OSI的層次劃分),只有這樣才能保證客戶端和消息中間件可以進行交互(換位思考:HTTP和HTTPS甚至說是TCP/IP與UDP協議都要的道理)。
emmm,比較一下JMS和AMQP的不一樣吧。。緩存
JMS是定義與Java,而AMQP是一種傳輸層協議。tomcat
JMS是屬於Java的API,而AMQP是跨語言的。
JMS消息類型只有兩種(主題和隊列,後續會說),而AMQP是有五種。
JMS主要就是針對Java的開發的Client,而AMQP是面向消息,隊列,路由。
###什麼是ActiveMQ呢?
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。
簡單點說:不就是爲了實現我上述所想要的需求嘛。而後它就是一種實現的方式。就好比,Tomcat是什麼?不就是爲了實現一種client與服務器之間的交互的一種產品嘛。。因此,不須要死記概念,本身理解就好。
#ActiveMQ的安裝
##環境:Windows
步驟:
(1)登陸到ActiveMQ的官網,下載安裝包。http://activemq.apache.org/activemq-5154-release.html
(2)下載Zip文件
(3)解壓Zip文件,目錄以下
(4)啓動ActiveMQ服務(注意:要右鍵以管理員身份進行運行)
注意:有兩種方式,第一種就是相似tomcat啓動,那麼啓動圖會一直顯示。
而第二種的話,就是把這個ActiveMQ註冊到服務列表中,這樣更方便咱們進行操做。(推薦使用這種)
(5)登陸,驗證是否啓動成功
(6)進入管理頁面
OK,進入以後就能夠看咱們的管理頁面啦。。。是否是很簡單呢?
##環境:Linux
步驟:(多餘的我就很少說了。。。請看windows的步驟)
(1)一樣須要下載對應的文件。後綴爲tar.gz的這樣的。其實能夠直接經過下面的這個命令下載,快速一點,省得要移動到Linux(注意:若是是經過ssh鏈接的方式的話)。
wget https://mirrors.tuna.tsinghua.edu.cn/apache//activemq/5.15.4/apache-activemq-5.15.4-bin.tar.gz
(2)而後解壓下載的文件
(3)一樣進入相對應的目錄,運行
./activemq start
(4)而後再訪問相同的地址就能夠看到啦。(具體看windows安裝步驟)
#ActiveMQ的使用(基於Maven)
首先要再回頭看看JMS中的一些關鍵接口。
<!--添加activemq的依賴--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency>
###情形一:隊列模型的消息
3. 編寫生產者代碼(使用隊列模型的消息)
package com.hnu.scw.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ Author :scw * @ Date :Created in 上午 11:06 2018/7/14 0014 * @ Description:用於消息的建立類 * @ Modified By: * @Version: $version$ */ public class MessageProducer { //定義ActivMQ的鏈接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定義發送消息的隊列名稱 private static final String QUEUE_NAME = "MyMessage"; public static void main(String[] args) throws JMSException { //建立鏈接工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //建立鏈接 Connection connection = activeMQConnectionFactory.createConnection(); //打開鏈接 connection.start(); //建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立隊列目標 Destination destination = session.createQueue(QUEUE_NAME); //建立一個生產者 javax.jms.MessageProducer producer = session.createProducer(destination); //建立模擬100個消息 for (int i = 1 ; i <= 100 ; i++){ TextMessage message = session.createTextMessage("我發送message:" + i); //發送消息 producer.send(message); //在本地打印消息 System.out.println("我如今發的消息是:" + message.getText()); } //關閉鏈接 connection.close(); } }
package com.hnu.scw.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ Author :scw * @ Date :Created in 上午 11:30 2018/7/14 0014 * @ Description:消息消費者 * @ Modified By: * @Version: $version$ */ public class MessageConsumer { //定義ActivMQ的鏈接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定義發送消息的隊列名稱 private static final String QUEUE_NAME = "MyMessage"; public static void main(String[] args) throws JMSException { //建立鏈接工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //建立鏈接 Connection connection = activeMQConnectionFactory.createConnection(); //打開鏈接 connection.start(); //建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立隊列目標 Destination destination = session.createQueue(QUEUE_NAME); //建立消費者 javax.jms.MessageConsumer consumer = session.createConsumer(destination); //建立消費的監聽 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("獲取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }
查看是否進行了消費
**備註:**我上面進行的是隊列模式的消息,並且進行的都是單個消費者,那若是我換成同時有兩個消費者消費生產者的消息會怎麼樣呢?(咱們只須要運行兩個消費者就能夠啦。固然,要保證生產者是產生了消息的哦~~~~不然,拿什麼消費呢~)
一個生產者,兩個消費者的狀況以下:
切記:先運行兩個消費者,而後再運行生產者代碼:
結果以下:
其實,這就是解釋了,我以前說的,隊列模式的消息,是隻會被一個消費者所使用的,而不會被共享,這也就是和主題模型的差異哦~~~哈哈
###情形二:主題模型的消息
前面的步驟都同樣,只是生產者和消費者的代碼有點區別:
編寫生產者(這個和隊列模型其實很像,稍微修改就能夠)
package com.hnu.scw.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ Author :scw * @ Date :Created in 上午 11:48 2018/7/14 0014 * @ Description:${description} * @ Modified By: * @Version: $version$ */ public class MessageTopicProducer { //定義ActivMQ的鏈接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定義發送消息的主題名稱 private static final String TOPIC_NAME = "MyTopicMessage"; public static void main(String[] args) throws JMSException { //建立鏈接工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //建立鏈接 Connection connection = activeMQConnectionFactory.createConnection(); //打開鏈接 connection.start(); //建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立隊列目標 Destination destination = session.createTopic(TOPIC_NAME); //建立一個生產者 javax.jms.MessageProducer producer = session.createProducer(destination); //建立模擬100個消息 for (int i = 1; i <= 100; i++) { TextMessage message = session.createTextMessage("當前message是(主題模型):" + i); //發送消息 producer.send(message); //在本地打印消息 System.out.println("我如今發的消息是:" + message.getText()); } //關閉鏈接 connection.close(); } }
package com.hnu.scw.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ Author :scw * @ Date :Created in 上午 11:50 2018/7/14 0014 * @ Description:${description} * @ Modified By: * @Version: $version$ */ public class MessageTopicConsumer { //定義ActivMQ的鏈接地址 private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; //定義發送消息的隊列名稱 private static final String TOPIC_NAME = "MyTopicMessage"; public static void main(String[] args) throws JMSException { //建立鏈接工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //建立鏈接 Connection connection = activeMQConnectionFactory.createConnection(); //打開鏈接 connection.start(); //建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立隊列目標 Destination destination = session.createTopic(TOPIC_NAME); //建立消費者 javax.jms.MessageConsumer consumer = session.createConsumer(destination); //建立消費的監聽 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("獲取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hnu.scw</groupId> <artifactId>activemq</artifactId> <version>1.0-SNAPSHOT</version> <name>activemq</name> <!-- FIXME change it to the project's website --> <url>http://www.example.com</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.target>1.7</maven.compiler.target> <spring.version>4.2.5.RELEASE</spring.version> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <!--添加activemq的依賴--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency> <!--spring整合activemq所須要的依賴--> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> <exclusions> <exclusion> <artifactId>spring-context</artifactId> <groupId>org.springframework</groupId> </exclusion> </exclusions> </dependency> </dependencies> <build> <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) --> <plugins> <plugin> <artifactId>maven-clean-plugin</artifactId> <version>3.0.0</version> </plugin> <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging --> <plugin> <artifactId>maven-resources-plugin</artifactId> <version>3.0.2</version> </plugin> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.7.0</version> </plugin> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.20.1</version> </plugin> <plugin> <artifactId>maven-jar-plugin</artifactId> <version>3.0.2</version> </plugin> <plugin> <artifactId>maven-install-plugin</artifactId> <version>2.5.2</version> </plugin> <plugin> <artifactId>maven-deploy-plugin</artifactId> <version>2.8.2</version> </plugin> </plugins> </pluginManagement> </build> </project>
<?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd "> <context:annotation-config /> <!--Activemq的鏈接工廠--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://127.0.0.1:61616" /> </bean> <!--spring jms爲咱們提供的鏈接池 獲取一個鏈接工廠--> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!-- 消息目的地 點對點的模式--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="SpringActiveMQMsg"/> </bean> <!-- jms模板 用於進行消息發送--> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> </bean> </beans>
package com.hnu.scw.spring; /** * @ Author :scw * @ Date :Created in 下午 12:19 2018/7/14 0014 * @ Description:生產者的接口 * @ Modified By: * @Version: $version$ */ public interface ProduceService { void sendMessage(String msg); }
package com.hnu.scw.spring; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import javax.annotation.Resource; import javax.jms.*; /** * @ Author :scw * @ Date :Created in 下午 2:21 2018/7/15 0015 * @ Description:生產者的實現類 * @ Modified By: * @Version: $version$ */ public class ProduceServiceImpl implements ProduceService { @Autowired private JmsTemplate jmsTemplate; @Resource(name = "queueDestination") private Destination destination; /** * 發送消息 * @param msg */ @Override public void sendMessage(final String msg) { jmsTemplate.send(destination , new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(msg); return textMessage; } }); System.out.println("如今發送的消息爲: " + msg); } }
<!--注入咱們的生產者--> <bean class="com.hnu.scw.spring.ProduceServiceImpl"/>
package com.hnu.scw.spring; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * @ Author :scw * @ Date :Created in 下午 2:27 2018/7/15 0015 * @ Description:生產者的測試 * @ Modified By: * @Version: $version$ */ public class ProducerTest { public static void main(String[] args){ ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("producer.xml"); ProduceService bean = classPathXmlApplicationContext.getBean(ProduceService.class); //進行發送消息 for (int i = 0; i < 100 ; i++) { bean.sendMessage("test" + i); } //當消息發送完後,關閉容器 classPathXmlApplicationContext.close(); } }
<?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd "> <context:annotation-config /> <!--Activemq的鏈接工廠--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://127.0.0.1:61616" /> </bean> <!--spring jms爲咱們提供的鏈接池 獲取一個鏈接工廠--> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!-- 消息目的地 點對點的模式--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="SpringActiveMQMsg"/> </bean> <!-- 配置消息監聽器--> <bean id="consumerMessageListener" class="com.hnu.scw.spring.ComsumerMessageListener"/> <!--配置消息容器--> <bean id ="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <!--配置鏈接工廠--> <property name="connectionFactory" ref="connectionFactory"/> <!--配置監聽的隊列--> <property name="destination" ref="queueDestination"/> <!--配置消息監聽器--> <property name="messageListener" ref="consumerMessageListener"/> </bean> </beans>
package com.hnu.scw.spring; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * @ Author :scw * @ Date :Created in 下午 3:06 2018/7/15 0015 * @ Description:消息的監聽者,用於處理消息 * @ Modified By: * @Version: $version$ */ public class ComsumerMessageListener implements MessageListener { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接受到消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
package com.hnu.scw.spring; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * @ Author :scw * @ Date :Created in 下午 3:13 2018/7/15 0015 * @ Description:消費者的測試 * @ Modified By: * @Version: $version$ */ public class ConsumerTest { public static void main(String[] args){ //啓動消費者 ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("consumer.xml"); } }
<!-- 消息目的地 (主題模式)--> <!--<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQTopic"> <!–配置隊列模型的消息名稱–> <constructor-arg value="SpringActiveMQMsgTopic"/> </bean>-->
將上面的代碼替換以前的就能夠了。。。
總結:總的來講,基於Spring來使用消息隊列仍是很是方便的,這比咱們正常進行JMS規範操做要簡單不少,畢竟不少對象都是經過Spring的IOC進行容器管理了,因此,值得推薦使用哦~~~
#ActiveMQ的集羣
###爲何要進行集羣呢?
緣由一:實現高可用:以排除單點故障所引發的服務終端。
緣由二:實現負載均衡:以提高效率爲更多的客戶進行服務。
###集羣的方式有哪些?
方式一:客戶端集羣:多個客戶端消費同一個隊列。
方式二:Broker clusters:多個Broker之間同步消息。(實現負載均衡)
這個的實現原理主要是經過網絡鏈接器來進行。
網絡鏈接器:用於配置ActiveMQ服務器與服務器之間的網絡通信方式,用於服務器透析消息。主要分爲靜態鏈接和動態鏈接。
方式三:Master Slave :實現高可用。
這種方式的話,能夠聯想到Mysql的主從配置和Zookeeper的負載均衡的主競爭關係master。
咱們在實際的開發中,通常都是將方式二和方式三進行集成,從而實現高可用和負載均衡。下面的話,我也就這樣的配置思想來進行講解:(經過三臺服務器來模擬消息集羣的實現)
其中的NodeB和NodeC就是一張Master/slave的關係。均可以成爲主服務器。(只要它們某一個宕機,那麼就會其他的一臺就進行繼續服務)
###搭建步驟(基於Windows環境,而Linux環境也是同樣的操做)
三臺服務器的大致功能和描述:
因爲本身沒有三臺服務器,因此就用本身的一臺電腦來模擬三臺消息服務器,其實這個就是假設有三個不一樣ActiveMQ消息服務器了。
<networkConnectors> <networkConnector name="local_network" uri ="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)" /> </networkConnectors>
<!--修改服務端口--> <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <networkConnectors> <networkConnector name="networktoA" uri="static:(tcp://127.0.0.1:61616)" /> </networkConnectors> <!--並修改下面這個標籤的內容 , 做爲B和C的共享文件,目錄就是本身以前建立的一個文件(能夠回看上面的整個結構)--> <persistenceAdapter> <kahaDB directory="D:\Download\MQJiQun\shareDB"/> </persistenceAdapter>
(2)修改jetty.xml內容,修改服務器的服務端口
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <property name="port" value="8162"/> </bean>
<!--修改服務端口--> <transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <networkConnectors> <networkConnector name="networktoA" uri="static:(tcp://127.0.0.1:61616)" /> </networkConnectors> <!--並修改下面這個標籤的內容 , 做爲B和C的共享文件,目錄就是本身以前建立的一個文件(能夠回看上面的整個結構)--> <persistenceAdapter> <kahaDB directory="D:\Download\MQJiQun\shareDB"/> </persistenceAdapter>
(2)修改jetty.xml中的內容
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <property name="port" value="8163"/> </bean>
步驟:
(1)建立Maven項目
(2)導入依賴
<!--添加activemq的依賴--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency>
(3)編寫生產者代碼
package com.hnu.scw.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ Author :scw * @ Date :Created in 上午 11:06 2018/7/14 0014 * @ Description:用於消息的建立類 * @ Modified By: * @Version: $version$ */ public class MessageProducer { //經過集羣的方式進行消息服務器的管理(failover就是進行動態轉移,當某個服務器宕機, // 那麼就進行其餘的服務器選擇,randomize表示隨機選擇) private static final String ACTIVEMQ_URL = "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true"; //定義發送消息的隊列名稱 private static final String QUEUE_NAME = "MyMessage"; public static void main(String[] args) throws JMSException { //建立鏈接工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //建立鏈接 Connection connection = activeMQConnectionFactory.createConnection(); //打開鏈接 connection.start(); //建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立隊列目標 Destination destination = session.createQueue(QUEUE_NAME); //建立一個生產者 javax.jms.MessageProducer producer = session.createProducer(destination); //建立模擬100個消息 for (int i = 1 ; i <= 100 ; i++){ TextMessage message = session.createTextMessage("當前message是:" + i); //發送消息 producer.send(message); //在本地打印消息 System.out.println("我如今發的消息是:" + message.getText()); } //關閉鏈接 connection.close(); } }
(4)編寫消費者代碼
package com.hnu.scw.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ Author :scw * @ Date :Created in 上午 11:30 2018/7/14 0014 * @ Description:消息消費者 * @ Modified By: * @Version: $version$ */ public class MessageConsumer { //經過集羣的方式進行消息服務器的管理(failover就是進行動態轉移,當某個服務器宕機, // 那麼就進行其餘的服務器選擇,randomize表示隨機選擇) private static final String ACTIVEMQ_URL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true"; //定義發送消息的隊列名稱 private static final String QUEUE_NAME = "MyMessage"; public static void main(String[] args) throws JMSException { //建立鏈接工廠 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //建立鏈接 Connection connection = activeMQConnectionFactory.createConnection(); //打開鏈接 connection.start(); //建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立隊列目標 Destination destination = session.createQueue(QUEUE_NAME); //建立消費者 javax.jms.MessageConsumer consumer = session.createConsumer(destination); //建立消費的監聽 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("獲取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }
(5)進行查看各自的服務器的消息隊列的狀況。
#其餘的消息中間件
其實,相似ActiveMQ這樣的消息中間件,用得比較多的還有就是RabbitMQ和Kafka。它們三者各自有各自的優點。你們能夠百度進行了解,我就不進行多說了。後面我會一樣把這兩種消息中間件的使用進行詳細的講解,歡迎你們的關注哦~總的來講,只有適合的場景對應的消息中間件才能發揮最大的做用,沒有一種是隻有好處而沒有壞處的~
#總結