消息中間件--ActiveMQ&JMS消息服務

### 消息中間件 ###

----------

**消息中間件**

1. 消息中間件的概述
2. 消息中間件的應用場景
  * 異步處理
  * 應用解耦
  * 流量削峯
  * 消息通訊
 
----------

### JMS消息服務 ###

----------

**JMS的概述**

1. JMS消息服務的概述
2. JMS消息模型
  * P2P模式
  * Pub/Sub模式
 
3. 消息消費的方式
  * 同步的方式---手動
  * 異步的方式---listener監聽
 
4. JMS編程模型
----------

### 消息中間件:ActiveMQ ###

----------

**ActiveMQ的下載與安裝**

1. ActiveMQ的下載與安裝
  * 下載ActiveMQ的壓縮文件,解壓apache-activemq-5.14.5-bin.zip文件
  * 雙擊運行:activemq.bat文件,啓動服務
 
2. 測試ActiveMQ是否安裝成功
  * 打開瀏覽器,輸入:http://localhost:8161
 
3. 點擊Manage ActiveMQ broker鏈接,能夠查看ActiveMQ中已經發布的消息等
  * 用戶名密碼都是:admin
----------

**ActiveMQ的消息隊列方式入門**(P2P模式)

1. 在父工程的pom.xml文件中引入ActiveMQ和Spring整合JMS的座標依賴
<!-- activemq start -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.2.0</version>
</dependency>
<!-- activemq end -->
<!-- spring 與 mq整合 start -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.2.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.7</version>
</dependency>
<!-- spring 與 mq整合 end -->
 
2. ActiveMQ的向消息隊列中發送消息的入門程序(沒有使用Spring整合JMS的方式)
@Test public void sendQueueMessage() throws JMSException { // 1 建立鏈接工廠
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); // 2 使用工廠,建立鏈接
Connection connection = factory.createConnection(); // 3 啓動鏈接
connection.start(); // 4 使用鏈接,建立會話,true表示開始事務,代碼執行後須要提供事務
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 5 建立隊列隊形(myQueue--隊列的名字)/topic-----------session建立
Queue queue = session.createQueue("myQueue"); // 6 建立生產者-----------session建立
MessageProducer producer = session.createProducer(queue); // 7 建立消息----文本消息-------session建立
TextMessage message = session.createTextMessage(); message.setText("helloworld!!!"); // 8 發送消息
producer.send(message); // 9 提交事務
session.commit(); session.close(); connection.close(); }
3. ActiveMQ從消息隊列中獲取消息
@Test public void receiverQueueMessage() throws JMSException { // 1 建立鏈接工廠
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); // 2 使用工廠,建立鏈接
Connection connection = factory.createConnection(); // 3 啓動鏈接
connection.start(); // 4 使用鏈接,建立會話,true表示開始事務,代碼執行後須要提供事務
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 5 建立隊列隊形(hello--隊列的名字)/topic-----------session建立
Queue queue = session.createQueue("myQueue"); // 6 建立消費者-----------session建立
MessageConsumer consumer = session.createConsumer(queue); // 7 接收消息----text格式
TextMessage receive = (TextMessage) consumer.receive(); String text = receive.getText(); System.out.println("接收到的消息====" + text); // 8 提交事務
session.commit(); session.close(); connection.close(); }
4. 使用監聽器的方式,從隊列中消費消息
/** *異步方式 Queue接受用Listener方式接受,多用 若是有多個監聽listener,則交替執行 * @throws Exception */ @Test public void receiverQueueListener() throws Exception{ // 1 建立鏈接工廠
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); // 2 使用工廠,建立鏈接
Connection connection = factory.createConnection(); // 3 啓動鏈接
connection.start(); // 4 使用鏈接,建立會話,true表示開始事務,代碼執行後須要提供事務//死循環的不能用事物
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5 建立隊列隊形(hello--隊列的名字)/topic-----------session建立
Queue queue = session.createQueue("myQueue"); // 6 建立消費者-----------session建立
MessageConsumer consumer = session.createConsumer(queue); //7 // 給消費者添加監聽器
consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message msg) { TextMessage message = (TextMessage) msg; try { System.out.println("Listener1111111111接收到的消息是=="+message.getText()); } catch (JMSException e) { // TODO Auto-generated catch block
e.printStackTrace(); } } }); while(true){} // 使用監聽器的方式不能關閉,須要監聽器一直工做 // session.commit(); // session.close(); // connection.close();
}

 

