1. 學習計劃
一、什麼是MQjava
二、MQ的應用場景spring
三、ActiveMQ的使用方法。數據庫
四、使用消息隊列實現商品同步。apache
2. 同步索引庫分析
方案一:在manager(後臺)中,添加商品的業務邏輯中,添加一個同步索引庫的業務邏輯。服務器
缺點:這樣違背了服務單一職能的原則,業務邏輯耦合度高,業務拆分不明確。session
方案二:業務邏輯在search中實現,調用服務在manager實現。業務邏輯分開。app
缺點:服務之間的耦合度變高,search服務依賴manager服務,服務的啓動有前後順序。dom
方案三:使用消息隊列。MQ是一個消息中間件。tcp
MQ是一個消息中間件,ActiveMQ、RabbitMQ、kafka(大數據)。ide
系統服務之間,非直接通訊,而是經過MQ進行轉發,這樣既解決了系統之間的通訊問題,同時也避免了系統之間的依賴和耦合。
消息隊列的主要應用:
解決系統之間的通訊問題
下降系統之間的耦合度
互聯網項目中,爲了用戶的體驗,必須遵照快速響應用戶的原則,好比在電商項目中,當用戶下單以後,其實還有不少的後續業務須要完成,用戶根本不可能等你的流程所有處理完才獲得下單反饋。這是後咱們能夠利用消息隊列,在既可以快速響應用戶的同時,有能後將業務消息壓入MQ中,經過MQ的流轉,通知後續業務的開展,達到數據最終一致。
3. ActiveMQ
3.1. 什麼是ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。
主要特色:
1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2. 徹底支持JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)
3. 對Spring的支持,ActiveMQ能夠很容易內嵌到使用Spring的系統裏面去,並且也支持Spring2.0的特性
4. 經過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中經過JCA 1.5 resource adaptors的配置,可讓ActiveMQ能夠自動的部署到任何兼容J2EE 1.4 商業服務器上
5. 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6. 支持經過JDBC和journal提供高速的消息持久化
7. 從設計上保證了高性能的集羣,客戶端-服務器,點對點
8. 支持Ajax
9. 支持與Axis的整合
10. 能夠很容易得調用內嵌JMS provider,進行測試
3.2. ActiveMQ的消息形式
對於消息的傳遞有兩種類型:
一種是點對點的,即一個生產者和一個消費者一一對應;(只能有一個消費者)
另外一種是發佈/訂閱模式,即一個生產者產生消息並進行發送後,能夠由多個消費者進行接收。(廣播)
JMS定義了五種不一樣的消息正文格式,以及調用的消息類型,容許你發送並接收以一些不一樣形式的數據,提供現有消息格式的一些級別的兼容性。
· StreamMessage -- Java原始值的數據流
· MapMessage--一套名稱-值對
· TextMessage--一個字符串對象
· ObjectMessage--一個序列化的 Java對象
· BytesMessage--一個字節的數據流
4. ActiveMQ的安裝
進入http://activemq.apache.org/下載ActiveMQ
使用的版本是5.12.0
4.1. 安裝環境:
一、須要jdk
二、安裝Linux系統。生產環境都是Linux系統。
4.2. 安裝步驟
第一步: 把ActiveMQ 的壓縮包上傳到Linux系統。
第二步:解壓縮。
第三步:啓動。
4.3.啓動,查看,關閉
使用bin目錄下的activemq命令啓動:
[root@localhost bin]# ./activemq start
關閉:
[root@localhost bin]# ./activemq stop
查看狀態:
[root@localhost bin]# ./activemq status
4.4 細節
注意:若是ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建議使用5.11.2
進入管理後臺:
http://192.168.25.168:8161/admin
用戶名:admin
密碼:admin
503錯誤解決:
一、查看機器名
[root@arjenlee168 bin]# cat /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=arjenlee168
二、修改host文件
[root@arjenlee168 bin]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 arjenlee168
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
[root@arjenlee168 bin]#
三、重啓Activemq服務
5. ActiveMQ的使用方法
下面是java代碼依據JMS規範操做MQ。
5.1. Queue
- Queue消息形式,服務端默認進行持久化。
- Queue消息形式,只要被任意一個consumer消費後,服務端消除該消息,即一個消息只能被一個consumer消費。
若是消息沒有被消費,則會一直被保存在服務端,直到被消費爲止。
5.1.1. Producer
生產者:生產消息,發送端。
把jar包添加到工程中。使用5.11.2版本的jar包。
5.1.1.1 建立步驟
第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。
第二步:使用ConnectionFactory對象建立一個Connection對象。
第三步:開啓鏈接,調用Connection對象的start方法。
第四步:使用Connection對象建立一個Session對象。
第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Queue對象。
第六步:使用Session對象建立一個Producer對象。
第七步:建立一個Message對象,建立一個TextMessage對象。
第八步:使用Producer對象發送消息。
第九步:關閉資源。
5.1.1.2 代碼示例
@Test public void testQueueProducer() throws Exception { // 第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。 //brokerURL服務器的ip及端口號 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616"); // 第二步:使用ConnectionFactory對象建立一個Connection對象。 Connection connection = connectionFactory.createConnection(); // 第三步:開啓鏈接,調用Connection對象的start方法。 connection.start(); // 第四步:使用Connection對象建立一個Session對象。 //第一個參數:是否開啓事務。true:開啓事務,第二個參數忽略。 //第二個參數:當第一個參數爲false時,纔有意義。消息的應答模式。一、自動應答二、手動應答。通常是自動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Queue對象。 //參數:隊列的名稱。 Queue queue = session.createQueue("test-queue"); // 第六步:使用Session對象建立一個Producer對象。 MessageProducer producer = session.createProducer(queue); // 第七步:建立一個Message對象,建立一個TextMessage對象。 /*TextMessage message = new ActiveMQTextMessage(); message.setText("hello activeMq,this is my first test.");*/ TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test."); // 第八步:使用Producer對象發送消息。 producer.send(textMessage); // 第九步:關閉資源。 producer.close(); session.close(); connection.close(); }
5.1.2. Consumer
消費者:接收消息。
5.1.2.1 建立步驟
第一步:建立一個ConnectionFactory對象。
第二步:從ConnectionFactory對象中得到一個Connection對象。
第三步:開啓鏈接。調用Connection對象的start方法。
第四步:使用Connection對象建立一個Session對象。
第五步:使用Session對象建立一個Destination對象。和發送端保持一致queue,而且隊列的名稱一致。
第六步:使用Session對象建立一個Consumer對象。
第七步:接收消息。
第八步:打印消息。
第九步:關閉資源
5.1.2.2 代碼示例
@Test public void testQueueConsumer() throws Exception { // 第一步:建立一個ConnectionFactory對象。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616"); // 第二步:從ConnectionFactory對象中得到一個Connection對象。 Connection connection = connectionFactory.createConnection(); // 第三步:開啓鏈接。調用Connection對象的start方法。 connection.start(); // 第四步:使用Connection對象建立一個Session對象。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象建立一個Destination對象。和發送端保持一致queue,而且隊列的名稱一致。 Queue queue = session.createQueue("test-queue"); // 第六步:使用Session對象建立一個Consumer對象。 MessageConsumer consumer = session.createConsumer(queue); // 第七步:接收消息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = null; //取消息的內容 text = textMessage.getText(); // 第八步:打印消息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); //等待鍵盤輸入 System.in.read(); // 第九步:關閉資源 consumer.close(); session.close(); connection.close(); }
5.2. Topic
- Topic消息形式,服務端默認不進行持久化存儲。
- Topic消息能夠被多個consumer接收到,不會由於某一個consumer的消費行爲,而是的消息被服務端刪除。
5.2.1. Producer
5.2.1.1. 建立步驟
第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。
第二步:使用ConnectionFactory對象建立一個Connection對象。
第三步:開啓鏈接,調用Connection對象的start方法。
第四步:使用Connection對象建立一個Session對象。
第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Topic對象。
第六步:使用Session對象建立一個Producer對象。
第七步:建立一個Message對象,建立一個TextMessage對象。
第八步:使用Producer對象發送消息。
第九步:關閉資源。
5.2.1.2. 示例代碼
@Test public void testTopicProducer() throws Exception { // 第一步:建立ConnectionFactory對象,須要指定服務端ip及端口號。 // brokerURL服務器的ip及端口號 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616"); // 第二步:使用ConnectionFactory對象建立一個Connection對象。 Connection connection = connectionFactory.createConnection(); // 第三步:開啓鏈接,調用Connection對象的start方法。 connection.start(); // 第四步:使用Connection對象建立一個Session對象。 // 第一個參數:是否開啓事務。true:開啓事務,第二個參數忽略。 // 第二個參數:當第一個參數爲false時,纔有意義。消息的應答模式。一、自動應答二、手動應答。通常是自動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個topic對象。 // 參數:話題的名稱。 Topic topic = session.createTopic("test-topic"); // 第六步:使用Session對象建立一個Producer對象。 MessageProducer producer = session.createProducer(topic); // 第七步:建立一個Message對象,建立一個TextMessage對象。 /* * TextMessage message = new ActiveMQTextMessage(); message.setText( * "hello activeMq,this is my first test."); */ TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test"); // 第八步:使用Producer對象發送消息。 producer.send(textMessage); // 第九步:關閉資源。 producer.close(); session.close(); connection.close(); }
5.2.2. Consumer
消費者:接收消息。
5.2.2.1. 建立步驟
第一步:建立一個ConnectionFactory對象。
第二步:從ConnectionFactory對象中得到一個Connection對象。
第三步:開啓鏈接。調用Connection對象的start方法。
第四步:使用Connection對象建立一個Session對象。
第五步:使用Session對象建立一個Destination對象。和發送端保持一致topic,而且話題的名稱一致。
第六步:使用Session對象建立一個Consumer對象。
第七步:接收消息。須要利用MessageListener。
第八步:打印消息。
第九步:關閉資源
5.2.2.2. 示例代碼
@Test public void testTopicConsumer() throws Exception { // 第一步:建立一個ConnectionFactory對象。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616"); // 第二步:從ConnectionFactory對象中得到一個Connection對象。 Connection connection = connectionFactory.createConnection(); // 第三步:開啓鏈接。調用Connection對象的start方法。 connection.start(); // 第四步:使用Connection對象建立一個Session對象。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象建立一個Destination對象。和發送端保持一致topic,而且話題的名稱一致。 Topic topic = session.createTopic("test-topic"); // 第六步:使用Session對象建立一個Consumer對象。 MessageConsumer consumer = session.createConsumer(topic); // 第七步:接收消息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = null; // 取消息的內容 text = textMessage.getText(); // 第八步:打印消息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic的消費端03。。。。。"); // 等待鍵盤輸入 System.in.read(); // 第九步:關閉資源 consumer.close(); session.close(); connection.close(); }
6. Activemq整合spring
使用spring提整合ActiveMQ,能夠避免如上繁瑣的使用步驟。
6.1. 配置producer
首先是spring和ActiveMQ整合中,如何配置producer
6.1.1. 導入整合包
第一步:引用spring和ActiveMQ整合的相關jar包。
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency>
6.1.2. 配置ConnectionFactory
第二步:配置Activemq整合spring。配置ConnectionFactory
spring對ConnectionFactory進行了更上一層的包裝(接口),真正的connectionFactory由JMS服務廠商提供,在這裏是ActiveMQConnectionFactory,須要注入到spring中的connectionFactory。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd"> <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.168:61616" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> </beans>
6.1.2. 配置生產者
第三步:配置生產者:spring中提供了一個模板,JMSTemplate用來簡化如上展現的發送消息的步驟。
使用JMSTemplate對象。發送消息,template中須要注入connectionFactory。
<!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 --> <property name="connectionFactory" ref="connectionFactory" /> </bean>
6.1.3. 配置Destination
第四步:在spring容器中配置Destination。能夠是Queue,也能夠是Topic
<!--這個是隊列目的地,點對點的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>spring-queue</value> </constructor-arg> </bean> <!--這個是主題目的地,一對多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean>
6.1.4. 完整的配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd"> <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.168:61616" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!-- 配置生產者 --> <!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 --> <property name="connectionFactory" ref="connectionFactory" /> </bean> <!--這個是隊列目的地,點對點的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>spring-queue</value> </constructor-arg> </bean> <!--這個是主題目的地,一對多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean> </beans>
下面看一下spring和ActiveMQ整合中,如何配置cousumer
6.2. 配置consumer
- 導入jar包
- 配置connectionFactory
- 配置destination
以上同producer
6.2.1. 建立MessageListener
消息的接受須要用到,MessageListener,listener一旦監聽到有消息傳來,便會執行。
public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; //取消息內容 String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }
6.2.2 配置消費者
與是手動建立consumer接受消息的代碼相比,在spring中,咱們只須要配置消息監聽器和消息監聽器容器,而後啓動spring容器就能夠了。
<!-- 接收消息 --> <!-- 配置監聽器 --> <bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener" /> <!-- 消息監聽容器 --> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="myMessageListener" /> </bean>
6.2.3. 完整的配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd"> <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.168:61616" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!--這個是隊列目的地,點對點的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>spring-queue</value> </constructor-arg> </bean> <!--這個是主題目的地,一對多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean> <!-- 接收消息 --> <!-- 配置監聽器 --> <bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener" /> <!-- 消息監聽容器 --> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="myMessageListener" /> </bean> </beans>
6.3. 代碼測試
6.3.1. 發送消息
第一步:初始化一個spring容器
第二步:從容器中得到JMSTemplate對象。
第三步:從容器中得到一個Destination對象
第四步:使用JMSTemplate對象發送消息,須要知道Destination
@Test public void testQueueProducer() throws Exception { // 第一步:初始化一個spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); // 第二步:從容器中得到JMSTemplate對象。 JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class); // 第三步:從容器中得到一個Destination對象 Queue queue = (Queue) applicationContext.getBean("queueDestination"); // 第四步:使用JMSTemplate對象發送消息,須要知道Destination jmsTemplate.send(queue, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage("spring activemq test"); return textMessage; } }); }
6.3.2. 接收消息
Taotao-search-Service中接收消息。
第一步:把Activemq相關的jar包添加到工程中
第二步:建立一個MessageListener的實現類。
第三步:配置spring和Activemq整合。
第四步:測試代碼。
@Test public void testQueueConsumer() throws Exception { //初始化spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); //等待 System.in.read(); }
7. 添加商品同步索引庫
7.1. Producer
manager-service工程中發送消息。
當商品添加完成後發送一個TextMessage,包含一個商品id。search-service從消息隊列MQ中接受到推送過來的id值,根據id值去查詢數據庫,拿到對應的數據;利用查詢獲得的數據創建索引,添加到索引庫中。
@Override public TaotaoResult addItem(TbItem item, String desc) { // 一、生成商品id final long itemId = IDUtils.genItemId(); // 二、補全TbItem對象的屬性 item.setId(itemId); //商品狀態,1-正常,2-下架,3-刪除 item.setStatus((byte) 1); Date date = new Date(); item.setCreated(date); item.setUpdated(date); // 三、向商品表插入數據 itemMapper.insert(item); // 四、建立一個TbItemDesc對象 TbItemDesc itemDesc = new TbItemDesc(); // 五、補全TbItemDesc的屬性 itemDesc.setItemId(itemId); itemDesc.setItemDesc(desc); itemDesc.setCreated(date); itemDesc.setUpdated(date); // 六、向商品描述表插入數據 itemDescMapper.insert(itemDesc); //發送一個商品添加消息 jmsTemplate.send(topicDestination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(itemId + ""); return textMessage; } }); // 七、TaotaoResult.ok() return TaotaoResult.ok(); }
7.2. Consumer
7.2.1. 功能分析
一、接收消息。須要建立MessageListener接口的實現類。
二、取消息,取商品id。
三、根據商品id查詢數據庫。
四、建立一SolrInputDocument對象。
五、使用SolrServer對象寫入索引庫。
六、返回成功,返回TaotaoResult。
7.2.2. Dao層
根據商品id查詢商品信息。
映射文件:
<select id="getItemById" parameterType="long" resultType="com.taotao.common.pojo.SearchItem"> SELECT a.id, a.title, a.sell_point, a.price, a.image, b. NAME category_name, c.item_desc FROM tb_item a JOIN tb_item_cat b ON a.cid = b.id JOIN tb_item_desc c ON a.id = c.item_id WHERE a.status = 1 AND a.id=#{itemId} </select>
7.2.3. Service層
參數:商品ID
業務邏輯:
一、根據商品id查詢商品信息。
二、建立一SolrInputDocument對象。
三、使用SolrServer對象寫入索引庫。
四、返回成功,返回TaotaoResult。
返回值:TaotaoResult
public TaotaoResult addDocument(long itemId) throws Exception { // 一、根據商品id查詢商品信息。 SearchItem searchItem = searchItemMapper.getItemById(itemId); // 二、建立一SolrInputDocument對象。 SolrInputDocument document = new SolrInputDocument(); // 三、使用SolrServer對象寫入索引庫。 document.addField("id", searchItem.getId()); document.addField("item_title", searchItem.getTitle()); document.addField("item_sell_point", searchItem.getSell_point()); document.addField("item_price", searchItem.getPrice()); document.addField("item_image", searchItem.getImage()); document.addField("item_category_name", searchItem.getCategory_name()); document.addField("item_desc", searchItem.getItem_desc()); // 五、向索引庫中添加文檔。 solrServer.add(document); solrServer.commit(); // 四、返回成功,返回TaotaoResult。 return TaotaoResult.ok(); }
7.2.4. Listener
public class ItemChangeListener implements MessageListener { @Autowired private SearchItemServiceImpl searchItemServiceImpl; @Override public void onMessage(Message message) { try { TextMessage textMessage = null; Long itemId = null; //取商品id if (message instanceof TextMessage) { textMessage = (TextMessage) message; itemId = Long.parseLong(textMessage.getText()); } //向索引庫添加文檔 searchItemServiceImpl.addDocument(itemId); } catch (Exception e) { e.printStackTrace(); } } }
7.2.5. Spring配置監聽