消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息排隊模型,它能夠在分佈式環境下擴展進程間的通訊。對於消息中間件,常見的角色大體也就有Producer(生產者)、Consumer(消費者)java
常見的消息中間件產品:spring
(1)ActiveMQ數據庫
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。apache
主要特色:服務器
1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQPsession
2. 徹底支持JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)app
3. 對Spring的支持,ActiveMQ能夠很容易內嵌到使用Spring的系統裏面去,並且也支持Spring2.0的特性tcp
4. 經過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中經過JCA 1.5 resource adaptors的配置,可讓ActiveMQ能夠自動的部署到任何兼容J2EE 1.4 商業服務器上分佈式
5. 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTAide
6. 支持經過JDBC和journal提供高速的消息持久化
7. 從設計上保證了高性能的集羣,客戶端-服務器,點對點
8. 支持Ajax
9. 支持與Axis的整合
10. 能夠很容易得調用內嵌JMS provider,進行測試
(2)RabbitMQ
AMQP協議的領導實現,支持多種場景。淘寶的MySQL集羣內部有使用它進行通信,OpenStack開源雲平臺的通訊組件,最早在金融行業獲得運用。
(3)ZeroMQ
史上最快的消息隊列系統
(4)Kafka
Apache下的一個子項目 。特色:高吞吐,在一臺普通的服務器上既能夠達到10W/s的吞吐速率;徹底的分佈式系統。適合處理海量數據。
JMS(Java Messaging Service)是Java平臺上有關面向消息中間件的技術規範,它便於消息系統中的Java應用程序進行消息交換,而且經過提供標準的產生、發送、接收消息的接口簡化企業應用的開發。
JMS自己只定義了一系列的接口規範,是一種與廠商無關的 API,用來訪問消息收發系統。它相似於 JDBC(java Database Connectivity):這裏,JDBC 是能夠用來訪問許多不一樣關係數據庫的 API,而 JMS 則提供一樣與廠商無關的訪問方法,以訪問消息收發服務。許多廠商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,這只是幾個例子。 JMS 使您可以經過消息收發服務(有時稱爲消息中介程序或路由器)從一個 JMS 客戶機向另外一個 JML 客戶機發送消息。消息是 JMS 中的一種類型對象,由兩部分組成:報頭和消息主體。報頭由路由信息以及有關該消息的元數據組成。消息主體則攜帶着應用程序的數據或有效負載。
JMS 定義了五種不一樣的消息正文格式,以及調用的消息類型,容許你發送並接收以一些不一樣形式的數據,提供現有消息格式的一些級別的兼容性。
對於消息的傳遞有兩種類型:
一種是點對點的,即一個生產者和一個消費者一一對應;
另外一種是發佈/訂閱模式,即一個生產者產生消息並進行發送後,能夠由多個消費者進行接收。
官方網址:http://activemq.apache.org/
第一步: 把ActiveMQ 的壓縮包上傳到Linux系統。
第二步:解壓縮。
第三步:啓動。
第四步:進入後臺管理: http://192.168.25.130:8161/admin
用戶名:admin
密碼:admin
點對點的模式主要創建在一個隊列上面,當鏈接一個列隊的時候,發送端不須要知道接收端是否正在接收,能夠直接向ActiveMQ發送消息,發送的消息,將會先進入隊列中,若是有接收端在監聽,則會發向接收端,若是沒有接收端接收,則會保存在activemq服務器,直到接收端接收消息,點對點的消息模式能夠有多個發送端,多個接收端,可是一條消息,只會被一個接收端給接收到,哪一個接收端先連上ActiveMQ,則會先接收到,然後來的接收端則接收不到那條消息。
【Producer】——生產者:生產消息,發送端
第一步:把jar包添加到工程中。使用5.11.2版本的jar包。
<!-- ActiveMQ客戶端依賴的jar包 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> </dependency>
第二步:建立ConnectionFactory對象,須要指定服務端ip及端口號。
第三步:使用ConnectionFactory對象建立一個Connection對象。
第四步:開啓鏈接,調用Connection對象的start方法。
第五步:使用Connection對象建立一個Session對象。
第六步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Queue對象。
第七步:使用Session對象建立一個Producer對象。
第八步:建立一個Message對象,建立一個TextMessage對象。
第九步:使用Producer對象發送消息。
第十步:關閉資源。
@Test public void testQueueProducer() throws Exception { // 第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。 // brokerURL:服務器的ip及端口號 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616"); // 第二步:使用ConnectionFactory對象建立一個Connection對象。 Connection connection = connectionFactory.createConnection(); // 第三步:開啓鏈接,調用Connection對象的start方法。 connection.start(); // 第四步:使用Connection對象建立一個Session對象。 // 第一個參數:是否開啓事務。true:開啓事務,第二個參數忽略。 // 第二個參數:當第一個參數爲false時,纔有意義。消息的應答模式。一、自動應答二、手動應答。通常是自動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Queue對象。 Queue queue = session.createQueue("test-queue"); // 第六步:使用Session對象建立一個Producer對象。 MessageProducer producer = session.createProducer(queue); // 第七步:建立一個Message對象,建立一個TextMessage對象。 /*ActiveMQTextMessage message = new ActiveMQTextMessage(); message.setText("hello activeMq,this is my first test.");*/ TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test."); // 第八步:使用Producer對象發送消息。 producer.send(textMessage); // 第九步:關閉資源。 producer.close(); session.close(); connection.close(); }
訪問ActiveMQ首頁,進入Queues標籤後,便可看到發送的消息。(若是點擊Queues或Topics時出現HTTP ERROR: 503的錯誤,處理方式爲:ActiveMQ頁面出現HTTP ERROR: 503錯誤處理方式)
列表各列信息含義以下:
Number Of Pending Messages :等待消費的消息。這個是當前未出隊列的數量。
Number Of Consumers :消費者。這個是消費者端的消費者數量
Messages Enqueued :進入隊列的消息 。進入隊列的總數量,包括出隊列的。
Messages Dequeued :出了隊列的消息 。能夠理解爲是消費這消費掉的數量。
其中:Messages Enqueued -Messages Dequeued =Number Of Pending Messages
【Consumer】——消費者:接收消息
第一步:建立一個ConnectionFactory對象。
第二步:從ConnectionFactory對象中得到一個Connection對象。
第三步:開啓鏈接。調用Connection對象的start方法。
第四步:使用Connection對象建立一個Session對象。
第五步:使用Session對象建立一個Destination對象。和發送端保持一致queue,而且隊列的名稱一致。
第六步:使用Session對象建立一個Consumer對象。
第七步:接收消息。
第八步:打印消息。
第九步:關閉資源
//1.建立鏈接工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.130:61616"); //2.獲取鏈接 Connection connection = connectionFactory.createConnection(); //3.啓動鏈接 connection.start(); //4.獲取session (參數1:是否啓動事務,參數2:消息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立隊列對象 Queue queue = session.createQueue("test-queue"); //6.建立消息消費 MessageConsumer consumer = session.createConsumer(queue); //7.監聽消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待鍵盤輸入 System.in.read(); //9.關閉資源 consumer.close(); session.close(); connection.close();
接收消息後,ActiveMQ首頁的Queues以下
【Producer】
第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。
第二步:使用ConnectionFactory對象建立一個Connection對象。
第三步:開啓鏈接,調用Connection對象的start方法。
第四步:使用Connection對象建立一個Session對象。
第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Topic對象。
第六步:使用Session對象建立一個Producer對象。
第七步:建立一個Message對象,建立一個TextMessage對象。
第八步:使用Producer對象發送消息。
第九步:關閉資源。
@Test public void testTopicProducer() throws Exception { // 第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。 // brokerURL:服務器的ip及端口號 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616"); // 第二步:使用ConnectionFactory對象建立一個Connection對象。 Connection connection = connectionFactory.createConnection(); // 第三步:開啓鏈接,調用Connection對象的start方法。 connection.start(); // 第四步:使用Connection對象建立一個Session對象。 // 第一個參數:是否開啓事務。true:開啓事務,第二個參數忽略。 // 第二個參數:當第一個參數爲false時,纔有意義。消息的應答模式。一、自動應答二、手動應答。通常是自動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個topic對象。 // 參數:話題的名稱。 Topic topic = session.createTopic("test-topic"); // 第六步:使用Session對象建立一個Producer對象。 MessageProducer producer = session.createProducer(topic); // 第七步:建立一個Message對象,建立一個TextMessage對象。 TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test"); // 第八步:使用Producer對象發送消息。 producer.send(textMessage); // 第九步:關閉資源。 producer.close(); session.close(); connection.close(); }
發送完畢後,查看ActiveMQ後臺的Topics
【Consumer】——消費者:接收消息
第一步:建立一個ConnectionFactory對象。
第二步:從ConnectionFactory對象中得到一個Connection對象。
第三步:開啓鏈接。調用Connection對象的start方法。
第四步:使用Connection對象建立一個Session對象。
第五步:使用Session對象建立一個Destination對象。和發送端保持一致topic,而且話題的名稱一致。
第六步:使用Session對象建立一個Consumer對象。
第七步:接收消息。
第八步:打印消息。
第九步:關閉資源
@Test public void testTopicConsumer() throws Exception { // 第一步:建立一個ConnectionFactory對象。 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616"); // 第二步:從ConnectionFactory對象中得到一個Connection對象。 Connection connection = connectionFactory.createConnection(); // 第三步:開啓鏈接。調用Connection對象的start方法。 connection.start(); // 第四步:使用Connection對象建立一個Session對象。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象建立一個Destination對象。和發送端保持一致topic,而且話題的名稱一致。 Topic topic = session.createTopic("test-topic"); // 第六步:使用Session對象建立一個Consumer對象。 MessageConsumer consumer = session.createConsumer(topic); // 第七步:接收消息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = null; // 取消息的內容 text = textMessage.getText(); // 第八步:打印消息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic的消費端01。。。"); // 等待接收消息 System.in.read(); // 第九步:關閉資源 consumer.close(); session.close(); connection.close(); }
Topic和queue的最大區別在於topic是以廣播的形式,通知全部在線監聽的客戶端有新的消息,沒有監聽的客戶端將收不到消息;而queue則是以點對點的形式通知多個處於監聽狀態的客戶端中的一個。
在e3-manager-service中配置
第一步:引用相關的jar包。
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency>
第二步:配置Activemq整合spring。配置ConnectionFactory
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd"> <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.130:61616" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> </beans>
第三步:配置生產者。使用JMSTemplate對象。發送消息。
<!-- 配置生產者 --> <!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 --> <property name="connectionFactory" ref="connectionFactory" /> </bean>
第四步:在spring容器中配置Destination。
<!--這個是隊列目的地,點對點的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>spring-queue</value> </constructor-arg> </bean> <!--這個是主題目的地,一對多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean>
第五步:代碼測試(發送消息)
@Test public void testSpringActiveMq() throws Exception { // 第一步:初始化一個spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); // 第二步:從容器中得到JMSTemplate對象。 JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class); // 第三步:從容器中得到一個Destination對象 Destination destination = (Destination) applicationContext.getBean("queueDestination"); // 第四步:使用JMSTemplate對象發送消息,須要知道Destination jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { //建立一個消息對象並返回 TextMessage textMessage = session.createTextMessage("spring activemq queue message"); return textMessage; } }); }
在e3-search-service中配置
第一步:把Activemq相關的jar包添加到工程中
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> </dependency>
第二步:建立一個MessageListener的實現類。
public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; //取消息內容 String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }
第三步:配置spring和Activemq整合。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd"> <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.168:61616" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!--這個是隊列目的地,點對點的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>spring-queue</value> </constructor-arg> </bean> <!--這個是主題目的地,一對多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean> <!-- 接收消息 --> <!-- 配置監聽器 --> <bean id="myMessageListener" class="cn.e3mall.search.listener.MyMessageListener" /> <!-- 消息監聽容器 --> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="myMessageListener" /> </bean> </beans>
第四步:測試代碼。
@Test public void testQueueConsumer() throws Exception { //初始化spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); //等待 System.in.read(); }