**ActiveMQ的消息訂閱方式入門**(Pub/Sub模式

/** * Topic發送 * @throws JMSException */ @Test public void sendTopicMessage() throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 建立消息訂閱
Topic topic = session.createTopic("myTopic"); // 建立生產者
MessageProducer producer = session.createProducer(topic); // 建立消息,一組能夠存儲key value的消息
MapMessage message = session.createMapMessage(); message.setString("username", "cgx"); message.setString("password", "123456"); // 發送消息
producer.send(message); // 提交事務
session.commit(); session.close(); connection.close(); }
/** * Topic接受 * * @throws JMSException */ @Test public void testReceiverMessage() throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 建立消息訂閱
Topic topic = session.createTopic("myTopic"); // 建立消費者
MessageConsumer consumer = session.createConsumer(topic); // 接收消息
MapMessage message = (MapMessage) consumer.receive(); System.out.println(message.getString("username")); System.out.println(message.getString("password")); session.commit(); session.close(); connection.close(); }
/** * Topic接受Listener監聽方式 * * @throws Exception */ @Test public void receiverQueueListener() throws Exception { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立消息訂閱
Topic topic = session.createTopic("myTopic"); // 建立消費者
MessageConsumer consumer = session.createConsumer(topic); // 給消費者添加監聽器consumer添加監聽
consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message msg) { MapMessage message = (MapMessage) msg; try { System.out.println(message.getString("username")); System.out.println(message.getString("password")); } catch (JMSException e) { e.printStackTrace(); } } }); while (true) { } }

 

### Spring整合ActiveMQ ###★★★★★

----------
 
**Spring整合ActiveMQ**★★★★★
 
1. 建立applicationContext-mq.xml的配置文件,導入約束★★★★★
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
 
</beans>

 

2. 具體的配置以下★★★★★
applicationContext-mq.xml===================mq的消息發送(消息生產者)
<!-- 配置鏈接工廠 -->
<!-- ActiveMQ 鏈接工廠 -->
<!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
<!-- 若是鏈接網絡:tcp://ip:61616;未鏈接網絡:tcp://localhost:61616 以及用戶名,密碼-->
<amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
 
<!-- Spring Caching鏈接工廠 -->
<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- Session緩存數量和連接數有關 -->
<property name="sessionCacheSize" value="100" />
</bean>
 
<!-- 定義JmsTemplate的Queue類型★★★★★ -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 -->
<constructor-arg ref="connectionFactory" />
<!-- 非pub/sub模型(發佈/訂閱),即隊列模式 -->
<property name="pubSubDomain" value="false" />
</bean>
 
<!-- 定義JmsTemplate的Topic類型★★★★★ -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate" >
<!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 -->
<constructor-arg ref="connectionFactory" />
<!-- pub/sub模型(發佈/訂閱) -->
<property name="pubSubDomain" value="true" />
</bean>

 

 
3. 發送消息的代碼以下★★★★★
  3.1.Queue方式:★★★★★
@Autowired @Qualifier(value="jmsQueueTemplate") private JmsTemplate queueTemplate;//Queue
 
 
/** * Queue發送消息---spring框架 */ @Test public void sendQueueMessage() { // 發送消息 構造參數指定目標,由於配置文件中的隊列和訂閱模式是經過id與false和true進行區分
queueTemplate.send("myQueue", new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { // 使用session建立消息,發送
TextMessage textMessage = session.createTextMessage("測試結合spring框架發送queue消息"); return textMessage; } }); }
  3.2.Topic方式:★★★★★
@Autowired @Qualifier(value = "jmsTopicTemplate") private JmsTemplate topicTemplate;//Topic
 
/** * Topic發送消息---spring框架 */ @Test public void sendTopicMessage() { topicTemplate.send("spring_topic", new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("username", "mdzz"); return mapMessage; } }); }

 

4. 接收消息的代碼以下==========不提倡手動,要用監聽器異步獲取
/** * Queue接收消息---spring框架 * 同步手動:不提倡 * receive("myQueue")要寫目標,不寫目標的話會報找不到目標的錯誤NO defaultDestination */ @Test public void receiverMessage() { //接收消息textMessage類型
TextMessage textMessage = (TextMessage) queueTemplate.receive("myQueue"); try { System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } }
 
**Spring配置監聽器**★★★★★★★★★★★★★★★
 
1. 自定義監聽器代碼的編寫----接收消息---spring框架---實現MessageListener接口★★★★★
  1.1.Queue:★★★★★
@Component(value="queueConsumer1") public class QueueListener implements MessageListener { @Override public void onMessage(Message arg0) { // 把arg0強轉
TextMessage textMessage = (TextMessage) arg0; try { // 輸出消息
System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
 
  1.2.Topic:發送一個,兩個都會接受★★★★★topic特色:有幾個監聽幾個都會同時收到
@Component public class TopicConsumer1 implements MessageListener { @Override public void onMessage(Message arg0) { MapMessage mapMessage = (MapMessage) arg0; try { System.out.println("TopicConsumer1===="+mapMessage.getString("username")); } catch (JMSException e) { e.printStackTrace(); } } } @Component public class TopicConsumer2 implements MessageListener { //...
}
 
2. 編寫配置文件
applicationContext-mq-consumer.xml=============mq的消息接受(負責監聽接受消息)
<!-- 掃描包 -->
<context:component-scan base-package="com.my.jms.consumer" />
 
<!-- ActiveMQ 鏈接工廠 -->
<amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
 
<!-- Spring Caching鏈接工廠 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="100" />
</bean>
 
<!-- Spring JmsTemplate 的消息生產者 start-->
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory">
<jms:listener destination="myQueue" ref="queueConsumer1"/>
</jms:listener-container>
 
<jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory">
<jms:listener destination="spring_topic" ref="topicConsumer1"/>
<jms:listener destination="spring_topic" ref="topicConsumer2" />
</jms:listener-container>
 
3.不用啓動項目,把spring配置文件applicationContext-mq-consumer.xml啓動起來,能夠用採用下面方法
新建一個test類,讓他一直啓動着,這樣就一直加載spring的配置文件
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:applicationContext-mq-consumer.xml") public class SpringQueueListenerTest { @Test public void test(){ while(true); } }

 

4.只要發送端(發送消息---spring框架)一啓動,監聽器就會監聽到,就會輸出:測試結合spring框架發送queue消息★★★★★

spring整合總結:

消息發送
  1. 建立spring容器
  2. 從容器中獲取JMSTemplate對象,發送消息
  3. 定義Destination
  4. 使用JMSTemplate對象發送消息
消息接受
  1. 建立一個類實現MessageListener 接口。業務處理在此類中實現。
  2.在spring容器中配置DefaultMessageListenerContainer對象,引用MessageListener 實現類對象接收消息。spring

項目整合ActiveMQ:apache

1. 消息生產者整合ActiveMQ
  消息生產者只須要發送消息
  須要把JMSTemplate和Destination交給spring進行管理編程

 部分代碼:
/**===========================activeMQ消息發送========================================*/
// 發送消息!!!
this.send("save", item.getId()); } @Autowired private JmsTemplate jmsTemplate; @Autowired private Destination destination; /** * 此方法就是用來發送消息的 * 考慮:一、發送什麼數據?二、我須要什麼數據? * 在消息中須要:一、消息的標識:save,delete,update;二、商品的ID */
private void send(final String type, final Long itemId) { // TODO Auto-generated method stub
jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { //建立消息體
TextMessage textMessage = new ActiveMQTextMessage(); //設置消息內容
Map<String, Object> map = new HashMap<>(); map.put("type", type); map.put("itemId", itemId); try { ObjectMapper mapper = new ObjectMapper(); textMessage.setText(mapper.writeValueAsString(map)); } catch (Exception e) { e.printStackTrace(); } return textMessage; } }); }

 


2. 消息消費改造
  在search-service添加
  ItemMessageListener:json

/**===========================activeMQ消息發送========================================*/ @Autowired private SearchService searchService; @Override public void onMessage(Message message) { //先判斷此消息類型是不是TextMessage
if(message instanceof TextMessage){ //若是是,強轉
TextMessage textMessage = (TextMessage)message; try { //獲取消息:json
String json = textMessage.getText(); //傑克遜第三做用:直接解析json數據
ObjectMapper mapper = new ObjectMapper(); JsonNode jsonNode = mapper.readTree(json); String type = jsonNode.get("type").asText(); Long itemId = jsonNode.get("itemId").asLong(); //根據解析出來的type,判斷此type=save的時候我應該調用indexSearch方法
if("save".equals(type)){ searchService.indexItem(itemId); } } catch (Exception e) { e.printStackTrace(); } } }

索引庫增長商品會觸發mq:瀏覽器

SearchServiceImpl:緩存

@Override public void indexItem(Long itemId) throws Exception { Item item = this.itemMapper.selectByPrimaryKey(itemId); SolrInputDocument doc = new SolrInputDocument(); doc.addField("id", item.getId()); doc.addField("item_title", item.getTitle()); doc.addField("item_image", item.getImage()); doc.addField("item_cid", item.getCid()); doc.addField("item_price", item.getPrice()); doc.addField("item_status", item.getStatus()); this.cloudSolrServer.add(doc); this.cloudSolrServer.commit(); }
相關文章
相關標籤/搜索